问题描述
我正在使用flink从kafka读取并写入redis。
为了测试,我只想阅读来自kafka的前10条消息。
因此我使用计数器并在counter = 10
时尝试停止消费者
AtomicInteger counter = new AtomicInteger(0);
FlinkKafkaConsumer08<String> kafkaConsumer =
new FlinkKafkaConsumer08<>("my topic",
new SimpleStringSchema() {
@Override
public boolean isEndOfStream(String nextElement) {
// It should only read 10 kafka message
return counter.getAndIncrement() > 9;
}
},
properties);
但我在redis中得到30条消息:
llen rtp:example
(integer) 30
当我将条件更改为counter.getAndIncrement() > 8
,它会将27条消息写入redis。
总是三倍。
完整代码:
public class FlinkEntry {
private final static JedisCluster JEDIS_CLUSTER;
static {
Set<HostAndPort> hostAndPorts = new HashSet<>();
hostAndPorts.add(new HostAndPort("localhost", 7001));
JEDIS_CLUSTER = new JedisCluster(hostAndPorts);
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> kafkaConsumer = createKafkaConsumer();
DataStream<String> dataStream = environment.addSource(kafkaConsumer);
SinkFunction<String> redisSink = createRedisSink();
dataStream.addSink(redisSink);
environment.execute();
}
private static FlinkKafkaConsumer08<String> createKafkaConsumer() {
Properties properties = new Properties();
//... set kafka property
AtomicInteger counter = new AtomicInteger(0);
FlinkKafkaConsumer08<String> kafkaConsumer =
new FlinkKafkaConsumer08<>("my topic",
new SimpleStringSchema() {
@Override
public boolean isEndOfStream(String nextElement) {
// It should only read 10 kafka message
return counter.getAndIncrement() > 9;
}
},
properties);
kafkaConsumer.setStartFromLatest();
return kafkaConsumer;
}
private static SinkFunction<String> createRedisSink() {
return new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) {
JEDIS_CLUSTER.lpush("rtp:example", value);
JEDIS_CLUSTER.expire("rtp:example", 10 * 60);
}
};
}
}
1楼
理解这一点的一种方法是通过调用禁用操作员链接
env.disableOperatorChaining();
然后查看一些指标 - 例如,源处的numRecordsOut和接收器处的numRecordsIn。 我还要仔细检查整个作业是否正在运行并将并行性设置为1。
您需要禁用链接,否则整个作业将折叠为单个任务,并且不会为两个运算符之间的通信收集任何指标。