欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ中消費(fèi)者的消費(fèi)進(jìn)度管理

 更新時(shí)間:2023年10月11日 10:31:00   作者:fFee-ops  
這篇文章主要介紹了RocketMQ中消費(fèi)者的消費(fèi)進(jìn)度管理,業(yè)務(wù)實(shí)現(xiàn)消費(fèi)回調(diào)的時(shí)候,當(dāng)且僅當(dāng)此回調(diào)函數(shù)返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才會(huì)認(rèn)為這批消息(默認(rèn)是1條)是消費(fèi)完成的,需要的朋友可以參考下

RocketMQ消費(fèi)進(jìn)度管理

業(yè)務(wù)實(shí)現(xiàn)消費(fèi)回調(diào)的時(shí)候,當(dāng)且僅當(dāng)此回調(diào)函數(shù)返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才會(huì)認(rèn)為這批消息(默認(rèn)是1條)是消費(fèi)完成的

如果這時(shí)候消息消費(fèi)失敗,例如數(shù)據(jù)庫(kù)異常,余額不足扣款失敗等一切業(yè)務(wù)認(rèn)為消息需要重試的場(chǎng)景,只要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,RocketMQ就會(huì)認(rèn)為這批消息消費(fèi)失敗了。

為了保證消息是肯定被至少消費(fèi)成功一次,RocketMQ會(huì)把這批消費(fèi)失敗的消息重發(fā)回Broker(topic不是原topic而是這個(gè)消費(fèi)租的RETRY topic),在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置)后,再次投遞到這個(gè)ConsumerGroup。而如果一直這樣重復(fù)消費(fèi)都持續(xù)失敗到一定次數(shù)(默 認(rèn)16次),就會(huì)投遞到DLQ死信隊(duì)列。應(yīng)用可以監(jiān)控死信隊(duì)列來(lái)做人工干預(yù)。

從哪里開(kāi)始消費(fèi)

當(dāng)新實(shí)例啟動(dòng)的時(shí)候,PushConsumer會(huì)拿到本消費(fèi)組broker已經(jīng)記錄好的消費(fèi)進(jìn)度,如果這個(gè)消費(fèi)進(jìn)度在Broker并沒(méi)有存儲(chǔ)起來(lái),證明這個(gè)是一個(gè)全新的消費(fèi)組,這時(shí)候客戶端有幾個(gè)策略可以選擇:

CONSUME_FROM_LAST_OFFSET //默認(rèn)策略,從該隊(duì)列最尾開(kāi)始消費(fèi),即跳過(guò)歷史消息
CONSUME_FROM_FIRST_OFFSET //從隊(duì)列最開(kāi)始開(kāi)始消費(fèi),即歷史消息(還儲(chǔ)存在broker的)全部消費(fèi)一遍
CONSUME_FROM_TIMESTAMP//從某個(gè)時(shí)間點(diǎn)開(kāi)始消費(fèi),和setConsumeTimestamp()配合使用,默認(rèn)是半個(gè)小時(shí)以前

消息ACK機(jī)制

RocketMQ是以consumer group+queue為單位是管理消費(fèi)進(jìn)度的,以一個(gè)consumer offset標(biāo)記這個(gè)消費(fèi)組在這條queue上的消費(fèi)進(jìn)度。

每次消息成功后,本地的消費(fèi)進(jìn)度會(huì)被更新,然后由定時(shí)器定時(shí)同步到broker(即,不是立刻同步到broker,有一段時(shí)間消費(fèi)進(jìn)度只會(huì)存在于本地,此時(shí)如果宕機(jī),那么未提交的消費(fèi)進(jìn)度就會(huì)被重新消費(fèi)),以此持久化消費(fèi)進(jìn)度。但是每次記錄消費(fèi)進(jìn)度的時(shí)候,只會(huì)把一批消息中最小的offset值為消費(fèi)進(jìn)度值,如下圖:

在這里插入圖片描述

比如2消費(fèi)失敗,rocketmq跳過(guò)2消費(fèi)到了8,8消費(fèi)成功了,但是提交的時(shí)候只會(huì)提交【消費(fèi)到了1】,因?yàn)?失敗了,所以會(huì)提交最小成功點(diǎn)

重復(fù)消費(fèi)問(wèn)題

由于消費(fèi)進(jìn)度只是記錄了一個(gè)下標(biāo),就可能出現(xiàn)拉取了100條消息如 2101-2200的消息,后面99條都消費(fèi)結(jié)束了,只有2101消費(fèi)一直沒(méi)有結(jié)束的情況。

在這里插入圖片描述

在這種情況下,RocketMQ為了保證消息肯定被消費(fèi)成功,消費(fèi)進(jìn)度職能維持在2101,直到2101也消費(fèi)結(jié)束了,本地的消費(fèi)進(jìn)度才能標(biāo)記2200消費(fèi)結(jié)束了(注:consumerOffset=2201)。 在這種設(shè)計(jì)下,就有消費(fèi)大量重復(fù)的風(fēng)險(xiǎn)。如2101在還沒(méi)有消費(fèi)完成的時(shí)候消費(fèi)實(shí)例突然退出(機(jī)器斷電,或者被kill)。這條queue的消費(fèi)進(jìn)度還是維持在2101,當(dāng)queue重新分配給新的實(shí)例的時(shí)候,新的實(shí)例從broker上拿到的消費(fèi)進(jìn)度還是維持在2101,這時(shí)候就會(huì)又從2101開(kāi)始消費(fèi),2102-2200這批消息實(shí)際上已經(jīng)被消費(fèi)過(guò)還是會(huì)投遞一次。

對(duì)于這個(gè)場(chǎng)景,RocketMQ暫時(shí)無(wú)能為力,所以業(yè)務(wù)必須要保證消息消費(fèi)的冪等性,這也是RocketMQ官方多次強(qiáng)調(diào)的態(tài)度。

重復(fù)消費(fèi)驗(yàn)證

查看當(dāng)前消費(fèi)進(jìn)度

檢查隊(duì)列消費(fèi)的當(dāng)前進(jìn)度。 查看RocketMQ 的config文件夾下的 consumerOffset.json

cat consumerOffset.json

在這里插入圖片描述

通過(guò)consumerOffset.json我們可以知道當(dāng)前 topicTest 主題的 rocket_test_consumer_group 組的 queue2 消費(fèi)到偏移量為32

消費(fèi)者發(fā)送消息

消費(fèi)者發(fā)送消息,并查看各個(gè)隊(duì)列消息的偏移量

在這里插入圖片描述

我們發(fā)現(xiàn)隊(duì)列2的偏移量最小為32 消費(fèi)的時(shí)候最小偏移量不提交,其他都正常

//隊(duì)列2的偏移量為32的數(shù)據(jù)在等待
if (ext.getQueueId() == 2 && ext.getQueueOffset() == 32) {
System.out.println("消息消費(fèi)耗時(shí)較廠接收queueId:[" + ext.getQueueId() + "],偏移量
offset:[" + ext.getQueueOffset() + "]");
//等待 模擬假死狀態(tài)
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

運(yùn)行查看日志

在這里插入圖片描述

我們發(fā)現(xiàn)只有隊(duì)列2的偏移量為32的消息消費(fèi)超時(shí),其他都已經(jīng)正常消費(fèi) 我們?cè)俨榭聪耤onsumerOffset.json

cat consumerOffset.json

在這里插入圖片描述

我們發(fā)現(xiàn)因?yàn)閞ocketMQ 整個(gè)消費(fèi)記錄都沒(méi)有被提交,所以下次消費(fèi)會(huì)全部再次消費(fèi)。 這里模擬出了整個(gè)消費(fèi)進(jìn)度都在本地,沒(méi)來(lái)得及提交給broker。

還有一種情況就是,進(jìn)度成功提交給broker了,queue0、1、3的消費(fèi)進(jìn)度都改變了。但是queue2的消費(fèi)進(jìn)度還是32,因?yàn)橄M(fèi)32的時(shí)候超時(shí)了,rocketmq只能提交最小成功offset!

再次消費(fèi)

去掉延時(shí)代碼繼續(xù)消費(fèi)

在這里插入圖片描述

我們發(fā)現(xiàn)消息被重復(fù)消費(fèi)了一遍

到此這篇關(guān)于RocketMQ中消費(fèi)者的消費(fèi)進(jìn)度管理的文章就介紹到這了,更多相關(guān)RocketMQ消費(fèi)進(jìn)度管理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 基于Java中字符串內(nèi)存位置詳解

    基于Java中字符串內(nèi)存位置詳解

    下面小編就為大家?guī)?lái)一篇基于Java中字符串內(nèi)存位置詳解。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-08-08
  • 聊聊spring boot的WebFluxTagsProvider的使用

    聊聊spring boot的WebFluxTagsProvider的使用

    這篇文章主要介紹了聊聊spring boot的WebFluxTagsProvider的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • 詳解MyBatis-Plus Wrapper條件構(gòu)造器查詢大全

    詳解MyBatis-Plus Wrapper條件構(gòu)造器查詢大全

    這篇文章主要介紹了詳解MyBatis-Plus Wrapper條件構(gòu)造器查詢大全,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • Mybatis常用注解中的SQL注入實(shí)例詳解

    Mybatis常用注解中的SQL注入實(shí)例詳解

    MyBatis是一款優(yōu)秀的持久層框架,它支持定制化 SQL(靈活)、存儲(chǔ)過(guò)程(PLSQL模塊化的組件,數(shù)據(jù)庫(kù)的一部分)以及高級(jí)映射(表映射為Bean也可以將Bean映射為表),下面這篇文章主要給大家介紹了關(guān)于Mybatis常用注解中的SQL注入的相關(guān)資料,需要的朋友可以參考下
    2022-02-02
  • IDEA設(shè)置Tab選項(xiàng)卡快速的操作

    IDEA設(shè)置Tab選項(xiàng)卡快速的操作

    這篇文章主要介紹了IDEA設(shè)置Tab選項(xiàng)卡快速的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • Spring+SpringMVC+MyBatis深入學(xué)習(xí)及搭建(二)之MyBatis原始Dao開(kāi)發(fā)和mapper代理開(kāi)發(fā)

    Spring+SpringMVC+MyBatis深入學(xué)習(xí)及搭建(二)之MyBatis原始Dao開(kāi)發(fā)和mapper代理開(kāi)發(fā)

    這篇文章主要介紹了Spring+SpringMVC+MyBatis深入學(xué)習(xí)及搭建(二)之MyBatis原始Dao開(kāi)發(fā)和mapper代理開(kāi)發(fā),需要的朋友可以參考下
    2017-05-05
  • Java中SSM框架實(shí)現(xiàn)增刪改查功能代碼詳解

    Java中SSM框架實(shí)現(xiàn)增刪改查功能代碼詳解

    這篇文章主要介紹了Java中SSM框架實(shí)現(xiàn)增刪改查功能代碼詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07
  • Java如何獲取相對(duì)路徑文件

    Java如何獲取相對(duì)路徑文件

    這篇文章主要介紹了Java如何獲取相對(duì)路徑文件問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-06-06
  • Docker?快速部署Springboot項(xiàng)目超詳細(xì)最新版

    Docker?快速部署Springboot項(xiàng)目超詳細(xì)最新版

    這篇文章主要介紹了Docker?快速部署Springboot項(xiàng)目超詳細(xì)最新版的相關(guān)資料,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • eclipse修改jvm參數(shù)調(diào)優(yōu)方法(2種)

    eclipse修改jvm參數(shù)調(diào)優(yōu)方法(2種)

    本篇文章主要介紹了eclipse修改jvm參數(shù)調(diào)優(yōu)方法(2種),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-02-02

最新評(píng)論