Java8中CompletableFuture的用法全解
前言
CompletableFuture實(shí)現(xiàn)了CompletionStage接口和Future接口,前者是對(duì)后者的一個(gè)擴(kuò)展,增加了異步回調(diào)、流式處理、多個(gè)Future組合處理的能力,使Java在處理多任務(wù)的協(xié)同工作時(shí)更加順暢便利。
一、創(chuàng)建異步任務(wù)
1、Future.submit
通常的線(xiàn)程池接口類(lèi)ExecutorService,其中execute方法的返回值是void,即無(wú)法獲取異步任務(wù)的執(zhí)行狀態(tài),3個(gè)重載的submit方法的返回值是Future,可以據(jù)此獲取任務(wù)執(zhí)行的狀態(tài)和結(jié)果,示例如下:
@Test public void test3() throws Exception { // 創(chuàng)建異步執(zhí)行任務(wù): ExecutorService executorService= Executors.newSingleThreadExecutor(); Future<Double> cf = executorService.submit(()->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(false){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); return 1.2; } }); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成,如果已完成則直接返回結(jié)果 //如果執(zhí)行任務(wù)異常,則get方法會(huì)把之前捕獲的異常重新拋出 System.out.println("run result->"+cf.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
執(zhí)行結(jié)果如下:
子線(xiàn)程是異步執(zhí)行的,主線(xiàn)程休眠等待子線(xiàn)程執(zhí)行完成,子線(xiàn)程執(zhí)行完成后喚醒主線(xiàn)程,主線(xiàn)程獲取任務(wù)執(zhí)行結(jié)果后退出。
很多博客說(shuō)使用不帶等待時(shí)間限制的get方法時(shí),如果子線(xiàn)程執(zhí)行異常了會(huì)導(dǎo)致主線(xiàn)程長(zhǎng)期阻塞,這其實(shí)是錯(cuò)誤的,子線(xiàn)程執(zhí)行異常時(shí)其異常會(huì)被捕獲,然后修改任務(wù)的狀態(tài)為異常結(jié)束并喚醒等待的主線(xiàn)程,get方法判斷任務(wù)狀態(tài)發(fā)生變更,就終止等待了,并拋出異常,可參考《Java8 AbstractExecutorService 和 FutureTask 源碼解析》中FutureTask的實(shí)現(xiàn)。將上述用例中if(false)改成if(true) ,執(zhí)行結(jié)果如下:
get方法拋出異常導(dǎo)致主線(xiàn)程異常終止。
2、supplyAsync / runAsync
supplyAsync表示創(chuàng)建帶返回值的異步任務(wù)的,相當(dāng)于ExecutorService submit(Callable<T> task) 方法,runAsync表示創(chuàng)建無(wú)返回值的異步任務(wù),相當(dāng)于ExecutorService submit(Runnable task)方法,這兩方法的效果跟submit是一樣的,測(cè)試用例如下:
@Test public void test2() throws Exception { // 創(chuàng)建異步執(zhí)行任務(wù),有返回值 CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); return 1.2; } }); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); } @Test public void test4() throws Exception { // 創(chuàng)建異步執(zhí)行任務(wù),無(wú)返回值 CompletableFuture cf = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(false){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); } }); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
這兩方法各有一個(gè)重載版本,可以指定執(zhí)行異步任務(wù)的Executor實(shí)現(xiàn),如果不指定,默認(rèn)使用ForkJoinPool.commonPool(),如果機(jī)器是單核的,則默認(rèn)使用ThreadPerTaskExecutor,該類(lèi)是一個(gè)內(nèi)部類(lèi),每次執(zhí)行execute都會(huì)創(chuàng)建一個(gè)新線(xiàn)程。測(cè)試用例如下:
@Test public void test2() throws Exception { ForkJoinPool pool=new ForkJoinPool(); // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); return 1.2; } },pool); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); } @Test public void test4() throws Exception { ExecutorService executorService= Executors.newSingleThreadExecutor(); // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture cf = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(false){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); } },executorService); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
二、異步回調(diào)
1、thenApply / thenApplyAsync
thenApply 表示某個(gè)任務(wù)執(zhí)行完成后執(zhí)行的動(dòng)作,即回調(diào)方法,會(huì)將該任務(wù)的執(zhí)行結(jié)果即方法返回值作為入?yún)鬟f到回調(diào)方法中,測(cè)試用例如下:
@Test public void test5() throws Exception { ForkJoinPool pool=new ForkJoinPool(); // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; },pool); //cf關(guān)聯(lián)的異步任務(wù)的返回值作為方法入?yún)?,傳入到thenApply的方法中 //thenApply這里實(shí)際創(chuàng)建了一個(gè)新的CompletableFuture實(shí)例 CompletableFuture<String> cf2=cf.thenApply((result)->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return "test:"+result; }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis()); System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
其執(zhí)行結(jié)果如下:
job1執(zhí)行結(jié)束后,將job1的方法返回值作為入?yún)鬟f到j(luò)ob2中并立即執(zhí)行job2。thenApplyAsync與thenApply的區(qū)別在于,前者是將job2提交到線(xiàn)程池中異步執(zhí)行,實(shí)際執(zhí)行job2的線(xiàn)程可能是另外一個(gè)線(xiàn)程,后者是由執(zhí)行job1的線(xiàn)程立即執(zhí)行job2,即兩個(gè)job都是同一個(gè)線(xiàn)程執(zhí)行的。將上述測(cè)試用例中thenApply改成thenApplyAsync后,執(zhí)行結(jié)果如下:
從輸出可知,執(zhí)行job1和job2是兩個(gè)不同的線(xiàn)程。thenApplyAsync有一個(gè)重載版本,可以指定執(zhí)行異步任務(wù)的Executor實(shí)現(xiàn),如果不指定,默認(rèn)使用ForkJoinPool.commonPool()。 下述的多個(gè)方法,每個(gè)方法都有兩個(gè)以Async結(jié)尾的方法,一個(gè)使用默認(rèn)的Executor實(shí)現(xiàn),一個(gè)使用指定的Executor實(shí)現(xiàn),不帶Async的方法是由觸發(fā)該任務(wù)的線(xiàn)程執(zhí)行該任務(wù),帶Async的方法是由觸發(fā)該任務(wù)的線(xiàn)程將任務(wù)提交到線(xiàn)程池,執(zhí)行任務(wù)的線(xiàn)程跟觸發(fā)任務(wù)的線(xiàn)程不一定是同一個(gè)。
2、thenAccept / thenRun
thenAccept 同 thenApply 接收上一個(gè)任務(wù)的返回值作為參數(shù),但是無(wú)返回值;thenRun 的方法沒(méi)有入?yún)?,也買(mǎi)有返回值,測(cè)試用例如下:
@Test public void test6() throws Exception { ForkJoinPool pool=new ForkJoinPool(); // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; },pool); //cf關(guān)聯(lián)的異步任務(wù)的返回值作為方法入?yún)?,傳入到thenApply的方法中 CompletableFuture cf2=cf.thenApply((result)->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return "test:"+result; }).thenAccept((result)-> { //接收上一個(gè)任務(wù)的執(zhí)行結(jié)果作為入?yún)?,但是沒(méi)有返回值 System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(result); System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis()); }).thenRun(()->{ //無(wú)入?yún)?,也沒(méi)有返回值 System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("thenRun do something"); System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis()); }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis()); //cf2 等待最后一個(gè)thenRun執(zhí)行完成 System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
其執(zhí)行結(jié)果如下:
3、 exceptionally
exceptionally方法指定某個(gè)任務(wù)執(zhí)行異常時(shí)執(zhí)行的回調(diào)方法,會(huì)將拋出異常作為參數(shù)傳遞到回調(diào)方法中,如果該任務(wù)正常執(zhí)行則會(huì)exceptionally方法返回的CompletionStage的result就是該任務(wù)正常執(zhí)行的結(jié)果,測(cè)試用例如下:
@Test public void test2() throws Exception { ForkJoinPool pool=new ForkJoinPool(); // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis()); return 1.2; } },pool); //cf執(zhí)行異常時(shí),將拋出的異常作為入?yún)鬟f給回調(diào)方法 CompletableFuture<Double> cf2= cf.exceptionally((param)->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("error stack trace->"); param.printStackTrace(); System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); return -1.1; }); //cf正常執(zhí)行時(shí)執(zhí)行的邏輯,如果執(zhí)行異常則不調(diào)用此邏輯 CompletableFuture cf3=cf.thenAccept((param)->{ System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("param->"+param); System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis()); }); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成,此處無(wú)論是job2和job3都可以實(shí)現(xiàn)job2退出,主線(xiàn)程才退出,如果是cf,則主線(xiàn)程不會(huì)等待job2執(zhí)行完成自動(dòng)退出了 //cf2.get時(shí),沒(méi)有異常,但是依然有返回值,就是cf的返回值 System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
其輸出如下:
拋出異常后,只有cf2執(zhí)行了,cf3沒(méi)有執(zhí)行。將上述示例中的if(true) 改成if(false),其輸出如下:
cf2沒(méi)有指定,其result就是cf執(zhí)行的結(jié)果,理論上cf2.get應(yīng)該立即返回的,此處是等待了cf3,即job2執(zhí)行完成后才返回,具體原因且待下篇源碼分析時(shí)再做探討。
4、whenComplete
whenComplete是當(dāng)某個(gè)任務(wù)執(zhí)行完成后執(zhí)行的回調(diào)方法,會(huì)將執(zhí)行結(jié)果或者執(zhí)行期間拋出的異常傳遞給回調(diào)方法,如果是正常執(zhí)行則異常為null,回調(diào)方法對(duì)應(yīng)的CompletableFuture的result和該任務(wù)一致,如果該任務(wù)正常執(zhí)行,則get方法返回執(zhí)行結(jié)果,如果是執(zhí)行異常,則get方法拋出異常。測(cè)試用例如下:
@Test public void test10() throws Exception { // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(false){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis()); return 1.2; } }); //cf執(zhí)行完成后會(huì)將執(zhí)行結(jié)果和執(zhí)行過(guò)程中拋出的異常傳入回調(diào)方法,如果是正常執(zhí)行的則傳入的異常為null CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{ System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(b!=null){ System.out.println("error stack trace->"); b.printStackTrace(); }else{ System.out.println("run succ,result->"+a); } System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis()); }); //等待子任務(wù)執(zhí)行完成 System.out.println("main thread start wait,time->"+System.currentTimeMillis()); //如果cf是正常執(zhí)行的,cf2.get的結(jié)果就是cf執(zhí)行的結(jié)果 //如果cf是執(zhí)行異常,則cf2.get會(huì)拋出異常 System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
執(zhí)行結(jié)果如下:
將上述示例中的if(false) 改成if(true),其輸出如下:
5、handle
跟whenComplete基本一致,區(qū)別在于handle的回調(diào)方法有返回值,且handle方法返回的CompletableFuture的result是回調(diào)方法的執(zhí)行結(jié)果或者回調(diào)方法執(zhí)行期間拋出的異常,與原始CompletableFuture的result無(wú)關(guān)了。測(cè)試用例如下:
@Test public void test10() throws Exception { // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis()); return 1.2; } }); //cf執(zhí)行完成后會(huì)將執(zhí)行結(jié)果和執(zhí)行過(guò)程中拋出的異常傳入回調(diào)方法,如果是正常執(zhí)行的則傳入的異常為null CompletableFuture<String> cf2=cf.handle((a,b)->{ System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(b!=null){ System.out.println("error stack trace->"); b.printStackTrace(); }else{ System.out.println("run succ,result->"+a); } System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis()); if(b!=null){ return "run error"; }else{ return "run succ"; } }); //等待子任務(wù)執(zhí)行完成 System.out.println("main thread start wait,time->"+System.currentTimeMillis()); //get的結(jié)果是cf2的返回值,跟cf沒(méi)關(guān)系了 System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
其執(zhí)行結(jié)果如下:
將上述示例中的if(true) 改成if(false),其輸出如下:
三、組合處理
1、thenCombine / thenAcceptBoth / runAfterBoth
這三個(gè)方法都是將兩個(gè)CompletableFuture組合起來(lái),只有這兩個(gè)都正常執(zhí)行完了才會(huì)執(zhí)行某個(gè)任務(wù),區(qū)別在于,thenCombine會(huì)將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)鬟f到指定方法中,且該方法有返回值;thenAcceptBoth同樣將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,但是無(wú)返回值;runAfterBoth沒(méi)有入?yún)?,也沒(méi)有返回值。注意兩個(gè)任務(wù)中只要有一個(gè)執(zhí)行異常,則將該異常信息作為指定任務(wù)的執(zhí)行結(jié)果。測(cè)試用例如下:
@Test public void test7() throws Exception { ForkJoinPool pool=new ForkJoinPool(); // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; }); CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(1500); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return 3.2; }); //cf和cf2的異步任務(wù)都執(zhí)行完成后,會(huì)將其執(zhí)行結(jié)果作為方法入?yún)鬟f給cf3,且有返回值 CompletableFuture<Double> cf3=cf.thenCombine(cf2,(a,b)->{ System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis()); System.out.println("job3 param a->"+a+",b->"+b); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis()); return a+b; }); //cf和cf2的異步任務(wù)都執(zhí)行完成后,會(huì)將其執(zhí)行結(jié)果作為方法入?yún)鬟f給cf3,無(wú)返回值 CompletableFuture cf4=cf.thenAcceptBoth(cf2,(a,b)->{ System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis()); System.out.println("job4 param a->"+a+",b->"+b); try { Thread.sleep(1500); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis()); }); //cf4和cf3都執(zhí)行完成后,執(zhí)行cf5,無(wú)入?yún)?,無(wú)返回值 CompletableFuture cf5=cf4.runAfterBoth(cf3,()->{ System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis()); try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("cf5 do something"); System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis()); }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("cf run result->"+cf.get()); System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis()); System.out.println("cf5 run result->"+cf5.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
其運(yùn)行結(jié)果如下:
job1 和 job2幾乎同時(shí)運(yùn)行,job2比job1先執(zhí)行完成,等job1退出后,job3和job4幾乎同時(shí)開(kāi)始運(yùn)行,job4先退出,等job3執(zhí)行完成,job5開(kāi)始了,等job5執(zhí)行完成后,主線(xiàn)程退出。
2、applyToEither / acceptEither / runAfterEither
這三個(gè)方法都是將兩個(gè)CompletableFuture組合起來(lái),只要其中一個(gè)執(zhí)行完了就會(huì)執(zhí)行某個(gè)任務(wù),其區(qū)別在于applyToEither會(huì)將已經(jīng)執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,并有返回值;acceptEither同樣將已經(jīng)執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,但是沒(méi)有返回值;runAfterEither沒(méi)有方法入?yún)ⅲ矝](méi)有返回值。注意兩個(gè)任務(wù)中只要有一個(gè)執(zhí)行異常,則將該異常信息作為指定任務(wù)的執(zhí)行結(jié)果。測(cè)試用例如下:
@Test public void test8() throws Exception { // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; }); CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(1500); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return 3.2; }); //cf和cf2的異步任務(wù)都執(zhí)行完成后,會(huì)將其執(zhí)行結(jié)果作為方法入?yún)鬟f給cf3,且有返回值 CompletableFuture<Double> cf3=cf.applyToEither(cf2,(result)->{ System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis()); System.out.println("job3 param result->"+result); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis()); return result; }); //cf和cf2的異步任務(wù)都執(zhí)行完成后,會(huì)將其執(zhí)行結(jié)果作為方法入?yún)鬟f給cf3,無(wú)返回值 CompletableFuture cf4=cf.acceptEither(cf2,(result)->{ System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis()); System.out.println("job4 param result->"+result); try { Thread.sleep(1500); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis()); }); //cf4和cf3都執(zhí)行完成后,執(zhí)行cf5,無(wú)入?yún)?,無(wú)返回值 CompletableFuture cf5=cf4.runAfterEither(cf3,()->{ System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis()); try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("cf5 do something"); System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis()); }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("cf run result->"+cf.get()); System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis()); System.out.println("cf5 run result->"+cf5.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
其運(yùn)行結(jié)果如下:
job1 和job2 同時(shí)開(kāi)始運(yùn)行,job2先執(zhí)行完成,然后job4開(kāi)始執(zhí)行,理論上job3和job4應(yīng)該同時(shí)開(kāi)始運(yùn)行,但是此時(shí)只有job4開(kāi)始執(zhí)行了,job3是等待job1執(zhí)行完成后才開(kāi)始執(zhí)行,job4先于job3執(zhí)行完成,然后job5開(kāi)始執(zhí)行,等job5執(zhí)行完成后,主線(xiàn)程退出。上述差異且到下篇源碼分析時(shí)再做探討。
3、thenCompose
thenCompose方法會(huì)在某個(gè)任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果作為方法入?yún)⑷缓髨?zhí)行指定的方法,該方法會(huì)返回一個(gè)新的CompletableFuture實(shí)例,如果該CompletableFuture實(shí)例的result不為null,則返回一個(gè)基于該result的新的CompletableFuture實(shí)例;如果該CompletableFuture實(shí)例為null,則,然后執(zhí)行這個(gè)新任務(wù),測(cè)試用例如下:
@Test public void test9() throws Exception { // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; }); CompletableFuture<String> cf2= cf.thenCompose((param)->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis()); return "job3 test"; }); }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("cf run result->"+cf.get()); System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis()); System.out.println("cf2 run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
其輸出如下:
job1執(zhí)行完成后job2開(kāi)始執(zhí)行,等job2執(zhí)行完成后會(huì)把job3返回,然后執(zhí)行job3,等job3執(zhí)行完成后,主線(xiàn)程退出。
4、allOf / anyOf
allOf返回的CompletableFuture是多個(gè)任務(wù)都執(zhí)行完成后才會(huì)執(zhí)行,只有有一個(gè)任務(wù)執(zhí)行異常,則返回的CompletableFuture執(zhí)行g(shù)et方法時(shí)會(huì)拋出異常,如果都是正常執(zhí)行,則get返回null。
@Test public void test11() throws Exception { // 創(chuàng)建異步執(zhí)行任務(wù): CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; }); CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(1500); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return 3.2; }); CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis()); try { Thread.sleep(1300); } catch (InterruptedException e) { } // throw new RuntimeException("test"); System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis()); return 2.2; }); //allof等待所有任務(wù)執(zhí)行完成才執(zhí)行cf4,如果有一個(gè)任務(wù)異常終止,則cf4.get時(shí)會(huì)拋出異常,都是正常執(zhí)行,cf4.get返回null //anyOf是只有一個(gè)任務(wù)執(zhí)行完成,無(wú)論是正常執(zhí)行或者執(zhí)行異常,都會(huì)執(zhí)行cf4,cf4.get的結(jié)果就是已執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果 CompletableFuture cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{ if(b!=null){ System.out.println("error stack trace->"); b.printStackTrace(); }else{ System.out.println("run succ,result->"+a); } }); System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis()); //等待子任務(wù)執(zhí)行完成 System.out.println("cf4 run result->"+cf4.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
其輸出如下:
主線(xiàn)程等待最后一個(gè)job1執(zhí)行完成后退出。anyOf返回的CompletableFuture是多個(gè)任務(wù)只要其中一個(gè)執(zhí)行完成就會(huì)執(zhí)行,其get返回的是已經(jīng)執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果,如果該任務(wù)執(zhí)行異常,則拋出異常。將上述測(cè)試用例中allOf改成anyOf后,其輸出如下:
總結(jié)
到此這篇關(guān)于Java8中CompletableFuture用法的文章就介紹到這了,更多相關(guān)Java8 CompletableFuture用法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java list與數(shù)組之間的轉(zhuǎn)換詳細(xì)解析
以下是對(duì)java中l(wèi)ist與數(shù)組之間的轉(zhuǎn)換進(jìn)行了詳細(xì)的分析介紹,需要的朋友可以過(guò)來(lái)參考下2013-09-09設(shè)計(jì)模式在Spring框架中的應(yīng)用匯總
這篇文章主要介紹了設(shè)計(jì)模式在Spring框架中的應(yīng)用匯總,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11解決MultipartFile.transferTo(dest) 報(bào)FileNotFoundExcep的問(wèn)題
這篇文章主要介紹了解決MultipartFile.transferTo(dest) 報(bào)FileNotFoundExcep的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Spring Cloud Gateway 默認(rèn)的filter功能和執(zhí)行順序介紹
這篇文章主要介紹了Spring Cloud Gateway 默認(rèn)的filter功能和執(zhí)行順序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10解決Intellij IDEA運(yùn)行報(bào)Command line is too long的問(wèn)題
這篇文章主要介紹了解決Intellij IDEA運(yùn)行報(bào)Command line is too long的問(wèn)題,本文通過(guò)兩種方案給大家詳細(xì)介紹,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05SpringBoot基于SpringSecurity表單登錄和權(quán)限驗(yàn)證的示例
這篇文章主要介紹了SpringBoot基于SpringSecurity表單登錄和權(quán)限驗(yàn)證的示例。文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09eclipse實(shí)現(xiàn)可認(rèn)證的DH密鑰交換協(xié)議
這篇文章主要介紹了eclipse實(shí)現(xiàn)可認(rèn)證的DH密鑰交換協(xié)議,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-06-06