欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ消費(fèi)冪概念與使用分析

 更新時(shí)間:2023年02月13日 11:15:05   作者:每天都要進(jìn)步一點(diǎn)點(diǎn)  
如果有?個(gè)操作,多次執(zhí)?與?次執(zhí)?所產(chǎn)?的影響是相同的,我們就稱這個(gè)操作是冪等的。當(dāng)出現(xiàn)消費(fèi)者對(duì)某條消息重復(fù)消費(fèi)的情況時(shí),重復(fù)消費(fèi)的結(jié)果與消費(fèi)?次的結(jié)果是相同的,并且多次消費(fèi)并未對(duì)業(yè)務(wù)系統(tǒng)產(chǎn)?任何負(fù)?影響,那么這整個(gè)過(guò)程就可實(shí)現(xiàn)消息冪等

一、什么是消費(fèi)冪等

冪等:如果有一個(gè)操作,多次執(zhí)行與一次執(zhí)行所產(chǎn)生的影響是相同的,我們就稱這個(gè)操作是冪等的。

基于上述的概念,結(jié)合消息消費(fèi)的場(chǎng)景,我們能夠總結(jié)出消息冪等的概念:

如果消息重試多次,消費(fèi)者端對(duì)該重復(fù)消息消費(fèi)多次與消費(fèi)一次的結(jié)果是相同的,并且多次消費(fèi)沒(méi)有對(duì)系統(tǒng)產(chǎn)生副作用,那么我們就稱這個(gè)過(guò)程是消息冪等的。

在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,消息很有可能會(huì)出現(xiàn)重復(fù)發(fā)送或重復(fù)消費(fèi)。如果重復(fù)的消息可能會(huì)影響業(yè)務(wù)處理,那么就應(yīng)該對(duì)消息做冪等處理。

二、消息重復(fù)的場(chǎng)景分析

由于網(wǎng)絡(luò)原因閃斷,ACK返回失敗等情況出現(xiàn),不可避免的會(huì)發(fā)生消息重復(fù)的情況。最常見(jiàn)的有下面三種場(chǎng)景:

(1)、生產(chǎn)者發(fā)送消息時(shí)發(fā)生消息重復(fù)

當(dāng)一條消息已被成功發(fā)送到RocketMQ的Broker中,并且Broker已經(jīng)持久化到磁盤了,此時(shí)出現(xiàn)了網(wǎng)絡(luò)閃斷或者生產(chǎn)者宕機(jī)現(xiàn)象,導(dǎo)致Broker對(duì)生產(chǎn)者應(yīng)答失敗。 如果此時(shí)生產(chǎn)者意識(shí)到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息,那么后續(xù)Consumer就一定會(huì)消費(fèi)兩次該消息。

(2)、消費(fèi)者消費(fèi)消息時(shí)發(fā)生消息重復(fù)

消息已投遞到Consumer并完成業(yè)務(wù)處理,都會(huì)向RocketMQ Broker返回ACK確認(rèn)響應(yīng),但是由于網(wǎng)絡(luò)閃斷等原因,可能導(dǎo)致Broker沒(méi)能成功收到Consumer發(fā)送的消費(fèi)成功ACK響應(yīng),此時(shí)Broker認(rèn)為Consumer沒(méi)能消費(fèi)成功,為了保證消息至少被消費(fèi)一次,Broker將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過(guò)的消息,此時(shí)消費(fèi)者就會(huì)收到與之前處理過(guò)的內(nèi)容相同、Message ID也相同的消息。

(3)、負(fù)載均衡時(shí)發(fā)生消息重復(fù)

當(dāng)Broker重啟或Consumer重啟、擴(kuò)容或縮容時(shí),都會(huì)觸發(fā)重新負(fù)載均衡(Rebalance),此時(shí)Consumer去讀取Broker中的offset可能還沒(méi)及時(shí)更新,此時(shí)Consumer可能會(huì)收到曾經(jīng)被消費(fèi)過(guò)的消息。

可以看到,無(wú)論是發(fā)送時(shí)重復(fù)還是消費(fèi)時(shí)重復(fù),最終的效果均為消費(fèi)者消費(fèi)時(shí)收到了重復(fù)的消息,那么我們就知道:只需要在消費(fèi)者端統(tǒng)一進(jìn)行冪等處理就能夠?qū)崿F(xiàn)消息冪等。

三、如何實(shí)現(xiàn)消費(fèi)冪等

由于做冪等操作不可避免要產(chǎn)生巨大的開(kāi)銷,RocketMQ 為了追求高性能,本身沒(méi)有提供消費(fèi)冪等的特性,它要求我們?cè)跇I(yè)務(wù)上進(jìn)行去重,也就是說(shuō)自己在消費(fèi)消息時(shí)要做到冪等性。RocketMQ 雖然不能嚴(yán)格保證不重復(fù),但是正常情況下很少會(huì)出現(xiàn)重復(fù)發(fā)送、消費(fèi)重復(fù)情況,只有網(wǎng)絡(luò)異常,Consumer 啟停等異常情況下會(huì)出現(xiàn)消息重復(fù)。 所以消費(fèi)者在接收到消息以后,有必要根據(jù)業(yè)務(wù)上的唯一 Key 對(duì)消息做冪等處理的必要性。

前面介紹到,RocketMQ的消息有消息ID(Message ID)、消息Key(Message Key)兩個(gè)屬性。因?yàn)?Message ID 有可能出現(xiàn)沖突(重復(fù))的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)。 最好的方式是根據(jù)業(yè)務(wù)唯一標(biāo)識(shí)作為冪等處理的關(guān)鍵依據(jù),而業(yè)務(wù)的唯一標(biāo)識(shí)可以通過(guò)消息Key 進(jìn)行設(shè)置:

Message message = new Message();
// 設(shè)置消息的Key
message.setKey("XXX");
mqProducer.send(message);

生產(chǎn)者發(fā)送消息的時(shí)候,消息已經(jīng)設(shè)置了唯一的Message Key,在Consumer消費(fèi)消息時(shí),可以根據(jù)消息的Key 進(jìn)行冪等處理。

// 根據(jù)業(yè)務(wù)唯一標(biāo)識(shí)Key做冪等處理
mqConsumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for(MessageExt msg : msgs){
            // 獲取到消息Key
            String key = msg.getKeys();
            // 偽代碼如下:
            // 1. 根據(jù)消息key去redis查詢是否存在的記錄
            Object obj = redis.get(key);
            if (null != obj) {
                logger.info("消息重復(fù)消費(fèi)了");
                // ...
            } else {
                // 2. 從數(shù)據(jù)庫(kù)中查詢是否存在記錄
                MessageLog messageLog = messageService.getByMessageKey(key);
                if (null != messageLog) {
                     logger.info("消息重復(fù)消費(fèi)了");
                     // ...
                } else {
                    // 3. 寫redis、DB
                    // 業(yè)務(wù)處理
                    redis.set(xxx, xxx);
                    messageService.save(xxx);
                }
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

這里給一個(gè)通用性的解決方案 :使用數(shù)據(jù)庫(kù) + Redis實(shí)現(xiàn)消息消費(fèi)冪等。

(1)、Consumer消費(fèi)消息時(shí),拿到唯一的業(yè)務(wù)標(biāo)識(shí)---消息Key,然后根據(jù)消息Key去Redis緩存中查詢是否存在對(duì)應(yīng)的記錄,如果存在,則說(shuō)明本次操作是重復(fù)性操作;如果緩存中不存在此Key對(duì)應(yīng)的記錄,則執(zhí)行下一步;

(2)、根據(jù)消息Key去數(shù)據(jù)庫(kù)中查詢是否存在對(duì)應(yīng)的記錄,如果存在,則說(shuō)明本次操作是重復(fù)性操作;如果不存在的話,則執(zhí)行下一步;

(3)、在同一個(gè)事務(wù)中完成三項(xiàng)操作,保證下面三項(xiàng)操作同時(shí)成功,同時(shí)失?。?/p>

a、進(jìn)行業(yè)務(wù)處理;

b、將消息Key通過(guò)set(key, value, expireTime)寫入到Redis緩存中;

c、將消息Key作為數(shù)據(jù)庫(kù)表的主鍵或者唯一鍵插入到表中;

關(guān)于第二步中再次去從數(shù)據(jù)庫(kù)中校驗(yàn)是否存在對(duì)應(yīng)的記錄,其實(shí)這一步也是有必要的。由于我們一般都會(huì)在緩存使用過(guò)程中設(shè)置過(guò)期時(shí)間,如果緩存一旦過(guò)期,就可能發(fā)生緩存穿透,使請(qǐng)求直接滲透到數(shù)據(jù)庫(kù)中,所以我們此時(shí)還是要從數(shù)據(jù)庫(kù)中再次校驗(yàn)一下,將二者結(jié)合在一起是一個(gè)比較好的方案。

到此這篇關(guān)于RocketMQ消費(fèi)冪概念與使用分析的文章就介紹到這了,更多相關(guān)RocketMQ消費(fèi)冪等內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳細(xì)總結(jié)Java基礎(chǔ)類和包裝類

    詳細(xì)總結(jié)Java基礎(chǔ)類和包裝類

    近幾天一直在復(fù)習(xí)Java基礎(chǔ)知識(shí),今天就帶大家總結(jié)一下Java基礎(chǔ)類和包裝類,下文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)Java基礎(chǔ)的小伙伴們很有幫助,需要的朋友可以參考下
    2021-05-05
  • 淺談Spring中HandlerMapping的使用

    淺談Spring中HandlerMapping的使用

    這篇文章主要介紹了淺談Spring中HandlerMapping的使用,Spingmvc中的HandlerMapping負(fù)責(zé)解析請(qǐng)求URL,對(duì)應(yīng)到Handler進(jìn)行處理,這里的Handler一般為Controller里的一個(gè)方法method,也可以為servlet或者Controller等,需要的朋友可以參考下
    2023-08-08
  • 基于SpringBoot實(shí)現(xiàn)發(fā)送帶附件的郵件

    基于SpringBoot實(shí)現(xiàn)發(fā)送帶附件的郵件

    這篇文章主要介紹了基于SpringBoot實(shí)現(xiàn)發(fā)送帶附件的郵件,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • 詳解Java高級(jí)特性之反射

    詳解Java高級(jí)特性之反射

    這篇文章主要介紹了Java高級(jí)特性之反射的相關(guān)知識(shí),文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-06-06
  • Java調(diào)用接口如何獲取json數(shù)據(jù)解析后保存到數(shù)據(jù)庫(kù)

    Java調(diào)用接口如何獲取json數(shù)據(jù)解析后保存到數(shù)據(jù)庫(kù)

    這篇文章主要介紹了Java調(diào)用接口如何獲取json數(shù)據(jù)解析后保存到數(shù)據(jù)庫(kù)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • springboot-assembly自定義打包全過(guò)程

    springboot-assembly自定義打包全過(guò)程

    這篇文章主要介紹了springboot-assembly自定義打包全過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-06-06
  • 通過(guò)案例了解靜態(tài)修飾符static使用場(chǎng)景

    通過(guò)案例了解靜態(tài)修飾符static使用場(chǎng)景

    這篇文章主要介紹了通過(guò)案例了解靜態(tài)修飾符static使用場(chǎng)景,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-10-10
  • SpringBoot使用Scheduling實(shí)現(xiàn)定時(shí)任務(wù)的示例代碼

    SpringBoot使用Scheduling實(shí)現(xiàn)定時(shí)任務(wù)的示例代碼

    Spring Boot提供了一種方便的方式來(lái)實(shí)現(xiàn)定時(shí)任務(wù),即使用Spring的@Scheduled注解,通過(guò)在方法上添加@Scheduled注解,我們可以指定方法在何時(shí)執(zhí)行,本文我們就給大家介紹一下SpringBoot如何使用Scheduling實(shí)現(xiàn)定時(shí)任務(wù),需要的朋友可以參考下
    2023-08-08
  • Springboot中@Value注解的場(chǎng)景用法及可能遇到的問(wèn)題詳解

    Springboot中@Value注解的場(chǎng)景用法及可能遇到的問(wèn)題詳解

    這篇文章主要給大家介紹了關(guān)于Springboot中@Value注解的場(chǎng)景用法及可能遇到問(wèn)題的相關(guān)資料, @Value通常用于注入外部化屬性,即外部配置屬性的注入,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2023-11-11
  • javax.validation.constraints注解使用

    javax.validation.constraints注解使用

    這篇文章主要介紹了javax.validation.constraints注解使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-07-07

最新評(píng)論