8張圖帶你全面了解Java?kafka的核心機(jī)制
kafka基礎(chǔ)架構(gòu)
現(xiàn)在假如有100T大小的消息要發(fā)送到kafka中,數(shù)據(jù)量非常大,一臺(tái)機(jī)器存儲(chǔ)不下,面對(duì)這種情況,你該如何設(shè)計(jì)呢?
很簡(jiǎn)單,分而治之,一臺(tái)不夠,那就多臺(tái),這就形成了一個(gè)kafka集群。如下圖所示,一個(gè)broker就是一個(gè)kafka節(jié)點(diǎn),100T數(shù)據(jù)就有3個(gè)節(jié)點(diǎn)分擔(dān),每個(gè)節(jié)點(diǎn)約33T,這樣就能解決問(wèn)題了,還能提高吞吐量。
- Topic: 可以理解為一個(gè)隊(duì)列,一個(gè)kafka集群中可以定義很多的topic,比如上圖中的
topicA
。 - Partition: 為了實(shí)現(xiàn)擴(kuò)展性,提高吞吐量,一個(gè)非常大的
topic
可以分布到多個(gè)broker
(即服務(wù)器)上,一個(gè)topic
可以分為多個(gè)partition
,每個(gè)partition
是一個(gè)有序的隊(duì)列。比如上圖中的topicA被分成了3個(gè)partition
。 - Replica: 副本,如果數(shù)據(jù)只放在一個(gè)
broker
中,萬(wàn)一這個(gè)broker
宕機(jī)了怎么辦?為了實(shí)現(xiàn)高可用,一個(gè)topic
的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè)Leader
和若干個(gè)Follower
。比如上圖中的虛線連接的就是它的副本。 - Leader: 每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是
Leader
。 - Follower: 每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從
Leader
中同步數(shù)據(jù),保持和Leader
數(shù)據(jù)的同步。Leader
發(fā)生故障時(shí),某個(gè)Follower
會(huì)成為新的Leader
。 - Producer: 消息生產(chǎn)者,就是向
Kafka broker
發(fā)消息的客戶端,后面詳細(xì)講解。 - Consumer: 消息消費(fèi)者,向
Kafka broker
取消息的客戶端,多個(gè)Consumer
會(huì)組成一個(gè)消費(fèi)者組,后面詳細(xì)講解。 - Zookeeper:用來(lái)記錄kafka中的一些元數(shù)據(jù),比如kafka集群中的broker,leader是誰(shuí)等等,但
Kafka
2.8.0版本以后也支持非zk的方式,大大減少了和zk的交互。
kafka生產(chǎn)者流程
前面通過(guò)一張圖片講解了kafka整體的架構(gòu),那現(xiàn)在我們來(lái)看看kafka生產(chǎn)者發(fā)送的整個(gè)過(guò)程,這里面也是大有文章。
在消息發(fā)送的過(guò)程中,涉及到了兩個(gè)線程——main
線程和 Sender
線程。在 main
線程中創(chuàng)建了一個(gè)雙端隊(duì)列 RecordAccumulator
。main
線程將消息發(fā)送給 RecordAccumulator
,Sender
線程不斷從 RecordAccumulator
中拉取消息發(fā)送到 Kafka Broker
。
- 在主線程中由
kafkaProducer
創(chuàng)建消息,然后通過(guò)可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator
, 也稱為消息收集器)中。
- 攔截器: 可以用來(lái)在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個(gè)規(guī)則過(guò)濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來(lái)在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計(jì)類工作。
- 序列化器: 用于在網(wǎng)絡(luò)傳輸中將數(shù)據(jù)序列化為字節(jié)流進(jìn)行傳輸,保證數(shù)據(jù)不會(huì)丟失。
- 分區(qū)器: 用于按照一定的規(guī)則將數(shù)據(jù)分發(fā)到不同的kafka broker節(jié)點(diǎn)中
Sender
線程負(fù)責(zé)從RecordAccumulator
獲取消息并將其發(fā)送到Kafka
中。
RecordAccumulator
主要用來(lái)緩存消息以便Sender
線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。
RecordAccumulator
緩存的大小可以通過(guò)生產(chǎn)者客戶端參數(shù)buffer.memory
配置,默認(rèn)值為33554432B
,即32M
。
- 主線程中發(fā)送過(guò)來(lái)的消息都會(huì)被迫加到
RecordAccumulator
的某個(gè)雙端隊(duì)列(Deque
)中,RecordAccumulator
內(nèi)部為每個(gè)分區(qū)都維護(hù)了一個(gè)雙端隊(duì)列,即Deque<ProducerBatch>
, 消息寫(xiě)入緩存時(shí),追加到雙端隊(duì)列的尾部。 Sender
讀取消息時(shí),從雙端隊(duì)列的頭部讀取。ProducerBatch
是指一個(gè)消息批次;與此同時(shí),會(huì)將較小的ProducerBatch
湊成一個(gè)較大ProducerBatch
,也可以減少網(wǎng)絡(luò)請(qǐng)求的次數(shù)以提升整體的吞吐量。ProducerBatch
大小可以通過(guò)batch.size
控制,默認(rèn)16kb
。Sender
線程會(huì)在有數(shù)據(jù)積累到batch.size
,默認(rèn)16kb,或者如果數(shù)據(jù)遲遲未達(dá)到batch.size
,Sender
線程等待linger.ms
設(shè)置的時(shí)間到了之后就會(huì)獲取數(shù)據(jù)。linger.ms
單位ms
,默認(rèn)值是0ms
,表示沒(méi)有延遲。
Sender
從RecordAccumulator
獲取緩存的消息之后,會(huì)將數(shù)據(jù)封裝成網(wǎng)絡(luò)請(qǐng)求<Node,Request>
的形式,這樣就可以將Request
請(qǐng)求發(fā)往各個(gè)Node
了。- 請(qǐng)求在從
sender
線程發(fā)往Kafka
之前還會(huì)保存到InFlightRequests
中,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒(méi)有收到服務(wù)端響應(yīng)的請(qǐng)求。InFlightRequests
默認(rèn)每個(gè)分區(qū)下最多緩存5個(gè)請(qǐng)求,可以通過(guò)配置參數(shù)為max.in.flight.request.per. connection
修改。 - 請(qǐng)求
Request
通過(guò)通道Selector
發(fā)送到kafka
節(jié)點(diǎn)。 - 發(fā)送后,需要等待kafka的應(yīng)答機(jī)制,取決于配置項(xiàng)
acks
.
- 0:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),不需要等待數(shù)據(jù)落盤(pán)就應(yīng)答。
- 1:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),
Leader
收到數(shù)據(jù)后應(yīng)答。 - -1(all):生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader和副本節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。默認(rèn)值是-1,-1 和all 是等價(jià)的。
Request
請(qǐng)求接受到kafka的響應(yīng)結(jié)果,如果成功的話,從InFlightRequests
清除請(qǐng)求,否則的話需要進(jìn)行重發(fā)操作,可以通過(guò)配置項(xiàng)retries
決定,當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries
表示重試次數(shù)。默認(rèn)是 int 最大值,2147483647
。- 清理消息累加器
RecordAccumulator
中的數(shù)據(jù)。
kafka消費(fèi)者流程
原來(lái)kafka生產(chǎn)者發(fā)送經(jīng)過(guò)了這么多流程,我們現(xiàn)在來(lái)看看kafka消費(fèi)者又是如何進(jìn)行的呢?
Kafka 中的消費(fèi)是基于拉取模式的。消息的消費(fèi)一般有兩種模式:推送模式和拉取模式。推模式是服務(wù)端主動(dòng)將消息推送給消費(fèi)者,而拉模式是消費(fèi)者主動(dòng)向服務(wù)端發(fā)起請(qǐng)求來(lái)拉取消息。
kafka是以消費(fèi)者組進(jìn)行消費(fèi)的,一個(gè)消費(fèi)者組,由多個(gè)consumer組成。形成一個(gè)消費(fèi)者組的條件,是所有消費(fèi)者的groupid相同。
- 消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi)。如果向消費(fèi)組中添加更多的消費(fèi)者,超過(guò)主題分區(qū)數(shù)量,則有一部分消費(fèi)者就會(huì)閑置,不會(huì)接收任何消息。
- 消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
那么問(wèn)題來(lái)了,kafka是如何指定消費(fèi)者組的每個(gè)消費(fèi)者消費(fèi)哪個(gè)分區(qū)?每次消費(fèi)的數(shù)量是多少呢?
一、如何制定消費(fèi)方案
- 消費(fèi)者consumerA,consumerB, consumerC向kafka集群中的協(xié)調(diào)器
coordinator
發(fā)送JoinGroup
的請(qǐng)求。coordinator
主要是用來(lái)輔助實(shí)現(xiàn)消費(fèi)者組的初始化和分區(qū)的分配。
coordinator
老大節(jié)點(diǎn)選擇 =groupid
的hashcode
值 % 50(__consumer_offsets
內(nèi)置主題位移的分區(qū)數(shù)量)例如:groupid
的hashcode值 為1,1% 50 = 1
,那么__consumer_offsets
主題的1號(hào)分區(qū),在哪個(gè)broker
上,就選擇這個(gè)節(jié)點(diǎn)的coordinator
作為這個(gè)消費(fèi)者組的老大。消費(fèi)者組下的所有的消費(fèi)者提交offset
的時(shí)候就往這個(gè)分區(qū)去提交offset
。
- 選出一個(gè)
consumer
作為消費(fèi)中的leader
,比如上圖中的ConsumerB
。 - 消費(fèi)者
leader
制定出消費(fèi)方案,比如誰(shuí)來(lái)消費(fèi)哪個(gè)分區(qū)等 - 把消費(fèi)方案發(fā)給
coordinator
- 最后
coordinator
就把消費(fèi)方 案下發(fā)給各個(gè)consumer
, 圖中只畫(huà)了一條線,實(shí)際上是有下發(fā)各個(gè)consumer
。
注意,每個(gè)消費(fèi)者都會(huì)和coordinator
保持心跳(默認(rèn)3s),一旦超時(shí)(session.timeout.ms=45s
),該消費(fèi)者會(huì)被移除,并觸發(fā)再平衡;或者消費(fèi)者處理消息的時(shí)間過(guò)長(zhǎng)(max.poll.interval.ms
=5分鐘),也會(huì)觸發(fā)再平衡,也就是重新進(jìn)行上面的流程。
二、消費(fèi)者消費(fèi)細(xì)節(jié)
現(xiàn)在已經(jīng)初始化消費(fèi)者組信息,知道哪個(gè)消費(fèi)者消費(fèi)哪個(gè)分區(qū),接著我們來(lái)看看消費(fèi)者細(xì)節(jié)。
- 消費(fèi)者創(chuàng)建一個(gè)網(wǎng)絡(luò)連接客戶端
ConsumerNetworkClient
, 發(fā)送消費(fèi)請(qǐng)求,可以進(jìn)行如下配置:
fetch.min.bytes
: 每批次最小抓取大小,默認(rèn)1字節(jié)fetch.max.bytes
: 每批次最大抓取大小,默認(rèn)50Mfetch.max.wait.ms
:最大超時(shí)時(shí)間,默認(rèn)500ms
- 發(fā)送請(qǐng)求到kafka集群
- 成功的回調(diào),會(huì)將數(shù)據(jù)保存到
completedFetches
隊(duì)列中 - 消費(fèi)者從隊(duì)列中抓取數(shù)據(jù),根據(jù)配置
max.poll.records
一次拉取數(shù)據(jù)返回消息的最大條數(shù),默認(rèn)500條。 - 獲取到數(shù)據(jù)后,需要經(jīng)過(guò)反序列化器、攔截器等。
kafka的存儲(chǔ)機(jī)制
我們都知道消息發(fā)送到kafka,最終是存儲(chǔ)到磁盤(pán)中的,我們看下kafka是如何存儲(chǔ)的。
一個(gè)topic
分為多個(gè)partition
,每個(gè)partition對(duì)應(yīng)于一個(gè)log
文件,為防止log文件過(guò)大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制,每個(gè)partition
分為多個(gè)segment
。每個(gè)segment
包括:“.index
”文件、“.log
”文件和.timeindex
等文件,Producer
生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該log文件末端。
上圖中t1即為一個(gè)topic
的名稱,而“t1-0/t1-1”則表明這個(gè)目錄是t1這個(gè)topic
的哪個(gè)partition
。
kafka中的索引文件以稀疏索引(sparseindex
)的方式構(gòu)造消息的索引,如下圖所示:
1.根據(jù)目標(biāo)offset
定位segment
文件
2.找到小于等于目標(biāo)offset
的最大offset
對(duì)應(yīng)的索引項(xiàng)
3.定位到log
文件
4.向下遍歷找到目標(biāo)Record
注意:index為稀疏索引,大約每往log
文件寫(xiě)入4kb
數(shù)據(jù),會(huì)往index
文件寫(xiě)入一條索引。通過(guò)參數(shù)log.index.interval.bytes
控制,默認(rèn)4kb
。
那kafka中磁盤(pán)文件保存多久呢?
kafka 中默認(rèn)的日志保存時(shí)間為 7 天,可以通過(guò)調(diào)整如下參數(shù)修改保存時(shí)間。
log.retention.hours
,最低優(yōu)先級(jí)小時(shí),默認(rèn) 7 天。log.retention.minutes
,分鐘。log.retention.ms
,最高優(yōu)先級(jí)毫秒。log.retention.check.interval.ms
,負(fù)責(zé)設(shè)置檢查周期,默認(rèn) 5 分鐘。
總結(jié)
其實(shí)kafka中的細(xì)節(jié)十分多,本文也只是對(duì)kafka的一些核心機(jī)制從理論層面做了一個(gè)總結(jié),更多的細(xì)節(jié)還是需要自行去實(shí)踐,去學(xué)習(xí)。
以上就是8張圖帶你全面了解Java kafka的核心機(jī)制的詳細(xì)內(nèi)容,更多關(guān)于Java kafka核心機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot引入遠(yuǎn)程nacos配置文件錯(cuò)誤的解決方案
本文為解決Spring Cloud Alibaba中配置導(dǎo)入問(wèn)題,提供了詳細(xì)的步驟說(shuō)明,包括引入依賴、配置nacos、創(chuàng)建bootstrap.yml文件以及測(cè)試配置導(dǎo)入是否成功的方法,幫助開(kāi)發(fā)者快速解決相關(guān)問(wèn)題2024-09-09哈希表在算法題目中的實(shí)際應(yīng)用詳解(Java)
散列表(Hash?table,也叫哈希表)是根據(jù)關(guān)鍵碼值(Key?value)而直接進(jìn)行訪問(wèn)的數(shù)據(jù)結(jié)構(gòu),下面這篇文章主要給大家介紹了關(guān)于哈希表在算法題目中的實(shí)際應(yīng)用,文中介紹的方法是Java,需要的朋友可以參考下2024-03-03Mybatis之通用Mapper動(dòng)態(tài)表名及其原理分析
這篇文章主要介紹了Mybatis之通用Mapper動(dòng)態(tài)表名及其原理分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08SpringBoot中的@ApiModelProperty注解作用
這篇文章主要介紹了SpringBoot中的@ApiModelProperty注解作用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。2022-01-01解決@RequestBody使用不能class類型匹配的問(wèn)題
這篇文章主要介紹了解決@RequestBody使用不能class類型匹配的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07