Java線程池ThreadPoolExecutor的使用及其原理詳細解讀
什么是線程池
線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創(chuàng)建線程后自動啟動這些任務。線程池線程都是后臺線程。每個線程都使用默認的堆棧大小,以默認的優(yōu)先級運行,并處于多線程單元中。如果某個線程在托管代碼中空閑(如正在等待某個事件),則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創(chuàng)建另一個輔助線程但線程的數(shù)目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動。
線程池的作用
- 提高程序運行效率:創(chuàng)建一個新線程需要消耗一定的時間,而線程中的線程在使用時已經(jīng)創(chuàng)建完成,可節(jié)省此部分開銷。
- 解耦作用;線程的創(chuàng)建與執(zhí)行完全分開,方便維護。
- 資源復用:每次線程執(zhí)行完畢后會重新放入線程池中供其它調(diào)用者使用,可以減少線程銷毀的開銷。
- 提高cpu利用率:線程數(shù)過多會導致系統(tǒng)內(nèi)核額外的線程調(diào)度開銷,線程池可以控制創(chuàng)建線程數(shù)量以實現(xiàn)性能最大化。線程數(shù)量應該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡sockets等的數(shù)量,默認情況下線程數(shù)一般取cpu數(shù)量+2比較合適。
- 系統(tǒng)健壯性:接受突發(fā)性的大量請求,不至于使服務器因此產(chǎn)生大量線程去處理請求而導致崩潰。
關(guān)于Java線程池(ThreadPool)
jdk提供的線程池有哪些
在 JDK 1.5 之后推出了相關(guān)的 api,其提供的線程池有以下四種:
- Executors.newCachedThreadPool():創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務)的線程,當任務數(shù)增加時,此線程池又可以智能的添加新線程來處理任務。
- Executors.newFixedThreadPool(nThreads):創(chuàng)建固定大小的線程池。每次提交一個任務就創(chuàng)建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。
- Executors.newSingleThreadExecutor():創(chuàng)建單個線程的線程池。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它。此線程池保證所有任務的執(zhí)行順序按照任務的提交順序執(zhí)行。
- Executors.newScheduledThreadPool(corePoolSize):創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務的需求。
查看Executor源碼會發(fā)現(xiàn),Executor只是一個創(chuàng)建線程池的工具類,這四種方式創(chuàng)建的源碼就會發(fā)現(xiàn),都是利用 ThreadPoolExecutor 類實現(xiàn)的,真正的線程池接口是ExecutorService,其實現(xiàn)類為ThreadPoolExecutor。
Executors 部分源碼:
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
ThreadPoolExecutor 最終的構(gòu)造器源碼:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } }
- corePoolSize:線程池的大小。線程池創(chuàng)建之后不會立即去創(chuàng)建線程,而是等待線程的到來。當前執(zhí)行的線程數(shù)大于該值時,線程會加入到緩沖隊列。
- maximumPoolSize:線程池中創(chuàng)建的最大線程數(shù)。
- keepAliveTime:空閑的線程多久時間后被銷毀。默認情況下,該值在線程數(shù)大于corePoolSize時,對超出- corePoolSize值的這些線程起作用。
- unit:TimeUnit枚舉類型的值,代表keepAliveTime時間單位。
- workQueue: 用于在執(zhí)行任務之前保存任務的隊列。此隊列將僅包含execute方法提交的可運行任務。
- threadFactory:執(zhí)行程序創(chuàng)建新線程時要使用的工廠
- handler:線程拒絕策略。 當隊列和最大線程池都滿了之后的飽和策略。
其它構(gòu)造器如下:
// 構(gòu)造器一 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } // 構(gòu)造器二 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } // 構(gòu)造器三 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
關(guān)于創(chuàng)建線程池
有一點是肯定的,線程池肯定是不是越大越好。 通常我們是需要根據(jù)這批任務執(zhí)行的性質(zhì)來確定的。
- IO 密集型任務:由于線程并不是一直在運行,所以可以盡可能的多配置線程,比如 CPU 個數(shù) * 2
- CPU 密集型任務(大量復雜的運算)應當分配較少的線程,比如 CPU 個數(shù)相當?shù)拇笮 ?/li>
當然這些都是經(jīng)驗值,最好的方式還是根據(jù)實際情況測試得出最佳配置。
通常我們使用線程池可以通過如下方式去使用:
使用jdk提供的線程池
//創(chuàng)建一個可緩存的線程池 ExecutorService cachedThreadPool= Executors.newCachedThreadPool(); //通過execute方法執(zhí)行接口 cachedThreadPool.execute(()->{ //Runnable to do something. });
創(chuàng)建自定義線程池
ExecutorService threadPool = new ThreadPoolExecutor(// 自定義一個線程池 1, // 核心線程數(shù) 2, // 最大線程數(shù) 60, // 超過核心線程數(shù)的額外線程存活時間 TimeUnit.SECONDS, // 線程存活時間的時間單位 new ArrayBlockingQueue<>(3) // 有界隊列,容量是3個 , Executors.defaultThreadFactory() // 線程工廠 , new ThreadPoolExecutor.AbortPolicy() //線程的拒絕策略 ); //執(zhí)行一個任務 threadPool.execute(()->{ //線程執(zhí)行的具體邏輯 //Runnable to do something. });
線程池的任務隊列
隊列有三種通用策略:
- 直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構(gòu)造一個新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。
- 無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有corePoolSize 線程都忙時新任務在隊列中等待。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立于其他任務,即任務執(zhí)行互不影響時,適合于使用無界隊列;例如,在 Web頁服務器中。這種排隊可用于處理瞬態(tài)突發(fā)請求,當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。
- 有界隊列。當使用有限的 maximumPoolSizes時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統(tǒng)可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會降低吞吐量。
線程池的任務拒絕策略
jdk提供了四種拒絕策略:
- AbortPolicy: 丟棄任務,并拋出拒絕執(zhí)行 RejectedExecutionException 異常信息。必須處理好拋出的異常,否則會打斷當前的執(zhí)行流程,影響后續(xù)的任務執(zhí)行。同時它也是線程池默認的拒絕策略。
- CallerRunsPolicy: 當觸發(fā)拒絕策略,只要線程池沒有關(guān)閉的話,則使用調(diào)用線程直接運行任務。一般并發(fā)比較小,性能要求不高,不允許失敗。但是,由于調(diào)用者自己運行任務,如果任務提交速度過快,可能導致程序阻塞,性能效率上必然的損失較大
- DiscardPolicy: 直接丟棄,其他啥都沒有
- DiscardOldestPolicy: 當觸發(fā)拒絕策略,只要線程池沒有關(guān)閉的話,丟棄阻塞隊列 workQueue 中最老的一個任務,并將新任務加入
線程池如何執(zhí)行
所有任務的調(diào)度都是由execute方法完成的,這部分完成的工作是:檢查現(xiàn)在線程池的運行狀態(tài)、運行線程數(shù)、運行策略,決定接下來執(zhí)行的流程,是直接申請線程執(zhí)行,或是緩沖到隊列中執(zhí)行,亦或是直接拒絕該任務。
主要分三步進行:
- 如果運行的線程少于corePoolSize,請嘗試以給定的命令作為第一個線程開始一個新線程任務。對addWorke的調(diào)用以原子方式檢查運行狀態(tài)和workerCount,從而防止會增加當它不應該的時候,返回false。
- 如果任務可以成功排隊,那么我們?nèi)匀恍枰屑殭z查是否應該添加線程(因為自從上次檢查以來已有的已經(jīng)死了)或者自進入此方法后,池已關(guān)閉。所以我們重新檢查狀態(tài),必要時回滾排隊已停止,如果沒有,則啟動新線程。
- 如果無法將任務排隊,則嘗試添加新的線程。如果失敗了,我們就知道我們已經(jīng)關(guān)閉或者飽和了,所以拒絕這個任務。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
線程池的任務添加流程
在excute方法中調(diào)用了addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { // 外層循環(huán),負責判斷線程池狀態(tài),處理線程池狀態(tài)變量加1操作 retry: for (;;) { // 狀態(tài)總體相關(guān)值:運行狀態(tài) + 執(zhí)行線程任務數(shù)量 int c = ctl.get(); // 讀取狀態(tài)值 - 運行狀態(tài) int rs = runStateOf(c); // Check if queue empty only if necessary. // 滿足下面兩大條件的,說明線程池不能接受任務了,直接返回false處理 // 主要目的就是想說,只有線程池的狀態(tài)為 RUNNING 狀態(tài)時,線程池才會接收 // 新的任務,增加新的Worker工作線程 // 線程池的狀態(tài)已經(jīng)至少已經(jīng)處于不能接收任務的狀態(tài)了 if (rs >= SHUTDOWN && //目的是檢查線 程池是否處于關(guān)閉狀態(tài) ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 內(nèi)層循環(huán),負責worker數(shù)量加1操作 for (;;) { // 獲取當前worker線程數(shù)量 int wc = workerCountOf(c); if (wc >= CAPACITY || // 如果線程池數(shù)量達到最大上限值CAPACITY // core為true時判斷是否大于corePoolSize核心線程數(shù)量 // core為false時判斷是否大于maximumPoolSize最大設(shè)置的線程數(shù)量 wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 調(diào)用CAS原子操作,目的是worker線程數(shù)量加1 if (compareAndIncrementWorkerCount(c)) // break retry; c = ctl.get(); // Re-read ctl // CAS原子操作失敗的話,則再次讀取ctl值 if (runStateOf(c) != rs) // 如果剛剛讀取的c狀態(tài)不等于先前讀取的rs狀態(tài),則繼續(xù)外層循環(huán)判斷 continue retry; // else CAS failed due to workerCount change; retry inner loop // 之所以會CAS操作失敗,主要是由于多線程并發(fā)操作,導致workerCount // 工作線程數(shù)量改變而導致的,因此繼續(xù)內(nèi)層循環(huán)嘗試操作 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 創(chuàng)建一個Worker工作線程對象,將任務firstTask, // 新創(chuàng)建的線程thread都封裝到了Worker對象里面 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 由于對工作線程集合workers的添加或者刪除, // 涉及到線程安全問題,所以才加上鎖且該鎖為非公平鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 獲取鎖成功后,執(zhí)行臨界區(qū)代碼,首先檢查獲取當前線程池的狀態(tài)rs int rs = runStateOf(ctl.get()); // 當線程池處于可接收任務狀態(tài) // 或者是不可接收任務狀態(tài),但是有可能該任務等待隊列中的任務 // 滿足這兩種條件時,都可以添加新的工作線程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 添加新的工作線程到工作線程集合workers,workers是set集合 int s = workers.size(); if (s > largestPoolSize) // 變量記錄了線程池在整個生命周期中曾經(jīng)出現(xiàn)的最大線程個數(shù) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 往workers工作線程集合中添加成功后,則立馬調(diào)用線程start方法啟動起來 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 如果啟動線程失敗的話,還得將剛剛添加成功的線程共集合中移除并且做線 // 程數(shù)量做減1操作 addWorkerFailed(w); } return workerStarted; }
線程池中任務的執(zhí)行流程
runWorker 通過調(diào)用t.start()啟動了線程,線程池真正核心執(zhí)行任務的地方就在此
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // allow interrupts 允許中斷 w.unlock(); boolean completedAbruptly = true; try { // 不斷從等待隊列blockingQueue中獲取任務 // 之前addWorker(null, false)這樣的線程執(zhí)行時, // 會通過getTask中再次獲取任務并執(zhí)行 while (task != null || (task = getTask()) != null) { w.lock(); // 上鎖,并不是防止并發(fā)執(zhí)行任務, // 而是為了防止shutdown()被調(diào)用時不終止正在運行的worker線程 // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // task.run()執(zhí)行前,由子類實現(xiàn) beforeExecute(wt, task); Throwable thrown = null; try { task.run(); // 執(zhí)行線程Runable的run方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // task.run()執(zhí)行后,由子類實現(xiàn) afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
如何關(guān)閉線程池
有運行任務自然也有關(guān)閉任務。 通過查看ExecutorService接口,其實無非就是兩個方法 shutdown()/shutdownNow()。
但他們有著重要的區(qū)別:
- shutdown() 執(zhí)行后停止接受新任務,會把隊列的任務執(zhí)行完畢。
- shutdownNow() 也是停止接受新任務,但會中斷所有的任務,將線程池狀態(tài)變?yōu)?stop。
threadPool.shutdown(); threadPool.shutdownNow();
到此這篇關(guān)于Java線程池ThreadPoolExecutor的使用及其原理詳細解讀的文章就介紹到這了,更多相關(guān)Java線程池ThreadPoolExecutor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談springboot項目中定時任務如何優(yōu)雅退出
這篇文章主要介紹了淺談springboot項目中定時任務如何優(yōu)雅退出?具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09詳解使用Mybatis-plus + velocity模板生成自定義的代碼
這篇文章主要介紹了詳解使用Mybatis-plus + velocity模板生成自定義的代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-03-03Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容
這篇文章主要介紹了Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容,文中有非常詳細的代碼示例,對正在學習java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04