SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊列和死信隊列
一、死信隊列
RabbitMQ的死信隊列(Dead Letter Queue,DLQ)是一種特殊的隊列,用于接收其他隊列中的“死信”消息。所謂“死信”,是指滿足一定條件而無法被消費(fèi)者正確處理的消息,這些條件包括消息被拒絕、消息過期、消息達(dá)到最大重試次數(shù)等。
當(dāng)消息成為死信時,RabbitMQ會將其重新發(fā)送到指定的死信隊列,而不是丟棄它們。這樣做的好處是可以對死信進(jìn)行分析和處理,例如記錄日志、重新入隊或者進(jìn)一步處理。
死信隊列通常與RabbitMQ的延遲隊列(Delayed Message Queue)一起使用,通過延遲隊列延遲消息的處理時間,可以更容易地觸發(fā)消息成為死信的條件,從而進(jìn)行測試和調(diào)試。
死信隊列在消息中間件中有許多實(shí)際應(yīng)用場景,主要用于處理無法被正常消費(fèi)的消息,增強(qiáng)了消息的可靠性和處理能力。以下是一些常見的應(yīng)用場景:
延遲消息處理:通過將消息發(fā)送到延遲隊列,在指定的時間后再將消息發(fā)送到目標(biāo)隊列,實(shí)現(xiàn)延遲處理消息的功能。
消息重試:當(dāng)消費(fèi)者無法處理消息時,消息可以被重新發(fā)送到隊列并設(shè)置重試次數(shù),達(dá)到最大重試次數(shù)后轉(zhuǎn)發(fā)到死信隊列,以便進(jìn)行進(jìn)一步處理。
異常處理:當(dāng)消息無法被消費(fèi)者正常處理時(如格式錯誤、業(yè)務(wù)異常等),將消息轉(zhuǎn)發(fā)到死信隊列,用于記錄日志、報警或人工處理。
消息超時處理:當(dāng)消息在隊列中等待時間過長時,可以設(shè)置消息的過期時間(TTL),超過時間后將消息轉(zhuǎn)發(fā)到死信隊列。
消息路由失敗:當(dāng)消息無法被正確路由到目標(biāo)隊列時,可以將消息發(fā)送到死信隊列,避免消息丟失。
消息版本兼容性處理:當(dāng)消息的格式或內(nèi)容發(fā)生變化時,通過死信隊列可以處理老版本消息,確保新版本系統(tǒng)的兼容性。
RabbitMQ的工作模式
死信隊列的工作模式
今天我要實(shí)現(xiàn)的就是這個延遲隊列和死信隊列。生產(chǎn)者首先向延遲隊列發(fā)送消息,待達(dá)到TTL后消息會被轉(zhuǎn)送到死信隊列當(dāng)中,消費(fèi)者會從死信隊列中獲取消息進(jìn)行消費(fèi)。
二、RabbitMQ相關(guān)的安裝
win10安裝rabbitMQ的詳細(xì)步驟_java_腳本之家 (jb51.net)
我這里直接引用別人的文章了,下載需要大家去看一看。
RabbitMQ延遲插件的安裝。
三、SpringBoot引入RabbitMQ
1.引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </dependency>
2.創(chuàng)建隊列和交換器
這一步是很重要的,如果你配置錯誤了,消息很可能無法正確的傳送。要實(shí)現(xiàn)延遲隊列和死信隊列,我們一共要創(chuàng)建以下幾個組件:
- 延遲隊列
- 延遲隊列的交換器
- 死信隊列
- 死信隊列的交換器
在我們創(chuàng)建了這幾個組件之后,我們還要干一些事情,我們需要把這些組件進(jìn)行組裝,如果你不了解RabbitMQ的基礎(chǔ),你可以先看看基礎(chǔ)教學(xué),我這里簡單的說一下。RabbitMQ中有一種綁定方式,這種綁定方式會把BindingKey和RoutingKey完全匹配的進(jìn)行綁定,如下圖所示,生產(chǎn)者發(fā)送了一個BindingKey為“warning”的消息,那么這個消息就會被發(fā)送到Queue1和Queue2,這并不難理解。
我們要做的就是把隊列和交換器通過一個RoutingKey綁定在一起。
2.1 變量聲明
接下來的代碼要好好看了,首先我們把我們后邊要用到的名稱變量全部定義出來。因?yàn)檫@個名稱起的很長,我們不方便直接使用。創(chuàng)建DeadRabbitConfig。在類中定義如下變量,延遲隊列交換器名稱、延遲隊列名稱、延遲隊列Routing名稱。除此之外還有死信隊列交換器名稱、死信隊列名稱和死信Routing名稱。
// 延遲隊列交換器名稱 public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; // 延遲隊列A名稱 public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a"; // 延遲隊列B名稱 public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b"; // 延遲隊列routingA名稱 public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key"; // 延遲隊列routingB名稱 public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key"; // 死信隊列 public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key"; public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key"; public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a"; public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";
2.2 創(chuàng)建延遲交換器
// 注冊延遲交換器delayExchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); }
2.3 創(chuàng)建延遲隊列
這里的延遲隊列需要我們額外的配置一些參數(shù),用于和死信隊列進(jìn)行信息發(fā)送。這里我是用了兩種不同的方式構(gòu)建延遲隊列A和延遲隊列B,在延遲隊列A種我沒有設(shè)置TTL參數(shù),而是通過RabbitMQ的延遲插件實(shí)現(xiàn)的,而延遲隊列B我設(shè)置了TTL為10000ms,也就是十秒,十秒內(nèi)消息如果沒有被消費(fèi)掉就會發(fā)送到死信隊列。
// 注冊延遲隊列A 還要綁定死信交換器和死信routingA @Bean("delayQueueA") public Queue delayQueueA(){ Map<String,Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY); //args.put("x-message-ttl",6000); return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build(); } // 注冊延遲隊列B 還要綁定死信交換器和死信routingB @Bean("delayQueueB") public Queue delayQueueB(){ Map<String,Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY); args.put("x-message-ttl",10000); return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build(); }
2.4 延遲隊列綁定延遲交換器
// 延遲隊列A綁定交換器 @Bean public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){ return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME); } // 延遲隊列B綁定交換器 @Bean public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){ return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME); }
2.5 死信隊列配置
與延遲隊列不同的是,死信隊列并沒有配置延遲參數(shù)。
// 注冊死信隊列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUE_A_NAME); } // 注冊死信隊列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUE_B_NAME); } // 注冊死信交換器 @Bean public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 死信隊列A綁定死信交換器 @Bean public Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){ return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY); } // 死信隊列B綁定死信交換器 @Bean public Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){ return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY); }
到此為止,RabbitMQ的組件配置完成。
3. 添加application.yml
server: port: 8081 spring: application: name: test-rabbitmq-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
4. 添加RabbitMQListener (消費(fèi)者)
下方的代碼一共有兩個消費(fèi)者,一個消費(fèi)者獲取死信隊列A中的消息,另一個消費(fèi)者獲取死信隊列B中的消息。
@Component public class DeadLetterQueueConsumer { public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class); @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL") public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); LOGGER.info("當(dāng)前時間:{},死信隊列A收到消息:{}", new Date().toString(), msg); System.out.println(message.getMessageProperties().getDeliveryTag()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL") public void receiveB(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); LOGGER.info("當(dāng)前時間:{},死信隊列B收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
5. 創(chuàng)建DelayMessageSender
這里采用的就是兩種不同的方式,一種方式是使用插件來延遲消息的發(fā)送,另一種是通過TTL參數(shù)。
@Component public class DelayMessageSender { @Resource RabbitTemplate rabbitTemplate; public void sendMessage(String msg,Integer delayTimes){ switch (delayTimes){ case 6: rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(String.valueOf(6000)); return message; } }); break; case 10: rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg); break; } } }
6. 創(chuàng)建Controller
@RestController @RequestMapping("/student") public class StudentController { @Autowired DelayMessageSender messageSender; @RequestMapping("/send-message") public String sendMessage(String msg,Integer delayTimes){ System.out.println(new Date()); messageSender.sendMessage(msg,delayTimes); return "發(fā)送成功"; } }
7.測試
在瀏覽器中輸入以下地址進(jìn)入RabbitMQ界面。賬號密碼都是guest。
http://localhost:15672/
先來看看我們的初始隊列。這里是什么都沒有的。
然后我們啟動項(xiàng)目后在看。我們剛才創(chuàng)建出來的四個隊列全部都被加載了出來。
使用PostMan發(fā)送一次請求。
我們的請求在17s的時候發(fā)送到后端,消息打印在23s,說明我們的延遲隊列有效果。
接下來我們測試10s的延遲隊列。
10s后死信隊列B成功的接收到了消息。
四、死信隊列的應(yīng)用場景
延遲隊列通常用于需要延遲執(zhí)行某些任務(wù)或觸發(fā)某些事件的場景。例如,在電子商務(wù)中,可以使用延遲隊列實(shí)現(xiàn)訂單超時未支付自動取消功能。
1.訂單創(chuàng)建:
用戶下單后,系統(tǒng)生成訂單,并將訂單信息發(fā)送到一個普通隊列,同時設(shè)置一個TTL(Time-To-Live)為30分鐘。這個隊列配置了死信交換機(jī)(Dead Letter Exchange, DLX),當(dāng)消息過期后會被轉(zhuǎn)發(fā)到死信隊列。2.等待支付:
在30分鐘內(nèi),用戶可以完成支付。如果用戶在30分鐘內(nèi)支付完成,系統(tǒng)會從普通隊列中移除對應(yīng)的消息并正常處理訂單。3.訂單超時處理:
如果用戶未在30分鐘內(nèi)完成支付,消息會自動過期并轉(zhuǎn)發(fā)到死信交換機(jī),進(jìn)而轉(zhuǎn)發(fā)到死信隊列。4.取消訂單:
系統(tǒng)有一個專門的消費(fèi)者監(jiān)聽死信隊列。當(dāng)有消息進(jìn)入死信隊列時,消費(fèi)者會自動處理這些消息,即取消訂單、釋放庫存,并通知用戶訂單已取消。5.定時任務(wù)(可選):
雖然死信隊列已經(jīng)提供了超時訂單的處理,但為了防止消息丟失或處理延遲,可以設(shè)置一個定時任務(wù)定期檢查訂單狀態(tài),確保所有超時未支付的訂單都得到了處理。
以上就是SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊列和死信隊列的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java基礎(chǔ)之泛型知識點(diǎn)總結(jié)
這篇文章主要介紹了java基礎(chǔ)之泛型知識點(diǎn)總結(jié),文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有很好的幫助,需要的朋友可以參考下2021-04-04Java實(shí)現(xiàn)手機(jī)號碼歸屬地查詢
這篇文章主要為大家詳細(xì)介紹了如何利用Java實(shí)現(xiàn)手機(jī)號碼歸屬地查詢功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-12-12如何把spring boot應(yīng)用發(fā)布到Harbor
這篇文章主要介紹了如何把spring boot應(yīng)用發(fā)布到Harbor,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-11-11Java中byte[]、String、Hex字符串等轉(zhuǎn)換的方法
這篇文章主要介紹了Java中byte[]、String、Hex字符串等轉(zhuǎn)換的方法,代碼很簡單,需要的朋友可以參考下2018-05-05使用Java實(shí)現(xiàn)價格加密與優(yōu)化功能
在現(xiàn)代軟件開發(fā)中,數(shù)據(jù)加密是一個非常重要的環(huán)節(jié),尤其是在處理敏感信息(如價格、用戶數(shù)據(jù)等)時,本文將詳細(xì)介紹如何使用?Java?實(shí)現(xiàn)價格加密,并對代碼進(jìn)行優(yōu)化,需要的朋友可以參考下2025-01-01Springboot整合分頁插件PageHelper步驟解析
這篇文章主要介紹了Springboot整合分頁插件PageHelper步驟解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-06-06