深入理解Apache?RocketMQ?中Message?消息的核心概念
好的,我們來深入理解一下 Apache RocketMQ 中 Message (消息) 這個核心概念。這份文檔詳細(xì)闡述了消息的定義、在模型中的位置、內(nèi)部屬性、約束和使用建議。
推薦閱讀:RocketMQ 消息Message的結(jié)構(gòu)和使用方式詳解
你可以將 Message
看作是 RocketMQ 系統(tǒng)中數(shù)據(jù)傳輸和處理的最小原子單位。它承載了業(yè)務(wù)數(shù)據(jù),并附帶了豐富的元信息,是生產(chǎn)者、Broker 和消費者之間通信的載體。
1.Message的本質(zhì)定義
- 最小傳輸單元 (Smallest Unit of Data Transmission):
Message
是 RocketMQ 中數(shù)據(jù)傳輸?shù)?strong>基本單元。生產(chǎn)者將業(yè)務(wù)數(shù)據(jù)(負(fù)載)和擴展屬性封裝成Message
,發(fā)送給 Broker;Broker 再根據(jù)訂閱關(guān)系將Message
傳遞給消費者。
- 核心特性 (Characteristics):
- 不可變性 (Immutability):
- 消息一旦生成,其內(nèi)容(特別是系統(tǒng)屬性和負(fù)載)在傳輸和存儲過程中不會改變。它被視為一個“已發(fā)生的事件”。
- 消費者獲取到的消息是只讀的 (read-only)。在 5.x 版本中,這是強約束;在 3.x/4.x 版本中雖無強約束,但最佳實踐也建議不要修改。
- 最佳實踐:如果需要基于收到的消息發(fā)送新消息,應(yīng)該創(chuàng)建一個新消息(例如
MessageBuilder.buildFrom(m)
),而不是直接修改原消息。
- 持久性 (Persistence):
- 默認(rèn)情況下,RocketMQ 會將接收到的消息持久化存儲在 Broker 的存儲文件中。這是保證消息不丟失、支持消息追溯和系統(tǒng)故障恢復(fù)的基礎(chǔ)。
- 不可變性 (Immutability):
2.Message在模型中的位置
- 生命周期流程:
- 生產(chǎn) (Produced):由生產(chǎn)者 (Producer) 創(chuàng)建并初始化。
- 發(fā)送 (Sent):生產(chǎn)者將消息發(fā)送到 Apache RocketMQ Broker。
- 存儲 (Stored):Broker 接收到消息后,將其按接收順序存儲在特定 Topic 的某個 Queue 中。
- 消費 (Consumed):消費者 (Consumer) 根據(jù)訂閱關(guān)系,從 Broker 的相應(yīng) Queue 中拉取 (pull) 消息進行消費。
3.Message的核心內(nèi)部屬性
這些屬性分為系統(tǒng)保留屬性 (System retention attributes) 和可選屬性 (Optional attributes),以及負(fù)載 (Load)。
系統(tǒng)保留屬性 (由系統(tǒng)或生產(chǎn)者設(shè)置)
- Topic 名稱 (Topic name):
- 作用:標(biāo)識該消息屬于哪個邏輯主題。在集群內(nèi)必須唯一。
- 來源:由生產(chǎn)者 SDK 設(shè)置。
消息類型 (Message type):
- 作用:定義消息的語義和處理方式。RocketMQ 支持多種類型:
Normal
:普通消息,無特殊語義。FIFO
:順序消息,保證同一消息組 (Message Group) 內(nèi)的消息按發(fā)送順序被消費。實現(xiàn)依賴于 Queue 的有序性。Delay
:延遲消息,可以指定延遲時間(最大40天),延遲時間到后才對消費者可見。Transaction
:事務(wù)消息,用于實現(xiàn)分布式事務(wù),確保本地數(shù)據(jù)庫操作和消息發(fā)送的最終一致性。
- 作用:定義消息的語義和處理方式。RocketMQ 支持多種類型:
消息隊列 (Message queue):
- 作用:指明該消息最終被存儲在哪個具體的
Queue
中(屬于哪個 Topic 的哪個 Queue)。 - 來源:由 Broker 在消息到達后,根據(jù)路由策略(如輪詢、哈希等)指定并填充。
- 作用:指明該消息最終被存儲在哪個具體的
消息 Offset (Message offset):
- 作用:標(biāo)識該消息在其所屬 Queue 內(nèi)部的物理存儲位置(偏移量)。是 Broker 內(nèi)部管理消息順序和消費者消費進度的關(guān)鍵。
- 來源:由 Broker 指定并填充。從 0 開始遞增。
消息 ID (Message ID):
- 作用:消息的全局唯一標(biāo)識符。在集群內(nèi)絕對唯一,用于消息追蹤、排查問題。
- 來源:由生產(chǎn)者客戶端自動生成(通常是 32 位的數(shù)字和大寫字母組成的字符串)。
可選屬性 (由生產(chǎn)者設(shè)置)
- (可選) 消息 Keys (Message keys):
- 作用:為消息設(shè)置一個或多個索引鍵。主要用于消息查詢(通過
Message ID
或Message Key
在控制臺或通過 API 查找特定消息)和去重(結(jié)合業(yè)務(wù)邏輯)。 - 來源:由生產(chǎn)者客戶端定義。
- 作用:為消息設(shè)置一個或多個索引鍵。主要用于消息查詢(通過
- (可選) 消息 Tag (Message tag):
- 作用:消息的標(biāo)簽,用于消費者進行消息過濾。消費者可以訂閱特定的 Tag,從而只接收帶有該 Tag 的消息,實現(xiàn)簡單的消息分類。
- 來源:由生產(chǎn)者客戶端定義。
- 約束:每個消息只能設(shè)置一個 Tag。
- (可選) 定時時間 (Scheduled time):
- 作用:配合
Delay
消息類型使用,指定消息延遲的具體時間戳(毫秒級),而不是延遲時長。 - 來源:由消息生產(chǎn)者定義。
- 約束:最大延遲時間 40 天。
- 作用:配合
時間戳屬性
- 消息發(fā)送時間 (Message sending time):
- 作用:記錄消息在生產(chǎn)者客戶端本地被發(fā)送出去的時間戳(毫秒級)。
- 來源:由生產(chǎn)者客戶端填充。
- 注意:這是客戶端時間,可能與 Broker 時間有偏差。
- 消息存儲時間 (Message store timestamp):
- 作用:記錄消息被Broker 成功寫入存儲(落盤)的時間戳(毫秒級)。對于延遲消息和事務(wù)消息,消費者感知到的“有效時間”通?;诖藭r間。
- 來源:由Broker 填充。
- 注意:這是 Broker 時間,是消息在服務(wù)端的“出生”時間。
重試與自定義
- 重試次數(shù) (Retry times):
- 作用:記錄該消息被 Broker 重新投遞給消費者的次數(shù)。每次消費失敗觸發(fā)重試,次數(shù)加一。第一次消費時為 0。
- 來源:由 Broker 標(biāo)記。消費者可以獲取此信息以進行冪等處理或特殊邏輯。
- 自定義屬性 (Custom attributes):
- 作用:生產(chǎn)者可以添加任意的 Key-Value (字符串類型) 對作為擴展信息,供業(yè)務(wù)邏輯使用。
- 來源:由生產(chǎn)者根據(jù)需要指定。
- 消息負(fù)載 (Message load):
- 作用:消息的實際業(yè)務(wù)數(shù)據(jù)內(nèi)容,即有效載荷 (Payload)。
- 來源:由生產(chǎn)者序列化成二進制字節(jié)流后設(shè)置。
- 約束:大小不能超過系統(tǒng)限制。
4.Message的行為約束
- 大小限制 (Size Limit):
- 核心約束:單條消息的大小不能超過上限,否則發(fā)送會失敗。
- 默認(rèn)限制:4 MB。這是非常重要的參數(shù),直接影響網(wǎng)絡(luò)傳輸、存儲和處理性能。
5.Message的使用建議與最佳實踐
- 避免單條消息過大 (Overloaded transmission):
- 原因:RocketMQ 是事件驅(qū)動的中間件。過大的消息會:
- 加重網(wǎng)絡(luò)傳輸負(fù)擔(dān),增加延遲。
- 影響錯誤重試:重試大消息成本高。
- 影響流控 (Throttling):流控粒度可能不夠精細(xì)。
- 建議:
- 嚴(yán)格控制單條消息的數(shù)據(jù)量,使其盡可能小。
- 如果業(yè)務(wù)上必須傳輸大量數(shù)據(jù),強烈建議:
- 拆分消息:將大數(shù)據(jù)按固定大小拆分成多條小消息。
- 使用外部存儲:將實際數(shù)據(jù)(如文件、圖片)存放到對象存儲(如 OSS)、文件系統(tǒng)或數(shù)據(jù)庫中,然后在消息的
load
或custom attributes
中只傳遞數(shù)據(jù)的訪問鏈接 (URL) 或 ID。
- 原因:RocketMQ 是事件驅(qū)動的中間件。過大的消息會:
- 遵守消息不可變性原則 (Immutability):
- 正確做法:收到消息后,如果需要轉(zhuǎn)發(fā)或基于它生成新消息,使用
MessageBuilder.buildFrom(m)
這樣的方法創(chuàng)建一個新消息實例,然后修改新實例的屬性(如 Topic, Tag, Load 等)再發(fā)送。 - 錯誤做法:直接調(diào)用
m.update()
修改收到的消息m
的內(nèi)容,然后發(fā)送。這違反了不可變性原則,可能導(dǎo)致不可預(yù)知的行為或在 5.x 版本中被拒絕。
- 正確做法:收到消息后,如果需要轉(zhuǎn)發(fā)或基于它生成新消息,使用
總結(jié)與核心理解
Message
是原子單元:它封裝了業(yè)務(wù)數(shù)據(jù)和元信息,是 RocketMQ 傳輸?shù)淖钚挝弧?/li>Message
是不可變的:內(nèi)容一旦產(chǎn)生,在傳遞過程中不應(yīng)被修改。最佳實踐是“讀取-創(chuàng)建-發(fā)送”新消息。Message
是持久化的:默認(rèn)落盤存儲,保證可靠性。Message
擁有豐富的屬性:Topic/Queue/Offset
定義了其在系統(tǒng)中的位置和順序。Message ID
提供全局唯一標(biāo)識。Keys/Tag
支持查詢和過濾。Message Type
定義了語義(普通、順序、延遲、事務(wù))。Sending Time/Store Timestamp
記錄了關(guān)鍵時間點。Retry Times
協(xié)助處理消費失敗。Custom Attributes
提供擴展能力。Load
承載實際業(yè)務(wù)數(shù)據(jù)。
Message
有嚴(yán)格的大小限制:默認(rèn) 4MB。避免大消息是關(guān)鍵設(shè)計原則,應(yīng)通過拆分或外鏈方式處理大數(shù)據(jù)。- 最佳實踐:
- 合理使用
Tag
進行消息過濾。 - 利用
Keys
進行消息追蹤。 - 遵守不可變性,通過
buildFrom
創(chuàng)建新消息。 - 絕對不要發(fā)送超過 4MB 的消息,采用拆分或外鏈方案。
- 合理使用
簡而言之,Message
是 RocketMQ 的“信封”和“信件”本身。理解其結(jié)構(gòu)、屬性、約束和最佳實踐,對于設(shè)計高效、可靠、可維護的消息系統(tǒng)至關(guān)重要。記住:小消息、不可變、善用屬性、規(guī)避大負(fù)載。
相關(guān)文章
詳解Linux 服務(wù)管理兩種方式service和systemctl
systemd是Linux系統(tǒng)最新的初始化系統(tǒng)(init),作用是提高系統(tǒng)的啟動速度,盡可能啟動較少的進程,盡可能更多進程并發(fā)啟動。這篇文章主要介紹了Linux 服務(wù)管理兩種方式service和systemctl,需要的朋友可以參考下2019-09-09Ubuntu18 給terminal改個漂亮的命令行提示符的方法
這篇文章主要介紹了Ubuntu18 給terminal改個漂亮的命令行提示符的方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2019-06-06淺談生產(chǎn)者消費者模型(Linux系統(tǒng)下的兩種實現(xiàn)方法)
下面小編就為大家?guī)硪黄獪\談生產(chǎn)者消費者模型(Linux系統(tǒng)下的兩種實現(xiàn)方法)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-01-01Linux下安裝Oracle(CentOS-Oracle 12c)的方法
這篇文章主要介紹了Linux下安裝Oracle(CentOS-Oracle 12c)的方法,本文實例講解,介紹的非常詳細(xì),具有參考借鑒價值,感興趣的朋友一起看看吧2016-11-11