欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解

 更新時(shí)間:2023年10月11日 09:22:55   作者:金甲蟲Scarb  
這篇文章主要介紹了RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解,RocketMQ是一款高性能、高可靠性的分布式消息中間件,消費(fèi)者是RocketMQ中的重要組成部分,消費(fèi)者負(fù)責(zé)從消息隊(duì)列中獲取消息并進(jìn)行處理,需要的朋友可以參考下

1. 背景

RocketMQ 的消費(fèi)可以算是 RocketMQ 的業(yè)務(wù)邏輯中最復(fù)雜的一塊。這里面涉及到許多消費(fèi)模式和特性。本想一篇文章寫完,寫到后面發(fā)現(xiàn)消費(fèi)涉及到的內(nèi)容太多,于是決定分多篇來寫。本文作為消費(fèi)系列的第一篇,主要講述 RocketMQ 消費(fèi)涉及到的模式和特性,也會概括性地講一下消費(fèi)流程。

我將 RocketMQ 的消費(fèi)流程大致分成 4 個(gè)步驟

重平衡消費(fèi)者拉取消息Broker 接收拉取請求后從存儲中查詢消息并返回消費(fèi)者消費(fèi)消息

每個(gè)步驟都會用一篇文章來講解。

先了解一下 RocketMQ 消費(fèi)涉及到地概念

2. 概念簡述

2.1 消費(fèi)組概念與消費(fèi)模式

和大多數(shù)消息隊(duì)列一樣,RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。在了解它們之前,需要先引入消費(fèi)組的概念。

2.1.1 消費(fèi)組

一個(gè)消費(fèi)者實(shí)例即是一個(gè)消費(fèi)者進(jìn)程,負(fù)責(zé)消費(fèi)消息。單個(gè)消費(fèi)者速度有限,在實(shí)際使用中通常會采用多個(gè)消費(fèi)者共同消費(fèi)同樣的 Topic 以加快消費(fèi)速度。這多個(gè)消費(fèi)同樣 Topic 的消費(fèi)者組成了消費(fèi)者組。

消費(fèi)組是一個(gè)邏輯概念,它包含了多個(gè)同一類的消費(fèi)者實(shí)例,通常這些消費(fèi)者都消費(fèi)同一類消息(都消費(fèi)相同的 Topic)且消費(fèi)邏輯一致。

消費(fèi)組的引入是用來在消費(fèi)消息時(shí)更好地進(jìn)行負(fù)載均衡和容錯(cuò)。

2.1.2 廣播消費(fèi)模式(BROADCASTING)

廣播消費(fèi)模式即全部的消息會廣播分發(fā)到所有的消費(fèi)者實(shí)例,每個(gè)消費(fèi)者實(shí)例會收到全量的消息(即便消費(fèi)組中有多個(gè)消費(fèi)者都訂閱同一 Topic)。

如下圖所示,生產(chǎn)者發(fā)送了 5 條消息,每個(gè)消費(fèi)組中的消費(fèi)者都收到全部的 5 條消息。

廣播模式使用較少,適合各個(gè)消費(fèi)者都需要通知的場景,如刷新應(yīng)用中的緩存。

廣播消費(fèi)模式

注意事項(xiàng):

  1. 廣播消費(fèi)模式下不支持 順序消息。
  2. 廣播消費(fèi)模式下不支持 重置消費(fèi)位點(diǎn)。
  3. 每條消息都需要被相同訂閱邏輯的多臺機(jī)器處理。
  4. 消費(fèi)進(jìn)度在客戶端維護(hù),出現(xiàn)重復(fù)消費(fèi)的概率稍大于集群模式。如果消費(fèi)進(jìn)度文件丟失,存在消息丟失的可能。
  5. 廣播模式下,消息隊(duì)列 RocketMQ 版保證每條消息至少被每臺客戶端消費(fèi)一次,但是并不會重投消費(fèi)失敗的消息,因此業(yè)務(wù)方需要關(guān)注消費(fèi)失敗的情況。
  6. 廣播模式下,客戶端每一次重啟都會從最新消息消費(fèi)??蛻舳嗽诒煌V蛊陂g發(fā)送至服務(wù)端的消息將會被自動(dòng)跳過,請謹(jǐn)慎選擇。
  7. 廣播模式下,每條消息都會被大量的客戶端重復(fù)處理,因此推薦盡可能使用集群模式。
  8. 廣播模式下服務(wù)端不維護(hù)消費(fèi)進(jìn)度,所以消息隊(duì)列 RocketMQ 版控制臺不支持消息堆積查詢、消息堆積報(bào)警和訂閱關(guān)系查詢功能。

2.1.3 集群消費(fèi)模式(CLUSTERING)

集群消費(fèi)模式下,同一 Topic 下的一條消息只會被同一消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi)。也就是說,消息被負(fù)載均衡到了同一個(gè)消費(fèi)組的多個(gè)消費(fèi)者實(shí)例上。

更具體一點(diǎn),在同一消費(fèi)組中的不同消費(fèi)者會根據(jù)負(fù)載機(jī)制來平均地訂閱 Topic 中的每個(gè) Queue。(默認(rèn) AVG 負(fù)載方式)

廣播消費(fèi)模式

RocketMQ 默認(rèn)使用集群消費(fèi)模式,這也是大部分場景下會使用到的消費(fèi)模式。

2.2 消費(fèi)者拉取消息模式

2.2.1 Pull

指消費(fèi)者主動(dòng)拉取消息進(jìn)行消費(fèi),主動(dòng)從 Broker 拉取消息,主動(dòng)權(quán)由消費(fèi)者應(yīng)用控制。

2.2.2 Push

Broker 主動(dòng)將消息 Push 給消費(fèi)者,Broker 收到消息就會主動(dòng)推送到消費(fèi)者端。該模式的消費(fèi)實(shí)時(shí)性較高,也是主流場景中普遍采用的消費(fèi)形式。

消費(fèi)者組中的消費(fèi)者實(shí)例會根據(jù)預(yù)設(shè)的負(fù)載均衡算法對 Topic 中的 Queue 進(jìn)行均勻的訂閱,每個(gè) Queue 最多只能被一個(gè)消費(fèi)者訂閱。

在 RocketMQ 中,Push 消費(fèi)其實(shí)也是由 Pull 消費(fèi)(拉?。?shí)現(xiàn)。Push 消費(fèi)只是通過客戶端 API 層面的封裝讓用戶感覺像是 Broker 在推送消息給消費(fèi)者。

2.2.3 POP

RocketMQ 5.0 引入的新消費(fèi)形式,是 Pull 拉取的另一種實(shí)現(xiàn)。也可以在 Push 模式下使用 POP 拉取消息,甚至可以和 Push 模式共同使用(分別消費(fèi)重試 Topic 和普通 Topic)。

POP 與 Pull 可以通過一個(gè)開關(guān)實(shí)時(shí)進(jìn)行切換。POP 模式下,Broker 來控制每個(gè)消費(fèi)者消費(fèi)的隊(duì)列和拉取的消息,把重平衡邏輯從客戶端移到了服務(wù)端。

主要解決了原來 Push 模式消費(fèi)的以下痛點(diǎn):

富客戶端:客戶端邏輯比較重,多語言支持不友好隊(duì)列獨(dú)占:Topic 中的一個(gè) Queue 最多只能被 1 個(gè) Push 消費(fèi)者消費(fèi),消費(fèi)者數(shù)量無法無限擴(kuò)展。且消費(fèi)者 hang 住時(shí)該隊(duì)列的消息會堆積。消費(fèi)后更新 offset:本地消費(fèi)成功才會提交 offset

RocketMQ 5.0 的輕量化 gRPC 客戶端就是基于 POP 消費(fèi)模式開發(fā)

2.3 隊(duì)列負(fù)載機(jī)制與重平衡

在集群消費(fèi)模式下,消費(fèi)組中的消費(fèi)者共同消費(fèi)訂閱的 Topic 中的所有消息,這里就存在 Topic 中的隊(duì)列如何分配給消費(fèi)者的問題。

2.3.1 隊(duì)列負(fù)載機(jī)制

RocketMQ Broker 中的隊(duì)列負(fù)載機(jī)制將一個(gè) Topic 的不同隊(duì)列按照算法盡可能平均地分配給消費(fèi)者組中的所有消費(fèi)者。RocketMQ 預(yù)設(shè)了多種負(fù)載算法供不同場景下的消費(fèi)。

AVG:將隊(duì)列按數(shù)量平均分配給多個(gè)消費(fèi)者,按 Broker 順序先分配第一個(gè) Broker 的所有隊(duì)列給第一個(gè)消費(fèi)者,然后給第二個(gè)。

AVG_BY_CIRCLE:將 Broker 上的隊(duì)列輪流分給不同消費(fèi)者,更適用于 Topic 在不同 Broker 之間分布不均勻的情況。

默認(rèn)采用 AVG 負(fù)載方式。

2.3.2 重平衡(Rebalance)

為消費(fèi)者分配隊(duì)列消費(fèi)的這一個(gè)負(fù)載過程并不是一勞永逸的,比如當(dāng)消費(fèi)者數(shù)量變化、Broker 掉線等情況發(fā)生后,原先的負(fù)載就變得不再均衡,此時(shí)就需要重新進(jìn)行負(fù)載均衡,這一過程被稱為重平衡機(jī)制。

每隔 20s,RocketMQ 會進(jìn)行一次檢查,檢查隊(duì)列數(shù)量、消費(fèi)者數(shù)量是否發(fā)生變化,如果變化則觸發(fā)消費(fèi)隊(duì)列重平衡,重新執(zhí)行上述負(fù)載算法。

2.4 消費(fèi)端高可靠

2.4.1 重試-死信機(jī)制

在實(shí)際使用中,消息的消費(fèi)可能出現(xiàn)失敗。RocketMQ 擁有重試機(jī)制和死信機(jī)制來保證消息消費(fèi)的可靠性。

正常消費(fèi):消費(fèi)成功則提交消費(fèi)位點(diǎn)

重試機(jī)制:如果正常消費(fèi)失敗,消息會被消費(fèi)者發(fā)回 Broker,放入重試 Topic: %RETRY%消費(fèi)者組 。最多重試消費(fèi) 16 次,重試的時(shí)間間隔逐漸變長。(消費(fèi)者組會自動(dòng)訂閱重試 Topic)。

這里地延遲重試采用了 RocketMQ 的延遲消息,重試的 16 次時(shí)間間隔為延遲消息配置的每個(gè)延遲等級的時(shí)間(從第三個(gè)等級開始)。如果修改延遲等級時(shí)間的配置,重試的時(shí)間間隔也會相應(yīng)發(fā)生變化。但即便延遲等級時(shí)間間隔配置不足 16 個(gè),仍會重試 16 次,后面按照最大的時(shí)間間隔來重試。

死信機(jī)制:如果正常消費(fèi)和重試 16 次均失敗,消息會保存到死信 Topic %DLQ%消費(fèi)者組 中,此時(shí)需人工介入處理

2.4.2 隊(duì)列負(fù)載機(jī)制與重平衡

當(dāng)發(fā)生 Broker 掛掉或者消費(fèi)者掛掉時(shí),會引發(fā)重平衡,可以自動(dòng)感知有組件掛掉的情況并重新調(diào)整消費(fèi)者的訂閱關(guān)系。

2.5 并發(fā)消費(fèi)與順序消費(fèi)

在消費(fèi)者客戶端消費(fèi)時(shí),有兩種訂閱消息的方式,分別是并發(fā)消費(fèi)和順序消費(fèi)。廣播模式不支持順序消費(fèi),僅有集群模式能使用順序消費(fèi)。

需要注意的是,這里所說的順序消費(fèi)指的是隊(duì)列維度的順序,即在消費(fèi)一個(gè)隊(duì)列時(shí),消費(fèi)消息的順序和消息發(fā)送的順序一致。如果一個(gè) Topic 有多個(gè)隊(duì)列, 是不可能達(dá)成 Topic 級別的順序消費(fèi)的,因?yàn)闊o法控制哪個(gè)隊(duì)列的消息被先消費(fèi)。Topic 只有一個(gè)隊(duì)列的情況下能夠?qū)崿F(xiàn) Topic 級別的順序消費(fèi)。

具體順序生產(chǎn)和消費(fèi)代碼見 官方文檔。

順序生產(chǎn)的方式為串行生產(chǎn),并在生產(chǎn)時(shí)指定隊(duì)列。

并發(fā)消費(fèi)的方式是調(diào)用消費(fèi)者的指定 MessageListenerConcurrently 作為消費(fèi)的回調(diào)類,順序消費(fèi)則使用 MessageListenerOrderly 類進(jìn)行回調(diào)。處理這兩種消費(fèi)方式的消費(fèi)服務(wù)也不同,分別是 ConsumeMessageConcurrentlyService ConsumeMessageOrderlyService

順序消費(fèi)的大致原理是依靠兩組鎖,一組在 Broker 端(Broker 鎖),鎖定隊(duì)列和消費(fèi)者的關(guān)系,保證同一時(shí)間只有一個(gè)消費(fèi)者在消費(fèi);在消費(fèi)者端也有一組鎖(消費(fèi)隊(duì)列鎖)以保證消費(fèi)的順序性。

2.6 消費(fèi)進(jìn)度保存和提交

消費(fèi)者消費(fèi)一批消息完成之后,需要保存消費(fèi)進(jìn)度。如果是集群消費(fèi)模式,還需要將消費(fèi)進(jìn)度讓其他消費(fèi)者知道,所以需要提交消費(fèi)進(jìn)度。這樣在消費(fèi)者重啟或隊(duì)列重平衡時(shí)可以根據(jù)消費(fèi)進(jìn)度繼續(xù)消費(fèi)。

不同模式下消費(fèi)進(jìn)度保存方式的不同:

  • 廣播模式:保存在消費(fèi)者本地。因?yàn)槊總€(gè)消費(fèi)者都需要消費(fèi)全量消息消息。在 LocalfileOffsetStore 當(dāng)中。
  • 集群模式:保存在 Broker,同時(shí)消費(fèi)者端緩存。因?yàn)橐粋€(gè) Topic 的消息只要被消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)即可,所以消息的消費(fèi)進(jìn)度需要統(tǒng)一保存。通過 RemoteBrokerOffsetStore 存儲。

集群模式下,消費(fèi)者端有定時(shí)任務(wù),定時(shí)將內(nèi)存中的消費(fèi)進(jìn)度提交到 Broker,Broker 也有定時(shí)任務(wù)將內(nèi)存中的消費(fèi)偏移量持久化到磁盤。此外,消費(fèi)者向 Broker 拉取消息時(shí)也會提交消費(fèi)偏移量。注意,消費(fèi)者線程池提交的偏移量是線程池消費(fèi)的這一批消息中偏移量最小的消息的偏移量。

  1. 消費(fèi)完一批消息后將消息消費(fèi)進(jìn)度存在本地內(nèi)存
  2. 消費(fèi)者中有一個(gè)定時(shí)線程,每 5s 將內(nèi)存中所有隊(duì)列的消費(fèi)偏移量提交到 Broker
  3. Broker 收到消費(fèi)進(jìn)度先緩存到內(nèi)存,有一個(gè)定時(shí)任務(wù)每隔 5s 將消息偏移量持久化到磁盤
  4. 消費(fèi)者向 Broker 拉取消息時(shí)也會將隊(duì)列的消息偏移量提交到 Broker

3. 消費(fèi)流程

這張圖是阿里云的文章講解消費(fèi)時(shí)用到的,能夠清晰地表示客戶端 Push 模式并發(fā)消費(fèi)流程。

img

從左上角第一個(gè)方框開始看

  1. 消費(fèi)者啟動(dòng)時(shí)喚醒重平衡服務(wù) RebalanceService,重平衡服務(wù)是客戶端開始消費(fèi)的起點(diǎn)。
  2. 重平衡服務(wù)會周期性(每 20s)執(zhí)行重平衡方法 doRebalance),查詢所有注冊的 Broker,根據(jù)注冊的 Broker 數(shù)量為自身分配負(fù)載的隊(duì)列 rebalanceByTopic()
  3. 分配完隊(duì)列后,會為每個(gè)分配到的新隊(duì)列創(chuàng)建一個(gè)消息拉取請求 pullRequest,這個(gè)拉取請求中保存一個(gè)處理隊(duì)列 processQueue,即圖中的紅黑樹(TreeMap),用來保存拉取到的消息。紅黑樹保存消息的順序。
  4. 消息拉取線程應(yīng)用生產(chǎn)-消費(fèi)模式,用一個(gè)線程從拉取請求隊(duì)列 pullRequestQueue 中彈出拉取請求,執(zhí)行拉取任務(wù),將拉取到的消息放入處理隊(duì)列。
  5. 拉取請求在一次拉取消息完成之后會復(fù)用,重新被放入拉取請求隊(duì)列 pullRequestQueue 中
  6. 拉取完成后,在 NettyClientPublicExecutorThreadPool 線程池異步處理結(jié)果,將拉取到的消息放入處理隊(duì)列,然后調(diào)用 consumeMessageService.submitConsumeRequest,將處理隊(duì)列和 多個(gè)消費(fèi)任務(wù)提交到消
  7. 費(fèi)線程池。每個(gè)消費(fèi)任務(wù)消費(fèi) 1 批消息(1 批默認(rèn)為 1 條)
  8. 每個(gè)消費(fèi)者都有一個(gè)消費(fèi)線程池 consumeMessageThreadPool ,默認(rèn)有 20 個(gè)消費(fèi)線程。
  9. 消費(fèi)線程池的每個(gè)消費(fèi)線程會嘗試從消費(fèi)任務(wù)隊(duì)列中獲取消費(fèi)請求,執(zhí)行消費(fèi)業(yè)務(wù)邏輯 listener.consumeMessage。
  10. 消費(fèi)完成后,如果消費(fèi)成功,則更新偏移量 updateOffset(先更新到內(nèi)存 offsetTable,定時(shí)上報(bào)到 Broker。Broker 端也先放到內(nèi)存,定時(shí)刷盤)。

到此這篇關(guān)于RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解的文章就介紹到這了,更多相關(guān)RocketMQ中的消費(fèi)者內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring Boot集成MyBatis實(shí)現(xiàn)通用Mapper的配置及使用

    Spring Boot集成MyBatis實(shí)現(xiàn)通用Mapper的配置及使用

    關(guān)于MyBatis,大部分人都很熟悉。MyBatis 是一款優(yōu)秀的持久層框架,它支持定制化 SQL、存儲過程以及高級映射。這篇文章主要介紹了Spring Boot集成MyBatis實(shí)現(xiàn)通用Mapper,需要的朋友可以參考下
    2018-08-08
  • Java?Spring中Bean的作用域及生命周期

    Java?Spring中Bean的作用域及生命周期

    這篇文章主要介紹了Java?Spring中Bean的作用域及生命周期,Bean的作用域默認(rèn)是單例模式的,也就是說所有?的使?的都是同?個(gè)對象,更多相關(guān)內(nèi)容需要的朋友可以參考一下
    2022-08-08
  • java中httpclient封裝post請求和get的請求實(shí)例

    java中httpclient封裝post請求和get的請求實(shí)例

    這篇文章主要介紹了java中httpclient封裝post請求和get的請求實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • 詳解如何解析pom文件方法示例

    詳解如何解析pom文件方法示例

    這篇文章主要為大家介紹了詳解如何解析pom文件方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • SpringBoot項(xiàng)目調(diào)優(yōu)及垃圾回收器的比較詳解

    SpringBoot項(xiàng)目調(diào)優(yōu)及垃圾回收器的比較詳解

    這篇文章主要介紹了SpringBoot項(xiàng)目調(diào)優(yōu)及垃圾回收器的比較詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-04-04
  • Java多線程與優(yōu)先級詳細(xì)解讀

    Java多線程與優(yōu)先級詳細(xì)解讀

    這篇文章主要給大家介紹了關(guān)于Java中方法使用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-08-08
  • 解讀RedisTemplate的各種操作(set、hash、list、string)

    解讀RedisTemplate的各種操作(set、hash、list、string)

    這篇文章主要介紹了解讀RedisTemplate的各種操作(set、hash、list、string),具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • java時(shí)間日期使用與查詢代碼詳解

    java時(shí)間日期使用與查詢代碼詳解

    這篇文章主要介紹了java時(shí)間日期使用與查詢代碼詳解,具有一定借鑒價(jià)值,需要的朋友可以參考下。
    2017-11-11
  • IDEA運(yùn)行Java項(xiàng)目報(bào)錯(cuò)java: 錯(cuò)誤: 不支持發(fā)行版本 xx的解決方法

    IDEA運(yùn)行Java項(xiàng)目報(bào)錯(cuò)java: 錯(cuò)誤: 不支持發(fā)行版本 xx的解決方法

    這篇文章主要介紹了IDEA運(yùn)行Java項(xiàng)目報(bào)錯(cuò)java: 錯(cuò)誤: 不支持發(fā)行版本 xx的解決方法,文中有詳細(xì)的解決方案供大家參考,對大家解決問題有一定的幫助,需要的朋友可以參考下
    2025-04-04
  • Java虛擬機(jī)內(nèi)存區(qū)域劃分詳解

    Java虛擬機(jī)內(nèi)存區(qū)域劃分詳解

    這篇文章主要介紹了Java虛擬機(jī)內(nèi)存區(qū)域劃分,本文邏輯清晰,可以幫助我們更好的掌握虛擬機(jī),對我們學(xué)習(xí)java來說是一種幫助,需要的朋友可以參考下
    2021-04-04

最新評論