Java多線程Future實(shí)現(xiàn)優(yōu)雅獲取線程的執(zhí)行結(jié)果
為什么要使用Future
線程獲取到運(yùn)行結(jié)果有幾種方式
public class Sum {
?private Sum(){}
?public static int sum(int n){
? ?int sum = 0;
? ?for (int i = 0; i < n; i++) {
? ? ?sum += n;
? }
? ?return sum;
}
}Thread.sleep()
private static int sum_sleep = 0;
Thread thread = new Thread(() -> sum_sleep = Sum.sum(100));
thread.start();
TimeUnit.SECONDS.sleep(1);
System.out.printf("get result by thread.sleep: %d\n", sum_sleep);使用sleep()方法獲取,這種方法,有不可控性,也許sleep1秒鐘,但是線程還沒有執(zhí)行完成,可能會(huì)導(dǎo)致獲取到的結(jié)果不準(zhǔn)確。
Thread.join()
private static int sum_join = 0;
Thread thread = new Thread(() -> sum_join = Sum.sum(100));
thread.start();
thread.join();
System.out.printf("get result by thread.join: %d\n", sum_join);循環(huán)
private static int sum_loop = 0;
private static volatile boolean flag;
?
Thread thread = new Thread(() -> {
?sum_loop = Sum.sum(100);
?flag = true;
});
thread.start();
int i = 0;
while (!flag) {
?i++;
}
System.out.printf("get result by loopLock: %d\n", sum_loop);notifyAll() / wait()
private static class NotifyAndWaitTest {
?
? ?private Integer sum = null;
?
? ?private synchronized void sum_wait_notify() {
? ? ?sum = Sum.sum(100);
? ? ?notifyAll();
? }
?
? ?private synchronized Integer getSum() {
? ? ?while (sum == null) {
? ? ? ?try {
? ? ? ? ?wait();
? ? ? } catch (Exception e) {
? ? ? ? ?e.printStackTrace();
? ? ? }
? ? }
? ? ?return sum;
? }
}
private static void getResultByNotifyAndWait() throws Exception {
? ?NotifyAndWaitTest test = new NotifyAndWaitTest();
? ?new Thread(test::sum_wait_notify).start();
? ?System.out.printf("get result by NotifyAndWait: %d\n", test.getSum());
}Lock & Condition
private static class LockAndConditionTest {
?
? ?private Integer sum = null;
? ?private final Lock lock = new ReentrantLock();
? ?private final Condition condition = lock.newCondition();
?
? ?public void sum() {
? ? ?try {
? ? ? ?lock.lock();
? ? ? ?sum = Sum.sum(100);
? ? ? ?condition.signalAll();
? ? } catch (Exception e) {
? ? ? ?e.printStackTrace();
? ? } finally {
? ? ? ?lock.unlock();
? ? }
? }
?
? ?public Integer getSum() {
? ? ?try {
? ? ? ?lock.lock();
? ? ? ?while (Objects.isNull(sum)) {
? ? ? ? ?try {
? ? ? ? ? ?condition.await();
? ? ? ? } catch (Exception e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ? }
? ? ? }
? ? } catch (Exception e) {
? ? ? ?e.printStackTrace();
? ? } finally {
? ? ? ?lock.unlock();
? ? }
? ? ?return sum;
? }
}
?
private static void getResultByLockAndCondition() throws Exception {
?LockAndConditionTest test = new LockAndConditionTest();
?new Thread(test::sum).start();
?System.out.printf("get result by lock and condition: %d\n", test.getSum());
}BlockingQueue
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
new Thread(() -> queue.offer(Sum.sum(100))).start();
System.out.printf("get result by blocking queue: %d\n", queue.take());CountDownLatch
private static int sum_countDownLatch = 0;
?
private static void getResultByCountDownLatch() {
?CountDownLatch latch = new CountDownLatch(1);
?
?new Thread(
? ? ? ? () -> {
? ? ? ? ? ?sum_countDownLatch = Sum.sum(100);
? ? ? ? ? ?latch.countDown();
? ? ? ? })
? ? .start();
?try {
? ?latch.await();
} catch (Exception e) {
? ?e.printStackTrace();
}
?System.out.printf("get result by countDownLatch: %d\n", sum_countDownLatch);
}CyclicBarrier
private static int sum_cyclicBarrier = 0;
?
private static void getResultByCycleBarrier() {
?CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
?new Thread(
? () -> {
? ? ?sum_cyclicBarrier = Sum.sum(100);
? ? ?try {
? ? ? ?cyclicBarrier.await();
? ? } catch (Exception e) {
? ? ? ?e.printStackTrace();
? ? }
? })
? .start();
?try {
? ?cyclicBarrier.await();
} catch (Exception e) {
? ?e.printStackTrace();
}
?System.out.printf("get result by cyclicBarrier: %d\n", sum_cyclicBarrier);
}Semaphore
private static int sum_semaphore = 0;
private static void getResultBySemaphore() {
?Semaphore semaphore = new Semaphore(0);
?new Thread(
? () -> {
? ? ?sum_semaphore = Sum.sum(100);
? ? ?semaphore.release();
? })
? .start();
?
?try {
? ?semaphore.acquire();
? ?System.out.printf("get result by semaphore: %d\n", sum_semaphore);
} catch (InterruptedException e) {
? ?e.printStackTrace();
}
}上面提到的獲取線程執(zhí)行結(jié)果的方法,暫時(shí)基于之前學(xué)到的內(nèi)容,我只能想到這些。這些實(shí)現(xiàn)方式也不是很優(yōu)雅,不是最佳實(shí)踐。
線程池,利用ThreadPoolExecutor的execute(Runnable command)方法,利用這個(gè)方法雖說可以提交任務(wù),但是卻沒有辦法獲取任務(wù)執(zhí)行結(jié)果。
那么我們?nèi)绻枰@取任務(wù)的執(zhí)行結(jié)果并且優(yōu)雅的實(shí)現(xiàn),可以通過Future接口和Callable接口配合實(shí)現(xiàn), 本文將會(huì)通過具體的例子講解如何使用Future。
Future最主要的作用是,比如當(dāng)做比較耗時(shí)運(yùn)算的時(shí)候,如果我們一直在原地等待方法返回,顯然是不明智的,整體程序的運(yùn)行效率會(huì)大大降低。我們可以把運(yùn)算的過程放到子線程去執(zhí)行,再通過Future去控制子線程執(zhí)行的計(jì)算過程,最后獲取到計(jì)算結(jié)果。這樣一來就可以把整個(gè)程序的運(yùn)行效率提高,是一種異步的思想。
如何使用Future
要想使用Future首先得先了解一下Callable。Callable 接口相比于 Runnable 的一大優(yōu)勢是可以有返回結(jié)果,那這個(gè)返回結(jié)果怎么獲取呢?就可以用 Future 類的 get 方法來獲取 。因此,Future 相當(dāng)于一個(gè)存儲(chǔ)器,它存儲(chǔ)了 Callable 的call方法的任務(wù)結(jié)果。
一般情況下,Future,Callable,ExecutorService是一起使用的,ExecutorService里相關(guān)的代碼如下:
// 提交 Runnable 任務(wù) // 由于Runnable接口的run方法沒有返回值,所以,F(xiàn)uture僅僅是用來斷言任務(wù)已經(jīng)結(jié)束,有點(diǎn)類似join(); Future<?> submit(Runnable task); // 提交 Callable 任務(wù) // Callable里的call方法是有返回值的,所以這個(gè)方法返回的Future對(duì)象可以通過調(diào)用其get()方法來獲取任務(wù)的執(zhí) //行結(jié)果。 <T> Future<T> submit(Callable<T> task); // 提交 Runnable 任務(wù)及結(jié)果引用 ? // Future的返回值就是傳給submit()方法的參數(shù)result。 <T> Future<T> submit(Runnable task, T result);
具體使用方法如下:
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> Sum.sum(100));
?
System.out.printf("get result by Callable + Future: %d\n", future.get());
executor.shutdown();Future實(shí)現(xiàn)原理
Future基本概述
Future接口5個(gè)方法:
// 取消任務(wù) boolean cancel(boolean mayInterruptIfRunning); // 判斷任務(wù)是否已取消 ? boolean isCancelled(); // 判斷任務(wù)是否已結(jié)束 boolean isDone(); // 獲得任務(wù)執(zhí)行結(jié)果 阻塞,被調(diào)用時(shí),如果任務(wù)還沒有執(zhí)行完,那么調(diào)用get()方法的線程會(huì)阻塞。直到任務(wù)執(zhí)行完 // 才會(huì)被喚醒 get(); // 獲得任務(wù)執(zhí)行結(jié)果,支持超時(shí) get(long timeout, TimeUnit unit);
cancel(boolean mayInterruptIfRunning):
- 用來取消異步任務(wù)的執(zhí)行。
- 如果異步任務(wù)已經(jīng)完成或者已經(jīng)被取消,或者由于某些原因不能取消,則會(huì)返回
false。 - 如果任務(wù)還沒有被執(zhí)行,則會(huì)返回
true并且異步任務(wù)不會(huì)被執(zhí)行。 - 如果任務(wù)已經(jīng)開始執(zhí)行了但是還沒有執(zhí)行完成,若
mayInterruptIfRunning為true,則會(huì)立即中斷執(zhí)行任務(wù)的線程并返回true,若mayInterruptIfRunning為false,則會(huì)返回true且不會(huì)中斷任務(wù)執(zhí)行線程。
isCanceled():
- 判斷任務(wù)是否被取消。
- 如果任務(wù)在結(jié)束(正常執(zhí)行結(jié)束或者執(zhí)行異常結(jié)束)前被取消則返回
true,否則返回false。
isDone():
- ·判斷任務(wù)是否已經(jīng)完成,如果完成則返回
true,否則返回false。 - 任務(wù)執(zhí)行過程中發(fā)生異常、任務(wù)被取消也屬于任務(wù)已完成,也會(huì)返回
true。
get():
- 獲取任務(wù)執(zhí)行結(jié)果,如果任務(wù)還沒完成則會(huì)阻塞等待直到任務(wù)執(zhí)行完成。
- 如果任務(wù)被取消則會(huì)拋出
CancellationException異常。 - 如果任務(wù)執(zhí)行過程發(fā)生異常則會(huì)拋出
ExecutionException異常。 - 如果阻塞等待過程中被中斷則會(huì)拋出
InterruptedException異常。
get(long timeout,Timeunit unit):
- 帶超時(shí)時(shí)間的
get()版本,上面講述的get()方法,同樣適用這里。 - 如果阻塞等待過程中超時(shí)則會(huì)拋出
TimeoutException異常。
使用IDEA,查看Future的實(shí)現(xiàn)類其實(shí)有很多,比如FutureTask,F(xiàn)orkJoinTask,CompletableFuture等,其余基本是繼承了ForkJoinTask實(shí)現(xiàn)的內(nèi)部類。
本篇文章主要講解FutureTask的實(shí)現(xiàn)原理
FutureTask基本概述
FutureTask 為 Future 提供了基礎(chǔ)實(shí)現(xiàn),如獲取任務(wù)執(zhí)行結(jié)果(get)和取消任務(wù)(cancel)等。如果任務(wù)尚未完成,獲取任務(wù)執(zhí)行結(jié)果時(shí)將會(huì)阻塞。一旦執(zhí)行結(jié)束,任務(wù)就不能被重啟或取消(除非使用runAndReset執(zhí)行計(jì)算)。FutureTask 常用來封裝 Callable 和 Runnable,也可以作為一個(gè)任務(wù)提交到線程池中執(zhí)行。除了作為一個(gè)獨(dú)立的類之外,此類也提供了一些功能性函數(shù)供我們創(chuàng)建自定義 task 類使用。FutureTask 的線程安全由CAS來保證。
// 創(chuàng)建 FutureTask FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2); // 創(chuàng)建線程池 ExecutorService es = Executors.newCachedThreadPool(); // 提交 FutureTask es.submit(futureTask); // 獲取計(jì)算結(jié)果 Integer result = futureTask.get();
// 創(chuàng)建 FutureTask FutureTask<Integer> futureTask ?= new FutureTask<>(()-> 1+2); // 創(chuàng)建并啟動(dòng)線程 Thread T1 = new Thread(futureTask); T1.start(); // 獲取計(jì)算結(jié)果 Integer result = futureTask.get();
FutureTask可以很容易獲取子線程的執(zhí)行結(jié)果。
FutureTask實(shí)現(xiàn)原理
構(gòu)造函數(shù)
public FutureTask(Callable<V> callable) {
?if (callable == null)
? ?throw new NullPointerException();
?this.callable = callable;
?this.state = NEW; ? ? ? // ensure visibility of callable
}
?
public FutureTask(Runnable runnable, V result) {
?this.callable = Executors.callable(runnable, result);
?this.state = NEW; ? ? ? // ensure visibility of callable
}FutureTask提供了兩個(gè)構(gòu)造器
Callable接口有返回,將callable賦值給this.callable。
Runnable接口無返回,如果想要獲取到執(zhí)行結(jié)果,需要傳V result給FutureTask,FutureTask將Runnable和result封裝成Callable,再將callable賦值給this.callable。
狀態(tài)初始化狀態(tài)為NEW
FutureTask內(nèi)置狀態(tài)有:
private volatile int state; // 可見性 private static final int NEW ? ? ? ? ?= 0; private static final int COMPLETING ? = 1; private static final int NORMAL ? ? ? = 2; private static final int EXCEPTIONAL ?= 3; private static final int CANCELLED ? ?= 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED ?= 6;
- NEW 初始狀態(tài)
- COMPLETING 任務(wù)已經(jīng)執(zhí)行完(正?;蛘弋惓?,準(zhǔn)備賦值結(jié)果,但是這個(gè)狀態(tài)會(huì)時(shí)間會(huì)比較短,屬于中間狀態(tài)。
- NORMAL 任務(wù)已經(jīng)正常執(zhí)行完,并已將任務(wù)返回值賦值到結(jié)果
- EXCEPTIONAL 任務(wù)執(zhí)行失敗,并將異常賦值到結(jié)果
- CANCELLED 取消
- INTERRUPTING 準(zhǔn)備嘗試中斷執(zhí)行任務(wù)的線程
- INTERRUPTED 對(duì)執(zhí)行任務(wù)的線程進(jìn)行中斷(未必中斷到)
狀態(tài)轉(zhuǎn)換:

run()執(zhí)行流程
public void run() {
? ?if (state != NEW ||
? ? ? ?!RUNNER.compareAndSet(this, null, Thread.currentThread()))
? ? ? ?return;
? ?try {
? ? ? ?Callable<V> c = callable;
? ? ? ?if (c != null && state == NEW) {
? ? ? ? ? ?V result;
? ? ? ? ? ?boolean ran;
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?result = c.call();
? ? ? ? ? ? ? ?ran = true;
? ? ? ? ? } catch (Throwable ex) {
? ? ? ? ? ? ? ?result = null;
? ? ? ? ? ? ? ?ran = false;
? ? ? ? ? ? ? ?setException(ex);
? ? ? ? ? }
? ? ? ? ? ?if (ran)
? ? ? ? ? ? ? ?set(result);
? ? ? }
? } finally {
? ? ? ?// runner must be non-null until state is settled to
? ? ? ?// prevent concurrent calls to run()
? ? ? ?runner = null;
? ? ? ?// state must be re-read after nulling runner to prevent
? ? ? ?// leaked interrupts
? ? ? ?int s = state;
? ? ? ?if (s >= INTERRUPTING)
? ? ? ? ? ?handlePossibleCancellationInterrupt(s);
? }
}
set()
protected void set(V v) {
// state變量,通過CAS操作,將NEW->COMPLETING
? ?if (STATE.compareAndSet(this, NEW, COMPLETING)) {
? ? ? ?// 將結(jié)果賦值給outcome屬性
? ? ? ?outcome = v;
? ? ? ?// state狀態(tài)直接賦值為NORMAL,不需要CAS
? ? ? ?STATE.setRelease(this, NORMAL); // final state
? ? ? ?finishCompletion();
? }
}setException()
protected void setException(Throwable t) {
? ?// state變量,通過CAS操作,將NEW->COMPLETING
? ?if (STATE.compareAndSet(this, NEW, COMPLETING)) {
? ? ? ?// 將異常賦值給outcome屬性
? ? ? ?outcome = t;
? ? ? ?// state狀態(tài)直接賦值為EXCEPTIONAL,不需要CAS
? ? ? ?STATE.setRelease(this, EXCEPTIONAL); // final state
? ? ? ?finishCompletion();
? }
}finishCompletion()
set()和setException()兩個(gè)方法最后都調(diào)用了finishCompletion()方法,完成一些善后工作,具體流程如下:
private void finishCompletion() {
? ?// assert state > COMPLETING;
? ?for (WaitNode q; (q = waiters) != null;) {
? ? ? ?// 移除等待線程
? ? ? ?if (WAITERS.weakCompareAndSet(this, q, null)) {
? ? ? ? ? ?// 自旋遍歷等待線程
? ? ? ? ? ?for (;;) {
? ? ? ? ? ? ? ?Thread t = q.thread;
? ? ? ? ? ? ? ?if (t != null) {
? ? ? ? ? ? ? ? ? ?q.thread = null;
? ? ? ? ? ? ? ? ? ?// 喚醒等待線程
? ? ? ? ? ? ? ? ? ?LockSupport.unpark(t);
? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?WaitNode next = q.next;
? ? ? ? ? ? ? ?if (next == null)
? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ?q.next = null; // unlink to help gc
? ? ? ? ? ? ? ?q = next;
? ? ? ? ? }
? ? ? ? ? ?break;
? ? ? }
? }
? ?// 任務(wù)完成后調(diào)用函數(shù),自定義擴(kuò)展
? ?done();
?
? ?callable = null; ? ? ? ?// to reduce footprint
}handlePossibleCancellationInterrupt()
private void handlePossibleCancellationInterrupt(int s) {
? ?if (s == INTERRUPTING)
? ? ? ?// 在中斷者中斷線程之前可能會(huì)延遲,所以我們只需要讓出CPU時(shí)間片自旋等待
? ? ? ?while (state == INTERRUPTING)
? ? ? ? ? ?Thread.yield(); // wait out pending interrupt
}get()執(zhí)行流程
public V get() throws InterruptedException, ExecutionException {
? ?int s = state;
? ?if (s <= COMPLETING)
? ? ? ?// awaitDone用于等待任務(wù)完成,或任務(wù)因?yàn)橹袛嗷虺瑫r(shí)而終止。返回任務(wù)的完成狀態(tài)。
? ? ? ?s = awaitDone(false, 0L);
? ?return report(s);
}具體流程:

awaitDone()
private int awaitDone(boolean timed, long nanos)
? ?throws InterruptedException {
? ?long startTime = 0L; ? ?// Special value 0L means not yet parked
? ?WaitNode q = null;
? ?boolean queued = false;
? ?for (;;) {
? ? ? ?// 獲取到當(dāng)前狀態(tài)
? ? ? ?int s = state;
? ? ? ?// 如果當(dāng)前狀態(tài)不為NEW或者COMPLETING
? ? ? ?if (s > COMPLETING) {
? ? ? ? ? ?if (q != null)
? ? ? ? ? ? ? ?q.thread = null;
? ? ? ? ? ?// 直接返回state
? ? ? ? ? ?return s;
? ? ? }
? ? ? ?// COMPLETING是一個(gè)很短暫的狀態(tài),調(diào)用Thread.yield期望讓出時(shí)間片,之后重試循環(huán)。
? ? ? ?else if (s == COMPLETING)
? ? ? ? ? ?Thread.yield();
? ? ? ?// 如果阻塞線程被中斷則將當(dāng)前線程從阻塞隊(duì)列中移除
? ? ? ?else if (Thread.interrupted()) {
? ? ? ? ? ?removeWaiter(q);
? ? ? ? ? ?throw new InterruptedException();
? ? ? }
? ? ? ?// 新進(jìn)來的線程添加等待節(jié)點(diǎn)
? ? ? ?else if (q == null) {
? ? ? ? ? ?if (timed && nanos <= 0L)
? ? ? ? ? ? ? ?return s;
? ? ? ? ? ?q = new WaitNode();
? ? ? }
? ? ? ?else if (!queued)
? ? ? ? ? ?/*
? ? ? ? ? ? * 這是Treiber Stack算法入棧的邏輯。
? ? ? ? ? ? * Treiber Stack是一個(gè)基于CAS的無鎖并發(fā)棧實(shí)現(xiàn),
? ? ? ? ? ? * 更多可以參考https://en.wikipedia.org/wiki/Treiber_Stack
? ? ? ? ? ? */
? ? ? ? ? ?queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
? ? ? ?else if (timed) {
? ? ? ? ? ?final long parkNanos;
? ? ? ? ? ?if (startTime == 0L) { // first time
? ? ? ? ? ? ? ?startTime = System.nanoTime();
? ? ? ? ? ? ? ?if (startTime == 0L)
? ? ? ? ? ? ? ? ? ?startTime = 1L;
? ? ? ? ? ? ? ?parkNanos = nanos;
? ? ? ? ? } else {
? ? ? ? ? ? ? ?long elapsed = System.nanoTime() - startTime;
? ? ? ? ? ? ? ?if (elapsed >= nanos) {
? ? ? ? ? ? ? ? ? ?// 超時(shí),移除棧中節(jié)點(diǎn)。
? ? ? ? ? ? ? ? ? ?removeWaiter(q);
? ? ? ? ? ? ? ? ? ?return state;
? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?parkNanos = nanos - elapsed;
? ? ? ? ? }
? ? ? ? ? ?// nanoTime may be slow; recheck before parking
? ? ? ? ? ?// 未超市并且狀態(tài)為NEW,阻塞當(dāng)前線程
? ? ? ? ? ?if (state < COMPLETING)
? ? ? ? ? ? ? ?LockSupport.parkNanos(this, parkNanos);
? ? ? }
? ? ? ?else
? ? ? ? ? ?LockSupport.park(this);
? }
}removeWaiter()
private void removeWaiter(WaitNode node) {
?if (node != null) {
? ?node.thread = null;
? ?retry:
? ?for (;;) { ? ? ? ? ?// restart on removeWaiter race
? ? ?for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
? ? ? ?s = q.next;
? ? ? ?// 如果當(dāng)前節(jié)點(diǎn)仍有效,則置pred為當(dāng)前節(jié)點(diǎn),繼續(xù)遍歷。
? ? ? ?if (q.thread != null)
? ? ? ? ?pred = q;
? ? ? ?/*
? ? ? ?* 當(dāng)前節(jié)點(diǎn)已無效且有前驅(qū),則將前驅(qū)的后繼置為當(dāng)前節(jié)點(diǎn)的后繼實(shí)現(xiàn)刪除節(jié)點(diǎn)。
? ? ? ?* 如果前驅(qū)節(jié)點(diǎn)已無效,則重新遍歷waiters棧。
? ? ? ?*/
? ? ? ?else if (pred != null) {
? ? ? ? ?pred.next = s;
? ? ? ? ?if (pred.thread == null) // check for race
? ? ? ? ? ?continue retry;
? ? ? }
? ? ? ?/*
? ? ? ?* 當(dāng)前節(jié)點(diǎn)已無效,且當(dāng)前節(jié)點(diǎn)沒有前驅(qū),則將棧頂置為當(dāng)前節(jié)點(diǎn)的后繼。
? ? ? ?* 失敗的話重新遍歷waiters棧。
? ? ? ?*/
? ? ? ?else if (!WAITERS.compareAndSet(this, q, s))
? ? ? ? ?continue retry;
? ? }
? ? ?break;
? }
}
}report()
private V report(int s) throws ExecutionException {
? ?Object x = outcome;
? ?if (s == NORMAL)
? ? ? ?return (V)x;
? ?if (s >= CANCELLED)
? ? ? ?throw new CancellationException();
? ?throw new ExecutionException((Throwable)x);
}cancel()執(zhí)行流程
public boolean cancel(boolean mayInterruptIfRunning) {
? ?// 狀態(tài)機(jī)不是NEW 或CAS更新狀態(tài) 流轉(zhuǎn)到INTERRUPTING或者CANCELLED失敗,不允許cancel
? ?if (!(state == NEW && STATE.compareAndSet
? ? ? ? (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
? ? ? ?return false;
? ?try { ? ?// in case call to interrupt throws exception
? ? ? ?// 如果要求中斷執(zhí)行中的任務(wù),則直接中斷任務(wù)執(zhí)行線程,并更新狀態(tài)機(jī)為最終狀態(tài)INTERRUPTED
? ? ? ?if (mayInterruptIfRunning) {
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?Thread t = runner;
? ? ? ? ? ? ? ?if (t != null)
? ? ? ? ? ? ? ? ? ?t.interrupt();
? ? ? ? ? } finally { // final state
? ? ? ? ? ? ? ?STATE.setRelease(this, INTERRUPTED);
? ? ? ? ? }
? ? ? }
? } finally {
? ? ? ?finishCompletion();
? }
? ?return true;
}經(jīng)典案例
引用極客時(shí)間-java并發(fā)編程課程的案例燒水泡茶:

并發(fā)編程可以總結(jié)為三個(gè)核心問題:分工,同步和互斥。編寫并發(fā)程序,首先要做分工。
- T1負(fù)責(zé)洗水壺,燒開水,泡茶這三道工序
- T2負(fù)責(zé)洗茶壺,洗茶杯,拿茶葉三道工序。
- T1在執(zhí)行泡茶這道工序需要等到T2完成拿茶葉的工作。(join,countDownLatch,阻塞隊(duì)列都可以完成)

// 創(chuàng)建任務(wù) T2 的 FutureTask
FutureTask<String> ft2
?= new FutureTask<>(new T2Task());
// 創(chuàng)建任務(wù) T1 的 FutureTask
FutureTask<String> ft1
?= new FutureTask<>(new T1Task(ft2));
// 線程 T1 執(zhí)行任務(wù) ft1
Thread T1 = new Thread(ft1);
T1.start();
// 線程 T2 執(zhí)行任務(wù) ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待線程 T1 執(zhí)行結(jié)果
System.out.println(ft1.get());
?
// T1Task 需要執(zhí)行的任務(wù):
// 洗水壺、燒開水、泡茶
class T1Task implements Callable<String>{
?FutureTask<String> ft2;
?// T1 任務(wù)需要 T2 任務(wù)的 FutureTask
?T1Task(FutureTask<String> ft2){
? ?this.ft2 = ft2;
}
?@Override
?String call() throws Exception {
? ?System.out.println("T1: 洗水壺...");
? ?TimeUnit.SECONDS.sleep(1);
? ?System.out.println("T1: 燒開水...");
? ?TimeUnit.SECONDS.sleep(15);
? ?// 獲取 T2 線程的茶葉 ?
? ?String tf = ft2.get();
? ?System.out.println("T1: 拿到茶葉:"+tf);
?
? ?System.out.println("T1: 泡茶...");
? ?return " 上茶:" + tf;
}
}
// T2Task 需要執(zhí)行的任務(wù):
// 洗茶壺、洗茶杯、拿茶葉
class T2Task implements Callable<String> {
?@Override
?String call() throws Exception {
? ?System.out.println("T2: 洗茶壺...");
? ?TimeUnit.SECONDS.sleep(1);
?
? ?System.out.println("T2: 洗茶杯...");
? ?TimeUnit.SECONDS.sleep(2);
?
? ?System.out.println("T2: 拿茶葉...");
? ?TimeUnit.SECONDS.sleep(1);
? ?return " 龍井 ";
}
}
// 一次執(zhí)行結(jié)果:
//T1: 洗水壺...
//T2: 洗茶壺...
//T1: 燒開水...
//T2: 洗茶杯...
//T2: 拿茶葉...
//T1: 拿到茶葉: 龍井
//T1: 泡茶...
//上茶: 龍井以上就是Java多線程Future實(shí)現(xiàn)優(yōu)雅獲取線程的執(zhí)行結(jié)果的詳細(xì)內(nèi)容,更多關(guān)于Java獲取線程執(zhí)行結(jié)果的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring?Security權(quán)限注解啟動(dòng)及邏輯處理使用示例
這篇文章主要為大家介紹了Spring?Security權(quán)限注解啟動(dòng)及邏輯處理使用示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07
詳談spring中bean注入無效和new創(chuàng)建對(duì)象的區(qū)別
這篇文章主要介紹了spring中bean注入無效和new創(chuàng)建對(duì)象的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
Mybatis-Plus中updateById方法不能更新空值問題解決
本文主要介紹了Mybatis-Plus中updateById方法不能更新空值問題解決,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08
JAVA使用JDBC技術(shù)操作SqlServer數(shù)據(jù)庫實(shí)例代碼
本篇文章主要介紹了JAVA使用JDBC技術(shù)操作SqlServer數(shù)據(jù)庫實(shí)例代碼,具有一定的參考價(jià)值,有興趣的可以了解一下。2017-01-01
Java+Selenium實(shí)現(xiàn)控制瀏覽器的啟動(dòng)選項(xiàng)Options
這篇文章主要為大家詳細(xì)介紹了如何使用java代碼利用selenium控制瀏覽器的啟動(dòng)選項(xiàng)Options的代碼操作,文中的示例代碼講解詳細(xì),感興趣的可以了解一下2023-01-01
Spring Cache + Caffeine的整合與使用示例詳解
對(duì)于一些項(xiàng)目里需要對(duì)數(shù)據(jù)庫里的某些數(shù)據(jù)一直重復(fù)請(qǐng)求的,且這些數(shù)據(jù)基本是固定的,在這種情況下,可以借助簡單使用本地緩存來緩存這些數(shù)據(jù),本文介紹一下Spring Cache和Caffeine的使用,感興趣的朋友一起看看吧2023-12-12
Java利用樸素貝葉斯分類算法實(shí)現(xiàn)信息分類
貝葉斯分類算法是統(tǒng)計(jì)學(xué)的一種分類方法,它是一類利用概率統(tǒng)計(jì)知識(shí)進(jìn)行分類的算法。本文將利用樸素貝葉斯分類算法實(shí)現(xiàn)信息分類,需要的可以參考一下2022-06-06

