詳解kafka中的消息分區(qū)分配算法
背景
kafka有分區(qū)機制,一個主題topic在創(chuàng)建的時候,會設置分區(qū)。如果只有一個分區(qū),那所有的消費者都訂閱的是這一個分區(qū)消息;如果有多個分區(qū)的話,那消費者之間又是如何分配的呢?
分配算法
RangeAssignor
定義
Kafka默認采?RangeAssignor的分配算法。
RangeAssignor策略的原理是按照消費者總數和分區(qū)總數進?整除運算來獲得?個跨度,然 后將分區(qū)按照跨度進?平均分配,以保證分區(qū)盡可能均勻地分配給所有的消費者。對于每?個 Topic,RangeAssignor策略會將消費組內所有訂閱這個Topic的消費者按照名稱的字典序排序,然 后為每個消費者劃分固定的分區(qū)范圍,如果不夠平均分配,那么字典序靠前的消費者會被多分配 ?個分區(qū)。
這種分配?式明顯的?個問題是隨著消費者訂閱的Topic的數量的增加,不均衡的問題會越來 越嚴重,?如上圖中4個分區(qū)3個消費者的場景,C0會多分配?個分區(qū)。如果此時再訂閱?個分區(qū) 數為4的Topic,那么C0?會?C1、C2多分配?個分區(qū),這樣C0總共就?C1、C2多分配兩個分區(qū) 了,?且隨著Topic的增加,這個情況會越來越嚴重。
源碼分析
public class RangeAssignor extends AbstractPartitionAssignor { .... @Override public Map> assign(Map partitionsPerTopic, Map subscriptions) { // 1. 獲取每個topic被多少個consumer訂閱了 Map<String,List<String>> consumersPerTopic = consumersPerTopic(subscriptions); // 2. 存儲最終的分配?案 Map<String,List<String>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); List consumersForTopic = topicEntry.getValue(); // 3. 每個topic的partition數量 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; Collections.sort(consumersForTopic); // 4. 表示平均每個consumer會分配到多少個partition int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); // 5. 平均分配后還剩下多少個partition未被分配 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); // 6. 這?是關鍵點,分配原則是將未能被平均分配的partition分配到前 consumersWithExtraPartition個consumer for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; } }
場景
可以完全平均分配
無法完全平均分配,排序靠前分的更多
消費者數量大于分區(qū)數量,排名靠前先分得,排名靠后未分得分區(qū)
RoundRobinAssignor
定義
RoundRobinAssignor的分配策略是將消費組內訂閱的所有Topic的分區(qū)及所有消費者進?排序后盡 量均衡的分配(RangeAssignor是針對單個Topic的分區(qū)進?排序分配的)。如果消費組內,消費者訂閱 的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那么分配結果是盡量均衡的(消費者之間 分配到的分區(qū)數的差值不會超過1)。
源碼分析
package org.apache.kafka.clients.consumer; public class RoundRobinAssignor extends AbstractPartitionAssignor { @Override public Map> assign(Map partitionsPerTopic, Map subscriptions) { <Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); // 1. 環(huán)狀鏈表,存儲所有的consumer,?次迭代完之后?會回到原點 CircularIterator assigner = new CircularIterator<> (Utils.sorted(subscriptions.keySet())); // 2. 獲取所有訂閱的topic的partition總數 for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) { final String topic = partition.topic(); while (!subscriptions.get(assigner.peek()).topics().contains(topic)) assigner.next(); assignment.get(assigner.next()).add(partition); } return assignment; } .... }
場景
無法完全平均分配,排序靠前分的更多
StickyAssignor
定義
盡管RoundRobinAssignor已經在RangeAssignor上做了?些優(yōu)化來更均衡的分配分區(qū),但是在?些情況下依舊會產?嚴重的分配偏差,從字?意義上看,Sticky是“粘性的”,可以理解為分配結果是帶“粘性的”——每?次分配變更相對 上?次分配做最少的變動(上?次的結果是有粘性的) 其?標有兩點:
- 分區(qū)的分配盡量的均衡
- 每?次重分配的結果盡量與上?次分配結果保持?致
場景
到此這篇關于詳解kafka中的消息分區(qū)分配算法的文章就介紹到這了,更多相關kafka消息分區(qū)分配算法內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Springboot jar文件如何打包zip在linux環(huán)境運行
這篇文章主要介紹了Springboot jar文件如何打包zip在linux環(huán)境運行,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-02-02Mybatis-plus的selectPage()分頁查詢不生效問題解決
本文主要介紹了Mybatis-plus的selectPage()分頁查詢不生效問題解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-01-01