RocketMQ消息過濾與查詢的實(shí)現(xiàn)
消息過濾
RocketMQ分布式消息隊(duì)列的消息過濾方式有別于其它MQ中間件,是在Consumer端訂閱消息時再做消息過濾的。
RocketMQ這么做是還是在于其Producer端寫入消息和Consomer端訂閱消息采用分離存儲的機(jī)制來實(shí)現(xiàn)的,Consumer端訂閱消息是需要通過ConsumeQueue這個消息消費(fèi)的邏輯隊(duì)列拿到一個索引,然后再從CommitLog里面讀取真正的消息實(shí)體內(nèi)容,所以說到底也是還繞不開其存儲結(jié)構(gòu)。
其ConsumeQueue的存儲結(jié)構(gòu)如下,可以看到其中有8個字節(jié)存儲的Message Tag的哈希值,基于Tag的消息過濾正式基于這個字段值的。
主要支持如下2種的過濾方式
(1) Tag過濾方式:
Consumer端在訂閱消息時除了指定Topic還可以指定TAG,如果一個消息有多個TAG,可以用||分隔。
其中,Consumer端會將這個訂閱請求構(gòu)建成一個 SubscriptionData,發(fā)送一個Pull消息的請求給Broker端。Broker端從RocketMQ的文件存儲層—Store讀取數(shù)據(jù)之前,會用這些數(shù)據(jù)先構(gòu)建一個MessageFilter,然后傳給Store。
Store從 ConsumeQueue讀取到一條記錄后,會用它記錄的消息tag hash值去做過濾,由于在服務(wù)端只是根據(jù)hashcode進(jìn)行判斷,無法精確對tag原始字符串進(jìn)行過濾,故在消息消費(fèi)端拉取到消息后,還需要對消息的原始tag字符串進(jìn)行比對,如果不同,則丟棄該消息,不進(jìn)行消息消費(fèi)。
(2) SQL92的過濾方式:
這種方式的大致做法和上面的Tag過濾方式一樣,只是在Store層的具體過濾過程不太一樣,真正的 SQL expression 的構(gòu)建和執(zhí)行由rocketmq-filter模塊負(fù)責(zé)的。
每次過濾都去執(zhí)行SQL表達(dá)式會影響效率,所以RocketMQ使用了BloomFilter避免了每次都去執(zhí)行。SQL92的表達(dá)式上下文為消息的屬性。
消息查詢
RocketMQ支持按照下面兩種維度(“按照Message Id查詢消息”、“按照Message Key查詢消息”)進(jìn)行消息查詢。
按照MessageId查詢消息
RocketMQ中的MessageId的長度總共有16字節(jié),其中包含了消息存儲主機(jī)地址(IP地址和端口),消息Commit Log offset。
“按照MessageId查詢消息”在RocketMQ中具體做法是:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個RPC請求后通過Remoting通信層發(fā)送(業(yè)務(wù)請求碼:VIEW_MESSAGE_BY_ID)。
Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個完整的消息返回。
按照Message Key查詢消息
“按照Message Key查詢消息”,主要是基于RocketMQ的IndexFile索引文件來實(shí)現(xiàn)的。RocketMQ的索引文件邏輯結(jié)構(gòu),類似JDK中HashMap的實(shí)現(xiàn)。
索引文件的具體結(jié)構(gòu)如下:
IndexFile索引文件為用戶提供通過“按照Message Key查詢消息”的消息索引查詢服務(wù),IndexFile文件的存儲位置是:$HOME\store\index\${fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,文件大小是固定的,等于40+500W\*4+2000W\*20= 420000040個字節(jié)大小。
如果消息的properties中設(shè)置了UNIQ_KEY這個屬性,就用 topic + “#” + UNIQ_KEY的value作為 key 來做寫入操作。
如果消息設(shè)置了KEYS屬性(多個KEY以空格分隔),也會用 topic + “#” + KEY 來做索引。
其中的索引數(shù)據(jù)包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個字段,一共20 Byte。
NextIndex offset 即前面讀出來的 slotValue,如果有 hash沖突,就可以用這個字段將所有沖突的索引用鏈表的方式串起來了。
Timestamp記錄的是消息storeTimestamp之間的差,并不是一個絕對的時間。整個Index File的結(jié)構(gòu)如圖,40 Byte 的Header用于保存一些總的統(tǒng)計信息,4\*500W的 Slot Table并不保存真正的索引數(shù)據(jù),而是保存每個槽位對應(yīng)的單向鏈表的頭。
20\*2000W 是真正的索引數(shù)據(jù),即一個 Index File 可以保存 2000W個索引。
“按照Message Key查詢消息”的方式,RocketMQ的具體做法是,主要通過Broker端的QueryMessageProcessor業(yè)務(wù)處理器來查詢,讀取消息的過程就是用topic和key找到IndexFile索引文件中的一條記錄,根據(jù)其中的commitLog offset從CommitLog文件中讀取消息的實(shí)體內(nèi)容。
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
IDEA設(shè)置生成帶注釋的getter和setter的圖文教程
通常我們用idea默認(rèn)生成的getter和setter方法是不帶注釋的,當(dāng)然,我們同樣可以設(shè)置idea像MyEclipse一樣生成帶有Javadoc的模板,具體設(shè)置方法,大家參考下本文2018-05-05feign客戶端HTTP狀態(tài)碼為204時?響應(yīng)體被忽略的問題
這篇文章主要介紹了feign客戶端HTTP狀態(tài)碼為204時?響應(yīng)體被忽略的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03java?freemarker實(shí)現(xiàn)動態(tài)生成excel文件
這篇文章主要為大家詳細(xì)介紹了java如何通過freemarker實(shí)現(xiàn)動態(tài)生成excel文件,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-12-12Springboot自定義注解&傳參&簡單應(yīng)用方式
SpringBoot框架中,通過自定義注解結(jié)合AOP可以實(shí)現(xiàn)功能如日志記錄與耗時統(tǒng)計,首先創(chuàng)建LogController和TimeConsuming注解,并為LogController定義參數(shù),然后,在目標(biāo)方法上應(yīng)用這些注解,最后,使用AspectJ的AOP功能,通過切點(diǎn)表達(dá)式定位這些注解2024-10-10Java8新特性之接口中的默認(rèn)方法和靜態(tài)方法詳解
今天帶大家學(xué)習(xí)的是Java8新特性的相關(guān)知識,文章圍繞著Java接口中的默認(rèn)方法和靜態(tài)方法展開,文中有非常詳細(xì)的的代碼示例,需要的朋友可以參考下2021-06-06