Java線程池之線程復(fù)用原理全面解析
什么是線程復(fù)用
在Java中,我們正常創(chuàng)建線程執(zhí)行任務(wù),一般都是一條線程綁定一個(gè)Runnable執(zhí)行任務(wù)。
而Runnable實(shí)際只是一個(gè)普通接口,真正要執(zhí)行,則還是利用了Thread類的run方法。
這個(gè)rurn方法由native本地方法start0進(jìn)行調(diào)用。
我們看Thread類的run方法實(shí)現(xiàn)
/* What will be run. */ private Runnable target; /** * If this thread was constructed using a separate * <code>Runnable</code> run object, then that * <code>Runnable</code> object's <code>run</code> method is called; * otherwise, this method does nothing and returns. * <p> * Subclasses of <code>Thread</code> should override this method. * * @see #start() * @see #stop() * @see #Thread(ThreadGroup, Runnable, String) */ @Override public void run() { if (target != null) { target.run(); } }
很明顯,Thread類的run方法就是使用構(gòu)造Thread類傳入來(lái)的Runnable對(duì)象,執(zhí)行Runnable的run方法。
這樣可以很好的將任務(wù)和Thread類解耦,如果繼承Thread類再去重寫run方法當(dāng)然也是可以,但卻耦合了,并且Java是單繼承,所以繼承Thread類這種方式通常不會(huì)使用,沒(méi)有任何好處。
現(xiàn)在問(wèn)題是,一個(gè)線程只能執(zhí)行一個(gè)Runnable對(duì)象,那么這條線程它就是不能復(fù)用的,完成任務(wù)它就該Terminated了。
如果系統(tǒng)任務(wù)很多,頻繁創(chuàng)建線程帶來(lái)的開銷大,線程數(shù)量不可控導(dǎo)致系統(tǒng)處于一種不安全的狀況,系統(tǒng)隨時(shí)可能被大量線程搞跨,于是線程池就出現(xiàn)了。
線程池要解決的問(wèn)題就是用少量線程處理更多的任務(wù),這樣一來(lái),線程池首先要實(shí)現(xiàn)的就是線程復(fù)用。
不能說(shuō)還是一條線程只處理一個(gè)Runnable任務(wù),而是一條線程處理無(wú)數(shù)Runnable任務(wù)。
最容易想到的方案就是將Runnable對(duì)象放到隊(duì)列中,在Thread類的run方法中不斷從隊(duì)列中拉取任務(wù)執(zhí)行,這樣一來(lái)就實(shí)現(xiàn)了線程復(fù)用。
當(dāng)然,實(shí)際線程池也差不多是這么干的,下面我們?cè)敿?xì)看一下線程池實(shí)現(xiàn)線程復(fù)用的原理。
線程池處理任務(wù)的過(guò)程
在線程池原理解析中有詳述線程池創(chuàng)建線程及處理任務(wù)的過(guò)程。
這里再次簡(jiǎn)單看一下流程圖以方便理解下面的線程復(fù)用原理解析。
線程復(fù)用原理解析
線程處理任務(wù)過(guò)程源碼解析
首先我們看看線程池是怎么使用的
import cn.hutool.core.thread.ThreadFactoryBuilder; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author kangming.ning * @date 2023-02-24 16:27 * @since 1.0 **/ public class CustomThreadPool1 { private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNamePrefix("線程池-").build(); private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); public static void main(String[] args) throws InterruptedException { Runnable r = () -> { System.out.println(Thread.currentThread().getName() + " is running"); }; for (int i = 0; i < 35; i++) { Thread.sleep(1000); threadPoolExecutor.submit(r); } } }
可見,threadPoolExecutor的sumit方法就是用來(lái)提交任務(wù)的,于是,從這個(gè)方法開始分析源碼,把源碼的關(guān)注點(diǎn)放在線程復(fù)用部分。
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
第一句只是用來(lái)包裝一下有返回值的任務(wù),不必關(guān)注,重點(diǎn)看execute(ftask)這句。
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
代碼量不多,信息量極大。
注釋的內(nèi)容就是在解釋線程池執(zhí)行任務(wù)的處理過(guò)程,這個(gè)看上面的流程圖即可。
任務(wù)如果為空直接拋空指針異常。
下面看第一個(gè)if語(yǔ)句
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
如果worker數(shù)量少于核心線程數(shù),則通過(guò)addWorker(command, true)方法添加一個(gè)worker。這里要注意,線程池把每一條線程都封裝成了Worker的實(shí)例。
addWorker方法的作用是在線程池中創(chuàng)建一個(gè)線程并執(zhí)行第一個(gè)參數(shù)傳入的任務(wù),它的第二個(gè)參數(shù)是個(gè)boolean值,如果傳入 true 則代表增加線程時(shí)判斷當(dāng)前線程是否少于 corePoolSize,小于則增加新線程,大于等于則不增加;如果傳入false則使用maximumPoolSize來(lái)判斷是否增加新線程。
接下來(lái)看下面第二個(gè)if的代碼
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
執(zhí)行到這里說(shuō)明核心線程數(shù)已滿或者說(shuō)addWorker失敗了。此時(shí)先檢查線程池是否為運(yùn)行狀態(tài),是的話直接把任務(wù)放隊(duì)列,這跟上面的流程圖是一致的,核心線程數(shù)滿則放隊(duì)列。
當(dāng)然當(dāng)任務(wù)提交成功后還是會(huì)重新檢查線程池的狀態(tài),如果線程池沒(méi)在跑則會(huì)移除任務(wù)并且執(zhí)行拒絕策略。
再看里面的else if分支
if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false);
進(jìn)入else if分支說(shuō)明線程池是在運(yùn)行的,這里是檢查一下是否有線程可供使用,雖說(shuō)上面已經(jīng)檢查過(guò)目前的線程數(shù)已大于核心線程數(shù),但不排除核心線程數(shù)設(shè)置為0 這種情況,這樣一來(lái),任務(wù)添加后缺沒(méi)線程去執(zhí)行,這種情況是不允許的。
再往下看最后一句else if代碼
else if (!addWorker(command, false)) reject(command);
能執(zhí)行到這里,說(shuō)明要么是線程池不在運(yùn)行中,要么就是核心線程和隊(duì)列都滿了,此時(shí)需要開啟線程池的后備力量,嘗試添加非核心線程直到線程數(shù)達(dá)到最大線程數(shù)限制,注意到addWorker方法第二個(gè)參數(shù)傳了false,正是添加線程時(shí)使用最大線程數(shù)限制來(lái)判斷是否添加新線程。
假設(shè)添加失敗意味著最大線程數(shù)也達(dá)到了最大值并且沒(méi)空閑線程去執(zhí)行當(dāng)前任務(wù),此時(shí)執(zhí)行reject拒絕策略。
線程復(fù)用源碼解析
通過(guò)上面的解析我們可以看到,添加線程以執(zhí)行任務(wù)的核心方法是addWorker,大概看一下Worker的代碼
/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping. * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution. This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run. We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize. Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker). */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
可見,Worker對(duì)Thread進(jìn)行了封裝,它本身也是一個(gè)Runnable對(duì)象,內(nèi)部的Thread對(duì)象則是真正用來(lái)執(zhí)行任務(wù)的線程對(duì)象。
因此添加Worker實(shí)則就是在線程池中添加運(yùn)行任務(wù)的線程,可以看出在Worker的構(gòu)造函數(shù)中新建了一條線程并且把引用賦值給了thread對(duì)象。
而在上面的addWorker方法中start了這條線程,而這條線程的Runnable對(duì)象正是Worker對(duì)象自身。
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
既然addWorker方法執(zhí)行了線程的start方法,因此Worker類里面的run方法將被系統(tǒng)調(diào)度
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
里面只有一個(gè)runWorker方法,并且把Worker對(duì)象傳了進(jìn)去,明顯,runWorker是實(shí)現(xiàn)線程復(fù)用的方法 。
/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
代碼不多,注釋很多,核心意思就是這是一個(gè)死循環(huán),不斷從隊(duì)列獲取任務(wù)進(jìn)行執(zhí)行。
通過(guò)上面代碼可以清晰的看出,一開始將firstTask賦值給task Runnable對(duì)象,然后下面有個(gè)while死循環(huán),不斷的從隊(duì)列獲取task進(jìn)行執(zhí)行,里面的核心邏輯就是task.run(),Runnable對(duì)象的run方法由這條Worker線程像調(diào)用普通方法一樣的調(diào)用,這個(gè)就是線程復(fù)用的原理。
將Runnable對(duì)象放隊(duì)列,然后在一個(gè)主循環(huán)里面不斷從隊(duì)列里獲取任務(wù)進(jìn)行執(zhí)行。
最后看一下getTask方法
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
可見,里面就是從隊(duì)列里面獲取一個(gè)Runnable對(duì)象進(jìn)行返回而已。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
springboot controller 增加指定前綴的兩種實(shí)現(xiàn)方法
這篇文章主要介紹了springboot controller 增加指定前綴的兩種實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02java模擬實(shí)現(xiàn)銀行ATM機(jī)操作
這篇文章主要為大家詳細(xì)介紹了java模擬實(shí)現(xiàn)銀行ATM機(jī)操作,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05Springboot實(shí)現(xiàn)TLS雙向認(rèn)證的方法
本文介紹了使用keytool生成和管理自簽名CA證書、服務(wù)器證書和客戶端證書的方法,適合Java生態(tài)系統(tǒng),通過(guò)配置信任庫(kù)和服務(wù)器/客戶端配置,實(shí)現(xiàn)了Spring Boot中的TLS雙向認(rèn)證,感興趣的朋友一起看看吧2025-02-02Springboot中的Validation參數(shù)校驗(yàn)詳解
這篇文章主要介紹了Springboot中的Validation參數(shù)校驗(yàn)詳解,Springboot參數(shù)校驗(yàn)是一種常用的驗(yàn)證機(jī)制,在傳遞參數(shù)時(shí)進(jìn)行校驗(yàn),以確保參數(shù)的有效性和正確性,該機(jī)制可以幫助開發(fā)者在代碼實(shí)現(xiàn)前就避免一些常見的錯(cuò)誤,需要的朋友可以參考下2023-10-10分布式開發(fā)醫(yī)療掛號(hào)系統(tǒng)數(shù)據(jù)字典模塊前后端實(shí)現(xiàn)
這篇文章主要為大家介紹了分布式開發(fā)醫(yī)療掛號(hào)系統(tǒng)數(shù)據(jù)字典模塊前后端實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04淺談SpringMVC之視圖解析器(ViewResolver)
本篇文章主要介紹了淺談SpringMVC之視圖解析器(ViewResolver),具有一定的參考價(jià)值,有興趣的可以了解一下2017-08-08使用SpringBoot創(chuàng)建一個(gè)RESTful API的詳細(xì)步驟
使用 Java 的 Spring Boot 創(chuàng)建 RESTful API 可以滿足多種開發(fā)場(chǎng)景,它提供了快速開發(fā)、易于配置、可擴(kuò)展、可維護(hù)的優(yōu)點(diǎn),尤其適合現(xiàn)代軟件開發(fā)的需求,幫助你快速構(gòu)建出高性能的后端服務(wù),需要的朋友可以參考下2025-01-01Java中synchronized關(guān)鍵字修飾方法同步的用法詳解
synchronized可以用來(lái)同步靜態(tài)和非靜態(tài)方法,下面就具體來(lái)看一下Java中synchronized關(guān)鍵字修飾方法同步的用法詳解:2016-06-06