欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

深入講解RocketMQ原理

 更新時(shí)間:2023年07月04日 11:11:26   作者:愛(ài)吃牛肉的大老虎  
這篇文章主要介紹了詳解SpringBoot整合RocketMQ,RocketMQ作為一款純java、分布式、隊(duì)列模型的開(kāi)源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時(shí)消息、消息回溯等,需要的朋友可以參考下

RocketMQ

1.1 為什么要選RocketMQ

在這里插入圖片描述

總結(jié)一下: 選擇中間件的可以從這些維度來(lái)考慮:可靠性,性能,功能,可運(yùn)維行,可拓展性,社區(qū)活躍度。目前常用的幾個(gè)中間件,ActiveMQ作為“老古董”,市面上用的已經(jīng)不多,其它幾種:

  • RabbitMQ:優(yōu)點(diǎn):輕量,迅捷,容易部署和使用,擁有靈活的路由配置缺點(diǎn):性能和吞吐量不太理想,不易進(jìn)行二次開(kāi)發(fā)
  • RocketMQ:優(yōu)點(diǎn):性能好,高吞吐量,穩(wěn)定可靠,有活躍的中文社區(qū)缺點(diǎn):兼容性上不是太好
  • Kafka:優(yōu)點(diǎn):擁有強(qiáng)大的性能及吞吐量,兼容性很好缺點(diǎn):由于“攢一波再處理”導(dǎo)致延遲比較高

1.2 RocketMQ優(yōu)缺點(diǎn)

RocketMQ優(yōu)點(diǎn):

  • 單機(jī)吞吐量:十萬(wàn)級(jí)
  • 可用性:非常高,分布式架構(gòu)
  • 消息可靠性:經(jīng)過(guò)參數(shù)優(yōu)化配置,消息可以做到0丟失
  • 功能支持:MQ功能較為完善,還是分布式的,擴(kuò)展性好
  • 支持10億級(jí)別的消息堆積,不會(huì)因?yàn)槎逊e導(dǎo)致性能下降
  • 源碼是Java,方便結(jié)合公司自己的業(yè)務(wù)二次開(kāi)發(fā)
  • 天生為金融互聯(lián)網(wǎng)領(lǐng)域而生,對(duì)于可靠性要求很高的場(chǎng)景,尤其是電商里面的訂單扣款,以及業(yè)務(wù)削峰,在大量交易涌入時(shí),后端可能無(wú)法及時(shí)處理的情況
  • RoketMQ在穩(wěn)定性上可能更值得信賴,這些業(yè)務(wù)場(chǎng)景在阿里雙11已經(jīng)經(jīng)歷了多次考驗(yàn)

RocketMQ缺點(diǎn):

  • 支持的客戶端語(yǔ)言不多,目前是Java及c++,其中c++不成熟
  • 沒(méi)有在MQ核心中去實(shí)現(xiàn)JMS等接口,有些系統(tǒng)要遷移需要修改大量代碼

1.3 消息模型

1.3.1 消息隊(duì)列模型

消息隊(duì)列有兩種模型:隊(duì)列模型發(fā)布/訂閱模型

1.隊(duì)列模型

這是最初的一種消息隊(duì)列模型,對(duì)應(yīng)著消息隊(duì)列發(fā)-存-收的模型。生產(chǎn)者往某個(gè)隊(duì)列里面發(fā)送消息,一個(gè)隊(duì)列可以存儲(chǔ)多個(gè)生產(chǎn)者的消息,一個(gè)隊(duì)列也可以有多個(gè)消費(fèi)者,但是消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,也就是說(shuō)每條消息只能被一個(gè)消費(fèi)者消費(fèi)。

在這里插入圖片描述

2.發(fā)布/訂閱模型

如果需要將一份消息數(shù)據(jù)分發(fā)給多個(gè)消費(fèi)者,并且每個(gè)消費(fèi)者都要求收到全量的消息。很顯然,隊(duì)列模型無(wú)法滿足這個(gè)需求。解決的方式就是發(fā)布/訂閱模型。 在發(fā)布 - 訂閱模型中,消息的發(fā)送方稱為發(fā)布者(Publisher),消息的接收方稱為訂閱者(Subscriber),服務(wù)端存放消息的容器稱為主題(Topic)。發(fā)布者將消息發(fā)送到主題中,訂閱者在接收消息之前需要先訂閱主題。“訂閱”在這里既是一個(gè)動(dòng)作,同時(shí)還可以認(rèn)為是主題在消費(fèi)時(shí)的一個(gè)邏輯副本,每份訂閱中,訂閱者都可以接收到主題的所有消息。

在這里插入圖片描述

它和 隊(duì)列模式的異同:生產(chǎn)者就是發(fā)布者,隊(duì)列就是主題,消費(fèi)者就是訂閱者,無(wú)本質(zhì)區(qū)別。唯一的不同點(diǎn)在于:一份消息數(shù)據(jù)是否可以被多次消費(fèi)

1.3.2 RocketMQ消息模型

RocketMQ使用的消息模型是標(biāo)準(zhǔn)的發(fā)布-訂閱模型,在RocketMQ的術(shù)語(yǔ)表中,生產(chǎn)者、消費(fèi)者和主題,與發(fā)布-訂閱模型中的概念是完全一樣的。

1.3.3 RocketMQ中成員

RocketMQ本身的消息是由下面幾部分組成:

在這里插入圖片描述

Message

Message(消息)就是要傳輸?shù)男畔?一條消息必須有一個(gè)主題(Topic),主題可以看做是你的信件要郵寄的地址。 一條消息也可以擁有一個(gè)可選的標(biāo)簽(Tag)和額處的鍵值對(duì),它們可以用于設(shè)置一個(gè)業(yè)務(wù)Key 并在 Broker 上查找此消息以便在開(kāi)發(fā)期間查找問(wèn)題。

Topic

Topic(主題)可以看做消息的歸類,它是消息的第一級(jí)類型。比如一個(gè)電商系統(tǒng)可以分為:交易消息物流消息等,一條消息必須有一個(gè) Topic

Topic 與生產(chǎn)者和消費(fèi)者的關(guān)系非常松散,一個(gè) Topic 可以有0個(gè)1個(gè)、多個(gè)生產(chǎn)者向其發(fā)送消息,一個(gè)生產(chǎn)者也可以同時(shí)向不同的 Topic 發(fā)送消息。 一個(gè) Topic 也可以被 0個(gè)、1個(gè)、多個(gè)消費(fèi)者訂閱。

Tag

Tag(標(biāo)簽)可以看作子主題,它是消息的第二級(jí)類型,用于為用戶提供額外的靈活性。使用標(biāo)簽,同一業(yè)務(wù)模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來(lái)標(biāo)識(shí)。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒(méi)有 Tag

標(biāo)簽有助于保持你的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統(tǒng)提供幫助。

Group

  • Consumer Group :RocketMQ中,訂閱者的概念是通過(guò)消費(fèi)組(Consumer Group)來(lái)體現(xiàn)的。每個(gè)消費(fèi)組都消費(fèi)主題中一份完整的消息,不同消費(fèi)組之間消費(fèi)進(jìn)度彼此不受影響,也就是說(shuō),一條消息被Consumer Group1消費(fèi)過(guò),也會(huì)再給Consumer Group2消費(fèi)。消費(fèi)組中包含多個(gè)消費(fèi)者,同一個(gè)組內(nèi)的消費(fèi)者是競(jìng)爭(zhēng)消費(fèi)的關(guān)系,每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)組內(nèi)的一部分消息。默認(rèn)情況,如果一條消息被消費(fèi)者Consumer1消費(fèi)了,那同組的其他消費(fèi)者就不會(huì)再收到這條消息。
  • Producer Group :生產(chǎn)者組,簡(jiǎn)單來(lái)說(shuō)就是多個(gè)發(fā)送同一類消息的生產(chǎn)者稱之為一個(gè)生產(chǎn)者組,一群Topic相同的Producer 

Message Queue

Message Queue(消息隊(duì)列),一個(gè) Topic 下可以設(shè)置多個(gè)消息隊(duì)列,Topic 包括多個(gè) Message Queue ,如果一個(gè) Consumer 需要獲取 Topic下所有的消息,就要遍歷所有的 Message Queue。

RocketMQ還有一些其它的Queue——例如ConsumerQueue

Offset

Topic的消費(fèi)過(guò)程中,由于消息需要被不同的組進(jìn)行多次消費(fèi),所以消費(fèi)完的消息并不會(huì)立即被刪除,這就需要RocketMQ為每個(gè)消費(fèi)組在每個(gè)隊(duì)列上維護(hù)一個(gè)消費(fèi)位置(Consumer Offset),這個(gè)位置之前的消息都被消費(fèi)過(guò),之后的消息都沒(méi)有被消費(fèi)過(guò),每成功消費(fèi)一條消息,消費(fèi)位置就加一

也可以這么說(shuō),Queue 是一個(gè)長(zhǎng)度無(wú)限的數(shù)組,Offset 就是下標(biāo)。

總結(jié)圖示

RocketMQ的消息模型中,這些就是比較關(guān)鍵的概念了 畫(huà)張圖總結(jié)一下

在這里插入圖片描述

1.4 消息的消費(fèi)模式

消息消費(fèi)模式有兩種:Clustering(集群消費(fèi))和Broadcasting(廣播消費(fèi))

在這里插入圖片描述

默認(rèn)情況下就是集群消費(fèi),這種模式下一個(gè)消費(fèi)者組共同消費(fèi)一個(gè)主題的多個(gè)隊(duì)列,一個(gè)隊(duì)列只會(huì)被一個(gè)消費(fèi)者消費(fèi),如果某個(gè)消費(fèi)者掛掉,分組內(nèi)其它消費(fèi)者會(huì)接替掛掉的消費(fèi)者繼續(xù)消費(fèi)。 而廣播消費(fèi)消息會(huì)發(fā)給消費(fèi)者組中的每一個(gè)消費(fèi)者進(jìn)行消費(fèi)。

1.5 RoctetMQ基本架構(gòu)

先看圖,RocketMQ的基本架構(gòu)

在這里插入圖片描述

RocketMQ 一共有四個(gè)部分組成:NameServer,Broker,Producer 生產(chǎn)者Consumer 消費(fèi)者,它們對(duì)應(yīng)了:發(fā)現(xiàn)發(fā)、,為了保證高可用,一般每一部分都是集群部署的

類比一下我們生活的郵政系統(tǒng)—— 郵政系統(tǒng)要正常運(yùn)行,離不開(kāi)下面這四個(gè)角色, 一是發(fā)信者,二 是收信者, 三是負(fù)責(zé)暫存?zhèn)鬏數(shù)泥]局, 四是負(fù)責(zé)協(xié)調(diào)各個(gè)地方郵局的管理機(jī)構(gòu)。對(duì)應(yīng)到 RocketMQ 中,這四個(gè)角色就是 ProducerConsumer、 Broker、NameServer

在這里插入圖片描述

1.5.1 NameServer

NameServer 是一個(gè)無(wú)狀態(tài)的服務(wù)器,角色類似于 Kafka使用的 Zookeeper,但比 Zookeeper 更輕量。

特點(diǎn):

每個(gè) NameServer 結(jié)點(diǎn)之間是相互獨(dú)立,彼此沒(méi)有任何信息交互。 Nameserver 被設(shè)計(jì)成幾乎是無(wú)狀態(tài)的,通過(guò)部署多個(gè)結(jié)點(diǎn)來(lái)標(biāo)識(shí)自己是一個(gè)偽集群,Producer 在發(fā)送消息前從 NameServer中獲取 Topic 的路由信息也就是發(fā)往哪個(gè) Broker,Consumer 也會(huì)定時(shí)從 NameServer 獲取 Topic 的路由信息,Broker 在啟動(dòng)時(shí)會(huì)向 NameServer 注冊(cè),并定時(shí)進(jìn)行心跳連接,且定時(shí)同步維護(hù)的 TopicNameServer 功能主要有兩個(gè):

  • Broker 結(jié)點(diǎn)保持長(zhǎng)連接。
  • 維護(hù) Topic 的路由信息。

1.5.2 Broker

消息存儲(chǔ)和中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)和轉(zhuǎn)發(fā)消息

Broker 內(nèi)部維護(hù)著一個(gè)個(gè) Consumer Queue,用來(lái)存儲(chǔ)消息的索引,真正存儲(chǔ)消息的地方是 CommitLog(日志文件)

單個(gè) Broker 與所有的 Nameserver 保持著長(zhǎng)連接和心跳,并會(huì)定時(shí)將 Topic 信息同步到 NameServer,和 NameServer 的通信底層是通過(guò) Netty 實(shí)現(xiàn)的。

1.5.3 Producer

消息生產(chǎn)者,業(yè)務(wù)端負(fù)責(zé)發(fā)送消息,由用戶自行實(shí)現(xiàn)和分布式部署。

Producer由用戶進(jìn)行分布式部署,消息由Producer通過(guò)多種負(fù)載均衡模式發(fā)送到Broker集群,發(fā)送低延時(shí),支持快速失敗。 RocketMQ 提供了三種方式發(fā)送消息:同步、異步單向

  • 同步發(fā)送:同步發(fā)送指消息發(fā)送方發(fā)出數(shù)據(jù)后會(huì)在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個(gè)數(shù)據(jù)包。一般用于重要通知消息,例如重要通知郵件、營(yíng)銷短信。
  • 異步發(fā)送:異步發(fā)送指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個(gè)數(shù)據(jù)包,一般用于可能鏈路耗時(shí)較長(zhǎng)而對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,例如用戶視頻上傳后通知啟動(dòng)轉(zhuǎn)碼服務(wù)。
  • 單向發(fā)送:?jiǎn)蜗虬l(fā)送是指只負(fù)責(zé)發(fā)送消息而不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā),適用于某些耗時(shí)非常短但對(duì)可靠性要求并不高的場(chǎng)景,例如日志收集

1.5.4 Consumer

消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。

Consumer也由用戶部署,支持PUSHPULL兩種消費(fèi)模式,支持集群消費(fèi)和廣播消費(fèi),提供實(shí)時(shí)的消息訂閱機(jī)制。

  • Pull:拉取型消費(fèi)者(Pull Consumer)主動(dòng)從消息服務(wù)器拉取信息,只要批量拉取到消息,用戶應(yīng)用就會(huì)啟動(dòng)消費(fèi)過(guò)程,所以 Pull 稱為主動(dòng)消費(fèi)型
  • Push:推送型消費(fèi)者(Push Consumer)封裝了消息的拉取、消費(fèi)進(jìn)度和其他的內(nèi)部維護(hù)工作,將消息到達(dá)時(shí)執(zhí)行的回調(diào)接口留給用戶應(yīng)用程序來(lái)實(shí)現(xiàn)。所以 Push 稱為被動(dòng)消費(fèi)類型,但其實(shí)從實(shí)現(xiàn)上看還是從消息服務(wù)器中拉取消息,不同于 Pull 的是 Push 首先要注冊(cè)消費(fèi)監(jiān)聽(tīng)器,當(dāng)監(jiān)聽(tīng)器處觸發(fā)后才開(kāi)始消費(fèi)消息

2 原理

2.1 RocketMQ整體工作流程

簡(jiǎn)單來(lái)說(shuō),RocketMQ是一個(gè)分布式消息隊(duì)列,也就是消息隊(duì)列+分布式系統(tǒng)

作為消息隊(duì)列,它是發(fā)-存-收的一個(gè)模型,對(duì)應(yīng)的就是Producer、Broker、Cosumer;作為分布式系統(tǒng),它要有服務(wù)端、客戶端、注冊(cè)中心,對(duì)應(yīng)的就是Broker、Producer/Consumer、NameServer

所以我們看一下它主要的工作流程:RocketMQNameServer注冊(cè)中心集群、Producer生產(chǎn)者集群、Consumer消費(fèi)者集群和若干BrokerRocketMQ進(jìn)程)組成:

  • Broker在啟動(dòng)的時(shí)候去向所有的NameServer注冊(cè),并保持長(zhǎng)連接,每30s發(fā)送一次心跳
  • Producer在發(fā)送消息的時(shí)候從NameServer獲取Broker服務(wù)器地址,根據(jù)負(fù)載均衡算法選擇一臺(tái)服務(wù)器來(lái)發(fā)送消息
  • Conusmer消費(fèi)消息的時(shí)候同樣從NameServer獲取Broker地址,然后主動(dòng)拉取消息來(lái)消費(fèi)

在這里插入圖片描述

2.2 為什么RocketMQ不使用Zookeeper作為注冊(cè)中心

Kafka我們都知道采用Zookeeper作為注冊(cè)中心——當(dāng)然也開(kāi)始逐漸去ZookeeperRocketMQ不使用Zookeeper其實(shí)主要可能從這幾方面來(lái)考慮:

  • 基于可用性的考慮
    根據(jù)CAP理論,同時(shí)最多只能滿足兩個(gè)點(diǎn),而Zookeeper滿足的是CP,也就是說(shuō)Zookeeper并不能保證服務(wù)的可用性,Zookeeper在進(jìn)行選舉的時(shí)候,整個(gè)選舉的時(shí)間太長(zhǎng),期間整個(gè)集群都處于不可用的狀態(tài),而這對(duì)于一個(gè)注冊(cè)中心來(lái)說(shuō)肯定是不能接受的,作為服務(wù)發(fā)現(xiàn)來(lái)說(shuō)就應(yīng)該是為可用性而設(shè)計(jì)。
  • 基于性能的考慮
    NameServer本身的實(shí)現(xiàn)非常輕量,而且可以通過(guò)增加機(jī)器的方式水平擴(kuò)展,增加集群的抗壓能力,而Zookeeper的寫(xiě)是不可擴(kuò)展的,Zookeeper要解決這個(gè)問(wèn)題只能通過(guò)劃分領(lǐng)域,劃分多個(gè)Zookeeper集群來(lái)解決,首先操作起來(lái)太復(fù)雜,其次這樣還是又違反了CAP中的A的設(shè)計(jì),導(dǎo)致服務(wù)之間是不連通的。
  • 持久化的機(jī)制來(lái)帶的問(wèn)題
    ZooKeeper 的 ZAB 協(xié)議對(duì)每一個(gè)寫(xiě)請(qǐng)求,會(huì)在每個(gè)ZooKeeper節(jié)點(diǎn)上保持寫(xiě)一個(gè)事務(wù)日志,同時(shí)再加上定期的將內(nèi)存數(shù)據(jù)鏡像(Snapshot)到磁盤(pán)來(lái)保證數(shù)據(jù)的一致性和持久性,而對(duì)于一個(gè)簡(jiǎn)單的服務(wù)發(fā)現(xiàn)的場(chǎng)景來(lái)說(shuō),這其實(shí)沒(méi)有太大的必要,這個(gè)實(shí)現(xiàn)方案太重了。而且本身存儲(chǔ)的數(shù)據(jù)應(yīng)該是高度定制化的。
  • 消息發(fā)送應(yīng)該依賴注冊(cè)中心
    RocketMQ的設(shè)計(jì)理念也正是基于此,生產(chǎn)者在第一次發(fā)送消息的時(shí)候從NameServer獲取到Broker地址后緩存到本地,如果NameServer整個(gè)集群不可用,短時(shí)間內(nèi)對(duì)于生產(chǎn)者和消費(fèi)者并不會(huì)產(chǎn)生太大影響。

2.3 Broker保存數(shù)據(jù)(CommitLog,ConsumeQueue,Indexfile)

RocketMQ主要的存儲(chǔ)文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件

在這里插入圖片描述

在這里插入圖片描述

  • CommitLog:消息主體以及元數(shù)據(jù)的存儲(chǔ)主體,存儲(chǔ)Producer端寫(xiě)入的消息主體內(nèi)容,消息內(nèi)容不是定長(zhǎng)的。單個(gè)文件大小默認(rèn)1G, 文件名長(zhǎng)度為20位,左邊補(bǔ)零,剩余為起始偏移量,比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€(gè)文件寫(xiě)滿了,第二個(gè)文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序?qū)懭肴罩疚募?,?dāng)文件滿了,寫(xiě)入下一個(gè)文件。

CommitLog文件保存于${Rocket_Home}/store/commitlog目錄中,從圖中我們可以明顯看出來(lái)文件名的偏移量,每個(gè)文件默認(rèn)1G,寫(xiě)滿后自動(dòng)生成一個(gè)新的文件。

在這里插入圖片描述

  • ConsumeQueue:消息消費(fèi)隊(duì)列,引入的目的主要是提高消息消費(fèi)的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對(duì)主題進(jìn)行的,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的。 Consumer即可根據(jù)ConsumeQueue來(lái)查找待消費(fèi)的消息。其中,ConsumeQueue(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引,保存了指定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息TagHashCodeConsumeQueue文件可以看成是基于TopicCommitLog索引文件,故ConsumeQueue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu),具體存儲(chǔ)路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName} 同樣ConsumeQueue文件采取定長(zhǎng)設(shè)計(jì),每一個(gè)條目共20個(gè)字節(jié),分別為8字節(jié)的CommitLog物理偏移量、4字節(jié)的消息長(zhǎng)度、8字節(jié)tag hashcode,單個(gè)文件由30W個(gè)條目組成,可以像數(shù)組一樣隨機(jī)訪問(wèn)每一個(gè)條目,每個(gè)ConsumeQueue文件大小約5.72M

在這里插入圖片描述

  • IndexFileIndexFile(索引文件)提供了一種可以通過(guò)key或時(shí)間區(qū)間來(lái)查詢消息的方法。Index文件的存儲(chǔ)位置是:{fileName},文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu),故RocketMQ的索引文件其底層實(shí)現(xiàn)為hash索引

在這里插入圖片描述

總結(jié)一下:RocketMQ采用的是混合型的存儲(chǔ)結(jié)構(gòu),即為Broker單個(gè)實(shí)例下所有的隊(duì)列共用一個(gè)日志數(shù)據(jù)文件(即為CommitLog)來(lái)存儲(chǔ)。

RocketMQ的混合型存儲(chǔ)結(jié)構(gòu)(多個(gè)Topic的消息實(shí)體內(nèi)容都存儲(chǔ)于一個(gè)CommitLog中)針對(duì)ProducerConsumer分別采用了數(shù)據(jù)索引部分相分離的存儲(chǔ)結(jié)構(gòu),Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對(duì)消息刷盤(pán)持久化,保存至CommitLog中。

只要消息被刷盤(pán)持久化至磁盤(pán)文件CommitLog中,那么Producer發(fā)送的消息就不會(huì)丟失。正因?yàn)槿绱耍?code>Consumer也就肯定有機(jī)會(huì)去消費(fèi)這條消息。當(dāng)無(wú)法拉取到消息后,可以等下一次消息拉取,同時(shí)服務(wù)端也支持長(zhǎng)輪詢模式,如果一個(gè)消息拉取請(qǐng)求未拉取到消息,Broker允許等待30s的時(shí)間,只要這段時(shí)間內(nèi)有新消息到達(dá),將直接返回給消費(fèi)端。 這里,RocketMQ的具體做法是,使用Broker端的后臺(tái)服務(wù)線程—ReputMessageService不停地分發(fā)請(qǐng)求并異步構(gòu)建ConsumeQueue(邏輯消費(fèi)隊(duì)列)和IndexFile(索引文件)數(shù)據(jù)。

在這里插入圖片描述

2.4 RocketMQ怎么對(duì)文件進(jìn)行讀寫(xiě)

RocketMQ對(duì)文件的讀寫(xiě)巧妙地利用了操作系統(tǒng)的一些高效文件讀寫(xiě)方式——PageCache、順序讀寫(xiě)、零拷貝

2.4.1 PageCache、順序讀取

RocketMQ中,ConsumeQueue邏輯消費(fèi)隊(duì)列存儲(chǔ)的數(shù)據(jù)較少,并且是順序讀取,在page cache機(jī)制的預(yù)讀取作用下,Consume Queue文件的讀性能幾乎接近讀內(nèi)存,即使在有消息堆積情況下也不會(huì)影響性能。而對(duì)于CommitLog消息存儲(chǔ)的日志數(shù)據(jù)文件來(lái)說(shuō),讀取消息內(nèi)容時(shí)候會(huì)產(chǎn)生較多的隨機(jī)訪問(wèn)讀取,嚴(yán)重影響性能。如果選擇合適的系統(tǒng)IO調(diào)度算法,比如設(shè)置調(diào)度算法為Deadline(此時(shí)塊存儲(chǔ)采用SSD的話),隨機(jī)讀的性能也會(huì)有所提升。

頁(yè)緩存(PageCache)是OS對(duì)文件的緩存,用于加速對(duì)文件的讀寫(xiě)。一般來(lái)說(shuō),程序?qū)ξ募M(jìn)行順序讀寫(xiě)的速度幾乎接近于內(nèi)存的讀寫(xiě)速度,主要原因就是由于OS使用PageCache機(jī)制對(duì)讀寫(xiě)訪問(wèn)操作進(jìn)行了性能優(yōu)化,將一部分的內(nèi)存用作PageCache。對(duì)于數(shù)據(jù)的寫(xiě)入,OS會(huì)先寫(xiě)入至Cache內(nèi),隨后通過(guò)異步的方式由pdflush內(nèi)核線程將Cache內(nèi)的數(shù)據(jù)刷盤(pán)至物理磁盤(pán)上。對(duì)于數(shù)據(jù)的讀取,如果一次讀取文件時(shí)出現(xiàn)未命中PageCache的情況,OS從物理磁盤(pán)上訪問(wèn)讀取文件的同時(shí),會(huì)順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取

2.4.2 零拷貝

RocketMQ主要通過(guò)MappedByteBuffer對(duì)文件進(jìn)行讀寫(xiě)操作。其中,利用了NIO中的FileChannel模型將磁盤(pán)上的物理文件直接映射到用戶態(tài)的內(nèi)存地址中(這種Mmap的方式減少了傳統(tǒng)IO,將磁盤(pán)文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū),和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來(lái)回進(jìn)行拷貝的性能開(kāi)銷),將對(duì)文件的操作轉(zhuǎn)化為直接對(duì)內(nèi)存地址進(jìn)行操作,從而極大地提高了文件的讀寫(xiě)效率(正因?yàn)樾枰褂脙?nèi)存映射機(jī)制,故RocketMQ的文件存儲(chǔ)都使用定長(zhǎng)結(jié)構(gòu)來(lái)存儲(chǔ),方便一次將整個(gè)文件映射至內(nèi)存)。

什么是零拷貝 在操作系統(tǒng)中,使用傳統(tǒng)的方式,數(shù)據(jù)需要經(jīng)歷幾次拷貝,還要經(jīng)歷用戶態(tài)/內(nèi)核態(tài)切換

  • 從磁盤(pán)復(fù)制數(shù)據(jù)到內(nèi)核態(tài)內(nèi)存;
  • 從內(nèi)核態(tài)內(nèi)存復(fù)制到用戶態(tài)內(nèi)存;
  • 然后從用戶態(tài)內(nèi)存復(fù)制到網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存;
  • 最后是從網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存復(fù)制到網(wǎng)卡中進(jìn)行傳輸。

在這里插入圖片描述

所以,可以通過(guò)零拷貝的方式,減少用戶態(tài)與內(nèi)核態(tài)的上下文切換和內(nèi)存拷貝的次數(shù),用來(lái)提升I/O的性能。零拷貝比較常見(jiàn)的實(shí)現(xiàn)方式是mmap,這種機(jī)制在Java中是通過(guò)MappedByteBuffer實(shí)現(xiàn)的。

在這里插入圖片描述

2.5 消息刷盤(pán)怎么實(shí)現(xiàn)

RocketMQ提供了兩種刷盤(pán)策略:同步刷盤(pán)異步刷盤(pán)

  • ???????同步刷盤(pán):在消息達(dá)到Broker的內(nèi)存之后,必須刷到commitLog日志文件中才算成功,然后返回Producer數(shù)據(jù)已經(jīng)發(fā)送成功。
  • 異步刷盤(pán):異步刷盤(pán)是指消息達(dá)到Broker內(nèi)存后就返回Producer數(shù)據(jù)已經(jīng)發(fā)送成功,會(huì)喚醒一個(gè)線程去將數(shù)據(jù)持久化到CommitLog日志文件中

Broker在消息的存取時(shí)直接操作的是內(nèi)存(內(nèi)存映射文件),這可以提供系統(tǒng)的吞吐量,但是無(wú)法避免機(jī)器掉電時(shí)數(shù)據(jù)丟失,所以需要持久化到磁盤(pán)中

刷盤(pán)的最終實(shí)現(xiàn)都是使用NIO中的 MappedByteBuffer.force() 將映射區(qū)的數(shù)據(jù)寫(xiě)入到磁盤(pán),如果是同步刷盤(pán)的話,在Broker把消息寫(xiě)到CommitLog映射區(qū)后,就會(huì)等待寫(xiě)入完成 異步而言,只是喚醒對(duì)應(yīng)的線程,不保證執(zhí)行的時(shí)機(jī),流程如圖所示。

在這里插入圖片描述

2.6 RocketMQ的負(fù)載均衡

RocketMQ中的負(fù)載均衡都在Client端完成,具體來(lái)說(shuō)的話,主要可以分為Producer端發(fā)送消息時(shí)候的負(fù)載均衡和Consumer端訂閱消息的負(fù)載均衡。

2.6.1 Producer的負(fù)載均衡

Producer端在發(fā)送消息的時(shí)候,會(huì)先根據(jù)Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由信息后,RocketMQ的客戶端在默認(rèn)方式下selectOneMessageQueue()方法會(huì)從TopicPublishInfo中的messageQueueList中選擇一個(gè)隊(duì)列(MessageQueue)進(jìn)行發(fā)送消息。具這里有一個(gè)sendLatencyFaultEnable開(kāi)關(guān)變量,如果開(kāi)啟,在隨機(jī)遞增取模的基礎(chǔ)上,再過(guò)濾掉not availableBroker代理。

Producer負(fù)載均衡:索引遞增隨機(jī)取模
public MessageQueue selectOneMessageQueue(){
	//索引遞增
	int index = this.sendWhichQueue.incrementAndGet();
	//利用索引取隨機(jī)數(shù),取余數(shù)
	int pos = Math.abs(index) % this.messageQueueList.size();
	if(pos<0){
		pos=0;	
	}
	return this.messageQueueList.get(pos);
}

所謂的latencyFaultTolerance,是指對(duì)之前失敗的,按一定的時(shí)間做退避。例如,如果上次請(qǐng)求的latency超過(guò)550Lms,就退避3000Lms;超過(guò)1000L,就退避60000L;如果關(guān)閉,采用隨機(jī)遞增取模的方式選擇一個(gè)隊(duì)列(MessageQueue)來(lái)發(fā)送消息,latencyFaultTolerance機(jī)制是實(shí)現(xiàn)消息發(fā)送高可用的核心關(guān)鍵所在。

2.6.2 Consumer的負(fù)載均衡

RocketMQ中,Consumer端的兩種消費(fèi)模式(Push/Pull)都是基于拉模式來(lái)獲取消息的,而在Push模式只是對(duì)pull模式的一種封裝,其本質(zhì)實(shí)現(xiàn)為消息拉取線程在從服務(wù)器拉取到一批消息后,然后提交到消息消費(fèi)線程池后,又“馬不停蹄”的繼續(xù)向服務(wù)器再次嘗試?yán)∠?。如果未拉取到消息,則延遲一下又繼續(xù)拉取。在兩種基于拉模式的消費(fèi)方式(Push/Pull)中,均需要Consumer端知道從Broker端的哪一個(gè)消息隊(duì)列中去獲取消息。因此,有必要在Consumer端來(lái)做負(fù)載均衡,即Broker端中多個(gè)MessageQueue分配給同一個(gè)ConsumerGroup中的哪些Consumer消費(fèi)。

  1. Consumer端的心跳包發(fā)送 在Consumer啟動(dòng)后,它就會(huì)通過(guò)定時(shí)任務(wù)不斷地向RocketMQ集群中的所有Broker實(shí)例發(fā)送心跳包(其中包含了,消息消費(fèi)分組名稱、訂閱關(guān)系集合、消息通信模式和客戶端id的值等信息)。Broker端在收到Consumer的心跳消息后,會(huì)將它維護(hù)在ConsumerManager的本地緩存變量—consumerTable,同時(shí)并將封裝后的客戶端網(wǎng)絡(luò)通道信息保存在本地緩存變量—channelInfoTable中,為之后做Consumer端的負(fù)載均衡提供可以依據(jù)的元數(shù)據(jù)信息。
  2. Consumer端實(shí)現(xiàn)負(fù)載均衡的核心類—RebalanceImplConsumer實(shí)例的啟動(dòng)流程中的啟動(dòng)MQClientInstance實(shí)例部分,會(huì)完成負(fù)載均衡服務(wù)線程—RebalanceService的啟動(dòng)(每隔20s執(zhí)行一次)。 通過(guò)查看源碼可以發(fā)現(xiàn),RebalanceService線程的run()方法最終調(diào)用的是RebalanceImpl類的rebalanceByTopic()方法,這個(gè)方法是實(shí)現(xiàn)Consumer端負(fù)載均衡的核心。 rebalanceByTopic()方法會(huì)根據(jù)消費(fèi)者通信類型為廣播模式還是集群模式做不同的邏輯處理

2.7 RocketMQ消息長(zhǎng)輪詢

所謂的長(zhǎng)輪詢,就是Consumer拉取消息,如果對(duì)應(yīng)的Queue如果沒(méi)有數(shù)據(jù),Broker不會(huì)立即返回,而是把 PullReuqest hold起來(lái),等待 queue消息后,或者長(zhǎng)輪詢阻塞時(shí)間到了,再重新處理該 queue 上的所有 PullRequest

在這里插入圖片描述

PullMessageProcessor#processRequest

 //如果沒(méi)有拉到數(shù)據(jù)
case ResponseCode.PULL_NOT_FOUND:
// broker 和 consumer 都允許 suspend,默認(rèn)開(kāi)啟
if (brokerAllowSuspend && hasSuspendFlag) {
    long pollingTimeMills = suspendTimeoutMillisLong;
    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
          pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
     }
    String topic = requestHeader.getTopic();
    long offset = requestHeader.getQueueOffset();
    int queueId = requestHeader.getQueueId();
    //封裝一個(gè)PullRequest
    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
   //把PullRequest掛起來(lái)
   this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
     response = null;
     break;
}

掛起的請(qǐng)求,有一個(gè)服務(wù)線程會(huì)不停地檢查,看queue中是否有數(shù)據(jù),或者超時(shí)。

PullRequestHoldService#run()

@Override
public void run() {
   log.info("{} service started", this.getServiceName());
   while (!this.isStopped()) {
       try {
          if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                 this.waitForRunning(5 * 1000);
          } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }
         long beginLockTimestamp = this.systemClock.now();
         //檢查hold住的請(qǐng)求
         this.checkHoldRequest();
         long costTime = this.systemClock.now() - beginLockTimestamp;
         if (costTime > 5 * 1000) {
              log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
         }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
	log.info("{} service end", this.getServiceName());
    }

到此這篇關(guān)于深入講解RocketMQ原理的文章就介紹到這了,更多相關(guān)RocketMQ原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Springboot mybais配置多數(shù)據(jù)源過(guò)程解析

    Springboot mybais配置多數(shù)據(jù)源過(guò)程解析

    這篇文章主要介紹了Springboot+mybais配置多數(shù)據(jù)源過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • Spring?@EventListener?異步中使用condition的問(wèn)題及處理

    Spring?@EventListener?異步中使用condition的問(wèn)題及處理

    這篇文章主要介紹了Spring?@EventListener?異步中使用condition的問(wèn)題及處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • SpringBoot如何接收前端傳遞參數(shù)

    SpringBoot如何接收前端傳遞參數(shù)

    這篇文章主要介紹了SpringBoot如何接收前端傳遞參數(shù),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2024-08-08
  • Java中File、Base64、MultipartFile之間相互轉(zhuǎn)換的代碼詳解

    Java中File、Base64、MultipartFile之間相互轉(zhuǎn)換的代碼詳解

    File、Base64和MultipartFile都是在編程中常用的類或者數(shù)據(jù)類型,用于處理文件和數(shù)據(jù)的存儲(chǔ)、傳輸和轉(zhuǎn)換等操作,本文將給大家介紹了Java中File、Base64、MultipartFile之間相互轉(zhuǎn)換,文中有詳細(xì)的代碼示例供大家參考,需要的朋友可以參考下
    2024-04-04
  • javaWeb使用servlet搭建服務(wù)器入門(mén)

    javaWeb使用servlet搭建服務(wù)器入門(mén)

    這篇文章主要為大家詳細(xì)介紹了javaWeb使用servlet搭建服務(wù)器入門(mén),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-11-11
  • springboot 如何解決static調(diào)用service為null

    springboot 如何解決static調(diào)用service為null

    這篇文章主要介紹了springboot 如何解決static調(diào)用service為null的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • Java設(shè)計(jì)模式之代理模式詳解

    Java設(shè)計(jì)模式之代理模式詳解

    這篇文章主要介紹了Java設(shè)計(jì)模式之代理模式詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有很好的幫助,需要的朋友可以參考下
    2021-05-05
  • Java 處理圖片與base64 編碼的相互轉(zhuǎn)換的示例

    Java 處理圖片與base64 編碼的相互轉(zhuǎn)換的示例

    本篇文章主要介紹了Java 處理圖片與base64 編碼的相互轉(zhuǎn)換的示例,具有一定的參考價(jià)值,有興趣的可以了解一下
    2017-08-08
  • 基于Spring5實(shí)現(xiàn)登錄注冊(cè)功能

    基于Spring5實(shí)現(xiàn)登錄注冊(cè)功能

    這篇文章主要為大家詳細(xì)介紹了基于Spring5實(shí)現(xiàn)登錄注冊(cè)功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-09-09
  • JAVA實(shí)現(xiàn)經(jīng)典游戲坦克大戰(zhàn)的示例代碼

    JAVA實(shí)現(xiàn)經(jīng)典游戲坦克大戰(zhàn)的示例代碼

    小時(shí)候大家都玩過(guò)坦克大戰(zhàn)吧,熟悉的旋律和豐富的關(guān)卡陪伴了我們一整個(gè)寒暑假。本文將通過(guò)Java+Swing實(shí)現(xiàn)這一經(jīng)典游戲,感興趣的可以學(xué)習(xí)一下
    2022-01-01

最新評(píng)論