RocketMQ中的消費者啟動流程解讀
RocketMQ消費者啟動
問題
消費者啟動的時候,去哪拿的消息呢?
問題答案
(1)當broker啟動的時候,會把broker的地址端口、broker上的主題信息、主題隊列信息發(fā)送到nameserver(如圖中1)
(2)消費者Client啟動的時候會去nameserver拿toipc、topic隊列以及對應的broker信息,拿到以后把信息存儲到本地(如圖中2)
(3)消費者會給所有的broker發(fā)送心跳,并且附帶自己的消費者組信息和ClientID信息,此時broker中就有消費者組對應的ClientID集合(如圖中3)
(4)消費者啟動后會reblance,有訂閱的主題隊列列表,并且通過broker可以拿到消費者組的ClientID集合,兩個集合做rebalance,就可以拿到當前消費者對應消費的主題隊列
(5) 消費者知道自己消費的主題隊列,就可以根據(jù)隊列信息通過Netty發(fā)送消息
跟源碼
注意
本文是消費者啟動流程,所以不去關注broker和nameserver的啟動流程,這樣關注點比較集中,因此步驟(1)本文不做描述。
消費者啟動時怎么拿到toipc的信息
消費者啟動的時候會調用 MQClientInstance###start()方法,start()方法里有會調用 MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會每隔一段時間更新一下topic路由信息
//MQClientInstance###startScheduledTask() this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
會把路由信息保存到本地的一個HashMap里,這樣消費者就拿到了topic的信息并且會把broker的信息保存下來
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) //根據(jù)主題從nameserver獲取topic信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) //把主題和主題隊列相關的broker保存下來 TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); }
總結:消費者拿到主題的隊列列表和broker信息
消費者給broker發(fā)現(xiàn)心跳的作用
MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會每隔一段時間給所有的broker發(fā)送心跳消息
//MQClientInstance###startScheduledTask() this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
那么發(fā)送的心跳包中攜帶什么信息呢?如代碼中所示,攜帶clientID和組名稱
//MQClientInstance###prepareHeartbeatData private HeartbeatData prepareHeartbeatData() { HeartbeatData heartbeatData = new HeartbeatData(); // clientID //放入了當前消費者的clientID //放入了當前消費者的clientID //放入了當前消費者的clientID heartbeatData.setClientID(this.clientId); // Consumer for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { ConsumerData consumerData = new ConsumerData(); //放入了當前消費者的組名稱 //放入了當前消費者的組名稱 //放入了當前消費者的組名稱 //放入了當前消費者的組名稱 consumerData.setGroupName(impl.groupName()); consumerData.setConsumeType(impl.consumeType()); consumerData.setMessageModel(impl.messageModel()); consumerData.setConsumeFromWhere(impl.consumeFromWhere()); consumerData.getSubscriptionDataSet().addAll(impl.subscriptions()); consumerData.setUnitMode(impl.isUnitMode()); heartbeatData.getConsumerDataSet().add(consumerData); } } // Producer for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) { MQProducerInner impl = entry.getValue(); if (impl != null) { ProducerData producerData = new ProducerData(); producerData.setGroupName(entry.getKey()); heartbeatData.getProducerDataSet().add(producerData); } } return heartbeatData; }
此時broker拿到心跳消息怎么處理的呢?有一部分邏輯如下面代碼所示,記錄一下消費者信息
//ClientManageProcessor###heartBeat(ChannelHandlerContext ctx, RemotingCommand request) ```java public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { //省略 for (ConsumerData data : heartbeatData.getConsumerDataSet()) { //省略 boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable ); //省略 } //省略 }
消費者怎么做reblance
MQClientInstance的start的方法里會開啟一個rebalance的線程,如下面代碼所示
//MQClientInstance###start() public void start() throws MQClientException { //省略 // Start rebalance service this.rebalanceService.start(); //省略 }
跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法,如下面代碼所示。根據(jù)主題隊列列表和消費者組集合去做一個Rebalance,最后的返回結果是當前消費者需要消費的主題隊列。
//RebalanceImpl##rebalanceByTopic private void rebalanceByTopic(final String topic, final boolean isOrder) { //獲取訂閱的主題的隊列 //獲取訂閱的主題的隊列 //獲取訂閱的主題的隊列 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //獲取同消費者組的ClientID集合 //獲取同消費者組的ClientID集合 //獲取同消費者組的ClientID集合 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); //排序 //排序 //排序 Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列 //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列 //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列 //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列 //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列 allocateResultSet.addAll(allocateResult); } //此處看下面的消費者怎么去拉消息 //此處看下面的消費者怎么去拉消息 //此處看下面的消費者怎么去拉消息 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); } }
總結:消費者拿到主題的隊列列表和消費者組中ClientID集合,通過在消費者這變做rebalance,從而確定被分配的主題隊列集合
消費者怎么拉取消息
此處還是繼續(xù)跟上面的代碼,,然后執(zhí)行到下面的代碼,當消費者確定自己被分配的主題隊列后,會把主題隊列封裝成PullRequest 并進行dispatch
//RebalanceImpl###updateProcessQueueTableInRebalance private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { //省列 List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { //省略 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } //省略 //派發(fā)請求任務 this.dispatchPullRequest(pullRequestList); return changed; }
下面跟RebalanceImpl###dispatchPullRequest方法,最后跟到下面的代碼,就是把PullRequest放入到一個阻塞隊列里。
//PullMessageService###executePullRequestImmediately public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
那么誰取阻塞隊列里的數(shù)據(jù)誰就是消費消息了? PullMessageService是一個線程,他的run方法里會取上面阻塞隊列里的PullRequest,如下面代碼所示
//PullMessageService###run() public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
從PullMessageService###pullMessage方法一直往下跟,就跟到下面的代碼
//DefaultMQPushConsumerImpl###pullMessage(final PullRequest pullRequest) public void pullMessage(final PullRequest pullRequest) { //省略 final long beginTimestamp = System.currentTimeMillis(); PullCallback pullCallback = new PullCallback() { //省略,但是重要,后面會說 //省略,但是重要,后面會說 //省略,但是重要,后面會說 //省略,但是重要,后面會說 }; //省略 try { //發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內(nèi)容就好好了 //發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內(nèi)容就好好了 //發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內(nèi)容就好好了 this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
那么回調方法是什么邏輯呢?代碼如下所示,發(fā)現(xiàn)數(shù)據(jù)并且submitConsumeRequest
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { //發(fā)現(xiàn)數(shù)據(jù) //發(fā)現(xiàn)數(shù)據(jù) //發(fā)現(xiàn)數(shù)據(jù) case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); //跟進去 //跟進去 //跟進去 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } //省略 break; case NO_NEW_MSG: case NO_MATCHED_MSG: //省略 } } } };
跟上面submitConsumeRequest方法的到下面的代碼,封裝成ConsumeRequest,其實ConsumerRequest是一個線程
//ConsumeMessageConcurrentlyService###submitConsumeRequest public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { //省略 } } }
ConsumeRequest的run方法就會執(zhí)行我們注冊的listener方法,此時就消費到數(shù)據(jù) ```java @Override public void run() { //省略 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); //省略 } }
總結: 如下圖所示,RebalanceService線程會根據(jù)情況把請求放在PullMessageService的pullRequestQueue阻塞隊列隊列里,隊列的每一個節(jié)點就是拉請求;PullMessageService線程就是不斷去pullRequestQueue里拿任務然后去看一下broker中有沒有數(shù)據(jù),如果有數(shù)據(jù)就消費。
總結
(1)忽然發(fā)現(xiàn)nameserver在整個過程中的作用感覺不是很大,其實我感覺這種設計還挺好的,因為把所有的壓力都放在nameserver返回減少系統(tǒng)的健壯性。
(2)RocketMQ的rebalance是在消息消費者這邊實現(xiàn)的,這樣有一個很大的優(yōu)勢是減少nameserver和broker的壓力。那消費者是怎么實現(xiàn)rebalance的呢?通過一個參數(shù)為當前消費者ID、主題隊列、消費者組ClientID列表的匹配算法,每次只要保證算法的冪等性就可以了。
(3)RocketMQ的rebalance的rebalance是根據(jù)單個主題去實現(xiàn)的,這樣的一個缺點是容易出現(xiàn)消費不平衡的問題。如下圖所示。
(4)RocketMQ是AP的,因為他的很操作都是都是通過線程池的定時任務去做的。
到此這篇關于RocketMQ中的消費者啟動流程解讀的文章就介紹到這了,更多相關RocketMQ消費者啟動內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot 使用poi進行數(shù)據(jù)的導出過程詳解
這篇文章主要介紹了springboot 使用poi進行數(shù)據(jù)的導出過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-09-09spring cloud gateway中redis一直打印重連日志問題及解決
這篇文章主要介紹了spring cloud gateway中redis一直打印重連日志問題及解決,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05Java實現(xiàn)warcraft?java版游戲的示例代碼
致敬經(jīng)典的warcraft,《warcraft?java版》是一款即時戰(zhàn)略題材單機游戲,采用魔獸原味風格和機制。本文將用java語言實現(xiàn),采用了swing技術進行了界面化處理,感興趣的可以了解一下2022-09-09webuploader在springMVC+jquery+Java開發(fā)環(huán)境下的大文件分片上傳的實例代碼
這篇文章主要介紹了webuploader在springMVC+jquery+Java開發(fā)環(huán)境下的大文件分片上傳的實例代碼,需要的朋友可以參考下2017-04-04JDK9為何要將String的底層實現(xiàn)由char[]改成了byte[]
String 類的源碼已經(jīng)由?char[]?優(yōu)化為了?byte[]?來存儲字符串內(nèi)容,為什么要這樣做呢?本文就詳細的介紹一下,感興趣的可以了解一下2022-03-03Java的Cglib動態(tài)代理實現(xiàn)方式詳解
這篇文章主要介紹了Java的Cglib動態(tài)代理實現(xiàn)方式詳解,CGLIB是強大的、高性能的代碼生成庫,被廣泛應用于AOP框架,它底層使用ASM來操作字節(jié)碼生成新的類,為對象引入間接級別,以控制對象的訪問,需要的朋友可以參考下2023-11-11