欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java響應(yīng)式編程之Reactor使用示例解析

 更新時間:2023年07月13日 11:27:10   作者:pq217  
這篇文章主要為大家介紹了java響應(yīng)式編程之Reactor使用示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

響應(yīng)式編程在java編程中并不常見,可能比較近的接觸也就是spring-gateway中晦澀難懂的響應(yīng)式代碼,一直處于半懵逼,參考了很多文章說響應(yīng)式是一種未來趨勢,所以還是有必要研究一下

因此今天開始研究學(xué)習(xí)響應(yīng)式編程系列,計劃的學(xué)習(xí)路線:Reactor使用>Reactor源碼研究>WebFlux>Gateway

本文主要記錄響應(yīng)式編程的意義及Reactor框架的使用

響應(yīng)式

首先什么是響應(yīng)式?說白了就是異步獲取結(jié)果,這個概念可以用下面例子描述一下同步和異步

你去飯店點(diǎn)餐,點(diǎn)完餐坐在座位上等,菜做好開始吃,這就是同步

你去飯店點(diǎn)餐,點(diǎn)完餐出去逛街,菜做好了服務(wù)員給你打電話,你回來開始吃,這就是異步

關(guān)鍵就在于,你所需要的結(jié)果由服務(wù)方準(zhǔn)備好后主動通知你,再此之前你可以做其它事,你得到通知后開始進(jìn)行響應(yīng),這就是響應(yīng)式

回到程序,通常,我們在讀取數(shù)據(jù)庫或者做網(wǎng)絡(luò)請求時,都是同步阻塞執(zhí)行的,此時操作線程會一直等待著結(jié)果返回(運(yùn)行狀態(tài)中),如果請求時間很長,線程一直占用著CPU的資源,造成資源浪費(fèi),這種編程方式就是 “imperative”(迫切的)

但如果寫過js,就發(fā)現(xiàn)前端的js網(wǎng)絡(luò)請求就完全就是響應(yīng)式的編程寫法,這里并不是說java很落后,主要還是實(shí)際場景問題限制,后續(xù)會詳說

異步編程

在jvm中,我們?nèi)绾巫龅疆惒骄幊虆?,java提供了兩種異步編程的方式

  • Callback 通過使用一個callback方法作為參數(shù),當(dāng)獲得結(jié)果后服務(wù)方通過調(diào)用callback方法
  • Future 通過java中的Future異步獲取執(zhí)行結(jié)果

這兩種方式都可以使用,但都有一定的局限性

callback

首先是callback的使用,可讀性非常差,當(dāng)多個callback嵌套時,程序就會非常亂,出現(xiàn)傳說中的“回調(diào)地域”

比如我們舉個類似官方文檔中的例子:
Example 1 一個很常見的場景,獲取某用戶的喜歡欄目,并截取前兩個,涉及到兩個服務(wù): UserService(根據(jù)用戶id獲取喜歡欄目ids),F(xiàn)avoriteService(根據(jù)欄目id獲取欄目詳情)

我們使用callback方式來異步獲取兩種數(shù)據(jù),首先定義callback接口

public interface Callback<T> {
    void onSuccess(T t);
    void onError(String error);
}

然后是UserService,根據(jù)用戶id獲取喜歡欄目ids,用sleep來模擬讀取時間

public class CallbackUserService {
    public void getFavorites(Long userId, Callback<List<Long>> callback) {
        new Thread(() -> {
            try {
                // 模擬數(shù)據(jù)庫訪問時間
                Thread.sleep(1000);
                List<Long> favorites = Arrays.asList(1L,2L,3L);
                callback.onSuccess(favorites);
            } catch (Exception e) {
                callback.onError("讀取出現(xiàn)錯誤");
            }
        }).start();
    }
}

然后是FavoriteService,根據(jù)欄目id獲取欄目詳情

public class CallbackFavoriteService {
    private Map<Long, String> names = new HashMap<Long, String>() {{
        put(1L, "football");
        put(2L, "movie");
        put(3L, "film");
    }};
    public void getDetail(Long id, Callback<String> callback) {
        new Thread(() -> {
            try {
                // 模擬數(shù)據(jù)庫訪問時間
                Thread.sleep(1000);
                callback.onSuccess(names.get(id));
            } catch (Exception e) {
                callback.onError("讀取出現(xiàn)錯誤");
            }
        }).start();
    }
}

這時開始寫我們的主代碼,如下

userService.getFavorites(23L, new Callback<List<Long>>() {
    @Override
    public void onSuccess(List<Long> favIds) {
        favIds.stream().limit(2).forEach(favId->{
            favoriteService.getDetail(favId, new Callback<String>() {
                @Override
                public void onSuccess(String s) {
                    System.out.println(s);
                }
                @Override
                public void onError(String error) {
                    System.out.println(error);
                }
            });
        });
    }
    @Override
    public void onError(String error) {
        System.out.println(error);
    }
});
while (true) {
    System.out.println("做點(diǎn)其它事。。。");
    Thread.sleep(1000);
}

最終代碼輸出如下:

做點(diǎn)其它事。。。
做點(diǎn)其它事。。。
做點(diǎn)其它事。。。
movie
football
做點(diǎn)其它事。。。

可以看到在等待的過程中,主線程并沒有阻塞,實(shí)現(xiàn)了異步編程,但可以看到代碼是十分混亂的,如果再加上一個需求:“用戶沒有喜歡欄目時通過建議服務(wù)獲取建議欄目”,可以想象對應(yīng)的代碼會有多么混亂不堪

Future

Future的弱點(diǎn)更明顯了,在執(zhí)行g(shù)et()方法獲取結(jié)果時依然會阻塞線程

雖然jdk8中出現(xiàn)了CompletableFuture可以真正實(shí)現(xiàn)異步編程但是使用起來也是非常麻煩

Reactive Stream

由于JVM本身的響應(yīng)式編程支持的缺失,針對JVM響應(yīng)式編程的擴(kuò)展庫開始陸續(xù)出現(xiàn),首先是RxJava庫的出現(xiàn)擴(kuò)展了JVM的響應(yīng)式編程,而隨著時間的推移一個響應(yīng)式規(guī)范誕生了,即 Reactive Stream,它為 JVM 上的響應(yīng)式編程定義了一組接口和交互規(guī)則,RxJava從 RxJava2 開始實(shí)現(xiàn) Reactive Stream規(guī)范。同時 MongoDB、Reactor、Slick 等也相繼實(shí)現(xiàn)了 Reactive Stream 規(guī)范

Reactive Stream規(guī)范所定義的一系列接口也被集成在java 9的Flow包下

Reactor

本文主要介紹相對較火的Reactor,它在滿足響應(yīng)式編程的同時讓代碼變的可讀性可維護(hù)及可維護(hù)性非常高

首先思想上,Reactor是一個發(fā)布訂閱的模式,由服務(wù)方發(fā)布數(shù)據(jù),訂閱者獲取通知進(jìn)行相關(guān)響應(yīng),服務(wù)方也可以不停的發(fā)布數(shù)據(jù),形成動態(tài)數(shù)據(jù)流

而在這個數(shù)據(jù)流動的中間過程Reactor提供了一系列的中間處理運(yùn)算符:比如map,take,flatMap等對數(shù)據(jù)進(jìn)行中間處理

異步編程的關(guān)鍵在于我們要在數(shù)據(jù)返回前就知道數(shù)據(jù)的格式,就比如我們寫js對接接口時,一定是提前知道了返回數(shù)據(jù)的形式才能寫出代碼

Reactor有兩種數(shù)據(jù)形式,分別用Flux和Mono表示,如果是Flux代表將來發(fā)送的是多個數(shù)據(jù),如果是Mono代表將來放回的是1個數(shù)據(jù)(也有可能是0)

Reactor的定義還是非常抽象,我們還是拿Example 1,如果我們使用Reactor,這段代碼該如何改造?

第一步,引入Reactor框架

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.8.RELEASE</version>
</dependency>

先改造我們的UserService

public class ReactorUserService {
    // 返回的是多個,即Flux
    public Flux<Long> getFavorites(Long userId) {
        // 創(chuàng)建Flux并返回
        return Flux.create(sink -> {
            new Thread(() -> {
                // 模擬數(shù)據(jù)庫訪問時間
                try {
                    Thread.sleep(1000);
                    // 發(fā)布數(shù)據(jù) 1,2,3
                    sink.next(1L);
                    sink.next(2L);
                    sink.next(3L);
                    // 標(biāo)識發(fā)布結(jié)束
                    sink.complete();
                } catch (Exception e) {
                    e.printStackTrace();
                    sink.error(new Exception("讀取出現(xiàn)錯誤"));
                }
            }).start();
        });
    }
}

然后是FavoriteService

public class ReactorFavoriteService {
    private Map<Long, String> names = new HashMap<Long, String>() {{
        put(1L, "football");
        put(2L, "movie");
        put(3L, "film");
    }};
    // 返回單個數(shù)據(jù),所以是Mono
    public Mono<String> getDetail(Long id) {
        return Mono.create(sink -> {
            new Thread(() -> {
                // 模擬數(shù)據(jù)庫訪問時間
                try {
                    Thread.sleep(1000);
                    // 發(fā)布并完成
                    sink.success(names.get(id));
                } catch (Exception e) {
                    e.printStackTrace();
                    sink.error(new Exception("讀取出現(xiàn)錯誤"));
                }
            }).start();
        });
    }
}

開始定義操作流

userService.getFavorites(23L)
        .flatMap(id->favoriteService.getDetail(id)) // 取詳情
        .take(2) // 取前兩個
        .subscribe(System.out::println, error-> System.out.println("process error:"+error)); // 訂閱,處理方式sout
while (true) {
    System.out.println("做點(diǎn)其它事。。。");
    Thread.sleep(1000);
}

最終輸出:

做點(diǎn)其它事。。。
做點(diǎn)其它事。。。
做點(diǎn)其它事。。。
film
football
做點(diǎn)其它事。。。

再回頭看一下callback的代碼,會不會覺得Reactor真香?有沒有感覺像在寫js...

Redis異步讀取

回頭細(xì)品一下上面的代碼,確實(shí)做到了非阻塞獲取數(shù)據(jù),并在數(shù)據(jù)獲取到時做出相應(yīng)

但以上的模擬代碼肯定是不合理的,雖然主線程沒阻塞,但新開了一個線程去阻塞等待結(jié)果,很顯然是脫了褲子放屁的事

上面我說過,響應(yīng)式可行的關(guān)鍵是:你所需要的結(jié)果由服務(wù)方準(zhǔn)備好后主動通知你,所以從本質(zhì)上來說,我們以上的例子最根源的發(fā)布者是數(shù)據(jù)庫,如果我們給數(shù)據(jù)庫發(fā)送請求,數(shù)據(jù)庫準(zhǔn)備好數(shù)據(jù)后主動通知我們,我們再去響應(yīng),這才是徹底擁有了響應(yīng)式的價值~即節(jié)省資源

但~很顯然我們常用的數(shù)據(jù)庫大多數(shù)現(xiàn)階段不會給我們提供這種服務(wù),但也有特例,比如redis和mongodb就可以異步獲取數(shù)據(jù),那如果再結(jié)合我們的Reactor框架,才能真正做到響應(yīng)式編程

下面就以redis為例,看看使用Reactor如何做到響應(yīng)式讀取redis數(shù)據(jù)

lettuce

這里介紹一個redis的客戶端:lettuce,相比于Jedis這種老牌客戶端,lettuce基于netty技術(shù)可以實(shí)現(xiàn)異步讀取redis數(shù)據(jù),lettuce更加先進(jìn),即便spring-redis的底層也從Jedis變成了lettuce

我們試著使用使用Reactor+Lettuce寫一個SuggestionService(異步獲取redis中存儲的推薦欄目)

public class ReactorRedisSuggestionService {
    private RedisURI redisUri = RedisURI.builder().withHost("127.0.0.1").withPort(6379).build();
    public Mono<String> getSuggestions() {
        return Mono.create(sink -> {
            RedisClient redisClient = RedisClient.create(redisUri); // 客戶端
            StatefulRedisConnection<String, String> connection = redisClient.connect(); // 連接
            RedisAsyncCommands<String, String> asyncCommands = connection.async(); // 異步指令
            asyncCommands.get("favorites").thenAccept(favorites->{// 異步獲取,key為favorites
                sink.success(favorites); // 返回數(shù)據(jù)后推送給mono
                connection.close();  // 關(guān)閉連接
                redisClient.shutdown();
            });
        });
    }
}

此時我們流程代碼如下

suggestionService.getSuggestions()
                .subscribe(System.out::println, error-> System.out.println("process error:"+error));

整個程序執(zhí)行過程是這樣的,主線程向redis發(fā)起讀取數(shù)據(jù)請求,redis準(zhǔn)備返回數(shù)據(jù)后交給lettuce的響應(yīng)線程池中的子線程,子線程根據(jù)訂閱的處理將結(jié)果輸出

整個過程沒有任何阻塞,也沒有一點(diǎn)資源的浪費(fèi),是真真正正的響應(yīng)式編程

實(shí)際上lettuce內(nèi)部也集成了reactor框架,所以SuggestionService可以直接簡化成這樣

public Mono<String> getSuggestions() {
        RedisReactiveCommands<String, String> reactiveCommands = connection.reactive(); // 響應(yīng)式指令
        return reactiveCommands.get("favorites"); // key為favorites,返回的就是Mono
}

真的很方便的說

壓測

接下來做一個小的壓力測試,分別用同步和異步的方式獲取redis數(shù)據(jù),使用一個固定大小的線程池模擬處理請求的線程池,看看在同步和異步兩種方式下這些線程池多久能從獲取任務(wù)中釋放出來干別的事情

首先是同步代碼

@Test
public void sync() {
    Executor pool = Executors.newFixedThreadPool(10); // 請求處理線程 10個
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    long startTime = System.currentTimeMillis();
    for (int i=0;i<30;i++) { // 模擬30個請求
        pool.execute(()->{
            connection.sync().get("favorites"); // 同步獲取
        });
    }
    pool.execute(()->{
        // 執(zhí)行到這里代表線程池的線程都釋放出來了,可以做其它事情了,記錄一下時間
        long endTIme = System.currentTimeMillis();
        System.out.println("free:" + (endTIme-startTime));
    });
    for(;;);
}

再看看異步代碼

@Test
public void async() {
    Executor pool = Executors.newFixedThreadPool(5); // 請求處理線程 5個
    ClientResources res = DefaultClientResources.builder().ioThreadPoolSize(5).build(); // 回調(diào)處理線程5個
    RedisClient redisClient = RedisClient.create(res, redisUri);
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    long startTime = System.currentTimeMillis();
    for (int i=0;i<30;i++) { // 模擬30個請求
        pool.execute(()-> {
            RedisReactiveCommands<String, String> reactiveCommands = connection.reactive();
            reactiveCommands.get("favorites").subscribe(); // 異步獲取
        });
    }
    // 執(zhí)行到這里代表線程池的線程都釋放出來了,可以做其它事情了,記錄一下時間
    pool.execute(()->{
        long endTIme = System.currentTimeMillis();
        System.out.println("free:" + (endTIme-startTime));
    });
    for(;;);
}

這里為什么處理模擬請求的線程變成5個了吶,因為lettuce的異步處理回調(diào)還占用了5個,這樣兩種方式實(shí)際工作的線程數(shù)都是10個,比較公平(實(shí)際上異步代碼還是吃點(diǎn)虧,因為回調(diào)處理的線程不能參與請求)

運(yùn)行一下結(jié)果,差別很大:

  • 同步輸出:free 3998 接近3秒
  • 異步輸出:free 11 才11毫秒

這種差距數(shù)據(jù)量越大、帶寬越低越明顯,異步基本沒有變化,而同步越來越大

注意:異步加快數(shù)據(jù)讀取速度,而是在等待數(shù)據(jù)過程中釋放了資源,讓CPU可以繼續(xù)干其他的事,增加系統(tǒng)吞吐量

使用

以上討論了傳統(tǒng)編程方式資源的浪費(fèi),以及響應(yīng)式的種種好處,但為什么這東西還沒大火以致徹底顛覆我們的編程方式吶,個人認(rèn)為主要是以下幾點(diǎn)(針對web服務(wù)端開發(fā))

  • 響應(yīng)式的關(guān)鍵在于服務(wù)方的主動通知,最底層需要依靠NIO技術(shù),而我們數(shù)據(jù)庫大多是不支持的
  • 大多數(shù)情況下我們網(wǎng)絡(luò)請求讀取數(shù)據(jù)是比較快的,異步鎖導(dǎo)致的線程切換反而更加浪費(fèi)時間,就比如說上面的例子,你去餐廳點(diǎn)餐,如果餐廳做飯很快,你出去溜達(dá)等通知反而更浪費(fèi)時間
  • 我們一般寫接口返回數(shù)據(jù)給前端展示,異步操作獲取數(shù)據(jù)后的處理都是其他線程完成的,問題在于其他線程如何獲取到當(dāng)前連接并寫回數(shù)據(jù)到http響應(yīng),這處理起來還是很麻煩的(也是WebFlux要解決的問題)
  • 傳統(tǒng)阻塞代碼更易讀且易調(diào)試
  • 轉(zhuǎn)換為非阻塞學(xué)習(xí)曲線陡峭,寫出來的代碼甚至不像是java語言...,有點(diǎn)類似node.js

因此我們傳統(tǒng)的web服務(wù)端場景并不適合使用響應(yīng)式編程,甚至在spring的WebFlux官網(wǎng)也不建議大家切換響應(yīng)式編程,只有明確知道這樣做可帶來的性能提升時才會考慮使用它,比如你有一個微服務(wù)專門從redis或mongo這樣的數(shù)據(jù)庫讀寫很大數(shù)據(jù)緩存,就可以考慮使用它來減少資源浪費(fèi),再比如IO密集的網(wǎng)關(guān)服務(wù),使用它就可以增加網(wǎng)關(guān)的吞吐量

以上就是java響應(yīng)式編程之Reactor使用示例解析的詳細(xì)內(nèi)容,更多關(guān)于java響應(yīng)式編程Reactor的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • springboot?vue前后端接口測試樹結(jié)點(diǎn)添加功能

    springboot?vue前后端接口測試樹結(jié)點(diǎn)添加功能

    這篇文章主要為大家介紹了springboot?vue前后端接口測試樹結(jié)點(diǎn)添加功能,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • Java 反射機(jī)制詳解及實(shí)例

    Java 反射機(jī)制詳解及實(shí)例

    這篇文章主要介紹了Java 反射機(jī)制詳解及實(shí)例的相關(guān)資料,需要的朋友可以參考下
    2017-03-03
  • Java Jedis NOAUTH Authentication required問題解決方法

    Java Jedis NOAUTH Authentication required問題解決方法

    這篇文章主要介紹了Java Jedis NOAUTH Authentication required問題解決方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-07-07
  • Java8 Lambda和Invokedynamic詳情

    Java8 Lambda和Invokedynamic詳情

    關(guān)于Java8的Lambda 我們可以將lambda表達(dá)式與新的Streams API結(jié)合起來,以表達(dá)豐富的數(shù)據(jù)處理查詢,下面文章小編就列舉簡單的例子給大家介說吧,感興趣的小伙伴可以參考下面文章的具體內(nèi)容奧
    2021-09-09
  • Spring中的依賴注入DI源碼詳細(xì)解析

    Spring中的依賴注入DI源碼詳細(xì)解析

    這篇文章主要介紹了Spring中的依賴注入DI源碼詳細(xì)解析,Spring的依賴注入(Dependency Injection,DI)是Spring框架核心的一部分,它是實(shí)現(xiàn)控制反轉(zhuǎn)(Inversion of Control,IoC)的一種方式,需要的朋友可以參考下
    2023-11-11
  • Java使用Kaptcha實(shí)現(xiàn)簡單的驗證碼生成器

    Java使用Kaptcha實(shí)現(xiàn)簡單的驗證碼生成器

    這篇文章主要為大家詳細(xì)介紹了Java如何使用Kaptcha實(shí)現(xiàn)簡單的驗證碼生成器,文中的示例代碼講解詳細(xì),具有一定的借鑒價值,有需要的小伙伴可以參考下
    2024-02-02
  • Java實(shí)現(xiàn)的圖像查看器完整實(shí)例

    Java實(shí)現(xiàn)的圖像查看器完整實(shí)例

    這篇文章主要介紹了Java實(shí)現(xiàn)的圖像查看器,以完整實(shí)例形式較為詳細(xì)的分析了java處理圖片的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-10-10
  • 解決java讀取EXCEL數(shù)據(jù)變成科學(xué)計數(shù)法的問題

    解決java讀取EXCEL數(shù)據(jù)變成科學(xué)計數(shù)法的問題

    這篇文章主要介紹了解決java讀取EXCEL數(shù)據(jù)變成科學(xué)計數(shù)法的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04
  • Java實(shí)現(xiàn)給微信群中定時推送消息

    Java實(shí)現(xiàn)給微信群中定時推送消息

    這篇文章主要為大家詳細(xì)介紹了Java如何實(shí)現(xiàn)給微信群中定時推送消息的功能,文中的示例代碼講解詳細(xì),具有一定的借鑒價值,需要的可以了解一下
    2022-12-12
  • springcloud使用Hystrix進(jìn)行微服務(wù)降級管理

    springcloud使用Hystrix進(jìn)行微服務(wù)降級管理

    這篇文章主要介紹了springcloud使用Hystrix進(jìn)行微服務(wù)降級管理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-04-04

最新評論