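Preface
Sunday night, so this is another very short post (hopefully I can stop slacking off next week, hhhh).
Kafka is one of the most commonly used data sources in our day-to-day stream processing jobs. As data types and data volume grow, we inevitably have to add new Kafka topics or add more partitions to existing ones. So how does the real-time processing engine consuming from Kafka notice changes in topics and partitions? This post takes a quick look, using Spark Streaming and Flink as examples.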
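The Spark Streaming case
According to the official documentation (see the figure above), only spark-streaming-kafka-0-10 supports dynamic awareness of Kafka changes (i.e. Dynamic Topic Subscription). Digging into the source, we arrive at the o.a.s.streaming.kafka010.DirectKafkaInputDStream class and the first line of its compute() method, which is called for every micro-batch.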
val untilOffsets = clamp(latestOffsets())
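As the name suggests, clamp() limits the data rate, which is not covered here. latestOffsets() returns the current latest offset of each partition; its implementation is shown below, together with the paranoidPoll() method it calls.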
/**
 * Returns the latest (highest) available offsets, taking new partitions into account.
 */
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  paranoidPoll(c)
  val parts = c.assignment().asScala

  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)

  // Check if there's any partition been revoked because of consumer rebalance.
  val revokedPartitions = currentOffsets.keySet.diff(parts)
  if (revokedPartitions.nonEmpty) {
    throw new IllegalStateException(s"Previously tracked partitions " +
      s"${revokedPartitions.mkString("[", ",", "]")} been revoked by Kafka because of consumer " +
      s"rebalance. This is mostly due to another stream with same group id joined, " +
      s"please check if there're different streaming application misconfigure to use same " +
      s"group id. Fundamentally different stream should use different group id")
  }

  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap

  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}

/**
 * The concern here is that poll might consume messages despite being paused,
 * which would throw off consumer position. Fix position if this happens.
 */
private def paranoidPoll(c: Consumer[K, V]): Unit = {
  // don't actually want to consume any messages, so pause all partitions
  c.pause(c.assignment())
  val msgs = c.poll(0)
  if (!msgs.isEmpty) {
    // position should be minimum offset per topicpartition
    msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
      val tp = new TopicPartition(m.topic, m.partition)
      val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
      acc + (tp -> off)
    }.foreach { case (tp, off) =>
      logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
      c.seek(tp, off)
    }
  }
}
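So on every call to compute(), paranoidPoll() polls with all partitions paused (seeking each TopicPartition back to the right offset if any messages were returned anyway), and latestOffsets() then picks out the newly added partitions and starts tracking their offsets. That is how dynamic awareness is achieved.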
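The set arithmetic in latestOffsets() can be tried out without a Kafka cluster. The following is only a minimal sketch of that idea in plain Java: PartitionDiff, its method names, the use of strings such as "events-0" in place of TopicPartition, and the startOffset parameter are all made up for illustration and are not part of Spark.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not a Spark class: models the diff logic of latestOffsets().
class PartitionDiff {

    /** Partitions in the current assignment that are not tracked yet. */
    static Set<String> newPartitions(Set<String> assigned, Map<String, Long> currentOffsets) {
        Set<String> added = new HashSet<>(assigned);
        added.removeAll(currentOffsets.keySet());
        return added;
    }

    /** Tracked partitions that are missing from the current assignment. */
    static Set<String> revokedPartitions(Set<String> assigned, Map<String, Long> currentOffsets) {
        Set<String> revoked = new HashSet<>(currentOffsets.keySet());
        revoked.removeAll(assigned);
        return revoked;
    }

    /**
     * One "micro-batch" step: fail if anything was revoked, otherwise start
     * tracking every new partition at startOffset (an assumption; the real
     * code asks the consumer for its position instead).
     */
    static Map<String, Long> track(Set<String> assigned,
                                   Map<String, Long> currentOffsets,
                                   long startOffset) {
        Set<String> revoked = revokedPartitions(assigned, currentOffsets);
        if (!revoked.isEmpty()) {
            throw new IllegalStateException("Previously tracked partitions " + revoked + " were revoked");
        }
        Map<String, Long> updated = new HashMap<>(currentOffsets);
        for (String tp : newPartitions(assigned, currentOffsets)) {
            updated.put(tp, startOffset);
        }
        return updated;
    }
}
```

If the assignment grows from {events-0} to {events-0, events-1}, track() adds events-1 to the tracked offsets; if events-0 disappears from the assignment, it throws, mirroring the IllegalStateException in the Spark source.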
The latestOffsets() source also shows that Spark Streaming cannot handle a Kafka consumer rebalance (covered in an earlier post), so be sure to give each Streaming app its own group.id.
The Flink case
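According to the official documentation (see the figure above), Flink supports Topic/Partition Discovery, but it is disabled by default. To turn it on, set the flink.partition-discovery.interval-millis parameter, which is the interval, in milliseconds, at which new topics/partitions are discovered.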
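As a rough illustration of the configuration step, the parameter is just one more entry in the Properties object handed to the Kafka consumer. The sketch below uses only the JDK; DiscoveryConfig, the broker address, the group id and the 30-second interval are placeholder assumptions, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper class for illustration; only the property key comes from the text above.
class DiscoveryConfig {

    static final String DISCOVERY_INTERVAL_KEY = "flink.partition-discovery.interval-millis";

    /** Builds consumer properties with partition discovery switched on. */
    static Properties kafkaProperties(String bootstrapServers, String groupId, long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Interval is given in milliseconds; leaving the key unset keeps discovery disabled.
        props.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(discoveryIntervalMillis));
        return props;
    }
}
```

A call such as DiscoveryConfig.kafkaProperties("localhost:9092", "demo-group", 30000L) would then be passed to the FlinkKafkaConsumer constructor together with the topic (or a java.util.regex.Pattern of topics) and the deserialization schema.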
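The base class of the Flink Kafka source is the abstract class o.a.f.streaming.connectors.kafka.FlinkKafkaConsumerBase. Its run() method first creates the KafkaFetcher that fetches the data, then checks whether dynamic discovery is enabled.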
this.kafkaFetcher = createFetcher(
        sourceContext,
        subscribedPartitionsToStartOffsets,
        watermarkStrategy,
        (StreamingRuntimeContext) getRuntimeContext(),
        offsetCommitMode,
        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
        useMetrics);

if (!running) {
    return;
}

// depending on whether we were restored with the current state version (1.3),
// remaining logic branches off into 2 paths:
//  1) New state - partition discovery loop executed as separate thread, with this
//                 thread running the main fetcher loop
//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
    kafkaFetcher.runFetchLoop();
} else {
    runWithPartitionDiscovery();
}
If it is enabled, createAndStartDiscoveryLoop() is eventually called. It starts a separate thread that discovers new topics/partitions every discoveryIntervalMillis and hands them to the KafkaFetcher.
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
    discoveryLoopThread = new Thread(() -> {
        try {
            // --------------------- partition discovery loop ---------------------

            // throughout the loop, we always eagerly check if we are still running before
            // performing the next operation, so that we can escape the loop as soon as possible
            while (running) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Consumer subtask {} is trying to discover new partitions ...",
                            getRuntimeContext().getIndexOfThisSubtask());
                }

                final List<KafkaTopicPartition> discoveredPartitions;
                try {
                    discoveredPartitions = partitionDiscoverer.discoverPartitions();
                } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
                    // the partition discoverer may have been closed or woken up before or during the discovery;
                    // this would only happen if the consumer was canceled; simply escape the loop
                    break;
                }

                // no need to add the discovered partitions if we were closed during the meantime
                if (running && !discoveredPartitions.isEmpty()) {
                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                }

                // do not waste any time sleeping if we're not running anymore
                if (running && discoveryIntervalMillis != 0) {
                    try {
                        Thread.sleep(discoveryIntervalMillis);
                    } catch (InterruptedException iex) {
                        // may be interrupted if the consumer was canceled midway; simply escape the loop
                        break;
                    }
                }
            }
        } catch (Exception e) {
            discoveryLoopErrorRef.set(e);
        } finally {
            // calling cancel will also let the fetcher loop escape
            // (if not running, cancel() was already called)
            if (running) {
                cancel();
            }
        }
    }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

    discoveryLoopThread.start();
}
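Stripped of the Flink specifics, the loop is a cancellable "discover, hand over, sleep" cycle. Below is a minimal stand-alone sketch of that pattern using only the JDK. DiscoveryLoop and all of its members are invented for this post; the discoverer and fetcher are stand-ins (a Supplier and a Consumer) for partitionDiscoverer and kafkaFetcher, and the error handling is much simpler than in the real class.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical class for illustration, not part of Flink.
class DiscoveryLoop implements Runnable {
    private volatile boolean running = true;
    private volatile Thread thread;
    private final Supplier<List<String>> discoverer;   // stand-in for partitionDiscoverer
    private final Consumer<List<String>> fetcher;      // stand-in for kafkaFetcher
    private final long intervalMillis;
    private final AtomicReference<Exception> error = new AtomicReference<>();

    DiscoveryLoop(Supplier<List<String>> discoverer, Consumer<List<String>> fetcher, long intervalMillis) {
        this.discoverer = discoverer;
        this.fetcher = fetcher;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        try {
            // re-check the running flag before every step so that cancel() takes effect quickly
            while (running) {
                List<String> discovered = discoverer.get();
                if (running && !discovered.isEmpty()) {
                    fetcher.accept(discovered);
                }
                if (running && intervalMillis != 0) {
                    try {
                        Thread.sleep(intervalMillis);
                    } catch (InterruptedException e) {
                        break; // interrupted by cancel(): leave the loop
                    }
                }
            }
        } catch (Exception e) {
            error.set(e); // remember the failure for whoever started the loop
        } finally {
            running = false;
        }
    }

    /** Runs the loop on its own thread, as the Flink consumer does. */
    Thread start() {
        thread = new Thread(this, "partition-discovery-sketch");
        thread.start();
        return thread;
    }

    /** Stops the loop and wakes it up if it is sleeping. */
    void cancel() {
        running = false;
        Thread t = thread;
        if (t != null) {
            t.interrupt();
        }
    }

    boolean isRunning() { return running; }

    Exception error() { return error.get(); }
}
```

Calling start() runs the cycle on a background thread, and cancel() both clears the flag and interrupts the sleep, which is the same pair of escape routes the Flink code relies on.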
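As you can see, Flink implements dynamic discovery through a component called PartitionDiscoverer. createAndStartDiscoveryLoop() calls its discoverPartitions() method, whose source is as follows.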
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    if (!closed && !wakeup) {
        try {
            List<KafkaTopicPartition> newDiscoveredPartitions;

            // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
            if (topicsDescriptor.isFixedTopics()) {
                newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
            } else {
                List<String> matchedTopics = getAllTopics();

                // retain topics that match the pattern
                Iterator<String> iter = matchedTopics.iterator();
                while (iter.hasNext()) {
                    if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                        iter.remove();
                    }
                }

                if (matchedTopics.size() != 0) {
                    // get partitions only for matched topics
                    newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                } else {
                    newDiscoveredPartitions = null;
                }
            }

            // (2) eliminate partition that are old partitions or should not be subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
            } else {
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }

            return newDiscoveredPartitions;
        } catch (WakeupException e) {
            // the actual topic / partition metadata fetching methods
            // may be woken up midway; reset the wakeup flag and rethrow
            wakeup = false;
            throw e;
        }
    } else if (!closed && wakeup) {
        // may have been woken up before the method call
        wakeup = false;
        throw new WakeupException();
    } else {
        throw new ClosedException();
    }
}
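First, the method branches on whether it was given a fixed list of topics or multiple topics specified by a regular expression; either way it ends up calling getAllPartitionsForTopics() to fetch all partitions of those topics (this method is implemented by the subclasses of the abstract class AbstractPartitionDiscoverer and is very simple). It then iterates over the partitions and calls setAndCheckDiscoveredPartition() on each one to check whether it is an old partition or one that this subtask should not subscribe to; if so, it is removed, which guarantees that the method returns only newly added partitions.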
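The filtering step can be pictured as "remember what has been seen, return only the rest". Here is a minimal sketch of that idea in plain Java. NewPartitionFilter is invented for illustration, partitions are plain strings, and the hash-modulo ownership rule is an assumption made for this sketch, not Flink's actual assignment logic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical class for illustration, not part of Flink.
class NewPartitionFilter {
    private final Set<String> discovered = new HashSet<>();
    private final int subtaskIndex;
    private final int parallelism;

    NewPartitionFilter(int subtaskIndex, int parallelism) {
        this.subtaskIndex = subtaskIndex;
        this.parallelism = parallelism;
    }

    /** True only the first time a partition is seen, and only if this subtask owns it. */
    boolean setAndCheck(String partition) {
        if (!discovered.add(partition)) {
            return false; // Set.add returns false when the partition was already recorded
        }
        // Assumed ownership rule for the sketch: hash of the name modulo the parallelism.
        return Math.floorMod(partition.hashCode(), parallelism) == subtaskIndex;
    }

    /** Returns the partitions from allPartitions that are new to this subtask. */
    List<String> discover(List<String> allPartitions) {
        List<String> fresh = new ArrayList<>(allPartitions);
        fresh.removeIf(p -> !setAndCheck(p));
        return fresh;
    }
}
```

With a single subtask (index 0, parallelism 1), a first call with t-0 and t-1 returns both, and a later call with t-0, t-1 and t-2 returns only t-2.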
The End
Up early for work tomorrow. Good night, everyone.