欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java?CompletableFuture異步任務(wù)編排示例詳解

 更新時(shí)間:2022年11月29日 11:02:22   作者:夢(mèng)想實(shí)現(xiàn)家_Z  
這篇文章主要為大家介紹了java?CompletableFuture異步任務(wù)編排示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

在之前的項(xiàng)目開發(fā)中,都沒怎么使用過CompletableFuture的功能,只聽說過和異步編程有關(guān)。為了能夠在將來有需要的時(shí)候用得上,這兩天花了點(diǎn)時(shí)間學(xué)習(xí)了一下,并簡(jiǎn)單地總結(jié)一下如何使用CompletableFuture完成異步任務(wù)編排。

先創(chuàng)建一個(gè)自定義的線程池,后續(xù)所有代碼都會(huì)使用到:

  private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() {
    private final AtomicInteger THREAD_NUM = new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
//      設(shè)置為守護(hù)線程,main線程結(jié)束就跟著一起結(jié)束,否則main函數(shù)結(jié)束jvm還在
      t.setDaemon(true);
      t.setName("completable-future-test-Thread-" + THREAD_NUM.incrementAndGet());
      return t;
    }
  }, new ThreadPoolExecutor.AbortPolicy());

同步串行

同步串行代表任務(wù)1、任務(wù)2、任務(wù)3按時(shí)間先后順序執(zhí)行,并且都是同一個(gè)線程來執(zhí)行。

示例代碼如下:

CompletableFuture
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR)
        .thenApply(
            (task1Result) -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println("拿到上一個(gè)任務(wù)的返回值:" + task1Result);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            })
        .thenAccept(
             (task2Result) -> {
             Thread currentThread = Thread.currentThread();
             String ThreadName = currentThread.getName();
             String taskName = "task3";
             System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
             System.out.println("正在執(zhí)行任務(wù)" + taskName);
             System.out.println("拿到上一個(gè)任務(wù)的返回值:" + task2Result);
             System.out.println(taskName + "執(zhí)行結(jié)束");
           });

執(zhí)行結(jié)果:

completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task2
拿到上一個(gè)任務(wù)的返回值:task1
task2執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task3
正在執(zhí)行任務(wù)task3
拿到上一個(gè)任務(wù)的返回值:task2
task3執(zhí)行結(jié)束

1.入口函數(shù)supplyAsync()代表一個(gè)異步的有返回值的函數(shù),之所以異步,是與主線程區(qū)別,從線程池中的拿一個(gè)線程來執(zhí)行。

2.thenApply()thenAccept()沒有Async,意味著是和前面的任務(wù)共用一個(gè)線程,從執(zhí)行結(jié)果上我們也可以看到線程名稱相同。

3.thenApply()需要接收上一個(gè)任務(wù)的返回值,并且自己也要有返回值。

4.thenAccept()需要接收上一個(gè)任務(wù)的返回值,但是它不需要返回值。

異步串行

異步串行代表任務(wù)1、任務(wù)2、任務(wù)3按時(shí)間先后順序執(zhí)行,并由不同的線程來執(zhí)行。

示例代碼如下:

    CompletableFuture
        // 有返回值
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR)
        // 需要上一個(gè)任務(wù)的返回值,并且自身有返回值
        .thenApplyAsync(
            (task1Result) -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println("拿到上一個(gè)任務(wù)的返回值:" + task1Result);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR)
        // 不需要上一個(gè)任務(wù)的返回值,自身也沒有返回值
        .thenRunAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println("thenRunAsync()不需要上一個(gè)任務(wù)的返回值");
              System.out.println(taskName + "執(zhí)行結(jié)束");
            }, THREAD_POOL_EXECUTOR);

執(zhí)行結(jié)果如下:

completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task2
拿到上一個(gè)任務(wù)的返回值:task1
task2執(zhí)行結(jié)束
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
正在執(zhí)行任務(wù)task3
thenRunAsync()不需要上一個(gè)任務(wù)的返回值
task3執(zhí)行結(jié)束

1.入口函數(shù)依然是supplyAsync(),需要傳入一個(gè)有返回值的函數(shù)作為參數(shù);如果想要沒有返回值的函數(shù)傳進(jìn)來的話,可以使用CompletableFuture.runAsync();

2.thenApplyAsync()thenRunAsync()分別表示里面的任務(wù)都是異步執(zhí)行的,和執(zhí)行前面的任務(wù)不是同一個(gè)線程;

3.thenRunAsync()需要傳入一個(gè)既不需要參數(shù),也沒有返回值的任務(wù);

并行任務(wù)

并行代表任務(wù)1、任務(wù)2、任務(wù)3沒有依賴關(guān)系,分別由不同的線程執(zhí)行;

示例代碼如下:

    CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture<Void> future2 = CompletableFuture
        .runAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);

執(zhí)行結(jié)果如下:

completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task3
task3執(zhí)行結(jié)束
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task1
task2執(zhí)行結(jié)束
task1執(zhí)行結(jié)束

一看執(zhí)行結(jié)果,明顯是亂序的,并且三個(gè)任務(wù)分別由三個(gè)線程執(zhí)行,符合咱們的預(yù)期;注意異步的方法后面都是帶有Async關(guān)鍵字的;

多任務(wù)結(jié)果合并計(jì)算

  • 兩個(gè)任務(wù)結(jié)果的合并

任務(wù)3的執(zhí)行依賴于任務(wù)1、任務(wù)2的返回值,并且任務(wù)1和任務(wù)3由同一個(gè)線程執(zhí)行,任務(wù)2單獨(dú)一個(gè)線程執(zhí)行;

示例代碼如下:

    CompletableFuture
        // 任務(wù)1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR)
        .thenCombine(
            CompletableFuture
                // 任務(wù)2
                .supplyAsync(
                    () -> {
                      Thread currentThread = Thread.currentThread();
                      String ThreadName = currentThread.getName();
                      String taskName = "task2";
                      System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
                      System.out.println("正在執(zhí)行任務(wù)" + taskName);
                      System.out.println(taskName + "執(zhí)行結(jié)束");
                      return taskName;
                    }, THREAD_POOL_EXECUTOR),
            // 任務(wù)3
            (task1Result, task2Result) -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("task1結(jié)果:" + task1Result + "\ttask2結(jié)果:" + task2Result);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            });

執(zhí)行結(jié)果如下:

completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task1
正在執(zhí)行任務(wù)task2
task2執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task3
task1結(jié)果:task1 task2結(jié)果:task2
正在執(zhí)行任務(wù)task3
task3執(zhí)行結(jié)束

CompletableFuture提供了thenCombine()來合并另一個(gè)CompletableFuture的執(zhí)行結(jié)果,所以thenCombine()需要兩個(gè)參數(shù),第一個(gè)參數(shù)是另一個(gè)CompletableFuture,第二個(gè)參數(shù)會(huì)收集前兩個(gè)任務(wù)的返回值,類似下面這樣:

(result1,result2)->{
  // 執(zhí)行業(yè)務(wù)邏輯
  return result3;
}

如果小伙伴們想要實(shí)現(xiàn)任務(wù)3也是單獨(dú)的線程執(zhí)行的話,可以使用thenCombineAsync()這個(gè)方法。代碼如下:

    CompletableFuture
        // 任務(wù)1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR)
        .thenCombineAsync(
            CompletableFuture
                // 任務(wù)2
                .supplyAsync(
                    () -> {
                      Thread currentThread = Thread.currentThread();
                      String ThreadName = currentThread.getName();
                      String taskName = "task2";
                      System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
                      System.out.println("正在執(zhí)行任務(wù)" + taskName);
                      System.out.println(taskName + "執(zhí)行結(jié)束");
                      return 2;
                    }, THREAD_POOL_EXECUTOR),
            // 任務(wù)3
            (task1Result, task2Result) -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("task1結(jié)果:" + task1Result + "\ttask2結(jié)果:" + task2Result);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return 2L;
            }, THREAD_POOL_EXECUTOR);

如果任務(wù)3中不需要返回結(jié)果,可以使用thenAcceptBoth()thenAcceptBothAsync(),使用方式與thenCombineAsync()類似;

  • 多任務(wù)結(jié)果合并

示例代碼如下:

    CompletableFuture future1 = CompletableFuture
        // 任務(wù)1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future2 = CompletableFuture
        // 任務(wù)2
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future3 = CompletableFuture
        // 任務(wù)3
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};
    CompletableFuture.allOf(futures)
        // 任務(wù)4
        .whenCompleteAsync(
            (v, e) -> {
              List<Object> values = new ArrayList<>();
              for (CompletableFuture future : futures) {
                try {
                  values.add(future.get());
                } catch (Exception ex) {
                }
              }
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task4";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("前面任務(wù)的處理結(jié)果:" + values);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
            }, THREAD_POOL_EXECUTOR);

執(zhí)行結(jié)果如下:

completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task3
正在執(zhí)行任務(wù)task1
task2執(zhí)行結(jié)束
task3執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
completable-future-test-Thread-2開始執(zhí)行任務(wù):task4
前面任務(wù)的處理結(jié)果:[task1, task2, task3]
正在執(zhí)行任務(wù)task4
task4執(zhí)行結(jié)束

之所以最后任務(wù)4的線程是completable-future-test-Thread-2,那是因?yàn)榫€程池的核心線程數(shù)設(shè)置為3,線程數(shù)設(shè)置高一點(diǎn)就會(huì)創(chuàng)建新的線程處理;

從上述代碼示例中,我們可以收獲到另一個(gè)知識(shí)點(diǎn):allOf(),它的作用是要求所有的任務(wù)全部完成才能執(zhí)行后面的任務(wù)。

任一任務(wù)完成

在一批任務(wù)中,只要有一個(gè)任務(wù)完成,那么就可以向后繼續(xù)執(zhí)行其他任務(wù)。

為了代碼演示無異議,后續(xù)代碼中,我們把線程數(shù)提升到4。

示例代碼如下:

    CompletableFuture future1 = CompletableFuture
        // 任務(wù)1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future2 = CompletableFuture
        // 任務(wù)2
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future3 = CompletableFuture
        // 任務(wù)3
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture.anyOf(future1, future2, future3)
        .thenApplyAsync((taskResult) -> {
          Thread currentThread = Thread.currentThread();
          String ThreadName = currentThread.getName();
          String taskName = "task4";
          System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
          System.out.println("前面任務(wù)的處理結(jié)果:" + taskResult);
          System.out.println("正在執(zhí)行任務(wù)" + taskName);
          System.out.println(taskName + "執(zhí)行結(jié)束");
          return taskName;
        }, THREAD_POOL_EXECUTOR);

執(zhí)行結(jié)果如下:

completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
正在執(zhí)行任務(wù)task3
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task1
task1執(zhí)行結(jié)束
task3執(zhí)行結(jié)束
task2執(zhí)行結(jié)束
completable-future-test-Thread-5開始執(zhí)行任務(wù):task4
前面任務(wù)的處理結(jié)果:task1
正在執(zhí)行任務(wù)task4
task4執(zhí)行結(jié)束

可以看到,任務(wù)1第一個(gè)結(jié)束,所以任務(wù)4中接收到的執(zhí)行結(jié)果就是任務(wù)1的返回值。

快速失敗

在一批任務(wù)當(dāng)中,只要有任意一個(gè)任務(wù)執(zhí)行產(chǎn)生異常了,那么就直接結(jié)束;否則就要等待所有任務(wù)成功執(zhí)行完畢。

示例代碼如下:

    CompletableFuture future1 = CompletableFuture
        // 任務(wù)1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future2 = CompletableFuture
        // 任務(wù)2
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              throw new RuntimeException("任務(wù)2異常!");
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future3 = CompletableFuture
        // 任務(wù)3
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              throw new RuntimeException("任務(wù)3異常!");
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};
    CompletableFuture allCompletableFuture = CompletableFuture.allOf(futures);
    // 創(chuàng)建一個(gè)任務(wù)來監(jiān)聽異常
    CompletableFuture<?> anyException = new CompletableFuture<>();
    for (CompletableFuture<?> completableFuture : futures) {
      completableFuture.exceptionally((t) -> {
        // 任何一個(gè)任務(wù)異常都會(huì)讓anyException任務(wù)完成
        anyException.completeExceptionally(t);
        return null;
      });
    }
    // 要么allCompletableFuture全部成功,要么一個(gè)出現(xiàn)異常就結(jié)束任務(wù)
    CompletableFuture.anyOf(allCompletableFuture, anyException)
        .whenComplete((value, exception) -> {
          if (Objects.nonNull(exception)) {
            System.out.println("產(chǎn)生異常,提前結(jié)束!");
            exception.printStackTrace();
            return;
          }
          System.out.println("所有任務(wù)正常完成!");
        });

執(zhí)行結(jié)果如下:

completable-future-test-Thread-2開始執(zhí)行任務(wù):task1
completable-future-test-Thread-3開始執(zhí)行任務(wù):task2
completable-future-test-Thread-4開始執(zhí)行任務(wù):task3
正在執(zhí)行任務(wù)task2
正在執(zhí)行任務(wù)task3
正在執(zhí)行任務(wù)task1
task2執(zhí)行結(jié)束
task1執(zhí)行結(jié)束
task3執(zhí)行結(jié)束
產(chǎn)生異常,提前結(jié)束!
java.util.concurrent.CompletionException: java.lang.RuntimeException: 任務(wù)2異常!
  at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
  at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
  at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: 任務(wù)2異常!
  at com.example.awesomerocketmq.completable.CompletableFutureTest.lambda$t$1(CompletableFutureTest.java:53)
  at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
  ... 3 more

CompletableFuture沒有現(xiàn)成的api實(shí)現(xiàn)快速失敗的功能,所以我們只能結(jié)合allOf()anyOf()來邏輯來自定義方法完成快速失敗的邏輯;

1.我們需要額外創(chuàng)建一個(gè)CompletableFuture來監(jiān)聽所有的CompletableFuture,一旦其中一個(gè)CompletableFuture產(chǎn)生異常,我們就設(shè)置額外的CompletableFuture立即完成。

2.把所有的CompletableFuture和額外的CompletableFuture放在anyOf()方法中,這樣一旦額外的CompletableFuture完成,說明產(chǎn)生異常了;否則就需要等待所有的CompletableFuture完成。

注意

  • 異常處理

最后需要注意的是,所有的CompletableFuture任務(wù)一定要加上異常處理:

    CompletableFuture
        // 任務(wù)1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "開始執(zhí)行任務(wù):" + taskName);
              System.out.println("正在執(zhí)行任務(wù)" + taskName);
              System.out.println(taskName + "執(zhí)行結(jié)束");
              return taskName;
            }, THREAD_POOL_EXECUTOR)
        .whenComplete((v,e)->{
          if(Objects.nonNull(e)){
            // todo
            // 處理異常
          }
          if(Objects.nonNull(v)){
            // todo
          }
        });

還可以通過另外兩個(gè)方法處理:exceptionally()或者handle();

  • 自定義線程池

CompletableFuture默認(rèn)的線程池是ForkJoinThreadPool,建議大家在使用的時(shí)候盡可能地使用自定義線程池,這樣方便后續(xù)的代碼優(yōu)化以及相關(guān)的日志查看。

以上就是java CompletableFuture異步任務(wù)編排示例詳解的詳細(xì)內(nèi)容,更多關(guān)于java CompletableFuture異步編排的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java中List轉(zhuǎn)Map List實(shí)現(xiàn)的幾種姿勢(shì)

    Java中List轉(zhuǎn)Map List實(shí)現(xiàn)的幾種姿勢(shì)

    本文主要介紹了Java中List轉(zhuǎn)Map List實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-06-06
  • 解決Maven 項(xiàng)目報(bào)錯(cuò) java.httpservlet和synchronized使用方法

    解決Maven 項(xiàng)目報(bào)錯(cuò) java.httpservlet和synchronized使用方法

    下面小編就為大家?guī)硪黄鉀QMaven 項(xiàng)目報(bào)錯(cuò) java.httpservlet和synchronized使用方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-07-07
  • 如何使用Comparator比較接口實(shí)現(xiàn)ArrayList集合排序

    如何使用Comparator比較接口實(shí)現(xiàn)ArrayList集合排序

    這篇文章主要介紹了如何使用Comparator比較接口實(shí)現(xiàn)ArrayList集合排序問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • SpringBoot之webflux全面解析

    SpringBoot之webflux全面解析

    這篇文章主要介紹了SpringBoot之webflux全面解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • 利用java和sqlserver建立簡(jiǎn)易圖書管理系統(tǒng)的完整步驟

    利用java和sqlserver建立簡(jiǎn)易圖書管理系統(tǒng)的完整步驟

    圖書館管理系統(tǒng)是圖書館管理工作中不可缺少的部分,它對(duì)于圖書館的管理者和使用者都非常重要,下面這篇文章主要給大家介紹了關(guān)于利用java和sqlserver建立簡(jiǎn)易圖書管理系統(tǒng)的完整步驟,需要的朋友可以參考下
    2022-06-06
  • springboot2.3 整合mybatis-plus 高級(jí)功能(圖文詳解)

    springboot2.3 整合mybatis-plus 高級(jí)功能(圖文詳解)

    這篇文章主要介紹了springboot2.3 整合mybatis-plus 高級(jí)功能,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-08-08
  • Spring?boot?整合RabbitMQ實(shí)現(xiàn)通過RabbitMQ進(jìn)行項(xiàng)目的連接

    Spring?boot?整合RabbitMQ實(shí)現(xiàn)通過RabbitMQ進(jìn)行項(xiàng)目的連接

    RabbitMQ是一個(gè)開源的AMQP實(shí)現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,這篇文章主要介紹了Spring?boot?整合RabbitMQ實(shí)現(xiàn)通過RabbitMQ進(jìn)行項(xiàng)目的連接,需要的朋友可以參考下
    2022-10-10
  • 深入理解java異常處理機(jī)制的原理和開發(fā)應(yīng)用

    深入理解java異常處理機(jī)制的原理和開發(fā)應(yīng)用

     Java異常處理機(jī)制在日常開發(fā)中應(yīng)用頻繁,本篇文章主要在基礎(chǔ)的使用方法上,更進(jìn)一步的,如何更加合理的使用異常機(jī)制,希望可以對(duì)各位朋友能有所幫助。
    2017-04-04
  • Java JDBC導(dǎo)致的反序列化攻擊原理解析

    Java JDBC導(dǎo)致的反序列化攻擊原理解析

    這篇文章主要介紹了Java JDBC導(dǎo)致的反序列化攻擊原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • java設(shè)計(jì)模式:原始模型模式

    java設(shè)計(jì)模式:原始模型模式

    這篇文章主要為大家詳細(xì)介紹了Java設(shè)計(jì)模式之Prototype原型模式的相關(guān)資料,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-08-08

最新評(píng)論