欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ消息發(fā)送流程源碼剖析

 更新時間:2022年08月01日 11:31:50   作者:奔跑的毛球  
這篇文章主要為大家介紹了RocketMQ消息發(fā)送流程源碼剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

正文

就是說,我們打了個比方,把RocketMQ比作碼頭上的一個小房子,來送孩子登船的家長比作生產(chǎn)者,拉走孩子們的船夫比作消費者,所以,RocketMQ的故事就這么展開了。

這節(jié)我們研究研究,消息的發(fā)送流程。也就是說,消息孩子從進(jìn)門到坐到message queue座位上都經(jīng)歷了啥。

父母把消息孩子送到碼頭之后,門口的門童defaultMQProducerImpl.send()接過孩子,進(jìn)入到MQ房子內(nèi)部,然后引導(dǎo)孩子進(jìn)入Broker候船大廳內(nèi)的message queue座位上就坐。這就是消息發(fā)送的流程了。

而且孩子在剛被門童接到之后,就被規(guī)定了能在候船大廳待多久,默認(rèn)是3秒。也就是說,要是再小房子內(nèi)等了三秒沒走,就離開吧,你怕是沒想明白自己來干啥的。這就是消息的超時時間。

讀源碼

1 調(diào)用defaultMQProducerImpl.send()

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

2 設(shè)置過期時間

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

3 執(zhí)行defaultMQProducerImpl.sendDefaultImpl()方法

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

這里看看這幾個參數(shù),

  • communicationMode 是通信模式,同步異步還是單向
  • sendCallback 是針對異步模式的,異步模式需要設(shè)置發(fā)送完成后的回調(diào)。

sendDefaultImpl是發(fā)送消息的核心方法。

這里消息孩子進(jìn)到第一個卡口,先要檢查送孩子來的家長是否還能聯(lián)系上,若是能聯(lián)系到,就繼續(xù)。要是聯(lián)系不到,這孩子豈不是被拋棄了,不敢接不敢接,送到孤兒院吧。

然后需要檢查消息孩子了,首先是檢查孩子還在不在,別扔個衣服跑了。
然后看看孩子指定的這個topic,不能說我想去內(nèi)個topic哈,必須是實實在在的名字。而且上頭也規(guī)定了,這個topic的名字也不能太長,也不能包含特殊字符。已有的一些領(lǐng)導(dǎo)定過的也不能用哈。
接下來就是檢查孩子的body了,之前說body就是孩子的技能,首先,技能為空,不行不行,啥都不會是不行的。再者太長也不行,你唱首歌兩年,這沒法玩。

檢查message不為null

檢查topic

  • topic不能為空
  • topic不能太長
  • 不能包含特殊字符

檢查話題的名字是否被系統(tǒng)已占用

檢查body

  • 檢查是否為空
  • 檢查長度是否過長,最大為4MB 這樣

下邊我們看看sendDefaultImpl這個方法。給他拆成一段一段的看。

1 兩個校驗

//校驗生產(chǎn)者服務(wù)是ok的,可以聯(lián)系到的
this.makeSureStateOK();
//校驗消息的參數(shù)
Validators.checkMessage(msg, this.defaultMQProducer);
  • 第一個檢查,檢查生產(chǎn)者服務(wù)是否是正常工作的,若是不正常工作,就拋出異常。
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}
  • 第二個檢查,檢查消息本身是否為空,檢查topic,檢查消息的body
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // 這里校驗Topic的時候,校驗了不能為空,長度和特殊字符
    Validators.checkTopic(msg.getTopic());
    //這里則校驗了一些不允許使用的topic名字
    Validators.isNotAllowedSendTopic(msg.getTopic());
    // body不為空
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    // body長度不為0
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    // body 長度不能過長
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

2 獲取topic路由信息

嗯,這里孩子終于通過了檢查,服務(wù)人員開始帶著他去找自己指定的topic區(qū)域,指定是自己指定,劃分還是工作人員劃分的。咱總得知道這個topic區(qū)域在哪吧。

先去緩存筆記里找,有沒有這個區(qū)域的信息,若是沒有這個topic,就新建一個,然后更新到緩存筆記里邊。若有topic但是不知道在哪,就找name server大腦去申請這個topic在哪的信息。

執(zhí)行tryToFindTopicPublishInfo方法去獲取Topic的路由信息,若是不存在就新建,若是有topic但是緩存中沒有路由信息,則通過name server獲取路由信息。

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //獲取topic信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    //不存在
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //新建
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //修改topic的路由信息并更新到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    //包含路由信息就直接返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //不包含路由信息則向name server申請,修改topic的路由信息并更新到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

3 計算重試次數(shù)

這就是計算消息孩子可以嘗試去找地方坐幾次,沒坐上,欸,我又來了,沒坐上,欸,我又來了。

這行代碼就是計算重試次數(shù)的,根據(jù)communicationMode傳入的值,同步異步還是單向的來決定重試次數(shù)是幾次。 很明顯,若是同步的,就會嘗試三次。若是異步的或者單向的就只發(fā)送一次。

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

4 執(zhí)行隊列選擇方法

我們之前說了,Broker類似于候船大廳,為了均分壓力,每次都要進(jìn)與上次不同的候船大廳。

執(zhí)行selectOneMessageQueue方法通過Queue將消息發(fā)送到與上次不同的一個Broker。也可以通過 sendLatencyFaultEnable判斷是否啟用延遲容錯開關(guān)

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

5 發(fā)送消息

這就是走過巷道坐到屬于自己的座位上了

然后就通過sendKernelImpl發(fā)送消息了,這是發(fā)送消息的核心方法。會準(zhǔn)備通信層的入?yún)?,并將請求發(fā)送給通信層,內(nèi)部實現(xiàn)是基于Netty的。

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

以上就是RocketMQ消息發(fā)送流程源碼剖析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息發(fā)送流程的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Eclipse/MyEclipse轉(zhuǎn)IntelliJ IDEA完全攻略(圖文)

    Eclipse/MyEclipse轉(zhuǎn)IntelliJ IDEA完全攻略(圖文)

    這篇文章主要介紹了Eclipse/MyEclipse轉(zhuǎn)IntelliJ IDEA完全攻略(圖文),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • 淺談SpringMVC中的session用法及細(xì)節(jié)記錄

    淺談SpringMVC中的session用法及細(xì)節(jié)記錄

    下面小編就為大家?guī)硪黄獪\談SpringMVC中的session用法及細(xì)節(jié)記錄。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05
  • Java 判斷實體對象及所有屬性是否為空的操作

    Java 判斷實體對象及所有屬性是否為空的操作

    這篇文章主要介紹了Java 判斷實體對象及所有屬性是否為空的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Java之String.format()方法案例講解

    Java之String.format()方法案例講解

    這篇文章主要介紹了Java之String.format()方法案例講解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • springcloud feign傳輸List的坑及解決

    springcloud feign傳輸List的坑及解決

    這篇文章主要介紹了springcloud feign傳輸List的坑及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java中Switch的使用方法及新特性

    Java中Switch的使用方法及新特性

    在java中控制流程語句是由選擇語句、循環(huán)語句、跳轉(zhuǎn)語句構(gòu)成,選擇語句包括if和switch,在過多的使用if語句嵌套會使程序很難閱讀,這時就可以用到switch語句,這篇文章主要給大家介紹了關(guān)于Java中Switch的使用方法及新特性的相關(guān)資料,需要的朋友可以參考下
    2023-11-11
  • JAVA自定義注解詳情

    JAVA自定義注解詳情

    這篇文章主要介紹了Java自定義注解,結(jié)合實例形式總結(jié)分析了java常見的自定義注解類型、功能、用法及操作注意事項,需要的朋友可以參考下
    2021-10-10
  • Java線程的調(diào)度與優(yōu)先級詳解

    Java線程的調(diào)度與優(yōu)先級詳解

    這篇文章主要為大家詳細(xì)介紹了Java線程的調(diào)度與優(yōu)先級,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • 手把手教學(xué)Win10同時安裝兩個版本的JDK并隨時切換(JDK8和JDK11)

    手把手教學(xué)Win10同時安裝兩個版本的JDK并隨時切換(JDK8和JDK11)

    最近在學(xué)習(xí)JDK11的一些新特性,但是日常使用基本上都是基于JDK8,因此,需要在win環(huán)境下安裝多個版本的JDK,下面這篇文章主要給大家介紹了手把手教學(xué)Win10同時安裝兩個版本的JDK(JDK8和JDK11)并隨時切換的相關(guān)資料,需要的朋友可以參考下
    2023-03-03
  • SpringBoot讀取excel表格的示例代碼

    SpringBoot讀取excel表格的示例代碼

    這篇文章主要介紹了SpringBoot讀取excel表格的示例代碼,代碼簡單易懂,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-10-10

最新評論