Spring5源碼解析之Spring中的異步和計(jì)劃任務(wù)
Java提供了許多創(chuàng)建線程池的方式,并得到一個(gè)Future實(shí)例來作為任務(wù)結(jié)果。對于Spring同樣小菜一碟,通過其scheduling包就可以做到將任務(wù)線程中后臺執(zhí)行。
在本文的第一部分中,我們將討論下Spring中執(zhí)行計(jì)劃任務(wù)的一些基礎(chǔ)知識。之后,我們將解釋這些類是如何一起協(xié)作來啟動(dòng)并執(zhí)行計(jì)劃任務(wù)的。下一部分將介紹計(jì)劃和異步任務(wù)的配置。最后,我們來寫個(gè)Demo,看看如何通過單元測試來編排計(jì)劃任務(wù)。
什么是Spring中的異步任務(wù)?
在我們正式的進(jìn)入話題之前,對于Spring,我們需要理解下它實(shí)現(xiàn)的兩個(gè)不同的概念:異步任務(wù)和調(diào)度任務(wù)。顯然,兩者有一個(gè)很大的共同點(diǎn):都在后臺工作。但是,它們之間存在了很大差異。調(diào)度任務(wù)與異步不同,其作用與Linux中的CRON job完全相同(windows里面也有計(jì)劃任務(wù))。舉個(gè)栗子,有一個(gè)任務(wù)必須每40分鐘執(zhí)行一次,那么,可以通過XML文件或者注解來進(jìn)行此配置。簡單的異步任務(wù)在后臺執(zhí)行就好,無需配置執(zhí)行頻率。
因?yàn)樗鼈兪莾煞N不同的任務(wù)類型,它們兩個(gè)的執(zhí)行者自然也就不同。第一個(gè)看起來有點(diǎn)像Java的并發(fā)執(zhí)行器(concurrency executor),這里會(huì)有專門去寫一篇關(guān)于Java中的執(zhí)行器來具體了解。根據(jù)Spring文檔TaskExecutor所述,它提供了基于Spring的抽象來處理線程池,這點(diǎn),也可以通過其類的注釋去了解。另一個(gè)抽象接口是TaskScheduler,它用于在將來給定的時(shí)間點(diǎn)來安排任務(wù),并執(zhí)行一次或定期執(zhí)行。
在分析源碼的過程中,發(fā)現(xiàn)另一個(gè)比較有趣的點(diǎn)是觸發(fā)器。它存在兩種類型:CronTrigger或PeriodTrigger。第一個(gè)模擬CRON任務(wù)的行為。所以我們可以在將來確切時(shí)間點(diǎn)提交一個(gè)任務(wù)的執(zhí)行。另一個(gè)觸發(fā)器可用于定期執(zhí)行任務(wù)。
Spring的異步任務(wù)類
讓我們從org.springframework.core.task.TaskExecutor類的分析開始。你會(huì)發(fā)現(xiàn),其簡單的不行,它是一個(gè)擴(kuò)展Java的Executor接口的接口。它的唯一方法也就是是執(zhí)行,在參數(shù)中使用Runnable類型的任務(wù)。
package org.springframework.core.task; import java.util.concurrent.Executor; /** * Simple task executor interface that abstracts the execution * of a {@link Runnable}. * * <p>Implementations can use all sorts of different execution strategies, * such as: synchronous, asynchronous, using a thread pool, and more. * * <p>Equivalent to JDK 1.5's {@link java.util.concurrent.Executor} * interface; extending it now in Spring 3.0, so that clients may declare * a dependency on an Executor and receive any TaskExecutor implementation. * This interface remains separate from the standard Executor interface * mainly for backwards compatibility with JDK 1.4 in Spring 2.x. * * @author Juergen Hoeller * @since 2.0 * @see java.util.concurrent.Executor */ @FunctionalInterface public interface TaskExecutor extends Executor { /** * Execute the given {@code task}. * <p>The call might return immediately if the implementation uses * an asynchronous execution strategy, or might block in the case * of synchronous execution. * @param task the {@code Runnable} to execute (never {@code null}) * @throws TaskRejectedException if the given task was not accepted */ @Override void execute(Runnable task); }
相對來說,org.springframework.scheduling.TaskScheduler接口就有點(diǎn)復(fù)雜了。它定義了一組以schedule開頭的名稱的方法允許我們定義將來要執(zhí)行的任務(wù)。所有 schedule* 方法返回java.util.concurrent.ScheduledFuture實(shí)例。Spring5中對scheduleAtFixedRate方法做了進(jìn)一步的充實(shí),其實(shí)最終調(diào)用的還是ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
public interface TaskScheduler { /** * Schedule the given {@link Runnable}, invoking it whenever the trigger * indicates a next execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param trigger an implementation of the {@link Trigger} interface, * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object * wrapping a cron expression * @return a {@link ScheduledFuture} representing pending completion of the task, * or {@code null} if the given Trigger object never fires (i.e. returns * {@code null} from {@link Trigger#nextExecutionTime}) * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @see org.springframework.scheduling.support.CronTrigger */ @Nullable ScheduledFuture<?> schedule(Runnable task, Trigger trigger); /** * Schedule the given {@link Runnable}, invoking it at the specified execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param startTime the desired execution time for the task * (if this is in the past, the task will be executed immediately, i.e. as soon as possible) * @return a {@link ScheduledFuture} representing pending completion of the task * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * 使用了默認(rèn)實(shí)現(xiàn),值得我們學(xué)習(xí)使用的,Java9中同樣可以有私有實(shí)現(xiàn)的,從這里我們可以做到我只通過 * 一個(gè)接口你來實(shí)現(xiàn),我把其他相應(yīng)的功能默認(rèn)實(shí)現(xiàn)下,最后調(diào)用你自定義實(shí)現(xiàn)的接口即可,使接口功能更 * 加一目了然 * @since 5.0 * @see #schedule(Runnable, Date) */ default ScheduledFuture<?> schedule(Runnable task, Instant startTime) { return schedule(task, Date.from(startTime)); } /** * Schedule the given {@link Runnable}, invoking it at the specified execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param startTime the desired execution time for the task * (if this is in the past, the task will be executed immediately, i.e. as soon as possible) * @return a {@link ScheduledFuture} representing pending completion of the task * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) */ ScheduledFuture<?> schedule(Runnable task, Date startTime); ... /** * Schedule the given {@link Runnable}, invoking it at the specified execution time * and subsequently with the given period. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param startTime the desired first execution time for the task * (if this is in the past, the task will be executed immediately, i.e. as soon as possible) * @param period the interval between successive executions of the task * @return a {@link ScheduledFuture} representing pending completion of the task * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @since 5.0 * @see #scheduleAtFixedRate(Runnable, Date, long) */ default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) { return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis()); } /** * Schedule the given {@link Runnable}, invoking it at the specified execution time * and subsequently with the given period. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param startTime the desired first execution time for the task * (if this is in the past, the task will be executed immediately, i.e. as soon as possible) * @param period the interval between successive executions of the task (in milliseconds) * @return a {@link ScheduledFuture} representing pending completion of the task * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) */ ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period); ... }
之前提到兩個(gè)觸發(fā)器組件,都實(shí)現(xiàn)了org.springframework.scheduling.Trigger接口。這里,我們只需關(guān)注一個(gè)的方法nextExecutionTime ,其定義下一個(gè)觸發(fā)任務(wù)的執(zhí)行時(shí)間。它的兩個(gè)實(shí)現(xiàn),CronTrigger和PeriodicTrigger,由org.springframework.scheduling.TriggerContext來實(shí)現(xiàn)信息的存儲,由此,我們可以很輕松獲得一個(gè)任務(wù)的最后一個(gè)執(zhí)行時(shí)間(lastScheduledExecutionTime),給定任務(wù)的最后完成時(shí)間(lastCompletionTime)或最后一個(gè)實(shí)際執(zhí)行時(shí)間(lastActualExecutionTime)。接下來,我們通過閱讀源代碼來簡單的了解下這些東西。org.springframework.scheduling.concurrent.ConcurrentTaskScheduler包含一個(gè)私有類EnterpriseConcurrentTriggerScheduler。在這個(gè)class里面,我們可以找到schedule方法:
public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) { ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor; return executor.schedule(task, new javax.enterprise.concurrent.Trigger() { @Override public Date getNextRunTime(LastExecution le, Date taskScheduledTime) { return trigger.nextExecutionTime(le != null ? new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) : new SimpleTriggerContext()); } @Override public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) { return false; } }); }
SimpleTriggerContext從其名字就可以看到很多東西了,因?yàn)樗鼘?shí)現(xiàn)了TriggerContext接口。
/** * Simple data holder implementation of the {@link TriggerContext} interface. * * @author Juergen Hoeller * @since 3.0 */ public class SimpleTriggerContext implements TriggerContext { @Nullable private volatile Date lastScheduledExecutionTime; @Nullable private volatile Date lastActualExecutionTime; @Nullable private volatile Date lastCompletionTime; ... /** * Create a SimpleTriggerContext with the given time values. * @param lastScheduledExecutionTime last <i>scheduled</i> execution time * @param lastActualExecutionTime last <i>actual</i> execution time * @param lastCompletionTime last completion time */ public SimpleTriggerContext(Date lastScheduledExecutionTime, Date lastActualExecutionTime, Date lastCompletionTime) { this.lastScheduledExecutionTime = lastScheduledExecutionTime; this.lastActualExecutionTime = lastActualExecutionTime; this.lastCompletionTime = lastCompletionTime; } ... }
也正如你看到的,在構(gòu)造函數(shù)中設(shè)置的時(shí)間值來自javax.enterprise.concurrent.LastExecution的實(shí)現(xiàn),其中:
- getScheduledStart:返回上次開始執(zhí)行任務(wù)的時(shí)間。它對應(yīng)于TriggerContext的lastScheduledExecutionTime屬性。
- getRunStart:返回給定任務(wù)開始運(yùn)行的時(shí)間。在TriggerContext中,它對應(yīng)于lastActualExecutionTime。
- getRunEnd:任務(wù)終止時(shí)返回。它用于在TriggerContext中設(shè)置lastCompletionTime。
Spring調(diào)度和異步執(zhí)行中的另一個(gè)重要類是org.springframework.core.task.support.TaskExecutorAdapter。它是一個(gè)將java.util.concurrent.Executor作為Spring基本的執(zhí)行器的適配器(描述的有點(diǎn)拗口,看下面代碼就明了了),之前已經(jīng)描述了TaskExecutor。實(shí)際上,它引用了Java的ExecutorService,它也是繼承了Executor接口。此引用用于完成所有提交的任務(wù)。
/** * Adapter that takes a JDK {@code java.util.concurrent.Executor} and * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it. * Also detects an extended {@code java.util.concurrent.ExecutorService 從此解釋上面的說明}, adapting * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. * * @author Juergen Hoeller * @since 3.0 * @see java.util.concurrent.Executor * @see java.util.concurrent.ExecutorService * @see java.util.concurrent.Executors */ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { private final Executor concurrentExecutor; @Nullable private TaskDecorator taskDecorator; ... /** * Create a new TaskExecutorAdapter, * using the given JDK concurrent executor. * @param concurrentExecutor the JDK concurrent executor to delegate to */ public TaskExecutorAdapter(Executor concurrentExecutor) { Assert.notNull(concurrentExecutor, "Executor must not be null"); this.concurrentExecutor = concurrentExecutor; } /** * Delegates to the specified JDK concurrent executor. * @see java.util.concurrent.Executor#execute(Runnable) */ @Override public void execute(Runnable task) { try { doExecute(this.concurrentExecutor, this.taskDecorator, task); } catch (RejectedExecutionException ex) { throw new TaskRejectedException( "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); } } @Override public void execute(Runnable task, long startTimeout) { execute(task); } @Override public Future<?> submit(Runnable task) { try { if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) { return ((ExecutorService) this.concurrentExecutor).submit(task); } else { FutureTask<Object> future = new FutureTask<>(task, null); doExecute(this.concurrentExecutor, this.taskDecorator, future); return future; } } catch (RejectedExecutionException ex) { throw new TaskRejectedException( "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); } } ... }
在Spring中配置異步和計(jì)劃任務(wù)
下面我們通過代碼的方式來實(shí)現(xiàn)異步任務(wù)。首先,我們需要通過注解來啟用配置。它的XML配置如下:
<task:scheduler id="taskScheduler"/> <task:executor id="taskExecutor" pool-size="2" /> <task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/> <context:component-scan base-package="com.migo.async"/>
可以通過將@EnableScheduling和@EnableAsync注解添加到配置類(用@Configuration注解)來激活兩者。完事,我們就可以開始著手實(shí)現(xiàn)調(diào)度和異步任務(wù)。為了實(shí)現(xiàn)調(diào)度任務(wù),我們可以使用@Scheduled注解。我們可以從org.springframework.scheduling.annotation包中找到這個(gè)注解。它包含了以下幾個(gè)屬性:
- cron:使用CRON風(fēng)格(Linux配置定時(shí)任務(wù)的風(fēng)格)的配置來配置需要啟動(dòng)的帶注解的任務(wù)。
- zone:要解析CRON表達(dá)式的時(shí)區(qū)。
- fixedDelay或fixedDelayString:在固定延遲時(shí)間后執(zhí)行任務(wù)。即任務(wù)將在最后一次調(diào)用結(jié)束和下一次調(diào)用的開始之間的這個(gè)固定時(shí)間段后執(zhí)行。
- fixedRate或fixedRateString:使用fixedRate注解的方法的調(diào)用將以固定的時(shí)間段(例如:每10秒鐘)進(jìn)行,與執(zhí)行生命周期(開始,結(jié)束)無關(guān)。
- initialDelay或initialDelayString:延遲首次執(zhí)行調(diào)度方法的時(shí)間。請注意,所有值(fixedDelay ,fixedRate ,initialDelay )必須以毫秒表示。 需要特別記住的是 ,用@Scheduled注解的方法不能接受任何參數(shù),并且不返回任何內(nèi)容(void),如果有返回值,返回值也會(huì)被忽略掉的,沒什么卵用。定時(shí)任務(wù)方法由容器管理,而不是由調(diào)用者在運(yùn)行時(shí)調(diào)用。它們由 org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor來解析,其中包含以下方法來拒絕執(zhí)行所有不正確定義的函數(shù):
protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); /** * 之前的版本中直接把返回值非空的給拒掉了,在Spring 4.3 Spring5 的版本中就沒那么嚴(yán)格了 * Assert.isTrue(void.class.equals(method.getReturnType()), * "Only void-returning methods may be annotated with @Scheduled"); **/ // ...
/** * 注釋很重要 * An annotation that marks a method to be scheduled. Exactly one of * the {@link #cron()}, {@link #fixedDelay()}, or {@link #fixedRate()} * attributes must be specified. * * <p>The annotated method must expect no arguments. It will typically have * a {@code void} return type; if not, the returned value will be ignored * when called through the scheduler. * * <p>Processing of {@code @Scheduled} annotations is performed by * registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be * done manually or, more conveniently, through the {@code <task:annotation-driven/>} * element or @{@link EnableScheduling} annotation. * * <p>This annotation may be used as a <em>meta-annotation</em> to create custom * <em>composed annotations</em> with attribute overrides. * * @author Mark Fisher * @author Dave Syer * @author Chris Beams * @since 3.0 * @see EnableScheduling * @see ScheduledAnnotationBeanPostProcessor * @see Schedules */ @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Repeatable(Schedules.class) public @interface Scheduled { ... }
使用@Async注解標(biāo)記一個(gè)方法或一個(gè)類(通過標(biāo)記一個(gè)類,我們自動(dòng)將其所有方法標(biāo)記為異步)。與@Scheduled不同,異步任務(wù)可以接受參數(shù),并可能返回某些東西。
寫一個(gè)在Spring中執(zhí)行異步任務(wù)的Demo
有了上面這些知識,我們可以來編寫異步和計(jì)劃任務(wù)。我們將通過測試用例來展示。我們從不同的任務(wù)執(zhí)行器(task executors)的測試開始:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:applicationContext-test.xml"}) @WebAppConfiguration public class TaskExecutorsTest { @Test public void simpeAsync() throws InterruptedException { /** * SimpleAsyncTaskExecutor creates new Thread for every task and executes it asynchronously. The threads aren't reused as in * native Java's thread pools. * * The number of concurrently executed threads can be specified through concurrencyLimit bean property * (concurrencyLimit XML attribute). Here it's more simple to invoke setConcurrencyLimit method. * Here the tasks will be executed by 2 simultaneous threads. Without specifying this value, * the number of executed threads will be indefinite. * * You can observe that only 2 tasks are executed at a given time - even if 3 are submitted to execution (lines 40-42). **/ SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("thread_name_prefix_____"); executor.setConcurrencyLimit(2); executor.execute(new SimpleTask("SimpleAsyncTask-1", Counters.simpleAsyncTask, 1000)); executor.execute(new SimpleTask("SimpleAsyncTask-2", Counters.simpleAsyncTask, 1000)); Thread.sleep(1050); assertTrue("2 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 2); executor.execute(new SimpleTask("SimpleAsyncTask-3", Counters.simpleAsyncTask, 1000)); executor.execute(new SimpleTask("SimpleAsyncTask-4", Counters.simpleAsyncTask, 1000)); executor.execute(new SimpleTask("SimpleAsyncTask-5", Counters.simpleAsyncTask, 2000)); Thread.sleep(1050); assertTrue("4 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 4); executor.execute(new SimpleTask("SimpleAsyncTask-6", Counters.simpleAsyncTask, 1000)); Thread.sleep(1050); assertTrue("6 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 6); } @Test public void syncTaskTest() { /** * SyncTask works almost as Java's CountDownLatch. In fact, this executor is synchronous with the calling thread. In our case, * SyncTaskExecutor tasks will be synchronous with JUnit thread. It means that the testing thread will sleep 5 * seconds after executing the third task ('SyncTask-3'). To prove that, we check if the total execution time is ~5 seconds. **/ long start = System.currentTimeMillis(); SyncTaskExecutor executor = new SyncTaskExecutor(); executor.execute(new SimpleTask("SyncTask-1", Counters.syncTask, 0)); executor.execute(new SimpleTask("SyncTask-2", Counters.syncTask, 0)); executor.execute(new SimpleTask("SyncTask-3", Counters.syncTask, 0)); executor.execute(new SimpleTask("SyncTask-4", Counters.syncTask, 5000)); executor.execute(new SimpleTask("SyncTask-5", Counters.syncTask, 0)); long end = System.currentTimeMillis(); int execTime = Math.round((end-start)/1000); assertTrue("Execution time should be 5 seconds but was "+execTime+" seconds", execTime == 5); } @Test public void threadPoolTest() throws InterruptedException { /** * This executor can be used to expose Java's native ThreadPoolExecutor as Spring bean, with the * possibility to set core pool size, max pool size and queue capacity through bean properties. * * It works exactly as ThreadPoolExecutor from java.util.concurrent package. It means that our pool starts * with 2 threads (core pool size) and can be growth until 3 (max pool size). * In additionally, 1 task can be stored in the queue. This task will be treated * as soon as one from 3 threads ends to execute provided task. In our case, we try to execute 5 tasks * in 3 places pool and 1 place queue. So the 5th task should be rejected and TaskRejectedException should be thrown. **/ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(3); executor.setQueueCapacity(1); executor.initialize(); executor.execute(new SimpleTask("ThreadPoolTask-1", Counters.threadPool, 1000)); executor.execute(new SimpleTask("ThreadPoolTask-2", Counters.threadPool, 1000)); executor.execute(new SimpleTask("ThreadPoolTask-3", Counters.threadPool, 1000)); executor.execute(new SimpleTask("ThreadPoolTask-4", Counters.threadPool, 1000)); boolean wasTre = false; try { executor.execute(new SimpleTask("ThreadPoolTask-5", Counters.threadPool, 1000)); } catch (TaskRejectedException tre) { wasTre = true; } assertTrue("The last task should throw a TaskRejectedException but it wasn't", wasTre); Thread.sleep(3000); assertTrue("4 tasks should be terminated, but "+Counters.threadPool.getNb()+" were instead", Counters.threadPool.getNb()==4); } } class SimpleTask implements Runnable { private String name; private Counters counter; private int sleepTime; public SimpleTask(String name, Counters counter, int sleepTime) { this.name = name; this.counter = counter; this.sleepTime = sleepTime; } @Override public void run() { try { Thread.sleep(this.sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } this.counter.increment(); System.out.println("Running task '"+this.name+"' in Thread "+Thread.currentThread().getName()); } @Override public String toString() { return "Task {"+this.name+"}"; } } enum Counters { simpleAsyncTask(0), syncTask(0), threadPool(0); private int nb; public int getNb() { return this.nb; } public synchronized void increment() { this.nb++; } private Counters(int n) { this.nb = n; } }
在過去,我們可以有更多的執(zhí)行器可以使用(SimpleThreadPoolTaskExecutor,TimerTaskExecutor 這些都2.x 3.x的老古董了)。但都被棄用并由本地Java的執(zhí)行器取代成為Spring的首選??纯摧敵龅慕Y(jié)果:
Running task 'SimpleAsyncTask-1' in Thread thread_name_prefix_____1 Running task 'SimpleAsyncTask-2' in Thread thread_name_prefix_____2 Running task 'SimpleAsyncTask-3' in Thread thread_name_prefix_____3 Running task 'SimpleAsyncTask-4' in Thread thread_name_prefix_____4 Running task 'SimpleAsyncTask-5' in Thread thread_name_prefix_____5 Running task 'SimpleAsyncTask-6' in Thread thread_name_prefix_____6 Running task 'SyncTask-1' in Thread main Running task 'SyncTask-2' in Thread main Running task 'SyncTask-3' in Thread main Running task 'SyncTask-4' in Thread main Running task 'SyncTask-5' in Thread main Running task 'ThreadPoolTask-2' in Thread ThreadPoolTaskExecutor-2 Running task 'ThreadPoolTask-1' in Thread ThreadPoolTaskExecutor-1 Running task 'ThreadPoolTask-4' in Thread ThreadPoolTaskExecutor-3 Running task 'ThreadPoolTask-3' in Thread ThreadPoolTaskExecutor-2
以此我們可以推斷出,第一個(gè)測試為每個(gè)任務(wù)創(chuàng)建新的線程。通過使用不同的線程名稱,我們可以看到相應(yīng)區(qū)別。第二個(gè),同步執(zhí)行器,應(yīng)該執(zhí)行所調(diào)用線程中的任務(wù)。這里可以看到'main'是主線程的名稱,它的主線程調(diào)用執(zhí)行同步所有任務(wù)。最后一種例子涉及最大可創(chuàng)建3個(gè)線程的線程池。從結(jié)果可以看到,他們也確實(shí)只有3個(gè)創(chuàng)建線程。
現(xiàn)在,我們將編寫一些單元測試來看看@Async和@Scheduled實(shí)現(xiàn)。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:applicationContext-test.xml"}) @WebAppConfiguration public class AnnotationTest { @Autowired private GenericApplicationContext context; @Test public void testScheduled() throws InterruptedException { System.out.println("Start sleeping"); Thread.sleep(6000); System.out.println("Wake up !"); TestScheduledTask scheduledTask = (TestScheduledTask) context.getBean("testScheduledTask"); /** * Test fixed delay. It's executed every 6 seconds. The first execution is registered after application's context start. **/ assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedDelayCounter(), scheduledTask.getFixedDelayCounter() == 2); /** * Test fixed rate. It's executed every 6 seconds. The first execution is registered after application's context start. * Unlike fixed delay, a fixed rate configuration executes one task with specified time. For example, it will execute on * 6 seconds delayed task at 10:30:30, 10:30:36, 10:30:42 and so on - even if the task 10:30:30 taken 30 seconds to * be terminated. In teh case of fixed delay, if the first task takes 30 seconds, the next will be executed 6 seconds * after the first one, so the execution flow will be: 10:30:30, 10:31:06, 10:31:12. **/ assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedRateCounter(), scheduledTask.getFixedRateCounter() == 2); /** * Test fixed rate with initial delay attribute. The initialDelay attribute is set to 6 seconds. It causes that * scheduled method is executed 6 seconds after application's context start. In our case, it should be executed * only once because of previous Thread.sleep(6000) invocation. **/ assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getInitialDelayCounter(), scheduledTask.getInitialDelayCounter() == 1); /** * Test cron scheduled task. Cron is scheduled to be executed every 6 seconds. It's executed only once, * so we can deduce that it's not invoked directly before applications * context start, but only after configured time (6 seconds in our case). **/ assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getCronCounter(), scheduledTask.getCronCounter() == 1); } @Test public void testAsyc() throws InterruptedException { /** * To test @Async annotation, we can create a bean in-the-fly. AsyncCounter bean is a * simple counter which value should be equals to 2 at the end of the test. A supplementary test * concerns threads which execute both of AsyncCounter methods: one which * isn't annotated with @Async and another one which is annotated with it. For the first one, invoking * thread should have the same name as the main thread. For annotated method, it can't be executed in * the main thread. It must be executed asynchrounously in a new thread. **/ context.registerBeanDefinition("asyncCounter", new RootBeanDefinition(AsyncCounter.class)); String currentName = Thread.currentThread().getName(); AsyncCounter asyncCounter = context.getBean("asyncCounter", AsyncCounter.class); asyncCounter.incrementNormal(); assertTrue("Thread executing normal increment should be the same as JUnit thread but it wasn't (expected '"+currentName+"', got '"+asyncCounter.getNormalThreadName()+"')", asyncCounter.getNormalThreadName().equals(currentName)); asyncCounter.incrementAsync(); // sleep 50ms and give some time to AsyncCounter to update asyncThreadName value Thread.sleep(50); assertFalse("Thread executing @Async increment shouldn't be the same as JUnit thread but it wasn (JUnit thread '"+currentName+"', @Async thread '"+asyncCounter.getAsyncThreadName()+"')", asyncCounter.getAsyncThreadName().equals(currentName)); System.out.println("Main thread execution's name: "+currentName); System.out.println("AsyncCounter normal increment thread execution's name: "+asyncCounter.getNormalThreadName()); System.out.println("AsyncCounter @Async increment thread execution's name: "+asyncCounter.getAsyncThreadName()); assertTrue("Counter should be 2, but was "+asyncCounter.getCounter(), asyncCounter.getCounter()==2); } } class AsyncCounter { private int counter = 0; private String normalThreadName; private String asyncThreadName; public void incrementNormal() { normalThreadName = Thread.currentThread().getName(); this.counter++; } @Async public void incrementAsync() { asyncThreadName = Thread.currentThread().getName(); this.counter++; } public String getAsyncThreadName() { return asyncThreadName; } public String getNormalThreadName() { return normalThreadName; } public int getCounter() { return this.counter; } }
另外,我們需要?jiǎng)?chuàng)建新的配置文件和一個(gè)包含定時(shí)任務(wù)方法的類:
<!-- imported configuration file first --> <!-- Activates various annotations to be detected in bean classes --> <context:annotation-config /> <!-- Scans the classpath for annotated components that will be auto-registered as Spring beans. For example @Controller and @Service. Make sure to set the correct base-package--> <context:component-scan base-package="com.migo.test.schedulers" /> <task:scheduler id="taskScheduler"/> <task:executor id="taskExecutor" pool-size="40" /> <task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
// scheduled methods after, all are executed every 6 seconds (scheduledAtFixedRate and scheduledAtFixedDelay start to execute at // application context start, two other methods begin 6 seconds after application's context start) @Component public class TestScheduledTask { private int fixedRateCounter = 0; private int fixedDelayCounter = 0; private int initialDelayCounter = 0; private int cronCounter = 0; @Scheduled(fixedRate = 6000) public void scheduledAtFixedRate() { System.out.println("<R> Increment at fixed rate"); fixedRateCounter++; } @Scheduled(fixedDelay = 6000) public void scheduledAtFixedDelay() { System.out.println("<D> Incrementing at fixed delay"); fixedDelayCounter++; } @Scheduled(fixedDelay = 6000, initialDelay = 6000) public void scheduledWithInitialDelay() { System.out.println("<DI> Incrementing with initial delay"); initialDelayCounter++; } @Scheduled(cron = "**/6 ** ** ** ** **") public void scheduledWithCron() { System.out.println("<C> Incrementing with cron"); cronCounter++; } public int getFixedRateCounter() { return this.fixedRateCounter; } public int getFixedDelayCounter() { return this.fixedDelayCounter; } public int getInitialDelayCounter() { return this.initialDelayCounter; } public int getCronCounter() { return this.cronCounter; } }
該測試的輸出:
<R> Increment at fixed rate <D> Incrementing at fixed delay Start sleeping <C> Incrementing with cron <DI> Incrementing with initial delay <R> Increment at fixed rate <D> Incrementing at fixed delay Wake up ! Main thread execution's name: main AsyncCounter normal increment thread execution's name: main AsyncCounter @Async increment thread execution's name: taskExecutor-1
本文向我們介紹了關(guān)于Spring框架另一個(gè)大家比較感興趣的功能–定時(shí)任務(wù)。我們可以看到,與Linux CRON風(fēng)格配置類似,這些任務(wù)同樣可以按照固定的頻率進(jìn)行定時(shí)任務(wù)的設(shè)置。我們還通過例子證明了使用@Async注解的方法會(huì)在不同線程中執(zhí)行。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Java Spring5學(xué)習(xí)之JdbcTemplate詳解
- Java基礎(chǔ)之Spring5的核心之一IOC容器
- 使用idea和gradle編譯spring5源碼的方法步驟
- spring5新特性全面介紹
- IDEA2020.1構(gòu)建Spring5.2.x源碼的方法
- Spring5.2.x 源碼本地環(huán)境搭建的方法步驟
- spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 的聲明詳解
- idea2020導(dǎo)入spring5.1的源碼詳細(xì)教程
- Idea+maven搭建SSH(struts2+hibernate5+spring5)環(huán)境的方法步驟
- Spring5+SpringMvc+Hibernate5整合的實(shí)現(xiàn)
- Spring5中的WebClient使用方法詳解
- spring5 webclient使用指南詳解
- 淺談Spring5 響應(yīng)式編程
- Java基礎(chǔ)之spring5新功能學(xué)習(xí)
相關(guān)文章
linux的shell命令檢測某個(gè)java程序是否執(zhí)行
ps -ef |grep java|grep2016-04-04解決Spring session(redis存儲方式)監(jiān)聽導(dǎo)致創(chuàng)建大量redisMessageListenerConta
這篇文章主要介紹了解決Spring session(redis存儲方式)監(jiān)聽導(dǎo)致創(chuàng)建大量redisMessageListenerContailner-X線程問題,需要的朋友可以參考下2018-08-08Java開發(fā)深入分析講解二叉樹的遞歸和非遞歸遍歷方法
樹是一種重要的非線性數(shù)據(jù)結(jié)構(gòu),直觀地看,它是數(shù)據(jù)元素(在樹中稱為結(jié)點(diǎn))按分支關(guān)系組織起來的結(jié)構(gòu),很象自然界中的樹那樣。樹結(jié)構(gòu)在客觀世界中廣泛存在,如人類社會(huì)的族譜和各種社會(huì)組織機(jī)構(gòu)都可用樹形象表示,本篇介紹二叉樹的遞歸與非遞歸遍歷的方法2022-05-05Springboot項(xiàng)目中如何讓非Spring管理的類獲得一個(gè)注入的Bean
這篇文章主要介紹了Springboot項(xiàng)目中如何讓非Spring管理的類獲得一個(gè)注入的Bean問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12springboot集成Swagger的方法(讓你擁有屬于自己的api管理器)
在大型的項(xiàng)目中,如果你有非常多的接口需要統(tǒng)一管理,或者需要進(jìn)行接口測試,那么我們通常會(huì)在繁雜地api中找到需要進(jìn)行測試或者管理的接口,接下來通過本文給大家介紹springboot集成Swagger的方法讓你擁有屬于自己的api管理器,感興趣的朋友一起看看吧2021-11-11Spring Boot整合web層實(shí)現(xiàn)過程詳解
這篇文章主要介紹了Spring Boot整合web層實(shí)現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04Java通過SSLEngine與NIO實(shí)現(xiàn)HTTPS訪問的操作方法
這篇文章主要介紹了Java通過SSLEngine與NIO實(shí)現(xiàn)HTTPS訪問,需要在Connect操作、Connected操作、Read和Write操作中加入SSL相關(guān)的處理即可,需要的朋友可以參考下2021-08-08