欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Reactor如何優(yōu)雅Exception異常處理

 更新時間:2023年02月15日 11:51:15   作者:臨虹路365號  
初識響應(yīng)式編程的時候,除了從命令式的思維方式轉(zhuǎn)變?yōu)楹瘮?shù)式的編程方式外,其中有一個很大的不適應(yīng)的地方就是在面對異常時該怎么處理。本文將通過Project?Reactor的文檔以及源碼來深入解讀,在reactor中是如何優(yōu)雅地實現(xiàn)這異常處理三板斧,希望對大家有所幫助

初識響應(yīng)式編程的時候,除了從命令式的思維方式轉(zhuǎn)變?yōu)楹瘮?shù)式的編程方式外,其中有一個很大的不適應(yīng)的地方就是在面對異常時該怎么處理,尤其是面對檢查異常(Checked Exception)時更是不知所措。在遇到異常時,我們通用的處理方式就是打日志、降級兜底、重試三板斧,本文通過Project Reactor的文檔以及源碼來深入解讀,在reactor中是如何優(yōu)雅地實現(xiàn)這異常處理三板斧。

在介紹怎么使用前,我們先回顧下在用reactor編程的時候,遇到的幾個問題:

  • 遇到異常時,如果能處理,我該怎么兜底/降級
  • 遇到無法處理的異常時,我該怎么打印日志,并往外拋
  • 遇到聲明了檢查異常的方法時,該怎么處理
  • 如果調(diào)用失敗了(如請求超時),該如何重試
  • 如果出現(xiàn)異常了,流里面的后續(xù)數(shù)據(jù)還會繼續(xù)發(fā)送嗎

異常處理的底層機(jī)制

在回答這些問題,就需要我們首先對reactor處理異常的機(jī)制要有理解。先說結(jié)論,如文檔上說的:

Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.

即,一旦出現(xiàn)了異常,那原先的數(shù)據(jù)流就會直接結(jié)束了,是沒有辦法再恢復(fù)的。所以如果要降級兜底,那只能再替換一個新的流?;蛘咧卦?,但其實也是相當(dāng)于創(chuàng)建了新流,只是數(shù)據(jù)和原先的一樣。

那為什么原先的流就結(jié)束了呢? 或者說怎么就結(jié)束了呢?

先拋開異常處理的話題,我們回到最基礎(chǔ)的層面,如果要主動結(jié)束一個流,該怎么結(jié)束呢?比如一個網(wǎng)絡(luò)連接,如果出現(xiàn)異常了,流結(jié)束了,該怎么釋放資源呢? 我們知道,每次訂閱調(diào)用的時候,都會返回一個Disposable對象,如Disposable disposable = Flux.just(1,2,3).subscribe()。所以,如果要主動結(jié)束一個流,其實就是調(diào)用Disposable對象的dispose方法。再深入下去,就會發(fā)現(xiàn),其dispose方法內(nèi)部其實調(diào)用的是由publisher產(chǎn)生的subscription的cancel方法。只有調(diào)用cancel方法,才能完美的結(jié)束publisher并釋放資源。所以,要想結(jié)束一個流,只有調(diào)用subscription的cancel方法。

所以,當(dāng)出現(xiàn)異常時,原先的流會結(jié)束的原因,其實就是調(diào)用了subscription的cancel方法了。那是何時調(diào)用的呢,我們以FluxMap為例,看下源碼。

public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }
    R v;
    try {
        v = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null value.");
    }
    catch (Throwable e) {
        Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
        if (e_ != null) {
            onError(e_);
        }
        else {
            s.request(1);
        }
        return;
    }
    actual.onNext(v);
}

可以看到,在數(shù)據(jù)處理onNext()的方法內(nèi)部,通常都有類似的try-catch結(jié)構(gòu)。當(dāng)出現(xiàn)異常時,會先對異常進(jìn)行處理,確定是否需要處理異常以及怎么處理,其邏輯都在Operators.onNextError這個方法里。若要處理異常時,則會進(jìn)入onError的流程里。
下面重點來看看Operators.onNextError這個方法,它主要主要包含了兩件事:

  • 這個異常要不要吃掉,當(dāng)做非異常處理:具體可以參見Operators.onNextErrorStrategy方法
  • 這個異常要不要往下傳,即如果是嚴(yán)重異常時,則直接拋出;否則,對訂閱進(jìn)行cancel。詳見Operators.onOperatorError方法.

減少篇幅起見,這里只貼下Operators.onOperatorError的方法,其實onNextErrorStrategy方法也很重要,它會從context里拿出對指定異常能處理的strategy來進(jìn)行處理,通過傳入的value以及產(chǎn)生的error決定是否要拋出這個異常。

public static Throwable onOperatorError(@Nullable Subscription subscription,
        Throwable error,
        @Nullable Object dataSignal, Context context) {

    Exceptions.throwIfFatal(error);
    if(subscription != null) {
        subscription.cancel();
    }

    Throwable t = Exceptions.unwrap(error);
    BiFunction<? super Throwable, Object, ? extends Throwable> hook =
            context.getOrDefault(Hooks.KEY_ON_OPERATOR_ERROR, null);
    if (hook == null) {
        hook = Hooks.onOperatorErrorHook;
    }
    if (hook == null) {
        if (dataSignal != null) {
            if (dataSignal != t && dataSignal instanceof Throwable) {
                t = Exceptions.addSuppressed(t, (Throwable) dataSignal);
            }
            //do not wrap original value to avoid strong references
            /*else {
            }*/
        }
        return t;
    }
    return hook.apply(error, dataSignal);
}

從onOperatorError的源碼里可以看到:

  • Exceptions.throwIfFatal決定是否直接拋出異常還是進(jìn)入onError流程。比如JVM的異常,例如OutOfMemory之類的,就會直接拋出。
  • 接下來,就是我們前面想要的答案,異常時數(shù)據(jù)流會結(jié)束的原因,就是默認(rèn)會調(diào)用subscription.cancel()。
  • 最后,是對異常處理的鉤子hook,可以通過傳入的onNext的數(shù)據(jù)dataSingal以及異常error進(jìn)行處理,例如打印日志或者進(jìn)行異常的轉(zhuǎn)化,甚至是吃掉異常。

實操 —— try-catch-finally的平替

在了解了reactor中對異常處理的機(jī)制后,我們看看有哪些操作符可以用來代替以前命令式編程中的try-catch-finally的結(jié)構(gòu)。遇到異常時,我們通用的方式就是打日志、降級兜底、重試三板斧,下面我們具體看看在reactor中是怎么實現(xiàn)的。

Flux.just(1,2,3)
    .doFinally(signalType -> System.out.println("Finally: [" + signalType + "]" ))
    .map(t -> {
        if (t == 2) {
          throw new IllegalArgumentException("Exception:"+t);
        }
        return t;
    })
    .doOnError(e -> System.out.println("log: error happened with [" + e.getMessage() + "]"))
    .doFinally(signalType -> System.out.println("Finally: [" + signalType + "]" ))
    .onErrorReturn(42)
//        .onErrorResume(e -> Flux.just(11,12,13))
    .subscribe(d -> System.out.println("data:" + d), e -> System.out.println("ERROR:" + e.getMessage()), () -> System.out.println("Completed!"));

這是一個使用示例,其輸出如下,下面會對這些操作符進(jìn)行介紹。

data:1
Finally: [cancel]
log: error happened with [Exception:2]
data:42
Completed!
Finally: [onError]

1. 降級兜底 - onErrorReturn/onErrorResume

當(dāng)遇到異常的時候,可以使用onErrorReturn來處理,返回一個默認(rèn)值,其底層實現(xiàn)其實用的還是onErrorResume。相比onErrorReturn只能返回一個默認(rèn)值而言,onErrorResume更靈活,它可以根據(jù)不同的error類型,還實現(xiàn)不同的返回值,其fallback函數(shù)的入?yún)⑹钱惓n愋?,返回的則是一個Publisher。所以從返回值也可以看出來,onErrorReturn/onErrorResume返回的是一個新的流,舊的流已經(jīng)在發(fā)生異常的時候就結(jié)束了。

public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate,
        Function<? super Throwable, ? extends Publisher<? extends T>> fallback) {
    Objects.requireNonNull(predicate, "predicate");
    return onErrorResume(e -> predicate.test(e) ? fallback.apply(e) : error(e));
}

這是onErrorResume的實現(xiàn),其提供了特別靈活的處理方式,predicate決定是否要進(jìn)行fallback,fallback下可以根據(jù)異常類型返回任意的數(shù)據(jù)流。假如不需要對異常fallback,即predicate為false時,則直接返回FluxError的封裝,進(jìn)入onError階段。

再補(bǔ)充一點,既然fallback是返回的一個新的流,那么即可以fallback返回一個單值,例如onErrorReturn那樣,也可以返回多個值的數(shù)據(jù)流,例如:

Flux.just(1,2,3)
    .map(t -> {
        if (t == 2) {
          throw new IllegalArgumentException("Exception:"+t);
        }
        return t;
    })
   .onErrorResume(e -> Flux.just(11,12,13))
   .subscribe(d -> System.out.println("data:" + d), e -> System.out.println("ERROR:" + e.getMessage()), () -> System.out.println("Completed!"));

輸出結(jié)果會是:1 11 12 13。

其實,對于Flux而言,因為是多值的數(shù)據(jù)流,其很難根據(jù)異常error就進(jìn)行合適的兜底,因為兜底往往取決于輸入,而異常的時候往往丟失了數(shù)據(jù)data的信息了,所以對于Flux的降級,onErrorReturn/onErrorResume實用性不是太強(qiáng),因為返回的新的流很難替代舊的流,甚至你都不知道舊的Flux流有多少數(shù)據(jù)量。

相比之下,onErrorReturn/onErrorResume用于單值的Mono就顯得更為合適了。由于用法相同,這里就不過多贅述了。

所以,對于Flux流的降級兜底是個很困難的事情,有一種方式可以讓onErrorReturn/Resume獲取到當(dāng)前的數(shù)據(jù)data,那就是利用前面小節(jié)中說到的,增加Operators.onOperatorError中的onOperatorErrorHook,把數(shù)據(jù)data塞入異常中再返回給onErrorReturn/Resume。

最后再看下FluxOnErrorResume是怎么實現(xiàn)降級的。

public void onError(Throwable t) {
    if (!second) {
        second = true;

        Publisher<? extends T> p;

        try {
            //nextFactory即fallback函數(shù)
            p = Objects.requireNonNull(nextFactory.apply(t),
            "The nextFactory returned a null Publisher");
        }
        catch (Throwable e) {
            Throwable _e = Operators.onOperatorError(e, actual.currentContext());
            _e = Exceptions.addSuppressed(_e, t);
            actual.onError(_e);
            return;
        }
        //重新訂閱一個新的流,其source就是fallback函數(shù)產(chǎn)生的publisher
        p.subscribe(this);
    }
    else {
        actual.onError(t);
    }
}

當(dāng)上游出現(xiàn)異常時,例如先前示例中的FluxMap,就會進(jìn)入onError階段,此時正好被onErrorResume的onError階段攔截,然后利用fallback函數(shù)產(chǎn)生新的流,再重新訂閱p.subscribe(this)。另外,也可以看出,新的流只會作用于onErrorResume之后的operator,前面的operator則不會有作用。

2. 打印日志 - doOnError

打印日志就比較簡單了,可以用doOnError方法來實現(xiàn)。doOnError的底層則用的FluxPeek來實現(xiàn),其作用是覆寫了所有的接口,如onNext,onError, cancel等,通過覆寫來實現(xiàn)hook。幾乎所有doOnXXX的方法都是依賴FluxPeek實現(xiàn)的,例如log、doOnNext、doOnError等等。由于與本次的主題無關(guān),不再贅述,感興趣的可以自行翻看FluxPeek的實現(xiàn)。

需要注意的是:雖然doOnXXX主要用于打印日志,但如果doOnXXX內(nèi)部出錯,也會導(dǎo)致整個流結(jié)束,進(jìn)入onError階段。所以,也是有副作用的,仍然在主流程中。

3. finally - doFinally

try-catch-finally中的關(guān)鍵字finally可以通過方法doFinally來平替。需要注意的是doFinally方法的執(zhí)行順序以及觸發(fā)時機(jī)。

通常,finally的含義是保證100%被執(zhí)行,也就是出錯onError的時候執(zhí)行,正常結(jié)束onComplete也執(zhí)行。但在reactor中,除了這兩個事件外,還未能保證doFinally百分之百執(zhí)行,還需要增加cancel的情況。其原因是,當(dāng)出現(xiàn)異常后,對于異常的上游會走cancel流程,下游則走onError流程。如先前的示例,觸發(fā)doFinally的信號分別是:cancel與onError。

最后再說一下執(zhí)行順序,如先前的示例中那樣,doFinally并不是按出現(xiàn)的順序執(zhí)行,也不是一定是在最后執(zhí)行的(這個區(qū)別與finally關(guān)鍵詞差別很大)。其原因在于,當(dāng)出現(xiàn)異常時,會先cancel掉原先的數(shù)據(jù)流,再調(diào)用onError處理(可以參見前面FluxMap的源碼)。

所以,示例中,"Finally: [cancel]"會先被打印,然后才是onErrorReturn的執(zhí)行,即進(jìn)入onError階段。

那為什么第二個doFinally雖然出現(xiàn)在onErrorReturn之前,但又是最后執(zhí)行的呢?這是因為在實現(xiàn)doFinally的時候,先調(diào)用了下游的onError方法,再執(zhí)行自身doFinally的方法,參見FluxDoFinally的實現(xiàn):

public void onError(Throwable t) {
    try {
        actual.onError(t);
    }
    finally {
        runFinally(SignalType.ON_ERROR);
    }
}

這樣就能符合try-catch-finally的執(zhí)行順序了。

所以,doFinally出現(xiàn)的位置很重要,若出現(xiàn)在異常前面,就會優(yōu)先執(zhí)行(不會像finally那樣最后執(zhí)行),若出現(xiàn)在異常后面,則會最后執(zhí)行(類似finally)。

4. try-with-resource

對于try-with-resource,reactor也給了替代的實現(xiàn),那就是using操作符:

public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends
        Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager) {
    return onAssembly(new FluxUsing<>(resourceSupplier,
            sourceSupplier,
            resourceCleanup,
            eager));
}

其中resourceSupplier是創(chuàng)建生成資源的函數(shù),sourceSupplier則是針對生成的resource進(jìn)行操作并產(chǎn)生數(shù)據(jù)流,resourceCleanup則是在結(jié)束后(不管成功還是失敗)進(jìn)行資源的釋放。

以try-with-resource為例:

try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}

利用using函數(shù),則可以寫成:

Flux.using(
        () -> new SomeAutoCloseable(), 
        disposableInstance -> Flux.just(disposableInstance.toString()), 
        AutoCloseable::close 
);

5. 重試 - retry / retryWhen

除了以上方式處理異常時,還有一種常見的方式就是重試。比如,我們調(diào)用某個接口超時時,通常會重試一次,這個時候可以使用retry方法,如:

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .subscribe(System.out::println, System.err::println); 

會對Flux流執(zhí)行兩次,其結(jié)果是:0 1 2 0 1 2,即當(dāng)遇到data為3時,會重試一次。

其基本思想很簡單,就是攔截onError流程,計算重試的次數(shù),如果重試未超過,則重新訂閱:

public void onError(Throwable t) {
    long r = remaining;
    if (r != Long.MAX_VALUE) {
        if (r == 0) {
            actual.onError(t);
            return;
        }
        remaining = r - 1;
    }
    resubscribe();
}

這里的remaining就是可以重試的次數(shù),直到重試為0,再一次進(jìn)入actual.onError。重新訂閱的方法也很簡單,就是把上游的source與下游的actual,再來一次subscribe:source.subscribe(actual)。

除了retry外,還有一個高級版本retryWhen,它除了能像retry那樣重試固定的次數(shù)外,還能支持各種重試策略,由于retryWhen的源碼相對復(fù)雜,這里不再敘述(畢竟本文不是源碼解讀),但除了重試策略有區(qū)別外,其重試的機(jī)制還是一樣的,把上游與下游重新訂閱。

6. 檢查異常處理

在java中有一類異常是需要顯示進(jìn)行處理的,那就是檢查異常(Checked Exception),如IOException。在命令式編程中,可以通過throws關(guān)鍵字來聲明,從而可以把異常往外拋,而不需要立即處理。然而,遺憾的是,在reactor中,并沒有類似的平替,不管任何情況,當(dāng)遇到檢查異常,reactor中都需要用try-catch來處理,這是唯一一個在reactor中沒有找到命令式編程中的平替。與命令式編程有throws關(guān)鍵字聲明不同,reactor中處理檢查異常都必須用try-catch來處理,處理的方式有以下三種:

  • 捕獲到異常并從中恢復(fù)。序列繼續(xù)正常的進(jìn)行。
  • 捕獲異常,將其封裝成一個 不檢查 的異常,然后將其拋出(中斷序列)。
  • 如果你需要返回一個 Flux(例如,在 flatMap 中),那么就用一個產(chǎn)生錯誤的 Flux 來封裝異常,如下所示:return Flux.error(checkedException)。(這個序列也會終止。)

這三種方式中,其中最常見也最常用的方式就是第二種,將檢查異常轉(zhuǎn)化為非檢查異常,如throw new RuntimeException(e)。但是reactor提供了輔助工具類Exceptions,進(jìn)而可以相對優(yōu)雅簡潔的進(jìn)行統(tǒng)一處理。

如以下這個例子(來自reactor的文檔):

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}
Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });
converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

由于convert聲明了檢查異常IOException,所以必須要try-catch住,再利用Exceptions.propagate來封裝為非檢查異常。相比于直接用throw new RuntimeException(e),利用Exceptions的好處在onError處理階段可以用Exceptions.unwrap()方法來獲取內(nèi)部真實拋出的異常,體現(xiàn)了利用工具類的好處——簡潔明了。

總結(jié)

本文先從reactor異常處理的底層機(jī)制講起,講清楚了一個基本概念:只要出現(xiàn)異常,不管如何處理,舊的流都已經(jīng)結(jié)束,接下來處理的都是新的流。在這基礎(chǔ)上,按命令式編程中的try-catch-finally的方式,用reactor的方式進(jìn)行了一一替代介紹,希望通過對比的方式,能更好的掌握在reactor中如何優(yōu)雅的處理異常。

到此這篇關(guān)于詳解Reactor如何優(yōu)雅Exception異常處理的文章就介紹到這了,更多相關(guān)Reactor Exception異常處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java詳細(xì)分析String類與StringBuffer和StringBuilder的使用方法

    Java詳細(xì)分析String類與StringBuffer和StringBuilder的使用方法

    當(dāng)對字符串進(jìn)行修改的時候,需要使用 StringBuffer 和 StringBuilder類,和String類不同的是,StringBuffer和 StringBuilder類的對象能夠被多次的修改,并且不產(chǎn)生新的未使用對象
    2022-04-04
  • 利用logback filter過濾某個類 屏蔽某個類

    利用logback filter過濾某個類 屏蔽某個類

    這篇文章主要介紹了利用logback filter過濾某個類 屏蔽某個類的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • springboot項目部署到k8s上的方法步驟

    springboot項目部署到k8s上的方法步驟

    本文主要介紹了springboot項目部署到k8s上的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-05-05
  • 通過實例解析Java類初始化和實例初始化

    通過實例解析Java類初始化和實例初始化

    這篇文章主要介紹了通過實例解析Java類初始化和實例初始化,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-11-11
  • java 內(nèi)部類的實例詳解

    java 內(nèi)部類的實例詳解

    這篇文章主要介紹了java 內(nèi)部類的實例詳解的相關(guān)資料,希望通過本文大家能夠理解掌握java內(nèi)部類的使用,需要的朋友可以參考下
    2017-09-09
  • springboot接入微信app支付的方法

    springboot接入微信app支付的方法

    本文使用springboot集成微信支付服務(wù),包含微信統(tǒng)一支付訂單接口,以及支付回調(diào)接口等,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-05-05
  • Netty啟動流程服務(wù)端channel初始化源碼分析

    Netty啟動流程服務(wù)端channel初始化源碼分析

    這篇文章主要為大家介紹了Netty啟動流程服務(wù)端channel初始化源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-03-03
  • Hibernate核心類和接口的詳細(xì)介紹

    Hibernate核心類和接口的詳細(xì)介紹

    今天小編就為大家分享一篇關(guān)于Hibernate核心類和接口的詳細(xì)介紹,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-03-03
  • Java實戰(zhàn)項目練習(xí)之球館在線預(yù)約系統(tǒng)的實現(xiàn)

    Java實戰(zhàn)項目練習(xí)之球館在線預(yù)約系統(tǒng)的實現(xiàn)

    理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SpringBoot+maven+freemark+Mysql實現(xiàn)一個球館在線預(yù)約系統(tǒng),大家可以在過程中查缺補(bǔ)漏,提升水平
    2022-01-01
  • Java防止文件被篡改之文件校驗功能的實例代碼

    Java防止文件被篡改之文件校驗功能的實例代碼

    這篇文章主要介紹了Java防止文件被篡改之文件校驗功能,本文給大家分享了文件校驗和原理及具體實現(xiàn)思路,需要的朋友可以參考下
    2018-11-11

最新評論