java開發(fā)RocketMQ生產(chǎn)者高可用示例詳解
引言
前邊兩章說了點(diǎn)基礎(chǔ)的,從這章開始,我們挖挖源碼。看看RocketMQ是怎么工作的。
首先呢,這個(gè)生產(chǎn)者就是送孩子去碼頭的家長,孩子們呢,就是消息了。
我們看看消息孩子們都長啥樣。
1 消息
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主題名字 private String topic; //消息擴(kuò)展信息,Tag,keys,延遲級(jí)別都存在這里 private Map<String, String> properties; //消息體,字節(jié)數(shù)組 private byte[] body; //設(shè)置消息的key, public void setKeys(String keys) {} //設(shè)置topic public void setTopic(String topic) {} //延遲級(jí)別 public int setDelayTimeLevel(int level) {} //消息過濾的標(biāo)記 public void setTags(String tags) {} //擴(kuò)展信息存放在此 public void putUserProperty(final String name, final String value) {} }
消息就是孩子們,這些孩子們呢,有各自的特點(diǎn),也有共性。同一個(gè)家長送來的兩個(gè)孩子可以是去同一個(gè)地方的,也可以是去不同的地方的。
1.1 topic
首先呢,每個(gè)孩子消息都有一個(gè)屬性topic,這個(gè)我們上文說到了,是一個(gè)候船大廳。孩子們進(jìn)來之后,走到自己指定的候船大廳的指定區(qū)域(平時(shí)出門坐火車高鐵不也是指定的站臺(tái)乘車么),坐到message queue座位上等,等著出行。
Broker有一個(gè)或者多個(gè)topic,消息會(huì)存放到topic內(nèi)的message queue內(nèi),等待被消費(fèi)。
1.2 Body
孩子消息,也有一個(gè)Body屬性,這就是他的能力,他會(huì)畫畫,他會(huì)唱歌,他會(huì)干啥干啥,就記錄在這個(gè)Body屬性里。等走出去了,體現(xiàn)價(jià)值的地方也是這個(gè)Body屬性。
Body就是消息體,消費(fèi)者會(huì)根據(jù)消息體執(zhí)行對(duì)應(yīng)的操作。
1.3 tag
這個(gè)tag我們上節(jié)說了,就是一個(gè)標(biāo)記,有的孩子背著畫板,相機(jī),有的游船就特意找到這些孩子拉走,完成他們的任務(wù)。
可以給消息設(shè)置tag屬性,消費(fèi)者可以選擇含有特定tag屬性的消息進(jìn)行消費(fèi)。
1.4 key
key就是每個(gè)孩子消息的名字了。要找哪個(gè)孩子,喊他名就行。
對(duì)發(fā)送的消息設(shè)置好 Key,以后可以根據(jù)這個(gè)Key 來查找消息。比如消息異常,消息丟失,進(jìn)行查找會(huì)很方便。
1.5 延遲級(jí)別
當(dāng)然,還有的孩子來就不急著走,來之前就想好了,要恰個(gè)飯,得30分鐘,所以自己來了會(huì)等30分鐘后被接走。
設(shè)置延遲級(jí)別可以規(guī)定多久后消息可以被消費(fèi)。
2 生產(chǎn)者高可用
每個(gè)送孩子來的家長都希望能送到候船大廳里,更不希望孩子被搞丟了,這個(gè)時(shí)候這個(gè)候船大廳就需要一些保證機(jī)制了。
2.1 客戶端保證生產(chǎn)者高可用
2.1.1 重試機(jī)制
就是說家長送來了,孩子進(jìn)到候船大廳之后,沒能成功坐到message queue座位上,這個(gè)時(shí)候工作人員會(huì)安排重試,再去看是否有座位坐。重試次數(shù)默認(rèn)是2次,也就是說,消息孩子共有3次找座位坐的機(jī)會(huì)。
看源碼,我特意加了注解,大致可以看懂一些了。
//這里取到了重試的次數(shù) int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //獲取消息隊(duì)列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //發(fā)送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException拋出了異常,其他的exception都會(huì)繼續(xù)重試 throw e; } } else { break; } }
重試代碼如上,這個(gè)sendDefaultImpl
方法中,會(huì)嘗試發(fā)送三次消息,若是都失敗,才會(huì)拋出對(duì)應(yīng)的錯(cuò)誤。
2.1.2 客戶端容錯(cuò)
若是有多個(gè)Broker候車大廳的時(shí)候,服務(wù)人員會(huì)安排消息孩子選擇一個(gè)相對(duì)不擁擠,比較容易進(jìn)入的來進(jìn)入。當(dāng)然那些已經(jīng)關(guān)閉的,停電的,沒有服務(wù)能力的,我們是不會(huì)進(jìn)的。
MQ Client會(huì)維護(hù)一個(gè)Broker的發(fā)送延遲信息,根據(jù)這個(gè)信息會(huì)選擇一個(gè)相對(duì)延遲較低的Broker來發(fā)送消息。會(huì)主動(dòng)剔除哪些已經(jīng)宕機(jī),不可用或發(fā)送延遲級(jí)別較高的Broker.
選擇Broker
就是在選擇message queue
,對(duì)應(yīng)的代碼如下:
這里會(huì)先判斷延遲容錯(cuò)開關(guān)是否開啟,這個(gè)開關(guān)默認(rèn)是關(guān)閉的,若是開啟的話,會(huì)優(yōu)先選擇延遲較低的Broker。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判斷發(fā)送延遲容錯(cuò)開關(guān)是否開啟 if (this.sendLatencyFaultEnable) { try { //選擇一個(gè)延遲上可以接受,并且和上次發(fā)送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延遲時(shí)間可以接受,則返回這個(gè)Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步?jīng)]能選中一個(gè)Broker,就選擇一個(gè)延遲較低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前邊都沒選中一個(gè)Broker,就隨機(jī)選一個(gè)Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
但是當(dāng)延遲容錯(cuò)開關(guān)為關(guān)閉狀態(tài)的時(shí)候,執(zhí)行的代碼如下:
為了均勻分散Broker的壓力,會(huì)選擇與之前不同的Broker。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是沒有上次的Brokername做參考,就隨機(jī)選一個(gè) if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就選一個(gè)其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //這里判斷遇上一個(gè)使用的Broker不是同一個(gè) if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上邊的都沒選中,那么就隨機(jī)選一個(gè) return selectOneMessageQueue(); } }
2.2 Broker端保證生產(chǎn)者高可用
Broker候船大廳為了能確切的接收到消息孩子,至少會(huì)有兩個(gè)廳,一個(gè)主廳一個(gè)副廳,一般來說孩子都會(huì)進(jìn)入到主廳,然后一頓操作,卡該忙信那機(jī)資(影分身之術(shù)),然后讓分身進(jìn)入到副廳,這樣當(dāng)主廳停電了,不工作了,副廳的分身只要去完成了任務(wù)就ok的。一般來說都是主廳的消息孩子去坐船完成任務(wù)。
之后我們會(huì)聊到Broker的主從復(fù)制,分為同步復(fù)制和異步復(fù)制,同步復(fù)制時(shí)指當(dāng)master 收到消息之后,同步到slaver才算消息發(fā)送成功。異步復(fù)制是只要master收到消息就算成功。生產(chǎn)中建議至少部署兩臺(tái)master和兩臺(tái)slaver。
下一篇,我們聊聊,消息的發(fā)送流程,就是說,一個(gè)消息孩子,從進(jìn)碼頭的門到坐到message queue座位上,都經(jīng)歷了啥。
以上就是java開發(fā)RocketMQ生產(chǎn)者高可用示例詳解的詳細(xì)內(nèi)容,更多關(guān)于java RocketMQ生產(chǎn)者高可用的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
簡(jiǎn)述springboot及springboot cloud環(huán)境搭建
這篇文章主要介紹了簡(jiǎn)述springboot及springboot cloud環(huán)境搭建的方法,包括spring boot 基礎(chǔ)應(yīng)用環(huán)境搭建,需要的朋友可以參考下2017-07-07java中實(shí)現(xiàn)對(duì)象排序的兩種方法(Comparable,Comparator)
這篇文章主要給大家介紹了關(guān)于java中實(shí)現(xiàn)對(duì)象排序的兩種方法,一種是實(shí)現(xiàn)Comparable進(jìn)行排序,另一種是實(shí)現(xiàn)Comparator進(jìn)行排序,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-12-12SpringBoot項(xiàng)目接入Nacos的實(shí)現(xiàn)步驟
SpringBoot項(xiàng)目使用nacos作為配置中心和服務(wù)注冊(cè)中心,同時(shí)兼容dubbo的注冊(cè)中心。 本Demo項(xiàng)目使用的SpringBoot版本是2.3.9.RELEASE2021-05-05IntelliJ?IDEA教程之clean或者install?Maven項(xiàng)目的操作方法
這篇文章主要介紹了IntelliJ?IDEA教程之clean或者install?Maven項(xiàng)目的操作方法,本文分步驟給大家介紹兩種方式講解如何調(diào)試出窗口,需要的朋友可以參考下2023-04-04微服務(wù)之間如何通過feign調(diào)用接口上傳文件
這篇文章主要介紹了微服務(wù)之間如何通過feign調(diào)用接口上傳文件的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06