Java多線程父線程向子線程傳值問(wèn)題及解決
1 背景
在實(shí)際開(kāi)發(fā)過(guò)程中我們需要父子之間傳遞一些數(shù)據(jù),比如用戶信息,日志異步生成數(shù)據(jù)傳遞等,該文章從5種解決方案解決父子之間數(shù)據(jù)傳遞困擾
2 ThreadLocal+TaskDecorator
用戶工具類 UserUtils
/**
*使用ThreadLocal存儲(chǔ)共享的數(shù)據(jù)變量,如登錄的用戶信息
*/
public class UserUtils {
private static final ThreadLocal<String> userLocal=new ThreadLocal<>();
public static String getUserId(){
return userLocal.get();
}
public static void setUserId(String userId){
userLocal.set(userId);
}
public static void clear(){
userLocal.remove();
}
}自定義CustomTaskDecorator
/**
* 線程池修飾類
*/
public class CustomTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請(qǐng)求信息(我們的用戶信息也放在里面)
String robotId = UserUtils.getUserId();
System.out.println(robotId);
return () -> {
try {
// 將主線程的請(qǐng)求信息,設(shè)置到子線程中
UserUtils.setUserId(robotId);
// 執(zhí)行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏
UserUtils.clear();
}
};
}
}ExecutorConfig
在原來(lái)的基礎(chǔ)上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor----------------");
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//使用可視化運(yùn)行狀態(tài)的線程池
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(corePoolSize);
//配置最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//配置隊(duì)列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加線程池修飾類
executor.setTaskDecorator(new CustomTaskDecorator());
//增加MDC的線程池修飾類
//executor.setTaskDecorator(new MDCTaskDecorator());
//執(zhí)行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}AsyncServiceImpl
/**
* 使用ThreadLocal方式傳遞
* 帶有返回值
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync2() throws InterruptedException {
log.info("start executeValueAsync");
System.out.println("異步線程執(zhí)行返回結(jié)果......+");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(UserUtils.getUserId());
}
Test2Controller
/**
* 使用ThreadLocal+TaskDecorator的方式
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test2")
public String test2() throws InterruptedException, ExecutionException {
UserUtils.setUserId("123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync2();
String s = completableFuture.get();
return s;
}3 RequestContextHolder+TaskDecorator
自定義CustomTaskDecorator
/**
* 線程池修飾類
*/
public class CustomTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請(qǐng)求信息(我們的用戶信息也放在里面)
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
return () -> {
try {
// 將主線程的請(qǐng)求信息,設(shè)置到子線程中
RequestContextHolder.setRequestAttributes(attributes);
// 執(zhí)行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏
RequestContextHolder.resetRequestAttributes();
}
};
}
}ExecutorConfig
在原來(lái)的基礎(chǔ)上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor----------------");
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//使用可視化運(yùn)行狀態(tài)的線程池
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(corePoolSize);
//配置最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//配置隊(duì)列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加線程池修飾類
executor.setTaskDecorator(new CustomTaskDecorator());
//增加MDC的線程池修飾類
//executor.setTaskDecorator(new MDCTaskDecorator());
//執(zhí)行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}AsyncServiceImpl
/**
* 使用RequestAttributes獲取主線程傳遞的數(shù)據(jù)
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync3() throws InterruptedException {
log.info("start executeValueAsync");
System.out.println("異步線程執(zhí)行返回結(jié)果......+");
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
Object userId = attributes.getAttribute("userId", 0);
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(userId.toString());
}Test2Controller
/**
* RequestContextHolder+TaskDecorator的方式
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test3")
public String test3() throws InterruptedException, ExecutionException {
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
attributes.setAttribute("userId","123456",0);
CompletableFuture<String> completableFuture = asyncService.executeValueAsync3();
String s = completableFuture.get();
return s;
}4 MDC+TaskDecorator
自定義MDCTaskDecorator
/**
* 線程池修飾類
*/
public class MDCTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請(qǐng)求信息(我們的用戶信息也放在里面)
String userId = MDC.get("userId");
Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
System.out.println(copyOfContextMap);
return () -> {
try {
// 將主線程的請(qǐng)求信息,設(shè)置到子線程中
MDC.put("userId",userId);
// 執(zhí)行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏
MDC.clear();
}
};
}
}ExecutorConfig
在原來(lái)的基礎(chǔ)上增加 executor.setTaskDecorator(new MDCTaskDecorator());
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor----------------");
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//使用可視化運(yùn)行狀態(tài)的線程池
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(corePoolSize);
//配置最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//配置隊(duì)列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加MDC的線程池修飾類
executor.setTaskDecorator(new MDCTaskDecorator());
//執(zhí)行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}AsyncServiceImpl
/**
* 使用MDC獲取主線程傳遞的數(shù)據(jù)
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync5() throws InterruptedException {
log.info("start executeValueAsync");
System.out.println("異步線程執(zhí)行返回結(jié)果......+");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(MDC.get("userId"));
}Test2Controller
/**
* 使用MDC+TaskDecorator方式
* 本質(zhì)也是ThreadLocal+TaskDecorator方式
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test5")
public String test5() throws InterruptedException, ExecutionException {
MDC.put("userId","123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync5();
String s = completableFuture.get();
return s;
}5 InheritableThreadLocal
測(cè)試代碼
public class TestThreadLocal {
public static ThreadLocal<String> threadLocal = new ThreadLocal<>();
public static void main(String[] args) {
//設(shè)置線程變量
threadLocal.set("hello world");
Thread thread = new Thread(new Runnable() {
@Override
public void run( ) {
//子線程輸出線程變量的值
System.out.println("thread:"+threadLocal.get());
}
});
thread.start();
// 主線程輸出線程變量的值
System.out.println("main:"+threadLocal.get());
}
}輸出結(jié)果:
main:hello world
thread:null
從上面結(jié)果可以看出:同一個(gè)ThreadLocal變量在父線程中被設(shè)置后,在子線程中是獲取不到的;
原因在子線程thread里面調(diào)用get方法時(shí)當(dāng)前線程為thread線程,而這里調(diào)用set方法設(shè)置線程變量的是main線程,兩者是不同的線程,自然子線程訪問(wèn)時(shí)返回null
為了解決上面的問(wèn)題,InheritableThreadLocal應(yīng)運(yùn)而生,InheritableThreadLocal繼承ThreadLocal,其提供一個(gè)特性,就是讓子線程可以訪問(wèn)在父線程中設(shè)置的本地變量
將上面測(cè)試代碼用InheritableThreadLocal修改
public class TestInheritableThreadLocal {
public static InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
public static void main(String[] args) {
//設(shè)置線程變量
threadLocal.set("hello world");
Thread thread = new Thread(new Runnable() {
@Override
public void run( ) {
//子線程輸出線程變量的值
System.out.println("thread:"+threadLocal.get());
}
});
thread.start();
// 主線程輸出線程變量的值
System.out.println("main:"+threadLocal.get());
}
}輸出結(jié)果:
main:hello world
thread:hello world
5.1 源碼分析
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
protected T childValue(T parentValue) {
return parentValue;
}
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}InheritableThreadLocal 重寫(xiě)了childValue,getMap,createMap三個(gè)方法
在InheritableThreadLocal中,變量inheritableThreadLocals 替代了threadLocals;
那么如何讓子線程可以訪問(wèn)父線程的本地變量。這要從創(chuàng)建Thread的代碼說(shuō)起,打開(kāi)Thread類的默認(rèn)構(gòu)造方法,代碼如下:
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
//獲取當(dāng)前線程
Thread parent = currentThread();
//如果父線程的 inheritableThreadLocals變量不為null
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
//設(shè)置子線程inheritThreadLocals變量
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
}我們看下createInheritedMap代碼:
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
在createInheritedMap內(nèi)部使用父線程的inheritableThreadLocals變量作為構(gòu)造方法創(chuàng)建了一個(gè)新的ThreadLocalMap變量,然后賦值給子線程的inheritableThreadLocals變量。
下面看看ThreadLocalMap的構(gòu)造函數(shù)內(nèi)部做了什么事情;
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}InheritableThreadLocal 類通過(guò)重寫(xiě)下面代碼
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
/**
* Create the map associated with a ThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the table.
*/
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}讓本地變量保存到了具體的線程的inheritableThreadLocals變量里面,那么線程在通過(guò)InheritableThreadLocal類實(shí)例的set或者get方法設(shè)置變量時(shí),就會(huì)創(chuàng)建當(dāng)前線程的inheritableThreadLocals變量。
當(dāng)父線程創(chuàng)建子線程時(shí),構(gòu)造方法會(huì)把父線程中的inheritableThreadLocals變量里面的本地變量賦值一份保存到子線程的inheritableThreadLocals變量里面
5.2 InheritableThreadLocal存在的問(wèn)題
雖然InheritableThreadLocal可以解決在子線程中獲取父線程的值的問(wèn)題,但是在使用線程池的情況下,由于不同的任務(wù)有可能是同一個(gè)線程處理,因此這些任務(wù)取到的值有可能并不是父線程設(shè)置的值
測(cè)試目標(biāo):任務(wù)1和任務(wù)2 獲取父線程值一樣,為測(cè)試代碼中的hello world
測(cè)試代碼:
public class TestInheritableThreadLocaIssue {
public static InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
public static ExecutorService executorService = Executors.newSingleThreadExecutor();
public static void main(String[] args) throws Exception {
//設(shè)置線程變量
threadLocal.set("hello world");
Thread thread1 = new Thread(new Runnable() {
@Override
public void run( ) {
//子線程輸出線程變量的值
System.out.println("thread:"+threadLocal.get());
threadLocal.set("hello world 2");
}
},"task1");
Thread thread2 = new Thread(new Runnable() {
@Override
public void run( ) {
//子線程輸出線程變量的值
System.out.println("thread:"+threadLocal.get());
threadLocal.set("hello world 2");
}
},"task2");
executorService.submit(thread1).get();
executorService.submit(thread2).get();
// 主線程輸出線程變量的值
System.out.println("main:"+threadLocal.get());
}
}輸出結(jié)果:
thread:hello world
thread:hello world 2
main:hello world
結(jié)果分析:
很明顯,任務(wù)2獲取的不是父線程設(shè)置的hello world ,而是線程1修改后的值。如果在線程池中使用,需要注意這種情況(可以備份備份父線程的值)
6 TransmittableThreadLocal
解決線程池化值傳遞
阿里封裝了一個(gè)工具,實(shí)現(xiàn)了在使用線程池等會(huì)池化復(fù)用線程的組件情況下,提供ThreadLocal值的傳遞功能,解決異步執(zhí)行時(shí)上下文傳遞的問(wèn)題
JDK的InheritableThreadLocal類可以完成父線程到子線程的值傳遞。但對(duì)于使用線程池等會(huì)池化復(fù)用線程的執(zhí)行組件的情況,線程由線程池創(chuàng)建好,并且線程是池化起來(lái)反復(fù)使用的;
這時(shí)父子線程關(guān)系的ThreadLocal值傳遞已經(jīng)沒(méi)有意義,應(yīng)用需要的實(shí)際上是把 任務(wù)提交給線程池時(shí)的ThreadLocal值傳遞到 任務(wù)執(zhí)行時(shí)
https://github.com/alibaba/transmittable-thread-local
引入:
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.11.5</version> </dependency>
需求場(chǎng)景:
- 1.分布式跟蹤系統(tǒng) 或 全鏈路壓測(cè)(即鏈路打標(biāo))
- 2.日志收集記錄系統(tǒng)上下文
- 3.Session級(jí)Cache
- 4.應(yīng)用容器或上層框架跨應(yīng)用代碼給下層SDK傳遞信息
測(cè)試代碼:
1)父子線程信息傳遞
public static TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
public static void main(String[] args) {
//設(shè)置線程變量
threadLocal.set("hello world");
Thread thread = new Thread(new Runnable() {
@Override
public void run( ) {
//子線程輸出線程變量的值
System.out.println("thread:"+threadLocal.get());
}
});
thread.start();
// 主線程輸出線程變量的值
System.out.println("main:"+threadLocal.get());
}
}輸出結(jié)果:
main:hello world
thread:hello world
2)線程池中傳遞值,參考github:修飾線程池
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
用SpringMVC編寫(xiě)一個(gè)HelloWorld的詳細(xì)過(guò)程
SpringMVC是Spring的一個(gè)后續(xù)產(chǎn)品,是Spring的一個(gè)子項(xiàng)目<BR>SpringMVC?是?Spring?為表述層開(kāi)發(fā)提供的一整套完備的解決方案,本文我們將用SpringMVC編寫(xiě)一個(gè)HelloWorld,文中有詳細(xì)的編寫(xiě)過(guò)程,需要的朋友可以參考下2023-08-08
Java?Spring?AOP源碼解析之事務(wù)實(shí)現(xiàn)原理
這篇文章主要為大家介紹了Java?Spring?AOP事務(wù)實(shí)現(xiàn)原理,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-01-01
詳解Spring Data Jpa當(dāng)屬性為Null也更新的完美解決方案
這篇文章主要介紹了詳解Spring Data Jpa當(dāng)屬性為Null也更新的完美解決方案,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-02-02
Java快速實(shí)現(xiàn)圖書(shū)管理基本功能
隨著網(wǎng)絡(luò)技術(shù)的高速發(fā)展,計(jì)算機(jī)應(yīng)用的普及,利用計(jì)算機(jī)對(duì)圖書(shū)館的日常工作進(jìn)行管理勢(shì)在必行,本篇文章涵蓋一個(gè)圖書(shū)管理系統(tǒng)的基本功能實(shí)現(xiàn)代碼,大家可以查缺補(bǔ)漏,提升水平2022-05-05
Spring通過(guò)<import>標(biāo)簽導(dǎo)入外部配置文件
之前文章里我們講到Spring加載Xml配置文件的細(xì)節(jié),那么加載完了我們肯定要解析這個(gè)配置文件中定義的元素。這篇我們首先來(lái)分析下Spring是如何通過(guò)標(biāo)簽導(dǎo)入外部配置文件的。2021-06-06
SpringIOC容器Bean的作用域及生命周期實(shí)例
這篇文章主要為大家介紹了SpringIOC容器Bean的作用域及生命周期實(shí)例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
SpringBoot 集成 ShedLock 分布式鎖的示例詳解
ShedLock是一個(gè)在分布式環(huán)境中使用的定時(shí)任務(wù)框架,用于解決在分布式環(huán)境中的多個(gè)實(shí)例的相同定時(shí)任務(wù)在同一時(shí)間點(diǎn)重復(fù)執(zhí)行的問(wèn)題,本文重點(diǎn)給大家介紹SpringBoot 分布式鎖ShedLock的相關(guān)知識(shí),感興趣的朋友一起看看吧2021-08-08

