RocketMQ中的消費(fèi)者啟動(dòng)流程解讀
RocketMQ消費(fèi)者啟動(dòng)
問(wèn)題
消費(fèi)者啟動(dòng)的時(shí)候,去哪拿的消息呢?
問(wèn)題答案
(1)當(dāng)broker啟動(dòng)的時(shí)候,會(huì)把broker的地址端口、broker上的主題信息、主題隊(duì)列信息發(fā)送到nameserver(如圖中1)
(2)消費(fèi)者Client啟動(dòng)的時(shí)候會(huì)去nameserver拿toipc、topic隊(duì)列以及對(duì)應(yīng)的broker信息,拿到以后把信息存儲(chǔ)到本地(如圖中2)
(3)消費(fèi)者會(huì)給所有的broker發(fā)送心跳,并且附帶自己的消費(fèi)者組信息和ClientID信息,此時(shí)broker中就有消費(fèi)者組對(duì)應(yīng)的ClientID集合(如圖中3)
(4)消費(fèi)者啟動(dòng)后會(huì)reblance,有訂閱的主題隊(duì)列列表,并且通過(guò)broker可以拿到消費(fèi)者組的ClientID集合,兩個(gè)集合做rebalance,就可以拿到當(dāng)前消費(fèi)者對(duì)應(yīng)消費(fèi)的主題隊(duì)列
(5) 消費(fèi)者知道自己消費(fèi)的主題隊(duì)列,就可以根據(jù)隊(duì)列信息通過(guò)Netty發(fā)送消息
跟源碼
注意
本文是消費(fèi)者啟動(dòng)流程,所以不去關(guān)注broker和nameserver的啟動(dòng)流程,這樣關(guān)注點(diǎn)比較集中,因此步驟(1)本文不做描述。
消費(fèi)者啟動(dòng)時(shí)怎么拿到toipc的信息
消費(fèi)者啟動(dòng)的時(shí)候會(huì)調(diào)用 MQClientInstance###start()方法,start()方法里有會(huì)調(diào)用 MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會(huì)每隔一段時(shí)間更新一下topic路由信息
//MQClientInstance###startScheduledTask() this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
會(huì)把路由信息保存到本地的一個(gè)HashMap里,這樣消費(fèi)者就拿到了topic的信息并且會(huì)把broker的信息保存下來(lái)
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) //根據(jù)主題從nameserver獲取topic信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) //把主題和主題隊(duì)列相關(guān)的broker保存下來(lái) TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); }
總結(jié):消費(fèi)者拿到主題的隊(duì)列列表和broker信息
消費(fèi)者給broker發(fā)現(xiàn)心跳的作用
MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會(huì)每隔一段時(shí)間給所有的broker發(fā)送心跳消息
//MQClientInstance###startScheduledTask() this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
那么發(fā)送的心跳包中攜帶什么信息呢?如代碼中所示,攜帶clientID和組名稱
//MQClientInstance###prepareHeartbeatData private HeartbeatData prepareHeartbeatData() { HeartbeatData heartbeatData = new HeartbeatData(); // clientID //放入了當(dāng)前消費(fèi)者的clientID //放入了當(dāng)前消費(fèi)者的clientID //放入了當(dāng)前消費(fèi)者的clientID heartbeatData.setClientID(this.clientId); // Consumer for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { ConsumerData consumerData = new ConsumerData(); //放入了當(dāng)前消費(fèi)者的組名稱 //放入了當(dāng)前消費(fèi)者的組名稱 //放入了當(dāng)前消費(fèi)者的組名稱 //放入了當(dāng)前消費(fèi)者的組名稱 consumerData.setGroupName(impl.groupName()); consumerData.setConsumeType(impl.consumeType()); consumerData.setMessageModel(impl.messageModel()); consumerData.setConsumeFromWhere(impl.consumeFromWhere()); consumerData.getSubscriptionDataSet().addAll(impl.subscriptions()); consumerData.setUnitMode(impl.isUnitMode()); heartbeatData.getConsumerDataSet().add(consumerData); } } // Producer for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) { MQProducerInner impl = entry.getValue(); if (impl != null) { ProducerData producerData = new ProducerData(); producerData.setGroupName(entry.getKey()); heartbeatData.getProducerDataSet().add(producerData); } } return heartbeatData; }
此時(shí)broker拿到心跳消息怎么處理的呢?有一部分邏輯如下面代碼所示,記錄一下消費(fèi)者信息
//ClientManageProcessor###heartBeat(ChannelHandlerContext ctx, RemotingCommand request) ```java public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { //省略 for (ConsumerData data : heartbeatData.getConsumerDataSet()) { //省略 boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable ); //省略 } //省略 }
消費(fèi)者怎么做reblance
MQClientInstance的start的方法里會(huì)開啟一個(gè)rebalance的線程,如下面代碼所示
//MQClientInstance###start() public void start() throws MQClientException { //省略 // Start rebalance service this.rebalanceService.start(); //省略 }
跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法,如下面代碼所示。根據(jù)主題隊(duì)列列表和消費(fèi)者組集合去做一個(gè)Rebalance,最后的返回結(jié)果是當(dāng)前消費(fèi)者需要消費(fèi)的主題隊(duì)列。
//RebalanceImpl##rebalanceByTopic private void rebalanceByTopic(final String topic, final boolean isOrder) { //獲取訂閱的主題的隊(duì)列 //獲取訂閱的主題的隊(duì)列 //獲取訂閱的主題的隊(duì)列 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //獲取同消費(fèi)者組的ClientID集合 //獲取同消費(fèi)者組的ClientID集合 //獲取同消費(fèi)者組的ClientID集合 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); //排序 //排序 //排序 Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { //rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列 //rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列 //rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { //rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列 //rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列 //rebalance算法核心實(shí)現(xiàn),最后的結(jié)果是返回應(yīng)該消費(fèi)的隊(duì)列 allocateResultSet.addAll(allocateResult); } //此處看下面的消費(fèi)者怎么去拉消息 //此處看下面的消費(fèi)者怎么去拉消息 //此處看下面的消費(fèi)者怎么去拉消息 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); } }
總結(jié):消費(fèi)者拿到主題的隊(duì)列列表和消費(fèi)者組中ClientID集合,通過(guò)在消費(fèi)者這變做rebalance,從而確定被分配的主題隊(duì)列集合
消費(fèi)者怎么拉取消息
此處還是繼續(xù)跟上面的代碼,,然后執(zhí)行到下面的代碼,當(dāng)消費(fèi)者確定自己被分配的主題隊(duì)列后,會(huì)把主題隊(duì)列封裝成PullRequest 并進(jìn)行dispatch
//RebalanceImpl###updateProcessQueueTableInRebalance private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { //省列 List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { //省略 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } //省略 //派發(fā)請(qǐng)求任務(wù) this.dispatchPullRequest(pullRequestList); return changed; }
下面跟RebalanceImpl###dispatchPullRequest方法,最后跟到下面的代碼,就是把PullRequest放入到一個(gè)阻塞隊(duì)列里。
//PullMessageService###executePullRequestImmediately public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
那么誰(shuí)取阻塞隊(duì)列里的數(shù)據(jù)誰(shuí)就是消費(fèi)消息了? PullMessageService是一個(gè)線程,他的run方法里會(huì)取上面阻塞隊(duì)列里的PullRequest,如下面代碼所示
//PullMessageService###run() public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
從PullMessageService###pullMessage方法一直往下跟,就跟到下面的代碼
//DefaultMQPushConsumerImpl###pullMessage(final PullRequest pullRequest) public void pullMessage(final PullRequest pullRequest) { //省略 final long beginTimestamp = System.currentTimeMillis(); PullCallback pullCallback = new PullCallback() { //省略,但是重要,后面會(huì)說(shuō) //省略,但是重要,后面會(huì)說(shuō) //省略,但是重要,后面會(huì)說(shuō) //省略,但是重要,后面會(huì)說(shuō) }; //省略 try { //發(fā)送數(shù)據(jù)并且執(zhí)行回調(diào)方法,下面我們看一下回調(diào)方法的內(nèi)容就好好了 //發(fā)送數(shù)據(jù)并且執(zhí)行回調(diào)方法,下面我們看一下回調(diào)方法的內(nèi)容就好好了 //發(fā)送數(shù)據(jù)并且執(zhí)行回調(diào)方法,下面我們看一下回調(diào)方法的內(nèi)容就好好了 this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
那么回調(diào)方法是什么邏輯呢?代碼如下所示,發(fā)現(xiàn)數(shù)據(jù)并且submitConsumeRequest
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { //發(fā)現(xiàn)數(shù)據(jù) //發(fā)現(xiàn)數(shù)據(jù) //發(fā)現(xiàn)數(shù)據(jù) case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); //跟進(jìn)去 //跟進(jìn)去 //跟進(jìn)去 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } //省略 break; case NO_NEW_MSG: case NO_MATCHED_MSG: //省略 } } } };
跟上面submitConsumeRequest方法的到下面的代碼,封裝成ConsumeRequest,其實(shí)ConsumerRequest是一個(gè)線程
//ConsumeMessageConcurrentlyService###submitConsumeRequest public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { //省略 } } }
ConsumeRequest的run方法就會(huì)執(zhí)行我們注冊(cè)的listener方法,此時(shí)就消費(fèi)到數(shù)據(jù) ```java @Override public void run() { //省略 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); //省略 } }
總結(jié): 如下圖所示,RebalanceService線程會(huì)根據(jù)情況把請(qǐng)求放在PullMessageService的pullRequestQueue阻塞隊(duì)列隊(duì)列里,隊(duì)列的每一個(gè)節(jié)點(diǎn)就是拉請(qǐng)求;PullMessageService線程就是不斷去pullRequestQueue里拿任務(wù)然后去看一下broker中有沒(méi)有數(shù)據(jù),如果有數(shù)據(jù)就消費(fèi)。
總結(jié)
(1)忽然發(fā)現(xiàn)nameserver在整個(gè)過(guò)程中的作用感覺(jué)不是很大,其實(shí)我感覺(jué)這種設(shè)計(jì)還挺好的,因?yàn)榘阉械膲毫Χ挤旁趎ameserver返回減少系統(tǒng)的健壯性。
(2)RocketMQ的rebalance是在消息消費(fèi)者這邊實(shí)現(xiàn)的,這樣有一個(gè)很大的優(yōu)勢(shì)是減少nameserver和broker的壓力。那消費(fèi)者是怎么實(shí)現(xiàn)rebalance的呢?通過(guò)一個(gè)參數(shù)為當(dāng)前消費(fèi)者ID、主題隊(duì)列、消費(fèi)者組ClientID列表的匹配算法,每次只要保證算法的冪等性就可以了。
(3)RocketMQ的rebalance的rebalance是根據(jù)單個(gè)主題去實(shí)現(xiàn)的,這樣的一個(gè)缺點(diǎn)是容易出現(xiàn)消費(fèi)不平衡的問(wèn)題。如下圖所示。
(4)RocketMQ是AP的,因?yàn)樗暮懿僮鞫际嵌际峭ㄟ^(guò)線程池的定時(shí)任務(wù)去做的。
到此這篇關(guān)于RocketMQ中的消費(fèi)者啟動(dòng)流程解讀的文章就介紹到這了,更多相關(guān)RocketMQ消費(fèi)者啟動(dòng)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問(wèn)題
- RocketMQ的消費(fèi)者類型與最佳實(shí)踐詳解
- RocketMQ中消費(fèi)者的消費(fèi)進(jìn)度管理
- RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解
- 詳解RocketMQ中的消費(fèi)者啟動(dòng)與消費(fèi)流程分析
- RocketMQ4.5.X 實(shí)現(xiàn)修改生產(chǎn)者消費(fèi)者日志保存路徑
- RocketMq同組消費(fèi)者如何自動(dòng)設(shè)置InstanceName
相關(guān)文章
springboot 使用poi進(jìn)行數(shù)據(jù)的導(dǎo)出過(guò)程詳解
這篇文章主要介紹了springboot 使用poi進(jìn)行數(shù)據(jù)的導(dǎo)出過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09spring cloud gateway中redis一直打印重連日志問(wèn)題及解決
這篇文章主要介紹了spring cloud gateway中redis一直打印重連日志問(wèn)題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05Java實(shí)現(xiàn)warcraft?java版游戲的示例代碼
致敬經(jīng)典的warcraft,《warcraft?java版》是一款即時(shí)戰(zhàn)略題材單機(jī)游戲,采用魔獸原味風(fēng)格和機(jī)制。本文將用java語(yǔ)言實(shí)現(xiàn),采用了swing技術(shù)進(jìn)行了界面化處理,感興趣的可以了解一下2022-09-09webuploader在springMVC+jquery+Java開發(fā)環(huán)境下的大文件分片上傳的實(shí)例代碼
這篇文章主要介紹了webuploader在springMVC+jquery+Java開發(fā)環(huán)境下的大文件分片上傳的實(shí)例代碼,需要的朋友可以參考下2017-04-04解決feign微服務(wù)間的文件上傳報(bào)錯(cuò)問(wèn)題
這篇文章主要介紹了解決feign微服務(wù)間的文件上傳報(bào)錯(cuò)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06手把手教你用Java實(shí)現(xiàn)一套簡(jiǎn)單的鑒權(quán)服務(wù)
現(xiàn)今大部分系統(tǒng)都會(huì)有自己的鑒權(quán)服務(wù),本文介紹了最常用的鑒權(quán)服務(wù),就是日常用戶的登錄登出,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05JDK9為何要將String的底層實(shí)現(xiàn)由char[]改成了byte[]
String 類的源碼已經(jīng)由?char[]?優(yōu)化為了?byte[]?來(lái)存儲(chǔ)字符串內(nèi)容,為什么要這樣做呢?本文就詳細(xì)的介紹一下,感興趣的可以了解一下2022-03-03Java的Cglib動(dòng)態(tài)代理實(shí)現(xiàn)方式詳解
這篇文章主要介紹了Java的Cglib動(dòng)態(tài)代理實(shí)現(xiàn)方式詳解,CGLIB是強(qiáng)大的、高性能的代碼生成庫(kù),被廣泛應(yīng)用于AOP框架,它底層使用ASM來(lái)操作字節(jié)碼生成新的類,為對(duì)象引入間接級(jí)別,以控制對(duì)象的訪問(wèn),需要的朋友可以參考下2023-11-11