Java8中AbstractExecutorService與FutureTask源碼詳解
前言
本篇博客重點(diǎn)講解ThreadPoolExecutor的三個(gè)基礎(chǔ)設(shè)施類AbstractExecutorService、FutureTask和ExecutorCompletionService的實(shí)現(xiàn)細(xì)節(jié),AbstractExecutorService實(shí)現(xiàn)了ExecutorService的大部分接口,子類只需實(shí)現(xiàn)excute方法和shutdown相關(guān)方法即可;FutureTask是RunnableFuture接口的主要實(shí)現(xiàn),該接口是Runnable和Future的包裝類接口,會(huì)執(zhí)行Runnable對(duì)應(yīng)的run方法,調(diào)用方可以通過(guò)Future接口獲取任務(wù)的執(zhí)行狀態(tài)和結(jié)果;ExecutorCompletionService是幫助獲取多個(gè)RunnableFuture任務(wù)的執(zhí)行結(jié)果的工具類,基于FutureTask執(zhí)行完成時(shí)的回調(diào)方法done實(shí)現(xiàn)的。
一、AbstractExecutorService
1、定義
ThreadPoolExecutor的類繼承關(guān)系如下:
其中ExecutorService的子類如下:
右上角帶S的表示內(nèi)部類,我們重點(diǎn)關(guān)注ThreadPoolExecutor,ScheduledThreadPoolExecutor和ForkJoinPool三個(gè)類的實(shí)現(xiàn),后面兩個(gè)類會(huì)在后面的博客中逐一探討。
Executor包含的方法如下:
ExecutorService包含的方法如下:
上述接口方法中涉及的Callable接口的定義如下:
該接口也是表示一個(gè)執(zhí)行任務(wù),跟常見的Runnable接口的區(qū)別在于call方法有返回值而run方法沒有返回值。
Future表示某個(gè)任務(wù)的執(zhí)行結(jié)果,其定義的方法如下:
其子類比較多,如下:
后面會(huì)將涉及的子類逐一探討的。 AbstractExecutorService基于Executor接口的excute方法實(shí)現(xiàn)了大部分的ExecutorService的接口,子類只需要重點(diǎn)實(shí)現(xiàn)excute方法和shutdown相關(guān)方法即可,下面來(lái)分析其具體的實(shí)現(xiàn)。
2、submit
//Runnable接口方法沒有返回值,但是可以通過(guò)Future判斷任務(wù)是否執(zhí)行完成 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } //因?yàn)镽unnable的run方法沒有返回值,所以如果run方法正常執(zhí)行完成,其結(jié)果就是result public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } //都是返回FutureTask protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
3、invokeAll
//執(zhí)行完成tasks中所有的任務(wù),如果有一個(gè)拋出異常,則取消掉剩余的任務(wù) public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { //遍歷tasks中的任務(wù)將其轉(zhuǎn)換成RunnableFuture,然后提交到線程池執(zhí)行 for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } //遍歷Future列表 for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { //如果未執(zhí)行完成 try { //等待任務(wù)執(zhí)行完成 f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } //所有任務(wù)都執(zhí)行完了 done = true; return futures; } finally { if (!done) //出現(xiàn)異常,將所有的任務(wù)都取消掉 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } } //邏輯同上,不過(guò)加了等待時(shí)間限制,所有的任務(wù)的累計(jì)時(shí)間不能超過(guò)指定值,如果超時(shí)直接返回Future列表 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); //轉(zhuǎn)換成納秒 long nanos = unit.toNanos(timeout); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { //轉(zhuǎn)換成Future for (Callable<T> t : tasks) futures.add(newTaskFor(t)); //計(jì)算終止時(shí)間 final long deadline = System.nanoTime() + nanos; final int size = futures.size(); for (int i = 0; i < size; i++) { execute((Runnable)futures.get(i)); nanos = deadline - System.nanoTime(); //計(jì)算剩余時(shí)間 if (nanos <= 0L) //如果超時(shí)了則直接返回 return futures; } for (int i = 0; i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { //任務(wù)未執(zhí)行 if (nanos <= 0L) return futures; //等待超時(shí) try { //等待任務(wù)執(zhí)行完成 f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } nanos = deadline - System.nanoTime(); } } done = true; return futures; } finally { if (!done) //出現(xiàn)異常,取消掉剩余未執(zhí)行的任務(wù) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
4、invokeAny
//多個(gè)任務(wù)只要有一個(gè)執(zhí)行成功就返回,并把剩余的已提交未執(zhí)行的任務(wù)給取消掉 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } //多個(gè)任務(wù)只要有一個(gè)執(zhí)行成功就返回,并把剩余的已提交未執(zhí)行的任務(wù)給取消掉 //如果指定時(shí)間內(nèi)沒有執(zhí)行成功的,則拋出TimeoutException 異常 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { //參數(shù)校驗(yàn) if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); try { ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); //提交一個(gè)任務(wù) futures.add(ecs.submit(it.next())); --ntasks; int active = 1; for (;;) { //獲取最新的已完成任務(wù) Future<T> f = ecs.poll(); if (f == null) { //沒有執(zhí)行完的 if (ntasks > 0) { --ntasks; //繼續(xù)添加下一個(gè)任務(wù) futures.add(ecs.submit(it.next())); ++active; } else if (active == 0) //所有任務(wù)都執(zhí)行失敗了,沒有執(zhí)行成功的 break; else if (timed) { //等待超時(shí) f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); //計(jì)算剩余等待時(shí)間 nanos = deadline - System.nanoTime(); } else //所有任務(wù)都提交了,阻塞等待某個(gè)任務(wù)執(zhí)行完成 f = ecs.take(); } if (f != null) { --active; try { //某個(gè)任務(wù)已執(zhí)行完成,如果拋出異常則執(zhí)行下一個(gè)任務(wù) return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } //for循環(huán)終止 //所有任務(wù)都執(zhí)行失敗了 if (ee == null) ee = new ExecutionException(); throw ee; } finally { //返回前,將未執(zhí)行完成的任務(wù)都取消掉 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
二、FutureTask
1、定義
FutureTask的類繼承關(guān)系如下:
RunnableFuture接口沒有新增方法,將Runnable的run方法由public改成包級(jí)訪問了,如下:
該類包含的實(shí)例屬性如下:
/** 執(zhí)行的任務(wù)*/ private Callable<V> callable; /** 任務(wù)執(zhí)行的結(jié)果或者執(zhí)行過(guò)程中拋出的異常 */ private Object outcome; // non-volatile, protected by state reads/writes /** 執(zhí)行任務(wù)的線程 */ private volatile Thread runner; /** 等待線程的鏈表*/ private volatile WaitNode waiters; //狀態(tài) private volatile int state;
其中WaitNode是一個(gè)簡(jiǎn)單的內(nèi)部類,其定義如下:
該類包含的靜態(tài)屬性都是字段偏移量,通過(guò)static代碼塊初始化,如下:
FutureTask定義了多個(gè)表示狀態(tài)的常量,如下:
//初始狀態(tài) private static final int NEW = 0; //是一個(gè)很短暫的中間狀態(tài),表示任務(wù)已執(zhí)行完成,保存完執(zhí)行結(jié)果后就流轉(zhuǎn)成NORMAL或者EXCEPTIONAL private static final int COMPLETING = 1; //正常執(zhí)行完成 private static final int NORMAL = 2; //異常終止 private static final int EXCEPTIONAL = 3; //任務(wù)被取消了 private static final int CANCELLED = 4; //是一個(gè)很短暫的中間狀態(tài),調(diào)用interrupt方法后,會(huì)將狀態(tài)流轉(zhuǎn)成INTERRUPTED private static final int INTERRUPTING = 5; //任務(wù)執(zhí)行已中斷 private static final int INTERRUPTED = 6;
可能的狀態(tài)流轉(zhuǎn)如下圖:
2、構(gòu)造方法
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; //初始狀態(tài)是NEW } public FutureTask(Runnable runnable, V result) { //將Runnable適配成Callable this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } //Executors方法 public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
其中RunnableAdapter是Executors的一個(gè)靜態(tài)內(nèi)部類,其實(shí)現(xiàn)如下:
3、get
get方法用于阻塞當(dāng)前線程直到任務(wù)執(zhí)行完成,如果阻塞的過(guò)程中被中斷則拋出異常InterruptedException,可以限制阻塞的時(shí)間,如果等待超時(shí)還是未完成則拋出異常TimeoutException。
//阻塞當(dāng)前線程等待任務(wù)執(zhí)行完成 public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //如果未完成 s = awaitDone(false, 0L); return report(s); } //同上,可以限制等待的時(shí)間 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && //阻塞當(dāng)前線程,如果返回值還是未完成說(shuō)明是等待超時(shí)了,則拋出異常 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } //timed為true表示等待指定的時(shí)間,否則是無(wú)期限等待 //該方法返回退出此方法時(shí)的狀態(tài) private int awaitDone(boolean timed, long nanos) throws InterruptedException { //計(jì)算等待的終止時(shí)間 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { //如果當(dāng)前線程被中斷了,則從等待鏈表中移除,并拋出異常 removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { //如果任務(wù)已執(zhí)行完 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) //正在狀態(tài)流轉(zhuǎn)的過(guò)程中,讓出當(dāng)前CPU時(shí)間片 Thread.yield(); //未開始執(zhí)行 else if (q == null) q = new WaitNode(); else if (!queued) //修改waiters屬性,插入到鏈表頭 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //已插入到鏈表中 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //等待超時(shí),從鏈表中移除 removeWaiter(q); return state; } //讓當(dāng)前線程休眠 LockSupport.parkNanos(this, nanos); } else //讓當(dāng)前線程休眠 LockSupport.park(this); } } private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null;//將thread置為null retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; //q.thread為null,需要被移除 else if (pred != null) { pred.next = s; //將q從鏈表移除 if (pred.thread == null) //如果為null,則從頭開始遍歷 continue retry; } //q.thread為null,pred為null,之前沒有有效節(jié)點(diǎn),修改waiters,修改失敗重試 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } //awaitDone正常返回后調(diào)用此方法,此時(shí)狀態(tài)應(yīng)該是COMPLETING之后了 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) //如果是正常結(jié)束 return (V)x; if (s >= CANCELLED) //如果被取消了 throw new CancellationException(); throw new ExecutionException((Throwable)x); //如果出現(xiàn)異常了 }
4、run / runAndReset
run方法是有線程池調(diào)用的,會(huì)執(zhí)行Callable任務(wù),保存執(zhí)行的結(jié)果,如果出現(xiàn)異常則保存異常對(duì)象,并完成狀態(tài)流轉(zhuǎn),最后將等待任務(wù)完成的阻塞中的線程喚醒。runAndReset和run類似,區(qū)別在于runAndReset正常執(zhí)行完成后不會(huì)保存執(zhí)行的結(jié)果,不會(huì)改變狀態(tài),狀態(tài)還是NEW,如果是正常執(zhí)行則返回true,該方法是子類使用的,其調(diào)用鏈如下:
這兩方法的實(shí)現(xiàn)如下:
//由線程池中的某個(gè)線程調(diào)用此方法 public void run() { //如果不等于NEW,說(shuō)明其他某個(gè)線程正在執(zhí)行任務(wù) //如果等于NEW,則cas修改runner屬性,修改失敗說(shuō)明其他某個(gè)線程也準(zhǔn)備執(zhí)行這個(gè)任務(wù) if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //cas成功表示這個(gè)任務(wù)由當(dāng)前線程搶占成功 try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //執(zhí)行任務(wù) result = c.call(); ran = true; } catch (Throwable ex) { //出現(xiàn)異常 result = null; ran = false; setException(ex); //保存異常對(duì)象 } if (ran) //執(zhí)行成功保存結(jié)果 set(result); } } finally { //如果任務(wù)被cancel了,則上述setException和set方法因?yàn)闋顟B(tài)不是NEW了會(huì)直接返回 runner = null; int s = state; if (s >= INTERRUPTING) //如果被中斷,自旋等待中斷完成 handlePossibleCancellationInterrupt(s); } } //跟run方法相比區(qū)別就是正常執(zhí)行完成不會(huì)保存結(jié)果,不會(huì)流轉(zhuǎn)狀態(tài) protected boolean runAndReset() { //如果state不是NEW或者cas修改runner失敗 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { //執(zhí)行任務(wù),但是不保存結(jié)果,狀態(tài)就不會(huì)從NEW流轉(zhuǎn)成NORMAL c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); //保存異常實(shí)例 } } } finally { runner = null; s = state; if (s >= INTERRUPTING) //任務(wù)被中斷了,自旋等待中斷完成 handlePossibleCancellationInterrupt(s); } //返回任務(wù)是否正常完成 return ran && s == NEW; } //保存異常對(duì)象并修改狀態(tài) protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //只有原來(lái)的狀態(tài)是NEW才進(jìn)入下面的邏輯 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //任務(wù)執(zhí)行完成,喚醒阻塞的線程 finishCompletion(); } } //保存執(zhí)行結(jié)果并修改狀態(tài) protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //只有原來(lái)的狀態(tài)是NEW才進(jìn)入下面的邏輯 outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //任務(wù)執(zhí)行完成,喚醒阻塞的線程 finishCompletion(); } } private void handlePossibleCancellationInterrupt(int s) { //正在中斷的過(guò)程中 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); //自旋等待中斷完成 } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //cas將waiters置為null for (;;) { Thread t = q.thread; if (t != null) { //喚醒阻塞的新線程 q.thread = null; LockSupport.unpark(t); } //遍歷下一個(gè)節(jié)點(diǎn) WaitNode next = q.next; if (next == null) //遍歷結(jié)束,終止循環(huán) break; q.next = null; // unlink to help gc q = next; } break; //終止外層循環(huán) } } //執(zhí)行完成的回調(diào)方法,默認(rèn)是空實(shí)現(xiàn),子類可改寫此方法 done(); callable = null; // to reduce footprint }
5、 cancel
cancel方法的參數(shù)為true,則會(huì)將當(dāng)前狀態(tài)由NEW改成INTERRUPTING,如果此任務(wù)已經(jīng)開始執(zhí)行了,則將正在執(zhí)行任務(wù)的線程標(biāo)記為已中斷,如果該線程響應(yīng)中斷則可能拋出異常,如果不響應(yīng)中斷則繼續(xù)執(zhí)行,最后再將狀態(tài)改成INTERRUPTED;如果方法的參數(shù)為false,則將當(dāng)前狀態(tài)由NEW改成CANCELLED,如果此任務(wù)已經(jīng)開始執(zhí)行了則會(huì)繼續(xù)執(zhí)行。上述兩種情形下,狀態(tài)流轉(zhuǎn)完成后都會(huì)喚醒還在阻塞中的等待線程,如果任務(wù)已經(jīng)開始執(zhí)行并且繼續(xù)執(zhí)行,因?yàn)闋顟B(tài)已經(jīng)不是NEW了,直接結(jié)果不會(huì)保存下來(lái)。
//如果mayInterruptIfRunning為true,則會(huì)將正在執(zhí)行任務(wù)的線程標(biāo)記為已中斷,線程有可能繼續(xù)執(zhí)行,也有可能響應(yīng)中斷拋出異常 //如果為false,則標(biāo)記為CANCELLED,如果任務(wù)已經(jīng)開始執(zhí)行了則會(huì)繼續(xù)執(zhí)行 //如果未執(zhí)行,則標(biāo)記為CANCELLED或者INTERRUPTING都會(huì)讓這任務(wù)不會(huì)被執(zhí)行了 public boolean cancel(boolean mayInterruptIfRunning) { //如果state不是NEW 或者cas修改失敗,則返回false if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); //將正在執(zhí)行任務(wù)的線程標(biāo)記為已中斷 } finally { //修改狀態(tài)為已中斷 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //喚醒等待的線程 finishCompletion(); } return true; }
三、ExecutorCompletionService
1、定義
ExecutorCompletionService是一個(gè)幫助獲取多個(gè)Future執(zhí)行結(jié)果的工具類,其類繼承關(guān)系如下:
CompletionService包含的方法如下:
后面會(huì)講解各方法的用途,該類包含的屬性如下:
//執(zhí)行任務(wù)的線程池實(shí)現(xiàn) private final Executor executor; //調(diào)用其newTaskFor方法 private final AbstractExecutorService aes; //已執(zhí)行完成的Future阻塞隊(duì)列 private final BlockingQueue<Future<V>> completionQueue;
其構(gòu)造方法實(shí)現(xiàn)如下:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; //如果executor繼承自AbstractExecutorService,則aes為executor,否則為null this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; //沒有指定隊(duì)列,默認(rèn)使用基于鏈表的無(wú)固定容量的LinkedBlockingQueue this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
2、submit
submit方法將Callable或者Runnable任務(wù)包裝成一個(gè)RunnableFuture,然后提交到線程池中,返回RunnableFuture實(shí)例。
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); //將其包裝成RunnableFuture實(shí)現(xiàn)類 RunnableFuture<V> f = newTaskFor(task); //提交任務(wù)到線程池 executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); //默認(rèn)使用FutureTask作為RunnableFuture的實(shí)現(xiàn) else return aes.newTaskFor(task);//如果aes不為null,則使用該類的特定實(shí)現(xiàn) } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
其中QueueingFuture是一個(gè)內(nèi)部類,繼承自FutureTask,其實(shí)現(xiàn)如下:
重點(diǎn)改寫了done方法的實(shí)現(xiàn),如果任務(wù)已經(jīng)執(zhí)行完成,則會(huì)將該Future實(shí)例添加到阻塞隊(duì)列中。
3、take / poll
這三方法就是從已完成的Future阻塞隊(duì)列中獲取并移除Future實(shí)例,如果隊(duì)列為空,take方法會(huì)無(wú)期限阻塞阻塞,不帶時(shí)間參數(shù)的poll方法不會(huì)阻塞返回null,帶時(shí)間參數(shù)的poll方法會(huì)阻塞指定的時(shí)間,如果超時(shí)則返回null,其實(shí)現(xiàn)都是直接調(diào)用阻塞隊(duì)列的方法,如下:
總結(jié)
到此這篇關(guān)于Java8中AbstractExecutorService與FutureTask源碼詳解的文章就介紹到這了,更多相關(guān)Java8 AbstractExecutorService與FutureTask內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
分布式開發(fā)醫(yī)療掛號(hào)系統(tǒng)數(shù)據(jù)字典模塊前后端實(shí)現(xiàn)
這篇文章主要為大家介紹了分布式開發(fā)醫(yī)療掛號(hào)系統(tǒng)數(shù)據(jù)字典模塊前后端實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04JAVA像SQL一樣對(duì)List對(duì)象集合進(jìn)行排序
這篇文章主要介紹了JAVA像SQL一樣對(duì)List對(duì)象集合進(jìn)行排序的實(shí)現(xiàn)方法,文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07Java基于Semaphore構(gòu)建阻塞對(duì)象池
這篇文章主要介紹了Java基于Semaphore構(gòu)建阻塞對(duì)象池,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04Mybatis實(shí)現(xiàn)傳入多個(gè)參數(shù)的四種方法詳細(xì)講解
這篇文章主要介紹了Mybatis實(shí)現(xiàn)傳入多個(gè)參數(shù)的四種方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧2023-01-01Spring Security代碼實(shí)現(xiàn)JWT接口權(quán)限授予與校驗(yàn)功能
本文給大家介紹Spring Security代碼實(shí)現(xiàn)JWT接口權(quán)限授予與校驗(yàn)功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2019-12-12深入理解Java8新特性之Stream API的創(chuàng)建方式和中間操作步驟
Stream是Java8的一大亮點(diǎn),是對(duì)容器對(duì)象功能的增強(qiáng),它專注于對(duì)容器對(duì)象進(jìn)行各種非常便利、高效的 聚合操作(aggregate operation)或者大批量數(shù)據(jù)操作。Stream API借助于同樣新出現(xiàn)的Lambda表達(dá)式,極大的提高編程效率和程序可讀性,感興趣的朋友快來(lái)看看吧2021-11-11Java的JSON轉(zhuǎn)換庫(kù)GSON的基本使用方法示例
GSON是Google制作的一個(gè)可以讓Java對(duì)象與JSON互相轉(zhuǎn)換的類庫(kù),下面我們就來(lái)看一下Java的JSON轉(zhuǎn)換庫(kù)GSON的基本使用方法示例:2016-06-06SpringBoot集成Mybatis-Plus多租戶架構(gòu)實(shí)現(xiàn)
本文主要介紹了SpringBoot集成Mybatis-Plus多租戶架構(gòu)實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09