RocketMQ?Push?消費(fèi)模型示例詳解
Push 模式是指由 Server 端來控制消息的推送,即當(dāng)有消息到 Server 之后,會將消息主動投遞給 client(Consumer 端)。
使用 DefaultMQPushConsumer 消費(fèi)消息
下面是使用 DefaultMQPushConsumer 消費(fèi)消息的官方示例代碼:
// 初始化consumer,并設(shè)置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup");
// 設(shè)置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//訂閱一個或多個topic,并指定tag過濾條件,這里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注冊回調(diào)接口來處理從Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS 為消費(fèi)成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動Consumer
consumer.start();
這里看到主要是通過 consumer 注冊回調(diào)接口來處理從 Broker 中收到的消息。這種監(jiān)聽回調(diào)的機(jī)制很容易想到是一種觀察者模式或者事件機(jī)制;對于這種 C-S 模型的架構(gòu)來說,如果要做到 Server 在有新消息時立即推送給 Client,那么 Client 和 Server 之間應(yīng)該是有連接存在的,Client 端開放端口來 watch Server 的推送。這里好論證,即可以查看當(dāng)前 Client 端所在進(jìn)程開啟了什么端口即可,通過如下指令查看:
- 1、先通過 jps 查看 Consumer Client 的進(jìn)程號
? rocketmq-4.9.4 git:(06f2208a3) jps 10722 Jps 4676 rocketmq-dashboard-1.0.1-SNAPSHOT.jar 1766 4121 BrokerStartup 4009 NamesrvStartup 9419 PushConsumer 9692 RemoteMavenServer36
可以看到 PushConsumer 的進(jìn)程號是 9419
- 2、通過 lsof 命令查看進(jìn)程端口占用
? rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 9419| grep LISTEN ?
這里沒有看到 PushConsumer 有開啟端口。同樣,這里可以看看 Broker 的進(jìn)程端口占用
? rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 4121| grep LISTEN java 4121 glmapper 137u IPv6 0xca1142b0f200067d 0t0 TCP *:10912 (LISTEN) java 4121 glmapper 141u IPv6 0xca1142b0f1fc8cfd 0t0 TCP *:10911 (LISTEN) java 4121 glmapper 142u IPv6 0xca1142b0f1fc935d 0t0 TCP *:10909 (LISTEN)
所以得到一個初步的結(jié)論是,在 Push 模式下,Consumer Client 并沒有啟動端口來接收 Server 的消息推送。 那么 RocketMQ 是怎么實(shí)現(xiàn)的?
基于長輪詢機(jī)制的偽 push 實(shí)現(xiàn)
真正的 Push 方式,是 Server 端接收到消息后,主動把消息推送給 Client 端,這種情況一般需要 Client 和 Server 之間建立長連接。通過前面的分析,Client 既然沒有開啟端口用于接收 Server 的信息推送,那么只有一種可能就是 Client 自己去拉了消息,但是這種主動拉消息的方式是對于用戶無感的,從使用上體驗(yàn)上來看,做到了和 push 一樣的效果;這種機(jī)制就是“長輪詢”。
為啥不用長連接方式,讓 Server 主動 Push 呢?其實(shí)很好理解,對于一個提供隊(duì)列服務(wù)的 Server 來說,用 Push方式主動推送有兩個問題:
- 1、會增加 Server 端的工作量,進(jìn)而影響 Server 的性能
- 2、Client 的處理能力存在差異,Client 的狀態(tài)不受 Server 控制,如果 Client 不能及時處理 Server 推送過來的消息,會造成各種潛在問題
客戶端側(cè)發(fā)起的長輪詢請求
下圖是初始化相關(guān)資源的過程,DefaultMQPushConsumer 是面向用戶使用的 API client 類,內(nèi)部處理實(shí)際上是委托給 DefaultMQPushConsumerImpl 來處理的。DefaultMQPushConsumerImpl#start 時,會初始化 MQClientInstance ,MQClientInstance 初始化過程中又會初始化一堆資源,比如請求-響應(yīng)的通道,開啟各種各樣的調(diào)度任務(wù)(定期拉去 NameServerAddress、定期更新 Topic 路由信息、定期清理 Offline狀態(tài)的 Broker、定期發(fā)送心跳給 Broker、定期持久化所有 Consumer Offset等等),開啟 pullMessageService,開啟 rebalance Service 等等。大致的調(diào)用鏈如下圖

下面這個代碼片段是 pullMessageService 的 run 方法(pullMessageService 是 Runnable 子類)
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 從 pullRequestQueue 中取 pullRequest
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
通過代碼,可以直觀的看起,pullMessageService 會一直從 pullRequestQueue 中取 pullRequest,然后執(zhí)行 pullMessage 請求。實(shí)際上 MessageQueue 是和 pullRequest 一一對應(yīng)的 ,pullRequest 全部存儲到該 Consumer 的 pullRequestQueue 隊(duì)列里面;消費(fèi)者會不停的從 PullRequest 的隊(duì)列里取 request 然后向broker 請求消息。
這里還有一個問題是隊(duì)列取出之后什么時候放回去的?在 pullMessage 的回調(diào)方法中,如果正常得到了 broker 的響應(yīng),那么會把 PullRequest放回隊(duì)列,相關(guān)代碼可以從 org.apache.rocketmq.client.consumer.PullCallbackonSuccess 方法中得到答案。
服務(wù)端阻塞請求
服務(wù)端處理 pullRequest 請求的是 PullMessageProcessor,當(dāng)沒有消息時,則通過 PullRequestHoldService 將當(dāng)前請求先 hold 住。
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
// 如果是 LongPolling,則 hold 住
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
PullRequestHoldService 中會將所有的 PullRequest 緩存到 pullRequestTable。PullRequestHoldService 也是一個 task,默認(rèn)每次 hold 5s 然后再去檢查是否有新的消息過來,如果有新的消息到來,則喚醒對應(yīng)的線程來將消息返回給客戶端。
// 已省略無關(guān)代碼
public void run() {
// loop
while (!this.isStopped()) {
// default hold 5s
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 檢查是否有新的消息到達(dá)
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
}
}
客戶端回調(diào)處理
我們在編寫 consumer 代碼時,基于 push 模式是通過如下方式來監(jiān)聽消息的
//注冊回調(diào)接口來處理從Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS 為消費(fèi)成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
通過前面的分析,對于如何通過“長輪詢”實(shí)現(xiàn)偽“push” 有了大概得了解;客戶端通過一個定時任務(wù)不斷向 Broker 發(fā)請求,Broker 在沒有消息時先 hold 住一小段時間,當(dāng)有新的消息時會立即將消息返回給 consumer;本節(jié)就主要探討 consumer 在收到消息之后的處理邏輯,以及是怎么觸發(fā) MessageListener 回調(diào)執(zhí)行的。
客戶端發(fā)起請求的底層邏輯
以異步調(diào)用為例,代碼在
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync中,截取部分代碼如下:
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
// 成功回調(diào)
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
// 異常回調(diào)
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
// 異?;卣{(diào)
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
// 異常回調(diào)
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
// 異?;卣{(diào)
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
PullCallback 回調(diào)
PullCallback 回調(diào)邏輯在 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage方法中,以正常返回消息為例:
// 已省略無關(guān)代碼
public void onSuccess(PullResult pullResult) {
// 將接收到的消息 交給 consumeMessageService 處理
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 將 pullRequest 放回 pullRequestQueue
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
ConsumeRequest 是一個 Runnable,submitConsumeRequest 就是將返回結(jié)果丟在一個單獨(dú)的線程池中去處理返回結(jié)果的。ConsumeRequest 的 run 方法中,會拿到 messageListener,然后執(zhí)行 consumeMessage 方法。
總結(jié)
到此,關(guān)于 RocketMQ push 消費(fèi)模型基本就探討完了。從實(shí)現(xiàn)機(jī)制上來看,push 本質(zhì)上并不是在建立雙向通道的前提下,由 Server 主動推送給 Client 的,而是由 Client 端觸發(fā) pullRequest 請求,以長輪詢的方式“偽裝”的結(jié)果。從代碼上來,RocketMQ 代碼中使用了非常多的異步機(jī)制,如 pullRequestQueue 來解耦發(fā)送請求和等待結(jié)果,各種定時任務(wù)等等。
整體看,PushConsumer 采用了 長輪詢+超時時間+Pull的模式, 這種方式帶來的好處總結(jié)如下 :
- 1、減少 Broker 的壓力,避免由于不同 Consumer 消費(fèi)能力導(dǎo)致 Broker 出現(xiàn)問題
- 2、確保了 Consumer 不會負(fù)載過高,Consumer 在校驗(yàn)自己的緩存消息沒有超過閾值才會去從 Broker 拉取消息,Broker 不會主動推過來
- 3、兼顧了消息的即時性,Broker 在沒有消息的時候會先 hold 一小段時間,有消息會立即喚起線程將消息返回給 Consumer
- 4、Broker 端無效請求的次數(shù)大大降低,Broker 在沒有消息時會掛起 PullRequest,而 Consumer 在未接收到Response 且未超時時,也不會重新發(fā)起 PullRequest
以上就是RocketMQ Push 消費(fèi)模型示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Push 消費(fèi)模型的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java String字符串內(nèi)容實(shí)現(xiàn)添加雙引號
這篇文章主要介紹了Java String字符串內(nèi)容實(shí)現(xiàn)添加雙引號,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09
application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼
這篇文章主要介紹了application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼,具有一定參考價值,需要的朋友可以了解下。2017-11-11
Java序列化常見實(shí)現(xiàn)方法代碼實(shí)例
這篇文章主要介紹了Java序列化常見實(shí)現(xiàn)方法代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-11-11
java實(shí)現(xiàn)短地址服務(wù)的方法(附代碼)
大多數(shù)情況下URL太長,字符多,不便于發(fā)布復(fù)制和存儲,本文就介紹了通過java實(shí)現(xiàn)短地址服務(wù),減少了許多使用太長URL帶來的不便,需要的朋友可以參考下2015-07-07
詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法
本篇文章主要介紹了詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-04-04
Java之不通過構(gòu)造函數(shù)創(chuàng)建一個對象問題
這篇文章主要介紹了Java之不通過構(gòu)造函數(shù)創(chuàng)建一個對象問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-03-03

