RocketMQ中的消費者啟動流程解讀
RocketMQ消費者啟動
問題
消費者啟動的時候,去哪拿的消息呢?
問題答案

(1)當broker啟動的時候,會把broker的地址端口、broker上的主題信息、主題隊列信息發(fā)送到nameserver(如圖中1)
(2)消費者Client啟動的時候會去nameserver拿toipc、topic隊列以及對應的broker信息,拿到以后把信息存儲到本地(如圖中2)
(3)消費者會給所有的broker發(fā)送心跳,并且附帶自己的消費者組信息和ClientID信息,此時broker中就有消費者組對應的ClientID集合(如圖中3)
(4)消費者啟動后會reblance,有訂閱的主題隊列列表,并且通過broker可以拿到消費者組的ClientID集合,兩個集合做rebalance,就可以拿到當前消費者對應消費的主題隊列
(5) 消費者知道自己消費的主題隊列,就可以根據(jù)隊列信息通過Netty發(fā)送消息
跟源碼
注意
本文是消費者啟動流程,所以不去關注broker和nameserver的啟動流程,這樣關注點比較集中,因此步驟(1)本文不做描述。
消費者啟動時怎么拿到toipc的信息
消費者啟動的時候會調用 MQClientInstance###start()方法,start()方法里有會調用 MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會每隔一段時間更新一下topic路由信息
//MQClientInstance###startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);會把路由信息保存到本地的一個HashMap里,這樣消費者就拿到了topic的信息并且會把broker的信息保存下來
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) //根據(jù)主題從nameserver獲取topic信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//把主題和主題隊列相關的broker保存下來
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}總結:消費者拿到主題的隊列列表和broker信息
消費者給broker發(fā)現(xiàn)心跳的作用
MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會每隔一段時間給所有的broker發(fā)送心跳消息
//MQClientInstance###startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);那么發(fā)送的心跳包中攜帶什么信息呢?如代碼中所示,攜帶clientID和組名稱
//MQClientInstance###prepareHeartbeatData
private HeartbeatData prepareHeartbeatData() {
HeartbeatData heartbeatData = new HeartbeatData();
// clientID
//放入了當前消費者的clientID
//放入了當前消費者的clientID
//放入了當前消費者的clientID
heartbeatData.setClientID(this.clientId);
// Consumer
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
ConsumerData consumerData = new ConsumerData();
//放入了當前消費者的組名稱
//放入了當前消費者的組名稱
//放入了當前消費者的組名稱
//放入了當前消費者的組名稱
consumerData.setGroupName(impl.groupName());
consumerData.setConsumeType(impl.consumeType());
consumerData.setMessageModel(impl.messageModel());
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}此時broker拿到心跳消息怎么處理的呢?有一部分邏輯如下面代碼所示,記錄一下消費者信息
//ClientManageProcessor###heartBeat(ChannelHandlerContext ctx, RemotingCommand request)
```java
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
//省略
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
//省略
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
//省略
}
//省略
}消費者怎么做reblance
MQClientInstance的start的方法里會開啟一個rebalance的線程,如下面代碼所示
//MQClientInstance###start()
public void start() throws MQClientException {
//省略
// Start rebalance service
this.rebalanceService.start();
//省略
}跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法,如下面代碼所示。根據(jù)主題隊列列表和消費者組集合去做一個Rebalance,最后的返回結果是當前消費者需要消費的主題隊列。
//RebalanceImpl##rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
//獲取訂閱的主題的隊列
//獲取訂閱的主題的隊列
//獲取訂閱的主題的隊列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//獲取同消費者組的ClientID集合
//獲取同消費者組的ClientID集合
//獲取同消費者組的ClientID集合
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//排序
//排序
//排序
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
//rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
//rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
//rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
//rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
//rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
allocateResultSet.addAll(allocateResult);
}
//此處看下面的消費者怎么去拉消息
//此處看下面的消費者怎么去拉消息
//此處看下面的消費者怎么去拉消息
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
}
}總結:消費者拿到主題的隊列列表和消費者組中ClientID集合,通過在消費者這變做rebalance,從而確定被分配的主題隊列集合
消費者怎么拉取消息
此處還是繼續(xù)跟上面的代碼,,然后執(zhí)行到下面的代碼,當消費者確定自己被分配的主題隊列后,會把主題隊列封裝成PullRequest 并進行dispatch
//RebalanceImpl###updateProcessQueueTableInRebalance
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
//省列
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
//省略
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}
//省略
//派發(fā)請求任務
this.dispatchPullRequest(pullRequestList);
return changed;
}下面跟RebalanceImpl###dispatchPullRequest方法,最后跟到下面的代碼,就是把PullRequest放入到一個阻塞隊列里。
//PullMessageService###executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}那么誰取阻塞隊列里的數(shù)據(jù)誰就是消費消息了? PullMessageService是一個線程,他的run方法里會取上面阻塞隊列里的PullRequest,如下面代碼所示
//PullMessageService###run()
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}從PullMessageService###pullMessage方法一直往下跟,就跟到下面的代碼
//DefaultMQPushConsumerImpl###pullMessage(final PullRequest pullRequest)
public void pullMessage(final PullRequest pullRequest) {
//省略
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
//省略,但是重要,后面會說
//省略,但是重要,后面會說
//省略,但是重要,后面會說
//省略,但是重要,后面會說
};
//省略
try {
//發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內容就好好了
//發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內容就好好了
//發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內容就好好了
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}那么回調方法是什么邏輯呢?代碼如下所示,發(fā)現(xiàn)數(shù)據(jù)并且submitConsumeRequest
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
//發(fā)現(xiàn)數(shù)據(jù)
//發(fā)現(xiàn)數(shù)據(jù)
//發(fā)現(xiàn)數(shù)據(jù)
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//跟進去
//跟進去
//跟進去
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
//省略
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
//省略
}
}
}
};跟上面submitConsumeRequest方法的到下面的代碼,封裝成ConsumeRequest,其實ConsumerRequest是一個線程
//ConsumeMessageConcurrentlyService###submitConsumeRequest
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//省略
}
}
}ConsumeRequest的run方法就會執(zhí)行我們注冊的listener方法,此時就消費到數(shù)據(jù)
```java
@Override
public void run() {
//省略
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
//省略
}
}
總結: 如下圖所示,RebalanceService線程會根據(jù)情況把請求放在PullMessageService的pullRequestQueue阻塞隊列隊列里,隊列的每一個節(jié)點就是拉請求;PullMessageService線程就是不斷去pullRequestQueue里拿任務然后去看一下broker中有沒有數(shù)據(jù),如果有數(shù)據(jù)就消費。

總結
(1)忽然發(fā)現(xiàn)nameserver在整個過程中的作用感覺不是很大,其實我感覺這種設計還挺好的,因為把所有的壓力都放在nameserver返回減少系統(tǒng)的健壯性。
(2)RocketMQ的rebalance是在消息消費者這邊實現(xiàn)的,這樣有一個很大的優(yōu)勢是減少nameserver和broker的壓力。那消費者是怎么實現(xiàn)rebalance的呢?通過一個參數(shù)為當前消費者ID、主題隊列、消費者組ClientID列表的匹配算法,每次只要保證算法的冪等性就可以了。
(3)RocketMQ的rebalance的rebalance是根據(jù)單個主題去實現(xiàn)的,這樣的一個缺點是容易出現(xiàn)消費不平衡的問題。如下圖所示。

(4)RocketMQ是AP的,因為他的很操作都是都是通過線程池的定時任務去做的。
到此這篇關于RocketMQ中的消費者啟動流程解讀的文章就介紹到這了,更多相關RocketMQ消費者啟動內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot 使用poi進行數(shù)據(jù)的導出過程詳解
這篇文章主要介紹了springboot 使用poi進行數(shù)據(jù)的導出過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-09-09
spring cloud gateway中redis一直打印重連日志問題及解決
這篇文章主要介紹了spring cloud gateway中redis一直打印重連日志問題及解決,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05
Java實現(xiàn)warcraft?java版游戲的示例代碼
致敬經典的warcraft,《warcraft?java版》是一款即時戰(zhàn)略題材單機游戲,采用魔獸原味風格和機制。本文將用java語言實現(xiàn),采用了swing技術進行了界面化處理,感興趣的可以了解一下2022-09-09
webuploader在springMVC+jquery+Java開發(fā)環(huán)境下的大文件分片上傳的實例代碼
這篇文章主要介紹了webuploader在springMVC+jquery+Java開發(fā)環(huán)境下的大文件分片上傳的實例代碼,需要的朋友可以參考下2017-04-04
JDK9為何要將String的底層實現(xiàn)由char[]改成了byte[]
String 類的源碼已經由?char[]?優(yōu)化為了?byte[]?來存儲字符串內容,為什么要這樣做呢?本文就詳細的介紹一下,感興趣的可以了解一下2022-03-03
Java的Cglib動態(tài)代理實現(xiàn)方式詳解
這篇文章主要介紹了Java的Cglib動態(tài)代理實現(xiàn)方式詳解,CGLIB是強大的、高性能的代碼生成庫,被廣泛應用于AOP框架,它底層使用ASM來操作字節(jié)碼生成新的類,為對象引入間接級別,以控制對象的訪問,需要的朋友可以參考下2023-11-11

