Java多線程Future松獲取異步任務(wù)結(jié)果輕松實(shí)現(xiàn)
前言
最近因?yàn)橐恍﹤€(gè)人原因,未能抽出太多精力更新Java多線程系列,一擱置就是好幾個(gè)月,先向讀者諸君致歉。
在本系列的其他文章中,已經(jīng)提到過線程之間的相互協(xié)作, 通過分工,將程序系統(tǒng)的不同任務(wù)進(jìn)行線程分離,充分利用機(jī)器性能、提升特定線程的利用率和程序的體驗(yàn)感。
詳見拙作:Java多線程基礎(chǔ)--線程生命周期與線程協(xié)作詳解.
并在線程池相關(guān)文章中提到:作為程序構(gòu)建者,我們更關(guān)心線程(組)的特性和它們所執(zhí)行的任務(wù),并不愿意分心去做線程操作。
詳見拙作:Java多線程基礎(chǔ)--線程的創(chuàng)建與線程池管理
然而實(shí)際開發(fā)中,我們同樣關(guān)心一個(gè)任務(wù)對(duì)程序系統(tǒng)產(chǎn)生的影響,習(xí)慣上稱之為任務(wù)的的執(zhí)行結(jié)果。
Runnable的局限性
在前文中我們談到,通過編碼實(shí)現(xiàn)Runnable接口,將獲得具有邊界性的 "任務(wù)",在指定的線程(或者線程池)中運(yùn)行。
重新觀察該接口,不難發(fā)現(xiàn)它并沒有方法返回值:
public interface Runnable { void run(); }
在JDK1.5之前,想利用任務(wù)的執(zhí)行結(jié)果,需要小心的操作線程訪問臨界區(qū)資源。使用 回調(diào)
進(jìn)行解耦是非常不錯(cuò)的選擇。
練手小Demo -- 回顧既往文章知識(shí)
注意,為了減少篇幅使用了lambda,但jdk1.5之前并不支持lambda
將計(jì)算任務(wù)分離到其他線程執(zhí)行,再回到主線程消費(fèi)結(jié)果
我們將計(jì)算、IO等耗時(shí)任務(wù)丟到其他線程,讓主線程專注于自身業(yè)務(wù),假想它在接受用戶輸入以及處理反饋,但我們略去這一部分
我們可以設(shè)計(jì)出類似下面的代碼:
雖然它還有很多不合理之處值得優(yōu)化,但也足以用于演示
class Demo { static final Object queueLock = new Object(); static List<Runnable> mainQueue = new ArrayList<>(); static boolean running = true; static final Runnable FINISH = () -> running = false; public static void main(String[] args) { synchronized (queueLock) { mainQueue.add(Demo::onStart); } while (running) { Runnable runnable = null; synchronized (queueLock) { if (!mainQueue.isEmpty()) runnable = mainQueue.remove(0); } if (runnable != null) { runnable.run(); } Thread.yield(); } } public static void onStart() { //... } public static void finish() { synchronized (queueLock) { mainQueue.clear(); mainQueue.add(FINISH); } } }
再模擬一個(gè)計(jì)算的線程和任務(wù)回調(diào):
interface Callback { void onResultCalculated(int result); } class CalcThread extends Thread { private final Callback callback; private final int a; private final int b; public CalcThread(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { super.run(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } }
填充一下onStart業(yè)務(wù):
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new CalcThread(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300).start(); } }
復(fù)習(xí):優(yōu)化為使用Runnable
在前文我們提到,如果業(yè)務(wù)僅關(guān)注任務(wù)的執(zhí)行,并不過于關(guān)心線程本身,則可以利用Runnable:
class Demo { static class CalcRunnable implements Runnable { private final Callback callback; private final int a; private final int b; public CalcRunnable(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new Thread(new CalcRunnable(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300)).start(); } }
不難想象出:我們非常需要
- 讓特定線程、特定類型的線程方便地接收任務(wù),回顧本系列文章中的 線程池篇 ,線程池是應(yīng)運(yùn)而生
- 擁有比Synchronize更輕量的機(jī)制
- 擁有更方便的數(shù)據(jù)結(jié)構(gòu)
至此,我們可以體會(huì)到:JDK1.5之前,因?yàn)镴DK的功能不足,Java程序?qū)τ诰€程的使用 較為粗糙。
為異步而生的Future
終于在JDK1.5中,迎來(lái)了新特性: Future
以及先前文章中提到的線程池, 時(shí)光荏苒,一晃將近20年了。
/** * 略 * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */ public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
盡管已經(jīng)移除了API注釋,但仍然能夠理解每個(gè)API的含義,不多做贅述。
顯而易見,為了增加返回值,沒有必要用如此復(fù)雜的 接口來(lái)替代 Runnable
。簡(jiǎn)單思考后可以對(duì)返回值的情況進(jìn)行歸納:
- 返回Runnable中業(yè)務(wù)的結(jié)果,例如計(jì)算、讀取資源等
- 單純的在Runnable執(zhí)行完畢后返回一個(gè)結(jié)果
從業(yè)務(wù)層上看,僅需要如下接口即可,它增加了返回值、并可以更友好地讓使用者處理異常:
作者按:拋開底層實(shí)現(xiàn),僅看業(yè)務(wù)方編碼需要
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
顯然,JDK需要提供后向兼容能力:
- Runnable 不能夠丟棄,也不應(yīng)當(dāng)丟棄
- 不能要求使用者完全的重構(gòu)代碼
所以一并提供了適配器,讓使用者進(jìn)行簡(jiǎn)單的局部重構(gòu)即可用上新特性
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
而Future恰如其名,它代表了在 "未來(lái)" 的一個(gè)結(jié)果和狀態(tài),為了更方便地處理異步而生。
并且內(nèi)置了 FutureTask
,在 FutureTask詳解 章節(jié)中再行展開。
類圖
在JDK1.8的基礎(chǔ)上,看一下精簡(jiǎn)的類圖結(jié)構(gòu):
FutureTask詳解
構(gòu)造函數(shù)
public class FutureTask { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } }
生命周期
public class FutureTask { //新建 private static final int NEW = 0; //處理中 private static final int COMPLETING = 1; //正常 private static final int NORMAL = 2; //異常 private static final int EXCEPTIONAL = 3; //已取消 private static final int CANCELLED = 4; //中斷中 private static final int INTERRUPTING = 5; //已中斷 private static final int INTERRUPTED = 6; }
可能的生命周期轉(zhuǎn)換如下:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
JDK中原汁原味的解釋如下:
The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further modified.
核心方法
本節(jié)從以下三塊入手閱讀源碼
- 狀態(tài)判斷
- 取消
- 獲取結(jié)果
狀態(tài)判斷API的實(shí)現(xiàn)非常簡(jiǎn)單
public class FutureTask { public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } }
取消:
- 當(dāng)前狀態(tài)為
NEW
且 CAS修改 state 成功,否則返回取消失敗 - 如果
mayInterruptIfRunning
則中斷在執(zhí)行的線程并CAS修改state為INTERRUPTED - 調(diào)用 finishCompletion
刪除并通知所有等待的線程
調(diào)用done()
設(shè)置callable為null
public class FutureTask { public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) { return false; } try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } }
獲取結(jié)果: 先判斷狀態(tài),如果未進(jìn)入到 COMPLETING
(即為NEW狀態(tài)),則阻塞等待狀態(tài)改變,返回結(jié)果或拋出異常
public class FutureTask { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); } }
如何使用
而使用則非常簡(jiǎn)單,也非常的樸素。
我們以文中的的例子進(jìn)行改造:
- 沿用原Runnable邏輯
- 移除回調(diào),增加
CalcResult
- 將
CalcResult
對(duì)象作為既定返回結(jié)果,Runnable中設(shè)置其屬性
class Demo { static class CalcResult { public int result; } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); calcResult.result = result; }, calcResult); System.out.println("threadId" + Thread.currentThread().getId() + "反正干點(diǎn)什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } finish(); } }
如果直接使用新特性Callback,則如下:
直接返回結(jié)果,當(dāng)然也可以直接返回Integer,不再包裹一層
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<CalcResult> resultFuture = executor.submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); calcResult.result = result; return calcResult; }); System.out.println("threadId" + Thread.currentThread().getId() + "反正干點(diǎn)什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); finish(); } }
相信讀者諸君會(huì)有這樣的疑惑:
為何使用Future比原先的回調(diào)看起來(lái)粗糙?
首先要明確一點(diǎn):文中前段的回調(diào)Demo,雖然達(dá)成了既定目標(biāo),但效率并不高??!在當(dāng)時(shí)計(jì)算很昂貴的背景下,并不會(huì)如此莽撞地使用!
而在JDK1.5開始,提供了大量?jī)?nèi)容支持多線程開發(fā)??紤]到篇幅,會(huì)在系列文章中逐步展開。
另外,F(xiàn)utureTask中的CAS與Happens-Before本篇中亦不做展開。
接下來(lái),再做一些引申,簡(jiǎn)單看一看多線程業(yè)務(wù)模式。
引申,多線程業(yè)務(wù)模式
常用的多線程設(shè)計(jì)模式包括:
- Future模式
- Master-Worker模式
- Guarded Suspension模式
- 不變模式
- 生產(chǎn)者-消費(fèi)
Future模式
文中對(duì)于Future的使用方式遵循了Future模式。
業(yè)務(wù)方在使用時(shí),已經(jīng)明確了任務(wù)被分離到其他線程執(zhí)行時(shí)有等待期,在此期間,可以干點(diǎn)別的事情,不必浪費(fèi)系統(tǒng)資源。
Master-Worker模式
在程序系統(tǒng)中設(shè)計(jì)兩類線程,并相互協(xié)作:
- Master線程(單個(gè))
- Worker線程
Master線程負(fù)責(zé)接受任務(wù)、分配任務(wù)、接收(必要時(shí)進(jìn)一步組合)結(jié)果并返回;
Worker線程負(fù)責(zé)處理子任務(wù),當(dāng)子任務(wù)處理完成后,向Master線程返回結(jié)果;
作者按:此時(shí)可再次回想一下文章開頭的Demo
Guarded Suspension模式
- 使用緩存隊(duì)列,使得 服務(wù)線程/服務(wù)進(jìn)程 在未就緒、忙碌時(shí)能夠延遲處理請(qǐng)求。
- 使用等待-通知機(jī)制,將消費(fèi)
服務(wù)的返回結(jié)果
的方式規(guī)范化
不變模式
在并行開發(fā)過程中,為確保數(shù)據(jù)的一致性和正確性,有必要對(duì)對(duì)象進(jìn)行同步,而同步操作會(huì)對(duì)程序系統(tǒng)的性能產(chǎn)生相當(dāng)?shù)膿p耗。
因此,使用狀態(tài)不可改變的對(duì)象,依靠其不變性來(lái)確保 并行操作 在 沒有同步機(jī)制 的情況下,保持一致性和正確性。
- 對(duì)象創(chuàng)建后,其內(nèi)部狀態(tài)和數(shù)據(jù)不再發(fā)生改變
- 對(duì)象被共享、被多個(gè)線程訪問
生產(chǎn)者-消費(fèi)
設(shè)計(jì)兩類線程:若干個(gè)生產(chǎn)者線程和若干個(gè)消費(fèi)者線程。
生產(chǎn)者線程負(fù)責(zé)提交用戶請(qǐng)求,消費(fèi)者線程負(fù)責(zé)處理用戶請(qǐng)求。生產(chǎn)者和消費(fèi)者之間通過共享內(nèi)存緩沖區(qū)進(jìn)行通信。
內(nèi)存緩沖區(qū)的意義:
- 解決是數(shù)據(jù)在多線程間的共享問題
- 緩解生產(chǎn)者和消費(fèi)者之間的性能差
這幾種模式從不同角度出發(fā)解決特定問題,但亦有一定的相似之處,不再展開。
后記
至此,我們已經(jīng)進(jìn)入尾聲,JDK1.5中,對(duì)多線程的支持迎來(lái)一波井噴。本文以及系列文章中關(guān)于線程池的內(nèi)容也僅僅是基礎(chǔ)中的基礎(chǔ),仍舊有大量的內(nèi)容值得深入,本篇不再往下挖掘。
在后續(xù)的系列文章中,我們將展開AQS、HAPPENS-BEFORE等內(nèi)容,以及和本文高度關(guān)聯(lián)的CompleteFutureTask,JUC工具等。
更多關(guān)于Java Future異步任務(wù)獲取的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java開發(fā)實(shí)現(xiàn)的Socket雙向通信功能示例
這篇文章主要介紹了Java開發(fā)實(shí)現(xiàn)的Socket雙向通信功能,結(jié)合實(shí)例形式分析了java基于socket實(shí)現(xiàn)的服務(wù)器端與客戶端雙向通信相關(guān)操作技巧,需要的朋友可以參考下2018-01-01關(guān)于MD5算法原理與常用實(shí)現(xiàn)方式
這篇文章主要介紹了關(guān)于MD5算法原理與常用實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-08-08SpringBoot連接PostgreSQL+MybatisPlus入門案例(代碼詳解)
這篇文章主要介紹了SpringBoot連接PostgreSQL+MybatisPlus入門案例,本文通過實(shí)例代碼圖文相結(jié)合給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-07-07Java中NoClassDefFoundError?和?ClassNotFoundException的區(qū)別
Java中NoClassDefFoundError和ClassNotFoundException的區(qū)別,從類繼承層次上來(lái)看,ClassNotFoundException是從Exception繼承的,所以ClassNotFoundException是一個(gè)檢查異常。具體詳情需要的朋友可以參考下面文章內(nèi)容2022-06-06關(guān)于springboot中的自定義配置項(xiàng)
這篇文章主要介紹了關(guān)于springboot中的自定義配置項(xiàng),在項(xiàng)目開發(fā)的過程中,經(jīng)常需要自定義系統(tǒng)業(yè)務(wù)方面的配置文件及配置項(xiàng),Spring Boot提供了@value注解、@ConfigurationProperties注解和Environment接口等3種方式自定義配置項(xiàng),需要的朋友可以參考下2023-07-07Java中ShardingSphere 數(shù)據(jù)分片的實(shí)現(xiàn)
其實(shí)很多人對(duì)分庫(kù)分表多少都有點(diǎn)恐懼,我們今天用ShardingSphere 給大家演示數(shù)據(jù)分片,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09SpringBoot返回統(tǒng)一的JSON標(biāo)準(zhǔn)格式實(shí)現(xiàn)步驟
這篇文章主要介紹了SpringBoot返回統(tǒng)一的JSON標(biāo)準(zhǔn)格式,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-08-08用java實(shí)現(xiàn)學(xué)生信息管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)學(xué)生信息管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-09-09