欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ消費冪概念與使用分析

 更新時間:2023年02月13日 11:15:05   作者:每天都要進步一點點  
如果有?個操作,多次執(zhí)?與?次執(zhí)?所產(chǎn)?的影響是相同的,我們就稱這個操作是冪等的。當出現(xiàn)消費者對某條消息重復(fù)消費的情況時,重復(fù)消費的結(jié)果與消費?次的結(jié)果是相同的,并且多次消費并未對業(yè)務(wù)系統(tǒng)產(chǎn)?任何負?影響,那么這整個過程就可實現(xiàn)消息冪等

一、什么是消費冪等

冪等:如果有一個操作,多次執(zhí)行與一次執(zhí)行所產(chǎn)生的影響是相同的,我們就稱這個操作是冪等的。

基于上述的概念,結(jié)合消息消費的場景,我們能夠總結(jié)出消息冪等的概念:

如果消息重試多次,消費者端對該重復(fù)消息消費多次與消費一次的結(jié)果是相同的,并且多次消費沒有對系統(tǒng)產(chǎn)生副作用,那么我們就稱這個過程是消息冪等的。

在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,消息很有可能會出現(xiàn)重復(fù)發(fā)送或重復(fù)消費。如果重復(fù)的消息可能會影響業(yè)務(wù)處理,那么就應(yīng)該對消息做冪等處理。

二、消息重復(fù)的場景分析

由于網(wǎng)絡(luò)原因閃斷,ACK返回失敗等情況出現(xiàn),不可避免的會發(fā)生消息重復(fù)的情況。最常見的有下面三種場景:

(1)、生產(chǎn)者發(fā)送消息時發(fā)生消息重復(fù)

當一條消息已被成功發(fā)送到RocketMQ的Broker中,并且Broker已經(jīng)持久化到磁盤了,此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者生產(chǎn)者宕機現(xiàn)象,導(dǎo)致Broker對生產(chǎn)者應(yīng)答失敗。 如果此時生產(chǎn)者意識到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息,那么后續(xù)Consumer就一定會消費兩次該消息。

(2)、消費者消費消息時發(fā)生消息重復(fù)

消息已投遞到Consumer并完成業(yè)務(wù)處理,都會向RocketMQ Broker返回ACK確認響應(yīng),但是由于網(wǎng)絡(luò)閃斷等原因,可能導(dǎo)致Broker沒能成功收到Consumer發(fā)送的消費成功ACK響應(yīng),此時Broker認為Consumer沒能消費成功,為了保證消息至少被消費一次,Broker將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,此時消費者就會收到與之前處理過的內(nèi)容相同、Message ID也相同的消息。

(3)、負載均衡時發(fā)生消息重復(fù)

當Broker重啟或Consumer重啟、擴容或縮容時,都會觸發(fā)重新負載均衡(Rebalance),此時Consumer去讀取Broker中的offset可能還沒及時更新,此時Consumer可能會收到曾經(jīng)被消費過的消息。

可以看到,無論是發(fā)送時重復(fù)還是消費時重復(fù),最終的效果均為消費者消費時收到了重復(fù)的消息,那么我們就知道:只需要在消費者端統(tǒng)一進行冪等處理就能夠?qū)崿F(xiàn)消息冪等。

三、如何實現(xiàn)消費冪等

由于做冪等操作不可避免要產(chǎn)生巨大的開銷,RocketMQ 為了追求高性能,本身沒有提供消費冪等的特性,它要求我們在業(yè)務(wù)上進行去重,也就是說自己在消費消息時要做到冪等性。RocketMQ 雖然不能嚴格保證不重復(fù),但是正常情況下很少會出現(xiàn)重復(fù)發(fā)送、消費重復(fù)情況,只有網(wǎng)絡(luò)異常,Consumer 啟停等異常情況下會出現(xiàn)消息重復(fù)。 所以消費者在接收到消息以后,有必要根據(jù)業(yè)務(wù)上的唯一 Key 對消息做冪等處理的必要性。

前面介紹到,RocketMQ的消息有消息ID(Message ID)、消息Key(Message Key)兩個屬性。因為 Message ID 有可能出現(xiàn)沖突(重復(fù))的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)。 最好的方式是根據(jù)業(yè)務(wù)唯一標識作為冪等處理的關(guān)鍵依據(jù),而業(yè)務(wù)的唯一標識可以通過消息Key 進行設(shè)置:

Message message = new Message();
// 設(shè)置消息的Key
message.setKey("XXX");
mqProducer.send(message);

生產(chǎn)者發(fā)送消息的時候,消息已經(jīng)設(shè)置了唯一的Message Key,在Consumer消費消息時,可以根據(jù)消息的Key 進行冪等處理。

// 根據(jù)業(yè)務(wù)唯一標識Key做冪等處理
mqConsumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for(MessageExt msg : msgs){
            // 獲取到消息Key
            String key = msg.getKeys();
            // 偽代碼如下:
            // 1. 根據(jù)消息key去redis查詢是否存在的記錄
            Object obj = redis.get(key);
            if (null != obj) {
                logger.info("消息重復(fù)消費了");
                // ...
            } else {
                // 2. 從數(shù)據(jù)庫中查詢是否存在記錄
                MessageLog messageLog = messageService.getByMessageKey(key);
                if (null != messageLog) {
                     logger.info("消息重復(fù)消費了");
                     // ...
                } else {
                    // 3. 寫redis、DB
                    // 業(yè)務(wù)處理
                    redis.set(xxx, xxx);
                    messageService.save(xxx);
                }
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

這里給一個通用性的解決方案 :使用數(shù)據(jù)庫 + Redis實現(xiàn)消息消費冪等。

(1)、Consumer消費消息時,拿到唯一的業(yè)務(wù)標識---消息Key,然后根據(jù)消息Key去Redis緩存中查詢是否存在對應(yīng)的記錄,如果存在,則說明本次操作是重復(fù)性操作;如果緩存中不存在此Key對應(yīng)的記錄,則執(zhí)行下一步;

(2)、根據(jù)消息Key去數(shù)據(jù)庫中查詢是否存在對應(yīng)的記錄,如果存在,則說明本次操作是重復(fù)性操作;如果不存在的話,則執(zhí)行下一步;

(3)、在同一個事務(wù)中完成三項操作,保證下面三項操作同時成功,同時失?。?/p>

a、進行業(yè)務(wù)處理;

b、將消息Key通過set(key, value, expireTime)寫入到Redis緩存中;

c、將消息Key作為數(shù)據(jù)庫表的主鍵或者唯一鍵插入到表中;

關(guān)于第二步中再次去從數(shù)據(jù)庫中校驗是否存在對應(yīng)的記錄,其實這一步也是有必要的。由于我們一般都會在緩存使用過程中設(shè)置過期時間,如果緩存一旦過期,就可能發(fā)生緩存穿透,使請求直接滲透到數(shù)據(jù)庫中,所以我們此時還是要從數(shù)據(jù)庫中再次校驗一下,將二者結(jié)合在一起是一個比較好的方案。

到此這篇關(guān)于RocketMQ消費冪概念與使用分析的文章就介紹到這了,更多相關(guān)RocketMQ消費冪等內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java實現(xiàn)駝峰下劃線互轉(zhuǎn)的使用示例

    Java實現(xiàn)駝峰下劃線互轉(zhuǎn)的使用示例

    駝峰和下劃線互轉(zhuǎn)場景是在不同命名規(guī)范的情況下,需要進行字段名稱的轉(zhuǎn)換,本文就來介紹一下Java實現(xiàn)駝峰下劃線互轉(zhuǎn)的使用示例,感興趣的可以了解一下
    2023-12-12
  • mybatis 字段名自動轉(zhuǎn)小寫的實現(xiàn)

    mybatis 字段名自動轉(zhuǎn)小寫的實現(xiàn)

    這篇文章主要介紹了mybatis 字段名自動轉(zhuǎn)小寫的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Springboot集成Spring Security實現(xiàn)JWT認證的步驟詳解

    Springboot集成Spring Security實現(xiàn)JWT認證的步驟詳解

    這篇文章主要介紹了Springboot集成Spring Security實現(xiàn)JWT認證的步驟詳解,幫助大家更好的理解和使用springboot,感興趣的朋友可以了解下
    2021-02-02
  • Java日常練習(xí)題,每天進步一點點(45)

    Java日常練習(xí)題,每天進步一點點(45)

    下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,希望可以幫到你
    2021-07-07
  • Java中的ObjectOutputStream類使用

    Java中的ObjectOutputStream類使用

    ObjectOutputStream是Java.io包中的一個類,用于將Java對象的狀態(tài)信息序列化為字節(jié)流,序列化是將對象狀態(tài)轉(zhuǎn)換為字節(jié)流的過程,反序列化則是將字節(jié)流恢復(fù)為對象,本文介紹了ObjectOutputStream的原理、主要方法、使用步驟以及注意事項
    2024-09-09
  • Java 泛型實例詳解

    Java 泛型實例詳解

    本文主要介紹Java 泛型的知識,這里給代碼實例對Java 泛型深度理解,有需要的朋友可以看下
    2016-07-07
  • Spring中的@Async原理分析

    Spring中的@Async原理分析

    這篇文章主要介紹了Spring中的@Async原理分析,自定義new ThreadPoolExecutor并調(diào)用invokeAll等進行并發(fā)編程,后面發(fā)現(xiàn)只要在方法上添加@Async注解,并使用@EnableAsync進行開啟默認會使用SimpleAsyncTaskExecutor類型,需要的朋友可以參考下
    2024-01-01
  • spring webflux自定義netty 參數(shù)解析

    spring webflux自定義netty 參數(shù)解析

    這篇文章主要介紹了spring webflux自定義netty 參數(shù)解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • java注解結(jié)合aspectj AOP進行日志打印的操作

    java注解結(jié)合aspectj AOP進行日志打印的操作

    這篇文章主要介紹了java注解結(jié)合aspectj AOP進行日志打印的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • 解決Springboot集成Redis集群配置公網(wǎng)IP連接報私網(wǎng)IP連接失敗問題

    解決Springboot集成Redis集群配置公網(wǎng)IP連接報私網(wǎng)IP連接失敗問題

    在Springboot 集成 Redis集群配置公網(wǎng)IP連接報私網(wǎng)IP連接失敗,一直報私有IP連接失敗,所以本文小編給大家介紹了如何解決報錯問題,如果有遇到相同問題的同學(xué),可以參考閱讀本文
    2023-10-10

最新評論