Java中RocketMQ的延遲消息詳解
RocketMQ簡介
RocketMQ是一款開源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時(shí)的、高可靠、萬億級容量、靈活可伸縮的消息發(fā)布與訂閱服務(wù)。
它前身是MetaQ,是阿里基于Kafka的設(shè)計(jì)使用Java進(jìn)行自主研發(fā)的。在2012年,阿里將其開源, 在2016年,阿里將其捐獻(xiàn)給Apache軟件基金會(Apache Software Foundation,簡稱為ASF),正式成為孵化項(xiàng)目。2017 年,Apache軟件基金會宣布RocketMQ已孵化成為 Apache頂級項(xiàng)目(Top Level Project,簡稱為TLP ),是國內(nèi)首個互聯(lián)網(wǎng)中間件在 Apache上的頂級項(xiàng)目。
延遲消息
生產(chǎn)者把消息發(fā)送到消息隊(duì)列中以后,并不期望被立即消費(fèi),而是等待指定時(shí)間后才可以被消費(fèi)者消費(fèi),這類消息通常被稱為延遲消息。
在RocketMQ中,支持延遲消息,但是不支持任意時(shí)間精度的延遲消息,只支持特定級別的延遲消息。如果要支持任意時(shí)間精度,不能避免在Broker層面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免產(chǎn)生巨大的性能開銷。
消息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。在發(fā)送消息時(shí),設(shè)置消息延遲級別即可,設(shè)置消息延遲級別時(shí)有以下3種情況:
- 設(shè)置消息延遲級別等于0時(shí),則該消息為非延遲消息。
- 設(shè)置消息延遲級別大于等于1并且小于等于18時(shí),消息延遲特定時(shí)間,如:設(shè)置消息延遲級別等于1,則延遲1s;設(shè)置消息延遲級別等于2,則延遲5s,以此類推。
- 設(shè)置消息延遲級別大于18時(shí),則該消息延遲級別為18,如:設(shè)置消息延遲級別等于20,則延遲2h。
延遲消息示例
首先,寫一個消費(fèi)者,用于消費(fèi)延遲消息:
public class Consumer { public static void main(String[] args) throws MQClientException { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 實(shí)例化消費(fèi)者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup"); // 設(shè)置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費(fèi)的消息 consumer.subscribe("OneMoreTopic", "*"); // 注冊回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.printf("%s %s Receive New Messages:%n" , sdf.format(new Date()) , Thread.currentThread().getName()); for (MessageExt msg : msgs) { System.out.printf("\tMsg Id: %s%n", msg.getMsgId()); System.out.printf("\tBody: %s%n", new String(msg.getBody())); } // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 啟動消費(fèi)者實(shí)例 consumer.start(); System.out.println("Consumer Started."); } }
再寫一個延遲消息的生產(chǎn)者,用于發(fā)送延遲消息:
public class DelayProducer { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 實(shí)例化消息生產(chǎn)者Producer DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup"); // 設(shè)置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 啟動Producer實(shí)例 producer.start(); Message msg = new Message("OneMoreTopic" , "DelayMessage", "This is a delay message.".getBytes()); //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" //設(shè)置消息延遲級別為3,也就是延遲10s。 msg.setDelayTimeLevel(3); // 發(fā)送消息到一個Broker SendResult sendResult = producer.send(msg); // 通過sendResult返回消息是否成功送達(dá) System.out.printf("%s Send Status: %s, Msg Id: %s %n" , sdf.format(new Date()) , sendResult.getSendStatus() , sendResult.getMsgId()); // 如果不再發(fā)送消息,關(guān)閉Producer實(shí)例。 producer.shutdown(); } }
運(yùn)行生產(chǎn)者以后,就會發(fā)送一條延遲消息:
10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000
10秒鐘后,消費(fèi)者收到的這條延遲消息:
10:37:25.026 ConsumeMessageThread_1 Receive New Messages: Msg Id: C0A8006D5AB018B4AAC216E0DB690000 Body: This is a delay message.
延遲消息的原理分析
以下分析的RocketMQ源碼的版本號是4.7.1,版本不同源碼略有差別。
CommitLog
在org.apache.rocketmq.store.CommitLog中,針對延遲消息做了一些處理:
// 延遲級別大于0,就是延時(shí)消息 if (msg.getDelayTimeLevel() > 0) { // 判斷當(dāng)前延遲級別,如果大于最大延遲級別, // 就設(shè)置當(dāng)前延遲級別為最大延遲級別。 if (msg.getDelayTimeLevel() > this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()); } // 獲取延遲消息的主題, // 其中RMQ_SYS_SCHEDULE_TOPIC的值為SCHEDULE_TOPIC_XXXX topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 根據(jù)延遲級別獲取延遲消息的隊(duì)列Id, // 隊(duì)列Id其實(shí)就是延遲級別減1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 備份真正的主題和隊(duì)列Id MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 設(shè)置延時(shí)消息的主題和隊(duì)列Id msg.setTopic(topic); msg.setQueueId(queueId); }
可以看到,每一個延遲消息的主題都被暫時(shí)更改為SCHEDULE_TOPIC_XXXX,并且根據(jù)延遲級別延遲消息變更了新的隊(duì)列Id。接下來,處理延遲消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。
ScheduleMessageService
ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore進(jìn)行初始化的,初始化包括構(gòu)造對象和調(diào)用 load
方法。最后,再執(zhí)行ScheduleMessageService的 start
方法:
public void start() { // 使用AtomicBoolean確保start方法僅有效執(zhí)行一次 if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // 遍歷所有延遲級別 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { // key為延遲級別 Integer level = entry.getKey(); // value為延遲級別對應(yīng)的毫秒數(shù) Long timeDelay = entry.getValue(); // 根據(jù)延遲級別獲得對應(yīng)隊(duì)列的偏移量 Long offset = this.offsetTable.get(level); // 如果偏移量為null,則設(shè)置為0 if (null == offset) { offset = 0L; } if (timeDelay != null) { // 為每個延遲級別創(chuàng)建定時(shí)任務(wù), // 第一次啟動任務(wù)延遲為FIRST_DELAY_TIME,也就是1秒 this.timer.schedule( new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 延遲10秒后每隔flushDelayOffsetInterval執(zhí)行一次任務(wù), // 其中,flushDelayOffsetInterval默認(rèn)配置也為10秒 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 持久化每個隊(duì)列消費(fèi)的偏移量 if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore .getMessageStoreConfig().getFlushDelayOffsetInterval()); } }
遍歷所有延遲級別,根據(jù)延遲級別獲得對應(yīng)隊(duì)列的偏移量,如果偏移量不存在,則設(shè)置為0。然后為每個延遲級別創(chuàng)建定時(shí)任務(wù),第一次啟動任務(wù)延遲為1秒,第二次及以后的啟動任務(wù)延遲才是延遲級別相應(yīng)的延遲時(shí)間。
然后,又創(chuàng)建了一個定時(shí)任務(wù),用于持久化每個隊(duì)列消費(fèi)的偏移量。持久化的頻率由flushDelayOffsetInterval屬性進(jìn)行配置,默認(rèn)為10秒。
定時(shí)任務(wù)
ScheduleMessageService的 start
方法執(zhí)行之后,每個延遲級別都創(chuàng)建自己的定時(shí)任務(wù),這里的定時(shí)任務(wù)的具體實(shí)現(xiàn)就在DeliverDelayedMessageTimerTask類之中,它核心代碼是executeOnTimeup方法之中,我們來看一下主要部分:
// 根據(jù)主題和隊(duì)列Id獲取消息隊(duì)列 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue( TopicValidator.RMQ_SYS_SCHEDULE_TOPIC , delayLevel2QueueId(delayLevel));
如果沒有獲取到對應(yīng)的消息隊(duì)列,則在DELAY_FOR_A_WHILE(默認(rèn)為100)毫秒后再執(zhí)行任務(wù)。如果獲取到了,就繼續(xù)執(zhí)行下面操作:
// 根據(jù)消費(fèi)偏移量從消息隊(duì)列中獲取所有有效消息 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
如果沒有獲取到有效消息,則在DELAY_FOR_A_WHILE(默認(rèn)為100)毫秒后再執(zhí)行任務(wù)。如果獲取到了,就繼續(xù)執(zhí)行下面操作:
// 遍歷所有消息 for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 獲取消息的物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); // 獲取消息的物理長度 int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); // 省略部分代碼... long now = System.currentTimeMillis(); // 計(jì)算消息應(yīng)該被消費(fèi)的時(shí)間 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); // 計(jì)算下一條消息的偏移量 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE) long countdown = deliverTimestamp - now; // 省略部分代碼... }
如果當(dāng)前消息不到消費(fèi)的時(shí)間,則在 countdown
毫秒后再執(zhí)行任務(wù)。如果到消費(fèi)的時(shí)間,就繼續(xù)執(zhí)行下面操作:
// 根據(jù)消息的物理偏移量和大小獲取消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy);
如果獲取到消息,則繼續(xù)執(zhí)行下面操作:
// 重新構(gòu)建新的消息,包括: // 1.清除消息的延遲級別 // 2.恢復(fù)真正的消息主題和隊(duì)列Id MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}," + " discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; } // 重新把消息發(fā)送到真正的消息隊(duì)列上 PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner);
清除了消息的延遲級別,并且恢復(fù)了真正的消息主題和隊(duì)列Id,重新把消息發(fā)送到真正的消息隊(duì)列上以后,消費(fèi)者就可以立即消費(fèi)了。
總結(jié)
經(jīng)過以上對源碼的分析,可以總結(jié)出延遲消息的實(shí)現(xiàn)步驟:
如果消息的延遲級別大于0,則表示該消息為延遲消息,修改該消息的主題為SCHEDULE_TOPIC_XXXX,隊(duì)列Id為延遲級別減1。消息進(jìn)入SCHEDULE_TOPIC_XXXX的隊(duì)列中。定時(shí)任務(wù)根據(jù)上次拉取的偏移量不斷從隊(duì)列中取出所有消息。根據(jù)消息的物理偏移量和大小再次獲取消息。根據(jù)消息屬性重新創(chuàng)建消息,清除延遲級別,恢復(fù)原主題和隊(duì)列Id。重新發(fā)送消息到原主題的隊(duì)列中,供消費(fèi)者進(jìn)行消費(fèi)。
到此這篇關(guān)于Java中RocketMQ的延遲消息詳解的文章就介紹到這了,更多相關(guān)RocketMQ的延遲消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java靜態(tài)和非靜態(tài)成員變量初始化過程解析
這篇文章主要介紹了Java靜態(tài)和非靜態(tài)成員變量初始化過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01Java反射之Call stack introspection詳解
這篇文章主要介紹了Java反射之Call stack introspection詳解,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11Java實(shí)現(xiàn)字符數(shù)組全排列的方法
這篇文章主要介紹了Java實(shí)現(xiàn)字符數(shù)組全排列的方法,涉及Java針對字符數(shù)組的遍歷及排序算法的實(shí)現(xiàn)技巧,需要的朋友可以參考下2015-12-12struts1之簡單mvc示例_動力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了struts1之簡單mvc示例的相關(guān)資料,需要的朋友可以參考下2017-09-09在Java中避免NullPointerException的解決方案
這篇文章主要介紹了在Java中避免NullPointerException的解決方案,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04