java利用CompletionService保證任務先完成先獲取到執(zhí)行結果
CompletionService簡介
在學習future的時候,我們提到,future.get()
方法會阻塞線程,所以如果A,B,C三個線程同時獲取執(zhí)行結果,如果A先執(zhí)行,但是A的執(zhí)行時間很長,那么即使B,C執(zhí)行很短,也無法獲取到B,C的執(zhí)行結果,因為主線程阻塞在A.get()
上了。
ExecutorService executorService = Executors.newFixedThreadPool(4); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(executorService.submit(A)); futures.add(executorService.submit(B)); futures.add(executorService.submit(C)); // 遍歷 Future list,通過 get() 方法獲取每個 future 結果 for (Future future:futures) { Integer result = future.get(); // 其他業(yè)務邏輯 如果A執(zhí)行時間很長,阻塞 }
那么如何讓B,C也有機會能夠獲取到執(zhí)行結果呢?答案就是java.util.concurrent.CompletionService
。
CompletionService
是Java8的新增接口,JDK為其提供了一個實現(xiàn)類ExecutorCompletionService
。這個類是為線程池中Task
的執(zhí)行結果服務的,即為Executor
中Task
返回Future
而服務的。CompletionService
的實現(xiàn)目標是任務先完成可優(yōu)先獲取到,即結果按照完成先后順序排序。
ExecutorService executorService = Executors.newFixedThreadPool(4); // ExecutorCompletionService 是 CompletionService 唯一實現(xiàn)類 CompletionService completionService = new ExecutorCompletionService<>(executorService ); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(completionService.submit(A)); futures.add(completionService.submit(B)); futures.add(completionService.submit(C)); // 遍歷 Future list,通過 get() 方法獲取每個 future 結果 for (int i = 0; i < futures.size(); i++) { Integer result = completionService.take().get(); // 其他業(yè)務邏輯 }
CompletionService原理
我們來試想一下,如果是你應該如何解決上述Feture
帶來的阻塞問題呢?可以通過阻塞隊列來實現(xiàn),偽代碼如下:
// 創(chuàng)建阻塞隊列 BlockingQueue<Integer> bq = new LinkedBlockingQueue<>(); // 任務A 異步進入阻塞隊列 executor.execute(() -> bq.put(A.get())); // 任務B 異步進入阻塞隊列 executor.execute(() -> bq.put(B.get())); // 任務C 異步進入阻塞隊列 executor.execute(()-> bq.put(C.get())); for (int i = 0; i < 3; i++) { Integer r = bq.take(); // 異步執(zhí)行所有業(yè)務邏輯 executor.execute(()->action(r)); }
實際上CompletionService
的實現(xiàn)原理也是內(nèi)部維護了一個阻塞隊列,當任務執(zhí)行結束就把任務的執(zhí)行結果加入到阻塞隊列中,不同的是CompletionService是把任務執(zhí)行結果的Future對象加入到阻塞隊列中。
CompletionService
是一個接口,submit()
用于提交任務,take()和poll()
用于從阻塞隊列中獲取并移除一個元素,它們的區(qū)別在于如果阻塞隊列是空的,那么調(diào)用take()
方法的線程就會被阻塞,而poll()
方法會返回null
值。
public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
其實現(xiàn)類ExecutorCompletionService
,實際上可以看做是Executor
和 BlockingQueue
的結合體,ExecutorCompletionService
把具體的計算任務交給 Executor
完成,通過BlockingQueue
的take()
方法獲得任務執(zhí)行的結果。
ExecutorCompletionService
有兩個構造函數(shù)
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; // 判斷executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool // 其余框架也有實現(xiàn)了AbstractExecutorService抽象類,目前JDK里只有上述的三種實現(xiàn) // 如果不是,則為null this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; // 判斷executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool // 其余框架也有實現(xiàn)了AbstractExecutorService抽象類,目前JDK里只有上述的三種實現(xiàn) // 如果不是,則為null this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
兩個構造器都需要傳入Executor
,如果不傳BlockingQueue<Futrue>
,默認會創(chuàng)建一個LinkedBlockingQueue<Future<V>>
的隊列,該BlockingQueue
的作用是保存Executor
執(zhí)行的結果。
submit()
源碼如下:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; }
當提交一個任務到ExecutorCompletionService
時,首先需要將task
封裝成RunableFuture<V>
,通過newTaskFor()
完成,然后再將RunableFuture
封裝成QueueingFuture
,它是FutureTask
的一個子類,然后改寫FutureTask
的done
方法,之后把Executor
執(zhí)行的計算結果放入BlockingQueue
中。
newTaskFor()
的源碼如下:
private RunnableFuture<V> newTaskFor(Callable<V> task) { // aes是AbstractExecutorService,其實現(xiàn)類是ThreadPoolExecutor,F(xiàn)orkJoinPool,SchedulerThreadPoolExecutor if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
QueueingFuture
的源碼如下:
private static class QueueingFuture<V> extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) { super(task, null); this.task = task; this.completionQueue = completionQueue; } private final Future<V> task; private final BlockingQueue<Future<V>> completionQueue; // 會被java.util.concurrent.FutureTask#finishCompletion調(diào)用,判讀是否計算完成 // 計算結果放在阻塞隊列中 protected void done() { completionQueue.add(task); } }
take()
和poll()
方法如下:
// 從結果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務的結果,如果沒有就會阻塞,直到有任務完成返回結果。 public Future<V> take() throws InterruptedException { return completionQueue.take(); } // 從結果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務的結果,如果沒有就會返回null,該方法不會阻塞。 public Future<V> poll() { return completionQueue.poll(); } // 從結果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務的結果,如果沒有就會返回null,該方法不會阻塞。 // 超時 public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
以上就是java利用CompletionService保證任務先完成先獲取到執(zhí)行結果的詳細內(nèi)容,更多關于java CompletionService的資料請關注腳本之家其它相關文章!
相關文章
Maven工程搭建spring boot+spring mvc+JPA的示例
本篇文章主要介紹了Maven工程搭建spring boot+spring mvc+JPA的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-01-01JDK動態(tài)代理之ProxyGenerator生成代理類的字節(jié)碼文件解析
這篇文章主要為大家詳細介紹了JDK動態(tài)代理之ProxyGenerator生成代理類的字節(jié)碼文件,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-02-02java實現(xiàn)小i機器人api接口調(diào)用示例
這篇文章主要介紹了java實現(xiàn)小i機器人api接口調(diào)用示例,需要的朋友可以參考下2014-04-04詳解JVM的內(nèi)存對象介紹[創(chuàng)建和訪問]
這篇文章主要介紹了JVM的內(nèi)存對象介紹[創(chuàng)建和訪問],文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-03-03