欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

TransmittableThreadLocal通過(guò)javaAgent實(shí)現(xiàn)線(xiàn)程傳遞并支持ForkJoin

 更新時(shí)間:2023年06月27日 14:25:28   作者:二哈_8fd0  
這篇文章主要介紹了TransmittableThreadLocal通過(guò)javaAgent實(shí)現(xiàn)線(xiàn)程傳遞并支持ForkJoin詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

官方文檔

感謝TTL 作者在我上一篇文章評(píng)論,讓我知道了通過(guò)官方文檔去全面了解其使用方式及其支持的重要性。

所以官方文檔先貼出來(lái)~

alibaba/transmittable-thread-local: ?? TransmittableThreadLocal (TTL), the missing Java™ std lib(simple & 0-dependency) for framework/middleware, provide an enhanced InheritableThreadLocal that transmits values between threads even using thread pooling components. (github.com)

重要的不單單是看官方文檔,還有其他問(wèn)題可以通過(guò)issue查看最新動(dòng)態(tài)、以及和社區(qū)交流,本文背景以TTL最新版本2.12.6版本為基準(zhǔn)。

TransmittableThreadLocal線(xiàn)程間傳遞邏輯

上一篇文章及評(píng)論,可以看到TTL官方描述其通過(guò)javaAgent技術(shù)可以實(shí)現(xiàn)對(duì)ForkJoin的線(xiàn)程傳遞支持。

通過(guò)測(cè)試,確實(shí)可以

通過(guò)javaagent指定jar包

簡(jiǎn)單的測(cè)試代碼如下

    private final static TransmittableThreadLocal<Integer> LOCAL = new TransmittableThreadLocal<>();
    @PostMapping("/a")
    public void test() {
        final int num = ThreadLocalRandom.current().nextInt(0, 1500);
        LOCAL.set(num);
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 150; i++) {
            list.add(i);
        }
        list.stream().parallel().forEach(o -> {
            if (!Objects.equals(LOCAL.get(), num)) {
                System.out.println("如果不同代表沒(méi)有正確傳遞噢");
            }
        });
    }

通過(guò)上述代碼及通過(guò)javaagent引入jar包后不用手動(dòng)設(shè)置TTL的線(xiàn)程池裝飾器也可以了!我們接下來(lái)看看其中的奧秘!(javaAgent技術(shù)大家可以自行搜索學(xué)習(xí)一下,例如skywalking等無(wú)侵入場(chǎng)景應(yīng)用)

// 通過(guò)premain 進(jìn)行字節(jié)碼操作添加我們想要的攔截,(attach,premain兩個(gè)java agent核心方法大家可以搜索詳細(xì)了解)
    public static void premain(String agentArgs, @Nonnull Instrumentation inst) {
// 一個(gè) volatile 變量,不清除這里volatile的必要性,此文章暫不關(guān)注
        kvs = splitCommaColonStringToKV(agentArgs);
// 日志打印的一個(gè)多態(tài),可以通過(guò)參數(shù)設(shè)置。默認(rèn)只打印 StdErr,也可以設(shè)置為stdout
        Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));
        final Logger logger = Logger.getLogger(TtlAgent.class);
        try {
            logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst);
// 如果是用javaAgent無(wú)侵入的實(shí)現(xiàn)TTL邏輯(上一篇講到正常使用TTL的裝飾器線(xiàn)程池來(lái)實(shí)現(xiàn))默認(rèn)是開(kāi)啟的(TimerTask默認(rèn)不開(kāi)啟) 這里是查看線(xiàn)程池的配置,默認(rèn)開(kāi)啟,可以在jvm參數(shù)設(shè)置為關(guān)閉
            final boolean disableInheritable = isDisableInheritableForThreadPool();
// 需要修改字節(jié)碼的列表, 通過(guò)javassist修改字節(jié)碼 JavassistTransformlet是TTL 做的抽象對(duì)jdk可能出現(xiàn)的3種出現(xiàn)線(xiàn)程傳遞的情況分別做了實(shí)現(xiàn)
            final List&lt;JavassistTransformlet&gt; transformletList = new ArrayList&lt;JavassistTransformlet&gt;();
// 線(xiàn)程池的
            transformletList.add(new TtlExecutorTransformlet(disableInheritable));
// forkjoin的
            transformletList.add(new TtlForkJoinTransformlet(disableInheritable));
// timer task默認(rèn)關(guān)閉,如果設(shè)置開(kāi)啟也加入
            if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet());
// 這是jdk提供的接口,給客戶(hù)端提供對(duì)class 字節(jié)碼做增強(qiáng)的入口
            final ClassFileTransformer transformer = new TtlTransformer(transformletList);
// 將TTL的字節(jié)碼增強(qiáng)邏輯 織入,加載對(duì)應(yīng)class時(shí)調(diào)用(也取決于要增強(qiáng)的class對(duì)象load時(shí)機(jī))
            inst.addTransformer(transformer, true);
            logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success");
            logger.info("[TtlAgent.premain] end");
// 如果使用了 javaAgent 增強(qiáng),你再手動(dòng)給線(xiàn)程池包裹裝飾器則會(huì)直接返回,不需要包裝了
            ttlAgentLoaded = true;
        } catch (Exception e) {
            String msg = "Fail to load TtlAgent , cause: " + e.toString();
            logger.log(Level.SEVERE, msg, e);
            throw new IllegalStateException(msg, e);
        }
    }

上面貼出了TTL使用javaAgent的premain進(jìn)行字節(jié)碼增強(qiáng)的流程下面看具體的實(shí)現(xiàn)以及如何使用jdk暴露的ClassFileTransformer接口進(jìn)行操作字節(jié)碼
先來(lái)看TtlTransformer ,我們的字節(jié)碼增強(qiáng)邏輯的橋梁

public class TtlTransformer implements ClassFileTransformer {
    private static final Logger logger = Logger.getLogger(TtlTransformer.class);
    private static final byte[] EMPTY_BYTE_ARRAY = {};
// 要被增強(qiáng)的列表
    private final List&lt;JavassistTransformlet&gt; transformletList = new ArrayList&lt;JavassistTransformlet&gt;();
    TtlTransformer(List&lt;? extends JavassistTransformlet&gt; transformletList) {
// 復(fù)制到當(dāng)前類(lèi)的成員變量
        for (JavassistTransformlet transformlet : transformletList) {
            this.transformletList.add(transformlet);
            logger.info("[TtlTransformer] add Transformlet " + transformlet.getClass() + " success");
        }
    }
    @Override
    public final byte[] transform(@Nonnull final ClassLoader loader, @Nullable final String classFile, final Class&lt;?&gt; classBeingRedefined,
                                  final ProtectionDomain protectionDomain, final byte[] classFileBuffer) {
        try {
            // Lambda has no class file, no need to transform, just return.
            if (classFile == null) return EMPTY_BYTE_ARRAY;
            // 獲取到當(dāng)前加載的class的類(lèi)名
            final String className = toClassName(classFile);
            for (JavassistTransformlet transformlet : transformletList) {
// 調(diào)用子類(lèi)實(shí)現(xiàn)
                final byte[] bytes = transformlet.doTransform(className, classFileBuffer, loader);
                if (bytes != null) return bytes;
            }
        } catch (Throwable t) {
            String msg = "Fail to transform class " + classFile + ", cause: " + t.toString();
            logger.log(Level.SEVERE, msg, t);
            throw new IllegalStateException(msg, t);
        }
        return EMPTY_BYTE_ARRAY;
    }
    private static String toClassName(final String classFile) {
        return classFile.replace('/', '.');
    }
}

下面來(lái)看具體的實(shí)現(xiàn)有四個(gè)

2.12.6版本增強(qiáng)實(shí)現(xiàn)有4個(gè)

目前TTL代碼迭代還是比較快的,雖然代碼不多,但是一直在改動(dòng)。和2.10.2的差別還是挺大的。所以小伙伴們可以多關(guān)注一下官方github上的相關(guān)動(dòng)態(tài)

先看TtlExecutorTransformlet

// 為什么把源碼中的注釋放進(jìn)來(lái),因?yàn)門(mén)TL的注釋寫(xiě)的還是挺全。
/**
 * TTL {@link JavassistTransformlet} for {@link java.util.concurrent.Executor}.
 *
 * @author Jerry Lee (oldratlee at gmail dot com)
 * @author wuwen5 (wuwen.55 at aliyun dot com)
// 通過(guò)字節(jié)碼操作會(huì)覆蓋以下線(xiàn)程執(zhí)行相關(guān)類(lèi)
 * @see java.util.concurrent.Executor
 * @see java.util.concurrent.ExecutorService
 * @see java.util.concurrent.ThreadPoolExecutor
 * @see java.util.concurrent.ScheduledThreadPoolExecutor
 * @see java.util.concurrent.Executors
// 這是也是TTL的一個(gè)實(shí)現(xiàn),之后再看
 * @see TtlPriorityBlockingQueueTransformlet
 * @since 2.5.1
 */
public class TtlExecutorTransformlet implements JavassistTransformlet {
    private static final Logger logger = Logger.getLogger(TtlExecutorTransformlet.class);
// 裝我們要攔截操作字節(jié)碼的類(lèi)
    private static final Set<String> EXECUTOR_CLASS_NAMES = new HashSet<String>();
// 這次套路變了,如果使用javaAgent不用一層一層裝飾再去增強(qiáng)了
// 我們的目的就是 在thread執(zhí)行任務(wù)時(shí)候,
//任務(wù): 初始化 [捕獲capture] -> 目標(biāo)執(zhí)行前 [重放replay] -> 目標(biāo)執(zhí)行后 還原 [還原restore]即可
// 現(xiàn)在我們聚焦在 Runnable 和 Callable即可,TTL的切入點(diǎn)是在各種線(xiàn)程池
//(如果你直接new,這個(gè)原生就支持,可以看看前一篇文章,就算你使用TTL的父類(lèi)InheritableThreadLocal在new thread時(shí)會(huì)將第一次主線(xiàn)程的ThreadLocalMap的引用都帶過(guò)去噢,想一下childValue這個(gè)方法)
// 所以我們只要對(duì)所有線(xiàn)程池的 參數(shù)中含有 Runnable和 Callable替換成Ttl的增強(qiáng)裝飾器即可!
    private static final Map<String, String> PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS = new HashMap<String, String>();
    private static final String THREAD_POOL_EXECUTOR_CLASS_NAME = "java.util.concurrent.ThreadPoolExecutor";
    private static final String RUNNABLE_CLASS_NAME = "java.lang.Runnable";
    static {  EXECUTOR_CLASS_NAMES.add(THREAD_POOL_EXECUTOR_CLASS_NAME);
  EXECUTOR_CLASS_NAMES.add("java.util.concurrent.ScheduledThreadPoolExecutor");       PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put(RUNNABLE_CLASS_NAME, "com.alibaba.ttl.TtlRunnable");     PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.util.concurrent.Callable", "com.alibaba.ttl.TtlCallable");
    }
    private static final String THREAD_FACTORY_CLASS_NAME = "java.util.concurrent.ThreadFactory";
    private final boolean disableInheritableForThreadPool;
    public TtlExecutorTransformlet(boolean disableInheritableForThreadPool) {
        this.disableInheritableForThreadPool = disableInheritableForThreadPool;
    }
    @Override
    public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
//看issues可以知道為什么要 屏蔽java.util,但是具體原因還不明確。
// 因?yàn)樗蓄?lèi)加載都會(huì)進(jìn)入javaAgent的 premain方法,對(duì)應(yīng)的實(shí)現(xiàn)自己選擇去操作哪些類(lèi)的字節(jié)碼,例如skywalking那么多插件,就是用全類(lèi)名一個(gè)一個(gè)指定常見(jiàn)開(kāi)源框架的可做攔截邏輯的類(lèi),然后去添加apm的邏輯增強(qiáng)
        // work-around ClassCircularityError:
        //      https://github.com/alibaba/transmittable-thread-local/issues/278
        //      https://github.com/alibaba/transmittable-thread-local/issues/234
        if (isClassAtPackageJavaUtil(classInfo.getClassName())) return;
        final CtClass clazz = classInfo.getCtClass();
// 指定的類(lèi)
        if (EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName())) {
            for (CtMethod method : clazz.getDeclaredMethods()) {
                updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method);
            }
// 首先DisableInheritableThreadFactory和我之前的理解有些出入,但是看實(shí)現(xiàn)確實(shí)是一個(gè)屏蔽TTL的邏輯實(shí)現(xiàn),但是如果使用agent需要添加jvm參數(shù),那么去掉參數(shù)了就不會(huì)進(jìn)入當(dāng)前jar的premain方法
// 而且經(jīng)過(guò)測(cè)試,就算讓這里為true也不能屏蔽什么,因?yàn)榫退闾鎿Q了ThreadFactory但是Runnable,Callable已經(jīng)在類(lèi)加載時(shí)字節(jié)碼層面替換為T(mén)TL的裝飾器了,這里好像亡羊補(bǔ)牢了 - -!雖然不明白但是這里不影響我們使用邏輯
            if (disableInheritableForThreadPool) updateConstructorDisableInheritable(clazz);
// 標(biāo)記類(lèi)被字節(jié)碼增強(qiáng)了
            classInfo.setModified();
        } else {
// 如果不是被指定增強(qiáng)的類(lèi)
// 基本類(lèi)型數(shù)組,接口,注解直接返回
            if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || clazz.isAnnotation()) {
                return;
            }
// 這里利用了 javassist一些操作,去獲取已經(jīng)加載的類(lèi)對(duì)象,判斷當(dāng)前類(lèi)是否是指定要被增強(qiáng)類(lèi)的子類(lèi),后續(xù)要做一些操作
// 包括其他基于java實(shí)現(xiàn)的框架對(duì)線(xiàn)程池的擴(kuò)展
// 我們debug啟動(dòng)時(shí)可以看到 tomcat線(xiàn)程池org.apache.tomcat.util.threads.ThreadPoolExecutor 繼承了jdk的線(xiàn)程池
// 我們就可以進(jìn)入到方法內(nèi),后續(xù)看下面的方法
            if (!clazz.subclassOf(clazz.getClassPool().get(THREAD_POOL_EXECUTOR_CLASS_NAME))) return;
            logger.info("Transforming class " + classInfo.getClassName());
// 返回一個(gè) 操作字節(jié)碼的結(jié)果然后標(biāo)記該類(lèi)是否成功 增強(qiáng)了
            final boolean modified = updateBeforeAndAfterExecuteMethodOfExecutorSubclass(clazz);
            if (modified) classInfo.setModified();
        }
    }
    /**
     * @see com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils#doAutoWrap(Runnable)
     * @see com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils#doAutoWrap(Callable)
     */
    @SuppressFBWarnings("VA_FORMAT_STRING_USES_NEWLINE") // [ERROR] Format string should use %n rather than \n
    private void updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull final CtMethod method) throws NotFoundException, CannotCompileException {
        final int modifiers = method.getModifiers();
        if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) return;
        CtClass[] parameterTypes = method.getParameterTypes();
        StringBuilder insertCode = new StringBuilder();
        for (int i = 0; i < parameterTypes.length; i++) {
// 核心的邏輯,我們只關(guān)注 Runnable,Callable,然后對(duì)其線(xiàn)程池所有方法含有該參數(shù)進(jìn)行字節(jié)碼增強(qiáng),在進(jìn)入方法并執(zhí)行方法前替換為T(mén)TL的裝飾器
            final String paramTypeName = parameterTypes[i].getName();
            if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
                String code = String.format(
                        // auto decorate to TTL wrapper
                        "$%d = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doAutoWrap($%<d);",
                        i + 1);
                insertCode.append(code);
            }
        }
        if (insertCode.length() > 0) {
            logger.info("insert code before method " + signatureOfMethod(method) + " of class " +
                method.getDeclaringClass().getName() + ":\n" + insertCode);
            method.insertBefore(insertCode.toString());
        }
    }
// 省略替換 ThreadFactory的代碼
    /**
     * @see Utils#doUnwrapIfIsAutoWrapper(Runnable)
     */
    private boolean updateBeforeAndAfterExecuteMethodOfExecutorSubclass(@NonNull final CtClass clazz) throws NotFoundException, CannotCompileException {
        final CtClass runnableClass = clazz.getClassPool().get(RUNNABLE_CLASS_NAME);
        final CtClass threadClass = clazz.getClassPool().get("java.lang.Thread");
        final CtClass throwableClass = clazz.getClassPool().get("java.lang.Throwable");
        boolean modified = false;
        try {
// ScheduledThreadPoolExecutor 也是繼承自ThreadPoolExecutor,TTL就是利用了ThreadPoolExecutor#beforeExecute(Thread t, Runnable r) 
// 和ThreadPoolExecutor#afterExecute(Runnable r, Throwable t)
//這是jdk線(xiàn)程池提供的兩個(gè)鉤子,TTL對(duì)第三方框架的線(xiàn)程池類(lèi)名完全不了解,所以我們無(wú)法直接拿到子類(lèi)含有Runnable的方法(我們不知道繼承了幾層,無(wú)法通過(guò)反射去拿到method對(duì)象)
// 那么TTL目前通過(guò)jdk的 beforeExecute,和 afterExecute來(lái)對(duì)其他框架或者自定義的線(xiàn)程池進(jìn)行去除TTL裝飾器的邏輯
//猜測(cè)目的是無(wú)法保證正確控制線(xiàn)程間正確傳遞,我們直接消除測(cè)試開(kāi)發(fā)上的迷惑行為
            final CtMethod beforeExecute = clazz.getDeclaredMethod("beforeExecute", new CtClass[]{threadClass, runnableClass});
            // unwrap runnable if IsAutoWrapper
            String code = "$2 = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doUnwrapIfIsAutoWrapper($2);";
            logger.info("insert code before method " + signatureOfMethod(beforeExecute) + " of class " +
                beforeExecute.getDeclaringClass().getName() + ": " + code);
            beforeExecute.insertBefore(code);
            modified = true;
        } catch (NotFoundException e) {
// 前提是 目標(biāo)線(xiàn)程池重寫(xiě)了 beforeExecute 才能進(jìn)行unwraper操作
            // clazz does not override beforeExecute method, do nothing.
        }
// 下面邏輯類(lèi)似 攔截afterExecute
        try {
            final CtMethod afterExecute = clazz.getDeclaredMethod("afterExecute", new CtClass[]{runnableClass, throwableClass});
            // unwrap runnable if IsAutoWrapper
            String code = "$1 = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doUnwrapIfIsAutoWrapper($1);";
            logger.info("insert code before method " + signatureOfMethod(afterExecute) + " of class " +
                afterExecute.getDeclaringClass().getName() + ": " + code);
            afterExecute.insertBefore(code);
            modified = true;
        } catch (NotFoundException e) {
            // clazz does not override afterExecute method, do nothing.
        }
        return modified;
    }

上面看了 典型的 ThreadPoolExecutor和ScheduledThreadPoolExecutor的增強(qiáng)以及對(duì)第三方擴(kuò)展線(xiàn)程池一定程度上的(重寫(xiě)jdk線(xiàn)程池的鉤子方法才會(huì)操作)消除歧義

關(guān)聯(lián)的TtlPriorityBlockingQueueTransformlet

public class TtlPriorityBlockingQueueTransformlet implements JavassistTransformlet {
    private static final Logger logger = Logger.getLogger(TtlPriorityBlockingQueueTransformlet.class);
// 我們要攔截 關(guān)于優(yōu)先級(jí)隊(duì)列的線(xiàn)程池
// https://github.com/alibaba/transmittable-thread-local/issues/330
// 這里有前因后果,因?yàn)閞unnable等接口也實(shí)現(xiàn)了 可排序接口,會(huì)castClass錯(cuò)誤
    private static final String PRIORITY_BLOCKING_QUEUE_CLASS_NAME = "java.util.concurrent.PriorityBlockingQueue";
    private static final String PRIORITY_QUEUE_CLASS_NAME = "java.util.PriorityQueue";
    private static final String COMPARATOR_CLASS_NAME = "java.util.Comparator";
    private static final String COMPARATOR_FIELD_NAME = "comparator";
    @Override
    public void doTransform(@NonNull ClassInfo classInfo) throws IOException, CannotCompileException, NotFoundException {
        final String className = classInfo.getClassName();
// 一共兩種優(yōu)先級(jí)隊(duì)列的攔截,但增強(qiáng)邏輯是一樣的
        if (PRIORITY_BLOCKING_QUEUE_CLASS_NAME.equals(className)) {
            updatePriorityBlockingQueueClass(classInfo.getCtClass());
            classInfo.setModified();
        }
        if (PRIORITY_QUEUE_CLASS_NAME.equals(className)) {
            updateBlockingQueueClass(classInfo.getCtClass());
            classInfo.setModified();
        }
    }
    private void updatePriorityBlockingQueueClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException {
        if (!haveComparatorField(clazz)) {
            // In Java 6, PriorityBlockingQueue implementation do not have field comparator,
            // need transform more fundamental class PriorityQueue
            logger.info(PRIORITY_BLOCKING_QUEUE_CLASS_NAME + " do not have field " + COMPARATOR_FIELD_NAME +
                ", transform " + PRIORITY_QUEUE_CLASS_NAME + " instead.");
            return;
        }
        modifyConstructors(clazz);
    }
    private void updateBlockingQueueClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException {
        final CtClass classPriorityBlockingQueue = clazz.getClassPool().getCtClass(PRIORITY_BLOCKING_QUEUE_CLASS_NAME);
        if (haveComparatorField(classPriorityBlockingQueue)) return;
        logger.info(PRIORITY_BLOCKING_QUEUE_CLASS_NAME + " do not have field " + COMPARATOR_FIELD_NAME +
            ", so need transform " + PRIORITY_QUEUE_CLASS_NAME);
        modifyConstructors(clazz);
    }
    private static boolean haveComparatorField(CtClass clazz) {
        try {
            clazz.getDeclaredField(COMPARATOR_FIELD_NAME);
            return true;
        } catch (NotFoundException e) {
            return false;
        }
    }
    /**
     * @see #wrapComparator$by$ttl(Comparator)
     */
    private static final String WRAP_METHOD_NAME = "wrapComparator$by$ttl";
    /**
     * wrap comparator field in constructors
     *
     * @see #COMPARATOR_FIELD_NAME
     */
    private static final String AFTER_CODE_REWRITE_FILED = String.format("this.%s = %s.%s(this.%1$s);",
        COMPARATOR_FIELD_NAME, TtlPriorityBlockingQueueTransformlet.class.getName(), WRAP_METHOD_NAME
    );
    private static void modifyConstructors(@NonNull CtClass clazz) throws NotFoundException, CannotCompileException {
        for (CtConstructor constructor : clazz.getDeclaredConstructors()) {
            final CtClass[] parameterTypes = constructor.getParameterTypes();
            final StringBuilder beforeCode = new StringBuilder();
            for (int i = 0; i < parameterTypes.length; i++) {
                ///////////////////////////////////////////////////////////////
                // rewrite Comparator constructor parameter
                ///////////////////////////////////////////////////////////////
                final String paramTypeName = parameterTypes[i].getName();
// 這里的攔截角度比較清奇,因?yàn)閞unnable實(shí)現(xiàn)了 Comparator,所有入隊(duì)的runable都會(huì)執(zhí)行排序方法,那么只要對(duì)compator方法增強(qiáng)即可,老配方,在方法執(zhí)行前拿到參數(shù)列表找到帶有Comparator的runnable裝飾它
                if (COMPARATOR_CLASS_NAME.equals(paramTypeName)) {
                    String code = String.format("$%d = %s.%s($%1$d);",
                        i + 1, TtlPriorityBlockingQueueTransformlet.class.getName(), WRAP_METHOD_NAME
                    );
                    beforeCode.append(code);
                }
            }
            if (beforeCode.length() > 0) {
                logger.info("insert code before constructor " + signatureOfMethod(constructor) + " of class " +
                    constructor.getDeclaringClass().getName() + ": " + beforeCode);
                constructor.insertBefore(beforeCode.toString());
            }
            ///////////////////////////////////////////////////////////////
            // rewrite Comparator class field
            ///////////////////////////////////////////////////////////////
            logger.info("insert code after constructor " + signatureOfMethod(constructor) + " of class " +
                constructor.getDeclaringClass().getName() + ": " + AFTER_CODE_REWRITE_FILED);
            constructor.insertAfter(AFTER_CODE_REWRITE_FILED);
        }
    }
    /**
     * @see TtlExecutors#getTtlRunnableUnwrapComparatorForComparableRunnable()
     * @see TtlExecutors#getTtlRunnableUnwrapComparator(Comparator)
     */
    public static Comparator<Runnable> wrapComparator$by$ttl(Comparator<Runnable> comparator) {
        if (comparator == null) return TtlExecutors.getTtlRunnableUnwrapComparatorForComparableRunnable();
        return TtlExecutors.getTtlRunnableUnwrapComparator(comparator);
    }
}

看一下TtlTimerTaskTransformlet

public class TtlTimerTaskTransformlet implements JavassistTransformlet {
    private static final Logger logger = Logger.getLogger(TtlTimerTaskTransformlet.class);
    private static final String TIMER_TASK_CLASS_NAME = "java.util.TimerTask";
    private static final String RUN_METHOD_NAME = "run";
    @Override
    public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
        // work-around ClassCircularityError:
        if (isClassAtPackageJavaUtil(classInfo.getClassName())) return;
        // TimerTask class is checked by above logic.
        //
        // if (TIMER_TASK_CLASS_NAME.equals(classInfo.getClassName())) return; // No need transform TimerTask class
        final CtClass clazz = classInfo.getCtClass();
        if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || clazz.isAnnotation()) {
            return;
        }
        // class contains method `void run()` ?
//TimerTask 簡(jiǎn)單粗暴就是要找 void run() 方法,因?yàn)樗旧砭褪荝unnable的子類(lèi)并且有一個(gè)抽象方法 void run(),那么我們只要找到所有TimerTask子類(lèi)找到其實(shí)現(xiàn)的run方法增強(qiáng)即可
        try {
            final CtMethod runMethod = clazz.getDeclaredMethod(RUN_METHOD_NAME, new CtClass[0]);
            if (!CtClass.voidType.equals(runMethod.getReturnType())) return;
        } catch (NotFoundException e) {
            return;
        }
        if (!clazz.subclassOf(clazz.getClassPool().get(TIMER_TASK_CLASS_NAME))) return;
        logger.info("Transforming class " + classInfo.getClassName());
        updateTimerTaskClass(clazz);
        classInfo.setModified();
    }
    /**
     * @see Utils#doCaptureWhenNotTtlEnhanced(java.lang.Object)
     */
    private void updateTimerTaskClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException {
        final String className = clazz.getName();
        // add new field
//新增一個(gè)  用于中轉(zhuǎn)捕獲主線(xiàn)程對(duì)象的變量
        final String capturedFieldName = "captured$field$added$by$ttl";
        final CtField capturedField = CtField.make("private final Object " + capturedFieldName + ";", clazz);
        clazz.addField(capturedField, "com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doCaptureWhenNotTtlEnhanced(this);");
        logger.info("add new field " + capturedFieldName + " to class " + className);
        final CtMethod runMethod = clazz.getDeclaredMethod(RUN_METHOD_NAME, new CtClass[0]);
        final String beforeCode = "Object backup = com.alibaba.ttl.TransmittableThreadLocal.Transmitter.replay(" + capturedFieldName + ");";
        final String finallyCode = "com.alibaba.ttl.TransmittableThreadLocal.Transmitter.restore(backup);";
// 直接增強(qiáng)方法就不能用裝飾器了,直接修改方法加入一個(gè)try{}finally邏輯,這本身就是裝飾器內(nèi)部的邏輯
// 這部分是一些代碼拼接就不看了
        doTryFinallyForMethod(runMethod, beforeCode, finallyCode);
    }
}

再來(lái)看一下 ForkJoin的增強(qiáng)TtlForkJoinTransformlet

public class TtlForkJoinTransformlet implements JavassistTransformlet {
    private static final Logger logger = Logger.getLogger(TtlForkJoinTransformlet.class);
    private static final String FORK_JOIN_TASK_CLASS_NAME = "java.util.concurrent.ForkJoinTask";
    private static final String FORK_JOIN_POOL_CLASS_NAME = "java.util.concurrent.ForkJoinPool";
    private static final String FORK_JOIN_WORKER_THREAD_FACTORY_CLASS_NAME = "java.util.concurrent.ForkJoinPool$ForkJoinWorkerThreadFactory";
    private final boolean disableInheritableForThreadPool;
    public TtlForkJoinTransformlet(boolean disableInheritableForThreadPool) {
        this.disableInheritableForThreadPool = disableInheritableForThreadPool;
    }
    @Override
    public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
        if (FORK_JOIN_TASK_CLASS_NAME.equals(classInfo.getClassName())) {
//如果是 forkJoin的task
            updateForkJoinTaskClass(classInfo.getCtClass());
            classInfo.setModified();
        } else if (disableInheritableForThreadPool && FORK_JOIN_POOL_CLASS_NAME.equals(classInfo.getClassName())) {
// forkJoin的 pool 或者 forkJoin的ThreadFactory
            updateConstructorDisableInheritable(classInfo.getCtClass());
            classInfo.setModified();
        }
    }
    /**
     * @see Utils#doCaptureWhenNotTtlEnhanced(java.lang.Object)
     */
    private void updateForkJoinTaskClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException {
        final String className = clazz.getName();
    // 如果是 ForkJoinTask 同 TimerTask
        // add new field
        final String capturedFieldName = "captured$field$added$by$ttl";
        final CtField capturedField = CtField.make("private final Object " + capturedFieldName + ";", clazz);
        clazz.addField(capturedField, "com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doCaptureWhenNotTtlEnhanced(this);");
        logger.info("add new field " + capturedFieldName + " to class " + className);
        final CtMethod doExecMethod = clazz.getDeclaredMethod("doExec", new CtClass[0]);
        final String doExec_renamed_method_name = renamedMethodNameByTtl(doExecMethod);
        final String beforeCode = "if (this instanceof " + TtlEnhanced.class.getName() + ") {\n" + // if the class is already TTL enhanced(eg: com.alibaba.ttl.TtlRecursiveTask)
                "    return " + doExec_renamed_method_name + "($$);\n" +                           // return directly/do nothing
                "}\n" +
                "Object backup = com.alibaba.ttl.TransmittableThreadLocal.Transmitter.replay(" + capturedFieldName + ");";
        final String finallyCode = "com.alibaba.ttl.TransmittableThreadLocal.Transmitter.restore(backup);";
        doTryFinallyForMethod(doExecMethod, doExec_renamed_method_name, beforeCode, finallyCode);
    }
    private void updateConstructorDisableInheritable(@NonNull final CtClass clazz) throws NotFoundException, CannotCompileException {
// 如果是 ForkJoin的 Pool或者 ThreadFactory 則同線(xiàn)程池的第三方實(shí)現(xiàn)邏輯
// 我們需要屏蔽掉ttl的邏輯。
// 為什么要這么做,因?yàn)镕orkJoin本身是一個(gè)很特殊的線(xiàn)程池
// 1. 初始化線(xiàn)程后不一定為誰(shuí)工作,其實(shí)理論上所有初始化線(xiàn)程做TTL操作都沒(méi)什么意義,存在復(fù)用
// 2. 通過(guò)線(xiàn)程池執(zhí)行任務(wù),對(duì)于ForkJoin來(lái)說(shuō)并沒(méi)有真正的執(zhí)行,是先包裝為一個(gè)ForkJoinTask然后再進(jìn)行ForkJoinPool#externalPush(ForkJoinTask<?> task)
// 3. 存在竊取的可能但是也是以ForkJoinTask 維度
        for (CtConstructor constructor : clazz.getDeclaredConstructors()) {
            final CtClass[] parameterTypes = constructor.getParameterTypes();
            final StringBuilder insertCode = new StringBuilder();
            for (int i = 0; i < parameterTypes.length; i++) {
                final String paramTypeName = parameterTypes[i].getName();
                if (FORK_JOIN_WORKER_THREAD_FACTORY_CLASS_NAME.equals(paramTypeName)) {
                    String code = String.format("$%d = com.alibaba.ttl.threadpool.TtlForkJoinPoolHelper.getDisableInheritableForkJoinWorkerThreadFactory($%<d);", i + 1);
                    insertCode.append(code);
                }
            }
            if (insertCode.length() > 0) {
                logger.info("insert code before method " + signatureOfMethod(constructor) + " of class " +
                    constructor.getDeclaringClass().getName() + ": " + insertCode);
                constructor.insertBefore(insertCode.toString());
            }
        }
    }
}

如上可以看到對(duì)于ForkJoin來(lái)說(shuō)我們只對(duì) ForkJoinTask進(jìn)行增強(qiáng)

例外TTL的作者告訴我 其實(shí)不使用javaAgent的方式也可以對(duì)ForkJoin使用TTL,分別為T(mén)tlRecursiveAction,TtlRecursiveTask兩個(gè)類(lèi),我看了代碼后還未進(jìn)行測(cè)試,但是看邏輯就是可以的。后續(xù)我抽時(shí)間測(cè)試如果有問(wèn)題這里會(huì)修改,如果小伙伴需要用到這個(gè)ForkJoin要進(jìn)行并發(fā)的測(cè)試,單獨(dú)執(zhí)行一個(gè)ForkJoin或者parallelStream是不可以的。

TtlRecursiveTask<V> 和TtlRecursiveAction區(qū)別是可返回結(jié)果的區(qū)別,返回結(jié)果可以用于CompletableFuture

通過(guò)查看源碼我們還發(fā)現(xiàn)了TtlWrappers可以的對(duì)java常見(jiàn)的函數(shù)式接口進(jìn)行修飾。相當(dāng)于把裝飾器維度使用最小的,類(lèi)似于對(duì)Runnable和Callable這種任務(wù)執(zhí)行單元一樣

TtlWrappers使用案例

 @Async
    public void test(Integer integer) {
        // 首先這里@Async我們已經(jīng)將異步線(xiàn)程池使用了TTL的裝飾器線(xiàn)程池
        int num = ThreadLocalRandom.current().nextInt(5, 500);
        try {
            Thread.sleep(num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 這里數(shù)字忽略只是為了使用forkJoin
        Arrays.asList(1, 2, 3, 4, 5, 6, 1, 2132, 234, 234, 23, 2, 2, 2, 2)
            .parallelStream().forEach(TtlWrappers.wrapConsumer(o -> {
                // 如果不適用TtlWrappers 這里會(huì)不相等,代表這里線(xiàn)程沒(méi)有正確傳遞
            //如果使用TtlWrappers 則也是我們手動(dòng)做裝飾器了,只不過(guò)是針對(duì)代碼塊執(zhí)行單元的裝飾器
                if (!Objects.equals(TestController.LOCAL.get(), integer)) {
                    System.out.println("bbbbbbbbbbb");
                }
            }));
    }

TtlWrappers可以修飾的函數(shù)

其實(shí)如果可以修飾 Consumer,F(xiàn)unction,Supplier了,我們也可以自定義構(gòu)造一個(gè)函數(shù),再用TtlWrappers修飾

我們?cè)倏纯醋髡咛岬降腡tlRecursiveTask,簡(jiǎn)單點(diǎn)我們只看TtlRecursiveAction 沒(méi)有返回值的

public abstract class TtlRecursiveAction extends ForkJoinTask<Void> implements TtlEnhanced {
    private static final long serialVersionUID = -5753568484583412377L;
// 子類(lèi)初始化時(shí)捕獲
    private final Object captured = capture();
    protected TtlRecursiveAction() {
    }
// 延遲到子類(lèi)進(jìn)行 任務(wù)單元執(zhí)行
    protected abstract void compute();
    /**
     * see {@link ForkJoinTask#getRawResult()}
     */
    public final Void getRawResult() {
        return null;
    }
    /**
     * see {@link ForkJoinTask#setRawResult(Object)}
     */
    protected final void setRawResult(Void mustBeNull) {
    }
    /**
     * Implements execution conventions for RecursiveActions.
     */
// 熟悉的配方,但是當(dāng)前方法是重寫(xiě)了 ForkJoinTask的執(zhí)行方法
    protected final boolean exec() {
        final Object backup = replay(captured);
        try {
            compute();
            return true;
        } finally {
            restore(backup);
        }
    }
}

如何使用

   @Async
    public void test(Integer integer) {
        // 首先這里@Async我們已經(jīng)將異步線(xiàn)程池使用了TTL的裝飾器線(xiàn)程池
        int num = ThreadLocalRandom.current().nextInt(5, 500);
        try {
            Thread.sleep(num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 10 ; i++) {
// 只能這樣使用,有點(diǎn)麻煩啊
            TestFork testFork = new TestFork(integer);
// 獲取默認(rèn)的 ForkJoin線(xiàn)程池
            ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
            forkJoinPool.execute(testFork);
        }
    }
    private static class TestFork extends TtlRecursiveAction {
        // 這里只是用來(lái)做對(duì)比的變量
        private final Integer test;
        public TestFork(Integer test) {
            this.test = test;
        }
        @Override
        protected void compute() {
            //  這里是你的任務(wù)單元的業(yè)務(wù)代碼
            if (!Objects.equals(TestController.LOCAL.get(), test)) {
// 不會(huì)進(jìn)到這里,是可用的
                System.out.println("bbbbbbbbbbb");
            }
        }
    }

從上面可以看出TtlRecursiveTask局限性很大,但是現(xiàn)在最新版本2.12.6已經(jīng)有TtlWrappers進(jìn)行替代了,函數(shù)是一等公民,擴(kuò)展性強(qiáng)使用方便。

總結(jié)

  • 不使用JavaAgent技術(shù)的支持 ForkJoin使用手動(dòng)捕獲,重放的ForkJoinTask
    實(shí)現(xiàn)傳遞,但是需要暴露ForkJoinPool的使用。
  • 通過(guò) javaAgent技術(shù)可以添加jvm啟動(dòng)參數(shù)后無(wú)感支持Runnable,Callable,TimerTask,F(xiàn)orkJoinTask。
  • 默認(rèn)開(kāi)啟Runnable,Callable,F(xiàn)orkJoinTask,并且目前不支持第三方框架重寫(xiě)的線(xiàn)程池邏輯,并做了一定程度的去干擾操作。
  • 如果不使用javaAgent針對(duì)常見(jiàn)的函數(shù),Consumer,Supplier等提供代碼塊執(zhí)行單元的裝飾器,也可以支持parallelStream 等,詳見(jiàn)com.alibaba.ttl.TtlWrappers。

以上就是TransmittableThreadLocal通過(guò)javaAgent實(shí)現(xiàn)線(xiàn)程傳遞并支持ForkJoin的詳細(xì)內(nèi)容,更多關(guān)于TransmittableThreadLocal javaAgent線(xiàn)程傳遞的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringBoot中使用WebSocket的教程分享

    SpringBoot中使用WebSocket的教程分享

    這篇文章主要為大家詳細(xì)介紹了如何在SpringBoot中使用WebSocket,文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解一下
    2023-06-06
  • 基于Java SSM框架實(shí)現(xiàn)簡(jiǎn)易的評(píng)教系統(tǒng)

    基于Java SSM框架實(shí)現(xiàn)簡(jiǎn)易的評(píng)教系統(tǒng)

    這篇文章主要介紹了通過(guò)Java SSM框架實(shí)現(xiàn)一個(gè)簡(jiǎn)易的評(píng)教系統(tǒng)的示例代碼,文中的代碼講解詳細(xì),感興趣的小伙伴可以了解一下
    2022-02-02
  • Java中的泛型

    Java中的泛型

    這篇文章主要介紹為何要泛型,如何使用泛型,自定義泛型的方法,泛型類(lèi)的子類(lèi)等多方面介紹了JAVA中的泛型,需要的小伙伴請(qǐng)看下文
    2021-08-08
  • Mybatis注解實(shí)現(xiàn)多數(shù)據(jù)源讀寫(xiě)分離詳解

    Mybatis注解實(shí)現(xiàn)多數(shù)據(jù)源讀寫(xiě)分離詳解

    這篇文章主要給大家介紹了關(guān)于Mybatis注解實(shí)現(xiàn)多數(shù)據(jù)源讀寫(xiě)分離的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Mybatis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09
  • java map中相同的key保存多個(gè)value值方式

    java map中相同的key保存多個(gè)value值方式

    這篇文章主要介紹了java map中相同的key保存多個(gè)value值方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Spring Boot中防止遞歸查詢(xún)的兩種方式

    Spring Boot中防止遞歸查詢(xún)的兩種方式

    這篇文章主要給大家介紹了關(guān)于Spring Boot中防止遞歸查詢(xún)的兩種方式,兩種方式分別是在application.properties中配置和在entity中添加注解,都給出了詳細(xì)的示例代碼,需要的朋友們下面來(lái)一起看看吧。
    2017-06-06
  • 3分鐘純 Java 注解搭個(gè)管理系統(tǒng)的示例代碼

    3分鐘純 Java 注解搭個(gè)管理系統(tǒng)的示例代碼

    這篇文章主要介紹了3分鐘純 Java 注解搭個(gè)管理系統(tǒng)的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • java虛擬機(jī)jvm方法區(qū)實(shí)例講解

    java虛擬機(jī)jvm方法區(qū)實(shí)例講解

    在本篇文章里小編給大家整理的是一篇關(guān)于java虛擬機(jī)jvm方法區(qū)實(shí)例講解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。
    2021-02-02
  • Java switch()括號(hào)內(nèi)參數(shù)的類(lèi)型要求詳解

    Java switch()括號(hào)內(nèi)參數(shù)的類(lèi)型要求詳解

    這篇文章主要介紹了Java switch()括號(hào)內(nèi)參數(shù)的類(lèi)型要求,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • SpringBoot中shiro過(guò)濾器的重寫(xiě)與配置詳解

    SpringBoot中shiro過(guò)濾器的重寫(xiě)與配置詳解

    在前后端分離跨域訪問(wèn)的項(xiàng)目中shiro進(jìn)行權(quán)限攔截失效 (即使有正確權(quán)限的訪問(wèn)也會(huì)被攔截) 時(shí)造成302重定向錯(cuò)誤等問(wèn)題,為解決這個(gè)問(wèn)題,就需要進(jìn)行shiro過(guò)濾器的重寫(xiě)以及配置。本文詳細(xì)介紹了解決方法,需要的可以參考一下
    2022-04-04

最新評(píng)論