java線程池中Worker線程執(zhí)行流程原理解析
引言
在《【高并發(fā)】別鬧了,這樣理解線程池執(zhí)行任務(wù)的核心流程才正確??!》一文中我們深度分析了線程池執(zhí)行任務(wù)的核心流程,在ThreadPoolExecutor類的addWorker(Runnable, boolean)方法中,使用CAS安全的更新線程的數(shù)量之后,接下來就是創(chuàng)建新的Worker線程執(zhí)行任務(wù),所以,我們先來分析下Worker類的源碼。
Worker類分析
Worker類從類的結(jié)構(gòu)上來看,繼承了AQS(AbstractQueuedSynchronizer類)并實(shí)現(xiàn)了Runnable接口。本質(zhì)上,Worker類既是一個(gè)同步組件,也是一個(gè)執(zhí)行任務(wù)的線程。
接下來,我們看下Worker類的源碼,如下所示。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //執(zhí)行任務(wù)的線程類 final Thread thread; //初始化執(zhí)行的任務(wù),第一次執(zhí)行的任務(wù) Runnable firstTask; //完成任務(wù)的計(jì)數(shù) volatile long completedTasks; //Worker類的構(gòu)造方法,初始化任務(wù)并調(diào)用線程工廠創(chuàng)建執(zhí)行任務(wù)的線程 Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //重寫Runnable接口的run()方法 public void run() { //調(diào)用ThreadPoolExecutor類的runWorker(Worker)方法 runWorker(this); } //檢測是否是否獲取到鎖 //state=0表示未獲取到鎖 //state=1表示已獲取到鎖 protected boolean isHeldExclusively() { return getState() != 0; } //使用AQS設(shè)置線程狀態(tài) protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //嘗試釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
在Worker類的構(gòu)造方法中,可以看出,首先將同步狀態(tài)state設(shè)置為-1,設(shè)置為-1是為了防止runWorker方法運(yùn)行之前被中斷。這是因?yàn)槿绻渌€程調(diào)用線程池的shutdownNow()方法時(shí),如果Worker類中的state狀態(tài)的值大于0,則會(huì)中斷線程,如果state狀態(tài)的值為-1,則不會(huì)中斷線程。
Worker類實(shí)現(xiàn)了Runnable接口,需要重寫run方法,而Worker的run方法本質(zhì)上調(diào)用的是ThreadPoolExecutor類的runWorker方法,在runWorker方法中,會(huì)首先調(diào)用unlock方法,該方法會(huì)將state置為0,所以這個(gè)時(shí)候調(diào)用shutDownNow方法就會(huì)中斷當(dāng)前線程,而這個(gè)時(shí)候已經(jīng)進(jìn)入了runWork方法,就不會(huì)在還沒有執(zhí)行runWorker方法的時(shí)候就中斷線程。
注意:大家需要重點(diǎn)理解Worker類的實(shí)現(xiàn)。
Worker類中調(diào)用了ThreadPoolExecutor類的runWorker(Worker)方法。接下來,我們一起看下ThreadPoolExecutor類的runWorker(Worker)方法的實(shí)現(xiàn)。
runWorker(Worker)方法
首先,我們看下RunWorker(Worker)方法的源碼,如下所示。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //釋放鎖,將state設(shè)置為0,允許中斷任務(wù)的執(zhí)行 w.unlock(); boolean completedAbruptly = true; try { //如果任務(wù)不為空,或者從任務(wù)隊(duì)列中獲取的任務(wù)不為空,則執(zhí)行while循環(huán) while (task != null || (task = getTask()) != null) { //如果任務(wù)不為空,則獲取Worker工作線程的獨(dú)占鎖 w.lock(); //如果線程已經(jīng)停止,或者中斷線程后線程終止并且沒有成功中斷線程 //大家好好理解下這個(gè)邏輯 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中斷線程 wt.interrupt(); try { //執(zhí)行任務(wù)前執(zhí)行的邏輯 beforeExecute(wt, task); Throwable thrown = null; try { //調(diào)用Runable接口的run方法執(zhí)行任務(wù) task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //執(zhí)行任務(wù)后執(zhí)行的邏輯 afterExecute(task, thrown); } } finally { //任務(wù)執(zhí)行完成后,將其設(shè)置為空 task = null; //完成的任務(wù)數(shù)量加1 w.completedTasks++; //釋放工作線程獲得的鎖 w.unlock(); } } completedAbruptly = false; } finally { //執(zhí)行退出Worker線程的邏輯 processWorkerExit(w, completedAbruptly); } }
這里,我們拆解runWorker(Worker)方法。
(1)獲取當(dāng)前線程的句柄和工作線程中的任務(wù),并將工作線程中的任務(wù)設(shè)置為空,執(zhí)行unlock方法釋放鎖,將state狀態(tài)設(shè)置為0,此時(shí)可以中斷工作線程,代碼如下所示。
Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //釋放鎖,將state設(shè)置為0,允許中斷任務(wù)的執(zhí)行 w.unlock();
(2)在while循環(huán)中進(jìn)行判斷,如果任務(wù)不為空,或者從任務(wù)隊(duì)列中獲取的任務(wù)不為空,則執(zhí)行while循環(huán),否則,調(diào)用processWorkerExit(Worker, boolean)方法退出Worker工作線程。
while (task != null || (task = getTask()) != null)
(3)如果滿足while的循環(huán)條件,首先獲取工作線程內(nèi)部的獨(dú)占鎖,并執(zhí)行一系列的邏輯判斷來檢測是否需要中斷當(dāng)前線程的執(zhí)行,代碼如下所示。
//如果任務(wù)不為空,則獲取Worker工作線程的獨(dú)占鎖 w.lock(); //如果線程已經(jīng)停止,或者中斷線程后線程終止并且沒有成功中斷線程 //大家好好理解下這個(gè)邏輯 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中斷線程 wt.interrupt();
(4)調(diào)用執(zhí)行任務(wù)前執(zhí)行的邏輯,如下所示
//執(zhí)行任務(wù)前執(zhí)行的邏輯 beforeExecute(wt, task);
(5)調(diào)用Runable接口的run方法執(zhí)行任務(wù)
//調(diào)用Runable接口的run方法執(zhí)行任務(wù) task.run();
(6)調(diào)用執(zhí)行任務(wù)后執(zhí)行的邏輯
//執(zhí)行任務(wù)后執(zhí)行的邏輯 afterExecute(task, thrown);
(7)將完成的任務(wù)設(shè)置為空,完成的任務(wù)數(shù)量加1并釋放工作線程的鎖。
//任務(wù)執(zhí)行完成后,將其設(shè)置為空 task = null; //完成的任務(wù)數(shù)量加1 w.completedTasks++; //釋放工作線程獲得的鎖 w.unlock();
(8)退出Worker線程的執(zhí)行,如下所示
//執(zhí)行退出Worker線程的邏輯 processWorkerExit(w, completedAbruptly);
從代碼分析上可以看到,當(dāng)從Worker線程中獲取的任務(wù)為空時(shí),會(huì)調(diào)用getTask()方法從任務(wù)隊(duì)列中獲取任務(wù),接下來,我們看下getTask()方法的實(shí)現(xiàn)。
getTask()方法
我們先來看下getTask()方法的源代碼,如下所示。
private Runnable getTask() { //輪詢是否超時(shí)的標(biāo)識(shí) boolean timedOut = false; //自旋for循環(huán) for (;;) { //獲取ctl int c = ctl.get(); //獲取線程池的狀態(tài) int rs = runStateOf(c); //檢測任務(wù)隊(duì)列是否在線程池停止或關(guān)閉的時(shí)候?yàn)榭? //也就是說任務(wù)隊(duì)列是否在線程池未正常運(yùn)行時(shí)為空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //減少Worker線程的數(shù)量 decrementWorkerCount(); return null; } //獲取線程池中線程的數(shù)量 int wc = workerCountOf(c); //檢測當(dāng)前線程池中的線程數(shù)量是否大于corePoolSize的值或者是否正在等待執(zhí)行任務(wù) boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果線程池中的線程數(shù)量大于corePoolSize //獲取大于corePoolSize或者是否正在等待執(zhí)行任務(wù)并且輪詢超時(shí) //并且當(dāng)前線程池中的線程數(shù)量大于1或者任務(wù)隊(duì)列為空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //成功減少線程池中的工作線程數(shù)量 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //從任務(wù)隊(duì)列中獲取任務(wù) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //任務(wù)不為空直接返回任務(wù) if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
getTask()方法的邏輯比較簡單,大家看源碼就可以了,我這里就不重復(fù)描述了。
接下來,我們看下在正式調(diào)用Runnable的run()方法前后,執(zhí)行的beforeExecute方法和afterExecute方法。
beforeExecute(Thread, Runnable)方法
beforeExecute(Thread, Runnable)方法的源代碼如下所示。
protected void beforeExecute(Thread t, Runnable r) { }
可以看到,beforeExecute(Thread, Runnable)方法的方法體為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫beforeExecute(Thread, Runnable)方法,使得線程池正式執(zhí)行任務(wù)之前,執(zhí)行我們自己定義的業(yè)務(wù)邏輯。
afterExecute(Runnable, Throwable)方法
afterExecute(Runnable, Throwable)方法的源代碼如下所示。
protected void afterExecute(Runnable r, Throwable t) { }
可以看到,afterExecute(Runnable, Throwable)方法的方法體同樣為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫afterExecute(Runnable, Throwable)方法,使得線程池在執(zhí)行任務(wù)之后執(zhí)行我們自己定義的業(yè)務(wù)邏輯。
接下來,就是退出工作線程的processWorkerExit(Worker, boolean)方法。
processWorkerExit(Worker, boolean)方法
processWorkerExit(Worker, boolean)方法的邏輯主要是執(zhí)行退出Worker線程,并且對(duì)一些資源進(jìn)行清理,源代碼如下所示。
private void processWorkerExit(Worker w, boolean completedAbruptly) { //執(zhí)行過程中出現(xiàn)了異常,突然中斷 if (completedAbruptly) //將工作線程的數(shù)量減1 decrementWorkerCount(); //獲取全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //累加完成的任務(wù)數(shù)量 completedTaskCount += w.completedTasks; //將完成的任務(wù)從workers集合中移除 workers.remove(w); } finally { //釋放鎖 mainLock.unlock(); } //嘗試終止工作線程的執(zhí)行 tryTerminate(); //獲取ctl int c = ctl.get(); //判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN) if (runStateLessThan(c, STOP)) { //如果沒有突然中斷完成 if (!completedAbruptly) { //如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果min為0并且工作隊(duì)列不為空 if (min == 0 && ! workQueue.isEmpty()) //min的值設(shè)置為1 min = 1; //如果線程池中的線程數(shù)量大于min的值 if (workerCountOf(c) >= min) //返回,不再執(zhí)行程序 return; } //調(diào)用addWorker方法 addWorker(null, false); } }
接下來,我們拆解processWorkerExit(Worker, boolean)方法。
(1)執(zhí)行過程中出現(xiàn)了異常,突然中斷執(zhí)行,則將工作線程數(shù)量減1,如下所示。
//執(zhí)行過程中出現(xiàn)了異常,突然中斷 if (completedAbruptly) //將工作線程的數(shù)量減1 decrementWorkerCount();
(2)獲取鎖累加完成的任務(wù)數(shù)量,并將完成的任務(wù)從workers集合中移除,并釋放,如下所示。
//獲取全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //累加完成的任務(wù)數(shù)量 completedTaskCount += w.completedTasks; //將完成的任務(wù)從workers集合中移除 workers.remove(w); } finally { //釋放鎖 mainLock.unlock(); }
(3)嘗試終止工作線程的執(zhí)行
//嘗試終止工作線程的執(zhí)行 tryTerminate();
(4)處判斷當(dāng)前線程池中的線程個(gè)數(shù)是否小于核心線程數(shù),如果是,需要新增一個(gè)線程保證有足夠的線程可以執(zhí)行任務(wù)隊(duì)列中的任務(wù)或者提交的任務(wù)。
//獲取ctl int c = ctl.get(); //判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN) if (runStateLessThan(c, STOP)) { //如果沒有突然中斷完成 if (!completedAbruptly) { //如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果min為0并且工作隊(duì)列不為空 if (min == 0 && ! workQueue.isEmpty()) //min的值設(shè)置為1 min = 1; //如果線程池中的線程數(shù)量大于min的值 if (workerCountOf(c) >= min) //返回,不再執(zhí)行程序 return; } //調(diào)用addWorker方法 addWorker(null, false); }
接下來,我們看下tryTerminate()方法。
tryTerminate()方法
tryTerminate()方法的源代碼如下所示。
final void tryTerminate() { //自旋for循環(huán) for (;;) { //獲取ctl int c = ctl.get(); //如果線程池的狀態(tài)為RUNNING //或者狀態(tài)大于TIDYING //或者狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空 //直接返回程序,不再執(zhí)行后續(xù)邏輯 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果當(dāng)前線程池中的線程數(shù)量不等于0 if (workerCountOf(c) != 0) { //中斷線程的執(zhí)行 interruptIdleWorkers(ONLY_ONE); return; } //獲取線程池的全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //通過CAS將線程池的狀態(tài)設(shè)置為TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //調(diào)用terminated()方法 terminated(); } finally { //將線程池狀態(tài)設(shè)置為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //喚醒所有因?yàn)檎{(diào)用線程池的awaitTermination方法而被阻塞的線程 termination.signalAll(); } return; } } finally { //釋放鎖 mainLock.unlock(); } } }
(1)獲取ctl,根據(jù)情況設(shè)置線程池狀態(tài)或者中斷線程的執(zhí)行,并返回。
//獲取ctl int c = ctl.get(); //如果線程池的狀態(tài)為RUNNING //或者狀態(tài)大于TIDYING //或者狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空 //直接返回程序,不再執(zhí)行后續(xù)邏輯 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果當(dāng)前線程池中的線程數(shù)量不等于0 if (workerCountOf(c) != 0) { //中斷線程的執(zhí)行 interruptIdleWorkers(ONLY_ONE); return; }
(2)獲取全局鎖,通過CAS設(shè)置線程池的狀態(tài),調(diào)用terminated()方法執(zhí)行邏輯,最終將線程池的狀態(tài)設(shè)置為TERMINATED,喚醒所有因?yàn)檎{(diào)用線程池的awaitTermination方法而被阻塞的線程,最終釋放鎖,如下所示。
//獲取線程池的全局 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //通過CAS將線程池的狀態(tài)設(shè)置為TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //調(diào)用terminated()方法 terminated(); } finally { //將線程池狀態(tài)設(shè)置為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //喚醒所有因?yàn)檎{(diào)用線程池的awaitTermination方法而被阻塞的線程 termination.signalAll(); } return; } } finally { //釋放鎖 mainLock.unlock(); }
接下來,看下terminated()方法。
terminated()方法
terminated()方法的源代碼如下所示。
protected void terminated() { }
可以看到,terminated()方法的方法體為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫terminated()方法,值得Worker線程調(diào)用tryTerminate()方法時(shí)執(zhí)行我們自己定義的terminated()方法的業(yè)務(wù)邏輯。
以上就是java線程池中Worker線程執(zhí)行流程原理解析的詳細(xì)內(nèi)容,更多關(guān)于java線程池Worker線程執(zhí)行的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用java代碼獲取新浪微博應(yīng)用的access token代碼實(shí)例
這篇文章主要介紹了使用java代碼獲取新浪微博應(yīng)用的access token實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05Java實(shí)現(xiàn)驗(yàn)證碼具體代碼(圖片、漢字)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)驗(yàn)證碼具體代碼,包括圖片驗(yàn)證碼、漢字驗(yàn)證碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-01-01SpringBoot利用注解來實(shí)現(xiàn)Redis分布式鎖
有些業(yè)務(wù)請(qǐng)求,屬于耗時(shí)操作,需要加鎖,防止后續(xù)的并發(fā)操作,同時(shí)對(duì)數(shù)據(jù)庫的數(shù)據(jù)進(jìn)行操作,需要避免對(duì)之前的業(yè)務(wù)造成影響。本文將利用注解來實(shí)現(xiàn)Redis分布式鎖,需要的可以參考一下2022-09-09SpringBoot中整合Ehcache實(shí)現(xiàn)熱點(diǎn)數(shù)據(jù)緩存的詳細(xì)過程
這篇文章主要介紹了SpringBoot中整合Ehcache實(shí)現(xiàn)熱點(diǎn)數(shù)據(jù)緩存,SpringBoot 中使用 Ehcache 比較簡單,只需要簡單配置,說白了還是 Spring Cache 的用法,合理使用緩存機(jī)制,可以很好地提高項(xiàng)目的響應(yīng)速度,需要的朋友可以參考下2023-04-04Mybatis與Jpa的區(qū)別和性能對(duì)比總結(jié)
mybatis和jpa兩個(gè)持久層框架,從底層到用法都不同,但是實(shí)現(xiàn)的功能是一樣的,所以說一直以來頗有爭議,所以下面這篇文章主要給大家介紹了關(guān)于Mybatis與Jpa的區(qū)別和性能對(duì)比的相關(guān)資料,需要的朋友可以參考下2021-06-06Spring中property-placeholder的使用與解析詳解
本篇文章主要介紹了Spring中property-placeholder的使用與解析詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05