TransmittableThreadLocal線程間傳遞邏輯示例解析
InheritableThreadLocal線程設(shè)計
上一篇文章我們知道了TTL利用了InheritableThreadLocal線程傳遞的特性進行擴展,也可以在使用線程池時線程復(fù)用的情況也可以正確的傳遞線程私有變量,現(xiàn)在我們就學(xué)習(xí)一下其設(shè)計
首先聲明TTL重寫了InheritableThreadLocal#childValue(T parentValue) 提供了一個以InheritableThreadLocal為基礎(chǔ)的擴展。
InheritableThreadLocal 的線程傳遞只在當(dāng)子線程為new的時候會調(diào)用,接下來分析代碼
public class InheritableThreadLocal<T> extends ThreadLocal<T> { /** * Computes the child's initial value for this inheritable thread-local * variable as a function of the parent's value at the time the child * thread is created. This method is called from within the parent * thread before the child is started. * <p> * This method merely returns its input argument, and should be overridden * if a different behavior is desired. * * @param parentValue the parent thread's value * @return the child thread's initial value */ // 這是ThreadLocal的執(zhí)行邏輯,相當(dāng)于一個模板方法,由子類實現(xiàn),ThreadLocal不支持傳遞給子線程 protected T childValue(T parentValue) { return parentValue; } /** * Get the map associated with a ThreadLocal. * * @param t the current thread */ ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. */ // 顧名思義,只有在線程new出來的時刻會調(diào)用當(dāng)前方法,然后調(diào)用childValue void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }
然后看看 TTL的重寫邏輯
// Note about holder: // 1. The value of holder is type Map<TransmittableThreadLocal<?>, ?> (WeakHashMap implementation), // but it is used as *set*. // 2. WeakHashMap support null value. // 這是TTL的核心設(shè)計,組裝為一個 以TTL對象為key的map返回,同同時這個map對象還是TTL對象的一個內(nèi)部靜態(tài)對象,一直跟隨客戶端使用的TTL對象。 private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() { // 只有子線程 為new時調(diào)用 @Override protected Map<TransmittableThreadLocal<?>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<?>, Object>(); } // 只有子線程 為new時調(diào)用,雖然做了拓展,通過一個跟隨客戶端使用的TTL對象內(nèi)部構(gòu)造了這個holder中轉(zhuǎn)站,但是還是使用的引用傳遞,如果主子線程一邊直接修改了引用的對象,另一邊也會感知到。并且存在并發(fā)修改問題。因為是增強InheritableThreadLocal,并沒有修改這里的引用傳遞邏輯。實際其它擴展有傳遞為不可變對象的邏輯 @Override protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue); } };
TTL也是使用的引用邏輯實際也有一些拓展是不可變對象的邏輯,例如
childValue在我的電腦里可以看到的子類
我們看一下CopyOnWriteSortedArrayThreadContextMap中的代碼
private ThreadLocal<StringMap> createThreadLocalMap() { return (ThreadLocal)(inheritableMap ? new InheritableThreadLocal<StringMap>() { protected StringMap childValue(StringMap parentValue) { if (parentValue == null) { return null; } else { // 主要看看這個接口 StringMap stringMap = CopyOnWriteSortedArrayThreadContextMap.this.createStringMap(parentValue); stringMap.freeze(); return stringMap; } } } : new ThreadLocal()); }
// 看名字就知道了,是一個不可變對象,也就是不同于InheritableThreadLocal和TTL傳遞的對象引用,這里做了復(fù)制后變?yōu)椴豢勺儗ο蟮倪壿?,日后小伙伴們也可以借助TTL實現(xiàn)自己不可變對象的邏輯 public interface StringMap extends ReadOnlyStringMap { void clear(); boolean equals(Object var1); void freeze(); int hashCode(); boolean isFrozen(); void putAll(ReadOnlyStringMap var1); void putValue(String var1, Object var2); void remove(String var1); }
接下來看裝飾器
裝飾器的引入,實際是對ExecutorService的執(zhí)行Runnable,Callable等真正執(zhí)行邏輯的攔截,做前,后的邏輯,而裝飾器在不改變原有對象的邏輯包裹一層后,可以做到增強的目的,其實這個裝飾器本身也是 Runnable,Callable的一個代理。
看看使用的接入
@Override public Executor getAsyncExecutor() { // 這里原本是設(shè)置一個 @Async的默認(rèn)線程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(512), FACTORY); executor.setRejectedExecutionHandler(new CustomRejectedHandler()); // 最后我們裝入了TTL的裝飾器返回 return TtlExecutors.getTtlExecutorService(executor); }
ExecutorServiceTtlWrapper作為ExecutorService的裝飾器目的就是為了再進行真正執(zhí)行的目標(biāo)接口再封裝一層裝飾器。
裝飾器
如上圖各種目標(biāo)接口的裝飾器,我們就看看 TtlCallable這個裝飾器作為@Async線程池執(zhí)行單元的增強
public final class TtlCallable<V> implements Callable<V>, TtlEnhanced { //用于threadLocal中轉(zhuǎn)的對象,通過Transmitter#capture()在裝飾器初始化時就創(chuàng)建好,實際就是獲取當(dāng)前主線程的threadLocal private final AtomicReference<Object> capturedRef; // 被裝飾的目標(biāo)對象接口 private final Callable<V> callable; // 是否釋放TTL對象傳遞過來的業(yè)務(wù)對象引用,從代碼看這里只決定了當(dāng)前TtlCallable對象的引用是否釋放,TtlCallable對象本身有一定生命周期,再者如果復(fù)用主線程傳遞過來的TTL對象引用也一直存在于主線程,目前都是false,子線程引用也會一直隨著主線程傳遞而更新 private final boolean releaseTtlValueReferenceAfterCall; private TtlCallable(@Nonnull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) { // 在new這個對象時是在主線程,所以capture()方法拿到的是主線程的TTL對象最新的引用,包括業(yè)務(wù)對象也是最新的 this.capturedRef = new AtomicReference<Object>(capture()); this.callable = callable; this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall; } /** * wrap method {@link Callable#call()}. */ @Override public V call() throws Exception { // 獲取主線程的 TTL對象map,就是通過Transmitter#capture()方法從 TTL對象中上面所說的TTL對象中的內(nèi)部holder中轉(zhuǎn)map獲取到主線程的所有TTL及業(yè)務(wù)對象引用 Object captured = capturedRef.get(); // 如果為空 或者 需要清理TTL對象引用,則進行一次原子操作對TTL對象引用置為空 if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after call!"); } // 重放 captured為當(dāng)前裝飾器初始化時從主線程拿到的,這里對其進行重放替換 // 并返回當(dāng)前子線程的 TTL對象作為還原 Object backup = replay(captured); try { //被增項的目標(biāo)方法執(zhí)行 return callable.call(); } finally { // 再將當(dāng)前子線程還原 restore(backup); } } // ----------------省略大部分代碼-------------- }
Transmitter#capture()方法
@Nonnull public static Object capture() { Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>(); // 復(fù)制的核心 是從 holder中轉(zhuǎn)對象中獲取每個key的threadLocal中的業(yè)務(wù)對象引用 // 然后再用其TTL對象作為key 組裝一個 TTL對象 -> 業(yè)務(wù)對象的map返回 for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { captured.put(threadLocal, threadLocal.copyValue()); } return captured; } //-----------copyValue 方法---------- private T copyValue() { // 復(fù)制就是從當(dāng)前主線程 的threadLocal get return copy(get()); } //------------copy 方法------ protected T copy(T parentValue) { // 復(fù)制的是對象的引用 return parentValue; }
下面看Transmitter#replay(@Nonnull Object captured) 重放邏輯
@Nonnull public static Object replay(@Nonnull Object captured) { @SuppressWarnings("unchecked") // 主線程傳遞過來的引用 Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; // 當(dāng)前子線程的TTL引用用于返回后 還原 Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); // backup backup.put(threadLocal, threadLocal.get()); // clear the TTL values that is not in captured // avoid the extra TTL values after replay when run task //清除掉可能失效和舊的子線程的TTL對象引用,為什么這么做,目前不太清楚 if (!capturedMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // set values to captured TTL // 我們上一篇文章以及當(dāng)前文章上面提到,在thread new的時候調(diào)用initialValue和childValue 方法時,會將主線程的TTL對象引用傳遞給子線程,但是不同裝飾器增強時,子線程里的TTL對象中的業(yè)務(wù)對象引用是一直不變的,一直是第一次傳遞過來的業(yè)務(wù)對象的值,而主線程的業(yè)務(wù)對象變更子線程感知不到,但是TTL對象也一直是一個引用這里將其舊的TTL引用 // 放入主線程新得 TTL中的業(yè)務(wù)對象引用,實際因為子線程的TTL對象引用和主線程的TTL對象是一樣的,只不過主線程更新了業(yè)務(wù)對象引用子線程感知不到,因為java內(nèi)存模型的原因,所以這里直接重新操作一次 子線程的TTL對象更新 *業(yè)務(wù)對象引用* 重復(fù)了一次主線程的操作 setTtlValuesTo(capturedMap); // call beforeExecute callback // 這里其實是一個模板方法,包括目標(biāo)對象執(zhí)行前也就是重放,及目標(biāo)對象執(zhí)行后,還原的實際的一個鉤子 doExecuteCallback(true); return backup; }
我們來看看 setTtlValuesTo(capturedMap); 實際就是重復(fù)了主線程的操作,使用相同的TTL對象引用對業(yè)務(wù)對象引用進行更新
private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); } }
看看鉤子方法,可以用于我們擴展TTL對象進行鉤子回調(diào)
private static void doExecuteCallback(boolean isBefore) { for (Map.Entry<TransmittableThreadLocal<?>, ?> entry : holder.get().entrySet()) { TransmittableThreadLocal<?> threadLocal = entry.getKey(); try { // 兩個模板方法鉤子 if (isBefore) threadLocal.beforeExecute(); else threadLocal.afterExecute(); } catch (Throwable t) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t); } } } }
doExecuteCallback的模板方法鉤子
目前TTL對象中是空實現(xiàn)。如果繼承擴展TTL對象可能用到噢
然后是 還原方法Transmitter#restore(@Nonnull Object backup)邏輯和上面的replay方法基本相同不過邏輯是反過來的小伙伴可以自行看代碼
總結(jié)
- 利用裝飾器對ExectorService包裝后一步步的繼續(xù)利用裝飾器一直裝飾到要執(zhí)行的目標(biāo)對象接口例如Runnable,Callable等對初始化,執(zhí)行前,執(zhí)行后三個時機進行增強
- 重寫了InheritableThreadLocal#childValue 方法來傳遞 TTL定義的一個中轉(zhuǎn)map對象 key為 TTL對象
- 利用了主子線程傳遞 TTL對象的引用一致,同時用以TTL對象為key的map進行重放,直接對主線程傳遞過來的TTL對象業(yè)務(wù)對象引用進行更新,因為子對象的引用相同相當(dāng)于對子線程的TTL的業(yè)務(wù)對象引用更新。感覺用其它集合也可以,但是看代碼map可以在重放的同時更方便的清理子線程的多余的TTL對象,保證主子線程的TTL對應(yīng)一致性。
- 提供了 一些模板方法提高了擴展性 例如beforeExecute ,afterExecute
- 提供了屏蔽ForkJoin工作線程屏蔽InheritableThreadLocal的傳遞,幫助開發(fā)期間及時發(fā)現(xiàn)threadLocal的問題
其它問題,java8提供的parallelStream 并行流和CompletableFuture 都是使用ForkJoin框架實現(xiàn),使用TTL還是會有問題
在TTL源碼沒有看到關(guān)于forkJoin的增強,但是發(fā)現(xiàn)了TtlForkJoinPoolHelper類,提供了DisableInheritableForkJoinWorkerThreadFactory 的支持,為了屏蔽掉InheritableThreadLocal的傳遞防止開發(fā)測試時theadLocal錯誤傳遞的假象。
// ForkJoinWorkerThreadFactory 的裝飾器 public interface DisableInheritableForkJoinWorkerThreadFactory extends ForkJoinWorkerThreadFactory { /** * Unwrap {@link DisableInheritableThreadFactory} to the original/underneath one. */ @Nonnull ForkJoinWorkerThreadFactory unwrap(); }
ForkJoin的邏輯大家自行查詢資料,因為存在工作竊取等邏輯理論上是無法避免的ThreadLocal錯亂問題。所以TTL提供了屏蔽裝飾器,但是forkJoin的工作線程也可能是主線程,所以使用TTL的屏蔽邏輯只能屏蔽掉ForkJoin的工作線程,無法避免ForkJoin直接使用主線程執(zhí)行任務(wù)單元時還是有正確的threadLocal對象引用。但是這樣也足夠開發(fā)測試期間及時發(fā)現(xiàn)threadLocal的問題了。
經(jīng)過我網(wǎng)上搜索我們可以替換掉ForkJoin默認(rèn)的ForkJoinWorkerThreadFactory,增強線程創(chuàng)建邏輯。
private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); // ForkJoin會有一個擴展邏輯,這里如果獲取到指定的線程工廠類則不會使用默認(rèn)的。但是當(dāng)前makeCommonPool 方法在 static {} 代碼塊中執(zhí)行,經(jīng)過測試直接System.setProperty無法掌控好加載順序,可能獲取不到自定義的系統(tǒng)變量,索性直接通過jvm啟動參數(shù)指定 String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) // 如果有自定義的線程工廠會初始化 factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
但是看到TTL包內(nèi)的DisableInheritableForkJoinWorkerThreadFactoryWrapper 線程工廠裝飾器并沒有構(gòu)造方法,并且不是public不能繼承,也就是直接指定這個類不能被正常加載后newInstance(),又不能繼承,可能只是一個示例?那么我自定義一個類復(fù)制它的邏輯
class DisableInheritableForkJoinWorkerThreadFactoryWrapper implements DisableInheritableForkJoinWorkerThreadFactory { final ForkJoinWorkerThreadFactory threadFactory; public DisableInheritableForkJoinWorkerThreadFactoryWrapper(@Nonnull ForkJoinWorkerThreadFactory threadFactory) { this.threadFactory = threadFactory; } @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { // 看到這里在new thread時進行了 TTL對象的清理 //這個執(zhí)行時機其實還是在主線程中,如果正常不執(zhí)行這個代碼子線程會拿到一個舊的主線程的TTL對象引用,但是這里清除了,就不會拿到了,方便開發(fā)測試階段發(fā)現(xiàn)問題 final Object backup = TransmittableThreadLocal.Transmitter.clear(); try { return threadFactory.newThread(pool); } finally { // 執(zhí)行完后進行還原 TransmittableThreadLocal.Transmitter.restore(backup); } } @Nonnull @Override public ForkJoinWorkerThreadFactory unwrap() { return threadFactory; } }
我們自定義仿照上述類,直接復(fù)制的,區(qū)別是提供了構(gòu)造方法,可以讓ForkJoinPool#makeCommonPool方法可以加載擴展工廠,并且直接指定被增強的默認(rèn)ForkJoinWorkerThreadFactory
public class CustomForkJoinThreadFactory implements DisableInheritableForkJoinWorkerThreadFactory { // 被增強的默認(rèn)的線程工廠 final ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory; // 有無參構(gòu)造才可以 加載成功噢 public CustomForkJoinThreadFactory() { } @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { final Object backup = TransmittableThreadLocal.Transmitter.clear(); try { return threadFactory.newThread(pool); } finally { TransmittableThreadLocal.Transmitter.restore(backup); } } @Nonnull @Override public ForkJoinWorkerThreadFactory unwrap() { return threadFactory; } }
jvm啟動參數(shù)
(如果有辦法在ForkJoinPool的static加載前System.setProperty也可以)
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=xxx.xxx.xxx.CustomForkJoinThreadFactory
其實TTL是支持 forkJoin的線程間傳遞的,由于我沒有看官方文檔,也沒有仔細(xì)研究一下源碼中agent目錄,看來是大意了,感謝TTL作者,閱讀了一下文檔再回來試了試果然可以使用。下一篇文章去研究一下作者如何通過java agent技術(shù)實現(xiàn)無感的裝飾器,以及如何實現(xiàn)當(dāng)前文章提到的 捕捉,重放,恢復(fù)動作
TransmittableThreadLocal通過javaAgent技術(shù)實現(xiàn)線程傳遞(并且支持ForkJoin
以上就是TransmittableThreadLocal線程間傳遞邏輯示例解析的詳細(xì)內(nèi)容,更多關(guān)于TransmittableThreadLocal線程傳遞的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Elasticsearch索引結(jié)構(gòu)與算法解析
?作為搜索引擎的一部分,ES自然具有速度快、結(jié)果準(zhǔn)確、結(jié)果豐富等特點,那么ES是如何達到“搜索引擎”級別的查詢效率呢?首先是索引,其次是壓縮算法,接下來我們就一起了解下ES的索引結(jié)構(gòu)和壓縮算法2023-04-04Springboot升級至2.4.0中出現(xiàn)的跨域問題分析及修改方案
這篇文章主要介紹了Springboot升級至2.4.0中出現(xiàn)的跨域問題分析及修改方案,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12關(guān)于springboot忽略接口,參數(shù)注解的使用ApiIgnore
這篇文章主要介紹了關(guān)于springboot忽略接口,參數(shù)注解的使用ApiIgnore,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07SpringBoot添加SSL證書,開啟HTTPS方式(單向認(rèn)證服務(wù)端)
這篇文章主要介紹了SpringBoot添加SSL證書,開啟HTTPS方式(單向認(rèn)證服務(wù)端),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-03-03Spring?Cloud?Alibaba使用Nacos作為注冊中心和配置中心
這篇文章主要為大家介紹了Spring?Cloud?Alibaba使用Nacos作為注冊中心和配置中心的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06