Java線程池中的Future實現(xiàn)詳解
一 線程池的提交方式
在向線程池提交線程的時候,有兩個方法, 一個是executor(Runnable runnable)方法, 另一個是submit(Runnable runnable)方法,
Future<Integer> result = executor.submit(task); //有返回值 executor.executor(task) //沒有返回值
二 Runnable、Callable、FutureTask的使用
(1) Runnable是一個任務(wù)的概念,這個任務(wù)沒有返回值
(2) Callable也是一個任務(wù),這個任務(wù)有返回值, 不能單獨(dú)使用,必須配合FutureTask一起使用
(3) FutureTask是一個任務(wù),FutureTask繼承了Runnable、Callable, 通過FutureTask可以獲取到任務(wù)執(zhí)行的狀態(tài)
(4) FutureTask任務(wù)執(zhí)行完成完成后,將結(jié)構(gòu)通過Future接口返回,調(diào)用者可以調(diào)用Future#get()方法獲取到數(shù)據(jù)
public class Test { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); Future<Integer> result = executor.submit(task); executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主線程在執(zhí)行任務(wù)"); try { System.out.println("task運(yùn)行結(jié)果"+result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任務(wù)執(zhí)行完畢"); } } class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子線程在進(jìn)行計算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
三 FutureTask的實現(xiàn)
3.1 字段屬性
(1) 任務(wù)的狀態(tài):
(2) 具體執(zhí)行的任務(wù)和返回值、執(zhí)行的線程、等待結(jié)果的線程隊列
3.2 構(gòu)造方法
構(gòu)造方法的主要作用是將Runnable、Callable轉(zhuǎn)換為FutureTask
3.3 任務(wù)的執(zhí)行
任務(wù)執(zhí)行
FutureTask#run() 方法
public void run() { // 如果狀態(tài)不是 NEW,說明任務(wù)已經(jīng)執(zhí)行過或者已經(jīng)被取消,直接返回 // 如果狀態(tài)是 NEW,則嘗試把執(zhí)行線程保存在 runnerOffset(runner字段),如果賦值失敗,則直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 獲取構(gòu)造函數(shù)傳入的 Callable 值 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 正常調(diào)用 Callable 的 call 方法就可以獲取到返回值 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 保存 call 方法拋出的異常 setException(ex); } if (ran) // 保存 call 方法的執(zhí)行結(jié)果 set(result); } } finally { runner = null; int s = state; // 如果任務(wù)被中斷,則執(zhí)行中斷處理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
FutureTask中保存了Callable,在run()方法中調(diào)用Callable.call()方法就可以獲取到返回值,然后通過 set(result) 保存正常程序運(yùn)行結(jié)果,或通過 setException(ex) 保存程序異常信息
FutureTask#設(shè)置返回值方法
** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes // 保存異常結(jié)果 protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } // 保存正常結(jié)果 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
setException() 和 set ()方法非常相似,都是將異?;蛘呓Y(jié)果保存在 Object 類型的 outcome 變量中,outcome 是成員變量,就要考慮線程安全,所以他們要通過 CAS方式設(shè)置 outcome 變量的值,既然是在 CAS 成功后 更改 outcome 的值,這也就是 outcome 沒有被 volatile 修飾的原因所在。
FutureTask#finishCompletion() 方法
這個方法是喚醒關(guān)注執(zhí)行結(jié)果的線程,通知他們?nèi)カ@取任務(wù)的執(zhí)行結(jié)果
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // 喚醒等待隊列中的線程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
3.4 任務(wù)的獲取
get()方法
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果 state 還沒到 set outcome 結(jié)果的時候,則調(diào)用 awaitDone() 方法阻塞自己 if (s <= COMPLETING) s = awaitDone(false, 0L); // 返回結(jié)果 return report(s); }
awaitDone(boolean timed, long nanos)阻塞線程并且加入阻塞隊列中
// get 方法支持超時限制,如果沒有傳入超時時間,則接受的參數(shù)是 false 和 0L // 有等待就會有隊列排隊或者可響應(yīng)中斷,從方法簽名上看有 InterruptedException,說明該方法這是可以被中斷的 private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 計算等待截止時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 如果當(dāng)前線程被中斷,如果是,則在等待對立中刪除該節(jié)點(diǎn),并拋出 InterruptedException if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 狀態(tài)大于 COMPLETING 說明已經(jīng)達(dá)到某個最終狀態(tài)(正常結(jié)束/異常結(jié)束/取消) // 把 thread 只為空,并返回結(jié)果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果是COMPLETING 狀態(tài)(中間狀態(tài)),表示任務(wù)已結(jié)束,但 outcome 賦值還沒結(jié)束,這時主動讓出執(zhí)行權(quán),讓其他線程優(yōu)先執(zhí)行(只是發(fā)出這個信號,至于是否別的線程執(zhí)行一定會執(zhí)行可是不一定的) else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 等待節(jié)點(diǎn)為空 else if (q == null) // 將當(dāng)前線程構(gòu)造節(jié)點(diǎn) q = new WaitNode(); // 如果還沒有入隊列,則把當(dāng)前節(jié)點(diǎn)加入waiters首節(jié)點(diǎn)并替換原來waiters else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果設(shè)置超時時間 else if (timed) { nanos = deadline - System.nanoTime(); // 時間到,則不再等待結(jié)果 if (nanos <= 0L) { removeWaiter(q); return state; } // 阻塞等待特定時間 LockSupport.parkNanos(this, nanos); } else // 掛起當(dāng)前線程,知道被其他線程喚醒 LockSupport.park(this); } }
進(jìn)入這個方法會經(jīng)歷三個循環(huán):
- 第一輪for循環(huán),執(zhí)行的是q == null邏輯,這個時候會創(chuàng)建一個WaitNode節(jié)點(diǎn)
- 第二輪for循環(huán),執(zhí)行的是!queued,這個時候會把第一輪循環(huán)生成的節(jié)點(diǎn)的next執(zhí)行waiters,然后通過CAS的把節(jié)點(diǎn)q替換waiters,如果替換成功,第二輪結(jié)束
- 第三輪for循環(huán),執(zhí)行阻塞,或者阻塞特定的時間設(shè)置返回結(jié)果(set方法)/異常的方法(setException)兩個方法后,調(diào)用finishCompletion方法,這個方法會喚醒等待隊列中的線程.
3.5 任務(wù)的取消
將一個任務(wù)修改為終態(tài)的只有三種方法: set()方法 setException()方法 cancel 方法
查看 Future cancel(),該方法注釋上明確說明三種 cancel 操作一定失敗的情形
- 任務(wù)已經(jīng)執(zhí)行完成了
- 任務(wù)已經(jīng)被取消過了
- 任務(wù)因為某種原因不能被取消
其它情況下,cancel操作將返回true。值得注意的是,cancel操作返回 true 并不代表任務(wù)真的就是被取消, 這取決于發(fā)動cancel狀態(tài)時,任務(wù)所處的狀態(tài)
- 如果發(fā)起cancel時任務(wù)還沒有開始運(yùn)行,則隨后任務(wù)就不會被執(zhí)行;
- 如果發(fā)起cancel時任務(wù)已經(jīng)在運(yùn)行了,則這時就需要看 mayInterruptIfRunning 參數(shù)了:
如果mayInterruptIfRunning 為true, 則當(dāng)前在執(zhí)行的任務(wù)會被中斷
如果mayInterruptIfRunning 為false, 則可以允許正在執(zhí)行的任務(wù)繼續(xù)運(yùn)行,直到它執(zhí)行完
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // 需要中斷任務(wù)執(zhí)行線程 if (mayInterruptIfRunning) { try { Thread t = runner; // 中斷線程 if (t != null) t.interrupt(); } finally { // final state // 修改為最終狀態(tài) INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 喚醒等待中的線程 finishCompletion(); } return true; }
到此這篇關(guān)于Java線程池中的Future實現(xiàn)詳解的文章就介紹到這了,更多相關(guān)Java的Future實現(xiàn)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot集成SpringSecurity和JWT做登陸鑒權(quán)的實現(xiàn)
這篇文章主要介紹了SpringBoot集成SpringSecurity和JWT做登陸鑒權(quán)的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04深入解析Java中的編碼轉(zhuǎn)換以及編碼和解碼操作
這篇文章主要介紹了Java中的編碼轉(zhuǎn)換以及編碼和解碼操作,文中詳細(xì)解讀了編碼解碼的相關(guān)IO操作以及內(nèi)存使用方面的知識,需要的朋友可以參考下2016-02-02SpringBoot優(yōu)化連接數(shù)的方法詳解
SpringBoot開發(fā)最大的好處是簡化配置,內(nèi)置了Tomcat,下面這篇文章主要給大家介紹了關(guān)于SpringBoot優(yōu)化連接數(shù)的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-06-06mybatis?<foreach>標(biāo)簽動態(tài)增刪改查方式
這篇文章主要介紹了mybatis?<foreach>標(biāo)簽動態(tài)增刪改查方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03java反射機(jī)制及beanUtils的實現(xiàn)原理分析
本文介紹了Java的反射機(jī)制、VO、DTO、PO的概念以及BeanUtils的實現(xiàn)原理和簡單示例,通過反射可以在運(yùn)行時動態(tài)操作類、方法和字段,BeanUtils用于在不同bean之間進(jìn)行屬性復(fù)制2024-12-12JDK9為何要將String的底層實現(xiàn)由char[]改成了byte[]
String 類的源碼已經(jīng)由?char[]?優(yōu)化為了?byte[]?來存儲字符串內(nèi)容,為什么要這樣做呢?本文就詳細(xì)的介紹一下,感興趣的可以了解一下2022-03-03詳解如何為SpringBoot項目中的自定義配置添加IDE支持
這篇文章主要介紹了詳解如何為SpringBoot項目中的自定義配置添加IDE支持,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02