異步線程traceId如何實(shí)現(xiàn)傳遞
前言
在日常問(wèn)題排查中,我們經(jīng)常在ELK中根據(jù)traceId來(lái)查詢請(qǐng)求的日志鏈路,在同步請(qǐng)求中,根據(jù)traceId一站到底,很爽,那如果是異步請(qǐng)求該如何處理呢?
項(xiàng)目中的異步請(qǐng)求都是結(jié)合線程池來(lái)開啟異步線程,下面結(jié)合slf4j中的MDC和線程池來(lái)實(shí)現(xiàn)異步線程的traceId傳遞。
重寫ThreadPoolTaskExecutor中方法
下面的工具類,分別在Callable和Runnable異步任務(wù)執(zhí)行前通過(guò)MDC.setContextMap(context)設(shè)置請(qǐng)求映射上下文
import org.slf4j.MDC; import org.springframework.util.CollectionUtils; import java.util.Map; import java.util.concurrent.Callable; /** * @desc: 定義MDC工具類,支持Runnable和Callable兩種,目的就是為了把父線程的traceId設(shè)置給子線程 */ public class MdcUtil { public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) { return () -> { if (CollectionUtils.isEmpty(context)) { MDC.clear(); } else { MDC.setContextMap(context); } try { return callable.call(); } finally { // 清除子線程的,避免內(nèi)存溢出,就和ThreadLocal.remove()一個(gè)原因 MDC.clear(); } }; } public static Runnable wrap(final Runnable runnable, final Map<String, String> context) { return () -> { if (CollectionUtils.isEmpty(context)) { MDC.clear(); } else { MDC.setContextMap(context); } try { runnable.run(); } finally { MDC.clear(); } }; } }
下面定義一個(gè)ThreadPoolMdcExecutor 類來(lái)繼承ThreadPoolTaskExecutor 類,重寫execute和submit方法
import java.util.concurrent.Callable; import java.util.concurrent.Future; /** * @desc: 把當(dāng)前的traceId透?jìng)鞯阶泳€程特意加的實(shí)現(xiàn)。 * 重點(diǎn)就是 MDC.getCopyOfContextMap(),此方法獲取當(dāng)前線程(父線程)的traceId */ public class ThreadPoolMdcExecutor extends ThreadPoolTaskExecutor { @Override public void execute(Runnable task) { super.execute(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public Future<?> submit(Runnable task) { return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap())); } }
下面定義線程池,就可以使用ThreadPoolMdcExecutor
@Bean(name = "callBackExecutorConfig") public Executor callBackExecutorConfig() { ThreadPoolTaskExecutor executor = new ThreadPoolMdcExecutor(); // 配置核心線程數(shù) executor.setCorePoolSize(10); // 配置最大線程數(shù) executor.setMaxPoolSize(20); // 配置隊(duì)列大小 executor.setQueueCapacity(200); // 配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("async-Thread-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // abort:在調(diào)用executor執(zhí)行的方法中拋出異常 RejectedExecutionException executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 執(zhí)行初始化 executor.initialize(); return executor; }
定義好線程池之后,我們就可以使用callBackExecutorConfig線程池進(jìn)行異步任務(wù),避免異步線程中的traceId丟失。
線程池增強(qiáng)
上面是通過(guò)繼承ThreadPoolTaskExecutor來(lái),重寫execute和submit方法,設(shè)置MDC.setContextMap(context)設(shè)置上下文,我們也可以通過(guò)實(shí)現(xiàn)TaskDecorator 接口來(lái)增強(qiáng)線程池
public class ContextTransferTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { Map<String, String> context = MDC.getCopyOfContextMap(); RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); return () -> { try { MDC.setContextMap(context); RequestContextHolder.setRequestAttributes(requestAttributes); runnable.run(); } finally { MDC.clear(); RequestContextHolder.resetRequestAttributes(); } }; } }
接下來(lái),定義線程池,對(duì)線程池進(jìn)行增強(qiáng)
@Bean(name = "callBackExecutorConfig") public Executor callBackExecutorConfig() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor (); // 配置核心線程數(shù) executor.setCorePoolSize(10); // 配置最大線程數(shù) executor.setMaxPoolSize(20); // 配置隊(duì)列大小 executor.setQueueCapacity(200); // 配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("async-Thread-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // abort:在調(diào)用executor執(zhí)行的方法中拋出異常 RejectedExecutionException executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //線程池增強(qiáng) threadPoolTaskExecutor.setTaskDecorator(new ContextTransferTaskDecorator()); // 執(zhí)行初始化 executor.initialize(); return executor; }
總結(jié)
上面兩種方式其實(shí)本質(zhì)都是通過(guò)Mdc來(lái)進(jìn)行異步線程間的traceId同步,可以看下Mdc的源碼,最終還是通過(guò)InheritableThreadLocal來(lái)實(shí)現(xiàn)子線程獲取父線程信息
public class BasicMDCAdapter implements MDCAdapter { private InheritableThreadLocal<Map<String, String>> inheritableThreadLocal = new InheritableThreadLocal<Map<String, String>>() { protected Map<String, String> childValue(Map<String, String> parentValue) { return parentValue == null ? null : new HashMap(parentValue); } }; //省略若干 ...... }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java:程序包org.apache.ibatis.annotations不存在報(bào)錯(cuò)解決
這篇文章主要給大家介紹了關(guān)于java:程序包org.apache.ibatis.annotations不存在報(bào)錯(cuò)的解決方法,這個(gè)錯(cuò)誤是我在直接導(dǎo)入springboot項(xiàng)目的時(shí)候報(bào)錯(cuò)的,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2023-04-04Java靜態(tài)static與實(shí)例instance方法示例
這篇文章主要為大家介紹了Java靜態(tài)static與實(shí)例instance方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08jackson json序列化實(shí)現(xiàn)首字母大寫,第二個(gè)字母需小寫
這篇文章主要介紹了jackson json序列化實(shí)現(xiàn)首字母大寫,第二個(gè)字母需小寫方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Java中Cron表達(dá)式的生成解析及計(jì)算的工具類完整代碼
這篇文章主要給大家介紹了關(guān)于Java中Cron表達(dá)式的生成解析及計(jì)算工具類的相關(guān)資料,Cron表達(dá)式是一個(gè)字符串,字符串空格分割,每一個(gè)域代表一個(gè)含義,一個(gè)cron表達(dá)式有至少6個(gè),需要的朋友可以參考下2023-12-12