springboot CompletableFuture異步線程池詳解
初始化異步的4種方法
1、繼承Thread
2、實現(xiàn)Runnable
3、實現(xiàn)Callable接口+Future (可以拿到返回結(jié)果,可以處理異常 jdk1.5之后)
4、線程池【ExecutorService】(實際開發(fā)中使用)
- 方式1和2 :主線程無法獲取線程的運算結(jié)果。
- 方式3:主線程可以獲取線程的運算結(jié)果,但是不利于控制服務(wù)器中的線程資源??梢詫?dǎo)致服務(wù)器資源耗盡。
- 方式4:線程池性能穩(wěn)定,控制資源,也可以獲取執(zhí)行結(jié)果,并捕獲異常。
線程池七大參數(shù)
- corePoolSize :[5] 核心線程數(shù)【一直存在除非 (allowCoreThreadTimeOut會回收)】;線程池,創(chuàng)建好后就準備好5個線程 Thread thread = new Thread() ;并沒有啟動 。只有往線程池提交任務(wù)后才會執(zhí)行 thread.start();
- maximumPoolSize : [200] 最大線程數(shù);控制資源
- keepAliveTime :存活時間。如果當前的線程數(shù)量大于core(核心)數(shù)量。釋放空閑的線程(maximumPoolSize-corePoolSize)。只要線程空閑大于指定的keepAliveTime 。
- unit :時間單位。
- BlockingQueue workQueue:阻塞隊列。如果任務(wù)有很多,就會將目前多的任務(wù)放在隊列里面。只要有線程空閑,就回去隊列里面取出新的任務(wù)繼續(xù)執(zhí)行。
new LinedBlockingDeque<>() :默認是Integer的最大值 會造成內(nèi)存不足 - threadFactory:線程的創(chuàng)建工廠
- RejectedExecutionHandler handler:拒絕策列,如果隊列滿了執(zhí)行相應(yīng)的拒絕策略
7.1 DiscardOldestPolicy :新任務(wù)進來時丟棄掉沒有執(zhí)行的舊任務(wù)
7.2 CallerRunsPolicy:直接調(diào)用run方法同步執(zhí)行
7.3 AbortPolicy:直接丟棄新任務(wù)并拋出異常
7.4 DiscardPolicy:直接丟棄不拋出異常
工作順序
- 線程池創(chuàng)建,準備好core數(shù)量的核心線程,準備接受任務(wù)
- core滿了,就會將再進來的任務(wù)放在阻塞隊列中,空閑的core就會自己去阻塞隊列獲取任務(wù)執(zhí)行。
- 阻塞隊列滿了,就會直接開新線程執(zhí)行,最大只能開到max指定的數(shù)量。
- max滿了就用RejectedExecutionHandler 策略拒絕任務(wù)。
- max都執(zhí)行完成,有很多空閑,在指定的時間keepAliveTime以后,釋放max-core這些線程
Executors
- newCachedThreadPool() core是0,所有都可以回收
- newFixedThreadPool() 固定大小 core= max ;都不可回收
- newScheduledThreadPllo() 定時任務(wù)的線程池
- newSingleThreadExecutor() 單線程的線程池,后臺獲取到任務(wù)去挨個執(zhí)行
1、創(chuàng)建異步對象
CompletableFuture 提供了四個靜態(tài)方法
//可以獲取到返回值,可傳入自定義線程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) //沒有返回值,可傳入自定義線程池 public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
測試
public static ExecutorService service = Executors.newFixedThreadPool(10); //沒有返回值 CompletableFuture.runAsync(()->{ System.out.println("異步任務(wù)成功完成了"); },service); //空入?yún)⒅挥蟹祷刂? CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> { return "返回值"; }, service);
2、計算完成時回調(diào)方法
//上一個任務(wù)完成和上一個任務(wù)用同一個線程 public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); //交給線程池重新啟動一個線程 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); //任務(wù)執(zhí)行完成后(只能感知) public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); //感知異常同時修改返回值 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
測試代碼
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 10; }, service).whenComplete((res,excption)->{ //雖然能得到異常消息但是不能修改返回結(jié)果 System.out.println("異步任務(wù)成功完成了"+res+"或者異常:"+excption); }).exceptionally(throwable ->{ //處理異常并可以數(shù)據(jù)修改返回值 return 0; });
3、handle 方法(異常時處理并返回)
//和上一個任務(wù)使用同一個線程執(zhí)行 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) //默認線程池開啟線程執(zhí)行 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) //指定自己的線程池開啟線程執(zhí)行 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
4、handle 測試
//方法執(zhí)行完成后的處理不論成功還是失敗 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 10; }, service).handle((res,thr)->{ //未出現(xiàn)異常 當然這里可以不寫 為了掩飾 if(res!=null){ return 0; } //出現(xiàn)異常 if(thr!=null){ return 1; } return 0; });
5、 線程串行化方法(B任務(wù)需要A任務(wù)的執(zhí)行結(jié)果后才能執(zhí)行)
//A-->B-->C 感知上一步結(jié)果并返回最后一次的結(jié)果 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor) //B可以感知到A的返回值 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) //A->完成后(有Async開啟新的線程沒有就是和A一個線程)感知不到上一步的執(zhí)行結(jié)果 public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
測試
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> { //任務(wù)A return 10; }, service).thenRunAsync(() -> { //感知不到 任務(wù)A的結(jié)果 },service);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> { //任務(wù)A return 10; }, service).thenAcceptAsync((res) -> { //可以感知到任務(wù)A的結(jié)果,但是不能返回數(shù)據(jù) int i = res / 2; },service);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { //任務(wù)A return 10; }, service).thenApplyAsync((res) -> { //返回值以最后一次返回為準 return 10 + res; }, service);
6、兩個任務(wù)組合 - 都要完成
以下三種方式 兩個任務(wù)都必須完成,才觸發(fā)該任務(wù) //組合兩個future,獲取兩個future 任務(wù)的返回結(jié)果,并返回當前任務(wù)的返回值 public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) //組合兩個future,獲取兩個future 任務(wù)的返回結(jié)果,然后處理任務(wù),沒有返回值。 public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) //組合兩個future ,不需要獲取future的結(jié)果,只需要兩個 future處理完成后處理該任務(wù) public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action) public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
測試
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { //任務(wù)A System.out.println(Thread.currentThread().getId()); System.out.println("任務(wù)一結(jié)束"); return 10/2; }, service); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { //任務(wù)A System.out.println(Thread.currentThread().getId()); System.out.println("任務(wù)二結(jié)束"); return "future02"; }, service); //不能感知到結(jié)果 CompletableFuture<Void> future03void = future01.runAfterBothAsync(future02, () -> { System.out.println("任務(wù)三執(zhí)行結(jié)束"); }, service); //可以感知獲取到前兩個任務(wù)結(jié)果 CompletableFuture<Void> future03No = future01.thenAcceptBothAsync(future02, (res1, res2) -> { System.out.println("任務(wù)三執(zhí)行結(jié)束"); }, service); CompletableFuture<String> future03 = future01.thenCombineAsync(future02, (res1, res2) -> { System.out.println("任務(wù)三執(zhí)行結(jié)束"); return res1 + "-" + res2; }, service);
7、兩個任務(wù)一個完成
//兩個任務(wù)有一個執(zhí)行完成,獲取它的返回值,處理任務(wù)并有新的返回值 public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T>other,Function<? super T, U> fn,Executor executor) //兩個任務(wù)有一個執(zhí)行完成,獲取它的返回值,處理任務(wù),沒有新的返回值 public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor) //兩個任務(wù)有一個執(zhí)行完成,不需要獲取future的結(jié)果,處理任務(wù) ,也沒有返回值 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action) public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
測試
/** * 兩個任務(wù)有一個完成就執(zhí)行三 * */ CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> { //任務(wù)A try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getId()); System.out.println("任務(wù)一結(jié)束"); return 10/2; }, service); CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> { //任務(wù)A System.out.println(Thread.currentThread().getId()); System.out.println("任務(wù)二結(jié)束"); return "future02"; }, service); //不感知結(jié)果,自己也沒有返回值 future01.runAfterEitherAsync(future02,()->{ System.out.println("任務(wù)三執(zhí)行結(jié)束"); },service); //感知到結(jié)果,自己沒有返回值 future01.acceptEitherAsync(future02,(res)->{ //感知到線程已經(jīng)處理完成的結(jié)果 System.out.println("任務(wù)三執(zhí)行結(jié)束"+res); },service); //感知到結(jié)果并返回自己的返回值 CompletableFuture<String> stringCompletableFuture = future01.applyToEitherAsync(future02, res -> { return "任務(wù)三結(jié)果" + res; }, service);
8、多任務(wù)組合
//等待所有任務(wù)完成 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) //只要有一個任務(wù)完成 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
測試
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> { //任務(wù)A System.out.println(Thread.currentThread().getId()); System.out.println("任務(wù)一結(jié)束"); return 10/2; }, service); CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> { //任務(wù)A System.out.println(Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)二結(jié)束"); return "future02"; }, service); //此方法是阻塞式等待不建議在這使用 //future01.get(); //future02.get(); //等待所有任務(wù)完成不會阻塞 CompletableFuture<Void> allOf= CompletableFuture.allOf(future01, future02); //等待所有的都完成(兩個任務(wù)同時執(zhí)行) allOf.get(); log.info("end"); System.out.println(future02.get()+""+future01.get()); //只有一個完成 CompletableFuture<Object> anyOf= CompletableFuture.anyOf(future01, future02); anyOf.get(); System.out.println(anyOf.get());
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Mybatis關(guān)聯(lián)查詢遇到的坑-無主鍵的關(guān)聯(lián)數(shù)據(jù)去重問題
這篇文章主要介紹了Mybatis關(guān)聯(lián)查詢遇到的坑-無主鍵的關(guān)聯(lián)數(shù)據(jù)去重問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03Java中的抽象工廠模式_動力節(jié)點Java學(xué)院整理
抽象工廠模式是工廠方法模式的升級版本,他用來創(chuàng)建一組相關(guān)或者相互依賴的對象。下面通過本文給大家分享Java中的抽象工廠模式,感興趣的朋友一起看看吧2017-08-08詳解javaweb中jstl如何循環(huán)List中的Map數(shù)據(jù)
這篇文章主要介紹了詳解javaweb中jstl如何循環(huán)List中的Map數(shù)據(jù)的相關(guān)資料,希望通過本文能幫助到大家,讓大家理解掌握這部分內(nèi)容,需要的朋友可以參考下2017-10-10Java 實戰(zhàn)項目之疫情防控管理系統(tǒng)詳解
讀萬卷書不如行萬里路,只學(xué)書上的理論是遠遠不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java實現(xiàn)一個疫情防控管理系統(tǒng),大家可以在過程中查缺補漏,提升水平2021-11-11