欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例

 更新時(shí)間:2022年11月29日 11:19:53   作者:夢(mèng)想實(shí)現(xiàn)家_Z  
這篇文章主要為大家介紹了RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

RocketMQ發(fā)送消息

我們?cè)谑褂肦ocketMQ發(fā)送消息時(shí),一般都會(huì)使用DefaultMQProducer,類型的代碼如下:

DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("42.192.50.8:9876");
try {
    producer.start();
    producer.send(new Message("topic", "ping".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.shutdown();
}

上述代碼中,在消息發(fā)送之前調(diào)用了start()方法,如果不調(diào)用start()方法,直接發(fā)送消息,那么會(huì)出現(xiàn)以下報(bào)錯(cuò):

報(bào)錯(cuò)消息里面很明顯地告知我們,目前這個(gè)DefaultMQProducer狀態(tài)沒有準(zhǔn)備好,還不能發(fā)送消息。為了一探究竟,我們得去看看start()里面究竟做了什么操作呢?

start()里面究竟做了什么操作

我們根據(jù)源碼一路走下來,可以追蹤到DefaultMQProducerImpl.start(final boolean startFactory)這個(gè)方法:

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 創(chuàng)建MQClientInstance
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 注冊(cè)Producer到MQClientInstance中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                // 啟動(dòng)MQClientInstance實(shí)例
                if (startFactory) {
                    mQClientFactory.start();
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

上述代碼主要做了以下幾點(diǎn):

1.創(chuàng)建MQClientInstance實(shí)例;

2.注冊(cè)Producer到MQClientInstance實(shí)例中;

3.啟動(dòng)MQClientInstance實(shí)例;

MQClientInstance實(shí)例并不是每次都會(huì)創(chuàng)建的,它創(chuàng)建出來也會(huì)緩存的MQClientManager中,不過根據(jù)源碼來看的話,每次創(chuàng)建Producer都會(huì)對(duì)應(yīng)創(chuàng)建一個(gè)新的MQClientInstance實(shí)例,所以一般情況下不建議一個(gè)應(yīng)用服務(wù)中重復(fù)創(chuàng)建Producer;

最終start()方法的關(guān)鍵實(shí)現(xiàn)邏輯還是需要進(jìn)入MQClientInstance.start()中:

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 如果namesrv地址為null,那么就需要自己找namesrv地址
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // 開啟一個(gè)請(qǐng)求響應(yīng)渠道,沒猜錯(cuò)的話,應(yīng)該是netty實(shí)現(xiàn)的
                    this.mQClientAPIImpl.start();
                    // 開啟定時(shí)任務(wù)
                    this.startScheduledTask();
                    // 開啟拉消息服務(wù)
                    this.pullMessageService.start();
                    // 開啟負(fù)載均衡服務(wù)
                    this.rebalanceService.start();
                    // 再開啟一個(gè)默認(rèn)生產(chǎn)者,這個(gè)生產(chǎn)者不需要啟動(dòng)MQClientInstance實(shí)例
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

看樣子,這才是start()方法真正要做的事情:

1.找namesrv地址,應(yīng)該是后面需要使用namesrv地址查詢對(duì)應(yīng)的broker;

2.開啟Netty客戶端的初始化,包括與namesrv建立信道;另外開啟兩個(gè)定時(shí)任務(wù),一個(gè)清除列表中過期的請(qǐng)求,第二個(gè)就是篩選可用的namesrv服務(wù);

3.開啟一些定時(shí)任務(wù);包括如果沒有設(shè)置namesrv地址的話,會(huì)從指定站點(diǎn)拉namesrv地址;清除下線broker并發(fā)送心跳給所有的broker等工作;

4.因?yàn)楫?dāng)前是生產(chǎn)者,所以pullMessageService很快就結(jié)束;

5.生產(chǎn)者不需要做負(fù)載均衡,所以rebalanceService很快也結(jié)束;

6.給默認(rèn)創(chuàng)建的生產(chǎn)者執(zhí)行一下start()方法,其實(shí)啥也沒做;

上述大多數(shù)任務(wù)都是給消費(fèi)者使用的,作為生產(chǎn)者,唯一起作用的就是前三步,查找namesrv地址、第二步與namesrv建立通信以及第三步對(duì)broker的一些定時(shí)清理工作;不過沒有發(fā)生消息之前,是不會(huì)從遠(yuǎn)程獲取任何數(shù)據(jù)的。所以綜上所述,start()方法里面只做了以下兩件事情:

1.與namesrv建立通信渠道,它甚至都沒有從namesrv獲取任何數(shù)據(jù);

2.啟動(dòng)一些定時(shí)任務(wù),包括清理下線的broker;

小結(jié)

雖然在生產(chǎn)者中,start()方法里面真正做的事情比較少,但是卻是非常有必要的。發(fā)送消息之前,我們沒有使用start()方法,導(dǎo)致消息發(fā)送失敗,是因?yàn)樯a(chǎn)者與namesrv之間的通信渠道沒有建立。

以上就是RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ調(diào)用start發(fā)送消息的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 使用@Cacheable緩存解決雙冒號(hào)::的問題

    使用@Cacheable緩存解決雙冒號(hào)::的問題

    這篇文章主要介紹了使用@Cacheable緩存解決雙冒號(hào)::的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • @RefreshScope在Quartz 觸發(fā)器類導(dǎo)致異常問題解決分析

    @RefreshScope在Quartz 觸發(fā)器類導(dǎo)致異常問題解決分析

    這篇文章主要為大家介紹了@RefreshScope在Quartz 觸發(fā)器類導(dǎo)致異常問題解決分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-02-02
  • Java并發(fā)Timer源碼分析

    Java并發(fā)Timer源碼分析

    這篇文章講述了java并發(fā)編程的相關(guān)知識(shí)點(diǎn),并通過Timer源碼分析更深入的講解了java并發(fā)編程。
    2018-07-07
  • Springboot整合Gson報(bào)錯(cuò)問題解決過程

    Springboot整合Gson報(bào)錯(cuò)問題解決過程

    這篇文章主要介紹了Springboot整合Gson報(bào)錯(cuò)問題解決過程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Java使用自定義注解+反射實(shí)現(xiàn)字典轉(zhuǎn)換代碼實(shí)例

    Java使用自定義注解+反射實(shí)現(xiàn)字典轉(zhuǎn)換代碼實(shí)例

    這篇文章主要介紹了Java使用自定義注解+反射實(shí)現(xiàn)字典轉(zhuǎn)換代碼實(shí)例,注解是一種能被添加到j(luò)ava代碼中的元數(shù)據(jù),類、方法、變量、參數(shù)和包都可以用注解來修飾,注解對(duì)于它所修飾的代碼并沒有直接的影響,需要的朋友可以參考下
    2023-09-09
  • MyBatis如何使用(二)

    MyBatis如何使用(二)

    這篇文章主要介紹了MyBatis如何使用(二)的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下
    2016-07-07
  • Java正則表達(dá)式匹配字符串并提取中間值的方法實(shí)例

    Java正則表達(dá)式匹配字符串并提取中間值的方法實(shí)例

    正則表達(dá)式常用于字符串處理、表單驗(yàn)證等場(chǎng)合,實(shí)用高效,下面這篇文章主要給大家介紹了關(guān)于Java正則表達(dá)式匹配字符串并提取中間值的相關(guān)資料,需要的朋友可以參考下
    2022-06-06
  • Java簡(jiǎn)化復(fù)雜系統(tǒng)調(diào)用的門面設(shè)計(jì)模式

    Java簡(jiǎn)化復(fù)雜系統(tǒng)調(diào)用的門面設(shè)計(jì)模式

    Java門面模式是一種結(jié)構(gòu)性設(shè)計(jì)模式,它為復(fù)雜系統(tǒng)提供了一個(gè)簡(jiǎn)單的接口,使得系統(tǒng)的客戶端能夠更加方便地使用系統(tǒng)功能。門面模式通過封裝復(fù)雜的子系統(tǒng),隱藏系統(tǒng)的實(shí)現(xiàn)細(xì)節(jié),提高了系統(tǒng)的易用性和靈活性
    2023-04-04
  • SpringBoot啟動(dòng)security后如何關(guān)閉彈出的/login頁(yè)面

    SpringBoot啟動(dòng)security后如何關(guān)閉彈出的/login頁(yè)面

    這篇文章主要介紹了SpringBoot啟動(dòng)security后如何關(guān)閉彈出的login頁(yè)面問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Java字節(jié)碼操縱框架ASM圖文實(shí)例詳解

    Java字節(jié)碼操縱框架ASM圖文實(shí)例詳解

    這篇文章主要為大家介紹了Java字節(jié)碼操縱框架ASM圖文實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-07-07

最新評(píng)論