欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot異步線程父子線程數據傳遞的5種方式

 更新時間:2024年08月06日 09:05:38   作者:242030  
這篇文章主要介紹了SpringBoot異步線程父子線程數據傳遞的5種方式,文中通過代碼示例給大家介紹得非常詳細,對大家的學習或工作有一定的幫助,需要的朋友可以參考下

SpringBoot異步線程父子線程數據傳遞的5種方式

1、ThreadLocal+TaskDecorator

package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;

/**
 * Keeps a user id (for example the logged-in user) for the current thread.
 *
 * <p>The value lives in a plain {@link ThreadLocal}, so only the thread that stored it can
 * read it back.
 *
 * @author tom
 */
public class UserUtils {

    /** Per-thread slot that carries the user id. */
    private static final ThreadLocal<String> USER_ID_HOLDER = new ThreadLocal<>();

    /**
     * Stores the user id for the calling thread.
     *
     * @param userId the id to remember; replaces any value stored earlier by this thread
     */
    public static void setUserId(String userId) {
        USER_ID_HOLDER.set(userId);
    }

    /**
     * Reads the user id stored by the calling thread.
     *
     * @return the stored id, or {@code null} when this thread has not stored one
     */
    public static String getUserId() {
        return USER_ID_HOLDER.get();
    }

    /** Drops the calling thread's stored user id. */
    public static void clear() {
        USER_ID_HOLDER.remove();
    }

}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;

import org.springframework.core.task.TaskDecorator;

/**
 * {@link TaskDecorator} that carries the submitting thread's user id (held in
 * {@link UserUtils}) over to the thread that runs the task.
 *
 * @author tom
 */
public class CustomTaskDecorator implements TaskDecorator {

    /**
     * Wraps {@code runnable} so that it runs with the user id that was current when this
     * method was called.
     *
     * @param runnable the task to wrap
     * @return a task that installs the captured user id, runs {@code runnable}, and then puts
     *         the executing thread's previous user id back
     */
    @Override
    public Runnable decorate(Runnable runnable) {
        // Read on the thread that calls decorate(), before the task is handed off.
        String userId = UserUtils.getUserId();
        return () -> {
            // Remember what the executing thread already held. The executor configured in
            // this package uses CallerRunsPolicy, so a rejected task runs on the submitting
            // thread itself; an unconditional clear() would then wipe that thread's user id.
            String previousUserId = UserUtils.getUserId();
            try {
                UserUtils.setUserId(userId);
                runnable.run();
            } finally {
                // Leave the thread as it was found so nothing leaks into the next task.
                if (previousUserId == null) {
                    UserUtils.clear();
                } else {
                    UserUtils.setUserId(previousUserId);
                }
            }
        };
    }
}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * {@link ThreadPoolTaskExecutor} that logs the pool's counters every time a task is handed
 * to it, then delegates to the parent implementation.
 *
 * @author tom
 */
@Slf4j
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    /**
     * Logs task count, completed count, active count and queue size of the underlying pool.
     *
     * @param prefix label identifying which submission method triggered the log line
     */
    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor pool = getThreadPoolExecutor();
        if (pool != null) {
            log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                    getThreadNamePrefix(),
                    prefix,
                    pool.getTaskCount(),
                    pool.getCompletedTaskCount(),
                    pool.getActiveCount(),
                    pool.getQueue().size());
        }
    }

    /** Logs the pool state, then executes {@code task}. */
    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    /** Logs the pool state, then executes {@code task} with the given start timeout. */
    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    /** Logs the pool state, then submits the runnable {@code task}. */
    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    /** Logs the pool state, then submits the callable {@code task}. */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    /** Logs the pool state, then submits the runnable {@code task} for a listenable future. */
    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    /** Logs the pool state, then submits the callable {@code task} for a listenable future. */
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Declares the thread pool bean {@code asyncServiceExecutor1}.
 *
 * @author tom
 */
@Slf4j
public class ExecutorConfig {

    /**
     * Builds a fixed-size, logging thread pool whose tasks are wrapped by
     * {@link CustomTaskDecorator}.
     *
     * @return the initialized executor
     */
    @Bean(name = "asyncServiceExecutor1")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor------------");

        // Pool variant that logs its counters on every submission.
        final ThreadPoolTaskExecutor pool = new VisibleThreadPoolTaskExecutor();

        // Two core threads, two threads at most, and up to 60 queued tasks.
        pool.setCorePoolSize(2);
        pool.setMaxPoolSize(2);
        pool.setQueueCapacity(60);

        // Prefix used for the names of the pool's threads.
        pool.setThreadNamePrefix("my-thread");

        // Rejection policy: a task the pool cannot accept is run by the submitting thread
        // instead of a pool thread.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Wrap every task so the submitter's context travels with it.
        pool.setTaskDecorator(new CustomTaskDecorator());

        pool.initialize();
        log.info("end asyncServiceExecutor------------");
        return pool;
    }
}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Async service for the ThreadLocal + TaskDecorator variant.
 *
 * @author tom
 */
@Slf4j
@Service("asyncServiceImpl1")
public class AsyncServiceImpl {

    /**
     * Runs on the {@code asyncServiceExecutor1} pool and reports the user id that
     * {@link UserUtils} holds for the executing thread.
     *
     * @return an already-completed future carrying that user id (may be {@code null})
     */
    @Async("asyncServiceExecutor1")
    public CompletableFuture<String> executeValueAsync1() {
        log.info("start executeValueAsync");
        System.out.println("異步線程執(zhí)行返回結(jié)果......");
        log.info("end executeValueAsync");

        final String currentUserId = UserUtils.getUserId();
        return CompletableFuture.completedFuture(currentUserId);
    }

}
package com.example.parentchildthreadspassingdata.threadLocal_taskDecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Demo endpoint for the ThreadLocal + TaskDecorator variant.
 *
 * @author tom
 */
@Slf4j
@RestController
public class Test1Controller {

    private AsyncServiceImpl asyncService;

    /**
     * Injects the async service registered as {@code asyncServiceImpl1}.
     *
     * @param asyncService the service to call from {@link #test1()}
     */
    @Autowired
    @Qualifier(value = "asyncServiceImpl1")
    public void setAsyncService(AsyncServiceImpl asyncService) {
        this.asyncService = asyncService;
    }

    /**
     * Stores a user id in {@link UserUtils}, calls the async service and returns what the
     * async method read back.
     *
     * @return the user id reported by the async method
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the async method completed exceptionally
     */
    @GetMapping("/test1")
    public String test1() throws ExecutionException, InterruptedException {
        UserUtils.setUserId("123456");
        try {
            CompletableFuture<String> completableFuture = asyncService.executeValueAsync1();
            String s = completableFuture.get();
            log.info("result {}", s);
            return s;
        } finally {
            // Request threads are reused; without this the id would still be set for
            // whatever this thread handles next.
            UserUtils.clear();
        }
    }
}

2、RequestContextHolder+TaskDecorator

package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;

import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

/**
 * {@link TaskDecorator} that hands the submitting thread's {@link RequestAttributes} to the
 * thread that runs the task.
 *
 * @author tom
 */
public class CustomTaskDecorator implements TaskDecorator {

    /**
     * Wraps {@code runnable} so that it runs with the request attributes that were bound to
     * the thread calling this method.
     *
     * @param runnable the task to wrap
     * @return a task that binds the captured attributes, runs {@code runnable}, and then puts
     *         the executing thread's previous binding back
     */
    @Override
    public Runnable decorate(Runnable runnable) {
        // Read on the thread that calls decorate(); null when no request is bound there.
        // NOTE(review): the attributes belong to the original request — confirm callers keep
        // the request open until the task finishes (the controller in this package blocks on
        // the future).
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        return () -> {
            // Remember the executing thread's own binding. The executor configured in this
            // package uses CallerRunsPolicy, so a rejected task runs on the submitting
            // thread; an unconditional reset would strip that thread's request context.
            RequestAttributes previous = RequestContextHolder.getRequestAttributes();
            try {
                RequestContextHolder.setRequestAttributes(attributes);
                runnable.run();
            } finally {
                // Leave the thread as it was found so nothing leaks into the next task.
                if (previous == null) {
                    RequestContextHolder.resetRequestAttributes();
                } else {
                    RequestContextHolder.setRequestAttributes(previous);
                }
            }
        };
    }
}
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Thread pool executor that writes a statistics line to the log before delegating each
 * submission to {@link ThreadPoolTaskExecutor}.
 *
 * @author tom
 */
@Slf4j
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    /**
     * Logs the current counters of the wrapped {@link ThreadPoolExecutor}.
     *
     * @param prefix label naming the submission method that was invoked
     */
    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor delegate = getThreadPoolExecutor();
        if (delegate != null) {
            log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                    getThreadNamePrefix(),
                    prefix,
                    delegate.getTaskCount(),
                    delegate.getCompletedTaskCount(),
                    delegate.getActiveCount(),
                    delegate.getQueue().size());
        }
    }

    /** Logs statistics and runs {@code task} through the parent executor. */
    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    /** Logs statistics and runs {@code task} with a start timeout. */
    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    /** Logs statistics and submits a {@link Runnable}. */
    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    /** Logs statistics and submits a {@link Callable}. */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    /** Logs statistics and submits a {@link Runnable}, returning a listenable future. */
    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    /** Logs statistics and submits a {@link Callable}, returning a listenable future. */
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Declares the thread pool bean {@code asyncServiceExecutor2}.
 *
 * @author tom
 */
@Slf4j
public class ExecutorConfig {

    /**
     * Builds a fixed-size, logging thread pool whose tasks are wrapped by
     * {@link CustomTaskDecorator}.
     *
     * @return the initialized executor
     */
    @Bean(name = "asyncServiceExecutor2")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor------------");

        // Pool variant that logs its counters on every submission.
        final ThreadPoolTaskExecutor taskExecutor = new VisibleThreadPoolTaskExecutor();

        // Sizing: 2 core threads, 2 threads maximum, queue of 60.
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(2);
        taskExecutor.setQueueCapacity(60);

        // Name prefix for the pool's threads.
        taskExecutor.setThreadNamePrefix("my-thread");

        // When the pool cannot accept a task, the submitting thread runs it itself.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Decorator that passes the request context along with each task.
        taskExecutor.setTaskDecorator(new CustomTaskDecorator());

        taskExecutor.initialize();
        log.info("end asyncServiceExecutor------------");
        return taskExecutor;
    }
}
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.CompletableFuture;

/**
 * Async service for the RequestContextHolder + TaskDecorator variant.
 *
 * @author tom
 */
@Slf4j
@Service("asyncServiceImpl2")
public class AsyncServiceImpl {

    /**
     * Runs on the {@code asyncServiceExecutor2} pool and reads the {@code userId} request
     * attribute from the {@link RequestAttributes} bound to the executing thread.
     *
     * @return an already-completed future carrying the attribute's string form, or
     *         {@code null} when no request attributes are bound or the attribute is absent
     */
    @Async("asyncServiceExecutor2")
    public CompletableFuture<String> executeValueAsync2(){
        log.info("start executeValueAsync");
        System.out.println("異步線程執(zhí)行返回結(jié)果......");
        // getRequestAttributes() yields null when nothing is bound to this thread, so guard
        // before dereferencing instead of failing with a NullPointerException.
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        Object userId = attributes == null
                ? null
                : attributes.getAttribute("userId", RequestAttributes.SCOPE_REQUEST);
        log.info("end executeValueAsync");
        // The attribute itself may be missing; report that as null like the sibling services.
        return CompletableFuture.completedFuture(userId == null ? null : userId.toString());
    }

}
package com.example.parentchildthreadspassingdata.requestcontextholder_taskdecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Demo endpoint for the RequestContextHolder + TaskDecorator variant.
 *
 * @author tom
 */
@Slf4j
@RestController
public class Test2Controller {

    private AsyncServiceImpl asyncService;

    /**
     * Injects the async service registered as {@code asyncServiceImpl2}.
     *
     * @param asyncService the service to call from {@link #test2()}
     */
    @Autowired
    @Qualifier(value = "asyncServiceImpl2")
    public void setAsyncService(AsyncServiceImpl asyncService) {
        this.asyncService = asyncService;
    }

    /**
     * Puts a {@code userId} attribute on the current request, calls the async service and
     * returns what the async method read back.
     *
     * @return the user id reported by the async method
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the async method completed exceptionally
     */
    @GetMapping("/test2")
    public String test2() throws InterruptedException, ExecutionException {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        // Request scope (the constant equals the literal 0).
        requestAttributes.setAttribute("userId", "123456", RequestAttributes.SCOPE_REQUEST);

        String result = asyncService.executeValueAsync2().get();
        log.info("result {}", result);
        return result;
    }
}

3、MDC+TaskDecorator

package com.example.parentchildthreadspassingdata.mdc_taskdecorator;

import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;

/**
 * {@link TaskDecorator} that carries the submitting thread's {@code userId} MDC entry over
 * to the thread that runs the task.
 *
 * @author tom
 */
public class MdcTaskDecorator implements TaskDecorator {

    /**
     * Wraps {@code runnable} so that it runs with the {@code userId} MDC value that was
     * current when this method was called.
     *
     * @param runnable the task to wrap
     * @return a task that installs the captured value, runs {@code runnable}, and then puts
     *         the executing thread's previous MDC content back
     */
    @Override
    public Runnable decorate(Runnable runnable) {
        // Read on the thread that calls decorate(); null when the key is not set there.
        String userId = MDC.get("userId");
        return () -> {
            // Snapshot the executing thread's MDC. The executor configured in this package
            // uses CallerRunsPolicy, so a rejected task runs on the submitting thread; an
            // unconditional MDC.clear() would erase that thread's whole logging context.
            java.util.Map<String, String> previousContext = MDC.getCopyOfContextMap();
            try {
                if (userId == null) {
                    // Not every logging backend accepts a null value in MDC.put().
                    MDC.remove("userId");
                } else {
                    MDC.put("userId", userId);
                }
                runnable.run();
            } finally {
                // Leave the thread as it was found so nothing leaks into the next task.
                if (previousContext == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previousContext);
                }
            }
        };
    }
}
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * {@link ThreadPoolTaskExecutor} subclass that reports pool counters in the log ahead of
 * every task submission.
 *
 * @author tom
 */
@Slf4j
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    /**
     * Emits one log line with the pool's task, completed, active and queue counters.
     *
     * @param prefix label naming the submission method that was invoked
     */
    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor executorState = getThreadPoolExecutor();
        if (executorState != null) {
            log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                    getThreadNamePrefix(),
                    prefix,
                    executorState.getTaskCount(),
                    executorState.getCompletedTaskCount(),
                    executorState.getActiveCount(),
                    executorState.getQueue().size());
        }
    }

    /** Reports pool counters, then delegates {@code execute(task)}. */
    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    /** Reports pool counters, then delegates {@code execute(task, startTimeout)}. */
    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    /** Reports pool counters, then delegates {@code submit} for a runnable. */
    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    /** Reports pool counters, then delegates {@code submit} for a callable. */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    /** Reports pool counters, then delegates {@code submitListenable} for a runnable. */
    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    /** Reports pool counters, then delegates {@code submitListenable} for a callable. */
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Declares the thread pool bean {@code asyncServiceExecutor3}.
 *
 * @author tom
 */
@Slf4j
public class ExecutorConfig {

    /**
     * Builds a fixed-size, logging thread pool whose tasks are wrapped by
     * {@link MdcTaskDecorator}.
     *
     * @return the initialized executor
     */
    @Bean(name = "asyncServiceExecutor3")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor------------");

        // Pool variant that logs its counters on every submission.
        final ThreadPoolTaskExecutor mdcPool = new VisibleThreadPoolTaskExecutor();

        // Core size 2, maximum size 2, queue capacity 60.
        mdcPool.setCorePoolSize(2);
        mdcPool.setMaxPoolSize(2);
        mdcPool.setQueueCapacity(60);

        // Name prefix for the pool's threads.
        mdcPool.setThreadNamePrefix("my-thread");

        // A task the pool cannot accept is executed by the thread that submitted it.
        mdcPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Decorator that passes the MDC user id along with each task.
        mdcPool.setTaskDecorator(new MdcTaskDecorator());

        mdcPool.initialize();
        log.info("end asyncServiceExecutor------------");
        return mdcPool;
    }
}
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Async service for the MDC + TaskDecorator variant.
 *
 * @author tom
 */
@Slf4j
@Service("asyncServiceImpl3")
public class AsyncServiceImpl {

    /**
     * Runs on the {@code asyncServiceExecutor3} pool and reports the {@code userId} entry of
     * the executing thread's MDC.
     *
     * @return an already-completed future carrying that MDC value (may be {@code null})
     */
    @Async("asyncServiceExecutor3")
    public CompletableFuture<String> executeValueAsync3() {
        log.info("start executeValueAsync");
        System.out.println("異步線程執(zhí)行返回結(jié)果......");
        log.info("end executeValueAsync");

        final String mdcUserId = MDC.get("userId");
        return CompletableFuture.completedFuture(mdcUserId);
    }
}
package com.example.parentchildthreadspassingdata.mdc_taskdecorator;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Demo endpoint for the MDC + TaskDecorator variant.
 *
 * @author tom
 */
@Slf4j
@RestController
public class Test3Controller {

    private AsyncServiceImpl asyncService;

    /**
     * Injects the async service registered as {@code asyncServiceImpl3}.
     *
     * @param asyncService the service to call from {@link #test3()}
     */
    @Autowired
    @Qualifier(value = "asyncServiceImpl3")
    public void setAsyncService(AsyncServiceImpl asyncService) {
        this.asyncService = asyncService;
    }

    /**
     * Puts a {@code userId} entry into the MDC, calls the async service and returns what the
     * async method read back.
     *
     * @return the user id reported by the async method
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the async method completed exceptionally
     */
    @GetMapping("/test3")
    public String test3() throws InterruptedException, ExecutionException {
        MDC.put("userId", "123456");
        try {
            CompletableFuture<String> completableFuture = asyncService.executeValueAsync3();
            String s = completableFuture.get();
            log.info("result {}", s);
            return s;
        } finally {
            // Request threads are reused; without this the entry would still be present for
            // whatever this thread handles (and logs) next.
            MDC.remove("userId");
        }
    }
}

4、InheritableThreadLocal

package com.example.parentchildthreadspassingdata.inheritablethreadlocal;

/**
 * Keeps a user id (for example the logged-in user) in an {@link InheritableThreadLocal}, so
 * threads created by the storing thread start with the same value.
 *
 * @author tom
 */
public class UserInheritableUtils {

    /** Per-thread slot that newly created child threads inherit. */
    private static final InheritableThreadLocal<String> INHERITED_USER_ID = new InheritableThreadLocal<>();

    /**
     * Stores the user id for the calling thread.
     *
     * @param userId the id to remember; replaces any value this thread held before
     */
    public static void setUserId(String userId) {
        INHERITED_USER_ID.set(userId);
    }

    /**
     * Reads the user id visible to the calling thread.
     *
     * @return the stored or inherited id, or {@code null} when there is none
     */
    public static String getUserId() {
        return INHERITED_USER_ID.get();
    }

    /** Drops the calling thread's user id. */
    public static void clear() {
        INHERITED_USER_ID.remove();
    }

}
package com.example.parentchildthreadspassingdata.inheritablethreadlocal;

import com.example.parentchildthreadspassingdata.mdc_taskdecorator.VisibleThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Declares the thread pool bean {@code asyncServiceExecutor4} used by the
 * InheritableThreadLocal variant.
 *
 * @author tom
 */
@Slf4j
public class ExecutorConfig {

    /**
     * Builds a fixed-size, logging thread pool that hands the submitter's
     * {@link UserInheritableUtils} user id to every task.
     *
     * <p>An {@link InheritableThreadLocal} is copied into a thread only when that thread is
     * created. The two pool threads are created once and then reused, so without per-task
     * propagation later tasks would keep seeing the user id of whichever caller happened to
     * trigger thread creation.
     *
     * @return the initialized executor
     */
    @Bean(name = "asyncServiceExecutor4")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor------------");
        // Pool variant that logs its counters on every submission.
        ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
        // Core pool size.
        executor.setCorePoolSize(2);
        // Maximum pool size.
        executor.setMaxPoolSize(2);
        // Queue capacity.
        executor.setQueueCapacity(60);
        // Name prefix for the pool's threads.
        executor.setThreadNamePrefix("my-thread");
        // Rejection policy: a task the pool cannot accept is run by the submitting thread.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Capture the user id on the submitting thread for each task and install it on the
        // executing thread, restoring that thread's previous value afterwards (this also
        // keeps the caller's value intact when CallerRunsPolicy runs the task on the caller).
        executor.setTaskDecorator(task -> {
            String submitterUserId = UserInheritableUtils.getUserId();
            return () -> {
                String previousUserId = UserInheritableUtils.getUserId();
                try {
                    UserInheritableUtils.setUserId(submitterUserId);
                    task.run();
                } finally {
                    if (previousUserId == null) {
                        UserInheritableUtils.clear();
                    } else {
                        UserInheritableUtils.setUserId(previousUserId);
                    }
                }
            };
        });
        // Initialize the pool.
        executor.initialize();
        log.info("end asyncServiceExecutor------------");
        return executor;
    }
}
package com.example.parentchildthreadspassingdata.inheritablethreadlocal;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Async service for the InheritableThreadLocal variant.
 */
@Slf4j
@Service("asyncServiceImpl4")
public class AsyncServiceImpl {

    /**
     * Runs on the {@code asyncServiceExecutor4} pool and reports the user id that
     * {@link UserInheritableUtils} exposes to the executing thread.
     *
     * @return an already-completed future carrying that user id (may be {@code null})
     */
    @Async("asyncServiceExecutor4")
    public CompletableFuture<String> executeValueAsync4() {
        log.info("start executeValueAsync");
        System.out.println("異步線程執(zhí)行返回結(jié)果......");
        log.info("end executeValueAsync");

        final String inheritedUserId = UserInheritableUtils.getUserId();
        return CompletableFuture.completedFuture(inheritedUserId);
    }
}
package com.example.parentchildthreadspassingdata.inheritablethreadlocal;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Demo endpoint for the InheritableThreadLocal variant.
 *
 * @author tom
 */
@Slf4j
@RestController
public class Test4Controller {

    private AsyncServiceImpl asyncService;

    /**
     * Injects the async service registered as {@code asyncServiceImpl4}.
     *
     * @param asyncService the service to call from {@link #test4()}
     */
    @Autowired
    @Qualifier(value = "asyncServiceImpl4")
    public void setAsyncService(AsyncServiceImpl asyncService) {
        this.asyncService = asyncService;
    }

    /**
     * Stores a user id in {@link UserInheritableUtils}, calls the async service and returns
     * what the async method read back.
     *
     * @return the user id reported by the async method
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the async method completed exceptionally
     */
    @GetMapping("/test4")
    public String test4() throws InterruptedException, ExecutionException {
        UserInheritableUtils.setUserId("123456");
        try {
            CompletableFuture<String> completableFuture = asyncService.executeValueAsync4();
            String s = completableFuture.get();
            log.info("result {}", s);
            return s;
        } finally {
            // Request threads are reused; without this the id would still be set for
            // whatever this thread handles next (and for any thread it creates).
            UserInheritableUtils.clear();
        }
    }
}

5、TransmittableThreadLocal

package com.example.parentchildthreadspassingdata.transmittablethreadlocal;

import com.alibaba.ttl.TransmittableThreadLocal;

/**
 * Keeps a user id (for example the logged-in user) in a {@link TransmittableThreadLocal} so
 * it can be shared between threads.
 *
 * @author tom
 */
public class UserTransmittableUtils {

    /** Slot that carries the user id. */
    private static final TransmittableThreadLocal<String> TRANSMITTED_USER_ID = new TransmittableThreadLocal<>();

    /**
     * Stores the user id for the calling thread.
     *
     * @param userId the id to remember; replaces any value this thread held before
     */
    public static void setUserId(String userId) {
        TRANSMITTED_USER_ID.set(userId);
    }

    /**
     * Reads the user id visible to the calling thread.
     *
     * @return the current id, or {@code null} when there is none
     */
    public static String getUserId() {
        return TRANSMITTED_USER_ID.get();
    }

    /** Drops the calling thread's user id. */
    public static void clear() {
        TRANSMITTED_USER_ID.remove();
    }
}
package com.example.parentchildthreadspassingdata.transmittablethreadlocal;

import com.example.parentchildthreadspassingdata.mdc_taskdecorator.VisibleThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author tom
 */
@Slf4j
public class ExecutorConfig {

    @Bean(name = "asyncServiceExecutor5")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor------------");
        // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 使用可視化運行狀態(tài)的線程池
        ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
        // 配置核心線程數(shù)
        executor.setCorePoolSize(2);
        // 配置最大線程數(shù)
        executor.setMaxPoolSize(2);
        // 配置隊列大小
        executor.setQueueCapacity(60);
        // 配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix("my-thread");
        // rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務(wù)
        // CALLER_RUNS: 不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 執(zhí)行初始化
        executor.initialize();
        log.info("end asyncServiceExecutor------------");
        return executor;
    }
}
package com.example.parentchildthreadspassingdata.transmittablethreadlocal;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Async service for the TransmittableThreadLocal variant.
 *
 * @author tom
 */
@Slf4j
@Service("asyncServiceImpl5")
public class AsyncServiceImpl {

    /**
     * Runs on the {@code asyncServiceExecutor5} pool and reports the user id that
     * {@link UserTransmittableUtils} exposes to the executing thread.
     *
     * @return an already-completed future carrying that user id (may be {@code null})
     */
    @Async("asyncServiceExecutor5")
    public CompletableFuture<String> executeValueAsync5() {
        log.info("start executeValueAsync");
        System.out.println("異步線程執(zhí)行返回結(jié)果......");
        log.info("end executeValueAsync");

        final String transmittedUserId = UserTransmittableUtils.getUserId();
        return CompletableFuture.completedFuture(transmittedUserId);
    }
}
package com.example.parentchildthreadspassingdata.transmittablethreadlocal;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Demo endpoint for the TransmittableThreadLocal variant.
 *
 * @author tom
 */
@RestController
@Slf4j
public class Test5Controller {

    private AsyncServiceImpl asyncService;

    /**
     * Injects the async service registered as {@code asyncServiceImpl5}.
     *
     * @param asyncService the service to call from {@link #test5()}
     */
    @Autowired
    @Qualifier(value = "asyncServiceImpl5")
    public void setAsyncService(AsyncServiceImpl asyncService) {
        this.asyncService = asyncService;
    }

    /**
     * Stores a user id in {@link UserTransmittableUtils}, calls the async service and
     * returns what the async method read back.
     *
     * @return the user id reported by the async method
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the async method completed exceptionally
     */
    @GetMapping("/test5")
    public String test5() throws InterruptedException, ExecutionException {
        UserTransmittableUtils.setUserId("123456");
        try {
            CompletableFuture<String> completableFuture = asyncService.executeValueAsync5();
            String s = completableFuture.get();
            log.info("result {}", s);
            return s;
        } finally {
            // Request threads are reused; without this the id would still be set for
            // whatever this thread handles next.
            UserTransmittableUtils.clear();
        }
    }
}

6、方案對比

方案1、方案2、方案3主要是借助 TaskDecorator 在父子線程之間傳遞數據。其中 MDC 方案主要借鑒 MDC 日志跟蹤的思想來實現。

方案4和方案5使用 InheritableThreadLocal 和 TransmittableThreadLocal 來實現,其中 TransmittableThreadLocal 是阿里對 InheritableThreadLocal 進行優化封裝后的實現。

本人推薦使用方案5。

以上就是SpringBoot異步線程父子線程數據傳遞的5種方式的詳細內容,更多關于SpringBoot父子線程數據傳遞的資料請關注腳本之家其它相關文章!

相關文章

  • mybatis的使用-Mapper文件各種語法介紹

    mybatis的使用-Mapper文件各種語法介紹

    這篇文章主要介紹了mybatis的使用-Mapper文件各種語法介紹,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • SpringBoot在生產快速禁用Swagger2的方法步驟

    SpringBoot在生產快速禁用Swagger2的方法步驟

    這篇文章主要介紹了SpringBoot在生產快速禁用Swagger2的方法步驟,使用注解關閉Swagger2,避免接口重復暴露,非常具有實用價值,需要的朋友可以參考下
    2018-12-12
  • Gradle環境下導出Swagger為PDF的步驟詳解

    Gradle環境下導出Swagger為PDF的步驟詳解

    這篇文章主要介紹了Gradle環境下導出Swagger為PDF的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2019-06-06
  • java實現的統計字符算法示例

    java實現的統計字符算法示例

    這篇文章主要介紹了java實現的統計字符算法,涉及java針對字符的遍歷、判斷、運算等相關操作技巧,需要的朋友可以參考下
    2017-10-10
  • SpringCloud組件OpenFeign之默認HTTP請求方式詳解

    SpringCloud組件OpenFeign之默認HTTP請求方式詳解

    這篇文章主要介紹了SpringCloud組件OpenFeign之默認HTTP請求方式詳解,在SpringMvcContract類中有個這樣的方法processAnnotationOnMethod,見名思意,這個方法就是處理Feign接口下方法上的注解的,需要的朋友可以參考下
    2024-01-01
  • Spring Boot 中的 @DateTimeFormat 和 @JsonFormat 的用法及作用詳解

    Spring Boot 中的 @DateTimeFormat 和 @JsonFormat 的用法及作用詳解

    本文介紹了SpringBoot中的@DateTimeFormat和@JsonFormat注解的用法,解釋了它們在處理日期和時間數據時的作用,并通過實例代碼展示了如何在REST控制器中使用這些注解,感興趣的朋友跟隨小編一起看看吧
    2024-11-11
  • spring實例化javabean的三種方式分享

    spring實例化javabean的三種方式分享

    這篇文章介紹了spring實例化javabean的三種方式,有需要的朋友可以參考一下
    2013-10-10
  • 基于Java的界面開發詳細步驟(用戶注冊登錄)

    基于Java的界面開發詳細步驟(用戶注冊登錄)

    通過一段時間Java Web的學習,寫一個簡單的注冊登陸界面來做個總結,這篇文章主要給大家介紹了基于Java的界面開發(用戶注冊登錄)的相關資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-01-01
  • Spring Boot webflux使用方法解析

    Spring Boot webflux使用方法解析

    這篇文章主要介紹了Spring Boot webflux使用方法解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-03-03
  • Spring Cache手動清理Redis緩存

    Spring Cache手動清理Redis緩存

    這篇文章主要介紹了Spring Cache手動清理Redis緩存,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-10-10

最新評論