欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java線程池submit阻塞獲取結果的實現(xiàn)原理詳解

 更新時間:2022年10月24日 10:02:33   作者:JAVA旭陽  
Java線程池中提交任務運行,通常使用execute()方法就足夠了。那如果想要實現(xiàn)在主線程中阻塞獲取線程池任務運行的結果,該怎么辦呢?本文就來和大家一起討論討論

前言

Java線程池中提交任務運行,通常使用execute()方法就足夠了。那如果想要實現(xiàn)在主線程中阻塞獲取線程池任務運行的結果,該怎么辦呢?答案是用submit()方法提交任務。這也是面試中經常被問到的一個知識點,execute()submit()提交任務的的區(qū)別是什么?底層是如何實現(xiàn)的?

案例演示

現(xiàn)在我們通過簡單的例子演示下submit()方法的妙處。

@Test
public void testSubmit() throws ExecutionException, InterruptedException {
    // 創(chuàng)建一個核心線程數(shù)為5的線程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));

    // 創(chuàng)建一個計算任務
    Callable<Integer> myTask = new Callable<Integer>() {

        @Override
        public Integer call() throws Exception {
            int result = 0;
            for (int i = 0; i < 10000; i++) {
                result += i;
            }
            Thread.sleep(1000);
            return result;
        }
    };

    log.info("start submit task .....");
    Future<Integer> future = threadPoolExecutor.submit(myTask);

    Integer sum = future.get();
    log.info("get submit result: [{}]", sum);

    // use sum do other things
}

運行結果:

主線程的確阻塞等待線程返回。

Future類API

我們看到用submit提交任務最后返回一個Future對象,F(xiàn)uture表示異步計算的結果。那它都提供了什么API呢?

方法說明
V get()等待任務執(zhí)行完成,然后獲取其結果。
V get(long timeout, TimeUnit unit)等待獲取任務執(zhí)行的結果,如果任務超過一定時間沒有執(zhí)行完畢,直接返回,拋出異常,不會一直等待下去。
boolean isDone()如果此任務已完成,則返回true。完成可能是由于正常終止、異常或取消——在所有這些情況下,該方法都將返回true。
boolean isCancelled()如果該任務在正常完成之前被取消,則返回true。
boolean cancel(boolean mayInterruptIfRunning)試圖取消此任務的執(zhí)行。1. 如果任務已經完成、已經取消或由于其他原因無法取消,則此嘗試將失敗。
  • 如果在調用cancel時此任務尚未啟動,則此任務不應運行。
  • 如果任務已經開始,那么mayInterruptIfRunning參數(shù)確定是否應該中斷執(zhí)行此任務的線程以試圖停止該任務。 |

和execute區(qū)別

從功能層面,我們已經很明白他們最大區(qū)別,

  • execute()方式提交任務沒有返回值,直接線程中池異步運行任務。
  • submit()方式提交任務有返回值Future, 調用get方法可以阻塞調用線程,等待任務運行返回的結果。

那從源碼層面,二者又有什么區(qū)別和聯(lián)系呢?

我們看下submit()提交的入口方法,代碼如下:

// AbstractExecutorService#submit
public <T> Future<T> submit(Callable<T> task) {
    // 判空處理
    if (task == null) throw new NullPointerException();
    // 將提交的任務包裝成RunnableFuture
    RunnableFuture<T> ftask = newTaskFor(task);
    // 最終還是調用execute方法執(zhí)行任務
    execute(ftask);
    return ftask;
}

殊途同歸,最終都是調用execute()方法,只不過submit()方法在調用前做一層包裝,將任務包裝成RunnableFuture對象。

關于線程池中execute()方法提交的流程和原理實現(xiàn)不理解的,強烈建議先學習這篇文章:Java線程池源碼深度解析。

原理實現(xiàn)

本節(jié)內容我們聚焦在submit()方法的實現(xiàn)原理。

我們先思考下,如果讓我們設計實現(xiàn)調用get阻塞知道線程返回結果,要考慮哪些方面呢?

  • 任務是否執(zhí)行結束或者執(zhí)行出錯等情況,是不是需要有個狀態(tài)位標記?
  • 任務的執(zhí)行結果如何保存?
  • 如果任務沒有執(zhí)行結束,如何阻塞當前線程,LockSupport.park()是一種方式。
  • 如果有多個外部線程獲取get,是不是應該也要把外部線程存下來,怎么存?因為后面任務執(zhí)行完后需要喚醒他們。

帶著這些問題和基本思路我們看下jdk8中是如何實現(xiàn)的。

RunnableFuture類介紹

submit()方法中調用newTaskFor()方法獲取RunnableFuture對象。

// AbstractExecutorService#newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    // 調用FutureTask的構造方法返回RunnableFuture對象
    return new FutureTask<T>(callable);
}

FutureTask類結構圖如下:

FutureTask是一個異步計算任務,包裝了我們外部提交的任務。

  • 實現(xiàn)了Runnable接口
  • 實現(xiàn)了Future接口,該接口封裝了任務結果的獲取、任務是否結束等接口。

RunnableFuture類重要屬性

1.任務運行狀態(tài)state

// 存儲當前任務運行狀態(tài)
private volatile int state;
// 當前任務尚未執(zhí)行
private static final int NEW          = 0;
// 當前任務正在結束,尚未完全結束,一種臨界狀態(tài)
private static final int COMPLETING   = 1;
// 當前任務正常結束
private static final int NORMAL       = 2;
// 當前任務執(zhí)行過程中發(fā)生了異常
private static final int EXCEPTIONAL  = 3;
// 當前任務被取消
private static final int CANCELLED    = 4;
// 當前任務中斷中
private static final int INTERRUPTING = 5;
// 當前任務已中斷
private static final int INTERRUPTED  = 6;

可能的狀態(tài)轉換有如下幾種:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

2.真正要執(zhí)行的任務callble

// 存放真正提交的原始任務
private Callable<V> callable;

3.存放執(zhí)行結果outcome

返回的結果或從get()中拋出的異常
private Object outcome;

4.當前正在運行任務的線程runner

//當前任務被線程執(zhí)行期間,保存當前任務的線程對象引用
private volatile Thread runner;

5.調用get獲取任務結果的等待線程集合waiters

//因為會有很多線程去get當前任務的結果,所以這里使用了一種stack數(shù)據(jù)結構來保存
private volatile WaitNode waiters;

static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

數(shù)據(jù)結構如下圖:

RunnableFuture類構造方法

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        // 設置要執(zhí)行的任務
        this.callable = callable;
        // 初始化時任務狀態(tài)為NEW
        this.state = NEW;       
}

任務執(zhí)行run()原理

submit()方法最終調用線程池的execute()方法,而execute()方法會創(chuàng)建出"工人"Worker對象,調用runWorker()方法,它主要是執(zhí)行外部提交的任務,也就是這里的FutureTask對象的run()方法, 我們重點看下run()方法。

FutureTask#run()開始執(zhí)行任務。

它主要的功能是完成包裝的callable的call方法執(zhí)行,并將執(zhí)行結果保存到outcome中,同時捕獲了call方法執(zhí)行出現(xiàn)的異常,并保存異常信息,而不是直接拋出。

public void run() {
    // 狀態(tài)機不為NEW表示執(zhí)行完成或任務被取消了,直接返回
    // 狀態(tài)機為NEW,同時將runner設置為當前線程,保證同一時刻只有一個線程執(zhí)行run方法,如果設置失敗也直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // 取出原始的任務檢測不為空 且 再次檢查狀態(tài)為NEW(雙重校驗)
        if (c != null && state == NEW) {
            // 任務運行的結果
            V result;
            // 任務是否運行是否正常, true:正常, false-異常
            boolean ran;
            try {
                // 任務執(zhí)行,將結果返回給result
                result = c.call();
                // 設置任務運行正常
                ran = true;
            } catch (Throwable ex) {
                // 任務運行報錯的情況
                // 設置結果為空
                result = null;
                // 設置任務運行異常標記
                ran = false;
                // 任務執(zhí)行拋出異常時,保存異常信息,而不直接拋出
                setException(ex);
            }
            // 執(zhí)行成功則保存結果
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        // 執(zhí)行完成后設置runner為null
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        // 獲取任務狀態(tài)
        int s = state;
        // 如果被置為了中斷狀態(tài)則進行中斷的處理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

FutureTask#set()方法處理正常執(zhí)行的運行結果

setException()方法主要完成做下面的工作。

  • 將執(zhí)行結果保存到outcom變量中
  • FutureTask的狀態(tài)從NEW修改為NORMAL
  • 喚醒阻塞在waiters隊列中請求get的所有線程
protected void set(V v) {
    // 將狀態(tài)由NEW更新為COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 保存任務的結果
        outcome = v;
       // 更新狀態(tài)的最終狀態(tài)-NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // 通用的完成操作,主要作用就是喚醒阻塞在waiters隊列中請求get的線程
        finishCompletion();
    }
}

FutureTask#setException()方法處理執(zhí)行異常的結果

setException()方法主要完成做下面的工作。

  • 將異常信息保存到outcom變量中
  • FutureTask的狀態(tài)從NEW修改為EXCEPTIONAL
  • 喚醒阻塞在waiters隊列中請求get的所有線程
// FutureTask#setException
protected void setException(Throwable t) {
    // 將狀態(tài)由NEW更新為COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 將異常信息保存到輸出結果中
        outcome = t;
        // 更新狀態(tài)機處理異常的最終狀態(tài)-EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // 通用的完成操作,主要作用就是喚醒阻塞在waiters隊列中請求get的線程
        finishCompletion();
    }
}

這里的finishCompletion()喚醒我們在后面講解,上面的整個邏輯可以用一張圖表示:

任務結果獲取get()原理

其他線程可以調用get()方法或者超時阻塞方法get(long timeout, TimeUnit unit)獲取任務運行的結果。

FutureTask#get()方法是獲取任務執(zhí)行結果的入口方法。

// 阻塞獲取任務結果
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 任務還沒有執(zhí)行完成,通過awaitDone方法進行阻塞等待
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // 返回結果
    return report(s);
}

// 超時阻塞獲取任務結果
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    // 判空處理
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // 任務還沒有執(zhí)行完成,通過awaitDone方法進行阻塞等待
    if (s <= COMPLETING &&
        // 如果awaitDone返回的結果還是小于等于COMPLETING,表示運行中,那么直接拋出超時異常
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    // 返回結果
    return report(s);
}

FutureTask#awaitDone()方法阻塞等待任務執(zhí)行結束

該方法主要完成下面的工作:

  • 判斷任務是否運行結束,結束的話直接返回運行狀態(tài)
  • 如果任務沒有結果,將請求線程阻塞
  • 請求線程阻塞時,會創(chuàng)建一個waiter節(jié)點,然后加入到阻塞等待的棧中
// 線程阻塞等待方法, timed等于 true表示阻塞等待有時間限制nanos, false表示沒有,一直阻塞
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    // 計算阻塞超時時間點
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    // 表示q是否添加到waiters棧中,默認false
    boolean queued = false;
    // 自旋操作
    for (;;) {
        // 如果阻塞線程被中斷則將當前線程從阻塞隊列中移除
        if (Thread.interrupted()) {
            // 從waiters棧中移除WaitNode,
            removeWaiter(q);
            // 返回中斷移除
            throw new InterruptedException();
        }

        // 獲取任務的狀態(tài)
        int s = state;
        // 如果任務的狀態(tài)大于COMPLETING,表示線程運行結束了,直接返回
        if (s > COMPLETING) { 
            // 任務已經完成時直接返回結果
            if (q != null)
                q.thread = null;
            // 返回狀態(tài)
            return s;
        }
        // 如果任務狀態(tài)是COMPLETING     
        else if (s == COMPLETING) 
            // 如果任務執(zhí)行完成,但還差最后一步最終完成,則讓出CPU給任務執(zhí)行線程繼續(xù)執(zhí)行
            Thread.yield();
        // 如果任務狀態(tài)小于COMPLETING,說明任務還在運行中
        // 如果q為空的情況    
        else if (q == null)
            // 新進來的線程添加等待節(jié)點
            q = new WaitNode();
        // 如果任務還在運行中并且當前線程節(jié)點還不在waiters棧中,那么就加入   
        else if (!queued)
            // 上一步節(jié)點創(chuàng)建完,還沒將其添加到waiters棧中,因此在下一個循環(huán)就會執(zhí)行此處進行入棧操作,并將當前線程的等待節(jié)點置于棧頂
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 如果任務還在運行中并且timed為true,表示有超時限制
        else if (timed) {
            // 如果設置了阻塞超時時間,則進行檢查是否達到阻塞超時時間,達到了則刪除當前線程的等待節(jié)點并退出循環(huán)返回,否則繼續(xù)阻塞
            nanos = deadline - System.nanoTime();
            // 如果nanos小于等于0
            if (nanos <= 0L) {
                // 從waiters棧中移除
                removeWaiter(q);
                //返回狀態(tài)
                return state;
            }
            // 超時阻塞當前線程,超過時間,就會恢復
            LockSupport.parkNanos(this, nanos);
        }
        // 如果任務還在運行中并且timed為false,沒有有超時限制
        else
            // 一直阻塞當前線程
            LockSupport.park(this);
    }
}

FutureTask#report方法解析返回任務結果

// 獲取任務結果方法:正常執(zhí)行則直接返回結果,否則拋出異常
private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 如果狀態(tài)是正常情況
    if (s == NORMAL)
        // 直接返回
        return (V)x;
    // 如果狀態(tài)是取消了,拋出異常
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

FutureTask#finishCompletion()方法用來喚醒前面等待的線程

上一步awaitDone方法會阻塞調用的線程,那么任務運行結束總要喚醒他們去拿結果吧,這個工作就在finishCompletion()方法中。

private void finishCompletion() {
    // 遍歷waiters棧中的每個元素;
    for (WaitNode q; (q = waiters) != null;) {
        // cas設置waiters中q節(jié)點數(shù)據(jù)為null,成功的話,進入到if中
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 自選操作
            for (;;) {
                // 獲取節(jié)點中的線程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 喚醒線程
                    LockSupport.unpark(t);
                }
                // 獲取下一個節(jié)點
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
	//鉤子方法,有子類去實現(xiàn)
    done();
    // 設置原來的任務callable為null
    callable = null;        // to reduce footprint
}

任務取消cancel()原理

可以調用FutureTask#cancel方法取消任務執(zhí)行,但是要注意下面幾點:

  • 任務取消時會先檢查是否允許取消,當任務已經完成或者正在完成(正常執(zhí)行并繼續(xù)處理結果 或 執(zhí)行異常處理異常結果)時不允許取消。
  • cancel方法有個boolean入?yún)?,若為false,則只喚醒所有等待的線程,不中斷正在執(zhí)行的任務線程。若為true則直接中斷任務執(zhí)行線程,同時修改狀態(tài)為INTERRUPTED。
// 取消任務,參數(shù)mayInterruptIfRunning為true,會中斷運行中的線程,false不會
public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果FutureTask的狀態(tài)不是NEW或者CAS設置失敗時,直接返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {   
        // 如果參數(shù)mayInterruptIfRunning為true,中斷
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //cas修改狀態(tài)為INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 喚醒其他等待的線程
        finishCompletion();
    }
    return true;
}

cancel方法實際上完成以下兩種狀態(tài)轉換之一:

  • NEW -> CANCELLED (對應于mayInterruptIfRunning=false)
  • NEW -> INTERRUPTING -> INTERRUPTED (對應于mayInterruptIfRunning=true)

總結

本文講解了線程池submit()提交任務的原理實現(xiàn),通過源碼很多平時項目中遇到的坑都找到了答案。比如說之前項目中用線程池submit()方法提交任務處理,發(fā)現(xiàn)任務的異常都不見了,這下明白了,原來是通過setException()保存下來了,只有通過get方法獲取到,只有看過源碼,才會豁然開朗。

以上就是Java線程池submit阻塞獲取結果的實現(xiàn)原理詳解的詳細內容,更多關于Java線程池submit阻塞獲取結果的資料請關注腳本之家其它相關文章!

相關文章

  • Java語言十大基礎特性分析

    Java語言十大基礎特性分析

    這篇文章介紹了Java語言十大基礎特性,它有哪些優(yōu)勢,需要的朋友可以參考下。
    2017-08-08
  • Java的信號量semaphore講解

    Java的信號量semaphore講解

    這篇文章主要介紹了Java的信號量semaphore講解,Semaphore底層是基于AbstractQueuedSynchronizer來實現(xiàn)的,Semaphore稱為計數(shù)信號量,它允許n個任務同時訪問某個資源,需要的朋友可以參考下
    2023-12-12
  • Spring框架 XML配置事務控制的步驟操作

    Spring框架 XML配置事務控制的步驟操作

    這篇文章主要介紹了Spring框架 XML配置事務控制的步驟操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Java基于外觀模式實現(xiàn)美食天下食譜功能實例詳解

    Java基于外觀模式實現(xiàn)美食天下食譜功能實例詳解

    這篇文章主要介紹了Java基于外觀模式實現(xiàn)美食天下食譜功能,較為詳細的講述了外觀模式的概念、原理并結合實例形似詳細分析了Java基于外觀模式實現(xiàn)美食天下食譜功能的具體操作步驟與相關注意事項,需要的朋友可以參考下
    2018-05-05
  • 淺析Java中的WeakHashMap

    淺析Java中的WeakHashMap

    這篇文章主要介紹了淺析Java中的WeakHashMap,WeakHashMap其實和HashMap大多數(shù)行為是一樣的,只是WeakHashMap不會阻止GC回收key對象,那么WeakHashMap是怎么做到的呢,這就是我們研究的主要問題,需要的朋友可以參考下
    2023-09-09
  • mybatis 中 foreach collection的用法小結(三種)

    mybatis 中 foreach collection的用法小結(三種)

    這篇文章主要介紹了mybatis 中 foreach collection的用法小結(三種),需要的朋友可以參考下
    2017-10-10
  • 解讀String字符串拼接的原理

    解讀String字符串拼接的原理

    這篇文章主要介紹了關于String字符串拼接的原理,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • Java手寫Redis服務端的實現(xiàn)

    Java手寫Redis服務端的實現(xiàn)

    本文主要介紹了Java手寫Redis服務端的實現(xiàn),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • SpringBoot和前端聯(lián)動實現(xiàn)存儲瀏覽記錄功能

    SpringBoot和前端聯(lián)動實現(xiàn)存儲瀏覽記錄功能

    這篇文章主要介紹了SpringBoot和前端聯(lián)動實現(xiàn)存儲瀏覽記錄功能,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2024-01-01
  • spring boot實現(xiàn)文件上傳

    spring boot實現(xiàn)文件上傳

    這篇文章主要為大家詳細介紹了spring boot實現(xiàn)文件上傳,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-08-08

最新評論