Android開發(fā)中線程池源碼解析
線程池(英語(yǔ):thread pool):一種線程使用模式。線程過多會(huì)帶來調(diào)度開銷,進(jìn)而影響緩存局部性和整體性能。而線程池維護(hù)著多個(gè)線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這避免了在處理短時(shí)間任務(wù)時(shí)創(chuàng)建與銷毀線程的代價(jià)。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過分調(diào)度??捎镁€程數(shù)量應(yīng)該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡(luò)sockets等的數(shù)量。 例如,線程數(shù)一般取cpu數(shù)量+2比較合適,線程數(shù)過多會(huì)導(dǎo)致額外的線程切換開銷。----摘自維基百科
我們?cè)贏ndroid或者Java開發(fā)中,日常所使用的就是ThreadPoolExecutor了,我們先來看下如何使用一個(gè)線程池來代替多線程開發(fā)。
使用線程池
// 創(chuàng)建一個(gè)核心線程數(shù)為5,最大線程數(shù)為10,空閑線程存活時(shí)間為60s的線程池對(duì)象 val threadPoolExecutor = ThreadPoolExecutor( 5, 10, 60, TimeUnit.MINUTES, ArrayBlockingQueue<Runnable>(100), RejectedExecutionHandler { _, _ -> println("reject submit thread to thread pool") } ) // 測(cè)試 for (i in 1..10) { threadPoolExecutor.execute { println("execute thread is:${Thread.currentThread().name}") } } // 結(jié)果 // execute thread is:pool-1-thread-1 // execute thread is:pool-1-thread-1 // execute thread is:pool-1-thread-1 // execute thread is:pool-1-thread-1 // execute thread is:pool-1-thread-5 // execute thread is:pool-1-thread-5 // execute thread is:pool-1-thread-4 // execute thread is:pool-1-thread-3 // execute thread is:pool-1-thread-2 // execute thread is:pool-1-thread-1
從結(jié)果就可以看出來,執(zhí)行時(shí)間操作,但是只創(chuàng)建了5個(gè)線程,另外5次都是復(fù)用線程的。這樣就達(dá)到了復(fù)用存在的線程、減少對(duì)象的創(chuàng)建和銷毀的額外開銷;并且可以控制最大線程數(shù),也就是控制了最大并發(fā)數(shù)。
知道如何使用一個(gè)線程池還不夠,我們需要看看ThreadPoolExecutor是如何創(chuàng)建、復(fù)用這些線程的。下面我們看看創(chuàng)建ThreadPoolExecutor對(duì)象的幾個(gè)參數(shù):
構(gòu)造方法
/** * 創(chuàng)建一個(gè)ThreadPoolExecutor對(duì)象 * * @param corePoolSize 核心線程數(shù),這些線程會(huì)一直在線程池中,除非設(shè)置了 allowCoreThreadTimeOut * @param maximumPoolSize 最大線程數(shù),運(yùn)行線程創(chuàng)建的最大值 * @param keepAliveTime 當(dāng)線程數(shù)>核心線程數(shù)的時(shí)候,這個(gè)值就是空閑且非核心線程存活的時(shí)間 * @param unit keepAliveTime的單位 * @param workQueue 保存task的隊(duì)列,直到執(zhí)行execute()方法執(zhí)行 * @param threadFactory ThreadFactory是一個(gè)接口,里面只有Thread newThread(Runnable r)方法,用來創(chuàng)建線程, * 默認(rèn)采用Executors.defaultThreadFactory() * @param handler 拒絕處理任務(wù)時(shí)的策略,如果線程池滿了且所有線程都不處于空閑狀態(tài), * 通過RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)來處理傳進(jìn)來的Runnable * 系統(tǒng)提供了四種:CallerRunsPolicy(), AbortPolicy(), DiscardPolicy(), DiscardOldestPolicy() * 默認(rèn)采用new AbortPolicy() */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){ if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
我在方法頭注釋中我都一一解釋了幾個(gè)參數(shù)的作用,還有幾點(diǎn)需要注意的就是:
- 核心線程數(shù)不能小于0;
- 最大線程數(shù)不能小于0;
- 最大線程數(shù)不能小于核心線程數(shù);
- 空閑線程的存活時(shí)間不能小于0;
通過上面的解釋我們很明白的知道前面幾個(gè)參數(shù)的作用,但是最后兩個(gè)參數(shù)我們并不能通過表面的解釋通曉它,既然不能通過表象看懂他倆,那就看看默認(rèn)的實(shí)現(xiàn)是如何做的,這樣在接下來的源碼分析中很有幫助。
ThreadFactory:線程工廠
ThreadFactory 是一個(gè)接口,里面只由唯一的 Thread newThread(Runnable r); 方法,此方法是用來創(chuàng)建線程的,從接口中我們得到的就只有這么多,下面我們看看 Executors 默認(rèn)的 DefaultThreadFactory 類:
// 靜態(tài)內(nèi)部類 static class DefaultThreadFactory implements ThreadFactory { // 線程池的標(biāo)識(shí),從1開始沒創(chuàng)建一個(gè)線程池+1 private static final AtomicInteger poolNumber = new AtomicInteger(1); // 線程組 private final ThreadGroup group; // 線程名中的結(jié)尾標(biāo)識(shí),從1開始每創(chuàng)建一個(gè)線程+1 private final AtomicInteger threadNumber = new AtomicInteger(1); // 線程名 private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
RejectedExecutionHandler:拒絕處理任務(wù)的策略
RejectedExecutionHandler 也是一個(gè)接口,并且也只提供了唯一的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 方法。我們可以自定義策略,也可以用上面提到的封裝好的四種策略,先看一下四種策略分別怎么拒絕任務(wù)的:
CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * 如果線程池還沒關(guān)閉,那么就再次執(zhí)行這個(gè)Runnable */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * 這個(gè)策略就是拋出異常,不做其他處理 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * 什么也不做,也就是拋棄了這個(gè)Runnable */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * 1. 線程池未關(guān)閉 * 2. 獲取隊(duì)列中的下一個(gè)Runnable * 3. 獲取到了,但是不對(duì)它進(jìn)行處理,也就是拋棄它 * 4. 執(zhí)行我們傳過來的這個(gè)Runnable */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
重要的參數(shù)
除了上述構(gòu)造方法中的幾個(gè)參數(shù)外,線程池還有幾個(gè)比較核心的參數(shù),如下:
public class ThreadPoolExecutor extends AbstractExecutorService { // ctl 的低29位表示線程池中的線程數(shù),高3位表示當(dāng)前線程狀態(tài) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 29 private static final int COUNT_BITS = Integer.SIZE - 3; // (2^29) -1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 運(yùn)行狀態(tài):接受新任務(wù)并處理排隊(duì)的任務(wù) private static final int RUNNING = -1 << COUNT_BITS; // 關(guān)閉狀態(tài):不接受新任務(wù),但處理排隊(duì)的任務(wù) private static final int SHUTDOWN = 0 << COUNT_BITS; // 停止?fàn)顟B(tài):不接受新任務(wù),不處理排隊(duì)的任務(wù),中斷正在進(jìn)行的任務(wù) private static final int STOP = 1 << COUNT_BITS; // 整理狀態(tài):整理狀態(tài),所有任務(wù)已終止,workerCount為零,線程將運(yùn)行terminate()方法 private static final int TIDYING = 2 << COUNT_BITS; // 終止?fàn)顟B(tài):terminate()方法執(zhí)行完成 private static final int TERMINATED = 3 << COUNT_BITS; // 表示線程是否允許或停止 private static int runStateOf(int c) { return c & ~CAPACITY; } // 線程的有效數(shù)量 private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } ......后面的源碼暫時(shí)省略 }
execute:執(zhí)行
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果運(yùn)行中的線程數(shù)小于核心線程數(shù),執(zhí)行addWorker(command, true)創(chuàng)建新的核心Thread執(zhí)行任務(wù) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 1. 已經(jīng)滿足:運(yùn)行中的線程數(shù)大于核心線程數(shù),但是小于最大線程數(shù) // 2. 需要滿足:線程池在運(yùn)行狀態(tài) // 3. 需要滿足:添加到工作隊(duì)列中成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果線程不在運(yùn)行狀態(tài),就從工作隊(duì)列中移除command // 并且執(zhí)行拒絕策略 if (!isRunning(recheck) && remove(command)) reject(command); // 線程池處于運(yùn)行狀態(tài),但是沒有線程,則addWorker(null, false) // 至于這里為什么要傳入一個(gè)null,因?yàn)樵谧钔鈱拥膇f條件中我們已經(jīng)將Runnable添加到工作隊(duì)列中了 // 而且在runWorker()源碼中也可以得到答案,如果傳入的Runnable為空,就會(huì)去工作隊(duì)列中取task。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 執(zhí)行addWorker()創(chuàng)建新的非核心線程Thread執(zhí)行任務(wù) // addWorker() 失敗,執(zhí)行拒絕策略 else if (!addWorker(command, false)) reject(command); }
從上面源碼中可以看出,execute()一個(gè)新的任務(wù),主要有以下這幾種情況:
1、核心線程未滿,直接新建核心線程并執(zhí)行任務(wù);
2、核心線程滿了,工作隊(duì)列未滿,將任務(wù)添加到工作隊(duì)列中;
3、核心線程和工作隊(duì)列都滿,但是最大線程數(shù)未達(dá)到,新建線程并執(zhí)行任務(wù);
4、上面條件都不滿足,那么就執(zhí)行拒絕策略。
更形象的可以看下方流程圖:
添加任務(wù)的流程圖
addWorker(Runnable , boolean):添加Worker
private boolean addWorker(Runnable firstTask, boolean core) { // 標(biāo)記外循環(huán),比如在內(nèi)循環(huán)中break retry就直接跳出外循環(huán) retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 直接返回false有以下3種情況: // 1. 線程池狀態(tài)為STOP、TIDYING、TERMINATED // 2. 線程池狀態(tài)不是running狀態(tài),并且firstTask不為空 // 3. 線程池狀態(tài)不是running狀態(tài),并且工作隊(duì)列為空 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (; ; ) { int wc = workerCountOf(c); // 如果添加的是核心線程,但是運(yùn)行的線程數(shù)大于等于核心線程數(shù),那么就不添加了,直接返回 // 如果添加的是非核心線程,但是運(yùn)行的線程數(shù)大于等于最大線程數(shù),那么也不添加,直接返回 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 增加workerCount的值 +1 if (compareAndIncrementWorkerCount(c)) // 跳出外循環(huán) break retry; c = ctl.get(); // 重新檢查線程池狀態(tài) if (runStateOf(c) != rs) continue retry; // 重新檢查的狀態(tài)和之前不合,再次從外循環(huán)進(jìn)入 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 線程池重入鎖 final ReentrantLock mainLock = this.mainLock; // 獲得鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 線程池在運(yùn)行狀態(tài)或者是線程池關(guān)閉同時(shí)Runnable也為空 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 想Worker中添加新的Worker workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 釋放鎖 mainLock.unlock(); } // 如果添加成功,啟動(dòng)線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker() 主要就是在滿足種種條件(上述源碼中解釋了)后,新建一個(gè)Worker對(duì)象,并添加到HashSet<Worker> workers中去,最后調(diào)用新建Worker對(duì)象的Thread變量的start()方法。
Worker類
Worker是一個(gè)繼承了AQS并實(shí)現(xiàn)了Runnable的內(nèi)部類,我們重點(diǎn)看看它的run()方法,因?yàn)樯厦鎍ddWorker()中,t.start()觸發(fā)的就是它的run()方法:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** * Thread this worker is running in. Null if factory fails. */ final Thread thread; /** * Initial task to run. Possibly null. */ Runnable firstTask; /** * Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 這邊是把Runnable傳給了Thread,也就是說Thread.run()就是執(zhí)行了下面的run()方法 this.thread = getThreadFactory().newThread(this); } /** * Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } }
run()方法實(shí)際調(diào)用了runWorker(Worker)方法
runWorker(Worker)方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 釋放鎖,允許中斷 boolean completedAbruptly = true; try { // 1. worker中的task不為空 // 2. 如果worker的task為空,那么取WorkerQueue的task while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 這是一個(gè)空方法,可由子類實(shí)現(xiàn) beforeExecute(wt, task); Throwable thrown = null; try { // 執(zhí)行task task.run(); } .... 省略 // 這是一個(gè)空方法,可由子類實(shí)現(xiàn) finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
getTask():
```java private Runnable getTask() { // 進(jìn)入死循環(huán) for (; ; ) { try { // 為true的條件: // allowCoreThreadTimeOut=true: 核心線程需根據(jù)keepAliveTime超時(shí)等待 // 核心線程數(shù)已滿 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果timed為true,執(zhí)行BlockQueue.poll(),這個(gè)操作在取不到task的時(shí)候會(huì)等待keepAliveTime,然后返回null // 如果timed為false,執(zhí)行BlockQueue.take(),這個(gè)操作在隊(duì)列為空的時(shí)候一直阻塞 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; } } } ```
線程池的源碼按照上述的幾個(gè)方法(execute(runnable) -> addWorker(runnable,core) -> Worker -> runWorker(worker) -> getTask())的順序來分析,你就可以很清晰的將運(yùn)作過程了解清楚,同事構(gòu)造方法和幾個(gè)重要的參數(shù)一定要懂,不然對(duì)于后面的源碼分析很受阻礙,相信大家通過這篇文章可以加深對(duì)線程池的理解。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Android自定義控件實(shí)現(xiàn)優(yōu)雅的廣告輪播圖
這篇文章主要為大家詳細(xì)介紹了Android自定義控件實(shí)現(xiàn)優(yōu)雅的廣告輪播圖,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-03-03創(chuàng)建子線程對(duì)Android進(jìn)行網(wǎng)絡(luò)訪問
這篇文章介紹了Android中創(chuàng)建子線程進(jìn)行網(wǎng)絡(luò)訪問的方法,文中通過示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考2021-11-11Android線程中設(shè)置控件的值提示報(bào)錯(cuò)的解決方法
這篇文章主要介紹了Android線程中設(shè)置控件的值提示報(bào)錯(cuò)的解決方法,實(shí)例分析了textview報(bào)錯(cuò)的原因以及Handler設(shè)置來解決錯(cuò)誤的實(shí)現(xiàn)技巧,需要的朋友可以參考下2016-06-06怎樣實(shí)現(xiàn)android http-post方法實(shí)例說明
android http-post方法在開發(fā)中如何實(shí)現(xiàn),具體代碼如下,感興趣的朋友可以參考下哈,希望對(duì)大家有所幫助2013-06-06Android ScrollView嵌套ExpandableListView顯示不正常的問題的解決辦法
這篇文章主要介紹了Android ScrollView嵌套ExpandableListView顯示不正常的問題的解決辦法的相關(guān)資料,需要的朋友可以參考下2017-02-02Android實(shí)現(xiàn)隨機(jī)生成驗(yàn)證碼
在登錄注冊(cè)軟件時(shí),經(jīng)常會(huì)要求填寫隨機(jī)驗(yàn)證碼,這篇文章為大家詳細(xì)主要介紹了Android實(shí)現(xiàn)隨機(jī)生成驗(yàn)證碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08深入分析Android NFC技術(shù) android nfc開發(fā)
本篇文章我們對(duì)android開發(fā)中nfc技術(shù)做了全面的原理分析以及實(shí)現(xiàn)過程,需要的讀者們一起參考一下吧。2017-11-11Android使用RecyclerView實(shí)現(xiàn)投票系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Android使用RecyclerView實(shí)現(xiàn)投票系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-11-11Android之IphoneTreeView帶組指示器的ExpandableListView效果
在正在顯示的最上面的組的標(biāo)簽位置添加一個(gè)和組視圖完全一樣的視圖,作為組標(biāo)簽。這個(gè)標(biāo)簽的位置要隨著列表的滑動(dòng)不斷變化,以保持總是顯示在最上方,并且該消失的時(shí)候就消失2013-06-06Android?Flutter制作一個(gè)修改組件屬性的動(dòng)畫
flutter為我們提供了一個(gè)AnimationController來對(duì)動(dòng)畫進(jìn)行詳盡的控制,不過直接是用AnimationController是比較復(fù)雜的,如果只是對(duì)一個(gè)widget的屬性進(jìn)行修改,可以做成動(dòng)畫嗎,本文就來探討一下2023-05-05