SpringBoot實現(xiàn)定時任務動態(tài)管理示例
一、功能說明
SpringBoot的定時任務的加強工具,實現(xiàn)對SpringBoot原生的定時任務進行動態(tài)管理,完全兼容原生@Scheduled注解,無需對原本的定時任務進行修改
二、快速使用
具體的功能已經(jīng)封裝成SpringBoot-starter即插即用:
<dependency> <groupId>com.github.guoyixing</groupId> <artifactId>spring-boot-starter-super-scheduled</artifactId> <version>0.3.1</version> </dependency>
使用方法和源碼:
- 碼云:https://gitee.com/qiaodaimadewangcai/super-scheduled
- github:https://github.com/guoyixing/super-scheduled
推薦一個開源免費的 Spring Boot 實戰(zhàn)項目:
https://github.com/javastacks/spring-boot-best-practice
三、實現(xiàn)原理
1、動態(tài)管理實現(xiàn)
(1) 配置管理介紹
@Component("superScheduledConfig") public class SuperScheduledConfig { /** * 執(zhí)行定時任務的線程池 */ private ThreadPoolTaskScheduler taskScheduler; /** * 定時任務名稱與定時任務回調(diào)鉤子 的關聯(lián)關系容器 */ private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>(); /** * 定時任務名稱與定時任務需要執(zhí)行的邏輯 的關聯(lián)關系容器 */ private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>(); /** * 定時任務名稱與定時任務的源信息 的關聯(lián)關系容器 */ private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>(); /* 普通的get/sets省略 */ }
(2) 使用后處理器攔截SpringBoot原本的定時任務
- 實現(xiàn)
ApplicationContextAware
接口拿到SpringBoot的上下文 - 實現(xiàn)
BeanPostProcessor
接口,將這個類標記為后處理器,后處理器會在每個bean實例化之后執(zhí)行 - 使用
@DependsOn
注解強制依賴SuperScheduledConfig
類,讓SpringBoot實例化SuperScheduledPostProcessor
類之前先實例化SuperScheduledConfig
類 - 主要實現(xiàn)邏輯在
postProcessAfterInitialization()
方法中
@DependsOn({"superScheduledConfig"}) @Component @Order public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware { protected final Log logger = LogFactory.getLog(getClass()); private ApplicationContext applicationContext; /** * 實例化bean之前的操作 * @param bean bean實例 * @param beanName bean的Name */ @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } /** * 實例化bean之后的操作 * @param bean bean實例 * @param beanName bean的Name */ @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //1.獲取配置管理器 SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class); //2.獲取當前實例化完成的bean的所有方法 Method[] methods = bean.getClass().getDeclaredMethods(); //循環(huán)處理對每個方法逐一處理 if (methods.length > 0) { for (Method method : methods) { //3.嘗試在該方法上獲取@Scheduled注解(SpringBoot的定時任務注解) Scheduled annotation = method.getAnnotation(Scheduled.class); //如果無法獲取到@Scheduled注解,就跳過這個方法 if (annotation == null) { continue; } //4.創(chuàng)建定時任務的源屬性 //創(chuàng)建定時任務的源屬性(用來記錄定時任務的配置,初始化的時候記錄的是注解上原本的屬性) ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean); //對注解上獲取到源屬性中的屬性進行檢測 if (!scheduledSource.check()) { throw new SuperScheduledException("在" + beanName + "Bean中" + method.getName() + "方法的注解參數(shù)錯誤"); } //生成定時任務的名稱(id),使用beanName+“.”+方法名 String name = beanName + "." + method.getName(); //將以key-value的形式,將源數(shù)據(jù)存入配置管理器中,key:定時任務的名稱 value:源數(shù)據(jù) superScheduledConfig.addScheduledSource(name, scheduledSource); try { //5.將原本SpringBoot的定時任務取消掉 clearOriginalScheduled(annotation); } catch (Exception e) { throw new SuperScheduledException("在關閉原始方法" + beanName + method.getName() + "時出現(xiàn)錯誤"); } } } //最后bean保持原有返回 return bean; } /** * 修改注解原先的屬性 * @param annotation 注解實例對象 * @throws Exception */ private void clearOriginalScheduled(Scheduled annotation) throws Exception { changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED); changeAnnotationValue(annotation, "fixedDelay", -1L); changeAnnotationValue(annotation, "fixedDelayString", ""); changeAnnotationValue(annotation, "fixedRate", -1L); changeAnnotationValue(annotation, "fixedRateString", ""); changeAnnotationValue(annotation, "initialDelay", -1L); changeAnnotationValue(annotation, "initialDelayString", ""); } /** * 獲取SpringBoot的上下文 * @param applicationContext SpringBoot的上下文 */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
(3) 使用ApplicationRunner
初始化自定義的定時任務運行器
- 實現(xiàn)
ApplicationContextAware
接口拿到SpringBoot的上下文 - 使用
@DependsOn
注解強制依賴threadPoolTaskScheduler
類 - 實現(xiàn)
ApplicationRunner
接口,在所有bean初始化結束之后,運行自定義邏輯 - 主要實現(xiàn)邏輯在
run()
方法中
@DependsOn("threadPoolTaskScheduler") @Component public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware { protected final Log logger = LogFactory.getLog(getClass()); private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private ApplicationContext applicationContext; /** * 定時任務配置管理器 */ @Autowired private SuperScheduledConfig superScheduledConfig; /** * 定時任務執(zhí)行線程 */ @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Override public void run(ApplicationArguments args) { //1.定時任務配置管理器中緩存 定時任務執(zhí)行線程 superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler); //2.獲取所有定時任務源數(shù)據(jù) Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource(); //逐一處理定時任務 for (String name : nameToScheduledSource.keySet()) { //3.獲取定時任務源數(shù)據(jù) ScheduledSource scheduledSource = nameToScheduledSource.get(name); //4.獲取所有增強類 String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class); //5.創(chuàng)建執(zhí)行控制器 SuperScheduledRunnable runnable = new SuperScheduledRunnable(); //配置執(zhí)行控制器 runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //6.逐一處理增強類(增強器實現(xiàn)原理后面具體分析) List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length); for (String baseStrengthenBeanName : baseStrengthenBeanNames) { //7.將增強器代理成point Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName); //創(chuàng)建代理 Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable)); proxy.setSuperScheduledName(name); //8.所有的points連成起來 points.add(proxy); } //將point形成調(diào)用鏈 runnable.setChain(new Chain(points)); //將執(zhí)行邏輯封裝并緩存到定時任務配置管理器中 superScheduledConfig.addRunnable(name, runnable::invoke); try { //8.啟動定時任務 ScheduledFuture<?> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler , scheduledSource, runnable::invoke); //將線程回調(diào)鉤子存到任務配置管理器中 superScheduledConfig.addScheduledFuture(name, schedule); logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經(jīng)啟動..."); } catch (Exception e) { throw new SuperScheduledException("任務" + name + "啟動失敗,錯誤信息:" + e.getLocalizedMessage()); } } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
(4) 進行動態(tài)管理
@Component public class SuperScheduledManager { protected final Log logger = LogFactory.getLog(getClass()); private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private SuperScheduledConfig superScheduledConfig; /** * 修改Scheduled的執(zhí)行周期 * * @param name scheduled的名稱 * @param cron cron表達式 */ public void setScheduledCron(String name, String cron) { //終止原先的任務 cancelScheduled(name); //創(chuàng)建新的任務 ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setCron(cron); addScheduled(name, scheduledSource); } /** * 修改Scheduled的fixedDelay * * @param name scheduled的名稱 * @param fixedDelay 上一次執(zhí)行完畢時間點之后多長時間再執(zhí)行 */ public void setScheduledFixedDelay(String name, Long fixedDelay) { //終止原先的任務 cancelScheduled(name); //創(chuàng)建新的任務 ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedDelay(fixedDelay); addScheduled(name, scheduledSource); } /** * 修改Scheduled的fixedRate * * @param name scheduled的名稱 * @param fixedRate 上一次開始執(zhí)行之后多長時間再執(zhí)行 */ public void setScheduledFixedRate(String name, Long fixedRate) { //終止原先的任務 cancelScheduled(name); //創(chuàng)建新的任務 ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedRate(fixedRate); addScheduled(name, scheduledSource); } /** * 查詢所有啟動的Scheduled */ public List<String> getRunScheduledName() { Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet(); return new ArrayList<>(names); } /** * 查詢所有的Scheduled */ public List<String> getAllSuperScheduledName() { Set<String> names = superScheduledConfig.getNameToRunnable().keySet(); return new ArrayList<>(names); } /** * 終止Scheduled * * @param name scheduled的名稱 */ public void cancelScheduled(String name) { ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name); scheduledFuture.cancel(true); superScheduledConfig.removeScheduledFuture(name); logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經(jīng)終止..."); } /** * 啟動Scheduled * * @param name scheduled的名稱 * @param scheduledSource 定時任務的源信息 */ public void addScheduled(String name, ScheduledSource scheduledSource) { if (getRunScheduledName().contains(name)) { throw new SuperScheduledException("定時任務" + name + "已經(jīng)被啟動過了"); } if (!scheduledSource.check()) { throw new SuperScheduledException("定時任務" + name + "源數(shù)據(jù)內(nèi)容錯誤"); } scheduledSource.refreshType(); Runnable runnable = superScheduledConfig.getRunnable(name); ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler(); ScheduledFuture<?> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable); logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經(jīng)啟動..."); superScheduledConfig.addScheduledSource(name, scheduledSource); superScheduledConfig.addScheduledFuture(name, schedule); } /** * 以cron類型啟動Scheduled * * @param name scheduled的名稱 * @param cron cron表達式 */ public void addCronScheduled(String name, String cron) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource.setCron(cron); addScheduled(name, scheduledSource); } /** * 以fixedDelay類型啟動Scheduled * * @param name scheduled的名稱 * @param fixedDelay 上一次執(zhí)行完畢時間點之后多長時間再執(zhí)行 * @param initialDelay 第一次執(zhí)行的延遲時間 */ public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource.setFixedDelay(fixedDelay); if (initialDelay != null && initialDelay.length == 1) { scheduledSource.setInitialDelay(initialDelay[0]); } else if (initialDelay != null && initialDelay.length > 1) { throw new SuperScheduledException("第一次執(zhí)行的延遲時間只能傳入一個參數(shù)"); } addScheduled(name, scheduledSource); } /** * 以fixedRate類型啟動Scheduled * * @param name scheduled的名稱 * @param fixedRate 上一次開始執(zhí)行之后多長時間再執(zhí)行 * @param initialDelay 第一次執(zhí)行的延遲時間 */ public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource.setFixedRate(fixedRate); if (initialDelay != null && initialDelay.length == 1) { scheduledSource.setInitialDelay(initialDelay[0]); } else if (initialDelay != null && initialDelay.length > 1) { throw new SuperScheduledException("第一次執(zhí)行的延遲時間只能傳入一個參數(shù)"); } addScheduled(name, scheduledSource); } /** * 手動執(zhí)行一次任務 * * @param name scheduled的名稱 */ public void runScheduled(String name) { Runnable runnable = superScheduledConfig.getRunnable(name); runnable.run(); } }
2、增強接口實現(xiàn)
增強器實現(xiàn)的整體思路與SpringAop的思路一致,實現(xiàn)沒有Aop復雜
(1) 增強接口
@Order(Ordered.HIGHEST_PRECEDENCE) public interface BaseStrengthen { /** * 前置強化方法 * * @param bean bean實例(或者是被代理的bean) * @param method 執(zhí)行的方法對象 * @param args 方法參數(shù) */ void before(Object bean, Method method, Object[] args); /** * 后置強化方法 * 出現(xiàn)異常不會執(zhí)行 * 如果未出現(xiàn)異常,在afterFinally方法之后執(zhí)行 * * @param bean bean實例(或者是被代理的bean) * @param method 執(zhí)行的方法對象 * @param args 方法參數(shù) */ void after(Object bean, Method method, Object[] args); /** * 異常強化方法 * * @param bean bean實例(或者是被代理的bean) * @param method 執(zhí)行的方法對象 * @param args 方法參數(shù) */ void exception(Object bean, Method method, Object[] args); /** * Finally強化方法,出現(xiàn)異常也會執(zhí)行 * * @param bean bean實例(或者是被代理的bean) * @param method 執(zhí)行的方法對象 * @param args 方法參數(shù) */ void afterFinally(Object bean, Method method, Object[] args); }
(2) 代理抽象類
public abstract class Point { /** * 定時任務名 */ private String superScheduledName; /** * 抽象的執(zhí)行方法,使用代理實現(xiàn) * @param runnable 定時任務執(zhí)行器 */ public abstract Object invoke(SuperScheduledRunnable runnable); /* 普通的get/sets省略 */ }
(3) 調(diào)用鏈類
public class Chain { private List<Point> list; private int index = -1; /** * 索引自增1 */ public int incIndex() { return ++index; } /** * 索引還原 */ public void resetIndex() { this.index = -1; } }
(4) cglib動態(tài)代理實現(xiàn)
使用cglib代理增強器,將增強器全部代理成調(diào)用鏈節(jié)點Point
public class RunnableBaseInterceptor implements MethodInterceptor { /** * 定時任務執(zhí)行器 */ private SuperScheduledRunnable runnable; /** * 定時任務增強類 */ private BaseStrengthen strengthen; @Override public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { Object result; //如果執(zhí)行的是invoke()方法 if ("invoke".equals(method.getName())) { //前置強化方法 strengthen.before(obj, method, args); try { //調(diào)用執(zhí)行器中的invoke()方法 result = runnable.invoke(); } catch (Exception e) { //異常強化方法 strengthen.exception(obj, method, args); throw new SuperScheduledException(strengthen.getClass() + "中強化執(zhí)行時發(fā)生錯誤", e); } finally { //Finally強化方法,出現(xiàn)異常也會執(zhí)行 strengthen.afterFinally(obj, method, args); } //后置強化方法 strengthen.after(obj, method, args); } else { //直接執(zhí)行方法 result = methodProxy.invokeSuper(obj, args); } return result; } public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) { this.runnable = runnable; if (BaseStrengthen.class.isAssignableFrom(object.getClass())) { this.strengthen = (BaseStrengthen) object; } else { throw new SuperScheduledException(object.getClass() + "對象不是BaseStrengthen類型"); } } public RunnableBaseInterceptor() { } }
(5) 定時任務執(zhí)行器實現(xiàn)
public class SuperScheduledRunnable { /** * 原始的方法 */ private Method method; /** * 方法所在的bean */ private Object bean; /** * 增強器的調(diào)用鏈 */ private Chain chain; public Object invoke() { Object result; //索引自增1 if (chain.incIndex() == chain.getList().size()) { //調(diào)用鏈中的增強方法已經(jīng)全部執(zhí)行結束 try { //調(diào)用鏈索引初始化 chain.resetIndex(); //增強器全部執(zhí)行完畢,執(zhí)行原本的方法 result = method.invoke(bean); } catch (IllegalAccessException | InvocationTargetException e) { throw new SuperScheduledException(e.getLocalizedMessage()); } } else { //獲取被代理后的方法增強器 Point point = chain.getList().get(chain.getIndex()); //執(zhí)行增強器代理 //增強器代理中,會回調(diào)方法執(zhí)行器,形成調(diào)用鏈,逐一運行調(diào)用鏈中的增強器 result = point.invoke(this); } return result; } /* 普通的get/sets省略 */ }
(6) 增強器代理邏輯
com.gyx.superscheduled.core.SuperScheduledApplicationRunner
類中的代碼片段
//創(chuàng)建執(zhí)行控制器 SuperScheduledRunnable runnable = new SuperScheduledRunnable(); runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //用來存放 增強器的代理對象 List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length); //循環(huán)所有的增強器的beanName for (String baseStrengthenBeanName : baseStrengthenBeanNames) { //獲取增強器的bean對象 Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName); //將增強器代理成Point節(jié)點 Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable)); proxy.setSuperScheduledName(name); //增強器的代理對象緩存到list中 points.add(proxy); } //將增強器代理實例的集合生成調(diào)用鏈 //執(zhí)行控制器中設置調(diào)用鏈 runnable.setChain(new Chain(points));
以上就是SpringBoot實現(xiàn)定時任務動態(tài)管理示例的詳細內(nèi)容,更多關于SpringBoot定時任務動態(tài)管理的資料請關注腳本之家其它相關文章!
相關文章
Springboot 如何指定獲取自己寫的配置properties文件的值
這篇文章主要介紹了Springboot 如何指定獲取自己寫的配置properties文件的值,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07springboot下使用shiro自定義filter的個人經(jīng)驗分享
這篇文章主要介紹了springboot下使用shiro自定義filter的個人經(jīng)驗,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09java多線程并發(fā)executorservice(任務調(diào)度)類
這篇文章主要介紹了線程并發(fā)ScheduledExecutorService類,設置 ScheduledExecutorService ,2秒后,在 1 分鐘內(nèi)每 10 秒鐘蜂鳴一次2014-01-01自帶IDEA插件的阿里開源診斷神器Arthas線上項目BUG調(diào)試
這篇文章主要為大家介紹了自帶IDEA插件阿里開源診斷神器Arthas線上項目BUG調(diào)試,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06Java使用NIO優(yōu)化IO實現(xiàn)文件上傳下載功能
IO 是基于流來讀取的,而NIO則是基于塊讀取,面向流 的 I/O 系統(tǒng)一次一個字節(jié)地處理數(shù)據(jù),這篇文章主要介紹了Java使用NIO優(yōu)化IO實現(xiàn)文件上傳下載功能,需要的朋友可以參考下2022-07-07Java Web檢查用戶登錄狀態(tài)(防止用戶訪問到非法頁面)
一般javaweb網(wǎng)站都有用戶登錄,而有一些操作必須用戶登錄才能進行,本文主要介紹了Java Web檢查用戶登錄狀態(tài),具有一定的參考價值,感興趣的可以了解一下2023-09-09