Spring?EnableAsync注解異步執(zhí)行源碼解析
概述
基于 Spring Framework v5.2.6.RELEASE
Spring 終有一種非常簡便的方法使 Bean 中的一個方法變成異步執(zhí)行的方法,那就是在方法上標(biāo)記 @Async 注解,想要開啟這一特性,需要在一個配置類上標(biāo)記 @EnableAsync 注解。
本文將通過源碼分析 @EnableAsync 注解是如何開啟這一特性的。
@EnableAsync 分析
@EnableAsync 注解的源碼如下。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { Class<? extends Annotation> annotation() default Annotation.class; boolean proxyTargetClass() default false; AdviceMode mode() default AdviceMode.PROXY; int order() default Ordered.LOWEST_PRECEDENCE; }
注解的每一個屬性都指定了默認(rèn)值,后續(xù)的分析也會基于默認(rèn)的屬性值進(jìn)行分析。除此之外,注解上的 @Import 元注解引入了 AsyncConfigurationSelector 類。
從它的類關(guān)系中可以看出,AsyncConfigurationSelector 實(shí)現(xiàn)了 ImportSelector 接口,因此,當(dāng) Spring 掃描到配置類后,會執(zhí)行它的 selectImports 方法,獲取一個包含配置類名稱的數(shù)組,用于加載對應(yīng)的配置。
AsyncConfigurationSelector 雖然也包含了selectImports
方法,但是從參數(shù)類型中可以看出它不是接口中的selectImports
方法的實(shí)現(xiàn)方法,要找到接口中的實(shí)現(xiàn)方法,我們需要去 AsyncConfigurationSelector 的父類 AdviceModeImportSelector 中。
@Override public final String[] selectImports(AnnotationMetadata importingClassMetadata) { Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class); Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector"); AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (attributes == null) { throw new IllegalArgumentException(String.format( "@%s is not present on importing class '%s' as expected", annType.getSimpleName(), importingClassMetadata.getClassName())); } AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName()); String[] imports = selectImports(adviceMode); if (imports == null) { throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode); } return imports; }
這個方法中,主要是從 @EnableAsync 注解獲取各項(xiàng)屬性的值,然后使用adviceMode
屬性,調(diào)用另一個selectImports
方法獲取最終的結(jié)果。
此處被調(diào)用的selectImports
方法,就是 AsyncConfigurationSelector 中的 selectImports
方法。
@Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } }
在 @EnableAsync 注解中,mode的默認(rèn)值是AdviceMode.PROXY
,因此,這里引入的配置類是 ProxyAsyncConfiguration。
接下來分析 ProxyAsyncConfiguration 類。
ProxyAsyncConfiguration 分析
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
在 ProxyAsyncConfiguration 中,只有一個 Bean 配置,類型是 AsyncAnnotationBeanPostProcessor,由此可以知道,@EnableAsync 所開啟的功能,是通過 Bean 的后處理器來實(shí)現(xiàn)的。
上述的方法體中,通過構(gòu)造方法創(chuàng)建了 AsyncAnnotationBeanPostProcessor 對象。
public AsyncAnnotationBeanPostProcessor() { setBeforeExistingAdvisors(true); }
構(gòu)造方法中設(shè)置了一個屬性值,這個屬性是是beforeExistingAdvisors
,定義在父類 AbstractAdvisingBeanPostProcessor 中,這個屬性的默認(rèn)值是false,當(dāng)它的值為true時(shí),會將新的增強(qiáng)邏輯添加到增強(qiáng)邏輯列表的開頭而不是最后。
也就是說,@EnableAsync 提供的異步執(zhí)行特性,是基于 AOP 特性來實(shí)現(xiàn)的。
接著往下看,在創(chuàng)建了 AsyncAnnotationBeanPostProcessor 對象之后,為其配置了一些屬性,有一些屬性的值是從 @EnableAsync 屬性值獲取的,還有兩個屬性值需要留意,就是this.executor
和this.exceptionHandler
,這兩個成員變量的值是從哪兒來的呢?
我們可以找到 ProxyAsyncConfiguration 的父類 AbstractAsyncConfiguration,其中有一個標(biāo)記了 @Autowired 注解的方法。
// org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; }
如果我們自己配置了線程池和異常處理器,則會在這里執(zhí)行配置,這樣,我們配置的線程池和異常處理器就會被添加到 AsyncAnnotationBeanPostProcessor 中。
接下來,我們再分析 AsyncAnnotationBeanPostProcessor 后處理器是如何工作的。
AsyncAnnotationBeanPostProcessor 分析
從它的類繼承關(guān)系中可以看出,它是一個基于 AOP 特性來為 Bean 中的方法提供異步執(zhí)行功能的 Bean 后處理器。
AsyncAnnotationBeanPostProcessor 同時(shí)實(shí)現(xiàn)了 BeanFactoryAware 接口,在它的setBeanFactory
方法中,完成了 Advisor 的創(chuàng)建。
@Override public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }
這里創(chuàng)建的 Advisor 類型是 AsyncAnnotationAdvisor,創(chuàng)建完之后,它被復(fù)制給了advisor
成員變量,這個成員變量定義在 AsyncAnnotationBeanPostProcessor 的父類 AbstractBeanFactoryAwareAdvisingPostProcessor 中。
這個advisor
成員變量就是處理增強(qiáng)邏輯的對象。
AsyncAnnotationAdvisor 分析
關(guān)于 Spring 是如何在后處理器中為 Bean 創(chuàng)建代理對象以及如何向代理對象中加入增強(qiáng)邏輯的,我之前的文章有很詳細(xì)的分析,可以閱讀之前關(guān)于 AOP 原理的分析文章來了解。下面我們直接分析 AsyncAnnotationAdvisor,它是完成方法異步執(zhí)行的核心。
一個 Advisor 通常有兩個非常重要的部分,一個是 Pointcut,用于匹配需要增強(qiáng)的方法,另一個是 Advice 也就是具體的增強(qiáng)邏輯。對于 AsyncAnnotationAdvisor 來說,這兩個部分都是在它的構(gòu)造方法中構(gòu)建的。
public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } this.advice = buildAdvice(executor, exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes); }
其中可以看到兩行關(guān)鍵的代碼,他們分別完成了advice
和pointcut
成員變量的構(gòu)建。
this.advice = buildAdvice(executor, exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes);
下面分別來看這兩部分。
Advice 構(gòu)建
先看buildAdvice
方法。
// org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildAdvice protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; }
Advice 的構(gòu)建比較簡單,這里可以看到,最終構(gòu)建的 Advice 是一個 AnnotationAsyncExecutionInterceptor 類型的攔截器,除了調(diào)用構(gòu)造方法創(chuàng)建之外,還配置了executor
和exceptionHandler
,這個攔截器應(yīng)該就是完成 AOP 增強(qiáng)邏輯的攔截器,我們放到后文中分析。
Pointcut 構(gòu)建
下面再看buildPointcut
方法。
// org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildPointcut protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result == null) { result = new ComposablePointcut(cpc); } else { result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); }
這個方法的邏輯比較簡單,首先創(chuàng)建了兩個 Pointcut 對象,cpc
用于匹配類型,mpc
用于匹配方法,他們的邏輯都很簡單,就是看類或者方法的定義是否包含 @Async 注解。
最后再將兩者合并為一個 ComposablePointcut 對象返回,ComposablePointcut 的作用就是將多個 Pointcut 對象合并成一個。
AnnotationAsyncExecutionInterceptor 分析
了解完上面的內(nèi)容,接下來就開始分析 AnnotationAsyncExecutionInterceptor 攔截器。它是一個包含 AOP 增強(qiáng)邏輯的攔截器,也是完成方法異步調(diào)用的核心邏輯。
AnnotationAsyncExecutionInterceptor 要完成它的任務(wù),有兩個比較核心的功能,一個是目標(biāo)方法的匹配,另一個就是攔截器的邏輯。目標(biāo)方法的匹配邏輯,我們在上文中已經(jīng)介紹過了,以下主要分析其攔截器邏輯,也就是它的invoke
方法。
以上是 AnnotationAsyncExecutionInterceptor 的類關(guān)系圖,它實(shí)現(xiàn)了 MethodInterceptor 接口,invoke
方法的實(shí)現(xiàn)在父類 AsyncExecutionInterceptor 中。
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke @Override @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
從上面的源碼中可以看到三個關(guān)鍵的步驟:
- 找到目標(biāo)方法,并根據(jù)目標(biāo)方法獲取到執(zhí)行它的 AsyncTaskExecutor。
- 將目標(biāo)方法的調(diào)用,封裝到一個 Callable 異步任務(wù)
task
當(dāng)中。 - 通過doSubmit方法來異步調(diào)用上一步封裝的
task
。
下面我們詳細(xì)分析這三個步驟。
AsyncTaskExecutor 查找
AsyncTaskExecutor 在determineAsyncExecutor
方法中完成。
@Nullable protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { targetExecutor = this.defaultExecutor.get(); } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
首先會從executors
中根據(jù)方法獲取對應(yīng)的 AsyncTaskExecutor,executors
是一個用來緩存 Executor 的成員變量。
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
當(dāng)?shù)谝淮芜M(jìn)入這個方法的時(shí)候,executors
肯定是空的,因此會進(jìn)入if
語句的邏輯獲取 Executor 然后再將其添加到executors
中。在if語句中,首先會通過getExecutorQualifier
方法獲取一個qualifier
,我們進(jìn)入方法查看獲取的過程。
// org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier @Override @Nullable protected String getExecutorQualifier(Method method) { // Maintainer's note: changes made here should also be made in // AnnotationAsyncExecutionAspect#getExecutorQualifier Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class); if (async == null) { async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class); } return (async != null ? async.value() : null); }
這個方法會從目標(biāo)方法或者其所在的類型上的 @Async 注解的value
屬性,作為方法的返回值復(fù)制給qualifier
。這個qualifier
的值是一個 Executor 的 Bean 名稱,也就是說,我們可以通過 @Async 的value
屬性指定執(zhí)行異步任務(wù)的 Executor 的 Bean 名稱。
如果qualifier
不是空的,那么,就會通過findQualifiedExecutor方法從 Spring 容器中獲取對應(yīng)的 Executor 實(shí)例。
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#findQualifiedExecutor @Nullable protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) { if (beanFactory == null) { throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() + " to access qualified executor '" + qualifier + "'"); } return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier); }
如果qualifier
是空的,那么就會通過this.defaultExecutor.get()
獲取默認(rèn)的 Executor,那么,默認(rèn)的 Executor 是什么呢?我們需要在去 AsyncAnnotationAdvisor 的buildAdvice
方法中,回顧一下 AnnotationAsyncExecutionInterceptor 創(chuàng)建的過程。
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
以上是 AnnotationAsyncExecutionInterceptor 創(chuàng)建的語句,從這里找到對應(yīng)的構(gòu)造方法。
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) { super(defaultExecutor); }
構(gòu)造方法需要提供一個默認(rèn)的 Executor,也就是defaultExecutor
參數(shù),這里提供了null
,不過我們可以繼續(xù)查看父類的構(gòu)造方法。
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) { this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new); }
在被調(diào)用的 AsyncExecutionAspectSupport 的構(gòu)造方法中,通過getDefaultExecutor
方法,提供了默認(rèn)的 Executor。
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor @Override @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
這里看到,默認(rèn)的 Executor 是一個 SimpleAsyncTaskExecutor,也就是說,如果我們沒有在項(xiàng)目中配置線程池,則默認(rèn)使用 SimpleAsyncTaskExecutor 來執(zhí)行異步任務(wù)。
Callable 任務(wù)封裝
得到 Executor 之后,就是任務(wù)的封裝,這一步很簡單,就是將目標(biāo)方法的調(diào)用放到一個 Callable 類型的任務(wù)的call
方法中。
doSubmit 異步執(zhí)行方法
最后一步就是任務(wù)的提交,通過doSubmit
方法完成。
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit @Nullable protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException(ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null; } }
其實(shí)就是調(diào)用了 Executor 的submit
異步執(zhí)行了任務(wù)。
不過這里有一點(diǎn)要說明,雖然在我們沒有配置 Excutor 的情況下 ,Spring 會使用默認(rèn)的 SimpleAsyncTaskExecutor 來執(zhí)行異步任務(wù),但是 SimpleAsyncTaskExecutor 會為每一個任務(wù)創(chuàng)建一個新的線程,而不是使用線程池來完成,很容易導(dǎo)致內(nèi)存溢出,因此,在實(shí)踐中最好為異步任務(wù)配置合適的線程池。
總結(jié)
本文以 @EnableAsync 作為切入點(diǎn),分析了 Spring 開啟基于注解的異步任務(wù)特性的原理,更多關(guān)于Spring EnableAsync注解的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
深入理解Java定時(shí)調(diào)度(Timer)機(jī)制
這篇文章主要介紹了深入理解Java定時(shí)調(diào)度(Timer)機(jī)制,本節(jié)我們主要分析 Timer 的功能。小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-01-01Java 使用json-lib處理JSON詳解及實(shí)例代碼
這篇文章主要介紹了Java 使用json-lib處理JSON詳解及實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下2017-02-02解決idea中maven新增的配置文件xx.xml沒生效問題
這篇文章主要介紹了如何解決idea中maven新增的配置文件xx.xml沒生效問題,公司項(xiàng)目有用自己的`私服,Maven正常去私服下載jar包是沒問題的,但阿里云鏡像找不到相關(guān)的jar包報(bào)錯,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-06-06Java synchronize底層實(shí)現(xiàn)原理及優(yōu)化
這篇文章主要介紹了Java synchronize底層實(shí)現(xiàn)原理及優(yōu)化,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03手把手教你實(shí)現(xiàn)Java第三方應(yīng)用登錄
本文主要介紹了手把手教你實(shí)現(xiàn)Java第三方應(yīng)用登錄,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08