RocketMQ?Push?消費(fèi)模型示例詳解
Push 模式是指由 Server 端來控制消息的推送,即當(dāng)有消息到 Server 之后,會將消息主動投遞給 client(Consumer 端)。
使用 DefaultMQPushConsumer 消費(fèi)消息
下面是使用 DefaultMQPushConsumer 消費(fèi)消息的官方示例代碼:
// 初始化consumer,并設(shè)置consumer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup"); // 設(shè)置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); //訂閱一個或多個topic,并指定tag過濾條件,這里指定*表示接收所有tag的消息 consumer.subscribe("TopicTest", "*"); //注冊回調(diào)接口來處理從Broker中收到的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS 為消費(fèi)成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動Consumer consumer.start();
這里看到主要是通過 consumer 注冊回調(diào)接口來處理從 Broker 中收到的消息。這種監(jiān)聽回調(diào)的機(jī)制很容易想到是一種觀察者模式或者事件機(jī)制;對于這種 C-S 模型的架構(gòu)來說,如果要做到 Server 在有新消息時立即推送給 Client,那么 Client 和 Server 之間應(yīng)該是有連接存在的,Client 端開放端口來 watch Server 的推送。這里好論證,即可以查看當(dāng)前 Client 端所在進(jìn)程開啟了什么端口即可,通過如下指令查看:
- 1、先通過 jps 查看 Consumer Client 的進(jìn)程號
? rocketmq-4.9.4 git:(06f2208a3) jps 10722 Jps 4676 rocketmq-dashboard-1.0.1-SNAPSHOT.jar 1766 4121 BrokerStartup 4009 NamesrvStartup 9419 PushConsumer 9692 RemoteMavenServer36
可以看到 PushConsumer 的進(jìn)程號是 9419
- 2、通過 lsof 命令查看進(jìn)程端口占用
? rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 9419| grep LISTEN ?
這里沒有看到 PushConsumer 有開啟端口。同樣,這里可以看看 Broker 的進(jìn)程端口占用
? rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 4121| grep LISTEN java 4121 glmapper 137u IPv6 0xca1142b0f200067d 0t0 TCP *:10912 (LISTEN) java 4121 glmapper 141u IPv6 0xca1142b0f1fc8cfd 0t0 TCP *:10911 (LISTEN) java 4121 glmapper 142u IPv6 0xca1142b0f1fc935d 0t0 TCP *:10909 (LISTEN)
所以得到一個初步的結(jié)論是,在 Push 模式下,Consumer Client 并沒有啟動端口來接收 Server 的消息推送。 那么 RocketMQ 是怎么實現(xiàn)的?
基于長輪詢機(jī)制的偽 push 實現(xiàn)
真正的 Push 方式,是 Server 端接收到消息后,主動把消息推送給 Client 端,這種情況一般需要 Client 和 Server 之間建立長連接。通過前面的分析,Client 既然沒有開啟端口用于接收 Server 的信息推送,那么只有一種可能就是 Client 自己去拉了消息,但是這種主動拉消息的方式是對于用戶無感的,從使用上體驗上來看,做到了和 push 一樣的效果;這種機(jī)制就是“長輪詢”。
為啥不用長連接方式,讓 Server 主動 Push 呢?其實很好理解,對于一個提供隊列服務(wù)的 Server 來說,用 Push方式主動推送有兩個問題:
- 1、會增加 Server 端的工作量,進(jìn)而影響 Server 的性能
- 2、Client 的處理能力存在差異,Client 的狀態(tài)不受 Server 控制,如果 Client 不能及時處理 Server 推送過來的消息,會造成各種潛在問題
客戶端側(cè)發(fā)起的長輪詢請求
下圖是初始化相關(guān)資源的過程,DefaultMQPushConsumer 是面向用戶使用的 API client 類,內(nèi)部處理實際上是委托給 DefaultMQPushConsumerImpl 來處理的。DefaultMQPushConsumerImpl#start 時,會初始化 MQClientInstance ,MQClientInstance 初始化過程中又會初始化一堆資源,比如請求-響應(yīng)的通道,開啟各種各樣的調(diào)度任務(wù)(定期拉去 NameServerAddress、定期更新 Topic 路由信息、定期清理 Offline狀態(tài)的 Broker、定期發(fā)送心跳給 Broker、定期持久化所有 Consumer Offset等等),開啟 pullMessageService,開啟 rebalance Service 等等。大致的調(diào)用鏈如下圖
下面這個代碼片段是 pullMessageService 的 run 方法(pullMessageService 是 Runnable 子類)
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 從 pullRequestQueue 中取 pullRequest PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
通過代碼,可以直觀的看起,pullMessageService 會一直從 pullRequestQueue 中取 pullRequest,然后執(zhí)行 pullMessage 請求。實際上 MessageQueue 是和 pullRequest 一一對應(yīng)的 ,pullRequest 全部存儲到該 Consumer 的 pullRequestQueue 隊列里面;消費(fèi)者會不停的從 PullRequest 的隊列里取 request 然后向broker 請求消息。
這里還有一個問題是隊列取出之后什么時候放回去的?在 pullMessage 的回調(diào)方法中,如果正常得到了 broker 的響應(yīng),那么會把 PullRequest放回隊列,相關(guān)代碼可以從 org.apache.rocketmq.client.consumer.PullCallback
onSuccess 方法中得到答案。
服務(wù)端阻塞請求
服務(wù)端處理 pullRequest 請求的是 PullMessageProcessor,當(dāng)沒有消息時,則通過 PullRequestHoldService 將當(dāng)前請求先 hold 住。
case ResponseCode.PULL_NOT_FOUND: if (brokerAllowSuspend && hasSuspendFlag) { long pollingTimeMills = suspendTimeoutMillisLong; // 如果是 LongPolling,則 hold 住 if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
PullRequestHoldService 中會將所有的 PullRequest 緩存到 pullRequestTable。PullRequestHoldService 也是一個 task,默認(rèn)每次 hold 5s 然后再去檢查是否有新的消息過來,如果有新的消息到來,則喚醒對應(yīng)的線程來將消息返回給客戶端。
// 已省略無關(guān)代碼 public void run() { // loop while (!this.isStopped()) { // default hold 5s if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); // 檢查是否有新的消息到達(dá) this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } }
客戶端回調(diào)處理
我們在編寫 consumer 代碼時,基于 push 模式是通過如下方式來監(jiān)聽消息的
//注冊回調(diào)接口來處理從Broker中收到的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS 為消費(fèi)成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
通過前面的分析,對于如何通過“長輪詢”實現(xiàn)偽“push” 有了大概得了解;客戶端通過一個定時任務(wù)不斷向 Broker 發(fā)請求,Broker 在沒有消息時先 hold 住一小段時間,當(dāng)有新的消息時會立即將消息返回給 consumer;本節(jié)就主要探討 consumer 在收到消息之后的處理邏輯,以及是怎么觸發(fā) MessageListener 回調(diào)執(zhí)行的。
客戶端發(fā)起請求的底層邏輯
以異步調(diào)用為例,代碼在
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
中,截取部分代碼如下:
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); assert pullResult != null; // 成功回調(diào) pullCallback.onSuccess(pullResult); } catch (Exception e) { // 異?;卣{(diào) pullCallback.onException(e); } } else { if (!responseFuture.isSendRequestOK()) { // 異?;卣{(diào) pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); } else if (responseFuture.isTimeout()) { // 異?;卣{(diào) pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause())); } else { // 異?;卣{(diào) pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); } } } });
PullCallback 回調(diào)
PullCallback 回調(diào)邏輯在 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
方法中,以正常返回消息為例:
// 已省略無關(guān)代碼 public void onSuccess(PullResult pullResult) { // 將接收到的消息 交給 consumeMessageService 處理 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); // 將 pullRequest 放回 pullRequestQueue DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); }
ConsumeRequest 是一個 Runnable,submitConsumeRequest 就是將返回結(jié)果丟在一個單獨(dú)的線程池中去處理返回結(jié)果的。ConsumeRequest 的 run 方法中,會拿到 messageListener,然后執(zhí)行 consumeMessage 方法。
總結(jié)
到此,關(guān)于 RocketMQ push 消費(fèi)模型基本就探討完了。從實現(xiàn)機(jī)制上來看,push 本質(zhì)上并不是在建立雙向通道的前提下,由 Server 主動推送給 Client 的,而是由 Client 端觸發(fā) pullRequest 請求,以長輪詢的方式“偽裝”的結(jié)果。從代碼上來,RocketMQ 代碼中使用了非常多的異步機(jī)制,如 pullRequestQueue 來解耦發(fā)送請求和等待結(jié)果,各種定時任務(wù)等等。
整體看,PushConsumer 采用了 長輪詢+超時時間+Pull的模式, 這種方式帶來的好處總結(jié)如下 :
- 1、減少 Broker 的壓力,避免由于不同 Consumer 消費(fèi)能力導(dǎo)致 Broker 出現(xiàn)問題
- 2、確保了 Consumer 不會負(fù)載過高,Consumer 在校驗自己的緩存消息沒有超過閾值才會去從 Broker 拉取消息,Broker 不會主動推過來
- 3、兼顧了消息的即時性,Broker 在沒有消息的時候會先 hold 一小段時間,有消息會立即喚起線程將消息返回給 Consumer
- 4、Broker 端無效請求的次數(shù)大大降低,Broker 在沒有消息時會掛起 PullRequest,而 Consumer 在未接收到Response 且未超時時,也不會重新發(fā)起 PullRequest
以上就是RocketMQ Push 消費(fèi)模型示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Push 消費(fèi)模型的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java String字符串內(nèi)容實現(xiàn)添加雙引號
這篇文章主要介紹了Java String字符串內(nèi)容實現(xiàn)添加雙引號,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼
這篇文章主要介紹了application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼,具有一定參考價值,需要的朋友可以了解下。2017-11-11詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法
本篇文章主要介紹了詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-04-04Java之不通過構(gòu)造函數(shù)創(chuàng)建一個對象問題
這篇文章主要介紹了Java之不通過構(gòu)造函數(shù)創(chuàng)建一個對象問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-03-03