欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Nacos客戶端配置中心緩存動(dòng)態(tài)更新實(shí)現(xiàn)源碼

 更新時(shí)間:2022年03月31日 11:45:34   作者:mic  
這篇文章主要為大家介紹了Nacos客戶端配置中心緩存動(dòng)態(tài)更新實(shí)現(xiàn)源碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪

Nacos 作為配置中心,當(dāng)應(yīng)用程序去訪問(wèn)Nacos動(dòng)態(tài)獲取配置源之后,會(huì)緩存到本地內(nèi)存以及磁盤中。
由于Nacos作為動(dòng)態(tài)配置中心,意味著后續(xù)配置變更之后需要讓所有相關(guān)的客戶端感知,并更新本地內(nèi)存!

那么這個(gè)功能是在哪里實(shí)現(xiàn)的呢? 以及它是采用什么樣的方式來(lái)實(shí)現(xiàn)配置的更新的呢? 我們一起來(lái)探索一下源碼的實(shí)現(xiàn)!

客戶端配置緩存更新

當(dāng)客戶端拿到配置后,需要?jiǎng)討B(tài)刷新,從而保證數(shù)據(jù)和服務(wù)器端是一致的,這個(gè)過(guò)程是如何實(shí)現(xiàn)的呢?在這一小節(jié)中我們來(lái)做一個(gè)詳細(xì)分析。

Nacos采用長(zhǎng)輪訓(xùn)機(jī)制來(lái)實(shí)現(xiàn)數(shù)據(jù)變更的同步,原理如下!

整體工作流程如下:

  • 客戶端發(fā)起長(zhǎng)輪訓(xùn)請(qǐng)求
  • 服務(wù)端收到請(qǐng)求以后,先比較服務(wù)端緩存中的數(shù)據(jù)是否相同,如果不通,則直接返回
  • 如果相同,則通過(guò)schedule延遲29.5s之后再執(zhí)行比較
  • 為了保證當(dāng)服務(wù)端在29.5s之內(nèi)發(fā)生數(shù)據(jù)變化能夠及時(shí)通知給客戶端,服務(wù)端采用事件訂閱的方式來(lái)監(jiān)聽(tīng)服務(wù)端本地?cái)?shù)據(jù)變化的事件,一旦收到事件,則觸發(fā)DataChangeTask的通知,并且遍歷allStubs隊(duì)列中的ClientLongPolling,把結(jié)果寫回到客戶端,就完成了一次數(shù)據(jù)的推送
  • 如果 DataChangeTask 任務(wù)完成了數(shù)據(jù)的 “推送” 之后,ClientLongPolling 中的調(diào)度任務(wù)又開(kāi)始執(zhí)行了怎么辦呢?
    很簡(jiǎn)單,只要在進(jìn)行 “推送” 操作之前,先將原來(lái)等待執(zhí)行的調(diào)度任務(wù)取消掉就可以了,這樣就防止了推送操作寫完響應(yīng)數(shù)據(jù)之后,調(diào)度任務(wù)又去寫響應(yīng)數(shù)據(jù),這時(shí)肯定會(huì)報(bào)錯(cuò)的。所以,在ClientLongPolling方法中,最開(kāi)始的一個(gè)步驟就是刪除訂閱事件

長(zhǎng)輪訓(xùn)任務(wù)啟動(dòng)入口

在NacosConfigService的構(gòu)造方法中,當(dāng)這個(gè)類被實(shí)例化以后,有做一些事情

  • 初始化一個(gè)HttpAgent,這里又用到了裝飾起模式,實(shí)際工作的類是ServerHttpAgent, MetricsHttpAgent內(nèi)部也是調(diào)用了ServerHttpAgent的方法,增加了監(jiān)控統(tǒng)計(jì)的信息
  • ClientWorker, 客戶端的一個(gè)工作類,agent作為參數(shù)傳入到clientworker,可以基本猜測(cè)到里面會(huì)用到agent做一些遠(yuǎn)程相關(guān)的事情
public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties); //
    this.configFilterChainManager = new ConfigFilterChainManager(properties);
    //初始化網(wǎng)絡(luò)通信組件
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start(); 
    //初始化ClientWorker
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}

ClientWorker

在上述初始化代碼中,我們重點(diǎn)需要關(guān)注ClientWorker這個(gè)類,它的構(gòu)造方法如下

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager; //初始化配置過(guò)濾管理器
    // Initialize the timeout parameter
    init(properties); //初始化配置
    //初始化一個(gè)定時(shí)調(diào)度的線程池,重寫了threadfactory方法
    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
     //初始化一個(gè)定時(shí)調(diào)度的線程池,從里面的name名字來(lái)看,似乎和長(zhǎng)輪訓(xùn)有關(guān)系。而這個(gè)長(zhǎng)輪訓(xùn)應(yīng)該是和nacos服務(wù)端的長(zhǎng)輪訓(xùn)
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    //設(shè)置定時(shí)任務(wù)的執(zhí)行頻率,并且調(diào)用checkConfigInfo這個(gè)方法,猜測(cè)是定時(shí)去檢測(cè)配置是否發(fā)生了變化
        //首次執(zhí)行延遲時(shí)間為1毫秒、延遲時(shí)間為10毫秒
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

可以看到 ClientWorker 除了將 HttpAgent 維持在自己內(nèi)部,還創(chuàng)建了兩個(gè)線程池:

第一個(gè)線程池是只擁有一個(gè)線程用來(lái)執(zhí)行定時(shí)任務(wù)的 executor,executor 每隔 10ms 就會(huì)執(zhí)行一次 checkConfigInfo() 方法,從方法名上可以知道是每 10 ms 檢查一次配置信息。

第二個(gè)線程池是一個(gè)普通的線程池,從 ThreadFactory 的名稱可以看到這個(gè)線程池是做長(zhǎng)輪詢的。

checkConfigInfo

ClientWorker構(gòu)造初始化中,啟動(dòng)了一個(gè)定時(shí)任務(wù)去執(zhí)行checkConfigInfo()方法,這個(gè)方法主要是定時(shí)檢查本地配置和服務(wù)器上的配置的變更情況,這個(gè)方法定義如下.

public void checkConfigInfo() {
    // Dispatch tasks.
    int listenerSize = cacheMap.size(); //
    // Round up the longingTaskCount.
     // 向上取整為批數(shù),監(jiān)聽(tīng)的配置數(shù)量除以3000,得到一個(gè)整數(shù),代表長(zhǎng)輪訓(xùn)任務(wù)的數(shù)量
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
     //currentLongingTaskCount表示當(dāng)前的長(zhǎng)輪訓(xùn)任務(wù)數(shù)量,如果小于計(jì)算的結(jié)果,則可以繼續(xù)創(chuàng)建
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

這個(gè)方法主要的目的是用來(lái)檢查服務(wù)端的配置信息是否發(fā)生了變化。如果有變化,則觸發(fā)listener通知

cacheMap: AtomicReference<Map<String, CacheData>> cacheMap 用來(lái)存儲(chǔ)監(jiān)聽(tīng)變更的緩存集合。key是根據(jù)dataID/group/tenant(租戶) 拼接的值。Value是對(duì)應(yīng)存儲(chǔ)在nacos服務(wù)器上的配置文件的內(nèi)容。

默認(rèn)情況下,每個(gè)長(zhǎng)輪訓(xùn)LongPullingRunnable任務(wù)默認(rèn)處理3000個(gè)監(jiān)聽(tīng)配置集。如果超過(guò)3000, 則需要啟動(dòng)多個(gè)LongPollingRunnable去執(zhí)行。

currentLongingTaskCount保存已啟動(dòng)的LongPullingRunnable任務(wù)數(shù)

executorService就是在ClientWorker構(gòu)造方法中初始化的線程池

LongPollingRunnable.run

LongPollingRunnable長(zhǎng)輪訓(xùn)任務(wù)的實(shí)現(xiàn)邏輯,代碼比較長(zhǎng),我們分段來(lái)分析。

第一部分主要有兩個(gè)邏輯

  • 對(duì)任務(wù)按照批次分類
  • 檢查當(dāng)前批次的緩存和本地文件的數(shù)據(jù)是否一致,如果發(fā)生了變化,則觸發(fā)監(jiān)聽(tīng)。
class LongPollingRunnable implements Runnable {
    private final int taskId; //表示當(dāng)前任務(wù)批次id
    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run() {
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // 遍歷CacheMap,把CacheMap中和當(dāng)前任務(wù)id相同的緩存,保存到cacheDatas
            // 通過(guò)checkLocalConfig方法
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) { //這里表示數(shù)據(jù)有變化,需要通知監(jiān)聽(tīng)器
                            cacheData.checkListenerMd5(); //通知所有針對(duì)當(dāng)前配置設(shè)置了監(jiān)聽(tīng)的監(jiān)聽(tīng)器
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }
           //省略部分
        } catch (Throwable e) {
            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); //出現(xiàn)異常,到下一次taskPenaltyTime后重新執(zhí)行任務(wù)
        }
    }
}

checkLocalConfig

檢查本地配置,這里面有三種情況

  • 如果isUseLocalConfigInfo為false,表示不使用本地配置,但是本地緩存路徑的文件是存在的,于是把isUseLocalConfigInfo設(shè)置為true,并且更新cacheData的內(nèi)容以及文件的更新時(shí)間
  • 如果isUseLocalConfigInfo為true,表示使用本地配置文件,但是本地緩存文件不存在,則設(shè)置為false,不通知監(jiān)聽(tīng)器。
  • 如果isUseLocalConfigInfo為true,并且本地緩存文件也存在,但是緩存的的時(shí)間和文件的更新時(shí)間不一致,則更新cacheData中的內(nèi)容,并且isUseLocalConfigInfo設(shè)置為true。
private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
    // 沒(méi)有 -> 有
    if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        
        LOGGER.warn(
                "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        return;
    }
     // 有 -> 沒(méi)有。不通知業(yè)務(wù)監(jiān)聽(tīng)器,從server拿到配置后通知。
    // If use local config info, then it doesn't notify business listener and notify after getting from server.
    if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
        cacheData.setUseLocalConfigInfo(false);
        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                dataId, group, tenant);
        return;
    }
     // 有變更
    if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
            .lastModified()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        LOGGER.warn(
                "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
    }
}

checkListenerMd5

遍歷用戶自己添加的監(jiān)聽(tīng)器,如果發(fā)現(xiàn)數(shù)據(jù)的md5值不同,則發(fā)送通知

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}

檢查服務(wù)端配置

在LongPollingRunnable.run中,先通過(guò)本地配置的讀取和檢查來(lái)判斷數(shù)據(jù)是否發(fā)生變化從而實(shí)現(xiàn)變化的通知

接著,當(dāng)前的線程還需要去遠(yuǎn)程服務(wù)器上獲得最新的數(shù)據(jù),檢查哪些數(shù)據(jù)發(fā)生了變化

  • 通過(guò)checkUpdateDataIds獲取遠(yuǎn)程服務(wù)器上數(shù)據(jù)變更的dataid
  • 遍歷這些變化的集合,然后調(diào)用getServerConfig從遠(yuǎn)程服務(wù)器獲得對(duì)應(yīng)的內(nèi)容
  • 更新本地的cache,設(shè)置為服務(wù)器端返回的內(nèi)容
  • 最后遍歷cacheDatas,找到變化的數(shù)據(jù)進(jìn)行通知
// check server config
//從服務(wù)端獲取發(fā)生變化的數(shù)據(jù)的DataID列表,保存在List<String>集合中
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
//遍歷發(fā)生了變更的配置項(xiàng)
for (String groupKey : changedGroupKeys) {
    String[] key = GroupKey.parseKey(groupKey);
    String dataId = key[0];
    String group = key[1];
    String tenant = null;
    if (key.length == 3) {
        tenant = key[2];
    }
    try {
        //逐項(xiàng)根據(jù)這些配置項(xiàng)獲取配置信息
        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
        //把配置信息保存到CacheData中
        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
        cache.setContent(response.getContent());
        cache.setEncryptedDataKey(response.getEncryptedDataKey());
        if (null != response.getConfigType()) {
            cache.setType(response.getConfigType());
        }
        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());
    } catch (NacosException ioe) {
        String message = String
            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ioe);
    }
}
//再遍歷CacheData這個(gè)集合,找到發(fā)生變化的數(shù)據(jù)進(jìn)行通知
for (CacheData cacheData : cacheDatas) {
    if (!cacheData.isInitializing() || inInitializingCacheList
        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
        cacheData.checkListenerMd5();
        cacheData.setInitializing(false);
    }
}
inInitializingCacheList.clear();
 //繼續(xù)傳遞當(dāng)前線程進(jìn)行輪詢
executorService.execute(this);

checkUpdateDataIds

這個(gè)方法主要是向服務(wù)器端發(fā)起檢查請(qǐng)求,判斷自己本地的配置和服務(wù)端的配置是否一致。

  • 首先從cacheDatas集合中找到isUseLocalConfigInfo為false的緩存
  • 把需要檢查的配置項(xiàng),拼接成一個(gè)字符串,調(diào)用checkUpdateConfigStr進(jìn)行驗(yàn)證
/**
 * 從Server獲取值變化了的DataID列表。返回的對(duì)象里只有dataId和group是有效的。 保證不返回NULL。
 */
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) { //把需要檢查的配置項(xiàng),拼接成一個(gè)字符串
        if (!cacheData.isUseLocalConfigInfo()) { //找到isUseLocalConfigInfo=false的緩存
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {//
                // cacheData 首次出現(xiàn)在cacheMap中&首次check更新
                inInitializingCacheList
                    .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}

checkUpdateConfigStr

從Server獲取值變化了的DataID列表。返回的對(duì)象里只有dataId和group是有效的。 保證不返回NULL。

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    //拼接參數(shù)和header
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);
    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {//判斷可能發(fā)生變更的字符串是否為空,如果是,則直接返回。
        return Collections.emptyList();
    }
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
        // 設(shè)置readTimeoutMs,也就是本次請(qǐng)求等待響應(yīng)的超時(shí)時(shí)間,默認(rèn)是30s
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        //發(fā)起遠(yuǎn)程調(diào)用
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        if (result.ok()) { //如果響應(yīng)成功
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData()); //解析并更新數(shù)據(jù),返回的是確實(shí)發(fā)生了數(shù)據(jù)變更的字符串:tenant/group/dataid。
        } else {//如果響應(yīng)失敗
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}

客戶端緩存配置長(zhǎng)輪訓(xùn)機(jī)制總結(jié)

整體實(shí)現(xiàn)的核心點(diǎn)就一下幾個(gè)部分

對(duì)本地緩存的配置做任務(wù)拆分,每一個(gè)批次是3000條

針對(duì)每3000條創(chuàng)建一個(gè)線程去執(zhí)行

先把每一個(gè)批次的緩存和本地磁盤文件中的數(shù)據(jù)進(jìn)行比較,

  • 如果和本地配置不一致,則表示該緩存發(fā)生了更新,直接通知客戶端監(jiān)聽(tīng)
  • 如果本地緩存和磁盤數(shù)據(jù)一致,則需要發(fā)起遠(yuǎn)程請(qǐng)求檢查配置變化

先以tenent/groupId/dataId拼接成字符串,發(fā)送到服務(wù)端進(jìn)行檢查,返回發(fā)生了變更的配置

客戶端收到變更配置列表,再逐項(xiàng)遍歷發(fā)送到服務(wù)端獲取配置內(nèi)容。

服務(wù)端配置更新的推送

分析完客戶端之后,隨著好奇心的驅(qū)使,服務(wù)端是如何處理客戶端的請(qǐng)求的?那么同樣,我們需要思考幾個(gè)問(wèn)題

  • 服務(wù)端是如何實(shí)現(xiàn)長(zhǎng)輪訓(xùn)機(jī)制的
  • 客戶端的超時(shí)時(shí)間為什么要設(shè)置30s

客戶端發(fā)起的請(qǐng)求地址是:/v1/cs/configs/listener,于是找到這個(gè)接口進(jìn)行查看,代碼如下。

//# ConfigController.java
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    Map<String, String> clientMd5Map;
    try {
        //解析客戶端傳遞過(guò)來(lái)的可能發(fā)生變化的配置項(xiàng)目,轉(zhuǎn)化為Map集合(key=dataId,value=md5)
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    // 開(kāi)始執(zhí)行長(zhǎng)輪訓(xùn)。
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

doPollingConfig

這個(gè)方法主要是用來(lái)做長(zhǎng)輪訓(xùn)和短輪詢的判斷

  • 如果是長(zhǎng)輪訓(xùn),直接走addLongPollingClient方法
  • 如果是短輪詢,直接比較服務(wù)端的數(shù)據(jù),如果存在md5不一致,直接把數(shù)據(jù)返回。
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    // 判斷當(dāng)前請(qǐng)求是否支持長(zhǎng)輪訓(xùn)。()
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    //如果是短輪詢,走下面的請(qǐng)求,下面的請(qǐng)求就是把客戶端傳過(guò)來(lái)的數(shù)據(jù)和服務(wù)端的數(shù)據(jù)逐項(xiàng)進(jìn)行比較,保存到changeGroups中。
    // Compatible with short polling logic.
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    
    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);
    // Before 2.0.4 version, return value is put into header.
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }
    Loggers.AUTH.info("new content:" + newResult);
    // Disable cache.
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}

addLongPollingClient

把客戶端的請(qǐng)求,保存到長(zhǎng)輪訓(xùn)的執(zhí)行引擎中。

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
        int probeRequestSize) {
    //獲取客戶端長(zhǎng)輪訓(xùn)的超時(shí)時(shí)間
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); 
    //不允許斷開(kāi)的標(biāo)記
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    //應(yīng)用名稱
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    //
    String tag = req.getHeader("Vipserver-Tag");
    //延期時(shí)間,默認(rèn)為500ms
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    // 提前500ms返回一個(gè)響應(yīng),避免客戶端出現(xiàn)超時(shí)
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        long start = System.currentTimeMillis();
        //通過(guò)md5判斷客戶端請(qǐng)求過(guò)來(lái)的key是否有和服務(wù)器端有不一致的,如果有,則保存到changedGroups中。
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) { //如果發(fā)現(xiàn)有變更,則直接把請(qǐng)求返回給客戶端
            generateResponse(req, rsp, changedGroups);
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { //如果noHangUpFlag為true,說(shuō)明不需要掛起客戶端,所以直接返回。
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        }
    }
    //獲取請(qǐng)求端的ip
    String ip = RequestUtil.getRemoteIp(req);
    // Must be called by http thread, or send response.
    //把當(dāng)前請(qǐng)求轉(zhuǎn)化為一個(gè)異步請(qǐng)求(意味著此時(shí)tomcat線程被釋放,也就是客戶端的請(qǐng)求,需要通過(guò)asyncContext來(lái)手動(dòng)觸發(fā)返回,否則一直掛起)
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L); //設(shè)置異步請(qǐng)求超時(shí)時(shí)間,
    //執(zhí)行長(zhǎng)輪訓(xùn)請(qǐng)求
    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

ClientLongPolling

接下來(lái)我們來(lái)分析一下,clientLongPolling到底做了什么操作。或者說(shuō)我們可以先猜測(cè)一下應(yīng)該會(huì)做什么事情

  • 這個(gè)任務(wù)要阻塞29.5s才能執(zhí)行,因?yàn)榱ⅠR執(zhí)行沒(méi)有任何意義,畢竟前面已經(jīng)執(zhí)行過(guò)一次了
  • 如果在29.5s+之內(nèi),數(shù)據(jù)發(fā)生變化,需要提前通知。需要有一種監(jiān)控機(jī)制

基于這些猜想,我們可以看看它的實(shí)現(xiàn)過(guò)程

從代碼粗粒度來(lái)看,它的實(shí)現(xiàn)似乎和我們的猜想一致,在run方法中,通過(guò)scheduler.schedule實(shí)現(xiàn)了一個(gè)定時(shí)任務(wù),它的delay時(shí)間正好是前面計(jì)算的29.5s。在這個(gè)任務(wù)中,會(huì)通過(guò)MD5Util.compareMd5來(lái)進(jìn)行計(jì)算

那另外一個(gè),當(dāng)數(shù)據(jù)發(fā)生變化以后,肯定不能等到29.5s之后才通知呀,那怎么辦呢?我們發(fā)現(xiàn)有一個(gè)allSubs的東西,它似乎和發(fā)布訂閱有關(guān)系。那是不是有可能當(dāng)前的clientLongPolling訂閱了數(shù)據(jù)變化的事件呢?

class ClientLongPolling implements Runnable {
    @Override
    public void run() {
        //構(gòu)建一個(gè)異步任務(wù),延后29.5s執(zhí)行
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run() { //如果達(dá)到29.5s,說(shuō)明這個(gè)期間沒(méi)有做任何配置修改,則自動(dòng)觸發(fā)執(zhí)行
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                    // Delete subsciber's relations.
                    allSubs.remove(ClientLongPolling.this); //移除訂閱關(guān)系
                    if (isFixedPolling()) { //如果是固定間隔的長(zhǎng)輪訓(xùn)
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        //比較變更的key
                        List<String> changedGroups = MD5Util
                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                        if (changedGroups.size() > 0) {//如果大于0,表示有變更,直接響應(yīng)
                            sendResponse(changedGroups);
                        } else {
                            sendResponse(null); //否則返回null
                        }
                    } else {
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        sendResponse(null);
                    }
                } catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                }
            }
        }, timeoutTime, TimeUnit.MILLISECONDS);
        allSubs.add(this);  //把當(dāng)前線程添加到訂閱事件隊(duì)列中
    }
}

allSubs

allSubs是一個(gè)隊(duì)列,隊(duì)列里面放了ClientLongPolling這個(gè)對(duì)象。這個(gè)隊(duì)列似乎和配置變更有某種關(guān)聯(lián)關(guān)系。

那么這里必須要實(shí)現(xiàn)的是,當(dāng)用戶在nacos 控制臺(tái)修改了配置之后,必須要從這個(gè)訂閱關(guān)系中取出關(guān)注的客戶端長(zhǎng)連接,然后把變更的結(jié)果返回。于是我們?nèi)タ碙ongPollingService的構(gòu)造方法查找訂閱關(guān)系

/**
 * 長(zhǎng)輪詢訂閱關(guān)系
 */
final Queue<ClientLongPolling> allSubs;
allSubs.add(this);

LongPollingService

在LongPollingService的構(gòu)造方法中,使用了一個(gè)NotifyCenter訂閱了一個(gè)事件,其中不難發(fā)現(xiàn),如果這個(gè)事件的實(shí)例是LocalDataChangeEvent,也就是服務(wù)端數(shù)據(jù)發(fā)生變更的時(shí)間,就會(huì)執(zhí)行一個(gè)DataChangeTask的線程。

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    //注冊(cè)LocalDataChangeEvent訂閱事件
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) { //如果觸發(fā)了LocalDataChangeEvent,則執(zhí)行下面的代碼
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }
        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
}

DataChangeTask

數(shù)據(jù)變更事件線程,代碼如下

class DataChangeTask implements Runnable {
    @Override
    public void run() {
        try {
            ConfigCacheService.getContentBetaMd5(groupKey); //
            //遍歷所有訂閱事件表
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next(); //得到ClientLongPolling
                //判斷當(dāng)前的ClientLongPolling中,請(qǐng)求的key是否包含當(dāng)前修改的groupKey
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // If published tag is not in the beta list, then it skipped.
                    if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { //如果是beta方式且betaIps不包含當(dāng)前客戶端ip,直接返回
                        continue;
                    }
                    // If published tag is not in the tag list, then it skipped.
                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {//如果配置了tag標(biāo)簽且不包含當(dāng)前客戶端的tag,直接返回
                        continue;
                    }
					//
                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // Delete subscribers' relationships. 移除當(dāng)前客戶端的訂閱關(guān)系
                    LogUtil.CLIENT_LOG
                            .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                    RequestUtil
                                            .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                    "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                    clientSub.sendResponse(Arrays.asList(groupKey)); //響應(yīng)客戶端請(qǐng)求。
                }
            }
        } catch (Throwable t) {
            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
        }
    }
}

原理總結(jié)

以上就是Nacos客戶端配置中心緩存動(dòng)態(tài)更新實(shí)現(xiàn)源碼的詳細(xì)內(nèi)容,更多關(guān)于Nacos客戶端配置中心緩存動(dòng)態(tài)更新的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 一文帶你學(xué)會(huì)Spring?JDBC的使用

    一文帶你學(xué)會(huì)Spring?JDBC的使用

    JDBC?就是?數(shù)據(jù)庫(kù)開(kāi)發(fā)?操作的?代名詞,因?yàn)橹灰乾F(xiàn)代商業(yè)項(xiàng)目的開(kāi)發(fā)那么一定是離不開(kāi)?數(shù)據(jù)庫(kù)?的,不管你搞的是什么,只要是想使用動(dòng)態(tài)的開(kāi)發(fā)結(jié)構(gòu),那么一定就是?JDBC?,那么下面來(lái)教教大家傳統(tǒng)JDBC的使用
    2022-09-09
  • Feign調(diào)用服務(wù)時(shí)丟失Cookie和Header信息的解決方案

    Feign調(diào)用服務(wù)時(shí)丟失Cookie和Header信息的解決方案

    這篇文章主要介紹了Feign調(diào)用服務(wù)時(shí)丟失Cookie和Header信息的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • IntelliJ IDEA2021.1 配置大全(超詳細(xì)教程)

    IntelliJ IDEA2021.1 配置大全(超詳細(xì)教程)

    這篇文章主要介紹了IntelliJ IDEA2021.1 配置大全(超詳細(xì)教程),需要的朋友可以參考下
    2021-04-04
  • Java中單例模式的七種寫法示例

    Java中單例模式的七種寫法示例

    作為一個(gè)Java開(kāi)發(fā)者,也許你覺(jué)得自己對(duì)單例模式的了解已經(jīng)足夠多了,但究竟你自己了解的程度到底怎樣呢?下面這篇文章主要給大家介紹了關(guān)于Java中單例模式的七種寫法,需要的朋友可以參考下
    2021-09-09
  • 基于Springboot一個(gè)注解搞定數(shù)據(jù)字典的實(shí)踐方案

    基于Springboot一個(gè)注解搞定數(shù)據(jù)字典的實(shí)踐方案

    這篇文章主要介紹了基于Springboot一個(gè)注解搞定數(shù)據(jù)字典問(wèn)題,大致的方向是自定義注解,在序列化的時(shí)候進(jìn)行數(shù)據(jù)處理,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-06-06
  • springmvc請(qǐng)求轉(zhuǎn)發(fā)和重定向問(wèn)題(攜帶參數(shù)和不攜帶參數(shù))

    springmvc請(qǐng)求轉(zhuǎn)發(fā)和重定向問(wèn)題(攜帶參數(shù)和不攜帶參數(shù))

    這篇文章主要介紹了springmvc請(qǐng)求轉(zhuǎn)發(fā)和重定向問(wèn)題(攜帶參數(shù)和不攜帶參數(shù)),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • Java中BeanUtils.copyProperties基本用法與小坑

    Java中BeanUtils.copyProperties基本用法與小坑

    本文主要介紹了Java中BeanUtils.copyProperties基本用法與小坑,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • SpringBoot實(shí)現(xiàn)權(quán)限驗(yàn)證的示例步驟

    SpringBoot實(shí)現(xiàn)權(quán)限驗(yàn)證的示例步驟

    權(quán)限驗(yàn)證是一種用于控制對(duì)系統(tǒng)資源和操作的訪問(wèn)的機(jī)制。它允許開(kāi)發(fā)人員定義誰(shuí)可以執(zhí)行特定操作或訪問(wèn)特定資源,并確保只有經(jīng)過(guò)授權(quán)的用戶才能執(zhí)行這些操作,這篇文章主要介紹了SpringBoot實(shí)現(xiàn)權(quán)限驗(yàn)證,需要的朋友可以參考下
    2023-08-08
  • Java基本類型與byte數(shù)組之間相互轉(zhuǎn)換方法

    Java基本類型與byte數(shù)組之間相互轉(zhuǎn)換方法

    下面小編就為大家?guī)?lái)一篇Java基本類型與byte數(shù)組之間相互轉(zhuǎn)換方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-08-08
  • java實(shí)現(xiàn)文件重命名

    java實(shí)現(xiàn)文件重命名

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)文件重命名,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-03-03

最新評(píng)論