一篇文章徹底搞懂jdk8線程池
這可能是最簡短的線程池分析文章了。
頂層設計,定義執(zhí)行接口
Interface Executor(){
void execute(Runnable command);
}
ExecutorService,定義控制接口
interface ExecutorService extends Executor{
}

抽象實現(xiàn)ExecutorService中的大部分方法
abstract class AbstractExecutorService implements ExecutorService{ //此處把ExecutorService中的提交方法都實現(xiàn)了
}

我們看下提交中的核心
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // ①
//核心線程數(shù)沒有滿就繼續(xù)添加核心線程
if (addWorker(command, true)) // ②
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // ③
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))// ④
reject(command); //⑦
else if (workerCountOf(recheck) == 0) // ⑤
//如果worker為0,則添加一個非核心worker,所以線程池里至少有一個線程
addWorker(null, false);// ⑥
}
//隊列滿了以后,添加非核心線程
else if (!addWorker(command, false))// ⑧
reject(command);//⑦
}

這里就會有幾道常見的面試題
1,什么時候用核心線程,什么時候啟用非核心線程?
添加任務時優(yōu)先使用核心線程,核心線程滿了以后,任務放入隊列中。只要隊列不填滿,就一直使用核心線程執(zhí)行任務(代碼①②)。
當隊列滿了以后開始使用增加非核心線程來執(zhí)行隊列中的任務(代碼⑧)。
2,0個核心線程,2個非核心線程,隊列100,添加99個任務是否會執(zhí)行?
會執(zhí)行,添加隊列成功后,如果worker的數(shù)量為0,會添加非核心線程執(zhí)行任務(見代碼⑤⑥)
3,隊列滿了會怎么樣?
隊列滿了,會優(yōu)先啟用非核心線程執(zhí)行任務,如果非核心線程也滿了,那就執(zhí)行拒絕策略。
4,submit 和execute的區(qū)別是?
submit將執(zhí)行任務包裝成了RunnableFuture,最終返回了Future,executor 方法執(zhí)行無返回值。
addworker實現(xiàn)
ThreadPoolExecutor extends AbstractExecutorService{
//保存所有的執(zhí)行線程(worker)
HashSet<Worker> workers = new HashSet<Worker>();
//存放待執(zhí)行的任務,這塊具體由指定的隊列實現(xiàn)
BlockingQueue<Runnable> workQueue;
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
}
//添加執(zhí)行worker
private boolean addWorker(Runnable firstTask, boolean core) {
//這里每次都會基礎校驗和cas校驗,防止并發(fā)無法創(chuàng)建線程,
retry:
for(;;){
for(;;){
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
try{
//創(chuàng)建一個worker
w = new Worker(firstTask);
final Thread t = w.thread;
try{
//加鎖校驗,添加到workers集合中
workers.add(w);
}
//添加成功,將對應的線程啟動,執(zhí)行任務
t.start();
}finally{
//失敗執(zhí)行進行釋放資源
addWorkerFailed(Worker w)
}
}
//Worker 是對任務和線程的封裝
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
//線程啟動后會循環(huán)執(zhí)行任務
public void run() {
runWorker(this);
}
}
//循環(huán)執(zhí)行
final void runWorker(Worker w) {
try{
while (task != null || (task = getTask()) != null) {
//執(zhí)行前的可擴展點
beforeExecute(wt, task);
try{
//執(zhí)行任務
task.run();
}finally{
//執(zhí)行后的可擴展點,這塊也把異常給吃了
afterExecute(task, thrown);
}
}
//這里會對執(zhí)行的任務進行統(tǒng)計
}finally{
//異常或者是循環(huán)退出都會走這里
processWorkerExit(w, completedAbruptly);
}
}
//獲取執(zhí)行任務,此處決定runWorker的狀態(tài)
private Runnable getTask() {
//worker的淘汰策略:允許超時或者工作線程>核心線程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//滿足淘汰策略且...,就返回null,交由processWorkerExit去處理線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 滿足淘汰策略,就等一定的時間poll(),不滿足,就一直等待take()
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
}
//處理任務退出(循環(huán)獲取不到任務的時候)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//異常退出的,不能調整線程數(shù)的
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//不管成功或失敗,都執(zhí)行以下邏輯
//1,計數(shù),2,減去一個線程
completedTaskCount += w.completedTasks;
//將線程移除,并不關心是否非核心
workers.remove(w);
//如果是還是運行狀態(tài)
if (!completedAbruptly) {
//正常終止的,處理邏輯
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//核心線程為0 ,最小值也是1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//總線程數(shù)大于min就不再添加
if (workerCountOf(c) >= min)
return; // replacement not needed
} //異常退出一定還會添加worker,正常退出一般不會再添加線程,除非核心線程數(shù)為0
addWorker(null, false);
}
}

這里涉及到幾個點:
1,任務異常以后雖然有throw異常,但是外面有好幾個finally代碼;
2,在finally中,進行了任務的統(tǒng)計以及worker移除;
3,如果還有等待處理的任務,最少添加一個worker(不管核心線程數(shù)是否為0)
這里會引申出來幾個面試題:
1, 線程池中核心線程數(shù)如何設置?
cpu密集型:一般為核心線程數(shù)+1,盡可能減少cpu的并行;
IO密集型:可以設置核心線程數(shù)稍微多些,將IO等待期間的空閑cpu充分利用起來。
2,線程池使用隊列的意義?
a)線程的資源是有限的,且線程的創(chuàng)建成本比較高;
b) 要保證cpu資源的合理利用(不能直接給cpu提一堆任務,cpu處理不過來,大家都慢了)
c) 利用了削峰填谷的思想(保證任務執(zhí)行的可用性);
d) 隊列過大也會把內存撐爆。
3,為什么要用阻塞隊列?而不是非阻塞隊列?
a) 利用阻塞的特性,在沒有任務時阻塞一定的時間,防止資源被釋放(getTask和processWorkExit);
b) 阻塞隊列在阻塞時,CPU狀態(tài)是wait,等有任務時,會被喚醒,不會占用太多的資源;
線程池有兩個地方:
1,在execute方法中(提交任務時),只要工作線程為0,就至少添加一個Worker;
2,在processWorkerExit中(正?;虍惓=Y束時),只要有待處理的任務,就會增加Worker
所以正常情況下線程池一定會保證所有任務的執(zhí)行。
我們在看下ThreadPoolExecutor中以下幾個方法
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
確保了核心線程數(shù)必須是滿的,這些方法特別是在批處理的時候,或者動態(tài)調整核心線程數(shù)的大小時很有用。
我們再看下Executors中常見的創(chuàng)建線程池的方法:
一、newFixedThreadPool 與newSingleThreadExecutor
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
特點:
1,核心線程數(shù)和最大線程數(shù)大小一樣(唯一不同的是,一個是1,一個是自定義);
2,隊列用的是LinkedBlockingQueue(長度是Integer.Max_VALUE)
當任務的生產速度大于消費速度后,很容易將系統(tǒng)內存撐爆。
二、 newCachedThreadPool 和
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特點:最大線程數(shù)為Integer.MAX_VALUE
當任務提交過多時,線程創(chuàng)建過多容易導致無法創(chuàng)建
三、 newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
這個主要是并行度,默認為cpu的核數(shù)。
四、newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
封裝起來的要么最大線程數(shù)不可控,要么隊列長度不可控,所以阿里規(guī)約里也不建議使用Executors方法創(chuàng)建線程池。
ps:
生產上使用線程池,最好是將關鍵任務和非關鍵任務分開設立線程池,非關鍵業(yè)務影響關鍵業(yè)務的執(zhí)行。
總結
到此這篇關于jdk8線程池的文章就介紹到這了,更多相關jdk8線程池內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
spring boot整合Shiro實現(xiàn)單點登錄的示例代碼
本篇文章主要介紹了spring boot整合Shiro實現(xiàn)單點登錄的示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-01-01
利用spring?boot+WebSocket實現(xiàn)后臺主動消息推送功能
目前對于服務端向客戶端推送數(shù)據(jù),常用技術方案有輪詢、websocket等,下面這篇文章主要給大家介紹了關于利用spring?boot+WebSocket實現(xiàn)后臺主動消息推送功能的相關資料,需要的朋友可以參考下2022-04-04
java編程實現(xiàn)優(yōu)先隊列的二叉堆代碼分享
這篇文章主要介紹了java編程實現(xiàn)優(yōu)先隊列的二叉堆代碼分享,具有一定參考價值,需要的朋友可以了解下。2017-11-11
java連接sql server 2008數(shù)據(jù)庫代碼
Java的學習,很重要的一點是對數(shù)據(jù)庫進行操作。2013-03-03

