springboot中rabbitmq實(shí)現(xiàn)消息可靠性機(jī)制詳解
1. 生產(chǎn)者模塊通過(guò)publisher confirm機(jī)制實(shí)現(xiàn)消息可靠性
1.1 生產(chǎn)者模塊導(dǎo)入rabbitmq相關(guān)依賴
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--用于mq消息的序列化與反序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
1.2 配置文件中進(jìn)行mq的相關(guān)配置
spring.rabbitmq.host=10.128.240.183 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/ spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
- publish-confirm-type:開(kāi)啟publisher-confirm,有以下可選值
simple:同步等待confirm結(jié)果,直到超時(shí)
correlated:異步回調(diào),定義ConfirmCallback。mq返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
- publish-returns:開(kāi)啟publish-return功能??梢远xReturnCallback
- template.mandatory: 定義消息路由失敗的策略
true:調(diào)用ReturnCallback
false:直接丟棄消息
1.3 定義ReturnCallback(消息投遞到隊(duì)列失敗觸發(fā)此回調(diào))
- 每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback。
- 當(dāng)消息投遞失敗,就會(huì)調(diào)用生產(chǎn)者的returnCallback中定義的處理邏輯
- 可以在容器啟動(dòng)時(shí)就配置這個(gè)回調(diào)
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate對(duì)象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判斷是否是延遲消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0) { // 是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示 return; } // 記錄日志 log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的話,重發(fā)消息 }); } }
1.4 定義ConfirmCallback(消息到達(dá)交換機(jī)觸發(fā)此回調(diào))
可以為redisTemplate指定一個(gè)統(tǒng)一的確認(rèn)回調(diào)
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate對(duì)象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判斷是否是延遲消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0) { // 是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示 return; } // 記錄日志 log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的話,重發(fā)消息 }); // 設(shè)置統(tǒng)一的confirm回調(diào)。只要消息到達(dá)broker就ack=true rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("這是統(tǒng)一的回調(diào)"); System.out.println("correlationData:" + correlationData); System.out.println("ack:" + b); System.out.println("cause:" + s); } }); } }
也可以為特定的消息定制回調(diào)
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testmq() throws InterruptedException { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(result->{ if (result.isAck()) { // ACK log.debug("消息成功投遞到交換機(jī)!消息ID: {}", correlationData.getId()); } else { // NACK log.error("消息投遞到交換機(jī)失敗!消息ID:{}", correlationData.getId()); // 重發(fā)消息 } },ex->{ // 記錄日志 log.error("消息發(fā)送失??!", ex); // 重發(fā)消息 }); rabbitTemplate.convertAndSend("example.direct","blue","hello,world",correlationData); }
2. 消費(fèi)者模塊開(kāi)啟消息確認(rèn)
2.1 添加配置
# 手動(dòng)ack消息,不使用默認(rèn)的消費(fèi)端確認(rèn) spring.rabbitmq.listener.simple.acknowledge-mode=manual
- none:關(guān)閉ack,消息投遞時(shí)不可靠的,可能丟失
- auto:類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq,沒(méi)有異常,返回
- ackmanual:我們自己指定什么時(shí)候返回ack
2.2 manual模式在監(jiān)聽(tīng)器中自定義返回ack
@RabbitListener(queues = "order.release.order.queue") @Service public class OrderCloseListener { @Autowired private OrderService orderService; @RabbitHandler private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到過(guò)期的訂單信息,準(zhǔn)備關(guān)閉訂單" + orderEntity.getOrderSn()); try { orderService.closeOrder(orderEntity); // 第二個(gè)參數(shù)為false則表示僅確認(rèn)此條消息。如果為true則表示對(duì)收到的多條消息同時(shí)確認(rèn) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 第二個(gè)參數(shù)為ture表示將這個(gè)消息重新加入隊(duì)列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
3. 消費(fèi)者模塊開(kāi)啟消息失敗重試機(jī)制
3.1 配置文件添加配置,開(kāi)啟本地重試
spring: rabbitmq: listener: simple: retry: enabled: true # 開(kāi)啟消費(fèi)者失敗重試 initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒 multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval max-attempts: 3 # 最大重試次數(shù) stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
- 開(kāi)啟本地重試,如果消息處理過(guò)程總拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
- 重試達(dá)到最大次數(shù)后,spring會(huì)返回ack,消息會(huì)被丟棄
4. 消費(fèi)者模塊添加失敗策略(用于開(kāi)啟失敗本地重試功能后)
- 當(dāng)開(kāi)啟本地重試后,重試最大次數(shù)后消息直接丟棄。
- 三種策略,都繼承于MessageRecovery接口
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
4.2 定義處理失敗消息的交換機(jī)和隊(duì)列 沒(méi)有會(huì)自動(dòng)創(chuàng)建相應(yīng)的隊(duì)列、交換機(jī)與綁定關(guān)系,有了就啥也不做
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } // 路由鍵為key @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
4.3 向容器中添加一個(gè)失敗策略組件
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ // error為路由鍵 return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
到此這篇關(guān)于springboot中rabbitmq實(shí)現(xiàn)消息可靠性的文章就介紹到這了,更多相關(guān)springboot rabbitmq消息可靠性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于Java和GeoTools的Shapefile矢量數(shù)據(jù)縮略圖生成實(shí)踐
這篇文章主要介紹了基于Java和GeoTools的Shapefile矢量數(shù)據(jù)縮略圖生成實(shí)踐,需要的朋友可以參考下2024-08-08Java?@Scheduled定時(shí)任務(wù)不執(zhí)行解決辦法
這篇文章主要給大家介紹了關(guān)于Java?@Scheduled定時(shí)任務(wù)不執(zhí)行解決的相關(guān)資料,當(dāng)@Scheduled定時(shí)任務(wù)不執(zhí)行時(shí)可以根據(jù)以下步驟進(jìn)行排查和解決,需要的朋友可以參考下2023-10-10spring boot配置ssl(多cer格式)超詳細(xì)教程
這篇文章主要介紹了spring boot配置ssl(多cer格式)超詳細(xì)教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2023-11-11Java Web項(xiàng)目部署在Tomcat運(yùn)行出錯(cuò)與解決方法示例
這篇文章主要介紹了Java Web項(xiàng)目部署在Tomcat運(yùn)行出錯(cuò)與解決方法,結(jié)合具體實(shí)例形式分析了Java Web項(xiàng)目部署在Tomcat過(guò)程中由于xml配置文件導(dǎo)致的錯(cuò)誤問(wèn)題常見(jiàn)提示與解決方法,需要的朋友可以參考下2017-03-03