前言
在之前的一篇文章中,笔者介绍了Kafka Consumer Group(消费者组)以及Rebalance(重平衡)的概念:
为了使得Consumer易于组织、可扩展以及更好地容错,Kafka将一个或多个Consumer组织为Consumer Group,即消费者组。Consumer Group的唯一标识就是
group.id
。Group内的所有Consumer共同消费已订阅的各个Topic的所有Partition,并且保证每个Partition只分配给该Group内的唯一一个Consumer……
Rebalance就是一个Consumer Group内的所有Consumer分配消费已订阅的Topic的各Partition的过程。
问题来了:Partition是按照什么规则分配给Group中的各个Consumer的呢?本文就来简单讲解一下Kafka的Partition分配策略,可以通过partition.asssignment.strategy
参数进行配置。目前一共有3种:
- org.apache.kafka.clients.consumer.RangeAssignor
- org.apache.kafka.clients.consumer.RoundRobinAssignor
- org.apache.kafka.clients.consumer.StickyAssignor
当然也支持自定义分配策略。下面逐个进行讲解。
RangeAssignor
RangeAssignor是默认的分配策略。
对于每个被订阅的Topic,设Consumer总数为c,Partition总数为p,那么RangeAssignor会根据p / c
的结果得出一个区间值r,以及余数值p % c
,记为m。将所有Consumer按照预设好的Member ID字典序排序,从第一个Consumer开始顺序分配,前m个Consumer分配连续的(r + 1)个Partition,后(c - m)个Consumer分配连续的r个Partition。也就是说,如果不能均分,那么排在前面的Consumer会被多分配1个Partition。
可见,这种策略是按照范围来尽量平均分配Partition的,所以得名RangeAssignor。下面举一个例子,如图所示。
图中有1个Topic,8个Partition,3个Consumer,最终分配的比例为3:3:2,大致是均匀的。
如果我们再多加两个Topic,每个Topic有2个Partition,分配结果又是如何呢?答案如图所示。
显然,c2完全没有分配到t2中的Partition,因为c0和c1按照字典序排在它前面,已经被优先分配到了。如果推广上述情况,很容易发现该策略无法保证平均分配,造成负载倾斜。当订阅了多个Topic时,尤其需要注意RangeAssignor的适用性。
RoundRobinAssignor
"round-robin"一词的含义是轮询。RoundRobinAssignor策略仍然会将所有Consumer按照预设好的Member ID字典序排序,同时也会将所有Topic中的所有Partition按照字典序排序(注意这点不同),再轮询进行分配。单纯文字描述可能不容易理解,下面还是用单个Topic的情况举例。
多个Topic的情况也类似,按列观察图下方的分配列表就可以看出轮询的风格。
由于分配时是按所有Partition来的,所以即使Topic之间Partition的数量是不平均的,分配结果也是基本平均的,克服了RangeAssignor的缺点。
但是,RoundRobinAssignor也并非十全十美的。由于一个Group内的Consumer可以订阅不同的Topic,所以仍然可能会产生倾斜。考虑一种情况:Topic t0、t1、t2分别有1、2、3个Partition,而Consumer c0订阅了t0,c1订阅了t0~t1,c2订阅了t0~t2,那么分配结果会如下图所示,显然不是最优的。
StickyAssignor
StickyAssignor是在Kafka 0.11版本引入的,它的实现比RangeAssignor和RoundRobinAssignor都要复杂得多(代码算上注释有将近1k行)。按照文档的描述,它的设计目的有二:
- First, it guarantees an assignment that is as balanced as possible——Partition的分配尽量平均,这也是前两种分配策略努力要达到的目标;
- Second, it preserved as many existing assignment as possible when a reassignment occurs——当Partition重新分配时,能够尽量保留上一次的分配,即尽量少将已经分配了的Partition分配给其他的Consumer,这也就是"sticky"(粘性)一词的含义。
如果两者发生冲突,则优先保证平均分配。StickyAssignor的流程也比较晦涩难懂,主流程位于其assign()方法中,逻辑简述如下,看官将就读读,也可以自行参考源码。
- 获取当前的分配方案currentAssignment。如果为空,说明是初次分配。
- 获取每个Partition可能分配给的Consumer的映射表,记为partition2AllPotentialConsumers;获取每个Consumer可能分配到的Partition列表,记为consumer2AllPotentialPartitions。
- 遍历各Consumer,如果Consumer还没有任何分配,将其加入currentAssignment集合,但对应的Partition列表仍为空。
- 创建currentPartitionConsumer映射表,该映射表用来记录当前哪个Partition已经分配给了哪个Consumer。
- 调用sortPartitions()方法对所有Partition进行排序,返回结果为sortedPartitions,分为两种情况:
- 如果不是初次分配,并且每个Consumer订阅的Topic是相同的,那么就将Consumer按照已分配的Partition数量从高到低排序,将这些Partition按轮询的方式插入sortedPartitions,再将未被分配的Partition插入sortedPartitions;
- 否则,就将Partition按照可能分配给的Consumer数量(即partition2AllPotentialConsumers)从低到高排序,将这些Partition插入sortedPartitions。
- 遍历currentAssignment,将已经分配了的Partition从sortedPartitions中移除,剩下的就是需要分配的Partition,记为unassignedPartitions。
- 调用balance()方法对currentAssignment进行平衡(当然也包含初次分配),几个要点如下:
- 遍历unassignedPartitions,将未分配的Partition依次分配给订阅了对应Topic且拥有Partition最少的Consumer;
- 通过partition2AllPotentialConsumers检查一个Partition是否可以被多于一个Consumer消费。如是,说明此Partition可以被重分配;
- 检查Consumer已分配的Partition数是否超过了允许分配的最大数量(即consumer2AllPotentialPartitions的size),并且它分配的Partition是否可以被多于一个Consumer消费。如果以上两个条件都为否,说明该Consumer可以参与重分配,否则将该Consumer排除出去;
- 根据平衡分数(balance score)进行实际的重分配动作。平衡分数定义为每两个Consumer之间Partition数差值的绝对值,每分配一个Partition都会更新此分数。当达到绝对均衡或者所有Partition都已经被分配时,流程结束。
按照上述流程来解决上一节RoundRobinAssignor倾斜的问题,分配结果如下图所示。
这种分配方式虽然不均衡,但已经是最优的。现假设Consumer c0下线,分配结果就会变成:
这样就保留了5个Partition的原始分配,只需要将t0/p0重新分配给c1即可,并且是均匀的。如果按照RoundRobinAssignor的逻辑做重分配,则是如下图所示。
这样只会保留3个Partition的原始分配,并且分配是不均匀的,可见StickyAssignor确实是更优的策略。
自定义分配策略
自定义分配策略可以通过继承PartitionAssignor接口或者AbstractPartitionAssignor抽象类来实现,后者相对比较简单。下面给出一个在Consumer之间随机分配Partition的示例,部分代码复用了RangeAssignor的实现,简单易懂。
public class RandomAssignor extends AbstractPartitionAssignor {@Overridepublic String name() {return "random";}@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet()) {assignment.put(memberId, new ArrayList<TopicPartition>());}for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {String topic = topicEntry.getKey();List<String> consumersForTopic = topicEntry.getValue();Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null)continue;for (TopicPartition partition : partitions(topic, numPartitionsForTopic)) {int rand = ThreadLocalRandom.current().nextInt(consumersForTopic.size());assignment.get(consumersForTopic.get(rand)).add(partition);}}return assignment;}private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {Map<String, List<String>> res = new HashMap<>();for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {String consumerId = subscriptionEntry.getKey();for (String topic : subscriptionEntry.getValue().topics())put(res, topic, consumerId);}return res;}
}
注意RandomAssignor基本上不能用于生产环境,只是个示例而已。
The End
明天早起搬砖,民那晚安晚安。