詳解kafka中的消息分區(qū)分配算法
背景
kafka有分區(qū)機(jī)制,一個(gè)主題topic在創(chuàng)建的時(shí)候,會設(shè)置分區(qū)。如果只有一個(gè)分區(qū),那所有的消費(fèi)者都訂閱的是這一個(gè)分區(qū)消息;如果有多個(gè)分區(qū)的話,那消費(fèi)者之間又是如何分配的呢?
分配算法
RangeAssignor
定義
Kafka默認(rèn)采?RangeAssignor的分配算法。
RangeAssignor策略的原理是按照消費(fèi)者總數(shù)和分區(qū)總數(shù)進(jìn)?整除運(yùn)算來獲得?個(gè)跨度,然 后將分區(qū)按照跨度進(jìn)?平均分配,以保證分區(qū)盡可能均勻地分配給所有的消費(fèi)者。對于每?個(gè) Topic,RangeAssignor策略會將消費(fèi)組內(nèi)所有訂閱這個(gè)Topic的消費(fèi)者按照名稱的字典序排序,然 后為每個(gè)消費(fèi)者劃分固定的分區(qū)范圍,如果不夠平均分配,那么字典序靠前的消費(fèi)者會被多分配 ?個(gè)分區(qū)。
這種分配?式明顯的?個(gè)問題是隨著消費(fèi)者訂閱的Topic的數(shù)量的增加,不均衡的問題會越來 越嚴(yán)重,?如上圖中4個(gè)分區(qū)3個(gè)消費(fèi)者的場景,C0會多分配?個(gè)分區(qū)。如果此時(shí)再訂閱?個(gè)分區(qū) 數(shù)為4的Topic,那么C0?會?C1、C2多分配?個(gè)分區(qū),這樣C0總共就?C1、C2多分配兩個(gè)分區(qū) 了,?且隨著Topic的增加,這個(gè)情況會越來越嚴(yán)重。
源碼分析
public class RangeAssignor extends AbstractPartitionAssignor { .... @Override public Map> assign(Map partitionsPerTopic, Map subscriptions) { // 1. 獲取每個(gè)topic被多少個(gè)consumer訂閱了 Map<String,List<String>> consumersPerTopic = consumersPerTopic(subscriptions); // 2. 存儲最終的分配?案 Map<String,List<String>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); List consumersForTopic = topicEntry.getValue(); // 3. 每個(gè)topic的partition數(shù)量 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; Collections.sort(consumersForTopic); // 4. 表示平均每個(gè)consumer會分配到多少個(gè)partition int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); // 5. 平均分配后還剩下多少個(gè)partition未被分配 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); // 6. 這?是關(guān)鍵點(diǎn),分配原則是將未能被平均分配的partition分配到前 consumersWithExtraPartition個(gè)consumer for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; } }
場景
可以完全平均分配
無法完全平均分配,排序靠前分的更多
消費(fèi)者數(shù)量大于分區(qū)數(shù)量,排名靠前先分得,排名靠后未分得分區(qū)
RoundRobinAssignor
定義
RoundRobinAssignor的分配策略是將消費(fèi)組內(nèi)訂閱的所有Topic的分區(qū)及所有消費(fèi)者進(jìn)?排序后盡 量均衡的分配(RangeAssignor是針對單個(gè)Topic的分區(qū)進(jìn)?排序分配的)。如果消費(fèi)組內(nèi),消費(fèi)者訂閱 的Topic列表是相同的(每個(gè)消費(fèi)者都訂閱了相同的Topic),那么分配結(jié)果是盡量均衡的(消費(fèi)者之間 分配到的分區(qū)數(shù)的差值不會超過1)。
源碼分析
package org.apache.kafka.clients.consumer; public class RoundRobinAssignor extends AbstractPartitionAssignor { @Override public Map> assign(Map partitionsPerTopic, Map subscriptions) { <Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); // 1. 環(huán)狀鏈表,存儲所有的consumer,?次迭代完之后?會回到原點(diǎn) CircularIterator assigner = new CircularIterator<> (Utils.sorted(subscriptions.keySet())); // 2. 獲取所有訂閱的topic的partition總數(shù) for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) { final String topic = partition.topic(); while (!subscriptions.get(assigner.peek()).topics().contains(topic)) assigner.next(); assignment.get(assigner.next()).add(partition); } return assignment; } .... }
場景
無法完全平均分配,排序靠前分的更多
StickyAssignor
定義
盡管RoundRobinAssignor已經(jīng)在RangeAssignor上做了?些優(yōu)化來更均衡的分配分區(qū),但是在?些情況下依舊會產(chǎn)?嚴(yán)重的分配偏差,從字?意義上看,Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每?次分配變更相對 上?次分配做最少的變動(dòng)(上?次的結(jié)果是有粘性的) 其?標(biāo)有兩點(diǎn):
- 分區(qū)的分配盡量的均衡
- 每?次重分配的結(jié)果盡量與上?次分配結(jié)果保持?致
場景
到此這篇關(guān)于詳解kafka中的消息分區(qū)分配算法的文章就介紹到這了,更多相關(guān)kafka消息分區(qū)分配算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java?LeetCode刷題稍有難度的貪心構(gòu)造算法
這篇文章主要為大家介紹了java?LeetCode刷題稍有難度的貪心構(gòu)造題解示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02Springboot jar文件如何打包zip在linux環(huán)境運(yùn)行
這篇文章主要介紹了Springboot jar文件如何打包zip在linux環(huán)境運(yùn)行,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02Java?中很好用的數(shù)據(jù)結(jié)構(gòu)(你絕對沒用過)
今天跟大家介紹的就是?java.util.EnumMap,也是?java.util?包下面的一個(gè)集合類,同樣的也有對應(yīng)的的?java.util.EnumSet,對java數(shù)據(jù)結(jié)構(gòu)相關(guān)知識感興趣的朋友一起看看吧2022-05-05Mybatis-plus的selectPage()分頁查詢不生效問題解決
本文主要介紹了Mybatis-plus的selectPage()分頁查詢不生效問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01springboot 緩存@EnableCaching實(shí)例
這篇文章主要介紹了springboot 緩存@EnableCaching實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11關(guān)于如何正確地定義Java內(nèi)部類方法詳解
在Java中,我們通常是把不同的類創(chuàng)建在不同的包里面,對于同一個(gè)包里的類來說,它們都是同一層次的,但其實(shí)還有另一種情況,有些類可以被定義在另一個(gè)類的內(nèi)部,本文將詳細(xì)帶你了解如何正確地定義Java內(nèi)部類,需要的朋友可以參考下2023-05-05springboot項(xiàng)目如何設(shè)置時(shí)區(qū)
這篇文章主要介紹了springboot項(xiàng)目如何設(shè)置時(shí)區(qū)問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07