欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Reactor定制一個(gè)生產(chǎn)的WebClient實(shí)現(xiàn)示例

 更新時(shí)間:2023年08月23日 16:14:48   作者:六七十三  
這篇文章主要為大家介紹了Reactor定制一個(gè)生產(chǎn)的WebClient實(shí)現(xiàn)示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

1 為什么要用 WebClient

剛開(kāi)始嘗試使用 Spring WebFlux 的時(shí)候,很多人都會(huì)使用 Mono.fromFuture() 將異步請(qǐng)求轉(zhuǎn)成 Mono 對(duì)象,或者 Mono.fromSupplier() 將請(qǐng)求轉(zhuǎn)成 MOno 對(duì)象,這兩種方式在響應(yīng)式編程中都是不建議的,都會(huì)阻塞當(dāng)前線程。

1.1 Mono.fromFuture() VS WebClient

Mono.fromFuture()方法和使用 WebClient 調(diào)用第三方接口之間存在以下區(qū)別:

  • 異步 vs. 非阻塞

Mono.fromFuture()方法適用于接收一個(gè) java.util.concurrent.Future 對(duì)象,并將其轉(zhuǎn)換為響應(yīng)式的 Mono。這是一個(gè)阻塞操作,因?yàn)樗鼤?huì)等待 Future 對(duì)象完成。而使用 WebClient 調(diào)用第三方接口是異步和非阻塞的,它不會(huì)直接阻塞應(yīng)用程序的執(zhí)行,而是使用事件驅(qū)動(dòng)的方式處理響應(yīng)。

可擴(kuò)展性和靈活性:使用 WebClient 可以更靈活地進(jìn)行配置和處理,例如設(shè)置超時(shí)時(shí)間、請(qǐng)求頭、重試機(jī)制等。WebClient 還可以與許多其他 Spring WebFlux 組件集成,如 WebSockets、Server-Sent Events 等。而 Mono.fromFuture() 是適用于單個(gè) Future 對(duì)象轉(zhuǎn)化為 Mono 的情況,可擴(kuò)展性較差。

  • 錯(cuò)誤處理

WebClient 提供了更豐富的錯(cuò)誤處理機(jī)制,可以通過(guò) onStatus、onError 等方法來(lái)處理不同的 HTTP 狀態(tài)碼或異常。同時(shí),WebClient 還提供了更靈活的重試和回退策略。Mono.fromFuture() 方法只能將 Future 對(duì)象的結(jié)果包裝在 Mono 中,不提供特定的錯(cuò)誤處理機(jī)制。

  • 阻塞操作

Mono.fromFuture() 會(huì)阻塞。當(dāng)調(diào)用 Mono.fromFuture() 方法將 Future 轉(zhuǎn)換為 Mono 時(shí),它會(huì)等待 Future 對(duì)象的結(jié)果返回。在這個(gè)等待的過(guò)程中,Mono.fromFuture()方法會(huì)阻塞當(dāng)前的線程。這意味著,如果 Future 的結(jié)果在運(yùn)行過(guò)程中沒(méi)有返回,則當(dāng)前線程會(huì)一直阻塞,直到 Future 對(duì)象返回結(jié)果或者超時(shí)。因此,在使用 Mono.fromFuture() 時(shí)需要注意潛在的阻塞風(fēng)險(xiǎn)。另外,需要確保F uture 的任務(wù)在后臺(tái)線程中執(zhí)行,以免阻塞應(yīng)用程序的主線程。

1.2 Mono.fromFuture VS Mono.fromSupplier

Mono.fromSupplier() 和 Mono.fromFuture() 都是用于將異步執(zhí)行的操作轉(zhuǎn)換為響應(yīng)式的 Mono 對(duì)象,但它們的區(qū)別在于:

Mono.fromSupplier() 適用于一個(gè)提供者/生產(chǎn)者,可以用來(lái)表示某個(gè)操作的結(jié)果,該操作是一些純計(jì)算并且沒(méi)有阻塞的方法。也就是說(shuō),Mono.fromSupplier() 將其參數(shù) (Supplier) 所提供的操作異步執(zhí)行,并將其結(jié)果打包成一個(gè) Mono 對(duì)象。

Mono.fromFuture() 適用于一個(gè) java.util.concurrent.Future 對(duì)象,將其封裝成 Mono 對(duì)象。這意味著調(diào)用 Mono.fromFuture() 方法將阻塞當(dāng)前線程,直到異步操作完成返回一個(gè) Future 對(duì)象。

因此,Mono.fromSupplier() 與 Mono.fromFuture() 的主要區(qū)別在于:

Mono.fromSupplier() 是一個(gè)非阻塞的操作,不會(huì)阻塞當(dāng)前線程。這個(gè)方法用于執(zhí)行計(jì)算型的任務(wù),返回一個(gè)封裝了計(jì)算結(jié)果的 Mono 對(duì)象。
Mono.fromFuture() 是阻塞操作,會(huì)阻塞當(dāng)前線程,直到異步操作完畢并返回看,它適用于處理 java.util.concurrent.Future 對(duì)象。

需要注意的是,如果 Supplier 提供的操作是阻塞的,則 Mono.fromSupplier() 方法本身也會(huì)阻塞線程。但通常情況下,Supplier 提供的操作是純計(jì)算型的,不會(huì)阻塞線程。

因此,可以使用 Mono.fromSupplier() 方法將一個(gè)純計(jì)算型的操作轉(zhuǎn)換為 Mono 對(duì)象,而將一個(gè)異步返回結(jié)果的操作轉(zhuǎn)換為 Mono 對(duì)象時(shí),可以使用 Mono.fromFuture() 方法。

2 定制化自己的 WebClient

2.1 初始化 WebClient

WebClient 支持建造者模式,使用 WebClient 建造者模式支持開(kāi)發(fā)自己的個(gè)性化 WebClient,比如支持設(shè)置接口調(diào)用統(tǒng)一耗時(shí)、自定義底層 Http 客戶端、調(diào)用鏈路、打印接口返回日志、監(jiān)控接口耗時(shí)等等。

WebClient builder 支持以下方法

interface Builder {
        /**
         * 配置請(qǐng)求基礎(chǔ)的url,如:baseUrl = "https://abc.go.com/v1";和 uriBuilderFactory 沖突,如果有 uriBuilderFactory ,則忽略 baseUrl
         */
        Builder baseUrl(String baseUrl);
        /**
         * URI 請(qǐng)求的默認(rèn)變量。也和 uriBuilderFactory 沖突,如果有 uriBuilderFactory ,則忽略 defaultUriVariables
         */
        Builder defaultUriVariables(Map<String, ?> defaultUriVariables);
        /**
         * 提供一個(gè)預(yù)配置的UriBuilderFactory實(shí)例
         */
        Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory);
        /**
         * 默認(rèn) header
         */
        Builder defaultHeader(String header, String... values);
        /**
         * 默認(rèn)cookie
         */
        Builder defaultCookie(String cookie, String... values);
        /**
         * 提供一個(gè) consumer 來(lái)定制每個(gè)請(qǐng)求
         */
        Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest);
        /**
         * 添加一個(gè)filter,可以添加多個(gè)
         */
        Builder filter(ExchangeFilterFunction filter);
        /**
         * 配置要使用的 ClientHttpConnector。這對(duì)于插入或自定義底層HTTP 客戶端庫(kù)(例如SSL)的選項(xiàng)非常有用。
         */
        Builder clientConnector(ClientHttpConnector connector);
        /**
         * Configure the codecs for the {@code WebClient} in the
         * {@link #exchangeStrategies(ExchangeStrategies) underlying}
         * {@code ExchangeStrategies}.
         * @param configurer the configurer to apply
         * @since 5.1.13
         */
        Builder codecs(Consumer<ClientCodecConfigurer> configurer);
        /**
         * 提供一個(gè)預(yù)先配置了ClientHttpConnector和ExchangeStrategies的ExchangeFunction。
這是對(duì) clientConnector 的一種替代,并且有效地覆蓋了它們。
         */
        Builder exchangeFunction(ExchangeFunction exchangeFunction);
        /**
         * Builder the {@link WebClient} instance.
         */
        WebClient build();
  // 其他方法
    }

2.2 日志打印及監(jiān)控

  • 打印參數(shù)、url、返回
  • 參數(shù)和返回需要轉(zhuǎn)成json
  • 需要打印正常返回日志和異常
  • 正常監(jiān)控、異常監(jiān)控、總監(jiān)控以及響應(yīng)時(shí)間
.doOnSuccess(response-> {
    log.info("get.success, url={}, response={}, param={}", url, response);
})
.doOnError(error-> {
    log.info("get.error, url={}", url, error);
    // 監(jiān)控
})
.doFinally(res-> {
  //監(jiān)控
})

2.3 返回處理

retrieve() // 聲明如何提取響應(yīng)。例如,提取一個(gè)ResponseEntity的狀態(tài),頭部和身體:

.bodyToMono(clazz) 將返回body內(nèi)容轉(zhuǎn)成clazz對(duì)象,clazz 對(duì)象可以自己指定類(lèi)型。如果碰到有問(wèn)題的無(wú)法轉(zhuǎn)化的,也可以先轉(zhuǎn)成String,然后自己實(shí)現(xiàn)一個(gè)工具類(lèi),將String轉(zhuǎn)成 class 對(duì)象。

2.3.1 get

public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
long start = System.currentTimeMillis();
return webClient.get()
        .uri(url)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("get.success, url={}, response={}, param={}", url, response);
        })
        .doOnError(error-> {
            log.info("get.param.error, url={}", url, error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        })
        .publishOn(customScheduler);
}

2.3.2 get param 請(qǐng)求

public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
long start = System.currentTimeMillis();
URI uri = UriComponentsBuilder.fromUriString(url)
        .queryParams(param)
        .build()
        .toUri();
return webClient.get()
        .uri(uri)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));
        })
        .doOnError(error-> {
            log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        // 監(jiān)控 or 打印日志 or 耗時(shí)
        })
        .publishOn(customScheduler);
}

2.3.3 post json 請(qǐng)求

public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
final long start = System.currentTimeMillis();
return webClient.post()
        .uri(url)
        .contentType(MediaType.APPLICATION_JSON)
        .cookies(cookies -> cookies.setAll(parameter.getCookies()))
        .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))
        .headers(headers -> headers.setAll(parameter.getHeaders()))
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());
        })
        .doOnError(error-> {
            log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        })
        .publishOn(customScheduler);
}

2.3.4 post form Data 請(qǐng)求

public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {
    final long start = System.currentTimeMillis();
    return webClient.post()
            .uri(url)
            .contentType(MediaType.APPLICATION_FORM_URLENCODED)
            .cookies(cookies -> cookies.setAll(parameter.getCookies()))
            .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))
            .headers(headers -> headers.setAll(parameter.getMapHeaders()))
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(clazz)
            .doOnSuccess(response-> {
                log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));
            })
            .doOnError(error-> {
                log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);
            })
            .onErrorReturn(defaultClass)
            .doFinally(res-> {
            })
            .publishOn(customScheduler);
}

2.4 異常處理

異常返回兜底

onErrorReturn 發(fā)現(xiàn)異常返回兜底數(shù)據(jù)

異常處理

狀態(tài)碼轉(zhuǎn)成異常拋出

.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))

監(jiān)控異常

.doOnError(error -> {
    // log and monitor
})

3 完整的 WebClient

package com.geniu.reactor.webclient;
import com.geniu.utils.JsonUtil;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpClient;
import java.net.URI;
import java.time.Duration;
import java.util.function.Function;
/**
 * @Author: prepared
 * @Date: 2023/8/15 11:05
 */
@Slf4j
public class CustomerWebClient {
    public static final CustomerWebClient instance = new CustomerWebClient();
    /**
     * 限制并發(fā)數(shù) 100
     */
    Scheduler customScheduler = Schedulers.newParallel("CustomScheduler", 100);
    private final WebClient webClient;
    private CustomerWebClient() {
        final SslContextBuilder sslBuilder = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE);
        final SslProvider ssl = SslProvider.builder().sslContext(sslBuilder)
                .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build();
        final int cpuCores = Runtime.getRuntime().availableProcessors();
        final int selectorCount = Math.max(cpuCores / 2, 4);
        final int workerCount = Math.max(cpuCores * 2, 8);
        final LoopResources pool = LoopResources.create("HCofSWC", selectorCount, workerCount, true);
        final Function<? super TcpClient, ? extends TcpClient> tcpMapper = tcp -> tcp
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_TIMEOUT, 10000)
                .secure(ssl)
                .runOn(pool);
        ConnectionProvider.Builder httpClientOfSWC = ConnectionProvider
                .builder("HttpClientOfSWC")
                .maxConnections(100_000)
                .pendingAcquireTimeout(Duration.ofSeconds(6));
        final ConnectionProvider connectionProvider = httpClientOfSWC.build();
        final HttpClient hc = HttpClient.create(connectionProvider)
                .tcpConfiguration(tcpMapper);
        final Function<HttpClient, HttpClient> hcMapper = rhc -> rhc
                .compress(true);
        final WebClient.Builder wcb = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc)));
//                .filter(new TraceRequestFilter()); 可以通過(guò)Filter 增加trace追蹤
        this.webClient = wcb.build();
    }
    public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
        long start = System.currentTimeMillis();
        return webClient.get()
                .uri(url)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))
                .bodyToMono(clazz)
                .doOnSuccess(response-> {
                    log.info("get.success, url={}, response={}, param={}", url, response);
                })
                .doOnError(error-> {
                    log.info("get.param.error, url={}", url, error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(res-> {
                })
                .publishOn(customScheduler);
    }
    public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
        long start = System.currentTimeMillis();
        URI uri = UriComponentsBuilder.fromUriString(url)
                .queryParams(param)
                .build()
                .toUri();
        return webClient.get()
                .uri(uri)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response-> {
                    log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));
                })
                .doOnError(error-> {
                    log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(res-> {
                })
                .publishOn(customScheduler);
    }
    public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
        final long start = System.currentTimeMillis();
        return webClient.post()
                .uri(url)
                .contentType(MediaType.APPLICATION_JSON)
                .cookies(cookies -> cookies.setAll(parameter.getCookies()))
                .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))
                .headers(headers -> headers.setAll(parameter.getHeaders()))
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response-> {
                    log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());
                })
                .doOnError(error-> {
                    log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(res-> {
                })
                .publishOn(customScheduler);
    }
    public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {
        final long start = System.currentTimeMillis();
        return webClient.post()
                .uri(url)
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .cookies(cookies -> cookies.setAll(parameter.getCookies()))
                .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))
                .headers(headers -> headers.setAll(parameter.getMapHeaders()))
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response-> {
                    log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));
                })
                .doOnError(error-> {
                    log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(res-> {
                })
                .publishOn(customScheduler);
    }
}

以上就是Reactor定制一個(gè)生產(chǎn)的WebClient實(shí)現(xiàn)示例的詳細(xì)內(nèi)容,更多關(guān)于Reactor定制生產(chǎn)WebClient的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論