欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring boot Rabbitmq消息防丟失實(shí)踐

 更新時間:2022年09月25日 10:05:27   作者:我贏了算我輸???????  
這篇文章主要介紹了Spring boot Rabbitmq消息防丟失實(shí)踐,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下

前言

之前看很多網(wǎng)上大佬的防丟失的文章,文章中理論知識偏多,所以自己想著實(shí)踐一下,實(shí)踐過程中也踩了一些坑,因此寫出了這篇文章。如果文章有誤人子弟的地方,望在評論區(qū)指出。

導(dǎo)致消息出現(xiàn)丟失的原因

  • 發(fā)送時失敗,指發(fā)送端發(fā)送完消息準(zhǔn)備到達(dá)消息隊列的過程中,因網(wǎng)絡(luò)波動、消息隊列服務(wù)宕機(jī)等,消息隊列服務(wù)無法接收消息,所以導(dǎo)致了丟失。
  • 到達(dá)時宕機(jī),消息隊列服務(wù)接收到消息之后,如果沒有開啟持久化,消息會存儲在內(nèi)存中(當(dāng)然內(nèi)存吃緊的話,也會轉(zhuǎn)入磁盤,緩解內(nèi)存),如果這個時候服務(wù)掛了,那么內(nèi)存中的消息就會丟失。
  • 發(fā)送到消費(fèi)端失敗,消費(fèi)端接收到了消息的時候,消費(fèi)端服務(wù)掛了,而rabbitmq默認(rèn)自動ack,也就是說rabbitmq發(fā)送到消費(fèi)端,一旦認(rèn)定了消費(fèi)端接收了,無論有無消費(fèi)成功,rabbitmq都認(rèn)為是發(fā)送成功。

下面我們以這三種情況進(jìn)行實(shí)踐。

環(huán)境

jdk1.8
Spring boot 2.3.7.RELEASE
Spring-boot-starter-amqp 2.3.7.RELEASE
Rabbitmq 3.7.7

準(zhǔn)備工作

我事先準(zhǔn)備了好了交換機(jī)以及隊列:

  • 交換機(jī):message.log.test.exchangemessage.log.test2.exchange
  • 隊列:message.loss.test.queue

其中message.loss.test.queuemessage.log.test.exchange是綁定關(guān)系,而message.log.test2.exchange沒有綁定隊列

1.發(fā)送時失敗

發(fā)送時失敗,rabbitmq有兩種情況是屬于發(fā)送時失敗。

  • 消息未到rabbitmq的交換機(jī)(exchange)
  • 消息到達(dá)了rabbitmq的交換機(jī)(exchange),但是沒有到達(dá)隊列(queue)

第一種的解決方式是使用confirm機(jī)制。第二種解決方式則是使用return機(jī)制。

使用confirm機(jī)制

模擬場景

confirm機(jī)制是當(dāng)發(fā)送端的消息沒有到達(dá)rabbitmq的交換機(jī)(exchange)時,會觸發(fā)confirm方法,告訴發(fā)送端該消息沒有到達(dá)rabbitmq,需要做業(yè)務(wù)處理。
這里我們發(fā)送消息到rabbitmq不存在的交換機(jī)上,就可以模擬上述場景。

實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口

/**
 * 當(dāng)消息沒有到達(dá)Rabbitmq的交換機(jī)時觸發(fā)該方法(當(dāng)然到達(dá)了也會觸發(fā),)
 */
@Component
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){

        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     *
     * @param correlationData 消息屬性體
     * @param ack 是否成功,成功到達(dá)true,沒有到達(dá),false
     * @param cause rabbitmq自身給的信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        //第一個坑,如果發(fā)送端發(fā)送消息時沒有對correlationData進(jìn)行處理,conirm方法接收到的對象都會是null
        //當(dāng)接收失敗并且correlationData對象為null,證明目前已經(jīng)無法追溯回業(yè)務(wù),可以做業(yè)務(wù)日志處理
        if(!ack&&correlationData==null){
            System.out.println(cause);
            //日志處理。。。

            return;
        }
        //如果接收失敗
        if(!ack){
            System.out.println("消息Id:"+correlationData.getId());
            Message message=correlationData.getReturnedMessage();
            System.out.println("消息體:"+new String(message.getBody()));
            //這里可以持久化業(yè)務(wù)消息體到數(shù)據(jù)庫,然后定時去進(jìn)行補(bǔ)償處理或者重試等等
            return;
        }

        //處理完成

    }
}

發(fā)送端代碼

/**
 * 消息的推送
 * @return
 */
@PostMapping("push")
public boolean push(){

    TestMessage testMessage=new TestMessage();
    testMessage.setName("mq名稱");
    testMessage.setBusinessId("業(yè)務(wù)Id");

    //定義CorrelationData對象以及消息屬性。不然comfirm方法無論失敗還是成功,CorrelationData參數(shù)永遠(yuǎn)是null
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    //傳遞業(yè)務(wù)數(shù)據(jù)
    correlationData.setReturnedMessage(new Message(JSONObject.toJSON(testMessage).toString().getBytes(StandardCharsets.UTF_8),new MessageProperties()));

  //發(fā)送消息(這里發(fā)送給了message.log.test.exchange11交換機(jī),但實(shí)際rabbitmq并不存在)template.convertAndSend("message.log.test.exchange11","message_loss_test",testMessage,correlationData);

    return true;
}

這里是我踩的第一個坑,如果發(fā)送端不定義correlationData,那么confirm接收到的correlationData對象參數(shù) 都會是null

實(shí)現(xiàn)效果

使用return機(jī)制

模擬場景

當(dāng)消息到達(dá)了rabbitmq的交換機(jī)的時候,但是又沒有到達(dá)隊列,那么就會觸發(fā)return方法。
下面我們定義一個沒有綁定隊列的交換機(jī),然后發(fā)送消息到交換機(jī),就可以模擬上述場景

實(shí)現(xiàn)RabbitTemplate.ReturnCallback

/**
 * 當(dāng)消息沒有到達(dá)Rabbitmq的隊列時就會觸發(fā)該方法
 */
@Component
public class ReturnCallBack implements RabbitTemplate.ReturnCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * @param message    消息體
     * @param replyCode  返回代碼
     * @param replyText  返回文本
     * @param exchange   交換機(jī)
     * @param routingKey 發(fā)送方定義的路由key
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息標(biāo)識:" + message.getMessageProperties().getDeliveryTag());
        String messageBody = null;
        try {
            messageBody = new String(message.getBody(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.out.println("消息:" + messageBody);
        System.out.println(replyCode);
        System.out.println(replyText);
        System.out.println(exchange);
        System.out.println(routingKey);

    }
}

發(fā)送端代碼

/**
 * 消息的推送
 * @return
 */
@PostMapping("push2")
public boolean push2(){

    TestMessage testMessage=new TestMessage();
    testMessage.setName("mq名稱2");
    testMessage.setBusinessId("業(yè)務(wù)Id");

    template.convertAndSend("message.log.test2.exchange","message_loss_test",JSONObject.toJSON(testMessage).toString());

    return true;
}

這里需注意消息體需要JSON序列化,不然returnedMessage方法接收的消息body會是亂碼

實(shí)現(xiàn)效果

rabbitmq服務(wù)掛了,造成內(nèi)存的消息丟失。

這個開啟rabbitmq的持久化機(jī)制就好了,開啟之后消息到達(dá)rabbitmq服務(wù),會實(shí)時轉(zhuǎn)入磁盤。這里怎么設(shè)置就不多說了,網(wǎng)上挺多文章可以解答。

不過即使開啟了還是會有一種情況會造成消息丟失,那就是消息即將要持久化到磁盤的那一刻,服務(wù)掛了,就會造成丟失,不過這種情況我也不知道怎么模擬,所以就暫不實(shí)踐了。

發(fā)送到消費(fèi)端消費(fèi)失敗

上面提到默認(rèn)情況下rabbitmq使用的是自動ack的方式,我們將它改成手動ack的方式,就可以解決這個問題。

修改application.yml配置文件

rabbitmq:
 listener:
  simple:
    #開啟手動確認(rèn)
    acknowledge-mode: manual
    #開啟失敗后的重試機(jī)制
    retry:
      enabled: true
      #最多重試3次
      max-attempts: 3

下面我們試一下幾種消費(fèi)端消費(fèi)不成功的場景

消費(fèi)了,但是忘記做手動確認(rèn)ack的操作代碼。

@Component
public class TestConsumer {

    /**
     * 消費(fèi)
     * @param testmessage 消息體
     * @param message 消息屬性
     * @param channel mq通道對象
     */
    @RabbitListener(queues = {"message.loss.test.queue"})
    public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
        System.out.println("消費(fèi)testmessage消息:"+testmessage.getName());
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}

效果

效果流程:

  • 第一次用Postman請求之后,控制臺顯示了消息被消費(fèi)的信號。
  • 然后去查看rabbitmq后臺管理剛剛被消費(fèi)的消息以及變?yōu)?strong>Unacked
  • 停止程序后(關(guān)閉消費(fèi)端),過一陣子,后臺管理顯示消息變回了Ready,也就是說重新回到了隊列。
  • 重新啟動程序(開啟消費(fèi)段),消息被重新消費(fèi)。

總而言之,如果消費(fèi)端沒有做手動確認(rèn)的操作,那么在消費(fèi)端還沒關(guān)閉之前,消息會變成Unacked,不會再次被消費(fèi),但一旦消費(fèi)端關(guān)閉了,消息會重新回到隊列,讓消費(fèi)端消費(fèi)。

消費(fèi)過程中,觸發(fā)了未知異常,代碼沒有try catch

/**
 * 消費(fèi)
 * @param testmessage 消息體
 * @param message 消息屬性
 * @param channel mq通道對象
 */
@RabbitListener(queues = {"message.loss.test.queue"})
public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
    System.out.println("消費(fèi)testmessage消息:"+testmessage.getName());
    //故意觸發(fā)異常
    if(!StringUtils.isEmpty(testmessage.getName())){

        throw new RuntimeException("11211");
    }
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

效果1

上面的效果圖顯示,我在觸發(fā)了異常之后,消息重試了三次,也就是我在application.yml 配置的重試三次

如果我去掉重試機(jī)制會是什么效果。

效果2

 效果和忘記做ack操作的效果一樣,消息沒有ack后,消息會變成Unacked狀態(tài),消費(fèi)端關(guān)閉后消息會重新回到隊列,然后重新鏈接的時候,就會再消費(fèi)一次。

總結(jié)

到此這篇關(guān)于Spring boot Rabbitmq消息防丟失實(shí)踐的文章就介紹到這了,更多相關(guān)Spring boot Rabbitmq 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 利用Spring Social輕松搞定微信授權(quán)登錄的方法示例

    利用Spring Social輕松搞定微信授權(quán)登錄的方法示例

    這篇文章主要介紹了利用Spring Social輕松搞定微信授權(quán)登錄的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-12-12
  • 透明化Sharding-JDBC數(shù)據(jù)庫字段加解密方案

    透明化Sharding-JDBC數(shù)據(jù)庫字段加解密方案

    這篇文章主要為大家介紹了透明化Sharding-JDBC數(shù)據(jù)庫字段加解密方案,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-02-02
  • Java Web中解決路徑(絕對路徑與相對路徑)問題

    Java Web中解決路徑(絕對路徑與相對路徑)問題

    這篇文章主要介紹了Java Web中解決路徑問題的相關(guān)資料,java 文件路徑有絕對路徑與相對路徑,這里提供了幾種方法解決所有路徑問題,需要的朋友可以參考下
    2017-01-01
  • Java SpringMVC自學(xué)自講

    Java SpringMVC自學(xué)自講

    本篇文章主要介紹了java SpringMVC——如何獲取請求參數(shù)詳解,詳細(xì)的介紹了每種參數(shù)注解的用法及其實(shí)例。感興趣的小伙伴們可以參考一下
    2021-09-09
  • java的接口解耦方式

    java的接口解耦方式

    這篇文章主要介紹了java的接口解耦方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • SpringCloud-Hystrix實(shí)現(xiàn)原理總結(jié)

    SpringCloud-Hystrix實(shí)現(xiàn)原理總結(jié)

    通過hystrix可以解決雪崩效應(yīng)問題,它提供了資源隔離、降級機(jī)制、融斷、緩存等功能。接下來通過本文給大家分享SpringCloud-Hystrix實(shí)現(xiàn)原理,感興趣的朋友一起看看吧
    2021-05-05
  • Springboot ApplicationRunner的使用解讀

    Springboot ApplicationRunner的使用解讀

    這篇文章主要介紹了Springboot ApplicationRunner的使用解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-05-05
  • SpringBoot利用redis集成消息隊列的方法

    SpringBoot利用redis集成消息隊列的方法

    這篇文章主要介紹了SpringBoot利用redis集成消息隊列的方法,需要的朋友可以參考下
    2017-08-08
  • springboot項(xiàng)目(jar包)指定配置文件啟動圖文教程

    springboot項(xiàng)目(jar包)指定配置文件啟動圖文教程

    這篇文章主要給大家介紹了關(guān)于springboot項(xiàng)目(jar包)指定配置文件啟動的相關(guān)資料,在多環(huán)境部署過程中、及線上運(yùn)維中可能會遇到臨時指定配置文件的情況,需要的朋友可以參考下
    2023-07-07
  • 使用feign發(fā)送http請求解析報錯的問題

    使用feign發(fā)送http請求解析報錯的問題

    這篇文章主要介紹了使用feign發(fā)送http請求解析報錯的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03

最新評論