欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot CompletableFuture異步線程池詳解

 更新時間:2025年04月06日 11:18:23   作者:青衣畫白扇  
這篇文章主要介紹了springboot CompletableFuture異步線程池的使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

初始化異步的4種方法

1、繼承Thread

2、實現(xiàn)Runnable

3、實現(xiàn)Callable接口+Future (可以拿到返回結(jié)果,可以處理異常 jdk1.5之后)

4、線程池【ExecutorService】(實際開發(fā)中使用)

  • 方式1和2 :主線程無法獲取線程的運算結(jié)果。
  • 方式3:主線程可以獲取線程的運算結(jié)果,但是不利于控制服務(wù)器中的線程資源??梢詫?dǎo)致服務(wù)器資源耗盡。
  • 方式4:線程池性能穩(wěn)定,控制資源,也可以獲取執(zhí)行結(jié)果,并捕獲異常。

線程池七大參數(shù)

  1. corePoolSize :[5] 核心線程數(shù)【一直存在除非 (allowCoreThreadTimeOut會回收)】;線程池,創(chuàng)建好后就準備好5個線程 Thread thread = new Thread() ;并沒有啟動 。只有往線程池提交任務(wù)后才會執(zhí)行 thread.start();
  2. maximumPoolSize : [200] 最大線程數(shù);控制資源
  3. keepAliveTime :存活時間。如果當前的線程數(shù)量大于core(核心)數(shù)量。釋放空閑的線程(maximumPoolSize-corePoolSize)。只要線程空閑大于指定的keepAliveTime 。
  4. unit :時間單位。
  5. BlockingQueue workQueue:阻塞隊列。如果任務(wù)有很多,就會將目前多的任務(wù)放在隊列里面。只要有線程空閑,就回去隊列里面取出新的任務(wù)繼續(xù)執(zhí)行。
    new LinedBlockingDeque<>() :默認是Integer的最大值 會造成內(nèi)存不足
  6. threadFactory:線程的創(chuàng)建工廠
  7. RejectedExecutionHandler handler:拒絕策列,如果隊列滿了執(zhí)行相應(yīng)的拒絕策略
    7.1 DiscardOldestPolicy :新任務(wù)進來時丟棄掉沒有執(zhí)行的舊任務(wù)
    7.2 CallerRunsPolicy:直接調(diào)用run方法同步執(zhí)行
    7.3 AbortPolicy:直接丟棄新任務(wù)并拋出異常
    7.4 DiscardPolicy:直接丟棄不拋出異常

工作順序

  1. 線程池創(chuàng)建,準備好core數(shù)量的核心線程,準備接受任務(wù)
  2. core滿了,就會將再進來的任務(wù)放在阻塞隊列中,空閑的core就會自己去阻塞隊列獲取任務(wù)執(zhí)行。
  3. 阻塞隊列滿了,就會直接開新線程執(zhí)行,最大只能開到max指定的數(shù)量。
  4. max滿了就用RejectedExecutionHandler 策略拒絕任務(wù)。
  5. max都執(zhí)行完成,有很多空閑,在指定的時間keepAliveTime以后,釋放max-core這些線程

Executors

  1. newCachedThreadPool() core是0,所有都可以回收
  2. newFixedThreadPool() 固定大小 core= max ;都不可回收
  3. newScheduledThreadPllo() 定時任務(wù)的線程池
  4. newSingleThreadExecutor() 單線程的線程池,后臺獲取到任務(wù)去挨個執(zhí)行

1、創(chuàng)建異步對象

CompletableFuture 提供了四個靜態(tài)方法

//可以獲取到返回值,可傳入自定義線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
//沒有返回值,可傳入自定義線程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

測試

public static ExecutorService service = Executors.newFixedThreadPool(10);
  //沒有返回值
        CompletableFuture.runAsync(()->{
            System.out.println("異步任務(wù)成功完成了");
        },service);
        //空入?yún)⒅挥蟹祷刂?
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return "返回值";
        }, service);

2、計算完成時回調(diào)方法

//上一個任務(wù)完成和上一個任務(wù)用同一個線程
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
//交給線程池重新啟動一個線程
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
//任務(wù)執(zhí)行完成后(只能感知)
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
//感知異常同時修改返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

測試代碼

 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        }, service).whenComplete((res,excption)->{
            //雖然能得到異常消息但是不能修改返回結(jié)果
            System.out.println("異步任務(wù)成功完成了"+res+"或者異常:"+excption);
        }).exceptionally(throwable ->{
            //處理異常并可以數(shù)據(jù)修改返回值
            return 0;
       });

3、handle 方法(異常時處理并返回)

//和上一個任務(wù)使用同一個線程執(zhí)行
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
//默認線程池開啟線程執(zhí)行
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
//指定自己的線程池開啟線程執(zhí)行
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

4、handle 測試

       //方法執(zhí)行完成后的處理不論成功還是失敗
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        }, service).handle((res,thr)->{
            //未出現(xiàn)異常 當然這里可以不寫 為了掩飾
           if(res!=null){
               return 0;
           }
           //出現(xiàn)異常
           if(thr!=null){
               return 1;
           }
           return 0;
        });

5、 線程串行化方法(B任務(wù)需要A任務(wù)的執(zhí)行結(jié)果后才能執(zhí)行)

//A-->B-->C 感知上一步結(jié)果并返回最后一次的結(jié)果
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)

//B可以感知到A的返回值 
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor)
//A->完成后(有Async開啟新的線程沒有就是和A一個線程)感知不到上一步的執(zhí)行結(jié)果                                                   
public CompletableFuture<Void> thenRun(Runnable action) 
public CompletableFuture<Void> thenRunAsync(Runnable action) 
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) 

測試

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            return 10;
        }, service).thenRunAsync(() -> {
            //感知不到 任務(wù)A的結(jié)果
        },service);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            return 10;
        }, service).thenAcceptAsync((res) -> {
            //可以感知到任務(wù)A的結(jié)果,但是不能返回數(shù)據(jù)
            int i = res / 2;
        },service);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            return 10;
        }, service).thenApplyAsync((res) -> {
        //返回值以最后一次返回為準
            return 10 + res;
        }, service);

6、兩個任務(wù)組合 - 都要完成

以下三種方式 兩個任務(wù)都必須完成,才觸發(fā)該任務(wù)
//組合兩個future,獲取兩個future 任務(wù)的返回結(jié)果,并返回當前任務(wù)的返回值
public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) 

//組合兩個future,獲取兩個future 任務(wù)的返回結(jié)果,然后處理任務(wù),沒有返回值。
public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action)

public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action)
        
public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) 
        
//組合兩個future ,不需要獲取future的結(jié)果,只需要兩個 future處理完成后處理該任務(wù)        
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action) 

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) 
                                                     
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)

測試

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            System.out.println(Thread.currentThread().getId());
            System.out.println("任務(wù)一結(jié)束");
            return 10/2;
        }, service);
        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            System.out.println(Thread.currentThread().getId());
            System.out.println("任務(wù)二結(jié)束");
            return "future02";
        }, service);
        //不能感知到結(jié)果
        CompletableFuture<Void> future03void = future01.runAfterBothAsync(future02, () -> {
            System.out.println("任務(wù)三執(zhí)行結(jié)束");
        }, service);
        //可以感知獲取到前兩個任務(wù)結(jié)果
        CompletableFuture<Void> future03No = future01.thenAcceptBothAsync(future02, (res1, res2) -> {
            System.out.println("任務(wù)三執(zhí)行結(jié)束");
        }, service);
        CompletableFuture<String> future03 = future01.thenCombineAsync(future02, (res1, res2) -> {
            System.out.println("任務(wù)三執(zhí)行結(jié)束");
            return res1 + "-" + res2;
        }, service);

7、兩個任務(wù)一個完成

//兩個任務(wù)有一個執(zhí)行完成,獲取它的返回值,處理任務(wù)并有新的返回值
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn) 

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
        
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T>other,Function<? super T, U> fn,Executor executor) 
//兩個任務(wù)有一個執(zhí)行完成,獲取它的返回值,處理任務(wù),沒有新的返回值
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action)

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor)
//兩個任務(wù)有一個執(zhí)行完成,不需要獲取future的結(jié)果,處理任務(wù) ,也沒有返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
                                                       
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

測試

		 /**
         * 兩個任務(wù)有一個完成就執(zhí)行三
         *
         */
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId());
            System.out.println("任務(wù)一結(jié)束");

            return 10/2;
        }, service);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            System.out.println(Thread.currentThread().getId());

            System.out.println("任務(wù)二結(jié)束");
            return "future02";
        }, service);
        //不感知結(jié)果,自己也沒有返回值
        future01.runAfterEitherAsync(future02,()->{
            System.out.println("任務(wù)三執(zhí)行結(jié)束");
        },service);
        //感知到結(jié)果,自己沒有返回值
        future01.acceptEitherAsync(future02,(res)->{
            //感知到線程已經(jīng)處理完成的結(jié)果
            System.out.println("任務(wù)三執(zhí)行結(jié)束"+res);
        },service);
        //感知到結(jié)果并返回自己的返回值
        CompletableFuture<String> stringCompletableFuture = future01.applyToEitherAsync(future02, res -> {
            return "任務(wù)三結(jié)果" + res;
        }, service);

8、多任務(wù)組合

//等待所有任務(wù)完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
//只要有一個任務(wù)完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

測試

CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            System.out.println(Thread.currentThread().getId());
            System.out.println("任務(wù)一結(jié)束");

            return 10/2;
        }, service);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            //任務(wù)A
            System.out.println(Thread.currentThread().getId());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務(wù)二結(jié)束");
            return "future02";
        }, service);
        //此方法是阻塞式等待不建議在這使用
        //future01.get();
        //future02.get();
        
		//等待所有任務(wù)完成不會阻塞
        CompletableFuture<Void> allOf= CompletableFuture.allOf(future01, future02);
        //等待所有的都完成(兩個任務(wù)同時執(zhí)行)
        allOf.get();
        log.info("end");
        System.out.println(future02.get()+""+future01.get());
        //只有一個完成
        CompletableFuture<Object> anyOf= CompletableFuture.anyOf(future01, future02);
        anyOf.get();
        System.out.println(anyOf.get());

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論