spring的異步執(zhí)行使用與源碼詳解
在實際的開發(fā)過程中,有些業(yè)務(wù)邏輯使用異步的方式處理更為合理。比如在某個業(yè)務(wù)邏輯中,需要把一些數(shù)據(jù)存入到redis緩存中,這個操作只是一個輔助的功能,成功或者失敗對主業(yè)務(wù)并不會產(chǎn)生根本影響,這個過程可以通過異步的方法去進行。
Spring中通過在方法上設(shè)置@Async
注解,可使得方法被異步調(diào)用。也就是說該方法會在調(diào)用時立即返回,而這個方法的實際執(zhí)行交給Spring的TaskExecutor去完成。
異步執(zhí)行的使用
配置類
使用@EnableAsync注解開啟異步功能。
package com.morris.spring.config; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @Configuration @EnableAsync // 開啟Async public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { // 自定義線程池 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(4); executor.setQueueCapacity(10); executor.setThreadNamePrefix("MyExecutor-"); executor.initialize(); return executor; } }
service層的使用
在需要異步執(zhí)行的方法上面加上@Async注解。
package com.morris.spring.service; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @Slf4j public class AsyncService { @Async public void noResult() { log.info("execute noResult"); } @Async public Future<String> hasResult() throws InterruptedException { log.info("execute hasResult"); TimeUnit.SECONDS.sleep(5); return new AsyncResult<>("hasResult success"); } @Async public CompletableFuture<String> completableFuture() throws InterruptedException { log.info(" execute completableFuture"); TimeUnit.SECONDS.sleep(5); return CompletableFuture.completedFuture("completableFuture success"); } }
測試類
package com.morris.spring.demo.async; import com.morris.spring.config.AsyncConfig; import com.morris.spring.service.AsyncService; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; /** * 異步調(diào)用的演示 */ @Slf4j public class AsyncDemo { @Test public void test() throws ExecutionException, InterruptedException { AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(); applicationContext.register(AsyncService.class); applicationContext.register(AsyncConfig.class); applicationContext.refresh(); AsyncService asyncService = applicationContext.getBean(AsyncService.class); asyncService.noResult(); // 無結(jié)果 Future<String> future = asyncService.hasResult(); log.info("hasResult: {}", future.get()); // 有結(jié)果 CompletableFuture<String> completableFuture = asyncService.completableFuture(); completableFuture.thenAcceptAsync(System.out::println);// 異步回調(diào) log.info("completableFuture call down"); } }
運行結(jié)果如下:
INFO MyExecutor-1 AsyncService:16 - execute noResult INFO MyExecutor-2 AsyncService:21 - execute hasResult INFO main AsyncDemo:29 - hasResult: hasResult success INFO MyExecutor-1 AsyncService:28 - execute completableFuture INFO main AsyncDemo:33 - completableFuture call down
通過日志可以發(fā)現(xiàn)AsyncService的方法都是通過線程名為MyExecutor-1的線程執(zhí)行的,這個名稱的前綴是在AsyncConfig中指定的,而不是通過main線程執(zhí)行的。
兩個疑問:
- 是否可以不配置Executor線程池,Spring會默認(rèn)創(chuàng)建默認(rèn)的Executor,還是會報錯?
- Executor線程池中執(zhí)行任務(wù)時如果拋出了異常,可否自定義異常的處理類對異常進行捕獲處理?
源碼分析
@EnableAsync
@EnableAsync主要是向Spring容器中導(dǎo)入了AsyncConfigurationSelector類。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync {
AsyncConfigurationSelector
AsyncConfigurationSelector的主要方法當(dāng)然是selectImports(),注意這里會先調(diào)用父類的selectImports() org.springframework.context.annotation.AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata)
public final String[] selectImports(AnnotationMetadata importingClassMetadata) { Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class); Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector"); AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (attributes == null) { throw new IllegalArgumentException(String.format( "@%s is not present on importing class '%s' as expected", annType.getSimpleName(), importingClassMetadata.getClassName())); } AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName()); // 模板方法模式,回調(diào)子類的selectImports String[] imports = selectImports(adviceMode); if (imports == null) { throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode); } return imports; }
org.springframework.scheduling.annotation.AsyncConfigurationSelector#selectImports
public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: // 奇怪???@Transaction、@EnableCaching都是注入兩個類,一個config,一個registrar導(dǎo)入aop的入口類 // 而這里只有一個config類ProxyAsyncConfiguration return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } }
AsyncConfigurationSelector又導(dǎo)入了配置類ProxyAsyncConfiguration。
ProxyAsyncConfiguration
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { /** * 先看父類AbstractAsyncConfiguration * @return */ @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); // 實例化AsyncAnnotationBeanPostProcessor AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
ProxyAsyncConfiguration向容器中注入了一個AsyncAnnotationBeanPostProcessor。
疑問:這里為啥是BeanPostProcessor,不應(yīng)該像事務(wù)切面或者緩存切面一樣,注入一個Advisor和XxxxInterceptor(Advice)嗎?
AbstractAsyncConfiguration
AbstractAsyncConfiguration是ProxyAsyncConfiguration的父類。
@Configuration public abstract class AbstractAsyncConfiguration implements ImportAware { @Nullable protected AnnotationAttributes enableAsync; @Nullable protected Supplier<Executor> executor; @Nullable protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; /** * 實現(xiàn)了ImportAware.setImportMetadata * 在ProxyAsyncConfiguration初始化后被調(diào)用 * @param importMetadata */ @Override public void setImportMetadata(AnnotationMetadata importMetadata) { // 取得@EnableAsync注解 this.enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false)); if (this.enableAsync == null) { throw new IllegalArgumentException( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } /** * Collect any {@link AsyncConfigurer} beans through autowiring. */ @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { // configurers默認(rèn)為空,除非手動注入AsyncConfigurer if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } }
從這里可以看出,可以通過向spring容器中注入AsyncConfigurer來指定執(zhí)行異步任務(wù)的線程池和異常處理器。
AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor的繼承結(jié)構(gòu)圖:
AsyncAnnotationBeanPostProcessor主要實現(xiàn)了BeanFactoryAware和BeanPostProcessor接口。
org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor#setBeanFactory
public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); // 實例化Advisor AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }
在AsyncAnnotationBeanPostProcessor實例化時實例化了切面AsyncAnnotationAdvisor。
每個bean實例化完后都會調(diào)用AsyncAnnotationBeanPostProcessor.postProcessAfterInitialization()判斷是否要生成代理對象。
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { ... ... /** * @see AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(java.lang.Object, java.lang.String) */ // isEligible會判斷哪些bean要生成代理 // 就是使用advisor中的pointcut進行匹配 if (isEligible(bean, beanName)) { // 創(chuàng)建代理 ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } // No proxy needed. return bean; }
AsyncAnnotationAdvisor
切面AsyncAnnotationAdvisor包括通知AnnotationAsyncExecutionInterceptor和切點ComposablePointcut。
public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } this.advice = buildAdvice(executor, exceptionHandler); // 創(chuàng)建AnnotationAsyncExecutionInterceptor this.pointcut = buildPointcut(asyncAnnotationTypes); // 創(chuàng)建ComposablePointcut } protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; } protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); // 類 Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); // 方法 if (result == null) { result = new ComposablePointcut(cpc); } else { result.union(cpc); // 類和方法的組合切點 } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); }
AnnotationMatchingPointcut切面其實就是查看類或者方法上面有沒有@Async注解。
AnnotationAsyncExecutionInterceptor
AnnotationAsyncExecutionInterceptor類主要負(fù)責(zé)增強邏輯的實現(xiàn)。
org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); // 獲得線程池 AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } // 將目標(biāo)方法的執(zhí)行封裝為Callable,方便提交到線程池 Callable<Object> task = () -> { try { // 執(zhí)行目標(biāo)方法 Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; // 提交任務(wù) return oSubmit(task, executor, invocation.getMethod().getReturnType()); }
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; /** * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier(java.lang.reflect.Method) */ // 獲得@Async注解中的value屬性中指定的taskExecutor名稱 String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { // 獲取默認(rèn)的taskExecutor targetExecutor = this.defaultExecutor.get(); } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
determineAsyncExecutor()負(fù)責(zé)獲取異步任務(wù)執(zhí)行的線程池,線程池的查找步驟如下:
- 從spring容器中尋找@Async注解中的value屬性中指定的taskExecutor
- 尋找默認(rèn)的defaultExecutor
默認(rèn)的defaultExecutor是怎么來的?
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure
public void configure(@Nullable Supplier<Executor> defaultExecutor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { // defaultExecutor默認(rèn)為從beanFactory獲取TaskExecutor或者bean名字為taskExecutor的Executor,beanFactory.getBean(TaskExecutor.class) this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); // exceptionHandler默認(rèn)為SimpleAsyncUncaughtExceptionHandler this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new); }
defaultExecutor首先取參數(shù)傳入的defaultExecutor,這個參數(shù)來自接口AsyncConfigurer.getAsyncExecutor(),如果參數(shù)為null,那么就調(diào)用getDefaultExecutor(),注意這個方法子類AsyncExecutionInterceptor重寫了:
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
如果找不到defaultExecutor就會創(chuàng)建一個SimpleAsyncTaskExecutor。
再來看看父類的AsyncExecutionAspectSupport#getDefaultExecutor: org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { if (beanFactory != null) { try { // Search for TaskExecutor bean... not plain Executor since that would // match with ScheduledExecutorService as well, which is unusable for // our purposes here. TaskExecutor is more clearly designed for it. return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { logger.debug("Could not find unique TaskExecutor bean", ex); try { // 找名為taskExecutor的Executor return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; }
先從beanFactory中獲取TaskExecutor類型的對象,然后再找名為taskExecutor的Executor對象。
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { // 執(zhí)行任務(wù) if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException(ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null; } }
doSubmit()負(fù)責(zé)將任務(wù)提交至線程池中,并對各種方法的返回值進行處理。
到此這篇關(guān)于spring的異步執(zhí)行使用與源碼詳解的文章就介紹到這了,更多相關(guān)spring的異步執(zhí)行內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot 啟動項目打印接口列表的實現(xiàn)
這篇文章主要介紹了springboot 啟動項目打印接口列表的實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09一文教會你用mybatis查詢數(shù)據(jù)庫數(shù)據(jù)
MyBatis本身是一個數(shù)據(jù)庫連接框架,可以認(rèn)為是JDBC的升級版,下面這篇文章主要給大家介紹了關(guān)于mybatis查詢數(shù)據(jù)庫數(shù)據(jù)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-04-04Spring加載配置和讀取多個Properties文件的講解
今天小編就為大家分享一篇關(guān)于Spring加載配置和讀取多個Properties文件的講解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03SpringBoot整合RabbitMQ實現(xiàn)消息確認(rèn)機制
這篇文章主要介紹了SpringBoot整合RabbitMQ實現(xiàn)消息確認(rèn)機制,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-08-08Springboot之修改啟動端口的兩種方式(小結(jié))
這篇文章主要介紹了Springboot之修改啟動端口的兩種方式(小結(jié)),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09