RabbitMQ之死信隊(duì)列深入解析
1. 死信的概念
死信,顧名思義就是無(wú)法被消費(fèi)的消息,字面意思可以這樣理解,一般來(lái)說(shuō),producer將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取消息進(jìn)行消費(fèi),但某些時(shí)候由于特定的原因 queue 中的某些消息無(wú)法被消費(fèi),這樣的消息如果沒(méi)有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列。
應(yīng)用場(chǎng)景:為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時(shí),將消息投入死信隊(duì)列中。還有比如說(shuō):用戶在商場(chǎng)下單成功并點(diǎn)擊支付后在指定時(shí)間未支付時(shí)自動(dòng)失效。
2. 死信的來(lái)源
- 消息TLL過(guò)期
- 隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了,無(wú)法再添加數(shù)據(jù)到 mq 中)
- 消息被拒絕(basic.reject 或 basic.nack )并且 requeue = false
3. 死信實(shí)戰(zhàn)
3.1 代碼架構(gòu)圖
3.2 消息 TTL 過(guò)期
生產(chǎn)者代碼
public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); // 設(shè)置消息的TTL時(shí)間 new AMQP.BasicProperties().builder().expiration("10000").build(); // 該消息是用作演示隊(duì)列個(gè)數(shù)限制 for (int i = 0; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println("生產(chǎn)者發(fā)送消息:" + message); } } } }
消費(fèi)者 C1 代碼( 啟動(dòng)之后關(guān)閉消費(fèi)者 模擬其接收不到消息 )
public class Consumer01 { // 普通交換機(jī)名稱 private static final String NORMAL_EXCHANGE = "normal_exchange"; // 死信交換機(jī)名稱 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 聲明死信和普通交換機(jī) 類(lèi)型為direct channel.exchangeDeclare(NORMAL_EXCHANGE, "direct"); channel.exchangeDeclare(DEAD_EXCHANGE, "direct"); // 聲明死信隊(duì)列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); // 死信隊(duì)列綁定死信交換機(jī) routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); // 正常隊(duì)列綁定死信隊(duì)列信息 Map<String, Object> params = new HashMap<>(); // 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù)key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常隊(duì)列設(shè)置死信 routing-key 參數(shù)是固定值 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, null); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接受消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01 接受消息" + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
生產(chǎn)者未發(fā)送消息
生產(chǎn)者發(fā)送了10條消息,此時(shí)正常消息隊(duì)列有10條未消費(fèi)信息
時(shí)間過(guò)去10秒,正常隊(duì)列里面的消息由于沒(méi)有被消費(fèi),消息進(jìn)入死信隊(duì)列
消費(fèi)者 C2 代碼(以上步驟完成后 啟動(dòng) C2 消費(fèi)者 它消費(fèi)死信隊(duì)列里面的消息)
public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, "direct"); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接受死信隊(duì)列消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer02 接受死信隊(duì)列的消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } }
3.3 隊(duì)列達(dá)到最大長(zhǎng)度
消費(fèi)生產(chǎn)者代碼去掉 TTL 屬性
public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); // 該消息是用作演示隊(duì)列個(gè)數(shù)限制 for (int i = 0; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println("生產(chǎn)者發(fā)送消息:" + message); } } } }
C1 消費(fèi)者修改以下代碼(啟動(dòng)之后關(guān)閉該消費(fèi)者 模擬其接收不到消息)
// 正常隊(duì)列綁定死信隊(duì)列信息 Map<String, Object> params = new HashMap<>(); // 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù)key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常隊(duì)列設(shè)置死信 routing-key 參數(shù)是固定值 params.put("x-dead-letter-routing-key", "lisi"); // 設(shè)置正常隊(duì)列長(zhǎng)度的限制 params.put("x-max-length", 6);// 添加該代碼
注意此時(shí)需要把原先隊(duì)列刪除,因?yàn)閰?shù)改變了
C2 消費(fèi)者代碼不變(啟動(dòng) C2 消費(fèi)者)
3.4 消息被拒
消息生產(chǎn)者代碼同上生產(chǎn)者一致
C1 消費(fèi)者代碼(啟動(dòng)之后關(guān)閉該消費(fèi)者,模擬接收不到消息)
public class Consumer01 { // 普通交換機(jī)名稱 private static final String NORMAL_EXCHANGE = "normal_exchange"; // 死信交換機(jī)名稱 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 聲明死信和普通交換機(jī) 類(lèi)型為direct channel.exchangeDeclare(NORMAL_EXCHANGE, "direct"); channel.exchangeDeclare(DEAD_EXCHANGE, "direct"); // 聲明死信隊(duì)列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); // 死信隊(duì)列綁定死信交換機(jī) routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); // 正常隊(duì)列綁定死信隊(duì)列信息 Map<String, Object> params = new HashMap<>(); // 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù)key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常隊(duì)列設(shè)置死信 routing-key 參數(shù)是固定值 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, null); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接受消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); if (message.equals("info5")) { System.out.println("Consumer01 接收到消息" + message + "并拒絕簽收該消息"); // requeue 設(shè)置為false 代表拒絕重新入隊(duì) 該隊(duì)列如果配置了死信交換機(jī)將發(fā)送到死信隊(duì)列中 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); } else { System.out.println("Consumer01 接受消息" + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> { }); } }
生產(chǎn)者發(fā)送消息之后
C2 消費(fèi)者代碼不變
啟動(dòng)消費(fèi)者1 然后再啟動(dòng)消費(fèi)者2
到此這篇關(guān)于RabbitMQ之死信隊(duì)列深入解析的文章就介紹到這了,更多相關(guān)RabbitMQ死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
- 關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)
- Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
- springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
SpringBoot2.3新特性優(yōu)雅停機(jī)詳解
這篇文章主要介紹了SpringBoot2.3新特性優(yōu)雅停機(jī)詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05如何通過(guò)Kaptcha在Web頁(yè)面生成驗(yàn)證碼
這篇文章主要介紹了如何通過(guò)Kaptcha在Web頁(yè)面生成驗(yàn)證碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10SpringBoot中的maven插件spring-boot-maven-plugin使用
這篇文章主要介紹了SpringBoot中的maven插件spring-boot-maven-plugin使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12Mybatis把返回結(jié)果封裝成map類(lèi)型的實(shí)現(xiàn)
本文主要介紹了Mybatis把返回結(jié)果封裝成map類(lèi)型的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03