Spring boot Rabbitmq消息防丟失實(shí)踐
前言
之前看很多網(wǎng)上大佬的防丟失的文章,文章中理論知識偏多,所以自己想著實(shí)踐一下,實(shí)踐過程中也踩了一些坑,因此寫出了這篇文章。如果文章有誤人子弟的地方,望在評論區(qū)指出。
導(dǎo)致消息出現(xiàn)丟失的原因
- 發(fā)送時失敗,指發(fā)送端發(fā)送完消息準(zhǔn)備到達(dá)消息隊列的過程中,因網(wǎng)絡(luò)波動、消息隊列服務(wù)宕機(jī)等,消息隊列服務(wù)無法接收消息,所以導(dǎo)致了丟失。
- 到達(dá)時宕機(jī),消息隊列服務(wù)接收到消息之后,如果沒有開啟持久化,消息會存儲在內(nèi)存中(當(dāng)然內(nèi)存吃緊的話,也會轉(zhuǎn)入磁盤,緩解內(nèi)存),如果這個時候服務(wù)掛了,那么內(nèi)存中的消息就會丟失。
- 發(fā)送到消費(fèi)端失敗,消費(fèi)端接收到了消息的時候,消費(fèi)端服務(wù)掛了,而rabbitmq默認(rèn)自動ack,也就是說rabbitmq發(fā)送到消費(fèi)端,一旦認(rèn)定了消費(fèi)端接收了,無論有無消費(fèi)成功,rabbitmq都認(rèn)為是發(fā)送成功。
下面我們以這三種情況進(jìn)行實(shí)踐。
環(huán)境
jdk1.8
Spring boot 2.3.7.RELEASE
Spring-boot-starter-amqp 2.3.7.RELEASE
Rabbitmq 3.7.7
準(zhǔn)備工作
我事先準(zhǔn)備了好了交換機(jī)以及隊列:
- 交換機(jī):
message.log.test.exchange
和message.log.test2.exchange
- 隊列:
message.loss.test.queue
其中message.loss.test.queue
和message.log.test.exchange
是綁定關(guān)系,而message.log.test2.exchange
沒有綁定隊列
1.發(fā)送時失敗
發(fā)送時失敗,rabbitmq有兩種情況是屬于發(fā)送時失敗。
- 消息未到rabbitmq的交換機(jī)(exchange)
- 消息到達(dá)了rabbitmq的交換機(jī)(exchange),但是沒有到達(dá)隊列(queue)
第一種的解決方式是使用confirm機(jī)制。第二種解決方式則是使用return機(jī)制。
使用confirm機(jī)制
模擬場景
confirm機(jī)制是當(dāng)發(fā)送端的消息沒有到達(dá)rabbitmq的交換機(jī)(exchange)時,會觸發(fā)confirm方法,告訴發(fā)送端該消息沒有到達(dá)rabbitmq,需要做業(yè)務(wù)處理。
這里我們發(fā)送消息到rabbitmq不存在的交換機(jī)上,就可以模擬上述場景。
實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口
/** * 當(dāng)消息沒有到達(dá)Rabbitmq的交換機(jī)時觸發(fā)該方法(當(dāng)然到達(dá)了也會觸發(fā),) */ @Component public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } /** * * @param correlationData 消息屬性體 * @param ack 是否成功,成功到達(dá)true,沒有到達(dá),false * @param cause rabbitmq自身給的信息 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //第一個坑,如果發(fā)送端發(fā)送消息時沒有對correlationData進(jìn)行處理,conirm方法接收到的對象都會是null //當(dāng)接收失敗并且correlationData對象為null,證明目前已經(jīng)無法追溯回業(yè)務(wù),可以做業(yè)務(wù)日志處理 if(!ack&&correlationData==null){ System.out.println(cause); //日志處理。。。 return; } //如果接收失敗 if(!ack){ System.out.println("消息Id:"+correlationData.getId()); Message message=correlationData.getReturnedMessage(); System.out.println("消息體:"+new String(message.getBody())); //這里可以持久化業(yè)務(wù)消息體到數(shù)據(jù)庫,然后定時去進(jìn)行補(bǔ)償處理或者重試等等 return; } //處理完成 } }
發(fā)送端代碼
/** * 消息的推送 * @return */ @PostMapping("push") public boolean push(){ TestMessage testMessage=new TestMessage(); testMessage.setName("mq名稱"); testMessage.setBusinessId("業(yè)務(wù)Id"); //定義CorrelationData對象以及消息屬性。不然comfirm方法無論失敗還是成功,CorrelationData參數(shù)永遠(yuǎn)是null CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //傳遞業(yè)務(wù)數(shù)據(jù) correlationData.setReturnedMessage(new Message(JSONObject.toJSON(testMessage).toString().getBytes(StandardCharsets.UTF_8),new MessageProperties())); //發(fā)送消息(這里發(fā)送給了message.log.test.exchange11交換機(jī),但實(shí)際rabbitmq并不存在)template.convertAndSend("message.log.test.exchange11","message_loss_test",testMessage,correlationData); return true; }
這里是我踩的第一個坑,如果發(fā)送端不定義correlationData,那么confirm接收到的correlationData對象參數(shù) 都會是null
實(shí)現(xiàn)效果
使用return機(jī)制
模擬場景
當(dāng)消息到達(dá)了rabbitmq的交換機(jī)的時候,但是又沒有到達(dá)隊列,那么就會觸發(fā)return方法。
下面我們定義一個沒有綁定隊列的交換機(jī),然后發(fā)送消息到交換機(jī),就可以模擬上述場景
實(shí)現(xiàn)RabbitTemplate.ReturnCallback
/** * 當(dāng)消息沒有到達(dá)Rabbitmq的隊列時就會觸發(fā)該方法 */ @Component public class ReturnCallBack implements RabbitTemplate.ReturnCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); } /** * @param message 消息體 * @param replyCode 返回代碼 * @param replyText 返回文本 * @param exchange 交換機(jī) * @param routingKey 發(fā)送方定義的路由key */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息標(biāo)識:" + message.getMessageProperties().getDeliveryTag()); String messageBody = null; try { messageBody = new String(message.getBody(), "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println("消息:" + messageBody); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); } }
發(fā)送端代碼
/** * 消息的推送 * @return */ @PostMapping("push2") public boolean push2(){ TestMessage testMessage=new TestMessage(); testMessage.setName("mq名稱2"); testMessage.setBusinessId("業(yè)務(wù)Id"); template.convertAndSend("message.log.test2.exchange","message_loss_test",JSONObject.toJSON(testMessage).toString()); return true; }
這里需注意消息體需要JSON序列化,不然returnedMessage方法接收的消息body會是亂碼
實(shí)現(xiàn)效果
rabbitmq服務(wù)掛了,造成內(nèi)存的消息丟失。
這個開啟rabbitmq的持久化機(jī)制就好了,開啟之后消息到達(dá)rabbitmq服務(wù),會實(shí)時轉(zhuǎn)入磁盤。這里怎么設(shè)置就不多說了,網(wǎng)上挺多文章可以解答。
不過即使開啟了還是會有一種情況會造成消息丟失,那就是消息即將要持久化到磁盤的那一刻,服務(wù)掛了,就會造成丟失,不過這種情況我也不知道怎么模擬,所以就暫不實(shí)踐了。
發(fā)送到消費(fèi)端消費(fèi)失敗
上面提到默認(rèn)情況下rabbitmq使用的是自動ack的方式,我們將它改成手動ack的方式,就可以解決這個問題。
修改application.yml配置文件
rabbitmq: listener: simple: #開啟手動確認(rèn) acknowledge-mode: manual #開啟失敗后的重試機(jī)制 retry: enabled: true #最多重試3次 max-attempts: 3
下面我們試一下幾種消費(fèi)端消費(fèi)不成功的場景
消費(fèi)了,但是忘記做手動確認(rèn)ack的操作代碼。
@Component public class TestConsumer { /** * 消費(fèi) * @param testmessage 消息體 * @param message 消息屬性 * @param channel mq通道對象 */ @RabbitListener(queues = {"message.loss.test.queue"}) public void test(TestMessage testmessage, Message message, Channel channel) throws IOException { System.out.println("消費(fèi)testmessage消息:"+testmessage.getName()); // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
效果
效果流程:
- 第一次用Postman請求之后,控制臺顯示了消息被消費(fèi)的信號。
- 然后去查看rabbitmq后臺管理剛剛被消費(fèi)的消息以及變?yōu)?strong>Unacked
- 停止程序后(關(guān)閉消費(fèi)端),過一陣子,后臺管理顯示消息變回了Ready,也就是說重新回到了隊列。
- 重新啟動程序(開啟消費(fèi)段),消息被重新消費(fèi)。
總而言之,如果消費(fèi)端沒有做手動確認(rèn)的操作,那么在消費(fèi)端還沒關(guān)閉之前,消息會變成Unacked,不會再次被消費(fèi),但一旦消費(fèi)端關(guān)閉了,消息會重新回到隊列,讓消費(fèi)端消費(fèi)。
消費(fèi)過程中,觸發(fā)了未知異常,代碼沒有try catch
/** * 消費(fèi) * @param testmessage 消息體 * @param message 消息屬性 * @param channel mq通道對象 */ @RabbitListener(queues = {"message.loss.test.queue"}) public void test(TestMessage testmessage, Message message, Channel channel) throws IOException { System.out.println("消費(fèi)testmessage消息:"+testmessage.getName()); //故意觸發(fā)異常 if(!StringUtils.isEmpty(testmessage.getName())){ throw new RuntimeException("11211"); } channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
效果1
上面的效果圖顯示,我在觸發(fā)了異常之后,消息重試了三次,也就是我在application.yml 配置的重試三次
如果我去掉重試機(jī)制會是什么效果。
效果2
效果和忘記做ack操作的效果一樣,消息沒有ack后,消息會變成Unacked狀態(tài),消費(fèi)端關(guān)閉后消息會重新回到隊列,然后重新鏈接的時候,就會再消費(fèi)一次。
總結(jié)
到此這篇關(guān)于Spring boot Rabbitmq消息防丟失實(shí)踐的文章就介紹到這了,更多相關(guān)Spring boot Rabbitmq 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用Spring Social輕松搞定微信授權(quán)登錄的方法示例
這篇文章主要介紹了利用Spring Social輕松搞定微信授權(quán)登錄的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-12-12透明化Sharding-JDBC數(shù)據(jù)庫字段加解密方案
這篇文章主要為大家介紹了透明化Sharding-JDBC數(shù)據(jù)庫字段加解密方案,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-02-02SpringCloud-Hystrix實(shí)現(xiàn)原理總結(jié)
通過hystrix可以解決雪崩效應(yīng)問題,它提供了資源隔離、降級機(jī)制、融斷、緩存等功能。接下來通過本文給大家分享SpringCloud-Hystrix實(shí)現(xiàn)原理,感興趣的朋友一起看看吧2021-05-05Springboot ApplicationRunner的使用解讀
這篇文章主要介紹了Springboot ApplicationRunner的使用解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05springboot項(xiàng)目(jar包)指定配置文件啟動圖文教程
這篇文章主要給大家介紹了關(guān)于springboot項(xiàng)目(jar包)指定配置文件啟動的相關(guān)資料,在多環(huán)境部署過程中、及線上運(yùn)維中可能會遇到臨時指定配置文件的情況,需要的朋友可以參考下2023-07-07