一篇文章徹底搞懂jdk8線程池
這可能是最簡(jiǎn)短的線程池分析文章了。
頂層設(shè)計(jì),定義執(zhí)行接口
Interface Executor(){ void execute(Runnable command); }
ExecutorService,定義控制接口
interface ExecutorService extends Executor{ }
抽象實(shí)現(xiàn)ExecutorService中的大部分方法
abstract class AbstractExecutorService implements ExecutorService{ //此處把ExecutorService中的提交方法都實(shí)現(xiàn)了 }
我們看下提交中的核心
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // ① //核心線程數(shù)沒(méi)有滿就繼續(xù)添加核心線程 if (addWorker(command, true)) // ② return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // ③ int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))// ④ reject(command); //⑦ else if (workerCountOf(recheck) == 0) // ⑤ //如果worker為0,則添加一個(gè)非核心worker,所以線程池里至少有一個(gè)線程 addWorker(null, false);// ⑥ } //隊(duì)列滿了以后,添加非核心線程 else if (!addWorker(command, false))// ⑧ reject(command);//⑦ }
這里就會(huì)有幾道常見的面試題
1,什么時(shí)候用核心線程,什么時(shí)候啟用非核心線程?
添加任務(wù)時(shí)優(yōu)先使用核心線程,核心線程滿了以后,任務(wù)放入隊(duì)列中。只要隊(duì)列不填滿,就一直使用核心線程執(zhí)行任務(wù)(代碼①②)。
當(dāng)隊(duì)列滿了以后開始使用增加非核心線程來(lái)執(zhí)行隊(duì)列中的任務(wù)(代碼⑧)。
2,0個(gè)核心線程,2個(gè)非核心線程,隊(duì)列100,添加99個(gè)任務(wù)是否會(huì)執(zhí)行?
會(huì)執(zhí)行,添加隊(duì)列成功后,如果worker的數(shù)量為0,會(huì)添加非核心線程執(zhí)行任務(wù)(見代碼⑤⑥)
3,隊(duì)列滿了會(huì)怎么樣?
隊(duì)列滿了,會(huì)優(yōu)先啟用非核心線程執(zhí)行任務(wù),如果非核心線程也滿了,那就執(zhí)行拒絕策略。
4,submit 和execute的區(qū)別是?
submit將執(zhí)行任務(wù)包裝成了RunnableFuture,最終返回了Future,executor 方法執(zhí)行無(wú)返回值。
addworker實(shí)現(xiàn)
ThreadPoolExecutor extends AbstractExecutorService{ //保存所有的執(zhí)行線程(worker) HashSet<Worker> workers = new HashSet<Worker>(); //存放待執(zhí)行的任務(wù),這塊具體由指定的隊(duì)列實(shí)現(xiàn) BlockingQueue<Runnable> workQueue; public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){ } //添加執(zhí)行worker private boolean addWorker(Runnable firstTask, boolean core) { //這里每次都會(huì)基礎(chǔ)校驗(yàn)和cas校驗(yàn),防止并發(fā)無(wú)法創(chuàng)建線程, retry: for(;;){ for(;;){ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } try{ //創(chuàng)建一個(gè)worker w = new Worker(firstTask); final Thread t = w.thread; try{ //加鎖校驗(yàn),添加到workers集合中 workers.add(w); } //添加成功,將對(duì)應(yīng)的線程啟動(dòng),執(zhí)行任務(wù) t.start(); }finally{ //失敗執(zhí)行進(jìn)行釋放資源 addWorkerFailed(Worker w) } } //Worker 是對(duì)任務(wù)和線程的封裝 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ //線程啟動(dòng)后會(huì)循環(huán)執(zhí)行任務(wù) public void run() { runWorker(this); } } //循環(huán)執(zhí)行 final void runWorker(Worker w) { try{ while (task != null || (task = getTask()) != null) { //執(zhí)行前的可擴(kuò)展點(diǎn) beforeExecute(wt, task); try{ //執(zhí)行任務(wù) task.run(); }finally{ //執(zhí)行后的可擴(kuò)展點(diǎn),這塊也把異常給吃了 afterExecute(task, thrown); } } //這里會(huì)對(duì)執(zhí)行的任務(wù)進(jìn)行統(tǒng)計(jì) }finally{ //異?;蛘呤茄h(huán)退出都會(huì)走這里 processWorkerExit(w, completedAbruptly); } } //獲取執(zhí)行任務(wù),此處決定runWorker的狀態(tài) private Runnable getTask() { //worker的淘汰策略:允許超時(shí)或者工作線程>核心線程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //滿足淘汰策略且...,就返回null,交由processWorkerExit去處理線程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } // 滿足淘汰策略,就等一定的時(shí)間poll(),不滿足,就一直等待take() Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); } //處理任務(wù)退出(循環(huán)獲取不到任務(wù)的時(shí)候) private void processWorkerExit(Worker w, boolean completedAbruptly) { //異常退出的,不能調(diào)整線程數(shù)的 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); //不管成功或失敗,都執(zhí)行以下邏輯 //1,計(jì)數(shù),2,減去一個(gè)線程 completedTaskCount += w.completedTasks; //將線程移除,并不關(guān)心是否非核心 workers.remove(w); //如果是還是運(yùn)行狀態(tài) if (!completedAbruptly) { //正常終止的,處理邏輯 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //核心線程為0 ,最小值也是1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //總線程數(shù)大于min就不再添加 if (workerCountOf(c) >= min) return; // replacement not needed } //異常退出一定還會(huì)添加worker,正常退出一般不會(huì)再添加線程,除非核心線程數(shù)為0 addWorker(null, false); } }
這里涉及到幾個(gè)點(diǎn):
1,任務(wù)異常以后雖然有throw異常,但是外面有好幾個(gè)finally代碼;
2,在finally中,進(jìn)行了任務(wù)的統(tǒng)計(jì)以及worker移除;
3,如果還有等待處理的任務(wù),最少添加一個(gè)worker(不管核心線程數(shù)是否為0)
這里會(huì)引申出來(lái)幾個(gè)面試題:
1, 線程池中核心線程數(shù)如何設(shè)置?
cpu密集型:一般為核心線程數(shù)+1,盡可能減少cpu的并行;
IO密集型:可以設(shè)置核心線程數(shù)稍微多些,將IO等待期間的空閑cpu充分利用起來(lái)。
2,線程池使用隊(duì)列的意義?
a)線程的資源是有限的,且線程的創(chuàng)建成本比較高;
b) 要保證cpu資源的合理利用(不能直接給cpu提一堆任務(wù),cpu處理不過(guò)來(lái),大家都慢了)
c) 利用了削峰填谷的思想(保證任務(wù)執(zhí)行的可用性);
d) 隊(duì)列過(guò)大也會(huì)把內(nèi)存撐爆。
3,為什么要用阻塞隊(duì)列?而不是非阻塞隊(duì)列?
a) 利用阻塞的特性,在沒(méi)有任務(wù)時(shí)阻塞一定的時(shí)間,防止資源被釋放(getTask和processWorkExit);
b) 阻塞隊(duì)列在阻塞時(shí),CPU狀態(tài)是wait,等有任務(wù)時(shí),會(huì)被喚醒,不會(huì)占用太多的資源;
線程池有兩個(gè)地方:
1,在execute方法中(提交任務(wù)時(shí)),只要工作線程為0,就至少添加一個(gè)Worker;
2,在processWorkerExit中(正?;虍惓=Y(jié)束時(shí)),只要有待處理的任務(wù),就會(huì)增加Worker
所以正常情況下線程池一定會(huì)保證所有任務(wù)的執(zhí)行。
我們?cè)诳聪耇hreadPoolExecutor中以下幾個(gè)方法
public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
確保了核心線程數(shù)必須是滿的,這些方法特別是在批處理的時(shí)候,或者動(dòng)態(tài)調(diào)整核心線程數(shù)的大小時(shí)很有用。
我們?cè)倏聪翬xecutors中常見的創(chuàng)建線程池的方法:
一、newFixedThreadPool 與newSingleThreadExecutor
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
特點(diǎn):
1,核心線程數(shù)和最大線程數(shù)大小一樣(唯一不同的是,一個(gè)是1,一個(gè)是自定義);
2,隊(duì)列用的是LinkedBlockingQueue(長(zhǎng)度是Integer.Max_VALUE)
當(dāng)任務(wù)的生產(chǎn)速度大于消費(fèi)速度后,很容易將系統(tǒng)內(nèi)存撐爆。
二、 newCachedThreadPool 和
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
特點(diǎn):最大線程數(shù)為Integer.MAX_VALUE
當(dāng)任務(wù)提交過(guò)多時(shí),線程創(chuàng)建過(guò)多容易導(dǎo)致無(wú)法創(chuàng)建
三、 newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
這個(gè)主要是并行度,默認(rèn)為cpu的核數(shù)。
四、newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
封裝起來(lái)的要么最大線程數(shù)不可控,要么隊(duì)列長(zhǎng)度不可控,所以阿里規(guī)約里也不建議使用Executors方法創(chuàng)建線程池。
ps:
生產(chǎn)上使用線程池,最好是將關(guān)鍵任務(wù)和非關(guān)鍵任務(wù)分開設(shè)立線程池,非關(guān)鍵業(yè)務(wù)影響關(guān)鍵業(yè)務(wù)的執(zhí)行。
總結(jié)
到此這篇關(guān)于jdk8線程池的文章就介紹到這了,更多相關(guān)jdk8線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring boot整合Shiro實(shí)現(xiàn)單點(diǎn)登錄的示例代碼
本篇文章主要介紹了spring boot整合Shiro實(shí)現(xiàn)單點(diǎn)登錄的示例代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-01-01利用spring?boot+WebSocket實(shí)現(xiàn)后臺(tái)主動(dòng)消息推送功能
目前對(duì)于服務(wù)端向客戶端推送數(shù)據(jù),常用技術(shù)方案有輪詢、websocket等,下面這篇文章主要給大家介紹了關(guān)于利用spring?boot+WebSocket實(shí)現(xiàn)后臺(tái)主動(dòng)消息推送功能的相關(guān)資料,需要的朋友可以參考下2022-04-04java編程實(shí)現(xiàn)優(yōu)先隊(duì)列的二叉堆代碼分享
這篇文章主要介紹了java編程實(shí)現(xiàn)優(yōu)先隊(duì)列的二叉堆代碼分享,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11SpringBoot中shiro過(guò)濾器的重寫與配置詳解
在前后端分離跨域訪問(wèn)的項(xiàng)目中shiro進(jìn)行權(quán)限攔截失效 (即使有正確權(quán)限的訪問(wèn)也會(huì)被攔截) 時(shí)造成302重定向錯(cuò)誤等問(wèn)題,為解決這個(gè)問(wèn)題,就需要進(jìn)行shiro過(guò)濾器的重寫以及配置。本文詳細(xì)介紹了解決方法,需要的可以參考一下2022-04-04mybatis plus自動(dòng)生成器解析(及遇到的坑)
這篇文章主要介紹了mybatis-plus自動(dòng)生成器及遇到的坑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03java連接sql server 2008數(shù)據(jù)庫(kù)代碼
Java的學(xué)習(xí),很重要的一點(diǎn)是對(duì)數(shù)據(jù)庫(kù)進(jìn)行操作。2013-03-03詳解微信開發(fā)之Author網(wǎng)頁(yè)授權(quán)
微信開發(fā)中,經(jīng)常有這樣的需求:獲得用戶頭像、綁定微信號(hào)給用戶發(fā)信息,那么實(shí)現(xiàn)這些的前提就是授權(quán)!本文對(duì)此進(jìn)行系統(tǒng)介紹,需要的朋友一起來(lái)看下吧2016-12-12