springcloud中RabbitMQ死信隊列與延遲交換機(jī)實現(xiàn)方法
0.引言
死信隊列是消息隊列中非常重要的概念,同時我們需要業(yè)務(wù)場景中都需要延遲發(fā)送的概念,比如12306中的30分鐘后未支付訂單取消。那么本期,我們就來講解死信隊列,以及如何通過延遲交換機(jī)來實現(xiàn)延遲發(fā)送的需求。
1. 死信隊列
1.2 什么是死信?
理解死信隊列前,我們先講解什么是死信,所謂死信就是沒有被成功消費的消息,但并不是所有未成功消費的消息都是死信消息,死信消息的產(chǎn)生來源于以下三種途徑: (1)消息被消費者拒絕,參數(shù)requeue設(shè)置為false的消息 (2)過期的消息,過期消息分為兩種: a. 發(fā)送消息時,設(shè)置了某一條消息的生存時間(message TTL),如果生存時間到了,消息還沒有被消費,就會被標(biāo)注為死信消息 b. 設(shè)置了隊列的消息生存時間,針對隊列中所有的消息,如果生存時間到了,消息還沒有被消費,就會被標(biāo)注為死信消息 (3)當(dāng)隊列達(dá)到了最大長度后,再發(fā)送過來的消息就會直接變成死信消息
1.3 什么是死信隊列?
直接來講,用來盛裝死信的隊列就是死信隊列,好像是一句廢話,所以其重點在于理解死信的概念。
死信隊列的作用: (1)隊列在已滿的情況下,會將消息發(fā)送到死信隊列中,這樣消息就不會丟失了,回頭再從死信隊列里將消息取出來進(jìn)行消費即可 (2)可以基于死信隊列實現(xiàn)延遲消費的效果。具體的實現(xiàn)我們后續(xù)講解
1.4 創(chuàng)建死信交換機(jī)、死信隊列
死信交換機(jī)、死信隊列其實都是普通的交換機(jī)、隊列,只是專門聲明出來用于存儲死信消息的。我們只需要通過deadLetterExchange
方法來聲明死信交換機(jī),然后用deadLetterRoutingKey
方法來聲明死信隊列
如下代碼所示,我們創(chuàng)建了test.queue
、test.exchange
及dead.queue
、dead.exchange
,并且在test.queue
中將死信交換機(jī)和死信路由指定到了測試隊列中
注意
:涉及到修改隊列、交換機(jī)屬性的,如果該隊列、交換機(jī)已經(jīng)存在需要將其刪除后才能生效,否則可能還會報錯。
@Configuration public class RabbitMqConfig { private static final String TEST_EXCHANGE = "test.exchange"; private static final String TEST_QUEUE = "test.queue"; private static final String TEST_ROUTING_KEY = "test.routing.key"; private static final String DEAD_EXCHANGE = "dead.exchange"; private static final String DEAD_QUEUE = "dead.queue"; private static final String DEAD_ROUTING_KEY = "dead.routing.key"; @Bean public Queue deadQueue(){ return new Queue(DEAD_QUEUE); } public DirectExchange deadExchange(){ // 設(shè)置演示,使用了直接交換機(jī)Direct,大家可以根據(jù)自己的業(yè)務(wù)情況聲明為其他類型的交換機(jī) return new DirectExchange(DEAD_EXCHANGE); public Binding deadBinding(Queue deadQueue,Exchange deadExchange){ return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build(); public DirectExchange testExchange(){ return new DirectExchange(TEST_EXCHANGE); public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){ return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY); }
1.5 實現(xiàn)死信消息
1.5.1 基于消費者進(jìn)行reject或nack實現(xiàn)死信消息
@Component public class QueueListener { @RabbitListener(queues = RabbitMqConfig.TEST_QUEUE) public void handler(MyMessage messageInfo, Message message, Channel channel) { try{ System.out.println("接收的消息:"+messageInfo.toString()); // requeue參數(shù)設(shè)置為false 設(shè)置死信消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); // multiple和requeue設(shè)置為false 設(shè)置死信消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); // 返回ack 確認(rèn)接收到消息 // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch (IOException e){ try { channel.basicRecover(); } catch (IOException ex) { ex.printStackTrace(); log.error("消息處理失?。簕}",e.getMessage()); } } } }
1.5.2 基于生存時間實現(xiàn)
(1)發(fā)送消息時設(shè)置生存時間
@GetMapping("sendTestQueueWithExpiration") public String sendTestQueueWithExpiration(){ MyMessage message = new MyMessage(1L,"物流提醒","到達(dá)裝貨區(qū)域,注意上傳憑證",new Date()); rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> { msg.getMessageProperties().setExpiration("5000"); return msg; }); return "發(fā)送成功"; }
(2)隊列設(shè)置生存時間
@Bean public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) // 10s 過期 .ttl(10000) .build(); }
1.5.3 基于隊列max_length實現(xiàn)
@Bean public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) // 容量最大100條 .maxLength(100) .build(); }
1.6 基于死信隊列實現(xiàn)消息延遲發(fā)送
上述我們說過死信隊列還可以消息延遲發(fā)送,其思路就是: (1)消息發(fā)送時設(shè)置消息的生存時間,其生存時間就是我們想要延遲的時間 (2)消息者監(jiān)控死信隊列進(jìn)行消費
正常隊列的消息因為沒有消費者消費,同時又指定了生存時間,到達(dá)時間后消息轉(zhuǎn)發(fā)到死信隊列中,消費者監(jiān)聽了死信隊列從而將其消費掉。
基于死信隊列實現(xiàn)消息延遲發(fā)送的問題
如果有兩個消息,一個是5s生存時間,一個是10s生存時間,當(dāng)我們先發(fā)送了10s生存時間的消息到queue中時,因為rabbitmq只會監(jiān)控隊列最外側(cè)的消息的生存時間,也就是監(jiān)控10s生存時間的消息,而5s生存時間的消息只會在最外側(cè)的10s消息到期后才會監(jiān)控,也就導(dǎo)致我實際需要5s生存的消息,實際需要10s才監(jiān)聽到了。
所以呢,基于死信隊列實現(xiàn)的延遲消息,只使用于延遲時間一致的消息。
為了適配更多的延遲場景,已經(jīng)更加簡單的實現(xiàn)延遲消息,我們引入了延遲交換機(jī)
2. 延遲交換機(jī)
延遲交換機(jī)并不是rabbitmq自帶的功能,而是要通過安裝延遲交換機(jī)插件delayed_message_exchange
來實現(xiàn)
其插件的安裝我們之間已經(jīng)講解過,不再累敘,可以參考如下博文 springcloud:安裝rabbitmq并配置延遲隊列插件
通過延遲交換機(jī)實現(xiàn)的延遲消息,其重點主要在交換機(jī)上,隊列就是普通隊列,消息發(fā)送到交換機(jī)上后,會記錄消息的延遲時間,到達(dá)時間后才會發(fā)送到隊列中,這樣消費者通過監(jiān)控隊列,就能在指定時間獲取到消息
因此延遲交換機(jī)與普通交換機(jī)的實現(xiàn),只在創(chuàng)建交換機(jī)時,其他的操作與普通交換機(jī)無異,因此使用起來也很方便
創(chuàng)建延遲交換機(jī),通過x-delayed-type
屬性聲明交換機(jī)類型,可以是direct也可以是topic,具體支持4中交換機(jī)類型,如果不清楚的可以參考之前的博文
@Configuration public class RabbitMqDelayConfig { public static final String DELAY_EXCHANGE = "delay.exchange"; public static final String DELAY_QUEUE = "delay.queue"; public static final String DELAY_ROUTING_KEY = "delay.routing.key"; @Bean public Exchange delayExchange(){ Map<String, Object> arguments = new HashMap<>(1); arguments.put("x-delayed-type","direct"); return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments); } @Bean public Queue delayQueue(){ return new Queue(DELAY_QUEUE); } @Bean public Binding delayBinding(Queue delayQueue, Exchange delayExchange){ return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs(); } }
發(fā)送消息時指定延遲時間,單位毫秒
rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(30000); return message; } });
我們還可以將該方法封裝為工具類方法,方便之后調(diào)用
/** * 發(fā)送 延遲隊列 * @param exchange 交換機(jī) * @param routeKey 路由 * @param message 消息 * @param delaySecond 延遲秒數(shù) */ public void send(String exchange, String routeKey, Object message, int delaySecond){ rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> { // 消息持久化 msg.getMessageProperties().setDelay(delaySecond * 1000); return msg; }); }
3. 應(yīng)用場景
延遲消息的應(yīng)用場景豐富,除了我們開篇所說的30分鐘未支付自動取消訂單,還比如到貨后72小時未簽收自動簽收
基本上所有需要延遲觸發(fā)的業(yè)務(wù)場景都可以用rabbitmq延遲隊列來實現(xiàn)。
4. 練習(xí)題
對于剛接觸rabbitmq的同學(xué),這里我提供一個練習(xí)題給大家,也讓大家在實操中加強(qiáng)對于rabbitmq的理解:
需求:訂單到貨后72小時未簽收,自動簽收 講解:我們這里要實現(xiàn)訂單到貨后的自動簽收功能,訂單到貨后會觸發(fā)發(fā)送自動簽收消息的方法,訂單已簽收的狀態(tài)status為2,到貨狀態(tài)為1,如果72小時前已經(jīng)簽收了即status被更新為2了,那么需要取消自動簽收(不執(zhí)行自動簽收,即忽略自動簽收消息)
到此這篇關(guān)于springcloud:RabbitMQ死信隊列與延遲交換機(jī)實現(xiàn)的文章就介紹到這了,更多相關(guān)springcloud RabbitMQ死信隊列與延遲交換機(jī)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java計算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法
這篇文章主要介紹了java計算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法,涉及java字符串的遍歷、轉(zhuǎn)換及運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2017-02-02SpringBoot使用Jasypt對配置文件和數(shù)據(jù)庫密碼加密
在做數(shù)據(jù)庫敏感信息保護(hù)時,應(yīng)加密存儲,本文就來介紹一下SpringBoot使用Jasypt對配置文件和數(shù)據(jù)庫密碼加密,具有一定的參考價值,感興趣的可以了解一下2024-02-02Java基本數(shù)據(jù)類型與封裝類型詳解(int和Integer區(qū)別)
這篇文章主要介紹了Java基本數(shù)據(jù)類型與封裝類型詳解(int和Integer區(qū)別) ,需要的朋友可以參考下2017-02-02Hadoop 使用IntelliJ IDEA 進(jìn)行遠(yuǎn)程調(diào)試代碼的配置方法
這篇文章主要介紹了Hadoop 使用IntelliJ IDEA 進(jìn)行遠(yuǎn)程調(diào)試代碼的配置方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-04-04將字符串?dāng)?shù)字格式化為樣式1,000,000,000的方法
這篇文章主要介紹了將字符串?dāng)?shù)字格式化為樣式1,000,000,000的方法,有需要的朋友可以參考一下2014-01-01