dubbo服務(wù)注冊(cè)到nacos的過程剖析
前言
前面聊到到了我們的dubbo服務(wù)從redis遷移到nacos注冊(cè)中心,遷移后發(fā)現(xiàn),會(huì)時(shí)不時(shí)的拋一個(gè)異常 ERROR com.alibaba.nacos.client.naming - [CLIENT-BEAT] failed to send beat:, 所以有了這個(gè)剖析過程,當(dāng)然最后查明異常是我們的SLB網(wǎng)絡(luò)映射問題,和nacos沒有關(guān)系。
- dubbo版本:2.7.4.1
- nacos client版本:1.0.0
- nacos server版本:1.1.3
簡(jiǎn)述過程
- dubbo側(cè):dubbo通過nacos注冊(cè)中心實(shí)現(xiàn),注冊(cè)服務(wù)到nacos,同時(shí)添加心跳任務(wù),心跳任務(wù)每隔5s發(fā)送一次服務(wù)健康心跳。同時(shí)每隔1s查詢nacos服務(wù)列表是否有更新,如果有更新觸發(fā)服務(wù)實(shí)例更新通知,更新dubbo本地服務(wù)列表
- nacos側(cè):nacos接收到心跳后,如果此時(shí)服務(wù)實(shí)例不存在,則新建一個(gè)服務(wù)實(shí)例,如果此時(shí)服務(wù)實(shí)例不健康,則設(shè)置為健康狀態(tài),并主動(dòng)推送狀態(tài)到客戶端。nacos內(nèi)部有一個(gè)檢查服務(wù)狀態(tài)的任務(wù),如果15s沒有健康心跳上報(bào),則設(shè)置服務(wù)實(shí)例不健康,如果30s沒有健康心跳上報(bào),則下線這個(gè)服務(wù)實(shí)例,并推送狀態(tài)到客戶端。
源碼剖析具體實(shí)現(xiàn)
在dubbo的registry包下,針對(duì)服務(wù)注冊(cè)行為定義了四個(gè)接口,所有的服務(wù)注冊(cè)(zookeeper、nacos、redis、etcd等)支持都是這些接口的實(shí)現(xiàn)
- NotifyListener:服務(wù)變更通知監(jiān)聽的接口定義,在實(shí)現(xiàn)注冊(cè)中心時(shí)不需關(guān)心實(shí)現(xiàn),對(duì)接具體監(jiān)聽器往下傳遞這個(gè)實(shí)例就好
- RegistryService:服務(wù)注冊(cè)、取消注冊(cè)、定義、取消訂閱、服務(wù)查找的接口定義,是最核心的一個(gè)接口,包含了注冊(cè)中心實(shí)現(xiàn)的核心功能
- Registry:對(duì)RegistryService、Node的包裝,多了檢測(cè)服務(wù)是否可用,服務(wù)銷毀下線的方法,一般直接實(shí)現(xiàn)Registry接口
- RegistryFactory:通過注冊(cè)中心URL獲取注冊(cè)中心實(shí)現(xiàn)的接口定義,dubbo的spi設(shè)計(jì),針對(duì)每個(gè)具體實(shí)現(xiàn),映射了一個(gè)注冊(cè)中心協(xié)議頭,如nacos實(shí)現(xiàn)對(duì)應(yīng)了nacos:// 新對(duì)接一個(gè)注冊(cè)中心,并不需要直接實(shí)現(xiàn)Registry接口,可直接繼承FailbackRegistry抽象類,實(shí)現(xiàn)相關(guān)的do方法即可。dubbo針對(duì)服務(wù)注冊(cè)的抽象和nacos服務(wù)注冊(cè)的抽象非常契合,大部分接口可以直接對(duì)接使用,只有服務(wù)訂閱監(jiān)聽器的定義不一樣,稍微包裝轉(zhuǎn)換下即可,所以實(shí)現(xiàn)起來就非常簡(jiǎn)單了。
服務(wù)注冊(cè)
org.apache.dubbo.registry.nacos.NacosRegistry:152
@Override public void doRegister(URL url) { final String serviceName = getServiceName(url); final Instance instance = createInstance(url); execute(namingService -> namingService.registerInstance(serviceName, instance)); }
dubbo中,所以的服務(wù)都被封裝成了URL,對(duì)應(yīng)nacos中的服務(wù)實(shí)例Instance,所以服務(wù)注冊(cè)時(shí),只需要簡(jiǎn)單的將URL轉(zhuǎn)換成Instance就可以注冊(cè)到nacos中,下面看看namingService中的具體注冊(cè)行為。
com.alibaba.nacos.client.naming.NacosNamingService:283
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
如上代碼,除了注冊(cè)實(shí)例外,還判斷了instance實(shí)例是否是臨時(shí)實(shí)例,如果是臨時(shí)實(shí)例,則加入了beatReactor的心跳列表。這是因?yàn)?,nacos將服務(wù)分成了兩類,一類是臨時(shí)性的服務(wù), 像dubbo、spring cloud這種,需要通過心跳來保活,如果心跳沒有及時(shí)發(fā)送,服務(wù)端會(huì)自動(dòng)下線這個(gè)instance。一類是永久性服務(wù),如數(shù)據(jù)庫、緩存服務(wù)等, 客戶端不會(huì)也沒法發(fā)送心跳,這類服務(wù)就由服務(wù)端通過TCP端口檢測(cè)等方式反向探活。下面看看臨時(shí)實(shí)例的心跳是怎么發(fā)送的。
com.alibaba.nacos.client.naming.NacosNamingService:104
private int initClientBeatThreadCount(Properties properties) { if (properties == null) { return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT; } return NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT), UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT); } //可通過配置dubbo.registries.nacos.parameters.namingClientBeatThreadCount = 10設(shè)置維護(hù)心跳的線程數(shù)
先看一段獲取心跳beatReactor線程池線程數(shù)量的初始化代碼,傳入的Properties是配置dubbo注冊(cè)中心時(shí)的參數(shù)列表,如果配置了namingClientBeatThreadCount,則取配置的值, 默認(rèn)維護(hù)心跳的線程池大小為:如果是單核的,就是一個(gè)線程,多核的就CPU核心數(shù)一半的線程。繼續(xù)心跳邏輯
com.alibaba.nacos.client.naming.beat.BeatReactor:78
class BeatProcessor implements Runnable { @Override public void run() { try { for (Map.Entry entry : dom2Beat.entrySet()) { BeatInfo beatInfo = entry.getValue(); if (beatInfo.isScheduled()) { continue; } beatInfo.setScheduled(true); executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); } } catch (Exception e) { NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e); } finally { executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS); } } } class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { long result = serverProxy.sendBeat(beatInfo); beatInfo.setScheduled(false); if (result > 0) { clientBeatInterval = result; } } }
dom2Beat是一個(gè)存放需要心跳上報(bào)的臨時(shí)實(shí)例的map容器,NacosNamingService.registerInstance中通過判斷臨時(shí)節(jié)點(diǎn)添加到心跳列表的邏輯, 最終添加到了這個(gè)map里。BeatReactor初始化后會(huì)觸發(fā)BeatProcessor線程的調(diào)用,BeatProcessor線程是一個(gè)不斷自我觸發(fā)調(diào)用的線程,前一次 心跳上報(bào)邏輯執(zhí)行完后,間隔5S觸發(fā)下一次心跳上報(bào)。間隔時(shí)間由變量clientBeatInterval控制,受nacos服務(wù)端返回的心跳結(jié)果值的影響 心跳間隔可能會(huì)改變,nacos服務(wù)端從instance的元數(shù)據(jù)中尋找key為preserved.heart.beat.interval的值返回,如果為空則返回5S。 這個(gè)功能在dubbo2.7.4.1的版本里還不成熟,只能通過注解元素指定,如@Reference(parameters = "preserved.heart.beat.interval,10000"), 后面如果能夠直接在注冊(cè)中心的url參數(shù)配置就算成熟了,所以這個(gè)功能暫時(shí)不推薦使用,可以作為實(shí)驗(yàn)功能試試。
服務(wù)訂閱
org.apache.dubbo.registry.nacos.NacosRegistry:399
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener) throws NacosException { if (!nacosListeners.containsKey(serviceName)) { EventListener eventListener = event -> { if (event instanceof NamingEvent) { NamingEvent e = (NamingEvent) event; notifySubscriber(url, listener, e.getInstances()); } }; namingService.subscribe(serviceName, eventListener); nacosListeners.put(serviceName, eventListener); } }
nacos的服務(wù)監(jiān)聽是EventListener,所以dubbo的服務(wù)訂閱只需要將NotifyListener的處理包裝進(jìn)onEvent中處理即可, 通過namingService.subscribe添加nacos的訂閱。最終EventListener對(duì)象會(huì)被添加到事件調(diào)度器的監(jiān)聽器列表中,見如下代碼:
com.alibaba.nacos.client.naming.core.EventDispatcher:
public class EventDispatcher { private ExecutorService executor = null; private BlockingQueuechangedServices = new LinkedBlockingQueue(); private ConcurrentMap observerMap = new ConcurrentHashMap(); public EventDispatcher() { executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener"); thread.setDaemon(true); return thread; } }); executor.execute(new Notifier()); } public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map"); Listobservers = Collections.synchronizedList(new ArrayList()); observers.add(listener); observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers); if (observers != null) { observers.add(listener); } serviceChanged(serviceInfo); } public void removeListener(String serviceName, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map"); Listobservers = observerMap.get(ServiceInfo.getKey(serviceName, clusters)); if (observers != null) { Iteratoriter = observers.iterator(); while (iter.hasNext()) { EventListener oldListener = iter.next(); if (oldListener.equals(listener)) { iter.remove(); } } if (observers.isEmpty()) { observerMap.remove(ServiceInfo.getKey(serviceName, clusters)); } } } public ListgetSubscribeServices() { ListserviceInfos = new ArrayList(); for (String key : observerMap.keySet()) { serviceInfos.add(ServiceInfo.fromKey(key)); } return serviceInfos; } public void serviceChanged(ServiceInfo serviceInfo) { if (serviceInfo == null) { return; } changedServices.add(serviceInfo); } private class Notifier implements Runnable { @Override public void run() { while (true) { ServiceInfo serviceInfo = null; try { serviceInfo = changedServices.poll(5, TimeUnit.MINUTES); } catch (Exception ignore) { } if (serviceInfo == null) { continue; } try { Listlisteners = observerMap.get(serviceInfo.getKey()); if (!CollectionUtils.isEmpty(listeners)) { for (EventListener listener : listeners) { Listhosts = Collections.unmodifiableList(serviceInfo.getHosts()); listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts)); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e); } } } } public void setExecutor(ExecutorService executor) { ExecutorService oldExecutor = this.executor; this.executor = executor; oldExecutor.shutdown(); } }
EventDispatcher中維護(hù)了一個(gè)監(jiān)聽器列表observerMap,同時(shí)維護(hù)了一個(gè)事件變更的阻塞隊(duì)列changedServices,監(jiān)聽調(diào)度器初始化后,會(huì)觸發(fā)一個(gè)線程消費(fèi)阻塞隊(duì)列的 數(shù)據(jù),當(dāng)注冊(cè)服務(wù)發(fā)生變化時(shí),將變更數(shù)據(jù)入隊(duì),就能喚醒線程更新dubbo內(nèi)存中的服務(wù)列表了。上面已經(jīng)聊到,nacos client會(huì)以1s的頻次拉取注冊(cè)的實(shí)例,當(dāng)拉取到的實(shí)例和本地內(nèi)存的 有出入時(shí),就會(huì)觸發(fā)入隊(duì)操作,如:
com.alibaba.nacos.client.naming.core.HostReactor:296
public class UpdateTask implements Runnable { long lastRefTime = Long.MAX_VALUE; private String clusters; private String serviceName; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; this.clusters = clusters; } @Override public void run() { try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { updateServiceNow(serviceName, clusters); executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { updateServiceNow(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS); lastRefTime = serviceObj.getLastRefTime(); } catch (Throwable e) { NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } } }
DEFAULT_DELAY值為1s,同時(shí),nacos也會(huì)主動(dòng)的推送數(shù)據(jù)變更事件,當(dāng)遇到nacos主動(dòng)推送時(shí),serviceInfoMap中的serviceObj會(huì)被更新,那么下次 nacos client拉取的時(shí)間間隔會(huì)被設(shè)置成10S之后,具體的和本地列表比對(duì)的邏輯都在updateServiceNow方法內(nèi),這里就不展開講述了。
結(jié)語
dubbo注冊(cè)服務(wù)到nacos以及訂閱服務(wù)是一個(gè)比較復(fù)雜的過程,在剖析的過程中,帶著疑問去看源碼會(huì)有事半功倍的效果,比如博主在看源碼前, 首先是為了尋找nacos的心跳異常,然后對(duì)nacos如何實(shí)現(xiàn)事件監(jiān)聽比較好奇。然后層層剖析漸進(jìn)明朗恍然大悟。當(dāng)然在剖析dubbo注冊(cè)服務(wù)到nacos時(shí),也需要了解 nacos服務(wù)端的處理邏輯,nacos服務(wù)端非常核心的兩個(gè)類ClientBeatCheckTask、ClientBeatProcessor,包含了心跳處理、健康檢測(cè)和事件推送的邏輯, 有興趣可以看看
以上就是dubbo服務(wù)注冊(cè)到nacos的過程剖析的詳細(xì)內(nèi)容,更多關(guān)于dubbo服務(wù)注冊(cè)到nacos的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 解決dubbo啟動(dòng)報(bào)服務(wù)注冊(cè)失敗Failed?to?register?dubbo
- springboot發(fā)布dubbo服務(wù)注冊(cè)到nacos實(shí)現(xiàn)方式
- springboot整合Dubbo與Feign的實(shí)現(xiàn)?(無注冊(cè)中心)
- dubbo服務(wù)使用redis注冊(cè)中心的系列異常解決
- 升級(jí)dubbo2.7.4.1版本平滑遷移到注冊(cè)中心nacos
- Dubbo無法訪問遠(yuǎn)程Zookeeper已注冊(cè)服務(wù)的問題解決方案
- 從dubbo zookeeper注冊(cè)地址提取出zookeeper地址的方法
- 解決Dubbo應(yīng)用啟動(dòng)注冊(cè)ZK獲取IP慢的原因之一
相關(guān)文章
spring-@Autowired注入與構(gòu)造函數(shù)注入使用方式
這篇文章主要介紹了spring-@Autowired注入與構(gòu)造函數(shù)注入使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12解決IDEA?JDK9沒有module-info.java的問題
這篇文章主要介紹了解決IDEA?JDK9沒有module-info.java的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01解決SpringBoot的@DeleteMapping注解的方法不被調(diào)用問題
這篇文章主要介紹了解決SpringBoot的@DeleteMapping注解的方法不被調(diào)用問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢
這篇文章主要介紹了Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11簡(jiǎn)介Java的Spring框架的體系結(jié)構(gòu)以及安裝配置
這篇文章主要介紹了Java的Spring框架的體系結(jié)構(gòu)以及安裝配置,Spring框架是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2015-12-12