java?CompletableFuture異步任務(wù)編排示例詳解
前言
在之前的項(xiàng)目開發(fā)中,都沒怎么使用過CompletableFuture的功能,只聽說過和異步編程有關(guān)。為了能夠在將來有需要的時候用得上,這兩天花了點(diǎn)時間學(xué)習(xí)了一下,并簡單地總結(jié)一下如何使用CompletableFuture完成異步任務(wù)編排。
先創(chuàng)建一個自定義的線程池,后續(xù)所有代碼都會使用到:
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() {
private final AtomicInteger THREAD_NUM = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
// 設(shè)置為守護(hù)線程,main線程結(jié)束就跟著一起結(jié)束,否則main函數(shù)結(jié)束jvm還在
t.setDaemon(true);
t.setName("completable-future-test-Thread-" + THREAD_NUM.incrementAndGet());
return t;
}
}, new ThreadPoolExecutor.AbortPolicy());
同步串行

同步串行代表任務(wù)1、任務(wù)2、任務(wù)3按時間先后順序執(zhí)行,并且都是同一個線程來執(zhí)行。
示例代碼如下:
CompletableFuture
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR)
.thenApply(
(task1Result) -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task2";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println("拿到上一個任務(wù)的返回值:" + task1Result);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
})
.thenAccept(
(task2Result) -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task3";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println("拿到上一個任務(wù)的返回值:" + task2Result);
System.out.println(taskName + "執(zhí)行結(jié)束");
});
執(zhí)行結(jié)果:
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task2
拿到上一個任務(wù)的返回值:task1
task2執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task3
正在執(zhí)行任務(wù)task3
拿到上一個任務(wù)的返回值:task2
task3執(zhí)行結(jié)束
1.入口函數(shù)supplyAsync()代表一個異步的有返回值的函數(shù),之所以異步,是與主線程區(qū)別,從線程池中的拿一個線程來執(zhí)行。
2.thenApply()和thenAccept()沒有Async,意味著是和前面的任務(wù)共用一個線程,從執(zhí)行結(jié)果上我們也可以看到線程名稱相同。
3.thenApply()需要接收上一個任務(wù)的返回值,并且自己也要有返回值。
4.thenAccept()需要接收上一個任務(wù)的返回值,但是它不需要返回值。
異步串行

異步串行代表任務(wù)1、任務(wù)2、任務(wù)3按時間先后順序執(zhí)行,并由不同的線程來執(zhí)行。
示例代碼如下:
CompletableFuture
// 有返回值
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR)
// 需要上一個任務(wù)的返回值,并且自身有返回值
.thenApplyAsync(
(task1Result) -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task2";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println("拿到上一個任務(wù)的返回值:" + task1Result);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR)
// 不需要上一個任務(wù)的返回值,自身也沒有返回值
.thenRunAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task3";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println("thenRunAsync()不需要上一個任務(wù)的返回值");
System.out.println(taskName + "執(zhí)行結(jié)束");
}, THREAD_POOL_EXECUTOR);
執(zhí)行結(jié)果如下:
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task2
拿到上一個任務(wù)的返回值:task1
task2執(zhí)行結(jié)束
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
正在執(zhí)行任務(wù)task3
thenRunAsync()不需要上一個任務(wù)的返回值
task3執(zhí)行結(jié)束
1.入口函數(shù)依然是supplyAsync(),需要傳入一個有返回值的函數(shù)作為參數(shù);如果想要沒有返回值的函數(shù)傳進(jìn)來的話,可以使用CompletableFuture.runAsync();
2.thenApplyAsync()和thenRunAsync()分別表示里面的任務(wù)都是異步執(zhí)行的,和執(zhí)行前面的任務(wù)不是同一個線程;
3.thenRunAsync()需要傳入一個既不需要參數(shù),也沒有返回值的任務(wù);
并行任務(wù)

并行代表任務(wù)1、任務(wù)2、任務(wù)3沒有依賴關(guān)系,分別由不同的線程執(zhí)行;
示例代碼如下:
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
CompletableFuture<Void> future2 = CompletableFuture
.runAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task2";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
}, THREAD_POOL_EXECUTOR);
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task3";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
執(zhí)行結(jié)果如下:
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task3
task3執(zhí)行結(jié)束
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task1
task2執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
一看執(zhí)行結(jié)果,明顯是亂序的,并且三個任務(wù)分別由三個線程執(zhí)行,符合咱們的預(yù)期;注意異步的方法后面都是帶有Async關(guān)鍵字的;
多任務(wù)結(jié)果合并計(jì)算
- 兩個任務(wù)結(jié)果的合并

任務(wù)3的執(zhí)行依賴于任務(wù)1、任務(wù)2的返回值,并且任務(wù)1和任務(wù)3由同一個線程執(zhí)行,任務(wù)2單獨(dú)一個線程執(zhí)行;
示例代碼如下:
CompletableFuture
// 任務(wù)1
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR)
.thenCombine(
CompletableFuture
// 任務(wù)2
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task2";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR),
// 任務(wù)3
(task1Result, task2Result) -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task3";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("task1結(jié)果:" + task1Result + "\ttask2結(jié)果:" + task2Result);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
});
執(zhí)行結(jié)果如下:
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
正在執(zhí)行任務(wù)task2
task2執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task3
task1結(jié)果:task1 task2結(jié)果:task2
正在執(zhí)行任務(wù)task3
task3執(zhí)行結(jié)束
CompletableFuture提供了thenCombine()來合并另一個CompletableFuture的執(zhí)行結(jié)果,所以thenCombine()需要兩個參數(shù),第一個參數(shù)是另一個CompletableFuture,第二個參數(shù)會收集前兩個任務(wù)的返回值,類似下面這樣:
(result1,result2)->{
// 執(zhí)行業(yè)務(wù)邏輯
return result3;
}

如果小伙伴們想要實(shí)現(xiàn)任務(wù)3也是單獨(dú)的線程執(zhí)行的話,可以使用thenCombineAsync()這個方法。代碼如下:
CompletableFuture
// 任務(wù)1
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR)
.thenCombineAsync(
CompletableFuture
// 任務(wù)2
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task2";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return 2;
}, THREAD_POOL_EXECUTOR),
// 任務(wù)3
(task1Result, task2Result) -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task3";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("task1結(jié)果:" + task1Result + "\ttask2結(jié)果:" + task2Result);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return 2L;
}, THREAD_POOL_EXECUTOR);
如果任務(wù)3中不需要返回結(jié)果,可以使用thenAcceptBoth()或thenAcceptBothAsync(),使用方式與thenCombineAsync()類似;
- 多任務(wù)結(jié)果合并

示例代碼如下:
CompletableFuture future1 = CompletableFuture
// 任務(wù)1
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
CompletableFuture future2 = CompletableFuture
// 任務(wù)2
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task2";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
CompletableFuture future3 = CompletableFuture
// 任務(wù)3
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task3";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};
CompletableFuture.allOf(futures)
// 任務(wù)4
.whenCompleteAsync(
(v, e) -> {
List<Object> values = new ArrayList<>();
for (CompletableFuture future : futures) {
try {
values.add(future.get());
} catch (Exception ex) {
}
}
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task4";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("前面任務(wù)的處理結(jié)果:" + values);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
}, THREAD_POOL_EXECUTOR);
執(zhí)行結(jié)果如下:
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task3
正在執(zhí)行任務(wù)task1
task2執(zhí)行結(jié)束
task3執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task4
前面任務(wù)的處理結(jié)果:[task1, task2, task3]
正在執(zhí)行任務(wù)task4
task4執(zhí)行結(jié)束
之所以最后任務(wù)4的線程是completable-future-test-Thread-2,那是因?yàn)榫€程池的核心線程數(shù)設(shè)置為3,線程數(shù)設(shè)置高一點(diǎn)就會創(chuàng)建新的線程處理;
從上述代碼示例中,我們可以收獲到另一個知識點(diǎn):allOf(),它的作用是要求所有的任務(wù)全部完成才能執(zhí)行后面的任務(wù)。
任一任務(wù)完成
在一批任務(wù)中,只要有一個任務(wù)完成,那么就可以向后繼續(xù)執(zhí)行其他任務(wù)。
為了代碼演示無異議,后續(xù)代碼中,我們把線程數(shù)提升到4。
示例代碼如下:
CompletableFuture future1 = CompletableFuture
// 任務(wù)1
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
CompletableFuture future2 = CompletableFuture
// 任務(wù)2
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task2";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
CompletableFuture future3 = CompletableFuture
// 任務(wù)3
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task3";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
CompletableFuture.anyOf(future1, future2, future3)
.thenApplyAsync((taskResult) -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task4";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("前面任務(wù)的處理結(jié)果:" + taskResult);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
執(zhí)行結(jié)果如下:
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task3
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
task3執(zhí)行結(jié)束
task2執(zhí)行結(jié)束
completable-future-test-Thread-5開始執(zhí)行任務(wù):task4
前面任務(wù)的處理結(jié)果:task1
正在執(zhí)行任務(wù)task4
task4執(zhí)行結(jié)束
可以看到,任務(wù)1第一個結(jié)束,所以任務(wù)4中接收到的執(zhí)行結(jié)果就是任務(wù)1的返回值。
快速失敗
在一批任務(wù)當(dāng)中,只要有任意一個任務(wù)執(zhí)行產(chǎn)生異常了,那么就直接結(jié)束;否則就要等待所有任務(wù)成功執(zhí)行完畢。
示例代碼如下:
CompletableFuture future1 = CompletableFuture
// 任務(wù)1
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR);
CompletableFuture future2 = CompletableFuture
// 任務(wù)2
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task2";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
throw new RuntimeException("任務(wù)2異常!");
}, THREAD_POOL_EXECUTOR);
CompletableFuture future3 = CompletableFuture
// 任務(wù)3
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task3";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
throw new RuntimeException("任務(wù)3異常!");
}, THREAD_POOL_EXECUTOR);
CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};
CompletableFuture allCompletableFuture = CompletableFuture.allOf(futures);
// 創(chuàng)建一個任務(wù)來監(jiān)聽異常
CompletableFuture<?> anyException = new CompletableFuture<>();
for (CompletableFuture<?> completableFuture : futures) {
completableFuture.exceptionally((t) -> {
// 任何一個任務(wù)異常都會讓anyException任務(wù)完成
anyException.completeExceptionally(t);
return null;
});
}
// 要么allCompletableFuture全部成功,要么一個出現(xiàn)異常就結(jié)束任務(wù)
CompletableFuture.anyOf(allCompletableFuture, anyException)
.whenComplete((value, exception) -> {
if (Objects.nonNull(exception)) {
System.out.println("產(chǎn)生異常,提前結(jié)束!");
exception.printStackTrace();
return;
}
System.out.println("所有任務(wù)正常完成!");
});
執(zhí)行結(jié)果如下:
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1 completable-future-test-Thread-3開始執(zhí)行任務(wù):task2 completable-future-test-Thread-4開始執(zhí)行任務(wù):task3 正在執(zhí)行任務(wù)task2 正在執(zhí)行任務(wù)task3 正在執(zhí)行任務(wù)task1 task2執(zhí)行結(jié)束 task1執(zhí)行結(jié)束 task3執(zhí)行結(jié)束 產(chǎn)生異常,提前結(jié)束! java.util.concurrent.CompletionException: java.lang.RuntimeException: 任務(wù)2異常! at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.RuntimeException: 任務(wù)2異常! at com.example.awesomerocketmq.completable.CompletableFutureTest.lambda$t$1(CompletableFutureTest.java:53) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ... 3 more
CompletableFuture沒有現(xiàn)成的api實(shí)現(xiàn)快速失敗的功能,所以我們只能結(jié)合allOf()和anyOf()來邏輯來自定義方法完成快速失敗的邏輯;
1.我們需要額外創(chuàng)建一個CompletableFuture來監(jiān)聽所有的CompletableFuture,一旦其中一個CompletableFuture產(chǎn)生異常,我們就設(shè)置額外的CompletableFuture立即完成。
2.把所有的CompletableFuture和額外的CompletableFuture放在anyOf()方法中,這樣一旦額外的CompletableFuture完成,說明產(chǎn)生異常了;否則就需要等待所有的CompletableFuture完成。
注意
- 異常處理
最后需要注意的是,所有的CompletableFuture任務(wù)一定要加上異常處理:
CompletableFuture
// 任務(wù)1
.supplyAsync(
() -> {
Thread currentThread = Thread.currentThread();
String ThreadName = currentThread.getName();
String taskName = "task1";
System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
System.out.println("正在執(zhí)行任務(wù)" + taskName);
System.out.println(taskName + "執(zhí)行結(jié)束");
return taskName;
}, THREAD_POOL_EXECUTOR)
.whenComplete((v,e)->{
if(Objects.nonNull(e)){
// todo
// 處理異常
}
if(Objects.nonNull(v)){
// todo
}
});
還可以通過另外兩個方法處理:exceptionally()或者handle();
- 自定義線程池
CompletableFuture默認(rèn)的線程池是ForkJoinThreadPool,建議大家在使用的時候盡可能地使用自定義線程池,這樣方便后續(xù)的代碼優(yōu)化以及相關(guān)的日志查看。
以上就是java CompletableFuture異步任務(wù)編排示例詳解的詳細(xì)內(nèi)容,更多關(guān)于java CompletableFuture異步編排的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中List轉(zhuǎn)Map List實(shí)現(xiàn)的幾種姿勢
本文主要介紹了Java中List轉(zhuǎn)Map List實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06
解決Maven 項(xiàng)目報錯 java.httpservlet和synchronized使用方法
下面小編就為大家?guī)硪黄鉀QMaven 項(xiàng)目報錯 java.httpservlet和synchronized使用方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07
如何使用Comparator比較接口實(shí)現(xiàn)ArrayList集合排序
這篇文章主要介紹了如何使用Comparator比較接口實(shí)現(xiàn)ArrayList集合排序問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12
利用java和sqlserver建立簡易圖書管理系統(tǒng)的完整步驟
圖書館管理系統(tǒng)是圖書館管理工作中不可缺少的部分,它對于圖書館的管理者和使用者都非常重要,下面這篇文章主要給大家介紹了關(guān)于利用java和sqlserver建立簡易圖書管理系統(tǒng)的完整步驟,需要的朋友可以參考下2022-06-06
springboot2.3 整合mybatis-plus 高級功能(圖文詳解)
這篇文章主要介紹了springboot2.3 整合mybatis-plus 高級功能,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08
Spring?boot?整合RabbitMQ實(shí)現(xiàn)通過RabbitMQ進(jìn)行項(xiàng)目的連接
RabbitMQ是一個開源的AMQP實(shí)現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,這篇文章主要介紹了Spring?boot?整合RabbitMQ實(shí)現(xiàn)通過RabbitMQ進(jìn)行項(xiàng)目的連接,需要的朋友可以參考下2022-10-10
深入理解java異常處理機(jī)制的原理和開發(fā)應(yīng)用
Java異常處理機(jī)制在日常開發(fā)中應(yīng)用頻繁,本篇文章主要在基礎(chǔ)的使用方法上,更進(jìn)一步的,如何更加合理的使用異常機(jī)制,希望可以對各位朋友能有所幫助。2017-04-04

