RabbitMQ 如何解決消息冪等性的問題
前言
關于MQ消費者的冪等性問題,在于MQ的重試機制,因為網絡原因或客戶端延遲消費導致重復消費。使用MQ重試機制需要注意的事項以及如何解決消費者冪等性問題以下將逐一講解。
1. RabbitMQ自動重試機制
消費者在消費消息的時候,如果消費者業(yè)務邏輯出現(xiàn)程序異常,這個時候我們如何處理?
使用重試機制,RabbitMQ默認開啟重試機制。
實現(xiàn)原理:
- @RabbitHandler注解 底層使用Aop攔截,如果程序(消費者)沒有拋出異常,自動提交事務
- 如果Aop使用異常通知攔截獲取到異常后,自動實現(xiàn)補償機制,消息緩存在RabbitMQ服務器端
注意:
- 默認會一直重試到消費者不拋異常為止,這樣顯然不好。我們需要修改重試機制策略,如間隔3s重試一次)
配置:
spring: rabbitmq: # 連接地址 host: 127.0.0.1 # 端口號 port: 5672 # 賬號 username: guest # 密碼 password: guest # 地址(類似于數(shù)據(jù)庫的概念) virtual-host: /admin_vhost # 消費者監(jiān)聽相關配置 listener: simple: retry: # 開啟消費者(程序出現(xiàn)異常)重試機制,默認開啟并一直重試 enabled: true # 最大重試次數(shù) max-attempts: 5 # 重試間隔時間(毫秒) initial-interval: 3000
2. 如何合理選擇重試機制?
情況1: 消費者獲取到消息后,調用第三方接口,但接口暫時無法訪問,是否需要重試? 需要重試,可能是因為網絡原因短暫不能訪問
情況2: 消費者獲取到消息后,拋出數(shù)據(jù)轉換異常,是否需要重試? 不需要重試,因為屬于程序bug需要重新發(fā)布版本
總結:對于情況2,如果消費者代碼拋出異常是需要發(fā)布新版本才能解決的問題,那么不需要重試,重試也無濟于事。應該采用日志記錄+定時任務job進行健康檢查+人工進行補償
3. 調用第三方接口自動實現(xiàn)補償機制
我們知道了,RabbitMQ在消費者消費發(fā)生異常時,會自動進行補償機制,所以我們(消費者)在調用第三方接口時,可以根據(jù)返回結果判斷是否成功:
- 成功:正常消費
- 失?。菏謩訏佁幰粋€異常,這時RabbitMQ自動給我們做重試 (補償)。
4. 如何解決消費者冪等性問題
防止重復消費 (MQ重試機制需要注意的問題)
產生原因:網絡延遲傳輸中,消費者出現(xiàn)異常或者消費者延遲消費,會造成進行MQ重試補償,在重試過程中,可能會造成重復消費。
面試題:MQ中消費者如何保證冪等性問題,不被重復消費?
偽代碼:
生產者核心代碼:
請求頭設置消息id(messageId)
@Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void send(String queueName) { String msg = "my_fanout_msg:" + System.currentTimeMillis(); //請求頭設置消息id(messageId) Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build(); System.out.println(msg + ":" + msg); amqpTemplate.convertAndSend(queueName, message); } }
消費者核心代碼:
@RabbitListener(queues = "fanout_email_queue") public void process(Message message) throws Exception { // 獲取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); //② 判斷唯一Id是否被消費,消息消費成功后將id和狀態(tài)保存在日志表中,我們從(①步驟)表中獲取并判斷messageId的狀態(tài)即可 //從redis中獲取messageId的value String value = redisUtils.get(messageId)+""; if(value.equals("1") ){ //表示已經消費 return; //結束 } System.out.println("郵件消費者獲取生產者消息" + "messageId:" + messageId + ",消息內容:" + msg); JSONObject jsonObject = JSONObject.parseObject(msg); // 獲取email參數(shù) String email = jsonObject.getString("email"); // 請求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl); if (result == null) { // 因為網絡原因,造成無法訪問,繼續(xù)重試 throw new Exception("調用接口失敗!"); } System.out.println("執(zhí)行結束...."); //① 執(zhí)行到這里已經消費成功,我們可以修改messageId的狀態(tài),并存入日志表(可以存到redis中,key為消息Id、value為狀態(tài)) }
5. SpringBoot整合RabbitMQ應答模式(ACK)
1.修改配置simple下添加 acknowledge-mode: manual:
spring: rabbitmq: # 連接地址 host: 127.0.0.1 # 端口號 port: 5672 # 賬號 username: guest # 密碼 password: guest # 地址(類似于數(shù)據(jù)庫的概念) virtual-host: /admin_vhost # 消費者監(jiān)聽相關配置 listener: simple: retry: # 開啟消費者(程序出現(xiàn)異常)重試機制,默認開啟并一直重試 enabled: true # 最大重試次數(shù) max-attempts: 5 # 重試間隔時間(毫秒) initial-interval: 3000 # 開啟手動ack acknowledge-mode: manual
2.消費者增加代碼:
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手動ack channel.basicAck(deliveryTag, false);手動簽收
//郵件隊列 @Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { System.out .println(Thread.currentThread().getName() + ",郵件消費者獲取生產者消息msg:" + new String(message.getBody(), "UTF-8") + ",messageId:" + message.getMessageProperties().getMessageId()); // 手動ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收 channel.basicAck(deliveryTag, false); } }
RabbitMQ 如何保證冪等性,數(shù)據(jù)一致性
mq的作用主要是用來解耦,削峰,異步,
增加MQ,系統(tǒng)的復雜性也會增加很多,
也會帶來其他的問題,比如MQ掛了怎么辦,怎么保持數(shù)據(jù)的冪等性
冪等性問題通俗點講就是保證數(shù)據(jù)不被重復消費,同時數(shù)據(jù)也不能少,
也就是數(shù)據(jù)一致性問題。
下面是MQ丟失的3種情況
1,生產者發(fā)送消息至MQ的數(shù)據(jù)丟失
解決方法:在生產者端開啟comfirm 確認模式,你每次寫的消息都會分配一個唯一的 id,
然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack 消息,告訴你說這個消息 ok 了
2,MQ收到消息,暫存內存中,還沒消費,自己掛掉,數(shù)據(jù)會都丟失
解決方式:MQ設置為持久化。將內存數(shù)據(jù)持久化到磁盤中
3,消費者剛拿到消息,還沒處理,掛掉了,MQ又以為消費者處理完
解決方式:用 RabbitMQ 提供的 ack 機制,簡單來說,就是你必須關閉 RabbitMQ 的自動 ack,可以通過一個 api 來調用就行,然后每次你自己代碼里確保處理完的時候,再在程序里 ack 一把。這樣的話,如果你還沒處理完,不就沒有 ack 了?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。
數(shù)據(jù)重復的問題簡單的多,就是在消費端判斷數(shù)據(jù)是否已經被消費過
- 比如你拿個數(shù)據(jù)要寫庫,你先根據(jù)主鍵查一下,如果這數(shù)據(jù)都有了,你就別插入了,update 一下好吧。
- 比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
- 比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產者發(fā)送每條數(shù)據(jù)的時候,里面加一個全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費到了之后,先根據(jù)這個 id 去比如 Redis 里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個 id 寫 Redis。如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。
- 比如基于數(shù)據(jù)庫的唯一鍵來保證重復數(shù)據(jù)不會重復插入多條。因為有唯一鍵約束了,重復數(shù)據(jù)插入只會報錯,不會導致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)。
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Spring Cloud Zuul路由網關服務過濾實現(xiàn)代碼
這篇文章主要介紹了Spring Cloud Zuul路由網關服務過濾實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-04-04Java如何將Excel數(shù)據(jù)導入到數(shù)據(jù)庫
這篇文章主要為大家詳細介紹了Java將Excel數(shù)據(jù)導入到數(shù)據(jù)庫的方法,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-10-10SpringBoot中分頁插件PageHelper的使用詳解
分頁查詢是為了高效展示大量數(shù)據(jù),通過分頁將數(shù)據(jù)劃分為多個部分逐頁展示,原生方法需手動計算數(shù)據(jù)起始行,而使用PageHelper插件則簡化這一過程,本文給大家介紹SpringBoot中分頁插件PageHelper的使用,感興趣的朋友一起看看吧2024-09-09java實現(xiàn)利用String類的簡單方法讀取xml文件中某個標簽中的內容
下面小編就為大家?guī)硪黄猨ava實現(xiàn)利用String類的簡單方法讀取xml文件中某個標簽中的內容。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-12-12Java創(chuàng)建型設計模式之工廠方法模式深入詳解
工廠方法模式(FACTORY METHOD)是一種常用的類創(chuàng)建型設計模式,此模式的核心精神是封裝類中變化的部分,提取其中個性化善變的部分為獨立類,通過依賴注入以達到解耦、復用和方便后期維護拓展的目的。它的核心結構有四個角色,分別是抽象工廠、具體工廠、抽象產品、具體產品2022-09-09