PowerJob的ServerDiscoveryService工作流程源碼解讀
序
本文主要研究一下PowerJob的ServerDiscoveryService
ServerDiscoveryService
tech/powerjob/worker/background/ServerDiscoveryService.java
@Slf4j
public class ServerDiscoveryService {
private final Long appId;
private final PowerJobWorkerConfig config;
private String currentServerAddress;
private final Map<String, String> ip2Address = Maps.newHashMap();
/**
* 服務發(fā)現(xiàn)地址
*/
private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
/**
* 失敗次數(shù)
*/
private static int FAILED_COUNT = 0;
/**
* 最大失敗次數(shù)
*/
private static final int MAX_FAILED_COUNT = 3;
public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
this.appId = appId;
this.config = config;
}
//......
}ServerDiscoveryService定義了currentServerAddress、ip2Address、服務發(fā)現(xiàn)url模版,失敗次數(shù),最大失敗次數(shù)
start
public void start(ScheduledExecutorService timingPool) {
this.currentServerAddress = discovery();
if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
throw new PowerJobException("can't find any available server, this worker has been quarantined.");
}
// 這里必須保證成功
timingPool.scheduleAtFixedRate(() -> {
try {
this.currentServerAddress = discovery();
} catch (Exception e) {
log.error("[PowerDiscovery] fail to discovery server!", e);
}
}
, 10, 10, TimeUnit.SECONDS);
}其start方法先通過discovery方法獲取currentServerAddress,然后注冊定時任務每隔10s重新刷新一下currentServerAddress
discovery
private String discovery() {
if (ip2Address.isEmpty()) {
config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
}
String result = null;
// 先對當前機器發(fā)起請求
String currentServer = currentServerAddress;
if (!StringUtils.isEmpty(currentServer)) {
String ip = currentServer.split(":")[0];
// 直接請求當前Server的HTTP服務,可以少一次網(wǎng)絡開銷,減輕Server負擔
String firstServerAddress = ip2Address.get(ip);
if (firstServerAddress != null) {
result = acquire(firstServerAddress);
}
}
for (String httpServerAddress : config.getServerAddress()) {
if (StringUtils.isEmpty(result)) {
result = acquire(httpServerAddress);
}else {
break;
}
}
if (StringUtils.isEmpty(result)) {
log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");
// 在 Server 高可用的前提下,連續(xù)失敗多次,說明該節(jié)點與外界失聯(lián),Server已經(jīng)將秒級任務轉移到其他Worker,需要殺死本地的任務
if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();
if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
frequentInstanceIds.forEach(instanceId -> {
HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);
taskTracker.destroy();
log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
});
}
FAILED_COUNT = 0;
}
return null;
} else {
// 重置失敗次數(shù)
FAILED_COUNT = 0;
log.debug("[PowerDiscovery] current server is {}.", result);
return result;
}
}discovery方法從config.getServerAddress()解析地址放到ip2Address,若currentServerAddress有值則acquire,否則遍歷config.getServerAddress()執(zhí)行acquire;若還沒有獲取到則判斷FAILED_COUNT是否超出MAX_FAILED_COUNT,超出則遍歷HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys()挨個執(zhí)行remove及destory
acquire
private String acquire(String httpServerAddress) {
String result = null;
String url = buildServerDiscoveryUrl(httpServerAddress);
try {
result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
}catch (Exception ignore) {
}
if (!StringUtils.isEmpty(result)) {
try {
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
return resultDTO.getData().toString();
}
}catch (Exception ignore) {
}
}
return null;
}
private String buildServerDiscoveryUrl(String address) {
ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
.setAppId(appId)
.setCurrentServer(currentServerAddress)
.setProtocol(config.getProtocol().name().toUpperCase());
String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
return String.format(DISCOVERY_URL, address, query);
}acquire方法通過buildServerDiscoveryUrl構建url,然后執(zhí)行HttpUtils.get(url)獲取地址
ServerController
tech/powerjob/server/web/controller/ServerController.java
@RestController
@RequestMapping("/server")
@RequiredArgsConstructor
public class ServerController implements ServerInfoAware {
private ServerInfo serverInfo;
private final TransportService transportService;
private final ServerElectionService serverElectionService;
private final AppInfoRepository appInfoRepository;
private final WorkerClusterQueryService workerClusterQueryService;
//......
@GetMapping("/acquire")
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
return ResultDTO.success(serverElectionService.elect(request));
}
//......
}acquireServer方法執(zhí)行serverElectionService.elect(request)返回server地址
elect
tech/powerjob/server/remote/server/election/ServerElectionService.java
public String elect(ServerDiscoveryRequest request) {
if (!accurate()) {
final String currentServer = request.getCurrentServer();
// 如果是本機,就不需要查數(shù)據(jù)庫那么復雜的操作了,直接返回成功
Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
return currentServer;
}
}
return getServer0(request);
}elect方法判斷如果是本機就直接返回,否則執(zhí)行getServer0
getServer0
private String getServer0(ServerDiscoveryRequest discoveryRequest) {
final Long appId = discoveryRequest.getAppId();
final String protocol = discoveryRequest.getProtocol();
Set<String> downServerCache = Sets.newHashSet();
for (int i = 0; i < RETRY_TIMES; i++) {
// 無鎖獲取當前數(shù)據(jù)庫中的Server
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) {
throw new PowerJobException(appId + " is not registered!");
}
String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer();
String activeAddress = activeAddress(originServer, downServerCache, protocol);
if (StringUtils.isNotEmpty(activeAddress)) {
return activeAddress;
}
// 無可用Server,重新進行Server選舉,需要加鎖
String lockName = String.format(SERVER_ELECT_LOCK, appId);
boolean lockStatus = lockService.tryLock(lockName, 30000);
if (!lockStatus) {
try {
Thread.sleep(500);
}catch (Exception ignore) {
}
continue;
}
try {
// 可能上一臺機器已經(jīng)完成了Server選舉,需要再次判斷
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
if (StringUtils.isNotEmpty(address)) {
return address;
}
// 篡位,如果本機存在協(xié)議,則作為Server調(diào)度該 worker
final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
if (targetProtocolInfo != null) {
// 注意,寫入 AppInfoDO#currentServer 的永遠是 default 的地址,僅在返回的時候特殊處理為協(xié)議地址
appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return targetProtocolInfo.getAddress();
}
}catch (Exception e) {
log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
} finally {
lockService.unlock(lockName);
}
}
throw new PowerJobException("server elect failed for app " + appId);
}getServer0方法先判斷appInfoRepository中當前appId的originServer是否存活,是則直接返回,否則加鎖將transportService.defaultProtocol().getAddress()寫入到appInfo的currentServer
小結
PowerJob的ServerDiscoveryService定義了start方法,它先通過discovery方法獲取currentServerAddress,然后注冊定時任務每隔10s重新刷新一下currentServerAddress;
discovery方法主要是遍歷config.getServerAddress()執(zhí)行acquire;
acquire方法通過buildServerDiscoveryUrl構建url,然后執(zhí)行HttpUtils.get(url)獲取該appId的server地址。
以上就是PowerJob的ServerDiscoveryService工作流程源碼解讀的詳細內(nèi)容,更多關于PowerJob ServerDiscoveryService的資料請關注腳本之家其它相關文章!
- PowerJob的TimingStrategyHandler工作流程源碼解讀
- PowerJob的IdGenerateService工作流程源碼解讀
- PowerJob LockService方法工作流程源碼解讀
- PowerJob的Evaluator方法工作流程源碼解讀
- PowerJob的DatabaseMonitorAspect源碼流程
- PowerJob的AbstractScriptProcessor實現(xiàn)類工作流程源碼解讀
- PowerJob的WorkerHealthReporter工作流程源碼解讀
- PowerJob的OmsLogHandler工作流程源碼解析
- PowerJob的ProcessorLoader工作流程源碼解讀
- PowerJob的DispatchStrategy方法工作流程源碼解讀
相關文章
SpringBoot加載不出來application.yml文件的解決方法
這篇文章主要介紹了SpringBoot加載不出來application.yml文件的解決方法,文中通過示例代碼講解的非常詳細,對大家的學習或者工作有一定的幫助,需要的朋友跟著小編來一起來學習吧2023-12-12
springboot集成elasticsearch7的圖文方法
本文記錄springboot集成elasticsearch7的方法,本文通過圖文實例代碼相結合給大家介紹的非常詳細,需要的朋友參考下吧2021-05-05
Spring擴展點之BeanFactoryPostProcessor詳解
這篇文章主要介紹了Spring擴展點之BeanFactoryPostProcessor詳解,Spring的設計非常優(yōu)雅,有很多的擴展點供我們對項目進行擴展,今天學習一下Spring其中擴展點之一的BeanFactoryPostProcessor,需要的朋友可以參考下2023-11-11

