欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Eureka源碼閱讀Client啟動(dòng)入口注冊續(xù)約及定時(shí)任務(wù)

 更新時(shí)間:2022年10月15日 12:15:19   作者:hsfxuebao  
這篇文章主要為大家介紹了Eureka源碼閱讀Client啟動(dòng)入口注冊續(xù)約及定時(shí)任務(wù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

本文主要是解析下Spring Cloud整合Eureka Client的源碼,這塊代碼比較多,而且都是些簡單代碼,我們稍微看下就行,這就是介紹下Eureka Client初始化過程,不管你Spring Cloud 怎樣封裝,底層還是Eureka Client的內(nèi)容,初始化過程包括下面:

  • 去Eureka Server 拉取全量注冊表,
  • 創(chuàng)建定時(shí)任務(wù),包括定時(shí)去Eureka Server 上增量拉取注冊表信息,定時(shí)renew (服務(wù)續(xù)約)。
  • 服務(wù)注冊

1.環(huán)境

2. Spring Cloud整合Eureka Client 啟動(dòng)入口

要看Spring Cloud 怎樣整合 Eureka Client ,就需要找到它們的自動(dòng)裝配配置類 在spring-cloud-starter-netflix-eureka-client 依賴的pom文件中,在依賴pom文件中有spring-cloud-netflix-eureka-client, 在這個(gè)里面能夠找到spring.factories 文件,這個(gè)文件是spring spi文件。

核心就是EurekaClientAutoConfiguration 這個(gè)自動(dòng)裝配類:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
      "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
      "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
      "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
}

2.1 封裝配置文件的類

2.1.1 EurekaClientConfigBean

@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
   return new EurekaClientConfigBean();
}

其讀取的是eureka.client前輟的配置信息。這個(gè)類已經(jīng)被@ConfigurationProperties注解了,所以這些 配置信息可以被自動(dòng)封裝并注冊到容器。

2.1.2 EurekaInstanceConfigBean

@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
      ManagementMetadataProvider managementMetadataProvider) {
}

其讀取的是eureka.instance的屬性值。這個(gè)類也已經(jīng)被@ConfigurationProperties注解了,所以這些配 置信息可以被自動(dòng)封裝并注冊到容器。

2.2 EurekaClient

接下來,看看核心類EurekaClient是怎么注入進(jìn)去的? 在EurekaClientAutoConfiguration文件中,我們發(fā)現(xiàn)有兩個(gè)地方都可以注入EurekaClient,分別為:

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {
   @Bean(destroyMethod = "shutdown")
   @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
   public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
      return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
   }
}
// 另一個(gè)是:
@Configuration(proxyBeanMethods = false)
@ConditionalOnRefreshScope
protected static class RefreshableEurekaClientConfiguration {
   @Bean(destroyMethod = "shutdown")
   @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
   @org.springframework.cloud.context.config.annotation.RefreshScope
   @Lazy
   public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
         EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
      }
}

這就需要分析到底哪一個(gè)注解生效了?

@ConditionalOnMissingRefreshScope

@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Conditional(OnMissingRefreshScopeCondition.class)
@interface ConditionalOnMissingRefreshScope {
}
private static class OnMissingRefreshScopeCondition extends AnyNestedCondition {
   OnMissingRefreshScopeCondition() {
      super(ConfigurationPhase.REGISTER_BEAN);
   }
  @ConditionalOnMissingClass("org.springframework.cloud.context.scope.refresh.RefreshScope")
   static class MissingClass {
   }
   @ConditionalOnMissingBean(RefreshAutoConfiguration.class)
   static class MissingScope {
   }
   @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "false")
   static class OnPropertyDisabled {
   }
}

大家 可以看看 AnyNestedCondition這個(gè)注解,意思就是 只要滿足任意一個(gè)條件就符合。通過分析,我們知道這三個(gè)條件都是滿足的,所以這個(gè)注解不生效,這個(gè)類不生效。

@ConditionalOnRefreshScope

@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ConditionalOnClass(RefreshScope.class)
@ConditionalOnBean(RefreshAutoConfiguration.class)
@ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "true", matchIfMissing = true)
@interface ConditionalOnRefreshScope {
}

通過這個(gè)注解EurekaClientAutoConfiguration上的注解@AutoConfigureAfter,我們知道當(dāng)前類注入是在RefreshAutoConfiguration之后注入到容器中。而RefreshScope就是在RefreshAutoConfiguration之后中注入的。所以我們需要分析這個(gè)類就可以了。

@AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
  "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
  "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
      "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
}

2.2.1 ApplicationInfoManager

@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
		EurekaInstanceConfig config) {
	InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
	return new ApplicationInfoManager(config, instanceInfo);
}

創(chuàng)建ApplicationInfoManager 對象,這個(gè)對象主要就是管著當(dāng)前實(shí)例信息,也就是instanceInfo , 可以看到,在這個(gè)方法中先是創(chuàng)建的instanceInfo,然后將instanceInfo 作為構(gòu)造參數(shù)傳入了ApplicationInfoManager 中。

這個(gè)實(shí)例信息instanceInfo 里面維護(hù)了你當(dāng)前實(shí)例的ip ,端口,appName等信息,注冊的時(shí)候就是拿這些信息到Eureka Server 上注冊。

2.2.2 EurekaClient

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
	return new CloudEurekaClient(manager, config, this.optionalArgs,
			this.context);
}

創(chuàng)建Eureka Client 對象,這個(gè)CloudEurekaClient 類是Spring Cloud 搞得,然后繼承Eureka 原生的DiscoveryClient 類。

public class CloudEurekaClient extends DiscoveryClient 

我們可以看看它的構(gòu)造

最重要的是,它調(diào)用了父類的DiscoveryClient 的構(gòu)造,下面重點(diǎn)介紹。

2.3 小結(jié)

總結(jié)以上的信息,從EurekaClientAutoConfiguration等方面可羅列出如下幾個(gè)比較重要的類,如下:

類名介紹與作用
EurekaClientConfig封裝了Eureka Client 與 Eureka Server 交互時(shí)所需要的配置信息,Spring Cloud 為其提供了默認(rèn)配置類: EurekaClientConfigBean。
ApplicationInfoManager作為應(yīng)用信息管理器,管理服務(wù)實(shí)例類 Instancenfo 和服務(wù)實(shí)例配置信息類EurekaInstanceConfig。
InstanceInfo封裝了將被發(fā)送到 Eureka Server 進(jìn)行服務(wù)注冊的服務(wù)實(shí)例元數(shù)據(jù),它在Eureka 注冊表中代表著一個(gè)服務(wù)實(shí)例,其他服務(wù)可通過 InstanceInfo來了解該服務(wù)實(shí)例的相關(guān)信息,從而進(jìn)行相關(guān)操作。
EurekaInstanceConfig封裝了 Eureka Client 自身服務(wù)實(shí)例的配置信息,主要用于構(gòu)建 InstanceInfo,通常這些信息在配置文件的 eureka.instance 前綴下進(jìn)行設(shè)置,Spring Cloud 通過 EurekaInstanceBean 配置類提供默認(rèn)配置。
DiscoveryClientSpring Cloud中定義用來做服務(wù)發(fā)現(xiàn)的客戶端接口。

3. DiscoveryClient類的解析

3.1 DiscoveryClient 作用

DiscoveryClientEureka Client 的核心類,其作用與下:

  • 注冊實(shí)例到 Eureka Server 中
  • 發(fā)送心跳更新與 Eureka Server 的續(xù)約
  • 在服務(wù)關(guān)閉時(shí)取消與 Eureka Server 的續(xù)約,完成服務(wù)下限
  • 獲取在 Eureka Server 中的服務(wù)實(shí)例列表

3.2 DiscoveryClient 的類結(jié)構(gòu)

可以先看下 DiscoveryClient 的類結(jié)構(gòu)圖:

從類結(jié)構(gòu)圖上可以看出 DiscoveryClient 類實(shí)現(xiàn)了 EurekaCientEurekaCient 又繼承了LookupService,這里看看 LookupService 類:

public interface LookupService<T> {
    // 根據(jù)服務(wù)實(shí)例名稱獲取 Application
    Application getApplication(String appName);
    // 獲取當(dāng)前注冊表中所有的服務(wù)實(shí)例信息
    Applications getApplications();
    // 根據(jù)服務(wù)實(shí)例 Id 獲取服務(wù)實(shí)例信息
    List<InstanceInfo> getInstancesById(String id);
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

Application 是持有服務(wù)實(shí)例信息列表,它表示同一個(gè)服務(wù)的集群信息,這些服務(wù)實(shí)例乃是掛載在同一個(gè)服務(wù)名 appName 之下,而 InstanceInfo 則是代表著一個(gè)服務(wù)實(shí)例的信息,Application 類代碼如下:

public class Application {
    private static Random shuffleRandom = new Random();
    // 服務(wù)名
    private String name;
    // 標(biāo)識(shí)服務(wù)狀態(tài)
    @XStreamOmitField
    private volatile boolean isDirty = false;
    @XStreamImplicit
    private final Set<InstanceInfo> instances;
    private final AtomicReference<List<InstanceInfo>> shuffledInstances;
    private final Map<String, InstanceInfo> instancesMap;
    // ........
}

Application 中對 InstanceInfo 的操作都是同步的,為的是保證其原子性。Applications 則是注冊表中所有服務(wù)實(shí)例的集合,其間的操作也都是同步的。EurekaClient 繼承了 LookupService 接口,為 DiscoveryClient 提供一個(gè)上層接口,其目的是為了Eureka1.0x 到 Eureka2.x 的升級(jí)做過渡。

EurekaCient 接口在 LookupService 的基礎(chǔ)上提供了更豐富的方法,譬如:

  • 提供做種方式獲取 InstanceInfo,例如根據(jù)區(qū)域、Eureka Server 地址獲取等。
  • 提供本地客戶端(區(qū)域、可用區(qū))的數(shù)據(jù),這部分與 AWS 相關(guān)
  • 提供了為客戶端注冊和獲取健康檢查處理器的功能

除了相關(guān)查詢接口外,EurekaClient 提供以下的兩個(gè)方法,需頗多關(guān)注:

public interface EurekaClient extends LookupService {
    // .......
    // 為 Eureka Client 注冊健康處理器
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
    // 監(jiān)聽 Client 服務(wù)實(shí)例信息的更新
    public void registerEventListener(EurekaEventListener eventListener);
}

在 Eureka Server 中一般是通過心跳來識(shí)別一個(gè)實(shí)例的狀態(tài),而在 Eureka Client 中則存在一個(gè)定時(shí)任務(wù)定時(shí)通過 HealthCheckHandler 檢測當(dāng)前 Client 的狀態(tài),當(dāng) 其狀態(tài)發(fā)生變化的時(shí)候,將會(huì)觸發(fā)新的注冊事件,更新 Eureka Server 的注冊表中的相關(guān)實(shí)例信息。

3.3 DiscoveryClient 構(gòu)造函數(shù)

DiscoveryClient 的構(gòu)造函數(shù)中,會(huì)有如下操作,如:服注冊表信息、服務(wù)注冊、初始化發(fā)送心跳、緩存刷新、注冊定時(shí)任務(wù)等。因此 DiscoveryClient 的構(gòu)造函數(shù)貫穿了 Eureka Client 啟動(dòng)階段的各項(xiàng)任務(wù)。

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    // 省略相關(guān)信息
}

DiscoveryClient 的構(gòu)造函數(shù)中有如下幾個(gè)參數(shù):ApplicationInfoManager、EurekaClientConfig、AbstractDiscoveryClientOptionalArgs、Provider<BackupRegistry>、EndpointRandomizer。前兩個(gè)參數(shù)前面已做介紹,AbstractDiscoveryClientOptionalArgs 用于注入一些可選參數(shù),BackupRegistry則充當(dāng)備份注冊中心的職責(zé),EndpointRandomizer 則是作為端點(diǎn)隨機(jī)器。對DiscoveryClient 的構(gòu)造函數(shù)的職責(zé)做一個(gè)簡單概括:

  • 相關(guān)配置賦值,如ApplicationInfoManager、EurekaClientConfig等
  • 備份注冊中心初始化,默認(rèn)沒有實(shí)現(xiàn)
  • 拉去 Eureka Server 注冊表信息
  • 注冊前預(yù)處理
  • 向 Eureka Server 注冊自身
  • 初始化定時(shí)任務(wù)、緩存刷新、按需注冊定時(shí)任務(wù)

后面將會(huì)對這些步驟中對重要點(diǎn)進(jìn)行相關(guān)分析。

4. Eureka Client 初始化

接下來我們看下DiscoveryClient 是怎樣初始化的(構(gòu)造方法中)。代碼如下:

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    ...
    // 如果開啟拉取注冊表的話
    if (clientConfig.shouldFetchRegistry()) {
        try {
            // todo 拉取注冊表信息
            boolean primaryFetchRegistryResult = fetchRegistry(false);
            if (!primaryFetchRegistryResult) {
                logger.info("Initial registry fetch from primary servers failed");
            }
            ...
        }
    }
    ...
    // 如果進(jìn)行服務(wù)注冊的話 clientConfig.shouldEnforceRegistrationAtInit() 默認(rèn)false
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            // todo 進(jìn)行服務(wù)注冊
            if (!register()) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } 
        ...
    }
    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    // todo 定時(shí)任務(wù)
    initScheduledTasks();
   ...
}

4.1 拉取注冊表信息

 // 如果開啟拉取注冊表的話
if (clientConfig.shouldFetchRegistry()) {
      // 拉取注冊表信息
      boolean primaryFetchRegistryResult = fetchRegistry(false);
}

如果開啟拉取注冊信息,就會(huì)調(diào)用fetchRegistry 方法去Eureka Server上面拉取注冊表信息。

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    // If the delta is disabled or if it is the first time, get all
    // applications
     Applications applications = getApplications();
    if (clientConfig.shouldDisableDelta()  // 關(guān)閉增量,默認(rèn)false
            || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
    {
        // todo 全量拉取注冊表信息
        getAndStoreFullRegistry();
    } else {
        // todo 增量更新
        getAndUpdateDelta(applications);
    }
    // 設(shè)置hashCode
    applications.setAppsHashCode(applications.getReconcileHashCode());
    logTotalInstances();
}

可以看下最上面的注釋,不啟用增量 或者是第一次,就拉取全量注冊表信息。

不啟用增量|| 強(qiáng)制全量|| 本地注冊表是空的, 這個(gè)時(shí)候就會(huì)調(diào)用getAndStoreFullRegistry 方法去Eureka Server 拉取全量注冊表。 否則的話調(diào)用 getAndUpdateDelta 方法獲取增量注冊表信息。

4.1.1 全量拉取注冊表信息

接下來我們看下getAndStoreFullRegistry 方法,看看是怎樣拉取全量注冊表的。

// 獲取所有注冊表信息
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications apps = null;
    // 交給網(wǎng)絡(luò)傳輸組件,發(fā)起網(wǎng)絡(luò)請求,獲得響應(yīng)
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
              // todo apps請求url
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        //
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

這里其實(shí)就是調(diào)用網(wǎng)絡(luò)組件來發(fā)起請求,得到響應(yīng)了,然后拿到所有得實(shí)例信息后,將實(shí)例信息設(shè)置到本地注冊表中。 我們這里再深入一點(diǎn),看看eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) 是請求得哪個(gè)url:

@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
    return getApplicationsInternal("apps/", regions);
}
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        // 拼接region
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        // 提交get請求
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            applications = response.getEntity(Applications.class);
        }
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } 
}

拉取全量注冊表的請求為:GET請求,path為:apps/

4.1.2 增量拉取注冊表信息

getAndUpdateDelta(applications);代碼如下:

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications delta = null;
    // 提交請求
    EurekaHttpResponse&lt;Applications&gt; httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    if (delta == null) {
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                /**
                 * 這里要將從Server獲取到的所有變更信息更新到本地緩存。這些變
                 * 更信來自于兩類Region:本地Region與遠(yuǎn)程Region。而本地緩存也
                 * 分為兩類:緩存本地Region的applications與緩存所有遠(yuǎn)程Region
                 * 的注冊信息的map(key為遠(yuǎn)程Region,value為該遠(yuǎn)程Region的注冊
                 * 表)
                 */
                // todo
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } 
        ...
}

增量拉取注冊表的請求: GET請求 path為: apps/delta

然后,我們重點(diǎn)看一下updateDelta(delta);方法:

private void updateDelta(Applications delta) {
    int deltaCount = 0;
    for (Application app : delta.getRegisteredApplications()) {
        for (InstanceInfo instance : app.getInstances()) {
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            // 不是本地region,遠(yuǎn)程region
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }
            ++deltaCount;
            // 有新增加的實(shí)例信息
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            // 有修改的
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Modified instance {} to the existing apps ", instance.getId());
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            // 有刪除的
            } else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    existingApp.removeInstance(instance);
                    /*
                     * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                     * if instance list is empty, we remove the application.
                     */
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
   ...
}

這個(gè)方法就是更新客戶端本地的注冊表信息。

4.2 服務(wù)注冊

// 如果進(jìn)行服務(wù)注冊的話 clientConfig.shouldEnforceRegistrationAtInit() 默認(rèn)false
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
    try {
        // todo 進(jìn)行服務(wù)注冊
        if (!register()) {
            throw new IllegalStateException("Registration error at startup. Invalid server response.");
        }
    } catch (Throwable th) {
        logger.error("Registration error at startup: {}", th.getMessage());
        throw new IllegalStateException(th);
    }
}

如果在這里進(jìn)行服務(wù)注冊的話,需要配置文件中增加下面配置(默認(rèn)是false):

eureka.client.should-enforce-registration-at-init: true

所以在這里是沒有服務(wù)注冊的,那么服務(wù)注冊是在哪里呢?在會(huì)面分析續(xù)約定時(shí)任務(wù)時(shí)完成了服務(wù)注冊,不過,我們在這里也看一下服務(wù)注冊的代碼:

boolean register() throws Throwable {
    EurekaHttpResponse<Void> httpResponse;
    try {
        // todo 進(jìn)行服務(wù)注冊
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } 
    ...
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

接下來看:

@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    Response response = null;
    try {
        Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
        addExtraProperties(resourceBuilder);
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .accept(MediaType.APPLICATION_JSON)
                .acceptEncoding("gzip")
                .post(Entity.json(info));
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

服務(wù)注冊:POST請求,path為:“apps/" + appName

4.3 定時(shí)任務(wù)

initScheduledTasks();

初始化定時(shí)任務(wù)。我們分別看一下:

4.3.1 定時(shí)更新客戶端注冊表任務(wù)

private void initScheduledTasks() {
    // todo 拉取注冊表 增量拉取定時(shí)任務(wù)
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        // 拉取間隔 默認(rèn)是30s
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        // todo 放入定時(shí)任務(wù),默認(rèn)30s執(zhí)行一次
        // 在這里看只有一個(gè)任務(wù),在任務(wù)完成的時(shí)候會(huì)重新開啟一個(gè)新的任務(wù),可以點(diǎn)進(jìn)去看看
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
}

默認(rèn)每隔30s 增量拉取注冊表信息。拉取注冊表信息,最終還是走我們上面介紹的fetchRegistry 方法。

我們看一下com.netflix.discovery.TimedSupervisorTask#run:

@Override
public void run() {
    Future<?> future = null;
    try {
        // 使用Future,可以設(shè)定子線程的超時(shí)時(shí)間,這樣當(dāng)前線程就不用無限等待了
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // 阻塞 獲取任務(wù)的執(zhí)行結(jié)果
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
        // delay是個(gè)很有用的變量,后面會(huì)用到,這里記得每次執(zhí)行任務(wù)成功都會(huì)將delay重置
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        successCounter.increment();
    } catch (TimeoutException e) {
        logger.warn("task supervisor timed out", e);
        timeoutCounter.increment();
        long currentDelay = delay.get();
        // 任務(wù)線程超時(shí)的時(shí)候,就把delay變量翻倍,但不會(huì)超過外部調(diào)用時(shí)設(shè)定的最大延時(shí)時(shí)間
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        // 設(shè)置為最新的值,考慮到多線程,所以用了CAS
        delay.compareAndSet(currentDelay, newDelay);
    } catch (RejectedExecutionException e) {
        // 一旦線程池的阻塞隊(duì)列中放滿了待處理任務(wù),觸發(fā)了拒絕策略,就會(huì)將調(diào)度器停掉
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, reject the task", e);
        } else {
            logger.warn("task supervisor rejected the task", e);
        }
        rejectedCounter.increment();
    } catch (Throwable e) {
        // 一旦出現(xiàn)未知的異常,就停掉調(diào)度器
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, can't accept the task");
        } else {
            logger.warn("task supervisor threw an exception", e);
        }
        throwableCounter.increment();
    } finally {
        // 這里任務(wù)要么執(zhí)行完畢,要么發(fā)生異常,都用cancel方法來清理任務(wù);
        if (future != null) {
            future.cancel(true);
        }
        // 只要調(diào)度器沒有停止,就再指定等待時(shí)間之后在執(zhí)行一次同樣的任務(wù)
        if (!scheduler.isShutdown()) {
            // todo 下一次時(shí)間 再次執(zhí)行這個(gè)任務(wù)
            //這里就是周期性任務(wù)的原因:只要沒有停止調(diào)度器,就再創(chuàng)建一次性任務(wù),執(zhí)行時(shí)間時(shí)delay的值,
            //假設(shè)外部調(diào)用時(shí)傳入的超時(shí)時(shí)間為30秒(構(gòu)造方法的入?yún)imeout),最大間隔時(shí)間為50秒(構(gòu)造方法的入?yún)xpBackOffBound)
            //如果最近一次任務(wù)沒有超時(shí),那么就在30秒后開始新任務(wù),
            //如果最近一次任務(wù)超時(shí)了,那么就在50秒后開始新任務(wù)(異常處理中有個(gè)乘以二的操作,乘以二后的60秒超過了最大間隔50秒)
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}

在這個(gè)Task中時(shí)機(jī)執(zhí)行的還是入?yún)⒌姆椒?code>new CacheRefreshThread():

new TimedSupervisorTask(
        "cacheRefresh",
        scheduler,
        cacheRefreshExecutor,
        registryFetchIntervalSeconds,
        TimeUnit.SECONDS,
        expBackOffBound,
        new CacheRefreshThread()
);
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
@VisibleForTesting
void refreshRegistry() {
    try {
        ...
        // todo 拉取注冊表
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
       ...
    } 
    ...
}

在執(zhí)行完這個(gè)任務(wù)之后,會(huì)調(diào)用TimedSupervisorTask#run中finally代碼,在這里又重新開啟了新的定時(shí)任務(wù):

finally {
        // 這里任務(wù)要么執(zhí)行完畢,要么發(fā)生異常,都用cancel方法來清理任務(wù);
        if (future != null) {
            future.cancel(true);
        }
        // 只要調(diào)度器沒有停止,就再指定等待時(shí)間之后在執(zhí)行一次同樣的任務(wù)
        if (!scheduler.isShutdown()) {
            // todo 下一次時(shí)間 再次執(zhí)行這個(gè)任務(wù)
            //這里就是周期性任務(wù)的原因:只要沒有停止調(diào)度器,就再創(chuàng)建一次性任務(wù),執(zhí)行時(shí)間時(shí)delay的值,
            //假設(shè)外部調(diào)用時(shí)傳入的超時(shí)時(shí)間為30秒(構(gòu)造方法的入?yún)imeout),最大間隔時(shí)間為50秒(構(gòu)造方法的入?yún)xpBackOffBound)
            //如果最近一次任務(wù)沒有超時(shí),那么就在30秒后開始新任務(wù),
            //如果最近一次任務(wù)超時(shí)了,那么就在50秒后開始新任務(wù)(異常處理中有個(gè)乘以二的操作,乘以二后的60秒超過了最大間隔50秒)
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }

這樣就實(shí)現(xiàn)了每隔30s調(diào)用一個(gè)拉取注冊表的任務(wù)。

4.3.2 定時(shí)服務(wù)續(xù)約任務(wù)

private void initScheduledTasks() {
    ...
    // 開啟注冊
    if (clientConfig.shouldRegisterWithEureka()) {
        // todo 服務(wù)續(xù)約定時(shí)任務(wù)
        // 續(xù)約間隔時(shí)間 30s
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // 指定client從server更新注冊表的最大時(shí)間間隔指數(shù)(倍數(shù)),默認(rèn)為10
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
        // Heartbeat timer
        // todo 續(xù)約,心跳定時(shí)任務(wù)
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );
        // 續(xù)約定時(shí)任務(wù)
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);

每30s 執(zhí)行一次服務(wù)續(xù)約。直接看下HeartbeatThread 類。

private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

走的是renew 方法請求服務(wù)續(xù)約,成功后會(huì)更新lastSuccessfulHeartbeatTimestamp 字段。

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // 如果是沒有發(fā)現(xiàn)該實(shí)例信息的話
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // todo 進(jìn)行服務(wù)注冊,如果我們不在配置文件中指定服務(wù)初始化就注冊該服務(wù),那么服務(wù)的注冊實(shí)際是在這里注冊的
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

很簡單,就是調(diào)用 eurekaTransport.registrationClient.sendHeartBeat 方法發(fā)送服務(wù)續(xù)約的請求,如果你實(shí)例信息在Eureka Server中不存在的話,就進(jìn)行服務(wù)注冊,我們再稍微看下sendHeartBeat 方法,里面請求uri就是 String urlPath = “apps/” + appName + ‘/’ + id;

服務(wù)續(xù)約請求:PUT請求, path為:apps/{appName}/{instanceId}

4.3.3 定時(shí)更新Client信息給Server任務(wù)

private void initScheduledTasks() {
    ...
    // 開啟注冊
    if (clientConfig.shouldRegisterWithEureka()) {
        ...
        // todo 定時(shí)更新Client信息給服務(wù)端
        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }
            // 監(jiān)聽到StatusChangeEvent 事件,調(diào)用notify方法
            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                logger.info("Saw local status change event {}", statusChangeEvent);
                // todo 通知執(zhí)行方法,這個(gè)方法就是立即向 服務(wù)端發(fā)起注冊請求
                instanceInfoReplicator.onDemandUpdate();
            }
        };
        // 向applicationInfoManager 中注冊 狀態(tài)變化事件監(jiān)聽器
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        // todo  參數(shù)默認(rèn)40s
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } 
    ...
}

我們看下這個(gè)start啟動(dòng) 方法:

public void start(int initialDelayMs) {
     if (started.compareAndSet(false, true)) {
         instanceInfo.setIsDirty();  // for initial register
         Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
         scheduledPeriodicRef.set(next);
     }
}

這里有個(gè)非常重要的點(diǎn),調(diào)用了實(shí)例信息的setIsDirty 方法,后面的注釋說是為了初始化服務(wù)注冊。

創(chuàng)建一個(gè)延時(shí)任務(wù),默認(rèn)是40s??纯?0s執(zhí)行啥東西。com.netflix.discovery.InstanceInfoReplicator#run:

public void run() {
    try {
        // 刷新實(shí)例信息
        discoveryClient.refreshInstanceInfo();
        // 獲取臟的時(shí)間戳
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // todo 客戶端重新發(fā)起  注冊請求
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

如果這個(gè)時(shí)間戳不是null的話,調(diào)用register 方法進(jìn)行服務(wù)注冊,這個(gè)時(shí)間戳肯定不是null的, instanceInfo.setIsDirty(); // for initial register 我們上面這個(gè)方法就是設(shè)置了這個(gè)時(shí)間戳。最后又將這個(gè)任務(wù)放入延時(shí)調(diào)度中。

其實(shí)這個(gè)定時(shí)任務(wù)是為了檢測服務(wù)信息有沒有變動(dòng),如果有變動(dòng)重新注冊到Eureka Server上去。

下面我們來看一下狀態(tài)改變監(jiān)聽器statusChangeListener:

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }
    // 監(jiān)聽到StatusChangeEvent 事件,調(diào)用notify方法
    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        logger.info("Saw local status change event {}", statusChangeEvent);
        // todo 通知執(zhí)行方法,這個(gè)方法就是立即向 服務(wù)端發(fā)起注冊請求
        instanceInfoReplicator.onDemandUpdate();
    }
};
// 向applicationInfoManager 中注冊 狀態(tài)變化事件監(jiān)聽器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

如果 Eureka Client 狀態(tài)發(fā)生變化(在Spring Boot 通過 Actuator 對服務(wù)狀態(tài)進(jìn)行監(jiān)控,具體實(shí)現(xiàn)為 EurekaHealthCheckHandler),注冊在 ApplicationInfoManager 的狀態(tài)改變監(jiān)控器將會(huì)被觸發(fā),從而調(diào)用InstanceInfoReplicator#onDemandUpdate方法,檢查服務(wù)實(shí)例信息和服務(wù)狀態(tài)的變化,可能會(huì)引起按需注冊任務(wù),代碼如下:

public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            // 提交
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        // 取消定時(shí)任務(wù)
                        latestPeriodic.cancel(false);
                    }
                    // todo 執(zhí)行 向 Server端重新 注冊的請求
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

InstanceInfoReplicator#onDemandUpdate 方法中調(diào)用 InstanceInfoReplicator#run 方法檢查服務(wù)實(shí)例信息和服務(wù)狀態(tài)的變化,并在服務(wù)實(shí)例信息和服務(wù)狀態(tài)發(fā)生變化的情況下向 Eureka Server 發(fā)起重新注冊的請求,為了防止重新執(zhí)行 run 方法,onDemandUpdate 方法還會(huì)取消執(zhí)行上次已經(jīng)提交且未完成的 run方法,執(zhí)行最新的按需注冊任務(wù)。

4.4 總結(jié)

服務(wù)注冊的時(shí)機(jī)

Client提交register()請求的情況有三種:

  • 在應(yīng)用啟動(dòng)時(shí)就可以直接進(jìn)行register(),不過,需要提前在配置文件中配置
  • renew時(shí),如果server端返回的是NOT_FOUND,則提交register()
  • 當(dāng)Client的配置信息發(fā)生了變更,則Client提交register()

Client實(shí)例化

Eureka Client 實(shí)例化的時(shí)候有幾個(gè)重要步驟,分別如下:

全量拉取注冊表信息,放入自己本地注冊表中。

創(chuàng)建定時(shí)任務(wù),

  • 定時(shí)服務(wù)續(xù)約任務(wù),默認(rèn)是30s,
  • 定時(shí)更新 客戶端注冊表信息,默認(rèn)是30s,
  • 定時(shí)更新Client信息給Server端,重新服務(wù)注冊,默認(rèn)是40s。

參考文章

eureka-0.10.11源碼(注釋)

springcloud-source-study學(xué)習(xí)github地址

以上就是Eureka源碼閱讀Client啟動(dòng)入口注冊續(xù)約及定時(shí)任務(wù)的詳細(xì)內(nèi)容,更多關(guān)于Eureka源碼Client啟動(dòng)入口的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論