Spring asynchronous execution: usage and source code walkthrough
In real-world development, some business logic is better handled asynchronously. For example, a business operation may need to write some data to a Redis cache. That write is only an auxiliary step whose success or failure does not fundamentally affect the main flow, so it can be carried out asynchronously.
In Spring, annotating a method with @Async makes it run asynchronously. The call returns to the caller immediately, and the actual execution of the method is handed over to a Spring TaskExecutor.
Using asynchronous execution
Configuration class
Use the @EnableAsync annotation to turn on asynchronous support.
package com.morris.spring.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync // enable @Async support
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        // custom thread pool
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }
}
Usage in the service layer
Add the @Async annotation to each method that should run asynchronously.
package com.morris.spring.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Slf4j
public class AsyncService {

    @Async
    public void noResult() {
        log.info("execute noResult");
    }

    @Async
    public Future<String> hasResult() throws InterruptedException {
        log.info("execute hasResult");
        TimeUnit.SECONDS.sleep(5);
        return new AsyncResult<>("hasResult success");
    }

    @Async
    public CompletableFuture<String> completableFuture() throws InterruptedException {
        log.info("execute completableFuture");
        TimeUnit.SECONDS.sleep(5);
        return CompletableFuture.completedFuture("completableFuture success");
    }
}
Test class
package com.morris.spring.demo.async;

import com.morris.spring.config.AsyncConfig;
import com.morris.spring.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Demonstrates asynchronous invocation
 */
@Slf4j
public class AsyncDemo {

    @Test
    public void test() throws ExecutionException, InterruptedException {
        AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
        applicationContext.register(AsyncService.class);
        applicationContext.register(AsyncConfig.class);
        applicationContext.refresh();
        AsyncService asyncService = applicationContext.getBean(AsyncService.class);

        asyncService.noResult(); // no result
        Future<String> future = asyncService.hasResult();
        log.info("hasResult: {}", future.get()); // with result; get() blocks until it is ready

        CompletableFuture<String> completableFuture = asyncService.completableFuture();
        completableFuture.thenAcceptAsync(System.out::println); // asynchronous callback
        log.info("completableFuture call down");
    }
}
The output is as follows:
INFO MyExecutor-1 AsyncService:16 - execute noResult
INFO MyExecutor-2 AsyncService:21 - execute hasResult
INFO main AsyncDemo:29 - hasResult: hasResult success
INFO MyExecutor-1 AsyncService:28 - execute completableFuture
INFO main AsyncDemo:33 - completableFuture call down
The log shows that the AsyncService methods run on threads named MyExecutor-1 and MyExecutor-2 rather than on the main thread. The MyExecutor- prefix is the one configured in AsyncConfig.
Two questions:
- Can the Executor thread pool be left unconfigured? Will Spring create a default Executor, or will it fail with an error?
- If a task throws an exception while running in the Executor, can a custom handler be registered to catch and process it?
Source code analysis
@EnableAsync
The main job of @EnableAsync is to import the AsyncConfigurationSelector class into the Spring container.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
    // ...
}
AsyncConfigurationSelector
The key method of AsyncConfigurationSelector is selectImports(). Note that the parent class's selectImports() is called first:
org.springframework.context.annotation.AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata)
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
    Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
    Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");

    AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
    if (attributes == null) {
        throw new IllegalArgumentException(String.format(
                "@%s is not present on importing class '%s' as expected",
                annType.getSimpleName(), importingClassMetadata.getClassName()));
    }

    AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
    // Template method pattern: call back into the subclass's selectImports
    String[] imports = selectImports(adviceMode);
    if (imports == null) {
        throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
    }
    return imports;
}
org.springframework.scheduling.annotation.AsyncConfigurationSelector#selectImports
public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        case PROXY:
            // Curious: the transaction and @EnableCaching selectors each import two classes,
            // a config class and a registrar that registers the AOP entry point,
            // whereas here only the config class ProxyAsyncConfiguration is imported
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}
AsyncConfigurationSelector in turn imports the configuration class ProxyAsyncConfiguration.
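The callback between the two selectImports() methods is the template method pattern. A minimal Spring-free sketch of that shape follows. The names Mode, ModeSelector and AsyncModeSelector, and the returned placeholder strings, are made up for illustration and are not Spring classes.

```java
// Hypothetical stand-in for Spring's AdviceMode.
enum Mode { PROXY, ASPECTJ }

// Plays the role of AdviceModeImportSelector: the final method fixes the workflow.
abstract class ModeSelector {
    public final String[] selectImports(String modeName) {
        // In Spring the mode is read from the annotation attributes instead.
        Mode mode = Mode.valueOf(modeName);
        // Callback into the subclass.
        String[] imports = selectImports(mode);
        if (imports == null) {
            throw new IllegalArgumentException("Unknown mode: " + mode);
        }
        return imports;
    }

    // Subclasses decide which configuration classes belong to a mode.
    protected abstract String[] selectImports(Mode mode);
}

// Plays the role of AsyncConfigurationSelector.
class AsyncModeSelector extends ModeSelector {
    @Override
    protected String[] selectImports(Mode mode) {
        switch (mode) {
            case PROXY:
                return new String[] {"ProxyConfigPlaceholder"};
            case ASPECTJ:
                return new String[] {"AspectJConfigPlaceholder"};
            default:
                return null;
        }
    }
}
```

The parent owns validation and error handling, so each selector only has to map a mode to class names.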
ProxyAsyncConfiguration
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    /**
     * See the parent class AbstractAsyncConfiguration first
     */
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // instantiate AsyncAnnotationBeanPostProcessor
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}
ProxyAsyncConfiguration registers an AsyncAnnotationBeanPostProcessor in the container.
Question: why is a BeanPostProcessor registered here? Shouldn't it register an Advisor and an XxxInterceptor (Advice), as the transaction and caching aspects do?
AbstractAsyncConfiguration
AbstractAsyncConfiguration is the parent class of ProxyAsyncConfiguration.
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    @Nullable
    protected AnnotationAttributes enableAsync;

    @Nullable
    protected Supplier<Executor> executor;

    @Nullable
    protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

    /**
     * Implements ImportAware.setImportMetadata.
     * Called after ProxyAsyncConfiguration has been initialized.
     */
    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        // read the @EnableAsync annotation
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        // configurers is empty by default, unless an AsyncConfigurer bean is registered manually
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer::getAsyncExecutor;
        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    }
}
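This shows that registering an AsyncConfigurer bean in the Spring container lets you specify both the thread pool that runs asynchronous tasks (getAsyncExecutor()) and the exception handler (getAsyncUncaughtExceptionHandler()), which answers the second question above. At most one AsyncConfigurer may exist.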
AsyncAnnotationBeanPostProcessor
[Figure: class hierarchy of AsyncAnnotationBeanPostProcessor]
The interfaces that matter here are BeanFactoryAware and BeanPostProcessor, both of which AsyncAnnotationBeanPostProcessor implements.
org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor#setBeanFactory
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    // instantiate the Advisor
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
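When the container calls setBeanFactory() on AsyncAnnotationBeanPostProcessor, the post-processor creates the advisor AsyncAnnotationAdvisor itself. The advisor is therefore held by the post-processor rather than registered as a separate bean.
After each bean has been initialized, AsyncAnnotationBeanPostProcessor.postProcessAfterInitialization() is called to decide whether a proxy object needs to be created for it.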
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    // ...

    /**
     * @see AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(java.lang.Object, java.lang.String)
     */
    // isEligible decides which beans need a proxy,
    // by matching them against the pointcut of the advisor
    if (isEligible(bean, beanName)) {
        // create the proxy
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
}
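To make the "wrap the bean in a proxy if it is eligible" step more concrete, here is a rough sketch that uses only the JDK's java.lang.reflect.Proxy instead of Spring's ProxyFactory. Everything in it is hypothetical: the @RunAsync annotation stands in for @Async, and GreetingService, GreetingServiceImpl and AsyncProxySketch are invented for the example. It also only handles methods that return CompletableFuture.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical marker annotation standing in for @Async.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RunAsync {}

// Hypothetical service interface; JDK proxies require an interface.
interface GreetingService {
    @RunAsync
    CompletableFuture<String> greet(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public CompletableFuture<String> greet(String name) {
        // Report which thread actually ran the method body.
        return CompletableFuture.completedFuture(
                "hello " + name + " from " + Thread.currentThread().getName());
    }
}

final class AsyncProxySketch {
    // Wraps the target only if an interface method carries @RunAsync,
    // loosely mirroring the isEligible() check followed by proxy creation.
    @SuppressWarnings("unchecked")
    static <T> T wrapIfEligible(T target, Class<T> iface, Executor executor) {
        boolean eligible = false;
        for (Method m : iface.getMethods()) {
            if (m.isAnnotationPresent(RunAsync.class)) {
                eligible = true;
                break;
            }
        }
        if (!eligible) {
            return target; // no proxy needed
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(RunAsync.class)) {
                return method.invoke(target, args);
            }
            // Run the real method on the executor, then flatten the nested future.
            return CompletableFuture
                    .supplyAsync(() -> invoke(target, method, args), executor)
                    .thenCompose(f -> f);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    @SuppressWarnings("unchecked")
    private static CompletableFuture<Object> invoke(Object target, Method method, Object[] args) {
        try {
            return (CompletableFuture<Object>) method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling greet() on the returned object goes through the handler, so the method body runs on a thread owned by the executor while the caller immediately receives a future.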
AsyncAnnotationAdvisor
The advisor AsyncAnnotationAdvisor consists of an advice, AnnotationAsyncExecutionInterceptor, and a pointcut, ComposablePointcut.
public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    }
    catch (ClassNotFoundException ex) {
        // If EJB 3.1 API not present, simply ignore.
    }
    this.advice = buildAdvice(executor, exceptionHandler); // creates the AnnotationAsyncExecutionInterceptor
    this.pointcut = buildPointcut(asyncAnnotationTypes); // creates the ComposablePointcut
}

protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); // class level
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); // method level
        if (result == null) {
            result = new ComposablePointcut(cpc);
        }
        else {
            result.union(cpc); // combined class-level and method-level pointcut
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}
The AnnotationMatchingPointcut pointcut simply checks whether the class or the method carries the @Async annotation.
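The "class or method carries the annotation" rule can be illustrated with plain reflection. This is a simplified sketch and not Spring's matching logic: as far as I know the real pointcut also considers inherited and meta-annotations, which this version ignores. The @Background annotation and the example classes are hypothetical.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;

// Hypothetical annotation standing in for @Async.
@Retention(RetentionPolicy.RUNTIME)
@interface Background {}

@Background
class WholeClassAsync {
    public void any() {}         // qualifies because the class is annotated
}

class OneMethodAsync {
    @Background
    public void marked() {}      // qualifies because the method is annotated

    public void plain() {}       // does not qualify
}

final class AnnotationMatchSketch {
    // Union of "class carries the annotation" and "method carries the annotation".
    static boolean matches(Method method, Class<?> targetClass, Class<? extends Annotation> type) {
        return targetClass.isAnnotationPresent(type) || method.isAnnotationPresent(type);
    }

    // Convenience overload for public no-argument methods looked up by name.
    static boolean matches(Class<?> targetClass, String methodName, Class<? extends Annotation> type) {
        try {
            return matches(targetClass.getMethod(methodName), targetClass, type);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

With these classes, any() and marked() match while plain() does not, which is the same distinction the class-level and method-level pointcuts make when they are combined with union().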
AnnotationAsyncExecutionInterceptor
The AnnotationAsyncExecutionInterceptor class implements the advice logic; its invoke() method is inherited from AsyncExecutionInterceptor.
org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // obtain the thread pool
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    // wrap the target method invocation in a Callable so it can be submitted to the pool
    Callable<Object> task = () -> {
        try {
            // invoke the target method
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    // submit the task
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
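The body of handleError() is not shown in this article. As I understand Spring's AsyncExecutionAspectSupport, it rethrows the exception when the method returns a Future (so the caller sees it through Future.get()) and otherwise passes it to the AsyncUncaughtExceptionHandler; treat that as an assumption. Under that assumption the routing can be sketched without Spring as follows, where ErrorRoutingSketch and its parameters are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

final class ErrorRoutingSketch {
    // Wraps a task so that failures are routed according to whether
    // the caller holds a Future for the result.
    static Callable<Object> wrap(Callable<Object> body, boolean returnsFuture, Consumer<Throwable> handler) {
        return () -> {
            try {
                return body.call();
            } catch (Exception ex) {
                if (returnsFuture) {
                    // The pool stores this in the Future; the caller sees it on get().
                    throw ex;
                }
                // A void method has no channel back to the caller, so hand the
                // exception to the configured handler instead.
                handler.accept(ex);
                return null;
            }
        };
    }
}
```

This is why a custom handler only ever sees exceptions from methods that do not return a Future.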
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        /**
         * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier(java.lang.reflect.Method)
         */
        // read the taskExecutor name given in the value attribute of @Async
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            // fall back to the default taskExecutor
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}
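determineAsyncExecutor() obtains the thread pool that will run the asynchronous task and caches the result per method. The lookup proceeds as follows:
- If the value attribute of @Async names a taskExecutor, look up that bean in the Spring container
- Otherwise, use the default defaultExecutor
Where does the default defaultExecutor come from?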
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure
public void configure(@Nullable Supplier<Executor> defaultExecutor,
        @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    // defaultExecutor falls back to a TaskExecutor obtained from the beanFactory
    // (beanFactory.getBean(TaskExecutor.class)) or an Executor bean named taskExecutor
    this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    // exceptionHandler falls back to SimpleAsyncUncaughtExceptionHandler
    this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
defaultExecutor first uses the supplier passed in as the parameter, which comes from AsyncConfigurer.getAsyncExecutor(). If that yields null, getDefaultExecutor() is called instead. Note that the subclass AsyncExecutionInterceptor overrides this method:
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
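If no defaultExecutor can be found, a SimpleAsyncTaskExecutor is created, which starts a new thread for each task instead of reusing pooled threads. This answers the first question above: leaving the Executor unconfigured does not cause an error.
Now look at getDefaultExecutor in the parent class AsyncExecutionAspectSupport:
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor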
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Search for TaskExecutor bean... not plain Executor since that would
            // match with ScheduledExecutorService as well, which is unusable for
            // our purposes here. TaskExecutor is more clearly designed for it.
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                // look for an Executor bean named taskExecutor
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}
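The method first asks the beanFactory for a bean of type TaskExecutor. If there is none, or more than one, it falls back to an Executor bean named taskExecutor, and returns null if that is missing too.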
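Putting determineAsyncExecutor(), configure() and the two getDefaultExecutor() methods together, the overall lookup order can be modelled roughly as below. This is a deliberately simplified sketch: a Map stands in for the bean factory, the distinction between TaskExecutor and plain Executor beans is dropped, and ExecutorLookupSketch and ThreadPerTaskExecutor are hypothetical names (the latter stands in for SimpleAsyncTaskExecutor).

```java
import java.util.Map;
import java.util.concurrent.Executor;

final class ExecutorLookupSketch {
    // Hypothetical stand-in for SimpleAsyncTaskExecutor: one new thread per task.
    static final class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            new Thread(task).start();
        }
    }

    // beansByName models the bean factory; configured models AsyncConfigurer.getAsyncExecutor().
    static Executor resolve(String qualifier, Executor configured, Map<String, Executor> beansByName) {
        if (qualifier != null && !qualifier.isEmpty()) {
            Executor named = beansByName.get(qualifier);
            if (named == null) {
                throw new IllegalStateException("No executor bean named " + qualifier);
            }
            return named;                                   // 1. name given on the annotation
        }
        if (configured != null) {
            return configured;                              // 2. executor from the configurer
        }
        if (beansByName.size() == 1) {
            return beansByName.values().iterator().next();  // 3. the unique executor bean
        }
        Executor byName = beansByName.get("taskExecutor");
        if (byName != null) {
            return byName;                                  // 4. bean named "taskExecutor"
        }
        return new ThreadPerTaskExecutor();                 // 5. last-resort default
    }
}
```

Steps 3 and 4 correspond to the parent getDefaultExecutor(), and step 5 to the override in AsyncExecutionInterceptor.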
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // run the task
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    else {
        executor.submit(task);
        return null;
    }
}
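doSubmit() submits the task to the thread pool and chooses what to return to the caller based on the method's declared return type: a CompletableFuture, a ListenableFuture, a plain Future, or null for any other return type.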
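The same dispatch can be tried out with nothing but java.util.concurrent. The sketch below mirrors the branches of doSubmit() except for ListenableFuture, which is a Spring type and is left out; SubmitDispatchSketch is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class SubmitDispatchSketch {
    // Chooses how to hand the task to the pool based on the declared return type.
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Exception ex) {
                    // supplyAsync cannot throw checked exceptions, so wrap them.
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            // Fire and forget: the caller gets nothing back.
            executor.submit(task);
            return null;
        }
    }
}
```

The CompletableFuture branch has to come first because CompletableFuture is itself a Future, so checking Future first would never reach it.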