欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring boot定時(shí)任務(wù)的原理及動(dòng)態(tài)創(chuàng)建詳解

 更新時(shí)間:2019年03月04日 09:11:05   作者:HJZ  
這篇文章主要給大家介紹了關(guān)于Spring boot定時(shí)任務(wù)的原理及動(dòng)態(tài)創(chuàng)建的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧

v一、前言

定時(shí)任務(wù)一般是項(xiàng)目中都需要用到的,可以用于定時(shí)處理一些特殊的任務(wù)。這篇文章主要給大家介紹了關(guān)于Spring boot定時(shí)任務(wù)的原理及動(dòng)態(tài)創(chuàng)建的相關(guān)內(nèi)容,下面來一起看看詳細(xì)的介紹吧

上周工作遇到了一個(gè)需求,同步多個(gè)省份銷號(hào)數(shù)據(jù),解綁微信粉絲。分省定時(shí)將銷號(hào)數(shù)據(jù)放到SFTP服務(wù)器上,我需要開發(fā)定時(shí)任務(wù)去解析文件。因?yàn)槭嵌嗍》?,服?wù)器、文件名規(guī)則、數(shù)據(jù)規(guī)則都不一定,所以要做成可配置是有一定難度的。數(shù)據(jù)規(guī)則這塊必須強(qiáng)烈要求統(tǒng)一,服務(wù)器、文件名規(guī)則都可以從配置中心去讀。每新增一個(gè)省份的配置,后臺(tái)感知到后,動(dòng)態(tài)生成定時(shí)任務(wù)。

v二、Springboot引入定時(shí)任務(wù)核心配置

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

 @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
 @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
 public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
 return new ScheduledAnnotationBeanPostProcessor();
 }

}

接下來主要看一下這個(gè)核心后置處理器:ScheduledAnnotationBeanPostProcessor 。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
 if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
  bean instanceof ScheduledExecutorService) {
 // Ignore AOP infrastructure such as scoped proxies.
 return bean;
 }

 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
 if (!this.nonAnnotatedClasses.contains(targetClass)) {
 Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
  (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
   Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
    method, Scheduled.class, Schedules.class);
   return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
  });
 if (annotatedMethods.isEmpty()) {
  this.nonAnnotatedClasses.add(targetClass);
  if (logger.isTraceEnabled()) {
  logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
  }
 }
 else {
  // Non-empty set of methods
  annotatedMethods.forEach((method, scheduledMethods) ->
   scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
  if (logger.isTraceEnabled()) {
  logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
   "': " + annotatedMethods);
  }
 }
 }
 return bean;
}

1、處理Scheduled注解,通過ScheduledTaskRegistrar注冊(cè)定時(shí)任務(wù)。

private void finishRegistration() {
 if (this.scheduler != null) {
 this.registrar.setScheduler(this.scheduler);
 }

 if (this.beanFactory instanceof ListableBeanFactory) {
 Map<String, SchedulingConfigurer> beans =
  ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
 List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
 AnnotationAwareOrderComparator.sort(configurers);
 for (SchedulingConfigurer configurer : configurers) {
  configurer.configureTasks(this.registrar);
 }
 }

 if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
 Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
 try {
  // Search for TaskScheduler bean...
  this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
 }
 catch (NoUniqueBeanDefinitionException ex) {
  logger.trace("Could not find unique TaskScheduler bean", ex);
  try {
  this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
  }
  catch (NoSuchBeanDefinitionException ex2) {
  if (logger.isInfoEnabled()) {
   logger.info("More than one TaskScheduler bean exists within the context, and " +
    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
    ex.getBeanNamesFound());
  }
  }
 }
 catch (NoSuchBeanDefinitionException ex) {
  logger.trace("Could not find default TaskScheduler bean", ex);
  // Search for ScheduledExecutorService bean next...
  try {
  this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
  }
  catch (NoUniqueBeanDefinitionException ex2) {
  logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
  try {
   this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
  }
  catch (NoSuchBeanDefinitionException ex3) {
   if (logger.isInfoEnabled()) {
   logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
    ex2.getBeanNamesFound());
   }
  }
  }
  catch (NoSuchBeanDefinitionException ex2) {
  logger.trace("Could not find default ScheduledExecutorService bean", ex2);
  // Giving up -> falling back to default scheduler within the registrar...
  logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
  }
 }
 }

 this.registrar.afterPropertiesSet();
}

  1、通過一系列的SchedulingConfigurer動(dòng)態(tài)配置ScheduledTaskRegistrar。

  2、向ScheduledTaskRegistrar注冊(cè)一個(gè)TaskScheduler(用于對(duì)Runnable的任務(wù)進(jìn)行調(diào)度,它包含有多種觸發(fā)規(guī)則)。

  3、registrar.afterPropertiesSet(),在這開始安排所有的定時(shí)任務(wù)開始執(zhí)行了。

protected void scheduleTasks() {
 if (this.taskScheduler == null) {
 this.localExecutor = Executors.newSingleThreadScheduledExecutor();
 this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
 }
 if (this.triggerTasks != null) {
 for (TriggerTask task : this.triggerTasks) {
  addScheduledTask(scheduleTriggerTask(task));
 }
 }
 if (this.cronTasks != null) {
 for (CronTask task : this.cronTasks) {
  addScheduledTask(scheduleCronTask(task));
 }
 }
 if (this.fixedRateTasks != null) {
 for (IntervalTask task : this.fixedRateTasks) {
  addScheduledTask(scheduleFixedRateTask(task));
 }
 }
 if (this.fixedDelayTasks != null) {
 for (IntervalTask task : this.fixedDelayTasks) {
  addScheduledTask(scheduleFixedDelayTask(task));
 }
 }
}

  1、TriggerTask:動(dòng)態(tài)定時(shí)任務(wù)。通過Trigger#nextExecutionTime 給定的觸發(fā)上下文確定下一個(gè)執(zhí)行時(shí)間。

  2、CronTask:動(dòng)態(tài)定時(shí)任務(wù),TriggerTask子類。通過cron表達(dá)式確定的時(shí)間觸發(fā)下一個(gè)任務(wù)執(zhí)行。

  3、IntervalTask:一定時(shí)間延遲之后,周期性執(zhí)行的任務(wù)。

  4、taskScheduler 如果為空,默認(rèn)是ConcurrentTaskScheduler,并使用默認(rèn)單線程的ScheduledExecutor。

v三、主要看一下CronTask工作原理

ScheduledTaskRegistrar.java
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
 ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
 boolean newTask = false;
 if (scheduledTask == null) {
 scheduledTask = new ScheduledTask(task);
 newTask = true;
 }
 if (this.taskScheduler != null) {
 scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
 }
 else {
 addCronTask(task);
 this.unresolvedTasks.put(task, scheduledTask);
 }
 return (newTask ? scheduledTask : null);
}

ConcurrentTaskScheduler.java
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
 try {
 if (this.enterpriseConcurrentScheduler) {
  return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
 }
 else {
  ErrorHandler errorHandler =
   (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
  return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
 }
 }
 catch (RejectedExecutionException ex) {
 throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
 }
}

ReschedulingRunnable.java
@Nullable
public ScheduledFuture<?> schedule() {
 synchronized (this.triggerContextMonitor) {
 this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
 if (this.scheduledExecutionTime == null) {
  return null;
 }
 long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
 this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
 return this;
 }
}

private ScheduledFuture<?> obtainCurrentFuture() {
 Assert.state(this.currentFuture != null, "No scheduled future");
 return this.currentFuture;
}

@Override
public void run() {
 Date actualExecutionTime = new Date();
 super.run();
 Date completionTime = new Date();
 synchronized (this.triggerContextMonitor) {
 Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
 this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
 if (!obtainCurrentFuture().isCancelled()) {
  schedule();
 }
 }
}

  1、最終將task和trigger都封裝到了ReschedulingRunnable中。

  2、ReschedulingRunnable實(shí)現(xiàn)了任務(wù)重復(fù)調(diào)度(schedule方法中調(diào)用調(diào)度器executor并傳入自身對(duì)象,executor會(huì)調(diào)用run方法,run方法又調(diào)用了schedule方法)。

  3、ReschedulingRunnable schedule方法加了同步鎖,只能有一個(gè)線程拿到下次執(zhí)行時(shí)間并加入執(zhí)行器的調(diào)度。

  4、不同的ReschedulingRunnable對(duì)象之間在線程池夠用的情況下是不會(huì)相互影響的,也就是說滿足線程池的條件下,TaskScheduler的schedule方法的多次調(diào)用是可以交叉執(zhí)行的。

ScheduledThreadPoolExecutor.java
public ScheduledFuture<?> schedule(Runnable command,
     long delay,
     TimeUnit unit) {
 if (command == null || unit == null)
 throw new NullPointerException();
 RunnableScheduledFuture<?> t = decorateTask(command,
 new ScheduledFutureTask<Void>(command, null,
     triggerTime(delay, unit)));
 delayedExecute(t);
 return t;
}


private void delayedExecute(RunnableScheduledFuture<?> task) {
 if (isShutdown())
 reject(task);
 else {
 super.getQueue().add(task);
 if (isShutdown() &&
  !canRunInCurrentRunState(task.isPeriodic()) &&
  remove(task))
  task.cancel(false);
 else
  ensurePrestart();
 }
}

  ScheduledFutureTask 工作原理如下圖所示【太懶了,不想畫圖了,盜圖一張】。

 

  1、ScheduledFutureTask會(huì)放入優(yōu)先阻塞隊(duì)列:ScheduledThreadPoolExecutor.DelayedWorkQueue(二叉最小堆實(shí)現(xiàn))

  2、上圖中的Thread對(duì)象即ThreadPoolExecutor.Worker,實(shí)現(xiàn)了Runnable接口

/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
 setState(-1); // inhibit interrupts until runWorker
 this.firstTask = firstTask;
 this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
 runWorker(this);
}

  1、Worker中維護(hù)了Thread對(duì)象,Thread對(duì)象的Runnable實(shí)例即Worker自身

  2、ThreadPoolExecutor#addWorker方法中會(huì)創(chuàng)建Worker對(duì)象,然后拿到Worker中的thread實(shí)例并start,這樣就創(chuàng)建了線程池中的一個(gè)線程實(shí)例

  3、Worker的run方法會(huì)調(diào)用ThreadPoolExecutor#runWorker方法,這才是任務(wù)最終被執(zhí)行的地方,該方法示意如下

 ?。?)首先取傳入的task執(zhí)行,如果task是null,只要該線程池處于運(yùn)行狀態(tài),就會(huì)通過getTask方法從workQueue中取任務(wù)。ThreadPoolExecutor的execute方法會(huì)在無法產(chǎn)生core線程的時(shí)候向  workQueue隊(duì)列中offer任務(wù)。
getTask方法從隊(duì)列中取task的時(shí)候會(huì)根據(jù)相關(guān)配置決定是否阻塞和阻塞多久。如果getTask方法結(jié)束,返回的是null,runWorker循環(huán)結(jié)束,執(zhí)行processWorkerExit方法。
至此,該線程結(jié)束自己的使命,從線程池中“消失”。

 ?。?)在開始執(zhí)行任務(wù)之前,會(huì)調(diào)用Worker的lock方法,目的是阻止task正在被執(zhí)行的時(shí)候被interrupt,通過調(diào)用clearInterruptsForTaskRun方法來保證的(后面可以看一下這個(gè)方法),該線程沒有自己的interrupt set了。

  (3)beforeExecute和afterExecute方法用于在執(zhí)行任務(wù)前后執(zhí)行一些自定義的操作,這兩個(gè)方法是空的,留給繼承類去填充功能。

我們可以在beforeExecute方法中拋出異常,這樣task不會(huì)被執(zhí)行,而且在跳出該循環(huán)的時(shí)候completedAbruptly的值是true,表示the worker died due to user exception,會(huì)用decrementWorkerCount調(diào)整wc。

  (4)因?yàn)镽unnable的run方法不能拋出Throwables異常,所以這里重新包裝異常然后拋出,拋出的異常會(huì)使當(dāng)當(dāng)前線程死掉,可以在afterExecute中對(duì)異常做一些處理。

  (5)afterExecute方法也可能拋出異常,也可能使當(dāng)前線程死掉。

v四、動(dòng)態(tài)創(chuàng)建定時(shí)任務(wù)

v  TaskConfiguration 配置類

@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

 @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
 @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
 public ScheduledExecutorService scheduledAnnotationProcessor() {
 return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
 }

 private static class DefaultThreadFactory implements ThreadFactory {
 private static final AtomicInteger poolNumber = new AtomicInteger(1);
 private final ThreadGroup group;
 private final AtomicInteger threadNumber = new AtomicInteger(1);
 private final String namePrefix;

 DefaultThreadFactory() {
  SecurityManager s = System.getSecurityManager();
  group = (s != null) ? s.getThreadGroup() :
   Thread.currentThread().getThreadGroup();
  namePrefix = "pool-" +
   poolNumber.getAndIncrement() +
   "-schedule-";
 }

 @Override
 public Thread newThread(Runnable r) {
  Thread t = new Thread(group, r,
   namePrefix + threadNumber.getAndIncrement(),
   0);
  if (t.isDaemon()) {
  t.setDaemon(false);
  }
  if (t.getPriority() != Thread.NORM_PRIORITY) {
  t.setPriority(Thread.NORM_PRIORITY);
  }
  return t;
 }
 }
}

  1、保證ConcurrentTaskScheduler不使用默認(rèn)單線程的ScheduledExecutor,而是corePoolSize=5的線程池

  2、自定義線程池工廠類

v  DynamicTask 動(dòng)態(tài)定時(shí)任務(wù)

@Configuration
public class DynamicTask implements SchedulingConfigurer {
 private static Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class);

 private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
   0L, TimeUnit.MILLISECONDS,
   new LinkedBlockingQueue<>(10),
   new DynamicTaskConsumeThreadFactory());


 private volatile ScheduledTaskRegistrar registrar;
 private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
 private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

 private volatile List<TaskConstant> taskConstants = Lists.newArrayList();

 @Override
 public void configureTasks(ScheduledTaskRegistrar registrar) {
  this.registrar = registrar;
  this.registrar.addTriggerTask(() -> {
     if (!CollectionUtils.isEmpty(taskConstants)) {
      LOGGER.info("檢測(cè)動(dòng)態(tài)定時(shí)任務(wù)列表...");
      List<TimingTask> tts = new ArrayList<>();
      taskConstants
        .forEach(taskConstant -> {
         TimingTask tt = new TimingTask();
         tt.setExpression(taskConstant.getCron());
         tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
         tts.add(tt);
        });
      this.refreshTasks(tts);
     }
    }
    , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
 }


 public List<TaskConstant> getTaskConstants() {
  return taskConstants;
 }

 private void refreshTasks(List<TimingTask> tasks) {
  //取消已經(jīng)刪除的策略任務(wù)
  Set<String> taskIds = scheduledFutures.keySet();
  for (String taskId : taskIds) {
   if (!exists(tasks, taskId)) {
    scheduledFutures.get(taskId).cancel(false);
   }
  }
  for (TimingTask tt : tasks) {
   String expression = tt.getExpression();
   if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
    LOGGER.error("定時(shí)任務(wù)DynamicTask cron表達(dá)式不合法: " + expression);
    continue;
   }
   //如果配置一致,則不需要重新創(chuàng)建定時(shí)任務(wù)
   if (scheduledFutures.containsKey(tt.getTaskId())
     && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
    continue;
   }
   //如果策略執(zhí)行時(shí)間發(fā)生了變化,則取消當(dāng)前策略的任務(wù)
   if (scheduledFutures.containsKey(tt.getTaskId())) {
    scheduledFutures.remove(tt.getTaskId()).cancel(false);
    cronTasks.remove(tt.getTaskId());
   }
   CronTask task = new CronTask(tt, expression);
   ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
   cronTasks.put(tt.getTaskId(), task);
   scheduledFutures.put(tt.getTaskId(), future);
  }
 }

 private boolean exists(List<TimingTask> tasks, String taskId) {
  for (TimingTask task : tasks) {
   if (task.getTaskId().equals(taskId)) {
    return true;
   }
  }
  return false;
 }

 @PreDestroy
 public void destroy() {
  this.registrar.destroy();
 }

 public static class TaskConstant {
  private String cron;
  private String taskId;

  public String getCron() {
   return cron;
  }

  public void setCron(String cron) {
   this.cron = cron;
  }

  public String getTaskId() {
   return taskId;
  }

  public void setTaskId(String taskId) {
   this.taskId = taskId;
  }
 }

 private class TimingTask implements Runnable {
  private String expression;

  private String taskId;

  public String getTaskId() {
   return taskId;
  }

  public void setTaskId(String taskId) {
   this.taskId = taskId;
  }

  @Override
  public void run() {
   //設(shè)置隊(duì)列大小10
   LOGGER.error("當(dāng)前CronTask: " + this);
   DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
   es.submit(() -> {
    while (!queue.isDone() || !queue.isEmpty()) {
     try {
      String content = queue.poll(500, TimeUnit.MILLISECONDS);
      if (StringUtils.isBlank(content)) {
       return;
      }
      LOGGER.info("DynamicBlockingQueue 消費(fèi):" + content);
      TimeUnit.MILLISECONDS.sleep(500);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   });

   //隊(duì)列放入數(shù)據(jù)
   for (int i = 0; i < 5; ++i) {
    try {
     queue.put(String.valueOf(i));
     LOGGER.info("DynamicBlockingQueue 生產(chǎn):" + i);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   queue.setDone(true);
  }

  public String getExpression() {
   return expression;
  }

  public void setExpression(String expression) {
   this.expression = expression;
  }

  @Override
  public String toString() {
   return ReflectionToStringBuilder.toString(this
     , ToStringStyle.JSON_STYLE
     , false
     , false
     , TimingTask.class);
  }

 }

 /**
  * 隊(duì)列消費(fèi)線程工廠類
  */
 private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
  private static final AtomicInteger poolNumber = new AtomicInteger(1);
  private final ThreadGroup group;
  private final AtomicInteger threadNumber = new AtomicInteger(1);
  private final String namePrefix;

  DynamicTaskConsumeThreadFactory() {
   SecurityManager s = System.getSecurityManager();
   group = (s != null) ? s.getThreadGroup() :
     Thread.currentThread().getThreadGroup();
   namePrefix = "pool-" +
     poolNumber.getAndIncrement() +
     "-dynamic-task-";
  }

  @Override
  public Thread newThread(Runnable r) {
   Thread t = new Thread(group, r,
     namePrefix + threadNumber.getAndIncrement(),
     0);
   if (t.isDaemon()) {
    t.setDaemon(false);
   }
   if (t.getPriority() != Thread.NORM_PRIORITY) {
    t.setPriority(Thread.NORM_PRIORITY);
   }
   return t;
  }
 }

 private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
  DynamicBlockingQueue(int capacity) {
   super(capacity);
  }


  private volatile boolean done = false;

  public boolean isDone() {
   return done;
  }

  public void setDone(boolean done) {
   this.done = done;
  }
 }
}

  1、taskConstants 動(dòng)態(tài)任務(wù)列表

  2、ScheduledTaskRegistrar#addTriggerTask 添加動(dòng)態(tài)周期定時(shí)任務(wù),檢測(cè)動(dòng)態(tài)任務(wù)列表的變化

CronTask task = new CronTask(tt, expression);
ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
scheduledFutures.put(tt.getTaskId(), future);

  3、動(dòng)態(tài)創(chuàng)建cron定時(shí)任務(wù),拿到ScheduledFuture實(shí)例并緩存起來

  4、在刷新任務(wù)列表時(shí),通過緩存的ScheduledFuture實(shí)例和CronTask實(shí)例,來決定是否取消、移除失效的動(dòng)態(tài)定時(shí)任務(wù)。

v  DynamicTaskTest 動(dòng)態(tài)定時(shí)任務(wù)測(cè)試類

@RunWith(SpringRunner.class)
@SpringBootTest
public class DynamicTaskTest {

 @Autowired
 private DynamicTask dynamicTask;

 @Test
 public void test() throws InterruptedException {
  List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
  DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
  taskConstant.setCron("0/5 * * * * ?");
  taskConstant.setTaskId("test1");
  taskConstans.add(taskConstant);


  DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
  taskConstant1.setCron("0/5 * * * * ?");
  taskConstant1.setTaskId("test2");
  taskConstans.add(taskConstant1);

  DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
  taskConstant2.setCron("0/5 * * * * ?");
  taskConstant2.setTaskId("test3");
  taskConstans.add(taskConstant2);

  TimeUnit.SECONDS.sleep(40);
  //移除并添加新的配置
  taskConstans.remove(taskConstans.size() - 1);
  DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
  taskConstant3.setCron("0/5 * * * * ?");
  taskConstant3.setTaskId("test4");
  taskConstans.add(taskConstant3);
//
  TimeUnit.MINUTES.sleep(50);
 }
}

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • SpringBoot集成Dubbo啟用gRPC協(xié)議

    SpringBoot集成Dubbo啟用gRPC協(xié)議

    這篇文章主要介紹了SpringBoot集成Dubbo啟用gRPC協(xié)議,以及與原生 gRPC 在代碼編寫過程中的區(qū)別。感興趣的同學(xué)可以參考閱讀
    2023-04-04
  • java 自定義可繼承枚舉Enum的案例

    java 自定義可繼承枚舉Enum的案例

    這篇文章主要介紹了java 自定義可繼承枚舉Enum的案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • JavaWeb三大組件之Filter過濾器詳解

    JavaWeb三大組件之Filter過濾器詳解

    這篇文章主要介紹了JavaWeb三大組件之Filter過濾器詳解,過濾器Filter是Java?Web應(yīng)用中的一種組件,它在請(qǐng)求到達(dá)Servlet或JSP之前或者響應(yīng)送回客戶端之前,對(duì)請(qǐng)求和響應(yīng)進(jìn)行預(yù)處理和后處理操作,需要的朋友可以參考下
    2023-10-10
  • Spring中事務(wù)管理方案和事務(wù)管理器及事務(wù)控制的API詳解

    Spring中事務(wù)管理方案和事務(wù)管理器及事務(wù)控制的API詳解

    這篇文章主要介紹了Spring中事務(wù)管理方案和事務(wù)管理器及事務(wù)控制的API詳解,事務(wù)管理是指對(duì)事務(wù)進(jìn)行管理和控制,以確保事務(wù)的正確性和完整性,事務(wù)管理的作用是保證數(shù)據(jù)庫(kù)的數(shù)據(jù)操作的一致性和可靠性,需要的朋友可以參考下
    2023-08-08
  • IO中flush()函數(shù)的使用代碼示例

    IO中flush()函數(shù)的使用代碼示例

    這篇文章主要介紹了IO中flush()函數(shù)的使用代碼示例,具有一定借鑒價(jià)值,需要的朋友可以參考下
    2018-01-01
  • Springboot2 配置AOP日志的方法步驟

    Springboot2 配置AOP日志的方法步驟

    這篇文章主要介紹了Springboot2 配置AOP日志的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • Java編程使用卡片布局管理器示例【基于swing組件】

    Java編程使用卡片布局管理器示例【基于swing組件】

    這篇文章主要介紹了Java編程使用卡片布局管理器,結(jié)合實(shí)例形式分析了java基于swing組件的卡片布局管理器具體實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下
    2018-01-01
  • maven依賴傳遞和依賴沖突原理

    maven依賴傳遞和依賴沖突原理

    這篇文章主要介紹了maven依賴傳遞和依賴沖突原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05
  • 淺談Java垃圾回收機(jī)制

    淺談Java垃圾回收機(jī)制

    這篇文章主要介紹了淺談Java垃圾回收機(jī)制,文中有非常詳細(xì)的圖文示例及代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有很好的幫助,需要的朋友可以參考下
    2021-05-05
  • Java由淺入深通關(guān)抽象類與接口上

    Java由淺入深通關(guān)抽象類與接口上

    在類中沒有包含足夠的信息來描繪一個(gè)具體的對(duì)象,這樣的類稱為抽象類,接口是Java中最重要的概念之一,它可以被理解為一種特殊的類,不同的是接口的成員沒有執(zhí)行體,是由全局常量和公共的抽象方法所組成,本文給大家介紹Java抽象類和接口,感興趣的朋友一起看看吧
    2022-04-04

最新評(píng)論