欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ消息有效期與死信的處理過程

 更新時(shí)間:2022年03月01日 10:30:08   作者:lss0555  
利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信?(dead?message)?之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX,本文重點(diǎn)給大家介紹RabbitMQ消息有效期與死信的相關(guān)知識,感興趣的朋友跟隨小編一起看看吧

一.前言

RabbitMQ的TTL全稱為Time-To-Live,表示的是消息的有效期。消息如果在隊(duì)列中一直沒有被消費(fèi)并且存在時(shí)間超過了TTL,消息就會(huì)變成了"死信" (Dead Message),后續(xù)無法再被消費(fèi)了。如果不設(shè)置TTL,則表示此消息永久有效(默認(rèn)消息是不會(huì)失效的)。如果將TTL設(shè)為0,則表示如果消息不能被立馬消費(fèi)則會(huì)被立即丟掉,這個(gè)特性可以部分替代RabbitMQ3.0以前支持的immediate參數(shù),之所以所部分代替,是應(yīng)為immediate參數(shù)在投遞失敗會(huì)有basic.return方法將消息體返回(這個(gè)功能可以利用死信隊(duì)列來實(shí)現(xiàn))。

設(shè)置TTL有兩種方式:

  • 隊(duì)列有效期:是聲明隊(duì)列的時(shí)候,在隊(duì)列的屬性中設(shè)置,這樣該隊(duì)列中的消息都會(huì)有相同的有效期
  • 消息有效期:發(fā)送消息時(shí)給消息設(shè)置屬性,可以為每條消息都設(shè)置不同的TTL

如果兩種方式都設(shè)置了,則以設(shè)置的較小的為準(zhǔn)。

  • 區(qū)別:如果聲明隊(duì)列時(shí)設(shè)置了有效期,則消息過期了就會(huì)被刪掉;如果是發(fā)消息時(shí)設(shè)置的有效期,消息過期了也不會(huì)被立馬刪掉,因?yàn)檫@時(shí)消息是否過期是在要投遞給消費(fèi)者時(shí)判斷的。

二.設(shè)置消息有效期

1.設(shè)置隊(duì)列的有效期TTL

定義隊(duì)列的方法如下:

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

該方法的arguments參數(shù)可以設(shè)置隊(duì)列的屬性,屬性名為x-message-ttl,單位為毫秒。后臺添加的話如下:

在這里插入圖片描述

代碼中設(shè)置如下:

Map<String, Object> arguments= new HashMap<String , Object>();
arguments.put("x-message-ttl " , 10000);//10秒鐘  單位為毫秒
channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

命令行模式來設(shè)置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":100000}' --apply-to queues

通過HTTP接口調(diào)用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 100000}}' 
http://ip:15672/api/queues/{vhost}/{queuename}

2.設(shè)置隊(duì)列的有效期Expire

有效期Expire可以讓隊(duì)列在指定時(shí)間內(nèi) “未被使用” 的話會(huì)自動(dòng)過期刪除,未使用的意思是 queue 上沒有任何 consumer,queue 沒有被重新聲明,并且在過期時(shí)間段內(nèi)未調(diào)用過 basic.get 命令。該方式可用于,例如,RPC-style 的回復(fù) queue, 其中許多queue 會(huì)被創(chuàng)建出來,但是卻從未被使用。

服務(wù)器會(huì)確保在過期時(shí)間到達(dá)后 queue 被刪除,但是不保證刪除的動(dòng)作有多么的及時(shí)。在服務(wù)器重啟后,持久化的queue 的超時(shí)時(shí)間將重新計(jì)算。 x-expires 參數(shù)值以毫秒為單位,并且服從和 x-message-ttl 一樣的約束條件,且不能設(shè)置為 0 。所以,如果該參數(shù)設(shè)置為 10000 ,則表示該 queue 如果在 10s之內(nèi)未被使用則會(huì)被刪除。

代碼如下:

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 10000); 
channel.queueDeclare("queue", false, false, false, args);  

3.通過發(fā)送消息時(shí)設(shè)置有效期

發(fā)送消息的方法如下:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

在該方法的props參數(shù)可以設(shè)置其有效期:

       Map<String, Object> headers = new HashMap<String, Object>();
                        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                                .deliveryMode(2) // 消息持久
                                .contentEncoding("UTF-8") // 編碼方式
                                .contentType("text/plain")
                                .expiration("100000")
                                .headers(headers)
                                .build();
      channel.basicPublish("", queueName, properties, message.getBytes());

通過HTTPAPI 接口設(shè)置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d
'{"properties":{"expiration":"100000"},"routing_key":"routingkey","payload":"bodys","payload_encoding":"string"}'  
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

三.死信交換機(jī)DLX

介紹

  • 死信隊(duì)列:DLX,dead-letter-exchange
  • 利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信 (dead message) 之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX

消息變成死信幾種情況

  • 消息被拒絕(Basic.Reject/Basic.Nack) ,井且設(shè)置requeue 參數(shù)為false
  • 消息過期
  • 隊(duì)列達(dá)到最大長度

死信處理過程

  • DLX也是一個(gè)正常的Exchange,和一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。
  • 當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列。
  • 可以監(jiān)聽這個(gè)隊(duì)列中的消息做相應(yīng)的處理。

用途

通過監(jiān)控消費(fèi)死信隊(duì)列中消息,來觀察和分析數(shù)據(jù)。
結(jié)合TTL實(shí)現(xiàn)延遲隊(duì)列(比如下單超過多長時(shí)間自動(dòng)關(guān)閉)

使用

代碼如下:

channel.exchangeDeclare("dlx_exchange" , "direct"); //創(chuàng)建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , "dlx_exchange ");//設(shè)置死信交換機(jī)
args.put("x-dead-letter-routing-key" , "dlx-routing-key");//設(shè)置DLX的路由鍵(可以不設(shè)置)
channel.queueDeclare("myqueue" , false , false , false , args);

實(shí)例

public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//聲明一個(gè)交換機(jī),做死信交換機(jī)用
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		//聲明一個(gè)隊(duì)列,做死信隊(duì)列用
		channel.queueDeclare("dlx_queue", true, false, false, null);
		//隊(duì)列綁定到交換機(jī)上
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
		
		channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
		Map<String, Object> arguments=new HashMap<String, Object>();
		arguments.put("x-message-ttl" , 1000);//設(shè)置消息有效期1秒,過期后變成私信消息,然后進(jìn)入DLX
		arguments.put("x-dead-letter-exchange" , "dlx_exchange");//設(shè)置DLX
		arguments.put("x-dead-letter-routing-key" , "dlx.test");//設(shè)置DLX的路由鍵
		//為隊(duì)列normal_queue 添加DLX
		channel.queueDeclare("normal_queue", true, false, false, arguments);
		channel.queueBind("normal_queue", "normal_exchange", "");
		channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("測試死信消息").getBytes());
		System.out.println("發(fā)送消息時(shí)間:"+ConnectionUtil.formatDate(new Date()));
		channel.close();
		connection.close();
	}

說明:

申明死信隊(duì)列dlx_queue的綁定如下,與死信交換機(jī)dlx_exchange(topic類型)進(jìn)行綁定,routing key為"dlx.*"
申明隊(duì)列normal_queue,與交換機(jī)normal_exchange(fanout類型)進(jìn)行綁定

執(zhí)行流程:

  • 消息發(fā)送到交換機(jī)normal_exchange,然后路由到隊(duì)列normal_queue上
  • 因?yàn)殛?duì)列normal_queue沒有消費(fèi)者,消息過期后成為死信消息
  • 死信消息攜帶設(shè)置的x-dead-letter-routing-key=dlx.test進(jìn)入到死信交換機(jī)dlx_exechage
  • dlx_exechage與dlx_queue綁定的routing key為"dlx.*",死信消息的路由鍵dlx.test符合該規(guī)則被路由到dlx.queue上面。

參考:

https://www.jianshu.com/p/986ee5eb78bc

https://blog.csdn.net/u012988901/article/details/88958654

到此這篇關(guān)于RabbitMQ之消息有效期與死信的文章就介紹到這了,更多相關(guān)RabbitMQ消息有效期內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java8 CompletableFuture 異步執(zhí)行操作

    Java8 CompletableFuture 異步執(zhí)行操作

    CompletableFuture是java8提供的基于異步操作的封裝,日常開發(fā)中經(jīng)常會(huì)用到,接下來通過本文給大家介紹Java8 CompletableFuture 異步執(zhí)行操作,感興趣的朋友一起看看吧
    2021-06-06
  • 如何查看java進(jìn)程內(nèi)存占用情況

    如何查看java進(jìn)程內(nèi)存占用情況

    這篇文章主要介紹了如何查看java進(jìn)程內(nèi)存占用情況問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Springboot中的@Conditional注解詳解

    Springboot中的@Conditional注解詳解

    這篇文章主要介紹了Springboot中的@Conditional注解詳解,@Conditional來源于spring-context包下的一個(gè)注解,Conditional中文是條件的意思,@Conditional注解它的作用是按照一定的條件進(jìn)行判斷,滿足條件給容器注冊bean,需要的朋友可以參考下
    2023-09-09
  • 深入解析反編譯字節(jié)碼文件中的代碼邏輯JVM中的String操作

    深入解析反編譯字節(jié)碼文件中的代碼邏輯JVM中的String操作

    這篇文章主要介紹了深入解析反編譯字節(jié)碼文件中的代碼邏輯JVM中的String操作,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • 詳解Java中的防抖和節(jié)流

    詳解Java中的防抖和節(jié)流

    防抖是將多次執(zhí)行變?yōu)橹付〞r(shí)間內(nèi)不在觸發(fā)之后,執(zhí)行一次。節(jié)流是將多次執(zhí)行變?yōu)橹付〞r(shí)間不論觸發(fā)多少次,時(shí)間一到就執(zhí)行一次。這篇文章來和大家聊聊Java中的防抖和節(jié)流,感興趣的可以了解一下
    2022-08-08
  • Java中SynchronousQueue的底層實(shí)現(xiàn)原理剖析

    Java中SynchronousQueue的底層實(shí)現(xiàn)原理剖析

    BlockingQueue的實(shí)現(xiàn)類中,有一種阻塞隊(duì)列比較特殊,就是SynchronousQueue(同步移交隊(duì)列),隊(duì)列長度為0。本文就來剖析一下SynchronousQueue的底層實(shí)現(xiàn)原理,感興趣的可以了解一下
    2022-11-11
  • SpringMVC框架post提交數(shù)據(jù)庫出現(xiàn)亂碼解決方案

    SpringMVC框架post提交數(shù)據(jù)庫出現(xiàn)亂碼解決方案

    這篇文章主要介紹了SpringMVC框架post提交數(shù)據(jù)庫出現(xiàn)亂碼解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-09-09
  • Gradle build 報(bào)錯(cuò):Received status code 400 from server

    Gradle build 報(bào)錯(cuò):Received status code 400 from server

    這篇文章主要介紹了Gradle build 報(bào)錯(cuò):Received status code 400 from server,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07
  • Java程序員編程性能優(yōu)化必備的34個(gè)小技巧(總結(jié))

    Java程序員編程性能優(yōu)化必備的34個(gè)小技巧(總結(jié))

    這篇文章主要介紹了Java程序員編程性能優(yōu)化必備的34個(gè)小技巧(總結(jié)),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2019-07-07
  • Springcloud sentinel安裝和使用方法解析

    Springcloud sentinel安裝和使用方法解析

    這篇文章主要介紹了Springcloud sentinel安裝和使用方法解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-12-12

最新評論