
PowerJob's TimingStrategyHandler: A Source Code Walkthrough

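 Updated: 2024-01-12 09:29:38   Author: codecraft
This article walks through the source code of PowerJob's TimingStrategyHandler and the classes around it, for readers who want to understand how PowerJob validates time expressions and computes trigger times.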

This article examines PowerJob's TimingStrategyHandler.

TimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/TimingStrategyHandler.java

public interface TimingStrategyHandler {
    /**
     * Validates the expression
     *
     * @param timeExpression the time expression
     */
    void validate(String timeExpression);
    /**
     * Calculates the next trigger time
     *
     * @param preTriggerTime previous trigger time (not null)
     * @param timeExpression the time expression
     * @param startTime      start time (inclusive)
     * @param endTime        end time (inclusive)
     * @return next trigger time
     */
    Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime);
    /**
     * The timing strategy this handler supports
     *
     * @return TimeExpressionType
     */
    TimeExpressionType supportType();
}
The TimingStrategyHandler interface defines three methods: validate, calculateNextTriggerTime, and supportType.

TimeExpressionType

tech/powerjob/common/enums/TimeExpressionType.java

@Getter
@AllArgsConstructor
@ToString
public enum TimeExpressionType {
    API(1),
    CRON(2),
    FIXED_RATE(3),
    FIXED_DELAY(4),
    WORKFLOW(5),
    DAILY_TIME_INTERVAL(11);
    private final int v;
    public static final List<Integer> FREQUENT_TYPES = Collections.unmodifiableList(Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v));
    /**
     * Types whose first trigger-time calculation must yield a valid value
     */
    public static final List<Integer> INSPECT_TYPES =  Collections.unmodifiableList(Lists.newArrayList(CRON.v, DAILY_TIME_INTERVAL.v));
    public static TimeExpressionType of(int v) {
        for (TimeExpressionType type : values()) {
            if (type.v == v) {
                return type;
            }
        }
        throw new IllegalArgumentException("unknown TimeExpressionType of " + v);
    }
}
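The TimeExpressionType enum defines six types: API, CRON, FIXED_RATE, FIXED_DELAY, WORKFLOW, and DAILY_TIME_INTERVAL. FREQUENT_TYPES groups FIXED_RATE and FIXED_DELAY, while INSPECT_TYPES groups CRON and DAILY_TIME_INTERVAL.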
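To make the lookup behaviour concrete, here is a minimal standalone sketch of the same enum. It is not the PowerJob class: the name TimeExpressionTypeSketch is made up for illustration, and Lombok and Guava are replaced with plain Java so the snippet compiles on its own. The constants, codes, and the of(int) logic follow the listing above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for TimeExpressionType (same constants and codes,
// no Lombok or Guava).
enum TimeExpressionTypeSketch {
    API(1), CRON(2), FIXED_RATE(3), FIXED_DELAY(4), WORKFLOW(5), DAILY_TIME_INTERVAL(11);

    private final int v;

    TimeExpressionTypeSketch(int v) {
        this.v = v;
    }

    int getV() {
        return v;
    }

    // Codes of the high-frequency types, mirroring FREQUENT_TYPES.
    static final List<Integer> FREQUENT_TYPES =
            Collections.unmodifiableList(Arrays.asList(FIXED_RATE.v, FIXED_DELAY.v));

    // Linear scan over the constants; unknown codes are rejected.
    static TimeExpressionTypeSketch of(int v) {
        for (TimeExpressionTypeSketch type : values()) {
            if (type.v == v) {
                return type;
            }
        }
        throw new IllegalArgumentException("unknown TimeExpressionType of " + v);
    }
}
```

Note that the codes are not contiguous (5 is followed by 11), so a code such as 6 makes of(int) throw IllegalArgumentException rather than return null.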

AbstractTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/AbstractTimingStrategyHandler.java

public abstract class AbstractTimingStrategyHandler implements TimingStrategyHandler {
    @Override
    public void validate(String timeExpression) {
        // do nothing
    }
    @Override
    public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
        // do nothing
        return null;
    }
}
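AbstractTimingStrategyHandler provides default implementations of validate and calculateNextTriggerTime: validate does nothing and calculateNextTriggerTime returns null. Subclasses therefore only have to implement supportType.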

ApiTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/ApiTimingStrategyHandler.java

@Component
public class ApiTimingStrategyHandler extends AbstractTimingStrategyHandler {
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.API;
    }
}
ApiTimingStrategyHandler extends AbstractTimingStrategyHandler and only implements supportType, which returns TimeExpressionType.API.

FixedRateTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/FixedRateTimingStrategyHandler.java

@Component
public class FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler {
    @Override
    public void validate(String timeExpression) {
        long delay;
        try {
            delay = Long.parseLong(timeExpression);
        } catch (Exception e) {
            throw new PowerJobException("invalid timeExpression!");
        }
        // Defaults to 120s; beyond this limit, consider another type to reduce resource usage
        int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));
        if (delay > maxInterval) {
            throw new PowerJobException("the rate must be less than " + maxInterval + "ms");
        }
        if (delay <= 0) {
            throw new PowerJobException("the rate must be greater than 0 ms");
        }
    }
    @Override
    public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
        long r = startTime != null && startTime > preTriggerTime
                ? startTime : preTriggerTime + Long.parseLong(timeExpression);
        return endTime != null && endTime < r ? null : r;
    }
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.FIXED_RATE;
    }
}
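FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler. Its validate method parses the expression as an interval in milliseconds and requires it to be greater than 0 and no greater than the configured maximum (120000 ms, i.e. 120s, by default). calculateNextTriggerTime returns startTime if startTime is later than preTriggerTime, otherwise preTriggerTime plus the interval; if that result is later than endTime it returns null. supportType returns TimeExpressionType.FIXED_RATE.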
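The arithmetic in calculateNextTriggerTime is easier to follow with concrete numbers. The sketch below copies the two expressions from the listing above into a standalone static method; the class name FixedRateSketch and the method name next are made up for illustration.

```java
// Hypothetical standalone copy of the fixed-rate calculation shown above.
final class FixedRateSketch {

    static Long next(long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
        // A start time that is still ahead of the previous trigger wins;
        // otherwise advance the previous trigger by one interval.
        long candidate = (startTime != null && startTime > preTriggerTime)
                ? startTime
                : preTriggerTime + Long.parseLong(timeExpression);
        // endTime is inclusive: only a candidate strictly after it is dropped.
        return (endTime != null && endTime < candidate) ? null : candidate;
    }
}
```

With preTriggerTime = 10000 and an interval of "5000", the result is 15000 when no bounds are given, 20000 when startTime is 20000, and null when endTime is 12000. An endTime of exactly 15000 still yields 15000, because the comparison is strict.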

FixedDelayTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/FixedDelayTimingStrategyHandler.java

@Component
public class FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler {

    @Override
    public void validate(String timeExpression) {
        long delay;
        try {
            delay = Long.parseLong(timeExpression);
        } catch (Exception e) {
            throw new PowerJobException("invalid timeExpression!");
        }
        // Defaults to 120s; beyond this limit, consider another type to reduce resource usage
        int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));
        if (delay > maxInterval) {
            throw new PowerJobException("the delay must be less than " + maxInterval + "ms");
        }
        if (delay <= 0) {
            throw new PowerJobException("the delay must be greater than 0 ms");
        }
    }

    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.FIXED_DELAY;
    }
}
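FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler. Its validate method requires the delay to be greater than 0 and no greater than the configured maximum (120000 ms by default). It does not override calculateNextTriggerTime, so the inherited implementation, which returns null, applies. supportType returns TimeExpressionType.FIXED_DELAY.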
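The validation rule shared by the fixed-rate and fixed-delay handlers can be tried out in isolation. The following is a sketch under assumptions: IntervalValidationSketch, MAX_INTERVAL_KEY, and its property name are hypothetical (the real key is whatever PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL holds), and IllegalArgumentException stands in for PowerJobException.

```java
// Hypothetical standalone version of the interval check shown above.
final class IntervalValidationSketch {

    // Made-up property name, used here only so the sketch is self-contained.
    static final String MAX_INTERVAL_KEY = "sketch.frequency.job.max.interval";

    static void validate(String timeExpression) {
        long delay;
        try {
            delay = Long.parseLong(timeExpression);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid timeExpression!");
        }
        // Default upper bound is 120000 ms; the bound itself is allowed.
        int maxInterval = Integer.parseInt(System.getProperty(MAX_INTERVAL_KEY, "120000"));
        if (delay > maxInterval) {
            throw new IllegalArgumentException("the delay must be less than " + maxInterval + "ms");
        }
        if (delay <= 0) {
            throw new IllegalArgumentException("the delay must be greater than 0 ms");
        }
    }

    // Convenience wrapper that turns the exception into a boolean.
    static boolean isValid(String timeExpression) {
        try {
            validate(timeExpression);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

One detail worth noticing: although the error message says "less than", a value equal to the maximum passes, since the check is delay > maxInterval.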

WorkflowTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/WorkflowTimingStrategyHandler.java

@Component
public class WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler {
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.WORKFLOW;
    }
}
WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler and only implements supportType, which returns TimeExpressionType.WORKFLOW.

CronTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/CronTimingStrategyHandler.java

@Component
public class CronTimingStrategyHandler implements TimingStrategyHandler {
    private final CronParser cronParser;
    /**
     * @see CronDefinitionBuilder#instanceDefinitionFor
     * <p>
     * Enhanced quartz cron,Support for specifying both a day-of-week and a day-of-month parameter.
     * https://github.com/PowerJob/PowerJob/issues/382
     */
    public CronTimingStrategyHandler() {
        CronDefinition cronDefinition = CronDefinitionBuilder.defineCron()
                .withSeconds().withValidRange(0, 59).and()
                .withMinutes().withValidRange(0, 59).and()
                .withHours().withValidRange(0, 23).and()
                .withDayOfMonth().withValidRange(1, 31).supportsL().supportsW().supportsLW().supportsQuestionMark().and()
                .withMonth().withValidRange(1, 12).and()
                .withDayOfWeek().withValidRange(1, 7).withMondayDoWValue(2).supportsHash().supportsL().supportsQuestionMark().and()
                .withYear().withValidRange(1970, 2099).withStrictRange().optional().and()
                .instance();
        this.cronParser = new CronParser(cronDefinition);
    }
    @Override
    public void validate(String timeExpression) {
        cronParser.parse(timeExpression);
    }
    @Override
    public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
        Cron cron = cronParser.parse(timeExpression);
        ExecutionTime executionTime = ExecutionTime.forCron(cron);
        if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {
            // Find the actual trigger time closest to startTime
            Optional<ZonedDateTime> zonedDateTime = executionTime.lastExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()));
            preTriggerTime = zonedDateTime.map(dateTime -> dateTime.toEpochSecond() * 1000).orElse(startTime);
        }
        Instant instant = Instant.ofEpochMilli(preTriggerTime);
        ZonedDateTime preZonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
        Optional<ZonedDateTime> opt = executionTime.nextExecution(preZonedDateTime);
        if (opt.isPresent()) {
            long nextTriggerTime = opt.get().toEpochSecond() * 1000;
            if (endTime != null && endTime < nextTriggerTime) {
                return null;
            }
            return nextTriggerTime;
        }
        return null;
    }
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.CRON;
    }
}
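CronTimingStrategyHandler implements the TimingStrategyHandler interface directly. Its constructor builds a CronDefinition and creates a CronParser from it. validate simply calls cronParser.parse(timeExpression). calculateNextTriggerTime parses timeExpression into a Cron and derives an ExecutionTime from it. If startTime is still in the future and preTriggerTime is earlier than startTime, it replaces preTriggerTime with the last execution before startTime, or with startTime itself if there is none. It then calls executionTime.nextExecution to obtain nextTriggerTime, returning null if there is no next execution or if the result is later than endTime. supportType returns TimeExpressionType.CRON.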
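The handler moves between epoch milliseconds and ZonedDateTime in both directions, and converts back with toEpochSecond() * 1000. The sketch below isolates those two conversions using only java.time; the class and method names are made up, and the zone is passed in explicitly instead of using ZoneId.systemDefault() so the result is reproducible.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper isolating the time conversions used in the cron handler.
final class EpochConversionSketch {

    // epoch millis -> ZonedDateTime, as done before calling nextExecution
    static ZonedDateTime toZoned(long epochMillis, ZoneId zone) {
        return ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    // ZonedDateTime -> epoch millis aligned to a whole second
    static long toSecondAlignedMillis(ZonedDateTime dateTime) {
        return dateTime.toEpochSecond() * 1000;
    }
}
```

Converting back this way discards any sub-second part, so 1700000000123 round-trips to 1700000000000. Since the cron definition above has one-second resolution, this should lose nothing in practice for times produced by nextExecution.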

DailyTimeIntervalStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/DailyTimeIntervalStrategyHandler.java

@Component
public class DailyTimeIntervalStrategyHandler implements TimingStrategyHandler {
    /**
     * Uses Chinese weekday numbering!
     */
    private static final Set<Integer> ALL_DAY = Sets.newHashSet(1, 2, 3, 4, 5, 6, 7);
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.DAILY_TIME_INTERVAL;
    }
    @Override
    @SneakyThrows
    public void validate(String timeExpression) {
        DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class);
        CommonUtils.requireNonNull(ep.interval, "interval can't be null or empty in DailyTimeIntervalExpress");
        CommonUtils.requireNonNull(ep.startTimeOfDay, "startTimeOfDay can't be null or empty in DailyTimeIntervalExpress");
        CommonUtils.requireNonNull(ep.endTimeOfDay, "endTimeOfDay can't be null or empty in DailyTimeIntervalExpress");
        TimeOfDay startTime = TimeOfDay.from(ep.startTimeOfDay);
        TimeOfDay endTime = TimeOfDay.from(ep.endTimeOfDay);
        if (endTime.before(startTime)) {
            throw new IllegalArgumentException("endTime should after startTime!");
        }
        if (StringUtils.isNotEmpty(ep.intervalUnit)) {
            TimeUnit.valueOf(ep.intervalUnit);
        }
    }
    @Override
    @SneakyThrows
    public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
        DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class);
        // Not started yet: compute the schedule time from the start point
        if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {
            return calculateInRangeTime(startTime, ep);
        }
        // Interval
        TimeUnit timeUnit = Optional.ofNullable(ep.intervalUnit).map(TimeUnit::valueOf).orElse(TimeUnit.SECONDS);
        long interval = timeUnit.toMillis(ep.interval);
        Long ret = calculateInRangeTime(preTriggerTime + interval, ep);
        if (ret == null || ret <= Optional.ofNullable(endTime).orElse(Long.MAX_VALUE)) {
            return ret;
        }
        return null;
    }
    /**
     * Calculates the nearest time that falls within the range
     * @param time the reference time; may be returned directly as the result
     * @param ep the expression
     * @return the nearest time that falls within the range
     */
    static Long calculateInRangeTime(Long time, DailyTimeIntervalExpress ep) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(new Date(time));
        int year = calendar.get(Calendar.YEAR);
        // Month + 1, converting to the familiar 1~12
        int month = calendar.get(Calendar.MONTH) + 1;
        int day = calendar.get(Calendar.DAY_OF_MONTH);
        // Check whether the "day" condition is met
        int week = TimeUtils.calculateWeek(year, month, day);
        Set<Integer> targetDays = CollectionUtils.isEmpty(ep.daysOfWeek) ? ALL_DAY : ep.daysOfWeek;
        // If the day is not included, rewrite the time to 00:00 of a qualifying day and recurse (there is probably a faster way to write this, but this scheduling mode is unlikely to hit a bottleneck, so keep the simple implementation for now)
        if (!targetDays.contains(week)) {
            simpleSetCalendar(calendar, 0, 0, 0);
            Date tomorrowZero = DateUtils.addDays(calendar.getTime(), 1);
            return calculateInRangeTime(tomorrowZero.getTime(), ep);
        }
        // Start of the range
        TimeOfDay rangeStartTime = TimeOfDay.from(ep.startTimeOfDay);
        simpleSetCalendar(calendar, rangeStartTime.getHour(), rangeStartTime.getMinute(), rangeStartTime.getSecond());
        long todayStartTs = calendar.getTimeInMillis();
        // Not started yet
        if (time < todayStartTs) {
            return todayStartTs;
        }
        TimeOfDay rangeEndTime = TimeOfDay.from(ep.endTimeOfDay);
        simpleSetCalendar(calendar, rangeEndTime.getHour(), rangeEndTime.getMinute(), rangeEndTime.getSecond());
        long todayEndTs = calendar.getTimeInMillis();
        // Within the range
        if (time <= todayEndTs) {
            return time;
        }
        // Already ended: recompute for the next day
        simpleSetCalendar(calendar, 0, 0, 0);
        return calculateInRangeTime(DateUtils.addDays(calendar.getTime(), 1).getTime(), ep);
    }
    //......
}
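DailyTimeIntervalStrategyHandler implements the TimingStrategyHandler interface; supportType returns TimeExpressionType.DAILY_TIME_INTERVAL. validate parses the expression into a DailyTimeIntervalExpress, requires interval, startTimeOfDay, and endTimeOfDay to be non-null, and checks that endTimeOfDay is not earlier than startTimeOfDay. calculateNextTriggerTime relies on calculateInRangeTime to find the nearest time that falls inside the configured daily range.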
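The three cases handled by calculateInRangeTime (before the window, inside it, after it) plus the weekday filter can be sketched with java.time. This is a simplified re-implementation for illustration, not the PowerJob code: it uses LocalDateTime and DayOfWeek instead of Calendar and integer weekdays, iterates instead of recursing, and the names InRangeSketch and nextInRange are made up.

```java
import java.time.DayOfWeek;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical java.time version of the "nearest time in range" calculation.
final class InRangeSketch {

    static LocalDateTime nextInRange(LocalDateTime time, LocalTime start, LocalTime end,
                                     Set<DayOfWeek> days) {
        // As in the handler above, an empty day set means every day qualifies.
        Set<DayOfWeek> targetDays = days.isEmpty() ? EnumSet.allOf(DayOfWeek.class) : days;
        LocalDateTime candidate = time;
        while (true) {
            if (targetDays.contains(candidate.getDayOfWeek())) {
                LocalDateTime windowStart = candidate.toLocalDate().atTime(start);
                LocalDateTime windowEnd = candidate.toLocalDate().atTime(end);
                if (candidate.isBefore(windowStart)) {
                    return windowStart;          // not started yet today
                }
                if (!candidate.isAfter(windowEnd)) {
                    return candidate;            // already inside the window
                }
            }
            // Wrong weekday or window already over: retry from 00:00 the next day.
            candidate = candidate.toLocalDate().plusDays(1).atStartOfDay();
        }
    }
}
```

For a 09:00 to 18:00 window on weekdays, Friday 2024-01-12 at 08:00 maps to 09:00 the same day, 10:30 is returned unchanged, and 19:00 rolls over the weekend to Monday 2024-01-15 at 09:00.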

TimingStrategyService

tech/powerjob/server/core/scheduler/TimingStrategyService.java

@Slf4j
@Service
public class TimingStrategyService {
    private static final int NEXT_N_TIMES = 5;
    private static final List<String> TIPS = Collections.singletonList("It is valid, but has not trigger time list!");
    private final Map<TimeExpressionType, TimingStrategyHandler> strategyContainer;
    public TimingStrategyService(List<TimingStrategyHandler> timingStrategyHandlers) {
        // init
        strategyContainer = new EnumMap<>(TimeExpressionType.class);
        for (TimingStrategyHandler timingStrategyHandler : timingStrategyHandlers) {
            strategyContainer.put(timingStrategyHandler.supportType(), timingStrategyHandler);
        }
    }
    /**
     * Calculates the next few scheduling times
     *
     * @param timeExpressionType time expression type
     * @param timeExpression     the expression
     * @param startTime          start time (inclusive)
     * @param endTime            end time (inclusive)
     * @return list of scheduling times
     */
    public List<String> calculateNextTriggerTimes(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
        TimingStrategyHandler timingStrategyHandler = getHandler(timeExpressionType);
        List<Long> triggerTimeList = new ArrayList<>(NEXT_N_TIMES);
        Long nextTriggerTime = System.currentTimeMillis();
        do {
            nextTriggerTime = timingStrategyHandler.calculateNextTriggerTime(nextTriggerTime, timeExpression, startTime, endTime);
            if (nextTriggerTime == null) {
                break;
            }
            triggerTimeList.add(nextTriggerTime);
        } while (triggerTimeList.size() < NEXT_N_TIMES);
        if (triggerTimeList.isEmpty()) {
            return TIPS;
        }
        return triggerTimeList.stream().map(t -> DateFormatUtils.format(t, OmsConstant.TIME_PATTERN)).collect(Collectors.toList());
    }
    /**
     * Calculates the next scheduling time
     *
     * @param preTriggerTime     previous trigger time (nullable)
     * @param timeExpressionType time expression type
     * @param timeExpression     the expression
     * @param startTime          start time (inclusive)
     * @param endTime            end time (inclusive)
     * @return the next scheduling time
     */
    public Long calculateNextTriggerTime(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
        if (preTriggerTime == null || preTriggerTime < System.currentTimeMillis()) {
            preTriggerTime = System.currentTimeMillis();
        }
        return getHandler(timeExpressionType).calculateNextTriggerTime(preTriggerTime, timeExpression, startTime, endTime);
    }
    /**
     * Calculates the next scheduling time and checks it against the inspection rule
     *
     * @param timeExpressionType time expression type
     * @param timeExpression     the expression
     * @param startTime          start time (inclusive)
     * @param endTime            end time (inclusive)
     * @return the next scheduling time
     */
    public Long calculateNextTriggerTimeWithInspection( TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
        Long nextTriggerTime = calculateNextTriggerTime(null, timeExpressionType, timeExpression, startTime, endTime);
        if (TimeExpressionType.INSPECT_TYPES.contains(timeExpressionType.getV()) && nextTriggerTime == null) {
            throw new PowerJobException("time expression is out of date: " + timeExpression);
        }
        return nextTriggerTime;
    }
    public void validate(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
        if (endTime != null) {
            if (endTime <= System.currentTimeMillis()) {
                throw new PowerJobException("lifecycle is out of date!");
            }
            if (startTime != null && startTime > endTime) {
                throw new PowerJobException("lifecycle is invalid! start time must earlier then end time.");
            }
        }
        getHandler(timeExpressionType).validate(timeExpression);
    }
    private TimingStrategyHandler getHandler(TimeExpressionType timeExpressionType) {
        TimingStrategyHandler timingStrategyHandler = strategyContainer.get(timeExpressionType);
        if (timingStrategyHandler == null) {
            throw new PowerJobException("No matching TimingStrategyHandler for this TimeExpressionType:" + timeExpressionType);
        }
        return timingStrategyHandler;
    }
}
The TimingStrategyService constructor iterates over timingStrategyHandlers and builds a Map<TimeExpressionType, TimingStrategyHandler> keyed by each handler's supportType. calculateNextTriggerTimes looks up the handler for the given timeExpressionType, then calls its calculateNextTriggerTime in a loop, feeding each result back in as the next preTriggerTime, and returns up to 5 upcoming scheduling times. calculateNextTriggerTimeWithInspection computes nextTriggerTime and, for the CRON and DAILY_TIME_INTERVAL types, throws if it is null. validate first checks the startTime/endTime lifecycle and then delegates to the matching handler's validate method.
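The registry-plus-loop structure of TimingStrategyService can be reduced to a small self-contained sketch. Everything here is a stand-in: StrategyRegistrySketch, its nested Type and Handler, and the fixedRate() factory are hypothetical, Spring injection is replaced by a plain constructor argument, and the current time is passed in as a parameter so the output is deterministic.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of the handler registry and the "next N times" loop.
final class StrategyRegistrySketch {

    enum Type { API, FIXED_RATE }

    interface Handler {
        Type supportType();
        Long next(long preTriggerTime, String expression, Long endTime);
    }

    private final Map<Type, Handler> container = new EnumMap<>(Type.class);

    StrategyRegistrySketch(List<Handler> handlers) {
        // Each handler registers itself under the type it reports.
        for (Handler handler : handlers) {
            container.put(handler.supportType(), handler);
        }
    }

    List<Long> nextTimes(Type type, String expression, long now, Long endTime, int n) {
        Handler handler = container.get(type);
        if (handler == null) {
            throw new IllegalStateException("No matching handler for " + type);
        }
        List<Long> result = new ArrayList<>(n);
        Long next = now;
        while (result.size() < n) {
            // Feed the previous result back in as the new preTriggerTime.
            next = handler.next(next, expression, endTime);
            if (next == null) {
                break;
            }
            result.add(next);
        }
        return result;
    }

    // Simple fixed-rate handler used to exercise the loop.
    static Handler fixedRate() {
        return new Handler() {
            @Override
            public Type supportType() {
                return Type.FIXED_RATE;
            }

            @Override
            public Long next(long preTriggerTime, String expression, Long endTime) {
                long r = preTriggerTime + Long.parseLong(expression);
                return (endTime != null && endTime < r) ? null : r;
            }
        };
    }
}
```

Starting from 0 with a 1000 ms interval, asking for 5 times gives 1000 through 5000; with an endTime of 2500 the loop stops early after 1000 and 2000. Keying the map by enum keeps adding a new strategy down to writing one more handler and registering it.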

Summary

The TimingStrategyHandler interface defines the validate, calculateNextTriggerTime, and supportType methods. The TimeExpressionType enum it works with defines the API, CRON, FIXED_RATE, FIXED_DELAY, WORKFLOW, and DAILY_TIME_INTERVAL types, which map to ApiTimingStrategyHandler, CronTimingStrategyHandler, FixedRateTimingStrategyHandler, FixedDelayTimingStrategyHandler, WorkflowTimingStrategyHandler, and DailyTimeIntervalStrategyHandler respectively. TimingStrategyService aggregates these handlers and exposes calculateNextTriggerTimes, calculateNextTriggerTime, calculateNextTriggerTimeWithInspection, and validate.
