Java從源碼看異步任務計算FutureTask
前言:
大家是否熟悉FutureTask呢?或者說你有沒有異步計算的需求呢?FutureTask就能夠很好的幫助你實現(xiàn)異步計算,并且可以實現(xiàn)同步獲取異步任務的計算結(jié)果。下面我們就一起從源碼分析一下FutureTask。
了解一下什么是FutureTask?
FutureTask 是一個可取消的異步計算。
FutureTask提供了對Future的基本實現(xiàn),可以調(diào)用方法去開始和取消一個計算,可以查詢計算是否完成,并且獲取計算結(jié)果。
FutureTask只能在計算完成后獲取到計算結(jié)果,一旦計算完成,將不能重啟或者取消,除非調(diào)用runAndReset方法。
FutureTask除了實現(xiàn)了Future接口以外,還實現(xiàn)了Runnable接口,因此FutureTask是可以交由線程池的Executor執(zhí)行,也可以直接使用一個異步線程調(diào)用執(zhí)行(futureTask.run())。
FutureTask 是如何實現(xiàn)的呢?
首先,我們看一下FutureTask類的繼承結(jié)構(gòu),如下圖,它實現(xiàn)的是RunnableFuture接口,而RunnableFuture繼承自Future和函數(shù)式接口Runnable,所以說FutureTask本質(zhì)就是一個可運行的Future。

Future 接口約定了一些異步計算類必須要實現(xiàn)的功能,源碼如下:
package java.util.concurrent;
public interface Future<V> {
? ?/**
? ? * 嘗試取消任務的執(zhí)行,并返回取消結(jié)果。
? ? * 參數(shù)mayInterruptIfRunning:是否中斷線程。
? ? */
? ?boolean cancel(boolean mayInterruptIfRunning);
? ?/**
? ? * 判斷任務是否被取消(正常結(jié)束之前被被取消返回true)
? ? */
? ?boolean isCancelled();
? ?/**
? ? * 判斷當前任務是否執(zhí)行完畢,包括正常執(zhí)行完畢、執(zhí)行異?;蛘呷蝿杖∠?。
? ? */
? ?boolean isDone();
? ?/**
? ? * 獲取任務執(zhí)行結(jié)果,任務結(jié)束之前會阻塞。
? ? */
? ?V get() throws InterruptedException, ExecutionException;
? ?/**
? ? * 在指定時間內(nèi)嘗試獲取執(zhí)行結(jié)果。若超時則拋出超時異常TimeoutException
? ? */
? ?V get(long timeout, TimeUnit unit)
? ? ? ?throws InterruptedException, ExecutionException, TimeoutException;
}
Runnable 接口我們都很熟悉,他就是一個函數(shù)式接口,我們常用其創(chuàng)建一個線程。
package java.lang;
?
@FunctionalInterface
public interface Runnable {
public abstract void run();
}FutureTask就是一個將要被執(zhí)行的任務,它包含了以上接口具體的實現(xiàn),F(xiàn)utureTask內(nèi)部定義了任務的狀態(tài)state和一些狀態(tài)的常量,它的內(nèi)部核心是一個Callable callable,我們通過構(gòu)造函數(shù)可以傳入callable或者是runnable,最后都會內(nèi)部轉(zhuǎn)為callable,因為我們需要獲取異步任務的執(zhí)行結(jié)果,只有通過Callable創(chuàng)建的線程才會返回結(jié)果。
我們可以通過此時的狀態(tài)判斷Future中isCancelled(),isDone()的返回結(jié)果。
以下為FutureTask源碼,內(nèi)含核心源碼分析注釋
package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 任務的運行狀態(tài)
*/
private volatile int state;
private static final int NEW = 0; // 新建
private static final int COMPLETING = 1; // 完成
private static final int NORMAL = 2; // 正常
private static final int EXCEPTIONAL = 3; // 異常
private static final int CANCELLED = 4; // 取消
private static final int INTERRUPTING = 5; // 中斷中
private static final int INTERRUPTED = 6; // 中斷的
private Callable<V> callable;
/**
* 返回結(jié)果
*/
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
...
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
/*
* 取消任務實現(xiàn)
* 如果任務還沒有啟動就調(diào)用了cancel(true),任務將永遠不會被執(zhí)行。
* 如果任務已經(jīng)啟動,參數(shù)mayInterruptIfRunning將決定任務是否應該中斷執(zhí)行該任務的線程,以嘗試中斷該任務。
* 如果任務任務已經(jīng)取消、已經(jīng)完成或者其他原因不能取消,嘗試將失敗。
*/
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
/*
* 等待獲取結(jié)果
* 獲取當前狀態(tài),判斷是否執(zhí)行完成。并且判斷時間是否超時
* 如果任務沒有執(zhí)行完成,就阻塞等待完成,若超時拋出超時等待異常。
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/*
* 等待獲取結(jié)果
* 獲取當前狀態(tài),判斷是否執(zhí)行完成。
* 如果任務沒有執(zhí)行完成,就阻塞等待完成。
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/**
* 根據(jù)狀態(tài)判斷返回結(jié)果還是異常
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
protected void done() { }
/**
* 設置結(jié)果借助CAS確認狀態(tài)是否完成狀態(tài)
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
/**
* 設置異常,當運行完成出現(xiàn)異常,設置異常狀態(tài)
*/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
/*
* 執(zhí)行callable獲取結(jié)果,或者異常
* 判斷狀態(tài)是不是啟動過的,如果是新建才可以執(zhí)行run方法
*/
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* 重新執(zhí)行
*/
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
/*
* 處理可能取消的中斷
*/
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
/**
* 移除并喚醒所有等待線程,執(zhí)行done,置空callable
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
/**
* 等待完成
* 首先判斷是否超時
* 處理中斷的,然后處理異常狀態(tài)的,處理完成的...
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
/**
* 去除等待
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
} FutureTask 運行流程
一般來說,我們可以認為FutureTask具有以下三種狀態(tài):
未啟動:新建的FutureTask,在run()沒執(zhí)行之前,F(xiàn)utureTask處于未啟動狀態(tài)。
private static final int NEW = 0; // 新建
已啟動:FutureTask對象的run方法啟動并執(zhí)行的過程中,F(xiàn)utureTask處于已啟動狀態(tài)。
已完成:FutureTask正常執(zhí)行結(jié)束,或者FutureTask執(zhí)行被取消(FutureTask對象cancel方法),或者FutureTask對象run方法執(zhí)行拋出異常而導致中斷而結(jié)束,F(xiàn)utureTask都處于已完成狀態(tài)。
private static final int COMPLETING = 1; // 完成 private static final int NORMAL = 2; // 完成后正常設置結(jié)果 private static final int EXCEPTIONAL = 3; // 完成后異常設置異常 private static final int CANCELLED = 4; // 執(zhí)行取消 private static final int INTERRUPTING = 5; // 中斷中 private static final int INTERRUPTED = 6; // 中斷的
FutureTask 的使用
使用一(直接新建一個線程調(diào)用):
FutureTask<Integer> task = new FutureTask<>(new Callable() {
@Override
public Integer call() throws Exception {
return sum();
}
});
new Thread(task).stat();
Integer result = task.get();使用二(結(jié)合線程池使用)
FutureTask<Integer> task = new FutureTask<>(new Callable() {
@Override
public Integer call() throws Exception {
return sum();
}
});
Executors.newCachedThreadPool().submit(task);
Integer result = task.get();到此這篇關(guān)于Java從源碼看異步任務計算FutureTask的文章就介紹到這了,更多相關(guān)Java FutureTask內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解SpringBoot中使用JPA作為數(shù)據(jù)持久化框架
這篇文章主要介紹了SpringBoot中使用JPA作為數(shù)據(jù)持久化框架的相關(guān)知識,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-03-03
jasypt對配置文件的數(shù)據(jù)加密與解密方式
這篇文章主要介紹了jasypt對配置文件的數(shù)據(jù)加密與解密方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
java的arraylist排序示例(arraylist用法)
這篇文章主要介紹了java的arraylist排序示例,學習一下arraylist的用法,需要的朋友可以參考下2014-03-03

