java響應(yīng)式編程之Reactor使用示例解析
前言
響應(yīng)式編程在java編程中并不常見,可能比較近的接觸也就是spring-gateway中晦澀難懂的響應(yīng)式代碼,一直處于半懵逼,參考了很多文章說響應(yīng)式是一種未來趨勢,所以還是有必要研究一下
因此今天開始研究學(xué)習(xí)響應(yīng)式編程系列,計劃的學(xué)習(xí)路線:Reactor使用>Reactor源碼研究>WebFlux>Gateway
本文主要記錄響應(yīng)式編程的意義及Reactor框架的使用
響應(yīng)式
首先什么是響應(yīng)式?說白了就是異步獲取結(jié)果,這個概念可以用下面例子描述一下同步和異步
你去飯店點(diǎn)餐,點(diǎn)完餐坐在座位上等,菜做好開始吃,這就是同步
你去飯店點(diǎn)餐,點(diǎn)完餐出去逛街,菜做好了服務(wù)員給你打電話,你回來開始吃,這就是異步
關(guān)鍵就在于,你所需要的結(jié)果由服務(wù)方準(zhǔn)備好后主動通知你,再此之前你可以做其它事,你得到通知后開始進(jìn)行響應(yīng),這就是響應(yīng)式
回到程序,通常,我們在讀取數(shù)據(jù)庫或者做網(wǎng)絡(luò)請求時,都是同步阻塞執(zhí)行的,此時操作線程會一直等待著結(jié)果返回(運(yùn)行狀態(tài)中),如果請求時間很長,線程一直占用著CPU的資源,造成資源浪費(fèi),這種編程方式就是 “imperative”(迫切的)
但如果寫過js,就發(fā)現(xiàn)前端的js網(wǎng)絡(luò)請求就完全就是響應(yīng)式的編程寫法,這里并不是說java很落后,主要還是實(shí)際場景問題限制,后續(xù)會詳說
異步編程
在jvm中,我們?nèi)绾巫龅疆惒骄幊虆?,java提供了兩種異步編程的方式
- Callback 通過使用一個callback方法作為參數(shù),當(dāng)獲得結(jié)果后服務(wù)方通過調(diào)用callback方法
- Future 通過java中的Future異步獲取執(zhí)行結(jié)果
這兩種方式都可以使用,但都有一定的局限性
callback
首先是callback的使用,可讀性非常差,當(dāng)多個callback嵌套時,程序就會非常亂,出現(xiàn)傳說中的“回調(diào)地域”
比如我們舉個類似官方文檔中的例子:Example 1
一個很常見的場景,獲取某用戶的喜歡欄目,并截取前兩個,涉及到兩個服務(wù): UserService(根據(jù)用戶id獲取喜歡欄目ids),F(xiàn)avoriteService(根據(jù)欄目id獲取欄目詳情)
我們使用callback方式來異步獲取兩種數(shù)據(jù),首先定義callback接口
public interface Callback<T> { void onSuccess(T t); void onError(String error); }
然后是UserService,根據(jù)用戶id獲取喜歡欄目ids,用sleep來模擬讀取時間
public class CallbackUserService { public void getFavorites(Long userId, Callback<List<Long>> callback) { new Thread(() -> { try { // 模擬數(shù)據(jù)庫訪問時間 Thread.sleep(1000); List<Long> favorites = Arrays.asList(1L,2L,3L); callback.onSuccess(favorites); } catch (Exception e) { callback.onError("讀取出現(xiàn)錯誤"); } }).start(); } }
然后是FavoriteService,根據(jù)欄目id獲取欄目詳情
public class CallbackFavoriteService { private Map<Long, String> names = new HashMap<Long, String>() {{ put(1L, "football"); put(2L, "movie"); put(3L, "film"); }}; public void getDetail(Long id, Callback<String> callback) { new Thread(() -> { try { // 模擬數(shù)據(jù)庫訪問時間 Thread.sleep(1000); callback.onSuccess(names.get(id)); } catch (Exception e) { callback.onError("讀取出現(xiàn)錯誤"); } }).start(); } }
這時開始寫我們的主代碼,如下
userService.getFavorites(23L, new Callback<List<Long>>() { @Override public void onSuccess(List<Long> favIds) { favIds.stream().limit(2).forEach(favId->{ favoriteService.getDetail(favId, new Callback<String>() { @Override public void onSuccess(String s) { System.out.println(s); } @Override public void onError(String error) { System.out.println(error); } }); }); } @Override public void onError(String error) { System.out.println(error); } }); while (true) { System.out.println("做點(diǎn)其它事。。。"); Thread.sleep(1000); }
最終代碼輸出如下:
做點(diǎn)其它事。。。
做點(diǎn)其它事。。。
做點(diǎn)其它事。。。
movie
football
做點(diǎn)其它事。。。
可以看到在等待的過程中,主線程并沒有阻塞,實(shí)現(xiàn)了異步編程,但可以看到代碼是十分混亂的,如果再加上一個需求:“用戶沒有喜歡欄目時通過建議服務(wù)獲取建議欄目”,可以想象對應(yīng)的代碼會有多么混亂不堪
Future
Future的弱點(diǎn)更明顯了,在執(zhí)行g(shù)et()方法獲取結(jié)果時依然會阻塞線程
雖然jdk8中出現(xiàn)了CompletableFuture
可以真正實(shí)現(xiàn)異步編程但是使用起來也是非常麻煩
Reactive Stream
由于JVM本身的響應(yīng)式編程支持的缺失,針對JVM響應(yīng)式編程的擴(kuò)展庫開始陸續(xù)出現(xiàn),首先是RxJava庫的出現(xiàn)擴(kuò)展了JVM的響應(yīng)式編程,而隨著時間的推移一個響應(yīng)式規(guī)范誕生了,即 Reactive Stream,它為 JVM 上的響應(yīng)式編程定義了一組接口和交互規(guī)則,RxJava從 RxJava2 開始實(shí)現(xiàn) Reactive Stream規(guī)范。同時 MongoDB、Reactor、Slick 等也相繼實(shí)現(xiàn)了 Reactive Stream 規(guī)范
Reactive Stream規(guī)范所定義的一系列接口也被集成在java 9的Flow包下
Reactor
本文主要介紹相對較火的Reactor,它在滿足響應(yīng)式編程的同時讓代碼變的可讀性可維護(hù)及可維護(hù)性非常高
首先思想上,Reactor是一個發(fā)布訂閱的模式,由服務(wù)方發(fā)布數(shù)據(jù),訂閱者獲取通知進(jìn)行相關(guān)響應(yīng),服務(wù)方也可以不停的發(fā)布數(shù)據(jù),形成動態(tài)數(shù)據(jù)流
而在這個數(shù)據(jù)流動的中間過程Reactor提供了一系列的中間處理運(yùn)算符:比如map,take,flatMap等對數(shù)據(jù)進(jìn)行中間處理
異步編程的關(guān)鍵在于我們要在數(shù)據(jù)返回前就知道數(shù)據(jù)的格式,就比如我們寫js對接接口時,一定是提前知道了返回數(shù)據(jù)的形式才能寫出代碼
Reactor有兩種數(shù)據(jù)形式,分別用Flux和Mono表示,如果是Flux代表將來發(fā)送的是多個數(shù)據(jù),如果是Mono代表將來放回的是1個數(shù)據(jù)(也有可能是0)
Reactor的定義還是非常抽象,我們還是拿Example 1
,如果我們使用Reactor,這段代碼該如何改造?
第一步,引入Reactor框架
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.3.8.RELEASE</version> </dependency>
先改造我們的UserService
public class ReactorUserService { // 返回的是多個,即Flux public Flux<Long> getFavorites(Long userId) { // 創(chuàng)建Flux并返回 return Flux.create(sink -> { new Thread(() -> { // 模擬數(shù)據(jù)庫訪問時間 try { Thread.sleep(1000); // 發(fā)布數(shù)據(jù) 1,2,3 sink.next(1L); sink.next(2L); sink.next(3L); // 標(biāo)識發(fā)布結(jié)束 sink.complete(); } catch (Exception e) { e.printStackTrace(); sink.error(new Exception("讀取出現(xiàn)錯誤")); } }).start(); }); } }
然后是FavoriteService
public class ReactorFavoriteService { private Map<Long, String> names = new HashMap<Long, String>() {{ put(1L, "football"); put(2L, "movie"); put(3L, "film"); }}; // 返回單個數(shù)據(jù),所以是Mono public Mono<String> getDetail(Long id) { return Mono.create(sink -> { new Thread(() -> { // 模擬數(shù)據(jù)庫訪問時間 try { Thread.sleep(1000); // 發(fā)布并完成 sink.success(names.get(id)); } catch (Exception e) { e.printStackTrace(); sink.error(new Exception("讀取出現(xiàn)錯誤")); } }).start(); }); } }
開始定義操作流
userService.getFavorites(23L) .flatMap(id->favoriteService.getDetail(id)) // 取詳情 .take(2) // 取前兩個 .subscribe(System.out::println, error-> System.out.println("process error:"+error)); // 訂閱,處理方式sout while (true) { System.out.println("做點(diǎn)其它事。。。"); Thread.sleep(1000); }
最終輸出:
做點(diǎn)其它事。。。
做點(diǎn)其它事。。。
做點(diǎn)其它事。。。
film
football
做點(diǎn)其它事。。。
再回頭看一下callback的代碼,會不會覺得Reactor真香?有沒有感覺像在寫js...
Redis異步讀取
回頭細(xì)品一下上面的代碼,確實(shí)做到了非阻塞獲取數(shù)據(jù),并在數(shù)據(jù)獲取到時做出相應(yīng)
但以上的模擬代碼肯定是不合理的,雖然主線程沒阻塞,但新開了一個線程去阻塞等待結(jié)果,很顯然是脫了褲子放屁的事
上面我說過,響應(yīng)式可行的關(guān)鍵是:你所需要的結(jié)果由服務(wù)方準(zhǔn)備好后主動通知你,所以從本質(zhì)上來說,我們以上的例子最根源的發(fā)布者是數(shù)據(jù)庫,如果我們給數(shù)據(jù)庫發(fā)送請求,數(shù)據(jù)庫準(zhǔn)備好數(shù)據(jù)后主動通知我們,我們再去響應(yīng),這才是徹底擁有了響應(yīng)式的價值~即節(jié)省資源
但~很顯然我們常用的數(shù)據(jù)庫大多數(shù)現(xiàn)階段不會給我們提供這種服務(wù),但也有特例,比如redis和mongodb就可以異步獲取數(shù)據(jù),那如果再結(jié)合我們的Reactor框架,才能真正做到響應(yīng)式編程
下面就以redis為例,看看使用Reactor如何做到響應(yīng)式讀取redis數(shù)據(jù)
lettuce
這里介紹一個redis的客戶端:lettuce,相比于Jedis這種老牌客戶端,lettuce基于netty技術(shù)可以實(shí)現(xiàn)異步讀取redis數(shù)據(jù),lettuce更加先進(jìn),即便spring-redis的底層也從Jedis變成了lettuce
我們試著使用使用Reactor+Lettuce寫一個SuggestionService(異步獲取redis中存儲的推薦欄目)
public class ReactorRedisSuggestionService { private RedisURI redisUri = RedisURI.builder().withHost("127.0.0.1").withPort(6379).build(); public Mono<String> getSuggestions() { return Mono.create(sink -> { RedisClient redisClient = RedisClient.create(redisUri); // 客戶端 StatefulRedisConnection<String, String> connection = redisClient.connect(); // 連接 RedisAsyncCommands<String, String> asyncCommands = connection.async(); // 異步指令 asyncCommands.get("favorites").thenAccept(favorites->{// 異步獲取,key為favorites sink.success(favorites); // 返回數(shù)據(jù)后推送給mono connection.close(); // 關(guān)閉連接 redisClient.shutdown(); }); }); } }
此時我們流程代碼如下
suggestionService.getSuggestions() .subscribe(System.out::println, error-> System.out.println("process error:"+error));
整個程序執(zhí)行過程是這樣的,主線程向redis發(fā)起讀取數(shù)據(jù)請求,redis準(zhǔn)備返回數(shù)據(jù)后交給lettuce的響應(yīng)線程池中的子線程,子線程根據(jù)訂閱的處理將結(jié)果輸出
整個過程沒有任何阻塞,也沒有一點(diǎn)資源的浪費(fèi),是真真正正的響應(yīng)式編程
實(shí)際上lettuce內(nèi)部也集成了reactor框架,所以SuggestionService可以直接簡化成這樣
public Mono<String> getSuggestions() { RedisReactiveCommands<String, String> reactiveCommands = connection.reactive(); // 響應(yīng)式指令 return reactiveCommands.get("favorites"); // key為favorites,返回的就是Mono }
真的很方便的說
壓測
接下來做一個小的壓力測試,分別用同步和異步的方式獲取redis數(shù)據(jù),使用一個固定大小的線程池模擬處理請求的線程池,看看在同步和異步兩種方式下這些線程池多久能從獲取任務(wù)中釋放出來干別的事情
首先是同步代碼
@Test public void sync() { Executor pool = Executors.newFixedThreadPool(10); // 請求處理線程 10個 RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection<String, String> connection = redisClient.connect(); long startTime = System.currentTimeMillis(); for (int i=0;i<30;i++) { // 模擬30個請求 pool.execute(()->{ connection.sync().get("favorites"); // 同步獲取 }); } pool.execute(()->{ // 執(zhí)行到這里代表線程池的線程都釋放出來了,可以做其它事情了,記錄一下時間 long endTIme = System.currentTimeMillis(); System.out.println("free:" + (endTIme-startTime)); }); for(;;); }
再看看異步代碼
@Test public void async() { Executor pool = Executors.newFixedThreadPool(5); // 請求處理線程 5個 ClientResources res = DefaultClientResources.builder().ioThreadPoolSize(5).build(); // 回調(diào)處理線程5個 RedisClient redisClient = RedisClient.create(res, redisUri); StatefulRedisConnection<String, String> connection = redisClient.connect(); long startTime = System.currentTimeMillis(); for (int i=0;i<30;i++) { // 模擬30個請求 pool.execute(()-> { RedisReactiveCommands<String, String> reactiveCommands = connection.reactive(); reactiveCommands.get("favorites").subscribe(); // 異步獲取 }); } // 執(zhí)行到這里代表線程池的線程都釋放出來了,可以做其它事情了,記錄一下時間 pool.execute(()->{ long endTIme = System.currentTimeMillis(); System.out.println("free:" + (endTIme-startTime)); }); for(;;); }
這里為什么處理模擬請求的線程變成5個了吶,因為lettuce的異步處理回調(diào)還占用了5個,這樣兩種方式實(shí)際工作的線程數(shù)都是10個,比較公平(實(shí)際上異步代碼還是吃點(diǎn)虧,因為回調(diào)處理的線程不能參與請求)
運(yùn)行一下結(jié)果,差別很大:
- 同步輸出:
free 3998
接近3秒 - 異步輸出:
free 11
才11毫秒
這種差距數(shù)據(jù)量越大、帶寬越低越明顯,異步基本沒有變化,而同步越來越大
注意:異步加快數(shù)據(jù)讀取速度,而是在等待數(shù)據(jù)過程中釋放了資源,讓CPU可以繼續(xù)干其他的事,增加系統(tǒng)吞吐量
使用
以上討論了傳統(tǒng)編程方式資源的浪費(fèi),以及響應(yīng)式的種種好處,但為什么這東西還沒大火以致徹底顛覆我們的編程方式吶,個人認(rèn)為主要是以下幾點(diǎn)(針對web服務(wù)端開發(fā))
- 響應(yīng)式的關(guān)鍵在于服務(wù)方的主動通知,最底層需要依靠NIO技術(shù),而我們數(shù)據(jù)庫大多是不支持的
- 大多數(shù)情況下我們網(wǎng)絡(luò)請求讀取數(shù)據(jù)是比較快的,異步鎖導(dǎo)致的線程切換反而更加浪費(fèi)時間,就比如說上面的例子,你去餐廳點(diǎn)餐,如果餐廳做飯很快,你出去溜達(dá)等通知反而更浪費(fèi)時間
- 我們一般寫接口返回數(shù)據(jù)給前端展示,異步操作獲取數(shù)據(jù)后的處理都是其他線程完成的,問題在于其他線程如何獲取到當(dāng)前連接并寫回數(shù)據(jù)到http響應(yīng),這處理起來還是很麻煩的(也是WebFlux要解決的問題)
- 傳統(tǒng)阻塞代碼更易讀且易調(diào)試
- 轉(zhuǎn)換為非阻塞學(xué)習(xí)曲線陡峭,寫出來的代碼甚至不像是java語言...,有點(diǎn)類似node.js
因此我們傳統(tǒng)的web服務(wù)端場景并不適合使用響應(yīng)式編程,甚至在spring的WebFlux官網(wǎng)也不建議大家切換響應(yīng)式編程,只有明確知道這樣做可帶來的性能提升時才會考慮使用它,比如你有一個微服務(wù)專門從redis或mongo這樣的數(shù)據(jù)庫讀寫很大數(shù)據(jù)緩存,就可以考慮使用它來減少資源浪費(fèi),再比如IO密集的網(wǎng)關(guān)服務(wù),使用它就可以增加網(wǎng)關(guān)的吞吐量
以上就是java響應(yīng)式編程之Reactor使用示例解析的詳細(xì)內(nèi)容,更多關(guān)于java響應(yīng)式編程Reactor的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot?vue前后端接口測試樹結(jié)點(diǎn)添加功能
這篇文章主要為大家介紹了springboot?vue前后端接口測試樹結(jié)點(diǎn)添加功能,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05Java Jedis NOAUTH Authentication required問題解決方法
這篇文章主要介紹了Java Jedis NOAUTH Authentication required問題解決方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-07-07Java使用Kaptcha實(shí)現(xiàn)簡單的驗證碼生成器
這篇文章主要為大家詳細(xì)介紹了Java如何使用Kaptcha實(shí)現(xiàn)簡單的驗證碼生成器,文中的示例代碼講解詳細(xì),具有一定的借鑒價值,有需要的小伙伴可以參考下2024-02-02Java實(shí)現(xiàn)的圖像查看器完整實(shí)例
這篇文章主要介紹了Java實(shí)現(xiàn)的圖像查看器,以完整實(shí)例形式較為詳細(xì)的分析了java處理圖片的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-10-10解決java讀取EXCEL數(shù)據(jù)變成科學(xué)計數(shù)法的問題
這篇文章主要介紹了解決java讀取EXCEL數(shù)據(jù)變成科學(xué)計數(shù)法的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04springcloud使用Hystrix進(jìn)行微服務(wù)降級管理
這篇文章主要介紹了springcloud使用Hystrix進(jìn)行微服務(wù)降級管理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04