RocketMQ延遲消息簡(jiǎn)明介紹
前言
場(chǎng)景可以是這樣的,雙11搶手機(jī),一個(gè)新手機(jī)4000-5000,到0點(diǎn)的時(shí)候,沖著興奮勁,搶到了。但是摸了摸錢包,又冷靜下來(lái)了,好像不是很必要換手機(jī)。就放在那里沒(méi)有支付,過(guò)了30分鐘,自動(dòng)取消了。這里就是使用延遲消息的場(chǎng)景,當(dāng)下單之后,向消息隊(duì)列發(fā)送一條延遲30分鐘消費(fèi)的消息。等到30分鐘過(guò)了,然后消費(fèi)消息,執(zhí)行檢查任務(wù),要是對(duì)應(yīng)的訂單支付了,就什么都不做,要是沒(méi)支付,就取消訂單。
RocketMQ的延遲消息是org.apache.rocketmq.broker.schedule.ScheduleMessageService類實(shí)現(xiàn)的
核心屬性
RMQ_SYS_SCHEDULE_TOPIC
在之前的版本中叫SCHEDULE_TOPIC,是系統(tǒng)內(nèi)置的Topic,用來(lái)保存所有的定時(shí)消息。沒(méi)有執(zhí)行的定時(shí)消息都會(huì)被保存在這個(gè)topic中。
FIRST_DELAY_TIME
第一次執(zhí)行定時(shí)任務(wù)的延遲時(shí)間,默認(rèn)是1秒。
private static final long FIRST_DELAY_TIME = 1000L;
DELAY_FOR_A_WHILE
第二次以及之后每次定時(shí)任務(wù)執(zhí)行的間隔時(shí)間,默認(rèn)100ms。
private static final long DELAY_FOR_A_WHILE = 100L;
DELAY_FOR_A_PERIOD
若是延遲消息投遞失敗,則在這個(gè)時(shí)間過(guò)后繼續(xù)投遞,默認(rèn)10秒。
private static final long DELAY_FOR_A_PERIOD = 10000L;
delayLevelTable
這是保存延遲級(jí)別和延遲時(shí)間映射關(guān)系的地方
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);offsetTable
保存延遲級(jí)別和對(duì)應(yīng)的消費(fèi)位點(diǎn)
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
核心方法
queueId2DelayLevel
將queueId轉(zhuǎn)換為延遲級(jí)別
public static int queueId2DelayLevel(final int queueId) {
return queueId + 1;
}
delayLevel2QueueId
將延遲級(jí)別轉(zhuǎn)換為queueId
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}updateOffset
更新延遲消息topic的消費(fèi)位點(diǎn)
private void updateOffset(int delayLevel, long offset) {
this.offsetTable.put(delayLevel, offset);
if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getDelayOffsetUpdateVersionStep() == 0) {
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
}
}
computeDeliverTimestamp
根據(jù)延遲消息級(jí)別和消息的存儲(chǔ)時(shí)間計(jì)算該延遲消息的投遞時(shí)間
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
Long time = this.delayLevelTable.get(delayLevel);
if (time != null) {
return time + storeTimestamp;
}
return storeTimestamp + 1000;
}
start()
啟動(dòng)延遲消息服務(wù)
shutdown()
關(guān)閉start方法中啟動(dòng)的額timer任務(wù)
load()
加載消息的消費(fèi)位點(diǎn)信息和全部的延遲級(jí)別信息。延遲級(jí)別信息默認(rèn)如下。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
parseDelayLevel
格式化所有的延遲級(jí)別信息,保存到內(nèi)存中。
到此這篇關(guān)于RocketMQ延遲消息簡(jiǎn)明介紹的文章就介紹到這了,更多相關(guān)RocketMQ延遲消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實(shí)現(xiàn)簡(jiǎn)單的猜數(shù)字小游戲
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)簡(jiǎn)單猜數(shù)字小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-03-03
springBoot之如何獲取接口請(qǐng)求數(shù)據(jù)和返回?cái)?shù)據(jù)實(shí)現(xiàn)日志
這篇文章主要介紹了springBoot之如何獲取接口請(qǐng)求數(shù)據(jù)和返回?cái)?shù)據(jù)實(shí)現(xiàn)日志問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04
IDEA下因Lombok插件產(chǎn)生的Library source does not match the bytecode報(bào)
這篇文章主要介紹了IDEA下因Lombok插件產(chǎn)生的Library source does not match the bytecode報(bào)錯(cuò)問(wèn)題及解決方法,親測(cè)試過(guò)好用,需要的朋友可以參考下2020-04-04
基于Java驗(yàn)證jwt token代碼實(shí)例
這篇文章主要介紹了基于Java驗(yàn)證jwt token代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
SpringBoot Admin的簡(jiǎn)單使用的方法步驟
本文主要介紹了SpringBoot Admin的簡(jiǎn)單使用的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01
解決springboot沒(méi)有啟動(dòng)標(biāo)識(shí),啟動(dòng)類也沒(méi)有啟動(dòng)標(biāo)識(shí)的問(wèn)題
這篇文章主要介紹了解決springboot沒(méi)有啟動(dòng)標(biāo)識(shí),啟動(dòng)類也沒(méi)有啟動(dòng)標(biāo)識(shí)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01

