SpringBoot整合RabbitMQ實現(xiàn)延遲隊列的示例詳解
如何保證消息不丟失
rabbitmq消息投遞路徑
生產(chǎn)者->交換機->隊列->消費者
總的來說分為三個階段。
- 1.生產(chǎn)者保證消息投遞可靠性。
- 2.mq內(nèi)部消息不丟失。
- 3.消費者消費成功。
什么是消息投遞可靠性
簡單點說就是消息百分百發(fā)送到消息隊列中。
我們可以開啟confirmCallback
生產(chǎn)者投遞消息后,mq會給生產(chǎn)者一個ack.根據(jù)ack,生產(chǎn)者就可以確認這條消息是否發(fā)送到mq.
開啟confirmCallback
修改配置文件
#NONE:禁用發(fā)布確認模式,是默認值,CORRELATED:發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法 spring: rabbitmq: publisher-confirm-type: correlated
測試代碼
@Test public void testConfirmCallback() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 配置 * @param ack 交換機是否收到消息,true是成功,false是失敗 * @param cause 失敗的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm=====>"); System.out.println("confirm==== ack="+ack); System.out.println("confirm==== cause="+cause); //根據(jù)ACK狀態(tài)做對應的消息更新操作 TODO } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "雞你太美"); Thread.sleep(10000); }
通過returnCallback保證消息從交換器發(fā)送到隊列成功。 修改配置文件
spring: rabbitmq: #開啟returnCallback publisher-returns: true #交換機處理消息到路由失敗,則會返回給生產(chǎn)者 template: mandatory: true
測試代碼
@Test void testReturnCallback() { //為true,則交換機處理消息到路由失敗,則會返回給生產(chǎn)者 配置文件指定,則這里不需指定 rabbitTemplate.setMandatory(true); //開啟強制消息投遞(mandatory為設置為true),但消息未被路由至任何一個queue,則回退一條消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code="+code); System.out.println("returned="+ returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","測試returnCallback"); }
消費者消費消息時需要通過ack手動確認消息已消費。
修改配置文件
spring: rabbitmq: listener: simple: acknowledge-mode: manual
編寫測試代碼
@RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+ message); System.out.println("body="+body); //成功確認,使用此回執(zhí)方法后,消息會被 rabbitmq broker 刪除 channel.basicAck(msgTag,false); // channel.basicNack(msgTag,false,true); }
deliveryTags是消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加
ttl死信隊列
什么是死信隊列
沒有被及時消費的消息存放的隊列
消息有哪幾種情況成為死信
- 消費者拒收消息 (basic.reject/ basic.nack) ,并且沒有重新入隊 requeue=false
- 消息在隊列中未被消費,且超過隊列或者消息本身的過期時間TTL(time-to-live)
- 隊列的消息長度達到極限
- 結(jié)果:消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
死信隊列經(jīng)常用來做延遲隊列消費。
延遲隊列
生產(chǎn)者投遞到mq中并不希望這條消息立馬被消費,而是等待一段時間后再去消費。
springboot整合rabbitmq實現(xiàn)訂單超時自動關(guān)閉
package com.fandf.test.rabbit; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author fandongfeng * @date 2023/4/15 15:38 */ @Configuration public class RabbitMQConfig { /** * 訂單交換機 */ public static final String ORDER_EXCHANGE = "order_exchange"; /** * 訂單隊列 */ public static final String ORDER_QUEUE = "order_queue"; /** * 訂單路由key */ public static final String ORDER_QUEUE_ROUTING_KEY = "order.#"; /** * 死信交換機 */ public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange"; /** * 死信隊列 routingKey */ public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key"; /** * 死信隊列 */ public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue"; /** * 創(chuàng)建死信交換機 */ @Bean("orderDeadLetterExchange") public Exchange orderDeadLetterExchange() { return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false); } /** * 創(chuàng)建死信隊列 */ @Bean("orderDeadLetterQueue") public Queue orderDeadLetterQueue() { return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build(); } /** * 綁定死信交換機和死信隊列 */ @Bean("orderDeadLetterBinding") public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs(); } /** * 創(chuàng)建訂單交換機 */ @Bean("orderExchange") public Exchange orderExchange() { return new TopicExchange(ORDER_EXCHANGE, true, false); } /** * 創(chuàng)建訂單隊列 */ @Bean("orderQueue") public Queue orderQueue() { Map<String, Object> args = new HashMap<>(3); //消息過期后,進入到死信交換機 args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE); //消息過期后,進入到死信交換機的路由key args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY); //過期時間,單位毫秒 args.put("x-message-ttl", 10000); return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build(); } /** * 綁定訂單交換機和隊列 */ @Bean("orderBinding") public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs(); } }
消費者
package com.fandf.test.rabbit; import cn.hutool.core.date.DateUtil; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author fandongfeng * @date 2023/4/15 15:42 */ @Component @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE) public class OrderMQListener { @RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { System.out.println("收到消息:" + DateUtil.now()); long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag=" + msgTag); System.out.println("message=" + message); System.out.println("body=" + body); channel.basicAck(msgTag, false); } }
測試類
@Test void testOrder() throws InterruptedException { //為true,則交換機處理消息到路由失敗,則會返回給生產(chǎn)者 配置文件指定,則這里不需指定 rabbitTemplate.setMandatory(true); //開啟強制消息投遞(mandatory為設置為true),但消息未被路由至任何一個queue,則回退一條消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code=" + code); System.out.println("returned=" + returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "測試訂單延遲"); System.out.println("發(fā)送消息:" + DateUtil.now()); Thread.sleep(20000); }
程序輸出
發(fā)送消息:2023-04-16 15:14:34
收到消息:2023-04-16 15:14:44
msgTag=1
message=(Body:'測試訂單延遲' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15:14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
body=測試訂單延遲
到此這篇關(guān)于SpringBoot整合RabbitMQ實現(xiàn)延遲隊列的示例詳解的文章就介紹到這了,更多相關(guān)SpringBoot RabbitMQ延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中優(yōu)先隊列PriorityQueue常用方法示例
這篇文章主要介紹了Java中優(yōu)先隊列PriorityQueue常用方法示例,PriorityQueue是一種特殊的隊列,滿足隊列的“隊尾進、隊頭出”條件,但是每次插入或刪除元素后,都對隊列進行調(diào)整,使得隊列始終構(gòu)成最小堆(或最大堆),需要的朋友可以參考下2023-09-09MyBatis通過JDBC數(shù)據(jù)驅(qū)動生成的執(zhí)行語句問題
這篇文章主要介紹了MyBatis通過JDBC數(shù)據(jù)驅(qū)動生成的執(zhí)行語句問題的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-08-08在jmeter的beanshell中用java獲取系統(tǒng)當前時間的簡單實例
這篇文章介紹了在jmeter的beanshell中用java獲取系統(tǒng)當前時間的簡單實例,有需要的朋友可以參考一下2013-09-09springcloud微服務基于redis集群的單點登錄實現(xiàn)解析
這篇文章主要介紹了springcloud微服務基于redis集群的單點登錄實現(xiàn)解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-09-09Java 中的Printstream介紹_動力節(jié)點Java學院整理
PrintStream 是打印輸出流,它繼承于FilterOutputStream。接下來通過本文給大家介紹Java 中的Printstream,需要的朋友參考下吧2017-05-05