欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

8張圖帶你全面了解Java?kafka的核心機(jī)制

 更新時(shí)間:2023年05月17日 10:51:46   作者:JAVA旭陽(yáng)  
kafka是目前企業(yè)中很常用的消息隊(duì)列產(chǎn)品,可以用于削峰、解耦、異步通信,本文就通過(guò)幾張圖帶大家全面認(rèn)識(shí)一下kafka,現(xiàn)在我們不妨帶入kafka設(shè)計(jì)者的角度去思考該如何設(shè)計(jì),它的架構(gòu)是怎么樣的、都有哪些組件組成、如何進(jìn)行擴(kuò)展等等,需要的朋友可以參考下

kafka基礎(chǔ)架構(gòu)

現(xiàn)在假如有100T大小的消息要發(fā)送到kafka中,數(shù)據(jù)量非常大,一臺(tái)機(jī)器存儲(chǔ)不下,面對(duì)這種情況,你該如何設(shè)計(jì)呢?

很簡(jiǎn)單,分而治之,一臺(tái)不夠,那就多臺(tái),這就形成了一個(gè)kafka集群。如下圖所示,一個(gè)broker就是一個(gè)kafka節(jié)點(diǎn),100T數(shù)據(jù)就有3個(gè)節(jié)點(diǎn)分擔(dān),每個(gè)節(jié)點(diǎn)約33T,這樣就能解決問(wèn)題了,還能提高吞吐量。

  • Topic: 可以理解為一個(gè)隊(duì)列,一個(gè)kafka集群中可以定義很多的topic,比如上圖中的topicA。
  • Partition: 為了實(shí)現(xiàn)擴(kuò)展性,提高吞吐量,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。比如上圖中的topicA被分成了3個(gè)partition。
  • Replica: 副本,如果數(shù)據(jù)只放在一個(gè)broker中,萬(wàn)一這個(gè)broker宕機(jī)了怎么辦?為了實(shí)現(xiàn)高可用,一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) Leader 和若干個(gè)Follower。比如上圖中的虛線連接的就是它的副本。
  • Leader: 每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是 Leader
  • Follower: 每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時(shí),某個(gè) Follower 會(huì)成為新的 Leader。
  • Producer: 消息生產(chǎn)者,就是向 Kafka broker發(fā)消息的客戶端,后面詳細(xì)講解。
  • Consumer: 消息消費(fèi)者,向 Kafka broker 取消息的客戶端,多個(gè)Consumer會(huì)組成一個(gè)消費(fèi)者組,后面詳細(xì)講解。
  • Zookeeper:用來(lái)記錄kafka中的一些元數(shù)據(jù),比如kafka集群中的broker,leader是誰(shuí)等等,但Kafka2.8.0版本以后也支持非zk的方式,大大減少了和zk的交互。

kafka生產(chǎn)者流程

前面通過(guò)一張圖片講解了kafka整體的架構(gòu),那現(xiàn)在我們來(lái)看看kafka生產(chǎn)者發(fā)送的整個(gè)過(guò)程,這里面也是大有文章。

在消息發(fā)送的過(guò)程中,涉及到了兩個(gè)線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個(gè)雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulatorSender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker。

  • 在主線程中由 kafkaProducer 創(chuàng)建消息,然后通過(guò)可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator, 也稱為消息收集器)中。
  • 攔截器: 可以用來(lái)在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個(gè)規(guī)則過(guò)濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來(lái)在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計(jì)類工作。
  • 序列化器: 用于在網(wǎng)絡(luò)傳輸中將數(shù)據(jù)序列化為字節(jié)流進(jìn)行傳輸,保證數(shù)據(jù)不會(huì)丟失。
  • 分區(qū)器: 用于按照一定的規(guī)則將數(shù)據(jù)分發(fā)到不同的kafka broker節(jié)點(diǎn)中
  • Sender 線程負(fù)責(zé)從 RecordAccumulator 獲取消息并將其發(fā)送到 Kafka 中。
  • RecordAccumulator 主要用來(lái)緩存消息以便 Sender 線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。
  • RecordAccumulator 緩存的大小可以通過(guò)生產(chǎn)者客戶端參數(shù) buffer.memory 配置,默認(rèn)值為 33554432B ,即 32M。
  • 主線程中發(fā)送過(guò)來(lái)的消息都會(huì)被迫加到 RecordAccumulator 的某個(gè)雙端隊(duì)列( Deque )中,RecordAccumulator 內(nèi)部為每個(gè)分區(qū)都維護(hù)了一個(gè)雙端隊(duì)列,即 Deque<ProducerBatch>, 消息寫(xiě)入緩存時(shí),追加到雙端隊(duì)列的尾部。
  • Sender 讀取消息時(shí),從雙端隊(duì)列的頭部讀取。ProducerBatch 是指一個(gè)消息批次;與此同時(shí),會(huì)將較小的 ProducerBatch 湊成一個(gè)較大 ProducerBatch ,也可以減少網(wǎng)絡(luò)請(qǐng)求的次數(shù)以提升整體的吞吐量。ProducerBatch 大小可以通過(guò)batch.size 控制,默認(rèn)16kb
  • Sender 線程會(huì)在有數(shù)據(jù)積累到batch.size,默認(rèn)16kb,或者如果數(shù)據(jù)遲遲未達(dá)到batch.sizeSender線程等待linger.ms設(shè)置的時(shí)間到了之后就會(huì)獲取數(shù)據(jù)。linger.ms單位ms,默認(rèn)值是0ms,表示沒(méi)有延遲。
  • SenderRecordAccumulator 獲取緩存的消息之后,會(huì)將數(shù)據(jù)封裝成網(wǎng)絡(luò)請(qǐng)求<Node,Request> 的形式,這樣就可以將 Request 請(qǐng)求發(fā)往各個(gè) Node 了。
  • 請(qǐng)求在從 sender 線程發(fā)往 Kafka 之前還會(huì)保存到 InFlightRequests 中,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒(méi)有收到服務(wù)端響應(yīng)的請(qǐng)求。InFlightRequests默認(rèn)每個(gè)分區(qū)下最多緩存5個(gè)請(qǐng)求,可以通過(guò)配置參數(shù)為max.in.flight.request.per. connection修改。
  • 請(qǐng)求Request通過(guò)通道Selector發(fā)送到kafka節(jié)點(diǎn)。
  • 發(fā)送后,需要等待kafka的應(yīng)答機(jī)制,取決于配置項(xiàng)acks.
  • 0:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),不需要等待數(shù)據(jù)落盤(pán)就應(yīng)答。
  • 1:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader 收到數(shù)據(jù)后應(yīng)答。
  • -1(all):生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader和副本節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。默認(rèn)值是-1,-1 和all 是等價(jià)的。
  • Request請(qǐng)求接受到kafka的響應(yīng)結(jié)果,如果成功的話,從InFlightRequests 清除請(qǐng)求,否則的話需要進(jìn)行重發(fā)操作,可以通過(guò)配置項(xiàng)retries決定,當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries表示重試次數(shù)。默認(rèn)是 int 最大值,2147483647。
  • 清理消息累加器RecordAccumulator 中的數(shù)據(jù)。

kafka消費(fèi)者流程

原來(lái)kafka生產(chǎn)者發(fā)送經(jīng)過(guò)了這么多流程,我們現(xiàn)在來(lái)看看kafka消費(fèi)者又是如何進(jìn)行的呢?

Kafka 中的消費(fèi)是基于拉取模式的。消息的消費(fèi)一般有兩種模式:推送模式和拉取模式。推模式是服務(wù)端主動(dòng)將消息推送給消費(fèi)者,而拉模式是消費(fèi)者主動(dòng)向服務(wù)端發(fā)起請(qǐng)求來(lái)拉取消息。

kafka是以消費(fèi)者組進(jìn)行消費(fèi)的,一個(gè)消費(fèi)者組,由多個(gè)consumer組成。形成一個(gè)消費(fèi)者組的條件,是所有消費(fèi)者的groupid相同。

  • 消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi)。如果向消費(fèi)組中添加更多的消費(fèi)者,超過(guò)主題分區(qū)數(shù)量,則有一部分消費(fèi)者就會(huì)閑置,不會(huì)接收任何消息。
  • 消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。

那么問(wèn)題來(lái)了,kafka是如何指定消費(fèi)者組的每個(gè)消費(fèi)者消費(fèi)哪個(gè)分區(qū)?每次消費(fèi)的數(shù)量是多少呢?

一、如何制定消費(fèi)方案

  • 消費(fèi)者consumerA,consumerB, consumerC向kafka集群中的協(xié)調(diào)器coordinator發(fā)送JoinGroup的請(qǐng)求。coordinator主要是用來(lái)輔助實(shí)現(xiàn)消費(fèi)者組的初始化和分區(qū)的分配。
  • coordinator老大節(jié)點(diǎn)選擇 = groupidhashcode值 % 50( __consumer_offsets內(nèi)置主題位移的分區(qū)數(shù)量)例如: groupid的hashcode值 為1,1% 50 = 1,那么__consumer_offsets 主題的1號(hào)分區(qū),在哪個(gè)broker上,就選擇這個(gè)節(jié)點(diǎn)的coordinator作為這個(gè)消費(fèi)者組的老大。消費(fèi)者組下的所有的消費(fèi)者提交offset的時(shí)候就往這個(gè)分區(qū)去提交offset。
  • 選出一個(gè) consumer作為消費(fèi)中的leader,比如上圖中的ConsumerB。
  • 消費(fèi)者leader制定出消費(fèi)方案,比如誰(shuí)來(lái)消費(fèi)哪個(gè)分區(qū)等
  • 把消費(fèi)方案發(fā)給coordinator
  • 最后coordinator就把消費(fèi)方 案下發(fā)給各個(gè)consumer, 圖中只畫(huà)了一條線,實(shí)際上是有下發(fā)各個(gè)consumer。

注意,每個(gè)消費(fèi)者都會(huì)和coordinator保持心跳(默認(rèn)3s),一旦超時(shí)(session.timeout.ms=45s),該消費(fèi)者會(huì)被移除,并觸發(fā)再平衡;或者消費(fèi)者處理消息的時(shí)間過(guò)長(zhǎng)(max.poll.interval.ms=5分鐘),也會(huì)觸發(fā)再平衡,也就是重新進(jìn)行上面的流程。

二、消費(fèi)者消費(fèi)細(xì)節(jié)

現(xiàn)在已經(jīng)初始化消費(fèi)者組信息,知道哪個(gè)消費(fèi)者消費(fèi)哪個(gè)分區(qū),接著我們來(lái)看看消費(fèi)者細(xì)節(jié)。

  • 消費(fèi)者創(chuàng)建一個(gè)網(wǎng)絡(luò)連接客戶端ConsumerNetworkClient, 發(fā)送消費(fèi)請(qǐng)求,可以進(jìn)行如下配置:
  • fetch.min.bytes: 每批次最小抓取大小,默認(rèn)1字節(jié)
  • fetch.max.bytes: 每批次最大抓取大小,默認(rèn)50M
  • fetch.max.wait.ms:最大超時(shí)時(shí)間,默認(rèn)500ms
  • 發(fā)送請(qǐng)求到kafka集群
  • 成功的回調(diào),會(huì)將數(shù)據(jù)保存到completedFetches隊(duì)列中
  • 消費(fèi)者從隊(duì)列中抓取數(shù)據(jù),根據(jù)配置max.poll.records一次拉取數(shù)據(jù)返回消息的最大條數(shù),默認(rèn)500條。
  • 獲取到數(shù)據(jù)后,需要經(jīng)過(guò)反序列化器、攔截器等。

kafka的存儲(chǔ)機(jī)制

我們都知道消息發(fā)送到kafka,最終是存儲(chǔ)到磁盤(pán)中的,我們看下kafka是如何存儲(chǔ)的。

一個(gè)topic分為多個(gè)partition,每個(gè)partition對(duì)應(yīng)于一個(gè)log文件,為防止log文件過(guò)大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制,每個(gè)partition分為多個(gè)segment。每個(gè)segment包括:“.index”文件、“.log”文件和.timeindex等文件,Producer生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該log文件末端。

上圖中t1即為一個(gè)topic的名稱,而“t1-0/t1-1”則表明這個(gè)目錄是t1這個(gè)topic的哪個(gè)partition

kafka中的索引文件以稀疏索引(sparseindex)的方式構(gòu)造消息的索引,如下圖所示:

1.根據(jù)目標(biāo)offset定位segment文件

2.找到小于等于目標(biāo)offset的最大offset對(duì)應(yīng)的索引項(xiàng)

3.定位到log文件

4.向下遍歷找到目標(biāo)Record

注意:index為稀疏索引,大約每往log文件寫(xiě)入4kb數(shù)據(jù),會(huì)往index文件寫(xiě)入一條索引。通過(guò)參數(shù)log.index.interval.bytes控制,默認(rèn)4kb

那kafka中磁盤(pán)文件保存多久呢?

kafka 中默認(rèn)的日志保存時(shí)間為 7 天,可以通過(guò)調(diào)整如下參數(shù)修改保存時(shí)間。

  • log.retention.hours,最低優(yōu)先級(jí)小時(shí),默認(rèn) 7 天。
  • log.retention.minutes,分鐘。
  • log.retention.ms,最高優(yōu)先級(jí)毫秒。
  • log.retention.check.interval.ms,負(fù)責(zé)設(shè)置檢查周期,默認(rèn) 5 分鐘。

總結(jié)

其實(shí)kafka中的細(xì)節(jié)十分多,本文也只是對(duì)kafka的一些核心機(jī)制從理論層面做了一個(gè)總結(jié),更多的細(xì)節(jié)還是需要自行去實(shí)踐,去學(xué)習(xí)。

以上就是8張圖帶你全面了解Java kafka的核心機(jī)制的詳細(xì)內(nèi)容,更多關(guān)于Java kafka核心機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • springboot引入遠(yuǎn)程nacos配置文件錯(cuò)誤的解決方案

    springboot引入遠(yuǎn)程nacos配置文件錯(cuò)誤的解決方案

    本文為解決Spring Cloud Alibaba中配置導(dǎo)入問(wèn)題,提供了詳細(xì)的步驟說(shuō)明,包括引入依賴、配置nacos、創(chuàng)建bootstrap.yml文件以及測(cè)試配置導(dǎo)入是否成功的方法,幫助開(kāi)發(fā)者快速解決相關(guān)問(wèn)題
    2024-09-09
  • Java自定義類加載器實(shí)現(xiàn)類隔離詳解

    Java自定義類加載器實(shí)現(xiàn)類隔離詳解

    由于每種組件的不同版本所依賴的jar包不同,我們可以借鑒tomcat的實(shí)現(xiàn)方式,通過(guò)自定義類加載器打破雙親委派機(jī)制來(lái)實(shí)現(xiàn)類隔離,從而達(dá)到操作多組件多版本的目的。本文就來(lái)和大家詳細(xì)聊聊實(shí)現(xiàn)方法
    2023-03-03
  • 哈希表在算法題目中的實(shí)際應(yīng)用詳解(Java)

    哈希表在算法題目中的實(shí)際應(yīng)用詳解(Java)

    散列表(Hash?table,也叫哈希表)是根據(jù)關(guān)鍵碼值(Key?value)而直接進(jìn)行訪問(wèn)的數(shù)據(jù)結(jié)構(gòu),下面這篇文章主要給大家介紹了關(guān)于哈希表在算法題目中的實(shí)際應(yīng)用,文中介紹的方法是Java,需要的朋友可以參考下
    2024-03-03
  • SpringBoot彩色日志配置方式

    SpringBoot彩色日志配置方式

    這篇文章主要介紹了SpringBoot彩色日志配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • Mybatis之通用Mapper動(dòng)態(tài)表名及其原理分析

    Mybatis之通用Mapper動(dòng)態(tài)表名及其原理分析

    這篇文章主要介紹了Mybatis之通用Mapper動(dòng)態(tài)表名及其原理分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • java的泛型你真的了解嗎

    java的泛型你真的了解嗎

    這篇文章主要為大家詳細(xì)介紹了java的泛型,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-03-03
  • SpringBoot中的@ApiModelProperty注解作用

    SpringBoot中的@ApiModelProperty注解作用

    這篇文章主要介紹了SpringBoot中的@ApiModelProperty注解作用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。
    2022-01-01
  • Spring中propagation的傳播機(jī)制詳解

    Spring中propagation的傳播機(jī)制詳解

    這篇文章主要介紹了Spring中propagation的傳播機(jī)制詳解,要搞懂事務(wù)的傳播機(jī)制,那么就要明白邏輯事務(wù)中各個(gè)事務(wù)的關(guān)系,才能徹底理解事務(wù)傳播特性,在Spring事務(wù)中,各個(gè)邏輯事務(wù)的關(guān)系可以是并列、覆蓋或包含,需要的朋友可以參考下
    2023-12-12
  • java 求解二維數(shù)組列最小值

    java 求解二維數(shù)組列最小值

    這篇文章主要介紹了java 求解二維數(shù)組列最小值的相關(guān)資料,需要的朋友可以參考下
    2017-05-05
  • 解決@RequestBody使用不能class類型匹配的問(wèn)題

    解決@RequestBody使用不能class類型匹配的問(wèn)題

    這篇文章主要介紹了解決@RequestBody使用不能class類型匹配的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07

最新評(píng)論