springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
1. 死信隊(duì)列
死信隊(duì)列是一種特殊的消息隊(duì)列,用來(lái)存儲(chǔ)無(wú)法被正常消費(fèi)的消息,常被用來(lái)實(shí)現(xiàn)延遲處理,異常消息處理等,提高了系統(tǒng)的可伸縮性和容錯(cuò)性,能夠應(yīng)對(duì)高并發(fā)和異常消息。
死信隊(duì)列中的消息被稱為死信消息,用來(lái)分發(fā)死信消息的交換機(jī)被稱為死信交換機(jī)(Dead Letter Exchange,DLX)。
死信隊(duì)列在實(shí)際項(xiàng)目中的應(yīng)用場(chǎng)景有很多如:
- 訂單超時(shí)未支付,將此消息放入死信隊(duì)列中,等待后續(xù)處理(延遲等待)
- 消息消費(fèi)失敗將消息放入死信隊(duì)列中進(jìn)行重試(消息重試機(jī)制)
2.正常消息成為死信消息的條件
- 消息到了過(guò)期時(shí)間仍然未被消費(fèi)者消費(fèi)
- 隊(duì)列已滿無(wú)法保存新消息
- 消息被拒絕消費(fèi)且未設(shè)置重新放入隊(duì)列
3.消費(fèi)者1
package com.hong.rabbitmq9; import com.hong.utils.RabbitMQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /** * @Description: 死信隊(duì)列消費(fèi)者1 * @Author: hong * @Date: 2024-01-17 21:04 * @Version: 1.0 **/ public class Consumer1 { //正常交換機(jī)名稱 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交換機(jī)名稱 public static final String DEAD_EXCHANGE = "dead_exchange"; //正常隊(duì)列名稱 public static final String NORMAL_QUEUE = "normal_queue"; //死信隊(duì)列名稱 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtil.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,false,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); //正常隊(duì)列綁定死信隊(duì)列信息 Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",DEAD_EXCHANGE); map.put("x-dead-letter-routing-key", "lisi"); //聲明正常隊(duì)列 channel.queueDeclare(NORMAL_QUEUE,false,false,false,map); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); System.out.println("Consumer1等待接收消息:"); DeliverCallback deliverCallback = (comsumerTag, message) -> { System.out.println( "routingKey:" + message.getEnvelope().getRoutingKey() + ",消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; CancelCallback cancelCallback = var -> { }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback); } }
4.生產(chǎn)者
package com.hong.rabbitmq9; import com.hong.utils.RabbitMQUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; /** * @Description: 死信隊(duì)列消息生產(chǎn)者 * @Author: hong * @Date: 2024-01-17 20:49 * @Version: 1.0 **/ public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtil.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //設(shè)置消息的 TTL 時(shí)間 10s AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); //該信息是用作演示隊(duì)列個(gè)數(shù)限制 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); System.out.println("生產(chǎn)者發(fā)送消息:" + message); } } }
啟動(dòng)消費(fèi)者1后馬上關(guān)閉消費(fèi)者1,模擬消費(fèi)者1接收不到消息,再啟動(dòng)生產(chǎn)者
生產(chǎn)者發(fā)送10條正常隊(duì)列中有10條消息
10s后正常隊(duì)列中的消息由于沒(méi)有消費(fèi)者消費(fèi)進(jìn)入死信隊(duì)列中
5.消費(fèi)者2
package com.hong.rabbitmq9; import com.hong.utils.RabbitMQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /** * @Description: 死信隊(duì)列-死信消費(fèi)者 * @Author: hong * @Date: 2024-01-17 21:31 * @Version: 1.0 **/ public class Consumer2 { public static final String DEAD_EXCHANGE = "dead_exchange"; public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtil.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,false,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); System.out.println("Consumer2等待接收死信消息:"); DeliverCallback deliverCallback = (comsumerTag, message) -> { System.out.println( "routingKey:" + message.getEnvelope().getRoutingKey() + ",消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; CancelCallback cancelCallback = var -> { }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback); } }
啟動(dòng)消費(fèi)者2,使其消費(fèi)死信隊(duì)列中的消息
6.隊(duì)列達(dá)到最大長(zhǎng)度
以上代碼是TTL,隊(duì)列達(dá)到最大長(zhǎng)度只要將上述代碼稍微改動(dòng)一下即可
6.1.注釋掉生產(chǎn)者代碼中的ttl部分
package com.hong.rabbitmq9; import com.hong.utils.RabbitMQUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; /** * @Description: 死信隊(duì)列消息生產(chǎn)者 * @Author: hong * @Date: 2024-01-17 20:49 * @Version: 1.0 **/ public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtil.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //設(shè)置消息的 TTL 時(shí)間 10s // AMQP.BasicProperties properties = new AMQP.BasicProperties() // .builder().expiration("10000").build(); //該信息是用作演示隊(duì)列個(gè)數(shù)限制 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println("生產(chǎn)者發(fā)送消息:" + message); } } }
6.2.消費(fèi)者1代碼中加最大長(zhǎng)度
package com.hong.rabbitmq9; import com.hong.utils.RabbitMQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /** * @Description: 死信隊(duì)列消費(fèi)者1 * @Author: hong * @Date: 2024-01-17 21:04 * @Version: 1.0 **/ public class Consumer1 { //正常交換機(jī)名稱 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交換機(jī)名稱 public static final String DEAD_EXCHANGE = "dead_exchange"; //正常隊(duì)列名稱 public static final String NORMAL_QUEUE = "normal_queue"; //死信隊(duì)列名稱 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtil.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,false,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); //正常隊(duì)列綁定死信隊(duì)列信息 Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",DEAD_EXCHANGE); map.put("x-dead-letter-routing-key", "lisi"); map.put("x-max-length",8); //聲明正常隊(duì)列 channel.queueDeclare(NORMAL_QUEUE,false,false,false,map); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); System.out.println("Consumer1等待接收消息:"); DeliverCallback deliverCallback = (comsumerTag, message) -> { System.out.println( "routingKey:" + message.getEnvelope().getRoutingKey() + ",消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; CancelCallback cancelCallback = var -> { }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback); } }
啟動(dòng)消費(fèi)者1后立馬關(guān)閉,模擬隊(duì)列已滿
到此這篇關(guān)于springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)springboot RabbitMQ死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
- 關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)
- Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
- RabbitMQ之死信隊(duì)列深入解析
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
Spring Cloud基于zuul實(shí)現(xiàn)網(wǎng)關(guān)過(guò)程解析
這篇文章主要介紹了Spring Cloud基于zuul實(shí)現(xiàn)網(wǎng)關(guān)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12SpringBoot 使用 @Value 注解讀取配置文件給靜態(tài)變量賦值
這篇文章主要介紹了SpringBoot 使用 @Value 注解讀取配置文件給靜態(tài)變量賦值,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11如何對(duì)Mysql數(shù)據(jù)表查詢出來(lái)的結(jié)果進(jìn)行排序
這篇文章主要介紹了如何對(duì)Mysql數(shù)據(jù)表查詢出來(lái)的結(jié)果進(jìn)行排序問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08REST架構(gòu)及RESTful應(yīng)用程序簡(jiǎn)介
這篇文章主要為大家介紹了REST架構(gòu)及RESTful的應(yīng)用程序簡(jiǎn)介,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03快速解決List集合add元素,添加多個(gè)對(duì)象出現(xiàn)重復(fù)的問(wèn)題
這篇文章主要介紹了快速解決List集合add元素,添加多個(gè)對(duì)象出現(xiàn)重復(fù)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-08-08