欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ中的消費者啟動流程解讀

 更新時間:2023年10月11日 10:19:42   作者:CBeann  
這篇文章主要介紹了RocketMQ中的消費者啟動流程解讀,RocketMQ是一款高性能、高可靠性的分布式消息中間件,消費者是RocketMQ中的重要組成部分,消費者負責從消息隊列中獲取消息并進行處理,需要的朋友可以參考下

RocketMQ消費者啟動

問題

消費者啟動的時候,去哪拿的消息呢?

問題答案

請?zhí)砑訄D片描述

(1)當broker啟動的時候,會把broker的地址端口、broker上的主題信息、主題隊列信息發(fā)送到nameserver(如圖中1)

(2)消費者Client啟動的時候會去nameserver拿toipc、topic隊列以及對應的broker信息,拿到以后把信息存儲到本地(如圖中2)

(3)消費者會給所有的broker發(fā)送心跳,并且附帶自己的消費者組信息和ClientID信息,此時broker中就有消費者組對應的ClientID集合(如圖中3)

(4)消費者啟動后會reblance,有訂閱的主題隊列列表,并且通過broker可以拿到消費者組的ClientID集合,兩個集合做rebalance,就可以拿到當前消費者對應消費的主題隊列

(5) 消費者知道自己消費的主題隊列,就可以根據(jù)隊列信息通過Netty發(fā)送消息

跟源碼

注意

本文是消費者啟動流程,所以不去關注broker和nameserver的啟動流程,這樣關注點比較集中,因此步驟(1)本文不做描述。

消費者啟動時怎么拿到toipc的信息

消費者啟動的時候會調用 MQClientInstance###start()方法,start()方法里有會調用 MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會每隔一段時間更新一下topic路由信息

//MQClientInstance###startScheduledTask()
 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

會把路由信息保存到本地的一個HashMap里,這樣消費者就拿到了topic的信息并且會把broker的信息保存下來

//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//根據(jù)主題從nameserver獲取topic信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//把主題和主題隊列相關的broker保存下來
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

總結:消費者拿到主題的隊列列表和broker信息

消費者給broker發(fā)現(xiàn)心跳的作用

MQClientInstance###startScheduledTask()方法,里面的一段代碼如下,會每隔一段時間給所有的broker發(fā)送心跳消息

//MQClientInstance###startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

那么發(fā)送的心跳包中攜帶什么信息呢?如代碼中所示,攜帶clientID和組名稱

//MQClientInstance###prepareHeartbeatData
private HeartbeatData prepareHeartbeatData() {
        HeartbeatData heartbeatData = new HeartbeatData();
        // clientID
        //放入了當前消費者的clientID
        //放入了當前消費者的clientID
        //放入了當前消費者的clientID
        heartbeatData.setClientID(this.clientId);
        // Consumer
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                ConsumerData consumerData = new ConsumerData(); 
                //放入了當前消費者的組名稱
                //放入了當前消費者的組名稱
                //放入了當前消費者的組名稱
                //放入了當前消費者的組名稱
                consumerData.setGroupName(impl.groupName());
                consumerData.setConsumeType(impl.consumeType());
                consumerData.setMessageModel(impl.messageModel());
                consumerData.setConsumeFromWhere(impl.consumeFromWhere());
                consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
                consumerData.setUnitMode(impl.isUnitMode());
                heartbeatData.getConsumerDataSet().add(consumerData);
            }
        }
        // Producer
        for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                ProducerData producerData = new ProducerData();
                producerData.setGroupName(entry.getKey());
                heartbeatData.getProducerDataSet().add(producerData);
            }
        }
        return heartbeatData;
    }

此時broker拿到心跳消息怎么處理的呢?有一部分邏輯如下面代碼所示,記錄一下消費者信息

//ClientManageProcessor###heartBeat(ChannelHandlerContext ctx, RemotingCommand request)
```java
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        //省略
        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            //省略
            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );
            //省略
        }
       //省略
    }

消費者怎么做reblance

MQClientInstance的start的方法里會開啟一個rebalance的線程,如下面代碼所示

//MQClientInstance###start()
public void start() throws MQClientException {
 //省略
 // Start rebalance service
 this.rebalanceService.start();
 //省略
}

跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法,如下面代碼所示。根據(jù)主題隊列列表和消費者組集合去做一個Rebalance,最后的返回結果是當前消費者需要消費的主題隊列。

//RebalanceImpl##rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
                //獲取訂閱的主題的隊列
                //獲取訂閱的主題的隊列
                //獲取訂閱的主題的隊列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //獲取同消費者組的ClientID集合
                //獲取同消費者組的ClientID集合
                //獲取同消費者組的ClientID集合
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    //排序
                    //排序
                    //排序
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
                        //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
                        //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
                        //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
                        //rebalance算法核心實現(xiàn),最后的結果是返回應該消費的隊列
                        allocateResultSet.addAll(allocateResult);
                    }
                    //此處看下面的消費者怎么去拉消息
                    //此處看下面的消費者怎么去拉消息
                    //此處看下面的消費者怎么去拉消息
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        }
    }

總結:消費者拿到主題的隊列列表和消費者組中ClientID集合,通過在消費者這變做rebalance,從而確定被分配的主題隊列集合

消費者怎么拉取消息

此處還是繼續(xù)跟上面的代碼,,然后執(zhí)行到下面的代碼,當消費者確定自己被分配的主題隊列后,會把主題隊列封裝成PullRequest 并進行dispatch

//RebalanceImpl###updateProcessQueueTableInRebalance
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
       //省列
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
                        //省略
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
            }
        }
         //省略
        //派發(fā)請求任務
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }

下面跟RebalanceImpl###dispatchPullRequest方法,最后跟到下面的代碼,就是把PullRequest放入到一個阻塞隊列里。

//PullMessageService###executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

那么誰取阻塞隊列里的數(shù)據(jù)誰就是消費消息了? PullMessageService是一個線程,他的run方法里會取上面阻塞隊列里的PullRequest,如下面代碼所示

//PullMessageService###run()
public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }

從PullMessageService###pullMessage方法一直往下跟,就跟到下面的代碼

//DefaultMQPushConsumerImpl###pullMessage(final PullRequest pullRequest) 
public void pullMessage(final PullRequest pullRequest) {
        //省略
        final long beginTimestamp = System.currentTimeMillis();
        PullCallback pullCallback = new PullCallback() {
            //省略,但是重要,后面會說
            //省略,但是重要,后面會說
            //省略,但是重要,后面會說
            //省略,但是重要,后面會說
        };
           //省略
        try {
            //發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內(nèi)容就好好了
            //發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內(nèi)容就好好了
            //發(fā)送數(shù)據(jù)并且執(zhí)行回調方法,下面我們看一下回調方法的內(nèi)容就好好了
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }

那么回調方法是什么邏輯呢?代碼如下所示,發(fā)現(xiàn)數(shù)據(jù)并且submitConsumeRequest

PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        //發(fā)現(xiàn)數(shù)據(jù)
                        //發(fā)現(xiàn)數(shù)據(jù)
                        //發(fā)現(xiàn)數(shù)據(jù)
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                //跟進去
                                //跟進去
                                //跟進去
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            //省略
                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            //省略
                    }
                }
            }
        };

跟上面submitConsumeRequest方法的到下面的代碼,封裝成ConsumeRequest,其實ConsumerRequest是一個線程

//ConsumeMessageConcurrentlyService###submitConsumeRequest
public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            //省略
            }
        }
    }
ConsumeRequest的run方法就會執(zhí)行我們注冊的listener方法,此時就消費到數(shù)據(jù)
```java
@Override
        public void run() {
            //省略
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            //省略
            }
        }

在這里插入圖片描述

總結: 如下圖所示,RebalanceService線程會根據(jù)情況把請求放在PullMessageService的pullRequestQueue阻塞隊列隊列里,隊列的每一個節(jié)點就是拉請求;PullMessageService線程就是不斷去pullRequestQueue里拿任務然后去看一下broker中有沒有數(shù)據(jù),如果有數(shù)據(jù)就消費。

在這里插入圖片描述

總結

(1)忽然發(fā)現(xiàn)nameserver在整個過程中的作用感覺不是很大,其實我感覺這種設計還挺好的,因為把所有的壓力都放在nameserver返回減少系統(tǒng)的健壯性。

(2)RocketMQ的rebalance是在消息消費者這邊實現(xiàn)的,這樣有一個很大的優(yōu)勢是減少nameserver和broker的壓力。那消費者是怎么實現(xiàn)rebalance的呢?通過一個參數(shù)為當前消費者ID、主題隊列、消費者組ClientID列表的匹配算法,每次只要保證算法的冪等性就可以了。

(3)RocketMQ的rebalance的rebalance是根據(jù)單個主題去實現(xiàn)的,這樣的一個缺點是容易出現(xiàn)消費不平衡的問題。如下圖所示。

在這里插入圖片描述

(4)RocketMQ是AP的,因為他的很操作都是都是通過線程池的定時任務去做的。

到此這篇關于RocketMQ中的消費者啟動流程解讀的文章就介紹到這了,更多相關RocketMQ消費者啟動內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

最新評論