從java源碼分析線程池(池化技術(shù))的實(shí)現(xiàn)原理
前言:
線程池是一個(gè)非常重要的知識(shí)點(diǎn),也是池化技術(shù)的一個(gè)典型應(yīng)用,相信很多人都有使用線程池的經(jīng)歷,但是對(duì)于線程池的實(shí)現(xiàn)原理大家都了解嗎?本篇文章我們將深入線程池源碼來(lái)一探究竟。
線程池的起源
背景: 隨著計(jì)算機(jī)硬件的升級(jí)換代,使我們的軟件具備多線程執(zhí)行任務(wù)的能力。當(dāng)我們?cè)谶M(jìn)行多線程編程時(shí),就需要?jiǎng)?chuàng)建線程,如果說(shuō)程序并發(fā)很高的話,我們會(huì)創(chuàng)建大量的線程,而每個(gè)線程執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線程,會(huì)極大的降低系統(tǒng)性能,增加服務(wù)器開銷,因?yàn)閯?chuàng)建線程和銷毀線程都需要額外的消耗。
這時(shí)我們就可以借助池化技術(shù),來(lái)優(yōu)化這一缺陷,線程池就誕生了。
池化技術(shù)的本質(zhì)是在高并發(fā)場(chǎng)景下,為了實(shí)現(xiàn)資源復(fù)用,減少資源創(chuàng)建銷毀等開銷,如果并發(fā)數(shù)很小沒有明顯優(yōu)勢(shì)(資源一直占用系統(tǒng)內(nèi)存,沒有機(jī)會(huì)被使用)。
池化技術(shù)介紹: 什么時(shí)池化技術(shù)呢?池化技術(shù)是一種編程技巧,當(dāng)程序出現(xiàn)高并發(fā)時(shí),能夠明顯的優(yōu)化程序,降低系統(tǒng)頻繁創(chuàng)建銷毀連接等額外開銷。我們經(jīng)常接觸到的池化技術(shù)有數(shù)據(jù)庫(kù)連接池、線程池、對(duì)象池等等。池化技術(shù)的特點(diǎn)是將一些高成本的資源維護(hù)在一個(gè)特定的池子(內(nèi)存)中,規(guī)定其最小連接數(shù)、最大連接數(shù)、阻塞隊(duì)列,溢出規(guī)則等配置,方便統(tǒng)一管理。一般情況下也會(huì)附帶一些監(jiān)控,強(qiáng)制回收等配套功能。
池化技術(shù)作為一種資源使用技術(shù),典型的使用情形是:
- 獲取資源的成本較高的時(shí)候
- 請(qǐng)求資源的頻率很高且使用資源總數(shù)較低的時(shí)候
- 面對(duì)性能問題,涉及到處理時(shí)間延遲的時(shí)候
池化技術(shù)資源分類:
- 系統(tǒng)調(diào)用的系統(tǒng)資源,如線程、進(jìn)程、內(nèi)存分配等
- 網(wǎng)絡(luò)通信的遠(yuǎn)程資源, 如數(shù)據(jù)庫(kù)連接、套接字連接等
線程池的定義和使用
線程池是我們?yōu)榱艘?guī)避創(chuàng)建線程,銷毀線程額外開銷而誕生的,所以說(shuō)我們定義創(chuàng)建好線程池之后,就不需要自己來(lái)創(chuàng)建線程,而是使用線程池調(diào)用執(zhí)行我們的任務(wù)。下面我們一起看一下如何定義并創(chuàng)建線程池。
方案一:Executors(僅做了解,推薦使用方案二)
創(chuàng)建線程池可以使用Executors,其中提供了一系列工廠方法用于創(chuàng)建線程池,返回的線程池都實(shí)現(xiàn)了ExecutorService接口。
ExecutorService 接口是Executor接口的子類接口,使用更為廣泛,其提供了線程池生命周期管理的方法,返回 Future 對(duì)象。
也就是說(shuō)我們通過Executors創(chuàng)建線程池,得到ExecutorService
,通過ExecutorService
執(zhí)行異步任務(wù)(實(shí)現(xiàn)Runnable接口)
Executors 可以創(chuàng)建一下幾種類型的線程池:
newCachedThreadPool
創(chuàng)建一個(gè)可緩存線程池,如果線程池線程數(shù)量過剩,會(huì)在60秒后回收掉多余線程資源,當(dāng)任務(wù)書增加,線程不夠用,則會(huì)新建線程。- newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
newScheduledThreadPool
創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。newSingleThreadExecutor
創(chuàng)建一個(gè)單線程的線程池,只使用唯一的線程來(lái)執(zhí)行任務(wù),可以保證任務(wù)按照提交順序來(lái)完成。
方案二:ThreadPoolExecutor
在阿里巴巴開發(fā)規(guī)范中,規(guī)定線程池不允許通過Executors創(chuàng)建,而是通過ThreadPoolExecutor創(chuàng)建。
好處:讓寫的同學(xué)可以更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。
ThreadPoolExecutor的七大參數(shù):
(1)corePoolSize
核心線程數(shù)量,核心線程會(huì)一直保留,不會(huì)被銷毀。
(2)maximumPoolSize
最大線程數(shù),當(dāng)核心線程不能滿足任務(wù)需要時(shí),系統(tǒng)就會(huì)創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。
(3)keepAliveTime
存活時(shí)間,核心線程之外的線程空閑多長(zhǎng)時(shí)間就會(huì)被銷毀。
(4)timeUnit
代表線程存活的時(shí)間單位。
(5)BlockingQueue
阻塞隊(duì)列
- 如果正在執(zhí)行的任務(wù)超過了最大線程數(shù),可以存放在隊(duì)列中,當(dāng)線程池中有空閑資源就可以從隊(duì)列中取出任務(wù)繼續(xù)執(zhí)行。
- 隊(duì)列類型有如下幾種類型:LinkedBlockingQueue ArrayBlockingQueue SynchronousQueue TransferQueue。
(6)threadFactory
線程工廠,用來(lái)創(chuàng)建線程的,可以自定義線程,比如我們可以定義線程組名稱,在jstack問題排查時(shí),非常有幫助。
(7)rejectedExecutionHandler
拒絕策略,
當(dāng)所有線程(最大線程數(shù))都在忙,并且任務(wù)隊(duì)列處于滿任務(wù)的狀態(tài),則會(huì)執(zhí)行拒絕策略。
JDK為我們提供了四種拒絕策略,我們必須都得熟悉
- AbortPolicy: 丟棄任務(wù),并拋出異常RejectedExecutionException。 默認(rèn)
- DiscardPolicy: 丟棄最新的任務(wù),不拋異常。
- DiscardOldestPolicy: 扔掉排隊(duì)時(shí)間最久的任務(wù),也就是最舊的任務(wù)。
- CallerRuns: 由調(diào)用者(提交異步任務(wù)的線程)處理任務(wù)。
線程池的實(shí)現(xiàn)原理
想要實(shí)現(xiàn)一個(gè)線程池我們就需要關(guān)心ThreadPoolExecutor類,因?yàn)镋xecutors創(chuàng)建線程池也是通過new ThreadPoolExecutor對(duì)象。
看一下ThreadPoolExecutor
的類繼承關(guān)系,可以看出為什么通過Executors
創(chuàng)建的線程池返回結(jié)果是ExecutorService,因?yàn)門hreadPoolExecutor是ExecutorService接口的實(shí)現(xiàn)類,而Executors創(chuàng)建線程池本質(zhì)也是創(chuàng)建的ThreadPoolExecutor 對(duì)象。
下面我們一起看一下ThreadPoolExecutor
的源碼,首先是ThreadPoolExecutor
內(nèi)定義的變量,常量:
// 復(fù)合類型變量 是一個(gè)原子整數(shù) 控制狀態(tài)(運(yùn)行狀態(tài)|線程池活躍線程數(shù)量) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位 private static final int CAPACITY ? = (1 << COUNT_BITS) - 1; // 容量 // 運(yùn)行狀態(tài)存儲(chǔ)在高位3位 private static final int RUNNING ? ?= -1 << COUNT_BITS; ?// 接受新任務(wù),并處理隊(duì)列任務(wù) private static final int SHUTDOWN ? = ?0 << COUNT_BITS; ?// 不接受新任務(wù),但會(huì)處理隊(duì)列任務(wù) private static final int STOP ? ? ? = ?1 << COUNT_BITS; ?// 不接受新任務(wù),不會(huì)處理隊(duì)列任務(wù),中斷正在處理的任務(wù) private static final int TIDYING ? ?= ?2 << COUNT_BITS; ?// 所有的任務(wù)已結(jié)束,活躍線程為0,線程過渡到TIDYING狀 態(tài),將會(huì)執(zhí)行terminated()鉤子方法 private static final int TERMINATED = ?3 << COUNT_BITS; ?// terminated()方法已經(jīng)完成 // 設(shè)置 ctl 參數(shù)方法 private static int runStateOf(int c) ? ? { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } ? ?/** ? ? * 阻塞隊(duì)列 ? ? */ ? ?private final BlockingQueue<Runnable> workQueue; ? ?/** ? ? * Lock 鎖. ? ? */ ? ?private final ReentrantLock mainLock = new ReentrantLock(); ? ?/** ? ? * 工人們 ? ? */ ? ?private final HashSet<Worker> workers = new HashSet<Worker>(); ? ?/** ? ? * 等待條件支持等待終止 ? ? */ ? ?private final Condition termination = mainLock.newCondition(); ? ?/** ? ? * 最大的池大小. ? ? */ ? ?private int largestPoolSize; ? ?/** ? ? * 完成任務(wù)數(shù) ? ? */ ? ?private long completedTaskCount; ? ?/** ? ? * 線程工廠 ? ? */ ? ?private volatile ThreadFactory threadFactory; ? ?/** ? ? * 拒絕策略 ? ? */ ? ?private volatile RejectedExecutionHandler handler; ? ?/** ? ? * 存活時(shí)間 ? ? */ ? ?private volatile long keepAliveTime; ? ?/** ? ? * 允許核心線程數(shù) ? ? */ ? ?private volatile boolean allowCoreThreadTimeOut; ? ?/** ? ? * 核心線程數(shù) ? ? */ ? ?private volatile int corePoolSize; ? ?/** ? ? * 最大線程數(shù) ? ? */ ? ?private volatile int maximumPoolSize; ? ?/** ? ? * 默認(rèn)拒絕策略 ? ? */ ? ?private static final RejectedExecutionHandler defaultHandler = ? ? ? ?new AbortPolicy(); ? ?/** ? ? * shutdown and shutdownNow權(quán)限 ? ? */ ? ?private static final RuntimePermission shutdownPerm = ? ? ? ?new RuntimePermission("modifyThread");
構(gòu)造器,,支持最少五種參數(shù),最大七中參數(shù)的四種構(gòu)造器:
? ?public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?BlockingQueue<Runnable> workQueue) { ? ? ? ?this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ? ? Executors.defaultThreadFactory(), defaultHandler); ? } ? ?public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?ThreadFactory threadFactory) { ? ? ? ?this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ? ? threadFactory, defaultHandler); ? } ? ?public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?RejectedExecutionHandler handler) { ? ? ? ?this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ? ? Executors.defaultThreadFactory(), handler); ? } ? ?public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?ThreadFactory threadFactory, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?RejectedExecutionHandler handler) { ? ? ? ?if (corePoolSize < 0 || ? ? ? ? ? ?maximumPoolSize <= 0 || ? ? ? ? ? ?maximumPoolSize < corePoolSize || ? ? ? ? ? ?keepAliveTime < 0) ? ? ? ? ? ?throw new IllegalArgumentException(); ? ? ? ?if (workQueue == null || threadFactory == null || handler == null) ? ? ? ? ? ?throw new NullPointerException(); ? ? ? ?this.corePoolSize = corePoolSize; ? ? ? ?this.maximumPoolSize = maximumPoolSize; ? ? ? ?this.workQueue = workQueue; ? ? ? ?this.keepAliveTime = unit.toNanos(keepAliveTime); ? ? ? ?this.threadFactory = threadFactory; ? ? ? ?this.handler = handler; ? }
工人,線程池中執(zhí)行任務(wù)的,線程池就是通過這些工人進(jìn)行工作的,有核心員工(核心線程)和臨時(shí)工(人手不夠的時(shí)候,臨時(shí)創(chuàng)建的,如果空閑時(shí)間廠,就會(huì)被裁員),
? ?private final class Worker ? ? ? ?extends AbstractQueuedSynchronizer ? ? ? ?implements Runnable ? { ? ? ? ?private static final long serialVersionUID = 6138294804551838833L; ? ? ? ?// 工人的本質(zhì)就是個(gè)線程 ? ? ? ?final Thread thread; ? ? ? ?// 第一件工作任務(wù) ? ? ? ?Runnable firstTask; ? ? ?volatile long completedTasks; ? ? ? ?/** ? ? ? ? * 構(gòu)造器 ? ? ? ? */ ? ? ? ?Worker(Runnable firstTask) { ? ? ? ? ? ?setState(-1); // inhibit interrupts until runWorker ? ? ? ? ? ?this.firstTask = firstTask; ? ? ? ? ? ?this.thread = getThreadFactory().newThread(this); ? ? ? } ? ? ? ?/** 工作 */ ? ? ? ?public void run() { ? ? ? ? ? ?runWorker(this); ? ? ? } ? ? ? ?protected boolean isHeldExclusively() { ? ? ? ? ? ?return getState() != 0; ? ? ? } ? ? ? ?protected boolean tryAcquire(int unused) { ? ? ? ? ? ?if (compareAndSetState(0, 1)) { ? ? ? ? ? ? ? ?setExclusiveOwnerThread(Thread.currentThread()); ? ? ? ? ? ? ? ?return true; ? ? ? ? ? } ? ? ? ? ? ?return false; ? ? ? } ? ? ? ?protected boolean tryRelease(int unused) { ? ? ? ? ? ?setExclusiveOwnerThread(null); ? ? ? ? ? ?setState(0); ? ? ? ? ? ?return true; ? ? ? } ? ? ? ?public void lock() ? ? ? { acquire(1); } ? ? ? ?public boolean tryLock() { return tryAcquire(1); } ? ? ? ?public void unlock() ? ? { release(1); } ? ? ? ?public boolean isLocked() { return isHeldExclusively(); } ? ? ? ?void interruptIfStarted() { ? ? ? ? ? ?Thread t; ? ? ? ? ? ?if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?t.interrupt(); ? ? ? ? ? ? ? } catch (SecurityException ignore) { ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? } ? }
核心方法,通過線程池執(zhí)行任務(wù)(這也是線程池的運(yùn)行原理):
- 檢驗(yàn)任務(wù)
- 獲取當(dāng)前線程池狀態(tài)
- 判斷上班工人數(shù)量是否小于核心員工數(shù)
- 如果小于則招人,安排工作
- 不小于則判斷等候區(qū)任務(wù)是否排滿
- 如果沒有排滿則任務(wù)排入等候區(qū)
- 如果排滿,看是否允許招人,允許招人則招臨時(shí)工
- 如果都不行,該線程池?zé)o法接收新任務(wù),開始按老板約定的拒絕策略,執(zhí)行拒絕策略
? ?public void execute(Runnable command) { ? ? ? ?if (command == null) ? ? ? ? ? ?throw new NullPointerException(); ? ? ? ?int c = ctl.get(); ? ? ? ?if (workerCountOf(c) < corePoolSize) { ? ? ? ? ? ?if (addWorker(command, true)) ? ? ? ? ? ? ? ?return; ? ? ? ? ? ?c = ctl.get(); ? ? ? } ? ? ? ?if (isRunning(c) && workQueue.offer(command)) { ? ? ? ? ? ?int recheck = ctl.get(); ? ? ? ? ? ?if (! isRunning(recheck) && remove(command)) ? ? ? ? ? ? ? ?reject(command); ? ? ? ? ? ?else if (workerCountOf(recheck) == 0) ? ? ? ? ? ? ? ?addWorker(null, false); ? ? ? } ? ? ? ?else if (!addWorker(command, false)) ? ? ? ? ? ?reject(command); ? }
submit()
方法是其抽象父類定義的,這里我們就可以明顯看到submit與execute的區(qū)別,通過submit調(diào)用,我們會(huì)創(chuàng)建RunnableFuture
,并且會(huì)返回Future,這里我們可以將返回值類型,告知submit方法,它就會(huì)通過泛型約束返回值。
public abstract class AbstractExecutorService implements ExecutorService { public Future<?> submit(Runnable task) { ? ? ? ?if (task == null) throw new NullPointerException(); ? ? ? ?RunnableFuture<Void> ftask = newTaskFor(task, null); ? ? ? ?execute(ftask); ? ? ? ?return ftask; ? } ? ?public <T> Future<T> submit(Runnable task, T result) { ? ? ? ?if (task == null) throw new NullPointerException(); ? ? ? ?RunnableFuture<T> ftask = newTaskFor(task, result); ? ? ? ?execute(ftask); ? ? ? ?return ftask; ? } ? ?public <T> Future<T> submit(Callable<T> task) { ? ? ? ?if (task == null) throw new NullPointerException(); ? ? ? ?RunnableFuture<T> ftask = newTaskFor(task); ? ? ? ?execute(ftask); ? ? ? ?return ftask; ? } ... } ? ? ?
addWorker()是招人的一個(gè)方法:
? ?private boolean addWorker(Runnable firstTask, boolean core) { ? ? ? ?retry: ? ? ? ?for (;;) { ? ? ? ? ? ?int c = ctl.get(); ? ? ? ? ? ?int rs = runStateOf(c); ? ? ? ? ? ?// 判斷狀態(tài),及任務(wù)列表 ? ? ? ? ? ?if (rs >= SHUTDOWN && ? ? ? ? ? ? ? ?! (rs == SHUTDOWN && ? ? ? ? ? ? ? ? ? firstTask == null && ? ? ? ? ? ? ? ? ? ! workQueue.isEmpty())) ? ? ? ? ? ? ? ?return false; ? ? ? ? ? ?for (;;) { ? ? ? ? ? ? ? ?int wc = workerCountOf(c); ? ? ? ? ? ? ? ?if (wc >= CAPACITY || ? ? ? ? ? ? ? ? ? ?wc >= (core ? corePoolSize : maximumPoolSize)) ? ? ? ? ? ? ? ? ? ?return false; ? ? ? ? ? ? ? ?if (compareAndIncrementWorkerCount(c)) ? ? ? ? ? ? ? ? ? ?break retry; ? ? ? ? ? ? ? ?c = ctl.get(); ?// Re-read ctl ? ? ? ? ? ? ? ?if (runStateOf(c) != rs) ? ? ? ? ? ? ? ? ? ?continue retry; ? ? ? ? ? ? ? ?// else CAS failed due to workerCount change; retry inner loop ? ? ? ? ? } ? ? ? } ? ? ? ?boolean workerStarted = false; ? ? ? ?boolean workerAdded = false; ? ? ? ?Worker w = null; ? ? ? ?try { ? ? ? ? ? ?w = new Worker(firstTask); ? ? ? ? ? ?final Thread t = w.thread; ? ? ? ? ? ?if (t != null) { ? ? ? ? ? ? ? ?final ReentrantLock mainLock = this.mainLock; ? ? ? ? ? ? ? ?mainLock.lock(); ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?int rs = runStateOf(ctl.get()); ? ? ? ? ? ? ? ? ? ?if (rs < SHUTDOWN || ? ? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) { ? ? ? ? ? ? ? ? ? ? ? ?if (t.isAlive()) // precheck that t is startable ? ? ? ? ? ? ? ? ? ? ? ? ? ?throw new IllegalThreadStateException(); ? ? ? ? ? ? ? ? ? ? ? ?workers.add(w); ? ? ? ? ? ? ? ? ? ? ? ?int s = workers.size(); ? ? ? ? ? ? ? ? ? ? ? ?if (s > largestPoolSize) ? ? ? ? ? ? ? ? ? ? ? ? ? ?largestPoolSize = s; ? ? ? ? ? ? ? ? ? ? ? ?workerAdded = true; ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? } finally { ? ? ? ? ? ? ? ? ? ?mainLock.unlock(); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?if (workerAdded) { ? ? ? ? ? ? ? ? ? ?t.start(); ? ? ? ? ? ? ? ? ? ?workerStarted = true; ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? } finally { ? ? ? ? ? ?if (! workerStarted) ? ? ? ? ? ? ? ?addWorkerFailed(w); ? ? ? } ? ? ? ?return workerStarted; ? }
獲取任務(wù)的方法:
? ?private Runnable getTask() { ? ? ? ?boolean timedOut = false; // Did the last poll() time out? ? ? ? ?for (;;) { ? ? ? ? ? ?int c = ctl.get(); ? ? ? ? ? ?int rs = runStateOf(c); ? ? ? ? ? ?// Check if queue empty only if necessary. ? ? ? ? ? ?if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { ? ? ? ? ? ? ? ?decrementWorkerCount(); ? ? ? ? ? ? ? ?return null; ? ? ? ? ? } ? ? ? ? ? ?int wc = workerCountOf(c); ? ? ? ? ? ?// Are workers subject to culling? ? ? ? ? ? ?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ? ? ? ? ? ?if ((wc > maximumPoolSize || (timed && timedOut)) ? ? ? ? ? ? ? ?&& (wc > 1 || workQueue.isEmpty())) { ? ? ? ? ? ? ? ?if (compareAndDecrementWorkerCount(c)) ? ? ? ? ? ? ? ? ? ?return null; ? ? ? ? ? ? ? ?continue; ? ? ? ? ? } ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?Runnable r = timed ? ? ? ? ? ? ? ? ? ? ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : ? ? ? ? ? ? ? ? ? ?workQueue.take(); ? ? ? ? ? ? ? ?if (r != null) ? ? ? ? ? ? ? ? ? ?return r; ? ? ? ? ? ? ? ?timedOut = true; ? ? ? ? ? } catch (InterruptedException retry) { ? ? ? ? ? ? ? ?timedOut = false; ? ? ? ? ? } ? ? ? } ? }
讓員工干活的方法,分配任務(wù),運(yùn)行任務(wù):
? final void runWorker(Worker w) { ? ? ? ?Thread wt = Thread.currentThread(); ? ? ? ?Runnable task = w.firstTask; ? ? ? ?w.firstTask = null; ? ? ? ?w.unlock(); // allow interrupts ? ? ? ?boolean completedAbruptly = true; ? ? ? ?try { ? ? ? ? ? ?while (task != null || (task = getTask()) != null) { ? ? ? ? ? ? ? ?w.lock(); ? ? ? ? ? ? ? ?// If pool is stopping, ensure thread is interrupted; ? ? ? ? ? ? ? ?// if not, ensure thread is not interrupted. This ? ? ? ? ? ? ? ?// requires a recheck in second case to deal with ? ? ? ? ? ? ? ?// shutdownNow race while clearing interrupt ? ? ? ? ? ? ? ?if ((runStateAtLeast(ctl.get(), STOP) || ? ? ? ? ? ? ? ? ? ? (Thread.interrupted() && ? ? ? ? ? ? ? ? ? ? ?runStateAtLeast(ctl.get(), STOP))) && ? ? ? ? ? ? ? ? ? ?!wt.isInterrupted()) ? ? ? ? ? ? ? ? ? ?wt.interrupt(); ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?beforeExecute(wt, task); ? ? ? ? ? ? ? ? ? ?Throwable thrown = null; ? ? ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ? ? ?task.run(); ? ? ? ? ? ? ? ? ? } catch (RuntimeException x) { ? ? ? ? ? ? ? ? ? ? ? ?thrown = x; throw x; ? ? ? ? ? ? ? ? ? } catch (Error x) { ? ? ? ? ? ? ? ? ? ? ? ?thrown = x; throw x; ? ? ? ? ? ? ? ? ? } catch (Throwable x) { ? ? ? ? ? ? ? ? ? ? ? ?thrown = x; throw new Error(x); ? ? ? ? ? ? ? ? ? } finally { ? ? ? ? ? ? ? ? ? ? ? ?afterExecute(task, thrown); ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? } finally { ? ? ? ? ? ? ? ? ? ?task = null; ? ? ? ? ? ? ? ? ? ?w.completedTasks++; ? ? ? ? ? ? ? ? ? ?w.unlock(); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? ? ? ?completedAbruptly = false; ? ? ? } finally { ? ? ? ? ? ?processWorkerExit(w, completedAbruptly); ? ? ? } ? }
到此這篇關(guān)于從java源碼分析線程池(池化技術(shù))的實(shí)現(xiàn)原理的文章就介紹到這了,更多相關(guān)Java線程池原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis查詢語(yǔ)句結(jié)果集的總結(jié)大全
這篇文章主要給大家總結(jié)介紹了關(guān)于Mybatis查詢語(yǔ)句結(jié)果集的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08Java Spring-IOC容器與Bean管理之基于注解的方式案例詳解
這篇文章主要介紹了Java Spring-IOC容器與Bean管理之基于注解的方式案例詳解,本篇文章通過簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08Java語(yǔ)言中&&與& ||與|的區(qū)別是什么
這篇文章主要介紹了Java語(yǔ)言中&&與& ||與|的區(qū)別是什么的相關(guān)資料,需要的朋友可以參考下2017-04-04Java實(shí)現(xiàn)獲取小程序帶參二維碼并保存到本地
這篇文章主要介紹了Java實(shí)現(xiàn)獲取小程序帶參二維碼并保存到本地,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10關(guān)于log4j日志擴(kuò)展---自定義PatternLayout
這篇文章主要介紹了關(guān)于log4j日志擴(kuò)展---自定義PatternLayout,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12