nacos注冊(cè)中心單節(jié)點(diǎn)ap架構(gòu)源碼解析(最新推薦)
一、注冊(cè)流程
單nacos節(jié)點(diǎn)流程圖如下:
流程圖可以知,Nacos注冊(cè)流程包括客戶端的服務(wù)注冊(cè)、服務(wù)實(shí)例列表拉取、定時(shí)心跳任務(wù);以及服務(wù)端的定時(shí)檢查服務(wù)實(shí)例任務(wù)、服務(wù)實(shí)例更新推送5個(gè)功能。
服務(wù)注冊(cè):當(dāng)客戶端啟動(dòng)的時(shí)候會(huì)根據(jù)當(dāng)前微服務(wù)的配置信息把微服務(wù)注冊(cè)到nacos服務(wù)端。
服務(wù)實(shí)例列表拉?。寒?dāng)客戶端啟動(dòng)的時(shí)候從nacos服務(wù)端獲取當(dāng)前服務(wù)的名稱已經(jīng)注冊(cè)的實(shí)例數(shù)據(jù),并把這些實(shí)例數(shù)據(jù)緩存在客戶端的serviceInfoMap 對(duì)象中。
定時(shí)心跳任務(wù):當(dāng)客戶端向nacos服務(wù)注冊(cè)臨時(shí)實(shí)例對(duì)象的時(shí)候,會(huì)創(chuàng)建一個(gè)延期的任務(wù)去往服務(wù)端發(fā)送心跳信息。如果發(fā)送心跳信息成功,則又會(huì)創(chuàng)建一個(gè)延期任務(wù)往服務(wù)端注冊(cè)心跳信息,一直重復(fù)該邏輯。nacos服務(wù)端接收到客戶端的心跳信息就是更新客戶端實(shí)例的最后心跳時(shí)間。該時(shí)間用來判斷實(shí)例是否健康和是否需要?jiǎng)h除。
定時(shí)檢查服務(wù)實(shí)例任務(wù):nacos服務(wù)端在創(chuàng)建空服務(wù)對(duì)象的時(shí)候會(huì)通過線程池來定時(shí)執(zhí)行檢查服務(wù),其主要邏輯為判斷當(dāng)前時(shí)間和最后心跳時(shí)間之差是否大于健康超時(shí)時(shí)間和刪除實(shí)例超時(shí)時(shí)間,如果大于,則更新實(shí)例的健康狀態(tài)和刪除當(dāng)前實(shí)例。定時(shí)執(zhí)行的規(guī)則為5秒之后執(zhí)行檢查,并且每次執(zhí)行完檢查之后,5秒之后再次執(zhí)行檢查。
服務(wù)實(shí)例更新推送:當(dāng)有客戶端更新實(shí)例對(duì)象時(shí),服務(wù)端會(huì)先獲取該客戶端的服務(wù)名稱下所有已經(jīng)注冊(cè)的客戶端實(shí)例,并會(huì)針每一個(gè)客戶端發(fā)送一個(gè)更新serviceinfo的udp消息,客戶端監(jiān)聽收到nacos服務(wù)端發(fā)送的udp數(shù)據(jù)后進(jìn)行本地緩存的更新。
二、客戶端
一、服務(wù)注冊(cè)
根據(jù)spring-cloud-starter-alibaba-nacos-discovery的spring.factories文件,找到服務(wù)注冊(cè)啟動(dòng)配置類。
spring.factories文件內(nèi)容為如下,
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\ com.alibaba.cloud.nacos.NacosServiceAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
根據(jù)名稱判斷可以得出 NacosServiceRegistryAutoConfiguration 為服務(wù)注冊(cè)啟動(dòng)配置類,源碼如下
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class }) public class NacosServiceRegistryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); }
關(guān)鍵類 NacosAutoServiceRegistration 的類圖結(jié)構(gòu)如下
上圖可知,NacosAutoServiceRegistration 實(shí)現(xiàn)了 ApplicationListener接口,該監(jiān)聽器會(huì)在SpringBoot啟動(dòng)的時(shí)候會(huì)自動(dòng)調(diào)用 onApplicationEvent方法,onApplicationEvent具體實(shí)現(xiàn)方法如下
public void onApplicationEvent(WebServerInitializedEvent event) { this.bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) { this.port.compareAndSet(0, event.getWebServer().getPort()); // 具體的啟動(dòng)方法 this.start(); } }
具體的啟動(dòng)方法this.start();方法的代碼如下,
public void start() { if (!this.isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } } else { if (!this.running.get()) { this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration())); // 關(guān)鍵邏輯 this.register(); if (this.shouldRegisterManagement()) { this.registerManagement(); } this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration())); this.running.compareAndSet(false, true); } }
關(guān)鍵邏輯為this.register();方法代碼如下
protected void register() { if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) { log.debug("Registration disabled."); return; } if (this.registration.getPort() < 0) { this.registration.setPort(getPort().get()); } super.register(); }
關(guān)鍵邏輯為super.register();方法代碼如下,
protected void register() { this.serviceRegistry.register(this.getRegistration()); }
關(guān)鍵邏輯為this.serviceRegistry.register方法代碼如下,
@Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } // 根據(jù)配置屬性構(gòu)建NamingService對(duì)象 NamingService namingService = namingService(); // 獲取服務(wù)名,默認(rèn)為 ${spring.application.name} String serviceId = registration.getServiceId(); // 獲取組名 ,默認(rèn)為 DEFAULT_GROUP String group = nacosDiscoveryProperties.getGroup(); // 創(chuàng)建注冊(cè)實(shí)例 Instance instance = getNacosInstanceFromRegistration(registration); try { // 發(fā)起注冊(cè) namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); // rethrow a RuntimeException if the registration is failed. // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132 rethrowRuntimeException(e); } }
先通過getNacosInstanceFromRegistration方法創(chuàng)建實(shí)例對(duì)象,getNacosInstanceFromRegistration代碼如下,
private Instance getNacosInstanceFromRegistration(Registration registration) { Instance instance = new Instance(); // 獲取服務(wù)ip instance.setIp(registration.getHost()); // 獲取服務(wù) instance.setPort(registration.getPort()); // 獲取權(quán)重 instance.setWeight(nacosDiscoveryProperties.getWeight()); // 獲取集群名稱 instance.setClusterName(nacosDiscoveryProperties.getClusterName()); instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled()); // 獲取元數(shù)據(jù) instance.setMetadata(registration.getMetadata()); // 獲取是否為臨時(shí)實(shí)例 instance.setEphemeral(nacosDiscoveryProperties.isEphemeral()); return instance; }
然后通過namingService.registerInstance方法發(fā)起注冊(cè),registerInstance方法的代碼如下,
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { // 檢查 實(shí)例是否合法 // heart beat timeout must(默認(rèn)15秒) < heart beat interval (默認(rèn)5秒)拋異常 // ip delete timeout must(默認(rèn)30 秒) < heart beat interval(默認(rèn)5秒)拋異常 NamingUtils.checkInstanceIsLegal(instance); // 構(gòu)建 groupName@@serviceName String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); // 如果是臨時(shí)實(shí)例,則創(chuàng)建心跳信息,定時(shí)給nacos服務(wù)發(fā)送 if (instance.isEphemeral()) { BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance); this.beatReactor.addBeatInfo(groupedServiceName, beatInfo); } // 向 nacos-service 注冊(cè)實(shí)例 this.serverProxy.registerService(groupedServiceName, groupName, instance); }
先檢查實(shí)例是否合法,然后構(gòu)建服務(wù)名稱,規(guī)則為groupName@@serviceName。通過this.serverProxy.registerService方法向 nacos-service 注冊(cè)實(shí)例,代碼如下,
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance); final Map<String, String> params = new HashMap<String, String>(16); //設(shè)置 namespaceId params.put(CommonParams.NAMESPACE_ID, namespaceId); //設(shè)置 serviceName params.put(CommonParams.SERVICE_NAME, serviceName); //設(shè)置 groupName params.put(CommonParams.GROUP_NAME, groupName); //設(shè)置 clusterName params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); // 調(diào)用 nacos-service 的nacosUrlInstance接口注冊(cè)實(shí)例 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
通過向reqApi方法向nacos服務(wù)端注冊(cè)當(dāng)前實(shí)例數(shù)據(jù),其實(shí)就是向 ${spring.cloud.nacos.discovery.server-addr}/nacos/v1/ns/instance 發(fā)送POST請(qǐng)求。該請(qǐng)求地址對(duì)應(yīng)的nacos服務(wù)端的源碼的naming工程中InstanceController的register方法,代碼如下,
public String register(HttpServletRequest request) throws Exception { final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); //根據(jù)請(qǐng)求構(gòu)建 Instance 對(duì)象 final Instance instance = parseInstance(request); //注冊(cè) Instance 對(duì)象,serviceManager對(duì)象中保存了所有的服務(wù)對(duì)象。 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; }
先根據(jù)請(qǐng)求對(duì)象構(gòu)建Instance對(duì)象,然后通過serviceManager.registerInstance方法用來注冊(cè)Instance對(duì)象,registerInstance代碼如下
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 如果 namespaceId 為 key 的數(shù)據(jù)為空,則創(chuàng)建 service ,并初始化service createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 獲取 service 對(duì)象 Service service = getService(namespaceId, serviceName); // 如果 service為空 則報(bào)錯(cuò) if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 添加實(shí)例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
如果 namespaceId為key的數(shù)據(jù)為空,則創(chuàng)建 service,并初始化service。然后調(diào)用addInstance添加實(shí)例對(duì)象,addInstance方法代碼如下,
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 根據(jù) 命名空間 和 服務(wù)名稱 構(gòu)建 key String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 獲取 service Service service = getService(namespaceId, serviceName); // 同步鎖 synchronized (service) { // 獲取服務(wù)下的實(shí)例集合(服務(wù)已有 + 新增的實(shí)例) List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 根據(jù)KEY添加服務(wù)的實(shí)例 consistencyService.put(key, instances); } }
addIpAddresses方法中會(huì)調(diào)用updateIpAddresses方法,且action為 add。該方法根據(jù)action的值來獲取該服務(wù)下的最新實(shí)例集合(新增實(shí)例或刪除實(shí)例加上目前服務(wù)已有的實(shí)例數(shù)據(jù)合集)。如果action為add表示新增,則方法最后返回的集合對(duì)象中會(huì)把該服務(wù)中已有的實(shí)例集合加上新增的實(shí)例集合數(shù)據(jù)一起返回 ;如果action為 remove表示刪除,則方法最后返回的集合對(duì)象中會(huì)把該服務(wù)中已有的實(shí)例集合刪除掉需要?jiǎng)h除的實(shí)例集合數(shù)據(jù)。后面通過調(diào)用consistencyService.put(key, instances)方法來把updateIpAddresses方法返回的值直接添加consistencyService的實(shí)例中。updateIpAddresses方法的代碼如下,
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { // 從本地緩存中獲取服務(wù)的實(shí)例數(shù)據(jù) Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); // 獲取 當(dāng)前服務(wù)下所有的 實(shí)例 List<Instance> currentIPs = service.allIPs(ephemeral); // 創(chuàng)建當(dāng)前實(shí)例數(shù)據(jù)map Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); // 創(chuàng)建 當(dāng)前實(shí)例Id set Set<String> currentInstanceIds = Sets.newHashSet(); // 遍歷當(dāng)前服務(wù)的所有實(shí)例,添加到 創(chuàng)建當(dāng)前實(shí)例數(shù)據(jù) map 和 當(dāng)前實(shí)例Id集合 for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } // 構(gòu)造 實(shí)例集合對(duì)象的 map Map<String, Instance> instanceMap; // 如果有緩存數(shù)據(jù) if (datum != null && null != datum.value) { // 從本地緩存中以及當(dāng)前服務(wù)的內(nèi)存數(shù)據(jù)獲取最新服務(wù)的實(shí)例數(shù)據(jù) instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } // 如果沒有緩存數(shù)據(jù) else { // 創(chuàng)建 instanceMap instanceMap = new HashMap<>(ips.length); } // 遍歷參數(shù)傳過來的實(shí)例對(duì)象 for (Instance instance : ips) { // 如果 service 不包括 實(shí)例的 ClusterName 則創(chuàng)建 實(shí)例 Cluster,并初始化 if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } // 如果是刪除,則從 instanceMap 中 刪除 該實(shí)例 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } // 如果是新增 else { //獲取已存在的 實(shí)例 Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { instance.setInstanceId(oldInstance.getInstanceId()); } else { // 生成 實(shí)例 id instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } // instanceMap 添加instance實(shí)例 instanceMap.put(instance.getDatumKey(), instance); } } // 如果集合小于0 ,并且是新增操作則拋異常 if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } // 返回 服務(wù)中最新的實(shí)例數(shù)據(jù) return new CopyOnWriteArrayList<>(instanceMap.values()); }
通過updateIpAddresses方法拿到需要更新的實(shí)例集合對(duì)象后,再通過consistencyService.put(key, instances)把拿到的實(shí)例集合對(duì)象添加到實(shí)現(xiàn)了PersistentConsistencyServiceDelegateImpl或者EphemeralConsistencyService接口的實(shí)例對(duì)象中,consistencyService.put(key, instances)的源碼如下,
@Override public void put(String key, Record value) throws NacosException { // 根據(jù)key獲取具體的 consistencyService ,并且向其中添加具體的 key 和 value mapConsistencyService(key).put(key, value); }
根據(jù)key獲取具體的 consistencyService ,并且向其中添加具體的 key 和 value。consistencyService中根據(jù)key獲取集群的實(shí)例對(duì)象(臨時(shí)服務(wù)對(duì)象EphemeralConsistencyService和持久服務(wù)對(duì)象PersistentConsistencyServiceDelegateImpl)
private ConsistencyService mapConsistencyService(String key) { // 根據(jù)key返回具體的服務(wù)對(duì)象 return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }
如果是注冊(cè)的臨時(shí)實(shí)例節(jié)點(diǎn),這里取到的是實(shí)現(xiàn)了ephemeralConsistencyService接口的DistroConsistencyServiceImpl 對(duì)象,它的put源碼如下:
@Override public void put(String key, Record value) throws NacosException { // 添加key 和 value onPut(key, value); distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); }
通過onPut方法添加key 和 value,opPut方法的代碼如下,
public void onPut(String key, Record value) { // 如果是臨時(shí)節(jié)點(diǎn)實(shí)例,則創(chuàng)建 Datum 并保存在 dataStore 中 if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); dataStore.put(key, datum); } // 如果 監(jiān)聽對(duì)象不包括 key 則返回 if (!listeners.containsKey(key)) { return; } // 向notifier對(duì)象添加通知任務(wù) notifier.addTask(key, DataOperation.CHANGE); }
如果是臨時(shí)實(shí)例節(jié)點(diǎn),則創(chuàng)建 Datum 并保存在 dataStore 中,然后通過notifier.addTask用來向notifier對(duì)象添加通知任務(wù),且操作類型為DataOperation.CHANGE,addTask方法的代碼如下:
public void addTask(String datumKey, DataOperation action) { // 如果services包括了當(dāng)前的 datumKey ,并且是修改操作 則直接返回 if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } // 如果是修改操作,則向 services 添加 datumKey if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } // 向 tasks中添加 Pair 對(duì)象 tasks.offer(Pair.with(datumKey, action)); }
以上代碼的tasks是用來存放具體實(shí)例key和動(dòng)作類型的對(duì)象,它是一個(gè)ArrayBlockingQueue對(duì)象,DistroConsistencyServiceImpl 對(duì)象的init方法代碼如下,
@PostConstruct public void init() { GlobalExecutor.submitDistroNotifyTask(notifier); }
根據(jù)以上代碼可知,在DistroConsistencyServiceImpl 實(shí)例對(duì)象初始化之后會(huì)往GlobalExecutor線程池對(duì)象中添加了一個(gè)notifier對(duì)象。notifier對(duì)象為一個(gè)實(shí)現(xiàn)了Runnable 的實(shí)例。上面的代碼會(huì)執(zhí)行notifier對(duì)象的run方法,notifier的run方法代碼如下:
public void run() { Loggers.DISTRO.info("distro notifier started"); // 死循環(huán)遍歷 for (; ; ) { try { // 獲取 tasks的數(shù)據(jù),如果沒有數(shù)據(jù)會(huì)阻塞當(dāng)前線程,直到tasks有數(shù)據(jù)為止。 Pair<String, DataOperation> pair = tasks.take(); // 處理數(shù)據(jù) handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } }
上面是一個(gè)死循環(huán),tasks.take()是一個(gè)阻塞式獲取數(shù)據(jù)的方法,如果tasks沒有數(shù)據(jù)則會(huì)阻塞當(dāng)前線程直到tasks.take()拿到數(shù)據(jù),拿到數(shù)據(jù)之后會(huì)調(diào)用handle方法處理,handle代碼如下,
private void handle(Pair<String, DataOperation> pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); // 先從 services 中刪除 key services.remove(datumKey); int count = 0; // 根據(jù) key 獲取 服務(wù)對(duì)象數(shù)據(jù) ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(datumKey); if (recordListeners == null) { Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey); return; } for (RecordListener listener : recordListeners) { count++; try { // 如果是新增 if (action == DataOperation.CHANGE) { Datum datum = dataStore.get(datumKey); if (datum != null) { // 更新 serivce 的實(shí)例數(shù)據(jù) listener.onChange(datumKey, datum.value); } else { Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey); } continue; } // 如果是刪除 if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } }
根據(jù)action 為 DataOperation.CHANGE,代碼中執(zhí)行的代碼分支為listener.onChange(datumKey, datum.value),該方法的邏輯為修改服務(wù)的實(shí)例數(shù)據(jù),源碼如下
public void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } // 更新 service 的 實(shí)例集合 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); }
以上代碼先遍歷所有的實(shí)例數(shù)據(jù)設(shè)置權(quán)值,再通過updateIPs方法更新服務(wù)實(shí)例,updateIPs方法的代碼如下:
public void updateIPs(Collection<Instance> instances, boolean ephemeral) { // 根據(jù) clusterMap 創(chuàng)建 ipMap對(duì)象 Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); // 根據(jù) clusterMap 初始化 ipMap對(duì)象 for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } // 遍歷最新的實(shí)例集合數(shù)據(jù) for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } // 如果集群名稱為null ,則設(shè)置默認(rèn)的集群名稱 DEFAULT if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } // 如果當(dāng)前 service 的clusterMap不包括 實(shí)例的 集群名稱,則需要?jiǎng)?chuàng)建新的集群對(duì)象 if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } // 如果當(dāng)前 ipMap 不包括 當(dāng)前實(shí)例的 集群名稱,則需要?jiǎng)?chuàng)建新的集群對(duì)象 List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } // 給當(dāng)前的 集群對(duì)象賦值 實(shí)例數(shù)據(jù)。 clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } // 遍歷 ipMap對(duì)象,給 clusterMap 替換最新的 entryIPs for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); // 給 clusterMap 替換最新的 entryIPs clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); // 發(fā)布 getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString()); }
以上代碼先根據(jù)當(dāng)前服務(wù)下的集群信息構(gòu)造構(gòu)造ipMap對(duì)象,然后遍歷最新的實(shí)例集合數(shù)據(jù)更新ipMap對(duì)象,最后循環(huán)調(diào)用clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral)方法來更新當(dāng)前集群中的實(shí)例列表數(shù)據(jù)。updateIps方法代碼如下:
public void updateIps(List<Instance> ips, boolean ephemeral) { // 獲取 本集群中的 實(shí)例集合 Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; // 根據(jù)old的實(shí)例數(shù)據(jù) 構(gòu)建 hashmap HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); // 根據(jù)實(shí)例的 key 添加到 oldIpMap中 for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } // 獲取更新的 實(shí)例數(shù)據(jù) List List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); // do not update the ip validation status of updated ips // because the checker has the most precise result // Only when ip is not marked, don't we update the health status of IP: if (!ip.isMarked()) { ip.setHealthy(oldIP.isHealthy()); } if (ip.isHealthy() != oldIP.isHealthy()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(), (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName()); } if (ip.getWeight() != oldIP.getWeight()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(), ip.toString()); } } } // 獲取新增的 實(shí)例數(shù)據(jù) List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(), getName(), newIPs.size(), newIPs.toString()); for (Instance ip : newIPs) { HealthCheckStatus.reset(ip); } } // 獲取刪除的 實(shí)例數(shù)據(jù) List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(), getName(), deadIPs.size(), deadIPs.toString()); for (Instance ip : deadIPs) { HealthCheckStatus.remv(ip); } } // 根據(jù)傳進(jìn)來的 實(shí)例集合 創(chuàng)建需要更新的實(shí)例set toUpdateInstances = new HashSet<>(ips); // 直接替換 if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; }
以上代碼就是更新cluster對(duì)象下的實(shí)例數(shù)據(jù)邏輯,根據(jù)代碼可知在cluster對(duì)象中更新實(shí)例數(shù)據(jù)就是拿傳進(jìn)來的實(shí)例列表創(chuàng)建set集合直接替換的。
二、服務(wù)實(shí)例列表拉取
客戶端程序啟動(dòng)之后,會(huì)執(zhí)行com.alibaba.cloud.nacos.discovery.NacosWatch類的start()方法,此方法中會(huì)執(zhí)行以下語句,
namingService.subscribe(properties.getService(), properties.getGroup(), Arrays.asList(properties.getClusterName()), eventListener);
此方法用來獲取當(dāng)前服務(wù)的實(shí)例數(shù)據(jù),subscribe方法代碼如下,
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException { // 獲取服務(wù)列表數(shù)據(jù) hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener); }
通過hostReactor.subscribe方法獲取服務(wù)列表數(shù)據(jù),subscribe方法的代碼如下,
public void subscribe(String serviceName, String clusters, EventListener eventListener) { notifier.registerListener(serviceName, clusters, eventListener); // 獲取服務(wù)列表數(shù)據(jù) getServiceInfo(serviceName, clusters); }
通過getServiceInfo方法獲取服務(wù)列表數(shù)據(jù),getServiceInfo的代碼如下:
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 根據(jù)服務(wù)名稱和集群名稱獲取本地的服務(wù)列表數(shù)據(jù) ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); // 如果本地服務(wù)實(shí)例數(shù)據(jù)為null,則去獲取最新的服務(wù)實(shí)例列表 updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey());
以上代碼可知,會(huì)根據(jù)服務(wù)名稱和clusters名稱獲取本地緩存serviceInfoMap對(duì)象中的服務(wù)列表數(shù)據(jù)。如果本地服務(wù)實(shí)例數(shù)據(jù)為null,則通過updateServiceNow方法去nacos服務(wù)端獲取最新的服務(wù)實(shí)例列表。updateServiceNow方法代碼如下:
try { // 更新本地服務(wù)方法 updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); }
updateService的代碼如下:
public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 調(diào)用服務(wù)代理類獲取服務(wù)實(shí)例列表,pushReceiver.getUdpPort()會(huì)隨機(jī)生成一個(gè)udp端口 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { // 如果 result不為空,則向本地緩存 serviceInfoMap 添加服務(wù)實(shí)例列表 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } }
通過調(diào)用服務(wù)代理類serverProxy的queryList方法獲取服務(wù)實(shí)例列表,pushReceiver.getUdpPort()會(huì)獲pushReceiver的udp端口,pushReceiver對(duì)象是一個(gè)udp數(shù)據(jù)接收類,用來接收nacos服務(wù)器發(fā)送的udp數(shù)據(jù),比如服務(wù)實(shí)例更新的消息。serverProxy.query方法的代碼如下:
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { // 構(gòu)造請(qǐng)求參數(shù) final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); // 客戶端的upd端口,服務(wù)端回調(diào)客戶端udp端口會(huì)用到 params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); // 向nacos服務(wù)器獲取服務(wù)列表數(shù)據(jù),并返回 return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); }
在構(gòu)造的請(qǐng)求參數(shù)中包括了客戶端的udpPort,該參數(shù)在服務(wù)端回調(diào)接口會(huì)用到。reqApi方法其實(shí)就向nacos服務(wù)器的/nacos/v1/ns/instance/list接口發(fā)送了請(qǐng)求消息,該接口對(duì)應(yīng)的nacos服務(wù)端的源碼的naming工程中InstanceController的list方法,代碼如下,
@GetMapping("/list") @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ) public ObjectNode list(HttpServletRequest request) throws Exception { String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); String agent = WebUtils.getUserAgent(request); String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY); String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY); int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0")); String env = WebUtils.optional(request, "env", StringUtils.EMPTY); boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false")); String app = WebUtils.optional(request, "app", StringUtils.EMPTY); String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY); boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false")); // 獲取實(shí)例列表數(shù)據(jù) return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly); }
以上代碼先構(gòu)造相關(guān)參數(shù)信息,然后通過doSrvIpxt方法來獲取實(shí)例列表數(shù)據(jù),doSrvIpxt代碼如下:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { ClientInfo clientInfo = new ClientInfo(agent); ObjectNode result = JacksonUtils.createEmptyJsonNode(); // 根據(jù)命名空間id和服務(wù)名稱獲取服務(wù) Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // now try to enable the push try { // 如果端口大于0 ,并且是支持的客戶端 if (udpPort > 0 && pushService.canEnablePush(agent)) { // 添加 PushClient 對(duì)象 pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } // 如果服務(wù)對(duì)象為 null ,則構(gòu)造數(shù)據(jù)返回 if (service == null) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } result.put("name", serviceName); result.put("clusters", clusters); result.put("cacheMillis", cacheMillis); result.replace("hosts", JacksonUtils.createEmptyArrayNode()); return result; } // 檢查服務(wù)是否可用 checkIfDisabled(service); List<Instance> srvedIPs; // 根據(jù)集群列表獲取具體服務(wù)下面的實(shí)例列表 srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ","))); // filter ips using selector: if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) { srvedIPs = service.getSelector().select(clientIP, srvedIPs); } // 如果實(shí)例數(shù)據(jù)為空,則構(gòu)造數(shù)據(jù)返回 if (CollectionUtils.isEmpty(srvedIPs)) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.set("hosts", JacksonUtils.createEmptyArrayNode()); result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result; } Map<Boolean, List<Instance>> ipMap = new HashMap<>(2); ipMap.put(Boolean.TRUE, new ArrayList<>()); ipMap.put(Boolean.FALSE, new ArrayList<>()); // 構(gòu)造健康和不健康的實(shí)例數(shù)據(jù) for (Instance ip : srvedIPs) { ipMap.get(ip.isHealthy()).add(ip); } if (isCheck) { result.put("reachProtectThreshold", false); } double threshold = service.getProtectThreshold(); if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) { Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName); if (isCheck) { result.put("reachProtectThreshold", true); } ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE)); ipMap.get(Boolean.FALSE).clear(); } if (isCheck) { result.put("protectThreshold", service.getProtectThreshold()); result.put("reachLocalSiteCallThreshold", false); return JacksonUtils.createEmptyJsonNode(); } ArrayNode hosts = JacksonUtils.createEmptyArrayNode(); // 構(gòu)造返回的實(shí)例列表對(duì)象 for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) { List<Instance> ips = entry.getValue(); if (healthyOnly && !entry.getKey()) { continue; } for (Instance instance : ips) { // remove disabled instance: if (!instance.isEnabled()) { continue; } ObjectNode ipObj = JacksonUtils.createEmptyJsonNode(); ipObj.put("ip", instance.getIp()); ipObj.put("port", instance.getPort()); // deprecated since nacos 1.0.0: ipObj.put("valid", entry.getKey()); ipObj.put("healthy", entry.getKey()); ipObj.put("marked", instance.isMarked()); ipObj.put("instanceId", instance.getInstanceId()); ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata())); ipObj.put("enabled", instance.isEnabled()); ipObj.put("weight", instance.getWeight()); ipObj.put("clusterName", instance.getClusterName()); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { ipObj.put("serviceName", instance.getServiceName()); } else { ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName())); } ipObj.put("ephemeral", instance.isEphemeral()); hosts.add(ipObj); } } result.replace("hosts", hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result; }
以上代碼其實(shí)就是根據(jù)命名空間id和服務(wù)名稱獲取服務(wù)對(duì)象,然后根據(jù)不同情況構(gòu)造返回對(duì)象,正常情況會(huì)構(gòu)造一個(gè)ServiceInfo類型的ObjectNode對(duì)象,整個(gè)具體過程請(qǐng)看上面的代碼注釋。最后返回構(gòu)造的對(duì)象。
客戶端中拿到請(qǐng)求/nacos/v1/ns/instance/list接口的返回值之后會(huì)轉(zhuǎn)成一個(gè)ServiceInfo對(duì)象,并且把該對(duì)象賦值給本地的緩存對(duì)象serviceInfoMap,processServiceJson關(guān)鍵代碼如下:
// 將返回值轉(zhuǎn)換成 ServiceInfo 類型的對(duì)象 ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); // 把該對(duì)象添加到本地緩存中 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
三、定時(shí)心跳任務(wù)
在客戶端向nacos服務(wù)端注冊(cè)服務(wù)的過程中,會(huì)調(diào)用com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(java.lang.String, java.lang.String, com.alibaba.nacos.api.naming.pojo.Instance)方法,在該代碼中有個(gè)判斷邏輯,如果是臨時(shí)實(shí)例則會(huì)創(chuàng)建一個(gè)BeatInfo對(duì)象添加到beatReactor中。代碼如下:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { // 檢查 實(shí)例是否合法 // heart beat timeout must(默認(rèn)15秒) > heart beat interval (默認(rèn)5秒) // ip delete timeout must(默認(rèn)30 秒) > heart beat interval NamingUtils.checkInstanceIsLegal(instance); // 構(gòu)建 groupName@@serviceName String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); // 如果是臨時(shí)實(shí)例,則創(chuàng)建心跳信息,定時(shí)給nacos服務(wù)發(fā)送 if (instance.isEphemeral()) { // 構(gòu)造心跳信息 BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance); // 執(zhí)行心跳定時(shí)任務(wù) this.beatReactor.addBeatInfo(groupedServiceName, beatInfo); } // 向 nacos-service 注冊(cè)實(shí)例 this.serverProxy.registerService(groupedServiceName, groupName, instance); }
beatInfo對(duì)象用來存儲(chǔ)心跳信息,buildBeatInfo方法代碼如下,
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) { BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(groupedServiceName); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); beatInfo.setPeriod(instance.getInstanceHeartBeatInterval()); return beatInfo; }
beatReactor中有一個(gè)ScheduledExecutorService類型的executorService實(shí)例用來執(zhí)行定時(shí)的線程,addBeatInfo的代碼如下,
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); // 線程池添加定時(shí)任務(wù),默認(rèn) 5 秒鐘之后 執(zhí)行 BeatTask executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }
根據(jù)上面的executorService.schedule()代碼可知,BeatTask線程在固定的秒數(shù)之后執(zhí)行,而BeatTask實(shí)現(xiàn)了Runnable接口,即執(zhí)行BeatTask的run方法 。BeatTask的run方法代碼如下,
public void run() { // 如果 beatInfo 設(shè)置了 stop ,則停止 if (beatInfo.isStopped()) { return; } // 獲取下一次延期執(zhí)行的時(shí)間 long nextTime = beatInfo.getPeriod(); try { // 向服務(wù)端發(fā)送心跳信息 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } // 重新提交定時(shí)任務(wù),延期發(fā)送心跳信息 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); }
以上代碼先獲取下一次延期執(zhí)行的時(shí)間,再通過serverProxy.sendBeat()向服務(wù)端發(fā)送心跳信息,最后重新提交定時(shí)任務(wù),延期發(fā)送心跳信息,serverProxy.sendBeat()代碼如下,
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); } Map<String, String> params = new HashMap<String, String>(8); Map<String, String> bodyMap = new HashMap<String, String>(2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); // 向nacos服務(wù)器發(fā)送心跳數(shù)據(jù),并返回 String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
reqApi方法其實(shí)就向nacos服務(wù)器端的/nacos/v1/ns/instance/beat接口發(fā)送了put類型的請(qǐng)求消息,該接口對(duì)應(yīng)的nacos服務(wù)端的源碼的naming工程中InstanceController的beat方法,beat方法的代碼如下,
@CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); // 獲取心跳信息 String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; // 如果 beat 數(shù)據(jù)不為空,則構(gòu)造 RsInfo 類型的 clientBeat 實(shí)例 if (StringUtils.isNotBlank(beat)) { clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } // 獲取集群名稱 String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); // 獲取 實(shí)例的 ip String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); // 獲取 實(shí)例的 端口 int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); // 如果 clientBeat 不為空,則設(shè)置 相關(guān)的信息 if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { // fix #2533 clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } // 獲取 namespaceId String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 獲取 serviceName String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); // 檢查 ServiceName 的格式 NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); // 根據(jù) 參數(shù) 獲取 具體的實(shí)例 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); // 如果 實(shí)例為 空 if (instance == null) { // 如果 clientBeat 為空 則構(gòu)造參數(shù) code 為 20404的結(jié)果返回 if (clientBeat == null) { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); // 如果 clientBeat 不為空 則構(gòu)造 instance 數(shù)據(jù),向 serviceManager 注冊(cè)實(shí)例。 instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); serviceManager.registerInstance(namespaceId, serviceName, instance); } // 根據(jù)服務(wù)名稱獲取 服務(wù) Service service = serviceManager.getService(namespaceId, serviceName); // 如果服務(wù)為空 ,則拋異常 if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } // 如果 clientBeat 為空,則創(chuàng)建該對(duì)象 if (clientBeat == null) { clientBeat = new RsInfo(); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } // 處理客戶端的 心跳對(duì)象 service.processClientBeat(clientBeat); // result.put(CommonParams.CODE, NamingResponseCode.OK); if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); } result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; }
先獲取心跳信息,然后構(gòu)造RsInfo類型的clientBeat實(shí)例。然后通過service.processClientBeat(clientBeat)方法處理客戶端的心跳對(duì)象,processClientBeat方法的代碼如下,
public void processClientBeat(final RsInfo rsInfo) { // 構(gòu)造 ClientBeatProcessor 對(duì)象 ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); // 定時(shí)執(zhí)行 ClientBeatProcessor 對(duì)象,這里是立即執(zhí)行,延期時(shí)間為 0 HealthCheckReactor.scheduleNow(clientBeatProcessor); }
ClientBeatProcessor是一個(gè)實(shí)現(xiàn)了Runnable的類,HealthCheckReactor是一個(gè)定時(shí)任務(wù)線程池,scheduleNow方法表示立即執(zhí)行clientBeatProcessor對(duì)象的run方法,clientBeatProcessor.run方法代碼如下,
public void run() { Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); } // 獲取ip String ip = rsInfo.getIp(); // 獲取 集群名稱 String clusterName = rsInfo.getCluster(); // 獲取端口 int port = rsInfo.getPort(); // 從服務(wù)對(duì)象中獲取集群對(duì)象 Cluster cluster = service.getClusterMap().get(clusterName); // 從集群對(duì)象中獲取所有的臨時(shí)實(shí)例列表 List<Instance> instances = cluster.allIPs(true); for (Instance instance : instances) { // 找到 ip 和端口相同的 實(shí)例數(shù)據(jù) if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } // 更新 最后心跳時(shí)間 instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked() && !instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); getPushService().serviceChanged(service); } } } }
以上代碼可知,該方法主要用來更新客戶端實(shí)例的最后心跳時(shí)間。
三、服務(wù)端接口
一、定時(shí)檢查服務(wù)實(shí)例任務(wù)
在客戶端注冊(cè)服務(wù)的時(shí)候,會(huì)調(diào)用nacos服務(wù)端的com.alibaba.nacos.naming.controllers.InstanceController#register方法,其中會(huì)調(diào)用createEmptyService方法用來創(chuàng)建空的服務(wù)對(duì)象,最后會(huì)調(diào)用service.init()方法用來初始化服務(wù)對(duì)象,init方法代碼如下
public void init() { // 定時(shí)執(zhí)行 service 的 run 方法 處理超時(shí)的 instance HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }
通過調(diào)用HealthCheckReactor.scheduleCheck()方法來定時(shí)執(zhí)行clientBeatCheckTask,scheduleCheck的代碼如下,
public static void scheduleCheck(ClientBeatCheckTask task) { // 5秒之后執(zhí)行 task,并且每次執(zhí)行task完之后,5秒之后再次執(zhí)行 task futureMap.computeIfAbsent(task.taskKey(), k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS)); }
以上代碼給定時(shí)任務(wù)線程池GlobalExecutor提交了一個(gè)task任務(wù),其中task是一個(gè)實(shí)現(xiàn)了Runable接口的類,線程池每次執(zhí)行的就是ClientBeatCheckTask 的run方法,run方法代碼如下,
public void run() { try { if (!getDistroMapper().responsible(service.getName())) { return; } if (!getSwitchDomain().isHealthCheckEnabled()) { return; } // 獲取該服務(wù)下面的所有 注冊(cè)實(shí)例集合 List<Instance> instances = service.allIPs(true); // first set health status of instances: for (Instance instance : instances) { // 如果 當(dāng)前時(shí)間 減去 實(shí)例的最新心跳時(shí)間 如果大于 實(shí)例配置的心跳超時(shí)時(shí)間(默認(rèn)15秒) // 并且 實(shí)例的健康狀態(tài) true // 則設(shè)置服務(wù)的健康狀態(tài)為 false if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); Loggers.EVT_LOG .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } // then remove obsolete instances: for (Instance instance : instances) { if (instance.isMarked()) { continue; } // 如果 當(dāng)前時(shí)間 減去 實(shí)例的最新心跳時(shí)間 如果大于 實(shí)例配置的刪除超時(shí)時(shí)間(默認(rèn)30秒) // 則會(huì)調(diào)用 deleteIp 刪除方法刪除實(shí)例 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance)); // 刪除實(shí)例 deleteIp(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); } }
以上代碼就兩個(gè)邏輯,一個(gè)邏輯是判斷當(dāng)前時(shí)間減去實(shí)例的最新心跳時(shí)間是否大于實(shí)例配置的心跳超時(shí)時(shí)間(默認(rèn)15秒),如果大于則設(shè)置實(shí)例的健康狀態(tài)為false;第二個(gè)邏輯是 判斷當(dāng)前時(shí)間減去實(shí)例的最新心跳時(shí)間 是否大于實(shí)例配置的刪除超時(shí)時(shí)間(默認(rèn)30秒),如果大于則調(diào)用deleteIp(instance);刪除該實(shí)例,deleteIp的代碼如下,
NamingProxy.Request request = NamingProxy.Request.newRequest(); request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort())) .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName()) .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId()); // 構(gòu)造url地址 String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl(); // delete instance asynchronously: // 向本地服務(wù)器地址發(fā)送刪除請(qǐng)求 HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}", instance.toJson(), result.getMessage(), result.getCode()); } } @Override public void onError(Throwable throwable) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: ", instance.toJson(), throwable); } @Override public void onCancel() { } });
HttpClient.asyncHttpDelete方法其實(shí)就是向 ${spring.cloud.nacos.discovery.server-addr}/nacos/v1/ns/instance 發(fā)送Delete請(qǐng)求。該請(qǐng)求地址對(duì)應(yīng)的nacos服務(wù)端的源碼的naming工程中InstanceController的deregister方法,deregister代碼如下,
@CanDistro @DeleteMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String deregister(HttpServletRequest request) throws Exception { // 從請(qǐng)求參數(shù)中構(gòu)造實(shí)例對(duì)象 Instance instance = getIpAddress(request); String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName); return "ok"; } // 刪除實(shí)例數(shù)據(jù) serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance); return "ok"; }
removeInstance方法是關(guān)鍵,用來刪除實(shí)例數(shù)據(jù),removeInstance代碼如下,
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 先獲取服務(wù)對(duì)象 Service service = getService(namespaceId, serviceName); // 服務(wù)對(duì)象加鎖 synchronized (service) { // 調(diào)用刪除實(shí)例對(duì)象的方法 removeInstance(namespaceId, serviceName, ephemeral, service, ips); } }
removeInstance方法的代碼如下,
private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException { // 構(gòu)造 key String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 獲取服務(wù)下的實(shí)例集合(服務(wù)已有 減去 需要?jiǎng)h除實(shí)例) List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 根據(jù)KEY更新服務(wù)的實(shí)例 consistencyService.put(key, instances); }
substractIpAddresses方法用來獲取該服務(wù)下已經(jīng)減去需要?jiǎng)h除實(shí)例的實(shí)例數(shù)據(jù),其中調(diào)用的updateIpAddresses方法,action值為 remove。removeInstance方法的整體邏輯為通過updateIpAddresses方法拿到該服務(wù)中去掉刪除實(shí)例之后的實(shí)例集合對(duì)象,并把該實(shí)例集合對(duì)象添加到consistencyService對(duì)象中,consistencyService.put(key, instances)里面的邏輯和客戶端注冊(cè)服務(wù)一樣的邏輯。updateIpAddresses方法和consistencyService.put方法已經(jīng)在客戶端服務(wù)注冊(cè)章節(jié)已經(jīng)講了,這里不再講解。
二、服務(wù)實(shí)例更新推送
在客戶端更新服務(wù)實(shí)例的過程中nacos服務(wù)端會(huì)調(diào)用com.alibaba.nacos.naming.core.Service#updateIPs()方法(客戶端注冊(cè)服務(wù)的過程請(qǐng)看客戶端服務(wù)注冊(cè)章節(jié)),在該方法中會(huì)調(diào)用getPushService().serviceChanged(this)來發(fā)布當(dāng)前服務(wù)的修改事件,即會(huì)發(fā)布一個(gè)事件用來通知已經(jīng)和nacos服務(wù)端通信過的客戶端更新客戶端本地的服務(wù)信息。serviceChanged 的代碼如下,
public void serviceChanged(Service service) { // merge some change events to reduce the push frequency: if (futureMap .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) { return; } // 發(fā)布服務(wù)修改事件 this.applicationContext.publishEvent(new ServiceChangeEvent(this, service)); }
applicationContext.publishEvent會(huì)觸發(fā)一個(gè)ServiceChangeEvent事件,其實(shí)就是觸發(fā)com.alibaba.nacos.naming.push.PushService#onApplicationEvent方法,其中邏輯為先根據(jù)命名空間id和服務(wù)名稱獲取所有的客戶端map對(duì)象,然后遍歷所有客戶端對(duì)象PushClient 構(gòu)造 ackEntry 對(duì)象,最后向具體的客戶端發(fā)送 upd 消息。獲取關(guān)鍵代碼如下,
if (compressData != null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } else { // 構(gòu)造 ackEntry 對(duì)象 ackEntry = prepareAckEntry(client, prepareHostsQData(client), lastRefTime); // 添加緩存 if (ackEntry != null) { cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data)); } } Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key)); // 向具體的客戶端發(fā)送 upd 消息 udpPush(ackEntry);
以上代碼prepareHostsQData的邏輯就是獲取該服務(wù)下客戶端所屬服務(wù)的所有實(shí)例數(shù)據(jù),并且構(gòu)造具體的Map<String,Object>對(duì)象,prepareHostsQData代碼如下,
private static Map<String, Object> prepareHostsData(PushClient client) throws Exception { Map<String, Object> cmd = new HashMap<String, Object>(2); cmd.put("type", "dom"); // 獲取客戶端所屬服務(wù)的所有實(shí)例數(shù)據(jù) cmd.put("data", client.getDataSource().getData(client)); return cmd; }
udpPush(ackEntry)里面封裝了發(fā)送udp消息的關(guān)鍵代碼,ackEntry封裝了udp的數(shù)據(jù)信息。
在客戶端獲取服務(wù)實(shí)例列表的時(shí)候,會(huì)生成一個(gè)PushReceiver對(duì)象,該對(duì)象用來監(jiān)聽和接收nacos服務(wù)端發(fā)送的udp數(shù)據(jù)。該對(duì)象實(shí)現(xiàn)了Runnable接口,并且在構(gòu)造方法中把自己提交給了一個(gè)內(nèi)部屬性的線程池對(duì)象。構(gòu)造方法如下,
public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; this.udpSocket = new DatagramSocket(); this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.push.receiver"); return thread; } }); this.executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e); } }
根據(jù)以上代碼可知,所以在創(chuàng)建PushReceiver對(duì)象之后會(huì)執(zhí)行run方法,run方法的代碼如下,
public void run() { while (!closed) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; // 構(gòu)造 DatagramPacket 對(duì)象 DatagramPacket packet = new DatagramPacket(buffer, buffer.length); // 監(jiān)聽upd數(shù)據(jù) udpSocket.receive(packet); // 構(gòu)造服務(wù)對(duì)象的string 數(shù)據(jù) String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) { // 更新本地緩存 serviceInfoMap 的服務(wù)對(duì)象 hostReactor.processServiceJson(pushPacket.data); // send ack to server ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } else if ("dump".equals(pushPacket.type)) { // dump data to server ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap())) + "\"}"; } else { // do nothing send ack only ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress())); } catch (Exception e) { if (closed) { return; } NAMING_LOGGER.error("[NA] error while receiving push data", e); } } }
以上代碼用while一直監(jiān)聽udpSocket的客戶端upd的端口。當(dāng)接收到從nacos服務(wù)端發(fā)送過來的udp數(shù)據(jù)之后會(huì)接著調(diào)用 hostReactor.processServiceJson()方法來更新客戶端本地的serviceInfoMap 的服務(wù)對(duì)象。
到此這篇關(guān)于nacos注冊(cè)中心單節(jié)點(diǎn)ap架構(gòu)源碼解析的文章就介紹到這了,更多相關(guān)nacos注冊(cè)中心ap架構(gòu)源碼內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于SpringBoot多線程@Async的使用體驗(yàn)
這篇文章主要介紹了SpringBoot多線程@Async的使用體驗(yàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12springboot整合nacos的入門Demo及Nacos安裝部署
Nacos?提供了一組簡單易用的特性集,幫助您快速實(shí)現(xiàn)動(dòng)態(tài)服務(wù)發(fā)現(xiàn)、服務(wù)配置、服務(wù)元數(shù)據(jù)及流量管理,Nacos?致力于幫助您發(fā)現(xiàn)、配置和管理微服務(wù),這篇文章主要介紹了springboot整合nacos的入門Demo,需要的朋友可以參考下2024-01-01Java實(shí)現(xiàn)文件和base64流的相互轉(zhuǎn)換功能示例
這篇文章主要介紹了Java實(shí)現(xiàn)文件和base64流的相互轉(zhuǎn)換功能,涉及Java文件讀取及base64 轉(zhuǎn)換相關(guān)操作技巧,需要的朋友可以參考下2018-05-05使用SpringBoot根據(jù)配置注入接口的不同實(shí)現(xiàn)類(代碼演示)
使用springboot開發(fā)時(shí)經(jīng)常用到@Autowired和@Resource進(jìn)行依賴注入,但是當(dāng)我們一個(gè)接口對(duì)應(yīng)多個(gè)不同的實(shí)現(xiàn)類的時(shí)候如果不進(jìn)行一下配置項(xiàng)目啟動(dòng)時(shí)就會(huì)報(bào)錯(cuò),那么怎么根據(jù)不同的需求注入不同的類型呢,感興趣的朋友一起看看吧2022-06-06Springboot整合Redis實(shí)現(xiàn)超賣問題還原和流程分析(分布式鎖)
最近在研究超賣的項(xiàng)目,寫一段簡單正常的超賣邏輯代碼,多個(gè)用戶同時(shí)操作同一段數(shù)據(jù)出現(xiàn)問題,糾結(jié)該如何處理呢?下面小編給大家?guī)砹薙pringboot整合Redis實(shí)現(xiàn)超賣問題還原和流程分析,感興趣的朋友一起看看吧2021-10-10Java 執(zhí)行CMD命令或執(zhí)行BAT批處理方式
這篇文章主要介紹了Java 執(zhí)行CMD命令或執(zhí)行BAT批處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08詳解springboot項(xiàng)目帶Tomcat和不帶Tomcat的兩種打包方式
這篇文章主要介紹了詳解springboot項(xiàng)目帶Tomcat和不帶Tomcat的兩種打包方式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09