欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解

 更新時(shí)間:2023年10月11日 09:22:55   作者:金甲蟲(chóng)Scarb  
這篇文章主要介紹了RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解,RocketMQ是一款高性能、高可靠性的分布式消息中間件,消費(fèi)者是RocketMQ中的重要組成部分,消費(fèi)者負(fù)責(zé)從消息隊(duì)列中獲取消息并進(jìn)行處理,需要的朋友可以參考下

1. 背景

RocketMQ 的消費(fèi)可以算是 RocketMQ 的業(yè)務(wù)邏輯中最復(fù)雜的一塊。這里面涉及到許多消費(fèi)模式和特性。本想一篇文章寫(xiě)完,寫(xiě)到后面發(fā)現(xiàn)消費(fèi)涉及到的內(nèi)容太多,于是決定分多篇來(lái)寫(xiě)。本文作為消費(fèi)系列的第一篇,主要講述 RocketMQ 消費(fèi)涉及到的模式和特性,也會(huì)概括性地講一下消費(fèi)流程。

我將 RocketMQ 的消費(fèi)流程大致分成 4 個(gè)步驟

重平衡消費(fèi)者拉取消息Broker 接收拉取請(qǐng)求后從存儲(chǔ)中查詢消息并返回消費(fèi)者消費(fèi)消息

每個(gè)步驟都會(huì)用一篇文章來(lái)講解。

先了解一下 RocketMQ 消費(fèi)涉及到地概念

2. 概念簡(jiǎn)述

2.1 消費(fèi)組概念與消費(fèi)模式

和大多數(shù)消息隊(duì)列一樣,RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。在了解它們之前,需要先引入消費(fèi)組的概念。

2.1.1 消費(fèi)組

一個(gè)消費(fèi)者實(shí)例即是一個(gè)消費(fèi)者進(jìn)程,負(fù)責(zé)消費(fèi)消息。單個(gè)消費(fèi)者速度有限,在實(shí)際使用中通常會(huì)采用多個(gè)消費(fèi)者共同消費(fèi)同樣的 Topic 以加快消費(fèi)速度。這多個(gè)消費(fèi)同樣 Topic 的消費(fèi)者組成了消費(fèi)者組。

消費(fèi)組是一個(gè)邏輯概念,它包含了多個(gè)同一類(lèi)的消費(fèi)者實(shí)例,通常這些消費(fèi)者都消費(fèi)同一類(lèi)消息(都消費(fèi)相同的 Topic)且消費(fèi)邏輯一致。

消費(fèi)組的引入是用來(lái)在消費(fèi)消息時(shí)更好地進(jìn)行負(fù)載均衡和容錯(cuò)。

2.1.2 廣播消費(fèi)模式(BROADCASTING)

廣播消費(fèi)模式即全部的消息會(huì)廣播分發(fā)到所有的消費(fèi)者實(shí)例,每個(gè)消費(fèi)者實(shí)例會(huì)收到全量的消息(即便消費(fèi)組中有多個(gè)消費(fèi)者都訂閱同一 Topic)。

如下圖所示,生產(chǎn)者發(fā)送了 5 條消息,每個(gè)消費(fèi)組中的消費(fèi)者都收到全部的 5 條消息。

廣播模式使用較少,適合各個(gè)消費(fèi)者都需要通知的場(chǎng)景,如刷新應(yīng)用中的緩存。

廣播消費(fèi)模式

注意事項(xiàng):

  1. 廣播消費(fèi)模式下不支持 順序消息。
  2. 廣播消費(fèi)模式下不支持 重置消費(fèi)位點(diǎn)。
  3. 每條消息都需要被相同訂閱邏輯的多臺(tái)機(jī)器處理。
  4. 消費(fèi)進(jìn)度在客戶端維護(hù),出現(xiàn)重復(fù)消費(fèi)的概率稍大于集群模式。如果消費(fèi)進(jìn)度文件丟失,存在消息丟失的可能。
  5. 廣播模式下,消息隊(duì)列 RocketMQ 版保證每條消息至少被每臺(tái)客戶端消費(fèi)一次,但是并不會(huì)重投消費(fèi)失敗的消息,因此業(yè)務(wù)方需要關(guān)注消費(fèi)失敗的情況。
  6. 廣播模式下,客戶端每一次重啟都會(huì)從最新消息消費(fèi)。客戶端在被停止期間發(fā)送至服務(wù)端的消息將會(huì)被自動(dòng)跳過(guò),請(qǐng)謹(jǐn)慎選擇。
  7. 廣播模式下,每條消息都會(huì)被大量的客戶端重復(fù)處理,因此推薦盡可能使用集群模式。
  8. 廣播模式下服務(wù)端不維護(hù)消費(fèi)進(jìn)度,所以消息隊(duì)列 RocketMQ 版控制臺(tái)不支持消息堆積查詢、消息堆積報(bào)警和訂閱關(guān)系查詢功能。

2.1.3 集群消費(fèi)模式(CLUSTERING)

集群消費(fèi)模式下,同一 Topic 下的一條消息只會(huì)被同一消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi)。也就是說(shuō),消息被負(fù)載均衡到了同一個(gè)消費(fèi)組的多個(gè)消費(fèi)者實(shí)例上。

更具體一點(diǎn),在同一消費(fèi)組中的不同消費(fèi)者會(huì)根據(jù)負(fù)載機(jī)制來(lái)平均地訂閱 Topic 中的每個(gè) Queue。(默認(rèn) AVG 負(fù)載方式)

廣播消費(fèi)模式

RocketMQ 默認(rèn)使用集群消費(fèi)模式,這也是大部分場(chǎng)景下會(huì)使用到的消費(fèi)模式。

2.2 消費(fèi)者拉取消息模式

2.2.1 Pull

指消費(fèi)者主動(dòng)拉取消息進(jìn)行消費(fèi),主動(dòng)從 Broker 拉取消息,主動(dòng)權(quán)由消費(fèi)者應(yīng)用控制。

2.2.2 Push

Broker 主動(dòng)將消息 Push 給消費(fèi)者,Broker 收到消息就會(huì)主動(dòng)推送到消費(fèi)者端。該模式的消費(fèi)實(shí)時(shí)性較高,也是主流場(chǎng)景中普遍采用的消費(fèi)形式。

消費(fèi)者組中的消費(fèi)者實(shí)例會(huì)根據(jù)預(yù)設(shè)的負(fù)載均衡算法對(duì) Topic 中的 Queue 進(jìn)行均勻的訂閱,每個(gè) Queue 最多只能被一個(gè)消費(fèi)者訂閱。

在 RocketMQ 中,Push 消費(fèi)其實(shí)也是由 Pull 消費(fèi)(拉?。?shí)現(xiàn)。Push 消費(fèi)只是通過(guò)客戶端 API 層面的封裝讓用戶感覺(jué)像是 Broker 在推送消息給消費(fèi)者。

2.2.3 POP

RocketMQ 5.0 引入的新消費(fèi)形式,是 Pull 拉取的另一種實(shí)現(xiàn)。也可以在 Push 模式下使用 POP 拉取消息,甚至可以和 Push 模式共同使用(分別消費(fèi)重試 Topic 和普通 Topic)。

POP 與 Pull 可以通過(guò)一個(gè)開(kāi)關(guān)實(shí)時(shí)進(jìn)行切換。POP 模式下,Broker 來(lái)控制每個(gè)消費(fèi)者消費(fèi)的隊(duì)列和拉取的消息,把重平衡邏輯從客戶端移到了服務(wù)端。

主要解決了原來(lái) Push 模式消費(fèi)的以下痛點(diǎn):

富客戶端:客戶端邏輯比較重,多語(yǔ)言支持不友好隊(duì)列獨(dú)占:Topic 中的一個(gè) Queue 最多只能被 1 個(gè) Push 消費(fèi)者消費(fèi),消費(fèi)者數(shù)量無(wú)法無(wú)限擴(kuò)展。且消費(fèi)者 hang 住時(shí)該隊(duì)列的消息會(huì)堆積。消費(fèi)后更新 offset:本地消費(fèi)成功才會(huì)提交 offset

RocketMQ 5.0 的輕量化 gRPC 客戶端就是基于 POP 消費(fèi)模式開(kāi)發(fā)

2.3 隊(duì)列負(fù)載機(jī)制與重平衡

在集群消費(fèi)模式下,消費(fèi)組中的消費(fèi)者共同消費(fèi)訂閱的 Topic 中的所有消息,這里就存在 Topic 中的隊(duì)列如何分配給消費(fèi)者的問(wèn)題。

2.3.1 隊(duì)列負(fù)載機(jī)制

RocketMQ Broker 中的隊(duì)列負(fù)載機(jī)制將一個(gè) Topic 的不同隊(duì)列按照算法盡可能平均地分配給消費(fèi)者組中的所有消費(fèi)者。RocketMQ 預(yù)設(shè)了多種負(fù)載算法供不同場(chǎng)景下的消費(fèi)。

AVG:將隊(duì)列按數(shù)量平均分配給多個(gè)消費(fèi)者,按 Broker 順序先分配第一個(gè) Broker 的所有隊(duì)列給第一個(gè)消費(fèi)者,然后給第二個(gè)。

AVG_BY_CIRCLE:將 Broker 上的隊(duì)列輪流分給不同消費(fèi)者,更適用于 Topic 在不同 Broker 之間分布不均勻的情況。

默認(rèn)采用 AVG 負(fù)載方式。

2.3.2 重平衡(Rebalance)

為消費(fèi)者分配隊(duì)列消費(fèi)的這一個(gè)負(fù)載過(guò)程并不是一勞永逸的,比如當(dāng)消費(fèi)者數(shù)量變化、Broker 掉線等情況發(fā)生后,原先的負(fù)載就變得不再均衡,此時(shí)就需要重新進(jìn)行負(fù)載均衡,這一過(guò)程被稱為重平衡機(jī)制。

每隔 20s,RocketMQ 會(huì)進(jìn)行一次檢查,檢查隊(duì)列數(shù)量、消費(fèi)者數(shù)量是否發(fā)生變化,如果變化則觸發(fā)消費(fèi)隊(duì)列重平衡,重新執(zhí)行上述負(fù)載算法。

2.4 消費(fèi)端高可靠

2.4.1 重試-死信機(jī)制

在實(shí)際使用中,消息的消費(fèi)可能出現(xiàn)失敗。RocketMQ 擁有重試機(jī)制和死信機(jī)制來(lái)保證消息消費(fèi)的可靠性。

正常消費(fèi):消費(fèi)成功則提交消費(fèi)位點(diǎn)

重試機(jī)制:如果正常消費(fèi)失敗,消息會(huì)被消費(fèi)者發(fā)回 Broker,放入重試 Topic: %RETRY%消費(fèi)者組 。最多重試消費(fèi) 16 次,重試的時(shí)間間隔逐漸變長(zhǎng)。(消費(fèi)者組會(huì)自動(dòng)訂閱重試 Topic)。

這里地延遲重試采用了 RocketMQ 的延遲消息,重試的 16 次時(shí)間間隔為延遲消息配置的每個(gè)延遲等級(jí)的時(shí)間(從第三個(gè)等級(jí)開(kāi)始)。如果修改延遲等級(jí)時(shí)間的配置,重試的時(shí)間間隔也會(huì)相應(yīng)發(fā)生變化。但即便延遲等級(jí)時(shí)間間隔配置不足 16 個(gè),仍會(huì)重試 16 次,后面按照最大的時(shí)間間隔來(lái)重試。

死信機(jī)制:如果正常消費(fèi)和重試 16 次均失敗,消息會(huì)保存到死信 Topic %DLQ%消費(fèi)者組 中,此時(shí)需人工介入處理

2.4.2 隊(duì)列負(fù)載機(jī)制與重平衡

當(dāng)發(fā)生 Broker 掛掉或者消費(fèi)者掛掉時(shí),會(huì)引發(fā)重平衡,可以自動(dòng)感知有組件掛掉的情況并重新調(diào)整消費(fèi)者的訂閱關(guān)系。

2.5 并發(fā)消費(fèi)與順序消費(fèi)

在消費(fèi)者客戶端消費(fèi)時(shí),有兩種訂閱消息的方式,分別是并發(fā)消費(fèi)和順序消費(fèi)。廣播模式不支持順序消費(fèi),僅有集群模式能使用順序消費(fèi)。

需要注意的是,這里所說(shuō)的順序消費(fèi)指的是隊(duì)列維度的順序,即在消費(fèi)一個(gè)隊(duì)列時(shí),消費(fèi)消息的順序和消息發(fā)送的順序一致。如果一個(gè) Topic 有多個(gè)隊(duì)列, 是不可能達(dá)成 Topic 級(jí)別的順序消費(fèi)的,因?yàn)闊o(wú)法控制哪個(gè)隊(duì)列的消息被先消費(fèi)。Topic 只有一個(gè)隊(duì)列的情況下能夠?qū)崿F(xiàn) Topic 級(jí)別的順序消費(fèi)。

具體順序生產(chǎn)和消費(fèi)代碼見(jiàn) 官方文檔。

順序生產(chǎn)的方式為串行生產(chǎn),并在生產(chǎn)時(shí)指定隊(duì)列。

并發(fā)消費(fèi)的方式是調(diào)用消費(fèi)者的指定 MessageListenerConcurrently 作為消費(fèi)的回調(diào)類(lèi),順序消費(fèi)則使用 MessageListenerOrderly 類(lèi)進(jìn)行回調(diào)。處理這兩種消費(fèi)方式的消費(fèi)服務(wù)也不同,分別是 ConsumeMessageConcurrentlyService ConsumeMessageOrderlyService 。

順序消費(fèi)的大致原理是依靠?jī)山M鎖,一組在 Broker 端(Broker 鎖),鎖定隊(duì)列和消費(fèi)者的關(guān)系,保證同一時(shí)間只有一個(gè)消費(fèi)者在消費(fèi);在消費(fèi)者端也有一組鎖(消費(fèi)隊(duì)列鎖)以保證消費(fèi)的順序性。

2.6 消費(fèi)進(jìn)度保存和提交

消費(fèi)者消費(fèi)一批消息完成之后,需要保存消費(fèi)進(jìn)度。如果是集群消費(fèi)模式,還需要將消費(fèi)進(jìn)度讓其他消費(fèi)者知道,所以需要提交消費(fèi)進(jìn)度。這樣在消費(fèi)者重啟或隊(duì)列重平衡時(shí)可以根據(jù)消費(fèi)進(jìn)度繼續(xù)消費(fèi)。

不同模式下消費(fèi)進(jìn)度保存方式的不同:

  • 廣播模式:保存在消費(fèi)者本地。因?yàn)槊總€(gè)消費(fèi)者都需要消費(fèi)全量消息消息。在 LocalfileOffsetStore 當(dāng)中。
  • 集群模式:保存在 Broker,同時(shí)消費(fèi)者端緩存。因?yàn)橐粋€(gè) Topic 的消息只要被消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)即可,所以消息的消費(fèi)進(jìn)度需要統(tǒng)一保存。通過(guò) RemoteBrokerOffsetStore 存儲(chǔ)。

集群模式下,消費(fèi)者端有定時(shí)任務(wù),定時(shí)將內(nèi)存中的消費(fèi)進(jìn)度提交到 Broker,Broker 也有定時(shí)任務(wù)將內(nèi)存中的消費(fèi)偏移量持久化到磁盤(pán)。此外,消費(fèi)者向 Broker 拉取消息時(shí)也會(huì)提交消費(fèi)偏移量。注意,消費(fèi)者線程池提交的偏移量是線程池消費(fèi)的這一批消息中偏移量最小的消息的偏移量。

  1. 消費(fèi)完一批消息后將消息消費(fèi)進(jìn)度存在本地內(nèi)存
  2. 消費(fèi)者中有一個(gè)定時(shí)線程,每 5s 將內(nèi)存中所有隊(duì)列的消費(fèi)偏移量提交到 Broker
  3. Broker 收到消費(fèi)進(jìn)度先緩存到內(nèi)存,有一個(gè)定時(shí)任務(wù)每隔 5s 將消息偏移量持久化到磁盤(pán)
  4. 消費(fèi)者向 Broker 拉取消息時(shí)也會(huì)將隊(duì)列的消息偏移量提交到 Broker

3. 消費(fèi)流程

這張圖是阿里云的文章講解消費(fèi)時(shí)用到的,能夠清晰地表示客戶端 Push 模式并發(fā)消費(fèi)流程。

img

從左上角第一個(gè)方框開(kāi)始看

  1. 消費(fèi)者啟動(dòng)時(shí)喚醒重平衡服務(wù) RebalanceService,重平衡服務(wù)是客戶端開(kāi)始消費(fèi)的起點(diǎn)。
  2. 重平衡服務(wù)會(huì)周期性(每 20s)執(zhí)行重平衡方法 doRebalance),查詢所有注冊(cè)的 Broker,根據(jù)注冊(cè)的 Broker 數(shù)量為自身分配負(fù)載的隊(duì)列 rebalanceByTopic()
  3. 分配完隊(duì)列后,會(huì)為每個(gè)分配到的新隊(duì)列創(chuàng)建一個(gè)消息拉取請(qǐng)求 pullRequest,這個(gè)拉取請(qǐng)求中保存一個(gè)處理隊(duì)列 processQueue,即圖中的紅黑樹(shù)(TreeMap),用來(lái)保存拉取到的消息。紅黑樹(shù)保存消息的順序。
  4. 消息拉取線程應(yīng)用生產(chǎn)-消費(fèi)模式,用一個(gè)線程從拉取請(qǐng)求隊(duì)列 pullRequestQueue 中彈出拉取請(qǐng)求,執(zhí)行拉取任務(wù),將拉取到的消息放入處理隊(duì)列。
  5. 拉取請(qǐng)求在一次拉取消息完成之后會(huì)復(fù)用,重新被放入拉取請(qǐng)求隊(duì)列 pullRequestQueue 中
  6. 拉取完成后,在 NettyClientPublicExecutorThreadPool 線程池異步處理結(jié)果,將拉取到的消息放入處理隊(duì)列,然后調(diào)用 consumeMessageService.submitConsumeRequest,將處理隊(duì)列和 多個(gè)消費(fèi)任務(wù)提交到消
  7. 費(fèi)線程池。每個(gè)消費(fèi)任務(wù)消費(fèi) 1 批消息(1 批默認(rèn)為 1 條)
  8. 每個(gè)消費(fèi)者都有一個(gè)消費(fèi)線程池 consumeMessageThreadPool ,默認(rèn)有 20 個(gè)消費(fèi)線程。
  9. 消費(fèi)線程池的每個(gè)消費(fèi)線程會(huì)嘗試從消費(fèi)任務(wù)隊(duì)列中獲取消費(fèi)請(qǐng)求,執(zhí)行消費(fèi)業(yè)務(wù)邏輯 listener.consumeMessage。
  10. 消費(fèi)完成后,如果消費(fèi)成功,則更新偏移量 updateOffset(先更新到內(nèi)存 offsetTable,定時(shí)上報(bào)到 Broker。Broker 端也先放到內(nèi)存,定時(shí)刷盤(pán))。

到此這篇關(guān)于RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解的文章就介紹到這了,更多相關(guān)RocketMQ中的消費(fèi)者內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring Boot集成MyBatis實(shí)現(xiàn)通用Mapper的配置及使用

    Spring Boot集成MyBatis實(shí)現(xiàn)通用Mapper的配置及使用

    關(guān)于MyBatis,大部分人都很熟悉。MyBatis 是一款優(yōu)秀的持久層框架,它支持定制化 SQL、存儲(chǔ)過(guò)程以及高級(jí)映射。這篇文章主要介紹了Spring Boot集成MyBatis實(shí)現(xiàn)通用Mapper,需要的朋友可以參考下
    2018-08-08
  • Java?Spring中Bean的作用域及生命周期

    Java?Spring中Bean的作用域及生命周期

    這篇文章主要介紹了Java?Spring中Bean的作用域及生命周期,Bean的作用域默認(rèn)是單例模式的,也就是說(shuō)所有?的使?的都是同?個(gè)對(duì)象,更多相關(guān)內(nèi)容需要的朋友可以參考一下
    2022-08-08
  • java中httpclient封裝post請(qǐng)求和get的請(qǐng)求實(shí)例

    java中httpclient封裝post請(qǐng)求和get的請(qǐng)求實(shí)例

    這篇文章主要介紹了java中httpclient封裝post請(qǐng)求和get的請(qǐng)求實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • 詳解如何解析pom文件方法示例

    詳解如何解析pom文件方法示例

    這篇文章主要為大家介紹了詳解如何解析pom文件方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • SpringBoot項(xiàng)目調(diào)優(yōu)及垃圾回收器的比較詳解

    SpringBoot項(xiàng)目調(diào)優(yōu)及垃圾回收器的比較詳解

    這篇文章主要介紹了SpringBoot項(xiàng)目調(diào)優(yōu)及垃圾回收器的比較詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-04-04
  • Java多線程與優(yōu)先級(jí)詳細(xì)解讀

    Java多線程與優(yōu)先級(jí)詳細(xì)解讀

    這篇文章主要給大家介紹了關(guān)于Java中方法使用的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-08-08
  • 解讀RedisTemplate的各種操作(set、hash、list、string)

    解讀RedisTemplate的各種操作(set、hash、list、string)

    這篇文章主要介紹了解讀RedisTemplate的各種操作(set、hash、list、string),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • java時(shí)間日期使用與查詢代碼詳解

    java時(shí)間日期使用與查詢代碼詳解

    這篇文章主要介紹了java時(shí)間日期使用與查詢代碼詳解,具有一定借鑒價(jià)值,需要的朋友可以參考下。
    2017-11-11
  • IDEA運(yùn)行Java項(xiàng)目報(bào)錯(cuò)java: 錯(cuò)誤: 不支持發(fā)行版本 xx的解決方法

    IDEA運(yùn)行Java項(xiàng)目報(bào)錯(cuò)java: 錯(cuò)誤: 不支持發(fā)行版本 xx的解決方法

    這篇文章主要介紹了IDEA運(yùn)行Java項(xiàng)目報(bào)錯(cuò)java: 錯(cuò)誤: 不支持發(fā)行版本 xx的解決方法,文中有詳細(xì)的解決方案供大家參考,對(duì)大家解決問(wèn)題有一定的幫助,需要的朋友可以參考下
    2025-04-04
  • Java虛擬機(jī)內(nèi)存區(qū)域劃分詳解

    Java虛擬機(jī)內(nèi)存區(qū)域劃分詳解

    這篇文章主要介紹了Java虛擬機(jī)內(nèi)存區(qū)域劃分,本文邏輯清晰,可以幫助我們更好的掌握虛擬機(jī),對(duì)我們學(xué)習(xí)java來(lái)說(shuō)是一種幫助,需要的朋友可以參考下
    2021-04-04

最新評(píng)論