欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java 線程池ThreadPoolExecutor源碼解析

 更新時(shí)間:2022年03月11日 13:04:42   作者:Q.E.D.  
這篇文章主要介紹了Java 線程池ThreadPoolExecutor源碼解析

引導(dǎo)語(yǔ)

線程池我們?cè)诠ぷ髦薪?jīng)常會(huì)用到。在請(qǐng)求量大時(shí),使用線程池,可以充分利用機(jī)器資源,增加請(qǐng)求的處理速度,本章節(jié)我們就和大家一起來(lái)學(xué)習(xí)線程池。

本章的順序,先說(shuō)源碼,弄懂原理,接著看一看面試題,最后看看實(shí)際工作中是如何運(yùn)用線程池的。

1、整體架構(gòu)圖

我們畫了線程池的整體圖,如下:

圖片描述

 本小節(jié)主要就按照這個(gè)圖來(lái)進(jìn)行 ThreadPoolExecutor 源碼的講解,大家在看各個(gè)方法時(shí),可以結(jié)合這個(gè)圖一起看。

1.1、類結(jié)構(gòu)

首先我們來(lái)看一下 ThreadPoolExecutor 的類結(jié)構(gòu),如下圖:

圖片描述

從上圖中,我們從命名上來(lái)看,都有 Executor 的共同命名,Executor 的中文意思為執(zhí)行的意思,表示對(duì)提供的任務(wù)進(jìn)行執(zhí)行,我們?cè)诘谖逭戮€程中學(xué)習(xí)到了幾種任務(wù):Runnable、Callable、FutureTask,之前我們都是使用 Thread 來(lái)執(zhí)行這些任務(wù)的,除了 Thread,這些 Executor 命名的類和接口也是可以執(zhí)行這幾種任務(wù)的,接下來(lái)我們大概的看下這幾個(gè)類的大概含義:

Executor:定義 execute 方法來(lái)執(zhí)行任務(wù),入?yún)⑹?Runnable,無(wú)出參:

圖片描述

 ExecutorService:Executor 的功能太弱,ExecutorService 豐富了對(duì)任務(wù)的執(zhí)行和管理的功能,主要代碼如下:

// 關(guān)閉,不會(huì)接受新的任務(wù),也不會(huì)等待未完成的任務(wù)
// 如果需要等待未完成的任務(wù),可以使用 awaitTermination 方法
void shutdown();
// executor 是否已經(jīng)關(guān)閉了,返回值 true 表示已關(guān)閉
boolean isShutdown();
// 所有的任務(wù)是否都已經(jīng)終止,是的話,返回 true
boolean isTerminated();
// 在超時(shí)時(shí)間內(nèi),等待剩余的任務(wù)終止
boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
// 提交有返回值的任務(wù),使用 get 方法可以阻塞等待任務(wù)的執(zhí)行結(jié)果返回
<T> Future<T> submit(Callable<T> task);
// 提交沒(méi)有返回值的任務(wù),如果使用 get 方法的話,任務(wù)執(zhí)行完之后得到的是 null 值
Future<?> submit(Runnable task);
// 給定任務(wù)集合,返回已經(jīng)執(zhí)行完成的 Future 集合,每個(gè)返回的 Future 都是 isDone = true 的狀態(tài)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
// 給定任務(wù)中有一個(gè)執(zhí)行成功就返回,如果拋異常,其余未完成的任務(wù)將被取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

AbstractExecutorService 是一個(gè)抽象類,封裝了 Executor 的很多通用功能,比如:

// 把 Runnable 轉(zhuǎn)化成 RunnableFuture
// RunnableFuture 是一個(gè)接口,實(shí)現(xiàn)了 Runnable 和 Future
// FutureTask 是 RunnableFuture 的實(shí)現(xiàn)類,主要是對(duì)任務(wù)進(jìn)行各種管理
// Runnable + Future => RunnableFuture => FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
// 提交無(wú)返回值的任務(wù)
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // ftask 其實(shí)是 FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
// 提交有返回值的任務(wù)
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // ftask 其實(shí)是 FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

有幾個(gè)點(diǎn)需要注意下:

FutureTask 我們?cè)诘谖逭掠姓f(shuō),其本身就是一個(gè)任務(wù),而且具備對(duì)任務(wù)管理的功能,比如可以通過(guò) get 方法拿到任務(wù)的執(zhí)行結(jié)果;

submit 方法是我們平時(shí)使用線程池時(shí)提交任務(wù)的方法,支持 Runable 和 Callable 兩種任務(wù)的提交,方法中 execute 方法是其子類 ThreadPoolExecutor 實(shí)現(xiàn)的,不管是那種任務(wù)入?yún)?,execute 方法最終執(zhí)行的任務(wù)都是 FutureTask;

ThreadPoolExecutor 繼承了 AbstractExecutorService 抽象類,具備以上三個(gè)類的所有功能。

1.2、類注釋

ThreadPoolExecutor 的類注釋有很多,我們選取關(guān)鍵的注釋如下:

  1. ExecutorService 使用線程池中的線程執(zhí)行提交的任務(wù),線程池我們可以使用 Executors 進(jìn)行配置;
  2. 線程池解決兩個(gè)問(wèn)題:1:通過(guò)減少任務(wù)間的調(diào)度開銷 (主要是通過(guò)線程池中的線程被重復(fù)使用的方式),來(lái)提高大量任務(wù)時(shí)的執(zhí)行性能;2:提供了一種方式來(lái)管理線程和消費(fèi),維護(hù)基本數(shù)據(jù)統(tǒng)計(jì)等工作,比如統(tǒng)計(jì)已完成的任務(wù)數(shù);
  3. Executors 為常用的場(chǎng)景設(shè)定了可直接初始化線程池的方法,比如 Executors#newCachedThreadPool 無(wú)界的線程池,并且可以自動(dòng)回收;Executors#newFixedThreadPool 固定大小線程池;Executors#newSingleThreadExecutor 單個(gè)線程的線程池;
  4. 為了在各種上下文中使用線程池,線程池提供可供擴(kuò)展的參數(shù)設(shè)置:1:coreSize:當(dāng)新任務(wù)提交時(shí),發(fā)現(xiàn)運(yùn)行的線程數(shù)小于 coreSize,一個(gè)新的線程將被創(chuàng)建,即使這時(shí)候其它工作線程是空閑的,可以通過(guò) getCorePoolSize 方法獲得 coreSize;2:maxSize: 當(dāng)任務(wù)提交時(shí),coreSize < 運(yùn)行線程數(shù) <= maxSize,但隊(duì)列沒(méi)有滿時(shí),任務(wù)提交到隊(duì)列中,如果隊(duì)列滿了,在 maxSize 允許的范圍內(nèi)新建線程;
  5. 一般來(lái)說(shuō),coreSize 和 maxSize 在線程池初始化時(shí)就已經(jīng)設(shè)定了,但我們也可以通過(guò) setCorePoolSize、setMaximumPoolSize 方法動(dòng)態(tài)的修改這兩個(gè)值;
  6. 默認(rèn)的,core threads 需要到任務(wù)提交后才創(chuàng)建的,但我們可以分別使用 prestartCoreThread、prestartAllCoreThreads 兩個(gè)方法來(lái)提前創(chuàng)建一個(gè)、所有的 core threads;
  7. 新的線程被默認(rèn) ThreadFactory 創(chuàng)建時(shí),優(yōu)先級(jí)會(huì)被限制成 NORM_PRIORITY,默認(rèn)會(huì)被設(shè)置成非守護(hù)線程,這個(gè)和新建線程的繼承是不同的;
  8. Keep-alive times 參數(shù)的作用:1:如果當(dāng)前線程池中有超過(guò) coreSize 的線程;2:并且線程空閑的時(shí)間超過(guò) keepAliveTime,當(dāng)前線程就會(huì)被回收,這樣可以避免線程沒(méi)有被使用時(shí)的資源浪費(fèi);
  9. 通過(guò) setKeepAliveTime 方法可以動(dòng)態(tài)的設(shè)置 keepAliveTime 的值;
  10. 如果設(shè)置 allowCoreThreadTimeOut 為 ture 的話,core thread 空閑時(shí)間超過(guò) keepAliveTime 的話,也會(huì)被回收;
  11. 線程池新建時(shí),有多種隊(duì)列可供選擇,比如:1:SynchronousQueue,為了避免任務(wù)被拒絕,要求線程池的 maxSize 無(wú)界,缺點(diǎn)是當(dāng)任務(wù)提交的速度超過(guò)消費(fèi)的速度時(shí),可能出現(xiàn)無(wú)限制的線程增長(zhǎng);2:LinkedBlockingQueue,無(wú)界隊(duì)列,未消費(fèi)的任務(wù)可以在隊(duì)列中等待;3:ArrayBlockingQueue,有界隊(duì)列,可以防止資源被耗盡;
  12. 隊(duì)列的維護(hù):提供了 getQueue () 方法方便我們進(jìn)行監(jiān)控和調(diào)試,嚴(yán)禁用于其他目的,remove 和 purge 兩個(gè)方法可以對(duì)隊(duì)列中的元素進(jìn)行操作;
  13. 在 Executor 已經(jīng)關(guān)閉或?qū)ψ畲缶€程和最大隊(duì)列都使用飽和時(shí),可以使用 RejectedExecutionHandler 類進(jìn)行異常捕捉,有如下四種處理策略:ThreadPoolExecutor.AbortPolicy、ThreadPoolExecutor.DiscardPolicy、ThreadPoolExecutor.CallerRunsPolicy、ThreadPoolExecutor.DiscardOldestPolicy;
  14. 線程池提供了很多可供擴(kuò)展的鉤子函數(shù),比如有:1:提供在每個(gè)任務(wù)執(zhí)行之前 beforeExecute 和執(zhí)行之后 afterExecute 的鉤子方法,主要用于操作執(zhí)行環(huán)境,比如初始化 ThreadLocals、收集統(tǒng)計(jì)數(shù)據(jù)、添加日志條目等;2: 如果在執(zhí)行器執(zhí)行完成之后想干一些事情,可以實(shí)現(xiàn) terminated 方法,如果鉤子方法執(zhí)行時(shí)發(fā)生異常,工作線程可能會(huì)失敗并立即終止。

可以看到 ThreadPoolExecutor 的注釋是非常多的,也是非常重要的,我們很多面試的題目,在注釋上都能找到答案。

1.3、ThreadPoolExecutor 重要屬性

接下來(lái)我們來(lái)看一看 ThreadPoolExecutor 都有哪些重要屬性,如下:

//ctl 線程池狀態(tài)控制字段,由兩部分組成:
//1:workerCount  wc 工作線程數(shù),我們限制 workerCount 最大到(2^29)-1,大概 5 億個(gè)線程
//2:runState rs 線程池的狀態(tài),提供了生命周期的控制,源碼中有很多關(guān)于狀態(tài)的校驗(yàn),狀態(tài)枚舉如下:
//RUNNING(-536870912):接受新任務(wù)或者處理隊(duì)列里的任務(wù)。
//SHUTDOWN(0):不接受新任務(wù),但仍在處理已經(jīng)在隊(duì)列里面的任務(wù)。
//STOP(-536870912):不接受新任務(wù),也不處理隊(duì)列中的任務(wù),對(duì)正在執(zhí)行的任務(wù)進(jìn)行中斷。
//TIDYING(1073741824): 所以任務(wù)都被中斷,workerCount 是 0,整理狀態(tài)
//TERMINATED(1610612736): terminated() 已經(jīng)完成的時(shí)候
//runState 之間的轉(zhuǎn)變過(guò)程:
//RUNNING -> SHUTDOWN:調(diào)用 shudown(),finalize()
//(RUNNING or SHUTDOWN) -> STOP:調(diào)用shutdownNow()
//SHUTDOWN -> TIDYING -> workerCount ==0
//STOP -> TIDYING -> workerCount ==0
//TIDYING -> TERMINATED -> terminated() 執(zhí)行完成之后
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// =(2^29)-1=536870911
// Packing and unpacking ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;//-536870912
private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
private static final int STOP       =  1 << COUNT_BITS;//-536870912
private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
private static final int TERMINATED =  3 << COUNT_BITS;//1610612736
// 已完成任務(wù)的計(jì)數(shù)
volatile long completedTasks;
// 線程池最大容量
private int largestPoolSize;
// 已經(jīng)完成的任務(wù)數(shù)
private long completedTaskCount;
// 用戶可控制的參數(shù)都是 volatile 修飾的
// 可以使用 threadFactory 創(chuàng)建 thread
// 創(chuàng)建失敗一般不拋出異常,只有在 OutOfMemoryError 時(shí)候才會(huì)
private volatile ThreadFactory threadFactory;
// 飽和或者運(yùn)行中拒絕任務(wù)的 handler 處理類
private volatile RejectedExecutionHandler handler;
// 線程存活時(shí)間設(shè)置
private volatile long keepAliveTime;
// 設(shè)置 true 的話,核心線程空閑 keepAliveTime 時(shí)間后,也會(huì)被回收
private volatile boolean allowCoreThreadTimeOut;
// coreSize
private volatile int corePoolSize;
// maxSize 最大限制 (2^29)-1
private volatile int maximumPoolSize;
// 默認(rèn)的拒絕策略
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
// 隊(duì)列會(huì) hold 住任務(wù),并且利用隊(duì)列的阻塞的特性,來(lái)保持線程的存活周期
private final BlockingQueue<Runnable> workQueue;
// 大多數(shù)情況下是控制對(duì) workers 的訪問(wèn)權(quán)限
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
// 包含線程池中所有的工作線程
private final HashSet<Worker> workers = new HashSet<Worker>();

屬性也是非常多,為了方便理解線程池的狀態(tài)扭轉(zhuǎn),畫了一個(gè)圖:

圖片描述

 Worker 我們可以理解成線程池中任務(wù)運(yùn)行的最小單元,Worker 的大致結(jié)構(gòu)如下:

// 線程池中任務(wù)執(zhí)行的最小單元
// Worker 繼承 AQS,具有鎖功能
// Worker 實(shí)現(xiàn) Runnable,本身是一個(gè)可執(zhí)行的任務(wù)
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // 任務(wù)運(yùn)行的線程
    final Thread thread;
    // 需要執(zhí)行的任務(wù)
    Runnable firstTask;
    // 非常巧妙的設(shè)計(jì),Worker本身是個(gè) Runnable,把自己作為任務(wù)傳遞給 thread
    // 內(nèi)部有個(gè)屬性又設(shè)置了 Runnable
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 把 Worker 自己作為 thread 運(yùn)行的任務(wù)
        this.thread = getThreadFactory().newThread(this);
    }
   /** Worker 本身是 Runnable,run 方法是 Worker 執(zhí)行的入口, runWorker 是外部的方法 */
    public void run() {
        runWorker(this);
    }
    private static final long serialVersionUID = 6138294804551838833L;
    // Lock methods
    // 0 代表沒(méi)有鎖住,1 代表鎖住
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    // 嘗試加鎖,CAS 賦值為 1,表示鎖住
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    // 嘗試釋放鎖,釋放鎖沒(méi)有 CAS 校驗(yàn),可以任意的釋放鎖
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

理解 Worker 非常關(guān)鍵,主要有以下幾點(diǎn):

Worker 很像是任務(wù)的代理,在線程池中,最小的執(zhí)行單位就是 Worker,所以 Worker 實(shí)現(xiàn)了 Runnable 接口,實(shí)現(xiàn)了 run 方法;

在 Worker 初始化時(shí) this.thread = getThreadFactory ().newThread (this) 這行代碼比較關(guān)鍵,它把當(dāng)前 Worker 作為線程的構(gòu)造器入?yún)ⅲ覀冊(cè)诤罄m(xù)的實(shí)現(xiàn)中會(huì)發(fā)現(xiàn)這樣的代碼:Thread t = w.thread;t.start (),此時(shí)的 w 是 Worker 的引用申明,此處 t.start 實(shí)際上執(zhí)行的就是 Worker 的 run 方法;

Worker 本身也實(shí)現(xiàn)了 AQS,所以其本身也是一個(gè)鎖,其在執(zhí)行任務(wù)的時(shí)候,會(huì)鎖住自己,任務(wù)執(zhí)行完成之后,會(huì)釋放自己。

2、線程池的任務(wù)提交

線程池的任務(wù)提交從 submit 方法說(shuō)起,submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情:

  1. 把 Runnable 和 Callable 都轉(zhuǎn)化成 FutureTask,這個(gè)我們之前看過(guò)源碼了;
  2. 使用 execute 方法執(zhí)行 FutureTask。

execute 方法是 ThreadPoolExecutor 中的方法,源碼如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 工作的線程小于核心線程數(shù),創(chuàng)建新的線程,成功返回,失敗不拋異常
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 線程池狀態(tài)可能發(fā)生變化
        c = ctl.get();
    }
    // 工作的線程大于等于核心線程數(shù),或者新建線程失敗
    // 線程池狀態(tài)正常,并且可以入隊(duì)的話,嘗試入隊(duì)列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果線程池狀態(tài)異常 嘗試從隊(duì)列中移除任務(wù),可以移除的話就拒絕掉任務(wù)
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 發(fā)現(xiàn)可運(yùn)行的線程數(shù)是 0,就初始化一個(gè)線程,這里是個(gè)極限情況,入隊(duì)的時(shí)候,突然發(fā)現(xiàn)
        // 可用線程都被回收了
        else if (workerCountOf(recheck) == 0)
            // Runnable是空的,不會(huì)影響新增線程,但是線程在 start 的時(shí)候不會(huì)運(yùn)行
            // Thread.run() 里面有判斷
            addWorker(null, false);
    }
    // 隊(duì)列滿了,開啟線程到 maxSize,如果失敗直接拒絕,
    else if (!addWorker(command, false))
        reject(command);
}

execute 方法執(zhí)行的就是整體架構(gòu)圖的左半邊的邏輯,其中多次調(diào)用 addWorker 方法,addWorker 方法的作用是新建一個(gè) Worker,我們一起來(lái)看下源碼:

// 結(jié)合線程池的情況看是否可以添加新的 worker
// firstTask 不為空可以直接執(zhí)行,為空?qǐng)?zhí)行不了,Thread.run()方法有判斷,Runnable為空不執(zhí)行
// core 為 true 表示線程最大新增個(gè)數(shù)是 coresize,false 表示最大新增個(gè)數(shù)是 maxsize
// 返回 true 代表成功,false 失敗
// break retry 跳到retry處,且不再進(jìn)入循環(huán)
// continue retry 跳到retry處,且再次進(jìn)入循環(huán)
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 先是各種狀態(tài)的校驗(yàn)
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // rs >= SHUTDOWN 說(shuō)明線程池狀態(tài)不正常
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            // 工作中的線程數(shù)大于等于容量,或者大于等于 coreSize or maxSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                // break 結(jié)束 retry 的 for 循環(huán)
                break retry;
            c = ctl.get();  // Re-read ctl
            // 線程池狀態(tài)被更改
            if (runStateOf(c) != rs)
                // 跳轉(zhuǎn)到retry位置
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 巧妙的設(shè)計(jì),Worker 本身是個(gè) Runnable.
        // 在初始化的過(guò)程中,會(huì)把 worker 丟給 thread 去初始化
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動(dòng)線程,實(shí)際上去執(zhí)行 Worker.run 方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker 方法首先是執(zhí)行了一堆校驗(yàn),然后使用 new Worker (firstTask) 新建了 Worker,最后使用 t.start () 執(zhí)行 Worker,上文我們說(shuō)了 Worker 在初始化時(shí)的關(guān)鍵代碼:this.thread = getThreadFactory ().newThread (this),Worker(this) 是作為新建線程的構(gòu)造器入?yún)⒌?,所?t.start () 會(huì)執(zhí)行到 Worker 的 run 方法上,源碼如下:

public void run() {
    runWorker(this);
}

runWorker 方法是非常重要的方法,我們一起看下源碼實(shí)現(xiàn):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    //幫助gc回收
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task 為空的情況:
        // 1:任務(wù)入隊(duì)列了,極限情況下,發(fā)現(xiàn)沒(méi)有運(yùn)行的線程,于是新增一個(gè)線程;
        // 2:線程執(zhí)行完任務(wù)執(zhí)行,再次回到 while 循環(huán)。
        // 如果 task 為空,會(huì)使用 getTask 方法阻塞從隊(duì)列中拿數(shù)據(jù),如果拿不到數(shù)據(jù),會(huì)阻塞住
        while (task != null || (task = getTask()) != null) {
            //鎖住 worker
            w.lock();
            // 線程池 stop 中,但是線程沒(méi)有到達(dá)中斷狀態(tài),幫助線程中斷
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //執(zhí)行 before 鉤子函數(shù)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //同步執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //執(zhí)行 after 鉤子函數(shù),如果這里拋出異常,會(huì)覆蓋 catch 的異常
                    //所以這里異常最好不要拋出來(lái)
                    afterExecute(task, thrown);
                }
            } finally {
                //任務(wù)執(zhí)行完成,計(jì)算解鎖
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //做一些拋出異常的善后工作
        processWorkerExit(w, completedAbruptly);
    }
}

這個(gè)方法執(zhí)行的邏輯是架構(gòu)圖中的標(biāo)紅部分:

圖片描述

我們聚焦一下這行代碼:task.run () 此時(shí)的 task 是什么呢?此時(shí)的 task 是 FutureTask 類,所以我們繼續(xù)追索到 FutureTask 類的 run 方法的源碼,如下:

/**
 * run 方法可以直接被調(diào)用
 * 也可以由線程池進(jìn)行調(diào)用
 */
public void run() {
    // 狀態(tài)不是任務(wù)創(chuàng)建,或者當(dāng)前任務(wù)已經(jīng)有線程在執(zhí)行了
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // Callable 不為空,并且已經(jīng)初始化完成
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 調(diào)用執(zhí)行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // 給 outcome 賦值
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

 run 方法中有兩行關(guān)鍵代碼:

  • result = c.call () 這行代碼是真正執(zhí)行業(yè)務(wù)代碼的地方;
  • set (result) 這里是給 outCome 賦值,這樣 Future.get 方法執(zhí)行時(shí),就可以從 outCome 中拿值,這個(gè)我們?cè)凇禙uture、ExecutorService 源碼解析》章節(jié)中都有說(shuō)到。

至此,submit 方法就執(zhí)行完成了,整體流程比較復(fù)雜,我們畫一個(gè)圖釋義一下任務(wù)提交執(zhí)行的主流程:

圖片描述

3、線程執(zhí)行完任務(wù)之后都在干啥 

線程執(zhí)行完任務(wù)之后,是消亡還是干什么呢?這是一個(gè)值得思考的問(wèn)題,我們可以從源碼中找到答案,從 ThreadPoolExecutor 的 runWorker 方法中,不知道有沒(méi)有同學(xué)注意到一個(gè) while 循環(huán),我們截圖釋義一下:

圖片描述

 這個(gè) while 循環(huán)有個(gè) getTask 方法,getTask 的主要作用是阻塞從隊(duì)列中拿任務(wù)出來(lái),如果隊(duì)列中有任務(wù),那么就可以拿出來(lái)執(zhí)行,如果隊(duì)列中沒(méi)有任務(wù),這個(gè)線程會(huì)一直阻塞到有任務(wù)為止(或者超時(shí)阻塞),下面我們一起來(lái)看下 getTask 方法,源碼如下:

// 從阻塞隊(duì)列中拿任務(wù)
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //線程池關(guān)閉 && 隊(duì)列為空,不需要在運(yùn)行了,直接放回
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // true  運(yùn)行的線程數(shù)大于 coreSize || 核心線程也可以被滅亡
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 隊(duì)列以 LinkedBlockingQueue 為例,timedOut 為 true 的話說(shuō)明下面 poll 方法執(zhí)行返回的是 null
        // 說(shuō)明在等待 keepAliveTime 時(shí)間后,隊(duì)列中仍然沒(méi)有數(shù)據(jù)
        // 說(shuō)明此線程已經(jīng)空閑了 keepAliveTime 了
        // 再加上 wc > 1 || workQueue.isEmpty() 的判斷
        // 所以使用 compareAndDecrementWorkerCount 方法使線程池?cái)?shù)量減少 1
        // 并且直接 return,return 之后,此空閑的線程會(huì)自動(dòng)被回收
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 從隊(duì)列中阻塞拿 worker
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 設(shè)置已超時(shí),說(shuō)明此時(shí)隊(duì)列沒(méi)有數(shù)據(jù)
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

代碼有兩處關(guān)鍵:

使用隊(duì)列的 poll 或 take 方法從隊(duì)列中拿數(shù)據(jù),根據(jù)隊(duì)列的特性,隊(duì)列中有任務(wù)可以返回,隊(duì)列中無(wú)任務(wù)會(huì)阻塞;

方法中的第二個(gè) if 判斷,說(shuō)的是在滿足一定條件下(條件看注釋),會(huì)減少空閑的線程,減少的手段是使可用線程數(shù)減一,并且直接 return,直接 return 后,該線程就執(zhí)行結(jié)束了,JVM 會(huì)自動(dòng)回收該線程。

4、總結(jié)

本章節(jié)主要以 submit 方法為主線闡述了 ThreadPoolExecutor 的整體架構(gòu)和底層源碼,只要有隊(duì)列和線程的基礎(chǔ)知識(shí)的話,理解 ThreadPoolExecutor 并不復(fù)雜。ThreadPoolExecutor 還有一些其他的源碼,比如說(shuō)拒絕請(qǐng)求的策略、得到各種屬性、設(shè)置各種屬性等等方法,這些方法都比較簡(jiǎn)單,感興趣的同學(xué)可以自己去看一看。

以上就是Java 線程池ThreadPoolExecutor源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Java 線程池ThreadPoolExecutor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論