欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka?Producer中的消息緩存模型圖解詳解

 更新時(shí)間:2022年04月15日 09:51:17   作者:石臻臻的雜貨鋪  
Kafka中消息是以Topic進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,都是面向Topic的,下面這篇文章主要給大家介紹了關(guān)于Kafka?Producer中消息緩存模型的相關(guān)資料,需要的朋友可以參考下

前言

在閱讀本文之前, 希望你可以思考一下下面幾個(gè)問(wèn)題, 帶著問(wèn)題去閱讀文章會(huì)獲得更好的效果。

  1. 發(fā)送消息的時(shí)候, 當(dāng)Broker掛掉了,消息體還能寫(xiě)入到消息緩存中嗎?
  2. 當(dāng)消息還存儲(chǔ)在緩存中的時(shí)候, 假如Producer客戶端掛掉了,消息是不是就丟失了?
  3. 當(dāng)最新的ProducerBatch還有空余的內(nèi)存,但是接下來(lái)的一條消息很大,不足以加上上一個(gè)Batch中,會(huì)怎么辦呢?
  4. 那么創(chuàng)建ProducerBatch的時(shí)候,應(yīng)該分配多少的內(nèi)存呢?

什么是消息累加器RecordAccumulator

kafka為了提高Producer客戶端的發(fā)送吞吐量和提高性能,選擇了將消息暫時(shí)緩存起來(lái),等到滿足一定的條件, 再進(jìn)行批量發(fā)送, 這樣可以減少網(wǎng)絡(luò)請(qǐng)求,提高吞吐量。

而緩存這個(gè)消息的就是RecordAccumulator類.

整體模型圖

上圖就是整個(gè)消息存放的緩存模型,我們接下來(lái)一個(gè)個(gè)來(lái)講解。

消息緩存模型

上圖表示的就是 消息緩存的模型, 生產(chǎn)的消息就是暫時(shí)存放在這個(gè)里面。

  1. 每條消息,我們按照TopicPartition維度,把他們放在不同的Deque<ProducerBatch> 隊(duì)列里面。
    TopicPartition相同,會(huì)在相同Deque<ProducerBatch> 的里面。
  2. ProducerBatch : 表示同一個(gè)批次的消息, 消息真正發(fā)送到Broker端的時(shí)候都是按照批次來(lái)發(fā)送的,
    這個(gè)批次可能包含一條或者多條消息。
  3. 如果沒(méi)有找到消息對(duì)應(yīng)的ProducerBatch隊(duì)列, 則創(chuàng)建一個(gè)隊(duì)列。
  4. 找到ProducerBatch隊(duì)列隊(duì)尾的Batch,發(fā)現(xiàn)Batch還可以塞下這條消息,則將消息直接塞到這個(gè)Batch中
  5. 找到ProducerBatch隊(duì)列隊(duì)尾的Batch,發(fā)現(xiàn)Batch中剩余內(nèi)存,不夠塞下這條消息,則會(huì)創(chuàng)建新的Batch
  6. 當(dāng)消息發(fā)送成功之后, Batch會(huì)被釋放掉。

ProducerBatch的內(nèi)存大小

那么創(chuàng)建ProducerBatch的時(shí)候,應(yīng)該分配多少的內(nèi)存呢?

先說(shuō)結(jié)論: 當(dāng)消息預(yù)估內(nèi)存大于batch.size的時(shí)候,則按照消息預(yù)估內(nèi)存創(chuàng)建, 否則按照batch.size的大小創(chuàng)建(默認(rèn)16k).

我們來(lái)看一段代碼,這段代碼就是在創(chuàng)建ProducerBatch的時(shí)候預(yù)估內(nèi)存的大小

RecordAccumulator#append

       // 找到 batch.size 和 這條消息在batch中的總內(nèi)存大小的 最大值
       int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
       // 申請(qǐng)內(nèi)存
       buffer = free.allocate(size, maxTimeToBlock);

假設(shè)當(dāng)前生產(chǎn)了一條消息為M, 剛好消息M找不到可以存放消息的ProducerBatch(不存在或者滿了),那么這個(gè)時(shí)候就需要?jiǎng)?chuàng)建一個(gè)新的ProducerBatch了

預(yù)估消息的大小 跟batch.size 默認(rèn)大小16384(16kb). 對(duì)比,取最大值用于申請(qǐng)的內(nèi)存大小的值。

那么, 這個(gè)消息的預(yù)估是如何預(yù)估的?純粹的是消息體的大小嗎?

DefaultRecordBatch#estimateBatchSizeUpperBound

預(yù)估需要的Batch大小,是一個(gè)預(yù)估值,因?yàn)闆](méi)有考慮壓縮算法從額外開(kāi)銷

    /**
    * 使用給定的鍵和值獲取只有一條記錄的批次大小的上限。
    * 這只是一個(gè)估計(jì),因?yàn)樗鼪](méi)有考慮使用的壓縮算法的額外開(kāi)銷。
    **/
    static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
        return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
    }
  1. 預(yù)估這個(gè)消息M的大小 + 一個(gè)RECORD_BATCH_OVERHEAD的大小
  2. RECORD_BATCH_OVERHEAD是一個(gè)Batch里面的一些基本元信息,總共占用了 61B
  3. 消息M的大小也并不是單單的只有消息體的大小,總大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD
  4. MAX_RECORD_OVERHEAD :一條消息頭最大占用空間, 最大值為21B

也就是說(shuō)創(chuàng)建一個(gè)ProducerBatch,最少就要83B .

比如我發(fā)送一條消息 " 1 " , 預(yù)估得到的大小是 86B, 跟batch.size(默認(rèn)16384) 相比取最大值。 那么申請(qǐng)內(nèi)存的時(shí)候取最大值 16384 。

關(guān)于Batch的結(jié)構(gòu)和消息的結(jié)構(gòu),我們回頭單獨(dú)用一篇文章來(lái)講解

內(nèi)存分配

我們都知道RecordAccumulator里面的緩存大小是一開(kāi)始定義好的, 由buffer.memory控制, 默認(rèn)33554432 (32M)

當(dāng)生產(chǎn)的速度大于發(fā)送速度的時(shí)候,就可能出現(xiàn)Producer寫(xiě)入阻塞。

而且頻繁的創(chuàng)建和釋放ProducerBatch,會(huì)導(dǎo)致頻繁GC, 所有kafka中有個(gè)緩存池的概念,這個(gè)緩存池會(huì)被重復(fù)使用,但是只有固定( batch.size)的大小才能夠使用緩存池。

PS:以下16k指得是 batch.size的默認(rèn)值.

Batch的創(chuàng)建和釋放

1. 內(nèi)存16K 緩存池中有可用內(nèi)存

①. 創(chuàng)建Batch的時(shí)候, 會(huì)去緩存池中,獲取隊(duì)首的一塊內(nèi)存ByteBuffer 使用。

②. 消息發(fā)送完成,釋放Batch, 則會(huì)把這個(gè)ByteBuffer,放到緩存池的隊(duì)尾中,并且調(diào)用ByteBuffer.clear 清空數(shù)據(jù)。以便下次重復(fù)使用

2. 內(nèi)存16K 緩存池中無(wú)可用內(nèi)存

①. 創(chuàng)建Batch的時(shí)候, 去非緩存池中的內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說(shuō)的獲取內(nèi)存給Batch, 其實(shí)就是讓 非緩存池nonPooledAvailableMemory 減少 16K 的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。

②. 消息發(fā)送完成,釋放Batch, 則會(huì)把這個(gè)ByteBuffer,放到緩存池的隊(duì)尾中,并且調(diào)用ByteBuffer.clear 清空數(shù)據(jù), 以便下次重復(fù)使用

3. 內(nèi)存非16K 非緩存池中內(nèi)存夠用

①. 創(chuàng)建Batch的時(shí)候, 去非緩存池(nonPooledAvailableMemory)內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說(shuō)的獲取內(nèi)存給Batch, 其實(shí)就是讓 非緩存池(nonPooledAvailableMemory) 減少對(duì)應(yīng)的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。

②. 消息發(fā)送完成,釋放Batch, 純粹的是在非緩存池(nonPooledAvailableMemory)中加上剛剛釋放的Batch內(nèi)存大小。 當(dāng)然這個(gè)Batch會(huì)被GC掉

4. 內(nèi)存非16K 非緩存池內(nèi)存不夠用

①. 先嘗試將 緩存池中的內(nèi)存一個(gè)一個(gè)釋放到 非緩存池中, 直到非緩存池中的內(nèi)存夠用與創(chuàng)建Batch了

②. 創(chuàng)建Batch的時(shí)候, 去非緩存池(nonPooledAvailableMemory)內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說(shuō)的獲取內(nèi)存給Batch, 其實(shí)就是讓 非緩存池(nonPooledAvailableMemory) 減少對(duì)應(yīng)的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。

③. 消息發(fā)送完成,釋放Batch, 純粹的是在非緩存池(nonPooledAvailableMemory)中加上剛剛釋放的Batch內(nèi)存大小。 當(dāng)然這個(gè)Batch會(huì)被GC掉

例如: 下面我們需要?jiǎng)?chuàng)建 48k的batch, 因?yàn)槌^(guò)了16k,所以需要在非緩存池中分配內(nèi)存, 但是非緩存池中當(dāng)前可用內(nèi)存為0 , 分配不了, 這個(gè)時(shí)候就會(huì)嘗試去 緩存池里面釋放一部分內(nèi)存到 非緩存池。

釋放第一個(gè)ByteBuffer(16k) 不夠,則繼續(xù)釋放第二個(gè),直到釋放了3個(gè)之后總共48k,發(fā)現(xiàn)內(nèi)存這時(shí)候夠了, 再去創(chuàng)建Batch。

注意:這里我們涉及到的 非緩存池中的內(nèi)存分配, 僅僅指的的內(nèi)存數(shù)字的增加和減少。

問(wèn)題和答案

發(fā)送消息的時(shí)候, 當(dāng)Broker掛掉了,消息體還能寫(xiě)入到消息緩存中嗎?

當(dāng)Broker掛掉了,Producer會(huì)提示下面的警告??, 但是發(fā)送消息過(guò)程中

這個(gè)消息體還是可以寫(xiě)入到 消息緩存中的,也僅僅是寫(xiě)到到緩存中而已。

WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available
當(dāng)最新的ProducerBatch還有空余的內(nèi)存,但是接下來(lái)的一條消息很大,不足以加上上一個(gè)Batch中,會(huì)怎么辦呢?

那么會(huì)創(chuàng)建新的ProducerBatch。

那么創(chuàng)建ProducerBatch的時(shí)候,應(yīng)該分配多少的內(nèi)存呢?

觸發(fā)創(chuàng)建ProducerBatch的那條消息預(yù)估大小大于batch.size ,則以預(yù)估內(nèi)存創(chuàng)建。
否則,以batch.size創(chuàng)建。

還有一個(gè)問(wèn)題供大家思考:

當(dāng)消息還存儲(chǔ)在緩存中的時(shí)候, 假如Producer客戶端掛掉了,消息是不是就丟失了?

總結(jié)

到此這篇關(guān)于Kafka Producer中消息緩存模型的文章就介紹到這了,更多相關(guān)Kafka Producer消息緩存模型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • idea 普通文件夾 轉(zhuǎn)換成 module操作

    idea 普通文件夾 轉(zhuǎn)換成 module操作

    這篇文章主要介紹了idea 普通文件夾 轉(zhuǎn)換成 module操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-08-08
  • 在idea中使用JaCoCo插件統(tǒng)計(jì)單元測(cè)試覆蓋率的實(shí)現(xiàn)

    在idea中使用JaCoCo插件統(tǒng)計(jì)單元測(cè)試覆蓋率的實(shí)現(xiàn)

    這篇文章主要介紹了在idea中使用JaCoCo插件統(tǒng)計(jì)單元測(cè)試覆蓋率的實(shí)現(xiàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-01-01
  • Java swing仿酷狗音樂(lè)播放器

    Java swing仿酷狗音樂(lè)播放器

    這篇文章主要為大家詳細(xì)介紹了Java swing實(shí)現(xiàn)音樂(lè)播放器,Java開(kāi)發(fā)圖形界面程序音樂(lè)播放器仿酷狗音樂(lè)播放器,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-06-06
  • 老生常談java中的fail-fast機(jī)制

    老生常談java中的fail-fast機(jī)制

    下面小編就為大家?guī)?lái)一篇老生常談java中的fail-fast機(jī)制。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-08-08
  • Spring Boot中Bean定義方調(diào)用方式解析

    Spring Boot中Bean定義方調(diào)用方式解析

    這篇文章主要介紹了Spring Boot中Bean定義方調(diào)用方式解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • Spring AOP日志框架實(shí)現(xiàn)過(guò)程圖解

    Spring AOP日志框架實(shí)現(xiàn)過(guò)程圖解

    這篇文章主要介紹了Spring AOP日志框架實(shí)現(xiàn)過(guò)程圖解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-09-09
  • 企業(yè)級(jí)Kubernetes管理平臺(tái)Wayne功能特性介紹

    企業(yè)級(jí)Kubernetes管理平臺(tái)Wayne功能特性介紹

    這篇文章主要為大家介紹了企業(yè)級(jí)Kubernetes管理平臺(tái)Wayne的功能特性及架構(gòu)設(shè)計(jì),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2022-02-02
  • Java 在Excel中添加分離型餅圖、環(huán)形圖的方法

    Java 在Excel中添加分離型餅圖、環(huán)形圖的方法

    這篇文章主要介紹了Java 在Excel中添加分離型餅圖、環(huán)形圖的方法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-12-12
  • resty mail的簡(jiǎn)單發(fā)送郵件方法

    resty mail的簡(jiǎn)單發(fā)送郵件方法

    這篇文章主要為大家介紹了簡(jiǎn)單的resty mail發(fā)送郵件方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪
    2022-03-03
  • Java使用BouncyCastle加密

    Java使用BouncyCastle加密

    本文主要介紹了Java使用BouncyCastle加密,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06

最新評(píng)論