Java多線(xiàn)程父線(xiàn)程向子線(xiàn)程傳值問(wèn)題及解決
1 背景
在實(shí)際開(kāi)發(fā)過(guò)程中我們需要父子之間傳遞一些數(shù)據(jù),比如用戶(hù)信息,日志異步生成數(shù)據(jù)傳遞等,該文章從5種解決方案解決父子之間數(shù)據(jù)傳遞困擾
2 ThreadLocal+TaskDecorator
用戶(hù)工具類(lèi) UserUtils
/** *使用ThreadLocal存儲(chǔ)共享的數(shù)據(jù)變量,如登錄的用戶(hù)信息 */ public class UserUtils { private static final ThreadLocal<String> userLocal=new ThreadLocal<>(); public static String getUserId(){ return userLocal.get(); } public static void setUserId(String userId){ userLocal.set(userId); } public static void clear(){ userLocal.remove(); } }
自定義CustomTaskDecorator
/** * 線(xiàn)程池修飾類(lèi) */ public class CustomTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 獲取主線(xiàn)程中的請(qǐng)求信息(我們的用戶(hù)信息也放在里面) String robotId = UserUtils.getUserId(); System.out.println(robotId); return () -> { try { // 將主線(xiàn)程的請(qǐng)求信息,設(shè)置到子線(xiàn)程中 UserUtils.setUserId(robotId); // 執(zhí)行子線(xiàn)程,這一步不要忘了 runnable.run(); } finally { // 線(xiàn)程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 UserUtils.clear(); } }; } }
ExecutorConfig
在原來(lái)的基礎(chǔ)上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可視化運(yùn)行狀態(tài)的線(xiàn)程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心線(xiàn)程數(shù) executor.setCorePoolSize(corePoolSize); //配置最大線(xiàn)程數(shù) executor.setMaxPoolSize(maxPoolSize); //配置隊(duì)列大小 executor.setQueueCapacity(queueCapacity); //配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴 executor.setThreadNamePrefix(namePrefix); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線(xiàn)程中執(zhí)行任務(wù),而是有調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加線(xiàn)程池修飾類(lèi) executor.setTaskDecorator(new CustomTaskDecorator()); //增加MDC的線(xiàn)程池修飾類(lèi) //executor.setTaskDecorator(new MDCTaskDecorator()); //執(zhí)行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用ThreadLocal方式傳遞 * 帶有返回值 * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String> executeValueAsync2() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("異步線(xiàn)程執(zhí)行返回結(jié)果......+"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(UserUtils.getUserId()); }
Test2Controller
/** * 使用ThreadLocal+TaskDecorator的方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test2") public String test2() throws InterruptedException, ExecutionException { UserUtils.setUserId("123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync2(); String s = completableFuture.get(); return s; }
3 RequestContextHolder+TaskDecorator
自定義CustomTaskDecorator
/** * 線(xiàn)程池修飾類(lèi) */ public class CustomTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 獲取主線(xiàn)程中的請(qǐng)求信息(我們的用戶(hù)信息也放在里面) RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); return () -> { try { // 將主線(xiàn)程的請(qǐng)求信息,設(shè)置到子線(xiàn)程中 RequestContextHolder.setRequestAttributes(attributes); // 執(zhí)行子線(xiàn)程,這一步不要忘了 runnable.run(); } finally { // 線(xiàn)程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 RequestContextHolder.resetRequestAttributes(); } }; } }
ExecutorConfig
在原來(lái)的基礎(chǔ)上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可視化運(yùn)行狀態(tài)的線(xiàn)程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心線(xiàn)程數(shù) executor.setCorePoolSize(corePoolSize); //配置最大線(xiàn)程數(shù) executor.setMaxPoolSize(maxPoolSize); //配置隊(duì)列大小 executor.setQueueCapacity(queueCapacity); //配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴 executor.setThreadNamePrefix(namePrefix); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線(xiàn)程中執(zhí)行任務(wù),而是有調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加線(xiàn)程池修飾類(lèi) executor.setTaskDecorator(new CustomTaskDecorator()); //增加MDC的線(xiàn)程池修飾類(lèi) //executor.setTaskDecorator(new MDCTaskDecorator()); //執(zhí)行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用RequestAttributes獲取主線(xiàn)程傳遞的數(shù)據(jù) * @return * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String> executeValueAsync3() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("異步線(xiàn)程執(zhí)行返回結(jié)果......+"); RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); Object userId = attributes.getAttribute("userId", 0); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(userId.toString()); }
Test2Controller
/** * RequestContextHolder+TaskDecorator的方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test3") public String test3() throws InterruptedException, ExecutionException { RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); attributes.setAttribute("userId","123456",0); CompletableFuture<String> completableFuture = asyncService.executeValueAsync3(); String s = completableFuture.get(); return s; }
4 MDC+TaskDecorator
自定義MDCTaskDecorator
/** * 線(xiàn)程池修飾類(lèi) */ public class MDCTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 獲取主線(xiàn)程中的請(qǐng)求信息(我們的用戶(hù)信息也放在里面) String userId = MDC.get("userId"); Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap(); System.out.println(copyOfContextMap); return () -> { try { // 將主線(xiàn)程的請(qǐng)求信息,設(shè)置到子線(xiàn)程中 MDC.put("userId",userId); // 執(zhí)行子線(xiàn)程,這一步不要忘了 runnable.run(); } finally { // 線(xiàn)程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 MDC.clear(); } }; } }
ExecutorConfig
在原來(lái)的基礎(chǔ)上增加 executor.setTaskDecorator(new MDCTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可視化運(yùn)行狀態(tài)的線(xiàn)程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心線(xiàn)程數(shù) executor.setCorePoolSize(corePoolSize); //配置最大線(xiàn)程數(shù) executor.setMaxPoolSize(maxPoolSize); //配置隊(duì)列大小 executor.setQueueCapacity(queueCapacity); //配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴 executor.setThreadNamePrefix(namePrefix); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線(xiàn)程中執(zhí)行任務(wù),而是有調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加MDC的線(xiàn)程池修飾類(lèi) executor.setTaskDecorator(new MDCTaskDecorator()); //執(zhí)行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用MDC獲取主線(xiàn)程傳遞的數(shù)據(jù) * @return * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String> executeValueAsync5() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("異步線(xiàn)程執(zhí)行返回結(jié)果......+"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(MDC.get("userId")); }
Test2Controller
/** * 使用MDC+TaskDecorator方式 * 本質(zhì)也是ThreadLocal+TaskDecorator方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test5") public String test5() throws InterruptedException, ExecutionException { MDC.put("userId","123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync5(); String s = completableFuture.get(); return s; }
5 InheritableThreadLocal
測(cè)試代碼
public class TestThreadLocal { public static ThreadLocal<String> threadLocal = new ThreadLocal<>(); public static void main(String[] args) { //設(shè)置線(xiàn)程變量 threadLocal.set("hello world"); Thread thread = new Thread(new Runnable() { @Override public void run( ) { //子線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("thread:"+threadLocal.get()); } }); thread.start(); // 主線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("main:"+threadLocal.get()); } }
輸出結(jié)果:
main:hello world
thread:null
從上面結(jié)果可以看出:同一個(gè)ThreadLocal變量在父線(xiàn)程中被設(shè)置后,在子線(xiàn)程中是獲取不到的;
原因在子線(xiàn)程thread里面調(diào)用get方法時(shí)當(dāng)前線(xiàn)程為thread線(xiàn)程,而這里調(diào)用set方法設(shè)置線(xiàn)程變量的是main線(xiàn)程,兩者是不同的線(xiàn)程,自然子線(xiàn)程訪(fǎng)問(wèn)時(shí)返回null
為了解決上面的問(wèn)題,InheritableThreadLocal應(yīng)運(yùn)而生,InheritableThreadLocal繼承ThreadLocal,其提供一個(gè)特性,就是讓子線(xiàn)程可以訪(fǎng)問(wèn)在父線(xiàn)程中設(shè)置的本地變量
將上面測(cè)試代碼用InheritableThreadLocal修改
public class TestInheritableThreadLocal { public static InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>(); public static void main(String[] args) { //設(shè)置線(xiàn)程變量 threadLocal.set("hello world"); Thread thread = new Thread(new Runnable() { @Override public void run( ) { //子線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("thread:"+threadLocal.get()); } }); thread.start(); // 主線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("main:"+threadLocal.get()); } }
輸出結(jié)果:
main:hello world
thread:hello world
5.1 源碼分析
public class InheritableThreadLocal<T> extends ThreadLocal<T> { protected T childValue(T parentValue) { return parentValue; } ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }
InheritableThreadLocal 重寫(xiě)了childValue,getMap,createMap三個(gè)方法
在InheritableThreadLocal中,變量inheritableThreadLocals 替代了threadLocals;
那么如何讓子線(xiàn)程可以訪(fǎng)問(wèn)父線(xiàn)程的本地變量。這要從創(chuàng)建Thread的代碼說(shuō)起,打開(kāi)Thread類(lèi)的默認(rèn)構(gòu)造方法,代碼如下:
public Thread(Runnable target) { init(null, target, "Thread-" + nextThreadNum(), 0); } private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { if (name == null) { throw new NullPointerException("name cannot be null"); } this.name = name; //獲取當(dāng)前線(xiàn)程 Thread parent = currentThread(); //如果父線(xiàn)程的 inheritableThreadLocals變量不為null if (inheritThreadLocals && parent.inheritableThreadLocals != null) //設(shè)置子線(xiàn)程inheritThreadLocals變量 this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* Set thread ID */ tid = nextThreadID(); }
我們看下createInheritedMap代碼:
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
在createInheritedMap內(nèi)部使用父線(xiàn)程的inheritableThreadLocals變量作為構(gòu)造方法創(chuàng)建了一個(gè)新的ThreadLocalMap變量,然后賦值給子線(xiàn)程的inheritableThreadLocals變量。
下面看看ThreadLocalMap的構(gòu)造函數(shù)內(nèi)部做了什么事情;
private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } }
InheritableThreadLocal 類(lèi)通過(guò)重寫(xiě)下面代碼
ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); }
讓本地變量保存到了具體的線(xiàn)程的inheritableThreadLocals變量里面,那么線(xiàn)程在通過(guò)InheritableThreadLocal類(lèi)實(shí)例的set或者get方法設(shè)置變量時(shí),就會(huì)創(chuàng)建當(dāng)前線(xiàn)程的inheritableThreadLocals變量。
當(dāng)父線(xiàn)程創(chuàng)建子線(xiàn)程時(shí),構(gòu)造方法會(huì)把父線(xiàn)程中的inheritableThreadLocals變量里面的本地變量賦值一份保存到子線(xiàn)程的inheritableThreadLocals變量里面
5.2 InheritableThreadLocal存在的問(wèn)題
雖然InheritableThreadLocal可以解決在子線(xiàn)程中獲取父線(xiàn)程的值的問(wèn)題,但是在使用線(xiàn)程池的情況下,由于不同的任務(wù)有可能是同一個(gè)線(xiàn)程處理,因此這些任務(wù)取到的值有可能并不是父線(xiàn)程設(shè)置的值
測(cè)試目標(biāo):任務(wù)1和任務(wù)2 獲取父線(xiàn)程值一樣,為測(cè)試代碼中的hello world
測(cè)試代碼:
public class TestInheritableThreadLocaIssue { public static InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>(); public static ExecutorService executorService = Executors.newSingleThreadExecutor(); public static void main(String[] args) throws Exception { //設(shè)置線(xiàn)程變量 threadLocal.set("hello world"); Thread thread1 = new Thread(new Runnable() { @Override public void run( ) { //子線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("thread:"+threadLocal.get()); threadLocal.set("hello world 2"); } },"task1"); Thread thread2 = new Thread(new Runnable() { @Override public void run( ) { //子線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("thread:"+threadLocal.get()); threadLocal.set("hello world 2"); } },"task2"); executorService.submit(thread1).get(); executorService.submit(thread2).get(); // 主線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("main:"+threadLocal.get()); } }
輸出結(jié)果:
thread:hello world
thread:hello world 2
main:hello world
結(jié)果分析:
很明顯,任務(wù)2獲取的不是父線(xiàn)程設(shè)置的hello world ,而是線(xiàn)程1修改后的值。如果在線(xiàn)程池中使用,需要注意這種情況(可以備份備份父線(xiàn)程的值)
6 TransmittableThreadLocal
解決線(xiàn)程池化值傳遞
阿里封裝了一個(gè)工具,實(shí)現(xiàn)了在使用線(xiàn)程池等會(huì)池化復(fù)用線(xiàn)程的組件情況下,提供ThreadLocal值的傳遞功能,解決異步執(zhí)行時(shí)上下文傳遞的問(wèn)題
JDK的InheritableThreadLocal類(lèi)可以完成父線(xiàn)程到子線(xiàn)程的值傳遞。但對(duì)于使用線(xiàn)程池等會(huì)池化復(fù)用線(xiàn)程的執(zhí)行組件的情況,線(xiàn)程由線(xiàn)程池創(chuàng)建好,并且線(xiàn)程是池化起來(lái)反復(fù)使用的;
這時(shí)父子線(xiàn)程關(guān)系的ThreadLocal值傳遞已經(jīng)沒(méi)有意義,應(yīng)用需要的實(shí)際上是把 任務(wù)提交給線(xiàn)程池時(shí)的ThreadLocal值傳遞到 任務(wù)執(zhí)行時(shí)
https://github.com/alibaba/transmittable-thread-local
引入:
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.11.5</version> </dependency>
需求場(chǎng)景:
- 1.分布式跟蹤系統(tǒng) 或 全鏈路壓測(cè)(即鏈路打標(biāo))
- 2.日志收集記錄系統(tǒng)上下文
- 3.Session級(jí)Cache
- 4.應(yīng)用容器或上層框架跨應(yīng)用代碼給下層SDK傳遞信息
測(cè)試代碼:
1)父子線(xiàn)程信息傳遞
public static TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>(); public static void main(String[] args) { //設(shè)置線(xiàn)程變量 threadLocal.set("hello world"); Thread thread = new Thread(new Runnable() { @Override public void run( ) { //子線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("thread:"+threadLocal.get()); } }); thread.start(); // 主線(xiàn)程輸出線(xiàn)程變量的值 System.out.println("main:"+threadLocal.get()); } }
輸出結(jié)果:
main:hello world
thread:hello world
2)線(xiàn)程池中傳遞值,參考github:修飾線(xiàn)程池
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
用SpringMVC編寫(xiě)一個(gè)HelloWorld的詳細(xì)過(guò)程
SpringMVC是Spring的一個(gè)后續(xù)產(chǎn)品,是Spring的一個(gè)子項(xiàng)目<BR>SpringMVC?是?Spring?為表述層開(kāi)發(fā)提供的一整套完備的解決方案,本文我們將用SpringMVC編寫(xiě)一個(gè)HelloWorld,文中有詳細(xì)的編寫(xiě)過(guò)程,需要的朋友可以參考下2023-08-08Java?Spring?AOP源碼解析之事務(wù)實(shí)現(xiàn)原理
這篇文章主要為大家介紹了Java?Spring?AOP事務(wù)實(shí)現(xiàn)原理,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-01-01詳解Spring Data Jpa當(dāng)屬性為Null也更新的完美解決方案
這篇文章主要介紹了詳解Spring Data Jpa當(dāng)屬性為Null也更新的完美解決方案,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-02-02Java快速實(shí)現(xiàn)圖書(shū)管理基本功能
隨著網(wǎng)絡(luò)技術(shù)的高速發(fā)展,計(jì)算機(jī)應(yīng)用的普及,利用計(jì)算機(jī)對(duì)圖書(shū)館的日常工作進(jìn)行管理勢(shì)在必行,本篇文章涵蓋一個(gè)圖書(shū)管理系統(tǒng)的基本功能實(shí)現(xiàn)代碼,大家可以查缺補(bǔ)漏,提升水平2022-05-05Spring通過(guò)<import>標(biāo)簽導(dǎo)入外部配置文件
之前文章里我們講到Spring加載Xml配置文件的細(xì)節(jié),那么加載完了我們肯定要解析這個(gè)配置文件中定義的元素。這篇我們首先來(lái)分析下Spring是如何通過(guò)標(biāo)簽導(dǎo)入外部配置文件的。2021-06-06SpringIOC容器Bean的作用域及生命周期實(shí)例
這篇文章主要為大家介紹了SpringIOC容器Bean的作用域及生命周期實(shí)例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05SpringBoot 集成 ShedLock 分布式鎖的示例詳解
ShedLock是一個(gè)在分布式環(huán)境中使用的定時(shí)任務(wù)框架,用于解決在分布式環(huán)境中的多個(gè)實(shí)例的相同定時(shí)任務(wù)在同一時(shí)間點(diǎn)重復(fù)執(zhí)行的問(wèn)題,本文重點(diǎn)給大家介紹SpringBoot 分布式鎖ShedLock的相關(guān)知識(shí),感興趣的朋友一起看看吧2021-08-08