RocketMQ中消費(fèi)者的消費(fèi)進(jìn)度管理
RocketMQ消費(fèi)進(jìn)度管理
業(yè)務(wù)實(shí)現(xiàn)消費(fèi)回調(diào)的時(shí)候,當(dāng)且僅當(dāng)此回調(diào)函數(shù)返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才會(huì)認(rèn)為這批消息(默認(rèn)是1條)是消費(fèi)完成的
如果這時(shí)候消息消費(fèi)失敗,例如數(shù)據(jù)庫(kù)異常,余額不足扣款失敗等一切業(yè)務(wù)認(rèn)為消息需要重試的場(chǎng)景,只要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,RocketMQ就會(huì)認(rèn)為這批消息消費(fèi)失敗了。
為了保證消息是肯定被至少消費(fèi)成功一次,RocketMQ會(huì)把這批消費(fèi)失敗的消息重發(fā)回Broker(topic不是原topic而是這個(gè)消費(fèi)租的RETRY topic),在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置)后,再次投遞到這個(gè)ConsumerGroup。而如果一直這樣重復(fù)消費(fèi)都持續(xù)失敗到一定次數(shù)(默 認(rèn)16次),就會(huì)投遞到DLQ死信隊(duì)列。應(yīng)用可以監(jiān)控死信隊(duì)列來(lái)做人工干預(yù)。
從哪里開(kāi)始消費(fèi)
當(dāng)新實(shí)例啟動(dòng)的時(shí)候,PushConsumer會(huì)拿到本消費(fèi)組broker已經(jīng)記錄好的消費(fèi)進(jìn)度,如果這個(gè)消費(fèi)進(jìn)度在Broker并沒(méi)有存儲(chǔ)起來(lái),證明這個(gè)是一個(gè)全新的消費(fèi)組,這時(shí)候客戶端有幾個(gè)策略可以選擇:
CONSUME_FROM_LAST_OFFSET //默認(rèn)策略,從該隊(duì)列最尾開(kāi)始消費(fèi),即跳過(guò)歷史消息 CONSUME_FROM_FIRST_OFFSET //從隊(duì)列最開(kāi)始開(kāi)始消費(fèi),即歷史消息(還儲(chǔ)存在broker的)全部消費(fèi)一遍 CONSUME_FROM_TIMESTAMP//從某個(gè)時(shí)間點(diǎn)開(kāi)始消費(fèi),和setConsumeTimestamp()配合使用,默認(rèn)是半個(gè)小時(shí)以前
消息ACK機(jī)制
RocketMQ是以consumer group+queue為單位是管理消費(fèi)進(jìn)度的,以一個(gè)consumer offset標(biāo)記這個(gè)消費(fèi)組在這條queue上的消費(fèi)進(jìn)度。
每次消息成功后,本地的消費(fèi)進(jìn)度會(huì)被更新,然后由定時(shí)器定時(shí)同步到broker(即,不是立刻同步到broker,有一段時(shí)間消費(fèi)進(jìn)度只會(huì)存在于本地,此時(shí)如果宕機(jī),那么未提交的消費(fèi)進(jìn)度就會(huì)被重新消費(fèi)),以此持久化消費(fèi)進(jìn)度。但是每次記錄消費(fèi)進(jìn)度的時(shí)候,只會(huì)把一批消息中最小的offset值為消費(fèi)進(jìn)度值,如下圖:
比如2消費(fèi)失敗,rocketmq跳過(guò)2消費(fèi)到了8,8消費(fèi)成功了,但是提交的時(shí)候只會(huì)提交【消費(fèi)到了1】,因?yàn)?失敗了,所以會(huì)提交最小成功點(diǎn)
重復(fù)消費(fèi)問(wèn)題
由于消費(fèi)進(jìn)度只是記錄了一個(gè)下標(biāo),就可能出現(xiàn)拉取了100條消息如 2101-2200的消息,后面99條都消費(fèi)結(jié)束了,只有2101消費(fèi)一直沒(méi)有結(jié)束的情況。
在這種情況下,RocketMQ為了保證消息肯定被消費(fèi)成功,消費(fèi)進(jìn)度職能維持在2101,直到2101也消費(fèi)結(jié)束了,本地的消費(fèi)進(jìn)度才能標(biāo)記2200消費(fèi)結(jié)束了(注:consumerOffset=2201)。 在這種設(shè)計(jì)下,就有消費(fèi)大量重復(fù)的風(fēng)險(xiǎn)。如2101在還沒(méi)有消費(fèi)完成的時(shí)候消費(fèi)實(shí)例突然退出(機(jī)器斷電,或者被kill)。這條queue的消費(fèi)進(jìn)度還是維持在2101,當(dāng)queue重新分配給新的實(shí)例的時(shí)候,新的實(shí)例從broker上拿到的消費(fèi)進(jìn)度還是維持在2101,這時(shí)候就會(huì)又從2101開(kāi)始消費(fèi),2102-2200這批消息實(shí)際上已經(jīng)被消費(fèi)過(guò)還是會(huì)投遞一次。
對(duì)于這個(gè)場(chǎng)景,RocketMQ暫時(shí)無(wú)能為力,所以業(yè)務(wù)必須要保證消息消費(fèi)的冪等性,這也是RocketMQ官方多次強(qiáng)調(diào)的態(tài)度。
重復(fù)消費(fèi)驗(yàn)證
查看當(dāng)前消費(fèi)進(jìn)度
檢查隊(duì)列消費(fèi)的當(dāng)前進(jìn)度。 查看RocketMQ 的config文件夾下的 consumerOffset.json
cat consumerOffset.json
通過(guò)consumerOffset.json我們可以知道當(dāng)前 topicTest
主題的 rocket_test_consumer_group
組的 queue2
消費(fèi)到偏移量為32
消費(fèi)者發(fā)送消息
消費(fèi)者發(fā)送消息,并查看各個(gè)隊(duì)列消息的偏移量
我們發(fā)現(xiàn)隊(duì)列2的偏移量最小為32 消費(fèi)的時(shí)候最小偏移量不提交,其他都正常
//隊(duì)列2的偏移量為32的數(shù)據(jù)在等待 if (ext.getQueueId() == 2 && ext.getQueueOffset() == 32) { System.out.println("消息消費(fèi)耗時(shí)較廠接收queueId:[" + ext.getQueueId() + "],偏移量 offset:[" + ext.getQueueOffset() + "]"); //等待 模擬假死狀態(tài) try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } }
運(yùn)行查看日志
我們發(fā)現(xiàn)只有隊(duì)列2的偏移量為32的消息消費(fèi)超時(shí),其他都已經(jīng)正常消費(fèi) 我們?cè)俨榭聪耤onsumerOffset.json
cat consumerOffset.json
我們發(fā)現(xiàn)因?yàn)閞ocketMQ 整個(gè)消費(fèi)記錄都沒(méi)有被提交,所以下次消費(fèi)會(huì)全部再次消費(fèi)。 這里模擬出了整個(gè)消費(fèi)進(jìn)度都在本地,沒(méi)來(lái)得及提交給broker。
還有一種情況就是,進(jìn)度成功提交給broker了,queue0、1、3的消費(fèi)進(jìn)度都改變了。但是queue2的消費(fèi)進(jìn)度還是32,因?yàn)橄M(fèi)32的時(shí)候超時(shí)了,rocketmq只能提交最小成功offset!
再次消費(fèi)
去掉延時(shí)代碼繼續(xù)消費(fèi)
我們發(fā)現(xiàn)消息被重復(fù)消費(fèi)了一遍
到此這篇關(guān)于RocketMQ中消費(fèi)者的消費(fèi)進(jìn)度管理的文章就介紹到這了,更多相關(guān)RocketMQ消費(fèi)進(jìn)度管理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問(wèn)題
- RocketMQ的消費(fèi)者類型與最佳實(shí)踐詳解
- RocketMQ中的消費(fèi)者啟動(dòng)流程解讀
- RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解
- 詳解RocketMQ中的消費(fèi)者啟動(dòng)與消費(fèi)流程分析
- RocketMQ4.5.X 實(shí)現(xiàn)修改生產(chǎn)者消費(fèi)者日志保存路徑
- RocketMq同組消費(fèi)者如何自動(dòng)設(shè)置InstanceName
相關(guān)文章
聊聊spring boot的WebFluxTagsProvider的使用
這篇文章主要介紹了聊聊spring boot的WebFluxTagsProvider的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07詳解MyBatis-Plus Wrapper條件構(gòu)造器查詢大全
這篇文章主要介紹了詳解MyBatis-Plus Wrapper條件構(gòu)造器查詢大全,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08Spring+SpringMVC+MyBatis深入學(xué)習(xí)及搭建(二)之MyBatis原始Dao開(kāi)發(fā)和mapper代理開(kāi)發(fā)
這篇文章主要介紹了Spring+SpringMVC+MyBatis深入學(xué)習(xí)及搭建(二)之MyBatis原始Dao開(kāi)發(fā)和mapper代理開(kāi)發(fā),需要的朋友可以參考下2017-05-05Java中SSM框架實(shí)現(xiàn)增刪改查功能代碼詳解
這篇文章主要介紹了Java中SSM框架實(shí)現(xiàn)增刪改查功能代碼詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07Docker?快速部署Springboot項(xiàng)目超詳細(xì)最新版
這篇文章主要介紹了Docker?快速部署Springboot項(xiàng)目超詳細(xì)最新版的相關(guān)資料,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04eclipse修改jvm參數(shù)調(diào)優(yōu)方法(2種)
本篇文章主要介紹了eclipse修改jvm參數(shù)調(diào)優(yōu)方法(2種),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-02-02