關(guān)于Rabbitmq死信隊列及延時隊列的實現(xiàn)
什么是延遲隊列
我們常說的延遲隊列是指消息進入隊列后不會被立即消費,只有達到指定時間后才能被消費。
但RabbitMq中并沒有提供延遲隊列功能。
那么RabbitMQ如何實現(xiàn)延遲隊列
通過:死信隊列 + RabbitMQ的TTL特性實現(xiàn)。
實現(xiàn)原理
給一個普通帶有過期功能的隊列綁定一個死信隊列,消息先進延時隊列,過期了后消息進入死信隊列,死信隊列的消息會轉(zhuǎn)發(fā)到對應(yīng)的queue里面,我們只需要消費死信的queue里面的消息就可以了。
一、TTL特性說明
TTL就是消息或者隊列的過期功能。
當(dāng)消息過期就會進到死信隊列,死信隊列和普通隊列沒啥區(qū)別,然后我們只需要配置一個消費者來消費死信隊列里面的消息就可以了。
如果不設(shè)置x-message-ttl,則表示消息不會過期如果x-message-ttl設(shè)置為0,則表示除非此時可以直接投遞該消息到消費者,否則該消息將會被丟棄。
如果x-message-ttl設(shè)置不為0,表明消息或隊列中所有消息的最大存活時間(過期時間),單位是毫秒。
需要注意: RabbitMQ只會對隊列頭部的消息進行過期淘汰,消息是否過期是在即將投遞消息到消費者之前判定的,如果隊列出現(xiàn)消息堆積情況,則已過期的消息還是會繼續(xù)存活的
比如過期時間設(shè)置在消息上,由于消息進隊列是先進先出,假設(shè)先進去的消息過期時間長,后進的消息過期時間短,先進來的消息還沒過期,后面進來的消息就算到了過期時間,消息也不會過期也就不會從隊列中剔除,從而導(dǎo)致隊列里面的消息積壓。
二、死信隊列
簡單理解就是要被丟棄的消息才會放到死信隊列。
1、消息什么時候變?yōu)樗佬?/strong>
1、消息被否定接收,消費者使用basic.reject 或者 basic.nack并且requeue重回隊列屬性設(shè)為false
2、消息在隊列里的時間超過了該消息設(shè)置的過期時間(TTL)
3、消息隊列到達了它的最大長度,之后再收到的消息。
2、死信隊列的原理
當(dāng)一個消息在隊列里變?yōu)樗佬?,它會被重新publish到綁定的死信隊列所對應(yīng)的exchange交換機上,這個exchange就為DLX。因此我們只需要在聲明正常的業(yè)務(wù)隊列時添加一個可選的"x-dead-letter-exchange"參數(shù),值為死信交換機,死信就會被rabbitmq重新publish到配置的這個交換機上,我們接著監(jiān)聽這個死信交換機所綁定隊列就可以了。
下面看個案例: 正常情況,我們配置交換機-A,會為其綁定路由鍵-A,和路由鍵-A所映射的隊列-A,并設(shè)置這個隊列-A的死信交換機-B、死信路由鍵-B。
消息消費者-A,監(jiān)聽隊列-A,基本上就沒死信交換機什么事了,特殊時候隊列滿了,或者接收后給的回執(zhí)是false才會進到死信交換機-B。
現(xiàn)在我們把消費者-A刪了,并且設(shè)置隊列-A里面消息過期時間,那么所有進到隊列A里面消息就自然過期,然后被推給死信交換機,那么監(jiān)聽死信交換機-B的消息者-B,就可以得到過期后的消息了,就實現(xiàn)了延時功能。
3、死信交換機
死信交換機DLX實際上也是普通的交換機,說白的就是將死信(過期消息)路由到死信隊列的交換機。
4、死信隊列
死信隊列DLQ(Dead Letter Queue)實際上也是普通的隊列,只不過它存儲的是死信交換機路由過來的死信(過期消息)
三、代碼
1、給隊列綁定死信隊列
Map<String, Object> args = new HashMap<>(2); args.put(x-dead-letter-exchange,死信隊列交換機) args.put(x-dead-letter-routing-key,死信routeKey) args.put(x-message-ttl,隊列里面的消息最大存活時間) //申明一個普通隊列,并且給這個普通隊里綁定一個死信隊列。當(dāng)消息過期就會轉(zhuǎn)到死信隊列里面去。 Queue queue = QueueBuilder.durable(queue).withArguments(args).build();
2、設(shè)置過期時間
目前有兩種方式可以設(shè)置消息的TTL。
第一種是通過隊列的屬性設(shè)置,隊列中的所有消息都有相同的過期時間。
第二種方法是對消息本身進行單獨設(shè)置,每條消息的TTL可以不同。
如果兩種方法同時設(shè)置,則TTL以兩者之間較小的那個數(shù)值為準(zhǔn)。
消息在隊列中的生存時間一旦超過設(shè)置的TTL值時就會進到死信。
1、通過隊列屬性設(shè)置消息過期時間
Map<String, Object> arguments = Maps.newHashMap(); //設(shè)置消息發(fā)送到隊列中在被丟棄之前可以存活的時間,單位:毫秒 arguments.put("x-message-ttl", 5000); //聲明隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
2、發(fā)送消息的時候,對消息本身設(shè)定過期時間
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000");//設(shè)置消息多久沒有被消費在過期。 message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);
上面底層代碼是:
AMQP.BasicProperties.Builder properties = MessageProperties.PERSISTENT_TEXT_PLAIN.builder(); properties.expiration("5000");//設(shè)置消息多久沒有被消費在過期。 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, properties.build(), message.getBytes());
到此這篇關(guān)于關(guān)于Rabbitmq死信隊列及延時隊列的實現(xiàn)的文章就介紹到這了,更多相關(guān)Rabbitmq死信隊列延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊列和延遲隊列
- SpringBoot+RabbitMQ?實現(xiàn)死信隊列的示例
- 如何利用rabbitMq的死信隊列實現(xiàn)延時消息
- 深入分析RabbitMQ中死信隊列與死信交換機
- 關(guān)于SpringBoot整合RabbitMQ實現(xiàn)死信隊列
- Springboot結(jié)合rabbitmq實現(xiàn)的死信隊列
- RabbitMQ之死信隊列深入解析
- springboot中RabbitMQ死信隊列的實現(xiàn)示例
- SpringBoot整合RabbitMQ實現(xiàn)延遲隊列和死信隊列
- springboot整合RabbitMQ中死信隊列的實現(xiàn)
相關(guān)文章
Mybatis-plus通過添加攔截器實現(xiàn)簡單數(shù)據(jù)權(quán)限
系統(tǒng)需要根據(jù)用戶所屬的公司,來做一下數(shù)據(jù)權(quán)限控制,具體一點,就是通過表中的company_id進行權(quán)限控制,項目使用的是mybatis-plus,所以通過添加攔截器的方式,修改查詢sql,實現(xiàn)數(shù)據(jù)權(quán)限,本文就通過代碼給大家詳細的講解一下,需要的朋友可以參考下2023-08-08Java實現(xiàn)將容器 Map中的內(nèi)容保存到數(shù)組
這篇文章主要介紹了Java實現(xiàn)將容器 Map中的內(nèi)容保存到數(shù)組,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09java通過MySQL驅(qū)動攔截器實現(xiàn)執(zhí)行sql耗時計算
本文主要介紹了java通過MySQL驅(qū)動攔截器實現(xiàn)執(zhí)行sql耗時計算,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03JDBC數(shù)據(jù)庫連接過程及驅(qū)動加載與設(shè)計模式詳解
這篇文章主要介紹了JDBC數(shù)據(jù)庫連接過程及驅(qū)動加載與設(shè)計模式詳解,需要的朋友可以參考下2016-10-10Java中BeanUtils.copyProperties的11個坑總結(jié)
我們?nèi)粘i_發(fā)中,經(jīng)常涉及到DO、DTO、VO對象屬性拷貝賦值,很容易想到org.springframework.beans.BeanUtils的copyProperties,它會自動通過反射機制獲取源對象和目標(biāo)對象的屬性,pyProperties,會有好幾個坑呢,本文將給大家總結(jié)一下遇到的坑,需要的朋友可以參考下2023-05-05