RocketMQ消息生產(chǎn)者是如何選擇Broker示例詳解
前言
在RocketMQ
中為,我們創(chuàng)建消息生產(chǎn)者時,只需要設(shè)置NameServer
地址,消息就能正確地發(fā)送到對應(yīng)的Broker
中,那么RocketMQ
消息生產(chǎn)者是如何找到Broker
的呢?如果有多個Broker
實(shí)例,那么消息發(fā)送是如何選擇發(fā)送到哪個Broker
的呢?
從NameServer查詢Topic信息
通過Debug消息發(fā)送send()
方法,我們最終可以定位到DefaultMQProducerImpl.sendDefaultImpl()
這個方法,并且我們找到了最關(guān)鍵的Topic信息:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
這個方法就是通過topic
從NameServer
拉出對應(yīng)的Broker
信息:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
1.一開始的話,是從當(dāng)前緩存中找Topic
信息,第一次肯定是找不到的;
2.找不到Topic
信息,那么就調(diào)用updateTopicRouteInfoFromNameServer(topic)
從NameServer
拉對應(yīng)的信息,如果拉到了就更新到緩存中;
3.如果依然找不到Topic
信息,說明沒有任何Broker
上面是有這個Topic
的;但是我們還要拉開啟了自動創(chuàng)建Topic
配置的Broker
信息,通過updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)
實(shí)現(xiàn);
生產(chǎn)者客戶端會從兩個地方獲取Broker
信息,第一個就是從內(nèi)存緩存中獲取,第二個就是從NameServer
中獲取。從NameServer
中分兩次獲取,一次是獲取存在的Topic
對應(yīng)的Broker
信息,第二次是獲取還沒有創(chuàng)建出來的Topic
對應(yīng)的Broker
信息;
如何選擇Broker
當(dāng)客戶端拿到了Topic
對應(yīng)的Broker
信息后,它是如何選擇目標(biāo)Broker
的呢?繼續(xù)向下看,我們找到了關(guān)鍵代碼:
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); ......
1.如果是同步發(fā)送消息,那么【總的發(fā)送次數(shù)】=1+【重試次數(shù)】,如果是異步發(fā)送,默認(rèn)是1;我們當(dāng)前是同步模式,所以會存在重試;
2.選擇Broker
的關(guān)鍵代碼就在selectOneMessageQueue()
方法中,通過前面拿到的topicPublishInfo
作為參數(shù),lastBrokerName
作為額外的考慮參數(shù);
追蹤代碼,我們進(jìn)入MQFaultStrategy.selectOneMessageQueue()
中:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
1.如果開啟了延遲故障規(guī)避,那么執(zhí)行規(guī)避策略;
- 1.1:輪詢找一個
Broker
,該Broker
要么不在規(guī)避名單內(nèi),要么已經(jīng)度過了規(guī)避期(發(fā)送消息失敗會將目標(biāo)Broker放進(jìn)規(guī)避名單,沉默一段時間); - 1.2:如果所有的
Broker
都沒有度過規(guī)避期,那么從比較好的那一部分Broker
里面找一個出來; - 1.3:如果依然沒有找到合適的
Broker
,那么就隨機(jī)選一個Broker
;
2.否則就隨機(jī)選一個Broker
;
下面我們來看一下隨機(jī)發(fā)送的策略是怎么實(shí)現(xiàn)的:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
1.如果第一次發(fā)送消息,那么通過自增求余的方式從列表中找一個Broker
,其實(shí)就是輪詢方式;
2.如果不是第一次發(fā)送消息,那么會盡可能避開上一次的Broker
服務(wù),也是為了讓Broker
服務(wù)負(fù)載均衡;
3.如果沒有避開上一次的Broker
,那么再向后找另一個Broker
;除非只有一個Broker
服務(wù),否則會盡可能避開上次發(fā)送的Broker
;
小結(jié)
通過源碼分析,我們已經(jīng)知道了生產(chǎn)者是如何選擇目標(biāo)Broker
的了:
1.第一次發(fā)消息,通過輪詢的方式選擇Broker
;
2.后續(xù)發(fā)消息會規(guī)避上次的Broker
,同樣采用輪詢的方式選擇Broker
;
3.在消息發(fā)送過程中,存在一個Broker
規(guī)避列表,用戶可以通過setSendLatencyFaultEnable(true)
開啟故障規(guī)避策略,客戶端會盡可能選擇不在規(guī)避列表中的Broker
,如果所有的Broker
都在規(guī)避列表中,那么會選擇一個相對比較好的Broker
來用;
以上就是RocketMQ消息生產(chǎn)者是如何選擇Broker示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息生產(chǎn)者Broker的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java實(shí)現(xiàn)左旋轉(zhuǎn)字符串
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)左旋轉(zhuǎn)字符串,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-03-03MyBatis標(biāo)簽之Select?resultType和resultMap詳解
這篇文章主要介紹了MyBatis標(biāo)簽之Select?resultType和resultMap,在MyBatis中有一個ResultMap標(biāo)簽,它是為了映射select標(biāo)簽查詢出來的結(jié)果集,下面使用一個簡單的例子,來介紹 resultMap 的使用方法,需要的朋友可以參考下2022-09-09java實(shí)現(xiàn)順序結(jié)構(gòu)線性列表的函數(shù)代碼
java實(shí)現(xiàn)順序結(jié)構(gòu)線性列表的函數(shù)代碼。需要的朋友可以過來參考下,希望對大家有所幫助2013-10-10SpringBoot注解@Import原理之關(guān)于ConfigurationClassPostProcessor源碼解析
這篇文章主要介紹了SpringBoot注解@Import原理之關(guān)于ConfigurationClassPostProcessor源碼解析,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07java.lang.NoSuchMethodException: com.sun.proxy.$Proxy58.list
這篇文章主要介紹了java.lang.NoSuchMethodException: com.sun.proxy.$Proxy58.list錯誤解決辦法的相關(guān)資料,需要的朋友可以參考下2016-12-12Java中Map與對象之間互相轉(zhuǎn)換的幾種常用方式
在Java中將對象和Map相互轉(zhuǎn)換是常見的操作,可以通過不同的方式實(shí)現(xiàn)這種轉(zhuǎn)換,下面這篇文章主要給大家介紹了關(guān)于Java中Map與對象之間互相轉(zhuǎn)換的幾種常用方式,需要的朋友可以參考下2024-01-01使用ClassFinal實(shí)現(xiàn)SpringBoot項(xiàng)目jar包加密的操作指南
在實(shí)際開發(fā)中,保護(hù)項(xiàng)目的安全性和保密性是至關(guān)重要的,針對于 Spring Boot 項(xiàng)目,我們需要將 JAR 包進(jìn)行加密從而有效地防止未經(jīng)授權(quán)的訪問和修改,本文將介紹如何使用ClassFinal在 Spring Boot 項(xiàng)目中實(shí)現(xiàn) JAR 包加密,需要的朋友可以參考下2024-06-06Struts2實(shí)現(xiàn)對action請求對象的攔截操作方法
這篇文章主要介紹了Struts2實(shí)現(xiàn)對action請求對象的攔截操作方法,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2017-11-11