RocketMQ-延遲消息的處理流程介紹
概述
RocketMQ 支持發(fā)送延遲消息,但不支持任意時(shí)間的延遲消息的設(shè)置,僅支持內(nèi)置預(yù)設(shè)值的延遲時(shí)間間隔的延遲消息;
預(yù)設(shè)值的延遲時(shí)間間隔為:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
在消息創(chuàng)建的時(shí)候,調(diào)用 setDelayTimeLevel(int level) 方法設(shè)置延遲時(shí)間;
broker在接收到延遲消息的時(shí)候會(huì)把對(duì)應(yīng)延遲級(jí)別的消息先存儲(chǔ)到對(duì)應(yīng)的延遲隊(duì)列中,等延遲消息時(shí)間到達(dá)時(shí),會(huì)把消息重新存儲(chǔ)到對(duì)應(yīng)的topic的queue里面。
Broker處理延遲消息
CommitLog.putMessage()
//獲取消息的sysflag final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); //非事務(wù)消息 或 已commit事務(wù)消息 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery 判斷消息是否設(shè)置延遲 if (msg.getDelayTimeLevel() > 0) { //判斷延遲級(jí)別是否大于最大級(jí)別,如果大于最大值,則將延遲級(jí)別設(shè)置為最大級(jí) if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } //延遲消息的topic為 SCHEDULE_TOPIC_XXXX topic = ScheduleMessageService.SCHEDULE_TOPIC; //獲取延遲級(jí)別,一個(gè)延遲級(jí)別對(duì)應(yīng)一個(gè)Queue queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId //消息原始的topic,queueid保存到消息的property中 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } }
1、判斷消息類型,如果是非事務(wù)消息、已commit事務(wù)消息,才能處理延遲消息
2、判斷消息是否設(shè)置延遲級(jí)別,如果延遲級(jí)別大于0,則該消息為延遲消息
3、判斷延遲級(jí)別是否大于最大級(jí)別,如果大于最大值,則將延遲級(jí)別設(shè)置為最大級(jí)
4、延遲消息的topic為 SCHEDULE_TOPIC_XXXX
5、獲取延遲級(jí)別,一個(gè)延遲級(jí)別對(duì)應(yīng)一個(gè)Queue
6、消息原始的topic,queueid保存到消息的property中
7、修改消息的topci、queueid
啟動(dòng)延遲消息定時(shí)任務(wù)
ScheduleMessageService.start()
延遲消息投遞
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java保留小數(shù)的四種實(shí)現(xiàn)方法
這篇文章主要為大家詳細(xì)介紹了java保留小數(shù)的四種實(shí)現(xiàn)方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-11-11Java調(diào)用echarts提供的地圖壓縮方法來壓縮地圖
今天小編就為大家分享一篇關(guān)于Java調(diào)用echarts提供的地圖壓縮方法來壓縮地圖,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-12-12Spring中@DependsOn注解的使用代碼實(shí)例
這篇文章主要介紹了Spring中@DependsOn注解的使用代碼實(shí)例,Spring中@DependsOn,主要是使用在類和方法上, 作用是當(dāng)前對(duì)象要依賴另外一些對(duì)象,被依賴的對(duì)象會(huì)先注冊(cè)到Spring的IOC容器中,需要的朋友可以參考下2024-01-01