RocketMQ?消息Message的結(jié)構(gòu)和使用方式詳解
?? RocketMQ 消息(Message)詳解
在 Apache RocketMQ 中,消息(Message) 是數(shù)據(jù)傳輸?shù)淖钚卧?,是生產(chǎn)者與消費(fèi)者之間通信的“載體”。理解 Message 的結(jié)構(gòu)、屬性、生命周期和使用方式,是掌握 RocketMQ 的核心基礎(chǔ)。
推薦閱讀:深入理解Apache RocketMQ 中Message 消息的核心概念
一、什么是 Message?
? 定義:
Message 是 RocketMQ 中封裝實際業(yè)務(wù)數(shù)據(jù)的對象,包含消息體(Body)和一系列元數(shù)據(jù)(如 Topic、Tag、Key、Properties 等),用于在生產(chǎn)者與消費(fèi)者之間傳遞信息。
類比:就像一封信,信紙是內(nèi)容(Body),信封上寫著收件人(Topic)、標(biāo)簽(Tag)、編號(Key)等信息。
二、Message 的核心結(jié)構(gòu)
一個 Message 對象主要由以下幾個部分組成:
| 字段 | 類型 | 是否必填 | 說明 |
|---|---|---|---|
| Topic | String | ? 必填 | 消息所屬的主題,用于路由和分類 |
| Body | byte[] | ? 必填 | 消息的實際內(nèi)容,通常為序列化后的 JSON、Protobuf 等 |
| Tags | String | ? 可選 | 子分類標(biāo)簽,用于消費(fèi)者過濾(如 CREATE, CANCEL) |
| Keys | String | ? 可選 | 消息的唯一鍵或業(yè)務(wù)主鍵(如訂單號),用于排查、索引 |
| Flag | int | ? 可選 | 消息標(biāo)志位(如是否壓縮) |
| DelayTimeLevel | int | ? 可選 | 延遲消息級別(1~18),實現(xiàn)定時投遞 |
| Properties | Map<String, String> | ? 可選 | 自定義屬性,RocketMQ 內(nèi)部也使用它存儲系統(tǒng)屬性 |
三、Message 各字段詳解
1.Topic(主題)
- 消息的邏輯分類,決定消息被發(fā)送到哪個隊列。
- 必須提前創(chuàng)建或允許自動創(chuàng)建。
- 示例:
ORDER_TOPIC,USER_LOG_TOPIC
new Message("ORDER_TOPIC", ...);
2.Body(消息體)
- 實際傳輸?shù)臄?shù)據(jù),必須是字節(jié)數(shù)組。
- 通常通過 JSON、Protobuf、Hessian 等序列化框架編碼。
String content = "{\"orderId\":\"1001\",\"userId\":10086}";
Message msg = new Message(topic, tag, content.getBytes(StandardCharsets.UTF_8));
?? 注意:
- 單條消息大小默認(rèn)最大 4MB(可配置)
- 過大消息會影響性能,建議拆分或使用外部存儲(如上傳文件后傳 URL)
3.Tags(標(biāo)簽)
- 用于對同一 Topic 下的消息進(jìn)行二次分類。
- 消費(fèi)者可通過
subscribe("Topic", "TagA || TagB")進(jìn)行過濾。
// 發(fā)送
new Message("ORDER_TOPIC", "CREATE", "創(chuàng)建訂單".getBytes());
new Message("ORDER_TOPIC", "PAY", "支付完成".getBytes());
// 訂閱 CREATE 類型消息
consumer.subscribe("ORDER_TOPIC", "CREATE");? 優(yōu)勢:輕量級過濾,避免消費(fèi)者接收無關(guān)消息。
?? 注意:Tags 是字符串匹配,不支持正則(但支持
*通配和||多選)
4.Keys(消息鍵)
- 為消息設(shè)置唯一標(biāo)識或業(yè)務(wù)主鍵(如訂單號、用戶ID)。
- 支持通過
mqadmin queryMsgByKey命令查詢消息。 - 支持索引,便于問題排查。
Message msg = new Message(...);
msg.setKeys("ORDER_20240501001");
? 建議:關(guān)鍵業(yè)務(wù)消息務(wù)必設(shè)置 Keys,便于追蹤。
5.Properties(屬性)
- 鍵值對形式的擴(kuò)展字段,可用于:
- 存儲自定義上下文(如 traceId、tenantId)
- RocketMQ 內(nèi)部使用(如
RECONSUME_TIME、DELAY、TRAN_MSG)
msg.putUserProperty("traceId", "abc123");
msg.putUserProperty("source", "web");
?? 注意:系統(tǒng)屬性以
PREFIX_SYS_PROP開頭,不要沖突。
6.DelayTimeLevel(延遲級別)
- 設(shè)置消息延遲投遞時間,實現(xiàn)“定時任務(wù)”功能。
- 取值范圍:1~18,對應(yīng)不同延遲時間:
| 級別 | 時間 |
|---|---|
| 1 | 1s |
| 2 | 5s |
| 3 | 10s |
| 4 | 30s |
| 5 | 1m |
| 6 | 2m |
| 7 | 3m |
| 8 | 4m |
| 9 | 5m |
| 10 | 6m |
| 11 | 7m |
| 12 | 8m |
| 13 | 9m |
| 14 | 10m |
| 15 | 20m |
| 16 | 30m |
| 17 | 1h |
| 18 | 2h |
Message msg = new Message("DELAY_TOPIC", "TAG", "延遲消息".getBytes());
msg.setDelayTimeLevel(5); // 延遲1分鐘
producer.send(msg);
?? 注意:延遲消息不保證精確時間,存在輕微誤差。
四、Message 的生命周期
1. 生產(chǎn)者創(chuàng)建 Message 對象 ↓ 2. 發(fā)送到 Broker(寫入 CommitLog) ↓ 3. 構(gòu)建 ConsumeQueue 和 IndexFile ↓ 4. 消費(fèi)者拉取消息(根據(jù) Topic + Queue) ↓ 5. 處理成功 → 提交 Offset ↓ 6. 消息過期(默認(rèn) 72 小時)→ 被刪除
? 消息是持久化存儲的,即使消費(fèi)者未上線,消息也不會丟失。
五、Message 的存儲機(jī)制
雖然 Message 是邏輯對象,但在 Broker 端有嚴(yán)格的物理存儲結(jié)構(gòu):
1.CommitLog
- 所有消息按到達(dá)順序追加寫入 CommitLog 文件(順序?qū)懀咝阅埽?/li>
- 每個消息包含:Topic、Queue、Body、Properties 等完整信息
2.ConsumeQueue
- 每個 Topic 的每個 MessageQueue 對應(yīng)一個 ConsumeQueue
- 存儲消息的邏輯偏移量、大小、物理位置,用于快速定位消息
ConsumeQueue/{Topic}/{QueueId}/
├── 00000000000000000000
└── ...
3.IndexFile
- 可選索引文件,支持通過 Keys 或時間范圍 查詢消息
- 用于排查問題(如“查找某個訂單的消息”)
IndexFile/index_1714567890000
六、Message 的發(fā)送方式回顧
| 方式 | 說明 |
|---|---|
| 同步發(fā)送 | 阻塞等待結(jié)果,適用于關(guān)鍵消息 |
| 異步發(fā)送 | 回調(diào)通知結(jié)果,高吞吐場景 |
| 單向發(fā)送 | 不關(guān)心結(jié)果,日志類消息 |
| 事務(wù)消息 | 半消息 + 本地事務(wù) + 提交/回滾 |
所有方式發(fā)送的都是
Message對象。
七、最佳實踐與注意事項
| 實踐 | 說明 |
|---|---|
| ? 設(shè)置 Topic 和 Tag | 合理分類,便于管理和過濾 |
| ? 關(guān)鍵消息設(shè)置 Keys | 便于通過 mqadmin 查詢 |
| ? 控制 Body 大小 | ≤ 4MB,避免影響性能 |
| ? 使用 UTF-8 編碼 | 防止亂碼 |
| ? 避免空 Body | 可能導(dǎo)致異常 |
| ? 合理使用延遲消息 | 替代部分定時任務(wù),但不要濫用 |
? 自定義屬性用 putUserProperty | 避免覆蓋系統(tǒng)屬性 |
八、常見問題排查
| 問題 | 原因 | 解決方案 |
|---|---|---|
MessageExt is null | 拉取超時或無消息 | 正常現(xiàn)象,重試即可 |
msg put message to store error | 消息過大或磁盤滿 | 檢查大小限制和磁盤空間 |
| 延遲消息未按時投遞 | 時間誤差或 Broker 壓力大 | 接受輕微延遲,或使用外部調(diào)度系統(tǒng) |
| 通過 Key 查不到消息 | IndexFile 未生成或已過期 | 檢查 messageIndexEnable 配置 |
| 消息重復(fù) | 網(wǎng)絡(luò)重試、Rebalance | 消費(fèi)者做冪等處理 |
? 總結(jié):Message 核心要點(diǎn)
| 維度 | 說明 |
|---|---|
| 角色 | 消息傳輸?shù)幕締卧?/td> |
| 組成 | Topic + Body + Tags + Keys + Properties + Delay |
| 大小限制 | 默認(rèn) ≤ 4MB |
| 存儲方式 | 順序?qū)?CommitLog,索引通過 ConsumeQueue 和 IndexFile |
| 可查詢性 | 支持按 Key、時間、Offset 查詢 |
| 擴(kuò)展性 | 支持自定義屬性,靈活傳遞上下文 |
| 高級功能 | 支持延遲、事務(wù)、順序消息 |
?? 一句話總結(jié):
Message 是 RocketMQ 的“數(shù)據(jù)包” —— 它不僅是業(yè)務(wù)數(shù)據(jù)的載體,更是路由、過濾、追蹤、延遲、事務(wù)等功能的基礎(chǔ)。
設(shè)計好 Message 的結(jié)構(gòu)與屬性,才能讓消息系統(tǒng)真正高效、可靠、易維護(hù)。
掌握 Message,你就掌握了 RocketMQ 的“語言”。
到此這篇關(guān)于RocketMQ 消息Message的結(jié)構(gòu)和使用方式詳解的文章就介紹到這了,更多相關(guān)RocketMQ 消息Message內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫事務(wù)參數(shù)
這篇文章主要為大家介紹了Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫事務(wù)參數(shù)使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
Mybatis 中Mapper使用package方式配置報錯的解決方案
這篇文章主要介紹了Mybatis 中Mapper使用package方式配置報錯的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
OpenFeign服務(wù)接口調(diào)用的過程詳解
Feign是一個聲明式WebService客戶端。使用Feign能讓編寫Web?Service客戶端更加簡單。它的使用方法是定義一個服務(wù)接口然后在上面添加注解,這篇文章主要介紹了OpenFeign服務(wù)接口調(diào)用,需要的朋友可以參考下2022-10-10
Java實現(xiàn)時間戳轉(zhuǎn)代碼運(yùn)行時長
這篇文章主要為大家詳細(xì)介紹了如何使用Java實現(xiàn)時間戳轉(zhuǎn)代碼運(yùn)行時長功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-06-06
net.sf.json.JSONObject 為null 的判斷方法
下面小編就為大家?guī)硪黄猲et.sf.json.JSONObject 為null 的判斷方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02
Java 高并發(fā)十: JDK8對并發(fā)的新支持詳解
本文主要介紹Java 高并發(fā)JDK8的支持,這里整理了詳細(xì)的資料及1. LongAdder 2. CompletableFuture 3. StampedLock的介紹,有興趣的小伙伴可以參考下2016-09-09

