欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ之死信隊列深入解析

 更新時間:2023年09月20日 11:19:32   作者:JAVA開發(fā)區(qū)  
這篇文章主要介紹了RabbitMQ之死信隊列深入解析,?死信,顧名思義就是無法被消費的消息,字面意思可以這樣理解,一般來說,producer將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取消息進行消費,需要的朋友可以參考下

1. 死信的概念

死信,顧名思義就是無法被消費的消息,字面意思可以這樣理解,一般來說,producer將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取消息進行消費,但某些時候由于特定的原因 queue 中的某些消息無法被消費,這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊列。

應用場景:為了保證訂單業(yè)務的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊列機制,當消息消費發(fā)生異常時,將消息投入死信隊列中。還有比如說:用戶在商場下單成功并點擊支付后在指定時間未支付時自動失效。

2. 死信的來源

  • 消息TLL過期
  • 隊列達到最大長度(隊列滿了,無法再添加數(shù)據(jù)到 mq 中)
  • 消息被拒絕(basic.reject 或 basic.nack )并且 requeue = false

3. 死信實戰(zhàn)

3.1 代碼架構圖

在這里插入圖片描述

3.2 消息 TTL 過期

生產(chǎn)者代碼

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 設置消息的TTL時間
            new AMQP.BasicProperties().builder().expiration("10000").build();
            // 該消息是用作演示隊列個數(shù)限制
            for (int i = 0; i < 11; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
                System.out.println("生產(chǎn)者發(fā)送消息:" + message);
            }
        }
    }
}

消費者 C1 代碼( 啟動之后關閉消費者 模擬其接收不到消息 )

public class Consumer01 {
    // 普通交換機名稱
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交換機名稱
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 聲明死信和普通交換機 類型為direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
        channel.exchangeDeclare(DEAD_EXCHANGE, "direct");
        // 聲明死信隊列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        // 死信隊列綁定死信交換機 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        // 正常隊列綁定死信隊列信息
        Map<String, Object> params = new HashMap<>();
        // 正常隊列設置死信交換機 參數(shù)key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 正常隊列設置死信 routing-key 參數(shù)是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, null);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接受消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接受消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

生產(chǎn)者未發(fā)送消息

在這里插入圖片描述

生產(chǎn)者發(fā)送了10條消息,此時正常消息隊列有10條未消費信息

在這里插入圖片描述

時間過去10秒,正常隊列里面的消息由于沒有被消費,消息進入死信隊列

在這里插入圖片描述

消費者 C2 代碼(以上步驟完成后 啟動 C2 消費者 它消費死信隊列里面的消息)

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, "direct");
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接受死信隊列消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接受死信隊列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

在這里插入圖片描述

3.3 隊列達到最大長度

消費生產(chǎn)者代碼去掉 TTL 屬性

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 該消息是用作演示隊列個數(shù)限制
            for (int i = 0; i < 11; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
                System.out.println("生產(chǎn)者發(fā)送消息:" + message);
            }
        }
    }
}

C1 消費者修改以下代碼(啟動之后關閉該消費者 模擬其接收不到消息)

  // 正常隊列綁定死信隊列信息
  Map<String, Object> params = new HashMap<>();
  // 正常隊列設置死信交換機 參數(shù)key 是固定值
  params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  // 正常隊列設置死信 routing-key 參數(shù)是固定值
  params.put("x-dead-letter-routing-key", "lisi");
  // 設置正常隊列長度的限制
  params.put("x-max-length", 6);// 添加該代碼

注意此時需要把原先隊列刪除,因為參數(shù)改變了

C2 消費者代碼不變(啟動 C2 消費者)

在這里插入圖片描述

3.4 消息被拒

消息生產(chǎn)者代碼同上生產(chǎn)者一致

C1 消費者代碼(啟動之后關閉該消費者,模擬接收不到消息)

public class Consumer01 {
    // 普通交換機名稱
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交換機名稱
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 聲明死信和普通交換機 類型為direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
        channel.exchangeDeclare(DEAD_EXCHANGE, "direct");
        // 聲明死信隊列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        // 死信隊列綁定死信交換機 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        // 正常隊列綁定死信隊列信息
        Map<String, Object> params = new HashMap<>();
        // 正常隊列設置死信交換機 參數(shù)key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 正常隊列設置死信 routing-key 參數(shù)是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, null);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接受消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("info5")) {
                System.out.println("Consumer01 接收到消息" + message + "并拒絕簽收該消息");
                // requeue 設置為false 代表拒絕重新入隊 該隊列如果配置了死信交換機將發(fā)送到死信隊列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接受消息" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
        });
    }
}

生產(chǎn)者發(fā)送消息之后

在這里插入圖片描述

C2 消費者代碼不變

啟動消費者1 然后再啟動消費者2

在這里插入圖片描述

到此這篇關于RabbitMQ之死信隊列深入解析的文章就介紹到這了,更多相關RabbitMQ死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • SpringBoot2.3新特性優(yōu)雅停機詳解

    SpringBoot2.3新特性優(yōu)雅停機詳解

    這篇文章主要介紹了SpringBoot2.3新特性優(yōu)雅停機詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-05-05
  • 詳解Springboot配置文件的使用

    詳解Springboot配置文件的使用

    在springboot項目中,也可以使用yml類型的配置文件代替properties文件。接下來通過本文給大家分享Springboot配置文件的使用,感興趣的朋友一起看看吧
    2017-07-07
  • 2個java希爾排序示例

    2個java希爾排序示例

    java希爾排序示例,希爾排序是插入排序的一種類型,也可以用一個形象的叫法縮小增量法,需要的朋友可以參考下
    2014-05-05
  • 如何通過Kaptcha在Web頁面生成驗證碼

    如何通過Kaptcha在Web頁面生成驗證碼

    這篇文章主要介紹了如何通過Kaptcha在Web頁面生成驗證碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-10-10
  • SpringBoot中的maven插件spring-boot-maven-plugin使用

    SpringBoot中的maven插件spring-boot-maven-plugin使用

    這篇文章主要介紹了SpringBoot中的maven插件spring-boot-maven-plugin使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Springquartz的配置方式詳解

    Springquartz的配置方式詳解

    本文介紹了在Spring框架中使用Quartz進行任務調(diào)度的三種方式:使用@Scheduled注解、XML配置和Java配置,每種方式都有其特點和適用場景,感興趣的朋友一起看看吧
    2025-01-01
  • Java Base64解碼錯誤及解決方法

    Java Base64解碼錯誤及解決方法

    本篇文章給大家從一個Java Base64解碼錯誤著手給大家分析了錯誤的原因以及解決辦法,有興趣的可以參考學習下。
    2018-02-02
  • Mybatis把返回結果封裝成map類型的實現(xiàn)

    Mybatis把返回結果封裝成map類型的實現(xiàn)

    本文主要介紹了Mybatis把返回結果封裝成map類型的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-03-03
  • Java實現(xiàn)順序表的增刪查改功能

    Java實現(xiàn)順序表的增刪查改功能

    這篇文章主要介紹了Java實現(xiàn)順序表的增刪查改功能,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-04-04
  • SpringMVC中文亂碼踩坑記錄

    SpringMVC中文亂碼踩坑記錄

    這篇文章主要介紹了SpringMVC中文亂碼踩坑記錄,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-08-08

最新評論