Spring中異步注解@Async的使用、原理及使用時可能導致的問題及解決方法
前言
其實最近都在研究事務(wù)相關(guān)的內(nèi)容,之所以寫這么一篇文章是因為前面寫了一篇關(guān)于循環(huán)依賴的文章:
《Spring循環(huán)依賴的解決辦法,你真的懂了嗎》
然后,很多同學碰到了下面這個問題,添加了Spring提供的一個異步注解@Async
循環(huán)依賴無法被解決了,下面是一些讀者的留言跟群里同學碰到的問題:
本著講一個知識點就要講明白、講透徹的原則,我決定單獨寫一篇這樣的文章對@Async
這個注解做一下詳細的介紹,這個注解帶來的問題遠遠不止循環(huán)依賴這么簡單,如果對它不夠熟悉的話建議慎用。
文章要點
@Async的基本使用
這個注解的作用在于可以讓被標注的方法異步執(zhí)行,但是有兩個前提條件
配置類上添加@EnableAsync
注解需要異步執(zhí)行的方法的所在類由Spring管理需要異步執(zhí)行的方法上添加了@Async
注解
我們通過一個Demo體會下這個注解的作用吧
第一步,配置類上開啟異步:
@EnableAsync @Configuration @ComponentScan("com.dmz.spring.async") public class Config { }
第二步,
[code]@Component // 這個類本身要被Spring管理public class DmzAsyncService { @Async // 添加注解表示這
@Component // 這個類本身要被Spring管理 public class DmzAsyncService { @Async // 添加注解表示這個方法要異步執(zhí)行 public void testAsync(){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("testAsync invoked"); } }
第三步,測試異步執(zhí)行
public class Main { public static void main(String[] args) { AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(Config.class); DmzAsyncService bean = ac.getBean(DmzAsyncService.class); bean.testAsync(); System.out.println("main函數(shù)執(zhí)行完成"); } } // 程序執(zhí)行結(jié)果如下: // main函數(shù)執(zhí)行完成 // testAsync invoked
通過上面的例子我們可以發(fā)現(xiàn),DmzAsyncService
中的testAsync
方法是異步執(zhí)行的,那么這背后的原理是什么呢?我們接著分析
原理分析
我們在分析某一個技術(shù)的時候,最重要的事情是,一定一定要找到代碼的入口,像Spring這種都很明顯,入口必定是在@EnableAsync
這個注解上面,我們來看看這個注解干了啥事(本文基于5.2.x
版本)
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented // 這里是重點,導入了一個ImportSelector @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { // 這個配置可以讓程序員配置需要被檢查的注解,默認情況下檢查的就是@Async注解 Class<? extends Annotation> annotation() default Annotation.class; // 默認使用jdk代理 boolean proxyTargetClass() default false; // 默認使用Spring AOP AdviceMode mode() default AdviceMode.PROXY; // 在后續(xù)分析我們會發(fā)現(xiàn),這個注解實際往容器中添加了一個 // AsyncAnnotationBeanPostProcessor,這個后置處理器實現(xiàn)了Ordered接口 // 這個配置主要代表了AsyncAnnotationBeanPostProcessor執(zhí)行的順序 int order() default Ordered.LOWEST_PRECEDENCE; }
上面這個注解做的最重要的事情就是導入了一個AsyncConfigurationSelector
,這個類的源碼如下:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { // 默認會使用SpringAOP進行代理 case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } } }
這個類的作用是像容器中注冊了一個ProxyAsyncConfiguration
,這個類的繼承關(guān)系如下:
我們先看下它的父類AbstractAsyncConfiguration
,其源碼如下:
@Configuration public abstract class AbstractAsyncConfiguration implements ImportAware { @Nullable protected AnnotationAttributes enableAsync; @Nullable protected Supplier<Executor> executor; @Nullable protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; // 這里主要就是檢查將其導入的類上是否有EnableAsync注解 // 如果沒有的話就報錯 @Override public void setImportMetadata(AnnotationMetadata importMetadata) { this.enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false)); if (this.enableAsync == null) { throw new IllegalArgumentException( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } // 將容器中配置的AsyncConfigurer注入 // 異步執(zhí)行嘛,所以我們可以配置使用的線程池 // 另外也可以配置異常處理器 @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } }
再來看看ProxyAsyncConfiguration
這個類的源碼
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); // 將通過AsyncConfigurer配置好的線程池跟異常處理器設(shè)置到這個后置處理器中 bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
這個類本身是一個配置類,它的作用是向容器中添加一個AsyncAnnotationBeanPostProcessor
。到這一步我們基本上就可以明白了,@Async
注解的就是通過AsyncAnnotationBeanPostProcessor
這個后置處理器生成一個代理對象來實現(xiàn)異步的,接下來我們就具體看看AsyncAnnotationBeanPostProcessor
是如何生成代理對象的,我們主要關(guān)注一下幾點即可:
- 是在生命周期的哪一步完成的代理?
- 切點的邏輯是怎么樣的?它會對什么樣的類進行攔截?
- 通知的邏輯是怎么樣的?是如何實現(xiàn)異步的?
基于上面幾個問題,我們進行逐一分析
是在生命周期的哪一步完成的代理?
我們抓住重點,AsyncAnnotationBeanPostProcessor
是一個后置處理器器,按照我們對Spring的了解,大概率是在這個后置處理器的postProcessAfterInitialization
方法中完成了代理,直接定位到這個方法,這個方法位于父類AbstractAdvisingBeanPostProcessor
中,具體代碼如下:
public Object postProcessAfterInitialization(Object bean, String beanName) { // 沒有通知,或者是AOP的基礎(chǔ)設(shè)施類,那么不進行代理 if (this.advisor == null || bean instanceof AopInfrastructureBean) { return bean; } // 對已經(jīng)被代理的類,不再生成代理,只是將通知添加到代理類的邏輯中 // 這里通過beforeExistingAdvisors決定是將通知添加到所有通知之前還是添加到所有通知之后 // 在使用@Async注解的時候,beforeExistingAdvisors被設(shè)置成了true // 意味著整個方法及其攔截邏輯都會異步執(zhí)行 if (bean instanceof Advised) { Advised advised = (Advised) bean; if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { if (this.beforeExistingAdvisors) { advised.addAdvisor(0, this.advisor); } else { advised.addAdvisor(this.advisor); } return bean; } } // 判斷需要對哪些Bean進行來代理 if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } return bean; }
果不其然,確實是在這個方法中完成的代理。接著我們就要思考,切點的過濾規(guī)則是什么呢?
切點的邏輯是怎么樣的?
其實也不難猜到肯定就是類上添加了@Async
注解或者類中含有被@Async
注解修飾的方法?;诖耍覀兛纯催@個isEligible
這個方法的實現(xiàn)邏輯,這個方位位于AbstractBeanFactoryAwareAdvisingPostProcessor
中,也是AsyncAnnotationBeanPostProcessor
的父類,對應(yīng)代碼如下:
// AbstractBeanFactoryAwareAdvisingPostProcessor的isEligible方法 // 調(diào)用了父類 protected boolean isEligible(Object bean, String beanName) { return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) && super.isEligible(bean, beanName)); } protected boolean isEligible(Object bean, String beanName) { return isEligible(bean.getClass()); } protected boolean isEligible(Class<?> targetClass) { Boolean eligible = this.eligibleBeans.get(targetClass); if (eligible != null) { return eligible; } if (this.advisor == null) { return false; } // 這里完成的判斷 eligible = AopUtils.canApply(this.advisor, targetClass); this.eligibleBeans.put(targetClass, eligible); return eligible; }
實際上最后就是根據(jù)advisor來確定是否要進行代理,在Spring中基于xml的AOP的詳細步驟這篇文章中我們提到過,advisor實際就是一個綁定了切點的通知,那么AsyncAnnotationBeanPostProcessor
這個advisor是什么時候被初始化的呢?我們直接定位到AsyncAnnotationBeanPostProcessor
的setBeanFactory
方法,其源碼如下:
public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); // 在這里new了一個AsyncAnnotationAdvisor AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); // 完成了初始化 this.advisor = advisor; }
我們來看看AsyncAnnotationAdvisor
中的切點匹配規(guī)程是怎么樣的,直接定位到這個類的buildPointcut
方法中,其源碼如下:
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { // 就是根據(jù)這兩個匹配器進行匹配的 Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result == null) { result = new ComposablePointcut(cpc); } else { result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); }
代碼很簡單,就是根據(jù)cpc跟mpc兩個匹配器來進行匹配的,第一個是檢查類上是否有@Async注解,第二個是檢查方法是是否有@Async注解。
那么,到現(xiàn)在為止,我們已經(jīng)知道了它在何時創(chuàng)建代理,會為什么對象創(chuàng)建代理,最后我們還需要解決一個問題,代理的邏輯是怎么樣的,異步到底是如何實現(xiàn)的?
通知的邏輯是怎么樣的?是如何實現(xiàn)異步的?
前面也提到了advisor是一個綁定了切點的通知,前面分析了它的切點,那么現(xiàn)在我們就來看看它的通知邏輯,直接定位到AsyncAnnotationAdvisor
中的buildAdvice
方法,源碼如下:
protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; }
簡單吧,加了一個攔截器而已,對于interceptor類型的對象,我們關(guān)注它的核心方法invoke
就行了,代碼如下:
public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); // 異步執(zhí)行嘛,先獲取到一個線程池 AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } // 然后將這個方法封裝成一個 Callable對象傳入到線程池中執(zhí)行 Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; // 將任務(wù)提交到線程池 return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
導致的問題及解決方案
問題1:循環(huán)依賴報錯
就像在這張圖里這個讀者問的問題,
分為兩點回答:
第一:循環(huán)依賴為什么不能被解決?
這個問題其實很簡單,在《講一講Spring中的循環(huán)依賴》這篇文章中我從兩個方面分析了循環(huán)依賴的處理流程
簡單對象間的循環(huán)依賴處理AOP對象間的循環(huán)依賴處理
按照這種思路,@Async
注解導致的循環(huán)依賴應(yīng)該屬于AOP對象間的循環(huán)依賴
,也應(yīng)該能被處理。但是,重點來了,解決AOP對象間循環(huán)依賴的核心方法是三級緩存,如下:
在三級緩存緩存了一個工廠對象,這個工廠對象會調(diào)用getEarlyBeanReference
方法來獲取一個早期的代理對象的引用,其源碼如下:
protected Object getEarlyBeanReference(String beanName, RootBeanDefinition mbd, Object bean) { Object exposedObject = bean; if (!mbd.isSynthetic() && hasInstantiationAwareBeanPostProcessors()) { for (BeanPostProcessor bp : getBeanPostProcessors()) { // 看到這個判斷了嗎,通過@EnableAsync導入的后置處理器 // AsyncAnnotationBeanPostProcessor根本就不是一個SmartInstantiationAwareBeanPostProcessor // 這就意味著即使我們通過AsyncAnnotationBeanPostProcessor創(chuàng)建了一個代理對象 // 但是早期暴露出去的用于給別的Bean進行注入的那個對象還是原始對象 if (bp instanceof SmartInstantiationAwareBeanPostProcessor) { SmartInstantiationAwareBeanPostProcessor ibp = (SmartInstantiationAwareBeanPostProcessor) bp; exposedObject = ibp.getEarlyBeanReference(exposedObject, beanName); } } } return exposedObject; }
看完上面的代碼循環(huán)依賴的問題就很明顯了,因為早期暴露的對象跟最終放入容器中的對象不是同一個,所以報錯了。報錯的具體位置我在談?wù)勎覍pring Bean 生命周期的理解 文章末尾已經(jīng)分析過了,本文不再贅述
解決方案
就以上面讀者給出的Demo為例,只需要在為B注入A時添加一個@Lazy
注解即可
@Component public class B implements BService { @Autowired @Lazy private A a; public void doSomething() { } }
這個注解的作用在于,當為B注入A時,會為A生成一個代理對象注入到B中,當真正調(diào)用代理對象的方法時,底層會調(diào)用getBean(a)
去創(chuàng)建A對象,然后調(diào)用方法,這個注解的處理時機是在org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveDependency
方法中,處理這個注解的代碼位于org.springframework.context.annotation.ContextAnnotationAutowireCandidateResolver#buildLazyResolutionProxy
,這些代碼其實都在我之前的文章中分析過了
《Spring雜談 | Spring中的AutowireCandidateResolver》
《談?wù)凷pring中的對象跟Bean,你知道Spring怎么創(chuàng)建對象的嗎?》
所以本文不再做詳細分析
問題2:默認線程池不會復用線程
我覺得這是這個注解最坑的地方,沒有之一!我們來看看它默認使用的線程池是哪個,在前文的源碼分析中,我們可以看到?jīng)Q定要使用線程池的方法是org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
。其源碼如下:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; // 可以在@Async注解中配置線程池的名字 String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { // 獲取默認的線程池 targetExecutor = this.defaultExecutor.get(); } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
最終會調(diào)用到org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
這個方法中
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
可以看到,它默認使用的線程池是SimpleAsyncTaskExecutor
。我們不看這個類的源碼,只看它上面的文檔注釋,如下:
主要說了三點
- 為每個任務(wù)新起一個線程
- 默認線程數(shù)不做限制
- 不復用線程
就這三點,你還敢用嗎?只要你的任務(wù)耗時長一點,說不定服務(wù)器就給你來個OOM
。
解決方案
最好的辦法就是使用自定義的線程池,主要有這么幾種配置方法
在之前的源碼分析中,我們可以知道,可以通過AsyncConfigurer
來配置使用的線程池
如下:
public class DmzAsyncConfigurer implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { // 創(chuàng)建自定義的線程池 } }
直接在@Async注解中配置要使用的線程池的名稱
如下:
public class A implements AService { private B b; @Autowired public void setB(B b) { System.out.println(b); this.b = b; } @Async("dmzExecutor") public void doSomething() { } }
@EnableAsync @Configuration @ComponentScan("com.dmz.spring.async") @Aspect public class Config { @Bean("dmzExecutor") public Executor executor(){ // 創(chuàng)建自定義的線程池 return executor; } }
總結(jié)
本文主要介紹了Spring中異步注解的使用、原理及可能碰到的問題,針對每個問題文中也給出了方案。希望通過這篇文章能幫助你徹底掌握@Async
注解的使用,知其然并知其所以然!
到此這篇關(guān)于Spring中異步注解@Async的使用、原理及使用時可能導致的問題及解決方法的文章就介紹到這了,更多相關(guān)Spring 異步注解@Async使用原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot深入探究四種靜態(tài)資源訪問的方式
這一節(jié)詳細的學習一下SpringBoot的靜態(tài)資源訪問相關(guān)的知識點。像這樣的知識點還挺多,比如SpringBoot2的Junit單元測試等等。本章我們來了解靜態(tài)資源訪問的四種方式2022-05-05SparkSQL讀取hive數(shù)據(jù)本地idea運行的方法詳解
這篇文章主要介紹了SparkSQL讀取hive數(shù)據(jù)本地idea運行的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09MyBatis存儲過程、MyBatis分頁、MyBatis一對多增刪改查操作
本文通過一段代碼給大家介紹了MyBatis存儲過程、MyBatis分頁、MyBatis一對多增刪改查操作,非常不錯,具有參考借鑒價值,感興趣的朋友一起看看吧2016-11-11SpringAOP中基于注解實現(xiàn)通用日志打印方法詳解
這篇文章主要介紹了SpringAOP中基于注解實現(xiàn)通用日志打印方法詳解,在日常開發(fā)中,項目里日志是必不可少的,一般有業(yè)務(wù)日志,數(shù)據(jù)庫日志,異常日志等,主要用于幫助程序猿后期排查一些生產(chǎn)中的bug,需要的朋友可以參考下2023-12-12SpringBoot3和ShardingSphere5框架實現(xiàn)數(shù)據(jù)分庫分表
這篇文章主要介紹了SpringBoot3和ShardingSphere5框架實現(xiàn)數(shù)據(jù)分庫分表的相關(guān)資料,需要的朋友可以參考下2023-08-08Spring 重定向(Redirect)指南及相關(guān)策略問題
本文介紹了在Spring中實現(xiàn)重定向的三種不同方法,在執(zhí)行這些重定向時如何處理/傳遞屬性以及如何處理HTTP POST請求的重定向。關(guān)于Spring 重定向(Redirect)指南的相關(guān)知識大家參考下本文2017-11-11