Spring定時(shí)任務(wù)并行(異步)處理方式
Spring定時(shí)任務(wù)并行(異步)
最近項(xiàng)目中遇到一個(gè)問題 , 在SpringBoot中設(shè)置了定時(shí)任務(wù)之后 , 在某個(gè)點(diǎn)總是沒有執(zhí)行 . 經(jīng)過搜索研究發(fā)現(xiàn) , spring 定時(shí)器任務(wù)scheduled-tasks默認(rèn)配置是單線程串行執(zhí)行的 .
即在當(dāng)前時(shí)間點(diǎn)之內(nèi) . 如果同時(shí)有兩個(gè)定時(shí)任務(wù)需要執(zhí)行的時(shí)候 , 排在第二個(gè)的任務(wù)就必須等待第一個(gè)任務(wù)執(zhí)行完畢執(zhí)行才能正常運(yùn)行.
如果第一個(gè)任務(wù)耗時(shí)較久的話 , 就會(huì)造成第二個(gè)任務(wù)不能及時(shí)執(zhí)行 .
這樣就可能由于時(shí)效性造成其他問題 . 而在實(shí)際項(xiàng)目中 , 我們也往往需要這些定時(shí)任務(wù)是"各干各的" , 而不是排隊(duì)執(zhí)行.
以下為默認(rèn)串行的定時(shí)任務(wù)代碼
package com.xbz.timerTask.task; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * @title 測試spring定時(shí)任務(wù)執(zhí)行 * @createDate 2017年8月18日 * @version 1.0 */ @Component @Configuration @EnableScheduling public class MyTestTask { private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Scheduled(fixedDelay = 1000) public void executeUpdateYqTask() { System.out.println(Thread.currentThread().getName() + " >>> task one " + format.format(new Date())); } @Scheduled(fixedDelay = 1000) public void executeRepaymentTask() throws InterruptedException { System.out.println(Thread.currentThread().getName() + " >>> task two " + format.format(new Date())); Thread.sleep(5000); } }
啟動(dòng)項(xiàng)目之后 , 發(fā)現(xiàn)控制臺(tái)輸出如下 :
可以發(fā)現(xiàn) , 一直是pool-5-thread-1一個(gè)線程在執(zhí)行定時(shí)任務(wù) , 這顯然不符合我們的業(yè)務(wù)需求.
如何把定時(shí)任務(wù)改造成異步呢 , 在spring中網(wǎng)上文檔較多 , 不再敘述 . 但在SpringBoot找到的相關(guān)資料也是新建xml文件的方式配置 , 實(shí)際上這就違背了SpringBoot減少配置文件的初衷 .
在SpringBoot可以自定義以下線程池配置
package com.xbz.config; @Configuration @EnableScheduling public class ScheduleConfig implements SchedulingConfigurer, AsyncConfigurer{ /** 異步處理 */ public void configureTasks(ScheduledTaskRegistrar taskRegistrar){ TaskScheduler taskScheduler = taskScheduler(); taskRegistrar.setTaskScheduler(taskScheduler); } /** 定時(shí)任務(wù)多線程處理 */ @Bean(destroyMethod = "shutdown") public ThreadPoolTaskScheduler taskScheduler(){ ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(20); scheduler.setThreadNamePrefix("task-"); scheduler.setAwaitTerminationSeconds(60); scheduler.setWaitForTasksToCompleteOnShutdown(true); return scheduler; } /** 異步處理 */ public Executor getAsyncExecutor(){ Executor executor = taskScheduler(); return executor; } /** 異步處理 異常 */ public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){ return new SimpleAsyncUncaughtExceptionHandler(); } }
此時(shí)再啟動(dòng)定時(shí)任務(wù) , 就發(fā)現(xiàn)已經(jīng)是異步處理的了 .
如果項(xiàng)目中同時(shí)配置了異步任務(wù)的線程池和定時(shí)任務(wù)的異步線程處理
配置類如下 :
package com.xbz.config; import java.lang.reflect.Method; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.ScheduledTaskRegistrar; /** * @title 使用自定義的線程池執(zhí)行異步任務(wù) , 并設(shè)置定時(shí)任務(wù)的異步處理 * @version 1.0 */ @Configuration @EnableAsync @EnableScheduling public class ExecutorConfig implements SchedulingConfigurer, AsyncConfigurer { private static final Logger LOG = LogManager.getLogger(ExecutorConfig.class.getName()); @Autowired private TaskThreadPoolConfig config; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(config.getCorePoolSize()); executor.setMaxPoolSize(config.getMaxPoolSize()); executor.setQueueCapacity(config.getQueueCapacity()); executor.setKeepAliveSeconds(config.getKeepAliveSeconds()); executor.setThreadNamePrefix("taskExecutor-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } /** * @title 異步任務(wù)中異常處理 * @description * @author Xingbz * @createDate 2017年9月11日 * @return * @see org.springframework.scheduling.annotation.AsyncConfigurer#getAsyncUncaughtExceptionHandler() * @version 1.0 */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncUncaughtExceptionHandler() { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { LOG.error("==========================" + ex.getMessage() + "=======================", ex); LOG.error("exception method:" + method.getName()); } }; } @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { TaskScheduler taskScheduler = taskScheduler(); taskRegistrar.setTaskScheduler(taskScheduler); } /** * 并行任務(wù)使用策略:多線程處理 * * @return ThreadPoolTaskScheduler 線程池 */ @Bean(destroyMethod = "shutdown") public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(config.getCorePoolSize()); scheduler.setThreadNamePrefix("task-"); scheduler.setAwaitTerminationSeconds(60); scheduler.setWaitForTasksToCompleteOnShutdown(true); return scheduler; } }
需要注意:
- 這兩個(gè)配置類只能同時(shí)配置一個(gè)
- 如果配置了第二個(gè) , 則第一個(gè)就無需再用
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
IDEA2023版本創(chuàng)建Spring項(xiàng)目只能勾選17和21卻無法使用Java8的完美解決方案
想創(chuàng)建一個(gè)springboot的項(xiàng)目,本地安裝的是1.8,但是在使用Spring Initializr創(chuàng)建項(xiàng)目時(shí),發(fā)現(xiàn)版本只有17和21,這篇文章主要介紹了IDEA2023版本創(chuàng)建Sping項(xiàng)目只能勾選17和21,卻無法使用Java8的解決方法,需要的朋友可以參考下2023-12-12關(guān)于注解式的分布式Elasticsearch的封裝案例
這篇文章主要介紹了關(guān)于注解式的分布式Elasticsearch的封裝案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-01-012022年最新java?8?(?jdk1.8u321)安裝圖文教程
這篇文章主要介紹了2022年最新java?8?(?jdk1.8u321)安裝圖文教程,截止2022年1月,官方出的jdk1.8目前已更新到8u321的版本,本文通過圖文并茂的形式給大家介紹安裝過程,需要的朋友可以參考下2022-08-08Invalid?bound?statement?(not?found)出現(xiàn)原因以及解決辦法
這篇文章主要給大家介紹了關(guān)于Invalid?bound?statement?(not?found)出現(xiàn)原因以及解決辦法的相關(guān)資料,文中給出了詳細(xì)的解決方法,需要的朋友可以參考下2023-07-07關(guān)于Spring BeanPostProcessor的執(zhí)行順序
這篇文章主要介紹了Spring BeanPostProcessor的執(zhí)行順序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10Java多線程工具CompletableFuture的使用教程
CompletableFuture實(shí)現(xiàn)了CompletionStage接口和Future接口,前者是對(duì)后者的一個(gè)擴(kuò)展,增加了異步回調(diào)、流式處理、多個(gè)Future組合處理的能力。本文就來詳細(xì)講講CompletableFuture的使用方式,需要的可以參考一下2022-08-0830分鐘入門Java8之lambda表達(dá)式學(xué)習(xí)
本篇文章主要介紹了30分鐘入門Java8之lambda表達(dá)式學(xué)習(xí),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-04-04java并發(fā)學(xué)習(xí)之Executor源碼解析
這篇文章主要為大家介紹了java并發(fā)學(xué)習(xí)之Executor源碼示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07