PowerJob的TimingStrategyHandler工作流程源碼解讀
序
本文主要研究一下PowerJob的TimingStrategyHandler
TimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/TimingStrategyHandler.java
public interface TimingStrategyHandler { /** * 校驗(yàn)表達(dá)式 * * @param timeExpression 時(shí)間表達(dá)式 */ void validate(String timeExpression); /** * 計(jì)算下次觸發(fā)時(shí)間 * * @param preTriggerTime 上次觸發(fā)時(shí)間 (not null) * @param timeExpression 時(shí)間表達(dá)式 * @param startTime 開(kāi)始時(shí)間(include) * @param endTime 結(jié)束時(shí)間(include) * @return next trigger time */ Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime); /** * 支持的定時(shí)策略 * * @return TimeExpressionType */ TimeExpressionType supportType(); }
TimingStrategyHandler接口定義了validate、calculateNextTriggerTime、supportType方法
TimeExpressionType
tech/powerjob/common/enums/TimeExpressionType.java
@Getter @AllArgsConstructor @ToString public enum TimeExpressionType { API(1), CRON(2), FIXED_RATE(3), FIXED_DELAY(4), WORKFLOW(5), DAILY_TIME_INTERVAL(11); private final int v; public static final List<Integer> FREQUENT_TYPES = Collections.unmodifiableList(Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v)); /** * 首次計(jì)算觸發(fā)時(shí)間時(shí)必須計(jì)算出一個(gè)有效值 */ public static final List<Integer> INSPECT_TYPES = Collections.unmodifiableList(Lists.newArrayList(CRON.v, DAILY_TIME_INTERVAL.v)); public static TimeExpressionType of(int v) { for (TimeExpressionType type : values()) { if (type.v == v) { return type; } } throw new IllegalArgumentException("unknown TimeExpressionType of " + v); } }
TimeExpressionType枚舉定義了API、CRON、FIXED_RATE、FIXED_DELAY、WORKFLOW、DAILY_TIME_INTERVAL幾種類型
AbstractTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/AbstractTimingStrategyHandler.java
public abstract class AbstractTimingStrategyHandler implements TimingStrategyHandler { @Override public void validate(String timeExpression) { // do nothing } @Override public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) { // do nothing return null; } }
AbstractTimingStrategyHandler實(shí)現(xiàn)了TimingStrategyHandler的validate、calculateNextTriggerTime方法
ApiTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/ApiTimingStrategyHandler.java
@Component public class ApiTimingStrategyHandler extends AbstractTimingStrategyHandler { @Override public TimeExpressionType supportType() { return TimeExpressionType.API; } }
ApiTimingStrategyHandler繼承了AbstractTimingStrategyHandler,其supportType返回的是TimeExpressionType.API
FixedRateTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/FixedRateTimingStrategyHandler.java
@Component public class FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler { @Override public void validate(String timeExpression) { long delay; try { delay = Long.parseLong(timeExpression); } catch (Exception e) { throw new PowerJobException("invalid timeExpression!"); } // 默認(rèn) 120s ,超過(guò)這個(gè)限制應(yīng)該使用考慮使用其他類型以減少資源占用 int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000")); if (delay > maxInterval) { throw new PowerJobException("the rate must be less than " + maxInterval + "ms"); } if (delay <= 0) { throw new PowerJobException("the rate must be greater than 0 ms"); } } @Override public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) { long r = startTime != null && startTime > preTriggerTime ? startTime : preTriggerTime + Long.parseLong(timeExpression); return endTime != null && endTime < r ? null : r; } @Override public TimeExpressionType supportType() { return TimeExpressionType.FIXED_RATE; } }
FixedRateTimingStrategyHandler繼承了AbstractTimingStrategyHandler,其validate方法校驗(yàn)interval參數(shù),要求大于0而且不能大于120s;calculateNextTriggerTime方法先根據(jù)startTime、preTriggerTime、timeExpression計(jì)算再與endTime做比較;其supportType返回的是TimeExpressionType.FIXED_RATE
FixedDelayTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/FixedDelayTimingStrategyHandler.java
@Component public class FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler { @Override public void validate(String timeExpression) { long delay; try { delay = Long.parseLong(timeExpression); } catch (Exception e) { throw new PowerJobException("invalid timeExpression!"); } // 默認(rèn) 120s ,超過(guò)這個(gè)限制應(yīng)該考慮使用其他類型以減少資源占用 int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000")); if (delay > maxInterval) { throw new PowerJobException("the delay must be less than " + maxInterval + "ms"); } if (delay <= 0) { throw new PowerJobException("the delay must be greater than 0 ms"); } } @Override public TimeExpressionType supportType() { return TimeExpressionType.FIXED_DELAY; } }
FixedDelayTimingStrategyHandler繼承了AbstractTimingStrategyHandler,其validate要求delay大于0且小于等于120s;其supportType返回的是TimeExpressionType.FIXED_DELAY
WorkflowTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/WorkflowTimingStrategyHandler.java
@Component public class WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler { @Override public TimeExpressionType supportType() { return TimeExpressionType.WORKFLOW; } }
WorkflowTimingStrategyHandler繼承了AbstractTimingStrategyHandler,其supportType返回的是TimeExpressionType.WORKFLOW
CronTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/CronTimingStrategyHandler.java
@Component public class CronTimingStrategyHandler implements TimingStrategyHandler { private final CronParser cronParser; /** * @see CronDefinitionBuilder#instanceDefinitionFor * <p> * Enhanced quartz cron,Support for specifying both a day-of-week and a day-of-month parameter. * https://github.com/PowerJob/PowerJob/issues/382 */ public CronTimingStrategyHandler() { CronDefinition cronDefinition = CronDefinitionBuilder.defineCron() .withSeconds().withValidRange(0, 59).and() .withMinutes().withValidRange(0, 59).and() .withHours().withValidRange(0, 23).and() .withDayOfMonth().withValidRange(1, 31).supportsL().supportsW().supportsLW().supportsQuestionMark().and() .withMonth().withValidRange(1, 12).and() .withDayOfWeek().withValidRange(1, 7).withMondayDoWValue(2).supportsHash().supportsL().supportsQuestionMark().and() .withYear().withValidRange(1970, 2099).withStrictRange().optional().and() .instance(); this.cronParser = new CronParser(cronDefinition); } @Override public void validate(String timeExpression) { cronParser.parse(timeExpression); } @Override public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) { Cron cron = cronParser.parse(timeExpression); ExecutionTime executionTime = ExecutionTime.forCron(cron); if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) { // 需要計(jì)算出離 startTime 最近的一次真正的觸發(fā)時(shí)間 Optional<ZonedDateTime> zonedDateTime = executionTime.lastExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault())); preTriggerTime = zonedDateTime.map(dateTime -> dateTime.toEpochSecond() * 1000).orElse(startTime); } Instant instant = Instant.ofEpochMilli(preTriggerTime); ZonedDateTime preZonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault()); Optional<ZonedDateTime> opt = executionTime.nextExecution(preZonedDateTime); if (opt.isPresent()) { long nextTriggerTime = opt.get().toEpochSecond() * 1000; if (endTime != null && endTime < nextTriggerTime) { return null; } return nextTriggerTime; } return null; } @Override public TimeExpressionType supportType() { return TimeExpressionType.CRON; } }
CronTimingStrategyHandler實(shí)現(xiàn)了TimingStrategyHandler接口,其構(gòu)造器先創(chuàng)建了CronDefinition,再根據(jù)CronDefinition創(chuàng)建CronParser;其validate方法調(diào)用了cronParser.parse(timeExpression);calculateNextTriggerTime方法先解析timeExpression,再解析為ExecutionTime,再根據(jù)startTime和preTriggerTime計(jì)算新的preTriggerTime;最后通過(guò)executionTime.nextExecution計(jì)算nextTriggerTime;其supportType返回的是TimeExpressionType.CRON
DailyTimeIntervalStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/DailyTimeIntervalStrategyHandler.java
@Component public class DailyTimeIntervalStrategyHandler implements TimingStrategyHandler { /** * 使用中國(guó)星期?。?! */ private static final Set<Integer> ALL_DAY = Sets.newHashSet(1, 2, 3, 4, 5, 6, 7); @Override public TimeExpressionType supportType() { return TimeExpressionType.DAILY_TIME_INTERVAL; } @Override @SneakyThrows public void validate(String timeExpression) { DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class); CommonUtils.requireNonNull(ep.interval, "interval can't be null or empty in DailyTimeIntervalExpress"); CommonUtils.requireNonNull(ep.startTimeOfDay, "startTimeOfDay can't be null or empty in DailyTimeIntervalExpress"); CommonUtils.requireNonNull(ep.endTimeOfDay, "endTimeOfDay can't be null or empty in DailyTimeIntervalExpress"); TimeOfDay startTime = TimeOfDay.from(ep.startTimeOfDay); TimeOfDay endTime = TimeOfDay.from(ep.endTimeOfDay); if (endTime.before(startTime)) { throw new IllegalArgumentException("endTime should after startTime!"); } if (StringUtils.isNotEmpty(ep.intervalUnit)) { TimeUnit.valueOf(ep.intervalUnit); } } @Override @SneakyThrows public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) { DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class); // 未開(kāi)始狀態(tài)下,用起點(diǎn)算調(diào)度時(shí)間 if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) { return calculateInRangeTime(startTime, ep); } // 間隔時(shí)間 TimeUnit timeUnit = Optional.ofNullable(ep.intervalUnit).map(TimeUnit::valueOf).orElse(TimeUnit.SECONDS); long interval = timeUnit.toMillis(ep.interval); Long ret = calculateInRangeTime(preTriggerTime + interval, ep); if (ret == null || ret <= Optional.ofNullable(endTime).orElse(Long.MAX_VALUE)) { return ret; } return null; } /** * 計(jì)算最近一次在范圍中的時(shí)間 * @param time 當(dāng)前時(shí)間基準(zhǔn),可能直接返回該時(shí)間作為結(jié)果 * @param ep 表達(dá)式 * @return 最近一次在范圍中的時(shí)間 */ static Long calculateInRangeTime(Long time, DailyTimeIntervalExpress ep) { Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date(time)); int year = calendar.get(Calendar.YEAR); // 月份 + 1,轉(zhuǎn)為熟悉的 1~12 月 int month = calendar.get(Calendar.MONTH) + 1; int day = calendar.get(Calendar.DAY_OF_MONTH); // 判斷是否符合"日"的執(zhí)行條件 int week = TimeUtils.calculateWeek(year, month, day); Set<Integer> targetDays = CollectionUtils.isEmpty(ep.daysOfWeek) ? ALL_DAY : ep.daysOfWeek; // 未包含情況下,將時(shí)間改寫(xiě)為符合條件日的 00:00 分,重新開(kāi)始遞歸(這部分應(yīng)該有性能更優(yōu)的寫(xiě)法,不過(guò)這個(gè)調(diào)度模式應(yīng)該很難觸發(fā)瓶頸,先簡(jiǎn)單好用的實(shí)現(xiàn)) if (!targetDays.contains(week)) { simpleSetCalendar(calendar, 0, 0, 0); Date tomorrowZero = DateUtils.addDays(calendar.getTime(), 1); return calculateInRangeTime(tomorrowZero.getTime(), ep); } // 范圍的開(kāi)始時(shí)間 TimeOfDay rangeStartTime = TimeOfDay.from(ep.startTimeOfDay); simpleSetCalendar(calendar, rangeStartTime.getHour(), rangeStartTime.getMinute(), rangeStartTime.getSecond()); long todayStartTs = calendar.getTimeInMillis(); // 未開(kāi)始 if (time < todayStartTs) { return todayStartTs; } TimeOfDay rangeEndTime = TimeOfDay.from(ep.endTimeOfDay); simpleSetCalendar(calendar, rangeEndTime.getHour(), rangeEndTime.getMinute(), rangeEndTime.getSecond()); long todayEndTs = calendar.getTimeInMillis(); // 范圍之間 if (time <= todayEndTs) { return time; } // 已結(jié)束,重新計(jì)算第二天時(shí)間 simpleSetCalendar(calendar, 0, 0, 0); return calculateInRangeTime(DateUtils.addDays(calendar.getTime(), 1).getTime(), ep); } //...... }
DailyTimeIntervalStrategyHandler實(shí)現(xiàn)了TimingStrategyHandler接口,其supportType返回的是TimeExpressionType.DAILY_TIME_INTERVAL;其validate方法先解析參數(shù)為DailyTimeIntervalExpress,然后校驗(yàn)其endTime不能比startTime??;其calculateNextTriggerTime方法主要是通過(guò)calculateInRangeTime來(lái)計(jì)算最近一次在范圍中的時(shí)間
TimingStrategyService
tech/powerjob/server/core/scheduler/TimingStrategyService.java
@Slf4j @Service public class TimingStrategyService { private static final int NEXT_N_TIMES = 5; private static final List<String> TIPS = Collections.singletonList("It is valid, but has not trigger time list!"); private final Map<TimeExpressionType, TimingStrategyHandler> strategyContainer; public TimingStrategyService(List<TimingStrategyHandler> timingStrategyHandlers) { // init strategyContainer = new EnumMap<>(TimeExpressionType.class); for (TimingStrategyHandler timingStrategyHandler : timingStrategyHandlers) { strategyContainer.put(timingStrategyHandler.supportType(), timingStrategyHandler); } } /** * 計(jì)算接下來(lái)幾次的調(diào)度時(shí)間 * * @param timeExpressionType 定時(shí)表達(dá)式類型 * @param timeExpression 表達(dá)式 * @param startTime 起始時(shí)間(include) * @param endTime 結(jié)束時(shí)間(include) * @return 調(diào)度時(shí)間列表 */ public List<String> calculateNextTriggerTimes(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) { TimingStrategyHandler timingStrategyHandler = getHandler(timeExpressionType); List<Long> triggerTimeList = new ArrayList<>(NEXT_N_TIMES); Long nextTriggerTime = System.currentTimeMillis(); do { nextTriggerTime = timingStrategyHandler.calculateNextTriggerTime(nextTriggerTime, timeExpression, startTime, endTime); if (nextTriggerTime == null) { break; } triggerTimeList.add(nextTriggerTime); } while (triggerTimeList.size() < NEXT_N_TIMES); if (triggerTimeList.isEmpty()) { return TIPS; } return triggerTimeList.stream().map(t -> DateFormatUtils.format(t, OmsConstant.TIME_PATTERN)).collect(Collectors.toList()); } /** * 計(jì)算下次的調(diào)度時(shí)間 * * @param preTriggerTime 上次觸發(fā)時(shí)間(nullable) * @param timeExpressionType 定時(shí)表達(dá)式類型 * @param timeExpression 表達(dá)式 * @param startTime 起始時(shí)間(include) * @param endTime 結(jié)束時(shí)間(include) * @return 下次的調(diào)度時(shí)間 */ public Long calculateNextTriggerTime(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) { if (preTriggerTime == null || preTriggerTime < System.currentTimeMillis()) { preTriggerTime = System.currentTimeMillis(); } return getHandler(timeExpressionType).calculateNextTriggerTime(preTriggerTime, timeExpression, startTime, endTime); } /** * 計(jì)算下次的調(diào)度時(shí)間并檢查校驗(yàn)規(guī)則 * * @param timeExpressionType 定時(shí)表達(dá)式類型 * @param timeExpression 表達(dá)式 * @param startTime 起始時(shí)間(include) * @param endTime 結(jié)束時(shí)間(include) * @return 下次的調(diào)度時(shí)間 */ public Long calculateNextTriggerTimeWithInspection( TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) { Long nextTriggerTime = calculateNextTriggerTime(null, timeExpressionType, timeExpression, startTime, endTime); if (TimeExpressionType.INSPECT_TYPES.contains(timeExpressionType.getV()) && nextTriggerTime == null) { throw new PowerJobException("time expression is out of date: " + timeExpression); } return nextTriggerTime; } public void validate(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) { if (endTime != null) { if (endTime <= System.currentTimeMillis()) { throw new PowerJobException("lifecycle is out of date!"); } if (startTime != null && startTime > endTime) { throw new PowerJobException("lifecycle is invalid! start time must earlier then end time."); } } getHandler(timeExpressionType).validate(timeExpression); } private TimingStrategyHandler getHandler(TimeExpressionType timeExpressionType) { TimingStrategyHandler timingStrategyHandler = strategyContainer.get(timeExpressionType); if (timingStrategyHandler == null) { throw new PowerJobException("No matching TimingStrategyHandler for this TimeExpressionType:" + timeExpressionType); } return timingStrategyHandler; } }
TimingStrategyService的構(gòu)造器遍歷timingStrategyHandlers,然后根據(jù)其supportType構(gòu)建Map<TimeExpressionType, TimingStrategyHandler>;其calculateNextTriggerTimes先根據(jù)timeExpressionType獲取到對(duì)應(yīng)的TimingStrategyHandler,再循環(huán)調(diào)用TimingStrategyHandler.calculateNextTriggerTime方法來(lái)計(jì)算nextTriggerTime,最后返回最近5次的調(diào)度時(shí)間;calculateNextTriggerTimeWithInspection方法會(huì)計(jì)算nextTriggerTime并針對(duì)CRON及DAILY_TIME_INTERVAL類型的要求其不能為null;validate方法調(diào)用的是對(duì)應(yīng)TimingStrategyHandler的validate方法
小結(jié)
TimingStrategyHandler接口定義了validate、calculateNextTriggerTime、supportType方法;其支持的TimeExpressionType枚舉定義了API、CRON、FIXED_RATE、FIXED_DELAY、WORKFLOW、DAILY_TIME_INTERVAL幾種類型,分別對(duì)應(yīng)了ApiTimingStrategyHandler、CronTimingStrategyHandler、FixedRateTimingStrategyHandler、FixedDelayTimingStrategyHandler、WorkflowTimingStrategyHandler、DailyTimeIntervalStrategyHandler;TimingStrategyService則聚合了這些TimingStrategyHandler,對(duì)外提供了calculateNextTriggerTimes、calculateNextTriggerTime、calculateNextTriggerTimeWithInspection、validate方法。
以上就是PowerJob的TimingStrategyHandler工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob TimingStrategyHandler的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實(shí)現(xiàn)公用實(shí)體類轉(zhuǎn)Tree結(jié)構(gòu)
這篇文章主要為大家介紹了一個(gè)Java工具類,可以實(shí)現(xiàn)Java公用實(shí)體類轉(zhuǎn)Tree結(jié)構(gòu),文中的示例代碼簡(jiǎn)潔易懂,感興趣的小伙伴可以參考一下2024-10-10SpringCloud集成Eureka并實(shí)現(xiàn)負(fù)載均衡的過(guò)程詳解
這篇文章主要給大家詳細(xì)介紹了SpringCloud集成Eureka并實(shí)現(xiàn)負(fù)載均衡的過(guò)程,文章通過(guò)代碼示例和圖文講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的參考價(jià)值,需要的朋友可以參考下2023-11-11Java 常用類解析:java異常機(jī)制,異常棧,異常處理方式,異常鏈,異常丟失詳解
這篇文章主要介紹了Java 常用類解析:java異常機(jī)制,異常棧,異常處理方式,異常鏈,異常丟失詳解的相關(guān)資料,需要的朋友可以參考下2017-03-03SpringBoot Security密碼加鹽實(shí)例
這篇文章主要為打擊介紹了SpringBoot Security密碼加鹽實(shí)例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02Java模擬棧和隊(duì)列數(shù)據(jù)結(jié)構(gòu)的基本示例講解
這篇文章主要介紹了Java模擬棧和隊(duì)列數(shù)據(jù)結(jié)構(gòu)的基本示例,棧的后進(jìn)先出和隊(duì)列的先進(jìn)先出是數(shù)據(jù)結(jié)構(gòu)中最基礎(chǔ)的知識(shí),本文則又對(duì)Java實(shí)現(xiàn)棧和隊(duì)列結(jié)構(gòu)的方法進(jìn)行了細(xì)分,需要的朋友可以參考下2016-04-04java僅用30行代碼就實(shí)現(xiàn)了視頻轉(zhuǎn)音頻的批量轉(zhuǎn)換
這篇文章主要介紹了java僅用30行代碼就實(shí)現(xiàn)了視頻轉(zhuǎn)音頻的批量轉(zhuǎn)換,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04