欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中RocketMq的消費方式詳解

 更新時間:2023年10月11日 09:11:19   作者:獵戶星座。  
這篇文章主要介紹了Java中RocketMq的消費方式詳解,RocketMQ的消費方式都是基于拉模式拉取消息的,而在這其中有一種長輪詢機制(對普通輪詢的一種優(yōu)化),來平衡上面Push/Pull模型的各自缺點,需要的朋友可以參考下

 一、如何選擇消息消費的方式—Pull or Push?

1.1 MQ中Pull和Push的兩種消費方式

對于任何一款消息中間件而言,消費者客戶端一般有兩種方式從消息中間件獲取消息并消費:

(1)Push方式:由消息中間件(MQ消息服務器代理)主動地將消息推送給消費者;采用Push方式,可以盡可能實時地將消息發(fā)送給消費者進行消費。但是,在消費者的處理消息的能力較弱的時候(比如,消費者端的業(yè)務系統(tǒng)處理一條消息的流程比較復雜,其中的調(diào)用鏈路比較多導致消費時間比較久。概括起來地說就是“慢消費問題”),而MQ不斷地向消費者Push消息,消費者端的緩沖區(qū)可能會溢出,導致異常;

(2)Pull方式:由消費者客戶端主動向消息中間件(MQ消息服務器代理)拉取消息;采用Pull方式,如何設置Pull消息的頻率需要重點去考慮,舉個例子來說,可能1分鐘內(nèi)連續(xù)來了1000條消息,然后2小時內(nèi)沒有新消息產(chǎn)生(概括起來說就是“消息延遲與忙等待”)。如果每次Pull的時間間隔比較久,會增加消息的延遲,即消息到達消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,但是在一段時間內(nèi)MQ中并沒有任何消息可以消費,那么會產(chǎn)生很多無效的Pull請求的RPC開銷,影響MQ整體的網(wǎng)絡性能;

1.2 RocketMQ消息消費的長輪詢機制

思考題: 上面簡要說明了Push和Pull兩種消息消費方式的概念和各自特點。如果長時間沒有消息,而消費者端又不停的發(fā)送Pull請求不就會導致RocketMQ中Broker端負載很高嗎?那么在RocketMQ中如何解決以做到高效的消息消費呢?

通過研究源碼可知,RocketMQ的消費方式都是基于拉模式拉取消息的,而在這其中有一種長輪詢機制(對普通輪詢的一種優(yōu)化),來平衡上面Push/Pull模型的各自缺點?;驹O計思路是:消費者如果第一次嘗試Pull消息失敗(比如:Broker端沒有可以消費的消息),并不立即給消費者客戶端返回Response的響應,而是先hold住并且掛起請求(將請求保存至pullRequestTable本地緩存變量中),然后Broker端的后臺獨立線程—PullRequestHoldService會從pullRequestTable本地緩存變量中不斷地去取,具體的做法是查詢待拉取消息的偏移量是否小于消費隊列最大偏移量,如果條件成立則說明有新消息達到Broker端(這里,在RocketMQ的Broker端會有一個后臺獨立線程—ReputMessageService不停地構建ConsumeQueue/IndexFile數(shù)據(jù),同時取出hold住的請求并進行二次處理),則通過重新調(diào)用一次業(yè)務處理器—PullMessageProcessor的處理請求方法—processRequest()來重新嘗試拉取消息(此處,每隔5S重試一次,默認長輪詢整體的時間設置為30s)。

RocketMQ消息Pull的長輪詢機制的關鍵在于Broker端的PullRequestHoldService和ReputMessageService兩個后臺線程。對于RocketMQ的長輪詢(LongPolling)消費模式后面會專門詳細介紹。

二、RocketMQ中兩種消費方式的demo代碼

(1)Pull模式的Consumer端代碼如下:

        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("consumer");
        consumer.start();
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            System.out.println(pullResult.getMsgFoundList().get(0).toString());
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    //TODO
                }
            }
        }
        consumer.shutdown();

在示例代碼中,可以看到業(yè)務工程在Consumer啟動后,Consumer主動獲取MessageQueue的Set集合,遍歷該集合中的每一個隊列,發(fā)送Pull的請求(參數(shù)中帶有隊列中的消息偏移量),同時需要Consumer端自己保存消息消費的offset偏移量至本地變量中。

在Pull模式下,需要業(yè)務應用代碼自身去完成比較多的事情,因此在實際應用中用的較少。(2)Push模式的Consumer端代碼如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest111", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setInstanceName("consumer1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

在示例代碼中,業(yè)務工程的應用程序使用Push方式進行消費時,Consumer端注冊了一個監(jiān)聽器,Consumer在收到消息后主動調(diào)用這個監(jiān)聽器完成消費并進行對應的業(yè)務邏輯處理。

由此可見,業(yè)務應用代碼只需要完成消息消費即可,無需參與MQ本身的一些任務處理(ps:業(yè)務代碼顯得更為簡潔一些)。

三、RocketMQ中消費者Push方式的啟動流程

這一節(jié)主要先講下RocketMQ消費者的啟動流程,看下在啟動的時候究竟完成了什么樣的操作。由于RocketMQ的DefaultMQPushConsumer和DefaultMQPullConsumer啟動流程大部分類似,而DefaultMQPushConsumer更為復雜一些,因此這一節(jié)內(nèi)容主要講的是DefaultMQPushConsumer啟動流程。Push方式的Consumer啟動流程的時序圖如下圖所示:

 從上面的時序圖上可以看出,Push方式的Consumer啟動流程完成的任務比較多,主要任務如下:

(1)設置consumerGroup、NameServer服務地址、消費起始偏移地址并根據(jù)參數(shù)Topic構建Consumer端的SubscriptionData(訂閱關系值);

(2)在Consumer端注冊消費者監(jiān)聽器,當消息到來時完成消費消息;

(3)啟動defaultMQPushConsumerImpl實例,主要完成前置校驗、復制訂閱關系(將defaultMQPushConsumer的訂閱關系復制至rebalanceImpl中,包括retryTopic(重試主題)對應的訂閱關系)、創(chuàng)建MQClientInstance實例、設置rebalanceImpl的各個屬性值、pullAPIWrapper包裝類對象的初始化、初始化offsetStore實例并加載消費進度、啟動消息消費服務線程以及在MQClientInstance中注冊consumer等任務;

(4)啟動MQClientInstance實例,其中包括完成客戶端網(wǎng)絡通信線程、拉取消息服務線程、負載均衡服務線程和若干個定時任務的啟動;

(5)向所有的Broker端發(fā)送心跳(采用加鎖方式);

(6)最后,喚醒負載均衡服務線程在Consumer端開始負載均衡;

四、RocketMQ中Pull和Push兩種消費模式流程簡析

RocketMQ提供了兩種消費模式,Push和Pull,大多數(shù)場景使用的是Push模式,在源碼中這兩種模式分別對應的是DefaultMQPushConsumer類和DefaultMQPullConsumer類。Push模式實際上在內(nèi)部還是使用的Pull方式實現(xiàn)的,通過Pull不斷地輪詢Broker獲取消息,當不存在新消息時,Broker端會掛起Pull請求,直到有新消息產(chǎn)生才取消掛起,返回新消息。

(1)RocketMQ的Pull消費模式流程簡析 RocketMQ的Pull模式相對來得簡單,從上面的demo代碼中可以看出,業(yè)務應用代碼通過由Topic獲取到的MessageQueue直接拉取消息(最后真正執(zhí)行的是PullAPIWrapper的pullKernelImpl()方法,通過發(fā)送拉取消息的RPC請求給Broker端)。其中,消息消費的偏移量需要Consumer端自己去維護。

(2)RocketMQ的Push消費模式流程簡析 在本文前面已經(jīng)提到過了,從嚴格意義上說,RocketMQ并沒有實現(xiàn)真正的消息消費的Push模式,而是對Pull模式進行了一定的優(yōu)化,一方面在Consumer端開啟后臺獨立的線程—PullMessageService不斷地從阻塞隊列—pullRequestQueue中獲取PullRequest請求并通過網(wǎng)絡通信模塊發(fā)送Pull消息的RPC請求給Broker端。另外一方面,后臺獨立線程—rebalanceService根據(jù)Topic中消息隊列個數(shù)和當前消費組內(nèi)消費者個數(shù)進行負載均衡,將產(chǎn)生的對應PullRequest實例放入阻塞隊列—pullRequestQueue中。這里算是比較典型的生產(chǎn)者-消費者模型,實現(xiàn)了準實時的自動消息拉取。然后,再根據(jù)業(yè)務反饋是否成功消費來推動消費進度。 在Broker端,PullMessageProcessor業(yè)務處理器收到Pull消息的RPC請求后,通過MessageStore實例從commitLog獲取消息。如1.2節(jié)內(nèi)容所述,如果第一次嘗試Pull消息失?。ū热鏐roker端沒有可以消費的消息),則通過長輪詢機制先hold住并且掛起該請求,然后通過Broker端的后臺線程PullRequestHoldService重新嘗試和后臺線程ReputMessageService的二次處理。

到此這篇關于Java中RocketMq的消費方式詳解的文章就介紹到這了,更多相關RocketMq的消費方式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 解決Springboot項目啟動后自動創(chuàng)建多表關聯(lián)的數(shù)據(jù)庫與表的方案

    解決Springboot項目啟動后自動創(chuàng)建多表關聯(lián)的數(shù)據(jù)庫與表的方案

    這篇文章主要介紹了解決Springboot項目啟動后自動創(chuàng)建多表關聯(lián)的數(shù)據(jù)庫與表的方案,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-03-03
  • Java零基礎講解異常

    Java零基礎講解異常

    異常就是不正常,比如當我們身體出現(xiàn)了異常我們會根據(jù)身體情況選擇喝開水、吃藥、看病、等?異常處理方法。?java異常處理機制是我們java語言使用異常處理機制為程序提供了錯誤處理的能力,程序出現(xiàn)的錯誤,程序可以安全的退出,以保證程序正常的運行等
    2022-04-04
  • 在idea中git實現(xiàn)里查看歷史代碼方式

    在idea中git實現(xiàn)里查看歷史代碼方式

    這篇文章主要介紹了在idea中git里查看歷史代碼的實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-10-10
  • springboot+maven快速構建項目的示例代碼

    springboot+maven快速構建項目的示例代碼

    本篇文章主要介紹了springboot+maven快速構建項目的示例代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • Java使用Jdom讀取xml解析實例

    Java使用Jdom讀取xml解析實例

    這篇文章主要介紹了Java使用Jdom讀取xml解析,以實例形式較為詳細的分析了Jdom操作XML文件實現(xiàn)讀取操作的相關技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-10-10
  • Java替換(新增)JSON串里面的某個節(jié)點操作

    Java替換(新增)JSON串里面的某個節(jié)點操作

    這篇文章主要介紹了Java替換(新增)JSON串里面的某個節(jié)點操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • 詳解如何用SpringBoot 2.3.0.M1創(chuàng)建Docker映像

    詳解如何用SpringBoot 2.3.0.M1創(chuàng)建Docker映像

    這篇文章主要介紹了詳解如何用SpringBoot 2.3.0.M1創(chuàng)建Docker映像,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-05-05
  • Java判斷字符串為空、字符串是否為數(shù)字

    Java判斷字符串為空、字符串是否為數(shù)字

    這篇文章主要介紹了Java判斷字符串為空、字符串是否為數(shù)字,其中數(shù)字的判斷介紹了3種方法,需要的朋友可以參考下
    2014-06-06
  • Mybatis如何自動生成數(shù)據(jù)庫表結構總結

    Mybatis如何自動生成數(shù)據(jù)庫表結構總結

    這篇文章主要給大家介紹了關于Mybatis如何自動生成數(shù)據(jù)庫表結構的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者使用Mybatis具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2019-11-11
  • 解決struts2 攔截器修改request的parameters參數(shù)失敗的問題

    解決struts2 攔截器修改request的parameters參數(shù)失敗的問題

    這篇文章主要介紹了解決struts2 攔截器修改request的parameters參數(shù)失敗的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03

最新評論