Reactor定制一個生產的WebClient實現示例
1 為什么要用 WebClient
剛開始嘗試使用 Spring WebFlux 的時候,很多人都會使用 Mono.fromFuture() 將異步請求轉成 Mono 對象,或者 Mono.fromSupplier() 將請求轉成 MOno 對象,這兩種方式在響應式編程中都是不建議的,都會阻塞當前線程。
1.1 Mono.fromFuture() VS WebClient
Mono.fromFuture()方法和使用 WebClient 調用第三方接口之間存在以下區(qū)別:
- 異步 vs. 非阻塞
Mono.fromFuture()方法適用于接收一個 java.util.concurrent.Future 對象,并將其轉換為響應式的 Mono。這是一個阻塞操作,因為它會等待 Future 對象完成。而使用 WebClient 調用第三方接口是異步和非阻塞的,它不會直接阻塞應用程序的執(zhí)行,而是使用事件驅動的方式處理響應。
可擴展性和靈活性:使用 WebClient 可以更靈活地進行配置和處理,例如設置超時時間、請求頭、重試機制等。WebClient 還可以與許多其他 Spring WebFlux 組件集成,如 WebSockets、Server-Sent Events 等。而 Mono.fromFuture() 是適用于單個 Future 對象轉化為 Mono 的情況,可擴展性較差。
- 錯誤處理
WebClient 提供了更豐富的錯誤處理機制,可以通過 onStatus、onError 等方法來處理不同的 HTTP 狀態(tài)碼或異常。同時,WebClient 還提供了更靈活的重試和回退策略。Mono.fromFuture() 方法只能將 Future 對象的結果包裝在 Mono 中,不提供特定的錯誤處理機制。
- 阻塞操作
Mono.fromFuture() 會阻塞。當調用 Mono.fromFuture() 方法將 Future 轉換為 Mono 時,它會等待 Future 對象的結果返回。在這個等待的過程中,Mono.fromFuture()方法會阻塞當前的線程。這意味著,如果 Future 的結果在運行過程中沒有返回,則當前線程會一直阻塞,直到 Future 對象返回結果或者超時。因此,在使用 Mono.fromFuture() 時需要注意潛在的阻塞風險。另外,需要確保F uture 的任務在后臺線程中執(zhí)行,以免阻塞應用程序的主線程。
1.2 Mono.fromFuture VS Mono.fromSupplier
Mono.fromSupplier() 和 Mono.fromFuture() 都是用于將異步執(zhí)行的操作轉換為響應式的 Mono 對象,但它們的區(qū)別在于:
Mono.fromSupplier() 適用于一個提供者/生產者,可以用來表示某個操作的結果,該操作是一些純計算并且沒有阻塞的方法。也就是說,Mono.fromSupplier() 將其參數 (Supplier) 所提供的操作異步執(zhí)行,并將其結果打包成一個 Mono 對象。
Mono.fromFuture() 適用于一個 java.util.concurrent.Future 對象,將其封裝成 Mono 對象。這意味著調用 Mono.fromFuture() 方法將阻塞當前線程,直到異步操作完成返回一個 Future 對象。
因此,Mono.fromSupplier() 與 Mono.fromFuture() 的主要區(qū)別在于:
Mono.fromSupplier() 是一個非阻塞的操作,不會阻塞當前線程。這個方法用于執(zhí)行計算型的任務,返回一個封裝了計算結果的 Mono 對象。
Mono.fromFuture() 是阻塞操作,會阻塞當前線程,直到異步操作完畢并返回看,它適用于處理 java.util.concurrent.Future 對象。
需要注意的是,如果 Supplier 提供的操作是阻塞的,則 Mono.fromSupplier() 方法本身也會阻塞線程。但通常情況下,Supplier 提供的操作是純計算型的,不會阻塞線程。
因此,可以使用 Mono.fromSupplier() 方法將一個純計算型的操作轉換為 Mono 對象,而將一個異步返回結果的操作轉換為 Mono 對象時,可以使用 Mono.fromFuture() 方法。
2 定制化自己的 WebClient
2.1 初始化 WebClient
WebClient 支持建造者模式,使用 WebClient 建造者模式支持開發(fā)自己的個性化 WebClient,比如支持設置接口調用統一耗時、自定義底層 Http 客戶端、調用鏈路、打印接口返回日志、監(jiān)控接口耗時等等。
WebClient builder 支持以下方法
interface Builder { /** * 配置請求基礎的url,如:baseUrl = "https://abc.go.com/v1";和 uriBuilderFactory 沖突,如果有 uriBuilderFactory ,則忽略 baseUrl */ Builder baseUrl(String baseUrl); /** * URI 請求的默認變量。也和 uriBuilderFactory 沖突,如果有 uriBuilderFactory ,則忽略 defaultUriVariables */ Builder defaultUriVariables(Map<String, ?> defaultUriVariables); /** * 提供一個預配置的UriBuilderFactory實例 */ Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory); /** * 默認 header */ Builder defaultHeader(String header, String... values); /** * 默認cookie */ Builder defaultCookie(String cookie, String... values); /** * 提供一個 consumer 來定制每個請求 */ Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest); /** * 添加一個filter,可以添加多個 */ Builder filter(ExchangeFilterFunction filter); /** * 配置要使用的 ClientHttpConnector。這對于插入或自定義底層HTTP 客戶端庫(例如SSL)的選項非常有用。 */ Builder clientConnector(ClientHttpConnector connector); /** * Configure the codecs for the {@code WebClient} in the * {@link #exchangeStrategies(ExchangeStrategies) underlying} * {@code ExchangeStrategies}. * @param configurer the configurer to apply * @since 5.1.13 */ Builder codecs(Consumer<ClientCodecConfigurer> configurer); /** * 提供一個預先配置了ClientHttpConnector和ExchangeStrategies的ExchangeFunction。 這是對 clientConnector 的一種替代,并且有效地覆蓋了它們。 */ Builder exchangeFunction(ExchangeFunction exchangeFunction); /** * Builder the {@link WebClient} instance. */ WebClient build(); // 其他方法 }
2.2 日志打印及監(jiān)控
- 打印參數、url、返回
- 參數和返回需要轉成json
- 需要打印正常返回日志和異常
- 正常監(jiān)控、異常監(jiān)控、總監(jiān)控以及響應時間
.doOnSuccess(response-> { log.info("get.success, url={}, response={}, param={}", url, response); }) .doOnError(error-> { log.info("get.error, url={}", url, error); // 監(jiān)控 }) .doFinally(res-> { //監(jiān)控 })
2.3 返回處理
retrieve() // 聲明如何提取響應。例如,提取一個ResponseEntity的狀態(tài),頭部和身體:
.bodyToMono(clazz) 將返回body內容轉成clazz對象,clazz 對象可以自己指定類型。如果碰到有問題的無法轉化的,也可以先轉成String,然后自己實現一個工具類,將String轉成 class 對象。
2.3.1 get
public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) { long start = System.currentTimeMillis(); return webClient.get() .uri(url) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("get.success, url={}, response={}, param={}", url, response); }) .doOnError(error-> { log.info("get.param.error, url={}", url, error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); }
2.3.2 get param 請求
public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) { long start = System.currentTimeMillis(); URI uri = UriComponentsBuilder.fromUriString(url) .queryParams(param) .build() .toUri(); return webClient.get() .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param)); }) .doOnError(error-> { log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { // 監(jiān)控 or 打印日志 or 耗時 }) .publishOn(customScheduler); }
2.3.3 post json 請求
public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) { final long start = System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_JSON) .cookies(cookies -> cookies.setAll(parameter.getCookies())) .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType())) .headers(headers -> headers.setAll(parameter.getHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody()); }) .doOnError(error-> { log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); }
2.3.4 post form Data 請求
public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) { final long start = System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .cookies(cookies -> cookies.setAll(parameter.getCookies())) .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam())) .headers(headers -> headers.setAll(parameter.getMapHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter)); }) .doOnError(error-> { log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); }
2.4 異常處理
異常返回兜底
onErrorReturn 發(fā)現異常返回兜底數據
異常處理
狀態(tài)碼轉成異常拋出
.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))
監(jiān)控異常
.doOnError(error -> { // log and monitor })
3 完整的 WebClient
package com.geniu.reactor.webclient; import com.geniu.utils.JsonUtil; import io.netty.channel.ChannelOption; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.util.UriComponentsBuilder; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; import reactor.netty.tcp.SslProvider; import reactor.netty.tcp.TcpClient; import java.net.URI; import java.time.Duration; import java.util.function.Function; /** * @Author: prepared * @Date: 2023/8/15 11:05 */ @Slf4j public class CustomerWebClient { public static final CustomerWebClient instance = new CustomerWebClient(); /** * 限制并發(fā)數 100 */ Scheduler customScheduler = Schedulers.newParallel("CustomScheduler", 100); private final WebClient webClient; private CustomerWebClient() { final SslContextBuilder sslBuilder = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE); final SslProvider ssl = SslProvider.builder().sslContext(sslBuilder) .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build(); final int cpuCores = Runtime.getRuntime().availableProcessors(); final int selectorCount = Math.max(cpuCores / 2, 4); final int workerCount = Math.max(cpuCores * 2, 8); final LoopResources pool = LoopResources.create("HCofSWC", selectorCount, workerCount, true); final Function<? super TcpClient, ? extends TcpClient> tcpMapper = tcp -> tcp .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_TIMEOUT, 10000) .secure(ssl) .runOn(pool); ConnectionProvider.Builder httpClientOfSWC = ConnectionProvider .builder("HttpClientOfSWC") .maxConnections(100_000) .pendingAcquireTimeout(Duration.ofSeconds(6)); final ConnectionProvider connectionProvider = httpClientOfSWC.build(); final HttpClient hc = HttpClient.create(connectionProvider) .tcpConfiguration(tcpMapper); final Function<HttpClient, HttpClient> hcMapper = rhc -> rhc .compress(true); final WebClient.Builder wcb = WebClient.builder() .clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc))); // .filter(new TraceRequestFilter()); 可以通過Filter 增加trace追蹤 this.webClient = wcb.build(); } public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) { long start = System.currentTimeMillis(); return webClient.get() .uri(url) .accept(MediaType.APPLICATION_JSON) .retrieve() .onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode()))) .bodyToMono(clazz) .doOnSuccess(response-> { log.info("get.success, url={}, response={}, param={}", url, response); }) .doOnError(error-> { log.info("get.param.error, url={}", url, error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); } public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) { long start = System.currentTimeMillis(); URI uri = UriComponentsBuilder.fromUriString(url) .queryParams(param) .build() .toUri(); return webClient.get() .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param)); }) .doOnError(error-> { log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); } public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) { final long start = System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_JSON) .cookies(cookies -> cookies.setAll(parameter.getCookies())) .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType())) .headers(headers -> headers.setAll(parameter.getHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody()); }) .doOnError(error-> { log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); } public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) { final long start = System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .cookies(cookies -> cookies.setAll(parameter.getCookies())) .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam())) .headers(headers -> headers.setAll(parameter.getMapHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter)); }) .doOnError(error-> { log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); } }
以上就是Reactor定制一個生產的WebClient實現示例的詳細內容,更多關于Reactor定制生產WebClient的資料請關注腳本之家其它相關文章!
相關文章
IDEA中WebService生成Java代碼并調用外部接口實現代碼
這篇文章主要介紹了IDEA中WebService生成Java代碼并調用外部接口實現,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-05-05Java接收text/event-stream格式數據的詳細代碼
這篇文章主要介紹了java接收text/event-stream格式數據,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07Java如果在try里面執(zhí)行return還會不會執(zhí)行finally
這篇文章主要介紹了Java如果在try里面執(zhí)行return,那么還會不會執(zhí)行finally,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01