Java中的FutureTask源碼解析
一、簡(jiǎn)介
1、FutureTask是一個(gè)可取消的異步計(jì)算。這個(gè)類是Future的實(shí)現(xiàn)類,有開(kāi)始和取消一個(gè)計(jì)算的方法,如果一個(gè)計(jì)算已經(jīng)完成可以查看結(jié)果。如果在計(jì)算沒(méi)有完成的情況下調(diào)用get獲取計(jì)算結(jié)果會(huì)阻塞。且一旦任務(wù)完成后,計(jì)算不能重新開(kāi)始或被取消,除非計(jì)算被runAndReset調(diào)用執(zhí)行。
2、FutureTask被用來(lái)去封裝一個(gè)Callable或者Runnable,一個(gè)FutureTask能夠被submit作為一個(gè)Executor
3、FutureTask 的線程安全由CAS來(lái)保證。
二、源碼分析
1、成員屬性
public class FutureTask<V> implements RunnableFuture<V> { //state表示的任務(wù)的狀態(tài) private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; //任務(wù) private Callable<V> callable; //存儲(chǔ)任務(wù)完成以后的結(jié)果 private Object outcome; //執(zhí)行當(dāng)前任務(wù)的線程 private volatile Thread runner; //執(zhí)行當(dāng)前任務(wù)被阻塞的線程 private volatile WaitNode waiters; }
可能有的狀態(tài)轉(zhuǎn)換:
NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
注意:state用volatile修飾的,如果在多線程并發(fā)的情況下,某一個(gè)線程改變了任務(wù)的狀態(tài),其他線程都能夠立馬知道,保證了state字段的可見(jiàn)性。
2、構(gòu)造函數(shù)
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
很好的詮釋了FutureTask封裝了Runnable或Callable,構(gòu)造完成后將任務(wù)的狀態(tài)變?yōu)镹EW。同時(shí)注意,封裝Runnable時(shí)用的Executors的靜態(tài)方法callable
順帶看下Executors.callable()這個(gè)方法,這個(gè)方法的功能是把Runnable轉(zhuǎn)換成Callable,代碼如下:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
所以,F(xiàn)utureTask封裝Runnable使用了適配器模式的設(shè)計(jì)模式
3、核心方法
//運(yùn)行任務(wù)的方法 public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; //得到當(dāng)前任務(wù) if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); //當(dāng)前任務(wù)調(diào)用call方法,執(zhí)行,同時(shí),執(zhí)行完后將結(jié)果返回 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) //表示任務(wù)執(zhí)行成功 set(result); //CAS改變?nèi)蝿?wù)的狀態(tài)從NEW->COMPLETING->NORMAL,同時(shí)將任務(wù)返回的結(jié)果保存到outcome屬性中,再移除并喚醒所有等待線程 } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //將任務(wù)成功執(zhí)行完后返回的結(jié)果保存到outcome中 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終的狀態(tài),表示任務(wù)結(jié)束 finishCompletion(); //移除并喚醒所有等待線程 } } //該方法用于移除并喚醒所有等待線程 private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); //喚醒 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; } public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); //打斷 } finally { // 設(shè)置成為最終態(tài)INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); //移除并喚醒所有等待線程 } return true; } public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); //如果任務(wù)沒(méi)有完成或者其他的問(wèn)題,將阻塞;創(chuàng)建一個(gè)新節(jié)點(diǎn)存入阻塞棧中 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
三、示例
常用使用方式:
- 第一種方式: Future + ExecutorService
- 第二種方式: FutureTask + ExecutorService
- 第三種方式: FutureTask + Thread
第一種方式:Future + ExecutorService
public class FutureDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); Future future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { Long start = System.currentTimeMillis(); while (true) { Long current = System.currentTimeMillis(); if ((current - start) > 1000) { return 1; } } } }); try { Integer result = (Integer)future.get(); System.out.println(result); }catch (Exception e){ e.printStackTrace(); } } }
第二種方式:FutureTask + ExecutorService
ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask);
第三種方式:FutureTask + Thread
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task()); Thread thread = new Thread(futureTask); thread.setName("Task thread"); thread.start();
四、總結(jié)
1、FutureTask用來(lái)封裝Runnable或者Callable接口,可以當(dāng)成一個(gè)任務(wù)。
2、在Java并發(fā)程序中FutureTask表示一個(gè)可以取消的異步運(yùn)算。它有啟動(dòng)和取消運(yùn)算、查詢運(yùn)算是否完成和取回運(yùn)算結(jié)果等方法。只有當(dāng)運(yùn)算完成的時(shí)候結(jié)果才能取回,如果運(yùn)算尚未完成get方法將會(huì)阻塞。一個(gè)FutureTask對(duì)象可以對(duì)調(diào)用了Callable和Runnable的對(duì)象進(jìn)行包裝,由于FutureTask也是調(diào)用了Runnable接口所以它可以提交給Executor來(lái)執(zhí)行。
3、FutureTask可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場(chǎng)景,通過(guò)傳入Runnable或者Callable的任務(wù)給FutureTask,直接調(diào)用其run方法或者放入線程池執(zhí)行,之后可以在外部通過(guò)FutureTask的get方法異步獲取執(zhí)行結(jié)果,因此,F(xiàn)utureTask非常適合用于耗時(shí)的計(jì)算,主線程可以在完成自己的任務(wù)后,再去獲取結(jié)果。另外,F(xiàn)utureTask還可以確保即使調(diào)用了多次run方法,它都只會(huì)執(zhí)行一次Runnable或者Callable任務(wù),或者通過(guò)cancel取消FutureTask的執(zhí)行等。
4、FutureTask間接繼承了Runnable和Callable
5、FutureTask的線程安全由CAS操作來(lái)保證
6、FutureTask結(jié)果返回機(jī)制 :只有任務(wù)成功執(zhí)行完成后,通過(guò)get方法能夠得到任務(wù)返回的結(jié)果,其他情況都會(huì)導(dǎo)致阻塞。
到此這篇關(guān)于Java中的FutureTask源碼解析的文章就介紹到這了,更多相關(guān)FutureTask源碼解析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JAVA刪除字符串固定下標(biāo)字串的實(shí)現(xiàn)
本文主要介紹了JAVA刪除字符串固定下標(biāo)字串的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04mybatis中的if?test判斷入?yún)⒌闹祮?wèn)題
這篇文章主要介紹了mybatis中的if?test判斷入?yún)⒌闹祮?wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06在CentOS上安裝Java 17并實(shí)現(xiàn)多版本共存的詳細(xì)教程
在現(xiàn)代軟件開(kāi)發(fā)中,Java 作為一種廣泛使用的編程語(yǔ)言,其版本更新頻繁,不同項(xiàng)目可能依賴不同版本的 Java 運(yùn)行環(huán)境,CentOS 作為一款流行的 Linux 發(fā)行版,常被用于服務(wù)器部署和開(kāi)發(fā)環(huán)境,本文將詳細(xì)介紹如何在 CentOS 上安裝 Java 17,并實(shí)現(xiàn)與現(xiàn)有 Java 8 的多版本共存2025-03-03SpringKafka消息發(fā)布之KafkaTemplate與事務(wù)支持功能
通過(guò)本文介紹的基本用法、序列化選項(xiàng)、事務(wù)支持、錯(cuò)誤處理和性能優(yōu)化技術(shù),開(kāi)發(fā)者可以構(gòu)建高效可靠的Kafka消息發(fā)布系統(tǒng),事務(wù)支持特性尤為重要,它確保了在分布式環(huán)境中的數(shù)據(jù)一致性,感興趣的朋友一起看看吧2025-04-04java后臺(tái)如何接收get請(qǐng)求傳過(guò)來(lái)的數(shù)組
這篇文章主要介紹了java后臺(tái)如何接收get請(qǐng)求傳過(guò)來(lái)的數(shù)組問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11