RocketMQ生產者如何規(guī)避故障Broker方式詳解
前言
在消息發(fā)送過程中,生產者從NameServer
中獲取到了指定Topic
對應的Broker
信息,在同步發(fā)送消息的代碼中,如果消息發(fā)送失敗,生產者默認是會重試兩次的。那么Broker
有問題的情況下,無論重試多少次都是沒有意義的,消息生產者是如何規(guī)避這些故障Broker
的呢?
收集故障Broker
我們在所有的發(fā)送消息源碼中都可以找到這樣一段代碼,可在DefaultMQProducerImpl
類中查找:
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
無論是發(fā)送成功還是失敗,RocketMQ生產者客戶端都會做這一步操作:
// 發(fā)送成功的話,isolation傳false,失敗isolation傳true public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
如果Broker
產生故障,那么會創(chuàng)建一個FaultItem
對象記錄故障的Broker
,并把結果放進故障規(guī)避表faultItemTable
中,數據格式如下:
"broker-a": { // broker名稱 "name": "broker-a", "currentLatency": 發(fā)送消息消耗的時間,毫秒值之差, // 解除規(guī)避的時間,絕對時間 "startTimestamp": 時間戳毫秒值 }, "broker-b": { // broker名稱 "name": "broker-b", "currentLatency": 發(fā)送消息消耗的時間,毫秒值之差, // 解除規(guī)避的時間,絕對時間 "startTimestamp": 時間戳毫秒值 }
發(fā)送成功的Broker
設置的故障規(guī)避時間為0,發(fā)送失敗的Broker
將被設置為規(guī)避30秒;
選擇Broker
在MQFaultStrategy.selectOneMessageQueue()
方法中,我們分三部分來分析如何選擇Broker。
- 輪詢選擇一個可用的Broker
// 輪詢的基本套路,一個自增變量 int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { // 通過對隊列數量取模,獲取選定的Broker所在的位置 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 判斷Broker是否在規(guī)避時間內,如果不在規(guī)避時間內,就選擇這個Broker,否則繼續(xù)循環(huán)直至所有Broker都在規(guī)避時間內 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; }
1.輪詢的基本套路都是通過一個自增變量來對所有的Broker數量取模,這樣就可以命中一個Broker;
2.針對命中的Broker判斷是否在規(guī)避時間范圍內,不在規(guī)避時間內就可以返回;否則只能進入第二個方案;
- 選擇一個相對延遲低的Broker
// 把所有規(guī)避列表中的Broker按延遲高低排序,并從延遲低的Broker中選擇一個 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); // 判斷該Broker是否允許寫消息 if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } // 返回選中的Broker return mq; }
1.從規(guī)避列表中找到延時比較低的Broker;
2.判斷該Broker是否允許寫消息,允許寫消息的話就直接返回,否則再進入下一個方案;
- 默認的選擇
return tpInfo.selectOneMessageQueue();
最后直接輪詢一個Broker
直接返回:
public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
該方案是默認方案,沒有開啟故障規(guī)避配置的話,所有Broker的選擇都是使用的該方案;
小結
RocketMQ
通過設置故障規(guī)避表的方式,把所有的Broker
的延遲數據都保留在故障規(guī)避表中,根據該列表制定了以下幾種策略:
1.優(yōu)先選擇不在規(guī)避時間范圍內的Broker
;
2.如果所有Broker
都在規(guī)避時間內,優(yōu)先選擇延遲低的Broker
;
3.如果依然沒有選中合適的Broker
,那么就直接挑一個Broker
來用;
以上就是RocketMQ生產者如何規(guī)避故障Broker方式詳解的詳細內容,更多關于RocketMQ生產者規(guī)避故障Broker的資料請關注腳本之家其它相關文章!