PowerJob的DispatchStrategy方法工作流程源碼解讀
序
本文主要研究一下PowerJob的DispatchStrategy
DispatchStrategy
tech/powerjob/common/enums/DispatchStrategy.java
@Getter @AllArgsConstructor public enum DispatchStrategy { HEALTH_FIRST(1), RANDOM(2); private final int v; public static DispatchStrategy of(Integer v) { if (v == null) { return HEALTH_FIRST; } for (DispatchStrategy ds : values()) { if (v.equals(ds.v)) { return ds; } } throw new IllegalArgumentException("unknown DispatchStrategy of " + v); } }
DispatchStrategy定義了HEALTH_FIRST、RANDOM兩個(gè)枚舉值
getSuitableWorkers
tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) { List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values()); workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo)); DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy()); switch (dispatchStrategy) { case RANDOM: Collections.shuffle(workers); break; case HEALTH_FIRST: workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); break; default: // do nothing } // 限定集群大小(0代表不限制) if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) { workers = workers.subList(0, jobInfo.getMaxWorkerCount()); } return workers; }
WorkerClusterQueryService的getSuitableWorkers方法先通過getWorkerInfosByAppId獲取指定appId的WorkerInfo,然后通過filterWorker進(jìn)行一次過濾,最后根據(jù)dispatchStrategy來對(duì)workers進(jìn)行排序,如果是RANDOM則通過Collections.shuffle(workers)隨機(jī)化,如果是HEALTH_FIRST則根據(jù)systemMetrics的calculateScore結(jié)果進(jìn)行排序,如果有限定maxWorkerCount則對(duì)workers進(jìn)行subList,沒有則返回排序后的workers
getWorkerInfosByAppId
private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) { ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId); if (clusterStatusHolder == null) { log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); return Collections.emptyMap(); } return clusterStatusHolder.getAllWorkers(); } public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() { return WorkerClusterManagerService.getAppId2ClusterStatus(); }
getWorkerInfosByAppId通過WorkerClusterManagerService.getAppId2ClusterStatus()獲取ClusterStatusHolder,在返回ClusterStatusHolder的getAllWorkers
filterWorker
private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) { for (WorkerFilter filter : workerFilters) { if (filter.filter(workerInfo, jobInfo)) { return true; } } return false; }
filterWorker方法則是遍歷workerFilters直接filter
calculateScore
tech/powerjob/common/model/SystemMetrics.java
public int calculateScore() { if (score > 0) { return score; } // Memory is vital to TaskTracker, so we set the multiplier factor as 2. double memScore = (jvmMaxMemory - jvmUsedMemory) * 2; // Calculate the remaining load of CPU. Multiplier is set as 1. double cpuScore = cpuProcessors - cpuLoad; // Windows can not fetch CPU load, set cpuScore as 1. if (cpuScore > cpuProcessors) { cpuScore = 1; } score = (int) (memScore + cpuScore); return score; }
SystemMetrics的calculateScore方法則是基于memScore與cpuScore來計(jì)算
WorkerFilter
public interface WorkerFilter { /** * * @param workerInfo worker info, maybe you need to use your customized info in SystemMetrics#extra * @param jobInfoDO job info * @return true will remove the worker in process list */ boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO); }
WorkerFilter定義了filter接口用于過濾worker,它有3個(gè)實(shí)現(xiàn)類,分別是DesignatedWorkerFilter、DisconnectedWorkerFilter、SystemMetricsWorkerFilter
DesignatedWorkerFilter
tech/powerjob/server/extension/defaultimpl/workerfilter/DesignatedWorkerFilter.java
@Slf4j @Component public class DesignatedWorkerFilter implements WorkerFilter { @Override public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) { String designatedWorkers = jobInfo.getDesignatedWorkers(); // no worker is specified, no filter of any if (StringUtils.isEmpty(designatedWorkers)) { return false; } Set<String> designatedWorkersSet = Sets.newHashSet(SJ.COMMA_SPLITTER.splitToList(designatedWorkers)); for (String tagOrAddress : designatedWorkersSet) { if (tagOrAddress.equals(workerInfo.getTag()) || tagOrAddress.equals(workerInfo.getAddress())) { return false; } } return true; } }
DesignatedWorkerFilter的filter方法遍歷jobInfo的designatedWorkers信息,判斷workerInfo的tag或者address是否匹配
DisconnectedWorkerFilter
tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java
@Slf4j @Component public class DisconnectedWorkerFilter implements WorkerFilter { @Override public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) { boolean timeout = workerInfo.timeout(); if (timeout) { log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime()); } return timeout; } }
DisconnectedWorkerFilter的filter方法則通過WorkerInfo的timeout方法來判斷,它主要是判斷當(dāng)前時(shí)間與lastActiveTime的時(shí)間差是否大于WORKER_TIMEOUT_MS(60s)
SystemMetricsWorkerFilter
tech/powerjob/server/extension/defaultimpl/workerfilter/SystemMetricsWorkerFilter.java
@Slf4j @Component public class SystemMetricsWorkerFilter implements WorkerFilter { @Override public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) { SystemMetrics metrics = workerInfo.getSystemMetrics(); boolean filter = !metrics.available(jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); if (filter) { log.info("[Job-{}] filter worker[{}] because the {} do not meet the requirements", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getSystemMetrics()); } return filter; } }
SystemMetricsWorkerFilter的filter方法則根據(jù)workerInfo的SystemMetrics判斷可用cpu核數(shù)、內(nèi)存、磁盤空間是否大于閾值
小結(jié)
DispatchStrategy定義了HEALTH_FIRST、RANDOM兩個(gè)枚舉值;WorkerClusterQueryService的getSuitableWorkers方法先通過getWorkerInfosByAppId獲取指定appId的WorkerInfo,然后通過filterWorker進(jìn)行一次過濾,最后根據(jù)dispatchStrategy來對(duì)workers進(jìn)行排序,如果是RANDOM則通過Collections.shuffle(workers)隨機(jī)化,如果是HEALTH_FIRST則根據(jù)systemMetrics的calculateScore結(jié)果進(jìn)行排序,如果有限定maxWorkerCount則對(duì)workers進(jìn)行subList,沒有則返回排序后的workers。
以上就是PowerJob的DispatchStrategy方法工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob DispatchStrategy的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
idea新建mapper.xml文件詳細(xì)步驟如:mybatis-config
這篇文章主要介紹了idea新建xml模板設(shè)置,例如:mybatis-config,本文分步驟通過圖文并茂的形式給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07springboot應(yīng)用訪問zookeeper的流程
這篇文章主要介紹了springboot應(yīng)用訪問zookeeper的流程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01Java中字節(jié)流和字符流的區(qū)別與聯(lián)系
Java中的字節(jié)流和字符流是用于處理輸入和輸出的兩種不同的流,本文主要介紹了Java中字節(jié)流和字符流的區(qū)別與聯(lián)系,字節(jié)流以字節(jié)為單位進(jìn)行讀寫,適用于處理二進(jìn)制數(shù)據(jù),本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2024-12-12Java實(shí)現(xiàn)身份證號(hào)碼驗(yàn)證源碼示例分享
本篇文章主要介紹了Java實(shí)現(xiàn)身份證號(hào)碼驗(yàn)證源碼示例分享,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-10-10

Java 將PPT幻燈片轉(zhuǎn)為HTML文件的實(shí)現(xiàn)思路