Java并發(fā)編程中的CompletableFuture使用詳解
引言
并行,并發(fā)
并發(fā):一個實體上,多個任務有序執(zhí)行
并行:多個實體上,多個任務同時執(zhí)行
用戶線程
用戶線程是系統(tǒng)的工作線程,會完成程序需要完成的業(yè)務操作
守護線程
是一種特殊的線程,為其他線程服務的,在后臺默默的完成一些系統(tǒng)性的服務,如GC線程;
如果用戶線程全部結束,意味著程序需要完成的業(yè)務操作已經結束了,守護線程就沒有必要繼續(xù)運行了。所以當系統(tǒng)只剩下守護線程的時候,java虛擬機會自動退出。
1、Future
Future接口(實現(xiàn)類FutureTask)定義了操作異步任務執(zhí)行的一些方法,如獲取異步任務執(zhí)行的結果、取消任務的執(zhí)行、判斷任務是否被取消,判斷任務是否執(zhí)行完畢等。
Future接口可以為主線程開一個分支線程,專門為主線程處理耗時和費力的復雜業(yè)務
Future接口方法
取消任務:
boolean cancel(boolean mayInterruptIfRunning);
判斷任務是否被取消
boolean isCancelled();
斷任務是否執(zhí)行完畢
boolean isDone();
獲取異步任務執(zhí)行的結果:
V get() throws InterruptedException, ExecutionException;
獲取異步任務執(zhí)行的結果(限定時間內沒獲取到結果就拋出異常):
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
注:get方法獲取執(zhí)行結果會出現(xiàn)程序阻塞,所以一般放到程序最后調用
示例代碼
//固定大小線程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
FutureTask<String> futureTask1 = new FutureTask<>(()->{
TimeUnit.MICROSECONDS.sleep(500);
return "callable1";
});
threadPool.submit(futureTask1);
FutureTask<String> futureTask2 = new FutureTask<>(()->{
TimeUnit.SECONDS.sleep(3);
return "callable2";
});
threadPool.submit(futureTask2);
//獲取異步任務執(zhí)行結果
while (true){
if (futureTask1.isDone()){
System.out.println(futureTask1.get());
break;
}
}
System.out.println(futureTask2.get(2,TimeUnit.SECONDS));
//關閉線程池
threadPool.shutdown();2、CompletableFuture
CompletableFuture提供了Future的擴展功能,提供了函數(shù)式編程能力,可以在任務執(zhí)行完后通過回調的方式處理計算結果。
CompletableFuture的創(chuàng)建
方法:
- runAsync 無返回值
- supplyAsync 有返回值
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//無返回值
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.MICROSECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, threadPool);//不指定線程池就會使用默認的線程池
System.out.println(future.get());//null
//有返回值
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.MICROSECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "supplyAsync";
}, threadPool);
System.out.println(future1.get());
threadPool.shutdown();CompletableFuture示例
一個階段的完成可能會觸發(fā)另一個階段
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
int nextInt = new Random().nextInt(10);
// int a = 10 /0;
return nextInt;
}, threadPool).whenComplete((v,e)->{//獲得上一步執(zhí)行完返回的結果 v;可能出現(xiàn)的異常 e
if (e==null){
System.out.println("成功獲得結果:"+v);
}
}).exceptionally(e->{//發(fā)生異常后自動調用
e.printStackTrace();
System.out.println("發(fā)生異常:"+e.getMessage());
return null;
});
threadPool.shutdown();
//todo 主線程執(zhí)行任務 (注意:主線程結束后默認線程池會自動關閉,推薦使用自定義線程池)
}CompletableFuture常用方法
getNow和complete
//getNow:立即獲取結果,若沒獲取到就使用備選結果
System.out.println(future1.getNow("xxxxx"));
//complete:如果操作未執(zhí)行完就將get獲得的值改為給定的值,然后返回true,反之get獲得的值就是操作執(zhí)行完返回的值,然后返回false
System.out.println(future1.complete("beixuan") + "\t" + future1.get());thenApply:計算結果存在傳遞關系,發(fā)生異常時后面步驟停止運行
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(v -> {//當這一步發(fā)生異常時,后續(xù)操作不執(zhí)行,直接跳到最后打印異常信息
//int i = 10/0;
return v + 2;
}).thenApply(v -> {
return v + 3;
}).whenComplete((v,e)->{
if (e==null){
System.out.println("thenApply:"+v);//6
}
}).exceptionally(e->{
e.printStackTrace();
return null;
});handle:和thenApply類似,但發(fā)生異常時后續(xù)操作可以正常執(zhí)行
CompletableFuture.supplyAsync(() -> {
System.out.println(1);
return 1;
}).handle((v,e) -> {//第一步發(fā)生異常停止運行,但后面可以正常運行,直至最后把異常打印出來
int i = 10/0;
System.out.println(3);
return v + 2;
}).handle((v,e) -> {//這里正常輸出
System.out.println(6);
return v + 3;
}).whenComplete((v,e)->{
if (e==null){
System.out.println("handle:"+v);
}
}).exceptionally(e->{
e.printStackTrace();
return null;
});thenAccept:接收上一步執(zhí)行完的結果,沒有返回值
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(v -> {
return v + 2;
}).thenAccept(v -> {
System.out.println(v);
});thenCombine:對兩個異步操作的結果進行合并,先完成的操作要等另一個慢的操作
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
return 10;
});
CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
return 20;
});
System.out.println(futureA.thenCombine(futureB, (a, b) -> {
System.out.println("結果開始合并");
return a + b;
}).join());//30
//========================================================================
System.out.println(CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
return 20;
}), (x, y) -> {
System.out.println("結果開始合并1");
return x + y;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
return 30;
}), (x, y) -> {
System.out.println("結果開始合并2");
return x + y;
}).join());//60CompletableFuture案例
比較多個商城中同一物品的價格
package com.cheng.juc;
import lombok.Data;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
public class GoodsPriceDemo {
static List<NetMall> malls = Arrays.asList(new NetMall("JD"),new NetMall("TB"),new NetMall("DD"));
/**
* 輪流查詢價格
* @param malls
* @param goodsName
* @return
*/
public static List<String> getPrice(List<NetMall> malls,String goodsName){
return malls.stream()
.map(m -> String.format(goodsName + " in %s price is %.2f", m.getNetMallName(), m.calcPrice(goodsName)))
.collect(Collectors.toList());
}
/**
* 使用異步任務查詢價格
* @param malls
* @param goodsName
* @return
*/
public static List<String> getPricePlus(List<NetMall> malls,String goodsName){
ExecutorService threadPool = Executors.newFixedThreadPool(3);
return malls.stream()
//為每一個商城開啟一個異步任務,然后同時查詢價格
.map(m-> CompletableFuture.supplyAsync(()-> String.format(goodsName + " in %s price is %.2f", m.getNetMallName(), m.calcPrice(goodsName)),threadPool))
.collect(Collectors.toList())
.stream().map(d->d.join())
.collect(Collectors.toList());
}
public static void main(String[] args) {
long l1 = System.currentTimeMillis();
// List<String> price = getPrice(malls,"mysql");// 3s
List<String> price = getPricePlus(malls,"mysql");// 1s
price.stream().forEach(System.out::println);
long l2 = System.currentTimeMillis();
System.out.println("耗時:"+(l2 - l1));
}
}
@Data
class NetMall{
private String netMallName;
public NetMall(String netMallName){
this.netMallName = netMallName;
}
//計算價格
public BigDecimal calcPrice(String goodsName){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
BigDecimal result = BigDecimal.valueOf(ThreadLocalRandom.current().nextDouble() * 2 + goodsName.charAt(0));
return result;
}
}比較結果:
mysql in JD price is 110.37
mysql in TB price is 110.58
mysql in DD price is 109.48
到此這篇關于Java并發(fā)編程中的CompletableFuture使用詳解的文章就介紹到這了,更多相關Java的CompletableFuture內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
基于SpringBoot整合SSMP案例(開啟日志與分頁查詢條件查詢功能實現(xiàn))
這篇文章主要介紹了基于SpringBoot整合SSMP案例(開啟日志與分頁查詢條件查詢功能實現(xiàn)),本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋參考下吧2023-11-11
struts2+spring+ibatis框架整合實現(xiàn)增刪改查
這篇文章主要為大家詳細介紹了struts2+spring+ibatis框架整合實現(xiàn)增刪改查操作,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-07-07
elasticsearch索引index數(shù)據(jù)功能源碼示例
這篇文章主要為大家介紹了elasticsearch索引index功能源碼示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-04-04
SpringCloud Ribbon與OpenFeign詳解如何實現(xiàn)服務調用
這篇文章主要介紹了SpringCloud Ribbon與OpenFeign實現(xiàn)服務調用的過程,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-09-09
Spring-AOP 靜態(tài)正則表達式方法如何匹配切面
這篇文章主要介紹了Spring-AOP 靜態(tài)正則表達式方法如何匹配切面的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
Java中LinkedHashSet的實現(xiàn)原理詳解
這篇文章主要介紹了Java中LinkedHasSet的實現(xiàn)原理詳解,LinkedHashSet?是具有可預知迭代順序的?Set?接口的哈希表和鏈接列表實現(xiàn),此實現(xiàn)與HashSet?的不同之處在于,后者維護著一個運行于所有條目的雙重鏈接列表,需要的朋友可以參考下2023-09-09

