欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

分布式消息隊(duì)列RocketMQ概念詳解

 更新時間:2023年05月09日 14:19:17   作者:山河亦問安  
RocketMQ?是阿里開源的分布式消息中間件,跟其它中間件相比,RocketMQ?的特點(diǎn)是純JAVA實(shí)現(xiàn),是一套提供了消息生產(chǎn),存儲,消費(fèi)全過程API的軟件系統(tǒng),本文詳細(xì)介紹了分布式消息隊(duì)列RocketMQ概念,需要的朋友可以參考下

1.MQ概述

1.1 RocketMQ簡介

RocketMQ 是阿里開源的分布式消息中間件,跟其它中間件相比,RocketMQ 的特點(diǎn)是純JAVA實(shí)現(xiàn),是一套提供了消息生產(chǎn),存儲,消費(fèi)全過程API的軟件系統(tǒng)。

1.2 MQ用途

限流削峰

MQ可以將系統(tǒng)的超量請求暫存其中,以便系統(tǒng)后期可以慢慢進(jìn)行處理,從而避免了請求的丟失或系統(tǒng)被壓垮。

 異步解耦

上游系統(tǒng)對下游系統(tǒng)的調(diào)用若為同步調(diào)用,則會大大降低系統(tǒng)的吞吐量與并發(fā)度,且系統(tǒng)耦合度太高、而異步調(diào)用則會解決這些問題。所以兩層之間若要實(shí)現(xiàn)由同步到異步的轉(zhuǎn)化,一般性做法就是,在這兩層間添加一個MQ層。

 數(shù)據(jù)收集

分布式系統(tǒng)會產(chǎn)生海量級數(shù)據(jù)流,如:業(yè)務(wù)日志、監(jiān)控數(shù)據(jù)、用戶行為等。針對這些數(shù)據(jù)流進(jìn)行實(shí)時或批量采集匯總,然后對這些數(shù)據(jù)流進(jìn)行大數(shù)據(jù)分析,這是當(dāng)前互聯(lián)網(wǎng)平臺的必備技術(shù)。通過MQ完成此類數(shù)據(jù)收集是最好的選擇。

1.3 常見MQ產(chǎn)品

RabbitMQ
RabbitMQ是使用ErLang語言開發(fā)的一款MQ產(chǎn)品。其吞吐量較Kafka與RocketMQ要低,且由于其不是Java語言開發(fā),所以公司內(nèi)部對其實(shí)現(xiàn)定制化開發(fā)難度較大。
Kafka
Kafka是使用Scala/Java語言開發(fā)的一款MQ產(chǎn)品。其最大的特點(diǎn)就是高吞吐量,常用于大數(shù)據(jù)領(lǐng)域的實(shí)時計算、日志采集等場景。其沒有遵循任何常見的MQ協(xié)議,而是使用自研協(xié)議。
RocketMQ
RocketMQ是使用Java語言開發(fā)的一款MQ產(chǎn)品。經(jīng)過數(shù)年阿里雙11的考驗(yàn),性能與穩(wěn)定性非常高。其沒有遵循任何常見的MQ協(xié)議,而是使用自研協(xié)議。

對比

2.RocketMQ 基本概念

2.1 消息

消息是指,消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個主題。單個消息所占空間不會很大。

RocketMQ中每個消息擁有唯一的MessageId,且可以攜帶具有業(yè)務(wù)標(biāo)識的Key,以方便對消息的查詢。不過需要注意的是,MessageId有兩個:在生產(chǎn)者send()消息時會自動生成一個MessageId(msgId),當(dāng)消息到達(dá)Broker后,Broker也會自動生成一個MessageId(offsetMsgId)。msgId、offsetMsgId與key都稱為消息標(biāo)識。 

msgId:由producer端生成,其生成規(guī)則為: producerIp + 進(jìn)程pid + MessageClientIDSetter類的ClassLoader的hashCode + 當(dāng)前時間 + AutomicInteger自增計數(shù)器 
offsetMsgId:由broker端生成,其生成規(guī)則為:brokerIp + 物理分區(qū)的offset(Queue中的偏移量) 
key:由用戶指定的業(yè)務(wù)相關(guān)的唯一標(biāo)識

2.2 主題

Topic表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進(jìn)行消息訂閱的基本單位。 一個生產(chǎn)者可以同時發(fā)送多種Topic的消息;而一個消費(fèi)者只對某種特定的Topic感興趣,即只可以訂閱和消費(fèi)一種Topic的消息。 

2.3 標(biāo)簽

標(biāo)簽為消息設(shè)置的標(biāo)簽,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。 標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。 Topic是消息的一級分類,Tag是消息的二級分類。Topic相當(dāng)于貨物,Tag相當(dāng)于上海山東等地區(qū)。

2.4 隊(duì)列

存儲消息的物理實(shí)體。 一個Topic中可以包含多個Queue,每個Queue中存放的就是該Topic的消息。 一個Topic的Queue也被稱為一個Topic中消息的分區(qū)(Partition)。 一個Topic的Queue中的消息只能被一個消費(fèi)者組中的一個消費(fèi)者消費(fèi)。 一個Queue中的消息不允許同一個消費(fèi)者組中的多個消費(fèi)者同時消費(fèi)。

分片不同于分區(qū)。在RocketMQ中,分片指的是存放相應(yīng)Topic的Broker。每個分片中會創(chuàng)建出相應(yīng)數(shù)量的分區(qū),即Queue,每個Queue的大小都是相同的。

2.5 Producer

消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息。Producer通過MQ的負(fù)載均衡模塊選擇相應(yīng)的Broker集群隊(duì)列進(jìn)行消息投遞,投遞的過程支持快速失敗并且低延遲。  例如:用戶提交的請求寫入到MQ的過程,就是消息生產(chǎn)的過程,在這里用戶就是生產(chǎn)者 。

RocketMQ中的消息生產(chǎn)者都是以生產(chǎn)者組(Producer Group)的形式出現(xiàn)的。生產(chǎn)者組是同一類生產(chǎn)者的集合,這類Producer發(fā)送相同Topic類型的消息。一個生產(chǎn)者組可以同時發(fā)送多個主題的消息。如果主題中有多個隊(duì)列,生產(chǎn)者組只有一個生產(chǎn)者,生產(chǎn)者會采取輪詢的方式進(jìn)行發(fā)送消息。

生產(chǎn)者代碼如下:

導(dǎo)入依賴

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

 生產(chǎn)者代碼

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer order = new DefaultMQProducer("order");
        order.setNamesrvAddr("localhost:9876");
        order.start();
        Message message = new Message("myTopic", "myTag", ("test").getBytes());
        SendResult result = order.send(message);
        System.out.println(result);
        order.shutdown();
    }

2.6 Consumer

消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息。一個消息消費(fèi)者會從Broker服務(wù)器中獲取到消息,并對消息進(jìn)行相關(guān)業(yè)務(wù)處理。  例如:系統(tǒng)從MQ中讀取到請求,并對請求進(jìn)行處理的過程就是消息消費(fèi)的過程,在這里系統(tǒng)就是消費(fèi)者。  

RocketMQ中的消息消費(fèi)者都是以消費(fèi)者組(Consumer Group)的形式出現(xiàn)的。消費(fèi)者組是同一類消費(fèi)者的集合,這類Consumer消費(fèi)的是同一個Topic類型的消息。 消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡(將一個Topic中的不同的Queue平均分配給同一個Consumer Group的不同的Consumer,注意,并不是將消息負(fù)載均衡)和容錯(一個Consmer掛了,該Consumer Group中的其它Consumer可以接著消費(fèi)原Consumer消費(fèi)的Queue)的目標(biāo)變得非常容易。

消費(fèi)者代碼

  public static void main(String[] args) throws MQClientException {
 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("myTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("收到的消息"+list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
 
    }

負(fù)載均衡策略

queue 個數(shù)大于 Consumer個數(shù), 那么 Consumer 會平均分配 queue,不夠平均,會根據(jù)clientId排序來拿取余數(shù)
queue個數(shù)小于Consumer個數(shù),那么會有Consumer閑置,就是浪費(fèi)掉了,其余Consumer平均分配到queue

消費(fèi)者組中Consumer的數(shù)量應(yīng)該小于等于訂閱Topic的Queue數(shù)量。如果超出Queue數(shù)量,則多出的Consumer將不能消費(fèi)消息。

2.7 NameServer

NameServer是一個Broker與Topic路由的注冊中心,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。 
主要包括兩個功能: 
Broker管理:接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù);提供心跳檢測機(jī)制,檢查Broker是否還存活。

路由信息管理:每個NameServer中都保存著Broker集群的整個路由信息和用于客戶端查詢的隊(duì)列信息。Producer和Conumser通過NameServer可以獲取整個Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。NameServer可以獲取整個Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。 

路由注冊 

Name Server既然是注冊中心,那么是如何完成注冊的呢? NameServer通常也是以集群的方式部署,不過,NameServer是無狀態(tài)的,即NameServer集群中的各個節(jié)點(diǎn)間是無差異的,各節(jié)點(diǎn)間相互不進(jìn)行信息通訊。 那各節(jié)點(diǎn)中的數(shù)據(jù)是如何進(jìn)行數(shù)據(jù)同步的呢?在Broker節(jié)點(diǎn)啟動時,輪詢NameServer列表,與每個NameServer節(jié)點(diǎn)建立長連接,發(fā)起注冊請求。在NameServer內(nèi)部維護(hù)著?個Broker列表,用來動態(tài)存儲Broker的信息。  

Broker節(jié)點(diǎn)為了證明自己是活著的,為了維護(hù)與NameServer間的長連接,會將最新的信息以心跳包的方式上報給NameServer,每30秒發(fā)送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會更新心跳時間戳,記錄這個Broker的最新存活時間。 

路由剔除 
由于Broker關(guān)機(jī)、宕機(jī)或網(wǎng)絡(luò)抖動等原因,NameServer沒有收到Broker的心跳,NameServer可能會將其從Broker列表中剔除。 NameServer中有?個定時任務(wù),每隔10秒就會掃描?次Broker表,查看每一個Broker的最新心跳時間戳距離當(dāng)前時間是否超過120秒,如果超過,則會判定Broker失效,然后將其從Broker列表中剔除。 

路由發(fā)現(xiàn) 
RocketMQ的路由發(fā)現(xiàn)采用的是Pull模型。當(dāng)Topic路由信息出現(xiàn)變化時,NameServer不會主動推送給客戶端,而是客戶端定時拉取Topic最新的路由。 默認(rèn)客戶端每30秒會拉取一次最新的路由。

2.8 Broker

Broker充當(dāng)著消息中轉(zhuǎn)角色,負(fù)責(zé)存儲消息、轉(zhuǎn)發(fā)消息。
Broker在RocketMQ系統(tǒng)中負(fù)責(zé)接收并存儲從生產(chǎn)者發(fā)送來的消息,同時為消費(fèi)者的拉取請求作準(zhǔn)備。Broker同時也存儲著消息相關(guān)的元數(shù)據(jù),包括消費(fèi)者組消費(fèi)進(jìn)度偏移offset、主題、隊(duì)列等。

模塊如下圖:

Remoting Module:整個Broker的實(shí)體,負(fù)責(zé)處理來自clients端的請求。而這個Broker實(shí)體則由以下模塊構(gòu)成。

Client Manager:客戶端管理器。負(fù)責(zé)接收、解析客戶端(Producer/Consumer)請求,管理客戶端。例如,維護(hù)Consumer的Topic訂閱信息

Store Service:存儲服務(wù)。提供方便簡單的API接口,處理消息存儲到物理硬盤和消息查詢功能。

HA Service:高可用服務(wù),提供Master Broker 和 Slave Broker之間的數(shù)據(jù)同步功能。

Index Service:索引服務(wù)。根據(jù)特定的Message key,對投遞到Broker的消息進(jìn)行索引服務(wù),同時也提供根據(jù)Message Key對消息進(jìn)行快速查詢的功能。

2.9 RocketMQ 工作流程

工作流程如下圖:

1)啟動NameServer,NameServer啟動后開始監(jiān)聽端口,等待Broker、Producer、Consumer連接。

2)啟動Broker時,Broker會與所有的NameServer建立并保持長連接,然后每50秒向NameServer定時發(fā)送心跳包。

3)發(fā)送消息前,可以先創(chuàng)建Topic,創(chuàng)建Topic時需要指定該Topic要存儲在哪些Broker上,當(dāng)然,在創(chuàng)建Topic時也會將Topic與Broker的關(guān)系寫入到NameServer中。不過,這步是可選的,也可以在發(fā)送消息時自動創(chuàng)建Topic。

4) Producer發(fā)送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取路由信息,即當(dāng)前發(fā)送的Topic消息的Queue與Broker的地址(IP+Port)的映射關(guān)系。然后根據(jù)算法策略從隊(duì)選擇一個Queue,與隊(duì)列所在的Broker建立長連接從而向Broker發(fā)消息。當(dāng)然,在獲取到路由信息后,Producer會首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。

5)Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取其所訂閱Topic的路由信息,然后根據(jù)算法策略從路由信息中獲取到其所要消費(fèi)的Queue,然后直接跟Broker建立長連接,開始消費(fèi)其中的消息。Consumer在獲取到路由信息后,同樣也會每30秒從NameServer更新一次路由信息。不過不同于Producer的是,Consumer還會向Broker發(fā)送心跳,以確保Broker的存活狀態(tài)。

以上就是分布式消息隊(duì)列RocketMQ概念詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ概念的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論