詳解Reactor如何優(yōu)雅Exception異常處理
初識響應(yīng)式編程的時候,除了從命令式的思維方式轉(zhuǎn)變?yōu)楹瘮?shù)式的編程方式外,其中有一個很大的不適應(yīng)的地方就是在面對異常時該怎么處理,尤其是面對檢查異常(Checked Exception)時更是不知所措。在遇到異常時,我們通用的處理方式就是打日志、降級兜底、重試三板斧,本文通過Project Reactor的文檔以及源碼來深入解讀,在reactor中是如何優(yōu)雅地實現(xiàn)這異常處理三板斧。
在介紹怎么使用前,我們先回顧下在用reactor編程的時候,遇到的幾個問題:
- 遇到異常時,如果能處理,我該怎么兜底/降級
- 遇到無法處理的異常時,我該怎么打印日志,并往外拋
- 遇到聲明了檢查異常的方法時,該怎么處理
- 如果調(diào)用失敗了(如請求超時),該如何重試
- 如果出現(xiàn)異常了,流里面的后續(xù)數(shù)據(jù)還會繼續(xù)發(fā)送嗎
異常處理的底層機(jī)制
在回答這些問題,就需要我們首先對reactor處理異常的機(jī)制要有理解。先說結(jié)論,如文檔上說的:
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
即,一旦出現(xiàn)了異常,那原先的數(shù)據(jù)流就會直接結(jié)束了,是沒有辦法再恢復(fù)的。所以如果要降級兜底,那只能再替換一個新的流?;蛘咧卦?,但其實也是相當(dāng)于創(chuàng)建了新流,只是數(shù)據(jù)和原先的一樣。
那為什么原先的流就結(jié)束了呢? 或者說怎么就結(jié)束了呢?
先拋開異常處理的話題,我們回到最基礎(chǔ)的層面,如果要主動結(jié)束一個流,該怎么結(jié)束呢?比如一個網(wǎng)絡(luò)連接,如果出現(xiàn)異常了,流結(jié)束了,該怎么釋放資源呢? 我們知道,每次訂閱調(diào)用的時候,都會返回一個Disposable對象,如Disposable disposable = Flux.just(1,2,3).subscribe()
。所以,如果要主動結(jié)束一個流,其實就是調(diào)用Disposable對象的dispose方法。再深入下去,就會發(fā)現(xiàn),其dispose方法內(nèi)部其實調(diào)用的是由publisher產(chǎn)生的subscription的cancel方法。只有調(diào)用cancel方法,才能完美的結(jié)束publisher并釋放資源。所以,要想結(jié)束一個流,只有調(diào)用subscription的cancel方法。
所以,當(dāng)出現(xiàn)異常時,原先的流會結(jié)束的原因,其實就是調(diào)用了subscription的cancel方法了。那是何時調(diào)用的呢,我們以FluxMap為例,看下源碼。
public void onNext(T t) { if (done) { Operators.onNextDropped(t, actual.currentContext()); return; } R v; try { v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null value."); } catch (Throwable e) { Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s); if (e_ != null) { onError(e_); } else { s.request(1); } return; } actual.onNext(v); }
可以看到,在數(shù)據(jù)處理onNext()的方法內(nèi)部,通常都有類似的try-catch結(jié)構(gòu)。當(dāng)出現(xiàn)異常時,會先對異常進(jìn)行處理,確定是否需要處理異常以及怎么處理,其邏輯都在Operators.onNextError這個方法里。若要處理異常時,則會進(jìn)入onError的流程里。
下面重點來看看Operators.onNextError這個方法,它主要主要包含了兩件事:
- 這個異常要不要吃掉,當(dāng)做非異常處理:具體可以參見Operators.onNextErrorStrategy方法
- 這個異常要不要往下傳,即如果是嚴(yán)重異常時,則直接拋出;否則,對訂閱進(jìn)行cancel。詳見Operators.onOperatorError方法.
減少篇幅起見,這里只貼下Operators.onOperatorError的方法,其實onNextErrorStrategy方法也很重要,它會從context里拿出對指定異常能處理的strategy來進(jìn)行處理,通過傳入的value以及產(chǎn)生的error決定是否要拋出這個異常。
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, @Nullable Object dataSignal, Context context) { Exceptions.throwIfFatal(error); if(subscription != null) { subscription.cancel(); } Throwable t = Exceptions.unwrap(error); BiFunction<? super Throwable, Object, ? extends Throwable> hook = context.getOrDefault(Hooks.KEY_ON_OPERATOR_ERROR, null); if (hook == null) { hook = Hooks.onOperatorErrorHook; } if (hook == null) { if (dataSignal != null) { if (dataSignal != t && dataSignal instanceof Throwable) { t = Exceptions.addSuppressed(t, (Throwable) dataSignal); } //do not wrap original value to avoid strong references /*else { }*/ } return t; } return hook.apply(error, dataSignal); }
從onOperatorError的源碼里可以看到:
- Exceptions.throwIfFatal決定是否直接拋出異常還是進(jìn)入onError流程。比如JVM的異常,例如OutOfMemory之類的,就會直接拋出。
- 接下來,就是我們前面想要的答案,異常時數(shù)據(jù)流會結(jié)束的原因,就是默認(rèn)會調(diào)用subscription.cancel()。
- 最后,是對異常處理的鉤子hook,可以通過傳入的onNext的數(shù)據(jù)dataSingal以及異常error進(jìn)行處理,例如打印日志或者進(jìn)行異常的轉(zhuǎn)化,甚至是吃掉異常。
實操 —— try-catch-finally的平替
在了解了reactor中對異常處理的機(jī)制后,我們看看有哪些操作符可以用來代替以前命令式編程中的try-catch-finally的結(jié)構(gòu)。遇到異常時,我們通用的方式就是打日志、降級兜底、重試三板斧,下面我們具體看看在reactor中是怎么實現(xiàn)的。
Flux.just(1,2,3) .doFinally(signalType -> System.out.println("Finally: [" + signalType + "]" )) .map(t -> { if (t == 2) { throw new IllegalArgumentException("Exception:"+t); } return t; }) .doOnError(e -> System.out.println("log: error happened with [" + e.getMessage() + "]")) .doFinally(signalType -> System.out.println("Finally: [" + signalType + "]" )) .onErrorReturn(42) // .onErrorResume(e -> Flux.just(11,12,13)) .subscribe(d -> System.out.println("data:" + d), e -> System.out.println("ERROR:" + e.getMessage()), () -> System.out.println("Completed!"));
這是一個使用示例,其輸出如下,下面會對這些操作符進(jìn)行介紹。
data:1
Finally: [cancel]
log: error happened with [Exception:2]
data:42
Completed!
Finally: [onError]
1. 降級兜底 - onErrorReturn/onErrorResume
當(dāng)遇到異常的時候,可以使用onErrorReturn來處理,返回一個默認(rèn)值,其底層實現(xiàn)其實用的還是onErrorResume。相比onErrorReturn只能返回一個默認(rèn)值而言,onErrorResume更靈活,它可以根據(jù)不同的error類型,還實現(xiàn)不同的返回值,其fallback函數(shù)的入?yún)⑹钱惓n愋?,返回的則是一個Publisher。所以從返回值也可以看出來,onErrorReturn/onErrorResume返回的是一個新的流,舊的流已經(jīng)在發(fā)生異常的時候就結(jié)束了。
public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Publisher<? extends T>> fallback) { Objects.requireNonNull(predicate, "predicate"); return onErrorResume(e -> predicate.test(e) ? fallback.apply(e) : error(e)); }
這是onErrorResume的實現(xiàn),其提供了特別靈活的處理方式,predicate決定是否要進(jìn)行fallback,fallback下可以根據(jù)異常類型返回任意的數(shù)據(jù)流。假如不需要對異常fallback,即predicate為false時,則直接返回FluxError的封裝,進(jìn)入onError階段。
再補(bǔ)充一點,既然fallback是返回的一個新的流,那么即可以fallback返回一個單值,例如onErrorReturn那樣,也可以返回多個值的數(shù)據(jù)流,例如:
Flux.just(1,2,3) .map(t -> { if (t == 2) { throw new IllegalArgumentException("Exception:"+t); } return t; }) .onErrorResume(e -> Flux.just(11,12,13)) .subscribe(d -> System.out.println("data:" + d), e -> System.out.println("ERROR:" + e.getMessage()), () -> System.out.println("Completed!"));
輸出結(jié)果會是:1 11 12 13
。
其實,對于Flux而言,因為是多值的數(shù)據(jù)流,其很難根據(jù)異常error就進(jìn)行合適的兜底,因為兜底往往取決于輸入,而異常的時候往往丟失了數(shù)據(jù)data的信息了,所以對于Flux的降級,onErrorReturn/onErrorResume實用性不是太強(qiáng),因為返回的新的流很難替代舊的流,甚至你都不知道舊的Flux流有多少數(shù)據(jù)量。
相比之下,onErrorReturn/onErrorResume用于單值的Mono就顯得更為合適了。由于用法相同,這里就不過多贅述了。
所以,對于Flux流的降級兜底是個很困難的事情,有一種方式可以讓onErrorReturn/Resume獲取到當(dāng)前的數(shù)據(jù)data,那就是利用前面小節(jié)中說到的,增加Operators.onOperatorError中的onOperatorErrorHook,把數(shù)據(jù)data塞入異常中再返回給onErrorReturn/Resume。
最后再看下FluxOnErrorResume是怎么實現(xiàn)降級的。
public void onError(Throwable t) { if (!second) { second = true; Publisher<? extends T> p; try { //nextFactory即fallback函數(shù) p = Objects.requireNonNull(nextFactory.apply(t), "The nextFactory returned a null Publisher"); } catch (Throwable e) { Throwable _e = Operators.onOperatorError(e, actual.currentContext()); _e = Exceptions.addSuppressed(_e, t); actual.onError(_e); return; } //重新訂閱一個新的流,其source就是fallback函數(shù)產(chǎn)生的publisher p.subscribe(this); } else { actual.onError(t); } }
當(dāng)上游出現(xiàn)異常時,例如先前示例中的FluxMap,就會進(jìn)入onError階段,此時正好被onErrorResume的onError階段攔截,然后利用fallback函數(shù)產(chǎn)生新的流,再重新訂閱p.subscribe(this)
。另外,也可以看出,新的流只會作用于onErrorResume之后的operator,前面的operator則不會有作用。
2. 打印日志 - doOnError
打印日志就比較簡單了,可以用doOnError方法來實現(xiàn)。doOnError的底層則用的FluxPeek來實現(xiàn),其作用是覆寫了所有的接口,如onNext,onError, cancel等,通過覆寫來實現(xiàn)hook。幾乎所有doOnXXX的方法都是依賴FluxPeek實現(xiàn)的,例如log、doOnNext、doOnError等等。由于與本次的主題無關(guān),不再贅述,感興趣的可以自行翻看FluxPeek的實現(xiàn)。
需要注意的是:雖然doOnXXX主要用于打印日志,但如果doOnXXX內(nèi)部出錯,也會導(dǎo)致整個流結(jié)束,進(jìn)入onError階段。所以,也是有副作用的,仍然在主流程中。
3. finally - doFinally
try-catch-finally中的關(guān)鍵字finally可以通過方法doFinally來平替。需要注意的是doFinally方法的執(zhí)行順序以及觸發(fā)時機(jī)。
通常,finally的含義是保證100%被執(zhí)行,也就是出錯onError的時候執(zhí)行,正常結(jié)束onComplete也執(zhí)行。但在reactor中,除了這兩個事件外,還未能保證doFinally百分之百執(zhí)行,還需要增加cancel的情況。其原因是,當(dāng)出現(xiàn)異常后,對于異常的上游會走cancel流程,下游則走onError流程。如先前的示例,觸發(fā)doFinally的信號分別是:cancel與onError。
最后再說一下執(zhí)行順序,如先前的示例中那樣,doFinally并不是按出現(xiàn)的順序執(zhí)行,也不是一定是在最后執(zhí)行的(這個區(qū)別與finally關(guān)鍵詞差別很大)。其原因在于,當(dāng)出現(xiàn)異常時,會先cancel掉原先的數(shù)據(jù)流,再調(diào)用onError處理(可以參見前面FluxMap的源碼)。
所以,示例中,"Finally: [cancel]"會先被打印,然后才是onErrorReturn的執(zhí)行,即進(jìn)入onError階段。
那為什么第二個doFinally雖然出現(xiàn)在onErrorReturn之前,但又是最后執(zhí)行的呢?這是因為在實現(xiàn)doFinally的時候,先調(diào)用了下游的onError方法,再執(zhí)行自身doFinally的方法,參見FluxDoFinally的實現(xiàn):
public void onError(Throwable t) { try { actual.onError(t); } finally { runFinally(SignalType.ON_ERROR); } }
這樣就能符合try-catch-finally的執(zhí)行順序了。
所以,doFinally出現(xiàn)的位置很重要,若出現(xiàn)在異常前面,就會優(yōu)先執(zhí)行(不會像finally那樣最后執(zhí)行),若出現(xiàn)在異常后面,則會最后執(zhí)行(類似finally)。
4. try-with-resource
對于try-with-resource,reactor也給了替代的實現(xiàn),那就是using操作符:
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager) { return onAssembly(new FluxUsing<>(resourceSupplier, sourceSupplier, resourceCleanup, eager)); }
其中resourceSupplier是創(chuàng)建生成資源的函數(shù),sourceSupplier則是針對生成的resource進(jìn)行操作并產(chǎn)生數(shù)據(jù)流,resourceCleanup則是在結(jié)束后(不管成功還是失敗)進(jìn)行資源的釋放。
以try-with-resource為例:
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) { return disposableInstance.toString(); }
利用using函數(shù),則可以寫成:
Flux.using( () -> new SomeAutoCloseable(), disposableInstance -> Flux.just(disposableInstance.toString()), AutoCloseable::close );
5. 重試 - retry / retryWhen
除了以上方式處理異常時,還有一種常見的方式就是重試。比如,我們調(diào)用某個接口超時時,通常會重試一次,這個時候可以使用retry方法,如:
Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .retry(1) .subscribe(System.out::println, System.err::println);
會對Flux流執(zhí)行兩次,其結(jié)果是:0 1 2 0 1 2
,即當(dāng)遇到data為3時,會重試一次。
其基本思想很簡單,就是攔截onError流程,計算重試的次數(shù),如果重試未超過,則重新訂閱:
public void onError(Throwable t) { long r = remaining; if (r != Long.MAX_VALUE) { if (r == 0) { actual.onError(t); return; } remaining = r - 1; } resubscribe(); }
這里的remaining
就是可以重試的次數(shù),直到重試為0,再一次進(jìn)入actual.onError。重新訂閱的方法也很簡單,就是把上游的source
與下游的actual
,再來一次subscribe:source.subscribe(actual)
。
除了retry
外,還有一個高級版本retryWhen
,它除了能像retry那樣重試固定的次數(shù)外,還能支持各種重試策略,由于retryWhen的源碼相對復(fù)雜,這里不再敘述(畢竟本文不是源碼解讀),但除了重試策略有區(qū)別外,其重試的機(jī)制還是一樣的,把上游與下游重新訂閱。
6. 檢查異常處理
在java中有一類異常是需要顯示進(jìn)行處理的,那就是檢查異常(Checked Exception),如IOException。在命令式編程中,可以通過throws關(guān)鍵字來聲明,從而可以把異常往外拋,而不需要立即處理。然而,遺憾的是,在reactor中,并沒有類似的平替,不管任何情況,當(dāng)遇到檢查異常,reactor中都需要用try-catch來處理,這是唯一一個在reactor中沒有找到命令式編程中的平替。與命令式編程有throws關(guān)鍵字聲明不同,reactor中處理檢查異常都必須用try-catch來處理,處理的方式有以下三種:
- 捕獲到異常并從中恢復(fù)。序列繼續(xù)正常的進(jìn)行。
- 捕獲異常,將其封裝成一個 不檢查 的異常,然后將其拋出(中斷序列)。
- 如果你需要返回一個 Flux(例如,在 flatMap 中),那么就用一個產(chǎn)生錯誤的 Flux 來封裝異常,如下所示:return Flux.error(checkedException)。(這個序列也會終止。)
這三種方式中,其中最常見也最常用的方式就是第二種,將檢查異常轉(zhuǎn)化為非檢查異常,如throw new RuntimeException(e)
。但是reactor提供了輔助工具類Exceptions,進(jìn)而可以相對優(yōu)雅簡潔的進(jìn)行統(tǒng)一處理。
如以下這個例子(來自reactor的文檔):
public String convert(int i) throws IOException { if (i > 3) { throw new IOException("boom " + i); } return "OK " + i; } Flux<String> converted = Flux .range(1, 10) .map(i -> { try { return convert(i); } catch (IOException e) { throw Exceptions.propagate(e); } }); converted.subscribe( v -> System.out.println("RECEIVED: " + v), e -> { if (Exceptions.unwrap(e) instanceof IOException) { System.out.println("Something bad happened with I/O"); } else { System.out.println("Something bad happened"); } } );
由于convert聲明了檢查異常IOException,所以必須要try-catch住,再利用Exceptions.propagate來封裝為非檢查異常。相比于直接用throw new RuntimeException(e)
,利用Exceptions的好處在onError處理階段可以用Exceptions.unwrap()方法來獲取內(nèi)部真實拋出的異常,體現(xiàn)了利用工具類的好處——簡潔明了。
總結(jié)
本文先從reactor異常處理的底層機(jī)制講起,講清楚了一個基本概念:只要出現(xiàn)異常,不管如何處理,舊的流都已經(jīng)結(jié)束,接下來處理的都是新的流。在這基礎(chǔ)上,按命令式編程中的try-catch-finally的方式,用reactor的方式進(jìn)行了一一替代介紹,希望通過對比的方式,能更好的掌握在reactor中如何優(yōu)雅的處理異常。
到此這篇關(guān)于詳解Reactor如何優(yōu)雅Exception異常處理的文章就介紹到這了,更多相關(guān)Reactor Exception異常處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java詳細(xì)分析String類與StringBuffer和StringBuilder的使用方法
當(dāng)對字符串進(jìn)行修改的時候,需要使用 StringBuffer 和 StringBuilder類,和String類不同的是,StringBuffer和 StringBuilder類的對象能夠被多次的修改,并且不產(chǎn)生新的未使用對象2022-04-04Netty啟動流程服務(wù)端channel初始化源碼分析
這篇文章主要為大家介紹了Netty啟動流程服務(wù)端channel初始化源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03Java實戰(zhàn)項目練習(xí)之球館在線預(yù)約系統(tǒng)的實現(xiàn)
理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SpringBoot+maven+freemark+Mysql實現(xiàn)一個球館在線預(yù)約系統(tǒng),大家可以在過程中查缺補(bǔ)漏,提升水平2022-01-01