詳解Reactor如何優(yōu)雅Exception異常處理
初識(shí)響應(yīng)式編程的時(shí)候,除了從命令式的思維方式轉(zhuǎn)變?yōu)楹瘮?shù)式的編程方式外,其中有一個(gè)很大的不適應(yīng)的地方就是在面對(duì)異常時(shí)該怎么處理,尤其是面對(duì)檢查異常(Checked Exception)時(shí)更是不知所措。在遇到異常時(shí),我們通用的處理方式就是打日志、降級(jí)兜底、重試三板斧,本文通過(guò)Project Reactor的文檔以及源碼來(lái)深入解讀,在reactor中是如何優(yōu)雅地實(shí)現(xiàn)這異常處理三板斧。
在介紹怎么使用前,我們先回顧下在用reactor編程的時(shí)候,遇到的幾個(gè)問(wèn)題:
- 遇到異常時(shí),如果能處理,我該怎么兜底/降級(jí)
- 遇到無(wú)法處理的異常時(shí),我該怎么打印日志,并往外拋
- 遇到聲明了檢查異常的方法時(shí),該怎么處理
- 如果調(diào)用失敗了(如請(qǐng)求超時(shí)),該如何重試
- 如果出現(xiàn)異常了,流里面的后續(xù)數(shù)據(jù)還會(huì)繼續(xù)發(fā)送嗎
異常處理的底層機(jī)制
在回答這些問(wèn)題,就需要我們首先對(duì)reactor處理異常的機(jī)制要有理解。先說(shuō)結(jié)論,如文檔上說(shuō)的:
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
即,一旦出現(xiàn)了異常,那原先的數(shù)據(jù)流就會(huì)直接結(jié)束了,是沒(méi)有辦法再恢復(fù)的。所以如果要降級(jí)兜底,那只能再替換一個(gè)新的流?;蛘咧卦嚕鋵?shí)也是相當(dāng)于創(chuàng)建了新流,只是數(shù)據(jù)和原先的一樣。
那為什么原先的流就結(jié)束了呢? 或者說(shuō)怎么就結(jié)束了呢?
先拋開(kāi)異常處理的話題,我們回到最基礎(chǔ)的層面,如果要主動(dòng)結(jié)束一個(gè)流,該怎么結(jié)束呢?比如一個(gè)網(wǎng)絡(luò)連接,如果出現(xiàn)異常了,流結(jié)束了,該怎么釋放資源呢? 我們知道,每次訂閱調(diào)用的時(shí)候,都會(huì)返回一個(gè)Disposable對(duì)象,如Disposable disposable = Flux.just(1,2,3).subscribe()
。所以,如果要主動(dòng)結(jié)束一個(gè)流,其實(shí)就是調(diào)用Disposable對(duì)象的dispose方法。再深入下去,就會(huì)發(fā)現(xiàn),其dispose方法內(nèi)部其實(shí)調(diào)用的是由publisher產(chǎn)生的subscription的cancel方法。只有調(diào)用cancel方法,才能完美的結(jié)束publisher并釋放資源。所以,要想結(jié)束一個(gè)流,只有調(diào)用subscription的cancel方法。
所以,當(dāng)出現(xiàn)異常時(shí),原先的流會(huì)結(jié)束的原因,其實(shí)就是調(diào)用了subscription的cancel方法了。那是何時(shí)調(diào)用的呢,我們以FluxMap為例,看下源碼。
public void onNext(T t) { if (done) { Operators.onNextDropped(t, actual.currentContext()); return; } R v; try { v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null value."); } catch (Throwable e) { Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s); if (e_ != null) { onError(e_); } else { s.request(1); } return; } actual.onNext(v); }
可以看到,在數(shù)據(jù)處理onNext()的方法內(nèi)部,通常都有類似的try-catch結(jié)構(gòu)。當(dāng)出現(xiàn)異常時(shí),會(huì)先對(duì)異常進(jìn)行處理,確定是否需要處理異常以及怎么處理,其邏輯都在Operators.onNextError這個(gè)方法里。若要處理異常時(shí),則會(huì)進(jìn)入onError的流程里。
下面重點(diǎn)來(lái)看看Operators.onNextError這個(gè)方法,它主要主要包含了兩件事:
- 這個(gè)異常要不要吃掉,當(dāng)做非異常處理:具體可以參見(jiàn)Operators.onNextErrorStrategy方法
- 這個(gè)異常要不要往下傳,即如果是嚴(yán)重異常時(shí),則直接拋出;否則,對(duì)訂閱進(jìn)行cancel。詳見(jiàn)Operators.onOperatorError方法.
減少篇幅起見(jiàn),這里只貼下Operators.onOperatorError的方法,其實(shí)onNextErrorStrategy方法也很重要,它會(huì)從context里拿出對(duì)指定異常能處理的strategy來(lái)進(jìn)行處理,通過(guò)傳入的value以及產(chǎn)生的error決定是否要拋出這個(gè)異常。
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, @Nullable Object dataSignal, Context context) { Exceptions.throwIfFatal(error); if(subscription != null) { subscription.cancel(); } Throwable t = Exceptions.unwrap(error); BiFunction<? super Throwable, Object, ? extends Throwable> hook = context.getOrDefault(Hooks.KEY_ON_OPERATOR_ERROR, null); if (hook == null) { hook = Hooks.onOperatorErrorHook; } if (hook == null) { if (dataSignal != null) { if (dataSignal != t && dataSignal instanceof Throwable) { t = Exceptions.addSuppressed(t, (Throwable) dataSignal); } //do not wrap original value to avoid strong references /*else { }*/ } return t; } return hook.apply(error, dataSignal); }
從onOperatorError的源碼里可以看到:
- Exceptions.throwIfFatal決定是否直接拋出異常還是進(jìn)入onError流程。比如JVM的異常,例如OutOfMemory之類的,就會(huì)直接拋出。
- 接下來(lái),就是我們前面想要的答案,異常時(shí)數(shù)據(jù)流會(huì)結(jié)束的原因,就是默認(rèn)會(huì)調(diào)用subscription.cancel()。
- 最后,是對(duì)異常處理的鉤子hook,可以通過(guò)傳入的onNext的數(shù)據(jù)dataSingal以及異常error進(jìn)行處理,例如打印日志或者進(jìn)行異常的轉(zhuǎn)化,甚至是吃掉異常。
實(shí)操 —— try-catch-finally的平替
在了解了reactor中對(duì)異常處理的機(jī)制后,我們看看有哪些操作符可以用來(lái)代替以前命令式編程中的try-catch-finally的結(jié)構(gòu)。遇到異常時(shí),我們通用的方式就是打日志、降級(jí)兜底、重試三板斧,下面我們具體看看在reactor中是怎么實(shí)現(xiàn)的。
Flux.just(1,2,3) .doFinally(signalType -> System.out.println("Finally: [" + signalType + "]" )) .map(t -> { if (t == 2) { throw new IllegalArgumentException("Exception:"+t); } return t; }) .doOnError(e -> System.out.println("log: error happened with [" + e.getMessage() + "]")) .doFinally(signalType -> System.out.println("Finally: [" + signalType + "]" )) .onErrorReturn(42) // .onErrorResume(e -> Flux.just(11,12,13)) .subscribe(d -> System.out.println("data:" + d), e -> System.out.println("ERROR:" + e.getMessage()), () -> System.out.println("Completed!"));
這是一個(gè)使用示例,其輸出如下,下面會(huì)對(duì)這些操作符進(jìn)行介紹。
data:1
Finally: [cancel]
log: error happened with [Exception:2]
data:42
Completed!
Finally: [onError]
1. 降級(jí)兜底 - onErrorReturn/onErrorResume
當(dāng)遇到異常的時(shí)候,可以使用onErrorReturn來(lái)處理,返回一個(gè)默認(rèn)值,其底層實(shí)現(xiàn)其實(shí)用的還是onErrorResume。相比onErrorReturn只能返回一個(gè)默認(rèn)值而言,onErrorResume更靈活,它可以根據(jù)不同的error類型,還實(shí)現(xiàn)不同的返回值,其fallback函數(shù)的入?yún)⑹钱惓n愋停祷氐膭t是一個(gè)Publisher。所以從返回值也可以看出來(lái),onErrorReturn/onErrorResume返回的是一個(gè)新的流,舊的流已經(jīng)在發(fā)生異常的時(shí)候就結(jié)束了。
public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Publisher<? extends T>> fallback) { Objects.requireNonNull(predicate, "predicate"); return onErrorResume(e -> predicate.test(e) ? fallback.apply(e) : error(e)); }
這是onErrorResume的實(shí)現(xiàn),其提供了特別靈活的處理方式,predicate決定是否要進(jìn)行fallback,fallback下可以根據(jù)異常類型返回任意的數(shù)據(jù)流。假如不需要對(duì)異常fallback,即predicate為false時(shí),則直接返回FluxError的封裝,進(jìn)入onError階段。
再補(bǔ)充一點(diǎn),既然fallback是返回的一個(gè)新的流,那么即可以fallback返回一個(gè)單值,例如onErrorReturn那樣,也可以返回多個(gè)值的數(shù)據(jù)流,例如:
Flux.just(1,2,3) .map(t -> { if (t == 2) { throw new IllegalArgumentException("Exception:"+t); } return t; }) .onErrorResume(e -> Flux.just(11,12,13)) .subscribe(d -> System.out.println("data:" + d), e -> System.out.println("ERROR:" + e.getMessage()), () -> System.out.println("Completed!"));
輸出結(jié)果會(huì)是:1 11 12 13
。
其實(shí),對(duì)于Flux而言,因?yàn)槭嵌嘀档臄?shù)據(jù)流,其很難根據(jù)異常error就進(jìn)行合適的兜底,因?yàn)槎档淄Q于輸入,而異常的時(shí)候往往丟失了數(shù)據(jù)data的信息了,所以對(duì)于Flux的降級(jí),onErrorReturn/onErrorResume實(shí)用性不是太強(qiáng),因?yàn)榉祷氐男碌牧骱茈y替代舊的流,甚至你都不知道舊的Flux流有多少數(shù)據(jù)量。
相比之下,onErrorReturn/onErrorResume用于單值的Mono就顯得更為合適了。由于用法相同,這里就不過(guò)多贅述了。
所以,對(duì)于Flux流的降級(jí)兜底是個(gè)很困難的事情,有一種方式可以讓onErrorReturn/Resume獲取到當(dāng)前的數(shù)據(jù)data,那就是利用前面小節(jié)中說(shuō)到的,增加Operators.onOperatorError中的onOperatorErrorHook,把數(shù)據(jù)data塞入異常中再返回給onErrorReturn/Resume。
最后再看下FluxOnErrorResume是怎么實(shí)現(xiàn)降級(jí)的。
public void onError(Throwable t) { if (!second) { second = true; Publisher<? extends T> p; try { //nextFactory即fallback函數(shù) p = Objects.requireNonNull(nextFactory.apply(t), "The nextFactory returned a null Publisher"); } catch (Throwable e) { Throwable _e = Operators.onOperatorError(e, actual.currentContext()); _e = Exceptions.addSuppressed(_e, t); actual.onError(_e); return; } //重新訂閱一個(gè)新的流,其source就是fallback函數(shù)產(chǎn)生的publisher p.subscribe(this); } else { actual.onError(t); } }
當(dāng)上游出現(xiàn)異常時(shí),例如先前示例中的FluxMap,就會(huì)進(jìn)入onError階段,此時(shí)正好被onErrorResume的onError階段攔截,然后利用fallback函數(shù)產(chǎn)生新的流,再重新訂閱p.subscribe(this)
。另外,也可以看出,新的流只會(huì)作用于onErrorResume之后的operator,前面的operator則不會(huì)有作用。
2. 打印日志 - doOnError
打印日志就比較簡(jiǎn)單了,可以用doOnError方法來(lái)實(shí)現(xiàn)。doOnError的底層則用的FluxPeek來(lái)實(shí)現(xiàn),其作用是覆寫(xiě)了所有的接口,如onNext,onError, cancel等,通過(guò)覆寫(xiě)來(lái)實(shí)現(xiàn)hook。幾乎所有doOnXXX的方法都是依賴FluxPeek實(shí)現(xiàn)的,例如log、doOnNext、doOnError等等。由于與本次的主題無(wú)關(guān),不再贅述,感興趣的可以自行翻看FluxPeek的實(shí)現(xiàn)。
需要注意的是:雖然doOnXXX主要用于打印日志,但如果doOnXXX內(nèi)部出錯(cuò),也會(huì)導(dǎo)致整個(gè)流結(jié)束,進(jìn)入onError階段。所以,也是有副作用的,仍然在主流程中。
3. finally - doFinally
try-catch-finally中的關(guān)鍵字finally可以通過(guò)方法doFinally來(lái)平替。需要注意的是doFinally方法的執(zhí)行順序以及觸發(fā)時(shí)機(jī)。
通常,finally的含義是保證100%被執(zhí)行,也就是出錯(cuò)onError的時(shí)候執(zhí)行,正常結(jié)束onComplete也執(zhí)行。但在reactor中,除了這兩個(gè)事件外,還未能保證doFinally百分之百執(zhí)行,還需要增加cancel的情況。其原因是,當(dāng)出現(xiàn)異常后,對(duì)于異常的上游會(huì)走cancel流程,下游則走onError流程。如先前的示例,觸發(fā)doFinally的信號(hào)分別是:cancel與onError。
最后再說(shuō)一下執(zhí)行順序,如先前的示例中那樣,doFinally并不是按出現(xiàn)的順序執(zhí)行,也不是一定是在最后執(zhí)行的(這個(gè)區(qū)別與finally關(guān)鍵詞差別很大)。其原因在于,當(dāng)出現(xiàn)異常時(shí),會(huì)先cancel掉原先的數(shù)據(jù)流,再調(diào)用onError處理(可以參見(jiàn)前面FluxMap的源碼)。
所以,示例中,"Finally: [cancel]"會(huì)先被打印,然后才是onErrorReturn的執(zhí)行,即進(jìn)入onError階段。
那為什么第二個(gè)doFinally雖然出現(xiàn)在onErrorReturn之前,但又是最后執(zhí)行的呢?這是因?yàn)樵趯?shí)現(xiàn)doFinally的時(shí)候,先調(diào)用了下游的onError方法,再執(zhí)行自身doFinally的方法,參見(jiàn)FluxDoFinally的實(shí)現(xiàn):
public void onError(Throwable t) { try { actual.onError(t); } finally { runFinally(SignalType.ON_ERROR); } }
這樣就能符合try-catch-finally的執(zhí)行順序了。
所以,doFinally出現(xiàn)的位置很重要,若出現(xiàn)在異常前面,就會(huì)優(yōu)先執(zhí)行(不會(huì)像finally那樣最后執(zhí)行),若出現(xiàn)在異常后面,則會(huì)最后執(zhí)行(類似finally)。
4. try-with-resource
對(duì)于try-with-resource,reactor也給了替代的實(shí)現(xiàn),那就是using操作符:
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager) { return onAssembly(new FluxUsing<>(resourceSupplier, sourceSupplier, resourceCleanup, eager)); }
其中resourceSupplier是創(chuàng)建生成資源的函數(shù),sourceSupplier則是針對(duì)生成的resource進(jìn)行操作并產(chǎn)生數(shù)據(jù)流,resourceCleanup則是在結(jié)束后(不管成功還是失?。┻M(jìn)行資源的釋放。
以try-with-resource為例:
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) { return disposableInstance.toString(); }
利用using函數(shù),則可以寫(xiě)成:
Flux.using( () -> new SomeAutoCloseable(), disposableInstance -> Flux.just(disposableInstance.toString()), AutoCloseable::close );
5. 重試 - retry / retryWhen
除了以上方式處理異常時(shí),還有一種常見(jiàn)的方式就是重試。比如,我們調(diào)用某個(gè)接口超時(shí)時(shí),通常會(huì)重試一次,這個(gè)時(shí)候可以使用retry方法,如:
Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .retry(1) .subscribe(System.out::println, System.err::println);
會(huì)對(duì)Flux流執(zhí)行兩次,其結(jié)果是:0 1 2 0 1 2
,即當(dāng)遇到data為3時(shí),會(huì)重試一次。
其基本思想很簡(jiǎn)單,就是攔截onError流程,計(jì)算重試的次數(shù),如果重試未超過(guò),則重新訂閱:
public void onError(Throwable t) { long r = remaining; if (r != Long.MAX_VALUE) { if (r == 0) { actual.onError(t); return; } remaining = r - 1; } resubscribe(); }
這里的remaining
就是可以重試的次數(shù),直到重試為0,再一次進(jìn)入actual.onError。重新訂閱的方法也很簡(jiǎn)單,就是把上游的source
與下游的actual
,再來(lái)一次subscribe:source.subscribe(actual)
。
除了retry
外,還有一個(gè)高級(jí)版本retryWhen
,它除了能像retry那樣重試固定的次數(shù)外,還能支持各種重試策略,由于retryWhen的源碼相對(duì)復(fù)雜,這里不再敘述(畢竟本文不是源碼解讀),但除了重試策略有區(qū)別外,其重試的機(jī)制還是一樣的,把上游與下游重新訂閱。
6. 檢查異常處理
在java中有一類異常是需要顯示進(jìn)行處理的,那就是檢查異常(Checked Exception),如IOException。在命令式編程中,可以通過(guò)throws關(guān)鍵字來(lái)聲明,從而可以把異常往外拋,而不需要立即處理。然而,遺憾的是,在reactor中,并沒(méi)有類似的平替,不管任何情況,當(dāng)遇到檢查異常,reactor中都需要用try-catch來(lái)處理,這是唯一一個(gè)在reactor中沒(méi)有找到命令式編程中的平替。與命令式編程有throws關(guān)鍵字聲明不同,reactor中處理檢查異常都必須用try-catch來(lái)處理,處理的方式有以下三種:
- 捕獲到異常并從中恢復(fù)。序列繼續(xù)正常的進(jìn)行。
- 捕獲異常,將其封裝成一個(gè) 不檢查 的異常,然后將其拋出(中斷序列)。
- 如果你需要返回一個(gè) Flux(例如,在 flatMap 中),那么就用一個(gè)產(chǎn)生錯(cuò)誤的 Flux 來(lái)封裝異常,如下所示:return Flux.error(checkedException)。(這個(gè)序列也會(huì)終止。)
這三種方式中,其中最常見(jiàn)也最常用的方式就是第二種,將檢查異常轉(zhuǎn)化為非檢查異常,如throw new RuntimeException(e)
。但是reactor提供了輔助工具類Exceptions,進(jìn)而可以相對(duì)優(yōu)雅簡(jiǎn)潔的進(jìn)行統(tǒng)一處理。
如以下這個(gè)例子(來(lái)自reactor的文檔):
public String convert(int i) throws IOException { if (i > 3) { throw new IOException("boom " + i); } return "OK " + i; } Flux<String> converted = Flux .range(1, 10) .map(i -> { try { return convert(i); } catch (IOException e) { throw Exceptions.propagate(e); } }); converted.subscribe( v -> System.out.println("RECEIVED: " + v), e -> { if (Exceptions.unwrap(e) instanceof IOException) { System.out.println("Something bad happened with I/O"); } else { System.out.println("Something bad happened"); } } );
由于convert聲明了檢查異常IOException,所以必須要try-catch住,再利用Exceptions.propagate來(lái)封裝為非檢查異常。相比于直接用throw new RuntimeException(e)
,利用Exceptions的好處在onError處理階段可以用Exceptions.unwrap()方法來(lái)獲取內(nèi)部真實(shí)拋出的異常,體現(xiàn)了利用工具類的好處——簡(jiǎn)潔明了。
總結(jié)
本文先從reactor異常處理的底層機(jī)制講起,講清楚了一個(gè)基本概念:只要出現(xiàn)異常,不管如何處理,舊的流都已經(jīng)結(jié)束,接下來(lái)處理的都是新的流。在這基礎(chǔ)上,按命令式編程中的try-catch-finally的方式,用reactor的方式進(jìn)行了一一替代介紹,希望通過(guò)對(duì)比的方式,能更好的掌握在reactor中如何優(yōu)雅的處理異常。
到此這篇關(guān)于詳解Reactor如何優(yōu)雅Exception異常處理的文章就介紹到這了,更多相關(guān)Reactor Exception異常處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java詳細(xì)分析String類與StringBuffer和StringBuilder的使用方法
當(dāng)對(duì)字符串進(jìn)行修改的時(shí)候,需要使用 StringBuffer 和 StringBuilder類,和String類不同的是,StringBuffer和 StringBuilder類的對(duì)象能夠被多次的修改,并且不產(chǎn)生新的未使用對(duì)象2022-04-04利用logback filter過(guò)濾某個(gè)類 屏蔽某個(gè)類
這篇文章主要介紹了利用logback filter過(guò)濾某個(gè)類 屏蔽某個(gè)類的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07springboot項(xiàng)目部署到k8s上的方法步驟
本文主要介紹了springboot項(xiàng)目部署到k8s上的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05通過(guò)實(shí)例解析Java類初始化和實(shí)例初始化
這篇文章主要介紹了通過(guò)實(shí)例解析Java類初始化和實(shí)例初始化,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11Netty啟動(dòng)流程服務(wù)端channel初始化源碼分析
這篇文章主要為大家介紹了Netty啟動(dòng)流程服務(wù)端channel初始化源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03Java實(shí)戰(zhàn)項(xiàng)目練習(xí)之球館在線預(yù)約系統(tǒng)的實(shí)現(xiàn)
理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SpringBoot+maven+freemark+Mysql實(shí)現(xiàn)一個(gè)球館在線預(yù)約系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2022-01-01Java防止文件被篡改之文件校驗(yàn)功能的實(shí)例代碼
這篇文章主要介紹了Java防止文件被篡改之文件校驗(yàn)功能,本文給大家分享了文件校驗(yàn)和原理及具體實(shí)現(xiàn)思路,需要的朋友可以參考下2018-11-11