欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程工具CompletableFuture詳解

 更新時(shí)間:2024年01月22日 08:31:55   作者:SolidCocoi  
這篇文章主要介紹了Java多線程工具CompletableFuture詳解,CompletableFuture?是?java?1.8?追加的新特性,通俗的話來(lái)說(shuō),是一個(gè)函數(shù)式的,用于控制多任務(wù)同步、異步組合操作的工具,需要的朋友可以參考下

簡(jiǎn)介

CompletableFuture 是 java 1.8 追加的新特性,通俗的話來(lái)說(shuō),是一個(gè)函數(shù)式的,用于控制多任務(wù)同步、異步組合操作的工具,實(shí)現(xiàn)諸如:

  • 控制若干個(gè)線程任務(wù)間是同步還是異步
  • 控制若干個(gè)線程間的先后執(zhí)行順序、依賴關(guān)系
  • 若干個(gè)線程任務(wù),任意其中一個(gè)完成就執(zhí)行某種邏輯
  • ……

將變得十分簡(jiǎn)單。如果你對(duì)前端有一定了解,你會(huì)發(fā)現(xiàn)它和 Javascript 中的 Promise 是十分類似的。

使用方法

CompletableFuture 需要依仗線程池實(shí)現(xiàn)自身功能,這個(gè)線程池是個(gè)非必填值,如果未特殊指明,將會(huì)使用 ForkJoinPool 的實(shí)例,構(gòu)造方法為 ForkJoinPool.makeCommonPool(),該線程池大小為 Runtime.getRuntime().availableProcessors() - 1 即 當(dāng)前電腦 cpu 可用核心數(shù) -1。

常見(jiàn) API

方法名稱備注
complete標(biāo)識(shí)自身已完成任務(wù),并傳入一個(gè)參數(shù)作為 CompletableFuture.get() 將獲取的值;標(biāo)識(shí)結(jié)束是 CAS 方式設(shè)置,只有 未結(jié)束→結(jié)束 的變化才能成功,complete 操作返回 true 時(shí)才真正影響 CompletableFuture.get() 將獲取的值
completedFuture這是個(gè)靜態(tài)方法,構(gòu)造一個(gè)已完成的 CompletableFuture 對(duì)象,并以傳入的參數(shù)作為 CompletableFuture.get() 將獲取的值
get阻塞直至任務(wù)完成,并獲取該任務(wù)的返回值
join阻塞直至任務(wù)完成,并獲取該任務(wù)的返回值(幾乎與 get 等同,但 join 不會(huì)拋出檢查型異常,不強(qiáng)制要求你必須處理)
cancel標(biāo)識(shí)自身已完成,無(wú)法阻斷自身任務(wù),但會(huì)構(gòu)造 CancellationException傳給關(guān)聯(lián)在該對(duì)象后續(xù)的 CompletableFuture,后續(xù)的 CompletableFuture 會(huì)因捕獲到異常而終止任務(wù)。另:該函數(shù)入?yún)魅氲?boolean 不會(huì)產(chǎn)生任何作用( javadoc 里這么描述也是絕了)
completeExceptionally可以理解為 cancel 的可自定義異常版本,其入?yún)⒕褪莻鬟f給后續(xù) CompletableFuture 對(duì)象的異常
exceptionallyCompletableFuture 鏈路上發(fā)生異常時(shí)會(huì)觸發(fā)該方法,給鏈路的最后一個(gè) CompletableFuture 對(duì)象配置,即可對(duì)全鏈路進(jìn)行異常捕獲,其入?yún)楫惓L幚頃r(shí)需要執(zhí)行的 Function
isDoneCompletableFuture 任務(wù)是否已完成

剩下的大多 API 是 run、accept、apply、then、either、both、async …… 的組合,本質(zhì)上都是語(yǔ)法糖,用了原生的 @FunctionalInterface,決定傳入的函數(shù)有無(wú)返回值、有無(wú)入?yún)ⅰ⒃诋?dāng)前任務(wù)結(jié)束后開(kāi)始執(zhí)行、是否任意一個(gè)完成就結(jié)束、是否全部完成才結(jié)束、是否另起線程執(zhí)行任務(wù) ……

// 你的下一步業(yè)務(wù)邏輯
Runnable next = new MyRunnable();
// 因?yàn)槭?run 所以無(wú)返回值, 因?yàn)槭?then 所以在 completableFuture1 對(duì)應(yīng)的任務(wù)結(jié)束后,執(zhí)行一段任務(wù)
completableFuture1.thenRun(next);
// 因?yàn)槭?run 所以無(wú)返回值, 因?yàn)槭?then 所以在 completableFuture1 對(duì)應(yīng)的任務(wù)結(jié)束后,因?yàn)?async ,所以要另起一個(gè)線程,執(zhí)行一段任務(wù)
completableFuture1.thenRunAsync(next);

在看得懂函數(shù)式編程的情況下,其他你可通過(guò)源碼函數(shù)定義以此類推

如果想使用自定義的線程池執(zhí)行任務(wù),那么使用帶 Executor executor 的重載函數(shù)即可,后不再重復(fù)說(shuō)明,例如:

// 自定義線程池
ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 20,
        2L, TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(10),
        // 你可以根據(jù)你的需要自己實(shí)現(xiàn) Handler,此處為簡(jiǎn)寫使用現(xiàn)成的 Handler,此處捕獲的是流量過(guò)載異常
        new ThreadPoolExecutor.AbortPolicy());
// 通過(guò)自定義線程池使用 CompletableFuture
CompletableFuture.runAsync(() -> {
    System.out.println("我的業(yè)務(wù)代碼");
}, executorService);

使用示例

為簡(jiǎn)潔代碼,睡眠模擬任務(wù)運(yùn)行耗時(shí)均使用下列函數(shù),后續(xù)不再重復(fù)說(shuō)明

public static void sleep(Long sleepTime) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

請(qǐng)時(shí)刻注意要進(jìn)行異常處理,意味著你的 completableFuture 鏈路得保證調(diào)用過(guò)下列代碼,進(jìn)行異常處理(鏈路最后一個(gè)使用,則全鏈路可捕獲異常),后續(xù)為簡(jiǎn)潔代碼已省略

completableFuture.exceptionally((Throwable ex) -> {
            ex.printStackTrace();
			// 取決于 completableFuture 有無(wú)返回值,類型是啥。此處實(shí)例是 completableFuture 為 CompletableFuture<Long>
            return -1L;
        });

將常規(guī)線程任務(wù)轉(zhuǎn)化為 CompletableFuture 對(duì)象

CompletableFuture completableFuture = new CompletableFuture();
    Long startTs = System.currentTimeMillis();
    new Thread(() -> {
		sleep(200L);
		completableFuture.complete("完成");// 設(shè)置 completableFuture 結(jié)果并將狀態(tài)設(shè)置為已完成
    }).start();
    while (!completableFuture.isDone()) {
        // 非阻塞式獲取結(jié)果,如果當(dāng)前未執(zhí)行完成則返回入?yún)⒆址?我還沒(méi)完成"
        // 執(zhí)行到此處恰好任務(wù)完成,然后在執(zhí)行 while 循環(huán)判斷跳出,你在循環(huán)內(nèi)最多輸出一次 “完成”
        System.out.println(completableFuture.getNow("未完"));
    }
    System.out.println("最終結(jié)果:" + completableFuture.getNow("未完") + " " + (System.currentTimeMillis() - startTs) + " ms");

阻塞到任意一個(gè)任務(wù)完成

public static void 阻塞到任意一個(gè)完成() throws IOException {
        // 模擬一個(gè)耗時(shí) 20 L 的任務(wù)
        CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            sleep(20L);
            System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 20L;
        });
        // 模擬一個(gè)耗時(shí) 10 L 的任務(wù)
        CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            sleep(10L);
            System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 10L;
        });
        // applyToEitherAsync 代表另起一個(gè)線程去執(zhí)行第二個(gè)入?yún)⒌拇a塊,這里其實(shí)沒(méi)啥影響,我就不加 Async 了
        CompletableFuture result = completableFuture1.applyToEither(completableFuture2, fasterOne -> {
            System.out.println(fasterOne);
            return fasterOne;
        });
        // 除了寫回調(diào)函數(shù)方法外的另一種獲取最快值的方法
        System.out.println("最快的為:" + result.join());
        // 調(diào)用讀取行阻塞住,防止異步任務(wù)還未完成就退出了
        System.in.read();
    }

遇到特別多任務(wù)的情況下,你可以嘗試數(shù)組

CompletableFuture[] array = new CompletableFuture[2];
        array[0] = completableFuture1;
        array[1] = completableFuture2;
        CompletableFuture fasterOne = CompletableFuture.anyOf(array);
        System.out.println("最快的為:" + fasterOne.join());

阻塞到全部任務(wù)完成

public static void 阻塞到全部完成() {
        Long startTs = System.currentTimeMillis();
        // 模擬一個(gè)耗時(shí) 20 L 的任務(wù)
        CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            sleep(20L);
            System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 20L;
        });
        // 模擬一個(gè)耗時(shí) 10 L 的任務(wù)
        CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            sleep(10L);
            System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 10L;
        });
        // thenAcceptBothAsync 代表另起一個(gè)線程去執(zhí)行第二個(gè)入?yún)⒌拇a塊,這里其實(shí)沒(méi)啥影響,我就不加 Async 了
        CompletableFuture result = completableFuture1.thenAcceptBoth(completableFuture2, (r1, r2) -> {
            System.out.println("completableFuture1 :" + r1);
            System.out.println("completableFuture2 :" + r2);
        });
        // thenAcceptBoth 是沒(méi)有返回值的,所以這里是 null ,但這句代碼還是有作用的,相當(dāng)于阻塞到全部任務(wù)都完成
        System.out.println("返回這個(gè) null 之后意味著全部任務(wù)已完成:" + result.join());
    }

遇到特別多任務(wù)的情況下,你可以嘗試數(shù)組

CompletableFuture[] array = new CompletableFuture[2];
    array[0] = completableFuture1;
    array[1] = completableFuture2;
    CompletableFuture<Void> all = CompletableFuture.allOf(array);
    System.out.println("返回這個(gè) null 之后意味著全部任務(wù)已完成:" + all.join());

合并任務(wù)

合并任務(wù)會(huì)涉及到 Compose、Combine,他們區(qū)別在于合并的邏輯不同:

  • Compose: 合并的兩個(gè)任務(wù)間是同步阻塞執(zhí)行的,后一個(gè)任務(wù)需要阻塞等待第一個(gè)任務(wù)執(zhí)行完成。你需要傳入一個(gè)函數(shù) —— 已知第一個(gè)任務(wù)的返回值,返回合并之后的 CompletableFuture 對(duì)象
  • Combine: 合并的兩個(gè)任務(wù)間是異步執(zhí)行的。你需要傳入另一個(gè)任務(wù)、一個(gè)函數(shù) —— 已知兩個(gè)任務(wù)的返回值,合并成最終返回值
public static void 合并() {
        Long startTs = System.currentTimeMillis();
        // 模擬一個(gè)耗時(shí) 100 L 的任務(wù)
        CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            sleep(100L);
            System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 100L;
        });
        // 模擬一個(gè)耗時(shí) 100 L 的任務(wù)
        CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            sleep(100L);
            System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 100L;
        });
        // 模擬 completableFuture1 合并一個(gè)耗時(shí) 120 L 的任務(wù),返回值為兩個(gè)任務(wù)總工時(shí)
        // thenCombineAsync 代表另起一個(gè)線程去執(zhí)行第二個(gè)入?yún)⒌拇a塊,這里其實(shí)沒(méi)啥影響,我就不加 Async 了
        CompletableFuture<Long> completableFuture3 = completableFuture1.thenCombine(CompletableFuture.supplyAsync(() -> {
            sleep(120L);
            return 120L;
        }), (x, y) -> x + y);
        // 模擬 completableFuture2 合并一個(gè)耗時(shí) 50 L 的任務(wù),返回值為兩個(gè)任務(wù)總工時(shí)
        // thenComposeAsync 代表另起一個(gè)線程去執(zhí)行第一個(gè)入?yún)⒌拇a塊,這里其實(shí)沒(méi)啥影響,我就不加 Async 了
        CompletableFuture<Long> completableFuture4 = completableFuture2.thenCompose(r2 -> {
            CompletableFuture<Long> temp = CompletableFuture.completedFuture(r2);
            return temp.thenApply(rTemp -> {
                sleep(50L);
                return temp.join() + 50L;
            });
        });
        boolean printFlag3 = true;
        boolean printFlag4 = true;
        String completableFuture3Info = null;
        String completableFuture4Info = null;
        while (!completableFuture3.isDone() || !completableFuture4.isDone()) {
            if (completableFuture3.isDone()) {
                if (printFlag3) {
                    printFlag3 = false;
                    completableFuture3Info = "completableFuture3 完成:" + completableFuture3.join() + " --" + (System.currentTimeMillis() - startTs);
                }
            }
            if (completableFuture4.isDone()) {
                if (printFlag4) {
                    printFlag4 = false;
                    completableFuture4Info = "completableFuture4 完成:" + completableFuture4.join() + " --" + (System.currentTimeMillis() - startTs);
                }
            }
        }
        System.out.println(completableFuture3Info != null ? completableFuture3Info : "completableFuture3 actual:" + completableFuture3.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs));
        System.out.println(completableFuture4Info != null ? completableFuture4Info : "completableFuture4 actual:" + completableFuture4.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs));
    }

你會(huì)觀測(cè)到總工時(shí)更長(zhǎng)的反而實(shí)際結(jié)束時(shí)間點(diǎn)更早,completableFuture3 早于 completableFuture4

到此這篇關(guān)于Java多線程工具CompletableFuture詳解的文章就介紹到這了,更多相關(guān)多線程工具CompletableFuture內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論