Java8 CompletableFuture異步編程解讀
CompletableFuturede介紹
Java 8 引入了 CompletableFuture 類,這是 Java 異步編程的一個重要進展。
CompletableFuture 提供了一種基于未來結(jié)果的異步編程模型,允許你以更加直觀和易于理解的方式編寫非阻塞代碼。
CompletableFuturede使用場景
CompletableFuture 主要用于:
- 異步計算:如果你有一些計算任務可以異步執(zhí)行,并且不想阻塞主線程,可以使用 CompletableFuture。
- 多個并行任務組合:當你有多個獨立的異步任務,并且想要在它們都完成后執(zhí)行某些操作時,可以用 CompletableFuture 來組合它們。
- 異步回調(diào):當異步計算完成后,你需要執(zhí)行某些后續(xù)操作(如更新 UI、保存結(jié)果等),可以通過 thenApply(), thenAccept(), thenRun() 等方法指定回調(diào)。
- 超時控制:可以為異步任務設置超時限制,防止任務執(zhí)行時間過長,導致線程被長時間占用。
- 錯誤處理:在異步任務中,如果有異常發(fā)生,可以通過 handle() 或 exceptionally() 方法進行錯誤處理。
- 多任務的組合與合成:可以將多個異步任務的結(jié)果進行合成,產(chǎn)生新的任務。
常用異步編程實現(xiàn)方案
- Thread
特點:
- Thread是 Java 中最基本的并發(fā)執(zhí)行單位,代表一個獨立的執(zhí)行路徑。
- Thread可以通過繼承 Thread 類或?qū)崿F(xiàn) Runnable 接口來創(chuàng)建和啟動。
- 線程會從 run() 方法開始執(zhí)行,run() 方法可以包含任何邏輯。
- 適合處理簡單的并發(fā)任務,但不適合復雜的并發(fā)場景,因為線程管理較為麻煩。
使用示例:
public static void main(String[] args) { Thread thread = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " is running..."); }); thread.start(); }
- ExecutorService
特點:
- ExecutorService 是一個用于執(zhí)行異步任務的接口,通常與線程池一起使用。
- 它提供了方法來提交任務、關閉線程池、獲取任務結(jié)果等。
- ExecutorService 包括多種實現(xiàn),如 ThreadPoolExecutor,并且支持任務的異步執(zhí)行。
- 支持有返回值的任務(通過 submit() 方法)和無返回值的任務(通過 execute() 方法)。
使用示例:
有返回值:
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(2); // 創(chuàng)建線程池 Callable<Integer> task = () -> { Thread.sleep(1000); return 42; }; Future<Integer> result = executor.submit(task); // 提交任務并獲得 Future 對象 System.out.println("Task result: " + result.get()); // 獲取結(jié)果 executor.shutdown(); // 關閉線程池 }
無返回值:
public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(2); // 創(chuàng)建線程池 Runnable task = () -> { System.out.println(Thread.currentThread().getName() + " is running..."); }; executor.execute(task); // 提交任務 executor.shutdown(); // 關閉線程池 }
- CountDownLatch
特點:
- CountDownLatch 是一個同步輔助類,允許一個或多個線程等待直到其他線程完成某個操作。
- 使用一個計數(shù)器(count)來表示待完成的任務數(shù)量,每個任務完成后調(diào)用 countDown() 方法,計數(shù)器減一。
- 當計數(shù)器為零時,所有等待的線程會繼續(xù)執(zhí)行。
- CountDownLatch 不能重用,它適合用于多個線程并行執(zhí)行后,等待所有線程完成的場景。
使用示例:
public static void main(String[] args) throws InterruptedException { int totalThreads = 3; CountDownLatch latch = new CountDownLatch(totalThreads); // 初始化計數(shù)器為3 Runnable task = () -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " finished."); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 每個線程完成后減少計數(shù)器 } }; // 啟動多個線程 for (int i = 0; i < totalThreads; i++) { new Thread(task).start(); } latch.await(); // 等待計數(shù)器歸零 System.out.println("All tasks are finished."); }
- CyclicBarrier
特點:
- CyclicBarrier 允許一組線程互相等待,直到所有線程都到達一個公共屏障點,然后所有線程再一起繼續(xù)執(zhí)行。
- 它的計數(shù)器每次歸零后會重置,適合用來處理多輪同步任務。
- 每當所有線程到達屏障點時,都會執(zhí)行一個可選的動作(如回調(diào)函數(shù))。
使用示例:
public static void main(String[] args) throws InterruptedException { int totalThreads = 3; CyclicBarrier barrier = new CyclicBarrier(totalThreads, () -> { System.out.println("All threads reached the barrier point, proceeding..."); }); Runnable task = () -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " reached the barrier."); barrier.await(); // 等待其他線程到達屏障點 } catch (Exception e) { e.printStackTrace(); } }; // 啟動多個線程 for (int i = 0; i < totalThreads; i++) { new Thread(task).start(); } }
- ForkJoinPool
特點:
- ForkJoinPool 是專門用于執(zhí)行遞歸任務的線程池,特別適合大規(guī)模并行計算。
- 它將任務分割成多個子任務并通過遞歸的方式處理(“fork”),然后合并子任務的結(jié)果(“join”)。
- 在 ForkJoinPool 中,任務拆分采用工作竊取算法,盡量平衡工作負載,提升性能。
使用示例:
import java.util.concurrent.*; public class ForkJoinPoolExample { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); // 創(chuàng)建 ForkJoinPool int[] array = {1, 2, 3, 4, 5, 6, 7, 8}; RecursiveTask<Integer> task = new SumTask(array, 0, array.length); int result = pool.invoke(task); // 執(zhí)行任務并獲取結(jié)果 System.out.println("Sum is: " + result); } } class SumTask extends RecursiveTask<Integer> { private int[] array; private int start, end; public SumTask(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Integer compute() { if (end - start <= 2) { // 基礎情況 int sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } else { int mid = (start + end) / 2; SumTask task1 = new SumTask(array, start, mid); SumTask task2 = new SumTask(array, mid, end); task1.fork(); // 異步執(zhí)行 task2.fork(); return task1.join() + task2.join(); // 合并結(jié)果 } } }
- CompletableFuture
特點:
- CompletableFuture 是 Java 8 引入的異步編程框架,允許你以非阻塞的方式處理任務。
- 它支持任務的組合、回調(diào)、異常處理等,適合用于處理復雜的異步任務鏈。
- 可以通過 supplyAsync()、thenApply() 等方法定義異步任務的執(zhí)行流程。
使用示例:
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 42; }); // 鏈式調(diào)用,處理結(jié)果 CompletableFuture<Integer> result = future.thenApplyAsync(value -> value * 2); System.out.println("Result: " + result.get()); // 輸出結(jié)果 }
各種實現(xiàn)方案總結(jié)
并發(fā)方式 | 特點 | 優(yōu)點 | 缺點 |
---|---|---|---|
Thread | - 最基本的線程創(chuàng)建方式- 通過繼承Thread 或?qū)崿F(xiàn)Runnable 接口創(chuàng)建任務 | - 簡單直觀 | - 需要手動管理線程,容易資源浪費或死鎖- 無法直接返回任務結(jié)果- 對復雜任務協(xié)調(diào)不便 |
ExecutorService | - 通過線程池管理線程- 提供任務的調(diào)度、執(zhí)行、生命周期管理 | - 提供線程池避免手動創(chuàng)建和銷毀線程,減少資源浪費- 支持任務的結(jié)果返回 | - 任務間依賴和組合較復雜-get() 方法阻塞線程,難以實現(xiàn)非阻塞 |
CountDownLatch | - 用于等待多個任務完成后執(zhí)行后續(xù)操作- 使用計數(shù)器控制任務執(zhí)行 | - 可以控制任務同步,確保多個任務完成后繼續(xù)執(zhí)行 | - 只適用于等待任務完成,無法處理任務的依賴關系- 只能使用一次 |
CyclicBarrier | - 用于多個線程在某一點上等待- 可重復使用,適合同步多任務 | - 可重復使用,適合多次任務同步 | - 不如CompletableFuture 靈活- 僅適合特定的同步場景 |
ForkJoinPool | - 專為遞歸分治任務設計的線程池- 支持任務拆分和合并 | - 高效利用多核處理器,適合分治算法- 支持任務拆分和合并 | - 對于非遞歸任務不適合- 異常處理不如CompletableFuture 靈活 |
CompletableFuture | - 基于Future 設計的異步編程API- 支持非阻塞的任務組合和回調(diào)處理 | - 支持鏈式調(diào)用,異步任務組合,避免阻塞- 可以處理異常,支持并行處理和同步等待- 支持thenApply、thenAccept 等多種處理方式,簡化代碼 | - 復雜任務時調(diào)試困難- 異常處理仍較為復雜- 比ExecutorService 稍顯復雜 |
- Thread:最基礎的并發(fā)方式,直接通過線程控制執(zhí)行,但缺乏高級功能。
- ExecutorService:基于線程池的高層接口,能夠有效管理線程資源和任務執(zhí)行。
- CountDownLatch、CyclicBarrier:用于線程間的同步協(xié)調(diào)。
CountDownLatch
等待特定任務完成,而CyclicBarrier
可重復用于多次任務同步。 - ForkJoinPool:適用于任務拆分和合并的場景,特別是遞歸分治任務。
- CompletableFuture:提供更靈活的異步任務處理方式,支持鏈式調(diào)用、異步執(zhí)行及異常處理,適合復雜的并發(fā)任務調(diào)度。
CompletableFuturede結(jié)構(gòu)
CompletableFuture實現(xiàn)了Future接口和CompletionStage接口。
結(jié)構(gòu)梳理
相關接口 | 描述 |
---|---|
Future | 是一個表示異步計算結(jié)果的接口。它提供了方法來檢查異步計算是否完成、獲取計算的結(jié)果以及取消計算。 |
CompletionStage | 是一個表示異步計算結(jié)果的接口,提供了處理計算結(jié)果的非阻塞操作。與 Future 不同,CompletionStage 采用鏈式調(diào)用,可以更靈活地組合多個異步操作。 |
- Future接口
Future接口是JDK 5引入的,該接口屬于java.util.concurrent包。
Future接口的目的是表示異步計算的結(jié)果,它允許你提交一個任務給一個 Executor(執(zhí)行器),并在稍后獲取任務的結(jié)果。
主要方法:
方法 | 描述 |
---|---|
get() | 阻塞當前線程,直到異步計算完成,并返回計算結(jié)果 |
get(long timeout, TimeUnit unit) | 阻塞當前線程,直到異步計算完成或超時,并返回計算結(jié)果 |
isDone() | 檢查異步計算是否完成 |
cancel(boolean mayInterruptIfRunning) | 嘗試取消異步計算 |
isCancelled() | 檢查異步計算是否被取消。 |
- CompletionStage接口
CompletionStage 接口是 Java 8 引入的一個重要接口,用于描述異步計算的生命周期和結(jié)果。
CompletionStage 提供了一套方法,用于處理異步計算的結(jié)果、組合多個計算、處理異常等。
主要方法:
方法 | 描述 |
---|---|
thenApply | 在當前階段完成后,應用給定的 Function,并返回一個新的 CompletionStage。 |
thenAcceptAsync | 異步地執(zhí)行指定的 Consumer,并返回一個新的 CompletionStage,該階段沒有結(jié)果。 |
thenComposeAsync | 異步地將當前階段的結(jié)果應用于一個返回 CompletionStage 的函數(shù),并返回一個新的 CompletionStage。 |
thenCombine | 在兩個 CompletionStage 都完成后,使用給定的 BiFunction 合并它們的結(jié)果,并返回一個新的 CompletionStage。 |
runAfterEitherAsync | 在任意一個給定的兩個 CompletionStage 完成后,異步地執(zhí)行指定的 Runnable。 |
thenAccept | 在當前階段完成后,執(zhí)行指定的 Consumer,并返回一個新的 CompletionStage,該階段沒有結(jié)果。 |
runAfterEither | 在任意一個給定的兩個 CompletionStage 完成后,執(zhí)行指定的 Runnable。 |
thenCombineAsync | 在兩個 CompletionStage 都完成后,異步地使用給定的 BiFunction 合并它們的結(jié)果,并返回一個新的 CompletionStage。 |
thenAcceptBothAsync | 在兩個 CompletionStage 都完成后,異步地執(zhí)行指定的 BiConsumer,并返回一個新的 CompletionStage。 |
applyToEither | 在兩個 CompletionStage 中任意一個完成后,應用給定的 Function,并返回一個新的 CompletionStage。 |
applyToEitherAsync | 在兩個 CompletionStage 中任意一個完成后,異步地應用給定的 Function,并返回一個新的 CompletionStage。 |
runAfterBothAsync | 在兩個 CompletionStage 都完成后,異步地執(zhí)行指定的 Runnable,并返回一個新的 CompletionStage。 |
thenAcceptBothAsync | 在兩個 CompletionStage 都完成后,異步地執(zhí)行指定的 BiConsumer。 |
acceptEitherAsync | 在兩個 CompletionStage 中任意一個完成后,異步地執(zhí)行指定的 Consumer,并返回一個新的 CompletionStage。 |
handleAsync | 異步地處理當前階段的結(jié)果或異常,應用給定的 BiFunction,并返回一個新的 CompletionStage。 |
thenComposeAsync | 同 thenCompose,但異步地應用給定的函數(shù),并返回一個新的 CompletionStage。 |
thenCombineAsync | 同 thenCombine,但異步地使用給定的 BiFunction 合并兩個 CompletionStage 的結(jié)果。 |
exceptionally | 如果當前階段以異常完成,則應用指定的 Function 處理該異常,并返回一個新的 CompletionStage。 |
acceptEither | 在兩個 CompletionStage 中任意一個完成后,執(zhí)行指定的 Consumer。 |
thenCompose | 將當前階段的結(jié)果應用于一個返回 CompletionStage 的函數(shù),并返回一個新的 CompletionStage。 |
handle | 處理當前階段的結(jié)果或異常,應用給定的 BiFunction,并返回一個新的 CompletionStage。 |
thenAcceptBoth | 在兩個 CompletionStage 都完成后,執(zhí)行指定的 BiConsumer。 |
thenApplyAsync | 異步地應用給定的 Function,并返回一個新的 CompletionStage。 |
whenCompleteAsync | 異步地執(zhí)行指定的 BiConsumer,無論結(jié)果如何,并返回一個新的 CompletionStage。 |
applyToEitherAsync | 同 applyToEither,但異步地應用給定的 Function,并返回一個新的 CompletionStage。 |
acceptEitherAsync | 同 acceptEither,但異步地執(zhí)行指定的 Consumer,并返回一個新的 CompletionStage。 |
runAfterEitherAsync | 同 runAfterEither,但異步地執(zhí)行指定的 Runnable,并返回一個新的 CompletionStage。 |
thenRunAsync | 異步地執(zhí)行指定的 Runnable,并返回一個新的 CompletionStage,該階段沒有結(jié)果。 |
runAfterBoth | 在兩個 CompletionStage 都完成后,執(zhí)行指定的 Runnable。 |
whenComplete | 在當前階段完成后,無論結(jié)果如何,執(zhí)行指定的 BiConsumer,并返回一個新的 CompletionStage。 |
thenRunAsync | 異步地執(zhí)行指定的 Runnable,并返回一個新的 CompletionStage,該階段沒有結(jié)果。 |
常用方法
方法 | 描述 |
---|---|
supplyAsync() | 異步地運行一個帶返回值的任務。 |
runAsync() | 異步地運行一個無返回值的任務。 |
thenApply() | 當 CompletableFuture 任務完成時執(zhí)行某個操作,并返回新的結(jié)果。 |
thenAccept() | 當任務完成時執(zhí)行某個操作,但不返回結(jié)果。 |
thenRun() | 當任務完成時執(zhí)行某個操作,無需返回結(jié)果。 |
exceptionally() | 用于處理任務執(zhí)行中發(fā)生的異常。 |
handle() | 處理任務執(zhí)行中的正常結(jié)果或異常結(jié)果。 |
allOf() | 等待多個 CompletableFuture 全部完成,返回一個新的 CompletableFuture。 |
anyOf() | 等待多個 CompletableFuture 中的任意一個完成。 |
CompletableFuture使用示例
1. 基本異步操作
CompletableFuture.supplyAsync() 和 CompletableFuture.runAsync() 是最常用的啟動異步任務的方法。
- supplyAsync() 用于執(zhí)行帶返回值的異步任務。
- runAsync() 用于執(zhí)行不帶返回值的異步任務。
public static void main(String[] args) throws ExecutionException, InterruptedException { // 帶返回值的異步任務 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模擬耗時任務 } catch (InterruptedException e) { e.printStackTrace(); } return 42; // 返回結(jié)果 }); // 獲取異步任務的結(jié)果 Integer result = future.get(); // 阻塞,直到任務完成 System.out.println("Result: " + result); }
2. 任務鏈式調(diào)用
通過 thenApply(), thenAccept(), thenRun() 等方法,可以將多個異步任務串聯(lián)在一起。
- thenApply() 用于處理任務的返回值。
- thenAccept() 用于消費返回值,但不返回結(jié)果。
- thenRun() 用于執(zhí)行沒有返回值的操作。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 42; // 返回結(jié)果 }); // 鏈式調(diào)用,先處理結(jié)果,再轉(zhuǎn)換 CompletableFuture<Integer> resultFuture = future .thenApply(value -> value * 2) // 將值乘以2 .thenApply(value -> value + 10); // 再加10 Integer result = resultFuture.get(); // 獲取最終結(jié)果 System.out.println("Final Result: " + result); // 輸出 94 }
3. 多個異步任務組合
使用 thenCombine()、thenCompose()、allOf() 和 anyOf() 等方法可以組合多個異步任務,執(zhí)行復雜的操作。
- thenCombine() 用于將兩個獨立的異步任務的結(jié)果合并。
- thenCompose() 用于將第一個異步任務的結(jié)果作為參數(shù)傳遞給下一個異步任務。
- allOf() 用于等待多個異步任務完成,并且不關心每個任務的結(jié)果。
- anyOf() 用于等待多個異步任務中的任意一個完成。
示例1:組合兩個異步任務
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { return 10; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { return 20; }); // 合并兩個任務的結(jié)果 CompletableFuture<Integer> combinedFuture = future1 .thenCombine(future2, (result1, result2) -> result1 + result2); // 將兩個結(jié)果相加 Integer result = combinedFuture.get(); // 獲取最終結(jié)果 System.out.println("Combined Result: " + result); // 輸出 30 }
示例2:使用 allOf() 等待多個任務完成
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); System.out.println("Task 1 completed"); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { Thread.sleep(1500); System.out.println("Task 2 completed"); } catch (InterruptedException e) { e.printStackTrace(); } }); // 等待多個任務全部完成 CompletableFuture.allOf(future1, future2).join(); System.out.println("All tasks are completed."); }
4. 異常處理
在異步任務中,異常可能會發(fā)生。CompletableFuture 提供了 exceptionally() 和 handle() 方法來處理異常。
- exceptionally() 用于捕獲異常并提供替代值。
- handle() 可以處理正常結(jié)果和異常。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { if (true) { throw new RuntimeException("Something went wrong!"); } return 42; }); // 使用 exceptionally 處理異常并提供默認值 CompletableFuture<Integer> resultFuture = future.exceptionally(ex -> { System.out.println("Exception occurred: " + ex.getMessage()); return -1; // 返回默認值 }); Integer result = resultFuture.get(); // 獲取結(jié)果 System.out.println("Result: " + result); // 輸出 -1 }
5. 并行執(zhí)行多個任務
使用 CompletableFuture.supplyAsync() 或 runAsync() 來并行執(zhí)行多個任務,然后使用 allOf() 或 anyOf() 等方法等待這些任務的完成。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); return 1; } catch (InterruptedException e) { return 0; } }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); return 2; } catch (InterruptedException e) { return 0; } }); // 等待所有任務完成并合并結(jié)果 CompletableFuture<Integer> result = future1 .thenCombine(future2, (res1, res2) -> res1 + res2); // 將兩個結(jié)果相加 System.out.println("Combined result: " + result.get()); // 輸出 3 }
6. 處理返回值的轉(zhuǎn)換
通過 thenApply() 等方法可以對異步任務的結(jié)果進行轉(zhuǎn)換處理。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10); // 轉(zhuǎn)換結(jié)果:將值乘以2 CompletableFuture<Integer> transformedFuture = future.thenApply(value -> value * 2); System.out.println("Transformed Result: " + transformedFuture.get()); // 輸出 20 }
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
spring為java.util.Properties類型的屬性進行賦值過程解析
這篇文章主要介紹了spring為java.util.Properties類型的屬性進行賦值過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-01-01Java調(diào)用WebService服務的三種方式總結(jié)
雖然WebService這個框架已經(jīng)過時,但是有些公司還在使用,在調(diào)用他們的服務的時候就不得不面對各種問題,本篇文章總結(jié)了最近我調(diào)用?WebService的心路歷程,3種方式可以分別嘗試,需要的朋友可以參考下2023-08-08java實現(xiàn)ssh登錄linux并執(zhí)行命令的三種實現(xiàn)方式
文章介紹了三種在Java中實現(xiàn)SSH登錄Linux并執(zhí)行命令的方法,包括使用ganymed-ssh2、jsch和sshd-core,由于ganymed-ssh2和jsch的最新版本較舊,可能無法與較新的Linux系統(tǒng)兼容,而sshd-core一直在更新,推薦使用2024-11-11