如何解決executors線程池創(chuàng)建的線程不釋放的問題
executors線程池創(chuàng)建的線程不釋放問題
我們通過executors.newFixThreadPool 創(chuàng)建指定大小的線程池,在所有線程都結(jié)束后,線程池并未釋放線程。
我們之后再新建的線程池的線程將一直累加。
解決這個問題
只需要設(shè)置如下:
? ? ? ? ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); ? ? ? ? ? ? ?executor.setKeepAliveTime(10, TimeUnit.SECONDS); ? ? ? ? ? ? ?executor.allowCoreThreadTimeOut(true);
這樣設(shè)置后,隊列的任務(wù)全部結(jié)束后將,釋放所有的線程。
線程池中的線程為什么不會釋放而是循環(huán)等待任務(wù)呢
線程池
之前一直有這個疑問:我們平時使用線程都是各種new Thread(),然后直接在run()方法里面執(zhí)行我們要做的各種操作,使用完后需要做什么管理嗎?線程池為什么能維持住核心線程不釋放,一直接收任務(wù)進行處理呢?
線程
線程無他,主要有兩個方法,我們先看看start()方法介紹:
/** * Causes this thread to begin execution; the Java Virtual Machine * calls the <code>run</code> method of this thread. * <p> * The result is that two threads are running concurrently: the * current thread (which returns from the call to the * <code>start</code> method) and the other thread (which executes its * <code>run</code> method). * <p> * It is never legal to start a thread more than once. * In particular, a thread may not be restarted once it has completed * execution. * * @exception IllegalThreadStateException if the thread was already * started. * @see #run() * @see #stop() */ public synchronized void start() { if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); started = false; try { nativeCreate(this, stackSize, daemon); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } }
- 從這個方法解釋上看,start()這個方法,最終會交給VM 去執(zhí)行run()方法,所以一般情況下,我們在隨便一個線程上執(zhí)行start(),里面的run()操作都會交給VM 去執(zhí)行。
- 而且還說明,重復(fù)啟用線程是不合法的,當一個線程完成的時候,may not be restarted once。
- 那么這種情況下,線程池是怎么做的?他為什么就能夠重復(fù)執(zhí)行各種任務(wù)呢?
帶著各種疑問,我們?nèi)タ纯淳€程池自己是怎么實現(xiàn)的。
線程池
線程池常用的創(chuàng)建方法有那么幾種:
- 1. newFixedThreadPool()
- 2. newSingleThreadExecutor()
- 3. newCachedThreadPool()
- 4. newScheduledThreadPool()
這4個方法創(chuàng)建的線程池實例具體就不一一介紹,無非是創(chuàng)建線程的多少,以及回收等問題,因為其實這4個方法最后都會調(diào)用統(tǒng)一的構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
具體來說只是這幾個值的不同決定了4個線程池的作用:
1. corePoolSize 代表核心線程池的個數(shù),當線程池當前的個數(shù)大于核心線程池的時候,線程池會回收多出來的線程
2. maximumPoolSize 代表最大的線程池個數(shù),當線程池需要執(zhí)行的任務(wù)大于核心線程池的時候,會創(chuàng)建更多的線程,但是最大不能超過這個數(shù)
3. keepAliveTime 代表空余的線程存活的時間,當多余的線程完成任務(wù)的時候,需要多長時間進行回收,時間單位是unit 去控制
4. workQueue 非常重要,這個工作隊列會存放所有待執(zhí)行的Runnable對象
@param workQueue the queue to use for holding tasks before they areexecuted. This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.
我們平時在使用線程池的時候,都是直接 實例.execute(Runnable),一起跟進去,看看這個方法具體做了什么
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ //結(jié)合上文的注釋,我們得知,第一次,先判斷當前的核心線程數(shù), //如果小于初始化的值,馬上創(chuàng)建;然后第二個if,將這個任務(wù)插入到工作線程,雙重判斷任務(wù), //假定如果前面不能直接加入到線程池Worker集合里,則加入到workQueue隊列等待執(zhí)行。 //里面的if else判斷語句則是檢查當前線程池的狀態(tài)。如果線程池本身的狀態(tài)是要關(guān)閉并清理了, //我們則不能提交線程進去了。這里我們就要reject他們。 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
所以其實主要起作用的還是addWorker()方法,我們繼續(xù)跟蹤進去:
private boolean addWorker(Runnable firstTask, boolean core) { ···多余代碼 try { w = new Worker(firstTask); 1.重點 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); 2. 重點 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
我們看重點部分,其實最重要的是firstTask這個Runnable,我們一直跟蹤這個對象就可以了,這個對象會new Worker(),那么這個wroker()就是一個包裝類,里面帶著我們實際需要執(zhí)行的任務(wù),后面進行一系列的判斷就會執(zhí)行t.start(); 這個t 就是包裝類worker類里面的Thread,所以整個邏輯又轉(zhuǎn)化進去Worker內(nèi)部。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */ public void run() { runWorker(this); } ...省略代碼 }
這個Worker包裝類,重要的屬性兩個,thread 就是剛才上面那個方法執(zhí)行的start()對象,這個thread又是把這個worker對象本身作為一個Runnable對象構(gòu)建出來的,那么當我們調(diào)用thread.start()方法時候,實際調(diào)用的就是Worker類的run()方法?,F(xiàn)在又要追蹤進去,看這個runWorker(this),做的是什么鬼東西
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
這個方法還是比較好懂的:
1. 一個大循環(huán),判斷條件是task != null || (task = getTask()) != null,task自然就是我們要執(zhí)行的任務(wù)了,當task空而且getTask()取不到任務(wù)的時候,這個while()就會結(jié)束,循環(huán)體里面進行的就是task.run();
2.這里我們其實可以打個心眼,那基本八九不離十了,肯定是這個循環(huán)一直沒有退出,所以才能維持著這一個線程不斷運行,當有外部任務(wù)進來的時候,循環(huán)體就能getTask()并且執(zhí)行。
3.下面最后放getTask()里面的代碼,驗證猜想
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
真相大白了,里面進行的也是一個死循環(huán),主要看 Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
工作隊列workQueue會一直去拿任務(wù),屬于核心線程的會一直卡在 workQueue.take()方法,直到拿到Runnable 然后返回,非核心線程會 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超時還沒有拿到,下一次循環(huán)判斷compareAndDecrementWorkerCount就會返回null,Worker對象的run()方法循環(huán)體的判斷為null,任務(wù)結(jié)束,然后線程被系統(tǒng)回收
注意:
一句話可以概述了,線程池就是用一堆包裝住Thread的Wroker類的集合,在里面有條件的進行著死循環(huán),從而可以不斷接受任務(wù)來進行。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java使用DateFormatter格式化日期時間的方法示例
這篇文章主要介紹了Java使用DateFormatter格式化日期時間的方法,結(jié)合具體實例分析了java使用DateFormatter格式化日期時間的相關(guān)操作技巧,需要的朋友可以參考下2017-04-04

java配置dbcp連接池(數(shù)據(jù)庫連接池)示例分享

springMVC中@RequestParam和@RequestPart的區(qū)別

SpringBoot連接MySql數(shù)據(jù)庫的原理及代碼示例

springboot實現(xiàn)敏感字段加密存儲解密顯示功能