RocketMQ生產(chǎn)者如何規(guī)避故障Broker方式詳解
前言
在消息發(fā)送過(guò)程中,生產(chǎn)者從NameServer中獲取到了指定Topic對(duì)應(yīng)的Broker信息,在同步發(fā)送消息的代碼中,如果消息發(fā)送失敗,生產(chǎn)者默認(rèn)是會(huì)重試兩次的。那么Broker有問(wèn)題的情況下,無(wú)論重試多少次都是沒(méi)有意義的,消息生產(chǎn)者是如何規(guī)避這些故障Broker的呢?
收集故障Broker
我們?cè)谒械陌l(fā)送消息源碼中都可以找到這樣一段代碼,可在DefaultMQProducerImpl類中查找:
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
無(wú)論是發(fā)送成功還是失敗,RocketMQ生產(chǎn)者客戶端都會(huì)做這一步操作:
// 發(fā)送成功的話,isolation傳false,失敗isolation傳true
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
如果Broker產(chǎn)生故障,那么會(huì)創(chuàng)建一個(gè)FaultItem對(duì)象記錄故障的Broker,并把結(jié)果放進(jìn)故障規(guī)避表faultItemTable中,數(shù)據(jù)格式如下:
"broker-a": {
// broker名稱
"name": "broker-a",
"currentLatency": 發(fā)送消息消耗的時(shí)間,毫秒值之差,
// 解除規(guī)避的時(shí)間,絕對(duì)時(shí)間
"startTimestamp": 時(shí)間戳毫秒值
},
"broker-b": {
// broker名稱
"name": "broker-b",
"currentLatency": 發(fā)送消息消耗的時(shí)間,毫秒值之差,
// 解除規(guī)避的時(shí)間,絕對(duì)時(shí)間
"startTimestamp": 時(shí)間戳毫秒值
}
發(fā)送成功的Broker設(shè)置的故障規(guī)避時(shí)間為0,發(fā)送失敗的Broker將被設(shè)置為規(guī)避30秒;
選擇Broker
在MQFaultStrategy.selectOneMessageQueue()方法中,我們分三部分來(lái)分析如何選擇Broker。
- 輪詢選擇一個(gè)可用的Broker
// 輪詢的基本套路,一個(gè)自增變量
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 通過(guò)對(duì)隊(duì)列數(shù)量取模,獲取選定的Broker所在的位置
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 判斷Broker是否在規(guī)避時(shí)間內(nèi),如果不在規(guī)避時(shí)間內(nèi),就選擇這個(gè)Broker,否則繼續(xù)循環(huán)直至所有Broker都在規(guī)避時(shí)間內(nèi)
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
1.輪詢的基本套路都是通過(guò)一個(gè)自增變量來(lái)對(duì)所有的Broker數(shù)量取模,這樣就可以命中一個(gè)Broker;
2.針對(duì)命中的Broker判斷是否在規(guī)避時(shí)間范圍內(nèi),不在規(guī)避時(shí)間內(nèi)就可以返回;否則只能進(jìn)入第二個(gè)方案;
- 選擇一個(gè)相對(duì)延遲低的Broker
// 把所有規(guī)避列表中的Broker按延遲高低排序,并從延遲低的Broker中選擇一個(gè)
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 判斷該Broker是否允許寫消息
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
// 返回選中的Broker
return mq;
}
1.從規(guī)避列表中找到延時(shí)比較低的Broker;
2.判斷該Broker是否允許寫消息,允許寫消息的話就直接返回,否則再進(jìn)入下一個(gè)方案;
- 默認(rèn)的選擇
return tpInfo.selectOneMessageQueue();
最后直接輪詢一個(gè)Broker直接返回:
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
該方案是默認(rèn)方案,沒(méi)有開啟故障規(guī)避配置的話,所有Broker的選擇都是使用的該方案;
小結(jié)
RocketMQ通過(guò)設(shè)置故障規(guī)避表的方式,把所有的Broker的延遲數(shù)據(jù)都保留在故障規(guī)避表中,根據(jù)該列表制定了以下幾種策略:
1.優(yōu)先選擇不在規(guī)避時(shí)間范圍內(nèi)的Broker;
2.如果所有Broker都在規(guī)避時(shí)間內(nèi),優(yōu)先選擇延遲低的Broker;
3.如果依然沒(méi)有選中合適的Broker,那么就直接挑一個(gè)Broker來(lái)用;
以上就是RocketMQ生產(chǎn)者如何規(guī)避故障Broker方式詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ生產(chǎn)者規(guī)避故障Broker的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java隨機(jī)生成一個(gè)名字和對(duì)應(yīng)拼音的方法
這篇文章主要介紹了java隨機(jī)生成一個(gè)名字和對(duì)應(yīng)拼音的方法,涉及java針對(duì)數(shù)組及隨機(jī)數(shù)操作的相關(guān)技巧,需要的朋友可以參考下2015-07-07
使用maven?shade插件解決項(xiàng)目版本沖突詳解
這篇文章主要為大家介紹了使用maven?shade插件解決項(xiàng)目版本沖突詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
spring多數(shù)據(jù)源配置實(shí)現(xiàn)方法實(shí)例分析
這篇文章主要介紹了spring多數(shù)據(jù)源配置實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了spring多數(shù)據(jù)源配置相關(guān)操作技巧與使用注意事項(xiàng),需要的朋友可以參考下2019-12-12

