springcloud中RabbitMQ死信隊(duì)列與延遲交換機(jī)實(shí)現(xiàn)方法
0.引言
死信隊(duì)列是消息隊(duì)列中非常重要的概念,同時(shí)我們需要業(yè)務(wù)場(chǎng)景中都需要延遲發(fā)送的概念,比如12306中的30分鐘后未支付訂單取消。那么本期,我們就來講解死信隊(duì)列,以及如何通過延遲交換機(jī)來實(shí)現(xiàn)延遲發(fā)送的需求。
1. 死信隊(duì)列
1.2 什么是死信?
理解死信隊(duì)列前,我們先講解什么是死信,所謂死信就是沒有被成功消費(fèi)的消息,但并不是所有未成功消費(fèi)的消息都是死信消息,死信消息的產(chǎn)生來源于以下三種途徑: (1)消息被消費(fèi)者拒絕,參數(shù)requeue設(shè)置為false的消息 (2)過期的消息,過期消息分為兩種: a. 發(fā)送消息時(shí),設(shè)置了某一條消息的生存時(shí)間(message TTL),如果生存時(shí)間到了,消息還沒有被消費(fèi),就會(huì)被標(biāo)注為死信消息 b. 設(shè)置了隊(duì)列的消息生存時(shí)間,針對(duì)隊(duì)列中所有的消息,如果生存時(shí)間到了,消息還沒有被消費(fèi),就會(huì)被標(biāo)注為死信消息 (3)當(dāng)隊(duì)列達(dá)到了最大長(zhǎng)度后,再發(fā)送過來的消息就會(huì)直接變成死信消息
1.3 什么是死信隊(duì)列?
直接來講,用來盛裝死信的隊(duì)列就是死信隊(duì)列,好像是一句廢話,所以其重點(diǎn)在于理解死信的概念。
死信隊(duì)列的作用: (1)隊(duì)列在已滿的情況下,會(huì)將消息發(fā)送到死信隊(duì)列中,這樣消息就不會(huì)丟失了,回頭再?gòu)乃佬抨?duì)列里將消息取出來進(jìn)行消費(fèi)即可 (2)可以基于死信隊(duì)列實(shí)現(xiàn)延遲消費(fèi)的效果。具體的實(shí)現(xiàn)我們后續(xù)講解
1.4 創(chuàng)建死信交換機(jī)、死信隊(duì)列
死信交換機(jī)、死信隊(duì)列其實(shí)都是普通的交換機(jī)、隊(duì)列,只是專門聲明出來用于存儲(chǔ)死信消息的。我們只需要通過deadLetterExchange
方法來聲明死信交換機(jī),然后用deadLetterRoutingKey
方法來聲明死信隊(duì)列
如下代碼所示,我們創(chuàng)建了test.queue
、test.exchange
及dead.queue
、dead.exchange
,并且在test.queue
中將死信交換機(jī)和死信路由指定到了測(cè)試隊(duì)列中
注意
:涉及到修改隊(duì)列、交換機(jī)屬性的,如果該隊(duì)列、交換機(jī)已經(jīng)存在需要將其刪除后才能生效,否則可能還會(huì)報(bào)錯(cuò)。
@Configuration public class RabbitMqConfig { private static final String TEST_EXCHANGE = "test.exchange"; private static final String TEST_QUEUE = "test.queue"; private static final String TEST_ROUTING_KEY = "test.routing.key"; private static final String DEAD_EXCHANGE = "dead.exchange"; private static final String DEAD_QUEUE = "dead.queue"; private static final String DEAD_ROUTING_KEY = "dead.routing.key"; @Bean public Queue deadQueue(){ return new Queue(DEAD_QUEUE); } public DirectExchange deadExchange(){ // 設(shè)置演示,使用了直接交換機(jī)Direct,大家可以根據(jù)自己的業(yè)務(wù)情況聲明為其他類型的交換機(jī) return new DirectExchange(DEAD_EXCHANGE); public Binding deadBinding(Queue deadQueue,Exchange deadExchange){ return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build(); public DirectExchange testExchange(){ return new DirectExchange(TEST_EXCHANGE); public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){ return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY); }
1.5 實(shí)現(xiàn)死信消息
1.5.1 基于消費(fèi)者進(jìn)行reject或nack實(shí)現(xiàn)死信消息
@Component public class QueueListener { @RabbitListener(queues = RabbitMqConfig.TEST_QUEUE) public void handler(MyMessage messageInfo, Message message, Channel channel) { try{ System.out.println("接收的消息:"+messageInfo.toString()); // requeue參數(shù)設(shè)置為false 設(shè)置死信消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); // multiple和requeue設(shè)置為false 設(shè)置死信消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); // 返回ack 確認(rèn)接收到消息 // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch (IOException e){ try { channel.basicRecover(); } catch (IOException ex) { ex.printStackTrace(); log.error("消息處理失?。簕}",e.getMessage()); } } } }
1.5.2 基于生存時(shí)間實(shí)現(xiàn)
(1)發(fā)送消息時(shí)設(shè)置生存時(shí)間
@GetMapping("sendTestQueueWithExpiration") public String sendTestQueueWithExpiration(){ MyMessage message = new MyMessage(1L,"物流提醒","到達(dá)裝貨區(qū)域,注意上傳憑證",new Date()); rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> { msg.getMessageProperties().setExpiration("5000"); return msg; }); return "發(fā)送成功"; }
(2)隊(duì)列設(shè)置生存時(shí)間
@Bean public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) // 10s 過期 .ttl(10000) .build(); }
1.5.3 基于隊(duì)列max_length實(shí)現(xiàn)
@Bean public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) // 容量最大100條 .maxLength(100) .build(); }
1.6 基于死信隊(duì)列實(shí)現(xiàn)消息延遲發(fā)送
上述我們說過死信隊(duì)列還可以消息延遲發(fā)送,其思路就是: (1)消息發(fā)送時(shí)設(shè)置消息的生存時(shí)間,其生存時(shí)間就是我們想要延遲的時(shí)間 (2)消息者監(jiān)控死信隊(duì)列進(jìn)行消費(fèi)
正常隊(duì)列的消息因?yàn)闆]有消費(fèi)者消費(fèi),同時(shí)又指定了生存時(shí)間,到達(dá)時(shí)間后消息轉(zhuǎn)發(fā)到死信隊(duì)列中,消費(fèi)者監(jiān)聽了死信隊(duì)列從而將其消費(fèi)掉。
基于死信隊(duì)列實(shí)現(xiàn)消息延遲發(fā)送的問題
如果有兩個(gè)消息,一個(gè)是5s生存時(shí)間,一個(gè)是10s生存時(shí)間,當(dāng)我們先發(fā)送了10s生存時(shí)間的消息到queue中時(shí),因?yàn)閞abbitmq只會(huì)監(jiān)控隊(duì)列最外側(cè)的消息的生存時(shí)間,也就是監(jiān)控10s生存時(shí)間的消息,而5s生存時(shí)間的消息只會(huì)在最外側(cè)的10s消息到期后才會(huì)監(jiān)控,也就導(dǎo)致我實(shí)際需要5s生存的消息,實(shí)際需要10s才監(jiān)聽到了。
所以呢,基于死信隊(duì)列實(shí)現(xiàn)的延遲消息,只使用于延遲時(shí)間一致的消息。
為了適配更多的延遲場(chǎng)景,已經(jīng)更加簡(jiǎn)單的實(shí)現(xiàn)延遲消息,我們引入了延遲交換機(jī)
2. 延遲交換機(jī)
延遲交換機(jī)并不是rabbitmq自帶的功能,而是要通過安裝延遲交換機(jī)插件delayed_message_exchange
來實(shí)現(xiàn)
其插件的安裝我們之間已經(jīng)講解過,不再累敘,可以參考如下博文 springcloud:安裝rabbitmq并配置延遲隊(duì)列插件
通過延遲交換機(jī)實(shí)現(xiàn)的延遲消息,其重點(diǎn)主要在交換機(jī)上,隊(duì)列就是普通隊(duì)列,消息發(fā)送到交換機(jī)上后,會(huì)記錄消息的延遲時(shí)間,到達(dá)時(shí)間后才會(huì)發(fā)送到隊(duì)列中,這樣消費(fèi)者通過監(jiān)控隊(duì)列,就能在指定時(shí)間獲取到消息
因此延遲交換機(jī)與普通交換機(jī)的實(shí)現(xiàn),只在創(chuàng)建交換機(jī)時(shí),其他的操作與普通交換機(jī)無異,因此使用起來也很方便
創(chuàng)建延遲交換機(jī),通過x-delayed-type
屬性聲明交換機(jī)類型,可以是direct也可以是topic,具體支持4中交換機(jī)類型,如果不清楚的可以參考之前的博文
@Configuration public class RabbitMqDelayConfig { public static final String DELAY_EXCHANGE = "delay.exchange"; public static final String DELAY_QUEUE = "delay.queue"; public static final String DELAY_ROUTING_KEY = "delay.routing.key"; @Bean public Exchange delayExchange(){ Map<String, Object> arguments = new HashMap<>(1); arguments.put("x-delayed-type","direct"); return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments); } @Bean public Queue delayQueue(){ return new Queue(DELAY_QUEUE); } @Bean public Binding delayBinding(Queue delayQueue, Exchange delayExchange){ return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs(); } }
發(fā)送消息時(shí)指定延遲時(shí)間,單位毫秒
rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(30000); return message; } });
我們還可以將該方法封裝為工具類方法,方便之后調(diào)用
/** * 發(fā)送 延遲隊(duì)列 * @param exchange 交換機(jī) * @param routeKey 路由 * @param message 消息 * @param delaySecond 延遲秒數(shù) */ public void send(String exchange, String routeKey, Object message, int delaySecond){ rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> { // 消息持久化 msg.getMessageProperties().setDelay(delaySecond * 1000); return msg; }); }
3. 應(yīng)用場(chǎng)景
延遲消息的應(yīng)用場(chǎng)景豐富,除了我們開篇所說的30分鐘未支付自動(dòng)取消訂單,還比如到貨后72小時(shí)未簽收自動(dòng)簽收
基本上所有需要延遲觸發(fā)的業(yè)務(wù)場(chǎng)景都可以用rabbitmq延遲隊(duì)列來實(shí)現(xiàn)。
4. 練習(xí)題
對(duì)于剛接觸rabbitmq的同學(xué),這里我提供一個(gè)練習(xí)題給大家,也讓大家在實(shí)操中加強(qiáng)對(duì)于rabbitmq的理解:
需求:訂單到貨后72小時(shí)未簽收,自動(dòng)簽收 講解:我們這里要實(shí)現(xiàn)訂單到貨后的自動(dòng)簽收功能,訂單到貨后會(huì)觸發(fā)發(fā)送自動(dòng)簽收消息的方法,訂單已簽收的狀態(tài)status為2,到貨狀態(tài)為1,如果72小時(shí)前已經(jīng)簽收了即status被更新為2了,那么需要取消自動(dòng)簽收(不執(zhí)行自動(dòng)簽收,即忽略自動(dòng)簽收消息)
到此這篇關(guān)于springcloud:RabbitMQ死信隊(duì)列與延遲交換機(jī)實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)springcloud RabbitMQ死信隊(duì)列與延遲交換機(jī)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java計(jì)算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法
這篇文章主要介紹了java計(jì)算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法,涉及java字符串的遍歷、轉(zhuǎn)換及運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2017-02-02SpringBoot使用Jasypt對(duì)配置文件和數(shù)據(jù)庫(kù)密碼加密
在做數(shù)據(jù)庫(kù)敏感信息保護(hù)時(shí),應(yīng)加密存儲(chǔ),本文就來介紹一下SpringBoot使用Jasypt對(duì)配置文件和數(shù)據(jù)庫(kù)密碼加密,具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02Java基本數(shù)據(jù)類型與封裝類型詳解(int和Integer區(qū)別)
這篇文章主要介紹了Java基本數(shù)據(jù)類型與封裝類型詳解(int和Integer區(qū)別) ,需要的朋友可以參考下2017-02-02Hadoop 使用IntelliJ IDEA 進(jìn)行遠(yuǎn)程調(diào)試代碼的配置方法
這篇文章主要介紹了Hadoop 使用IntelliJ IDEA 進(jìn)行遠(yuǎn)程調(diào)試代碼的配置方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04將字符串?dāng)?shù)字格式化為樣式1,000,000,000的方法
這篇文章主要介紹了將字符串?dāng)?shù)字格式化為樣式1,000,000,000的方法,有需要的朋友可以參考一下2014-01-01