解讀CompletableFuture的底層原理
引言
在現(xiàn)代 Java 編程中,異步編程變得越來越重要。為了實(shí)現(xiàn)高效和非阻塞的代碼,Java 8 引入了 CompletableFuture
,一個用于構(gòu)建異步應(yīng)用程序的強(qiáng)大工具。
本文將詳細(xì)探討 CompletableFuture
的底層原理,展示其工作機(jī)制,并通過代碼示例說明如何在實(shí)際應(yīng)用中使用它。
異步編程的背景
異步編程是指在程序運(yùn)行過程中,不等待某個操作完成,而是繼續(xù)執(zhí)行其他操作,待異步操作完成后再處理其結(jié)果。這樣可以提高程序的效率,特別是在 I/O 操作和網(wǎng)絡(luò)請求等耗時操作中。
在 Java 8 之前,實(shí)現(xiàn)異步編程主要依賴于 Future
接口。然而,Future
存在一些局限性,例如無法手動完成、不能鏈?zhǔn)秸{(diào)用等。為了解決這些問題,Java 8 引入了 CompletableFuture
。
什么是 CompletableFuture
CompletableFuture
是 Java 8 中新增的類,實(shí)現(xiàn)了 Future
和 CompletionStage
接口,提供了強(qiáng)大的異步編程能力。
CompletableFuture
允許以非阻塞的方式執(zhí)行任務(wù),并且可以通過鏈?zhǔn)秸{(diào)用來組合多個異步操作。
CompletableFuture 的特點(diǎn)
- 手動完成:可以手動設(shè)置
CompletableFuture
的結(jié)果或異常。 - 鏈?zhǔn)秸{(diào)用:支持多個
CompletableFuture
的鏈?zhǔn)秸{(diào)用,形成復(fù)雜的異步任務(wù)流。 - 組合操作:提供了豐富的方法來組合多個異步任務(wù),例如
thenCombine
、thenAcceptBoth
等。 - 異常處理:提供了靈活的異常處理機(jī)制,可以在任務(wù)鏈中處理異常。
CompletableFuture 的底層原理
工作機(jī)制
CompletableFuture
的核心是基于 ForkJoinPool
實(shí)現(xiàn)的。ForkJoinPool
是一種特殊的線程池,適用于并行計(jì)算任務(wù)。它采用了工作竊取算法,能夠有效利用多核 CPU 的性能。
當(dāng)我們提交一個任務(wù)給 CompletableFuture
時,它會將任務(wù)提交到默認(rèn)的 ForkJoinPool.commonPool()
中執(zhí)行。我們也可以指定自定義的線程池來執(zhí)行任務(wù)。
狀態(tài)管理
CompletableFuture
具有以下幾種狀態(tài):
- 未完成(Pending):任務(wù)尚未完成。
- 完成(Completed):任務(wù)已經(jīng)成功完成,并返回結(jié)果。
- 異常(Exceptionally Completed):任務(wù)在執(zhí)行過程中拋出了異常。
這些狀態(tài)通過內(nèi)部的 volatile
變量來管理,并使用 CAS(Compare-And-Swap)
操作保證線程安全。
任務(wù)調(diào)度
CompletableFuture
的任務(wù)調(diào)度機(jī)制基于 ForkJoinPool
的工作竊取算法。當(dāng)一個線程完成當(dāng)前任務(wù)后,會從其他線程的任務(wù)隊(duì)列中竊取任務(wù)執(zhí)行,從而提高 CPU 利用率。
下面我們通過一個簡單的示例代碼來理解 CompletableFuture
的基本用法。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { // 創(chuàng)建一個 CompletableFuture 實(shí)例 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Hello, World!"; }); // 阻塞等待結(jié)果 String result = future.get(); System.out.println(result); } }
在上面的示例中,我們創(chuàng)建了一個 CompletableFuture
實(shí)例,并使用 supplyAsync
方法異步執(zhí)行任務(wù)。
supplyAsync
方法會將任務(wù)提交到默認(rèn)的 ForkJoinPool
中執(zhí)行。最后,我們使用 get
方法阻塞等待結(jié)果并打印輸出。
鏈?zhǔn)秸{(diào)用
CompletableFuture
的一個重要特性是支持鏈?zhǔn)秸{(diào)用。
通過鏈?zhǔn)秸{(diào)用,我們可以將多個異步任務(wù)組合在一起,形成一個任務(wù)流。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureChainExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Hello, World!"; }).thenApply(result -> { return result + " from CompletableFuture"; }).thenApply(String::toUpperCase); String finalResult = future.get(); System.out.println(finalResult); } }
在這個示例中,我們使用 thenApply
方法對前一個任務(wù)的結(jié)果進(jìn)行處理,并返回一個新的 CompletableFuture
實(shí)例。
通過鏈?zhǔn)秸{(diào)用,我們可以將多個任務(wù)串聯(lián)在一起,形成一個任務(wù)流。
組合操作
CompletableFuture
提供了多種方法來組合多個異步任務(wù)。以下是一些常用的組合操作示例:
1.thenCombine:組合兩個 CompletableFuture
,并將兩個任務(wù)的結(jié)果進(jìn)行處理。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureCombineExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, Integer::sum); System.out.println(combinedFuture.get()); // 輸出 15 } }
2. thenAcceptBoth:組合兩個 CompletableFuture
,并對兩個任務(wù)的結(jié)果進(jìn)行消費(fèi)處理。
import java.util.concurrent.CompletableFuture; public class CompletableFutureAcceptBothExample { public static void main(String[] args) { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 10); future1.thenAcceptBoth(future2, (result1, result2) -> { System.out.println("Result: " + (result1 + result2)); }).join(); } }
3. allOf:組合多個 CompletableFuture
,并在所有任務(wù)完成后執(zhí)行操作。
import java.util.concurrent.CompletableFuture; public class CompletableFutureAllOfExample { public static void main(String[] args) { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("Task 1 completed"); }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("Task 2 completed"); }); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2); combinedFuture.join(); System.out.println("All tasks completed"); } }
異常處理
在異步任務(wù)中處理異常是非常重要的。CompletableFuture
提供了多種方法來處理任務(wù)執(zhí)行過程中的異常。
1.exceptionally:在任務(wù)拋出異常時,提供一個默認(rèn)值。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureExceptionallyExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true) { throw new RuntimeException("Exception occurred"); } return "Hello, World!"; }).exceptionally(ex -> { System.out.println("Exception: " + ex.getMessage()); return "Default Value"; }); System.out.println(future.get()); // 輸出 Default Value } }
2. handle:無論任務(wù)是否拋出異常,都進(jìn)行處理。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureHandleExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true) { throw new RuntimeException("Exception occurred"); } return "Hello, World!"; }).handle((result, ex) -> { if (ex != null) { return "Default Value"; } return result; }); System.out.println(future.get()); // 輸出 Default Value } }
實(shí)戰(zhàn)案例:構(gòu)建異步數(shù)據(jù)處理管道
為了更好地理解 CompletableFuture
的實(shí)際應(yīng)用,我們來構(gòu)建一個異步數(shù)據(jù)處理管道。
假設(shè)我們有一個數(shù)據(jù)源,需要對數(shù)據(jù)進(jìn)行一系列的處理操作,并將處理結(jié)果輸出到文件中。
數(shù)據(jù)源模擬
我們首先模擬一個數(shù)據(jù)源,該數(shù)據(jù)源會生成一系列數(shù)據(jù)。
import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; public class DataSource { public List<Integer> getData() { return IntStream.range(0, 10).boxed().collect(Collectors.toList()); } }
數(shù)據(jù)處理
接下來,我們定義數(shù)據(jù)處理操作。
假設(shè)我們需要對數(shù)據(jù)進(jìn)行兩步處理:首先對每個數(shù)據(jù)乘以 2,然后對結(jié)果進(jìn)行累加。
import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; public class DataProcessor { public List<Integer> processStep1(List<Integer> data) { return data.stream().map(x -> x * 2).collect(Collectors.toList()); } public Integer processStep2(List<Integer> data) { return data.stream().reduce(0, Integer::sum); } public CompletableFuture<List<Integer>> processStep1Async(List<Integer> data) { return CompletableFuture.supplyAsync(() -> processStep1(data)); } public CompletableFuture<Integer> processStep2Async(List<Integer> data) { return CompletableFuture.supplyAsync(() -> processStep2(data)); } }
結(jié)果輸出
我們定義一個方法將處理結(jié)果輸出到文件中。
import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture; public class ResultWriter { public void writeResult(String fileName, Integer result) throws IOException { Files.write(Paths.get(fileName), result.toString().getBytes()); } public CompletableFuture<Void> writeResultAsync(String fileName, Integer result) { return CompletableFuture.runAsync(() -> { try { writeResult(fileName, result); } catch (IOException e) { throw new IllegalStateException(e); } }); } }
主程序
最后,我們在主程序中將上述組件組合在一起,構(gòu)建異步數(shù)據(jù)處理管道。
import java.util.List; import java.util.concurrent.CompletableFuture; public class Main { public static void main(String[] args) { DataSource dataSource = new DataSource(); DataProcessor dataProcessor = new DataProcessor(); ResultWriter resultWriter = new ResultWriter(); List<Integer> data = dataSource.getData(); CompletableFuture<List<Integer>> step1Future = dataProcessor.processStep1Async(data); CompletableFuture<Integer> step2Future = step1Future.thenCompose(dataProcessor::processStep2Async); CompletableFuture<Void> writeFuture = step2Future.thenCompose(result -> resultWriter.writeResultAsync("result.txt", result)); writeFuture.join(); System.out.println("Data processing completed"); } }
在這個例子中,我們使用 CompletableFuture
將數(shù)據(jù)處理步驟和結(jié)果輸出串聯(lián)在一起,形成了一個完整的異步數(shù)據(jù)處理管道。
通過 thenCompose
方法,我們將前一個任務(wù)的結(jié)果傳遞給下一個異步任務(wù),從而實(shí)現(xiàn)了鏈?zhǔn)秸{(diào)用。
總結(jié)
本文深入探討了 CompletableFuture
的底層原理,展示了其工作機(jī)制,并通過多個代碼示例說明了如何在實(shí)際應(yīng)用中使用 CompletableFuture
。通過理解 CompletableFuture
的異步編程模型、狀態(tài)管理、任務(wù)調(diào)度和異常處理機(jī)制,我們可以更好地利用這一強(qiáng)大的工具構(gòu)建高效、非阻塞的 Java 應(yīng)用程序。
希望這篇文章能夠幫助你全面理解 CompletableFuture
,并在實(shí)際開發(fā)中靈活應(yīng)用。這些僅為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
JVM調(diào)優(yōu)參數(shù)的設(shè)置
Java虛擬機(jī)的調(diào)優(yōu)是一個復(fù)雜而關(guān)鍵的任務(wù),可以通過多種參數(shù)來實(shí)現(xiàn),本文就來介紹一下JVM調(diào)優(yōu)參數(shù)的設(shè)置,具有一定的參考價值,感興趣的可以了解一下2024-03-03SpringBoot項(xiàng)目中的視圖解析器問題(兩種)
SpringBoot官網(wǎng)推薦使用HTML視圖解析器,但是根據(jù)個人的具體業(yè)務(wù)也有可能使用到JSP視圖解析器,所以本文介紹了兩種視圖解析器,感興趣的可以了解下2020-06-06Maven包沖突導(dǎo)致NoSuchMethodError錯誤的解決辦法
web 項(xiàng)目 能正常編譯,運(yùn)行時也正常啟動,但執(zhí)行到需要調(diào)用 org.codehaus.jackson 包中的某個方法時,產(chǎn)生運(yùn)行異常,這篇文章主要介紹了Maven包沖突導(dǎo)致NoSuchMethodError錯誤的解決辦法,需要的朋友可以參考下2024-05-05Java JDK動態(tài)代理(AOP)的實(shí)現(xiàn)原理與使用詳析
所謂代理,就是一個人或者一個機(jī)構(gòu)代表另一個人或者另一個機(jī)構(gòu)采取行動。下面這篇文章主要給大家介紹了關(guān)于Java JDK動態(tài)代理(AOP)實(shí)現(xiàn)原理與使用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。2017-07-07Java Spring的數(shù)據(jù)庫開發(fā)詳解
這篇文章主要介紹了Spring的數(shù)據(jù)庫開發(fā),主要圍繞SpringJDBC和Spring Jdbc Template兩個技術(shù)來講解,文中有詳細(xì)的代碼示例,需要的小伙伴可以參考一下2023-04-04SpringSecurity 自定義認(rèn)證登錄的項(xiàng)目實(shí)踐
本文主要介紹了SpringSecurity 自定義認(rèn)證登錄的項(xiàng)目實(shí)踐,以手機(jī)驗(yàn)證碼登錄為例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-08-08java如何確定一個鏈表有環(huán)及入口節(jié)點(diǎn)
這篇文章主要介紹了java如何確定一個鏈表有環(huán)及入口節(jié)點(diǎn),想了解數(shù)據(jù)結(jié)構(gòu)的同學(xué)可以參考下2021-04-04java比較器Comparable接口與Comaprator接口的深入分析
本篇文章是對java比較器Comparable接口與Comaprator接口進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-06-06Java?設(shè)計(jì)模式以虹貓藍(lán)兔的故事講解簡單工廠模式
簡單工廠模式是屬于創(chuàng)建型模式,又叫做靜態(tài)工廠方法(Static Factory Method)模式,但不屬于23種GOF設(shè)計(jì)模式之一。簡單工廠模式是由一個工廠對象決定創(chuàng)建出哪一種產(chǎn)品類的實(shí)例。簡單工廠模式是工廠模式家族中最簡單實(shí)用的模式,可以理解為是不同工廠模式的一個特殊實(shí)現(xiàn)2022-03-03