詳解Reactor中Context的用法
在響應(yīng)式編程中,多線程異步性成為天然的內(nèi)在,多線程之間的切換也成為原生的,在處理一個(gè)數(shù)據(jù)流Flux/Mono時(shí),基本無法知道是運(yùn)行在哪個(gè)線程上或哪個(gè)線程池里,可以說,每一個(gè)操作符operator以及內(nèi)部的函數(shù)都可能運(yùn)行在不同的線程上。這就意味著,以前用ThreadLocal來作為方法間透明傳遞共享變量的方式不再行得通。為此,Reactor提供了Context來替代ThreadLocal實(shí)現(xiàn)一個(gè)跨線程的共享變量的透明方式。
本文會(huì)從以下幾個(gè)方面來介紹Context的相關(guān)知識(shí):
- context的基本用法
- 從源碼上解讀context的用法
- 用log的MDC案例介紹如何用context實(shí)現(xiàn)與threadlocal的橋接
- 總結(jié)下context以及目前的一些局限性
一、使用介紹
static String KEY = "TEST_CONTEXT_KEY"; static String KEY2 = "TEST_CONTEXT_KEY2"; public static void main(String[] args) { Flux<String> flux = convert("hello", Flux.just(1, 2, 3)); flux .subscriberContext(Context.of(KEY, "Outside")) .subscribe(v -> System.out.println(v)); } public static Flux<String> convert(String prefix, Flux<Integer> publisher) { return publisher.map(v -> prefix + " " + v) .subscriberContext(Context.of(KEY, "NotUsed")) .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v)) .subscriberContext(context -> context.put(KEY2, "Inside")) .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + v)); }
上面是context的使用方案介紹,其輸出如下:
Outside Outside Inside hello 1
Outside Outside Inside hello 2
Outside Outside Inside hello 3
上面的使用案例展示了一個(gè)使用context的常見例子。通過在外部方法里傳入context,如flux.subscriberContext(Context.of(KEY, "Outside"))
,使得內(nèi)部方法convert能夠獲取外界環(huán)境的context,同時(shí)內(nèi)部方法還可以增加自己的context數(shù)據(jù),如subscriberContext(context -> context.put(KEY2, "Inside"))
,結(jié)合之后,在讓內(nèi)部的方法(flatMap里的方法)感知到整個(gè)上下文context的數(shù)據(jù)內(nèi)容。
對于context的使用,主要分為幾個(gè)部分: 1. context的創(chuàng)建 2. context的寫入(傳入)與讀取 3. 執(zhí)行順序
1. context —— 不可變對象
由于reactor天然是跨線程的,所以context設(shè)計(jì)為了不可變的對象,即每次的更新都是創(chuàng)建一個(gè)新的對象。每次的put/putAll操作,都是先把舊對象的值復(fù)制到新對象,然后再進(jìn)行put/putAll等更新操作。
2. context的寫入與讀取
context寫入是使用subscriberContext方法,其入?yún)⒂袃煞N形式:傳值方式subscriberContext(ctx)與lambda函數(shù)方式 —— subscriberContext(ctx -> ctx.put(key,value))。
context的讀取是利用Mono的靜態(tài)方法subscriberContext()來獲取,由于其返回的是一個(gè)Mono, 所以通常與flatMap結(jié)合使用。
3. 執(zhí)行順序
context的傳入是發(fā)生在subscribe()訂閱階段的,所以其寫入的順序是從下往上的,即在示例中,先執(zhí)行subscriberContext(Context.of(KEY, "Outside"))
,再執(zhí)行subscriberContext(context -> context.put(KEY2, "Inside"))
, 最后執(zhí)行subscriberContext(Context.of(KEY, "NotUsed"))
。在訂閱階段執(zhí)行完后,進(jìn)入運(yùn)行階段,數(shù)據(jù)流從上往下執(zhí)行,每次讀取context的時(shí)候Mono.subscriberContext()
都是讀取下一個(gè)的context。所以"NotUsed"的context并沒有生效。
此外,context.put()操作是復(fù)制舊的再update新的對象,所以Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v)
這個(gè)階段仍能讀取前一個(gè)context關(guān)于KEY的內(nèi)容。
總結(jié)
- context是不可變對象,每次更新都是新的context
- context是存在于subscriber的內(nèi)部的,一個(gè)context是綁定在當(dāng)前subscriber上的,如
FluxContextStart
的對象 - context的寫入順序是從下而上的,讀取的時(shí)候是從上而下的,只能讀取之后的subscriber里的context。
- 每個(gè)subscriber中的context都是獨(dú)有的,運(yùn)行階段的時(shí)候,無法改變其他subscriber的context。
注意
subscriberContext(Context.of("Outside")
與subscriberContext(context -> Context.of("Outside"))
是有區(qū)別,前者是會(huì)結(jié)合復(fù)用前面的context,而后者是直接返回一個(gè)新的context并不會(huì)復(fù)用前面的context。 其原因是,subscriberContext(Context.of("Outside"))
其實(shí)內(nèi)部調(diào)用的是subscriberContext(context -> context.putAll(Context.of("Outside"))
,其入?yún)⒌腸ontext就是前面的context,putAll方法會(huì)復(fù)用前面的context。而 subscriberContext(context -> Context.of("Outside"))不復(fù)用的原因就是因?yàn)榉艞壛巳雲(yún)⒌腸ontext。所以,可以利用這種方式來放棄之前的context,當(dāng)然不鼓勵(lì)這么做,因?yàn)槟悴磺宄癱ontext會(huì)不會(huì)影響后續(xù)的程序。
本文章的代碼用的事reactor 3.3的版本,自3.5之后,subscriberContext方法改為contextWrite
,讀取的方法改為deferContextual
。
二、源碼解讀
現(xiàn)在我們從源代碼上看看,context寫入為什么是自下而上的,讀取的時(shí)候又是依附于下一個(gè)subscriber并且自上而下的。
public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) { return new FluxContextStart<>(this, doOnContext); } FluxContextStart(Flux<? extends T> source, Function<Context, Context> doOnContext) { super(source); this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext"); } @Override public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) { Context c = doOnContext.apply(actual.currentContext()); return new ContextStartSubscriber<>(actual, c); } ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) { this.actual = actual; this.context = context; if (actual instanceof ConditionalSubscriber) { this.actualConditional = (ConditionalSubscriber<? super T>) actual; } else { this.actualConditional = null; } } @Override public Context currentContext() { return this.context; }
上面截取了subscriberContext方法的源代碼,可以看到subscriberContext方法最終會(huì)創(chuàng)建ContextStartSubscriber的對象,并將生成的context賦值Context c = doOnContext.apply(actual.currentContext())
,所以context是伴隨subscriberContext方法對應(yīng)的subscriber里的。
由于context賦值操作Context c = doOnContext.apply(actual.currentContext())
是發(fā)生在subscribeOrReturn方法里,即發(fā)生在subscribe()訂閱階段,所以整個(gè)執(zhí)行的順序是自下而上的(沿著整個(gè)flow自下而上至源頭的publisher)。
那讀取context的時(shí)候?yàn)槭裁词亲陨隙碌哪??我們來看下讀取操作Mono.subscribeContext()的源碼。
public static Mono<Context> subscriberContext() { return onAssembly(MonoCurrentContext.INSTANCE); } final class MonoCurrentContext extends Mono<Context> implements Fuseable, Scannable { static final MonoCurrentContext INSTANCE = new MonoCurrentContext(); public void subscribe(CoreSubscriber<? super Context> actual) { Context ctx = actual.currentContext(); actual.onSubscribe(Operators.scalarSubscription(actual, ctx)); } } interface InnerOperator<I, O> extends InnerConsumer<I>, InnerProducer<O> { @Override default Context currentContext() { return actual().currentContext(); } }
Mono.subscribeContext()
方法返回的是一個(gè)MonoCurrentContext的靜態(tài)對象,在訂閱subscribe時(shí)期,就會(huì)去讀取當(dāng)前的context,即Context ctx = actual.currentContext()
。而對于一個(gè)InnerOperator的接口而言,其currentContext()方法會(huì)不斷尋找下一個(gè)subscriber的context,即 actual().currentContext()
,直到有哪個(gè)subscriber覆寫了currentContext方法,如先前的ContextStartSubscriber對象。對于InnerOperator接口,是大多數(shù)subscriber都會(huì)實(shí)現(xiàn)的接口,例如map、filter、flatmap這些,都會(huì)實(shí)現(xiàn)這個(gè)接口。
在找到context之后,通過Operators.scalarSubscription(actual, ctx)
寫入,這個(gè)方法其實(shí)也是Mono.just()的實(shí)現(xiàn),所以相當(dāng)于把context當(dāng)做value,生成了一個(gè)Mono.just(ctx)來完成了context讀取。
所以,context讀取的是從當(dāng)前操作operator之后的那個(gè)最接近的subscriber的context。這也解釋了前面使用案例中,subscriberContext(Context.of(KEY, "NotUsed"))
,沒有作用的緣故。
三、如何橋接現(xiàn)有的ThreadLocal系統(tǒng)
雖然reactor提供了context來替代ThreadLocal的使用,但目前大多數(shù)的代碼庫仍然是命令式編程的,使用的方式仍然是基于ThreadLocal的,如Logger里的MDC。本小節(jié)以Logger中的MDC來介紹,如何利用context實(shí)現(xiàn)與舊系統(tǒng)中的基于ThreadLocal方式的打通。
我們假設(shè)有這樣的一個(gè)場景,每一次的Http請求都有一個(gè)trace id,我們稱為request id,并通過Http Header "X-Request-Id"來命名,打印日志的時(shí)候,希望每條日志里都包含請求id,這樣方便跟蹤整個(gè)請求鏈路的情況。
為此,我們把日志配置里的pattern設(shè)置為:[%X{X-Request-Id}] [%thread] %-5level - %msg %n
。
可以在SpringBoot
的application.yml
里設(shè)置,如:
logging.pattern.level: "[%X{X-Request-Id}] [%thread] %-5level - %msg %n"
因此,要使得每條日志里有request id,那就必須要MDC里有key為X-Request-Id
的內(nèi)容。下面來看下,reactor中是如何實(shí)現(xiàn)的。
@SpringBootApplication @Slf4j @RestController public class MdcApplication { public static void main(String[] args) { SpringApplication.run(MdcApplication.class, args); } private final static String X_REQUEST_ID_KEY = "X-Request-Id"; @GetMapping("/") Flux<String> split(@RequestParam("value") String value, @RequestHeader(X_REQUEST_ID_KEY) String requestId) { return Flux.fromArray(value.split("_")) .doOnEach(logWithContext(ch -> log.info("handling one item: {}", ch))) .subscriberContext(Context.of(X_REQUEST_ID_KEY, requestId)); } private static <T> Consumer<Signal<T>> logWithContext(Consumer<T> logStatement) { return signal -> { if (!signal.isOnNext()) { return; } String requestId = signal.getContext().get(X_REQUEST_ID_KEY); try (MDC.MDCCloseable closeable = MDC.putCloseable(X_REQUEST_ID_KEY, requestId)) { logStatement.accept(signal.get()); } }; } }
這是一個(gè)簡單的示例程序,對于請求輸入的value值通過"-"分割后,再一個(gè)個(gè)返回給客戶端。首先利用subscriberContext方法,將http header里的X-Request-Id
作為context來傳入。然后利用doOnEach的方式獲取signal。doOnEach的方法可以工作在onNext、onComplete、onError等所有事件,每一個(gè)信號signal里都包含有context,當(dāng)為onNext則還包含value值,當(dāng)為onError時(shí),則還包含有exception。因此可以通過signal來獲取context。
在從context獲取X-Request-Id后,可以利用try-with-resource方式來更新MDC,其效果是在執(zhí)行完try里面的程序后,將更新的value回退。等價(jià)于:
try { MDC.put(X_REQUEST_ID_KEY, requestId); logStatement.accept(signal.get()); } finally { MDC.remove(X_REQUEST_ID_KEY); }
置于為什么需要操作完之后回退掉MDC中的更新,那是因?yàn)閞eactor中所有的操作都是異步執(zhí)行在不同線程中的,如果不回退的話,很有可能造成污染,其原因還是MDC內(nèi)部是用ThreadLocal實(shí)現(xiàn)的,所以跨線程的時(shí)候,如果不把ThreadLocal值清理干凈,很容易造成互相污染。
用curl命令發(fā)送請求:curl --header "X-Request-Id:12345" localhost:8080?value=a_b_c
,返回的結(jié)果是abc
,打印的日志如下:
[12345] [reactor-http-nio-2] INFO - handling one item: a
[12345] [reactor-http-nio-2] INFO - handling one item: b
[12345] [reactor-http-nio-2] INFO - handling one item: c
其中12345
就是從context里獲取到的request id。
如果想要將request id繼續(xù)貫穿后續(xù)請求流程,如請求第三方服務(wù),可以在用webClient發(fā)送請求的時(shí)候,把request id作為header加入到它的request請求里,如:
Mono.subscriberContext().map(ctx -> { RequestHeadersSpec<?> request = webClient.get().uri(uri); request = request.header("X-Request-ID", ctx.get(X_REQUEST_ID_KEY)); // The rest of your request logic... });
四、總結(jié)
本文介紹了reactor中context的概念,并用代碼示例的方式介紹了如何使用。再然后,通過源碼的解讀來加深對context使用規(guī)則的理解:自下而上的context寫入,以及與subscriber綁定后的自上而下的讀取。 在這之后,用以傳遞并打印日志中包含request id的一個(gè)實(shí)際例子,來介紹如何使用context與log的MDC一起使用。
雖然reactor自3.1開始提供了context來彌補(bǔ)無法使用ThreadLocal的不足,但與ThreaLocal相比,context仍然有不少局限。比如使用上的不方便,要么利用Mono.subscribeContext().map并搭配flatmap來使用,要么需要將數(shù)據(jù)流轉(zhuǎn)化成信號signal流來使用,總之遠(yuǎn)不如ThreadLocal來的簡單易用。另外,context的不可變特性,雖然有助于thread safe,但使得不同方法之間無法傳遞更新,比如方法A內(nèi)修改后再傳遞給方法B,因?yàn)閏ontext是只讀的,但這在ThreadLocal上卻是輕而易舉就能實(shí)現(xiàn)。
好消息的是,reactor在3.5開始,提供了新的方法deferContextual來簡化context的使用。以及提出了context view的概念來簡化context傳遞問題,感興趣的可以閱讀reactor文檔。
到此這篇關(guān)于詳解Reactor中Context的用法的文章就介紹到這了,更多相關(guān)Reactor Context內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中的CAS鎖機(jī)制(無鎖、自旋鎖、樂觀鎖、輕量級鎖)詳解
這篇文章主要介紹了Java中的CAS鎖機(jī)制(無鎖、自旋鎖、樂觀鎖、輕量級鎖)詳解,CAS算法的作用是解決多線程條件下使用鎖造成性能損耗問題的算法,保證了原子性,這個(gè)原子操作是由CPU來完成的,需要的朋友可以參考下2024-01-01如何使用Java實(shí)現(xiàn)指定概率的抽獎(jiǎng)
這篇文章主要給大家介紹了關(guān)于如何使用Java實(shí)現(xiàn)指定概率的抽獎(jiǎng)的相關(guān)資料,Java抽獎(jiǎng)程序的基本原理是通過隨機(jī)數(shù)生成器來實(shí)現(xiàn)隨機(jī)抽獎(jiǎng)的功能,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07SpringBoot+Netty實(shí)現(xiàn)簡單聊天室的示例代碼
這篇文章主要介紹了如何利用SpringBoot Netty實(shí)現(xiàn)簡單聊天室,文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)SpringBoot有一定幫助,感興趣的同學(xué)可以了解一下2022-02-02詳解java如何實(shí)現(xiàn)將數(shù)據(jù)導(dǎo)出為yaml
這篇文章主要為大家詳細(xì)介紹了java如何利用snakeyaml和freemarker實(shí)現(xiàn)將數(shù)據(jù)導(dǎo)出為yaml文件,文中的示例代碼講解詳細(xì),有需要的小伙伴可以參考一下2023-11-11Java IO流學(xué)習(xí)總結(jié)之文件傳輸基礎(chǔ)
這篇文章主要介紹了Java IO流學(xué)習(xí)總結(jié)之文件傳輸基礎(chǔ),文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java io流的小伙伴們有很好的幫助,需要的朋友可以參考下2021-04-04MyBatisPuls多數(shù)據(jù)源操作數(shù)據(jù)源偶爾報(bào)錯(cuò)問題
這篇文章主要介紹了MyBatisPuls多數(shù)據(jù)源操作數(shù)據(jù)源偶爾報(bào)錯(cuò)問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06Springboot集成Kafka進(jìn)行批量消費(fèi)及踩坑點(diǎn)
本文主要介紹了Springboot集成Kafka進(jìn)行批量消費(fèi)及踩坑點(diǎn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12Java中l(wèi)ogback?自動(dòng)刷新不生效的問題解決
本文主要介紹了Java中l(wèi)ogback?自動(dòng)刷新不生效的問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05