A Walkthrough of PowerJob's Distributed Task Scheduling Source Code
Architecture diagram
[Figure: PowerJob architecture diagram, provided by the official project]
This article looks at how PowerJob schedules tasks on the server side.
CoreScheduleTaskManager
tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java
@Service
@Slf4j
@RequiredArgsConstructor
public class CoreScheduleTaskManager implements InitializingBean, DisposableBean {

    private final PowerScheduleService powerScheduleService;

    private final InstanceStatusCheckService instanceStatusCheckService;

    private final List<Thread> coreThreadContainer = new ArrayList<>();

    @SuppressWarnings("AlibabaAvoidManuallyCreateThread")
    @Override
    public void afterPropertiesSet() {
        // Scheduled dispatching
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)), "Thread-ScheduleCronJob"));
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleDailyTimeIntervalJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)), "Thread-ScheduleDailyTimeIntervalJob"));
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow"));
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob"));
        // Data cleanup
        coreThreadContainer.add(new Thread(new LoopRunnable("CleanWorkerData", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::cleanData), "Thread-CleanWorkerData"));
        // Status checks
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckRunningInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkRunningInstance), "Thread-CheckRunningInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingDispatchInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingDispatchInstance), "Thread-CheckWaitingDispatchInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingWorkerReceiveInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingWorkerReceiveInstance), "Thread-CheckWaitingWorkerReceiveInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWorkflowInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWorkflowInstance), "Thread-CheckWorkflowInstance"));
        coreThreadContainer.forEach(Thread::start);
    }

    //......
}
In afterPropertiesSet, CoreScheduleTaskManager starts a series of threads, each wrapping a LoopRunnable. They periodically invoke, respectively: powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON), powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL), powerScheduleService::scheduleCronWorkflow, powerScheduleService::scheduleFrequentJob, powerScheduleService::cleanData, instanceStatusCheckService::checkRunningInstance, instanceStatusCheckService::checkWaitingDispatchInstance, instanceStatusCheckService::checkWaitingWorkerReceiveInstance, and instanceStatusCheckService::checkWorkflowInstance.
LoopRunnable
@RequiredArgsConstructor
private static class LoopRunnable implements Runnable {

    private final String taskName;

    private final Long runningInterval;

    private final Runnable innerRunnable;

    @SuppressWarnings("BusyWait")
    @Override
    public void run() {
        log.info("start task : {}.", taskName);
        while (true) {
            try {
                innerRunnable.run();
                Thread.sleep(runningInterval);
            } catch (InterruptedException e) {
                log.warn("[{}] task has been interrupted!", taskName, e);
                break;
            } catch (Exception e) {
                log.error("[{}] task failed!", taskName, e);
            }
        }
    }
}
LoopRunnable's constructor takes three parameters: taskName, runningInterval, and innerRunnable. Its run method executes innerRunnable.run() inside a while (true) loop and then sleeps for runningInterval. An InterruptedException breaks out of the loop; any other exception is logged at error level and the loop continues.
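The loop-and-sleep pattern can be tried out in isolation. The following is a minimal sketch, not PowerJob's code: the class name LoopRunnableSketch, the runDemo helper, and the 10 ms interval are assumptions made for illustration, and logging is replaced by System.err. It also makes one detail of the pattern visible: because the sleep sits inside the try block, a round that throws skips its sleep and the next round starts immediately.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch of the loop pattern; not the PowerJob class itself.
class LoopRunnableSketch {

    static final class LoopRunnable implements Runnable {
        private final String taskName;
        private final long runningIntervalMs;
        private final Runnable innerRunnable;

        LoopRunnable(String taskName, long runningIntervalMs, Runnable innerRunnable) {
            this.taskName = taskName;
            this.runningIntervalMs = runningIntervalMs;
            this.innerRunnable = innerRunnable;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    innerRunnable.run();
                    Thread.sleep(runningIntervalMs);
                } catch (InterruptedException e) {
                    // Interruption is the only way out of the loop.
                    break;
                } catch (Exception e) {
                    // Other failures are reported and the loop goes on (without sleeping).
                    System.err.println("[" + taskName + "] task failed: " + e);
                }
            }
        }
    }

    /**
     * Runs a task that throws on every second call, interrupts the thread after
     * about 200 ms, and returns the number of invocations (-1 if the thread did not stop).
     */
    static int runDemo() {
        AtomicInteger calls = new AtomicInteger();
        Thread thread = new Thread(new LoopRunnable("demo", 10L, () -> {
            if (calls.incrementAndGet() % 2 == 0) {
                throw new IllegalStateException("simulated failure");
            }
        }), "Thread-demo");
        thread.start();
        try {
            Thread.sleep(200);
            thread.interrupt();
            thread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return thread.isAlive() ? -1 : calls.get();
    }

    public static void main(String[] args) {
        System.out.println("invocations before interrupt: " + runDemo());
    }
}
```

Interrupting the thread is therefore the natural shutdown hook for such a loop, which is presumably what the elided part of CoreScheduleTaskManager relies on when the bean is destroyed.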
PowerScheduleService
PowerScheduleService provides the methods scheduleNormalJob, scheduleCronWorkflow, scheduleFrequentJob, and cleanData.
scheduleNormalJob
tech/powerjob/server/core/scheduler/PowerScheduleService.java
public void scheduleNormalJob(TimeExpressionType timeExpressionType) {
    long start = System.currentTimeMillis();
    // Schedule CRON-expression jobs
    try {
        final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[NormalScheduler] current server has no app's job to schedule.");
            return;
        }
        scheduleNormalJob0(timeExpressionType, allAppIds);
    } catch (Exception e) {
        log.error("[NormalScheduler] schedule cron job failed.", e);
    }
    long cost = System.currentTimeMillis() - start;
    log.info("[NormalScheduler] {} job schedule use {} ms.", timeExpressionType, cost);
    if (cost > SCHEDULE_RATE) {
        log.warn("[NormalScheduler] The database query is using too much time({}ms), please check if the database load is too high!", cost);
    }
}
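scheduleNormalJob looks up the list of appIds that the current server is responsible for and then delegates to scheduleNormalJob0. It also measures the elapsed time and logs a warning if one round takes longer than SCHEDULE_RATE.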
scheduleNormalJob0
private void scheduleNormalJob0(TimeExpressionType timeExpressionType, List<Long> appIds) {
    long nowTime = System.currentTimeMillis();
    long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
    Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
        try {
            // Query conditions: job enabled + scheduled by a CRON expression + specified appIds + due to run soon
            List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), timeExpressionType.getV(), timeThreshold);
            if (CollectionUtils.isEmpty(jobInfos)) {
                return;
            }
            // 1. Batch-write to the log table
            Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
            log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos);
            jobInfos.forEach(jobInfo -> {
                Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId();
                jobId2InstanceId.put(jobInfo.getId(), instanceId);
            });
            instanceInfoRepository.flush();
            // 2. Push into the time wheel to wait for dispatch
            jobInfos.forEach(jobInfoDO -> {
                Long instanceId = jobId2InstanceId.get(jobInfoDO.getId());
                long targetTriggerTime = jobInfoDO.getNextTriggerTime();
                long delay = 0;
                if (targetTriggerTime < nowTime) {
                    log.warn("[Job-{}] schedule delay, expect: {}, current: {}", jobInfoDO.getId(), targetTriggerTime, System.currentTimeMillis());
                } else {
                    delay = targetTriggerTime - nowTime;
                }
                InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId, Optional.empty(), Optional.empty()));
            });
            // 3. Compute the next trigger time (repeat executions within 5S are ignored, i.e. the minimum interval between consecutive executions in CRON mode is SCHEDULE_RATE ms)
            jobInfos.forEach(jobInfoDO -> {
                try {
                    refreshJob(timeExpressionType, jobInfoDO);
                } catch (Exception e) {
                    log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
                }
            });
            jobInfoRepository.flush();
        } catch (Exception e) {
            log.error("[NormalScheduler] schedule {} job failed.", timeExpressionType.name(), e);
        }
    });
}
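scheduleNormalJob0 schedules jobs of type CRON and DAILY_TIME_INTERVAL. Working through the appIds in batches of MAX_APP_NUM, it uses jobInfoRepository to find jobs that belong to the given appIds, are enabled, match the given TimeExpressionType, and have a NextTriggerTime less than or equal to nowTime + 2 * SCHEDULE_RATE. For each such job it calls instanceService.create to create a job instance, then hands the instance to InstanceTimeWheelService.schedule with a delay of targetTriggerTime - nowTime (or 0 if the trigger time has already passed), and finally computes and updates each job's nextTriggerTime.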
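The arithmetic in this method is easy to check by hand. Below is a minimal sketch under stated assumptions: the class TriggerWindowSketch and its methods are hypothetical names, the 15,000 ms rate is an illustrative value rather than one taken from the excerpt, and the small partition helper merely stands in for Guava's Lists.partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the selection window, delay calculation and batching.
class TriggerWindowSketch {

    // Assumed value for illustration only; the real constant is PowerScheduleService.SCHEDULE_RATE.
    static final long SCHEDULE_RATE_MS = 15_000L;

    /** A job is picked up when its next trigger time falls within two scheduling rounds from now. */
    static boolean isDue(long nextTriggerTime, long nowTime) {
        return nextTriggerTime <= nowTime + 2 * SCHEDULE_RATE_MS;
    }

    /** Overdue jobs get delay 0; all others wait until their trigger time. */
    static long delayFor(long nextTriggerTime, long nowTime) {
        return nextTriggerTime < nowTime ? 0L : nextTriggerTime - nowTime;
    }

    /** Splits a list into consecutive batches of at most the given size. */
    static <T> List<List<T>> partition(List<T> items, int size) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            batches.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("due in 10 s, selected: " + isDue(now + 10_000, now));
        System.out.println("due in 60 s, selected: " + isDue(now + 60_000, now));
        System.out.println("delay for overdue job: " + delayFor(now - 500, now));
        System.out.println("batches: " + partition(Arrays.asList(1L, 2L, 3L, 4L, 5L), 2));
    }
}
```

Looking two rounds ahead means a job whose trigger time falls between two polling rounds is already sitting in the time wheel when its moment arrives, rather than being discovered late by the next database query.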
scheduleCronWorkflow
public void scheduleCronWorkflow() {
    long start = System.currentTimeMillis();
    // Schedule CRON-expression workflows
    try {
        final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[CronWorkflowSchedule] current server has no app's workflow to schedule.");
            return;
        }
        scheduleWorkflowCore(allAppIds);
    } catch (Exception e) {
        log.error("[CronWorkflowSchedule] schedule cron workflow failed.", e);
    }
    long cost = System.currentTimeMillis() - start;
    log.info("[CronWorkflowSchedule] cron workflow schedule use {} ms.", cost);
    if (cost > SCHEDULE_RATE) {
        log.warn("[CronWorkflowSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);
    }
}
scheduleCronWorkflow schedules workflows driven by CRON expressions; internally it delegates to scheduleWorkflowCore.
scheduleFrequentJob
public void scheduleFrequentJob() {
    long start = System.currentTimeMillis();
    // Schedule FIX_RATE/FIX_DELAY jobs
    try {
        final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[FrequentJobSchedule] current server has no app's job to schedule.");
            return;
        }
        scheduleFrequentJobCore(allAppIds);
    } catch (Exception e) {
        log.error("[FrequentJobSchedule] schedule frequent job failed.", e);
    }
    long cost = System.currentTimeMillis() - start;
    log.info("[FrequentJobSchedule] frequent job schedule use {} ms.", cost);
    if (cost > SCHEDULE_RATE) {
        log.warn("[FrequentJobSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);
    }
}
scheduleFrequentJob schedules FIX_RATE/FIX_DELAY jobs; internally it delegates to scheduleFrequentJobCore.
scheduleFrequentJobCore
private void scheduleFrequentJobCore(List<Long> appIds) {
    Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
        try {
            // Query all second-level jobs (IDs only)
            List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.FREQUENT_TYPES);
            if (CollectionUtils.isEmpty(jobIds)) {
                return;
            }
            // Check whether the log table already contains related instances
            List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.GENERALIZED_RUNNING_STATUS);
            Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);
            List<Long> notRunningJobIds = Lists.newLinkedList();
            jobIds.forEach(jobId -> {
                if (!runningJobIdSet.contains(jobId)) {
                    notRunningJobIds.add(jobId);
                }
            });
            if (CollectionUtils.isEmpty(notRunningJobIds)) {
                return;
            }
            notRunningJobIds.forEach(jobId -> {
                Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
                jobInfoOpt.ifPresent(jobInfoDO -> {
                    LifeCycle lifeCycle = LifeCycle.parse(jobInfoDO.getLifecycle());
                    // Lifecycle has already ended
                    if (lifeCycle.getEnd() != null && lifeCycle.getEnd() < System.currentTimeMillis()) {
                        jobInfoDO.setStatus(SwitchableStatus.DISABLE.getV());
                        jobInfoDO.setGmtModified(new Date());
                        jobInfoRepository.saveAndFlush(jobInfoDO);
                        log.info("[FrequentScheduler] disable frequent job,id:{}.", jobInfoDO.getId());
                    } else if (lifeCycle.getStart() == null || lifeCycle.getStart() < System.currentTimeMillis() + SCHEDULE_RATE * 2) {
                        log.info("[FrequentScheduler] schedule frequent job,id:{}.", jobInfoDO.getId());
                        jobService.runJob(jobInfoDO.getAppId(), jobId, null, Optional.ofNullable(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis());
                    }
                });
            });
        } catch (Exception e) {
            log.error("[FrequentScheduler] schedule frequent job failed.", e);
        }
    });
}
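scheduleFrequentJobCore schedules second-level (frequent) jobs. It first collects the IDs of the enabled frequent jobs, then filters out those that already have an instance in a running status. Each remaining job is checked against its lifecycle: if the lifecycle has ended, the job is disabled; if there is no start time or the start time falls within the next 2 * SCHEDULE_RATE, jobService.runJob is called; otherwise nothing happens in this round.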
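The filtering and the lifecycle branch can be restated as pure functions, which makes the three outcomes easier to see. This is a minimal sketch with hypothetical names (FrequentJobDecisionSketch, Action, decide, runDelay); it mirrors only the conditions visible in the excerpt and does not touch any repository or service.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical restatement of the frequent-job decision logic as pure functions.
class FrequentJobDecisionSketch {

    enum Action { DISABLE, RUN, SKIP }

    /** Keeps only the job IDs that have no instance in a running status. */
    static List<Long> notRunning(List<Long> jobIds, Set<Long> runningJobIds) {
        List<Long> result = new ArrayList<>();
        for (Long jobId : jobIds) {
            if (!runningJobIds.contains(jobId)) {
                result.add(jobId);
            }
        }
        return result;
    }

    /** Lifecycle bounds may be null, meaning "no bound". */
    static Action decide(Long lifecycleStart, Long lifecycleEnd, long now, long scheduleRateMs) {
        if (lifecycleEnd != null && lifecycleEnd < now) {
            return Action.DISABLE; // lifecycle is over, the job gets switched off
        }
        if (lifecycleStart == null || lifecycleStart < now + 2 * scheduleRateMs) {
            return Action.RUN;     // no start bound, or the start is close enough
        }
        return Action.SKIP;        // starts too far in the future, try again next round
    }

    /** Same expression as in the excerpt: a missing start is treated as 0, so the result can be negative. */
    static long runDelay(Long lifecycleStart, long now) {
        return (lifecycleStart == null ? 0L : lifecycleStart) - now;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Set<Long> running = new HashSet<>(Arrays.asList(2L));
        System.out.println(notRunning(Arrays.asList(1L, 2L, 3L), running));
        System.out.println(decide(null, now - 1, now, 15_000L));
        System.out.println(decide(null, null, now, 15_000L));
        System.out.println(decide(now + 60_000L, null, now, 15_000L));
    }
}
```

Note that the delay passed to runJob is negative whenever the start time is absent or already in the past; how runJob treats such a value is not shown in the excerpt.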
cleanData
public void cleanData() {
    try {
        final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (allAppIds.isEmpty()) {
            return;
        }
        WorkerClusterManagerService.clean(allAppIds);
    } catch (Exception e) {
        log.error("[CleanData] clean data failed.", e);
    }
}
cleanData calls WorkerClusterManagerService.clean to maintain the cache of appIds that the current server is responsible for.
InstanceStatusCheckService
InstanceStatusCheckService provides the methods checkRunningInstance, checkWaitingDispatchInstance, checkWaitingWorkerReceiveInstance, and checkWorkflowInstance.
Summary
In afterPropertiesSet, PowerJob's CoreScheduleTaskManager starts a series of threads, each wrapping a LoopRunnable. Among them, scheduleNormalJob schedules jobs of type CRON and DAILY_TIME_INTERVAL, scheduleCronWorkflow schedules CRON-expression workflows, and scheduleFrequentJob schedules FIX_RATE/FIX_DELAY jobs.