TransmittableThreadLocal通過javaAgent實現(xiàn)線程傳遞并支持ForkJoin
官方文檔
感謝TTL 作者在我上一篇文章評論,讓我知道了通過官方文檔去全面了解其使用方式及其支持的重要性。
所以官方文檔先貼出來~
alibaba/transmittable-thread-local: ?? TransmittableThreadLocal (TTL), the missing Java™ std lib(simple & 0-dependency) for framework/middleware, provide an enhanced InheritableThreadLocal that transmits values between threads even using thread pooling components. (github.com)
重要的不單單是看官方文檔,還有其他問題可以通過issue查看最新動態(tài)、以及和社區(qū)交流,本文背景以TTL最新版本2.12.6版本為基準(zhǔn)。
TransmittableThreadLocal線程間傳遞邏輯
上一篇文章及評論,可以看到TTL官方描述其通過javaAgent技術(shù)可以實現(xiàn)對ForkJoin的線程傳遞支持。
通過測試,確實可以
通過javaagent指定jar包
簡單的測試代碼如下
private final static TransmittableThreadLocal<Integer> LOCAL = new TransmittableThreadLocal<>(); @PostMapping("/a") public void test() { final int num = ThreadLocalRandom.current().nextInt(0, 1500); LOCAL.set(num); List<Integer> list = new ArrayList<>(); for (int i = 0; i < 150; i++) { list.add(i); } list.stream().parallel().forEach(o -> { if (!Objects.equals(LOCAL.get(), num)) { System.out.println("如果不同代表沒有正確傳遞噢"); } }); }
通過上述代碼及通過javaagent引入jar包后不用手動設(shè)置TTL的線程池裝飾器也可以了!我們接下來看看其中的奧秘!(javaAgent技術(shù)大家可以自行搜索學(xué)習(xí)一下,例如skywalking等無侵入場景應(yīng)用)
// 通過premain 進(jìn)行字節(jié)碼操作添加我們想要的攔截,(attach,premain兩個java agent核心方法大家可以搜索詳細(xì)了解) public static void premain(String agentArgs, @Nonnull Instrumentation inst) { // 一個 volatile 變量,不清除這里volatile的必要性,此文章暫不關(guān)注 kvs = splitCommaColonStringToKV(agentArgs); // 日志打印的一個多態(tài),可以通過參數(shù)設(shè)置。默認(rèn)只打印 StdErr,也可以設(shè)置為stdout Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs)); final Logger logger = Logger.getLogger(TtlAgent.class); try { logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst); // 如果是用javaAgent無侵入的實現(xiàn)TTL邏輯(上一篇講到正常使用TTL的裝飾器線程池來實現(xiàn))默認(rèn)是開啟的(TimerTask默認(rèn)不開啟) 這里是查看線程池的配置,默認(rèn)開啟,可以在jvm參數(shù)設(shè)置為關(guān)閉 final boolean disableInheritable = isDisableInheritableForThreadPool(); // 需要修改字節(jié)碼的列表, 通過javassist修改字節(jié)碼 JavassistTransformlet是TTL 做的抽象對jdk可能出現(xiàn)的3種出現(xiàn)線程傳遞的情況分別做了實現(xiàn) final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>(); // 線程池的 transformletList.add(new TtlExecutorTransformlet(disableInheritable)); // forkjoin的 transformletList.add(new TtlForkJoinTransformlet(disableInheritable)); // timer task默認(rèn)關(guān)閉,如果設(shè)置開啟也加入 if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet()); // 這是jdk提供的接口,給客戶端提供對class 字節(jié)碼做增強的入口 final ClassFileTransformer transformer = new TtlTransformer(transformletList); // 將TTL的字節(jié)碼增強邏輯 織入,加載對應(yīng)class時調(diào)用(也取決于要增強的class對象load時機) inst.addTransformer(transformer, true); logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success"); logger.info("[TtlAgent.premain] end"); // 如果使用了 javaAgent 增強,你再手動給線程池包裹裝飾器則會直接返回,不需要包裝了 ttlAgentLoaded = true; } catch (Exception e) { String msg = "Fail to load TtlAgent , cause: " + e.toString(); logger.log(Level.SEVERE, msg, e); throw new IllegalStateException(msg, e); } }
上面貼出了TTL使用javaAgent的premain進(jìn)行字節(jié)碼增強的流程下面看具體的實現(xiàn)以及如何使用jdk暴露的ClassFileTransformer接口進(jìn)行操作字節(jié)碼
先來看TtlTransformer ,我們的字節(jié)碼增強邏輯的橋梁
public class TtlTransformer implements ClassFileTransformer { private static final Logger logger = Logger.getLogger(TtlTransformer.class); private static final byte[] EMPTY_BYTE_ARRAY = {}; // 要被增強的列表 private final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>(); TtlTransformer(List<? extends JavassistTransformlet> transformletList) { // 復(fù)制到當(dāng)前類的成員變量 for (JavassistTransformlet transformlet : transformletList) { this.transformletList.add(transformlet); logger.info("[TtlTransformer] add Transformlet " + transformlet.getClass() + " success"); } } @Override public final byte[] transform(@Nonnull final ClassLoader loader, @Nullable final String classFile, final Class<?> classBeingRedefined, final ProtectionDomain protectionDomain, final byte[] classFileBuffer) { try { // Lambda has no class file, no need to transform, just return. if (classFile == null) return EMPTY_BYTE_ARRAY; // 獲取到當(dāng)前加載的class的類名 final String className = toClassName(classFile); for (JavassistTransformlet transformlet : transformletList) { // 調(diào)用子類實現(xiàn) final byte[] bytes = transformlet.doTransform(className, classFileBuffer, loader); if (bytes != null) return bytes; } } catch (Throwable t) { String msg = "Fail to transform class " + classFile + ", cause: " + t.toString(); logger.log(Level.SEVERE, msg, t); throw new IllegalStateException(msg, t); } return EMPTY_BYTE_ARRAY; } private static String toClassName(final String classFile) { return classFile.replace('/', '.'); } }
下面來看具體的實現(xiàn)有四個
2.12.6版本增強實現(xiàn)有4個
目前TTL代碼迭代還是比較快的,雖然代碼不多,但是一直在改動。和2.10.2的差別還是挺大的。所以小伙伴們可以多關(guān)注一下官方github上的相關(guān)動態(tài)
先看TtlExecutorTransformlet
// 為什么把源碼中的注釋放進(jìn)來,因為TTL的注釋寫的還是挺全。 /** * TTL {@link JavassistTransformlet} for {@link java.util.concurrent.Executor}. * * @author Jerry Lee (oldratlee at gmail dot com) * @author wuwen5 (wuwen.55 at aliyun dot com) // 通過字節(jié)碼操作會覆蓋以下線程執(zhí)行相關(guān)類 * @see java.util.concurrent.Executor * @see java.util.concurrent.ExecutorService * @see java.util.concurrent.ThreadPoolExecutor * @see java.util.concurrent.ScheduledThreadPoolExecutor * @see java.util.concurrent.Executors // 這是也是TTL的一個實現(xiàn),之后再看 * @see TtlPriorityBlockingQueueTransformlet * @since 2.5.1 */ public class TtlExecutorTransformlet implements JavassistTransformlet { private static final Logger logger = Logger.getLogger(TtlExecutorTransformlet.class); // 裝我們要攔截操作字節(jié)碼的類 private static final Set<String> EXECUTOR_CLASS_NAMES = new HashSet<String>(); // 這次套路變了,如果使用javaAgent不用一層一層裝飾再去增強了 // 我們的目的就是 在thread執(zhí)行任務(wù)時候, //任務(wù): 初始化 [捕獲capture] -> 目標(biāo)執(zhí)行前 [重放replay] -> 目標(biāo)執(zhí)行后 還原 [還原restore]即可 // 現(xiàn)在我們聚焦在 Runnable 和 Callable即可,TTL的切入點是在各種線程池 //(如果你直接new,這個原生就支持,可以看看前一篇文章,就算你使用TTL的父類InheritableThreadLocal在new thread時會將第一次主線程的ThreadLocalMap的引用都帶過去噢,想一下childValue這個方法) // 所以我們只要對所有線程池的 參數(shù)中含有 Runnable和 Callable替換成Ttl的增強裝飾器即可! private static final Map<String, String> PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS = new HashMap<String, String>(); private static final String THREAD_POOL_EXECUTOR_CLASS_NAME = "java.util.concurrent.ThreadPoolExecutor"; private static final String RUNNABLE_CLASS_NAME = "java.lang.Runnable"; static { EXECUTOR_CLASS_NAMES.add(THREAD_POOL_EXECUTOR_CLASS_NAME); EXECUTOR_CLASS_NAMES.add("java.util.concurrent.ScheduledThreadPoolExecutor"); PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put(RUNNABLE_CLASS_NAME, "com.alibaba.ttl.TtlRunnable"); PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.util.concurrent.Callable", "com.alibaba.ttl.TtlCallable"); } private static final String THREAD_FACTORY_CLASS_NAME = "java.util.concurrent.ThreadFactory"; private final boolean disableInheritableForThreadPool; public TtlExecutorTransformlet(boolean disableInheritableForThreadPool) { this.disableInheritableForThreadPool = disableInheritableForThreadPool; } @Override public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException { //看issues可以知道為什么要 屏蔽java.util,但是具體原因還不明確。 // 因為所有類加載都會進(jìn)入javaAgent的 premain方法,對應(yīng)的實現(xiàn)自己選擇去操作哪些類的字節(jié)碼,例如skywalking那么多插件,就是用全類名一個一個指定常見開源框架的可做攔截邏輯的類,然后去添加apm的邏輯增強 // work-around ClassCircularityError: // https://github.com/alibaba/transmittable-thread-local/issues/278 // https://github.com/alibaba/transmittable-thread-local/issues/234 if (isClassAtPackageJavaUtil(classInfo.getClassName())) return; final CtClass clazz = classInfo.getCtClass(); // 指定的類 if (EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName())) { for (CtMethod method : clazz.getDeclaredMethods()) { updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method); } // 首先DisableInheritableThreadFactory和我之前的理解有些出入,但是看實現(xiàn)確實是一個屏蔽TTL的邏輯實現(xiàn),但是如果使用agent需要添加jvm參數(shù),那么去掉參數(shù)了就不會進(jìn)入當(dāng)前jar的premain方法 // 而且經(jīng)過測試,就算讓這里為true也不能屏蔽什么,因為就算替換了ThreadFactory但是Runnable,Callable已經(jīng)在類加載時字節(jié)碼層面替換為TTL的裝飾器了,這里好像亡羊補牢了 - -!雖然不明白但是這里不影響我們使用邏輯 if (disableInheritableForThreadPool) updateConstructorDisableInheritable(clazz); // 標(biāo)記類被字節(jié)碼增強了 classInfo.setModified(); } else { // 如果不是被指定增強的類 // 基本類型數(shù)組,接口,注解直接返回 if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || clazz.isAnnotation()) { return; } // 這里利用了 javassist一些操作,去獲取已經(jīng)加載的類對象,判斷當(dāng)前類是否是指定要被增強類的子類,后續(xù)要做一些操作 // 包括其他基于java實現(xiàn)的框架對線程池的擴展 // 我們debug啟動時可以看到 tomcat線程池org.apache.tomcat.util.threads.ThreadPoolExecutor 繼承了jdk的線程池 // 我們就可以進(jìn)入到方法內(nèi),后續(xù)看下面的方法 if (!clazz.subclassOf(clazz.getClassPool().get(THREAD_POOL_EXECUTOR_CLASS_NAME))) return; logger.info("Transforming class " + classInfo.getClassName()); // 返回一個 操作字節(jié)碼的結(jié)果然后標(biāo)記該類是否成功 增強了 final boolean modified = updateBeforeAndAfterExecuteMethodOfExecutorSubclass(clazz); if (modified) classInfo.setModified(); } } /** * @see com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils#doAutoWrap(Runnable) * @see com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils#doAutoWrap(Callable) */ @SuppressFBWarnings("VA_FORMAT_STRING_USES_NEWLINE") // [ERROR] Format string should use %n rather than \n private void updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull final CtMethod method) throws NotFoundException, CannotCompileException { final int modifiers = method.getModifiers(); if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) return; CtClass[] parameterTypes = method.getParameterTypes(); StringBuilder insertCode = new StringBuilder(); for (int i = 0; i < parameterTypes.length; i++) { // 核心的邏輯,我們只關(guān)注 Runnable,Callable,然后對其線程池所有方法含有該參數(shù)進(jìn)行字節(jié)碼增強,在進(jìn)入方法并執(zhí)行方法前替換為TTL的裝飾器 final String paramTypeName = parameterTypes[i].getName(); if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) { String code = String.format( // auto decorate to TTL wrapper "$%d = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doAutoWrap($%<d);", i + 1); insertCode.append(code); } } if (insertCode.length() > 0) { logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ":\n" + insertCode); method.insertBefore(insertCode.toString()); } } // 省略替換 ThreadFactory的代碼 /** * @see Utils#doUnwrapIfIsAutoWrapper(Runnable) */ private boolean updateBeforeAndAfterExecuteMethodOfExecutorSubclass(@NonNull final CtClass clazz) throws NotFoundException, CannotCompileException { final CtClass runnableClass = clazz.getClassPool().get(RUNNABLE_CLASS_NAME); final CtClass threadClass = clazz.getClassPool().get("java.lang.Thread"); final CtClass throwableClass = clazz.getClassPool().get("java.lang.Throwable"); boolean modified = false; try { // ScheduledThreadPoolExecutor 也是繼承自ThreadPoolExecutor,TTL就是利用了ThreadPoolExecutor#beforeExecute(Thread t, Runnable r) // 和ThreadPoolExecutor#afterExecute(Runnable r, Throwable t) //這是jdk線程池提供的兩個鉤子,TTL對第三方框架的線程池類名完全不了解,所以我們無法直接拿到子類含有Runnable的方法(我們不知道繼承了幾層,無法通過反射去拿到method對象) // 那么TTL目前通過jdk的 beforeExecute,和 afterExecute來對其他框架或者自定義的線程池進(jìn)行去除TTL裝飾器的邏輯 //猜測目的是無法保證正確控制線程間正確傳遞,我們直接消除測試開發(fā)上的迷惑行為 final CtMethod beforeExecute = clazz.getDeclaredMethod("beforeExecute", new CtClass[]{threadClass, runnableClass}); // unwrap runnable if IsAutoWrapper String code = "$2 = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doUnwrapIfIsAutoWrapper($2);"; logger.info("insert code before method " + signatureOfMethod(beforeExecute) + " of class " + beforeExecute.getDeclaringClass().getName() + ": " + code); beforeExecute.insertBefore(code); modified = true; } catch (NotFoundException e) { // 前提是 目標(biāo)線程池重寫了 beforeExecute 才能進(jìn)行unwraper操作 // clazz does not override beforeExecute method, do nothing. } // 下面邏輯類似 攔截afterExecute try { final CtMethod afterExecute = clazz.getDeclaredMethod("afterExecute", new CtClass[]{runnableClass, throwableClass}); // unwrap runnable if IsAutoWrapper String code = "$1 = com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doUnwrapIfIsAutoWrapper($1);"; logger.info("insert code before method " + signatureOfMethod(afterExecute) + " of class " + afterExecute.getDeclaringClass().getName() + ": " + code); afterExecute.insertBefore(code); modified = true; } catch (NotFoundException e) { // clazz does not override afterExecute method, do nothing. } return modified; }
上面看了 典型的 ThreadPoolExecutor和ScheduledThreadPoolExecutor的增強以及對第三方擴展線程池一定程度上的(重寫jdk線程池的鉤子方法才會操作)消除歧義
關(guān)聯(lián)的TtlPriorityBlockingQueueTransformlet
public class TtlPriorityBlockingQueueTransformlet implements JavassistTransformlet { private static final Logger logger = Logger.getLogger(TtlPriorityBlockingQueueTransformlet.class); // 我們要攔截 關(guān)于優(yōu)先級隊列的線程池 // https://github.com/alibaba/transmittable-thread-local/issues/330 // 這里有前因后果,因為runnable等接口也實現(xiàn)了 可排序接口,會castClass錯誤 private static final String PRIORITY_BLOCKING_QUEUE_CLASS_NAME = "java.util.concurrent.PriorityBlockingQueue"; private static final String PRIORITY_QUEUE_CLASS_NAME = "java.util.PriorityQueue"; private static final String COMPARATOR_CLASS_NAME = "java.util.Comparator"; private static final String COMPARATOR_FIELD_NAME = "comparator"; @Override public void doTransform(@NonNull ClassInfo classInfo) throws IOException, CannotCompileException, NotFoundException { final String className = classInfo.getClassName(); // 一共兩種優(yōu)先級隊列的攔截,但增強邏輯是一樣的 if (PRIORITY_BLOCKING_QUEUE_CLASS_NAME.equals(className)) { updatePriorityBlockingQueueClass(classInfo.getCtClass()); classInfo.setModified(); } if (PRIORITY_QUEUE_CLASS_NAME.equals(className)) { updateBlockingQueueClass(classInfo.getCtClass()); classInfo.setModified(); } } private void updatePriorityBlockingQueueClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException { if (!haveComparatorField(clazz)) { // In Java 6, PriorityBlockingQueue implementation do not have field comparator, // need transform more fundamental class PriorityQueue logger.info(PRIORITY_BLOCKING_QUEUE_CLASS_NAME + " do not have field " + COMPARATOR_FIELD_NAME + ", transform " + PRIORITY_QUEUE_CLASS_NAME + " instead."); return; } modifyConstructors(clazz); } private void updateBlockingQueueClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException { final CtClass classPriorityBlockingQueue = clazz.getClassPool().getCtClass(PRIORITY_BLOCKING_QUEUE_CLASS_NAME); if (haveComparatorField(classPriorityBlockingQueue)) return; logger.info(PRIORITY_BLOCKING_QUEUE_CLASS_NAME + " do not have field " + COMPARATOR_FIELD_NAME + ", so need transform " + PRIORITY_QUEUE_CLASS_NAME); modifyConstructors(clazz); } private static boolean haveComparatorField(CtClass clazz) { try { clazz.getDeclaredField(COMPARATOR_FIELD_NAME); return true; } catch (NotFoundException e) { return false; } } /** * @see #wrapComparator$by$ttl(Comparator) */ private static final String WRAP_METHOD_NAME = "wrapComparator$by$ttl"; /** * wrap comparator field in constructors * * @see #COMPARATOR_FIELD_NAME */ private static final String AFTER_CODE_REWRITE_FILED = String.format("this.%s = %s.%s(this.%1$s);", COMPARATOR_FIELD_NAME, TtlPriorityBlockingQueueTransformlet.class.getName(), WRAP_METHOD_NAME ); private static void modifyConstructors(@NonNull CtClass clazz) throws NotFoundException, CannotCompileException { for (CtConstructor constructor : clazz.getDeclaredConstructors()) { final CtClass[] parameterTypes = constructor.getParameterTypes(); final StringBuilder beforeCode = new StringBuilder(); for (int i = 0; i < parameterTypes.length; i++) { /////////////////////////////////////////////////////////////// // rewrite Comparator constructor parameter /////////////////////////////////////////////////////////////// final String paramTypeName = parameterTypes[i].getName(); // 這里的攔截角度比較清奇,因為runnable實現(xiàn)了 Comparator,所有入隊的runable都會執(zhí)行排序方法,那么只要對compator方法增強即可,老配方,在方法執(zhí)行前拿到參數(shù)列表找到帶有Comparator的runnable裝飾它 if (COMPARATOR_CLASS_NAME.equals(paramTypeName)) { String code = String.format("$%d = %s.%s($%1$d);", i + 1, TtlPriorityBlockingQueueTransformlet.class.getName(), WRAP_METHOD_NAME ); beforeCode.append(code); } } if (beforeCode.length() > 0) { logger.info("insert code before constructor " + signatureOfMethod(constructor) + " of class " + constructor.getDeclaringClass().getName() + ": " + beforeCode); constructor.insertBefore(beforeCode.toString()); } /////////////////////////////////////////////////////////////// // rewrite Comparator class field /////////////////////////////////////////////////////////////// logger.info("insert code after constructor " + signatureOfMethod(constructor) + " of class " + constructor.getDeclaringClass().getName() + ": " + AFTER_CODE_REWRITE_FILED); constructor.insertAfter(AFTER_CODE_REWRITE_FILED); } } /** * @see TtlExecutors#getTtlRunnableUnwrapComparatorForComparableRunnable() * @see TtlExecutors#getTtlRunnableUnwrapComparator(Comparator) */ public static Comparator<Runnable> wrapComparator$by$ttl(Comparator<Runnable> comparator) { if (comparator == null) return TtlExecutors.getTtlRunnableUnwrapComparatorForComparableRunnable(); return TtlExecutors.getTtlRunnableUnwrapComparator(comparator); } }
看一下TtlTimerTaskTransformlet
public class TtlTimerTaskTransformlet implements JavassistTransformlet { private static final Logger logger = Logger.getLogger(TtlTimerTaskTransformlet.class); private static final String TIMER_TASK_CLASS_NAME = "java.util.TimerTask"; private static final String RUN_METHOD_NAME = "run"; @Override public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException { // work-around ClassCircularityError: if (isClassAtPackageJavaUtil(classInfo.getClassName())) return; // TimerTask class is checked by above logic. // // if (TIMER_TASK_CLASS_NAME.equals(classInfo.getClassName())) return; // No need transform TimerTask class final CtClass clazz = classInfo.getCtClass(); if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || clazz.isAnnotation()) { return; } // class contains method `void run()` ? //TimerTask 簡單粗暴就是要找 void run() 方法,因為它本身就是Runnable的子類并且有一個抽象方法 void run(),那么我們只要找到所有TimerTask子類找到其實現(xiàn)的run方法增強即可 try { final CtMethod runMethod = clazz.getDeclaredMethod(RUN_METHOD_NAME, new CtClass[0]); if (!CtClass.voidType.equals(runMethod.getReturnType())) return; } catch (NotFoundException e) { return; } if (!clazz.subclassOf(clazz.getClassPool().get(TIMER_TASK_CLASS_NAME))) return; logger.info("Transforming class " + classInfo.getClassName()); updateTimerTaskClass(clazz); classInfo.setModified(); } /** * @see Utils#doCaptureWhenNotTtlEnhanced(java.lang.Object) */ private void updateTimerTaskClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException { final String className = clazz.getName(); // add new field //新增一個 用于中轉(zhuǎn)捕獲主線程對象的變量 final String capturedFieldName = "captured$field$added$by$ttl"; final CtField capturedField = CtField.make("private final Object " + capturedFieldName + ";", clazz); clazz.addField(capturedField, "com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doCaptureWhenNotTtlEnhanced(this);"); logger.info("add new field " + capturedFieldName + " to class " + className); final CtMethod runMethod = clazz.getDeclaredMethod(RUN_METHOD_NAME, new CtClass[0]); final String beforeCode = "Object backup = com.alibaba.ttl.TransmittableThreadLocal.Transmitter.replay(" + capturedFieldName + ");"; final String finallyCode = "com.alibaba.ttl.TransmittableThreadLocal.Transmitter.restore(backup);"; // 直接增強方法就不能用裝飾器了,直接修改方法加入一個try{}finally邏輯,這本身就是裝飾器內(nèi)部的邏輯 // 這部分是一些代碼拼接就不看了 doTryFinallyForMethod(runMethod, beforeCode, finallyCode); } }
再來看一下 ForkJoin的增強TtlForkJoinTransformlet
public class TtlForkJoinTransformlet implements JavassistTransformlet { private static final Logger logger = Logger.getLogger(TtlForkJoinTransformlet.class); private static final String FORK_JOIN_TASK_CLASS_NAME = "java.util.concurrent.ForkJoinTask"; private static final String FORK_JOIN_POOL_CLASS_NAME = "java.util.concurrent.ForkJoinPool"; private static final String FORK_JOIN_WORKER_THREAD_FACTORY_CLASS_NAME = "java.util.concurrent.ForkJoinPool$ForkJoinWorkerThreadFactory"; private final boolean disableInheritableForThreadPool; public TtlForkJoinTransformlet(boolean disableInheritableForThreadPool) { this.disableInheritableForThreadPool = disableInheritableForThreadPool; } @Override public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException { if (FORK_JOIN_TASK_CLASS_NAME.equals(classInfo.getClassName())) { //如果是 forkJoin的task updateForkJoinTaskClass(classInfo.getCtClass()); classInfo.setModified(); } else if (disableInheritableForThreadPool && FORK_JOIN_POOL_CLASS_NAME.equals(classInfo.getClassName())) { // forkJoin的 pool 或者 forkJoin的ThreadFactory updateConstructorDisableInheritable(classInfo.getCtClass()); classInfo.setModified(); } } /** * @see Utils#doCaptureWhenNotTtlEnhanced(java.lang.Object) */ private void updateForkJoinTaskClass(@NonNull final CtClass clazz) throws CannotCompileException, NotFoundException { final String className = clazz.getName(); // 如果是 ForkJoinTask 同 TimerTask // add new field final String capturedFieldName = "captured$field$added$by$ttl"; final CtField capturedField = CtField.make("private final Object " + capturedFieldName + ";", clazz); clazz.addField(capturedField, "com.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.doCaptureWhenNotTtlEnhanced(this);"); logger.info("add new field " + capturedFieldName + " to class " + className); final CtMethod doExecMethod = clazz.getDeclaredMethod("doExec", new CtClass[0]); final String doExec_renamed_method_name = renamedMethodNameByTtl(doExecMethod); final String beforeCode = "if (this instanceof " + TtlEnhanced.class.getName() + ") {\n" + // if the class is already TTL enhanced(eg: com.alibaba.ttl.TtlRecursiveTask) " return " + doExec_renamed_method_name + "($$);\n" + // return directly/do nothing "}\n" + "Object backup = com.alibaba.ttl.TransmittableThreadLocal.Transmitter.replay(" + capturedFieldName + ");"; final String finallyCode = "com.alibaba.ttl.TransmittableThreadLocal.Transmitter.restore(backup);"; doTryFinallyForMethod(doExecMethod, doExec_renamed_method_name, beforeCode, finallyCode); } private void updateConstructorDisableInheritable(@NonNull final CtClass clazz) throws NotFoundException, CannotCompileException { // 如果是 ForkJoin的 Pool或者 ThreadFactory 則同線程池的第三方實現(xiàn)邏輯 // 我們需要屏蔽掉ttl的邏輯。 // 為什么要這么做,因為ForkJoin本身是一個很特殊的線程池 // 1. 初始化線程后不一定為誰工作,其實理論上所有初始化線程做TTL操作都沒什么意義,存在復(fù)用 // 2. 通過線程池執(zhí)行任務(wù),對于ForkJoin來說并沒有真正的執(zhí)行,是先包裝為一個ForkJoinTask然后再進(jìn)行ForkJoinPool#externalPush(ForkJoinTask<?> task) // 3. 存在竊取的可能但是也是以ForkJoinTask 維度 for (CtConstructor constructor : clazz.getDeclaredConstructors()) { final CtClass[] parameterTypes = constructor.getParameterTypes(); final StringBuilder insertCode = new StringBuilder(); for (int i = 0; i < parameterTypes.length; i++) { final String paramTypeName = parameterTypes[i].getName(); if (FORK_JOIN_WORKER_THREAD_FACTORY_CLASS_NAME.equals(paramTypeName)) { String code = String.format("$%d = com.alibaba.ttl.threadpool.TtlForkJoinPoolHelper.getDisableInheritableForkJoinWorkerThreadFactory($%<d);", i + 1); insertCode.append(code); } } if (insertCode.length() > 0) { logger.info("insert code before method " + signatureOfMethod(constructor) + " of class " + constructor.getDeclaringClass().getName() + ": " + insertCode); constructor.insertBefore(insertCode.toString()); } } } }
如上可以看到對于ForkJoin來說我們只對 ForkJoinTask進(jìn)行增強
例外TTL的作者告訴我 其實不使用javaAgent的方式也可以對ForkJoin使用TTL,分別為TtlRecursiveAction,TtlRecursiveTask兩個類,我看了代碼后還未進(jìn)行測試,但是看邏輯就是可以的。后續(xù)我抽時間測試如果有問題這里會修改,如果小伙伴需要用到這個ForkJoin要進(jìn)行并發(fā)的測試,單獨執(zhí)行一個ForkJoin或者parallelStream是不可以的。
TtlRecursiveTask<V> 和TtlRecursiveAction區(qū)別是可返回結(jié)果的區(qū)別,返回結(jié)果可以用于CompletableFuture
通過查看源碼我們還發(fā)現(xiàn)了TtlWrappers可以的對java常見的函數(shù)式接口進(jìn)行修飾。相當(dāng)于把裝飾器維度使用最小的,類似于對Runnable和Callable這種任務(wù)執(zhí)行單元一樣
TtlWrappers使用案例
@Async public void test(Integer integer) { // 首先這里@Async我們已經(jīng)將異步線程池使用了TTL的裝飾器線程池 int num = ThreadLocalRandom.current().nextInt(5, 500); try { Thread.sleep(num); } catch (InterruptedException e) { e.printStackTrace(); } // 這里數(shù)字忽略只是為了使用forkJoin Arrays.asList(1, 2, 3, 4, 5, 6, 1, 2132, 234, 234, 23, 2, 2, 2, 2) .parallelStream().forEach(TtlWrappers.wrapConsumer(o -> { // 如果不適用TtlWrappers 這里會不相等,代表這里線程沒有正確傳遞 //如果使用TtlWrappers 則也是我們手動做裝飾器了,只不過是針對代碼塊執(zhí)行單元的裝飾器 if (!Objects.equals(TestController.LOCAL.get(), integer)) { System.out.println("bbbbbbbbbbb"); } })); }
TtlWrappers可以修飾的函數(shù)
其實如果可以修飾 Consumer,F(xiàn)unction,Supplier了,我們也可以自定義構(gòu)造一個函數(shù),再用TtlWrappers修飾
我們再看看作者提到的TtlRecursiveTask,簡單點我們只看TtlRecursiveAction 沒有返回值的
public abstract class TtlRecursiveAction extends ForkJoinTask<Void> implements TtlEnhanced { private static final long serialVersionUID = -5753568484583412377L; // 子類初始化時捕獲 private final Object captured = capture(); protected TtlRecursiveAction() { } // 延遲到子類進(jìn)行 任務(wù)單元執(zhí)行 protected abstract void compute(); /** * see {@link ForkJoinTask#getRawResult()} */ public final Void getRawResult() { return null; } /** * see {@link ForkJoinTask#setRawResult(Object)} */ protected final void setRawResult(Void mustBeNull) { } /** * Implements execution conventions for RecursiveActions. */ // 熟悉的配方,但是當(dāng)前方法是重寫了 ForkJoinTask的執(zhí)行方法 protected final boolean exec() { final Object backup = replay(captured); try { compute(); return true; } finally { restore(backup); } } }
如何使用
@Async public void test(Integer integer) { // 首先這里@Async我們已經(jīng)將異步線程池使用了TTL的裝飾器線程池 int num = ThreadLocalRandom.current().nextInt(5, 500); try { Thread.sleep(num); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 10 ; i++) { // 只能這樣使用,有點麻煩啊 TestFork testFork = new TestFork(integer); // 獲取默認(rèn)的 ForkJoin線程池 ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); forkJoinPool.execute(testFork); } } private static class TestFork extends TtlRecursiveAction { // 這里只是用來做對比的變量 private final Integer test; public TestFork(Integer test) { this.test = test; } @Override protected void compute() { // 這里是你的任務(wù)單元的業(yè)務(wù)代碼 if (!Objects.equals(TestController.LOCAL.get(), test)) { // 不會進(jìn)到這里,是可用的 System.out.println("bbbbbbbbbbb"); } } }
從上面可以看出TtlRecursiveTask局限性很大,但是現(xiàn)在最新版本2.12.6已經(jīng)有TtlWrappers進(jìn)行替代了,函數(shù)是一等公民,擴展性強使用方便。
總結(jié)
- 不使用JavaAgent技術(shù)的支持 ForkJoin使用手動捕獲,重放的ForkJoinTask
實現(xiàn)傳遞,但是需要暴露ForkJoinPool的使用。 - 通過 javaAgent技術(shù)可以添加jvm啟動參數(shù)后無感支持Runnable,Callable,TimerTask,F(xiàn)orkJoinTask。
- 默認(rèn)開啟Runnable,Callable,F(xiàn)orkJoinTask,并且目前不支持第三方框架重寫的線程池邏輯,并做了一定程度的去干擾操作。
- 如果不使用javaAgent針對常見的函數(shù),Consumer,Supplier等提供代碼塊執(zhí)行單元的裝飾器,也可以支持parallelStream 等,詳見com.alibaba.ttl.TtlWrappers。
以上就是TransmittableThreadLocal通過javaAgent實現(xiàn)線程傳遞并支持ForkJoin的詳細(xì)內(nèi)容,更多關(guān)于TransmittableThreadLocal javaAgent線程傳遞的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于Java SSM框架實現(xiàn)簡易的評教系統(tǒng)
這篇文章主要介紹了通過Java SSM框架實現(xiàn)一個簡易的評教系統(tǒng)的示例代碼,文中的代碼講解詳細(xì),感興趣的小伙伴可以了解一下2022-02-02Mybatis注解實現(xiàn)多數(shù)據(jù)源讀寫分離詳解
這篇文章主要給大家介紹了關(guān)于Mybatis注解實現(xiàn)多數(shù)據(jù)源讀寫分離的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Mybatis具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09Java switch()括號內(nèi)參數(shù)的類型要求詳解
這篇文章主要介紹了Java switch()括號內(nèi)參數(shù)的類型要求,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10