RabbitMQ消息的延遲隊列詳解
Dead Letter Exchange(死信交換機)
在MQ中,當消息成為死信(Dead message 死掉的信息)后,消息中間件可以將其從當前隊列發(fā)送到另一個隊列中,這個隊列就是死信隊列。而 在RabbitMQ中,由于有交換機的概念,實際是將死信發(fā)送給了死信交換機(Dead Letter Exchange,簡稱DLX)。死信交換機和死信隊列和普通的沒有區(qū)別。
消息成為死信的情況
- 隊列消息長度到達限制
- 消費者拒簽消息,并且不把消息重新放入原隊列
- 消息到達存活時間未被消費
有些隊列的消息成為死信后,(比如過期了或者隊列滿了)這些死信一般情況下是會被 RabbitMQ 清理的。但是你可以配置某個交換機為此隊列的死信交換機,該隊列的消息成為死信后會被重新發(fā)送到此 DLX 。至于怎么處理這個DLX中的死信就是看具體的業(yè)務場景了,DLX 中的信息可以被路由到新的隊列。
生產者
/** * 普通交換機綁定普通交換機 * * @return */ @Bean public Queue queueA() { //信息配置 Map<String, Object> map = new HashMap<>(); //message在該隊列queue的存活時間最大為15秒 map.put("x-message-ttl", 15000); //x-dead-letter-exchange參數(shù)是設置該隊列的死信交換器(DLX) map.put("x-dead-letter-exchange", "exchangeB"); //x-dead-letter-routing-key參數(shù)是給這個DLX指定路由鍵 map.put("x-dead-letter-routing-key", "queueB"); return new Queue("queueA", true, false, false, map); } @Bean public DirectExchange exchangeA() { return new DirectExchange("exchangeA"); } @Bean public Binding bindingA() { return BindingBuilder .bind(queueA()) .to(exchangeA()).with("queueA"); } /** * 死信交換機綁定死信交換機 * * @return */ @Bean public Queue queueB() { return new Queue("queueB"); } @Bean public DirectExchange exchangeB() { return new DirectExchange("exchangeB"); } @Bean public Binding bindingB() { return BindingBuilder .bind(queueB()) .to(exchangeB()).with("queueB"); }
模擬發(fā)送請求
@RequestMapping("/send6") public String sendSix() throws JsonProcessingException { rabbitTemplate.convertAndSend("exchangeA", "queueA", "檢查訂單是否過期"); return "??"; }
這時我發(fā)送請求到隊列queueA,并設置了15秒的延遲,將超時的信息調用到死信交換機中。在這里我是沒開啟消費者所有沒有消費者去處理該請求的,信息在queueA隊列等待15秒后將會轉到死信交換機queueB隊列進行處理:
延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。經典的應用場景是下單減庫存。
根據(jù)以上結論,在rabbitmq中消費者只要接到信息就會自動確認進行處理。所以在上面并沒有開啟消費者,當請求時效后(如訂單未支付,定時30分鐘自動取消功能)我們不應該再讓它正常處理,而把該請求放到死信交換機中安排對應的處理,所以我們需要打消費者自動處理請求改成手動。
如果手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確認可以在業(yè)務失敗后進行一些操作,如果消息未被 ACK 則會發(fā)送到下一個消費者
如果某個服務忘記 ACK 了,則 RabbitMQ 不會再發(fā)送數(shù)據(jù)給它,因為 RabbitMQ 認為該服務的處理能力有限
ACK 機制還可以起到限流作用,比如在接收到某條消息時休眠幾秒鐘
消息確認模式有:
- AcknowledgeMode.NONE:自動確認
- AcknowledgeMode.AUTO:根據(jù)情況確認
- AcknowledgeMode.MANUAL:手動確認
確認消息(局部方法處理消息)
默認情況下消息消費者是自動 ack (確認)消息的,如果要手動 ack(確認)則需要修改確認模式為 manual
消費者添加手動確認消息配置配置 :
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manua
消費者接受消息:
package com.ycxw.consumer.demos; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class DLXReceiver { @RabbitListener(queues = {"queueA"}) @RabbitHandler public void handlerA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { System.out.println("已接受到隊列queueA傳遞過來的消息:" + msg); channel.basicReject(tag, false);// 拒接消息,如果為true則拒絕后又從新回到隊列被接受(循環(huán)),除非消息過期。 //channel.basicAck(tag, true); 確認消息()一次性全接受,如果為false則接受一次 } /** * 接受死信消息 * * @param msg */ @RabbitListener(queues = {"queueB"}) @RabbitHandler public void handlerB(String msg) { /** * ...接受到信息,去數(shù)據(jù)庫處理 */ System.out.println("已接受到隊列queueB傳遞過來的消息:" + msg); } }
第一次進入普通隊列別拒絕后,轉到死信隊列中處理...
需要注意的 basicAck 方法需要傳遞兩個參數(shù)
- deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調遞增的正整數(shù),delivery tag 的范圍僅限于 Channel
- multiple:為了減少網絡流量,手動確認可以被批處理,當該參數(shù)為 true 時,則可以一次性確認 delivery_tag 小于等于傳入值的所有消息
到此這篇關于RabbitMQ消息的延遲隊列詳解的文章就介紹到這了,更多相關RabbitMQ延遲隊列內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java 通過發(fā)送json,post請求,返回json數(shù)據(jù)的方法
下面小編就為大家分享一篇java 通過發(fā)送json,post請求,返回json數(shù)據(jù)的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-03-03