Kafka?Producer中的消息緩存模型圖解詳解
前言
在閱讀本文之前, 希望你可以思考一下下面幾個(gè)問(wèn)題, 帶著問(wèn)題去閱讀文章會(huì)獲得更好的效果。
- 發(fā)送消息的時(shí)候, 當(dāng)Broker掛掉了,消息體還能寫(xiě)入到消息緩存中嗎?
- 當(dāng)消息還存儲(chǔ)在緩存中的時(shí)候, 假如Producer客戶端掛掉了,消息是不是就丟失了?
- 當(dāng)最新的ProducerBatch還有空余的內(nèi)存,但是接下來(lái)的一條消息很大,不足以加上上一個(gè)Batch中,會(huì)怎么辦呢?
- 那么創(chuàng)建ProducerBatch的時(shí)候,應(yīng)該分配多少的內(nèi)存呢?
什么是消息累加器RecordAccumulator
kafka為了提高Producer客戶端的發(fā)送吞吐量和提高性能,選擇了將消息暫時(shí)緩存起來(lái),等到滿足一定的條件, 再進(jìn)行批量發(fā)送, 這樣可以減少網(wǎng)絡(luò)請(qǐng)求,提高吞吐量。
而緩存這個(gè)消息的就是RecordAccumulator類.
上圖就是整個(gè)消息存放的緩存模型,我們接下來(lái)一個(gè)個(gè)來(lái)講解。
消息緩存模型
上圖表示的就是 消息緩存的模型, 生產(chǎn)的消息就是暫時(shí)存放在這個(gè)里面。
- 每條消息,我們按照TopicPartition維度,把他們放在不同的Deque<ProducerBatch> 隊(duì)列里面。
TopicPartition相同,會(huì)在相同Deque<ProducerBatch> 的里面。 - ProducerBatch : 表示同一個(gè)批次的消息, 消息真正發(fā)送到Broker端的時(shí)候都是按照批次來(lái)發(fā)送的,
這個(gè)批次可能包含一條或者多條消息。 - 如果沒(méi)有找到消息對(duì)應(yīng)的ProducerBatch隊(duì)列, 則創(chuàng)建一個(gè)隊(duì)列。
- 找到ProducerBatch隊(duì)列隊(duì)尾的Batch,發(fā)現(xiàn)Batch還可以塞下這條消息,則將消息直接塞到這個(gè)Batch中
- 找到ProducerBatch隊(duì)列隊(duì)尾的Batch,發(fā)現(xiàn)Batch中剩余內(nèi)存,不夠塞下這條消息,則會(huì)創(chuàng)建新的Batch
- 當(dāng)消息發(fā)送成功之后, Batch會(huì)被釋放掉。
ProducerBatch的內(nèi)存大小
那么創(chuàng)建ProducerBatch的時(shí)候,應(yīng)該分配多少的內(nèi)存呢?
先說(shuō)結(jié)論: 當(dāng)消息預(yù)估內(nèi)存大于batch.size
的時(shí)候,則按照消息預(yù)估內(nèi)存創(chuàng)建, 否則按照batch.size
的大小創(chuàng)建(默認(rèn)16k).
我們來(lái)看一段代碼,這段代碼就是在創(chuàng)建ProducerBatch的時(shí)候預(yù)估內(nèi)存的大小
RecordAccumulator#append
// 找到 batch.size 和 這條消息在batch中的總內(nèi)存大小的 最大值 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); // 申請(qǐng)內(nèi)存 buffer = free.allocate(size, maxTimeToBlock);
假設(shè)當(dāng)前生產(chǎn)了一條消息為M, 剛好消息M找不到可以存放消息的ProducerBatch(不存在或者滿了),那么這個(gè)時(shí)候就需要?jiǎng)?chuàng)建一個(gè)新的ProducerBatch了
預(yù)估消息的大小 跟batch.size
默認(rèn)大小16384(16kb). 對(duì)比,取最大值用于申請(qǐng)的內(nèi)存大小的值。
那么, 這個(gè)消息的預(yù)估是如何預(yù)估的?純粹的是消息體的大小嗎?
DefaultRecordBatch#estimateBatchSizeUpperBound
預(yù)估需要的Batch大小,是一個(gè)預(yù)估值,因?yàn)闆](méi)有考慮壓縮算法從額外開(kāi)銷
/** * 使用給定的鍵和值獲取只有一條記錄的批次大小的上限。 * 這只是一個(gè)估計(jì),因?yàn)樗鼪](méi)有考慮使用的壓縮算法的額外開(kāi)銷。 **/ static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) { return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers); }
- 預(yù)估這個(gè)消息M的大小 + 一個(gè)RECORD_BATCH_OVERHEAD的大小
- RECORD_BATCH_OVERHEAD是一個(gè)Batch里面的一些基本元信息,總共占用了 61B
- 消息M的大小也并不是單單的只有消息體的大小,總大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD
- MAX_RECORD_OVERHEAD :一條消息頭最大占用空間, 最大值為21B
也就是說(shuō)創(chuàng)建一個(gè)ProducerBatch,最少就要83B .
比如我發(fā)送一條消息 " 1 " , 預(yù)估得到的大小是 86B, 跟batch.size(默認(rèn)16384)
相比取最大值。 那么申請(qǐng)內(nèi)存的時(shí)候取最大值 16384 。
關(guān)于Batch的結(jié)構(gòu)和消息的結(jié)構(gòu),我們回頭單獨(dú)用一篇文章來(lái)講解。
內(nèi)存分配
我們都知道RecordAccumulator里面的緩存大小是一開(kāi)始定義好的, 由buffer.memory
控制, 默認(rèn)33554432 (32M)
當(dāng)生產(chǎn)的速度大于發(fā)送速度的時(shí)候,就可能出現(xiàn)Producer寫(xiě)入阻塞。
而且頻繁的創(chuàng)建和釋放ProducerBatch,會(huì)導(dǎo)致頻繁GC, 所有kafka中有個(gè)緩存池的概念,這個(gè)緩存池會(huì)被重復(fù)使用,但是只有固定( batch.size)的大小才能夠使用緩存池。
PS:以下16k指得是 batch.size的默認(rèn)值.
Batch的創(chuàng)建和釋放
1. 內(nèi)存16K 緩存池中有可用內(nèi)存
①. 創(chuàng)建Batch的時(shí)候, 會(huì)去緩存池中,獲取隊(duì)首的一塊內(nèi)存ByteBuffer 使用。
②. 消息發(fā)送完成,釋放Batch, 則會(huì)把這個(gè)ByteBuffer,放到緩存池的隊(duì)尾中,并且調(diào)用ByteBuffer.clear
清空數(shù)據(jù)。以便下次重復(fù)使用
2. 內(nèi)存16K 緩存池中無(wú)可用內(nèi)存
①. 創(chuàng)建Batch的時(shí)候, 去非緩存池中的內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說(shuō)的獲取內(nèi)存給Batch, 其實(shí)就是讓 非緩存池nonPooledAvailableMemory 減少 16K 的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。
②. 消息發(fā)送完成,釋放Batch, 則會(huì)把這個(gè)ByteBuffer,放到緩存池的隊(duì)尾中,并且調(diào)用ByteBuffer.clear
清空數(shù)據(jù), 以便下次重復(fù)使用
3. 內(nèi)存非16K 非緩存池中內(nèi)存夠用
①. 創(chuàng)建Batch的時(shí)候, 去非緩存池(nonPooledAvailableMemory)內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說(shuō)的獲取內(nèi)存給Batch, 其實(shí)就是讓 非緩存池(nonPooledAvailableMemory) 減少對(duì)應(yīng)的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。
②. 消息發(fā)送完成,釋放Batch, 純粹的是在非緩存池(nonPooledAvailableMemory)中加上剛剛釋放的Batch內(nèi)存大小。 當(dāng)然這個(gè)Batch會(huì)被GC掉
4. 內(nèi)存非16K 非緩存池內(nèi)存不夠用
①. 先嘗試將 緩存池中的內(nèi)存一個(gè)一個(gè)釋放到 非緩存池中, 直到非緩存池中的內(nèi)存夠用與創(chuàng)建Batch了
②. 創(chuàng)建Batch的時(shí)候, 去非緩存池(nonPooledAvailableMemory)內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說(shuō)的獲取內(nèi)存給Batch, 其實(shí)就是讓 非緩存池(nonPooledAvailableMemory) 減少對(duì)應(yīng)的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。
③. 消息發(fā)送完成,釋放Batch, 純粹的是在非緩存池(nonPooledAvailableMemory)中加上剛剛釋放的Batch內(nèi)存大小。 當(dāng)然這個(gè)Batch會(huì)被GC掉
例如: 下面我們需要?jiǎng)?chuàng)建 48k的batch, 因?yàn)槌^(guò)了16k,所以需要在非緩存池中分配內(nèi)存, 但是非緩存池中當(dāng)前可用內(nèi)存為0 , 分配不了, 這個(gè)時(shí)候就會(huì)嘗試去 緩存池里面釋放一部分內(nèi)存到 非緩存池。
釋放第一個(gè)ByteBuffer(16k) 不夠,則繼續(xù)釋放第二個(gè),直到釋放了3個(gè)之后總共48k,發(fā)現(xiàn)內(nèi)存這時(shí)候夠了, 再去創(chuàng)建Batch。
注意:這里我們涉及到的 非緩存池中的內(nèi)存分配, 僅僅指的的內(nèi)存數(shù)字的增加和減少。
問(wèn)題和答案
發(fā)送消息的時(shí)候, 當(dāng)Broker掛掉了,消息體還能寫(xiě)入到消息緩存中嗎?
當(dāng)Broker掛掉了,Producer會(huì)提示下面的警告??, 但是發(fā)送消息過(guò)程中
這個(gè)消息體還是可以寫(xiě)入到 消息緩存中的,也僅僅是寫(xiě)到到緩存中而已。
WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available
當(dāng)最新的ProducerBatch還有空余的內(nèi)存,但是接下來(lái)的一條消息很大,不足以加上上一個(gè)Batch中,會(huì)怎么辦呢?
那么會(huì)創(chuàng)建新的ProducerBatch。
那么創(chuàng)建ProducerBatch的時(shí)候,應(yīng)該分配多少的內(nèi)存呢?
觸發(fā)創(chuàng)建ProducerBatch的那條消息預(yù)估大小大于batch.size ,則以預(yù)估內(nèi)存創(chuàng)建。
否則,以batch.size創(chuàng)建。
還有一個(gè)問(wèn)題供大家思考:
當(dāng)消息還存儲(chǔ)在緩存中的時(shí)候, 假如Producer客戶端掛掉了,消息是不是就丟失了?
總結(jié)
到此這篇關(guān)于Kafka Producer中消息緩存模型的文章就介紹到這了,更多相關(guān)Kafka Producer消息緩存模型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea 普通文件夾 轉(zhuǎn)換成 module操作
這篇文章主要介紹了idea 普通文件夾 轉(zhuǎn)換成 module操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-08-08在idea中使用JaCoCo插件統(tǒng)計(jì)單元測(cè)試覆蓋率的實(shí)現(xiàn)
這篇文章主要介紹了在idea中使用JaCoCo插件統(tǒng)計(jì)單元測(cè)試覆蓋率的實(shí)現(xiàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01Spring Boot中Bean定義方調(diào)用方式解析
這篇文章主要介紹了Spring Boot中Bean定義方調(diào)用方式解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07Spring AOP日志框架實(shí)現(xiàn)過(guò)程圖解
這篇文章主要介紹了Spring AOP日志框架實(shí)現(xiàn)過(guò)程圖解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09企業(yè)級(jí)Kubernetes管理平臺(tái)Wayne功能特性介紹
這篇文章主要為大家介紹了企業(yè)級(jí)Kubernetes管理平臺(tái)Wayne的功能特性及架構(gòu)設(shè)計(jì),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-02-02Java 在Excel中添加分離型餅圖、環(huán)形圖的方法
這篇文章主要介紹了Java 在Excel中添加分離型餅圖、環(huán)形圖的方法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-12-12