dubbo服務(wù)注冊到nacos的過程剖析
前言
前面聊到到了我們的dubbo服務(wù)從redis遷移到nacos注冊中心,遷移后發(fā)現(xiàn),會時不時的拋一個異常 ERROR com.alibaba.nacos.client.naming - [CLIENT-BEAT] failed to send beat:, 所以有了這個剖析過程,當(dāng)然最后查明異常是我們的SLB網(wǎng)絡(luò)映射問題,和nacos沒有關(guān)系。
- dubbo版本:2.7.4.1
- nacos client版本:1.0.0
- nacos server版本:1.1.3
簡述過程
- dubbo側(cè):dubbo通過nacos注冊中心實(shí)現(xiàn),注冊服務(wù)到nacos,同時添加心跳任務(wù),心跳任務(wù)每隔5s發(fā)送一次服務(wù)健康心跳。同時每隔1s查詢nacos服務(wù)列表是否有更新,如果有更新觸發(fā)服務(wù)實(shí)例更新通知,更新dubbo本地服務(wù)列表
- nacos側(cè):nacos接收到心跳后,如果此時服務(wù)實(shí)例不存在,則新建一個服務(wù)實(shí)例,如果此時服務(wù)實(shí)例不健康,則設(shè)置為健康狀態(tài),并主動推送狀態(tài)到客戶端。nacos內(nèi)部有一個檢查服務(wù)狀態(tài)的任務(wù),如果15s沒有健康心跳上報(bào),則設(shè)置服務(wù)實(shí)例不健康,如果30s沒有健康心跳上報(bào),則下線這個服務(wù)實(shí)例,并推送狀態(tài)到客戶端。
源碼剖析具體實(shí)現(xiàn)
在dubbo的registry包下,針對服務(wù)注冊行為定義了四個接口,所有的服務(wù)注冊(zookeeper、nacos、redis、etcd等)支持都是這些接口的實(shí)現(xiàn)
- NotifyListener:服務(wù)變更通知監(jiān)聽的接口定義,在實(shí)現(xiàn)注冊中心時不需關(guān)心實(shí)現(xiàn),對接具體監(jiān)聽器往下傳遞這個實(shí)例就好
- RegistryService:服務(wù)注冊、取消注冊、定義、取消訂閱、服務(wù)查找的接口定義,是最核心的一個接口,包含了注冊中心實(shí)現(xiàn)的核心功能
- Registry:對RegistryService、Node的包裝,多了檢測服務(wù)是否可用,服務(wù)銷毀下線的方法,一般直接實(shí)現(xiàn)Registry接口
- RegistryFactory:通過注冊中心URL獲取注冊中心實(shí)現(xiàn)的接口定義,dubbo的spi設(shè)計(jì),針對每個具體實(shí)現(xiàn),映射了一個注冊中心協(xié)議頭,如nacos實(shí)現(xiàn)對應(yīng)了nacos:// 新對接一個注冊中心,并不需要直接實(shí)現(xiàn)Registry接口,可直接繼承FailbackRegistry抽象類,實(shí)現(xiàn)相關(guān)的do方法即可。dubbo針對服務(wù)注冊的抽象和nacos服務(wù)注冊的抽象非常契合,大部分接口可以直接對接使用,只有服務(wù)訂閱監(jiān)聽器的定義不一樣,稍微包裝轉(zhuǎn)換下即可,所以實(shí)現(xiàn)起來就非常簡單了。
服務(wù)注冊
org.apache.dubbo.registry.nacos.NacosRegistry:152
@Override
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
execute(namingService -> namingService.registerInstance(serviceName, instance));
}dubbo中,所以的服務(wù)都被封裝成了URL,對應(yīng)nacos中的服務(wù)實(shí)例Instance,所以服務(wù)注冊時,只需要簡單的將URL轉(zhuǎn)換成Instance就可以注冊到nacos中,下面看看namingService中的具體注冊行為。
com.alibaba.nacos.client.naming.NacosNamingService:283
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}如上代碼,除了注冊實(shí)例外,還判斷了instance實(shí)例是否是臨時實(shí)例,如果是臨時實(shí)例,則加入了beatReactor的心跳列表。這是因?yàn)椋琻acos將服務(wù)分成了兩類,一類是臨時性的服務(wù), 像dubbo、spring cloud這種,需要通過心跳來?;?,如果心跳沒有及時發(fā)送,服務(wù)端會自動下線這個instance。一類是永久性服務(wù),如數(shù)據(jù)庫、緩存服務(wù)等, 客戶端不會也沒法發(fā)送心跳,這類服務(wù)就由服務(wù)端通過TCP端口檢測等方式反向探活。下面看看臨時實(shí)例的心跳是怎么發(fā)送的。
com.alibaba.nacos.client.naming.NacosNamingService:104
private int initClientBeatThreadCount(Properties properties) {
if (properties == null) {
return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
}
return NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
//可通過配置dubbo.registries.nacos.parameters.namingClientBeatThreadCount = 10設(shè)置維護(hù)心跳的線程數(shù)先看一段獲取心跳beatReactor線程池線程數(shù)量的初始化代碼,傳入的Properties是配置dubbo注冊中心時的參數(shù)列表,如果配置了namingClientBeatThreadCount,則取配置的值, 默認(rèn)維護(hù)心跳的線程池大小為:如果是單核的,就是一個線程,多核的就CPU核心數(shù)一半的線程。繼續(xù)心跳邏輯
com.alibaba.nacos.client.naming.beat.BeatReactor:78
class BeatProcessor implements Runnable {
@Override
public void run() {
try {
for (Map.Entry entry : dom2Beat.entrySet()) {
BeatInfo beatInfo = entry.getValue();
if (beatInfo.isScheduled()) {
continue;
}
beatInfo.setScheduled(true);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
} finally {
executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
}
}
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
long result = serverProxy.sendBeat(beatInfo);
beatInfo.setScheduled(false);
if (result > 0) {
clientBeatInterval = result;
}
}
}dom2Beat是一個存放需要心跳上報(bào)的臨時實(shí)例的map容器,NacosNamingService.registerInstance中通過判斷臨時節(jié)點(diǎn)添加到心跳列表的邏輯, 最終添加到了這個map里。BeatReactor初始化后會觸發(fā)BeatProcessor線程的調(diào)用,BeatProcessor線程是一個不斷自我觸發(fā)調(diào)用的線程,前一次 心跳上報(bào)邏輯執(zhí)行完后,間隔5S觸發(fā)下一次心跳上報(bào)。間隔時間由變量clientBeatInterval控制,受nacos服務(wù)端返回的心跳結(jié)果值的影響 心跳間隔可能會改變,nacos服務(wù)端從instance的元數(shù)據(jù)中尋找key為preserved.heart.beat.interval的值返回,如果為空則返回5S。 這個功能在dubbo2.7.4.1的版本里還不成熟,只能通過注解元素指定,如@Reference(parameters = "preserved.heart.beat.interval,10000"), 后面如果能夠直接在注冊中心的url參數(shù)配置就算成熟了,所以這個功能暫時不推薦使用,可以作為實(shí)驗(yàn)功能試試。
服務(wù)訂閱
org.apache.dubbo.registry.nacos.NacosRegistry:399
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
if (!nacosListeners.containsKey(serviceName)) {
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
notifySubscriber(url, listener, e.getInstances());
}
};
namingService.subscribe(serviceName, eventListener);
nacosListeners.put(serviceName, eventListener);
}
}nacos的服務(wù)監(jiān)聽是EventListener,所以dubbo的服務(wù)訂閱只需要將NotifyListener的處理包裝進(jìn)onEvent中處理即可, 通過namingService.subscribe添加nacos的訂閱。最終EventListener對象會被添加到事件調(diào)度器的監(jiān)聽器列表中,見如下代碼:
com.alibaba.nacos.client.naming.core.EventDispatcher:
public class EventDispatcher {
private ExecutorService executor = null;
private BlockingQueuechangedServices = new LinkedBlockingQueue();
private ConcurrentMap observerMap = new ConcurrentHashMap();
public EventDispatcher() {
executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
thread.setDaemon(true);
return thread;
}
});
executor.execute(new Notifier());
}
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
Listobservers = Collections.synchronizedList(new ArrayList());
observers.add(listener);
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {
observers.add(listener);
}
serviceChanged(serviceInfo);
}
public void removeListener(String serviceName, String clusters, EventListener listener) {
NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
Listobservers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
if (observers != null) {
Iteratoriter = observers.iterator();
while (iter.hasNext()) {
EventListener oldListener = iter.next();
if (oldListener.equals(listener)) {
iter.remove();
}
}
if (observers.isEmpty()) {
observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
}
}
}
public ListgetSubscribeServices() {
ListserviceInfos = new ArrayList();
for (String key : observerMap.keySet()) {
serviceInfos.add(ServiceInfo.fromKey(key));
}
return serviceInfos;
}
public void serviceChanged(ServiceInfo serviceInfo) {
if (serviceInfo == null) {
return;
}
changedServices.add(serviceInfo);
}
private class Notifier implements Runnable {
@Override
public void run() {
while (true) {
ServiceInfo serviceInfo = null;
try {
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) {
}
if (serviceInfo == null) {
continue;
}
try {
Listlisteners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {
for (EventListener listener : listeners) {
Listhosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] notify error for service: "
+ serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
}
}
}
}
public void setExecutor(ExecutorService executor) {
ExecutorService oldExecutor = this.executor;
this.executor = executor;
oldExecutor.shutdown();
}
}EventDispatcher中維護(hù)了一個監(jiān)聽器列表observerMap,同時維護(hù)了一個事件變更的阻塞隊(duì)列changedServices,監(jiān)聽調(diào)度器初始化后,會觸發(fā)一個線程消費(fèi)阻塞隊(duì)列的 數(shù)據(jù),當(dāng)注冊服務(wù)發(fā)生變化時,將變更數(shù)據(jù)入隊(duì),就能喚醒線程更新dubbo內(nèi)存中的服務(wù)列表了。上面已經(jīng)聊到,nacos client會以1s的頻次拉取注冊的實(shí)例,當(dāng)拉取到的實(shí)例和本地內(nèi)存的 有出入時,就會觸發(fā)入隊(duì)操作,如:
com.alibaba.nacos.client.naming.core.HostReactor:296
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String serviceName;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
@Override
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
lastRefTime = serviceObj.getLastRefTime();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
}DEFAULT_DELAY值為1s,同時,nacos也會主動的推送數(shù)據(jù)變更事件,當(dāng)遇到nacos主動推送時,serviceInfoMap中的serviceObj會被更新,那么下次 nacos client拉取的時間間隔會被設(shè)置成10S之后,具體的和本地列表比對的邏輯都在updateServiceNow方法內(nèi),這里就不展開講述了。
結(jié)語
dubbo注冊服務(wù)到nacos以及訂閱服務(wù)是一個比較復(fù)雜的過程,在剖析的過程中,帶著疑問去看源碼會有事半功倍的效果,比如博主在看源碼前, 首先是為了尋找nacos的心跳異常,然后對nacos如何實(shí)現(xiàn)事件監(jiān)聽比較好奇。然后層層剖析漸進(jìn)明朗恍然大悟。當(dāng)然在剖析dubbo注冊服務(wù)到nacos時,也需要了解 nacos服務(wù)端的處理邏輯,nacos服務(wù)端非常核心的兩個類ClientBeatCheckTask、ClientBeatProcessor,包含了心跳處理、健康檢測和事件推送的邏輯, 有興趣可以看看
以上就是dubbo服務(wù)注冊到nacos的過程剖析的詳細(xì)內(nèi)容,更多關(guān)于dubbo服務(wù)注冊到nacos的資料請關(guān)注腳本之家其它相關(guān)文章!
- 解決dubbo啟動報(bào)服務(wù)注冊失敗Failed?to?register?dubbo
- springboot發(fā)布dubbo服務(wù)注冊到nacos實(shí)現(xiàn)方式
- springboot整合Dubbo與Feign的實(shí)現(xiàn)?(無注冊中心)
- dubbo服務(wù)使用redis注冊中心的系列異常解決
- 升級dubbo2.7.4.1版本平滑遷移到注冊中心nacos
- Dubbo無法訪問遠(yuǎn)程Zookeeper已注冊服務(wù)的問題解決方案
- 從dubbo zookeeper注冊地址提取出zookeeper地址的方法
- 解決Dubbo應(yīng)用啟動注冊ZK獲取IP慢的原因之一
相關(guān)文章
spring-@Autowired注入與構(gòu)造函數(shù)注入使用方式
這篇文章主要介紹了spring-@Autowired注入與構(gòu)造函數(shù)注入使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
解決IDEA?JDK9沒有module-info.java的問題
這篇文章主要介紹了解決IDEA?JDK9沒有module-info.java的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
解決SpringBoot的@DeleteMapping注解的方法不被調(diào)用問題
這篇文章主要介紹了解決SpringBoot的@DeleteMapping注解的方法不被調(diào)用問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01
Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢
這篇文章主要介紹了Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11
簡介Java的Spring框架的體系結(jié)構(gòu)以及安裝配置
這篇文章主要介紹了Java的Spring框架的體系結(jié)構(gòu)以及安裝配置,Spring框架是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2015-12-12

