解決RocketMQ的冪等性問題
造成重復(fù)消費(fèi)的原因
- 當(dāng)系統(tǒng)的調(diào)用鏈路比較長的時(shí)候,比如系統(tǒng)A調(diào)用系統(tǒng)B,系統(tǒng)B再把消息發(fā)送到RocketMQ中,在系統(tǒng)A調(diào)用系統(tǒng)B的時(shí)候,如果系統(tǒng)B處理成功,但是遲遲沒有將調(diào)用成功的結(jié)果返回給系統(tǒng)A的時(shí)候,系統(tǒng)A就會(huì)嘗試重新發(fā)起請(qǐng)求給系統(tǒng)B,造成系統(tǒng)B重復(fù)處理,發(fā)起多條消息給RocketMQ造成重復(fù)消費(fèi);
- 在系統(tǒng)B發(fā)送給RocketMQ的時(shí)候,也有可能會(huì)發(fā)生和上面一樣的問題,消息發(fā)送超時(shí),結(jié)果系統(tǒng)B重試,導(dǎo)致RocketMQ接收到了重讀消息;
- 當(dāng)RocketMQ成功接收到消息,并將消息交給消費(fèi)者處理,如果消費(fèi)者消費(fèi)完成后還沒來得及提交CONSUME_SUCCESS給RocketMQ,自己宕機(jī)或者重啟了,那么RocketMQ沒有接收到CONSUME_SUCCESS,就會(huì)認(rèn)為消費(fèi)失敗了,會(huì)重發(fā)消息給消費(fèi)者再次消費(fèi);
通過冪等性來保證,只要保證重復(fù)消息不對(duì)結(jié)果產(chǎn)生影響,就完美地解決這個(gè)問題。
解決方法
生產(chǎn)者端
- RocketMQ支持消息查詢的功能,只要去RocketMQ查詢一下是否已經(jīng)發(fā)送過該條消息就可以了,不存在則發(fā)送,存在則不發(fā)送,也就是message.setKeys();
- 引入Redis,在發(fā)送消息到RocketMQ成功之后,向Redis中插入一條數(shù)據(jù),如果發(fā)送重試,則先去Redis查詢一個(gè)該條消息是否已經(jīng)發(fā)送過了,存在的話就不重復(fù)發(fā)送消息了;
缺點(diǎn)
方法一:RocketMQ消息查詢的性能不是特別好,如果在高并發(fā)的場(chǎng)景下,每條消息在發(fā)送到RocketMQ時(shí)都去查詢一下,可能會(huì)影響接口的性能;
方法二:在一些極端的場(chǎng)景下,Redis也無法保證消息發(fā)送成功之后,就一定能寫入Redis成功,比如寫入消息成功而Redis此時(shí)宕機(jī),那么再次查詢Redis判斷消息是否已經(jīng)發(fā)送過,是無法得到正確結(jié)果的;
消費(fèi)者端
- 建立一個(gè)消息表,拿到這個(gè)消息做數(shù)據(jù)庫的insert操作。給這個(gè)消息做一個(gè)唯一主鍵(primary key)或者唯一約束,那么就算出現(xiàn)重復(fù)消費(fèi)的情況,就會(huì)導(dǎo)致主鍵沖突。
- 拿到這個(gè)消息做redis的set的操作.redis就是天然冪等性
代碼實(shí)現(xiàn)
方式一:
生產(chǎn)者
public class MQProducer { public static void main(String[] args) throws MQClientException { //創(chuàng)建生產(chǎn)者 DefaultMQProducer producer=new DefaultMQProducer("rmq-group"); //設(shè)置NameServer地址 producer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876"); //設(shè)置生產(chǎn)者實(shí)例名稱 producer.setInstanceName("producer"); //啟動(dòng)生產(chǎn)者 producer.start(); try { //發(fā)送消息 for (int i=0;i<1;i++){ Thread.sleep(1000); //每秒發(fā)送一次 //創(chuàng)建消息 Message msg = new Message("wn04", // topic 主題名稱 "TagA", // tag 臨時(shí)值 ("w-"+i).getBytes()// body 內(nèi)容 ); //消息的唯一標(biāo)識(shí) msg.setKeys(System.currentTimeMillis() + ""); //發(fā)送消息 SendResult sendResult=producer.send(msg); System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
消費(fèi)者端:
public class MQConsumer { //保存標(biāo)識(shí)的集合 static private Map<String, String> logMap = new HashMap<>(); public static void main(String[] args) throws MQClientException { //創(chuàng)建消費(fèi)者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); //設(shè)置NameServer地址 consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876"); //設(shè)置消費(fèi)者實(shí)例名稱 consumer.setInstanceName("consumer"); //訂閱topic consumer.subscribe("wn04","TagA"); //監(jiān)聽消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { String key = null; String msgId = null; try { for (MessageExt msg : list) { key = msg.getKeys(); //判斷集合當(dāng)中有沒有存在key,存在就不需要重試,不存在先存key再回來重試后消費(fèi)消息 if (logMap.containsKey(key)) { // 無需繼續(xù)重試。 System.out.println("key:"+key+",無需重試..."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } msgId = msg.getMsgId(); System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody())); //模擬異常 int i = 1 / 0; } } catch (Exception e) { //e.printStackTrace(); //重試 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } finally { //保存key logMap.put(key, msgId); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started..."); } }
到此這篇關(guān)于解決RocketMQ的冪等性問題的文章就介紹到這了,更多相關(guān)RocketMQ 冪等性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實(shí)現(xiàn)線程調(diào)度器和時(shí)間分片
線程調(diào)度器和時(shí)間分片是多線程編程和操作系統(tǒng)設(shè)計(jì)中的核心概念,本文主要介紹了java實(shí)現(xiàn)線程調(diào)度器和時(shí)間分片,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-10-10使用Idea maven創(chuàng)建Spring項(xiàng)目過程圖解
這篇文章主要介紹了使用Idea maven創(chuàng)建Spring項(xiàng)目過程圖解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02Java 中String StringBuilder 與 StringBuffer詳解及用法實(shí)例
這篇文章主要介紹了Java 中String StringBuilder 與 StringBuffer詳解及用法實(shí)例的相關(guān)資料,需要的朋友可以參考下2017-02-02使用springboot 獲取控制器參數(shù)的幾種方法小結(jié)
這篇文章主要介紹了使用springboot 獲取控制器參數(shù)的幾種方法小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11