java利用CompletionService保證任務(wù)先完成先獲取到執(zhí)行結(jié)果
CompletionService簡(jiǎn)介
在學(xué)習(xí)future的時(shí)候,我們提到,future.get()
方法會(huì)阻塞線(xiàn)程,所以如果A,B,C三個(gè)線(xiàn)程同時(shí)獲取執(zhí)行結(jié)果,如果A先執(zhí)行,但是A的執(zhí)行時(shí)間很長(zhǎng),那么即使B,C執(zhí)行很短,也無(wú)法獲取到B,C的執(zhí)行結(jié)果,因?yàn)橹骶€(xiàn)程阻塞在A.get()
上了。
ExecutorService executorService = Executors.newFixedThreadPool(4); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(executorService.submit(A)); futures.add(executorService.submit(B)); futures.add(executorService.submit(C)); // 遍歷 Future list,通過(guò) get() 方法獲取每個(gè) future 結(jié)果 for (Future future:futures) { Integer result = future.get(); // 其他業(yè)務(wù)邏輯 如果A執(zhí)行時(shí)間很長(zhǎng),阻塞 }
那么如何讓B,C也有機(jī)會(huì)能夠獲取到執(zhí)行結(jié)果呢?答案就是java.util.concurrent.CompletionService
。
CompletionService
是Java8的新增接口,JDK為其提供了一個(gè)實(shí)現(xiàn)類(lèi)ExecutorCompletionService
。這個(gè)類(lèi)是為線(xiàn)程池中Task
的執(zhí)行結(jié)果服務(wù)的,即為Executor
中Task
返回Future
而服務(wù)的。CompletionService
的實(shí)現(xiàn)目標(biāo)是任務(wù)先完成可優(yōu)先獲取到,即結(jié)果按照完成先后順序排序。
ExecutorService executorService = Executors.newFixedThreadPool(4); // ExecutorCompletionService 是 CompletionService 唯一實(shí)現(xiàn)類(lèi) CompletionService completionService = new ExecutorCompletionService<>(executorService ); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(completionService.submit(A)); futures.add(completionService.submit(B)); futures.add(completionService.submit(C)); // 遍歷 Future list,通過(guò) get() 方法獲取每個(gè) future 結(jié)果 for (int i = 0; i < futures.size(); i++) { Integer result = completionService.take().get(); // 其他業(yè)務(wù)邏輯 }
CompletionService原理
我們來(lái)試想一下,如果是你應(yīng)該如何解決上述Feture
帶來(lái)的阻塞問(wèn)題呢?可以通過(guò)阻塞隊(duì)列來(lái)實(shí)現(xiàn),偽代碼如下:
// 創(chuàng)建阻塞隊(duì)列 BlockingQueue<Integer> bq = new LinkedBlockingQueue<>(); // 任務(wù)A 異步進(jìn)入阻塞隊(duì)列 executor.execute(() -> bq.put(A.get())); // 任務(wù)B 異步進(jìn)入阻塞隊(duì)列 executor.execute(() -> bq.put(B.get())); // 任務(wù)C 異步進(jìn)入阻塞隊(duì)列 executor.execute(()-> bq.put(C.get())); for (int i = 0; i < 3; i++) { Integer r = bq.take(); // 異步執(zhí)行所有業(yè)務(wù)邏輯 executor.execute(()->action(r)); }
實(shí)際上CompletionService
的實(shí)現(xiàn)原理也是內(nèi)部維護(hù)了一個(gè)阻塞隊(duì)列,當(dāng)任務(wù)執(zhí)行結(jié)束就把任務(wù)的執(zhí)行結(jié)果加入到阻塞隊(duì)列中,不同的是CompletionService是把任務(wù)執(zhí)行結(jié)果的Future對(duì)象加入到阻塞隊(duì)列中。
CompletionService
是一個(gè)接口,submit()
用于提交任務(wù),take()和poll()
用于從阻塞隊(duì)列中獲取并移除一個(gè)元素,它們的區(qū)別在于如果阻塞隊(duì)列是空的,那么調(diào)用take()
方法的線(xiàn)程就會(huì)被阻塞,而poll()
方法會(huì)返回null
值。
public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
其實(shí)現(xiàn)類(lèi)ExecutorCompletionService
,實(shí)際上可以看做是Executor
和 BlockingQueue
的結(jié)合體,ExecutorCompletionService
把具體的計(jì)算任務(wù)交給 Executor
完成,通過(guò)BlockingQueue
的take()
方法獲得任務(wù)執(zhí)行的結(jié)果。
ExecutorCompletionService
有兩個(gè)構(gòu)造函數(shù)
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; // 判斷executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool // 其余框架也有實(shí)現(xiàn)了AbstractExecutorService抽象類(lèi),目前JDK里只有上述的三種實(shí)現(xiàn) // 如果不是,則為null this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; // 判斷executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool // 其余框架也有實(shí)現(xiàn)了AbstractExecutorService抽象類(lèi),目前JDK里只有上述的三種實(shí)現(xiàn) // 如果不是,則為null this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
兩個(gè)構(gòu)造器都需要傳入Executor
,如果不傳BlockingQueue<Futrue>
,默認(rèn)會(huì)創(chuàng)建一個(gè)LinkedBlockingQueue<Future<V>>
的隊(duì)列,該BlockingQueue
的作用是保存Executor
執(zhí)行的結(jié)果。
submit()
源碼如下:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; }
當(dāng)提交一個(gè)任務(wù)到ExecutorCompletionService
時(shí),首先需要將task
封裝成RunableFuture<V>
,通過(guò)newTaskFor()
完成,然后再將RunableFuture
封裝成QueueingFuture
,它是FutureTask
的一個(gè)子類(lèi),然后改寫(xiě)FutureTask
的done
方法,之后把Executor
執(zhí)行的計(jì)算結(jié)果放入BlockingQueue
中。
newTaskFor()
的源碼如下:
private RunnableFuture<V> newTaskFor(Callable<V> task) { // aes是AbstractExecutorService,其實(shí)現(xiàn)類(lèi)是ThreadPoolExecutor,F(xiàn)orkJoinPool,SchedulerThreadPoolExecutor if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
QueueingFuture
的源碼如下:
private static class QueueingFuture<V> extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) { super(task, null); this.task = task; this.completionQueue = completionQueue; } private final Future<V> task; private final BlockingQueue<Future<V>> completionQueue; // 會(huì)被java.util.concurrent.FutureTask#finishCompletion調(diào)用,判讀是否計(jì)算完成 // 計(jì)算結(jié)果放在阻塞隊(duì)列中 protected void done() { completionQueue.add(task); } }
take()
和poll()
方法如下:
// 從結(jié)果隊(duì)列中獲取并移除一個(gè)已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒(méi)有就會(huì)阻塞,直到有任務(wù)完成返回結(jié)果。 public Future<V> take() throws InterruptedException { return completionQueue.take(); } // 從結(jié)果隊(duì)列中獲取并移除一個(gè)已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒(méi)有就會(huì)返回null,該方法不會(huì)阻塞。 public Future<V> poll() { return completionQueue.poll(); } // 從結(jié)果隊(duì)列中獲取并移除一個(gè)已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒(méi)有就會(huì)返回null,該方法不會(huì)阻塞。 // 超時(shí) public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
以上就是java利用CompletionService保證任務(wù)先完成先獲取到執(zhí)行結(jié)果的詳細(xì)內(nèi)容,更多關(guān)于java CompletionService的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實(shí)現(xiàn)的貸款金額計(jì)算功能示例
這篇文章主要介紹了Java實(shí)現(xiàn)的貸款金額計(jì)算功能,結(jié)合實(shí)例形式分析了Java簡(jiǎn)單數(shù)值運(yùn)算及類(lèi)型轉(zhuǎn)換等相關(guān)操作技巧,需要的朋友可以參考下2018-01-01Maven工程搭建spring boot+spring mvc+JPA的示例
本篇文章主要介紹了Maven工程搭建spring boot+spring mvc+JPA的示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-01-01JDK動(dòng)態(tài)代理之ProxyGenerator生成代理類(lèi)的字節(jié)碼文件解析
這篇文章主要為大家詳細(xì)介紹了JDK動(dòng)態(tài)代理之ProxyGenerator生成代理類(lèi)的字節(jié)碼文件,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02java實(shí)現(xiàn)小i機(jī)器人api接口調(diào)用示例
這篇文章主要介紹了java實(shí)現(xiàn)小i機(jī)器人api接口調(diào)用示例,需要的朋友可以參考下2014-04-04Java如何判斷一個(gè)空對(duì)象的常見(jiàn)方法
在Java中判斷對(duì)象是否為空是一項(xiàng)重要的編程技巧,可以有效防止空指針異常的發(fā)生,下面這篇文章主要給大家介紹了關(guān)于利用Java如何判斷一個(gè)空對(duì)象的相關(guān)資料,需要的朋友可以參考下2024-01-01詳解JVM的內(nèi)存對(duì)象介紹[創(chuàng)建和訪問(wèn)]
這篇文章主要介紹了JVM的內(nèi)存對(duì)象介紹[創(chuàng)建和訪問(wèn)],文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03