Nacos源碼之注冊中心的實現(xiàn)詳解
引言
在平時的工作中多多少少都會接觸到注冊中心,當你的應用從單機到拆分成多個服務,每個服務又有多個實例的情況時,那么對服務IP地址管理的要求就會越來越高。而注冊中心就是干這個的。
最經(jīng)典的注冊中心實現(xiàn)方式是Zookeeper,在很多RPC框架中都有基于Zookeeper注冊中心的實現(xiàn),如Dubbo,Motan。有興趣的可以直接去閱讀相關源碼。
對于注冊中心的使用,其實就是在yaml文件中做一些配置,然后有對應的管理頁面可以查看和操作。當然我們肯定不僅僅局限于使用,更需要了解其背后的實現(xiàn)和設計。因為公司最新的應用使用的是Nacos,所以近期簡單閱讀了一下Nacos關于注冊中心的源碼實現(xiàn)。
基于的版本是2.1.2。
1-從DEMO出發(fā)
對于源碼的閱讀,可以從最簡單的demo入門。
Nacos作為一個服務端,想要使用其服務發(fā)現(xiàn)功能,可以直接使用其提供的客戶端代碼。
在RPC框架中,我們的服務一般分為Provider和Consumer.
Provider會向注冊中心進行服務注冊
Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost"));
properties.setProperty("namespace", System.getProperty("namespace", "public"));
NamingService naming = NamingFactory.createNamingService(properties);
//注冊
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
System.out.println("instances after register: " + naming.getAllInstances("nacos.test.3"));
Consumer則會監(jiān)聽對于服務注冊的實例信息
Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost"));
properties.setProperty("namespace", System.getProperty("namespace", "public"));
NamingService naming = NamingFactory.createNamingService(properties);
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("test-thread");
return thread;
});
//訂閱服務列表
naming.subscribe("nacos.test.3", new AbstractEventListener() {
//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void onEvent(Event event) {
System.out.println("serviceName: " + ((NamingEvent) event).getServiceName());
System.out.println("instances from event: " + ((NamingEvent) event).getInstances());
}
});
上面我們就已經(jīng)完成了服務的注冊與發(fā)現(xiàn),雖然在項目中都是基于SpringBoot整合來實現(xiàn),但是其本質都是基于這些API代碼來實現(xiàn)。
2-服務注冊
通過上面的案例,我們已經(jīng)知道了如何想Nacos進行服務的注冊。接下來就來看看在注冊的過程中都做了哪些事情吧。
void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;
通過注冊的接口,我們大概可以知道注冊一個服務需要哪些要素
服務名稱,實例的ip和端口,以及實例所屬的集群名稱。這些要素其實就組成了Nacos服務的分級存儲模型。
在Service的上層是Group和Namespace,他們共同組成了注冊中心的數(shù)據(jù)模型。
我們通過官方文檔的兩張圖可以更加詳細的了解服務發(fā)現(xiàn)的數(shù)據(jù)模型:


有了上面的知識,那么閱讀后面的源碼會輕松不少。
registerInstance接口會通過http或者grpc的方式向-->Nacos發(fā)起請求,來進行服務的注冊。那么
這里就需要閱讀服務的代碼了。
服務端會通過下面的方法實現(xiàn)服務的注冊。
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
這個方法有兩個實現(xiàn)類,我這里選擇的是InstanceOperatorServiceImpl來查看(Nacos V1版本的實現(xiàn),另一種是V2版本的實現(xiàn))。
通過ServiceManager方法中的registerInstance去注冊實例 ,步驟如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
checkServiceIsNull(service, namespaceId, serviceName);
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
1-創(chuàng)建Service
對應服務分級存儲模型中的服務(如果存在就直接返回)
/** * Map(namespace, Map(group::serviceName, Service)). */ private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
上面是Service的存儲結構ConcurrentHashMap。是一個雙層Map,先通過namespace找,然后在通過group和serviceName找到具體的service。這里其實可以回頭看我們之前的demo代碼,理解一下所傳遞的參數(shù)。
創(chuàng)建Serivce后會有一個putServiceAndInit方法做一些初始化操作,需要特別注意:
private void putServiceAndInit(Service service) throws NacosException {
putService(service);
service = getService(service.getNamespaceId(), service.getName());
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
這里的consistencyService代表了Nacos中的一致性協(xié)議。Nacos支持CP 協(xié)議以及 AP 協(xié)議。
對于注冊中心的服務發(fā)現(xiàn)功能,常用的就是AP協(xié)議,來保障服務的可用性。這里內(nèi)容較多就不展開了。我們只需要關注AP協(xié)議對應的實現(xiàn)類DistroConsistencyServiceImpl,Distro 協(xié)議是阿里自研的最終?致性協(xié)議。
Service初始化完成后往這個協(xié)議里面加了一個Listen監(jiān)聽。當有對應的事件發(fā)生時,就會調(diào)用Service中onChange方法。
2-添加實例到Service中
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
//鎖
synchronized (service) {
//Compare and get new instance list.將新注冊的和已經(jīng)存在的都返回
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
consistencyService.put(key, instances);
}
}
consistencyService.put(key, instances); 最終是將所有的instances通過一致性協(xié)議寫入到Nacos集群中。
到這里當前的registerInstance已經(jīng)結束了,后面的操作就全部在Distro協(xié)議中去完成了。
3-AP 協(xié)議下 consistencyService.put方法
在Distro協(xié)議下,Nacos的每個節(jié)點都是平等的處理寫請求,并且把新數(shù)據(jù)會同步到其他的節(jié)點(關于此實現(xiàn)的詳細介紹這里不在展開)。
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//寫入
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
可以看到數(shù)據(jù)會先寫入到DataStore中,可以看到也是一個ConcurrentHashMap。
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
dataMap.put(key, value);
}
然后會將其添加到一個阻塞隊列中
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
這里放入阻塞隊列中,說明肯定有異步線程去單獨消費。這樣做的可以提升整體服務注冊的性能,并且可以避免并發(fā)加鎖的情況,因為是在一個線程中進行處理:
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
在handel中就對應了具體的處理
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
這里listener.onChange(datumKey, dataStore.get(datumKey).value);就會觸發(fā)前面Service中添加的監(jiān)聽,代碼也就走回了Service中的onChange方法。
4-Serivce.onChange方法
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
//更新實例信息,并且通知訂閱者服務變化的信息
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
這個方法主要做的就是將Service中的Cluster和Instance數(shù)據(jù)進行更新(就是前面圖中數(shù)據(jù)模型對應的集群和實例),并且通知訂閱者。
整個更新邏輯都在updateIPs方法中,更新的方式使用了copy-on-write的思想。
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
//新的ipMap
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG.warn(
"cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
//更新到真正的clusterMap中
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);
ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
最終是直接將Instance數(shù)據(jù)進行了替換,使用這種方式可以解決讀寫并發(fā)互斥的情況。
private Set<Instance> ephemeralInstances = new HashSet<>(); ephemeralInstances = toUpdateInstances;
最后通過PushService來通知訂閱者,底層是基于UDP的推送。
3-走回DEMO
到此我們在回到demo中,大概就知道naming.subscribe方法中的onEvent方法是如何被調(diào)用的了??隙ㄊ墙邮盏搅藀ushService的通知,然后進行回調(diào)了所有的subscribe。這塊代碼大家可以自行去查看。
4-總結和說明
上面的代碼其實基本都是在1.x的版本中已經(jīng)存在和使用的,在2.x的版本中Nacos服務端注冊提供了v2版本的實現(xiàn)如果是InstanceControllerV2。
也提供了v2版本的實現(xiàn),大家也可以去學習:
instanceServiceV2.registerInstance(namespaceId, serviceName, instance);
并且這只是Nacos的服務發(fā)現(xiàn)的源碼的一小部門,只是入門和熟悉了一下Nacos的源碼。還有很多功能沒有涉及到,如健康檢查、Distro協(xié)議的設計,是如何保證最終一致性的。這些在后面有時間的時候也會做一些記錄和分享。
以上就是Nacos源碼之注冊中心的實現(xiàn)詳解的詳細內(nèi)容,更多關于Nacos注冊中心源碼的資料請關注腳本之家其它相關文章!
相關文章
spring和quartz整合,并簡單調(diào)用(實例講解)
下面小編就為大家?guī)硪黄猻pring和quartz整合,并簡單調(diào)用(實例講解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07
Java怎樣創(chuàng)建集合才能避免造成內(nèi)存泄漏你了解嗎
內(nèi)存泄漏是指無用對象持續(xù)占有內(nèi)存或無用對象的內(nèi)存得不到及時釋放,從而造成內(nèi)存空間的浪費稱為內(nèi)存泄漏。長生命周期的對象持有短生命周期對象的引用就很可能發(fā)生內(nèi)存泄漏,盡管短生命周期對象已經(jīng)不再需要,但是因為長生命周期持有它的引用而導致不能被回收2021-09-09
FeignClient實現(xiàn)接口調(diào)用方式(不同參數(shù)形式)
這篇文章主要介紹了FeignClient實現(xiàn)接口調(diào)用方式(不同參數(shù)形式),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
Java代碼性能測試實戰(zhàn)之ContiPerf安裝使用
這篇文章主要為大家介紹了Java代碼性能測試實戰(zhàn)之ContiPerf安裝使用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-06-06
Mybatis通過Mapper代理連接數(shù)據(jù)庫的方法
這篇文章主要介紹了Mybatis通過Mapper代理連接數(shù)據(jù)庫的方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-11-11

