java使用RabbitMQ實(shí)現(xiàn)延遲消息示例
在分布式系統(tǒng)中,消息隊(duì)列通常用于解耦服務(wù),RabbitMQ是一個(gè)廣泛使用的消息隊(duì)列服務(wù)。延遲消息(也稱為延時(shí)隊(duì)列或TTL消息)是一種常見的場景應(yīng)用,特別適合處理某些任務(wù)在一段時(shí)間后執(zhí)行的需求,如訂單超時(shí)處理、延時(shí)通知等。
本文將以具體代碼為例,展示如何使用RabbitMQ來實(shí)現(xiàn)延遲消息處理,涵蓋隊(duì)列和交換機(jī)的配置、消息的發(fā)送與接收以及死信隊(duì)列的處理。
什么是延遲消息?
延遲消息是指消息在發(fā)送到隊(duì)列后,經(jīng)過設(shè)定的時(shí)間延遲再被消費(fèi)。RabbitMQ 本身沒有直接支持延遲隊(duì)列的功能,但可以通過 TTL(Time To Live)+ 死信隊(duì)列(Dead Letter Queue, DLQ) 的組合來實(shí)現(xiàn)。當(dāng)消息超過TTL(消息存活時(shí)間)后,不會(huì)被立即消費(fèi),而是會(huì)被轉(zhuǎn)發(fā)到綁定的死信隊(duì)列,從而實(shí)現(xiàn)延遲處理。
RabbitMQ中的延遲消息原理
在RabbitMQ中,我們可以通過以下幾個(gè)概念來實(shí)現(xiàn)延遲消息:
- TTL(Time To Live):可以為隊(duì)列設(shè)置TTL,消息超過該時(shí)間后會(huì)被標(biāo)記為“死信”。
- 死信隊(duì)列(Dead Letter Queue):當(dāng)消息在正常隊(duì)列中過期或處理失敗時(shí),RabbitMQ可以將它們路由到一個(gè)死信隊(duì)列,死信隊(duì)列可以用來處理這些過期或未處理的消息。
- x-dead-letter-exchange 和 x-dead-letter-routing-key:可以通過配置隊(duì)列的參數(shù),將過期消息發(fā)送到一個(gè)專門的死信交換器,并根據(jù)指定的路由鍵轉(zhuǎn)發(fā)到死信隊(duì)列。
消息來到ttl.queue消息隊(duì)列,過期時(shí)間內(nèi)無人消費(fèi),消息來到死信交換機(jī)hmall.direct,在direct.queue消息隊(duì)列無需等待。
1. RabbitMQ的配置
首先,我們需要配置兩個(gè)隊(duì)列和兩個(gè)交換機(jī):一個(gè)用于存放延時(shí)消息,另一個(gè)用于處理超時(shí)的死信消息。
package com.heima.stroke.configuration; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitConfig { // 延遲時(shí)間 單位:毫秒 (這里設(shè)為30秒) private static final long DELAY_TIME = 1000 * 30; // 行程超時(shí)隊(duì)列 public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE"; // 行程死信隊(duì)列 public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE"; // 行程超時(shí)隊(duì)列交換機(jī) public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE"; // 行程死信隊(duì)列交換機(jī) public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE"; // 行程超時(shí)交換機(jī) Routing Key public static final String STROKE_OVER_KEY = "STROKE_OVER_KEY"; // 行程死信交換機(jī) Routing Key public static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY"; /** * 聲明行程超時(shí)隊(duì)列,并設(shè)置其參數(shù) * x-dead-letter-exchange:綁定的死信交換機(jī) * x-dead-letter-routing-key:死信路由Key * x-message-ttl:消息的過期時(shí)間 */ @Bean public Queue strokeOverQueue() { Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE); args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY); args.put("x-message-ttl", DELAY_TIME); // 設(shè)置TTL為30秒 return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build(); } @Bean public DirectExchange strokeOverQueueExchange() { return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE); } @Bean public Binding bindingStrokeOverDirect() { return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY); } }
解釋:
TTL設(shè)置:我們通過x-message-ttl
設(shè)置消息的過期時(shí)間為30秒。
死信隊(duì)列綁定:通過x-dead-letter-exchange
和x-dead-letter-routing-key
設(shè)置,當(dāng)消息過期時(shí),它會(huì)被轉(zhuǎn)發(fā)到死信交換機(jī),再路由到死信隊(duì)列。
2. 生產(chǎn)者發(fā)送延遲消息
接下來,我們通過生產(chǎn)者向超時(shí)隊(duì)列發(fā)送消息,這些消息將在TTL過期后轉(zhuǎn)發(fā)到死信隊(duì)列。
package com.heima.stroke.rabbitmq; import com.alibaba.fastjson.JSON; import com.heima.modules.vo.StrokeVO; import com.heima.stroke.configuration.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MQProducer { private final static Logger logger = LoggerFactory.getLogger(MQProducer.class); @Autowired RabbitTemplate rabbitTemplate; /** * 發(fā)送延時(shí)消息到行程超時(shí)隊(duì)列 * * @param strokeVO 消息體 */ public void sendOver(StrokeVO strokeVO) { String mqMessage = JSON.toJSONString(strokeVO); logger.info("send timeout msg:{}", mqMessage); rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage); } }
解釋:
sendOver
方法將消息發(fā)送到超時(shí)隊(duì)列,消息將在超時(shí)后進(jìn)入死信隊(duì)列。生產(chǎn)者不需要額外處理TTL或死信的配置,只需發(fā)送消息即可。
3. 消費(fèi)者監(jiān)聽死信隊(duì)列
當(dāng)消息超過TTL后,將會(huì)被轉(zhuǎn)發(fā)到死信隊(duì)列。消費(fèi)者需要監(jiān)聽死信隊(duì)列并處理這些消息。
package com.heima.stroke.rabbitmq; import com.alibaba.fastjson.JSON; import com.heima.modules.vo.StrokeVO; import com.heima.stroke.configuration.RabbitConfig; import com.heima.stroke.handler.StrokeHandler; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component public class MQConsumer { private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class); @Autowired private StrokeHandler strokeHandler; /** * 監(jiān)聽死信隊(duì)列 * * @param message 消息體 * @param channel RabbitMQ的Channel * @param tag 消息的Delivery Tag */ @RabbitListener( bindings = { @QueueBinding( value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE), key = RabbitConfig.STROKE_DEAD_KEY) }) @RabbitHandler public void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class); logger.info("get dead msg:{}", message.getBody()); if (strokeVO == null) { return; } try { // 處理超時(shí)的行程消息 strokeHandler.timeoutHandel(strokeVO); // 手動(dòng)確認(rèn)消息 channel.basicAck(tag, false); } catch (Exception e) { e.printStackTrace(); } } }
解釋:
@RabbitListener
注解綁定了死信隊(duì)列的監(jiān)聽器。當(dāng)消息被轉(zhuǎn)發(fā)到死信隊(duì)列時(shí),該消費(fèi)者會(huì)接收到消息。
使用 channel.basicAck(tag, false)
手動(dòng)確認(rèn)消息處理成功,確保消息不會(huì)重復(fù)消費(fèi)。
4. 處理超時(shí)業(yè)務(wù)邏輯
在我們的業(yè)務(wù)中,當(dāng)消息超時(shí)未處理時(shí),將其狀態(tài)設(shè)置為超時(shí)。
public void timeoutHandel(StrokeVO strokeVO) { // 獲取司機(jī)行程ID和乘客行程ID String inviterTripId = strokeVO.getInviterTripId(); String inviteeTripId = strokeVO.getInviteeTripId(); // 檢查邀請(qǐng)狀態(tài)是否為未確認(rèn) String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId); String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId); if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) && String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) { // 更新為超時(shí)狀態(tài) redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode())); redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode())); } }
到此這篇關(guān)于java使用RabbitMQ實(shí)現(xiàn)延遲消息示例的文章就介紹到這了,更多相關(guān)java RabbitMQ延遲消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA 自帶的數(shù)據(jù)庫工具真的很牛(收藏版)
這篇文章主要介紹了IDEA 自帶的數(shù)據(jù)庫工具真的很牛(收藏版),本文以 IntelliJ IDEA/ Mac 版本作為演示,其他版本的應(yīng)該也差距不大,需要的朋友可以參考下2021-04-04springBoot項(xiàng)目如何實(shí)現(xiàn)啟動(dòng)多個(gè)實(shí)例
這篇文章主要介紹了springBoot項(xiàng)目如何實(shí)現(xiàn)啟動(dòng)多個(gè)實(shí)例的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Java中Druid連接池連接超時(shí)獲取不到連接的解決
這篇文章主要介紹了Java中Druid連接池連接超時(shí)獲取不到連接的解決,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Maven項(xiàng)目繼承實(shí)現(xiàn)過程圖解
這篇文章主要介紹了Maven項(xiàng)目繼承實(shí)現(xiàn)過程圖解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08關(guān)于@Autowierd && @Resource 你真的了解嗎
這篇文章主要介紹了關(guān)于@Autowierd && @Resource的具體使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08spring集成mybatis實(shí)現(xiàn)mysql數(shù)據(jù)庫讀寫分離
本文通過實(shí)例代碼給大家介紹了spring集成mybatis實(shí)現(xiàn)mysql數(shù)據(jù)庫讀寫分離,需要的朋友可以參考下2017-08-08使用springboot實(shí)現(xiàn)上傳文件時(shí)校驗(yàn)文件是否有病毒
在SpringBoot中實(shí)現(xiàn)文件上傳時(shí)的病毒校驗(yàn),可以使用ClamAV、Metascan或VirusTotal等工具,這些工具通過掃描上傳的文件,可以有效地檢測和阻止惡意軟件的傳播,安裝和配置ClamAV服務(wù)的步驟如下:下載并安裝ClamAV二進(jìn)制文件,配置clamd.conf文件2025-01-01