欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ?消息Message的結(jié)構(gòu)和使用方式詳解

 更新時間:2025年08月01日 10:46:20   作者:csdn_tom_168  
Message是RocketMQ的數(shù)據(jù)包,它不僅是業(yè)務(wù)數(shù)據(jù)的載體,更是路由、過濾、追蹤、延遲、事務(wù)等功能的基礎(chǔ),掌握Message,你就掌握了RocketMQ的語言,本文給大家介紹什么是?Message及理解Message的結(jié)構(gòu)、屬性、生命周期和使用方式,感興趣的朋友一起看看吧

?? RocketMQ 消息(Message)詳解

在 Apache RocketMQ 中,消息(Message) 是數(shù)據(jù)傳輸?shù)淖钚卧?,是生產(chǎn)者與消費者之間通信的“載體”。理解 Message 的結(jié)構(gòu)、屬性、生命周期和使用方式,是掌握 RocketMQ 的核心基礎(chǔ)。

推薦閱讀:深入理解Apache RocketMQ 中Message 消息的核心概念

一、什么是 Message?

? 定義:

Message 是 RocketMQ 中封裝實際業(yè)務(wù)數(shù)據(jù)的對象,包含消息體(Body)和一系列元數(shù)據(jù)(如 Topic、Tag、Key、Properties 等),用于在生產(chǎn)者與消費者之間傳遞信息。

類比:就像一封信,信紙是內(nèi)容(Body),信封上寫著收件人(Topic)、標(biāo)簽(Tag)、編號(Key)等信息。

二、Message 的核心結(jié)構(gòu)

一個 Message 對象主要由以下幾個部分組成:

字段類型是否必填說明
TopicString? 必填消息所屬的主題,用于路由和分類
Bodybyte[]? 必填消息的實際內(nèi)容,通常為序列化后的 JSON、Protobuf 等
TagsString? 可選子分類標(biāo)簽,用于消費者過濾(如 CREATE, CANCEL
KeysString? 可選消息的唯一鍵或業(yè)務(wù)主鍵(如訂單號),用于排查、索引
Flagint? 可選消息標(biāo)志位(如是否壓縮)
DelayTimeLevelint? 可選延遲消息級別(1~18),實現(xiàn)定時投遞
PropertiesMap<String, String>? 可選自定義屬性,RocketMQ 內(nèi)部也使用它存儲系統(tǒng)屬性

三、Message 各字段詳解

1.Topic(主題)

  • 消息的邏輯分類,決定消息被發(fā)送到哪個隊列。
  • 必須提前創(chuàng)建或允許自動創(chuàng)建。
  • 示例:ORDER_TOPIC, USER_LOG_TOPIC
new Message("ORDER_TOPIC", ...);

2.Body(消息體)

  • 實際傳輸?shù)臄?shù)據(jù),必須是字節(jié)數(shù)組。
  • 通常通過 JSON、Protobuf、Hessian 等序列化框架編碼。
String content = "{\"orderId\":\"1001\",\"userId\":10086}";
Message msg = new Message(topic, tag, content.getBytes(StandardCharsets.UTF_8));

?? 注意:

  • 單條消息大小默認(rèn)最大 4MB(可配置)
  • 過大消息會影響性能,建議拆分或使用外部存儲(如上傳文件后傳 URL)

3.Tags(標(biāo)簽)

  • 用于對同一 Topic 下的消息進行二次分類。
  • 消費者可通過 subscribe("Topic", "TagA || TagB") 進行過濾。
// 發(fā)送
new Message("ORDER_TOPIC", "CREATE", "創(chuàng)建訂單".getBytes());
new Message("ORDER_TOPIC", "PAY", "支付完成".getBytes());
// 訂閱 CREATE 類型消息
consumer.subscribe("ORDER_TOPIC", "CREATE");

? 優(yōu)勢:輕量級過濾,避免消費者接收無關(guān)消息。

?? 注意:Tags 是字符串匹配,不支持正則(但支持 * 通配和 || 多選)

4.Keys(消息鍵)

  • 為消息設(shè)置唯一標(biāo)識或業(yè)務(wù)主鍵(如訂單號、用戶ID)。
  • 支持通過 mqadmin queryMsgByKey 命令查詢消息。
  • 支持索引,便于問題排查。
Message msg = new Message(...);
msg.setKeys("ORDER_20240501001");

? 建議:關(guān)鍵業(yè)務(wù)消息務(wù)必設(shè)置 Keys,便于追蹤。

5.Properties(屬性)

  • 鍵值對形式的擴展字段,可用于:
    • 存儲自定義上下文(如 traceId、tenantId)
    • RocketMQ 內(nèi)部使用(如 RECONSUME_TIMEDELAY、TRAN_MSG
msg.putUserProperty("traceId", "abc123");
msg.putUserProperty("source", "web");

?? 注意:系統(tǒng)屬性以 PREFIX_SYS_PROP 開頭,不要沖突。

6.DelayTimeLevel(延遲級別)

  • 設(shè)置消息延遲投遞時間,實現(xiàn)“定時任務(wù)”功能。
  • 取值范圍:1~18,對應(yīng)不同延遲時間:
級別時間
11s
25s
310s
430s
51m
62m
73m
84m
95m
106m
117m
128m
139m
1410m
1520m
1630m
171h
182h
Message msg = new Message("DELAY_TOPIC", "TAG", "延遲消息".getBytes());
msg.setDelayTimeLevel(5); // 延遲1分鐘
producer.send(msg);

?? 注意:延遲消息不保證精確時間,存在輕微誤差。

四、Message 的生命周期

1. 生產(chǎn)者創(chuàng)建 Message 對象
   ↓
2. 發(fā)送到 Broker(寫入 CommitLog)
   ↓
3. 構(gòu)建 ConsumeQueue 和 IndexFile
   ↓
4. 消費者拉取消息(根據(jù) Topic + Queue)
   ↓
5. 處理成功 → 提交 Offset
   ↓
6. 消息過期(默認(rèn) 72 小時)→ 被刪除

? 消息是持久化存儲的,即使消費者未上線,消息也不會丟失。

五、Message 的存儲機制

雖然 Message 是邏輯對象,但在 Broker 端有嚴(yán)格的物理存儲結(jié)構(gòu):

1.CommitLog

  • 所有消息按到達順序追加寫入 CommitLog 文件(順序?qū)?,高性能?/li>
  • 每個消息包含:Topic、Queue、Body、Properties 等完整信息

2.ConsumeQueue

  • 每個 Topic 的每個 MessageQueue 對應(yīng)一個 ConsumeQueue
  • 存儲消息的邏輯偏移量、大小、物理位置,用于快速定位消息
ConsumeQueue/{Topic}/{QueueId}/
   ├── 00000000000000000000
   └── ...

3.IndexFile

  • 可選索引文件,支持通過 Keys 或時間范圍 查詢消息
  • 用于排查問題(如“查找某個訂單的消息”)
IndexFile/index_1714567890000

六、Message 的發(fā)送方式回顧

方式說明
同步發(fā)送阻塞等待結(jié)果,適用于關(guān)鍵消息
異步發(fā)送回調(diào)通知結(jié)果,高吞吐場景
單向發(fā)送不關(guān)心結(jié)果,日志類消息
事務(wù)消息半消息 + 本地事務(wù) + 提交/回滾

所有方式發(fā)送的都是 Message 對象。

七、最佳實踐與注意事項

實踐說明
? 設(shè)置 Topic 和 Tag合理分類,便于管理和過濾
? 關(guān)鍵消息設(shè)置 Keys便于通過 mqadmin 查詢
? 控制 Body 大小≤ 4MB,避免影響性能
? 使用 UTF-8 編碼防止亂碼
? 避免空 Body可能導(dǎo)致異常
? 合理使用延遲消息替代部分定時任務(wù),但不要濫用
? 自定義屬性用 putUserProperty避免覆蓋系統(tǒng)屬性

八、常見問題排查

問題原因解決方案
MessageExt is null拉取超時或無消息正?,F(xiàn)象,重試即可
msg put message to store error消息過大或磁盤滿檢查大小限制和磁盤空間
延遲消息未按時投遞時間誤差或 Broker 壓力大接受輕微延遲,或使用外部調(diào)度系統(tǒng)
通過 Key 查不到消息IndexFile 未生成或已過期檢查 messageIndexEnable 配置
消息重復(fù)網(wǎng)絡(luò)重試、Rebalance消費者做冪等處理

? 總結(jié):Message 核心要點

維度說明
角色消息傳輸?shù)幕締卧?/td>
組成Topic + Body + Tags + Keys + Properties + Delay
大小限制默認(rèn) ≤ 4MB
存儲方式順序?qū)?CommitLog,索引通過 ConsumeQueue 和 IndexFile
可查詢性支持按 Key、時間、Offset 查詢
擴展性支持自定義屬性,靈活傳遞上下文
高級功能支持延遲、事務(wù)、順序消息

?? 一句話總結(jié):

Message 是 RocketMQ 的“數(shù)據(jù)包” —— 它不僅是業(yè)務(wù)數(shù)據(jù)的載體,更是路由、過濾、追蹤、延遲、事務(wù)等功能的基礎(chǔ)。
設(shè)計好 Message 的結(jié)構(gòu)與屬性,才能讓消息系統(tǒng)真正高效、可靠、易維護。

掌握 Message,你就掌握了 RocketMQ 的“語言”。

到此這篇關(guān)于RocketMQ 消息Message的結(jié)構(gòu)和使用方式詳解的文章就介紹到這了,更多相關(guān)RocketMQ 消息Message內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫事務(wù)參數(shù)

    Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫事務(wù)參數(shù)

    這篇文章主要為大家介紹了Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫事務(wù)參數(shù)使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-05-05
  • Mybatis 中Mapper使用package方式配置報錯的解決方案

    Mybatis 中Mapper使用package方式配置報錯的解決方案

    這篇文章主要介紹了Mybatis 中Mapper使用package方式配置報錯的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • OpenFeign服務(wù)接口調(diào)用的過程詳解

    OpenFeign服務(wù)接口調(diào)用的過程詳解

    Feign是一個聲明式WebService客戶端。使用Feign能讓編寫Web?Service客戶端更加簡單。它的使用方法是定義一個服務(wù)接口然后在上面添加注解,這篇文章主要介紹了OpenFeign服務(wù)接口調(diào)用,需要的朋友可以參考下
    2022-10-10
  • Java使用BigDecimal解決小數(shù)計算問題

    Java使用BigDecimal解決小數(shù)計算問題

    Java中的BigDecimal是一個內(nèi)置類,用于精確表示任意大小的十進制數(shù),它提供了一種處理浮點運算精度問題的方法,特別適合金融、貨幣交易等需要高精度計算的場景,本文給大家介紹了java中如何使用BigDecimal解決小數(shù)計算問題,需要的朋友可以參考下
    2024-08-08
  • Java實現(xiàn)時間戳轉(zhuǎn)代碼運行時長

    Java實現(xiàn)時間戳轉(zhuǎn)代碼運行時長

    這篇文章主要為大家詳細(xì)介紹了如何使用Java實現(xiàn)時間戳轉(zhuǎn)代碼運行時長功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2025-06-06
  • Java使用Socket簡單通訊詳解

    Java使用Socket簡單通訊詳解

    這篇文章主要介紹了Java使用Socket簡單通訊詳解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • net.sf.json.JSONObject 為null 的判斷方法

    net.sf.json.JSONObject 為null 的判斷方法

    下面小編就為大家?guī)硪黄猲et.sf.json.JSONObject 為null 的判斷方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-02-02
  • Java 高并發(fā)十: JDK8對并發(fā)的新支持詳解

    Java 高并發(fā)十: JDK8對并發(fā)的新支持詳解

    本文主要介紹Java 高并發(fā)JDK8的支持,這里整理了詳細(xì)的資料及1. LongAdder 2. CompletableFuture 3. StampedLock的介紹,有興趣的小伙伴可以參考下
    2016-09-09
  • Json在Struts中的轉(zhuǎn)換與傳遞方法

    Json在Struts中的轉(zhuǎn)換與傳遞方法

    下面小編就為大家?guī)硪黄狫son在Struts中的轉(zhuǎn)換與傳遞方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-11-11
  • JAVA日常開發(fā)中讀寫XML的方法詳解

    JAVA日常開發(fā)中讀寫XML的方法詳解

    這篇文章主要介紹了JAVA日常開發(fā)中讀寫XML的相關(guān)資料,詳細(xì)講解了在Java中如何使用DOM(文檔對象模型)和SAX(簡單API?for?XML)兩種方式讀取XML文件,以及如何使用DOM和JAXB(Java?Architecture?for?XML?Binding)兩種方式寫入XML文件,需要的朋友可以參考下
    2024-12-12

最新評論