欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

關(guān)于ScheduledThreadPoolExecutor不執(zhí)行的原因分析

 更新時(shí)間:2023年08月10日 14:46:32   作者:Redick01  
這篇文章主要介紹了關(guān)于ScheduledThreadPoolExecutor不執(zhí)行的原因分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

ScheduledThreadPoolExecutor不執(zhí)行原因分析

最近在調(diào)試一個(gè)監(jiān)控應(yīng)用指標(biāo)的時(shí)候發(fā)現(xiàn)定時(shí)器在服務(wù)啟動(dòng)執(zhí)行一次之后就不執(zhí)行了,這里用的定時(shí)器是Java的調(diào)度線程池 ScheduledThreadPoolExecutor ,后來(lái)經(jīng)過(guò)排查發(fā)現(xiàn) ScheduledThreadPoolExecutor 線程池處理任務(wù)如果拋出異常,會(huì)導(dǎo)致線程池不調(diào)度;下面就通過(guò)一個(gè)例子簡(jiǎn)單分析下為什么異常會(huì)導(dǎo)致 ScheduledThreadPoolExecutor 不執(zhí)行。

ScheduledThreadPoolExecutor不調(diào)度分析

示例程序

在示例程序可以看到當(dāng)計(jì)數(shù)器中的計(jì)數(shù)達(dá)到5的時(shí)候就會(huì)主動(dòng)拋出一個(gè)異常,拋出異常后 ScheduledThreadPoolExecutor 就不調(diào)度了。

public class ScheduledTask {
    private static final AtomicInteger count = new AtomicInteger(0);
    private static final ScheduledThreadPoolExecutor SCHEDULED_TASK = new ScheduledThreadPoolExecutor(
            1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "sc-task");
            t.setDaemon(true);
            return t;
        }
    });
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        SCHEDULED_TASK.scheduleWithFixedDelay(() -> {
            System.out.println(111);
            if (count.get() == 5) {
                throw new IllegalArgumentException("my exception");
            }
            count.incrementAndGet();
        }, 0, 5, TimeUnit.SECONDS);
        latch.await();
    }
}

源碼分析

  • ScheduledThreadPoolExecutor#run

run方法內(nèi)部首先判斷任務(wù)是不是周期性的任務(wù),如果不是周期性任務(wù)通過(guò) ScheduledFutureTask.super.run(); 執(zhí)行任務(wù);如果狀態(tài)是運(yùn)行中或shutdown,取消任務(wù)執(zhí)行;如果是周期性的任務(wù),通過(guò) ScheduledFutureTask.super.runAndReset() 執(zhí)行任務(wù)并且重新設(shè)置狀態(tài),成功了就會(huì)執(zhí)行 setNextRunTime 設(shè)置下次調(diào)度的時(shí)間,問(wèn)題就是出現(xiàn)在 ScheduledFutureTask.super.runAndReset() ,這里執(zhí)行任務(wù)出現(xiàn)了異常,導(dǎo)致結(jié)果為false,就不進(jìn)行下次調(diào)度時(shí)間設(shè)置等

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
  • *FutureTask#runAndReset

在線程任務(wù)執(zhí)行過(guò)程中拋出異常,然后 catch 到了異常,最終導(dǎo)致這個(gè)方法返回false,然后 ScheduledThreadPoolExecutor#run 就不設(shè)置下次執(zhí)行時(shí)間了,代碼 c.call(); 拋出異常,跳過(guò) ran = true; 代碼,最終 runAndReset 返回false。

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

注意:

Java ScheduledThreadPoolExecutor 定時(shí)任務(wù)線程池所調(diào)度的任務(wù)中如果拋出了異常,并且異常沒(méi)有捕獲直接拋到框架中,會(huì)導(dǎo)致 ScheduledThreadPoolExecutor 定時(shí)任務(wù)不調(diào)度了,具體是因?yàn)楫?dāng)異常拋到 ScheduledThreadPoolExecutor 框架中時(shí)不進(jìn)行下次調(diào)度時(shí)間的設(shè)置,從而導(dǎo)致 ScheduledThreadPoolExecutor 定時(shí)任務(wù)不調(diào)度。

ScheduledThreadPoolExecutor線程池例子

ScheduledThreadPoolExecutor 使用

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor,主要用來(lái)給定時(shí)間運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)。

在 ScheduledThreadPoolExecutor 的實(shí)現(xiàn)中,使用了 FutureTask 運(yùn)行任務(wù)以及使用無(wú)界隊(duì)列 DelayedWorkQueue 來(lái)保存任務(wù)。

1. 使用示例

  • 提交任務(wù)

ScheduledThreadPoolExecutor 實(shí)現(xiàn)了 ScheduledExecutorService 接口,其中,接口中有四個(gè)需要實(shí)現(xiàn)的方法,其中 schedule() 的兩個(gè)方法需要設(shè)置任務(wù)以及任務(wù)啟動(dòng)的延遲時(shí)間,scheduleAtFixedRate() 可以設(shè)置任務(wù)定時(shí)重復(fù)執(zhí)行,scheduleWithFixedDelay() 則是設(shè)置兩個(gè)任務(wù)之間的執(zhí)行延遲時(shí)間。

ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2); ?
poolExecutor.schedule(() -> { ?
?? ?// 提交的任務(wù)
}, 5, TimeUnit.HOURS); ?
poolExecutor.scheduleAtFixedRate(() -> { ?
?? ?// 提交的任務(wù)
}, 0, 5, TimeUnit.HOURS); ?
poolExecutor.scheduleWithFixedDelay(() -> { ?
?? ?// 提交的任務(wù)
}, 0, 5, TimeUnit.HOURS);
  • 簡(jiǎn)單例子

下面的例子中每 500 毫秒打印一次字符串,executor 會(huì)有 5 秒的時(shí)間來(lái)等待任務(wù)執(zhí)行結(jié)束,也就是一共可以打印 10 次字符串。

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10); ?
executor.scheduleWithFixedDelay(() -> { ?
? ? System.out.println("測(cè)試"); ?
}, 0, 500, TimeUnit.MILLISECONDS); ?
try { ?
? ? executor.awaitTermination(5, TimeUnit.SECONDS); ?
? ? executor.shutdown(); ?
} catch (InterruptedException e) { ?
? ? e.printStackTrace(); ?
}

ScheduledThreadPoolExecutor 原理

1. DelayedWorkQueue

ScheduledThreadPoolExecutor 的構(gòu)造方法對(duì) DelayedWorkQueue 進(jìn)行了初始化,并且最大線程數(shù)量設(shè)置成了 Integer.MAX_VALUE。

public ScheduledThreadPoolExecutor(int corePoolSize) { ?
? ? super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, ?
? ? ? ? ? new DelayedWorkQueue()); ?
}

其中,DelayedWorkQueue 中的隊(duì)列是 RunnableScheduledFuture 類(lèi)型以及容量為 16 的數(shù)組。

private static final int INITIAL_CAPACITY = 16; ?
private RunnableScheduledFuture<?>[] queue = ?
? ? new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; ?
private final ReentrantLock lock = new ReentrantLock();

隊(duì)列添加任務(wù)是以 DelayedWorkQueue 以堆作為數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)任務(wù),在添加元素的時(shí)候,會(huì)使用基于 Siftup 版本進(jìn)行元素添加,并且會(huì)根據(jù)任務(wù)的執(zhí)行時(shí)間的大小來(lái)排序。

public boolean offer(Runnable x) { ?
? ? if (x == null) ?
? ? ? ? throw new NullPointerException();
? ? RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; ?
? ? final ReentrantLock lock = this.lock; ?
? ? lock.lock();
? ? try {
?? ? ? ?// 當(dāng)隊(duì)列的容量不夠,會(huì)擴(kuò)充 50%
? ? ? ? int i = size;
? ? ? ? if (i >= queue.length)
? ? ? ? ? ? grow();
? ? ? ? size = i + 1;
? ? ? ? if (i == 0) {
? ? ? ? ? ? queue[0] = e;
? ? ? ? ? ? setIndex(e, 0);
? ? ? ? } else { ?
? ? ? ? ? ? siftUp(i, e); ?
? ? ? ? } ?
? ? ? ? if (queue[0] == e) { ?
? ? ? ? ? ? leader = null; ?
? ? ? ? ? ? available.signal(); ?
? ? ? ? } ?
? ? } finally { ?
? ? ? ? lock.unlock(); ?
? ? } ?
? ? return true; ?
}

2. delayedExecute()

ScheduledExecutorService 接口的四個(gè)實(shí)現(xiàn)方法中都涉及到了 delayedExecute(),方法主要用來(lái)判斷線程池的狀態(tài)以及對(duì)線程進(jìn)行初始化。

private void delayedExecute(RunnableScheduledFuture<?> task) {
?? ?// 如果線程池關(guān)閉了,需要執(zhí)行飽和策略
? ? if (isShutdown()) ?
? ? ? ? reject(task); ?
? ? else {
?? ? ? ?// 添加到隊(duì)列中
? ? ? ? super.getQueue().add(task); ?
? ? ? ? if (isShutdown() && ?
? ? ? ? ? ? !canRunInCurrentRunState(task.isPeriodic()) && ?
? ? ? ? ? ? remove(task)) ?
? ? ? ? ? ? task.cancel(false); ?
? ? ? ? else
?? ? ? ? ? ?// 判斷等待隊(duì)列中是否已經(jīng)滿了,會(huì)使用到 ThreadPoolExecutor
?? ? ? ? ? ?ensurePrestart(); ?
? ? } ?
}
void ensurePrestart() { ?
? ? int wc = workerCountOf(ctl.get()); ?
? ? if (wc < corePoolSize) ?
? ? ? ? addWorker(null, true); ?
? ? else if (wc == 0) ?
? ? ? ? addWorker(null, false); ?
}

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論