java?CompletableFuture異步任務(wù)編排示例詳解
前言
在之前的項(xiàng)目開發(fā)中,都沒怎么使用過CompletableFuture的功能,只聽說過和異步編程有關(guān)。為了能夠在將來有需要的時(shí)候用得上,這兩天花了點(diǎn)時(shí)間學(xué)習(xí)了一下,并簡(jiǎn)單地總結(jié)一下如何使用CompletableFuture完成異步任務(wù)編排。
先創(chuàng)建一個(gè)自定義的線程池,后續(xù)所有代碼都會(huì)使用到:
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() { private final AtomicInteger THREAD_NUM = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); // 設(shè)置為守護(hù)線程,main線程結(jié)束就跟著一起結(jié)束,否則main函數(shù)結(jié)束jvm還在 t.setDaemon(true); t.setName("completable-future-test-Thread-" + THREAD_NUM.incrementAndGet()); return t; } }, new ThreadPoolExecutor.AbortPolicy());
同步串行
同步串行代表任務(wù)1、任務(wù)2、任務(wù)3按時(shí)間先后順序執(zhí)行,并且都是同一個(gè)線程來執(zhí)行。
示例代碼如下:
CompletableFuture .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR) .thenApply( (task1Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println("拿到上一個(gè)任務(wù)的返回值:" + task1Result); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }) .thenAccept( (task2Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println("拿到上一個(gè)任務(wù)的返回值:" + task2Result); System.out.println(taskName + "執(zhí)行結(jié)束"); });
執(zhí)行結(jié)果:
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task2
拿到上一個(gè)任務(wù)的返回值:task1
task2執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task3
正在執(zhí)行任務(wù)task3
拿到上一個(gè)任務(wù)的返回值:task2
task3執(zhí)行結(jié)束
1.入口函數(shù)supplyAsync()
代表一個(gè)異步的有返回值的函數(shù),之所以異步,是與主線程區(qū)別,從線程池中的拿一個(gè)線程來執(zhí)行。
2.thenApply()
和thenAccept()
沒有Async
,意味著是和前面的任務(wù)共用一個(gè)線程,從執(zhí)行結(jié)果上我們也可以看到線程名稱相同。
3.thenApply()
需要接收上一個(gè)任務(wù)的返回值,并且自己也要有返回值。
4.thenAccept()
需要接收上一個(gè)任務(wù)的返回值,但是它不需要返回值。
異步串行
異步串行代表任務(wù)1、任務(wù)2、任務(wù)3按時(shí)間先后順序執(zhí)行,并由不同的線程來執(zhí)行。
示例代碼如下:
CompletableFuture // 有返回值 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR) // 需要上一個(gè)任務(wù)的返回值,并且自身有返回值 .thenApplyAsync( (task1Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println("拿到上一個(gè)任務(wù)的返回值:" + task1Result); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR) // 不需要上一個(gè)任務(wù)的返回值,自身也沒有返回值 .thenRunAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println("thenRunAsync()不需要上一個(gè)任務(wù)的返回值"); System.out.println(taskName + "執(zhí)行結(jié)束"); }, THREAD_POOL_EXECUTOR);
執(zhí)行結(jié)果如下:
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task2
拿到上一個(gè)任務(wù)的返回值:task1
task2執(zhí)行結(jié)束
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
正在執(zhí)行任務(wù)task3
thenRunAsync()不需要上一個(gè)任務(wù)的返回值
task3執(zhí)行結(jié)束
1.入口函數(shù)依然是supplyAsync()
,需要傳入一個(gè)有返回值的函數(shù)作為參數(shù);如果想要沒有返回值的函數(shù)傳進(jìn)來的話,可以使用CompletableFuture.runAsync()
;
2.thenApplyAsync()
和thenRunAsync()
分別表示里面的任務(wù)都是異步執(zhí)行的,和執(zhí)行前面的任務(wù)不是同一個(gè)線程;
3.thenRunAsync()
需要傳入一個(gè)既不需要參數(shù),也沒有返回值的任務(wù);
并行任務(wù)
并行代表任務(wù)1、任務(wù)2、任務(wù)3沒有依賴關(guān)系,分別由不同的線程執(zhí)行;
示例代碼如下:
CompletableFuture<String> future1 = CompletableFuture .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture<Void> future2 = CompletableFuture .runAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); }, THREAD_POOL_EXECUTOR); CompletableFuture<String> future3 = CompletableFuture .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR);
執(zhí)行結(jié)果如下:
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task3
task3執(zhí)行結(jié)束
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task1
task2執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
一看執(zhí)行結(jié)果,明顯是亂序的,并且三個(gè)任務(wù)分別由三個(gè)線程執(zhí)行,符合咱們的預(yù)期;注意異步的方法后面都是帶有Async
關(guān)鍵字的;
多任務(wù)結(jié)果合并計(jì)算
- 兩個(gè)任務(wù)結(jié)果的合并
任務(wù)3的執(zhí)行依賴于任務(wù)1、任務(wù)2的返回值,并且任務(wù)1和任務(wù)3由同一個(gè)線程執(zhí)行,任務(wù)2單獨(dú)一個(gè)線程執(zhí)行;
示例代碼如下:
CompletableFuture // 任務(wù)1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR) .thenCombine( CompletableFuture // 任務(wù)2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR), // 任務(wù)3 (task1Result, task2Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("task1結(jié)果:" + task1Result + "\ttask2結(jié)果:" + task2Result); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; });
執(zhí)行結(jié)果如下:
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
正在執(zhí)行任務(wù)task2
task2執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task3
task1結(jié)果:task1 task2結(jié)果:task2
正在執(zhí)行任務(wù)task3
task3執(zhí)行結(jié)束
CompletableFuture
提供了thenCombine()
來合并另一個(gè)CompletableFuture
的執(zhí)行結(jié)果,所以thenCombine()
需要兩個(gè)參數(shù),第一個(gè)參數(shù)是另一個(gè)CompletableFuture
,第二個(gè)參數(shù)會(huì)收集前兩個(gè)任務(wù)的返回值,類似下面這樣:
(result1,result2)->{ // 執(zhí)行業(yè)務(wù)邏輯 return result3; }
如果小伙伴們想要實(shí)現(xiàn)任務(wù)3也是單獨(dú)的線程執(zhí)行的話,可以使用thenCombineAsync()
這個(gè)方法。代碼如下:
CompletableFuture // 任務(wù)1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR) .thenCombineAsync( CompletableFuture // 任務(wù)2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return 2; }, THREAD_POOL_EXECUTOR), // 任務(wù)3 (task1Result, task2Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("task1結(jié)果:" + task1Result + "\ttask2結(jié)果:" + task2Result); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return 2L; }, THREAD_POOL_EXECUTOR);
如果任務(wù)3中不需要返回結(jié)果,可以使用thenAcceptBoth()
或thenAcceptBothAsync()
,使用方式與thenCombineAsync()
類似;
- 多任務(wù)結(jié)果合并
示例代碼如下:
CompletableFuture future1 = CompletableFuture // 任務(wù)1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future2 = CompletableFuture // 任務(wù)2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future3 = CompletableFuture // 任務(wù)3 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3}; CompletableFuture.allOf(futures) // 任務(wù)4 .whenCompleteAsync( (v, e) -> { List<Object> values = new ArrayList<>(); for (CompletableFuture future : futures) { try { values.add(future.get()); } catch (Exception ex) { } } Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task4"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("前面任務(wù)的處理結(jié)果:" + values); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); }, THREAD_POOL_EXECUTOR);
執(zhí)行結(jié)果如下:
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task3
正在執(zhí)行任務(wù)task1
task2執(zhí)行結(jié)束
task3執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task4
前面任務(wù)的處理結(jié)果:[task1, task2, task3]
正在執(zhí)行任務(wù)task4
task4執(zhí)行結(jié)束
之所以最后任務(wù)4的線程是completable-future-test-Thread-2
,那是因?yàn)榫€程池的核心線程數(shù)設(shè)置為3,線程數(shù)設(shè)置高一點(diǎn)就會(huì)創(chuàng)建新的線程處理;
從上述代碼示例中,我們可以收獲到另一個(gè)知識(shí)點(diǎn):allOf()
,它的作用是要求所有的任務(wù)全部完成才能執(zhí)行后面的任務(wù)。
任一任務(wù)完成
在一批任務(wù)中,只要有一個(gè)任務(wù)完成,那么就可以向后繼續(xù)執(zhí)行其他任務(wù)。
為了代碼演示無異議,后續(xù)代碼中,我們把線程數(shù)提升到4。
示例代碼如下:
CompletableFuture future1 = CompletableFuture // 任務(wù)1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future2 = CompletableFuture // 任務(wù)2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future3 = CompletableFuture // 任務(wù)3 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture.anyOf(future1, future2, future3) .thenApplyAsync((taskResult) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task4"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("前面任務(wù)的處理結(jié)果:" + taskResult); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR);
執(zhí)行結(jié)果如下:
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task3
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
task3執(zhí)行結(jié)束
task2執(zhí)行結(jié)束
completable-future-test-Thread-5開始執(zhí)行任務(wù):task4
前面任務(wù)的處理結(jié)果:task1
正在執(zhí)行任務(wù)task4
task4執(zhí)行結(jié)束
可以看到,任務(wù)1第一個(gè)結(jié)束,所以任務(wù)4中接收到的執(zhí)行結(jié)果就是任務(wù)1的返回值。
快速失敗
在一批任務(wù)當(dāng)中,只要有任意一個(gè)任務(wù)執(zhí)行產(chǎn)生異常了,那么就直接結(jié)束;否則就要等待所有任務(wù)成功執(zhí)行完畢。
示例代碼如下:
CompletableFuture future1 = CompletableFuture // 任務(wù)1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future2 = CompletableFuture // 任務(wù)2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); throw new RuntimeException("任務(wù)2異常!"); }, THREAD_POOL_EXECUTOR); CompletableFuture future3 = CompletableFuture // 任務(wù)3 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); throw new RuntimeException("任務(wù)3異常!"); }, THREAD_POOL_EXECUTOR); CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3}; CompletableFuture allCompletableFuture = CompletableFuture.allOf(futures); // 創(chuàng)建一個(gè)任務(wù)來監(jiān)聽異常 CompletableFuture<?> anyException = new CompletableFuture<>(); for (CompletableFuture<?> completableFuture : futures) { completableFuture.exceptionally((t) -> { // 任何一個(gè)任務(wù)異常都會(huì)讓anyException任務(wù)完成 anyException.completeExceptionally(t); return null; }); } // 要么allCompletableFuture全部成功,要么一個(gè)出現(xiàn)異常就結(jié)束任務(wù) CompletableFuture.anyOf(allCompletableFuture, anyException) .whenComplete((value, exception) -> { if (Objects.nonNull(exception)) { System.out.println("產(chǎn)生異常,提前結(jié)束!"); exception.printStackTrace(); return; } System.out.println("所有任務(wù)正常完成!"); });
執(zhí)行結(jié)果如下:
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1 completable-future-test-Thread-3開始執(zhí)行任務(wù):task2 completable-future-test-Thread-4開始執(zhí)行任務(wù):task3 正在執(zhí)行任務(wù)task2 正在執(zhí)行任務(wù)task3 正在執(zhí)行任務(wù)task1 task2執(zhí)行結(jié)束 task1執(zhí)行結(jié)束 task3執(zhí)行結(jié)束 產(chǎn)生異常,提前結(jié)束! java.util.concurrent.CompletionException: java.lang.RuntimeException: 任務(wù)2異常! at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.RuntimeException: 任務(wù)2異常! at com.example.awesomerocketmq.completable.CompletableFutureTest.lambda$t$1(CompletableFutureTest.java:53) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ... 3 more
CompletableFuture
沒有現(xiàn)成的api實(shí)現(xiàn)快速失敗的功能,所以我們只能結(jié)合allOf()
和anyOf()
來邏輯來自定義方法完成快速失敗的邏輯;
1.我們需要額外創(chuàng)建一個(gè)CompletableFuture來監(jiān)聽所有的CompletableFuture,一旦其中一個(gè)CompletableFuture產(chǎn)生異常,我們就設(shè)置額外的CompletableFuture立即完成。
2.把所有的CompletableFuture和額外的CompletableFuture放在anyOf()
方法中,這樣一旦額外的CompletableFuture完成,說明產(chǎn)生異常了;否則就需要等待所有的CompletableFuture完成。
注意
- 異常處理
最后需要注意的是,所有的CompletableFuture
任務(wù)一定要加上異常處理:
CompletableFuture // 任務(wù)1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName); System.out.println("正在執(zhí)行任務(wù)" + taskName); System.out.println(taskName + "執(zhí)行結(jié)束"); return taskName; }, THREAD_POOL_EXECUTOR) .whenComplete((v,e)->{ if(Objects.nonNull(e)){ // todo // 處理異常 } if(Objects.nonNull(v)){ // todo } });
還可以通過另外兩個(gè)方法處理:exceptionally()
或者handle()
;
- 自定義線程池
CompletableFuture
默認(rèn)的線程池是ForkJoinThreadPool
,建議大家在使用的時(shí)候盡可能地使用自定義線程池,這樣方便后續(xù)的代碼優(yōu)化以及相關(guān)的日志查看。
以上就是java CompletableFuture異步任務(wù)編排示例詳解的詳細(xì)內(nèi)容,更多關(guān)于java CompletableFuture異步編排的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中List轉(zhuǎn)Map List實(shí)現(xiàn)的幾種姿勢(shì)
本文主要介紹了Java中List轉(zhuǎn)Map List實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06解決Maven 項(xiàng)目報(bào)錯(cuò) java.httpservlet和synchronized使用方法
下面小編就為大家?guī)硪黄鉀QMaven 項(xiàng)目報(bào)錯(cuò) java.httpservlet和synchronized使用方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-07-07如何使用Comparator比較接口實(shí)現(xiàn)ArrayList集合排序
這篇文章主要介紹了如何使用Comparator比較接口實(shí)現(xiàn)ArrayList集合排序問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12利用java和sqlserver建立簡(jiǎn)易圖書管理系統(tǒng)的完整步驟
圖書館管理系統(tǒng)是圖書館管理工作中不可缺少的部分,它對(duì)于圖書館的管理者和使用者都非常重要,下面這篇文章主要給大家介紹了關(guān)于利用java和sqlserver建立簡(jiǎn)易圖書管理系統(tǒng)的完整步驟,需要的朋友可以參考下2022-06-06springboot2.3 整合mybatis-plus 高級(jí)功能(圖文詳解)
這篇文章主要介紹了springboot2.3 整合mybatis-plus 高級(jí)功能,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08Spring?boot?整合RabbitMQ實(shí)現(xiàn)通過RabbitMQ進(jìn)行項(xiàng)目的連接
RabbitMQ是一個(gè)開源的AMQP實(shí)現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,這篇文章主要介紹了Spring?boot?整合RabbitMQ實(shí)現(xiàn)通過RabbitMQ進(jìn)行項(xiàng)目的連接,需要的朋友可以參考下2022-10-10深入理解java異常處理機(jī)制的原理和開發(fā)應(yīng)用
Java異常處理機(jī)制在日常開發(fā)中應(yīng)用頻繁,本篇文章主要在基礎(chǔ)的使用方法上,更進(jìn)一步的,如何更加合理的使用異常機(jī)制,希望可以對(duì)各位朋友能有所幫助。2017-04-04