RocketMQ-延遲消息的處理流程介紹
概述
RocketMQ 支持發(fā)送延遲消息,但不支持任意時間的延遲消息的設(shè)置,僅支持內(nèi)置預(yù)設(shè)值的延遲時間間隔的延遲消息;
預(yù)設(shè)值的延遲時間間隔為:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
在消息創(chuàng)建的時候,調(diào)用 setDelayTimeLevel(int level) 方法設(shè)置延遲時間;
broker在接收到延遲消息的時候會把對應(yīng)延遲級別的消息先存儲到對應(yīng)的延遲隊列中,等延遲消息時間到達(dá)時,會把消息重新存儲到對應(yīng)的topic的queue里面。
Broker處理延遲消息
CommitLog.putMessage()
//獲取消息的sysflag final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); //非事務(wù)消息 或 已commit事務(wù)消息 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery 判斷消息是否設(shè)置延遲 if (msg.getDelayTimeLevel() > 0) { //判斷延遲級別是否大于最大級別,如果大于最大值,則將延遲級別設(shè)置為最大級 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } //延遲消息的topic為 SCHEDULE_TOPIC_XXXX topic = ScheduleMessageService.SCHEDULE_TOPIC; //獲取延遲級別,一個延遲級別對應(yīng)一個Queue queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId //消息原始的topic,queueid保存到消息的property中 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } }
1、判斷消息類型,如果是非事務(wù)消息、已commit事務(wù)消息,才能處理延遲消息
2、判斷消息是否設(shè)置延遲級別,如果延遲級別大于0,則該消息為延遲消息
3、判斷延遲級別是否大于最大級別,如果大于最大值,則將延遲級別設(shè)置為最大級
4、延遲消息的topic為 SCHEDULE_TOPIC_XXXX
5、獲取延遲級別,一個延遲級別對應(yīng)一個Queue
6、消息原始的topic,queueid保存到消息的property中
7、修改消息的topci、queueid
啟動延遲消息定時任務(wù)
ScheduleMessageService.start()
延遲消息投遞
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java調(diào)用echarts提供的地圖壓縮方法來壓縮地圖
今天小編就為大家分享一篇關(guān)于Java調(diào)用echarts提供的地圖壓縮方法來壓縮地圖,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12