關(guān)于Future機(jī)制原理及解析
future機(jī)制是
在通過線程去執(zhí)行某個(gè)任務(wù)的時(shí)候,如果比較耗時(shí),我們可以通過futureTask機(jī)制,異步返回,繼續(xù)去執(zhí)行其他的任務(wù),在需要獲取執(zhí)行結(jié)果的時(shí)候,通過future.get()方法去獲取,如果任務(wù)沒有執(zhí)行完畢,會(huì)通過lockSupport.park()方法去阻塞主線程,直到run()方法執(zhí)行完畢之后,會(huì)通過lockSupport.unpark()方法去喚醒線程
應(yīng)用
public class FutureTest { ? ? public static void main(String[] args) throws ExecutionException, InterruptedException { ? ? ? ? MyThread myThread = new MyThread(); ? ? ? ? FutureTask<String> futureTask = new FutureTask<>(myThread); ? ? ? ? /** ? ? ? ? ?* 同一個(gè)futureTask對(duì)象,通過多個(gè)線程進(jìn)行多次調(diào)用,但是只會(huì)執(zhí)行一次 ? ? ? ? ?* 如果是計(jì)算的操作,需要進(jìn)行多次計(jì)算操作,需要聲明不同的futureTask對(duì)象 ? ? ? ? ?*/ ? ? ? ? new Thread(futureTask, "A").start(); ? ? ? ? new Thread(futureTask, "B").start(); ? ? ? ? System.out.println(Thread.currentThread().getName() + "測(cè)試,在調(diào)用future.get()方法之前,可以執(zhí)行其他邏輯 "); ? ? ? ? System.out.println(futureTask.get()); ? ? ? ? System.out.println("測(cè)試futureTask的get方法阻塞"); ? ? } } class MyThread implements Callable<String> { ? ? @Override ? ? public String call() throws Exception { ? ? ? ? System.out.println(Thread.currentThread().getName() + " 測(cè)試callable"); ? ? ? ? TimeUnit.SECONDS.sleep(4); ? ? ? ? return "success"; ? ? } }
這里就是模擬thread比較耗時(shí),此時(shí)就可以通過futureTask異步返回之后,在需要使用的時(shí)候,調(diào)用其get方法去獲取執(zhí)行結(jié)果,如果call()方法還沒有執(zhí)行完,那futureTask.get()方法會(huì)一直阻塞,直到線程執(zhí)行完畢,獲取到執(zhí)行結(jié)果
原理
我們先說future.get()是如何阻塞的,也就是說在線程對(duì)應(yīng)的方法還未執(zhí)行完時(shí),主線程如何去阻塞?
在源碼中,有一個(gè)重要的屬性
private volatile int state; ? ? /** ? ? ?* 在構(gòu)造函數(shù)中,設(shè)置為new ? ? ?*/ ? ? private static final int NEW ? ? ? ? ?= 0; ? ? /** ? ? ?* 線程正常執(zhí)行完畢,通過CAS將state修改為completing ? ? ?*/ ? ? private static final int COMPLETING ? = 1; ? ? /** ? ? ?* ? ? ?*/ ? ? private static final int NORMAL ? ? ? = 2; ? ? /** ? ? ?* 執(zhí)行線程的時(shí)候,如果拋出異常,通過cas修改為exceptional ? ? ?*/ ? ? private static final int EXCEPTIONAL ?= 3; ? ? /** ? ? ?* 如果調(diào)用了cancel(boolean mayInterruptIfRunning)方法 ? ? ?* 入?yún)⒌膍ayInterruptIfRunning為true,就通過cas將state設(shè)置為INTERRUPTING ? ? ?* 如果為false,就通過cas將state修改為cancelled ? ? ?*/ ? ? private static final int CANCELLED ? ?= 4; ? ? private static final int INTERRUPTING = 5; ? ? /** ? ? ?* 在調(diào)用cancel()方法,入?yún)閠rue的情況下,如果中斷成功,通過cas將state從 ? ? ?* INTERRUPTING修改為INTERRUPTED ? ? ?*/ ? ? private static final int INTERRUPTED ?= 6;
接著來說get()方法
get()
可以看到,在get()方法中,會(huì)先判斷當(dāng)前state是否小于等于 COMPLETING,如果是,就去阻塞
/** ? ? ?* @throws CancellationException {@inheritDoc} ? ? ?* 在調(diào)用future.get()方法的時(shí)候,如果線程的run()沒有執(zhí)行完畢,主線程會(huì)等待,直到run()方法正確的執(zhí)行完畢 ? ? ?* 在內(nèi)部,是通過lockSupport.park()方法去阻塞線程的 ? ? ?* 在該機(jī)制中,涉及到一個(gè)state狀態(tài)標(biāo)識(shí) ? ? ?*/ ? ? public V get() throws InterruptedException, ExecutionException { ? ? ? ? int s = state; ? ? ? ? if (s <= COMPLETING) ? ? ? ? ? ? s = awaitDone(false, 0L); ? ? ? ? return report(s); ? ? }
對(duì)于get方法來說,最為核心的邏輯是在awaitDone()方法中,在該方法中,會(huì)調(diào)用lockSupport.park(this),去阻塞當(dāng)前線程;
所以,我們可以知道,如果主線程在調(diào)用future.get()方法的時(shí)候,假如此時(shí)run()方法還沒有執(zhí)行完畢,會(huì)阻塞當(dāng)前線程,那阻塞之后,總要有一個(gè)地方去喚起,在run()方法正常執(zhí)行完畢之后,會(huì)喚醒當(dāng)前阻塞的線程,繼續(xù)執(zhí)行業(yè)務(wù)邏輯
run()
public void run() { ? ? ? ? /** ? ? ? ? ?* 1.校驗(yàn)當(dāng)前state是否是new狀態(tài)且通過cas設(shè)置當(dāng)前線程成功 ? ? ? ? ?*/ ? ? ? ? if (state != NEW || ? ? ? ? ? ? !UNSAFE.compareAndSwapObject(this, runnerOffset, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?null, Thread.currentThread())) ? ? ? ? ? ? return; ? ? ? ? try { ? ? ? ? ? ? Callable<V> c = callable; ? ? ? ? ? ? /** ? ? ? ? ? ? ?* 2.校驗(yàn)當(dāng)前線程中的callable是否有效且state為new ? ? ? ? ? ? ?*/ ? ? ? ? ? ? if (c != null && state == NEW) { ? ? ? ? ? ? ? ? V result; ? ? ? ? ? ? ? ? boolean ran; ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? /** ? ? ? ? ? ? ? ? ? ? ?* 2.1 執(zhí)行call()方法,如果正常執(zhí)行完畢,設(shè)置ran為true ? ? ? ? ? ? ? ? ? ? ? */ ? ? ? ? ? ? ? ? ? ? result = c.call(); ? ? ? ? ? ? ? ? ? ? ran = true; ? ? ? ? ? ? ? ? } catch (Throwable ex) { ? ? ? ? ? ? ? ? ? ? result = null; ? ? ? ? ? ? ? ? ? ? ran = false; ? ? ? ? ? ? ? ? ? ? /** ? ? ? ? ? ? ? ? ? ? ?* 2.2 假如執(zhí)行報(bào)錯(cuò),被捕獲到,在setException方法中,也會(huì)去喚醒阻塞的線程 ? ? ? ? ? ? ? ? ? ? ?*/ ? ? ? ? ? ? ? ? ? ? setException(ex); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? /** ? ? ? ? ? ? ? ? ?* 3.如果ran為true,就調(diào)用set方法,在set方法中,有一步跟重要的操作,就是通過lockSupport.unpark()喚醒線程 ? ? ? ? ? ? ? ? ?*/ ? ? ? ? ? ? ? ? if (ran) ? ? ? ? ? ? ? ? ? ? set(result); ? ? ? ? ? ? } ? ? ? ? } finally { ? ? ? ? ? ? // runner must be non-null until state is settled to ? ? ? ? ? ? // prevent concurrent calls to run() ? ? ? ? ? ? runner = null; ? ? ? ? ? ? // state must be re-read after nulling runner to prevent ? ? ? ? ? ? // leaked interrupts ? ? ? ? ? ? int s = state; ? ? ? ? ? ? if (s >= INTERRUPTING) ? ? ? ? ? ? ? ? handlePossibleCancellationInterrupt(s); ? ? ? ? } ? ? }
在這里的run()方法中,setException和set(result)方法中,都會(huì)調(diào)用一個(gè)方法:finishCompletion()
set()
?? ?/* ?? ?* 在set方法中,會(huì)通過cas更新當(dāng)前的state狀態(tài) ? ? ?* 然后在調(diào)用finishCompletion的時(shí)候,會(huì)喚醒阻塞的線程 ? ? ?* @param v the value ? ? ?*/ ? ? protected void set(V v) { ? ? ? ? if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { ? ? ? ? ? ? outcome = v; ? ? ? ? ? ? UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state ? ? ? ? ? ? finishCompletion(); ? ? ? ? } ? ? }
在線程正常執(zhí)行完之后,會(huì)通過cas將state的狀態(tài)從new修改為completing,然后再修改為normal
finishCompletion()
/** ? ? ?* Removes and signals all waiting threads, invokes done(), and ? ? ?* nulls out callable. ? ? ?* 這個(gè)方法大致的意思是:從等待隊(duì)列中獲取到當(dāng)前在等待的線程信息,然后通過cas將q設(shè)置為null ? ? ?* 最后會(huì)通過lockSupport.unPark()方法喚醒線程 ? ? ?*/ ? ? private void finishCompletion() { ? ? ? ? // assert state > COMPLETING; ? ? ? ? /** ? ? ? ? ?* 1.從waiters中獲取到等待的節(jié)點(diǎn) ? ? ? ? ?*/ ? ? ? ? for (WaitNode q; (q = waiters) != null;) { ? ? ? ? ? ? /** ? ? ? ? ? ? ?* 2.通過cas將q設(shè)置為null ? ? ? ? ? ? ?*/ ? ? ? ? ? ? if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { ? ? ? ? ? ? ? ? for (;;) { ? ? ? ? ? ? ? ? ? ? Thread t = q.thread; ? ? ? ? ? ? ? ? ? ? /** ? ? ? ? ? ? ? ? ? ? ?* 3.喚醒線程 ? ? ? ? ? ? ? ? ? ? ?*/ ? ? ? ? ? ? ? ? ? ? if (t != null) { ? ? ? ? ? ? ? ? ? ? ? ? q.thread = null; ? ? ? ? ? ? ? ? ? ? ? ? LockSupport.unpark(t); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? WaitNode next = q.next; ? ? ? ? ? ? ? ? ? ? if (next == null) ? ? ? ? ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? ? ? ? ? q.next = null; // unlink to help gc ? ? ? ? ? ? ? ? ? ? q = next; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? done(); ? ? ? ? callable = null; ? ? ? ?// to reduce footprint ? ? }
這個(gè)方法也不需要做過多解釋了,就是從waitNode中,獲取到當(dāng)前等待的線程,然后喚醒
總結(jié)
所以:對(duì)于future.get()方法,如果run()方法沒有執(zhí)行完成,該方法會(huì)阻塞線程,并且在方法正常執(zhí)行完畢之后,喚醒阻塞的線程,繼續(xù)去執(zhí)行對(duì)應(yīng)的業(yè)務(wù)代碼
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java 實(shí)現(xiàn)定時(shí)任務(wù)的三種方法
這篇文章主要介紹了Java 實(shí)現(xiàn)定時(shí)任務(wù)的三種方法,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下2021-03-03SpringBoot啟動(dòng)報(bào)錯(cuò)屬性循環(huán)依賴報(bào)錯(cuò)問題的解決
這篇文章主要介紹了SpringBoot啟動(dòng)報(bào)錯(cuò)屬性循環(huán)依賴報(bào)錯(cuò)問題的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-05-05springBoot+webMagic實(shí)現(xiàn)網(wǎng)站爬蟲的實(shí)例代碼
這篇文章主要介紹了springBoot+webMagic實(shí)現(xiàn)網(wǎng)站爬蟲的實(shí)例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05Java將不同的List集合復(fù)制到另一個(gè)集合常見的方法
在Java中,有時(shí)候我們需要將一個(gè)List對(duì)象的屬性值復(fù)制到另一個(gè)List對(duì)象中,使得兩個(gè)對(duì)象的屬性值相同,這篇文章主要介紹了Java將不同的List集合復(fù)制到另一個(gè)集合常見的方法,需要的朋友可以參考下2024-09-09簡(jiǎn)單捋捋@RequestParam 和 @RequestBody的使用
這篇文章主要介紹了簡(jiǎn)單捋捋@RequestParam 和 @RequestBody的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12