詳解kafka中的消息分區(qū)分配算法
背景
kafka有分區(qū)機制,一個主題topic在創(chuàng)建的時候,會設置分區(qū)。如果只有一個分區(qū),那所有的消費者都訂閱的是這一個分區(qū)消息;如果有多個分區(qū)的話,那消費者之間又是如何分配的呢?
分配算法
RangeAssignor
定義
Kafka默認采?RangeAssignor的分配算法。
RangeAssignor策略的原理是按照消費者總數(shù)和分區(qū)總數(shù)進?整除運算來獲得?個跨度,然 后將分區(qū)按照跨度進?平均分配,以保證分區(qū)盡可能均勻地分配給所有的消費者。對于每?個 Topic,RangeAssignor策略會將消費組內(nèi)所有訂閱這個Topic的消費者按照名稱的字典序排序,然 后為每個消費者劃分固定的分區(qū)范圍,如果不夠平均分配,那么字典序靠前的消費者會被多分配 ?個分區(qū)。
這種分配?式明顯的?個問題是隨著消費者訂閱的Topic的數(shù)量的增加,不均衡的問題會越來 越嚴重,?如上圖中4個分區(qū)3個消費者的場景,C0會多分配?個分區(qū)。如果此時再訂閱?個分區(qū) 數(shù)為4的Topic,那么C0?會?C1、C2多分配?個分區(qū),這樣C0總共就?C1、C2多分配兩個分區(qū) 了,?且隨著Topic的增加,這個情況會越來越嚴重。
源碼分析
public class RangeAssignor extends AbstractPartitionAssignor {
....
@Override
public Map> assign(Map partitionsPerTopic, Map subscriptions) {
// 1. 獲取每個topic被多少個consumer訂閱了
Map<String,List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
// 2. 存儲最終的分配?案
Map<String,List<String>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList());
for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List consumersForTopic = topicEntry.getValue();
// 3. 每個topic的partition數(shù)量
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
Collections.sort(consumersForTopic);
// 4. 表示平均每個consumer會分配到多少個partition
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 5. 平均分配后還剩下多少個partition未被分配
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
// 6. 這?是關鍵點,分配原則是將未能被平均分配的partition分配到前 consumersWithExtraPartition個consumer
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
}場景
可以完全平均分配

無法完全平均分配,排序靠前分的更多

消費者數(shù)量大于分區(qū)數(shù)量,排名靠前先分得,排名靠后未分得分區(qū)

RoundRobinAssignor
定義
RoundRobinAssignor的分配策略是將消費組內(nèi)訂閱的所有Topic的分區(qū)及所有消費者進?排序后盡 量均衡的分配(RangeAssignor是針對單個Topic的分區(qū)進?排序分配的)。如果消費組內(nèi),消費者訂閱 的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那么分配結果是盡量均衡的(消費者之間 分配到的分區(qū)數(shù)的差值不會超過1)。
源碼分析
package org.apache.kafka.clients.consumer;
public class RoundRobinAssignor extends AbstractPartitionAssignor {
@Override
public Map> assign(Map partitionsPerTopic, Map subscriptions) {
<Map> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); // 1. 環(huán)狀鏈表,存儲所有的consumer,?次迭代完之后?會回到原點
CircularIterator assigner = new CircularIterator<> (Utils.sorted(subscriptions.keySet())); // 2. 獲取所有訂閱的topic的partition總數(shù) for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
final String topic = partition.topic();
while (!subscriptions.get(assigner.peek()).topics().contains(topic))
assigner.next();
assignment.get(assigner.next()).add(partition);
}
return assignment;
}
.... }場景
無法完全平均分配,排序靠前分的更多

StickyAssignor
定義
盡管RoundRobinAssignor已經(jīng)在RangeAssignor上做了?些優(yōu)化來更均衡的分配分區(qū),但是在?些情況下依舊會產(chǎn)?嚴重的分配偏差,從字?意義上看,Sticky是“粘性的”,可以理解為分配結果是帶“粘性的”——每?次分配變更相對 上?次分配做最少的變動(上?次的結果是有粘性的) 其?標有兩點:
- 分區(qū)的分配盡量的均衡
- 每?次重分配的結果盡量與上?次分配結果保持?致
場景

到此這篇關于詳解kafka中的消息分區(qū)分配算法的文章就介紹到這了,更多相關kafka消息分區(qū)分配算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Springboot jar文件如何打包zip在linux環(huán)境運行
這篇文章主要介紹了Springboot jar文件如何打包zip在linux環(huán)境運行,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-02-02
Java?中很好用的數(shù)據(jù)結構(你絕對沒用過)
今天跟大家介紹的就是?java.util.EnumMap,也是?java.util?包下面的一個集合類,同樣的也有對應的的?java.util.EnumSet,對java數(shù)據(jù)結構相關知識感興趣的朋友一起看看吧2022-05-05
Mybatis-plus的selectPage()分頁查詢不生效問題解決
本文主要介紹了Mybatis-plus的selectPage()分頁查詢不生效問題解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-01-01

