深入理解Java編程線程池的實現(xiàn)原理
在前面的文章中,我們使用線程的時候就去創(chuàng)建一個線程,這樣實現(xiàn)起來非常簡便,但是就會有一個問題:
如果并發(fā)的線程數(shù)量很多,并且每個線程都是執(zhí)行一個時間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線程就會大大降低系統(tǒng)的效率,因為頻繁創(chuàng)建線程和銷毀線程需要時間。
那么有沒有一種辦法使得線程可以復(fù)用,就是執(zhí)行完一個任務(wù),并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)?
在Java中可以通過線程池來達到這樣的效果。今天我們就來詳細講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然后再講述它的實現(xiàn)原理,接著給出了它的使用示例,最后討論了一下如何合理配置線程池的大小。
以下是本文的目錄大綱:
一.Java中的ThreadPoolExecutor類
二.深入剖析線程池實現(xiàn)原理
三.使用示例
四.如何合理配置線程池的大小
若有不正之處請多多諒解,并歡迎批評指正。
一.Java中的ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現(xiàn)源碼。
在ThreadPoolExecutor類中提供了四個構(gòu)造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,并提供了四個構(gòu)造器,事實上,通過觀察每個構(gòu)造器的源碼具體實現(xiàn),發(fā)現(xiàn)前面三個構(gòu)造器都是調(diào)用的第四個構(gòu)造器進行的初始化工作。
下面解釋下一下構(gòu)造器中各個參數(shù)的含義:
corePoolSize:核心池的大小,這個參數(shù)跟后面講述的線程池的實現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個線程或者一個線程。默認情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務(wù)放到緩存隊列當中;
maximumPoolSize:線程池最大線程數(shù),這個參數(shù)也是一個非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個線程;
keepAliveTime:表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當線程池中的線程數(shù)大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用,直到線程池中的線程數(shù)為0;
unit:參數(shù)keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
workQueue:一個阻塞隊列,用來存儲等待執(zhí)行的任務(wù),這個參數(shù)的選擇也很重要,會對線程池的運行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關(guān)。
threadFactory:線程工廠,主要用來創(chuàng)建線程;
handler:表示當拒絕處理任務(wù)時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
具體參數(shù)的配置與線程池的關(guān)系將在下一節(jié)講述。
從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現(xiàn):
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { }; private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }
AbstractExecutorService是一個抽象類,它實現(xiàn)了ExecutorService接口。
我們接著看ExecutorService接口的實現(xiàn):
public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現(xiàn):
public interface Executor { void execute(Runnable command); }
到這里,大家應(yīng)該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關(guān)系了。
Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來執(zhí)行傳進去的任務(wù)的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現(xiàn)了ExecutorService接口,基本實現(xiàn)了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個非常重要的方法:
execute() submit() shutdown() shutdownNow()
execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現(xiàn),這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務(wù),交由線程池去執(zhí)行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實現(xiàn),在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實現(xiàn),會發(fā)現(xiàn)它實際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)。
shutdown()和shutdownNow()是用來關(guān)閉線程池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法,有興趣的朋友可以自行查閱API。
二.深入剖析線程池實現(xiàn)原理
在上一節(jié)我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現(xiàn)原理,將從下面幾個方面講解:
1.線程池狀態(tài)
2.任務(wù)的執(zhí)行
3.線程池中的線程初始化
4.任務(wù)緩存隊列及排隊策略
5.任務(wù)拒絕策略
6.線程池的關(guān)閉
7.線程池容量的動態(tài)調(diào)整
1.線程池狀態(tài)
在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態(tài):
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
runState表示當前線程池的狀態(tài),它是一個volatile變量用來保證線程之間的可見性;
下面的幾個static final變量表示runState可能的幾個取值。
當創(chuàng)建線程池后,初始時,線程池處于RUNNING狀態(tài);
如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時線程池不能夠接受新的任務(wù),它會等待所有任務(wù)執(zhí)行完畢;
如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時線程池不能接受新的任務(wù),并且會去嘗試終止正在執(zhí)行的任務(wù);
當線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。
2.任務(wù)的執(zhí)行
在了解將任務(wù)提交給線程池到任務(wù)執(zhí)行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務(wù)緩存隊列,用來存放等待執(zhí)行的任務(wù) private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態(tài)鎖,對線程池狀態(tài)(比如線程池大小 //、runState等)的改變都要使用這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集 private volatile long keepAliveTime; //線程存貨時間 private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設(shè)置存活時間 private volatile int corePoolSize; //核心池的大?。淳€程池中的線程數(shù)目大于這個參數(shù)時,提交的任務(wù)會被放進任務(wù)緩存隊列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數(shù) private volatile int poolSize; //線程池中當前的線程數(shù) private volatile RejectedExecutionHandler handler; //任務(wù)拒絕策略 private volatile ThreadFactory threadFactory; //線程工廠,用來創(chuàng)建線程 private int largestPoolSize; //用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù) private long completedTaskCount; //用來記錄已經(jīng)執(zhí)行完畢的任務(wù)個數(shù)
每個變量的作用都已經(jīng)標明出來了,這里要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。
corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:
假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務(wù)。
因此只要當10個工人中有工人是空閑的,來了任務(wù)就分配給空閑的工人做;
當10個工人都有任務(wù)在做時,如果還來了任務(wù),就把任務(wù)進行排隊等待;
如果說新任務(wù)數(shù)目增長的速度遠遠大于工人做任務(wù)的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
然后就將任務(wù)也分配給這4個臨時工人做;
如果說著14個工人做任務(wù)的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務(wù)或者拋棄前面的一些任務(wù)了。
當這14個工人當中有人空閑時,而新任務(wù)增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務(wù)量突然過大時的一種補救措施。
不過為了方便理解,在本文后面還是將corePoolSize翻譯成核心池大小。
largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經(jīng)有過的最大線程數(shù)目,跟線程池的容量沒有任何關(guān)系。
下面我們進入正題,看一下任務(wù)從提交到最終執(zhí)行完畢經(jīng)歷了哪些過程。
在ThreadPoolExecutor類中,最核心的任務(wù)提交方法是execute()方法,雖然通過submit也可以提交任務(wù),但是實際上submit方法里面最終調(diào)用的還是execute()方法,所以我們只需要研究execute()方法的實現(xiàn)原理即可:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
上面的代碼可能看起來不是那么容易理解,下面我們一句一句解釋:
首先,判斷提交的任務(wù)command是否為null,若是null,則拋出空指針異常;
接著是這句,這句要好好理解一下:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
由于是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數(shù)不小于核心池大小,那么就會直接進入下面的if語句塊了。
如果線程池中當前線程數(shù)小于核心池大小,則接著執(zhí)行后半部分,也就是執(zhí)行
addIfUnderCorePoolSize(command)
如果執(zhí)行完addIfUnderCorePoolSize這個方法返回false,則繼續(xù)執(zhí)行下面的if語句塊,否則整個方法就直接執(zhí)行完畢了。
如果執(zhí)行完addIfUnderCorePoolSize這個方法返回false,然后接著判斷:
if (runState == RUNNING && workQueue.offer(command))
如果當前線程池處于RUNNING狀態(tài),則將任務(wù)放入任務(wù)緩存隊列;如果當前線程池不處于RUNNING狀態(tài)或者任務(wù)放入緩存隊列失敗,則執(zhí)行:
addIfUnderMaximumPoolSize(command)
如果執(zhí)行addIfUnderMaximumPoolSize方法失敗,則執(zhí)行reject()方法進行任務(wù)拒絕處理。
回到前面:
if (runState == RUNNING && workQueue.offer(command))
這句的執(zhí)行,如果說當前線程池處于RUNNING狀態(tài)且將任務(wù)放入任務(wù)緩存隊列成功,則繼續(xù)進行判斷:
if (runState != RUNNING || poolSize == 0)
這句判斷是為了防止在將此任務(wù)添加進任務(wù)緩存隊列的同時其他線程突然調(diào)用shutdown或者shutdownNow方法關(guān)閉了線程池的一種應(yīng)急措施。如果是這樣就執(zhí)行:
ensureQueuedTaskHandled(command)
進行應(yīng)急處理,從名字可以看出是保證 添加到任務(wù)緩存隊列中的任務(wù)得到處理。
我們接著看2個關(guān)鍵方法的實現(xiàn):addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //創(chuàng)建線程去執(zhí)行firstTask任務(wù) } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
這個是addIfUnderCorePoolSize方法的具體實現(xiàn),從名字可以看出它的意圖就是當?shù)陀诤诵某源笮r執(zhí)行的方法。下面看其具體實現(xiàn),首先獲取到鎖,因為這地方涉及到線程池狀態(tài)的變化,先通過if語句判斷當前線程池中的線程數(shù)目是否小于核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經(jīng)判斷過了嗎,只有線程池當前線程數(shù)目小于核心池大小才會執(zhí)行addIfUnderCorePoolSize方法的,為何這地方還要繼續(xù)判斷?原因很簡單,前面的判斷過程中并沒有加鎖,因此可能在execute方法判斷的時候poolSize小于corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務(wù),就可能導(dǎo)致poolSize不小于corePoolSize了,所以需要在這個地方繼續(xù)判斷。然后接著判斷線程池的狀態(tài)是否為RUNNING,原因也很簡單,因為有可能在其他線程中調(diào)用了shutdown或者shutdownNow方法。然后就是執(zhí)行
t = addThread(firstTask);
這個方法也非常關(guān)鍵,傳進去的參數(shù)為提交的任務(wù),返回值為Thread類型。然后接著在下面判斷t是否為空,為空則表明創(chuàng)建線程失?。磒oolSize>=corePoolSize或者runState不等于RUNNING),否則調(diào)用t.start()方法啟動線程。
我們來看一下addThread方法的實現(xiàn):
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //創(chuàng)建一個線程,執(zhí)行任務(wù) if (t != null) { w.thread = t; //將創(chuàng)建的線程的引用賦值為w的成員變量 workers.add(w); int nt = ++poolSize; //當前線程數(shù)加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
在addThread方法中,首先用提交的任務(wù)創(chuàng)建了一個Worker對象,然后調(diào)用線程工廠threadFactory創(chuàng)建了一個新的線程t,然后將線程t的引用賦值給了Worker對象的成員變量thread,接著通過workers.add(w)將Worker對象添加到工作集當中。
下面我們看一下Worker類的實現(xiàn):
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現(xiàn),用戶可以根據(jù) //自己需要重載這個方法和后面的afterExecute方法來進行一些統(tǒng)計信息,比如某個任務(wù)的執(zhí)行時間等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //當任務(wù)隊列中沒有任務(wù)時,進行清理工作 } } }
它實際上實現(xiàn)了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:
Thread t = new Thread(w);
相當于傳進去了一個Runnable任務(wù),在線程t中執(zhí)行這個Runnable。
既然Worker實現(xiàn)了Runnable接口,那么自然最核心的方法便是run()方法了:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
從run方法的實現(xiàn)可以看出,它首先執(zhí)行的是通過構(gòu)造器傳進來的任務(wù)firstTask,在調(diào)用runTask()執(zhí)行完firstTask之后,在while循環(huán)里面不斷通過getTask()去取新的任務(wù)來執(zhí)行,那么去哪里取呢?自然是從任務(wù)緩存隊列里面去取,getTask是ThreadPoolExecutor類中的方法,并不是Worker類中的方法,下面是getTask方法的實現(xiàn):
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數(shù)大于核心池大小或者允許為核心池線程設(shè)置空閑時間, //則通過poll取任務(wù),若等待一定的時間取不到任務(wù),則返回null r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { //如果沒取到任務(wù),即r為null,則判斷當前的worker是否可以退出 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //中斷處于空閑狀態(tài)的worker return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } }
在getTask中,先判斷當前線程池狀態(tài),如果runState大于SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。
如果runState為SHUTDOWN或者RUNNING,則從任務(wù)緩存隊列取任務(wù)。
如果當前線程池的線程數(shù)大于核心池大小corePoolSize或者允許為核心池中的線程設(shè)置空閑存活時間,則調(diào)用poll(time,timeUnit)來取任務(wù),這個方法會等待一定的時間,如果取不到任務(wù)就返回null。
然后判斷取到的任務(wù)r是否為null,為null則通過調(diào)用workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現(xiàn):
private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; //如果runState大于等于STOP,或者任務(wù)緩存隊列為空了 //或者 允許為核心池線程設(shè)置空閑存活時間并且線程池中的線程數(shù)目大于1 try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; }
也就是說如果線程池處于STOP狀態(tài)、或者任務(wù)隊列已為空或者允許為核心池線程設(shè)置空閑存活時間并且線程數(shù)大于1時,允許worker退出。如果允許worker退出,則調(diào)用interruptIdleWorkers()中斷處于空閑狀態(tài)的worker,我們看一下interruptIdleWorkers()的實現(xiàn):
void interruptIdleWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) //實際上調(diào)用的是worker的interruptIfIdle()方法 w.interruptIfIdle(); } finally { mainLock.unlock(); } }
從實現(xiàn)可以看出,它實際上調(diào)用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:
void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { //注意這里,是調(diào)用tryLock()來獲取鎖的,因為如果當前worker正在執(zhí)行任務(wù),鎖已經(jīng)被獲取了,是無法獲取到鎖的 //如果成功獲取了鎖,說明當前worker處于空閑狀態(tài) try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } }
這里有一個非常巧妙的設(shè)計方式,假如我們來設(shè)計線程池,可能會有一個任務(wù)分派線程,當發(fā)現(xiàn)有線程空閑時,就從任務(wù)緩存隊列中取一個任務(wù)交給空閑線程執(zhí)行。但是在這里,并沒有采用這樣的方式,因為這樣會要額外地對任務(wù)分派線程進行管理,無形地會增加難度和復(fù)雜度,這里直接讓執(zhí)行完任務(wù)的線程去任務(wù)緩存隊列里面取任務(wù)來執(zhí)行。
我們再看addIfUnderMaximumPoolSize方法的實現(xiàn),這個方法的實現(xiàn)思想和addIfUnderCorePoolSize方法的實現(xiàn)思想非常相似,唯一的區(qū)別在于addIfUnderMaximumPoolSize方法是在線程池中的線程數(shù)達到了核心池大小并且往任務(wù)隊列中添加任務(wù)失敗的情況下執(zhí)行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
看到?jīng)]有,其實它和addIfUnderCorePoolSize方法的實現(xiàn)基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。
到這里,大部分朋友應(yīng)該對任務(wù)提交給線程池之后到被執(zhí)行的整個過程有了一個基本的了解,下面總結(jié)一下:
1)首先,要清楚corePoolSize和maximumPoolSize的含義;
2)其次,要知道Worker是用來起到什么作用的;
3)要知道任務(wù)提交給線程池之后的處理策略,這里總結(jié)一下主要有4點:
如果當前線程池中的線程數(shù)目小于corePoolSize,則每來一個任務(wù),就會創(chuàng)建一個線程去執(zhí)行這個任務(wù);
如果當前線程池中的線程數(shù)目>=corePoolSize,則每來一個任務(wù),會嘗試將其添加到任務(wù)緩存隊列當中,若添加成功,則該任務(wù)會等待空閑線程將其取出去執(zhí)行;若添加失?。ㄒ话銇碚f是任務(wù)緩存隊列已滿),則會嘗試創(chuàng)建新的線程去執(zhí)行這個任務(wù);
如果當前線程池中的線程數(shù)目達到maximumPoolSize,則會采取任務(wù)拒絕策略進行處理;
如果線程池中的線程數(shù)量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;如果允許為核心池中的線程設(shè)置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
3.線程池中的線程初始化
默認情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會創(chuàng)建線程。
在實際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個方法辦到:
prestartCoreThread():初始化一個核心線程;
prestartAllCoreThreads():初始化所有核心線程
下面是這2個方法的實現(xiàn):
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意傳進去的參數(shù)是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意傳進去的參數(shù)是null ++n; return n; }
注意上面?zhèn)鬟M去的參數(shù)是null,根據(jù)第2小節(jié)的分析可知如果傳進去的參數(shù)為null,則最后執(zhí)行線程會阻塞在getTask方法中的
r = workQueue.take();
即等待任務(wù)隊列中有任務(wù)。
4.任務(wù)緩存隊列及排隊策略
在前面我們多次提到了任務(wù)緩存隊列,即workQueue,它用來存放等待執(zhí)行的任務(wù)。
workQueue的類型為BlockingQueue<Runnable>,通??梢匀∠旅嫒N類型:
1)ArrayBlockingQueue:基于數(shù)組的先進先出隊列,此隊列創(chuàng)建時必須指定大??;
2)LinkedBlockingQueue:基于鏈表的先進先出隊列,如果創(chuàng)建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務(wù),而是將直接新建一個線程來執(zhí)行新來的任務(wù)。
5.任務(wù)拒絕策略
當線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
6.線程池的關(guān)閉
ThreadPoolExecutor提供了兩個方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:
shutdown():不會立即終止線程池,而是要等所有任務(wù)緩存隊列中的任務(wù)都執(zhí)行完后才終止,但再也不會接受新的任務(wù)
shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊列,返回尚未執(zhí)行的任務(wù)
7.線程池容量的動態(tài)調(diào)整
ThreadPoolExecutor提供了動態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:設(shè)置核心池大小
setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
當上述參數(shù)從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)。
三.使用示例
前面我們討論了關(guān)于線程池的實現(xiàn)原理,這一節(jié)我們來看一下它的具體使用:
public class Test { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<15;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("線程池中線程數(shù)目:"+executor.getPoolSize()+",隊列中等待執(zhí)行的任務(wù)數(shù)目:"+ executor.getQueue().size()+",已執(zhí)行玩別的任務(wù)數(shù)目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } } class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在執(zhí)行task "+taskNum); try { Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task "+taskNum+"執(zhí)行完畢"); } }
執(zhí)行結(jié)果:
正在執(zhí)行task 0 線程池中線程數(shù)目:1,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:2,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 1 線程池中線程數(shù)目:3,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 2 線程池中線程數(shù)目:4,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 3 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 4 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:1,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:2,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:3,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:4,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:6,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 10 線程池中線程數(shù)目:7,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 11 線程池中線程數(shù)目:8,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 12 線程池中線程數(shù)目:9,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 13 線程池中線程數(shù)目:10,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 14 task 3執(zhí)行完畢 task 0執(zhí)行完畢 task 2執(zhí)行完畢 task 1執(zhí)行完畢 正在執(zhí)行task 8 正在執(zhí)行task 7 正在執(zhí)行task 6 正在執(zhí)行task 5 task 4執(zhí)行完畢 task 10執(zhí)行完畢 task 11執(zhí)行完畢 task 13執(zhí)行完畢 task 12執(zhí)行完畢 正在執(zhí)行task 9 task 14執(zhí)行完畢 task 8執(zhí)行完畢 task 5執(zhí)行完畢 task 7執(zhí)行完畢 task 6執(zhí)行完畢 task 9執(zhí)行完畢
從執(zhí)行結(jié)果可以看出,當線程池中線程的數(shù)目大于5時,便將任務(wù)放入任務(wù)緩存隊列里面,當任務(wù)緩存隊列滿了之后,便創(chuàng)建新的線程。如果上面程序中,將for循環(huán)中改成執(zhí)行20個任務(wù),就會拋出任務(wù)拒絕異常了。
不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態(tài)方法來創(chuàng)建線程池:
Executors.newCachedThreadPool(); //創(chuàng)建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //創(chuàng)建容量為1的緩沖池 Executors.newFixedThreadPool(int); //創(chuàng)建固定容量大小的緩沖池
下面是這三個靜態(tài)方法的具體實現(xiàn);
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
從它們的具體實現(xiàn)來看,它們實際上也是調(diào)用了ThreadPoolExecutor,只不過參數(shù)都已配置好了。
newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設(shè)置為0,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務(wù)就創(chuàng)建線程運行,當線程空閑超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜態(tài)方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數(shù)有點麻煩,要根據(jù)實際任務(wù)的類型和數(shù)量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
四.如何合理配置線程池的大小
本節(jié)來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。
一般需要根據(jù)任務(wù)的類型來配置線程池大?。?/p>
如果是CPU密集型任務(wù),就需要盡量壓榨CPU,參考值可以設(shè)為 NCPU+1
如果是IO密集型任務(wù),參考值可以設(shè)置為2*NCPU
當然,這只是一個參考值,具體的設(shè)置還需要根據(jù)實際情況進行調(diào)整,比如可以先將線程池大小設(shè)置為參考值,再觀察任務(wù)運行情況和系統(tǒng)負載、資源利用率來進行適當調(diào)整。
總結(jié)
以上就是本文關(guān)于深入理解Java編程線程池的實現(xiàn)原理的全部內(nèi)容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復(fù)大家的。感謝朋友們對本站的支持!
相關(guān)文章
微信js sdk invalid signature簽名錯誤問題的解決方法分析
這篇文章主要介紹了微信js sdk invalid signature簽名錯誤問題的解決方法,結(jié)合實例形式分析了微信簽名錯誤問題相關(guān)解決方法,需要的朋友可以參考下2019-04-04JDK21中虛擬線程到底是什么以及用法總結(jié)(看完便知)
這篇文章主要給大家介紹了關(guān)于JDK21中虛擬線程到底是什么以及用法的相關(guān)資料,虛擬線程是一種輕量化的線程封裝,由jvm直接調(diào)度和管理,反之普通的線程其實是調(diào)用的操作系統(tǒng)的能力,對應(yīng)的是操作系統(tǒng)級的線程,需要的朋友可以參考下2023-12-12SpringBoot+WebSocket+Netty實現(xiàn)消息推送的示例代碼
這篇文章主要介紹了SpringBoot+WebSocket+Netty實現(xiàn)消息推送的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友們下面隨著小編來一起學(xué)習學(xué)習吧2020-04-04