欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Java并發(fā)包中線程池ThreadPoolExecutor

 更新時間:2021年06月11日 10:03:05   作者:CoderSheeper  
ThreadPoolExecutor是Java語言對于線程池的實現(xiàn)。線程池技術(shù)使線程在使用完畢后不回收而是重復(fù)利用。如果線程能夠復(fù)用,那么我們就可以使用固定數(shù)量的線程來解決并發(fā)問題,這樣一來不僅節(jié)約了系統(tǒng)資源,而且也會減少線程上下文切換的開銷

一、線程池簡介

線程池的使用主要是解決兩個問題:①當(dāng)執(zhí)行大量異步任務(wù)的時候線程池能夠提供更好的性能,在不使用線程池時候,每當(dāng)需要執(zhí)行異步任務(wù)的時候直接new一個線程來運行的話,線程的創(chuàng)建和銷毀都是需要開銷的。而線程池中的線程是可復(fù)用的,不需要每次執(zhí)行異步任務(wù)的時候重新創(chuàng)建和銷毀線程;②線程池提供一種資源限制和管理的手段,比如可以限制線程的個數(shù),動態(tài)的新增線程等等。

在下面的分析中,我們可以看到,線程池使用一個Integer的原子類型變量來記錄線程池狀態(tài)和線程池中的線程數(shù)量,通過線程池狀態(tài)來控制任務(wù)的執(zhí)行,每個工作線程Worker線程可以處理多個任務(wù)。

二、ThreadPoolExecutor類

2.1、ThreadPoolExecutor成員變量以含義

ThreadPoolExecutor繼承了AbstractExecutorService,其中的成員變量ctl是一個Integer類型的原子變量,用來記錄線程池的狀態(tài)和線程池中的線程的個數(shù)。這里(Integer看做32位)ctl高三位表示線程池的狀態(tài),后面的29位表示線程池中的線程個數(shù)。如下所示是ThreadPoolExecutor源碼中的成員變量

//(高3位)表示線程池狀態(tài),(低29位)表示線程池中線程的個數(shù);
// 默認(rèn)狀態(tài)是RUNNING,線程池中線程個數(shù)為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//表示具體平臺下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)表示的數(shù)才是線程的個數(shù);
//其中Integer.SIZE=32,-3之后的低29位表示的就是線程的個數(shù)了
private static final int COUNT_BITS = Integer.SIZE - 3;

//線程最大個數(shù)(低29位)00011111111111111111111111111111(1<<29-1)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//線程池狀態(tài)(高3位表示線程池狀態(tài))
//111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

//獲取高3位(運行狀態(tài))==> c & 11100000000000000000000000000000
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//獲取低29位(線程個數(shù))==> c &  00011111111111111111111111111111
private static int workerCountOf(int c)  { return c & CAPACITY; }

//計算原子變量ctl新值(運行狀態(tài)和線程個數(shù))
private static int ctlOf(int rs, int wc) { return rs | wc; }

下面我們簡單解釋一下上面的線程狀態(tài)的含義:

①RUNNING:接受新任務(wù)并處理阻塞隊列中的任務(wù)

②SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊列中的任務(wù)

③STOP:拒絕新任務(wù)并拋棄阻塞隊列中的任務(wù),同時會中斷當(dāng)前正在執(zhí)行的任務(wù)

④TIDYING:所有任務(wù)執(zhí)行完之后(包含阻塞隊列中的任務(wù))當(dāng)前線程池中活躍的線程數(shù)量為0,將要調(diào)用terminated方法

⑥TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用之后的狀態(tài)

2.2、ThreadPoolExecutor的參數(shù)以及實現(xiàn)原理

①corePoolSize:線程池核心現(xiàn)車個數(shù)

②workQueue:用于保存等待任務(wù)執(zhí)行的任務(wù)的阻塞隊列(比如基于數(shù)組的有界阻塞隊列ArrayBlockingQueue、基于鏈表的無界阻塞隊列LinkedBlockingQueue等等)

③maximumPoolSize:線程池最大線程數(shù)量

④ThreadFactory:創(chuàng)建線程的工廠

⑤RejectedExecutionHandler:拒絕策略,表示當(dāng)隊列已滿并且線程數(shù)量達(dá)到線程池最大線程數(shù)量的時候?qū)π绿峤坏娜蝿?wù)所采取的策略,主要有四種策略:AbortPolicy(拋出異常)、CallerRunsPolicy(只用調(diào)用者所在線程來運行該任務(wù))、DiscardOldestPolicy(丟掉阻塞隊列中最近的一個任務(wù)來處理當(dāng)前提交的任務(wù))、DiscardPolicy(不做處理,直接丟棄掉)

⑥keepAliveTime:存活時間,如果當(dāng)前線程池中的數(shù)量比核心線程數(shù)量多,并且當(dāng)前線程是閑置狀態(tài),該變量就是這些線程的最大生存時間

⑦TimeUnit:存活時間的時間單位。

根據(jù)上面的參數(shù)介紹,簡單了解一下線程池的實現(xiàn)原理,以提交一個新任務(wù)為開始點,分析線程池的主要處理流程

2.3、關(guān)于一些線程池的使用類型

①newFixedThreadPool:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)均為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當(dāng)前空閑即回收。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

②newSingleThreadExecutor:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為1 的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當(dāng)前線程空閑即回收該線程。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

③newCachedThreadPoolExecutor:創(chuàng)建一個按需創(chuàng)建線程的線程池,初始線程個數(shù)為0,最多線程個數(shù)為Integer.MAX_VALUE,并且阻塞隊列為同步隊列(最多只有一個元素),keepAliveTime=60說明只要當(dāng)前線程在60s內(nèi)空閑則回收。這個類型的線程池的特點就是:加入同步隊列的任務(wù)會被馬上執(zhí)行,同步隊列中最多只有一個任務(wù)

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

2.4、ThreadPoolExecutor中的其他成員

//獨占鎖,用來控制新增工作線程Worker操作的原子性
private final ReentrantLock mainLock = new ReentrantLock();

//工作線程集合,Worker繼承了AQS接口和Runnable接口,是具體處理任務(wù)的線程對象
//Worker實現(xiàn)AQS,并自己實現(xiàn)了簡單不可重入獨占鎖,其中state=0表示當(dāng)前鎖未被獲取狀態(tài),state=1表示鎖被獲取,
//state=-1表示W(wǎng)ork創(chuàng)建時候的默認(rèn)狀態(tài),創(chuàng)建時候設(shè)置state=-1是為了防止runWorker方法運行前被中斷
private final HashSet<Worker> workers = new HashSet<Worker>();

//termination是該鎖對應(yīng)的條件隊列,在線程調(diào)用awaitTermination時候用來存放阻塞的線程
private final Condition termination = mainLock.newCondition();

三、execute(Runnable command)方法實現(xiàn)

executor方法的作用是提交任務(wù)command到線程池執(zhí)行,可以簡單的按照下面的圖進(jìn)行理解,ThreadPoolExecutor的實現(xiàn)類似于一個生產(chǎn)者消費者模型,當(dāng)用戶添加任務(wù)到線程池中相當(dāng)于生產(chǎn)者生產(chǎn)元素,workers工作線程則直接執(zhí)行任務(wù)或者從任務(wù)隊列中獲取任務(wù),相當(dāng)于消費之消費元素。

public void execute(Runnable command) {
    //(1)首先檢查任務(wù)是否為null,為null拋出異常,否則進(jìn)行下面的步驟
    if (command == null)
        throw new NullPointerException();
    //(2)ctl值中包含了當(dāng)前線程池的狀態(tài)和線程池中的線程數(shù)量
    int c = ctl.get();
    //(3)workerCountOf方法是獲取低29位,即獲取當(dāng)前線程池中的線程個數(shù),如果小于corePoolSize,就開啟新的線程運行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //(4)如果線程池處理RUNNING狀態(tài),就添加任務(wù)到阻塞隊列中
    if (isRunning(c) && workQueue.offer(command)) {
        //(4-1)二次檢查,獲取ctl值
        int recheck = ctl.get();
        //(4-2)如果當(dāng)前線程池不是出于RUNNING狀態(tài),就從隊列中刪除任務(wù),并執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //(4-3)否則,如果線程池為空,就添加一個線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //(5)如果隊列滿,則新增線程,如果新增線程失敗,就執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

我們在看一下上面代碼的執(zhí)行流程,按照標(biāo)記的數(shù)字進(jìn)行分析:

①步驟(3)判斷當(dāng)前線程池中的線程個數(shù)是否小于corePoolSize,如果小于核心線程數(shù),會向workers里面新增一個核心線程執(zhí)行任務(wù)。

②如果當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù),就執(zhí)行(4)。(4)首先判斷當(dāng)前線程池是否處于RUNNING狀態(tài),如果處于該狀態(tài),就添加任務(wù)到任務(wù)隊列中,這里需要判斷線程池的狀態(tài)是因為線程池可能已經(jīng)處于非RUNNING狀態(tài),而在非RUNNING狀態(tài)下是需要拋棄新任務(wù)的。

③如果想任務(wù)隊列中添加任務(wù)成功,需要進(jìn)行二次校驗,因為在添加任務(wù)到任務(wù)隊列后,可能線程池的狀態(tài)發(fā)生了變化,所以這里需要進(jìn)行二次校驗,如果當(dāng)前線程池已經(jīng)不是RUNNING狀態(tài)了,需要將任務(wù)從任務(wù)隊列中移除,然后執(zhí)行拒絕策略;如果二次校驗通過,則執(zhí)行4-3代碼重新判斷當(dāng)前線程池是否為空,如果線程池為空沒有線程,那么就需要新創(chuàng)建一個線程。

④如果上面的步驟(4)創(chuàng)建添加任務(wù)失敗,說明隊列已滿,那么(5)會嘗試再開啟新的線程執(zhí)行任務(wù)(類比上圖中的thread3和thread4,即不是核心線程的那些線程),如果當(dāng)前線程池中的線程個數(shù)已經(jīng)大于最大線程數(shù)maximumPoolSize,表示不能開啟新的線程。這就屬于線程池滿并且任務(wù)隊列滿,就需要執(zhí)行拒絕策略了。

下面我們在看看addWorker方法的實現(xiàn)

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //(6)檢查隊列是否只在必要時候為空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //(7)使用CAS增加線程個數(shù)
        for (;;) {
            //根據(jù)ctl值獲得當(dāng)前線程池中的線程數(shù)量
            int wc = workerCountOf(c);
            //(7-1)如果線程數(shù)量超出限制,返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //(7-2)CAS增加線程數(shù)量,同時只有一個線程可以成功
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // 重新讀取ctl值
            //(7-3)CAS失敗了,需要查看當(dāng)前線程池狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要跳轉(zhuǎn)到外層循環(huán)嘗試重新獲取線程池狀態(tài),否則內(nèi)層循環(huán)重新進(jìn)行CAS增加線程數(shù)量
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //(8)執(zhí)行到這里說明CAS增加新線程個數(shù)成功了,我們需要開始創(chuàng)建新的工作線程Worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //(8-1)創(chuàng)建新的worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //(8-2)加獨占鎖,保證workers的同步,可能線程池中的多個線程調(diào)用了線程池的execute方法
            mainLock.lock();
            try {
                // (8-3)重新檢查線程池狀態(tài),以免在獲取鎖之前調(diào)用shutdown方法改變線程池狀態(tài)
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //(8-4)添加新任務(wù)
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //(8-6)添加新任務(wù)成功之后,啟動任務(wù)
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

簡單再分析說明一下上面的代碼,addWorker方法主要分為兩部分,第一部分是使用CAS線程安全的添加線程數(shù)量,第二部分則是創(chuàng)建新的線程并且將任務(wù)并發(fā)安全的添加到新的workers之中,然后啟動線程執(zhí)行。

①代碼(6)中檢查隊列是否只在必要時候為空,只有線程池狀態(tài)符合條件才能夠進(jìn)行下面的步驟,從(6)中的判斷條件來看,下面的集中情況addWorker會直接返回false

( I )當(dāng)前線程池狀態(tài)為STOP,TIDYING或者TERMINATED ; (I I)當(dāng)前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個任務(wù); (I I I)當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊列為空

②外層循環(huán)中判斷條件通過之后,在內(nèi)層循環(huán)中使用CAS增加線程數(shù),當(dāng)CAS成功就退出雙重循環(huán)進(jìn)行(8)步驟代碼的執(zhí)行,如果失敗需要查看當(dāng)前線程池的狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要進(jìn)行外層循環(huán)重新判斷線程池狀態(tài)然后在進(jìn)入內(nèi)層循環(huán)重新進(jìn)行CAS增加線程數(shù),如果線程池狀態(tài)沒有發(fā)生變化但是上一次CAS失敗就繼續(xù)進(jìn)行CAS嘗試。

③執(zhí)行到(8)代碼處,表明當(dāng)前已經(jīng)成功增加 了線程數(shù),但是還沒有線程執(zhí)行任務(wù)。ThreadPoolExecutor中使用全局獨占鎖mainLock來控制將新增的工作線程Worker線程安全的添加到工作者線程集合workers中。

④(8-2)獲取了獨占鎖,但是在獲取到鎖之后,還需要進(jìn)行重新檢查線程池的狀態(tài),這是為了避免在獲取全局獨占鎖之前其他線程調(diào)用了shutDown方法關(guān)閉了線程池。如果線程池已經(jīng)關(guān)閉需要釋放鎖。否則將新增的線程添加到工作集合中,釋放鎖啟動線程執(zhí)行任務(wù)。

上面的addWorker方法最后幾行中,會判斷添加工作線程是否成功,如果失敗,會執(zhí)行addWorkerFailed方法,將任務(wù)從workers中移除,并且workerCount做-1操作。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    //獲取鎖
    mainLock.lock();
    try {
      //如果worker不為null
      if (w != null)
          //workers移除worker
          workers.remove(w);
      //通過CAS操作,workerCount-1
      decrementWorkerCount();
      tryTerminate();
    } finally {
      //釋放鎖
      mainLock.unlock();
    }
}

四、工作線程Worker的執(zhí)行

4.1、工作線程Worker類源碼分析

上面查看addWorker方法在CAS更新線程數(shù)成功之后,下面就是創(chuàng)建新的Worker線程執(zhí)行任務(wù),所以我們這里先查看Worker類,下面是Worker類的源碼,我們可以看出,Worker類繼承了AQS并實現(xiàn)了Runnable接口,所以他既是一個自定義的同步組件,也是一個執(zhí)行任務(wù)的線程類。下面我們分析Worker類的執(zhí)行

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    /** 使用線程工廠創(chuàng)建的線程,執(zhí)行任務(wù) */
    final Thread thread;
    /** 初始化執(zhí)行任務(wù) */
    Runnable firstTask;
    /** 計數(shù) */
    volatile long completedTasks;

    /**
     * 給出初始firstTask,線程創(chuàng)建工廠創(chuàng)建新的線程
     */
    Worker(Runnable firstTask) {
        setState(-1); // 防止在調(diào)用runWorker之前被中斷
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); //使用threadFactory創(chuàng)建線程
    }

    /** run方法實際上執(zhí)行的是runWorker方法  */
    public void run() {
        runWorker(this);
    }

    // 關(guān)于同步狀態(tài)(鎖)
    //
    // 同步狀態(tài)state=0表示鎖未被獲取
    // 同步狀態(tài)state=1表示鎖被獲取

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    //下面都是重寫AQS的方法,Worker為自定義的同步組件
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

在構(gòu)造函數(shù)中我們可以看出,首先將同步狀態(tài)state置為-1,而Worker這個同步組件的state有三個值,其中state=-1表示W(wǎng)ork創(chuàng)建時候的默認(rèn)狀態(tài),創(chuàng)建時候設(shè)置state=-1是為了防止runWorker方法運行前被中斷前面說到過這個結(jié)論,這里置為-1是為了避免當(dāng)前Worker在調(diào)用runWorker方法之前被中斷(當(dāng)其他線程調(diào)用線程池的shutDownNow時候,如果Worker的state>=0則會中斷線程),設(shè)置為-1就不會被中斷了。而Worker實現(xiàn)Runnable接口,那么需要重寫run方法,在run方法中,我們可以看到,實際上執(zhí)行的是runWorker方法,在runWorker方法中,會首先調(diào)用unlock方法,該方法會將state置為0,所以這個時候調(diào)用shutDownNow方法就會中斷當(dāng)前線程,而這個時候已經(jīng)進(jìn)入了runWork方法了,就不會在還沒有執(zhí)行runWorker方法的時候就中斷線程。

4.2、runWorker方法的源碼分析

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 這個時候調(diào)用unlock方法,將state置為0,就可以被中斷了
    boolean completedAbruptly = true;
    try {
        //(10)如果當(dāng)前任務(wù)為null,或者從任務(wù)隊列中獲取到的任務(wù)為null,就跳轉(zhuǎn)到(11)處執(zhí)行清理工作
        while (task != null || (task = getTask()) != null) {
            //task不為null,就需要線程執(zhí)行任務(wù),這個時候,需要獲取工作線程內(nèi)部持有的獨占鎖
            w.lock();
            /**如果線程池已被停止(STOP)(至少大于STOP狀態(tài)),要確保線程都被中斷
             * 如果狀態(tài)不對,檢查當(dāng)前線程是否中斷并清除中斷狀態(tài),并且再次檢查線程池狀態(tài)是否大于STOP
             * 如果上述滿足,檢查該對象是否處于中斷狀態(tài),不清除中斷標(biāo)記
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                //中斷該對象
                wt.interrupt();
            try {
                //執(zhí)行任務(wù)之前要做的事情
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); //執(zhí)行任務(wù)
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //執(zhí)行任務(wù)之后的方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //更新當(dāng)前已完成任務(wù)數(shù)量
                w.completedTasks++;
                //釋放鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //執(zhí)行清理工作:處理并退出當(dāng)前worker
        processWorkerExit(w, completedAbruptly);
    }
}

我們梳理一下runWorker方法的執(zhí)行流程

①首先先執(zhí)行unlock方法,將Worker的state置為0,這樣工作線程就可以被中斷了(后續(xù)的操作如果線程池關(guān)閉就需要線程被中斷)

②首先判斷判斷當(dāng)前的任務(wù)(當(dāng)前工作線程中的task,或者從任務(wù)隊列中取出的task)是否為null,如果不為null就往下執(zhí)行,為null就執(zhí)行processWorkerExit方法。

③獲取工作線程內(nèi)部持有的獨占鎖(避免在執(zhí)行任務(wù)期間,其他線程調(diào)用shutdown后正在執(zhí)行的任務(wù)被中斷,shutdown只會中斷當(dāng)前被阻塞掛起的沒有執(zhí)行任務(wù)的線程)

④然后執(zhí)行beforeExecute()方法,該方法為擴(kuò)展接口代碼,表示在具體執(zhí)行任務(wù)之前所做的一些事情,然后執(zhí)行task.run()方法執(zhí)行具體任務(wù),執(zhí)行完之后會調(diào)用afterExecute()方法,用以處理任務(wù)執(zhí)行完畢之后的工作,也是一個擴(kuò)展接口代碼。

⑤更新當(dāng)前線程池完成的任務(wù)數(shù),并釋放鎖

4.3、執(zhí)行清理工作的方法processWorkerExit

下面是方法processWorkerExit的源碼,在下面的代碼中

①首先(1-1)處統(tǒng)計線程池完成的任務(wù)個數(shù),并且在此之前獲取全局鎖,然后更新當(dāng)前的全局計數(shù)器,然后從工作線程集合中移除當(dāng)前工作線程,完成清理工作。

②代碼(1-2)調(diào)用了tryTerminate方法,在該方法中,判斷了當(dāng)前線程池狀態(tài)是SHUTDOWN并且隊列不為空或者當(dāng)前線程池狀態(tài)為STOP并且當(dāng)前線程池中沒有活動線程,則置線程池狀態(tài)為TERMINATED。如果設(shè)置稱為了TERMINATED狀態(tài),還需要調(diào)用全局條件變量termination的signalAll方法喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞住的線程,使得線程池中的所有線程都停止,從而使得線程池為TERMINATED狀態(tài)。

③代碼(1-3)處判斷當(dāng)前線程池中的線程個數(shù)是否小于核心線程數(shù),如果是,需要新增一個線程保證有足夠的線程可以執(zhí)行任務(wù)隊列中的任務(wù)或者提交的任務(wù)。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /*
    *completedAbruptly:是由runWorker傳過來的參數(shù),表示是否突然完成的意思
    *當(dāng)在就是在執(zhí)行任務(wù)過程當(dāng)中出現(xiàn)異常,就會突然完成,傳true
    *
    *如果是突然完成,需要通過CAS操作,更新workerCount(-1操作)
    *不是突然完成,則不需要-1,因為getTask方法當(dāng)中已經(jīng)-1(getTask方法中執(zhí)行了decrementWorkerCount()方法)
    */
    if (completedAbruptly)
        decrementWorkerCount();
    //(1-1)在統(tǒng)計完成任務(wù)個數(shù)之前加上全局鎖,然后統(tǒng)計線程池中完成的任務(wù)個數(shù)并更新全局計數(shù)器,并從工作集中刪除當(dāng)前worker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //獲得全局鎖
    try {
        completedTaskCount += w.completedTasks; //更新已完成的任務(wù)數(shù)量
        workers.remove(w); //將完成該任務(wù)的線程worker從工作線程集合中移除
    } finally {
        mainLock.unlock(); //釋放鎖
    }
    /**(1-2)
     * 這一個方法調(diào)用完成了下面的事情:
     * 判斷如果當(dāng)前線程池狀態(tài)是SHUTDOWN并且工作隊列為空,
     * 或者當(dāng)前線程池狀態(tài)是STOP并且當(dāng)前線程池里面沒有活動線程,
     * 則設(shè)置當(dāng)前線程池狀態(tài)為TERMINATED,如果設(shè)置成了TERMINATED狀態(tài),
     * 還需要調(diào)用條件變量termination的signAll方法激活所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程
     */
    tryTerminate();

    //(1-3)如果當(dāng)前線程池中線程數(shù)小于核心線程,則增加核心線程數(shù)
    int c = ctl.get();
    //判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN)
    if (runStateLessThan(c, STOP)) {
        //如果任務(wù)忽然完成,執(zhí)行后續(xù)的代碼
        if (!completedAbruptly) {
            //allowCoreThreadTimeOut表示是否允許核心線程超時,默認(rèn)為false
            //min這里當(dāng)默認(rèn)為allowCoreThreadTimeOut默認(rèn)為false的時候,min置為coorPoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //這里說明:如果允許核心線程超時,那么allowCoreThreadTimeOut可為true,那么min值為0,不需要維護(hù)核心線程了
            //如果min為0并且任務(wù)隊列不為空
            if (min == 0 && ! workQueue.isEmpty())
                min = 1; //這里表示如果min為0,且隊列不為空,那么至少需要一個核心線程存活來保證任務(wù)的執(zhí)行
            //如果工作線程數(shù)大于min,表示當(dāng)前線程數(shù)滿足,直接返回
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

在tryTerminate方法中,我們簡單說明了該方法的作用,下面是該方法的源碼,可以看出源碼實現(xiàn)上和上面所總結(jié)的功能是差不多的

final void tryTerminate() {
    for (;;) {
        //獲取線程池狀態(tài)
        int c = ctl.get();
        //如果線程池狀態(tài)為RUNNING
        //或者狀態(tài)大于TIDYING
        //或者狀態(tài)==SHUTDOWN并未任務(wù)隊列不為空
        //直接返回,不能調(diào)用terminated方法
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //如果線程池中工作線程數(shù)不為0,需要中斷線程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //獲得線程池的全局鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //通過CAS操作,將線程池狀態(tài)設(shè)置為TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; }
                try {
                    //調(diào)用terminated方法
                    terminated();
                } finally {
                    //最終將線程狀態(tài)設(shè)置為TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    //調(diào)用條件變量termination的signaAll方法喚醒所有因為
                    //調(diào)用線程池的awaitTermination方法而被阻塞的線程
                    //private final Condition termination = mainLock.newCondition();
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

五、補充(shutdown、shutdownNow、awaitTermination方法)

5.1、shutdown操作

我們在使用線程池的時候知道,調(diào)用shutdown方法之后線程池就不會再接受新的任務(wù)了,但是任務(wù)隊列中的任務(wù)還是需要執(zhí)行完的。調(diào)用該方法會立刻返回,并不是等到線程池的任務(wù)隊列中的所有任務(wù)執(zhí)行完畢在返回的。

public void shutdown() {
    //獲得線程池的全局鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //進(jìn)行權(quán)限檢查
        checkShutdownAccess();

        //設(shè)置當(dāng)前線程池的狀態(tài)的SHUTDOWN,如果線程池狀態(tài)已經(jīng)是該狀態(tài)就會直接返回,下面我們會分析這個方法的源碼
        advanceRunState(SHUTDOWN);

        //設(shè)置中斷 標(biāo)志
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //嘗試將狀態(tài)變?yōu)門ERMINATED,上面已經(jīng)分析過該方法的源碼
    tryTerminate();
}

該方法的源碼比較簡短,首先檢查了安全管理器,是查看當(dāng)前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有權(quán)限還需要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒有權(quán)限將會拋出SecurityException異常或者空指針異常。下面我們查看一下advanceRunState 方法的源碼。

private void advanceRunState(int targetState) {
    for (;;) {
        //下面的方法執(zhí)行的就是:
        //首先獲取線程的ctl值,然后判斷當(dāng)前線程池的狀態(tài)如果已經(jīng)是SHUTDOWN,那么if條件第一個為真就直接返回
        //如果不是SHUTDOWN狀態(tài),就需要CAS的設(shè)置當(dāng)前狀態(tài)為SHUTDOWN
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            //private static int ctlOf(int rs, int wc) { return rs | wc; }
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

我們可以看出advanceRunState 方法實際上就是判斷當(dāng)前線程池的狀態(tài)是否為SHUTDWON,如果是那么就返回,否則就需要設(shè)置當(dāng)前狀態(tài)為SHUTDOWN。

我們再來看看shutdown方法中調(diào)用線程中斷的方法interruptIdleWorkers源碼

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //如果工作線程沒有被中斷,并且沒有正在運行設(shè)置中斷標(biāo)志
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    //需要中斷當(dāng)前線程
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

上面的代碼中,需要設(shè)置所有空閑線程的中斷標(biāo)志。首先獲取線程池的全局鎖,同時只有一個線程可以調(diào)用shutdown方法設(shè)置中斷標(biāo)志。然后嘗試獲取工作線程Worker自己的鎖,獲取成功則可以設(shè)置中斷標(biāo)志(這是由于正在執(zhí)行任務(wù)的線程需要獲取自己的鎖,并且不可重入,所以正在執(zhí)行的任務(wù)沒有被中斷),這里要中斷的那些線程是阻塞到getTask()方法并嘗試從任務(wù)隊列中獲取任務(wù)的線程即空閑線程。

5.2、shutdownNow操作

在使用線程池的時候,如果我們調(diào)用了shutdownNow方法,線程池不僅不會再接受新的任務(wù),還會將任務(wù)隊列中的任務(wù)丟棄,正在執(zhí)行的任務(wù)也會被中斷,然后立刻返回該方法,不會等待激活的任務(wù)完成,返回值為當(dāng)前任務(wù)隊列中被丟棄的任務(wù)列表

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess(); //還是進(jìn)行權(quán)限檢查
        advanceRunState(STOP); //設(shè)置線程池狀態(tài)臺STOP
        interruptWorkers(); //中斷所有線程
        tasks = drainQueue(); //將任務(wù)隊列中的任務(wù)移動到task中
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks; //返回tasks
}

從上面的代碼中,我們可以可以發(fā)現(xiàn),shutdownNow方法也是首先需要檢查調(diào)用該方法的線程的權(quán)限,之后不同于shutdown方法之處在于需要即刻設(shè)置當(dāng)前線程池狀態(tài)為STOP,然后中斷所有線程(空閑線程+正在執(zhí)行任務(wù)的線程),移除任務(wù)隊列中的任務(wù)

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) //不需要判斷當(dāng)前線程是否在執(zhí)行任務(wù)(即不需要調(diào)用w.tryLock方法),中斷所有線程
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

5.3、awaitTermination操作

當(dāng)線程調(diào)用該方法之后,會阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會返回,或者等到超時時間到之后會返回,下面是該方法的源碼。

//調(diào)用該方法之后,會阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會返回,或者等到超時時間到之后會返回
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //阻塞當(dāng)前線程,(獲取了Worker自己的鎖),那么當(dāng)前線程就不會再執(zhí)行任務(wù)(因為獲取不到鎖)
        for (;;) {
            //當(dāng)前線程池狀態(tài)為TERMINATED狀態(tài),會返回true
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            //超時時間到返回false
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

在上面的代碼中,調(diào)用者線程需要首先獲取線程Worker自己的獨占鎖,然后在循環(huán)判斷當(dāng)前線程池是否已經(jīng)是TERMINATED狀態(tài),如果是則直接返回,否則說明當(dāng)前線程池中還有線程正在執(zhí)行任務(wù),這時候需要查看當(dāng)前設(shè)置的超時時間是否小于0,小于0說明不需要等待就直接返回,如果大于0就需要調(diào)用條件變量termination的awaitNanos方法等待設(shè)置的時間,并在這段時間之內(nèi)等待線程池的狀態(tài)變?yōu)門ERMINATED。

我們在前面說到清理線程池的方法processWorkerExit的時候,需要調(diào)用tryTerminated方法,在該方法中會查看當(dāng)前線程池狀態(tài)是否為TERMINATED,如果是該狀態(tài)也會調(diào)用termination.signalAll()方法喚醒所有線程池中因調(diào)用awaitTermination而被阻塞住的線程。

如果是設(shè)置了超時時間,那么termination的awaitNanos方法也會返回,這時候需要重新檢查線程池狀態(tài)是否為TERMINATED,如果是則返回,不是就繼續(xù)阻塞自己。

以上就是Java并發(fā)包中線程池ThreadPoolExecutor原理探究的詳細(xì)內(nèi)容,更多關(guān)于Java 并發(fā)包 線程池 ThreadPoolExecutor的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 一個JAVA小項目--Web應(yīng)用自動生成Word

    一個JAVA小項目--Web應(yīng)用自動生成Word

    前段時間接到一個Web應(yīng)用自動生成Word的需求,現(xiàn)整理了下一些關(guān)鍵步驟拿來分享一下。
    2014-05-05
  • SpringMVC ModelAndView的用法使用詳解

    SpringMVC ModelAndView的用法使用詳解

    這篇文章主要介紹了SpringMVC ModelAndView的用法使用詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • java簡單操作word實例

    java簡單操作word實例

    這篇文章主要為大家詳細(xì)介紹了java簡單操作word實例,感興趣的朋友可以參考一下
    2016-03-03
  • Java Socket循環(huán)接收數(shù)據(jù)readLine()阻塞的解決方案

    Java Socket循環(huán)接收數(shù)據(jù)readLine()阻塞的解決方案

    這篇文章主要介紹了Java Socket循環(huán)接收數(shù)據(jù)readLine()阻塞的解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • 詳解Java如何優(yōu)雅地書寫if-else

    詳解Java如何優(yōu)雅地書寫if-else

    在日常開發(fā)中我們常常遇到有多個if?else的情況,之間書寫顯得代碼冗余難看,對于追求更高質(zhì)量代碼的同學(xué),就會思考如何優(yōu)雅地處理這種代碼。本文我們就來探討下幾種優(yōu)化if?else的方法
    2022-08-08
  • fastjson全局日期序列化設(shè)置導(dǎo)致JSONField失效問題解決方案

    fastjson全局日期序列化設(shè)置導(dǎo)致JSONField失效問題解決方案

    這篇文章主要介紹了fastjson通過代碼指定全局序列化返回時間格式,導(dǎo)致使用JSONField注解標(biāo)注屬性的特殊日期返回格式失效問題的解決方案
    2023-01-01
  • MyBatis關(guān)閉一級緩存的兩種方式(分注解和xml兩種方式)

    MyBatis關(guān)閉一級緩存的兩種方式(分注解和xml兩種方式)

    這篇文章主要介紹了MyBatis關(guān)閉一級緩存的兩種方式(分注解和xml兩種方式),mybatis默認(rèn)開啟一級緩存,執(zhí)行2次相同sql,但是第一次查詢sql結(jié)果會加工處理這個時候需要關(guān)閉一級緩存,本文給大家詳細(xì)講解需要的朋友可以參考下
    2022-11-11
  • mybatis-plus常用注解@TableId和@TableField的用法

    mybatis-plus常用注解@TableId和@TableField的用法

    本文主要介紹了mybatis-plus常用注解@TableId和@TableField的用法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • JAVA JSP頁面技術(shù)之EL表達(dá)式整理歸納總結(jié)

    JAVA JSP頁面技術(shù)之EL表達(dá)式整理歸納總結(jié)

    這篇文章主要介紹了java中JSP頁面技術(shù)之EL表達(dá)式概念作用以及語法等的使用,需要的朋友可以參考
    2017-04-04
  • Spring File Storage文件的對象存儲框架基本使用小結(jié)

    Spring File Storage文件的對象存儲框架基本使用小結(jié)

    在開發(fā)過程當(dāng)中,會使用到存文檔、圖片、視頻、音頻等等,這些都會涉及存儲的問題,文件可以直接存服務(wù)器,但需要考慮帶寬和存儲空間,另外一種方式就是使用云存儲,這篇文章主要介紹了Spring File Storage文件的對象存儲框架基本使用小結(jié),需要的朋友可以參考下
    2024-08-08

最新評論