kafka的消息存儲(chǔ)機(jī)制和原理分析
消息的保存路徑
消息發(fā)送端發(fā)送消息到 broker 上以后,消息是如何持久化的?
數(shù)據(jù)分片
kafka 使用日志文件的方式來(lái)保存生產(chǎn)者和發(fā)送者的消息,每條消息都有一個(gè) offset 值來(lái)表示它在分區(qū)中的偏移量。
Kafka 中存儲(chǔ)的一般都是海量的消息數(shù)據(jù),為了避免日志文件過大,一個(gè)分片 并不是直接對(duì)應(yīng)在一個(gè)磁盤上的日志文件,而是對(duì)應(yīng)磁盤上的一個(gè)目錄,這個(gè)目錄的命名規(guī)則是<topic_name>_<partition_id>。
比如創(chuàng)建一個(gè)名為firstTopic的topic,其中有3個(gè)partition,那么在 kafka 的數(shù)據(jù)目錄(/tmp/kafka-log)中就有 3 個(gè)目錄,firstTopic-0~3
多個(gè)分區(qū)在集群中多個(gè)broker上的分配方法
1.將所有 N Broker 和待分配的 i 個(gè) Partition 排序
2.將第 i 個(gè) Partition 分配到第(i mod n)個(gè) Broker 上
log分段
每個(gè)分片目錄中,kafka 通過分段的方式將 數(shù)據(jù) 分為多個(gè) LogSegment,一個(gè) LogSegment 對(duì)應(yīng)磁盤上的一個(gè)日志文件(00000000000000000000.log)和一個(gè)索引文件(如上:00000000000000000000.index),其中日志文件是用來(lái)記錄消息的。索引文件是用來(lái)保存消息的索引。
每個(gè)LogSegment 的大小可以在server.properties 中l(wèi)og.segment.bytes=107370 (設(shè)置分段大小,默認(rèn)是1gb)選項(xiàng)進(jìn)行設(shè)置。
segment 的 index file 和 data file 2 個(gè)文件一一對(duì)應(yīng),成對(duì)出現(xiàn),后綴".index"和“.log”分別表示為 segment 索引文件、數(shù)據(jù)文件.命名規(guī)則:partion 全局的第一個(gè) segment從 0 開始,后續(xù)每個(gè) segment 文件名為上一個(gè) segment文件最后一條消息的 offset 值進(jìn)行遞增。數(shù)值最大為 64 位long 大小,20 位數(shù)字字符長(zhǎng)度,沒有數(shù)字用 0 填充
第一個(gè) log 文件的最后一個(gè) offset 為:5376,所以下一個(gè)segment 的文件命名為: 0000000000000005376.log。
對(duì)應(yīng)的 index 為 00000000000000005376.index
kafka 這種分片和分段策略,避免了數(shù)據(jù)量過大時(shí),數(shù)據(jù)文件文件無(wú)限擴(kuò)張帶來(lái)的隱患,更有助于消息文件的維護(hù)以及被消費(fèi)的消息的清理。
日志和索引文件內(nèi)容分析
通過下面這條命令可以看到 kafka 消息日志的內(nèi)容
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
輸出結(jié)果為:
offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376
可以看到一條消息,會(huì)包含很多的字段,如下:
offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371
各字段的意義:
offset
:記錄號(hào) ;position
:偏移量;createTime
:創(chuàng)建時(shí)間、keysize
和valuesize
表示key
和value
的大小compresscodec
:表示壓縮編碼payload
:表示消息的具體內(nèi)容
為了提高查找消息的性能,kafka為每一個(gè)日志文件添加 了2 個(gè)索引文件:OffsetIndex 和 TimeIndex,分別對(duì)應(yīng)*.index以及*.timeindex, *.TimeIndex 是映射時(shí)間戳和相對(duì) offset的文件
查看索引內(nèi)容命令:
?sh ?kafka-run-class.shkafka.tools.DumpLogSegments ?--files ?/tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
索引文件和日志文件內(nèi)容關(guān)系如下
如上圖所示,index 文件中存儲(chǔ)了索引以及物理偏移量。
log 文件存儲(chǔ)了消息的內(nèi)容。
索引文件中保存了部分offset和偏移量position的對(duì)應(yīng)關(guān)系。
比如 index文件中 [4053,80899],表示在 log 文件中,對(duì)應(yīng)的是第 4053 條記錄,物理偏移量(position)為 80899.
在 partition 中通過 offset 查找 message過程
- 根據(jù) offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一個(gè)文件的最后一個(gè)offset 進(jìn)行命名的,所以,使用二分查找算法能夠根據(jù)offset 快速定位到指定的索引文件
- 找到索引文件后,根據(jù) offset 進(jìn)行定位,找到索引文件中的匹配范圍的偏移量position。(kafka 采用稀疏索引的方式來(lái)提高查找性能)
- 得到 position 以后,再到對(duì)應(yīng)的 log 文件中,從 position處開始查找 offset 對(duì)應(yīng)的消息,將每條消息的 offset 與目標(biāo) offset 進(jìn)行比較,直到找到消息
比如說(shuō),我們要查找 offset=2490 這條消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]這個(gè)索引,再到 log 文件中,根據(jù) 49111 這個(gè) position 開始查找,比較每條消息的 offset 是否大于等于 2490。最后查找到對(duì)應(yīng)的消息以后返回
日志的清除策略以及壓縮策略
日志的清理策略有兩個(gè)
- 根據(jù)消息的保留時(shí)間,當(dāng)消息在 kafka 中保存的時(shí)間超過了指定的時(shí)間,就會(huì)觸發(fā)清理過程
- 根據(jù) topic 存儲(chǔ)的數(shù)據(jù)大小,當(dāng) topic 所占的日志文件大小大于一定的閥值,則可以開始刪除最舊的消息。
通過 log.retention.bytes 和 log.retention.hours 這兩個(gè)參數(shù)來(lái)設(shè)置,當(dāng)其中任意一個(gè)達(dá)到要求,都會(huì)執(zhí)行刪除。默認(rèn)的保留時(shí)間是:7 天
kafka會(huì)啟動(dòng)一個(gè)后臺(tái)線程,定期檢查是否存在可以刪除的消息。
日志壓縮策略
Kafka 還提供了“日志壓縮(Log Compaction)”功能,通過這個(gè)功能可以有效的減少日志文件的大小,緩解磁盤緊張的情況,在很多實(shí)際場(chǎng)景中,消息的 key 和 value 的值之間的對(duì)應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫(kù)中的數(shù)據(jù)會(huì)不斷被修改一樣,消費(fèi)者只關(guān)心 key 對(duì)應(yīng)的最新的 value。
因此,我們可以開啟 kafka 的日志壓縮功能,服務(wù)端會(huì)在后臺(tái)啟動(dòng)Cleaner線程池,定期將相同的key進(jìn)行合并,只保留最新的 value 值。日志的壓縮原理如下圖:
消息寫入的性能
順序?qū)?/h3>
我們現(xiàn)在大部分企業(yè)仍然用的是機(jī)械結(jié)構(gòu)的磁盤,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對(duì)應(yīng)的柱面、磁頭以及對(duì)應(yīng)的扇區(qū);
這個(gè)過程相對(duì)內(nèi)存來(lái)說(shuō)會(huì)消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫帶來(lái)的時(shí)間消耗,kafka 采用順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù)。
零拷貝
即使采用順序?qū)?,但是頻繁的 I/O 操作仍然會(huì)造成磁盤的性能瓶頸,所以 kafka還有一個(gè)性能策略:零拷貝
消息從發(fā)送到落地保存,broker 維護(hù)的消息日志本身就是文件目錄,每個(gè)文件都是二進(jìn)制保存,生產(chǎn)者和消費(fèi)者使用相同的格式來(lái)處理。
在消費(fèi)者獲取消息時(shí),服務(wù)器先從硬盤讀取數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動(dòng)的通過 socket 發(fā)送給消費(fèi)者。
雖然這個(gè)操作描述起來(lái)很簡(jiǎn)單,但實(shí)際上經(jīng)歷了很多步驟。如下:
- 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁(yè)緩存
- 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
- 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到 socket 緩存中
- 操作系統(tǒng)將數(shù)據(jù)從 socket 緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
這個(gè)過程涉及到 4 次上下文切換以及 4 次數(shù)據(jù)復(fù)制,并且有兩次復(fù)制操作是由 CPU 完成。但是這個(gè)過程中,數(shù)據(jù)完全沒有進(jìn)行變化,僅僅是從磁盤復(fù)制到網(wǎng)卡緩沖區(qū)。通過“零拷貝”技術(shù),可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作,同時(shí)也會(huì)減少上下文切換次數(shù)。
現(xiàn)代的 unix 操作系統(tǒng)提供一個(gè)優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁(yè)緩存?zhèn)鬏數(shù)?socket;在 Linux 中,是通過 sendfile 系統(tǒng)調(diào)用來(lái)完成的。
Java 提供了訪問這個(gè)系統(tǒng)調(diào)用的方法:FileChannel.transferTo API。
使用 sendfile,只需要一次拷貝就行,允許操作系統(tǒng)將數(shù)據(jù)直接從頁(yè)緩存發(fā)送到網(wǎng)絡(luò)上。
所以在這個(gè)優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring常用注解及http數(shù)據(jù)轉(zhuǎn)換教程
這篇文章主要為大家介紹了Spring常用注解及http數(shù)據(jù)轉(zhuǎn)換原理以及接收復(fù)雜嵌套對(duì)象參數(shù)與Http數(shù)據(jù)轉(zhuǎn)換的原理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03基于eclipse.ini內(nèi)存設(shè)置的問題詳解
本篇文章是對(duì)eclipse.ini內(nèi)存設(shè)置的問題進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05java計(jì)算兩個(gè)時(shí)間相差天數(shù)的方法匯總
這篇文章主要介紹了java計(jì)算兩個(gè)時(shí)間相差天數(shù)的方法,感興趣的小伙伴們可以參考一下2015-11-11Java中BigDecimal,DateFormatter?和迭代器的"陷阱"
這篇文章主要介紹了Java中BigDecimal,DateFormatter?和迭代器的"陷阱",文章圍繞主題展開詳細(xì)的內(nèi)容介紹,感興趣的小伙伴可以參考一下2022-06-06mybatis動(dòng)態(tài)SQL?if的test寫法及規(guī)則詳解
這篇文章主要介紹了mybatis動(dòng)態(tài)SQL?if的test寫法及規(guī)則詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01AsyncHttpClient?ClientStats源碼流程解讀
這篇文章主要為大家介紹了AsyncHttpClient?ClientStats源碼流程解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12解決啟用 Spring-Cloud-OpenFeign 配置可刷新項(xiàng)目無(wú)法啟動(dòng)的問題
這篇文章主要介紹了解決啟用 Spring-Cloud-OpenFeign 配置可刷新項(xiàng)目無(wú)法啟動(dòng)的問題,本文重點(diǎn)給大家介紹Spring-Cloud-OpenFeign的原理及問題解決方法,需要的朋友可以參考下2021-10-10學(xué)習(xí)Java之如何正確地向上轉(zhuǎn)型與向下轉(zhuǎn)型
面向?qū)ο蟮牡谌齻€(gè)特征是多態(tài),實(shí)現(xiàn)多態(tài)有三個(gè)必要條件:繼承、方法重寫和向上轉(zhuǎn)型,在學(xué)習(xí)多態(tài)之前,我們還要先學(xué)習(xí)Java的類型轉(zhuǎn)換,本篇文章就來(lái)帶大家認(rèn)識(shí)什么是類型轉(zhuǎn)換,看看類型轉(zhuǎn)換都有哪幾種情況,以及如何避免類型轉(zhuǎn)換時(shí)出現(xiàn)異常2023-05-05