欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

深入講解RocketMQ原理

 更新時間:2023年07月04日 11:11:26   作者:愛吃牛肉的大老虎  
這篇文章主要介紹了詳解SpringBoot整合RocketMQ,RocketMQ作為一款純java、分布式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等,需要的朋友可以參考下

RocketMQ

1.1 為什么要選RocketMQ

在這里插入圖片描述

總結一下: 選擇中間件的可以從這些維度來考慮:可靠性,性能,功能,可運維行,可拓展性,社區(qū)活躍度。目前常用的幾個中間件,ActiveMQ作為“老古董”,市面上用的已經不多,其它幾種:

  • RabbitMQ:優(yōu)點:輕量,迅捷,容易部署和使用,擁有靈活的路由配置缺點:性能和吞吐量不太理想,不易進行二次開發(fā)
  • RocketMQ:優(yōu)點:性能好,高吞吐量,穩(wěn)定可靠,有活躍的中文社區(qū)缺點:兼容性上不是太好
  • Kafka:優(yōu)點:擁有強大的性能及吞吐量,兼容性很好缺點:由于“攢一波再處理”導致延遲比較高

1.2 RocketMQ優(yōu)缺點

RocketMQ優(yōu)點:

  • 單機吞吐量:十萬級
  • 可用性:非常高,分布式架構
  • 消息可靠性:經過參數優(yōu)化配置,消息可以做到0丟失
  • 功能支持:MQ功能較為完善,還是分布式的,擴展性好
  • 支持10億級別的消息堆積,不會因為堆積導致性能下降
  • 源碼是Java,方便結合公司自己的業(yè)務二次開發(fā)
  • 天生為金融互聯(lián)網領域而生,對于可靠性要求很高的場景,尤其是電商里面的訂單扣款,以及業(yè)務削峰,在大量交易涌入時,后端可能無法及時處理的情況
  • RoketMQ在穩(wěn)定性上可能更值得信賴,這些業(yè)務場景在阿里雙11已經經歷了多次考驗

RocketMQ缺點:

  • 支持的客戶端語言不多,目前是Java及c++,其中c++不成熟
  • 沒有在MQ核心中去實現JMS等接口,有些系統(tǒng)要遷移需要修改大量代碼

1.3 消息模型

1.3.1 消息隊列模型

消息隊列有兩種模型:隊列模型發(fā)布/訂閱模型

1.隊列模型

這是最初的一種消息隊列模型,對應著消息隊列發(fā)-存-收的模型。生產者往某個隊列里面發(fā)送消息,一個隊列可以存儲多個生產者的消息,一個隊列也可以有多個消費者,但是消費者之間是競爭關系,也就是說每條消息只能被一個消費者消費。

在這里插入圖片描述

2.發(fā)布/訂閱模型

如果需要將一份消息數據分發(fā)給多個消費者,并且每個消費者都要求收到全量的消息。很顯然,隊列模型無法滿足這個需求。解決的方式就是發(fā)布/訂閱模型。 在發(fā)布 - 訂閱模型中,消息的發(fā)送方稱為發(fā)布者(Publisher),消息的接收方稱為訂閱者(Subscriber),服務端存放消息的容器稱為主題(Topic)。發(fā)布者將消息發(fā)送到主題中,訂閱者在接收消息之前需要先訂閱主題。“訂閱”在這里既是一個動作,同時還可以認為是主題在消費時的一個邏輯副本,每份訂閱中,訂閱者都可以接收到主題的所有消息。

在這里插入圖片描述

它和 隊列模式的異同:生產者就是發(fā)布者,隊列就是主題,消費者就是訂閱者,無本質區(qū)別。唯一的不同點在于:一份消息數據是否可以被多次消費

1.3.2 RocketMQ消息模型

RocketMQ使用的消息模型是標準的發(fā)布-訂閱模型,在RocketMQ的術語表中,生產者、消費者和主題,與發(fā)布-訂閱模型中的概念是完全一樣的。

1.3.3 RocketMQ中成員

RocketMQ本身的消息是由下面幾部分組成:

在這里插入圖片描述

Message

Message(消息)就是要傳輸的信息 一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。 一條消息也可以擁有一個可選的標簽(Tag)和額處的鍵值對,它們可以用于設置一個業(yè)務Key 并在 Broker 上查找此消息以便在開發(fā)期間查找問題。

Topic

Topic(主題)可以看做消息的歸類,它是消息的第一級類型。比如一個電商系統(tǒng)可以分為:交易消息物流消息等,一條消息必須有一個 Topic

Topic 與生產者和消費者的關系非常松散,一個 Topic 可以有0個、1個、多個生產者向其發(fā)送消息,一個生產者也可以同時向不同的 Topic 發(fā)送消息。 一個 Topic 也可以被 0個、1個多個消費者訂閱。

Tag

Tag(標簽)可以看作子主題,它是消息的第二級類型,用于為用戶提供額外的靈活性。使用標簽,同一業(yè)務模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標識。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒有 Tag

標簽有助于保持你的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統(tǒng)提供幫助。

Group

  • Consumer Group :RocketMQ中,訂閱者的概念是通過消費組(Consumer Group)來體現的。每個消費組都消費主題中一份完整的消息,不同消費組之間消費進度彼此不受影響,也就是說,一條消息被Consumer Group1消費過,也會再給Consumer Group2消費。消費組中包含多個消費者,同一個組內的消費者是競爭消費的關系,每個消費者負責消費組內的一部分消息。默認情況,如果一條消息被消費者Consumer1消費了,那同組的其他消費者就不會再收到這條消息。
  • Producer Group :生產者組,簡單來說就是多個發(fā)送同一類消息的生產者稱之為一個生產者組,一群Topic相同的Producer 

Message Queue

Message Queue(消息隊列),一個 Topic 下可以設置多個消息隊列,Topic 包括多個 Message Queue ,如果一個 Consumer 需要獲取 Topic下所有的消息,就要遍歷所有的 Message Queue。

RocketMQ還有一些其它的Queue——例如ConsumerQueue

Offset

Topic的消費過程中,由于消息需要被不同的組進行多次消費,所以消費完的消息并不會立即被刪除,這就需要RocketMQ為每個消費組在每個隊列上維護一個消費位置(Consumer Offset),這個位置之前的消息都被消費過,之后的消息都沒有被消費過,每成功消費一條消息,消費位置就加一

也可以這么說,Queue 是一個長度無限的數組,Offset 就是下標。

總結圖示

RocketMQ的消息模型中,這些就是比較關鍵的概念了 畫張圖總結一下

在這里插入圖片描述

1.4 消息的消費模式

消息消費模式有兩種:Clustering(集群消費)和Broadcasting(廣播消費)

在這里插入圖片描述

默認情況下就是集群消費,這種模式下一個消費者組共同消費一個主題的多個隊列,一個隊列只會被一個消費者消費,如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續(xù)消費。 而廣播消費消息會發(fā)給消費者組中的每一個消費者進行消費。

1.5 RoctetMQ基本架構

先看圖,RocketMQ的基本架構

在這里插入圖片描述

RocketMQ 一共有四個部分組成:NameServer,BrokerProducer 生產者Consumer 消費者,它們對應了:發(fā)現、發(fā)、,為了保證高可用,一般每一部分都是集群部署的

類比一下我們生活的郵政系統(tǒng)—— 郵政系統(tǒng)要正常運行,離不開下面這四個角色, 一是發(fā)信者,二 是收信者, 三是負責暫存?zhèn)鬏數泥]局, 四是負責協(xié)調各個地方郵局的管理機構。對應到 RocketMQ 中,這四個角色就是 Producer、 ConsumerBroker、NameServer

在這里插入圖片描述

1.5.1 NameServer

NameServer 是一個無狀態(tài)的服務器,角色類似于 Kafka使用的 Zookeeper,但比 Zookeeper 更輕量。

特點:

每個 NameServer 結點之間是相互獨立,彼此沒有任何信息交互。 Nameserver 被設計成幾乎是無狀態(tài)的,通過部署多個結點來標識自己是一個偽集群,Producer 在發(fā)送消息前從 NameServer中獲取 Topic 的路由信息也就是發(fā)往哪個 Broker,Consumer 也會定時從 NameServer 獲取 Topic 的路由信息,Broker 在啟動時會向 NameServer 注冊,并定時進行心跳連接,且定時同步維護的 TopicNameServer 功能主要有兩個:

  • Broker 結點保持長連接。
  • 維護 Topic 的路由信息。

1.5.2 Broker

消息存儲和中轉角色,負責存儲和轉發(fā)消息

Broker 內部維護著一個個 Consumer Queue,用來存儲消息的索引,真正存儲消息的地方是 CommitLog(日志文件)

單個 Broker 與所有的 Nameserver 保持著長連接和心跳,并會定時將 Topic 信息同步到 NameServer,和 NameServer 的通信底層是通過 Netty 實現的。

1.5.3 Producer

消息生產者,業(yè)務端負責發(fā)送消息,由用戶自行實現和分布式部署。

Producer由用戶進行分布式部署,消息由Producer通過多種負載均衡模式發(fā)送到Broker集群,發(fā)送低延時,支持快速失敗。 RocketMQ 提供了三種方式發(fā)送消息:同步、異步單向

  • 同步發(fā)送:同步發(fā)送指消息發(fā)送方發(fā)出數據后會在收到接收方發(fā)回響應之后才發(fā)下一個數據包。一般用于重要通知消息,例如重要通知郵件、營銷短信。
  • 異步發(fā)送:異步發(fā)送指發(fā)送方發(fā)出數據后,不等接收方發(fā)回響應,接著發(fā)送下個數據包,一般用于可能鏈路耗時較長而對響應時間敏感的業(yè)務場景,例如用戶視頻上傳后通知啟動轉碼服務。
  • 單向發(fā)送:單向發(fā)送是指只負責發(fā)送消息而不等待服務器回應且沒有回調函數觸發(fā),適用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集

1.5.4 Consumer

消息消費者,負責消費消息,一般是后臺系統(tǒng)負責異步消費。

Consumer也由用戶部署,支持PUSHPULL兩種消費模式,支持集群消費和廣播消費,提供實時的消息訂閱機制。

  • Pull:拉取型消費者(Pull Consumer)主動從消息服務器拉取信息,只要批量拉取到消息,用戶應用就會啟動消費過程,所以 Pull 稱為主動消費型
  • Push:推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其他的內部維護工作,將消息到達時執(zhí)行的回調接口留給用戶應用程序來實現。所以 Push 稱為被動消費類型,但其實從實現上看還是從消息服務器中拉取消息,不同于 Pull 的是 Push 首先要注冊消費監(jiān)聽器,當監(jiān)聽器處觸發(fā)后才開始消費消息

2 原理

2.1 RocketMQ整體工作流程

簡單來說,RocketMQ是一個分布式消息隊列,也就是消息隊列+分布式系統(tǒng)

作為消息隊列,它是發(fā)-存-收的一個模型,對應的就是Producer、Broker、Cosumer;作為分布式系統(tǒng),它要有服務端、客戶端、注冊中心,對應的就是Broker、Producer/Consumer、NameServer

所以我們看一下它主要的工作流程:RocketMQNameServer注冊中心集群、Producer生產者集群、Consumer消費者集群和若干BrokerRocketMQ進程)組成:

  • Broker在啟動的時候去向所有的NameServer注冊,并保持長連接,每30s發(fā)送一次心跳
  • Producer在發(fā)送消息的時候從NameServer獲取Broker服務器地址,根據負載均衡算法選擇一臺服務器來發(fā)送消息
  • Conusmer消費消息的時候同樣從NameServer獲取Broker地址,然后主動拉取消息來消費

在這里插入圖片描述

2.2 為什么RocketMQ不使用Zookeeper作為注冊中心

Kafka我們都知道采用Zookeeper作為注冊中心——當然也開始逐漸去Zookeeper,RocketMQ不使用Zookeeper其實主要可能從這幾方面來考慮:

  • 基于可用性的考慮
    根據CAP理論,同時最多只能滿足兩個點,而Zookeeper滿足的是CP,也就是說Zookeeper并不能保證服務的可用性,Zookeeper在進行選舉的時候,整個選舉的時間太長,期間整個集群都處于不可用的狀態(tài),而這對于一個注冊中心來說肯定是不能接受的,作為服務發(fā)現來說就應該是為可用性而設計。
  • 基于性能的考慮
    NameServer本身的實現非常輕量,而且可以通過增加機器的方式水平擴展,增加集群的抗壓能力,而Zookeeper的寫是不可擴展的,Zookeeper要解決這個問題只能通過劃分領域,劃分多個Zookeeper集群來解決,首先操作起來太復雜,其次這樣還是又違反了CAP中的A的設計,導致服務之間是不連通的。
  • 持久化的機制來帶的問題
    ZooKeeper 的 ZAB 協(xié)議對每一個寫請求,會在每個ZooKeeper節(jié)點上保持寫一個事務日志,同時再加上定期的將內存數據鏡像(Snapshot)到磁盤來保證數據的一致性和持久性,而對于一個簡單的服務發(fā)現的場景來說,這其實沒有太大的必要,這個實現方案太重了。而且本身存儲的數據應該是高度定制化的。
  • 消息發(fā)送應該依賴注冊中心
    RocketMQ的設計理念也正是基于此,生產者在第一次發(fā)送消息的時候從NameServer獲取到Broker地址后緩存到本地,如果NameServer整個集群不可用,短時間內對于生產者和消費者并不會產生太大影響。

2.3 Broker保存數據(CommitLog,ConsumeQueue,Indexfile)

RocketMQ主要的存儲文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件

在這里插入圖片描述

在這里插入圖片描述

  • CommitLog:消息主體以及元數據的存儲主體,存儲Producer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G, 文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件。

CommitLog文件保存于${Rocket_Home}/store/commitlog目錄中,從圖中我們可以明顯看出來文件名的偏移量,每個文件默認1G,寫滿后自動生成一個新的文件。

在這里插入圖片描述

  • ConsumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據topic檢索消息是非常低效的。 Consumer即可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息TagHashCodeConsumeQueue文件可以看成是基于TopicCommitLog索引文件,故ConsumeQueue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName} 同樣ConsumeQueue文件采取定長設計,每一個條目共20個字節(jié),分別為8字節(jié)的CommitLog物理偏移量、4字節(jié)的消息長度、8字節(jié)tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M;

在這里插入圖片描述

  • IndexFileIndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。Index文件的存儲位置是:{fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統(tǒng)中實現HashMap結構,故RocketMQ的索引文件其底層實現為hash索引

在這里插入圖片描述

總結一下:RocketMQ采用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日志數據文件(即為CommitLog)來存儲。

RocketMQ的混合型存儲結構(多個Topic的消息實體內容都存儲于一個CommitLog中)針對ProducerConsumer分別采用了數據索引部分相分離的存儲結構,Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。

只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發(fā)送的消息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內有新消息到達,將直接返回給消費端。 這里,RocketMQ的具體做法是,使用Broker端的后臺服務線程—ReputMessageService不停地分發(fā)請求并異步構建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數據。

在這里插入圖片描述

2.4 RocketMQ怎么對文件進行讀寫

RocketMQ對文件的讀寫巧妙地利用了操作系統(tǒng)的一些高效文件讀寫方式——PageCache順序讀寫、零拷貝

2.4.1 PageCache、順序讀取

RocketMQ中,ConsumeQueue邏輯消費隊列存儲的數據較少,并且是順序讀取,在page cache機制的預讀取作用下,Consume Queue文件的讀性能幾乎接近讀內存,即使在有消息堆積情況下也不會影響性能。而對于CommitLog消息存儲的日志數據文件來說,讀取消息內容時候會產生較多的隨機訪問讀取,嚴重影響性能。如果選擇合適的系統(tǒng)IO調度算法,比如設置調度算法為Deadline(此時塊存儲采用SSD的話),隨機讀的性能也會有所提升。

頁緩存(PageCache)是OS對文件的緩存,用于加速對文件的讀寫。一般來說,程序對文件進行順序讀寫的速度幾乎接近于內存的讀寫速度,主要原因就是由于OS使用PageCache機制對讀寫訪問操作進行了性能優(yōu)化,將一部分的內存用作PageCache。對于數據的寫入,OS會先寫入至Cache內,隨后通過異步的方式由pdflush內核線程將Cache內的數據刷盤至物理磁盤上。對于數據的讀取,如果一次讀取文件時出現未命中PageCache的情況,OS從物理磁盤上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取

2.4.2 零拷貝

RocketMQ主要通過MappedByteBuffer對文件進行讀寫操作。其中,利用了NIO中的FileChannel模型將磁盤上的物理文件直接映射到用戶態(tài)的內存地址中(這種Mmap的方式減少了傳統(tǒng)IO,將磁盤文件數據在操作系統(tǒng)內核地址空間的緩沖區(qū),和用戶應用程序地址空間的緩沖區(qū)之間來回進行拷貝的性能開銷),將對文件的操作轉化為直接對內存地址進行操作,從而極大地提高了文件的讀寫效率(正因為需要使用內存映射機制,故RocketMQ的文件存儲都使用定長結構來存儲,方便一次將整個文件映射至內存)。

什么是零拷貝 在操作系統(tǒng)中,使用傳統(tǒng)的方式,數據需要經歷幾次拷貝,還要經歷用戶態(tài)/內核態(tài)切換

  • 從磁盤復制數據到內核態(tài)內存;
  • 從內核態(tài)內存復制到用戶態(tài)內存;
  • 然后從用戶態(tài)內存復制到網絡驅動的內核態(tài)內存;
  • 最后是從網絡驅動的內核態(tài)內存復制到網卡中進行傳輸。

在這里插入圖片描述

所以,可以通過零拷貝的方式,減少用戶態(tài)與內核態(tài)的上下文切換和內存拷貝的次數,用來提升I/O的性能。零拷貝比較常見的實現方式是mmap,這種機制在Java中是通過MappedByteBuffer實現的。

在這里插入圖片描述

2.5 消息刷盤怎么實現

RocketMQ提供了兩種刷盤策略:同步刷盤異步刷盤

  • ???????同步刷盤:在消息達到Broker的內存之后,必須刷到commitLog日志文件中才算成功,然后返回Producer數據已經發(fā)送成功。
  • 異步刷盤:異步刷盤是指消息達到Broker內存后就返回Producer數據已經發(fā)送成功,會喚醒一個線程去將數據持久化到CommitLog日志文件中

Broker在消息的存取時直接操作的是內存(內存映射文件),這可以提供系統(tǒng)的吞吐量,但是無法避免機器掉電時數據丟失,所以需要持久化到磁盤中

刷盤的最終實現都是使用NIO中的 MappedByteBuffer.force() 將映射區(qū)的數據寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區(qū)后,就會等待寫入完成 異步而言,只是喚醒對應的線程,不保證執(zhí)行的時機,流程如圖所示。

在這里插入圖片描述

2.6 RocketMQ的負載均衡

RocketMQ中的負載均衡都在Client端完成,具體來說的話,主要可以分為Producer端發(fā)送消息時候的負載均衡和Consumer端訂閱消息的負載均衡。

2.6.1 Producer的負載均衡

Producer端在發(fā)送消息的時候,會先根據Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由信息后,RocketMQ的客戶端在默認方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發(fā)送消息。具這里有一個sendLatencyFaultEnable開關變量,如果開啟,在隨機遞增取模的基礎上,再過濾掉not availableBroker代理。

Producer負載均衡:索引遞增隨機取模
public MessageQueue selectOneMessageQueue(){
	//索引遞增
	int index = this.sendWhichQueue.incrementAndGet();
	//利用索引取隨機數,取余數
	int pos = Math.abs(index) % this.messageQueueList.size();
	if(pos<0){
		pos=0;	
	}
	return this.messageQueueList.get(pos);
}

所謂的latencyFaultTolerance,是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency超過550Lms,就退避3000Lms;超過1000L,就退避60000L;如果關閉,采用隨機遞增取模的方式選擇一個隊列(MessageQueue)來發(fā)送消息,latencyFaultTolerance機制是實現消息發(fā)送高可用的核心關鍵所在。

2.6.2 Consumer的負載均衡

RocketMQ中,Consumer端的兩種消費模式(Push/Pull)都是基于拉模式來獲取消息的,而在Push模式只是對pull模式的一種封裝,其本質實現為消息拉取線程在從服務器拉取到一批消息后,然后提交到消息消費線程池后,又“馬不停蹄”的繼續(xù)向服務器再次嘗試拉取消息。如果未拉取到消息,則延遲一下又繼續(xù)拉取。在兩種基于拉模式的消費方式(Push/Pull)中,均需要Consumer端知道從Broker端的哪一個消息隊列中去獲取消息。因此,有必要在Consumer端來做負載均衡,即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費。

  1. Consumer端的心跳包發(fā)送 在Consumer啟動后,它就會通過定時任務不斷地向RocketMQ集群中的所有Broker實例發(fā)送心跳包(其中包含了,消息消費分組名稱、訂閱關系集合、消息通信模式和客戶端id的值等信息)。Broker端在收到Consumer的心跳消息后,會將它維護在ConsumerManager的本地緩存變量—consumerTable,同時并將封裝后的客戶端網絡通道信息保存在本地緩存變量—channelInfoTable中,為之后做Consumer端的負載均衡提供可以依據的元數據信息。
  2. Consumer端實現負載均衡的核心類—RebalanceImplConsumer實例的啟動流程中的啟動MQClientInstance實例部分,會完成負載均衡服務線程—RebalanceService的啟動(每隔20s執(zhí)行一次)。 通過查看源碼可以發(fā)現,RebalanceService線程的run()方法最終調用的是RebalanceImpl類的rebalanceByTopic()方法,這個方法是實現Consumer端負載均衡的核心。 rebalanceByTopic()方法會根據消費者通信類型為廣播模式還是集群模式做不同的邏輯處理

2.7 RocketMQ消息長輪詢

所謂的長輪詢,就是Consumer拉取消息,如果對應的Queue如果沒有數據,Broker不會立即返回,而是把 PullReuqest hold起來,等待 queue消息后,或者長輪詢阻塞時間到了,再重新處理該 queue 上的所有 PullRequest

在這里插入圖片描述

PullMessageProcessor#processRequest

 //如果沒有拉到數據
case ResponseCode.PULL_NOT_FOUND:
// broker 和 consumer 都允許 suspend,默認開啟
if (brokerAllowSuspend && hasSuspendFlag) {
    long pollingTimeMills = suspendTimeoutMillisLong;
    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
          pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
     }
    String topic = requestHeader.getTopic();
    long offset = requestHeader.getQueueOffset();
    int queueId = requestHeader.getQueueId();
    //封裝一個PullRequest
    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
   //把PullRequest掛起來
   this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
     response = null;
     break;
}

掛起的請求,有一個服務線程會不停地檢查,看queue中是否有數據,或者超時。

PullRequestHoldService#run()

@Override
public void run() {
   log.info("{} service started", this.getServiceName());
   while (!this.isStopped()) {
       try {
          if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                 this.waitForRunning(5 * 1000);
          } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }
         long beginLockTimestamp = this.systemClock.now();
         //檢查hold住的請求
         this.checkHoldRequest();
         long costTime = this.systemClock.now() - beginLockTimestamp;
         if (costTime > 5 * 1000) {
              log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
         }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
	log.info("{} service end", this.getServiceName());
    }

到此這篇關于深入講解RocketMQ原理的文章就介紹到這了,更多相關RocketMQ原理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Springboot mybais配置多數據源過程解析

    Springboot mybais配置多數據源過程解析

    這篇文章主要介紹了Springboot+mybais配置多數據源過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-03-03
  • Spring?@EventListener?異步中使用condition的問題及處理

    Spring?@EventListener?異步中使用condition的問題及處理

    這篇文章主要介紹了Spring?@EventListener?異步中使用condition的問題及處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • SpringBoot如何接收前端傳遞參數

    SpringBoot如何接收前端傳遞參數

    這篇文章主要介紹了SpringBoot如何接收前端傳遞參數,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2024-08-08
  • Java中File、Base64、MultipartFile之間相互轉換的代碼詳解

    Java中File、Base64、MultipartFile之間相互轉換的代碼詳解

    File、Base64和MultipartFile都是在編程中常用的類或者數據類型,用于處理文件和數據的存儲、傳輸和轉換等操作,本文將給大家介紹了Java中File、Base64、MultipartFile之間相互轉換,文中有詳細的代碼示例供大家參考,需要的朋友可以參考下
    2024-04-04
  • javaWeb使用servlet搭建服務器入門

    javaWeb使用servlet搭建服務器入門

    這篇文章主要為大家詳細介紹了javaWeb使用servlet搭建服務器入門,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-11-11
  • springboot 如何解決static調用service為null

    springboot 如何解決static調用service為null

    這篇文章主要介紹了springboot 如何解決static調用service為null的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • Java設計模式之代理模式詳解

    Java設計模式之代理模式詳解

    這篇文章主要介紹了Java設計模式之代理模式詳解,文中有非常詳細的代碼示例,對正在學習java的小伙伴們有很好的幫助,需要的朋友可以參考下
    2021-05-05
  • Java 處理圖片與base64 編碼的相互轉換的示例

    Java 處理圖片與base64 編碼的相互轉換的示例

    本篇文章主要介紹了Java 處理圖片與base64 編碼的相互轉換的示例,具有一定的參考價值,有興趣的可以了解一下
    2017-08-08
  • 基于Spring5實現登錄注冊功能

    基于Spring5實現登錄注冊功能

    這篇文章主要為大家詳細介紹了基于Spring5實現登錄注冊功能,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-09-09
  • JAVA實現經典游戲坦克大戰(zhàn)的示例代碼

    JAVA實現經典游戲坦克大戰(zhàn)的示例代碼

    小時候大家都玩過坦克大戰(zhàn)吧,熟悉的旋律和豐富的關卡陪伴了我們一整個寒暑假。本文將通過Java+Swing實現這一經典游戲,感興趣的可以學習一下
    2022-01-01

最新評論