Java中的線程池ThreadPoolExecutor細(xì)致講解
1. 線程池狀態(tài)
- RUNNING:允許提交并處理任務(wù)
- SHUTDOWN: 不允許提交新的任務(wù),但是會(huì)處理完已提交的任務(wù)
- STOP:不允許提交新的任務(wù),也不會(huì)處理阻塞隊(duì)列未執(zhí)行的,并設(shè)置正在執(zhí)行的線程的中斷標(biāo)志位
- TIDYING:所有任務(wù)執(zhí)行完畢,池中工作的線程數(shù)為0,等待執(zhí)行terminated()方法
- TERMINATED:terminated()方法執(zhí)行完畢
線程池的shutdown()方法,將線程池由RUNNING轉(zhuǎn)為SHUTDOWN狀態(tài)
線程池的shutdownNow()方法,將線程池由RUNNING或SHUTDOWN轉(zhuǎn)為STOP狀態(tài)
SHUTDOWN和STOP狀態(tài)最終都會(huì)變?yōu)門ERMINATED
2. ThreadPoolExecutor構(gòu)造函數(shù)
- public ThreadPoolExecutor(int corePoolSize, 線程池中核心線程數(shù)最大值
- int maximumPoolSize, 線程池中能擁有最多線程數(shù)
- long keepAliveTime, 空閑線程存活時(shí)間
- TimeUnit unit, keepAliveTime單位
- BlockingQueue workQueue, 用于緩存任務(wù)的阻塞隊(duì)列
- ThreadFactory threadFactory, 創(chuàng)建線程的工廠
- RejectedExecutionHandler handler 拒絕策略
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(“export_data_pool_factory” + “-%d”).build(); ExecutorService pool = new ThreadPoolExecutor(5, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(500), factory, new ThreadPoolExecutor.CallerRunsPolicy());
3. 線程池工作原理
3.1 任務(wù)執(zhí)行流程
注: execute()無返回值發(fā)生異常會(huì)拋出 submit()返回值為Feature,異常無感知需要通過返回的Feature獲取異常信息
當(dāng)調(diào)用線程池execute()或者submit() 方法向線程池提交一個(gè)任務(wù)時(shí),線程池會(huì)做如下判斷:
- 如果有空閑線程,且當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新的線程執(zhí)行該任務(wù);
- 如果沒有空閑線程,且當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新的線程執(zhí)行該任務(wù);(與第一條相同,此處另寫原因見下面注釋)
注:此處網(wǎng)上教程為『如果有空閑線程,則直接執(zhí)行該任務(wù); 如果沒有空閑線程,且當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新的線程執(zhí)行該任務(wù)』 但是 ThreadPoolExecutor 的官方注釋:“When a new task is submitted in method {@link #execute(Runnable)}, and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle”,經(jīng)測試官方注釋正確
package test; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.*; import java.util.*; import java.util.concurrent.*; public class Test1 { private final ExecutorService pool; { ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test_pool_factory" + "-%d").build(); pool = new ThreadPoolExecutor(3, 5, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), factory, new ThreadPoolExecutor.CallerRunsPolicy()); } public void close() { this.pool.shutdown(); } public void runTask(int i) { pool.submit(new Runnable() { @Override public void run() { try { System.out.println("index:" + i + " id:" + Thread.currentThread().getId() + " poolSize: " + ((ThreadPoolExecutor)pool).getPoolSize() + " activeSize:" + ((ThreadPoolExecutor)pool).getActiveCount()); System.out.println("index:" + i + " id:" + Thread.currentThread().getId() + " name:" + Thread.currentThread().getName()); System.out.println("==============================================="); } catch (Exception e) { System.out.printf("ERROR: id:%s name:%s time:%s ERROR### \n", Thread.currentThread().getId(), Thread.currentThread().getName(), new Date()); } } }); } public static void main(String[] args) throws IOException, InterruptedException { Test1 t = new Test1(); for (int i = 0; i < 5; i++) { System.out.println("poolSize:" + ((ThreadPoolExecutor)t.pool).getPoolSize() + " activeSize:" + ((ThreadPoolExecutor)t.pool).getActiveCount()); t.runTask(i); Thread.sleep(100); } t.close(); } }
通過運(yùn)行結(jié)果可以看到,任務(wù)執(zhí)行完畢后poolSize已經(jīng)有線程,并且都處于空閑狀態(tài),但當(dāng)poolSize<corePoolSize時(shí)每次都會(huì)新建一個(gè)線程執(zhí)行任務(wù)
- 如果沒有空閑線程,且當(dāng)前的線程數(shù)等于corePoolSize,同時(shí)阻塞隊(duì)列未滿,則將任務(wù)入隊(duì)列,而不添加新的線程;
- 如果沒有空閑線程,且阻塞隊(duì)列已滿,同時(shí)池中的線程數(shù)小于maximumPoolSize ,則創(chuàng)建新的線程執(zhí)行任務(wù);
- 如果沒有空閑線程,且阻塞隊(duì)列已滿,同時(shí)池中的線程數(shù)等于maximumPoolSize ,則根據(jù)構(gòu)造函數(shù)中的 handler 指定的策略來拒絕新的任務(wù)。
3.2 空閑線程釋放策略
當(dāng)線程空閑時(shí)間超過keepAliveTime時(shí),如果線程池設(shè)置了allowCoreThreadTimeout參數(shù)為true(默認(rèn)false)則直接釋放掉該線程(它最終會(huì)收縮到0),如果沒有設(shè)置則判斷當(dāng)前線程數(shù) > corePoolSize,則該線程會(huì)被釋放掉(它最終會(huì)收縮到 corePoolSize 的大小)。
3.3 任務(wù)隊(duì)列(workQueue)
任務(wù)隊(duì)列:決定了緩存任務(wù)的排隊(duì)策略
- 有界隊(duì)列
- SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于 阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool 使用了這個(gè)隊(duì)列。
- ArrayBlockingQueue:一個(gè)由數(shù)組支持的有界阻塞隊(duì)列。此隊(duì)列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。一旦創(chuàng)建了這樣的緩存區(qū),就不能再增加其容量。試圖向已滿隊(duì)列中放入元素會(huì)導(dǎo)致操作受阻塞;試圖從空隊(duì)列中提取元素將導(dǎo)致類似阻塞。
- 無界隊(duì)列
- LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的無界阻塞隊(duì)列,它可以指定容量也可以不指定容量(實(shí)際上任何無限容量的隊(duì)列/棧都是有容量的,這個(gè)容量就是Integer.MAX_VALUE)
- PriorityBlockingQueue:是一個(gè)按照優(yōu)先級(jí)進(jìn)行內(nèi)部元素排序的無界阻塞隊(duì)列。隊(duì)列中的元素必須實(shí)現(xiàn) Comparable 接口,這樣才能通過實(shí)現(xiàn)compareTo()方法進(jìn)行排序。優(yōu)先級(jí)最高的元素將始終排在隊(duì)列的頭部;PriorityBlockingQueue 不會(huì)保證優(yōu)先級(jí)一樣的元素的排序。
ThreadPoolExecutor線程池推薦了三種等待隊(duì)列,SynchronousQueue 、LinkedBlockQueue和 ArrayBlockingQueue
3.4 threadFactory
threadFactory :指定創(chuàng)建線程的工廠。(可以不指定) 如果不指定線程工廠時(shí),ThreadPoolExecutor 會(huì)使用ThreadPoolExecutor.defaultThreadFactory 創(chuàng)建線程。默認(rèn)工廠創(chuàng)建的線程:同屬于相同的線程組,具有同為 Thread.NORM_PRIORITY 的優(yōu)先級(jí),以及名為 “pool-XXX-thread-” 的線程名(XXX為創(chuàng)建線程時(shí)順序序號(hào)),且創(chuàng)建的線程都是非守護(hù)進(jìn)程。
3.5 handler 拒絕策略
handler :表示當(dāng) workQueue 已滿,且池中的線程數(shù)達(dá)到 maximumPoolSize 時(shí),線程池拒絕添加新任務(wù)時(shí)采取的策略。(可以不指定)
- ThreadPoolExecutor.AbortPolicy():拋出RejectedExecutionException異常。默認(rèn)策略
- ThreadPoolExecutor.CallerRunsPolicy():由向線程池提交任務(wù)的線程來執(zhí)行該任務(wù)
- ThreadPoolExecutor.DiscardPolicy():拋棄當(dāng)前的任務(wù)
- ThreadPoolExecutor.DiscardOldestPolicy():拋棄最舊的任務(wù)(最先提交而沒有得到執(zhí)行的任務(wù))
4. 常用方法
除了修改創(chuàng)建線程池參數(shù)的修改allowCoreThreadTimeOut(boolean value),setKeepAliveTime(long timt, TimeUnit unit),setMaximumPoolSize(int maximumPoolSize),setCorePoolSize(int corePoolSize),setThreadFactory(ThreadFactory threadFactory),setRejectedExecutionHandler(RejectedExecutionHandler handler)外
- getCorePoolSize():返回線程池的核心線程數(shù),返回在線程池的coreSize大??;
- getMaximumPoolSize():返回線程池的最大線程數(shù),返回線程池的coreSize大小;
- getLargestPoolSize():記錄了曾經(jīng)出現(xiàn)的最大線程個(gè)數(shù)(水位線);
- getPoolSize():線程池中當(dāng)前線程的數(shù)量;
- getActiveCount():Returns the approximate(近似) number of threads that are actively executing tasks;
- prestartAllCoreThreads():會(huì)啟動(dòng)所有核心線程,無論是否有待執(zhí)行的任務(wù),線程池都會(huì)創(chuàng)建新的線程,直到池中線程數(shù)量達(dá)到 corePoolSize;
- prestartCoreThread():會(huì)啟動(dòng)一個(gè)核心線程(同上);
- allowCoreThreadTimeOut(true):允許核心線程在KeepAliveTime時(shí)間后,退出;
到此這篇關(guān)于Java中的線程池ThreadPoolExecutor細(xì)致講解的文章就介紹到這了,更多相關(guān)Java的ThreadPoolExecutor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解mybatis 批量更新數(shù)據(jù)兩種方法效率對比
這篇文章主要介紹了詳解mybatis 批量更新數(shù)據(jù)兩種方法效率對比,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-02-02SpringBoot使用Nacos進(jìn)行application.yml配置管理詳解
Nacos是阿里巴巴開源的一個(gè)微服務(wù)配置管理和服務(wù)發(fā)現(xiàn)的解決方案,下面我們來看看在SpringBoot中如何使用Nacos進(jìn)行application.yml配置管理吧2025-03-03Java數(shù)據(jù)結(jié)構(gòu)之實(shí)現(xiàn)跳表
今天帶大家來學(xué)習(xí)Java數(shù)據(jù)結(jié)構(gòu)的相關(guān)知識(shí),文中對用Java實(shí)現(xiàn)跳表作了非常詳細(xì)的圖文解說及代碼示例,對正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下2021-05-05java求數(shù)組元素重復(fù)次數(shù)和java字符串比較大小示例
這篇文章主要介紹了java求數(shù)組元素重復(fù)次數(shù)和java字符串比較大小示例,需要的朋友可以參考下2014-04-04maven無法自動(dòng)導(dǎo)入依賴jar包解決方式
有時(shí)候Maven無法自動(dòng)導(dǎo)入包是因?yàn)樵摪聪螺d到本地倉庫中,本文就來介紹一下解決方法,具有一定的參考價(jià)值,感興趣的可以了解一下2024-08-08Java設(shè)計(jì)模式之抽象工廠模式實(shí)例詳解
這篇文章主要介紹了Java設(shè)計(jì)模式之抽象工廠模式,結(jié)合實(shí)例形式分析了抽象工廠模式的概念、功能、定義與使用方法,需要的朋友可以參考下2017-09-09