RabbitMQ消息的延遲隊(duì)列詳解
Dead Letter Exchange(死信交換機(jī))
在MQ中,當(dāng)消息成為死信(Dead message 死掉的信息)后,消息中間件可以將其從當(dāng)前隊(duì)列發(fā)送到另一個(gè)隊(duì)列中,這個(gè)隊(duì)列就是死信隊(duì)列。而 在RabbitMQ中,由于有交換機(jī)的概念,實(shí)際是將死信發(fā)送給了死信交換機(jī)(Dead Letter Exchange,簡(jiǎn)稱DLX)。死信交換機(jī)和死信隊(duì)列和普通的沒(méi)有區(qū)別。

消息成為死信的情況
- 隊(duì)列消息長(zhǎng)度到達(dá)限制
- 消費(fèi)者拒簽消息,并且不把消息重新放入原隊(duì)列
- 消息到達(dá)存活時(shí)間未被消費(fèi)
有些隊(duì)列的消息成為死信后,(比如過(guò)期了或者隊(duì)列滿了)這些死信一般情況下是會(huì)被 RabbitMQ 清理的。但是你可以配置某個(gè)交換機(jī)為此隊(duì)列的死信交換機(jī),該隊(duì)列的消息成為死信后會(huì)被重新發(fā)送到此 DLX 。至于怎么處理這個(gè)DLX中的死信就是看具體的業(yè)務(wù)場(chǎng)景了,DLX 中的信息可以被路由到新的隊(duì)列。
生產(chǎn)者

/**
* 普通交換機(jī)綁定普通交換機(jī)
*
* @return
*/
@Bean
public Queue queueA() {
//信息配置
Map<String, Object> map = new HashMap<>();
//message在該隊(duì)列queue的存活時(shí)間最大為15秒
map.put("x-message-ttl", 15000);
//x-dead-letter-exchange參數(shù)是設(shè)置該隊(duì)列的死信交換器(DLX)
map.put("x-dead-letter-exchange", "exchangeB");
//x-dead-letter-routing-key參數(shù)是給這個(gè)DLX指定路由鍵
map.put("x-dead-letter-routing-key", "queueB");
return new Queue("queueA", true, false, false, map);
}
@Bean
public DirectExchange exchangeA() {
return new DirectExchange("exchangeA");
}
@Bean
public Binding bindingA() {
return BindingBuilder
.bind(queueA())
.to(exchangeA()).with("queueA");
}
/**
* 死信交換機(jī)綁定死信交換機(jī)
*
* @return
*/
@Bean
public Queue queueB() {
return new Queue("queueB");
}
@Bean
public DirectExchange exchangeB() {
return new DirectExchange("exchangeB");
}
@Bean
public Binding bindingB() {
return BindingBuilder
.bind(queueB())
.to(exchangeB()).with("queueB");
}模擬發(fā)送請(qǐng)求
@RequestMapping("/send6")
public String sendSix() throws JsonProcessingException {
rabbitTemplate.convertAndSend("exchangeA", "queueA", "檢查訂單是否過(guò)期");
return "??";
}這時(shí)我發(fā)送請(qǐng)求到隊(duì)列queueA,并設(shè)置了15秒的延遲,將超時(shí)的信息調(diào)用到死信交換機(jī)中。在這里我是沒(méi)開(kāi)啟消費(fèi)者所有沒(méi)有消費(fèi)者去處理該請(qǐng)求的,信息在queueA隊(duì)列等待15秒后將會(huì)轉(zhuǎn)到死信交換機(jī)queueB隊(duì)列進(jìn)行處理:

延遲隊(duì)列
延遲隊(duì)列,即消息進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi),只有到達(dá)指定時(shí)間后,才會(huì)被消費(fèi)。經(jīng)典的應(yīng)用場(chǎng)景是下單減庫(kù)存。

根據(jù)以上結(jié)論,在rabbitmq中消費(fèi)者只要接到信息就會(huì)自動(dòng)確認(rèn)進(jìn)行處理。所以在上面并沒(méi)有開(kāi)啟消費(fèi)者,當(dāng)請(qǐng)求時(shí)效后(如訂單未支付,定時(shí)30分鐘自動(dòng)取消功能)我們不應(yīng)該再讓它正常處理,而把該請(qǐng)求放到死信交換機(jī)中安排對(duì)應(yīng)的處理,所以我們需要打消費(fèi)者自動(dòng)處理請(qǐng)求改成手動(dòng)。
如果手動(dòng)確認(rèn)則當(dāng)消費(fèi)者調(diào)用 ack、nack、reject 幾種方法進(jìn)行確認(rèn),手動(dòng)確認(rèn)可以在業(yè)務(wù)失敗后進(jìn)行一些操作,如果消息未被 ACK 則會(huì)發(fā)送到下一個(gè)消費(fèi)者
如果某個(gè)服務(wù)忘記 ACK 了,則 RabbitMQ 不會(huì)再發(fā)送數(shù)據(jù)給它,因?yàn)?RabbitMQ 認(rèn)為該服務(wù)的處理能力有限
ACK 機(jī)制還可以起到限流作用,比如在接收到某條消息時(shí)休眠幾秒鐘
消息確認(rèn)模式有:
- AcknowledgeMode.NONE:自動(dòng)確認(rèn)
- AcknowledgeMode.AUTO:根據(jù)情況確認(rèn)
- AcknowledgeMode.MANUAL:手動(dòng)確認(rèn)
確認(rèn)消息(局部方法處理消息)
默認(rèn)情況下消息消費(fèi)者是自動(dòng) ack (確認(rèn))消息的,如果要手動(dòng) ack(確認(rèn))則需要修改確認(rèn)模式為 manual
消費(fèi)者添加手動(dòng)確認(rèn)消息配置配置 :
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manua
消費(fèi)者接受消息:
package com.ycxw.consumer.demos;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DLXReceiver {
@RabbitListener(queues = {"queueA"})
@RabbitHandler
public void handlerA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("已接受到隊(duì)列queueA傳遞過(guò)來(lái)的消息:" + msg);
channel.basicReject(tag, false);// 拒接消息,如果為true則拒絕后又從新回到隊(duì)列被接受(循環(huán)),除非消息過(guò)期。
//channel.basicAck(tag, true); 確認(rèn)消息()一次性全接受,如果為false則接受一次
}
/**
* 接受死信消息
*
* @param msg
*/
@RabbitListener(queues = {"queueB"})
@RabbitHandler
public void handlerB(String msg) {
/**
* ...接受到信息,去數(shù)據(jù)庫(kù)處理
*/
System.out.println("已接受到隊(duì)列queueB傳遞過(guò)來(lái)的消息:" + msg);
}
}第一次進(jìn)入普通隊(duì)列別拒絕后,轉(zhuǎn)到死信隊(duì)列中處理...

需要注意的 basicAck 方法需要傳遞兩個(gè)參數(shù)
- deliveryTag(唯一標(biāo)識(shí) ID):當(dāng)一個(gè)消費(fèi)者向 RabbitMQ 注冊(cè)后,會(huì)建立起一個(gè) Channel ,RabbitMQ 會(huì)用 basic.deliver 方法向消費(fèi)者推送消息,這個(gè)方法攜帶了一個(gè) delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識(shí) ID,是一個(gè)單調(diào)遞增的正整數(shù),delivery tag 的范圍僅限于 Channel
- multiple:為了減少網(wǎng)絡(luò)流量,手動(dòng)確認(rèn)可以被批處理,當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息
到此這篇關(guān)于RabbitMQ消息的延遲隊(duì)列詳解的文章就介紹到這了,更多相關(guān)RabbitMQ延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java的內(nèi)存區(qū)域與內(nèi)存溢出異常你了解嗎
這篇文章主要為大家詳細(xì)介紹了Java的內(nèi)存區(qū)域與內(nèi)存溢出異常,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-03-03
Spring如何使用@Indexed加快啟動(dòng)速度
這篇文章主要介紹了Spring如何使用@Indexed加快啟動(dòng)速度,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
java 通過(guò)發(fā)送json,post請(qǐng)求,返回json數(shù)據(jù)的方法
下面小編就為大家分享一篇java 通過(guò)發(fā)送json,post請(qǐng)求,返回json數(shù)據(jù)的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-03-03
springboot如何實(shí)現(xiàn)自動(dòng)裝配源碼解讀
這篇文章主要介紹了springboot如何實(shí)現(xiàn)自動(dòng)裝配源碼賞析,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12
Java?IDEA集成開(kāi)發(fā)工具中英文切換圖文教程
相信很多小伙伴們剛接觸IDEA時(shí),看到一堆英文界面不知道如何下手,這篇文章主要給大家介紹了關(guān)于Java?IDEA集成開(kāi)發(fā)工具中英文切換的相關(guān)資料,需要的朋友可以參考下2024-04-04

