RocketMQ?producer容錯機制源碼解析
1. 前言
本文主要是介紹一下RocketMQ消息生產(chǎn)者在發(fā)送消息的時候發(fā)送失敗的問題處理?這里有兩個點,一個是關(guān)于消息的處理,一個是關(guān)于broker的處理,比如說發(fā)送消息到broker-a的broker失敗了,我們可能下次就不想發(fā)送到這個broker-a,這就涉及到一個選擇broker的問題,也就是選擇MessageQueue的問題。
2. 失敗重試
其實失敗重試我們在介紹RocketMQ消息生產(chǎn)者發(fā)送消息的時候介紹過了,其實同步發(fā)送與異步發(fā)送都會失敗重試的,比如說我發(fā)送一個消息,然后超時了,這時候在MQProducer層就會進(jìn)行控制重試,默認(rèn)是重試2次的,加上你發(fā)送那次,一共是發(fā)送3次,如果重試完還是有問題的話,這個時候就會拋出異常了。
我們來看下這一塊的代碼實現(xiàn)( DefaultMQProducerImpl 類sendDefaultImpl方法):

這塊其實就是用for循環(huán)實現(xiàn)的,其實不光RocketMQ,分布式遠(yuǎn)程調(diào)用框架Dubbo的失敗重試也是用for循環(huán)實現(xiàn)的。
3. 延遲故障
我們都知道,在RocketMQ中一個topic其實是有多個MessageQueue這么一個概念的,然后這些MessageQueue可能對應(yīng)著不同的broker name,比如說id是0和1的MessageQueue 對應(yīng)的broker name是 broker-a ,然后id是2和3的MessageQueue對應(yīng)的broker name 是broker-b
我們發(fā)送消息的時候,其實涉及到發(fā)送給哪個MessageQueue這么一個問題,當(dāng)然我們可以在發(fā)送消息的時候指定這個MessageQueue,如果你不指定的話,RocketMQ就會根據(jù)MQFaultStrategy 這么一個策略類給選擇出來一個MessageQueue。
我們先來看下是在哪里選擇的,其實就是在我們重試的循環(huán)中: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
...
// 重試發(fā)送
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// todo 選擇message queue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
...
我們可以看到,它會把topicPublishInfo 與 lastBrokerName 作為參數(shù)傳進(jìn)去,topicPublishInfo 里面其實就是那一堆MessageQueue, 然后這個lastBrokerName 是上次我們選擇的那個broker name , 這個接著我們來看下這個selectOneMessageQueue實現(xiàn):
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// todo
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
可以看到它調(diào)用了MQFaultStrategy 這個類的selectOneMessageQueue 方法,我們接著進(jìn)去:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 發(fā)送延遲故障啟用,默認(rèn)為false
if (this.sendLatencyFaultEnable) {
try {
// 獲取一個index
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 選取的這個broker是可用的 直接返回
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 到這里 找了一圈 還是沒有找到可用的broker
// todo 選擇 距離可用時間最近的
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// todo
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
這種延遲故障策略其實是由sendLatencyFaultEnable來控制的,它默認(rèn)是關(guān)閉的。
3.1 最普通的選擇策略
我們先來看下最普通的選擇策略,可以看到調(diào)用了TopicPublishInfo 的selectOneMessageQueue方法:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 消息第一個發(fā)送的時候 還沒有重試 也沒有上一個brokerName
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 這個 出現(xiàn)在重試的時候
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 避開 上次發(fā)送的brokerName
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// todo 到最后 沒有避開 只能隨機選一個
return selectOneMessageQueue();
}
}
它這里里面分成了2部分,一個是沒有 這個lastBroker的,也就是這個這個消息還沒有被重試過,這是第一次發(fā)送這個消息,這個時候它的lastBrokerName就是null,然后他就會直接走selectOneMessageQueue 這個無參方法。
public MessageQueue selectOneMessageQueue() {
// 相當(dāng)于 某個線程輪詢
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
先是獲取這個index ,然后使用index % MessageQueue集合的大小獲得一個MessageQueue集合值的一個下標(biāo)(索引),這個index 其實某個線程內(nèi)自增1的,這樣就形成了某個線程內(nèi)輪詢的效果。這個樣子的話,同步發(fā)送其實就是單線程的輪詢,異步發(fā)送就是多個線程并發(fā)發(fā)送,然后某個線程內(nèi)輪詢,我們看下他這個單個線程自增1效果是怎樣實現(xiàn)的。
public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
// 如果不存在就創(chuàng)建 然后設(shè)置到threadLocalIndex中
if (null == index) {
index = Math.abs(random.nextInt());
this.threadLocalIndex.set(index);
}
index = Math.abs(index + 1);
this.threadLocalIndex.set(index);
return index;
}
}
可以看到這個sendWhichQueue 是用ThreadLocal實現(xiàn)的,然后這個樣子就可以一個線程一個index,而且不會出現(xiàn)線程安全問題。
好了這里我們就把這個消息第一次發(fā)送時候MessageQueue看完了,然后我們再來看下它其他重試的時候是怎樣選擇的,也就是lastBrokerName不是null的時候:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 消息第一個發(fā)送的時候 還沒有重試 也沒有上一個brokerName
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 這個 出現(xiàn)在重試的時候
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 避開 上次發(fā)送的brokerName
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// todo 到最后 沒有避開 只能隨機選一個
return selectOneMessageQueue();
}
}
這里其實就是選擇一個不是lastBrokerName 的MessageQueue,可以看到它是循環(huán) MessageQueue 集合大小數(shù)個,這樣可能把所有的MessageQueue都看一遍,注意 這個循環(huán)只是起到選多少次的作用,具體的選擇還是要走某線程輪詢的那一套,到最后是在是選不出來了,也就是沒有這一堆MessageQueue都是在lastBrokerName上的,只能調(diào)用selectOneMessageQueue輪詢選一個了。
到這我們就把最普通的選擇一個MessageQueue介紹完了。
3.2 延遲故障的實現(xiàn)
下面我們再來介紹下那個延遲故障的實現(xiàn),這個其實就是根據(jù)你這個broker 的響應(yīng)延遲時間的大小,來影響下次選擇這個broker的權(quán)重,他不是絕對的,因為根據(jù)它這個規(guī)則是在找不出來的話,他就會使用那套普通選擇算法來找個MessageQueue。
它是這樣一個原理:
- 在每次發(fā)送之后都收集一下它這次的一個響應(yīng)延遲,比如我10點1分1秒200毫秒給broker-a了一個消息,然后到了10點1分1秒900毫秒的時候才收到broker-a 的一個sendResult也就是響應(yīng),這個時候他就是700ms的延遲,它會跟你就這個300ms的延遲找到一個時間范圍,他就認(rèn)為你這個broker-a 這個broker 在某個時間段內(nèi),比如說30s內(nèi)是不可用的。然后下次選擇的時候,他在第一輪會找那些可用的broker,找不到的話,就找那些上次不是這個broker的,還是找不到的話,他就絕望了,用最普通的方式,也就是上面說的那種輪詢算法找一個MessageQueue出來。
接下來我們先來看下它的收集延遲的部分,是這個樣子的,還是在這個失敗重試?yán)锩妫缓笏鼤陧憫?yīng)后或者異常后面都加一行代碼來收集這些延遲:
... // todo 進(jìn)行發(fā)送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // todo isolation 參數(shù)為false(看一下異常情況) this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); ...
這是正常響應(yīng)后的,注意它的isolation 參數(shù),也就是隔離 是false,在看下異常的
...
catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
}
...
他這個isolation 參數(shù)就是true ,也就是需要隔離的意思。
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
// todo
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
可以看到是調(diào)用了mqFaultStrategy 的updateFaultItem 方法:
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
// 是否開啟延遲故障容錯
if (this.sendLatencyFaultEnable) {
// todo 計算不可用持續(xù)時間
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
// todo 存儲
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
先是判斷是否開啟了這個延遲故障的這么一個配置,默認(rèn)是不啟動的,但是你可以自己啟動set下就可以了setSendLatencyFaultEnable(true)
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendLatencyFaultEnable(true);
首先是計算這個它認(rèn)為broker不可用的這么一個時間,參數(shù)就是你那個響應(yīng)延遲,熔斷的話就配置30000毫秒, 否則的話就是正常的那個響應(yīng)時間
/**
* 計算不可用持續(xù)時間
* @param currentLatency 當(dāng)前延遲
*/
private long computeNotAvailableDuration(final long currentLatency) {
// latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
// notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
// 倒著遍歷
for (int i = latencyMax.length - 1; i >= 0; i--) {
// 如果延遲大于某個時間,就返回對應(yīng)服務(wù)不可用時間,可以看出來,響應(yīng)延遲100ms以下是沒有問題的
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
他這個計算規(guī)則是這個樣子的,他有兩個數(shù)組,一個是響應(yīng)延遲的,一個是不可使用的時間,兩個排列都是從小到大的順序,倒著先找響應(yīng)延遲,如果你這個延遲大于某個時間,就找對應(yīng)下標(biāo)的不可使用的時間,比如說響應(yīng)延遲700ms,這時候他就會找到30000ms不可使用時間。
計算完這個不可使用時間后接著調(diào)用了latencyFaultTolerance的updateFaultItem方法,這個方法其實就是用來存儲的:
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
// 從緩存中獲取
FaultItem old = this.faultItemTable.get(name);
// 緩存沒有的情況
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
// 設(shè)置延遲
faultItem.setCurrentLatency(currentLatency);
// 設(shè)置啟用時間
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
// 設(shè)置faultItemTable 中
old = this.faultItemTable.putIfAbsent(name, faultItem);
// 如果已經(jīng)有了,拿到 老的進(jìn)行更新
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
// 緩存中已經(jīng)有了,直接拿老的進(jìn)行更新
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
他有個faultItemTable 這個緩存,記錄著 每個broker的FaultItem的項,這個FaultItem就是保存它能夠使用的一個時間(當(dāng)前時間戳+不可使用時間),其實這個方法就是做更新或者插入操作。
好了到這我們就把它這個收集響應(yīng)延遲指標(biāo)與計算可用時間這快就解析完了,再回頭看下那個選擇MessageQueue的方法:

可以看到它先是找那種可用的,然后不是上一個broker的那個,如果好幾輪下來沒有找到的話就選擇一個
public String pickOneAtLeast() {
// 將map中里面的放到tmpList 中
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<FaultItem>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}
// 如果不是null
if (!tmpList.isEmpty()) {
// 洗牌算法
Collections.shuffle(tmpList);
// 排序
Collections.sort(tmpList);
final int half = tmpList.size() / 2;
// 沒有 2臺機器
if (half <= 0) {
// 選擇第一個
return tmpList.get(0).getName();
} else {
// 有2臺機器及以上,某個線程內(nèi)隨機選排在前半段的broker
final int i = this.whichItemWorst.getAndIncrement() % half;
return tmpList.get(i).getName();
}
}
return null;
}
先是排序,然后將所有的broker/2 ,如果是小于等于0的話,說明就2個broker以下,選第一個,如果是2臺以上,就輪詢選一個
先來看下排序規(guī)則:
/**
* 失敗條目(規(guī)避規(guī)則條目)
*/
class FaultItem implements Comparable<FaultItem> {
// 條目唯一鍵,這里是brokerName
private final String name;
// todo currentLatency 和startTimestamp 被volatile修飾
// 本次消息發(fā)送的延遲時間
private volatile long currentLatency;
// 故障規(guī)避的開始時間
private volatile long startTimestamp;
public FaultItem(final String name) {
this.name = name;
}
@Override
public int compareTo(final FaultItem other) {
// 將能提供服務(wù)的放前面
if (this.isAvailable() != other.isAvailable()) {
if (this.isAvailable())
return -1;
if (other.isAvailable())
return 1;
}
// 找延遲低的 放前面
if (this.currentLatency < other.currentLatency)
return -1;
else if (this.currentLatency > other.currentLatency) {
return 1;
}
// 找最近能提供服務(wù)的 放前面
if (this.startTimestamp < other.startTimestamp)
return -1;
else if (this.startTimestamp > other.startTimestamp) {
return 1;
}
return 0;
}
它是把能提供服務(wù)的放前面,然后沒有,就找那種延遲低的放前面,也沒有的話就找最近能提供服務(wù)的放前頭。 找到這個broker 之后然后根據(jù)這個broker name 獲取寫隊列的個數(shù),其實你這個寫隊列個數(shù)有幾個,然后你這個broker對應(yīng)的MessageQueue就有幾個,如果write size >0的話,然后這個broker 不是null,就找一個mq,然后設(shè)置上它的broker name 與queue id
如果write<=0,直接移除這個broker對應(yīng)FaultItem,最后實在是找不到就按照上面那種普通方法來找了。
好了,到這我們延遲故障也介紹完成了。
參考文章
以上就是RocketMQ producer容錯機制源碼解析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ producer容錯機制的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
新手學(xué)習(xí)JQuery基本操作和使用案例解析
這篇文章主要介紹了新手學(xué)習(xí)JQuery基本操作和使用案例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-02-02
SpringBoot集成Druid配置(yaml版本配置文件)詳解
這篇文章主要介紹了SpringBoot集成Druid配置(yaml版本配置文件),本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12
SpringBoot?MongoCustomConversions自定義轉(zhuǎn)換方式
這篇文章主要介紹了SpringBoot?MongoCustomConversions自定義轉(zhuǎn)換方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08

