欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ NameServer保障數(shù)據(jù)一致性實(shí)現(xiàn)方法講解

 更新時(shí)間:2022年12月12日 08:55:39   作者:小王曾是少年  
這篇文章主要介紹了RocketMQ NameServer保障數(shù)據(jù)一致性實(shí)現(xiàn)方法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

路由注冊(cè)角度

對(duì)于ZooKeeper這樣的強(qiáng)一致性組件,使用主從分離的架構(gòu),數(shù)據(jù)只寫到主節(jié)點(diǎn),主從之間的數(shù)據(jù)同步通過(guò)內(nèi)部機(jī)制來(lái)進(jìn)行數(shù)據(jù)復(fù)制。

對(duì)于RocketMQ來(lái)說(shuō),NameServer節(jié)點(diǎn)之間是互相不進(jìn)行通信的,這樣也就無(wú)法進(jìn)行數(shù)據(jù)復(fù)制。RocketMQ采用的機(jī)制是:在Broker節(jié)點(diǎn)啟動(dòng)的時(shí)候,輪詢所有的NameServer節(jié)點(diǎn),并與每個(gè)NameServer節(jié)點(diǎn)建立長(zhǎng)連接,發(fā)送注冊(cè)請(qǐng)求。

相應(yīng)的,NameServer節(jié)點(diǎn)內(nèi)部也會(huì)維護(hù)一個(gè)Broker列表,用來(lái)動(dòng)態(tài)存儲(chǔ)Broker的信息,做服務(wù)發(fā)現(xiàn)。

與此同時(shí),Broker使用心跳機(jī)制來(lái)向所有NameServer節(jié)點(diǎn)證明自己是存活的,即定期發(fā)送心跳包;收到心跳包之后,NameServer節(jié)點(diǎn)會(huì)更新這個(gè)Broker的最新存活時(shí)間。

注意: NameServer節(jié)點(diǎn)在處理心跳包時(shí),存在多個(gè)請(qǐng)求同時(shí)處理同一張表的情況,為了保證并發(fā)安全性,RocketMQ引入了讀寫鎖(ReadWriteLock),保證了多個(gè)Producer并發(fā)讀取路由信息不受影響,但同一時(shí)刻只能處理一個(gè)Broker發(fā)來(lái)的心跳包,這也符合讀多寫少的經(jīng)典場(chǎng)景。

路由剔除

正常情況下:

如果Broker下線,則會(huì)與NameServer斷開長(zhǎng)連接,底層基于Netty的通道關(guān)閉監(jiān)聽(tīng)器會(huì)監(jiān)聽(tīng)到連接斷開事件,然后將這個(gè)Broker信息剔除。

異常情況下:

NameServer有一個(gè)周期為10s的定時(shí)任務(wù),定期掃描Broker表,如果超過(guò)120s沒(méi)有收到某個(gè)Broker的心跳包,則會(huì)判定其失效并移除。

對(duì)于日常運(yùn)維的需求,RocketMQ提供了優(yōu)雅剔除路由信息的方式,即可以先禁止Broker的寫權(quán)限,這樣發(fā)送到這個(gè)Broker的請(qǐng)求都會(huì)收到一個(gè)NO_PERMISSION的響應(yīng),客戶端自動(dòng)重試其他的Broker

路由發(fā)現(xiàn)

生產(chǎn)者視角:

一般是在發(fā)送第一條消息時(shí),才會(huì)根據(jù)TopicNameServer獲取路由信息

消費(fèi)者視角:

訂閱的Topic一般是固定的,所以在啟動(dòng)時(shí)就會(huì)拉取

針對(duì)路由信息可能變化的場(chǎng)景,RocketMQ提供了定時(shí)拉取Topic最新路由信息的機(jī)制,以應(yīng)對(duì)Broker集群發(fā)生變化的場(chǎng)景。

DefaultMQProducerDefaultMQConsumer有一個(gè)pollNameServerInterval的配置項(xiàng),用于指定從NameServer獲取路由信息的周期,其底層依賴MQClientInstance類,MQClientInstance類中的updateTopicRouteInfoFromNameServer方法,可以根據(jù)指定的時(shí)間間隔,周期性地從NameServer里拉取路由信息。在拉取時(shí),會(huì)將當(dāng)前啟動(dòng)的ProducerConsumer需要用到的Topic列表放到一個(gè)集合里,逐個(gè)進(jìn)行更新,源碼如下:

/**
* 更新單個(gè)Topic路由信息
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    return updateTopicRouteInfoFromNameServer(topic, false, null);
}
/**
* 更新單個(gè)Topic路由信息
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                	// 使用默認(rèn)TopicKey獲取TopicRouteData
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }
                    if (changed) {
                    	// 克隆出一個(gè)實(shí)例cloneTopicRouteData : topicRouteData會(huì)被設(shè)置到下面的publishInfo/subscribeInfo 
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
						// 更新Broker地址相關(guān)信息,當(dāng)某個(gè)Broker心跳超時(shí)后,會(huì)被從brokerAddrTable中移除
                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }
                        // Update Pub info
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }
                        // Update sub info
                        {
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                }
            } catch (MQClientException e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } catch (RemotingException e) {
                log.error("updateTopicRouteInfoFromNameServer Exception", e);
                throw new IllegalStateException(e);
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }
    return false;
}

當(dāng)Broker宕機(jī)時(shí),還可以通過(guò)客戶端的重試機(jī)制來(lái)解決,避免因?yàn)槎〞r(shí)更新路由信息不及時(shí)導(dǎo)致的服務(wù)宕機(jī)~~

到此這篇關(guān)于RocketMQ NameServer保障數(shù)據(jù)一致性實(shí)現(xiàn)方法講解的文章就介紹到這了,更多相關(guān)RocketMQ NameServer內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中super和this關(guān)鍵字詳解

    Java中super和this關(guān)鍵字詳解

    這篇文章主要介紹了Java中super和this關(guān)鍵字詳解,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-06-06
  • 分布式難題ElasticSearch解決大數(shù)據(jù)量檢索面試

    分布式難題ElasticSearch解決大數(shù)據(jù)量檢索面試

    這篇文章主要為大家介紹了分布式面試難題,ElasticSearch解決大數(shù)據(jù)量檢索的問(wèn)題分析回答,讓面試官無(wú)話可說(shuō),幫助大家實(shí)現(xiàn)面試開薪自由
    2022-03-03
  • java maven中如何引入自己的lib

    java maven中如何引入自己的lib

    在JavaMaven項(xiàng)目中引入自己的庫(kù)可以簡(jiǎn)化為幾個(gè)步驟:首先,確保庫(kù)以JAR格式存在或打包成JAR;其次,將JAR文件放置在項(xiàng)目目錄或安裝到本地Maven倉(cāng)庫(kù);最后,在pom.xml中添加依賴,這樣做可以使項(xiàng)目更加模塊化,便于管理和維護(hù),感興趣的朋友跟隨小編一起看看吧
    2024-09-09
  • Java Socket編程實(shí)例(三)- TCP服務(wù)端線程池

    Java Socket編程實(shí)例(三)- TCP服務(wù)端線程池

    這篇文章主要講解Java Socket編程中TCP服務(wù)端線程池的實(shí)例,希望能給大家做一個(gè)參考。
    2016-06-06
  • Springboot AOP開發(fā)教程

    Springboot AOP開發(fā)教程

    AOP是OOP的延續(xù),是軟件開發(fā)中的一個(gè)熱點(diǎn),也是Spring框架中的一個(gè)重要內(nèi)容,是函數(shù)式編程的一種衍生范型,本文給大家介紹Springboot AOP開發(fā)教程,感興趣的朋友跟隨小編一起看看吧
    2024-03-03
  • SpringBoot(cloud)自動(dòng)裝配bean找不到類型的問(wèn)題

    SpringBoot(cloud)自動(dòng)裝配bean找不到類型的問(wèn)題

    這篇文章主要介紹了SpringBoot(cloud)自動(dòng)裝配bean找不到類型的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • springboot整合swagger3報(bào)Unable to infer base url錯(cuò)誤問(wèn)題

    springboot整合swagger3報(bào)Unable to infer base&nbs

    這篇文章主要介紹了springboot整合swagger3報(bào)Unable to infer base url錯(cuò)誤問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Spring Bean常用依賴注入方式詳解

    Spring Bean常用依賴注入方式詳解

    這篇文章主要介紹了Spring Bean常用三種依賴注入方式詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • 一段眼睛跟著鼠標(biāo)轉(zhuǎn)動(dòng)的跟蹤眼代碼

    一段眼睛跟著鼠標(biāo)轉(zhuǎn)動(dòng)的跟蹤眼代碼

    java實(shí)現(xiàn)的眼睛跟著鼠標(biāo)轉(zhuǎn)動(dòng)的跟蹤眼代碼
    2008-10-10
  • java連接mysql數(shù)據(jù)庫(kù)及測(cè)試是否連接成功的方法

    java連接mysql數(shù)據(jù)庫(kù)及測(cè)試是否連接成功的方法

    這篇文章主要介紹了java連接mysql數(shù)據(jù)庫(kù)及測(cè)試是否連接成功的方法,結(jié)合完整實(shí)例形式分析了java基于jdbc連接mysql數(shù)據(jù)庫(kù)并返回連接狀態(tài)的具體步驟與相關(guān)操作技巧,需要的朋友可以參考下
    2017-09-09

最新評(píng)論