關(guān)于Future機制原理及解析
future機制是
在通過線程去執(zhí)行某個任務的時候,如果比較耗時,我們可以通過futureTask機制,異步返回,繼續(xù)去執(zhí)行其他的任務,在需要獲取執(zhí)行結(jié)果的時候,通過future.get()方法去獲取,如果任務沒有執(zhí)行完畢,會通過lockSupport.park()方法去阻塞主線程,直到run()方法執(zhí)行完畢之后,會通過lockSupport.unpark()方法去喚醒線程
應用
public class FutureTest { ? ? public static void main(String[] args) throws ExecutionException, InterruptedException { ? ? ? ? MyThread myThread = new MyThread(); ? ? ? ? FutureTask<String> futureTask = new FutureTask<>(myThread); ? ? ? ? /** ? ? ? ? ?* 同一個futureTask對象,通過多個線程進行多次調(diào)用,但是只會執(zhí)行一次 ? ? ? ? ?* 如果是計算的操作,需要進行多次計算操作,需要聲明不同的futureTask對象 ? ? ? ? ?*/ ? ? ? ? new Thread(futureTask, "A").start(); ? ? ? ? new Thread(futureTask, "B").start(); ? ? ? ? System.out.println(Thread.currentThread().getName() + "測試,在調(diào)用future.get()方法之前,可以執(zhí)行其他邏輯 "); ? ? ? ? System.out.println(futureTask.get()); ? ? ? ? System.out.println("測試futureTask的get方法阻塞"); ? ? } } class MyThread implements Callable<String> { ? ? @Override ? ? public String call() throws Exception { ? ? ? ? System.out.println(Thread.currentThread().getName() + " 測試callable"); ? ? ? ? TimeUnit.SECONDS.sleep(4); ? ? ? ? return "success"; ? ? } }
這里就是模擬thread比較耗時,此時就可以通過futureTask異步返回之后,在需要使用的時候,調(diào)用其get方法去獲取執(zhí)行結(jié)果,如果call()方法還沒有執(zhí)行完,那futureTask.get()方法會一直阻塞,直到線程執(zhí)行完畢,獲取到執(zhí)行結(jié)果
原理
我們先說future.get()是如何阻塞的,也就是說在線程對應的方法還未執(zhí)行完時,主線程如何去阻塞?
在源碼中,有一個重要的屬性
private volatile int state; ? ? /** ? ? ?* 在構(gòu)造函數(shù)中,設置為new ? ? ?*/ ? ? private static final int NEW ? ? ? ? ?= 0; ? ? /** ? ? ?* 線程正常執(zhí)行完畢,通過CAS將state修改為completing ? ? ?*/ ? ? private static final int COMPLETING ? = 1; ? ? /** ? ? ?* ? ? ?*/ ? ? private static final int NORMAL ? ? ? = 2; ? ? /** ? ? ?* 執(zhí)行線程的時候,如果拋出異常,通過cas修改為exceptional ? ? ?*/ ? ? private static final int EXCEPTIONAL ?= 3; ? ? /** ? ? ?* 如果調(diào)用了cancel(boolean mayInterruptIfRunning)方法 ? ? ?* 入?yún)⒌膍ayInterruptIfRunning為true,就通過cas將state設置為INTERRUPTING ? ? ?* 如果為false,就通過cas將state修改為cancelled ? ? ?*/ ? ? private static final int CANCELLED ? ?= 4; ? ? private static final int INTERRUPTING = 5; ? ? /** ? ? ?* 在調(diào)用cancel()方法,入?yún)閠rue的情況下,如果中斷成功,通過cas將state從 ? ? ?* INTERRUPTING修改為INTERRUPTED ? ? ?*/ ? ? private static final int INTERRUPTED ?= 6;
接著來說get()方法
get()
可以看到,在get()方法中,會先判斷當前state是否小于等于 COMPLETING,如果是,就去阻塞
/** ? ? ?* @throws CancellationException {@inheritDoc} ? ? ?* 在調(diào)用future.get()方法的時候,如果線程的run()沒有執(zhí)行完畢,主線程會等待,直到run()方法正確的執(zhí)行完畢 ? ? ?* 在內(nèi)部,是通過lockSupport.park()方法去阻塞線程的 ? ? ?* 在該機制中,涉及到一個state狀態(tài)標識 ? ? ?*/ ? ? public V get() throws InterruptedException, ExecutionException { ? ? ? ? int s = state; ? ? ? ? if (s <= COMPLETING) ? ? ? ? ? ? s = awaitDone(false, 0L); ? ? ? ? return report(s); ? ? }
對于get方法來說,最為核心的邏輯是在awaitDone()方法中,在該方法中,會調(diào)用lockSupport.park(this),去阻塞當前線程;
所以,我們可以知道,如果主線程在調(diào)用future.get()方法的時候,假如此時run()方法還沒有執(zhí)行完畢,會阻塞當前線程,那阻塞之后,總要有一個地方去喚起,在run()方法正常執(zhí)行完畢之后,會喚醒當前阻塞的線程,繼續(xù)執(zhí)行業(yè)務邏輯
run()
public void run() { ? ? ? ? /** ? ? ? ? ?* 1.校驗當前state是否是new狀態(tài)且通過cas設置當前線程成功 ? ? ? ? ?*/ ? ? ? ? if (state != NEW || ? ? ? ? ? ? !UNSAFE.compareAndSwapObject(this, runnerOffset, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?null, Thread.currentThread())) ? ? ? ? ? ? return; ? ? ? ? try { ? ? ? ? ? ? Callable<V> c = callable; ? ? ? ? ? ? /** ? ? ? ? ? ? ?* 2.校驗當前線程中的callable是否有效且state為new ? ? ? ? ? ? ?*/ ? ? ? ? ? ? if (c != null && state == NEW) { ? ? ? ? ? ? ? ? V result; ? ? ? ? ? ? ? ? boolean ran; ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? /** ? ? ? ? ? ? ? ? ? ? ?* 2.1 執(zhí)行call()方法,如果正常執(zhí)行完畢,設置ran為true ? ? ? ? ? ? ? ? ? ? ? */ ? ? ? ? ? ? ? ? ? ? result = c.call(); ? ? ? ? ? ? ? ? ? ? ran = true; ? ? ? ? ? ? ? ? } catch (Throwable ex) { ? ? ? ? ? ? ? ? ? ? result = null; ? ? ? ? ? ? ? ? ? ? ran = false; ? ? ? ? ? ? ? ? ? ? /** ? ? ? ? ? ? ? ? ? ? ?* 2.2 假如執(zhí)行報錯,被捕獲到,在setException方法中,也會去喚醒阻塞的線程 ? ? ? ? ? ? ? ? ? ? ?*/ ? ? ? ? ? ? ? ? ? ? setException(ex); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? /** ? ? ? ? ? ? ? ? ?* 3.如果ran為true,就調(diào)用set方法,在set方法中,有一步跟重要的操作,就是通過lockSupport.unpark()喚醒線程 ? ? ? ? ? ? ? ? ?*/ ? ? ? ? ? ? ? ? if (ran) ? ? ? ? ? ? ? ? ? ? set(result); ? ? ? ? ? ? } ? ? ? ? } finally { ? ? ? ? ? ? // runner must be non-null until state is settled to ? ? ? ? ? ? // prevent concurrent calls to run() ? ? ? ? ? ? runner = null; ? ? ? ? ? ? // state must be re-read after nulling runner to prevent ? ? ? ? ? ? // leaked interrupts ? ? ? ? ? ? int s = state; ? ? ? ? ? ? if (s >= INTERRUPTING) ? ? ? ? ? ? ? ? handlePossibleCancellationInterrupt(s); ? ? ? ? } ? ? }
在這里的run()方法中,setException和set(result)方法中,都會調(diào)用一個方法:finishCompletion()
set()
?? ?/* ?? ?* 在set方法中,會通過cas更新當前的state狀態(tài) ? ? ?* 然后在調(diào)用finishCompletion的時候,會喚醒阻塞的線程 ? ? ?* @param v the value ? ? ?*/ ? ? protected void set(V v) { ? ? ? ? if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { ? ? ? ? ? ? outcome = v; ? ? ? ? ? ? UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state ? ? ? ? ? ? finishCompletion(); ? ? ? ? } ? ? }
在線程正常執(zhí)行完之后,會通過cas將state的狀態(tài)從new修改為completing,然后再修改為normal
finishCompletion()
/** ? ? ?* Removes and signals all waiting threads, invokes done(), and ? ? ?* nulls out callable. ? ? ?* 這個方法大致的意思是:從等待隊列中獲取到當前在等待的線程信息,然后通過cas將q設置為null ? ? ?* 最后會通過lockSupport.unPark()方法喚醒線程 ? ? ?*/ ? ? private void finishCompletion() { ? ? ? ? // assert state > COMPLETING; ? ? ? ? /** ? ? ? ? ?* 1.從waiters中獲取到等待的節(jié)點 ? ? ? ? ?*/ ? ? ? ? for (WaitNode q; (q = waiters) != null;) { ? ? ? ? ? ? /** ? ? ? ? ? ? ?* 2.通過cas將q設置為null ? ? ? ? ? ? ?*/ ? ? ? ? ? ? if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { ? ? ? ? ? ? ? ? for (;;) { ? ? ? ? ? ? ? ? ? ? Thread t = q.thread; ? ? ? ? ? ? ? ? ? ? /** ? ? ? ? ? ? ? ? ? ? ?* 3.喚醒線程 ? ? ? ? ? ? ? ? ? ? ?*/ ? ? ? ? ? ? ? ? ? ? if (t != null) { ? ? ? ? ? ? ? ? ? ? ? ? q.thread = null; ? ? ? ? ? ? ? ? ? ? ? ? LockSupport.unpark(t); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? WaitNode next = q.next; ? ? ? ? ? ? ? ? ? ? if (next == null) ? ? ? ? ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? ? ? ? ? q.next = null; // unlink to help gc ? ? ? ? ? ? ? ? ? ? q = next; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? done(); ? ? ? ? callable = null; ? ? ? ?// to reduce footprint ? ? }
這個方法也不需要做過多解釋了,就是從waitNode中,獲取到當前等待的線程,然后喚醒
總結(jié)
所以:對于future.get()方法,如果run()方法沒有執(zhí)行完成,該方法會阻塞線程,并且在方法正常執(zhí)行完畢之后,喚醒阻塞的線程,繼續(xù)去執(zhí)行對應的業(yè)務代碼
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot啟動報錯屬性循環(huán)依賴報錯問題的解決
這篇文章主要介紹了SpringBoot啟動報錯屬性循環(huán)依賴報錯問題的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05springBoot+webMagic實現(xiàn)網(wǎng)站爬蟲的實例代碼
這篇文章主要介紹了springBoot+webMagic實現(xiàn)網(wǎng)站爬蟲的實例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-05-05簡單捋捋@RequestParam 和 @RequestBody的使用
這篇文章主要介紹了簡單捋捋@RequestParam 和 @RequestBody的使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-12-12