欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot中rabbitmq實(shí)現(xiàn)消息可靠性機(jī)制詳解

 更新時(shí)間:2021年09月25日 10:33:27   作者:每一個(gè)不曾起舞的日子,都是對(duì)生命的辜負(fù)  
這篇文章主要介紹了springboot中rabbitmq實(shí)現(xiàn)消息可靠性機(jī)制詳解,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下

1. 生產(chǎn)者模塊通過(guò)publisher confirm機(jī)制實(shí)現(xiàn)消息可靠性

 1.1 生產(chǎn)者模塊導(dǎo)入rabbitmq相關(guān)依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--用于mq消息的序列化與反序列化-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

1.2 配置文件中進(jìn)行mq的相關(guān)配置

spring.rabbitmq.host=10.128.240.183
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
        
spring.rabbitmq.publisher-confirm-type=correlated
        
spring.rabbitmq.publisher-returns=true
        
spring.rabbitmq.template.mandatory=true
  • publish-confirm-type:開(kāi)啟publisher-confirm,有以下可選值

simple:同步等待confirm結(jié)果,直到超時(shí)
correlated:異步回調(diào),定義ConfirmCallback。mq返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback

  • publish-returns:開(kāi)啟publish-return功能??梢远xReturnCallback
  • template.mandatory: 定義消息路由失敗的策略

true:調(diào)用ReturnCallback
false:直接丟棄消息

1.3 定義ReturnCallback(消息投遞到隊(duì)列失敗觸發(fā)此回調(diào))

  • 每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback。
  • 當(dāng)消息投遞失敗,就會(huì)調(diào)用生產(chǎn)者的returnCallback中定義的處理邏輯
  • 可以在容器啟動(dòng)時(shí)就配置這個(gè)回調(diào)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對(duì)象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示
                return;
            }
            // 記錄日志
            log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發(fā)消息
        });
    }
}

1.4 定義ConfirmCallback(消息到達(dá)交換機(jī)觸發(fā)此回調(diào))

可以為redisTemplate指定一個(gè)統(tǒng)一的確認(rèn)回調(diào)

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對(duì)象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示
                return;
            }
            // 記錄日志
            log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發(fā)消息
        });
 
        
        // 設(shè)置統(tǒng)一的confirm回調(diào)。只要消息到達(dá)broker就ack=true
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("這是統(tǒng)一的回調(diào)");
                System.out.println("correlationData:" + correlationData);
                System.out.println("ack:" + b);
                System.out.println("cause:" + s);
            }
        });
    }
}

也可以為特定的消息定制回調(diào)

 @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    public void testmq() throws InterruptedException {
 
 
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 
        correlationData.getFuture().addCallback(result->{
            if (result.isAck()) {
                // ACK
                log.debug("消息成功投遞到交換機(jī)!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投遞到交換機(jī)失敗!消息ID:{}", correlationData.getId());
                // 重發(fā)消息
            }
        },ex->{
            // 記錄日志
            log.error("消息發(fā)送失??!", ex);
            // 重發(fā)消息
        });
        rabbitTemplate.convertAndSend("example.direct","blue","hello,world",correlationData);
    }

2. 消費(fèi)者模塊開(kāi)啟消息確認(rèn)

2.1 添加配置

# 手動(dòng)ack消息,不使用默認(rèn)的消費(fèi)端確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • none:關(guān)閉ack,消息投遞時(shí)不可靠的,可能丟失
  • auto:類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq,沒(méi)有異常,返回
  • ackmanual:我們自己指定什么時(shí)候返回ack

2.2 manual模式在監(jiān)聽(tīng)器中自定義返回ack

@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {
 
    @Autowired
    private OrderService orderService;
 
 
    @RabbitHandler
    private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        System.out.println("收到過(guò)期的訂單信息,準(zhǔn)備關(guān)閉訂單" + orderEntity.getOrderSn());
 
        try {
            orderService.closeOrder(orderEntity);
            // 第二個(gè)參數(shù)為false則表示僅確認(rèn)此條消息。如果為true則表示對(duì)收到的多條消息同時(shí)確認(rèn)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 第二個(gè)參數(shù)為ture表示將這個(gè)消息重新加入隊(duì)列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

3. 消費(fèi)者模塊開(kāi)啟消息失敗重試機(jī)制

3.1 配置文件添加配置,開(kāi)啟本地重試

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開(kāi)啟消費(fèi)者失敗重試
          initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒
          multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval
          max-attempts: 3 # 最大重試次數(shù)
          stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
  • 開(kāi)啟本地重試,如果消息處理過(guò)程總拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
  • 重試達(dá)到最大次數(shù)后,spring會(huì)返回ack,消息會(huì)被丟棄

4.  消費(fèi)者模塊添加失敗策略(用于開(kāi)啟失敗本地重試功能后)

  • 當(dāng)開(kāi)啟本地重試后,重試最大次數(shù)后消息直接丟棄。
  • 三種策略,都繼承于MessageRecovery接口
  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

4.2 定義處理失敗消息的交換機(jī)和隊(duì)列 沒(méi)有會(huì)自動(dòng)創(chuàng)建相應(yīng)的隊(duì)列、交換機(jī)與綁定關(guān)系,有了就啥也不做

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
 
// 路由鍵為key
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

4.3 向容器中添加一個(gè)失敗策略組件

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    // error為路由鍵
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

到此這篇關(guān)于springboot中rabbitmq實(shí)現(xiàn)消息可靠性的文章就介紹到這了,更多相關(guān)springboot rabbitmq消息可靠性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java指定保留小數(shù)位數(shù)的方法

    Java指定保留小數(shù)位數(shù)的方法

    本篇文章主要介紹了Java指定保留小數(shù)位數(shù)的方法,很多時(shí)候需要規(guī)定保留的小數(shù)位數(shù),這里整理了詳細(xì)的代碼,有需要的小伙伴可以參考下。
    2017-03-03
  • 基于Java和GeoTools的Shapefile矢量數(shù)據(jù)縮略圖生成實(shí)踐

    基于Java和GeoTools的Shapefile矢量數(shù)據(jù)縮略圖生成實(shí)踐

    這篇文章主要介紹了基于Java和GeoTools的Shapefile矢量數(shù)據(jù)縮略圖生成實(shí)踐,需要的朋友可以參考下
    2024-08-08
  • 使用java實(shí)現(xiàn)猜拳小游戲

    使用java實(shí)現(xiàn)猜拳小游戲

    這篇文章主要為大家詳細(xì)介紹了使用java實(shí)現(xiàn)猜拳小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • Java?@Scheduled定時(shí)任務(wù)不執(zhí)行解決辦法

    Java?@Scheduled定時(shí)任務(wù)不執(zhí)行解決辦法

    這篇文章主要給大家介紹了關(guān)于Java?@Scheduled定時(shí)任務(wù)不執(zhí)行解決的相關(guān)資料,當(dāng)@Scheduled定時(shí)任務(wù)不執(zhí)行時(shí)可以根據(jù)以下步驟進(jìn)行排查和解決,需要的朋友可以參考下
    2023-10-10
  • springboot參數(shù)傳中文亂碼的解決方案

    springboot參數(shù)傳中文亂碼的解決方案

    這篇文章主要介紹了springboot參數(shù)傳中文亂碼的解決方案,幫助大家更好的理解和學(xué)習(xí)使用springboot,感興趣的朋友可以了解下
    2021-03-03
  • 深入了解Spring中Bean的作用域和生命周期

    深入了解Spring中Bean的作用域和生命周期

    這篇文章主要介紹了深入了解Spring中Bean的作用域和生命周期,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • IDEA實(shí)現(xiàn)JDBC的操作步驟

    IDEA實(shí)現(xiàn)JDBC的操作步驟

    JDBC提供了一種基準(zhǔn),據(jù)此可以構(gòu)建更高級(jí)的工具和接口,使數(shù)據(jù)庫(kù)開(kāi)發(fā)人員能夠編寫(xiě)數(shù)據(jù)庫(kù)應(yīng)用程序,本文給大家介紹IDEA實(shí)現(xiàn)JDBC的操作步驟,感興趣的朋友一起看看吧
    2022-01-01
  • spring boot配置ssl(多cer格式)超詳細(xì)教程

    spring boot配置ssl(多cer格式)超詳細(xì)教程

    這篇文章主要介紹了spring boot配置ssl(多cer格式)超詳細(xì)教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2023-11-11
  • java中的Reference和引用類型實(shí)例精講

    java中的Reference和引用類型實(shí)例精講

    這篇文章主要為大家介紹了java中的Reference和引用類型示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-09-09
  • Java Web項(xiàng)目部署在Tomcat運(yùn)行出錯(cuò)與解決方法示例

    Java Web項(xiàng)目部署在Tomcat運(yùn)行出錯(cuò)與解決方法示例

    這篇文章主要介紹了Java Web項(xiàng)目部署在Tomcat運(yùn)行出錯(cuò)與解決方法,結(jié)合具體實(shí)例形式分析了Java Web項(xiàng)目部署在Tomcat過(guò)程中由于xml配置文件導(dǎo)致的錯(cuò)誤問(wèn)題常見(jiàn)提示與解決方法,需要的朋友可以參考下
    2017-03-03

最新評(píng)論