欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java如何利用CompletableFuture描述任務(wù)之間的關(guān)系

 更新時(shí)間:2023年07月30日 08:30:43   作者:Shawn_Shawn  
Java如何根據(jù)線程的執(zhí)行結(jié)果執(zhí)行下一步動作呢,F(xiàn)uture的另一個(gè)實(shí)現(xiàn)類CompletableFuture能夠優(yōu)雅的解決異步化問題,下面就跟隨小編一起了解一下吧

java線程-如何獲取到線程執(zhí)行的結(jié)果一文中講解了Future的用法和實(shí)現(xiàn)原理,Future主要用于獲取線程執(zhí)行的結(jié)果,那么如何根據(jù)線程的執(zhí)行結(jié)果執(zhí)行下一步動作呢?Future的另一個(gè)實(shí)現(xiàn)類CompletableFuture能夠優(yōu)雅的解決異步化問題。

CompletableFuture

CompletableFuture是java8新增的,這個(gè)類實(shí)現(xiàn)了兩個(gè)接口,一個(gè)是Future接口,一個(gè)是CompletionStage接口。

CompletableFuture提供了非常強(qiáng)大的Future的擴(kuò)展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果,并且提供了轉(zhuǎn)換和組合CompletableFuture的方法。

CompletableFuture被設(shè)計(jì)在Java中進(jìn)行異步編程。異步編程意味著在主線程之外創(chuàng)建一個(gè)獨(dú)立的線程,與主線程分隔開,并在上面運(yùn)行一個(gè)非阻塞的任務(wù),然后通知主線程進(jìn)展,成功或者失敗。

通過這種方式,你的主線程不用為了任務(wù)的完成而阻塞/等待,你可以用主線程去并行執(zhí)行其他的任務(wù)。 使用這種并行方式,極大地提升了程序的表現(xiàn)。

CompletionStage

CompletionStage是java8新增的接口,用于異步執(zhí)行中的階段處理。CompletionStage定義了一組接口用于在一個(gè)階段執(zhí)行結(jié)束之后,要么繼續(xù)執(zhí)行下一個(gè)階段,要么對結(jié)果進(jìn)行轉(zhuǎn)換產(chǎn)生新的結(jié)果等等,一般來說要執(zhí)行下一個(gè)階段都需要上一個(gè)階段正常完成,當(dāng)然這個(gè)類也提供了對異常結(jié)果的處理接口。

任務(wù)與任務(wù)之間是有聯(lián)系關(guān)系的,比如串行關(guān)系,并行關(guān)系,AND,OR等關(guān)系。

描述串行關(guān)系

CompletionStage接口里面描述串行關(guān)系,主要是thenApply、thenAcceptthenRunthenCompose這四個(gè)系列的接口。

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, 
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Executor executor);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, 
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action, 
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Executor executor);
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Executor executor);
  • thenApply系列方法,入?yún)㈩愋褪墙涌?Function<T, R>,這個(gè)接口里與CompletionStage相關(guān)的方法是 R apply(T t),這個(gè)方法既能接收參數(shù)也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<R>
  • thenAccept系列方法,入?yún)㈩愋褪墙涌?code>Consumer<T>,這個(gè)接口里與CompletionStage相關(guān)的方法是void accept(T t),這個(gè)方法雖然支持參數(shù),但卻不支持回值,所以thenAccept系列方法返回的是CompletionStage<Void>。
  • thenRun系列方法,入?yún)㈩愋褪墙涌?code>Runnable,Runnable既不能接收參數(shù)也不支持返回值,所以thenRun系列方法返回的是CompletionStage<Void>。
  • thenCompose系列方法,這個(gè)系列的方法會新創(chuàng)建出一個(gè)子流程,最終結(jié)果和thenApply系列是相同的。
  • 這些方法里面Async代表的是異步執(zhí)行fn、consumer或者action。
// 描述串行關(guān)系
// 期待輸出
// hello world
// Hello CompletableFuture
private static void serial() throws Exception {
 ?CompletableFuture<Void> t1 =
 ? ?CompletableFuture.supplyAsync(() -> "Hello")
 ?  .thenApply(s -> s + " World")
 ?  .thenApply(String::toLowerCase)
 ?  .thenAccept(System.out::println);

 ?CompletableFuture<Void> t2 =
 ? ?CompletableFuture.supplyAsync(() -> "Hello")
 ?  .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " CompuletableFuture"))
 ?  .thenAccept(System.out::println);

 ?t1.thenRun(() -> t2.join());
 ?t1.join();
}

描述AND聚合關(guān)系

CompletionStage接口里面描述AND聚合關(guān)系,主要是thenCombine、thenAcceptBoth、runAfterBoth這三個(gè)系列的接口。

public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, 
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BiFunction<? super T,? super U,? extends V> fn,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Executor executor);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, 
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BiConsumer<? super T, ? super U> action,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Executor executor);
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Runnable action);
  • thenAcceptBoth系列方法,入?yún)?code>CompletionStage以及BiConsumer<T,U>,這一組函數(shù)是用來接受兩個(gè)CompletableFuture的返回值,并將其組合到一起。
  • thenCombine系列方法,與thenAcceotBoth類似,與thenAcceptBoth不同的是,thenCombine將兩個(gè)任務(wù)結(jié)果合并后會返回一個(gè)全新的值作為出參。
  • runAfterBoth系列方法,入?yún)㈩愋蜑?code>CompletionStage<?>和Runnable
/*
hello world
Hello CompuletableFuture
thenCombine end
thenAcceptBoth end
runAfterBoth end
*/
private static void and() throws Exception {
 ?CompletableFuture<Void> t1 =
 ? ? ?CompletableFuture.supplyAsync(() -> "Hello")
 ? ? ? ?  .thenApply(s -> s + " World")
 ? ? ? ?  .thenApply(String::toLowerCase)
 ? ? ? ?  .thenAccept(System.out::println);

 ?CompletableFuture<Void> t2 =
 ? ? ?CompletableFuture.supplyAsync(() -> "Hello")
 ? ? ? ?  .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " CompuletableFuture"))
 ? ? ? ?  .thenAccept(System.out::println);

 ?CompletableFuture<String> t3 = t1.thenCombine(t2, (__, s) -> "end");
 ?t3.thenAccept(s -> System.out.println("thenCombine " + s));
 ?t3.join();

 ?CompletableFuture<Void> t4 =
 ? ? ?t1.thenAcceptBoth(t2, (__, s) -> System.out.println("thenAcceptBoth end"));
 ?t4.join();

 ?CompletableFuture<Void> t5 = t1.runAfterBoth(t2, () -> System.out.println("runAfterBoth end"));
 ?t5.join();
}

描述OR聚合關(guān)系

CompletionStage接口里面描述OR聚合關(guān)系,主要是applyToEitheracceptEitherrunAfterEither系列的接口。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Function<? super T, U> fn);

public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Function<? super T, U> fn,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Executor executor);

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Consumer<? super T> action);

public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Consumer<? super T> action);

public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Consumer<? super T> action,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Executor executor);

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Runnable action);

public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Runnable action);

public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Runnable action,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Executor executor);
  • applyToEither系列方法,入?yún)㈩愋蜑?code>CompletionStage和Function
  • acceptEither系列方法,入?yún)㈩愋蜑?code>CompletionStage和Consumer
  • runAfterEither系列方法,入?yún)㈩愋蜑?code>CompletionStage和Runnable
private static void or() {
 ?CompletableFuture<Void> t1 =
 ? ? ?CompletableFuture.runAsync(
 ? ? ? ?  () -> {
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(10);
 ? ? ? ? ? ? ?System.out.println("t1");
 ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ?throw new RuntimeException(e);
 ? ? ? ? ?  }
 ? ? ? ?  });

 ?CompletableFuture<Void> t2 =
 ? ? ?CompletableFuture.runAsync(
 ? ? ? ?  () -> {
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(200);
 ? ? ? ? ? ? ?System.out.println("t2");
 ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ?throw new RuntimeException(e);
 ? ? ? ? ?  }
 ? ? ? ?  });

 ?CompletableFuture<String> t3 = t1.applyToEither(t2, s -> "applyToEither end");
 ?t3.thenAccept(System.out::println);
 ?t3.join();

 ?CompletableFuture<Void> t4 = t1.acceptEither(t2, s -> System.out.println("acceptEither end"));
 ?t4.join();

 ?CompletableFuture<Void> t5 =
 ? ? ?t1.runAfterEither(t2, () -> System.out.println("runAfterEither end"));
 ?t5.join();
}

描述異常關(guān)系

雖然fn、consumer、action它們的核心方法都不允許拋出可檢查異常,但是卻無法限制它們拋出運(yùn)行時(shí)異常。

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Executor executor);

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Executor executor);
  • exceptionally系列方法,入?yún)㈩愋蜑?code>Function<Throwable, T>。
  • whenComplete系列方法,入?yún)㈩愋蜑?code>BiConsumer<T, Throwable>。
  • handle系列方法,入?yún)㈩愋蜑?code>BiFunction<T, Throwable, U>。
private static void exception() {
 ?int i = 0;
 ?CompletableFuture<String> t1 =
 ? ? ?CompletableFuture.supplyAsync(
 ? ? ? ?  () -> {
 ? ? ? ? ? ?System.out.println("t1");
 ? ? ? ? ? ?if (i > 0) {
 ? ? ? ? ? ? ?return "t1";
 ? ? ? ? ?  }
 ? ? ? ? ? ?throw new RuntimeException("error");
 ? ? ? ?  });

 ?// ?  CompletableFuture<Void> t2 =
 ?// ? ? ?  t1.exceptionally(e -> e.getMessage())
 ?// ? ? ? ? ?  .thenAccept(System.out::println)
 ?// ? ? ? ? ?  .thenRun(() -> System.out.println("exceptionally end"));
 ?// ?  t2.join();

 ?// ?  CompletableFuture<String> t3 =
 ?// ? ? ?  t1.whenComplete(
 ?// ? ? ? ? ? ? ?  (s, e) -> {
 ?// ? ? ? ? ? ? ? ?  if (e != null) {
 ?// ? ? ? ? ? ? ? ? ?  System.out.println(e.getMessage());
 ?// ? ? ? ? ? ? ? ?  } else {
 ?// ? ? ? ? ? ? ? ? ?  System.out.println(s);
 ?// ? ? ? ? ? ? ? ?  }
 ?// ? ? ? ? ? ? ?  });
 ?// ?  t3.join();

 ?CompletableFuture<Void> t4 =
 ? ? ?t1.handle(
 ? ? ? ? ? ?  (s, e) -> {
 ? ? ? ? ? ? ? ?if (e != null) {
 ? ? ? ? ? ? ? ? ?return e.getMessage();
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ?return s;
 ? ? ? ? ? ?  })
 ? ? ? ?  .thenAccept(System.out::println);
 ?t4.join();
}

其余API

allOf() 與anyOf() 也是一對孿生兄弟,當(dāng)我們需要對多個(gè)Future的運(yùn)行進(jìn)行組織時(shí),就可以考慮使用它們:

  • allOf() :給定一組任務(wù),等待所有任務(wù)執(zhí)行結(jié)束;
  • anyOf() :給定一組任務(wù),等待其中任一任務(wù)執(zhí)行結(jié)束。
private static void all() {
 ?long startTime = System.currentTimeMillis();
 ?CompletableFuture<Void> t1 =
 ? ? ?CompletableFuture.runAsync(
 ? ? ? ?  () -> {
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(5);
 ? ? ? ? ? ? ?System.out.println("t1");
 ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ?throw new RuntimeException(e);
 ? ? ? ? ?  }
 ? ? ? ?  });

 ?CompletableFuture<Void> t2 =
 ? ? ?CompletableFuture.runAsync(
 ? ? ? ?  () -> {
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(15);
 ? ? ? ? ? ? ?System.out.println("t2");
 ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ?throw new RuntimeException(e);
 ? ? ? ? ?  }
 ? ? ? ?  });
 ?CompletableFuture<Void> t3 =
 ? ? ?CompletableFuture.allOf(t1, t2)
 ? ? ? ?  .thenRun(
 ? ? ? ? ? ?  () ->
 ? ? ? ? ? ? ? ? ?System.out.println(
 ? ? ? ? ? ? ? ? ? ? ?"all end cost " + (System.currentTimeMillis() - startTime) + " ms."));
 ?t3.join();
}

private static void any() {
 ?long startTime = System.currentTimeMillis();
 ?CompletableFuture<Void> t1 =
 ? ? ?CompletableFuture.runAsync(
 ? ? ? ?  () -> {
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(5);
 ? ? ? ? ? ? ?System.out.println("t1");
 ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ?throw new RuntimeException(e);
 ? ? ? ? ?  }
 ? ? ? ?  });

 ?CompletableFuture<Void> t2 =
 ? ? ?CompletableFuture.runAsync(
 ? ? ? ?  () -> {
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(15);
 ? ? ? ? ? ? ?System.out.println("t2");
 ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ?throw new RuntimeException(e);
 ? ? ? ? ?  }
 ? ? ? ?  });

 ?CompletableFuture<Void> t3 =
 ? ? ?CompletableFuture.anyOf(t1, t2)
 ? ? ? ?  .thenRun(
 ? ? ? ? ? ?  () ->
 ? ? ? ? ? ? ? ? ?System.out.println(
 ? ? ? ? ? ? ? ? ? ? ?"all end cost " + (System.currentTimeMillis() - startTime) + " ms."));
 ?t3.join();
}

經(jīng)典案例

利用CompletableFuture來實(shí)現(xiàn)燒水泡茶程序,來自極客時(shí)間-java并發(fā)編程課程的案例

分工:

  • 任務(wù)1負(fù)責(zé)洗水壺,燒開水。
  • 任務(wù)2負(fù)責(zé)洗茶壺,洗茶杯,拿茶葉。
  • 任務(wù)1和任務(wù)2并行。
  • 任務(wù)1和任務(wù)3都完成后,啟動任務(wù)三泡茶。

// 任務(wù) 1:洗水壺 -> 燒開水
CompletableFuture<Void> f1 = 
 ?CompletableFuture.runAsync(()->{
 ?System.out.println("T1: 洗水壺...");
 ?sleep(1, TimeUnit.SECONDS);
?
 ?System.out.println("T1: 燒開水...");
 ?sleep(15, TimeUnit.SECONDS);
});
// 任務(wù) 2:洗茶壺 -> 洗茶杯 -> 拿茶葉
CompletableFuture<String> f2 = 
 ?CompletableFuture.supplyAsync(()->{
 ?System.out.println("T2: 洗茶壺...");
 ?sleep(1, TimeUnit.SECONDS);
?
 ?System.out.println("T2: 洗茶杯...");
 ?sleep(2, TimeUnit.SECONDS);
?
 ?System.out.println("T2: 拿茶葉...");
 ?sleep(1, TimeUnit.SECONDS);
 ?return " 龍井 ";
});
// 任務(wù) 3:任務(wù) 1 和任務(wù) 2 完成后執(zhí)行:泡茶
CompletableFuture<String> f3 = 
 ?f1.thenCombine(f2, (__, tf)->{
 ? ?System.out.println("T1: 拿到茶葉:" + tf);
 ? ?System.out.println("T1: 泡茶...");
 ? ?return " 上茶:" + tf;
  });
// 等待任務(wù) 3 執(zhí)行結(jié)果
System.out.println(f3.join());
?
void sleep(int t, TimeUnit u) {
 ?try {
 ? ?u.sleep(t);
  }catch(InterruptedException e){}
}

一次執(zhí)行結(jié)果:

T1: 洗水壺...
T2: 洗茶壺...
T1: 燒開水...
T2: 洗茶杯...
T2: 拿茶葉...
T1: 拿到茶葉: 龍井
T1: 泡茶...
上茶: 龍井

到此這篇關(guān)于Java如何利用CompletableFuture描述任務(wù)之間的關(guān)系的文章就介紹到這了,更多相關(guān)Java CompletableFuture內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論