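A Thorough Analysis of How Spring Boot Async Tasks Work
Preface
When we need better performance or want to decouple parts of a project's architecture, we often run tasks asynchronously on a thread pool, usually one created with ThreadPoolExecutor. So how does Spring handle asynchronous tasks?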
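For comparison, the hand-built approach mentioned above might look like the following minimal sketch. The class name ManualPoolDemo and the pool sizes are illustrative assumptions, not taken from any real project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; name and sizes are illustrative only.
class ManualPoolDemo {

    // Runs one task on a hand-built pool and returns the name of the worker thread.
    static String runOnPool() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,                           // core and maximum pool size
                60, TimeUnit.SECONDS,           // idle time before extra threads exit
                new ArrayBlockingQueue<>(100)); // bounded work queue
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> result = pool.submit(task);
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + runOnPool());
    }
}
```

Everything here (creating the pool, submitting, shutting down) is the caller's job; the rest of the article looks at how Spring takes over those steps.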
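1. Spring async tasks
You have probably come across some of this already: @EnableAsync switches on async task support, and @Async marks a method to be executed asynchronously. The demo below shows how Spring runs an async task.
The pom dependencies: strictly speaking, spring-context alone would be enough, but the demo uses Spring Boot, which also brings many other handy features.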
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.1.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>2.1.3.RELEASE</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Main class and controller:
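import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@SpringBootApplication
public class AsyncMain {
    public static void main(String[] args) {
        SpringApplication.run(AsyncMain.class, args);
    }
    @Autowired
    private TaskService taskService;
    @RequestMapping(value = "/async-task", method = RequestMethod.GET)
    public String asyncMapping() {
        // Print the thread group and name of the thread serving the HTTP request
        System.out.println(Thread.currentThread().getThreadGroup() + "http-------" + Thread.currentThread().getName());
        taskService.doTask();
        return "exec http ok--------------";
    }
}
The async task service: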
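import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
@EnableAsync
@Service
public class TaskService {
    @Async
    public String doTask() {
        // Print the thread group and name of the thread running the task
        System.out.println(Thread.currentThread().getThreadGroup() + "-------" + Thread.currentThread().getName());
        return "do task done";
    }
}
Run the main method and visit localhost:8080/async-task. The console shows: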

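The task's thread is named task-1, while the HTTP request is served by an http-nio-xxx thread, so the task really did run asynchronously. But how does Spring run it? We never created a thread pool, so did Spring create one for us?
2. Analysis of Spring Boot async task execution
Running async tasks requires a thread pool, so let's track down the one Spring created. The startup log gives it away: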

By default, Spring Boot creates a thread pool (an ExecutorService) named applicationTaskExecutor.
The source code shows that the Spring Boot starter already sets up a default executor:
/**
 * {@link EnableAutoConfiguration Auto-configuration} for {@link TaskExecutor}.
 *
 * @author Stephane Nicoll
 * @author Camille Vienot
 * @since 2.1.0
 */
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {
    /**
     * Bean name of the application {@link TaskExecutor}.
     */
    public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
    private final TaskExecutionProperties properties;
    private final ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers;
    private final ObjectProvider<TaskDecorator> taskDecorator;
    public TaskExecutionAutoConfiguration(TaskExecutionProperties properties,
            ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
            ObjectProvider<TaskDecorator> taskDecorator) {
        this.properties = properties;
        this.taskExecutorCustomizers = taskExecutorCustomizers;
        this.taskDecorator = taskDecorator;
    }
    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder() {
        TaskExecutionProperties.Pool pool = this.properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
        builder = builder.customizers(this.taskExecutorCustomizers);
        builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
        return builder;
    }
    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
            AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }
}
Tracing this back to its source: the default implementation is already defined in Spring Boot's autoconfigure module.

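Spring Boot defines two implementations for us, as the figure above shows. Following Spring Boot's usual configuration conventions, the parameters of the async executor can be set through configuration properties: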
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
    private final Pool pool = new Pool();
    /**
     * Prefix to use for the names of newly created threads.
     */
    private String threadNamePrefix = "task-";
    public Pool getPool() {
        return this.pool;
    }
    public String getThreadNamePrefix() {
        return this.threadNamePrefix;
    }
    public void setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }
    public static class Pool {
        /**
         * Queue capacity. An unbounded capacity does not increase the pool and therefore
         * ignores the "max-size" property.
         */
        private int queueCapacity = Integer.MAX_VALUE;
        /**
         * Core number of threads.
         */
        private int coreSize = 8;
        /**
         * Maximum allowed number of threads. If tasks are filling up the queue, the pool
         * can expand up to that size to accommodate the load. Ignored if the queue is
         * unbounded.
         */
        private int maxSize = Integer.MAX_VALUE;
        /**
         * Whether core threads are allowed to time out. This enables dynamic growing and
         * shrinking of the pool.
         */
        private boolean allowCoreThreadTimeout = true;
        /**
         * Time limit for which threads may remain idle before being terminated.
         */
        private Duration keepAlive = Duration.ofSeconds(60);
        // getters and setters omitted
    }
}
The getters and setters are omitted. The Spring Boot settings all start with the spring.task.execution prefix, and the individual parameters correspond to the fields in the source above.
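The Javadoc on queueCapacity and maxSize above says that an unbounded queue makes max-size irrelevant. This follows from how the underlying java.util.concurrent.ThreadPoolExecutor grows: it only adds threads beyond the core size when the queue refuses a task. A minimal sketch in plain JDK code, with a hypothetical class name and small numbers chosen purely for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Spring.
class UnboundedQueueDemo {

    // Submits taskCount blocking tasks and reports how many threads the pool started.
    static int poolSizeUnderLoad(int coreSize, int taskCount) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, Integer.MAX_VALUE,   // huge maximum, as in the defaults above
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());  // unbounded queue (capacity Integer.MAX_VALUE)
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await();       // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Extra tasks wait in the queue; no thread beyond the core size is created.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads started: " + poolSizeUnderLoad(2, 10));
    }
}
```

With the defaults shown above (core size 8, unbounded queue), this suggests the pool stays at 8 threads at most unless spring.task.execution.pool.queue-capacity is lowered.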
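Feel free to try these out. And because of the way the bean is defined, with @ConditionalOnMissingBean(Executor.class), we can also supply our own Executor bean to customize it: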
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
        AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
    return builder.build();
}
For example:
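import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class TaskAsyncConfig {
    @Bean
    public Executor initExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Customize the thread name; the thread group can be customized here too
        executor.setThreadFactory(new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(Thread.currentThread().getThreadGroup(), r,
                        "async-task-" + threadNumber.getAndIncrement(),
                        0);
                return t;
            }
        });
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setKeepAliveSeconds(5);
        executor.setQueueCapacity(100);
        // executor.setRejectedExecutionHandler(null);
        return executor;
    }
}
Restart and visit localhost:8080/async-task again. The output confirms that our Executor has replaced the default one.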

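3. Analysis of the Spring async task execution flow
Tracing with a method breakpoint:

Async execution is implemented through Spring AOP, using a CGLIB dynamic proxy.

After proxying, AsyncExecutionInterceptor handles the async logic and calls the executor's submit method.

Likewise, the default taskExecutor is looked up from the BeanFactory.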
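The proxy-plus-interceptor flow described above can be approximated in plain JDK code. This is only a rough sketch under stated assumptions: it uses a JDK dynamic proxy over an interface rather than CGLIB, the Task interface and all class and method names are hypothetical, and none of it is Spring's actual implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; a simplified stand-in for the interceptor idea, not Spring code.
class AsyncProxySketch {

    // Hypothetical service interface, standing in for TaskService.
    interface Task {
        String doTask();
    }

    // Wraps the target so that each call is submitted to the executor instead of running inline.
    static Task asyncProxy(Task target, ExecutorService executor) {
        InvocationHandler handler = (proxyObject, method, args) -> {
            Callable<Object> call = () -> method.invoke(target, args);
            executor.submit(call);
            return null; // the caller does not wait, so no result is available here
        };
        return (Task) Proxy.newProxyInstance(
                Task.class.getClassLoader(), new Class<?>[] {Task.class}, handler);
    }

    // Calls the proxy once; returns {value seen by the caller, thread that ran the body}.
    static String[] callOnce() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        Task target = () -> {
            ranOn.complete(Thread.currentThread().getName());
            return "do task done";
        };
        try {
            String seenByCaller = asyncProxy(target, executor).doTask();
            return new String[] {seenByCaller, ranOn.get(5, TimeUnit.SECONDS)};
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] result = callOnce();
        System.out.println("caller saw: " + result[0]);
        System.out.println("body ran on: " + result[1]);
    }
}
```

The point of the sketch is the hand-off: the caller talks to the proxy, the proxy submits the real call to an executor, and the method body runs on a pool thread.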

By default, SimpleAsyncUncaughtExceptionHandler handles exceptions thrown by async methods. Let's try it:
@EnableAsync
@Service
public class TaskService {
    @Async
    public String doTask() {
        System.out.println(Thread.currentThread().getThreadGroup() + "-------" + Thread.currentThread().getName());
        // Throw on purpose to trigger the async exception handler
        throw new RuntimeException(" I`m a demo test exception-----------------");
    }
}
By default this logs logger.error("Unexpected exception occurred invoking async method: " + method, ex):
public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        if (logger.isErrorEnabled()) {
            logger.error("Unexpected exception occurred invoking async method: " + method, ex);
        }
    }
}
Run the test and check the console for this log entry.
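Why a separate handler is needed at all can be seen without Spring. When nobody holds a Future, a failure inside the task has nowhere to go unless something catches it on the worker thread; when the caller does hold a Future, the failure surfaces from get(). The sketch below illustrates both cases in plain JDK code; the ErrorHandler interface and the guarded helper are hypothetical names, loosely modeled on AsyncUncaughtExceptionHandler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Spring code.
class UncaughtHandlerSketch {

    // Hypothetical callback, loosely modeled on AsyncUncaughtExceptionHandler.
    interface ErrorHandler {
        void handle(Throwable ex, String taskName);
    }

    // Wraps a fire-and-forget task so that a failure reaches the handler instead of being lost.
    static Runnable guarded(String taskName, Runnable task, ErrorHandler handler) {
        return () -> {
            try {
                task.run();
            } catch (Throwable ex) {
                handler.handle(ex, taskName);
            }
        };
    }

    // Returns {message seen by the handler, message seen through Future.get()}.
    static String[] demo() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Case 1: no Future is kept, so the handler is the only place the error shows up.
            CompletableFuture<String> handled = new CompletableFuture<>();
            executor.execute(guarded("doTask",
                    () -> { throw new RuntimeException("demo failure"); },
                    (ex, name) -> handled.complete(name + ": " + ex.getMessage())));

            // Case 2: the caller keeps the Future, and the error comes back from get().
            Callable<String> failing = () -> { throw new IllegalStateException("via future"); };
            Future<String> future = executor.submit(failing);
            String fromFuture;
            try {
                future.get(5, TimeUnit.SECONDS);
                fromFuture = "no error";
            } catch (ExecutionException e) {
                fromFuture = e.getCause().getMessage();
            }
            return new String[] {handled.get(5, TimeUnit.SECONDS), fromFuture};
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] result = demo();
        System.out.println("handler saw: " + result[0]);
        System.out.println("future saw: " + result[1]);
    }
}
```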

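4. Custom Executor and custom async exception handling in Spring
This requires implementing the AsyncConfigurer interface. As its Javadoc shows, Spring expects it to be used together with the @EnableAsync and @Configuration annotations: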
/**
 * Interface to be implemented by @{@link org.springframework.context.annotation.Configuration
 * Configuration} classes annotated with @{@link EnableAsync} that wish to customize the
 * {@link Executor} instance used when processing async method invocations or the
 * {@link AsyncUncaughtExceptionHandler} instance used to process exception thrown from
 * async method with {@code void} return type.
 *
 * <p>Consider using {@link AsyncConfigurerSupport} providing default implementations for
 * both methods if only one element needs to be customized. Furthermore, backward compatibility
 * of this interface will be insured in case new customization options are introduced
 * in the future.
 *
 * <p>See @{@link EnableAsync} for usage examples.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see AbstractAsyncConfiguration
 * @see EnableAsync
 * @see AsyncConfigurerSupport
 */
public interface AsyncConfigurer {
    /**
     * The {@link Executor} instance to be used when processing async
     * method invocations.
     */
    @Nullable
    default Executor getAsyncExecutor() {
        return null;
    }
    /**
     * The {@link AsyncUncaughtExceptionHandler} instance to be used
     * when an exception is thrown during an asynchronous method execution
     * with {@code void} return type.
     */
    @Nullable
    default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
The demo, reworked as follows:
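import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class TaskAsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Customize the thread name; the thread group can be customized here too
        executor.setThreadFactory(new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                // Use a different name prefix this time
                Thread t = new Thread(Thread.currentThread().getThreadGroup(), r,
                        "async-task-all" + threadNumber.getAndIncrement(),
                        0);
                return t;
            }
        });
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setKeepAliveSeconds(5);
        executor.setQueueCapacity(100);
        // executor.setRejectedExecutionHandler(null);
        // Not a Spring-managed bean here, so it must be initialized by hand
        executor.initialize();
        return executor;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                System.out.println("do exception by myself");
            }
        };
    }
}
Remember: in this setup Spring no longer manages the Executor for us, because the object returned from getAsyncExecutor() is not a bean. We have to initialize it ourselves:
executor.initialize();
Looking at the source, this simply creates a new ThreadPoolExecutor: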
@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    ThreadPoolExecutor executor;
    if (this.taskDecorator != null) {
        executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
            @Override
            public void execute(Runnable command) {
                Runnable decorated = taskDecorator.decorate(command);
                if (decorated != command) {
                    decoratedTaskMap.put(decorated, command);
                }
                super.execute(decorated);
            }
        };
    }
    else {
        executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
    }
    if (this.allowCoreThreadTimeOut) {
        executor.allowCoreThreadTimeOut(true);
    }
    this.threadPoolExecutor = executor;
    return executor;
}
Run it and check the console for the result.
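The taskDecorator branch in initializeExecutor above overrides execute so that every task is wrapped before it reaches the pool. The same idea in isolation, as a plain JDK sketch: the class name is hypothetical and a UnaryOperator<Runnable> stands in for Spring's TaskDecorator.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// Hypothetical sketch; UnaryOperator<Runnable> is an assumed stand-in for TaskDecorator.
class DecoratingPoolSketch {

    // Builds a pool whose execute() wraps each task first, like the taskDecorator branch above.
    static ThreadPoolExecutor decoratingPool(UnaryOperator<Runnable> decorator) {
        return new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)) {
            @Override
            public void execute(Runnable command) {
                super.execute(decorator.apply(command));
            }
        };
    }

    // Runs one task through a decorator that records what happens around it.
    static List<String> demo() throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = decoratingPool(task -> () -> {
            events.add("before");
            try {
                task.run();
            } finally {
                events.add("after");
            }
        });
        try {
            Runnable work = () -> events.add("task");
            // submit() delegates to execute(), so the decorator applies here as well
            pool.submit(work).get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
            // Wait for the worker so the decorator's "after" step has run
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

A decorator like this is a typical place to copy per-request context (for example a logging context) from the submitting thread onto the worker thread.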

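Summary
Spring Boot wraps a plain ThreadPoolExecutor into its async task support, which makes development considerably more convenient.
However, the examples above never handled the result of the async execution. Spring does define how results are handled: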
/**
 * AOP Alliance {@code MethodInterceptor} that processes method invocations
 * asynchronously, using a given {@link org.springframework.core.task.AsyncTaskExecutor}.
 * Typically used with the {@link org.springframework.scheduling.annotation.Async} annotation.
 *
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@code java.util.concurrent.Future}. In the latter case, the Future handle
 * returned from the proxy will be an actual asynchronous Future that can be used
 * to track the result of the asynchronous method execution. However, since the
 * target method needs to implement the same signature, it will have to return
 * a temporary Future handle that just passes the return value through
 * (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult}
 * or EJB 3.1's {@code javax.ejb.AsyncResult}).
 *
 * <p>When the return type is {@code java.util.concurrent.Future}, any exception thrown
 * during the execution can be accessed and managed by the caller. With {@code void}
 * return type however, such exceptions cannot be transmitted back. In that case an
 * {@link AsyncUncaughtExceptionHandler} can be registered to process such exceptions.
 *
 * <p>As of Spring 3.1.2 the {@code AnnotationAsyncExecutionInterceptor} subclass is
 * preferred for use due to its support for executor qualification in conjunction with
 * Spring's {@code @Async} annotation.
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.0
 * @see org.springframework.scheduling.annotation.Async
 * @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
 * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
 */
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
    // ...
}
The key part is the paragraph beginning "In terms of target method signatures": the return type is limited to void or java.util.concurrent.Future. So an @Async method that needs to hand a result back should declare a Future return type and wrap its value in AsyncResult. With any other return type, such as the String used in the demos above, the caller does not receive the value; in the Spring 5.1 line used here the proxy returns null.
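The "temporary Future handle that just passes the return value through" can be pictured with plain JDK types. In this sketch CompletableFuture.completedFuture plays the role that AsyncResult plays in the Javadoc, and submitAsync is a hypothetical stand-in for the interceptor, not Spring's actual code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch, not Spring code.
class FutureResultSketch {

    // Stand-in for an @Async method body: the value is already computed when it returns,
    // so it is wrapped in an already-completed Future just to satisfy the signature.
    static Future<String> doTask() {
        return CompletableFuture.completedFuture(
                "do task done on " + Thread.currentThread().getName());
    }

    // Stand-in for the interceptor: run the method on the pool, unwrap its pass-through
    // Future there, and give the caller the pool's own Future to wait on.
    static <T> Future<T> submitAsync(ExecutorService executor, Supplier<Future<T>> method) {
        Callable<T> task = () -> method.get().get();
        return executor.submit(task);
    }

    static String demo() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> handle = submitAsync(executor, FutureResultSketch::doTask);
            return handle.get(5, TimeUnit.SECONDS); // the caller decides when to block
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The Future the caller holds is the executor's, not the one created inside the method; the inner one exists only so that the method can keep a Future return type.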
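Spring also defines another kind of task, the scheduled task, which works on the same principle.
The above is based on personal experience; I hope it serves as a useful reference.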

