使用ThreadPoolExecutor之高效處理并發(fā)任務
ThreadPoolExecutor是Java concurrent中用于管理線程池的類,它是Executor框架的一個實現。線程池是一種提高應用程序性能和可靠性的技術,它將多個任務分配給多個線程執(zhí)行,從而實現并發(fā)處理。
ThreadPoolExecutor提供了一種靈活的方式來管理線程池,可以控制線程池的大小、阻塞隊列的大小、線程池的狀態(tài)、線程的創(chuàng)建和銷毀等。
使用ThreadPoolExecutor可以幫助我們避免線程創(chuàng)建和銷毀的開銷,提高應用程序的性能和可伸縮性。
具體來說,它有以下這些優(yōu)點:
- 復用線程:線程池可以重用已經創(chuàng)建的線程,避免了線程創(chuàng)建和銷毀的開銷。
- 控制線程數量:線程池可以控制線程的數量,避免了線程數量過多或過少的問題。
- 提高響應速度:線程池可以提高應用程序的響應速度,因為線程可以立即執(zhí)行任務,而不需要等待線程創(chuàng)建和啟動。
- 提高可伸縮性:線程池可以提高應用程序的可伸縮性,因為它可以自動調整線程的數量,以適應不同的工作負載。
- 提高可靠性:線程池可以提高應用程序的可靠性,因為它可以避免線程崩潰或死鎖的問題,從而保證應用程序的穩(wěn)定性。
總之,ThreadPoolExecutor在多線程開發(fā)中是繞不開的一個類,用的好它可以顯著提升代碼的性能,用不好就有可能代理一些其他的問題,比如我曾經見過因錯誤設置阻塞隊列大小,導致嚴重的業(yè)務故障。
這也是阿里巴巴Java開發(fā)手冊中不允許用Executors去創(chuàng)建線程池的原因。
如何使用
接下來,我們通過一個完整的代碼示例來看下ThreadPoolExecutor具體如何使用:
import java.util.concurrent.*; public class ThreadPoolExecutorExample { public static void main(String[] args) { int corePoolSize = 2; int maximumPoolSize = 4; long keepAliveTime = 10; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2); ThreadFactory threadFactory = Executors.defaultThreadFactory(); RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler ); for (int i = 1; i <= 10; i++) { executor.execute(new Task(i)); } executor.shutdown(); } static class Task implements Runnable { private int taskId; public Task(int taskId) { this.taskId = taskId; } @Override public void run() { System.out.println("Task #" + taskId + " is running on " + Thread.currentThread().getName()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task #" + taskId + " is completed on " + Thread.currentThread().getName()); } } }
上面代碼也很簡單,就是構造了一個線程,讓其多線程打印一些信息。從上面代碼來看,它構造方法一共有7個參數,其中keepAliveTime和timeUnit其實是搭配使用的。
我們來看下具體每個參數的含義和作用:
- corePoolSize:線程池的核心線程數。當有新的任務提交到線程池時,如果當前線程池中的線程數小于corePoolSize,那么線程池會創(chuàng)建新的線程來執(zhí)行任務。如果當前線程池中的線程數大于或等于corePoolSize,那么線程池會將任務添加到阻塞隊列中等待執(zhí)行。默認值為1。
- maximumPoolSize:線程池的最大線程數。如果阻塞隊列已滿,且當前線程池中的線程數小于maximumPoolSize,那么線程池會創(chuàng)建新的線程來執(zhí)行任務。如果當前線程池中的線程數已經達到maximumPoolSize,那么線程池會根據拒絕策略來處理新的任務。默認值為Integer.MAX_VALUE。
- keepAliveTime和timeUnit:線程的空閑時間。當線程池中的線程數大于corePoolSize時,如果一個線程在keepAliveTime后沒有執(zhí)行任何任務,那么該線程將被終止。默認值為0,表示線程池中的所有線程都是長期存活的。
- workQueue:阻塞隊列。當線程池中的線程數已經達到corePoolSize時,新的任務將被添加到阻塞隊列中等待執(zhí)行。ThreadPoolExecutor提供了多種類型的阻塞隊列,包括SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue等。默認值為LinkedBlockingQueue。
- threadFactory:線程工廠。用于創(chuàng)建新的線程。如果沒有指定線程工廠,ThreadPoolExecutor將使用默認的線程工廠來創(chuàng)建線程。默認值為DefaultThreadFactory。
- handler:拒絕策略。當阻塞隊列已滿,且當前線程池中的線程數已經達到maximumPoolSize時,ThreadPoolExecutor會根據指定的拒絕策略來處理新的任務。ThreadPoolExecutor提供了多種拒絕策略,包括AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy和DiscardPolicy等。默認值為AbortPolicy。
在不同的應用場景下,我們可能需要調整這些參數,以提高應用程序的性能和可伸縮性。
例如,可以通過增加corePoolSize和maximumPoolSize來提高線程池的并發(fā)能力,或者通過增加阻塞隊列的大小來提高線程池的緩沖能力。同時,也可以通過指定不同的拒絕策略來處理新的任務,以避免任務丟失或應用程序崩潰的問題。
線程池狀態(tài)
ThreadPoolExecutor在實際使用中有四種狀態(tài),分別是RUNNING、SHUTDOWN、STOP和TERMINATED。
- RUNNING:線程池正在運行。在這個狀態(tài)下,線程池可以接受新的任務,并且會將任務添加到阻塞隊列中等待執(zhí)行,或者直接創(chuàng)建新的線程來執(zhí)行任務。
- SHUTDOWN: 線程池正在關閉。在這個狀態(tài)下,線程池不會接受新的任務,但會將阻塞隊列中的任務繼續(xù)執(zhí)行完畢。如果有新的任務提交到線程池,那么線程池會拒絕這些任務并拋出RejectedExecutionException異常。
- STOP: 線程池已經停止。在這個狀態(tài)下,線程池不會接受新的任務,并且會中斷正在執(zhí)行的任務。如果有新的任務提交到線程池,那么線程池會拒絕這些任務并拋出RejectedExecutionException異常。
- TERMINATED: 線程池已經終止。在這個狀態(tài)下,線程池中的所有任務都已經執(zhí)行完畢,并且所有的線程都已經被銷毀。如果需要重新使用線程池,那么需要重新創(chuàng)建一個新的線程池。
通過合理地控制線程池的狀態(tài),可以避免任務丟失或線程池崩潰的問題,并且可以提高應用程序的性能和可靠性。
例如,在應用程序關閉時,可以先將線程池的狀態(tài)設置為SHUTDOWN,等待線程池中的所有任務執(zhí)行完畢后再將狀態(tài)設置為STOP,最終將線程池狀態(tài)設置為TERMINATED,以確保線程池中的所有任務都能夠得到執(zhí)行。
線程池執(zhí)行任務的過程
ThreadPoolExecutor如何執(zhí)行任務涉及到任務的提交、執(zhí)行、取消和完成等多個方面。
- 任務的提交:可以通過execute()方法將任務提交到線程池中,execute()方法會將任務添加到阻塞隊列中等待執(zhí)行。也可以通過submit()方法將任務提交到線程池中,submit()方法會返回一個Future對象,可以用于獲取任務的執(zhí)行結果。
- 任務的執(zhí)行:線程池會從阻塞隊列中取出任務,并將任務分配給空閑的線程執(zhí)行。如果當前線程池中的線程數小于corePoolSize,那么線程池會創(chuàng)建新的線程來執(zhí)行任務。如果當前線程池中的線程數已經達到corePoolSize,那么線程池會將任務添加到阻塞隊列中等待執(zhí)行。如果阻塞隊列已滿,且當前線程池中的線程數小于maximumPoolSize,那么線程池會創(chuàng)建新的線程來執(zhí)行任務。如果當前線程池中的線程數已經達到maximumPoolSize,那么線程池會根據拒絕策略來處理新的任務。
- 任務的取消:可以通過cancel()方法將任務從阻塞隊列中移除,如果任務還沒有開始執(zhí)行,那么任務將被取消。如果任務已經在執(zhí)行,那么可以通過interrupt()方法中斷任務的執(zhí)行。
- 任務的完成:可以通過Future對象來獲取任務的執(zhí)行結果,也可以通過isDone()方法來判斷任務是否已經執(zhí)行完畢。當任務執(zhí)行完畢后,線程池會將任務從阻塞隊列中移除,并將線程返回到線程池中等待下一個任務的執(zhí)行。
通過合理地控制任務的提交、執(zhí)行、取消和完成等方面的內容,可以提高線程池的性能和可靠性,避免任務丟失或線程池崩潰的問題。
例如,在提交任務時,可以根據任務的類型和優(yōu)先級來選擇合適的阻塞隊列,以確保任務能夠得到及時執(zhí)行。在取消任務時,可以先使用isCancelled()方法來判斷任務是否已經被取消,以避免重復取消任務的問題。
在獲取任務的執(zhí)行結果時,可以使用get()方法來等待任務的執(zhí)行結果,或者使用get(timeout, unit)方法來設置超時時間,以避免任務執(zhí)行時間過長導致線程池阻塞的問題。
阻塞隊列
ThreadPoolExecutor提供了多種類型的阻塞隊列,用于存儲等待執(zhí)行的任務。不同類型的阻塞隊列有不同的特點和適用場景。
接下來介紹ThreadPoolExecutor的三種常用阻塞隊列:
- SynchronousQueue:同步隊列。SynchronousQueue是一個沒有容量的阻塞隊列,它的作用是將任務直接交給線程來執(zhí)行,而不是先將任務存儲在隊列中等待執(zhí)行。如果當前沒有空閑的線程來執(zhí)行任務,那么SynchronousQueue會阻塞任務的提交,直到有線程空閑為止。SynchronousQueue適用于任務執(zhí)行時間短、任務量大的場景,可以避免任務在隊列中等待的時間,提高線程池的響應速度。
- LinkedBlockingQueue:鏈表阻塞隊列。LinkedBlockingQueue是一個有容量的阻塞隊列,它的作用是將任務存儲在隊列中等待執(zhí)行。如果隊列已滿,那么新的任務將被阻塞,直到隊列中有空閑位置為止。LinkedBlockingQueue適用于任務執(zhí)行時間長、任務量大的場景,可以避免任務在線程池中等待執(zhí)行的時間過長,提高線程池的緩沖能力。
- ArrayBlockingQueue:數組阻塞隊列。ArrayBlockingQueue是一個有容量的阻塞隊列,它的作用和LinkedBlockingQueue類似,但是它是一個基于數組的隊列,而不是基于鏈表的隊列。ArrayBlockingQueue適用于任務執(zhí)行時間長、任務量大的場景,可以避免任務在線程池中等待執(zhí)行的時間過長,提高線程池的緩沖能力。與LinkedBlockingQueue相比,ArrayBlockingQueue的吞吐量更高,但是它的性能可能會受到數組大小的限制。
通過合理地選擇不同類型的阻塞隊列,可以根據應用程序的需求來提高線程池的性能和可靠性。
例如,對于任務執(zhí)行時間短、任務量大的場景,可以選擇SynchronousQueue來避免任務在隊列中等待的時間;對于任務執(zhí)行時間長、任務量大的場景,可以選擇LinkedBlockingQueue或ArrayBlockingQueue來提高線程池的緩沖能力。同時,也可以通過調整阻塞隊列的大小來控制線程池的緩沖能力,以適應不同的工作負載。
拒絕策略
ThreadPoolExecutor的拒絕策略用于處理新的任務提交到線程池時,如果線程池已經達到最大線程數和阻塞隊列已滿的情況下,線程池應該如何處理這些新的任務。
ThreadPoolExecutor提供了四種常用的拒絕策略:
- AbortPolicy:拋出RejectedExecutionException異常。這是ThreadPoolExecutor的默認拒絕策略,如果線程池已經達到最大線程數和阻塞隊列已滿的情況下,新的任務將被拒絕并拋出異常。
- CallerRunsPolicy: 由提交任務的線程來執(zhí)行任務。如果線程池已經達到最大線程數和阻塞隊列已滿的情況下,新的任務將被提交到線程池的調用線程中執(zhí)行。這種策略可以避免任務丟失,但是會降低線程池的吞吐量。
- DiscardOldestPolicy:丟棄最老的任務。如果線程池已經達到最大線程數和阻塞隊列已滿的情況下,新的任務將被丟棄,而不是拋出異常或者執(zhí)行任務。這種策略可以避免線程池阻塞,但是可能會丟失一些重要的任務。
- DiscardPolicy:直接丟棄任務。如果線程池已經達到最大線程數和阻塞隊列已滿的情況下,新的任務將被直接丟棄,而不是拋出異?;蛘邎?zhí)行任務。這種策略可以避免線程池阻塞,但是會丟失所有的任務。
通過合理地選擇不同類型的拒絕策略,可以根據應用程序的需求來處理新的任務提交到線程池時的情況。
例如,對于重要的任務,可以選擇CallerRunsPolicy來確保任務能夠得到執(zhí)行;對于不重要的任務,可以選擇DiscardPolicy來避免線程池阻塞。同時,也可以通過實現RejectedExecutionHandler接口來自定義拒絕策略,例如我們可以丟下任務前將其記錄下:
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class DiscardOldestWithKafkaRejectedExecutionHandler implements RejectedExecutionHandler { private String kafkaTopic; public DiscardOldestWithKafkaRejectedExecutionHandler(String kafkaTopic) { this.kafkaTopic = kafkaTopic; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 獲取最早的未執(zhí)行任務 Runnable earliestTask = executor.getQueue().peek(); if (earliestTask != null) { // 將最早的未執(zhí)行任務發(fā)到kafka里 KafkaProducer.send(kafkaTopic, earliestTask.toString()); } // 將當前任務提交到線程池 executor.execute(r); } }
在這個示例中,我們實現了一個DiscardOldestWithKafkaRejectedExecutionHandler,它繼承了RejectedExecutionHandler接口,并重寫了rejectedExecution()方法。它的作用是當線程池中的任務隊列和線程池都滿了,并且新的任務被拒絕時,這個方法會將最早未運行的任務拿出來,丟棄到kafka中,然后提交當前的任務。
線程池的監(jiān)控和調優(yōu)
線程池穩(wěn)定和高效的運行也是非常重要的,要想它穩(wěn)定高效運行,就離不開監(jiān)控和調優(yōu)。
下面給出一些監(jiān)控和調優(yōu)ThreadPoolExecutor的方法論:
- 監(jiān)控指標: 可以通過ThreadPoolExecutor提供的一些監(jiān)控指標來了解線程池的狀態(tài)和性能,例如線程池大小、活躍線程數、任務隊列大小、已完成任務數、拒絕任務數等??梢酝ㄟ^調用ThreadPoolExecutor的getPoolSize()、getActiveCount()、getQueueSize()、getCompletedTaskCount()、getRejectedExecutionCount()等方法來獲取這些監(jiān)控指標,以便及時發(fā)現和解決線程池的問題。
- 調優(yōu)策略: 可以通過調整線程池的參數來優(yōu)化線程池的性能和可靠性,例如調整corePoolSize和maximumPoolSize來提高線程池的并發(fā)能力,調整keepAliveTime來控制線程的空閑時間,調整阻塞隊列的大小來提高線程池的緩沖能力,選擇合適的拒絕策略來處理新的任務提交時的情況等。同時,也可以通過監(jiān)控指標來實時調整線程池的參數,以適應不同的工作負載。
- 線程池的優(yōu)化: 可以通過線程池的優(yōu)化來提高線程池的性能和可靠性,例如使用線程池前先評估任務的類型和優(yōu)先級,選擇合適的阻塞隊列和拒絕策略,避免任務的等待和丟失;使用線程池時避免過度提交任務,控制任務的數量和質量;使用線程池時避免長時間的空閑或者過度使用線程池,以避免資源浪費和線程池崩潰的問題。
通過合理地監(jiān)控和調優(yōu)ThreadPoolExecutor,可以提高線程池的性能和可靠性,避免任務丟失或線程池崩潰的問題,并且可以提高應用程序的性能和可靠性。
最佳實踐
使用ThreadPoolExecutor的最佳實踐涉及到線程池的參數設置、任務處理、異常處理等多個方面。
下面介紹一些使用ThreadPoolExecutor的最佳實踐:
- 線程池參數設置:在創(chuàng)建ThreadPoolExecutor時,需要根據應用程序的需求來設置線程池的參數,例如corePoolSize、maximumPoolSize、keepAliveTime、阻塞隊列類型和大小、拒絕策略等??梢愿鶕蝿盏念愋秃蛢?yōu)先級來選擇合適的阻塞隊列和拒絕策略,以避免任務的等待和丟失。
- 任務處理:在提交任務時,需要根據任務的類型和優(yōu)先級來選擇合適的提交方式,例如使用execute()方法或submit()方法提交任務。同時,還需要注意任務的異常處理,可以通過實現UncaughtExceptionHandler接口來自定義異常處理方式,以避免任務異常導致線程池崩潰的問題。
- 線程池的關閉:在關閉線程池時,需要注意線程池的狀態(tài)和任務的處理??梢酝ㄟ^調用shutdown()方法或shutdownNow()方法來關閉線程池,前者會等待所有任務執(zhí)行完畢再關閉線程池,后者會立即關閉線程池并中斷所有任務的執(zhí)行。同時,還需要注意線程池的狀態(tài),可以通過isShutdown()方法和isTerminated()方法來判斷線程池是否已經關閉。
- 線程池的監(jiān)控和調優(yōu):在使用ThreadPoolExecutor時,需要及時監(jiān)控線程池的狀態(tài)和性能,以便及時發(fā)現和解決線程池的問題??梢酝ㄟ^監(jiān)控指標和調優(yōu)策略來優(yōu)化線程池的性能和可靠性,例如調整線程池的參數、選擇合適的阻塞隊列和拒絕策略、避免任務的等待和丟失等。
通過遵循ThreadPoolExecutor的最佳實踐,可以提高線程池的性能和可靠性,避免任務丟失或線程池崩潰的問題,并且可以提高應用程序的性能和可靠性。
總結
ThreadPoolExecutor是Java中用于管理線程池的一個類,它能夠創(chuàng)建和管理線程池,以提高應用程序的性能和可靠性。它具有靈活的線程池管理、高效的任務處理和可靠的異常處理等特點。尤其適用于任務量大、執(zhí)行時間長、任務類型多樣的應用場景,例如Web服務器、數據庫連接池、文件處理等。通過合理地設置線程池參數、處理任務和異常、監(jiān)控和調優(yōu)線程池,可以提高應用程序的性能和可靠性,避免任務丟失或線程池崩潰的問題。
為了充分利用ThreadPoolExecutor提高應用程序的性能和可靠性,我們可以采取以下措施:
- 合理設置線程池參數,例如corePoolSize(核心線程數)、maximumPoolSize(最大線程數)、keepAliveTime(線程空閑時間)、阻塞隊列類型和大小、拒絕策略等,以適應不同的工作負載和應用場景。
- 處理任務和異常,例如選擇合適的任務提交方式(如execute或submit),實現UncaughtExceptionHandler接口來自定義異常處理方式,以避免任務異常導致線程池崩潰的問題。
- 監(jiān)控和調優(yōu)線程池,例如及時監(jiān)控線程池的狀態(tài)和性能、調整線程池的參數、選擇合適的阻塞隊列和拒絕策略、避免任務的等待和丟失等,以提高線程池的性能和可靠性。
通過以上措施,我們可以充分發(fā)揮ThreadPoolExecutor的優(yōu)勢,提高應用程序的性能和可靠性,避免任務丟失或線程池崩潰的問題。同時,這也有助于我們在面臨大量任務、長時間執(zhí)行和多種任務類型的應用場景時,更好地應對挑戰(zhàn),實現高效、穩(wěn)定的應用程序運行。
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
詳解@ConfigurationProperties實現原理與實戰(zhàn)
這篇文章主要介紹了詳解@ConfigurationProperties實現原理與實戰(zhàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10解決spring boot hibernate 懶加載的問題
這篇文章主要介紹了解決spring boot hibernate 懶加載的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10springboot打包無法讀取yml、properties等配置文件的解決
這篇文章主要介紹了springboot打包無法讀取yml、properties等配置文件的解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-04-04