Spring定時任務(wù)并行(異步)處理方式
Spring定時任務(wù)并行(異步)
最近項目中遇到一個問題 , 在SpringBoot中設(shè)置了定時任務(wù)之后 , 在某個點總是沒有執(zhí)行 . 經(jīng)過搜索研究發(fā)現(xiàn) , spring 定時器任務(wù)scheduled-tasks默認配置是單線程串行執(zhí)行的 .
即在當前時間點之內(nèi) . 如果同時有兩個定時任務(wù)需要執(zhí)行的時候 , 排在第二個的任務(wù)就必須等待第一個任務(wù)執(zhí)行完畢執(zhí)行才能正常運行.
如果第一個任務(wù)耗時較久的話 , 就會造成第二個任務(wù)不能及時執(zhí)行 .
這樣就可能由于時效性造成其他問題 . 而在實際項目中 , 我們也往往需要這些定時任務(wù)是"各干各的" , 而不是排隊執(zhí)行.
以下為默認串行的定時任務(wù)代碼
package com.xbz.timerTask.task;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @title 測試spring定時任務(wù)執(zhí)行
* @createDate 2017年8月18日
* @version 1.0
*/
@Component
@Configuration
@EnableScheduling
public class MyTestTask {
private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Scheduled(fixedDelay = 1000)
public void executeUpdateYqTask() {
System.out.println(Thread.currentThread().getName() + " >>> task one " + format.format(new Date()));
}
@Scheduled(fixedDelay = 1000)
public void executeRepaymentTask() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " >>> task two " + format.format(new Date()));
Thread.sleep(5000);
}
}啟動項目之后 , 發(fā)現(xiàn)控制臺輸出如下 :

可以發(fā)現(xiàn) , 一直是pool-5-thread-1一個線程在執(zhí)行定時任務(wù) , 這顯然不符合我們的業(yè)務(wù)需求.
如何把定時任務(wù)改造成異步呢 , 在spring中網(wǎng)上文檔較多 , 不再敘述 . 但在SpringBoot找到的相關(guān)資料也是新建xml文件的方式配置 , 實際上這就違背了SpringBoot減少配置文件的初衷 .
在SpringBoot可以自定義以下線程池配置
package com.xbz.config;
@Configuration
@EnableScheduling
public class ScheduleConfig implements SchedulingConfigurer, AsyncConfigurer{
/** 異步處理 */
public void configureTasks(ScheduledTaskRegistrar taskRegistrar){
TaskScheduler taskScheduler = taskScheduler();
taskRegistrar.setTaskScheduler(taskScheduler);
}
/** 定時任務(wù)多線程處理 */
@Bean(destroyMethod = "shutdown")
public ThreadPoolTaskScheduler taskScheduler(){
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(20);
scheduler.setThreadNamePrefix("task-");
scheduler.setAwaitTerminationSeconds(60);
scheduler.setWaitForTasksToCompleteOnShutdown(true);
return scheduler;
}
/** 異步處理 */
public Executor getAsyncExecutor(){
Executor executor = taskScheduler();
return executor;
}
/** 異步處理 異常 */
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
return new SimpleAsyncUncaughtExceptionHandler();
}
}此時再啟動定時任務(wù) , 就發(fā)現(xiàn)已經(jīng)是異步處理的了 .

如果項目中同時配置了異步任務(wù)的線程池和定時任務(wù)的異步線程處理
配置類如下 :
package com.xbz.config;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
/**
* @title 使用自定義的線程池執(zhí)行異步任務(wù) , 并設(shè)置定時任務(wù)的異步處理
* @version 1.0
*/
@Configuration
@EnableAsync
@EnableScheduling
public class ExecutorConfig implements SchedulingConfigurer, AsyncConfigurer {
private static final Logger LOG = LogManager.getLogger(ExecutorConfig.class.getName());
@Autowired
private TaskThreadPoolConfig config;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(config.getCorePoolSize());
executor.setMaxPoolSize(config.getMaxPoolSize());
executor.setQueueCapacity(config.getQueueCapacity());
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
executor.setThreadNamePrefix("taskExecutor-");
// rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* @title 異步任務(wù)中異常處理
* @description
* @author Xingbz
* @createDate 2017年9月11日
* @return
* @see org.springframework.scheduling.annotation.AsyncConfigurer#getAsyncUncaughtExceptionHandler()
* @version 1.0
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
LOG.error("==========================" + ex.getMessage() + "=======================", ex);
LOG.error("exception method:" + method.getName());
}
};
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
TaskScheduler taskScheduler = taskScheduler();
taskRegistrar.setTaskScheduler(taskScheduler);
}
/**
* 并行任務(wù)使用策略:多線程處理
*
* @return ThreadPoolTaskScheduler 線程池
*/
@Bean(destroyMethod = "shutdown")
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(config.getCorePoolSize());
scheduler.setThreadNamePrefix("task-");
scheduler.setAwaitTerminationSeconds(60);
scheduler.setWaitForTasksToCompleteOnShutdown(true);
return scheduler;
}
}需要注意:
- 這兩個配置類只能同時配置一個
- 如果配置了第二個 , 則第一個就無需再用
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
IDEA2023版本創(chuàng)建Spring項目只能勾選17和21卻無法使用Java8的完美解決方案
想創(chuàng)建一個springboot的項目,本地安裝的是1.8,但是在使用Spring Initializr創(chuàng)建項目時,發(fā)現(xiàn)版本只有17和21,這篇文章主要介紹了IDEA2023版本創(chuàng)建Sping項目只能勾選17和21,卻無法使用Java8的解決方法,需要的朋友可以參考下2023-12-12
關(guān)于注解式的分布式Elasticsearch的封裝案例
這篇文章主要介紹了關(guān)于注解式的分布式Elasticsearch的封裝案例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01
2022年最新java?8?(?jdk1.8u321)安裝圖文教程
這篇文章主要介紹了2022年最新java?8?(?jdk1.8u321)安裝圖文教程,截止2022年1月,官方出的jdk1.8目前已更新到8u321的版本,本文通過圖文并茂的形式給大家介紹安裝過程,需要的朋友可以參考下2022-08-08
Invalid?bound?statement?(not?found)出現(xiàn)原因以及解決辦法
這篇文章主要給大家介紹了關(guān)于Invalid?bound?statement?(not?found)出現(xiàn)原因以及解決辦法的相關(guān)資料,文中給出了詳細的解決方法,需要的朋友可以參考下2023-07-07
關(guān)于Spring BeanPostProcessor的執(zhí)行順序
這篇文章主要介紹了Spring BeanPostProcessor的執(zhí)行順序,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
Java多線程工具CompletableFuture的使用教程
CompletableFuture實現(xiàn)了CompletionStage接口和Future接口,前者是對后者的一個擴展,增加了異步回調(diào)、流式處理、多個Future組合處理的能力。本文就來詳細講講CompletableFuture的使用方式,需要的可以參考一下2022-08-08

