欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring中的@Async原理分析

 更新時間:2024年01月09日 09:00:12   作者:it_lihongmin  
這篇文章主要介紹了Spring中的@Async原理分析,自定義new ThreadPoolExecutor并調(diào)用invokeAll等進(jìn)行并發(fā)編程,后面發(fā)現(xiàn)只要在方法上添加@Async注解,并使用@EnableAsync進(jìn)行開啟默認(rèn)會使用SimpleAsyncTaskExecutor類型,需要的朋友可以參考下

前言

之前編程都是自定義new ThreadPoolExecutor(。。。),并調(diào)用invokeAll等進(jìn)行并發(fā)編程。

后面發(fā)現(xiàn)只要在方法上添加@Async注解,并使用@EnableAsync進(jìn)行開啟,并且@since為Spring 3.1版本。

我使用的Spring 5版本的,默認(rèn)會使用SimpleAsyncTaskExecutor類型。就是一個大坑。

1、@Async

@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
 
    Class<? extends Annotation> annotation() default Annotation.class;
 
    boolean proxyTargetClass() default false;
 
    AdviceMode mode() default AdviceMode.PROXY;
 
    int order() default Ordered.LOWEST_PRECEDENCE;
}

與之前分析@EnableTransactionManagement一樣,屬性都差不多。使用@Import方式將AsyncConfigurationSelector注冊為bean。

實(shí)現(xiàn)了ImportSelector接口 

public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}

@EnableAsync上沒有配置mode,則默認(rèn)使用jdk方式實(shí)現(xiàn)。返回ProxyAsyncConfiguration將其注入為bean。

2、ProxyAsyncConfiguration

1)、實(shí)現(xiàn)ImportAware

則在ProxyAsyncConfiguration初始化為bean時,會進(jìn)行回調(diào),實(shí)現(xiàn)方法如下:

public void setImportMetadata(AnnotationMetadata importMetadata) {
	this.enableAsync = AnnotationAttributes.fromMap(
			importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
	if (this.enableAsync == null) {
		throw new IllegalArgumentException(
				"@EnableAsync is not present on importing class " + 
           importMetadata.getClassName());
	}
}

獲取@EnableAsync注解上的配置信息,并保存到 enableAsync屬性中。

2)、AsyncAnnotationBeanPostProcessor

將 AsyncAnnotationBeanPostProcessor初始化為bean

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
	Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
	AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
	bpp.configure(this.executor, this.exceptionHandler);
	Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
	if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class,
         "annotation")) {
		bpp.setAsyncAnnotationType(customAsyncAnnotation);
	}
	bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
	bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
	return bpp;
}

3、AsyncAnnotationBeanPostProcessor

實(shí)現(xiàn)了很多Aware接口,注入了BeanFactory和BeanClassLoader,主要是在setBeanFactory方法中:

public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
 
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

new 了一個AsyncAnnotationAdvisor,而線程池和異常處理器是從初始化 ProxyAsyncConfiguration時傳入的,默認(rèn)都為null。構(gòu)造器如下:

public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, 
    @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
 
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
        ClassUtils.forName("javax.ejb.Asynchronous", 
        AsyncAnnotationAdvisor.class.getClassLoader()));
    } catch (ClassNotFoundException ex) {
    // If EJB 3.1 API not present, simply ignore.
    }
    this.advice = buildAdvice(executor, exceptionHandler);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}

buildAdvice:構(gòu)建攔截器

protected Advice buildAdvice(@Nullable Supplier<Executor> executor, 
    @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
 
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

初始化了一個AnnotationAsyncExecutionInterceptor 攔截器,后續(xù)進(jìn)行分析。使用有參構(gòu)造,但是異步任務(wù)的線程池為null。

buildPointcut:根據(jù)Async構(gòu)建攔截匹配點(diǎn)

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    // asyncAnnotationTypes默認(rèn)只要Async類型
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            // result肯定是null,先添加Class類型的切點(diǎn)匹配器
            result = new ComposablePointcut(cpc);
        } else {
            result.union(cpc);
        }
        // 再添加Method類型的切點(diǎn)攔截器
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}

默認(rèn)情況下 asyncAnnotationTypes中只要Async類型,則初始化了配置Async的類和方法的 匹配攔截器(AnnotationMatchingPointcut),并且都添加到ComposablePointcut中。

一切初始化完成后,在每個bean的生命周期都會進(jìn)行回調(diào) postProcessAfterInitialization方法:

public Object postProcessAfterInitialization(Object bean, String beanName) {
	if (this.advisor == null || bean instanceof AopInfrastructureBean) {
		// Ignore AOP infrastructure such as scoped proxies.
		return bean;
	}
 
	if (bean instanceof Advised) {
		Advised advised = (Advised) bean;
		if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
			// Add our local Advisor to the existing proxy's Advisor chain...
			if (this.beforeExistingAdvisors) {
				advised.addAdvisor(0, this.advisor);
			}
			else {
				advised.addAdvisor(this.advisor);
			}
			return bean;
		}
	}
 
	if (isEligible(bean, beanName)) {
		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
		if (!proxyFactory.isProxyTargetClass()) {
			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
		}
		proxyFactory.addAdvisor(this.advisor);
		customizeProxyFactory(proxyFactory);
		return proxyFactory.getProxy(getProxyClassLoader());
	}
 
	// No proxy needed.
	return bean;
}
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.copyFrom(this);
    proxyFactory.setTarget(bean);
    return proxyFactory;
}

4、AnnotationAsyncExecutionInterceptor

顯然核心實(shí)現(xiàn)在 invoke方法中:

public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
 
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
			"No executor specified and no default executor set on " +
            " AsyncExecutionInterceptor either");
    }
 
    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        } catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        } catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

先獲取執(zhí)行的方法信息,再判斷執(zhí)行的異步線程池,再講任務(wù)提交給線程池。

1)、獲取線程池(determineAsyncExecutor)

之前初始化的時候,傳入的線程池為null,則:

public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
	this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
	this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	if (beanFactory != null) {
		try {
			// Search for TaskExecutor bean... not plain Executor since that would
			// match with ScheduledExecutorService as well, which is unusable for
			// our purposes here. TaskExecutor is more clearly designed for it.
			return beanFactory.getBean(TaskExecutor.class);
		}
		catch (NoUniqueBeanDefinitionException ex) {
			logger.debug("Could not find unique TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				if (logger.isInfoEnabled()) {
					logger.info("More than one TaskExecutor bean found within the context, and none is named " +
							"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
							"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
				}
			}
		}
		catch (NoSuchBeanDefinitionException ex) {
			logger.debug("Could not find default TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				logger.info("No task executor bean found for async processing: " +
						"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
			}
			// Giving up -> either using local default executor or none at all...
		}
	}
	return null;
}

beanFactory.getBean(TaskExecutor.class)

最后是獲取了BeanFactory中的TaskExecutor的子類的bean(可能不存在)。

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        } else {
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
            (AsyncListenableTaskExecutor) targetExecutor : new 
                 TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}

使用本地緩存ConcurrentHashMap, key為Methed, value為線程池。

1、先獲取執(zhí)行的方法的@Async的value值

protected String getExecutorQualifier(Method method) {
    // Maintainer's note: changes made here should also be made in
    // AnnotationAsyncExecutionAspect#getExecutorQualifier
    Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
    if (async == null) {
        async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
    }
    return (async != null ? async.value() : null);
}

如果獲取到配置的值(如定義方法時為:@Async("order") ),則獲取正在的線程池

protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
    if (beanFactory == null) {
        throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
					" to access qualified executor '" + qualifier + "'");
    }
    return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
}

2、如果@Async上沒有配置,則獲取默認(rèn)值

 targetExecutor = this.defaultExecutor.get();

就是之前從BeanFactory中獲取TaskExecutor.class類型的實(shí)現(xiàn),當(dāng)前版本為spring5,,獲取到的類型為SimpleAsyncTaskExecutor

2)、執(zhí)行任務(wù)(doSubmit)

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, 
    Class<?> returnType) {
    
    if (completableFuturePresent) {
        Future<Object> result = AsyncExecutionAspectSupport.CompletableFutureDelegate
        .processCompletableFuture(returnType, task, executor);
        if (result != null) {
            return result;
        }
    }
 
    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor)executor).submitListenable(task);
    } else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    } else {
        executor.submit(task);
        return null;
    }
}

根據(jù)我們定義的方法的返回值進(jìn)行處理,返回值可以是 null、Future、Spring的AsyncResult是ListenableFuture的子類。

5、SimpleAsyncTaskExecutor

如果使用@Async沒有配置線程池,并且沒有給AnnotationAsyncExecutionInterceptor設(shè)置線程池,則調(diào)用時就是一個坑,每次創(chuàng)建一個線程。

submit()方法:

@Override
public <T> Future<T> submit(Callable<T> task) {
    FutureTask<T> future = new FutureTask<>(task);
    execute(future, TIMEOUT_INDEFINITE);
    return future;
}

execute()執(zhí)行方法:

@Override
public void execute(Runnable task, long startTimeout) {
    Assert.notNull(task, "Runnable must not be null");
    Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        this.concurrencyThrottle.beforeAccess();
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    } else {
        doExecute(taskToUse);
    }
}

doExecute()方法:

protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) 
        : createThread(task));
    thread.start();
}
public Thread createThread(Runnable runnable) {
    Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
    thread.setPriority(getThreadPriority());
    thread.setDaemon(isDaemon());
    return thread;
}

是否初始化了線程工廠,有則用工廠進(jìn)行new,否則還是new。也就是說只要使用默認(rèn)SimpleAsyncTaskExecutor線程池,每次執(zhí)行任務(wù)就new一個新的線程。

到此這篇關(guān)于Spring中的@Async原理分析的文章就介紹到這了,更多相關(guān)@Async原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java模擬新浪微博登陸抓取數(shù)據(jù)

    Java模擬新浪微博登陸抓取數(shù)據(jù)

    本文主要介紹了Java模擬新浪微博登陸抓取數(shù)據(jù)的實(shí)現(xiàn)方法。具有很好的參考價值,下面跟著小編一起來看下吧
    2017-02-02
  • 使用SpringBoot_jar方式啟動并配置日志文件

    使用SpringBoot_jar方式啟動并配置日志文件

    這篇文章主要介紹了使用SpringBoot_jar方式啟動并配置日志文件操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • 關(guān)于Java的Condition接口最佳理解方式

    關(guān)于Java的Condition接口最佳理解方式

    這篇文章主要介紹了關(guān)于Java的Condition接口最佳理解方式,Condition就是實(shí)現(xiàn)了管程里面的條件變量,Java?語言內(nèi)置的管程里只有一個條件變量,而Lock&Condition實(shí)現(xiàn)的管程支持多個條件變量,需要的朋友可以參考下
    2023-05-05
  • 詳解Java如何利用位操作符創(chuàng)建位掩碼

    詳解Java如何利用位操作符創(chuàng)建位掩碼

    在本文中,我們來看看如何使用位操作符實(shí)現(xiàn)低級別的位掩碼。我們將看到我們?nèi)绾螌⒁粋€單一的int變量作為一個單獨(dú)的數(shù)據(jù)容器,感興趣的可以跟隨小編一起學(xué)習(xí)一下
    2022-10-10
  • Windows10系統(tǒng)下修改jar中的文件并重新打包成jar文件然后運(yùn)行的操作步驟

    Windows10系統(tǒng)下修改jar中的文件并重新打包成jar文件然后運(yùn)行的操作步驟

    這篇文章主要介紹了Windows10系統(tǒng)下修改jar中的文件并重新打包成jar文件然后運(yùn)行的操作步驟,文中通過圖文結(jié)合的形式給大家講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2024-08-08
  • SpringMVC前后端傳值的幾種實(shí)現(xiàn)方式

    SpringMVC前后端傳值的幾種實(shí)現(xiàn)方式

    本文主要介紹了SpringMVC前后端傳值的方式實(shí)現(xiàn),包括使用HttpServletRequest、HttpSession、Model和ModelAndView等方法,具有一定的參考價值,感興趣的可以了解一下
    2025-02-02
  • SpringMVC請求數(shù)據(jù)詳解講解

    SpringMVC請求數(shù)據(jù)詳解講解

    Spring MVC 是 Spring 提供的一個基于 MVC 設(shè)計(jì)模式的輕量級 Web 開發(fā)框架,本質(zhì)上相當(dāng)于 Servlet,Spring MVC 角色劃分清晰,分工明細(xì),本章來講解SpringMVC如何請求數(shù)據(jù)
    2022-07-07
  • Java基于代理模式解決紅酒經(jīng)銷問題詳解

    Java基于代理模式解決紅酒經(jīng)銷問題詳解

    這篇文章主要介紹了Java基于代理模式解決紅酒經(jīng)銷問題,詳細(xì)描述了代理模式的概念、原理并結(jié)合實(shí)例形式分析了java基于代理模式解決紅酒經(jīng)銷問題的相關(guān)步驟、實(shí)現(xiàn)方法與操作注意事項(xiàng),需要的朋友可以參考下
    2018-04-04
  • Java警告:原發(fā)性版11需要目標(biāo)發(fā)行版11的解決方法和步驟

    Java警告:原發(fā)性版11需要目標(biāo)發(fā)行版11的解決方法和步驟

    這篇文章主要介紹了Java警告:原發(fā)性版11需要目標(biāo)發(fā)行版11的解決方法和步驟,文中通過圖文介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用java具有一定的參考借鑒價值,需要的朋友可以參考下
    2025-04-04
  • Java中csv文件讀寫超詳細(xì)分析

    Java中csv文件讀寫超詳細(xì)分析

    CSV是一種通用的、相對簡單的文件格式,其文件以純文本形式存儲表格數(shù)據(jù),下面這篇文章主要給大家介紹了關(guān)于Java中csv文件讀寫分析的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-05-05

最新評論