關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
概念介紹
什么是死信
死信可以理解成沒有被正常消費(fèi)的消息,在RabbitMQ中以下幾種情況會(huì)被認(rèn)定為死信:
- 消費(fèi)者使用basic.reject或basic.nack(重新排隊(duì)參數(shù)設(shè)置為false)對(duì)消息進(jìn)行否定確認(rèn)。
- 消息到達(dá)生存時(shí)間還未被消費(fèi)。
- 隊(duì)列超過長(zhǎng)度限制,消息被丟棄。
這些消息會(huì)被發(fā)送到死信交換機(jī)并路由到死信隊(duì)列中(在RabbitMQ中死信交換機(jī)和死信隊(duì)列就是普通的交換機(jī)和隊(duì)列)。其流轉(zhuǎn)過程如下圖
死信隊(duì)列應(yīng)用
- 作為消息可靠性的一個(gè)擴(kuò)展。比如,在隊(duì)列已滿的情況下也不會(huì)丟失消息。
- 可以實(shí)現(xiàn)延遲消費(fèi)功能。比如,訂單15分鐘內(nèi)未支付。
注意事項(xiàng):基于死信隊(duì)列實(shí)現(xiàn)的延遲消費(fèi)不適合時(shí)間過于復(fù)雜的場(chǎng)景。比如,一個(gè)隊(duì)列中第一條消息TTL為10s,第二條消息TTL為5s,由于RabbitMQ只會(huì)監(jiān)聽第一條消息,所以本應(yīng)第二條消息先達(dá)到TTL會(huì)在第一條消息的TTL之后。對(duì)于該現(xiàn)象有兩種解決方案:
- 維護(hù)多個(gè)隊(duì)列,每個(gè)隊(duì)列維護(hù)一個(gè)TTL時(shí)間。
- 使用延遲交換機(jī)。這種方式需要下載插件支持
工程搭建
環(huán)境說明
- RabbitMQ環(huán)境
- Java版本:JDK1.8
- Maven版本:apache-maven-3.6.3
- 開發(fā)工具:IntelliJ IDEA
搭建步驟
1.創(chuàng)建SpringBoot項(xiàng)目。
2.pom.xml文件導(dǎo)入RabbitMQ依賴。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.application.yml文件添加RabbitMQ配置。
spring: # rabbitmq配置信息 RabbitProperties類 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / # 開啟confirm機(jī)制 publisher-confirm-type: correlated # 開啟return機(jī)制 publisher-returns: true #全局配置,局部配置存在就以局部為準(zhǔn) listener: simple: acknowledge-mode: manual # 手動(dòng)ACK
實(shí)現(xiàn)死信
準(zhǔn)備Exchange&Queue
@Configuration public class RabbitMQConfig { /** * 正常隊(duì)列 */ public static final String EXCHANGE = "boot-exchange"; public static final String QUEUE = "boot-queue"; public static final String ROUTING_KEY = "boot-rout"; /** * 死信隊(duì)列 */ public static final String DEAD_EXCHANGE = "dead-exchange"; public static final String DEAD_QUEUE = "dead-queue"; public static final String DEAD_ROUTING_KEY = "dead-rout"; /** * 聲明死信交換機(jī) * * @return */ @Bean public Exchange deadExchange() { return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build(); } /** * 聲明死信隊(duì)列 * * @return */ @Bean public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); } /** * 綁定死信的隊(duì)列和交換機(jī) * * @param deadExchange * @param deadQueue * @return */ @Bean public Binding deadBind(Exchange deadExchange, Queue deadQueue) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } /** * 聲明交換機(jī),同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); * * @return */ @Bean public Exchange bootExchange() { return ExchangeBuilder.directExchange(EXCHANGE).build(); } /** * 聲明隊(duì)列,同channel.queueDeclare(QUEUE, true, false, false, null); * 綁定死信交換機(jī)及路由key * * @return */ @Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //聲明隊(duì)列屬性有更改時(shí)需要?jiǎng)h除隊(duì)列 //給隊(duì)列設(shè)置消息時(shí)長(zhǎng) //.ttl(10000) //隊(duì)列最大長(zhǎng)度 .maxLength(1) .build(); } /** * 綁定隊(duì)列和交換機(jī),同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY); * * @param bootExchange * @param bootQueue * @return */ @Bean public Binding bootBind(Exchange bootExchange, Queue bootQueue) { return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs(); } }
監(jiān)聽死信隊(duì)列
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE) public void listener_dead(String msg, Channel channel, Message message) throws IOException { System.out.println("死信接收到消息" + msg); System.out.println("唯一標(biāo)識(shí):" + message.getMessageProperties().getCorrelationId()); System.out.println("messageID:" + message.getMessageProperties().getMessageId()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
方式一、消費(fèi)者拒絕&否認(rèn)
- 拒絕消息
@RabbitListener(queues = RabbitMQConfig.QUEUE) public void listener(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到消息" + msg); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false) }
- 否認(rèn)消息
@RabbitListener(queues = RabbitMQConfig.QUEUE) public void listener(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到消息" + msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); }
方式二、超過消息TTL 發(fā)送消息時(shí)設(shè)置TTL
@SpringBootTest public class Publisher { @Autowired private RabbitTemplate template; /** * 5秒未被消費(fèi)會(huì)路由到死信隊(duì)列 */ @Test public void publish_expir() { template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> { message.getMessageProperties().setExpiration("5000"); return message; }); } }
- 設(shè)置隊(duì)列所有消息的TTL
更新RabbitMQConfig類中bootQueue() ,更新后需要?jiǎng)h除隊(duì)列,因?yàn)殛?duì)列屬性有更改。
@Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //聲明隊(duì)列屬性有更改時(shí)需要?jiǎng)h除隊(duì)列 //給隊(duì)列設(shè)置消息時(shí)長(zhǎng) .ttl(10000) .build(); }
方式三、超過隊(duì)列長(zhǎng)度限制
設(shè)置隊(duì)列長(zhǎng)度限制,當(dāng)隊(duì)列長(zhǎng)度超過設(shè)置的閾值,消息便會(huì)路由到死信隊(duì)列。
@Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //聲明隊(duì)列屬性有更改時(shí)需要?jiǎng)h除隊(duì)列 .maxLength(1) .build(); }
代碼倉(cāng)庫(kù) 點(diǎn)我
到此這篇關(guān)于關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列的文章就介紹到這了,更多相關(guān)RabbitMQ實(shí)現(xiàn)死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)
- Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
- RabbitMQ之死信隊(duì)列深入解析
- springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
Springboot集成ClickHouse及應(yīng)用場(chǎng)景分析
這篇文章主要介紹了Springboot集成ClickHouse的實(shí)例代碼,本文通過應(yīng)用場(chǎng)景實(shí)例代碼介紹了整合springboot的詳細(xì)過程,感興趣的朋友跟隨小編一起看看吧2022-02-02Mybatis的mapper標(biāo)簽 namespace屬性用法說明
這篇文章主要介紹了Mybatis的mapper標(biāo)簽 namespace屬性用法說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09Java?PTA?計(jì)算3到7位?水仙花數(shù)實(shí)例
這篇文章主要介紹了Java?PTA?計(jì)算3到7位?水仙花數(shù)實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03如何解決redis的NOAUTH Authentication required異常
這篇文章主要介紹了Jedis異常解決:NOAUTH Authentication required,,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值2019-07-07詳解springboot項(xiàng)目帶Tomcat和不帶Tomcat的兩種打包方式
這篇文章主要介紹了詳解springboot項(xiàng)目帶Tomcat和不帶Tomcat的兩種打包方式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09Springboot MultipartFile文件上傳與下載的實(shí)現(xiàn)示例
在Spring Boot項(xiàng)目中,可以使用MultipartFile類來處理文件上傳和下載操作,本文就詳細(xì)介紹了如何使用,具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08SpringCloud配置服務(wù)端的ConfigServer設(shè)置安全認(rèn)證
這篇文章主要為大家介紹了SpringCloud配置服務(wù)端的ConfigServer設(shè)置安全認(rèn)證,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08