SpringBoot異步線程父子線程數(shù)據(jù)傳遞的5種方式
SpringBoot異步線程父子線程數(shù)據(jù)傳遞的5種方式
1、ThreadLocal+TaskDecorator
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;
/**
* @author tom
* 使用ThreadLocal存儲(chǔ)共享的數(shù)據(jù)變量,如登錄的用戶信息
*/
public class UserUtils {
private static final ThreadLocal<String> userLocal = new ThreadLocal<>();
public static String getUserId() {
return userLocal.get();
}
public static void setUserId(String userId) {
userLocal.set(userId);
}
public static void clear() {
userLocal.remove();
}
}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;
import org.springframework.core.task.TaskDecorator;
/**
* @author tom
* 線程池修飾類
*/
public class CustomTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請(qǐng)求信息(我們的用戶信息也放在里面)
String robotId = UserUtils.getUserId();
return () -> {
try {
// 將主線程的請(qǐng)求信息,設(shè)置到子線程中
UserUtils.setUserId(robotId);
// 執(zhí)行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏
UserUtils.clear();
}
};
}
}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class ExecutorConfig {
@Bean(name = "asyncServiceExecutor1")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor------------");
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 使用可視化運(yùn)行狀態(tài)的線程池
ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
// 配置核心線程數(shù)
executor.setCorePoolSize(2);
// 配置最大線程數(shù)
executor.setMaxPoolSize(2);
// 配置隊(duì)列大小
executor.setQueueCapacity(60);
// 配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("my-thread");
// rejection-policy: 當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 增加線程池修飾類
executor.setTaskDecorator(new CustomTaskDecorator());
// 執(zhí)行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
/**
* @author tom
*/
@Slf4j
@Service("asyncServiceImpl1")
public class AsyncServiceImpl {
/**
* 使用ThreadLocal方式傳遞
* 帶有返回值
*
* @throws InterruptedException
*/
@Async("asyncServiceExecutor1")
public CompletableFuture<String> executeValueAsync1() {
log.info("start executeValueAsync");
System.out.println("異步線程執(zhí)行返回結(jié)果......");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(UserUtils.getUserId());
}
}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author tom
*/
@Slf4j
@RestController
public class Test1Controller {
private AsyncServiceImpl asyncService;
@Autowired
@Qualifier(value = "asyncServiceImpl1")
public void setAsyncService(AsyncServiceImpl asyncService) {
this.asyncService = asyncService;
}
/**
* 使用ThreadLocal+TaskDecorator的方式
*
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test1")
public String test1() throws ExecutionException, InterruptedException {
UserUtils.setUserId("123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync1();
String s = completableFuture.get();
log.info("result {}", s);
return s;
}
}
2、RequestContextHolder+TaskDecorator
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;
import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
/**
* @author tom
* 線程池修飾類
*/
public class CustomTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請(qǐng)求信息(我們的用戶信息也放在里面)
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
return () -> {
try {
// 將主線程的請(qǐng)求信息,設(shè)置到子線程中
RequestContextHolder.setRequestAttributes(attributes);
// 執(zhí)行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏
RequestContextHolder.resetRequestAttributes();
}
};
}
}
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class ExecutorConfig {
@Bean(name = "asyncServiceExecutor2")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor------------");
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 使用可視化運(yùn)行狀態(tài)的線程池
ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
// 配置核心線程數(shù)
executor.setCorePoolSize(2);
// 配置最大線程數(shù)
executor.setMaxPoolSize(2);
// 配置隊(duì)列大小
executor.setQueueCapacity(60);
// 配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("my-thread");
// rejection-policy: 當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加線程池修飾類
executor.setTaskDecorator(new CustomTaskDecorator());
// 執(zhí)行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
}
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.concurrent.CompletableFuture;
/**
* @author tom
*/
@Slf4j
@Service("asyncServiceImpl2")
public class AsyncServiceImpl {
/**
* 使用RequestAttributes獲取主線程傳遞的數(shù)據(jù)
*
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor2")
public CompletableFuture<String> executeValueAsync2(){
log.info("start executeValueAsync");
System.out.println("異步線程執(zhí)行返回結(jié)果......");
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
Object userId = attributes.getAttribute("userId", 0);
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(userId.toString());
}
}
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author tom
*/
@Slf4j
@RestController
public class Test2Controller {
private AsyncServiceImpl asyncService;
@Autowired
@Qualifier(value = "asyncServiceImpl2")
public void setAsyncService(AsyncServiceImpl asyncService) {
this.asyncService = asyncService;
}
/**
* RequestContextHolder+TaskDecorator的方式
*
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test2")
public String test2() throws InterruptedException, ExecutionException {
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
attributes.setAttribute("userId", "123456", 0);
CompletableFuture<String> completableFuture = asyncService.executeValueAsync2();
String s = completableFuture.get();
log.info("result {}", s);
return s;
}
}
3、MDC+TaskDecorator
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;
import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;
/**
* @author tom
* 線程池修飾類
*/
public class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請(qǐng)求信息(我們的用戶信息也放在里面)
String userId = MDC.get("userId");
return () -> {
try {
// 將主線程的請(qǐng)求信息,設(shè)置到子線程中
MDC.put("userId", userId);
// 執(zhí)行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏
MDC.clear();
}
};
}
}
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class ExecutorConfig {
@Bean(name = "asyncServiceExecutor3")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor------------");
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 使用可視化運(yùn)行狀態(tài)的線程池
ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
// 配置核心線程數(shù)
executor.setCorePoolSize(2);
// 配置最大線程數(shù)
executor.setMaxPoolSize(2);
// 配置隊(duì)列大小
executor.setQueueCapacity(60);
// 配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("my-thread");
// rejection-policy: 當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加MDC的線程池修飾類
executor.setTaskDecorator(new MdcTaskDecorator());
//執(zhí)行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
}
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
/**
* @author tom
*/
@Slf4j
@Service("asyncServiceImpl3")
public class AsyncServiceImpl {
/**
* 使用MDC獲取主線程傳遞的數(shù)據(jù)
*
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor3")
public CompletableFuture<String> executeValueAsync3() {
log.info("start executeValueAsync");
System.out.println("異步線程執(zhí)行返回結(jié)果......");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(MDC.get("userId"));
}
}
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author tom
*/
@Slf4j
@RestController
public class Test3Controller {
private AsyncServiceImpl asyncService;
@Autowired
@Qualifier(value = "asyncServiceImpl3")
public void setAsyncService(AsyncServiceImpl asyncService) {
this.asyncService = asyncService;
}
/**
* 使用MDC+TaskDecorator方式
* 本質(zhì)也是ThreadLocal+TaskDecorator方式
*
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test3")
public String test3() throws InterruptedException, ExecutionException {
MDC.put("userId", "123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync3();
String s = completableFuture.get();
log.info("result {}", s);
return s;
}
}
4、InheritableThreadLocal
package com.example.parentchildthreadspassingdata.inheritablethreadlocal;
/**
* @author tom
* 使用InheritableThreadLocal存儲(chǔ)線程之間共享的數(shù)據(jù)變量,如登錄的用戶信息
*/
public class UserInheritableUtils {
private static final InheritableThreadLocal<String> userLocal = new InheritableThreadLocal<>();
public static String getUserId() {
return userLocal.get();
}
public static void setUserId(String userId) {
userLocal.set(userId);
}
public static void clear() {
userLocal.remove();
}
}
package com.example.parentchildthreadspassingdata.inheritablethreadlocal;
import com.example.parentchildthreadspassingdata.mdc_taskdecorator.VisibleThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class ExecutorConfig {
@Bean(name = "asyncServiceExecutor4")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor------------");
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 使用可視化運(yùn)行狀態(tài)的線程池
ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
// 配置核心線程數(shù)
executor.setCorePoolSize(2);
// 配置最大線程數(shù)
executor.setMaxPoolSize(2);
// 配置隊(duì)列大小
executor.setQueueCapacity(60);
// 配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("my-thread");
// rejection-policy: 當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執(zhí)行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
}
package com.example.parentchildthreadspassingdata.inheritablethreadlocal;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
/**
* @author
*/
@Slf4j
@Service("asyncServiceImpl4")
public class AsyncServiceImpl {
/**
* 使用InheritableThreadLocal獲取主線程傳遞的數(shù)據(jù)
*
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor4")
public CompletableFuture<String> executeValueAsync4() {
log.info("start executeValueAsync");
System.out.println("異步線程執(zhí)行返回結(jié)果......");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(UserInheritableUtils.getUserId());
}
}
package com.example.parentchildthreadspassingdata.inheritablethreadlocal;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author tom
*/
@Slf4j
@RestController
public class Test4Controller {
private AsyncServiceImpl asyncService;
@Autowired
@Qualifier(value = "asyncServiceImpl4")
public void setAsyncService(AsyncServiceImpl asyncService) {
this.asyncService = asyncService;
}
/**
* 使用InheritableThreadLocal方式
*
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test4")
public String test4() throws InterruptedException, ExecutionException {
UserInheritableUtils.setUserId("123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync4();
String s = completableFuture.get();
log.info("result {}", s);
return s;
}
}
5、TransmittableThreadLocal
package com.example.parentchildthreadspassingdata.transmittablethreadlocal;
import com.alibaba.ttl.TransmittableThreadLocal;
/**
* @author tom
* 使用TransmittableThreadLocal存儲(chǔ)線程之間共享的數(shù)據(jù)變量,如登錄的用戶信息
*/
public class UserTransmittableUtils {
private static final TransmittableThreadLocal<String> userLocal = new TransmittableThreadLocal<>();
public static String getUserId() {
return userLocal.get();
}
public static void setUserId(String userId) {
userLocal.set(userId);
}
public static void clear() {
userLocal.remove();
}
}
package com.example.parentchildthreadspassingdata.transmittablethreadlocal;
import com.example.parentchildthreadspassingdata.mdc_taskdecorator.VisibleThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class ExecutorConfig {
@Bean(name = "asyncServiceExecutor5")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor------------");
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 使用可視化運(yùn)行狀態(tài)的線程池
ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
// 配置核心線程數(shù)
executor.setCorePoolSize(2);
// 配置最大線程數(shù)
executor.setMaxPoolSize(2);
// 配置隊(duì)列大小
executor.setQueueCapacity(60);
// 配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("my-thread");
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 執(zhí)行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
}
package com.example.parentchildthreadspassingdata.transmittablethreadlocal;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
/**
* @author tom
*/
@Slf4j
@Service("asyncServiceImpl5")
public class AsyncServiceImpl {
/**
* 使用TransmittableThreadLocal獲取主線程傳遞的數(shù)據(jù)
*
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor5")
public CompletableFuture<String> executeValueAsync5() {
log.info("start executeValueAsync");
System.out.println("異步線程執(zhí)行返回結(jié)果......");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(UserTransmittableUtils.getUserId());
}
}
package com.example.parentchildthreadspassingdata.transmittablethreadlocal;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author tom
*/
@RestController
@Slf4j
public class Test5Controller {
private AsyncServiceImpl asyncService;
@Autowired
@Qualifier(value = "asyncServiceImpl5")
public void setAsyncService(AsyncServiceImpl asyncService) {
this.asyncService = asyncService;
}
/**
* 使用TransmittableThreadLocal方式
*
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test5")
public String test5() throws InterruptedException, ExecutionException {
UserTransmittableUtils.setUserId("123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync5();
String s = completableFuture.get();
log.info("result {}", s);
return s;
}
}
6、方案對(duì)比
方案1,方案2,方案3主要是借助 TaskDecorator 進(jìn)行父子線程之間傳遞數(shù)據(jù)。其中 MDC 方案主要借鑒于 MDC
的日志跟蹤的思想來(lái)實(shí)現(xiàn)。
方案4和方案5使用 InheritableThreadLocal 和 TransmittableThreadLocal 來(lái)實(shí)現(xiàn),其中
TransmittableThreadLocal 是阿里 InheritableThreadLocal 進(jìn)行優(yōu)化封裝。
本人推薦使用方案5。
以上就是SpringBoot異步線程父子線程數(shù)據(jù)傳遞的5種方式的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot父子線程數(shù)據(jù)傳遞的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 在 Spring Boot 中使用異步線程時(shí)的 HttpServletRequest 復(fù)用問(wèn)題記錄
- Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式
- springboot?正確的在異步線程中使用request的示例代碼
- SpringBoot?異步線程間傳遞上下文方式
- SpringBoot獲取HttpServletRequest的3種方式總結(jié)
- SpringBoot詳細(xì)講解異步任務(wù)如何獲取HttpServletRequest
- SpringBoot實(shí)現(xiàn)任意位置獲取HttpServletRequest對(duì)象
- Spring?Boot?中正確地在異步線程中使用?HttpServletRequest的方法
相關(guān)文章
SpringBoot在生產(chǎn)快速禁用Swagger2的方法步驟
這篇文章主要介紹了SpringBoot在生產(chǎn)快速禁用Swagger2的方法步驟,使用注解關(guān)閉Swagger2,避免接口重復(fù)暴露,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2018-12-12
Gradle環(huán)境下導(dǎo)出Swagger為PDF的步驟詳解
這篇文章主要介紹了Gradle環(huán)境下導(dǎo)出Swagger為PDF的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06
java實(shí)現(xiàn)的統(tǒng)計(jì)字符算法示例
這篇文章主要介紹了java實(shí)現(xiàn)的統(tǒng)計(jì)字符算法,涉及java針對(duì)字符的遍歷、判斷、運(yùn)算等相關(guān)操作技巧,需要的朋友可以參考下2017-10-10
SpringCloud組件OpenFeign之默認(rèn)HTTP請(qǐng)求方式詳解
這篇文章主要介紹了SpringCloud組件OpenFeign之默認(rèn)HTTP請(qǐng)求方式詳解,在SpringMvcContract類中有個(gè)這樣的方法processAnnotationOnMethod,見(jiàn)名思意,這個(gè)方法就是處理Feign接口下方法上的注解的,需要的朋友可以參考下2024-01-01
Spring?Boot?中的?@DateTimeFormat?和?@JsonFormat?的用法及作用詳解
本文介紹了SpringBoot中的@DateTimeFormat和@JsonFormat注解的用法,解釋了它們?cè)谔幚砣掌诤蜁r(shí)間數(shù)據(jù)時(shí)的作用,并通過(guò)實(shí)例代碼展示了如何在REST控制器中使用這些注解,感興趣的朋友跟隨小編一起看看吧2024-11-11
基于Java的界面開(kāi)發(fā)詳細(xì)步驟(用戶注冊(cè)登錄)
通過(guò)一段時(shí)間Java Web的學(xué)習(xí),寫(xiě)一個(gè)簡(jiǎn)單的注冊(cè)登陸界面來(lái)做個(gè)總結(jié),這篇文章主要給大家介紹了基于Java的界面開(kāi)發(fā)(用戶注冊(cè)登錄)的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01

