Spring boot Rabbitmq消息防丟失實(shí)踐
前言
之前看很多網(wǎng)上大佬的防丟失的文章,文章中理論知識(shí)偏多,所以自己想著實(shí)踐一下,實(shí)踐過(guò)程中也踩了一些坑,因此寫出了這篇文章。如果文章有誤人子弟的地方,望在評(píng)論區(qū)指出。
導(dǎo)致消息出現(xiàn)丟失的原因
- 發(fā)送時(shí)失敗,指發(fā)送端發(fā)送完消息準(zhǔn)備到達(dá)消息隊(duì)列的過(guò)程中,因網(wǎng)絡(luò)波動(dòng)、消息隊(duì)列服務(wù)宕機(jī)等,消息隊(duì)列服務(wù)無(wú)法接收消息,所以導(dǎo)致了丟失。
- 到達(dá)時(shí)宕機(jī),消息隊(duì)列服務(wù)接收到消息之后,如果沒(méi)有開(kāi)啟持久化,消息會(huì)存儲(chǔ)在內(nèi)存中(當(dāng)然內(nèi)存吃緊的話,也會(huì)轉(zhuǎn)入磁盤,緩解內(nèi)存),如果這個(gè)時(shí)候服務(wù)掛了,那么內(nèi)存中的消息就會(huì)丟失。
- 發(fā)送到消費(fèi)端失敗,消費(fèi)端接收到了消息的時(shí)候,消費(fèi)端服務(wù)掛了,而rabbitmq默認(rèn)自動(dòng)ack,也就是說(shuō)rabbitmq發(fā)送到消費(fèi)端,一旦認(rèn)定了消費(fèi)端接收了,無(wú)論有無(wú)消費(fèi)成功,rabbitmq都認(rèn)為是發(fā)送成功。
下面我們以這三種情況進(jìn)行實(shí)踐。
環(huán)境
jdk1.8
Spring boot 2.3.7.RELEASE
Spring-boot-starter-amqp 2.3.7.RELEASE
Rabbitmq 3.7.7
準(zhǔn)備工作
我事先準(zhǔn)備了好了交換機(jī)以及隊(duì)列:
- 交換機(jī):
message.log.test.exchange
和message.log.test2.exchange
- 隊(duì)列:
message.loss.test.queue
其中message.loss.test.queue
和message.log.test.exchange
是綁定關(guān)系,而message.log.test2.exchange
沒(méi)有綁定隊(duì)列
1.發(fā)送時(shí)失敗
發(fā)送時(shí)失敗,rabbitmq有兩種情況是屬于發(fā)送時(shí)失敗。
- 消息未到rabbitmq的交換機(jī)(exchange)
- 消息到達(dá)了rabbitmq的交換機(jī)(exchange),但是沒(méi)有到達(dá)隊(duì)列(queue)
第一種的解決方式是使用confirm機(jī)制。第二種解決方式則是使用return機(jī)制。
使用confirm機(jī)制
模擬場(chǎng)景
confirm機(jī)制是當(dāng)發(fā)送端的消息沒(méi)有到達(dá)rabbitmq的交換機(jī)(exchange)時(shí),會(huì)觸發(fā)confirm方法,告訴發(fā)送端該消息沒(méi)有到達(dá)rabbitmq,需要做業(yè)務(wù)處理。
這里我們發(fā)送消息到rabbitmq不存在的交換機(jī)上,就可以模擬上述場(chǎng)景。
實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口
/** * 當(dāng)消息沒(méi)有到達(dá)Rabbitmq的交換機(jī)時(shí)觸發(fā)該方法(當(dāng)然到達(dá)了也會(huì)觸發(fā),) */ @Component public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } /** * * @param correlationData 消息屬性體 * @param ack 是否成功,成功到達(dá)true,沒(méi)有到達(dá),false * @param cause rabbitmq自身給的信息 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //第一個(gè)坑,如果發(fā)送端發(fā)送消息時(shí)沒(méi)有對(duì)correlationData進(jìn)行處理,conirm方法接收到的對(duì)象都會(huì)是null //當(dāng)接收失敗并且correlationData對(duì)象為null,證明目前已經(jīng)無(wú)法追溯回業(yè)務(wù),可以做業(yè)務(wù)日志處理 if(!ack&&correlationData==null){ System.out.println(cause); //日志處理。。。 return; } //如果接收失敗 if(!ack){ System.out.println("消息Id:"+correlationData.getId()); Message message=correlationData.getReturnedMessage(); System.out.println("消息體:"+new String(message.getBody())); //這里可以持久化業(yè)務(wù)消息體到數(shù)據(jù)庫(kù),然后定時(shí)去進(jìn)行補(bǔ)償處理或者重試等等 return; } //處理完成 } }
發(fā)送端代碼
/** * 消息的推送 * @return */ @PostMapping("push") public boolean push(){ TestMessage testMessage=new TestMessage(); testMessage.setName("mq名稱"); testMessage.setBusinessId("業(yè)務(wù)Id"); //定義CorrelationData對(duì)象以及消息屬性。不然comfirm方法無(wú)論失敗還是成功,CorrelationData參數(shù)永遠(yuǎn)是null CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //傳遞業(yè)務(wù)數(shù)據(jù) correlationData.setReturnedMessage(new Message(JSONObject.toJSON(testMessage).toString().getBytes(StandardCharsets.UTF_8),new MessageProperties())); //發(fā)送消息(這里發(fā)送給了message.log.test.exchange11交換機(jī),但實(shí)際rabbitmq并不存在)template.convertAndSend("message.log.test.exchange11","message_loss_test",testMessage,correlationData); return true; }
這里是我踩的第一個(gè)坑,如果發(fā)送端不定義correlationData,那么confirm接收到的correlationData對(duì)象參數(shù) 都會(huì)是null
實(shí)現(xiàn)效果
使用return機(jī)制
模擬場(chǎng)景
當(dāng)消息到達(dá)了rabbitmq的交換機(jī)的時(shí)候,但是又沒(méi)有到達(dá)隊(duì)列,那么就會(huì)觸發(fā)return方法。
下面我們定義一個(gè)沒(méi)有綁定隊(duì)列的交換機(jī),然后發(fā)送消息到交換機(jī),就可以模擬上述場(chǎng)景
實(shí)現(xiàn)RabbitTemplate.ReturnCallback
/** * 當(dāng)消息沒(méi)有到達(dá)Rabbitmq的隊(duì)列時(shí)就會(huì)觸發(fā)該方法 */ @Component public class ReturnCallBack implements RabbitTemplate.ReturnCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); } /** * @param message 消息體 * @param replyCode 返回代碼 * @param replyText 返回文本 * @param exchange 交換機(jī) * @param routingKey 發(fā)送方定義的路由key */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息標(biāo)識(shí):" + message.getMessageProperties().getDeliveryTag()); String messageBody = null; try { messageBody = new String(message.getBody(), "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println("消息:" + messageBody); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); } }
發(fā)送端代碼
/** * 消息的推送 * @return */ @PostMapping("push2") public boolean push2(){ TestMessage testMessage=new TestMessage(); testMessage.setName("mq名稱2"); testMessage.setBusinessId("業(yè)務(wù)Id"); template.convertAndSend("message.log.test2.exchange","message_loss_test",JSONObject.toJSON(testMessage).toString()); return true; }
這里需注意消息體需要JSON序列化,不然returnedMessage方法接收的消息body會(huì)是亂碼
實(shí)現(xiàn)效果
rabbitmq服務(wù)掛了,造成內(nèi)存的消息丟失。
這個(gè)開(kāi)啟rabbitmq的持久化機(jī)制就好了,開(kāi)啟之后消息到達(dá)rabbitmq服務(wù),會(huì)實(shí)時(shí)轉(zhuǎn)入磁盤。這里怎么設(shè)置就不多說(shuō)了,網(wǎng)上挺多文章可以解答。
不過(guò)即使開(kāi)啟了還是會(huì)有一種情況會(huì)造成消息丟失,那就是消息即將要持久化到磁盤的那一刻,服務(wù)掛了,就會(huì)造成丟失,不過(guò)這種情況我也不知道怎么模擬,所以就暫不實(shí)踐了。
發(fā)送到消費(fèi)端消費(fèi)失敗
上面提到默認(rèn)情況下rabbitmq使用的是自動(dòng)ack的方式,我們將它改成手動(dòng)ack的方式,就可以解決這個(gè)問(wèn)題。
修改application.yml配置文件
rabbitmq: listener: simple: #開(kāi)啟手動(dòng)確認(rèn) acknowledge-mode: manual #開(kāi)啟失敗后的重試機(jī)制 retry: enabled: true #最多重試3次 max-attempts: 3
下面我們?cè)囈幌聨追N消費(fèi)端消費(fèi)不成功的場(chǎng)景
消費(fèi)了,但是忘記做手動(dòng)確認(rèn)ack的操作代碼。
@Component public class TestConsumer { /** * 消費(fèi) * @param testmessage 消息體 * @param message 消息屬性 * @param channel mq通道對(duì)象 */ @RabbitListener(queues = {"message.loss.test.queue"}) public void test(TestMessage testmessage, Message message, Channel channel) throws IOException { System.out.println("消費(fèi)testmessage消息:"+testmessage.getName()); // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
效果
效果流程:
- 第一次用Postman請(qǐng)求之后,控制臺(tái)顯示了消息被消費(fèi)的信號(hào)。
- 然后去查看rabbitmq后臺(tái)管理剛剛被消費(fèi)的消息以及變?yōu)?strong>Unacked
- 停止程序后(關(guān)閉消費(fèi)端),過(guò)一陣子,后臺(tái)管理顯示消息變回了Ready,也就是說(shuō)重新回到了隊(duì)列。
- 重新啟動(dòng)程序(開(kāi)啟消費(fèi)段),消息被重新消費(fèi)。
總而言之,如果消費(fèi)端沒(méi)有做手動(dòng)確認(rèn)的操作,那么在消費(fèi)端還沒(méi)關(guān)閉之前,消息會(huì)變成Unacked,不會(huì)再次被消費(fèi),但一旦消費(fèi)端關(guān)閉了,消息會(huì)重新回到隊(duì)列,讓消費(fèi)端消費(fèi)。
消費(fèi)過(guò)程中,觸發(fā)了未知異常,代碼沒(méi)有try catch
/** * 消費(fèi) * @param testmessage 消息體 * @param message 消息屬性 * @param channel mq通道對(duì)象 */ @RabbitListener(queues = {"message.loss.test.queue"}) public void test(TestMessage testmessage, Message message, Channel channel) throws IOException { System.out.println("消費(fèi)testmessage消息:"+testmessage.getName()); //故意觸發(fā)異常 if(!StringUtils.isEmpty(testmessage.getName())){ throw new RuntimeException("11211"); } channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
效果1
上面的效果圖顯示,我在觸發(fā)了異常之后,消息重試了三次,也就是我在application.yml 配置的重試三次
如果我去掉重試機(jī)制會(huì)是什么效果。
效果2
效果和忘記做ack操作的效果一樣,消息沒(méi)有ack后,消息會(huì)變成Unacked狀態(tài),消費(fèi)端關(guān)閉后消息會(huì)重新回到隊(duì)列,然后重新鏈接的時(shí)候,就會(huì)再消費(fèi)一次。
總結(jié)
到此這篇關(guān)于Spring boot Rabbitmq消息防丟失實(shí)踐的文章就介紹到這了,更多相關(guān)Spring boot Rabbitmq 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用Spring Social輕松搞定微信授權(quán)登錄的方法示例
這篇文章主要介紹了利用Spring Social輕松搞定微信授權(quán)登錄的方法示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-12-12透明化Sharding-JDBC數(shù)據(jù)庫(kù)字段加解密方案
這篇文章主要為大家介紹了透明化Sharding-JDBC數(shù)據(jù)庫(kù)字段加解密方案,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-02-02Java Web中解決路徑(絕對(duì)路徑與相對(duì)路徑)問(wèn)題
這篇文章主要介紹了Java Web中解決路徑問(wèn)題的相關(guān)資料,java 文件路徑有絕對(duì)路徑與相對(duì)路徑,這里提供了幾種方法解決所有路徑問(wèn)題,需要的朋友可以參考下2017-01-01SpringCloud-Hystrix實(shí)現(xiàn)原理總結(jié)
通過(guò)hystrix可以解決雪崩效應(yīng)問(wèn)題,它提供了資源隔離、降級(jí)機(jī)制、融斷、緩存等功能。接下來(lái)通過(guò)本文給大家分享SpringCloud-Hystrix實(shí)現(xiàn)原理,感興趣的朋友一起看看吧2021-05-05Springboot ApplicationRunner的使用解讀
這篇文章主要介紹了Springboot ApplicationRunner的使用解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-05-05SpringBoot利用redis集成消息隊(duì)列的方法
這篇文章主要介紹了SpringBoot利用redis集成消息隊(duì)列的方法,需要的朋友可以參考下2017-08-08springboot項(xiàng)目(jar包)指定配置文件啟動(dòng)圖文教程
這篇文章主要給大家介紹了關(guān)于springboot項(xiàng)目(jar包)指定配置文件啟動(dòng)的相關(guān)資料,在多環(huán)境部署過(guò)程中、及線上運(yùn)維中可能會(huì)遇到臨時(shí)指定配置文件的情況,需要的朋友可以參考下2023-07-07使用feign發(fā)送http請(qǐng)求解析報(bào)錯(cuò)的問(wèn)題
這篇文章主要介紹了使用feign發(fā)送http請(qǐng)求解析報(bào)錯(cuò)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03