Nacos集群數(shù)據(jù)同步方式
引言
在Nacos屬于集群時(shí),當(dāng)服務(wù)器收到服務(wù)注冊(cè)請(qǐng)求后,發(fā)生了ClientEvent.ClientChangedEvent事件,就會(huì)觸發(fā)將注冊(cè)的服務(wù)信息同步給集群中的其他Nacos-server節(jié)點(diǎn)。
// DistroClientDataProcessor private void syncToAllServer(ClientEvent event) { Client client = event.getClient(); // Only ephemeral data sync by Distro, persist client should sync by raft. if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) { return; } if (event instanceof ClientEvent.ClientDisconnectEvent) { DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.DELETE); } else if (event instanceof ClientEvent.ClientChangedEvent) { DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.CHANGE); } }
同步時(shí),會(huì)涉及到一個(gè)負(fù)責(zé)節(jié)點(diǎn)和非負(fù)責(zé)節(jié)點(diǎn)
負(fù)責(zé)節(jié)點(diǎn)(發(fā)起同步)
也就是收到客戶端事件ClientChangedEvent后負(fù)責(zé)同步信息給其他非負(fù)責(zé)節(jié)點(diǎn), 所以這里只能有負(fù)責(zé)節(jié)點(diǎn)來(lái)進(jìn)行同步,非負(fù)責(zé)節(jié)點(diǎn)只能接收同步事件
// DistroClientDataProcessor // Only ephemeral data sync by Distro, persist client should sync by raft. if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) { return; }
DistroProtocol
Distro是阿里巴巴的私有協(xié)議,distro協(xié)議是為了注冊(cè)中心而創(chuàng)造出的協(xié)議;
DistroProtocol會(huì)循環(huán)所有其他nacos節(jié)點(diǎn),提交一個(gè)異步任務(wù),這個(gè)異步任務(wù)會(huì)延遲1s,其實(shí)這里我們就可以看到這里涉及到客戶端的斷開(kāi)和客戶端的新增和修改,對(duì)于Delete操作,由DistroSyncDeleteTask處理,對(duì)于Change操作,由DistroSyncChangeTask處理,這里我們從DistroSyncChangeTask來(lái)看
// DistroProtocol public void sync(DistroKey distroKey, DataOperation action, long delay) { for (Member each : memberManager.allMembersWithoutSelf()) { syncToTarget(distroKey, action, each.getAddress(), delay); } }
在調(diào)用syncToTarget后,會(huì)觸發(fā)任務(wù)DistroDelayTaskProcessor處理任務(wù),這是Distro協(xié)議的一個(gè)默認(rèn)延遲任務(wù)處理器,可以看到。 對(duì)于刪除類型的任務(wù),觸發(fā)任務(wù)DistroSyncDeleteTask , 對(duì)于刪除的任務(wù):DistroSyncChangeTask
public class DistroDelayTaskProcessor implements NacosTaskProcessor { @Override public boolean process(NacosTask task) { DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); switch (distroDelayTask.getAction()) { case DELETE: DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask); return true; case CHANGE: case ADD: DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; default: return false; } } }
DistroSyncChangeTask
public class DistroSyncChangeTask extends AbstractDistroExecuteTask { ... // 無(wú)回調(diào) @Override protected boolean doExecute() { String type = getDistroKey().getResourceType(); DistroData distroData = getDistroData(type); if (null == distroData) { Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString()); return true; } return getDistroComponentHolder().findTransportAgent(type) .syncData(distroData, getDistroKey().getTargetServer()); } // 有回調(diào) @Override protected void doExecuteWithCallback(DistroCallback callback) { String type = getDistroKey().getResourceType(); DistroData distroData = getDistroData(type); if (null == distroData) { Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString()); return; } //將得到的數(shù)據(jù)同步給其他服務(wù)節(jié)點(diǎn) getDistroComponentHolder().findTransportAgent(type) .syncData(distroData, getDistroKey().getTargetServer(), callback); } // 從DistroClientDataProcessor獲取DistroData private DistroData getDistroData(String type) { DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey()); if (null != result) { result.setType(OPERATION); } return result; } }
獲取同步數(shù)據(jù)getDistroData
這里獲取同步數(shù)據(jù)其實(shí)是從DistroClientDataProcessor 中獲取的,所以為Client的相關(guān)注冊(cè)服務(wù)信息
// DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor @Override public DistroData getDistroData(DistroKey distroKey) { Client client = clientManager.getClient(distroKey.getResourceKey()); if (null == client) { return null; } byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData()); return new DistroData(distroKey, data); }
可以看到generateSyncData 方法是關(guān)鍵獲取服務(wù)的方法,該方法提供了同步數(shù)據(jù),包含Client的注冊(cè)信息,包括客戶端注冊(cè)了哪些namespace,哪些group,哪些service,哪些instance。
// AbstractClient implements Client @Override public ClientSyncData generateSyncData() { List<String> namespaces = new LinkedList<>(); List<String> groupNames = new LinkedList<>(); List<String> serviceNames = new LinkedList<>(); List<InstancePublishInfo> instances = new LinkedList<>(); for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) { namespaces.add(entry.getKey().getNamespace()); groupNames.add(entry.getKey().getGroup()); serviceNames.add(entry.getKey().getName()); instances.add(entry.getValue()); } return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances); }
執(zhí)行同步數(shù)據(jù)syncData
這里的同步實(shí)際是由DistroClientTransportAgent來(lái)負(fù)責(zé)的,將數(shù)據(jù)分裝成DistroDataRequest 然后查詢到對(duì)于的服務(wù)節(jié)點(diǎn)Member然后調(diào)用asyncRequest異步方法執(zhí)行同步,后面的方法我就不跟了, 這時(shí)我們主要關(guān)注非負(fù)責(zé)節(jié)點(diǎn)收到同步請(qǐng)求后如何處理。
// DistroClientTransportAgent @Override public void syncData(DistroData data, String targetServer, DistroCallback callback) { if (isNoExistTarget(targetServer)) { callback.onSuccess(); return; } DistroDataRequest request = new DistroDataRequest(data, data.getType()); Member member = memberManager.find(targetServer); try { clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member)); } catch (NacosException nacosException) { callback.onFailed(nacosException); } }
非負(fù)責(zé)節(jié)點(diǎn)(接收請(qǐng)求)
當(dāng)負(fù)責(zé)節(jié)點(diǎn)將數(shù)據(jù)發(fā)送給非負(fù)責(zé)節(jié)點(diǎn)以后,將要處理發(fā)送過(guò)來(lái)的Client數(shù)據(jù)。通過(guò)DistroController收到數(shù)據(jù)后, 然后最終會(huì)DistroClientDataProcessor.processData方法來(lái)進(jìn)行處理
// DistroController.java @PutMapping("/datum") public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception { ... DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue()); distroProtocol.onReceive(distroHttpData); ... }
// DistroClientDataProcessor.java @Override public boolean processData(DistroData distroData) { switch (distroData.getType()) { case ADD: case CHANGE: ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class) .deserialize(distroData.getContent(), ClientSyncData.class); handlerClientSyncData(clientSyncData); return true; case DELETE: String deleteClientId = distroData.getDistroKey().getResourceKey(); Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId); clientManager.clientDisconnected(deleteClientId); return true; default: return false; } }
可以看出,這里分別對(duì)ADD/CHANGE和DELETE進(jìn)行了處理,這里我主要關(guān)注ADD/CHANGE,所以主要關(guān)注handlerClientSyncData方法。
private void handlerClientSyncData(ClientSyncData clientSyncData) { Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId()); // 同步客戶端連接,此時(shí)如果客戶端不存在,則會(huì)注冊(cè)一個(gè)非負(fù)責(zé)節(jié)點(diǎn)client,后面就會(huì)獲取到該客戶端操作 clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes()); // 獲取Client(此時(shí)注冊(cè)到的是ConnectionBasedClient) Client client = clientManager.getClient(clientSyncData.getClientId()); // 更新Client數(shù)據(jù) upgradeClient(client, clientSyncData); }
**注意:**這里要注意下此時(shí)的Client實(shí)現(xiàn)類ConnectionBasedClient,它的isNative屬性為false,這是非負(fù)責(zé)節(jié)點(diǎn)和負(fù)責(zé)節(jié)點(diǎn)的主要區(qū)別。
其實(shí)判斷當(dāng)前nacos節(jié)點(diǎn)是否為負(fù)責(zé)節(jié)點(diǎn)的依據(jù)就是這個(gè)**isNative屬性**,如果是客戶端直接注冊(cè)在這個(gè)nacos節(jié)點(diǎn)上的ConnectionBasedClient,它的isNative屬性為true;如果是由Distro協(xié)議,同步到這個(gè)nacos節(jié)點(diǎn)上的ConnectionBasedClient,它的isNative屬性為false。
那其實(shí)我們都知道2.x的版本以后使用了長(zhǎng)連接,所以**通過(guò)長(zhǎng)連接建立在哪個(gè)節(jié)點(diǎn)上,哪個(gè)節(jié)點(diǎn)就是責(zé)任節(jié)點(diǎn),客戶端也只會(huì)向這個(gè)責(zé)任節(jié)點(diǎn)發(fā)送請(qǐng)求**。
DistroClientDataProcessor的upgradeClient方法,更新Client里的注冊(cè)表信息,發(fā)布對(duì)應(yīng)事件
private void upgradeClient(Client client, ClientSyncData clientSyncData) { List<String> namespaces = clientSyncData.getNamespaces(); List<String> groupNames = clientSyncData.getGroupNames(); List<String> serviceNames = clientSyncData.getServiceNames(); List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos(); Set<Service> syncedService = new HashSet<>(); for (int i = 0; i < namespaces.size(); i++) { Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i)); Service singleton = ServiceManager.getInstance().getSingleton(service); syncedService.add(singleton); InstancePublishInfo instancePublishInfo = instances.get(i); if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) { client.addServiceInstance(singleton, instancePublishInfo); NotifyCenter.publishEvent( new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId())); } } for (Service each : client.getAllPublishedService()) { if (!syncedService.contains(each)) { client.removeServiceInstance(each); NotifyCenter.publishEvent( new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId())); } } }
Distro協(xié)議負(fù)責(zé)集群數(shù)據(jù)統(tǒng)一
Distro為了確保集群間數(shù)據(jù)一致,不僅僅依賴于數(shù)據(jù)發(fā)生改變時(shí)的實(shí)時(shí)同步,后臺(tái)有定時(shí)任務(wù)做數(shù)據(jù)同步。
在1.x版本中,責(zé)任節(jié)點(diǎn)每5s同步所有Service的Instance列表的摘要(md5)給非責(zé)任節(jié)點(diǎn),非責(zé)任節(jié)點(diǎn)用對(duì)端傳來(lái)的服務(wù)md5比對(duì)本地服務(wù)的md5,如果發(fā)生改變,需要反查責(zé)任節(jié)點(diǎn)。
在2.x版本中,對(duì)這個(gè)流程做了改造,責(zé)任節(jié)點(diǎn)會(huì)發(fā)送Client全量數(shù)據(jù),非責(zé)任節(jié)點(diǎn)定時(shí)檢測(cè)同步過(guò)來(lái)的Client是否過(guò)期,減少1.x版本中的反查。
責(zé)任節(jié)點(diǎn)每5s向其他節(jié)點(diǎn)發(fā)送DataOperation=VERIFY類型的DistroData,來(lái)維持非責(zé)任節(jié)點(diǎn)的Client數(shù)據(jù)不過(guò)期。
// DistroVerifyTimedTask.java @Override public void run() { // 所有其他節(jié)點(diǎn) List<Member> targetServer = serverMemberManager.allMembersWithoutSelf(); for (String each : distroComponentHolder.getDataStorageTypes()) { // 遍歷這些節(jié)點(diǎn)發(fā)送Client.isNative=true的DistroData,type = VERIFY verifyForDataStorage(each, targetServer); } }
非責(zé)任節(jié)點(diǎn)每5s掃描isNative=false的client,如果client 30s內(nèi)沒(méi)有被VERIFY的DistroData更新過(guò)續(xù)租時(shí)間,會(huì)刪除這個(gè)同步過(guò)來(lái)的Client數(shù)據(jù)。
//ConnectionBasedClientManager->ExpiredClientCleaner private static class ExpiredClientCleaner implements Runnable { @Override public void run() { long currentTime = System.currentTimeMillis(); for (String each : clientManager.allClientId()) { ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each); if (null != client && client.isExpire(currentTime)) { clientManager.clientDisconnected(each); } } } } // ConnectionBasedClient.java @Override public boolean isExpire(long currentTime) { // 判斷30s內(nèi)沒(méi)有續(xù)租 認(rèn)為過(guò)期 return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime(); }
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot2.1 RESTful API項(xiàng)目腳手架(種子)項(xiàng)目
這篇文章主要介紹了SpringBoot2.1 RESTful API項(xiàng)目腳手架(種子)項(xiàng)目,用于搭建RESTful API工程的腳手架,只需三分鐘你就可以開(kāi)始編寫業(yè)務(wù)代碼,不再煩惱于構(gòu)建項(xiàng)目與風(fēng)格統(tǒng)一,感興趣的小伙伴們可以參考一下2018-12-12EditPlus運(yùn)行java時(shí)從鍵盤輸入數(shù)據(jù)的操作方法
這篇文章主要介紹了EditPlus運(yùn)行java時(shí)從鍵盤輸入數(shù)據(jù)的操作方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03學(xué)習(xí)spring事務(wù)與消息隊(duì)列
這篇文章主要為大家詳細(xì)介紹了spring事務(wù)與消息隊(duì)列,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-10-10關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別
這篇文章主要介紹了關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02使用Logback設(shè)置property參數(shù)方式
這篇文章主要介紹了使用Logback設(shè)置property參數(shù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(12)
下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你2021-07-07詳解Java實(shí)現(xiàn)多種方式的http數(shù)據(jù)抓取
本篇文章主要介紹了Java實(shí)現(xiàn)多種方式的http數(shù)據(jù)抓取,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧。2016-12-12