springboot實現(xiàn)rabbitmq消息確認(rèn)的示例代碼
概述
RabbitMQ的消息確認(rèn)有兩種。 一種是消息發(fā)送確認(rèn)。這種是用來確認(rèn)生產(chǎn)者將消息發(fā)送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發(fā)送確認(rèn)分為兩步,一是確認(rèn)是否到達(dá)交換器,二是確認(rèn)是否到達(dá)隊列。 第二種是消費接收確認(rèn)。這種是確認(rèn)消費者是否成功消費了隊列中的消息。
一、運行效果
二、實現(xiàn)過程
①、引入rabbitmq包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
②、修改application.properties配置
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 發(fā)送者開啟 confirm 確認(rèn)機制 spring.rabbitmq.publisher-confirms=true # 發(fā)送者開啟 return 確認(rèn)機制 spring.rabbitmq.publisher-returns=true #################################################### # 設(shè)置消費端手動 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual # 是否支持重試 spring.rabbitmq.listener.simple.retry.enabled=true
③、定義exchange和queue,并將queue綁定在exchange上
package com.mm.springbootrabbitmqconfirmdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean(name = "confirmQueue") public Queue confirmQueue(){ return new Queue("confirmQueue",true,false,false); } @Bean(name = "confirmExchange") public FanoutExchange confirmExchange(){ return new FanoutExchange("confirmExchange"); } @Bean public Binding confirmFanoutExchangeAndQueue(@Qualifier("confirmExchange") FanoutExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange); } }
④、消息發(fā)送確認(rèn)
發(fā)送消息確認(rèn):用來確認(rèn)生產(chǎn)者 producer
將消息發(fā)送到 broker
,broker
上的交換機 exchange
再投遞給隊列 queue
的過程中,消息是否成功投遞。
消息從 producer
到 rabbitmq broker
有一個 confirmCallback
確認(rèn)模式。
消息從 exchange
到 queue
投遞失敗有一個 returnCallback
退回模式。
我們可以利用這兩個Callback
來確保消息的100%送達(dá)。
1、 ConfirmCallback確認(rèn)模式
消息只要被 rabbitmq broker
接收到就會觸發(fā) confirmCallback
回調(diào) 。
package com.mm.springbootrabbitmqconfirmdemo.service; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause){ if (!ack) { log.error("消息發(fā)送異常!"); } else { log.info("發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } } }
實現(xiàn)接口 ConfirmCallback
,重寫其confirm()
方法,方法內(nèi)有三個參數(shù)correlationData
、ack
、cause
。
correlationData
:對象內(nèi)部只有一個id
屬性,用來表示當(dāng)前消息的唯一性。ack
:消息投遞到broker
的狀態(tài),true
表示成功。cause
:表示投遞失敗的原因。
但消息被 broker
接收到只能表示已經(jīng)到達(dá) MQ服務(wù)器,并不能保證消息一定會被投遞到目標(biāo) queue
里。所以接下來需要用到 returnCallback
。
2、 ReturnCallback 退回模式
如果消息未能投遞到目標(biāo) queue
里將觸發(fā)回調(diào) returnCallback
,一旦向 queue
投遞消息未成功,這里一般會記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補償?shù)炔僮鳌?/p>
com.mm.springbootrabbitmqconfirmdemo.service; lombok.extern.slf4j.; org.springframework.amqp.core.Message; org.springframework.amqp.rabbit.core.RabbitTemplate; org.springframework.stereotype.; ReturnCallbackService RabbitTemplate.ReturnCallback returnedMessageMessage message, replyCode, String replyText, String exchange, String routingKey.info, replyCode, replyText, exchange, routingKey;
實現(xiàn)接口ReturnCallback
,重寫 returnedMessage()
方法,方法有五個參數(shù)message
(消息體)、replyCode
(響應(yīng)code)、replyText
(響應(yīng)內(nèi)容)、exchange
(交換機)、routingKey
(隊列)。
下邊是具體的消息發(fā)送,在rabbitTemplate
中設(shè)置 Confirm
和 Return
回調(diào),我們通過setDeliveryMode()
對消息做持久化處理,為了后續(xù)測試創(chuàng)建一個 CorrelationData
對象,添加一個id
為10000000000
。
⑤、消息發(fā)送確認(rèn)
消息接收確認(rèn)要比消息發(fā)送確認(rèn)簡單一點,因為只有一個消息回執(zhí)(ack
)的過程。使用@RabbitHandler
注解標(biāo)注的方法要增加 channel
(信道)、message
兩個參數(shù)。
@Slf4j @Component @RabbitListener(queues = "confirm_test_queue") public class ReceiverMessage1 { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); //TODO 具體業(yè)務(wù) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重復(fù)處理失敗,拒絕再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息 } else { log.error("消息即將再次返回隊列處理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
消費消息有三種回執(zhí)方法,我們來分析一下每種方法的含義。
1、basicAck
basicAck
:表示成功確認(rèn),使用此回執(zhí)方法后,消息會被rabbitmq broker
刪除。
void?basicAck(long?deliveryTag,?boolean?multiple)
deliveryTag
:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag
都會增加。手動消息確認(rèn)模式下,我們可以對指定deliveryTag
的消息進行ack
、nack
、reject
等操作。
multiple
:是否批量確認(rèn),值為 true
則會一次性 ack
所有小于當(dāng)前消息 deliveryTag
的消息。
舉個栗子: 假設(shè)我先發(fā)送三條消息deliveryTag
分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時deliveryTag
為8,multiple
設(shè)置為 true,會將5、6、7、8的消息全部進行確認(rèn)。
2、basicNack
basicNack
:表示失敗確認(rèn),一般在消費消息業(yè)務(wù)異常時用到此方法,可以將消息重新投遞入隊列。
void?basicNack(long?deliveryTag,?boolean?multiple,?boolean?requeue)
deliveryTag
:表示消息投遞序號。
multiple
:是否批量確認(rèn)。
requeue
:值為 true
消息將重新入隊列。
3、basicReject
basicReject
:拒絕消息,與basicNack
區(qū)別在于不能進行批量操作,其他用法很相似。
void?basicReject(long?deliveryTag,?boolean?requeue)
deliveryTag
:表示消息投遞序號。
requeue
:值為 true
消息將重新入隊列。
三、項目結(jié)構(gòu)圖
四、補充
1、別忘確認(rèn)消息
這是一個非常沒技術(shù)含量的坑,但卻是非常容易犯錯的地方。
開啟消息確認(rèn)機制,消費消息別忘了channel.basicAck
,否則消息會一直存在,導(dǎo)致重復(fù)消費。
2、消息無限投遞
在我最開始接觸消息確認(rèn)機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業(yè)務(wù)邏輯后確認(rèn)消息, int a = 1 / 0
發(fā)生異常后將消息重新投入隊列。
@RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("消費者 2 號收到:{}", msg); int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }
3、重復(fù)消費
如何保證 MQ 的消費是冪等性,這個需要根據(jù)具體業(yè)務(wù)而定,可以借助MySQL
、或者redis
將消息持久化,通過再消息中的唯一性屬性校驗。
可以看到使用了 RabbitMQ
以后,我們的業(yè)務(wù)鏈路明顯變長了,雖然做到了系統(tǒng)間的解耦,但可能造成消息丟失的場景也增加了。例如:
消息生產(chǎn)者 - > rabbitmq服務(wù)器(消息發(fā)送失?。?/p>
rabbitmq服務(wù)器自身故障導(dǎo)致消息丟失
消息消費者 - > rabbitmq服務(wù)(消費消息失敗)
到此這篇關(guān)于springboot實現(xiàn)rabbitmq消息確認(rèn)的示例代碼的文章就介紹到這了,更多相關(guān)springboot rabbitmq消息確認(rèn)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java并發(fā)編程中的ConcurrentLinkedQueue詳解
這篇文章主要介紹了Java并發(fā)編程中的ConcurrentLinkedQueue詳解,GetThread線程不會因為ConcurrentLinkedQueue隊列為空而等待,而是直接返回null,所以當(dāng)實現(xiàn)隊列不空時,等待時,則需要用戶自己實現(xiàn)等待邏輯,需要的朋友可以參考下2023-12-12springboot項目讀取resources目錄下的文件的9種方式
本文主要介紹了springboot項目讀取resources目錄下的文件的9種方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04Spring Boot多模塊化后,服務(wù)間調(diào)用的坑及解決
這篇文章主要介紹了Spring Boot多模塊化后,服務(wù)間調(diào)用的坑及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06java面試常問的Runnable和Callable的區(qū)別
大家好,本篇文章主要講的是java面試常問的Runnable和Callable的區(qū)別,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下2022-01-01Spring init-method與destroy-method屬性的用法解析
這篇文章主要介紹了Spring init-method與destroy-method屬性的用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08