RabbitMQ消息有效期與死信的處理過程
一.前言
RabbitMQ的TTL全稱為Time-To-Live,表示的是消息的有效期。消息如果在隊(duì)列中一直沒有被消費(fèi)并且存在時(shí)間超過了TTL,消息就會(huì)變成了"死信" (Dead Message),后續(xù)無法再被消費(fèi)了。如果不設(shè)置TTL,則表示此消息永久有效(默認(rèn)消息是不會(huì)失效的)。如果將TTL設(shè)為0,則表示如果消息不能被立馬消費(fèi)則會(huì)被立即丟掉,這個(gè)特性可以部分替代RabbitMQ3.0以前支持的immediate參數(shù),之所以所部分代替,是應(yīng)為immediate參數(shù)在投遞失敗會(huì)有basic.return方法將消息體返回(這個(gè)功能可以利用死信隊(duì)列來實(shí)現(xiàn))。
設(shè)置TTL有兩種方式:
- 隊(duì)列有效期:是聲明隊(duì)列的時(shí)候,在隊(duì)列的屬性中設(shè)置,這樣該隊(duì)列中的消息都會(huì)有相同的有效期
- 消息有效期:發(fā)送消息時(shí)給消息設(shè)置屬性,可以為每條消息都設(shè)置不同的TTL
如果兩種方式都設(shè)置了,則以設(shè)置的較小的為準(zhǔn)。
- 區(qū)別:如果聲明隊(duì)列時(shí)設(shè)置了有效期,則消息過期了就會(huì)被刪掉;如果是發(fā)消息時(shí)設(shè)置的有效期,消息過期了也不會(huì)被立馬刪掉,因?yàn)檫@時(shí)消息是否過期是在要投遞給消費(fèi)者時(shí)判斷的。
二.設(shè)置消息有效期
1.設(shè)置隊(duì)列的有效期TTL
定義隊(duì)列的方法如下:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
該方法的arguments參數(shù)可以設(shè)置隊(duì)列的屬性,屬性名為x-message-ttl,單位為毫秒。后臺添加的話如下:
代碼中設(shè)置如下:
Map<String, Object> arguments= new HashMap<String , Object>(); arguments.put("x-message-ttl " , 10000);//10秒鐘 單位為毫秒 channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;
命令行模式來設(shè)置:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":100000}' --apply-to queues
通過HTTP接口調(diào)用:
$ curl -i -u guest:guest -H "content-type:application/json" -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 100000}}' http://ip:15672/api/queues/{vhost}/{queuename}
2.設(shè)置隊(duì)列的有效期Expire
有效期Expire可以讓隊(duì)列在指定時(shí)間內(nèi) “未被使用” 的話會(huì)自動(dòng)過期刪除,未使用的意思是 queue 上沒有任何 consumer,queue 沒有被重新聲明,并且在過期時(shí)間段內(nèi)未調(diào)用過 basic.get 命令。該方式可用于,例如,RPC-style 的回復(fù) queue, 其中許多queue 會(huì)被創(chuàng)建出來,但是卻從未被使用。
服務(wù)器會(huì)確保在過期時(shí)間到達(dá)后 queue 被刪除,但是不保證刪除的動(dòng)作有多么的及時(shí)。在服務(wù)器重啟后,持久化的queue 的超時(shí)時(shí)間將重新計(jì)算。 x-expires 參數(shù)值以毫秒為單位,并且服從和 x-message-ttl 一樣的約束條件,且不能設(shè)置為 0 。所以,如果該參數(shù)設(shè)置為 10000 ,則表示該 queue 如果在 10s之內(nèi)未被使用則會(huì)被刪除。
代碼如下:
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-expires", 10000); channel.queueDeclare("queue", false, false, false, args);
3.通過發(fā)送消息時(shí)設(shè)置有效期
發(fā)送消息的方法如下:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
在該方法的props參數(shù)可以設(shè)置其有效期:
Map<String, Object> headers = new HashMap<String, Object>(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) // 消息持久 .contentEncoding("UTF-8") // 編碼方式 .contentType("text/plain") .expiration("100000") .headers(headers) .build(); channel.basicPublish("", queueName, properties, message.getBytes());
通過HTTPAPI 接口設(shè)置:
$ curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"properties":{"expiration":"100000"},"routing_key":"routingkey","payload":"bodys","payload_encoding":"string"}' http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish
三.死信交換機(jī)DLX
介紹
- 死信隊(duì)列:DLX,dead-letter-exchange
- 利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信 (dead message) 之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX
消息變成死信幾種情況
- 消息被拒絕(Basic.Reject/Basic.Nack) ,井且設(shè)置requeue 參數(shù)為false
- 消息過期
- 隊(duì)列達(dá)到最大長度
死信處理過程
- DLX也是一個(gè)正常的Exchange,和一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。
- 當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列。
- 可以監(jiān)聽這個(gè)隊(duì)列中的消息做相應(yīng)的處理。
用途
通過監(jiān)控消費(fèi)死信隊(duì)列中消息,來觀察和分析數(shù)據(jù)。
結(jié)合TTL實(shí)現(xiàn)延遲隊(duì)列(比如下單超過多長時(shí)間自動(dòng)關(guān)閉)
使用
代碼如下:
channel.exchangeDeclare("dlx_exchange" , "direct"); //創(chuàng)建DLX: dlx_exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange" , "dlx_exchange ");//設(shè)置死信交換機(jī) args.put("x-dead-letter-routing-key" , "dlx-routing-key");//設(shè)置DLX的路由鍵(可以不設(shè)置) channel.queueDeclare("myqueue" , false , false , false , args);
實(shí)例
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明一個(gè)交換機(jī),做死信交換機(jī)用 channel.exchangeDeclare("dlx_exchange", "topic", true, false, null); //聲明一個(gè)隊(duì)列,做死信隊(duì)列用 channel.queueDeclare("dlx_queue", true, false, false, null); //隊(duì)列綁定到交換機(jī)上 channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*"); channel.exchangeDeclare("normal_exchange", "fanout", true, false, null); Map<String, Object> arguments=new HashMap<String, Object>(); arguments.put("x-message-ttl" , 1000);//設(shè)置消息有效期1秒,過期后變成私信消息,然后進(jìn)入DLX arguments.put("x-dead-letter-exchange" , "dlx_exchange");//設(shè)置DLX arguments.put("x-dead-letter-routing-key" , "dlx.test");//設(shè)置DLX的路由鍵 //為隊(duì)列normal_queue 添加DLX channel.queueDeclare("normal_queue", true, false, false, arguments); channel.queueBind("normal_queue", "normal_exchange", ""); channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("測試死信消息").getBytes()); System.out.println("發(fā)送消息時(shí)間:"+ConnectionUtil.formatDate(new Date())); channel.close(); connection.close(); }
說明:
申明死信隊(duì)列dlx_queue的綁定如下,與死信交換機(jī)dlx_exchange(topic類型)進(jìn)行綁定,routing key為"dlx.*"
申明隊(duì)列normal_queue,與交換機(jī)normal_exchange(fanout類型)進(jìn)行綁定
執(zhí)行流程:
- 消息發(fā)送到交換機(jī)normal_exchange,然后路由到隊(duì)列normal_queue上
- 因?yàn)殛?duì)列normal_queue沒有消費(fèi)者,消息過期后成為死信消息
- 死信消息攜帶設(shè)置的x-dead-letter-routing-key=dlx.test進(jìn)入到死信交換機(jī)dlx_exechage
- dlx_exechage與dlx_queue綁定的routing key為"dlx.*",死信消息的路由鍵dlx.test符合該規(guī)則被路由到dlx.queue上面。
參考:
https://www.jianshu.com/p/986ee5eb78bc
https://blog.csdn.net/u012988901/article/details/88958654
到此這篇關(guān)于RabbitMQ之消息有效期與死信的文章就介紹到這了,更多相關(guān)RabbitMQ消息有效期內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java8 CompletableFuture 異步執(zhí)行操作
CompletableFuture是java8提供的基于異步操作的封裝,日常開發(fā)中經(jīng)常會(huì)用到,接下來通過本文給大家介紹Java8 CompletableFuture 異步執(zhí)行操作,感興趣的朋友一起看看吧2021-06-06深入解析反編譯字節(jié)碼文件中的代碼邏輯JVM中的String操作
這篇文章主要介紹了深入解析反編譯字節(jié)碼文件中的代碼邏輯JVM中的String操作,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10Java中SynchronousQueue的底層實(shí)現(xiàn)原理剖析
BlockingQueue的實(shí)現(xiàn)類中,有一種阻塞隊(duì)列比較特殊,就是SynchronousQueue(同步移交隊(duì)列),隊(duì)列長度為0。本文就來剖析一下SynchronousQueue的底層實(shí)現(xiàn)原理,感興趣的可以了解一下2022-11-11SpringMVC框架post提交數(shù)據(jù)庫出現(xiàn)亂碼解決方案
這篇文章主要介紹了SpringMVC框架post提交數(shù)據(jù)庫出現(xiàn)亂碼解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09Gradle build 報(bào)錯(cuò):Received status code 400 from server
這篇文章主要介紹了Gradle build 報(bào)錯(cuò):Received status code 400 from server,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07Java程序員編程性能優(yōu)化必備的34個(gè)小技巧(總結(jié))
這篇文章主要介紹了Java程序員編程性能優(yōu)化必備的34個(gè)小技巧(總結(jié)),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-07-07