欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java CompletableFuture實現(xiàn)異步編排詳解

 更新時間:2023年01月30日 10:36:07   作者:興趣使然的L  
這篇文章主要為大家介紹了java CompletableFuture實現(xiàn)異步編排詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

為什么需要異步執(zhí)行?

場景:電商系統(tǒng)中獲取一個完整的商品信息可能分為以下幾步:

①獲取商品基本信息

②獲取商品圖片信息

③獲取商品促銷活動信息

④獲取商品各種類的基本信息 等操作,如果使用串行方式去執(zhí)行這些操作,假設每個操作執(zhí)行1s,那么用戶看到完整的商品詳情就需要4s的時間,如果使用并行方式執(zhí)行這些操作,可能只需要1s就可以完成。所以這就是異步執(zhí)行的好處。

JDK5的Future接口

Future接口用于代表異步計算的結果,通過Future接口提供的方法可以查看異步計算是否執(zhí)行完成,或者等待執(zhí)行結果并獲取執(zhí)行結果,同時還可以取消執(zhí)行。

列舉Future接口的方法:

  • get():獲取任務執(zhí)行結果,如果任務還沒完成則會阻塞等待直到任務執(zhí)行完成。如果任務被取消則會拋出CancellationException異常,如果任務執(zhí)行過程發(fā)生異常則會拋出ExecutionException異常,如果阻塞等待過程中被中斷則會拋出InterruptedException異常。
  • get(long timeout,Timeunit unit):帶超時時間的get()方法,如果阻塞等待過程中超時則會拋出TimeoutException異常。
  • cancel():用于取消異步任務的執(zhí)行。如果異步任務已經(jīng)完成或者已經(jīng)被取消,或者由于某些原因不能取消,則會返回false。如果任務還沒有被執(zhí)行,則會返回true并且異步任務不會被執(zhí)行。如果任務已經(jīng)開始執(zhí)行了但是還沒有執(zhí)行完成,若mayInterruptIfRunning為true,則會立即中斷執(zhí)行任務的線程并返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務執(zhí)行線程。
  • isCanceled():判斷任務是否被取消,如果任務在結束(正常執(zhí)行結束或者執(zhí)行異常結束)前被取消則返回true,否則返回false。
  • isDone():判斷任務是否已經(jīng)完成,如果完成則返回true,否則返回false。需要注意的是:任務執(zhí)行過程中發(fā)生異常、任務被取消也屬于任務已完成,也會返回true。

使用Future接口和Callable接口實現(xiàn)異步執(zhí)行:

public static void main(String[] args) {
	// 快速創(chuàng)建線程池
	ExecutorService executorService = Executors.newFixedThreadPool(4);
	// 獲取商品基本信息(可以使用Lambda表達式簡化Callable接口,這里為了便于觀察不使用)
	Future<String> future1 = executorService.submit(new Callable<String>() {
		@Override
		public String call() throws Exception {
			return "獲取到商品基本信息";
		}
	});
	// 獲取商品圖片信息
	Future<String> future2 = executorService.submit(new Callable<String>() {
		@Override
		public String call() throws Exception {
			return "獲取商品圖片信息";
		}
	});
	// 獲取商品促銷信息
	Future<String> future3 = executorService.submit(new Callable<String>() {
		@Override
		public String call() throws Exception {
			return "獲取商品促銷信息";
		}
	});
	// 獲取商品各種類基本信息
	Future<String> future4 = executorService.submit(new Callable<String>() {
		@Override
		public String call() throws Exception {
			return "獲取商品各種類基本信息";
		}
	});
        // 獲取結果
	try {
		System.out.println(future1.get());
		System.out.println(future2.get());
		System.out.println(future3.get());
		System.out.println(future4.get());
	} catch (InterruptedException | ExecutionException e) {
		e.printStackTrace();
	}finally {
		executorService.shutdown();
	}
}

既然Future可以實現(xiàn)異步執(zhí)行并獲取結果,為什么還會需要CompletableFuture?

簡述一下Future接口的弊端:

  • 不支持手動完成
    • 當提交了一個任務,但是執(zhí)行太慢了,通過其他路徑已經(jīng)獲取到了任務結果,現(xiàn)在沒法把這個任務結果通知到正在執(zhí)行的線程,所以必須主動取消或者一直等待它執(zhí)行完成。
  • 不支持進一步的非阻塞調用
    • 通過Future的get()方法會一直阻塞到任務完成,但是想在獲取任務之后執(zhí)行額外的任務,因為 Future 不支持回調函數(shù),所以無法實現(xiàn)這個功能。
  • 不支持鏈式調用
    • 對于Future的執(zhí)行結果,想繼續(xù)傳到下一個Future處理使用,從而形成一個鏈式的pipline調用,這在 Future中無法實現(xiàn)。
  • 不支持多個 Future 合并
    • 比如有10個Future并行執(zhí)行,想在所有的Future運行完畢之后,執(zhí)行某些函數(shù),是無法通過Future實現(xiàn)的。
  • 不支持異常處理
    • Future的API沒有任何的異常處理的api,所以在異步運行時,如果出了異常問題不好定位。

使用Future接口可以通過get()阻塞式獲取結果或者通過輪詢+isDone()非阻塞式獲取結果,但是前一種方法會阻塞,后一種會耗費CPU資源,所以JDK的Future接口實現(xiàn)異步執(zhí)行對獲取結果不太友好,所以在JDK8時推出了CompletableFuture實現(xiàn)異步編排。

CompletableFuture的使用

CompletableFuture概述

JDK8中新增加了一個包含50個方法左右的類CompletableFuture,提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,提供了函數(shù)式編程的能力,可以通過回調的方式處理計算結果,并且提供了轉換和組合CompletableFuture的方法。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

CompletableFuture類實現(xiàn)了Future接口和CompletionStage接口,即除了可以使用Future接口的所有方法之外,CompletionStage<T>接口提供了更多方法來更好的實現(xiàn)異步編排,并且大量的使用了JDK8引入的函數(shù)式編程概念。后面會細致的介紹常用的API。

① 創(chuàng)建CompletableFuture的方式

使用new關鍵字創(chuàng)建

// 無返回結果
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 已知返回結果
CompletableFuture<String> completableFuture = new CompletableFuture<>("result");
// 已知返回結果(底層其實也是帶參數(shù)的構造器賦值)
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");

創(chuàng)建一個返回結果類型為String的CompletableFuture,可以使用Future接口的get()方法獲取該值(同樣也會阻塞)。

可以使用無參構造器返回一個沒有結果的CompletableFuture,也可以通過構造器的傳參CompletableFuture設置好返回結果,或者使用CompletableFuture.completedFuture(U value)構造一個已知結果的CompletableFuture。

使用CompletableFuture類的靜態(tài)工廠方法(常用)

  • runAsync() 無返回值
// 使用默認線程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 使用自定義線程池(推薦)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) 

runAsync()方法的參數(shù)是Runnable接口,這是一個函數(shù)式接口,不允許返回值。當需要異步操作且不關心返回結果的時候可以使用runAsync()方法。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通過Lambda表達式實現(xiàn)Runnable接口
        CompletableFuture.runAsync(()-> System.out.println("獲取商品基本信息成功"), executor).get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
  • supplyAsync() 有返回值
// 使用默認線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 使用自定義線程池(推薦)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

supplyAsync()方法的參數(shù)是Supplier<U>供給型接口(無參有返回值),這也是一個函數(shù)式接口,U是返回結果值的類型。當需要異步操作且關心返回結果的時候,可以使用supplyAsync()方法。

// 例子
public static void main(String[] args) {
	// 快速創(chuàng)建線程池
	ExecutorService executor = Executors.newFixedThreadPool(4);
	try {
		// 通過Lambda表達式實現(xiàn)執(zhí)行內容,并返回結果通過CompletableFuture接收
		CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
			System.out.println("獲取商品信息成功");
			return "信息";
		}, executor);
		// 輸出結果
		System.out.println(completableFuture.get());
	} catch (InterruptedException | ExecutionException e) {
		e.printStackTrace();
	}finally {
		executor.shutdown();
	}
}  

關于第二個參數(shù)Executor executor說明

在沒有指定第二個參數(shù)(即沒有指定線程池)時,CompletableFuture直接使用默認的ForkJoinPool.commonPool()作為它的線程池執(zhí)行異步代碼。

在實際生產(chǎn)中會使用自定義的線程池來執(zhí)行異步代碼,具體可以參考另一篇文章深入理解線程池ThreadPoolExecutor ,里面的第二節(jié)有生產(chǎn)中怎么創(chuàng)建自定義線程的例子,可以參考一下。

② 獲得異步執(zhí)行結果

get() 阻塞式獲取執(zhí)行結果

該方法調用后如果任務還沒完成則會阻塞等待直到任務執(zhí)行完成。如果任務執(zhí)行過程發(fā)生異常則會拋出ExecutionException異常,如果阻塞等待過程中被中斷則會拋出InterruptedException異常。

get(long timeout, TimeUnit unit) 帶超時的阻塞式獲取執(zhí)行結果

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

該方法調用后如果如果任務還沒完成則會阻塞等待直到任務執(zhí)行完成或者超出timeout時間,如果阻塞等待過程中超時則會拋出TimeoutException異常。

getNow(T valueIfAbsent) 立刻獲取執(zhí)行結果

public T getNow(T valueIfAbsent)

該方法調用后,會立刻獲取結果不會阻塞等待。如果任務完成則直接返回執(zhí)行完成后的結果,如果任務沒有完成,則返回調用方法時傳入的參數(shù)valueIfAbsent值。

join() 不拋異常的阻塞時獲取執(zhí)行結果

public T join()

該方法和get()方法作用一樣,只是不會拋出異常。

complete(T value) 主動觸發(fā)計算,返回異步是否執(zhí)行完畢

public boolean complete(T value)

該方法調用后,會主動觸發(fā)計算結果,如果此時異步執(zhí)行并沒有完成(此時boolean值返回true),則通過get()拿到的數(shù)據(jù)會是complete()設置的參數(shù)value值,如果此時異步執(zhí)行已經(jīng)完成(此時boolean值返回false),則通過get()拿到的就是執(zhí)行完成的結果。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通過Lambda表達式實現(xiàn)執(zhí)行內容,并返回結果通過CompletableFuture接收
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 休眠2秒,使得異步執(zhí)行變慢,會導致主動觸發(fā)計算先執(zhí)行,此時返回的get就是555
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 666;
        }, executor);
        // 主動觸發(fā)計算,判斷異步執(zhí)行是否完成
        System.out.println(completableFuture.complete(555));
        // 輸出結果
        System.out.println(completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
    true
    555
**/

③ 對執(zhí)行結果進行處理

whenComplete 等待前面任務執(zhí)行完再執(zhí)行當前處理

public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action)

在創(chuàng)建好的初始任務或者是上一個任務后通過鏈式調用該方法,會在之前任務執(zhí)行完成后繼續(xù)執(zhí)行whenComplete里的內容(whenComplete傳入的action只是對之前任務的結果進行處理),即使用該方法可以避免前面說到的Future接口的問題,不再需要通過阻塞或者輪詢的方式去獲取結果,而是通過調用該方法等任務執(zhí)行完畢自動調用。

該方法的參數(shù)為BiConsumer<? super T, ? super Throwable> action消費者接口,可以接收兩個參數(shù),一個是任務執(zhí)行完的結果,一個是執(zhí)行任務時的異常。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .whenComplete((res, ex) -> System.out.println("任務執(zhí)行完畢,結果為" + res + " 異常為" + ex)
                );
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
    任務執(zhí)行完畢,結果為666 異常為null
**/

除了上述的方法外,還有一些類似的方法如XXXAsync()或者是XXXAsync(XX,Executor executor),對于這些方法,這里統(tǒng)一說明,后續(xù)文章中將不會再列舉

public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor)

XXXAsync():表示上一個任務執(zhí)行完成后,不會再使用之前任務中的線程,而是重新使用從默認線程(ForkJoinPool 線程池)中重新獲取新的線程執(zhí)行當前任務。

XXXAsync(XX,Executor executor):表示不會沿用之前任務的線程,而是使用自己第二個參數(shù)指定的線程池重新獲取線程執(zhí)行當前任務。

④ 對執(zhí)行結果進行消費

thenRun 前面任務執(zhí)行完后執(zhí)行當前任務,不關心前面任務的結果,也沒返回值

public CompletableFuture<Void> thenRun(Runnable action)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像這樣鏈式調用該方法表示:執(zhí)行任務A完成后接著執(zhí)行任務B,但是任務B不需要A的結果,并且執(zhí)行完任務B也不會返回結果。

thenRun(Runnable action)的參數(shù)為Runnable接口即沒有傳入?yún)?shù)。

// 例子
public static void main(String[] args) {
	// 快速創(chuàng)建線程池
	ExecutorService executor = Executors.newFixedThreadPool(4);
	try {
		CompletableFuture.supplyAsync(() -> 666, executor)
                    .thenRun(() -> System.out.println("我都沒有參數(shù)怎么拿到之前的結果,我也沒有返回值。")
                );
	} catch (Exception e) {
		e.printStackTrace();
	}finally {
		executor.shutdown();
	}
}
/**
輸出結果:
    我都沒有參數(shù)怎么拿到之前的結果,我也沒有返回值。
**/

thenAccept 前面任務執(zhí)行完后執(zhí)行當前任務,消費前面的結果,沒有返回值

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像這樣鏈式調用該方法表示:執(zhí)行任務A完成后接著執(zhí)行任務B,而且任務B需要A的結果,但是執(zhí)行完任務B不會返回結果。

thenAccept(Consumer<? super T> action)的參數(shù)為消費者接口,即可以傳入一個參數(shù),該參數(shù)為上一個任務的執(zhí)行結果。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenAccept((res) -> System.out.println("我能拿到上一個的結果" + res + ",但是我沒法傳出去。")
                );
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
    我能拿到上一個的結果666,但是我沒法傳出去。
**/

thenApply 前面任務執(zhí)行完后執(zhí)行當前任務,消費前面的結果,具有返回值

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像這樣鏈式調用該方法表示:執(zhí)行任務A完成后接著執(zhí)行任務B,而且任務B需要A的結果,并且執(zhí)行完任務B需要有返回結果。

thenApply(Function<? super T,? extends U> fn)的參數(shù)為函數(shù)式接口,即可以傳入一個參數(shù)類型為T,該參數(shù)是上一個任務的執(zhí)行結果,并且函數(shù)式接口需要有返回值,類型為U。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenApply((res) -> {
                        System.out.println("我能拿到上一個的結果" + res + "并且我要將結果傳出去");
                        return res;
                    }
                ).whenComplete((res, ex) -> System.out.println("結果" + res));
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
    我能拿到上一個的結果666并且我要將結果傳出去
    結果666
**/

⑤ 異常處理

exceptionally 異常捕獲,只消費前面任務中出現(xiàn)的異常信息,具有返回值

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

可以通過鏈式調用該方法來獲取異常信息,并且具有返回值。如果某一個任務出現(xiàn)異常被exceptionally捕獲到則剩余的任務將不會再執(zhí)行。類似于Java異常處理的catch。

exceptionally(Function<Throwable, ? extends T> fn)的參數(shù)是函數(shù)式接口,具有一個參數(shù)以及返回值,該參數(shù)為前面任務的異常信息。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> {
                    if (Math.random() < 0.5) throw new RuntimeException("error");
                    return 666;
                }, executor)
                .thenApply((res) -> {
                    System.out.println("不出現(xiàn)異常,結果為" + res);
                    return res;
                }).exceptionally((ex) -> {
                    ex.printStackTrace();
                    return -1;
                });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
// 這是不出現(xiàn)異常的情況
不出現(xiàn)異常,結果為666
// 這是出現(xiàn)異常的情況
java.util.concurrent.CompletionException: java.lang.RuntimeException: error
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: error
        at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 3 more
**/

handle 異常處理,消費前面的結果及異常信息,具有返回值,不會中斷后續(xù)任務

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

可以通過鏈式調用該方法可以跟thenApply()一樣可以消費前面任務的結果并完成自己任務內容,并且具有返回值。不同之處在于出現(xiàn)異常也可以接著往下執(zhí)行,根據(jù)異常參數(shù)做進一步處理。

handle(BiFunction<? super T, Throwable, ? extends U> fn)的參數(shù)是消費者接口,一個參數(shù)是任務執(zhí)行結果,一個是異常信息,并且具有返回值。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenApply((res) -> {
                    if (Math.random() < 0.5) throw new RuntimeException("error");
                    return res;
                }).handle((res, ex) -> {
                    System.out.println("結果" + res + "(null表示之前出現(xiàn)異常導致結果無法傳過來)");
                    return res == null ? -1 : res;
                }).thenApply((res) -> {
                    System.out.println("結果為" + res + "(-1表示之前出現(xiàn)異常,經(jīng)過handler使得結果處理成-1)");
                    return res;
                }).exceptionally((ex) -> {
                    ex.printStackTrace();
                    return -1;
                });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
// 這是不出現(xiàn)異常的情況
結果666(null表示之前出現(xiàn)異常導致結果無法傳過來)
結果為666(-1表示之前出現(xiàn)異常,經(jīng)過handler使得結果處理成-1)
// 這是出現(xiàn)異常的情況
結果null(null表示之前出現(xiàn)異常導致結果無法傳過來)
結果為-1(-1表示之前出現(xiàn)異常,經(jīng)過handler使得結果處理成-1)
**/

可以看到通過handle類似于Java異常處理的finally,出現(xiàn)異常并不會像使用exceptionally那樣中斷后續(xù)的任務,而是繼續(xù)執(zhí)行,可以通過handle為之前出現(xiàn)異常無法獲得的結果重新賦值(根據(jù)業(yè)務需求設置安全值之類的)。

⑥ 兩組任務按順序執(zhí)行

thenCompose 實現(xiàn)兩組任務按前后順序執(zhí)行

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn)

A.thenCompose(B)相當于任務A要排在任務B前面,即順序的執(zhí)行任務A、任務B。該方法的參數(shù)是函數(shù)式接口,函數(shù)式接口的參數(shù)是調用者的執(zhí)行結果,返回值是另一個任務B。

public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務A先執(zhí)行結果為666");
            return 666;
        }, executor);
        actionA.thenCompose((res) ->  CompletableFuture.supplyAsync(() -> {
            System.out.println("任務B后執(zhí)行結果加上333");
            return 333 + res;
        })).whenComplete((res, ex) -> System.out.println(res));
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
    任務A先執(zhí)行結果為666
    任務B后執(zhí)行結果加上333
    999
**/

⑦ 兩組任務誰快用誰

applyToEither 比較兩組任務執(zhí)行速度,誰快消費誰的執(zhí)行結果

public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn)

該方法用于比較兩組任務的執(zhí)行速度,誰先執(zhí)行完就用誰的執(zhí)行結果。

傳入?yún)?shù)說明:第一個參數(shù)傳入的是另一個任務的執(zhí)行內容,第二個參數(shù)傳入的是最終這兩個任務誰快返回誰的結果,并通過當前函數(shù)式接口進行接收和處理(使用函數(shù)式接口,有參且有返回值)。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任務A等待久一點,執(zhí)行結果為555");
            return 555;
        }, executor);
        actionA.applyToEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任務B很快,執(zhí)行結果為666");
            return 666;
        }), (res) -> {
            System.out.println("最終使用的執(zhí)行結果為" + res);
            return res;
        });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
    任務B很快,執(zhí)行結果為666
    最終使用的執(zhí)行結果為666
    任務A等待久一點,執(zhí)行結果為555
**/

除了applyToEither對任務最終結果進行獲取并消費,并且具有返回值的方法外,還有兩個類似的方法。

// 這個方法效果和上面的一樣,比誰快拿誰的結果,不同的是這個方法只消費不具有返回值
public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action)
// 這個方法效果和上面的一樣,比誰快拿誰的結果,不同的是這個方法不消費結果也不具有返回值
public CompletableFuture<Void> runAfterEither(
        CompletionStage<?> other, Runnable action)

⑧ 兩組任務完成后合并

thenCombine 等待兩組任務執(zhí)行完畢后,合并兩組任務的執(zhí)行結果

 public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

該方法用于兩組任務都完成后,將兩組任務的執(zhí)行結果一起交給當前方法的BiFunction處理。先完成的任務會等待后者任務完成。

傳入?yún)?shù)說明:第一個參數(shù)傳入的是另一個任務的執(zhí)行內容,第二個參數(shù)傳入的是帶兩個參數(shù)的函數(shù)式接口(第一個參數(shù)是任務1的執(zhí)行結果,第二個參數(shù)是任務2的執(zhí)行結果,具有返回值)。

// 例子
public static void main(String[] args) {
	// 快速創(chuàng)建線程池
	ExecutorService executor = Executors.newFixedThreadPool(4);
	try {
		CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
			try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
			System.out.println("任務A等待久一點,執(zhí)行結果為333");
			return 333;
		}, executor);
		CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {
			System.out.println("任務B很快,執(zhí)行結果為666");
			return 666;
		}, executor);
		actionA.thenCombine(actionB, (res1, res2) -> {
			System.out.println("最終使用的執(zhí)行結果為" + (res1 + res2));
			return res1 + res2;
		});
	} catch (Exception e) {
		e.printStackTrace();
	}finally {
		executor.shutdown();
	}
}
/**
輸出結果:
    任務B很快,執(zhí)行結果為666
    任務A等待久一點,執(zhí)行結果為333
    最終使用的執(zhí)行結果為999
**/

除了thenCombine對任務最終結果進行獲取并消費,并且具有返回值的方法外,還有兩個類似的方法。

// 這個方法效果和上面的一樣,獲取合并結果,不同的是這個方法只消費不具有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
	CompletionStage<? extends U> other,
	BiConsumer<? super T, ? super U> action)
// 這個方法效果和上面的一樣,獲取合并結果,不同的是這個方法不消費結果也不具有返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action)

⑨ 多任務組合

allOf 實現(xiàn)并行地執(zhí)行多個任務,等待所有任務執(zhí)行完成(無需考慮執(zhí)行順序)

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

該方法可以實現(xiàn)并行地執(zhí)行多個任務,適用于多個任務沒有依賴關系,可以互相獨立執(zhí)行的,傳入?yún)?shù)為多個任務,沒有返回值。

allOf()方法會等待所有的任務執(zhí)行完畢再返回,可以通過get()阻塞確保所有任務執(zhí)行完畢

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任務A等待2秒后執(zhí)行完畢");
        }, executor);
        CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> {
            System.out.println("任務B很快執(zhí)行完畢");
        }, executor);
        CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任務C等待1秒后執(zhí)行完畢");
        }, executor);
        CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任務D等待5秒后執(zhí)行完畢");
        }, executor);
        CompletableFuture.allOf(actionA, actionB, actionC, actionD).get();
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
    任務B很快執(zhí)行完畢
    任務C等待1秒后執(zhí)行完畢
    任務A等待2秒后執(zhí)行完畢
    任務D等待5秒后執(zhí)行完畢
**/

anyOf 實現(xiàn)并行地執(zhí)行多個任務,只要有個一個完成的便會返回執(zhí)行結果

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

該方法可以實現(xiàn)并行地執(zhí)行多個任務,傳入?yún)?shù)為多個任務,具有返回值。該方法不會等待所有任務執(zhí)行完成后再返回結果,而是當有一個任務完成時,便會返回那個任務的執(zhí)行結果。

// 例子
public static void main(String[] args) {
    // 快速創(chuàng)建線程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任務A等待2秒后執(zhí)行完畢");
            return 555;
        }, executor);
        CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務B很快執(zhí)行完畢");
            return 666;
        }, executor);
        CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任務C等待1秒后執(zhí)行完畢");
            return 999;
        }, executor);
        CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任務D等待5秒后執(zhí)行完畢");
            return 888;
        }, executor);
        System.out.println("最先執(zhí)行完的返回結果為" + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get());
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
輸出結果:
    任務B很快執(zhí)行完畢
    最先執(zhí)行完的返回結果為666
    任務C等待1秒后執(zhí)行完畢
    任務A等待2秒后執(zhí)行完畢
    任務D等待5秒后執(zhí)行完畢
**/

一個使用CompletableFuture異步編排的例子

不需要關心例子中的業(yè)務內容,使用時按照自己業(yè)務的需求,對不同的需求調用不同API即可。

編寫任務時主要關心以下幾點:

① 是否需要消費之前任務的結果

② 是否需要返回結果給其他任務消費

③ 是否要求順序執(zhí)行(是否允許并行,有沒有前置要求)

/**
 * 該方法用于獲取單個商品的所有信息
 * 1. 商品的基本信息
 * 2. 商品的圖片信息
 * 3. 商品的銷售屬性組合
 * 4. 商品的各種分類基本信息
 * 5. 商品的促銷信息
 */
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
	// 創(chuàng)建商品Vo通過各個任務去完善Vo的信息
	SkuItemVo skuItemVo = new SkuItemVo();
	// 獲取商品基本信息 查詢到后設置進Vo中,返回基本信息給后續(xù)任務消費 (使用自定義的線程池進行異步)
	CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
		SkuInfoEntity info = this.getById(skuId);
		skuItemVo.setInfo(info);
		return info;
	}, executor);
	// 獲取商品的圖片信息 獲取后設置進Vo中,此處不需要消費圖片信息,也不需要返回結果。所以使用runAsync即可
	CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
		List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
		skuItemVo.setImages(imagesEntities);
	}, executor);
	// 獲取商品銷售屬性 因為要利用之前查詢到的基本信息,但后續(xù)任務不需要消費銷售屬性(不需要返回結果),所以使用thenAcceptAsync消費之前的基本信息,不返回銷售信息。
	CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
		List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
		skuItemVo.setSaleAttr(saleAttrVos);
	}, executor);
	// 獲取商品各分類基本信息,同樣要消費之前的基本信息,但無需返回,所以使用thenAcceptAsync即可
	CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
		SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
		skuItemVo.setDesc(spuInfoDescEntity);
	}, executor);
	// 獲取商品的促銷信息 這個也不需要消費之前任務的結果,也不需要返回結果。所以直接使用runAsync即可
	CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
		R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
		if (skuSeckilInfo.getCode() == 0) {
			SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
			});
			skuItemVo.setSeckillSkuVo(seckilInfoData);
			if (seckilInfoData != null) {
				long currentTime = System.currentTimeMillis();
				if (currentTime > seckilInfoData.getEndTime()) {
					skuItemVo.setSeckillSkuVo(null);
				}
			}
		}
	}, executor);
	// 使用allOf()組合所有任務,并且使用get()阻塞,等待所有任務完成。
        // 補充:infoFuture不能放入allOf中,因為allOf是并行無序執(zhí)行(需要多個條件是無依賴性的)的,當上面任務中有需要消費infoFuture的結果,所以需要先執(zhí)行infoFuture。
	CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,seckillFuture).get();
	// 最后返回商品Vo
	return skuItemVo;
}

以上就是java CompletableFuture實現(xiàn)異步編排詳解的詳細內容,更多關于java CompletableFuture異步編排的資料請關注腳本之家其它相關文章!

相關文章

  • Mybatis mapper接口動態(tài)代理開發(fā)步驟解析

    Mybatis mapper接口動態(tài)代理開發(fā)步驟解析

    這篇文章主要介紹了Mybatis mapper接口動態(tài)代理開發(fā)步驟解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-07-07
  • Java postgresql數(shù)組字段類型處理方法詳解

    Java postgresql數(shù)組字段類型處理方法詳解

    這篇文章主要介紹了Java postgresql數(shù)組字段類型處理方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • SpringBoot動態(tài)定時功能實現(xiàn)方案詳解

    SpringBoot動態(tài)定時功能實現(xiàn)方案詳解

    在SpringBoot項目中簡單使用定時任務,不過由于要借助cron表達式且都提前定義好放在配置文件里,不能在項目運行中動態(tài)修改任務執(zhí)行時間,實在不太靈活。現(xiàn)在我們就來實現(xiàn)可以動態(tài)修改cron表達式的定時任務,感興趣的可以了解一下
    2022-11-11
  • elasticsearch通過guice注入Node組裝啟動過程

    elasticsearch通過guice注入Node組裝啟動過程

    這篇文章主要為大家介紹了?elasticsearch通過guice注入Node組裝啟動過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-04-04
  • 自己編寫IOC控制反轉及AOP面向切面

    自己編寫IOC控制反轉及AOP面向切面

    本文展示通過一個案例來自己手寫IOC和AOP代碼,通過銀行轉賬案例詳細的代碼編寫和文檔解釋來說明IOC和AOP的思想,會分享存在的問題和解決問題的思路
    2021-06-06
  • Java數(shù)據(jù)結構(線性表)詳解

    Java數(shù)據(jù)結構(線性表)詳解

    本文主要介紹了Java數(shù)據(jù)結構(線性表)的相關知識。具有很好的參考價值,下面跟著小編一起來看下吧
    2017-01-01
  • SpringMvc定制化深入探究原理

    SpringMvc定制化深入探究原理

    SpringMVC是一種基于Java,實現(xiàn)了Web MVC設計模式,請求驅動類型的輕量級Web框架,即使用了MVC架構模式的思想,將Web層進行職責解耦,這篇文章主要介紹了SpringMvc定制化原理
    2022-10-10
  • Java并發(fā)編程之ReadWriteLock讀寫鎖的操作方法

    Java并發(fā)編程之ReadWriteLock讀寫鎖的操作方法

    這篇文章主要介紹了Java并發(fā)編程之ReadWriteLock讀寫鎖的操作方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-02-02
  • Java中類的定義和初始化示例詳解

    Java中類的定義和初始化示例詳解

    這篇文章主要給大家介紹了關于Java中類的定義和初始化的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-01-01
  • spring boot項目application.properties文件存放及使用介紹

    spring boot項目application.properties文件存放及使用介紹

    這篇文章主要介紹了spring boot項目application.properties文件存放及使用介紹,我們的application.properties文件中會有很多敏感信息,大家在使用過程中要多加小心
    2021-06-06

最新評論