欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解決RocketMQ的冪等性問題

 更新時間:2025年07月29日 11:04:01   作者:鏘鏘忒  
重復消費因調(diào)用鏈路長、消息發(fā)送超時或消費者故障導致,通過生產(chǎn)者消息查詢、Redis緩存及消費者唯一主鍵可以確保冪等性,避免重復處理,本文主要介紹了解決RocketMQ的冪等性問題,感興趣的可以了解一下

造成重復消費的原因

  • 當系統(tǒng)的調(diào)用鏈路比較長的時候,比如系統(tǒng)A調(diào)用系統(tǒng)B,系統(tǒng)B再把消息發(fā)送到RocketMQ中,在系統(tǒng)A調(diào)用系統(tǒng)B的時候,如果系統(tǒng)B處理成功,但是遲遲沒有將調(diào)用成功的結(jié)果返回給系統(tǒng)A的時候,系統(tǒng)A就會嘗試重新發(fā)起請求給系統(tǒng)B,造成系統(tǒng)B重復處理,發(fā)起多條消息給RocketMQ造成重復消費;
  • 在系統(tǒng)B發(fā)送給RocketMQ的時候,也有可能會發(fā)生和上面一樣的問題,消息發(fā)送超時,結(jié)果系統(tǒng)B重試,導致RocketMQ接收到了重讀消息;
  • 當RocketMQ成功接收到消息,并將消息交給消費者處理,如果消費者消費完成后還沒來得及提交CONSUME_SUCCESS給RocketMQ,自己宕機或者重啟了,那么RocketMQ沒有接收到CONSUME_SUCCESS,就會認為消費失敗了,會重發(fā)消息給消費者再次消費;

通過冪等性來保證,只要保證重復消息不對結(jié)果產(chǎn)生影響,就完美地解決這個問題。

解決方法

生產(chǎn)者端

  • RocketMQ支持消息查詢的功能,只要去RocketMQ查詢一下是否已經(jīng)發(fā)送過該條消息就可以了,不存在則發(fā)送,存在則不發(fā)送,也就是message.setKeys();
  • 引入Redis,在發(fā)送消息到RocketMQ成功之后,向Redis中插入一條數(shù)據(jù),如果發(fā)送重試,則先去Redis查詢一個該條消息是否已經(jīng)發(fā)送過了,存在的話就不重復發(fā)送消息了;

缺點
方法一:RocketMQ消息查詢的性能不是特別好,如果在高并發(fā)的場景下,每條消息在發(fā)送到RocketMQ時都去查詢一下,可能會影響接口的性能;

方法二:在一些極端的場景下,Redis也無法保證消息發(fā)送成功之后,就一定能寫入Redis成功,比如寫入消息成功而Redis此時宕機,那么再次查詢Redis判斷消息是否已經(jīng)發(fā)送過,是無法得到正確結(jié)果的;

消費者端

  1. 建立一個消息表,拿到這個消息做數(shù)據(jù)庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現(xiàn)重復消費的情況,就會導致主鍵沖突。
  2. 拿到這個消息做redis的set的操作.redis就是天然冪等性

代碼實現(xiàn)

方式一:
生產(chǎn)者

public class MQProducer {
    public static void main(String[] args) throws MQClientException {
        //創(chuàng)建生產(chǎn)者
        DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
        //設(shè)置NameServer地址
        producer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
        //設(shè)置生產(chǎn)者實例名稱
        producer.setInstanceName("producer");
        //啟動生產(chǎn)者
        producer.start();
        try {
            //發(fā)送消息
            for (int i=0;i<1;i++){
                Thread.sleep(1000); //每秒發(fā)送一次
                //創(chuàng)建消息
                Message msg = new Message("wn04", // topic 主題名稱
                        "TagA", // tag 臨時值
                        ("w-"+i).getBytes()// body 內(nèi)容
                );
                //消息的唯一標識
                msg.setKeys(System.currentTimeMillis() + "");
                //發(fā)送消息
                SendResult sendResult=producer.send(msg);
                System.out.println(sendResult.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }

}

消費者端:

public class MQConsumer {

    //保存標識的集合
    static private Map<String, String> logMap = new HashMap<>();

    public static void main(String[] args) throws MQClientException {
        //創(chuàng)建消費者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
        //設(shè)置NameServer地址
        consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
        //設(shè)置消費者實例名稱
        consumer.setInstanceName("consumer");
        //訂閱topic
        consumer.subscribe("wn04","TagA");
        //監(jiān)聽消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                String key = null;
                String msgId = null;
                try {
                    for (MessageExt msg : list) {
                        key = msg.getKeys();
                        //判斷集合當中有沒有存在key,存在就不需要重試,不存在先存key再回來重試后消費消息
                        if (logMap.containsKey(key)) {
                            // 無需繼續(xù)重試。
                            System.out.println("key:"+key+",無需重試...");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        msgId = msg.getMsgId();
                        System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
                        //模擬異常
                        int i = 1 / 0;
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    //重試
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                } finally {
                    //保存key
                    logMap.put(key, msgId);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started...");

    }
}

到此這篇關(guān)于解決RocketMQ的冪等性問題的文章就介紹到這了,更多相關(guān)RocketMQ 冪等性內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • IDEA引入本地jar包的幾種方法

    IDEA引入本地jar包的幾種方法

    本文主要介紹了IDEA引入本地jar包的幾種方法,文中通過圖文結(jié)合的方式碼介紹的非常詳細,對大家的學習或工作有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧
    2024-01-01
  • 關(guān)于Spring?Ioc和DI注解的問題

    關(guān)于Spring?Ioc和DI注解的問題

    這篇文章主要介紹了Spring?Ioc和DI注解,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-03-03
  • JAVA進程突然消失問題解決方案

    JAVA進程突然消失問題解決方案

    這篇文章主要介紹了JAVA進程突然消失問題解決方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-03-03
  • java實現(xiàn)線程調(diào)度器和時間分片

    java實現(xiàn)線程調(diào)度器和時間分片

    線程調(diào)度器和時間分片是多線程編程和操作系統(tǒng)設(shè)計中的核心概念,本文主要介紹了java實現(xiàn)線程調(diào)度器和時間分片,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-10-10
  • 使用Idea maven創(chuàng)建Spring項目過程圖解

    使用Idea maven創(chuàng)建Spring項目過程圖解

    這篇文章主要介紹了使用Idea maven創(chuàng)建Spring項目過程圖解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-02-02
  • Java 中String StringBuilder 與 StringBuffer詳解及用法實例

    Java 中String StringBuilder 與 StringBuffer詳解及用法實例

    這篇文章主要介紹了Java 中String StringBuilder 與 StringBuffer詳解及用法實例的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • 使用springboot 獲取控制器參數(shù)的幾種方法小結(jié)

    使用springboot 獲取控制器參數(shù)的幾種方法小結(jié)

    這篇文章主要介紹了使用springboot 獲取控制器參數(shù)的幾種方法小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Java爬蟲爬取漫畫示例

    Java爬蟲爬取漫畫示例

    這篇文章主要介紹了Java爬蟲爬取漫畫示例,大部分的爬蟲入門教學都是爬取圖片的,本文就來測試一下爬取網(wǎng)站的漫畫,需要的朋友可以參考下
    2023-04-04
  • Java壓縮/解壓文件的實現(xiàn)代碼

    Java壓縮/解壓文件的實現(xiàn)代碼

    本文通過實例代碼給大家分享了Java壓縮/解壓文件的方法,需要的朋友參考下吧
    2017-09-09
  • SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實現(xiàn)

    SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實現(xiàn)

    這篇文章主要介紹了SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-11-11

最新評論