欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程工具CompletableFuture的使用教程

 更新時(shí)間:2022年08月19日 14:29:18   作者:Real_man  
CompletableFuture實(shí)現(xiàn)了CompletionStage接口和Future接口,前者是對后者的一個(gè)擴(kuò)展,增加了異步回調(diào)、流式處理、多個(gè)Future組合處理的能力。本文就來詳細(xì)講講CompletableFuture的使用方式,需要的可以參考一下

前言

Future的問題

寫多線程程序的時(shí)候,可以使用Future從一個(gè)異步線程中拿到結(jié)果,但是如果使用過程中會(huì)發(fā)現(xiàn)一些問題:

  • 如果想要對Future的結(jié)果做進(jìn)一步的操作,需要阻塞當(dāng)前線程
  • 多個(gè)Future不能被鏈?zhǔn)降膱?zhí)行,每個(gè)Future的結(jié)果都是獨(dú)立的,期望對一個(gè)Future的結(jié)果做另外一件異步的事情;
  • 沒有異常處理策略,如果Future執(zhí)行失敗了,需要手動(dòng)捕捉

CompletableFuture應(yīng)運(yùn)而生

為了解決Future問題,JDK在1.8的時(shí)候給我們提供了一個(gè)好用的工具類CompletableFuture;

它實(shí)現(xiàn)了Future和CompletionStage接口,針對Future的不足之處給出了相應(yīng)的處理方式。

  • 在異步線程執(zhí)行結(jié)束后可以自動(dòng)回調(diào)我們新的處理邏輯,無需阻塞
  • 可以對多個(gè)異步任務(wù)進(jìn)行編排,組合或者排序
  • 異常處理

CompletableFuture的核心思想是將每個(gè)異步任務(wù)都可以看做一個(gè)步驟(CompletionStage),然后其他的異步任務(wù)可以根據(jù)這個(gè)步驟做一些想做的事情。

CompletionStage定義了許多步驟處理的方法,功能非常強(qiáng)大,這里就只列一下日常中常用到的一些方法供大家參考。

使用方式

基本使用-提交異步任務(wù)

簡單的使用方式

異步執(zhí)行,無需結(jié)果:

// 可以執(zhí)行Executors異步執(zhí)行,如果不指定,默認(rèn)使用ForkJoinPool
CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));

異步執(zhí)行,同時(shí)返回結(jié)果:

// 同樣可以指定線程池
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!");
System.out.println(stringCompletableFuture.get());

處理上個(gè)異步任務(wù)結(jié)果

  • thenRun: 不需要上一步的結(jié)果,直接直接新的操作

  • thenAccept:獲取上一步異步處理的內(nèi)容,進(jìn)行新的操作

  • thenApply: 獲取上一步的內(nèi)容,然后產(chǎn)生新的內(nèi)容

所有加上Async后綴的,代表新的處理操作仍然是異步的。Async的操作都可以指定Executors進(jìn)行處理

// Demo
       CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                // 針對上一步的結(jié)果做處理,產(chǎn)生新的結(jié)果
                .thenApplyAsync(s -> s.toUpperCase())
                // 針對上一步的結(jié)果做處理,不返回結(jié)果
                .thenAcceptAsync(s -> System.out.println(s))
                // 不需要上一步返回的結(jié)果,直接進(jìn)行操作
                .thenRunAsync(() -> System.out.println("end"));
        ;

對兩個(gè)結(jié)果進(jìn)行選用-acceptEither

當(dāng)我們有兩個(gè)回調(diào)在處理的時(shí)候,任何完成都可以使用,兩者結(jié)果沒有關(guān)系,那么使用acceptEither。

兩個(gè)異步線程誰先執(zhí)行完成,用誰的結(jié)果,其余類型的方法也是如此。

// 返回abc
CompletableFuture
                .supplyAsync(() -> {
                    SleepUtils.sleep(100);
                    return "Hello CompletableFuture!";
                })
                .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                });
// 返回Hello CompletableFuture!       
CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                .acceptEither(CompletableFuture.supplyAsync(() -> {
                    SleepUtils.sleep(100);
                    return "abc";
                }), new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                });

對兩個(gè)結(jié)果進(jìn)行合并-thenCombine, thenAcceptBoth

thenCombine

當(dāng)我們有兩個(gè)CompletionStage時(shí),需要對兩個(gè)的結(jié)果進(jìn)行整合處理,然后計(jì)算得出一個(gè)新的結(jié)果。

  • thenCompose是對上一個(gè)CompletionStage的結(jié)果進(jìn)行處理,返回結(jié)果,并且返回類型必須是CompletionStage。
  • thenCombine是得到第一個(gè)CompletionStage的結(jié)果,然后拿到當(dāng)前的CompletionStage,兩者的結(jié)果進(jìn)行處理。
        CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172);

        CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65)
                .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() {
                    @Override
                    public Double apply(Integer wight, Integer height) {
                        return wight * 10000.0 / (height * height);
                    }
                })
                ;

thenAcceptBoth

需要兩個(gè)異步CompletableFuture的結(jié)果,兩者都完成的時(shí)候,才進(jìn)入thenAcceptBoth回調(diào)。

// thenAcceptBoth案例:
        CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() {
                		// 參數(shù)一為我們剛開始運(yùn)行時(shí)的CompletableStage,新傳入的作為第二個(gè)參數(shù)
                    @Override
                    public void accept(String s, String s2) {
                        System.out.println("param1=" + s + ", param2=" + s2);
                    }
                });
// 結(jié)果:param1=Hello CompletableFuture!, param2=abc

異常處理

當(dāng)我們使用CompleteFuture進(jìn)行鏈?zhǔn)秸{(diào)用的時(shí)候,多個(gè)異步回調(diào)中,如果有一個(gè)執(zhí)行出現(xiàn)問題,那么接下來的回調(diào)都會(huì)停止,所以需要一種異常處理策略。

exceptionally

exceptionally是當(dāng)出現(xiàn)錯(cuò)誤時(shí),給我們機(jī)會(huì)進(jìn)行恢復(fù),自定義返回內(nèi)容。

        CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("發(fā)生錯(cuò)誤");
        }).exceptionally(throwable -> {
            log.error("調(diào)用錯(cuò)誤 {}", throwable.getMessage(), throwable);
            return "異常處理內(nèi)容";
        });

handle

exceptionally是只有發(fā)生異常時(shí)才會(huì)執(zhí)行,而handle則是不管是否發(fā)生錯(cuò)誤都會(huì)執(zhí)行。

CompletableFuture.supplyAsync(() -> {
    return "abc";
})
.handle((r,err) -> {
    log.error("調(diào)用錯(cuò)誤 {}", err.getMessage(), err);
    // 對結(jié)果做額外的處理
    return r;
})
;

案例

大量用戶發(fā)送短信|消息

需求為對某個(gè)表中特定條件的用戶進(jìn)行短信通知,但是短信用戶有成百上千萬,如果使用單線程讀取效率會(huì)很慢。這個(gè)時(shí)候可以考慮使用多線程的方式進(jìn)行讀??;

1、將讀取任務(wù)拆分為多個(gè)不同的子任務(wù),指定讀取的偏移量和個(gè)數(shù)

  // 假設(shè)有500萬條記錄
        long recordCount = 500 * 10000;
        int subTaskRecordCount = 10000;
        // 對記錄進(jìn)行分片
        List<Map> subTaskList = new LinkedList<>();
        for (int i = 0; i < recordCount / 500; i++) {
            // 如果子任務(wù)結(jié)構(gòu)復(fù)雜,建議使用對象
            HashMap<String, Integer> subTask = new HashMap<>();
            subTask.put("index", i);
            subTask.put("offset", i * subTaskRecordCount);
            subTask.put("count", subTaskRecordCount);
            subTaskList.add(subTask);
        }

2、使用多線程進(jìn)行批量讀取

  // 進(jìn)行subTask批量處理,拆分為不同的任務(wù)
        subTaskList.stream()
                .map(subTask -> CompletableFuture.runAsync(()->{
                    // 讀取數(shù)據(jù),然后處理
                    // dataTunel.read(subTask);
                },excuturs))   // 使用應(yīng)用的通用任務(wù)線程池
                .map(c -> ((CompletableFuture<?>) c).join());

3、進(jìn)行業(yè)務(wù)邏輯處理,或者直接在讀取完進(jìn)行業(yè)務(wù)邏輯處理也是可以;

并發(fā)獲取商品不同信息

在系統(tǒng)拆分比較細(xì)的時(shí)候,價(jià)格,優(yōu)惠券,庫存,商品詳情等信息分散在不同的系統(tǒng)中,有時(shí)候需要同時(shí)獲取商品的所有信息, 有時(shí)候可能只需要獲取商品的部分信息。

當(dāng)然問題點(diǎn)在于要調(diào)用多個(gè)不同的系統(tǒng),需要將RT降低下來,那么需要進(jìn)行并發(fā)調(diào)用;

     List<Task> taskList = new ArrayList<>();
        List<Object> result = taskList.stream()
                .map(task -> CompletableFuture.supplyAsync(()->{
//                    handlerMap.get(task).query();
                    return "";
                }, executorService))
                .map(c -> c.join())
                .collect(Collectors.toList());

問題

thenRun和thenRunAsync有什么區(qū)別

  • 如果不使用傳入的線程池,大家用默認(rèn)的線程池ForkJoinPool
  • thenRun用的默認(rèn)和上一個(gè)任務(wù)使用相同的線程池
  • thenRunAsync在執(zhí)行新的任務(wù)的時(shí)候可以接受傳入一個(gè)新的線程池,使用新的線程池執(zhí)行任務(wù);

handle和exceptional有什么區(qū)別

exceptionally是只有發(fā)生異常時(shí)才會(huì)執(zhí)行,而handle則是不管是否發(fā)生錯(cuò)誤都會(huì)執(zhí)行。

最后

一般情況下上述簡單的API已經(jīng)滿足絕大部分的場景了,如果有更復(fù)雜的訴求,可繼續(xù)深入研究。

到此這篇關(guān)于Java多線程工具CompletableFuture的使用教程的文章就介紹到這了,更多相關(guān)Java CompletableFuture內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java實(shí)現(xiàn)微信點(diǎn)餐申請微信退款

    java實(shí)現(xiàn)微信點(diǎn)餐申請微信退款

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)微信點(diǎn)餐申請微信退款,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-09-09
  • Java微服務(wù)間接口調(diào)用 feign

    Java微服務(wù)間接口調(diào)用 feign

    這篇文章主要介紹了微服務(wù)間的接口調(diào)用feign,F(xiàn)eign是一種聲明式、模板化的HTTP客戶端。在spring cloud中使用Feign,可以做到類似于普通的接口的請求調(diào)用,感興趣的小伙伴可以參考閱讀
    2023-03-03
  • Java的動(dòng)態(tài)代理和靜態(tài)代理及反射常用API詳解

    Java的動(dòng)態(tài)代理和靜態(tài)代理及反射常用API詳解

    這篇文章主要介紹了Java的動(dòng)態(tài)代理和靜態(tài)代理及反射常用API詳解,動(dòng)態(tài)代理是一種在運(yùn)行時(shí)動(dòng)態(tài)生成代理對象的技術(shù),它是一種設(shè)計(jì)模式,用于在不修改原始對象的情況下,通過代理對象來間接訪問原始對象,并在訪問前后執(zhí)行額外的操作,需要的朋友可以參考下
    2024-01-01
  • JAVA字符串類型switch的底層原理詳析

    JAVA字符串類型switch的底層原理詳析

    這篇文章主要給大家介紹了關(guān)于JAVA字符串類型switch的底層原理的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用JAVA具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09
  • Java+MyBatis+MySQL開發(fā)環(huán)境搭建流程詳解

    Java+MyBatis+MySQL開發(fā)環(huán)境搭建流程詳解

    Java的MyBatis框架提供了強(qiáng)大的數(shù)據(jù)庫操作支持,這里我們先在本地的開發(fā)環(huán)境中上手,來看一下Java+MyBatis+MySQL開發(fā)環(huán)境搭建流程詳
    2016-06-06
  • 詳解Java創(chuàng)建線程的五種常見方式

    詳解Java創(chuàng)建線程的五種常見方式

    Java中如何進(jìn)行多線程編程,如何使用多線程?不要擔(dān)心,本文將為你詳細(xì)介紹一下Java實(shí)現(xiàn)線程創(chuàng)建的五種常見方式,感興趣的可以跟隨小編學(xué)習(xí)一下
    2022-01-01
  • 修改Springboot默認(rèn)序列化工具Jackson配置的實(shí)例代碼

    修改Springboot默認(rèn)序列化工具Jackson配置的實(shí)例代碼

    這篇文章主要介紹了如何修改Springboot默認(rèn)序列化工具Jackson的配置,當(dāng)Spring容器中存在多個(gè)同類型的Bean時(shí),默認(rèn)情況下最后一個(gè)創(chuàng)建的Bean將作為首選Bean,文中通過代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2024-02-02
  • java中JsonObject與JsonArray轉(zhuǎn)換方法實(shí)例

    java中JsonObject與JsonArray轉(zhuǎn)換方法實(shí)例

    在項(xiàng)目日常開發(fā)中常常會(huì)遇到JSONArray和JSONObject的轉(zhuǎn)換,很多公司剛?cè)肼毜男∶刃聲?huì)卡在這里,下面這篇文章主要給大家介紹了關(guān)于java中JsonObject與JsonArray轉(zhuǎn)換方法的相關(guān)資料,需要的朋友可以參考下
    2023-04-04
  • java實(shí)戰(zhàn)技巧之if-else代碼優(yōu)化技巧大全

    java實(shí)戰(zhàn)技巧之if-else代碼優(yōu)化技巧大全

    代碼中如果if-else比較多,閱讀起來比較困難,維護(hù)起來也比較困難,很容易出bug,下面這篇文章主要給大家介紹了關(guān)于java實(shí)戰(zhàn)技巧之if-else代碼優(yōu)化技巧的相關(guān)資料,需要的朋友可以參考下
    2022-02-02
  • Spring MVC文件上傳大小和類型限制以及超大文件上傳bug問題

    Spring MVC文件上傳大小和類型限制以及超大文件上傳bug問題

    這篇文章主要介紹了Spring MVC文件上傳大小和類型限制以及超大文件上傳bug問題,非常具有實(shí)用價(jià)值,需要的朋友可以參考下
    2017-10-10

最新評論