深入講解RocketMQ原理
RocketMQ
1.1 為什么要選RocketMQ
總結一下: 選擇中間件的可以從這些維度來考慮:可靠性,性能,功能,可運維行,可拓展性,社區(qū)活躍度
。目前常用的幾個中間件,ActiveMQ作為“老古董”,市面上用的已經不多,其它幾種:
RabbitMQ
:優(yōu)點:輕量,迅捷,容易部署和使用,擁有靈活的路由配置缺點:性能和吞吐量不太理想,不易進行二次開發(fā)RocketMQ
:優(yōu)點:性能好,高吞吐量,穩(wěn)定可靠,有活躍的中文社區(qū)缺點:兼容性上不是太好Kafka
:優(yōu)點:擁有強大的性能及吞吐量,兼容性很好缺點:由于“攢一波再處理”導致延遲比較高
1.2 RocketMQ優(yōu)缺點
RocketMQ
優(yōu)點:
- 單機吞吐量:十萬級
- 可用性:非常高,分布式架構
- 消息可靠性:經過參數優(yōu)化配置,消息可以做到0丟失
- 功能支持:MQ功能較為完善,還是分布式的,擴展性好
- 支持10億級別的消息堆積,不會因為堆積導致性能下降
- 源碼是Java,方便結合公司自己的業(yè)務二次開發(fā)
- 天生為金融互聯(lián)網領域而生,對于可靠性要求很高的場景,尤其是電商里面的訂單扣款,以及業(yè)務削峰,在大量交易涌入時,后端可能無法及時處理的情況
RoketMQ
在穩(wěn)定性上可能更值得信賴,這些業(yè)務場景在阿里雙11已經經歷了多次考驗
RocketMQ
缺點:
- 支持的客戶端語言不多,目前是Java及c++,其中c++不成熟
- 沒有在
MQ
核心中去實現JMS
等接口,有些系統(tǒng)要遷移需要修改大量代碼
1.3 消息模型
1.3.1 消息隊列模型
消息隊列有兩種模型:隊列模型
和發(fā)布/訂閱模型
1.隊列模型
這是最初的一種消息隊列模型,對應著消息隊列發(fā)-存-收
的模型。生產者往某個隊列里面發(fā)送消息,一個隊列可以存儲多個生產者的消息,一個隊列也可以有多個消費者,但是消費者之間是競爭關系,也就是說每條消息只能被一個消費者消費。
2.發(fā)布/訂閱模型
如果需要將一份消息數據分發(fā)給多個消費者,并且每個消費者都要求收到全量的消息。很顯然,隊列模型無法滿足這個需求。解決的方式就是發(fā)布/訂閱模型。 在發(fā)布 - 訂閱
模型中,消息的發(fā)送方稱為發(fā)布者(Publisher
),消息的接收方稱為訂閱者(Subscriber
),服務端存放消息的容器
稱為主題(Topic
)。發(fā)布者將消息發(fā)送到主題中,訂閱者在接收消息之前需要先訂閱主題
。“訂閱”在這里既是一個動作,同時還可以認為是主題在消費時的一個邏輯副本,每份訂閱中,訂閱者都可以接收到主題的所有消息。
它和 隊列模式
的異同:生產者就是發(fā)布者,隊列就是主題,消費者就是訂閱者,無本質區(qū)別。唯一的不同點在于:一份消息數據是否可以被多次消費
1.3.2 RocketMQ消息模型
RocketMQ
使用的消息模型是標準的發(fā)布-訂閱
模型,在RocketMQ
的術語表中,生產者、消費者和主題,與發(fā)布-訂閱模型中的概念是完全一樣的。
1.3.3 RocketMQ中成員
RocketMQ
本身的消息是由下面幾部分組成:
Message
Message
(消息)就是要傳輸的信息 一條消息必須有一個主題(Topic
),主題可以看做是你的信件要郵寄的地址。 一條消息也可以擁有一個可選的標簽(Tag
)和額處的鍵值對,它們可以用于設置一個業(yè)務Key
并在 Broker
上查找此消息以便在開發(fā)期間查找問題。
Topic
Topic
(主題)可以看做消息的歸類,它是消息的第一級類型
。比如一個電商系統(tǒng)可以分為:交易消息
、物流消息
等,一條消息必須有一個 Topic
Topic
與生產者和消費者的關系非常松散,一個 Topic
可以有0個
、1個
、多個生產者
向其發(fā)送消息,一個生產者也可以同時向不同的 Topic
發(fā)送消息。 一個 Topic
也可以被 0個
、1個
、多個消費者
訂閱。
Tag
Tag
(標簽)可以看作子主題
,它是消息的第二級類型
,用于為用戶提供額外的靈活性。使用標簽,同一業(yè)務模塊不同目的的消息就可以用相同 Topic
而不同的 Tag
來標識。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒有 Tag
標簽有助于保持你的代碼干凈和連貫,并且還可以為 RocketMQ
提供的查詢系統(tǒng)提供幫助。
Group
- Consumer Group :
RocketMQ
中,訂閱者的概念是通過消費組(Consumer Group
)來體現的。每個消費組都消費主題中一份完整的消息,不同消費組之間消費進度彼此不受影響,也就是說,一條消息被Consumer Group1
消費過,也會再給Consumer Group2
消費。消費組中包含多個消費者,同一個組內的消費者是競爭消費的關系,每個消費者負責消費組內的一部分消息。默認情況,如果一條消息被消費者Consumer1
消費了,那同組的其他消費者就不會再收到這條消息。 - Producer Group :生產者組,簡單來說就是多個發(fā)送同一類消息的生產者稱之為一個生產者組,一群
Topic
相同的Producer
Message Queue
Message Queue
(消息隊列),一個 Topic
下可以設置多個消息隊列,Topic
包括多個 Message Queue
,如果一個 Consumer
需要獲取 Topic
下所有的消息,就要遍歷所有的 Message Queue
。
RocketMQ
還有一些其它的Queue
——例如ConsumerQueue
Offset
在Topic
的消費過程中,由于消息需要被不同的組進行多次消費,所以消費完的消息并不會立即被刪除,這就需要RocketMQ
為每個消費組在每個隊列上維護一個消費位置(Consumer Offset
),這個位置之前的消息都被消費過,之后的消息都沒有被消費過,每成功消費一條消息,消費位置就加一
也可以這么說,Queue
是一個長度無限的數組,Offset
就是下標。
總結圖示
RocketMQ
的消息模型中,這些就是比較關鍵的概念了 畫張圖總結一下
1.4 消息的消費模式
消息消費模式有兩種:Clustering
(集群消費)和Broadcasting
(廣播消費)
默認情況下就是集群消費
,這種模式下一個消費者組
共同消費一個主題的多個隊列,一個隊列只會被一個消費者消費,如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續(xù)消費。 而廣播消費消息會發(fā)給消費者組中的每一個消費者進行消費。
1.5 RoctetMQ基本架構
先看圖,RocketMQ
的基本架構
RocketMQ
一共有四個部分組成:NameServer
,Broker
,Producer 生產者
,Consumer 消費者
,它們對應了:發(fā)現
、發(fā)
、存
、收
,為了保證高可用,一般每一部分都是集群部署的
類比一下我們生活的郵政系統(tǒng)—— 郵政系統(tǒng)要正常運行,離不開下面這四個角色, 一是發(fā)信者
,二 是收信者
, 三是負責暫存?zhèn)鬏數泥]局
, 四是負責協(xié)調各個地方郵局的管理機構
。對應到 RocketMQ 中,這四個角色就是 Producer
、 Consumer
、 Broker
、NameServer
1.5.1 NameServer
NameServer
是一個無狀態(tài)的服務器,角色類似于 Kafka
使用的 Zookeeper
,但比 Zookeeper
更輕量。
特點:
每個 NameServer
結點之間是相互獨立,彼此沒有任何信息交互。 Nameserver
被設計成幾乎是無狀態(tài)的,通過部署多個結點來標識自己是一個偽集群,Producer
在發(fā)送消息前從 NameServer
中獲取 Topic
的路由信息也就是發(fā)往哪個 Broker
,Consumer
也會定時從 NameServer
獲取 Topic
的路由信息,Broker
在啟動時會向 NameServer
注冊,并定時進行心跳連接,且定時同步維護的 Topic
到 NameServer
功能主要有兩個:
- 和
Broker
結點保持長連接。 - 維護
Topic
的路由信息。
1.5.2 Broker
消息存儲和中轉角色,負責存儲和轉發(fā)消息
Broker
內部維護著一個個 Consumer Queue
,用來存儲消息的索引,真正存儲消息的地方是 CommitLog
(日志文件)
單個 Broker
與所有的 Nameserver
保持著長連接和心跳,并會定時將 Topic
信息同步到 NameServer
,和 NameServer
的通信底層是通過 Netty
實現的。
1.5.3 Producer
消息生產者,業(yè)務端負責發(fā)送消息,由用戶自行實現和分布式部署。
Producer
由用戶進行分布式部署,消息由Producer
通過多種負載均衡模式發(fā)送到Broker
集群,發(fā)送低延時,支持快速失敗。 RocketMQ
提供了三種方式發(fā)送消息:同步
、異步
和單向
同步發(fā)送
:同步發(fā)送指消息發(fā)送方發(fā)出數據后會在收到接收方發(fā)回響應之后才發(fā)下一個數據包。一般用于重要通知消息,例如重要通知郵件、營銷短信。異步發(fā)送
:異步發(fā)送指發(fā)送方發(fā)出數據后,不等接收方發(fā)回響應,接著發(fā)送下個數據包,一般用于可能鏈路耗時較長而對響應時間敏感的業(yè)務場景,例如用戶視頻上傳后通知啟動轉碼服務。單向發(fā)送
:單向發(fā)送是指只負責發(fā)送消息而不等待服務器回應且沒有回調函數觸發(fā),適用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集
1.5.4 Consumer
消息消費者,負責消費消息,一般是后臺系統(tǒng)負責異步消費。
Consumer
也由用戶部署,支持PUSH
和PULL
兩種消費模式,支持集群消費和廣播消費,提供實時的消息訂閱機制。
Pull
:拉取型消費者(Pull Consumer
)主動從消息服務器拉取信息,只要批量拉取到消息,用戶應用就會啟動消費過程,所以Pull
稱為主動消費型
Push
:推送型消費者(Push Consumer
)封裝了消息的拉取、消費進度和其他的內部維護工作,將消息到達時執(zhí)行的回調接口留給用戶應用程序來實現。所以Push
稱為被動消費類型
,但其實從實現上看還是從消息服務器中拉取消息,不同于Pull
的是Push
首先要注冊消費監(jiān)聽器,當監(jiān)聽器處觸發(fā)后才開始消費消息
2 原理
2.1 RocketMQ整體工作流程
簡單來說,RocketMQ
是一個分布式消息隊列,也就是消息隊列
+分布式系統(tǒng)
作為消息隊列,它是發(fā)-存-收
的一個模型,對應的就是Producer、Broker、Cosumer
;作為分布式系統(tǒng),它要有服務端、客戶端、注冊中心,對應的就是Broker、Producer/Consumer、NameServer
所以我們看一下它主要的工作流程:RocketMQ
由NameServer
注冊中心集群、Producer
生產者集群、Consumer
消費者集群和若干Broker
(RocketMQ
進程)組成:
Broker
在啟動的時候去向所有的NameServer
注冊,并保持長連接,每30s發(fā)送一次心跳Producer
在發(fā)送消息的時候從NameServer
獲取Broker
服務器地址,根據負載均衡算法選擇一臺服務器來發(fā)送消息Conusmer
消費消息的時候同樣從NameServer
獲取Broker
地址,然后主動拉取消息來消費
2.2 為什么RocketMQ不使用Zookeeper作為注冊中心
Kafka
我們都知道采用Zookeeper
作為注冊中心——當然也開始逐漸去Zookeeper
,RocketMQ
不使用Zookeeper
其實主要可能從這幾方面來考慮:
- 基于可用性的考慮
根據CAP
理論,同時最多只能滿足兩個點,而Zookeeper
滿足的是CP
,也就是說Zookeeper
并不能保證服務的可用性,Zookeeper
在進行選舉的時候,整個選舉的時間太長,期間整個集群都處于不可用的狀態(tài),而這對于一個注冊中心來說肯定是不能接受的,作為服務發(fā)現來說就應該是為可用性而設計。 - 基于性能的考慮
NameServer
本身的實現非常輕量,而且可以通過增加機器的方式水平擴展,增加集群的抗壓能力,而Zookeeper
的寫是不可擴展的,Zookeeper
要解決這個問題只能通過劃分領域,劃分多個Zookeeper
集群來解決,首先操作起來太復雜,其次這樣還是又違反了CAP
中的A的設計,導致服務之間是不連通的。 - 持久化的機制來帶的問題
ZooKeeper
的ZAB
協(xié)議對每一個寫請求,會在每個ZooKeeper
節(jié)點上保持寫一個事務日志,同時再加上定期的將內存數據鏡像(Snapshot
)到磁盤來保證數據的一致性和持久性,而對于一個簡單的服務發(fā)現的場景來說,這其實沒有太大的必要,這個實現方案太重了。而且本身存儲的數據應該是高度定制化的。 - 消息發(fā)送應該
弱
依賴注冊中心RocketMQ
的設計理念也正是基于此,生產者在第一次發(fā)送消息的時候從NameServer
獲取到Broker
地址后緩存到本地,如果NameServer
整個集群不可用,短時間內對于生產者和消費者并不會產生太大影響。
2.3 Broker保存數據(CommitLog,ConsumeQueue,Indexfile)
RocketMQ
主要的存儲文件包括CommitLog
文件、ConsumeQueue
文件、Indexfile
文件
CommitLog
:消息主體以及元數據的存儲主體,存儲Producer
端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G
, 文件名長度為20位
,左邊補零,剩余為起始偏移量,比如00000000000000000000
代表了第一個文件,起始偏移量為0
,文件大小為1G=1073741824
;當第一個文件寫滿了,第二個文件為00000000001073741824
,起始偏移量為1073741824
,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件。
CommitLog
文件保存于${Rocket_Home}/store/commitlog
目錄中,從圖中我們可以明顯看出來文件名的偏移量,每個文件默認1G
,寫滿后自動生成一個新的文件。
ConsumeQueue
:消息消費隊列,引入的目的主要是提高消息消費的性能,由于RocketMQ
是基于主題topic
的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog
文件中根據topic
檢索消息是非常低效的。Consumer
即可根據ConsumeQueue
來查找待消費的消息。其中,ConsumeQueue
(邏輯消費隊列)作為消費消息的索引,保存了指定Topic
下的隊列消息在CommitLog
中的起始物理偏移量offset
,消息大小size
和消息Tag
的HashCode
值ConsumeQueue
文件可以看成是基于Topic
的CommitLog
索引文件,故ConsumeQueue
文件夾的組織方式如下:topic/queue/file
三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
同樣ConsumeQueue
文件采取定長設計,每一個條目共20
個字節(jié),分別為8
字節(jié)的CommitLog
物理偏移量、4字節(jié)的消息長度、8字節(jié)tag hashcode
,單個文件由30W
個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue
文件大小約5.72M
;
IndexFile
:IndexFile
(索引文件)提供了一種可以通過key
或時間區(qū)間來查詢消息的方法。Index
文件的存儲位置是:{fileName}
,文件名fileName
是以創(chuàng)建時的時間戳命名的,固定的單個IndexFile
文件大小約為400M
,一個IndexFile
可以保存2000W
個索引,IndexFile
的底層存儲設計為在文件系統(tǒng)中實現HashMap
結構,故RocketMQ
的索引文件其底層實現為hash
索引
總結一下:RocketMQ
采用的是混合型的存儲結構,即為Broker
單個實例下所有的隊列共用一個日志數據文件(即為CommitLog
)來存儲。
RocketMQ
的混合型存儲結構(多個Topic
的消息實體內容都存儲于一個CommitLog
中)針對Producer
和Consumer
分別采用了數據
和索引
部分相分離的存儲結構,Producer
發(fā)送消息至Broker
端,然后Broker
端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog
中。
只要消息被刷盤持久化至磁盤文件CommitLog
中,那么Producer
發(fā)送的消息就不會丟失。正因為如此,Consumer
也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker
允許等待30s
的時間,只要這段時間內有新消息到達,將直接返回給消費端。 這里,RocketMQ
的具體做法是,使用Broker
端的后臺服務線程—ReputMessageService
不停地分發(fā)請求并異步構建ConsumeQueue
(邏輯消費隊列)和IndexFile
(索引文件)數據。
2.4 RocketMQ怎么對文件進行讀寫
RocketMQ
對文件的讀寫巧妙地利用了操作系統(tǒng)的一些高效文件讀寫方式——PageCache
、順序讀寫
、零拷貝
2.4.1 PageCache、順序讀取
在RocketMQ
中,ConsumeQueue
邏輯消費隊列存儲的數據較少,并且是順序讀取,在page cache
機制的預讀取作用下,Consume Queue
文件的讀性能幾乎接近讀內存,即使在有消息堆積情況下也不會影響性能。而對于CommitLog
消息存儲的日志數據文件來說,讀取消息內容時候會產生較多的隨機訪問讀取,嚴重影響性能。如果選擇合適的系統(tǒng)IO
調度算法,比如設置調度算法為Deadline
(此時塊存儲采用SSD的話),隨機讀的性能也會有所提升。
頁緩存(PageCache
)是OS
對文件的緩存,用于加速對文件的讀寫。一般來說,程序對文件進行順序讀寫的速度幾乎接近于內存的讀寫速度,主要原因就是由于OS
使用PageCache
機制對讀寫訪問操作進行了性能優(yōu)化,將一部分的內存用作PageCache
。對于數據的寫入,OS
會先寫入至Cache
內,隨后通過異步的方式由pdflush
內核線程將Cache
內的數據刷盤至物理磁盤上。對于數據的讀取,如果一次讀取文件時出現未命中PageCache
的情況,OS
從物理磁盤上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取
2.4.2 零拷貝
RocketMQ
主要通過MappedByteBuffer
對文件進行讀寫操作。其中,利用了NIO
中的FileChannel
模型將磁盤上的物理文件直接映射到用戶態(tài)的內存地址中(這種Mmap
的方式減少了傳統(tǒng)IO
,將磁盤文件數據在操作系統(tǒng)內核地址空間的緩沖區(qū),和用戶應用程序地址空間的緩沖區(qū)之間來回進行拷貝的性能開銷),將對文件的操作轉化為直接對內存地址進行操作,從而極大地提高了文件的讀寫效率(正因為需要使用內存映射機制,故RocketMQ
的文件存儲都使用定長結構來存儲,方便一次將整個文件映射至內存)。
什么是零拷貝 在操作系統(tǒng)中,使用傳統(tǒng)的方式,數據需要經歷幾次拷貝,還要經歷用戶態(tài)/內核態(tài)
切換
- 從磁盤復制數據到內核態(tài)內存;
- 從內核態(tài)內存復制到用戶態(tài)內存;
- 然后從用戶態(tài)內存復制到網絡驅動的內核態(tài)內存;
- 最后是從網絡驅動的內核態(tài)內存復制到網卡中進行傳輸。
所以,可以通過零拷貝的方式,減少用戶態(tài)與內核態(tài)的上下文切換和內存拷貝的次數,用來提升I/O
的性能。零拷貝比較常見的實現方式是mmap
,這種機制在Java
中是通過MappedByteBuffer
實現的。
2.5 消息刷盤怎么實現
RocketMQ
提供了兩種刷盤策略:同步刷盤
和異步刷盤
???????同步刷盤
:在消息達到Broker
的內存之后,必須刷到commitLog
日志文件中才算成功,然后返回Producer
數據已經發(fā)送成功。異步刷盤
:異步刷盤是指消息達到Broker
內存后就返回Producer
數據已經發(fā)送成功,會喚醒一個線程去將數據持久化到CommitLog
日志文件中
Broker
在消息的存取時直接操作的是內存(內存映射文件),這可以提供系統(tǒng)的吞吐量,但是無法避免機器掉電時數據丟失,所以需要持久化到磁盤中
刷盤的最終實現都是使用NIO
中的 MappedByteBuffer.force()
將映射區(qū)的數據寫入到磁盤,如果是同步刷盤的話,在Broker
把消息寫到CommitLog
映射區(qū)后,就會等待寫入完成 異步而言,只是喚醒對應的線程,不保證執(zhí)行的時機,流程如圖所示。
2.6 RocketMQ的負載均衡
RocketMQ
中的負載均衡都在Client
端完成,具體來說的話,主要可以分為Producer
端發(fā)送消息時候的負載均衡和Consumer
端訂閱消息的負載均衡。
2.6.1 Producer的負載均衡
Producer
端在發(fā)送消息的時候,會先根據Topic
找到指定的TopicPublishInfo
,在獲取了TopicPublishInfo
路由信息后,RocketMQ
的客戶端在默認方式下selectOneMessageQueue()
方法會從TopicPublishInfo
中的messageQueueList
中選擇一個隊列(MessageQueue
)進行發(fā)送消息。具這里有一個sendLatencyFaultEnable
開關變量,如果開啟,在隨機遞增取模的基礎上,再過濾掉not available
的Broker
代理。
Producer負載均衡:索引遞增隨機取模 public MessageQueue selectOneMessageQueue(){ //索引遞增 int index = this.sendWhichQueue.incrementAndGet(); //利用索引取隨機數,取余數 int pos = Math.abs(index) % this.messageQueueList.size(); if(pos<0){ pos=0; } return this.messageQueueList.get(pos); }
所謂的latencyFaultTolerance
,是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency
超過550Lms
,就退避3000Lms
;超過1000L
,就退避60000L
;如果關閉,采用隨機遞增取模的方式選擇一個隊列(MessageQueue
)來發(fā)送消息,latencyFaultTolerance
機制是實現消息發(fā)送高可用的核心關鍵所在。
2.6.2 Consumer的負載均衡
在RocketMQ
中,Consumer
端的兩種消費模式(Push/Pull
)都是基于拉模式來獲取消息的,而在Push
模式只是對pull
模式的一種封裝,其本質實現為消息拉取線程在從服務器拉取到一批消息后,然后提交到消息消費線程池后,又“馬不停蹄”的繼續(xù)向服務器再次嘗試拉取消息。如果未拉取到消息,則延遲一下又繼續(xù)拉取。在兩種基于拉模式的消費方式(Push/Pull
)中,均需要Consumer
端知道從Broker
端的哪一個消息隊列中去獲取消息。因此,有必要在Consumer
端來做負載均衡,即Broker
端中多個MessageQueue
分配給同一個ConsumerGroup
中的哪些Consumer
消費。
Consumer
端的心跳包發(fā)送 在Consumer
啟動后,它就會通過定時任務不斷地向RocketMQ
集群中的所有Broker
實例發(fā)送心跳包(其中包含了,消息消費分組名稱、訂閱關系集合、消息通信模式和客戶端id的值等信息)。Broker
端在收到Consumer
的心跳消息后,會將它維護在ConsumerManager
的本地緩存變量—consumerTable
,同時并將封裝后的客戶端網絡通道信息保存在本地緩存變量—channelInfoTable
中,為之后做Consumer
端的負載均衡提供可以依據的元數據信息。Consumer
端實現負載均衡的核心類—RebalanceImpl
在Consumer
實例的啟動流程中的啟動MQClientInstance
實例部分,會完成負載均衡服務線程—RebalanceService
的啟動(每隔20s執(zhí)行一次)。 通過查看源碼可以發(fā)現,RebalanceService
線程的run()
方法最終調用的是RebalanceImpl
類的rebalanceByTopic()
方法,這個方法是實現Consumer
端負載均衡的核心。rebalanceByTopic()
方法會根據消費者通信類型為廣播模式
還是集群模式
做不同的邏輯處理
2.7 RocketMQ消息長輪詢
所謂的長輪詢,就是Consumer
拉取消息,如果對應的Queue
如果沒有數據,Broker
不會立即返回,而是把 PullReuqest
hold起來,等待 queue
消息后,或者長輪詢阻塞時間到了,再重新處理該 queue
上的所有 PullRequest
PullMessageProcessor#processRequest
//如果沒有拉到數據 case ResponseCode.PULL_NOT_FOUND: // broker 和 consumer 都允許 suspend,默認開啟 if (brokerAllowSuspend && hasSuspendFlag) { long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); //封裝一個PullRequest PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); //把PullRequest掛起來 this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
掛起的請求,有一個服務線程會不停地檢查,看queue
中是否有數據,或者超時。
PullRequestHoldService#run()
@Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); //檢查hold住的請求 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
到此這篇關于深入講解RocketMQ原理的文章就介紹到這了,更多相關RocketMQ原理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring?@EventListener?異步中使用condition的問題及處理
這篇文章主要介紹了Spring?@EventListener?異步中使用condition的問題及處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12Java中File、Base64、MultipartFile之間相互轉換的代碼詳解
File、Base64和MultipartFile都是在編程中常用的類或者數據類型,用于處理文件和數據的存儲、傳輸和轉換等操作,本文將給大家介紹了Java中File、Base64、MultipartFile之間相互轉換,文中有詳細的代碼示例供大家參考,需要的朋友可以參考下2024-04-04springboot 如何解決static調用service為null
這篇文章主要介紹了springboot 如何解決static調用service為null的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06