Spring動(dòng)態(tài)管理定時(shí)任務(wù)之ThreadPoolTaskScheduler解讀
Spring動(dòng)態(tài)管理定時(shí)任務(wù)ThreadPoolTaskScheduler
Spring任務(wù)調(diào)度核心類ThreadPoolTaskScheduler,API文檔解釋如下:
Implementation of Spring's TaskScheduler interface, wrapping a native java.util.concurrent.ScheduledThreadPoolExecutor.
Spring的TaskScheduler接口的實(shí)現(xiàn),包裝了一個(gè)本地java.util.concurrent.ScheduledThreadPoolExecutor。
實(shí)現(xiàn)思路
注入調(diào)度類bean,初始化一個(gè)ConcurrentHashMap容器,用來(lái)保存多個(gè)定時(shí)任務(wù)的狀態(tài),每一個(gè)任務(wù)的運(yùn)行狀態(tài)被封裝在ScheduledFuture中,借此類可取消對(duì)應(yīng)的定時(shí)任務(wù)。
import java.time.LocalDateTime; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import javax.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.cjia.spidercommon.model.SpiderJob; import com.cjia.spiderjob.mapper.SpiderJobMapper; import lombok.Data; import lombok.extern.slf4j.Slf4j; /** * 用來(lái)管理(啟動(dòng)、停止、新增、刪除、更新編輯、查看運(yùn)行狀態(tài))定時(shí)任務(wù)(增量任務(wù)) */ @Slf4j @RestController @RequestMapping("spiderJob/cron") public class CronJobController extends SpiderJobController { @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; private Map<Integer, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); @Resource private SpiderJobMapper spiderJobMapper; @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { return new ThreadPoolTaskScheduler(); } /** * 啟動(dòng)單個(gè)定時(shí)任務(wù) */ @RequestMapping("/start/{jobId}") public String start(@PathVariable Integer jobId) { SpiderJob job = spiderJobMapper.selectById(jobId); if (job == null) { log.warn("任務(wù)[{}]已不存在,無(wú)法啟動(dòng)!", jobId); return "任務(wù)[" + jobId + "]已不存在,無(wú)法啟動(dòng)!"; } int enable = job.getEnable(); if (enable == 0) { log.warn("任務(wù)[{}]已被禁用,無(wú)法啟動(dòng)!", jobId); return "任務(wù)[" + jobId + "]已被禁用,無(wú)法啟動(dòng)!"; } // 檢測(cè)該任務(wù)是否已在運(yùn)行調(diào)度中 if (futureMap.get(jobId) != null) { log.warn("任務(wù)[{}]已在調(diào)度運(yùn)行,無(wú)法重復(fù)啟動(dòng)!", jobId); return "任務(wù)[" + jobId + "]已在調(diào)度運(yùn)行,無(wú)法重復(fù)啟動(dòng)!"; } String cron = job.getCron(); // TODO check cron ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(new MyRunnable(job), new CronTrigger(cron)); log.info("任務(wù)[{}]已被啟動(dòng)!", jobId); futureMap.put(jobId, future); return "任務(wù)[" + jobId + "]已被啟動(dòng)!"; } /** * 批量啟動(dòng)定時(shí)任務(wù) */ @RequestMapping("/startBatch/{jobIds}") public String startBatch(@PathVariable String jobIds) { // TODO jobIds valid String[] jobIdsArr = jobIds.split(","); StringBuffer sb = new StringBuffer(); for (String jobId : jobIdsArr) { String result = start(Integer.valueOf(jobId)); sb.append(result).append("<br>"); } return sb.toString(); } /** * 停止單個(gè)定時(shí)任務(wù) */ @RequestMapping("/stop/{jobId}") public String stop(@PathVariable Integer jobId) { // 檢測(cè)該任務(wù)是否已在運(yùn)行調(diào)度中 ScheduledFuture<?> future = futureMap.get(jobId); if (future == null) { log.warn("任務(wù)[{}]已不在調(diào)度中,無(wú)法停止!", jobId); return "任務(wù)[" + jobId + "]已不在調(diào)度中,無(wú)法停止!"; } else { future.cancel(true); futureMap.remove(jobId); log.info("任務(wù)[{}]已被停止!", jobId); return "任務(wù)[" + jobId + "]已被停止!"; } } /** * 批量停止定時(shí)任務(wù) */ @RequestMapping("/stopBatch/{jobIds}") public String stopBatch(@PathVariable String jobIds) { // TODO jobIds valid String[] jobIdsArr = jobIds.split(","); StringBuffer sb = new StringBuffer(); for (String jobId : jobIdsArr) { String result = stop(Integer.valueOf(jobId)); sb.append(result).append("<br>"); } return sb.toString(); } /** * 查看當(dāng)前時(shí)刻調(diào)度中的定時(shí)任務(wù) */ @RequestMapping("/status") public String getAllStatus() { Set<Integer> runningKeys = futureMap.keySet(); return "當(dāng)前正在調(diào)度的任務(wù)列表:" + runningKeys.toString(); } @Data private class MyRunnable implements Runnable { private SpiderJob job; public MyRunnable(SpiderJob job) { this.job = job; } @Override public void run() { log.info("運(yùn)行定時(shí)任務(wù)[{}: {}] at {}!", job.getId(), job.getBizName(), LocalDateTime.now()); executeIncrementJob(job.getBizName()); } } }
ThreadPoolTaskScheduler 定時(shí)任務(wù)實(shí)現(xiàn)
org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler定時(shí)任務(wù)調(diào)度線程池
CREATE TABLE `sys_job` ( `id` bigint(20) NOT NULL COMMENT '任務(wù)key', `job_name` varchar(64) NOT NULL COMMENT '任務(wù)名稱', `bean_class` varchar(128) NOT NULL COMMENT '類路徑', `cron_expression` varchar(64) NOT NULL COMMENT 'cron表達(dá)式', `status` tinyint(1) NOT NULL COMMENT '狀態(tài)值 @JobStatusEnum 詳見(jiàn)具體枚舉類', `is_deleted` tinyint(1) DEFAULT '0' COMMENT '刪除標(biāo)識(shí) 1是 0否', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@Configuration @Slf4j public class SchedulingConfigure { @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { log.info("開(kāi)始創(chuàng)建定時(shí)任務(wù)調(diào)度線程池"); ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); threadPoolTaskScheduler.setPoolSize(20); threadPoolTaskScheduler.setThreadNamePrefix("schedule-task-"); threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskScheduler.setAwaitTerminationSeconds(60); log.info("創(chuàng)建定時(shí)任務(wù)調(diào)度線程池完成!"); return threadPoolTaskScheduler; } }
public enum JobStatusEnum { /** * 未加入調(diào)度器 */ NOT_SCHEDULE(0, "未加入調(diào)度器"), /** * 加入調(diào)度器,但未運(yùn)行 */ SCHEDULED_BUT_NOT_RUNNING(1, "加入調(diào)度器,但未運(yùn)行"), /** * 從調(diào)度器中已刪除 */ DELETED(2, "從調(diào)度器中已刪除"), ; private Integer status; private String detail; JobStatusEnum(Integer status, String detail) { this.status = status; this.detail = detail; } public Integer getStatus() { return status; } public void setStatus(Integer status) { this.status = status; } public String getDetail() { return detail; } public void setDetail(String detail) { this.detail = detail; } }
@Component @Slf4j public class ScheduledJobService { private final ReentrantLock lock = new ReentrantLock(); @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Autowired private SysJobService jobService; @Autowired private SpringBeanUtils springBeanUtils; /** * 已經(jīng)加入調(diào)度器的任務(wù)map */ private final ConcurrentHashMap<Long, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>(); /** * 初始化啟動(dòng)任務(wù) * * @param sysJobs 數(shù)據(jù)庫(kù)任務(wù)集合 */ public void initAllJob(List<SysJob> sysJobs) { if (CollectionUtils.isEmpty(sysJobs)) { return; } for (SysJob sysJob : sysJobs) { if (JobStatusEnum.NOT_SCHEDULE.getStatus().equals(sysJob.getStatus()) || JobStatusEnum.DELETED.getStatus().equals(sysJob.getStatus()) || this.isScheduled(sysJob.getId())) { // 任務(wù)初始化狀態(tài)或已刪除或已加載到調(diào)度器中 continue; } // 將任務(wù)加入調(diào)度器 this.doScheduleJob(sysJob); } } /** * 啟動(dòng)任務(wù) * * @param jobId job主鍵id */ public void start(Long jobId) { log.info("啟動(dòng)任務(wù):-> jobId_{}", jobId); // 加入調(diào)度器 schedule(jobId); log.info("啟動(dòng)任務(wù)結(jié)束:-> jobId_{}", jobId); // 更新任務(wù)狀態(tài) jobService.updateJobStatus(jobId, JobStatusEnum.SCHEDULED_BUT_NOT_RUNNING.getStatus()); } /** * 停止任務(wù) * * @param jobId job主鍵id */ public void stop(Long jobId) { log.info("停止任務(wù):-> jobId_{}", jobId); // 取消任務(wù) cancel(jobId); log.info("停止任務(wù)結(jié)束:-> jobId_{}", jobId); // 更新表中任務(wù)狀態(tài)為已停止 jobService.updateJobStatus(jobId, JobStatusEnum.NOT_SCHEDULE.getStatus()); } /** * 移除任務(wù) * * @param jobId job主鍵id */ public void remove(Long jobId) { log.info("移除任務(wù):-> jobId_{}", jobId); // 取消任務(wù) cancel(jobId); log.info("移除任務(wù)結(jié)束:-> jobId_{}", jobId); // 更新表中任務(wù)狀態(tài)為已刪除 jobService.updateJobStatus(jobId, JobStatusEnum.DELETED.getStatus()); } /** * 取消 * * @param jobId 工作id */ private void cancel(Long jobId) { // 任務(wù)是否存在 if (scheduledFutureMap.containsKey(jobId)) { ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(jobId); if (!scheduledFuture.isCancelled()) { // 取消調(diào)度 scheduledFuture.cancel(true); } } } private void schedule(Long jobId) { // 添加鎖,只允許單個(gè)線程訪問(wèn),防止任務(wù)啟動(dòng)多次 lock.lock(); try { if (isScheduled(jobId)) { log.error("任務(wù)jobId_{}已經(jīng)加入調(diào)度器,無(wú)需重復(fù)操作", jobId); return; } // 通過(guò)jobKey查詢jobBean對(duì)象 SysJob sysJob = jobService.getById(jobId); // 啟動(dòng)定時(shí)任務(wù) doScheduleJob(sysJob); } finally { // 釋放鎖資源 lock.unlock(); } } /** * 執(zhí)行啟動(dòng)任務(wù) * * @param sysJob 任務(wù)實(shí)體類對(duì)象 */ private void doScheduleJob(SysJob sysJob) { Long jobId = sysJob.getId(); String beanClass = sysJob.getBeanClass(); String jobName = sysJob.getJobName(); String cron = sysJob.getCronExpression(); // 從Spring中獲取目標(biāo)的job業(yè)務(wù)實(shí)現(xiàn)類 ScheduledJob scheduledJob = parseFrom(beanClass); if (scheduledJob == null) { return; } scheduledJob.setJobId(jobId); scheduledJob.setJobName(jobName); ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledJob, triggerContext -> { CronTrigger cronTrigger = new CronTrigger(cron); return cronTrigger.nextExecutionTime(triggerContext); }); log.info("任務(wù)加入調(diào)度器 -> jobId:{},jobName:{}", jobId, jobName); // 將啟動(dòng)的任務(wù)放入map assert scheduledFuture != null; scheduledFutureMap.put(jobId, scheduledFuture); } /** * 任務(wù)是否已經(jīng)進(jìn)入調(diào)度器 * * @param jobId 任務(wù)主鍵key * @return {@link Boolean} */ private Boolean isScheduled(Long jobId) { if (scheduledFutureMap.containsKey(jobId)) { return !scheduledFutureMap.get(jobId).isCancelled(); } return false; } private ScheduledJob parseFrom(String beanClass) { try { Class<?> clazz = Class.forName(beanClass); return (ScheduledJob) springBeanUtils.getBean(clazz); } catch (ClassNotFoundException e) { e.printStackTrace(); } return null; } }
@Component public class SpringBeanUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringBeanUtils.applicationContext = applicationContext; } /** * 獲取applicationContext */ public static ApplicationContext getApplicationContext() { return applicationContext; } /** * 通過(guò)name獲取 Bean. */ public Object getBean(String name) { return getApplicationContext().getBean(name); } /** * 通過(guò)class獲取Bean. */ public <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz); } /** * 通過(guò)name,以及Clazz返回指定的Bean */ public <T> T getBean(String name, Class<T> clazz) { return getApplicationContext().getBean(name, clazz); } }
@Data public abstract class ScheduledJob implements Runnable { /** * 任務(wù)主鍵id */ private Long jobId; /** * 任務(wù)名 */ private String jobName; }
@Component public class SchedulerTestDemo extends ScheduledJob { @Override public void run() { System.out.println("我是定時(shí)任務(wù)要執(zhí)行的類.."); System.out.println(SchedulerTestDemo.class.getName() + ":" + LocalDateTime.now()); } }
/** * 項(xiàng)目啟動(dòng)時(shí),將數(shù)據(jù)庫(kù)中job定時(shí)任務(wù)加載 */ @Component public class GrapeApplicationListener { private final ScheduledJobService scheduledJobService; private final ISysJobService sysJobService; public GrapeApplicationListener(ISysJobService sysJobService, ScheduledJobService scheduledJobService) { this.sysJobService = sysJobService; this.scheduledJobService = scheduledJobService; } @PostConstruct public void initStartJob() { // 初始化job scheduledJobService.initAllJob(sysJobService.list()); } }
@SpringBootApplication(scanBasePackages = {"com.example.grape"}) @MapperScan("com.example.grape.dao.mapper") @EnableScheduling public class GrapeApplication { public static void main(String[] args) { SpringApplication.run(GrapeApplication.class, args); } }
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringTask實(shí)現(xiàn)定時(shí)任務(wù)方法講解
- SpringBoot?ScheduledTaskRegistrar解決動(dòng)態(tài)定時(shí)任務(wù)思路詳解
- SpringBoot定時(shí)任務(wù)動(dòng)態(tài)擴(kuò)展ScheduledTaskRegistrar詳解
- 使用spring-task定時(shí)任務(wù)動(dòng)態(tài)配置修改執(zhí)行時(shí)間
- Spring Task定時(shí)任務(wù)每天零點(diǎn)執(zhí)行一次的操作
- SpringBoot2 task scheduler 定時(shí)任務(wù)調(diào)度器四種方式
- spring boot task實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建定時(shí)任務(wù)的方法
- java 中Spring task定時(shí)任務(wù)的深入理解
- Spring Task定時(shí)任務(wù)的配置和使用詳解
- SpringTask-Timer實(shí)現(xiàn)定時(shí)任務(wù)的詳細(xì)代碼
相關(guān)文章
JAVA JVM運(yùn)行時(shí)數(shù)據(jù)區(qū)詳解
這篇文章主要介紹了JVM運(yùn)行時(shí)數(shù)據(jù)區(qū)劃分原理詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-09-09Spring?加載?Application?Context五種方式小結(jié)
這篇文章主要介紹了Spring?加載?Application?Context五種方式小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。2022-01-01java中ImageReader和BufferedImage獲取圖片尺寸實(shí)例
這篇文章主要介紹了java中ImageReader和BufferedImage獲取圖片尺寸實(shí)例,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01Java多線程死鎖問(wèn)題詳解(wait和notify)
線程之間形成相互等待資源的環(huán)時(shí),就會(huì)形成順序死鎖,下面這篇文章主要給大家介紹了關(guān)于Java多線程死鎖問(wèn)題(wait和notify)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-01-01SpringMVC實(shí)現(xiàn)返回響應(yīng)的項(xiàng)目實(shí)踐
本文主要介紹了SpringMVC實(shí)現(xiàn)返回響應(yīng)的項(xiàng)目實(shí)踐,包含返回靜態(tài)頁(yè)面,返回?cái)?shù)據(jù),返回html片段等實(shí)例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02SpringBoot+Tess4j實(shí)現(xiàn)牛的OCR識(shí)別工具的示例代碼
這篇文章主要介紹了SpringBoot+Tess4j實(shí)現(xiàn)牛的OCR識(shí)別工具的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01簡(jiǎn)單了解synchronized和lock的區(qū)別
這篇文章主要介紹了簡(jiǎn)單了解synchronized和lock的區(qū)別,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09Java設(shè)計(jì)模式中的策略模式詳細(xì)解析
這篇文章主要介紹了Java設(shè)計(jì)模式中的策略模式詳細(xì)解析,所謂策略模式,指的是做某一件事時(shí)有多種選擇(即策略),且不同的策略之間相互獨(dú)立,而且無(wú)論使用哪種策略,得到的結(jié)果都是相同的,需要的朋友可以參考下2023-12-12