Nacos集群數(shù)據(jù)同步方式
引言
在Nacos屬于集群時(shí),當(dāng)服務(wù)器收到服務(wù)注冊請求后,發(fā)生了ClientEvent.ClientChangedEvent事件,就會觸發(fā)將注冊的服務(wù)信息同步給集群中的其他Nacos-server節(jié)點(diǎn)。
// DistroClientDataProcessor
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}同步時(shí),會涉及到一個(gè)負(fù)責(zé)節(jié)點(diǎn)和非負(fù)責(zé)節(jié)點(diǎn)
負(fù)責(zé)節(jié)點(diǎn)(發(fā)起同步)
也就是收到客戶端事件ClientChangedEvent后負(fù)責(zé)同步信息給其他非負(fù)責(zé)節(jié)點(diǎn), 所以這里只能有負(fù)責(zé)節(jié)點(diǎn)來進(jìn)行同步,非負(fù)責(zé)節(jié)點(diǎn)只能接收同步事件
// DistroClientDataProcessor
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}DistroProtocol
Distro是阿里巴巴的私有協(xié)議,distro協(xié)議是為了注冊中心而創(chuàng)造出的協(xié)議;
DistroProtocol會循環(huán)所有其他nacos節(jié)點(diǎn),提交一個(gè)異步任務(wù),這個(gè)異步任務(wù)會延遲1s,其實(shí)這里我們就可以看到這里涉及到客戶端的斷開和客戶端的新增和修改,對于Delete操作,由DistroSyncDeleteTask處理,對于Change操作,由DistroSyncChangeTask處理,這里我們從DistroSyncChangeTask來看
// DistroProtocol
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}在調(diào)用syncToTarget后,會觸發(fā)任務(wù)DistroDelayTaskProcessor處理任務(wù),這是Distro協(xié)議的一個(gè)默認(rèn)延遲任務(wù)處理器,可以看到。 對于刪除類型的任務(wù),觸發(fā)任務(wù)DistroSyncDeleteTask , 對于刪除的任務(wù):DistroSyncChangeTask
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
@Override
public boolean process(NacosTask task) {
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
switch (distroDelayTask.getAction()) {
case DELETE:
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
return true;
case CHANGE:
case ADD:
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
default:
return false;
}
}
}DistroSyncChangeTask
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
...
// 無回調(diào)
@Override
protected boolean doExecute() {
String type = getDistroKey().getResourceType();
DistroData distroData = getDistroData(type);
if (null == distroData) {
Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
return true;
}
return getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer());
}
// 有回調(diào)
@Override
protected void doExecuteWithCallback(DistroCallback callback) {
String type = getDistroKey().getResourceType();
DistroData distroData = getDistroData(type);
if (null == distroData) {
Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
return;
}
//將得到的數(shù)據(jù)同步給其他服務(wù)節(jié)點(diǎn)
getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer(), callback);
}
// 從DistroClientDataProcessor獲取DistroData
private DistroData getDistroData(String type) {
DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
if (null != result) {
result.setType(OPERATION);
}
return result;
}
}獲取同步數(shù)據(jù)getDistroData
這里獲取同步數(shù)據(jù)其實(shí)是從DistroClientDataProcessor 中獲取的,所以為Client的相關(guān)注冊服務(wù)信息
// DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor
@Override
public DistroData getDistroData(DistroKey distroKey) {
Client client = clientManager.getClient(distroKey.getResourceKey());
if (null == client) {
return null;
}
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
return new DistroData(distroKey, data);
}可以看到generateSyncData 方法是關(guān)鍵獲取服務(wù)的方法,該方法提供了同步數(shù)據(jù),包含Client的注冊信息,包括客戶端注冊了哪些namespace,哪些group,哪些service,哪些instance。
// AbstractClient implements Client
@Override
public ClientSyncData generateSyncData() {
List<String> namespaces = new LinkedList<>();
List<String> groupNames = new LinkedList<>();
List<String> serviceNames = new LinkedList<>();
List<InstancePublishInfo> instances = new LinkedList<>();
for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
namespaces.add(entry.getKey().getNamespace());
groupNames.add(entry.getKey().getGroup());
serviceNames.add(entry.getKey().getName());
instances.add(entry.getValue());
}
return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}執(zhí)行同步數(shù)據(jù)syncData
這里的同步實(shí)際是由DistroClientTransportAgent來負(fù)責(zé)的,將數(shù)據(jù)分裝成DistroDataRequest 然后查詢到對于的服務(wù)節(jié)點(diǎn)Member然后調(diào)用asyncRequest異步方法執(zhí)行同步,后面的方法我就不跟了, 這時(shí)我們主要關(guān)注非負(fù)責(zé)節(jié)點(diǎn)收到同步請求后如何處理。
// DistroClientTransportAgent
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
return;
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
try {
clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}非負(fù)責(zé)節(jié)點(diǎn)(接收請求)
當(dāng)負(fù)責(zé)節(jié)點(diǎn)將數(shù)據(jù)發(fā)送給非負(fù)責(zé)節(jié)點(diǎn)以后,將要處理發(fā)送過來的Client數(shù)據(jù)。通過DistroController收到數(shù)據(jù)后, 然后最終會DistroClientDataProcessor.processData方法來進(jìn)行處理
// DistroController.java
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
...
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
distroProtocol.onReceive(distroHttpData);
...
}// DistroClientDataProcessor.java
@Override
public boolean processData(DistroData distroData) {
switch (distroData.getType()) {
case ADD:
case CHANGE:
ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncData.class);
handlerClientSyncData(clientSyncData);
return true;
case DELETE:
String deleteClientId = distroData.getDistroKey().getResourceKey();
Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
clientManager.clientDisconnected(deleteClientId);
return true;
default:
return false;
}
}可以看出,這里分別對ADD/CHANGE和DELETE進(jìn)行了處理,這里我主要關(guān)注ADD/CHANGE,所以主要關(guān)注handlerClientSyncData方法。
private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
// 同步客戶端連接,此時(shí)如果客戶端不存在,則會注冊一個(gè)非負(fù)責(zé)節(jié)點(diǎn)client,后面就會獲取到該客戶端操作
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
// 獲取Client(此時(shí)注冊到的是ConnectionBasedClient)
Client client = clientManager.getClient(clientSyncData.getClientId());
// 更新Client數(shù)據(jù)
upgradeClient(client, clientSyncData);
}**注意:**這里要注意下此時(shí)的Client實(shí)現(xiàn)類ConnectionBasedClient,它的isNative屬性為false,這是非負(fù)責(zé)節(jié)點(diǎn)和負(fù)責(zé)節(jié)點(diǎn)的主要區(qū)別。
其實(shí)判斷當(dāng)前nacos節(jié)點(diǎn)是否為負(fù)責(zé)節(jié)點(diǎn)的依據(jù)就是這個(gè)**isNative屬性**,如果是客戶端直接注冊在這個(gè)nacos節(jié)點(diǎn)上的ConnectionBasedClient,它的isNative屬性為true;如果是由Distro協(xié)議,同步到這個(gè)nacos節(jié)點(diǎn)上的ConnectionBasedClient,它的isNative屬性為false。
那其實(shí)我們都知道2.x的版本以后使用了長連接,所以**通過長連接建立在哪個(gè)節(jié)點(diǎn)上,哪個(gè)節(jié)點(diǎn)就是責(zé)任節(jié)點(diǎn),客戶端也只會向這個(gè)責(zé)任節(jié)點(diǎn)發(fā)送請求**。
DistroClientDataProcessor的upgradeClient方法,更新Client里的注冊表信息,發(fā)布對應(yīng)事件
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
Set<Service> syncedService = new HashSet<>();
for (int i = 0; i < namespaces.size(); i++) {
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service);
syncedService.add(singleton);
InstancePublishInfo instancePublishInfo = instances.get(i);
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
client.addServiceInstance(singleton, instancePublishInfo);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
}
}
for (Service each : client.getAllPublishedService()) {
if (!syncedService.contains(each)) {
client.removeServiceInstance(each);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
}Distro協(xié)議負(fù)責(zé)集群數(shù)據(jù)統(tǒng)一
Distro為了確保集群間數(shù)據(jù)一致,不僅僅依賴于數(shù)據(jù)發(fā)生改變時(shí)的實(shí)時(shí)同步,后臺有定時(shí)任務(wù)做數(shù)據(jù)同步。
在1.x版本中,責(zé)任節(jié)點(diǎn)每5s同步所有Service的Instance列表的摘要(md5)給非責(zé)任節(jié)點(diǎn),非責(zé)任節(jié)點(diǎn)用對端傳來的服務(wù)md5比對本地服務(wù)的md5,如果發(fā)生改變,需要反查責(zé)任節(jié)點(diǎn)。
在2.x版本中,對這個(gè)流程做了改造,責(zé)任節(jié)點(diǎn)會發(fā)送Client全量數(shù)據(jù),非責(zé)任節(jié)點(diǎn)定時(shí)檢測同步過來的Client是否過期,減少1.x版本中的反查。
責(zé)任節(jié)點(diǎn)每5s向其他節(jié)點(diǎn)發(fā)送DataOperation=VERIFY類型的DistroData,來維持非責(zé)任節(jié)點(diǎn)的Client數(shù)據(jù)不過期。
// DistroVerifyTimedTask.java
@Override
public void run() {
// 所有其他節(jié)點(diǎn)
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
for (String each : distroComponentHolder.getDataStorageTypes()) {
// 遍歷這些節(jié)點(diǎn)發(fā)送Client.isNative=true的DistroData,type = VERIFY
verifyForDataStorage(each, targetServer);
}
}非責(zé)任節(jié)點(diǎn)每5s掃描isNative=false的client,如果client 30s內(nèi)沒有被VERIFY的DistroData更新過續(xù)租時(shí)間,會刪除這個(gè)同步過來的Client數(shù)據(jù)。
//ConnectionBasedClientManager->ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {
@Override
public void run() {
long currentTime = System.currentTimeMillis();
for (String each : clientManager.allClientId()) {
ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
if (null != client && client.isExpire(currentTime)) {
clientManager.clientDisconnected(each);
}
}
}
}
// ConnectionBasedClient.java
@Override
public boolean isExpire(long currentTime) {
// 判斷30s內(nèi)沒有續(xù)租 認(rèn)為過期
return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot2.1 RESTful API項(xiàng)目腳手架(種子)項(xiàng)目
這篇文章主要介紹了SpringBoot2.1 RESTful API項(xiàng)目腳手架(種子)項(xiàng)目,用于搭建RESTful API工程的腳手架,只需三分鐘你就可以開始編寫業(yè)務(wù)代碼,不再煩惱于構(gòu)建項(xiàng)目與風(fēng)格統(tǒng)一,感興趣的小伙伴們可以參考一下2018-12-12
EditPlus運(yùn)行java時(shí)從鍵盤輸入數(shù)據(jù)的操作方法
這篇文章主要介紹了EditPlus運(yùn)行java時(shí)從鍵盤輸入數(shù)據(jù)的操作方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
學(xué)習(xí)spring事務(wù)與消息隊(duì)列
這篇文章主要為大家詳細(xì)介紹了spring事務(wù)與消息隊(duì)列,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-10-10
關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別
這篇文章主要介紹了關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
使用Logback設(shè)置property參數(shù)方式
這篇文章主要介紹了使用Logback設(shè)置property參數(shù)方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(12)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望可以幫到你2021-07-07
詳解Java實(shí)現(xiàn)多種方式的http數(shù)據(jù)抓取
本篇文章主要介紹了Java實(shí)現(xiàn)多種方式的http數(shù)據(jù)抓取,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧。2016-12-12

