Spring調(diào)度框架EnableScheduling&Scheduled源碼解析
前言
在實(shí)際項(xiàng)目開發(fā)中,有時(shí)會(huì)遇到定時(shí)調(diào)度的開發(fā)需要,這部分的功能在Spring框架中給出了較好的支持,即@EnableScheduling&Scheduled定時(shí)調(diào)度框架,本著不僅知其然還要知其所以然的指導(dǎo)思想,下面對(duì)該調(diào)度框架進(jìn)行源碼解析,以便更好的理解其執(zhí)行過(guò)程;
1.開啟調(diào)度框架
Spring框架中,為了開啟調(diào)度框架功能,需要在配置類上標(biāo)注@EnableScheduling注解,這也是Spring中Enable*模式的典型應(yīng)用,下面看一下@EnableScheduling的具體實(shí)現(xiàn):
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(SchedulingConfiguration.class) @Documented public @interface EnableScheduling { }
這里通過(guò)@Import注解,導(dǎo)入了配置類SchedulingConfiguration,進(jìn)一步看下SchedulingConfiguration配置類的源碼,如下:
@Configuration(proxyBeanMethods = false) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class SchedulingConfiguration { @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledAnnotationBeanPostProcessor(); } }
可以看到,這里定義了一個(gè)Bean后處理器ScheduledAnnotationBeanPostProcessor,調(diào)度框架的解析邏輯也是定義在ScheduledAnnotationBeanPostProcessor中的,下面著重對(duì)該部分進(jìn)行具體分析;
2.ScheduledAnnotationBeanPostProcessor Bean后處理器分析
Bean后處理器中,主要分析下后處理器的攔截方法,如下:
@Override public Object postProcessBeforeInitialization(Object bean, String beanName) { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledAnnotations) -> scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
如上,postProcessAfterInitialization方法中,主要對(duì)標(biāo)注@Scheduled和聚合注解@Schedules的類成員方法進(jìn)行處理,主要分為2步:
1)識(shí)別標(biāo)注@Scheduled和聚合注解@Schedules的方法;
2)對(duì)注解方法調(diào)用processScheduled方法進(jìn)行處理;
方法processScheduled處理過(guò)程如下:
/** * Process the given {@code @Scheduled} method declaration on the given bean. * @param scheduled the {@code @Scheduled} annotation * @param method the method that the annotation has been declared on * @param bean the target bean instance * @see #createRunnable(Object, Method) */ protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit()); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit()); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // Check cron expression String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } // Check fixed delay long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit()); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit()); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // Check fixed rate long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit()); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit()); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks synchronized (this.scheduledTasks) { Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
上述處理過(guò)程主要包含以下幾步:
1)將調(diào)用目標(biāo)方法的過(guò)程包裝為ScheduledMethodRunnable類
2)構(gòu)造CronTask并進(jìn)行調(diào)度
3)構(gòu)造FixedDelayTask并進(jìn)行調(diào)度
4)構(gòu)造FixedRateTask并進(jìn)行調(diào)度
下面主要說(shuō)明下調(diào)度任務(wù)的類型以及具體的調(diào)度方法;
2.1 調(diào)度框架支持的Task類型
Spring調(diào)度框架中重要支持3種調(diào)度任務(wù)類型(繼承結(jié)構(gòu)如上圖),具體說(shuō)明如下:
1)CronTask:cron表達(dá)式調(diào)度的任務(wù)
2)FixedDelayTask:固定延遲時(shí)間執(zhí)行的任務(wù)
3)FixedRateTask:固定速率執(zhí)行的任務(wù)
2.2 對(duì)Task進(jìn)行調(diào)度執(zhí)行
上述3種的調(diào)度執(zhí)行實(shí)現(xiàn)近似,下面以FixedDelayTask進(jìn)行說(shuō)明,該任務(wù)的調(diào)度方法為scheduleFixedDelayTask,具體實(shí)現(xiàn)如下:
/** * Schedule the specified fixed-delay task, either right away if possible * or on initialization of the scheduler. * @return a handle to the scheduled task, allowing to cancel it * (or {@code null} if processing a previously registered task) * @since 5.0.2 */ @Nullable public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) { ScheduledTask scheduledTask = this.unresolvedTasks.remove(task); boolean newTask = false; if (scheduledTask == null) { scheduledTask = new ScheduledTask(task); newTask = true; } if (this.taskScheduler != null) { if (task.getInitialDelay() > 0) { Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay()); scheduledTask.future = this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval()); } else { scheduledTask.future = this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval()); } } else { addFixedDelayTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null); }
這里主要包含以下幾步:
1)將調(diào)度任務(wù)包裝為ScheduledTask類型,其中封裝了執(zhí)行結(jié)果ScheduledFuture
2)存在任務(wù)調(diào)度器(taskScheduler)時(shí),直接進(jìn)行調(diào)度執(zhí)行
3)不存在任務(wù)調(diào)度器(taskScheduler)時(shí),將任務(wù)暫存到fixedDelayTasks中,待調(diào)用afterPropertiesSet方法時(shí)再進(jìn)行調(diào)度執(zhí)行
3.任務(wù)調(diào)度器
3.1 任務(wù)調(diào)度器獲取
任務(wù)調(diào)度器支持自定義,當(dāng)無(wú)自定義調(diào)度器時(shí),調(diào)度框架提供了默認(rèn)的任務(wù)調(diào)度器;
自定義任務(wù)調(diào)度器的處理邏輯在方法finishRegistration中,如下:
private void finishRegistration() { if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } if (this.beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this.registrar); } } if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); try { // Search for TaskScheduler bean... this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); } catch (NoUniqueBeanDefinitionException ex) { if (logger.isTraceEnabled()) { logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " + ex.getMessage()); } try { this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true)); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskScheduler bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { if (logger.isTraceEnabled()) { logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " + ex.getMessage()); } // Search for ScheduledExecutorService bean next... try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); } catch (NoUniqueBeanDefinitionException ex2) { if (logger.isTraceEnabled()) { logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " + ex2.getMessage()); } try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); } catch (NoSuchBeanDefinitionException ex3) { if (logger.isInfoEnabled()) { logger.info("More than one ScheduledExecutorService bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex2.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex2) { if (logger.isTraceEnabled()) { logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " + ex2.getMessage()); } // Giving up -> falling back to default scheduler within the registrar... logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); } } } this.registrar.afterPropertiesSet(); }
上述獲取任務(wù)調(diào)度器的優(yōu)先級(jí)順序?yàn)椋?/p>
1)當(dāng)Bean后處理器中定義了任務(wù)調(diào)度器時(shí),優(yōu)先取Bean后處理器的任務(wù)調(diào)度器
2)在BeanFactory中獲取Bean類型為SchedulingConfigurer的實(shí)例,在其方法configureTasks中可以自定義任務(wù)調(diào)度器
3)獲取BeanFactory中TaskScheduler類型的bean(如有)
4)獲取BeanFactory中ScheduledExecutorService類型的bean(如有)
5)當(dāng)上述方式獲取的任務(wù)調(diào)度器都不存在時(shí),會(huì)使用框架中默認(rèn)的任務(wù)調(diào)度器,如下:
if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); }
3.2 框架內(nèi)提供的任務(wù)調(diào)度器
框架內(nèi)提供的任務(wù)調(diào)度器主要包括:
1)ConcurrentTaskScheduler
2)ThreadPoolTaskScheduler
繼承結(jié)構(gòu)如下:
3.3 任務(wù)調(diào)度器執(zhí)行邏輯
以上述框架默認(rèn)的ConcurrentTaskScheduler進(jìn)行說(shuō)明,在調(diào)用調(diào)度器方法scheduleWithFixedDelay執(zhí)行時(shí),具體執(zhí)行邏輯為:
@Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) { long initialDelay = startTime.getTime() - this.clock.millis(); try { return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); } }
這里主要包含2部分:
1)首先把task任務(wù)包裝為DelegatingErrorHandlingRunnable類型(支持嵌入錯(cuò)誤處理器邏輯),具體是在方法decorateTask中實(shí)現(xiàn)的,如下:
private Runnable decorateTask(Runnable task, boolean isRepeatingTask) { Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask); if (this.enterpriseConcurrentScheduler) { result = ManagedTaskBuilder.buildManagedTask(result, task.toString()); } return result; } public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler( Runnable task, @Nullable ErrorHandler errorHandler, boolean isRepeatingTask) { if (task instanceof DelegatingErrorHandlingRunnable) { return (DelegatingErrorHandlingRunnable) task; } ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask)); return new DelegatingErrorHandlingRunnable(task, eh); }
2)調(diào)用線程池方法scheduleWithFixedDelay進(jìn)行調(diào)度執(zhí)行
至此,Spring調(diào)度框架整體的處理過(guò)程總結(jié)如下:
開啟調(diào)度框架(@EnableScheduling)利用bean后處理器識(shí)別@Scheduled注解,并包裝為Task任務(wù)利用任務(wù)調(diào)度器(TaskScheduler,自定義或框架默認(rèn))進(jìn)行調(diào)度執(zhí)行
到此這篇關(guān)于Spring調(diào)度框架EnableScheduling&Scheduled源碼解析的文章就介紹到這了,更多相關(guān)EnableScheduling&Scheduled源碼內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中如何快速構(gòu)建項(xiàng)目腳手架的實(shí)現(xiàn)
這篇文章主要介紹了Java中如何快速構(gòu)建項(xiàng)目腳手架,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05spring boot實(shí)現(xiàn)阿里云視頻點(diǎn)播上傳視頻功能(復(fù)制粘貼即可)
這篇文章主要介紹了spring boot實(shí)現(xiàn)阿里云視頻點(diǎn)播上傳視頻功能(復(fù)制粘貼即可),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12通過(guò)weblogic API解析如何獲取weblogic中服務(wù)的IP和端口操作
這篇文章主要介紹了通過(guò)weblogic API解析如何獲取weblogic中服務(wù)的IP和端口操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Springboot消除switch-case過(guò)程解析
這篇文章主要介紹了Springboot消除switch-case過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10如何基于SpringWeb?MultipartFile實(shí)現(xiàn)文件上傳、下載功能
在做項(xiàng)目時(shí),后端經(jīng)常采用上傳文件組件MultipartFile,下面這篇文章主要給大家介紹了關(guān)于如何基于SpringWeb?MultipartFile實(shí)現(xiàn)文件上傳、下載功能的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-07-07深入分析JAVA Synchronized關(guān)鍵字
這篇文章主要介紹了析JAVA Synchronized關(guān)鍵字的相關(guān)知識(shí),文中代碼非常詳細(xì),幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-06-06