欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Nacos源碼之注冊(cè)中心的實(shí)現(xiàn)詳解

 更新時(shí)間:2023年02月09日 14:03:28   作者:leo的跟班  
這篇文章主要為大家介紹了Nacos源碼之注冊(cè)中心的實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

在平時(shí)的工作中多多少少都會(huì)接觸到注冊(cè)中心,當(dāng)你的應(yīng)用從單機(jī)到拆分成多個(gè)服務(wù),每個(gè)服務(wù)又有多個(gè)實(shí)例的情況時(shí),那么對(duì)服務(wù)IP地址管理的要求就會(huì)越來(lái)越高。而注冊(cè)中心就是干這個(gè)的。

最經(jīng)典的注冊(cè)中心實(shí)現(xiàn)方式是Zookeeper,在很多RPC框架中都有基于Zookeeper注冊(cè)中心的實(shí)現(xiàn),如Dubbo,Motan。有興趣的可以直接去閱讀相關(guān)源碼。

對(duì)于注冊(cè)中心的使用,其實(shí)就是在yaml文件中做一些配置,然后有對(duì)應(yīng)的管理頁(yè)面可以查看和操作。當(dāng)然我們肯定不僅僅局限于使用,更需要了解其背后的實(shí)現(xiàn)和設(shè)計(jì)。因?yàn)楣咀钚碌膽?yīng)用使用的是Nacos,所以近期簡(jiǎn)單閱讀了一下Nacos關(guān)于注冊(cè)中心的源碼實(shí)現(xiàn)。

基于的版本是2.1.2。

1-從DEMO出發(fā)

對(duì)于源碼的閱讀,可以從最簡(jiǎn)單的demo入門(mén)。

Nacos作為一個(gè)服務(wù)端,想要使用其服務(wù)發(fā)現(xiàn)功能,可以直接使用其提供的客戶(hù)端代碼。

在RPC框架中,我們的服務(wù)一般分為Provider和Consumer.

Provider會(huì)向注冊(cè)中心進(jìn)行服務(wù)注冊(cè)

Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost"));
properties.setProperty("namespace", System.getProperty("namespace", "public"));
NamingService naming = NamingFactory.createNamingService(properties);
//注冊(cè)
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
System.out.println("instances after register: " + naming.getAllInstances("nacos.test.3"));

Consumer則會(huì)監(jiān)聽(tīng)對(duì)于服務(wù)注冊(cè)的實(shí)例信息

Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost"));
properties.setProperty("namespace", System.getProperty("namespace", "public"));
NamingService naming = NamingFactory.createNamingService(properties);
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
		runnable -> {
			Thread thread = new Thread(runnable);
			thread.setName("test-thread");
			return thread;
		});
//訂閱服務(wù)列表
naming.subscribe("nacos.test.3", new AbstractEventListener() {
	//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
	//So you can override getExecutor() to async handle event.
	@Override
	public Executor getExecutor() {
		return executor;
	}
	@Override
	public void onEvent(Event event) {
		System.out.println("serviceName: " + ((NamingEvent) event).getServiceName());
		System.out.println("instances from event: " + ((NamingEvent) event).getInstances());
	}
});      

上面我們就已經(jīng)完成了服務(wù)的注冊(cè)與發(fā)現(xiàn),雖然在項(xiàng)目中都是基于SpringBoot整合來(lái)實(shí)現(xiàn),但是其本質(zhì)都是基于這些API代碼來(lái)實(shí)現(xiàn)。

2-服務(wù)注冊(cè)

通過(guò)上面的案例,我們已經(jīng)知道了如何想Nacos進(jìn)行服務(wù)的注冊(cè)。接下來(lái)就來(lái)看看在注冊(cè)的過(guò)程中都做了哪些事情吧。

void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;

通過(guò)注冊(cè)的接口,我們大概可以知道注冊(cè)一個(gè)服務(wù)需要哪些要素

服務(wù)名稱(chēng),實(shí)例的ip和端口,以及實(shí)例所屬的集群名稱(chēng)。這些要素其實(shí)就組成了Nacos服務(wù)的分級(jí)存儲(chǔ)模型。

在Service的上層是Group和Namespace,他們共同組成了注冊(cè)中心的數(shù)據(jù)模型。

我們通過(guò)官方文檔的兩張圖可以更加詳細(xì)的了解服務(wù)發(fā)現(xiàn)的數(shù)據(jù)模型:

有了上面的知識(shí),那么閱讀后面的源碼會(huì)輕松不少。

registerInstance接口會(huì)通過(guò)http或者grpc的方式向-->Nacos發(fā)起請(qǐng)求,來(lái)進(jìn)行服務(wù)的注冊(cè)。那么

這里就需要閱讀服務(wù)的代碼了。

服務(wù)端會(huì)通過(guò)下面的方法實(shí)現(xiàn)服務(wù)的注冊(cè)。

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);

這個(gè)方法有兩個(gè)實(shí)現(xiàn)類(lèi),我這里選擇的是InstanceOperatorServiceImpl來(lái)查看(Nacos V1版本的實(shí)現(xiàn),另一種是V2版本的實(shí)現(xiàn))。

通過(guò)ServiceManager方法中的registerInstance去注冊(cè)實(shí)例 ,步驟如下:

 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        Service service = getService(namespaceId, serviceName);
        checkServiceIsNull(service, namespaceId, serviceName);
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

1-創(chuàng)建Service

對(duì)應(yīng)服務(wù)分級(jí)存儲(chǔ)模型中的服務(wù)(如果存在就直接返回)

/**
 * Map(namespace, Map(group::serviceName, Service)).
 */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

上面是Service的存儲(chǔ)結(jié)構(gòu)ConcurrentHashMap。是一個(gè)雙層Map,先通過(guò)namespace找,然后在通過(guò)group和serviceName找到具體的service。這里其實(shí)可以回頭看我們之前的demo代碼,理解一下所傳遞的參數(shù)。

創(chuàng)建Serivce后會(huì)有一個(gè)putServiceAndInit方法做一些初始化操作,需要特別注意:

private void putServiceAndInit(Service service) throws NacosException {
	putService(service);
	service = getService(service.getNamespaceId(), service.getName());
	service.init();
	consistencyService
			.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
	consistencyService
			.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
	Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

這里的consistencyService代表了Nacos中的一致性協(xié)議。Nacos支持CP 協(xié)議以及 AP 協(xié)議。

對(duì)于注冊(cè)中心的服務(wù)發(fā)現(xiàn)功能,常用的就是AP協(xié)議,來(lái)保障服務(wù)的可用性。這里內(nèi)容較多就不展開(kāi)了。我們只需要關(guān)注AP協(xié)議對(duì)應(yīng)的實(shí)現(xiàn)類(lèi)DistroConsistencyServiceImpl,Distro 協(xié)議是阿里自研的最終?致性協(xié)議。

Service初始化完成后往這個(gè)協(xié)議里面加了一個(gè)Listen監(jiān)聽(tīng)。當(dāng)有對(duì)應(yīng)的事件發(fā)生時(shí),就會(huì)調(diào)用Service中onChange方法。

2-添加實(shí)例到Service中

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        //鎖
        synchronized (service) {
            //Compare and get new instance list.將新注冊(cè)的和已經(jīng)存在的都返回
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            consistencyService.put(key, instances);
        }
    }

consistencyService.put(key, instances); 最終是將所有的instances通過(guò)一致性協(xié)議寫(xiě)入到Nacos集群中。

到這里當(dāng)前的registerInstance已經(jīng)結(jié)束了,后面的操作就全部在Distro協(xié)議中去完成了。

3-AP 協(xié)議下 consistencyService.put方法

在Distro協(xié)議下,Nacos的每個(gè)節(jié)點(diǎn)都是平等的處理寫(xiě)請(qǐng)求,并且把新數(shù)據(jù)會(huì)同步到其他的節(jié)點(diǎn)(關(guān)于此實(shí)現(xiàn)的詳細(xì)介紹這里不在展開(kāi))。

 public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            //寫(xiě)入
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        notifier.addTask(key, DataOperation.CHANGE);
    }

可以看到數(shù)據(jù)會(huì)先寫(xiě)入到DataStore中,可以看到也是一個(gè)ConcurrentHashMap。

private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
	dataMap.put(key, value);
}

然后會(huì)將其添加到一個(gè)阻塞隊(duì)列中

public void addTask(String datumKey, DataOperation action) {
	if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
		return;
	}
	if (action == DataOperation.CHANGE) {
		services.put(datumKey, StringUtils.EMPTY);
	}
	tasks.offer(Pair.with(datumKey, action));
}
BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

這里放入阻塞隊(duì)列中,說(shuō)明肯定有異步線(xiàn)程去單獨(dú)消費(fèi)。這樣做的可以提升整體服務(wù)注冊(cè)的性能,并且可以避免并發(fā)加鎖的情況,因?yàn)槭窃谝粋€(gè)線(xiàn)程中進(jìn)行處理:

 public void run() {
	Loggers.DISTRO.info("distro notifier started");
	for (; ; ) {
		try {
			Pair<String, DataOperation> pair = tasks.take();
			handle(pair);
		} catch (Throwable e) {
			Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
		}
	}
}

在handel中就對(duì)應(yīng)了具體的處理

for (RecordListener listener : listeners.get(datumKey)) {
		count++;
		try {
			if (action == DataOperation.CHANGE) {
				listener.onChange(datumKey, dataStore.get(datumKey).value);
				continue;
			}
			if (action == DataOperation.DELETE) {
				listener.onDelete(datumKey);
				continue;
			}
		} catch (Throwable e) {
			Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
		}
}

這里listener.onChange(datumKey, dataStore.get(datumKey).value);就會(huì)觸發(fā)前面Service中添加的監(jiān)聽(tīng),代碼也就走回了Service中的onChange方法。

4-Serivce.onChange方法

public void onChange(String key, Instances value) throws Exception {
	Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
	for (Instance instance : value.getInstanceList()) {
		if (instance == null) {
			// Reject this abnormal instance list:
			throw new RuntimeException("got null instance " + key);
		}
		if (instance.getWeight() > 10000.0D) {
			instance.setWeight(10000.0D);
		}
		if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
			instance.setWeight(0.01D);
		}
	}
        //更新實(shí)例信息,并且通知訂閱者服務(wù)變化的信息
	updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
	recalculateChecksum();
}

這個(gè)方法主要做的就是將Service中的Cluster和Instance數(shù)據(jù)進(jìn)行更新(就是前面圖中數(shù)據(jù)模型對(duì)應(yīng)的集群和實(shí)例),并且通知訂閱者。

整個(gè)更新邏輯都在updateIPs方法中,更新的方式使用了copy-on-write的思想。

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        //新的ipMap
	Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
	for (String clusterName : clusterMap.keySet()) {
		ipMap.put(clusterName, new ArrayList<>());
	}
	for (Instance instance : instances) {
		try {
			if (instance == null) {
				Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
				continue;
			}
			if (StringUtils.isEmpty(instance.getClusterName())) {
				instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
			}
			if (!clusterMap.containsKey(instance.getClusterName())) {
				Loggers.SRV_LOG.warn(
						"cluster: {} not found, ip: {}, will create new cluster with default configuration.",
						instance.getClusterName(), instance.toJson());
				Cluster cluster = new Cluster(instance.getClusterName(), this);
				cluster.init();
				getClusterMap().put(instance.getClusterName(), cluster);
			}
			List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
			if (clusterIPs == null) {
				clusterIPs = new LinkedList<>();
				ipMap.put(instance.getClusterName(), clusterIPs);
			}
			clusterIPs.add(instance);
		} catch (Exception e) {
			Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
		}
	}
	for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
		//make every ip mine
		List<Instance> entryIPs = entry.getValue();
                //更新到真正的clusterMap中
		clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
	}
	setLastModifiedMillis(System.currentTimeMillis());
	getPushService().serviceChanged(this);
	ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
	StringBuilder stringBuilder = new StringBuilder();
	for (Instance instance : allIPs()) {
		stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
	}
	Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
			stringBuilder.toString());
}

最終是直接將Instance數(shù)據(jù)進(jìn)行了替換,使用這種方式可以解決讀寫(xiě)并發(fā)互斥的情況。

private Set<Instance> ephemeralInstances = new HashSet<>();
ephemeralInstances = toUpdateInstances;

最后通過(guò)PushService來(lái)通知訂閱者,底層是基于UDP的推送。

3-走回DEMO

到此我們?cè)诨氐絛emo中,大概就知道naming.subscribe方法中的onEvent方法是如何被調(diào)用的了??隙ㄊ墙邮盏搅藀ushService的通知,然后進(jìn)行回調(diào)了所有的subscribe。這塊代碼大家可以自行去查看。

4-總結(jié)和說(shuō)明

上面的代碼其實(shí)基本都是在1.x的版本中已經(jīng)存在和使用的,在2.x的版本中Nacos服務(wù)端注冊(cè)提供了v2版本的實(shí)現(xiàn)如果是InstanceControllerV2。

也提供了v2版本的實(shí)現(xiàn),大家也可以去學(xué)習(xí):

instanceServiceV2.registerInstance(namespaceId, serviceName, instance);

并且這只是Nacos的服務(wù)發(fā)現(xiàn)的源碼的一小部門(mén),只是入門(mén)和熟悉了一下Nacos的源碼。還有很多功能沒(méi)有涉及到,如健康檢查、Distro協(xié)議的設(shè)計(jì),是如何保證最終一致性的。這些在后面有時(shí)間的時(shí)候也會(huì)做一些記錄和分享。

以上就是Nacos源碼之注冊(cè)中心的實(shí)現(xiàn)詳解的詳細(xì)內(nèi)容,更多關(guān)于Nacos注冊(cè)中心源碼的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • spring和quartz整合,并簡(jiǎn)單調(diào)用(實(shí)例講解)

    spring和quartz整合,并簡(jiǎn)單調(diào)用(實(shí)例講解)

    下面小編就為大家?guī)?lái)一篇spring和quartz整合,并簡(jiǎn)單調(diào)用(實(shí)例講解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-07-07
  • JVM參數(shù)-Xms和-Xmx的作用及說(shuō)明

    JVM參數(shù)-Xms和-Xmx的作用及說(shuō)明

    這篇文章主要介紹了JVM參數(shù)-Xms和-Xmx的作用及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • 基于SpringBoot使用MyBatis插件的問(wèn)題

    基于SpringBoot使用MyBatis插件的問(wèn)題

    MyBatis-Plus并不能為我們解決所有問(wèn)題,例如一些復(fù)雜的SQL,多表聯(lián)查,我們就需要自己去編寫(xiě)代碼和SQL語(yǔ)句,我們?cè)撊绾慰焖俚慕鉀Q這個(gè)問(wèn)題呢,這個(gè)時(shí)候可以使用MyBatisX插件,今天小編給大家?guī)?lái)了SpringBoot使用MyBatis插件問(wèn)題,感興趣的朋友一起看看吧
    2022-03-03
  • Java怎樣創(chuàng)建集合才能避免造成內(nèi)存泄漏你了解嗎

    Java怎樣創(chuàng)建集合才能避免造成內(nèi)存泄漏你了解嗎

    內(nèi)存泄漏是指無(wú)用對(duì)象持續(xù)占有內(nèi)存或無(wú)用對(duì)象的內(nèi)存得不到及時(shí)釋放,從而造成內(nèi)存空間的浪費(fèi)稱(chēng)為內(nèi)存泄漏。長(zhǎng)生命周期的對(duì)象持有短生命周期對(duì)象的引用就很可能發(fā)生內(nèi)存泄漏,盡管短生命周期對(duì)象已經(jīng)不再需要,但是因?yàn)殚L(zhǎng)生命周期持有它的引用而導(dǎo)致不能被回收
    2021-09-09
  • FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式)

    FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式)

    這篇文章主要介紹了FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java代碼性能測(cè)試實(shí)戰(zhàn)之ContiPerf安裝使用

    Java代碼性能測(cè)試實(shí)戰(zhàn)之ContiPerf安裝使用

    這篇文章主要為大家介紹了Java代碼性能測(cè)試實(shí)戰(zhàn)之ContiPerf安裝使用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-06-06
  • Mybatis通過(guò)Mapper代理連接數(shù)據(jù)庫(kù)的方法

    Mybatis通過(guò)Mapper代理連接數(shù)據(jù)庫(kù)的方法

    這篇文章主要介紹了Mybatis通過(guò)Mapper代理連接數(shù)據(jù)庫(kù)的方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-11-11
  • Spring的定時(shí)任務(wù)@Scheduled源碼詳解

    Spring的定時(shí)任務(wù)@Scheduled源碼詳解

    這篇文章主要介紹了Spring的定時(shí)任務(wù)@Scheduled源碼詳解,@Scheduled注解是包org.springframework.scheduling.annotation中的一個(gè)注解,主要是用來(lái)開(kāi)啟定時(shí)任務(wù),本文提供了部分實(shí)現(xiàn)代碼與思路,需要的朋友可以參考下
    2023-09-09
  • 六個(gè)Java集合使用時(shí)需要注意的事項(xiàng)

    六個(gè)Java集合使用時(shí)需要注意的事項(xiàng)

    這篇文章主要為大家詳細(xì)介紹了六個(gè)Java集合使用時(shí)需要注意的事項(xiàng),文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)java有一定的幫助,需要的可以參考一下
    2023-01-01
  • Spring的事件機(jī)制知識(shí)點(diǎn)詳解及實(shí)例分析

    Spring的事件機(jī)制知識(shí)點(diǎn)詳解及實(shí)例分析

    在本篇內(nèi)容里小編給大家分享的是一篇關(guān)于Spring的事件機(jī)制知識(shí)點(diǎn)詳解及實(shí)例分析,有需要的朋友么可以參考下。
    2021-12-12

最新評(píng)論