欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Nacos集群數(shù)據(jù)同步方式

 更新時(shí)間:2024年12月28日 08:55:16   作者:91猿說(shuō)編程  
文章主要介紹了Nacos集群中服務(wù)注冊(cè)信息的同步機(jī)制,涉及到負(fù)責(zé)節(jié)點(diǎn)和非負(fù)責(zé)節(jié)點(diǎn)之間的數(shù)據(jù)同步過(guò)程,以及DistroProtocol協(xié)議在同步中的應(yīng)用

引言

在Nacos屬于集群時(shí),當(dāng)服務(wù)器收到服務(wù)注冊(cè)請(qǐng)求后,發(fā)生了ClientEvent.ClientChangedEvent事件,就會(huì)觸發(fā)將注冊(cè)的服務(wù)信息同步給集群中的其他Nacos-server節(jié)點(diǎn)。

// DistroClientDataProcessor
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

同步時(shí),會(huì)涉及到一個(gè)負(fù)責(zé)節(jié)點(diǎn)和非負(fù)責(zé)節(jié)點(diǎn)

負(fù)責(zé)節(jié)點(diǎn)(發(fā)起同步)

也就是收到客戶端事件ClientChangedEvent后負(fù)責(zé)同步信息給其他非負(fù)責(zé)節(jié)點(diǎn), 所以這里只能有負(fù)責(zé)節(jié)點(diǎn)來(lái)進(jìn)行同步,非負(fù)責(zé)節(jié)點(diǎn)只能接收同步事件

// DistroClientDataProcessor
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
    return;
}

DistroProtocol

Distro是阿里巴巴的私有協(xié)議,distro協(xié)議是為了注冊(cè)中心而創(chuàng)造出的協(xié)議;

DistroProtocol會(huì)循環(huán)所有其他nacos節(jié)點(diǎn),提交一個(gè)異步任務(wù),這個(gè)異步任務(wù)會(huì)延遲1s,其實(shí)這里我們就可以看到這里涉及到客戶端的斷開(kāi)和客戶端的新增和修改,對(duì)于Delete操作,由DistroSyncDeleteTask處理,對(duì)于Change操作,由DistroSyncChangeTask處理,這里我們從DistroSyncChangeTask來(lái)看

// DistroProtocol
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

在調(diào)用syncToTarget后,會(huì)觸發(fā)任務(wù)DistroDelayTaskProcessor處理任務(wù),這是Distro協(xié)議的一個(gè)默認(rèn)延遲任務(wù)處理器,可以看到。 對(duì)于刪除類型的任務(wù),觸發(fā)任務(wù)DistroSyncDeleteTask , 對(duì)于刪除的任務(wù):DistroSyncChangeTask

public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    @Override
    public boolean process(NacosTask task) {
        
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        switch (distroDelayTask.getAction()) {
            case DELETE:
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }
}

DistroSyncChangeTask

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    ...
    
    // 無(wú)回調(diào)
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }
    
    // 有回調(diào)
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        //將得到的數(shù)據(jù)同步給其他服務(wù)節(jié)點(diǎn)
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    
    // 從DistroClientDataProcessor獲取DistroData
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

獲取同步數(shù)據(jù)getDistroData

這里獲取同步數(shù)據(jù)其實(shí)是從DistroClientDataProcessor 中獲取的,所以為Client的相關(guān)注冊(cè)服務(wù)信息

// DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor
@Override
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

可以看到generateSyncData 方法是關(guān)鍵獲取服務(wù)的方法,該方法提供了同步數(shù)據(jù),包含Client的注冊(cè)信息,包括客戶端注冊(cè)了哪些namespace,哪些group,哪些service,哪些instance。

// AbstractClient implements Client 
@Override
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

執(zhí)行同步數(shù)據(jù)syncData

這里的同步實(shí)際是由DistroClientTransportAgent來(lái)負(fù)責(zé)的,將數(shù)據(jù)分裝成DistroDataRequest 然后查詢到對(duì)于的服務(wù)節(jié)點(diǎn)Member然后調(diào)用asyncRequest異步方法執(zhí)行同步,后面的方法我就不跟了, 這時(shí)我們主要關(guān)注非負(fù)責(zé)節(jié)點(diǎn)收到同步請(qǐng)求后如何處理。

// DistroClientTransportAgent
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    try {
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

非負(fù)責(zé)節(jié)點(diǎn)(接收請(qǐng)求)

當(dāng)負(fù)責(zé)節(jié)點(diǎn)將數(shù)據(jù)發(fā)送給非負(fù)責(zé)節(jié)點(diǎn)以后,將要處理發(fā)送過(guò)來(lái)的Client數(shù)據(jù)。通過(guò)DistroController收到數(shù)據(jù)后, 然后最終會(huì)DistroClientDataProcessor.processData方法來(lái)進(jìn)行處理

// DistroController.java
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
    ...
    DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
    distroProtocol.onReceive(distroHttpData);
    ...
}
// DistroClientDataProcessor.java
@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

可以看出,這里分別對(duì)ADD/CHANGE和DELETE進(jìn)行了處理,這里我主要關(guān)注ADD/CHANGE,所以主要關(guān)注handlerClientSyncData方法。

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    // 同步客戶端連接,此時(shí)如果客戶端不存在,則會(huì)注冊(cè)一個(gè)非負(fù)責(zé)節(jié)點(diǎn)client,后面就會(huì)獲取到該客戶端操作
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    // 獲取Client(此時(shí)注冊(cè)到的是ConnectionBasedClient)
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 更新Client數(shù)據(jù)
    upgradeClient(client, clientSyncData);
}

**注意:**這里要注意下此時(shí)的Client實(shí)現(xiàn)類ConnectionBasedClient,它的isNative屬性為false,這是非負(fù)責(zé)節(jié)點(diǎn)和負(fù)責(zé)節(jié)點(diǎn)的主要區(qū)別。

其實(shí)判斷當(dāng)前nacos節(jié)點(diǎn)是否為負(fù)責(zé)節(jié)點(diǎn)的依據(jù)就是這個(gè)**isNative屬性**,如果是客戶端直接注冊(cè)在這個(gè)nacos節(jié)點(diǎn)上的ConnectionBasedClient,它的isNative屬性為true;如果是由Distro協(xié)議,同步到這個(gè)nacos節(jié)點(diǎn)上的ConnectionBasedClient,它的isNative屬性為false。

那其實(shí)我們都知道2.x的版本以后使用了長(zhǎng)連接,所以**通過(guò)長(zhǎng)連接建立在哪個(gè)節(jié)點(diǎn)上,哪個(gè)節(jié)點(diǎn)就是責(zé)任節(jié)點(diǎn),客戶端也只會(huì)向這個(gè)責(zé)任節(jié)點(diǎn)發(fā)送請(qǐng)求**。

DistroClientDataProcessor的upgradeClient方法,更新Client里的注冊(cè)表信息,發(fā)布對(duì)應(yīng)事件

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

Distro協(xié)議負(fù)責(zé)集群數(shù)據(jù)統(tǒng)一

Distro為了確保集群間數(shù)據(jù)一致,不僅僅依賴于數(shù)據(jù)發(fā)生改變時(shí)的實(shí)時(shí)同步,后臺(tái)有定時(shí)任務(wù)做數(shù)據(jù)同步。

在1.x版本中,責(zé)任節(jié)點(diǎn)每5s同步所有Service的Instance列表的摘要(md5)給非責(zé)任節(jié)點(diǎn),非責(zé)任節(jié)點(diǎn)用對(duì)端傳來(lái)的服務(wù)md5比對(duì)本地服務(wù)的md5,如果發(fā)生改變,需要反查責(zé)任節(jié)點(diǎn)。

在2.x版本中,對(duì)這個(gè)流程做了改造,責(zé)任節(jié)點(diǎn)會(huì)發(fā)送Client全量數(shù)據(jù),非責(zé)任節(jié)點(diǎn)定時(shí)檢測(cè)同步過(guò)來(lái)的Client是否過(guò)期,減少1.x版本中的反查。

責(zé)任節(jié)點(diǎn)每5s向其他節(jié)點(diǎn)發(fā)送DataOperation=VERIFY類型的DistroData,來(lái)維持非責(zé)任節(jié)點(diǎn)的Client數(shù)據(jù)不過(guò)期。

// DistroVerifyTimedTask.java
@Override
public void run() {
    // 所有其他節(jié)點(diǎn)
    List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        // 遍歷這些節(jié)點(diǎn)發(fā)送Client.isNative=true的DistroData,type = VERIFY
        verifyForDataStorage(each, targetServer);
    }
}

非責(zé)任節(jié)點(diǎn)每5s掃描isNative=false的client,如果client 30s內(nèi)沒(méi)有被VERIFY的DistroData更新過(guò)續(xù)租時(shí)間,會(huì)刪除這個(gè)同步過(guò)來(lái)的Client數(shù)據(jù)。

//ConnectionBasedClientManager->ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && client.isExpire(currentTime)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
} 

// ConnectionBasedClient.java
@Override
public boolean isExpire(long currentTime) {
    // 判斷30s內(nèi)沒(méi)有續(xù)租 認(rèn)為過(guò)期
    return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • SpringBoot2.1 RESTful API項(xiàng)目腳手架(種子)項(xiàng)目

    SpringBoot2.1 RESTful API項(xiàng)目腳手架(種子)項(xiàng)目

    這篇文章主要介紹了SpringBoot2.1 RESTful API項(xiàng)目腳手架(種子)項(xiàng)目,用于搭建RESTful API工程的腳手架,只需三分鐘你就可以開(kāi)始編寫業(yè)務(wù)代碼,不再煩惱于構(gòu)建項(xiàng)目與風(fēng)格統(tǒng)一,感興趣的小伙伴們可以參考一下
    2018-12-12
  • Java?17新特性詳細(xì)講解與代碼實(shí)例

    Java?17新特性詳細(xì)講解與代碼實(shí)例

    這篇文章主要給大家介紹了關(guān)于Java?17新特性詳細(xì)講解與代碼實(shí)例的相關(guān)資料,Java 17是2021年9月發(fā)布的最新版本,其中包含了很多新特性和改進(jìn),這些新特性和改進(jìn)將進(jìn)一步提高 Java 語(yǔ)言的性能和可用性,需要的朋友可以參考下
    2023-09-09
  • EditPlus運(yùn)行java時(shí)從鍵盤輸入數(shù)據(jù)的操作方法

    EditPlus運(yùn)行java時(shí)從鍵盤輸入數(shù)據(jù)的操作方法

    這篇文章主要介紹了EditPlus運(yùn)行java時(shí)從鍵盤輸入數(shù)據(jù)的操作方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-03-03
  • 學(xué)習(xí)spring事務(wù)與消息隊(duì)列

    學(xué)習(xí)spring事務(wù)與消息隊(duì)列

    這篇文章主要為大家詳細(xì)介紹了spring事務(wù)與消息隊(duì)列,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-10-10
  • 關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別

    關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別

    這篇文章主要介紹了關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • mybatis 逆向生成后遵循java駝峰法則的解決

    mybatis 逆向生成后遵循java駝峰法則的解決

    這篇文章主要介紹了mybatis 逆向生成后遵循java駝峰法則的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-11-11
  • 使用Logback設(shè)置property參數(shù)方式

    使用Logback設(shè)置property參數(shù)方式

    這篇文章主要介紹了使用Logback設(shè)置property參數(shù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(12)

    Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(12)

    下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你
    2021-07-07
  • 分布式框架Zookeeper?api的使用介紹

    分布式框架Zookeeper?api的使用介紹

    Zookeeper作為?個(gè)分布式框架,主要用來(lái)解決分布式?致性問(wèn)題,它提供了簡(jiǎn)單的分布式原語(yǔ),并且對(duì)多種編程語(yǔ)?提供了API,所以接下來(lái)重點(diǎn)來(lái)看下Zookeeper的java客戶端API使用方式
    2022-09-09
  • 詳解Java實(shí)現(xiàn)多種方式的http數(shù)據(jù)抓取

    詳解Java實(shí)現(xiàn)多種方式的http數(shù)據(jù)抓取

    本篇文章主要介紹了Java實(shí)現(xiàn)多種方式的http數(shù)據(jù)抓取,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧。
    2016-12-12

最新評(píng)論