Java中Future、FutureTask原理以及與線程池的搭配使用
Java中的Future和Future通常和線程池搭配使用,用來獲取線程池返回執(zhí)行后的返回值。我們假設通過Executors工廠方法構建一個線程池es ,es要執(zhí)行某個任務有兩種方式,一種是執(zhí)行 es.execute(runnable) ,這種情況是沒有返回值的; 另外一種情況是執(zhí)行 es.submit(runnale)或者 es.submit(callable) ,這種情況會返回一個Future的對象,然后調用Future的get()來獲取返回值。
Future
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future是一個接口,他提供給了我們方法來檢測當前的任務是否已經(jīng)結束,還可以等待任務結束并且拿到一個結果,通過調用Future的get()方法可以當任務結束后返回一個結果值,如果工作沒有結束,則會阻塞當前線程,直到任務執(zhí)行完畢,我們可以通過調用cancel()方法來停止一個任務,如果任務已經(jīng)停止,則cancel()方法會返回true;如果任務已經(jīng)完成或者已經(jīng)停止了或者這個任務無法停止,則cancel()會返回一個false。當一個任務被成功停止后,他無法再次執(zhí)行。isDone()和isCancel()方法可以判斷當前工作是否完成和是否取消。
線程池中有以下幾個方法:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
都返回一個Future對象,仔細查看發(fā)現(xiàn),所有的方法最終都將runnable或者callable轉變成一個RunnableFuture的對象,這個RunnableFutre的對象是一個同時繼承了Runnable和Future的接口
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
然后調用executor(runnable)方法,關于executor(runnable)的具體實現(xiàn)我們后面再講。最后返回一個RunnableFuture對象。RunnableFuture這個接口直有一個具體的實現(xiàn)類,那就時我們接下來要講的FutureTask。
FutureTask
public class FutureTask<V> implements RunnableFuture<V>
FutureTask實現(xiàn)了RunnableFuture的接口,既然我們知道最終返回的是一個FutureTask對象ftask,而且我們可以通過ftask.get()可以的來得到execute(task)的返回值,這個過程具體事怎么實現(xiàn)的了??這個也是本篇文章的所要講的。
我們可以先來猜測一下它的實現(xiàn)過程,首先Runnable的run()是沒有返回值的,所以當es.submit()的參數(shù)只有一個Runnable對象的時候,通過ftask.get()得到的也是一個null值,當參數(shù)還有一個result的時候,就返回這個result;如果參數(shù)是一個Callable的對象的時候,Callable的call()是有返回值的,同時這個call()方法會在轉換的Runable對象ftask的run()方法中被調用,然后將它的返回值賦值給一個全局變量,最后在ftask的get()方法中得到這個值。猜想對不對了? 我們直接看源碼。
將Runnable對象轉為RunnableFuture的方法:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
Executors::callable()
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
Executors的內部類RunnableAdapter
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
將Callable對象轉為RunnableFuture的方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
接下來我們來看execute(runnbale)的執(zhí)行過程:
execute(runnable)最終的實現(xiàn)是在ThreadPoolExecutor,基本上所有的線程池都是通過ThreadPoolExecutor的構造方法傳入不同的參數(shù)來構造的。
ThreadPoolExecutor::executor(runnable) :
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
如上所示,這個過程分為三步:
Step 1:
如果當前線程池的的線程小于核心線程的數(shù)量的時候,就會調用addWorker檢查運行狀態(tài)和正在運行的線程數(shù)量,通過返回false來防止錯誤地添加線程,然后執(zhí)行當前任務。
Step 2:
否則當前線程池的的線程大于核心線程的數(shù)量的時候,我們仍然需要先判斷是否需要添加一個新的線程來執(zhí)行這個任務,因為可能已經(jīng)存在的線程此刻任務執(zhí)行完畢處于空閑狀態(tài),這個時候可以直接復用。否則創(chuàng)建一個新的線程來執(zhí)行此任務。
Step 3:
如果不能再添加新的任務,就拒絕。
執(zhí)行execute(runnable)最終會回調runnable的run()方法,也就是FutureTask的對象ftask的run()方法,源碼如下:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
通過執(zhí)行result = c.call()拿到返回值,然后set(result) ,因此get()方法獲得的值正是這個result。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
MybatisPlus lambdaQueryWrapper中常用方法的使用
本文主要介紹了MybatisPlus lambdaQueryWrapper中常用方法的使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-07-07java線程池ThreadPoolExecutor的八種拒絕策略示例詳解
ThreadPoolExecutor是一個典型的緩存池化設計的產(chǎn)物,因為池子有大小,當池子體積不夠承載時,就涉及到拒絕策略。JDK中已預設了?4?種線程池拒絕策略,下面結合場景詳細聊聊這些策略的使用場景以及還能擴展哪些拒絕策略2021-11-11淺談MultipartFile中transferTo方法的坑
這篇文章主要介紹了MultipartFile中transferTo方法的坑,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07Mybatis一對多關聯(lián)關系映射實現(xiàn)過程解析
這篇文章主要介紹了Mybatis一對多關聯(lián)關系映射實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-02-02