欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ消息生產(chǎn)者是如何選擇Broker示例詳解

 更新時間:2022年11月29日 10:32:21   作者:夢想實(shí)現(xiàn)家_Z  
這篇文章主要為大家介紹了RocketMQ消息生產(chǎn)者是如何選擇Broker示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

RocketMQ中為,我們創(chuàng)建消息生產(chǎn)者時,只需要設(shè)置NameServer地址,消息就能正確地發(fā)送到對應(yīng)的Broker中,那么RocketMQ消息生產(chǎn)者是如何找到Broker的呢?如果有多個Broker實(shí)例,那么消息發(fā)送是如何選擇發(fā)送到哪個Broker的呢?

從NameServer查詢Topic信息

通過Debug消息發(fā)送send()方法,我們最終可以定位到DefaultMQProducerImpl.sendDefaultImpl()這個方法,并且我們找到了最關(guān)鍵的Topic信息:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

這個方法就是通過topicNameServer拉出對應(yīng)的Broker信息:

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

1.一開始的話,是從當(dāng)前緩存中找Topic信息,第一次肯定是找不到的;

2.找不到Topic信息,那么就調(diào)用updateTopicRouteInfoFromNameServer(topic)NameServer拉對應(yīng)的信息,如果拉到了就更新到緩存中;

3.如果依然找不到Topic信息,說明沒有任何Broker上面是有這個Topic的;但是我們還要拉開啟了自動創(chuàng)建Topic配置的Broker信息,通過updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)實(shí)現(xiàn);

生產(chǎn)者客戶端會從兩個地方獲取Broker信息,第一個就是從內(nèi)存緩存中獲取,第二個就是從NameServer中獲取。從NameServer中分兩次獲取,一次是獲取存在的Topic對應(yīng)的Broker信息,第二次是獲取還沒有創(chuàng)建出來的Topic對應(yīng)的Broker信息;

如何選擇Broker

當(dāng)客戶端拿到了Topic對應(yīng)的Broker信息后,它是如何選擇目標(biāo)Broker的呢?繼續(xù)向下看,我們找到了關(guān)鍵代碼:

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                  ......

1.如果是同步發(fā)送消息,那么【總的發(fā)送次數(shù)】=1+【重試次數(shù)】,如果是異步發(fā)送,默認(rèn)是1;我們當(dāng)前是同步模式,所以會存在重試;

2.選擇Broker的關(guān)鍵代碼就在selectOneMessageQueue()方法中,通過前面拿到的topicPublishInfo作為參數(shù),lastBrokerName作為額外的考慮參數(shù);

追蹤代碼,我們進(jìn)入MQFaultStrategy.selectOneMessageQueue()中:

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

1.如果開啟了延遲故障規(guī)避,那么執(zhí)行規(guī)避策略;

  • 1.1:輪詢找一個Broker,該Broker要么不在規(guī)避名單內(nèi),要么已經(jīng)度過了規(guī)避期(發(fā)送消息失敗會將目標(biāo)Broker放進(jìn)規(guī)避名單,沉默一段時間);
  • 1.2:如果所有的Broker都沒有度過規(guī)避期,那么從比較好的那一部分Broker里面找一個出來;
  • 1.3:如果依然沒有找到合適的Broker,那么就隨機(jī)選一個Broker;

2.否則就隨機(jī)選一個Broker

下面我們來看一下隨機(jī)發(fā)送的策略是怎么實(shí)現(xiàn)的:

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

1.如果第一次發(fā)送消息,那么通過自增求余的方式從列表中找一個Broker,其實(shí)就是輪詢方式;

2.如果不是第一次發(fā)送消息,那么會盡可能避開上一次的Broker服務(wù),也是為了讓Broker服務(wù)負(fù)載均衡;

3.如果沒有避開上一次的Broker,那么再向后找另一個Broker;除非只有一個Broker服務(wù),否則會盡可能避開上次發(fā)送的Broker;

小結(jié)

通過源碼分析,我們已經(jīng)知道了生產(chǎn)者是如何選擇目標(biāo)Broker的了:

1.第一次發(fā)消息,通過輪詢的方式選擇Broker;

2.后續(xù)發(fā)消息會規(guī)避上次的Broker,同樣采用輪詢的方式選擇Broker

3.在消息發(fā)送過程中,存在一個Broker規(guī)避列表,用戶可以通過setSendLatencyFaultEnable(true)開啟故障規(guī)避策略,客戶端會盡可能選擇不在規(guī)避列表中的Broker,如果所有的Broker都在規(guī)避列表中,那么會選擇一個相對比較好的Broker來用;

以上就是RocketMQ消息生產(chǎn)者是如何選擇Broker示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息生產(chǎn)者Broker的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論