Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解
前言
Java線程池中提交任務(wù)運(yùn)行,通常使用execute()
方法就足夠了。那如果想要實(shí)現(xiàn)在主線程中阻塞獲取線程池任務(wù)運(yùn)行的結(jié)果,該怎么辦呢?答案是用submit()
方法提交任務(wù)。這也是面試中經(jīng)常被問到的一個(gè)知識(shí)點(diǎn),execute()
和submit()
提交任務(wù)的的區(qū)別是什么?底層是如何實(shí)現(xiàn)的?
案例演示
現(xiàn)在我們通過簡(jiǎn)單的例子演示下submit()方法的妙處。
@Test public void testSubmit() throws ExecutionException, InterruptedException { // 創(chuàng)建一個(gè)核心線程數(shù)為5的線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50)); // 創(chuàng)建一個(gè)計(jì)算任務(wù) Callable<Integer> myTask = new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 10000; i++) { result += i; } Thread.sleep(1000); return result; } }; log.info("start submit task ....."); Future<Integer> future = threadPoolExecutor.submit(myTask); Integer sum = future.get(); log.info("get submit result: [{}]", sum); // use sum do other things }
運(yùn)行結(jié)果:
主線程的確阻塞等待線程返回。
Future類API
我們看到用submit提交任務(wù)最后返回一個(gè)Future對(duì)象,F(xiàn)uture表示異步計(jì)算的結(jié)果。那它都提供了什么API呢?
方法 | 說明 |
---|---|
V get() | 等待任務(wù)執(zhí)行完成,然后獲取其結(jié)果。 |
V get(long timeout, TimeUnit unit) | 等待獲取任務(wù)執(zhí)行的結(jié)果,如果任務(wù)超過一定時(shí)間沒有執(zhí)行完畢,直接返回,拋出異常,不會(huì)一直等待下去。 |
boolean isDone() | 如果此任務(wù)已完成,則返回true。完成可能是由于正常終止、異?;蛉∠?mdash;—在所有這些情況下,該方法都將返回true。 |
boolean isCancelled() | 如果該任務(wù)在正常完成之前被取消,則返回true。 |
boolean cancel(boolean mayInterruptIfRunning) | 試圖取消此任務(wù)的執(zhí)行。1. 如果任務(wù)已經(jīng)完成、已經(jīng)取消或由于其他原因無法取消,則此嘗試將失敗。 |
- 如果在調(diào)用cancel時(shí)此任務(wù)尚未啟動(dòng),則此任務(wù)不應(yīng)運(yùn)行。
- 如果任務(wù)已經(jīng)開始,那么mayInterruptIfRunning參數(shù)確定是否應(yīng)該中斷執(zhí)行此任務(wù)的線程以試圖停止該任務(wù)。 |
和execute區(qū)別
從功能層面,我們已經(jīng)很明白他們最大區(qū)別,
execute()
方式提交任務(wù)沒有返回值,直接線程中池異步運(yùn)行任務(wù)。submit()
方式提交任務(wù)有返回值Future, 調(diào)用get方法可以阻塞調(diào)用線程,等待任務(wù)運(yùn)行返回的結(jié)果。
那從源碼層面,二者又有什么區(qū)別和聯(lián)系呢?
我們看下submit()
提交的入口方法,代碼如下:
// AbstractExecutorService#submit public <T> Future<T> submit(Callable<T> task) { // 判空處理 if (task == null) throw new NullPointerException(); // 將提交的任務(wù)包裝成RunnableFuture RunnableFuture<T> ftask = newTaskFor(task); // 最終還是調(diào)用execute方法執(zhí)行任務(wù) execute(ftask); return ftask; }
殊途同歸,最終都是調(diào)用execute()
方法,只不過submit()
方法在調(diào)用前做一層包裝,將任務(wù)包裝成RunnableFuture
對(duì)象。
關(guān)于線程池中execute()
方法提交的流程和原理實(shí)現(xiàn)不理解的,強(qiáng)烈建議先學(xué)習(xí)這篇文章:Java線程池源碼深度解析。
原理實(shí)現(xiàn)
本節(jié)內(nèi)容我們聚焦在submit()方法的實(shí)現(xiàn)原理。
我們先思考下,如果讓我們?cè)O(shè)計(jì)實(shí)現(xiàn)調(diào)用get阻塞知道線程返回結(jié)果,要考慮哪些方面呢?
- 任務(wù)是否執(zhí)行結(jié)束或者執(zhí)行出錯(cuò)等情況,是不是需要有個(gè)狀態(tài)位標(biāo)記?
- 任務(wù)的執(zhí)行結(jié)果如何保存?
- 如果任務(wù)沒有執(zhí)行結(jié)束,如何阻塞當(dāng)前線程,
LockSupport.park()
是一種方式。 - 如果有多個(gè)外部線程獲取get,是不是應(yīng)該也要把外部線程存下來,怎么存?因?yàn)楹竺嫒蝿?wù)執(zhí)行完后需要喚醒他們。
帶著這些問題和基本思路我們看下jdk8中是如何實(shí)現(xiàn)的。
RunnableFuture類介紹
submit()
方法中調(diào)用newTaskFor()
方法獲取RunnableFuture
對(duì)象。
// AbstractExecutorService#newTaskFor protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { // 調(diào)用FutureTask的構(gòu)造方法返回RunnableFuture對(duì)象 return new FutureTask<T>(callable); }
FutureTask
類結(jié)構(gòu)圖如下:
FutureTask
是一個(gè)異步計(jì)算任務(wù),包裝了我們外部提交的任務(wù)。
- 實(shí)現(xiàn)了Runnable接口
- 實(shí)現(xiàn)了Future接口,該接口封裝了任務(wù)結(jié)果的獲取、任務(wù)是否結(jié)束等接口。
RunnableFuture類重要屬性
1.任務(wù)運(yùn)行狀態(tài)state
// 存儲(chǔ)當(dāng)前任務(wù)運(yùn)行狀態(tài) private volatile int state; // 當(dāng)前任務(wù)尚未執(zhí)行 private static final int NEW = 0; // 當(dāng)前任務(wù)正在結(jié)束,尚未完全結(jié)束,一種臨界狀態(tài) private static final int COMPLETING = 1; // 當(dāng)前任務(wù)正常結(jié)束 private static final int NORMAL = 2; // 當(dāng)前任務(wù)執(zhí)行過程中發(fā)生了異常 private static final int EXCEPTIONAL = 3; // 當(dāng)前任務(wù)被取消 private static final int CANCELLED = 4; // 當(dāng)前任務(wù)中斷中 private static final int INTERRUPTING = 5; // 當(dāng)前任務(wù)已中斷 private static final int INTERRUPTED = 6;
可能的狀態(tài)轉(zhuǎn)換有如下幾種:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
2.真正要執(zhí)行的任務(wù)callble
// 存放真正提交的原始任務(wù) private Callable<V> callable;
3.存放執(zhí)行結(jié)果outcome
返回的結(jié)果或從get()中拋出的異常 private Object outcome;
4.當(dāng)前正在運(yùn)行任務(wù)的線程runner
//當(dāng)前任務(wù)被線程執(zhí)行期間,保存當(dāng)前任務(wù)的線程對(duì)象引用 private volatile Thread runner;
5.調(diào)用get獲取任務(wù)結(jié)果的等待線程集合waiters
//因?yàn)闀?huì)有很多線程去get當(dāng)前任務(wù)的結(jié)果,所以這里使用了一種stack數(shù)據(jù)結(jié)構(gòu)來保存 private volatile WaitNode waiters; static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
數(shù)據(jù)結(jié)構(gòu)如下圖:
RunnableFuture類構(gòu)造方法
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); // 設(shè)置要執(zhí)行的任務(wù) this.callable = callable; // 初始化時(shí)任務(wù)狀態(tài)為NEW this.state = NEW; }
任務(wù)執(zhí)行run()原理
submit()
方法最終調(diào)用線程池的execute()
方法,而execute()
方法會(huì)創(chuàng)建出"工人"Worker
對(duì)象,調(diào)用runWorker()
方法,它主要是執(zhí)行外部提交的任務(wù),也就是這里的FutureTask
對(duì)象的run()
方法, 我們重點(diǎn)看下run()
方法。
FutureTask#run()
開始執(zhí)行任務(wù)。
它主要的功能是完成包裝的callable的call方法執(zhí)行,并將執(zhí)行結(jié)果保存到outcome中,同時(shí)捕獲了call方法執(zhí)行出現(xiàn)的異常,并保存異常信息,而不是直接拋出。
public void run() { // 狀態(tài)機(jī)不為NEW表示執(zhí)行完成或任務(wù)被取消了,直接返回 // 狀態(tài)機(jī)為NEW,同時(shí)將runner設(shè)置為當(dāng)前線程,保證同一時(shí)刻只有一個(gè)線程執(zhí)行run方法,如果設(shè)置失敗也直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 取出原始的任務(wù)檢測(cè)不為空 且 再次檢查狀態(tài)為NEW(雙重校驗(yàn)) if (c != null && state == NEW) { // 任務(wù)運(yùn)行的結(jié)果 V result; // 任務(wù)是否運(yùn)行是否正常, true:正常, false-異常 boolean ran; try { // 任務(wù)執(zhí)行,將結(jié)果返回給result result = c.call(); // 設(shè)置任務(wù)運(yùn)行正常 ran = true; } catch (Throwable ex) { // 任務(wù)運(yùn)行報(bào)錯(cuò)的情況 // 設(shè)置結(jié)果為空 result = null; // 設(shè)置任務(wù)運(yùn)行異常標(biāo)記 ran = false; // 任務(wù)執(zhí)行拋出異常時(shí),保存異常信息,而不直接拋出 setException(ex); } // 執(zhí)行成功則保存結(jié)果 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() // 執(zhí)行完成后設(shè)置runner為null runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts // 獲取任務(wù)狀態(tài) int s = state; // 如果被置為了中斷狀態(tài)則進(jìn)行中斷的處理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
FutureTask#set()
方法處理正常執(zhí)行的運(yùn)行結(jié)果
setException()方法主要完成做下面的工作。
- 將執(zhí)行結(jié)果保存到outcom變量中
- FutureTask的狀態(tài)從NEW修改為NORMAL
- 喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的所有線程
protected void set(V v) { // 將狀態(tài)由NEW更新為COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保存任務(wù)的結(jié)果 outcome = v; // 更新狀態(tài)的最終狀態(tài)-NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 通用的完成操作,主要作用就是喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的線程 finishCompletion(); } }
FutureTask#setException()
方法處理執(zhí)行異常的結(jié)果
setException()方法主要完成做下面的工作。
- 將異常信息保存到outcom變量中
- FutureTask的狀態(tài)從NEW修改為EXCEPTIONAL
- 喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的所有線程
// FutureTask#setException protected void setException(Throwable t) { // 將狀態(tài)由NEW更新為COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 將異常信息保存到輸出結(jié)果中 outcome = t; // 更新狀態(tài)機(jī)處理異常的最終狀態(tài)-EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state // 通用的完成操作,主要作用就是喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的線程 finishCompletion(); } }
這里的finishCompletion()
喚醒我們?cè)诤竺嬷v解,上面的整個(gè)邏輯可以用一張圖表示:
任務(wù)結(jié)果獲取get()原理
其他線程可以調(diào)用get()
方法或者超時(shí)阻塞方法get(long timeout, TimeUnit unit)
獲取任務(wù)運(yùn)行的結(jié)果。
FutureTask#get()
方法是獲取任務(wù)執(zhí)行結(jié)果的入口方法。
// 阻塞獲取任務(wù)結(jié)果 public V get() throws InterruptedException, ExecutionException { int s = state; // 任務(wù)還沒有執(zhí)行完成,通過awaitDone方法進(jìn)行阻塞等待 if (s <= COMPLETING) s = awaitDone(false, 0L); // 返回結(jié)果 return report(s); } // 超時(shí)阻塞獲取任務(wù)結(jié)果 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // 判空處理 if (unit == null) throw new NullPointerException(); int s = state; // 任務(wù)還沒有執(zhí)行完成,通過awaitDone方法進(jìn)行阻塞等待 if (s <= COMPLETING && // 如果awaitDone返回的結(jié)果還是小于等于COMPLETING,表示運(yùn)行中,那么直接拋出超時(shí)異常 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); // 返回結(jié)果 return report(s); }
FutureTask#awaitDone()
方法阻塞等待任務(wù)執(zhí)行結(jié)束
該方法主要完成下面的工作:
- 判斷任務(wù)是否運(yùn)行結(jié)束,結(jié)束的話直接返回運(yùn)行狀態(tài)
- 如果任務(wù)沒有結(jié)果,將請(qǐng)求線程阻塞
- 請(qǐng)求線程阻塞時(shí),會(huì)創(chuàng)建一個(gè)waiter節(jié)點(diǎn),然后加入到阻塞等待的棧中
// 線程阻塞等待方法, timed等于 true表示阻塞等待有時(shí)間限制nanos, false表示沒有,一直阻塞 private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 計(jì)算阻塞超時(shí)時(shí)間點(diǎn) final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; // 表示q是否添加到waiters棧中,默認(rèn)false boolean queued = false; // 自旋操作 for (;;) { // 如果阻塞線程被中斷則將當(dāng)前線程從阻塞隊(duì)列中移除 if (Thread.interrupted()) { // 從waiters棧中移除WaitNode, removeWaiter(q); // 返回中斷移除 throw new InterruptedException(); } // 獲取任務(wù)的狀態(tài) int s = state; // 如果任務(wù)的狀態(tài)大于COMPLETING,表示線程運(yùn)行結(jié)束了,直接返回 if (s > COMPLETING) { // 任務(wù)已經(jīng)完成時(shí)直接返回結(jié)果 if (q != null) q.thread = null; // 返回狀態(tài) return s; } // 如果任務(wù)狀態(tài)是COMPLETING else if (s == COMPLETING) // 如果任務(wù)執(zhí)行完成,但還差最后一步最終完成,則讓出CPU給任務(wù)執(zhí)行線程繼續(xù)執(zhí)行 Thread.yield(); // 如果任務(wù)狀態(tài)小于COMPLETING,說明任務(wù)還在運(yùn)行中 // 如果q為空的情況 else if (q == null) // 新進(jìn)來的線程添加等待節(jié)點(diǎn) q = new WaitNode(); // 如果任務(wù)還在運(yùn)行中并且當(dāng)前線程節(jié)點(diǎn)還不在waiters棧中,那么就加入 else if (!queued) // 上一步節(jié)點(diǎn)創(chuàng)建完,還沒將其添加到waiters棧中,因此在下一個(gè)循環(huán)就會(huì)執(zhí)行此處進(jìn)行入棧操作,并將當(dāng)前線程的等待節(jié)點(diǎn)置于棧頂 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果任務(wù)還在運(yùn)行中并且timed為true,表示有超時(shí)限制 else if (timed) { // 如果設(shè)置了阻塞超時(shí)時(shí)間,則進(jìn)行檢查是否達(dá)到阻塞超時(shí)時(shí)間,達(dá)到了則刪除當(dāng)前線程的等待節(jié)點(diǎn)并退出循環(huán)返回,否則繼續(xù)阻塞 nanos = deadline - System.nanoTime(); // 如果nanos小于等于0 if (nanos <= 0L) { // 從waiters棧中移除 removeWaiter(q); //返回狀態(tài) return state; } // 超時(shí)阻塞當(dāng)前線程,超過時(shí)間,就會(huì)恢復(fù) LockSupport.parkNanos(this, nanos); } // 如果任務(wù)還在運(yùn)行中并且timed為false,沒有有超時(shí)限制 else // 一直阻塞當(dāng)前線程 LockSupport.park(this); } }
FutureTask#report
方法解析返回任務(wù)結(jié)果
// 獲取任務(wù)結(jié)果方法:正常執(zhí)行則直接返回結(jié)果,否則拋出異常 private V report(int s) throws ExecutionException { Object x = outcome; // 如果狀態(tài)是正常情況 if (s == NORMAL) // 直接返回 return (V)x; // 如果狀態(tài)是取消了,拋出異常 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
FutureTask#finishCompletion()
方法用來喚醒前面等待的線程
上一步awaitDone
方法會(huì)阻塞調(diào)用的線程,那么任務(wù)運(yùn)行結(jié)束總要喚醒他們?nèi)ツ媒Y(jié)果吧,這個(gè)工作就在finishCompletion()
方法中。
private void finishCompletion() { // 遍歷waiters棧中的每個(gè)元素; for (WaitNode q; (q = waiters) != null;) { // cas設(shè)置waiters中q節(jié)點(diǎn)數(shù)據(jù)為null,成功的話,進(jìn)入到if中 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 自選操作 for (;;) { // 獲取節(jié)點(diǎn)中的線程 Thread t = q.thread; if (t != null) { q.thread = null; // 喚醒線程 LockSupport.unpark(t); } // 獲取下一個(gè)節(jié)點(diǎn) WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //鉤子方法,有子類去實(shí)現(xiàn) done(); // 設(shè)置原來的任務(wù)callable為null callable = null; // to reduce footprint }
任務(wù)取消cancel()原理
可以調(diào)用FutureTask#cancel
方法取消任務(wù)執(zhí)行,但是要注意下面幾點(diǎn):
- 任務(wù)取消時(shí)會(huì)先檢查是否允許取消,當(dāng)任務(wù)已經(jīng)完成或者正在完成(正常執(zhí)行并繼續(xù)處理結(jié)果 或 執(zhí)行異常處理異常結(jié)果)時(shí)不允許取消。
- cancel方法有個(gè)boolean入?yún)?,若為false,則只喚醒所有等待的線程,不中斷正在執(zhí)行的任務(wù)線程。若為true則直接中斷任務(wù)執(zhí)行線程,同時(shí)修改狀態(tài)為INTERRUPTED。
// 取消任務(wù),參數(shù)mayInterruptIfRunning為true,會(huì)中斷運(yùn)行中的線程,false不會(huì) public boolean cancel(boolean mayInterruptIfRunning) { // 如果FutureTask的狀態(tài)不是NEW或者CAS設(shè)置失敗時(shí),直接返回false if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // 如果參數(shù)mayInterruptIfRunning為true,中斷 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state //cas修改狀態(tài)為INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 喚醒其他等待的線程 finishCompletion(); } return true; }
cancel方法實(shí)際上完成以下兩種狀態(tài)轉(zhuǎn)換之一:
- NEW -> CANCELLED (對(duì)應(yīng)于mayInterruptIfRunning=false)
- NEW -> INTERRUPTING -> INTERRUPTED (對(duì)應(yīng)于mayInterruptIfRunning=true)
總結(jié)
本文講解了線程池submit()提交任務(wù)的原理實(shí)現(xiàn),通過源碼很多平時(shí)項(xiàng)目中遇到的坑都找到了答案。比如說之前項(xiàng)目中用線程池submit()方法提交任務(wù)處理,發(fā)現(xiàn)任務(wù)的異常都不見了,這下明白了,原來是通過setException()保存下來了,只有通過get方法獲取到,只有看過源碼,才會(huì)豁然開朗。
以上就是Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解的詳細(xì)內(nèi)容,更多關(guān)于Java線程池submit阻塞獲取結(jié)果的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能實(shí)例詳解
這篇文章主要介紹了Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能,較為詳細(xì)的講述了外觀模式的概念、原理并結(jié)合實(shí)例形似詳細(xì)分析了Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能的具體操作步驟與相關(guān)注意事項(xiàng),需要的朋友可以參考下2018-05-05mybatis 中 foreach collection的用法小結(jié)(三種)
這篇文章主要介紹了mybatis 中 foreach collection的用法小結(jié)(三種),需要的朋友可以參考下2017-10-10Java手寫Redis服務(wù)端的實(shí)現(xiàn)
本文主要介紹了Java手寫Redis服務(wù)端的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12SpringBoot和前端聯(lián)動(dòng)實(shí)現(xiàn)存儲(chǔ)瀏覽記錄功能
這篇文章主要介紹了SpringBoot和前端聯(lián)動(dòng)實(shí)現(xiàn)存儲(chǔ)瀏覽記錄功能,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01