解決RocketMQ的冪等性問題
造成重復消費的原因
- 當系統(tǒng)的調(diào)用鏈路比較長的時候,比如系統(tǒng)A調(diào)用系統(tǒng)B,系統(tǒng)B再把消息發(fā)送到RocketMQ中,在系統(tǒng)A調(diào)用系統(tǒng)B的時候,如果系統(tǒng)B處理成功,但是遲遲沒有將調(diào)用成功的結(jié)果返回給系統(tǒng)A的時候,系統(tǒng)A就會嘗試重新發(fā)起請求給系統(tǒng)B,造成系統(tǒng)B重復處理,發(fā)起多條消息給RocketMQ造成重復消費;
- 在系統(tǒng)B發(fā)送給RocketMQ的時候,也有可能會發(fā)生和上面一樣的問題,消息發(fā)送超時,結(jié)果系統(tǒng)B重試,導致RocketMQ接收到了重讀消息;
- 當RocketMQ成功接收到消息,并將消息交給消費者處理,如果消費者消費完成后還沒來得及提交CONSUME_SUCCESS給RocketMQ,自己宕機或者重啟了,那么RocketMQ沒有接收到CONSUME_SUCCESS,就會認為消費失敗了,會重發(fā)消息給消費者再次消費;

通過冪等性來保證,只要保證重復消息不對結(jié)果產(chǎn)生影響,就完美地解決這個問題。
解決方法
生產(chǎn)者端
- RocketMQ支持消息查詢的功能,只要去RocketMQ查詢一下是否已經(jīng)發(fā)送過該條消息就可以了,不存在則發(fā)送,存在則不發(fā)送,也就是message.setKeys();
- 引入Redis,在發(fā)送消息到RocketMQ成功之后,向Redis中插入一條數(shù)據(jù),如果發(fā)送重試,則先去Redis查詢一個該條消息是否已經(jīng)發(fā)送過了,存在的話就不重復發(fā)送消息了;

缺點
方法一:RocketMQ消息查詢的性能不是特別好,如果在高并發(fā)的場景下,每條消息在發(fā)送到RocketMQ時都去查詢一下,可能會影響接口的性能;
方法二:在一些極端的場景下,Redis也無法保證消息發(fā)送成功之后,就一定能寫入Redis成功,比如寫入消息成功而Redis此時宕機,那么再次查詢Redis判斷消息是否已經(jīng)發(fā)送過,是無法得到正確結(jié)果的;
消費者端
- 建立一個消息表,拿到這個消息做數(shù)據(jù)庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現(xiàn)重復消費的情況,就會導致主鍵沖突。
- 拿到這個消息做redis的set的操作.redis就是天然冪等性
代碼實現(xiàn)
方式一:
生產(chǎn)者
public class MQProducer {
public static void main(String[] args) throws MQClientException {
//創(chuàng)建生產(chǎn)者
DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
//設(shè)置NameServer地址
producer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
//設(shè)置生產(chǎn)者實例名稱
producer.setInstanceName("producer");
//啟動生產(chǎn)者
producer.start();
try {
//發(fā)送消息
for (int i=0;i<1;i++){
Thread.sleep(1000); //每秒發(fā)送一次
//創(chuàng)建消息
Message msg = new Message("wn04", // topic 主題名稱
"TagA", // tag 臨時值
("w-"+i).getBytes()// body 內(nèi)容
);
//消息的唯一標識
msg.setKeys(System.currentTimeMillis() + "");
//發(fā)送消息
SendResult sendResult=producer.send(msg);
System.out.println(sendResult.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
消費者端:
public class MQConsumer {
//保存標識的集合
static private Map<String, String> logMap = new HashMap<>();
public static void main(String[] args) throws MQClientException {
//創(chuàng)建消費者
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
//設(shè)置NameServer地址
consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
//設(shè)置消費者實例名稱
consumer.setInstanceName("consumer");
//訂閱topic
consumer.subscribe("wn04","TagA");
//監(jiān)聽消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
String key = null;
String msgId = null;
try {
for (MessageExt msg : list) {
key = msg.getKeys();
//判斷集合當中有沒有存在key,存在就不需要重試,不存在先存key再回來重試后消費消息
if (logMap.containsKey(key)) {
// 無需繼續(xù)重試。
System.out.println("key:"+key+",無需重試...");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
msgId = msg.getMsgId();
System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
//模擬異常
int i = 1 / 0;
}
} catch (Exception e) {
//e.printStackTrace();
//重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} finally {
//保存key
logMap.put(key, msgId);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started...");
}
}
到此這篇關(guān)于解決RocketMQ的冪等性問題的文章就介紹到這了,更多相關(guān)RocketMQ 冪等性內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Idea maven創(chuàng)建Spring項目過程圖解
這篇文章主要介紹了使用Idea maven創(chuàng)建Spring項目過程圖解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-02-02
Java 中String StringBuilder 與 StringBuffer詳解及用法實例
這篇文章主要介紹了Java 中String StringBuilder 與 StringBuffer詳解及用法實例的相關(guān)資料,需要的朋友可以參考下2017-02-02
使用springboot 獲取控制器參數(shù)的幾種方法小結(jié)
這篇文章主要介紹了使用springboot 獲取控制器參數(shù)的幾種方法小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實現(xiàn)
這篇文章主要介紹了SpringBoot整合JavaMail通過阿里云企業(yè)郵箱發(fā)送郵件的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-11-11

