Java線程池submit阻塞獲取結果的實現(xiàn)原理詳解
前言
Java線程池中提交任務運行,通常使用execute()
方法就足夠了。那如果想要實現(xiàn)在主線程中阻塞獲取線程池任務運行的結果,該怎么辦呢?答案是用submit()
方法提交任務。這也是面試中經常被問到的一個知識點,execute()
和submit()
提交任務的的區(qū)別是什么?底層是如何實現(xiàn)的?
案例演示
現(xiàn)在我們通過簡單的例子演示下submit()方法的妙處。
@Test public void testSubmit() throws ExecutionException, InterruptedException { // 創(chuàng)建一個核心線程數(shù)為5的線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50)); // 創(chuàng)建一個計算任務 Callable<Integer> myTask = new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 10000; i++) { result += i; } Thread.sleep(1000); return result; } }; log.info("start submit task ....."); Future<Integer> future = threadPoolExecutor.submit(myTask); Integer sum = future.get(); log.info("get submit result: [{}]", sum); // use sum do other things }
運行結果:
主線程的確阻塞等待線程返回。
Future類API
我們看到用submit提交任務最后返回一個Future對象,F(xiàn)uture表示異步計算的結果。那它都提供了什么API呢?
方法 | 說明 |
---|---|
V get() | 等待任務執(zhí)行完成,然后獲取其結果。 |
V get(long timeout, TimeUnit unit) | 等待獲取任務執(zhí)行的結果,如果任務超過一定時間沒有執(zhí)行完畢,直接返回,拋出異常,不會一直等待下去。 |
boolean isDone() | 如果此任務已完成,則返回true。完成可能是由于正常終止、異常或取消——在所有這些情況下,該方法都將返回true。 |
boolean isCancelled() | 如果該任務在正常完成之前被取消,則返回true。 |
boolean cancel(boolean mayInterruptIfRunning) | 試圖取消此任務的執(zhí)行。1. 如果任務已經完成、已經取消或由于其他原因無法取消,則此嘗試將失敗。 |
- 如果在調用cancel時此任務尚未啟動,則此任務不應運行。
- 如果任務已經開始,那么mayInterruptIfRunning參數(shù)確定是否應該中斷執(zhí)行此任務的線程以試圖停止該任務。 |
和execute區(qū)別
從功能層面,我們已經很明白他們最大區(qū)別,
execute()
方式提交任務沒有返回值,直接線程中池異步運行任務。submit()
方式提交任務有返回值Future, 調用get方法可以阻塞調用線程,等待任務運行返回的結果。
那從源碼層面,二者又有什么區(qū)別和聯(lián)系呢?
我們看下submit()
提交的入口方法,代碼如下:
// AbstractExecutorService#submit public <T> Future<T> submit(Callable<T> task) { // 判空處理 if (task == null) throw new NullPointerException(); // 將提交的任務包裝成RunnableFuture RunnableFuture<T> ftask = newTaskFor(task); // 最終還是調用execute方法執(zhí)行任務 execute(ftask); return ftask; }
殊途同歸,最終都是調用execute()
方法,只不過submit()
方法在調用前做一層包裝,將任務包裝成RunnableFuture
對象。
關于線程池中execute()
方法提交的流程和原理實現(xiàn)不理解的,強烈建議先學習這篇文章:Java線程池源碼深度解析。
原理實現(xiàn)
本節(jié)內容我們聚焦在submit()方法的實現(xiàn)原理。
我們先思考下,如果讓我們設計實現(xiàn)調用get阻塞知道線程返回結果,要考慮哪些方面呢?
- 任務是否執(zhí)行結束或者執(zhí)行出錯等情況,是不是需要有個狀態(tài)位標記?
- 任務的執(zhí)行結果如何保存?
- 如果任務沒有執(zhí)行結束,如何阻塞當前線程,
LockSupport.park()
是一種方式。 - 如果有多個外部線程獲取get,是不是應該也要把外部線程存下來,怎么存?因為后面任務執(zhí)行完后需要喚醒他們。
帶著這些問題和基本思路我們看下jdk8中是如何實現(xiàn)的。
RunnableFuture類介紹
submit()
方法中調用newTaskFor()
方法獲取RunnableFuture
對象。
// AbstractExecutorService#newTaskFor protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { // 調用FutureTask的構造方法返回RunnableFuture對象 return new FutureTask<T>(callable); }
FutureTask
類結構圖如下:
FutureTask
是一個異步計算任務,包裝了我們外部提交的任務。
- 實現(xiàn)了Runnable接口
- 實現(xiàn)了Future接口,該接口封裝了任務結果的獲取、任務是否結束等接口。
RunnableFuture類重要屬性
1.任務運行狀態(tài)state
// 存儲當前任務運行狀態(tài) private volatile int state; // 當前任務尚未執(zhí)行 private static final int NEW = 0; // 當前任務正在結束,尚未完全結束,一種臨界狀態(tài) private static final int COMPLETING = 1; // 當前任務正常結束 private static final int NORMAL = 2; // 當前任務執(zhí)行過程中發(fā)生了異常 private static final int EXCEPTIONAL = 3; // 當前任務被取消 private static final int CANCELLED = 4; // 當前任務中斷中 private static final int INTERRUPTING = 5; // 當前任務已中斷 private static final int INTERRUPTED = 6;
可能的狀態(tài)轉換有如下幾種:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
2.真正要執(zhí)行的任務callble
// 存放真正提交的原始任務 private Callable<V> callable;
3.存放執(zhí)行結果outcome
返回的結果或從get()中拋出的異常 private Object outcome;
4.當前正在運行任務的線程runner
//當前任務被線程執(zhí)行期間,保存當前任務的線程對象引用 private volatile Thread runner;
5.調用get獲取任務結果的等待線程集合waiters
//因為會有很多線程去get當前任務的結果,所以這里使用了一種stack數(shù)據(jù)結構來保存 private volatile WaitNode waiters; static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
數(shù)據(jù)結構如下圖:
RunnableFuture類構造方法
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); // 設置要執(zhí)行的任務 this.callable = callable; // 初始化時任務狀態(tài)為NEW this.state = NEW; }
任務執(zhí)行run()原理
submit()
方法最終調用線程池的execute()
方法,而execute()
方法會創(chuàng)建出"工人"Worker
對象,調用runWorker()
方法,它主要是執(zhí)行外部提交的任務,也就是這里的FutureTask
對象的run()
方法, 我們重點看下run()
方法。
FutureTask#run()
開始執(zhí)行任務。
它主要的功能是完成包裝的callable的call方法執(zhí)行,并將執(zhí)行結果保存到outcome中,同時捕獲了call方法執(zhí)行出現(xiàn)的異常,并保存異常信息,而不是直接拋出。
public void run() { // 狀態(tài)機不為NEW表示執(zhí)行完成或任務被取消了,直接返回 // 狀態(tài)機為NEW,同時將runner設置為當前線程,保證同一時刻只有一個線程執(zhí)行run方法,如果設置失敗也直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 取出原始的任務檢測不為空 且 再次檢查狀態(tài)為NEW(雙重校驗) if (c != null && state == NEW) { // 任務運行的結果 V result; // 任務是否運行是否正常, true:正常, false-異常 boolean ran; try { // 任務執(zhí)行,將結果返回給result result = c.call(); // 設置任務運行正常 ran = true; } catch (Throwable ex) { // 任務運行報錯的情況 // 設置結果為空 result = null; // 設置任務運行異常標記 ran = false; // 任務執(zhí)行拋出異常時,保存異常信息,而不直接拋出 setException(ex); } // 執(zhí)行成功則保存結果 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() // 執(zhí)行完成后設置runner為null runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts // 獲取任務狀態(tài) int s = state; // 如果被置為了中斷狀態(tài)則進行中斷的處理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
FutureTask#set()
方法處理正常執(zhí)行的運行結果
setException()方法主要完成做下面的工作。
- 將執(zhí)行結果保存到outcom變量中
- FutureTask的狀態(tài)從NEW修改為NORMAL
- 喚醒阻塞在waiters隊列中請求get的所有線程
protected void set(V v) { // 將狀態(tài)由NEW更新為COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保存任務的結果 outcome = v; // 更新狀態(tài)的最終狀態(tài)-NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 通用的完成操作,主要作用就是喚醒阻塞在waiters隊列中請求get的線程 finishCompletion(); } }
FutureTask#setException()
方法處理執(zhí)行異常的結果
setException()方法主要完成做下面的工作。
- 將異常信息保存到outcom變量中
- FutureTask的狀態(tài)從NEW修改為EXCEPTIONAL
- 喚醒阻塞在waiters隊列中請求get的所有線程
// FutureTask#setException protected void setException(Throwable t) { // 將狀態(tài)由NEW更新為COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 將異常信息保存到輸出結果中 outcome = t; // 更新狀態(tài)機處理異常的最終狀態(tài)-EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state // 通用的完成操作,主要作用就是喚醒阻塞在waiters隊列中請求get的線程 finishCompletion(); } }
這里的finishCompletion()
喚醒我們在后面講解,上面的整個邏輯可以用一張圖表示:
任務結果獲取get()原理
其他線程可以調用get()
方法或者超時阻塞方法get(long timeout, TimeUnit unit)
獲取任務運行的結果。
FutureTask#get()
方法是獲取任務執(zhí)行結果的入口方法。
// 阻塞獲取任務結果 public V get() throws InterruptedException, ExecutionException { int s = state; // 任務還沒有執(zhí)行完成,通過awaitDone方法進行阻塞等待 if (s <= COMPLETING) s = awaitDone(false, 0L); // 返回結果 return report(s); } // 超時阻塞獲取任務結果 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // 判空處理 if (unit == null) throw new NullPointerException(); int s = state; // 任務還沒有執(zhí)行完成,通過awaitDone方法進行阻塞等待 if (s <= COMPLETING && // 如果awaitDone返回的結果還是小于等于COMPLETING,表示運行中,那么直接拋出超時異常 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); // 返回結果 return report(s); }
FutureTask#awaitDone()
方法阻塞等待任務執(zhí)行結束
該方法主要完成下面的工作:
- 判斷任務是否運行結束,結束的話直接返回運行狀態(tài)
- 如果任務沒有結果,將請求線程阻塞
- 請求線程阻塞時,會創(chuàng)建一個waiter節(jié)點,然后加入到阻塞等待的棧中
// 線程阻塞等待方法, timed等于 true表示阻塞等待有時間限制nanos, false表示沒有,一直阻塞 private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 計算阻塞超時時間點 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; // 表示q是否添加到waiters棧中,默認false boolean queued = false; // 自旋操作 for (;;) { // 如果阻塞線程被中斷則將當前線程從阻塞隊列中移除 if (Thread.interrupted()) { // 從waiters棧中移除WaitNode, removeWaiter(q); // 返回中斷移除 throw new InterruptedException(); } // 獲取任務的狀態(tài) int s = state; // 如果任務的狀態(tài)大于COMPLETING,表示線程運行結束了,直接返回 if (s > COMPLETING) { // 任務已經完成時直接返回結果 if (q != null) q.thread = null; // 返回狀態(tài) return s; } // 如果任務狀態(tài)是COMPLETING else if (s == COMPLETING) // 如果任務執(zhí)行完成,但還差最后一步最終完成,則讓出CPU給任務執(zhí)行線程繼續(xù)執(zhí)行 Thread.yield(); // 如果任務狀態(tài)小于COMPLETING,說明任務還在運行中 // 如果q為空的情況 else if (q == null) // 新進來的線程添加等待節(jié)點 q = new WaitNode(); // 如果任務還在運行中并且當前線程節(jié)點還不在waiters棧中,那么就加入 else if (!queued) // 上一步節(jié)點創(chuàng)建完,還沒將其添加到waiters棧中,因此在下一個循環(huán)就會執(zhí)行此處進行入棧操作,并將當前線程的等待節(jié)點置于棧頂 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果任務還在運行中并且timed為true,表示有超時限制 else if (timed) { // 如果設置了阻塞超時時間,則進行檢查是否達到阻塞超時時間,達到了則刪除當前線程的等待節(jié)點并退出循環(huán)返回,否則繼續(xù)阻塞 nanos = deadline - System.nanoTime(); // 如果nanos小于等于0 if (nanos <= 0L) { // 從waiters棧中移除 removeWaiter(q); //返回狀態(tài) return state; } // 超時阻塞當前線程,超過時間,就會恢復 LockSupport.parkNanos(this, nanos); } // 如果任務還在運行中并且timed為false,沒有有超時限制 else // 一直阻塞當前線程 LockSupport.park(this); } }
FutureTask#report
方法解析返回任務結果
// 獲取任務結果方法:正常執(zhí)行則直接返回結果,否則拋出異常 private V report(int s) throws ExecutionException { Object x = outcome; // 如果狀態(tài)是正常情況 if (s == NORMAL) // 直接返回 return (V)x; // 如果狀態(tài)是取消了,拋出異常 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
FutureTask#finishCompletion()
方法用來喚醒前面等待的線程
上一步awaitDone
方法會阻塞調用的線程,那么任務運行結束總要喚醒他們去拿結果吧,這個工作就在finishCompletion()
方法中。
private void finishCompletion() { // 遍歷waiters棧中的每個元素; for (WaitNode q; (q = waiters) != null;) { // cas設置waiters中q節(jié)點數(shù)據(jù)為null,成功的話,進入到if中 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 自選操作 for (;;) { // 獲取節(jié)點中的線程 Thread t = q.thread; if (t != null) { q.thread = null; // 喚醒線程 LockSupport.unpark(t); } // 獲取下一個節(jié)點 WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //鉤子方法,有子類去實現(xiàn) done(); // 設置原來的任務callable為null callable = null; // to reduce footprint }
任務取消cancel()原理
可以調用FutureTask#cancel
方法取消任務執(zhí)行,但是要注意下面幾點:
- 任務取消時會先檢查是否允許取消,當任務已經完成或者正在完成(正常執(zhí)行并繼續(xù)處理結果 或 執(zhí)行異常處理異常結果)時不允許取消。
- cancel方法有個boolean入?yún)?,若為false,則只喚醒所有等待的線程,不中斷正在執(zhí)行的任務線程。若為true則直接中斷任務執(zhí)行線程,同時修改狀態(tài)為INTERRUPTED。
// 取消任務,參數(shù)mayInterruptIfRunning為true,會中斷運行中的線程,false不會 public boolean cancel(boolean mayInterruptIfRunning) { // 如果FutureTask的狀態(tài)不是NEW或者CAS設置失敗時,直接返回false if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // 如果參數(shù)mayInterruptIfRunning為true,中斷 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state //cas修改狀態(tài)為INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 喚醒其他等待的線程 finishCompletion(); } return true; }
cancel方法實際上完成以下兩種狀態(tài)轉換之一:
- NEW -> CANCELLED (對應于mayInterruptIfRunning=false)
- NEW -> INTERRUPTING -> INTERRUPTED (對應于mayInterruptIfRunning=true)
總結
本文講解了線程池submit()提交任務的原理實現(xiàn),通過源碼很多平時項目中遇到的坑都找到了答案。比如說之前項目中用線程池submit()方法提交任務處理,發(fā)現(xiàn)任務的異常都不見了,這下明白了,原來是通過setException()保存下來了,只有通過get方法獲取到,只有看過源碼,才會豁然開朗。
以上就是Java線程池submit阻塞獲取結果的實現(xiàn)原理詳解的詳細內容,更多關于Java線程池submit阻塞獲取結果的資料請關注腳本之家其它相關文章!
相關文章
Java基于外觀模式實現(xiàn)美食天下食譜功能實例詳解
這篇文章主要介紹了Java基于外觀模式實現(xiàn)美食天下食譜功能,較為詳細的講述了外觀模式的概念、原理并結合實例形似詳細分析了Java基于外觀模式實現(xiàn)美食天下食譜功能的具體操作步驟與相關注意事項,需要的朋友可以參考下2018-05-05mybatis 中 foreach collection的用法小結(三種)
這篇文章主要介紹了mybatis 中 foreach collection的用法小結(三種),需要的朋友可以參考下2017-10-10SpringBoot和前端聯(lián)動實現(xiàn)存儲瀏覽記錄功能
這篇文章主要介紹了SpringBoot和前端聯(lián)動實現(xiàn)存儲瀏覽記錄功能,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01