異步線(xiàn)程traceId如何實(shí)現(xiàn)傳遞
前言
在日常問(wèn)題排查中,我們經(jīng)常在ELK中根據(jù)traceId來(lái)查詢(xún)請(qǐng)求的日志鏈路,在同步請(qǐng)求中,根據(jù)traceId一站到底,很爽,那如果是異步請(qǐng)求該如何處理呢?
項(xiàng)目中的異步請(qǐng)求都是結(jié)合線(xiàn)程池來(lái)開(kāi)啟異步線(xiàn)程,下面結(jié)合slf4j中的MDC和線(xiàn)程池來(lái)實(shí)現(xiàn)異步線(xiàn)程的traceId傳遞。
重寫(xiě)ThreadPoolTaskExecutor中方法
下面的工具類(lèi),分別在Callable和Runnable異步任務(wù)執(zhí)行前通過(guò)MDC.setContextMap(context)設(shè)置請(qǐng)求映射上下文
import org.slf4j.MDC; import org.springframework.util.CollectionUtils; import java.util.Map; import java.util.concurrent.Callable; /** * @desc: 定義MDC工具類(lèi),支持Runnable和Callable兩種,目的就是為了把父線(xiàn)程的traceId設(shè)置給子線(xiàn)程 */ public class MdcUtil { public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) { return () -> { if (CollectionUtils.isEmpty(context)) { MDC.clear(); } else { MDC.setContextMap(context); } try { return callable.call(); } finally { // 清除子線(xiàn)程的,避免內(nèi)存溢出,就和ThreadLocal.remove()一個(gè)原因 MDC.clear(); } }; } public static Runnable wrap(final Runnable runnable, final Map<String, String> context) { return () -> { if (CollectionUtils.isEmpty(context)) { MDC.clear(); } else { MDC.setContextMap(context); } try { runnable.run(); } finally { MDC.clear(); } }; } }
下面定義一個(gè)ThreadPoolMdcExecutor 類(lèi)來(lái)繼承ThreadPoolTaskExecutor 類(lèi),重寫(xiě)execute和submit方法
import java.util.concurrent.Callable; import java.util.concurrent.Future; /** * @desc: 把當(dāng)前的traceId透?jìng)鞯阶泳€(xiàn)程特意加的實(shí)現(xiàn)。 * 重點(diǎn)就是 MDC.getCopyOfContextMap(),此方法獲取當(dāng)前線(xiàn)程(父線(xiàn)程)的traceId */ public class ThreadPoolMdcExecutor extends ThreadPoolTaskExecutor { @Override public void execute(Runnable task) { super.execute(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public Future<?> submit(Runnable task) { return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } }
下面定義線(xiàn)程池,就可以使用ThreadPoolMdcExecutor
@Bean(name = "callBackExecutorConfig") public Executor callBackExecutorConfig() { ThreadPoolTaskExecutor executor = new ThreadPoolMdcExecutor(); // 配置核心線(xiàn)程數(shù) executor.setCorePoolSize(10); // 配置最大線(xiàn)程數(shù) executor.setMaxPoolSize(20); // 配置隊(duì)列大小 executor.setQueueCapacity(200); // 配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴 executor.setThreadNamePrefix("async-Thread-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // abort:在調(diào)用executor執(zhí)行的方法中拋出異常 RejectedExecutionException executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 執(zhí)行初始化 executor.initialize(); return executor; }
定義好線(xiàn)程池之后,我們就可以使用callBackExecutorConfig線(xiàn)程池進(jìn)行異步任務(wù),避免異步線(xiàn)程中的traceId丟失。
線(xiàn)程池增強(qiáng)
上面是通過(guò)繼承ThreadPoolTaskExecutor來(lái),重寫(xiě)execute和submit方法,設(shè)置MDC.setContextMap(context)設(shè)置上下文,我們也可以通過(guò)實(shí)現(xiàn)TaskDecorator 接口來(lái)增強(qiáng)線(xiàn)程池
public class ContextTransferTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { Map<String, String> context = MDC.getCopyOfContextMap(); RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); return () -> { try { MDC.setContextMap(context); RequestContextHolder.setRequestAttributes(requestAttributes); runnable.run(); } finally { MDC.clear(); RequestContextHolder.resetRequestAttributes(); } }; } }
接下來(lái),定義線(xiàn)程池,對(duì)線(xiàn)程池進(jìn)行增強(qiáng)
@Bean(name = "callBackExecutorConfig") public Executor callBackExecutorConfig() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor (); // 配置核心線(xiàn)程數(shù) executor.setCorePoolSize(10); // 配置最大線(xiàn)程數(shù) executor.setMaxPoolSize(20); // 配置隊(duì)列大小 executor.setQueueCapacity(200); // 配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴 executor.setThreadNamePrefix("async-Thread-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // abort:在調(diào)用executor執(zhí)行的方法中拋出異常 RejectedExecutionException executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //線(xiàn)程池增強(qiáng) threadPoolTaskExecutor.setTaskDecorator(new ContextTransferTaskDecorator()); // 執(zhí)行初始化 executor.initialize(); return executor; }
總結(jié)
上面兩種方式其實(shí)本質(zhì)都是通過(guò)Mdc來(lái)進(jìn)行異步線(xiàn)程間的traceId同步,可以看下Mdc的源碼,最終還是通過(guò)InheritableThreadLocal來(lái)實(shí)現(xiàn)子線(xiàn)程獲取父線(xiàn)程信息
public class BasicMDCAdapter implements MDCAdapter { private InheritableThreadLocal<Map<String, String>> inheritableThreadLocal = new InheritableThreadLocal<Map<String, String>>() { protected Map<String, String> childValue(Map<String, String> parentValue) { return parentValue == null ? null : new HashMap(parentValue); } }; //省略若干 ...... }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java:程序包org.apache.ibatis.annotations不存在報(bào)錯(cuò)解決
這篇文章主要給大家介紹了關(guān)于java:程序包org.apache.ibatis.annotations不存在報(bào)錯(cuò)的解決方法,這個(gè)錯(cuò)誤是我在直接導(dǎo)入springboot項(xiàng)目的時(shí)候報(bào)錯(cuò)的,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2023-04-04Java靜態(tài)static與實(shí)例instance方法示例
這篇文章主要為大家介紹了Java靜態(tài)static與實(shí)例instance方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08JDK8中新增的原子性操作類(lèi)LongAdder詳解
這篇文章主要給大家介紹了關(guān)于JDK8中新增的原子性操作類(lèi)LongAdder的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-08-08jackson json序列化實(shí)現(xiàn)首字母大寫(xiě),第二個(gè)字母需小寫(xiě)
這篇文章主要介紹了jackson json序列化實(shí)現(xiàn)首字母大寫(xiě),第二個(gè)字母需小寫(xiě)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Java中Cron表達(dá)式的生成解析及計(jì)算的工具類(lèi)完整代碼
這篇文章主要給大家介紹了關(guān)于Java中Cron表達(dá)式的生成解析及計(jì)算工具類(lèi)的相關(guān)資料,Cron表達(dá)式是一個(gè)字符串,字符串空格分割,每一個(gè)域代表一個(gè)含義,一個(gè)cron表達(dá)式有至少6個(gè),需要的朋友可以參考下2023-12-12