關(guān)于ScheduledThreadPoolExecutor不執(zhí)行的原因分析
ScheduledThreadPoolExecutor不執(zhí)行原因分析
最近在調(diào)試一個(gè)監(jiān)控應(yīng)用指標(biāo)的時(shí)候發(fā)現(xiàn)定時(shí)器在服務(wù)啟動(dòng)執(zhí)行一次之后就不執(zhí)行了,這里用的定時(shí)器是Java的調(diào)度線程池 ScheduledThreadPoolExecutor
,后來(lái)經(jīng)過(guò)排查發(fā)現(xiàn) ScheduledThreadPoolExecutor
線程池處理任務(wù)如果拋出異常,會(huì)導(dǎo)致線程池不調(diào)度;下面就通過(guò)一個(gè)例子簡(jiǎn)單分析下為什么異常會(huì)導(dǎo)致 ScheduledThreadPoolExecutor
不執(zhí)行。
ScheduledThreadPoolExecutor不調(diào)度分析
示例程序
在示例程序可以看到當(dāng)計(jì)數(shù)器中的計(jì)數(shù)達(dá)到5的時(shí)候就會(huì)主動(dòng)拋出一個(gè)異常,拋出異常后 ScheduledThreadPoolExecutor
就不調(diào)度了。
public class ScheduledTask { private static final AtomicInteger count = new AtomicInteger(0); private static final ScheduledThreadPoolExecutor SCHEDULED_TASK = new ScheduledThreadPoolExecutor( 1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "sc-task"); t.setDaemon(true); return t; } }); public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); SCHEDULED_TASK.scheduleWithFixedDelay(() -> { System.out.println(111); if (count.get() == 5) { throw new IllegalArgumentException("my exception"); } count.incrementAndGet(); }, 0, 5, TimeUnit.SECONDS); latch.await(); } }
源碼分析
- ScheduledThreadPoolExecutor#run
run方法內(nèi)部首先判斷任務(wù)是不是周期性的任務(wù),如果不是周期性任務(wù)通過(guò) ScheduledFutureTask.super.run();
執(zhí)行任務(wù);如果狀態(tài)是運(yùn)行中或shutdown,取消任務(wù)執(zhí)行;如果是周期性的任務(wù),通過(guò) ScheduledFutureTask.super.runAndReset()
執(zhí)行任務(wù)并且重新設(shè)置狀態(tài),成功了就會(huì)執(zhí)行 setNextRunTime
設(shè)置下次調(diào)度的時(shí)間,問(wèn)題就是出現(xiàn)在 ScheduledFutureTask.super.runAndReset()
,這里執(zhí)行任務(wù)出現(xiàn)了異常,導(dǎo)致結(jié)果為false,就不進(jìn)行下次調(diào)度時(shí)間設(shè)置等
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
- *FutureTask#runAndReset
在線程任務(wù)執(zhí)行過(guò)程中拋出異常,然后 catch
到了異常,最終導(dǎo)致這個(gè)方法返回false,然后 ScheduledThreadPoolExecutor#run
就不設(shè)置下次執(zhí)行時(shí)間了,代碼 c.call();
拋出異常,跳過(guò) ran = true;
代碼,最終 runAndReset
返回false。
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
注意:
Java
的 ScheduledThreadPoolExecutor
定時(shí)任務(wù)線程池所調(diào)度的任務(wù)中如果拋出了異常,并且異常沒(méi)有捕獲直接拋到框架中,會(huì)導(dǎo)致 ScheduledThreadPoolExecutor
定時(shí)任務(wù)不調(diào)度了,具體是因?yàn)楫?dāng)異常拋到 ScheduledThreadPoolExecutor
框架中時(shí)不進(jìn)行下次調(diào)度時(shí)間的設(shè)置,從而導(dǎo)致 ScheduledThreadPoolExecutor
定時(shí)任務(wù)不調(diào)度。
ScheduledThreadPoolExecutor線程池例子
ScheduledThreadPoolExecutor 使用
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor,主要用來(lái)給定時(shí)間運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)。
在 ScheduledThreadPoolExecutor 的實(shí)現(xiàn)中,使用了 FutureTask 運(yùn)行任務(wù)以及使用無(wú)界隊(duì)列 DelayedWorkQueue 來(lái)保存任務(wù)。
1. 使用示例
- 提交任務(wù)
ScheduledThreadPoolExecutor 實(shí)現(xiàn)了 ScheduledExecutorService 接口,其中,接口中有四個(gè)需要實(shí)現(xiàn)的方法,其中 schedule() 的兩個(gè)方法需要設(shè)置任務(wù)以及任務(wù)啟動(dòng)的延遲時(shí)間,scheduleAtFixedRate() 可以設(shè)置任務(wù)定時(shí)重復(fù)執(zhí)行,scheduleWithFixedDelay() 則是設(shè)置兩個(gè)任務(wù)之間的執(zhí)行延遲時(shí)間。
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2); ? poolExecutor.schedule(() -> { ? ?? ?// 提交的任務(wù) }, 5, TimeUnit.HOURS); ? poolExecutor.scheduleAtFixedRate(() -> { ? ?? ?// 提交的任務(wù) }, 0, 5, TimeUnit.HOURS); ? poolExecutor.scheduleWithFixedDelay(() -> { ? ?? ?// 提交的任務(wù) }, 0, 5, TimeUnit.HOURS);
- 簡(jiǎn)單例子
下面的例子中每 500 毫秒打印一次字符串,executor 會(huì)有 5 秒的時(shí)間來(lái)等待任務(wù)執(zhí)行結(jié)束,也就是一共可以打印 10 次字符串。
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10); ? executor.scheduleWithFixedDelay(() -> { ? ? ? System.out.println("測(cè)試"); ? }, 0, 500, TimeUnit.MILLISECONDS); ? try { ? ? ? executor.awaitTermination(5, TimeUnit.SECONDS); ? ? ? executor.shutdown(); ? } catch (InterruptedException e) { ? ? ? e.printStackTrace(); ? }
ScheduledThreadPoolExecutor 原理
1. DelayedWorkQueue
ScheduledThreadPoolExecutor 的構(gòu)造方法對(duì) DelayedWorkQueue 進(jìn)行了初始化,并且最大線程數(shù)量設(shè)置成了 Integer.MAX_VALUE。
public ScheduledThreadPoolExecutor(int corePoolSize) { ? ? ? super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, ? ? ? ? ? ? new DelayedWorkQueue()); ? }
其中,DelayedWorkQueue 中的隊(duì)列是 RunnableScheduledFuture 類(lèi)型以及容量為 16 的數(shù)組。
private static final int INITIAL_CAPACITY = 16; ? private RunnableScheduledFuture<?>[] queue = ? ? ? new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; ? private final ReentrantLock lock = new ReentrantLock();
隊(duì)列添加任務(wù)是以 DelayedWorkQueue 以堆作為數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)任務(wù),在添加元素的時(shí)候,會(huì)使用基于 Siftup 版本進(jìn)行元素添加,并且會(huì)根據(jù)任務(wù)的執(zhí)行時(shí)間的大小來(lái)排序。
public boolean offer(Runnable x) { ? ? ? if (x == null) ? ? ? ? ? throw new NullPointerException(); ? ? RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; ? ? ? final ReentrantLock lock = this.lock; ? ? ? lock.lock(); ? ? try { ?? ? ? ?// 當(dāng)隊(duì)列的容量不夠,會(huì)擴(kuò)充 50% ? ? ? ? int i = size; ? ? ? ? if (i >= queue.length) ? ? ? ? ? ? grow(); ? ? ? ? size = i + 1; ? ? ? ? if (i == 0) { ? ? ? ? ? ? queue[0] = e; ? ? ? ? ? ? setIndex(e, 0); ? ? ? ? } else { ? ? ? ? ? ? ? siftUp(i, e); ? ? ? ? ? } ? ? ? ? ? if (queue[0] == e) { ? ? ? ? ? ? ? leader = null; ? ? ? ? ? ? ? available.signal(); ? ? ? ? ? } ? ? ? } finally { ? ? ? ? ? lock.unlock(); ? ? ? } ? ? ? return true; ? }
2. delayedExecute()
ScheduledExecutorService 接口的四個(gè)實(shí)現(xiàn)方法中都涉及到了 delayedExecute(),方法主要用來(lái)判斷線程池的狀態(tài)以及對(duì)線程進(jìn)行初始化。
private void delayedExecute(RunnableScheduledFuture<?> task) { ?? ?// 如果線程池關(guān)閉了,需要執(zhí)行飽和策略 ? ? if (isShutdown()) ? ? ? ? ? reject(task); ? ? ? else { ?? ? ? ?// 添加到隊(duì)列中 ? ? ? ? super.getQueue().add(task); ? ? ? ? ? if (isShutdown() && ? ? ? ? ? ? ? !canRunInCurrentRunState(task.isPeriodic()) && ? ? ? ? ? ? ? remove(task)) ? ? ? ? ? ? ? task.cancel(false); ? ? ? ? ? else ?? ? ? ? ? ?// 判斷等待隊(duì)列中是否已經(jīng)滿了,會(huì)使用到 ThreadPoolExecutor ?? ? ? ? ? ?ensurePrestart(); ? ? ? } ? } void ensurePrestart() { ? ? ? int wc = workerCountOf(ctl.get()); ? ? ? if (wc < corePoolSize) ? ? ? ? ? addWorker(null, true); ? ? ? else if (wc == 0) ? ? ? ? ? addWorker(null, false); ? }
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java8中方便又實(shí)用的Map函數(shù)總結(jié)
java8之后,常用的Map接口中添加了一些非常實(shí)用的函數(shù),可以大大簡(jiǎn)化一些特定場(chǎng)景的代碼編寫(xiě),提升代碼可讀性,快跟隨小編一起來(lái)看看吧2022-11-11SpringBoot下載Excel文件時(shí),報(bào)錯(cuò)文件損壞的解決方案
這篇文章主要介紹了SpringBoot下載Excel文件時(shí),報(bào)錯(cuò)文件損壞的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06詳解Spring Boot應(yīng)用的啟動(dòng)和停止(start啟動(dòng))
這篇文章主要介紹了詳解Spring Boot應(yīng)用的啟動(dòng)和停止(start啟動(dòng)),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-12-12如何在Spring?Boot微服務(wù)使用ValueOperations操作Redis集群String字符串
這篇文章主要介紹了在Spring?Boot微服務(wù)使用ValueOperations操作Redis集群String字符串類(lèi)型數(shù)據(jù),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-06-06詳談jvm--Java中init和clinit的區(qū)別
下面小編就為大家?guī)?lái)一篇詳談jvm--Java中init和clinit的區(qū)別。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-10-10Java輕松使用工具類(lèi)實(shí)現(xiàn)獲取MP3音頻時(shí)長(zhǎng)
在Java中,工具類(lèi)定義了一組公共方法,這篇文章將介紹Java中使用工具類(lèi)來(lái)獲取一個(gè)MP3音頻文件的時(shí)間長(zhǎng)度,感興趣的同學(xué)繼續(xù)往下閱讀吧2021-10-10實(shí)現(xiàn)一個(gè)簡(jiǎn)單Dubbo完整過(guò)程詳解
這篇文章主要為大家介紹了實(shí)現(xiàn)一個(gè)簡(jiǎn)單Dubbo完整過(guò)程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07MyBatis?Plus實(shí)現(xiàn)中文排序的兩種有效方法
在MyBatis?Plus項(xiàng)目開(kāi)發(fā)中,針對(duì)中文數(shù)據(jù)的排序需求是一個(gè)常見(jiàn)的挑戰(zhàn),尤其是在需要按照拼音或特定語(yǔ)言邏輯排序時(shí),本文整合了兩種有效的方法,旨在幫助開(kāi)發(fā)者克服MyBatis?Plus在處理中文排序時(shí)遇到的障礙,需要的朋友可以參考下2024-08-08Java字符串操作全解析之語(yǔ)法、示例與應(yīng)用場(chǎng)景分析
在Java算法題和日常開(kāi)發(fā)中,字符串處理是必備的核心技能,本文全面梳理Java中字符串的常用操作語(yǔ)法,結(jié)合代碼示例、應(yīng)用場(chǎng)景和避坑指南,可快速掌握字符串處理技巧,輕松應(yīng)對(duì)筆試面試高頻題目,感興趣的朋友一起看看吧2025-04-04java實(shí)現(xiàn)文件讀寫(xiě)與壓縮實(shí)例
這篇文章主要介紹了java實(shí)現(xiàn)文件讀寫(xiě)與壓縮實(shí)例,有助于讀者加深對(duì)文件操作的理解,需要的朋友可以參考下2014-07-07