解析Java異步之call future
一、概述
我們大家都知道,在 Java 中創(chuàng)建線(xiàn)程主要有三種方式:
- 繼承 Thread 類(lèi);
- 實(shí)現(xiàn) Runnable 接口;
- 實(shí)現(xiàn) Callable 接口。
而后兩者的區(qū)別在于 Callable 接口中的 call()
方法可以異步地返回一個(gè)計(jì)算結(jié)果 Future,并且一般需要配合ExecutorService 來(lái)執(zhí)行。這一套操作在代碼實(shí)現(xiàn)上似乎也并不難,可是對(duì)于call()方法具體怎么(被ExecutorService)執(zhí)行的,以及 Future 這個(gè)結(jié)果是怎么獲取的,卻又不是很清楚了。
那么本篇文章,我們就一起來(lái)學(xué)習(xí)下 Callable 接口以及 Future 的使用,主要面向兩個(gè)問(wèn)題:
- 承載著具體任務(wù)的 call() 方法如何被執(zhí)行的?
- 任務(wù)的執(zhí)行結(jié)果如何得到?
你可能會(huì)說(shuō),這兩個(gè)難道不是一個(gè)問(wèn)題嗎?任務(wù)執(zhí)行了就會(huì)有返回結(jié)果,而返回結(jié)果也一定是任務(wù)執(zhí)行了才返回的,難道還能返回一個(gè)其他任務(wù)的結(jié)果么??不要著急,耐心的看下去,你就會(huì)發(fā)現(xiàn),這兩個(gè)還真的就是一個(gè)問(wèn)題。
本文將分為兩個(gè)部分,第一部分分別介紹 任務(wù)
、執(zhí)行
、以及結(jié)果
這三個(gè)概念在 Java API 中的實(shí)體和各自的繼承關(guān)系,第二部分通過(guò)一個(gè)簡(jiǎn)單的例子回顧他們的用法,再理解下這兩個(gè)問(wèn)題的答案。
二、Callable、Executor 與 Future
既然是一個(gè)任務(wù)被執(zhí)行并返回結(jié)果,那么我們先來(lái)看看具體的任務(wù),也就是 Callable 接口。
2.1、任務(wù):Callable
非常簡(jiǎn)單,只包含一個(gè)有泛型返回值的 call() 方法,需要在最后返回定義類(lèi)型的結(jié)果。如果任務(wù)沒(méi)有需要返回的結(jié)果,那么將泛型 V 設(shè)為 void 并return null;
就可以了。對(duì)比的是 Runnable,另一個(gè)明顯的區(qū)別則是 Callable可以?huà)伋霎惓!?/p>
public interface Callable<V> { V call() throws Exception; } public interface Runnable { public abstract void run(); }
2.2、執(zhí)行:ExecutorService
說(shuō)到線(xiàn)程就少不了線(xiàn)程池,而說(shuō)到線(xiàn)程池肯定離不開(kāi) Executor
接口。下面這幅圖是 Executor 的框架,我們常用的是其中的兩個(gè)具體實(shí)現(xiàn)類(lèi) ThreadPoolExecutor 以及 ScheduledThreadPoolExecutor,在 Executors 類(lèi)中通過(guò)靜態(tài)方法獲取。Executors 中包含了線(xiàn)程池以及線(xiàn)程工廠(chǎng)的構(gòu)造,與 Executor 接口的關(guān)系類(lèi)似于 Collection 接口和 Collections 類(lèi)的關(guān)系。
那么我們自頂向下,從源碼上了解一下 Executor 框架,學(xué)習(xí)學(xué)習(xí)任務(wù)是如何被執(zhí)行的。首先是 Executor 接口,其中只定義了 execute() 方法。
public interface Executor { void execute(Runnable command); }
ExecutorService 接口繼承了 Executor 接口,主要擴(kuò)展了一系列的 submit() 方法以及對(duì) executor 的終止和判斷狀態(tài)。以第一個(gè)<T> Future<T> submit(Callable<T> task);
為例,其中 task 為用戶(hù)定義的執(zhí)行的異步任務(wù),F(xiàn)uture 表示了任務(wù)的執(zhí)行結(jié)果,泛型 T 代表任務(wù)結(jié)果的類(lèi)型。
public interface ExecutorService extends Executor { void shutdown(); // 現(xiàn)有任務(wù)完成后停止線(xiàn)程池 List<Runnable> shutdownNow(); // 立即停止線(xiàn)程池 boolean isShutdown(); // 判斷是否已停止 boolean isTerminated(); <T> Future<T> submit(Callable<T> task); // 提交Callale任務(wù) <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); // 針對(duì)Callable集合的invokeAll()等方法 }
抽象類(lèi)AbstractExecutorService
是 ThreadPoolExecutor
的基類(lèi),在下面的代碼中,它實(shí)現(xiàn)了ExecutorService 接口中的 submit() 方法。注釋中是對(duì)應(yīng)的 newTaskFor() 方法的代碼,非常簡(jiǎn)單,就是將傳入的Callable 或 Runnable 參數(shù)封裝成一個(gè) FutureTask 對(duì)象。
// 1.第一個(gè)重載方法,參數(shù)為Callable public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); // return new FutureTask<T>(callable); execute(ftask); return ftask; } // 2.第二個(gè)重載方法,參數(shù)為Runnable public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); // return new FutureTask<T>(task, null); execute(ftask); return ftask; } // 3.第三個(gè)重載方法,參數(shù)為Runnable + 返回對(duì)象 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); // return new FutureTask<T>(task, result); execute(ftask); return ftask; }
那么也就是說(shuō),無(wú)論傳入的是 Callable 還是 Runnable,submit() 方法其實(shí)就做了三件事
具體來(lái)說(shuō),submit() 中首先生成了一個(gè) RunnableFuture 引用的 FutureTask 實(shí)例,然后調(diào)用 execute() 方法來(lái)執(zhí)行它,那么我們可以推測(cè) FutureTask 繼承自 RunnableFuture,而 RunnableFuture 又實(shí)現(xiàn)了 Runnable,因?yàn)閑xecute() 的參數(shù)應(yīng)為 Runnable 類(lèi)型。上面還涉及到了 FutureTask 的構(gòu)造函數(shù),也來(lái)看一下。
public FutureTask(Callable<V> callable) { this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); // 通過(guò)適配器將runnable在call()中執(zhí)行并返回result this.state = NEW; }
FutureTask 共有兩個(gè)構(gòu)造方法。第一個(gè)構(gòu)造方法比較簡(jiǎn)單,對(duì)應(yīng)上面的第一個(gè) submit(),采用組合的方式封裝Callable 并將狀態(tài)設(shè)為NEW
;而第二個(gè)構(gòu)造方法對(duì)應(yīng)上面的后兩個(gè) submit() 重載,不同之處是首先使用了Executors.callable
來(lái)將 Runnable 和 result 組合成 Callable,這里采用了適配器RunnableAdapter implements Callable
,巧妙地在 call() 中執(zhí)行 Runnable 并返回結(jié)果。
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; // 返回的結(jié)果;顯然:需要在run()中賦值 RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
在適配器設(shè)計(jì)模式中,通常包含**目標(biāo)接口 Target、適配器 Adapter 和被適配者 Adaptee **三類(lèi)角色,其中目標(biāo)接口代表客戶(hù)端(當(dāng)前業(yè)務(wù)系統(tǒng))所需要的功能,通常為借口或抽象類(lèi);被適配者為現(xiàn)存的不能滿(mǎn)足使用需求的類(lèi);適配器是一個(gè)轉(zhuǎn)換器,也稱(chēng) wrapper,用于給被適配者添加目標(biāo)功能,使得客戶(hù)端可以按照目標(biāo)接口的格式正確訪(fǎng)問(wèn)。對(duì)于 RunnableAdapter 來(lái)說(shuō),Callable 是其目標(biāo)接口,而 Runnable 則是被適配者。RunnableAdapter 通過(guò)覆蓋 call() 方法使其可按照 Callable 的要求來(lái)使用,同時(shí)其構(gòu)造方法中接收被適配者和目標(biāo)對(duì)象,滿(mǎn)足了 call() 方法有返回值的要求。
那么總結(jié)一下 submit() 方法執(zhí)行的流程,就是:Callable 被封裝在 Runnable 的子類(lèi)中傳入 execute() 得以執(zhí)行。
2.3、結(jié)果:Future
要說(shuō) Future 就是異步任務(wù)的執(zhí)行結(jié)果其實(shí)并不準(zhǔn)確,因?yàn)樗砹艘粋€(gè)任務(wù)的執(zhí)行過(guò)程,有狀態(tài)、可以被取消,而 get() 方法的返回值才是任務(wù)的結(jié)果。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
我們?cè)谏厦嬷羞€提到了 RuunableFuture 和 FutureTask。從官方的注釋來(lái)看,RuunableFuture 就是一個(gè)可以 run的 future,實(shí)現(xiàn)了 Runnable 和 Future 兩個(gè)接口,在 run() 方法中執(zhí)行完計(jì)算時(shí)應(yīng)該將結(jié)果保存起來(lái)以便通過(guò) get()獲取。
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation unless it has been cancelled. */ void run(); }
FutureTask 直接實(shí)現(xiàn)了 RunnableFuture 接口,作為執(zhí)行過(guò)程,共有下面這幾種狀態(tài),其中 COMPLETING 為一個(gè)暫時(shí)狀態(tài),表示正在設(shè)置結(jié)果或異常,對(duì)應(yīng)的,設(shè)置完成后狀態(tài)變?yōu)?NORMAL 或 EXCEPTIONAL;CANCELLED、INTERRUPTED 表示任務(wù)被取消或中斷。在上面的構(gòu)造方法中,將 state 初始化為 NEW。
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
然后是 FutureTask 的主要內(nèi)容,主要是 run() 和 get()。注意 outcome 的注釋?zhuān)瑹o(wú)論是否發(fā)生異常返回的都是這個(gè) outcome,因?yàn)樵趫?zhí)行中如果執(zhí)行成功就將結(jié)果設(shè)置給了它(set()
),而發(fā)生異常時(shí)將異常賦給了他(setException()
),而在獲取結(jié)果時(shí)也都返回了 outcome(通過(guò)report()
)。
public class FutureTask<V> implements RunnableFuture<V> { private Callable<V> callable; // target,待執(zhí)行的任務(wù) /** 保存執(zhí)行結(jié)果或異常,在get()方法中返回/拋出 */ private Object outcome; // 非volatile,通過(guò)CAS保證線(xiàn)程安全 public void run() { ...... Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); // 調(diào)用call()執(zhí)行用戶(hù)任務(wù)并獲取結(jié)果 ran = true; // 執(zhí)行完成,ran置為true } catch (Throwable ex) { // 調(diào)用call()出現(xiàn)異常,而run()方法繼續(xù)執(zhí)行 result = null; ran = false; setException(ex); // setException(Throwable t): compareAndSwapInt(NEW, COMPLETING); outcome = t; } if (ran) set(result); // set(V v): compareAndSwapInt(NEW, COMPLETING); outcome = v; } } public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 加入隊(duì)列等待COMPLETING完成,可響應(yīng)超時(shí)、中斷 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // 超時(shí)等待 } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) // 將outcome作為執(zhí)行結(jié)果返回 return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); // 將outcome作為捕獲的返回 } }
FutureTask 實(shí)現(xiàn)了 RunnableFuture 接口,所以有兩方面的作用。
- 第一,作為 Runnable 傳入 execute() 方法來(lái)執(zhí)行,同時(shí)封裝 Callable 對(duì)象并在 run() 中調(diào)用其 call() 方法;
- 第二,作為 Future 管理任務(wù)的執(zhí)行狀態(tài),將 call() 的返回值保存在 outcome 中以通過(guò) get() 獲取。這似乎就能回答開(kāi)頭的兩個(gè)問(wèn)題,并且渾然天成,就好像是一個(gè)問(wèn)題,除非發(fā)生異常的時(shí)候返回的不是任務(wù)的結(jié)果而是異常對(duì)象。
總結(jié)一下繼承關(guān)系:
三、使用舉例
文章的標(biāo)題有點(diǎn)唬人,說(shuō)到底還是講 Callable 的用法?,F(xiàn)在我們知道了 Future 代表了任務(wù)執(zhí)行的過(guò)程和結(jié)果,作為 call() 方法的返回值來(lái)獲取執(zhí)行結(jié)果;而 FutureTask 是一個(gè) Runnable 的 Future,既是任務(wù)執(zhí)行的過(guò)程和結(jié)果,又是 call 方法最終執(zhí)行的載體。下面通過(guò)一個(gè)例子看看他們?cè)谑褂蒙系膮^(qū)別。
首先創(chuàng)建一個(gè)任務(wù),即定義一個(gè)任務(wù)類(lèi)實(shí)現(xiàn) Callable 接口,在 call() 方法里添加我們的操作,這里用耗時(shí)三秒然后返回 100 模擬計(jì)算過(guò)程。
class MyTask implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子線(xiàn)程開(kāi)始計(jì)算..."); for (int i=0;i<3;++i){ Thread.sleep(1000); System.out.println("子線(xiàn)程計(jì)算中,用時(shí) "+(i+1)+" 秒"); } System.out.println("子線(xiàn)程計(jì)算完成,返回:100"); return 100; } }
然后呢,創(chuàng)建一個(gè)線(xiàn)程池,并實(shí)例化一個(gè) MyTask 備用。
ExecutorService executor = Executors.newCachedThreadPool(); MyTask task = new MyTask();
現(xiàn)在,分別使用 Future 和 FutureTask 來(lái)獲取執(zhí)行結(jié)果,看看他們有什么區(qū)別。
3.1、使用Future
Future 一般作為 submit() 的返回值使用,并在主線(xiàn)程中以阻塞的方式獲取異步任務(wù)的執(zhí)行結(jié)果。
System.out.println("主線(xiàn)程啟動(dòng)線(xiàn)程池"); Future<Integer> future = executor.submit(task); System.out.println("主線(xiàn)程得到返回結(jié)果:"+future.get()); executor.shutdown();
看看輸出結(jié)果:
主線(xiàn)程啟動(dòng)線(xiàn)程池
子線(xiàn)程開(kāi)始計(jì)算...
子線(xiàn)程計(jì)算中,用時(shí) 1 秒
子線(xiàn)程計(jì)算中,用時(shí) 2 秒
子線(xiàn)程計(jì)算中,用時(shí) 3 秒
子線(xiàn)程計(jì)算完成,返回:100
主線(xiàn)程得到返回結(jié)果:100
由于 get() 方法阻塞獲取結(jié)果,所以輸出順序?yàn)樽泳€(xiàn)程計(jì)算完成后主線(xiàn)程輸出結(jié)果。
3.2、使用FutureTask
由于 FutureTask 集任務(wù)與結(jié)果于一身,所以我們可以使用 FutureTask 自身而非返回值來(lái)管理任務(wù),這需要首先利用 Callable 對(duì)象來(lái)構(gòu)造 FutureTask,并調(diào)用不同的submit()
重載方法。
System.out.println("主線(xiàn)程啟動(dòng)線(xiàn)程池"); FutureTask<Integer> futureTask = new FutureTask<>(task); executor.submit(futureTask); // 作為Ruunable傳入submit()中 System.out.println("主線(xiàn)程得到返回結(jié)果:"+futureTask.get()); // 作為Future獲取結(jié)果 executor.shutdown();
這段程序的輸出與上面中完全相同,其實(shí)兩者在實(shí)際執(zhí)行中的區(qū)別也不大,雖然前者調(diào)用了submit(Callable<T> task)
而后者調(diào)用了submit(Runnable task)
,但最終都通過(guò)execute(futuretask)
來(lái)把任務(wù)加入線(xiàn)程池中。
四、總結(jié)
上面大費(fèi)周章其實(shí)只是盡可能細(xì)致地講清楚了 Callable 中的任務(wù)是如何執(zhí)行的,總結(jié)起來(lái)就是:
線(xiàn)程池中,submit() 方法實(shí)際上將 Callable 封裝在 FutureTask 中,將其作為 Runnable 的子類(lèi)傳給 execute()真正執(zhí)行;FutureTask 在 run() 中調(diào)用 Callable 對(duì)象的 call() 方法并接收返回值或捕獲異常保存在Object outcome
中,同時(shí)管理執(zhí)行過(guò)程中的狀態(tài)state
;FutureTask 同時(shí)作為 Future 的子類(lèi),通過(guò) get() 返回任務(wù)的執(zhí)行結(jié)果,若未執(zhí)行完成則通過(guò)等待隊(duì)列進(jìn)行阻塞等待完成;
FutureTask 作為一個(gè) Runnable 的 Future,其中最重要的兩個(gè)方法如下。
以上就是解析Java異步之call future的詳細(xì)內(nèi)容,更多關(guān)于Java異步 call future的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
利用Java發(fā)送郵件的實(shí)現(xiàn)代碼
這篇文章給大家分享了如何利用Java發(fā)送郵件,文章通過(guò)實(shí)例代碼介紹的很詳細(xì),有需要的可以參考借鑒。2016-08-08Java服務(wù)限流算法的6種實(shí)現(xiàn)
服務(wù)限流是指通過(guò)控制請(qǐng)求的速率或次數(shù)來(lái)達(dá)到保護(hù)服務(wù)的目的,本文主要介紹了Java服務(wù)限流算法的6種實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2023-05-05Java使用線(xiàn)程池批量處理數(shù)據(jù)操作具體流程
這篇文章主要給大家介紹了關(guān)于Java使用線(xiàn)程池批量處理數(shù)據(jù)操作的相關(guān)資料,Java多線(xiàn)程編程中線(xiàn)程池是一個(gè)非常重要的概念,線(xiàn)程池可以提高線(xiàn)程的復(fù)用率和任務(wù)調(diào)度的效率,尤其是當(dāng)需要查詢(xún)大批量數(shù)據(jù)時(shí),需要的朋友可以參考下2023-06-06MyBatis中一對(duì)多的xml配置方式(嵌套查詢(xún)/嵌套結(jié)果)
這篇文章主要介紹了MyBatis中一對(duì)多的xml配置方式(嵌套查詢(xún)/嵌套結(jié)果),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03java實(shí)現(xiàn)檢測(cè)是否字符串中包含中文
本文給大家分享了2個(gè)使用java檢測(cè)字符串中是否包含中文的代碼,都非常的實(shí)用,最后附上了各種字符的unicode編碼的范圍,方便我們以后使用正則進(jìn)行匹配檢測(cè)。2015-10-10深入理解happens-before和as-if-serial語(yǔ)義
本文大部分整理自《Java并發(fā)編程的藝術(shù)》,溫故而知新,加深對(duì)基礎(chǔ)的理解程度。下面可以和小編來(lái)一起學(xué)習(xí)下2019-05-05