RocketMQ中消費者概念和消費流程詳解
1. 背景
RocketMQ 的消費可以算是 RocketMQ 的業(yè)務(wù)邏輯中最復(fù)雜的一塊。這里面涉及到許多消費模式和特性。本想一篇文章寫完,寫到后面發(fā)現(xiàn)消費涉及到的內(nèi)容太多,于是決定分多篇來寫。本文作為消費系列的第一篇,主要講述 RocketMQ 消費涉及到的模式和特性,也會概括性地講一下消費流程。
我將 RocketMQ 的消費流程大致分成 4 個步驟
重平衡消費者拉取消息Broker 接收拉取請求后從存儲中查詢消息并返回消費者消費消息
每個步驟都會用一篇文章來講解。
先了解一下 RocketMQ 消費涉及到地概念
2. 概念簡述
2.1 消費組概念與消費模式
和大多數(shù)消息隊列一樣,RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。在了解它們之前,需要先引入消費組的概念。
2.1.1 消費組
一個消費者實例即是一個消費者進(jìn)程,負(fù)責(zé)消費消息。單個消費者速度有限,在實際使用中通常會采用多個消費者共同消費同樣的 Topic 以加快消費速度。這多個消費同樣 Topic 的消費者組成了消費者組。
消費組是一個邏輯概念,它包含了多個同一類的消費者實例,通常這些消費者都消費同一類消息(都消費相同的 Topic)且消費邏輯一致。
消費組的引入是用來在消費消息時更好地進(jìn)行負(fù)載均衡和容錯。
2.1.2 廣播消費模式(BROADCASTING)
廣播消費模式即全部的消息會廣播分發(fā)到所有的消費者實例,每個消費者實例會收到全量的消息(即便消費組中有多個消費者都訂閱同一 Topic)。
如下圖所示,生產(chǎn)者發(fā)送了 5 條消息,每個消費組中的消費者都收到全部的 5 條消息。
廣播模式使用較少,適合各個消費者都需要通知的場景,如刷新應(yīng)用中的緩存。
注意事項:
- 廣播消費模式下不支持 順序消息。
- 廣播消費模式下不支持 重置消費位點。
- 每條消息都需要被相同訂閱邏輯的多臺機(jī)器處理。
- 消費進(jìn)度在客戶端維護(hù),出現(xiàn)重復(fù)消費的概率稍大于集群模式。如果消費進(jìn)度文件丟失,存在消息丟失的可能。
- 廣播模式下,消息隊列 RocketMQ 版保證每條消息至少被每臺客戶端消費一次,但是并不會重投消費失敗的消息,因此業(yè)務(wù)方需要關(guān)注消費失敗的情況。
- 廣播模式下,客戶端每一次重啟都會從最新消息消費??蛻舳嗽诒煌V蛊陂g發(fā)送至服務(wù)端的消息將會被自動跳過,請謹(jǐn)慎選擇。
- 廣播模式下,每條消息都會被大量的客戶端重復(fù)處理,因此推薦盡可能使用集群模式。
- 廣播模式下服務(wù)端不維護(hù)消費進(jìn)度,所以消息隊列 RocketMQ 版控制臺不支持消息堆積查詢、消息堆積報警和訂閱關(guān)系查詢功能。
2.1.3 集群消費模式(CLUSTERING)
集群消費模式下,同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負(fù)載均衡到了同一個消費組的多個消費者實例上。
更具體一點,在同一消費組中的不同消費者會根據(jù)負(fù)載機(jī)制來平均地訂閱 Topic 中的每個 Queue。(默認(rèn) AVG 負(fù)載方式)
RocketMQ 默認(rèn)使用集群消費模式,這也是大部分場景下會使用到的消費模式。
2.2 消費者拉取消息模式
2.2.1 Pull
指消費者主動拉取消息進(jìn)行消費,主動從 Broker 拉取消息,主動權(quán)由消費者應(yīng)用控制。
2.2.2 Push
指 Broker 主動將消息 Push 給消費者,Broker 收到消息就會主動推送到消費者端。該模式的消費實時性較高,也是主流場景中普遍采用的消費形式。
消費者組中的消費者實例會根據(jù)預(yù)設(shè)的負(fù)載均衡算法對 Topic 中的 Queue 進(jìn)行均勻的訂閱,每個 Queue 最多只能被一個消費者訂閱。
在 RocketMQ 中,Push 消費其實也是由 Pull 消費(拉取)實現(xiàn)。Push 消費只是通過客戶端 API 層面的封裝讓用戶感覺像是 Broker 在推送消息給消費者。
2.2.3 POP
RocketMQ 5.0 引入的新消費形式,是 Pull 拉取的另一種實現(xiàn)。也可以在 Push 模式下使用 POP 拉取消息,甚至可以和 Push 模式共同使用(分別消費重試 Topic 和普通 Topic)。
POP 與 Pull 可以通過一個開關(guān)實時進(jìn)行切換。POP 模式下,Broker 來控制每個消費者消費的隊列和拉取的消息,把重平衡邏輯從客戶端移到了服務(wù)端。
主要解決了原來 Push 模式消費的以下痛點:
富客戶端:客戶端邏輯比較重,多語言支持不友好隊列獨占:Topic 中的一個 Queue 最多只能被 1 個 Push 消費者消費,消費者數(shù)量無法無限擴(kuò)展。且消費者 hang 住時該隊列的消息會堆積。消費后更新 offset:本地消費成功才會提交 offset
RocketMQ 5.0 的輕量化 gRPC 客戶端就是基于 POP 消費模式開發(fā)
2.3 隊列負(fù)載機(jī)制與重平衡
在集群消費模式下,消費組中的消費者共同消費訂閱的 Topic 中的所有消息,這里就存在 Topic 中的隊列如何分配給消費者的問題。
2.3.1 隊列負(fù)載機(jī)制
RocketMQ Broker 中的隊列負(fù)載機(jī)制將一個 Topic 的不同隊列按照算法盡可能平均地分配給消費者組中的所有消費者。RocketMQ 預(yù)設(shè)了多種負(fù)載算法供不同場景下的消費。
AVG:將隊列按數(shù)量平均分配給多個消費者,按 Broker 順序先分配第一個 Broker 的所有隊列給第一個消費者,然后給第二個。
AVG_BY_CIRCLE:將 Broker 上的隊列輪流分給不同消費者,更適用于 Topic 在不同 Broker 之間分布不均勻的情況。
默認(rèn)采用 AVG 負(fù)載方式。
2.3.2 重平衡(Rebalance)
為消費者分配隊列消費的這一個負(fù)載過程并不是一勞永逸的,比如當(dāng)消費者數(shù)量變化、Broker 掉線等情況發(fā)生后,原先的負(fù)載就變得不再均衡,此時就需要重新進(jìn)行負(fù)載均衡,這一過程被稱為重平衡機(jī)制。
每隔 20s,RocketMQ 會進(jìn)行一次檢查,檢查隊列數(shù)量、消費者數(shù)量是否發(fā)生變化,如果變化則觸發(fā)消費隊列重平衡,重新執(zhí)行上述負(fù)載算法。
2.4 消費端高可靠
2.4.1 重試-死信機(jī)制
在實際使用中,消息的消費可能出現(xiàn)失敗。RocketMQ 擁有重試機(jī)制和死信機(jī)制來保證消息消費的可靠性。
正常消費:消費成功則提交消費位點
重試機(jī)制:如果正常消費失敗,消息會被消費者發(fā)回 Broker,放入重試 Topic: %RETRY%消費者組
。最多重試消費 16 次,重試的時間間隔逐漸變長。(消費者組會自動訂閱重試 Topic)。
這里地延遲重試采用了 RocketMQ 的延遲消息,重試的 16 次時間間隔為延遲消息配置的每個延遲等級的時間(從第三個等級開始)。如果修改延遲等級時間的配置,重試的時間間隔也會相應(yīng)發(fā)生變化。但即便延遲等級時間間隔配置不足 16 個,仍會重試 16 次,后面按照最大的時間間隔來重試。
死信機(jī)制:如果正常消費和重試 16 次均失敗,消息會保存到死信 Topic %DLQ%消費者組
中,此時需人工介入處理
2.4.2 隊列負(fù)載機(jī)制與重平衡
當(dāng)發(fā)生 Broker 掛掉或者消費者掛掉時,會引發(fā)重平衡,可以自動感知有組件掛掉的情況并重新調(diào)整消費者的訂閱關(guān)系。
2.5 并發(fā)消費與順序消費
在消費者客戶端消費時,有兩種訂閱消息的方式,分別是并發(fā)消費和順序消費。廣播模式不支持順序消費,僅有集群模式能使用順序消費。
需要注意的是,這里所說的順序消費指的是隊列維度的順序,即在消費一個隊列時,消費消息的順序和消息發(fā)送的順序一致。如果一個 Topic 有多個隊列, 是不可能達(dá)成 Topic 級別的順序消費的,因為無法控制哪個隊列的消息被先消費。Topic 只有一個隊列的情況下能夠?qū)崿F(xiàn) Topic 級別的順序消費。
具體順序生產(chǎn)和消費代碼見 官方文檔。
順序生產(chǎn)的方式為串行生產(chǎn),并在生產(chǎn)時指定隊列。
并發(fā)消費的方式是調(diào)用消費者的指定 MessageListenerConcurrently
作為消費的回調(diào)類,順序消費則使用 MessageListenerOrderly
類進(jìn)行回調(diào)。處理這兩種消費方式的消費服務(wù)也不同,分別是 ConsumeMessageConcurrentlyService
和 ConsumeMessageOrderlyService
。
順序消費的大致原理是依靠兩組鎖,一組在 Broker 端(Broker 鎖),鎖定隊列和消費者的關(guān)系,保證同一時間只有一個消費者在消費;在消費者端也有一組鎖(消費隊列鎖)以保證消費的順序性。
2.6 消費進(jìn)度保存和提交
消費者消費一批消息完成之后,需要保存消費進(jìn)度。如果是集群消費模式,還需要將消費進(jìn)度讓其他消費者知道,所以需要提交消費進(jìn)度。這樣在消費者重啟或隊列重平衡時可以根據(jù)消費進(jìn)度繼續(xù)消費。
不同模式下消費進(jìn)度保存方式的不同:
- 廣播模式:保存在消費者本地。因為每個消費者都需要消費全量消息消息。在
LocalfileOffsetStore
當(dāng)中。 - 集群模式:保存在 Broker,同時消費者端緩存。因為一個 Topic 的消息只要被消費者組中的一個消費者消費即可,所以消息的消費進(jìn)度需要統(tǒng)一保存。通過
RemoteBrokerOffsetStore
存儲。
集群模式下,消費者端有定時任務(wù),定時將內(nèi)存中的消費進(jìn)度提交到 Broker,Broker 也有定時任務(wù)將內(nèi)存中的消費偏移量持久化到磁盤。此外,消費者向 Broker 拉取消息時也會提交消費偏移量。注意,消費者線程池提交的偏移量是線程池消費的這一批消息中偏移量最小的消息的偏移量。
- 消費完一批消息后將消息消費進(jìn)度存在本地內(nèi)存
- 消費者中有一個定時線程,每 5s 將內(nèi)存中所有隊列的消費偏移量提交到 Broker
- Broker 收到消費進(jìn)度先緩存到內(nèi)存,有一個定時任務(wù)每隔 5s 將消息偏移量持久化到磁盤
- 消費者向 Broker 拉取消息時也會將隊列的消息偏移量提交到 Broker
3. 消費流程
這張圖是阿里云的文章講解消費時用到的,能夠清晰地表示客戶端 Push 模式并發(fā)消費流程。
從左上角第一個方框開始看
- 消費者啟動時喚醒重平衡服務(wù) RebalanceService,重平衡服務(wù)是客戶端開始消費的起點。
- 重平衡服務(wù)會周期性(每 20s)執(zhí)行重平衡方法 doRebalance),查詢所有注冊的 Broker,根據(jù)注冊的 Broker 數(shù)量為自身分配負(fù)載的隊列 rebalanceByTopic()
- 分配完隊列后,會為每個分配到的新隊列創(chuàng)建一個消息拉取請求 pullRequest,這個拉取請求中保存一個處理隊列 processQueue,即圖中的紅黑樹(TreeMap),用來保存拉取到的消息。紅黑樹保存消息的順序。
- 消息拉取線程應(yīng)用生產(chǎn)-消費模式,用一個線程從拉取請求隊列 pullRequestQueue 中彈出拉取請求,執(zhí)行拉取任務(wù),將拉取到的消息放入處理隊列。
- 拉取請求在一次拉取消息完成之后會復(fù)用,重新被放入拉取請求隊列 pullRequestQueue 中
- 拉取完成后,在 NettyClientPublicExecutorThreadPool 線程池異步處理結(jié)果,將拉取到的消息放入處理隊列,然后調(diào)用 consumeMessageService.submitConsumeRequest,將處理隊列和 多個消費任務(wù)提交到消
- 費線程池。每個消費任務(wù)消費 1 批消息(1 批默認(rèn)為 1 條)
- 每個消費者都有一個消費線程池 consumeMessageThreadPool ,默認(rèn)有 20 個消費線程。
- 消費線程池的每個消費線程會嘗試從消費任務(wù)隊列中獲取消費請求,執(zhí)行消費業(yè)務(wù)邏輯 listener.consumeMessage。
- 消費完成后,如果消費成功,則更新偏移量 updateOffset(先更新到內(nèi)存 offsetTable,定時上報到 Broker。Broker 端也先放到內(nèi)存,定時刷盤)。
到此這篇關(guān)于RocketMQ中消費者概念和消費流程詳解的文章就介紹到這了,更多相關(guān)RocketMQ中的消費者內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot集成MyBatis實現(xiàn)通用Mapper的配置及使用
關(guān)于MyBatis,大部分人都很熟悉。MyBatis 是一款優(yōu)秀的持久層框架,它支持定制化 SQL、存儲過程以及高級映射。這篇文章主要介紹了Spring Boot集成MyBatis實現(xiàn)通用Mapper,需要的朋友可以參考下2018-08-08java中httpclient封裝post請求和get的請求實例
這篇文章主要介紹了java中httpclient封裝post請求和get的請求實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10SpringBoot項目調(diào)優(yōu)及垃圾回收器的比較詳解
這篇文章主要介紹了SpringBoot項目調(diào)優(yōu)及垃圾回收器的比較詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04解讀RedisTemplate的各種操作(set、hash、list、string)
這篇文章主要介紹了解讀RedisTemplate的各種操作(set、hash、list、string),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12IDEA運行Java項目報錯java: 錯誤: 不支持發(fā)行版本 xx的解決方法
這篇文章主要介紹了IDEA運行Java項目報錯java: 錯誤: 不支持發(fā)行版本 xx的解決方法,文中有詳細(xì)的解決方案供大家參考,對大家解決問題有一定的幫助,需要的朋友可以參考下2025-04-04