Java線程池submit阻塞獲取結果的實現(xiàn)原理詳解
前言
Java線程池中提交任務運行,通常使用execute()方法就足夠了。那如果想要實現(xiàn)在主線程中阻塞獲取線程池任務運行的結果,該怎么辦呢?答案是用submit()方法提交任務。這也是面試中經(jīng)常被問到的一個知識點,execute()和submit()提交任務的的區(qū)別是什么?底層是如何實現(xiàn)的?
案例演示
現(xiàn)在我們通過簡單的例子演示下submit()方法的妙處。
@Test
public void testSubmit() throws ExecutionException, InterruptedException {
// 創(chuàng)建一個核心線程數(shù)為5的線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
// 創(chuàng)建一個計算任務
Callable<Integer> myTask = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 10000; i++) {
result += i;
}
Thread.sleep(1000);
return result;
}
};
log.info("start submit task .....");
Future<Integer> future = threadPoolExecutor.submit(myTask);
Integer sum = future.get();
log.info("get submit result: [{}]", sum);
// use sum do other things
}
運行結果:

主線程的確阻塞等待線程返回。
Future類API
我們看到用submit提交任務最后返回一個Future對象,F(xiàn)uture表示異步計算的結果。那它都提供了什么API呢?
| 方法 | 說明 |
|---|---|
| V get() | 等待任務執(zhí)行完成,然后獲取其結果。 |
| V get(long timeout, TimeUnit unit) | 等待獲取任務執(zhí)行的結果,如果任務超過一定時間沒有執(zhí)行完畢,直接返回,拋出異常,不會一直等待下去。 |
| boolean isDone() | 如果此任務已完成,則返回true。完成可能是由于正常終止、異?;蛉∠?mdash;—在所有這些情況下,該方法都將返回true。 |
| boolean isCancelled() | 如果該任務在正常完成之前被取消,則返回true。 |
| boolean cancel(boolean mayInterruptIfRunning) | 試圖取消此任務的執(zhí)行。1. 如果任務已經(jīng)完成、已經(jīng)取消或由于其他原因無法取消,則此嘗試將失敗。 |
- 如果在調用cancel時此任務尚未啟動,則此任務不應運行。
- 如果任務已經(jīng)開始,那么mayInterruptIfRunning參數(shù)確定是否應該中斷執(zhí)行此任務的線程以試圖停止該任務。 |
和execute區(qū)別
從功能層面,我們已經(jīng)很明白他們最大區(qū)別,
execute()方式提交任務沒有返回值,直接線程中池異步運行任務。submit()方式提交任務有返回值Future, 調用get方法可以阻塞調用線程,等待任務運行返回的結果。
那從源碼層面,二者又有什么區(qū)別和聯(lián)系呢?
我們看下submit()提交的入口方法,代碼如下:
// AbstractExecutorService#submit
public <T> Future<T> submit(Callable<T> task) {
// 判空處理
if (task == null) throw new NullPointerException();
// 將提交的任務包裝成RunnableFuture
RunnableFuture<T> ftask = newTaskFor(task);
// 最終還是調用execute方法執(zhí)行任務
execute(ftask);
return ftask;
}
殊途同歸,最終都是調用execute()方法,只不過submit()方法在調用前做一層包裝,將任務包裝成RunnableFuture對象。
關于線程池中execute()方法提交的流程和原理實現(xiàn)不理解的,強烈建議先學習這篇文章:Java線程池源碼深度解析。
原理實現(xiàn)
本節(jié)內容我們聚焦在submit()方法的實現(xiàn)原理。
我們先思考下,如果讓我們設計實現(xiàn)調用get阻塞知道線程返回結果,要考慮哪些方面呢?
- 任務是否執(zhí)行結束或者執(zhí)行出錯等情況,是不是需要有個狀態(tài)位標記?
- 任務的執(zhí)行結果如何保存?
- 如果任務沒有執(zhí)行結束,如何阻塞當前線程,
LockSupport.park()是一種方式。 - 如果有多個外部線程獲取get,是不是應該也要把外部線程存下來,怎么存?因為后面任務執(zhí)行完后需要喚醒他們。
帶著這些問題和基本思路我們看下jdk8中是如何實現(xiàn)的。
RunnableFuture類介紹
submit()方法中調用newTaskFor()方法獲取RunnableFuture對象。
// AbstractExecutorService#newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// 調用FutureTask的構造方法返回RunnableFuture對象
return new FutureTask<T>(callable);
}
FutureTask類結構圖如下:

FutureTask是一個異步計算任務,包裝了我們外部提交的任務。
- 實現(xiàn)了Runnable接口
- 實現(xiàn)了Future接口,該接口封裝了任務結果的獲取、任務是否結束等接口。
RunnableFuture類重要屬性
1.任務運行狀態(tài)state
// 存儲當前任務運行狀態(tài) private volatile int state; // 當前任務尚未執(zhí)行 private static final int NEW = 0; // 當前任務正在結束,尚未完全結束,一種臨界狀態(tài) private static final int COMPLETING = 1; // 當前任務正常結束 private static final int NORMAL = 2; // 當前任務執(zhí)行過程中發(fā)生了異常 private static final int EXCEPTIONAL = 3; // 當前任務被取消 private static final int CANCELLED = 4; // 當前任務中斷中 private static final int INTERRUPTING = 5; // 當前任務已中斷 private static final int INTERRUPTED = 6;
可能的狀態(tài)轉換有如下幾種:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
2.真正要執(zhí)行的任務callble
// 存放真正提交的原始任務 private Callable<V> callable;
3.存放執(zhí)行結果outcome
返回的結果或從get()中拋出的異常 private Object outcome;
4.當前正在運行任務的線程runner
//當前任務被線程執(zhí)行期間,保存當前任務的線程對象引用 private volatile Thread runner;
5.調用get獲取任務結果的等待線程集合waiters
//因為會有很多線程去get當前任務的結果,所以這里使用了一種stack數(shù)據(jù)結構來保存
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
數(shù)據(jù)結構如下圖:

RunnableFuture類構造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
// 設置要執(zhí)行的任務
this.callable = callable;
// 初始化時任務狀態(tài)為NEW
this.state = NEW;
}
任務執(zhí)行run()原理
submit()方法最終調用線程池的execute()方法,而execute()方法會創(chuàng)建出"工人"Worker對象,調用runWorker()方法,它主要是執(zhí)行外部提交的任務,也就是這里的FutureTask對象的run()方法, 我們重點看下run()方法。
FutureTask#run()開始執(zhí)行任務。
它主要的功能是完成包裝的callable的call方法執(zhí)行,并將執(zhí)行結果保存到outcome中,同時捕獲了call方法執(zhí)行出現(xiàn)的異常,并保存異常信息,而不是直接拋出。
public void run() {
// 狀態(tài)機不為NEW表示執(zhí)行完成或任務被取消了,直接返回
// 狀態(tài)機為NEW,同時將runner設置為當前線程,保證同一時刻只有一個線程執(zhí)行run方法,如果設置失敗也直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 取出原始的任務檢測不為空 且 再次檢查狀態(tài)為NEW(雙重校驗)
if (c != null && state == NEW) {
// 任務運行的結果
V result;
// 任務是否運行是否正常, true:正常, false-異常
boolean ran;
try {
// 任務執(zhí)行,將結果返回給result
result = c.call();
// 設置任務運行正常
ran = true;
} catch (Throwable ex) {
// 任務運行報錯的情況
// 設置結果為空
result = null;
// 設置任務運行異常標記
ran = false;
// 任務執(zhí)行拋出異常時,保存異常信息,而不直接拋出
setException(ex);
}
// 執(zhí)行成功則保存結果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 執(zhí)行完成后設置runner為null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
// 獲取任務狀態(tài)
int s = state;
// 如果被置為了中斷狀態(tài)則進行中斷的處理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
FutureTask#set()方法處理正常執(zhí)行的運行結果
setException()方法主要完成做下面的工作。
- 將執(zhí)行結果保存到outcom變量中
- FutureTask的狀態(tài)從NEW修改為NORMAL
- 喚醒阻塞在waiters隊列中請求get的所有線程
protected void set(V v) {
// 將狀態(tài)由NEW更新為COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 保存任務的結果
outcome = v;
// 更新狀態(tài)的最終狀態(tài)-NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 通用的完成操作,主要作用就是喚醒阻塞在waiters隊列中請求get的線程
finishCompletion();
}
}
FutureTask#setException()方法處理執(zhí)行異常的結果
setException()方法主要完成做下面的工作。
- 將異常信息保存到outcom變量中
- FutureTask的狀態(tài)從NEW修改為EXCEPTIONAL
- 喚醒阻塞在waiters隊列中請求get的所有線程
// FutureTask#setException
protected void setException(Throwable t) {
// 將狀態(tài)由NEW更新為COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 將異常信息保存到輸出結果中
outcome = t;
// 更新狀態(tài)機處理異常的最終狀態(tài)-EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 通用的完成操作,主要作用就是喚醒阻塞在waiters隊列中請求get的線程
finishCompletion();
}
}
這里的finishCompletion()喚醒我們在后面講解,上面的整個邏輯可以用一張圖表示:

任務結果獲取get()原理
其他線程可以調用get()方法或者超時阻塞方法get(long timeout, TimeUnit unit)獲取任務運行的結果。
FutureTask#get()方法是獲取任務執(zhí)行結果的入口方法。
// 阻塞獲取任務結果
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 任務還沒有執(zhí)行完成,通過awaitDone方法進行阻塞等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 返回結果
return report(s);
}
// 超時阻塞獲取任務結果
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// 判空處理
if (unit == null)
throw new NullPointerException();
int s = state;
// 任務還沒有執(zhí)行完成,通過awaitDone方法進行阻塞等待
if (s <= COMPLETING &&
// 如果awaitDone返回的結果還是小于等于COMPLETING,表示運行中,那么直接拋出超時異常
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 返回結果
return report(s);
}
FutureTask#awaitDone()方法阻塞等待任務執(zhí)行結束
該方法主要完成下面的工作:
- 判斷任務是否運行結束,結束的話直接返回運行狀態(tài)
- 如果任務沒有結果,將請求線程阻塞
- 請求線程阻塞時,會創(chuàng)建一個waiter節(jié)點,然后加入到阻塞等待的棧中
// 線程阻塞等待方法, timed等于 true表示阻塞等待有時間限制nanos, false表示沒有,一直阻塞
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// 計算阻塞超時時間點
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
// 表示q是否添加到waiters棧中,默認false
boolean queued = false;
// 自旋操作
for (;;) {
// 如果阻塞線程被中斷則將當前線程從阻塞隊列中移除
if (Thread.interrupted()) {
// 從waiters棧中移除WaitNode,
removeWaiter(q);
// 返回中斷移除
throw new InterruptedException();
}
// 獲取任務的狀態(tài)
int s = state;
// 如果任務的狀態(tài)大于COMPLETING,表示線程運行結束了,直接返回
if (s > COMPLETING) {
// 任務已經(jīng)完成時直接返回結果
if (q != null)
q.thread = null;
// 返回狀態(tài)
return s;
}
// 如果任務狀態(tài)是COMPLETING
else if (s == COMPLETING)
// 如果任務執(zhí)行完成,但還差最后一步最終完成,則讓出CPU給任務執(zhí)行線程繼續(xù)執(zhí)行
Thread.yield();
// 如果任務狀態(tài)小于COMPLETING,說明任務還在運行中
// 如果q為空的情況
else if (q == null)
// 新進來的線程添加等待節(jié)點
q = new WaitNode();
// 如果任務還在運行中并且當前線程節(jié)點還不在waiters棧中,那么就加入
else if (!queued)
// 上一步節(jié)點創(chuàng)建完,還沒將其添加到waiters棧中,因此在下一個循環(huán)就會執(zhí)行此處進行入棧操作,并將當前線程的等待節(jié)點置于棧頂
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果任務還在運行中并且timed為true,表示有超時限制
else if (timed) {
// 如果設置了阻塞超時時間,則進行檢查是否達到阻塞超時時間,達到了則刪除當前線程的等待節(jié)點并退出循環(huán)返回,否則繼續(xù)阻塞
nanos = deadline - System.nanoTime();
// 如果nanos小于等于0
if (nanos <= 0L) {
// 從waiters棧中移除
removeWaiter(q);
//返回狀態(tài)
return state;
}
// 超時阻塞當前線程,超過時間,就會恢復
LockSupport.parkNanos(this, nanos);
}
// 如果任務還在運行中并且timed為false,沒有有超時限制
else
// 一直阻塞當前線程
LockSupport.park(this);
}
}
FutureTask#report方法解析返回任務結果
// 獲取任務結果方法:正常執(zhí)行則直接返回結果,否則拋出異常
private V report(int s) throws ExecutionException {
Object x = outcome;
// 如果狀態(tài)是正常情況
if (s == NORMAL)
// 直接返回
return (V)x;
// 如果狀態(tài)是取消了,拋出異常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
FutureTask#finishCompletion()方法用來喚醒前面等待的線程
上一步awaitDone方法會阻塞調用的線程,那么任務運行結束總要喚醒他們去拿結果吧,這個工作就在finishCompletion()方法中。
private void finishCompletion() {
// 遍歷waiters棧中的每個元素;
for (WaitNode q; (q = waiters) != null;) {
// cas設置waiters中q節(jié)點數(shù)據(jù)為null,成功的話,進入到if中
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 自選操作
for (;;) {
// 獲取節(jié)點中的線程
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 喚醒線程
LockSupport.unpark(t);
}
// 獲取下一個節(jié)點
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//鉤子方法,有子類去實現(xiàn)
done();
// 設置原來的任務callable為null
callable = null; // to reduce footprint
}
任務取消cancel()原理
可以調用FutureTask#cancel方法取消任務執(zhí)行,但是要注意下面幾點:
- 任務取消時會先檢查是否允許取消,當任務已經(jīng)完成或者正在完成(正常執(zhí)行并繼續(xù)處理結果 或 執(zhí)行異常處理異常結果)時不允許取消。
- cancel方法有個boolean入?yún)?,若為false,則只喚醒所有等待的線程,不中斷正在執(zhí)行的任務線程。若為true則直接中斷任務執(zhí)行線程,同時修改狀態(tài)為INTERRUPTED。
// 取消任務,參數(shù)mayInterruptIfRunning為true,會中斷運行中的線程,false不會
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果FutureTask的狀態(tài)不是NEW或者CAS設置失敗時,直接返回false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 如果參數(shù)mayInterruptIfRunning為true,中斷
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//cas修改狀態(tài)為INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 喚醒其他等待的線程
finishCompletion();
}
return true;
}
cancel方法實際上完成以下兩種狀態(tài)轉換之一:
- NEW -> CANCELLED (對應于mayInterruptIfRunning=false)
- NEW -> INTERRUPTING -> INTERRUPTED (對應于mayInterruptIfRunning=true)
總結
本文講解了線程池submit()提交任務的原理實現(xiàn),通過源碼很多平時項目中遇到的坑都找到了答案。比如說之前項目中用線程池submit()方法提交任務處理,發(fā)現(xiàn)任務的異常都不見了,這下明白了,原來是通過setException()保存下來了,只有通過get方法獲取到,只有看過源碼,才會豁然開朗。
以上就是Java線程池submit阻塞獲取結果的實現(xiàn)原理詳解的詳細內容,更多關于Java線程池submit阻塞獲取結果的資料請關注腳本之家其它相關文章!
相關文章
Java基于外觀模式實現(xiàn)美食天下食譜功能實例詳解
這篇文章主要介紹了Java基于外觀模式實現(xiàn)美食天下食譜功能,較為詳細的講述了外觀模式的概念、原理并結合實例形似詳細分析了Java基于外觀模式實現(xiàn)美食天下食譜功能的具體操作步驟與相關注意事項,需要的朋友可以參考下2018-05-05
mybatis 中 foreach collection的用法小結(三種)
這篇文章主要介紹了mybatis 中 foreach collection的用法小結(三種),需要的朋友可以參考下2017-10-10
SpringBoot和前端聯(lián)動實現(xiàn)存儲瀏覽記錄功能
這篇文章主要介紹了SpringBoot和前端聯(lián)動實現(xiàn)存儲瀏覽記錄功能,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01

