Java線程池ThreadPoolExecutor原理及使用實(shí)例
引導(dǎo)
要求:線程資源必須通過(guò)線程池提供,不允許在應(yīng)用自行顯式創(chuàng)建線程;
說(shuō)明:使用線程池的好處是減少在創(chuàng)建和銷(xiāo)毀線程上所花的時(shí)間以及系統(tǒng)資源的開(kāi)銷(xiāo),解決資源不足的問(wèn)題。如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導(dǎo)致消耗內(nèi)存或者“過(guò)度切換”的問(wèn)題。
線程池介紹線程池概述
- 線程池,顧名思義是一個(gè)放著線程的池子,這個(gè)池子的線程主要是用來(lái)執(zhí)行任務(wù)的。當(dāng)用戶提交任務(wù)時(shí),線程池會(huì)創(chuàng)建線程去執(zhí)行任務(wù),若任務(wù)超過(guò)了核心線程數(shù)的時(shí)候,會(huì)在一個(gè)任務(wù)隊(duì)列里進(jìn)行排隊(duì)等待,這個(gè)詳細(xì)流程,我們會(huì)后面細(xì)講。
- 任務(wù),通常是一些抽象的且離散的工作單元,我們會(huì)把應(yīng)用程序的工作分解到多個(gè)任務(wù)中去執(zhí)行。一般我們需要使用多線程執(zhí)行任務(wù)的時(shí)候,這些任務(wù)最好都是相互獨(dú)立的,這樣有一定的任務(wù)邊界供程序把控。
- 多線程,當(dāng)使用多線程的時(shí)候,任務(wù)處理過(guò)程就可以從主線程中剝離出來(lái),任務(wù)可以并行處理,同時(shí)處理多個(gè)請(qǐng)求。當(dāng)然了,任務(wù)處理代碼必須是線程安全的。
為何要使用線程池?
降低開(kāi)銷(xiāo):在創(chuàng)建和銷(xiāo)毀線程的時(shí)候會(huì)產(chǎn)生很大的系統(tǒng)開(kāi)銷(xiāo),頻繁創(chuàng)建/銷(xiāo)毀意味著CPU資源的頻繁切換和占用,線程是屬于稀缺資源,不可以頻繁的創(chuàng)建。假設(shè)創(chuàng)建線程的時(shí)長(zhǎng)記為t1,線程執(zhí)行任務(wù)的時(shí)長(zhǎng)記為t2,銷(xiāo)毀線程的時(shí)長(zhǎng)記為t3,如果我們執(zhí)行任務(wù)t2<t1+t3,那么這樣的開(kāi)銷(xiāo)是不劃算的,不使用線程池去避免創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo),將是極大的資源浪費(fèi)。
易復(fù)用和管理:將線程都放在一個(gè)池子里,便于統(tǒng)一管理(可以延時(shí)執(zhí)行,可以統(tǒng)一命名線程名稱等),同時(shí),也便于任務(wù)進(jìn)行復(fù)用。
解耦:將線程的創(chuàng)建和銷(xiāo)毀與執(zhí)行任務(wù)完全分離出來(lái),這樣方便于我們進(jìn)行維護(hù),也讓我們更專注于業(yè)務(wù)開(kāi)發(fā)。線程池的優(yōu)勢(shì)提高資源的利用性:通過(guò)池化可以重復(fù)利用已創(chuàng)建的線程,空閑線程可以處理新提交的任務(wù),從而降低了創(chuàng)建和銷(xiāo)毀線程的資源開(kāi)銷(xiāo)。提高線程的管理性:在一個(gè)線程池中管理執(zhí)行任務(wù)的線程,對(duì)線程可以進(jìn)行統(tǒng)一的創(chuàng)建、銷(xiāo)毀以及監(jiān)控等,對(duì)線程數(shù)做控制,防止線程的無(wú)限制創(chuàng)建,避免線程數(shù)量的急劇上升而導(dǎo)致CPU過(guò)度調(diào)度等問(wèn)題,從而更合理的分配和使用內(nèi)核資源。提高程序的響應(yīng)性:提交任務(wù)后,有空閑線程可以直接去執(zhí)行任務(wù),無(wú)需新建。提高系統(tǒng)的可擴(kuò)展性:利用線程池可以更好的擴(kuò)展一些功能,比如定時(shí)線程池可以實(shí)現(xiàn)系統(tǒng)的定時(shí)任務(wù)。線程池原理線程池的參數(shù)類型
一共有7個(gè):corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler,(5+2,前5個(gè)重要)
int corePoolSize:該線程池中核心線程數(shù)最大值
這邊我們區(qū)分兩個(gè)概念:
核心線程:線程池新建線程的時(shí)候,當(dāng)前線程總數(shù)< corePoolSize,新建的線程即為核心線程。非核心線程:線程池新建線程的時(shí)候,當(dāng)前線程總數(shù)< corePoolSize,新建的線程即為核心線程。
核心線程默認(rèn)情況下會(huì)一直存活在線程池中,即使這個(gè)核心線程不工作(空閑狀態(tài)),除非ThreadPoolExecutor 的 allowCoreThreadTimeOut這個(gè)屬性為 true,那么核心線程如果空閑狀態(tài)下,超過(guò)一定時(shí)間后就被銷(xiāo)毀。
int maximumPoolSize:線程總數(shù)最大值
線程總數(shù) = 核心線程數(shù) + 非核心線程數(shù)
long keepAliveTime:非核心線程空閑超時(shí)時(shí)間
keepAliveTime即為空閑線程允許的最大的存活時(shí)間。如果一個(gè)非核心線程空閑狀態(tài)的時(shí)長(zhǎng)超過(guò)keepAliveTime了,就會(huì)被銷(xiāo)毀掉。注意:如果設(shè)置allowCoreThreadTimeOut = true,就變成核心線程超時(shí)銷(xiāo)毀了。
TimeUnit unit:是keepAliveTime 的單位
TimeUnit 是一個(gè)枚舉類型,列舉如下:
單位
單位 | 說(shuō)明 |
---|---|
NANOSECONDS | 1微毫秒 = 1微秒 / 1000 |
MICROSECONDS | 1微秒 = 1毫秒 / 1000 |
MILLISECONDS | 1毫秒 = 1秒 /1000 |
SECONDS | 秒 |
MINUTES | 分 |
HOURS | 小時(shí) |
DAYS | 天 |
BlockingQueue workQueue:存放任務(wù)的阻塞隊(duì)列
當(dāng)核心線程都在工作的時(shí)候,新提交的任務(wù)就會(huì)被添加到這個(gè)工作阻塞隊(duì)列中進(jìn)行排隊(duì)等待;如果阻塞隊(duì)列也滿了,線程池就新建非核心線程去執(zhí)行任務(wù)。workQueue維護(hù)的是等待執(zhí)行的Runnable對(duì)象。
常用的 workQueue 類型:(無(wú)界隊(duì)列、有界隊(duì)列、同步移交隊(duì)列)
SynchronousQueue:同步移交隊(duì)列,適用于非常大的或者無(wú)界的線程池,可以避免任務(wù)排隊(duì),SynchronousQueue隊(duì)列接收到任務(wù)后,會(huì)直接將任務(wù)從生產(chǎn)者移交給工作者線程,這種移交機(jī)制高效。它是一種不存儲(chǔ)元素的隊(duì)列,任務(wù)不會(huì)先放到隊(duì)列中去等線程來(lái)取,而是直接移交給執(zhí)行的線程。只有當(dāng)線程池是無(wú)界的或可以拒絕任務(wù)的時(shí)候,SynchronousQueue隊(duì)列的使用才有意義,maximumPoolSize 一般指定成 Integer.MAX_VALUE,即無(wú)限大。要將一個(gè)元素放入SynchronousQueue,就需要有另一個(gè)線程在等待接收這個(gè)元素。若沒(méi)有線程在等待,并且線程池的當(dāng)前線程數(shù)小于最大值,則ThreadPoolExecutor就會(huì)新建一個(gè)線程;否則,根據(jù)飽和策略,拒絕任務(wù)。newCachedThreadPool默認(rèn)使用的就是這種同步移交隊(duì)列。吞吐量高于LinkedBlockingQueue。
LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,F(xiàn)IFO原則排序。當(dāng)任務(wù)提交過(guò)來(lái),若當(dāng)前線程數(shù)小于corePoolSize核心線程數(shù),則線程池新建核心線程去執(zhí)行任務(wù);若當(dāng)前線程數(shù)等于corePoolSize核心線程數(shù),則進(jìn)入工作隊(duì)列進(jìn)行等待。LinkedBlockingQueue隊(duì)列沒(méi)有最大值限制,只要任務(wù)數(shù)超過(guò)核心線程數(shù),都會(huì)被添加到隊(duì)列中,這就會(huì)導(dǎo)致總線程數(shù)永遠(yuǎn)不會(huì)超過(guò) corePoolSize,所以maximumPoolSize 是一個(gè)無(wú)效設(shè)定。newFixedThreadPool和newSingleThreadPool默認(rèn)是使用的是無(wú)界LinkedBlockingQueue隊(duì)列。吞吐量高于ArrayBlockingQueue。
ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,可以設(shè)置隊(duì)列上限值,F(xiàn)IFO原則排序。當(dāng)任務(wù)提交時(shí),若當(dāng)前線程小于corePoolSize核心線程數(shù),則新建核心線程執(zhí)行任務(wù);若當(dāng)先線程數(shù)等于corePoolSize核心線程數(shù),則進(jìn)入隊(duì)列排隊(duì)等候;若隊(duì)列的任務(wù)數(shù)也排滿了,則新建非核心線程執(zhí)行任務(wù);若隊(duì)列滿了且總線程數(shù)達(dá)到了maximumPoolSize最大線程數(shù),則根據(jù)飽和策略進(jìn)行任務(wù)的拒絕。
DelayQueue:延遲隊(duì)列,隊(duì)列內(nèi)的元素必須實(shí)現(xiàn) Delayed 接口。當(dāng)任務(wù)提交時(shí),入隊(duì)列后只有達(dá)到指定的延時(shí)時(shí)間,才會(huì)執(zhí)行任務(wù)
PriorityBlockingQueue:優(yōu)先級(jí)阻塞隊(duì)列,根據(jù)優(yōu)先級(jí)執(zhí)行任務(wù),優(yōu)先級(jí)是通過(guò)自然排序或者是Comparator定義實(shí)現(xiàn)。
注意: 只有當(dāng)任務(wù)相互獨(dú)立沒(méi)有任何依賴的時(shí)候,線程池或工作隊(duì)列設(shè)置有界是合理的;若任務(wù)之間存在依賴性,需要使用無(wú)界的
線程池,如newCachedThreadPool,否則有可能會(huì)導(dǎo)致死鎖問(wèn)題。
ThreadFactory threadFactory
創(chuàng)建線程的方式,這是一個(gè)接口,你 new 他的時(shí)候需要實(shí)現(xiàn)他的 Thread newThread(Runnable r) 方法,一般用不上,
RejectedExecutionHandler handler:飽和策略
拋出異常專用,當(dāng)隊(duì)列和最大線程池都滿了之后的飽和策略。
線程池工作流程
一般流程即為:創(chuàng)建worker線程;添加任務(wù)入workQueue隊(duì)列;worker線程執(zhí)行任務(wù)。
當(dāng)一個(gè)任務(wù)被添加進(jìn)線程池時(shí):
1.當(dāng)前線程數(shù)量未達(dá)到 corePoolSize,則新建一個(gè)線程(核心線程)執(zhí)行任務(wù)
2.當(dāng)前線程數(shù)量達(dá)到了 corePoolSize,則將任務(wù)移入阻塞隊(duì)列等待,讓空閑線程處理;
3.當(dāng)阻塞隊(duì)列已滿,新建線程(非核心線程)執(zhí)行任務(wù)
4.當(dāng)阻塞隊(duì)列已滿,總線程數(shù)又達(dá)到了 maximumPoolSize,就會(huì)按照拒絕策略處理無(wú)法執(zhí)行的任務(wù),比如RejectedExecutionHandler拋出異常。
這邊,為了大家能夠更好的去理解這塊的流程,我們舉一個(gè)例子。生活中我們經(jīng)常會(huì)去打一些公司的咨詢電話或者是一些特定機(jī)構(gòu)的投訴電話,而那個(gè)公司或者機(jī)構(gòu)的客服中心就是一個(gè)線程池,正式員工的客服小姐姐就好比是核心線程,比如有6個(gè)客服小姐姐。
5. 當(dāng)用戶的電話打進(jìn)到公司的客服中心的時(shí)候(提交任務(wù));
6. 客服中心會(huì)調(diào)度客服小姐姐去接聽(tīng)電話(創(chuàng)建線程執(zhí)行任務(wù)),如果接聽(tīng)的電話超過(guò)了6個(gè),6個(gè)客服小姐姐都在接聽(tīng)的工作狀態(tài)了(核心線程池滿了),這時(shí)客服中心會(huì)有一個(gè)電話接聽(tīng)等待通道(進(jìn)入任務(wù)隊(duì)列等待),就是我們經(jīng)常聽(tīng)到的“您的通話在排隊(duì),前面排隊(duì)n人?!?br />
7. 當(dāng)然,這個(gè)電話接聽(tīng)等待通道也是有上限的,當(dāng)超過(guò)這個(gè)上限的時(shí)候(任務(wù)隊(duì)列滿了),客服中心就會(huì)立即安排外協(xié)員工(非核心線程),也就是非正式員工去接聽(tīng)額外的電話(任務(wù)隊(duì)列滿了,正式和非正式員工數(shù)量>總?cè)蝿?wù)數(shù),線程池創(chuàng)建非核心線程去執(zhí)行任務(wù))。
8. 當(dāng)用戶電話數(shù)激增,客服中心控制臺(tái)發(fā)現(xiàn)這個(gè)時(shí)候正式員工和外協(xié)員工的總和已經(jīng)滿足不了這些用戶電話接入了(總線程池滿),就開(kāi)始根據(jù)一些公司電話接聽(tīng)規(guī)則去拒絕這些電話(按照拒絕策略處理無(wú)法執(zhí)行的任務(wù))
線程池狀態(tài)
RUNNING:運(yùn)行狀態(tài),指可以接受任務(wù)并執(zhí)行隊(duì)列里的任務(wù)。
SHUTDOWN:調(diào)用了 shutdown() 方法,不再接受新任務(wù),但隊(duì)列里的任務(wù)會(huì)執(zhí)行完畢。
STOP:指調(diào)用了 shutdownNow() 方法,不再接受新任務(wù),所有任務(wù)都變成STOP狀態(tài),不管是否正在執(zhí)行。該操作會(huì)拋棄阻塞隊(duì)列里的所有任務(wù)并中斷所有正在執(zhí)行任務(wù)。
TIDYING:所有任務(wù)都執(zhí)行完畢,程序調(diào)用 shutdown()/shutdownNow() 方法都會(huì)將線程更新為此狀態(tài),若調(diào)用shutdown(),則等執(zhí)行任務(wù)全部結(jié)束,隊(duì)列即為空,變成TIDYING狀態(tài);調(diào)用shutdownNow()方法后,隊(duì)列任務(wù)清空且正在執(zhí)行的任務(wù)中斷后,更新為T(mén)IDYING狀態(tài)。
TERMINATED:終止?fàn)顟B(tài),當(dāng)線程執(zhí)行 terminated() 后會(huì)更新為這個(gè)狀態(tài)。
線程池源碼
線程池核心接口
ThreadPoolExecutor,在java.util.concurrent下。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù) int maximumPoolSize, //最大線程數(shù) long keepAliveTime, //空閑線程存活時(shí)間 TimeUnit unit, //存活時(shí)間單位 BlockingQueue<Runnable> workQueue, //任務(wù)的阻塞隊(duì)列 ThreadFactory threadFactory, //新線程的產(chǎn)生方式 RejectedExecutionHandler handler //拒絕策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
ThreadPoolExecutor 繼承 AbstractExecutorService;AbstractExecutorService 實(shí)現(xiàn) ExecutorService, ExecutorService 繼承 Executor
public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {}
線程池構(gòu)造方法
1)5參數(shù)構(gòu)造器
// 5參數(shù)構(gòu)造器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
2)6參數(shù)構(gòu)造器-1
// 6參數(shù)構(gòu)造器-1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
3)6參數(shù)構(gòu)造器-2
// 6參數(shù)構(gòu)造器-2 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
4)7參數(shù)構(gòu)造器
// 7參數(shù)構(gòu)造器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
四種線程池
常規(guī)用法
//創(chuàng)建固定數(shù)目線程的線程池
Executors.newFixedThreadPool(200);//創(chuàng)建一個(gè)無(wú)限線程的線程池,無(wú)需等待隊(duì)列,任務(wù)提交即執(zhí)行
Executors.newCachedThreadPool()//創(chuàng)建有且僅有一個(gè)線程的線程池
Executors.newSingleThreadExecutor();
newCachedThreadPool():可緩存線程池
介紹
newCachedThreadPool將創(chuàng)建一個(gè)可緩存的線程,如果當(dāng)前線程數(shù)超過(guò)處理任務(wù)時(shí),回收空閑線程;當(dāng)需求增加時(shí),可以添加新線程去處理任務(wù)。
- 線程數(shù)無(wú)限制,corePoolSize數(shù)值為0, maximumPoolSize 的數(shù)值都是為 Integer.MAX_VALUE。
- 若線程未回收,任務(wù)到達(dá)時(shí),會(huì)復(fù)用空閑線程;若無(wú)空閑線程,則新建線程執(zhí)行任務(wù)。
- 因?yàn)閺?fù)用性,一定程序減少頻繁創(chuàng)建/銷(xiāo)毀線程,減少系統(tǒng)開(kāi)銷(xiāo)。
- 工作隊(duì)列可以選用SynchronousQueue。
創(chuàng)建方法
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
源碼
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newFixedThreadPool():定長(zhǎng)線程池
介紹
newFixedThreadPool創(chuàng)建一個(gè)固定長(zhǎng)度的線程池,每次提交一個(gè)任務(wù)的時(shí)候就會(huì)創(chuàng)建一個(gè)新的線程,直到達(dá)到線程池的最大數(shù)量限制。
- 定長(zhǎng),可以控制線程最大并發(fā)數(shù), corePoolSize 和 maximumPoolSize 的數(shù)值都是nThreads。
- 超出的線程會(huì)在隊(duì)列中等待。
- 工作隊(duì)列可以選用LinkedBlockingQueue。
創(chuàng)建方法
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
源碼
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newScheduledThreadPool():定時(shí)線程池
介紹
newScheduledThreadPool創(chuàng)建一個(gè)固定長(zhǎng)度的線程池,并且以延遲或者定時(shí)的方式去執(zhí)行任務(wù)。
創(chuàng)建方法:
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
源碼
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
newSingleThreadExecutor():?jiǎn)尉€程化的線程池
介紹
newSingleThreadExecutor顧名思義,是一個(gè)單線程的Executor,只創(chuàng)建一個(gè)工作線程執(zhí)行任務(wù),若這個(gè)唯一的線程異常故障了,會(huì)新建另一個(gè)線程來(lái)替代,newSingleThreadExecutor可以保證任務(wù)依照在工作隊(duì)列的排隊(duì)順序來(lái)串行執(zhí)行。
- 有且僅有一個(gè)工作線程執(zhí)行任務(wù);
- 所有任務(wù)按照工作隊(duì)列的排隊(duì)順序執(zhí)行,先進(jìn)先出的順序。
- 單個(gè)線程的線程池就是線程池中只有一個(gè)線程負(fù)責(zé)任務(wù),所以 corePoolSize 和 maximumPoolSize 的數(shù)值都是為 1;當(dāng)這個(gè)線程出現(xiàn)任何異常后,線程池會(huì)自動(dòng)創(chuàng)建一個(gè)線程,始終保持線程池中有且只有一個(gè)存活的線程。
- 工作隊(duì)列可以選用LinkedBlockingQueue。
創(chuàng)建方法
ExecutorService singleThreadPool = Executors.newSingleThreadPool();
源碼
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
execute()方法
介紹
ThreadPoolExecutor.execute(Runnable command)方法,即可向線程池內(nèi)添加一個(gè)任務(wù)
execute源碼
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ //獲取當(dāng)前線程池的狀態(tài) int c = ctl.get(); //若當(dāng)前線程數(shù)量小于corePoolSize,則創(chuàng)建一個(gè)新的線程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //判斷當(dāng)前線程是否處于運(yùn)行狀態(tài),且寫(xiě)入任務(wù)阻塞隊(duì)列是否成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次獲取線程狀態(tài)進(jìn)行雙重檢查;如果線程變成非運(yùn)行狀態(tài),則從阻塞隊(duì)列移除任務(wù); if (! isRunning(recheck) && remove(command)) //執(zhí)行拒絕策略 reject(command); //若當(dāng)前線程池為空,則新建一個(gè)線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //當(dāng)前線程為非運(yùn)行狀態(tài)并且嘗試新建線程,若失敗則執(zhí)行拒絕策略。 else if (!addWorker(command, false)) reject(command); }
流程分析
1)若當(dāng)前線程數(shù)小于corePoolSize,則調(diào)用addWorker()方法創(chuàng)建線程執(zhí)行任務(wù)。
2)若當(dāng)前線程不小于corePoolSize,則將任務(wù)添加到workQueue隊(duì)列,等待空閑線程來(lái)執(zhí)行。
3)若隊(duì)列里的任務(wù)數(shù)到達(dá)上限,且當(dāng)前運(yùn)行線程小于maximumPoolSize,任務(wù)入workQueue隊(duì)列失敗,新建線程執(zhí)行任務(wù);
4)若創(chuàng)建線程也失敗(隊(duì)列任務(wù)數(shù)到達(dá)上限,且當(dāng)前線程數(shù)達(dá)到了maximumPoolSize),對(duì)于新加入的任務(wù),就會(huì)調(diào)用reject()(內(nèi)部調(diào)用handler)拒絕接受任務(wù)。
Q&A
兩種關(guān)閉線程池的區(qū)別
shutdown(): 執(zhí)行后停止接受新任務(wù),會(huì)把隊(duì)列的任務(wù)執(zhí)行完畢。
shutdownNow(): 執(zhí)行后停止接受新任務(wù),但會(huì)中斷所有的任務(wù)(不管是否正在執(zhí)行中),將線程池狀態(tài)變?yōu)?STOP狀態(tài)。
拒絕策略有哪些?
ThreadPoolExecutor的飽和策略可以通過(guò)調(diào)用setRejectedExecutionHandler來(lái)修改。JDK提供了幾種不同的
RejectedExecutionHandler實(shí)現(xiàn),每種實(shí)現(xiàn)都包含有不同的飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。
拒絕策略如下:
- CallerRunsPolicy : 調(diào)用線程處理任務(wù)
- AbortPolicy : 拋出異常
- DiscardPolicy : 直接丟棄
- DiscardOldestPolicy : 丟棄隊(duì)列中最老的任務(wù),執(zhí)行新任務(wù)
RejectedExecutionHandler rejected = null;
//默認(rèn)策略,阻塞隊(duì)列滿,則丟任務(wù)、拋出異常
rejected = new ThreadPoolExecutor.AbortPolicy();//阻塞隊(duì)列滿,則丟任務(wù),不拋異常
rejected = new ThreadPoolExecutor.DiscardPolicy();//刪除隊(duì)列中最舊的任務(wù)(最早進(jìn)入隊(duì)列的任務(wù)),嘗試重新提交新的任務(wù)
rejected = new ThreadPoolExecutor.DiscardOldestPolicy();//隊(duì)列滿,不丟任務(wù),不拋異常,若添加到線程池失敗,那么主線程會(huì)自己去執(zhí)行該任務(wù)
rejected = new ThreadPoolExecutor.CallerRunsPolicy();
(1)AbortPolicy、DiscardPolicy和DiscardOldestPolicy
AbortPolicy是默認(rèn)的飽和策略,就是中止任務(wù),該策略將拋出RejectedExecutionException。調(diào)用者可以捕獲這個(gè)異常然后去編寫(xiě)代碼處理異常。
當(dāng)新提交的任務(wù)無(wú)法保存到隊(duì)列中等待執(zhí)行時(shí),DiscardPolicy會(huì)悄悄的拋棄該任務(wù)。
DiscardOldestPolicy則會(huì)拋棄最舊的(下一個(gè)將被執(zhí)行的任務(wù)),然后嘗試重新提交新的任務(wù)。如果工作隊(duì)列是那個(gè)優(yōu)先級(jí)隊(duì)列時(shí),搭配DiscardOldestPolicy飽和策略會(huì)導(dǎo)致優(yōu)先級(jí)最高的那個(gè)任務(wù)被拋棄,所以兩者不要組合使用。
(2)CallerRunsPolicy
CallerRunsPolicy是“調(diào)用者運(yùn)行”策略,實(shí)現(xiàn)了一種調(diào)節(jié)機(jī)制 。它不會(huì)拋棄任務(wù),也不會(huì)拋出異常。 而是將任務(wù)回退到調(diào)用者。它不會(huì)在線程池中執(zhí)行任務(wù),而是在一個(gè)調(diào)用了execute的線程中執(zhí)行該任務(wù)。在線程滿后,新任務(wù)將交由調(diào)用線程池execute方法的主線程執(zhí)行,而由于主線程在忙碌,所以不會(huì)執(zhí)行accept方法,從而實(shí)現(xiàn)了一種平緩的性能降低。
當(dāng)工作隊(duì)列被填滿后,沒(méi)有預(yù)定義的飽和策略來(lái)阻塞execute(除了拋棄就是中止還有去讓調(diào)用者去執(zhí)行)。然而可以通過(guò)Semaphore來(lái)限制任務(wù)的到達(dá)率。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot中@Autowired注入service時(shí)出現(xiàn)循環(huán)依賴問(wèn)題的解決方法
在Spring Boot開(kāi)發(fā)過(guò)程中,@Autowired注入Service時(shí)出現(xiàn)循環(huán)依賴是一個(gè)常見(jiàn)問(wèn)題,循環(huán)依賴指的是兩個(gè)或多個(gè)Bean相互依賴,形成閉環(huán),導(dǎo)致Spring容器無(wú)法正常初始化這些Bean,這里提供幾種解決Spring Boot中@Autowired注入Service時(shí)循環(huán)依賴問(wèn)題的方法2024-02-02Spring如何替換掉默認(rèn)common-logging.jar
這篇文章主要介紹了Spring如何替換掉默認(rèn)common-logging.jar,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05spring+srpingmvc+hibernate實(shí)現(xiàn)動(dòng)態(tài)ztree生成樹(shù)狀圖效果
這篇文章主要介紹了spring+srpingmvc+hibernate動(dòng)態(tài)ztree生成樹(shù)狀圖效果,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-11-11Mapper層繼承BaseMapper<T>需要引入的pom依賴方式
這篇文章主要介紹了Mapper層繼承BaseMapper<T>需要引入的pom依賴方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01如何解決Mybatis-plus中@TableLogic注解失效問(wèn)題
這篇文章主要介紹了如何解決Mybatis-plus中@TableLogic注解失效問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05SpringBoot+logback默認(rèn)日志的配置和使用方式
這篇文章主要介紹了SpringBoot+logback默認(rèn)日志的配置和使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05springboot啟動(dòng)feign項(xiàng)目報(bào)錯(cuò):Service id not legal hostnam的解決
這篇文章主要介紹了springboot啟動(dòng)feign項(xiàng)目報(bào)錯(cuò):Service id not legal hostnam的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Log4j 配置日志打印時(shí)區(qū)的實(shí)現(xiàn)方法
下面小編就為大家分享一篇Log4j 配置日志打印時(shí)區(qū)的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2017-12-12SpringBoot讀取properties文件配置項(xiàng)過(guò)程解析
這篇文章主要介紹了SpringBoot讀取properties文件配置項(xiàng)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06