Nacos客戶端本地緩存和故障轉(zhuǎn)移方式
在Nacos客戶端從Server獲得服務的時候,在某些時候出現(xiàn)了一些故障, 這時候為了保證服務正常,Nacos進行了故障轉(zhuǎn)移,原理就是將之前緩存的服務信息拿出來用,防止服務出現(xiàn)問題,涉及到的核心類為ServiceInfoHolder和FailoverReactor。
本地緩存有兩方面,第一方面是從注冊中心獲得實例信息會緩存在內(nèi)存當中,也就是通過Map的形式承載,這樣查詢操作都方便。第二方面便是通過磁盤文件的形式定時緩存起來,以備不時之需。
故障轉(zhuǎn)移也分兩方面,第一方面是故障轉(zhuǎn)移的開關(guān)是通過文件來標記的;第二方面是當開啟故障轉(zhuǎn)移之后,當發(fā)生故障時,可以從故障轉(zhuǎn)移備份的文件中來獲得服務實例信息。
1. ServiceInfoHolder
ServiceInfoHolder類,顧名思義,服務信息的持有者。每次客戶端從注冊中心獲取新的服務信息時都會調(diào)用該類,其中processServiceInfo方法來進行本地化處理,包括更新緩存服務、發(fā)布事件、更新本地文件等。
ServiceInfoHolder類持有了ServiceInfo,通過一個ConcurrentMap來儲存
// ServiceInfoHolder private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
當從服務端拉會服務信息時,就會往這個map中進行存儲。
// ServiceInfoHolder public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { .... //緩存服務信息 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); .... }
在創(chuàng)建ServiceInfoHolder時會做如下事情
- 初始化本地緩存目錄
- 根據(jù)配置從本地緩存初始化服務,默認false
- 創(chuàng)建FailoverReactor并相互持有ServiceInfoHolder
// ServiceInfoHolder public ServiceInfoHolder(String namespace, Properties properties) { initCacheDir(namespace, properties); if (isLoadCacheAtStart(properties)) { this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir)); } else { this.serviceInfoMap = new ConcurrentHashMap<>(16); } this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushEmptyProtection = isPushEmptyProtect(properties); }
本地緩存目錄
在processServiceInfo中會進行本地緩存寫入,其實就是寫入這個目錄(這個目錄是在創(chuàng)建ServiceInfoHolder時根據(jù)配置初始化的),所以目錄中的數(shù)據(jù)正常是最新讀取到的服務信息
// ServiceInfoHolder public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { .... // 記錄Service本地文件 DiskCache.write(serviceInfo, cacheDir); .... }
本地緩存目錄默認路徑:${user.home}/nacos/naming/public,也可以自定義,通過System.setProperty("JM.SNAPSHOT.PATH")自定義
2. FailoverReactor
在ServiceInfoHolder的構(gòu)造方法中,還會初始化一個FailoverReactor類,同樣是ServiceInfoHolder的成員變量。FailoverReactor的作用便是用來處理故障轉(zhuǎn)移的。
構(gòu)造故障轉(zhuǎn)移做了如下事情:
// FailoverReactor public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) { // 持有ServiceInfoHolder的引用 this.serviceInfoHolder = serviceInfoHolder; // 拼接故障目錄:${user.home}/nacos/naming/public/failover this.failoverDir = cacheDir + FAILOVER_DIR; // 初始化executorService,支持延時執(zhí)行 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); // 守護線程模式運行 thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.failover"); return thread; } }); // 其他初始化操作,通過executorService開啟多個定時任務執(zhí)行 this.init(); }
init方法執(zhí)行
在這個方法中開啟了三個定時任務,這三個任務其實都是FailoverReactor的內(nèi)部類
1. 初始化立即執(zhí)行,然后每間隔5秒再執(zhí)行SwitchRefresher
2. 初始化延遲30分鐘執(zhí)行,執(zhí)行間隔24小時,執(zhí)行任務DiskFileWriter
3. 初始化立即執(zhí)行,執(zhí)行間隔10秒,執(zhí)行核心操作為DiskFileWriter
// FailoverReactor public void init() { // 初始化立即執(zhí)行,執(zhí)行間隔5秒,執(zhí)行任務SwitchRefresher executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS); // 初始化延遲30分鐘執(zhí)行,執(zhí)行間隔24小時,執(zhí)行任務DiskFileWriter executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES); // 10秒后如果故障目錄為空,則執(zhí)行DiskFileWriter任務強制備份 executorService.schedule(new Runnable() { @Override public void run() { try { File cacheDir = new File(failoverDir); ... File[] files = cacheDir.listFiles(); if (files == null || files.length <= 0) { new DiskFileWriter().run(); } } catch (Throwable e) { NAMING_LOGGER.error("[NA] failed to backup file on startup.", e); } } }, 10000L, TimeUnit.MILLISECONDS); }
DiskFileWriter刷盤任務
將ServiceInfo寫入備份磁盤
// FailoverReactor class DiskFileWriter extends TimerTask { @Override public void run() { Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap(); for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) { ServiceInfo serviceInfo = entry.getValue(); ... // 將緩存寫入磁盤 DiskCache.write(serviceInfo, failoverDir); } } }
SwitchRefresher根據(jù)標記故障轉(zhuǎn)移文件切換內(nèi)存標志
- 如果故障轉(zhuǎn)移文件不存在,則直接返回(文件開關(guān))
- 比較文件修改時間,如果已經(jīng)修改,則獲取故障轉(zhuǎn)移文件中的內(nèi)容。
- 故障轉(zhuǎn)移文件中存儲了0和1標識。0表示關(guān)閉,1表示開啟。
- 當為開啟狀態(tài)時,執(zhí)行線程FailoverFileReader。
// FailoverReactor class SwitchRefresher implements Runnable { long lastModifiedMillis = 0L; @Override public void run() { try { File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH); // 文件不存在則退出 if (!switchFile.exists()) { ... return; } long modified = switchFile.lastModified(); if (lastModifiedMillis < modified) { lastModifiedMillis = modified; // 獲取故障轉(zhuǎn)移文件內(nèi)容 String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH, Charset.defaultCharset().toString()); if (!StringUtils.isEmpty(failover)) { String[] lines = failover.split(DiskCache.getLineSeparator()); for (String line : lines) { String line1 = line.trim(); // 1 表示開啟故障轉(zhuǎn)移模式 if (IS_FAILOVER_MODE.equals(line1)) { switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString()); new FailoverFileReader().run(); // 0 表示關(guān)閉故障轉(zhuǎn)移模式 } else if (NO_FAILOVER_MODE.equals(line1)) { switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString()); } } } else { switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString()); } } } catch (Throwable e) { NAMING_LOGGER.error("[NA] failed to read failover switch.", e); } } }
FailoverFileReader故障后讀取備份文件
該任務是故障轉(zhuǎn)移的核心, 故障轉(zhuǎn)移文件讀取,基本操作就是讀取failover目錄存儲的**備份服務信息文件**內(nèi)容,然后轉(zhuǎn)換成ServiceInfo,并且將所有的ServiceInfo儲存在FailoverReactor的ServiceMap屬性中。
流程如下:
1. 讀取failover目錄下的所有文件,進行遍歷處理
2. 如果文件不存在跳過
3. 如果文件是故障轉(zhuǎn)移開關(guān)標志文件跳過
4. 讀取文件中的備份內(nèi)容,轉(zhuǎn)換為ServiceInfo對象
5. 將ServiceInfo對象放入到domMap中
6. 最后判斷domMap不為空,賦值給serviceMap
// FailoverReactor class FailoverFileReader implements Runnable { @Override public void run() { Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16); File cacheDir = new File(failoverDir); File[] files = cacheDir.listFiles(); if (files == null) { return; } for (File file : files) { // 如果是故障轉(zhuǎn)移標志文件,則跳過 if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) { continue; } ServiceInfo dom = new ServiceInfo(file.getName()); BufferedReader reader = null; try { String dataString = ConcurrentDiskUtil .getFileContent(file, Charset.defaultCharset().toString()); reader = new BufferedReader(new StringReader(dataString)); String json = reader.readLine(); dom = JacksonUtils.toObj(json, ServiceInfo.class); } finally { reader.close(); } if (!CollectionUtils.isEmpty(dom.getHosts())) { domMap.put(dom.getKey(), dom); } } // 讀入緩存 if (domMap.size() > 0) { serviceMap = domMap; } } }·
FailoverReactor.serviceMap的使用
我們在ServiceInfoHolder中的getServiceInfo方法就會判斷,如果當前是故障切換狀態(tài),就會從FailoverReactor中獲取服務,那么這是就用到了FailoverReactor.serviceMap緩存的服務了
// ServiceInfoHolder public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) { ... if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } return serviceInfoMap.get(key); }
// FailoverReactor public ServiceInfo getService(String key) { ServiceInfo serviceInfo = serviceMap.get(key); if (serviceInfo == null) { serviceInfo = new ServiceInfo(); serviceInfo.setName(key); } return serviceInfo; }
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
java統(tǒng)計字符串中指定元素出現(xiàn)次數(shù)方法
這篇文章主要介紹了java統(tǒng)計字符串中指定元素出現(xiàn)次數(shù)方法,需要的朋友可以參考下2015-12-12SpringBoot整合JavaMail實現(xiàn)發(fā)郵件的項目實踐
本文主要介紹了SpringBoot整合JavaMail實現(xiàn)發(fā)郵件的項目實踐,詳細闡述了使用SpringBoot和JavaMail發(fā)送郵件的步驟,具有一定的參考價值,感興趣的可以了解一下2023-10-10關(guān)于Cannot?resolve?com.microsoft.sqlserver:sqljdbc4:4.0報錯問題解
這篇文章主要給大家介紹了關(guān)于Cannot?resolve?com.microsoft.sqlserver:sqljdbc4:4.0報錯問題的解決辦法,這個是在pom文件中添加依賴出現(xiàn)報錯問題,需要的朋友可以參考下2024-02-02MyBatis?Generator生成數(shù)據(jù)庫模型實現(xiàn)示例
這篇文章主要為大家介紹了MyBatis?Generator生成數(shù)據(jù)庫模型實現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12