RabbitMQ 如何解決消息冪等性的問(wèn)題
前言
關(guān)于MQ消費(fèi)者的冪等性問(wèn)題,在于MQ的重試機(jī)制,因?yàn)榫W(wǎng)絡(luò)原因或客戶端延遲消費(fèi)導(dǎo)致重復(fù)消費(fèi)。使用MQ重試機(jī)制需要注意的事項(xiàng)以及如何解決消費(fèi)者冪等性問(wèn)題以下將逐一講解。
1. RabbitMQ自動(dòng)重試機(jī)制
消費(fèi)者在消費(fèi)消息的時(shí)候,如果消費(fèi)者業(yè)務(wù)邏輯出現(xiàn)程序異常,這個(gè)時(shí)候我們?nèi)绾翁幚恚?/p>
使用重試機(jī)制,RabbitMQ默認(rèn)開(kāi)啟重試機(jī)制。
實(shí)現(xiàn)原理:
- @RabbitHandler注解 底層使用Aop攔截,如果程序(消費(fèi)者)沒(méi)有拋出異常,自動(dòng)提交事務(wù)
- 如果Aop使用異常通知攔截獲取到異常后,自動(dòng)實(shí)現(xiàn)補(bǔ)償機(jī)制,消息緩存在RabbitMQ服務(wù)器端
注意:
- 默認(rèn)會(huì)一直重試到消費(fèi)者不拋異常為止,這樣顯然不好。我們需要修改重試機(jī)制策略,如間隔3s重試一次)
配置:
spring:
rabbitmq:
# 連接地址
host: 127.0.0.1
# 端口號(hào)
port: 5672
# 賬號(hào)
username: guest
# 密碼
password: guest
# 地址(類似于數(shù)據(jù)庫(kù)的概念)
virtual-host: /admin_vhost
# 消費(fèi)者監(jiān)聽(tīng)相關(guān)配置
listener:
simple:
retry:
# 開(kāi)啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開(kāi)啟并一直重試
enabled: true
# 最大重試次數(shù)
max-attempts: 5
# 重試間隔時(shí)間(毫秒)
initial-interval: 3000
2. 如何合理選擇重試機(jī)制?
情況1: 消費(fèi)者獲取到消息后,調(diào)用第三方接口,但接口暫時(shí)無(wú)法訪問(wèn),是否需要重試? 需要重試,可能是因?yàn)榫W(wǎng)絡(luò)原因短暫不能訪問(wèn)
情況2: 消費(fèi)者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換異常,是否需要重試? 不需要重試,因?yàn)閷儆诔绦騜ug需要重新發(fā)布版本
總結(jié):對(duì)于情況2,如果消費(fèi)者代碼拋出異常是需要發(fā)布新版本才能解決的問(wèn)題,那么不需要重試,重試也無(wú)濟(jì)于事。應(yīng)該采用日志記錄+定時(shí)任務(wù)job進(jìn)行健康檢查+人工進(jìn)行補(bǔ)償
3. 調(diào)用第三方接口自動(dòng)實(shí)現(xiàn)補(bǔ)償機(jī)制
我們知道了,RabbitMQ在消費(fèi)者消費(fèi)發(fā)生異常時(shí),會(huì)自動(dòng)進(jìn)行補(bǔ)償機(jī)制,所以我們(消費(fèi)者)在調(diào)用第三方接口時(shí),可以根據(jù)返回結(jié)果判斷是否成功:
- 成功:正常消費(fèi)
- 失?。菏謩?dòng)拋處一個(gè)異常,這時(shí)RabbitMQ自動(dòng)給我們做重試 (補(bǔ)償)。
4. 如何解決消費(fèi)者冪等性問(wèn)題
防止重復(fù)消費(fèi) (MQ重試機(jī)制需要注意的問(wèn)題)
產(chǎn)生原因:網(wǎng)絡(luò)延遲傳輸中,消費(fèi)者出現(xiàn)異?;蛘呦M(fèi)者延遲消費(fèi),會(huì)造成進(jìn)行MQ重試補(bǔ)償,在重試過(guò)程中,可能會(huì)造成重復(fù)消費(fèi)。
面試題:MQ中消費(fèi)者如何保證冪等性問(wèn)題,不被重復(fù)消費(fèi)?

偽代碼:
生產(chǎn)者核心代碼:
請(qǐng)求頭設(shè)置消息id(messageId)
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String queueName) {
String msg = "my_fanout_msg:" + System.currentTimeMillis();
//請(qǐng)求頭設(shè)置消息id(messageId)
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, message);
}
}
消費(fèi)者核心代碼:
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message) throws Exception {
// 獲取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
//② 判斷唯一Id是否被消費(fèi),消息消費(fèi)成功后將id和狀態(tài)保存在日志表中,我們從(①步驟)表中獲取并判斷messageId的狀態(tài)即可
//從redis中獲取messageId的value
String value = redisUtils.get(messageId)+"";
if(value.equals("1") ){ //表示已經(jīng)消費(fèi)
return; //結(jié)束
}
System.out.println("郵件消費(fèi)者獲取生產(chǎn)者消息" + "messageId:" + messageId + ",消息內(nèi)容:" + msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
// 獲取email參數(shù)
String email = jsonObject.getString("email");
// 請(qǐng)求地址
String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
JSONObject result = HttpClientUtils.httpGet(emailUrl);
if (result == null) {
// 因?yàn)榫W(wǎng)絡(luò)原因,造成無(wú)法訪問(wèn),繼續(xù)重試
throw new Exception("調(diào)用接口失敗!");
}
System.out.println("執(zhí)行結(jié)束....");
//① 執(zhí)行到這里已經(jīng)消費(fèi)成功,我們可以修改messageId的狀態(tài),并存入日志表(可以存到redis中,key為消息Id、value為狀態(tài))
}
5. SpringBoot整合RabbitMQ應(yīng)答模式(ACK)
1.修改配置simple下添加 acknowledge-mode: manual:
spring:
rabbitmq:
# 連接地址
host: 127.0.0.1
# 端口號(hào)
port: 5672
# 賬號(hào)
username: guest
# 密碼
password: guest
# 地址(類似于數(shù)據(jù)庫(kù)的概念)
virtual-host: /admin_vhost
# 消費(fèi)者監(jiān)聽(tīng)相關(guān)配置
listener:
simple:
retry:
# 開(kāi)啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開(kāi)啟并一直重試
enabled: true
# 最大重試次數(shù)
max-attempts: 5
# 重試間隔時(shí)間(毫秒)
initial-interval: 3000
# 開(kāi)啟手動(dòng)ack
acknowledge-mode: manual
2.消費(fèi)者增加代碼:
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手動(dòng)ack channel.basicAck(deliveryTag, false);手動(dòng)簽收
//郵件隊(duì)列
@Component
public class FanoutEamilConsumer {
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.out
.println(Thread.currentThread().getName() + ",郵件消費(fèi)者獲取生產(chǎn)者消息msg:" + new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties().getMessageId());
// 手動(dòng)ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手動(dòng)簽收
channel.basicAck(deliveryTag, false);
}
}
RabbitMQ 如何保證冪等性,數(shù)據(jù)一致性
mq的作用主要是用來(lái)解耦,削峰,異步,
增加MQ,系統(tǒng)的復(fù)雜性也會(huì)增加很多,
也會(huì)帶來(lái)其他的問(wèn)題,比如MQ掛了怎么辦,怎么保持?jǐn)?shù)據(jù)的冪等性
冪等性問(wèn)題通俗點(diǎn)講就是保證數(shù)據(jù)不被重復(fù)消費(fèi),同時(shí)數(shù)據(jù)也不能少,
也就是數(shù)據(jù)一致性問(wèn)題。
下面是MQ丟失的3種情況

1,生產(chǎn)者發(fā)送消息至MQ的數(shù)據(jù)丟失
解決方法:在生產(chǎn)者端開(kāi)啟comfirm 確認(rèn)模式,你每次寫(xiě)的消息都會(huì)分配一個(gè)唯一的 id,
然后如果寫(xiě)入了 RabbitMQ 中,RabbitMQ 會(huì)給你回傳一個(gè) ack 消息,告訴你說(shuō)這個(gè)消息 ok 了
2,MQ收到消息,暫存內(nèi)存中,還沒(méi)消費(fèi),自己掛掉,數(shù)據(jù)會(huì)都丟失
解決方式:MQ設(shè)置為持久化。將內(nèi)存數(shù)據(jù)持久化到磁盤(pán)中
3,消費(fèi)者剛拿到消息,還沒(méi)處理,掛掉了,MQ又以為消費(fèi)者處理完
解決方式:用 RabbitMQ 提供的 ack 機(jī)制,簡(jiǎn)單來(lái)說(shuō),就是你必須關(guān)閉 RabbitMQ 的自動(dòng) ack,可以通過(guò)一個(gè) api 來(lái)調(diào)用就行,然后每次你自己代碼里確保處理完的時(shí)候,再在程序里 ack 一把。這樣的話,如果你還沒(méi)處理完,不就沒(méi)有 ack 了?那 RabbitMQ 就認(rèn)為你還沒(méi)處理完,這個(gè)時(shí)候 RabbitMQ 會(huì)把這個(gè)消費(fèi)分配給別的 consumer 去處理,消息是不會(huì)丟的。

數(shù)據(jù)重復(fù)的問(wèn)題簡(jiǎn)單的多,就是在消費(fèi)端判斷數(shù)據(jù)是否已經(jīng)被消費(fèi)過(guò)
- 比如你拿個(gè)數(shù)據(jù)要寫(xiě)庫(kù),你先根據(jù)主鍵查一下,如果這數(shù)據(jù)都有了,你就別插入了,update 一下好吧。
- 比如你是寫(xiě) Redis,那沒(méi)問(wèn)題了,反正每次都是 set,天然冪等性。
- 比如你不是上面兩個(gè)場(chǎng)景,那做的稍微復(fù)雜一點(diǎn),你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時(shí)候,里面加一個(gè)全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費(fèi)到了之后,先根據(jù)這個(gè) id 去比如 Redis 里查一下,之前消費(fèi)過(guò)嗎?如果沒(méi)有消費(fèi)過(guò),你就處理,然后這個(gè) id 寫(xiě) Redis。如果消費(fèi)過(guò)了,那你就別處理了,保證別重復(fù)處理相同的消息即可。
- 比如基于數(shù)據(jù)庫(kù)的唯一鍵來(lái)保證重復(fù)數(shù)據(jù)不會(huì)重復(fù)插入多條。因?yàn)橛形ㄒ绘I約束了,重復(fù)數(shù)據(jù)插入只會(huì)報(bào)錯(cuò),不會(huì)導(dǎo)致數(shù)據(jù)庫(kù)中出現(xiàn)臟數(shù)據(jù)。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
vue3實(shí)現(xiàn)一個(gè)todo-list
這篇文章主要為大家詳細(xì)介紹了基于vuejs實(shí)現(xiàn)一個(gè)todolist項(xiàng)目,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能給你帶來(lái)幫助2021-08-08
Session過(guò)期后實(shí)現(xiàn)自動(dòng)跳轉(zhuǎn)登錄頁(yè)面
這篇文章主要介紹了Session過(guò)期后實(shí)現(xiàn)自動(dòng)跳轉(zhuǎn)登錄頁(yè)面,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-12-12
Spring Boot 啟動(dòng)失敗:循環(huán)依賴排查到懶加載配置的過(guò)程解析
本文我將從一個(gè)真實(shí)的生產(chǎn)環(huán)境故障案例出發(fā),帶你深入了解Spring Boot循環(huán)依賴的檢測(cè)機(jī)制、排查方法和解決方案,通過(guò)系統(tǒng)性分析和實(shí)戰(zhàn)演練幫助掌握如何在復(fù)雜的應(yīng)用中處理循環(huán)依賴,感興趣的朋友跟隨小編一起看看吧2025-08-08

