欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文搞懂Java的ThreadPoolExecutor原理

 更新時(shí)間:2023年06月13日 09:19:54   作者:半夏之沫  
都說經(jīng)典的就是好的,這句話放在Java的ThreadPoolExecutor上那是一點(diǎn)都沒錯(cuò),像現(xiàn)在數(shù)據(jù)庫連接的池化實(shí)現(xiàn),或者像Tomcat這種WEB服務(wù)器的線程管理,處處都有著ThreadPoolExecutor的影子,本篇文章將結(jié)合源碼實(shí)現(xiàn),對(duì)ThreadPoolExecutor的原理進(jìn)行一個(gè)深入學(xué)習(xí)

一. Executor框架簡介

Executor框架提供了組件來管理Java中的線程,Executor框架將其分為任務(wù),線程執(zhí)行任務(wù),任務(wù)執(zhí)行結(jié)果三部分。下面以表格形式對(duì)這三部分進(jìn)行說明。

項(xiàng)說明
任務(wù)Executor框架提供了Runnable接口和Callable接口,任務(wù)需要實(shí)現(xiàn)這兩個(gè)接口才能被線程執(zhí)行
線程執(zhí)行任務(wù)Executor框架提供了接口Executor和繼承于ExecutorExecutorService接口來定義任務(wù)執(zhí)行機(jī)制。Executor框架中的線程池類ThreadPoolExecutorScheduledThreadPoolExecutor均實(shí)現(xiàn)了ExecutorService接口
任務(wù)執(zhí)行結(jié)果Executor框架提供了Future接口和實(shí)現(xiàn)了Future接口的FutureTask類來定義任務(wù)執(zhí)行結(jié)果。

組件之間的類圖關(guān)系如下所示。

Executor接口是線程池的頂層接口,通常說到的線程池指的是ThreadPoolExecutor,同時(shí)ThreadPoolExecutor還有一個(gè)子類叫做ScheduledThreadPoolExecutor,其擴(kuò)展實(shí)現(xiàn)了延時(shí)執(zhí)行任務(wù)定時(shí)執(zhí)行任務(wù)的功能。

Executor框架指的是任務(wù)執(zhí)行任務(wù)的線程池任務(wù)執(zhí)行結(jié)果這三部分,切不可將Executor框架與Executor接口相混淆。

本篇文章就將對(duì)Executor框架中的ThreadPoolExecutor的源碼實(shí)現(xiàn)進(jìn)行學(xué)習(xí)。

二. 認(rèn)識(shí)ThreadPoolExecutor狀態(tài)

在學(xué)習(xí)ThreadPoolExecutor如何執(zhí)行任務(wù)前,先認(rèn)識(shí)一下ThreadPoolExecutor的狀態(tài)。

ThreadPoolExecutor繼承于AbstractExecutorService,并實(shí)現(xiàn)了ExecutorService接口,是Executor框架的核心類,用于管理線程。

ThreadPoolExecutor使用了原子整型ctl來表示線程池狀態(tài)和Worker數(shù)量。ctl是一個(gè)原子整型,前3位表示線程池狀態(tài),后29位表示Worker數(shù)量。ThreadPoolExecutor中這部分的源碼如下所示。

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 取整型前3位,即獲取線程池狀態(tài)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 取整型后29位,即獲取Worker數(shù)量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 根據(jù)線程池狀態(tài)和Worker數(shù)量拼裝ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 線程池狀態(tài)判斷
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 線程池狀態(tài)判斷
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    // 判斷線程池狀態(tài)是否為RUNNING
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    ......
}

可知ThreadPoolExecutor有如下五種線程池狀態(tài)。

  • RUNNING,線程池接受新任務(wù),會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),ctl前三位表示為111;
  • SHUTDOWN,線程池拒絕新任務(wù),會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),ctl前三位表示為000;
  • STOP,線程池拒絕新任務(wù),不會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),嘗試中斷正在執(zhí)行的任務(wù),ctl前三位表示為001;
  • TIDYING,所有任務(wù)被關(guān)閉,Worker數(shù)量為0,ctl前三位表示為010;
  • TERMINATEDterminated() 執(zhí)行完畢,ctl前三位表示為011。

得益于ctl的結(jié)構(gòu),所以無論Worker數(shù)量是多少,ThreadPoolExecutor中線程池狀態(tài)存在如下關(guān)系。

RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

因此runStateLessThan()runStateAtLeast()isRunning() 方法可以方便的對(duì)線程池狀態(tài)進(jìn)行判斷。

三. 執(zhí)行任務(wù)源碼分析

作為線程池,ThreadPoolExecutor最重要也最經(jīng)典的地方,當(dāng)然就是執(zhí)行任務(wù)了。本節(jié)對(duì)ThreadPoolExecutor執(zhí)行任務(wù)的流程進(jìn)行一個(gè)學(xué)習(xí)。

ThreadPoolExecutor中執(zhí)行任務(wù)的入口方法為execute(),其實(shí)現(xiàn)如下。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker并執(zhí)行任務(wù)
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果Worker數(shù)量大于等于核心線程數(shù),則將任務(wù)添加到任務(wù)阻塞隊(duì)列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果線程池狀態(tài)突然不再是RUNNING,則嘗試將任務(wù)從任務(wù)阻塞隊(duì)列中刪除,刪除成功則為該任務(wù)執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池中Worker數(shù)量突然為0,則創(chuàng)建一個(gè)Worker來執(zhí)行任務(wù)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 執(zhí)行到這里表示線程池狀態(tài)已經(jīng)不是RUNNING或者任務(wù)阻塞隊(duì)列已滿
    // 此時(shí)嘗試新建一個(gè)Worker來執(zhí)行任務(wù)
    // 如果新建一個(gè)Worker來執(zhí)行任務(wù)失敗,表明線程池狀態(tài)不再是RUNNING或者Worker數(shù)量已經(jīng)達(dá)到最大線程數(shù),此時(shí)執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

execute() 中會(huì)根據(jù)Worker數(shù)量和線程池狀態(tài)來決定是新建Worker來執(zhí)行任務(wù)還是將任務(wù)添加到任務(wù)阻塞隊(duì)列。新建Worker來執(zhí)行任務(wù)的實(shí)現(xiàn)如下所示。

private boolean addWorker(Runnable firstTask, boolean core) {
    // 標(biāo)記外層for循環(huán)
    retry:
    for (;;) {
        int c = ctl.get();
        // 獲取線程池狀態(tài)
        int rs = runStateOf(c);
        // 線程池狀態(tài)為RUNNING時(shí),可以創(chuàng)建Worker
        // 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空時(shí),可以創(chuàng)建初始任務(wù)為null的Worker
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 獲取Worker數(shù)量
            int wc = workerCountOf(c);
            // 如果Worker數(shù)量大于CAPACITY,拒絕創(chuàng)建Worker
            // core為true表示創(chuàng)建核心線程Worker,如果Worker數(shù)量此時(shí)已經(jīng)大于等于核心線程數(shù),則拒絕創(chuàng)建Worker,轉(zhuǎn)而應(yīng)該將任務(wù)添加到任務(wù)阻塞隊(duì)列
            // core為false表示創(chuàng)建非核心線程Worker,如果Worker數(shù)量此時(shí)已經(jīng)大于等于最大線程數(shù),則拒絕創(chuàng)建Worker,轉(zhuǎn)而應(yīng)該執(zhí)行拒絕策略
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 以CAS方式將Worker數(shù)量加1
            // 加1成功表明無競爭發(fā)生,從外層for循環(huán)跳出
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 加1失敗表明有競爭發(fā)生,此時(shí)需要重新獲取ctl的值
            c = ctl.get();
            // 重新獲取ctl后如果發(fā)現(xiàn)線程池狀態(tài)發(fā)生了改變,此時(shí)重新執(zhí)行外層for循環(huán),即需要基于新的線程池狀態(tài)判斷是否允許創(chuàng)建Worker
            // 重新獲取ctl后如果線程池狀態(tài)未發(fā)生改變,則繼續(xù)執(zhí)行內(nèi)層for循環(huán),即嘗試再一次以CAS方式將Worker數(shù)量加1
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 創(chuàng)建一個(gè)Worker
        w = new Worker(firstTask);
        // 獲取Worker的線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 由于線程池中存儲(chǔ)Worker的集合為HashSet,因此將Worker添加到Worker集合時(shí)需要獲取全局鎖保證線程安全
            mainLock.lock();
            try {
                // 再一次獲取線程池狀態(tài)
                int rs = runStateOf(ctl.get());
                // 如果線程池狀態(tài)還是為RUNNING或者線程池狀態(tài)為SHUTDOWN但創(chuàng)建的Worker的初始任務(wù)為null,則允許將創(chuàng)建出來的Worker添加到集合
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 檢查一下Worker的線程是否可以啟動(dòng)(處于活動(dòng)狀態(tài)的線程無法再啟動(dòng))
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 將Worker添加到Worker集合
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize用于記錄線程池最多存在過的Worker數(shù)
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動(dòng)Worker線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // Worker線程沒有成功啟動(dòng)起來,此時(shí)需要對(duì)該Worker的創(chuàng)建執(zhí)行回滾操作
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker() 方法中只允許兩種情況可以創(chuàng)建Worker。

  • 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;
  • 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空,可以創(chuàng)建初始任務(wù)為nullWorker。

一旦Worker創(chuàng)建成功,就會(huì)將Worker的線程啟動(dòng),如果Worker創(chuàng)建失敗或者Worker的線程啟動(dòng)失敗,則會(huì)調(diào)用addWorkerFailed() 方法執(zhí)行回滾操作,其實(shí)現(xiàn)如下所示。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 如果Worker添加到了Worker集合中,則將Worker從Worker集合中刪除
        if (w != null)
            workers.remove(w);
        // 以CAS方式將Worker數(shù)量減1
        decrementWorkerCount();
        // 嘗試終止線程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

由于Worker自身實(shí)現(xiàn)了Runnable,因此Worker自身就是一個(gè)任務(wù),實(shí)際上Worker的線程執(zhí)行的任務(wù)就是Worker本身,因此addWorker() 中將Worker的線程啟動(dòng)時(shí),會(huì)調(diào)用Workerrun() 方法,其實(shí)現(xiàn)如下。

public void run() {
    runWorker(this);
}

Workerrun() 方法中調(diào)用了ThreadPoolExecutorrunWorker() 方法,其實(shí)現(xiàn)如下所示。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 如果task為null,則從任務(wù)阻塞隊(duì)列中獲取任務(wù)
        // 通常Worker啟動(dòng)時(shí)會(huì)先執(zhí)行初始任務(wù),然后再去任務(wù)阻塞隊(duì)列中獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 線程池正在停止時(shí),需要確保當(dāng)前Worker的線程是被中斷的
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Worker執(zhí)行任務(wù)發(fā)生異常或者從getTask()中獲取任務(wù)為空時(shí)會(huì)執(zhí)行這里的邏輯
        // processWorkerExit()會(huì)將Worker從Worker集合中刪除,并嘗試終止線程池
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker() 方法就是先讓Worker將初始任務(wù)(如果有的話)執(zhí)行完,然后循環(huán)從任務(wù)阻塞隊(duì)列中獲取任務(wù)來執(zhí)行,如果Worker執(zhí)行任務(wù)發(fā)生異?;蛘邚娜蝿?wù)阻塞隊(duì)列獲取任務(wù)失?。ǐ@取到的任務(wù)為null),則調(diào)用processWorkerExit() 方法來將自身從Worker集合中刪除。下面先看一下getTask() 方法的實(shí)現(xiàn)。

private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 如果線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列為空,則不再允許從任務(wù)阻塞隊(duì)列中獲取任務(wù)
        // 如果線程池狀態(tài)為STOP,則不再允許從任務(wù)阻塞隊(duì)列中獲取任務(wù)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 如果allowCoreThreadTimeOut為true,或者當(dāng)前線程數(shù)大于核心線程數(shù),此時(shí)timed為true,表明從任務(wù)阻塞隊(duì)列以超時(shí)退出的方式獲取任務(wù)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 如果當(dāng)前線程數(shù)大于最大線程數(shù),則當(dāng)前Worker應(yīng)該被刪除
        // 如果當(dāng)前Worker上一次從任務(wù)阻塞隊(duì)列中獲取任務(wù)時(shí)超時(shí)退出,且任務(wù)阻塞隊(duì)列現(xiàn)在還是為空,則當(dāng)前Worker應(yīng)該被刪除
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 從任務(wù)阻塞隊(duì)列中獲取任務(wù)
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                // 獲取到任務(wù)則返回該任務(wù)
                return r;
            // timedOut為true表明Worker上一次從任務(wù)阻塞隊(duì)列中獲取任務(wù)時(shí)超時(shí)退出
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask() 方法在如下情況不允許Worker從任務(wù)阻塞隊(duì)列中獲取任務(wù)。

  • 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列為空;
  • 線程池狀態(tài)為STOP。

如果Worker有資格從任務(wù)阻塞隊(duì)列獲取任務(wù),那么當(dāng)allowCoreThreadTimeOuttrue,或者當(dāng)前線程數(shù)大于核心線程數(shù)時(shí),Worker以超時(shí)退出的方式獲取任務(wù),否則Worker以一直阻塞的方式獲取任務(wù)。

當(dāng)WorkergetTask() 方法中獲取任務(wù)失敗時(shí),getTask() 方法會(huì)返回null,從而導(dǎo)致Worker會(huì)執(zhí)行processWorkerExit() 方法來刪除自身,其實(shí)現(xiàn)如下所示。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly為true表明是執(zhí)行任務(wù)時(shí)發(fā)生異常導(dǎo)致Worker需要被刪除
    if (completedAbruptly)
        // 修正Worker數(shù)量
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 將Worker從Worker集合中刪除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 嘗試終止線程池
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false);
    }
}

WorkerprocessWorkerExit() 方法中刪除自身之后,還會(huì)調(diào)用tryTerminate() 嘗試終止線程池,tryTerminate() 方法很精髓,后面會(huì)對(duì)其進(jìn)行詳細(xì)分析,這里暫且不談。至此,Worker的創(chuàng)建,執(zhí)行任務(wù),獲取任務(wù)和刪除的整個(gè)流程已經(jīng)大體分析完畢。

對(duì)于執(zhí)行任務(wù),現(xiàn)在簡單進(jìn)行一個(gè)小結(jié)。

ThreadPoolExecutor執(zhí)行任務(wù),第一步是根據(jù)Worker數(shù)量來決定是新建Worker來執(zhí)行任務(wù)還是將任務(wù)添加到任務(wù)阻塞隊(duì)列,這里的判斷規(guī)則如下。

  • 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker來執(zhí)行任務(wù);
  • 如果Worker數(shù)量大于等于核心線程數(shù),則將任務(wù)添加到任務(wù)阻塞隊(duì)列;
  • 如果任務(wù)阻塞隊(duì)列已滿,則創(chuàng)建Worker來執(zhí)行任務(wù);
  • 如果Worker數(shù)量已經(jīng)達(dá)到最大線程數(shù),此時(shí)執(zhí)行任務(wù)拒絕策略。

當(dāng)要新建Worker來執(zhí)行任務(wù)時(shí),只有兩種情況可以新建Worker,如下所示。

  • 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;
  • 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空,可以創(chuàng)建初始任務(wù)為nullWorker。

Worker自身實(shí)現(xiàn)了Runnable,且Worker持有一個(gè)線程,當(dāng)Worker啟動(dòng)時(shí),就是啟動(dòng)Worker持有的線程,而這個(gè)線程執(zhí)行的任務(wù)就是Worker自身。

Worker啟動(dòng)后,會(huì)首先執(zhí)行自己的初始任務(wù),然后再去任務(wù)阻塞隊(duì)列中獲取任務(wù)。

四. 關(guān)閉線程池源碼分析

不再使用的線程池,可以進(jìn)行關(guān)閉。關(guān)閉ThreadPoolExecutor的方法有shutdown()shutdownNow(),本節(jié)將對(duì)ThreadPoolExecutor的關(guān)閉進(jìn)行分析。

1. shutdown()

首先分析shutdown() 方法,其實(shí)現(xiàn)如下。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 循環(huán)通過CAS方式將線程池狀態(tài)置為SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中斷空閑Worker
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    // 嘗試終止線程池
    tryTerminate();
}

shutdown() 方法中首先會(huì)將線程池狀態(tài)置為SHUTDOWN,然后調(diào)用interruptIdleWorkers() 方法中斷空閑Worker,最后調(diào)用tryTerminate() 方法來嘗試終止線程池。那么這里要解釋一下什么是空閑Worker,先看一下interruptIdleWorkers() 的實(shí)現(xiàn)。

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 中斷線程前需要先嘗試獲取Worker的鎖
            // 只能獲取到空閑Worker的鎖,所以shutdown()方法只會(huì)中斷空閑Worker
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

調(diào)用interruptIdleWorkers() 方法中斷Worker前首先需要嘗試獲取Worker的鎖,已知Worker除了實(shí)現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,然后在runWorker()Worker執(zhí)行任務(wù)前都會(huì)先獲取Worker的鎖,這里看一下Workerlock() 方法的實(shí)現(xiàn)。

public void lock() {
    acquire(1);
}
protected boolean tryAcquire(int unused) {
    // 以CAS方式將state從0設(shè)置為1
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

可以發(fā)現(xiàn),Workerlock() 中調(diào)用了acquire() 方法,該方法由AbstractQueuedSynchronizer抽象類提供,在acquire() 中會(huì)調(diào)用其子類實(shí)現(xiàn)的tryAcquire() 方法,tryAcquire() 方法會(huì)以CAS方式將state從0設(shè)置為1,因此這樣的設(shè)計(jì)讓Worker是一把不可重入鎖。

回到interruptIdleWorkers() 方法,前面提到該方法中斷Worker前會(huì)嘗試獲取Worker的鎖,能夠獲取到鎖才會(huì)中斷Worker,而因?yàn)?strong>Worker是不可重入鎖,所以正在執(zhí)行任務(wù)的Worker是無法獲取到鎖的,只有那些沒有執(zhí)行任務(wù)的Worker的鎖才能夠被獲取,因此所謂的中斷空閑Worker,實(shí)際就是中斷沒有執(zhí)行任務(wù)的Worker,那些執(zhí)行任務(wù)的Workershutdown() 方法被調(diào)用時(shí)不會(huì)被中斷,這些Worker執(zhí)行完任務(wù)后會(huì)繼續(xù)從任務(wù)阻塞隊(duì)列中獲取任務(wù)來執(zhí)行,直到任務(wù)阻塞隊(duì)列為空,此時(shí)沒有被中斷過的Worker也會(huì)被刪除掉,等到線程池中沒有Worker以及任務(wù)阻塞隊(duì)列沒有任務(wù)后,線程池才會(huì)被終止掉。

對(duì)于shutdown() 方法,一句話總結(jié)就是:將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務(wù),等到線程池Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空時(shí),關(guān)閉線程池。

2. shutdownNow()

現(xiàn)在再來分析shutdownNow() 方法。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 循環(huán)通過CAS方式將線程池狀態(tài)置為STOP
        advanceRunState(STOP);
        // 中斷所有Worker
        interruptWorkers();
        // 將任務(wù)阻塞隊(duì)列中的任務(wù)獲取出來并返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 嘗試終止線程池
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 中斷線程池中所有Worker
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

shutdownNow() 方法中首先會(huì)將線程池狀態(tài)置為STOP,然后調(diào)用interruptWorkers() 方法中斷線程池中的所有Worker,接著調(diào)用tryTerminate() 方法來嘗試終止線程池,最后shutdownNow() 方法會(huì)將任務(wù)阻塞隊(duì)列中還未被執(zhí)行的任務(wù)返回。

shutdownNow() 方法調(diào)用之后,線程池中的所有Worker都會(huì)被中斷,包括正在執(zhí)行任務(wù)的Worker,等到所有Worker都被刪除之后,線程池即被終止,也就是說,shutdownNow() 不會(huì)保證當(dāng)前時(shí)刻正在執(zhí)行的任務(wù)會(huì)被安全的執(zhí)行完,并且會(huì)放棄執(zhí)行任務(wù)阻塞隊(duì)列中的所有任務(wù)。

3. tryTerminate()

關(guān)于線程池的關(guān)閉,還有一個(gè)重要的方法,那就是前面多次提到的tryTerminate() 方法,該方法能確保線程池可以被正確的關(guān)閉,其實(shí)現(xiàn)如下所示。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 如果線程池狀態(tài)為RUNNING,則沒有資格終止線程池
        // 如果線程池狀態(tài)大于等于TIDYING,則沒有資格終止線程池
        // 如果線程池狀態(tài)為SHUTDOWN但任務(wù)阻塞隊(duì)列不為空,則沒有資格終止線程池
        if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 線程池狀態(tài)為SHUTDOWN且任務(wù)阻塞隊(duì)列為空會(huì)執(zhí)行到這里
        // 線程池狀態(tài)為STOP會(huì)執(zhí)行到這里
        // Worker數(shù)量不為0,表明當(dāng)前還有正在執(zhí)行任務(wù)的Worker或者空閑的Worker,此時(shí)中斷一個(gè)空閑的Worker
        // 在這里被中斷的空閑Worker會(huì)在getTask()方法中返回null,從而執(zhí)行processWorkerExit(),最終該Worker會(huì)被刪除
        // processWorkerExit()方法中又會(huì)調(diào)用tryTerminate(),因此將shutdown信號(hào)在空閑Worker之間進(jìn)行了傳播
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 將線程池狀態(tài)置為TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 終止線程池
                    terminated();
                } finally {
                    // 將線程池狀態(tài)最終置為TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

tryTerminate() 方法的官方注釋中給出了兩種線程池會(huì)被終止的情況:

  • 線程池的狀態(tài)為SHUTDOWN,Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空;
  • 線程池的狀態(tài)為STOP,Worker數(shù)量為0。

官方注釋中還說明在所有可能導(dǎo)致線程池終止的操作中都應(yīng)該調(diào)用tryTerminate() 方法來嘗試終止線程池,因此線程池中Worker被刪除時(shí)任務(wù)阻塞隊(duì)列中任務(wù)被刪除時(shí)會(huì)調(diào)用tryTerminate(),以達(dá)到在線程池符合終止條件時(shí)及時(shí)終止線程池。

4. 小結(jié)

對(duì)于關(guān)閉線程池,簡單小結(jié)如下。

關(guān)閉ThreadPoolExecutor有兩種方式,如下所示。

  • shutdown()。調(diào)用shutdown() 方法會(huì)首先將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務(wù),然后中斷空閑Worker,等到線程池中Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空時(shí),線程池被真正關(guān)閉;
  • shutdownNow()。調(diào)用shutdownNow() 方法會(huì)首先將線程池狀態(tài)置為STOP,然后中斷所有Worker(包括正在執(zhí)行任務(wù)的Worker),并將任務(wù)阻塞隊(duì)列中還未被執(zhí)行的任務(wù)返回,當(dāng)線程池Worker數(shù)量為0時(shí),線程池被真正關(guān)閉。

還有一點(diǎn)需要說明,Worker除了實(shí)現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,Worker執(zhí)行任務(wù)前都會(huì)先獲取Worker的鎖,所以正在執(zhí)行任務(wù)的Worker的鎖是無法被獲取的,換言之,只有沒有執(zhí)行任務(wù)的Worker的鎖才能被獲取,這些Worker就稱為空閑Worker

以上就是一文搞懂Java的ThreadPoolExecutor原理的詳細(xì)內(nèi)容,更多關(guān)于Java ThreadPoolExecutor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java接口和抽象類原理詳解

    Java接口和抽象類原理詳解

    這篇文章主要介紹了Java接口和抽象類原理詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • Spring?中的InitializingBean使用示例

    Spring?中的InitializingBean使用示例

    InitializingBean?是?Spring?框架中的一個(gè)接口,用于在?Spring?容器中初始化?bean?時(shí)執(zhí)行特定的初始化邏輯,這篇文章主要介紹了Spring?中的InitializingBean使用示例,需要的朋友可以參考下
    2024-08-08
  • spring boot配置讀寫分離的完整實(shí)現(xiàn)步驟

    spring boot配置讀寫分離的完整實(shí)現(xiàn)步驟

    數(shù)據(jù)庫配置主從之后,如何在代碼層面實(shí)現(xiàn)讀寫分離?所以下面這篇文章主要給大家介紹了關(guān)于spring boot配置讀寫分離的完整步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2018-09-09
  • 通過實(shí)例解析spring對(duì)象生命周期

    通過實(shí)例解析spring對(duì)象生命周期

    這篇文章主要介紹了通過實(shí)例解析spring對(duì)象生命周期,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • Servlet系列兩種創(chuàng)建方式

    Servlet系列兩種創(chuàng)建方式

    本文主要介紹了Servlet系列兩種創(chuàng)建方式,包含Servlet2.5之前使用和Servlet3.0后,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-02-02
  • Java結(jié)構(gòu)型設(shè)計(jì)模式之橋接模式詳細(xì)講解

    Java結(jié)構(gòu)型設(shè)計(jì)模式之橋接模式詳細(xì)講解

    橋接,顧名思義,就是用來連接兩個(gè)部分,使得兩個(gè)部分可以互相通訊。橋接模式將系統(tǒng)的抽象部分與實(shí)現(xiàn)部分分離解耦,使他們可以獨(dú)立的變化。本文通過示例詳細(xì)介紹了橋接模式的原理與使用,需要的可以參考一下
    2022-09-09
  • 使用postman傳遞list集合后臺(tái)springmvc接收

    使用postman傳遞list集合后臺(tái)springmvc接收

    這篇文章主要介紹了使用postman傳遞list集合后臺(tái)springmvc接收的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Maven中怎么手動(dòng)添加jar包到本地倉庫詳解(repository)

    Maven中怎么手動(dòng)添加jar包到本地倉庫詳解(repository)

    這篇文章主要給大家介紹了關(guān)于Maven中怎么手動(dòng)添加jar包到本地倉庫的相關(guān)資料,文中通過圖文以及實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2023-04-04
  • Java面試題沖刺第二天--Redis篇

    Java面試題沖刺第二天--Redis篇

    這篇文章主要為大家分享了最有價(jià)值的三道java面試題,涵蓋內(nèi)容全面,包括數(shù)據(jù)結(jié)構(gòu)和算法相關(guān)的題目、經(jīng)典面試編程題等,感興趣的小伙伴們可以參考一下
    2021-07-07
  • 解析SpringBoot中使用LoadTimeWeaving技術(shù)實(shí)現(xiàn)AOP功能

    解析SpringBoot中使用LoadTimeWeaving技術(shù)實(shí)現(xiàn)AOP功能

    這篇文章主要介紹了SpringBoot中使用LoadTimeWeaving技術(shù)實(shí)現(xiàn)AOP功能,AOP面向切面編程,通過為目標(biāo)類織入切面的方式,實(shí)現(xiàn)對(duì)目標(biāo)類功能的增強(qiáng),本文給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2022-09-09

最新評(píng)論