欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ?Push?消費(fèi)模型示例詳解

 更新時間:2022年09月20日 15:23:42   作者:磊叔的技術(shù)博客  
這篇文章主要為大家介紹了RocketMQ?Push?消費(fèi)模型示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

Push 模式是指由 Server 端來控制消息的推送,即當(dāng)有消息到 Server 之后,會將消息主動投遞給 client(Consumer 端)。

使用 DefaultMQPushConsumer 消費(fèi)消息

下面是使用 DefaultMQPushConsumer 消費(fèi)消息的官方示例代碼:

// 初始化consumer,并設(shè)置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup");
// 設(shè)置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//訂閱一個或多個topic,并指定tag過濾條件,這里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注冊回調(diào)接口來處理從Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS 為消費(fèi)成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 啟動Consumer
consumer.start();

這里看到主要是通過 consumer 注冊回調(diào)接口來處理從 Broker 中收到的消息。這種監(jiān)聽回調(diào)的機(jī)制很容易想到是一種觀察者模式或者事件機(jī)制;對于這種 C-S 模型的架構(gòu)來說,如果要做到 Server 在有新消息時立即推送給 Client,那么 Client 和 Server 之間應(yīng)該是有連接存在的,Client 端開放端口來 watch Server 的推送。這里好論證,即可以查看當(dāng)前 Client 端所在進(jìn)程開啟了什么端口即可,通過如下指令查看:

  • 1、先通過 jps 查看 Consumer Client 的進(jìn)程號
?  rocketmq-4.9.4 git:(06f2208a3) jps
10722 Jps
4676 rocketmq-dashboard-1.0.1-SNAPSHOT.jar
1766
4121 BrokerStartup
4009 NamesrvStartup
9419 PushConsumer
9692 RemoteMavenServer36

可以看到 PushConsumer 的進(jìn)程號是 9419

  • 2、通過 lsof 命令查看進(jìn)程端口占用
?  rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 9419| grep LISTEN
?  

這里沒有看到 PushConsumer 有開啟端口。同樣,這里可以看看 Broker 的進(jìn)程端口占用

?  rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 4121| grep LISTEN
java    4121 glmapper  137u    IPv6 0xca1142b0f200067d        0t0                 TCP *:10912 (LISTEN)
java    4121 glmapper  141u    IPv6 0xca1142b0f1fc8cfd        0t0                 TCP *:10911 (LISTEN)
java    4121 glmapper  142u    IPv6 0xca1142b0f1fc935d        0t0                 TCP *:10909 (LISTEN)

所以得到一個初步的結(jié)論是,在 Push 模式下,Consumer Client 并沒有啟動端口來接收 Server 的消息推送。 那么 RocketMQ 是怎么實現(xiàn)的?

基于長輪詢機(jī)制的偽 push 實現(xiàn)

真正的 Push 方式,是 Server 端接收到消息后,主動把消息推送給 Client 端,這種情況一般需要 Client 和 Server 之間建立長連接。通過前面的分析,Client 既然沒有開啟端口用于接收 Server 的信息推送,那么只有一種可能就是 Client 自己去拉了消息,但是這種主動拉消息的方式是對于用戶無感的,從使用上體驗上來看,做到了和 push 一樣的效果;這種機(jī)制就是“長輪詢”。

為啥不用長連接方式,讓 Server 主動 Push 呢?其實很好理解,對于一個提供隊列服務(wù)的 Server 來說,用 Push方式主動推送有兩個問題:

  • 1、會增加 Server 端的工作量,進(jìn)而影響 Server 的性能
  • 2、Client 的處理能力存在差異,Client 的狀態(tài)不受 Server 控制,如果 Client 不能及時處理 Server 推送過來的消息,會造成各種潛在問題

客戶端側(cè)發(fā)起的長輪詢請求

下圖是初始化相關(guān)資源的過程,DefaultMQPushConsumer 是面向用戶使用的 API client 類,內(nèi)部處理實際上是委托給 DefaultMQPushConsumerImpl 來處理的。DefaultMQPushConsumerImpl#start 時,會初始化 MQClientInstance ,MQClientInstance 初始化過程中又會初始化一堆資源,比如請求-響應(yīng)的通道,開啟各種各樣的調(diào)度任務(wù)(定期拉去 NameServerAddress、定期更新 Topic 路由信息、定期清理 Offline狀態(tài)的 Broker、定期發(fā)送心跳給 Broker、定期持久化所有 Consumer Offset等等),開啟 pullMessageService,開啟 rebalance Service 等等。大致的調(diào)用鏈如下圖

下面這個代碼片段是 pullMessageService 的 run 方法(pullMessageService 是 Runnable 子類)

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // 從 pullRequestQueue 中取 pullRequest
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

通過代碼,可以直觀的看起,pullMessageService 會一直從 pullRequestQueue 中取 pullRequest,然后執(zhí)行 pullMessage 請求。實際上 MessageQueue 是和 pullRequest 一一對應(yīng)的 ,pullRequest 全部存儲到該 Consumer 的 pullRequestQueue 隊列里面;消費(fèi)者會不停的從 PullRequest 的隊列里取 request 然后向broker 請求消息。

這里還有一個問題是隊列取出之后什么時候放回去的?在 pullMessage 的回調(diào)方法中,如果正常得到了 broker 的響應(yīng),那么會把 PullRequest放回隊列,相關(guān)代碼可以從 org.apache.rocketmq.client.consumer.PullCallbackonSuccess 方法中得到答案。

服務(wù)端阻塞請求

服務(wù)端處理 pullRequest 請求的是 PullMessageProcessor,當(dāng)沒有消息時,則通過 PullRequestHoldService 將當(dāng)前請求先 hold 住。

case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        // 如果是 LongPolling,則 hold 住
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }
        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }

PullRequestHoldService 中會將所有的 PullRequest 緩存到 pullRequestTable。PullRequestHoldService 也是一個 task,默認(rèn)每次 hold 5s 然后再去檢查是否有新的消息過來,如果有新的消息到來,則喚醒對應(yīng)的線程來將消息返回給客戶端。

// 已省略無關(guān)代碼
public void run() {
    // loop
    while (!this.isStopped()) {
        // default hold 5s
        if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            this.waitForRunning(5 * 1000);
        } else {
            this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
        }
        long beginLockTimestamp = this.systemClock.now();
        // 檢查是否有新的消息到達(dá)
        this.checkHoldRequest();
        long costTime = this.systemClock.now() - beginLockTimestamp;
        if (costTime > 5 * 1000) {
            log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
        }
	}
}

客戶端回調(diào)處理

我們在編寫 consumer 代碼時,基于 push 模式是通過如下方式來監(jiān)聽消息的

//注冊回調(diào)接口來處理從Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS 為消費(fèi)成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

通過前面的分析,對于如何通過“長輪詢”實現(xiàn)偽“push” 有了大概得了解;客戶端通過一個定時任務(wù)不斷向 Broker 發(fā)請求,Broker 在沒有消息時先 hold 住一小段時間,當(dāng)有新的消息時會立即將消息返回給 consumer;本節(jié)就主要探討 consumer 在收到消息之后的處理邏輯,以及是怎么觸發(fā) MessageListener 回調(diào)執(zhí)行的。

客戶端發(fā)起請求的底層邏輯

以異步調(diào)用為例,代碼在

org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync中,截取部分代碼如下:

this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            try {
                PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                assert pullResult != null;
                // 成功回調(diào)
                pullCallback.onSuccess(pullResult);
            } catch (Exception e) {
                // 異?;卣{(diào)
                pullCallback.onException(e);
            }
        } else {
            if (!responseFuture.isSendRequestOK()) {
                 // 異?;卣{(diào)
                pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
            } else if (responseFuture.isTimeout()) {
                 // 異?;卣{(diào)
                pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                    responseFuture.getCause()));
            } else {
                 // 異?;卣{(diào)
                pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
            }
        }
    }
});

PullCallback 回調(diào)

PullCallback 回調(diào)邏輯在 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage方法中,以正常返回消息為例:

// 已省略無關(guān)代碼
public void onSuccess(PullResult pullResult) {
    // 將接收到的消息 交給 consumeMessageService 處理
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispatchToConsume);
    // 將 pullRequest 放回 pullRequestQueue
 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

ConsumeRequest 是一個 Runnable,submitConsumeRequest 就是將返回結(jié)果丟在一個單獨(dú)的線程池中去處理返回結(jié)果的。ConsumeRequest 的 run 方法中,會拿到 messageListener,然后執(zhí)行 consumeMessage 方法。

總結(jié)

到此,關(guān)于 RocketMQ push 消費(fèi)模型基本就探討完了。從實現(xiàn)機(jī)制上來看,push 本質(zhì)上并不是在建立雙向通道的前提下,由 Server 主動推送給 Client 的,而是由 Client 端觸發(fā) pullRequest 請求,以長輪詢的方式“偽裝”的結(jié)果。從代碼上來,RocketMQ 代碼中使用了非常多的異步機(jī)制,如 pullRequestQueue 來解耦發(fā)送請求和等待結(jié)果,各種定時任務(wù)等等。

整體看,PushConsumer 采用了 長輪詢+超時時間+Pull的模式, 這種方式帶來的好處總結(jié)如下

  • 1、減少 Broker 的壓力,避免由于不同 Consumer 消費(fèi)能力導(dǎo)致 Broker 出現(xiàn)問題
  • 2、確保了 Consumer 不會負(fù)載過高,Consumer 在校驗自己的緩存消息沒有超過閾值才會去從 Broker 拉取消息,Broker 不會主動推過來
  • 3、兼顧了消息的即時性,Broker 在沒有消息的時候會先 hold 一小段時間,有消息會立即喚起線程將消息返回給 Consumer
  • 4、Broker 端無效請求的次數(shù)大大降低,Broker 在沒有消息時會掛起 PullRequest,而 Consumer 在未接收到Response 且未超時時,也不會重新發(fā)起 PullRequest

以上就是RocketMQ Push 消費(fèi)模型示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Push 消費(fèi)模型的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java String字符串內(nèi)容實現(xiàn)添加雙引號

    Java String字符串內(nèi)容實現(xiàn)添加雙引號

    這篇文章主要介紹了Java String字符串內(nèi)容實現(xiàn)添加雙引號,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼

    application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼

    這篇文章主要介紹了application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼,具有一定參考價值,需要的朋友可以了解下。
    2017-11-11
  • 解決JD-GUI for mac big sur打不開問題

    解決JD-GUI for mac big sur打不開問題

    這篇文章主要介紹了解決JD-GUI for mac big sur打不開問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • Java序列化常見實現(xiàn)方法代碼實例

    Java序列化常見實現(xiàn)方法代碼實例

    這篇文章主要介紹了Java序列化常見實現(xiàn)方法代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-11-11
  • java直接插入排序示例

    java直接插入排序示例

    這篇文章主要介紹了java直接插入排序示例,插入排序的比較次數(shù)仍然是n的平方,但在一般情況下,它要比冒泡排序快一倍,比選擇排序還要快一點(diǎn)。它常常被用在復(fù)雜排序算法的最后階段,比如快速排序。
    2014-05-05
  • java垃圾收集器與內(nèi)存分配策略詳解

    java垃圾收集器與內(nèi)存分配策略詳解

    本篇文章主要介紹了Java垃圾收集器與內(nèi)存分配策略的方法和原理總結(jié),Java垃圾回收器是Java虛擬機(jī)的重要模塊,具有一定的參考價值,有興趣的可以了解一下
    2021-08-08
  • Kafka的監(jiān)聽地址配置實例詳解

    Kafka的監(jiān)聽地址配置實例詳解

    這篇文章主要給大華介紹了關(guān)于Kafka監(jiān)聽地址配置的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2022-01-01
  • java實現(xiàn)短地址服務(wù)的方法(附代碼)

    java實現(xiàn)短地址服務(wù)的方法(附代碼)

    大多數(shù)情況下URL太長,字符多,不便于發(fā)布復(fù)制和存儲,本文就介紹了通過java實現(xiàn)短地址服務(wù),減少了許多使用太長URL帶來的不便,需要的朋友可以參考下
    2015-07-07
  • 詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法

    詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法

    本篇文章主要介紹了詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-04-04
  • Java之不通過構(gòu)造函數(shù)創(chuàng)建一個對象問題

    Java之不通過構(gòu)造函數(shù)創(chuàng)建一個對象問題

    這篇文章主要介紹了Java之不通過構(gòu)造函數(shù)創(chuàng)建一個對象問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-03-03

最新評論