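How @Async Works in Spring: A Source Code Analysis
Preface
Until now I wrote concurrent code by constructing my own new ThreadPoolExecutor(...) and calling methods such as invokeAll.
I later found that it is enough to annotate a method with @Async and switch the feature on with @EnableAsync, which is marked @since Spring 3.1.
I am on Spring 5, where the default executor ends up being a SimpleAsyncTaskExecutor. That turns out to be a serious pitfall.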
1. @EnableAsync

    @Import(AsyncConfigurationSelector.class)
    public @interface EnableAsync {
        Class<? extends Annotation> annotation() default Annotation.class;
        boolean proxyTargetClass() default false;
        AdviceMode mode() default AdviceMode.PROXY;
        int order() default Ordered.LOWEST_PRECEDENCE;
    }

As with @EnableTransactionManagement, analyzed earlier, the attributes are much the same. @Import brings in AsyncConfigurationSelector, which decides which configuration class to register.
It implements the ImportSelector interface (through AdviceModeImportSelector):

    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }

If @EnableAsync does not set mode, the default AdviceMode.PROXY applies, so ProxyAsyncConfiguration is returned and registered as a bean. With proxyTargetClass left at false, beans that implement interfaces are proxied with JDK dynamic proxies.
2. ProxyAsyncConfiguration
1) Implements ImportAware
When ProxyAsyncConfiguration is initialized as a bean, the container calls back into it. The implementation (inherited from AbstractAsyncConfiguration) is:

    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

It reads the attributes configured on @EnableAsync and stores them in the enableAsync field.
2) AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor is registered as a bean:

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
3. AsyncAnnotationBeanPostProcessor
It implements several Aware interfaces and is injected with the BeanFactory and the bean ClassLoader. The key work happens in setBeanFactory:

    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }

A new AsyncAnnotationAdvisor is created. The executor and the exception handler are the ones passed in when ProxyAsyncConfiguration was initialized, and by default both are null. The constructor is:

    public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor,
            @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        asyncAnnotationTypes.add(Async.class);
        try {
            asyncAnnotationTypes.add((Class<? extends Annotation>)
                    ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
        }
        catch (ClassNotFoundException ex) {
            // If EJB 3.1 API not present, simply ignore.
        }
        this.advice = buildAdvice(executor, exceptionHandler);
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }
buildAdvice: builds the interceptor

    protected Advice buildAdvice(@Nullable Supplier<Executor> executor,
            @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        interceptor.configure(executor, exceptionHandler);
        return interceptor;
    }

This creates an AnnotationAsyncExecutionInterceptor, which is analyzed later. The one-argument constructor is used, but the default executor passed to it is null.
buildPointcut: builds the pointcut from the async annotation types

    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
        ComposablePointcut result = null;
        // By default asyncAnnotationTypes holds Async
        // (plus javax.ejb.Asynchronous when it is on the classpath)
        for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
            if (result == null) {
                // result is null on the first iteration: start with the class-level matcher
                result = new ComposablePointcut(cpc);
            }
            else {
                result.union(cpc);
            }
            // Then add the method-level matcher
            result = result.union(mpc);
        }
        return (result != null ? result : Pointcut.TRUE);
    }

By default asyncAnnotationTypes contains only Async (javax.ejb.Asynchronous is added when the EJB API is present). For each type, a class-level and a method-level AnnotationMatchingPointcut are created, and both are combined into a ComposablePointcut.
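The union of a class-level and a method-level match can be illustrated with plain reflection. This is only a minimal sketch, not Spring's implementation: RunAsync, ReportService, MailService and PointcutSketch are hypothetical names, and unlike AnnotationMatchingPointcut with checkInherited set to true it does not look at superclasses or meta-annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class PointcutSketch {
    // A method is a candidate when its class OR the method itself carries the marker.
    static boolean matches(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return type.isAnnotationPresent(RunAsync.class)
                    || method.isAnnotationPresent(RunAsync.class);
        } catch (NoSuchMethodException ex) {
            return false; // unknown method: nothing to match
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(ReportService.class, "build"));
        System.out.println(matches(MailService.class, "send"));
        System.out.println(matches(MailService.class, "ping"));
    }
}

// Hypothetical stand-in for @Async so the sketch needs no Spring dependency.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface RunAsync {}

@RunAsync
class ReportService {
    public void build() {}   // matched through the class-level marker
}

class MailService {
    @RunAsync
    public void send() {}    // matched through the method-level marker

    public void ping() {}    // no marker anywhere: not matched
}
```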
Once initialization is complete, postProcessAfterInitialization is called back during the lifecycle of every bean:

    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (this.advisor == null || bean instanceof AopInfrastructureBean) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }
        if (bean instanceof Advised) {
            Advised advised = (Advised) bean;
            if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
                // Add our local Advisor to the existing proxy's Advisor chain...
                if (this.beforeExistingAdvisors) {
                    advised.addAdvisor(0, this.advisor);
                }
                else {
                    advised.addAdvisor(this.advisor);
                }
                return bean;
            }
        }
        if (isEligible(bean, beanName)) {
            ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
            if (!proxyFactory.isProxyTargetClass()) {
                evaluateProxyInterfaces(bean.getClass(), proxyFactory);
            }
            proxyFactory.addAdvisor(this.advisor);
            customizeProxyFactory(proxyFactory);
            return proxyFactory.getProxy(getProxyClassLoader());
        }
        // No proxy needed.
        return bean;
    }

    protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
        ProxyFactory proxyFactory = new ProxyFactory();
        proxyFactory.copyFrom(this);
        proxyFactory.setTarget(bean);
        return proxyFactory;
    }
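To make the effect of that proxy concrete, here is a rough JDK-only sketch of the idea: wrap the target in a dynamic proxy whose handler hands annotated methods to an executor. It is an illustration under simplifying assumptions, not what ProxyFactory does internally; RunAsync, Greeter, GreeterImpl and wrap are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AsyncProxySketch {
    // Hypothetical marker standing in for @Async.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RunAsync {}

    interface Greeter {
        @RunAsync
        Future<String> workerThreadName();
    }

    static class GreeterImpl implements Greeter {
        public Future<String> workerThreadName() {
            // Reports the thread that actually executed the method body.
            return CompletableFuture.completedFuture(Thread.currentThread().getName());
        }
    }

    // Wraps the target so that marked methods run on the executor.
    static <T> T wrap(T target, Class<T> iface, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(RunAsync.class)) {
                return method.invoke(target, args); // plain synchronous call
            }
            // Run the target on the executor and unwrap the Future it returns.
            return executor.submit(() -> {
                Object result = method.invoke(target, args);
                return (result instanceof Future) ? ((Future<?>) result).get() : null;
            });
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    // True when the method body ran on the executor's thread, not the caller's.
    static boolean runsOnOtherThread() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            Greeter proxy = wrap(new GreeterImpl(), Greeter.class, executor);
            String worker = proxy.workerThreadName().get(5, TimeUnit.SECONDS);
            return "async-worker".equals(worker);
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on worker thread: " + runsOnOtherThread());
    }
}
```

The sketch also shows why calls made from inside the target itself are not intercepted: only calls that go through the proxy reach the handler.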
4. AnnotationAsyncExecutionInterceptor
The core logic is clearly in the invoke method:

    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null
                ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on " +
                    " AsyncExecutionInterceptor either");
        }

        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };

        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

It first resolves the method being invoked, then determines which executor to use, and finally submits the task to that executor.
1) Obtaining the executor (determineAsyncExecutor)
Because the executor passed in during initialization was null, the default is resolved lazily through getDefaultExecutor:

    public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
        this.defaultExecutor = new SingletonSupplier<>(defaultExecutor,
                () -> getDefaultExecutor(this.beanFactory));
        this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
    }

    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        if (beanFactory != null) {
            try {
                // Search for TaskExecutor bean... not plain Executor since that would
                // match with ScheduledExecutorService as well, which is unusable for
                // our purposes here. TaskExecutor is more clearly designed for it.
                return beanFactory.getBean(TaskExecutor.class);
            }
            catch (NoUniqueBeanDefinitionException ex) {
                logger.debug("Could not find unique TaskExecutor bean", ex);
                try {
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                                "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                                "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                logger.debug("Could not find default TaskExecutor bean", ex);
                try {
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    logger.info("No task executor bean found for async processing: " +
                            "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
                }
                // Giving up -> either using local default executor or none at all...
            }
        }
        return null;
    }

The key call is beanFactory.getBean(TaskExecutor.class).
It looks up a bean of type TaskExecutor (or a subtype) in the BeanFactory, and such a bean may not exist.

    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        AsyncTaskExecutor executor = this.executors.get(method);
        if (executor == null) {
            Executor targetExecutor;
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {
                targetExecutor = this.defaultExecutor.get();
            }
            if (targetExecutor == null) {
                return null;
            }
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor
                    ? (AsyncListenableTaskExecutor) targetExecutor
                    : new TaskExecutorAdapter(targetExecutor));
            this.executors.put(method, executor);
        }
        return executor;
    }

A local ConcurrentHashMap serves as a cache, keyed by Method with the resolved executor as the value.
(1) First, read the value of @Async on the method being executed:

    protected String getExecutorQualifier(Method method) {
        // Maintainer's note: changes made here should also be made in
        // AnnotationAsyncExecutionAspect#getExecutorQualifier
        Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
        if (async == null) {
            async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
        }
        return (async != null ? async.value() : null);
    }

If a value is configured (for example, the method is declared with @Async("order")), the actual executor is looked up by that qualifier:

    protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
        if (beanFactory == null) {
            throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
                    " to access qualified executor '" + qualifier + "'");
        }
        return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
    }
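(2) If @Async has no value configured, the default is used:

    targetExecutor = this.defaultExecutor.get();

This is the lookup of a TaskExecutor bean in the BeanFactory described above. When that lookup finds nothing, AsyncExecutionInterceptor overrides getDefaultExecutor to fall back to a new SimpleAsyncTaskExecutor, which is the type I ended up with on Spring 5.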
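The resolution order (qualifier first, default otherwise, result cached per Method) can be sketched without Spring. In this illustration a plain Map stands in for the BeanFactory, and ExecutorResolutionSketch, DirectExecutor and resolve are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class ExecutorResolutionSketch {
    // Trivial executor that runs the task on the calling thread.
    static class DirectExecutor implements Executor {
        public void execute(Runnable command) {
            command.run();
        }
    }

    private final Map<String, Executor> registry;  // stand-in for the BeanFactory
    private final Executor defaultExecutor;
    private final Map<Method, Executor> cache = new ConcurrentHashMap<>();

    ExecutorResolutionSketch(Map<String, Executor> registry, Executor defaultExecutor) {
        this.registry = registry;
        this.defaultExecutor = defaultExecutor;
    }

    // The qualifier wins when present; otherwise the default is used.
    // The result is cached, so later calls for the same method skip the lookup.
    Executor resolve(Method method, String qualifier) {
        return cache.computeIfAbsent(method, m -> {
            if (qualifier != null && !qualifier.isEmpty()) {
                Executor named = registry.get(qualifier);
                if (named == null) {
                    throw new IllegalStateException("No executor named '" + qualifier + "'");
                }
                return named;
            }
            return defaultExecutor;
        });
    }

    static boolean demo() {
        Executor orderPool = new DirectExecutor();
        Executor fallback = new DirectExecutor();
        ExecutorResolutionSketch sketch = new ExecutorResolutionSketch(
                Collections.singletonMap("order", orderPool), fallback);
        try {
            Method qualified = String.class.getMethod("length");
            Method unqualified = String.class.getMethod("isEmpty");
            return sketch.resolve(qualified, "order") == orderPool
                    && sketch.resolve(unqualified, "") == fallback
                    // Served from the cache: the new qualifier is never consulted.
                    && sketch.resolve(qualified, "other") == orderPool;
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("resolution behaves as described: " + demo());
    }
}
```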
2) Running the task (doSubmit)

    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        if (completableFuturePresent) {
            Future<Object> result = AsyncExecutionAspectSupport.CompletableFutureDelegate
                    .processCompletableFuture(returnType, task, executor);
            if (result != null) {
                return result;
            }
        }
        if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            executor.submit(task);
            return null;
        }
    }

The handling depends on the declared return type of our method: CompletableFuture, ListenableFuture, or a plain Future. For any other return type, such as void, the task is submitted and the caller receives null. Spring's AsyncResult implements ListenableFuture.
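The same dispatch on the declared return type can be sketched with JDK types only. This is a simplified illustration: the ListenableFuture branch is left out because it is Spring-specific, and SubmitDispatchSketch and its methods are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDispatchSketch {
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        // CompletableFuture must be checked first because it is also a Future.
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        // Any other return type (void included): fire and forget.
        executor.submit(task);
        return null;
    }

    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Object completable = submit(() -> "x", pool, CompletableFuture.class);
            Object plain = submit(() -> "y", pool, Future.class);
            Object nothing = submit(() -> "z", pool, void.class);
            return completable instanceof CompletableFuture
                    && plain instanceof Future
                    && !(plain instanceof CompletableFuture)
                    && nothing == null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("dispatch behaves as described: " + demo());
    }
}
```

One consequence worth noting: a method that returns a plain value such as String falls into the last branch, so its caller always receives null.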
5. SimpleAsyncTaskExecutor
If @Async is used without configuring an executor, and no executor is set on AnnotationAsyncExecutionInterceptor either, every call falls into the trap: a new thread is created each time.
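One common way around this is to supply a pooled executor through AsyncConfigurer, which is where the executor handed to ProxyAsyncConfiguration comes from. The following is a configuration sketch only: the class name AsyncConfig, the pool sizes, the queue capacity and the thread name prefix are illustrative assumptions, not recommendations, and it needs Spring on the classpath to compile.

```java
import java.util.concurrent.Executor;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    // Returned executor replaces the SimpleAsyncTaskExecutor fallback
    // for @Async methods that do not name an executor.
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);        // assumed value
        executor.setMaxPoolSize(8);         // assumed value
        executor.setQueueCapacity(100);     // assumed value
        executor.setThreadNamePrefix("async-");
        executor.initialize();
        return executor;
    }
}
```

Declaring a single TaskExecutor bean would also be picked up by the getBean(TaskExecutor.class) lookup in getDefaultExecutor.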
The submit() method:

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> future = new FutureTask<>(task);
        execute(future, TIMEOUT_INDEFINITE);
        return future;
    }

The execute() method:

    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
        }
        else {
            doExecute(taskToUse);
        }
    }

The doExecute() method:

    protected void doExecute(Runnable task) {
        Thread thread = (this.threadFactory != null
                ? this.threadFactory.newThread(task) : createThread(task));
        thread.start();
    }

    public Thread createThread(Runnable runnable) {
        Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(isDaemon());
        return thread;
    }

If a thread factory has been configured, it creates the thread; otherwise the thread is created directly with new. Either way, SimpleAsyncTaskExecutor does no pooling: as long as it is the default executor, every task runs on a newly created thread.
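The difference is easy to observe with JDK classes alone. The sketch below imitates doExecute() with a hypothetical THREAD_PER_TASK executor and counts how many distinct threads end up running the same number of tasks, compared with a fixed-size pool. ThreadPerTaskSketch and its methods are illustrative names, not part of Spring.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadPerTaskSketch {
    // Imitates doExecute(): a brand-new thread for every task.
    static final Executor THREAD_PER_TASK = task -> new Thread(task).start();

    // Runs the given number of tasks and counts the distinct threads that executed them.
    static int distinctThreads(Executor executor, int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen.size();
    }

    // Same measurement against a fixed-size pool that is shut down afterwards.
    static int pooledDistinctThreads(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            return distinctThreads(pool, tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thread-per-task: " + distinctThreads(THREAD_PER_TASK, 20));
        System.out.println("fixed pool of 2: " + pooledDistinctThreads(2, 20));
    }
}
```

With the thread-per-task executor the thread count grows with the number of tasks, whereas the pool never exceeds its configured size. Under load that unbounded growth is what makes the default risky.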