Java多線程Future實現(xiàn)優(yōu)雅獲取線程的執(zhí)行結(jié)果
為什么要使用Future
線程獲取到運行結(jié)果有幾種方式
public class Sum { ?private Sum(){} ?public static int sum(int n){ ? ?int sum = 0; ? ?for (int i = 0; i < n; i++) { ? ? ?sum += n; ? } ? ?return sum; } }
Thread.sleep()
private static int sum_sleep = 0; Thread thread = new Thread(() -> sum_sleep = Sum.sum(100)); thread.start(); TimeUnit.SECONDS.sleep(1); System.out.printf("get result by thread.sleep: %d\n", sum_sleep);
使用sleep()
方法獲取,這種方法,有不可控性,也許sleep
1秒鐘,但是線程還沒有執(zhí)行完成,可能會導(dǎo)致獲取到的結(jié)果不準(zhǔn)確。
Thread.join()
private static int sum_join = 0; Thread thread = new Thread(() -> sum_join = Sum.sum(100)); thread.start(); thread.join(); System.out.printf("get result by thread.join: %d\n", sum_join);
循環(huán)
private static int sum_loop = 0; private static volatile boolean flag; ? Thread thread = new Thread(() -> { ?sum_loop = Sum.sum(100); ?flag = true; }); thread.start(); int i = 0; while (!flag) { ?i++; } System.out.printf("get result by loopLock: %d\n", sum_loop);
notifyAll() / wait()
private static class NotifyAndWaitTest { ? ? ?private Integer sum = null; ? ? ?private synchronized void sum_wait_notify() { ? ? ?sum = Sum.sum(100); ? ? ?notifyAll(); ? } ? ? ?private synchronized Integer getSum() { ? ? ?while (sum == null) { ? ? ? ?try { ? ? ? ? ?wait(); ? ? ? } catch (Exception e) { ? ? ? ? ?e.printStackTrace(); ? ? ? } ? ? } ? ? ?return sum; ? } } private static void getResultByNotifyAndWait() throws Exception { ? ?NotifyAndWaitTest test = new NotifyAndWaitTest(); ? ?new Thread(test::sum_wait_notify).start(); ? ?System.out.printf("get result by NotifyAndWait: %d\n", test.getSum()); }
Lock & Condition
private static class LockAndConditionTest { ? ? ?private Integer sum = null; ? ?private final Lock lock = new ReentrantLock(); ? ?private final Condition condition = lock.newCondition(); ? ? ?public void sum() { ? ? ?try { ? ? ? ?lock.lock(); ? ? ? ?sum = Sum.sum(100); ? ? ? ?condition.signalAll(); ? ? } catch (Exception e) { ? ? ? ?e.printStackTrace(); ? ? } finally { ? ? ? ?lock.unlock(); ? ? } ? } ? ? ?public Integer getSum() { ? ? ?try { ? ? ? ?lock.lock(); ? ? ? ?while (Objects.isNull(sum)) { ? ? ? ? ?try { ? ? ? ? ? ?condition.await(); ? ? ? ? } catch (Exception e) { ? ? ? ? ? ?throw new RuntimeException(e); ? ? ? ? } ? ? ? } ? ? } catch (Exception e) { ? ? ? ?e.printStackTrace(); ? ? } finally { ? ? ? ?lock.unlock(); ? ? } ? ? ?return sum; ? } } ? private static void getResultByLockAndCondition() throws Exception { ?LockAndConditionTest test = new LockAndConditionTest(); ?new Thread(test::sum).start(); ?System.out.printf("get result by lock and condition: %d\n", test.getSum()); }
BlockingQueue
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1); new Thread(() -> queue.offer(Sum.sum(100))).start(); System.out.printf("get result by blocking queue: %d\n", queue.take());
CountDownLatch
private static int sum_countDownLatch = 0; ? private static void getResultByCountDownLatch() { ?CountDownLatch latch = new CountDownLatch(1); ? ?new Thread( ? ? ? ? () -> { ? ? ? ? ? ?sum_countDownLatch = Sum.sum(100); ? ? ? ? ? ?latch.countDown(); ? ? ? ? }) ? ? .start(); ?try { ? ?latch.await(); } catch (Exception e) { ? ?e.printStackTrace(); } ?System.out.printf("get result by countDownLatch: %d\n", sum_countDownLatch); }
CyclicBarrier
private static int sum_cyclicBarrier = 0; ? private static void getResultByCycleBarrier() { ?CyclicBarrier cyclicBarrier = new CyclicBarrier(2); ?new Thread( ? () -> { ? ? ?sum_cyclicBarrier = Sum.sum(100); ? ? ?try { ? ? ? ?cyclicBarrier.await(); ? ? } catch (Exception e) { ? ? ? ?e.printStackTrace(); ? ? } ? }) ? .start(); ?try { ? ?cyclicBarrier.await(); } catch (Exception e) { ? ?e.printStackTrace(); } ?System.out.printf("get result by cyclicBarrier: %d\n", sum_cyclicBarrier); }
Semaphore
private static int sum_semaphore = 0; private static void getResultBySemaphore() { ?Semaphore semaphore = new Semaphore(0); ?new Thread( ? () -> { ? ? ?sum_semaphore = Sum.sum(100); ? ? ?semaphore.release(); ? }) ? .start(); ? ?try { ? ?semaphore.acquire(); ? ?System.out.printf("get result by semaphore: %d\n", sum_semaphore); } catch (InterruptedException e) { ? ?e.printStackTrace(); } }
上面提到的獲取線程執(zhí)行結(jié)果的方法,暫時基于之前學(xué)到的內(nèi)容,我只能想到這些。這些實現(xiàn)方式也不是很優(yōu)雅,不是最佳實踐。
線程池,利用ThreadPoolExecutor
的execute(Runnable command)
方法,利用這個方法雖說可以提交任務(wù),但是卻沒有辦法獲取任務(wù)執(zhí)行結(jié)果。
那么我們?nèi)绻枰@取任務(wù)的執(zhí)行結(jié)果并且優(yōu)雅的實現(xiàn),可以通過Future
接口和Callable
接口配合實現(xiàn), 本文將會通過具體的例子講解如何使用Future
。
Future
最主要的作用是,比如當(dāng)做比較耗時運算的時候,如果我們一直在原地等待方法返回,顯然是不明智的,整體程序的運行效率會大大降低。我們可以把運算的過程放到子線程去執(zhí)行,再通過Future
去控制子線程執(zhí)行的計算過程,最后獲取到計算結(jié)果。這樣一來就可以把整個程序的運行效率提高,是一種異步的思想。
如何使用Future
要想使用Future
首先得先了解一下Callable
。Callable
接口相比于 Runnable
的一大優(yōu)勢是可以有返回結(jié)果,那這個返回結(jié)果怎么獲取呢?就可以用 Future
類的 get 方法來獲取 。因此,Future
相當(dāng)于一個存儲器,它存儲了 Callable
的call
方法的任務(wù)結(jié)果。
一般情況下,Future,Callable,ExecutorService
是一起使用的,ExecutorService
里相關(guān)的代碼如下:
// 提交 Runnable 任務(wù) // 由于Runnable接口的run方法沒有返回值,所以,F(xiàn)uture僅僅是用來斷言任務(wù)已經(jīng)結(jié)束,有點類似join(); Future<?> submit(Runnable task); // 提交 Callable 任務(wù) // Callable里的call方法是有返回值的,所以這個方法返回的Future對象可以通過調(diào)用其get()方法來獲取任務(wù)的執(zhí) //行結(jié)果。 <T> Future<T> submit(Callable<T> task); // 提交 Runnable 任務(wù)及結(jié)果引用 ? // Future的返回值就是傳給submit()方法的參數(shù)result。 <T> Future<T> submit(Runnable task, T result);
具體使用方法如下:
ExecutorService executor = Executors.newCachedThreadPool(); Future<Integer> future = executor.submit(() -> Sum.sum(100)); ? System.out.printf("get result by Callable + Future: %d\n", future.get()); executor.shutdown();
Future實現(xiàn)原理
Future基本概述
Future
接口5個方法:
// 取消任務(wù) boolean cancel(boolean mayInterruptIfRunning); // 判斷任務(wù)是否已取消 ? boolean isCancelled(); // 判斷任務(wù)是否已結(jié)束 boolean isDone(); // 獲得任務(wù)執(zhí)行結(jié)果 阻塞,被調(diào)用時,如果任務(wù)還沒有執(zhí)行完,那么調(diào)用get()方法的線程會阻塞。直到任務(wù)執(zhí)行完 // 才會被喚醒 get(); // 獲得任務(wù)執(zhí)行結(jié)果,支持超時 get(long timeout, TimeUnit unit);
cancel(boolean mayInterruptIfRunning)
:
- 用來取消異步任務(wù)的執(zhí)行。
- 如果異步任務(wù)已經(jīng)完成或者已經(jīng)被取消,或者由于某些原因不能取消,則會返回
false
。 - 如果任務(wù)還沒有被執(zhí)行,則會返回
true
并且異步任務(wù)不會被執(zhí)行。 - 如果任務(wù)已經(jīng)開始執(zhí)行了但是還沒有執(zhí)行完成,若
mayInterruptIfRunning
為true
,則會立即中斷執(zhí)行任務(wù)的線程并返回true
,若mayInterruptIfRunning
為false
,則會返回true
且不會中斷任務(wù)執(zhí)行線程。
isCanceled()
:
- 判斷任務(wù)是否被取消。
- 如果任務(wù)在結(jié)束(正常執(zhí)行結(jié)束或者執(zhí)行異常結(jié)束)前被取消則返回
true
,否則返回false
。
isDone()
:
- ·判斷任務(wù)是否已經(jīng)完成,如果完成則返回
true
,否則返回false
。 - 任務(wù)執(zhí)行過程中發(fā)生異常、任務(wù)被取消也屬于任務(wù)已完成,也會返回
true
。
get()
:
- 獲取任務(wù)執(zhí)行結(jié)果,如果任務(wù)還沒完成則會阻塞等待直到任務(wù)執(zhí)行完成。
- 如果任務(wù)被取消則會拋出
CancellationException
異常。 - 如果任務(wù)執(zhí)行過程發(fā)生異常則會拋出
ExecutionException
異常。 - 如果阻塞等待過程中被中斷則會拋出
InterruptedException
異常。
get(long timeout,Timeunit unit)
:
- 帶超時時間的
get()
版本,上面講述的get()
方法,同樣適用這里。 - 如果阻塞等待過程中超時則會拋出
TimeoutException
異常。
使用IDEA,查看Future
的實現(xiàn)類其實有很多,比如FutureTask,F(xiàn)orkJoinTask,CompletableFuture
等,其余基本是繼承了ForkJoinTask
實現(xiàn)的內(nèi)部類。
本篇文章主要講解FutureTask
的實現(xiàn)原理
FutureTask基本概述
FutureTask
為 Future
提供了基礎(chǔ)實現(xiàn),如獲取任務(wù)執(zhí)行結(jié)果(get)
和取消任務(wù)(cancel)
等。如果任務(wù)尚未完成,獲取任務(wù)執(zhí)行結(jié)果時將會阻塞。一旦執(zhí)行結(jié)束,任務(wù)就不能被重啟或取消(除非使用runAndReset
執(zhí)行計算)。FutureTask
常用來封裝 Callable
和 Runnable
,也可以作為一個任務(wù)提交到線程池中執(zhí)行。除了作為一個獨立的類之外,此類也提供了一些功能性函數(shù)供我們創(chuàng)建自定義 task
類使用。FutureTask 的線程安全由CAS
來保證。
// 創(chuàng)建 FutureTask FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2); // 創(chuàng)建線程池 ExecutorService es = Executors.newCachedThreadPool(); // 提交 FutureTask es.submit(futureTask); // 獲取計算結(jié)果 Integer result = futureTask.get();
// 創(chuàng)建 FutureTask FutureTask<Integer> futureTask ?= new FutureTask<>(()-> 1+2); // 創(chuàng)建并啟動線程 Thread T1 = new Thread(futureTask); T1.start(); // 獲取計算結(jié)果 Integer result = futureTask.get();
FutureTask
可以很容易獲取子線程的執(zhí)行結(jié)果。
FutureTask實現(xiàn)原理
構(gòu)造函數(shù)
public FutureTask(Callable<V> callable) { ?if (callable == null) ? ?throw new NullPointerException(); ?this.callable = callable; ?this.state = NEW; ? ? ? // ensure visibility of callable } ? public FutureTask(Runnable runnable, V result) { ?this.callable = Executors.callable(runnable, result); ?this.state = NEW; ? ? ? // ensure visibility of callable }
FutureTask
提供了兩個構(gòu)造器
Callable
接口有返回,將callable
賦值給this.callable
。
Runnable
接口無返回,如果想要獲取到執(zhí)行結(jié)果,需要傳V result
給FutureTask
,FutureTask
將Runnable
和result
封裝成Callable
,再將callable
賦值給this.callable
。
狀態(tài)初始化狀態(tài)為NEW
FutureTask
內(nèi)置狀態(tài)有:
private volatile int state; // 可見性 private static final int NEW ? ? ? ? ?= 0; private static final int COMPLETING ? = 1; private static final int NORMAL ? ? ? = 2; private static final int EXCEPTIONAL ?= 3; private static final int CANCELLED ? ?= 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED ?= 6;
- NEW 初始狀態(tài)
- COMPLETING 任務(wù)已經(jīng)執(zhí)行完(正?;蛘弋惓?,準(zhǔn)備賦值結(jié)果,但是這個狀態(tài)會時間會比較短,屬于中間狀態(tài)。
- NORMAL 任務(wù)已經(jīng)正常執(zhí)行完,并已將任務(wù)返回值賦值到結(jié)果
- EXCEPTIONAL 任務(wù)執(zhí)行失敗,并將異常賦值到結(jié)果
- CANCELLED 取消
- INTERRUPTING 準(zhǔn)備嘗試中斷執(zhí)行任務(wù)的線程
- INTERRUPTED 對執(zhí)行任務(wù)的線程進(jìn)行中斷(未必中斷到)
狀態(tài)轉(zhuǎn)換:
run()執(zhí)行流程
public void run() { ? ?if (state != NEW || ? ? ? ?!RUNNER.compareAndSet(this, null, Thread.currentThread())) ? ? ? ?return; ? ?try { ? ? ? ?Callable<V> c = callable; ? ? ? ?if (c != null && state == NEW) { ? ? ? ? ? ?V result; ? ? ? ? ? ?boolean ran; ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?result = c.call(); ? ? ? ? ? ? ? ?ran = true; ? ? ? ? ? } catch (Throwable ex) { ? ? ? ? ? ? ? ?result = null; ? ? ? ? ? ? ? ?ran = false; ? ? ? ? ? ? ? ?setException(ex); ? ? ? ? ? } ? ? ? ? ? ?if (ran) ? ? ? ? ? ? ? ?set(result); ? ? ? } ? } finally { ? ? ? ?// runner must be non-null until state is settled to ? ? ? ?// prevent concurrent calls to run() ? ? ? ?runner = null; ? ? ? ?// state must be re-read after nulling runner to prevent ? ? ? ?// leaked interrupts ? ? ? ?int s = state; ? ? ? ?if (s >= INTERRUPTING) ? ? ? ? ? ?handlePossibleCancellationInterrupt(s); ? } }
set()
protected void set(V v) { // state變量,通過CAS操作,將NEW->COMPLETING ? ?if (STATE.compareAndSet(this, NEW, COMPLETING)) { ? ? ? ?// 將結(jié)果賦值給outcome屬性 ? ? ? ?outcome = v; ? ? ? ?// state狀態(tài)直接賦值為NORMAL,不需要CAS ? ? ? ?STATE.setRelease(this, NORMAL); // final state ? ? ? ?finishCompletion(); ? } }
setException()
protected void setException(Throwable t) { ? ?// state變量,通過CAS操作,將NEW->COMPLETING ? ?if (STATE.compareAndSet(this, NEW, COMPLETING)) { ? ? ? ?// 將異常賦值給outcome屬性 ? ? ? ?outcome = t; ? ? ? ?// state狀態(tài)直接賦值為EXCEPTIONAL,不需要CAS ? ? ? ?STATE.setRelease(this, EXCEPTIONAL); // final state ? ? ? ?finishCompletion(); ? } }
finishCompletion()
set()
和setException()
兩個方法最后都調(diào)用了finishCompletion()
方法,完成一些善后工作,具體流程如下:
private void finishCompletion() { ? ?// assert state > COMPLETING; ? ?for (WaitNode q; (q = waiters) != null;) { ? ? ? ?// 移除等待線程 ? ? ? ?if (WAITERS.weakCompareAndSet(this, q, null)) { ? ? ? ? ? ?// 自旋遍歷等待線程 ? ? ? ? ? ?for (;;) { ? ? ? ? ? ? ? ?Thread t = q.thread; ? ? ? ? ? ? ? ?if (t != null) { ? ? ? ? ? ? ? ? ? ?q.thread = null; ? ? ? ? ? ? ? ? ? ?// 喚醒等待線程 ? ? ? ? ? ? ? ? ? ?LockSupport.unpark(t); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?WaitNode next = q.next; ? ? ? ? ? ? ? ?if (next == null) ? ? ? ? ? ? ? ? ? ?break; ? ? ? ? ? ? ? ?q.next = null; // unlink to help gc ? ? ? ? ? ? ? ?q = next; ? ? ? ? ? } ? ? ? ? ? ?break; ? ? ? } ? } ? ?// 任務(wù)完成后調(diào)用函數(shù),自定義擴展 ? ?done(); ? ? ?callable = null; ? ? ? ?// to reduce footprint }
handlePossibleCancellationInterrupt()
private void handlePossibleCancellationInterrupt(int s) { ? ?if (s == INTERRUPTING) ? ? ? ?// 在中斷者中斷線程之前可能會延遲,所以我們只需要讓出CPU時間片自旋等待 ? ? ? ?while (state == INTERRUPTING) ? ? ? ? ? ?Thread.yield(); // wait out pending interrupt }
get()執(zhí)行流程
public V get() throws InterruptedException, ExecutionException { ? ?int s = state; ? ?if (s <= COMPLETING) ? ? ? ?// awaitDone用于等待任務(wù)完成,或任務(wù)因為中斷或超時而終止。返回任務(wù)的完成狀態(tài)。 ? ? ? ?s = awaitDone(false, 0L); ? ?return report(s); }
具體流程:
awaitDone()
private int awaitDone(boolean timed, long nanos) ? ?throws InterruptedException { ? ?long startTime = 0L; ? ?// Special value 0L means not yet parked ? ?WaitNode q = null; ? ?boolean queued = false; ? ?for (;;) { ? ? ? ?// 獲取到當(dāng)前狀態(tài) ? ? ? ?int s = state; ? ? ? ?// 如果當(dāng)前狀態(tài)不為NEW或者COMPLETING ? ? ? ?if (s > COMPLETING) { ? ? ? ? ? ?if (q != null) ? ? ? ? ? ? ? ?q.thread = null; ? ? ? ? ? ?// 直接返回state ? ? ? ? ? ?return s; ? ? ? } ? ? ? ?// COMPLETING是一個很短暫的狀態(tài),調(diào)用Thread.yield期望讓出時間片,之后重試循環(huán)。 ? ? ? ?else if (s == COMPLETING) ? ? ? ? ? ?Thread.yield(); ? ? ? ?// 如果阻塞線程被中斷則將當(dāng)前線程從阻塞隊列中移除 ? ? ? ?else if (Thread.interrupted()) { ? ? ? ? ? ?removeWaiter(q); ? ? ? ? ? ?throw new InterruptedException(); ? ? ? } ? ? ? ?// 新進(jìn)來的線程添加等待節(jié)點 ? ? ? ?else if (q == null) { ? ? ? ? ? ?if (timed && nanos <= 0L) ? ? ? ? ? ? ? ?return s; ? ? ? ? ? ?q = new WaitNode(); ? ? ? } ? ? ? ?else if (!queued) ? ? ? ? ? ?/* ? ? ? ? ? ? * 這是Treiber Stack算法入棧的邏輯。 ? ? ? ? ? ? * Treiber Stack是一個基于CAS的無鎖并發(fā)棧實現(xiàn), ? ? ? ? ? ? * 更多可以參考https://en.wikipedia.org/wiki/Treiber_Stack ? ? ? ? ? ? */ ? ? ? ? ? ?queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); ? ? ? ?else if (timed) { ? ? ? ? ? ?final long parkNanos; ? ? ? ? ? ?if (startTime == 0L) { // first time ? ? ? ? ? ? ? ?startTime = System.nanoTime(); ? ? ? ? ? ? ? ?if (startTime == 0L) ? ? ? ? ? ? ? ? ? ?startTime = 1L; ? ? ? ? ? ? ? ?parkNanos = nanos; ? ? ? ? ? } else { ? ? ? ? ? ? ? ?long elapsed = System.nanoTime() - startTime; ? ? ? ? ? ? ? ?if (elapsed >= nanos) { ? ? ? ? ? ? ? ? ? ?// 超時,移除棧中節(jié)點。 ? ? ? ? ? ? ? ? ? ?removeWaiter(q); ? ? ? ? ? ? ? ? ? ?return state; ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?parkNanos = nanos - elapsed; ? ? ? ? ? } ? ? ? ? ? ?// nanoTime may be slow; recheck before parking ? ? ? ? ? ?// 未超市并且狀態(tài)為NEW,阻塞當(dāng)前線程 ? ? ? ? ? ?if (state < COMPLETING) ? ? ? ? ? ? ? ?LockSupport.parkNanos(this, parkNanos); ? ? ? } ? ? ? ?else ? ? ? ? ? ?LockSupport.park(this); ? } }
removeWaiter()
private void removeWaiter(WaitNode node) { ?if (node != null) { ? ?node.thread = null; ? ?retry: ? ?for (;;) { ? ? ? ? ?// restart on removeWaiter race ? ? ?for (WaitNode pred = null, q = waiters, s; q != null; q = s) { ? ? ? ?s = q.next; ? ? ? ?// 如果當(dāng)前節(jié)點仍有效,則置pred為當(dāng)前節(jié)點,繼續(xù)遍歷。 ? ? ? ?if (q.thread != null) ? ? ? ? ?pred = q; ? ? ? ?/* ? ? ? ?* 當(dāng)前節(jié)點已無效且有前驅(qū),則將前驅(qū)的后繼置為當(dāng)前節(jié)點的后繼實現(xiàn)刪除節(jié)點。 ? ? ? ?* 如果前驅(qū)節(jié)點已無效,則重新遍歷waiters棧。 ? ? ? ?*/ ? ? ? ?else if (pred != null) { ? ? ? ? ?pred.next = s; ? ? ? ? ?if (pred.thread == null) // check for race ? ? ? ? ? ?continue retry; ? ? ? } ? ? ? ?/* ? ? ? ?* 當(dāng)前節(jié)點已無效,且當(dāng)前節(jié)點沒有前驅(qū),則將棧頂置為當(dāng)前節(jié)點的后繼。 ? ? ? ?* 失敗的話重新遍歷waiters棧。 ? ? ? ?*/ ? ? ? ?else if (!WAITERS.compareAndSet(this, q, s)) ? ? ? ? ?continue retry; ? ? } ? ? ?break; ? } } }
report()
private V report(int s) throws ExecutionException { ? ?Object x = outcome; ? ?if (s == NORMAL) ? ? ? ?return (V)x; ? ?if (s >= CANCELLED) ? ? ? ?throw new CancellationException(); ? ?throw new ExecutionException((Throwable)x); }
cancel()執(zhí)行流程
public boolean cancel(boolean mayInterruptIfRunning) { ? ?// 狀態(tài)機不是NEW 或CAS更新狀態(tài) 流轉(zhuǎn)到INTERRUPTING或者CANCELLED失敗,不允許cancel ? ?if (!(state == NEW && STATE.compareAndSet ? ? ? ? (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) ? ? ? ?return false; ? ?try { ? ?// in case call to interrupt throws exception ? ? ? ?// 如果要求中斷執(zhí)行中的任務(wù),則直接中斷任務(wù)執(zhí)行線程,并更新狀態(tài)機為最終狀態(tài)INTERRUPTED ? ? ? ?if (mayInterruptIfRunning) { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?Thread t = runner; ? ? ? ? ? ? ? ?if (t != null) ? ? ? ? ? ? ? ? ? ?t.interrupt(); ? ? ? ? ? } finally { // final state ? ? ? ? ? ? ? ?STATE.setRelease(this, INTERRUPTED); ? ? ? ? ? } ? ? ? } ? } finally { ? ? ? ?finishCompletion(); ? } ? ?return true; }
經(jīng)典案例
引用極客時間-java并發(fā)編程課程的案例燒水泡茶:
并發(fā)編程可以總結(jié)為三個核心問題:分工,同步和互斥。編寫并發(fā)程序,首先要做分工。
- T1負(fù)責(zé)洗水壺,燒開水,泡茶這三道工序
- T2負(fù)責(zé)洗茶壺,洗茶杯,拿茶葉三道工序。
- T1在執(zhí)行泡茶這道工序需要等到T2完成拿茶葉的工作。(join,countDownLatch,阻塞隊列都可以完成)
// 創(chuàng)建任務(wù) T2 的 FutureTask FutureTask<String> ft2 ?= new FutureTask<>(new T2Task()); // 創(chuàng)建任務(wù) T1 的 FutureTask FutureTask<String> ft1 ?= new FutureTask<>(new T1Task(ft2)); // 線程 T1 執(zhí)行任務(wù) ft1 Thread T1 = new Thread(ft1); T1.start(); // 線程 T2 執(zhí)行任務(wù) ft2 Thread T2 = new Thread(ft2); T2.start(); // 等待線程 T1 執(zhí)行結(jié)果 System.out.println(ft1.get()); ? // T1Task 需要執(zhí)行的任務(wù): // 洗水壺、燒開水、泡茶 class T1Task implements Callable<String>{ ?FutureTask<String> ft2; ?// T1 任務(wù)需要 T2 任務(wù)的 FutureTask ?T1Task(FutureTask<String> ft2){ ? ?this.ft2 = ft2; } ?@Override ?String call() throws Exception { ? ?System.out.println("T1: 洗水壺..."); ? ?TimeUnit.SECONDS.sleep(1); ? ?System.out.println("T1: 燒開水..."); ? ?TimeUnit.SECONDS.sleep(15); ? ?// 獲取 T2 線程的茶葉 ? ? ?String tf = ft2.get(); ? ?System.out.println("T1: 拿到茶葉:"+tf); ? ? ?System.out.println("T1: 泡茶..."); ? ?return " 上茶:" + tf; } } // T2Task 需要執(zhí)行的任務(wù): // 洗茶壺、洗茶杯、拿茶葉 class T2Task implements Callable<String> { ?@Override ?String call() throws Exception { ? ?System.out.println("T2: 洗茶壺..."); ? ?TimeUnit.SECONDS.sleep(1); ? ? ?System.out.println("T2: 洗茶杯..."); ? ?TimeUnit.SECONDS.sleep(2); ? ? ?System.out.println("T2: 拿茶葉..."); ? ?TimeUnit.SECONDS.sleep(1); ? ?return " 龍井 "; } } // 一次執(zhí)行結(jié)果: //T1: 洗水壺... //T2: 洗茶壺... //T1: 燒開水... //T2: 洗茶杯... //T2: 拿茶葉... //T1: 拿到茶葉: 龍井 //T1: 泡茶... //上茶: 龍井
以上就是Java多線程Future實現(xiàn)優(yōu)雅獲取線程的執(zhí)行結(jié)果的詳細(xì)內(nèi)容,更多關(guān)于Java獲取線程執(zhí)行結(jié)果的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring?Security權(quán)限注解啟動及邏輯處理使用示例
這篇文章主要為大家介紹了Spring?Security權(quán)限注解啟動及邏輯處理使用示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07詳談spring中bean注入無效和new創(chuàng)建對象的區(qū)別
這篇文章主要介紹了spring中bean注入無效和new創(chuàng)建對象的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02Mybatis-Plus中updateById方法不能更新空值問題解決
本文主要介紹了Mybatis-Plus中updateById方法不能更新空值問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08JAVA使用JDBC技術(shù)操作SqlServer數(shù)據(jù)庫實例代碼
本篇文章主要介紹了JAVA使用JDBC技術(shù)操作SqlServer數(shù)據(jù)庫實例代碼,具有一定的參考價值,有興趣的可以了解一下。2017-01-01Java+Selenium實現(xiàn)控制瀏覽器的啟動選項Options
這篇文章主要為大家詳細(xì)介紹了如何使用java代碼利用selenium控制瀏覽器的啟動選項Options的代碼操作,文中的示例代碼講解詳細(xì),感興趣的可以了解一下2023-01-01Spring Cache + Caffeine的整合與使用示例詳解
對于一些項目里需要對數(shù)據(jù)庫里的某些數(shù)據(jù)一直重復(fù)請求的,且這些數(shù)據(jù)基本是固定的,在這種情況下,可以借助簡單使用本地緩存來緩存這些數(shù)據(jù),本文介紹一下Spring Cache和Caffeine的使用,感興趣的朋友一起看看吧2023-12-12