欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中RocketMq的消費(fèi)方式詳解

 更新時間:2023年10月11日 09:11:19   作者:獵戶星座。  
這篇文章主要介紹了Java中RocketMq的消費(fèi)方式詳解,RocketMQ的消費(fèi)方式都是基于拉模式拉取消息的,而在這其中有一種長輪詢機(jī)制(對普通輪詢的一種優(yōu)化),來平衡上面Push/Pull模型的各自缺點(diǎn),需要的朋友可以參考下

 一、如何選擇消息消費(fèi)的方式—Pull or Push?

1.1 MQ中Pull和Push的兩種消費(fèi)方式

對于任何一款消息中間件而言,消費(fèi)者客戶端一般有兩種方式從消息中間件獲取消息并消費(fèi):

(1)Push方式:由消息中間件(MQ消息服務(wù)器代理)主動地將消息推送給消費(fèi)者;采用Push方式,可以盡可能實(shí)時地將消息發(fā)送給消費(fèi)者進(jìn)行消費(fèi)。但是,在消費(fèi)者的處理消息的能力較弱的時候(比如,消費(fèi)者端的業(yè)務(wù)系統(tǒng)處理一條消息的流程比較復(fù)雜,其中的調(diào)用鏈路比較多導(dǎo)致消費(fèi)時間比較久。概括起來地說就是“慢消費(fèi)問題”),而MQ不斷地向消費(fèi)者Push消息,消費(fèi)者端的緩沖區(qū)可能會溢出,導(dǎo)致異常;

(2)Pull方式:由消費(fèi)者客戶端主動向消息中間件(MQ消息服務(wù)器代理)拉取消息;采用Pull方式,如何設(shè)置Pull消息的頻率需要重點(diǎn)去考慮,舉個例子來說,可能1分鐘內(nèi)連續(xù)來了1000條消息,然后2小時內(nèi)沒有新消息產(chǎn)生(概括起來說就是“消息延遲與忙等待”)。如果每次Pull的時間間隔比較久,會增加消息的延遲,即消息到達(dá)消費(fèi)者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,但是在一段時間內(nèi)MQ中并沒有任何消息可以消費(fèi),那么會產(chǎn)生很多無效的Pull請求的RPC開銷,影響MQ整體的網(wǎng)絡(luò)性能;

1.2 RocketMQ消息消費(fèi)的長輪詢機(jī)制

思考題: 上面簡要說明了Push和Pull兩種消息消費(fèi)方式的概念和各自特點(diǎn)。如果長時間沒有消息,而消費(fèi)者端又不停的發(fā)送Pull請求不就會導(dǎo)致RocketMQ中Broker端負(fù)載很高嗎?那么在RocketMQ中如何解決以做到高效的消息消費(fèi)呢?

通過研究源碼可知,RocketMQ的消費(fèi)方式都是基于拉模式拉取消息的,而在這其中有一種長輪詢機(jī)制(對普通輪詢的一種優(yōu)化),來平衡上面Push/Pull模型的各自缺點(diǎn)?;驹O(shè)計思路是:消費(fèi)者如果第一次嘗試Pull消息失?。ū热纾築roker端沒有可以消費(fèi)的消息),并不立即給消費(fèi)者客戶端返回Response的響應(yīng),而是先hold住并且掛起請求(將請求保存至pullRequestTable本地緩存變量中),然后Broker端的后臺獨(dú)立線程—PullRequestHoldService會從pullRequestTable本地緩存變量中不斷地去取,具體的做法是查詢待拉取消息的偏移量是否小于消費(fèi)隊(duì)列最大偏移量,如果條件成立則說明有新消息達(dá)到Broker端(這里,在RocketMQ的Broker端會有一個后臺獨(dú)立線程—ReputMessageService不停地構(gòu)建ConsumeQueue/IndexFile數(shù)據(jù),同時取出hold住的請求并進(jìn)行二次處理),則通過重新調(diào)用一次業(yè)務(wù)處理器—PullMessageProcessor的處理請求方法—processRequest()來重新嘗試?yán)∠ⅲù颂?,每?S重試一次,默認(rèn)長輪詢整體的時間設(shè)置為30s)。

RocketMQ消息Pull的長輪詢機(jī)制的關(guān)鍵在于Broker端的PullRequestHoldService和ReputMessageService兩個后臺線程。對于RocketMQ的長輪詢(LongPolling)消費(fèi)模式后面會專門詳細(xì)介紹。

二、RocketMQ中兩種消費(fèi)方式的demo代碼

(1)Pull模式的Consumer端代碼如下:

        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("consumer");
        consumer.start();
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            System.out.println(pullResult.getMsgFoundList().get(0).toString());
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    //TODO
                }
            }
        }
        consumer.shutdown();

在示例代碼中,可以看到業(yè)務(wù)工程在Consumer啟動后,Consumer主動獲取MessageQueue的Set集合,遍歷該集合中的每一個隊(duì)列,發(fā)送Pull的請求(參數(shù)中帶有隊(duì)列中的消息偏移量),同時需要Consumer端自己保存消息消費(fèi)的offset偏移量至本地變量中。

在Pull模式下,需要業(yè)務(wù)應(yīng)用代碼自身去完成比較多的事情,因此在實(shí)際應(yīng)用中用的較少。(2)Push模式的Consumer端代碼如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest111", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setInstanceName("consumer1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

在示例代碼中,業(yè)務(wù)工程的應(yīng)用程序使用Push方式進(jìn)行消費(fèi)時,Consumer端注冊了一個監(jiān)聽器,Consumer在收到消息后主動調(diào)用這個監(jiān)聽器完成消費(fèi)并進(jìn)行對應(yīng)的業(yè)務(wù)邏輯處理。

由此可見,業(yè)務(wù)應(yīng)用代碼只需要完成消息消費(fèi)即可,無需參與MQ本身的一些任務(wù)處理(ps:業(yè)務(wù)代碼顯得更為簡潔一些)。

三、RocketMQ中消費(fèi)者Push方式的啟動流程

這一節(jié)主要先講下RocketMQ消費(fèi)者的啟動流程,看下在啟動的時候究竟完成了什么樣的操作。由于RocketMQ的DefaultMQPushConsumer和DefaultMQPullConsumer啟動流程大部分類似,而DefaultMQPushConsumer更為復(fù)雜一些,因此這一節(jié)內(nèi)容主要講的是DefaultMQPushConsumer啟動流程。Push方式的Consumer啟動流程的時序圖如下圖所示:

 從上面的時序圖上可以看出,Push方式的Consumer啟動流程完成的任務(wù)比較多,主要任務(wù)如下:

(1)設(shè)置consumerGroup、NameServer服務(wù)地址、消費(fèi)起始偏移地址并根據(jù)參數(shù)Topic構(gòu)建Consumer端的SubscriptionData(訂閱關(guān)系值);

(2)在Consumer端注冊消費(fèi)者監(jiān)聽器,當(dāng)消息到來時完成消費(fèi)消息;

(3)啟動defaultMQPushConsumerImpl實(shí)例,主要完成前置校驗(yàn)、復(fù)制訂閱關(guān)系(將defaultMQPushConsumer的訂閱關(guān)系復(fù)制至rebalanceImpl中,包括retryTopic(重試主題)對應(yīng)的訂閱關(guān)系)、創(chuàng)建MQClientInstance實(shí)例、設(shè)置rebalanceImpl的各個屬性值、pullAPIWrapper包裝類對象的初始化、初始化offsetStore實(shí)例并加載消費(fèi)進(jìn)度、啟動消息消費(fèi)服務(wù)線程以及在MQClientInstance中注冊consumer等任務(wù);

(4)啟動MQClientInstance實(shí)例,其中包括完成客戶端網(wǎng)絡(luò)通信線程、拉取消息服務(wù)線程、負(fù)載均衡服務(wù)線程和若干個定時任務(wù)的啟動;

(5)向所有的Broker端發(fā)送心跳(采用加鎖方式);

(6)最后,喚醒負(fù)載均衡服務(wù)線程在Consumer端開始負(fù)載均衡;

四、RocketMQ中Pull和Push兩種消費(fèi)模式流程簡析

RocketMQ提供了兩種消費(fèi)模式,Push和Pull,大多數(shù)場景使用的是Push模式,在源碼中這兩種模式分別對應(yīng)的是DefaultMQPushConsumer類和DefaultMQPullConsumer類。Push模式實(shí)際上在內(nèi)部還是使用的Pull方式實(shí)現(xiàn)的,通過Pull不斷地輪詢Broker獲取消息,當(dāng)不存在新消息時,Broker端會掛起Pull請求,直到有新消息產(chǎn)生才取消掛起,返回新消息。

(1)RocketMQ的Pull消費(fèi)模式流程簡析 RocketMQ的Pull模式相對來得簡單,從上面的demo代碼中可以看出,業(yè)務(wù)應(yīng)用代碼通過由Topic獲取到的MessageQueue直接拉取消息(最后真正執(zhí)行的是PullAPIWrapper的pullKernelImpl()方法,通過發(fā)送拉取消息的RPC請求給Broker端)。其中,消息消費(fèi)的偏移量需要Consumer端自己去維護(hù)。

(2)RocketMQ的Push消費(fèi)模式流程簡析 在本文前面已經(jīng)提到過了,從嚴(yán)格意義上說,RocketMQ并沒有實(shí)現(xiàn)真正的消息消費(fèi)的Push模式,而是對Pull模式進(jìn)行了一定的優(yōu)化,一方面在Consumer端開啟后臺獨(dú)立的線程—PullMessageService不斷地從阻塞隊(duì)列—pullRequestQueue中獲取PullRequest請求并通過網(wǎng)絡(luò)通信模塊發(fā)送Pull消息的RPC請求給Broker端。另外一方面,后臺獨(dú)立線程—rebalanceService根據(jù)Topic中消息隊(duì)列個數(shù)和當(dāng)前消費(fèi)組內(nèi)消費(fèi)者個數(shù)進(jìn)行負(fù)載均衡,將產(chǎn)生的對應(yīng)PullRequest實(shí)例放入阻塞隊(duì)列—pullRequestQueue中。這里算是比較典型的生產(chǎn)者-消費(fèi)者模型,實(shí)現(xiàn)了準(zhǔn)實(shí)時的自動消息拉取。然后,再根據(jù)業(yè)務(wù)反饋是否成功消費(fèi)來推動消費(fèi)進(jìn)度。 在Broker端,PullMessageProcessor業(yè)務(wù)處理器收到Pull消息的RPC請求后,通過MessageStore實(shí)例從commitLog獲取消息。如1.2節(jié)內(nèi)容所述,如果第一次嘗試Pull消息失?。ū热鏐roker端沒有可以消費(fèi)的消息),則通過長輪詢機(jī)制先hold住并且掛起該請求,然后通過Broker端的后臺線程PullRequestHoldService重新嘗試和后臺線程ReputMessageService的二次處理。

到此這篇關(guān)于Java中RocketMq的消費(fèi)方式詳解的文章就介紹到這了,更多相關(guān)RocketMq的消費(fèi)方式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 解決Springboot項(xiàng)目啟動后自動創(chuàng)建多表關(guān)聯(lián)的數(shù)據(jù)庫與表的方案

    解決Springboot項(xiàng)目啟動后自動創(chuàng)建多表關(guān)聯(lián)的數(shù)據(jù)庫與表的方案

    這篇文章主要介紹了解決Springboot項(xiàng)目啟動后自動創(chuàng)建多表關(guān)聯(lián)的數(shù)據(jù)庫與表的方案,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-03-03
  • Java零基礎(chǔ)講解異常

    Java零基礎(chǔ)講解異常

    異常就是不正常,比如當(dāng)我們身體出現(xiàn)了異常我們會根據(jù)身體情況選擇喝開水、吃藥、看病、等?異常處理方法。?java異常處理機(jī)制是我們java語言使用異常處理機(jī)制為程序提供了錯誤處理的能力,程序出現(xiàn)的錯誤,程序可以安全的退出,以保證程序正常的運(yùn)行等
    2022-04-04
  • 在idea中g(shù)it實(shí)現(xiàn)里查看歷史代碼方式

    在idea中g(shù)it實(shí)現(xiàn)里查看歷史代碼方式

    這篇文章主要介紹了在idea中g(shù)it里查看歷史代碼的實(shí)現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-10-10
  • springboot+maven快速構(gòu)建項(xiàng)目的示例代碼

    springboot+maven快速構(gòu)建項(xiàng)目的示例代碼

    本篇文章主要介紹了springboot+maven快速構(gòu)建項(xiàng)目的示例代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • Java使用Jdom讀取xml解析實(shí)例

    Java使用Jdom讀取xml解析實(shí)例

    這篇文章主要介紹了Java使用Jdom讀取xml解析,以實(shí)例形式較為詳細(xì)的分析了Jdom操作XML文件實(shí)現(xiàn)讀取操作的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-10-10
  • Java替換(新增)JSON串里面的某個節(jié)點(diǎn)操作

    Java替換(新增)JSON串里面的某個節(jié)點(diǎn)操作

    這篇文章主要介紹了Java替換(新增)JSON串里面的某個節(jié)點(diǎn)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • 詳解如何用SpringBoot 2.3.0.M1創(chuàng)建Docker映像

    詳解如何用SpringBoot 2.3.0.M1創(chuàng)建Docker映像

    這篇文章主要介紹了詳解如何用SpringBoot 2.3.0.M1創(chuàng)建Docker映像,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • Java判斷字符串為空、字符串是否為數(shù)字

    Java判斷字符串為空、字符串是否為數(shù)字

    這篇文章主要介紹了Java判斷字符串為空、字符串是否為數(shù)字,其中數(shù)字的判斷介紹了3種方法,需要的朋友可以參考下
    2014-06-06
  • Mybatis如何自動生成數(shù)據(jù)庫表結(jié)構(gòu)總結(jié)

    Mybatis如何自動生成數(shù)據(jù)庫表結(jié)構(gòu)總結(jié)

    這篇文章主要給大家介紹了關(guān)于Mybatis如何自動生成數(shù)據(jù)庫表結(jié)構(gòu)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者使用Mybatis具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • 解決struts2 攔截器修改request的parameters參數(shù)失敗的問題

    解決struts2 攔截器修改request的parameters參數(shù)失敗的問題

    這篇文章主要介紹了解決struts2 攔截器修改request的parameters參數(shù)失敗的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03

最新評論