Kafka?日志存儲實現(xiàn)過程
正文
在進行詳解之前,我想先聲明一下,本次我們進行講解說明的是 Kafka 消息存儲的信息文件內(nèi)容,不是所謂的 Kafka 服務(wù)器運行產(chǎn)生的日志文件,這一點希望大家清楚。
Kafka 消息是以主題為單位進行歸類,各個主題之間是彼此獨立的,互不影響。每個主題又可以分為一個或多個分區(qū)。每個分區(qū)各自存在一個記錄消息數(shù)據(jù)的日志文件。也就是該文要著重關(guān)注的內(nèi)容。我們根據(jù)如下的圖進行進一步說明:
圖中,創(chuàng)建了一個 demo-topic
主題,其存在 7 個 Parition,對應(yīng)的每個 Parition 下存在一個 [Topic-Parition]
命名的消息日志文件。在理想情況下,數(shù)據(jù)流量分攤到各個 Parition 中,實現(xiàn)了負載均衡的效果。在分區(qū)日志文件中,你會發(fā)現(xiàn)很多類型的文件,比如:.index、.timestamp、.log、.snapshot
等,其中,文件名一致的文件集合就稱為 LogSement。我們先留有這樣的一個整體的日志結(jié)構(gòu)概念,接下來我們一一的進行詳細的說明其中的設(shè)計。
LogSegment
我們已經(jīng)知道分區(qū)日志文件中包含很多的 LogSegment ,Kafka 日志追加是順序?qū)懭氲?,LogSegment 可以減小日志文件的大小,進行日志刪除的時候和數(shù)據(jù)查找的時候可以快速定位。同時,ActiveLogSegment 也就是活躍的日志分段擁有文件擁有寫入權(quán)限,其余的 LogSegment 只有只讀的權(quán)限。
日志文件存在多種后綴文件,重點需要關(guān)注 .index、.timestamp、.log
三種類型。其他的日志類型功能作用,請查詢下面圖表:
類別 | 作用 |
---|---|
.index | 偏移量索引文件 |
.timestamp | 時間戳索引文件 |
.log | 日志文件 |
.snaphot | 快照文件 |
.deleted | |
.cleaned | 日志清理時臨時文件 |
.swap | Log Compaction 之后的臨時文件 |
Leader-epoch-checkpoint |
每個 LogSegment 都有一個基準(zhǔn)偏移量,用來表示當(dāng)前 LogSegment 中第一條消息的 offset。偏移量是一個 64 位的長整形數(shù),固定是20位數(shù)字,長度未達到,用 0 進行填補,索引文件和日志文件都由該作為文件名命名規(guī)則(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特別說明一下,如果日志文件名為 00000000000000000121.log
,則當(dāng)前日志文件的一條數(shù)據(jù)偏移量就是 121,偏移量是從 0 開始的。
如果想要查看相應(yīng)文件內(nèi)容可以通過 kafka-run-class.sh
腳本查看 .log
:
/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
2.0 中可以使用 kafka-dump-log.sh
查 看.index
文件
/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
日志與索引文件
配置項 | 默認值 | 說明 |
---|---|---|
log.index.interval.bytes | 4096 (4K) | 增加索引項字節(jié)間隔密度,會影響索引文件中的區(qū)間密度和查詢效率 |
log.segment.bytes | 1073741824 (1G) | 日志文件最大值 |
log.roll.ms | 當(dāng)前日志分段中消息的最大時間戳與當(dāng)前系統(tǒng)的時間戳的差值允許的最大范圍,毫秒維度 | |
log.roll.hours | 168 (7天) | 當(dāng)前日志分段中消息的最大時間戳與當(dāng)前系統(tǒng)的時間戳的差值允許的最大范圍,小時維度 |
log.index.size.max.bytes | 10485760 (10MB) | 觸發(fā)偏移量索引文件或時間戳索引文件分段字節(jié)限額 |
偏移量索引文件用于記錄消息偏移量與物理地址之間的映射關(guān)系。時間戳索引文件則根據(jù)時間戳查找對應(yīng)的偏移量。
Kafka 中的索引文件是以稀疏索引的方式構(gòu)造消息的索引,他并不保證每一個消息在索引文件中都有對應(yīng)的索引項。每當(dāng)寫入一定量的消息時,偏移量索引文件和時間戳索引文件分別增加一個偏移量索引項和時間戳索引項,通過修改 log.index.interval.bytes
的值,改變索引項的密度。
切分文件
從上文中可知,日志文件和索引文件都會存在多個文件,組成多個 SegmentLog,那么其切分的規(guī)則是怎樣的呢?
當(dāng)滿足如下幾個條件中的其中之一,就會觸發(fā)文件的切分:
- 當(dāng)前日志分段文件的大小超過了 broker 端參數(shù)
log.segment.bytes
配置的值。log.segment.bytes
參數(shù)的默認值為 1073741824,即 1GB。 - 當(dāng)前日志分段中消息的最大時間戳與當(dāng)前系統(tǒng)的時間戳的差值大于
log.roll.ms
或log.roll.hours
參數(shù)配置的值。如果同時配置了log.roll.ms
和log.roll.hours
參數(shù),那么log.roll.ms
的優(yōu)先級高。默認情況下,只配置了log.roll.hours
參數(shù),其值為168,即 7 天。 - 偏移量索引文件或時間戳索引文件的大小達到 broker 端參數(shù)
log.index.size.max.bytes
配置的值。log.index.size.max.bytes
的默認值為 10485760,即 10MB。 - 追加的消息的偏移量與當(dāng)前日志分段的偏移量之間的差值大于
Integer.MAX_VALUE
,即要追加的消息的偏移量不能轉(zhuǎn)變?yōu)橄鄬ζ屏俊?/li>
為什么是 Integer.MAX_VALUE
?
在偏移量索引文件中,每個索引項共占用 8 個字節(jié),并分為兩部分。相對偏移量和物理地址。
相對偏移量:表示消息相對與基準(zhǔn)偏移量的偏移量,占 4 個字節(jié)
物理地址:消息在日志分段文件中對應(yīng)的物理位置,也占 4 個字節(jié)
4 個字節(jié)剛好對應(yīng) Integer.MAX_VALUE
,如果大于 Integer.MAX_VALUE
,則不能用 4 個字節(jié)進行表示了。
索引文件切分過程
索引文件會根據(jù) log.index.size.max.bytes
值進行預(yù)先分配空間,即文件創(chuàng)建的時候就是最大值,當(dāng)真正的進行索引文件切分的時候,才會將其裁剪到實際數(shù)據(jù)大小的文件。這一點是跟日志文件有所區(qū)別的地方。其意義降低了代碼邏輯的復(fù)雜性。
查找消息
offset 查詢
偏移量索引由相對偏移量和物理地址組成。
可以通過如下命令解析.index
文件
/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
offset:0 position:0 offset:20 position:320 offset:43 position:1220
注意:offset 與 position 沒有直接關(guān)系哦,由于存在數(shù)據(jù)刪除和日志清理。
e.g. 如何查看 偏移量為 23 的消息?
Kafka 中存在一個 ConcurrentSkipListMap
來保存在每個日志分段,通過跳躍表方式,定位到在 00000000000000000000.index
,通過二分法在偏移量索引文件中找到不大于 23 的最大索引項,即 offset 20 那欄,然后從日志分段文件中的物理位置為320 開始順序查找偏移量為 23 的消息。
時間戳方式查詢
在上文已經(jīng)有所提及,通過時間戳方式進行查找消息,需要通過查找時間戳索引和偏移量索引兩個文件。
時間戳索引索引格式
e.g. 查找時間戳為 1557554753430 開始的消息?
- 將 1557554753430 和每個日志分段中最大時間戳 largestTimeStamp 逐一對比,直到找到不小于 1557554753430 所對應(yīng)的日志分段。日志分段中的 largestTimeStamp 的計算是先查詢該日志分段所對應(yīng)時間戳索引文件,找到最后一條索引項,若最后一條索引項的時間戳字段值大于 0 ,則取該值,否則去該日志分段的最近修改時間。
- 找到相應(yīng)日志分段之后,使用二分法進行定位,與偏移量索引方式類似,找到不大于 1557554753430 最大索引項,也就是 [1557554753420 430]。
- 拿著偏移量為 430 到偏移量索引文件中使用二分法找到不大于 430 最大索引項,即 [20,320] 。
- 日志文件中從 320 的物理位置開始查找不小于 1557554753430 數(shù)據(jù)。
注意:timestamp文件中的 offset 與 index 文件中的 relativeOffset 不是一一對應(yīng)的哦。因為數(shù)據(jù)的寫入是各自追加。
在偏移量索引文件中,索引數(shù)據(jù)都是順序記錄 offset ,但時間戳索引文件中每個追加的索引時間戳必須大于之前追加的索引項,否則不予追加。在 Kafka 0.11.0.0
以后,消息信息中存在若干的時間戳信息。如果 broker 端參數(shù) log.message.timestamp.type
設(shè)置為 LogAppendTIme ,那么時間戳必定能保持單調(diào)增長。反之如果是 CreateTime 則無法保證順序。
日志清理
日志清理,不是日志刪除哦,這還是有所區(qū)別的,日志刪除會在下文進行說明。
Kafka 提供兩種日志清理策略:
日志刪除:按照一定的刪除策略,將不滿足條件的數(shù)據(jù)進行數(shù)據(jù)刪除
日志壓縮:針對每個消息的 Key 進行整合,對于有相同 Key 的不同 Value 值,只保留最后一個版本。
Kafka 提供 log.cleanup.policy
參數(shù)進行相應(yīng)配置,默認值:delete,還可以選擇 compact。
是否支持針對具體的 Topic 進行配置?
答案是肯定的,主題級別的配置項是 cleanup.policy
。
日志刪除
配置 | 默認值 | 說明 |
---|---|---|
log.retention.check.interval.ms | 300000 (5分鐘) | 檢測頻率 |
log.retention.hours | 168 (7天) | 日志保留時間小時 |
log.retention.minutes | 日志保留時間分鐘 | |
log.retention.ms | 日志保留時間毫秒 | |
file.delete.delay.ms | 60000 (1分鐘) | 延遲執(zhí)行刪除時間 |
log.retention.bytes | -1 無窮大 | 運行保留日志文件最大值 |
log.retention.bytes | 1073741824 (1G) | 日志文件最大值 |
Kafka 會周期性根據(jù)相應(yīng)規(guī)則進行日志數(shù)據(jù)刪除,保留策略有 3 種:基于時間的保留策略、基于日志大小的保留策略和基于日志其實偏移量的保留策略。
基于時間
日志刪除任務(wù)會根據(jù) log.retention.hours/log.retention.minutes/log.retention.ms
設(shè)定日志保留的時間節(jié)點。如果超過該設(shè)定值,就需要進行刪除。默認是 7 天,log.retention.ms
優(yōu)先級最高。
如何查找日志分段文件中已經(jīng)過去的數(shù)據(jù)呢?
Kafka 依據(jù)日志分段中最大的時間戳進行定位,首先要查詢該日志分段所對應(yīng)的時間戳索引文件,查找時間戳索引文件中最后一條索引項,若最后一條索引項的時間戳字段值大于 0,則取該值,否則取最近修改時間。
為什么不直接選最近修改時間呢?
因為日志文件可以有意無意的被修改,并不能真實的反應(yīng)日志分段的最大時間信息。
刪除過程
- 從日志對象中所維護日志分段的跳躍表中移除待刪除的日志分段,保證沒有線程對這些日志分段進行讀取操作。
- 這些日志分段所有文件添加 上
.delete
后綴。 - 交由一個以
"delete-file"
命名的延遲任務(wù)來刪除這些.delete
為后綴的文件。延遲執(zhí)行時間可以通過file.delete.delay.ms
進行設(shè)置
如果活躍的日志分段中也存在需要刪除的數(shù)據(jù)時?
Kafka 會先切分出一個新的日志分段作為活躍日志分段,然后執(zhí)行刪除操作。
基于日志大小
日志刪除任務(wù)會檢查當(dāng)前日志的大小是否超過設(shè)定值。設(shè)定項為 log.retention.bytes
,單個日志分段的大小由 log.regment.bytes
進行設(shè)定。
刪除過程
- 計算需要被刪除的日志總大小 (當(dāng)前日志文件大小-retention值)。
- 從日志文件第一個 LogSegment 開始查找可刪除的日志分段的文件集合。
- 執(zhí)行刪除。
基于日志起始偏移量
基于日志起始偏移量的保留策略的判斷依據(jù)是某日志分段的下一個日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,則可以刪除此日志分段。
注意:日志文件的起始偏移量并不一定等于第一個日志分段的基準(zhǔn)偏移量,存在數(shù)據(jù)刪除,可能與之相等的那條數(shù)據(jù)已經(jīng)被刪除了。
刪除過程
- 從頭開始遍歷每一個日志分段,日志分段 1 的下一個日志分段的起始偏移量為 11,小于 logStartOffset,將 日志分段 1 加入到刪除隊列中
- 日志分段 2 的下一個日志分段的起始偏移量為 23,小于 logStartOffset,將 日志分段 2 加入到刪除隊列中
- 日志分段 3 的下一個日志分段的起始偏移量為 30,大于 logStartOffset,則不進行刪除。
以上就是Kafka 日志存儲實現(xiàn)過程的詳細內(nèi)容,更多關(guān)于Kafka 日志存儲的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java批量導(dǎo)入導(dǎo)出文件的實例分享(兼容xls,xlsx)
這篇文章主要給大家介紹了利用java批量導(dǎo)入導(dǎo)出文件的相關(guān)資料,文中給出了詳細的實例代碼,并且兼容xls,xlsx,對大家具有一定的參考學(xué)習(xí)價值,下面跟著小編一起來看看詳細的介紹吧。2017-06-06如何在IDEA上安裝scala插件并創(chuàng)建工程(圖文教程)
這篇文章主要介紹了一文教你如何在IDEA上安裝scala插件并創(chuàng)建工程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07Idea導(dǎo)入eureka源碼實現(xiàn)過程解析
這篇文章主要介紹了Idea導(dǎo)入eureka源碼實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08Spring使用注解方式實現(xiàn)創(chuàng)建對象
這篇文章主要介紹了Spring使用注解方式實現(xiàn)創(chuàng)建對象,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2023-02-02關(guān)于@Autowired注解和靜態(tài)方法及new的關(guān)系
這篇文章主要介紹了關(guān)于@Autowired注解和靜態(tài)方法及new的關(guān)系,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02SpringBoot中的@EnableAutoConfiguration注解解析
這篇文章主要介紹了SpringBoot中的@EnableAutoConfiguration注解解析,@EnableAutoConfiguration也是借助@Import的幫助,將所有符合自動配置條件的bean定義注冊到IoC容器,需要的朋友可以參考下2023-09-09Java中的CopyOnWriteArrayList原理詳解
這篇文章主要介紹了Java中的CopyOnWriteArrayList原理詳解,如源碼所示,CopyOnWriteArrayList和ArrayList一樣,都在內(nèi)部維護了一個數(shù)組,操作CopyOnWriteArrayList其實就是在操作內(nèi)部的數(shù)組,需要的朋友可以參考下2023-12-12