Android開發(fā)中線程池源碼解析
線程池(英語:thread pool):一種線程使用模式。線程過多會帶來調(diào)度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這避免了在處理短時間任務(wù)時創(chuàng)建與銷毀線程的代價。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過分調(diào)度??捎镁€程數(shù)量應(yīng)該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡(luò)sockets等的數(shù)量。 例如,線程數(shù)一般取cpu數(shù)量+2比較合適,線程數(shù)過多會導(dǎo)致額外的線程切換開銷。----摘自維基百科
我們在Android或者Java開發(fā)中,日常所使用的就是ThreadPoolExecutor了,我們先來看下如何使用一個線程池來代替多線程開發(fā)。
使用線程池
// 創(chuàng)建一個核心線程數(shù)為5,最大線程數(shù)為10,空閑線程存活時間為60s的線程池對象
val threadPoolExecutor = ThreadPoolExecutor(
5, 10, 60,
TimeUnit.MINUTES,
ArrayBlockingQueue<Runnable>(100),
RejectedExecutionHandler { _, _ -> println("reject submit thread to thread pool") }
)
// 測試
for (i in 1..10) {
threadPoolExecutor.execute { println("execute thread is:${Thread.currentThread().name}") }
}
// 結(jié)果
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-5
// execute thread is:pool-1-thread-5
// execute thread is:pool-1-thread-4
// execute thread is:pool-1-thread-3
// execute thread is:pool-1-thread-2
// execute thread is:pool-1-thread-1
從結(jié)果就可以看出來,執(zhí)行時間操作,但是只創(chuàng)建了5個線程,另外5次都是復(fù)用線程的。這樣就達到了復(fù)用存在的線程、減少對象的創(chuàng)建和銷毀的額外開銷;并且可以控制最大線程數(shù),也就是控制了最大并發(fā)數(shù)。
知道如何使用一個線程池還不夠,我們需要看看ThreadPoolExecutor是如何創(chuàng)建、復(fù)用這些線程的。下面我們看看創(chuàng)建ThreadPoolExecutor對象的幾個參數(shù):
構(gòu)造方法
/**
* 創(chuàng)建一個ThreadPoolExecutor對象
*
* @param corePoolSize 核心線程數(shù),這些線程會一直在線程池中,除非設(shè)置了 allowCoreThreadTimeOut
* @param maximumPoolSize 最大線程數(shù),運行線程創(chuàng)建的最大值
* @param keepAliveTime 當線程數(shù)>核心線程數(shù)的時候,這個值就是空閑且非核心線程存活的時間
* @param unit keepAliveTime的單位
* @param workQueue 保存task的隊列,直到執(zhí)行execute()方法執(zhí)行
* @param threadFactory ThreadFactory是一個接口,里面只有Thread newThread(Runnable r)方法,用來創(chuàng)建線程,
* 默認采用Executors.defaultThreadFactory()
* @param handler 拒絕處理任務(wù)時的策略,如果線程池滿了且所有線程都不處于空閑狀態(tài),
* 通過RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)來處理傳進來的Runnable
* 系統(tǒng)提供了四種:CallerRunsPolicy(), AbortPolicy(), DiscardPolicy(), DiscardOldestPolicy()
* 默認采用new AbortPolicy()
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
我在方法頭注釋中我都一一解釋了幾個參數(shù)的作用,還有幾點需要注意的就是:
- 核心線程數(shù)不能小于0;
- 最大線程數(shù)不能小于0;
- 最大線程數(shù)不能小于核心線程數(shù);
- 空閑線程的存活時間不能小于0;
通過上面的解釋我們很明白的知道前面幾個參數(shù)的作用,但是最后兩個參數(shù)我們并不能通過表面的解釋通曉它,既然不能通過表象看懂他倆,那就看看默認的實現(xiàn)是如何做的,這樣在接下來的源碼分析中很有幫助。
ThreadFactory:線程工廠
ThreadFactory 是一個接口,里面只由唯一的 Thread newThread(Runnable r); 方法,此方法是用來創(chuàng)建線程的,從接口中我們得到的就只有這么多,下面我們看看 Executors 默認的 DefaultThreadFactory 類:
// 靜態(tài)內(nèi)部類
static class DefaultThreadFactory implements ThreadFactory {
// 線程池的標識,從1開始沒創(chuàng)建一個線程池+1
private static final AtomicInteger poolNumber = new AtomicInteger(1);
// 線程組
private final ThreadGroup group;
// 線程名中的結(jié)尾標識,從1開始每創(chuàng)建一個線程+1
private final AtomicInteger threadNumber = new AtomicInteger(1);
// 線程名
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
RejectedExecutionHandler:拒絕處理任務(wù)的策略
RejectedExecutionHandler 也是一個接口,并且也只提供了唯一的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 方法。我們可以自定義策略,也可以用上面提到的封裝好的四種策略,先看一下四種策略分別怎么拒絕任務(wù)的:
CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() {
}
/**
* 如果線程池還沒關(guān)閉,那么就再次執(zhí)行這個Runnable
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() {
}
/**
* 這個策略就是拋出異常,不做其他處理
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() {
}
/**
* 什么也不做,也就是拋棄了這個Runnable
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() {
}
/**
* 1. 線程池未關(guān)閉
* 2. 獲取隊列中的下一個Runnable
* 3. 獲取到了,但是不對它進行處理,也就是拋棄它
* 4. 執(zhí)行我們傳過來的這個Runnable
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
重要的參數(shù)
除了上述構(gòu)造方法中的幾個參數(shù)外,線程池還有幾個比較核心的參數(shù),如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
// ctl 的低29位表示線程池中的線程數(shù),高3位表示當前線程狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// (2^29) -1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 運行狀態(tài):接受新任務(wù)并處理排隊的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
// 關(guān)閉狀態(tài):不接受新任務(wù),但處理排隊的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 停止狀態(tài):不接受新任務(wù),不處理排隊的任務(wù),中斷正在進行的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
// 整理狀態(tài):整理狀態(tài),所有任務(wù)已終止,workerCount為零,線程將運行terminate()方法
private static final int TIDYING = 2 << COUNT_BITS;
// 終止狀態(tài):terminate()方法執(zhí)行完成
private static final int TERMINATED = 3 << COUNT_BITS;
// 表示線程是否允許或停止
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 線程的有效數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
......后面的源碼暫時省略
}
execute:執(zhí)行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果運行中的線程數(shù)小于核心線程數(shù),執(zhí)行addWorker(command, true)創(chuàng)建新的核心Thread執(zhí)行任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 1. 已經(jīng)滿足:運行中的線程數(shù)大于核心線程數(shù),但是小于最大線程數(shù)
// 2. 需要滿足:線程池在運行狀態(tài)
// 3. 需要滿足:添加到工作隊列中成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程不在運行狀態(tài),就從工作隊列中移除command
// 并且執(zhí)行拒絕策略
if (!isRunning(recheck) && remove(command))
reject(command);
// 線程池處于運行狀態(tài),但是沒有線程,則addWorker(null, false)
// 至于這里為什么要傳入一個null,因為在最外層的if條件中我們已經(jīng)將Runnable添加到工作隊列中了
// 而且在runWorker()源碼中也可以得到答案,如果傳入的Runnable為空,就會去工作隊列中取task。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 執(zhí)行addWorker()創(chuàng)建新的非核心線程Thread執(zhí)行任務(wù)
// addWorker() 失敗,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
從上面源碼中可以看出,execute()一個新的任務(wù),主要有以下這幾種情況:
1、核心線程未滿,直接新建核心線程并執(zhí)行任務(wù);
2、核心線程滿了,工作隊列未滿,將任務(wù)添加到工作隊列中;
3、核心線程和工作隊列都滿,但是最大線程數(shù)未達到,新建線程并執(zhí)行任務(wù);
4、上面條件都不滿足,那么就執(zhí)行拒絕策略。
更形象的可以看下方流程圖:

添加任務(wù)的流程圖
addWorker(Runnable , boolean):添加Worker
private boolean addWorker(Runnable firstTask, boolean core) {
// 標記外循環(huán),比如在內(nèi)循環(huán)中break retry就直接跳出外循環(huán)
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 直接返回false有以下3種情況:
// 1. 線程池狀態(tài)為STOP、TIDYING、TERMINATED
// 2. 線程池狀態(tài)不是running狀態(tài),并且firstTask不為空
// 3. 線程池狀態(tài)不是running狀態(tài),并且工作隊列為空
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (; ; ) {
int wc = workerCountOf(c);
// 如果添加的是核心線程,但是運行的線程數(shù)大于等于核心線程數(shù),那么就不添加了,直接返回
// 如果添加的是非核心線程,但是運行的線程數(shù)大于等于最大線程數(shù),那么也不添加,直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加workerCount的值 +1
if (compareAndIncrementWorkerCount(c))
// 跳出外循環(huán)
break retry;
c = ctl.get(); // 重新檢查線程池狀態(tài)
if (runStateOf(c) != rs)
continue retry;
// 重新檢查的狀態(tài)和之前不合,再次從外循環(huán)進入
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 線程池重入鎖
final ReentrantLock mainLock = this.mainLock;
// 獲得鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 線程池在運行狀態(tài)或者是線程池關(guān)閉同時Runnable也為空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 想Worker中添加新的Worker
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
// 如果添加成功,啟動線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker() 主要就是在滿足種種條件(上述源碼中解釋了)后,新建一個Worker對象,并添加到HashSet<Worker> workers中去,最后調(diào)用新建Worker對象的Thread變量的start()方法。
Worker類
Worker是一個繼承了AQS并實現(xiàn)了Runnable的內(nèi)部類,我們重點看看它的run()方法,因為上面addWorker()中,t.start()觸發(fā)的就是它的run()方法:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/**
* Thread this worker is running in. Null if factory fails.
*/
final Thread thread;
/**
* Initial task to run. Possibly null.
*/
Runnable firstTask;
/**
* Per-thread task counter
*/
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
*
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 這邊是把Runnable傳給了Thread,也就是說Thread.run()就是執(zhí)行了下面的run()方法
this.thread = getThreadFactory().newThread(this);
}
/**
* Delegates main run loop to outer runWorker
*/
public void run() {
runWorker(this);
}
}
run()方法實際調(diào)用了runWorker(Worker)方法
runWorker(Worker)方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 釋放鎖,允許中斷
boolean completedAbruptly = true;
try {
// 1. worker中的task不為空
// 2. 如果worker的task為空,那么取WorkerQueue的task
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 這是一個空方法,可由子類實現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行task
task.run();
}
.... 省略
// 這是一個空方法,可由子類實現(xiàn)
finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask():
```java
private Runnable getTask() {
// 進入死循環(huán)
for (; ; ) {
try {
// 為true的條件:
// allowCoreThreadTimeOut=true: 核心線程需根據(jù)keepAliveTime超時等待
// 核心線程數(shù)已滿
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果timed為true,執(zhí)行BlockQueue.poll(),這個操作在取不到task的時候會等待keepAliveTime,然后返回null
// 如果timed為false,執(zhí)行BlockQueue.take(),這個操作在隊列為空的時候一直阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
}
}
}
```
線程池的源碼按照上述的幾個方法(execute(runnable) -> addWorker(runnable,core) -> Worker -> runWorker(worker) -> getTask())的順序來分析,你就可以很清晰的將運作過程了解清楚,同事構(gòu)造方法和幾個重要的參數(shù)一定要懂,不然對于后面的源碼分析很受阻礙,相信大家通過這篇文章可以加深對線程池的理解。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Android自定義控件實現(xiàn)優(yōu)雅的廣告輪播圖
這篇文章主要為大家詳細介紹了Android自定義控件實現(xiàn)優(yōu)雅的廣告輪播圖,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-03-03
創(chuàng)建子線程對Android進行網(wǎng)絡(luò)訪問
這篇文章介紹了Android中創(chuàng)建子線程進行網(wǎng)絡(luò)訪問的方法,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考2021-11-11
Android線程中設(shè)置控件的值提示報錯的解決方法
這篇文章主要介紹了Android線程中設(shè)置控件的值提示報錯的解決方法,實例分析了textview報錯的原因以及Handler設(shè)置來解決錯誤的實現(xiàn)技巧,需要的朋友可以參考下2016-06-06
怎樣實現(xiàn)android http-post方法實例說明
android http-post方法在開發(fā)中如何實現(xiàn),具體代碼如下,感興趣的朋友可以參考下哈,希望對大家有所幫助2013-06-06
Android ScrollView嵌套ExpandableListView顯示不正常的問題的解決辦法
這篇文章主要介紹了Android ScrollView嵌套ExpandableListView顯示不正常的問題的解決辦法的相關(guān)資料,需要的朋友可以參考下2017-02-02
深入分析Android NFC技術(shù) android nfc開發(fā)
本篇文章我們對android開發(fā)中nfc技術(shù)做了全面的原理分析以及實現(xiàn)過程,需要的讀者們一起參考一下吧。2017-11-11
Android使用RecyclerView實現(xiàn)投票系統(tǒng)
這篇文章主要為大家詳細介紹了Android使用RecyclerView實現(xiàn)投票系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-11-11
Android之IphoneTreeView帶組指示器的ExpandableListView效果
在正在顯示的最上面的組的標簽位置添加一個和組視圖完全一樣的視圖,作為組標簽。這個標簽的位置要隨著列表的滑動不斷變化,以保持總是顯示在最上方,并且該消失的時候就消失2013-06-06

