SpringCloud微服務(wù)續(xù)約實現(xiàn)源碼分析詳解
一、前言
微服務(wù)續(xù)約都有通用的設(shè)計,就是(微服務(wù))客戶端使用心跳機制向注冊中心報告自己還活著(可以提供服務(wù)),它們的心跳機制略有不同。而Eureka Client客戶端會每隔 30 秒發(fā)送一次心跳來續(xù)約,通過續(xù)約來告知 Eureka Server注冊中心該 Eureka Client客戶端正常運行,沒有出現(xiàn)問題。那么心跳機制是什么呢、底層基于什么的?客戶端發(fā)送心跳的代碼在哪里?注冊中心怎么處理的?
二、客戶端續(xù)約
真正觸發(fā)的還是SpringBoot的自動裝配,這里不會過多贅述,下面直奔主題:
1、入口
構(gòu)造初始化
private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor heartbeatExecutor; @Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { ......此處省略n行代碼..... try { // default size of 2 - 1 each for heartbeat and cacheRefresh心跳和緩存刷新的默認大小分別為2-1 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 心跳執(zhí)行者 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff ......此處省略n行代碼..... // 最后,初始化調(diào)度任務(wù)(例如,集群解析器、 heartbeat、 instanceInfo replicator、 fetch initScheduledTasks(); ......此處省略n行代碼..... }
主要邏輯:
scheduler和heartbeatExecutor都是DiscoveryClient的私有成員變量,并且是final的,故在構(gòu)造方法中必須初始化。而DiscoveryClient的構(gòu)造初始化前面也講了,是在SpringBoot的自動裝配過程調(diào)用的。構(gòu)造方法中:
1)scheduler是交給jdk的Executors工具類創(chuàng)建的,核心線程數(shù)為2(心跳和緩存刷新需用到)。
2)直接調(diào)用ThreadPoolExecutor原生構(gòu)造方法初始化,核心線程數(shù)為1,使用SynchronousQueue隊列。
3)最后,初始化調(diào)度任務(wù)(例如,集群解析器、 心跳、 服務(wù)實例復(fù)制、 刷新),進入下面邏輯分析
initScheduledTasks()調(diào)度執(zhí)行心跳任務(wù)
private void initScheduledTasks() { if (clientConfig.shouldRegisterWithEureka()) { /* LeaseInfo: public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30; // Client settings private int renewalIntervalInSecs = DEFAULT_LEASE_RENEWAL_INTERVAL; */ // 默認30 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer心跳任務(wù) heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); // 默認的情況下會每隔30秒向注冊中心 (eureka.instance.lease-renewal-interval-in-seconds)發(fā)送一次心跳來進行服務(wù)續(xù)約 scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator實例信息復(fù)制任務(wù) instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 狀態(tài)變更監(jiān)聽者 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { // Saw local status change event StatusChangeEvent [timestamp=1668595102513, current=UP, previous=STARTING] logger.info("Saw local status change event {}", statusChangeEvent); instanceInfoReplicator.onDemandUpdate(); } }; // 初始化狀態(tài)變更監(jiān)聽者 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 定時刷新服務(wù)實例信息和檢查應(yīng)用狀態(tài)的變化,在服務(wù)實例信息發(fā)生改變的情況下向server重新發(fā)起注冊 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
主要邏輯:
- 根據(jù)配置決定是否發(fā)送心跳,默認會發(fā)送。在LeaseInfo租約信息中維護發(fā)送心跳時間,默認間隔為30秒,可以在yml配置文件(eureka.instance.lease-renewal-interval-in-seconds)中更改默認值。
- 初始化heartbeatTask,真正的心跳任務(wù)為HeartbeatThread類型(下面3分析)。
- 調(diào)度執(zhí)行心跳任務(wù),默認的情況下會每隔30秒向注冊中心 (eureka.instance.lease-renewal-interval-in-seconds)發(fā)送一次心跳來進行服務(wù)續(xù)約
- 本方法下面的邏輯上一節(jié)已經(jīng)分析
2、TimedSupervisorTask組件
可見TimedSupervisorTask是Runnable類型的任務(wù),那么它的任務(wù)邏輯在run()方法。
構(gòu)造初始化
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.name = name; this.scheduler = scheduler; // heartbeatExecutor或cacheRefreshExecutor this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); // HeartbeatThread或CacheRefreshThread等類型任務(wù) this.task = task; this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; // Initialize the counters and register. successCounter = Monitors.newCounter("success"); timeoutCounter = Monitors.newCounter("timeouts"); rejectedCounter = Monitors.newCounter("rejectedExecutions"); throwableCounter = Monitors.newCounter("throwables"); threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); Monitors.registerObject(name, this); }
初始化自己的一些字段,在阿里Java編程規(guī)范中是強烈建議給線程起別名的,這樣便于監(jiān)控排查問題等。name線程名字,在心跳任務(wù)中為heartbeat;scheduler字段傳遞進來是為了周期性執(zhí)行任務(wù);executor用于提交任務(wù),下面分析;task任務(wù),為HeartbeatThread或CacheRefreshThread類型任務(wù);delay,用于存儲以及計算延遲時間;最大延遲時間不能超過maxDelay。
TimedSupervisorTask#run()任務(wù)邏輯
@Override public void run() { // Future模式 Future<?> future = null; try { // 提交任務(wù)待執(zhí)行 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); // 阻塞直到完成或超時 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // 更新以便計算延遲時間 delay.set(timeoutMillis); // 更新 threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { // 任務(wù)主管超時了 logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); // 任務(wù)線程超時的時候,就把delay變量翻倍,但不會超過外部調(diào)用時設(shè)定的最大延時時間 long newDelay = Math.min(maxDelay, currentDelay * 2); // CAS更新延遲時間,考慮到多線程,所以用了CAS delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { // 一旦線程池的阻塞隊列中放滿了待處理任務(wù),觸發(fā)了拒絕策略 if (executor.isShutdown() || scheduler.isShutdown()) { // 線程池關(guān)閉,拒絕任務(wù) logger.warn("task supervisor shutting down, reject the task", e); } else { // 線程池拒絕任務(wù) logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { // 任務(wù)主管關(guān)閉,不能接受任務(wù) logger.warn("task supervisor shutting down, can't accept the task"); } else { // 任務(wù)主管拋出了一個異常 logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { // 上面的異常catch了沒有外跑,下面繼續(xù)運行 if (future != null) { // 中斷 future.cancel(true); } // 調(diào)度器沒有關(guān)閉,延遲繼續(xù)周期性執(zhí)行任務(wù)。這樣的周期性任務(wù)時間設(shè)置靈活 if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
主要邏輯:
- 提交任務(wù)待執(zhí)行
- 阻塞直到完成或超時
- 更新以便計算延遲時間
異常處理:
- 超時異常,任務(wù)線程超時的時候,就把delay變量翻倍,但不會超過外部調(diào)用時設(shè)定的最大延時時間。CAS更新延遲時間,考慮到多線程,所以用了CAS。
- 拒絕執(zhí)行異常,一旦線程池的阻塞隊列中放滿了待處理任務(wù),觸發(fā)了拒絕策略。
- 其他異常
上面的異常catch了沒有外跑,下面繼續(xù)運行:
1)丟棄當(dāng)前任務(wù);
2)調(diào)度器沒有關(guān)閉,延遲繼續(xù)周期性執(zhí)行任務(wù)。這樣的周期性任務(wù)時間設(shè)置靈活
3、心跳任務(wù)
HeartbeatThread私有內(nèi)部類
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
HeartbeatThread實現(xiàn)了Runnable接口,也就是上面說的task,它的邏輯封裝了出去:
發(fā)送心跳
boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { // 發(fā)送心跳 httpResponse = eurekaTransport.registrationClient .sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); // 打印日志,如:心跳狀態(tài) logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); // 響應(yīng)狀態(tài)沒有找到重新注冊 if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { // 無法發(fā)送心跳! logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
主要邏輯:
- EurekaTransport是DiscoevryClient的內(nèi)部類,封裝了幾個與注冊中心通信的XXXclient。獲取通信類發(fā)送心跳請求,傳入appName、唯一ID以及instanceInfo。
- 響應(yīng)狀態(tài)沒有找到重新注冊,畢竟當(dāng)前客戶端是運行中正常狀態(tài)
- 返回是否發(fā)送心跳成功
4、發(fā)送心跳到注冊中心
構(gòu)建請求數(shù)據(jù)發(fā)送心跳
@Override public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { // 拼接請求URL String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { // 構(gòu)建請求資源 WebResource webResource = jerseyClient.resource(serviceUrl) .path(urlPath) .queryParam("status", info.getStatus().toString()) .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); // 請求注冊中心 response = requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity() && !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } // 返回構(gòu)建結(jié)果 return eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { // 關(guān)閉資源 response.close(); } } }
主要邏輯:
- 拼接請求URL
- 構(gòu)建請求資源數(shù)據(jù)
- 請求注冊中心
- 返回構(gòu)建結(jié)果
- 關(guān)閉資源
三、服務(wù)端處理客戶端續(xù)約
InstanceRegistry父子關(guān)系圖上一節(jié)也分析了,在使用super關(guān)鍵字時注意一下。
1、InstanceRegistry#renew()邏輯
@Override public boolean renew(final String appName, final String serverId, boolean isReplication) { log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication); List<Application> applications = getSortedApplications(); for (Application input : applications) { if (input.getName().equals(appName)) { InstanceInfo instance = null; for (InstanceInfo info : input.getInstances()) { if (info.getId().equals(serverId)) { instance = info; break; } } // 發(fā)布服務(wù)實例處理心跳事件 publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication)); break; } } // 調(diào)用父類的處理心跳方法 return super.renew(appName, serverId, isReplication); }
這里邏輯主要是委托父類處理心跳,具體邏輯見下面分析:
2、PeerAwareInstanceRegistryImpl#renew()邏輯
public boolean renew(final String appName, final String id, final boolean isReplication) { if (super.renew(appName, id, isReplication)) { // 服務(wù)續(xù)約成功,將所有的Eureka操作復(fù)制到對等的Eureka節(jié)點,除了復(fù)制到此節(jié)點的流量。 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; }
PeerAwareInstanceRegistryImpl職責(zé):處理將所有操作復(fù)制到 AbstractInstanceRegistry的peer Eureka節(jié)點,以保持所有操作同步。這里如果服務(wù)續(xù)約成功,將所有的Eureka操作復(fù)制到對等的Eureka節(jié)點,除了復(fù)制到此節(jié)點的流量。
3、AbstractInstanceRegistry#renew()邏輯
public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); // 根據(jù)appName從本地注冊表服務(wù)實例信息 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { // 沒有找到租約 RENEW_NOT_FOUND.increment(isReplication); // 注冊: 租約不存在,注冊資源: logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { // 獲取服務(wù)實例信息 InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { // 實例狀態(tài){}與實例{}的重寫實例狀態(tài){}不同。因此將狀態(tài)設(shè)置為覆蓋狀態(tài) logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), overriddenInstanceStatus.name(), instanceInfo.getId()); instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); // 續(xù)約 leaseToRenew.renew(); return true; } }
主要邏輯:
- 根據(jù)appName從本地注冊表服務(wù)實例信息
- 沒有找到租約,返回false
- 獲取到的服務(wù)實例狀態(tài)為UNKNOWN,返回false;續(xù)約,更新續(xù)約字段,下面分析
Lease#renew()邏輯
public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; }
lastUpdateTimestamp是Lease租約的字段,維護租約時間,在服務(wù)剔除下線會根據(jù)該字段判斷是否過期需要對服務(wù)剔除下線處理。下一篇我們就來探討一下,敬請期待!??!
到此這篇關(guān)于SpringCloud微服務(wù)續(xù)約實現(xiàn)源碼分析詳解的文章就介紹到這了,更多相關(guān)SpringCloud微服務(wù)續(xù)約內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中List與數(shù)組相互轉(zhuǎn)換實例分析
這篇文章主要介紹了Java中List與數(shù)組相互轉(zhuǎn)換的方法,實例分析了Java中List與數(shù)組相互轉(zhuǎn)換中容易出現(xiàn)的問題與相關(guān)的解決方法,具有一定參考借鑒價值,需要的朋友可以參考下2015-05-05IDEA使用properties配置文件進行mysql數(shù)據(jù)庫連接的教程圖解
Properties類是 鍵和值均為字符串的可以永久存儲到文件中的key-value集合。這篇文章主要介紹了IDEA使用properties配置文件進行mysql數(shù)據(jù)路連接 ,需要的朋友可以參考下2018-10-10java實現(xiàn)拉鉤網(wǎng)上的FizzBuzzWhizz問題示例
這篇文章主要介紹了java實現(xiàn)拉鉤網(wǎng)上的FizzBuzzWhizz問題示例,需要的朋友可以參考下2014-05-05使用Spring Boot Mybatis 搞反向工程的步驟
這篇文章主要介紹了使用Spring Boot Mybatis 搞反向工程的步驟,幫助大家更好的理解和使用spring boot框架,感興趣的朋友可以了解下2021-01-01java生成json實現(xiàn)隱藏掉關(guān)鍵屬性
這篇文章主要介紹了java生成json實現(xiàn)隱藏掉關(guān)鍵屬性,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03