Eureka源碼閱讀Client啟動(dòng)入口注冊續(xù)約及定時(shí)任務(wù)
引言
本文主要是解析下Spring Cloud
整合Eureka Client
的源碼,這塊代碼比較多,而且都是些簡單代碼,我們稍微看下就行,這就是介紹下Eureka Client初始化過程,不管你Spring Cloud 怎樣封裝,底層還是Eureka Client的內(nèi)容,初始化過程包括下面:
- 去Eureka Server 拉取全量注冊表,
- 創(chuàng)建定時(shí)任務(wù),包括定時(shí)去Eureka Server 上增量拉取注冊表信息,定時(shí)renew (服務(wù)續(xù)約)。
- 服務(wù)注冊
1.環(huán)境
- eureka版本:1.10.11
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4
測試代碼:github.com/hsfxuebao/s…
2. Spring Cloud整合Eureka Client 啟動(dòng)入口
要看Spring Cloud 怎樣整合 Eureka Client ,就需要找到它們的自動(dòng)裝配配置類 在spring-cloud-starter-netflix-eureka-client
依賴的pom文件中,在依賴pom文件中有spring-cloud-netflix-eureka-client
, 在這個(gè)里面能夠找到spring.factories
文件,這個(gè)文件是spring spi
文件。
核心就是EurekaClientAutoConfiguration 這個(gè)自動(dòng)裝配類:
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) @ConditionalOnDiscoveryEnabled @AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class }) @AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration", "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" }) public class EurekaClientAutoConfiguration { }
2.1 封裝配置文件的類
2.1.1 EurekaClientConfigBean
@Bean @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT) public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { return new EurekaClientConfigBean(); }
其讀取的是eureka.client
前輟的配置信息。這個(gè)類已經(jīng)被@ConfigurationProperties
注解了,所以這些 配置信息可以被自動(dòng)封裝并注冊到容器。
2.1.2 EurekaInstanceConfigBean
@Bean @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT) public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, ManagementMetadataProvider managementMetadataProvider) { }
其讀取的是eureka.instance
的屬性值。這個(gè)類也已經(jīng)被@ConfigurationProperties
注解了,所以這些配 置信息可以被自動(dòng)封裝并注冊到容器。
2.2 EurekaClient
接下來,看看核心類EurekaClient
是怎么注入進(jìn)去的? 在EurekaClientAutoConfiguration
文件中,我們發(fā)現(xiàn)有兩個(gè)地方都可以注入EurekaClient
,分別為:
@Configuration(proxyBeanMethods = false) @ConditionalOnMissingRefreshScope protected static class EurekaClientConfiguration { @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); } } // 另一個(gè)是: @Configuration(proxyBeanMethods = false) @ConditionalOnRefreshScope protected static class RefreshableEurekaClientConfiguration { @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) @org.springframework.cloud.context.config.annotation.RefreshScope @Lazy public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) { } }
這就需要分析到底哪一個(gè)注解生效了?
@ConditionalOnMissingRefreshScope
@Target({ ElementType.TYPE, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @Conditional(OnMissingRefreshScopeCondition.class) @interface ConditionalOnMissingRefreshScope { } private static class OnMissingRefreshScopeCondition extends AnyNestedCondition { OnMissingRefreshScopeCondition() { super(ConfigurationPhase.REGISTER_BEAN); } @ConditionalOnMissingClass("org.springframework.cloud.context.scope.refresh.RefreshScope") static class MissingClass { } @ConditionalOnMissingBean(RefreshAutoConfiguration.class) static class MissingScope { } @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "false") static class OnPropertyDisabled { } }
大家 可以看看 AnyNestedCondition
這個(gè)注解,意思就是 只要滿足任意一個(gè)條件就符合
。通過分析,我們知道這三個(gè)條件都是滿足的,所以這個(gè)注解不生效,這個(gè)類不生效。
@ConditionalOnRefreshScope
@Target({ ElementType.TYPE, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @ConditionalOnClass(RefreshScope.class) @ConditionalOnBean(RefreshAutoConfiguration.class) @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "true", matchIfMissing = true) @interface ConditionalOnRefreshScope { }
通過這個(gè)注解EurekaClientAutoConfiguration
上的注解@AutoConfigureAfter
,我們知道當(dāng)前類注入是在RefreshAutoConfiguration之后
注入到容器中。而RefreshScope
就是在RefreshAutoConfiguration
之后中注入的。所以我們需要分析這個(gè)類就可以了。
@AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration", "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" }) public class EurekaClientAutoConfiguration { }
2.2.1 ApplicationInfoManager
@Bean @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT) public ApplicationInfoManager eurekaApplicationInfoManager( EurekaInstanceConfig config) { InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); return new ApplicationInfoManager(config, instanceInfo); }
創(chuàng)建ApplicationInfoManager
對象,這個(gè)對象主要就是管著當(dāng)前實(shí)例信息,也就是instanceInfo
, 可以看到,在這個(gè)方法中先是創(chuàng)建的instanceInfo
,然后將instanceInfo
作為構(gòu)造參數(shù)傳入了ApplicationInfoManager
中。
這個(gè)實(shí)例信息instanceInfo
里面維護(hù)了你當(dāng)前實(shí)例的ip ,端口,appName
等信息,注冊的時(shí)候就是拿這些信息到Eureka Server
上注冊。
2.2.2 EurekaClient
@Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); }
創(chuàng)建Eureka Client
對象,這個(gè)CloudEurekaClient
類是Spring Cloud
搞得,然后繼承Eureka
原生的DiscoveryClient
類。
public class CloudEurekaClient extends DiscoveryClient
我們可以看看它的構(gòu)造
最重要的是,它調(diào)用了父類的DiscoveryClient
的構(gòu)造,下面重點(diǎn)介紹。
2.3 小結(jié)
總結(jié)以上的信息,從EurekaClientAutoConfiguration
等方面可羅列出如下幾個(gè)比較重要的類,如下:
類名 | 介紹與作用 |
---|---|
EurekaClientConfig | 封裝了Eureka Client 與 Eureka Server 交互時(shí)所需要的配置信息,Spring Cloud 為其提供了默認(rèn)配置類: EurekaClientConfigBean。 |
ApplicationInfoManager | 作為應(yīng)用信息管理器,管理服務(wù)實(shí)例類 Instancenfo 和服務(wù)實(shí)例配置信息類EurekaInstanceConfig。 |
InstanceInfo | 封裝了將被發(fā)送到 Eureka Server 進(jìn)行服務(wù)注冊的服務(wù)實(shí)例元數(shù)據(jù),它在Eureka 注冊表中代表著一個(gè)服務(wù)實(shí)例,其他服務(wù)可通過 InstanceInfo來了解該服務(wù)實(shí)例的相關(guān)信息,從而進(jìn)行相關(guān)操作。 |
EurekaInstanceConfig | 封裝了 Eureka Client 自身服務(wù)實(shí)例的配置信息,主要用于構(gòu)建 InstanceInfo,通常這些信息在配置文件的 eureka.instance 前綴下進(jìn)行設(shè)置,Spring Cloud 通過 EurekaInstanceBean 配置類提供默認(rèn)配置。 |
DiscoveryClient | Spring Cloud中定義用來做服務(wù)發(fā)現(xiàn)的客戶端接口。 |
3. DiscoveryClient類的解析
3.1 DiscoveryClient 作用
DiscoveryClient
是Eureka Client
的核心類,其作用與下:
- 注冊實(shí)例到 Eureka Server 中
- 發(fā)送心跳更新與 Eureka Server 的續(xù)約
- 在服務(wù)關(guān)閉時(shí)取消與 Eureka Server 的續(xù)約,完成服務(wù)下限
- 獲取在 Eureka Server 中的服務(wù)實(shí)例列表
3.2 DiscoveryClient 的類結(jié)構(gòu)
可以先看下 DiscoveryClient
的類結(jié)構(gòu)圖:
從類結(jié)構(gòu)圖上可以看出 DiscoveryClient
類實(shí)現(xiàn)了 EurekaCient
,EurekaCient
又繼承了LookupService
,這里看看 LookupService
類:
public interface LookupService<T> { // 根據(jù)服務(wù)實(shí)例名稱獲取 Application Application getApplication(String appName); // 獲取當(dāng)前注冊表中所有的服務(wù)實(shí)例信息 Applications getApplications(); // 根據(jù)服務(wù)實(shí)例 Id 獲取服務(wù)實(shí)例信息 List<InstanceInfo> getInstancesById(String id); InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure); }
Application
是持有服務(wù)實(shí)例信息列表,它表示同一個(gè)服務(wù)的集群信息,這些服務(wù)實(shí)例乃是掛載在同一個(gè)服務(wù)名 appName
之下,而 InstanceInfo
則是代表著一個(gè)服務(wù)實(shí)例的信息,Application
類代碼如下:
public class Application { private static Random shuffleRandom = new Random(); // 服務(wù)名 private String name; // 標(biāo)識(shí)服務(wù)狀態(tài) @XStreamOmitField private volatile boolean isDirty = false; @XStreamImplicit private final Set<InstanceInfo> instances; private final AtomicReference<List<InstanceInfo>> shuffledInstances; private final Map<String, InstanceInfo> instancesMap; // ........ }
在 Application
中對 InstanceInfo
的操作都是同步的,為的是保證其原子性。Applications
則是注冊表中所有服務(wù)實(shí)例的集合,其間的操作也都是同步的。EurekaClient
繼承了 LookupService
接口,為 DiscoveryClient
提供一個(gè)上層接口,其目的是為了Eureka1.0x 到 Eureka2.x 的升級(jí)做過渡。
EurekaCient
接口在 LookupService
的基礎(chǔ)上提供了更豐富的方法,譬如:
- 提供做種方式獲取 InstanceInfo,例如根據(jù)區(qū)域、Eureka Server 地址獲取等。
- 提供本地客戶端(區(qū)域、可用區(qū))的數(shù)據(jù),這部分與 AWS 相關(guān)
- 提供了為客戶端注冊和獲取健康檢查處理器的功能
除了相關(guān)查詢接口外,EurekaClient
提供以下的兩個(gè)方法,需頗多關(guān)注:
public interface EurekaClient extends LookupService { // ....... // 為 Eureka Client 注冊健康處理器 public void registerHealthCheck(HealthCheckHandler healthCheckHandler); // 監(jiān)聽 Client 服務(wù)實(shí)例信息的更新 public void registerEventListener(EurekaEventListener eventListener); }
在 Eureka Server 中一般是通過心跳來識(shí)別一個(gè)實(shí)例的狀態(tài),而在 Eureka Client 中則存在一個(gè)定時(shí)任務(wù)定時(shí)通過 HealthCheckHandler
檢測當(dāng)前 Client 的狀態(tài),當(dāng) 其狀態(tài)發(fā)生變化的時(shí)候,將會(huì)觸發(fā)新的注冊事件,更新 Eureka Server
的注冊表中的相關(guān)實(shí)例信息。
3.3 DiscoveryClient 構(gòu)造函數(shù)
在 DiscoveryClient
的構(gòu)造函數(shù)中,會(huì)有如下操作,如:服注冊表信息、服務(wù)注冊、初始化發(fā)送心跳、緩存刷新、注冊定時(shí)任務(wù)等。因此 DiscoveryClient
的構(gòu)造函數(shù)貫穿了 Eureka Client
啟動(dòng)階段的各項(xiàng)任務(wù)。
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // 省略相關(guān)信息 }
在DiscoveryClient
的構(gòu)造函數(shù)中有如下幾個(gè)參數(shù):ApplicationInfoManager、EurekaClientConfig、AbstractDiscoveryClientOptionalArgs、Provider<BackupRegistry>、EndpointRandomizer
。前兩個(gè)參數(shù)前面已做介紹,AbstractDiscoveryClientOptionalArgs
用于注入一些可選參數(shù),BackupRegistry
則充當(dāng)備份注冊中心的職責(zé),EndpointRandomizer
則是作為端點(diǎn)隨機(jī)器。對DiscoveryClient
的構(gòu)造函數(shù)的職責(zé)做一個(gè)簡單概括:
- 相關(guān)配置賦值,如ApplicationInfoManager、EurekaClientConfig等
- 備份注冊中心初始化,默認(rèn)沒有實(shí)現(xiàn)
- 拉去 Eureka Server 注冊表信息
- 注冊前預(yù)處理
- 向 Eureka Server 注冊自身
- 初始化定時(shí)任務(wù)、緩存刷新、按需注冊定時(shí)任務(wù)
后面將會(huì)對這些步驟中對重要點(diǎn)進(jìn)行相關(guān)分析。
4. Eureka Client 初始化
接下來我們看下DiscoveryClient
是怎樣初始化的(構(gòu)造方法中)。代碼如下:
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { ... // 如果開啟拉取注冊表的話 if (clientConfig.shouldFetchRegistry()) { try { // todo 拉取注冊表信息 boolean primaryFetchRegistryResult = fetchRegistry(false); if (!primaryFetchRegistryResult) { logger.info("Initial registry fetch from primary servers failed"); } ... } } ... // 如果進(jìn)行服務(wù)注冊的話 clientConfig.shouldEnforceRegistrationAtInit() 默認(rèn)false if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { // todo 進(jìn)行服務(wù)注冊 if (!register()) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } ... } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch // todo 定時(shí)任務(wù) initScheduledTasks(); ... }
4.1 拉取注冊表信息
// 如果開啟拉取注冊表的話 if (clientConfig.shouldFetchRegistry()) { // 拉取注冊表信息 boolean primaryFetchRegistryResult = fetchRegistry(false); }
如果開啟拉取注冊信息,就會(huì)調(diào)用fetchRegistry
方法去Eureka Server
上面拉取注冊表信息。
private boolean fetchRegistry(boolean forceFullRegistryFetch) { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() // 關(guān)閉增量,默認(rèn)false || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { // todo 全量拉取注冊表信息 getAndStoreFullRegistry(); } else { // todo 增量更新 getAndUpdateDelta(applications); } // 設(shè)置hashCode applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); }
可以看下最上面的注釋,不啟用增量 或者是第一次,就拉取全量注冊表信息。
不啟用增量|| 強(qiáng)制全量|| 本地注冊表是空的, 這個(gè)時(shí)候就會(huì)調(diào)用getAndStoreFullRegistry
方法去Eureka Server 拉取全量注冊表。 否則的話調(diào)用 getAndUpdateDelta
方法獲取增量注冊表信息。
4.1.1 全量拉取注冊表信息
接下來我們看下getAndStoreFullRegistry
方法,看看是怎樣拉取全量注冊表的。
// 獲取所有注冊表信息 private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications apps = null; // 交給網(wǎng)絡(luò)傳輸組件,發(fā)起網(wǎng)絡(luò)請求,獲得響應(yīng) EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null // todo apps請求url ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
這里其實(shí)就是調(diào)用網(wǎng)絡(luò)組件來發(fā)起請求,得到響應(yīng)了,然后拿到所有得實(shí)例信息后,將實(shí)例信息設(shè)置到本地注冊表中。 我們這里再深入一點(diǎn),看看eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
是請求得哪個(gè)url:
@Override public EurekaHttpResponse<Applications> getApplications(String... regions) { return getApplicationsInternal("apps/", regions); } private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); // 拼接region if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); // 提交get請求 response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } }
拉取全量注冊表的請求為:GET請求,path為:apps/
4.1.2 增量拉取注冊表信息
getAndUpdateDelta(applications);
代碼如下:
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; // 提交請求 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { /** * 這里要將從Server獲取到的所有變更信息更新到本地緩存。這些變 * 更信來自于兩類Region:本地Region與遠(yuǎn)程Region。而本地緩存也 * 分為兩類:緩存本地Region的applications與緩存所有遠(yuǎn)程Region * 的注冊信息的map(key為遠(yuǎn)程Region,value為該遠(yuǎn)程Region的注冊 * 表) */ // todo updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } ... }
增量拉取注冊表的請求: GET請求 path為: apps/delta
然后,我們重點(diǎn)看一下updateDelta(delta);
方法:
private void updateDelta(Applications delta) { int deltaCount = 0; for (Application app : delta.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { Applications applications = getApplications(); String instanceRegion = instanceRegionChecker.getInstanceRegion(instance); // 不是本地region,遠(yuǎn)程region if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { Applications remoteApps = remoteRegionVsApps.get(instanceRegion); if (null == remoteApps) { remoteApps = new Applications(); remoteRegionVsApps.put(instanceRegion, remoteApps); } applications = remoteApps; } ++deltaCount; // 有新增加的實(shí)例信息 if (ActionType.ADDED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); // 有修改的 } else if (ActionType.MODIFIED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Modified instance {} to the existing apps ", instance.getId()); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); // 有刪除的 } else if (ActionType.DELETED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); /* * We find all instance list from application(The status of instance status is not only the status is UP but also other status) * if instance list is empty, we remove the application. */ if (existingApp.getInstancesAsIsFromEureka().isEmpty()) { applications.removeApplication(existingApp); } } } } } ... }
這個(gè)方法就是更新客戶端本地的注冊表信息。
4.2 服務(wù)注冊
// 如果進(jìn)行服務(wù)注冊的話 clientConfig.shouldEnforceRegistrationAtInit() 默認(rèn)false if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { // todo 進(jìn)行服務(wù)注冊 if (!register()) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } }
如果在這里進(jìn)行服務(wù)注冊的話,需要配置文件中增加下面配置(默認(rèn)是false):
eureka.client.should-enforce-registration-at-init: true
所以在這里是沒有服務(wù)注冊的,那么服務(wù)注冊是在哪里呢?在會(huì)面分析續(xù)約定時(shí)任務(wù)時(shí)完成了服務(wù)注冊,不過,我們在這里也看一下服務(wù)注冊的代碼:
boolean register() throws Throwable { EurekaHttpResponse<Void> httpResponse; try { // todo 進(jìn)行服務(wù)注冊 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } ... } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
接下來看:
@Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); Response response = null; try { Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request(); addExtraProperties(resourceBuilder); addExtraHeaders(resourceBuilder); response = resourceBuilder .accept(MediaType.APPLICATION_JSON) .acceptEncoding("gzip") .post(Entity.json(info)); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
服務(wù)注冊:POST請求,path為:“apps/" + appName
4.3 定時(shí)任務(wù)
initScheduledTasks();
初始化定時(shí)任務(wù)。我們分別看一下:
4.3.1 定時(shí)更新客戶端注冊表任務(wù)
private void initScheduledTasks() { // todo 拉取注冊表 增量拉取定時(shí)任務(wù) if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer // 拉取間隔 默認(rèn)是30s int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); // todo 放入定時(shí)任務(wù),默認(rèn)30s執(zhí)行一次 // 在這里看只有一個(gè)任務(wù),在任務(wù)完成的時(shí)候會(huì)重新開啟一個(gè)新的任務(wù),可以點(diǎn)進(jìn)去看看 scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } }
默認(rèn)每隔30s
增量拉取注冊表信息。拉取注冊表信息,最終還是走我們上面介紹的fetchRegistry
方法。
我們看一下com.netflix.discovery.TimedSupervisorTask#run
:
@Override public void run() { Future<?> future = null; try { // 使用Future,可以設(shè)定子線程的超時(shí)時(shí)間,這樣當(dāng)前線程就不用無限等待了 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); // 阻塞 獲取任務(wù)的執(zhí)行結(jié)果 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout // delay是個(gè)很有用的變量,后面會(huì)用到,這里記得每次執(zhí)行任務(wù)成功都會(huì)將delay重置 delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); // 任務(wù)線程超時(shí)的時(shí)候,就把delay變量翻倍,但不會(huì)超過外部調(diào)用時(shí)設(shè)定的最大延時(shí)時(shí)間 long newDelay = Math.min(maxDelay, currentDelay * 2); // 設(shè)置為最新的值,考慮到多線程,所以用了CAS delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { // 一旦線程池的阻塞隊(duì)列中放滿了待處理任務(wù),觸發(fā)了拒絕策略,就會(huì)將調(diào)度器停掉 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { // 一旦出現(xiàn)未知的異常,就停掉調(diào)度器 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { // 這里任務(wù)要么執(zhí)行完畢,要么發(fā)生異常,都用cancel方法來清理任務(wù); if (future != null) { future.cancel(true); } // 只要調(diào)度器沒有停止,就再指定等待時(shí)間之后在執(zhí)行一次同樣的任務(wù) if (!scheduler.isShutdown()) { // todo 下一次時(shí)間 再次執(zhí)行這個(gè)任務(wù) //這里就是周期性任務(wù)的原因:只要沒有停止調(diào)度器,就再創(chuàng)建一次性任務(wù),執(zhí)行時(shí)間時(shí)delay的值, //假設(shè)外部調(diào)用時(shí)傳入的超時(shí)時(shí)間為30秒(構(gòu)造方法的入?yún)imeout),最大間隔時(shí)間為50秒(構(gòu)造方法的入?yún)xpBackOffBound) //如果最近一次任務(wù)沒有超時(shí),那么就在30秒后開始新任務(wù), //如果最近一次任務(wù)超時(shí)了,那么就在50秒后開始新任務(wù)(異常處理中有個(gè)乘以二的操作,乘以二后的60秒超過了最大間隔50秒) scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
在這個(gè)Task中時(shí)機(jī)執(zhí)行的還是入?yún)⒌姆椒?code>new CacheRefreshThread():
new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() );
class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } @VisibleForTesting void refreshRegistry() { try { ... // todo 拉取注冊表 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } ... } ... }
在執(zhí)行完這個(gè)任務(wù)之后,會(huì)調(diào)用TimedSupervisorTask#run
中finally代碼,在這里又重新開啟了新的定時(shí)任務(wù):
finally { // 這里任務(wù)要么執(zhí)行完畢,要么發(fā)生異常,都用cancel方法來清理任務(wù); if (future != null) { future.cancel(true); } // 只要調(diào)度器沒有停止,就再指定等待時(shí)間之后在執(zhí)行一次同樣的任務(wù) if (!scheduler.isShutdown()) { // todo 下一次時(shí)間 再次執(zhí)行這個(gè)任務(wù) //這里就是周期性任務(wù)的原因:只要沒有停止調(diào)度器,就再創(chuàng)建一次性任務(wù),執(zhí)行時(shí)間時(shí)delay的值, //假設(shè)外部調(diào)用時(shí)傳入的超時(shí)時(shí)間為30秒(構(gòu)造方法的入?yún)imeout),最大間隔時(shí)間為50秒(構(gòu)造方法的入?yún)xpBackOffBound) //如果最近一次任務(wù)沒有超時(shí),那么就在30秒后開始新任務(wù), //如果最近一次任務(wù)超時(shí)了,那么就在50秒后開始新任務(wù)(異常處理中有個(gè)乘以二的操作,乘以二后的60秒超過了最大間隔50秒) scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } }
這樣就實(shí)現(xiàn)了每隔30s調(diào)用一個(gè)拉取注冊表的任務(wù)。
4.3.2 定時(shí)服務(wù)續(xù)約任務(wù)
private void initScheduledTasks() { ... // 開啟注冊 if (clientConfig.shouldRegisterWithEureka()) { // todo 服務(wù)續(xù)約定時(shí)任務(wù) // 續(xù)約間隔時(shí)間 30s int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 指定client從server更新注冊表的最大時(shí)間間隔指數(shù)(倍數(shù)),默認(rèn)為10 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer // todo 續(xù)約,心跳定時(shí)任務(wù) heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); // 續(xù)約定時(shí)任務(wù) scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);
每30s 執(zhí)行一次服務(wù)續(xù)約。直接看下HeartbeatThread
類。
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
走的是renew
方法請求服務(wù)續(xù)約,成功后會(huì)更新lastSuccessfulHeartbeatTimestamp
字段。
boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); // 如果是沒有發(fā)現(xiàn)該實(shí)例信息的話 if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); // todo 進(jìn)行服務(wù)注冊,如果我們不在配置文件中指定服務(wù)初始化就注冊該服務(wù),那么服務(wù)的注冊實(shí)際是在這里注冊的 boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
很簡單,就是調(diào)用 eurekaTransport.registrationClient.sendHeartBeat
方法發(fā)送服務(wù)續(xù)約的請求,如果你實(shí)例信息在Eureka Server中不存在的話,就進(jìn)行服務(wù)注冊,我們再稍微看下sendHeartBeat 方法,里面請求uri就是 String urlPath = “apps/” + appName + ‘/’ + id;
服務(wù)續(xù)約請求:PUT請求, path為:apps/{appName}/{instanceId}
4.3.3 定時(shí)更新Client信息給Server任務(wù)
private void initScheduledTasks() { ... // 開啟注冊 if (clientConfig.shouldRegisterWithEureka()) { ... // todo 定時(shí)更新Client信息給服務(wù)端 // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } // 監(jiān)聽到StatusChangeEvent 事件,調(diào)用notify方法 @Override public void notify(StatusChangeEvent statusChangeEvent) { logger.info("Saw local status change event {}", statusChangeEvent); // todo 通知執(zhí)行方法,這個(gè)方法就是立即向 服務(wù)端發(fā)起注冊請求 instanceInfoReplicator.onDemandUpdate(); } }; // 向applicationInfoManager 中注冊 狀態(tài)變化事件監(jiān)聽器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // todo 參數(shù)默認(rèn)40s instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } ... }
我們看下這個(gè)start啟動(dòng)
方法:
public void start(int initialDelayMs) { if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
這里有個(gè)非常重要的點(diǎn),調(diào)用了實(shí)例信息的setIsDirty
方法,后面的注釋說是為了初始化服務(wù)注冊。
創(chuàng)建一個(gè)延時(shí)任務(wù),默認(rèn)是40s??纯?0s執(zhí)行啥東西。com.netflix.discovery.InstanceInfoReplicator#run
:
public void run() { try { // 刷新實(shí)例信息 discoveryClient.refreshInstanceInfo(); // 獲取臟的時(shí)間戳 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { // todo 客戶端重新發(fā)起 注冊請求 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
如果這個(gè)時(shí)間戳不是null的話,調(diào)用register
方法進(jìn)行服務(wù)注冊,這個(gè)時(shí)間戳肯定不是null的, instanceInfo.setIsDirty(); // for initial register
我們上面這個(gè)方法就是設(shè)置了這個(gè)時(shí)間戳。最后又將這個(gè)任務(wù)放入延時(shí)調(diào)度中。
其實(shí)這個(gè)定時(shí)任務(wù)是為了檢測服務(wù)信息有沒有變動(dòng),如果有變動(dòng)重新注冊到Eureka Server上去。
下面我們來看一下狀態(tài)改變監(jiān)聽器statusChangeListener:
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } // 監(jiān)聽到StatusChangeEvent 事件,調(diào)用notify方法 @Override public void notify(StatusChangeEvent statusChangeEvent) { logger.info("Saw local status change event {}", statusChangeEvent); // todo 通知執(zhí)行方法,這個(gè)方法就是立即向 服務(wù)端發(fā)起注冊請求 instanceInfoReplicator.onDemandUpdate(); } }; // 向applicationInfoManager 中注冊 狀態(tài)變化事件監(jiān)聽器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); }
如果 Eureka Client 狀態(tài)發(fā)生變化(在Spring Boot 通過 Actuator
對服務(wù)狀態(tài)進(jìn)行監(jiān)控,具體實(shí)現(xiàn)為 EurekaHealthCheckHandler
),注冊在 ApplicationInfoManager
的狀態(tài)改變監(jiān)控器將會(huì)被觸發(fā),從而調(diào)用InstanceInfoReplicator#onDemandUpdate
方法,檢查服務(wù)實(shí)例信息和服務(wù)狀態(tài)的變化,可能會(huì)引起按需注冊任務(wù),代碼如下:
public boolean onDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { if (!scheduler.isShutdown()) { // 提交 scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); // 取消定時(shí)任務(wù) latestPeriodic.cancel(false); } // todo 執(zhí)行 向 Server端重新 注冊的請求 InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to stopped scheduler"); return false; } } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } }
InstanceInfoReplicator#onDemandUpdate
方法中調(diào)用 InstanceInfoReplicator#run
方法檢查服務(wù)實(shí)例信息和服務(wù)狀態(tài)的變化,并在服務(wù)實(shí)例信息和服務(wù)狀態(tài)發(fā)生變化的情況下向 Eureka Server 發(fā)起重新注冊的請求,為了防止重新執(zhí)行 run
方法,onDemandUpdate
方法還會(huì)取消執(zhí)行上次已經(jīng)提交且未完成的 run方法,執(zhí)行最新的按需注冊任務(wù)。
4.4 總結(jié)
服務(wù)注冊的時(shí)機(jī)
Client提交register()
請求的情況有三種:
- 在應(yīng)用啟動(dòng)時(shí)就可以直接進(jìn)行
register()
,不過,需要提前在配置文件中配置 - 在
renew
時(shí),如果server端返回的是NOT_FOUND
,則提交register()
- 當(dāng)Client的配置信息發(fā)生了變更,則Client提交
register()
Client實(shí)例化
Eureka Client 實(shí)例化的時(shí)候有幾個(gè)重要步驟,分別如下:
全量拉取注冊表信息,放入自己本地注冊表中。
創(chuàng)建定時(shí)任務(wù),
- 定時(shí)服務(wù)續(xù)約任務(wù),默認(rèn)是30s,
- 定時(shí)更新 客戶端注冊表信息,默認(rèn)是30s,
- 定時(shí)更新Client信息給Server端,重新服務(wù)注冊,默認(rèn)是40s。
參考文章
springcloud-source-study學(xué)習(xí)github地址
以上就是Eureka源碼閱讀Client啟動(dòng)入口注冊續(xù)約及定時(shí)任務(wù)的詳細(xì)內(nèi)容,更多關(guān)于Eureka源碼Client啟動(dòng)入口的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java+opencv3.2.0實(shí)現(xiàn)輪廓檢測
這篇文章主要為大家詳細(xì)介紹了Java+opencv3.2.0實(shí)現(xiàn)輪廓檢測,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-07-07全面解析JPA?倉庫repository中的findAll()方法
這篇文章主要介紹了全面解析JPA?倉庫repository中的findAll()方法,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02java設(shè)計(jì)模式Ctrl?C和Ctrl?V的原型模式詳解
這篇文章主要為大家介紹了java設(shè)計(jì)模式Ctrl?C和Ctrl?V的原型模式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02詳解Spring mvc DispatchServlet 實(shí)現(xiàn)機(jī)制
本篇文章主要介紹了詳解Spring mvc DispatchServlet 實(shí)現(xiàn)機(jī)制,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-09-09Java實(shí)現(xiàn)一鍵生成表controller,service,mapper文件
這篇文章主要為大家詳細(xì)介紹了如何利用Java語言實(shí)現(xiàn)一鍵生成表controller,service,mapper文件,文中的示例代碼講解詳細(xì),需要的可以收藏一下2023-05-05java語言實(shí)現(xiàn)權(quán)重隨機(jī)算法完整實(shí)例
這篇文章主要介紹了java語言實(shí)現(xiàn)權(quán)重隨機(jī)算法完整實(shí)例,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-11-11