Java虛擬線程(VirtualThread)使用詳解
VirtualThread
是 Java 實現(xiàn)輕量級并發(fā)(也稱為纖程或協(xié)程)的關(guān)鍵。與傳統(tǒng)的平臺線程(直接映射到操作系統(tǒng)線程)不同,虛擬線程由 JVM 管理和調(diào)度,可以在少量平臺線程上運行大量的虛擬線程,從而提高應(yīng)用程序的吞吐量。
流程概要
核心組件:
scheduler
(Executor): 每個虛擬線程都有一個調(diào)度器,它是一個 Executor
實例。
這個調(diào)度器負(fù)責(zé)安排虛擬線程在載體線程(通常是平臺線程)上執(zhí)行。
如果沒有顯式指定,它會使用默認(rèn)的 ForkJoinPool
(DEFAULT_SCHEDULER
) 或者繼承父虛擬線程的調(diào)度器。
- VirtualThread.java
// scheduler and continuation private final Executor scheduler; private final Continuation cont; private final Runnable runContinuation;
在構(gòu)造函數(shù)中,如果 scheduler
為 null
,會進(jìn)行選擇:
- VirtualThread.java
// ... existing code ... VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { super(name, characteristics, /*bound*/ false); Objects.requireNonNull(task); // choose scheduler if not specified if (scheduler == null) { Thread parent = Thread.currentThread(); if (parent instanceof VirtualThread vparent) { scheduler = vparent.scheduler; } else { scheduler = DEFAULT_SCHEDULER; } } this.scheduler = scheduler; this.cont = new VThreadContinuation(this, task); this.runContinuation = this::runContinuation; } // ... existing code ...
cont
(Continuation): 這是虛擬線程的核心。Continuation
是 JDK 內(nèi)部的一個機制,允許代碼執(zhí)行被掛起(yield)和恢復(fù)。每個虛擬線程都包裝了一個 Continuation
實例。
VThreadContinuation
是一個內(nèi)部類,它繼承自 Continuation
,并包裝了用戶提供的 Runnable
任務(wù)。
- VirtualThread.java
// ... existing code ... /** * The continuation that a virtual thread executes. */ private static class VThreadContinuation extends Continuation { VThreadContinuation(VirtualThread vthread, Runnable task) { super(VTHREAD_SCOPE, wrap(vthread, task)); } @Override protected void onPinned(Continuation.Pinned reason) { } private static Runnable wrap(VirtualThread vthread, Runnable task) { return new Runnable() { @Hidden @JvmtiHideEvents public void run() { vthread.notifyJvmtiStart(); // notify JVMTI try { vthread.run(task); } finally { vthread.notifyJvmtiEnd(); // notify JVMTI } } }; } } // ... existing code ...
runContinuation
(Runnable): 這是一個 Runnable
,其 run()
方法(即 this::runContinuation
指向的 VirtualThread#runContinuation()
方法)負(fù)責(zé)實際執(zhí)行或繼續(xù)執(zhí)行虛擬線程的任務(wù)。它處理虛擬線程的掛載(mount)到載體線程、運行 Continuation
、以及卸載(unmount)。
狀態(tài)管理: VirtualThread
內(nèi)部維護(hù)了一系列狀態(tài)常量(如 NEW
, STARTED
, RUNNING
, PARKED
, TERMINATED
等)和一個 volatile int state
字段來跟蹤其生命周期。狀態(tài)之間的轉(zhuǎn)換是精心設(shè)計的,以處理各種場景,如啟動、運行、暫停、阻塞和終止。
- VirtualThread.java
// ... existing code ... /* * Virtual thread state transitions: * * NEW -> STARTED // Thread.start, schedule to run * STARTED -> TERMINATED // failed to start * STARTED -> RUNNING // first run * RUNNING -> TERMINATED // done * * RUNNING -> PARKING // Thread parking with LockSupport.park // ... many more states ... * YIELDED -> RUNNING // continue execution after Thread.yield */ private static final int NEW = 0; private static final int STARTED = 1; private static final int RUNNING = 2; // runnable-mounted // untimed and timed parking private static final int PARKING = 3; private static final int PARKED = 4; // unmounted // ... other state constants ... private static final int TERMINATED = 99; // final state // ... existing code ...
carrierThread
(Thread): 表示當(dāng)前承載該虛擬線程執(zhí)行的平臺線程。當(dāng)虛擬線程被掛起(unmounted)時,它不占用平臺線程。當(dāng)它需要運行時,調(diào)度器會將其調(diào)度到某個可用的載體線程上執(zhí)行。
啟動流程 (start()
方法):
- VirtualThread.java
// ... existing code ... @Override void start(ThreadContainer container) { if (!compareAndSetState(NEW, STARTED)) { throw new IllegalThreadStateException("Already started"); } // bind thread to container assert threadContainer() == null; setThreadContainer(container); // start thread boolean addedToContainer = false; boolean started = false; try { container.add(this); // may throw addedToContainer = true; // scoped values may be inherited inheritScopedValueBindings(container); // submit task to run thread, using externalSubmit if possible externalSubmitRunContinuationOrThrow(); started = true; } finally { if (!started) { afterDone(addedToContainer); } } } @Override public void start() { start(ThreadContainers.root()); } // ... existing code ...
- 當(dāng)調(diào)用虛擬線程的
start()
方法時,它首先將狀態(tài)從NEW
原子地更新為STARTED
。 - 然后,它將自身添加到一個線程容器 (
ThreadContainer
) 中。 - 最關(guān)鍵的一步是調(diào)用
externalSubmitRunContinuationOrThrow()
(或類似的提交方法) 將runContinuation
任務(wù)提交給其scheduler
執(zhí)行。
執(zhí)行與掛起 (runContinuation()
和 yieldContinuation()
):
- VirtualThread.java
// ... existing code ... @ChangesCurrentThread // allow mount/unmount to be inlined private void runContinuation() { // the carrier must be a platform thread if (Thread.currentThread().isVirtual()) { throw new WrongThreadException(); } // set state to RUNNING int initialState = state(); if (initialState == STARTED || initialState == UNPARKED || initialState == UNBLOCKED || initialState == YIELDED) { // newly started or continue after parking/blocking/Thread.yield if (!compareAndSetState(initialState, RUNNING)) { return; } // consume permit when continuing after parking or blocking. If continue // after a timed-park or timed-wait then the timeout task is cancelled. if (initialState == UNPARKED) { cancelTimeoutTask(); setParkPermit(false); } else if (initialState == UNBLOCKED) { cancelTimeoutTask(); blockPermit = false; } } else { // not runnable return; } mount(); try { cont.run(); } finally { unmount(); if (cont.isDone()) { afterDone(); } else { afterYield(); } } } // ... existing code ...
- VirtualThread.java
// ... existing code ... @Hidden private boolean yieldContinuation() { notifyJvmtiUnmount(/*hide*/true); try { return Continuation.yield(VTHREAD_SCOPE); } finally { notifyJvmtiMount(/*hide*/false); } } // ... existing code ...
runContinuation()
:
- 檢查當(dāng)前線程是否為平臺線程(載體線程不能是虛擬線程)。
- 原子地更新狀態(tài)到
RUNNING
。 - 調(diào)用
mount()
方法,將虛擬線程“掛載”到當(dāng)前平臺線程上。這包括設(shè)置carrierThread
字段,并使Thread.currentThread()
返回該虛擬線程實例。 - 執(zhí)行
cont.run()
,這會實際運行或恢復(fù)Continuation
中包裝的用戶任務(wù)。 - 執(zhí)行完畢或
Continuation
掛起后,調(diào)用unmount()
方法,將虛擬線程從平臺線程上“卸載”。這包括清除carrierThread
,并使Thread.currentThread()
返回平臺線程本身。 - 根據(jù)
Continuation
是否完成 (isDone()
),調(diào)用afterDone()
或afterYield()
。 yieldContinuation()
: 當(dāng)虛擬線程需要掛起時(例如,執(zhí)行了LockSupport.park()
,或者I/O操作會阻塞),它會調(diào)用Continuation.yield(VTHREAD_SCOPE)
。這會導(dǎo)致Continuation
的執(zhí)行暫停,runContinuation()
方法中的cont.run()
調(diào)用會返回。此時,載體平臺線程可以被釋放去執(zhí)行其他任務(wù)。afterYield()
: 當(dāng)yieldContinuation()
成功掛起后,此方法被調(diào)用。它根據(jù)虛擬線程掛起前的原因(如PARKING
,YIELDING
,BLOCKING
,WAITING
)更新狀態(tài),并可能安排后續(xù)操作(如設(shè)置定時器喚醒、重新提交到調(diào)度器等)。
Parking (park()
和 parkNanos()
):
- VirtualThread.java
// ... existing code ... @Override void park() { assert Thread.currentThread() == this; // complete immediately if parking permit available or interrupted if (getAndSetParkPermit(false) || interrupted) return; // park the thread boolean yielded = false; setState(PARKING); try { yielded = yieldContinuation(); } catch (OutOfMemoryError e) { // park on carrier } finally { assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); if (!yielded) { assert state() == PARKING; setState(RUNNING); } } // park on the carrier thread when pinned if (!yielded) { parkOnCarrierThread(false, 0); } } // ... existing code ...
- 當(dāng)虛擬線程調(diào)用
LockSupport.park()
或LockSupport.parkNanos()
時,會調(diào)用VirtualThread
內(nèi)部的park()
或parkNanos()
方法。 - 這些方法會嘗試通過
yieldContinuation()
來掛起虛擬線程。 - 如果
yieldContinuation()
成功(即虛擬線程沒有被固定在載體線程上),虛擬線程的狀態(tài)會變?yōu)?PARKED
或TIMED_PARKED
,并且載體線程被釋放。 - 如果
yieldContinuation()
失?。ɡ纾摂M線程在 JNI 調(diào)用中或synchronized
塊中被固定),它會退而求其次,在當(dāng)前的載體線程上實際執(zhí)行U.park()
(通過parkOnCarrierThread
方法),此時載體線程會被阻塞,狀態(tài)變?yōu)?PINNED
或TIMED_PINNED
。
調(diào)度 (submitRunContinuation
系列方法):
- VirtualThread.java
// ... existing code ... private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { boolean done = false; while (!done) { try { // Pin the continuation to prevent the virtual thread from unmounting // when submitting a task. For the default scheduler this ensures that // the carrier doesn't change when pushing a task. For other schedulers // it avoids deadlock that could arise due to carriers and virtual // threads contending for a lock. if (currentThread().isVirtual()) { Continuation.pin(); try { scheduler.execute(runContinuation); } finally { Continuation.unpin(); } } else { scheduler.execute(runContinuation); } done = true; } catch (RejectedExecutionException ree) { submitFailed(ree); throw ree; } catch (OutOfMemoryError e) { if (retryOnOOME) { U.park(false, 100_000_000); // 100ms } else { throw e; } } } } // ... existing code ...
- 有多種提交
runContinuation
任務(wù)到調(diào)度器的方法,如submitRunContinuation()
,lazySubmitRunContinuation()
,externalSubmitRunContinuation()
。 - 這些方法處理了向調(diào)度器提交任務(wù)的細(xì)節(jié),包括在特定情況下(如在虛擬線程內(nèi)部提交任務(wù)給調(diào)度器)需要固定
Continuation
(Continuation.pin()
) 以防止在提交過程中虛擬線程被意外卸載。
總而言之,VirtualThread
的實現(xiàn)巧妙地結(jié)合了 Continuation
機制和 Executor
調(diào)度框架。Continuation
提供了掛起和恢復(fù)執(zhí)行流的能力,使得虛擬線程在阻塞操作時可以釋放底層的平臺線程。調(diào)度器則負(fù)責(zé)在虛擬線程準(zhǔn)備好運行時,將其分配給一個可用的平臺線程執(zhí)行。通過這種方式,少量的平臺線程就可以支持大量并發(fā)的虛擬線程,大大降低了線程創(chuàng)建和上下文切換的開銷。狀態(tài)管理確保了虛擬線程在各種并發(fā)場景下的正確行為。
調(diào)度器繼承
instanceof
模式匹配語法 (parent instanceof VirtualThread vparent
)
這是一種 Java 的語法特性,稱為 模式匹配 (Pattern Matching) for instanceof
。它從 Java 14 開始作為預(yù)覽特性引入,并在 Java 16 中正式發(fā)布。
在以前的 Java 版本中,如果想檢查一個對象的類型,并且如果類型匹配則將其轉(zhuǎn)換為該類型并使用,你通常會這樣寫:
if (parent instanceof VirtualThread) { VirtualThread vparent = (VirtualThread) parent; // 然后使用 vparent scheduler = vparent.scheduler; }
模式匹配 instanceof
簡化了這個過程。
if (parent instanceof VirtualThread vparent)
這行代碼做了兩件事:
- 類型檢查: 它檢查
parent
對象是否是VirtualThread
的一個實例。 - 條件聲明和賦值: 如果
parent
確實是VirtualThread
的實例,那么它會聲明一個新的局部變量vparent
(類型為VirtualThread
),并將parent
自動轉(zhuǎn)換 (cast) 為VirtualThread
類型后賦值給vparent
。這個vparent
變量只在if
語句塊為真(即類型匹配成功)的作用域內(nèi)有效。
這是一種更簡潔、更安全的寫法,避免了顯式的類型轉(zhuǎn)換和引入額外的變量聲明步驟。
這里的“父子關(guān)系”并不是指操作系統(tǒng)層面嚴(yán)格的父子進(jìn)程或線程關(guān)系,而是指創(chuàng)建者與被創(chuàng)建者的關(guān)系。
Thread parent = Thread.currentThread();
:這行代碼獲取的是當(dāng)前正在執(zhí)行 VirtualThread
構(gòu)造函數(shù)的線程。這個線程就是新虛擬線程的“創(chuàng)建者”或“父”線程。
邏輯:
當(dāng)創(chuàng)建一個新的 VirtualThread
時,可以顯式地給它傳遞一個 scheduler
。
如果調(diào)用者沒有提供 scheduler
(即 scheduler == null
),那么虛擬線程的構(gòu)造邏輯會嘗試確定一個默認(rèn)的調(diào)度器。
這時,它會檢查創(chuàng)建這個新虛擬線程的線程 (parent
):
- 如果
parent
本身也是一個VirtualThread
(parent instanceof VirtualThread vparent
):那么新的虛擬線程將繼承其創(chuàng)建者虛擬線程的調(diào)度器 (scheduler = vparent.scheduler;
)。 - 如果
parent
是一個平臺線程 (platform thread) 或者其他非VirtualThread
類型: 那么新的虛擬線程將使用默認(rèn)的調(diào)度器 (scheduler = DEFAULT_SCHEDULER;
),這個默認(rèn)調(diào)度器通常是一個ForkJoinPool
。
這種設(shè)計體現(xiàn)了一種“上下文感知”的默認(rèn)行為。如果你的代碼已經(jīng)在某個特定的虛擬線程(它使用著特定的調(diào)度器)中運行,當(dāng)你從這個虛擬線程中再創(chuàng)建一個新的虛擬線程時,讓新的虛擬線程默認(rèn)使用與創(chuàng)建者相同的調(diào)度器通常是合理的。這有助于:
- 資源管理: 如果你為一組相關(guān)的任務(wù)配置了特定的調(diào)度器(例如,具有特定線程池大小或優(yōu)先級的調(diào)度器),那么從這個組內(nèi)派生的新虛擬線程默認(rèn)使用相同的調(diào)度器可以保持資源使用的一致性。
- 行為一致性: 任務(wù)的執(zhí)行特性(如并發(fā)級別)可以更容易地在相關(guān)的虛擬線程間保持一致。
簡單來說,如果一個虛擬線程A創(chuàng)建了另一個虛擬線程B,并且沒有為B指定調(diào)度器,那么B就會默認(rèn)使用A的調(diào)度器。如果是一個平臺線程創(chuàng)建了虛擬線程B,并且沒有為B指定調(diào)度器,那么B就會使用全局默認(rèn)的調(diào)度器。
Continuation 類深度分析
Continuation
類是 Java 實現(xiàn)輕量級線程(虛擬線程)的基石。它代表了一種一次性(one-shot)的分界延續(xù)(delimited continuation)。簡單來說,它封裝了一段計算(一個 Runnable
任務(wù)),這段計算可以被掛起(yield),并在之后從掛起點恢復(fù)執(zhí)行。
核心能力與特性:
封裝計算單元:
- 每個
Continuation
對象都關(guān)聯(lián)一個Runnable
任務(wù) (this.target
) 和一個ContinuationScope
(this.scope
)。 ContinuationScope
用于界定yield
操作的范圍。當(dāng)調(diào)用Continuation.yield(scope)
時,執(zhí)行會從當(dāng)前Continuation
向上回溯,直到找到匹配該scope
的Continuation
實例,然后掛起。
執(zhí)行與掛起 (run()
和 yield()
):
run()
: 這是啟動或恢復(fù) Continuation
執(zhí)行的入口點。
- 它首先會嘗試“掛載”(mount)
Continuation
到當(dāng)前的載體線程(carrier thread)。掛載意味著將Continuation
的執(zhí)行上下文(主要是棧幀)與載體線程關(guān)聯(lián)起來。 - 通過
JLA.setContinuation(t, this)
將當(dāng)前Continuation
設(shè)置為載體線程的活動Continuation
。 - 調(diào)用本地方法
enterSpecial(this, isContinue, isVirtualThread)
來實際進(jìn)入或恢復(fù)Continuation
的執(zhí)行。這個本地方法是 JVM 實現(xiàn)Continuation
魔法的核心,它處理棧的切換和管理。 enterSpecial
內(nèi)部會調(diào)用enter()
,進(jìn)而調(diào)用target.run()
來執(zhí)行用戶代碼。- 當(dāng)
Continuation
內(nèi)部調(diào)用Continuation.yield(scope)
時,enterSpecial
(或其調(diào)用的更深層本地代碼) 會保存當(dāng)前執(zhí)行狀態(tài)(棧幀等),然后“返回”到run()
方法中enterSpecial
調(diào)用的地方。 run()
方法的finally
塊負(fù)責(zé)“卸載”(unmount)Continuation
,清理狀態(tài),并將載體線程的活動Continuation
恢復(fù)為其父Continuation
(如果存在)。
yield(ContinuationScope scope)
(靜態(tài)方法):
- 這是
Continuation
主動讓出執(zhí)行權(quán)的方式。 - 它會找到當(dāng)前線程上與指定
scope
匹配的最內(nèi)層Continuation
。 - 然后調(diào)用該
Continuation
實例的yield0(scope, child)
方法(這是一個非靜態(tài)的內(nèi)部方法,最終會觸發(fā)本地代碼doYield()
),將執(zhí)行權(quán)交還給其父Continuation
或調(diào)度器。 yieldInfo
字段用于在yield
時傳遞信息,決定是徹底返回還是在父Continuation
中繼續(xù)yield
。
棧管理 (StackChunk
):
Continuation
的執(zhí)行棧不是直接使用平臺線程的完整棧,而是由一系列StackChunk
對象來管理。當(dāng)Continuation
掛起時,它的活動棧幀被保存在這些StackChunk
中?;謴?fù)時,這些棧幀被重新加載。private StackChunk tail;
字段指向Continuation
棧的當(dāng)前末端。isEmpty()
方法檢查所有StackChunk
是否都為空,用于判斷Continuation
是否執(zhí)行完畢。
狀態(tài)管理:
done
: 標(biāo)記Continuation
的Runnable
是否已經(jīng)執(zhí)行完畢。mounted
: 一個volatile
標(biāo)志,表示Continuation
當(dāng)前是否掛載在某個載體線程上。mount()
和unmount()
方法以及compareAndSetMounted()
原子地更新此狀態(tài)。scopedValueCache
: 用于支持ScopedValue
,在Continuation
掛載和卸載時保存和恢復(fù)作用域值緩存。
父子關(guān)系 (parent
, child
):
Continuation
可以形成一個層級結(jié)構(gòu)。當(dāng)一個Continuation
(父) 內(nèi)部的某個點創(chuàng)建并運行另一個Continuation
(子) 時,它們之間就建立了父子關(guān)系。JLA.getContinuation(currentCarrierThread())
用于獲取當(dāng)前載體線程上活動的Continuation
。run()
方法中會處理parent
和child
的設(shè)置,確保在yield
和恢復(fù)時能正確地在Continuation
層級間導(dǎo)航。
pinning (固定):
Pinned
枚舉(NATIVE
,MONITOR
,CRITICAL_SECTION
,EXCEPTION
)定義了Continuation
可能被“固定”在載體線程上而無法安全yield
的原因。例如,如果Continuation
的執(zhí)行進(jìn)入了本地方法(JNI),或者持有一個對象監(jiān)視器鎖(synchronized
塊),它就可能被固定。VThreadContinuation
(在VirtualThread.java
中定義) 的onPinned()
方法是一個回調(diào),當(dāng)Continuation
被固定時會被調(diào)用。虛擬線程的實現(xiàn)會根據(jù)這個信息決定是阻塞載體線程還是采取其他策略。
與 JVM 的深度集成:
@IntrinsicCandidate
注解的本地方法如doYield()
和enterSpecial()
表明這些方法的實現(xiàn)是由 JVM 高度優(yōu)化的,它們直接操縱線程棧和執(zhí)行狀態(tài)。jdk.internal.access.JavaLangAccess
(JLA) 和jdk.internal.access.SharedSecrets
用于在java.base
模塊內(nèi)部訪問java.lang.Thread
等類的包私有或私有成員,這是實現(xiàn)Continuation
與線程狀態(tài)緊密集成的關(guān)鍵。
Continuation
提供的核心能力: 它提供了一種機制,使得一段Java代碼的執(zhí)行可以在不阻塞底層平臺線程的情況下被暫停,其狀態(tài)(主要是調(diào)用棧)被保存起來,之后可以在相同的或不同的平臺線程上從暫停點恢復(fù)執(zhí)行。這是實現(xiàn)用戶態(tài)線程(如虛擬線程)的基礎(chǔ)。
實現(xiàn)多個虛擬線程的 JVM 級別調(diào)度還需要什么?
僅僅有 Continuation
是不夠的,還需要一個完整的框架來管理和調(diào)度它們,這正是 java.lang.VirtualThread
所做的事情。關(guān)鍵組件包括:
虛擬線程的表示 (VirtualThread
類):
- 每個虛擬線程實例內(nèi)部包裝一個
Continuation
。 - 管理虛擬線程的生命周期狀態(tài)(
NEW
,STARTED
,RUNNING
,PARKED
,TERMINATED
等)。 - 處理中斷、加入 (
join
) 等線程操作。
調(diào)度器 (Executor
):
VirtualThread
需要一個調(diào)度器(通常是ForkJoinPool
,如DEFAULT_SCHEDULER
)來執(zhí)行它的Continuation
。- 調(diào)度器負(fù)責(zé)將準(zhǔn)備好運行的虛擬線程(其
Continuation
)提交給一個可用的平臺線程(載體線程)來執(zhí)行。 - 當(dāng)虛擬線程
yield
時,它會從載體線程上卸載,載體線程可以被調(diào)度器用于執(zhí)行其他虛擬線程或任務(wù)。
阻塞操作的適配:
- 標(biāo)準(zhǔn)庫中的阻塞操作(如
LockSupport.park()
,Object.wait()
, 大部分同步 I/O 操作)需要被適配,以便在虛擬線程中調(diào)用時,它們能夠觸發(fā)Continuation.yield()
而不是阻塞載體線程。 - 例如,
VirtualThread.park()
方法會嘗試yieldContinuation()
。如果成功,虛擬線程掛起,載體線程釋放。如果失敗(因為Continuation
被固定),則會退化為在載體線程上實際park
。
與平臺線程的交互(掛載/卸載):
VirtualThread.mount()
: 當(dāng)虛擬線程的Continuation
開始在載體線程上運行時,需要將虛擬線程設(shè)置為Thread.currentThread()
的返回值,并記錄載體線程。VirtualThread.unmount()
: 當(dāng)Continuation
yield
或執(zhí)行完畢時,需要從載體線程上卸載,恢復(fù)Thread.currentThread()
指向載體線程本身。
固定 (Pinning) 的處理:
- 需要有機制檢測
Continuation
何時被固定(例如,在 JNI 調(diào)用中或持有監(jiān)視器鎖)。 - 當(dāng)虛擬線程被固定時,如果它執(zhí)行了阻塞操作,那么載體線程本身可能會被阻塞,因為
Continuation
無法yield
。這是虛擬線程的一個重要性能考量點。
線程局部變量和作用域值:
- 需要確保線程局部變量(
ThreadLocal
)對于虛擬線程按預(yù)期工作(即每個虛擬線程有自己的副本)。 ScopedValue
是一個更現(xiàn)代的替代方案,與虛擬線程和Continuation
結(jié)合得更好。Continuation
類中的scopedValueCache
字段就是為此服務(wù)的。
如果自己設(shè)計虛擬線程調(diào)度應(yīng)該怎么做?
這是一個非常復(fù)雜的系統(tǒng)工程,深度依賴于 JVM 的底層支持。但從概念上講,可以設(shè)想以下組件:
MyContinuation
:
- 核心: 能夠保存和恢復(fù)執(zhí)行上下文(調(diào)用棧、程序計數(shù)器、寄存器)。這部分是最難的,需要 JVM 指令集層面的支持,或者像
libcontext
這樣的庫(但 Java 沒有直接使用這個)。在 JDK 中,這是通過StackChunk
和大量本地代碼實現(xiàn)的。 - 接口:
void init(Runnable task)
,boolean resume()
,void yield()
,boolean isDone()
。 - 狀態(tài):
INITIAL
,SUSPENDED
,RUNNING
,DONE
。
MyVirtualThread
:
- 屬性: ID, 名稱, 狀態(tài) (
NEW
,RUNNABLE
,BLOCKED
,TERMINATED
), 優(yōu)先級 (可選)。 - 包含: 一個
MyContinuation
實例,一個Runnable
任務(wù)。 - 方法:
start()
,interrupt()
,join()
,getState()
。
MyScheduler
(調(diào)度器):
- 核心: 一個或多個平臺線程(稱為工作線程或載體線程)。
- 隊列: 一個或多個用于存放
MyVirtualThread
的就緒隊列。
調(diào)度循環(huán):
- 工作線程從就緒隊列中取出一個
MyVirtualThread
。 - 設(shè)置
Thread.currentThread()
指向這個MyVirtualThread
(邏輯上的)。 - 調(diào)用
myVirtualThread.getContinuation().resume()
。 - 如果
resume()
返回是因為yield()
被調(diào)用: - 根據(jù)
yield
的原因(例如,等待 I/O,等待鎖),將MyVirtualThread
放入相應(yīng)的等待結(jié)構(gòu)中,或者如果只是普通的yield
,放回就緒隊列。 - 如果
resume()
返回是因為任務(wù)完成 (isDone()
為 true): - 更新
MyVirtualThread
狀態(tài)為TERMINATED
,處理join
等待。 - 恢復(fù)
Thread.currentThread()
指向平臺工作線程。 - 工作線程繼續(xù)從隊列取下一個任務(wù)。
同步原語和 I/O 適配:
鎖 (MyLock
): 當(dāng) MyVirtualThread
嘗試獲取一個已被持有的鎖時,它不應(yīng)阻塞平臺工作線程。而是:
- 將
MyVirtualThread
放入該鎖的等待隊列。 - 調(diào)用其
MyContinuation.yield()
。 - 當(dāng)鎖被釋放時,調(diào)度器將等待隊列中的一個
MyVirtualThread
移回就緒隊列。
I/O: 對于非阻塞 I/O (NIO):
- 發(fā)起非阻塞 I/O 操作。
- 將
MyVirtualThread
和一個回調(diào)(當(dāng) I/O 完成時調(diào)用)注冊到Selector
。 - 調(diào)用
MyContinuation.yield()
。 - 當(dāng)
Selector
檢測到 I/O 事件完成時,執(zhí)行回調(diào),回調(diào)將對應(yīng)的MyVirtualThread
重新放入調(diào)度器的就緒隊列。
Pinning 處理:
- 需要一種方法來標(biāo)記代碼段(如 JNI 調(diào)用,
synchronized
塊)是不可yield
的。 - 如果一個
MyVirtualThread
在這種不可yield
的代碼段中嘗試執(zhí)行一個會yield
的操作(如獲取MyLock
),調(diào)度器可能不得不阻塞當(dāng)前平臺工作線程,或者拋出異常,或者有其他備用策略。
挑戰(zhàn):
- 棧操作: 安全、高效地保存和恢復(fù)調(diào)用棧是最大的挑戰(zhàn),這需要 JVM 的深度配合。
- 與現(xiàn)有 Java 生態(tài)的兼容性: 大量現(xiàn)有庫依賴于
Thread.currentThread()
的行為和阻塞原語。 - 調(diào)試和監(jiān)控: 調(diào)試跨越多個
Continuation
片段的代碼會更復(fù)雜。
JDK 中的 Continuation
和 VirtualThread
在 JVM 層面解決了這些核心挑戰(zhàn)。自己從頭設(shè)計一個類似的系統(tǒng)將是一項艱巨的任務(wù),但理解其基本原理有助于更好地使用這些高級并發(fā)特性。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解關(guān)于springboot-actuator監(jiān)控的401無權(quán)限訪問
本篇文章主要介紹了詳解關(guān)于springboot-actuator監(jiān)控的401無權(quán)限訪問,非常具有實用價值,有興趣的可以了解一下2017-09-09Java使用kafka發(fā)送和生產(chǎn)消息的示例
本篇文章主要介紹了Java使用kafka發(fā)送和生產(chǎn)消息的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-04-04解決?IDEA?Maven?項目中"Could?not?find?artifact"?
這篇文章主要介紹了解決IDEA Maven項目中Could not?find?artifact問題的常見情況和解決方案,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07elasticsearch?java客戶端action的實現(xiàn)簡單分析
這篇文章主要為大家介紹了elasticsearch?java客戶端action的實現(xiàn)簡單分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04SpringBoot 使用WebSocket功能(實現(xiàn)步驟)
本文通過詳細(xì)步驟介紹了SpringBoot 使用WebSocket功能,首先需要導(dǎo)入WebSocket坐標(biāo),編寫WebSocket配置類,用于注冊WebSocket的Bean,結(jié)合示例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-02-02