欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解決RocketMQ的冪等性問題

 更新時(shí)間:2025年07月29日 11:04:01   作者:鏘鏘忒  
重復(fù)消費(fèi)因調(diào)用鏈路長、消息發(fā)送超時(shí)或消費(fèi)者故障導(dǎo)致,通過生產(chǎn)者消息查詢、Redis緩存及消費(fèi)者唯一主鍵可以確保冪等性,避免重復(fù)處理,本文主要介紹了解決RocketMQ的冪等性問題,感興趣的可以了解一下

造成重復(fù)消費(fèi)的原因

  • 當(dāng)系統(tǒng)的調(diào)用鏈路比較長的時(shí)候,比如系統(tǒng)A調(diào)用系統(tǒng)B,系統(tǒng)B再把消息發(fā)送到RocketMQ中,在系統(tǒng)A調(diào)用系統(tǒng)B的時(shí)候,如果系統(tǒng)B處理成功,但是遲遲沒有將調(diào)用成功的結(jié)果返回給系統(tǒng)A的時(shí)候,系統(tǒng)A就會(huì)嘗試重新發(fā)起請(qǐng)求給系統(tǒng)B,造成系統(tǒng)B重復(fù)處理,發(fā)起多條消息給RocketMQ造成重復(fù)消費(fèi);
  • 在系統(tǒng)B發(fā)送給RocketMQ的時(shí)候,也有可能會(huì)發(fā)生和上面一樣的問題,消息發(fā)送超時(shí),結(jié)果系統(tǒng)B重試,導(dǎo)致RocketMQ接收到了重讀消息;
  • 當(dāng)RocketMQ成功接收到消息,并將消息交給消費(fèi)者處理,如果消費(fèi)者消費(fèi)完成后還沒來得及提交CONSUME_SUCCESS給RocketMQ,自己宕機(jī)或者重啟了,那么RocketMQ沒有接收到CONSUME_SUCCESS,就會(huì)認(rèn)為消費(fèi)失敗了,會(huì)重發(fā)消息給消費(fèi)者再次消費(fèi);

通過冪等性來保證,只要保證重復(fù)消息不對(duì)結(jié)果產(chǎn)生影響,就完美地解決這個(gè)問題。

解決方法

生產(chǎn)者端

  • RocketMQ支持消息查詢的功能,只要去RocketMQ查詢一下是否已經(jīng)發(fā)送過該條消息就可以了,不存在則發(fā)送,存在則不發(fā)送,也就是message.setKeys();
  • 引入Redis,在發(fā)送消息到RocketMQ成功之后,向Redis中插入一條數(shù)據(jù),如果發(fā)送重試,則先去Redis查詢一個(gè)該條消息是否已經(jīng)發(fā)送過了,存在的話就不重復(fù)發(fā)送消息了;

缺點(diǎn)
方法一:RocketMQ消息查詢的性能不是特別好,如果在高并發(fā)的場(chǎng)景下,每條消息在發(fā)送到RocketMQ時(shí)都去查詢一下,可能會(huì)影響接口的性能

方法二:在一些極端的場(chǎng)景下,Redis也無法保證消息發(fā)送成功之后,就一定能寫入Redis成功,比如寫入消息成功而Redis此時(shí)宕機(jī),那么再次查詢Redis判斷消息是否已經(jīng)發(fā)送過,是無法得到正確結(jié)果的;

消費(fèi)者端

  1. 建立一個(gè)消息表,拿到這個(gè)消息做數(shù)據(jù)庫的insert操作。給這個(gè)消息做一個(gè)唯一主鍵(primary key)或者唯一約束,那么就算出現(xiàn)重復(fù)消費(fèi)的情況,就會(huì)導(dǎo)致主鍵沖突。
  2. 拿到這個(gè)消息做redis的set的操作.redis就是天然冪等性

代碼實(shí)現(xiàn)

方式一:
生產(chǎn)者

public class MQProducer {
    public static void main(String[] args) throws MQClientException {
        //創(chuàng)建生產(chǎn)者
        DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
        //設(shè)置NameServer地址
        producer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
        //設(shè)置生產(chǎn)者實(shí)例名稱
        producer.setInstanceName("producer");
        //啟動(dòng)生產(chǎn)者
        producer.start();
        try {
            //發(fā)送消息
            for (int i=0;i<1;i++){
                Thread.sleep(1000); //每秒發(fā)送一次
                //創(chuàng)建消息
                Message msg = new Message("wn04", // topic 主題名稱
                        "TagA", // tag 臨時(shí)值
                        ("w-"+i).getBytes()// body 內(nèi)容
                );
                //消息的唯一標(biāo)識(shí)
                msg.setKeys(System.currentTimeMillis() + "");
                //發(fā)送消息
                SendResult sendResult=producer.send(msg);
                System.out.println(sendResult.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }

}

消費(fèi)者端:

public class MQConsumer {

    //保存標(biāo)識(shí)的集合
    static private Map<String, String> logMap = new HashMap<>();

    public static void main(String[] args) throws MQClientException {
        //創(chuàng)建消費(fèi)者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
        //設(shè)置NameServer地址
        consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
        //設(shè)置消費(fèi)者實(shí)例名稱
        consumer.setInstanceName("consumer");
        //訂閱topic
        consumer.subscribe("wn04","TagA");
        //監(jiān)聽消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                String key = null;
                String msgId = null;
                try {
                    for (MessageExt msg : list) {
                        key = msg.getKeys();
                        //判斷集合當(dāng)中有沒有存在key,存在就不需要重試,不存在先存key再回來重試后消費(fèi)消息
                        if (logMap.containsKey(key)) {
                            // 無需繼續(xù)重試。
                            System.out.println("key:"+key+",無需重試...");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        msgId = msg.getMsgId();
                        System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
                        //模擬異常
                        int i = 1 / 0;
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    //重試
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                } finally {
                    //保存key
                    logMap.put(key, msgId);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started...");

    }
}

到此這篇關(guān)于解決RocketMQ的冪等性問題的文章就介紹到這了,更多相關(guān)RocketMQ 冪等性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • IDEA引入本地jar包的幾種方法

    IDEA引入本地jar包的幾種方法

    本文主要介紹了IDEA引入本地jar包的幾種方法,文中通過圖文結(jié)合的方式碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧
    2024-01-01
  • 關(guān)于Spring?Ioc和DI注解的問題

    關(guān)于Spring?Ioc和DI注解的問題

    這篇文章主要介紹了Spring?Ioc和DI注解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-03-03
  • JAVA進(jìn)程突然消失問題解決方案

    JAVA進(jìn)程突然消失問題解決方案

    這篇文章主要介紹了JAVA進(jìn)程突然消失問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • java實(shí)現(xiàn)線程調(diào)度器和時(shí)間分片

    java實(shí)現(xiàn)線程調(diào)度器和時(shí)間分片

    線程調(diào)度器和時(shí)間分片是多線程編程和操作系統(tǒng)設(shè)計(jì)中的核心概念,本文主要介紹了java實(shí)現(xiàn)線程調(diào)度器和時(shí)間分片,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-10-10
  • 使用Idea maven創(chuàng)建Spring項(xiàng)目過程圖解

    使用Idea maven創(chuàng)建Spring項(xiàng)目過程圖解

    這篇文章主要介紹了使用Idea maven創(chuàng)建Spring項(xiàng)目過程圖解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • Java 中String StringBuilder 與 StringBuffer詳解及用法實(shí)例

    Java 中String StringBuilder 與 StringBuffer詳解及用法實(shí)例

    這篇文章主要介紹了Java 中String StringBuilder 與 StringBuffer詳解及用法實(shí)例的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • 使用springboot 獲取控制器參數(shù)的幾種方法小結(jié)

    使用springboot 獲取控制器參數(shù)的幾種方法小結(jié)

    這篇文章主要介紹了使用springboot 獲取控制器參數(shù)的幾種方法小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Java爬蟲爬取漫畫示例

    Java爬蟲爬取漫畫示例

    這篇文章主要介紹了Java爬蟲爬取漫畫示例,大部分的爬蟲入門教學(xué)都是爬取圖片的,本文就來測(cè)試一下爬取網(wǎng)站的漫畫,需要的朋友可以參考下
    2023-04-04
  • Java壓縮/解壓文件的實(shí)現(xiàn)代碼

    Java壓縮/解壓文件的實(shí)現(xiàn)代碼

    本文通過實(shí)例代碼給大家分享了Java壓縮/解壓文件的方法,需要的朋友參考下吧
    2017-09-09
  • SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實(shí)現(xiàn)

    SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實(shí)現(xiàn)

    這篇文章主要介紹了SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11

最新評(píng)論