關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)
什么是延遲隊(duì)列
我們常說的延遲隊(duì)列是指消息進(jìn)入隊(duì)列后不會(huì)被立即消費(fèi),只有達(dá)到指定時(shí)間后才能被消費(fèi)。
但RabbitMq中并沒有提供延遲隊(duì)列功能。
那么RabbitMQ如何實(shí)現(xiàn)延遲隊(duì)列
通過:死信隊(duì)列 + RabbitMQ的TTL特性實(shí)現(xiàn)。
實(shí)現(xiàn)原理
給一個(gè)普通帶有過期功能的隊(duì)列綁定一個(gè)死信隊(duì)列,消息先進(jìn)延時(shí)隊(duì)列,過期了后消息進(jìn)入死信隊(duì)列,死信隊(duì)列的消息會(huì)轉(zhuǎn)發(fā)到對應(yīng)的queue里面,我們只需要消費(fèi)死信的queue里面的消息就可以了。
一、TTL特性說明
TTL就是消息或者隊(duì)列的過期功能。
當(dāng)消息過期就會(huì)進(jìn)到死信隊(duì)列,死信隊(duì)列和普通隊(duì)列沒啥區(qū)別,然后我們只需要配置一個(gè)消費(fèi)者來消費(fèi)死信隊(duì)列里面的消息就可以了。
如果不設(shè)置x-message-ttl,則表示消息不會(huì)過期如果x-message-ttl設(shè)置為0,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者,否則該消息將會(huì)被丟棄。
如果x-message-ttl設(shè)置不為0,表明消息或隊(duì)列中所有消息的最大存活時(shí)間(過期時(shí)間),單位是毫秒。
需要注意: RabbitMQ只會(huì)對隊(duì)列頭部的消息進(jìn)行過期淘汰,消息是否過期是在即將投遞消息到消費(fèi)者之前判定的,如果隊(duì)列出現(xiàn)消息堆積情況,則已過期的消息還是會(huì)繼續(xù)存活的
比如過期時(shí)間設(shè)置在消息上,由于消息進(jìn)隊(duì)列是先進(jìn)先出,假設(shè)先進(jìn)去的消息過期時(shí)間長,后進(jìn)的消息過期時(shí)間短,先進(jìn)來的消息還沒過期,后面進(jìn)來的消息就算到了過期時(shí)間,消息也不會(huì)過期也就不會(huì)從隊(duì)列中剔除,從而導(dǎo)致隊(duì)列里面的消息積壓。
二、死信隊(duì)列
簡單理解就是要被丟棄的消息才會(huì)放到死信隊(duì)列。
1、消息什么時(shí)候變?yōu)樗佬?/strong>
1、消息被否定接收,消費(fèi)者使用basic.reject 或者 basic.nack并且requeue重回隊(duì)列屬性設(shè)為false
2、消息在隊(duì)列里的時(shí)間超過了該消息設(shè)置的過期時(shí)間(TTL)
3、消息隊(duì)列到達(dá)了它的最大長度,之后再收到的消息。
2、死信隊(duì)列的原理
當(dāng)一個(gè)消息在隊(duì)列里變?yōu)樗佬?,它?huì)被重新publish到綁定的死信隊(duì)列所對應(yīng)的exchange交換機(jī)上,這個(gè)exchange就為DLX。因此我們只需要在聲明正常的業(yè)務(wù)隊(duì)列時(shí)添加一個(gè)可選的"x-dead-letter-exchange"參數(shù),值為死信交換機(jī),死信就會(huì)被rabbitmq重新publish到配置的這個(gè)交換機(jī)上,我們接著監(jiān)聽這個(gè)死信交換機(jī)所綁定隊(duì)列就可以了。
下面看個(gè)案例: 正常情況,我們配置交換機(jī)-A,會(huì)為其綁定路由鍵-A,和路由鍵-A所映射的隊(duì)列-A,并設(shè)置這個(gè)隊(duì)列-A的死信交換機(jī)-B、死信路由鍵-B。
消息消費(fèi)者-A,監(jiān)聽隊(duì)列-A,基本上就沒死信交換機(jī)什么事了,特殊時(shí)候隊(duì)列滿了,或者接收后給的回執(zhí)是false才會(huì)進(jìn)到死信交換機(jī)-B。
現(xiàn)在我們把消費(fèi)者-A刪了,并且設(shè)置隊(duì)列-A里面消息過期時(shí)間,那么所有進(jìn)到隊(duì)列A里面消息就自然過期,然后被推給死信交換機(jī),那么監(jiān)聽死信交換機(jī)-B的消息者-B,就可以得到過期后的消息了,就實(shí)現(xiàn)了延時(shí)功能。
3、死信交換機(jī)
死信交換機(jī)DLX實(shí)際上也是普通的交換機(jī),說白的就是將死信(過期消息)路由到死信隊(duì)列的交換機(jī)。
4、死信隊(duì)列
死信隊(duì)列DLQ(Dead Letter Queue)實(shí)際上也是普通的隊(duì)列,只不過它存儲(chǔ)的是死信交換機(jī)路由過來的死信(過期消息)
三、代碼
1、給隊(duì)列綁定死信隊(duì)列
Map<String, Object> args = new HashMap<>(2); args.put(x-dead-letter-exchange,死信隊(duì)列交換機(jī)) args.put(x-dead-letter-routing-key,死信routeKey) args.put(x-message-ttl,隊(duì)列里面的消息最大存活時(shí)間) //申明一個(gè)普通隊(duì)列,并且給這個(gè)普通隊(duì)里綁定一個(gè)死信隊(duì)列。當(dāng)消息過期就會(huì)轉(zhuǎn)到死信隊(duì)列里面去。 Queue queue = QueueBuilder.durable(queue).withArguments(args).build();
2、設(shè)置過期時(shí)間
目前有兩種方式可以設(shè)置消息的TTL。
第一種是通過隊(duì)列的屬性設(shè)置,隊(duì)列中的所有消息都有相同的過期時(shí)間。
第二種方法是對消息本身進(jìn)行單獨(dú)設(shè)置,每條消息的TTL可以不同。
如果兩種方法同時(shí)設(shè)置,則TTL以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。
消息在隊(duì)列中的生存時(shí)間一旦超過設(shè)置的TTL值時(shí)就會(huì)進(jìn)到死信。
1、通過隊(duì)列屬性設(shè)置消息過期時(shí)間
Map<String, Object> arguments = Maps.newHashMap(); //設(shè)置消息發(fā)送到隊(duì)列中在被丟棄之前可以存活的時(shí)間,單位:毫秒 arguments.put("x-message-ttl", 5000); //聲明隊(duì)列 channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
2、發(fā)送消息的時(shí)候,對消息本身設(shè)定過期時(shí)間
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000");//設(shè)置消息多久沒有被消費(fèi)在過期。 message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);
上面底層代碼是:
AMQP.BasicProperties.Builder properties = MessageProperties.PERSISTENT_TEXT_PLAIN.builder(); properties.expiration("5000");//設(shè)置消息多久沒有被消費(fèi)在過期。 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, properties.build(), message.getBytes());
到此這篇關(guān)于關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Rabbitmq死信隊(duì)列延時(shí)隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
- Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
- RabbitMQ之死信隊(duì)列深入解析
- springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
Mybatis-plus通過添加攔截器實(shí)現(xiàn)簡單數(shù)據(jù)權(quán)限
系統(tǒng)需要根據(jù)用戶所屬的公司,來做一下數(shù)據(jù)權(quán)限控制,具體一點(diǎn),就是通過表中的company_id進(jìn)行權(quán)限控制,項(xiàng)目使用的是mybatis-plus,所以通過添加攔截器的方式,修改查詢sql,實(shí)現(xiàn)數(shù)據(jù)權(quán)限,本文就通過代碼給大家詳細(xì)的講解一下,需要的朋友可以參考下2023-08-08Logger.error打印錯(cuò)誤異常的詳細(xì)堆棧信息
這篇文章主要介紹了Logger.error打印錯(cuò)誤異常的詳細(xì)堆棧信息,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02Java實(shí)現(xiàn)將容器 Map中的內(nèi)容保存到數(shù)組
這篇文章主要介紹了Java實(shí)現(xiàn)將容器 Map中的內(nèi)容保存到數(shù)組,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09Java正則表達(dá)式matcher.group()用法代碼
這篇文章主要給大家介紹了關(guān)于Java正則表達(dá)式matcher.group()用法的相關(guān)資料,最近在做一個(gè)項(xiàng)目,需要使用matcher.group()方法匹配出需要的內(nèi)容,文中給出了詳細(xì)的代碼示例,需要的朋友可以參考下2023-08-08java通過MySQL驅(qū)動(dòng)攔截器實(shí)現(xiàn)執(zhí)行sql耗時(shí)計(jì)算
本文主要介紹了java通過MySQL驅(qū)動(dòng)攔截器實(shí)現(xiàn)執(zhí)行sql耗時(shí)計(jì)算,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03JDBC數(shù)據(jù)庫連接過程及驅(qū)動(dòng)加載與設(shè)計(jì)模式詳解
這篇文章主要介紹了JDBC數(shù)據(jù)庫連接過程及驅(qū)動(dòng)加載與設(shè)計(jì)模式詳解,需要的朋友可以參考下2016-10-10Java中BeanUtils.copyProperties的11個(gè)坑總結(jié)
我們?nèi)粘i_發(fā)中,經(jīng)常涉及到DO、DTO、VO對象屬性拷貝賦值,很容易想到org.springframework.beans.BeanUtils的copyProperties,它會(huì)自動(dòng)通過反射機(jī)制獲取源對象和目標(biāo)對象的屬性,pyProperties,會(huì)有好幾個(gè)坑呢,本文將給大家總結(jié)一下遇到的坑,需要的朋友可以參考下2023-05-05Springboot敏感字段脫敏的實(shí)現(xiàn)思路
這篇文章主要介紹了Springboot敏感字段脫敏的實(shí)現(xiàn)思路,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-09-09