RocketMQ延遲消息簡明介紹
前言
場景可以是這樣的,雙11搶手機,一個新手機4000-5000,到0點的時候,沖著興奮勁,搶到了。但是摸了摸錢包,又冷靜下來了,好像不是很必要換手機。就放在那里沒有支付,過了30分鐘,自動取消了。這里就是使用延遲消息的場景,當下單之后,向消息隊列發(fā)送一條延遲30分鐘消費的消息。等到30分鐘過了,然后消費消息,執(zhí)行檢查任務,要是對應的訂單支付了,就什么都不做,要是沒支付,就取消訂單。
RocketMQ的延遲消息是org.apache.rocketmq.broker.schedule.ScheduleMessageService
類實現的
核心屬性
RMQ_SYS_SCHEDULE_TOPIC
在之前的版本中叫SCHEDULE_TOPIC
,是系統(tǒng)內置的Topic,用來保存所有的定時消息。沒有執(zhí)行的定時消息都會被保存在這個topic中。
FIRST_DELAY_TIME
第一次執(zhí)行定時任務的延遲時間,默認是1秒。
private static final long FIRST_DELAY_TIME = 1000L;
DELAY_FOR_A_WHILE
第二次以及之后每次定時任務執(zhí)行的間隔時間,默認100ms。
private static final long DELAY_FOR_A_WHILE = 100L;
DELAY_FOR_A_PERIOD
若是延遲消息投遞失敗,則在這個時間過后繼續(xù)投遞,默認10秒。
private static final long DELAY_FOR_A_PERIOD = 10000L;
delayLevelTable
這是保存延遲級別和延遲時間映射關系的地方
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
offsetTable
保存延遲級別和對應的消費位點
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32);
核心方法
queueId2DelayLevel
將queueId轉換為延遲級別
public static int queueId2DelayLevel(final int queueId) { return queueId + 1; }
delayLevel2QueueId
將延遲級別轉換為queueId
public static int delayLevel2QueueId(final int delayLevel) { return delayLevel - 1; }
updateOffset
更新延遲消息topic的消費位點
private void updateOffset(int delayLevel, long offset) { this.offsetTable.put(delayLevel, offset); if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getDelayOffsetUpdateVersionStep() == 0) { long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); } }
computeDeliverTimestamp
根據延遲消息級別和消息的存儲時間計算該延遲消息的投遞時間
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { Long time = this.delayLevelTable.get(delayLevel); if (time != null) { return time + storeTimestamp; } return storeTimestamp + 1000; }
start()
啟動延遲消息服務
shutdown()
關閉start方法中啟動的額timer任務
load()
加載消息的消費位點信息和全部的延遲級別信息。延遲級別信息默認如下。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
parseDelayLevel
格式化所有的延遲級別信息,保存到內存中。
到此這篇關于RocketMQ延遲消息簡明介紹的文章就介紹到這了,更多相關RocketMQ延遲消息內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springBoot之如何獲取接口請求數據和返回數據實現日志
這篇文章主要介紹了springBoot之如何獲取接口請求數據和返回數據實現日志問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04IDEA下因Lombok插件產生的Library source does not match the bytecode報
這篇文章主要介紹了IDEA下因Lombok插件產生的Library source does not match the bytecode報錯問題及解決方法,親測試過好用,需要的朋友可以參考下2020-04-04解決springboot沒有啟動標識,啟動類也沒有啟動標識的問題
這篇文章主要介紹了解決springboot沒有啟動標識,啟動類也沒有啟動標識的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01