Async的線程池使用選擇解析
前言
在Spring中我們經(jīng)常會(huì)用到異步操作,注解中使用 @EnableAsync
和 @Async
就可以使用它了。但是最近發(fā)現(xiàn)在異步中線程號(hào)使用的是我們項(xiàng)目中自定義的線程池 ThreadPoolTaskExecutor
而不是之前熟悉的 SimpleAsyncTaskExecutor
那么來(lái)看一下他的執(zhí)行過(guò)程吧。
正文
- 首先要使異步生效,我們得在啟動(dòng)類(lèi)中加入
@EnableAsync
那么就點(diǎn)開(kāi)它看看。它會(huì)使用@Import
注入一個(gè)AsyncConfigurationSelector
類(lèi),啟動(dòng)是通過(guò)父類(lèi)可以決定它使用的是配置類(lèi)ProxyAsyncConfiguration
。
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; public AsyncConfigurationSelector() { } @Nullable public String[] selectImports(AdviceMode adviceMode) { switch(adviceMode) { case PROXY: return new String[]{ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"}; default: return null; } } }
- 點(diǎn)開(kāi)能夠看到注入一個(gè)
AsyncAnnotationBeanPostProcessor
。它實(shí)現(xiàn)了BeanPostProcessor
接口,因此它是一個(gè)后處理器,用于將Spring AOP
的Advisor
應(yīng)用于給定的bean
。從而該bean
上通過(guò)異步注解所定義的方法在調(diào)用時(shí)會(huì)被真正地異步調(diào)用起來(lái)。
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { public ProxyAsyncConfiguration() { } @Bean( name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"} ) @Role(2) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder((Integer)this.enableAsync.getNumber("order")); return bpp; } }
AsyncAnnotationBeanPostProcessor
的父類(lèi)實(shí)現(xiàn)了BeanFactoryAware
,那么會(huì)在AsyncAnnotationBeanPostProcessor
實(shí)例化之后回調(diào)setBeanFactory()
來(lái)實(shí)例化切面AsyncAnnotationAdvisor
。
public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); //定義一個(gè)切面 AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }
AsyncAnnotationAdvisor
構(gòu)造和聲明切入的目標(biāo)(切點(diǎn))和代碼增強(qiáng)(通知)。
public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } //通知 this.advice = buildAdvice(executor, exceptionHandler); //切入點(diǎn) this.pointcut = buildPointcut(asyncAnnotationTypes); }
- 通知就是最終要執(zhí)行的。
buildAdvice
用于構(gòu)建通知,主要是創(chuàng)建一個(gè)AnnotationAsyncExecutionInterceptor
類(lèi)型的攔截器,并且配置好使用的執(zhí)行器和異常處理器。真正的異步執(zhí)行的代碼在AsyncExecutionAspectSupport
中!
protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); //配置攔截器 interceptor.configure(executor, exceptionHandler); return interceptor; }
- 配置攔截器,通過(guò)參數(shù)配置自定義的執(zhí)行器和異常處理器或者使用默認(rèn)的執(zhí)行器和異常處理器。
public void configure(@Nullable Supplier<Executor> defaultExecutor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { //默認(rèn)執(zhí)行器 this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new); }
getDefaultExecutor()
方法,用來(lái)查找默認(rèn)的執(zhí)行器,父類(lèi)AsyncExecutionAspectSupport
首先尋找唯一一個(gè)類(lèi)型為TaskExecutor
的執(zhí)行器并返回,若存在多個(gè)則尋找默認(rèn)的執(zhí)行器taskExecutor
,若無(wú)法找到則直接返回null。子類(lèi)AsyncExecutionInterceptor
重寫(xiě)getDefaultExecutor
方法,首先調(diào)用父類(lèi)邏輯,返回null則配置一個(gè)名為SimpleAsyncTaskExecutor
的執(zhí)行器
/** * 父類(lèi) * 獲取或構(gòu)建此通知實(shí)例的默認(rèn)執(zhí)行器 * 這里返回的執(zhí)行器將被緩存以供后續(xù)使用 * 默認(rèn)實(shí)現(xiàn)搜索唯一的TaskExecutor的bean * 在上下文中,用于名為“taskExecutor”的Executor bean。 * 如果兩者都不是可解析的,這個(gè)實(shí)現(xiàn)將返回 null */ @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { if (beanFactory != null) { try { // 搜索唯一的一個(gè)TaskExecutor類(lèi)型的bean并返回 return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { //找不到唯一一個(gè)bean異常后,搜索一個(gè)TaskExecutor類(lèi)型的“taskExecutor”的bean并返回 logger.debug("Could not find unique TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { //未找到異常時(shí)搜索一個(gè)TaskExecutor類(lèi)型的“taskExecutor”的bean并返回 logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; } /** * 子類(lèi) * 如父類(lèi)為null則重新實(shí)例化一個(gè)名為SimpleAsyncTaskExecutor的執(zhí)行器 */ @Override @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
所以,到了這一步就可以理解為什么異步線程名默認(rèn)叫 SimpleAsyncTaskExecutor-xx
,為什么有了自己的線程池有可能異步用到了自己的線程池配置。
我們有這個(gè)切入點(diǎn)之后,每次請(qǐng)求接口執(zhí)行異步方法前都會(huì)執(zhí)行 AsyncExecutionInterceptor#invoke()
, determineAsyncExecutor
用來(lái)決策使用哪個(gè)執(zhí)行器
@Nullable protected AsyncTaskExecutor determineAsyncExecutor(Method method) { //在緩存的執(zhí)行器中選擇一個(gè)對(duì)應(yīng)方法的執(zhí)行器 AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method); if (executor == null) { //獲取@Async注解中的value(指定的執(zhí)行器) String qualifier = this.getExecutorQualifier(method); Executor targetExecutor; if (StringUtils.hasLength(qualifier)) { //獲取指定執(zhí)行器的bean targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier); } else { //選擇默認(rèn)的執(zhí)行器 targetExecutor = (Executor)this.defaultExecutor.get(); } if (targetExecutor == null) { return null; } executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor); //將執(zhí)行器進(jìn)行緩存 this.executors.put(method, executor); } return (AsyncTaskExecutor)executor; }
當(dāng)有了執(zhí)行器調(diào)用 doSubmit
方法將任務(wù)加入到執(zhí)行器中。
異步任務(wù),默認(rèn)將采用SimpleAsyncTaskExecutor作為執(zhí)行器!它有如下特點(diǎn):
不復(fù)用線程,也就是說(shuō)為每個(gè)任務(wù)新起一個(gè)線程。但是可以通過(guò) concurrencyLimit
屬性來(lái)控制并發(fā)線程數(shù)量,但是默認(rèn)情況下不做限制( concurrencyLimit
取值為-1)。
因此,如果我們使用異步任務(wù),一定不能采用默認(rèn)執(zhí)行器的配置,以防OOM異常!最好的方式是指定執(zhí)行器!
總結(jié)
本文主要以看源碼的方式來(lái)了解異步注解 @Async
是如何在項(xiàng)目中選擇線程以及使用線程的,盡量給異步任務(wù)指定一個(gè)獨(dú)有線程池,這樣會(huì)的避免不與其他業(yè)務(wù)共用線程池而造成影響。
以上就是Async的線程池使用選擇解析的詳細(xì)內(nèi)容,更多關(guān)于Async線程池使用的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java-URLDecoder、URLEncoder使用及說(shuō)明
本文介紹了Java中URLDecoder和URLEncoder類(lèi)的使用方法,包括編碼和解碼規(guī)則、推薦的編碼方案、解碼器處理非法字符的方法以及URL編碼和解碼的示例2024-12-12解決@Scheduled定時(shí)器使用@Thransactional事物問(wèn)題
這篇文章主要介紹了解決@Scheduled定時(shí)器使用@Thransactional事物問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08解決阿里代碼規(guī)范檢測(cè)中方法缺少javadoc注釋的問(wèn)題
這篇文章主要介紹了解決阿里代碼規(guī)范檢測(cè)中方法缺少javadoc注釋的問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08利用Java實(shí)現(xiàn)圖片轉(zhuǎn)化為ASCII圖的示例代碼
本文將詳細(xì)講解如何利用 Java 實(shí)現(xiàn)圖片轉(zhuǎn)化為 ASCII 圖,從項(xiàng)目背景與意義、相關(guān)技術(shù)知識(shí),到系統(tǒng)需求與架構(gòu)設(shè)計(jì),再到詳細(xì)實(shí)現(xiàn)思路、完整代碼和代碼解讀,最后對(duì)項(xiàng)目進(jìn)行總結(jié)與展望,需要的朋友可以參考下2025-03-03Spring?Boot?項(xiàng)目中?JPA?語(yǔ)法的基本使用方法
這篇文章主要介紹了?Spring?Boot?項(xiàng)目中?JPA?語(yǔ)法的基本使用方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-10-10idea中@Autowired注解下變量報(bào)紅的解決
這篇文章主要介紹了idea中@Autowired注解下變量報(bào)紅的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11