欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java8中CompletableFuture的用法全解

 更新時(shí)間:2022年01月26日 09:25:21   作者:孫大圣666  
這篇文章主要給大家介紹了關(guān)于Java8中CompletableFuture用法的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下

前言

CompletableFuture實(shí)現(xiàn)了CompletionStage接口和Future接口,前者是對(duì)后者的一個(gè)擴(kuò)展,增加了異步回調(diào)、流式處理、多個(gè)Future組合處理的能力,使Java在處理多任務(wù)的協(xié)同工作時(shí)更加順暢便利。

一、創(chuàng)建異步任務(wù)

1、Future.submit

通常的線(xiàn)程池接口類(lèi)ExecutorService,其中execute方法的返回值是void,即無(wú)法獲取異步任務(wù)的執(zhí)行狀態(tài),3個(gè)重載的submit方法的返回值是Future,可以據(jù)此獲取任務(wù)執(zhí)行的狀態(tài)和結(jié)果,示例如下:

    @Test
    public void test3() throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù):
        ExecutorService executorService= Executors.newSingleThreadExecutor();
        Future<Double> cf = executorService.submit(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(false){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        });
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成,如果已完成則直接返回結(jié)果
        //如果執(zhí)行任務(wù)異常,則get方法會(huì)把之前捕獲的異常重新拋出
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

執(zhí)行結(jié)果如下:

子線(xiàn)程是異步執(zhí)行的,主線(xiàn)程休眠等待子線(xiàn)程執(zhí)行完成,子線(xiàn)程執(zhí)行完成后喚醒主線(xiàn)程,主線(xiàn)程獲取任務(wù)執(zhí)行結(jié)果后退出。

很多博客說(shuō)使用不帶等待時(shí)間限制的get方法時(shí),如果子線(xiàn)程執(zhí)行異常了會(huì)導(dǎo)致主線(xiàn)程長(zhǎng)期阻塞,這其實(shí)是錯(cuò)誤的,子線(xiàn)程執(zhí)行異常時(shí)其異常會(huì)被捕獲,然后修改任務(wù)的狀態(tài)為異常結(jié)束并喚醒等待的主線(xiàn)程,get方法判斷任務(wù)狀態(tài)發(fā)生變更,就終止等待了,并拋出異常,可參考《Java8 AbstractExecutorService 和 FutureTask 源碼解析》中FutureTask的實(shí)現(xiàn)。將上述用例中if(false)改成if(true) ,執(zhí)行結(jié)果如下:

get方法拋出異常導(dǎo)致主線(xiàn)程異常終止。 

2、supplyAsync / runAsync

     supplyAsync表示創(chuàng)建帶返回值的異步任務(wù)的,相當(dāng)于ExecutorService submit(Callable<T> task) 方法,runAsync表示創(chuàng)建無(wú)返回值的異步任務(wù),相當(dāng)于ExecutorService submit(Runnable task)方法,這兩方法的效果跟submit是一樣的,測(cè)試用例如下:

    @Test
    public void test2() throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù),有返回值
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(true){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        });
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }
 
   @Test
    public void test4() throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù),無(wú)返回值
        CompletableFuture cf = CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(false){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
            }
        });
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

 這兩方法各有一個(gè)重載版本,可以指定執(zhí)行異步任務(wù)的Executor實(shí)現(xiàn),如果不指定,默認(rèn)使用ForkJoinPool.commonPool(),如果機(jī)器是單核的,則默認(rèn)使用ThreadPerTaskExecutor,該類(lèi)是一個(gè)內(nèi)部類(lèi),每次執(zhí)行execute都會(huì)創(chuàng)建一個(gè)新線(xiàn)程。測(cè)試用例如下:

   @Test
    public void test2() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(true){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        },pool);
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }
 
@Test
    public void test4() throws Exception {
        ExecutorService executorService= Executors.newSingleThreadExecutor();
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture cf = CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(false){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
            }
        },executorService);
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

二、異步回調(diào)

1、thenApply / thenApplyAsync

thenApply 表示某個(gè)任務(wù)執(zhí)行完成后執(zhí)行的動(dòng)作,即回調(diào)方法,會(huì)將該任務(wù)的執(zhí)行結(jié)果即方法返回值作為入?yún)鬟f到回調(diào)方法中,測(cè)試用例如下:

@Test
    public void test5() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        },pool);
        //cf關(guān)聯(lián)的異步任務(wù)的返回值作為方法入?yún)?,傳入到thenApply的方法中
        //thenApply這里實(shí)際創(chuàng)建了一個(gè)新的CompletableFuture實(shí)例
        CompletableFuture<String> cf2=cf.thenApply((result)->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return "test:"+result;
        });
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("run result->"+cf.get());
        System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

其執(zhí)行結(jié)果如下:

job1執(zhí)行結(jié)束后,將job1的方法返回值作為入?yún)鬟f到j(luò)ob2中并立即執(zhí)行job2。thenApplyAsync與thenApply的區(qū)別在于,前者是將job2提交到線(xiàn)程池中異步執(zhí)行,實(shí)際執(zhí)行job2的線(xiàn)程可能是另外一個(gè)線(xiàn)程,后者是由執(zhí)行job1的線(xiàn)程立即執(zhí)行job2,即兩個(gè)job都是同一個(gè)線(xiàn)程執(zhí)行的。將上述測(cè)試用例中thenApply改成thenApplyAsync后,執(zhí)行結(jié)果如下:

從輸出可知,執(zhí)行job1和job2是兩個(gè)不同的線(xiàn)程。thenApplyAsync有一個(gè)重載版本,可以指定執(zhí)行異步任務(wù)的Executor實(shí)現(xiàn),如果不指定,默認(rèn)使用ForkJoinPool.commonPool()。 下述的多個(gè)方法,每個(gè)方法都有兩個(gè)以Async結(jié)尾的方法,一個(gè)使用默認(rèn)的Executor實(shí)現(xiàn),一個(gè)使用指定的Executor實(shí)現(xiàn),不帶Async的方法是由觸發(fā)該任務(wù)的線(xiàn)程執(zhí)行該任務(wù),帶Async的方法是由觸發(fā)該任務(wù)的線(xiàn)程將任務(wù)提交到線(xiàn)程池,執(zhí)行任務(wù)的線(xiàn)程跟觸發(fā)任務(wù)的線(xiàn)程不一定是同一個(gè)。

2、thenAccept / thenRun

thenAccept 同 thenApply 接收上一個(gè)任務(wù)的返回值作為參數(shù),但是無(wú)返回值;thenRun 的方法沒(méi)有入?yún)?,也買(mǎi)有返回值,測(cè)試用例如下:

@Test
    public void test6() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        },pool);
        //cf關(guān)聯(lián)的異步任務(wù)的返回值作為方法入?yún)?,傳入到thenApply的方法中
        CompletableFuture cf2=cf.thenApply((result)->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return "test:"+result;
        }).thenAccept((result)-> { //接收上一個(gè)任務(wù)的執(zhí)行結(jié)果作為入?yún)?,但是沒(méi)有返回值
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(result);
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
        }).thenRun(()->{ //無(wú)入?yún)?,也沒(méi)有返回值
            System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println("thenRun do something");
            System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
        });
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("run result->"+cf.get());
        System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
        //cf2 等待最后一個(gè)thenRun執(zhí)行完成
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

其執(zhí)行結(jié)果如下:

3、 exceptionally

exceptionally方法指定某個(gè)任務(wù)執(zhí)行異常時(shí)執(zhí)行的回調(diào)方法,會(huì)將拋出異常作為參數(shù)傳遞到回調(diào)方法中,如果該任務(wù)正常執(zhí)行則會(huì)exceptionally方法返回的CompletionStage的result就是該任務(wù)正常執(zhí)行的結(jié)果,測(cè)試用例如下:

@Test
    public void test2() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(true){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        },pool);
        //cf執(zhí)行異常時(shí),將拋出的異常作為入?yún)鬟f給回調(diào)方法
        CompletableFuture<Double> cf2= cf.exceptionally((param)->{
             System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println("error stack trace->");
            param.printStackTrace();
            System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
             return -1.1;
        });
        //cf正常執(zhí)行時(shí)執(zhí)行的邏輯,如果執(zhí)行異常則不調(diào)用此邏輯
        CompletableFuture cf3=cf.thenAccept((param)->{
            System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println("param->"+param);
            System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
        });
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成,此處無(wú)論是job2和job3都可以實(shí)現(xiàn)job2退出,主線(xiàn)程才退出,如果是cf,則主線(xiàn)程不會(huì)等待job2執(zhí)行完成自動(dòng)退出了
        //cf2.get時(shí),沒(méi)有異常,但是依然有返回值,就是cf的返回值
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

其輸出如下:

拋出異常后,只有cf2執(zhí)行了,cf3沒(méi)有執(zhí)行。將上述示例中的if(true) 改成if(false),其輸出如下:

cf2沒(méi)有指定,其result就是cf執(zhí)行的結(jié)果,理論上cf2.get應(yīng)該立即返回的,此處是等待了cf3,即job2執(zhí)行完成后才返回,具體原因且待下篇源碼分析時(shí)再做探討。 

4、whenComplete

whenComplete是當(dāng)某個(gè)任務(wù)執(zhí)行完成后執(zhí)行的回調(diào)方法,會(huì)將執(zhí)行結(jié)果或者執(zhí)行期間拋出的異常傳遞給回調(diào)方法,如果是正常執(zhí)行則異常為null,回調(diào)方法對(duì)應(yīng)的CompletableFuture的result和該任務(wù)一致,如果該任務(wù)正常執(zhí)行,則get方法返回執(zhí)行結(jié)果,如果是執(zhí)行異常,則get方法拋出異常。測(cè)試用例如下:

@Test
    public void test10() throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(false){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        });
        //cf執(zhí)行完成后會(huì)將執(zhí)行結(jié)果和執(zhí)行過(guò)程中拋出的異常傳入回調(diào)方法,如果是正常執(zhí)行的則傳入的異常為null
        CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{
            System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(b!=null){
                System.out.println("error stack trace->");
                b.printStackTrace();
            }else{
                System.out.println("run succ,result->"+a);
            }
            System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
        });
        //等待子任務(wù)執(zhí)行完成
        System.out.println("main thread start wait,time->"+System.currentTimeMillis());
        //如果cf是正常執(zhí)行的,cf2.get的結(jié)果就是cf執(zhí)行的結(jié)果
        //如果cf是執(zhí)行異常,則cf2.get會(huì)拋出異常
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

執(zhí)行結(jié)果如下:

 將上述示例中的if(false) 改成if(true),其輸出如下:

5、handle

跟whenComplete基本一致,區(qū)別在于handle的回調(diào)方法有返回值,且handle方法返回的CompletableFuture的result是回調(diào)方法的執(zhí)行結(jié)果或者回調(diào)方法執(zhí)行期間拋出的異常,與原始CompletableFuture的result無(wú)關(guān)了。測(cè)試用例如下:

 @Test
    public void test10() throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(true){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        });
        //cf執(zhí)行完成后會(huì)將執(zhí)行結(jié)果和執(zhí)行過(guò)程中拋出的異常傳入回調(diào)方法,如果是正常執(zhí)行的則傳入的異常為null
        CompletableFuture<String> cf2=cf.handle((a,b)->{
            System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(b!=null){
                System.out.println("error stack trace->");
                b.printStackTrace();
            }else{
                System.out.println("run succ,result->"+a);
            }
            System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
            if(b!=null){
                return "run error";
            }else{
                return "run succ";
            }
        });
        //等待子任務(wù)執(zhí)行完成
        System.out.println("main thread start wait,time->"+System.currentTimeMillis());
        //get的結(jié)果是cf2的返回值,跟cf沒(méi)關(guān)系了
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

 其執(zhí)行結(jié)果如下:

將上述示例中的if(true) 改成if(false),其輸出如下:

三、組合處理

1、thenCombine / thenAcceptBoth / runAfterBoth

這三個(gè)方法都是將兩個(gè)CompletableFuture組合起來(lái),只有這兩個(gè)都正常執(zhí)行完了才會(huì)執(zhí)行某個(gè)任務(wù),區(qū)別在于,thenCombine會(huì)將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)鬟f到指定方法中,且該方法有返回值;thenAcceptBoth同樣將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,但是無(wú)返回值;runAfterBoth沒(méi)有入?yún)?,也沒(méi)有返回值。注意兩個(gè)任務(wù)中只要有一個(gè)執(zhí)行異常,則將該異常信息作為指定任務(wù)的執(zhí)行結(jié)果。測(cè)試用例如下:

@Test
    public void test7() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return 3.2;
        });
        //cf和cf2的異步任務(wù)都執(zhí)行完成后,會(huì)將其執(zhí)行結(jié)果作為方法入?yún)鬟f給cf3,且有返回值
        CompletableFuture<Double> cf3=cf.thenCombine(cf2,(a,b)->{
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            System.out.println("job3 param a->"+a+",b->"+b);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
            return a+b;
        });
 
        //cf和cf2的異步任務(wù)都執(zhí)行完成后,會(huì)將其執(zhí)行結(jié)果作為方法入?yún)鬟f給cf3,無(wú)返回值
        CompletableFuture cf4=cf.thenAcceptBoth(cf2,(a,b)->{
            System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
            System.out.println("job4 param a->"+a+",b->"+b);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
        });
 
        //cf4和cf3都執(zhí)行完成后,執(zhí)行cf5,無(wú)入?yún)?,無(wú)返回值
        CompletableFuture cf5=cf4.runAfterBoth(cf3,()->{
            System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println("cf5 do something");
            System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
        });
 
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("cf run result->"+cf.get());
        System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
        System.out.println("cf5 run result->"+cf5.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

 其運(yùn)行結(jié)果如下:

job1 和 job2幾乎同時(shí)運(yùn)行,job2比job1先執(zhí)行完成,等job1退出后,job3和job4幾乎同時(shí)開(kāi)始運(yùn)行,job4先退出,等job3執(zhí)行完成,job5開(kāi)始了,等job5執(zhí)行完成后,主線(xiàn)程退出。

2、applyToEither / acceptEither / runAfterEither

這三個(gè)方法都是將兩個(gè)CompletableFuture組合起來(lái),只要其中一個(gè)執(zhí)行完了就會(huì)執(zhí)行某個(gè)任務(wù),其區(qū)別在于applyToEither會(huì)將已經(jīng)執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,并有返回值;acceptEither同樣將已經(jīng)執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,但是沒(méi)有返回值;runAfterEither沒(méi)有方法入?yún)ⅲ矝](méi)有返回值。注意兩個(gè)任務(wù)中只要有一個(gè)執(zhí)行異常,則將該異常信息作為指定任務(wù)的執(zhí)行結(jié)果。測(cè)試用例如下:

@Test
    public void test8() throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return 3.2;
        });
        //cf和cf2的異步任務(wù)都執(zhí)行完成后,會(huì)將其執(zhí)行結(jié)果作為方法入?yún)鬟f給cf3,且有返回值
        CompletableFuture<Double> cf3=cf.applyToEither(cf2,(result)->{
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            System.out.println("job3 param result->"+result);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
            return result;
        });
 
        //cf和cf2的異步任務(wù)都執(zhí)行完成后,會(huì)將其執(zhí)行結(jié)果作為方法入?yún)鬟f給cf3,無(wú)返回值
        CompletableFuture cf4=cf.acceptEither(cf2,(result)->{
            System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
            System.out.println("job4 param result->"+result);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
        });
 
        //cf4和cf3都執(zhí)行完成后,執(zhí)行cf5,無(wú)入?yún)?,無(wú)返回值
        CompletableFuture cf5=cf4.runAfterEither(cf3,()->{
            System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println("cf5 do something");
            System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
        });
 
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("cf run result->"+cf.get());
        System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
        System.out.println("cf5 run result->"+cf5.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

 其運(yùn)行結(jié)果如下:

job1 和job2 同時(shí)開(kāi)始運(yùn)行,job2先執(zhí)行完成,然后job4開(kāi)始執(zhí)行,理論上job3和job4應(yīng)該同時(shí)開(kāi)始運(yùn)行,但是此時(shí)只有job4開(kāi)始執(zhí)行了,job3是等待job1執(zhí)行完成后才開(kāi)始執(zhí)行,job4先于job3執(zhí)行完成,然后job5開(kāi)始執(zhí)行,等job5執(zhí)行完成后,主線(xiàn)程退出。上述差異且到下篇源碼分析時(shí)再做探討。

3、thenCompose

thenCompose方法會(huì)在某個(gè)任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果作為方法入?yún)⑷缓髨?zhí)行指定的方法,該方法會(huì)返回一個(gè)新的CompletableFuture實(shí)例,如果該CompletableFuture實(shí)例的result不為null,則返回一個(gè)基于該result的新的CompletableFuture實(shí)例;如果該CompletableFuture實(shí)例為null,則,然后執(zhí)行這個(gè)新任務(wù),測(cè)試用例如下:

    @Test
    public void test9() throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<String> cf2= cf.thenCompose((param)->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
                return "job3 test";
            });
        });
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("cf run result->"+cf.get());
        System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
        System.out.println("cf2 run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

其輸出如下:

job1執(zhí)行完成后job2開(kāi)始執(zhí)行,等job2執(zhí)行完成后會(huì)把job3返回,然后執(zhí)行job3,等job3執(zhí)行完成后,主線(xiàn)程退出。

4、allOf / anyOf

allOf返回的CompletableFuture是多個(gè)任務(wù)都執(zhí)行完成后才會(huì)執(zhí)行,只有有一個(gè)任務(wù)執(zhí)行異常,則返回的CompletableFuture執(zhí)行g(shù)et方法時(shí)會(huì)拋出異常,如果都是正常執(zhí)行,則get返回null。

 @Test
    public void test11() throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return 3.2;
        });
        CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1300);
            } catch (InterruptedException e) {
            }
//            throw new RuntimeException("test");
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
            return 2.2;
        });
        //allof等待所有任務(wù)執(zhí)行完成才執(zhí)行cf4,如果有一個(gè)任務(wù)異常終止,則cf4.get時(shí)會(huì)拋出異常,都是正常執(zhí)行,cf4.get返回null
        //anyOf是只有一個(gè)任務(wù)執(zhí)行完成,無(wú)論是正常執(zhí)行或者執(zhí)行異常,都會(huì)執(zhí)行cf4,cf4.get的結(jié)果就是已執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果
        CompletableFuture cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{
           if(b!=null){
               System.out.println("error stack trace->");
               b.printStackTrace();
           }else{
               System.out.println("run succ,result->"+a);
           }
        });
 
        System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis());
        //等待子任務(wù)執(zhí)行完成
        System.out.println("cf4 run result->"+cf4.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

 其輸出如下:

主線(xiàn)程等待最后一個(gè)job1執(zhí)行完成后退出。anyOf返回的CompletableFuture是多個(gè)任務(wù)只要其中一個(gè)執(zhí)行完成就會(huì)執(zhí)行,其get返回的是已經(jīng)執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果,如果該任務(wù)執(zhí)行異常,則拋出異常。將上述測(cè)試用例中allOf改成anyOf后,其輸出如下:

總結(jié)

到此這篇關(guān)于Java8中CompletableFuture用法的文章就介紹到這了,更多相關(guān)Java8 CompletableFuture用法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java list與數(shù)組之間的轉(zhuǎn)換詳細(xì)解析

    java list與數(shù)組之間的轉(zhuǎn)換詳細(xì)解析

    以下是對(duì)java中l(wèi)ist與數(shù)組之間的轉(zhuǎn)換進(jìn)行了詳細(xì)的分析介紹,需要的朋友可以過(guò)來(lái)參考下
    2013-09-09
  • 設(shè)計(jì)模式在Spring框架中的應(yīng)用匯總

    設(shè)計(jì)模式在Spring框架中的應(yīng)用匯總

    這篇文章主要介紹了設(shè)計(jì)模式在Spring框架中的應(yīng)用匯總,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • Java-JFrame-swing嵌套瀏覽器的具體步驟

    Java-JFrame-swing嵌套瀏覽器的具體步驟

    下面小編就為大家?guī)?lái)一篇Java-JFrame-swing嵌套瀏覽器的具體步驟。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-10-10
  • 解決MultipartFile.transferTo(dest) 報(bào)FileNotFoundExcep的問(wèn)題

    解決MultipartFile.transferTo(dest) 報(bào)FileNotFoundExcep的問(wèn)題

    這篇文章主要介紹了解決MultipartFile.transferTo(dest) 報(bào)FileNotFoundExcep的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • java對(duì)象初始化代碼詳解

    java對(duì)象初始化代碼詳解

    這篇文章主要介紹了java對(duì)象初始化代碼詳解,涉及實(shí)例變量的初始化,類(lèi)變量的初始化等相關(guān)介紹幾代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-11-11
  • Spring Cloud Gateway 默認(rèn)的filter功能和執(zhí)行順序介紹

    Spring Cloud Gateway 默認(rèn)的filter功能和執(zhí)行順序介紹

    這篇文章主要介紹了Spring Cloud Gateway 默認(rèn)的filter功能和執(zhí)行順序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • 解決Intellij IDEA運(yùn)行報(bào)Command line is too long的問(wèn)題

    解決Intellij IDEA運(yùn)行報(bào)Command line is too long的問(wèn)題

    這篇文章主要介紹了解決Intellij IDEA運(yùn)行報(bào)Command line is too long的問(wèn)題,本文通過(guò)兩種方案給大家詳細(xì)介紹,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-05-05
  • SpringBoot基于SpringSecurity表單登錄和權(quán)限驗(yàn)證的示例

    SpringBoot基于SpringSecurity表單登錄和權(quán)限驗(yàn)證的示例

    這篇文章主要介紹了SpringBoot基于SpringSecurity表單登錄和權(quán)限驗(yàn)證的示例。文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09
  • eclipse實(shí)現(xiàn)可認(rèn)證的DH密鑰交換協(xié)議

    eclipse實(shí)現(xiàn)可認(rèn)證的DH密鑰交換協(xié)議

    這篇文章主要介紹了eclipse實(shí)現(xiàn)可認(rèn)證的DH密鑰交換協(xié)議,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-06-06
  • Java中Tuple的功能及使用詳解

    Java中Tuple的功能及使用詳解

    Java中的Tuple是一種非常有用的數(shù)據(jù)結(jié)構(gòu),它可以讓開(kāi)發(fā)者在處理多個(gè)數(shù)據(jù)元素時(shí)更加方便和高效,通過(guò)Tuple,我們可以將多個(gè)變量打包成一個(gè)對(duì)象,從而減少了代碼量,提高了代碼可讀性,這篇文章主要介紹了Java中Tuple的功能及使用場(chǎng)景,需要的朋友可以參考下
    2024-01-01

最新評(píng)論