spring schedule任務(wù)調(diào)度方式
spring schedule任務(wù)調(diào)度
啟用 Spring 的任務(wù)調(diào)度功能需要使用@EnableScheduling注解,該注解會(huì)引入ScheduledAnnotationBeanPostProcessor。
beanprocessor是一個(gè)bean后置處理器,負(fù)責(zé)掃描帶有 @Scheduled 注解的方法,將其轉(zhuǎn)換為可執(zhí)行的任務(wù),并根據(jù)注解的屬性將其注冊(cè)到 TaskScheduler 中進(jìn)行管理和執(zhí)行。
這樣,開發(fā)者只需要在普通 Spring Bean 的方法上添加 @Scheduled 注解,Spring 就能自動(dòng)地按照指定的時(shí)間策略執(zhí)行這些方法,而無需手動(dòng)創(chuàng)建和管理線程。其內(nèi)部有一個(gè)registrar是 ScheduledTaskRegistrar用來注冊(cè)任務(wù)。
查找 @Scheduled 注解
在ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization()方法處理所有的@Scheduled,具體處理每個(gè)注解方法是
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
//解析所有的@Scheduled注解
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
}
else {
// 調(diào)用processScheduled()方法初始化調(diào)度任務(wù)
annotatedMethods.forEach((method, scheduledAnnotations) ->
scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
}
}
return bean;
}processScheduled(Scheduled scheduled, Method method, Object bean)
scheduled:注解配置任務(wù)周期相關(guān)信息method:注解所在方法bean:注解所在實(shí)例對(duì)象。有bean和method就可以通過反射 進(jìn)行方法調(diào)用。
processScheduled()方法首先將schedueld注解上的方法封裝傳給你一個(gè)runnable任務(wù),然后
封裝任務(wù)
protected Runnable createRunnable(Object target, Method method) {
//@Scheduled 注解修飾的方法必須是無參的
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod);
}這里就是將實(shí)例方法包裝成一個(gè)ScheduledMethodRunnable對(duì)象,
ScheduledMethodRunnable.run方法就是通過反射調(diào)用該方法。
ReflectionUtils.makeAccessible(this.method); this.method.invoke(this.target);
注冊(cè)任務(wù)
下一步會(huì)解析 @Scheduled 注解中的屬性,如
fixedRate: 以固定的時(shí)間間隔(毫秒)執(zhí)行任務(wù),在上一次任務(wù)開始后等待指定的時(shí)間。fixedDelay: 在上一次任務(wù)完成后,等待指定的時(shí)間(毫秒)再執(zhí)行下一次任務(wù)。cron: 使用 Cron 表達(dá)式定義任務(wù)的執(zhí)行時(shí)間。initialDelay: 任務(wù)第一次執(zhí)行前的延遲時(shí)間(毫秒)。
不同的類型會(huì)通過registrar不同方法進(jìn)行注冊(cè)。
this.registrar.scheduleCronTask() this.registrar.scheduleFixedDelayTask() this.registrar.scheduleFixedRateTask()
這里又引入了一個(gè)重要的類ScheduledTaskRegistrar來注冊(cè)任務(wù)。
這里以scheduleCronTask方法為例來看下cron表達(dá)式類型任務(wù)的注冊(cè):
public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
//taskScheduler是否初始化
if (this.taskScheduler != null) {
//創(chuàng)建任務(wù)
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {//taskScheduler未初始化,將任務(wù)放到未處理列表里
addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}入?yún)⑹且粋€(gè)CronTask類型,在上面的processScheduled()方法調(diào)用實(shí)例是
/** runnable就是封裝的ScheduledMethodRunnable */ this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron /**cron表達(dá)式*/, timeZone)))
最后調(diào)用taskScheduler.schedule(task.getRunnable(), task.getTrigger())來啟動(dòng)任務(wù)。
taskScheduler
在看taskScheduler.schedule()方法前首先來看taskScheduler是怎么初始化的。
這還要看ScheduledAnnotationBeanPostProcessor ,其實(shí)現(xiàn)了SmartInitializingSingleton接口。在所有Singleton Bean 初始化完成后被調(diào)用afterSingletonsInstantiated()方法。該方法又會(huì)調(diào)用finishRegistration();來完成惹我你的注冊(cè)。
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
}
}
catch (NoSuchBeanDefinitionException ex2) {
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
this.registrar.afterPropertiesSet();
}這里首先會(huì)從beanfacotry中查找SchedulingConfigurer類型的bean,然后調(diào)用configureTasks來加載自定義schedule配置信息,這里入?yún)⑹莚egistrar,然后從容器中查找TaskScheduler、ScheduledExecutorService類型的bean來初始化taskScheduler。最后調(diào)用registrar.afterPropertiesSet()。這里還有一步兜底,如果schedule還是為空,則默認(rèn)創(chuàng)建一個(gè)ConcurrentTaskScheduler類型的scheduler。
registrar.afterPropertiesSet()方法
protected void scheduleTasks() {
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}任務(wù)執(zhí)行
回過頭來繼續(xù)看taskScheduler.schedule()任務(wù)的注冊(cè),這里就看默認(rèn)的ConcurrentTaskScheduler.schedule()方法。
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
if (this.enterpriseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}最后調(diào)用ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler).schedule()
來看ReschedulingRunnable的schedule方法
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}計(jì)算好下次執(zhí)行時(shí)間initialDelay,使用線程池executor延遲執(zhí)行當(dāng)前ReschedulingRunnable。
run方法
public void run() {
Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
super.run();
Date completionTime = new Date(this.triggerContext.getClock().millis());
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!obtainCurrentFuture().isCancelled()) {
schedule();
}
}
}這里super.run()就是調(diào)用ReschedulingRunnable extends DelegatingErrorHandlingRunnable構(gòu)造方法創(chuàng)建傳入的task,也就是原始schedule注解方法。
super.run()就是DelegatingErrorHandlingRunnable的run方法
public DelegatingErrorHandlingRunnable(Runnable delegate, ErrorHandler errorHandler) {
Assert.notNull(delegate, "Delegate must not be null");
Assert.notNull(errorHandler, "ErrorHandler must not be null");
this.delegate = delegate;
this.errorHandler = errorHandler;
}
public void run() {
try {
this.delegate.run();
}
catch (UndeclaredThrowableException ex) {
this.errorHandler.handleError(ex.getUndeclaredThrowable());
}
catch (Throwable ex) {
this.errorHandler.handleError(ex);
}
}回到ReschedulingRunnable的run方法,在執(zhí)行完被代理task后,如果任務(wù)沒有被取消,又調(diào)用schedule()方法進(jìn)行下一次任務(wù)執(zhí)行。這樣就完成了任務(wù)的周期性執(zhí)行。
怎么動(dòng)態(tài)控制定時(shí)任務(wù)?
如果想取消或修改某個(gè)任務(wù)執(zhí)行周期,這個(gè)時(shí)候該如何做呢?
這個(gè)時(shí)候可以使用上面說的SchedulingConfigurer接口,該接口回暴露ScheduledTaskRegistrar類實(shí)例。上面代碼分析可以看到,所有的任務(wù)都是通過該類進(jìn)行初始化的,通過該類可以動(dòng)態(tài)的添加任務(wù)。并且schedule()方法返回的是一個(gè)ScheduledFuture,可以通過調(diào)用cancel方法來取消任務(wù)。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
這一次搞懂Spring自定義標(biāo)簽以及注解解析原理說明
這篇文章主要介紹了這一次搞懂Spring自定義標(biāo)簽以及注解解析原理說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-08-08
SpringBoot和前端聯(lián)動(dòng)實(shí)現(xiàn)存儲(chǔ)瀏覽記錄功能
這篇文章主要介紹了SpringBoot和前端聯(lián)動(dòng)實(shí)現(xiàn)存儲(chǔ)瀏覽記錄功能,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01
詳談cxf和axis兩種框架下的webservice客戶端開發(fā)
這篇文章主要介紹了詳談cxf和axis兩種框架下的webservice客戶端開發(fā),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
java讀取resources文件詳解及實(shí)現(xiàn)代碼
這篇文章主要介紹了java讀取resources文件詳解及實(shí)現(xiàn)代碼的相關(guān)資料,在開發(fā)項(xiàng)目的時(shí)候經(jīng)常會(huì)遇到讀取文件夾里面的內(nèi)容,需要的朋友可以參考下2017-07-07
Mybatis-plus使用selectList查詢數(shù)據(jù)為null的問題及解決辦法
這篇文章主要介紹了Mybatis-plus使用selectList查詢數(shù)據(jù)為null的問題及解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-07-07
elasticsearch 8.2.3 安裝及springboot簡(jiǎn)單使用
這篇文章主要介紹了elasticsearch 8.2.3 安裝及springboot簡(jiǎn)單使用,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-06-06
SpringBoot中controller深層詳細(xì)講解
這篇文章主要介紹了SpringBoot在Controller層接收參數(shù)的常用方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-02-02
Java進(jìn)階教程之運(yùn)行時(shí)類型識(shí)別RTTI機(jī)制
這篇文章主要介紹了Java進(jìn)階教程之運(yùn)行時(shí)類型識(shí)別RTTI機(jī)制,在Java運(yùn)行時(shí),RTTI維護(hù)類的相關(guān)信息,比如多態(tài)(polymorphism)就是基于RTTI實(shí)現(xiàn)的,需要的朋友可以參考下2014-09-09

