Java虛擬線程(VirtualThread)使用詳解
VirtualThread 是 Java 實(shí)現(xiàn)輕量級并發(fā)(也稱為纖程或協(xié)程)的關(guān)鍵。與傳統(tǒng)的平臺線程(直接映射到操作系統(tǒng)線程)不同,虛擬線程由 JVM 管理和調(diào)度,可以在少量平臺線程上運(yùn)行大量的虛擬線程,從而提高應(yīng)用程序的吞吐量。
流程概要
核心組件:
scheduler (Executor): 每個(gè)虛擬線程都有一個(gè)調(diào)度器,它是一個(gè) Executor 實(shí)例。
這個(gè)調(diào)度器負(fù)責(zé)安排虛擬線程在載體線程(通常是平臺線程)上執(zhí)行。
如果沒有顯式指定,它會使用默認(rèn)的 ForkJoinPool (DEFAULT_SCHEDULER) 或者繼承父虛擬線程的調(diào)度器。
- VirtualThread.java
// scheduler and continuation private final Executor scheduler; private final Continuation cont; private final Runnable runContinuation;
在構(gòu)造函數(shù)中,如果 scheduler 為 null,會進(jìn)行選擇:
- VirtualThread.java
// ... existing code ...
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);
// choose scheduler if not specified
if (scheduler == null) {
Thread parent = Thread.currentThread();
if (parent instanceof VirtualThread vparent) {
scheduler = vparent.scheduler;
} else {
scheduler = DEFAULT_SCHEDULER;
}
}
this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
this.runContinuation = this::runContinuation;
}
// ... existing code ...cont (Continuation): 這是虛擬線程的核心。Continuation 是 JDK 內(nèi)部的一個(gè)機(jī)制,允許代碼執(zhí)行被掛起(yield)和恢復(fù)。每個(gè)虛擬線程都包裝了一個(gè) Continuation 實(shí)例。
VThreadContinuation 是一個(gè)內(nèi)部類,它繼承自 Continuation,并包裝了用戶提供的 Runnable 任務(wù)。
- VirtualThread.java
// ... existing code ...
/**
* The continuation that a virtual thread executes.
*/
private static class VThreadContinuation extends Continuation {
VThreadContinuation(VirtualThread vthread, Runnable task) {
super(VTHREAD_SCOPE, wrap(vthread, task));
}
@Override
protected void onPinned(Continuation.Pinned reason) {
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@Hidden
@JvmtiHideEvents
public void run() {
vthread.notifyJvmtiStart(); // notify JVMTI
try {
vthread.run(task);
} finally {
vthread.notifyJvmtiEnd(); // notify JVMTI
}
}
};
}
}
// ... existing code ...runContinuation (Runnable): 這是一個(gè) Runnable,其 run() 方法(即 this::runContinuation 指向的 VirtualThread#runContinuation() 方法)負(fù)責(zé)實(shí)際執(zhí)行或繼續(xù)執(zhí)行虛擬線程的任務(wù)。它處理虛擬線程的掛載(mount)到載體線程、運(yùn)行 Continuation、以及卸載(unmount)。
狀態(tài)管理: VirtualThread 內(nèi)部維護(hù)了一系列狀態(tài)常量(如 NEW, STARTED, RUNNING, PARKED, TERMINATED 等)和一個(gè) volatile int state 字段來跟蹤其生命周期。狀態(tài)之間的轉(zhuǎn)換是精心設(shè)計(jì)的,以處理各種場景,如啟動、運(yùn)行、暫停、阻塞和終止。
- VirtualThread.java
// ... existing code ... /* * Virtual thread state transitions: * * NEW -> STARTED // Thread.start, schedule to run * STARTED -> TERMINATED // failed to start * STARTED -> RUNNING // first run * RUNNING -> TERMINATED // done * * RUNNING -> PARKING // Thread parking with LockSupport.park // ... many more states ... * YIELDED -> RUNNING // continue execution after Thread.yield */ private static final int NEW = 0; private static final int STARTED = 1; private static final int RUNNING = 2; // runnable-mounted // untimed and timed parking private static final int PARKING = 3; private static final int PARKED = 4; // unmounted // ... other state constants ... private static final int TERMINATED = 99; // final state // ... existing code ...
carrierThread (Thread): 表示當(dāng)前承載該虛擬線程執(zhí)行的平臺線程。當(dāng)虛擬線程被掛起(unmounted)時(shí),它不占用平臺線程。當(dāng)它需要運(yùn)行時(shí),調(diào)度器會將其調(diào)度到某個(gè)可用的載體線程上執(zhí)行。
啟動流程 (start() 方法):
- VirtualThread.java
// ... existing code ...
@Override
void start(ThreadContainer container) {
if (!compareAndSetState(NEW, STARTED)) {
throw new IllegalThreadStateException("Already started");
}
// bind thread to container
assert threadContainer() == null;
setThreadContainer(container);
// start thread
boolean addedToContainer = false;
boolean started = false;
try {
container.add(this); // may throw
addedToContainer = true;
// scoped values may be inherited
inheritScopedValueBindings(container);
// submit task to run thread, using externalSubmit if possible
externalSubmitRunContinuationOrThrow();
started = true;
} finally {
if (!started) {
afterDone(addedToContainer);
}
}
}
@Override
public void start() {
start(ThreadContainers.root());
}
// ... existing code ...- 當(dāng)調(diào)用虛擬線程的
start()方法時(shí),它首先將狀態(tài)從NEW原子地更新為STARTED。 - 然后,它將自身添加到一個(gè)線程容器 (
ThreadContainer) 中。 - 最關(guān)鍵的一步是調(diào)用
externalSubmitRunContinuationOrThrow()(或類似的提交方法) 將runContinuation任務(wù)提交給其scheduler執(zhí)行。
執(zhí)行與掛起 (runContinuation() 和 yieldContinuation()):
- VirtualThread.java
// ... existing code ...
@ChangesCurrentThread // allow mount/unmount to be inlined
private void runContinuation() {
// the carrier must be a platform thread
if (Thread.currentThread().isVirtual()) {
throw new WrongThreadException();
}
// set state to RUNNING
int initialState = state();
if (initialState == STARTED || initialState == UNPARKED
|| initialState == UNBLOCKED || initialState == YIELDED) {
// newly started or continue after parking/blocking/Thread.yield
if (!compareAndSetState(initialState, RUNNING)) {
return;
}
// consume permit when continuing after parking or blocking. If continue
// after a timed-park or timed-wait then the timeout task is cancelled.
if (initialState == UNPARKED) {
cancelTimeoutTask();
setParkPermit(false);
} else if (initialState == UNBLOCKED) {
cancelTimeoutTask();
blockPermit = false;
}
} else {
// not runnable
return;
}
mount();
try {
cont.run();
} finally {
unmount();
if (cont.isDone()) {
afterDone();
} else {
afterYield();
}
}
}
// ... existing code ...- VirtualThread.java
// ... existing code ...
@Hidden
private boolean yieldContinuation() {
notifyJvmtiUnmount(/*hide*/true);
try {
return Continuation.yield(VTHREAD_SCOPE);
} finally {
notifyJvmtiMount(/*hide*/false);
}
}
// ... existing code ...runContinuation():
- 檢查當(dāng)前線程是否為平臺線程(載體線程不能是虛擬線程)。
- 原子地更新狀態(tài)到
RUNNING。 - 調(diào)用
mount()方法,將虛擬線程“掛載”到當(dāng)前平臺線程上。這包括設(shè)置carrierThread字段,并使Thread.currentThread()返回該虛擬線程實(shí)例。 - 執(zhí)行
cont.run(),這會實(shí)際運(yùn)行或恢復(fù)Continuation中包裝的用戶任務(wù)。 - 執(zhí)行完畢或
Continuation掛起后,調(diào)用unmount()方法,將虛擬線程從平臺線程上“卸載”。這包括清除carrierThread,并使Thread.currentThread()返回平臺線程本身。 - 根據(jù)
Continuation是否完成 (isDone()),調(diào)用afterDone()或afterYield()。 yieldContinuation(): 當(dāng)虛擬線程需要掛起時(shí)(例如,執(zhí)行了LockSupport.park(),或者I/O操作會阻塞),它會調(diào)用Continuation.yield(VTHREAD_SCOPE)。這會導(dǎo)致Continuation的執(zhí)行暫停,runContinuation()方法中的cont.run()調(diào)用會返回。此時(shí),載體平臺線程可以被釋放去執(zhí)行其他任務(wù)。afterYield(): 當(dāng)yieldContinuation()成功掛起后,此方法被調(diào)用。它根據(jù)虛擬線程掛起前的原因(如PARKING,YIELDING,BLOCKING,WAITING)更新狀態(tài),并可能安排后續(xù)操作(如設(shè)置定時(shí)器喚醒、重新提交到調(diào)度器等)。
Parking (park() 和 parkNanos()):
- VirtualThread.java
// ... existing code ...
@Override
void park() {
assert Thread.currentThread() == this;
// complete immediately if parking permit available or interrupted
if (getAndSetParkPermit(false) || interrupted)
return;
// park the thread
boolean yielded = false;
setState(PARKING);
try {
yielded = yieldContinuation();
} catch (OutOfMemoryError e) {
// park on carrier
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == PARKING;
setState(RUNNING);
}
}
// park on the carrier thread when pinned
if (!yielded) {
parkOnCarrierThread(false, 0);
}
}
// ... existing code ...- 當(dāng)虛擬線程調(diào)用
LockSupport.park()或LockSupport.parkNanos()時(shí),會調(diào)用VirtualThread內(nèi)部的park()或parkNanos()方法。 - 這些方法會嘗試通過
yieldContinuation()來掛起虛擬線程。 - 如果
yieldContinuation()成功(即虛擬線程沒有被固定在載體線程上),虛擬線程的狀態(tài)會變?yōu)?PARKED或TIMED_PARKED,并且載體線程被釋放。 - 如果
yieldContinuation()失?。ɡ纾摂M線程在 JNI 調(diào)用中或synchronized塊中被固定),它會退而求其次,在當(dāng)前的載體線程上實(shí)際執(zhí)行U.park()(通過parkOnCarrierThread方法),此時(shí)載體線程會被阻塞,狀態(tài)變?yōu)?PINNED或TIMED_PINNED。
調(diào)度 (submitRunContinuation 系列方法):
- VirtualThread.java
// ... existing code ...
private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
boolean done = false;
while (!done) {
try {
// Pin the continuation to prevent the virtual thread from unmounting
// when submitting a task. For the default scheduler this ensures that
// the carrier doesn't change when pushing a task. For other schedulers
// it avoids deadlock that could arise due to carriers and virtual
// threads contending for a lock.
if (currentThread().isVirtual()) {
Continuation.pin();
try {
scheduler.execute(runContinuation);
} finally {
Continuation.unpin();
}
} else {
scheduler.execute(runContinuation);
}
done = true;
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
if (retryOnOOME) {
U.park(false, 100_000_000); // 100ms
} else {
throw e;
}
}
}
}
// ... existing code ...- 有多種提交
runContinuation任務(wù)到調(diào)度器的方法,如submitRunContinuation(),lazySubmitRunContinuation(),externalSubmitRunContinuation()。 - 這些方法處理了向調(diào)度器提交任務(wù)的細(xì)節(jié),包括在特定情況下(如在虛擬線程內(nèi)部提交任務(wù)給調(diào)度器)需要固定
Continuation(Continuation.pin()) 以防止在提交過程中虛擬線程被意外卸載。
總而言之,VirtualThread 的實(shí)現(xiàn)巧妙地結(jié)合了 Continuation 機(jī)制和 Executor 調(diào)度框架。Continuation 提供了掛起和恢復(fù)執(zhí)行流的能力,使得虛擬線程在阻塞操作時(shí)可以釋放底層的平臺線程。調(diào)度器則負(fù)責(zé)在虛擬線程準(zhǔn)備好運(yùn)行時(shí),將其分配給一個(gè)可用的平臺線程執(zhí)行。通過這種方式,少量的平臺線程就可以支持大量并發(fā)的虛擬線程,大大降低了線程創(chuàng)建和上下文切換的開銷。狀態(tài)管理確保了虛擬線程在各種并發(fā)場景下的正確行為。
調(diào)度器繼承
instanceof 模式匹配語法 (parent instanceof VirtualThread vparent)
這是一種 Java 的語法特性,稱為 模式匹配 (Pattern Matching) for instanceof。它從 Java 14 開始作為預(yù)覽特性引入,并在 Java 16 中正式發(fā)布。
在以前的 Java 版本中,如果想檢查一個(gè)對象的類型,并且如果類型匹配則將其轉(zhuǎn)換為該類型并使用,你通常會這樣寫:
if (parent instanceof VirtualThread) {
VirtualThread vparent = (VirtualThread) parent;
// 然后使用 vparent
scheduler = vparent.scheduler;
}模式匹配 instanceof 簡化了這個(gè)過程。
if (parent instanceof VirtualThread vparent) 這行代碼做了兩件事:
- 類型檢查: 它檢查
parent對象是否是VirtualThread的一個(gè)實(shí)例。 - 條件聲明和賦值: 如果
parent確實(shí)是VirtualThread的實(shí)例,那么它會聲明一個(gè)新的局部變量vparent(類型為VirtualThread),并將parent自動轉(zhuǎn)換 (cast) 為VirtualThread類型后賦值給vparent。這個(gè)vparent變量只在if語句塊為真(即類型匹配成功)的作用域內(nèi)有效。
這是一種更簡潔、更安全的寫法,避免了顯式的類型轉(zhuǎn)換和引入額外的變量聲明步驟。
這里的“父子關(guān)系”并不是指操作系統(tǒng)層面嚴(yán)格的父子進(jìn)程或線程關(guān)系,而是指創(chuàng)建者與被創(chuàng)建者的關(guān)系。
Thread parent = Thread.currentThread();:這行代碼獲取的是當(dāng)前正在執(zhí)行 VirtualThread 構(gòu)造函數(shù)的線程。這個(gè)線程就是新虛擬線程的“創(chuàng)建者”或“父”線程。
邏輯:
當(dāng)創(chuàng)建一個(gè)新的 VirtualThread 時(shí),可以顯式地給它傳遞一個(gè) scheduler。
如果調(diào)用者沒有提供 scheduler (即 scheduler == null),那么虛擬線程的構(gòu)造邏輯會嘗試確定一個(gè)默認(rèn)的調(diào)度器。
這時(shí),它會檢查創(chuàng)建這個(gè)新虛擬線程的線程 (parent):
- 如果
parent本身也是一個(gè)VirtualThread(parent instanceof VirtualThread vparent):那么新的虛擬線程將繼承其創(chuàng)建者虛擬線程的調(diào)度器 (scheduler = vparent.scheduler;)。 - 如果
parent是一個(gè)平臺線程 (platform thread) 或者其他非VirtualThread類型: 那么新的虛擬線程將使用默認(rèn)的調(diào)度器 (scheduler = DEFAULT_SCHEDULER;),這個(gè)默認(rèn)調(diào)度器通常是一個(gè)ForkJoinPool。
這種設(shè)計(jì)體現(xiàn)了一種“上下文感知”的默認(rèn)行為。如果你的代碼已經(jīng)在某個(gè)特定的虛擬線程(它使用著特定的調(diào)度器)中運(yùn)行,當(dāng)你從這個(gè)虛擬線程中再創(chuàng)建一個(gè)新的虛擬線程時(shí),讓新的虛擬線程默認(rèn)使用與創(chuàng)建者相同的調(diào)度器通常是合理的。這有助于:
- 資源管理: 如果你為一組相關(guān)的任務(wù)配置了特定的調(diào)度器(例如,具有特定線程池大小或優(yōu)先級的調(diào)度器),那么從這個(gè)組內(nèi)派生的新虛擬線程默認(rèn)使用相同的調(diào)度器可以保持資源使用的一致性。
- 行為一致性: 任務(wù)的執(zhí)行特性(如并發(fā)級別)可以更容易地在相關(guān)的虛擬線程間保持一致。
簡單來說,如果一個(gè)虛擬線程A創(chuàng)建了另一個(gè)虛擬線程B,并且沒有為B指定調(diào)度器,那么B就會默認(rèn)使用A的調(diào)度器。如果是一個(gè)平臺線程創(chuàng)建了虛擬線程B,并且沒有為B指定調(diào)度器,那么B就會使用全局默認(rèn)的調(diào)度器。
Continuation 類深度分析
Continuation 類是 Java 實(shí)現(xiàn)輕量級線程(虛擬線程)的基石。它代表了一種一次性(one-shot)的分界延續(xù)(delimited continuation)。簡單來說,它封裝了一段計(jì)算(一個(gè) Runnable 任務(wù)),這段計(jì)算可以被掛起(yield),并在之后從掛起點(diǎn)恢復(fù)執(zhí)行。
核心能力與特性:
封裝計(jì)算單元:
- 每個(gè)
Continuation對象都關(guān)聯(lián)一個(gè)Runnable任務(wù) (this.target) 和一個(gè)ContinuationScope(this.scope)。 ContinuationScope用于界定yield操作的范圍。當(dāng)調(diào)用Continuation.yield(scope)時(shí),執(zhí)行會從當(dāng)前Continuation向上回溯,直到找到匹配該scope的Continuation實(shí)例,然后掛起。
執(zhí)行與掛起 (run() 和 yield()):
run(): 這是啟動或恢復(fù) Continuation 執(zhí)行的入口點(diǎn)。
- 它首先會嘗試“掛載”(mount)
Continuation到當(dāng)前的載體線程(carrier thread)。掛載意味著將Continuation的執(zhí)行上下文(主要是棧幀)與載體線程關(guān)聯(lián)起來。 - 通過
JLA.setContinuation(t, this)將當(dāng)前Continuation設(shè)置為載體線程的活動Continuation。 - 調(diào)用本地方法
enterSpecial(this, isContinue, isVirtualThread)來實(shí)際進(jìn)入或恢復(fù)Continuation的執(zhí)行。這個(gè)本地方法是 JVM 實(shí)現(xiàn)Continuation魔法的核心,它處理?xiàng)5那袚Q和管理。 enterSpecial內(nèi)部會調(diào)用enter(),進(jìn)而調(diào)用target.run()來執(zhí)行用戶代碼。- 當(dāng)
Continuation內(nèi)部調(diào)用Continuation.yield(scope)時(shí),enterSpecial(或其調(diào)用的更深層本地代碼) 會保存當(dāng)前執(zhí)行狀態(tài)(棧幀等),然后“返回”到run()方法中enterSpecial調(diào)用的地方。 run()方法的finally塊負(fù)責(zé)“卸載”(unmount)Continuation,清理狀態(tài),并將載體線程的活動Continuation恢復(fù)為其父Continuation(如果存在)。
yield(ContinuationScope scope) (靜態(tài)方法):
- 這是
Continuation主動讓出執(zhí)行權(quán)的方式。 - 它會找到當(dāng)前線程上與指定
scope匹配的最內(nèi)層Continuation。 - 然后調(diào)用該
Continuation實(shí)例的yield0(scope, child)方法(這是一個(gè)非靜態(tài)的內(nèi)部方法,最終會觸發(fā)本地代碼doYield()),將執(zhí)行權(quán)交還給其父Continuation或調(diào)度器。 yieldInfo字段用于在yield時(shí)傳遞信息,決定是徹底返回還是在父Continuation中繼續(xù)yield。
棧管理 (StackChunk):
Continuation的執(zhí)行棧不是直接使用平臺線程的完整棧,而是由一系列StackChunk對象來管理。當(dāng)Continuation掛起時(shí),它的活動棧幀被保存在這些StackChunk中?;謴?fù)時(shí),這些棧幀被重新加載。private StackChunk tail;字段指向Continuation棧的當(dāng)前末端。isEmpty()方法檢查所有StackChunk是否都為空,用于判斷Continuation是否執(zhí)行完畢。
狀態(tài)管理:
done: 標(biāo)記Continuation的Runnable是否已經(jīng)執(zhí)行完畢。mounted: 一個(gè)volatile標(biāo)志,表示Continuation當(dāng)前是否掛載在某個(gè)載體線程上。mount()和unmount()方法以及compareAndSetMounted()原子地更新此狀態(tài)。scopedValueCache: 用于支持ScopedValue,在Continuation掛載和卸載時(shí)保存和恢復(fù)作用域值緩存。
父子關(guān)系 (parent, child):
Continuation可以形成一個(gè)層級結(jié)構(gòu)。當(dāng)一個(gè)Continuation(父) 內(nèi)部的某個(gè)點(diǎn)創(chuàng)建并運(yùn)行另一個(gè)Continuation(子) 時(shí),它們之間就建立了父子關(guān)系。JLA.getContinuation(currentCarrierThread())用于獲取當(dāng)前載體線程上活動的Continuation。run()方法中會處理parent和child的設(shè)置,確保在yield和恢復(fù)時(shí)能正確地在Continuation層級間導(dǎo)航。
pinning (固定):
Pinned枚舉(NATIVE,MONITOR,CRITICAL_SECTION,EXCEPTION)定義了Continuation可能被“固定”在載體線程上而無法安全yield的原因。例如,如果Continuation的執(zhí)行進(jìn)入了本地方法(JNI),或者持有一個(gè)對象監(jiān)視器鎖(synchronized塊),它就可能被固定。VThreadContinuation(在VirtualThread.java中定義) 的onPinned()方法是一個(gè)回調(diào),當(dāng)Continuation被固定時(shí)會被調(diào)用。虛擬線程的實(shí)現(xiàn)會根據(jù)這個(gè)信息決定是阻塞載體線程還是采取其他策略。
與 JVM 的深度集成:
@IntrinsicCandidate注解的本地方法如doYield()和enterSpecial()表明這些方法的實(shí)現(xiàn)是由 JVM 高度優(yōu)化的,它們直接操縱線程棧和執(zhí)行狀態(tài)。jdk.internal.access.JavaLangAccess(JLA) 和jdk.internal.access.SharedSecrets用于在java.base模塊內(nèi)部訪問java.lang.Thread等類的包私有或私有成員,這是實(shí)現(xiàn)Continuation與線程狀態(tài)緊密集成的關(guān)鍵。
Continuation 提供的核心能力: 它提供了一種機(jī)制,使得一段Java代碼的執(zhí)行可以在不阻塞底層平臺線程的情況下被暫停,其狀態(tài)(主要是調(diào)用棧)被保存起來,之后可以在相同的或不同的平臺線程上從暫停點(diǎn)恢復(fù)執(zhí)行。這是實(shí)現(xiàn)用戶態(tài)線程(如虛擬線程)的基礎(chǔ)。
實(shí)現(xiàn)多個(gè)虛擬線程的 JVM 級別調(diào)度還需要什么?
僅僅有 Continuation 是不夠的,還需要一個(gè)完整的框架來管理和調(diào)度它們,這正是 java.lang.VirtualThread 所做的事情。關(guān)鍵組件包括:
虛擬線程的表示 (VirtualThread 類):
- 每個(gè)虛擬線程實(shí)例內(nèi)部包裝一個(gè)
Continuation。 - 管理虛擬線程的生命周期狀態(tài)(
NEW,STARTED,RUNNING,PARKED,TERMINATED等)。 - 處理中斷、加入 (
join) 等線程操作。
調(diào)度器 (Executor):
VirtualThread需要一個(gè)調(diào)度器(通常是ForkJoinPool,如DEFAULT_SCHEDULER)來執(zhí)行它的Continuation。- 調(diào)度器負(fù)責(zé)將準(zhǔn)備好運(yùn)行的虛擬線程(其
Continuation)提交給一個(gè)可用的平臺線程(載體線程)來執(zhí)行。 - 當(dāng)虛擬線程
yield時(shí),它會從載體線程上卸載,載體線程可以被調(diào)度器用于執(zhí)行其他虛擬線程或任務(wù)。
阻塞操作的適配:
- 標(biāo)準(zhǔn)庫中的阻塞操作(如
LockSupport.park(),Object.wait(), 大部分同步 I/O 操作)需要被適配,以便在虛擬線程中調(diào)用時(shí),它們能夠觸發(fā)Continuation.yield()而不是阻塞載體線程。 - 例如,
VirtualThread.park()方法會嘗試yieldContinuation()。如果成功,虛擬線程掛起,載體線程釋放。如果失?。ㄒ?yàn)?Continuation被固定),則會退化為在載體線程上實(shí)際park。
與平臺線程的交互(掛載/卸載):
VirtualThread.mount(): 當(dāng)虛擬線程的Continuation開始在載體線程上運(yùn)行時(shí),需要將虛擬線程設(shè)置為Thread.currentThread()的返回值,并記錄載體線程。VirtualThread.unmount(): 當(dāng)Continuationyield或執(zhí)行完畢時(shí),需要從載體線程上卸載,恢復(fù)Thread.currentThread()指向載體線程本身。
固定 (Pinning) 的處理:
- 需要有機(jī)制檢測
Continuation何時(shí)被固定(例如,在 JNI 調(diào)用中或持有監(jiān)視器鎖)。 - 當(dāng)虛擬線程被固定時(shí),如果它執(zhí)行了阻塞操作,那么載體線程本身可能會被阻塞,因?yàn)?
Continuation無法yield。這是虛擬線程的一個(gè)重要性能考量點(diǎn)。
線程局部變量和作用域值:
- 需要確保線程局部變量(
ThreadLocal)對于虛擬線程按預(yù)期工作(即每個(gè)虛擬線程有自己的副本)。 ScopedValue是一個(gè)更現(xiàn)代的替代方案,與虛擬線程和Continuation結(jié)合得更好。Continuation類中的scopedValueCache字段就是為此服務(wù)的。
如果自己設(shè)計(jì)虛擬線程調(diào)度應(yīng)該怎么做?
這是一個(gè)非常復(fù)雜的系統(tǒng)工程,深度依賴于 JVM 的底層支持。但從概念上講,可以設(shè)想以下組件:
MyContinuation:
- 核心: 能夠保存和恢復(fù)執(zhí)行上下文(調(diào)用棧、程序計(jì)數(shù)器、寄存器)。這部分是最難的,需要 JVM 指令集層面的支持,或者像
libcontext這樣的庫(但 Java 沒有直接使用這個(gè))。在 JDK 中,這是通過StackChunk和大量本地代碼實(shí)現(xiàn)的。 - 接口:
void init(Runnable task),boolean resume(),void yield(),boolean isDone()。 - 狀態(tài):
INITIAL,SUSPENDED,RUNNING,DONE。
MyVirtualThread:
- 屬性: ID, 名稱, 狀態(tài) (
NEW,RUNNABLE,BLOCKED,TERMINATED), 優(yōu)先級 (可選)。 - 包含: 一個(gè)
MyContinuation實(shí)例,一個(gè)Runnable任務(wù)。 - 方法:
start(),interrupt(),join(),getState()。
MyScheduler (調(diào)度器):
- 核心: 一個(gè)或多個(gè)平臺線程(稱為工作線程或載體線程)。
- 隊(duì)列: 一個(gè)或多個(gè)用于存放
MyVirtualThread的就緒隊(duì)列。
調(diào)度循環(huán):
- 工作線程從就緒隊(duì)列中取出一個(gè)
MyVirtualThread。 - 設(shè)置
Thread.currentThread()指向這個(gè)MyVirtualThread(邏輯上的)。 - 調(diào)用
myVirtualThread.getContinuation().resume()。 - 如果
resume()返回是因?yàn)?yield()被調(diào)用: - 根據(jù)
yield的原因(例如,等待 I/O,等待鎖),將MyVirtualThread放入相應(yīng)的等待結(jié)構(gòu)中,或者如果只是普通的yield,放回就緒隊(duì)列。 - 如果
resume()返回是因?yàn)槿蝿?wù)完成 (isDone()為 true): - 更新
MyVirtualThread狀態(tài)為TERMINATED,處理join等待。 - 恢復(fù)
Thread.currentThread()指向平臺工作線程。 - 工作線程繼續(xù)從隊(duì)列取下一個(gè)任務(wù)。
同步原語和 I/O 適配:
鎖 (MyLock): 當(dāng) MyVirtualThread 嘗試獲取一個(gè)已被持有的鎖時(shí),它不應(yīng)阻塞平臺工作線程。而是:
- 將
MyVirtualThread放入該鎖的等待隊(duì)列。 - 調(diào)用其
MyContinuation.yield()。 - 當(dāng)鎖被釋放時(shí),調(diào)度器將等待隊(duì)列中的一個(gè)
MyVirtualThread移回就緒隊(duì)列。
I/O: 對于非阻塞 I/O (NIO):
- 發(fā)起非阻塞 I/O 操作。
- 將
MyVirtualThread和一個(gè)回調(diào)(當(dāng) I/O 完成時(shí)調(diào)用)注冊到Selector。 - 調(diào)用
MyContinuation.yield()。 - 當(dāng)
Selector檢測到 I/O 事件完成時(shí),執(zhí)行回調(diào),回調(diào)將對應(yīng)的MyVirtualThread重新放入調(diào)度器的就緒隊(duì)列。
Pinning 處理:
- 需要一種方法來標(biāo)記代碼段(如 JNI 調(diào)用,
synchronized塊)是不可yield的。 - 如果一個(gè)
MyVirtualThread在這種不可yield的代碼段中嘗試執(zhí)行一個(gè)會yield的操作(如獲取MyLock),調(diào)度器可能不得不阻塞當(dāng)前平臺工作線程,或者拋出異常,或者有其他備用策略。
挑戰(zhàn):
- 棧操作: 安全、高效地保存和恢復(fù)調(diào)用棧是最大的挑戰(zhàn),這需要 JVM 的深度配合。
- 與現(xiàn)有 Java 生態(tài)的兼容性: 大量現(xiàn)有庫依賴于
Thread.currentThread()的行為和阻塞原語。 - 調(diào)試和監(jiān)控: 調(diào)試跨越多個(gè)
Continuation片段的代碼會更復(fù)雜。
JDK 中的 Continuation 和 VirtualThread 在 JVM 層面解決了這些核心挑戰(zhàn)。自己從頭設(shè)計(jì)一個(gè)類似的系統(tǒng)將是一項(xiàng)艱巨的任務(wù),但理解其基本原理有助于更好地使用這些高級并發(fā)特性。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解關(guān)于springboot-actuator監(jiān)控的401無權(quán)限訪問
本篇文章主要介紹了詳解關(guān)于springboot-actuator監(jiān)控的401無權(quán)限訪問,非常具有實(shí)用價(jià)值,有興趣的可以了解一下2017-09-09
Java使用kafka發(fā)送和生產(chǎn)消息的示例
本篇文章主要介紹了Java使用kafka發(fā)送和生產(chǎn)消息的示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-04-04
解決?IDEA?Maven?項(xiàng)目中"Could?not?find?artifact"?
這篇文章主要介紹了解決IDEA Maven項(xiàng)目中Could not?find?artifact問題的常見情況和解決方案,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-07-07
elasticsearch?java客戶端action的實(shí)現(xiàn)簡單分析
這篇文章主要為大家介紹了elasticsearch?java客戶端action的實(shí)現(xiàn)簡單分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04
SpringBoot 使用WebSocket功能(實(shí)現(xiàn)步驟)
本文通過詳細(xì)步驟介紹了SpringBoot 使用WebSocket功能,首先需要導(dǎo)入WebSocket坐標(biāo),編寫WebSocket配置類,用于注冊WebSocket的Bean,結(jié)合示例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-02-02

