Reactive反應(yīng)式編程及使用介紹
前言
前一篇分析了Spring WebFlux的設(shè)計(jì)及實(shí)現(xiàn)原理后,反應(yīng)式編程又來了,Spring WebFlux其底層還是基于Reactive編程模型的,在java領(lǐng)域中,關(guān)于Reactive,有一個(gè)框架規(guī)范,叫【Reactive Streams】,在java9的ava.util.concurrent.Flow包中已經(jīng)實(shí)現(xiàn)了這個(gè)規(guī)范。其他的優(yōu)秀實(shí)現(xiàn)還有Reactor和Rxjava。在Spring WebFlux中依賴的就是Reactor。雖然你可能沒用過Reactive開發(fā)過應(yīng)用,但是或多會(huì)少你接觸過異步Servlet,同時(shí)又有這么一種論調(diào):異步化非阻塞io并不能增強(qiáng)太多的系統(tǒng)性能,但是也不可否認(rèn)異步化后并發(fā)性能上去了。聽到這種結(jié)論后在面對(duì)是否選擇Reactive編程后,是不是非常模棱兩可。因?yàn)槲覀儾皇呛芰私夥磻?yīng)式編程,所以會(huì)有這種感覺。沒關(guān)系,下面看看反應(yīng)式編程集大者Reactor是怎么闡述反應(yīng)式編程的。
- Reactor官網(wǎng):https://projectreactor.io/
- Rxjava官網(wǎng):http://reactivex.io/
反應(yīng)式編程簡(jiǎn)介
Reactor是Reactive Programming范例的一個(gè)實(shí)現(xiàn),可以概括為:
反應(yīng)式編程是一種涉及數(shù)據(jù)流和變化傳播的異步編程范例。這意味著可以通過所采用的編程語(yǔ)言輕松地表達(dá)靜態(tài)(例如陣列)或動(dòng)態(tài)(例如事件發(fā)射器)數(shù)據(jù)流。
作為反應(yīng)式編程方向的第一步,Microsoft在.NET生態(tài)系統(tǒng)中創(chuàng)建了Reactive Extensions(Rx)庫(kù)。然后RxJava在JVM上實(shí)現(xiàn)了響應(yīng)式編程。隨著時(shí)間的推移,通過Reactive Streams工作出現(xiàn)了Java的標(biāo)準(zhǔn)化 ,這一規(guī)范定義了JVM上的反應(yīng)庫(kù)的一組接口和交互規(guī)則。它的接口已經(jīng)集成到父Flow類下的Java 9中。
反應(yīng)式編程范例通常以面向?qū)ο蟮恼Z(yǔ)言呈現(xiàn),作為Observer設(shè)計(jì)模式的擴(kuò)展。人們還可以將主要的反應(yīng)流模式與熟悉的迭代器設(shè)計(jì)模式進(jìn)行比較,因?yàn)樵谒羞@些庫(kù)中對(duì)Iterable- Iterator對(duì)存在雙重性 。一個(gè)主要的區(qū)別是,雖然迭代器是基于拉的,但是反應(yīng)流是基于推的。
使用迭代器是一種命令式編程模式,即使訪問值的方法完全由其負(fù)責(zé)Iterable。實(shí)際上,開發(fā)人員可以選擇何時(shí)訪問next()序列中的項(xiàng)目。在反應(yīng)流中,相當(dāng)于上述對(duì)Publisher-Subscriber。但是, 當(dāng)它們出現(xiàn)時(shí),Publisher它會(huì)通知訂閱者新的可用值,而這一推動(dòng)方面是被動(dòng)反應(yīng)的關(guān)鍵。此外,應(yīng)用于推送值的操作以聲明方式而非命令方式表示:程序員表達(dá)計(jì)算的邏輯而不是描述其精確的控制流。
除了推送值之外,還以明確定義的方式涵蓋錯(cuò)誤處理和完成方面。A Publisher可以將新值推送到Subscriber(通過調(diào)用onNext),但也可以發(fā)出錯(cuò)誤(通過調(diào)用onError)或完成(通過調(diào)用onComplete)。錯(cuò)誤和完成都會(huì)終止序列。這可以概括為:
onNext x 0..N [onError | onComplete]
這種方法非常靈活。該模式支持沒有值,一個(gè)值或n值的用例(包括無限的值序列,例如時(shí)鐘的連續(xù)滴答)。
但是我們首先考慮一下,為什么我們首先需要這樣的異步反應(yīng)庫(kù)?
阻塞可能會(huì)浪費(fèi)資源
現(xiàn)代應(yīng)用程序可以覆蓋大量并發(fā)用戶,即使現(xiàn)代硬件的功能不斷提高,現(xiàn)代軟件的性能仍然是一個(gè)關(guān)鍵問題。
人們可以通過兩種方式來提高計(jì)劃的績(jī)效:
- 并行化:使用更多線程和更多硬件資源。
- 在現(xiàn)有資源的使用方式上尋求更高的效率。
通常,Java開發(fā)人員使用阻塞代碼編寫程序。這種做法很好,直到出現(xiàn)性能瓶頸,此時(shí)需要引入額外的線程,運(yùn)行類似的阻塞代碼。但是,資源利用率的這種擴(kuò)展會(huì)很快引入爭(zhēng)用和并發(fā)問題。
更糟糕的是,阻止浪費(fèi)資源。如果仔細(xì)觀察,一旦程序涉及一些延遲(特別是I / O,例如數(shù)據(jù)庫(kù)請(qǐng)求或網(wǎng)絡(luò)調(diào)用),資源就會(huì)被浪費(fèi),因?yàn)榫€程(或許多線程)現(xiàn)在處于空閑狀態(tài),等待數(shù)據(jù)。
所以并行化方法不是靈丹妙藥。為了獲得硬件的全部功能是必要的,但是理由也很復(fù)雜并且易受資源浪費(fèi)的影響。
使用異步來解決?
第二種方法(前面提到過),尋求更高的效率,可以解決資源浪費(fèi)問題。通過編寫異步,非阻塞代碼,您可以使用相同的底層資源將執(zhí)行切換到另一個(gè)活動(dòng)任務(wù),然后在異步處理完成后返回到當(dāng)前進(jìn)程。
但是如何在JVM上生成異步代碼?Java提供了兩種異步編程模型:
回調(diào):異步方法沒有返回值,但需要額外的 callback參數(shù)(lambda或匿名類),在結(jié)果可用時(shí)調(diào)用它們。一個(gè)眾所周知的例子是Swing的EventListener層次結(jié)構(gòu)。
期貨:異步方法Future立即返回。異步進(jìn)程計(jì)算一個(gè)T值,但該Future對(duì)象包含對(duì)它的訪問。該值不會(huì)立即可用,并且可以輪詢對(duì)象,直到該值可用。例如,ExecutorService運(yùn)行Callable任務(wù)使用Future對(duì)象。
這些技術(shù)是否足夠好?不適用于所有用例,兩種方法都有局限性。
回調(diào)難以組合在一起,很快導(dǎo)致難以閱讀和維護(hù)的代碼(稱為“Callback Hell”)。
考慮一個(gè)示例:在用戶界面上顯示用戶的前五個(gè)收藏夾,或者如果她沒有收藏夾則提出建議。這通過三個(gè)服務(wù)(一個(gè)提供喜歡的ID,第二個(gè)提取喜歡的詳細(xì)信息,第三個(gè)提供詳細(xì)建議):
回調(diào)地獄的例子
userService.getFavorites(userId, new Callback() { public void onSuccess(Listlist) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback() { public void onSuccess(Listlist) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); } public void onError(Throwable error) { UiUtils.errorPopup(error); } }); } else { list.stream() .limit(5) .forEach(favId -> favoriteService.getDetails(favId, new Callback() { public void onSuccess(Favorite details) { UiUtils.submitOnUiThread(() -> uiList.show(details)); } public void onError(Throwable error) { UiUtils.errorPopup(error); } } )); } } public void onError(Throwable error) { UiUtils.errorPopup(error); } });
- 我們有基于回調(diào)的服務(wù):一個(gè)Callback接口,其中包含在異步過程成功時(shí)調(diào)用的方法,以及在發(fā)生錯(cuò)誤時(shí)調(diào)用的方法。
- 第一個(gè)服務(wù)使用喜歡的ID列表調(diào)用其回調(diào)。
- 如果列表為空,我們必須去suggestionService。
- 在suggestionService給出了一個(gè)List到第二個(gè)回調(diào)。
- 由于我們處理UI,我們需要確保我們的消費(fèi)代碼將在UI線程中運(yùn)行。
- 我們使用Java 8 Stream將處理的建議數(shù)限制為五個(gè),并在UI中的圖形列表中顯示它們。
- 在每個(gè)級(jí)別,我們以相同的方式處理錯(cuò)誤:在彈出窗口中顯示它們。
- 回到最喜歡的ID級(jí)別。如果服務(wù)返回完整列表,那么我們需要轉(zhuǎn)到favoriteService獲取詳細(xì)Favorite對(duì)象。由于我們只需要五個(gè),我們首先流式傳輸ID列表,將其限制為五個(gè)。
- 再一次,一個(gè)回調(diào)。這次我們得到一個(gè)完全成熟的Favorite對(duì)象,我們將其推送到UI線程內(nèi)的UI。
這是很多代碼,它有點(diǎn)難以遵循并且具有重復(fù)的部分??紤]它在Reactor中的等價(jià)物:
與回調(diào)代碼等效的Reactor代碼示例
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
- 我們從最喜歡的ID流開始。
- 我們將它們異步轉(zhuǎn)換為詳細(xì)的Favorite對(duì)象(flatMap)。我們現(xiàn)在有一個(gè)流動(dòng)Favorite。
- 如果流量Favorite是空的,我們會(huì)切換到后退 suggestionService。
- 我們最多只對(duì)最終流程中的五個(gè)元素感興趣。
- 最后,我們想要處理UI線程中的每個(gè)數(shù)據(jù)。
- 我們通過描述如何處理數(shù)據(jù)的最終形式(在UI列表中顯示)以及在出現(xiàn)錯(cuò)誤(顯示彈出窗口)時(shí)該怎么做來觸發(fā)流程。
如果您想確保在不到800毫秒內(nèi)檢索到喜歡的ID,或者如果需要更長(zhǎng)時(shí)間從緩存中獲取它們,該怎么辦?在基于回調(diào)的代碼中,這是一項(xiàng)復(fù)雜的任務(wù)。在Reactor中,它變得像timeout在鏈中添加運(yùn)算符一樣簡(jiǎn)單:
具有超時(shí)和回退的Reactor代碼示例
userService.getFavorites(userId) .timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
- 如果上面的部分發(fā)出的時(shí)間超過800毫秒,則傳播錯(cuò)誤。
- 如果出現(xiàn)錯(cuò)誤,請(qǐng)回復(fù)cacheService。
- 鏈的其余部分與前面的示例類似。
盡管Java 8中帶來了改進(jìn),但期貨比回調(diào)要好一些,但它們?cè)跇?gòu)圖方面仍然表現(xiàn)不佳CompletableFuture。一起編排多個(gè)未來是可行但不容易的。此外,F(xiàn)uture還有其他問題:Future通過調(diào)用get() 方法很容易結(jié)束對(duì)象的另一個(gè)阻塞情況,它們不支持延遲計(jì)算,并且它們不支持多個(gè)值和高級(jí)錯(cuò)誤處理。
考慮另一個(gè)例子:我們得到一個(gè)ID列表,我們要從中獲取一個(gè)名稱和一個(gè)統(tǒng)計(jì)信息,然后將它們成對(duì)地組合在一起,所有這些都是異步的。
CompletableFuture組合的例子
CompletableFutureids = ifhIds(); CompletableFutureresult = ids.thenComposeAsync(l -> { Streamzip = l.stream().map(i -> { CompletableFuturenameTask = ifhName(i); CompletableFuturestatTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); ListcombinationList = zip.collect(Collectors.toList()); CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); CompletableFutureallDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }); Listresults = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
- 我們從一個(gè)未來開始,它為我們提供了一個(gè)id要處理的值列表。
- 一旦得到列表,我們想要開始一些更深入的異步處理。
- 對(duì)于列表中的每個(gè)元素:
- 異步獲取關(guān)聯(lián)的名稱。
- 異步獲取相關(guān)任務(wù)。
- 結(jié)合兩個(gè)結(jié)果。
- 我們現(xiàn)在有一個(gè)代表所有組合任務(wù)的期貨清單。為了執(zhí)行這些任務(wù),我們需要將列表轉(zhuǎn)換為數(shù)組。
- 將數(shù)組傳遞給CompletableFuture.allOf,輸出Future完成所有任務(wù)后完成的數(shù)組。
- 棘手的一點(diǎn)是allOf返回CompletableFuture,所以我們重申了期貨清單,通過收集結(jié)果join() (這里沒有阻止,因?yàn)閍llOf確保期貨全部完成)。
- 一旦觸發(fā)了整個(gè)異步管道,我們就等待它被處理并返回我們可以斷言的結(jié)果列表。
由于Reactor具有更多開箱即用的組合運(yùn)算符,因此可以簡(jiǎn)化此過程:
與未來代碼等效的Reactor代碼示例
Fluxids = ifhrIds(); Fluxcombinations = ids.flatMap(id -> { MononameTask = ifhrName(id); MonostatTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); Monoresult = combinations.collectList(); Listresults = result.block(); assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" );
- 這一次,我們從異步提供的ids(a Flux)序列開始。
- 對(duì)于序列中的每個(gè)元素,我們異步處理它(在body函數(shù)內(nèi)部flatMap)兩次。
- 獲取相關(guān)名稱。
- 獲取相關(guān)統(tǒng)計(jì)信息。
- 異步組合2個(gè)值。
- 在將值List變?yōu)榭捎脮r(shí)將值聚合為a 。
- 在生產(chǎn)中,我們將繼續(xù)Flux通過進(jìn)一步組合或訂閱它來異步處理。最有可能的是,我們會(huì)回歸result Mono。由于我們?cè)跍y(cè)試中,我們阻塞,等待處理完成,然后直接返回聚合的值列表。
- 斷言結(jié)果。
Callback和Future的這些風(fēng)險(xiǎn)是相似的,并且是反應(yīng)式編程與該P(yáng)ublisher-Subscriber對(duì)的關(guān)系。
從命令式到反應(yīng)式編程
諸如Reactor之類的反應(yīng)庫(kù)旨在解決JVM上“經(jīng)典”異步方法的這些缺點(diǎn),同時(shí)還關(guān)注一些其他方面:
- 可組合性和可讀性
- 數(shù)據(jù)作為一個(gè)用豐富的運(yùn)算符詞匯表操縱的流程
- 在您訂閱之前沒有任何事情發(fā)生
- 背壓或消費(fèi)者向生產(chǎn)者發(fā)出信號(hào)表明排放率過高的能力
- 高級(jí)但高價(jià)值的抽象,與并發(fā)無關(guān)
可組合性和可讀性
通過可組合性,我們指的是編排多個(gè)異步任務(wù)的能力,使用先前任務(wù)的結(jié)果將輸入提供給后續(xù)任務(wù)或以fork-join方式執(zhí)行多個(gè)任務(wù),以及將異步任務(wù)重用為更高級(jí)別系統(tǒng)中的分立組件。
編排任務(wù)的能力與代碼的可讀性和可維護(hù)性緊密相關(guān)。隨著異步過程層數(shù)量和復(fù)雜性的增加,能夠編寫和讀取代碼變得越來越困難。正如我們所看到的,回調(diào)模型很簡(jiǎn)單,但其主要缺點(diǎn)之一是,對(duì)于復(fù)雜的進(jìn)程,您需要從回調(diào)執(zhí)行回調(diào),本身嵌套在另一個(gè)回調(diào)中,依此類推。那個(gè)混亂被稱為Callback Hell。正如你可以猜到的(或者從經(jīng)驗(yàn)中得知),這樣的代碼很難回歸并推理。
Reactor提供了豐富的組合選項(xiàng),其中代碼反映了抽象過程的組織,并且所有內(nèi)容通常都保持在同一級(jí)別(嵌套最小化)。
類比裝配線工作流程
您可以將響應(yīng)式應(yīng)用程序處理的數(shù)據(jù)視為在裝配線中移動(dòng)。反應(yīng)器既是傳送帶又是工作站。原材料從原料(原始Publisher)中倒出,最終成為成品,準(zhǔn)備推送給消費(fèi)者(或Subscriber)。
原材料可以經(jīng)歷各種轉(zhuǎn)換和其他中間步驟,或者是將中間件聚集在一起的較大裝配線的一部分。如果在某一點(diǎn)出現(xiàn)毛刺或堵塞(也許裝箱產(chǎn)品需要不成比例的長(zhǎng)時(shí)間),受影響的工作站可向上游發(fā)出信號(hào)以限制原材料的流動(dòng)。
操作符(運(yùn)算符)
在Reactor中,運(yùn)算符是我們的匯編類比中的工作站。每個(gè)操作符都將行為添加到a Publisher并將上一步驟包裝Publisher到新實(shí)例中。因此,整個(gè)鏈被鏈接,使得數(shù)據(jù)源自第一Publisher鏈并且向下移動(dòng)鏈,由每個(gè)鏈轉(zhuǎn)換。最終,Subscriber完成了整個(gè)過程。請(qǐng)記住,在Subscriber訂閱a 之前沒有任何事情發(fā)生Publisher,下面就會(huì)提到。
了解操作員創(chuàng)建新實(shí)例可以幫助您避免一個(gè)常見錯(cuò)誤,該錯(cuò)誤會(huì)導(dǎo)致您認(rèn)為您的鏈中使用的操作員未被應(yīng)用。看到這個(gè)項(xiàng)目的常見問題。
雖然Reactive Streams規(guī)范根本沒有指定運(yùn)算符,但Reactor等反應(yīng)庫(kù)的最佳附加值之一是它們提供的豐富的運(yùn)算符。這些涉及很多方面,從簡(jiǎn)單的轉(zhuǎn)換和過濾到復(fù)雜的編排和錯(cuò)誤處理。
在你訂閱之前什么都不會(huì)發(fā)生
在Reactor中,當(dāng)您編寫Publisher鏈時(shí),默認(rèn)情況下數(shù)據(jù)不會(huì)啟動(dòng)。相反,您可以創(chuàng)建異步過程的抽象描述(這可以幫助重用和組合)。
通過訂閱行為,您將Publishera 綁定到a Subscriber,從而觸發(fā)整個(gè)鏈中的數(shù)據(jù)流。這是通過上游傳播的單個(gè)request 信號(hào)在內(nèi)部實(shí)現(xiàn)的Subscriber,一直傳回源 Publisher。
背壓
上游傳播信號(hào)也用于實(shí)現(xiàn)背壓,我們?cè)谘b配線中將其描述為當(dāng)工作站比上游工作站處理速度慢時(shí)向線路發(fā)送的反饋信號(hào)。
Reactive Streams規(guī)范定義的真實(shí)機(jī)制非常接近于類比:訂閱者可以在無限制模式下工作,讓源以最快的速度推送所有數(shù)據(jù),或者可以使用該request機(jī)制向源發(fā)送信號(hào)表明它已準(zhǔn)備就緒處理最多的n元素。
中間操作員也可以在途中更改請(qǐng)求。想象一個(gè)buffer 運(yùn)算符,它將元素分組為10個(gè)。如果訂閱者請(qǐng)求1個(gè)緩沖區(qū),則源可以生成10個(gè)元素。一些操作員還實(shí)施 預(yù)取策略,這避免了request(1)往返,并且如果在請(qǐng)求之前生成元素并不太昂貴,則是有益的。
這將推模型轉(zhuǎn)換為推拉式混合動(dòng)力,如果它們隨時(shí)可用,下游可以從上游拉出n個(gè)元素。但是如果元素沒有準(zhǔn)備好,它們就會(huì)在生成時(shí)被上游推動(dòng)。
熱與冷
在反應(yīng)庫(kù)的Rx家族中,人們可以區(qū)分兩大類反應(yīng)序列:熱和冷。這種區(qū)別主要與反應(yīng)流如何對(duì)訂閱的用戶做出反應(yīng)有關(guān):
冷序列的含義是不論訂閱者在何時(shí)訂閱該序列,總是能收到序列中產(chǎn)生的全部消息。
而與之對(duì)應(yīng)的熱序列,則是在持續(xù)不斷地產(chǎn)生消息,訂閱者只能獲取到在其訂閱之后產(chǎn)生的消息。
以上就是Reactive反應(yīng)式編程及使用介紹的詳細(xì)內(nèi)容,更多關(guān)于Reactive反應(yīng)式編程使用的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java實(shí)現(xiàn)上傳圖片尺寸修改和質(zhì)量壓縮
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)上傳圖片尺寸修改和質(zhì)量壓縮,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04Spring Boot中使用Activiti的方法教程(二)
工作流(Workflow),就是“業(yè)務(wù)過程的部分或整體在計(jì)算機(jī)應(yīng)用環(huán)境下的自動(dòng)化”,下面這篇文章主要給大家介紹了關(guān)于Spring Boot中使用Activiti的相關(guān)資料,需要的朋友可以參考下2018-08-08springBoot Junit測(cè)試用例出現(xiàn)@Autowired不生效的解決
這篇文章主要介紹了springBoot Junit測(cè)試用例出現(xiàn)@Autowired不生效的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09詳解Javaweb狀態(tài)管理的Session和Cookie
這篇文章主要介紹了Javaweb狀態(tài)管理的Session和Cookie,將瀏覽器與web服務(wù)器之間多次交互當(dāng)做一個(gè)整體來處理,并且多次交互所涉及的數(shù)據(jù)(狀態(tài))保存下來,需要的朋友可以參考下2023-05-05SpringCloud Gateway使用redis實(shí)現(xiàn)動(dòng)態(tài)路由的方法
這篇文章主要介紹了SpringCloud Gateway使用redis實(shí)現(xiàn)動(dòng)態(tài)路由的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01mybatis使用foreach語(yǔ)句實(shí)現(xiàn)IN查詢(三種)
這篇文章主要介紹了mybatis使用foreach語(yǔ)句實(shí)現(xiàn)IN查詢(三種),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12