了解Java線程池創(chuàng)建過程
前言
最近在改進(jìn)項(xiàng)目的并發(fā)功能,但開發(fā)起來磕磕碰碰的??戳撕枚噘Y料,總算加深了認(rèn)識。于是打算配合查看源代碼,總結(jié)并發(fā)編程的原理。
準(zhǔn)備從用得最多的線程池開始,圍繞創(chuàng)建、執(zhí)行、關(guān)閉認(rèn)識線程池整個生命周期的實(shí)現(xiàn)原理。后續(xù)再研究原子變量、并發(fā)容器、阻塞隊(duì)列、同步工具、鎖等等主題。java.util.concurrent里的并發(fā)工具用起來不難,但不能僅僅會用,我們要read the fucking source code,哈哈。順便說聲,我用的JDK是1.8。
Executor框架
Executor是一套線程池管理框架,接口里只有一個方法execute,執(zhí)行Runnable任務(wù)。ExecutorService接口擴(kuò)展了Executor,添加了線程生命周期的管理,提供任務(wù)終止、返回任務(wù)結(jié)果等方法。AbstractExecutorService實(shí)現(xiàn)了ExecutorService,提供例如submit方法的默認(rèn)實(shí)現(xiàn)邏輯。
然后到今天的主題ThreadPoolExecutor,繼承了AbstractExecutorService,提供線程池的具體實(shí)現(xiàn)。
構(gòu)造方法
下面是ThreadPoolExecutor最普通的構(gòu)造函數(shù),最多有七個參數(shù)。具體代碼不貼了,只是一些參數(shù)校驗(yàn)和設(shè)置的語句。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
corePoolSize是線程池的目標(biāo)大小,即是線程池剛剛創(chuàng)建起來,還沒有任務(wù)要執(zhí)行時的大小。maximumPoolSize是線程池的最大上限。keepAliveTime是線程的存活時間,當(dāng)線程池內(nèi)的線程數(shù)量大于corePoolSize,超出存活時間的空閑線程就會被回收。unit就不用說了,剩下的三個參數(shù)看后文的分析。
預(yù)設(shè)的定制線程池
ThreadPoolExecutor預(yù)設(shè)了一些已經(jīng)定制好的線程池,由Executors里的工廠方法創(chuàng)建。下面分析newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool的創(chuàng)建參數(shù)。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newFixedThreadPool的corePoolSize和maximumPoolSize都設(shè)置為傳入的固定數(shù)量,keepAliveTim設(shè)置為0。線程池創(chuàng)建后,線程數(shù)量將會固定不變,適合需要線程很穩(wěn)定的場合。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor是線程數(shù)量固定為1的newFixedThreadPool版本,保證池內(nèi)的任務(wù)串行。注意到返回的是FinalizableDelegatedExecutorService,來看看源碼:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
FinalizableDelegatedExecutorService繼承了DelegatedExecutorService,僅僅在gc時增加關(guān)閉線程池的操作,再來看看DelegatedExecutorService的源碼:
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } //... }
代碼很簡單,DelegatedExecutorService包裝了ExecutorService,使其只暴露出ExecutorService的方法,因此不能再配置線程池的參數(shù)。本來,線程池創(chuàng)建的參數(shù)是可以調(diào)整的,ThreadPoolExecutor提供了set方法。使用newSingleThreadExecutor目的是生成單線程串行的線程池,如果還能配置線程池大小,那就沒意思了。
Executors還提供了unconfigurableExecutorService方法,將普通線程池包裝成不可配置的線程池。如果不想線程池被不明所以的后人修改,可以調(diào)用這個方法。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool生成一個會緩存的線程池,線程數(shù)量可以從0到Integer.MAX_VALUE,超時時間為1分鐘。線程池用起來的效果是:如果有空閑線程,會復(fù)用線程;如果沒有空閑線程,會新建線程;如果線程空閑超過1分鐘,將會被回收。
newScheduledThreadPool
newScheduledThreadPool將會創(chuàng)建一個可定時執(zhí)行任務(wù)的線程池。這個不打算在本文展開,后續(xù)會另開文章細(xì)講。
等待隊(duì)列
newCachedThreadPool的線程上限幾乎等同于無限,但系統(tǒng)資源是有限的,任務(wù)的處理速度總有可能比不上任務(wù)的提交速度。因此,可以為ThreadPoolExecutor提供一個阻塞隊(duì)列來保存因線程不足而等待的Runnable任務(wù),這就是BlockingQueue。
JDK為BlockingQueue提供了幾種實(shí)現(xiàn)方式,常用的有:
- ArrayBlockingQueue:數(shù)組結(jié)構(gòu)的阻塞隊(duì)列
- LinkedBlockingQueue:鏈表結(jié)構(gòu)的阻塞隊(duì)列
- PriorityBlockingQueue:有優(yōu)先級的阻塞隊(duì)列
- SynchronousQueue:不會存儲元素的阻塞隊(duì)列
newFixedThreadPool和newSingleThreadExecutor在默認(rèn)情況下使用一個無界的LinkedBlockingQueue。要注意的是,如果任務(wù)一直提交,但線程池又不能及時處理,等待隊(duì)列將會無限制地加長,系統(tǒng)資源總會有消耗殆盡的一刻。所以,推薦使用有界的等待隊(duì)列,避免資源耗盡。但解決一個問題,又會帶來新問題:隊(duì)列填滿之后,再來新任務(wù),這個時候怎么辦?后文會介紹如何處理隊(duì)列飽和。
newCachedThreadPool使用的SynchronousQueue十分有趣,看名稱是個隊(duì)列,但它卻不能存儲元素。要將一個任務(wù)放進(jìn)隊(duì)列,必須有另一個線程去接收這個任務(wù),一個進(jìn)就有一個出,隊(duì)列不會存儲任何東西。因此,SynchronousQueue是一種移交機(jī)制,不能算是隊(duì)列。newCachedThreadPool生成的是一個沒有上限的線程池,理論上提交多少任務(wù)都可以,使用SynchronousQueue作為等待隊(duì)列正合適。
飽和策略
當(dāng)有界的等待隊(duì)列滿了之后,就需要用到飽和策略去處理,ThreadPoolExecutor的飽和策略通過傳入RejectedExecutionHandler來實(shí)現(xiàn)。如果沒有為構(gòu)造函數(shù)傳入,將會使用默認(rèn)的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
AbortPolicy是默認(rèn)的實(shí)現(xiàn),直接拋出一個RejectedExecutionException異常,讓調(diào)用者自己處理。除此之外,還有幾種飽和策略,來看一下:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果隊(duì)列滿了,后續(xù)的任務(wù)都拋棄掉。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
DiscardOldestPolicy會將等待隊(duì)列里最舊的任務(wù)踢走,讓新任務(wù)得以執(zhí)行。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
最后一種飽和策略是CallerRunsPolicy,它既不拋棄新任務(wù),也不拋棄舊任務(wù),而是直接在當(dāng)前線程運(yùn)行這個任務(wù)。當(dāng)前線程一般就是主線程啊,讓主線程運(yùn)行任務(wù),說不定就阻塞了。如果不是想清楚了整套方案,還是少用這種策略為妙。
ThreadFactory
每當(dāng)線程池需要創(chuàng)建一個新線程,都是通過線程工廠獲取。如果不為ThreadPoolExecutor設(shè)定一個線程工廠,就會使用默認(rèn)的defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
平時打印線程池里線程的name時,會輸出形如pool-1-thread-1之類的名稱,就是在這里設(shè)置的。這個默認(rèn)的線程工廠,創(chuàng)建的線程是普通的非守護(hù)線程,如果需要定制,實(shí)現(xiàn)ThreadFactory后傳給ThreadPoolExecutor即可。
不看代碼不總結(jié)不會知道,光是線程池的創(chuàng)建就可以引出很多學(xué)問。別看平時創(chuàng)建線程池是一句代碼的事,其實(shí)ThreadPoolExecutor提供了很靈活的定制方法。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
springboot?maven?打包插件介紹及注意事項(xiàng)說明
這篇文章主要介紹了springboot?maven?打包插件介紹及注意事項(xiàng)說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12淺談Java中Lambda表達(dá)式的相關(guān)操作
java8新特性,Lambda是一個匿名函數(shù),類似Python中的Lambda表達(dá)式、js中的箭頭函數(shù),目的簡化操作,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06Spring標(biāo)準(zhǔn)的xml文件頭實(shí)例分析
這篇文章主要介紹了Spring標(biāo)準(zhǔn)的xml文件頭實(shí)例分析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-11-11JVM調(diào)整java虛擬機(jī)可使用的最大內(nèi)存的方法
本文主要介紹了調(diào)整JVM的內(nèi)存參數(shù)來優(yōu)化Java應(yīng)用程序的性能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-01-01Java設(shè)計(jì)模式——工廠設(shè)計(jì)模式詳解
這篇文章主要介紹了Java設(shè)計(jì)模式——工廠設(shè)計(jì)模式詳解,具有一定參考價值,需要的朋友可以了解下。2017-11-11springboot2.0.0配置多數(shù)據(jù)源出現(xiàn)jdbcUrl is required with driverClassN
這篇文章主要介紹了springboot2.0.0配置多數(shù)據(jù)源出現(xiàn)jdbcUrl is required with driverClassName的錯誤,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11