欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringCloud微服務(wù)續(xù)約實(shí)現(xiàn)源碼分析詳解

 更新時(shí)間:2022年11月23日 11:38:26   作者:卡布奇諾-海晨  
這篇文章主要介紹了SpringCloud微服務(wù)續(xù)約實(shí)現(xiàn)源碼分析,服務(wù)續(xù)期和服務(wù)注冊(cè)非常相似,服務(wù)注冊(cè)在Eureka?Client程序啟動(dòng)之后開(kāi)啟,并同時(shí)開(kāi)啟服務(wù)續(xù)期的定時(shí)任務(wù)

一、前言

微服務(wù)續(xù)約都有通用的設(shè)計(jì),就是(微服務(wù))客戶(hù)端使用心跳機(jī)制向注冊(cè)中心報(bào)告自己還活著(可以提供服務(wù)),它們的心跳機(jī)制略有不同。而Eureka Client客戶(hù)端會(huì)每隔 30 秒發(fā)送一次心跳來(lái)續(xù)約,通過(guò)續(xù)約來(lái)告知 Eureka Server注冊(cè)中心該 Eureka Client客戶(hù)端正常運(yùn)行,沒(méi)有出現(xiàn)問(wèn)題。那么心跳機(jī)制是什么呢、底層基于什么的?客戶(hù)端發(fā)送心跳的代碼在哪里?注冊(cè)中心怎么處理的?

二、客戶(hù)端續(xù)約

真正觸發(fā)的還是SpringBoot的自動(dòng)裝配,這里不會(huì)過(guò)多贅述,下面直奔主題:

1、入口

構(gòu)造初始化

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor heartbeatExecutor;
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                    AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
            ......此處省略n行代碼.....
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh心跳和緩存刷新的默認(rèn)大小分別為2-1
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            // 心跳執(zhí)行者
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            ......此處省略n行代碼.....
        // 最后,初始化調(diào)度任務(wù)(例如,集群解析器、 heartbeat、 instanceInfo replicator、 fetch
        initScheduledTasks();
            ......此處省略n行代碼.....
}

主要邏輯:

scheduler和heartbeatExecutor都是DiscoveryClient的私有成員變量,并且是final的,故在構(gòu)造方法中必須初始化。而DiscoveryClient的構(gòu)造初始化前面也講了,是在SpringBoot的自動(dòng)裝配過(guò)程調(diào)用的。構(gòu)造方法中:

1)scheduler是交給jdk的Executors工具類(lèi)創(chuàng)建的,核心線(xiàn)程數(shù)為2(心跳和緩存刷新需用到)。

2)直接調(diào)用ThreadPoolExecutor原生構(gòu)造方法初始化,核心線(xiàn)程數(shù)為1,使用SynchronousQueue隊(duì)列。

3)最后,初始化調(diào)度任務(wù)(例如,集群解析器、 心跳、 服務(wù)實(shí)例復(fù)制、 刷新),進(jìn)入下面邏輯分析

initScheduledTasks()調(diào)度執(zhí)行心跳任務(wù)

    private void initScheduledTasks() {
        if (clientConfig.shouldRegisterWithEureka()) {
            /*  LeaseInfo:
                public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30;
                // Client settings
                private int renewalIntervalInSecs = DEFAULT_LEASE_RENEWAL_INTERVAL;
             */
            // 默認(rèn)30
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
            // Heartbeat timer心跳任務(wù)
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            // 默認(rèn)的情況下會(huì)每隔30秒向注冊(cè)中心 (eureka.instance.lease-renewal-interval-in-seconds)發(fā)送一次心跳來(lái)進(jìn)行服務(wù)續(xù)約
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);
            // InstanceInfo replicator實(shí)例信息復(fù)制任務(wù)
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            // 狀態(tài)變更監(jiān)聽(tīng)者
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    // Saw local status change event StatusChangeEvent [timestamp=1668595102513, current=UP, previous=STARTING]
                    logger.info("Saw local status change event {}", statusChangeEvent);
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            // 初始化狀態(tài)變更監(jiān)聽(tīng)者
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // 定時(shí)刷新服務(wù)實(shí)例信息和檢查應(yīng)用狀態(tài)的變化,在服務(wù)實(shí)例信息發(fā)生改變的情況下向server重新發(fā)起注冊(cè)
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

主要邏輯:

  • 根據(jù)配置決定是否發(fā)送心跳,默認(rèn)會(huì)發(fā)送。在LeaseInfo租約信息中維護(hù)發(fā)送心跳時(shí)間,默認(rèn)間隔為30秒,可以在yml配置文件(eureka.instance.lease-renewal-interval-in-seconds)中更改默認(rèn)值。
  • 初始化heartbeatTask,真正的心跳任務(wù)為HeartbeatThread類(lèi)型(下面3分析)。
  • 調(diào)度執(zhí)行心跳任務(wù),默認(rèn)的情況下會(huì)每隔30秒向注冊(cè)中心 (eureka.instance.lease-renewal-interval-in-seconds)發(fā)送一次心跳來(lái)進(jìn)行服務(wù)續(xù)約
  • 本方法下面的邏輯上一節(jié)已經(jīng)分析

2、TimedSupervisorTask組件

可見(jiàn)TimedSupervisorTask是Runnable類(lèi)型的任務(wù),那么它的任務(wù)邏輯在run()方法。

構(gòu)造初始化

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.name = name;
        this.scheduler = scheduler;
        // heartbeatExecutor或cacheRefreshExecutor
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        // HeartbeatThread或CacheRefreshThread等類(lèi)型任務(wù)
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;
        // Initialize the counters and register.
        successCounter = Monitors.newCounter("success");
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

初始化自己的一些字段,在阿里Java編程規(guī)范中是強(qiáng)烈建議給線(xiàn)程起別名的,這樣便于監(jiān)控排查問(wèn)題等。name線(xiàn)程名字,在心跳任務(wù)中為heartbeat;scheduler字段傳遞進(jìn)來(lái)是為了周期性執(zhí)行任務(wù);executor用于提交任務(wù),下面分析;task任務(wù),為HeartbeatThread或CacheRefreshThread類(lèi)型任務(wù);delay,用于存儲(chǔ)以及計(jì)算延遲時(shí)間;最大延遲時(shí)間不能超過(guò)maxDelay。

TimedSupervisorTask#run()任務(wù)邏輯

    @Override
    public void run() {
        // Future模式
        Future<?> future = null;
        try {
            // 提交任務(wù)待執(zhí)行
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // 阻塞直到完成或超時(shí)
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  
            // 更新以便計(jì)算延遲時(shí)間
            delay.set(timeoutMillis);
            // 更新
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            // 任務(wù)主管超時(shí)了
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
            long currentDelay = delay.get();
            // 任務(wù)線(xiàn)程超時(shí)的時(shí)候,就把delay變量翻倍,但不會(huì)超過(guò)外部調(diào)用時(shí)設(shè)定的最大延時(shí)時(shí)間
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            // CAS更新延遲時(shí)間,考慮到多線(xiàn)程,所以用了CAS
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
            // 一旦線(xiàn)程池的阻塞隊(duì)列中放滿(mǎn)了待處理任務(wù),觸發(fā)了拒絕策略
            if (executor.isShutdown() || scheduler.isShutdown()) {
                // 線(xiàn)程池關(guān)閉,拒絕任務(wù)
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                // 線(xiàn)程池拒絕任務(wù)
                logger.warn("task supervisor rejected the task", e);
            }
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                // 任務(wù)主管關(guān)閉,不能接受任務(wù)
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                // 任務(wù)主管拋出了一個(gè)異常
                logger.warn("task supervisor threw an exception", e);
            }
            throwableCounter.increment();
        } finally {
            // 上面的異常catch了沒(méi)有外跑,下面繼續(xù)運(yùn)行
            if (future != null) {
                // 中斷
                future.cancel(true);
            }
            // 調(diào)度器沒(méi)有關(guān)閉,延遲繼續(xù)周期性執(zhí)行任務(wù)。這樣的周期性任務(wù)時(shí)間設(shè)置靈活
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

主要邏輯:

  • 提交任務(wù)待執(zhí)行
  • 阻塞直到完成或超時(shí)
  • 更新以便計(jì)算延遲時(shí)間

異常處理:

  • 超時(shí)異常,任務(wù)線(xiàn)程超時(shí)的時(shí)候,就把delay變量翻倍,但不會(huì)超過(guò)外部調(diào)用時(shí)設(shè)定的最大延時(shí)時(shí)間。CAS更新延遲時(shí)間,考慮到多線(xiàn)程,所以用了CAS。
  • 拒絕執(zhí)行異常,一旦線(xiàn)程池的阻塞隊(duì)列中放滿(mǎn)了待處理任務(wù),觸發(fā)了拒絕策略。
  • 其他異常

上面的異常catch了沒(méi)有外跑,下面繼續(xù)運(yùn)行:

1)丟棄當(dāng)前任務(wù);

2)調(diào)度器沒(méi)有關(guān)閉,延遲繼續(xù)周期性執(zhí)行任務(wù)。這樣的周期性任務(wù)時(shí)間設(shè)置靈活

3、心跳任務(wù)

HeartbeatThread私有內(nèi)部類(lèi)

    private class HeartbeatThread implements Runnable {
        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

HeartbeatThread實(shí)現(xiàn)了Runnable接口,也就是上面說(shuō)的task,它的邏輯封裝了出去:

發(fā)送心跳

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // 發(fā)送心跳
            httpResponse = eurekaTransport.registrationClient
                    .sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            // 打印日志,如:心跳狀態(tài)
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            // 響應(yīng)狀態(tài)沒(méi)有找到重新注冊(cè)
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            // 無(wú)法發(fā)送心跳!
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

主要邏輯:

  • EurekaTransport是DiscoevryClient的內(nèi)部類(lèi),封裝了幾個(gè)與注冊(cè)中心通信的XXXclient。獲取通信類(lèi)發(fā)送心跳請(qǐng)求,傳入appName、唯一ID以及instanceInfo。
  • 響應(yīng)狀態(tài)沒(méi)有找到重新注冊(cè),畢竟當(dāng)前客戶(hù)端是運(yùn)行中正常狀態(tài)
  • 返回是否發(fā)送心跳成功

4、發(fā)送心跳到注冊(cè)中心

構(gòu)建請(qǐng)求數(shù)據(jù)發(fā)送心跳

    @Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        // 拼接請(qǐng)求URL
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            // 構(gòu)建請(qǐng)求資源
            WebResource webResource = jerseyClient.resource(serviceUrl)
                    .path(urlPath)
                    .queryParam("status", info.getStatus().toString())
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            // 請(qǐng)求注冊(cè)中心
            response = requestBuilder.put(ClientResponse.class);
            EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
            if (response.hasEntity() &&
                    !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
                eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
            }
            // 返回構(gòu)建結(jié)果
            return eurekaResponseBuilder.build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP PUT {}{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                // 關(guān)閉資源
                response.close();
            }
        }
    }

主要邏輯:

  • 拼接請(qǐng)求URL
  • 構(gòu)建請(qǐng)求資源數(shù)據(jù)
  • 請(qǐng)求注冊(cè)中心
  • 返回構(gòu)建結(jié)果
  • 關(guān)閉資源

三、服務(wù)端處理客戶(hù)端續(xù)約

InstanceRegistry父子關(guān)系圖上一節(jié)也分析了,在使用super關(guān)鍵字時(shí)注意一下。

1、InstanceRegistry#renew()邏輯

	@Override
	public boolean renew(final String appName, final String serverId,
			boolean isReplication) {
		log("renew " + appName + " serverId " + serverId + ", isReplication {}"
				+ isReplication);
		List<Application> applications = getSortedApplications();
		for (Application input : applications) {
			if (input.getName().equals(appName)) {
				InstanceInfo instance = null;
				for (InstanceInfo info : input.getInstances()) {
					if (info.getId().equals(serverId)) {
						instance = info;
						break;
					}
				}
				// 發(fā)布服務(wù)實(shí)例處理心跳事件
				publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
						instance, isReplication));
				break;
			}
		}
		// 調(diào)用父類(lèi)的處理心跳方法
		return super.renew(appName, serverId, isReplication);
	}

這里邏輯主要是委托父類(lèi)處理心跳,具體邏輯見(jiàn)下面分析:

2、PeerAwareInstanceRegistryImpl#renew()邏輯

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            // 服務(wù)續(xù)約成功,將所有的Eureka操作復(fù)制到對(duì)等的Eureka節(jié)點(diǎn),除了復(fù)制到此節(jié)點(diǎn)的流量。
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

PeerAwareInstanceRegistryImpl職責(zé):處理將所有操作復(fù)制到 AbstractInstanceRegistry的peer Eureka節(jié)點(diǎn),以保持所有操作同步。這里如果服務(wù)續(xù)約成功,將所有的Eureka操作復(fù)制到對(duì)等的Eureka節(jié)點(diǎn),除了復(fù)制到此節(jié)點(diǎn)的流量。

3、AbstractInstanceRegistry#renew()邏輯

    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        // 根據(jù)appName從本地注冊(cè)表服務(wù)實(shí)例信息
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            // 沒(méi)有找到租約
            RENEW_NOT_FOUND.increment(isReplication);
            // 注冊(cè): 租約不存在,注冊(cè)資源:
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // 獲取服務(wù)實(shí)例信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    // 實(shí)例狀態(tài){}與實(shí)例{}的重寫(xiě)實(shí)例狀態(tài){}不同。因此將狀態(tài)設(shè)置為覆蓋狀態(tài)
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    overriddenInstanceStatus.name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            // 續(xù)約
            leaseToRenew.renew();
            return true;
        }
    }

主要邏輯:

  • 根據(jù)appName從本地注冊(cè)表服務(wù)實(shí)例信息
  • 沒(méi)有找到租約,返回false
  • 獲取到的服務(wù)實(shí)例狀態(tài)為UNKNOWN,返回false;續(xù)約,更新續(xù)約字段,下面分析

Lease#renew()邏輯

    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
 
    }

lastUpdateTimestamp是Lease租約的字段,維護(hù)租約時(shí)間,在服務(wù)剔除下線(xiàn)會(huì)根據(jù)該字段判斷是否過(guò)期需要對(duì)服務(wù)剔除下線(xiàn)處理。下一篇我們就來(lái)探討一下,敬請(qǐng)期待?。?!

到此這篇關(guān)于SpringCloud微服務(wù)續(xù)約實(shí)現(xiàn)源碼分析詳解的文章就介紹到這了,更多相關(guān)SpringCloud微服務(wù)續(xù)約內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論