Java中的FutureTask源碼解析
一、簡介
1、FutureTask是一個可取消的異步計算。這個類是Future的實現(xiàn)類,有開始和取消一個計算的方法,如果一個計算已經(jīng)完成可以查看結(jié)果。如果在計算沒有完成的情況下調(diào)用get獲取計算結(jié)果會阻塞。且一旦任務(wù)完成后,計算不能重新開始或被取消,除非計算被runAndReset調(diào)用執(zhí)行。
2、FutureTask被用來去封裝一個Callable或者Runnable,一個FutureTask能夠被submit作為一個Executor
3、FutureTask 的線程安全由CAS來保證。
二、源碼分析
1、成員屬性
public class FutureTask<V> implements RunnableFuture<V> {
//state表示的任務(wù)的狀態(tài)
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
//任務(wù)
private Callable<V> callable;
//存儲任務(wù)完成以后的結(jié)果
private Object outcome;
//執(zhí)行當前任務(wù)的線程
private volatile Thread runner;
//執(zhí)行當前任務(wù)被阻塞的線程
private volatile WaitNode waiters;
}可能有的狀態(tài)轉(zhuǎn)換:
NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
注意:state用volatile修飾的,如果在多線程并發(fā)的情況下,某一個線程改變了任務(wù)的狀態(tài),其他線程都能夠立馬知道,保證了state字段的可見性。
2、構(gòu)造函數(shù)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}很好的詮釋了FutureTask封裝了Runnable或Callable,構(gòu)造完成后將任務(wù)的狀態(tài)變?yōu)镹EW。同時注意,封裝Runnable時用的Executors的靜態(tài)方法callable
順帶看下Executors.callable()這個方法,這個方法的功能是把Runnable轉(zhuǎn)換成Callable,代碼如下:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}所以,F(xiàn)utureTask封裝Runnable使用了適配器模式的設(shè)計模式
3、核心方法
//運行任務(wù)的方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; //得到當前任務(wù)
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //當前任務(wù)調(diào)用call方法,執(zhí)行,同時,執(zhí)行完后將結(jié)果返回
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran) //表示任務(wù)執(zhí)行成功
set(result); //CAS改變?nèi)蝿?wù)的狀態(tài)從NEW->COMPLETING->NORMAL,同時將任務(wù)返回的結(jié)果保存到outcome屬性中,再移除并喚醒所有等待線程
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; //將任務(wù)成功執(zhí)行完后返回的結(jié)果保存到outcome中
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終的狀態(tài),表示任務(wù)結(jié)束
finishCompletion(); //移除并喚醒所有等待線程
}
}
//該方法用于移除并喚醒所有等待線程
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); //喚醒
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt(); //打斷
} finally { // 設(shè)置成為最終態(tài)INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion(); //移除并喚醒所有等待線程
}
return true;
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); //如果任務(wù)沒有完成或者其他的問題,將阻塞;創(chuàng)建一個新節(jié)點存入阻塞棧中
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}三、示例
常用使用方式:
- 第一種方式: Future + ExecutorService
- 第二種方式: FutureTask + ExecutorService
- 第三種方式: FutureTask + Thread
第一種方式:Future + ExecutorService
public class FutureDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Future future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
Long start = System.currentTimeMillis();
while (true) {
Long current = System.currentTimeMillis();
if ((current - start) > 1000) {
return 1;
}
}
}
});
try {
Integer result = (Integer)future.get();
System.out.println(result);
}catch (Exception e){
e.printStackTrace();
}
}
}第二種方式:FutureTask + ExecutorService
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executor.submit(futureTask);第三種方式:FutureTask + Thread
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
Thread thread = new Thread(futureTask);
thread.setName("Task thread");
thread.start();四、總結(jié)
1、FutureTask用來封裝Runnable或者Callable接口,可以當成一個任務(wù)。
2、在Java并發(fā)程序中FutureTask表示一個可以取消的異步運算。它有啟動和取消運算、查詢運算是否完成和取回運算結(jié)果等方法。只有當運算完成的時候結(jié)果才能取回,如果運算尚未完成get方法將會阻塞。一個FutureTask對象可以對調(diào)用了Callable和Runnable的對象進行包裝,由于FutureTask也是調(diào)用了Runnable接口所以它可以提交給Executor來執(zhí)行。
3、FutureTask可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景,通過傳入Runnable或者Callable的任務(wù)給FutureTask,直接調(diào)用其run方法或者放入線程池執(zhí)行,之后可以在外部通過FutureTask的get方法異步獲取執(zhí)行結(jié)果,因此,F(xiàn)utureTask非常適合用于耗時的計算,主線程可以在完成自己的任務(wù)后,再去獲取結(jié)果。另外,F(xiàn)utureTask還可以確保即使調(diào)用了多次run方法,它都只會執(zhí)行一次Runnable或者Callable任務(wù),或者通過cancel取消FutureTask的執(zhí)行等。
4、FutureTask間接繼承了Runnable和Callable
5、FutureTask的線程安全由CAS操作來保證
6、FutureTask結(jié)果返回機制 :只有任務(wù)成功執(zhí)行完成后,通過get方法能夠得到任務(wù)返回的結(jié)果,其他情況都會導(dǎo)致阻塞。
到此這篇關(guān)于Java中的FutureTask源碼解析的文章就介紹到這了,更多相關(guān)FutureTask源碼解析內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
在CentOS上安裝Java 17并實現(xiàn)多版本共存的詳細教程
在現(xiàn)代軟件開發(fā)中,Java 作為一種廣泛使用的編程語言,其版本更新頻繁,不同項目可能依賴不同版本的 Java 運行環(huán)境,CentOS 作為一款流行的 Linux 發(fā)行版,常被用于服務(wù)器部署和開發(fā)環(huán)境,本文將詳細介紹如何在 CentOS 上安裝 Java 17,并實現(xiàn)與現(xiàn)有 Java 8 的多版本共存2025-03-03
SpringKafka消息發(fā)布之KafkaTemplate與事務(wù)支持功能
通過本文介紹的基本用法、序列化選項、事務(wù)支持、錯誤處理和性能優(yōu)化技術(shù),開發(fā)者可以構(gòu)建高效可靠的Kafka消息發(fā)布系統(tǒng),事務(wù)支持特性尤為重要,它確保了在分布式環(huán)境中的數(shù)據(jù)一致性,感興趣的朋友一起看看吧2025-04-04

