Springboot集成RabbitMQ死信隊列的實現(xiàn)
關(guān)于死信隊列
在大多數(shù)的MQ中間件中,都有死信隊列的概念。死信隊列同其他的隊列一樣都是普通的隊列。在RabbitMQ中并沒有特定的“死信隊列”類型,而是通過配置,將其實現(xiàn)。
當我們在創(chuàng)建一個業(yè)務(wù)的交換機和隊列的時候,可以配置參數(shù),指明另一個隊列為當前隊列的死信隊列,在RabbitMQ中,死信隊列(嚴格的說應(yīng)該是死信交換機)被稱為DLX Exchange。當消息“死掉”后,會被自動路由到DLX Exchange的queue中。
什么樣的消息會進入死信隊列?
1.消息的TTL過期。
2.消費者對broker應(yīng)答Nack,并且消息禁止重回隊列。
3.Queue隊列長度已達上限。
場景分析
以用戶訂單支付為場景。在各大電商平臺上,訂單的都有待支付時間,通常為30min。當用戶超過30min未支付訂單,該訂單的狀態(tài)應(yīng)該會變成“超時取消”,或類似的狀態(tài)值的改變。
如果不使用MQ,可以設(shè)計一個定時任務(wù),定時查詢數(shù)據(jù)庫,判斷訂單的狀態(tài)和支付時間是否已經(jīng)到期,若到期則修改訂單的狀態(tài)。但顯然,這不是一個很好的操作,頻繁訪問數(shù)據(jù)庫,造成不必要的資源浪費。
使用MQ,我們可以在下單的時候,當訂單數(shù)據(jù)入庫后,發(fā)送一條Message到Queue中,并設(shè)置過期時間為30min或自定義的支付過期時間。
/** * 發(fā)送帶有過期時間的消息 */ @GetMapping("/sendDlx") public void sendDlx() { Order order = new Order(); order.setItemId(1); order.setStatus(1); rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, JSON.toJSONString(order), message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 模擬,設(shè)置10S后消息過期 message.getMessageProperties().setExpiration("10000"); return message; }); }
若30min后,還未有消費者(下游服務(wù))消費這條消息,那么該條消息就會被路由到死信隊列中。我們可以設(shè)置一個監(jiān)聽去監(jiān)聽死信隊列,當收到死信隊列的消息后,則根據(jù)消息數(shù)據(jù),查詢數(shù)據(jù)庫訂單狀態(tài)是否還是待支付狀態(tài),若是,則修改成超時取消。
代碼實現(xiàn)
以下是demo,未做服務(wù)的拆分,因此整個流程都是單個服務(wù)實現(xiàn)的,所以就沒有下游服務(wù),但并不影響整體業(yè)務(wù)。
RabbitMQConfig
將需要的交換機,隊列,綁定都聲明成SpringBean。Spring會自動創(chuàng)建這些到RabbitMQ服務(wù)中。
@Value注解部分都是配置文件exchange、queue、routingKey的名稱。
/** * @author wulei */ @Configuration public class RabbitConfig { @Value("${sunspring.order.exchange}") private String orderExchange; @Value("${sunspring.order.queue}") private String orderQueue; @Value("${sunspring.order.routingKey}") private String orderRoutingKey; @Value("${sunspring.dlx.exchange}") private String dlxExchange; @Value("${sunspring.dlx.queue}") private String dlxQueue; @Value("${sunspring.dlx.routingKey}") private String dlxRoutingKey; /** * 聲明死信隊列 * @return DirectExchange */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 聲明死信隊列 * @return Queue */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 綁定死信隊列到死信交換機 * @return Binding */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /** * 聲明訂單業(yè)務(wù)交換機 * @return DirectExchange */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 聲明訂單業(yè)務(wù)隊列 * @return Queue */ @Bean public Queue orderQueue() { Map<String,Object> arguments = new HashMap<>(2); // 綁定該隊列到私信交換機 arguments.put("x-dead-letter-exchange",dlxExchange); arguments.put("x-dead-letter-routing-key",dlxRoutingKey); return new Queue(orderQueue,true,false,false,arguments); } /** * 綁定訂單隊列到訂單交換機 * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); } }
sunspring.order.exchange=sunspring_order_exchange sunspring.order.queue=sunspring_order_queue sunspring.order.routingKey=sunspring.order sunspring.dlx.exchange=sunspring_dlx_exchange sunspring.dlx.queue=sunspring.dlx.queue sunspring.dlx.routingKey=dlx
在聲明業(yè)務(wù)隊列時,創(chuàng)建了一個Map,并且put了兩個值,這兩個值就是死信隊列的聲明。
x-dead-letter-exchange:死信交換機的名稱
x-dead-letter-routing-key:死信交換機的路由鍵,因為demo中兩個交換機的類型都是direct的,因此路由鍵必須相同。
/** * 聲明訂單業(yè)務(wù)隊列 * @return Queue */ @Bean public Queue orderQueue() { Map<String,Object> arguments = new HashMap<>(2); // 綁定該隊列到私信交換機 arguments.put("x-dead-letter-exchange",dlxExchange); arguments.put("x-dead-letter-routing-key",dlxRoutingKey); return new Queue(orderQueue,true,false,false,arguments); }
監(jiān)控頁面
在exchange列表中有剛剛創(chuàng)建的業(yè)務(wù)交換機sunspring_order_exchange和死信交換機
sunspring_dlx_exchange
在Queue列表中,有死信隊列sunspring_dlx_queue和業(yè)務(wù)隊列sunspring_order_queue
并且業(yè)務(wù)隊列上有DLX標記,可見當前隊列已經(jīng)綁定了一個死信隊列。DLK表示的路由鍵。
場景模擬
生產(chǎn)者
生產(chǎn)者發(fā)送了一個過期時間為10S的消息。
message.getMessageProperties().setExpiration(“10000”);
/** * 發(fā)送帶有過期時間的消息 */ @GetMapping("/sendDlx") public void sendDlx() { Order order = new Order(); order.setItemId(1); order.setStatus(1); rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, JSON.toJSONString(order), message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setExpiration("10000"); return message; }); }
sunspring_order_queue接受到了一條消息,當前消息的狀態(tài)是ready的,表示沒有任何消費者消費這條消息。
10s后,當前消息路由到了死信隊列中,sunspring_order_queue消息數(shù)量變成0,sunspring_dlx_queue數(shù)量變成1。
消費者,設(shè)置死信隊列監(jiān)聽
通過設(shè)置對死信隊列的監(jiān)聽,可以發(fā)現(xiàn),在Springboot啟動之后,創(chuàng)建了對RabbitMQ的監(jiān)聽,死信隊列的消息也立刻被消費了。
因此,我們可以監(jiān)聽死信隊列,對未被消費的消息進行下一步操作。如場景分析中的更改訂單狀態(tài)。
@RabbitListener(queues = "sunspring.dlx.queue") public void dlxListener(Message message,Channel channel) throws IOException { System.out.println(new String(message.getBody())); //對消息進行業(yè)務(wù)處理.... channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
2019-08-20 20:05:05.158 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [120.27.243.91:5672]
2019-08-20 20:05:05.224 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#68ab0936:0/SimpleConnection@74606204 [delegate=amqp://guest@120.27.243.91:5672/, localPort= 13563]
{"itemId":1,"status":1}
到此這篇關(guān)于Springboot集成RabbitMQ死信隊列的實現(xiàn)的文章就介紹到這了,更多相關(guān)Springboot RabbitMQ死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- SpringAMQP消息隊列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- springboot2.5.6集成RabbitMq實現(xiàn)Topic主題模式(推薦)
- SpringBoot集成RabbitMQ的方法(死信隊列)
- springboot2.0集成rabbitmq的示例代碼
- Spring Boot系列教程之7步集成RabbitMQ的方法
- springboot集成rabbitMQ之對象傳輸?shù)姆椒?/a>
- spring boot集成rabbitmq的實例教程
- 詳解spring boot集成RabbitMQ
- Spring Boot 3 集成 RabbitMQ 實踐指南(原理解析)
相關(guān)文章
java實現(xiàn)ip地址與十進制數(shù)相互轉(zhuǎn)換
本文介紹在java中IP地址轉(zhuǎn)換十進制數(shù)及把10進制再轉(zhuǎn)換成IP地址的方法及實例參考,曬出來和大家分享一下2012-12-12Spring中的@Autowired、@Qualifier和@Primary注解詳解
這篇文章主要介紹了Spring中的@Autowired、@Qualifier和@Primary注解詳解,@Autowired?注解,可以對類成員變量、方法和構(gòu)造函數(shù)進行標注,完成自動裝配的工作,@Autowired?是默認根據(jù)?byType?進行自動裝配的,需要的朋友可以參考下2023-11-11Java中forEach使用lambda表達式,數(shù)組和集合的區(qū)別說明
這篇文章主要介紹了Java中forEach使用lambda表達式,數(shù)組和集合的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07java的各種集合為什么不安全(List、Set、Map)以及代替方案
這篇文章主要介紹了java的各種集合為什么不安全(List、Set、Map)以及代替方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10Spring Boot監(jiān)聽Redis Key失效事件實現(xiàn)定時任務(wù)的示例
這篇文章主要介紹了Spring Boot監(jiān)聽Redis Key失效事件實現(xiàn)定時任務(wù)的示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-04-04