欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java虛擬線程(VirtualThread)使用詳解

 更新時間:2025年05月31日 18:06:08   作者:lifallen  
這篇文章主要介紹了Java虛擬線程(VirtualThread)使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

VirtualThread 是 Java 實現(xiàn)輕量級并發(fā)(也稱為纖程或協(xié)程)的關(guān)鍵。與傳統(tǒng)的平臺線程(直接映射到操作系統(tǒng)線程)不同,虛擬線程由 JVM 管理和調(diào)度,可以在少量平臺線程上運行大量的虛擬線程,從而提高應(yīng)用程序的吞吐量。

流程概要

核心組件:

scheduler (Executor): 每個虛擬線程都有一個調(diào)度器,它是一個 Executor 實例。

這個調(diào)度器負(fù)責(zé)安排虛擬線程在載體線程(通常是平臺線程)上執(zhí)行。

如果沒有顯式指定,它會使用默認(rèn)的 ForkJoinPool (DEFAULT_SCHEDULER) 或者繼承父虛擬線程的調(diào)度器。

  • VirtualThread.java
// scheduler and continuation
private final Executor scheduler;
private final Continuation cont;
private final Runnable runContinuation;

在構(gòu)造函數(shù)中,如果 schedulernull,會進(jìn)行選擇:

  • VirtualThread.java
// ... existing code ...
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
    super(name, characteristics, /*bound*/ false);
    Objects.requireNonNull(task);

    // choose scheduler if not specified
    if (scheduler == null) {
        Thread parent = Thread.currentThread();
        if (parent instanceof VirtualThread vparent) {
            scheduler = vparent.scheduler;
        } else {
            scheduler = DEFAULT_SCHEDULER;
        }
    }

    this.scheduler = scheduler;
    this.cont = new VThreadContinuation(this, task);
    this.runContinuation = this::runContinuation;
}
// ... existing code ...

cont (Continuation): 這是虛擬線程的核心。Continuation 是 JDK 內(nèi)部的一個機制,允許代碼執(zhí)行被掛起(yield)和恢復(fù)。每個虛擬線程都包裝了一個 Continuation 實例。

VThreadContinuation 是一個內(nèi)部類,它繼承自 Continuation,并包裝了用戶提供的 Runnable 任務(wù)。

  • VirtualThread.java
// ... existing code ...
/**
 * The continuation that a virtual thread executes.
 */
private static class VThreadContinuation extends Continuation {
    VThreadContinuation(VirtualThread vthread, Runnable task) {
        super(VTHREAD_SCOPE, wrap(vthread, task));
    }
    @Override
    protected void onPinned(Continuation.Pinned reason) {
    }
    private static Runnable wrap(VirtualThread vthread, Runnable task) {
        return new Runnable() {
            @Hidden
            @JvmtiHideEvents
            public void run() {
                vthread.notifyJvmtiStart(); // notify JVMTI
                try {
                    vthread.run(task);
                } finally {
                    vthread.notifyJvmtiEnd(); // notify JVMTI
                }
            }
        };
    }
}
// ... existing code ...

runContinuation (Runnable): 這是一個 Runnable,其 run() 方法(即 this::runContinuation 指向的 VirtualThread#runContinuation() 方法)負(fù)責(zé)實際執(zhí)行或繼續(xù)執(zhí)行虛擬線程的任務(wù)。它處理虛擬線程的掛載(mount)到載體線程、運行 Continuation、以及卸載(unmount)。

狀態(tài)管理: VirtualThread 內(nèi)部維護(hù)了一系列狀態(tài)常量(如 NEW, STARTED, RUNNING, PARKED, TERMINATED 等)和一個 volatile int state 字段來跟蹤其生命周期。狀態(tài)之間的轉(zhuǎn)換是精心設(shè)計的,以處理各種場景,如啟動、運行、暫停、阻塞和終止。

  • VirtualThread.java
// ... existing code ...
/*
 * Virtual thread state transitions:
 *
 *      NEW -> STARTED         // Thread.start, schedule to run
 *  STARTED -> TERMINATED      // failed to start
 *  STARTED -> RUNNING         // first run
 *  RUNNING -> TERMINATED      // done
 *
 *  RUNNING -> PARKING         // Thread parking with LockSupport.park
// ... many more states ...
 *  YIELDED -> RUNNING         // continue execution after Thread.yield
 */
private static final int NEW      = 0;
private static final int STARTED  = 1;
private static final int RUNNING  = 2;     // runnable-mounted

// untimed and timed parking
private static final int PARKING       = 3;
private static final int PARKED        = 4;     // unmounted
// ... other state constants ...
private static final int TERMINATED = 99;  // final state
// ... existing code ...

carrierThread (Thread): 表示當(dāng)前承載該虛擬線程執(zhí)行的平臺線程。當(dāng)虛擬線程被掛起(unmounted)時,它不占用平臺線程。當(dāng)它需要運行時,調(diào)度器會將其調(diào)度到某個可用的載體線程上執(zhí)行。

啟動流程 (start() 方法):

  • VirtualThread.java
// ... existing code ...
@Override
void start(ThreadContainer container) {
    if (!compareAndSetState(NEW, STARTED)) {
        throw new IllegalThreadStateException("Already started");
    }

    // bind thread to container
    assert threadContainer() == null;
    setThreadContainer(container);

    // start thread
    boolean addedToContainer = false;
    boolean started = false;
    try {
        container.add(this);  // may throw
        addedToContainer = true;

        // scoped values may be inherited
        inheritScopedValueBindings(container);

        // submit task to run thread, using externalSubmit if possible
        externalSubmitRunContinuationOrThrow();
        started = true;
    } finally {
        if (!started) {
            afterDone(addedToContainer);
        }
    }
}

@Override
public void start() {
    start(ThreadContainers.root());
}
// ... existing code ...
  • 當(dāng)調(diào)用虛擬線程的 start() 方法時,它首先將狀態(tài)從 NEW 原子地更新為 STARTED。
  • 然后,它將自身添加到一個線程容器 (ThreadContainer) 中。
  • 最關(guān)鍵的一步是調(diào)用 externalSubmitRunContinuationOrThrow() (或類似的提交方法) 將 runContinuation 任務(wù)提交給其 scheduler 執(zhí)行。

執(zhí)行與掛起 (runContinuation()yieldContinuation()):

  • VirtualThread.java
// ... existing code ...
@ChangesCurrentThread // allow mount/unmount to be inlined
private void runContinuation() {
    // the carrier must be a platform thread
    if (Thread.currentThread().isVirtual()) {
        throw new WrongThreadException();
    }

    // set state to RUNNING
    int initialState = state();
    if (initialState == STARTED || initialState == UNPARKED
            || initialState == UNBLOCKED || initialState == YIELDED) {
        // newly started or continue after parking/blocking/Thread.yield
        if (!compareAndSetState(initialState, RUNNING)) {
            return;
        }
        // consume permit when continuing after parking or blocking. If continue
        // after a timed-park or timed-wait then the timeout task is cancelled.
        if (initialState == UNPARKED) {
            cancelTimeoutTask();
            setParkPermit(false);
        } else if (initialState == UNBLOCKED) {
            cancelTimeoutTask();
            blockPermit = false;
        }
    } else {
        // not runnable
        return;
    }

    mount();
    try {
        cont.run();
    } finally {
        unmount();
        if (cont.isDone()) {
            afterDone();
        } else {
            afterYield();
        }
    }
}
// ... existing code ...
  • VirtualThread.java
// ... existing code ...
@Hidden
private boolean yieldContinuation() {
    notifyJvmtiUnmount(/*hide*/true);
    try {
        return Continuation.yield(VTHREAD_SCOPE);
    } finally {
        notifyJvmtiMount(/*hide*/false);
    }
}
// ... existing code ...

runContinuation():

  • 檢查當(dāng)前線程是否為平臺線程(載體線程不能是虛擬線程)。
  • 原子地更新狀態(tài)到 RUNNING。
  • 調(diào)用 mount() 方法,將虛擬線程“掛載”到當(dāng)前平臺線程上。這包括設(shè)置 carrierThread 字段,并使 Thread.currentThread() 返回該虛擬線程實例。
  • 執(zhí)行 cont.run(),這會實際運行或恢復(fù) Continuation 中包裝的用戶任務(wù)。
  • 執(zhí)行完畢或 Continuation 掛起后,調(diào)用 unmount() 方法,將虛擬線程從平臺線程上“卸載”。這包括清除 carrierThread,并使 Thread.currentThread() 返回平臺線程本身。
  • 根據(jù) Continuation 是否完成 (isDone()),調(diào)用 afterDone()afterYield()。
  • yieldContinuation(): 當(dāng)虛擬線程需要掛起時(例如,執(zhí)行了 LockSupport.park(),或者I/O操作會阻塞),它會調(diào)用 Continuation.yield(VTHREAD_SCOPE)。這會導(dǎo)致 Continuation 的執(zhí)行暫停,runContinuation() 方法中的 cont.run() 調(diào)用會返回。此時,載體平臺線程可以被釋放去執(zhí)行其他任務(wù)。
  • afterYield(): 當(dāng) yieldContinuation() 成功掛起后,此方法被調(diào)用。它根據(jù)虛擬線程掛起前的原因(如 PARKING, YIELDING, BLOCKING, WAITING)更新狀態(tài),并可能安排后續(xù)操作(如設(shè)置定時器喚醒、重新提交到調(diào)度器等)。

Parking (park()parkNanos()):

  • VirtualThread.java
// ... existing code ...
@Override
void park() {
    assert Thread.currentThread() == this;

    // complete immediately if parking permit available or interrupted
    if (getAndSetParkPermit(false) || interrupted)
        return;

    // park the thread
    boolean yielded = false;
    setState(PARKING);
    try {
        yielded = yieldContinuation();
    } catch (OutOfMemoryError e) {
        // park on carrier
    } finally {
        assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
        if (!yielded) {
            assert state() == PARKING;
            setState(RUNNING);
        }
    }

    // park on the carrier thread when pinned
    if (!yielded) {
        parkOnCarrierThread(false, 0);
    }
}
// ... existing code ...
  • 當(dāng)虛擬線程調(diào)用 LockSupport.park()LockSupport.parkNanos() 時,會調(diào)用 VirtualThread 內(nèi)部的 park()parkNanos() 方法。
  • 這些方法會嘗試通過 yieldContinuation() 來掛起虛擬線程。
  • 如果 yieldContinuation() 成功(即虛擬線程沒有被固定在載體線程上),虛擬線程的狀態(tài)會變?yōu)?PARKEDTIMED_PARKED,并且載體線程被釋放。
  • 如果 yieldContinuation() 失?。ɡ纾摂M線程在 JNI 調(diào)用中或 synchronized 塊中被固定),它會退而求其次,在當(dāng)前的載體線程上實際執(zhí)行 U.park()(通過 parkOnCarrierThread 方法),此時載體線程會被阻塞,狀態(tài)變?yōu)?PINNEDTIMED_PINNED。

調(diào)度 (submitRunContinuation 系列方法):

  • VirtualThread.java
// ... existing code ...
private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
    boolean done = false;
    while (!done) {
        try {
            // Pin the continuation to prevent the virtual thread from unmounting
            // when submitting a task. For the default scheduler this ensures that
            // the carrier doesn't change when pushing a task. For other schedulers
            // it avoids deadlock that could arise due to carriers and virtual
            // threads contending for a lock.
            if (currentThread().isVirtual()) {
                Continuation.pin();
                try {
                    scheduler.execute(runContinuation);
                } finally {
                    Continuation.unpin();
                }
            } else {
                scheduler.execute(runContinuation);
            }
            done = true;
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        } catch (OutOfMemoryError e) {
            if (retryOnOOME) {
                U.park(false, 100_000_000); // 100ms
            } else {
                throw e;
            }
        }
    }
}
// ... existing code ...
  • 有多種提交 runContinuation 任務(wù)到調(diào)度器的方法,如 submitRunContinuation(), lazySubmitRunContinuation(), externalSubmitRunContinuation()。
  • 這些方法處理了向調(diào)度器提交任務(wù)的細(xì)節(jié),包括在特定情況下(如在虛擬線程內(nèi)部提交任務(wù)給調(diào)度器)需要固定 Continuation (Continuation.pin()) 以防止在提交過程中虛擬線程被意外卸載。

總而言之,VirtualThread 的實現(xiàn)巧妙地結(jié)合了 Continuation 機制和 Executor 調(diào)度框架。Continuation 提供了掛起和恢復(fù)執(zhí)行流的能力,使得虛擬線程在阻塞操作時可以釋放底層的平臺線程。調(diào)度器則負(fù)責(zé)在虛擬線程準(zhǔn)備好運行時,將其分配給一個可用的平臺線程執(zhí)行。通過這種方式,少量的平臺線程就可以支持大量并發(fā)的虛擬線程,大大降低了線程創(chuàng)建和上下文切換的開銷。狀態(tài)管理確保了虛擬線程在各種并發(fā)場景下的正確行為。

調(diào)度器繼承

instanceof 模式匹配語法 (parent instanceof VirtualThread vparent)

這是一種 Java 的語法特性,稱為 模式匹配 (Pattern Matching) for instanceof。它從 Java 14 開始作為預(yù)覽特性引入,并在 Java 16 中正式發(fā)布。

在以前的 Java 版本中,如果想檢查一個對象的類型,并且如果類型匹配則將其轉(zhuǎn)換為該類型并使用,你通常會這樣寫:

if (parent instanceof VirtualThread) {
    VirtualThread vparent = (VirtualThread) parent;
    // 然后使用 vparent
    scheduler = vparent.scheduler;
}

模式匹配 instanceof 簡化了這個過程。

if (parent instanceof VirtualThread vparent) 這行代碼做了兩件事:

  1. 類型檢查: 它檢查 parent 對象是否是 VirtualThread 的一個實例。
  2. 條件聲明和賦值: 如果 parent 確實是 VirtualThread 的實例,那么它會聲明一個新的局部變量 vparent (類型為 VirtualThread),并將 parent 自動轉(zhuǎn)換 (cast) 為 VirtualThread 類型后賦值給 vparent。這個 vparent 變量只在 if 語句塊為真(即類型匹配成功)的作用域內(nèi)有效。

這是一種更簡潔、更安全的寫法,避免了顯式的類型轉(zhuǎn)換和引入額外的變量聲明步驟。

這里的“父子關(guān)系”并不是指操作系統(tǒng)層面嚴(yán)格的父子進(jìn)程或線程關(guān)系,而是指創(chuàng)建者與被創(chuàng)建者的關(guān)系。

Thread parent = Thread.currentThread();:這行代碼獲取的是當(dāng)前正在執(zhí)行 VirtualThread 構(gòu)造函數(shù)的線程。這個線程就是新虛擬線程的“創(chuàng)建者”或“父”線程。

邏輯:

當(dāng)創(chuàng)建一個新的 VirtualThread 時,可以顯式地給它傳遞一個 scheduler

如果調(diào)用者沒有提供 scheduler (即 scheduler == null),那么虛擬線程的構(gòu)造邏輯會嘗試確定一個默認(rèn)的調(diào)度器。

這時,它會檢查創(chuàng)建這個新虛擬線程的線程 (parent):

  • 如果 parent 本身也是一個 VirtualThread (parent instanceof VirtualThread vparent):那么新的虛擬線程將繼承其創(chuàng)建者虛擬線程的調(diào)度器 (scheduler = vparent.scheduler;)。
  • 如果 parent 是一個平臺線程 (platform thread) 或者其他非 VirtualThread 類型: 那么新的虛擬線程將使用默認(rèn)的調(diào)度器 (scheduler = DEFAULT_SCHEDULER;),這個默認(rèn)調(diào)度器通常是一個 ForkJoinPool。

這種設(shè)計體現(xiàn)了一種“上下文感知”的默認(rèn)行為。如果你的代碼已經(jīng)在某個特定的虛擬線程(它使用著特定的調(diào)度器)中運行,當(dāng)你從這個虛擬線程中再創(chuàng)建一個新的虛擬線程時,讓新的虛擬線程默認(rèn)使用與創(chuàng)建者相同的調(diào)度器通常是合理的。這有助于:

  • 資源管理: 如果你為一組相關(guān)的任務(wù)配置了特定的調(diào)度器(例如,具有特定線程池大小或優(yōu)先級的調(diào)度器),那么從這個組內(nèi)派生的新虛擬線程默認(rèn)使用相同的調(diào)度器可以保持資源使用的一致性。
  • 行為一致性: 任務(wù)的執(zhí)行特性(如并發(fā)級別)可以更容易地在相關(guān)的虛擬線程間保持一致。

簡單來說,如果一個虛擬線程A創(chuàng)建了另一個虛擬線程B,并且沒有為B指定調(diào)度器,那么B就會默認(rèn)使用A的調(diào)度器。如果是一個平臺線程創(chuàng)建了虛擬線程B,并且沒有為B指定調(diào)度器,那么B就會使用全局默認(rèn)的調(diào)度器。

Continuation 類深度分析

Continuation 類是 Java 實現(xiàn)輕量級線程(虛擬線程)的基石。它代表了一種一次性(one-shot)的分界延續(xù)(delimited continuation)。簡單來說,它封裝了一段計算(一個 Runnable 任務(wù)),這段計算可以被掛起(yield),并在之后從掛起點恢復(fù)執(zhí)行。

核心能力與特性:

封裝計算單元:

  • 每個 Continuation 對象都關(guān)聯(lián)一個 Runnable 任務(wù) (this.target) 和一個 ContinuationScope (this.scope)。
  • ContinuationScope 用于界定 yield 操作的范圍。當(dāng)調(diào)用 Continuation.yield(scope) 時,執(zhí)行會從當(dāng)前 Continuation 向上回溯,直到找到匹配該 scopeContinuation 實例,然后掛起。

執(zhí)行與掛起 (run()yield()):

run(): 這是啟動或恢復(fù) Continuation 執(zhí)行的入口點。

  • 它首先會嘗試“掛載”(mount)Continuation 到當(dāng)前的載體線程(carrier thread)。掛載意味著將 Continuation 的執(zhí)行上下文(主要是棧幀)與載體線程關(guān)聯(lián)起來。
  • 通過 JLA.setContinuation(t, this) 將當(dāng)前 Continuation 設(shè)置為載體線程的活動 Continuation。
  • 調(diào)用本地方法 enterSpecial(this, isContinue, isVirtualThread) 來實際進(jìn)入或恢復(fù) Continuation 的執(zhí)行。這個本地方法是 JVM 實現(xiàn) Continuation 魔法的核心,它處理棧的切換和管理。
  • enterSpecial 內(nèi)部會調(diào)用 enter(),進(jìn)而調(diào)用 target.run() 來執(zhí)行用戶代碼。
  • 當(dāng) Continuation 內(nèi)部調(diào)用 Continuation.yield(scope) 時,enterSpecial (或其調(diào)用的更深層本地代碼) 會保存當(dāng)前執(zhí)行狀態(tài)(棧幀等),然后“返回”到 run() 方法中 enterSpecial 調(diào)用的地方。
  • run() 方法的 finally 塊負(fù)責(zé)“卸載”(unmount)Continuation,清理狀態(tài),并將載體線程的活動 Continuation 恢復(fù)為其父 Continuation (如果存在)。

yield(ContinuationScope scope) (靜態(tài)方法):

  • 這是 Continuation 主動讓出執(zhí)行權(quán)的方式。
  • 它會找到當(dāng)前線程上與指定 scope 匹配的最內(nèi)層 Continuation。
  • 然后調(diào)用該 Continuation 實例的 yield0(scope, child) 方法(這是一個非靜態(tài)的內(nèi)部方法,最終會觸發(fā)本地代碼 doYield()),將執(zhí)行權(quán)交還給其父 Continuation 或調(diào)度器。
  • yieldInfo 字段用于在 yield 時傳遞信息,決定是徹底返回還是在父 Continuation 中繼續(xù) yield

棧管理 (StackChunk):

  • Continuation 的執(zhí)行棧不是直接使用平臺線程的完整棧,而是由一系列 StackChunk 對象來管理。當(dāng) Continuation 掛起時,它的活動棧幀被保存在這些 StackChunk 中?;謴?fù)時,這些棧幀被重新加載。
  • private StackChunk tail; 字段指向 Continuation 棧的當(dāng)前末端。
  • isEmpty() 方法檢查所有 StackChunk 是否都為空,用于判斷 Continuation 是否執(zhí)行完畢。

狀態(tài)管理:

  • done: 標(biāo)記 ContinuationRunnable 是否已經(jīng)執(zhí)行完畢。
  • mounted: 一個 volatile 標(biāo)志,表示 Continuation 當(dāng)前是否掛載在某個載體線程上。mount()unmount() 方法以及 compareAndSetMounted() 原子地更新此狀態(tài)。
  • scopedValueCache: 用于支持 ScopedValue,在 Continuation 掛載和卸載時保存和恢復(fù)作用域值緩存。

父子關(guān)系 (parent, child):

  • Continuation 可以形成一個層級結(jié)構(gòu)。當(dāng)一個 Continuation (父) 內(nèi)部的某個點創(chuàng)建并運行另一個 Continuation (子) 時,它們之間就建立了父子關(guān)系。
  • JLA.getContinuation(currentCarrierThread()) 用于獲取當(dāng)前載體線程上活動的 Continuation。
  • run() 方法中會處理 parentchild 的設(shè)置,確保在 yield 和恢復(fù)時能正確地在 Continuation 層級間導(dǎo)航。

pinning (固定):

  • Pinned 枚舉(NATIVE, MONITOR, CRITICAL_SECTION, EXCEPTION)定義了 Continuation 可能被“固定”在載體線程上而無法安全 yield 的原因。例如,如果 Continuation 的執(zhí)行進(jìn)入了本地方法(JNI),或者持有一個對象監(jiān)視器鎖(synchronized 塊),它就可能被固定。
  • VThreadContinuation (在 VirtualThread.java 中定義) 的 onPinned() 方法是一個回調(diào),當(dāng) Continuation 被固定時會被調(diào)用。虛擬線程的實現(xiàn)會根據(jù)這個信息決定是阻塞載體線程還是采取其他策略。

與 JVM 的深度集成:

  • @IntrinsicCandidate 注解的本地方法如 doYield()enterSpecial() 表明這些方法的實現(xiàn)是由 JVM 高度優(yōu)化的,它們直接操縱線程棧和執(zhí)行狀態(tài)。
  • jdk.internal.access.JavaLangAccess (JLA) 和 jdk.internal.access.SharedSecrets 用于在 java.base 模塊內(nèi)部訪問 java.lang.Thread 等類的包私有或私有成員,這是實現(xiàn) Continuation 與線程狀態(tài)緊密集成的關(guān)鍵。

Continuation 提供的核心能力: 它提供了一種機制,使得一段Java代碼的執(zhí)行可以在不阻塞底層平臺線程的情況下被暫停,其狀態(tài)(主要是調(diào)用棧)被保存起來,之后可以在相同的或不同的平臺線程上從暫停點恢復(fù)執(zhí)行。這是實現(xiàn)用戶態(tài)線程(如虛擬線程)的基礎(chǔ)。

實現(xiàn)多個虛擬線程的 JVM 級別調(diào)度還需要什么?

僅僅有 Continuation 是不夠的,還需要一個完整的框架來管理和調(diào)度它們,這正是 java.lang.VirtualThread 所做的事情。關(guān)鍵組件包括:

虛擬線程的表示 (VirtualThread 類):

  • 每個虛擬線程實例內(nèi)部包裝一個 Continuation。
  • 管理虛擬線程的生命周期狀態(tài)(NEW, STARTED, RUNNING, PARKED, TERMINATED 等)。
  • 處理中斷、加入 (join) 等線程操作。

調(diào)度器 (Executor):

  • VirtualThread 需要一個調(diào)度器(通常是 ForkJoinPool,如 DEFAULT_SCHEDULER)來執(zhí)行它的 Continuation。
  • 調(diào)度器負(fù)責(zé)將準(zhǔn)備好運行的虛擬線程(其 Continuation)提交給一個可用的平臺線程(載體線程)來執(zhí)行。
  • 當(dāng)虛擬線程 yield 時,它會從載體線程上卸載,載體線程可以被調(diào)度器用于執(zhí)行其他虛擬線程或任務(wù)。

阻塞操作的適配:

  • 標(biāo)準(zhǔn)庫中的阻塞操作(如 LockSupport.park(), Object.wait(), 大部分同步 I/O 操作)需要被適配,以便在虛擬線程中調(diào)用時,它們能夠觸發(fā) Continuation.yield() 而不是阻塞載體線程。
  • 例如,VirtualThread.park() 方法會嘗試 yieldContinuation()。如果成功,虛擬線程掛起,載體線程釋放。如果失敗(因為 Continuation 被固定),則會退化為在載體線程上實際 park。

與平臺線程的交互(掛載/卸載):

  • VirtualThread.mount(): 當(dāng)虛擬線程的 Continuation 開始在載體線程上運行時,需要將虛擬線程設(shè)置為 Thread.currentThread() 的返回值,并記錄載體線程。
  • VirtualThread.unmount(): 當(dāng) Continuation yield 或執(zhí)行完畢時,需要從載體線程上卸載,恢復(fù) Thread.currentThread() 指向載體線程本身。

固定 (Pinning) 的處理:

  • 需要有機制檢測 Continuation 何時被固定(例如,在 JNI 調(diào)用中或持有監(jiān)視器鎖)。
  • 當(dāng)虛擬線程被固定時,如果它執(zhí)行了阻塞操作,那么載體線程本身可能會被阻塞,因為 Continuation 無法 yield。這是虛擬線程的一個重要性能考量點。

線程局部變量和作用域值:

  • 需要確保線程局部變量(ThreadLocal)對于虛擬線程按預(yù)期工作(即每個虛擬線程有自己的副本)。
  • ScopedValue 是一個更現(xiàn)代的替代方案,與虛擬線程和 Continuation 結(jié)合得更好。Continuation 類中的 scopedValueCache 字段就是為此服務(wù)的。

如果自己設(shè)計虛擬線程調(diào)度應(yīng)該怎么做?

這是一個非常復(fù)雜的系統(tǒng)工程,深度依賴于 JVM 的底層支持。但從概念上講,可以設(shè)想以下組件:

MyContinuation:

  • 核心: 能夠保存和恢復(fù)執(zhí)行上下文(調(diào)用棧、程序計數(shù)器、寄存器)。這部分是最難的,需要 JVM 指令集層面的支持,或者像 libcontext 這樣的庫(但 Java 沒有直接使用這個)。在 JDK 中,這是通過 StackChunk 和大量本地代碼實現(xiàn)的。
  • 接口: void init(Runnable task), boolean resume(), void yield(), boolean isDone()。
  • 狀態(tài): INITIAL, SUSPENDED, RUNNING, DONE。

MyVirtualThread:

  • 屬性: ID, 名稱, 狀態(tài) (NEW, RUNNABLE, BLOCKED, TERMINATED), 優(yōu)先級 (可選)。
  • 包含: 一個 MyContinuation 實例,一個 Runnable 任務(wù)。
  • 方法: start(), interrupt(), join(), getState()。

MyScheduler (調(diào)度器):

  • 核心: 一個或多個平臺線程(稱為工作線程或載體線程)。
  • 隊列: 一個或多個用于存放 MyVirtualThread 的就緒隊列。

調(diào)度循環(huán):

  • 工作線程從就緒隊列中取出一個 MyVirtualThread。
  • 設(shè)置 Thread.currentThread() 指向這個 MyVirtualThread (邏輯上的)。
  • 調(diào)用 myVirtualThread.getContinuation().resume()。
  • 如果 resume() 返回是因為 yield() 被調(diào)用:
  • 根據(jù) yield 的原因(例如,等待 I/O,等待鎖),將 MyVirtualThread 放入相應(yīng)的等待結(jié)構(gòu)中,或者如果只是普通的 yield,放回就緒隊列。
  • 如果 resume() 返回是因為任務(wù)完成 (isDone() 為 true):
  • 更新 MyVirtualThread 狀態(tài)為 TERMINATED,處理 join 等待。
  • 恢復(fù) Thread.currentThread() 指向平臺工作線程。
  • 工作線程繼續(xù)從隊列取下一個任務(wù)。

同步原語和 I/O 適配:

鎖 (MyLock): 當(dāng) MyVirtualThread 嘗試獲取一個已被持有的鎖時,它不應(yīng)阻塞平臺工作線程。而是:

  1. MyVirtualThread 放入該鎖的等待隊列。
  2. 調(diào)用其 MyContinuation.yield()。
  3. 當(dāng)鎖被釋放時,調(diào)度器將等待隊列中的一個 MyVirtualThread 移回就緒隊列。

I/O: 對于非阻塞 I/O (NIO):

  1. 發(fā)起非阻塞 I/O 操作。
  2. MyVirtualThread 和一個回調(diào)(當(dāng) I/O 完成時調(diào)用)注冊到 Selector。
  3. 調(diào)用 MyContinuation.yield()。
  4. 當(dāng) Selector 檢測到 I/O 事件完成時,執(zhí)行回調(diào),回調(diào)將對應(yīng)的 MyVirtualThread 重新放入調(diào)度器的就緒隊列。

Pinning 處理:

  • 需要一種方法來標(biāo)記代碼段(如 JNI 調(diào)用,synchronized 塊)是不可 yield 的。
  • 如果一個 MyVirtualThread 在這種不可 yield 的代碼段中嘗試執(zhí)行一個會 yield 的操作(如獲取 MyLock),調(diào)度器可能不得不阻塞當(dāng)前平臺工作線程,或者拋出異常,或者有其他備用策略。

挑戰(zhàn):

  • 棧操作: 安全、高效地保存和恢復(fù)調(diào)用棧是最大的挑戰(zhàn),這需要 JVM 的深度配合。
  • 與現(xiàn)有 Java 生態(tài)的兼容性: 大量現(xiàn)有庫依賴于 Thread.currentThread() 的行為和阻塞原語。
  • 調(diào)試和監(jiān)控: 調(diào)試跨越多個 Continuation 片段的代碼會更復(fù)雜。

JDK 中的 ContinuationVirtualThread 在 JVM 層面解決了這些核心挑戰(zhàn)。自己從頭設(shè)計一個類似的系統(tǒng)將是一項艱巨的任務(wù),但理解其基本原理有助于更好地使用這些高級并發(fā)特性。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 詳解關(guān)于springboot-actuator監(jiān)控的401無權(quán)限訪問

    詳解關(guān)于springboot-actuator監(jiān)控的401無權(quán)限訪問

    本篇文章主要介紹了詳解關(guān)于springboot-actuator監(jiān)控的401無權(quán)限訪問,非常具有實用價值,有興趣的可以了解一下
    2017-09-09
  • Java使用kafka發(fā)送和生產(chǎn)消息的示例

    Java使用kafka發(fā)送和生產(chǎn)消息的示例

    本篇文章主要介紹了Java使用kafka發(fā)送和生產(chǎn)消息的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-04-04
  • Java單利模式與多線程總結(jié)歸納

    Java單利模式與多線程總結(jié)歸納

    這篇文章主要介紹了Java單利模式與多線程總結(jié)歸納 的相關(guān)資料,需要的朋友可以參考下
    2016-03-03
  • Java替換視頻背景音樂的實現(xiàn)示例

    Java替換視頻背景音樂的實現(xiàn)示例

    FFmpeg 是一個強大的開源多媒體處理工具,被廣泛應(yīng)用于音視頻的錄制、轉(zhuǎn)碼、編輯等方面,本文主要介紹了Java替換視頻背景音樂,具有一定的參考價值,感興趣的可以了解一下
    2024-03-03
  • 解決?IDEA?Maven?項目中"Could?not?find?artifact"?問題的常見情況和解決方案

    解決?IDEA?Maven?項目中"Could?not?find?artifact"?

    這篇文章主要介紹了解決IDEA Maven項目中Could not?find?artifact問題的常見情況和解決方案,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-07-07
  • Java實現(xiàn)簡單棋盤存檔和讀取功能

    Java實現(xiàn)簡單棋盤存檔和讀取功能

    這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)簡單棋盤存檔和讀取功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-09-09
  • elasticsearch?java客戶端action的實現(xiàn)簡單分析

    elasticsearch?java客戶端action的實現(xiàn)簡單分析

    這篇文章主要為大家介紹了elasticsearch?java客戶端action的實現(xiàn)簡單分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-04-04
  • SpringBoot 使用WebSocket功能(實現(xiàn)步驟)

    SpringBoot 使用WebSocket功能(實現(xiàn)步驟)

    本文通過詳細(xì)步驟介紹了SpringBoot 使用WebSocket功能,首先需要導(dǎo)入WebSocket坐標(biāo),編寫WebSocket配置類,用于注冊WebSocket的Bean,結(jié)合示例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2024-02-02
  • MyBatis-Plus中SQL分析與打印

    MyBatis-Plus中SQL分析與打印

    MyBatis-Plus通過集成p6spy實現(xiàn)SQL分析與打印,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-06-06
  • 詳解Java編程中向量(Vector)的應(yīng)用

    詳解Java編程中向量(Vector)的應(yīng)用

    這篇文章主要介紹了Java中的向量(Vector)的應(yīng)用,Vector也是Java中比較常用的一個類,需要的朋友可以參考下
    2015-10-10

最新評論