關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
概念介紹
什么是死信
死信可以理解成沒(méi)有被正常消費(fèi)的消息,在RabbitMQ中以下幾種情況會(huì)被認(rèn)定為死信:
- 消費(fèi)者使用basic.reject或basic.nack(重新排隊(duì)參數(shù)設(shè)置為false)對(duì)消息進(jìn)行否定確認(rèn)。
- 消息到達(dá)生存時(shí)間還未被消費(fèi)。
- 隊(duì)列超過(guò)長(zhǎng)度限制,消息被丟棄。
這些消息會(huì)被發(fā)送到死信交換機(jī)并路由到死信隊(duì)列中(在RabbitMQ中死信交換機(jī)和死信隊(duì)列就是普通的交換機(jī)和隊(duì)列)。其流轉(zhuǎn)過(guò)程如下圖

死信隊(duì)列應(yīng)用
- 作為消息可靠性的一個(gè)擴(kuò)展。比如,在隊(duì)列已滿的情況下也不會(huì)丟失消息。
- 可以實(shí)現(xiàn)延遲消費(fèi)功能。比如,訂單15分鐘內(nèi)未支付。
注意事項(xiàng):基于死信隊(duì)列實(shí)現(xiàn)的延遲消費(fèi)不適合時(shí)間過(guò)于復(fù)雜的場(chǎng)景。比如,一個(gè)隊(duì)列中第一條消息TTL為10s,第二條消息TTL為5s,由于RabbitMQ只會(huì)監(jiān)聽第一條消息,所以本應(yīng)第二條消息先達(dá)到TTL會(huì)在第一條消息的TTL之后。對(duì)于該現(xiàn)象有兩種解決方案:
- 維護(hù)多個(gè)隊(duì)列,每個(gè)隊(duì)列維護(hù)一個(gè)TTL時(shí)間。
- 使用延遲交換機(jī)。這種方式需要下載插件支持
工程搭建
環(huán)境說(shuō)明
- RabbitMQ環(huán)境
- Java版本:JDK1.8
- Maven版本:apache-maven-3.6.3
- 開發(fā)工具:IntelliJ IDEA
搭建步驟
1.創(chuàng)建SpringBoot項(xiàng)目。
2.pom.xml文件導(dǎo)入RabbitMQ依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>3.application.yml文件添加RabbitMQ配置。
spring:
# rabbitmq配置信息 RabbitProperties類
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
# 開啟confirm機(jī)制
publisher-confirm-type: correlated
# 開啟return機(jī)制
publisher-returns: true
#全局配置,局部配置存在就以局部為準(zhǔn)
listener:
simple:
acknowledge-mode: manual # 手動(dòng)ACK實(shí)現(xiàn)死信
準(zhǔn)備Exchange&Queue
@Configuration
public class RabbitMQConfig {
/**
* 正常隊(duì)列
*/
public static final String EXCHANGE = "boot-exchange";
public static final String QUEUE = "boot-queue";
public static final String ROUTING_KEY = "boot-rout";
/**
* 死信隊(duì)列
*/
public static final String DEAD_EXCHANGE = "dead-exchange";
public static final String DEAD_QUEUE = "dead-queue";
public static final String DEAD_ROUTING_KEY = "dead-rout";
/**
* 聲明死信交換機(jī)
*
* @return
*/
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
}
/**
* 聲明死信隊(duì)列
*
* @return
*/
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
/**
* 綁定死信的隊(duì)列和交換機(jī)
*
* @param deadExchange
* @param deadQueue
* @return
*/
@Bean
public Binding deadBind(Exchange deadExchange, Queue deadQueue) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
/**
* 聲明交換機(jī),同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
*
* @return
*/
@Bean
public Exchange bootExchange() {
return ExchangeBuilder.directExchange(EXCHANGE).build();
}
/**
* 聲明隊(duì)列,同channel.queueDeclare(QUEUE, true, false, false, null);
* 綁定死信交換機(jī)及路由key
*
* @return
*/
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//聲明隊(duì)列屬性有更改時(shí)需要?jiǎng)h除隊(duì)列
//給隊(duì)列設(shè)置消息時(shí)長(zhǎng)
//.ttl(10000)
//隊(duì)列最大長(zhǎng)度
.maxLength(1)
.build();
}
/**
* 綁定隊(duì)列和交換機(jī),同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
*
* @param bootExchange
* @param bootQueue
* @return
*/
@Bean
public Binding bootBind(Exchange bootExchange, Queue bootQueue) {
return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
}
}監(jiān)聽死信隊(duì)列
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
public void listener_dead(String msg, Channel channel, Message message) throws IOException {
System.out.println("死信接收到消息" + msg);
System.out.println("唯一標(biāo)識(shí):" + message.getMessageProperties().getCorrelationId());
System.out.println("messageID:" + message.getMessageProperties().getMessageId());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}方式一、消費(fèi)者拒絕&否認(rèn)
- 拒絕消息
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void listener(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息" + msg);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false)
}- 否認(rèn)消息
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void listener(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息" + msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}方式二、超過(guò)消息TTL 發(fā)送消息時(shí)設(shè)置TTL
@SpringBootTest
public class Publisher {
@Autowired
private RabbitTemplate template;
/**
* 5秒未被消費(fèi)會(huì)路由到死信隊(duì)列
*/
@Test
public void publish_expir() {
template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> {
message.getMessageProperties().setExpiration("5000");
return message;
});
}
}- 設(shè)置隊(duì)列所有消息的TTL
更新RabbitMQConfig類中bootQueue() ,更新后需要?jiǎng)h除隊(duì)列,因?yàn)殛?duì)列屬性有更改。
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//聲明隊(duì)列屬性有更改時(shí)需要?jiǎng)h除隊(duì)列
//給隊(duì)列設(shè)置消息時(shí)長(zhǎng)
.ttl(10000)
.build();
}方式三、超過(guò)隊(duì)列長(zhǎng)度限制
設(shè)置隊(duì)列長(zhǎng)度限制,當(dāng)隊(duì)列長(zhǎng)度超過(guò)設(shè)置的閾值,消息便會(huì)路由到死信隊(duì)列。
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//聲明隊(duì)列屬性有更改時(shí)需要?jiǎng)h除隊(duì)列
.maxLength(1)
.build();
}代碼倉(cāng)庫(kù) 點(diǎn)我
到此這篇關(guān)于關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列的文章就介紹到這了,更多相關(guān)RabbitMQ實(shí)現(xiàn)死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)
- Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
- RabbitMQ之死信隊(duì)列深入解析
- springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
Springboot集成ClickHouse及應(yīng)用場(chǎng)景分析
這篇文章主要介紹了Springboot集成ClickHouse的實(shí)例代碼,本文通過(guò)應(yīng)用場(chǎng)景實(shí)例代碼介紹了整合springboot的詳細(xì)過(guò)程,感興趣的朋友跟隨小編一起看看吧2022-02-02
Mybatis的mapper標(biāo)簽 namespace屬性用法說(shuō)明
這篇文章主要介紹了Mybatis的mapper標(biāo)簽 namespace屬性用法說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
Java?PTA?計(jì)算3到7位?水仙花數(shù)實(shí)例
這篇文章主要介紹了Java?PTA?計(jì)算3到7位?水仙花數(shù)實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
如何解決redis的NOAUTH Authentication required異常
這篇文章主要介紹了Jedis異常解決:NOAUTH Authentication required,,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值2019-07-07
詳解springboot項(xiàng)目帶Tomcat和不帶Tomcat的兩種打包方式
這篇文章主要介紹了詳解springboot項(xiàng)目帶Tomcat和不帶Tomcat的兩種打包方式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
Springboot MultipartFile文件上傳與下載的實(shí)現(xiàn)示例
在Spring Boot項(xiàng)目中,可以使用MultipartFile類來(lái)處理文件上傳和下載操作,本文就詳細(xì)介紹了如何使用,具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08
SpringCloud配置服務(wù)端的ConfigServer設(shè)置安全認(rèn)證
這篇文章主要為大家介紹了SpringCloud配置服務(wù)端的ConfigServer設(shè)置安全認(rèn)證,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08

