java異步編程CompletableFuture使用示例詳解
一、簡(jiǎn)單介紹
CompletableFuture 同時(shí)實(shí)現(xiàn)了 Future 和 CompletionStage 接口。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletableFuture 除了提供了更為好用和強(qiáng)大的 Future 特性之外,還提供了函數(shù)式編程的能力。

Future 接口有 5 個(gè)方法:
boolean cancel(boolean mayInterruptIfRunning) :嘗試取消執(zhí)行任務(wù)。
boolean isCancelled() :判斷任務(wù)是否被取消。
boolean isDone() : 判斷任務(wù)是否已經(jīng)被執(zhí)行完成。
get() :等待任務(wù)執(zhí)行完成并獲取運(yùn)算結(jié)果。
get(long timeout, TimeUnit unit) :多了一個(gè)超時(shí)時(shí)間。

CompletionStage<T> 接口中的方法比較多,CompletableFuture 的函數(shù)式能力就是這個(gè)接口賦予的。從這個(gè)接口的方法參數(shù)你就可以發(fā)現(xiàn)其大量使用了 Java8 引入的函數(shù)式編程。
二、常見操作
1、使用默認(rèn)線程池
// 使用默認(rèn)的線程池,F(xiàn)orkJoinPool
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println("come in");
System.out.println("是否為守護(hù)線程: " + Thread.currentThread().isDaemon());
System.out.println("當(dāng)前線程名稱:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("is ending");
return "hello";
});
System.out.println("I am main thread");
輸出如下:
I am main thread
come in
是否為守護(hù)線程: true
從輸出結(jié)果可以看出,使用默認(rèn)的線程池,啟動(dòng)是線程是守護(hù)線程,如果主線程執(zhí)行完畢,會(huì)導(dǎo)致守護(hù)線程結(jié)束。 導(dǎo)致 is ending 無(wú)法輸出。
2、使用自定義線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 使用自定義線程池
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println("come in");
System.out.println("是否為守護(hù)線程: " + Thread.currentThread().isDaemon());
System.out.println("當(dāng)前線程名稱:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("is ending");
return "hello";
},executorService);
System.out.println("I am main thread");
// 關(guān)閉線程池
executorService.shutdown();
輸出如下:
I am main thread
come in
是否為守護(hù)線程: false
當(dāng)前線程名稱:pool-1-thread-1
is ending
可以看出,使用自定義線程池,創(chuàng)建出來(lái)的線程不是守護(hù)線程。
3、獲取線程的執(zhí)行結(jié)果
1、 get()
會(huì)阻塞當(dāng)前主線程,等待任務(wù)線程執(zhí)行完成,獲取任務(wù)線程結(jié)果
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 使用自定義線程池
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println("come in");
System.out.println("是否為守護(hù)線程: " + Thread.currentThread().isDaemon());
System.out.println("當(dāng)前線程名稱:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("is ending");
return "hello";
},executorService);
System.out.println("I am main thread");
System.out.println("等待線程執(zhí)行結(jié)束");
// 阻塞等待任務(wù)執(zhí)行完成。
String s = completableFuture.get();
System.out.println("線程執(zhí)行結(jié)果為:" + s);
executorService.shutdown();
輸出為:
I am main thread
等待線程執(zhí)行結(jié)束
come in
是否為守護(hù)線程: false
當(dāng)前線程名稱:pool-1-thread-1
is ending
線程執(zhí)行結(jié)果為:hello
2、getNow(defaultValue)
如果線程執(zhí)行完成,獲取線程的執(zhí)行結(jié)果,如果沒有執(zhí)行完,獲取傳遞的默認(rèn)結(jié)果值 defaultValue 。
3、whenComplete(BiConsumer<? super T, ? super Throwable> action)
異步回調(diào),獲取線程的執(zhí)行結(jié)果。
completableFuture.whenComplete((v,e)->{
// 沒有異常
if (e == null) {
System.out.println("執(zhí)行結(jié)果為:" + v);
}
});
異常返回的處理。
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 使用自定義線程池
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println("come in");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = 10 / 0;
return "hello";
},executorService);
completableFuture.whenComplete((v,e)->{
// 沒有異常
if (e == null) {
System.out.println("執(zhí)行結(jié)果為:" + v);
} else {
System.out.println(e.getMessage());
System.out.println("異常了,要處理拉");
}
});
executorService.shutdown();
輸出結(jié)果如下:
come in
java.lang.ArithmeticException: / by zero
異常了,要處理拉
三、處理異步結(jié)算的結(jié)果
當(dāng)我們獲取到異步計(jì)算的結(jié)果之后,還可以對(duì)其進(jìn)行進(jìn)一步的處理,比較常用的方法有下面幾個(gè):
thenApply()
thenAccept()
thenRun()
whenComplete()
thenApply() 方法接受一個(gè) Function 實(shí)例,用它來(lái)處理結(jié)果。
CompletableFuture.supplyAsync(()->{
return 10;
}).thenApply((s)->{
return s + 20;
}).thenApply(s->{
return s+ 30;
}).whenComplete((v,e)->{
if (e == null) {
System.out.println("結(jié)果為:" + v);
}
});
// 等待線程執(zhí)行完畢
Thread.sleep(2000);
輸出結(jié)果為:
結(jié)果為:60
如果你不需要從回調(diào)函數(shù)中獲取返回結(jié)果,可以使用 thenAccept() 或者 thenRun()。這兩個(gè)方法的區(qū)別在于 thenRun() 不能訪問(wèn)異步計(jì)算的結(jié)果。
四、異常處理
可以通過(guò) handle() 方法來(lái)處理任務(wù)執(zhí)行過(guò)程中可能出現(xiàn)的拋出異常的情況。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Computation error!");
}
return "hello!";
}).handle((res, ex) -> {
// res 代表返回的結(jié)果
// ex 的類型為 Throwable ,代表拋出的異常
return res != null ? res : ex.getMessage();
});
System.out.println(future.get());
輸出如下:
java.lang.RuntimeException: Computation error!
如果你想讓 CompletableFuture 的結(jié)果就是異常的話,可以使用 completeExceptionally() 方法為其賦值。
五、組合 CompletableFuture
你可以使用 thenCompose() 按順序鏈接兩個(gè) CompletableFuture 對(duì)象。
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "hello!")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world!"));
System.out.println(future.get());
輸出如下:
hello!world!
在實(shí)際開發(fā)中,這個(gè)方法還是非常有用的。比如說(shuō),我們先要獲取用戶信息然后再用用戶信息去做其他事情。
和 thenCompose() 方法類似的還有 thenCombine() 方法, thenCombine() 同樣可以組合兩個(gè) CompletableFuture 對(duì)象。
thenCompose() 可以兩個(gè) CompletableFuture 對(duì)象,并將前一個(gè)任務(wù)的返回結(jié)果作為下一個(gè)任務(wù)的參數(shù),它們之間存在著先后順序。
thenCombine() 會(huì)在兩個(gè)任務(wù)都執(zhí)行完成后,把兩個(gè)任務(wù)的結(jié)果合并。兩個(gè)任務(wù)是并行執(zhí)行的,它們之間并沒有先后依賴順序。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "hello!")
// 兩個(gè)任務(wù)并行執(zhí)行,執(zhí)行完,進(jìn)行合并操作
.thenCombine(CompletableFuture.supplyAsync(
() -> "world!"), (s1, s2) -> s1 + s2)
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "nice!"));
System.out.println(completableFuture.get());
輸出如下:
hello!world!nice!
六、并行運(yùn)行多個(gè) CompletableFuture
你可以通過(guò) CompletableFuture 的 allOf()這個(gè)靜態(tài)方法來(lái)并行運(yùn)行多個(gè) CompletableFuture 。
實(shí)際項(xiàng)目中,我們經(jīng)常需要并行運(yùn)行多個(gè)互不相關(guān)的任務(wù),這些任務(wù)之間沒有依賴關(guān)系,可以互相獨(dú)立地運(yùn)行。
比說(shuō)我們要讀取處理 6 個(gè)文件,這 6 個(gè)任務(wù)都是沒有執(zhí)行順序依賴的任務(wù),但是我們需要返回給用戶的時(shí)候?qū)⑦@幾個(gè)文件的處理的結(jié)果進(jìn)行統(tǒng)計(jì)整理。像這種情況我們就可以使用并行運(yùn)行多個(gè) CompletableFuture 來(lái)處理。
示例代碼如下:
CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
//自定義業(yè)務(wù)操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
//自定義業(yè)務(wù)操作
});
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);
try {
headerFuture.join();
} catch (Exception ex) {
......
}
System.out.println("all done. ");
------
經(jīng)常和 allOf() 方法拿來(lái)對(duì)比的是 anyOf() 方法。
allOf() 方法會(huì)等到所有的 CompletableFuture 都運(yùn)行完成之后再返回 anyOf() 方法不會(huì)等待所有的 CompletableFuture 都運(yùn)行完成之后再返回,只要有一個(gè)執(zhí)行完成即可!
七、案例
模擬:平臺(tái)商城,查詢商品價(jià)格
class PlatShopping {
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
// 平臺(tái)名稱
private String name;
public PlatShopping(String name) {
this.name = name;
}
/**
* 獲取書籍名稱
* @param bookName 書名
* @return
*/
public Double getPrice(String bookName) {
try {
// 模擬查詢
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Math.random() * 100;
}
1、從多個(gè)平臺(tái)獲取書價(jià)格
構(gòu)造多個(gè)平臺(tái)查詢價(jià)格
public static void main(String[] args) throws
InterruptedException,
ExecutionException {
PlatShopping jd = new PlatShopping("京東");
PlatShopping taobao = new PlatShopping("淘寶");
PlatShopping tianmao= new PlatShopping("天貓");
List<PlatShopping> platShoppings = new ArrayList<>();
platShoppings.add(jd);
platShoppings.add(taobao);
platShoppings.add(tianmao);
long c1 = currentTimeMillis();
String allPlatPrice = getAllPlatPrice(platShoppings,"java高級(jí)編程");
System.out.println(String.format("總耗時(shí):%d", currentTimeMillis()- c1));
System.out.println(allPlatPrice);
}
獲取所有的書價(jià)格結(jié)果(順序執(zhí)行)
private static String getAllPlatPrice(List<PlatShopping> platShoppings, String bookName) {
return platShoppings.stream().map(p->{
Double price = p.getPrice(bookName);
String data = format("%s 上 %s 的價(jià)格 %.2f", p.getName(), bookName, price);
return data;
}).collect(Collectors.joining("\n"));
}
輸出結(jié)果如下: 效率低
總耗時(shí):3077
京東 上 java高級(jí)編程 的價(jià)格 60.47
淘寶 上 java高級(jí)編程 的價(jià)格 89.12
天貓 上 java高級(jí)編程 的價(jià)格 79.15
使用并行,硬編碼處理。
private static String getAllPlatPrice(List<PlatShopping> platShoppings, String bookName) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(0);
Double price = p.getPrice(bookName);
String data = format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(1);
Double price = p.getPrice(bookName);
String data = format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(2);
Double price = p.getPrice(bookName);
String data = format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
return task1.thenCombineAsync(task2,(s1,s2)->{
return s1 + s2;
}).thenCombineAsync(task3,(s1,s2) -> {
return s1 + s2;
}).join();
}
}
輸出結(jié)果如下:
總耗時(shí):1065
京東 上 java高級(jí)編程 的價(jià)格 65.41
淘寶 上 java高級(jí)編程 的價(jià)格 35.13
天貓 上 java高級(jí)編程 的價(jià)格 19.70
使用allOf
private static String getAllPlatPrice(List<PlatShopping> platShoppings, String bookName) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(0);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(1);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(2);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
StringBuilder stringBuilder = new StringBuilder();
// 等待三個(gè)都執(zhí)行完成
CompletableFuture.allOf(task1, task2, task3).join();
// 獲取結(jié)果
stringBuilder.append(task1.join()).append(task2.join()).append(task3.join());
return stringBuilder.toString();
}
輸出結(jié)果如下:
總耗時(shí):1064
京東 上 java高級(jí)編程 的價(jià)格 46.49
淘寶 上 java高級(jí)編程 的價(jià)格 4.59
天貓 上 java高級(jí)編程 的價(jià)格 74.47
使用流
private static String getAllPlatPrice(List<PlatShopping> platShoppings, String bookName) {
// 批量提交任務(wù),返回并行任務(wù)的集合列表
List<CompletableFuture<String>> completableFutureList = platShoppings.stream().map(p -> {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
String data = String.format("%s 上 %s 的價(jià)格 %.2f", p.getName(), bookName, p.getPrice(bookName));
return data;
});
return completableFuture;
}).collect(Collectors.toList());
// 分別獲取每個(gè)任務(wù)的結(jié)果,再進(jìn)行聚合
String result = completableFutureList.stream().map(c -> c.join())
.collect(Collectors.joining("\n"));
return result;
}
運(yùn)行結(jié)果如下:
總耗時(shí):1062
京東 上 java高級(jí)編程 的價(jià)格 69.99
淘寶 上 java高級(jí)編程 的價(jià)格 58.18
天貓 上 java高級(jí)編程 的價(jià)格 5.05
2、從任意一個(gè)平臺(tái)獲取結(jié)果就返回
構(gòu)造多平臺(tái),
public static void main(String[] args) throws
InterruptedException,
ExecutionException {
PlatShopping jd = new PlatShopping("京東");
PlatShopping taobao = new PlatShopping("淘寶");
PlatShopping tianmao= new PlatShopping("天貓");
List<PlatShopping> platShoppings = new ArrayList<>();
platShoppings.add(jd);
platShoppings.add(taobao);
platShoppings.add(tianmao);
long c1 = currentTimeMillis();
String onePrice = getOnePrice(platShoppings,"java高級(jí)編程");
System.out.println(String.format("總耗時(shí):%d", currentTimeMillis()- c1));
System.out.println(onePrice);
}
使用anyOf
private static String getOnePrice(List<PlatShopping> platShoppings, String bookName) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(0);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(1);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(2);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
Object join = CompletableFuture.anyOf(task1, task2, task3).join();
return (String) join;
}
輸出如下:
總耗時(shí):1063
京東 上 java高級(jí)編程 的價(jià)格 4.93
使用 applyToEitherAsync
private static String getOnePrice(List<PlatShopping> platShoppings, String bookName) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(0);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
});
String result = task1.applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(1);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
}), (t) -> t).applyToEitherAsync(
CompletableFuture.supplyAsync(() -> {
PlatShopping p = platShoppings.get(2);
Double price = p.getPrice(bookName);
String data = String.format("%s 上 %s 的價(jià)格 %.2f \n", p.getName(), bookName, price);
return data;
}),
(t) -> t
).join();
return result;
}
輸出如下:
總耗時(shí):1063
京東 上 java高級(jí)編程 的價(jià)格 52.31
另外,建議可以看看京東的 asyncToolopen in new window 這個(gè)并發(fā)框架,里面大量使用到了 CompletableFuture 。
以上就是java異步編程CompletableFuture使用示例詳解的詳細(xì)內(nèi)容,更多關(guān)于java異步CompletableFuture的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java線程實(shí)現(xiàn)時(shí)間動(dòng)態(tài)顯示
這篇文章主要為大家詳細(xì)介紹了Java線程實(shí)現(xiàn)時(shí)間動(dòng)態(tài)顯示,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-04-04
使用spring-cache一行代碼解決緩存擊穿問(wèn)題
本文主要介紹了使用spring-cache一行代碼解決緩存擊穿問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04
MyBatis不用@Param傳遞多個(gè)參數(shù)的操作
這篇文章主要介紹了MyBatis不用@Param傳遞多個(gè)參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02
解決IDEA service層跳轉(zhuǎn)實(shí)現(xiàn)類的快捷圖標(biāo)消失問(wèn)題
這篇文章主要介紹了解決IDEA service層跳轉(zhuǎn)實(shí)現(xiàn)類的快捷圖標(biāo)消失問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02
詳解Java兩種方式簡(jiǎn)單實(shí)現(xiàn):爬取網(wǎng)頁(yè)并且保存
本篇文章主要介紹了Java兩種方式簡(jiǎn)單實(shí)現(xiàn):爬取網(wǎng)頁(yè)并且保存 ,主要用UrlConnection、HttpClient爬取實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2016-12-12
為什么阿里要慎重使用ArrayList中的subList方法
這篇文章主要介紹了為什么要慎重使用ArrayList中的subList方法,subList是List接口中定義的一個(gè)方法,該方法主要用于返回一個(gè)集合中的一段、可以理解為截取一個(gè)集合中的部分元素,他的返回值也是一個(gè)List。,需要的朋友可以參考下2019-06-06
詳解eclipse創(chuàng)建maven項(xiàng)目實(shí)現(xiàn)動(dòng)態(tài)web工程完整示例
這篇文章主要介紹了詳解eclipse創(chuàng)建maven項(xiàng)目實(shí)現(xiàn)動(dòng)態(tài)web工程完整示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-12-12

