Java Spring之@Async原理案例詳解
前言
用過(guò)Spring的人多多少少也都用過(guò)@Async注解,至于作用嘛,看注解名,大概能猜出來(lái),就是在方法執(zhí)行的時(shí)候進(jìn)行異步執(zhí)行。
一、如何使用@Async
使用@Async注解主要分兩步:
1.在配置類(lèi)上添加@EnableAsync注解
@ComponentScan(value = "com.wang") @Configuration @EnableAsync public class AppConfig { }
2.在想要異步執(zhí)行的方法上面加上@Async
@Service public class CycleService2 { @Autowired private CycleService1 cycleService1; @Async public void alsoDo() { System.out.println("create cycleService2"); } }
二、源碼解讀
1.@EnableAsync的作用
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { /** * Indicate the 'async' annotation type to be detected at either class * or method level. * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1 * {@code @javax.ejb.Asynchronous} annotation will be detected. * <p>This attribute exists so that developers can provide their own * custom annotation type to indicate that a method (or all methods of * a given class) should be invoked asynchronously. * 此處說(shuō)明的是方法執(zhí)行變成異步,掃描的是哪個(gè)注解,目前默認(rèn)的是Async和Asynchronous,開(kāi)發(fā)者也可以自定義 */ Class<? extends Annotation> annotation() default Annotation.class; /** * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed * to standard Java interface-based proxies. * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>. * <p>The default is {@code false}. * <p>Note that setting this attribute to {@code true} will affect <em>all</em> * Spring-managed beans requiring proxying, not just those marked with {@code @Async}. * For example, other beans marked with Spring's {@code @Transactional} annotation * will be upgraded to subclass proxying at the same time. This approach has no * negative impact in practice unless one is explicitly expecting one type of proxy * vs. another — for example, in tests. * 如何proxyTargetClass被設(shè)置成true,那么spring的所有proxy都會(huì)通過(guò)CGLIB方式實(shí)現(xiàn),不再使用Java默認(rèn)的基于接口的代理實(shí)現(xiàn)方式;而且此處如果設(shè)置,不僅僅是會(huì)影響添加了@Async注解的類(lèi)的proxy方式,加了@Transactional的類(lèi)也會(huì)變成CGLIB代理,不推薦修改;這個(gè)注解只有mode是默認(rèn)的PROXY,才有意義 */ boolean proxyTargetClass() default false; /** * Indicate how async advice should be applied. * <p><b>The default is {@link AdviceMode#PROXY}.</b> * Please note that proxy mode allows for interception of calls through the proxy * only. Local calls within the same class cannot get intercepted that way; an * {@link Async} annotation on such a method within a local call will be ignored * since Spring's interceptor does not even kick in for such a runtime scenario. * For a more advanced mode of interception, consider switching this to * {@link AdviceMode#ASPECTJ}. * 代理方式的不同,默認(rèn)的是使用Spring的proxy方式,也可以換成原生的AspectJ的proxy方式。 * 這兩個(gè)的區(qū)別作用還是很明顯的 */ AdviceMode mode() default AdviceMode.PROXY; /** * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor} * should be applied. * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run * after all other post-processors, so that it can add an advisor to * existing proxies rather than double-proxy. * 因?yàn)樵赽eanPostProcessor執(zhí)行的時(shí)候,會(huì)根據(jù)order值進(jìn)行排序,此處設(shè)置為最低值,就是想讓其最后執(zhí)行 * 其實(shí)即使不設(shè)置這個(gè)值,因?yàn)锳syncAnnotationBeanPostProcessor繼承了ProxyProcessorSupport,ProxyProcessorSupport中的order默認(rèn)也是最小優(yōu)先級(jí) * */ int order() default Ordered.LOWEST_PRECEDENCE; }
2. AsyncConfigurationSelector的作用
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; /** * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, * respectively. */ @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } } }
看過(guò)我之前博客的同學(xué)應(yīng)該知道,其實(shí)此處就是往Spring容器中增加一個(gè)新的需要掃描的類(lèi),很明顯可以看到差別主要集中在adviceMode的差別上。
3. adviceMode:PROXY(默認(rèn)值)
引入了ProxyAsyncConfiguration配置類(lèi)
3.1 ProxyAsyncConfiguration
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
作用也很明顯,就是往spring容器中添加了AsyncAnnotationBeanPostProcessor類(lèi)
3.2 AsyncAnnotationBeanPostProcessor
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor { // 刪除了一些無(wú)關(guān)緊要,或者默認(rèn)不會(huì)設(shè)置的屬性 public AsyncAnnotationBeanPostProcessor() { setBeforeExistingAdvisors(true); } /** * 因?yàn)锳syncAnnotationBeanPostProcessor實(shí)現(xiàn)了BeanFactoryAware接口 * 所以在實(shí)例化的過(guò)程中執(zhí)行到initializeBean步驟的時(shí)候,里面第一步就是執(zhí)行各種實(shí)現(xiàn)了Aware接口的接口方法 * 在此處new了一個(gè)advisor。advisor簡(jiǎn)單理解就是:advice+pointcut * @param beanFactory */ @Override public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; } }
其實(shí)可以看到最重要的方法,就是setBeanFactory了,該方法是在AsyncAnnotationBeanPostProcessor的生命周期最后一步initializeBean里面的第一小步,也就是執(zhí)行所有Aware接口的時(shí)候執(zhí)行。
對(duì)于AOP來(lái)說(shuō),其實(shí)最主要的就是advice+pointcut,也就是advisor,在生命周期的這一步,也創(chuàng)建了advisor。
3.3 AsyncAnnotationAdvisor
public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); /** * 這兒設(shè)置符合pointCut需要的注解 * 此處的executor就是一個(gè)擴(kuò)展點(diǎn),如果不想用spring的默認(rèn)單線程線程池,可以自定義一個(gè)線程池 * exceptionHandler,顧名思義,就是我們的方法在線程池中執(zhí)行時(shí)拋出exception該如何handle使用的 * advice也就是咱們的interceptor * pointCut就不多解釋了,就是把設(shè)置符合什么條件會(huì)進(jìn)行interceptor的invoke方法 */ asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } this.advice = buildAdvice(executor, exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes); }
可以看到最主要的工作就是buildAdvice和buildPointcut。advice的作用是定義在方法執(zhí)行方面,該如何執(zhí)行;pointcut的作用是定義方法的范圍
3.3.1 buildAdvice
protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { // new了一個(gè)interceptor AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; }
可以看到advice主要就是定義了一個(gè)爛機(jī)器interceptor,在方法執(zhí)行的時(shí)候進(jìn)行一些攔截,至于executor,是方法執(zhí)行器,默認(rèn)為null,exceptionHandler也默認(rèn)是null。
3.3.1.1 AnnotationAsyncExecutionInterceptor,異步執(zhí)行的原理
在AnnotationAsyncExecutionInterceptor的父類(lèi)AsyncExecutionInterceptor中,實(shí)現(xiàn)了攔截器的接口方法invoke,也就是真實(shí)的方法執(zhí)行邏輯。
/** * Intercept the given method invocation, submit the actual calling of the method to * the correct task executor and return immediately to the caller. * @param invocation the method to intercept and make asynchronous * @return {@link Future} if the original method returns {@code Future}; {@code null} * otherwise. */ @Override @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); /**獲取一個(gè)任務(wù)執(zhí)行器 * 1. 從@Async注解里面獲取配置的任務(wù)執(zhí)行器 * 2. 從Spring容器中找TaskExecutor類(lèi)的bean * 3. 從spring容器中獲取名為"taskExecutor"的bean, * 4. 如果還沒(méi)有,new SimpleAsyncTaskExecutor()) */ AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } //將當(dāng)前方法執(zhí)行封裝成一個(gè)callable對(duì)象,然后放入到線程池里 Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; //任務(wù)提交 return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
可以看到主要做的事情是:
- 尋找任務(wù)執(zhí)行器:
- 從@Async注解里面獲取配置的任務(wù)執(zhí)行器
- 從Spring容器中找TaskExecutor類(lèi)的bean
- 從spring容器中獲取名為"taskExecutor"的bean,
- 如果還沒(méi)有,new SimpleAsyncTaskExecutor())可以看到其實(shí)我們是可以給@Async進(jìn)行任務(wù)執(zhí)行器的配置的。
- 將具體的方法封裝成callable的對(duì)象,然后doSubmit
- 此處我們就看一下默認(rèn)的doSumit,使用的SimpleAsyncTaskExecutor是如何實(shí)現(xiàn)的
- 最終會(huì)執(zhí)行到下面這個(gè)doExecute方法,默認(rèn)情況下threadFactory是null,所以默認(rèn)情況下,我們的方法,每次都是被創(chuàng)建了一個(gè)新的守護(hù)線程來(lái)進(jìn)行方法的執(zhí)行。
protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); }
3.3.1.2 自定義任務(wù)執(zhí)行器
- 可以在配置類(lèi)里new SimpleAsyncTaskExecutor(),然后setThreadFactory,這樣修改了默認(rèn)線程的產(chǎn)生方式
- 比較主流的方式是,定義一個(gè)ThreadPoolTaskExecutor,也就是線程池任務(wù)執(zhí)行器,可以進(jìn)行線程復(fù)用
3.3.2 buildPointcut
/** * Calculate a pointcut for the given async annotation types, if any. * @param asyncAnnotationTypes the async annotation types to introspect * @return the applicable Pointcut object, or {@code null} if none */ protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { // 就是根據(jù)這兩個(gè)匹配器進(jìn)行匹配的 // 檢查類(lèi)上是否有@Async注解 Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); //檢查方法上是否有@Async注解 Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result == null) { result = new ComposablePointcut(cpc); } else { // 取并集:類(lèi)上加了@Async或者類(lèi)的方法上加了@Async result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); }
主要方法就是定義了一個(gè)類(lèi)匹配pointcut和一個(gè)方法匹配pointcut。
4 什么時(shí)候判斷進(jìn)行advice的添加呢
當(dāng)然就是在對(duì)某個(gè)bean進(jìn)行proxy的判斷的時(shí)候,也就是bean的生命周期最后一步,也是initializeBean里最后的一步,對(duì)于BeanPostProcessor的執(zhí)行
3.4.1 AsyncAnnotationBeanPostProcessor#postProcessAfterInitialization
要注意的是AsyncAnnotationBeanPostProcessor的postProcessAfterInitialization方法其實(shí)是繼承的是父類(lèi)AbstractAdvisingBeanPostProcessor的。
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { // 沒(méi)有通知,或者是AOP的基礎(chǔ)設(shè)施類(lèi),那么不進(jìn)行代理 if (this.advisor == null || bean instanceof AopInfrastructureBean) { // Ignore AOP infrastructure such as scoped proxies. return bean; } // 對(duì)已經(jīng)被代理的類(lèi),不再生成代理,只是將通知添加到代理類(lèi)的邏輯中 // 這里通過(guò)beforeExistingAdvisors決定是將通知添加到所有通知之前還是添加到所有通知之后 // 在使用@Async注解的時(shí)候,beforeExistingAdvisors被設(shè)置成了true, // @Async注解之所以把beforeExistingAdvisors設(shè)置為true,是因?yàn)樵揳dvisor和其他的advisor差別太大了,從情理上講,也應(yīng)該第一個(gè)執(zhí)行 // 意味著整個(gè)方法及其攔截邏輯都會(huì)異步執(zhí)行 if (bean instanceof Advised) { Advised advised = (Advised) bean; // 判斷bean是否符合該advisor的使用范圍,通過(guò)pointcut來(lái)判斷 if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { // Add our local Advisor to the existing proxy's Advisor chain... if (this.beforeExistingAdvisors) { advised.addAdvisor(0, this.advisor); } else { advised.addAdvisor(this.advisor); } return bean; } } // 如果還不是一個(gè)代理類(lèi),也需要通過(guò)eligible來(lái)判斷是否符合使用該advisor的條件 if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } // No proxy needed. return bean; }
而在isEligible中,就是判斷當(dāng)前執(zhí)行生命周期的bean是否滿足我們的@Async注解的使用范圍,主要是通過(guò)其class來(lái)判斷
protected boolean isEligible(Class<?> targetClass) { Boolean eligible = this.eligibleBeans.get(targetClass); if (eligible != null) { return eligible; } if (this.advisor == null) { return false; } // 其實(shí)就是判斷類(lèi)是否可以進(jìn)行添加該advisor,也就是判斷是否符合該advisor的使用條件 // 就是把a(bǔ)dvisor的pointCut拿出來(lái),pointCut里的classMatcher和methodMatcher拿出來(lái)對(duì)類(lèi)及其方法進(jìn)行判斷 eligible = AopUtils.canApply(this.advisor, targetClass); this.eligibleBeans.put(targetClass, eligible); return eligible; }
具體的AopUtils.canApply(this.advisor, targetClass)邏輯就不寫(xiě)了,就是根據(jù)pointcut里設(shè)置的classFilter和methodMatcher類(lèi)判斷當(dāng)前bean的class是否需要進(jìn)行該advisor的使用。
總結(jié)
發(fā)現(xiàn)@Async注解還是挺麻煩的,特別是要寫(xiě)一篇簡(jiǎn)單易懂的博客,更難。
默認(rèn)配置實(shí)現(xiàn)原理:在執(zhí)行的時(shí)候?qū)ethod最終封裝成一個(gè)Runable對(duì)象,然后new一個(gè)線程,通過(guò)線程的start方法,進(jìn)行method的執(zhí)行,來(lái)實(shí)現(xiàn)異步。
到此這篇關(guān)于Java Spring之@Async原理案例詳解的文章就介紹到這了,更多相關(guān)Java Spring之@Async原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java普通項(xiàng)目讀取不到resources目錄下資源文件的解決辦法
這篇文章主要給大家介紹了關(guān)于java普通項(xiàng)目讀取不到resources目錄下資源文件的解決辦法,Web項(xiàng)目中應(yīng)該經(jīng)常有這樣的需求,在maven項(xiàng)目的resources目錄下放一些文件,比如一些配置文件,資源文件等,需要的朋友可以參考下2023-09-09Spring Cloud Zuul路由規(guī)則動(dòng)態(tài)更新解析
這篇文章主要介紹了Spring Cloud Zuul路由規(guī)則動(dòng)態(tài)更新解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11Spring Boot中單例類(lèi)實(shí)現(xiàn)對(duì)象的注入方式
這篇文章主要介紹了Spring Boot中單例類(lèi)實(shí)現(xiàn)對(duì)象的注入方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Maven是什么?Maven的概念+作用+倉(cāng)庫(kù)的介紹+常用命令的詳解
Maven是一個(gè)項(xiàng)目管理工具,它包含了一個(gè)對(duì)象模型。一組標(biāo)準(zhǔn)集合,一個(gè)依賴(lài)管理系統(tǒng)。和用來(lái)運(yùn)行定義在生命周期階段中插件目標(biāo)和邏輯.,本文給大家介紹Maven的概念+作用+倉(cāng)庫(kù)的介紹+常用命令,感興趣的的朋友跟隨小編一起看看吧2020-09-09基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(詳解)
下面小編就為大家?guī)?lái)一篇基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(詳解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-06-06springboot websocket集群(stomp協(xié)議)連接時(shí)候傳遞參數(shù)
這篇文章主要介紹了springboot websocket集群(stomp協(xié)議)連接時(shí)候傳遞參數(shù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07Java使用EasyExcel實(shí)現(xiàn)Excel的導(dǎo)入導(dǎo)出
這篇文章主要給大家介紹了關(guān)于Java使用EasyExcel實(shí)現(xiàn)Excel的導(dǎo)入導(dǎo)出,在各種系統(tǒng)中,導(dǎo)入導(dǎo)出的數(shù)據(jù)一般都是通過(guò)Excel來(lái)完成的,需要的朋友可以參考下2023-07-07SpringBoot?整合?ElasticSearch操作各種高級(jí)查詢搜索
這篇文章主要介紹了SpringBoot?整合?ES?進(jìn)行各種高級(jí)查詢搜索的實(shí)踐記錄,本文主要圍繞?SpringBoot?整合?ElasticSearch?進(jìn)行各種高級(jí)查詢的介紹,需要的朋友可以參考下2022-06-06