SpringCloud微服務(wù)剔除下線功能實(shí)現(xiàn)原理分析
一、前言
上一篇SpringCloud微服務(wù)續(xù)約源碼解析已經(jīng)分析了心跳機(jī)制是什么、底層實(shí)現(xiàn)、客戶端發(fā)送心跳的主要代碼、注冊(cè)中心處理心跳的過(guò)程,這節(jié)跟它是緊密關(guān)聯(lián)的。聯(lián)系的樞紐就是lastUpdateTimestamp最后更新時(shí)間戳,它是Lease租約類的一個(gè)用volatile關(guān)鍵字修飾的對(duì)其他線程透明可見(jiàn)的字段。那么Eureka是如何使用該字段判斷服務(wù)是否過(guò)期的?然后進(jìn)行服務(wù)的剔除下線?需要借助什么機(jī)制?該機(jī)制是什么時(shí)候能觸發(fā)的?帶著這些問(wèn)題,我們下面來(lái)探究一番:
二、微服務(wù)剔除下線源碼解析
EurekaBootStrap是Eureka項(xiàng)目里面的,用于啟動(dòng)Eureka服務(wù)器的類:
Eureka 服務(wù)器使用類路徑中eureka.server.props指定的EurekaServerConfig進(jìn)行配置。Eureka客戶端組件也是通過(guò)使用eureka.client.props指定的配置 EurekaInstanceConfig初始化的。如果服務(wù)器在AWS云中運(yùn)行,則eureka服務(wù)器將其綁定到指定的彈性ip。
1、EurekaBootStrap#contextInitialized()
@Override public void contextInitialized(ServletContextEvent event) { try { initEurekaEnvironment(); // 初始化注冊(cè)中心上下文 initEurekaServerContext(); ServletContext sc = event.getServletContext(); sc.setAttribute(EurekaServerContext.class.getName(), serverContext); } catch (Throwable e) { logger.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }
它這里也使用了事件機(jī)制,但是不是基于Spring的,感興趣的可以去了解下。初始化注冊(cè)中心上下文,即下面的處理邏輯:
1.1、初始化注冊(cè)中心上下文
protected void initEurekaServerContext() throws Exception { EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig(); // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); logger.info("Initializing the eureka client..."); logger.info(eurekaServerConfig.getJsonCodecName()); ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig); ApplicationInfoManager applicationInfoManager = null; if (eurekaClient == null) { EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig(); applicationInfoManager = new ApplicationInfoManager( instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); } else { applicationInfoManager = eurekaClient.getApplicationInfoManager(); } PeerAwareInstanceRegistry registry; if (isAws(applicationInfoManager.getInfo())) { registry = new AwsInstanceRegistry( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager); awsBinder.start(); } else { registry = new PeerAwareInstanceRegistryImpl( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); } PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( registry, eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager ); serverContext = new DefaultEurekaServerContext( eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, applicationInfoManager ); EurekaServerContextHolder.initialize(serverContext); serverContext.initialize(); logger.info("Initialized server context"); // Copy registry from neighboring eureka node int registryCount = registry.syncUp(); registry.openForTraffic(applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }
做一些初始化工作,重點(diǎn)關(guān)注registry.openForTraffic(applicationInfoManager, registryCount);的調(diào)用,進(jìn)入下面處理邏輯:
1.2、openForTraffic()邏輯
@Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. // 更新每30秒發(fā)生一次,一分鐘應(yīng)該是2倍。 this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } // 更改服務(wù)實(shí)例狀態(tài)為UP logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 調(diào)用父類初始化 super.postInit(); }
更改服務(wù)實(shí)例狀態(tài)為UP,調(diào)用父類初始化。
1.3、postInit()執(zhí)行任務(wù)
protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
終于來(lái)到剔除任務(wù)了,前面說(shuō)了什么,就是一些初始化的工作。它這里的執(zhí)行器是Timer,跟Nacos不一樣,區(qū)別的話感興趣的就自行去搞個(gè)明白。我們進(jìn)入下面的分析:
1.4、剔除任務(wù)
EvictionTask是TimerTask類型任務(wù)。
class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l); @Override public void run() { try { long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } /** * 計(jì)算一個(gè)補(bǔ)償時(shí)間,該時(shí)間定義為自上一次迭代以來(lái)該任務(wù)的實(shí)際執(zhí)行時(shí)間,與配置的執(zhí)行時(shí)間量相比較。 * 這對(duì)于時(shí)間變化(例如由于時(shí)鐘偏差或 gc)導(dǎo)致實(shí)際的驅(qū)逐任務(wù)根據(jù)配置的周期在所需時(shí)間之后執(zhí)行的情況 * 非常有用。 */ long getCompensationTimeMs() { long currNanos = getCurrentTimeNano(); long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0l) { return 0l; } long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs(); return compensationTime <= 0l ? 0l : compensationTime; } long getCurrentTimeNano() { // for testing return System.nanoTime(); } }
主要邏輯:
計(jì)算一個(gè)補(bǔ)償時(shí)間,該時(shí)間定義為自上一次迭代以來(lái)該任務(wù)的實(shí)際執(zhí)行時(shí)間,與配置的執(zhí)行時(shí)間量相比較。這對(duì)于時(shí)間變化(例如由于時(shí)鐘偏差或 gc)導(dǎo)致實(shí)際的驅(qū)逐任務(wù)根據(jù)配置的周期在所需時(shí)間之后執(zhí)行的情況非常有用。
調(diào)用evict(compensationTimeMs)剔除處理,下面分析:
2、服務(wù)剔除下線
2.1、AbstractInstanceRegistry#evict()邏輯
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { // DS: 租約到期目前已禁用。 logger.debug("DS: lease expiration is currently disabled."); return; } // 我們首先收集所有過(guò)期的物品,以隨機(jī)的順序驅(qū)逐它們。對(duì)于大型驅(qū)逐集,如果我們不這樣做, // 我們可能會(huì)在自我保護(hù)啟動(dòng)之前刪除整個(gè)應(yīng)用程序。通過(guò)隨機(jī)化,影響應(yīng)該均勻地分布在所有應(yīng)用程序中。 List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); // 判斷租約是否過(guò)期 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { // 添加到過(guò)期續(xù)租集合 expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. // 為了補(bǔ)償 GC 暫?;蚱频谋镜貢r(shí)間,我們需要使用當(dāng)前的注冊(cè)表大小作為觸發(fā)自我保存的基礎(chǔ)。 // 沒(méi)有這個(gè),我們就會(huì)清除整個(gè)注冊(cè)表。 // 獲取注冊(cè)表租約總數(shù) int registrySize = (int) getLocalRegistrySize(); // 計(jì)算注冊(cè)表租約的閾值 (總數(shù)乘以 續(xù)租百分比 默認(rèn)85%),得出要續(xù)租的數(shù)量 int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); // 理論要剔除的數(shù)量 = 總數(shù)-要續(xù)租的數(shù)量 int evictionLimit = registrySize - registrySizeThreshold; // 實(shí)際剔除的數(shù)量 = min(實(shí)際租期到期服務(wù)實(shí)例個(gè)數(shù),理論剔除數(shù)量) int toEvict = Math.min(expiredLeases.size(), evictionLimit); // 將要剔除數(shù)量大于0,把它們下線處理,從本地注冊(cè)表移除掉以保證高可用 if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // 選擇一個(gè)隨機(jī)的項(xiàng)目(Knuth 洗牌算法) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); // 注冊(cè)表: {}/{}的租約已過(guò)期 logger.warn("DS: Registry: expired lease for {}/{}", appName, id); // 服務(wù)下線 internalCancel(appName, id, false); } } }
主要邏輯:
- 判斷租約到期是否禁用,如果禁用return。默認(rèn)啟用
- 首先收集所有過(guò)期的租約,以隨機(jī)的順序剔除它們。對(duì)于大型剔除集,如果不這樣做,可能會(huì)在自我保護(hù)啟動(dòng)之前刪除整個(gè)應(yīng)用程序。通過(guò)隨機(jī)化,影響應(yīng)該均勻地分布在所有應(yīng)用程序中。判斷租約是否過(guò)期,如果過(guò)期添加到過(guò)期租約集合,繼續(xù)遍歷到。
- 為了補(bǔ)償 GC 暫?;蚱频谋镜貢r(shí)間,需要使用當(dāng)前的注冊(cè)表大小作為觸發(fā)自我保存的基礎(chǔ)。沒(méi)有這個(gè),就會(huì)清除整個(gè)注冊(cè)表。1)獲取注冊(cè)表租約總數(shù);2)計(jì)算注冊(cè)表租約的閾值 (總數(shù)乘以 續(xù)租百分比 默認(rèn)85%),得出要續(xù)租的數(shù)量;3)理論要剔除的數(shù)量 = 總數(shù)-要續(xù)租的數(shù)量;4)實(shí)際剔除的數(shù)量 = min(實(shí)際租期到期服務(wù)實(shí)例個(gè)數(shù),理論剔除數(shù)量);
- 將要剔除數(shù)量大于0,把它們下線處理,從本地注冊(cè)表移除掉以保證高可用:選擇一個(gè)隨機(jī)的項(xiàng)目(Knuth 洗牌算法),調(diào)用internalCancel(appName, id, false)下線處理。
2.1、判斷是否過(guò)期
public boolean isExpired(long additionalLeaseMs) { return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); }
如果是cancel()處理前面的值就大于0,一般是判斷后面部分邏輯:如果當(dāng)前系統(tǒng)時(shí)間戳小于后面的時(shí)間戳之和,則沒(méi)有過(guò)期;否則大于就是過(guò)期了。
duration的值也可以通過(guò)配置文件更改,通過(guò)yml配置文件中eureka:instance:lease-expiration-duration-in-seconds:指定,不過(guò)必須大于eureka:instance:lease-renewal-interval-in-seconds默認(rèn)值或指定值。設(shè)置duration太長(zhǎng)可能意味著即使實(shí)例不存在,流量也可能被路由到該實(shí)例。將此值設(shè)置得太小可能意味著,由于臨時(shí)網(wǎng)絡(luò)故障,該實(shí)例可能會(huì)從流量中刪除。因此duration的值要設(shè)置為至少高于eureka:instance:lease-renewal-interval-in-seconds中默認(rèn)的或指定的值。
2.2、從本地列表異常下線處理
cancel(String,String,boolean)方法被PeerAwareInstanceRegistry重寫了,因此每個(gè)取消請(qǐng)求都被復(fù)制到對(duì)等點(diǎn)。然而,對(duì)于在遠(yuǎn)程對(duì)等點(diǎn)中被視為有效取消的過(guò)期,這是不需要的,因此自我保存模式不會(huì)啟用。
protected boolean internalCancel(String appName, String id, boolean isReplication) { // 加鎖 read.lock(); try { CANCEL.increment(isReplication); // 根據(jù)appName從本地注冊(cè)表獲取租約服務(wù)實(shí)例 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToCancel = null; if (gMap != null) { // 根據(jù)唯一ID從本地移除服務(wù)實(shí)例,下線 leaseToCancel = gMap.remove(id); } recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); if (instanceStatus != null) { logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); } if (leaseToCancel == null) { // 下線失敗,因?yàn)樽饧s信息中不存在該服務(wù)實(shí)例 CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); return false; } else { // 通過(guò)更新剔除時(shí)間取消租約。 leaseToCancel.cancel(); // 從租約獲取服務(wù)實(shí)例 InstanceInfo instanceInfo = leaseToCancel.getHolder(); String vip = null; String svip = null; if (instanceInfo != null) { instanceInfo.setActionType(ActionType.DELETED); recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); instanceInfo.setLastUpdatedTimestamp(); vip = instanceInfo.getVIPAddress(); svip = instanceInfo.getSecureVipAddress(); } // 使特定應(yīng)用程序的緩存失效 invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); } } finally { // 釋放鎖 read.unlock(); } synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } } return true; }
主要邏輯:
- 獲取鎖后,根據(jù)appName從本地注冊(cè)表獲取租約服務(wù)實(shí)例
- 根據(jù)唯一ID從本地移除服務(wù)實(shí)例,下線
- 如果需下線租約信息為空,則下線失敗,因?yàn)樽饧s信息中不存在該服務(wù)實(shí)例,return假;否則可能通過(guò)更新剔除時(shí)間取消租約,從租約獲取服務(wù)實(shí)例以便使特定應(yīng)用程序的緩存失效
- 釋放鎖
到此這篇關(guān)于SpringCloud微服務(wù)剔除下線功能實(shí)現(xiàn)原理分析的文章就介紹到這了,更多相關(guān)SpringCloud微服務(wù)剔除下線內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中定時(shí)任務(wù)@Scheduled的多線程使用詳解
這篇文章主要為大家詳細(xì)介紹了pring Boot定時(shí)任務(wù)@Scheduled的多線程原理以及如何加入線程池來(lái)處理定時(shí)任務(wù),感興趣的可以了解一下2023-04-04Java實(shí)現(xiàn)合并多個(gè)PDF的示例代碼
這篇文章主要介紹了通過(guò)Java實(shí)現(xiàn)合并多個(gè)PDF,并將合并后的新PDF存儲(chǔ)到文件夾下,文中的示例代碼簡(jiǎn)潔易懂,感興趣的可以跟隨小編一起試一試2022-01-01Mybatis 查詢語(yǔ)句條件為枚舉類型時(shí)報(bào)錯(cuò)的解決
這篇文章主要介紹了Mybatis 查詢語(yǔ)句條件為枚舉類型時(shí)報(bào)錯(cuò)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01Spring Cloud Hystrix實(shí)現(xiàn)服務(wù)容錯(cuò)的方法
Hystrix是SpringCloud中重要的熔斷保護(hù)組件,由Netflix開(kāi)源,主要提供延遲和容錯(cuò)管理,以保障分布式系統(tǒng)的高可用性和魯棒性,通過(guò)封裝依賴項(xiàng)實(shí)現(xiàn)服務(wù)間隔離,引入回退邏輯應(yīng)對(duì)依賴服務(wù)故障,有效防止系統(tǒng)崩潰和服務(wù)級(jí)聯(lián)故障2024-10-10Mybatis實(shí)現(xiàn)數(shù)據(jù)的增刪改查實(shí)例(CRUD)
本篇文章主要介紹了Mybatis實(shí)現(xiàn)數(shù)據(jù)的增刪改查實(shí)例(CRUD),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-05-05SpringBoot Maven Clean報(bào)錯(cuò)解決方案
這篇文章主要介紹了SpringBoot Maven Clean報(bào)錯(cuò)解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03SpringBoot實(shí)現(xiàn)阿里云快遞物流查詢的示例代碼
本文將基于springboot實(shí)現(xiàn)快遞物流查詢,物流信息的獲取通過(guò)阿里云第三方實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2021-10-10