Spring動(dòng)態(tài)管理定時(shí)任務(wù)之ThreadPoolTaskScheduler解讀
Spring動(dòng)態(tài)管理定時(shí)任務(wù)ThreadPoolTaskScheduler
Spring任務(wù)調(diào)度核心類ThreadPoolTaskScheduler,API文檔解釋如下:
Implementation of Spring's TaskScheduler interface, wrapping a native java.util.concurrent.ScheduledThreadPoolExecutor.
Spring的TaskScheduler接口的實(shí)現(xiàn),包裝了一個(gè)本地java.util.concurrent.ScheduledThreadPoolExecutor。
實(shí)現(xiàn)思路
注入調(diào)度類bean,初始化一個(gè)ConcurrentHashMap容器,用來(lái)保存多個(gè)定時(shí)任務(wù)的狀態(tài),每一個(gè)任務(wù)的運(yùn)行狀態(tài)被封裝在ScheduledFuture中,借此類可取消對(duì)應(yīng)的定時(shí)任務(wù)。
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.cjia.spidercommon.model.SpiderJob;
import com.cjia.spiderjob.mapper.SpiderJobMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* 用來(lái)管理(啟動(dòng)、停止、新增、刪除、更新編輯、查看運(yùn)行狀態(tài))定時(shí)任務(wù)(增量任務(wù))
*/
@Slf4j
@RestController
@RequestMapping("spiderJob/cron")
public class CronJobController extends SpiderJobController {
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
private Map<Integer, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
@Resource
private SpiderJobMapper spiderJobMapper;
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
return new ThreadPoolTaskScheduler();
}
/**
* 啟動(dòng)單個(gè)定時(shí)任務(wù)
*/
@RequestMapping("/start/{jobId}")
public String start(@PathVariable Integer jobId) {
SpiderJob job = spiderJobMapper.selectById(jobId);
if (job == null) {
log.warn("任務(wù)[{}]已不存在,無(wú)法啟動(dòng)!", jobId);
return "任務(wù)[" + jobId + "]已不存在,無(wú)法啟動(dòng)!";
}
int enable = job.getEnable();
if (enable == 0) {
log.warn("任務(wù)[{}]已被禁用,無(wú)法啟動(dòng)!", jobId);
return "任務(wù)[" + jobId + "]已被禁用,無(wú)法啟動(dòng)!";
}
// 檢測(cè)該任務(wù)是否已在運(yùn)行調(diào)度中
if (futureMap.get(jobId) != null) {
log.warn("任務(wù)[{}]已在調(diào)度運(yùn)行,無(wú)法重復(fù)啟動(dòng)!", jobId);
return "任務(wù)[" + jobId + "]已在調(diào)度運(yùn)行,無(wú)法重復(fù)啟動(dòng)!";
}
String cron = job.getCron();
// TODO check cron
ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(new MyRunnable(job), new CronTrigger(cron));
log.info("任務(wù)[{}]已被啟動(dòng)!", jobId);
futureMap.put(jobId, future);
return "任務(wù)[" + jobId + "]已被啟動(dòng)!";
}
/**
* 批量啟動(dòng)定時(shí)任務(wù)
*/
@RequestMapping("/startBatch/{jobIds}")
public String startBatch(@PathVariable String jobIds) {
// TODO jobIds valid
String[] jobIdsArr = jobIds.split(",");
StringBuffer sb = new StringBuffer();
for (String jobId : jobIdsArr) {
String result = start(Integer.valueOf(jobId));
sb.append(result).append("<br>");
}
return sb.toString();
}
/**
* 停止單個(gè)定時(shí)任務(wù)
*/
@RequestMapping("/stop/{jobId}")
public String stop(@PathVariable Integer jobId) {
// 檢測(cè)該任務(wù)是否已在運(yùn)行調(diào)度中
ScheduledFuture<?> future = futureMap.get(jobId);
if (future == null) {
log.warn("任務(wù)[{}]已不在調(diào)度中,無(wú)法停止!", jobId);
return "任務(wù)[" + jobId + "]已不在調(diào)度中,無(wú)法停止!";
} else {
future.cancel(true);
futureMap.remove(jobId);
log.info("任務(wù)[{}]已被停止!", jobId);
return "任務(wù)[" + jobId + "]已被停止!";
}
}
/**
* 批量停止定時(shí)任務(wù)
*/
@RequestMapping("/stopBatch/{jobIds}")
public String stopBatch(@PathVariable String jobIds) {
// TODO jobIds valid
String[] jobIdsArr = jobIds.split(",");
StringBuffer sb = new StringBuffer();
for (String jobId : jobIdsArr) {
String result = stop(Integer.valueOf(jobId));
sb.append(result).append("<br>");
}
return sb.toString();
}
/**
* 查看當(dāng)前時(shí)刻調(diào)度中的定時(shí)任務(wù)
*/
@RequestMapping("/status")
public String getAllStatus() {
Set<Integer> runningKeys = futureMap.keySet();
return "當(dāng)前正在調(diào)度的任務(wù)列表:" + runningKeys.toString();
}
@Data
private class MyRunnable implements Runnable {
private SpiderJob job;
public MyRunnable(SpiderJob job) {
this.job = job;
}
@Override
public void run() {
log.info("運(yùn)行定時(shí)任務(wù)[{}: {}] at {}!", job.getId(), job.getBizName(), LocalDateTime.now());
executeIncrementJob(job.getBizName());
}
}
}ThreadPoolTaskScheduler 定時(shí)任務(wù)實(shí)現(xiàn)
org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler定時(shí)任務(wù)調(diào)度線程池
CREATE TABLE `sys_job` ( `id` bigint(20) NOT NULL COMMENT '任務(wù)key', `job_name` varchar(64) NOT NULL COMMENT '任務(wù)名稱', `bean_class` varchar(128) NOT NULL COMMENT '類路徑', `cron_expression` varchar(64) NOT NULL COMMENT 'cron表達(dá)式', `status` tinyint(1) NOT NULL COMMENT '狀態(tài)值 @JobStatusEnum 詳見(jiàn)具體枚舉類', `is_deleted` tinyint(1) DEFAULT '0' COMMENT '刪除標(biāo)識(shí) 1是 0否', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@Configuration
@Slf4j
public class SchedulingConfigure {
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
log.info("開(kāi)始創(chuàng)建定時(shí)任務(wù)調(diào)度線程池");
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(20);
threadPoolTaskScheduler.setThreadNamePrefix("schedule-task-");
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
log.info("創(chuàng)建定時(shí)任務(wù)調(diào)度線程池完成!");
return threadPoolTaskScheduler;
}
}public enum JobStatusEnum {
/**
* 未加入調(diào)度器
*/
NOT_SCHEDULE(0, "未加入調(diào)度器"),
/**
* 加入調(diào)度器,但未運(yùn)行
*/
SCHEDULED_BUT_NOT_RUNNING(1, "加入調(diào)度器,但未運(yùn)行"),
/**
* 從調(diào)度器中已刪除
*/
DELETED(2, "從調(diào)度器中已刪除"),
;
private Integer status;
private String detail;
JobStatusEnum(Integer status, String detail) {
this.status = status;
this.detail = detail;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getDetail() {
return detail;
}
public void setDetail(String detail) {
this.detail = detail;
}
}
@Component
@Slf4j
public class ScheduledJobService {
private final ReentrantLock lock = new ReentrantLock();
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Autowired
private SysJobService jobService;
@Autowired
private SpringBeanUtils springBeanUtils;
/**
* 已經(jīng)加入調(diào)度器的任務(wù)map
*/
private final ConcurrentHashMap<Long, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();
/**
* 初始化啟動(dòng)任務(wù)
*
* @param sysJobs 數(shù)據(jù)庫(kù)任務(wù)集合
*/
public void initAllJob(List<SysJob> sysJobs) {
if (CollectionUtils.isEmpty(sysJobs)) {
return;
}
for (SysJob sysJob : sysJobs) {
if (JobStatusEnum.NOT_SCHEDULE.getStatus().equals(sysJob.getStatus())
|| JobStatusEnum.DELETED.getStatus().equals(sysJob.getStatus())
|| this.isScheduled(sysJob.getId())) {
// 任務(wù)初始化狀態(tài)或已刪除或已加載到調(diào)度器中
continue;
}
// 將任務(wù)加入調(diào)度器
this.doScheduleJob(sysJob);
}
}
/**
* 啟動(dòng)任務(wù)
*
* @param jobId job主鍵id
*/
public void start(Long jobId) {
log.info("啟動(dòng)任務(wù):-> jobId_{}", jobId);
// 加入調(diào)度器
schedule(jobId);
log.info("啟動(dòng)任務(wù)結(jié)束:-> jobId_{}", jobId);
// 更新任務(wù)狀態(tài)
jobService.updateJobStatus(jobId, JobStatusEnum.SCHEDULED_BUT_NOT_RUNNING.getStatus());
}
/**
* 停止任務(wù)
*
* @param jobId job主鍵id
*/
public void stop(Long jobId) {
log.info("停止任務(wù):-> jobId_{}", jobId);
// 取消任務(wù)
cancel(jobId);
log.info("停止任務(wù)結(jié)束:-> jobId_{}", jobId);
// 更新表中任務(wù)狀態(tài)為已停止
jobService.updateJobStatus(jobId, JobStatusEnum.NOT_SCHEDULE.getStatus());
}
/**
* 移除任務(wù)
*
* @param jobId job主鍵id
*/
public void remove(Long jobId) {
log.info("移除任務(wù):-> jobId_{}", jobId);
// 取消任務(wù)
cancel(jobId);
log.info("移除任務(wù)結(jié)束:-> jobId_{}", jobId);
// 更新表中任務(wù)狀態(tài)為已刪除
jobService.updateJobStatus(jobId, JobStatusEnum.DELETED.getStatus());
}
/**
* 取消
*
* @param jobId 工作id
*/
private void cancel(Long jobId) {
// 任務(wù)是否存在
if (scheduledFutureMap.containsKey(jobId)) {
ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(jobId);
if (!scheduledFuture.isCancelled()) {
// 取消調(diào)度
scheduledFuture.cancel(true);
}
}
}
private void schedule(Long jobId) {
// 添加鎖,只允許單個(gè)線程訪問(wèn),防止任務(wù)啟動(dòng)多次
lock.lock();
try {
if (isScheduled(jobId)) {
log.error("任務(wù)jobId_{}已經(jīng)加入調(diào)度器,無(wú)需重復(fù)操作", jobId);
return;
}
// 通過(guò)jobKey查詢jobBean對(duì)象
SysJob sysJob = jobService.getById(jobId);
// 啟動(dòng)定時(shí)任務(wù)
doScheduleJob(sysJob);
} finally {
// 釋放鎖資源
lock.unlock();
}
}
/**
* 執(zhí)行啟動(dòng)任務(wù)
*
* @param sysJob 任務(wù)實(shí)體類對(duì)象
*/
private void doScheduleJob(SysJob sysJob) {
Long jobId = sysJob.getId();
String beanClass = sysJob.getBeanClass();
String jobName = sysJob.getJobName();
String cron = sysJob.getCronExpression();
// 從Spring中獲取目標(biāo)的job業(yè)務(wù)實(shí)現(xiàn)類
ScheduledJob scheduledJob = parseFrom(beanClass);
if (scheduledJob == null) {
return;
}
scheduledJob.setJobId(jobId);
scheduledJob.setJobName(jobName);
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledJob,
triggerContext -> {
CronTrigger cronTrigger = new CronTrigger(cron);
return cronTrigger.nextExecutionTime(triggerContext);
});
log.info("任務(wù)加入調(diào)度器 -> jobId:{},jobName:{}", jobId, jobName);
// 將啟動(dòng)的任務(wù)放入map
assert scheduledFuture != null;
scheduledFutureMap.put(jobId, scheduledFuture);
}
/**
* 任務(wù)是否已經(jīng)進(jìn)入調(diào)度器
*
* @param jobId 任務(wù)主鍵key
* @return {@link Boolean}
*/
private Boolean isScheduled(Long jobId) {
if (scheduledFutureMap.containsKey(jobId)) {
return !scheduledFutureMap.get(jobId).isCancelled();
}
return false;
}
private ScheduledJob parseFrom(String beanClass) {
try {
Class<?> clazz = Class.forName(beanClass);
return (ScheduledJob) springBeanUtils.getBean(clazz);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
}
@Component
public class SpringBeanUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringBeanUtils.applicationContext = applicationContext;
}
/**
* 獲取applicationContext
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通過(guò)name獲取 Bean.
*/
public Object getBean(String name) {
return getApplicationContext().getBean(name);
}
/**
* 通過(guò)class獲取Bean.
*/
public <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
/**
* 通過(guò)name,以及Clazz返回指定的Bean
*/
public <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
@Data
public abstract class ScheduledJob implements Runnable {
/**
* 任務(wù)主鍵id
*/
private Long jobId;
/**
* 任務(wù)名
*/
private String jobName;
}
@Component
public class SchedulerTestDemo extends ScheduledJob {
@Override
public void run() {
System.out.println("我是定時(shí)任務(wù)要執(zhí)行的類..");
System.out.println(SchedulerTestDemo.class.getName() + ":" + LocalDateTime.now());
}
}
/**
* 項(xiàng)目啟動(dòng)時(shí),將數(shù)據(jù)庫(kù)中job定時(shí)任務(wù)加載
*/
@Component
public class GrapeApplicationListener {
private final ScheduledJobService scheduledJobService;
private final ISysJobService sysJobService;
public GrapeApplicationListener(ISysJobService sysJobService, ScheduledJobService scheduledJobService) {
this.sysJobService = sysJobService;
this.scheduledJobService = scheduledJobService;
}
@PostConstruct
public void initStartJob() {
// 初始化job
scheduledJobService.initAllJob(sysJobService.list());
}
}
@SpringBootApplication(scanBasePackages = {"com.example.grape"})
@MapperScan("com.example.grape.dao.mapper")
@EnableScheduling
public class GrapeApplication {
public static void main(String[] args) {
SpringApplication.run(GrapeApplication.class, args);
}
}
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringTask實(shí)現(xiàn)定時(shí)任務(wù)方法講解
- SpringBoot?ScheduledTaskRegistrar解決動(dòng)態(tài)定時(shí)任務(wù)思路詳解
- SpringBoot定時(shí)任務(wù)動(dòng)態(tài)擴(kuò)展ScheduledTaskRegistrar詳解
- 使用spring-task定時(shí)任務(wù)動(dòng)態(tài)配置修改執(zhí)行時(shí)間
- Spring Task定時(shí)任務(wù)每天零點(diǎn)執(zhí)行一次的操作
- SpringBoot2 task scheduler 定時(shí)任務(wù)調(diào)度器四種方式
- spring boot task實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建定時(shí)任務(wù)的方法
- java 中Spring task定時(shí)任務(wù)的深入理解
- Spring Task定時(shí)任務(wù)的配置和使用詳解
- SpringTask-Timer實(shí)現(xiàn)定時(shí)任務(wù)的詳細(xì)代碼
相關(guān)文章
JAVA JVM運(yùn)行時(shí)數(shù)據(jù)區(qū)詳解
這篇文章主要介紹了JVM運(yùn)行時(shí)數(shù)據(jù)區(qū)劃分原理詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-09-09
Spring?加載?Application?Context五種方式小結(jié)
這篇文章主要介紹了Spring?加載?Application?Context五種方式小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。2022-01-01
java中ImageReader和BufferedImage獲取圖片尺寸實(shí)例
這篇文章主要介紹了java中ImageReader和BufferedImage獲取圖片尺寸實(shí)例,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01
Java多線程死鎖問(wèn)題詳解(wait和notify)
線程之間形成相互等待資源的環(huán)時(shí),就會(huì)形成順序死鎖,下面這篇文章主要給大家介紹了關(guān)于Java多線程死鎖問(wèn)題(wait和notify)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-01-01
SpringMVC實(shí)現(xiàn)返回響應(yīng)的項(xiàng)目實(shí)踐
本文主要介紹了SpringMVC實(shí)現(xiàn)返回響應(yīng)的項(xiàng)目實(shí)踐,包含返回靜態(tài)頁(yè)面,返回?cái)?shù)據(jù),返回html片段等實(shí)例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02
SpringBoot+Tess4j實(shí)現(xiàn)牛的OCR識(shí)別工具的示例代碼
這篇文章主要介紹了SpringBoot+Tess4j實(shí)現(xiàn)牛的OCR識(shí)別工具的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
簡(jiǎn)單了解synchronized和lock的區(qū)別
這篇文章主要介紹了簡(jiǎn)單了解synchronized和lock的區(qū)別,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09
Java設(shè)計(jì)模式中的策略模式詳細(xì)解析
這篇文章主要介紹了Java設(shè)計(jì)模式中的策略模式詳細(xì)解析,所謂策略模式,指的是做某一件事時(shí)有多種選擇(即策略),且不同的策略之間相互獨(dú)立,而且無(wú)論使用哪種策略,得到的結(jié)果都是相同的,需要的朋友可以參考下2023-12-12

