java利用CompletionService保證任務(wù)先完成先獲取到執(zhí)行結(jié)果
CompletionService簡介
在學(xué)習(xí)future的時候,我們提到,future.get()方法會阻塞線程,所以如果A,B,C三個線程同時獲取執(zhí)行結(jié)果,如果A先執(zhí)行,但是A的執(zhí)行時間很長,那么即使B,C執(zhí)行很短,也無法獲取到B,C的執(zhí)行結(jié)果,因為主線程阻塞在A.get()上了。
ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
// 遍歷 Future list,通過 get() 方法獲取每個 future 結(jié)果
for (Future future:futures) {
Integer result = future.get();
// 其他業(yè)務(wù)邏輯 如果A執(zhí)行時間很長,阻塞
}那么如何讓B,C也有機會能夠獲取到執(zhí)行結(jié)果呢?答案就是java.util.concurrent.CompletionService。
CompletionService是Java8的新增接口,JDK為其提供了一個實現(xiàn)類ExecutorCompletionService。這個類是為線程池中Task的執(zhí)行結(jié)果服務(wù)的,即為Executor中Task返回Future而服務(wù)的。CompletionService的實現(xiàn)目標(biāo)是任務(wù)先完成可優(yōu)先獲取到,即結(jié)果按照完成先后順序排序。
ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorCompletionService 是 CompletionService 唯一實現(xiàn)類
CompletionService completionService = new ExecutorCompletionService<>(executorService );
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(completionService.submit(A));
futures.add(completionService.submit(B));
futures.add(completionService.submit(C));
// 遍歷 Future list,通過 get() 方法獲取每個 future 結(jié)果
for (int i = 0; i < futures.size(); i++) {
Integer result = completionService.take().get();
// 其他業(yè)務(wù)邏輯
}CompletionService原理
我們來試想一下,如果是你應(yīng)該如何解決上述Feture帶來的阻塞問題呢?可以通過阻塞隊列來實現(xiàn),偽代碼如下:
// 創(chuàng)建阻塞隊列
BlockingQueue<Integer> bq =
new LinkedBlockingQueue<>();
// 任務(wù)A 異步進入阻塞隊列
executor.execute(() -> bq.put(A.get()));
// 任務(wù)B 異步進入阻塞隊列
executor.execute(() -> bq.put(B.get()));
// 任務(wù)C 異步進入阻塞隊列
executor.execute(()-> bq.put(C.get()));
for (int i = 0; i < 3; i++) {
Integer r = bq.take();
// 異步執(zhí)行所有業(yè)務(wù)邏輯
executor.execute(()->action(r));
}實際上CompletionService的實現(xiàn)原理也是內(nèi)部維護了一個阻塞隊列,當(dāng)任務(wù)執(zhí)行結(jié)束就把任務(wù)的執(zhí)行結(jié)果加入到阻塞隊列中,不同的是CompletionService是把任務(wù)執(zhí)行結(jié)果的Future對象加入到阻塞隊列中。
CompletionService是一個接口,submit()用于提交任務(wù),take()和poll()用于從阻塞隊列中獲取并移除一個元素,它們的區(qū)別在于如果阻塞隊列是空的,那么調(diào)用take()方法的線程就會被阻塞,而poll()方法會返回null值。
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}其實現(xiàn)類ExecutorCompletionService,實際上可以看做是Executor和 BlockingQueue的結(jié)合體,ExecutorCompletionService把具體的計算任務(wù)交給 Executor完成,通過BlockingQueue的take()方法獲得任務(wù)執(zhí)行的結(jié)果。
ExecutorCompletionService有兩個構(gòu)造函數(shù)
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
// 判斷executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool
// 其余框架也有實現(xiàn)了AbstractExecutorService抽象類,目前JDK里只有上述的三種實現(xiàn)
// 如果不是,則為null
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
// 判斷executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool
// 其余框架也有實現(xiàn)了AbstractExecutorService抽象類,目前JDK里只有上述的三種實現(xiàn)
// 如果不是,則為null
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}兩個構(gòu)造器都需要傳入Executor,如果不傳BlockingQueue<Futrue>,默認會創(chuàng)建一個LinkedBlockingQueue<Future<V>>的隊列,該BlockingQueue的作用是保存Executor執(zhí)行的結(jié)果。
submit()源碼如下:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}當(dāng)提交一個任務(wù)到ExecutorCompletionService時,首先需要將task封裝成RunableFuture<V>,通過newTaskFor()完成,然后再將RunableFuture封裝成QueueingFuture,它是FutureTask的一個子類,然后改寫FutureTask的done方法,之后把Executor執(zhí)行的計算結(jié)果放入BlockingQueue中。
newTaskFor()的源碼如下:
private RunnableFuture<V> newTaskFor(Callable<V> task) {
// aes是AbstractExecutorService,其實現(xiàn)類是ThreadPoolExecutor,F(xiàn)orkJoinPool,SchedulerThreadPoolExecutor
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}QueueingFuture的源碼如下:
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
// 會被java.util.concurrent.FutureTask#finishCompletion調(diào)用,判讀是否計算完成
// 計算結(jié)果放在阻塞隊列中
protected void done() { completionQueue.add(task); }
}take()和poll()方法如下:
// 從結(jié)果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒有就會阻塞,直到有任務(wù)完成返回結(jié)果。
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
// 從結(jié)果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒有就會返回null,該方法不會阻塞。
public Future<V> poll() {
return completionQueue.poll();
}
// 從結(jié)果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒有就會返回null,該方法不會阻塞。
// 超時
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}以上就是java利用CompletionService保證任務(wù)先完成先獲取到執(zhí)行結(jié)果的詳細內(nèi)容,更多關(guān)于java CompletionService的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Maven工程搭建spring boot+spring mvc+JPA的示例
本篇文章主要介紹了Maven工程搭建spring boot+spring mvc+JPA的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-01-01
JDK動態(tài)代理之ProxyGenerator生成代理類的字節(jié)碼文件解析
這篇文章主要為大家詳細介紹了JDK動態(tài)代理之ProxyGenerator生成代理類的字節(jié)碼文件,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-02-02
java實現(xiàn)小i機器人api接口調(diào)用示例
這篇文章主要介紹了java實現(xiàn)小i機器人api接口調(diào)用示例,需要的朋友可以參考下2014-04-04
詳解JVM的內(nèi)存對象介紹[創(chuàng)建和訪問]
這篇文章主要介紹了JVM的內(nèi)存對象介紹[創(chuàng)建和訪問],文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03

