欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Nacos源碼之注冊中心的實現(xiàn)詳解

 更新時間:2023年02月09日 14:03:28   作者:leo的跟班  
這篇文章主要為大家介紹了Nacos源碼之注冊中心的實現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

引言

在平時的工作中多多少少都會接觸到注冊中心,當你的應用從單機到拆分成多個服務,每個服務又有多個實例的情況時,那么對服務IP地址管理的要求就會越來越高。而注冊中心就是干這個的。

最經(jīng)典的注冊中心實現(xiàn)方式是Zookeeper,在很多RPC框架中都有基于Zookeeper注冊中心的實現(xiàn),如Dubbo,Motan。有興趣的可以直接去閱讀相關源碼。

對于注冊中心的使用,其實就是在yaml文件中做一些配置,然后有對應的管理頁面可以查看和操作。當然我們肯定不僅僅局限于使用,更需要了解其背后的實現(xiàn)和設計。因為公司最新的應用使用的是Nacos,所以近期簡單閱讀了一下Nacos關于注冊中心的源碼實現(xiàn)。

基于的版本是2.1.2。

1-從DEMO出發(fā)

對于源碼的閱讀,可以從最簡單的demo入門。

Nacos作為一個服務端,想要使用其服務發(fā)現(xiàn)功能,可以直接使用其提供的客戶端代碼。

在RPC框架中,我們的服務一般分為Provider和Consumer.

Provider會向注冊中心進行服務注冊

Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost"));
properties.setProperty("namespace", System.getProperty("namespace", "public"));
NamingService naming = NamingFactory.createNamingService(properties);
//注冊
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
System.out.println("instances after register: " + naming.getAllInstances("nacos.test.3"));

Consumer則會監(jiān)聽對于服務注冊的實例信息

Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost"));
properties.setProperty("namespace", System.getProperty("namespace", "public"));
NamingService naming = NamingFactory.createNamingService(properties);
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
		runnable -> {
			Thread thread = new Thread(runnable);
			thread.setName("test-thread");
			return thread;
		});
//訂閱服務列表
naming.subscribe("nacos.test.3", new AbstractEventListener() {
	//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
	//So you can override getExecutor() to async handle event.
	@Override
	public Executor getExecutor() {
		return executor;
	}
	@Override
	public void onEvent(Event event) {
		System.out.println("serviceName: " + ((NamingEvent) event).getServiceName());
		System.out.println("instances from event: " + ((NamingEvent) event).getInstances());
	}
});      

上面我們就已經(jīng)完成了服務的注冊與發(fā)現(xiàn),雖然在項目中都是基于SpringBoot整合來實現(xiàn),但是其本質都是基于這些API代碼來實現(xiàn)。

2-服務注冊

通過上面的案例,我們已經(jīng)知道了如何想Nacos進行服務的注冊。接下來就來看看在注冊的過程中都做了哪些事情吧。

void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;

通過注冊的接口,我們大概可以知道注冊一個服務需要哪些要素

服務名稱,實例的ip和端口,以及實例所屬的集群名稱。這些要素其實就組成了Nacos服務的分級存儲模型。

在Service的上層是Group和Namespace,他們共同組成了注冊中心的數(shù)據(jù)模型。

我們通過官方文檔的兩張圖可以更加詳細的了解服務發(fā)現(xiàn)的數(shù)據(jù)模型:

有了上面的知識,那么閱讀后面的源碼會輕松不少。

registerInstance接口會通過http或者grpc的方式向-->Nacos發(fā)起請求,來進行服務的注冊。那么

這里就需要閱讀服務的代碼了。

服務端會通過下面的方法實現(xiàn)服務的注冊。

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);

這個方法有兩個實現(xiàn)類,我這里選擇的是InstanceOperatorServiceImpl來查看(Nacos V1版本的實現(xiàn),另一種是V2版本的實現(xiàn))。

通過ServiceManager方法中的registerInstance去注冊實例 ,步驟如下:

 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        Service service = getService(namespaceId, serviceName);
        checkServiceIsNull(service, namespaceId, serviceName);
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

1-創(chuàng)建Service

對應服務分級存儲模型中的服務(如果存在就直接返回)

/**
 * Map(namespace, Map(group::serviceName, Service)).
 */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

上面是Service的存儲結構ConcurrentHashMap。是一個雙層Map,先通過namespace找,然后在通過group和serviceName找到具體的service。這里其實可以回頭看我們之前的demo代碼,理解一下所傳遞的參數(shù)。

創(chuàng)建Serivce后會有一個putServiceAndInit方法做一些初始化操作,需要特別注意:

private void putServiceAndInit(Service service) throws NacosException {
	putService(service);
	service = getService(service.getNamespaceId(), service.getName());
	service.init();
	consistencyService
			.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
	consistencyService
			.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
	Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

這里的consistencyService代表了Nacos中的一致性協(xié)議。Nacos支持CP 協(xié)議以及 AP 協(xié)議。

對于注冊中心的服務發(fā)現(xiàn)功能,常用的就是AP協(xié)議,來保障服務的可用性。這里內(nèi)容較多就不展開了。我們只需要關注AP協(xié)議對應的實現(xiàn)類DistroConsistencyServiceImpl,Distro 協(xié)議是阿里自研的最終?致性協(xié)議。

Service初始化完成后往這個協(xié)議里面加了一個Listen監(jiān)聽。當有對應的事件發(fā)生時,就會調(diào)用Service中onChange方法。

2-添加實例到Service中

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        //鎖
        synchronized (service) {
            //Compare and get new instance list.將新注冊的和已經(jīng)存在的都返回
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            consistencyService.put(key, instances);
        }
    }

consistencyService.put(key, instances); 最終是將所有的instances通過一致性協(xié)議寫入到Nacos集群中。

到這里當前的registerInstance已經(jīng)結束了,后面的操作就全部在Distro協(xié)議中去完成了。

3-AP 協(xié)議下 consistencyService.put方法

在Distro協(xié)議下,Nacos的每個節(jié)點都是平等的處理寫請求,并且把新數(shù)據(jù)會同步到其他的節(jié)點(關于此實現(xiàn)的詳細介紹這里不在展開)。

 public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            //寫入
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        notifier.addTask(key, DataOperation.CHANGE);
    }

可以看到數(shù)據(jù)會先寫入到DataStore中,可以看到也是一個ConcurrentHashMap。

private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
	dataMap.put(key, value);
}

然后會將其添加到一個阻塞隊列中

public void addTask(String datumKey, DataOperation action) {
	if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
		return;
	}
	if (action == DataOperation.CHANGE) {
		services.put(datumKey, StringUtils.EMPTY);
	}
	tasks.offer(Pair.with(datumKey, action));
}
BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

這里放入阻塞隊列中,說明肯定有異步線程去單獨消費。這樣做的可以提升整體服務注冊的性能,并且可以避免并發(fā)加鎖的情況,因為是在一個線程中進行處理:

 public void run() {
	Loggers.DISTRO.info("distro notifier started");
	for (; ; ) {
		try {
			Pair<String, DataOperation> pair = tasks.take();
			handle(pair);
		} catch (Throwable e) {
			Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
		}
	}
}

在handel中就對應了具體的處理

for (RecordListener listener : listeners.get(datumKey)) {
		count++;
		try {
			if (action == DataOperation.CHANGE) {
				listener.onChange(datumKey, dataStore.get(datumKey).value);
				continue;
			}
			if (action == DataOperation.DELETE) {
				listener.onDelete(datumKey);
				continue;
			}
		} catch (Throwable e) {
			Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
		}
}

這里listener.onChange(datumKey, dataStore.get(datumKey).value);就會觸發(fā)前面Service中添加的監(jiān)聽,代碼也就走回了Service中的onChange方法。

4-Serivce.onChange方法

public void onChange(String key, Instances value) throws Exception {
	Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
	for (Instance instance : value.getInstanceList()) {
		if (instance == null) {
			// Reject this abnormal instance list:
			throw new RuntimeException("got null instance " + key);
		}
		if (instance.getWeight() > 10000.0D) {
			instance.setWeight(10000.0D);
		}
		if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
			instance.setWeight(0.01D);
		}
	}
        //更新實例信息,并且通知訂閱者服務變化的信息
	updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
	recalculateChecksum();
}

這個方法主要做的就是將Service中的Cluster和Instance數(shù)據(jù)進行更新(就是前面圖中數(shù)據(jù)模型對應的集群和實例),并且通知訂閱者。

整個更新邏輯都在updateIPs方法中,更新的方式使用了copy-on-write的思想。

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        //新的ipMap
	Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
	for (String clusterName : clusterMap.keySet()) {
		ipMap.put(clusterName, new ArrayList<>());
	}
	for (Instance instance : instances) {
		try {
			if (instance == null) {
				Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
				continue;
			}
			if (StringUtils.isEmpty(instance.getClusterName())) {
				instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
			}
			if (!clusterMap.containsKey(instance.getClusterName())) {
				Loggers.SRV_LOG.warn(
						"cluster: {} not found, ip: {}, will create new cluster with default configuration.",
						instance.getClusterName(), instance.toJson());
				Cluster cluster = new Cluster(instance.getClusterName(), this);
				cluster.init();
				getClusterMap().put(instance.getClusterName(), cluster);
			}
			List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
			if (clusterIPs == null) {
				clusterIPs = new LinkedList<>();
				ipMap.put(instance.getClusterName(), clusterIPs);
			}
			clusterIPs.add(instance);
		} catch (Exception e) {
			Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
		}
	}
	for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
		//make every ip mine
		List<Instance> entryIPs = entry.getValue();
                //更新到真正的clusterMap中
		clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
	}
	setLastModifiedMillis(System.currentTimeMillis());
	getPushService().serviceChanged(this);
	ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
	StringBuilder stringBuilder = new StringBuilder();
	for (Instance instance : allIPs()) {
		stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
	}
	Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
			stringBuilder.toString());
}

最終是直接將Instance數(shù)據(jù)進行了替換,使用這種方式可以解決讀寫并發(fā)互斥的情況。

private Set<Instance> ephemeralInstances = new HashSet<>();
ephemeralInstances = toUpdateInstances;

最后通過PushService來通知訂閱者,底層是基于UDP的推送。

3-走回DEMO

到此我們在回到demo中,大概就知道naming.subscribe方法中的onEvent方法是如何被調(diào)用的了??隙ㄊ墙邮盏搅藀ushService的通知,然后進行回調(diào)了所有的subscribe。這塊代碼大家可以自行去查看。

4-總結和說明

上面的代碼其實基本都是在1.x的版本中已經(jīng)存在和使用的,在2.x的版本中Nacos服務端注冊提供了v2版本的實現(xiàn)如果是InstanceControllerV2。

也提供了v2版本的實現(xiàn),大家也可以去學習:

instanceServiceV2.registerInstance(namespaceId, serviceName, instance);

并且這只是Nacos的服務發(fā)現(xiàn)的源碼的一小部門,只是入門和熟悉了一下Nacos的源碼。還有很多功能沒有涉及到,如健康檢查、Distro協(xié)議的設計,是如何保證最終一致性的。這些在后面有時間的時候也會做一些記錄和分享。

以上就是Nacos源碼之注冊中心的實現(xiàn)詳解的詳細內(nèi)容,更多關于Nacos注冊中心源碼的資料請關注腳本之家其它相關文章!

相關文章

  • spring和quartz整合,并簡單調(diào)用(實例講解)

    spring和quartz整合,并簡單調(diào)用(實例講解)

    下面小編就為大家?guī)硪黄猻pring和quartz整合,并簡單調(diào)用(實例講解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-07-07
  • JVM參數(shù)-Xms和-Xmx的作用及說明

    JVM參數(shù)-Xms和-Xmx的作用及說明

    這篇文章主要介紹了JVM參數(shù)-Xms和-Xmx的作用及說明,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • 基于SpringBoot使用MyBatis插件的問題

    基于SpringBoot使用MyBatis插件的問題

    MyBatis-Plus并不能為我們解決所有問題,例如一些復雜的SQL,多表聯(lián)查,我們就需要自己去編寫代碼和SQL語句,我們該如何快速的解決這個問題呢,這個時候可以使用MyBatisX插件,今天小編給大家?guī)砹薙pringBoot使用MyBatis插件問題,感興趣的朋友一起看看吧
    2022-03-03
  • Java怎樣創(chuàng)建集合才能避免造成內(nèi)存泄漏你了解嗎

    Java怎樣創(chuàng)建集合才能避免造成內(nèi)存泄漏你了解嗎

    內(nèi)存泄漏是指無用對象持續(xù)占有內(nèi)存或無用對象的內(nèi)存得不到及時釋放,從而造成內(nèi)存空間的浪費稱為內(nèi)存泄漏。長生命周期的對象持有短生命周期對象的引用就很可能發(fā)生內(nèi)存泄漏,盡管短生命周期對象已經(jīng)不再需要,但是因為長生命周期持有它的引用而導致不能被回收
    2021-09-09
  • FeignClient實現(xiàn)接口調(diào)用方式(不同參數(shù)形式)

    FeignClient實現(xiàn)接口調(diào)用方式(不同參數(shù)形式)

    這篇文章主要介紹了FeignClient實現(xiàn)接口調(diào)用方式(不同參數(shù)形式),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java代碼性能測試實戰(zhàn)之ContiPerf安裝使用

    Java代碼性能測試實戰(zhàn)之ContiPerf安裝使用

    這篇文章主要為大家介紹了Java代碼性能測試實戰(zhàn)之ContiPerf安裝使用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-06-06
  • Mybatis通過Mapper代理連接數(shù)據(jù)庫的方法

    Mybatis通過Mapper代理連接數(shù)據(jù)庫的方法

    這篇文章主要介紹了Mybatis通過Mapper代理連接數(shù)據(jù)庫的方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-11-11
  • Spring的定時任務@Scheduled源碼詳解

    Spring的定時任務@Scheduled源碼詳解

    這篇文章主要介紹了Spring的定時任務@Scheduled源碼詳解,@Scheduled注解是包org.springframework.scheduling.annotation中的一個注解,主要是用來開啟定時任務,本文提供了部分實現(xiàn)代碼與思路,需要的朋友可以參考下
    2023-09-09
  • 六個Java集合使用時需要注意的事項

    六個Java集合使用時需要注意的事項

    這篇文章主要為大家詳細介紹了六個Java集合使用時需要注意的事項,文中的示例代碼講解詳細,對我們學習java有一定的幫助,需要的可以參考一下
    2023-01-01
  • Spring的事件機制知識點詳解及實例分析

    Spring的事件機制知識點詳解及實例分析

    在本篇內(nèi)容里小編給大家分享的是一篇關于Spring的事件機制知識點詳解及實例分析,有需要的朋友么可以參考下。
    2021-12-12

最新評論