Springboot死信隊(duì)列?DLX?配置和使用思路分析
前言
上一篇博客 Springboot——整合RabbitMq測(cè)試TTL中,針對(duì)設(shè)置單個(gè)消息期限
或者整個(gè)隊(duì)列消息期限
,進(jìn)行了一些配置和說(shuō)明。同時(shí)也都列舉了一些區(qū)別關(guān)系。
但考慮過(guò)一個(gè)問(wèn)題了沒(méi)有?
不管是設(shè)置哪種方式,如果消息期限到了,隊(duì)列都會(huì)將該消息進(jìn)行丟棄處理。
這么做合適么?
假設(shè)是某個(gè)設(shè)備的重要信息
,或者某個(gè)重要的訂單信息
,因?yàn)?code>規(guī)定時(shí)間內(nèi)未被及時(shí)消費(fèi)就將其舍棄,是否會(huì)造成很嚴(yán)重的后果?
有人會(huì)說(shuō),設(shè)置消息永不過(guò)期!等著消費(fèi)者能夠成功監(jiān)聽到該隊(duì)列,將消息消費(fèi)不就可以了嘛!
但這里需要考慮另外一個(gè)問(wèn)題:
每個(gè)服務(wù)器的容量是有上限的!如果消息一直存在隊(duì)列,如果一直不會(huì)被消費(fèi),豈不是很占用服務(wù)器資源?
如何解決這個(gè)問(wèn)題,就是今天這篇文章需要說(shuō)到的死信隊(duì)列
。
什么是死信
說(shuō)道死信
,可能大部分觀眾大姥爺會(huì)有懵逼的想法,什么是死信?
死信隊(duì)列,俗稱
DLX
,翻譯過(guò)來(lái)的名稱為Dead Letter Exchange
死信交換機(jī)
。
當(dāng)消息限定時(shí)間內(nèi)未被消費(fèi),成為
Dead Message
后,可以被重新發(fā)送
到另一個(gè)交換機(jī)
中,發(fā)揮其應(yīng)有的價(jià)值!
配置和測(cè)試死信
思路分析
需要測(cè)試死信隊(duì)列
,則需要先梳理整體的思路,如可以采取如下方式進(jìn)行配置:
從上面的邏輯圖中,可以發(fā)現(xiàn)大致的思路:
1、消息隊(duì)列分為正常交換機(jī)
、正常消息隊(duì)列
;以及死信交換機(jī)
和死信隊(duì)列
。
2、正常隊(duì)列針對(duì)死信信息
,需要將數(shù)據(jù)
重新
發(fā)送至死信交換機(jī)
中。
配置類編寫
結(jié)合上面的思路,編寫具體的配置類。如下所示:
package cn.linkpower.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 死信隊(duì)列配置 */ @Configuration public class DeadMsgMqConfig { // 定義正常交換機(jī)和正常隊(duì)列信息(交換機(jī)名、隊(duì)列名、路由key) public static final String queue_name = "xj_natural_queue"; public static final String exchange_name = "xj_natural_exchange"; public static final String routing_key = "xj_natural_routingKey"; // 定義死信交換機(jī)名、死信隊(duì)列名、路由key public static final String queue_name_dead = "xj_dead_queue"; public static final String exchange_name_dead = "xj_dead_exchange"; public static final String routing_key_dead = "xj_dead_routingKey"; /** * 設(shè)置正常的消息隊(duì)列; * 正常的消息隊(duì)列具備以下幾種功能: * 1、消息正常消費(fèi),需要綁定對(duì)應(yīng)的消費(fèi)者(這里為了測(cè)試死信,不創(chuàng)建消費(fèi)者) * 2、當(dāng)消息失效后,需要將指定的消息發(fā)送至 死信交換機(jī) 中 * @return */ @Bean(value = "getNaturalQueue") public Queue getNaturalQueue(){ return QueueBuilder.durable(queue_name) // 正常的隊(duì)列,在消息失效后,需要將消息丟入 死信 交換機(jī)中 // 這里只需要針對(duì)名稱進(jìn)行綁定 .withArgument("x-dead-letter-exchange",exchange_name_dead) // 丟入 死信交換機(jī),需要設(shè)定指定的 routingkey .withArgument("x-dead-letter-routing-key",routing_key_dead) // 設(shè)置正常隊(duì)列中消息的存活時(shí)間為 10s,當(dāng)然也可以針對(duì)單個(gè)消息進(jìn)行設(shè)定不同的過(guò)期時(shí)間 .withArgument("x-message-ttl",10000) // 設(shè)定當(dāng)前隊(duì)列中,允許存放的最大消息數(shù)目 .withArgument("x-max-length",10) .build(); } * 設(shè)定正常的消息交換機(jī) @Bean(value = "getNaturalExchange") public Exchange getNaturalExchange(){ // 這里為了測(cè)試,采取 direct exchange return ExchangeBuilder.directExchange(exchange_name) .durable(true) // 設(shè)定持久化 * 將正常的消息交換機(jī)和正常的消息隊(duì)列進(jìn)行綁定 * @param queue * @param directExchange @Bean public Binding bindNaturalExchangeAndQueue( @Qualifier(value = "getNaturalQueue") Queue queue, @Qualifier(value = "getNaturalExchange") Exchange directExchange ){ return BindingBuilder // 綁定消息隊(duì)列 .bind(queue) // 至指定的消息交換機(jī) .to(directExchange) // 匹配 routingkey .with(routing_key) // 無(wú)參數(shù),不加會(huì)報(bào)錯(cuò)提示 .noargs(); * 定義死信隊(duì)列 @Bean(value = "getDealQueue") public Queue getDealQueue(){ return QueueBuilder.durable(queue_name_dead).build(); * 定義死信交換機(jī) @Bean(value = "getDeadExchange") public Exchange getDeadExchange(){ return ExchangeBuilder.directExchange(exchange_name_dead).durable(true).build(); * 將死信交換機(jī)和死信隊(duì)列進(jìn)行綁定 * @param deadQueue * @param directDeadExchange public Binding bindDeadExchangeAndQueue( @Qualifier(value = "getDealQueue") Queue deadQueue, @Qualifier(value = "getDeadExchange") Exchange directDeadExchange return BindingBuilder.bind(deadQueue).to(directDeadExchange).with(routing_key_dead).noargs(); }
編寫消息發(fā)送服務(wù)
默認(rèn)采取rabbitTemplate.convertAndSend
方法,進(jìn)行消息的發(fā)送處理。但為了保證消息生產(chǎn)者
能夠成功將數(shù)據(jù)發(fā)送至正常交換機(jī)
,同時(shí)為了保證正常交換機(jī)
能夠?qū)?shù)據(jù)信息,推送至正常消息隊(duì)列
。需要對(duì)其增加監(jiān)聽。
package cn.linkpower.service; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 直接發(fā)送消息 * @param exchange * @param routingKey * @param msg */ public void sendMessage(String exchange,String routingKey,Object msg) { // 設(shè)置交換機(jī)處理失敗消息的模式 true 表示消息由交換機(jī) 到達(dá)不了隊(duì)列時(shí),會(huì)將消息重新返回給生產(chǎn)者 // 如果不設(shè)置這個(gè)指令,則交換機(jī)向隊(duì)列推送消息失敗后,不會(huì)觸發(fā) setReturnCallback rabbitTemplate.setMandatory(true); //消息消費(fèi)者確認(rèn)收到消息后,手動(dòng)ack回執(zhí) rabbitTemplate.setConfirmCallback(this); // return 配置 rabbitTemplate.setReturnCallback(this); //發(fā)送消息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } * 交換機(jī)并未將數(shù)據(jù)丟入指定的隊(duì)列中時(shí),觸發(fā) * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes()); * 參數(shù)三:true 表示如果消息無(wú)法正常投遞,則return給生產(chǎn)者 ;false 表示直接丟棄 * @param message 消息對(duì)象 * @param replyCode 錯(cuò)誤碼 * @param replyText 錯(cuò)誤信息 * @param exchange 交換機(jī) * @param routingKey 路由鍵 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); * 消息生產(chǎn)者發(fā)送消息至交換機(jī)時(shí)觸發(fā),用于判斷交換機(jī)是否成功收到消息 * @param correlationData 相關(guān)配置信息 * @param ack exchange 交換機(jī),判斷交換機(jī)是否成功收到消息 true 表示交換機(jī)收到 * @param cause 失敗原因 public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交換機(jī)接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 沒(méi)有接收到 log.info("---- confirm ----ack==false cause="+cause); } }
測(cè)試
既然說(shuō)到測(cè)試,那么需要編寫一個(gè)測(cè)試類
,能夠?qū)a(chǎn)生的消息,推送至指定的正常消息交換機(jī)
中去。
package cn.linkpower.controller; import cn.linkpower.config.DeadMsgMqConfig; import cn.linkpower.service.RabbitmqService; import lombok.extern.slf4j.Slf4j; import org.apache.tomcat.jni.Time; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; @Slf4j @RestController public class DeadMsgController { @Autowired private RabbitmqService rabbitmqService; @RequestMapping("/deadMsgTest") public String deadMsgTest() throws InterruptedException { // 向正常的消息隊(duì)列中丟數(shù)據(jù),測(cè)試限定時(shí)間未消費(fèi)后,死信隊(duì)列的情況 // 配置文件中,針對(duì)于正常隊(duì)列而言,設(shè)置有10條上限大小 for (int i = 0; i < 20; i++) { String msg = "dead msg test "+i; log.info("發(fā)送消息,消息信息為:{}",msg); // 向正常的消息交換機(jī)中傳遞數(shù)據(jù) rabbitmqService.sendMessage(DeadMsgMqConfig.exchange_name,DeadMsgMqConfig.routing_key,msg); TimeUnit.SECONDS.sleep(2); } return "ok"; } }
啟動(dòng)項(xiàng)目,訪問(wèn)指定的鏈接,進(jìn)行數(shù)據(jù)產(chǎn)生和將消息發(fā)送交換機(jī)操作:
http://localhost/deadMsgTest
控制臺(tái)部分日志展示:
消息什么時(shí)候會(huì)成為死信消息?
1、隊(duì)列消息長(zhǎng)度到達(dá)限制;
2、消費(fèi)者拒接消費(fèi)消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊(duì)列,requeue=false
;
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
3、原隊(duì)列存在消息過(guò)期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);
總結(jié)
此處只是為了進(jìn)行配置和測(cè)試需要,暫未定義任何正常消息隊(duì)列消費(fèi)者
和死信消息隊(duì)列消費(fèi)者
信息。
1、死信交換機(jī)和死信隊(duì)列和普通的沒(méi)有區(qū)別
2、當(dāng)消息成為死信后,如果該隊(duì)列綁定了死信交換機(jī),則消息會(huì)被死信交換機(jī)重新路由到死信隊(duì)列
參考資料
RabbitMQ死信隊(duì)列在SpringBoot中的使用
代碼下載
到此這篇關(guān)于Springboot死信隊(duì)列 DLX 配置和使用的文章就介紹到這了,更多相關(guān)Springboot死信隊(duì)列 DLX內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java基礎(chǔ)之SpringBoot整合knife4j
Swagger現(xiàn)在已經(jīng)成了最流行的接口文檔生成與管理工具,但是你是否在用的時(shí)候也在吐槽,它是真的不好看,接口測(cè)試的json數(shù)據(jù)沒(méi)法格式化,測(cè)試地址如果更改了還要去改配置,接口測(cè)試時(shí)增加token驗(yàn)證是真的麻煩…針對(duì)Swagger的種種缺點(diǎn),Knife4j就呼之欲出了.需要的朋友可以參考下2021-05-05SpringBoot在IDEA中實(shí)現(xiàn)熱部署(JRebel實(shí)用版)
這篇文章主要介紹了SpringBoot在IDEA中實(shí)現(xiàn)熱部署(JRebel實(shí)用版),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05java實(shí)現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-11-11Servlet輸出一個(gè)驗(yàn)證碼圖片的實(shí)現(xiàn)方法實(shí)例
這篇文章主要給大家介紹了關(guān)于Servlet輸出一個(gè)驗(yàn)證碼圖片的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01Java動(dòng)態(tài)代理分析及簡(jiǎn)單實(shí)例
這篇文章主要介紹了 Java動(dòng)態(tài)代理分析及簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下2017-02-02