Nacos源碼閱讀方法
為什么我會(huì)經(jīng)常閱讀源碼呢,因?yàn)殚喿x源碼能讓你更加接近大佬,哈哈,這是我瞎扯的。
這篇文章將會(huì)帶大家閱讀Nacos源碼 以及 教大家閱讀源碼的技巧,我們正式開(kāi)始吧!
先給大家獻(xiàn)上一張我梳理的高清源碼圖,方便大家對(duì)nacos的源碼有一個(gè)整體上的認(rèn)識(shí)。

有了這張圖,我們就很容易去看nacos源碼了。
如何找切入點(diǎn)
首先我們得要找一個(gè)切入點(diǎn)進(jìn)入到nacos源碼中,那么就從nacos依賴(lài)入手
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>進(jìn)入這個(gè)依賴(lài)文件,會(huì)發(fā)現(xiàn)它又依賴(lài)了一個(gè)組件:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
</dependency>進(jìn)入依賴(lài)之后,我們發(fā)現(xiàn)它長(zhǎng)這樣:

從這張圖中,我們發(fā)現(xiàn)了一個(gè)熟悉的配置文件spring.factories,這是sringboot自動(dòng)裝配的必備文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
因?yàn)檫@張主要說(shuō)的是服務(wù)注冊(cè)源碼,所以我們可以只用關(guān)注(NacosServiceRegistryAutoConfiguration)自動(dòng)裝配文件
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
我們看到的是三個(gè)bean注入,這里給大家介紹一個(gè)看源碼的小技巧:自動(dòng)裝配的文件中申明的bean類(lèi),我們只需要看帶有auto的bean,這個(gè)往往是入口;NacosAutoServiceRegistration 帶有auto,我們點(diǎn)進(jìn)去看看里面都有什么:
@Override
protected void register() {
if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
log.debug("Registration disabled.");
return;
}
if (this.registration.getPort() < 0) {
this.registration.setPort(getPort().get());
}
super.register();
}
里面有一個(gè)register()方法,我在這里打個(gè)斷點(diǎn),因?yàn)槲也聹y(cè)這個(gè)就是注冊(cè)的入口,我現(xiàn)在使用debug模式,啟動(dòng)一個(gè)服務(wù),看它會(huì)不會(huì)調(diào)用這個(gè)方法:
客戶(hù)端注冊(cè)
這里貼上我debug后,進(jìn)入register方法的調(diào)用鏈截圖

看到這個(gè)調(diào)用鏈,看到一個(gè)onApplicationEvent的回調(diào)方法,找到這個(gè)方法所在的類(lèi)AbstractAutoServiceRegistration
這個(gè)類(lèi)繼承了ApplicationListener這個(gè)多播器監(jiān)聽(tīng)器,spring啟動(dòng)之后,會(huì)發(fā)布多播器事件,然后回調(diào)實(shí)現(xiàn)多播器組件的onApplicationEvent方法,我們從這個(gè)方法開(kāi)始分析:
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event); // 綁定端口,并啟動(dòng)
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
// 設(shè)置端口
this.port.compareAndSet(0, event.getWebServer().getPort());
// 啟動(dòng)客戶(hù)端注冊(cè)組件
this.start();
}
public void start() {
// 省略分支代碼
// 調(diào)用注冊(cè)
register();
}
因?yàn)閟pringcloud提供了多種注冊(cè)中心擴(kuò)展,但是我們這里只引用了nacos注冊(cè)中心,所以這里直接調(diào)用的是NacosServiceRegistry的register方法:
public void register(Registration registration) {
// 省略分支代碼
// 獲取服務(wù)id
String serviceId = registration.getServiceId();
// 獲取組配置
String group = nacosDiscoveryProperties.getGroup();
// 封裝服務(wù)實(shí)例
Instance instance = getNacosInstanceFromRegistration(registration);
// 調(diào)用 命名服務(wù)的 registerInstance方法 注冊(cè)實(shí)例
namingService.registerInstance(serviceId, group, instance);
}
進(jìn)入到registerInstance方法
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
// 省略分支代碼
// 與服務(wù)端建立心跳,默認(rèn)每隔5秒定時(shí)發(fā)送新跳包
this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// 通過(guò)http方式向服務(wù)端發(fā)送注冊(cè)請(qǐng)求
this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
serverproxy通過(guò)調(diào)用對(duì)http進(jìn)行封裝的reapi方法,向服務(wù)端接口("/nacos/v1/ns/instance")發(fā)送請(qǐng)求,
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
Map<String, String> params = new HashMap(9);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
params.put("clusterName", instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
}
我們知道nacos經(jīng)常是以集群形式部署的,那客戶(hù)端是如何選擇其中一個(gè)節(jié)點(diǎn)發(fā)送呢,肯定得實(shí)現(xiàn)負(fù)載均衡的邏輯,我們點(diǎn)擊reqAPI,看它是如何實(shí)現(xiàn)的
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
// 隨機(jī)獲取一個(gè)索引,servers保存的是所有nacos節(jié)點(diǎn)地址
int index = random.nextInt(servers.size());
// 遍歷所有節(jié)點(diǎn),根據(jù)index值,從servers中找到對(duì)應(yīng)位置的server,進(jìn)行請(qǐng)求調(diào)用,如果調(diào)用成功則返回,否則依次往后遍歷,直到請(qǐng)求成功
for(int i = 0; i < servers.size(); ++i) {
String server = (String)servers.get(index);
try {
return this.callServer(api, params, server, method);
} catch (NacosException var11) {
exception = var11;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
} catch (Exception var12) {
exception = var12;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
}
// index+1 然后取模 是保證index不會(huì)越界
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
}
到這里,客戶(hù)端注冊(cè)的代碼已經(jīng)分析完了,不過(guò)這還不是本篇的結(jié)束,我們還得繼續(xù)分析服務(wù)端是如何處理客戶(hù)端發(fā)送過(guò)來(lái)的注冊(cè)請(qǐng)求:
服務(wù)端處理客戶(hù)端注冊(cè)請(qǐng)求
如果需要查看服務(wù)端源碼的話,則需要將nacos源碼下下來(lái) 下載地址
我們從服務(wù)注冊(cè)api接口地址(/nacos/v1/ns/instance),可以找到對(duì)應(yīng)的controller為(com.alibaba.nacos.naming.controllers.InstanceController)
因?yàn)樽?cè)實(shí)例發(fā)送的是post請(qǐng)求,所以直接找被postmapping注解的register方法
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
// 獲取服務(wù)名
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// 獲取命名空間id
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 注冊(cè)實(shí)例
serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
return "ok";
}
我們點(diǎn)擊進(jìn)入到registerInstance方法:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 執(zhí)行添加實(shí)例的操作
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
分析
在nacos中,注冊(cè)實(shí)例后,還需要將注冊(cè)信息同步到其他節(jié)點(diǎn),所有在nacos中存在兩種同步模式AP和CP,ap和cp主要體現(xiàn)在集群中如何同步注冊(cè)信息到其它集群節(jié)點(diǎn)的實(shí)現(xiàn)方式上;
nacos通過(guò)ephemeral 字段值來(lái)決定是使用ap方式同步還是cp方式同步,默認(rèn)使用的的ap方式同步注冊(cè)信息。com.alibaba.nacos.naming.core.ServiceManager.addInstance()
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
// 生成服務(wù)的key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 獲取服務(wù)
Service service = getService(namespaceId, serviceName);
// 使用同步鎖處理
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 調(diào)用consistencyService.put 處理同步過(guò)來(lái)的服務(wù)
consistencyService.put(key, instances);
}
}
我們?cè)谶M(jìn)入到consistencyService.put方法中

點(diǎn)擊put方法時(shí),會(huì)看到有三個(gè)實(shí)現(xiàn)類(lèi),根據(jù)上下文(或者debug方式),可以推斷出這里引用的是DelegateConsistencyServiceImpl實(shí)現(xiàn)類(lèi)
@Override
public void put(String key, Record value) throws NacosException {
// 進(jìn)入到這個(gè)put方法后,就可以知道應(yīng)該使用ap方式同步還是cp方式同步
mapConsistencyService(key).put(key, value);
}
從下面的方法中 可以判斷通過(guò)key來(lái)判斷使用ap還是cp來(lái)同步注冊(cè)信息,其中key是由ephemeral字段組成;
private ConsistencyService mapConsistencyService(String key) {
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
AP 方式同步的流程(ephemeralConsistencyService) 本地服務(wù)器處理注冊(cè)信息&將注冊(cè)信息同步到其它節(jié)點(diǎn)
@Override
public void put(String key, Record value) throws NacosException {
// 處理本地注冊(cè)列表
onPut(key, value);
// 添加阻塞任務(wù),同步信息到其他集群節(jié)點(diǎn)
taskDispatcher.addTask(key);
}
處理本地注冊(cè)節(jié)點(diǎn)
nacos將key做為一個(gè)task,添加到notifer中阻塞隊(duì)列tasks中,并且使用單線程執(zhí)行,其中notifer是初始化的時(shí)候,作為一個(gè)線程被放到線程池中(線程池只設(shè)置了一個(gè)核心線程);
這里有一個(gè)點(diǎn)需要告訴大家:在大多數(shù)分布式框架,都會(huì)采用單線程的阻塞隊(duì)列來(lái)處理耗時(shí)的任務(wù),一方面解決并發(fā)問(wèn)題,另一方面能夠解決并發(fā)帶來(lái)的寫(xiě)寫(xiě)沖突問(wèn)題。
線程中的主要處理邏輯就是,循環(huán)讀取阻塞隊(duì)列中的內(nèi)容,然后處理注冊(cè)信息,更新到內(nèi)存注冊(cè)列表中。
同步注冊(cè)信息到其他集群節(jié)點(diǎn)
nacos同樣也是把注冊(cè)key作為一個(gè)task存放到 TaskDispatcher 中的taskShedule阻塞隊(duì)列中,然后開(kāi)啟線程循環(huán)讀取阻塞隊(duì)列:
@Override
public void run() {
List<String> keys = new ArrayList<>();
while (true) {
String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
TimeUnit.MILLISECONDS);
// 省略判斷代碼
// 添加同步的key
keys.add(key);
// 計(jì)數(shù)
dataSize++;
// 判斷同步的key大小是否等于 批量同步設(shè)置的限量 或者 判斷據(jù)上次同步時(shí)間 是否大于 配置的間隔周期,如果滿足任意一個(gè),則開(kāi)始同步
if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
(System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
// 遍歷所有集群節(jié)點(diǎn),直接調(diào)用http進(jìn)行同步
for (Server member : dataSyncer.getServers()) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getKey());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
}
dataSyncer.submit(syncTask, 0);
}
// 記錄本次同步時(shí)間
lastDispatchTime = System.currentTimeMillis();
// 計(jì)數(shù)清零
dataSize = 0;
}
}
}
}
使用ap方式作同步的過(guò)程很簡(jiǎn)單,但是這里面有兩種設(shè)計(jì)思路來(lái)解決單個(gè)key同步的問(wèn)題:
如果有新的key推送上來(lái),nacos就發(fā)起一次同步,這會(huì)造成網(wǎng)絡(luò)資源浪費(fèi),因?yàn)槊看瓮降木椭挥幸粋€(gè)key或者幾個(gè)key;
同步少量的key解決方案: 只有積累到指定數(shù)量的key,才發(fā)起批量同步距離上次同步時(shí)間超過(guò)配置的限制時(shí)間,則忽略key數(shù)量,直接發(fā)起同步 CP 方式同步的流程(RaftConsistencyServiceImpl)
cp模式追求的是數(shù)據(jù)一致性,為了數(shù)據(jù)一致性,那么肯定得選出一個(gè)leader,由leader首先同步,然后再由leader通知follower前來(lái)獲取最新的注冊(cè)節(jié)點(diǎn)(或者主動(dòng)推送給follower)
nacos使用raft協(xié)議來(lái)進(jìn)行選舉leader,來(lái)實(shí)現(xiàn)cp模式。
同樣進(jìn)入到 RaftConsistencyServiceImpl的put方法
@Override
public void put(String key, Record value) throws NacosException {
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
}
}
進(jìn)入到raftCore.signalPublish方法中,我提取幾個(gè)關(guān)鍵的代碼
// 首先判斷當(dāng)前nacos節(jié)點(diǎn)是否是leader,如果不是leader,則獲取leader節(jié)點(diǎn)的ip,然后將請(qǐng)求轉(zhuǎn)發(fā)到leader處理,否則往下走
if (!isLeader()) {
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
同樣采用同樣隊(duì)列的方式,去處理本地注冊(cè)列表
onPublish(datum, peers.local());
public void onPublish(Datum datum, RaftPeer source) throws Exception {
// 添加同步key任務(wù)到阻塞隊(duì)列中
notifier.addTask(datum.key, ApplyAction.CHANGE);
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
遍歷所有集群節(jié)點(diǎn),發(fā)送http同步請(qǐng)求
for (final String server : peers.allServersIncludeMyself()) {
// 如果是leader,則不進(jìn)行同步
if (isLeader(server)) {
latch.countDown();
continue;
}
// 組裝url 發(fā)送同步請(qǐng)求到其它集群節(jié)點(diǎn)
final String url = buildURL(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
latch.countDown();
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
到此,nacos服務(wù)注冊(cè)及服務(wù)實(shí)例同步的主干源碼已經(jīng)分析完了。
總結(jié)
對(duì)于剛開(kāi)始接觸nacos源碼的同學(xué),可以先把頭上的圖多看幾遍,然后對(duì)照著源碼找到對(duì)應(yīng)的位置 ,最后結(jié)合圖再結(jié)合本文,整體連貫的看下來(lái),相信會(huì)有很大收獲的;雖然閱讀源碼的過(guò)程很痛苦,但是你只要堅(jiān)持下來(lái)了,掌握到了閱讀源碼的技巧,你就會(huì)發(fā)現(xiàn)再難的源碼,你都能把它啃下來(lái);后面我會(huì)專(zhuān)門(mén)寫(xiě)一篇教你如何高效閱讀源碼的文章,希望對(duì)于剛接觸源碼的同學(xué)能有所幫助。
到此這篇關(guān)于Nacos源碼閱讀方法的文章就介紹到這了,更多相關(guān)Nacos源碼閱讀內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot實(shí)現(xiàn)網(wǎng)站的登陸注冊(cè)邏輯記錄
登陸注冊(cè)功能是我們?nèi)粘i_(kāi)發(fā)中經(jīng)常遇到的一個(gè)功能,下面這篇文章主要給大家介紹了關(guān)于SpringBoot實(shí)現(xiàn)網(wǎng)站的登陸注冊(cè)邏輯的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2021-10-10
SpringMVC響應(yīng)視圖和結(jié)果視圖詳解
這篇文章主要介紹了SpringMVC響應(yīng)視圖和結(jié)果視圖,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
基于ScheduledExecutorService的兩種方法(詳解)
下面小編就為大家?guī)?lái)一篇基于ScheduledExecutorService的兩種方法(詳解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-10-10
Springboot基礎(chǔ)之RedisUtils工具類(lèi)
本文來(lái)說(shuō)下RedisUtils工具類(lèi),主要介紹了整合Redis、MyBatis,封裝RedisUtils工具類(lèi)等知識(shí),文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有很好的幫助,需要的朋友可以參考下2021-05-05
基于RecyclerChart的KLine繪制Volume實(shí)現(xiàn)詳解
這篇文章主要為大家介紹了基于RecyclerChart的KLine繪制Volume實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03
mybatis generator 配置 反向生成Entity簡(jiǎn)單增刪改查(推薦)
這篇文章主要介紹了mybatis generator 配置 反向生成Entity簡(jiǎn)單增刪改查(推薦)的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-12-12
JPA如何使用nativequery多表關(guān)聯(lián)查詢(xún)返回自定義實(shí)體類(lèi)
這篇文章主要介紹了JPA如何使用nativequery多表關(guān)聯(lián)查詢(xún)返回自定義實(shí)體類(lèi),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
SpringMVC通過(guò)模型視圖ModelAndView渲染視圖的實(shí)現(xiàn)
這篇文章主要介紹了SpringMVC通過(guò)模型視圖ModelAndView渲染視圖的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12

