欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

dubbo服務(wù)注冊(cè)到nacos的過程剖析

 更新時(shí)間:2022年02月24日 10:17:44   作者:kl  
這篇文章主要為大家介紹了dubbo服務(wù)注冊(cè)到nacos的過程剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職極限

前言

前面聊到到了我們的dubbo服務(wù)從redis遷移到nacos注冊(cè)中心,遷移后發(fā)現(xiàn),會(huì)時(shí)不時(shí)的拋一個(gè)異常 ERROR com.alibaba.nacos.client.naming - [CLIENT-BEAT] failed to send beat:, 所以有了這個(gè)剖析過程,當(dāng)然最后查明異常是我們的SLB網(wǎng)絡(luò)映射問題,和nacos沒有關(guān)系。

  • dubbo版本:2.7.4.1
  • nacos client版本:1.0.0
  • nacos server版本:1.1.3

簡(jiǎn)述過程

  • dubbo側(cè):dubbo通過nacos注冊(cè)中心實(shí)現(xiàn),注冊(cè)服務(wù)到nacos,同時(shí)添加心跳任務(wù),心跳任務(wù)每隔5s發(fā)送一次服務(wù)健康心跳。同時(shí)每隔1s查詢nacos服務(wù)列表是否有更新,如果有更新觸發(fā)服務(wù)實(shí)例更新通知,更新dubbo本地服務(wù)列表
  • nacos側(cè):nacos接收到心跳后,如果此時(shí)服務(wù)實(shí)例不存在,則新建一個(gè)服務(wù)實(shí)例,如果此時(shí)服務(wù)實(shí)例不健康,則設(shè)置為健康狀態(tài),并主動(dòng)推送狀態(tài)到客戶端。nacos內(nèi)部有一個(gè)檢查服務(wù)狀態(tài)的任務(wù),如果15s沒有健康心跳上報(bào),則設(shè)置服務(wù)實(shí)例不健康,如果30s沒有健康心跳上報(bào),則下線這個(gè)服務(wù)實(shí)例,并推送狀態(tài)到客戶端。

源碼剖析具體實(shí)現(xiàn)

在dubbo的registry包下,針對(duì)服務(wù)注冊(cè)行為定義了四個(gè)接口,所有的服務(wù)注冊(cè)(zookeeper、nacos、redis、etcd等)支持都是這些接口的實(shí)現(xiàn)

  • NotifyListener:服務(wù)變更通知監(jiān)聽的接口定義,在實(shí)現(xiàn)注冊(cè)中心時(shí)不需關(guān)心實(shí)現(xiàn),對(duì)接具體監(jiān)聽器往下傳遞這個(gè)實(shí)例就好
  • RegistryService:服務(wù)注冊(cè)、取消注冊(cè)、定義、取消訂閱、服務(wù)查找的接口定義,是最核心的一個(gè)接口,包含了注冊(cè)中心實(shí)現(xiàn)的核心功能
  • Registry:對(duì)RegistryService、Node的包裝,多了檢測(cè)服務(wù)是否可用,服務(wù)銷毀下線的方法,一般直接實(shí)現(xiàn)Registry接口
  • RegistryFactory:通過注冊(cè)中心URL獲取注冊(cè)中心實(shí)現(xiàn)的接口定義,dubbo的spi設(shè)計(jì),針對(duì)每個(gè)具體實(shí)現(xiàn),映射了一個(gè)注冊(cè)中心協(xié)議頭,如nacos實(shí)現(xiàn)對(duì)應(yīng)了nacos:// 新對(duì)接一個(gè)注冊(cè)中心,并不需要直接實(shí)現(xiàn)Registry接口,可直接繼承FailbackRegistry抽象類,實(shí)現(xiàn)相關(guān)的do方法即可。dubbo針對(duì)服務(wù)注冊(cè)的抽象和nacos服務(wù)注冊(cè)的抽象非常契合,大部分接口可以直接對(duì)接使用,只有服務(wù)訂閱監(jiān)聽器的定義不一樣,稍微包裝轉(zhuǎn)換下即可,所以實(shí)現(xiàn)起來就非常簡(jiǎn)單了。

服務(wù)注冊(cè)

org.apache.dubbo.registry.nacos.NacosRegistry:152

@Override
    public void doRegister(URL url) {
        final String serviceName = getServiceName(url);
        final Instance instance = createInstance(url);
        execute(namingService -> namingService.registerInstance(serviceName, instance));
    }

dubbo中,所以的服務(wù)都被封裝成了URL,對(duì)應(yīng)nacos中的服務(wù)實(shí)例Instance,所以服務(wù)注冊(cè)時(shí),只需要簡(jiǎn)單的將URL轉(zhuǎn)換成Instance就可以注冊(cè)到nacos中,下面看看namingService中的具體注冊(cè)行為。

com.alibaba.nacos.client.naming.NacosNamingService:283

@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

如上代碼,除了注冊(cè)實(shí)例外,還判斷了instance實(shí)例是否是臨時(shí)實(shí)例,如果是臨時(shí)實(shí)例,則加入了beatReactor的心跳列表。這是因?yàn)?,nacos將服務(wù)分成了兩類,一類是臨時(shí)性的服務(wù), 像dubbo、spring cloud這種,需要通過心跳來保活,如果心跳沒有及時(shí)發(fā)送,服務(wù)端會(huì)自動(dòng)下線這個(gè)instance。一類是永久性服務(wù),如數(shù)據(jù)庫、緩存服務(wù)等, 客戶端不會(huì)也沒法發(fā)送心跳,這類服務(wù)就由服務(wù)端通過TCP端口檢測(cè)等方式反向探活。下面看看臨時(shí)實(shí)例的心跳是怎么發(fā)送的。

com.alibaba.nacos.client.naming.NacosNamingService:104

private int initClientBeatThreadCount(Properties properties) {
        if (properties == null) {
            return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
        }
        return NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
            UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }
    //可通過配置dubbo.registries.nacos.parameters.namingClientBeatThreadCount = 10設(shè)置維護(hù)心跳的線程數(shù)

先看一段獲取心跳beatReactor線程池線程數(shù)量的初始化代碼,傳入的Properties是配置dubbo注冊(cè)中心時(shí)的參數(shù)列表,如果配置了namingClientBeatThreadCount,則取配置的值, 默認(rèn)維護(hù)心跳的線程池大小為:如果是單核的,就是一個(gè)線程,多核的就CPU核心數(shù)一半的線程。繼續(xù)心跳邏輯

com.alibaba.nacos.client.naming.beat.BeatReactor:78

class BeatProcessor implements Runnable {
        @Override
        public void run() {
            try {
                for (Map.Entry entry : dom2Beat.entrySet()) {
                    BeatInfo beatInfo = entry.getValue();
                    if (beatInfo.isScheduled()) {
                        continue;
                    }
                    beatInfo.setScheduled(true);
                    executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
            } finally {
                executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
            }
        }
    }
    class BeatTask implements Runnable {
        BeatInfo beatInfo;
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
        @Override
        public void run() {
            long result = serverProxy.sendBeat(beatInfo);
            beatInfo.setScheduled(false);
            if (result > 0) {
                clientBeatInterval = result;
            }
        }
    }

dom2Beat是一個(gè)存放需要心跳上報(bào)的臨時(shí)實(shí)例的map容器,NacosNamingService.registerInstance中通過判斷臨時(shí)節(jié)點(diǎn)添加到心跳列表的邏輯, 最終添加到了這個(gè)map里。BeatReactor初始化后會(huì)觸發(fā)BeatProcessor線程的調(diào)用,BeatProcessor線程是一個(gè)不斷自我觸發(fā)調(diào)用的線程,前一次 心跳上報(bào)邏輯執(zhí)行完后,間隔5S觸發(fā)下一次心跳上報(bào)。間隔時(shí)間由變量clientBeatInterval控制,受nacos服務(wù)端返回的心跳結(jié)果值的影響 心跳間隔可能會(huì)改變,nacos服務(wù)端從instance的元數(shù)據(jù)中尋找key為preserved.heart.beat.interval的值返回,如果為空則返回5S。 這個(gè)功能在dubbo2.7.4.1的版本里還不成熟,只能通過注解元素指定,如@Reference(parameters = "preserved.heart.beat.interval,10000"), 后面如果能夠直接在注冊(cè)中心的url參數(shù)配置就算成熟了,所以這個(gè)功能暫時(shí)不推薦使用,可以作為實(shí)驗(yàn)功能試試。

服務(wù)訂閱

org.apache.dubbo.registry.nacos.NacosRegistry:399

private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
            throws NacosException {
        if (!nacosListeners.containsKey(serviceName)) {
            EventListener eventListener = event -> {
                if (event instanceof NamingEvent) {
                    NamingEvent e = (NamingEvent) event;
                    notifySubscriber(url, listener, e.getInstances());
                }
            };
            namingService.subscribe(serviceName, eventListener);
            nacosListeners.put(serviceName, eventListener);
        }
    }

nacos的服務(wù)監(jiān)聽是EventListener,所以dubbo的服務(wù)訂閱只需要將NotifyListener的處理包裝進(jìn)onEvent中處理即可, 通過namingService.subscribe添加nacos的訂閱。最終EventListener對(duì)象會(huì)被添加到事件調(diào)度器的監(jiān)聽器列表中,見如下代碼:

com.alibaba.nacos.client.naming.core.EventDispatcher:

public class EventDispatcher {
    private ExecutorService executor = null;
    private BlockingQueuechangedServices = new LinkedBlockingQueue();
    private ConcurrentMap observerMap = new ConcurrentHashMap();
    public EventDispatcher() {
        executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                thread.setDaemon(true);
                return thread;
            }
        });
        executor.execute(new Notifier());
    }
    public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
        NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
        Listobservers = Collections.synchronizedList(new ArrayList());
        observers.add(listener);
        observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
        if (observers != null) {
            observers.add(listener);
        }
        serviceChanged(serviceInfo);
    }
    public void removeListener(String serviceName, String clusters, EventListener listener) {
        NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
        Listobservers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
        if (observers != null) {
            Iteratoriter = observers.iterator();
            while (iter.hasNext()) {
                EventListener oldListener = iter.next();
                if (oldListener.equals(listener)) {
                    iter.remove();
                }
            }
            if (observers.isEmpty()) {
                observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
            }
        }
    }
    public ListgetSubscribeServices() {
        ListserviceInfos = new ArrayList();
        for (String key : observerMap.keySet()) {
            serviceInfos.add(ServiceInfo.fromKey(key));
        }
        return serviceInfos;
    }
    public void serviceChanged(ServiceInfo serviceInfo) {
        if (serviceInfo == null) {
            return;
        }
        changedServices.add(serviceInfo);
    }
    private class Notifier implements Runnable {
        @Override
        public void run() {
            while (true) {
                ServiceInfo serviceInfo = null;
                try {
                    serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                } catch (Exception ignore) {
                }
                if (serviceInfo == null) {
                    continue;
                }
                try {
                    Listlisteners = observerMap.get(serviceInfo.getKey());
                    if (!CollectionUtils.isEmpty(listeners)) {
                        for (EventListener listener : listeners) {
                            Listhosts = Collections.unmodifiableList(serviceInfo.getHosts());
                            listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
                        }
                    }
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] notify error for service: "
                        + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
                }
            }
        }
    }
    public void setExecutor(ExecutorService executor) {
        ExecutorService oldExecutor = this.executor;
        this.executor = executor;
        oldExecutor.shutdown();
    }
}

EventDispatcher中維護(hù)了一個(gè)監(jiān)聽器列表observerMap,同時(shí)維護(hù)了一個(gè)事件變更的阻塞隊(duì)列changedServices,監(jiān)聽調(diào)度器初始化后,會(huì)觸發(fā)一個(gè)線程消費(fèi)阻塞隊(duì)列的 數(shù)據(jù),當(dāng)注冊(cè)服務(wù)發(fā)生變化時(shí),將變更數(shù)據(jù)入隊(duì),就能喚醒線程更新dubbo內(nèi)存中的服務(wù)列表了。上面已經(jīng)聊到,nacos client會(huì)以1s的頻次拉取注冊(cè)的實(shí)例,當(dāng)拉取到的實(shí)例和本地內(nèi)存的 有出入時(shí),就會(huì)觸發(fā)入隊(duì)操作,如:

com.alibaba.nacos.client.naming.core.HostReactor:296

public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private String clusters;
        private String serviceName;
        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }
        @Override
        public void run() {
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                if (serviceObj == null) {
                    updateServiceNow(serviceName, clusters);
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateServiceNow(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }
                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            }
        }
    }

DEFAULT_DELAY值為1s,同時(shí),nacos也會(huì)主動(dòng)的推送數(shù)據(jù)變更事件,當(dāng)遇到nacos主動(dòng)推送時(shí),serviceInfoMap中的serviceObj會(huì)被更新,那么下次 nacos client拉取的時(shí)間間隔會(huì)被設(shè)置成10S之后,具體的和本地列表比對(duì)的邏輯都在updateServiceNow方法內(nèi),這里就不展開講述了。

結(jié)語

dubbo注冊(cè)服務(wù)到nacos以及訂閱服務(wù)是一個(gè)比較復(fù)雜的過程,在剖析的過程中,帶著疑問去看源碼會(huì)有事半功倍的效果,比如博主在看源碼前, 首先是為了尋找nacos的心跳異常,然后對(duì)nacos如何實(shí)現(xiàn)事件監(jiān)聽比較好奇。然后層層剖析漸進(jìn)明朗恍然大悟。當(dāng)然在剖析dubbo注冊(cè)服務(wù)到nacos時(shí),也需要了解 nacos服務(wù)端的處理邏輯,nacos服務(wù)端非常核心的兩個(gè)類ClientBeatCheckTask、ClientBeatProcessor,包含了心跳處理、健康檢測(cè)和事件推送的邏輯, 有興趣可以看看

以上就是dubbo服務(wù)注冊(cè)到nacos的過程剖析的詳細(xì)內(nèi)容,更多關(guān)于dubbo服務(wù)注冊(cè)到nacos的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • spring-@Autowired注入與構(gòu)造函數(shù)注入使用方式

    spring-@Autowired注入與構(gòu)造函數(shù)注入使用方式

    這篇文章主要介紹了spring-@Autowired注入與構(gòu)造函數(shù)注入使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • 解決IDEA?JDK9沒有module-info.java的問題

    解決IDEA?JDK9沒有module-info.java的問題

    這篇文章主要介紹了解決IDEA?JDK9沒有module-info.java的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • mybatis框架的xml映射文件常用查詢指南

    mybatis框架的xml映射文件常用查詢指南

    這篇文章主要給大家介紹了關(guān)于mybatis框架的xml映射文件常用查詢的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • Java的Character類詳解

    Java的Character類詳解

    在實(shí)際開發(fā)過程中,我們經(jīng)常會(huì)遇到需要使用對(duì)象,而不是內(nèi)置數(shù)據(jù)類型的情況。為了解決這個(gè)問題,Java語言為內(nèi)置數(shù)據(jù)類型char提供了包裝類Character類。本文詳細(xì)介紹了Java的Character類,感興趣的同學(xué)可以參考閱讀
    2023-04-04
  • maven依賴版本沒有按照最短路徑原則生效的解決方案

    maven依賴版本沒有按照最短路徑原則生效的解決方案

    這篇文章主要介紹了maven依賴版本沒有生效的解決方案,幫助大家更好的理解和使用springboot框架,感興趣的朋友可以了解下
    2021-01-01
  • 解決SpringBoot的@DeleteMapping注解的方法不被調(diào)用問題

    解決SpringBoot的@DeleteMapping注解的方法不被調(diào)用問題

    這篇文章主要介紹了解決SpringBoot的@DeleteMapping注解的方法不被調(diào)用問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-01-01
  • Java webSerivce的使用看完你就明白了

    Java webSerivce的使用看完你就明白了

    因?yàn)榍岸螘r(shí)間,需要使用到webService來調(diào)用公司的其他系統(tǒng)api接口,但是請(qǐng)求方式和我熟知的http請(qǐng)求不一樣,是基于soap協(xié)議來傳輸xml數(shù)據(jù)格式,請(qǐng)求的參數(shù)極其復(fù)雜,需要封裝多層xml數(shù)據(jù)格式,并且我不知道對(duì)方的api接口是什么語言,甚至不知道他們存在于什么平臺(tái)
    2022-03-03
  • Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢

    Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢

    這篇文章主要介紹了Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • 簡(jiǎn)介Java的Spring框架的體系結(jié)構(gòu)以及安裝配置

    簡(jiǎn)介Java的Spring框架的體系結(jié)構(gòu)以及安裝配置

    這篇文章主要介紹了Java的Spring框架的體系結(jié)構(gòu)以及安裝配置,Spring框架是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下
    2015-12-12
  • java8中的HashMap原理詳解

    java8中的HashMap原理詳解

    這篇文章主要介紹了java8中的HashMap原理詳解,HashMap是日常開發(fā)中非常常用的容器,HashMap實(shí)現(xiàn)了Map接口,底層的實(shí)現(xiàn)原理是哈希表,HashMap不是一個(gè)線程安全的容器,需要的朋友可以參考下
    2023-09-09

最新評(píng)論