欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ延遲消息超詳細講解

 更新時間:2023年02月14日 08:30:21   作者:每天都要進步一點點  
延時消息是指發(fā)送到 RocketMQ 后不會馬上被消費者拉取到,而是等待固定的時間,才能被消費者拉取到。延時消息的使用場景很多,比如電商場景下關(guān)閉超時未支付的訂單,某些場景下需要在固定時間后發(fā)送提示消息

一、什么是延時消息

當消息寫入到Broker后,不能立刻被消費者消費,需要等待指定的時長后才可被消費處理的消息,稱為延時消息。

二、延時消息等級

RocketMQ延時消息的延遲時長不支持隨意時長的延遲,是通過特定的延遲等級來指定的。默認支持18個等級的延遲消息,延時等級定義在RocketMQ服務(wù)端的MessageStoreConfig類中的如下變量中:

// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//發(fā)消息時,設(shè)置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:
level == 0,消息為非延遲消息
1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s
level > maxLevel,則level== maxLevel,例如level==20,延遲2h

例如指定的延時等級為3,則表示延遲時長為10s,即延遲等級是從1開始計數(shù)的。

三、延時消息使用場景

采用RocketMQ的延時消息可以實現(xiàn)定時任務(wù)的功能,而無需使用定時器。使用場景主要有:

(1)、電商交易系統(tǒng)的訂單超時未支付,自動取消訂單

在電商交易系統(tǒng)中,像淘寶、京東,我們提交了一個訂單之后,在支付時都會提示,需要在指定時間內(nèi)(例如30分鐘)完成支付,否則訂單將被取消的消息,實際上這個超時未支付功能就可以使用延時消息來實現(xiàn)。在下單成功之后,就發(fā)送一個延時消息,然后指定消息的延時時間為30分鐘,這條消息將會在30分鐘后投遞給后臺業(yè)務(wù)系統(tǒng)(Consumer),此時才能被消費者進行消費,消費消息的時候會再去檢查這個訂單的狀態(tài),確認下是否支付成功,如果支付成功,則忽略不處理;如果訂單還是未支付,則進行取消訂單、釋放庫存等操作;

(2)、活動場景

比如B站視頻投稿經(jīng)常會發(fā)起一些活動,Up主在活動期間可以按照活動規(guī)則投稿視頻,在活動時間截止后,后臺根據(jù)Up主完成任務(wù)的情況以及結(jié)合投稿視頻的播放量等進行判定,然后派發(fā)對應(yīng)的獎勵。這種場景我們也可以采用延時消息來實現(xiàn),即在發(fā)起活動后,同時發(fā)送一條延時消息,延時時間設(shè)置為本次活動周期的時間。當活動結(jié)束后,這條延時消息剛好可以被消費者進行消費,這樣就可以消費消息然后執(zhí)行一系列的邏輯處理。

(3)、其它信息提醒等場景;

四、延時消息示例

(1)、編寫Consumer消費端并啟動,等待接收Producer發(fā)送過來的消息

/**
 * 消息消費者
 */
public class MQConsumer {
    public static void main(String[] args) throws MQClientException {
        // 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費者名稱
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
        // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開
        mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");
        // 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
        // 如果不是第一次啟動,那么按照上次消費的位置繼續(xù)消費
        mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 設(shè)置消費模型,集群還是廣播,默認為集群
        mqPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 消費者最小線程量
        mqPushConsumer.setConsumeThreadMin(5);
        // 消費者最大線程量
        mqPushConsumer.setConsumeThreadMax(10);
        // 設(shè)置一次消費消息的條數(shù),默認是1
        mqPushConsumer.setConsumeMessageBatchMaxSize(1);
        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息,如果訂閱該主題下的所有tag,則使用*
        mqPushConsumer.subscribe("DelayTopic", "*");
        // 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息
        mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            // 監(jiān)聽類實現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù)
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt message : msgList) {
                    String body = new String(message.getBody(), StandardCharsets.UTF_8);
                    System.out.println("消費者接收到消息: " + message.toString() + "---消息內(nèi)容為:" + body + "消息被消費時間:" + new Date(System.currentTimeMillis()) + ", 消息存儲時間: " + new Date(message.getBornTimestamp()));
                }
                // 標記該消息已經(jīng)被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        mqPushConsumer.start();
    }
}

(2)、編寫Producer生產(chǎn)端,發(fā)送延時消息

RocketMQ要實現(xiàn)發(fā)送延遲消息,只需在發(fā)送消息之前調(diào)用Message#setDelayTimeLevel()方法設(shè)置消息的延遲等級即可。

只需要設(shè)置一個延遲級別即可,注意不是具體的延遲時間。如果設(shè)置的延遲級別超過最大值,那么將會重置為最大值。

/**
 * Producer端發(fā)送延遲消息:只需在發(fā)送消息之前調(diào)用setDelayTimeLevel()方法設(shè)置消息的延遲等級即可
 */
public class SyncMQProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 創(chuàng)建DefaultMQProducer類并設(shè)定生產(chǎn)者名稱
        DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
        // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開
        mqProducer.setNamesrvAddr("10.0.90.86:9876");
        // 消息最大長度 默認4M
        mqProducer.setMaxMessageSize(4096);
        // 發(fā)送消息超時時間,默認3000
        mqProducer.setSendMsgTimeout(3000);
        // 發(fā)送消息失敗重試次數(shù),默認2
        mqProducer.setRetryTimesWhenSendAsyncFailed(2);
        // 啟動消息生產(chǎn)者
        mqProducer.start();
        // 創(chuàng)建消息,并指定Topic(主題),Tag(標簽)和消息內(nèi)容
        Message message = new Message("DelayTopic", "", "hello, 這是延遲消息".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 設(shè)置延時等級為3, 所以這個消息將在10s之后發(fā)送, RocketMQ目前只支持固定的幾個延時時間,還不支持自定義延遲時間
        message.setDelayTimeLevel(3);
        // 發(fā)送同步消息到一個Broker,可以通過sendResult返回消息是否成功送達
        SendResult sendResult = mqProducer.send(message);
        System.out.println(sendResult);
        // 如果不再發(fā)送消息,關(guān)閉Producer實例
        mqProducer.shutdown();
    }
}

(3)、啟動Producer

SendResult [sendStatus=SEND_OK, msgId=AC6E005A51B018B4AAC278E9F6F70000, offsetMsgId=0A005A5600002A9F0000000000003465, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=0], queueOffset=3]

從控制臺可以看到,消息發(fā)送狀態(tài)為SEND_OK,說明延遲消息已經(jīng)成功發(fā)送到RocketMQ Broker中。

(4)、觀察Consumer是否接收到消息

消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=241, queueOffset=1, sysFlag=0, bornTimestamp=1645673399032, bornHost=/10.0.90.115:62807, storeTimestamp=1645673403365, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F000000000000355F, commitLogOffset=13663, bodyCRC=676533924, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DelayTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=DelayTopic, MAX_OFFSET=2, CONSUME_START_TIME=1645673409075, UNIQ_KEY=AC6E005A51B018B4AAC278E9F6F70000, CLUSTER=DefaultCluster, DELAY=3, WAIT=true, REAL_QID=0}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -27, -69, -74, -24, -65, -97, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是延遲消息消息被消費時間:Thu Feb 24 11:30:09 CST 2022, 消息存儲時間: Thu Feb 24 11:29:59 CST 2022

可以看到,延遲消息成功被消息,并且我們注意到,消息被Consumer消費的時間【Thu Feb 24 11:30:09 CST 2022】 - 消息存儲時間【Thu Feb 24 11:29:59 CST 2022】 = 10s,發(fā)送消息的時候,指定的延遲等級也是10s,也就是消息的消費比存儲時間晚10秒。

五、延時消息實現(xiàn)原理

RocketMQ延時消息會暫存在名為SCHEDULE_TOPIC_XXXX的Topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費。broker會調(diào)度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。

SCHEDULE_TOPIC_XXXX中consumequeue中的文件夾名稱就是隊列的名稱,并且【隊列名稱 = 延遲等級 - 1】;如下圖,在前面的例子中,我們執(zhí)定消息的延遲時間為10s,對應(yīng)的延遲等級是3,所以文件夾名稱為【3 - 1 = 2】

延遲消息在RocketMQ Broker端的流轉(zhuǎn)如下圖所示:

主要包含以下6個步驟:

(1)、修改消息Topic名稱和隊列信息

RocketMQ Broker端在存儲生產(chǎn)者寫入的消息時,首先都會將其寫入到CommitLog中。之后根據(jù)消息中的Topic信息和隊列信息,將其轉(zhuǎn)發(fā)到目標Topic的指定隊列(ConsumeQueue)中。

由于消息一旦存儲到ConsumeQueue中,消費者就能消費到,而延遲消息不能被立即消費,所以這里將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級別確定要投遞到哪個隊列下。同時,還會將消息原來要發(fā)送到的目標Topic和隊列信息存儲到消息的屬性中。

(2)、轉(zhuǎn)發(fā)消息到延遲主題SCHEDULE_TOPIC_XXXX的CosumeQueue中

CommitLog中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進行的。在轉(zhuǎn)發(fā)過程中,會對延遲消息進行特殊處理,主要是計算這條延遲消息需要在什么時候進行投遞。

投遞時間 = 消息存儲時間(storeTimestamp) + 延遲級別對應(yīng)的時間

需要注意的是,會將計算出的投遞時間當做消息Tag的哈希值存儲到CosumeQueue中,CosumeQueue單個存儲單元組成結(jié)構(gòu)如下圖所示:

其中:

  • Commit Log Offset:記錄在CommitLog中的位置;
  • Size:記錄消息的大??;
  • Message Tag HashCode:記錄消息Tag的哈希值,用于消息過濾。特別的,對于延遲消息,這個字段記錄的是消息的投遞時間戳。這也是為什么java中hashCode方法返回一個int型,只占用4個字節(jié),而這里Message Tag HashCode字段卻設(shè)計成8個字節(jié)的原因;

(3)、延遲服務(wù)消費SCHEDULE_TOPIC_XXXX消息

Broker內(nèi)部有一個ScheduleMessageService類,其充當延遲服務(wù),主要是消費SCHEDULE_TOPIC_XXXX中的消息,并投遞到目標Topic中。

ScheduleMessageService在啟動時,其會創(chuàng)建一個定時器Timer,并根據(jù)延遲級別的個數(shù),啟動對應(yīng)數(shù)量的TimerTask,每個TimerTask負責一個延遲級別的消費與投遞。

需要注意的是,每個TimeTask在檢查消息是否到期時,首先檢查對應(yīng)隊列中尚未投遞第一條消息,如果這條消息沒到期,那么之后的消息都不會檢查。如果到期了,則進行投遞,并檢查之后的消息是否到期。

(4)、將信息重新存儲到CommitLog中

在將消息到期后,需要投遞到目標Topic。由于在第一步已經(jīng)記錄了原來的Topic和隊列信息,因此這里重新設(shè)置,再存儲到CommitLog即可。此外,由于之前Message Tag HashCode字段存儲的是消息的投遞時間,這里需要重新計算tag的哈希值后再存儲。

(5)、將消息投遞到目標Topic中

這一步與第二步類似,不過由于消息的Topic名稱已經(jīng)改為了目標Topic。因此消息會直接投遞到目標Topic的ConsumeQueue中,之后消費者即消費到這條消息。

(6)、消費者消費目標topic中的數(shù)據(jù)。

到此這篇關(guān)于RocketMQ延遲消息超詳細講解的文章就介紹到這了,更多相關(guān)RocketMQ延遲消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java Lambda 表達式源碼解析

    Java Lambda 表達式源碼解析

    這篇文章主要介紹了Java Lambda在JVM中是如何實現(xiàn)的,感興趣的小伙伴一起來了解了解
    2021-08-08
  • Java swing實現(xiàn)酒店管理系統(tǒng)

    Java swing實現(xiàn)酒店管理系統(tǒng)

    這篇文章主要為大家詳細介紹了Java swing實現(xiàn)酒店管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-02-02
  • Mybatis-plus如何提前獲取實體類用雪花算法生成的ID

    Mybatis-plus如何提前獲取實體類用雪花算法生成的ID

    本文主要介紹了Mybatis-plus如何提前獲取實體類用雪花算法生成的ID,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-07-07
  • Spring Security 實現(xiàn)用戶名密碼登錄流程源碼詳解

    Spring Security 實現(xiàn)用戶名密碼登錄流程源碼詳解

    在服務(wù)端的安全管理使用了Spring Security,用戶登錄成功之后,Spring Security幫你把用戶信息保存在Session里,但是具體保存在哪里,要是不深究你可能就不知道,今天小編就帶大家具體了解一下Spring Security實現(xiàn)用戶名密碼登錄的流程
    2021-11-11
  • SpringBoot使用GTS的示例詳解

    SpringBoot使用GTS的示例詳解

    這篇文章主要介紹了SpringBoot使用GTS的示例詳解,代碼簡單易懂,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-10-10
  • Java 將Excel轉(zhuǎn)為SVG的方法

    Java 將Excel轉(zhuǎn)為SVG的方法

    本文以Java示例展示如何將Excel文檔轉(zhuǎn)為SVG格式。通過本文中的方法,在將Excel轉(zhuǎn)為SVG時,如果sheet工作表中手動設(shè)置了分頁,則將每個分頁的內(nèi)容單獨保存為一個svg文件,如果sheet工作表中沒有設(shè)置分頁,則將Excel sheet表格中默認的分頁范圍保存為svg。
    2021-05-05
  • 詳解Springboot分布式限流實踐

    詳解Springboot分布式限流實踐

    這篇文章主要介紹了詳解Springboot分布式限流實踐 ,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-06-06
  • springboot返回前端中文亂碼的解決

    springboot返回前端中文亂碼的解決

    這篇文章主要介紹了springboot返回前端中文亂碼的解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-09-09
  • SpringBoot的@RestControllerAdvice作用詳解

    SpringBoot的@RestControllerAdvice作用詳解

    這篇文章主要介紹了SpringBoot的@RestControllerAdvice作用詳解,@RestContrllerAdvice是一種組合注解,由@ControllerAdvice,@ResponseBody組成,本質(zhì)上就是@Component,需要的朋友可以參考下
    2024-01-01
  • groovy腳本定義結(jié)構(gòu)表一鍵生成POJO類

    groovy腳本定義結(jié)構(gòu)表一鍵生成POJO類

    這篇文章主要為大家介紹了groovy腳本定義結(jié)構(gòu)表一鍵生成POJO類示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-03-03

最新評論