RocketMQ延遲消息超詳細(xì)講解
一、什么是延時(shí)消息
當(dāng)消息寫入到Broker后,不能立刻被消費(fèi)者消費(fèi),需要等待指定的時(shí)長后才可被消費(fèi)處理的消息,稱為延時(shí)消息。
二、延時(shí)消息等級
RocketMQ延時(shí)消息的延遲時(shí)長不支持隨意時(shí)長的延遲,是通過特定的延遲等級來指定的。默認(rèn)支持18個(gè)等級的延遲消息,延時(shí)等級定義在RocketMQ服務(wù)端的MessageStoreConfig類中的如下變量中:
// MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; //發(fā)消息時(shí),設(shè)置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況: level == 0,消息為非延遲消息 1<=level<=maxLevel,消息延遲特定時(shí)間,例如level==1,延遲1s level > maxLevel,則level== maxLevel,例如level==20,延遲2h
例如指定的延時(shí)等級為3,則表示延遲時(shí)長為10s,即延遲等級是從1開始計(jì)數(shù)的。
三、延時(shí)消息使用場景
采用RocketMQ的延時(shí)消息可以實(shí)現(xiàn)定時(shí)任務(wù)的功能,而無需使用定時(shí)器。使用場景主要有:
(1)、電商交易系統(tǒng)的訂單超時(shí)未支付,自動取消訂單
在電商交易系統(tǒng)中,像淘寶、京東,我們提交了一個(gè)訂單之后,在支付時(shí)都會提示,需要在指定時(shí)間內(nèi)(例如30分鐘)完成支付,否則訂單將被取消的消息,實(shí)際上這個(gè)超時(shí)未支付功能就可以使用延時(shí)消息來實(shí)現(xiàn)。在下單成功之后,就發(fā)送一個(gè)延時(shí)消息,然后指定消息的延時(shí)時(shí)間為30分鐘,這條消息將會在30分鐘后投遞給后臺業(yè)務(wù)系統(tǒng)(Consumer),此時(shí)才能被消費(fèi)者進(jìn)行消費(fèi),消費(fèi)消息的時(shí)候會再去檢查這個(gè)訂單的狀態(tài),確認(rèn)下是否支付成功,如果支付成功,則忽略不處理;如果訂單還是未支付,則進(jìn)行取消訂單、釋放庫存等操作;
(2)、活動場景
比如B站視頻投稿經(jīng)常會發(fā)起一些活動,Up主在活動期間可以按照活動規(guī)則投稿視頻,在活動時(shí)間截止后,后臺根據(jù)Up主完成任務(wù)的情況以及結(jié)合投稿視頻的播放量等進(jìn)行判定,然后派發(fā)對應(yīng)的獎勵。這種場景我們也可以采用延時(shí)消息來實(shí)現(xiàn),即在發(fā)起活動后,同時(shí)發(fā)送一條延時(shí)消息,延時(shí)時(shí)間設(shè)置為本次活動周期的時(shí)間。當(dāng)活動結(jié)束后,這條延時(shí)消息剛好可以被消費(fèi)者進(jìn)行消費(fèi),這樣就可以消費(fèi)消息然后執(zhí)行一系列的邏輯處理。
(3)、其它信息提醒等場景;
四、延時(shí)消息示例
(1)、編寫Consumer消費(fèi)端并啟動,等待接收Producer發(fā)送過來的消息
/** * 消息消費(fèi)者 */ public class MQConsumer { public static void main(String[] args) throws MQClientException { // 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費(fèi)者名稱 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開 mqPushConsumer.setNamesrvAddr("10.0.90.86:9876"); // 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi) // 如果不是第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 設(shè)置消費(fèi)模型,集群還是廣播,默認(rèn)為集群 mqPushConsumer.setMessageModel(MessageModel.CLUSTERING); // 消費(fèi)者最小線程量 mqPushConsumer.setConsumeThreadMin(5); // 消費(fèi)者最大線程量 mqPushConsumer.setConsumeThreadMax(10); // 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)是1 mqPushConsumer.setConsumeMessageBatchMaxSize(1); // 訂閱一個(gè)或者多個(gè)Topic,以及Tag來過濾需要消費(fèi)的消息,如果訂閱該主題下的所有tag,則使用* mqPushConsumer.subscribe("DelayTopic", "*"); // 注冊回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 監(jiān)聽類實(shí)現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù) @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : msgList) { String body = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("消費(fèi)者接收到消息: " + message.toString() + "---消息內(nèi)容為:" + body + "消息被消費(fèi)時(shí)間:" + new Date(System.currentTimeMillis()) + ", 消息存儲時(shí)間: " + new Date(message.getBornTimestamp())); } // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費(fèi)者實(shí)例 mqPushConsumer.start(); } }
(2)、編寫Producer生產(chǎn)端,發(fā)送延時(shí)消息
RocketMQ要實(shí)現(xiàn)發(fā)送延遲消息,只需在發(fā)送消息之前調(diào)用Message#setDelayTimeLevel()方法設(shè)置消息的延遲等級即可。
只需要設(shè)置一個(gè)延遲級別即可,注意不是具體的延遲時(shí)間。如果設(shè)置的延遲級別超過最大值,那么將會重置為最大值。
/** * Producer端發(fā)送延遲消息:只需在發(fā)送消息之前調(diào)用setDelayTimeLevel()方法設(shè)置消息的延遲等級即可 */ public class SyncMQProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 創(chuàng)建DefaultMQProducer類并設(shè)定生產(chǎn)者名稱 DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開 mqProducer.setNamesrvAddr("10.0.90.86:9876"); // 消息最大長度 默認(rèn)4M mqProducer.setMaxMessageSize(4096); // 發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000 mqProducer.setSendMsgTimeout(3000); // 發(fā)送消息失敗重試次數(shù),默認(rèn)2 mqProducer.setRetryTimesWhenSendAsyncFailed(2); // 啟動消息生產(chǎn)者 mqProducer.start(); // 創(chuàng)建消息,并指定Topic(主題),Tag(標(biāo)簽)和消息內(nèi)容 Message message = new Message("DelayTopic", "", "hello, 這是延遲消息".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 設(shè)置延時(shí)等級為3, 所以這個(gè)消息將在10s之后發(fā)送, RocketMQ目前只支持固定的幾個(gè)延時(shí)時(shí)間,還不支持自定義延遲時(shí)間 message.setDelayTimeLevel(3); // 發(fā)送同步消息到一個(gè)Broker,可以通過sendResult返回消息是否成功送達(dá) SendResult sendResult = mqProducer.send(message); System.out.println(sendResult); // 如果不再發(fā)送消息,關(guān)閉Producer實(shí)例 mqProducer.shutdown(); } }
(3)、啟動Producer
SendResult [sendStatus=SEND_OK, msgId=AC6E005A51B018B4AAC278E9F6F70000, offsetMsgId=0A005A5600002A9F0000000000003465, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=0], queueOffset=3]
從控制臺可以看到,消息發(fā)送狀態(tài)為SEND_OK,說明延遲消息已經(jīng)成功發(fā)送到RocketMQ Broker中。
(4)、觀察Consumer是否接收到消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=241, queueOffset=1, sysFlag=0, bornTimestamp=1645673399032, bornHost=/10.0.90.115:62807, storeTimestamp=1645673403365, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F000000000000355F, commitLogOffset=13663, bodyCRC=676533924, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DelayTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=DelayTopic, MAX_OFFSET=2, CONSUME_START_TIME=1645673409075, UNIQ_KEY=AC6E005A51B018B4AAC278E9F6F70000, CLUSTER=DefaultCluster, DELAY=3, WAIT=true, REAL_QID=0}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -27, -69, -74, -24, -65, -97, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是延遲消息消息被消費(fèi)時(shí)間:Thu Feb 24 11:30:09 CST 2022, 消息存儲時(shí)間: Thu Feb 24 11:29:59 CST 2022
可以看到,延遲消息成功被消息,并且我們注意到,消息被Consumer消費(fèi)的時(shí)間【Thu Feb 24 11:30:09 CST 2022】 - 消息存儲時(shí)間【Thu Feb 24 11:29:59 CST 2022】 = 10s,發(fā)送消息的時(shí)候,指定的延遲等級也是10s,也就是消息的消費(fèi)比存儲時(shí)間晚10秒。
五、延時(shí)消息實(shí)現(xiàn)原理
RocketMQ延時(shí)消息會暫存在名為SCHEDULE_TOPIC_XXXX的Topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個(gè)queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費(fèi)。broker會調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫入真實(shí)的topic。
SCHEDULE_TOPIC_XXXX中consumequeue中的文件夾名稱就是隊(duì)列的名稱,并且【隊(duì)列名稱 = 延遲等級 - 1】;如下圖,在前面的例子中,我們執(zhí)定消息的延遲時(shí)間為10s,對應(yīng)的延遲等級是3,所以文件夾名稱為【3 - 1 = 2】
延遲消息在RocketMQ Broker端的流轉(zhuǎn)如下圖所示:
主要包含以下6個(gè)步驟:
(1)、修改消息Topic名稱和隊(duì)列信息
RocketMQ Broker端在存儲生產(chǎn)者寫入的消息時(shí),首先都會將其寫入到CommitLog中。之后根據(jù)消息中的Topic信息和隊(duì)列信息,將其轉(zhuǎn)發(fā)到目標(biāo)Topic的指定隊(duì)列(ConsumeQueue)中。
由于消息一旦存儲到ConsumeQueue中,消費(fèi)者就能消費(fèi)到,而延遲消息不能被立即消費(fèi),所以這里將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級別確定要投遞到哪個(gè)隊(duì)列下。同時(shí),還會將消息原來要發(fā)送到的目標(biāo)Topic和隊(duì)列信息存儲到消息的屬性中。
(2)、轉(zhuǎn)發(fā)消息到延遲主題SCHEDULE_TOPIC_XXXX的CosumeQueue中
CommitLog中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進(jìn)行的。在轉(zhuǎn)發(fā)過程中,會對延遲消息進(jìn)行特殊處理,主要是計(jì)算這條延遲消息需要在什么時(shí)候進(jìn)行投遞。
投遞時(shí)間 = 消息存儲時(shí)間(storeTimestamp) + 延遲級別對應(yīng)的時(shí)間
需要注意的是,會將計(jì)算出的投遞時(shí)間當(dāng)做消息Tag的哈希值存儲到CosumeQueue中,CosumeQueue單個(gè)存儲單元組成結(jié)構(gòu)如下圖所示:
其中:
- Commit Log Offset:記錄在CommitLog中的位置;
- Size:記錄消息的大??;
- Message Tag HashCode:記錄消息Tag的哈希值,用于消息過濾。特別的,對于延遲消息,這個(gè)字段記錄的是消息的投遞時(shí)間戳。這也是為什么java中hashCode方法返回一個(gè)int型,只占用4個(gè)字節(jié),而這里Message Tag HashCode字段卻設(shè)計(jì)成8個(gè)字節(jié)的原因;
(3)、延遲服務(wù)消費(fèi)SCHEDULE_TOPIC_XXXX消息
Broker內(nèi)部有一個(gè)ScheduleMessageService類,其充當(dāng)延遲服務(wù),主要是消費(fèi)SCHEDULE_TOPIC_XXXX中的消息,并投遞到目標(biāo)Topic中。
ScheduleMessageService在啟動時(shí),其會創(chuàng)建一個(gè)定時(shí)器Timer,并根據(jù)延遲級別的個(gè)數(shù),啟動對應(yīng)數(shù)量的TimerTask,每個(gè)TimerTask負(fù)責(zé)一個(gè)延遲級別的消費(fèi)與投遞。
需要注意的是,每個(gè)TimeTask在檢查消息是否到期時(shí),首先檢查對應(yīng)隊(duì)列中尚未投遞第一條消息,如果這條消息沒到期,那么之后的消息都不會檢查。如果到期了,則進(jìn)行投遞,并檢查之后的消息是否到期。
(4)、將信息重新存儲到CommitLog中
在將消息到期后,需要投遞到目標(biāo)Topic。由于在第一步已經(jīng)記錄了原來的Topic和隊(duì)列信息,因此這里重新設(shè)置,再存儲到CommitLog即可。此外,由于之前Message Tag HashCode字段存儲的是消息的投遞時(shí)間,這里需要重新計(jì)算tag的哈希值后再存儲。
(5)、將消息投遞到目標(biāo)Topic中
這一步與第二步類似,不過由于消息的Topic名稱已經(jīng)改為了目標(biāo)Topic。因此消息會直接投遞到目標(biāo)Topic的ConsumeQueue中,之后消費(fèi)者即消費(fèi)到這條消息。
(6)、消費(fèi)者消費(fèi)目標(biāo)topic中的數(shù)據(jù)。
到此這篇關(guān)于RocketMQ延遲消息超詳細(xì)講解的文章就介紹到這了,更多相關(guān)RocketMQ延遲消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java swing實(shí)現(xiàn)酒店管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java swing實(shí)現(xiàn)酒店管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-02-02Mybatis-plus如何提前獲取實(shí)體類用雪花算法生成的ID
本文主要介紹了Mybatis-plus如何提前獲取實(shí)體類用雪花算法生成的ID,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Spring Security 實(shí)現(xiàn)用戶名密碼登錄流程源碼詳解
在服務(wù)端的安全管理使用了Spring Security,用戶登錄成功之后,Spring Security幫你把用戶信息保存在Session里,但是具體保存在哪里,要是不深究你可能就不知道,今天小編就帶大家具體了解一下Spring Security實(shí)現(xiàn)用戶名密碼登錄的流程2021-11-11SpringBoot的@RestControllerAdvice作用詳解
這篇文章主要介紹了SpringBoot的@RestControllerAdvice作用詳解,@RestContrllerAdvice是一種組合注解,由@ControllerAdvice,@ResponseBody組成,本質(zhì)上就是@Component,需要的朋友可以參考下2024-01-01groovy腳本定義結(jié)構(gòu)表一鍵生成POJO類
這篇文章主要為大家介紹了groovy腳本定義結(jié)構(gòu)表一鍵生成POJO類示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03