Spring?boot異步任務(wù)原理全面分析
前言
我們經(jīng)常在需要提升性能或者項(xiàng)目架構(gòu)解耦的過程中,使用線程池異步執(zhí)行任務(wù),經(jīng)常使用ThreadPoolExecutor創(chuàng)建線程池。那么Spring對異步任務(wù)是如何處理的呢?
1. spring 異步任務(wù)
估計或多或少了解過一些,比如@EnableAsync可以開啟異步任務(wù),@Async用于注解說明當(dāng)前方法是異步執(zhí)行,下面使用demo看看Spring的異步任務(wù)如何執(zhí)行。
pom依賴,其實(shí)僅依賴Spring core context 就可以了,這里演示,另外spring boot還要許多好玩的特性。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.1.3.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.1.3.RELEASE</version> <scope>test</scope> </dependency> </dependencies>
main & controller
@RestController @SpringBootApplication public class AsyncMain { public static void main(String[] args) { SpringApplication.run(AsyncMain.class, args); } @Autowired private TaskService taskService; @RequestMapping(value = "/async-task", method = RequestMethod.GET) public String asyncMapping(){ System.out.println(Thread.currentThread().getThreadGroup() + "http-------" + Thread.currentThread().getName()); taskService.doTask(); return "exec http ok--------------"; } }
異步任務(wù)服務(wù)
@EnableAsync @Service public class TaskService { @Async public String doTask(){ System.out.println(Thread.currentThread().getThreadGroup() + "-------" + Thread.currentThread().getName()); return "do task done"; } }
運(yùn)行main方法,訪問localhost:8080/async-task,控制臺可以看到:
可以看到線程的name是task-1,而http訪問的線程是http-nio-xxx。說明任務(wù)異步執(zhí)行了。然而Spring的異步任務(wù)是如何執(zhí)行的呢,我們也并未創(chuàng)建線程池,難道Spring替我們創(chuàng)建了?
2. Spring boot異步任務(wù)執(zhí)行過程分析
首先,需要執(zhí)行異步任務(wù),必須創(chuàng)建線程池,那我們來揪出Spring創(chuàng)建的線程池,從啟動日志可以看出
Spring默認(rèn)給我們創(chuàng)建了applicationTaskExecutor的ExecutorService的線程池。
通過源碼分析,Spring boot的starter已經(jīng)給我們設(shè)置了默認(rèn)的執(zhí)行器
/** * {@link EnableAutoConfiguration Auto-configuration} for {@link TaskExecutor}. * * @author Stephane Nicoll * @author Camille Vienot * @since 2.1.0 */ @ConditionalOnClass(ThreadPoolTaskExecutor.class) @Configuration @EnableConfigurationProperties(TaskExecutionProperties.class) public class TaskExecutionAutoConfiguration { /** * Bean name of the application {@link TaskExecutor}. */ public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor"; private final TaskExecutionProperties properties; private final ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers; private final ObjectProvider<TaskDecorator> taskDecorator; public TaskExecutionAutoConfiguration(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) { this.properties = properties; this.taskExecutorCustomizers = taskExecutorCustomizers; this.taskDecorator = taskDecorator; } @Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder() { TaskExecutionProperties.Pool pool = this.properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix()); builder = builder.customizers(this.taskExecutorCustomizers); builder = builder.taskDecorator(this.taskDecorator.getIfUnique()); return builder; } @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); } }
追根溯源:在Spring boot的autoconfigure中已經(jīng)定義了默認(rèn)實(shí)現(xiàn)
Spring為我們定義了兩種實(shí)現(xiàn),如上圖所示,根據(jù)Spring boot的配置定律,我們可以通過配置來定義異步任務(wù)的參數(shù)
@ConfigurationProperties("spring.task.execution") public class TaskExecutionProperties { private final Pool pool = new Pool(); /** * Prefix to use for the names of newly created threads. */ private String threadNamePrefix = "task-"; public Pool getPool() { return this.pool; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Pool { /** * Queue capacity. An unbounded capacity does not increase the pool and therefore * ignores the "max-size" property. */ private int queueCapacity = Integer.MAX_VALUE; /** * Core number of threads. */ private int coreSize = 8; /** * Maximum allowed number of threads. If tasks are filling up the queue, the pool * can expand up to that size to accommodate the load. Ignored if the queue is * unbounded. */ private int maxSize = Integer.MAX_VALUE; /** * Whether core threads are allowed to time out. This enables dynamic growing and * shrinking of the pool. */ private boolean allowCoreThreadTimeout = true; /** * Time limit for which threads may remain idle before being terminated. */ private Duration keepAlive = Duration.ofSeconds(60);
省略get set方法,spring boot的配置以spring.task.execution開頭,參數(shù)的設(shè)置參考如上源碼的屬性設(shè)置。
各位可以自行嘗試,當(dāng)然因?yàn)镾pring bean的定義方式,我們可以復(fù)寫bean來達(dá)到自定義的目的
? ? @Lazy ?? ?@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, ?? ??? ??? ?AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME }) ?? ?@ConditionalOnMissingBean(Executor.class) ?? ?public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { ?? ??? ?return builder.build(); ?? ?}
比如:
@Configuration @EnableAsync public class TaskAsyncConfig { @Bean public Executor initExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //定制線程名稱,還可以定制線程group executor.setThreadFactory(new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "async-task-" + threadNumber.getAndIncrement(), 0); return t; } }); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setKeepAliveSeconds(5); executor.setQueueCapacity(100); // executor.setRejectedExecutionHandler(null); return executor; } }
重啟,訪問localhost:8080/async-task,證明我們寫的Executor已經(jīng)覆蓋系統(tǒng)默認(rèn)了。
3. Spring 異步任務(wù)執(zhí)行過程分析
方法斷點(diǎn)跟蹤
執(zhí)行異步任務(wù)使用Spring CGLib動態(tài)代理AOP實(shí)現(xiàn)
可以看出動態(tài)代理后使用AsyncExecutionInterceptor來處理異步邏輯,執(zhí)行submit方法
同理可以看出,默認(rèn)的taskExecutor使用BeanFactory中獲取。
默認(rèn)使用SimpleAsyncUncaughtExceptionHandler處理異步異常。下面我們來試試
@EnableAsync @Service public class TaskService { @Async public String doTask(){ System.out.println(Thread.currentThread().getThreadGroup() + "-------" + Thread.currentThread().getName()); throw new RuntimeException(" I`m a demo test exception-----------------"); } }
默認(rèn)會打印logger.error("Unexpected exception occurred invoking async method: " + method, ex);日志
public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler { private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class); @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { if (logger.isErrorEnabled()) { logger.error("Unexpected exception occurred invoking async method: " + method, ex); } } }
運(yùn)行測試
4. Spring 自定義Executor與自定義異步異常處理
需要實(shí)現(xiàn)AsyncConfigurer接口,可以看到Spring要我們配合EnableAsync與Configuration注解同時使用
/** * Interface to be implemented by @{@link org.springframework.context.annotation.Configuration * Configuration} classes annotated with @{@link EnableAsync} that wish to customize the * {@link Executor} instance used when processing async method invocations or the * {@link AsyncUncaughtExceptionHandler} instance used to process exception thrown from * async method with {@code void} return type. * * <p>Consider using {@link AsyncConfigurerSupport} providing default implementations for * both methods if only one element needs to be customized. Furthermore, backward compatibility * of this interface will be insured in case new customization options are introduced * in the future. * * <p>See @{@link EnableAsync} for usage examples. * * @author Chris Beams * @author Stephane Nicoll * @since 3.1 * @see AbstractAsyncConfiguration * @see EnableAsync * @see AsyncConfigurerSupport */ public interface AsyncConfigurer { /** * The {@link Executor} instance to be used when processing async * method invocations. */ @Nullable default Executor getAsyncExecutor() { return null; } /** * The {@link AsyncUncaughtExceptionHandler} instance to be used * when an exception is thrown during an asynchronous method execution * with {@code void} return type. */ @Nullable default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
demo,如下改造
@Configuration @EnableAsync public class TaskAsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //定制線程名稱,還可以定制線程group executor.setThreadFactory(new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { //重新定義一個名稱 Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "async-task-all" + threadNumber.getAndIncrement(), 0); return t; } }); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setKeepAliveSeconds(5); executor.setQueueCapacity(100); // executor.setRejectedExecutionHandler(null); executor.initialize(); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncUncaughtExceptionHandler() { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { System.out.println("do exception by myself"); } }; } }
記住,此時,Spring就不會替我們管理Executor了,需要我們自己初始化
executor.initialize();
觀其源碼就是new 一個ThreadPoolExecutor
@Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
運(yùn)行,結(jié)果如下
總結(jié)
Spring boot將簡單的ThreadPoolExecutor通過封裝成了異步任務(wù),大大方便了程序的開發(fā)。
然而我們在如上的示例中,并沒有處理程序的異步執(zhí)行結(jié)果,其實(shí)Spring定義了結(jié)果的處理
/** * AOP Alliance {@code MethodInterceptor} that processes method invocations * asynchronously, using a given {@link org.springframework.core.task.AsyncTaskExecutor}. * Typically used with the {@link org.springframework.scheduling.annotation.Async} annotation. * * <p>In terms of target method signatures, any parameter types are supported. * However, the return type is constrained to either {@code void} or * {@code java.util.concurrent.Future}. In the latter case, the Future handle * returned from the proxy will be an actual asynchronous Future that can be used * to track the result of the asynchronous method execution. However, since the * target method needs to implement the same signature, it will have to return * a temporary Future handle that just passes the return value through * (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult} * or EJB 3.1's {@code javax.ejb.AsyncResult}). * * <p>When the return type is {@code java.util.concurrent.Future}, any exception thrown * during the execution can be accessed and managed by the caller. With {@code void} * return type however, such exceptions cannot be transmitted back. In that case an * {@link AsyncUncaughtExceptionHandler} can be registered to process such exceptions. * * <p>As of Spring 3.1.2 the {@code AnnotationAsyncExecutionInterceptor} subclass is * preferred for use due to its support for executor qualification in conjunction with * Spring's {@code @Async} annotation. * * @author Juergen Hoeller * @author Chris Beams * @author Stephane Nicoll * @since 3.0 * @see org.springframework.scheduling.annotation.Async * @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor */ public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
<p>In terms of target method signatures, any parameter types are supported. ?* However, the return type is constrained to either {@code void} or ?* {@code java.util.concurrent.Future}. In the latter case, the Future handle ?* returned from the proxy will be an actual asynchronous Future that can be used ?* to track the result of the asynchronous method execution. However, since the ?* target method needs to implement the same signature, it will have to return ?* a temporary Future handle that just passes the return value through ?* (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult} ?* or EJB 3.1's {@code javax.ejb.AsyncResult}).
如果程序不返回void或者Future,那么通過AsyncResult來返回一個結(jié)果
另外Spring還定義了一個Task,即定時任務(wù)task,原理相同。
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot Web開發(fā)之系統(tǒng)任務(wù)啟動與路徑映射和框架整合
這篇文章主要介紹了SpringBoot Web開發(fā)中的系統(tǒng)任務(wù)啟動與路徑映射和Servlet、Filter、Listener框架整合,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08Java concurrency之公平鎖(二)_動力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了Java concurrency之公平鎖的第二篇內(nèi)容,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-06-06用java將GBK工程轉(zhuǎn)為uft8的方法實(shí)例
本篇文章主要介紹了用java將GBK工程轉(zhuǎn)為uft8的方法實(shí)例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-08-08SpringBoot使用Maven打包異常-引入外部jar的問題及解決方案
這篇文章主要介紹了SpringBoot使用Maven打包異常-引入外部jar,需要的朋友可以參考下2020-06-06MyBatis插入Insert、InsertSelective的區(qū)別及使用心得
這篇文章主要介紹了MyBatis插入Insert、InsertSelective的區(qū)別及使用心得,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12詳解Java虛擬機(jī)30個常用知識點(diǎn)之1——類文件結(jié)構(gòu)
這篇文章主要介紹了Java虛擬機(jī)類文件結(jié)構(gòu),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03