PowerJob的ServerDiscoveryService工作流程源碼解讀
序
本文主要研究一下PowerJob的ServerDiscoveryService
ServerDiscoveryService
tech/powerjob/worker/background/ServerDiscoveryService.java
@Slf4j public class ServerDiscoveryService { private final Long appId; private final PowerJobWorkerConfig config; private String currentServerAddress; private final Map<String, String> ip2Address = Maps.newHashMap(); /** * 服務(wù)發(fā)現(xiàn)地址 */ private static final String DISCOVERY_URL = "http://%s/server/acquire?%s"; /** * 失敗次數(shù) */ private static int FAILED_COUNT = 0; /** * 最大失敗次數(shù) */ private static final int MAX_FAILED_COUNT = 3; public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) { this.appId = appId; this.config = config; } //...... }
ServerDiscoveryService定義了currentServerAddress、ip2Address、服務(wù)發(fā)現(xiàn)url模版,失敗次數(shù),最大失敗次數(shù)
start
public void start(ScheduledExecutorService timingPool) { this.currentServerAddress = discovery(); if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) { throw new PowerJobException("can't find any available server, this worker has been quarantined."); } // 這里必須保證成功 timingPool.scheduleAtFixedRate(() -> { try { this.currentServerAddress = discovery(); } catch (Exception e) { log.error("[PowerDiscovery] fail to discovery server!", e); } } , 10, 10, TimeUnit.SECONDS); }
其start方法先通過(guò)discovery方法獲取currentServerAddress,然后注冊(cè)定時(shí)任務(wù)每隔10s重新刷新一下currentServerAddress
discovery
private String discovery() { if (ip2Address.isEmpty()) { config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x)); } String result = null; // 先對(duì)當(dāng)前機(jī)器發(fā)起請(qǐng)求 String currentServer = currentServerAddress; if (!StringUtils.isEmpty(currentServer)) { String ip = currentServer.split(":")[0]; // 直接請(qǐng)求當(dāng)前Server的HTTP服務(wù),可以少一次網(wǎng)絡(luò)開(kāi)銷(xiāo),減輕Server負(fù)擔(dān) String firstServerAddress = ip2Address.get(ip); if (firstServerAddress != null) { result = acquire(firstServerAddress); } } for (String httpServerAddress : config.getServerAddress()) { if (StringUtils.isEmpty(result)) { result = acquire(httpServerAddress); }else { break; } } if (StringUtils.isEmpty(result)) { log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined."); // 在 Server 高可用的前提下,連續(xù)失敗多次,說(shuō)明該節(jié)點(diǎn)與外界失聯(lián),Server已經(jīng)將秒級(jí)任務(wù)轉(zhuǎn)移到其他Worker,需要?dú)⑺辣镜氐娜蝿?wù) if (FAILED_COUNT++ > MAX_FAILED_COUNT) { log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker."); List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys(); if (!CollectionUtils.isEmpty(frequentInstanceIds)) { frequentInstanceIds.forEach(instanceId -> { HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId); taskTracker.destroy(); log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId); }); } FAILED_COUNT = 0; } return null; } else { // 重置失敗次數(shù) FAILED_COUNT = 0; log.debug("[PowerDiscovery] current server is {}.", result); return result; } }
discovery方法從config.getServerAddress()解析地址放到ip2Address,若currentServerAddress有值則acquire,否則遍歷config.getServerAddress()執(zhí)行acquire;若還沒(méi)有獲取到則判斷FAILED_COUNT是否超出MAX_FAILED_COUNT,超出則遍歷HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys()挨個(gè)執(zhí)行remove及destory
acquire
private String acquire(String httpServerAddress) { String result = null; String url = buildServerDiscoveryUrl(httpServerAddress); try { result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url)); }catch (Exception ignore) { } if (!StringUtils.isEmpty(result)) { try { ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class); if (resultDTO.isSuccess()) { return resultDTO.getData().toString(); } }catch (Exception ignore) { } } return null; } private String buildServerDiscoveryUrl(String address) { ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest() .setAppId(appId) .setCurrentServer(currentServerAddress) .setProtocol(config.getProtocol().name().toUpperCase()); String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap()); return String.format(DISCOVERY_URL, address, query); }
acquire方法通過(guò)buildServerDiscoveryUrl構(gòu)建url,然后執(zhí)行HttpUtils.get(url)獲取地址
ServerController
tech/powerjob/server/web/controller/ServerController.java
@RestController @RequestMapping("/server") @RequiredArgsConstructor public class ServerController implements ServerInfoAware { private ServerInfo serverInfo; private final TransportService transportService; private final ServerElectionService serverElectionService; private final AppInfoRepository appInfoRepository; private final WorkerClusterQueryService workerClusterQueryService; //...... @GetMapping("/acquire") public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) { return ResultDTO.success(serverElectionService.elect(request)); } //...... }
acquireServer方法執(zhí)行serverElectionService.elect(request)返回server地址
elect
tech/powerjob/server/remote/server/election/ServerElectionService.java
public String elect(ServerDiscoveryRequest request) { if (!accurate()) { final String currentServer = request.getCurrentServer(); // 如果是本機(jī),就不需要查數(shù)據(jù)庫(kù)那么復(fù)雜的操作了,直接返回成功 Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol())); if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) { log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer); return currentServer; } } return getServer0(request); }
elect方法判斷如果是本機(jī)就直接返回,否則執(zhí)行g(shù)etServer0
getServer0
private String getServer0(ServerDiscoveryRequest discoveryRequest) { final Long appId = discoveryRequest.getAppId(); final String protocol = discoveryRequest.getProtocol(); Set<String> downServerCache = Sets.newHashSet(); for (int i = 0; i < RETRY_TIMES; i++) { // 無(wú)鎖獲取當(dāng)前數(shù)據(jù)庫(kù)中的Server Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId); if (!appInfoOpt.isPresent()) { throw new PowerJobException(appId + " is not registered!"); } String appName = appInfoOpt.get().getAppName(); String originServer = appInfoOpt.get().getCurrentServer(); String activeAddress = activeAddress(originServer, downServerCache, protocol); if (StringUtils.isNotEmpty(activeAddress)) { return activeAddress; } // 無(wú)可用Server,重新進(jìn)行Server選舉,需要加鎖 String lockName = String.format(SERVER_ELECT_LOCK, appId); boolean lockStatus = lockService.tryLock(lockName, 30000); if (!lockStatus) { try { Thread.sleep(500); }catch (Exception ignore) { } continue; } try { // 可能上一臺(tái)機(jī)器已經(jīng)完成了Server選舉,需要再次判斷 AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database.")); String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol); if (StringUtils.isNotEmpty(address)) { return address; } // 篡位,如果本機(jī)存在協(xié)議,則作為Server調(diào)度該 worker final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol); if (targetProtocolInfo != null) { // 注意,寫(xiě)入 AppInfoDO#currentServer 的永遠(yuǎn)是 default 的地址,僅在返回的時(shí)候特殊處理為協(xié)議地址 appInfo.setCurrentServer(transportService.defaultProtocol().getAddress()); appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); return targetProtocolInfo.getAddress(); } }catch (Exception e) { log.error("[ServerElection] write new server to db failed for app {}.", appName, e); } finally { lockService.unlock(lockName); } } throw new PowerJobException("server elect failed for app " + appId); }
getServer0方法先判斷appInfoRepository中當(dāng)前appId的originServer是否存活,是則直接返回,否則加鎖將transportService.defaultProtocol().getAddress()寫(xiě)入到appInfo的currentServer
小結(jié)
PowerJob的ServerDiscoveryService定義了start方法,它先通過(guò)discovery方法獲取currentServerAddress,然后注冊(cè)定時(shí)任務(wù)每隔10s重新刷新一下currentServerAddress;
discovery方法主要是遍歷config.getServerAddress()執(zhí)行acquire;
acquire方法通過(guò)buildServerDiscoveryUrl構(gòu)建url,然后執(zhí)行HttpUtils.get(url)獲取該appId的server地址。
以上就是PowerJob的ServerDiscoveryService工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob ServerDiscoveryService的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- PowerJob的TimingStrategyHandler工作流程源碼解讀
- PowerJob的IdGenerateService工作流程源碼解讀
- PowerJob LockService方法工作流程源碼解讀
- PowerJob的Evaluator方法工作流程源碼解讀
- PowerJob的DatabaseMonitorAspect源碼流程
- PowerJob的AbstractScriptProcessor實(shí)現(xiàn)類(lèi)工作流程源碼解讀
- PowerJob的WorkerHealthReporter工作流程源碼解讀
- PowerJob的OmsLogHandler工作流程源碼解析
- PowerJob的ProcessorLoader工作流程源碼解讀
- PowerJob的DispatchStrategy方法工作流程源碼解讀
相關(guān)文章
Java并發(fā)編程——volatile關(guān)鍵字
這篇文章主要介紹了Java并發(fā)編程——volatile關(guān)鍵字的相關(guān)資料,幫助大家更好的理解和學(xué)習(xí)Java并發(fā)編程,感興趣的朋友可以了解下2020-10-10SpringBoot加載不出來(lái)application.yml文件的解決方法
這篇文章主要介紹了SpringBoot加載不出來(lái)application.yml文件的解決方法,文中通過(guò)示例代碼講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作有一定的幫助,需要的朋友跟著小編來(lái)一起來(lái)學(xué)習(xí)吧2023-12-12關(guān)于kafka發(fā)送消息的三種方式總結(jié)
這篇文章主要介紹了關(guān)于kafka發(fā)送消息的三種方式總結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04springboot集成elasticsearch7的圖文方法
本文記錄springboot集成elasticsearch7的方法,本文通過(guò)圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),需要的朋友參考下吧2021-05-05Spring擴(kuò)展點(diǎn)之BeanFactoryPostProcessor詳解
這篇文章主要介紹了Spring擴(kuò)展點(diǎn)之BeanFactoryPostProcessor詳解,Spring的設(shè)計(jì)非常優(yōu)雅,有很多的擴(kuò)展點(diǎn)供我們對(duì)項(xiàng)目進(jìn)行擴(kuò)展,今天學(xué)習(xí)一下Spring其中擴(kuò)展點(diǎn)之一的BeanFactoryPostProcessor,需要的朋友可以參考下2023-11-11