8張圖帶你全面了解Java?kafka的核心機(jī)制
kafka基礎(chǔ)架構(gòu)
現(xiàn)在假如有100T大小的消息要發(fā)送到kafka中,數(shù)據(jù)量非常大,一臺(tái)機(jī)器存儲(chǔ)不下,面對(duì)這種情況,你該如何設(shè)計(jì)呢?
很簡(jiǎn)單,分而治之,一臺(tái)不夠,那就多臺(tái),這就形成了一個(gè)kafka集群。如下圖所示,一個(gè)broker就是一個(gè)kafka節(jié)點(diǎn),100T數(shù)據(jù)就有3個(gè)節(jié)點(diǎn)分擔(dān),每個(gè)節(jié)點(diǎn)約33T,這樣就能解決問題了,還能提高吞吐量。

- Topic: 可以理解為一個(gè)隊(duì)列,一個(gè)kafka集群中可以定義很多的topic,比如上圖中的
topicA。 - Partition: 為了實(shí)現(xiàn)擴(kuò)展性,提高吞吐量,一個(gè)非常大的
topic可以分布到多個(gè)broker(即服務(wù)器)上,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列。比如上圖中的topicA被分成了3個(gè)partition。 - Replica: 副本,如果數(shù)據(jù)只放在一個(gè)
broker中,萬一這個(gè)broker宕機(jī)了怎么辦?為了實(shí)現(xiàn)高可用,一個(gè)topic的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè)Leader和若干個(gè)Follower。比如上圖中的虛線連接的就是它的副本。 - Leader: 每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是
Leader。 - Follower: 每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從
Leader中同步數(shù)據(jù),保持和Leader數(shù)據(jù)的同步。Leader發(fā)生故障時(shí),某個(gè)Follower會(huì)成為新的Leader。 - Producer: 消息生產(chǎn)者,就是向
Kafka broker發(fā)消息的客戶端,后面詳細(xì)講解。 - Consumer: 消息消費(fèi)者,向
Kafka broker取消息的客戶端,多個(gè)Consumer會(huì)組成一個(gè)消費(fèi)者組,后面詳細(xì)講解。 - Zookeeper:用來記錄kafka中的一些元數(shù)據(jù),比如kafka集群中的broker,leader是誰(shuí)等等,但
Kafka2.8.0版本以后也支持非zk的方式,大大減少了和zk的交互。
kafka生產(chǎn)者流程
前面通過一張圖片講解了kafka整體的架構(gòu),那現(xiàn)在我們來看看kafka生產(chǎn)者發(fā)送的整個(gè)過程,這里面也是大有文章。
在消息發(fā)送的過程中,涉及到了兩個(gè)線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個(gè)雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker。

- 在主線程中由
kafkaProducer創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator, 也稱為消息收集器)中。
- 攔截器: 可以用來在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個(gè)規(guī)則過濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計(jì)類工作。
- 序列化器: 用于在網(wǎng)絡(luò)傳輸中將數(shù)據(jù)序列化為字節(jié)流進(jìn)行傳輸,保證數(shù)據(jù)不會(huì)丟失。
- 分區(qū)器: 用于按照一定的規(guī)則將數(shù)據(jù)分發(fā)到不同的kafka broker節(jié)點(diǎn)中
Sender線程負(fù)責(zé)從RecordAccumulator獲取消息并將其發(fā)送到Kafka中。
RecordAccumulator主要用來緩存消息以便Sender線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。
RecordAccumulator緩存的大小可以通過生產(chǎn)者客戶端參數(shù)buffer.memory配置,默認(rèn)值為33554432B,即32M。
- 主線程中發(fā)送過來的消息都會(huì)被迫加到
RecordAccumulator的某個(gè)雙端隊(duì)列(Deque)中,RecordAccumulator內(nèi)部為每個(gè)分區(qū)都維護(hù)了一個(gè)雙端隊(duì)列,即Deque<ProducerBatch>, 消息寫入緩存時(shí),追加到雙端隊(duì)列的尾部。 Sender讀取消息時(shí),從雙端隊(duì)列的頭部讀取。ProducerBatch是指一個(gè)消息批次;與此同時(shí),會(huì)將較小的ProducerBatch湊成一個(gè)較大ProducerBatch,也可以減少網(wǎng)絡(luò)請(qǐng)求的次數(shù)以提升整體的吞吐量。ProducerBatch大小可以通過batch.size控制,默認(rèn)16kb。Sender線程會(huì)在有數(shù)據(jù)積累到batch.size,默認(rèn)16kb,或者如果數(shù)據(jù)遲遲未達(dá)到batch.size,Sender線程等待linger.ms設(shè)置的時(shí)間到了之后就會(huì)獲取數(shù)據(jù)。linger.ms單位ms,默認(rèn)值是0ms,表示沒有延遲。
Sender從RecordAccumulator獲取緩存的消息之后,會(huì)將數(shù)據(jù)封裝成網(wǎng)絡(luò)請(qǐng)求<Node,Request>的形式,這樣就可以將Request請(qǐng)求發(fā)往各個(gè)Node了。- 請(qǐng)求在從
sender線程發(fā)往Kafka之前還會(huì)保存到InFlightRequests中,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒有收到服務(wù)端響應(yīng)的請(qǐng)求。InFlightRequests默認(rèn)每個(gè)分區(qū)下最多緩存5個(gè)請(qǐng)求,可以通過配置參數(shù)為max.in.flight.request.per. connection修改。 - 請(qǐng)求
Request通過通道Selector發(fā)送到kafka節(jié)點(diǎn)。 - 發(fā)送后,需要等待kafka的應(yīng)答機(jī)制,取決于配置項(xiàng)
acks.
- 0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等待數(shù)據(jù)落盤就應(yīng)答。
- 1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),
Leader收到數(shù)據(jù)后應(yīng)答。 - -1(all):生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader和副本節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。默認(rèn)值是-1,-1 和all 是等價(jià)的。
Request請(qǐng)求接受到kafka的響應(yīng)結(jié)果,如果成功的話,從InFlightRequests清除請(qǐng)求,否則的話需要進(jìn)行重發(fā)操作,可以通過配置項(xiàng)retries決定,當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries表示重試次數(shù)。默認(rèn)是 int 最大值,2147483647。- 清理消息累加器
RecordAccumulator中的數(shù)據(jù)。
kafka消費(fèi)者流程
原來kafka生產(chǎn)者發(fā)送經(jīng)過了這么多流程,我們現(xiàn)在來看看kafka消費(fèi)者又是如何進(jìn)行的呢?
Kafka 中的消費(fèi)是基于拉取模式的。消息的消費(fèi)一般有兩種模式:推送模式和拉取模式。推模式是服務(wù)端主動(dòng)將消息推送給消費(fèi)者,而拉模式是消費(fèi)者主動(dòng)向服務(wù)端發(fā)起請(qǐng)求來拉取消息。
kafka是以消費(fèi)者組進(jìn)行消費(fèi)的,一個(gè)消費(fèi)者組,由多個(gè)consumer組成。形成一個(gè)消費(fèi)者組的條件,是所有消費(fèi)者的groupid相同。

- 消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi)。如果向消費(fèi)組中添加更多的消費(fèi)者,超過主題分區(qū)數(shù)量,則有一部分消費(fèi)者就會(huì)閑置,不會(huì)接收任何消息。
- 消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
那么問題來了,kafka是如何指定消費(fèi)者組的每個(gè)消費(fèi)者消費(fèi)哪個(gè)分區(qū)?每次消費(fèi)的數(shù)量是多少呢?
一、如何制定消費(fèi)方案

- 消費(fèi)者consumerA,consumerB, consumerC向kafka集群中的協(xié)調(diào)器
coordinator發(fā)送JoinGroup的請(qǐng)求。coordinator主要是用來輔助實(shí)現(xiàn)消費(fèi)者組的初始化和分區(qū)的分配。
coordinator老大節(jié)點(diǎn)選擇 =groupid的hashcode值 % 50(__consumer_offsets內(nèi)置主題位移的分區(qū)數(shù)量)例如:groupid的hashcode值 為1,1% 50 = 1,那么__consumer_offsets主題的1號(hào)分區(qū),在哪個(gè)broker上,就選擇這個(gè)節(jié)點(diǎn)的coordinator作為這個(gè)消費(fèi)者組的老大。消費(fèi)者組下的所有的消費(fèi)者提交offset的時(shí)候就往這個(gè)分區(qū)去提交offset。
- 選出一個(gè)
consumer作為消費(fèi)中的leader,比如上圖中的ConsumerB。 - 消費(fèi)者
leader制定出消費(fèi)方案,比如誰(shuí)來消費(fèi)哪個(gè)分區(qū)等 - 把消費(fèi)方案發(fā)給
coordinator - 最后
coordinator就把消費(fèi)方 案下發(fā)給各個(gè)consumer, 圖中只畫了一條線,實(shí)際上是有下發(fā)各個(gè)consumer。
注意,每個(gè)消費(fèi)者都會(huì)和coordinator保持心跳(默認(rèn)3s),一旦超時(shí)(session.timeout.ms=45s),該消費(fèi)者會(huì)被移除,并觸發(fā)再平衡;或者消費(fèi)者處理消息的時(shí)間過長(zhǎng)(max.poll.interval.ms=5分鐘),也會(huì)觸發(fā)再平衡,也就是重新進(jìn)行上面的流程。
二、消費(fèi)者消費(fèi)細(xì)節(jié)
現(xiàn)在已經(jīng)初始化消費(fèi)者組信息,知道哪個(gè)消費(fèi)者消費(fèi)哪個(gè)分區(qū),接著我們來看看消費(fèi)者細(xì)節(jié)。

- 消費(fèi)者創(chuàng)建一個(gè)網(wǎng)絡(luò)連接客戶端
ConsumerNetworkClient, 發(fā)送消費(fèi)請(qǐng)求,可以進(jìn)行如下配置:
fetch.min.bytes: 每批次最小抓取大小,默認(rèn)1字節(jié)fetch.max.bytes: 每批次最大抓取大小,默認(rèn)50Mfetch.max.wait.ms:最大超時(shí)時(shí)間,默認(rèn)500ms
- 發(fā)送請(qǐng)求到kafka集群
- 成功的回調(diào),會(huì)將數(shù)據(jù)保存到
completedFetches隊(duì)列中 - 消費(fèi)者從隊(duì)列中抓取數(shù)據(jù),根據(jù)配置
max.poll.records一次拉取數(shù)據(jù)返回消息的最大條數(shù),默認(rèn)500條。 - 獲取到數(shù)據(jù)后,需要經(jīng)過反序列化器、攔截器等。
kafka的存儲(chǔ)機(jī)制
我們都知道消息發(fā)送到kafka,最終是存儲(chǔ)到磁盤中的,我們看下kafka是如何存儲(chǔ)的。

一個(gè)topic分為多個(gè)partition,每個(gè)partition對(duì)應(yīng)于一個(gè)log文件,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制,每個(gè)partition分為多個(gè)segment。每個(gè)segment包括:“.index”文件、“.log”文件和.timeindex等文件,Producer生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該log文件末端。

上圖中t1即為一個(gè)topic的名稱,而“t1-0/t1-1”則表明這個(gè)目錄是t1這個(gè)topic的哪個(gè)partition。

kafka中的索引文件以稀疏索引(sparseindex)的方式構(gòu)造消息的索引,如下圖所示:

1.根據(jù)目標(biāo)offset定位segment文件
2.找到小于等于目標(biāo)offset的最大offset對(duì)應(yīng)的索引項(xiàng)
3.定位到log文件
4.向下遍歷找到目標(biāo)Record
注意:index為稀疏索引,大約每往log文件寫入4kb數(shù)據(jù),會(huì)往index文件寫入一條索引。通過參數(shù)log.index.interval.bytes控制,默認(rèn)4kb。
那kafka中磁盤文件保存多久呢?
kafka 中默認(rèn)的日志保存時(shí)間為 7 天,可以通過調(diào)整如下參數(shù)修改保存時(shí)間。
log.retention.hours,最低優(yōu)先級(jí)小時(shí),默認(rèn) 7 天。log.retention.minutes,分鐘。log.retention.ms,最高優(yōu)先級(jí)毫秒。log.retention.check.interval.ms,負(fù)責(zé)設(shè)置檢查周期,默認(rèn) 5 分鐘。
總結(jié)
其實(shí)kafka中的細(xì)節(jié)十分多,本文也只是對(duì)kafka的一些核心機(jī)制從理論層面做了一個(gè)總結(jié),更多的細(xì)節(jié)還是需要自行去實(shí)踐,去學(xué)習(xí)。
以上就是8張圖帶你全面了解Java kafka的核心機(jī)制的詳細(xì)內(nèi)容,更多關(guān)于Java kafka核心機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot引入遠(yuǎn)程nacos配置文件錯(cuò)誤的解決方案
本文為解決Spring Cloud Alibaba中配置導(dǎo)入問題,提供了詳細(xì)的步驟說明,包括引入依賴、配置nacos、創(chuàng)建bootstrap.yml文件以及測(cè)試配置導(dǎo)入是否成功的方法,幫助開發(fā)者快速解決相關(guān)問題2024-09-09
哈希表在算法題目中的實(shí)際應(yīng)用詳解(Java)
散列表(Hash?table,也叫哈希表)是根據(jù)關(guān)鍵碼值(Key?value)而直接進(jìn)行訪問的數(shù)據(jù)結(jié)構(gòu),下面這篇文章主要給大家介紹了關(guān)于哈希表在算法題目中的實(shí)際應(yīng)用,文中介紹的方法是Java,需要的朋友可以參考下2024-03-03
Mybatis之通用Mapper動(dòng)態(tài)表名及其原理分析
這篇文章主要介紹了Mybatis之通用Mapper動(dòng)態(tài)表名及其原理分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08
SpringBoot中的@ApiModelProperty注解作用
這篇文章主要介紹了SpringBoot中的@ApiModelProperty注解作用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。2022-01-01
解決@RequestBody使用不能class類型匹配的問題
這篇文章主要介紹了解決@RequestBody使用不能class類型匹配的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07

