深入講解RocketMQ原理
RocketMQ
1.1 為什么要選RocketMQ
總結(jié)一下: 選擇中間件的可以從這些維度來(lái)考慮:可靠性,性能,功能,可運(yùn)維行,可拓展性,社區(qū)活躍度
。目前常用的幾個(gè)中間件,ActiveMQ作為“老古董”,市面上用的已經(jīng)不多,其它幾種:
RabbitMQ
:優(yōu)點(diǎn):輕量,迅捷,容易部署和使用,擁有靈活的路由配置缺點(diǎn):性能和吞吐量不太理想,不易進(jìn)行二次開(kāi)發(fā)RocketMQ
:優(yōu)點(diǎn):性能好,高吞吐量,穩(wěn)定可靠,有活躍的中文社區(qū)缺點(diǎn):兼容性上不是太好Kafka
:優(yōu)點(diǎn):擁有強(qiáng)大的性能及吞吐量,兼容性很好缺點(diǎn):由于“攢一波再處理”導(dǎo)致延遲比較高
1.2 RocketMQ優(yōu)缺點(diǎn)
RocketMQ
優(yōu)點(diǎn):
- 單機(jī)吞吐量:十萬(wàn)級(jí)
- 可用性:非常高,分布式架構(gòu)
- 消息可靠性:經(jīng)過(guò)參數(shù)優(yōu)化配置,消息可以做到0丟失
- 功能支持:MQ功能較為完善,還是分布式的,擴(kuò)展性好
- 支持10億級(jí)別的消息堆積,不會(huì)因?yàn)槎逊e導(dǎo)致性能下降
- 源碼是Java,方便結(jié)合公司自己的業(yè)務(wù)二次開(kāi)發(fā)
- 天生為金融互聯(lián)網(wǎng)領(lǐng)域而生,對(duì)于可靠性要求很高的場(chǎng)景,尤其是電商里面的訂單扣款,以及業(yè)務(wù)削峰,在大量交易涌入時(shí),后端可能無(wú)法及時(shí)處理的情況
RoketMQ
在穩(wěn)定性上可能更值得信賴,這些業(yè)務(wù)場(chǎng)景在阿里雙11已經(jīng)經(jīng)歷了多次考驗(yàn)
RocketMQ
缺點(diǎn):
- 支持的客戶端語(yǔ)言不多,目前是Java及c++,其中c++不成熟
- 沒(méi)有在
MQ
核心中去實(shí)現(xiàn)JMS
等接口,有些系統(tǒng)要遷移需要修改大量代碼
1.3 消息模型
1.3.1 消息隊(duì)列模型
消息隊(duì)列有兩種模型:隊(duì)列模型
和發(fā)布/訂閱模型
1.隊(duì)列模型
這是最初的一種消息隊(duì)列模型,對(duì)應(yīng)著消息隊(duì)列發(fā)-存-收
的模型。生產(chǎn)者往某個(gè)隊(duì)列里面發(fā)送消息,一個(gè)隊(duì)列可以存儲(chǔ)多個(gè)生產(chǎn)者的消息,一個(gè)隊(duì)列也可以有多個(gè)消費(fèi)者,但是消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,也就是說(shuō)每條消息只能被一個(gè)消費(fèi)者消費(fèi)。
2.發(fā)布/訂閱模型
如果需要將一份消息數(shù)據(jù)分發(fā)給多個(gè)消費(fèi)者,并且每個(gè)消費(fèi)者都要求收到全量的消息。很顯然,隊(duì)列模型無(wú)法滿足這個(gè)需求。解決的方式就是發(fā)布/訂閱模型。 在發(fā)布 - 訂閱
模型中,消息的發(fā)送方稱為發(fā)布者(Publisher
),消息的接收方稱為訂閱者(Subscriber
),服務(wù)端存放消息的容器
稱為主題(Topic
)。發(fā)布者將消息發(fā)送到主題中,訂閱者在接收消息之前需要先訂閱主題
。“訂閱”在這里既是一個(gè)動(dòng)作,同時(shí)還可以認(rèn)為是主題在消費(fèi)時(shí)的一個(gè)邏輯副本,每份訂閱中,訂閱者都可以接收到主題的所有消息。
它和 隊(duì)列模式
的異同:生產(chǎn)者就是發(fā)布者,隊(duì)列就是主題,消費(fèi)者就是訂閱者,無(wú)本質(zhì)區(qū)別。唯一的不同點(diǎn)在于:一份消息數(shù)據(jù)是否可以被多次消費(fèi)
1.3.2 RocketMQ消息模型
RocketMQ
使用的消息模型是標(biāo)準(zhǔn)的發(fā)布-訂閱
模型,在RocketMQ
的術(shù)語(yǔ)表中,生產(chǎn)者、消費(fèi)者和主題,與發(fā)布-訂閱模型中的概念是完全一樣的。
1.3.3 RocketMQ中成員
RocketMQ
本身的消息是由下面幾部分組成:
Message
Message
(消息)就是要傳輸?shù)男畔?一條消息必須有一個(gè)主題(Topic
),主題可以看做是你的信件要郵寄的地址。 一條消息也可以擁有一個(gè)可選的標(biāo)簽(Tag
)和額處的鍵值對(duì),它們可以用于設(shè)置一個(gè)業(yè)務(wù)Key
并在 Broker
上查找此消息以便在開(kāi)發(fā)期間查找問(wèn)題。
Topic
Topic
(主題)可以看做消息的歸類,它是消息的第一級(jí)類型
。比如一個(gè)電商系統(tǒng)可以分為:交易消息
、物流消息
等,一條消息必須有一個(gè) Topic
Topic
與生產(chǎn)者和消費(fèi)者的關(guān)系非常松散,一個(gè) Topic
可以有0個(gè)
、1個(gè)
、多個(gè)生產(chǎn)者
向其發(fā)送消息,一個(gè)生產(chǎn)者也可以同時(shí)向不同的 Topic
發(fā)送消息。 一個(gè) Topic
也可以被 0個(gè)
、1個(gè)
、多個(gè)消費(fèi)者
訂閱。
Tag
Tag
(標(biāo)簽)可以看作子主題
,它是消息的第二級(jí)類型
,用于為用戶提供額外的靈活性。使用標(biāo)簽,同一業(yè)務(wù)模塊不同目的的消息就可以用相同 Topic
而不同的 Tag
來(lái)標(biāo)識(shí)。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒(méi)有 Tag
標(biāo)簽有助于保持你的代碼干凈和連貫,并且還可以為 RocketMQ
提供的查詢系統(tǒng)提供幫助。
Group
- Consumer Group :
RocketMQ
中,訂閱者的概念是通過(guò)消費(fèi)組(Consumer Group
)來(lái)體現(xiàn)的。每個(gè)消費(fèi)組都消費(fèi)主題中一份完整的消息,不同消費(fèi)組之間消費(fèi)進(jìn)度彼此不受影響,也就是說(shuō),一條消息被Consumer Group1
消費(fèi)過(guò),也會(huì)再給Consumer Group2
消費(fèi)。消費(fèi)組中包含多個(gè)消費(fèi)者,同一個(gè)組內(nèi)的消費(fèi)者是競(jìng)爭(zhēng)消費(fèi)的關(guān)系,每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)組內(nèi)的一部分消息。默認(rèn)情況,如果一條消息被消費(fèi)者Consumer1
消費(fèi)了,那同組的其他消費(fèi)者就不會(huì)再收到這條消息。 - Producer Group :生產(chǎn)者組,簡(jiǎn)單來(lái)說(shuō)就是多個(gè)發(fā)送同一類消息的生產(chǎn)者稱之為一個(gè)生產(chǎn)者組,一群
Topic
相同的Producer
Message Queue
Message Queue
(消息隊(duì)列),一個(gè) Topic
下可以設(shè)置多個(gè)消息隊(duì)列,Topic
包括多個(gè) Message Queue
,如果一個(gè) Consumer
需要獲取 Topic
下所有的消息,就要遍歷所有的 Message Queue
。
RocketMQ
還有一些其它的Queue
——例如ConsumerQueue
Offset
在Topic
的消費(fèi)過(guò)程中,由于消息需要被不同的組進(jìn)行多次消費(fèi),所以消費(fèi)完的消息并不會(huì)立即被刪除,這就需要RocketMQ
為每個(gè)消費(fèi)組在每個(gè)隊(duì)列上維護(hù)一個(gè)消費(fèi)位置(Consumer Offset
),這個(gè)位置之前的消息都被消費(fèi)過(guò),之后的消息都沒(méi)有被消費(fèi)過(guò),每成功消費(fèi)一條消息,消費(fèi)位置就加一
也可以這么說(shuō),Queue
是一個(gè)長(zhǎng)度無(wú)限的數(shù)組,Offset
就是下標(biāo)。
總結(jié)圖示
RocketMQ
的消息模型中,這些就是比較關(guān)鍵的概念了 畫(huà)張圖總結(jié)一下
1.4 消息的消費(fèi)模式
消息消費(fèi)模式有兩種:Clustering
(集群消費(fèi))和Broadcasting
(廣播消費(fèi))
默認(rèn)情況下就是集群消費(fèi)
,這種模式下一個(gè)消費(fèi)者組
共同消費(fèi)一個(gè)主題的多個(gè)隊(duì)列,一個(gè)隊(duì)列只會(huì)被一個(gè)消費(fèi)者消費(fèi),如果某個(gè)消費(fèi)者掛掉,分組內(nèi)其它消費(fèi)者會(huì)接替掛掉的消費(fèi)者繼續(xù)消費(fèi)。 而廣播消費(fèi)消息會(huì)發(fā)給消費(fèi)者組中的每一個(gè)消費(fèi)者進(jìn)行消費(fèi)。
1.5 RoctetMQ基本架構(gòu)
先看圖,RocketMQ
的基本架構(gòu)
RocketMQ
一共有四個(gè)部分組成:NameServer
,Broker
,Producer 生產(chǎn)者
,Consumer 消費(fèi)者
,它們對(duì)應(yīng)了:發(fā)現(xiàn)
、發(fā)
、存
、收
,為了保證高可用,一般每一部分都是集群部署的
類比一下我們生活的郵政系統(tǒng)—— 郵政系統(tǒng)要正常運(yùn)行,離不開(kāi)下面這四個(gè)角色, 一是發(fā)信者
,二 是收信者
, 三是負(fù)責(zé)暫存?zhèn)鬏數(shù)泥]局
, 四是負(fù)責(zé)協(xié)調(diào)各個(gè)地方郵局的管理機(jī)構(gòu)
。對(duì)應(yīng)到 RocketMQ 中,這四個(gè)角色就是 Producer
、 Consumer
、 Broker
、NameServer
1.5.1 NameServer
NameServer
是一個(gè)無(wú)狀態(tài)的服務(wù)器,角色類似于 Kafka
使用的 Zookeeper
,但比 Zookeeper
更輕量。
特點(diǎn):
每個(gè) NameServer
結(jié)點(diǎn)之間是相互獨(dú)立,彼此沒(méi)有任何信息交互。 Nameserver
被設(shè)計(jì)成幾乎是無(wú)狀態(tài)的,通過(guò)部署多個(gè)結(jié)點(diǎn)來(lái)標(biāo)識(shí)自己是一個(gè)偽集群,Producer
在發(fā)送消息前從 NameServer
中獲取 Topic
的路由信息也就是發(fā)往哪個(gè) Broker
,Consumer
也會(huì)定時(shí)從 NameServer
獲取 Topic
的路由信息,Broker
在啟動(dòng)時(shí)會(huì)向 NameServer
注冊(cè),并定時(shí)進(jìn)行心跳連接,且定時(shí)同步維護(hù)的 Topic
到 NameServer
功能主要有兩個(gè):
- 和
Broker
結(jié)點(diǎn)保持長(zhǎng)連接。 - 維護(hù)
Topic
的路由信息。
1.5.2 Broker
消息存儲(chǔ)和中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)和轉(zhuǎn)發(fā)消息
Broker
內(nèi)部維護(hù)著一個(gè)個(gè) Consumer Queue
,用來(lái)存儲(chǔ)消息的索引,真正存儲(chǔ)消息的地方是 CommitLog
(日志文件)
單個(gè) Broker
與所有的 Nameserver
保持著長(zhǎng)連接和心跳,并會(huì)定時(shí)將 Topic
信息同步到 NameServer
,和 NameServer
的通信底層是通過(guò) Netty
實(shí)現(xiàn)的。
1.5.3 Producer
消息生產(chǎn)者,業(yè)務(wù)端負(fù)責(zé)發(fā)送消息,由用戶自行實(shí)現(xiàn)和分布式部署。
Producer
由用戶進(jìn)行分布式部署,消息由Producer
通過(guò)多種負(fù)載均衡模式發(fā)送到Broker
集群,發(fā)送低延時(shí),支持快速失敗。 RocketMQ
提供了三種方式發(fā)送消息:同步
、異步
和單向
同步發(fā)送
:同步發(fā)送指消息發(fā)送方發(fā)出數(shù)據(jù)后會(huì)在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個(gè)數(shù)據(jù)包。一般用于重要通知消息,例如重要通知郵件、營(yíng)銷短信。異步發(fā)送
:異步發(fā)送指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個(gè)數(shù)據(jù)包,一般用于可能鏈路耗時(shí)較長(zhǎng)而對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,例如用戶視頻上傳后通知啟動(dòng)轉(zhuǎn)碼服務(wù)。單向發(fā)送
:?jiǎn)蜗虬l(fā)送是指只負(fù)責(zé)發(fā)送消息而不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā),適用于某些耗時(shí)非常短但對(duì)可靠性要求并不高的場(chǎng)景,例如日志收集
1.5.4 Consumer
消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。
Consumer
也由用戶部署,支持PUSH
和PULL
兩種消費(fèi)模式,支持集群消費(fèi)和廣播消費(fèi),提供實(shí)時(shí)的消息訂閱機(jī)制。
Pull
:拉取型消費(fèi)者(Pull Consumer
)主動(dòng)從消息服務(wù)器拉取信息,只要批量拉取到消息,用戶應(yīng)用就會(huì)啟動(dòng)消費(fèi)過(guò)程,所以Pull
稱為主動(dòng)消費(fèi)型
Push
:推送型消費(fèi)者(Push Consumer
)封裝了消息的拉取、消費(fèi)進(jìn)度和其他的內(nèi)部維護(hù)工作,將消息到達(dá)時(shí)執(zhí)行的回調(diào)接口留給用戶應(yīng)用程序來(lái)實(shí)現(xiàn)。所以Push
稱為被動(dòng)消費(fèi)類型
,但其實(shí)從實(shí)現(xiàn)上看還是從消息服務(wù)器中拉取消息,不同于Pull
的是Push
首先要注冊(cè)消費(fèi)監(jiān)聽(tīng)器,當(dāng)監(jiān)聽(tīng)器處觸發(fā)后才開(kāi)始消費(fèi)消息
2 原理
2.1 RocketMQ整體工作流程
簡(jiǎn)單來(lái)說(shuō),RocketMQ
是一個(gè)分布式消息隊(duì)列,也就是消息隊(duì)列
+分布式系統(tǒng)
作為消息隊(duì)列,它是發(fā)-存-收
的一個(gè)模型,對(duì)應(yīng)的就是Producer、Broker、Cosumer
;作為分布式系統(tǒng),它要有服務(wù)端、客戶端、注冊(cè)中心,對(duì)應(yīng)的就是Broker、Producer/Consumer、NameServer
所以我們看一下它主要的工作流程:RocketMQ
由NameServer
注冊(cè)中心集群、Producer
生產(chǎn)者集群、Consumer
消費(fèi)者集群和若干Broker
(RocketMQ
進(jìn)程)組成:
Broker
在啟動(dòng)的時(shí)候去向所有的NameServer
注冊(cè),并保持長(zhǎng)連接,每30s發(fā)送一次心跳Producer
在發(fā)送消息的時(shí)候從NameServer
獲取Broker
服務(wù)器地址,根據(jù)負(fù)載均衡算法選擇一臺(tái)服務(wù)器來(lái)發(fā)送消息Conusmer
消費(fèi)消息的時(shí)候同樣從NameServer
獲取Broker
地址,然后主動(dòng)拉取消息來(lái)消費(fèi)
2.2 為什么RocketMQ不使用Zookeeper作為注冊(cè)中心
Kafka
我們都知道采用Zookeeper
作為注冊(cè)中心——當(dāng)然也開(kāi)始逐漸去Zookeeper
,RocketMQ
不使用Zookeeper
其實(shí)主要可能從這幾方面來(lái)考慮:
- 基于可用性的考慮
根據(jù)CAP
理論,同時(shí)最多只能滿足兩個(gè)點(diǎn),而Zookeeper
滿足的是CP
,也就是說(shuō)Zookeeper
并不能保證服務(wù)的可用性,Zookeeper
在進(jìn)行選舉的時(shí)候,整個(gè)選舉的時(shí)間太長(zhǎng),期間整個(gè)集群都處于不可用的狀態(tài),而這對(duì)于一個(gè)注冊(cè)中心來(lái)說(shuō)肯定是不能接受的,作為服務(wù)發(fā)現(xiàn)來(lái)說(shuō)就應(yīng)該是為可用性而設(shè)計(jì)。 - 基于性能的考慮
NameServer
本身的實(shí)現(xiàn)非常輕量,而且可以通過(guò)增加機(jī)器的方式水平擴(kuò)展,增加集群的抗壓能力,而Zookeeper
的寫(xiě)是不可擴(kuò)展的,Zookeeper
要解決這個(gè)問(wèn)題只能通過(guò)劃分領(lǐng)域,劃分多個(gè)Zookeeper
集群來(lái)解決,首先操作起來(lái)太復(fù)雜,其次這樣還是又違反了CAP
中的A的設(shè)計(jì),導(dǎo)致服務(wù)之間是不連通的。 - 持久化的機(jī)制來(lái)帶的問(wèn)題
ZooKeeper
的ZAB
協(xié)議對(duì)每一個(gè)寫(xiě)請(qǐng)求,會(huì)在每個(gè)ZooKeeper
節(jié)點(diǎn)上保持寫(xiě)一個(gè)事務(wù)日志,同時(shí)再加上定期的將內(nèi)存數(shù)據(jù)鏡像(Snapshot
)到磁盤(pán)來(lái)保證數(shù)據(jù)的一致性和持久性,而對(duì)于一個(gè)簡(jiǎn)單的服務(wù)發(fā)現(xiàn)的場(chǎng)景來(lái)說(shuō),這其實(shí)沒(méi)有太大的必要,這個(gè)實(shí)現(xiàn)方案太重了。而且本身存儲(chǔ)的數(shù)據(jù)應(yīng)該是高度定制化的。 - 消息發(fā)送應(yīng)該
弱
依賴注冊(cè)中心RocketMQ
的設(shè)計(jì)理念也正是基于此,生產(chǎn)者在第一次發(fā)送消息的時(shí)候從NameServer
獲取到Broker
地址后緩存到本地,如果NameServer
整個(gè)集群不可用,短時(shí)間內(nèi)對(duì)于生產(chǎn)者和消費(fèi)者并不會(huì)產(chǎn)生太大影響。
2.3 Broker保存數(shù)據(jù)(CommitLog,ConsumeQueue,Indexfile)
RocketMQ
主要的存儲(chǔ)文件包括CommitLog
文件、ConsumeQueue
文件、Indexfile
文件
CommitLog
:消息主體以及元數(shù)據(jù)的存儲(chǔ)主體,存儲(chǔ)Producer
端寫(xiě)入的消息主體內(nèi)容,消息內(nèi)容不是定長(zhǎng)的。單個(gè)文件大小默認(rèn)1G
, 文件名長(zhǎng)度為20位
,左邊補(bǔ)零,剩余為起始偏移量,比如00000000000000000000
代表了第一個(gè)文件,起始偏移量為0
,文件大小為1G=1073741824
;當(dāng)?shù)谝粋€(gè)文件寫(xiě)滿了,第二個(gè)文件為00000000001073741824
,起始偏移量為1073741824
,以此類推。消息主要是順序?qū)懭肴罩疚募?,?dāng)文件滿了,寫(xiě)入下一個(gè)文件。
CommitLog
文件保存于${Rocket_Home}/store/commitlog
目錄中,從圖中我們可以明顯看出來(lái)文件名的偏移量,每個(gè)文件默認(rèn)1G
,寫(xiě)滿后自動(dòng)生成一個(gè)新的文件。
ConsumeQueue
:消息消費(fèi)隊(duì)列,引入的目的主要是提高消息消費(fèi)的性能,由于RocketMQ
是基于主題topic
的訂閱模式,消息消費(fèi)是針對(duì)主題進(jìn)行的,如果要遍歷commitlog
文件中根據(jù)topic
檢索消息是非常低效的。Consumer
即可根據(jù)ConsumeQueue
來(lái)查找待消費(fèi)的消息。其中,ConsumeQueue
(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引,保存了指定Topic
下的隊(duì)列消息在CommitLog
中的起始物理偏移量offset
,消息大小size
和消息Tag
的HashCode
值ConsumeQueue
文件可以看成是基于Topic
的CommitLog
索引文件,故ConsumeQueue
文件夾的組織方式如下:topic/queue/file
三層組織結(jié)構(gòu),具體存儲(chǔ)路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
同樣ConsumeQueue
文件采取定長(zhǎng)設(shè)計(jì),每一個(gè)條目共20
個(gè)字節(jié),分別為8
字節(jié)的CommitLog
物理偏移量、4字節(jié)的消息長(zhǎng)度、8字節(jié)tag hashcode
,單個(gè)文件由30W
個(gè)條目組成,可以像數(shù)組一樣隨機(jī)訪問(wèn)每一個(gè)條目,每個(gè)ConsumeQueue
文件大小約5.72M
;
IndexFile
:IndexFile
(索引文件)提供了一種可以通過(guò)key
或時(shí)間區(qū)間來(lái)查詢消息的方法。Index
文件的存儲(chǔ)位置是:{fileName}
,文件名fileName
是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile
文件大小約為400M
,一個(gè)IndexFile
可以保存2000W
個(gè)索引,IndexFile
的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap
結(jié)構(gòu),故RocketMQ
的索引文件其底層實(shí)現(xiàn)為hash
索引
總結(jié)一下:RocketMQ
采用的是混合型的存儲(chǔ)結(jié)構(gòu),即為Broker
單個(gè)實(shí)例下所有的隊(duì)列共用一個(gè)日志數(shù)據(jù)文件(即為CommitLog
)來(lái)存儲(chǔ)。
RocketMQ
的混合型存儲(chǔ)結(jié)構(gòu)(多個(gè)Topic
的消息實(shí)體內(nèi)容都存儲(chǔ)于一個(gè)CommitLog
中)針對(duì)Producer
和Consumer
分別采用了數(shù)據(jù)
和索引
部分相分離的存儲(chǔ)結(jié)構(gòu),Producer
發(fā)送消息至Broker
端,然后Broker
端使用同步或者異步的方式對(duì)消息刷盤(pán)持久化,保存至CommitLog
中。
只要消息被刷盤(pán)持久化至磁盤(pán)文件CommitLog
中,那么Producer
發(fā)送的消息就不會(huì)丟失。正因?yàn)槿绱耍?code>Consumer也就肯定有機(jī)會(huì)去消費(fèi)這條消息。當(dāng)無(wú)法拉取到消息后,可以等下一次消息拉取,同時(shí)服務(wù)端也支持長(zhǎng)輪詢模式,如果一個(gè)消息拉取請(qǐng)求未拉取到消息,Broker
允許等待30s
的時(shí)間,只要這段時(shí)間內(nèi)有新消息到達(dá),將直接返回給消費(fèi)端。 這里,RocketMQ
的具體做法是,使用Broker
端的后臺(tái)服務(wù)線程—ReputMessageService
不停地分發(fā)請(qǐng)求并異步構(gòu)建ConsumeQueue
(邏輯消費(fèi)隊(duì)列)和IndexFile
(索引文件)數(shù)據(jù)。
2.4 RocketMQ怎么對(duì)文件進(jìn)行讀寫(xiě)
RocketMQ
對(duì)文件的讀寫(xiě)巧妙地利用了操作系統(tǒng)的一些高效文件讀寫(xiě)方式——PageCache
、順序讀寫(xiě)
、零拷貝
2.4.1 PageCache、順序讀取
在RocketMQ
中,ConsumeQueue
邏輯消費(fèi)隊(duì)列存儲(chǔ)的數(shù)據(jù)較少,并且是順序讀取,在page cache
機(jī)制的預(yù)讀取作用下,Consume Queue
文件的讀性能幾乎接近讀內(nèi)存,即使在有消息堆積情況下也不會(huì)影響性能。而對(duì)于CommitLog
消息存儲(chǔ)的日志數(shù)據(jù)文件來(lái)說(shuō),讀取消息內(nèi)容時(shí)候會(huì)產(chǎn)生較多的隨機(jī)訪問(wèn)讀取,嚴(yán)重影響性能。如果選擇合適的系統(tǒng)IO
調(diào)度算法,比如設(shè)置調(diào)度算法為Deadline
(此時(shí)塊存儲(chǔ)采用SSD的話),隨機(jī)讀的性能也會(huì)有所提升。
頁(yè)緩存(PageCache
)是OS
對(duì)文件的緩存,用于加速對(duì)文件的讀寫(xiě)。一般來(lái)說(shuō),程序?qū)ξ募M(jìn)行順序讀寫(xiě)的速度幾乎接近于內(nèi)存的讀寫(xiě)速度,主要原因就是由于OS
使用PageCache
機(jī)制對(duì)讀寫(xiě)訪問(wèn)操作進(jìn)行了性能優(yōu)化,將一部分的內(nèi)存用作PageCache
。對(duì)于數(shù)據(jù)的寫(xiě)入,OS
會(huì)先寫(xiě)入至Cache
內(nèi),隨后通過(guò)異步的方式由pdflush
內(nèi)核線程將Cache
內(nèi)的數(shù)據(jù)刷盤(pán)至物理磁盤(pán)上。對(duì)于數(shù)據(jù)的讀取,如果一次讀取文件時(shí)出現(xiàn)未命中PageCache
的情況,OS
從物理磁盤(pán)上訪問(wèn)讀取文件的同時(shí),會(huì)順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取
2.4.2 零拷貝
RocketMQ
主要通過(guò)MappedByteBuffer
對(duì)文件進(jìn)行讀寫(xiě)操作。其中,利用了NIO
中的FileChannel
模型將磁盤(pán)上的物理文件直接映射到用戶態(tài)的內(nèi)存地址中(這種Mmap
的方式減少了傳統(tǒng)IO
,將磁盤(pán)文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū),和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來(lái)回進(jìn)行拷貝的性能開(kāi)銷),將對(duì)文件的操作轉(zhuǎn)化為直接對(duì)內(nèi)存地址進(jìn)行操作,從而極大地提高了文件的讀寫(xiě)效率(正因?yàn)樾枰褂脙?nèi)存映射機(jī)制,故RocketMQ
的文件存儲(chǔ)都使用定長(zhǎng)結(jié)構(gòu)來(lái)存儲(chǔ),方便一次將整個(gè)文件映射至內(nèi)存)。
什么是零拷貝 在操作系統(tǒng)中,使用傳統(tǒng)的方式,數(shù)據(jù)需要經(jīng)歷幾次拷貝,還要經(jīng)歷用戶態(tài)/內(nèi)核態(tài)
切換
- 從磁盤(pán)復(fù)制數(shù)據(jù)到內(nèi)核態(tài)內(nèi)存;
- 從內(nèi)核態(tài)內(nèi)存復(fù)制到用戶態(tài)內(nèi)存;
- 然后從用戶態(tài)內(nèi)存復(fù)制到網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存;
- 最后是從網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存復(fù)制到網(wǎng)卡中進(jìn)行傳輸。
所以,可以通過(guò)零拷貝的方式,減少用戶態(tài)與內(nèi)核態(tài)的上下文切換和內(nèi)存拷貝的次數(shù),用來(lái)提升I/O
的性能。零拷貝比較常見(jiàn)的實(shí)現(xiàn)方式是mmap
,這種機(jī)制在Java
中是通過(guò)MappedByteBuffer
實(shí)現(xiàn)的。
2.5 消息刷盤(pán)怎么實(shí)現(xiàn)
RocketMQ
提供了兩種刷盤(pán)策略:同步刷盤(pán)
和異步刷盤(pán)
???????同步刷盤(pán)
:在消息達(dá)到Broker
的內(nèi)存之后,必須刷到commitLog
日志文件中才算成功,然后返回Producer
數(shù)據(jù)已經(jīng)發(fā)送成功。異步刷盤(pán)
:異步刷盤(pán)是指消息達(dá)到Broker
內(nèi)存后就返回Producer
數(shù)據(jù)已經(jīng)發(fā)送成功,會(huì)喚醒一個(gè)線程去將數(shù)據(jù)持久化到CommitLog
日志文件中
Broker
在消息的存取時(shí)直接操作的是內(nèi)存(內(nèi)存映射文件),這可以提供系統(tǒng)的吞吐量,但是無(wú)法避免機(jī)器掉電時(shí)數(shù)據(jù)丟失,所以需要持久化到磁盤(pán)中
刷盤(pán)的最終實(shí)現(xiàn)都是使用NIO
中的 MappedByteBuffer.force()
將映射區(qū)的數(shù)據(jù)寫(xiě)入到磁盤(pán),如果是同步刷盤(pán)的話,在Broker
把消息寫(xiě)到CommitLog
映射區(qū)后,就會(huì)等待寫(xiě)入完成 異步而言,只是喚醒對(duì)應(yīng)的線程,不保證執(zhí)行的時(shí)機(jī),流程如圖所示。
2.6 RocketMQ的負(fù)載均衡
RocketMQ
中的負(fù)載均衡都在Client
端完成,具體來(lái)說(shuō)的話,主要可以分為Producer
端發(fā)送消息時(shí)候的負(fù)載均衡和Consumer
端訂閱消息的負(fù)載均衡。
2.6.1 Producer的負(fù)載均衡
Producer
端在發(fā)送消息的時(shí)候,會(huì)先根據(jù)Topic
找到指定的TopicPublishInfo
,在獲取了TopicPublishInfo
路由信息后,RocketMQ
的客戶端在默認(rèn)方式下selectOneMessageQueue()
方法會(huì)從TopicPublishInfo
中的messageQueueList
中選擇一個(gè)隊(duì)列(MessageQueue
)進(jìn)行發(fā)送消息。具這里有一個(gè)sendLatencyFaultEnable
開(kāi)關(guān)變量,如果開(kāi)啟,在隨機(jī)遞增取模的基礎(chǔ)上,再過(guò)濾掉not available
的Broker
代理。
Producer負(fù)載均衡:索引遞增隨機(jī)取模 public MessageQueue selectOneMessageQueue(){ //索引遞增 int index = this.sendWhichQueue.incrementAndGet(); //利用索引取隨機(jī)數(shù),取余數(shù) int pos = Math.abs(index) % this.messageQueueList.size(); if(pos<0){ pos=0; } return this.messageQueueList.get(pos); }
所謂的latencyFaultTolerance
,是指對(duì)之前失敗的,按一定的時(shí)間做退避。例如,如果上次請(qǐng)求的latency
超過(guò)550Lms
,就退避3000Lms
;超過(guò)1000L
,就退避60000L
;如果關(guān)閉,采用隨機(jī)遞增取模的方式選擇一個(gè)隊(duì)列(MessageQueue
)來(lái)發(fā)送消息,latencyFaultTolerance
機(jī)制是實(shí)現(xiàn)消息發(fā)送高可用的核心關(guān)鍵所在。
2.6.2 Consumer的負(fù)載均衡
在RocketMQ
中,Consumer
端的兩種消費(fèi)模式(Push/Pull
)都是基于拉模式來(lái)獲取消息的,而在Push
模式只是對(duì)pull
模式的一種封裝,其本質(zhì)實(shí)現(xiàn)為消息拉取線程在從服務(wù)器拉取到一批消息后,然后提交到消息消費(fèi)線程池后,又“馬不停蹄”的繼續(xù)向服務(wù)器再次嘗試?yán)∠?。如果未拉取到消息,則延遲一下又繼續(xù)拉取。在兩種基于拉模式的消費(fèi)方式(Push/Pull
)中,均需要Consumer
端知道從Broker
端的哪一個(gè)消息隊(duì)列中去獲取消息。因此,有必要在Consumer
端來(lái)做負(fù)載均衡,即Broker
端中多個(gè)MessageQueue
分配給同一個(gè)ConsumerGroup
中的哪些Consumer
消費(fèi)。
Consumer
端的心跳包發(fā)送 在Consumer
啟動(dòng)后,它就會(huì)通過(guò)定時(shí)任務(wù)不斷地向RocketMQ
集群中的所有Broker
實(shí)例發(fā)送心跳包(其中包含了,消息消費(fèi)分組名稱、訂閱關(guān)系集合、消息通信模式和客戶端id的值等信息)。Broker
端在收到Consumer
的心跳消息后,會(huì)將它維護(hù)在ConsumerManager
的本地緩存變量—consumerTable
,同時(shí)并將封裝后的客戶端網(wǎng)絡(luò)通道信息保存在本地緩存變量—channelInfoTable
中,為之后做Consumer
端的負(fù)載均衡提供可以依據(jù)的元數(shù)據(jù)信息。Consumer
端實(shí)現(xiàn)負(fù)載均衡的核心類—RebalanceImpl
在Consumer
實(shí)例的啟動(dòng)流程中的啟動(dòng)MQClientInstance
實(shí)例部分,會(huì)完成負(fù)載均衡服務(wù)線程—RebalanceService
的啟動(dòng)(每隔20s執(zhí)行一次)。 通過(guò)查看源碼可以發(fā)現(xiàn),RebalanceService
線程的run()
方法最終調(diào)用的是RebalanceImpl
類的rebalanceByTopic()
方法,這個(gè)方法是實(shí)現(xiàn)Consumer
端負(fù)載均衡的核心。rebalanceByTopic()
方法會(huì)根據(jù)消費(fèi)者通信類型為廣播模式
還是集群模式
做不同的邏輯處理
2.7 RocketMQ消息長(zhǎng)輪詢
所謂的長(zhǎng)輪詢,就是Consumer
拉取消息,如果對(duì)應(yīng)的Queue
如果沒(méi)有數(shù)據(jù),Broker
不會(huì)立即返回,而是把 PullReuqest
hold起來(lái),等待 queue
消息后,或者長(zhǎng)輪詢阻塞時(shí)間到了,再重新處理該 queue
上的所有 PullRequest
PullMessageProcessor#processRequest
//如果沒(méi)有拉到數(shù)據(jù) case ResponseCode.PULL_NOT_FOUND: // broker 和 consumer 都允許 suspend,默認(rèn)開(kāi)啟 if (brokerAllowSuspend && hasSuspendFlag) { long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); //封裝一個(gè)PullRequest PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); //把PullRequest掛起來(lái) this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
掛起的請(qǐng)求,有一個(gè)服務(wù)線程會(huì)不停地檢查,看queue
中是否有數(shù)據(jù),或者超時(shí)。
PullRequestHoldService#run()
@Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); //檢查hold住的請(qǐng)求 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
到此這篇關(guān)于深入講解RocketMQ原理的文章就介紹到這了,更多相關(guān)RocketMQ原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot mybais配置多數(shù)據(jù)源過(guò)程解析
這篇文章主要介紹了Springboot+mybais配置多數(shù)據(jù)源過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03Spring?@EventListener?異步中使用condition的問(wèn)題及處理
這篇文章主要介紹了Spring?@EventListener?異步中使用condition的問(wèn)題及處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Java中File、Base64、MultipartFile之間相互轉(zhuǎn)換的代碼詳解
File、Base64和MultipartFile都是在編程中常用的類或者數(shù)據(jù)類型,用于處理文件和數(shù)據(jù)的存儲(chǔ)、傳輸和轉(zhuǎn)換等操作,本文將給大家介紹了Java中File、Base64、MultipartFile之間相互轉(zhuǎn)換,文中有詳細(xì)的代碼示例供大家參考,需要的朋友可以參考下2024-04-04javaWeb使用servlet搭建服務(wù)器入門(mén)
這篇文章主要為大家詳細(xì)介紹了javaWeb使用servlet搭建服務(wù)器入門(mén),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-11-11springboot 如何解決static調(diào)用service為null
這篇文章主要介紹了springboot 如何解決static調(diào)用service為null的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Java 處理圖片與base64 編碼的相互轉(zhuǎn)換的示例
本篇文章主要介紹了Java 處理圖片與base64 編碼的相互轉(zhuǎn)換的示例,具有一定的參考價(jià)值,有興趣的可以了解一下2017-08-08基于Spring5實(shí)現(xiàn)登錄注冊(cè)功能
這篇文章主要為大家詳細(xì)介紹了基于Spring5實(shí)現(xiàn)登錄注冊(cè)功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-09-09JAVA實(shí)現(xiàn)經(jīng)典游戲坦克大戰(zhàn)的示例代碼
小時(shí)候大家都玩過(guò)坦克大戰(zhàn)吧,熟悉的旋律和豐富的關(guān)卡陪伴了我們一整個(gè)寒暑假。本文將通過(guò)Java+Swing實(shí)現(xiàn)這一經(jīng)典游戲,感興趣的可以學(xué)習(xí)一下2022-01-01