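How Spring Boot Asynchronous Tasks Work: A Thorough Analysis

 Updated: 2022-11-22 10:38:22   Author: fenglllle
This article explains how Spring Boot asynchronous tasks work. I hope it is a useful reference; if anything is wrong or incomplete, corrections are welcome.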

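Preface

When we need to improve performance or decouple parts of a project's architecture, we often run tasks asynchronously on a thread pool, usually one created with ThreadPoolExecutor. So how does Spring handle asynchronous tasks?

1. Spring asynchronous tasks

You probably know some of this already: @EnableAsync switches on asynchronous task support, and @Async marks a method to be executed asynchronously. The demo below shows how Spring runs an asynchronous task.

POM dependencies. Strictly speaking, Spring core and context alone would be enough; Spring Boot is used here for the demo, and it also brings many other handy features.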

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.1.3.RELEASE</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

main & controller

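import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController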
@SpringBootApplication
public class AsyncMain {
    public static void main(String[] args) {
        SpringApplication.run(AsyncMain.class, args);
    }
 
    @Autowired
    private TaskService taskService;
 
    @RequestMapping(value = "/async-task", method = RequestMethod.GET)
    public String asyncMapping(){
        System.out.println(Thread.currentThread().getThreadGroup() + "http-------" + Thread.currentThread().getName());
        taskService.doTask();
        return "exec http ok--------------";
    }
}

Asynchronous task service

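import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;

@EnableAsync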
@Service
public class TaskService {
 
    @Async
    public String doTask(){
        System.out.println(Thread.currentThread().getThreadGroup() + "-------" + Thread.currentThread().getName());
        return "do task done";
    }
}

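Run the main method and visit localhost:8080/async-task, then look at the console output.

The task's thread is named task-1, while the thread serving the HTTP request is http-nio-xxx, which shows that the task ran asynchronously. But how does Spring execute the asynchronous task? We never created a thread pool, so did Spring create one for us?

2. Analysis of how Spring Boot executes asynchronous tasks

Running an asynchronous task requires a thread pool, so let us track down the one Spring creates. The startup log shows that

Spring creates a default ExecutorService thread pool named applicationTaskExecutor for us.

The source code shows that the Spring Boot starter has already configured a default executor: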

/**
 * {@link EnableAutoConfiguration Auto-configuration} for {@link TaskExecutor}.
 *
 * @author Stephane Nicoll
 * @author Camille Vienot
 * @since 2.1.0
 */
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {
 
	/**
	 * Bean name of the application {@link TaskExecutor}.
	 */
	public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
 
	private final TaskExecutionProperties properties;
 
	private final ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers;
 
	private final ObjectProvider<TaskDecorator> taskDecorator;
 
	public TaskExecutionAutoConfiguration(TaskExecutionProperties properties,
			ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
			ObjectProvider<TaskDecorator> taskDecorator) {
		this.properties = properties;
		this.taskExecutorCustomizers = taskExecutorCustomizers;
		this.taskDecorator = taskDecorator;
	}
 
	@Bean
	@ConditionalOnMissingBean
	public TaskExecutorBuilder taskExecutorBuilder() {
		TaskExecutionProperties.Pool pool = this.properties.getPool();
		TaskExecutorBuilder builder = new TaskExecutorBuilder();
		builder = builder.queueCapacity(pool.getQueueCapacity());
		builder = builder.corePoolSize(pool.getCoreSize());
		builder = builder.maxPoolSize(pool.getMaxSize());
		builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
		builder = builder.keepAlive(pool.getKeepAlive());
		builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
		builder = builder.customizers(this.taskExecutorCustomizers);
		builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
		return builder;
	}
 
	@Lazy
	@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
			AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
	@ConditionalOnMissingBean(Executor.class)
	public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
		return builder.build();
	}
}

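Tracing this to its source: Spring Boot's autoconfigure module already defines the default implementation.

Spring defines two implementations for us, as the screenshot in the original post shows. Following Spring Boot's configuration conventions, we can set the async task parameters through configuration properties: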

@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
 
	private final Pool pool = new Pool();
 
	/**
	 * Prefix to use for the names of newly created threads.
	 */
	private String threadNamePrefix = "task-";
 
	public Pool getPool() {
		return this.pool;
	}
 
	public String getThreadNamePrefix() {
		return this.threadNamePrefix;
	}
 
	public void setThreadNamePrefix(String threadNamePrefix) {
		this.threadNamePrefix = threadNamePrefix;
	}
 
	public static class Pool {
 
		/**
		 * Queue capacity. An unbounded capacity does not increase the pool and therefore
		 * ignores the "max-size" property.
		 */
		private int queueCapacity = Integer.MAX_VALUE;
 
		/**
		 * Core number of threads.
		 */
		private int coreSize = 8;
 
		/**
		 * Maximum allowed number of threads. If tasks are filling up the queue, the pool
		 * can expand up to that size to accommodate the load. Ignored if the queue is
		 * unbounded.
		 */
		private int maxSize = Integer.MAX_VALUE;
 
		/**
		 * Whether core threads are allowed to time out. This enables dynamic growing and
		 * shrinking of the pool.
		 */
		private boolean allowCoreThreadTimeout = true;
 
		/**
		 * Time limit for which threads may remain idle before being terminated.
		 */
		private Duration keepAlive = Duration.ofSeconds(60);

Getters and setters are omitted. The Spring Boot properties start with spring.task.execution, and the available settings correspond to the fields in the source above.
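As a rough illustration of how those fields map to property names, an application.properties fragment might look like the one below. The values are arbitrary examples chosen for this sketch, not recommendations from the article; the names follow the fields of TaskExecutionProperties under Spring Boot's usual kebab-case binding.

```properties
# Illustrative values only; each key corresponds to a field of TaskExecutionProperties
spring.task.execution.thread-name-prefix=async-task-
spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=20
spring.task.execution.pool.queue-capacity=100
spring.task.execution.pool.keep-alive=5s
spring.task.execution.pool.allow-core-thread-timeout=true
```

With a bounded queue-capacity like this, max-size is no longer ignored, as the Javadoc on the Pool class notes.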

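Feel free to experiment. Because of the way this Spring bean is defined (note @ConditionalOnMissingBean(Executor.class)), we can also supply our own Executor bean to customize it: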

	@Lazy
	@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
			AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
	@ConditionalOnMissingBean(Executor.class)
	public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
		return builder.build();
	}

For example:

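import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class TaskAsyncConfig {
 
    @Bean
    public Executor initExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Customize the thread name; the thread group can be customized as well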
        executor.setThreadFactory(new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
 
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(Thread.currentThread().getThreadGroup(), r,
                        "async-task-" + threadNumber.getAndIncrement(),
                        0);
                return t;
            }
        });
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setKeepAliveSeconds(5);
        executor.setQueueCapacity(100);
//        executor.setRejectedExecutionHandler(null);
        return executor;
    }
}

Restart and visit localhost:8080/async-task again; the output confirms that our Executor has replaced the default one.
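The pool settings in such an executor (core size, max size, queue capacity) interact in a way that is easy to misread: the pool only grows beyond its core size once the queue is full. The following is a minimal plain-JDK sketch of that behaviour, using ThreadPoolExecutor directly rather than Spring; the class name QueueCapacitySketch and the small sizes (core 2, max 10) are assumptions made only for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch (no Spring): queue capacity decides whether the pool grows past its core size.
class QueueCapacitySketch {

    // Submits `tasks` blocking jobs to a pool with core size 2 and max size 10,
    // then reports how many worker threads the pool has created.
    static int poolSizeAfter(BlockingQueue<Runnable> queue, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS, queue);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy until the pool size is read
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked jobs finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra jobs wait in the queue, so the pool stays at the core size.
        System.out.println(poolSizeAfter(new LinkedBlockingQueue<>(), 12));  // prints 2
        // Queue of 5: 2 core threads run jobs, 5 jobs queue up, the other 5 each start a new thread.
        System.out.println(poolSizeAfter(new LinkedBlockingQueue<>(5), 12)); // prints 7
    }
}
```

This is also why Spring Boot's default executor, with its unbounded queue, never uses more than its 8 core threads.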

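3. Analysis of how Spring executes an asynchronous task

Tracing with method breakpoints

Asynchronous execution is implemented with Spring AOP, using a CGLIB dynamic proxy.

Behind the proxy, AsyncExecutionInterceptor handles the asynchronous logic and calls the executor's submit method.

Likewise, the default taskExecutor is looked up from the BeanFactory.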
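The interception idea can be sketched without Spring. The example below is only an analogue: it uses a JDK dynamic proxy (java.lang.reflect.Proxy) instead of CGLIB, and it is not the code of AsyncExecutionInterceptor. The names AsyncProxySketch, Task, TaskImpl and asyncProxy are hypothetical, and the thread name "task-1" merely imitates Spring Boot's default prefix.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogue, not Spring's code: a proxy intercepts the call and submits the
// real invocation to an executor instead of running it on the caller's thread.
class AsyncProxySketch {

    // Hypothetical service interface, introduced only for this illustration.
    interface Task {
        Future<String> doTask();
    }

    static class TaskImpl implements Task {
        @Override
        public Future<String> doTask() {
            // Report which thread actually ran the method body.
            return CompletableFuture.completedFuture(Thread.currentThread().getName());
        }
    }

    static Task asyncProxy(Task target, ExecutorService executor) {
        return (Task) Proxy.newProxyInstance(
                Task.class.getClassLoader(),
                new Class<?>[] {Task.class},
                (proxy, method, args) -> {
                    Callable<Object> call = () -> {
                        // Runs on a pool thread; unwrap the pass-through Future from the target.
                        Object result = method.invoke(target, args);
                        return result instanceof Future ? ((Future<?>) result).get() : null;
                    };
                    // The caller immediately receives the executor's Future.
                    return executor.submit(call);
                });
    }

    // Calls doTask() through the proxy and returns the name of the thread that ran it.
    static String runOnce() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "task-1"));
        try {
            return asyncProxy(new TaskImpl(), executor).doTask().get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + " -> " + runOnce());
    }
}
```

The caller's thread only builds the Callable and hands it over; the method body runs on the pool thread, which matches the different thread names seen in the demo output.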

By default, SimpleAsyncUncaughtExceptionHandler handles exceptions thrown by asynchronous methods. Let us try it:

@EnableAsync
@Service
public class TaskService {
 
    @Async
    public String doTask(){
        System.out.println(Thread.currentThread().getThreadGroup() + "-------" + Thread.currentThread().getName());
        throw new RuntimeException(" I`m a demo test exception-----------------");
    }
}

By default it logs logger.error("Unexpected exception occurred invoking async method: " + method, ex);

public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
 
	private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);
 
 
	@Override
	public void handleUncaughtException(Throwable ex, Method method, Object... params) {
		if (logger.isErrorEnabled()) {
			logger.error("Unexpected exception occurred invoking async method: " + method, ex);
		}
	}
 
}

Run the test.
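Why a separate handler is needed at all can be shown with a small plain-JDK sketch: once a fire-and-forget task has been handed to another thread, an exception it throws has no caller to propagate to, so something on the worker side must catch it. The Handler interface and the class UncaughtHandlerSketch below are hypothetical stand-ins, not Spring's AsyncUncaughtExceptionHandler or its internals.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogue, not Spring's code: a failure in a fire-and-forget task cannot
// reach the caller, so the wrapper passes it to a handler instead.
class UncaughtHandlerSketch {

    // Hypothetical stand-in for an uncaught-exception callback.
    interface Handler {
        void handle(Throwable ex, String methodName);
    }

    // Runs the task on a pool thread and returns whatever the handler recorded.
    static List<String> runAndCollect(Runnable task) {
        List<String> log = new CopyOnWriteArrayList<>();
        Handler handler = (ex, methodName) ->
                log.add("Unexpected exception occurred invoking async method: "
                        + methodName + " (" + ex.getMessage() + ")");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                task.run();
            } catch (Throwable ex) {
                handler.handle(ex, "doTask"); // the submitting thread never sees this exception
            }
        });
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS); // wait so the log is complete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runAndCollect(() -> {
            throw new RuntimeException("demo test exception");
        }));
    }
}
```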

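4. Custom Executor and custom async exception handling in Spring

This requires implementing the AsyncConfigurer interface. As its Javadoc shows, Spring expects it to be used together with the @EnableAsync and @Configuration annotations.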

/**
 * Interface to be implemented by @{@link org.springframework.context.annotation.Configuration
 * Configuration} classes annotated with @{@link EnableAsync} that wish to customize the
 * {@link Executor} instance used when processing async method invocations or the
 * {@link AsyncUncaughtExceptionHandler} instance used to process exception thrown from
 * async method with {@code void} return type.
 *
 * <p>Consider using {@link AsyncConfigurerSupport} providing default implementations for
 * both methods if only one element needs to be customized. Furthermore, backward compatibility
 * of this interface will be insured in case new customization options are introduced
 * in the future.
 *
 * <p>See @{@link EnableAsync} for usage examples.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see AbstractAsyncConfiguration
 * @see EnableAsync
 * @see AsyncConfigurerSupport
 */
public interface AsyncConfigurer {
 
	/**
	 * The {@link Executor} instance to be used when processing async
	 * method invocations.
	 */
	@Nullable
	default Executor getAsyncExecutor() {
		return null;
	}
 
	/**
	 * The {@link AsyncUncaughtExceptionHandler} instance to be used
	 * when an exception is thrown during an asynchronous method execution
	 * with {@code void} return type.
	 */
	@Nullable
	default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return null;
	}
}

Demo, modified as follows:

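import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class TaskAsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Customize the thread name; the thread group can be customized as well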
        executor.setThreadFactory(new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
 
            @Override
            public Thread newThread(Runnable r) {
                // Use a different thread name this time
                Thread t = new Thread(Thread.currentThread().getThreadGroup(), r,
                        "async-task-all" + threadNumber.getAndIncrement(),
                        0);
                return t;
            }
        });
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setKeepAliveSeconds(5);
        executor.setQueueCapacity(100);
//        executor.setRejectedExecutionHandler(null);
        executor.initialize();
        return executor;
    }
 
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                System.out.println("do exception by myself");
            }
        };
    }
 
}

Remember that in this case Spring no longer manages the Executor for us, so we have to initialize it ourselves:

executor.initialize();

Its source shows that it simply creates a new ThreadPoolExecutor:

@Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
 
		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
 
		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);
 
		}
 
		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}
 
		this.threadPoolExecutor = executor;
		return executor;
	}

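Run it and check the result.

Summary

Spring Boot wraps the plain ThreadPoolExecutor into asynchronous tasks, which makes development much more convenient.

In the examples above, however, we never handled the result of the asynchronous execution. Spring does define how results are handled: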

/**
 * AOP Alliance {@code MethodInterceptor} that processes method invocations
 * asynchronously, using a given {@link org.springframework.core.task.AsyncTaskExecutor}.
 * Typically used with the {@link org.springframework.scheduling.annotation.Async} annotation.
 *
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@code java.util.concurrent.Future}. In the latter case, the Future handle
 * returned from the proxy will be an actual asynchronous Future that can be used
 * to track the result of the asynchronous method execution. However, since the
 * target method needs to implement the same signature, it will have to return
 * a temporary Future handle that just passes the return value through
 * (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult}
 * or EJB 3.1's {@code javax.ejb.AsyncResult}).
 *
 * <p>When the return type is {@code java.util.concurrent.Future}, any exception thrown
 * during the execution can be accessed and managed by the caller. With {@code void}
 * return type however, such exceptions cannot be transmitted back. In that case an
 * {@link AsyncUncaughtExceptionHandler} can be registered to process such exceptions.
 *
 * <p>As of Spring 3.1.2 the {@code AnnotationAsyncExecutionInterceptor} subclass is
 * preferred for use due to its support for executor qualification in conjunction with
 * Spring's {@code @Async} annotation.
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.0
 * @see org.springframework.scheduling.annotation.Async
 * @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
 * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
 */
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@code java.util.concurrent.Future}. In the latter case, the Future handle
 * returned from the proxy will be an actual asynchronous Future that can be used
 * to track the result of the asynchronous method execution. However, since the
 * target method needs to implement the same signature, it will have to return
 * a temporary Future handle that just passes the return value through
 * (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult}
 * or EJB 3.1's {@code javax.ejb.AsyncResult}).

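In other words, the return type should be void or Future. To hand a result back, declare the async method to return Future and wrap the value in AsyncResult. A method declared with another return type, such as the String-returning doTask in the demos above, cannot deliver its value to the caller this way; as far as I can tell from the Spring version used here, the proxy simply returns null for such methods.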
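The contract described in that Javadoc (a Future carries either the result or the exception back to the caller) can be illustrated with the JDK alone. The sketch below submits a Callable to a plain ExecutorService; it does not use @Async or AsyncResult, and the class name FutureResultSketch and the "ok:" / "failed:" prefixes are assumptions made for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK sketch: with a Future return type the caller can read the result,
// and an exception thrown by the task surfaces when get() is called.
class FutureResultSketch {

    static String describe(Callable<String> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(task);
            return "ok: " + future.get(); // blocks until the task has finished
        } catch (ExecutionException e) {
            // The task's own exception is wrapped; getCause() returns the original.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> "do task done")); // prints "ok: do task done"
        System.out.println(describe(() -> {
            throw new RuntimeException("demo test exception");
        })); // prints "failed: demo test exception"
    }
}
```

This is the case in which, per the Javadoc, the caller manages the exception itself and no uncaught-exception handler is involved.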

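Spring also defines another kind of task, the scheduled task, which works on the same principle.

The above is based on my personal experience. I hope it serves as a useful reference, and I hope you will keep supporting 腳本之家.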
