欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot基于RabbitMQ實(shí)現(xiàn)消息可靠性的方法

 更新時間:2024年04月19日 08:52:59   作者:請回答1024  
RabbitMQ 提供了 publisher confirm 機(jī)制來避免消息發(fā)送到 MQ 過程中丟失,這種機(jī)制必須給每個消息指定一個唯一ID,消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功,本文給大家介紹了SpringBoot基于RabbitMQ實(shí)現(xiàn)消息可靠性的方法,需要的朋友可以參考下

1.概述

消息從發(fā)送到消費(fèi)者接收 會經(jīng)歷的過程如下:

在這里插入圖片描述

丟失消息的可能性

  • 發(fā)送時丟失:
    • 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
    • 消息到達(dá)exchange后未到達(dá)queue
  • MQ宕機(jī),queue將消息丟失
  • consumer接收到消息后未消費(fèi)就宕機(jī)

針對這些問題,RabbitMQ分別給出了解決方案

  • 生產(chǎn)者確認(rèn)機(jī)制
  • mq持久化
  • 消費(fèi)者確認(rèn)機(jī)制
  • 失敗重試機(jī)制

2. 生產(chǎn)者消息確認(rèn)

2.1 概述

RabbitMQ 提供了 publisher confirm 機(jī)制來避免消息發(fā)送到 MQ 過程中丟失。這種機(jī)制必須給每個消息指定一個唯一ID。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功。

返回結(jié)果有兩種方式:

  • publisher-confirm,發(fā)送者確認(rèn)
    • 消息成功投遞到交換機(jī),返回ack
    • 消息未投遞到交換機(jī),返回nack
  • publisher-return,發(fā)送者回執(zhí)
    • 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。

image.png

image.png

2.2 實(shí)戰(zhàn)

2.2.1 修改配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

配置說明:

  • publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
    • simple:同步等待confirm結(jié)果,直到超時
    • correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
  • publish-returns:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback
  • template.mandatory:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息

2.2.2 定義 Return 回調(diào)

每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項(xiàng)目加載時配置:
修改publisher服務(wù),添加一個:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 設(shè)置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投遞失敗,記錄日志
            log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有業(yè)務(wù)需要,可以重發(fā)消息
        });
    }
}

2.2.3 定義ConfirmCallback

ConfirmCallback 可以在發(fā)送消息時指定,因?yàn)槊總€業(yè)務(wù)處理 confirm 成功或失敗的邏輯不一定相同。

public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息體
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封裝到 CorrelationData 中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> {
            if(result.isAck()){
                // 3.1.ack,消息成功
                log.debug("消息發(fā)送成功, ID:{}", correlationData.getId());
            }else{
                // 3.2.nack,消息失敗
                log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.發(fā)送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一會兒,等待ack回執(zhí)
    //Thread.sleep(20);
}

3. 消息持久化

生產(chǎn)者確認(rèn)可以確保消息投遞到 RabbitMQ 的隊(duì)列中,但是消息發(fā)送到 RabbitMQ 以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。

  • 交換機(jī)持久化
  • 隊(duì)列持久化
  • 消息持久化

3.1 交換機(jī)持久化

@Bean
public DirectExchange simpleExchange(){
    // 三個參數(shù):①交換機(jī)名稱、②是否持久化、③當(dāng)沒有queue與其綁定時是否自動刪除
    return new DirectExchange("simple.direct", true, false);
}

事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。

3.2 隊(duì)列持久化

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的。

3.3 消息持久化

默認(rèn)情況下,SpringAMQP 交換機(jī) 隊(duì)列 以及發(fā)出的任何消息都是持久化的,不用特意指定。

4. 消費(fèi)者消息確認(rèn)

RabbitMQ 是 閱后即焚 機(jī)制,RabbitMQ 確認(rèn)消息被消費(fèi)者消費(fèi)后會立刻刪除。

而 RabbitMQ 是通過 消費(fèi)者回執(zhí) 來確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向 RabbitMQ 發(fā)送 ACK 回執(zhí),表明自己已經(jīng)處理消息。

設(shè)想這樣的場景:

  • 1)RabbitMQ投遞消息給消費(fèi)者
  • 2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ
  • 3)RabbitMQ刪除消息
  • 4)消費(fèi)者宕機(jī),消息尚未處理

這樣,消息就丟失了。因此消費(fèi)者返回ACK的時機(jī)非常重要。

4.1 三種確認(rèn)模式

而 SpringAMQP 則允許配置三種確認(rèn)模式:
•manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
•auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack。
•none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會成功處理,因此消息投遞后立即被刪除(存在丟失消息的風(fēng)險)。

由此可知:

  • none 模式下,消息投遞是不可靠的,可能丟失
  • auto 模式類似事務(wù)機(jī)制,出現(xiàn)異常時返回nack,消息回滾到mq;沒有異常,返回ack
  • manual:自己根據(jù)業(yè)務(wù)情況,判斷什么時候該ack

一般,我們都是使用默認(rèn)的 auto 即可。

相關(guān)配置

spring:
  rabbitmq:
    listener:
      simple:
        #acknowledge-mode: none # 關(guān)閉ack
        #acknowledge-mode: manual  # 手動ack
        acknowledge-mode: auto # 自動ack

4.2 消息失敗重試機(jī)制

當(dāng)消費(fèi)者出現(xiàn)異常后,消息會不斷 requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次 requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:怎么辦呢?

4.2.1 本地重試機(jī)制

我們可以利用 Spring 的 retry 機(jī)制,在消費(fèi)者出現(xiàn)異常時利用 本地重試,而不是無限制的 requeue 到 mq 隊(duì)列。

修改 consumer 服務(wù)的 application.yml 文件,添加內(nèi)容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費(fèi)者失敗重試
          initial-interval: 1000 # 初始的失敗等待時長為1秒
          multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
          max-attempts: 3 # 最大重試次數(shù)
          stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

重啟consumer服務(wù),重復(fù)之前的測試。可以發(fā)現(xiàn):

  • 在重試3次后,SpringAMQP 會拋出異常 AmqpRejectAndDontRequeueException,說明本地重試觸發(fā)了。
  • 查看 RabbitMQ 控制臺,發(fā)現(xiàn)消息被刪除了,說明最后 SpringAMQP 返回的是ack,mq刪除消息了。

結(jié)論

  • 開啟本地重試時,消息處理過程中拋出異常,不會 requeue 到隊(duì)列,而是在消費(fèi)者本地重試
  • 重試達(dá)到最大次數(shù)后,Spring 會返回 ack,消息會被丟棄。

4.2.2 失敗策略

在之前的測試中,達(dá)到最大重試次數(shù)后,消息會被丟棄,這是由 Spring 內(nèi)部機(jī)制決定的。

在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有 MessageRecovery 接口來處理,它包含三種不同的實(shí)現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接 reject,丟棄消息。默認(rèn)就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回 nack,消息重新入隊(duì)
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。

1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列

2)定義一個RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)

代碼如下

@Configuration
public class ErrorMessageConfig {
    @Bean	//	處理失敗消息的交換機(jī)
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean	//	處理失敗消息的隊(duì)列
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

其實(shí) 我們在生產(chǎn)中會指定死信交換機(jī)來處理失敗的消息

5. 總結(jié)

如何確保RabbitMQ消息的可靠性?

  • 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
  • 開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會丟失
  • 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
  • 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理

以上就是SpringBoot基于RabbitMQ實(shí)現(xiàn)消息可靠性的方法的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ消息可靠性的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java反射與Fastjson的危險反序列化詳解

    Java反射與Fastjson的危險反序列化詳解

    在?Java?中,Computer.class是一個引用,它表示了?Computer?的字節(jié)碼對象(Class對象),這個對象被廣泛應(yīng)用于反射、序列化等操作中,那么為什么?parseObject?需要這個引用呢,帶著這個問題我們一起通過本文學(xué)習(xí)下吧
    2024-07-07
  • SpringMVC中的@RequestMapping注解的使用詳細(xì)教程

    SpringMVC中的@RequestMapping注解的使用詳細(xì)教程

    @RequestMapping注解的作用就是將請求和處理請求的控制器方法關(guān)聯(lián)起來,建立映射關(guān)系,本文主要來和大家詳細(xì)講講它的具體使用,感興趣的可以了解一下
    2023-07-07
  • Mybatis 實(shí)現(xiàn)打印sql語句的代碼

    Mybatis 實(shí)現(xiàn)打印sql語句的代碼

    這篇文章主要介紹了Mybatis 實(shí)現(xiàn)打印sql語句的代碼,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • Java類的繼承原理與用法分析

    Java類的繼承原理與用法分析

    這篇文章主要介紹了Java類的繼承原理與用法,結(jié)合實(shí)例形式分析了java類的繼承相關(guān)原理、使用方法及操作注意事項(xiàng),需要的朋友可以參考下
    2020-02-02
  • Java正則匹配中文的方法實(shí)例分析

    Java正則匹配中文的方法實(shí)例分析

    這篇文章主要介紹了Java正則匹配中文的方法,結(jié)合實(shí)例形式分析了Java針對中文、標(biāo)點(diǎn)及引號等匹配操作相關(guān)技巧,需要的朋友可以參考下
    2017-03-03
  • Java使用Tesseract-OCR實(shí)戰(zhàn)教程

    Java使用Tesseract-OCR實(shí)戰(zhàn)教程

    本文介紹了如何在Java中使用Tesseract-OCR進(jìn)行文本提取,包括Tesseract-OCR的安裝、中文訓(xùn)練庫的配置、依賴庫的引入以及具體的代碼實(shí)現(xiàn),通過這個過程,我們將演示如何從視頻幀中提取文本
    2025-02-02
  • 出現(xiàn)java.lang.NoSuchMethodException異常的解決(靠譜)

    出現(xiàn)java.lang.NoSuchMethodException異常的解決(靠譜)

    這篇文章主要介紹了出現(xiàn)java.lang.NoSuchMethodException異常的解決方案(靠譜),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • SpringBoot項(xiàng)目集成Flyway詳細(xì)過程

    SpringBoot項(xiàng)目集成Flyway詳細(xì)過程

    今天帶大家學(xué)習(xí)SpringBoot項(xiàng)目集成Flyway詳細(xì)過程,文中有非常詳細(xì)的介紹及代碼示例,對正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下
    2021-05-05
  • SpringBoot 創(chuàng)建容器的實(shí)現(xiàn)

    SpringBoot 創(chuàng)建容器的實(shí)現(xiàn)

    這篇文章主要介紹了SpringBoot 創(chuàng)建容器的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • spring boot項(xiàng)目中MongoDB的使用方法

    spring boot項(xiàng)目中MongoDB的使用方法

    前段時間分享了關(guān)于Spring Boot中使用Redis的文章,除了Redis之后,我們在互聯(lián)網(wǎng)產(chǎn)品中還經(jīng)常會用到另外一款著名的NoSQL數(shù)據(jù)庫MongoDB。下面這篇文章主要給大家介紹了關(guān)于在spring boot項(xiàng)目中MongoDB的使用方法,需要的朋友可以參考下。
    2017-09-09

最新評論