欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java利用CompletionService保證任務先完成先獲取到執(zhí)行結果

 更新時間:2023年08月17日 08:22:37   作者:Shawn_Shawn  
這篇文章主要為大家詳細介紹了java如何利用CompletionService來保證任務先完成先獲取到執(zhí)行結果,文中的示例代碼講解詳細,需要的可以參考下

CompletionService簡介

在學習future的時候,我們提到,future.get()方法會阻塞線程,所以如果A,B,C三個線程同時獲取執(zhí)行結果,如果A先執(zhí)行,但是A的執(zhí)行時間很長,那么即使B,C執(zhí)行很短,也無法獲取到B,C的執(zhí)行結果,因為主線程阻塞在A.get()上了。

ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
// 遍歷 Future list,通過 get() 方法獲取每個 future 結果
for (Future future:futures) {
	Integer result = future.get();
	// 其他業(yè)務邏輯 如果A執(zhí)行時間很長,阻塞
}

那么如何讓B,C也有機會能夠獲取到執(zhí)行結果呢?答案就是java.util.concurrent.CompletionService

CompletionService是Java8的新增接口,JDK為其提供了一個實現(xiàn)類ExecutorCompletionService。這個類是為線程池中Task的執(zhí)行結果服務的,即為ExecutorTask返回Future而服務的。CompletionService的實現(xiàn)目標是任務先完成可優(yōu)先獲取到,即結果按照完成先后順序排序。

ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorCompletionService 是 CompletionService 唯一實現(xiàn)類
CompletionService completionService = new ExecutorCompletionService<>(executorService );
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(completionService.submit(A));
futures.add(completionService.submit(B));
futures.add(completionService.submit(C));
// 遍歷 Future list,通過 get() 方法獲取每個 future 結果
for (int i = 0; i < futures.size(); i++) {
    Integer result = completionService.take().get();
    // 其他業(yè)務邏輯
}

CompletionService原理

我們來試想一下,如果是你應該如何解決上述Feture帶來的阻塞問題呢?可以通過阻塞隊列來實現(xiàn),偽代碼如下:

// 創(chuàng)建阻塞隊列
BlockingQueue<Integer> bq =
  new LinkedBlockingQueue<>();
// 任務A 異步進入阻塞隊列  
executor.execute(() -> bq.put(A.get()));
// 任務B 異步進入阻塞隊列  
executor.execute(() -> bq.put(B.get()));
// 任務C 異步進入阻塞隊列  
executor.execute(()-> bq.put(C.get()));
for (int i = 0; i < 3; i++) {
  Integer r = bq.take();
  // 異步執(zhí)行所有業(yè)務邏輯
  executor.execute(()->action(r));
}

實際上CompletionService的實現(xiàn)原理也是內(nèi)部維護了一個阻塞隊列,當任務執(zhí)行結束就把任務的執(zhí)行結果加入到阻塞隊列中,不同的是CompletionService是把任務執(zhí)行結果的Future對象加入到阻塞隊列中。

CompletionService是一個接口,submit()用于提交任務,take()和poll()用于從阻塞隊列中獲取并移除一個元素,它們的區(qū)別在于如果阻塞隊列是空的,那么調(diào)用take()方法的線程就會被阻塞,而poll()方法會返回null值。

public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

其實現(xiàn)類ExecutorCompletionService,實際上可以看做是ExecutorBlockingQueue的結合體,ExecutorCompletionService把具體的計算任務交給 Executor完成,通過BlockingQueuetake()方法獲得任務執(zhí)行的結果。

ExecutorCompletionService有兩個構造函數(shù)

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    // 判斷executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool
    // 其余框架也有實現(xiàn)了AbstractExecutorService抽象類,目前JDK里只有上述的三種實現(xiàn)
    // 如果不是,則為null
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    // 判斷executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool
    // 其余框架也有實現(xiàn)了AbstractExecutorService抽象類,目前JDK里只有上述的三種實現(xiàn)
    // 如果不是,則為null
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

兩個構造器都需要傳入Executor,如果不傳BlockingQueue<Futrue>,默認會創(chuàng)建一個LinkedBlockingQueue<Future<V>>的隊列,該BlockingQueue的作用是保存Executor執(zhí)行的結果。

submit()源碼如下:

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture<V>(f, completionQueue));
    return f;
}
public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture<V>(f, completionQueue));
    return f;
}

當提交一個任務到ExecutorCompletionService時,首先需要將task封裝成RunableFuture<V>,通過newTaskFor()完成,然后再將RunableFuture封裝成QueueingFuture,它是FutureTask的一個子類,然后改寫FutureTaskdone方法,之后把Executor執(zhí)行的計算結果放入BlockingQueue中。

newTaskFor()的源碼如下:

private RunnableFuture<V> newTaskFor(Callable<V> task) {
    // aes是AbstractExecutorService,其實現(xiàn)類是ThreadPoolExecutor,F(xiàn)orkJoinPool,SchedulerThreadPoolExecutor
    if (aes == null) 
        return new FutureTask<V>(task);
    else
        return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    if (aes == null)
        return new FutureTask<V>(task, result);
    else
        return aes.newTaskFor(task, result);
}

QueueingFuture的源碼如下:

private static class QueueingFuture<V> extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task,
                   BlockingQueue<Future<V>> completionQueue) {
        super(task, null);
        this.task = task;
        this.completionQueue = completionQueue;
    }
    private final Future<V> task;
    private final BlockingQueue<Future<V>> completionQueue;
    // 會被java.util.concurrent.FutureTask#finishCompletion調(diào)用,判讀是否計算完成
    // 計算結果放在阻塞隊列中
    protected void done() { completionQueue.add(task); }
}

take()poll()方法如下:

// 從結果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務的結果,如果沒有就會阻塞,直到有任務完成返回結果。
public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}
// 從結果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務的結果,如果沒有就會返回null,該方法不會阻塞。
public Future<V> poll() {
    return completionQueue.poll();
}
// 從結果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務的結果,如果沒有就會返回null,該方法不會阻塞。
// 超時
public Future<V> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
    return completionQueue.poll(timeout, unit);
}

以上就是java利用CompletionService保證任務先完成先獲取到執(zhí)行結果的詳細內(nèi)容,更多關于java CompletionService的資料請關注腳本之家其它相關文章!

相關文章

  • 6個必備的Java并發(fā)面試種子題目合集

    6個必備的Java并發(fā)面試種子題目合集

    并發(fā)是Java面試的經(jīng)常會考到的知識點,這篇文章主要為大家整理了6個必備的Java并發(fā)面試種子題目,文中的示例代碼簡潔易懂,需要的可以學習一下
    2023-07-07
  • Java實現(xiàn)的貸款金額計算功能示例

    Java實現(xiàn)的貸款金額計算功能示例

    這篇文章主要介紹了Java實現(xiàn)的貸款金額計算功能,結合實例形式分析了Java簡單數(shù)值運算及類型轉換等相關操作技巧,需要的朋友可以參考下
    2018-01-01
  • Maven工程搭建spring boot+spring mvc+JPA的示例

    Maven工程搭建spring boot+spring mvc+JPA的示例

    本篇文章主要介紹了Maven工程搭建spring boot+spring mvc+JPA的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-01-01
  • Java入門案列之猜拳小游戲

    Java入門案列之猜拳小游戲

    這篇文章主要為大家詳細介紹了Java入門案列之猜拳小游戲,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-11-11
  • Java 在PDF中添加騎縫章示例解析

    Java 在PDF中添加騎縫章示例解析

    這篇文章主要介紹了Java 在PDF中添加騎縫章示例解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • JDK動態(tài)代理之ProxyGenerator生成代理類的字節(jié)碼文件解析

    JDK動態(tài)代理之ProxyGenerator生成代理類的字節(jié)碼文件解析

    這篇文章主要為大家詳細介紹了JDK動態(tài)代理之ProxyGenerator生成代理類的字節(jié)碼文件,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-02-02
  • log4j使用教程詳解(怎么使用log4j2)

    log4j使用教程詳解(怎么使用log4j2)

    Log4j 2的好處就不和大家說了,如果你搜了2,說明你對他已經(jīng)有一定的了解,并且想用它,所以這里直接就上手了
    2013-12-12
  • java實現(xiàn)小i機器人api接口調(diào)用示例

    java實現(xiàn)小i機器人api接口調(diào)用示例

    這篇文章主要介紹了java實現(xiàn)小i機器人api接口調(diào)用示例,需要的朋友可以參考下
    2014-04-04
  • Java如何判斷一個空對象的常見方法

    Java如何判斷一個空對象的常見方法

    在Java中判斷對象是否為空是一項重要的編程技巧,可以有效防止空指針異常的發(fā)生,下面這篇文章主要給大家介紹了關于利用Java如何判斷一個空對象的相關資料,需要的朋友可以參考下
    2024-01-01
  • 詳解JVM的內(nèi)存對象介紹[創(chuàng)建和訪問]

    詳解JVM的內(nèi)存對象介紹[創(chuàng)建和訪問]

    這篇文章主要介紹了JVM的內(nèi)存對象介紹[創(chuàng)建和訪問],文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-03-03

最新評論