RabbitMQ死信機(jī)制實(shí)現(xiàn)延遲隊(duì)列的實(shí)戰(zhàn)
延遲隊(duì)列
延遲隊(duì)列存儲(chǔ)的對(duì)象肯定是對(duì)應(yīng)的延時(shí)消息,所謂”延時(shí)消息”是指當(dāng)消息被發(fā)送以后,并不想讓消費(fèi)者立即拿到消息,而是等待指定時(shí)間后,消費(fèi)者才拿到這個(gè)消息進(jìn)行消費(fèi)。
應(yīng)用場(chǎng)景
三方支付,掃碼支付調(diào)用上游的掃碼接口,當(dāng)掃碼有效期過(guò)后去調(diào)用查詢接口查詢結(jié)果。實(shí)現(xiàn)方式:每當(dāng)一筆掃碼支付請(qǐng)求后,立即將此訂單號(hào)放入延遲隊(duì)列中(RabbitMQ),隊(duì)列過(guò)期時(shí)間為二維碼有效期,此隊(duì)列沒(méi)有設(shè)置消費(fèi)者,過(guò)了有效期后消息會(huì)重新路由到指定的的隊(duì)列,有消費(fèi)者去執(zhí)行訂單查詢。
RabbitMQ本身沒(méi)有直接支持延遲隊(duì)列功能,但是可以通過(guò)以下特性模擬出延遲隊(duì)列的功能。 但是我們可以通過(guò)RabbitMQ的兩個(gè)特性來(lái)曲線實(shí)現(xiàn)延遲隊(duì)列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)
Time To Live(TTL)
RabbitMQ可以針對(duì)Queue和Message設(shè)置 x-message-tt,來(lái)控制消息的生存時(shí)間,如果超時(shí),則消息變?yōu)閐ead letter(死信)RabbitMQ針對(duì)隊(duì)列中的消息過(guò)期時(shí)間有兩種方法可以設(shè)置。
A: 通過(guò)隊(duì)列屬性設(shè)置,隊(duì)列中所有消息都有相同的過(guò)期時(shí)間。
<!-- 將消息放入此隊(duì)列里,此隊(duì)列設(shè)置過(guò)期時(shí)間,不制造消費(fèi)者讓其過(guò)期,過(guò)期后變成死信,消息會(huì)放入指定的新隊(duì)列里,實(shí)現(xiàn)消息的延遲消費(fèi) --> <rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" > <rabbit:queue-arguments> <entry key="x-message-ttl"> <value type="java.lang.Long">${qrcode.expire.icbc}</value> </entry> <!--消息過(guò)期根據(jù)重新路由 --> <entry key="x-dead-letter-exchange" value="directExchange"/> <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/> </rabbit:queue-arguments> </rabbit:queue>
B: 對(duì)消息進(jìn)行單獨(dú)設(shè)置,每條消息TTL可以不同。
<!-- 將消息放入此隊(duì)列里,次隊(duì)列設(shè)置過(guò)期時(shí)間,不制造消費(fèi)者讓其過(guò)期,過(guò)期后變成死信,消息會(huì)放入指定的新隊(duì)列里,實(shí)現(xiàn)消息的延遲消費(fèi) --> <rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" > <rabbit:queue-arguments> <!--消息過(guò)期根據(jù)重新路由 --> <entry key="x-dead-letter-exchange" value="directExchange"/> <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/> </rabbit:queue-arguments> </rabbit:queue>
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
package com.emax.paycenter.mq.pruducer; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; public class ExpirationMessagePostProcessor implements MessagePostProcessor { private final Long ttl; public ExpirationMessagePostProcessor(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(ttl.toString()); return message; } }
如果同時(shí)使用,則消息的過(guò)期時(shí)間以兩者之間TTL較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列的生存時(shí)間一旦超過(guò)設(shè)置的TTL值,就成為dead letter
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由。
x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
x-dead-letter-routing-key:指定routing-key發(fā)送
隊(duì)列出現(xiàn)dead letter的情況有:
消息或者隊(duì)列的TTL過(guò)期
隊(duì)列達(dá)到最大長(zhǎng)度
消息被消費(fèi)端拒絕(basic.reject or basic.nack)并且requeue=false
利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信后,它能被重新publish到另一個(gè)Exchange。這時(shí)候消息就可以重新被消費(fèi)。
注意一:ttl設(shè)置之后,下次修改時(shí)間,會(huì)報(bào)錯(cuò),這時(shí)候,需要先刪除該隊(duì)列,重啟項(xiàng)目。否則會(huì)報(bào)錯(cuò)。
注意二:消費(fèi)者中,拋異常了沒(méi)處理,會(huì)一直重復(fù)消費(fèi)
注意三:下面的代碼我模擬了1-10號(hào)消息,消息的內(nèi)容里面是1-10。過(guò)期的時(shí)間是10-1秒。這里要注意,雖然10是第一個(gè)發(fā)送,但是它過(guò)期的時(shí)間最長(zhǎng)。
過(guò)了10s以后,消費(fèi)者開(kāi)始收到數(shù)據(jù),但是它是一次性收到如下結(jié)果:10、9 、8 、7 、6、5 、4 、3 、2 、1
Consumer第一個(gè)收到的還是10。10是第一個(gè)放進(jìn)隊(duì)列,但是它的過(guò)期時(shí)間最長(zhǎng)。所以由此可見(jiàn),即使一個(gè)消息比在同一隊(duì)列中的其他消息提前過(guò)期,提前過(guò)期的也不會(huì)優(yōu)先進(jìn)入死信隊(duì)列,它們還是按照入庫(kù)的順序讓消費(fèi)者消費(fèi)。如果第一進(jìn)去的消息過(guò)期時(shí)間是1小時(shí),那么死信隊(duì)列的消費(fèi)者也許等1小時(shí)才能收到第一個(gè)消息。參考官方文檔發(fā)現(xiàn)“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有當(dāng)過(guò)期的消息到了隊(duì)列的頂端(隊(duì)首),才會(huì)被真正的丟棄或者進(jìn)入死信隊(duì)列。
所以在考慮使用RabbitMQ來(lái)實(shí)現(xiàn)延遲任務(wù)隊(duì)列的時(shí)候,需要確保業(yè)務(wù)上每個(gè)任務(wù)的延遲時(shí)間是一致的。如果遇到不同的任務(wù)類型需要不同的延時(shí)的話,需要為每一種不同延遲時(shí)間的消息建立單獨(dú)的消息隊(duì)列。(也可以考慮緩存隊(duì)列,DelayQueue實(shí)現(xiàn)定時(shí)延遲執(zhí)行任務(wù),但是也有缺點(diǎn):就是項(xiàng)目重啟緩存里的數(shù)據(jù)就會(huì)丟失,DelayQueue的使用詳見(jiàn)其他博文)
for(int i = 10; i>0; i-- ){ amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime)); }
到此這篇關(guān)于RabbitMQ死信機(jī)制實(shí)現(xiàn)延遲隊(duì)列的實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)RabbitMQ 延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方式詳解
- 手把手帶你掌握SpringBoot RabbitMQ延遲隊(duì)列
- 如何通過(guò)Python實(shí)現(xiàn)RabbitMQ延遲隊(duì)列
- RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)詳解
- Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的方法
- Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列的示例
- C#實(shí)現(xiàn)rabbitmq 延遲隊(duì)列功能實(shí)例代碼
- rabbitmq延遲隊(duì)列的使用方式
相關(guān)文章
java判斷各類型字符個(gè)數(shù)實(shí)例代碼
大家好,本篇文章主要講的是java判斷各類型字符個(gè)數(shù)實(shí)例代碼,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下,方便下次瀏覽2021-12-12Jenkins Pipeline 部署 SpringBoot 應(yīng)用的教程詳解
這篇文章主要介紹了Jenkins Pipeline 部署 SpringBoot 應(yīng)用的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07Java基礎(chǔ)教程之final關(guān)鍵字淺析
這篇文章主要給大家介紹了關(guān)于Java基礎(chǔ)教程之final關(guān)鍵字的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06Jpa 實(shí)現(xiàn)自動(dòng)更新表中的創(chuàng)建日期和修改時(shí)間
這篇文章主要介紹了Jpa 實(shí)現(xiàn)自動(dòng)更新表中的創(chuàng)建日期和修改時(shí)間,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01Java的MyBatis框架中關(guān)鍵的XML字段映射的配置參數(shù)詳解
將XML文件的schema字段映射到數(shù)據(jù)庫(kù)的schema是我們操作數(shù)據(jù)庫(kù)的常用手段,這里我們就來(lái)整理一些Java的MyBatis框架中關(guān)鍵的XML字段映射的配置參數(shù)詳解,需要的朋友可以參考下2016-06-06