Springboot死信隊列?DLX?配置和使用思路分析
前言
上一篇博客 Springboot——整合RabbitMq測試TTL中,針對設(shè)置單個消息期限
或者整個隊列消息期限
,進(jìn)行了一些配置和說明。同時也都列舉了一些區(qū)別關(guān)系。
但考慮過一個問題了沒有?
不管是設(shè)置哪種方式,如果消息期限到了,隊列都會將該消息進(jìn)行丟棄處理。
這么做合適么?
假設(shè)是某個設(shè)備的重要信息
,或者某個重要的訂單信息
,因為規(guī)定時間內(nèi)未被及時消費就將其舍棄
,是否會造成很嚴(yán)重的后果?
有人會說,設(shè)置消息永不過期!等著消費者能夠成功監(jiān)聽到該隊列,將消息消費不就可以了嘛!
但這里需要考慮另外一個問題:
每個服務(wù)器的容量是有上限的!如果消息一直存在隊列,如果一直不會被消費,豈不是很占用服務(wù)器資源?
如何解決這個問題,就是今天這篇文章需要說到的死信隊列
。
什么是死信
說道死信
,可能大部分觀眾大姥爺會有懵逼的想法,什么是死信?
死信隊列,俗稱
DLX
,翻譯過來的名稱為Dead Letter Exchange
死信交換機(jī)
。
當(dāng)消息限定時間內(nèi)未被消費,成為
Dead Message
后,可以被重新發(fā)送
到另一個交換機(jī)
中,發(fā)揮其應(yīng)有的價值!
配置和測試死信
思路分析
需要測試死信隊列
,則需要先梳理整體的思路,如可以采取如下方式進(jìn)行配置:
從上面的邏輯圖中,可以發(fā)現(xiàn)大致的思路:
1、消息隊列分為正常交換機(jī)
、正常消息隊列
;以及死信交換機(jī)
和死信隊列
。
2、正常隊列針對死信信息
,需要將數(shù)據(jù)
重新
發(fā)送至死信交換機(jī)
中。
配置類編寫
結(jié)合上面的思路,編寫具體的配置類。如下所示:
package cn.linkpower.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 死信隊列配置 */ @Configuration public class DeadMsgMqConfig { // 定義正常交換機(jī)和正常隊列信息(交換機(jī)名、隊列名、路由key) public static final String queue_name = "xj_natural_queue"; public static final String exchange_name = "xj_natural_exchange"; public static final String routing_key = "xj_natural_routingKey"; // 定義死信交換機(jī)名、死信隊列名、路由key public static final String queue_name_dead = "xj_dead_queue"; public static final String exchange_name_dead = "xj_dead_exchange"; public static final String routing_key_dead = "xj_dead_routingKey"; /** * 設(shè)置正常的消息隊列; * 正常的消息隊列具備以下幾種功能: * 1、消息正常消費,需要綁定對應(yīng)的消費者(這里為了測試死信,不創(chuàng)建消費者) * 2、當(dāng)消息失效后,需要將指定的消息發(fā)送至 死信交換機(jī) 中 * @return */ @Bean(value = "getNaturalQueue") public Queue getNaturalQueue(){ return QueueBuilder.durable(queue_name) // 正常的隊列,在消息失效后,需要將消息丟入 死信 交換機(jī)中 // 這里只需要針對名稱進(jìn)行綁定 .withArgument("x-dead-letter-exchange",exchange_name_dead) // 丟入 死信交換機(jī),需要設(shè)定指定的 routingkey .withArgument("x-dead-letter-routing-key",routing_key_dead) // 設(shè)置正常隊列中消息的存活時間為 10s,當(dāng)然也可以針對單個消息進(jìn)行設(shè)定不同的過期時間 .withArgument("x-message-ttl",10000) // 設(shè)定當(dāng)前隊列中,允許存放的最大消息數(shù)目 .withArgument("x-max-length",10) .build(); } * 設(shè)定正常的消息交換機(jī) @Bean(value = "getNaturalExchange") public Exchange getNaturalExchange(){ // 這里為了測試,采取 direct exchange return ExchangeBuilder.directExchange(exchange_name) .durable(true) // 設(shè)定持久化 * 將正常的消息交換機(jī)和正常的消息隊列進(jìn)行綁定 * @param queue * @param directExchange @Bean public Binding bindNaturalExchangeAndQueue( @Qualifier(value = "getNaturalQueue") Queue queue, @Qualifier(value = "getNaturalExchange") Exchange directExchange ){ return BindingBuilder // 綁定消息隊列 .bind(queue) // 至指定的消息交換機(jī) .to(directExchange) // 匹配 routingkey .with(routing_key) // 無參數(shù),不加會報錯提示 .noargs(); * 定義死信隊列 @Bean(value = "getDealQueue") public Queue getDealQueue(){ return QueueBuilder.durable(queue_name_dead).build(); * 定義死信交換機(jī) @Bean(value = "getDeadExchange") public Exchange getDeadExchange(){ return ExchangeBuilder.directExchange(exchange_name_dead).durable(true).build(); * 將死信交換機(jī)和死信隊列進(jìn)行綁定 * @param deadQueue * @param directDeadExchange public Binding bindDeadExchangeAndQueue( @Qualifier(value = "getDealQueue") Queue deadQueue, @Qualifier(value = "getDeadExchange") Exchange directDeadExchange return BindingBuilder.bind(deadQueue).to(directDeadExchange).with(routing_key_dead).noargs(); }
編寫消息發(fā)送服務(wù)
默認(rèn)采取rabbitTemplate.convertAndSend
方法,進(jìn)行消息的發(fā)送處理。但為了保證消息生產(chǎn)者
能夠成功將數(shù)據(jù)發(fā)送至正常交換機(jī)
,同時為了保證正常交換機(jī)
能夠?qū)?shù)據(jù)信息,推送至正常消息隊列
。需要對其增加監(jiān)聽。
package cn.linkpower.service; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 直接發(fā)送消息 * @param exchange * @param routingKey * @param msg */ public void sendMessage(String exchange,String routingKey,Object msg) { // 設(shè)置交換機(jī)處理失敗消息的模式 true 表示消息由交換機(jī) 到達(dá)不了隊列時,會將消息重新返回給生產(chǎn)者 // 如果不設(shè)置這個指令,則交換機(jī)向隊列推送消息失敗后,不會觸發(fā) setReturnCallback rabbitTemplate.setMandatory(true); //消息消費者確認(rèn)收到消息后,手動ack回執(zhí) rabbitTemplate.setConfirmCallback(this); // return 配置 rabbitTemplate.setReturnCallback(this); //發(fā)送消息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } * 交換機(jī)并未將數(shù)據(jù)丟入指定的隊列中時,觸發(fā) * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes()); * 參數(shù)三:true 表示如果消息無法正常投遞,則return給生產(chǎn)者 ;false 表示直接丟棄 * @param message 消息對象 * @param replyCode 錯誤碼 * @param replyText 錯誤信息 * @param exchange 交換機(jī) * @param routingKey 路由鍵 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); * 消息生產(chǎn)者發(fā)送消息至交換機(jī)時觸發(fā),用于判斷交換機(jī)是否成功收到消息 * @param correlationData 相關(guān)配置信息 * @param ack exchange 交換機(jī),判斷交換機(jī)是否成功收到消息 true 表示交換機(jī)收到 * @param cause 失敗原因 public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交換機(jī)接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 沒有接收到 log.info("---- confirm ----ack==false cause="+cause); } }
測試
既然說到測試,那么需要編寫一個測試類
,能夠?qū)a(chǎn)生的消息,推送至指定的正常消息交換機(jī)
中去。
package cn.linkpower.controller; import cn.linkpower.config.DeadMsgMqConfig; import cn.linkpower.service.RabbitmqService; import lombok.extern.slf4j.Slf4j; import org.apache.tomcat.jni.Time; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; @Slf4j @RestController public class DeadMsgController { @Autowired private RabbitmqService rabbitmqService; @RequestMapping("/deadMsgTest") public String deadMsgTest() throws InterruptedException { // 向正常的消息隊列中丟數(shù)據(jù),測試限定時間未消費后,死信隊列的情況 // 配置文件中,針對于正常隊列而言,設(shè)置有10條上限大小 for (int i = 0; i < 20; i++) { String msg = "dead msg test "+i; log.info("發(fā)送消息,消息信息為:{}",msg); // 向正常的消息交換機(jī)中傳遞數(shù)據(jù) rabbitmqService.sendMessage(DeadMsgMqConfig.exchange_name,DeadMsgMqConfig.routing_key,msg); TimeUnit.SECONDS.sleep(2); } return "ok"; } }
啟動項目,訪問指定的鏈接,進(jìn)行數(shù)據(jù)產(chǎn)生和將消息發(fā)送交換機(jī)操作:
http://localhost/deadMsgTest
控制臺部分日志展示:
消息什么時候會成為死信消息?
1、隊列消息長度到達(dá)限制;
2、消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊列,requeue=false
;
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
3、原隊列存在消息過期設(shè)置,消息到達(dá)超時時間未被消費;
總結(jié)
此處只是為了進(jìn)行配置和測試需要,暫未定義任何正常消息隊列消費者
和死信消息隊列消費者
信息。
1、死信交換機(jī)和死信隊列和普通的沒有區(qū)別
2、當(dāng)消息成為死信后,如果該隊列綁定了死信交換機(jī),則消息會被死信交換機(jī)重新路由到死信隊列
參考資料
代碼下載
到此這篇關(guān)于Springboot死信隊列 DLX 配置和使用的文章就介紹到這了,更多相關(guān)Springboot死信隊列 DLX內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java基礎(chǔ)之SpringBoot整合knife4j
Swagger現(xiàn)在已經(jīng)成了最流行的接口文檔生成與管理工具,但是你是否在用的時候也在吐槽,它是真的不好看,接口測試的json數(shù)據(jù)沒法格式化,測試地址如果更改了還要去改配置,接口測試時增加token驗證是真的麻煩…針對Swagger的種種缺點,Knife4j就呼之欲出了.需要的朋友可以參考下2021-05-05SpringBoot在IDEA中實現(xiàn)熱部署(JRebel實用版)
這篇文章主要介紹了SpringBoot在IDEA中實現(xiàn)熱部署(JRebel實用版),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05java實現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字
這篇文章主要為大家詳細(xì)介紹了java實現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-11-11