欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java線程池源碼的深度解析

 更新時(shí)間:2022年10月24日 09:54:08   作者:JAVA旭陽(yáng)  
線程池的好處和使用本篇文章就不贅敘了,這篇文章主要通過線程池的源碼帶大家深入了解一下jdk8中線程池的實(shí)現(xiàn),感興趣的小伙伴可以了解一下

概述

線程池的好處和使用本篇文章就不贅敘了,不了解的可以參考下面兩篇文章:

那么本文重點(diǎn)是從源碼層面理解jdk8中線程池的實(shí)現(xiàn)。

核心機(jī)制

再分析源碼之前,我們還是先回顧和熟悉下線程的核心工作機(jī)制。

線程池工作原理

線程池采用的是一種生產(chǎn)者-消費(fèi)者的模型,如下圖:

  • 主線程調(diào)用execute、或者submit等方法提交任務(wù)給線程池。
  • 如果線程池中正在運(yùn)行的工作線程數(shù)量小于corePoolSize(核心線程數(shù)量),那么馬上創(chuàng)建線程運(yùn)行這個(gè)任務(wù)。
  • 如果線程池中正在運(yùn)行的工作線程數(shù)量大于或等于 corePoolSize(核心線程數(shù)量),那么將這個(gè)任務(wù)放入隊(duì)列,稍后執(zhí)行。
  • 如果這時(shí)隊(duì)列滿了且正在運(yùn)行的工作線程數(shù)量還小于 maximumPoolSize(最大線程數(shù)量),那么會(huì)創(chuàng)建非核心工作線程立刻運(yùn)行這個(gè)任務(wù),這部分非核心工作線程空閑超過一定的時(shí)間(keepAliveTime)時(shí),就會(huì)被銷毀回收。
  • 如果最終提交的任務(wù)超過了maximumPoolSize(最大線程數(shù)量),那么就會(huì)執(zhí)行拒絕策略。

線程池狀態(tài)

線程池的狀態(tài)有5種,他們的狀態(tài)轉(zhuǎn)換如上圖所示,這里記得區(qū)別線程的狀態(tài),它們不是一回事。

ThreadPoolExecutor類存放線程池的狀態(tài)信息很特別,是存儲(chǔ)在一個(gè)int類型原子變量的高3位,而低29位用來(lái)存儲(chǔ)線程池當(dāng)前運(yùn)行的線程數(shù)量。通過將線程池的狀態(tài)和線程數(shù)量合二為一,可以做到一次CAS原子操作更新數(shù)據(jù)。

狀態(tài)高3位值說(shuō)明
RUNNING111運(yùn)行狀態(tài),線程池被創(chuàng)建后的初始狀態(tài),能接受新提交的任務(wù),也能處理阻塞隊(duì)列中的任務(wù)。
SHUTDOWN000關(guān)閉狀態(tài),不再接受新提交的任務(wù),但任可以處理阻塞隊(duì)列中的任務(wù)。
STOP001停止?fàn)顟B(tài),會(huì)中斷正在處理的線程,不能接受新提交的任務(wù),也不會(huì)處理阻塞隊(duì)列中的任務(wù)。
TIDYING010所有任務(wù)都已經(jīng)終止,有效工作線程為0。
TERMINATED011終止?fàn)顟B(tài),線程池徹底終止。

源碼解析

上圖是線程池核心類ThreadPoolExecutor的類結(jié)構(gòu)圖:

  • Executor: 提交任務(wù)的基礎(chǔ)接口,只有一個(gè)execute方法。
  • ExecutorService: 繼承自Executor,它提供管理終止的方法,以及可以產(chǎn)生Future的方法,用于跟蹤一個(gè)或多個(gè)異步任務(wù)的進(jìn)度。
  • AbstractExecutorService: 提供ExecutorService執(zhí)行方法的默認(rèn)實(shí)現(xiàn)。
  • ThreadPoolExecutor: 線程池類本類,實(shí)現(xiàn)了線程池的核心邏輯。
  • Worker: ThreadPoolExecutor的內(nèi)部類,工作線程類,繼承自 AQS。
  • *Policy: 其他Policy結(jié)尾的都是內(nèi)置的決策策略類。

關(guān)鍵成員變量

1.線程池的狀態(tài)信息和線程數(shù)量信息(ctl)相關(guān)

線程的狀態(tài)信息和數(shù)量信息用同一個(gè)int的原子變量存儲(chǔ),高3位存儲(chǔ)狀態(tài)信息,低29位存儲(chǔ)線程數(shù)量。

// ctl,原子變量,存儲(chǔ)狀態(tài)和線程數(shù)量,初始化運(yùn)行狀態(tài)+0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 靜態(tài)常量,表示線程數(shù)量存放的位數(shù)29=32-3
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程數(shù)量最大的容量,低 COUNT_BITS 位所能表達(dá)的最大數(shù)值,000 11111111111111111111 => 5億多
private static final int CAPACITY  = (1 << COUNT_BITS) - 1;

通過位運(yùn)算符設(shè)置各個(gè)狀態(tài)的高三位值。

// 111 000000000000000000,轉(zhuǎn)換成整數(shù)后其實(shí)就是一個(gè)【負(fù)數(shù)】
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

從ctl中獲取線程池的狀態(tài)值

// ~CAPACITY = ~000 11111111111111111111 = 111 000000000000000000000(取反)
// &運(yùn)算符,和1&是它本身,和0&就是0,就可以獲得高位值。
private static int runStateOf(int c)     { return c & ~CAPACITY; }

從ctl中獲取線程池的數(shù)量

// CAPACITY = 000 11111111111111111111
// &運(yùn)算符,和1&是它本身,和0&就是0,就可以獲得低29位
private static int workerCountOf(int c)  { return c & CAPACITY; }

生成ctl值

// rs 表示線程池狀態(tài),wc 表示當(dāng)前線程池中 worker(線程)數(shù)量,相與以后就是合并后的狀態(tài)
private static int ctlOf(int rs, int wc) { return rs | wc; }

比較當(dāng)前線程池 ctl 所表示的狀態(tài)

線程池狀態(tài)值的大小關(guān)系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

// 比較當(dāng)前線程池 ctl 所表示的狀態(tài),是否小于某個(gè)狀態(tài) s
private static boolean runStateLessThan(int c, int s) { return c < s; }
// 比較當(dāng)前線程池 ctl 所表示的狀態(tài),是否大于等于某個(gè)狀態(tài)s
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// 小于 SHUTDOWN 的一定是 RUNNING,SHUTDOWN == 0
private static boolean isRunning(int c) { return c < SHUTDOWN; }

cas設(shè)置ctl的值

// 使用 CAS 方式 讓 ctl 值 +1 ,成功返回 true, 失敗返回 false
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
// 使用 CAS 方式 讓 ctl 值 -1 ,成功返回 true, 失敗返回 false
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
// 將 ctl 值減一,do while 循環(huán)會(huì)一直重試,直到成功為止
private void decrementWorkerCount() {
    do {} while (!compareAndDecrementWorkerCount(ctl.get()));
}

2.線程池中的隊(duì)列

// 線程池用于保存任務(wù)并將任務(wù)傳遞給工作線程的隊(duì)列
private final BlockingQueue<Runnable> workQueue;

3.控制并發(fā)的鎖

// 增加減少 worker 或者時(shí)修改線程池運(yùn)行狀態(tài)需要持有 mainLock
private final ReentrantLock mainLock = new ReentrantLock();

4.線程池中工作線程的集合

private final HashSet<Worker> workers = new HashSet<Worker>();

5.線程池構(gòu)造參數(shù)關(guān)系屬性

// 核心線程數(shù)量
private volatile int corePoolSize;
// 線程池最大線程數(shù)量
private volatile int maximumPoolSize;	
// 空閑線程存活時(shí)間
private volatile long keepAliveTime;	
// 創(chuàng)建線程時(shí)使用的線程工廠,默認(rèn)是 DefaultThreadFactory
private volatile ThreadFactory threadFactory;	
// 【超過核心線程提交任務(wù)就放入 阻塞隊(duì)列】
private final BlockingQueue<Runnable> workQueue;
// 拒絕策略
private volatile RejectedExecutionHandler handler;	

6.線程池監(jiān)控相關(guān)屬性

// 記錄線程池生命周期內(nèi)線程數(shù)最大值
private int largestPoolSize;	
// 記錄線程池所完成任務(wù)總數(shù),當(dāng)某個(gè) worker 退出時(shí)將完成的任務(wù)累加到該屬性
private long completedTaskCount;	

線程提交原理

線程池提交線程有多種方式如execute、submit或者invoke相關(guān)方法,我們重點(diǎn)關(guān)注在最基礎(chǔ)的execute()方法提交任務(wù),把它搞清楚了,其他的都不在話下。

execute(Runnable command)方法是線程提交的入口方法。

//  ThreadPoolExecutor#execute
public void execute(Runnable command) {
        // 如果任務(wù)為空,直接拋空指針
        if (command == null)
            throw new NullPointerException();
        // 獲取ctl的值,其中高3位是狀態(tài)信息,低3位是線程數(shù)量
        int c = ctl.get();
        // workerCountOf獲取當(dāng)前線程的數(shù)量
        // 當(dāng)前線程數(shù)量小于核心線程數(shù),調(diào)用addWorker創(chuàng)建一個(gè)工作線程
        if (workerCountOf(c) < corePoolSize) {
            // 調(diào)用addWorker方法創(chuàng)建工作線程,直接執(zhí)行任務(wù)。如果成功的話,直接結(jié)束方法。
            if (addWorker(command, true))
                return;
            // 由于并發(fā)等原因,addWorker添加失敗,會(huì)走到這里,再次獲取ctl的值
            c = ctl.get();
        }
    	// 如果線程池是運(yùn)行狀態(tài)的話,就把任務(wù)加入到隊(duì)列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 雙重檢查,因?yàn)閺纳洗螜z查到進(jìn)入此方法,線程池可能已成為SHUTDOWN狀態(tài)
            int recheck = ctl.get();
            // 如果發(fā)現(xiàn)線程池不是運(yùn)行狀態(tài)的話,那就移除這個(gè)任務(wù)
            if (!isRunning(recheck) && remove(command))
                // 任務(wù)出隊(duì)成功,走拒絕策略
                reject(command);
             // 執(zhí)行到這說(shuō)明線程池是 running 狀態(tài),獲取線程池中的線程數(shù)量,判斷是否是 0
             // 【擔(dān)保機(jī)制】,保證線程池在 running 狀態(tài)下,最起碼得有一個(gè)線程在工作
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 走到這里說(shuō)明線程不是運(yùn)行狀態(tài),或者就是隊(duì)列滿了,offer返回false
        // 再次調(diào)用addWoker創(chuàng)建新的線程,如果不成功(一般是超過了線程池最大線程數(shù)量),執(zhí)行拒絕策略
        else if (!addWorker(command, false))
            // 執(zhí)行拒絕策略
            reject(command);
    }

這個(gè)方法是提交線程的主干邏輯:

  • 提交一個(gè)任務(wù)時(shí),如果運(yùn)行的線程少于corePoolSize,通過調(diào)用addWorker添加一個(gè)工作線程,直接開始運(yùn)行。
  • 如果工作線程大于等于corePoolSize,并且前面addWorker失敗時(shí),需要將任務(wù)加入到隊(duì)列中,加入成功后,做了一層雙重校驗(yàn),因?yàn)檫@個(gè)過程可能線程池狀態(tài)發(fā)生變化了,如果已經(jīng)關(guān)閉,那么要移除剛剛加入的這個(gè)任務(wù)。
  • 如果加入隊(duì)列失敗,說(shuō)明隊(duì)列滿了,這時(shí)候調(diào)用addWorker方法再次創(chuàng)建線程,如果返回false,有可能是超過最大線程數(shù)量了,那么就執(zhí)行拒絕策略。

addWorker方法也是一個(gè)很關(guān)鍵的方法, 添加線程到線程池,返回 true 表示創(chuàng)建 Worker 成功,且啟動(dòng)線程。

//  ThreadPoolExecutor#addWorker
// core == true 表示采用核心線程數(shù)量限制,false 表示采用 maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
     // 自旋【判斷當(dāng)前線程池狀態(tài)是否允許創(chuàng)建線程】,允許就設(shè)置線程數(shù)量 + 1
    retry:
    for (;;) {
         // 獲取 ctl 的值
        int c = ctl.get();
        // 獲取當(dāng)前線程池運(yùn)行狀態(tài)
        int rs = runStateOf(c);

         // 判斷當(dāng)前線程池狀態(tài)【是否允許添加線程】
        
       // 如果線程池狀態(tài)大于SHUTDOWN 或者是SHUTDOWN狀態(tài),隊(duì)列是空了的話,都不允許創(chuàng)建新的線程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // false,沒有創(chuàng)建線程
            return false;
        // 再次自旋
        for (;;) {
            // 獲取線程池中線程數(shù)量
            int wc = workerCountOf(c);
            // 如果線程數(shù)量超過閾值的話,返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
          // 記錄線程數(shù)量已經(jīng)加 1,類比于申請(qǐng)到了一塊令牌,條件失敗說(shuō)明其他線程修改了數(shù)量
            if (compareAndIncrementWorkerCount(c))
                // 申請(qǐng)成功,跳出了 retry 這個(gè) for 自旋
                break retry;
             // CAS 失敗,沒有成功的申請(qǐng)到令牌
            c = ctl.get(); 
            // 判斷當(dāng)前線程池狀態(tài)是否發(fā)生過變化,被其他線程修改了,可能其他線程調(diào)用了 shutdown() 方法
            if (runStateOf(c) != rs)
                // 重新回到retry的執(zhí)行點(diǎn)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 下面開始真正創(chuàng)建線程了
    // 運(yùn)行標(biāo)記,表示創(chuàng)建的 worker 是否已經(jīng)啟動(dòng),false未啟動(dòng)  true啟動(dòng)
    boolean workerStarted = false;
    // 添加標(biāo)記,表示創(chuàng)建的 worker 是否添加到池子中了,默認(rèn)false未添加,true是添加。
    boolean workerAdded = false;
    Worker w = null;
    try {
        //【創(chuàng)建 Worker,底層通過線程工廠 newThread 方法創(chuàng)建執(zhí)行線程,指定了首先執(zhí)行的任務(wù)】
        w = new Worker(firstTask);
        // 將新創(chuàng)建的 worker 節(jié)點(diǎn)中的線程賦值給 t
        final Thread t = w.thread;
        // 這里的判斷為了防止 程序員自定義的 ThreadFactory 實(shí)現(xiàn)類有 bug,創(chuàng)造不出線程
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加互斥鎖,要添加 worker 了
            mainLock.lock();
            try {
                 // 獲取最新線程池運(yùn)行狀態(tài)
                int rs = runStateOf(ctl.get());
            	// 判斷線程池是否為RUNNING狀態(tài),不是再【判斷當(dāng)前是否為SHUTDOWN狀態(tài)且firstTask為空,特殊情況】
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                     // 當(dāng)線程start后,線程isAlive會(huì)返回true,這里還沒開始啟動(dòng)線程,如果被啟動(dòng)了就需要報(bào)錯(cuò)
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    //將新建的 Worker 添加到線程池中
                    workers.add(w);
                    int s = workers.size();
                    // 當(dāng)前池中的線程數(shù)量是一個(gè)新高,更新 largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                     // 添加標(biāo)記置為 true
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
             // 添加成功就【啟動(dòng)線程執(zhí)行任務(wù)】
            if (workerAdded) {
                // 啟動(dòng)線程
                t.start();
                // 運(yùn)行標(biāo)記置為 true
                workerStarted = true;
            }
        }
    } finally {
        // 線程啟動(dòng)失敗
        if (! workerStarted)
            // 清理工作,比如從線程池中移除。
            addWorkerFailed(w);
    }
    return workerStarted;
}

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // 持有線程池全局鎖,因?yàn)椴僮鞯氖蔷€程池相關(guān)的東西
    mainLock.lock();
    try {
        //條件成立需要將 worker 在 workers 中清理出去。
        if (w != null)
            workers.remove(w);
        // 將線程池計(jì)數(shù) -1,相當(dāng)于歸還令牌。
        decrementWorkerCount();
        // 嘗試停止線程池
        tryTerminate();
    } finally {
        //釋放線程池全局鎖。
        mainLock.unlock();
    }
}

這里注意一個(gè)點(diǎn),SHUTDOWN 狀態(tài)也能添加線程,但是要求新加的 Woker 沒有 firstTask,而且當(dāng)前 queue 不為空,所以創(chuàng)建一個(gè)線程來(lái)幫助線程池執(zhí)行隊(duì)列中的任務(wù)。

Woker運(yùn)行原理

Woker類是ThreadPoolExecutor類的內(nèi)部類,見明知意,它是承擔(dān)了一個(gè)“工人”干活,也就是工作線程的責(zé)任。

1.Worker類

每個(gè) Worker 對(duì)象有一個(gè)初始任務(wù),啟動(dòng) Worker 時(shí)優(yōu)先執(zhí)行,這也是造成線程池不公平的原因。Worker 繼承自 AQS,本身具有鎖的特性,采用獨(dú)占鎖模式,state = 0 表示未被占用,> 0 表示被占用,< 0 表示初始狀態(tài)不能被搶鎖。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	// worker 內(nèi)部封裝的工作線程
    final Thread thread;		
    // worker 第一個(gè)執(zhí)行的任務(wù),普通的 Runnable 實(shí)現(xiàn)類或者是 FutureTask
    Runnable firstTask;	
    // 記錄當(dāng)前 worker 所完成任務(wù)數(shù)量
    volatile long completedTasks;	
    
    // 構(gòu)造方法
    Worker(Runnable firstTask) {
        // 設(shè)置AQS獨(dú)占模式為初始化中狀態(tài),這個(gè)狀態(tài)不能被搶占鎖
       	setState(-1);
        // firstTask不為空時(shí),當(dāng)worker啟動(dòng)后,內(nèi)部線程會(huì)優(yōu)先執(zhí)行firstTask,執(zhí)行完后會(huì)到queue中去獲取下個(gè)任務(wù)
        this.firstTask = firstTask;
        // 使用線程工廠創(chuàng)建一個(gè)線程,并且【將當(dāng)前worker指定為Runnable】,所以thread啟動(dòng)時(shí)會(huì)調(diào)用 worker.run()
        this.thread = getThreadFactory().newThread(this);
    }
    // 不可重入鎖,重寫了AQS中的方法
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

 protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        // 設(shè)置state為0,開始搶鎖
        setState(0);
        return true;
    }
}

2.Worker的工作方法run

// Worker#run
public void run() {
    // 調(diào)用自身的runWoker方法
    runWorker(this);
}
// Worker#runWorker
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 獲取 worker 的 firstTask
    Runnable task = w.firstTask;
    // 引用置空,【防止復(fù)用該線程時(shí)重復(fù)執(zhí)行該任務(wù)】
    w.firstTask = null;
	// 初始化 worker 時(shí)設(shè)置 state = -1,表示不允許搶占鎖
    // 這里需要設(shè)置 state = 0 和 exclusiveOwnerThread = null,開始獨(dú)占模式搶鎖
    w.unlock(); 
   // true 表示發(fā)生異常退出,false 表示正常退出。
    boolean completedAbruptly = true;
    try {
        // firstTask 不是 null 就直接運(yùn)行,否則去 queue 中獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            // worker 加鎖,shutdown 時(shí)會(huì)判斷當(dāng)前 worker 狀態(tài),【根據(jù)獨(dú)占鎖狀態(tài)判斷是否空閑】
            w.lock();
            // 說(shuō)明線程池狀態(tài)大于 STOP,目前處于 STOP/TIDYING/TERMINATION,此時(shí)給線程一個(gè)中斷信號(hào)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                // 線程不是處于中斷的情況
                !wt.isInterrupted())
                 // 中斷線程,設(shè)置線程的中斷標(biāo)志位為 true
                wt.interrupt();
            try {
                // 任務(wù)執(zhí)行前的回調(diào),空實(shí)現(xiàn),可以在子類中自定義
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 真正執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                     // 鉤子方法,【任務(wù)執(zhí)行的后置處理】
                    afterExecute(task, thrown);
                }
            } finally {
                // 將局部變量task置為null,代表任務(wù)執(zhí)行完成
                task = null;
                // 更新worker完成任務(wù)數(shù)量
                w.completedTasks++;
                // 解鎖
                w.unlock();
            }
        }
         // getTask()方法返回null時(shí)會(huì)走到這里,表示queue為空并且線程空閑超過?;顣r(shí)間,【當(dāng)前線程執(zhí)行退出邏輯】
        completedAbruptly = false;
    } finally {
        // 正常退出 completedAbruptly = false
       	// 異常退出 completedAbruptly = true,【從 task.run() 內(nèi)部拋出異?!繒r(shí),跳到這一行
        processWorkerExit(w, completedAbruptly);
    }
}

3.getTask() 獲取任務(wù)

這個(gè)方法主要做了下面幾件事情:

  • 從阻塞隊(duì)列中獲取任務(wù)
  • 如果當(dāng)前線程空閑時(shí)間超過 keepAliveTime 就會(huì)被回收,主要通過調(diào)用隊(duì)列的超時(shí)獲取接口poll(long timeout, TimeUnit unit)實(shí)現(xiàn)。
private Runnable getTask() {
     // 超時(shí)標(biāo)記,表示當(dāng)前線程獲取任務(wù)是否超時(shí),true 表示已超時(shí)
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
         // 獲取線程池當(dāng)前運(yùn)行狀態(tài)
        int rs = runStateOf(c);

        // 如果發(fā)現(xiàn)線程池被關(guān)閉了,直接返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 使用 CAS 自旋的方式讓 ctl 值 -1
            decrementWorkerCount();
            return null;
        }

        // 獲取線程池中的線程數(shù)量
        int wc = workerCountOf(c);

        //timed用來(lái)判斷當(dāng)前線程是否超過一定時(shí)間沒有獲取任務(wù)就進(jìn)行銷毀回收,true是需要,false不需要, 有兩種情況
        //1. allowCoreThreadTimeOut為true代表允許回收核心線程,那就無(wú)所謂了,全部線程都執(zhí)行超時(shí)回收
        //2. 線程數(shù)量大于核心線程數(shù),當(dāng)前線程認(rèn)為是非核心線程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 同時(shí)滿足下面1和2條件下,說(shuō)明線程要回收,直接返回null
        // 1. 如果線程數(shù)量超過最大線程數(shù) 或者 上面的timed和超時(shí)時(shí)間timedOut都為true
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 2.如果線程數(shù)量大于1并且隊(duì)列時(shí)空的情況
            && (wc > 1 || workQueue.isEmpty())) {
            // 使用 CAS 機(jī)制將 ctl 值 -1 ,減 1 成功的線程,返回 null,代表可以退出
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 從隊(duì)列中獲取任務(wù),有下面兩種方法
            // timed為true, 調(diào)用超時(shí)方法poll獲取任務(wù)
            // timed為false,調(diào)用阻塞方法take獲取
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            獲取任務(wù)為 null 說(shuō)明超時(shí)了,將超時(shí)標(biāo)記設(shè)置為 true,進(jìn)入下一次循環(huán),就可以銷毀這個(gè)線程了
            timedOut = true;
        } catch (InterruptedException retry) {
             // 阻塞線程被打斷后超時(shí)標(biāo)記置為 false,【說(shuō)明被打斷不算超時(shí)】,要繼續(xù)獲取,直到超時(shí)或者獲取到任務(wù)
            // 如果線程池 SHUTDOWN 狀態(tài)下的打斷,會(huì)在循環(huán)獲取任務(wù)前判斷,返回 null
            timedOut = false;
        }
    }
}

4.processWorkerExit()工作線程退出方法

// 正常退出 completedAbruptly = false,異常退出為 true
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 條件成立代表當(dāng)前 worker 是發(fā)生異常退出的,task 任務(wù)執(zhí)行過程中向上拋出異常了
    if (completedAbruptly) 
        // 從異常時(shí)到這里 ctl 一直沒有 -1,需要在這里 -1
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    // 加鎖
    mainLock.lock();
    try {
        // 將當(dāng)前 worker 完成的 task 數(shù)量,匯總到線程池的 completedTaskCount
        completedTaskCount += w.completedTasks;
		// 將 worker 從線程池中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();	// 解鎖
    }
	// 嘗試停止線程池,喚醒下一個(gè)線程
    tryTerminate();

    int c = ctl.get();
    // 線程池不是停止?fàn)顟B(tài)就應(yīng)該有線程運(yùn)行【擔(dān)保機(jī)制】
    if (runStateLessThan(c, STOP)) {
        // 正常退出的邏輯,是對(duì)空閑線程回收,不是執(zhí)行出錯(cuò)
        if (!completedAbruptly) {
            // 根據(jù)是否回收核心線程確定【線程池中的線程數(shù)量最小值】
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 最小值為 0,但是線程隊(duì)列不為空,需要一個(gè)線程來(lái)完成任務(wù)擔(dān)保機(jī)制
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // 線程池中的線程數(shù)量大于最小值可以直接返回
            if (workerCountOf(c) >= min)
                return;
        }
        // 執(zhí)行 task 時(shí)發(fā)生異常,有個(gè)線程因?yàn)楫惓=K止了,需要添加
        // 或者線程池中的數(shù)量小于最小值,這里要?jiǎng)?chuàng)建一個(gè)新 worker 加進(jìn)線程池
        addWorker(null, false);
    }
}

總結(jié)

本文主要從源碼層面分析了線程池的運(yùn)行機(jī)理,總算知道了execute方法背后是如何運(yùn)轉(zhuǎn)的。

以上就是Java線程池源碼的深度解析的詳細(xì)內(nèi)容,更多關(guān)于Java線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 詳解Nacos配置中心的實(shí)現(xiàn)

    詳解Nacos配置中心的實(shí)現(xiàn)

    Spring Cloud Alibaba 是阿里巴巴提供的一站式微服務(wù)開發(fā)解決方案。而 Nacos 作為 Spring Cloud Alibaba 的核心組件之一,提供了兩個(gè)非常重要的功能:注冊(cè)中心和配置中心,我們今天來(lái)了解和實(shí)現(xiàn)一下二者
    2022-08-08
  • Swing常用組件之多行文本區(qū)JTextArea

    Swing常用組件之多行文本區(qū)JTextArea

    這篇文章主要為大家詳細(xì)介紹了Swing常用組件之多行文本區(qū)JTextArea,感興趣的朋友可以參考一下
    2016-05-05
  • Java?web實(shí)現(xiàn)頭像上傳以及讀取顯示

    Java?web實(shí)現(xiàn)頭像上傳以及讀取顯示

    這篇文章主要為大家詳細(xì)介紹了Java?web實(shí)現(xiàn)頭像上傳以及讀取顯示,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • IDEA進(jìn)程已結(jié)束,退出代碼-1073741819 (0xC0000005)的bug

    IDEA進(jìn)程已結(jié)束,退出代碼-1073741819 (0xC0000005)的bug

    這篇文章主要介紹了IDEA進(jìn)程已結(jié)束,退出代碼-1073741819 (0xC0000005)的bug,本文通過實(shí)例代碼圖文的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-04-04
  • spring源碼閱讀--aop實(shí)現(xiàn)原理講解

    spring源碼閱讀--aop實(shí)現(xiàn)原理講解

    這篇文章主要介紹了spring源碼閱讀--aop實(shí)現(xiàn)原理講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Spring Boot整合logback一個(gè)簡(jiǎn)單的日志集成架構(gòu)

    Spring Boot整合logback一個(gè)簡(jiǎn)單的日志集成架構(gòu)

    今天小編就為大家分享一篇關(guān)于Spring Boot整合logback一個(gè)簡(jiǎn)單的日志集成架構(gòu),小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-01-01
  • SpringBoot 對(duì)象存儲(chǔ) MinIO的詳細(xì)過程

    SpringBoot 對(duì)象存儲(chǔ) MinIO的詳細(xì)過程

    MinIO 是一個(gè)基于 Go 實(shí)現(xiàn)的高性能、兼容 S3 協(xié)議的對(duì)象存儲(chǔ),它適合存儲(chǔ)海量的非結(jié)構(gòu)化的數(shù)據(jù),這篇文章主要介紹了SpringBoot 對(duì)象存儲(chǔ) MinIO,需要的朋友可以參考下
    2023-07-07
  • 關(guān)于Java Spring三級(jí)緩存和循環(huán)依賴的深入理解

    關(guān)于Java Spring三級(jí)緩存和循環(huán)依賴的深入理解

    對(duì)于循環(huán)依賴,我相信讀者無(wú)論只是聽過也好,還是有過了解也好,至少都有所接觸。但是我發(fā)現(xiàn)目前許多博客對(duì)于循環(huán)依賴的講解并不清楚,都提到了Spring的循環(huán)依賴解決方案是三級(jí)緩存,但是三級(jí)緩存每一級(jí)的作用是什么,很多博客都沒有提到,本篇文章帶你深入了解
    2021-09-09
  • Java利用ffmpeg實(shí)現(xiàn)視頻MP4轉(zhuǎn)m3u8

    Java利用ffmpeg實(shí)現(xiàn)視頻MP4轉(zhuǎn)m3u8

    本文綜合了下網(wǎng)上教程,從ffmpeg工具轉(zhuǎn)碼,ffmpeg視頻播放,java語(yǔ)言操控ffmpeg轉(zhuǎn)碼,轉(zhuǎn)碼后視頻上傳阿里云oss,四個(gè)方面完整記錄下這個(gè)流程,需要的朋友可以參考下
    2024-02-02
  • Java自定義equals產(chǎn)生的問題分析

    Java自定義equals產(chǎn)生的問題分析

    這篇文章主要介紹了Java自定義equals時(shí)super.equals帶來(lái)的問題分析,總的來(lái)說(shuō)這并不是一道難題,那為什么要拿出這道題介紹?拿出這道題真正想要傳達(dá)的是解題的思路,以及不斷優(yōu)化探尋最優(yōu)解的過程。希望通過這道題能給你帶來(lái)一種解題優(yōu)化的思路
    2023-01-01

最新評(píng)論