SpringBoot基于RabbitMQ實(shí)現(xiàn)消息可靠性的方法
1.概述
消息從發(fā)送到消費(fèi)者接收 會經(jīng)歷的過程如下:
丟失消息的可能性
- 發(fā)送時丟失:
- 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
- 消息到達(dá)exchange后未到達(dá)queue
- MQ宕機(jī),queue將消息丟失
- consumer接收到消息后未消費(fèi)就宕機(jī)
針對這些問題,RabbitMQ分別給出了解決方案
- 生產(chǎn)者確認(rèn)機(jī)制
- mq持久化
- 消費(fèi)者確認(rèn)機(jī)制
- 失敗重試機(jī)制
2. 生產(chǎn)者消息確認(rèn)
2.1 概述
RabbitMQ 提供了 publisher confirm
機(jī)制來避免消息發(fā)送到 MQ 過程中丟失。這種機(jī)制必須給每個消息指定一個唯一ID。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功。
返回結(jié)果有兩種方式:
- publisher-confirm,發(fā)送者確認(rèn)
- 消息成功投遞到交換機(jī),返回ack
- 消息未投遞到交換機(jī),返回nack
- publisher-return,發(fā)送者回執(zhí)
- 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。
2.2 實(shí)戰(zhàn)
2.2.1 修改配置
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
配置說明:
publish-confirm-type
:開啟publisher-confirm,這里支持兩種類型:simple
:同步等待confirm結(jié)果,直到超時correlated
:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
publish-returns
:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallbacktemplate.mandatory
:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
2.2.2 定義 Return 回調(diào)
每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項(xiàng)目加載時配置:
修改publisher服務(wù),添加一個:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 設(shè)置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投遞失敗,記錄日志 log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有業(yè)務(wù)需要,可以重發(fā)消息 }); } }
2.2.3 定義ConfirmCallback
ConfirmCallback 可以在發(fā)送消息時指定,因?yàn)槊總€業(yè)務(wù)處理 confirm 成功或失敗的邏輯不一定相同。
public void testSendMessage2SimpleQueue() throws InterruptedException { // 1.消息體 String message = "hello, spring amqp!"; // 2.全局唯一的消息ID,需要封裝到 CorrelationData 中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3.添加callback correlationData.getFuture().addCallback( result -> { if(result.isAck()){ // 3.1.ack,消息成功 log.debug("消息發(fā)送成功, ID:{}", correlationData.getId()); }else{ // 3.2.nack,消息失敗 log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason()); } }, ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()) ); // 4.發(fā)送消息 rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData); // 休眠一會兒,等待ack回執(zhí) //Thread.sleep(20); }
3. 消息持久化
生產(chǎn)者確認(rèn)可以確保消息投遞到 RabbitMQ 的隊(duì)列中,但是消息發(fā)送到 RabbitMQ 以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。
- 交換機(jī)持久化
- 隊(duì)列持久化
- 消息持久化
3.1 交換機(jī)持久化
@Bean public DirectExchange simpleExchange(){ // 三個參數(shù):①交換機(jī)名稱、②是否持久化、③當(dāng)沒有queue與其綁定時是否自動刪除 return new DirectExchange("simple.direct", true, false); }
事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。
3.2 隊(duì)列持久化
@Bean public Queue simpleQueue(){ // 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }
事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的。
3.3 消息持久化
默認(rèn)情況下,SpringAMQP 交換機(jī) 隊(duì)列 以及發(fā)出的任何消息都是持久化的,不用特意指定。
4. 消費(fèi)者消息確認(rèn)
RabbitMQ 是 閱后即焚 機(jī)制,RabbitMQ 確認(rèn)消息被消費(fèi)者消費(fèi)后會立刻刪除。
而 RabbitMQ 是通過 消費(fèi)者回執(zhí) 來確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向 RabbitMQ 發(fā)送 ACK 回執(zhí),表明自己已經(jīng)處理消息。
設(shè)想這樣的場景:
- 1)RabbitMQ投遞消息給消費(fèi)者
- 2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費(fèi)者宕機(jī),消息尚未處理
這樣,消息就丟失了。因此消費(fèi)者返回ACK的時機(jī)非常重要。
4.1 三種確認(rèn)模式
而 SpringAMQP 則允許配置三種確認(rèn)模式:
•manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
•auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack。
•none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會成功處理,因此消息投遞后立即被刪除(存在丟失消息的風(fēng)險)。
由此可知:
- none 模式下,消息投遞是不可靠的,可能丟失
- auto 模式類似事務(wù)機(jī)制,出現(xiàn)異常時返回nack,消息回滾到mq;沒有異常,返回ack
- manual:自己根據(jù)業(yè)務(wù)情況,判斷什么時候該ack
一般,我們都是使用默認(rèn)的 auto 即可。
相關(guān)配置:
spring: rabbitmq: listener: simple: #acknowledge-mode: none # 關(guān)閉ack #acknowledge-mode: manual # 手動ack acknowledge-mode: auto # 自動ack
4.2 消息失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會不斷 requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次 requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:怎么辦呢?
4.2.1 本地重試機(jī)制
我們可以利用 Spring 的 retry 機(jī)制,在消費(fèi)者出現(xiàn)異常時利用 本地重試,而不是無限制的 requeue 到 mq 隊(duì)列。
修改 consumer 服務(wù)的 application.yml 文件,添加內(nèi)容:
spring: rabbitmq: listener: simple: retry: enabled: true # 開啟消費(fèi)者失敗重試 initial-interval: 1000 # 初始的失敗等待時長為1秒 multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval max-attempts: 3 # 最大重試次數(shù) stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
重啟consumer服務(wù),重復(fù)之前的測試。可以發(fā)現(xiàn):
- 在重試3次后,SpringAMQP 會拋出異常
AmqpRejectAndDontRequeueException
,說明本地重試觸發(fā)了。 - 查看 RabbitMQ 控制臺,發(fā)現(xiàn)消息被刪除了,說明最后 SpringAMQP 返回的是ack,mq刪除消息了。
結(jié)論
- 開啟本地重試時,消息處理過程中拋出異常,不會 requeue 到隊(duì)列,而是在消費(fèi)者本地重試
- 重試達(dá)到最大次數(shù)后,Spring 會返回 ack,消息會被丟棄。
4.2.2 失敗策略
在之前的測試中,達(dá)到最大重試次數(shù)后,消息會被丟棄,這是由 Spring 內(nèi)部機(jī)制決定的。
在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有 MessageRecovery
接口來處理,它包含三種不同的實(shí)現(xiàn):
RejectAndDontRequeueRecoverer
:重試耗盡后,直接 reject,丟棄消息。默認(rèn)就是這種方式ImmediateRequeueMessageRecoverer
:重試耗盡后,返回 nack,消息重新入隊(duì)RepublishMessageRecoverer
:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer
,失敗后將消息投遞到一個指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。
1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列
2)定義一個RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)
代碼如下:
@Configuration public class ErrorMessageConfig { @Bean // 處理失敗消息的交換機(jī) public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean // 處理失敗消息的隊(duì)列 public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
其實(shí) 我們在生產(chǎn)中會指定死信交換機(jī)來處理失敗的消息
5. 總結(jié)
如何確保RabbitMQ消息的可靠性?
- 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
- 開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會丟失
- 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
以上就是SpringBoot基于RabbitMQ實(shí)現(xiàn)消息可靠性的方法的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ消息可靠性的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringMVC中的@RequestMapping注解的使用詳細(xì)教程
@RequestMapping注解的作用就是將請求和處理請求的控制器方法關(guān)聯(lián)起來,建立映射關(guān)系,本文主要來和大家詳細(xì)講講它的具體使用,感興趣的可以了解一下2023-07-07Mybatis 實(shí)現(xiàn)打印sql語句的代碼
這篇文章主要介紹了Mybatis 實(shí)現(xiàn)打印sql語句的代碼,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07Java使用Tesseract-OCR實(shí)戰(zhàn)教程
本文介紹了如何在Java中使用Tesseract-OCR進(jìn)行文本提取,包括Tesseract-OCR的安裝、中文訓(xùn)練庫的配置、依賴庫的引入以及具體的代碼實(shí)現(xiàn),通過這個過程,我們將演示如何從視頻幀中提取文本2025-02-02出現(xiàn)java.lang.NoSuchMethodException異常的解決(靠譜)
這篇文章主要介紹了出現(xiàn)java.lang.NoSuchMethodException異常的解決方案(靠譜),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03SpringBoot項(xiàng)目集成Flyway詳細(xì)過程
今天帶大家學(xué)習(xí)SpringBoot項(xiàng)目集成Flyway詳細(xì)過程,文中有非常詳細(xì)的介紹及代碼示例,對正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下2021-05-05SpringBoot 創(chuàng)建容器的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot 創(chuàng)建容器的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10spring boot項(xiàng)目中MongoDB的使用方法
前段時間分享了關(guān)于Spring Boot中使用Redis的文章,除了Redis之后,我們在互聯(lián)網(wǎng)產(chǎn)品中還經(jīng)常會用到另外一款著名的NoSQL數(shù)據(jù)庫MongoDB。下面這篇文章主要給大家介紹了關(guān)于在spring boot項(xiàng)目中MongoDB的使用方法,需要的朋友可以參考下。2017-09-09