欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解kafka中的消息分區(qū)分配算法

 更新時(shí)間:2022年04月15日 14:08:23   作者:溫故知新之java  
kafka有分區(qū)機(jī)制,一個(gè)主題topic在創(chuàng)建的時(shí)候,會設(shè)置分區(qū)。如果只有一個(gè)分區(qū),那所有的消費(fèi)者都訂閱的是這一個(gè)分區(qū)消息;如果有多個(gè)分區(qū)的話,那消費(fèi)者之間又是如何分配的呢?本文就來為大家詳細(xì)講解一下

背景

kafka有分區(qū)機(jī)制,一個(gè)主題topic在創(chuàng)建的時(shí)候,會設(shè)置分區(qū)。如果只有一個(gè)分區(qū),那所有的消費(fèi)者都訂閱的是這一個(gè)分區(qū)消息;如果有多個(gè)分區(qū)的話,那消費(fèi)者之間又是如何分配的呢?

分配算法

RangeAssignor

定義

Kafka默認(rèn)采?RangeAssignor的分配算法。

RangeAssignor策略的原理是按照消費(fèi)者總數(shù)和分區(qū)總數(shù)進(jìn)?整除運(yùn)算來獲得?個(gè)跨度,然 后將分區(qū)按照跨度進(jìn)?平均分配,以保證分區(qū)盡可能均勻地分配給所有的消費(fèi)者。對于每?個(gè) Topic,RangeAssignor策略會將消費(fèi)組內(nèi)所有訂閱這個(gè)Topic的消費(fèi)者按照名稱的字典序排序,然 后為每個(gè)消費(fèi)者劃分固定的分區(qū)范圍,如果不夠平均分配,那么字典序靠前的消費(fèi)者會被多分配 ?個(gè)分區(qū)。

這種分配?式明顯的?個(gè)問題是隨著消費(fèi)者訂閱的Topic的數(shù)量的增加,不均衡的問題會越來 越嚴(yán)重,?如上圖中4個(gè)分區(qū)3個(gè)消費(fèi)者的場景,C0會多分配?個(gè)分區(qū)。如果此時(shí)再訂閱?個(gè)分區(qū) 數(shù)為4的Topic,那么C0?會?C1、C2多分配?個(gè)分區(qū),這樣C0總共就?C1、C2多分配兩個(gè)分區(qū) 了,?且隨著Topic的增加,這個(gè)情況會越來越嚴(yán)重。

源碼分析

public class RangeAssignor extends AbstractPartitionAssignor {
    ....
    @Override 
    public Map> assign(Map partitionsPerTopic, Map subscriptions) { 
        // 1. 獲取每個(gè)topic被多少個(gè)consumer訂閱了 
        Map<String,List<String>> consumersPerTopic = consumersPerTopic(subscriptions); 
        // 2. 存儲最終的分配?案 
        Map<String,List<String>> assignment = new HashMap<>(); 
        for (String memberId : subscriptions.keySet()) 
            assignment.put(memberId, new ArrayList()); 
        for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { 
            String topic = topicEntry.getKey(); 
            List consumersForTopic = topicEntry.getValue(); 
            // 3. 每個(gè)topic的partition數(shù)量 
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic); 
            if (numPartitionsForTopic == null) 
            continue; 
            Collections.sort(consumersForTopic); 
            // 4. 表示平均每個(gè)consumer會分配到多少個(gè)partition 
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); 
            // 5. 平均分配后還剩下多少個(gè)partition未被分配 
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); 
            List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); 
            // 6. 這?是關(guān)鍵點(diǎn),分配原則是將未能被平均分配的partition分配到前 consumersWithExtraPartition個(gè)consumer
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) { 
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); 
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); 
                } 
            } 
            return assignment; 
    }
}

場景

可以完全平均分配

無法完全平均分配,排序靠前分的更多

消費(fèi)者數(shù)量大于分區(qū)數(shù)量,排名靠前先分得,排名靠后未分得分區(qū)

RoundRobinAssignor

定義

RoundRobinAssignor的分配策略是將消費(fèi)組內(nèi)訂閱的所有Topic的分區(qū)及所有消費(fèi)者進(jìn)?排序后盡 量均衡的分配(RangeAssignor是針對單個(gè)Topic的分區(qū)進(jìn)?排序分配的)。如果消費(fèi)組內(nèi),消費(fèi)者訂閱 的Topic列表是相同的(每個(gè)消費(fèi)者都訂閱了相同的Topic),那么分配結(jié)果是盡量均衡的(消費(fèi)者之間 分配到的分區(qū)數(shù)的差值不會超過1)。

源碼分析

package org.apache.kafka.clients.consumer; 
public class RoundRobinAssignor extends AbstractPartitionAssignor { 
@Override 
public Map> assign(Map partitionsPerTopic, Map subscriptions) { 
        <Map> assignment = new HashMap<>(); 
        for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); // 1. 環(huán)狀鏈表,存儲所有的consumer,?次迭代完之后?會回到原點(diǎn) 
        CircularIterator assigner = new CircularIterator<> (Utils.sorted(subscriptions.keySet())); // 2. 獲取所有訂閱的topic的partition總數(shù) for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) { 
        final String topic = partition.topic(); 
        while (!subscriptions.get(assigner.peek()).topics().contains(topic)) 
            assigner.next(); 
            assignment.get(assigner.next()).add(partition); 
        }
        return assignment; 
    } 
.... }

場景

無法完全平均分配,排序靠前分的更多

StickyAssignor

定義

盡管RoundRobinAssignor已經(jīng)在RangeAssignor上做了?些優(yōu)化來更均衡的分配分區(qū),但是在?些情況下依舊會產(chǎn)?嚴(yán)重的分配偏差,從字?意義上看,Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每?次分配變更相對 上?次分配做最少的變動(dòng)(上?次的結(jié)果是有粘性的) 其?標(biāo)有兩點(diǎn):

  • 分區(qū)的分配盡量的均衡
  • 每?次重分配的結(jié)果盡量與上?次分配結(jié)果保持?致

場景

到此這篇關(guān)于詳解kafka中的消息分區(qū)分配算法的文章就介紹到這了,更多相關(guān)kafka消息分區(qū)分配算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java?LeetCode刷題稍有難度的貪心構(gòu)造算法

    java?LeetCode刷題稍有難度的貪心構(gòu)造算法

    這篇文章主要為大家介紹了java?LeetCode刷題稍有難度的貪心構(gòu)造題解示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-02-02
  • Springboot jar文件如何打包zip在linux環(huán)境運(yùn)行

    Springboot jar文件如何打包zip在linux環(huán)境運(yùn)行

    這篇文章主要介紹了Springboot jar文件如何打包zip在linux環(huán)境運(yùn)行,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • Java?中很好用的數(shù)據(jù)結(jié)構(gòu)(你絕對沒用過)

    Java?中很好用的數(shù)據(jù)結(jié)構(gòu)(你絕對沒用過)

    今天跟大家介紹的就是?java.util.EnumMap,也是?java.util?包下面的一個(gè)集合類,同樣的也有對應(yīng)的的?java.util.EnumSet,對java數(shù)據(jù)結(jié)構(gòu)相關(guān)知識感興趣的朋友一起看看吧
    2022-05-05
  • java中關(guān)于深拷貝的幾種方式總結(jié)

    java中關(guān)于深拷貝的幾種方式總結(jié)

    這篇文章主要介紹了java中關(guān)于深拷貝的幾種方式總結(jié),具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-08-08
  • Java導(dǎo)出txt文件的方法

    Java導(dǎo)出txt文件的方法

    這篇文章主要介紹了Java導(dǎo)出txt文件的方法,實(shí)例分析了兩種java導(dǎo)出txt文本文件的使用技巧,需要的朋友可以參考下
    2015-05-05
  • 兩行Javascript代碼生成UUID的方法

    兩行Javascript代碼生成UUID的方法

    這篇文章主要介紹了兩行Javascript代碼生成UUID的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • Mybatis-plus的selectPage()分頁查詢不生效問題解決

    Mybatis-plus的selectPage()分頁查詢不生效問題解決

    本文主要介紹了Mybatis-plus的selectPage()分頁查詢不生效問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-01-01
  • springboot 緩存@EnableCaching實(shí)例

    springboot 緩存@EnableCaching實(shí)例

    這篇文章主要介紹了springboot 緩存@EnableCaching實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • 關(guān)于如何正確地定義Java內(nèi)部類方法詳解

    關(guān)于如何正確地定義Java內(nèi)部類方法詳解

    在Java中,我們通常是把不同的類創(chuàng)建在不同的包里面,對于同一個(gè)包里的類來說,它們都是同一層次的,但其實(shí)還有另一種情況,有些類可以被定義在另一個(gè)類的內(nèi)部,本文將詳細(xì)帶你了解如何正確地定義Java內(nèi)部類,需要的朋友可以參考下
    2023-05-05
  • springboot項(xiàng)目如何設(shè)置時(shí)區(qū)

    springboot項(xiàng)目如何設(shè)置時(shí)區(qū)

    這篇文章主要介紹了springboot項(xiàng)目如何設(shè)置時(shí)區(qū)問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-07-07

最新評論