如何解決executors線程池創(chuàng)建的線程不釋放的問題
executors線程池創(chuàng)建的線程不釋放問題
我們通過executors.newFixThreadPool 創(chuàng)建指定大小的線程池,在所有線程都結(jié)束后,線程池并未釋放線程。
我們之后再新建的線程池的線程將一直累加。
解決這個(gè)問題
只需要設(shè)置如下:
? ? ? ? ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); ? ? ? ? ? ? ?executor.setKeepAliveTime(10, TimeUnit.SECONDS); ? ? ? ? ? ? ?executor.allowCoreThreadTimeOut(true);
這樣設(shè)置后,隊(duì)列的任務(wù)全部結(jié)束后將,釋放所有的線程。
線程池中的線程為什么不會(huì)釋放而是循環(huán)等待任務(wù)呢
線程池
之前一直有這個(gè)疑問:我們平時(shí)使用線程都是各種new Thread(),然后直接在run()方法里面執(zhí)行我們要做的各種操作,使用完后需要做什么管理嗎?線程池為什么能維持住核心線程不釋放,一直接收任務(wù)進(jìn)行處理呢?
線程
線程無他,主要有兩個(gè)方法,我們先看看start()方法介紹:
/**
* Causes this thread to begin execution; the Java Virtual Machine
* calls the <code>run</code> method of this thread.
* <p>
* The result is that two threads are running concurrently: the
* current thread (which returns from the call to the
* <code>start</code> method) and the other thread (which executes its
* <code>run</code> method).
* <p>
* It is never legal to start a thread more than once.
* In particular, a thread may not be restarted once it has completed
* execution.
*
* @exception IllegalThreadStateException if the thread was already
* started.
* @see #run()
* @see #stop()
*/
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
started = false;
try {
nativeCreate(this, stackSize, daemon);
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}- 從這個(gè)方法解釋上看,start()這個(gè)方法,最終會(huì)交給VM 去執(zhí)行run()方法,所以一般情況下,我們在隨便一個(gè)線程上執(zhí)行start(),里面的run()操作都會(huì)交給VM 去執(zhí)行。
- 而且還說明,重復(fù)啟用線程是不合法的,當(dāng)一個(gè)線程完成的時(shí)候,may not be restarted once。
- 那么這種情況下,線程池是怎么做的?他為什么就能夠重復(fù)執(zhí)行各種任務(wù)呢?
帶著各種疑問,我們?nèi)タ纯淳€程池自己是怎么實(shí)現(xiàn)的。
線程池
線程池常用的創(chuàng)建方法有那么幾種:
- 1. newFixedThreadPool()
- 2. newSingleThreadExecutor()
- 3. newCachedThreadPool()
- 4. newScheduledThreadPool()
這4個(gè)方法創(chuàng)建的線程池實(shí)例具體就不一一介紹,無非是創(chuàng)建線程的多少,以及回收等問題,因?yàn)槠鋵?shí)這4個(gè)方法最后都會(huì)調(diào)用統(tǒng)一的構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}具體來說只是這幾個(gè)值的不同決定了4個(gè)線程池的作用:
1. corePoolSize 代表核心線程池的個(gè)數(shù),當(dāng)線程池當(dāng)前的個(gè)數(shù)大于核心線程池的時(shí)候,線程池會(huì)回收多出來的線程
2. maximumPoolSize 代表最大的線程池個(gè)數(shù),當(dāng)線程池需要執(zhí)行的任務(wù)大于核心線程池的時(shí)候,會(huì)創(chuàng)建更多的線程,但是最大不能超過這個(gè)數(shù)
3. keepAliveTime 代表空余的線程存活的時(shí)間,當(dāng)多余的線程完成任務(wù)的時(shí)候,需要多長時(shí)間進(jìn)行回收,時(shí)間單位是unit 去控制
4. workQueue 非常重要,這個(gè)工作隊(duì)列會(huì)存放所有待執(zhí)行的Runnable對(duì)象
@param workQueue the queue to use for holding tasks before they areexecuted. This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.
我們平時(shí)在使用線程池的時(shí)候,都是直接 實(shí)例.execute(Runnable),一起跟進(jìn)去,看看這個(gè)方法具體做了什么
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//結(jié)合上文的注釋,我們得知,第一次,先判斷當(dāng)前的核心線程數(shù),
//如果小于初始化的值,馬上創(chuàng)建;然后第二個(gè)if,將這個(gè)任務(wù)插入到工作線程,雙重判斷任務(wù),
//假定如果前面不能直接加入到線程池Worker集合里,則加入到workQueue隊(duì)列等待執(zhí)行。
//里面的if else判斷語句則是檢查當(dāng)前線程池的狀態(tài)。如果線程池本身的狀態(tài)是要關(guān)閉并清理了,
//我們則不能提交線程進(jìn)去了。這里我們就要reject他們。
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}所以其實(shí)主要起作用的還是addWorker()方法,我們繼續(xù)跟蹤進(jìn)去:
private boolean addWorker(Runnable firstTask, boolean core) {
···多余代碼
try {
w = new Worker(firstTask); 1.重點(diǎn)
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); 2. 重點(diǎn)
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}我們看重點(diǎn)部分,其實(shí)最重要的是firstTask這個(gè)Runnable,我們一直跟蹤這個(gè)對(duì)象就可以了,這個(gè)對(duì)象會(huì)new Worker(),那么這個(gè)wroker()就是一個(gè)包裝類,里面帶著我們實(shí)際需要執(zhí)行的任務(wù),后面進(jìn)行一系列的判斷就會(huì)執(zhí)行t.start(); 這個(gè)t 就是包裝類worker類里面的Thread,所以整個(gè)邏輯又轉(zhuǎn)化進(jìn)去Worker內(nèi)部。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
...省略代碼
}這個(gè)Worker包裝類,重要的屬性兩個(gè),thread 就是剛才上面那個(gè)方法執(zhí)行的start()對(duì)象,這個(gè)thread又是把這個(gè)worker對(duì)象本身作為一個(gè)Runnable對(duì)象構(gòu)建出來的,那么當(dāng)我們調(diào)用thread.start()方法時(shí)候,實(shí)際調(diào)用的就是Worker類的run()方法?,F(xiàn)在又要追蹤進(jìn)去,看這個(gè)runWorker(this),做的是什么鬼東西
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}這個(gè)方法還是比較好懂的:
1. 一個(gè)大循環(huán),判斷條件是task != null || (task = getTask()) != null,task自然就是我們要執(zhí)行的任務(wù)了,當(dāng)task空而且getTask()取不到任務(wù)的時(shí)候,這個(gè)while()就會(huì)結(jié)束,循環(huán)體里面進(jìn)行的就是task.run();
2.這里我們其實(shí)可以打個(gè)心眼,那基本八九不離十了,肯定是這個(gè)循環(huán)一直沒有退出,所以才能維持著這一個(gè)線程不斷運(yùn)行,當(dāng)有外部任務(wù)進(jìn)來的時(shí)候,循環(huán)體就能getTask()并且執(zhí)行。
3.下面最后放getTask()里面的代碼,驗(yàn)證猜想
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}真相大白了,里面進(jìn)行的也是一個(gè)死循環(huán),主要看 Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
工作隊(duì)列workQueue會(huì)一直去拿任務(wù),屬于核心線程的會(huì)一直卡在 workQueue.take()方法,直到拿到Runnable 然后返回,非核心線程會(huì) workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超時(shí)還沒有拿到,下一次循環(huán)判斷compareAndDecrementWorkerCount就會(huì)返回null,Worker對(duì)象的run()方法循環(huán)體的判斷為null,任務(wù)結(jié)束,然后線程被系統(tǒng)回收
注意:
一句話可以概述了,線程池就是用一堆包裝住Thread的Wroker類的集合,在里面有條件的進(jìn)行著死循環(huán),從而可以不斷接受任務(wù)來進(jìn)行。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java使用DateFormatter格式化日期時(shí)間的方法示例
這篇文章主要介紹了Java使用DateFormatter格式化日期時(shí)間的方法,結(jié)合具體實(shí)例分析了java使用DateFormatter格式化日期時(shí)間的相關(guān)操作技巧,需要的朋友可以參考下2017-04-04
java配置dbcp連接池(數(shù)據(jù)庫連接池)示例分享
Spring Boot啟動(dòng)流程斷點(diǎn)過程解析
springMVC中@RequestParam和@RequestPart的區(qū)別
Java基于Lock的生產(chǎn)者消費(fèi)者模型示例
SpringBoot連接MySql數(shù)據(jù)庫的原理及代碼示例
Java排序之冒泡排序的實(shí)現(xiàn)與優(yōu)化
springboot實(shí)現(xiàn)敏感字段加密存儲(chǔ)解密顯示功能

