SpringBoot異步線程父子線程數(shù)據(jù)傳遞的5種方式
SpringBoot異步線程父子線程數(shù)據(jù)傳遞的5種方式
1、ThreadLocal+TaskDecorator
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator; /** * @author tom * 使用ThreadLocal存儲共享的數(shù)據(jù)變量,如登錄的用戶信息 */ public class UserUtils { private static final ThreadLocal<String> userLocal = new ThreadLocal<>(); public static String getUserId() { return userLocal.get(); } public static void setUserId(String userId) { userLocal.set(userId); } public static void clear() { userLocal.remove(); } }
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator; import org.springframework.core.task.TaskDecorator; /** * @author tom * 線程池修飾類 */ public class CustomTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 獲取主線程中的請求信息(我們的用戶信息也放在里面) String robotId = UserUtils.getUserId(); return () -> { try { // 將主線程的請求信息,設(shè)置到子線程中 UserUtils.setUserId(robotId); // 執(zhí)行子線程,這一步不要忘了 runnable.run(); } finally { // 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 UserUtils.clear(); } }; } }
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * @author tom */ @Slf4j public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @author tom */ @Slf4j public class ExecutorConfig { @Bean(name = "asyncServiceExecutor1") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor------------"); // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 使用可視化運行狀態(tài)的線程池 ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); // 配置核心線程數(shù) executor.setCorePoolSize(2); // 配置最大線程數(shù) executor.setMaxPoolSize(2); // 配置隊列大小 executor.setQueueCapacity(60); // 配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("my-thread"); // rejection-policy: 當pool已經(jīng)達到max size的時候,如何處理新任務(wù) // CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 增加線程池修飾類 executor.setTaskDecorator(new CustomTaskDecorator()); // 執(zhí)行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; } }
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; /** * @author tom */ @Slf4j @Service("asyncServiceImpl1") public class AsyncServiceImpl { /** * 使用ThreadLocal方式傳遞 * 帶有返回值 * * @throws InterruptedException */ @Async("asyncServiceExecutor1") public CompletableFuture<String> executeValueAsync1() { log.info("start executeValueAsync"); System.out.println("異步線程執(zhí)行返回結(jié)果......"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(UserUtils.getUserId()); } }
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author tom */ @Slf4j @RestController public class Test1Controller { private AsyncServiceImpl asyncService; @Autowired @Qualifier(value = "asyncServiceImpl1") public void setAsyncService(AsyncServiceImpl asyncService) { this.asyncService = asyncService; } /** * 使用ThreadLocal+TaskDecorator的方式 * * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test1") public String test1() throws ExecutionException, InterruptedException { UserUtils.setUserId("123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync1(); String s = completableFuture.get(); log.info("result {}", s); return s; } }
2、RequestContextHolder+TaskDecorator
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator; import org.springframework.core.task.TaskDecorator; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; /** * @author tom * 線程池修飾類 */ public class CustomTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 獲取主線程中的請求信息(我們的用戶信息也放在里面) RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); return () -> { try { // 將主線程的請求信息,設(shè)置到子線程中 RequestContextHolder.setRequestAttributes(attributes); // 執(zhí)行子線程,這一步不要忘了 runnable.run(); } finally { // 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 RequestContextHolder.resetRequestAttributes(); } }; } }
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * @author tom */ @Slf4j public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @author tom */ @Slf4j public class ExecutorConfig { @Bean(name = "asyncServiceExecutor2") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor------------"); // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 使用可視化運行狀態(tài)的線程池 ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); // 配置核心線程數(shù) executor.setCorePoolSize(2); // 配置最大線程數(shù) executor.setMaxPoolSize(2); // 配置隊列大小 executor.setQueueCapacity(60); // 配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("my-thread"); // rejection-policy: 當pool已經(jīng)達到max size的時候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加線程池修飾類 executor.setTaskDecorator(new CustomTaskDecorator()); // 執(zhí)行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; } }
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; import java.util.concurrent.CompletableFuture; /** * @author tom */ @Slf4j @Service("asyncServiceImpl2") public class AsyncServiceImpl { /** * 使用RequestAttributes獲取主線程傳遞的數(shù)據(jù) * * @return * @throws InterruptedException */ @Async("asyncServiceExecutor2") public CompletableFuture<String> executeValueAsync2(){ log.info("start executeValueAsync"); System.out.println("異步線程執(zhí)行返回結(jié)果......"); RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); Object userId = attributes.getAttribute("userId", 0); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(userId.toString()); } }
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author tom */ @Slf4j @RestController public class Test2Controller { private AsyncServiceImpl asyncService; @Autowired @Qualifier(value = "asyncServiceImpl2") public void setAsyncService(AsyncServiceImpl asyncService) { this.asyncService = asyncService; } /** * RequestContextHolder+TaskDecorator的方式 * * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test2") public String test2() throws InterruptedException, ExecutionException { RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); attributes.setAttribute("userId", "123456", 0); CompletableFuture<String> completableFuture = asyncService.executeValueAsync2(); String s = completableFuture.get(); log.info("result {}", s); return s; } }
3、MDC+TaskDecorator
package com.example.parentchildthreadspassingdata.mdc_taskdecorator; import org.slf4j.MDC; import org.springframework.core.task.TaskDecorator; /** * @author tom * 線程池修飾類 */ public class MdcTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 獲取主線程中的請求信息(我們的用戶信息也放在里面) String userId = MDC.get("userId"); return () -> { try { // 將主線程的請求信息,設(shè)置到子線程中 MDC.put("userId", userId); // 執(zhí)行子線程,這一步不要忘了 runnable.run(); } finally { // 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 MDC.clear(); } }; } }
package com.example.parentchildthreadspassingdata.mdc_taskdecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * @author tom */ @Slf4j public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }
package com.example.parentchildthreadspassingdata.mdc_taskdecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @author tom */ @Slf4j public class ExecutorConfig { @Bean(name = "asyncServiceExecutor3") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor------------"); // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 使用可視化運行狀態(tài)的線程池 ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); // 配置核心線程數(shù) executor.setCorePoolSize(2); // 配置最大線程數(shù) executor.setMaxPoolSize(2); // 配置隊列大小 executor.setQueueCapacity(60); // 配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("my-thread"); // rejection-policy: 當pool已經(jīng)達到max size的時候,如何處理新任務(wù) // CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加MDC的線程池修飾類 executor.setTaskDecorator(new MdcTaskDecorator()); //執(zhí)行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; } }
package com.example.parentchildthreadspassingdata.mdc_taskdecorator; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; /** * @author tom */ @Slf4j @Service("asyncServiceImpl3") public class AsyncServiceImpl { /** * 使用MDC獲取主線程傳遞的數(shù)據(jù) * * @return * @throws InterruptedException */ @Async("asyncServiceExecutor3") public CompletableFuture<String> executeValueAsync3() { log.info("start executeValueAsync"); System.out.println("異步線程執(zhí)行返回結(jié)果......"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(MDC.get("userId")); } }
package com.example.parentchildthreadspassingdata.mdc_taskdecorator; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author tom */ @Slf4j @RestController public class Test3Controller { private AsyncServiceImpl asyncService; @Autowired @Qualifier(value = "asyncServiceImpl3") public void setAsyncService(AsyncServiceImpl asyncService) { this.asyncService = asyncService; } /** * 使用MDC+TaskDecorator方式 * 本質(zhì)也是ThreadLocal+TaskDecorator方式 * * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test3") public String test3() throws InterruptedException, ExecutionException { MDC.put("userId", "123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync3(); String s = completableFuture.get(); log.info("result {}", s); return s; } }
4、InheritableThreadLocal
package com.example.parentchildthreadspassingdata.inheritablethreadlocal; /** * @author tom * 使用InheritableThreadLocal存儲線程之間共享的數(shù)據(jù)變量,如登錄的用戶信息 */ public class UserInheritableUtils { private static final InheritableThreadLocal<String> userLocal = new InheritableThreadLocal<>(); public static String getUserId() { return userLocal.get(); } public static void setUserId(String userId) { userLocal.set(userId); } public static void clear() { userLocal.remove(); } }
package com.example.parentchildthreadspassingdata.inheritablethreadlocal; import com.example.parentchildthreadspassingdata.mdc_taskdecorator.VisibleThreadPoolTaskExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @author tom */ @Slf4j public class ExecutorConfig { @Bean(name = "asyncServiceExecutor4") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor------------"); // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 使用可視化運行狀態(tài)的線程池 ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); // 配置核心線程數(shù) executor.setCorePoolSize(2); // 配置最大線程數(shù) executor.setMaxPoolSize(2); // 配置隊列大小 executor.setQueueCapacity(60); // 配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("my-thread"); // rejection-policy: 當pool已經(jīng)達到max size的時候,如何處理新任務(wù) // CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; } }
package com.example.parentchildthreadspassingdata.inheritablethreadlocal; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; /** * @author */ @Slf4j @Service("asyncServiceImpl4") public class AsyncServiceImpl { /** * 使用InheritableThreadLocal獲取主線程傳遞的數(shù)據(jù) * * @return * @throws InterruptedException */ @Async("asyncServiceExecutor4") public CompletableFuture<String> executeValueAsync4() { log.info("start executeValueAsync"); System.out.println("異步線程執(zhí)行返回結(jié)果......"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(UserInheritableUtils.getUserId()); } }
package com.example.parentchildthreadspassingdata.inheritablethreadlocal; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author tom */ @Slf4j @RestController public class Test4Controller { private AsyncServiceImpl asyncService; @Autowired @Qualifier(value = "asyncServiceImpl4") public void setAsyncService(AsyncServiceImpl asyncService) { this.asyncService = asyncService; } /** * 使用InheritableThreadLocal方式 * * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test4") public String test4() throws InterruptedException, ExecutionException { UserInheritableUtils.setUserId("123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync4(); String s = completableFuture.get(); log.info("result {}", s); return s; } }
5、TransmittableThreadLocal
package com.example.parentchildthreadspassingdata.transmittablethreadlocal; import com.alibaba.ttl.TransmittableThreadLocal; /** * @author tom * 使用TransmittableThreadLocal存儲線程之間共享的數(shù)據(jù)變量,如登錄的用戶信息 */ public class UserTransmittableUtils { private static final TransmittableThreadLocal<String> userLocal = new TransmittableThreadLocal<>(); public static String getUserId() { return userLocal.get(); } public static void setUserId(String userId) { userLocal.set(userId); } public static void clear() { userLocal.remove(); } }
package com.example.parentchildthreadspassingdata.transmittablethreadlocal; import com.example.parentchildthreadspassingdata.mdc_taskdecorator.VisibleThreadPoolTaskExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @author tom */ @Slf4j public class ExecutorConfig { @Bean(name = "asyncServiceExecutor5") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor------------"); // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 使用可視化運行狀態(tài)的線程池 ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); // 配置核心線程數(shù) executor.setCorePoolSize(2); // 配置最大線程數(shù) executor.setMaxPoolSize(2); // 配置隊列大小 executor.setQueueCapacity(60); // 配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("my-thread"); // rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務(wù) // CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 執(zhí)行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; } }
package com.example.parentchildthreadspassingdata.transmittablethreadlocal; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; /** * @author tom */ @Slf4j @Service("asyncServiceImpl5") public class AsyncServiceImpl { /** * 使用TransmittableThreadLocal獲取主線程傳遞的數(shù)據(jù) * * @return * @throws InterruptedException */ @Async("asyncServiceExecutor5") public CompletableFuture<String> executeValueAsync5() { log.info("start executeValueAsync"); System.out.println("異步線程執(zhí)行返回結(jié)果......"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(UserTransmittableUtils.getUserId()); } }
package com.example.parentchildthreadspassingdata.transmittablethreadlocal; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author tom */ @RestController @Slf4j public class Test5Controller { private AsyncServiceImpl asyncService; @Autowired @Qualifier(value = "asyncServiceImpl5") public void setAsyncService(AsyncServiceImpl asyncService) { this.asyncService = asyncService; } /** * 使用TransmittableThreadLocal方式 * * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test5") public String test5() throws InterruptedException, ExecutionException { UserTransmittableUtils.setUserId("123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync5(); String s = completableFuture.get(); log.info("result {}", s); return s; } }
6、方案對比
方案1,方案2,方案3主要是借助 TaskDecorator 進行父子線程之間傳遞數(shù)據(jù)。其中 MDC 方案主要借鑒于 MDC
的日志跟蹤的思想來實現(xiàn)。
方案4和方案5使用 InheritableThreadLocal 和 TransmittableThreadLocal 來實現(xiàn),其中
TransmittableThreadLocal 是阿里 InheritableThreadLocal 進行優(yōu)化封裝。
本人推薦使用方案5。
以上就是SpringBoot異步線程父子線程數(shù)據(jù)傳遞的5種方式的詳細內(nèi)容,更多關(guān)于SpringBoot父子線程數(shù)據(jù)傳遞的資料請關(guān)注腳本之家其它相關(guān)文章!
- 在 Spring Boot 中使用異步線程時的 HttpServletRequest 復用問題記錄
- Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式
- springboot?正確的在異步線程中使用request的示例代碼
- SpringBoot?異步線程間傳遞上下文方式
- SpringBoot獲取HttpServletRequest的3種方式總結(jié)
- SpringBoot詳細講解異步任務(wù)如何獲取HttpServletRequest
- SpringBoot實現(xiàn)任意位置獲取HttpServletRequest對象
- Spring?Boot?中正確地在異步線程中使用?HttpServletRequest的方法
相關(guān)文章
SpringBoot在生產(chǎn)快速禁用Swagger2的方法步驟
這篇文章主要介紹了SpringBoot在生產(chǎn)快速禁用Swagger2的方法步驟,使用注解關(guān)閉Swagger2,避免接口重復暴露,非常具有實用價值,需要的朋友可以參考下2018-12-12Gradle環(huán)境下導出Swagger為PDF的步驟詳解
這篇文章主要介紹了Gradle環(huán)境下導出Swagger為PDF的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧2019-06-06SpringCloud組件OpenFeign之默認HTTP請求方式詳解
這篇文章主要介紹了SpringCloud組件OpenFeign之默認HTTP請求方式詳解,在SpringMvcContract類中有個這樣的方法processAnnotationOnMethod,見名思意,這個方法就是處理Feign接口下方法上的注解的,需要的朋友可以參考下2024-01-01Spring?Boot?中的?@DateTimeFormat?和?@JsonFormat?的用法及作用詳解
本文介紹了SpringBoot中的@DateTimeFormat和@JsonFormat注解的用法,解釋了它們在處理日期和時間數(shù)據(jù)時的作用,并通過實例代碼展示了如何在REST控制器中使用這些注解,感興趣的朋友跟隨小編一起看看吧2024-11-11