欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中的CompletionService批量異步執(zhí)行詳解

 更新時間:2023年12月22日 09:05:54   作者:Java面試365  
這篇文章主要介紹了Java中的CompletionService批量異步執(zhí)行詳解,我們知道線程池可以執(zhí)行異步任務,同時可以通過返回值Future獲取返回值,所以異步任務大多數(shù)采用ThreadPoolExecutor+Future,需要的朋友可以參考下

前景引入

我們知道線程池可以執(zhí)行異步任務,同時可以通過返回值Future獲取返回值,所以異步任務大多數(shù)采用ThreadPoolExecutor+Future,如果存在如下情況,需要從任務一二三中獲取返回值后,保存到數(shù)據(jù)庫中,用異步邏輯實現(xiàn)代碼應該如下所示。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    Future<Integer> f1 = executorService.submit(() -> {
        System.out.println("執(zhí)行任務一");
        return 1;
    });
    Future<Integer> f2 = executorService.submit(() -> {
        System.out.println("執(zhí)行任務二");
        return 2;
    });
    Future<Integer> f3 = executorService.submit(() -> {
        System.out.println("執(zhí)行任務三");
        return 3;
    });
    Integer r1 = f1.get();
    executorService.execute(()->{
        // 省略保存r1操作
        System.out.println(r1);
    });
    Integer r2 = f2.get();
    executorService.execute(()->{
        // 省略保存r2操作
        System.out.println(r2);
    });
    Integer r3 = f3.get();
    executorService.execute(()->{
        // 省略保存r3操作
        System.out.println(r3);
    });
    executorService.shutdown();
}

這樣寫的代碼一點毛病沒有,邏輯都是正常的,但如果存在任務一查詢了比較耗時的操作,由于f1.get是阻塞執(zhí)行,那么就算任務二和任務三已經(jīng)返回結果,任務二的返回值和任務三的返回值都是不能保存到數(shù)據(jù)庫的,因為f1.get將主線程阻塞了。

批量異步實現(xiàn)

那可以如何處理呢?可以采用萬能的阻塞隊列,任務先執(zhí)行完畢的先入隊,這樣可以保證其它線程入庫的速度不受影響,提高效率。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3);
    Future<Integer> f1 = executorService.submit(() -> {
        System.out.println("執(zhí)行任務一");
        Thread.sleep(5000);
        return 1;
    });
    Future<Integer> f2 = executorService.submit(() -> {
        System.out.println("執(zhí)行任務二");
        return 2;
    });
    Future<Integer> f3 = executorService.submit(() -> {
        System.out.println("執(zhí)行任務三");
        Thread.sleep(3000);
        return 3;
    });
    executorService.execute(()->{
        try {
            Integer r1 = f1.get();
            // 阻塞隊列入隊操作
            queue.put(r1);
            System.out.println(r1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    executorService.execute(()->{
        try {
            Integer r2 = f2.get();
            queue.put(r2);
            System.out.println(r2);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    executorService.execute(()->{
        try {
            Integer r3 = f3.get();
            queue.put(r3);
            System.out.println(r3);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    // 循環(huán)次數(shù)不要使用queue.size限制,因為不同時刻queue.size值是有可能不同的
    for (int i = 0; i <3; i++) {
        Integer integer = queue.take();
        // 省略保存integer操作
        executorService.execute(()->{
            System.out.println("保存入庫=="+integer);
        });
    }
    executorService.shutdown();
}

產(chǎn)生結果如下

image-20220307193401671

同樣的在生產(chǎn)中不建議使用,因為SDK為我們提供了工具類CompletionService,CompletionService內部就維護了一個阻塞隊列,唯一與上述代碼實現(xiàn)有所區(qū)別的是,阻塞隊列入庫的是Future對象,其余原理類似。

CompletionService

如何創(chuàng)建CompletionService

CompletionService同樣是一個接口,其具體實現(xiàn)為ExecutorCompletionService,創(chuàng)建CompletionService對象有兩種方式

public ExecutorCompletionService(Executor executor);
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)

CompletionService對象的創(chuàng)建都是需要指定線程池,如果在創(chuàng)建時沒有傳入阻塞對象,那么會采用默認的LinkedBlockingQueue無界阻塞隊列,如果應用到生產(chǎn)可能會產(chǎn)生OOM的情況,這是需要注意的。

CompletionService初體驗

CompletionService如何做到批量執(zhí)行異步任務呢,將上述場景采用CompletionService實現(xiàn)下

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CompletionService completionService = new ExecutorCompletionService(executorService);
    Future<Integer> f1 = completionService.submit(() -> {
        System.out.println("執(zhí)行任務一");
        Thread.sleep(5000);
        return 1;
    });
    Future<Integer> f2 = completionService.submit(() -> {
        System.out.println("執(zhí)行任務二");
        return 2;
    });
    Future<Integer> f3 = completionService.submit(() -> {
        System.out.println("執(zhí)行任務三");
        Thread.sleep(3000);
        return 3;
    });
    for (int i = 0; i <3 ; i++) {
        Future take = completionService.take();
        Integer integer = (Integer) take.get();
        executorService.execute(()->{
            System.out.println("執(zhí)行入庫=="+integer);
        });
    }
    executorService.shutdown();
}

CompletionService接口說明

CompletionService的方法不多,使用起來比較簡單,方法簽名如下

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CompletionService completionService = new ExecutorCompletionService(executorService);
    Future<Integer> f1 = completionService.submit(() -> {
        System.out.println("執(zhí)行任務一");
        Thread.sleep(5000);
        return 1;
    });
    Future<Integer> f2 = completionService.submit(() -> {
        System.out.println("執(zhí)行任務二");
        return 2;
    });
    Future<Integer> f3 = completionService.submit(() -> {
        System.out.println("執(zhí)行任務三");
        Thread.sleep(3000);
        return 3;
    });
    for (int i = 0; i <3 ; i++) {
        Future take = completionService.take();
        Integer integer = (Integer) take.get();
        executorService.execute(()->{
            System.out.println("執(zhí)行入庫=="+integer);
        });
    }
    executorService.shutdown();
}

總結

CompletionService主要是去解決無效等待的問題,如果一個耗時較長的任務在執(zhí)行,那么可以采用這種方式避免無效的等待

CompletionService還能讓異步任務的執(zhí)行結果有序化,先執(zhí)行完就先進入阻塞隊列。

到此這篇關于Java中的CompletionService批量異步執(zhí)行詳解的文章就介紹到這了,更多相關CompletionService批量異步執(zhí)行內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Mybatis批量修改的操作代碼

    Mybatis批量修改的操作代碼

    這篇文章主要介紹了Mybatis批量修改的操作代碼,包括整體批量修改的詳細代碼,代碼簡單易懂,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2017-09-09
  • Java if(boolean)和if(boolean=true)區(qū)別解析

    Java if(boolean)和if(boolean=true)區(qū)別解析

    這篇文章主要介紹了Java if(boolean)和if(boolean=true)區(qū)別解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-02-02
  • JavaWeb實現(xiàn)自動登錄功能

    JavaWeb實現(xiàn)自動登錄功能

    這篇文章主要為大家詳細介紹了JavaWeb實現(xiàn)自動登錄功能,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • 解決報錯:java:讀取jar包時出錯:error in opening zip file問題

    解決報錯:java:讀取jar包時出錯:error in opening zip 

    文章總結:解決Java讀取jar包時出錯的問題,通過下載源碼并刷新項目解決了問題,希望對大家有所幫助
    2024-11-11
  • Java動態(tài)批量生成logback日志文件的示例

    Java動態(tài)批量生成logback日志文件的示例

    本文主要介紹了Java動態(tài)批量生成logback日志文件的示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2025-04-04
  • mybatis?查詢返回Map<String,Object>類型

    mybatis?查詢返回Map<String,Object>類型

    本文主要介紹了mybatis?查詢返回Map<String,Object>類型,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-03-03
  • Java實現(xiàn)后端跨域的常見解決方案

    Java實現(xiàn)后端跨域的常見解決方案

    跨源資源共享(CORS——Cross-Origin Resource Sharing,跨源資源共享,或通俗地譯為跨域資源共享)是一種基于 HTTP 頭的機制,跨域的解決方案有很多種,前后端都有,本文給大家主要介紹Java實現(xiàn)后端跨域的常見解決方案,需要的朋友可以參考下
    2024-04-04
  • SpringBoot接口防重復提交的三種解決方案

    SpringBoot接口防重復提交的三種解決方案

    在Web開發(fā)中,防止用戶重復提交表單是一個常見的需求,用戶可能會因為網(wǎng)絡延遲、誤操作等原因多次點擊提交按鈕,導致后臺接收到多個相同的請求,本文將介紹幾種在Spring Boot中實現(xiàn)接口防重復提交的方法,需要的朋友可以參考下
    2024-11-11
  • Mybatis-plus通用查詢方法封裝的實現(xiàn)

    Mybatis-plus通用查詢方法封裝的實現(xiàn)

    本文主要介紹了Mybatis-plus通用查詢方法封裝的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-07-07
  • java線程池使用后到底要關閉嗎

    java線程池使用后到底要關閉嗎

    這篇文章主要給大家介紹了關于java線程池使用后到底要不要關閉的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-01-01

最新評論