基于SpringBoot實(shí)現(xiàn)輕量級(jí)的動(dòng)態(tài)定時(shí)任務(wù)調(diào)度的方法
在使用SpringBoot框架進(jìn)行開(kāi)發(fā)時(shí),一般都是通過(guò)@Scheduled注解進(jìn)行定時(shí)任務(wù)的開(kāi)發(fā):
@Component public class TestTask { @Scheduled(cron="0/5 * * * * ? ") //每5秒執(zhí)行一次 public void execute(){ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("任務(wù)執(zhí)行" + df.format(new Date())); } }
但是這種方式存在一個(gè)問(wèn)題,那就是任務(wù)的周期控制是死的,必須編寫在代碼中,如果遇到需要在系統(tǒng)運(yùn)行過(guò)程中想中止、立即執(zhí)行、修改執(zhí)行周期等動(dòng)態(tài)操作的需求時(shí),使用注解的方式便不能滿足了,當(dāng)然為了滿足此種需求可以額外再引入其他任務(wù)調(diào)度插件(例如XXL-Job等),但是引入其他組件是需要衡量成本的,額外的依賴成本、組件的維護(hù)成本、開(kāi)發(fā)的復(fù)雜度等等,所以如果系統(tǒng)體量不是那么大,完全沒(méi)必要通過(guò)增加組件來(lái)完成,可以基于SpringBoot框架實(shí)現(xiàn)一套內(nèi)置輕量級(jí)的任務(wù)調(diào)度。
設(shè)計(jì)思路
整體設(shè)計(jì)
這里我們把定時(shí)任務(wù)以類作為基礎(chǔ)單位,即一個(gè)類為一個(gè)任務(wù),然后通過(guò)配置數(shù)據(jù)的方式,進(jìn)行任務(wù)的讀取,通過(guò)反射生成任務(wù)對(duì)象,使用SpringBoot本身的線程池任務(wù)調(diào)度,完成動(dòng)態(tài)的定時(shí)任務(wù)驅(qū)動(dòng),同時(shí)通過(guò)接口支撐實(shí)現(xiàn)相應(yīng)的REST API對(duì)外暴露接口
任務(wù)模型
首先基于模板模式,設(shè)計(jì)基礎(chǔ)的任務(wù)執(zhí)行流程抽象類,定義出一個(gè)定時(shí)任務(wù)需要執(zhí)行的內(nèi)容和步驟和一些通用的方法函數(shù),后續(xù)具體的定時(shí)任務(wù)直接繼承該父類,實(shí)現(xiàn)該父類的before、start、after三個(gè)抽象函數(shù)即可,所有公共操作均在抽象父類完成
特殊說(shuō)明:
基于此方法創(chuàng)建的類是不歸Spring的容器管理的,所以自定義的任務(wù)子類中是無(wú)法使用SpringBoot中的任何注解,尤其在自定義任務(wù)類中如果需要依賴其他Bean時(shí),需要借助抽象父類AbstractBaseCronTask中已經(jīng)實(shí)現(xiàn)的<T> T getServer(Class<T> className)來(lái)完成,getServer的實(shí)現(xiàn)如下:
public <T> T getServer(Class<T> className){ return applicationContext.getBean(className); }
是通過(guò)SpringBoot中的ApplicationContext接口來(lái)獲取Spring的上下文,以此來(lái)滿足可以獲取Spring中其他Bean的訴求。
例如,有個(gè)定時(shí)任務(wù)TaskOne類,它需要使用UserService類中的 caculateMoney()的方法,勢(shì)必這個(gè)定時(shí)任務(wù)需要依賴UserService類,而TaskOne并非是Spring創(chuàng)建的對(duì)象,而是我們?nèi)藶楦深A(yù)生成的對(duì)象,所以它是不在Spring的Bean管理范圍的,自然也就無(wú)法使用@Autowird等方式注入U(xiǎn)serService類,此時(shí)就需要使用getServer方法來(lái)獲取UserService對(duì)象
//自定義定時(shí)任務(wù)類 public class TaskOne extends AbstractBaseCronTask { private UserService userService; public TestTask(TaskEntity taskEntity) { super(taskEntity); } @Override public void beforeJob() { //任務(wù)運(yùn)行第一步,先將userService進(jìn)行變量注入 userService = getServer(UserService.class); …… } @Override public void startJob() { if(XXXX){ //直接調(diào)用getServer獲取需要的bean User user = getServer(UserMapper.class).findUser("111223") userService.caluateMoney(user); //……其他代碼 } } @Override public void afterJob() { } }
任務(wù)對(duì)象加載過(guò)程
核心邏輯在于利用反射,在SpringBoot啟動(dòng)后動(dòng)態(tài)創(chuàng)建相應(yīng)的定時(shí)任務(wù)類,并將其放置到SpringBoot的定時(shí)線程池中進(jìn)行維護(hù),同時(shí)將該對(duì)象同步存放至內(nèi)存中一份,便于可以實(shí)時(shí)調(diào)用,當(dāng)進(jìn)行修改任務(wù)相關(guān)配置時(shí),需要重新加載一次內(nèi)容。
public class TaskScheduleServerImpl implements TaskScheduleServer { //正在運(yùn)行的任務(wù) private static ConcurrentHashMap<String, ScheduledFuture> runningTasks = new ConcurrentHashMap<>(); //線程池任務(wù)調(diào)度 private ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); public boolean addTaskToScheduling(TaskEntity task) { if(!runningTasks.containsKey(task.getTaskId())){ try{ Class<?> clazz = Class.forName(task.getTaskClass()); Constructor c = clazz.getConstructor(TaskEntity.class); AbstractBaseCronTask runnable = (AbstractBaseCronTask) c.newInstance(task); //反射方式生成對(duì)象不屬于Spring容器管控,對(duì)于Spring的bean使用需要手動(dòng)注入 runnable.setApplicationContext(context); CronTrigger cron = new CronTrigger(task.getTaskCron()); //put到runTasks runningTasks.put(task.getTaskId(), Objects.requireNonNull(this.threadPoolTaskScheduler.schedule(runnable, cron))); //存入內(nèi)存中,便于外部調(diào)用 ramTasks.put(task.getTaskId(),runnable); task.setTaskRamStatus(1); taskInfoOpMapper.updateTaskInfo(task); return true; }catch (Exception e){ log.error("定時(shí)任務(wù)加載失敗..."+e); } } return false; } }
部分源碼
這里將配置內(nèi)容放入數(shù)據(jù)庫(kù)中,直接以數(shù)據(jù)庫(kù)中的表作為任務(wù)配置的基礎(chǔ)
/** * 任務(wù)對(duì)象 **/ @Data public class TaskEntity implements Serializable { //任務(wù)唯一ID private String taskId; //任務(wù)名稱 private String taskName; //任務(wù)描述 private String taskDesc; //執(zhí)行周期配置 private String taskCron; //任務(wù)類的全路徑 private String taskClass; //任務(wù)的額外配置 private String taskOutConfig; //任務(wù)創(chuàng)建時(shí)間 private String taskCreateTime; //任務(wù)是否啟動(dòng),1啟用,0不啟用 private Integer taskIsUse; //是否隨系統(tǒng)啟動(dòng)立即執(zhí)行 private Integer taskBootUp; //任務(wù)上次執(zhí)行狀態(tài) private Integer taskLastRun; //任務(wù)是否加載至內(nèi)存中 private Integer taskRamStatus; }
核心邏輯,加載定時(shí)任務(wù)接口及其實(shí)現(xiàn)類
public interface TaskScheduleServer { ConcurrentHashMap<String, AbstractBaseCronTask> getTaskSchedulingRam(); /** * 初始化任務(wù)調(diào)度 */ void initScheduling(); /** * 添加任務(wù)至內(nèi)存及容器 * @param taskEntity 任務(wù)實(shí)體 * @return boolean */ boolean addTaskToScheduling(TaskEntity taskEntity); /** * 從任務(wù)調(diào)度器中移除任務(wù) * @param id 任務(wù)id * @return Boolean */ boolean removeTaskFromScheduling(String id); /** * 執(zhí)行指定任務(wù) * @param id 任務(wù)id * @return double 耗時(shí) */ double runTaskById(String id); /** * 清空任務(wù) */ void claearAllTask(); /** * 加載所有任務(wù) */ void loadAllTask(); /** * 運(yùn)行開(kāi)機(jī)自啟任務(wù) */ void runBootUpTask(); } @Slf4j @Component public class TaskScheduleServerImpl implements TaskScheduleServer { ………… @Override public double runTaskById(String id) { TaskEntity task = taskInfoOpMapper.queryTaskInfoById(id); if(null!=task) { if (runningTasks.containsKey(task.getTaskId())){ ramTasks.get(task.getTaskId()).run(); return ramTasks.get(task.getTaskId()).getRunTime(); } } return 0d; } @Override public void claearAllTask() { ramTasks.clear(); log.info("【定時(shí)任務(wù)控制器】清除內(nèi)存任務(wù) 完成"); runningTasks.clear(); log.info("【定時(shí)任務(wù)控制器】清除線程任務(wù) 完成"); threadPoolTaskScheduler.shutdown(); } @Override public void loadAllTask() { List<TaskEntity> allTask = taskInfoOpMapper.queryTaskInfo(null); for (TaskEntity task : allTask) { if(addTaskToScheduling(task)){ log.info("【定時(shí)任務(wù)初始化】裝填任務(wù):{} [ 任務(wù)執(zhí)行周期:{} ] [ bootup:{}]",task.getTaskName(),task.getTaskCron(),task.getTaskBootUp()); } } } @Override public void runBootUpTask() { TaskEntity entity = new TaskEntity().taskBootUp(1); List<TaskEntity> list = taskInfoOpMapper.queryTaskInfo(entity); for(TaskEntity task:list){ runTaskById(task.getTaskId()); } } }
在SpringBoot中的加載類
@Order(3) @Component @Slf4j public class AfterAppStarted implements ApplicationRunner { TaskScheduleServer taskScheduleServer; @Autowired public void setTaskScheduleServer(TaskScheduleServer taskScheduleServer) { this.taskScheduleServer = taskScheduleServer; } @Override public void run(ApplicationArguments args) throws Exception { //運(yùn)行隨系統(tǒng)啟動(dòng)的定時(shí)任務(wù) taskScheduleServer.runBootUpTask(); } }
對(duì)外暴露控制接口及其Service
@RestController @RequestMapping("/taskScheduling/manage") @Api(tags = "數(shù)據(jù)源管理服務(wù)") public class TaskSchedulingController { TaskScheduleManagerService taskScheduleManagerService; @Autowired public void setTaskScheduleManagerService(TaskScheduleManagerService taskScheduleManagerService) { this.taskScheduleManagerService = taskScheduleManagerService; } @PostMapping("/search") @Operation(summary = "分頁(yè)查詢?nèi)蝿?wù)") public Response searchData(@RequestBody SearchTaskDto param){ return Response.success(taskScheduleManagerService.searchTaskForPage(param)); } @GetMapping("/detail") @Operation(summary = "具體任務(wù)對(duì)象") public Response searchDetail(String taskId){ return Response.success(taskScheduleManagerService.searchTaskDetail(taskId)); } @GetMapping("/shutdown") @Operation(summary = "關(guān)閉指定任務(wù)") public Response shutdownTask(String taskId){ return Response.success(taskScheduleManagerService.shutdownTask(taskId)); } @GetMapping("/open") @Operation(summary = "開(kāi)啟指定任務(wù)") public Response openTask(String taskId){ return Response.success(taskScheduleManagerService.openTask(taskId)); } @GetMapping("/run") @Operation(summary = "運(yùn)行指定任務(wù)") public Response runTask(String taskId){ return Response.success(taskScheduleManagerService.runTask(taskId)); } @PostMapping("/update") @Operation(summary = "更新指定任務(wù)") public Response updateTask(@RequestBody TaskEntity taskEntity){ return Response.success(taskScheduleManagerService.updateTaskBusinessInfo(taskEntity)); } }
相關(guān)接口實(shí)現(xiàn)類
@Service public class TaskScheduleManagerServiceImpl implements TaskScheduleManagerService { private TaskInfoOpMapper taskInfoOpMapper; private TaskScheduleServer taskScheduleServer; @Autowired public void setTaskInfoOpMapper(TaskInfoOpMapper taskInfoOpMapper) { this.taskInfoOpMapper = taskInfoOpMapper; } @Autowired public void setTaskScheduleServer(TaskScheduleServer taskScheduleServer) { this.taskScheduleServer = taskScheduleServer; } @Override public IPage<TaskEntity> searchTaskForPage(SearchTaskDto dto) { Page<TaskEntity> pageParam = new Page<>(1,10); pageParam.setAsc("task_id"); return taskInfoOpMapper.queryTaskInfoPage(pageParam,dto.getFilterKey(),dto.getBootUp(),dto.getLastRunStatus()); } @Override public TaskEntity searchTaskDetail(String taskId) { if(!StringUtils.isEmpty(taskId)){ return taskInfoOpMapper.queryTaskInfoById(taskId); } return null; } @Override public TaskRunRetDto runTask(String taskId) { AbstractBaseCronTask task = taskScheduleServer.getTaskSchedulingRam().get(taskId); TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.run, 0); if(null != task) { double time = taskScheduleServer.runTaskById(taskId); result.setResult(1); return result.extend(time).taskInfo(task.getThisTaskInfo()); } else { return result.extend("任務(wù)未啟用"); } } @Override public TaskRunRetDto shutdownTask(String taskId) { AbstractBaseCronTask task = taskScheduleServer.getTaskSchedulingRam().get(taskId); TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.shutdown, 0); if(null != task) { boolean flag = taskScheduleServer.removeTaskFromScheduling(taskId); if(flag) { result.setResult(1); } return result.extend("任務(wù)成功關(guān)閉").taskInfo(task.getThisTaskInfo()); } else { return result.extend("任務(wù)未啟用"); } } @Override public TaskRunRetDto openTask(String taskId) { TaskEntity task = taskInfoOpMapper.queryTaskInfoById(taskId); TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.open, 0); if(null != task) { if (!taskScheduleServer.getTaskSchedulingRam().containsKey(taskId)) { boolean flag = taskScheduleServer.addTaskToScheduling(task); if(flag) { result.setResult(1); } return result.extend("任務(wù)開(kāi)啟成功").taskInfo(task); } else { return result.extend("任務(wù)處于啟動(dòng)狀態(tài)").taskInfo(task); } }else { return result.extend("任務(wù)不存在!"); } } @Override public TaskRunRetDto updateTaskBusinessInfo(TaskEntity entity) { TaskEntity task = searchTaskDetail(entity.getTaskId()); TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.update, 0).taskInfo(entity); String config = entity.getTaskOutConfig(); if(null != config && !JSONUtil.isJson(config) && !JSONUtil.isJsonArray(config)){ result.setResult(0); result.extend("更新任務(wù)失敗,任務(wù)配置必須為JSON或空"); result.taskInfo(entity); return result; } task.setTaskCron(entity.getTaskCron()); task.setTaskOutConfig(entity.getTaskOutConfig()); task.setTaskName(entity.getTaskName()); task.setTaskDesc(entity.getTaskDesc()); int num = taskInfoOpMapper.updateTaskInfo(task); if (num == 1) { result.setResult(1); result.extend("成功更新任務(wù)"); result.taskInfo(entity); //重新刷新任務(wù) taskScheduleServer.removeTaskFromScheduling(entity.getTaskId()); taskScheduleServer.addTaskToScheduling(task); } return result; }
效果
數(shù)據(jù)庫(kù)中配置任務(wù)
任務(wù)代碼
public class TestTask extends AbstractBaseCronTask { public TestTask(TaskEntity taskEntity) { super(taskEntity); } @Override public void beforeJob() { log.info("測(cè)試任務(wù)開(kāi)始"); } @Override public void startJob() { } @Override public void afterJob() { } }
任務(wù)查看
執(zhí)行效果
到此這篇關(guān)于基于SpringBoot實(shí)現(xiàn)輕量級(jí)的動(dòng)態(tài)定時(shí)任務(wù)調(diào)度的文章就介紹到這了,更多相關(guān)SpringBoot動(dòng)態(tài)定時(shí)任務(wù)調(diào)度內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot項(xiàng)目使用@Scheduled注解實(shí)現(xiàn)定時(shí)任務(wù)的方法
- SpringBoot整合ShedLock解決定時(shí)任務(wù)防止重復(fù)執(zhí)行的問(wèn)題
- SpringBoot項(xiàng)目如何使用多線程執(zhí)行定時(shí)任務(wù)
- SpringBoot最新定時(shí)任務(wù)的7種實(shí)現(xiàn)方案
- SpringBoot集成ShedLock實(shí)現(xiàn)分布式定時(shí)任務(wù)的示例代碼
- springboot定時(shí)任務(wù)不起作用問(wèn)題及解決
- SpringBoot最簡(jiǎn)單的定時(shí)任務(wù)@Scheduler的使用及解讀
相關(guān)文章
Springboot實(shí)現(xiàn)導(dǎo)入導(dǎo)出Excel的方法
今天帶各位小伙伴學(xué)習(xí)Springboot實(shí)現(xiàn)導(dǎo)入導(dǎo)出Excel的方法,文中有非常詳細(xì)的介紹,對(duì)正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下2021-05-05java 中 poi解析Excel文件版本問(wèn)題解決辦法
這篇文章主要介紹了java 中 poi解析Excel文件版本問(wèn)題解決辦法的相關(guān)資料,需要的朋友可以參考下2017-08-08解決JSON.toJSONString首字母大小寫的問(wèn)題
這篇文章主要介紹了解決JSON.toJSONString首字母大小寫的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02java使用compareTo實(shí)現(xiàn)一個(gè)類的對(duì)象之間比較大小操作
這篇文章主要介紹了java使用compareTo實(shí)現(xiàn)一個(gè)類的對(duì)象之間比較大小操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-09-09Springboot項(xiàng)目升級(jí)2.2.x升至2.7.x的示例代碼
本文主要介紹了Springboot項(xiàng)目升級(jí)2.2.x升至2.7.x的示例代碼,會(huì)有很多的坑,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09springboot整合log4j的踩坑實(shí)戰(zhàn)記錄
log日志的重要性不言而喻,所以我們需要在系統(tǒng)內(nèi)根據(jù)實(shí)際的業(yè)務(wù)進(jìn)行日志的整合,下面這篇文章主要給大家介紹了關(guān)于springboot整合log4j的踩坑實(shí)戰(zhàn)記錄,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-04-04關(guān)于spring中bean注冊(cè)的優(yōu)先級(jí)分析
Spring框架中,Bean的定義方式主要有三種:XML定義、注解掃描和配置類中的@Bean注解,在Bean注冊(cè)過(guò)程中,XML定義的GenericBeanDefinition優(yōu)先級(jí)最高2024-09-09SWT(JFace)體驗(yàn)之ApplicationWindow
SWT(JFace)體驗(yàn)之ApplicationWindow2009-06-06