欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring調(diào)度框架EnableScheduling&Scheduled源碼解析

 更新時(shí)間:2024年01月25日 10:32:45   作者:mumubili  
這篇文章主要介紹了Spring調(diào)度框架EnableScheduling&Scheduled源碼解析,@EnableScheduling&Scheduled定時(shí)調(diào)度框架,本著不僅知其然還要知其所以然的指導(dǎo)思想,下面對(duì)該調(diào)度框架進(jìn)行源碼解析,以便更好的理解其執(zhí)行過(guò)程,需要的朋友可以參考下

前言

在實(shí)際項(xiàng)目開發(fā)中,有時(shí)會(huì)遇到定時(shí)調(diào)度的開發(fā)需要,這部分的功能在Spring框架中給出了較好的支持,即@EnableScheduling&Scheduled定時(shí)調(diào)度框架,本著不僅知其然還要知其所以然的指導(dǎo)思想,下面對(duì)該調(diào)度框架進(jìn)行源碼解析,以便更好的理解其執(zhí)行過(guò)程;

1.開啟調(diào)度框架

Spring框架中,為了開啟調(diào)度框架功能,需要在配置類上標(biāo)注@EnableScheduling注解,這也是Spring中Enable*模式的典型應(yīng)用,下面看一下@EnableScheduling的具體實(shí)現(xiàn):

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}

這里通過(guò)@Import注解,導(dǎo)入了配置類SchedulingConfiguration,進(jìn)一步看下SchedulingConfiguration配置類的源碼,如下:

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
 
	@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
		return new ScheduledAnnotationBeanPostProcessor();
	}
 
}

可以看到,這里定義了一個(gè)Bean后處理器ScheduledAnnotationBeanPostProcessor,調(diào)度框架的解析邏輯也是定義在ScheduledAnnotationBeanPostProcessor中的,下面著重對(duì)該部分進(jìn)行具體分析;

2.ScheduledAnnotationBeanPostProcessor Bean后處理器分析

Bean后處理器中,主要分析下后處理器的攔截方法,如下:

@Override
	public Object postProcessBeforeInitialization(Object bean, String beanName) {
		return bean;
	}
	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
				bean instanceof ScheduledExecutorService) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}
		Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
		if (!this.nonAnnotatedClasses.contains(targetClass) &&
				AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
			Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
						Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
								method, Scheduled.class, Schedules.class);
						return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
					});
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(targetClass);
				if (logger.isTraceEnabled()) {
					logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
				}
			}
			else {
				// Non-empty set of methods
				annotatedMethods.forEach((method, scheduledAnnotations) ->
						scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
				if (logger.isTraceEnabled()) {
					logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
							"': " + annotatedMethods);
				}
			}
		}
		return bean;
	}

如上,postProcessAfterInitialization方法中,主要對(duì)標(biāo)注@Scheduled和聚合注解@Schedules的類成員方法進(jìn)行處理,主要分為2步:

1)識(shí)別標(biāo)注@Scheduled和聚合注解@Schedules的方法;

2)對(duì)注解方法調(diào)用processScheduled方法進(jìn)行處理;

方法processScheduled處理過(guò)程如下:

/**
	 * Process the given {@code @Scheduled} method declaration on the given bean.
	 * @param scheduled the {@code @Scheduled} annotation
	 * @param method the method that the annotation has been declared on
	 * @param bean the target bean instance
	 * @see #createRunnable(Object, Method)
	 */
	protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
		try {
			Runnable runnable = createRunnable(bean, method);
			boolean processedSchedule = false;
			String errorMessage =
					"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
			Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
			// Determine initial delay
			long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
			String initialDelayString = scheduled.initialDelayString();
			if (StringUtils.hasText(initialDelayString)) {
				Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
				if (this.embeddedValueResolver != null) {
					initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
				}
				if (StringUtils.hasLength(initialDelayString)) {
					try {
						initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
					}
					catch (RuntimeException ex) {
						throw new IllegalArgumentException(
								"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
					}
				}
			}
			// Check cron expression
			String cron = scheduled.cron();
			if (StringUtils.hasText(cron)) {
				String zone = scheduled.zone();
				if (this.embeddedValueResolver != null) {
					cron = this.embeddedValueResolver.resolveStringValue(cron);
					zone = this.embeddedValueResolver.resolveStringValue(zone);
				}
				if (StringUtils.hasLength(cron)) {
					Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
					processedSchedule = true;
					if (!Scheduled.CRON_DISABLED.equals(cron)) {
						TimeZone timeZone;
						if (StringUtils.hasText(zone)) {
							timeZone = StringUtils.parseTimeZoneString(zone);
						}
						else {
							timeZone = TimeZone.getDefault();
						}
						tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
					}
				}
			}
			// At this point we don't need to differentiate between initial delay set or not anymore
			if (initialDelay < 0) {
				initialDelay = 0;
			}
			// Check fixed delay
			long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
			if (fixedDelay >= 0) {
				Assert.isTrue(!processedSchedule, errorMessage);
				processedSchedule = true;
				tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
			}
			String fixedDelayString = scheduled.fixedDelayString();
			if (StringUtils.hasText(fixedDelayString)) {
				if (this.embeddedValueResolver != null) {
					fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
				}
				if (StringUtils.hasLength(fixedDelayString)) {
					Assert.isTrue(!processedSchedule, errorMessage);
					processedSchedule = true;
					try {
						fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
					}
					catch (RuntimeException ex) {
						throw new IllegalArgumentException(
								"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
					}
					tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
				}
			}
			// Check fixed rate
			long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
			if (fixedRate >= 0) {
				Assert.isTrue(!processedSchedule, errorMessage);
				processedSchedule = true;
				tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
			}
			String fixedRateString = scheduled.fixedRateString();
			if (StringUtils.hasText(fixedRateString)) {
				if (this.embeddedValueResolver != null) {
					fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
				}
				if (StringUtils.hasLength(fixedRateString)) {
					Assert.isTrue(!processedSchedule, errorMessage);
					processedSchedule = true;
					try {
						fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
					}
					catch (RuntimeException ex) {
						throw new IllegalArgumentException(
								"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
					}
					tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
				}
			}
			// Check whether we had any attribute set
			Assert.isTrue(processedSchedule, errorMessage);
			// Finally register the scheduled tasks
			synchronized (this.scheduledTasks) {
				Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
				regTasks.addAll(tasks);
			}
		}
		catch (IllegalArgumentException ex) {
			throw new IllegalStateException(
					"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
		}
	}

上述處理過(guò)程主要包含以下幾步:

1)將調(diào)用目標(biāo)方法的過(guò)程包裝為ScheduledMethodRunnable類

2)構(gòu)造CronTask并進(jìn)行調(diào)度

3)構(gòu)造FixedDelayTask并進(jìn)行調(diào)度

4)構(gòu)造FixedRateTask并進(jìn)行調(diào)度

下面主要說(shuō)明下調(diào)度任務(wù)的類型以及具體的調(diào)度方法;

2.1 調(diào)度框架支持的Task類型

 Spring調(diào)度框架中重要支持3種調(diào)度任務(wù)類型(繼承結(jié)構(gòu)如上圖),具體說(shuō)明如下:

1)CronTask:cron表達(dá)式調(diào)度的任務(wù)

2)FixedDelayTask:固定延遲時(shí)間執(zhí)行的任務(wù)

3)FixedRateTask:固定速率執(zhí)行的任務(wù)

2.2 對(duì)Task進(jìn)行調(diào)度執(zhí)行

上述3種的調(diào)度執(zhí)行實(shí)現(xiàn)近似,下面以FixedDelayTask進(jìn)行說(shuō)明,該任務(wù)的調(diào)度方法為scheduleFixedDelayTask,具體實(shí)現(xiàn)如下:

/**
	 * Schedule the specified fixed-delay task, either right away if possible
	 * or on initialization of the scheduler.
	 * @return a handle to the scheduled task, allowing to cancel it
	 * (or {@code null} if processing a previously registered task)
	 * @since 5.0.2
	 */
	@Nullable
	public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {
		ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
		boolean newTask = false;
		if (scheduledTask == null) {
			scheduledTask = new ScheduledTask(task);
			newTask = true;
		}
		if (this.taskScheduler != null) {
			if (task.getInitialDelay() > 0) {
				Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());
				scheduledTask.future =
						this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());
			}
			else {
				scheduledTask.future =
						this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());
			}
		}
		else {
			addFixedDelayTask(task);
			this.unresolvedTasks.put(task, scheduledTask);
		}
		return (newTask ? scheduledTask : null);
	}

這里主要包含以下幾步:

1)將調(diào)度任務(wù)包裝為ScheduledTask類型,其中封裝了執(zhí)行結(jié)果ScheduledFuture

2)存在任務(wù)調(diào)度器(taskScheduler)時(shí),直接進(jìn)行調(diào)度執(zhí)行

3)不存在任務(wù)調(diào)度器(taskScheduler)時(shí),將任務(wù)暫存到fixedDelayTasks中,待調(diào)用afterPropertiesSet方法時(shí)再進(jìn)行調(diào)度執(zhí)行

3.任務(wù)調(diào)度器

3.1 任務(wù)調(diào)度器獲取

任務(wù)調(diào)度器支持自定義,當(dāng)無(wú)自定義調(diào)度器時(shí),調(diào)度框架提供了默認(rèn)的任務(wù)調(diào)度器;

自定義任務(wù)調(diào)度器的處理邏輯在方法finishRegistration中,如下:

private void finishRegistration() {
		if (this.scheduler != null) {
			this.registrar.setScheduler(this.scheduler);
		}
		if (this.beanFactory instanceof ListableBeanFactory) {
			Map<String, SchedulingConfigurer> beans =
					((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
			List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
			AnnotationAwareOrderComparator.sort(configurers);
			for (SchedulingConfigurer configurer : configurers) {
				configurer.configureTasks(this.registrar);
			}
		}
		if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
			try {
				// Search for TaskScheduler bean...
				this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
			}
			catch (NoUniqueBeanDefinitionException ex) {
				if (logger.isTraceEnabled()) {
					logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
							ex.getMessage());
				}
				try {
					this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
				}
				catch (NoSuchBeanDefinitionException ex2) {
					if (logger.isInfoEnabled()) {
						logger.info("More than one TaskScheduler bean exists within the context, and " +
								"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
								"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
								"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
								ex.getBeanNamesFound());
					}
				}
			}
			catch (NoSuchBeanDefinitionException ex) {
				if (logger.isTraceEnabled()) {
					logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
							ex.getMessage());
				}
				// Search for ScheduledExecutorService bean next...
				try {
					this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
				}
				catch (NoUniqueBeanDefinitionException ex2) {
					if (logger.isTraceEnabled()) {
						logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
								ex2.getMessage());
					}
					try {
						this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
					}
					catch (NoSuchBeanDefinitionException ex3) {
						if (logger.isInfoEnabled()) {
							logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
									"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
									"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
									"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
									ex2.getBeanNamesFound());
						}
					}
				}
				catch (NoSuchBeanDefinitionException ex2) {
					if (logger.isTraceEnabled()) {
						logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
								ex2.getMessage());
					}
					// Giving up -> falling back to default scheduler within the registrar...
					logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
				}
			}
		}
		this.registrar.afterPropertiesSet();
	}

上述獲取任務(wù)調(diào)度器的優(yōu)先級(jí)順序?yàn)椋?/p>

1)當(dāng)Bean后處理器中定義了任務(wù)調(diào)度器時(shí),優(yōu)先取Bean后處理器的任務(wù)調(diào)度器

2)在BeanFactory中獲取Bean類型為SchedulingConfigurer的實(shí)例,在其方法configureTasks中可以自定義任務(wù)調(diào)度器

3)獲取BeanFactory中TaskScheduler類型的bean(如有)

4)獲取BeanFactory中ScheduledExecutorService類型的bean(如有)

5)當(dāng)上述方式獲取的任務(wù)調(diào)度器都不存在時(shí),會(huì)使用框架中默認(rèn)的任務(wù)調(diào)度器,如下:

if (this.taskScheduler == null) {
	this.localExecutor = Executors.newSingleThreadScheduledExecutor();
	this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}

3.2 框架內(nèi)提供的任務(wù)調(diào)度器

框架內(nèi)提供的任務(wù)調(diào)度器主要包括:

1)ConcurrentTaskScheduler

2)ThreadPoolTaskScheduler

繼承結(jié)構(gòu)如下:

3.3 任務(wù)調(diào)度器執(zhí)行邏輯 

以上述框架默認(rèn)的ConcurrentTaskScheduler進(jìn)行說(shuō)明,在調(diào)用調(diào)度器方法scheduleWithFixedDelay執(zhí)行時(shí),具體執(zhí)行邏輯為:

@Override
	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
		long initialDelay = startTime.getTime() - this.clock.millis();
		try {
			return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
		}
	}

這里主要包含2部分:

1)首先把task任務(wù)包裝為DelegatingErrorHandlingRunnable類型(支持嵌入錯(cuò)誤處理器邏輯),具體是在方法decorateTask中實(shí)現(xiàn)的,如下:

private Runnable decorateTask(Runnable task, boolean isRepeatingTask) {
		Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
		if (this.enterpriseConcurrentScheduler) {
			result = ManagedTaskBuilder.buildManagedTask(result, task.toString());
		}
		return result;
	}
	public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler(
			Runnable task, @Nullable ErrorHandler errorHandler, boolean isRepeatingTask) {
		if (task instanceof DelegatingErrorHandlingRunnable) {
			return (DelegatingErrorHandlingRunnable) task;
		}
		ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask));
		return new DelegatingErrorHandlingRunnable(task, eh);
	}

2)調(diào)用線程池方法scheduleWithFixedDelay進(jìn)行調(diào)度執(zhí)行

至此,Spring調(diào)度框架整體的處理過(guò)程總結(jié)如下:

開啟調(diào)度框架(@EnableScheduling)利用bean后處理器識(shí)別@Scheduled注解,并包裝為Task任務(wù)利用任務(wù)調(diào)度器(TaskScheduler,自定義或框架默認(rèn))進(jìn)行調(diào)度執(zhí)行

到此這篇關(guān)于Spring調(diào)度框架EnableScheduling&amp;Scheduled源碼解析的文章就介紹到這了,更多相關(guān)EnableScheduling&amp;Scheduled源碼內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論