RocketMQ設(shè)計(jì)之故障規(guī)避機(jī)制
NameServer
為了簡(jiǎn)化和客戶端通信,發(fā)現(xiàn)Broker故障時(shí)并不會(huì)立即通知客戶端。故障規(guī)避機(jī)制就是用來(lái)解決當(dāng)Broker出現(xiàn)故障,Producer
不能及時(shí)感知而導(dǎo)致消息發(fā)送失敗的問(wèn)題。默認(rèn)不開(kāi)啟,如果開(kāi)啟,消息發(fā)送失敗的時(shí)候會(huì)將失敗的Broker暫時(shí)排除在隊(duì)列選擇列表外
MQFaultStrategy類的:
public class MQFaultStrategy { ? ? private final static InternalLogger log = ClientLogger.getLog(); ? ? private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); ? ? private boolean sendLatencyFaultEnable = false; ? ? private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; ? ? private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; ? ? public long[] getNotAvailableDuration() { ? ? ? ? return notAvailableDuration; ? ? } ? ? public void setNotAvailableDuration(final long[] notAvailableDuration) { ? ? ? ? this.notAvailableDuration = notAvailableDuration; ? ? } ? ? public long[] getLatencyMax() { ? ? ? ? return latencyMax; ? ? } ? ? public void setLatencyMax(final long[] latencyMax) { ? ? ? ? this.latencyMax = latencyMax; ? ? } ? ? public boolean isSendLatencyFaultEnable() { ? ? ? ? return sendLatencyFaultEnable; ? ? } ? ? public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { ? ? ? ? this.sendLatencyFaultEnable = sendLatencyFaultEnable; ? ? } ? ? public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { ? ? ? ? //是否開(kāi)啟故障延遲機(jī)制 ? ? ? ? if (this.sendLatencyFaultEnable) { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? int index = tpInfo.getSendWhichQueue().getAndIncrement(); ? ? ? ? ? ? ? ? for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { ? ? ? ? ? ? ? ? ? ? int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); ? ? ? ? ? ? ? ? ? ? if (pos < 0) ? ? ? ? ? ? ? ? ? ? ? ? pos = 0; ? ? ? ? ? ? ? ? ? ? MessageQueue mq = tpInfo.getMessageQueueList().get(pos); ? ? ? ? ? ? ? ? ? ? //判斷Queue是否可用 ? ? ? ? ? ? ? ? ? ? if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { ? ? ? ? ? ? ? ? ? ? ? ? if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) ? ? ? ? ? ? ? ? ? ? ? ? ? ? return mq; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); ? ? ? ? ? ? ? ? int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); ? ? ? ? ? ? ? ? if (writeQueueNums > 0) { ? ? ? ? ? ? ? ? ? ? final MessageQueue mq = tpInfo.selectOneMessageQueue(); ? ? ? ? ? ? ? ? ? ? if (notBestBroker != null) { ? ? ? ? ? ? ? ? ? ? ? ? mq.setBrokerName(notBestBroker); ? ? ? ? ? ? ? ? ? ? ? ? mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? return mq; ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? latencyFaultTolerance.remove(notBestBroker); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? log.error("Error occurred when selecting message queue", e); ? ? ? ? ? ? } ? ? ? ? ? ? return tpInfo.selectOneMessageQueue(); ? ? ? ? } ? ? ? ? //默認(rèn)輪詢 ? ? ? ? return tpInfo.selectOneMessageQueue(lastBrokerName); ? ? } ? ? public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { ? ? ? ? if (this.sendLatencyFaultEnable) { ? ? ? ? ? ? long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); ? ? ? ? ? ? this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); ? ? ? ? } ? ? } ? ? private long computeNotAvailableDuration(final long currentLatency) { ? ? ? ? for (int i = latencyMax.length - 1; i >= 0; i--) { ? ? ? ? ? ? if (currentLatency >= latencyMax[i]) ? ? ? ? ? ? ? ? return this.notAvailableDuration[i]; ? ? ? ? } ? ? ? ? return 0; ? ? } }
在選擇查找路由時(shí),選擇消息隊(duì)列的關(guān)鍵步驟:
- 先按輪詢算法選擇一個(gè)消息隊(duì)列
- 從故障列表判斷該消息隊(duì)列是否可用
LatencyFaultToleranceImpl中判斷是否可用:
@Override public boolean isAvailable(final String name) { ? ? final FaultItem faultItem = this.faultItemTable.get(name); ? ? if (faultItem != null) { ? ? ? ? return faultItem.isAvailable(); ? ? } ? ? return true; } public boolean isAvailable() { ? ? ? ? ? ? return (System.currentTimeMillis() - startTimestamp) >= 0; ? ? ? ? }
- 判斷是否在故障列表中,不在故障列表中代表可用。
- 在故障列表中判斷當(dāng)前時(shí)間是否大于等于故障規(guī)避的開(kāi)始時(shí)間
startTimestamp
在消息發(fā)送結(jié)束后和發(fā)送出現(xiàn)異常時(shí)調(diào)用updateFaultItem()
方法來(lái)更新故障列表,computeNotAvailableDuration()
根據(jù)響應(yīng)時(shí)間來(lái)計(jì)算故障周期時(shí)長(zhǎng),響應(yīng)時(shí)間越長(zhǎng)故障周期越長(zhǎng)。網(wǎng)絡(luò)異常、Broker異常、客戶端異常都是固定響應(yīng)時(shí)長(zhǎng)30s,它們故障周期時(shí)長(zhǎng)為10分鐘。消息發(fā)送成功或線程中斷異常響應(yīng)時(shí)間在100毫秒以內(nèi),故障周期時(shí)長(zhǎng)為0。
LatencyFaultToleranceImpl類的updateFaultItem方法:
@Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { ? ? FaultItem old = this.faultItemTable.get(name); ? ? if (null == old) { ? ? ? ? final FaultItem faultItem = new FaultItem(name); ? ? ? ? faultItem.setCurrentLatency(currentLatency); ? ? ? ? faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); ? ? ? ? //加入故障列表 ? ? ? ? old = this.faultItemTable.putIfAbsent(name, faultItem); ? ? ? ? if (old != null) { ? ? ? ? ? ? old.setCurrentLatency(currentLatency); ? ? ? ? ? ? old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); ? ? ? ? } ? ? } else { ? ? ? ? old.setCurrentLatency(currentLatency); ? ? ? ? old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); ? ? } }
FaultItem
存儲(chǔ)Broker名稱、響應(yīng)時(shí)長(zhǎng)、故障規(guī)避開(kāi)始時(shí)間,最重要的是故障規(guī)避開(kāi)始時(shí)間,用來(lái)判斷Queue是否可用
到此這篇關(guān)于RocketMQ設(shè)計(jì)之故障規(guī)避機(jī)制的文章就介紹到這了,更多相關(guān)RocketMQ故障規(guī)避機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談idea live template高級(jí)知識(shí)_進(jìn)階(給方法,類,js方法添加注釋)
下面小編就為大家?guī)?lái)一篇淺談idea live template高級(jí)知識(shí)_進(jìn)階(給方法,類,js方法添加注釋)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-06-06Java中保留兩位小數(shù)的四種方法實(shí)現(xiàn)實(shí)例
今天小編就為大家分享一篇關(guān)于Java中保留兩位小數(shù)的四種方法實(shí)現(xiàn)實(shí)例,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-02-02Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息
這篇文章主要給大家介紹了關(guān)于Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11activemq整合springboot使用方法(個(gè)人微信小程序用)
這篇文章主要介紹了activemq整合springboot使用(個(gè)人微信小程序用),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03java實(shí)現(xiàn)http請(qǐng)求工具類示例
這篇文章主要介紹了java實(shí)現(xiàn)http請(qǐng)求工具類示例,需要的朋友可以參考下2014-05-05SpringBoot項(xiàng)目改為SpringCloud項(xiàng)目使用nacos作為注冊(cè)中心的方法
本文主要介紹了SpringBoot項(xiàng)目改為SpringCloud項(xiàng)目使用nacos作為注冊(cè)中心,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04