Reactive反應(yīng)式編程及使用介紹
前言
前一篇分析了Spring WebFlux的設(shè)計及實現(xiàn)原理后,反應(yīng)式編程又來了,Spring WebFlux其底層還是基于Reactive編程模型的,在java領(lǐng)域中,關(guān)于Reactive,有一個框架規(guī)范,叫【Reactive Streams】,在java9的ava.util.concurrent.Flow包中已經(jīng)實現(xiàn)了這個規(guī)范。其他的優(yōu)秀實現(xiàn)還有Reactor和Rxjava。在Spring WebFlux中依賴的就是Reactor。雖然你可能沒用過Reactive開發(fā)過應(yīng)用,但是或多會少你接觸過異步Servlet,同時又有這么一種論調(diào):異步化非阻塞io并不能增強(qiáng)太多的系統(tǒng)性能,但是也不可否認(rèn)異步化后并發(fā)性能上去了。聽到這種結(jié)論后在面對是否選擇Reactive編程后,是不是非常模棱兩可。因為我們不是很了解反應(yīng)式編程,所以會有這種感覺。沒關(guān)系,下面看看反應(yīng)式編程集大者Reactor是怎么闡述反應(yīng)式編程的。
- Reactor官網(wǎng):https://projectreactor.io/
- Rxjava官網(wǎng):http://reactivex.io/
反應(yīng)式編程簡介
Reactor是Reactive Programming范例的一個實現(xiàn),可以概括為:
反應(yīng)式編程是一種涉及數(shù)據(jù)流和變化傳播的異步編程范例。這意味著可以通過所采用的編程語言輕松地表達(dá)靜態(tài)(例如陣列)或動態(tài)(例如事件發(fā)射器)數(shù)據(jù)流。
作為反應(yīng)式編程方向的第一步,Microsoft在.NET生態(tài)系統(tǒng)中創(chuàng)建了Reactive Extensions(Rx)庫。然后RxJava在JVM上實現(xiàn)了響應(yīng)式編程。隨著時間的推移,通過Reactive Streams工作出現(xiàn)了Java的標(biāo)準(zhǔn)化 ,這一規(guī)范定義了JVM上的反應(yīng)庫的一組接口和交互規(guī)則。它的接口已經(jīng)集成到父Flow類下的Java 9中。
反應(yīng)式編程范例通常以面向?qū)ο蟮恼Z言呈現(xiàn),作為Observer設(shè)計模式的擴(kuò)展。人們還可以將主要的反應(yīng)流模式與熟悉的迭代器設(shè)計模式進(jìn)行比較,因為在所有這些庫中對Iterable- Iterator對存在雙重性 。一個主要的區(qū)別是,雖然迭代器是基于拉的,但是反應(yīng)流是基于推的。
使用迭代器是一種命令式編程模式,即使訪問值的方法完全由其負(fù)責(zé)Iterable。實際上,開發(fā)人員可以選擇何時訪問next()序列中的項目。在反應(yīng)流中,相當(dāng)于上述對Publisher-Subscriber。但是, 當(dāng)它們出現(xiàn)時,Publisher它會通知訂閱者新的可用值,而這一推動方面是被動反應(yīng)的關(guān)鍵。此外,應(yīng)用于推送值的操作以聲明方式而非命令方式表示:程序員表達(dá)計算的邏輯而不是描述其精確的控制流。
除了推送值之外,還以明確定義的方式涵蓋錯誤處理和完成方面。A Publisher可以將新值推送到Subscriber(通過調(diào)用onNext),但也可以發(fā)出錯誤(通過調(diào)用onError)或完成(通過調(diào)用onComplete)。錯誤和完成都會終止序列。這可以概括為:
onNext x 0..N [onError | onComplete]
這種方法非常靈活。該模式支持沒有值,一個值或n值的用例(包括無限的值序列,例如時鐘的連續(xù)滴答)。
但是我們首先考慮一下,為什么我們首先需要這樣的異步反應(yīng)庫?
阻塞可能會浪費資源
現(xiàn)代應(yīng)用程序可以覆蓋大量并發(fā)用戶,即使現(xiàn)代硬件的功能不斷提高,現(xiàn)代軟件的性能仍然是一個關(guān)鍵問題。
人們可以通過兩種方式來提高計劃的績效:
- 并行化:使用更多線程和更多硬件資源。
- 在現(xiàn)有資源的使用方式上尋求更高的效率。
通常,Java開發(fā)人員使用阻塞代碼編寫程序。這種做法很好,直到出現(xiàn)性能瓶頸,此時需要引入額外的線程,運行類似的阻塞代碼。但是,資源利用率的這種擴(kuò)展會很快引入爭用和并發(fā)問題。
更糟糕的是,阻止浪費資源。如果仔細(xì)觀察,一旦程序涉及一些延遲(特別是I / O,例如數(shù)據(jù)庫請求或網(wǎng)絡(luò)調(diào)用),資源就會被浪費,因為線程(或許多線程)現(xiàn)在處于空閑狀態(tài),等待數(shù)據(jù)。
所以并行化方法不是靈丹妙藥。為了獲得硬件的全部功能是必要的,但是理由也很復(fù)雜并且易受資源浪費的影響。
使用異步來解決?
第二種方法(前面提到過),尋求更高的效率,可以解決資源浪費問題。通過編寫異步,非阻塞代碼,您可以使用相同的底層資源將執(zhí)行切換到另一個活動任務(wù),然后在異步處理完成后返回到當(dāng)前進(jìn)程。
但是如何在JVM上生成異步代碼?Java提供了兩種異步編程模型:
回調(diào):異步方法沒有返回值,但需要額外的 callback參數(shù)(lambda或匿名類),在結(jié)果可用時調(diào)用它們。一個眾所周知的例子是Swing的EventListener層次結(jié)構(gòu)。
期貨:異步方法Future立即返回。異步進(jìn)程計算一個T值,但該Future對象包含對它的訪問。該值不會立即可用,并且可以輪詢對象,直到該值可用。例如,ExecutorService運行Callable任務(wù)使用Future對象。
這些技術(shù)是否足夠好?不適用于所有用例,兩種方法都有局限性。
回調(diào)難以組合在一起,很快導(dǎo)致難以閱讀和維護(hù)的代碼(稱為“Callback Hell”)。
考慮一個示例:在用戶界面上顯示用戶的前五個收藏夾,或者如果她沒有收藏夾則提出建議。這通過三個服務(wù)(一個提供喜歡的ID,第二個提取喜歡的詳細(xì)信息,第三個提供詳細(xì)建議):
回調(diào)地獄的例子
userService.getFavorites(userId, new Callback() { public void onSuccess(Listlist) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback() { public void onSuccess(Listlist) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); } public void onError(Throwable error) { UiUtils.errorPopup(error); } }); } else { list.stream() .limit(5) .forEach(favId -> favoriteService.getDetails(favId, new Callback() { public void onSuccess(Favorite details) { UiUtils.submitOnUiThread(() -> uiList.show(details)); } public void onError(Throwable error) { UiUtils.errorPopup(error); } } )); } } public void onError(Throwable error) { UiUtils.errorPopup(error); } });
- 我們有基于回調(diào)的服務(wù):一個Callback接口,其中包含在異步過程成功時調(diào)用的方法,以及在發(fā)生錯誤時調(diào)用的方法。
- 第一個服務(wù)使用喜歡的ID列表調(diào)用其回調(diào)。
- 如果列表為空,我們必須去suggestionService。
- 在suggestionService給出了一個List到第二個回調(diào)。
- 由于我們處理UI,我們需要確保我們的消費代碼將在UI線程中運行。
- 我們使用Java 8 Stream將處理的建議數(shù)限制為五個,并在UI中的圖形列表中顯示它們。
- 在每個級別,我們以相同的方式處理錯誤:在彈出窗口中顯示它們。
- 回到最喜歡的ID級別。如果服務(wù)返回完整列表,那么我們需要轉(zhuǎn)到favoriteService獲取詳細(xì)Favorite對象。由于我們只需要五個,我們首先流式傳輸ID列表,將其限制為五個。
- 再一次,一個回調(diào)。這次我們得到一個完全成熟的Favorite對象,我們將其推送到UI線程內(nèi)的UI。
這是很多代碼,它有點難以遵循并且具有重復(fù)的部分??紤]它在Reactor中的等價物:
與回調(diào)代碼等效的Reactor代碼示例
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
- 我們從最喜歡的ID流開始。
- 我們將它們異步轉(zhuǎn)換為詳細(xì)的Favorite對象(flatMap)。我們現(xiàn)在有一個流動Favorite。
- 如果流量Favorite是空的,我們會切換到后退 suggestionService。
- 我們最多只對最終流程中的五個元素感興趣。
- 最后,我們想要處理UI線程中的每個數(shù)據(jù)。
- 我們通過描述如何處理數(shù)據(jù)的最終形式(在UI列表中顯示)以及在出現(xiàn)錯誤(顯示彈出窗口)時該怎么做來觸發(fā)流程。
如果您想確保在不到800毫秒內(nèi)檢索到喜歡的ID,或者如果需要更長時間從緩存中獲取它們,該怎么辦?在基于回調(diào)的代碼中,這是一項復(fù)雜的任務(wù)。在Reactor中,它變得像timeout在鏈中添加運算符一樣簡單:
具有超時和回退的Reactor代碼示例
userService.getFavorites(userId) .timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
- 如果上面的部分發(fā)出的時間超過800毫秒,則傳播錯誤。
- 如果出現(xiàn)錯誤,請回復(fù)cacheService。
- 鏈的其余部分與前面的示例類似。
盡管Java 8中帶來了改進(jìn),但期貨比回調(diào)要好一些,但它們在構(gòu)圖方面仍然表現(xiàn)不佳CompletableFuture。一起編排多個未來是可行但不容易的。此外,F(xiàn)uture還有其他問題:Future通過調(diào)用get() 方法很容易結(jié)束對象的另一個阻塞情況,它們不支持延遲計算,并且它們不支持多個值和高級錯誤處理。
考慮另一個例子:我們得到一個ID列表,我們要從中獲取一個名稱和一個統(tǒng)計信息,然后將它們成對地組合在一起,所有這些都是異步的。
CompletableFuture組合的例子
CompletableFutureids = ifhIds(); CompletableFutureresult = ids.thenComposeAsync(l -> { Streamzip = l.stream().map(i -> { CompletableFuturenameTask = ifhName(i); CompletableFuturestatTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); ListcombinationList = zip.collect(Collectors.toList()); CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); CompletableFutureallDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }); Listresults = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
- 我們從一個未來開始,它為我們提供了一個id要處理的值列表。
- 一旦得到列表,我們想要開始一些更深入的異步處理。
- 對于列表中的每個元素:
- 異步獲取關(guān)聯(lián)的名稱。
- 異步獲取相關(guān)任務(wù)。
- 結(jié)合兩個結(jié)果。
- 我們現(xiàn)在有一個代表所有組合任務(wù)的期貨清單。為了執(zhí)行這些任務(wù),我們需要將列表轉(zhuǎn)換為數(shù)組。
- 將數(shù)組傳遞給CompletableFuture.allOf,輸出Future完成所有任務(wù)后完成的數(shù)組。
- 棘手的一點是allOf返回CompletableFuture,所以我們重申了期貨清單,通過收集結(jié)果join() (這里沒有阻止,因為allOf確保期貨全部完成)。
- 一旦觸發(fā)了整個異步管道,我們就等待它被處理并返回我們可以斷言的結(jié)果列表。
由于Reactor具有更多開箱即用的組合運算符,因此可以簡化此過程:
與未來代碼等效的Reactor代碼示例
Fluxids = ifhrIds(); Fluxcombinations = ids.flatMap(id -> { MononameTask = ifhrName(id); MonostatTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); Monoresult = combinations.collectList(); Listresults = result.block(); assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" );
- 這一次,我們從異步提供的ids(a Flux)序列開始。
- 對于序列中的每個元素,我們異步處理它(在body函數(shù)內(nèi)部flatMap)兩次。
- 獲取相關(guān)名稱。
- 獲取相關(guān)統(tǒng)計信息。
- 異步組合2個值。
- 在將值List變?yōu)榭捎脮r將值聚合為a 。
- 在生產(chǎn)中,我們將繼續(xù)Flux通過進(jìn)一步組合或訂閱它來異步處理。最有可能的是,我們會回歸result Mono。由于我們在測試中,我們阻塞,等待處理完成,然后直接返回聚合的值列表。
- 斷言結(jié)果。
Callback和Future的這些風(fēng)險是相似的,并且是反應(yīng)式編程與該Publisher-Subscriber對的關(guān)系。
從命令式到反應(yīng)式編程
諸如Reactor之類的反應(yīng)庫旨在解決JVM上“經(jīng)典”異步方法的這些缺點,同時還關(guān)注一些其他方面:
- 可組合性和可讀性
- 數(shù)據(jù)作為一個用豐富的運算符詞匯表操縱的流程
- 在您訂閱之前沒有任何事情發(fā)生
- 背壓或消費者向生產(chǎn)者發(fā)出信號表明排放率過高的能力
- 高級但高價值的抽象,與并發(fā)無關(guān)
可組合性和可讀性
通過可組合性,我們指的是編排多個異步任務(wù)的能力,使用先前任務(wù)的結(jié)果將輸入提供給后續(xù)任務(wù)或以fork-join方式執(zhí)行多個任務(wù),以及將異步任務(wù)重用為更高級別系統(tǒng)中的分立組件。
編排任務(wù)的能力與代碼的可讀性和可維護(hù)性緊密相關(guān)。隨著異步過程層數(shù)量和復(fù)雜性的增加,能夠編寫和讀取代碼變得越來越困難。正如我們所看到的,回調(diào)模型很簡單,但其主要缺點之一是,對于復(fù)雜的進(jìn)程,您需要從回調(diào)執(zhí)行回調(diào),本身嵌套在另一個回調(diào)中,依此類推。那個混亂被稱為Callback Hell。正如你可以猜到的(或者從經(jīng)驗中得知),這樣的代碼很難回歸并推理。
Reactor提供了豐富的組合選項,其中代碼反映了抽象過程的組織,并且所有內(nèi)容通常都保持在同一級別(嵌套最小化)。
類比裝配線工作流程
您可以將響應(yīng)式應(yīng)用程序處理的數(shù)據(jù)視為在裝配線中移動。反應(yīng)器既是傳送帶又是工作站。原材料從原料(原始Publisher)中倒出,最終成為成品,準(zhǔn)備推送給消費者(或Subscriber)。
原材料可以經(jīng)歷各種轉(zhuǎn)換和其他中間步驟,或者是將中間件聚集在一起的較大裝配線的一部分。如果在某一點出現(xiàn)毛刺或堵塞(也許裝箱產(chǎn)品需要不成比例的長時間),受影響的工作站可向上游發(fā)出信號以限制原材料的流動。
操作符(運算符)
在Reactor中,運算符是我們的匯編類比中的工作站。每個操作符都將行為添加到a Publisher并將上一步驟包裝Publisher到新實例中。因此,整個鏈被鏈接,使得數(shù)據(jù)源自第一Publisher鏈并且向下移動鏈,由每個鏈轉(zhuǎn)換。最終,Subscriber完成了整個過程。請記住,在Subscriber訂閱a 之前沒有任何事情發(fā)生Publisher,下面就會提到。
了解操作員創(chuàng)建新實例可以幫助您避免一個常見錯誤,該錯誤會導(dǎo)致您認(rèn)為您的鏈中使用的操作員未被應(yīng)用??吹竭@個項目的常見問題。
雖然Reactive Streams規(guī)范根本沒有指定運算符,但Reactor等反應(yīng)庫的最佳附加值之一是它們提供的豐富的運算符。這些涉及很多方面,從簡單的轉(zhuǎn)換和過濾到復(fù)雜的編排和錯誤處理。
在你訂閱之前什么都不會發(fā)生
在Reactor中,當(dāng)您編寫Publisher鏈時,默認(rèn)情況下數(shù)據(jù)不會啟動。相反,您可以創(chuàng)建異步過程的抽象描述(這可以幫助重用和組合)。
通過訂閱行為,您將Publishera 綁定到a Subscriber,從而觸發(fā)整個鏈中的數(shù)據(jù)流。這是通過上游傳播的單個request 信號在內(nèi)部實現(xiàn)的Subscriber,一直傳回源 Publisher。
背壓
上游傳播信號也用于實現(xiàn)背壓,我們在裝配線中將其描述為當(dāng)工作站比上游工作站處理速度慢時向線路發(fā)送的反饋信號。
Reactive Streams規(guī)范定義的真實機(jī)制非常接近于類比:訂閱者可以在無限制模式下工作,讓源以最快的速度推送所有數(shù)據(jù),或者可以使用該request機(jī)制向源發(fā)送信號表明它已準(zhǔn)備就緒處理最多的n元素。
中間操作員也可以在途中更改請求。想象一個buffer 運算符,它將元素分組為10個。如果訂閱者請求1個緩沖區(qū),則源可以生成10個元素。一些操作員還實施 預(yù)取策略,這避免了request(1)往返,并且如果在請求之前生成元素并不太昂貴,則是有益的。
這將推模型轉(zhuǎn)換為推拉式混合動力,如果它們隨時可用,下游可以從上游拉出n個元素。但是如果元素沒有準(zhǔn)備好,它們就會在生成時被上游推動。
熱與冷
在反應(yīng)庫的Rx家族中,人們可以區(qū)分兩大類反應(yīng)序列:熱和冷。這種區(qū)別主要與反應(yīng)流如何對訂閱的用戶做出反應(yīng)有關(guān):
冷序列的含義是不論訂閱者在何時訂閱該序列,總是能收到序列中產(chǎn)生的全部消息。
而與之對應(yīng)的熱序列,則是在持續(xù)不斷地產(chǎn)生消息,訂閱者只能獲取到在其訂閱之后產(chǎn)生的消息。
以上就是Reactive反應(yīng)式編程及使用介紹的詳細(xì)內(nèi)容,更多關(guān)于Reactive反應(yīng)式編程使用的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java實現(xiàn)上傳圖片尺寸修改和質(zhì)量壓縮
這篇文章主要為大家詳細(xì)介紹了java實現(xiàn)上傳圖片尺寸修改和質(zhì)量壓縮,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-04-04Spring Boot中使用Activiti的方法教程(二)
工作流(Workflow),就是“業(yè)務(wù)過程的部分或整體在計算機(jī)應(yīng)用環(huán)境下的自動化”,下面這篇文章主要給大家介紹了關(guān)于Spring Boot中使用Activiti的相關(guān)資料,需要的朋友可以參考下2018-08-08springBoot Junit測試用例出現(xiàn)@Autowired不生效的解決
這篇文章主要介紹了springBoot Junit測試用例出現(xiàn)@Autowired不生效的解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09詳解Javaweb狀態(tài)管理的Session和Cookie
這篇文章主要介紹了Javaweb狀態(tài)管理的Session和Cookie,將瀏覽器與web服務(wù)器之間多次交互當(dāng)做一個整體來處理,并且多次交互所涉及的數(shù)據(jù)(狀態(tài))保存下來,需要的朋友可以參考下2023-05-05SpringCloud Gateway使用redis實現(xiàn)動態(tài)路由的方法
這篇文章主要介紹了SpringCloud Gateway使用redis實現(xiàn)動態(tài)路由的方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01mybatis使用foreach語句實現(xiàn)IN查詢(三種)
這篇文章主要介紹了mybatis使用foreach語句實現(xiàn)IN查詢(三種),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12