RabbitMQ消息隊列實現(xiàn)延遲任務(wù)示例
一、序言
延遲任務(wù)應(yīng)用廣泛,延遲任務(wù)典型應(yīng)用場景有訂單超時自動取消;支付回調(diào)重試。其中訂單超時取消具有冪等性屬性,無需考慮重復(fù)消費問題;支付回調(diào)重試需要考慮重復(fù)消費問題。
延遲任務(wù)具有如下特點:在未來的某個時間點執(zhí)行;一般僅執(zhí)行一次。
1、實現(xiàn)原理
生產(chǎn)者將帶有延遲信息的消息發(fā)送到RabbitMQ交換機中,等待延遲時間結(jié)束方將消息轉(zhuǎn)發(fā)到綁定的隊列中,消費者通過監(jiān)聽隊列消費消息。延遲任務(wù)的關(guān)鍵在消息在交換機中停留。
顯而易見,基于RabbitMQ實現(xiàn)延遲任務(wù)對服務(wù)器的可靠性要求極高,交換機內(nèi)部消息無持久化機制,比如單機模式服務(wù)重啟,未開始的延遲任務(wù)均丟失。
2、組件選型
二、方案設(shè)計
(一)服務(wù)器
RabbitMQ服務(wù)需要安裝x-delayed-message
插件以處理延遲消息。
(二)生產(chǎn)者
延遲任務(wù)的實現(xiàn)對生產(chǎn)者的要求是將消息可靠的投遞到交換機,因此使用confirm確認機制即可。
訂單生成之后,先入庫,然后以訂單ID為key將訂單詳情存入Redis中(持久化),向RabbitMQ發(fā)送異步confirm確定請求。如果收到正常投遞返回,則刪除Redis中訂單ID為key的數(shù)據(jù),回收內(nèi)存,否則以訂單ID為key,從Redis中查詢出訂單數(shù)據(jù),重新發(fā)送。
(三)消費者
延遲任務(wù)的實現(xiàn)對消費者的要求是以信息不丟失的方式消費消息,具體表現(xiàn)在:手動確認消息的消費,防止消息丟失;消費端持續(xù)穩(wěn)定,防止消息堆積;消息消費失敗有重試機制。
考慮到訂單延遲取消屬于冪等性操作,因此無需考慮消息的重復(fù)消費問題。
三、SpringBoot實現(xiàn)
實現(xiàn)部分僅貼一部分核心源碼,完整項目請訪問GitHub。
(一)生產(chǎn)者
考慮到下單是極為重要的操作,因此首先將訂單落庫、存盤,然后進行后續(xù)操作。
for (long i = 1; i <= 10; i++) { /* 1.模擬生成訂單 */ BuOrder order = createOrder(i); /* 2.訂單入庫 */ orderService.removeById(order); orderService.saveOrUpdate(order); /* 3.將訂單存入信息Redis */ RedisUtils.setObject(RabbitTemplateConfig.ORDER_PREFIX + i, order); /* 4.向RabbitMQ異步投遞消息 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId())); }
生產(chǎn)者可靠投遞消息
public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData == null) { return; } String key = ORDER_PREFIX + correlationData.getId(); if (ack) { /* 如果消息投遞成功,則刪除Redis中訂單數(shù)據(jù),回收內(nèi)存 */ RedisUtils.deleteObject(key); } else { /* 從Redis中讀取訂單數(shù)據(jù),重新投遞 */ BuOrder order = RedisUtils.getObject(key, BuOrder.class); /* 重新投遞消息 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId())); } }
(二)消費者
消費者端手動確認,避免消息丟失;失敗自動重試。
@RabbitListener(queues = RabbitmqConfig.DELAY_QUEUE_NAME) public void consumeNode01(Channel channel, Message message, BuOrder order) throws IOException { if (Objects.equals(0, order.getOrderStatus())) { /* 修改訂單狀態(tài),設(shè)置為關(guān)閉狀態(tài) */ orderService.updateById(new BuOrder(order.getOrderId(), -1)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info(String.format("消費者節(jié)點01消費編號為【%s】的消息", order.getOrderId())); } }
消費者可靠消費應(yīng)至少開啟兩個及以上應(yīng)用,確保消息隊列中不積壓消息。
(三)通用工具包
上述代碼涉及一個工具類RabbitUtils
,存在于如下依賴中,主要封裝RabbitMQ極常用的工具方法。
<dependency> <groupId>xin.altitude.cms</groupId> <artifactId>ucode-cms-common</artifactId> <version>1.4.3.1</version> </dependency>
以上就是RabbitMQ消息隊列實現(xiàn)延遲任務(wù)示例的詳細內(nèi)容,更多關(guān)于RabbitMQ消息隊列延遲任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Netty分布式FastThreadLocal的set方法實現(xiàn)邏輯剖析
這篇文章主要為大家介紹了Netty分布式FastThreadLocal的set方法實現(xiàn)邏輯剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03使用Java如何對復(fù)雜的數(shù)據(jù)類型排序和比大小
我相信大家在第一次接觸算法的時候,最先接觸的肯定也是從排序算法開始的,下面這篇文章主要給大家介紹了關(guān)于使用Java如何對復(fù)雜的數(shù)據(jù)類型排序和比大小的相關(guān)資料,需要的朋友可以參考下2023-12-12Mybatis實現(xiàn)動態(tài)增刪改查功能的示例代碼
這篇文章主要介紹了Mybatis實現(xiàn)動態(tài)增刪改查功能的示例代碼,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-04-04解析Java中所有錯誤和異常的父類java.lang.Throwable
這篇文章主要介紹了Java中所有錯誤和異常的父類java.lang.Throwable,文章中簡單地分析了其源碼,說明在代碼注釋中,需要的朋友可以參考下2016-03-03