Nacos源碼之注冊(cè)中心的實(shí)現(xiàn)詳解
引言
在平時(shí)的工作中多多少少都會(huì)接觸到注冊(cè)中心,當(dāng)你的應(yīng)用從單機(jī)到拆分成多個(gè)服務(wù),每個(gè)服務(wù)又有多個(gè)實(shí)例的情況時(shí),那么對(duì)服務(wù)IP地址管理的要求就會(huì)越來(lái)越高。而注冊(cè)中心就是干這個(gè)的。
最經(jīng)典的注冊(cè)中心實(shí)現(xiàn)方式是Zookeeper,在很多RPC框架中都有基于Zookeeper注冊(cè)中心的實(shí)現(xiàn),如Dubbo,Motan。有興趣的可以直接去閱讀相關(guān)源碼。
對(duì)于注冊(cè)中心的使用,其實(shí)就是在yaml文件中做一些配置,然后有對(duì)應(yīng)的管理頁(yè)面可以查看和操作。當(dāng)然我們肯定不僅僅局限于使用,更需要了解其背后的實(shí)現(xiàn)和設(shè)計(jì)。因?yàn)楣咀钚碌膽?yīng)用使用的是Nacos,所以近期簡(jiǎn)單閱讀了一下Nacos關(guān)于注冊(cè)中心的源碼實(shí)現(xiàn)。
基于的版本是2.1.2。
1-從DEMO出發(fā)
對(duì)于源碼的閱讀,可以從最簡(jiǎn)單的demo入門(mén)。
Nacos作為一個(gè)服務(wù)端,想要使用其服務(wù)發(fā)現(xiàn)功能,可以直接使用其提供的客戶(hù)端代碼。
在RPC框架中,我們的服務(wù)一般分為Provider和Consumer.
Provider會(huì)向注冊(cè)中心進(jìn)行服務(wù)注冊(cè)
Properties properties = new Properties(); properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost")); properties.setProperty("namespace", System.getProperty("namespace", "public")); NamingService naming = NamingFactory.createNamingService(properties); //注冊(cè) naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1"); System.out.println("instances after register: " + naming.getAllInstances("nacos.test.3"));
Consumer則會(huì)監(jiān)聽(tīng)對(duì)于服務(wù)注冊(cè)的實(shí)例信息
Properties properties = new Properties(); properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost")); properties.setProperty("namespace", System.getProperty("namespace", "public")); NamingService naming = NamingFactory.createNamingService(properties); Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), runnable -> { Thread thread = new Thread(runnable); thread.setName("test-thread"); return thread; }); //訂閱服務(wù)列表 naming.subscribe("nacos.test.3", new AbstractEventListener() { //EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback. //So you can override getExecutor() to async handle event. @Override public Executor getExecutor() { return executor; } @Override public void onEvent(Event event) { System.out.println("serviceName: " + ((NamingEvent) event).getServiceName()); System.out.println("instances from event: " + ((NamingEvent) event).getInstances()); } });
上面我們就已經(jīng)完成了服務(wù)的注冊(cè)與發(fā)現(xiàn),雖然在項(xiàng)目中都是基于SpringBoot整合來(lái)實(shí)現(xiàn),但是其本質(zhì)都是基于這些API代碼來(lái)實(shí)現(xiàn)。
2-服務(wù)注冊(cè)
通過(guò)上面的案例,我們已經(jīng)知道了如何想Nacos進(jìn)行服務(wù)的注冊(cè)。接下來(lái)就來(lái)看看在注冊(cè)的過(guò)程中都做了哪些事情吧。
void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;
通過(guò)注冊(cè)的接口,我們大概可以知道注冊(cè)一個(gè)服務(wù)需要哪些要素
服務(wù)名稱(chēng),實(shí)例的ip和端口,以及實(shí)例所屬的集群名稱(chēng)。這些要素其實(shí)就組成了Nacos服務(wù)的分級(jí)存儲(chǔ)模型。
在Service的上層是Group和Namespace,他們共同組成了注冊(cè)中心的數(shù)據(jù)模型。
我們通過(guò)官方文檔的兩張圖可以更加詳細(xì)的了解服務(wù)發(fā)現(xiàn)的數(shù)據(jù)模型:
有了上面的知識(shí),那么閱讀后面的源碼會(huì)輕松不少。
registerInstance接口會(huì)通過(guò)http或者grpc的方式向-->Nacos發(fā)起請(qǐng)求,來(lái)進(jìn)行服務(wù)的注冊(cè)。那么
這里就需要閱讀服務(wù)的代碼了。
服務(wù)端會(huì)通過(guò)下面的方法實(shí)現(xiàn)服務(wù)的注冊(cè)。
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
這個(gè)方法有兩個(gè)實(shí)現(xiàn)類(lèi),我這里選擇的是InstanceOperatorServiceImpl來(lái)查看(Nacos V1版本的實(shí)現(xiàn),另一種是V2版本的實(shí)現(xiàn))。
通過(guò)ServiceManager方法中的registerInstance去注冊(cè)實(shí)例 ,步驟如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); checkServiceIsNull(service, namespaceId, serviceName); addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
1-創(chuàng)建Service
對(duì)應(yīng)服務(wù)分級(jí)存儲(chǔ)模型中的服務(wù)(如果存在就直接返回)
/** * Map(namespace, Map(group::serviceName, Service)). */ private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
上面是Service的存儲(chǔ)結(jié)構(gòu)ConcurrentHashMap。是一個(gè)雙層Map,先通過(guò)namespace找,然后在通過(guò)group和serviceName找到具體的service。這里其實(shí)可以回頭看我們之前的demo代碼,理解一下所傳遞的參數(shù)。
創(chuàng)建Serivce后會(huì)有一個(gè)putServiceAndInit方法做一些初始化操作,需要特別注意:
private void putServiceAndInit(Service service) throws NacosException { putService(service); service = getService(service.getNamespaceId(), service.getName()); service.init(); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
這里的consistencyService代表了Nacos中的一致性協(xié)議。Nacos支持CP 協(xié)議以及 AP 協(xié)議。
對(duì)于注冊(cè)中心的服務(wù)發(fā)現(xiàn)功能,常用的就是AP協(xié)議,來(lái)保障服務(wù)的可用性。這里內(nèi)容較多就不展開(kāi)了。我們只需要關(guān)注AP協(xié)議對(duì)應(yīng)的實(shí)現(xiàn)類(lèi)DistroConsistencyServiceImpl,Distro 協(xié)議是阿里自研的最終?致性協(xié)議。
Service初始化完成后往這個(gè)協(xié)議里面加了一個(gè)Listen監(jiān)聽(tīng)。當(dāng)有對(duì)應(yīng)的事件發(fā)生時(shí),就會(huì)調(diào)用Service中onChange方法。
2-添加實(shí)例到Service中
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); //鎖 synchronized (service) { //Compare and get new instance list.將新注冊(cè)的和已經(jīng)存在的都返回 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key, instances); } }
consistencyService.put(key, instances); 最終是將所有的instances通過(guò)一致性協(xié)議寫(xiě)入到Nacos集群中。
到這里當(dāng)前的registerInstance已經(jīng)結(jié)束了,后面的操作就全部在Distro協(xié)議中去完成了。
3-AP 協(xié)議下 consistencyService.put方法
在Distro協(xié)議下,Nacos的每個(gè)節(jié)點(diǎn)都是平等的處理寫(xiě)請(qǐng)求,并且把新數(shù)據(jù)會(huì)同步到其他的節(jié)點(diǎn)(關(guān)于此實(shí)現(xiàn)的詳細(xì)介紹這里不在展開(kāi))。
public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); //寫(xiě)入 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } notifier.addTask(key, DataOperation.CHANGE); }
可以看到數(shù)據(jù)會(huì)先寫(xiě)入到DataStore中,可以看到也是一個(gè)ConcurrentHashMap。
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024); public void put(String key, Datum value) { dataMap.put(key, value); }
然后會(huì)將其添加到一個(gè)阻塞隊(duì)列中
public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.offer(Pair.with(datumKey, action)); } BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
這里放入阻塞隊(duì)列中,說(shuō)明肯定有異步線(xiàn)程去單獨(dú)消費(fèi)。這樣做的可以提升整體服務(wù)注冊(cè)的性能,并且可以避免并發(fā)加鎖的情況,因?yàn)槭窃谝粋€(gè)線(xiàn)程中進(jìn)行處理:
public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ; ) { try { Pair<String, DataOperation> pair = tasks.take(); handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } }
在handel中就對(duì)應(yīng)了具體的處理
for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } }
這里listener.onChange(datumKey, dataStore.get(datumKey).value);就會(huì)觸發(fā)前面Service中添加的監(jiān)聽(tīng),代碼也就走回了Service中的onChange方法。
4-Serivce.onChange方法
public void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } //更新實(shí)例信息,并且通知訂閱者服務(wù)變化的信息 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); }
這個(gè)方法主要做的就是將Service中的Cluster和Instance數(shù)據(jù)進(jìn)行更新(就是前面圖中數(shù)據(jù)模型對(duì)應(yīng)的集群和實(shí)例),并且通知訂閱者。
整個(gè)更新邏輯都在updateIPs方法中,更新的方式使用了copy-on-write的思想。
public void updateIPs(Collection<Instance> instances, boolean ephemeral) { //新的ipMap Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG.warn( "cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); //更新到真正的clusterMap中 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); getPushService().serviceChanged(this); ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(','); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString()); }
最終是直接將Instance數(shù)據(jù)進(jìn)行了替換,使用這種方式可以解決讀寫(xiě)并發(fā)互斥的情況。
private Set<Instance> ephemeralInstances = new HashSet<>(); ephemeralInstances = toUpdateInstances;
最后通過(guò)PushService來(lái)通知訂閱者,底層是基于UDP的推送。
3-走回DEMO
到此我們?cè)诨氐絛emo中,大概就知道naming.subscribe方法中的onEvent方法是如何被調(diào)用的了??隙ㄊ墙邮盏搅藀ushService的通知,然后進(jìn)行回調(diào)了所有的subscribe。這塊代碼大家可以自行去查看。
4-總結(jié)和說(shuō)明
上面的代碼其實(shí)基本都是在1.x的版本中已經(jīng)存在和使用的,在2.x的版本中Nacos服務(wù)端注冊(cè)提供了v2版本的實(shí)現(xiàn)如果是InstanceControllerV2。
也提供了v2版本的實(shí)現(xiàn),大家也可以去學(xué)習(xí):
instanceServiceV2.registerInstance(namespaceId, serviceName, instance);
并且這只是Nacos的服務(wù)發(fā)現(xiàn)的源碼的一小部門(mén),只是入門(mén)和熟悉了一下Nacos的源碼。還有很多功能沒(méi)有涉及到,如健康檢查、Distro協(xié)議的設(shè)計(jì),是如何保證最終一致性的。這些在后面有時(shí)間的時(shí)候也會(huì)做一些記錄和分享。
以上就是Nacos源碼之注冊(cè)中心的實(shí)現(xiàn)詳解的詳細(xì)內(nèi)容,更多關(guān)于Nacos注冊(cè)中心源碼的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
spring和quartz整合,并簡(jiǎn)單調(diào)用(實(shí)例講解)
下面小編就為大家?guī)?lái)一篇spring和quartz整合,并簡(jiǎn)單調(diào)用(實(shí)例講解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-07-07JVM參數(shù)-Xms和-Xmx的作用及說(shuō)明
這篇文章主要介紹了JVM參數(shù)-Xms和-Xmx的作用及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08基于SpringBoot使用MyBatis插件的問(wèn)題
MyBatis-Plus并不能為我們解決所有問(wèn)題,例如一些復(fù)雜的SQL,多表聯(lián)查,我們就需要自己去編寫(xiě)代碼和SQL語(yǔ)句,我們?cè)撊绾慰焖俚慕鉀Q這個(gè)問(wèn)題呢,這個(gè)時(shí)候可以使用MyBatisX插件,今天小編給大家?guī)?lái)了SpringBoot使用MyBatis插件問(wèn)題,感興趣的朋友一起看看吧2022-03-03Java怎樣創(chuàng)建集合才能避免造成內(nèi)存泄漏你了解嗎
內(nèi)存泄漏是指無(wú)用對(duì)象持續(xù)占有內(nèi)存或無(wú)用對(duì)象的內(nèi)存得不到及時(shí)釋放,從而造成內(nèi)存空間的浪費(fèi)稱(chēng)為內(nèi)存泄漏。長(zhǎng)生命周期的對(duì)象持有短生命周期對(duì)象的引用就很可能發(fā)生內(nèi)存泄漏,盡管短生命周期對(duì)象已經(jīng)不再需要,但是因?yàn)殚L(zhǎng)生命周期持有它的引用而導(dǎo)致不能被回收2021-09-09FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式)
這篇文章主要介紹了FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03Java代碼性能測(cè)試實(shí)戰(zhàn)之ContiPerf安裝使用
這篇文章主要為大家介紹了Java代碼性能測(cè)試實(shí)戰(zhàn)之ContiPerf安裝使用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06Mybatis通過(guò)Mapper代理連接數(shù)據(jù)庫(kù)的方法
這篇文章主要介紹了Mybatis通過(guò)Mapper代理連接數(shù)據(jù)庫(kù)的方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-11-11Spring的定時(shí)任務(wù)@Scheduled源碼詳解
這篇文章主要介紹了Spring的定時(shí)任務(wù)@Scheduled源碼詳解,@Scheduled注解是包org.springframework.scheduling.annotation中的一個(gè)注解,主要是用來(lái)開(kāi)啟定時(shí)任務(wù),本文提供了部分實(shí)現(xiàn)代碼與思路,需要的朋友可以參考下2023-09-09六個(gè)Java集合使用時(shí)需要注意的事項(xiàng)
這篇文章主要為大家詳細(xì)介紹了六個(gè)Java集合使用時(shí)需要注意的事項(xiàng),文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)java有一定的幫助,需要的可以參考一下2023-01-01Spring的事件機(jī)制知識(shí)點(diǎn)詳解及實(shí)例分析
在本篇內(nèi)容里小編給大家分享的是一篇關(guān)于Spring的事件機(jī)制知識(shí)點(diǎn)詳解及實(shí)例分析,有需要的朋友么可以參考下。2021-12-12