RocketMQ?消息Message的結(jié)構(gòu)和使用方式詳解
?? RocketMQ 消息(Message)詳解
在 Apache RocketMQ 中,消息(Message) 是數(shù)據(jù)傳輸?shù)淖钚卧?,是生產(chǎn)者與消費者之間通信的“載體”。理解 Message
的結(jié)構(gòu)、屬性、生命周期和使用方式,是掌握 RocketMQ 的核心基礎(chǔ)。
推薦閱讀:深入理解Apache RocketMQ 中Message 消息的核心概念
一、什么是 Message?
? 定義:
Message 是 RocketMQ 中封裝實際業(yè)務(wù)數(shù)據(jù)的對象,包含消息體(Body)和一系列元數(shù)據(jù)(如 Topic、Tag、Key、Properties 等),用于在生產(chǎn)者與消費者之間傳遞信息。
類比:就像一封信,信紙是內(nèi)容(Body),信封上寫著收件人(Topic)、標(biāo)簽(Tag)、編號(Key)等信息。
二、Message 的核心結(jié)構(gòu)
一個 Message
對象主要由以下幾個部分組成:
字段 | 類型 | 是否必填 | 說明 |
---|---|---|---|
Topic | String | ? 必填 | 消息所屬的主題,用于路由和分類 |
Body | byte[] | ? 必填 | 消息的實際內(nèi)容,通常為序列化后的 JSON、Protobuf 等 |
Tags | String | ? 可選 | 子分類標(biāo)簽,用于消費者過濾(如 CREATE , CANCEL ) |
Keys | String | ? 可選 | 消息的唯一鍵或業(yè)務(wù)主鍵(如訂單號),用于排查、索引 |
Flag | int | ? 可選 | 消息標(biāo)志位(如是否壓縮) |
DelayTimeLevel | int | ? 可選 | 延遲消息級別(1~18),實現(xiàn)定時投遞 |
Properties | Map<String, String> | ? 可選 | 自定義屬性,RocketMQ 內(nèi)部也使用它存儲系統(tǒng)屬性 |
三、Message 各字段詳解
1.Topic(主題)
- 消息的邏輯分類,決定消息被發(fā)送到哪個隊列。
- 必須提前創(chuàng)建或允許自動創(chuàng)建。
- 示例:
ORDER_TOPIC
,USER_LOG_TOPIC
new Message("ORDER_TOPIC", ...);
2.Body(消息體)
- 實際傳輸?shù)臄?shù)據(jù),必須是字節(jié)數(shù)組。
- 通常通過 JSON、Protobuf、Hessian 等序列化框架編碼。
String content = "{\"orderId\":\"1001\",\"userId\":10086}"; Message msg = new Message(topic, tag, content.getBytes(StandardCharsets.UTF_8));
?? 注意:
- 單條消息大小默認(rèn)最大 4MB(可配置)
- 過大消息會影響性能,建議拆分或使用外部存儲(如上傳文件后傳 URL)
3.Tags(標(biāo)簽)
- 用于對同一 Topic 下的消息進行二次分類。
- 消費者可通過
subscribe("Topic", "TagA || TagB")
進行過濾。
// 發(fā)送 new Message("ORDER_TOPIC", "CREATE", "創(chuàng)建訂單".getBytes()); new Message("ORDER_TOPIC", "PAY", "支付完成".getBytes()); // 訂閱 CREATE 類型消息 consumer.subscribe("ORDER_TOPIC", "CREATE");
? 優(yōu)勢:輕量級過濾,避免消費者接收無關(guān)消息。
?? 注意:Tags 是字符串匹配,不支持正則(但支持
*
通配和||
多選)
4.Keys(消息鍵)
- 為消息設(shè)置唯一標(biāo)識或業(yè)務(wù)主鍵(如訂單號、用戶ID)。
- 支持通過
mqadmin queryMsgByKey
命令查詢消息。 - 支持索引,便于問題排查。
Message msg = new Message(...); msg.setKeys("ORDER_20240501001");
? 建議:關(guān)鍵業(yè)務(wù)消息務(wù)必設(shè)置 Keys,便于追蹤。
5.Properties(屬性)
- 鍵值對形式的擴展字段,可用于:
- 存儲自定義上下文(如 traceId、tenantId)
- RocketMQ 內(nèi)部使用(如
RECONSUME_TIME
、DELAY
、TRAN_MSG
)
msg.putUserProperty("traceId", "abc123"); msg.putUserProperty("source", "web");
?? 注意:系統(tǒng)屬性以
PREFIX_SYS_PROP
開頭,不要沖突。
6.DelayTimeLevel(延遲級別)
- 設(shè)置消息延遲投遞時間,實現(xiàn)“定時任務(wù)”功能。
- 取值范圍:1~18,對應(yīng)不同延遲時間:
級別 | 時間 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
Message msg = new Message("DELAY_TOPIC", "TAG", "延遲消息".getBytes()); msg.setDelayTimeLevel(5); // 延遲1分鐘 producer.send(msg);
?? 注意:延遲消息不保證精確時間,存在輕微誤差。
四、Message 的生命周期
1. 生產(chǎn)者創(chuàng)建 Message 對象 ↓ 2. 發(fā)送到 Broker(寫入 CommitLog) ↓ 3. 構(gòu)建 ConsumeQueue 和 IndexFile ↓ 4. 消費者拉取消息(根據(jù) Topic + Queue) ↓ 5. 處理成功 → 提交 Offset ↓ 6. 消息過期(默認(rèn) 72 小時)→ 被刪除
? 消息是持久化存儲的,即使消費者未上線,消息也不會丟失。
五、Message 的存儲機制
雖然 Message 是邏輯對象,但在 Broker 端有嚴(yán)格的物理存儲結(jié)構(gòu):
1.CommitLog
- 所有消息按到達順序追加寫入 CommitLog 文件(順序?qū)?,高性能?/li>
- 每個消息包含:Topic、Queue、Body、Properties 等完整信息
2.ConsumeQueue
- 每個 Topic 的每個 MessageQueue 對應(yīng)一個 ConsumeQueue
- 存儲消息的邏輯偏移量、大小、物理位置,用于快速定位消息
ConsumeQueue/{Topic}/{QueueId}/ ├── 00000000000000000000 └── ...
3.IndexFile
- 可選索引文件,支持通過 Keys 或時間范圍 查詢消息
- 用于排查問題(如“查找某個訂單的消息”)
IndexFile/index_1714567890000
六、Message 的發(fā)送方式回顧
方式 | 說明 |
---|---|
同步發(fā)送 | 阻塞等待結(jié)果,適用于關(guān)鍵消息 |
異步發(fā)送 | 回調(diào)通知結(jié)果,高吞吐場景 |
單向發(fā)送 | 不關(guān)心結(jié)果,日志類消息 |
事務(wù)消息 | 半消息 + 本地事務(wù) + 提交/回滾 |
所有方式發(fā)送的都是
Message
對象。
七、最佳實踐與注意事項
實踐 | 說明 |
---|---|
? 設(shè)置 Topic 和 Tag | 合理分類,便于管理和過濾 |
? 關(guān)鍵消息設(shè)置 Keys | 便于通過 mqadmin 查詢 |
? 控制 Body 大小 | ≤ 4MB,避免影響性能 |
? 使用 UTF-8 編碼 | 防止亂碼 |
? 避免空 Body | 可能導(dǎo)致異常 |
? 合理使用延遲消息 | 替代部分定時任務(wù),但不要濫用 |
? 自定義屬性用 putUserProperty | 避免覆蓋系統(tǒng)屬性 |
八、常見問題排查
問題 | 原因 | 解決方案 |
---|---|---|
MessageExt is null | 拉取超時或無消息 | 正?,F(xiàn)象,重試即可 |
msg put message to store error | 消息過大或磁盤滿 | 檢查大小限制和磁盤空間 |
延遲消息未按時投遞 | 時間誤差或 Broker 壓力大 | 接受輕微延遲,或使用外部調(diào)度系統(tǒng) |
通過 Key 查不到消息 | IndexFile 未生成或已過期 | 檢查 messageIndexEnable 配置 |
消息重復(fù) | 網(wǎng)絡(luò)重試、Rebalance | 消費者做冪等處理 |
? 總結(jié):Message 核心要點
維度 | 說明 |
---|---|
角色 | 消息傳輸?shù)幕締卧?/td> |
組成 | Topic + Body + Tags + Keys + Properties + Delay |
大小限制 | 默認(rèn) ≤ 4MB |
存儲方式 | 順序?qū)?CommitLog,索引通過 ConsumeQueue 和 IndexFile |
可查詢性 | 支持按 Key、時間、Offset 查詢 |
擴展性 | 支持自定義屬性,靈活傳遞上下文 |
高級功能 | 支持延遲、事務(wù)、順序消息 |
?? 一句話總結(jié):
Message 是 RocketMQ 的“數(shù)據(jù)包” —— 它不僅是業(yè)務(wù)數(shù)據(jù)的載體,更是路由、過濾、追蹤、延遲、事務(wù)等功能的基礎(chǔ)。
設(shè)計好 Message 的結(jié)構(gòu)與屬性,才能讓消息系統(tǒng)真正高效、可靠、易維護。
掌握 Message
,你就掌握了 RocketMQ 的“語言”。
到此這篇關(guān)于RocketMQ 消息Message的結(jié)構(gòu)和使用方式詳解的文章就介紹到這了,更多相關(guān)RocketMQ 消息Message內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫事務(wù)參數(shù)
這篇文章主要為大家介紹了Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫事務(wù)參數(shù)使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05Mybatis 中Mapper使用package方式配置報錯的解決方案
這篇文章主要介紹了Mybatis 中Mapper使用package方式配置報錯的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07OpenFeign服務(wù)接口調(diào)用的過程詳解
Feign是一個聲明式WebService客戶端。使用Feign能讓編寫Web?Service客戶端更加簡單。它的使用方法是定義一個服務(wù)接口然后在上面添加注解,這篇文章主要介紹了OpenFeign服務(wù)接口調(diào)用,需要的朋友可以參考下2022-10-10net.sf.json.JSONObject 為null 的判斷方法
下面小編就為大家?guī)硪黄猲et.sf.json.JSONObject 為null 的判斷方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02Java 高并發(fā)十: JDK8對并發(fā)的新支持詳解
本文主要介紹Java 高并發(fā)JDK8的支持,這里整理了詳細(xì)的資料及1. LongAdder 2. CompletableFuture 3. StampedLock的介紹,有興趣的小伙伴可以參考下2016-09-09