Java8?CompletableFuture?runAsync學(xué)習(xí)總結(jié)submit()?execute()等
一般的 Executors 的 execute以及submit
并發(fā)包下 Executors 創(chuàng)建的線程存在 一個(gè) execute(),以及三個(gè) submit()
不同的是使用 execute() 執(zhí)行的任務(wù)是沒有返回值的,使用 submit() 則是存在返回值的,這與接下里要說的 CompletableFuture.runAsync 有些類似。
測(cè)試代碼如下:
@Test public void justFor(){ ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Float> submit = executorService.submit(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); Float aFloat = null; try { aFloat = submit.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } // 根據(jù)自己需要決定是否需要調(diào)用關(guān)閉線程 // executorService.shutdown(); System.out.println("aFloat = " + aFloat); }
結(jié)果:
Thread.currentThread() = Thread[pool-2-thread-1,5,main]
aFloat = 1.03
使用 submit 可以通過 get獲取線程中任務(wù)的返回結(jié)果,可以通過對(duì)象獲取當(dāng)前狀態(tài) isDone 或者 isCancelled ;
子線程異步執(zhí)行,主線程休眠等待子線程執(zhí)行完成,子線程執(zhí)行完成后喚醒主線程,主線程獲取任務(wù)執(zhí)行結(jié)果后退出
此時(shí)我加入一個(gè)異常代碼,使其必定出錯(cuò)再來看看結(jié)果
@Test public void justFor(){ ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Float> submit = executorService.submit(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); int ii = 1/0; return 1.2f; }); Float aFloat = null; try { aFloat = submit.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } // executorService.shutdown(); System.out.println("aFloat = " + aFloat); }
執(zhí)行結(jié)果:
此時(shí)即使異常依舊終止了子線程以及主線程的執(zhí)行。
CompletableFuture 的 supplyAsync() / runAsync()
supplyAsync
表示創(chuàng)建帶返回值的異步任務(wù),相當(dāng)于ExecutorService submit(Callable< T> task)runAsync
表示創(chuàng)建無(wú)返回值的異步任務(wù),相當(dāng)于ExecutorService submit(Runnable task)方法,這兩個(gè)方法效果與 submit 一致
示例代碼:
@Test public void justFor(){ CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); try { Float aFloat = floatCompletableFuture.get(); System.out.println("Thread.currentThread() = " + Thread.currentThread()); System.out.println("aFloat = " + aFloat); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
輸出結(jié)果:
Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread.currentThread() = Thread[main,5,main]
aFloat = 1.03
日志中 ForkJoinPool 為jdk1.7 提供的一個(gè)新的分而治之的性能更好的并發(fā)處理線程池,比一般的Executors 更好一點(diǎn),適用于高密度計(jì)算的任務(wù)。
但也可以如此寫
即將該任務(wù)提交到指定的線程池中執(zhí)行該任務(wù);
輸出的線程池不一致
類似的 runAsync() 也可以這樣,使用自己的異步線程或者提交到指定的線程池中執(zhí)行
可以看得出使用第二個(gè)參數(shù)均提供了可以指定 Executor 沒有指定時(shí)默認(rèn)使用 ForkJoinPool.commonPool()
一般的
runAsync 如下:
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return; });
可以看得出 并沒有任何返回值
CompletableFuture 的 thenApply() / thenApplyAsync()
thenApply 表示某個(gè)任務(wù)執(zhí)行完成后執(zhí)行的動(dòng)作即回調(diào)方法,會(huì)將該任務(wù)的執(zhí)行結(jié)果即方法的返回值會(huì)作為作為入?yún)鬟f到接下來的回調(diào)方法中
示例代碼:
@Test public void justFor(){ CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); CompletableFuture<Float> floatCompletableFuture1 = floatCompletableFuture.thenApply((resultFloat) -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); System.out.println("接受上一個(gè) resultFloat = " + resultFloat); return 2.01f; }); CompletableFuture<Float> floatCompletableFuture2 = floatCompletableFuture1.thenApplyAsync((result2) -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); System.out.println("result2 = " + result2); return 2.21f; }); try { Float aFloat = floatCompletableFuture.get(); System.out.println("Thread.currentThread() = " + Thread.currentThread()); System.out.println("aFloat = " + aFloat); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
輸出結(jié)果:
Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread.currentThread() = Thread[main,5,main]
接受上一個(gè) resultFloat = 1.03
Thread.currentThread() = Thread[main,5,main]
Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
aFloat = 1.03
result2 = 2.01
thenApplyAsync 與 thenApply 區(qū)別:
thenApplyAsync
將任務(wù)異步處理,可以選擇提交到某一個(gè)線程池中執(zhí)行thenApply
則將會(huì)在上一個(gè)任務(wù)的同一個(gè)線程中執(zhí)行
上面代碼也可以連著書寫如下:
CompletableFuture 的 thenAccept() / thenRun()
thenAccept
同thenApply
接收上一個(gè)任務(wù)的返回值作為參數(shù)但是沒有返回值thenAcceptAsync
同上但為異步線程,可以指定提交到某一個(gè)線程池中thenRun
方法沒有入?yún)?,也沒有返回值thenRunAsync
同上但為異步線程,可以指定提交到某一個(gè)線程池中
示例代碼:
@Test public void justFor(){ CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); CompletableFuture<Void> floatCompletableFuture1= floatCompletableFuture.thenApply((resultFloat) -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); System.out.println("接受上一個(gè) resultFloat = " + resultFloat); return 2.01f; }).thenAccept((result)->{ System.out.println("Thread.currentThread() = " + Thread.currentThread()); System.out.println("result = " + result); }).thenRun(()->{ System.out.println("Thread.currentThread() = " + Thread.currentThread()); System.out.println(" doNothing"); }); }
CompletableFuture exceptionally
指定某個(gè)任務(wù)執(zhí)行異常時(shí)執(zhí)行的回調(diào)方法,會(huì)將拋出異常作為參數(shù)傳遞到回調(diào)方法中,如果該任務(wù)正常執(zhí)行則 exceptionally方法返回的CompletionStage的result就是該任務(wù)正常執(zhí)行的結(jié)果
正常示例:
@Test public void justFor(){ CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); // float ii = 1/0; return 1.03f; }); CompletableFuture<Float> exceptionally = floatCompletableFuture.exceptionally((exception) -> { System.out.println("catch exception"); exception.printStackTrace(); return 0.0f; }); floatCompletableFuture.thenAccept((result)->{ System.out.println("is OK"); System.out.println("result = " + result); }); }
輸出結(jié)果:
Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
is OK
異常示例:
@Test public void justFor(){ CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); int a = 121/0; return 1.03f; }); CompletableFuture<Float> exceptionally = floatCompletableFuture.exceptionally((exception) -> { System.out.println("catch exception"); exception.printStackTrace(); return 0.0f; }); floatCompletableFuture.thenAccept((result)->{ System.out.println("is OK"); System.out.println("result = " + result); }); }
結(jié)果:
Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
catch exception
CompletableFuture whenComplete
當(dāng)某個(gè)任務(wù)執(zhí)行完成后執(zhí)行的回調(diào)方法,會(huì)將執(zhí)行結(jié)果或者執(zhí)行期間拋出的異常傳遞給回調(diào)方法
- 正常執(zhí)行則異常為null,回調(diào)方法對(duì)應(yīng)的CompletableFuture的result和該任務(wù)一致
- 異常執(zhí)行,則get方法拋出異常
- 同樣提供 Async 異步相關(guān)方法
正常:
@Test public void justFor(){ CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); floatCompletableFuture.whenComplete((result, exception) -> { System.out.println("result = " + result); System.out.println("exception = " + exception); }); }
輸出:
Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
result = 1.03
exception = null
異常時(shí)示例:
public static void main(String[] args) { CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); int ii = 12 / 0; return 1.03f; }); floatCompletableFuture.whenComplete((result, exception) -> { System.out.println("result = " + result); System.out.println("exception = " + exception); }); }
輸出:
Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
result = null
exception = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
CompletableFuture handle
與 whenComplete 基本一致
區(qū)別在于handle的回調(diào)方法有返回值,且handle方法返回的CompletableFuture的result是回調(diào)方法的執(zhí)行結(jié)果或者回調(diào)方法執(zhí)行期間拋出的異常,與原始CompletableFuture的result無(wú)關(guān)
示例代碼:
@Test public void justFor(){ CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); int ii = 12 / 0; return 1.03f; }); floatCompletableFuture.handle((result, exception) -> { System.out.println("result = " + result); System.out.println("exception = " + exception); return "???"; }); }
CompletableFuture 組合處理 thenCombine / thenAcceptBoth / runAfterBoth
三個(gè)方法都是將兩個(gè) CompletableFuture 組合起來
只有這兩個(gè)都正常執(zhí)行完了才會(huì)執(zhí)行某個(gè)任務(wù)區(qū)別在于
thenCombine
會(huì)將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)鬟f到指定方法中,且該方法有返回值;thenAcceptBoth 同樣將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,但是無(wú)返回值;runAfterBoth
沒有入?yún)?,也沒有返回值。注意兩個(gè)任務(wù)中只要有一個(gè)執(zhí)行異常,則將該異常信息作為指定任務(wù)的執(zhí)行結(jié)果
同時(shí)這些也提供了Async 異步方法
示例代碼:
@Test public void justFor(){ CompletableFuture<Float> a1 = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); CompletableFuture<Float> a2 = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 2.03f; }); // 傳遞結(jié)果 有返回值 CompletableFuture<String> objectCompletableFuture = a1.thenCombine(a2, (a, b) -> { return "12"; }); // 傳遞結(jié)果 無(wú)返回值 CompletableFuture<Void> voidCompletableFuture = a1.thenAcceptBoth(a2, (a, b) -> { }); // 無(wú)入?yún)?無(wú)返回值 a1.runAfterBoth(a2, ()->{ // }); }
CompletableFuture applyToEither / acceptEither / runAfterEither
三個(gè)方法都是將兩個(gè)CompletableFuture組合起來
但與上面不同的是只要其中一個(gè)執(zhí)行完了就會(huì)執(zhí)行某個(gè)任務(wù),區(qū)別
applyToEither
會(huì)將已經(jīng)執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,并有返回值?/li>acceptEither
同樣將已經(jīng)執(zhí)行完成的任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,但是沒有返回值;runAfterEither 沒有方法入?yún)?,也沒有返回值。
注意 兩個(gè)任務(wù)中只要有一個(gè)執(zhí)行異常,則將該異常信息作為指定任務(wù)的執(zhí)行結(jié)果
同時(shí)這些也提供了Async 異步方法
示例代碼:
@Test public void justFor(){ CompletableFuture<Float> a1 = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); CompletableFuture<Float> a2 = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 2.03f; }); // 傳遞結(jié)果 有返回值 CompletableFuture<String> objectCompletableFuture = a1.thenCombine(a2, (a, b) -> { return "12"; }); // 傳遞結(jié)果 無(wú)返回值 CompletableFuture<Void> voidCompletableFuture = a1.thenAcceptBoth(a2, (a, b) -> { }); // 無(wú)入?yún)?無(wú)返回值 a1.runAfterBoth(a2, ()->{ // }); }
CompletableFuture thenCompose
thenCompose
- 在某個(gè)任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果作為方法入?yún)⑷缓髨?zhí)行指定方法,該方法會(huì)返回一個(gè)新的CompletableFuture實(shí)例
- 如果該CompletableFuture實(shí)例的result不為null,則返回一個(gè)基于該result的新的CompletableFuture實(shí)例;
- 如果該CompletableFuture實(shí)例為null,則執(zhí)行這個(gè)新任務(wù)
同樣的提供Async方式
@Test public void justFor(){ CompletableFuture<Float> a1 = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); CompletableFuture<Float> a2 = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 2.03f; }); // 傳遞結(jié)果 有返回值 CompletableFuture<String> objectCompletableFuture = a1.applyToEither(a2, (b) -> { return "12"; }); // 傳遞結(jié)果 無(wú)返回值 CompletableFuture<Void> voidCompletableFuture = a1.acceptEither(a2, (b) -> { }); // 無(wú)入?yún)?無(wú)返回值 a1.runAfterEither(a2, ()->{ // }); }
CompletableFuture 的 allOf() anyOf()
allOf
返回的CompletableFuture是多個(gè)任務(wù)都執(zhí)行完成后才會(huì)執(zhí)行,只要有一個(gè)任務(wù)執(zhí)行異常,則返回的 CompletableFuture 執(zhí)行g(shù)et方法時(shí)會(huì)拋出異常,如果都正常執(zhí)行,則get返回nullanyOf
只要有一個(gè)任務(wù)執(zhí)行完成,無(wú)論是正常執(zhí)行或者執(zhí)行異常,都會(huì)執(zhí)行向下執(zhí)行
示例代碼:
@Test public void justFor(){ CompletableFuture<Float> a1 = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 1.03f; }); CompletableFuture<Float> a2 = CompletableFuture.supplyAsync(() -> { System.out.println("Thread.currentThread() = " + Thread.currentThread()); return 2.03f; }); // 傳遞結(jié)果 有返回值 CompletableFuture<String> objectCompletableFuture = a1.applyToEither(a2, (b) -> { return "12"; }); // 傳遞結(jié)果 無(wú)返回值 CompletableFuture<Void> voidCompletableFuture = a1.acceptEither(a2, (b) -> { }); // 無(wú)入?yún)?無(wú)返回值 a1.runAfterEither(a2, ()->{ // }); }
參考文章
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- Java?CompletableFuture實(shí)現(xiàn)多線程異步編排
- 詳解Java8?CompletableFuture的并行處理用法
- Java8 使用工廠方法supplyAsync創(chuàng)建CompletableFuture實(shí)例
- Java8 自定義CompletableFuture的原理解析
- Java8 CompletableFuture 異步執(zhí)行操作
- Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn)
- Java8新的異步編程方式CompletableFuture實(shí)現(xiàn)
- Java8 CompletableFuture詳解
- Java中的CompletableFuture原理與用法
相關(guān)文章
java異步調(diào)用的4種實(shí)現(xiàn)方法
日常開發(fā)中,會(huì)經(jīng)常遇到說,前臺(tái)調(diào)服務(wù),本文主要介紹了java異步調(diào)用的4種實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03SpringCloud?hystrix斷路器與全局解耦全面介紹
什么是服務(wù)降級(jí)?當(dāng)服務(wù)器壓力劇增的情況下,根據(jù)實(shí)際業(yè)務(wù)情況及流量,對(duì)一些服務(wù)和頁(yè)面有策略的不處理或換種簡(jiǎn)單的方式處理,從而釋放服務(wù)器資源以保證核心交易正常運(yùn)作或高效運(yùn)作2022-10-10springboot整合gateway實(shí)現(xiàn)網(wǎng)關(guān)功能的示例代碼
本文主要介紹了springboot整合gateway實(shí)現(xiàn)網(wǎng)關(guān)功能的示例代碼,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02詳解關(guān)于Windows10 Java環(huán)境變量配置問題的解決辦法
這篇文章主要介紹了關(guān)于Windows10 Java環(huán)境變量配置問題的解決辦法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03