欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java8中AbstractExecutorService與FutureTask源碼詳解

 更新時(shí)間:2022年01月26日 09:24:36   作者:孫大圣666  
這篇文章主要給大家介紹了關(guān)于Java8中AbstractExecutorService與FutureTask的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下

前言

本篇博客重點(diǎn)講解ThreadPoolExecutor的三個(gè)基礎(chǔ)設(shè)施類AbstractExecutorService、FutureTask和ExecutorCompletionService的實(shí)現(xiàn)細(xì)節(jié),AbstractExecutorService實(shí)現(xiàn)了ExecutorService的大部分接口,子類只需實(shí)現(xiàn)excute方法和shutdown相關(guān)方法即可;FutureTask是RunnableFuture接口的主要實(shí)現(xiàn),該接口是Runnable和Future的包裝類接口,會(huì)執(zhí)行Runnable對(duì)應(yīng)的run方法,調(diào)用方可以通過(guò)Future接口獲取任務(wù)的執(zhí)行狀態(tài)和結(jié)果;ExecutorCompletionService是幫助獲取多個(gè)RunnableFuture任務(wù)的執(zhí)行結(jié)果的工具類,基于FutureTask執(zhí)行完成時(shí)的回調(diào)方法done實(shí)現(xiàn)的。

一、AbstractExecutorService

1、定義

ThreadPoolExecutor的類繼承關(guān)系如下:

其中ExecutorService的子類如下:

右上角帶S的表示內(nèi)部類,我們重點(diǎn)關(guān)注ThreadPoolExecutor,ScheduledThreadPoolExecutor和ForkJoinPool三個(gè)類的實(shí)現(xiàn),后面兩個(gè)類會(huì)在后面的博客中逐一探討。

Executor包含的方法如下:

ExecutorService包含的方法如下:

上述接口方法中涉及的Callable接口的定義如下:

該接口也是表示一個(gè)執(zhí)行任務(wù),跟常見的Runnable接口的區(qū)別在于call方法有返回值而run方法沒有返回值。 

Future表示某個(gè)任務(wù)的執(zhí)行結(jié)果,其定義的方法如下:

其子類比較多,如下:

后面會(huì)將涉及的子類逐一探討的。 AbstractExecutorService基于Executor接口的excute方法實(shí)現(xiàn)了大部分的ExecutorService的接口,子類只需要重點(diǎn)實(shí)現(xiàn)excute方法和shutdown相關(guān)方法即可,下面來(lái)分析其具體的實(shí)現(xiàn)。

2、submit

//Runnable接口方法沒有返回值,但是可以通過(guò)Future判斷任務(wù)是否執(zhí)行完成
public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
 
//因?yàn)镽unnable的run方法沒有返回值,所以如果run方法正常執(zhí)行完成,其結(jié)果就是result
public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
 
public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
 
//都是返回FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
 
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

3、invokeAll

//執(zhí)行完成tasks中所有的任務(wù),如果有一個(gè)拋出異常,則取消掉剩余的任務(wù)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            //遍歷tasks中的任務(wù)將其轉(zhuǎn)換成RunnableFuture,然后提交到線程池執(zhí)行
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            //遍歷Future列表
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { //如果未執(zhí)行完成
                    try {
                        //等待任務(wù)執(zhí)行完成
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            //所有任務(wù)都執(zhí)行完了
            done = true;
            return futures;
        } finally {
            if (!done)
                //出現(xiàn)異常,將所有的任務(wù)都取消掉
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }
 
//邏輯同上,不過(guò)加了等待時(shí)間限制,所有的任務(wù)的累計(jì)時(shí)間不能超過(guò)指定值,如果超時(shí)直接返回Future列表
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        //轉(zhuǎn)換成納秒    
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            //轉(zhuǎn)換成Future
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
            
            //計(jì)算終止時(shí)間
            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();
 
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime(); //計(jì)算剩余時(shí)間
                if (nanos <= 0L) //如果超時(shí)了則直接返回
                    return futures;
            }
 
            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { //任務(wù)未執(zhí)行
                    if (nanos <= 0L)
                        return futures; //等待超時(shí)
                    try {
                        //等待任務(wù)執(zhí)行完成
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done) //出現(xiàn)異常,取消掉剩余未執(zhí)行的任務(wù)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

4、invokeAny

//多個(gè)任務(wù)只要有一個(gè)執(zhí)行成功就返回,并把剩余的已提交未執(zhí)行的任務(wù)給取消掉
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
 
//多個(gè)任務(wù)只要有一個(gè)執(zhí)行成功就返回,并把剩余的已提交未執(zhí)行的任務(wù)給取消掉
//如果指定時(shí)間內(nèi)沒有執(zhí)行成功的,則拋出TimeoutException 異常
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
 
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        //參數(shù)校驗(yàn)
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
 
        try {
           
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();
 
            //提交一個(gè)任務(wù)
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;
 
            for (;;) {
                //獲取最新的已完成任務(wù)
                Future<T> f = ecs.poll();
                if (f == null) {
                    //沒有執(zhí)行完的
                    if (ntasks > 0) {
                        --ntasks;
                        //繼續(xù)添加下一個(gè)任務(wù)
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0) //所有任務(wù)都執(zhí)行失敗了,沒有執(zhí)行成功的
                        break;
                    else if (timed) { //等待超時(shí)
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        //計(jì)算剩余等待時(shí)間
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        //所有任務(wù)都提交了,阻塞等待某個(gè)任務(wù)執(zhí)行完成
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        //某個(gè)任務(wù)已執(zhí)行完成,如果拋出異常則執(zhí)行下一個(gè)任務(wù)
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            } //for循環(huán)終止
            
            //所有任務(wù)都執(zhí)行失敗了
            if (ee == null)
                ee = new ExecutionException();
            throw ee;
 
        } finally {
            //返回前,將未執(zhí)行完成的任務(wù)都取消掉
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

二、FutureTask

1、定義

FutureTask的類繼承關(guān)系如下:

    

RunnableFuture接口沒有新增方法,將Runnable的run方法由public改成包級(jí)訪問了,如下:

該類包含的實(shí)例屬性如下:

    /** 執(zhí)行的任務(wù)*/
    private Callable<V> callable;
    
    /** 任務(wù)執(zhí)行的結(jié)果或者執(zhí)行過(guò)程中拋出的異常 */
    private Object outcome; // non-volatile, protected by state reads/writes
    
    /** 執(zhí)行任務(wù)的線程 */
    private volatile Thread runner;
    
    /** 等待線程的鏈表*/
    private volatile WaitNode waiters;
 
    //狀態(tài)
    private volatile int state;

其中WaitNode是一個(gè)簡(jiǎn)單的內(nèi)部類,其定義如下:

該類包含的靜態(tài)屬性都是字段偏移量,通過(guò)static代碼塊初始化,如下:

FutureTask定義了多個(gè)表示狀態(tài)的常量,如下:

    //初始狀態(tài)
    private static final int NEW          = 0;
    
    //是一個(gè)很短暫的中間狀態(tài),表示任務(wù)已執(zhí)行完成,保存完執(zhí)行結(jié)果后就流轉(zhuǎn)成NORMAL或者EXCEPTIONAL
    private static final int COMPLETING   = 1;
    
    //正常執(zhí)行完成
    private static final int NORMAL       = 2;
    
    //異常終止
    private static final int EXCEPTIONAL  = 3;
    
    //任務(wù)被取消了
    private static final int CANCELLED    = 4;
    
    //是一個(gè)很短暫的中間狀態(tài),調(diào)用interrupt方法后,會(huì)將狀態(tài)流轉(zhuǎn)成INTERRUPTED
    private static final int INTERRUPTING = 5;
    
    //任務(wù)執(zhí)行已中斷
    private static final int INTERRUPTED  = 6;

可能的狀態(tài)流轉(zhuǎn)如下圖:

2、構(gòu)造方法

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       //初始狀態(tài)是NEW
    }
 
public FutureTask(Runnable runnable, V result) {
        //將Runnable適配成Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
 
//Executors方法
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

其中RunnableAdapter是Executors的一個(gè)靜態(tài)內(nèi)部類,其實(shí)現(xiàn)如下:

3、get 

get方法用于阻塞當(dāng)前線程直到任務(wù)執(zhí)行完成,如果阻塞的過(guò)程中被中斷則拋出異常InterruptedException,可以限制阻塞的時(shí)間,如果等待超時(shí)還是未完成則拋出異常TimeoutException。

//阻塞當(dāng)前線程等待任務(wù)執(zhí)行完成
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) //如果未完成
            s = awaitDone(false, 0L);
        return report(s);
    }
 
//同上,可以限制等待的時(shí)間
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
             //阻塞當(dāng)前線程,如果返回值還是未完成說(shuō)明是等待超時(shí)了,則拋出異常
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) 
            throw new TimeoutException();
        return report(s);
    }
 
//timed為true表示等待指定的時(shí)間,否則是無(wú)期限等待
//該方法返回退出此方法時(shí)的狀態(tài)
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        //計(jì)算等待的終止時(shí)間
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) { //如果當(dāng)前線程被中斷了,則從等待鏈表中移除,并拋出異常
                removeWaiter(q);
                throw new InterruptedException();
            }
 
            int s = state;
            if (s > COMPLETING) { //如果任務(wù)已執(zhí)行完
                if (q != null)
                    q.thread = null; 
                return s;
            }
            else if (s == COMPLETING) //正在狀態(tài)流轉(zhuǎn)的過(guò)程中,讓出當(dāng)前CPU時(shí)間片
                Thread.yield();
            //未開始執(zhí)行    
            else if (q == null)
                q = new WaitNode(); 
            else if (!queued)
                //修改waiters屬性,插入到鏈表頭
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            //已插入到鏈表中
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) { //等待超時(shí),從鏈表中移除
                    removeWaiter(q);
                    return state;
                }
                //讓當(dāng)前線程休眠
                LockSupport.parkNanos(this, nanos);
            }
            else
                //讓當(dāng)前線程休眠
                LockSupport.park(this);
        }
    }
 
private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;//將thread置為null
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    //q.thread為null,需要被移除    
                    else if (pred != null) {
                        pred.next = s; //將q從鏈表移除
                        if (pred.thread == null) //如果為null,則從頭開始遍歷
                            continue retry;
                    }
                    //q.thread為null,pred為null,之前沒有有效節(jié)點(diǎn),修改waiters,修改失敗重試
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }
 
//awaitDone正常返回后調(diào)用此方法,此時(shí)狀態(tài)應(yīng)該是COMPLETING之后了
private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL) //如果是正常結(jié)束
            return (V)x;
        if (s >= CANCELLED) //如果被取消了
            throw new CancellationException();
        throw new ExecutionException((Throwable)x); //如果出現(xiàn)異常了
    }

4、run / runAndReset

run方法是有線程池調(diào)用的,會(huì)執(zhí)行Callable任務(wù),保存執(zhí)行的結(jié)果,如果出現(xiàn)異常則保存異常對(duì)象,并完成狀態(tài)流轉(zhuǎn),最后將等待任務(wù)完成的阻塞中的線程喚醒。runAndReset和run類似,區(qū)別在于runAndReset正常執(zhí)行完成后不會(huì)保存執(zhí)行的結(jié)果,不會(huì)改變狀態(tài),狀態(tài)還是NEW,如果是正常執(zhí)行則返回true,該方法是子類使用的,其調(diào)用鏈如下:

這兩方法的實(shí)現(xiàn)如下: 

 
//由線程池中的某個(gè)線程調(diào)用此方法
public void run() {
        //如果不等于NEW,說(shuō)明其他某個(gè)線程正在執(zhí)行任務(wù)
        //如果等于NEW,則cas修改runner屬性,修改失敗說(shuō)明其他某個(gè)線程也準(zhǔn)備執(zhí)行這個(gè)任務(wù)
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        //cas成功表示這個(gè)任務(wù)由當(dāng)前線程搶占成功    
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //執(zhí)行任務(wù)
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    //出現(xiàn)異常
                    result = null;
                    ran = false;
                    setException(ex); //保存異常對(duì)象
                }
                if (ran)
                    //執(zhí)行成功保存結(jié)果
                    set(result);
            }
        } finally {
            //如果任務(wù)被cancel了,則上述setException和set方法因?yàn)闋顟B(tài)不是NEW了會(huì)直接返回
            runner = null;
            int s = state;
            if (s >= INTERRUPTING) //如果被中斷,自旋等待中斷完成
                handlePossibleCancellationInterrupt(s);
        }
    }
 
//跟run方法相比區(qū)別就是正常執(zhí)行完成不會(huì)保存結(jié)果,不會(huì)流轉(zhuǎn)狀態(tài)
protected boolean runAndReset() {
        //如果state不是NEW或者cas修改runner失敗
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    //執(zhí)行任務(wù),但是不保存結(jié)果,狀態(tài)就不會(huì)從NEW流轉(zhuǎn)成NORMAL
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex); //保存異常實(shí)例
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING) //任務(wù)被中斷了,自旋等待中斷完成
                handlePossibleCancellationInterrupt(s);
        }
        //返回任務(wù)是否正常完成
        return ran && s == NEW;
    }
 
//保存異常對(duì)象并修改狀態(tài)
protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //只有原來(lái)的狀態(tài)是NEW才進(jìn)入下面的邏輯
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            //任務(wù)執(zhí)行完成,喚醒阻塞的線程
            finishCompletion();
        }
    }
 
//保存執(zhí)行結(jié)果并修改狀態(tài)
protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //只有原來(lái)的狀態(tài)是NEW才進(jìn)入下面的邏輯
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
             //任務(wù)執(zhí)行完成,喚醒阻塞的線程
            finishCompletion();
        }
    }
 
private void handlePossibleCancellationInterrupt(int s) {
        //正在中斷的過(guò)程中
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); //自旋等待中斷完成
    }
 
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                //cas將waiters置為null
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        //喚醒阻塞的新線程
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    //遍歷下一個(gè)節(jié)點(diǎn)
                    WaitNode next = q.next;
                    if (next == null) //遍歷結(jié)束,終止循環(huán)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break; //終止外層循環(huán)
            }
        }
        //執(zhí)行完成的回調(diào)方法,默認(rèn)是空實(shí)現(xiàn),子類可改寫此方法
        done();
 
        callable = null;        // to reduce footprint
    }

5、 cancel

cancel方法的參數(shù)為true,則會(huì)將當(dāng)前狀態(tài)由NEW改成INTERRUPTING,如果此任務(wù)已經(jīng)開始執(zhí)行了,則將正在執(zhí)行任務(wù)的線程標(biāo)記為已中斷,如果該線程響應(yīng)中斷則可能拋出異常,如果不響應(yīng)中斷則繼續(xù)執(zhí)行,最后再將狀態(tài)改成INTERRUPTED;如果方法的參數(shù)為false,則將當(dāng)前狀態(tài)由NEW改成CANCELLED,如果此任務(wù)已經(jīng)開始執(zhí)行了則會(huì)繼續(xù)執(zhí)行。上述兩種情形下,狀態(tài)流轉(zhuǎn)完成后都會(huì)喚醒還在阻塞中的等待線程,如果任務(wù)已經(jīng)開始執(zhí)行并且繼續(xù)執(zhí)行,因?yàn)闋顟B(tài)已經(jīng)不是NEW了,直接結(jié)果不會(huì)保存下來(lái)。

 
//如果mayInterruptIfRunning為true,則會(huì)將正在執(zhí)行任務(wù)的線程標(biāo)記為已中斷,線程有可能繼續(xù)執(zhí)行,也有可能響應(yīng)中斷拋出異常
//如果為false,則標(biāo)記為CANCELLED,如果任務(wù)已經(jīng)開始執(zhí)行了則會(huì)繼續(xù)執(zhí)行
//如果未執(zhí)行,則標(biāo)記為CANCELLED或者INTERRUPTING都會(huì)讓這任務(wù)不會(huì)被執(zhí)行了
public boolean cancel(boolean mayInterruptIfRunning) {
        //如果state不是NEW 或者cas修改失敗,則返回false
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt(); //將正在執(zhí)行任務(wù)的線程標(biāo)記為已中斷
                } finally { 
                    //修改狀態(tài)為已中斷 
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
             //喚醒等待的線程
            finishCompletion();
        }
        return true;
    }

三、ExecutorCompletionService

1、定義

ExecutorCompletionService是一個(gè)幫助獲取多個(gè)Future執(zhí)行結(jié)果的工具類,其類繼承關(guān)系如下:

CompletionService包含的方法如下:

后面會(huì)講解各方法的用途,該類包含的屬性如下:

    //執(zhí)行任務(wù)的線程池實(shí)現(xiàn)
    private final Executor executor;
    
    //調(diào)用其newTaskFor方法
    private final AbstractExecutorService aes;
 
    //已執(zhí)行完成的Future阻塞隊(duì)列
    private final BlockingQueue<Future<V>> completionQueue;

其構(gòu)造方法實(shí)現(xiàn)如下:

public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        //如果executor繼承自AbstractExecutorService,則aes為executor,否則為null
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        //沒有指定隊(duì)列,默認(rèn)使用基于鏈表的無(wú)固定容量的LinkedBlockingQueue    
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
 
public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

2、submit

submit方法將Callable或者Runnable任務(wù)包裝成一個(gè)RunnableFuture,然后提交到線程池中,返回RunnableFuture實(shí)例。

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        //將其包裝成RunnableFuture實(shí)現(xiàn)類
        RunnableFuture<V> f = newTaskFor(task);
        //提交任務(wù)到線程池
        executor.execute(new QueueingFuture(f));
        return f;
    }
 
public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }
 
private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task); //默認(rèn)使用FutureTask作為RunnableFuture的實(shí)現(xiàn)
        else
            return aes.newTaskFor(task);//如果aes不為null,則使用該類的特定實(shí)現(xiàn)
    }
 
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

其中QueueingFuture是一個(gè)內(nèi)部類,繼承自FutureTask,其實(shí)現(xiàn)如下:

重點(diǎn)改寫了done方法的實(shí)現(xiàn),如果任務(wù)已經(jīng)執(zhí)行完成,則會(huì)將該Future實(shí)例添加到阻塞隊(duì)列中。

3、take / poll 

這三方法就是從已完成的Future阻塞隊(duì)列中獲取并移除Future實(shí)例,如果隊(duì)列為空,take方法會(huì)無(wú)期限阻塞阻塞,不帶時(shí)間參數(shù)的poll方法不會(huì)阻塞返回null,帶時(shí)間參數(shù)的poll方法會(huì)阻塞指定的時(shí)間,如果超時(shí)則返回null,其實(shí)現(xiàn)都是直接調(diào)用阻塞隊(duì)列的方法,如下:

總結(jié) 

到此這篇關(guān)于Java8中AbstractExecutorService與FutureTask源碼詳解的文章就介紹到這了,更多相關(guān)Java8 AbstractExecutorService與FutureTask內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論