RocketMQ消息發(fā)送流程源碼剖析
正文
就是說(shuō),我們打了個(gè)比方,把RocketMQ比作碼頭上的一個(gè)小房子,來(lái)送孩子登船的家長(zhǎng)比作生產(chǎn)者,拉走孩子們的船夫比作消費(fèi)者,所以,RocketMQ的故事就這么展開(kāi)了。
這節(jié)我們研究研究,消息的發(fā)送流程。也就是說(shuō),消息孩子從進(jìn)門到坐到message queue座位上都經(jīng)歷了啥。
父母把消息孩子送到碼頭之后,門口的門童defaultMQProducerImpl.send()接過(guò)孩子,進(jìn)入到MQ房子內(nèi)部,然后引導(dǎo)孩子進(jìn)入Broker候船大廳內(nèi)的message queue座位上就坐。這就是消息發(fā)送的流程了。
而且孩子在剛被門童接到之后,就被規(guī)定了能在候船大廳待多久,默認(rèn)是3秒。也就是說(shuō),要是再小房子內(nèi)等了三秒沒(méi)走,就離開(kāi)吧,你怕是沒(méi)想明白自己來(lái)干啥的。這就是消息的超時(shí)時(shí)間。
讀源碼
1 調(diào)用defaultMQProducerImpl.send()
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
2 設(shè)置過(guò)期時(shí)間
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
3 執(zhí)行defaultMQProducerImpl.sendDefaultImpl()方法
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
這里看看這幾個(gè)參數(shù),
- communicationMode 是通信模式,同步異步還是單向
- sendCallback 是針對(duì)異步模式的,異步模式需要設(shè)置發(fā)送完成后的回調(diào)。
sendDefaultImpl是發(fā)送消息的核心方法。
這里消息孩子進(jìn)到第一個(gè)卡口,先要檢查送孩子來(lái)的家長(zhǎng)是否還能聯(lián)系上,若是能聯(lián)系到,就繼續(xù)。要是聯(lián)系不到,這孩子豈不是被拋棄了,不敢接不敢接,送到孤兒院吧。
然后需要檢查消息孩子了,首先是檢查孩子還在不在,別扔個(gè)衣服跑了。
然后看看孩子指定的這個(gè)topic,不能說(shuō)我想去內(nèi)個(gè)topic哈,必須是實(shí)實(shí)在在的名字。而且上頭也規(guī)定了,這個(gè)topic的名字也不能太長(zhǎng),也不能包含特殊字符。已有的一些領(lǐng)導(dǎo)定過(guò)的也不能用哈。
接下來(lái)就是檢查孩子的body了,之前說(shuō)body就是孩子的技能,首先,技能為空,不行不行,啥都不會(huì)是不行的。再者太長(zhǎng)也不行,你唱首歌兩年,這沒(méi)法玩。
檢查message不為null
檢查topic
- topic不能為空
- topic不能太長(zhǎng)
- 不能包含特殊字符
檢查話題的名字是否被系統(tǒng)已占用
檢查body
- 檢查是否為空
- 檢查長(zhǎng)度是否過(guò)長(zhǎng),最大為4MB 這樣
下邊我們看看sendDefaultImpl這個(gè)方法。給他拆成一段一段的看。
1 兩個(gè)校驗(yàn)
//校驗(yàn)生產(chǎn)者服務(wù)是ok的,可以聯(lián)系到的 this.makeSureStateOK(); //校驗(yàn)消息的參數(shù) Validators.checkMessage(msg, this.defaultMQProducer);
- 第一個(gè)檢查,檢查生產(chǎn)者服務(wù)是否是正常工作的,若是不正常工作,就拋出異常。
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The producer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
}
- 第二個(gè)檢查,檢查消息本身是否為空,檢查topic,檢查消息的body
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// 這里校驗(yàn)Topic的時(shí)候,校驗(yàn)了不能為空,長(zhǎng)度和特殊字符
Validators.checkTopic(msg.getTopic());
//這里則校驗(yàn)了一些不允許使用的topic名字
Validators.isNotAllowedSendTopic(msg.getTopic());
// body不為空
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
// body長(zhǎng)度不為0
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
// body 長(zhǎng)度不能過(guò)長(zhǎng)
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
2 獲取topic路由信息
嗯,這里孩子終于通過(guò)了檢查,服務(wù)人員開(kāi)始帶著他去找自己指定的topic區(qū)域,指定是自己指定,劃分還是工作人員劃分的。咱總得知道這個(gè)topic區(qū)域在哪吧。
先去緩存筆記里找,有沒(méi)有這個(gè)區(qū)域的信息,若是沒(méi)有這個(gè)topic,就新建一個(gè),然后更新到緩存筆記里邊。若有topic但是不知道在哪,就找name server大腦去申請(qǐng)這個(gè)topic在哪的信息。
執(zhí)行tryToFindTopicPublishInfo方法去獲取Topic的路由信息,若是不存在就新建,若是有topic但是緩存中沒(méi)有路由信息,則通過(guò)name server獲取路由信息。
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//獲取topic信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//不存在
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
//新建
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//修改topic的路由信息并更新到本地
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
//包含路由信息就直接返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//不包含路由信息則向name server申請(qǐng),修改topic的路由信息并更新到本地
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
3 計(jì)算重試次數(shù)
這就是計(jì)算消息孩子可以嘗試去找地方坐幾次,沒(méi)坐上,欸,我又來(lái)了,沒(méi)坐上,欸,我又來(lái)了。
這行代碼就是計(jì)算重試次數(shù)的,根據(jù)communicationMode傳入的值,同步異步還是單向的來(lái)決定重試次數(shù)是幾次。 很明顯,若是同步的,就會(huì)嘗試三次。若是異步的或者單向的就只發(fā)送一次。
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
4 執(zhí)行隊(duì)列選擇方法
我們之前說(shuō)了,Broker類似于候船大廳,為了均分壓力,每次都要進(jìn)與上次不同的候船大廳。
執(zhí)行selectOneMessageQueue方法通過(guò)Queue將消息發(fā)送到與上次不同的一個(gè)Broker。也可以通過(guò) sendLatencyFaultEnable判斷是否啟用延遲容錯(cuò)開(kāi)關(guān)
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
5 發(fā)送消息
這就是走過(guò)巷道坐到屬于自己的座位上了
然后就通過(guò)sendKernelImpl發(fā)送消息了,這是發(fā)送消息的核心方法。會(huì)準(zhǔn)備通信層的入?yún)?,并將?qǐng)求發(fā)送給通信層,內(nèi)部實(shí)現(xiàn)是基于Netty的。
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
以上就是RocketMQ消息發(fā)送流程源碼剖析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息發(fā)送流程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Eclipse/MyEclipse轉(zhuǎn)IntelliJ IDEA完全攻略(圖文)
這篇文章主要介紹了Eclipse/MyEclipse轉(zhuǎn)IntelliJ IDEA完全攻略(圖文),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-01-01
淺談SpringMVC中的session用法及細(xì)節(jié)記錄
下面小編就為大家?guī)?lái)一篇淺談SpringMVC中的session用法及細(xì)節(jié)記錄。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-05-05
Java 判斷實(shí)體對(duì)象及所有屬性是否為空的操作
這篇文章主要介紹了Java 判斷實(shí)體對(duì)象及所有屬性是否為空的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12
Java線程的調(diào)度與優(yōu)先級(jí)詳解
這篇文章主要為大家詳細(xì)介紹了Java線程的調(diào)度與優(yōu)先級(jí),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-03-03
手把手教學(xué)Win10同時(shí)安裝兩個(gè)版本的JDK并隨時(shí)切換(JDK8和JDK11)
最近在學(xué)習(xí)JDK11的一些新特性,但是日常使用基本上都是基于JDK8,因此,需要在win環(huán)境下安裝多個(gè)版本的JDK,下面這篇文章主要給大家介紹了手把手教學(xué)Win10同時(shí)安裝兩個(gè)版本的JDK(JDK8和JDK11)并隨時(shí)切換的相關(guān)資料,需要的朋友可以參考下2023-03-03

