java線程池ThreadPoolExecutor類使用小結(jié)
在《阿里巴巴java開發(fā)手冊》中指出了線程資源必須通過線程池提供,不允許在應(yīng)用中自行顯示的創(chuàng)建線程,這樣一方面是線程的創(chuàng)建更加規(guī)范,可以合理控制開辟線程的數(shù)量;另一方面線程的細節(jié)管理交給線程池處理,優(yōu)化了資源的開銷。而線程池不允許使用Executors去創(chuàng)建,而要通過ThreadPoolExecutor方式,這一方面是由于jdk中Executor框架雖然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等創(chuàng)建線程池的方法,但都有其局限性,不夠靈活;另外由于前面幾種方法內(nèi)部也是通過ThreadPoolExecutor方式實現(xiàn),使用ThreadPoolExecutor有助于大家明確線程池的運行規(guī)則,創(chuàng)建符合自己的業(yè)務(wù)場景需要的線程池,避免資源耗盡的風(fēng)險。
下面我們就對ThreadPoolExecutor的使用方法進行一個詳細的概述。
首先看下ThreadPoolExecutor的構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}構(gòu)造函數(shù)的參數(shù)含義如下:
corePoolSize:指定了線程池中的線程數(shù)量,它的數(shù)量決定了添加的任務(wù)是開辟新的線程去執(zhí)行,還是放到workQueue任務(wù)隊列中去;
maximumPoolSize:指定了線程池中的最大線程數(shù)量,這個參數(shù)會根據(jù)你使用的workQueue任務(wù)隊列的類型,決定線程池會開辟的最大線程數(shù)量;
keepAliveTime:當(dāng)線程池中空閑線程數(shù)量超過corePoolSize時,多余的線程會在多長時間內(nèi)被銷毀;
unit:keepAliveTime的單位
workQueue:任務(wù)隊列,被添加到線程池中,但尚未被執(zhí)行的任務(wù);它一般分為直接提交隊列、有界任務(wù)隊列、無界任務(wù)隊列、優(yōu)先任務(wù)隊列幾種;
threadFactory:線程工廠,用于創(chuàng)建線程,一般用默認即可;
handler:拒絕策略;當(dāng)任務(wù)太多來不及處理時,如何拒絕任務(wù);
接下來我們對其中比較重要參數(shù)做進一步的了解:
一、workQueue任務(wù)隊列
上面我們已經(jīng)介紹過了,它一般分為直接提交隊列、有界任務(wù)隊列、無界任務(wù)隊列、優(yōu)先任務(wù)隊列;
1、直接提交隊列:設(shè)置為SynchronousQueue隊列,SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,沒執(zhí)行一個插入操作就會阻塞,需要再執(zhí)行一個刪除操作才會被喚醒,反之每一個刪除操作也都要等待對應(yīng)的插入操作。
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//maximumPoolSize設(shè)置為2 ,拒絕策略為AbortPolic策略,直接拋出異常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<3;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public ThreadTask() {
public void run() {
System.out.println(Thread.currentThread().getName());輸出結(jié)果為
pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)
可以看到,當(dāng)任務(wù)隊列為SynchronousQueue,創(chuàng)建的線程數(shù)大于maximumPoolSize時,直接執(zhí)行了拒絕策略拋出異常。
使用SynchronousQueue隊列,提交的任務(wù)不會被保存,總是會馬上提交執(zhí)行。如果用于執(zhí)行任務(wù)的線程數(shù)量小于maximumPoolSize,則嘗試創(chuàng)建新的進程,如果達到maximumPoolSize設(shè)置的最大值,則根據(jù)你設(shè)置的handler執(zhí)行拒絕策略。因此這種方式你提交的任務(wù)不會被緩存起來,而是會被馬上執(zhí)行,在這種情況下,你需要對你程序的并發(fā)量有個準(zhǔn)確的評估,才能設(shè)置合適的maximumPoolSize數(shù)量,否則很容易就會執(zhí)行拒絕策略;
2、有界的任務(wù)隊列:有界的任務(wù)隊列可以使用ArrayBlockingQueue實現(xiàn),如下所示
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用ArrayBlockingQueue有界任務(wù)隊列,若有新的任務(wù)需要執(zhí)行時,線程池會創(chuàng)建新的線程,直到創(chuàng)建的線程數(shù)量達到corePoolSize時,則會將新的任務(wù)加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續(xù)創(chuàng)建線程,直到線程數(shù)量達到maximumPoolSize設(shè)置的最大線程數(shù)量,若大于maximumPoolSize,則執(zhí)行拒絕策略。在這種情況下,線程數(shù)量的上限與有界任務(wù)隊列的狀態(tài)有直接關(guān)系,如果有界隊列初始容量較大或者沒有達到超負荷的狀態(tài),線程數(shù)將一直維持在corePoolSize以下,反之當(dāng)任務(wù)隊列已滿時,則會以maximumPoolSize為最大線程數(shù)上限。
3、無界的任務(wù)隊列:有界任務(wù)隊列可以使用LinkedBlockingQueue實現(xiàn),如下所示
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用無界任務(wù)隊列,線程池的任務(wù)隊列可以無限制的添加新的任務(wù),而線程池創(chuàng)建的最大線程數(shù)量就是你corePoolSize設(shè)置的數(shù)量,也就是說在這種情況下maximumPoolSize這個參數(shù)是無效的,哪怕你的任務(wù)隊列中緩存了很多未執(zhí)行的任務(wù),當(dāng)線程池的線程數(shù)達到corePoolSize后,就不會再增加了;若后續(xù)有新的任務(wù)加入,則直接進入隊列等待,當(dāng)使用這種任務(wù)隊列模式時,一定要注意你任務(wù)提交與處理之間的協(xié)調(diào)與控制,不然會出現(xiàn)隊列中的任務(wù)由于無法及時處理導(dǎo)致一直增長,直到最后資源耗盡的問題。
4、優(yōu)先任務(wù)隊列:優(yōu)先任務(wù)隊列通過PriorityBlockingQueue實現(xiàn),下面我們通過一個例子演示下
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//優(yōu)先任務(wù)隊列
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<20;i++) {
pool.execute(new ThreadTask(i));
}
}
}
public class ThreadTask implements Runnable,Comparable<ThreadTask>{
private int priority;
public int getPriority() {
return priority;
public void setPriority(int priority) {
this.priority = priority;
public ThreadTask() {
public ThreadTask(int priority) {
//當(dāng)前對象和其他對象做比較,當(dāng)前優(yōu)先級大就返回-1,優(yōu)先級小就返回1,值越小優(yōu)先級越高
public int compareTo(ThreadTask o) {
return this.priority>o.priority?-1:1;
public void run() {
try {
//讓線程阻塞,使后續(xù)任務(wù)進入緩存隊列
Thread.sleep(1000);
System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}我們來看下執(zhí)行的結(jié)果情況
priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1
大家可以看到除了第一個任務(wù)直接創(chuàng)建線程執(zhí)行外,其他的任務(wù)都被放入了優(yōu)先任務(wù)隊列,按優(yōu)先級進行了重新排列執(zhí)行,且線程池的線程數(shù)一直為corePoolSize,也就是只有一個。
通過運行的代碼我們可以看出PriorityBlockingQueue它其實是一個特殊的無界隊列,它其中無論添加了多少個任務(wù),線程池創(chuàng)建的線程數(shù)也不會超過corePoolSize的數(shù)量,只不過其他隊列一般是按照先進先出的規(guī)則處理任務(wù),而PriorityBlockingQueue隊列可以自定義規(guī)則根據(jù)任務(wù)的優(yōu)先級順序先后執(zhí)行。
二、拒絕策略
一般我們創(chuàng)建線程池時,為防止資源被耗盡,任務(wù)隊列都會選擇創(chuàng)建有界任務(wù)隊列,但種模式下如果出現(xiàn)任務(wù)隊列已滿且線程池創(chuàng)建的線程數(shù)達到你設(shè)置的最大線程數(shù)時,這時就需要你指定ThreadPoolExecutor的RejectedExecutionHandler參數(shù)即合理的拒絕策略,來處理線程池"超載"的情況。ThreadPoolExecutor自帶的拒絕策略如下:
1、AbortPolicy策略:該策略會直接拋出異常,阻止系統(tǒng)正常工作;
2、CallerRunsPolicy策略:如果線程池的線程數(shù)量達到上限,該策略會把任務(wù)隊列中的任務(wù)放在調(diào)用者線程當(dāng)中運行;
3、DiscardOledestPolicy策略:該策略會丟棄任務(wù)隊列中最老的一個任務(wù),也就是當(dāng)前任務(wù)隊列中最先被添加進去的,馬上要被執(zhí)行的那個任務(wù),并嘗試再次提交;
4、DiscardPolicy策略:該策略會默默丟棄無法處理的任務(wù),不予任何處理。當(dāng)然使用此策略,業(yè)務(wù)場景中需允許任務(wù)的丟失;
以上內(nèi)置的策略均實現(xiàn)了RejectedExecutionHandler接口,當(dāng)然你也可以自己擴展RejectedExecutionHandler接口,定義自己的拒絕策略,我們看下示例代碼:
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//自定義拒絕策略
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+"執(zhí)行了拒絕策略");
}
});
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public void run() {
try {
//讓線程阻塞,使后續(xù)任務(wù)進入緩存隊列
Thread.sleep(1000);
System.out.println("ThreadName:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
輸出結(jié)果:
com.hhxx.test.ThreadTask@33909752執(zhí)行了拒絕策略
com.hhxx.test.ThreadTask@55f96302執(zhí)行了拒絕策略
com.hhxx.test.ThreadTask@3d4eac69執(zhí)行了拒絕策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
可以看到由于任務(wù)加了休眠阻塞,執(zhí)行需要花費一定時間,導(dǎo)致會有一定的任務(wù)被丟棄,從而執(zhí)行自定義的拒絕策略;
三、ThreadFactory自定義線程創(chuàng)建
線程池中線程就是通過ThreadPoolExecutor中的ThreadFactory,線程工廠創(chuàng)建的。那么通過自定義ThreadFactory,可以按需要對線程池中創(chuàng)建的線程進行一些特殊的設(shè)置,如命名、優(yōu)先級等,下面代碼我們通過ThreadFactory對線程池中創(chuàng)建的線程進行記錄與命名
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//自定義線程工廠
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("線程"+r.hashCode()+"創(chuàng)建");
//線程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public void run() {
//輸出執(zhí)行線程的名稱
System.out.println("ThreadName:"+Thread.currentThread().getName());我們看下輸出結(jié)果
線程118352462創(chuàng)建
線程1550089733創(chuàng)建
線程865113938創(chuàng)建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
線程1442407170創(chuàng)建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170
可以看到線程池中,每個線程的創(chuàng)建我們都進行了記錄輸出與命名。
四、ThreadPoolExecutor擴展
ThreadPoolExecutor擴展主要是圍繞beforeExecute()、afterExecute()和terminated()三個接口實現(xiàn)的,
1、beforeExecute:線程池中任務(wù)運行前執(zhí)行
2、afterExecute:線程池中任務(wù)運行完畢后執(zhí)行
3、terminated:線程池退出后執(zhí)行
通過這三個接口我們可以監(jiān)控每個任務(wù)的開始和結(jié)束時間,或者其他一些功能。下面我們可以通過代碼實現(xiàn)一下
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args ) throws InterruptedException
{
//實現(xiàn)自定義接口
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("線程"+r.hashCode()+"創(chuàng)建");
//線程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy()) {
protected void beforeExecute(Thread t,Runnable r) {
System.out.println("準(zhǔn)備執(zhí)行:"+ ((ThreadTask)r).getTaskName());
}
protected void afterExecute(Runnable r,Throwable t) {
System.out.println("執(zhí)行完畢:"+((ThreadTask)r).getTaskName());
}
protected void terminated() {
System.out.println("線程池退出");
}
};
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask("Task"+i));
}
pool.shutdown();
}
}
public class ThreadTask implements Runnable{
private String taskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public ThreadTask(String name) {
this.setTaskName(name);
}
public void run() {
//輸出執(zhí)行線程的名稱
System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
}
}我看下輸出結(jié)果
線程118352462創(chuàng)建
線程1550089733創(chuàng)建
準(zhǔn)備執(zhí)行:Task0
準(zhǔn)備執(zhí)行:Task1
TaskNameTask0---ThreadName:threadPool118352462
線程865113938創(chuàng)建
執(zhí)行完畢:Task0
TaskNameTask1---ThreadName:threadPool1550089733
執(zhí)行完畢:Task1
準(zhǔn)備執(zhí)行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
執(zhí)行完畢:Task3
準(zhǔn)備執(zhí)行:Task2
準(zhǔn)備執(zhí)行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
執(zhí)行完畢:Task4
準(zhǔn)備執(zhí)行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
執(zhí)行完畢:Task5
準(zhǔn)備執(zhí)行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
執(zhí)行完畢:Task6
準(zhǔn)備執(zhí)行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
執(zhí)行完畢:Task8
準(zhǔn)備執(zhí)行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
準(zhǔn)備執(zhí)行:Task7
執(zhí)行完畢:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
執(zhí)行完畢:Task7
執(zhí)行完畢:Task2
線程池退出
可以看到通過對beforeExecute()、afterExecute()和terminated()的實現(xiàn),我們對線程池中線程的運行狀態(tài)進行了監(jiān)控,在其執(zhí)行前后輸出了相關(guān)打印信息。另外使用shutdown方法可以比較安全的關(guān)閉線程池,當(dāng)線程池調(diào)用該方法后,線程池中不再接受后續(xù)添加的任務(wù)。但是,此時線程池不會立刻退出,直到添加到線程池中的任務(wù)都已經(jīng)處理完成,才會退出。
五、線程池線程數(shù)量
線程吃線程數(shù)量的設(shè)置沒有一個明確的指標(biāo),根據(jù)實際情況,只要不是設(shè)置的偏大和偏小都問題不大,結(jié)合下面這個公式即可
/**
* Nthreads=CPU數(shù)量
* Ucpu=目標(biāo)CPU的使用率,0<=Ucpu<=1
* W/C=任務(wù)等待時間與任務(wù)計算時間的比率
*/
Nthreads = Ncpu*Ucpu*(1+W/C)以上就是對ThreadPoolExecutor類從構(gòu)造函數(shù)、拒絕策略、自定義線程創(chuàng)建等方面介紹了其詳細的使用方法,從而我們可以根據(jù)自己的需要,靈活配置和使用線程池創(chuàng)建線程,其中如有不足與不正確的地方還望指出與海涵。
到此這篇關(guān)于java線程池ThreadPoolExecutor類使用詳解的文章就介紹到這了,更多相關(guān)java線程池ThreadPoolExecutor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot使用@Value實現(xiàn)給靜態(tài)變量注入值
這篇文章主要介紹了SpringBoot使用@Value實現(xiàn)給靜態(tài)變量注入值的方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
解決spring boot創(chuàng)建項目遇到配置的問題
這篇文章主要介紹了解決spring boot創(chuàng)建項目遇到配置的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
Java基礎(chǔ)教程之final關(guān)鍵字淺析
這篇文章主要給大家介紹了關(guān)于Java基礎(chǔ)教程之final關(guān)鍵字的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06

