欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Nacos配置中心集群原理及源碼分析

 更新時(shí)間:2022年03月31日 11:13:04   作者:mic  
這篇文章主要為大家介紹了Nacos配置中心集群原理及源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪

Nacos作為配置中心,必然需要保證服務(wù)節(jié)點(diǎn)的高可用性,那么Nacos是如何實(shí)現(xiàn)集群的呢?

下面這個(gè)圖,表示Nacos集群的部署圖。

Nacos集群工作原理

Nacos作為配置中心的集群結(jié)構(gòu)中,是一種無(wú)中心化節(jié)點(diǎn)的設(shè)計(jì),由于沒(méi)有主從節(jié)點(diǎn),也沒(méi)有選舉機(jī)制,所以為了能夠?qū)崿F(xiàn)熱備,就需要增加虛擬IP(VIP)。

Nacos的數(shù)據(jù)存儲(chǔ)分為兩部分

  • Mysql數(shù)據(jù)庫(kù)存儲(chǔ),所有Nacos節(jié)點(diǎn)共享同一份數(shù)據(jù),數(shù)據(jù)的副本機(jī)制由Mysql本身的主從方案來(lái)解決,從而保證數(shù)據(jù)的可靠性。
  • 每個(gè)節(jié)點(diǎn)的本地磁盤(pán),會(huì)保存一份全量數(shù)據(jù),具體路徑:/data/program/nacos-1/data/config-data/${GROUP}.

在Nacos的設(shè)計(jì)中,Mysql是一個(gè)中心數(shù)據(jù)倉(cāng)庫(kù),且認(rèn)為在Mysql中的數(shù)據(jù)是絕對(duì)正確的。 除此之外,Nacos在啟動(dòng)時(shí)會(huì)把Mysql中的數(shù)據(jù)寫(xiě)一份到本地磁盤(pán)。

這么設(shè)計(jì)的好處是可以提高性能,當(dāng)客戶端需要請(qǐng)求某個(gè)配置項(xiàng)時(shí),服務(wù)端會(huì)想Ian從磁盤(pán)中讀取對(duì)應(yīng)文件返回,而磁盤(pán)的讀取效率要比數(shù)據(jù)庫(kù)效率高。

當(dāng)配置發(fā)生變更時(shí):

  • Nacos會(huì)把變更的配置保存到數(shù)據(jù)庫(kù),然后再寫(xiě)入本地文件。
  • 接著發(fā)送一個(gè)HTTP請(qǐng)求,給到集群中的其他節(jié)點(diǎn),其他節(jié)點(diǎn)收到事件后,從Mysql中dump剛剛寫(xiě)入的數(shù)據(jù)到本地文件中。

另外,NacosServer啟動(dòng)后,會(huì)同步啟動(dòng)一個(gè)定時(shí)任務(wù),每隔6小時(shí),會(huì)dump一次全量數(shù)據(jù)到本地文件

配置變更同步入口

當(dāng)配置發(fā)生修改、刪除、新增操作時(shí),通過(guò)發(fā)布一個(gè)notifyConfigChange事件。

@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
   //省略..
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    }//省略
    return true;
}

AsyncNotifyService

配置數(shù)據(jù)變更事件,專(zhuān)門(mén)有一個(gè)監(jiān)聽(tīng)器AsyncNotifyService,它會(huì)處理數(shù)據(jù)變更后的同步事件。

@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表
                // 構(gòu)建NotifySingleTask,并添加到隊(duì)列中。
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) { //遍歷集群中的每個(gè)節(jié)點(diǎn)
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                //異步執(zhí)行任務(wù) AsyncTask
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }
        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

AsyncTask

@Override
public void run() {
    executeAsyncInvoke();
}
private void executeAsyncInvoke() {
    while (!queue.isEmpty()) {//遍歷隊(duì)列中的數(shù)據(jù),直到數(shù)據(jù)為空
        NotifySingleTask task = queue.poll(); //獲取task
        String targetIp = task.getTargetIP(); //獲取目標(biāo)ip
        
        if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目標(biāo)ip
            // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
            //判斷目標(biāo)ip的健康狀態(tài)
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
            if (unHealthNeedDelay) { //如果目標(biāo)服務(wù)是非健康,則繼續(xù)添加到隊(duì)列中,延后再執(zhí)行。
                // target ip is unhealthy, then put it in the notification list
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                        task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                        0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                //構(gòu)建header
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                if (task.isBeta) {
                    header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                //通過(guò)restTemplate發(fā)起遠(yuǎn)程調(diào)用,如果調(diào)用成功,則執(zhí)行AsyncNotifyCallBack的回調(diào)方法
                restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
            }
        }
    }
}

目標(biāo)節(jié)點(diǎn)接收請(qǐng)求

數(shù)據(jù)同步的請(qǐng)求地址為,task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        //
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

dumpService.dump用來(lái)實(shí)現(xiàn)配置的更新,代碼如下

當(dāng)前任務(wù)會(huì)被添加到DumpTaskMgr中管理。

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

TaskManager.addTask, 先調(diào)用父類(lèi)去完成任務(wù)添加。

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    super.addTask(key, newTask);
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}

在這種場(chǎng)景設(shè)計(jì)中,一般都會(huì)采用生產(chǎn)者消費(fèi)者模式來(lái)完成,因此這里不難猜測(cè)到,任務(wù)會(huì)被保存到一個(gè)隊(duì)列中,然后有另外一個(gè)線程來(lái)執(zhí)行。

NacosDelayTaskExecuteEngine

TaskManager的父類(lèi)是NacosDelayTaskExecuteEngine,

這個(gè)類(lèi)中有一個(gè)成員屬性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,專(zhuān)門(mén)來(lái)保存延期執(zhí)行的任務(wù)類(lèi)型AbstractDelayTask.

在這個(gè)類(lèi)的構(gòu)造方法中,初始化了一個(gè)延期執(zhí)行的任務(wù),其中具體的任務(wù)是ProcessRunnable.

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

ProcessRunnable

private class ProcessRunnable implements Runnable {
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}

processTasks

protected void processTasks() {
    //獲取所有的任務(wù)
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        //獲取任務(wù)處理器,這里返回的是DumpProcessor
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            //執(zhí)行具體任務(wù)
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

DumpProcessor.process

讀取數(shù)據(jù)庫(kù)的最新數(shù)據(jù),然后更新本地緩存和磁盤(pán)

以上就是Nacos配置中心集群原理及源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Nacos配置中心集群原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 原來(lái)Java中有兩個(gè)ArrayList

    原來(lái)Java中有兩個(gè)ArrayList

    原來(lái)Java中有兩個(gè)ArrayList,本文就帶著大家一起探究Java中的ArrayList,感興趣的小伙伴們可以參考一下
    2016-01-01
  • SpringBoot各種注解詳解

    SpringBoot各種注解詳解

    SpringBoot的一個(gè)核心功能是IOC,就是將Bean初始化加載到容器中,Bean是如何加載到容器的,可以使用SpringBoot注解方式或者Spring XML配置方式。SpringBoot注解方式減少了配置文件內(nèi)容,更加便于管理,并且使用注解可以大大提高了開(kāi)發(fā)效率
    2022-12-12
  • IDEA 2020.1 搜索不到Chinese ​(Simplified)​ Language Pack EAP,無(wú)法安裝的問(wèn)題

    IDEA 2020.1 搜索不到Chinese ​(Simplified)​ Language

    小編在安裝中文插件時(shí)遇到IDEA 2020.1 搜索不到Chinese &#8203;(Simplified)&#8203; Language Pack EAP,無(wú)法安裝的問(wèn)題,本文給大家分享我的解決方法,感興趣的朋友一起看看吧
    2020-04-04
  • spring @Profiles和@PropertySource實(shí)現(xiàn)根據(jù)環(huán)境切換配置文件

    spring @Profiles和@PropertySource實(shí)現(xiàn)根據(jù)環(huán)境切換配置文件

    這篇文章主要介紹了spring @Profiles和@PropertySource根據(jù)環(huán)境切換配置文件,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • 解決Java?結(jié)構(gòu)化數(shù)據(jù)處理開(kāi)源庫(kù)?SPL的問(wèn)題

    解決Java?結(jié)構(gòu)化數(shù)據(jù)處理開(kāi)源庫(kù)?SPL的問(wèn)題

    這篇文章主要介紹了Java?結(jié)構(gòu)化數(shù)據(jù)處理開(kāi)源庫(kù)?SPL的問(wèn)題,Scala提供了較豐富的結(jié)構(gòu)化數(shù)據(jù)計(jì)算函數(shù),但編譯型語(yǔ)言的特點(diǎn),也使它不能成為理想的結(jié)構(gòu)化數(shù)據(jù)計(jì)算類(lèi)庫(kù),對(duì)此內(nèi)容感興趣的朋友一起看看吧
    2022-03-03
  • springboot2.0整合dubbo的示例代碼

    springboot2.0整合dubbo的示例代碼

    這篇文章主要介紹了springboot2.0整合dubbo的示例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-08-08
  • java中注解機(jī)制及其原理的詳解

    java中注解機(jī)制及其原理的詳解

    這篇文章主要介紹了java中注解機(jī)制及其原理的詳解的相關(guān)資料,希望通過(guò)本文能幫助到大家,讓大家理解掌握這部分內(nèi)容,需要的朋友可以參考下
    2017-10-10
  • log4j中l(wèi)ogger標(biāo)簽中additivity屬性的用法說(shuō)明

    log4j中l(wèi)ogger標(biāo)簽中additivity屬性的用法說(shuō)明

    這篇文章主要介紹了log4j中l(wèi)ogger標(biāo)簽中additivity屬性的用法說(shuō)明,基于很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Springboot整合log4j2日志全解總結(jié)

    Springboot整合log4j2日志全解總結(jié)

    這篇文章主要介紹了Springboot整合log4j2日志全解總結(jié),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-12-12
  • 淺談Java實(shí)現(xiàn)回溯算法之八皇后問(wèn)題

    淺談Java實(shí)現(xiàn)回溯算法之八皇后問(wèn)題

    八皇后問(wèn)題是一個(gè)古老而又著名的問(wèn)題,是學(xué)習(xí)回溯算法的一個(gè)經(jīng)典案例。今天我們就一起來(lái)探究一下吧
    2021-06-06

最新評(píng)論