RocketMQ中的消費(fèi)者啟動(dòng)流程解讀
RocketMQ消費(fèi)者啟動(dòng)
問題
消費(fèi)者啟動(dòng)的時(shí)候,去哪拿的消息呢?
問題答案

(1)當(dāng)broker啟動(dòng)的時(shí)候,會(huì)把broker的地址端口、broker上的主題信息、主題隊(duì)列信息發(fā)送到nameserver(如圖中1)
(2)消費(fèi)者Client啟動(dòng)的時(shí)候會(huì)去nameserver拿toipc、topic隊(duì)列以及對(duì)應(yīng)的broker信息,拿到以后把信息存儲(chǔ)到本地(如圖中2)
(3)消費(fèi)者會(huì)給所有的broker發(fā)送心跳,并且附帶自己的消費(fèi)者組信息和ClientID信息,此時(shí)broker中就有消費(fèi)者組對(duì)應(yīng)的ClientID集合(如圖中3)
(4)消費(fèi)者啟動(dòng)后會(huì)reblance,有訂閱的主題隊(duì)列列表,并且通過broker可以拿到消費(fèi)者組的ClientID集合,兩個(gè)集合做rebalance,就可以拿到當(dāng)前消費(fèi)者對(duì)應(yīng)消費(fèi)的主題隊(duì)列
(5) 消費(fèi)者知道自己消費(fèi)的主題隊(duì)列,就可以根據(jù)隊(duì)列信息通過Netty發(fā)送消息
跟源碼
注意
本文是消費(fèi)者啟動(dòng)流程,所以不去關(guān)注broker和nameserver的啟動(dòng)流程,這樣關(guān)注點(diǎn)比較集中,因此步驟(1)本文不做描述。
消費(fèi)者啟動(dòng)時(shí)怎么拿到toipc的信息
消費(fèi)者啟動(dòng)的時(shí)候會(huì)調(diào)用 MQClientInstance###start()方法,start()方法里有會(huì)調(diào)用 MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會(huì)每隔一段時(shí)間更新一下topic路由信息
//MQClientInstance###startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);會(huì)把路由信息保存到本地的一個(gè)HashMap里,這樣消費(fèi)者就拿到了topic的信息并且會(huì)把broker的信息保存下來
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) //根據(jù)主題從nameserver獲取topic信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//把主題和主題隊(duì)列相關(guān)的broker保存下來
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}總結(jié):消費(fèi)者拿到主題的隊(duì)列列表和broker信息
消費(fèi)者給broker發(fā)現(xiàn)心跳的作用
MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會(huì)每隔一段時(shí)間給所有的broker發(fā)送心跳消息
//MQClientInstance###startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);那么發(fā)送的心跳包中攜帶什么信息呢?如代碼中所示,攜帶clientID和組名稱
//MQClientInstance###prepareHeartbeatData
private HeartbeatData prepareHeartbeatData() {
HeartbeatData heartbeatData = new HeartbeatData();
// clientID
//放入了當(dāng)前消費(fèi)者的clientID
//放入了當(dāng)前消費(fèi)者的clientID
//放入了當(dāng)前消費(fèi)者的clientID
heartbeatData.setClientID(this.clientId);
// Consumer
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
ConsumerData consumerData = new ConsumerData();
//放入了當(dāng)前消費(fèi)者的組名稱
//放入了當(dāng)前消費(fèi)者的組名稱
//放入了當(dāng)前消費(fèi)者的組名稱
//放入了當(dāng)前消費(fèi)者的組名稱
consumerData.setGroupName(impl.groupName());
consumerData.setConsumeType(impl.consumeType());
consumerData.setMessageModel(impl.messageModel());
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}此時(shí)broker拿到心跳消息怎么處理的呢?有一部分邏輯如下面代碼所示,記錄一下消費(fèi)者信息
//ClientManageProcessor###heartBeat(ChannelHandlerContext ctx, RemotingCommand request)
```java
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
//省略
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
//省略
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
//省略
}
//省略
}消費(fèi)者怎么做reblance
MQClientInstance的start的方法里會(huì)開啟一個(gè)rebalance的線程,如下面代碼所示
//MQClientInstance###start()
public void start() throws MQClientException {
//省略
// Start rebalance service
this.rebalanceService.start();
//省略
}跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法,如下面代碼所示。根據(jù)主題隊(duì)列列表和消費(fèi)者組集合去做一個(gè)Rebalance,最后的返回結(jié)果是當(dāng)前消費(fèi)者需要消費(fèi)的主題隊(duì)列。
//RebalanceImpl##rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
//獲取訂閱的主題的隊(duì)列
//獲取訂閱的主題的隊(duì)列
//獲取訂閱的主題的隊(duì)列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//獲取同消費(fèi)者組的ClientID集合
//獲取同消費(fèi)者組的ClientID集合
//獲取同消費(fèi)者組的ClientID集合
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//排序
//排序
//排序
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列
//rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列
//rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
//rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列
//rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列
//rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列
allocateResultSet.addAll(allocateResult);
}
//此處看下面的消費(fèi)者怎么去拉消息
//此處看下面的消費(fèi)者怎么去拉消息
//此處看下面的消費(fèi)者怎么去拉消息
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
}
}總結(jié):消費(fèi)者拿到主題的隊(duì)列列表和消費(fèi)者組中ClientID集合,通過在消費(fèi)者這變做rebalance,從而確定被分配的主題隊(duì)列集合
消費(fèi)者怎么拉取消息
此處還是繼續(xù)跟上面的代碼,,然后執(zhí)行到下面的代碼,當(dāng)消費(fèi)者確定自己被分配的主題隊(duì)列后,會(huì)把主題隊(duì)列封裝成PullRequest 并進(jìn)行dispatch
//RebalanceImpl###updateProcessQueueTableInRebalance
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
//省列
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
//省略
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}
//省略
//派發(fā)請(qǐng)求任務(wù)
this.dispatchPullRequest(pullRequestList);
return changed;
}下面跟RebalanceImpl###dispatchPullRequest方法,最后跟到下面的代碼,就是把PullRequest放入到一個(gè)阻塞隊(duì)列里。
//PullMessageService###executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}那么誰取阻塞隊(duì)列里的數(shù)據(jù)誰就是消費(fèi)消息了? PullMessageService是一個(gè)線程,他的run方法里會(huì)取上面阻塞隊(duì)列里的PullRequest,如下面代碼所示
//PullMessageService###run()
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}從PullMessageService###pullMessage方法一直往下跟,就跟到下面的代碼
//DefaultMQPushConsumerImpl###pullMessage(final PullRequest pullRequest)
public void pullMessage(final PullRequest pullRequest) {
//省略
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
//省略,但是重要,后面會(huì)說
//省略,但是重要,后面會(huì)說
//省略,但是重要,后面會(huì)說
//省略,但是重要,后面會(huì)說
};
//省略
try {
//發(fā)送數(shù)據(jù)并且執(zhí)行回調(diào)方法,下面我們看一下回調(diào)方法的內(nèi)容就好好了
//發(fā)送數(shù)據(jù)并且執(zhí)行回調(diào)方法,下面我們看一下回調(diào)方法的內(nèi)容就好好了
//發(fā)送數(shù)據(jù)并且執(zhí)行回調(diào)方法,下面我們看一下回調(diào)方法的內(nèi)容就好好了
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}那么回調(diào)方法是什么邏輯呢?代碼如下所示,發(fā)現(xiàn)數(shù)據(jù)并且submitConsumeRequest
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
//發(fā)現(xiàn)數(shù)據(jù)
//發(fā)現(xiàn)數(shù)據(jù)
//發(fā)現(xiàn)數(shù)據(jù)
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//跟進(jìn)去
//跟進(jìn)去
//跟進(jìn)去
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
//省略
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
//省略
}
}
}
};跟上面submitConsumeRequest方法的到下面的代碼,封裝成ConsumeRequest,其實(shí)ConsumerRequest是一個(gè)線程
//ConsumeMessageConcurrentlyService###submitConsumeRequest
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//省略
}
}
}ConsumeRequest的run方法就會(huì)執(zhí)行我們注冊(cè)的listener方法,此時(shí)就消費(fèi)到數(shù)據(jù)
```java
@Override
public void run() {
//省略
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
//省略
}
}
總結(jié): 如下圖所示,RebalanceService線程會(huì)根據(jù)情況把請(qǐng)求放在PullMessageService的pullRequestQueue阻塞隊(duì)列隊(duì)列里,隊(duì)列的每一個(gè)節(jié)點(diǎn)就是拉請(qǐng)求;PullMessageService線程就是不斷去pullRequestQueue里拿任務(wù)然后去看一下broker中有沒有數(shù)據(jù),如果有數(shù)據(jù)就消費(fèi)。

總結(jié)
(1)忽然發(fā)現(xiàn)nameserver在整個(gè)過程中的作用感覺不是很大,其實(shí)我感覺這種設(shè)計(jì)還挺好的,因?yàn)榘阉械膲毫Χ挤旁趎ameserver返回減少系統(tǒng)的健壯性。
(2)RocketMQ的rebalance是在消息消費(fèi)者這邊實(shí)現(xiàn)的,這樣有一個(gè)很大的優(yōu)勢(shì)是減少nameserver和broker的壓力。那消費(fèi)者是怎么實(shí)現(xiàn)rebalance的呢?通過一個(gè)參數(shù)為當(dāng)前消費(fèi)者ID、主題隊(duì)列、消費(fèi)者組ClientID列表的匹配算法,每次只要保證算法的冪等性就可以了。
(3)RocketMQ的rebalance的rebalance是根據(jù)單個(gè)主題去實(shí)現(xiàn)的,這樣的一個(gè)缺點(diǎn)是容易出現(xiàn)消費(fèi)不平衡的問題。如下圖所示。

(4)RocketMQ是AP的,因?yàn)樗暮懿僮鞫际嵌际峭ㄟ^線程池的定時(shí)任務(wù)去做的。
到此這篇關(guān)于RocketMQ中的消費(fèi)者啟動(dòng)流程解讀的文章就介紹到這了,更多相關(guān)RocketMQ消費(fèi)者啟動(dòng)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問題
- RocketMQ的消費(fèi)者類型與最佳實(shí)踐詳解
- RocketMQ中消費(fèi)者的消費(fèi)進(jìn)度管理
- RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解
- 詳解RocketMQ中的消費(fèi)者啟動(dòng)與消費(fèi)流程分析
- RocketMQ4.5.X 實(shí)現(xiàn)修改生產(chǎn)者消費(fèi)者日志保存路徑
- RocketMq同組消費(fèi)者如何自動(dòng)設(shè)置InstanceName
相關(guān)文章
springboot 使用poi進(jìn)行數(shù)據(jù)的導(dǎo)出過程詳解
這篇文章主要介紹了springboot 使用poi進(jìn)行數(shù)據(jù)的導(dǎo)出過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09
spring cloud gateway中redis一直打印重連日志問題及解決
這篇文章主要介紹了spring cloud gateway中redis一直打印重連日志問題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
Java實(shí)現(xiàn)warcraft?java版游戲的示例代碼
致敬經(jīng)典的warcraft,《warcraft?java版》是一款即時(shí)戰(zhàn)略題材單機(jī)游戲,采用魔獸原味風(fēng)格和機(jī)制。本文將用java語言實(shí)現(xiàn),采用了swing技術(shù)進(jìn)行了界面化處理,感興趣的可以了解一下2022-09-09
webuploader在springMVC+jquery+Java開發(fā)環(huán)境下的大文件分片上傳的實(shí)例代碼
這篇文章主要介紹了webuploader在springMVC+jquery+Java開發(fā)環(huán)境下的大文件分片上傳的實(shí)例代碼,需要的朋友可以參考下2017-04-04
解決feign微服務(wù)間的文件上傳報(bào)錯(cuò)問題
這篇文章主要介紹了解決feign微服務(wù)間的文件上傳報(bào)錯(cuò)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
手把手教你用Java實(shí)現(xiàn)一套簡單的鑒權(quán)服務(wù)
現(xiàn)今大部分系統(tǒng)都會(huì)有自己的鑒權(quán)服務(wù),本文介紹了最常用的鑒權(quán)服務(wù),就是日常用戶的登錄登出,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05
JDK9為何要將String的底層實(shí)現(xiàn)由char[]改成了byte[]
String 類的源碼已經(jīng)由?char[]?優(yōu)化為了?byte[]?來存儲(chǔ)字符串內(nèi)容,為什么要這樣做呢?本文就詳細(xì)的介紹一下,感興趣的可以了解一下2022-03-03
Java的Cglib動(dòng)態(tài)代理實(shí)現(xiàn)方式詳解
這篇文章主要介紹了Java的Cglib動(dòng)態(tài)代理實(shí)現(xiàn)方式詳解,CGLIB是強(qiáng)大的、高性能的代碼生成庫,被廣泛應(yīng)用于AOP框架,它底層使用ASM來操作字節(jié)碼生成新的類,為對(duì)象引入間接級(jí)別,以控制對(duì)象的訪問,需要的朋友可以參考下2023-11-11

