欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring boot Rabbitmq消息防丟失實(shí)踐

 更新時(shí)間:2022年09月25日 10:05:27   作者:我贏了算我輸???????  
這篇文章主要介紹了Spring boot Rabbitmq消息防丟失實(shí)踐,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下

前言

之前看很多網(wǎng)上大佬的防丟失的文章,文章中理論知識(shí)偏多,所以自己想著實(shí)踐一下,實(shí)踐過(guò)程中也踩了一些坑,因此寫出了這篇文章。如果文章有誤人子弟的地方,望在評(píng)論區(qū)指出。

導(dǎo)致消息出現(xiàn)丟失的原因

  • 發(fā)送時(shí)失敗,指發(fā)送端發(fā)送完消息準(zhǔn)備到達(dá)消息隊(duì)列的過(guò)程中,因網(wǎng)絡(luò)波動(dòng)、消息隊(duì)列服務(wù)宕機(jī)等,消息隊(duì)列服務(wù)無(wú)法接收消息,所以導(dǎo)致了丟失。
  • 到達(dá)時(shí)宕機(jī),消息隊(duì)列服務(wù)接收到消息之后,如果沒(méi)有開(kāi)啟持久化,消息會(huì)存儲(chǔ)在內(nèi)存中(當(dāng)然內(nèi)存吃緊的話,也會(huì)轉(zhuǎn)入磁盤,緩解內(nèi)存),如果這個(gè)時(shí)候服務(wù)掛了,那么內(nèi)存中的消息就會(huì)丟失。
  • 發(fā)送到消費(fèi)端失敗,消費(fèi)端接收到了消息的時(shí)候,消費(fèi)端服務(wù)掛了,而rabbitmq默認(rèn)自動(dòng)ack,也就是說(shuō)rabbitmq發(fā)送到消費(fèi)端,一旦認(rèn)定了消費(fèi)端接收了,無(wú)論有無(wú)消費(fèi)成功,rabbitmq都認(rèn)為是發(fā)送成功。

下面我們以這三種情況進(jìn)行實(shí)踐。

環(huán)境

jdk1.8
Spring boot 2.3.7.RELEASE
Spring-boot-starter-amqp 2.3.7.RELEASE
Rabbitmq 3.7.7

準(zhǔn)備工作

我事先準(zhǔn)備了好了交換機(jī)以及隊(duì)列:

  • 交換機(jī):message.log.test.exchangemessage.log.test2.exchange
  • 隊(duì)列:message.loss.test.queue

其中message.loss.test.queuemessage.log.test.exchange是綁定關(guān)系,而message.log.test2.exchange沒(méi)有綁定隊(duì)列

1.發(fā)送時(shí)失敗

發(fā)送時(shí)失敗,rabbitmq有兩種情況是屬于發(fā)送時(shí)失敗。

  • 消息未到rabbitmq的交換機(jī)(exchange)
  • 消息到達(dá)了rabbitmq的交換機(jī)(exchange),但是沒(méi)有到達(dá)隊(duì)列(queue)

第一種的解決方式是使用confirm機(jī)制。第二種解決方式則是使用return機(jī)制。

使用confirm機(jī)制

模擬場(chǎng)景

confirm機(jī)制是當(dāng)發(fā)送端的消息沒(méi)有到達(dá)rabbitmq的交換機(jī)(exchange)時(shí),會(huì)觸發(fā)confirm方法,告訴發(fā)送端該消息沒(méi)有到達(dá)rabbitmq,需要做業(yè)務(wù)處理。
這里我們發(fā)送消息到rabbitmq不存在的交換機(jī)上,就可以模擬上述場(chǎng)景。

實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口

/**
 * 當(dāng)消息沒(méi)有到達(dá)Rabbitmq的交換機(jī)時(shí)觸發(fā)該方法(當(dāng)然到達(dá)了也會(huì)觸發(fā),)
 */
@Component
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){

        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     *
     * @param correlationData 消息屬性體
     * @param ack 是否成功,成功到達(dá)true,沒(méi)有到達(dá),false
     * @param cause rabbitmq自身給的信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        //第一個(gè)坑,如果發(fā)送端發(fā)送消息時(shí)沒(méi)有對(duì)correlationData進(jìn)行處理,conirm方法接收到的對(duì)象都會(huì)是null
        //當(dāng)接收失敗并且correlationData對(duì)象為null,證明目前已經(jīng)無(wú)法追溯回業(yè)務(wù),可以做業(yè)務(wù)日志處理
        if(!ack&&correlationData==null){
            System.out.println(cause);
            //日志處理。。。

            return;
        }
        //如果接收失敗
        if(!ack){
            System.out.println("消息Id:"+correlationData.getId());
            Message message=correlationData.getReturnedMessage();
            System.out.println("消息體:"+new String(message.getBody()));
            //這里可以持久化業(yè)務(wù)消息體到數(shù)據(jù)庫(kù),然后定時(shí)去進(jìn)行補(bǔ)償處理或者重試等等
            return;
        }

        //處理完成

    }
}

發(fā)送端代碼

/**
 * 消息的推送
 * @return
 */
@PostMapping("push")
public boolean push(){

    TestMessage testMessage=new TestMessage();
    testMessage.setName("mq名稱");
    testMessage.setBusinessId("業(yè)務(wù)Id");

    //定義CorrelationData對(duì)象以及消息屬性。不然comfirm方法無(wú)論失敗還是成功,CorrelationData參數(shù)永遠(yuǎn)是null
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    //傳遞業(yè)務(wù)數(shù)據(jù)
    correlationData.setReturnedMessage(new Message(JSONObject.toJSON(testMessage).toString().getBytes(StandardCharsets.UTF_8),new MessageProperties()));

  //發(fā)送消息(這里發(fā)送給了message.log.test.exchange11交換機(jī),但實(shí)際rabbitmq并不存在)template.convertAndSend("message.log.test.exchange11","message_loss_test",testMessage,correlationData);

    return true;
}

這里是我踩的第一個(gè)坑,如果發(fā)送端不定義correlationData,那么confirm接收到的correlationData對(duì)象參數(shù) 都會(huì)是null

實(shí)現(xiàn)效果

使用return機(jī)制

模擬場(chǎng)景

當(dāng)消息到達(dá)了rabbitmq的交換機(jī)的時(shí)候,但是又沒(méi)有到達(dá)隊(duì)列,那么就會(huì)觸發(fā)return方法。
下面我們定義一個(gè)沒(méi)有綁定隊(duì)列的交換機(jī),然后發(fā)送消息到交換機(jī),就可以模擬上述場(chǎng)景

實(shí)現(xiàn)RabbitTemplate.ReturnCallback

/**
 * 當(dāng)消息沒(méi)有到達(dá)Rabbitmq的隊(duì)列時(shí)就會(huì)觸發(fā)該方法
 */
@Component
public class ReturnCallBack implements RabbitTemplate.ReturnCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * @param message    消息體
     * @param replyCode  返回代碼
     * @param replyText  返回文本
     * @param exchange   交換機(jī)
     * @param routingKey 發(fā)送方定義的路由key
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息標(biāo)識(shí):" + message.getMessageProperties().getDeliveryTag());
        String messageBody = null;
        try {
            messageBody = new String(message.getBody(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.out.println("消息:" + messageBody);
        System.out.println(replyCode);
        System.out.println(replyText);
        System.out.println(exchange);
        System.out.println(routingKey);

    }
}

發(fā)送端代碼

/**
 * 消息的推送
 * @return
 */
@PostMapping("push2")
public boolean push2(){

    TestMessage testMessage=new TestMessage();
    testMessage.setName("mq名稱2");
    testMessage.setBusinessId("業(yè)務(wù)Id");

    template.convertAndSend("message.log.test2.exchange","message_loss_test",JSONObject.toJSON(testMessage).toString());

    return true;
}

這里需注意消息體需要JSON序列化,不然returnedMessage方法接收的消息body會(huì)是亂碼

實(shí)現(xiàn)效果

rabbitmq服務(wù)掛了,造成內(nèi)存的消息丟失。

這個(gè)開(kāi)啟rabbitmq的持久化機(jī)制就好了,開(kāi)啟之后消息到達(dá)rabbitmq服務(wù),會(huì)實(shí)時(shí)轉(zhuǎn)入磁盤。這里怎么設(shè)置就不多說(shuō)了,網(wǎng)上挺多文章可以解答。

不過(guò)即使開(kāi)啟了還是會(huì)有一種情況會(huì)造成消息丟失,那就是消息即將要持久化到磁盤的那一刻,服務(wù)掛了,就會(huì)造成丟失,不過(guò)這種情況我也不知道怎么模擬,所以就暫不實(shí)踐了。

發(fā)送到消費(fèi)端消費(fèi)失敗

上面提到默認(rèn)情況下rabbitmq使用的是自動(dòng)ack的方式,我們將它改成手動(dòng)ack的方式,就可以解決這個(gè)問(wèn)題。

修改application.yml配置文件

rabbitmq:
 listener:
  simple:
    #開(kāi)啟手動(dòng)確認(rèn)
    acknowledge-mode: manual
    #開(kāi)啟失敗后的重試機(jī)制
    retry:
      enabled: true
      #最多重試3次
      max-attempts: 3

下面我們?cè)囈幌聨追N消費(fèi)端消費(fèi)不成功的場(chǎng)景

消費(fèi)了,但是忘記做手動(dòng)確認(rèn)ack的操作代碼。

@Component
public class TestConsumer {

    /**
     * 消費(fèi)
     * @param testmessage 消息體
     * @param message 消息屬性
     * @param channel mq通道對(duì)象
     */
    @RabbitListener(queues = {"message.loss.test.queue"})
    public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
        System.out.println("消費(fèi)testmessage消息:"+testmessage.getName());
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}

效果

效果流程:

  • 第一次用Postman請(qǐng)求之后,控制臺(tái)顯示了消息被消費(fèi)的信號(hào)。
  • 然后去查看rabbitmq后臺(tái)管理剛剛被消費(fèi)的消息以及變?yōu)?strong>Unacked
  • 停止程序后(關(guān)閉消費(fèi)端),過(guò)一陣子,后臺(tái)管理顯示消息變回了Ready,也就是說(shuō)重新回到了隊(duì)列。
  • 重新啟動(dòng)程序(開(kāi)啟消費(fèi)段),消息被重新消費(fèi)。

總而言之,如果消費(fèi)端沒(méi)有做手動(dòng)確認(rèn)的操作,那么在消費(fèi)端還沒(méi)關(guān)閉之前,消息會(huì)變成Unacked,不會(huì)再次被消費(fèi),但一旦消費(fèi)端關(guān)閉了,消息會(huì)重新回到隊(duì)列,讓消費(fèi)端消費(fèi)。

消費(fèi)過(guò)程中,觸發(fā)了未知異常,代碼沒(méi)有try catch

/**
 * 消費(fèi)
 * @param testmessage 消息體
 * @param message 消息屬性
 * @param channel mq通道對(duì)象
 */
@RabbitListener(queues = {"message.loss.test.queue"})
public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
    System.out.println("消費(fèi)testmessage消息:"+testmessage.getName());
    //故意觸發(fā)異常
    if(!StringUtils.isEmpty(testmessage.getName())){

        throw new RuntimeException("11211");
    }
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

效果1

上面的效果圖顯示,我在觸發(fā)了異常之后,消息重試了三次,也就是我在application.yml 配置的重試三次

如果我去掉重試機(jī)制會(huì)是什么效果。

效果2

 效果和忘記做ack操作的效果一樣,消息沒(méi)有ack后,消息會(huì)變成Unacked狀態(tài),消費(fèi)端關(guān)閉后消息會(huì)重新回到隊(duì)列,然后重新鏈接的時(shí)候,就會(huì)再消費(fèi)一次。

總結(jié)

到此這篇關(guān)于Spring boot Rabbitmq消息防丟失實(shí)踐的文章就介紹到這了,更多相關(guān)Spring boot Rabbitmq 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 利用Spring Social輕松搞定微信授權(quán)登錄的方法示例

    利用Spring Social輕松搞定微信授權(quán)登錄的方法示例

    這篇文章主要介紹了利用Spring Social輕松搞定微信授權(quán)登錄的方法示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-12-12
  • 透明化Sharding-JDBC數(shù)據(jù)庫(kù)字段加解密方案

    透明化Sharding-JDBC數(shù)據(jù)庫(kù)字段加解密方案

    這篇文章主要為大家介紹了透明化Sharding-JDBC數(shù)據(jù)庫(kù)字段加解密方案,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-02-02
  • Java Web中解決路徑(絕對(duì)路徑與相對(duì)路徑)問(wèn)題

    Java Web中解決路徑(絕對(duì)路徑與相對(duì)路徑)問(wèn)題

    這篇文章主要介紹了Java Web中解決路徑問(wèn)題的相關(guān)資料,java 文件路徑有絕對(duì)路徑與相對(duì)路徑,這里提供了幾種方法解決所有路徑問(wèn)題,需要的朋友可以參考下
    2017-01-01
  • Java SpringMVC自學(xué)自講

    Java SpringMVC自學(xué)自講

    本篇文章主要介紹了java SpringMVC——如何獲取請(qǐng)求參數(shù)詳解,詳細(xì)的介紹了每種參數(shù)注解的用法及其實(shí)例。感興趣的小伙伴們可以參考一下
    2021-09-09
  • java的接口解耦方式

    java的接口解耦方式

    這篇文章主要介紹了java的接口解耦方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • SpringCloud-Hystrix實(shí)現(xiàn)原理總結(jié)

    SpringCloud-Hystrix實(shí)現(xiàn)原理總結(jié)

    通過(guò)hystrix可以解決雪崩效應(yīng)問(wèn)題,它提供了資源隔離、降級(jí)機(jī)制、融斷、緩存等功能。接下來(lái)通過(guò)本文給大家分享SpringCloud-Hystrix實(shí)現(xiàn)原理,感興趣的朋友一起看看吧
    2021-05-05
  • Springboot ApplicationRunner的使用解讀

    Springboot ApplicationRunner的使用解讀

    這篇文章主要介紹了Springboot ApplicationRunner的使用解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-05-05
  • SpringBoot利用redis集成消息隊(duì)列的方法

    SpringBoot利用redis集成消息隊(duì)列的方法

    這篇文章主要介紹了SpringBoot利用redis集成消息隊(duì)列的方法,需要的朋友可以參考下
    2017-08-08
  • springboot項(xiàng)目(jar包)指定配置文件啟動(dòng)圖文教程

    springboot項(xiàng)目(jar包)指定配置文件啟動(dòng)圖文教程

    這篇文章主要給大家介紹了關(guān)于springboot項(xiàng)目(jar包)指定配置文件啟動(dòng)的相關(guān)資料,在多環(huán)境部署過(guò)程中、及線上運(yùn)維中可能會(huì)遇到臨時(shí)指定配置文件的情況,需要的朋友可以參考下
    2023-07-07
  • 使用feign發(fā)送http請(qǐng)求解析報(bào)錯(cuò)的問(wèn)題

    使用feign發(fā)送http請(qǐng)求解析報(bào)錯(cuò)的問(wèn)題

    這篇文章主要介紹了使用feign發(fā)送http請(qǐng)求解析報(bào)錯(cuò)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03

最新評(píng)論