欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot集成RabbitMQ死信隊列的實現(xiàn)

 更新時間:2021年09月16日 09:43:59   作者:小伙子你那什么車啊  
在大多數(shù)的MQ中間件中,都有死信隊列的概念。本文主要介紹了Springboot集成RabbitMQ死信隊列的實現(xiàn),具有一定的參考價值,感興趣的小伙伴們可以參考一下

關(guān)于死信隊列

在大多數(shù)的MQ中間件中,都有死信隊列的概念。死信隊列同其他的隊列一樣都是普通的隊列。在RabbitMQ中并沒有特定的“死信隊列”類型,而是通過配置,將其實現(xiàn)。
當我們在創(chuàng)建一個業(yè)務(wù)的交換機和隊列的時候,可以配置參數(shù),指明另一個隊列為當前隊列的死信隊列,在RabbitMQ中,死信隊列(嚴格的說應(yīng)該是死信交換機)被稱為DLX Exchange。當消息“死掉”后,會被自動路由到DLX Exchange的queue中。

什么樣的消息會進入死信隊列?

1.消息的TTL過期。
2.消費者對broker應(yīng)答Nack,并且消息禁止重回隊列。
3.Queue隊列長度已達上限。

場景分析

以用戶訂單支付為場景。在各大電商平臺上,訂單的都有待支付時間,通常為30min。當用戶超過30min未支付訂單,該訂單的狀態(tài)應(yīng)該會變成“超時取消”,或類似的狀態(tài)值的改變。
如果不使用MQ,可以設(shè)計一個定時任務(wù),定時查詢數(shù)據(jù)庫,判斷訂單的狀態(tài)和支付時間是否已經(jīng)到期,若到期則修改訂單的狀態(tài)。但顯然,這不是一個很好的操作,頻繁訪問數(shù)據(jù)庫,造成不必要的資源浪費。
使用MQ,我們可以在下單的時候,當訂單數(shù)據(jù)入庫后,發(fā)送一條Message到Queue中,并設(shè)置過期時間為30min或自定義的支付過期時間。

   /**
     * 發(fā)送帶有過期時間的消息
     */
    @GetMapping("/sendDlx")
    public void sendDlx() {
        Order order = new Order();
        order.setItemId(1);
        order.setStatus(1);
        rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, 
                JSON.toJSONString(order), message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            // 模擬,設(shè)置10S后消息過期
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
    }

若30min后,還未有消費者(下游服務(wù))消費這條消息,那么該條消息就會被路由到死信隊列中。我們可以設(shè)置一個監(jiān)聽去監(jiān)聽死信隊列,當收到死信隊列的消息后,則根據(jù)消息數(shù)據(jù),查詢數(shù)據(jù)庫訂單狀態(tài)是否還是待支付狀態(tài),若是,則修改成超時取消。

代碼實現(xiàn)

以下是demo,未做服務(wù)的拆分,因此整個流程都是單個服務(wù)實現(xiàn)的,所以就沒有下游服務(wù),但并不影響整體業(yè)務(wù)。

RabbitMQConfig

將需要的交換機,隊列,綁定都聲明成SpringBean。Spring會自動創(chuàng)建這些到RabbitMQ服務(wù)中。
@Value注解部分都是配置文件exchange、queue、routingKey的名稱。

/**
 * @author wulei
 */
@Configuration
public class RabbitConfig {

    @Value("${sunspring.order.exchange}")
    private String orderExchange;

    @Value("${sunspring.order.queue}")
    private String orderQueue;

    @Value("${sunspring.order.routingKey}")
    private String orderRoutingKey;

    @Value("${sunspring.dlx.exchange}")
    private String dlxExchange;

    @Value("${sunspring.dlx.queue}")
    private String dlxQueue;

    @Value("${sunspring.dlx.routingKey}")
    private String dlxRoutingKey;

    /**
     * 聲明死信隊列
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }

    /**
     * 聲明死信隊列
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }

    /**
     * 綁定死信隊列到死信交換機
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }

    /**
     * 聲明訂單業(yè)務(wù)交換機
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }

    /**
     * 聲明訂單業(yè)務(wù)隊列
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        Map<String,Object> arguments = new HashMap<>(2);
        // 綁定該隊列到私信交換機
        arguments.put("x-dead-letter-exchange",dlxExchange);
        arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
        return new Queue(orderQueue,true,false,false,arguments);
    }

    /**
     * 綁定訂單隊列到訂單交換機
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);

    }
}
sunspring.order.exchange=sunspring_order_exchange
sunspring.order.queue=sunspring_order_queue
sunspring.order.routingKey=sunspring.order

sunspring.dlx.exchange=sunspring_dlx_exchange
sunspring.dlx.queue=sunspring.dlx.queue
sunspring.dlx.routingKey=dlx

在聲明業(yè)務(wù)隊列時,創(chuàng)建了一個Map,并且put了兩個值,這兩個值就是死信隊列的聲明。
x-dead-letter-exchange:死信交換機的名稱
x-dead-letter-routing-key:死信交換機的路由鍵,因為demo中兩個交換機的類型都是direct的,因此路由鍵必須相同。

/**
     * 聲明訂單業(yè)務(wù)隊列
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        Map<String,Object> arguments = new HashMap<>(2);
        // 綁定該隊列到私信交換機
        arguments.put("x-dead-letter-exchange",dlxExchange);
        arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
        return new Queue(orderQueue,true,false,false,arguments);
    }

監(jiān)控頁面

在exchange列表中有剛剛創(chuàng)建的業(yè)務(wù)交換機sunspring_order_exchange和死信交換機
sunspring_dlx_exchange

exchange列表

在Queue列表中,有死信隊列sunspring_dlx_queue和業(yè)務(wù)隊列sunspring_order_queue
并且業(yè)務(wù)隊列上有DLX標記,可見當前隊列已經(jīng)綁定了一個死信隊列。DLK表示的路由鍵。

queue列表

場景模擬

生產(chǎn)者

生產(chǎn)者發(fā)送了一個過期時間為10S的消息。
message.getMessageProperties().setExpiration(“10000”);

/**
     * 發(fā)送帶有過期時間的消息
     */
    @GetMapping("/sendDlx")
    public void sendDlx() {
        Order order = new Order();
        order.setItemId(1);
        order.setStatus(1);
        rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
                JSON.toJSONString(order), message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
    }

sunspring_order_queue接受到了一條消息,當前消息的狀態(tài)是ready的,表示沒有任何消費者消費這條消息。

在這里插入圖片描述

10s后,當前消息路由到了死信隊列中,sunspring_order_queue消息數(shù)量變成0,sunspring_dlx_queue數(shù)量變成1。

在這里插入圖片描述

消費者,設(shè)置死信隊列監(jiān)聽

通過設(shè)置對死信隊列的監(jiān)聽,可以發(fā)現(xiàn),在Springboot啟動之后,創(chuàng)建了對RabbitMQ的監(jiān)聽,死信隊列的消息也立刻被消費了。

因此,我們可以監(jiān)聽死信隊列,對未被消費的消息進行下一步操作。如場景分析中的更改訂單狀態(tài)。

   @RabbitListener(queues = "sunspring.dlx.queue")
    public void dlxListener(Message message,Channel channel) throws IOException {
        System.out.println(new String(message.getBody()));

        //對消息進行業(yè)務(wù)處理....
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

2019-08-20 20:05:05.158 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [120.27.243.91:5672]
2019-08-20 20:05:05.224 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#68ab0936:0/SimpleConnection@74606204 [delegate=amqp://guest@120.27.243.91:5672/, localPort= 13563]
{"itemId":1,"status":1}

到此這篇關(guān)于Springboot集成RabbitMQ死信隊列的實現(xiàn)的文章就介紹到這了,更多相關(guān)Springboot RabbitMQ死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java多線程中的wait與notify方法詳解

    Java多線程中的wait與notify方法詳解

    這篇文章主要介紹了Java多線程中的wait與notify方法詳解,線程的調(diào)度是無序的,但有些情況要求線程的執(zhí)行是有序的,因此,我們可以使用 wait() 方法來使線程執(zhí)行有序,需要的朋友可以參考下
    2023-08-08
  • 淺談Java后臺對JSON格式的處理操作

    淺談Java后臺對JSON格式的處理操作

    下面小編就為大家?guī)硪黄獪\談Java后臺對JSON格式的處理操作。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-06-06
  • JavaMail郵件發(fā)送機制詳解

    JavaMail郵件發(fā)送機制詳解

    這篇文章主要介紹了JavaMail郵件發(fā)送機制詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-11-11
  • Java類初始化順序詳解

    Java類初始化順序詳解

    這篇文章主要介紹了Java類初始化順序詳解,java語言在使用過程中最先開始就是初始化,在工作中如果遇到什么問題需?要定位往往到最后也可能是初始化的問題,因此掌握初始化的順序很重要,需要的朋友可以參考下
    2023-08-08
  • java實現(xiàn)ip地址與十進制數(shù)相互轉(zhuǎn)換

    java實現(xiàn)ip地址與十進制數(shù)相互轉(zhuǎn)換

    本文介紹在java中IP地址轉(zhuǎn)換十進制數(shù)及把10進制再轉(zhuǎn)換成IP地址的方法及實例參考,曬出來和大家分享一下
    2012-12-12
  • Spring中的@Autowired、@Qualifier和@Primary注解詳解

    Spring中的@Autowired、@Qualifier和@Primary注解詳解

    這篇文章主要介紹了Spring中的@Autowired、@Qualifier和@Primary注解詳解,@Autowired?注解,可以對類成員變量、方法和構(gòu)造函數(shù)進行標注,完成自動裝配的工作,@Autowired?是默認根據(jù)?byType?進行自動裝配的,需要的朋友可以參考下
    2023-11-11
  • Java中forEach使用lambda表達式,數(shù)組和集合的區(qū)別說明

    Java中forEach使用lambda表達式,數(shù)組和集合的區(qū)別說明

    這篇文章主要介紹了Java中forEach使用lambda表達式,數(shù)組和集合的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • java的各種集合為什么不安全(List、Set、Map)以及代替方案

    java的各種集合為什么不安全(List、Set、Map)以及代替方案

    這篇文章主要介紹了java的各種集合為什么不安全(List、Set、Map)以及代替方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-10-10
  • Java三種IO模型原理實例詳解

    Java三種IO模型原理實例詳解

    這篇文章主要介紹了Java三種IO模型原理實例詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-05-05
  • Spring Boot監(jiān)聽Redis Key失效事件實現(xiàn)定時任務(wù)的示例

    Spring Boot監(jiān)聽Redis Key失效事件實現(xiàn)定時任務(wù)的示例

    這篇文章主要介紹了Spring Boot監(jiān)聽Redis Key失效事件實現(xiàn)定時任務(wù)的示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-04-04

最新評論