欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

淺析C#?AsyncLocal如何在異步間進行數(shù)據流轉

 更新時間:2023年08月28日 10:00:12   作者:yi念之間  
在異步編程中,處理異步操作之間的數(shù)據流轉是一個比較常用的操作,C#異步編程提供了一個強大的工具來解決這個問題,那就是AsyncLocal,下面我們就來看看AsyncLocal的原理和用法吧

前言

在異步編程中,處理異步操作之間的數(shù)據流轉是一個比較常用的操作。C#異步編程提供了一個強大的工具來解決這個問題,那就是AsyncLocal。它是一個線程本地存儲的機制,可以在異步操作之間傳遞數(shù)據。它為我們提供了一種簡單而可靠的方式來共享數(shù)據,而不必擔心線程切換或異步上下文的變化。本文我們將探究AsyncLocal的原理和用法,并進行相關源碼解析。探討它如何在異步操作之間實現(xiàn)數(shù)據的流轉,以及它是如何在底層工作的。

使用方式

上面我們提到了AsyncLocal可以在異步操作間傳遞數(shù)據,我們在之前的文章<研究c#異步操作async await狀態(tài)機的總結>一文中提到過異步操作會涉及到線程切換的問題,接下來通過Task來模擬一個簡單異步示例,來看一下它的工作方式是什么樣的,以便加深對它的理解,先看一下示例

AsyncLocal<Person> context = new AsyncLocal<Person>();
context.Value = new Person { Id = 1, Name = "張三" };
Console.WriteLine($"Main之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
await Task.Run(() =>
{
    Console.WriteLine($"Task1之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value.Name = "李四";
    Console.WriteLine($"Task1之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
await Task.Run(() =>
{
    Console.WriteLine($"Task2之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value.Name = "王五";
    Console.WriteLine($"Task2之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine($"Main之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");

在上面的示例中,我們創(chuàng)建了一個AsyncLocal實例,并賦值了一個Person對象,然后我們創(chuàng)建了兩個Task,分別執(zhí)行了兩個異步操作,并分別修改了AsyncLocal中的Person對象的值,分別在執(zhí)行異步之前執(zhí)行異步過程中和執(zhí)行異步之后打印值來觀察變化,執(zhí)行程序輸出結果如下

Main之前:張三,ThreadId=1
Task1之前:張三,ThreadId=4
Task1之后:李四,ThreadId=4
Task2之前:李四,ThreadId=6
Task2之后:王五,ThreadId=6
Main之后:王五,ThreadId=6

從輸出結果來看,雖然我們在異步中修改了AsyncLocalPerson對象的值,并且也發(fā)生了線程切換。但是它可以在異步操作之間的數(shù)據共享和傳遞,使得我們在異步間進行的數(shù)據就和在一個線程里操作數(shù)據一樣,讓我們可以忽略掉其實已經發(fā)生了多次線程切換。

探究本質

通過上面的示例,我們發(fā)現(xiàn)AsyncLocal確實可以實現(xiàn)異步之間的數(shù)據共享和傳遞,那么它是如何實現(xiàn)的呢?接下來,我們通過先查看AsyncLocal涉及到的相關源碼來探究一下。想弄明白它的流轉問題,需要研究兩個大方向,一個是AsyncLocal的本身實現(xiàn),一個是AsyncLocal的流轉涉及到的異步或者多線程相關這里涉及到的主要是Task線程池里的相關實現(xiàn)。由于異步相關涉及到了一整個體系,所以但看某一點的時候可能不太容易理解,我們先從AsyncLocal本身入手,然后從Task入手,最后從線程池入手,逐步探究AsyncLocal如何進行數(shù)據流轉的。但是仍然希望能在閱讀本文之前先了解一下設計到該話題的相關文章,先對整體有一個整體的把握

AsyncLocal

雖然強烈建議先看一下上面推薦的文章,但是在這里我們還是簡單介紹一下AsyncLocal的實現(xiàn),所以這里我們簡單介紹一下,方便大家能直觀的看到。其實涉及到的比較簡單,就是看一下AsyncLocal里涉及到關于Value的操作即可[點擊查看AsyncLocal.Value源碼]

public sealed class AsyncLocal<T> : IAsyncLocal
{
    [MaybeNull]
    public T Value
    {
        get
        {
            object? value = ExecutionContext.GetLocalValue(this);
            if (typeof(T).IsValueType && value is null)
            {
                return default;
            }
            return (T)value!;
        }
        set
        {
            ExecutionContext.SetLocalValue(this, value, _valueChangedHandler is not null);
        }
    }
}

通過上面的源碼可以看到AsyncLocal的Value屬性的能力來自于ExecutionContext,也可理解為AsyncLocal是對ExecutionContext能力的包裝。

在C#中,ExecutionContext是用于多線程和異步編程的類,用于保存和還原線程的執(zhí)行狀態(tài)。它的主要功能是確保在線程切換時,狀態(tài)得以保留和恢復,以便線程能夠在正確的上下文中繼續(xù)執(zhí)行。這有助于管理線程的數(shù)據、狀態(tài)以及異步任務的正確執(zhí)行。

所以我們可以繼續(xù)簡單的看一下ExecutionContext中關于GetLocalValue方法和SetLocalValue方法的大致實現(xiàn),這里我們不在進行全部代碼展示,只展示核心實現(xiàn)[點擊查看ExecutionContext.LocalValue源碼]

public sealed class ExecutionContext : IDisposable, ISerializable
{
    private readonly IAsyncLocalValueMap? m_localValues;
    private ExecutionContext(
        IAsyncLocalValueMap localValues,)
    {
        m_localValues = localValues;
    }
    //獲取值的方法
    internal static object? GetLocalValue(IAsyncLocal local)
    {
        //捕獲當前線程的執(zhí)行上下文
        ExecutionContext? current = Thread.CurrentThread._executionContext;
        if (current == null)
        {
            return null;
        }
        //在執(zhí)行上下文中獲取值
        current.m_localValues.TryGetValue(local, out object? value);
        return value;
    }
    //設置值的方法
    internal static void SetLocalValue(IAsyncLocal local, object? newValue, bool needChangeNotifications)
    {
        ExecutionContext? current = Thread.CurrentThread._executionContext;
        //判斷設置的心值和舊值是否相同
        object? previousValue = null;
        bool hadPreviousValue = false;
        if (current != null)
        {
            hadPreviousValue = current.m_localValues.TryGetValue(local, out previousValue);
        }
        //相同的話不在進行設置直接返回
        if (previousValue == newValue)
        {
            return;
        }
        if (current != null)
        {
            //設置新值
            newValues = current.m_localValues.Set(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
        }
        else
        {
            //如果沒有使用過先初始化在存儲
            newValues = AsyncLocalValueMap.Create(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
        } 
        //給當前線程執(zhí)行上下文賦值新值
        Thread.CurrentThread._executionContext = (!isFlowSuppressed && AsyncLocalValueMap.IsEmpty(newValues)) ?
            null : new ExecutionContext(newValues, newChangeNotifications, isFlowSuppressed);
    }
}

通過上面的代碼我們可以知道GetLocalValue函數(shù)用于從當前線程的執(zhí)行上下文中獲取異步本地對象的值。通過檢索執(zhí)行狀態(tài)并查找本地值字典,該函數(shù)能夠獲取正確的值,實現(xiàn)了上下文數(shù)據的提取。SetLocalValue函數(shù)用于設置異步本地對象的值。它通過比較新舊值、操作本地值字典,并根據情況創(chuàng)建新的執(zhí)行上下文,確保了數(shù)據正確地傳遞和存儲。而異步操作過程中無非也正是不同線程上下文之間切換的問題。

有關ExecutionContext更詳細的源碼可以仔細閱讀一下,上面開頭提到的黑洞大佬文章地址。

在異步中流轉

上面我們展示了AsyncLocal相關的代碼實現(xiàn),知道了AsyncLocal本質是對ExecutionContext能力的封裝。每個線程Thread對象都包含了_executionContext類存儲ExecutionContext執(zhí)行上下問信息。接下來我們就來研究一下AsyncLocal中的數(shù)據是如何在異步過程中流轉的。首先我們來大致回顧一下異步編譯之后形成狀態(tài)機的執(zhí)行過程。

IAsyncStateMachine狀態(tài)機實例
  ->AsyncTaskMethodBuilder屬性類型AwaitUnsafeOnCompleted方法->
    ->AsyncTaskMethodBuilder<VoidTaskResult>.AwaitUnsafeOnCompleted方法
        判斷是否是以下類型
        ITaskAwaiter
        IConfiguredTaskAwaiter
        IStateMachineBoxAwareAwaiter
            ->Task是類型ITaskAwaiter類型所以調用UnsafeOnCompletedInternal方法
                ->Task.UnsafeSetContinuationForAwait
                    ->判斷交由哪種執(zhí)行策略執(zhí)行比如TaskScheduler或ThreadPool

到了Task.UnsafeSetContinuationForAwait方法這一步會涉及到異步代碼如何被調度的問題也就是會被自定義調度策略調度還是被線程池調度等等。我們來看一下這個方法的實現(xiàn),這個方法的實現(xiàn)代碼,在上面的<研究c#異步操作async await狀態(tài)機的總結>一文中也有介紹,咱們簡單看一下這里面的代碼[點擊查看Task.UnsafeSetContinuationForAwait源碼]

internal void UnsafeSetContinuationForAwait(IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
{
    //是否捕獲同步上下文
    if (continueOnCapturedContext)
    {
        //在異步執(zhí)行完成后通過同步上下文執(zhí)行后續(xù)結果
        SynchronizationContext? syncCtx = SynchronizationContext.Current;
        if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext))
        {
            var tc = new SynchronizationContextAwaitTaskContinuation(syncCtx, stateMachineBox.MoveNextAction, flowExecutionContext: false);
            if (!AddTaskContinuation(tc, addBeforeOthers: false))
            {
                tc.Run(this, canInlineContinuationTask: false);
            }
            return;
        }
        else
        {
            //選擇執(zhí)行默認的TaskScheduler還是自定義的Scheduler
            TaskScheduler? scheduler = TaskScheduler.InternalCurrent;
            if (scheduler != null && scheduler != TaskScheduler.Default)
            {
                var tc = new TaskSchedulerAwaitTaskContinuation(scheduler, stateMachineBox.MoveNextAction, flowExecutionContext: false);
                if (!AddTaskContinuation(tc, addBeforeOthers: false))
                {
                    tc.Run(this, canInlineContinuationTask: false);
                }
                return;
            }
        }
    }
    if (!AddTaskContinuation(stateMachineBox, addBeforeOthers: false))
    {
        //兜底的線程池策略
        ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, preferLocal: true);
    }
}

在許多情況下,特定的代碼需要在特定的線程上執(zhí)行,例如UI操作需要在UI線程上執(zhí)行,以避免UI沖突和渲染問題。SynchronizationContext就是為了解決這樣的問題而引入的。它允許您捕獲和存儲特定線程的上下文,并在需要時將任務切換到正確的線程。

上面的這段源碼是Task執(zhí)行操作的核心策略,咱們簡單的分析一下這段代碼涉及到的幾個核心的邏輯

首先是continueOnCapturedContext判斷,我們使用task.ConfigureAwait(false)這里設置的true或false設置的就是continueOnCapturedContext的值,如果為true則表示當前Task的執(zhí)行需要切換到當前SynchronizationContext的線程,如果用一段代碼描述默認情況下一步執(zhí)行的原理可以大致理解為下面的代碼。

SynchronizationContext sc = SynchronizationContext.Current;
ThreadPool.QueueUserWorkItem(_ =>
{
    try 
    { 
        DoWorker();
    }
    finally 
    { 
        sc.Post(_ => callback(), null); 
    }
});

其次是scheduler != TaskScheduler.Default判斷,如果自定義了TaskScheduler則使用自定義的TaskScheduler執(zhí)行,否則使用ThreadPool的線程池執(zhí)行。比如經典問題Task.Factory.StartNew()方法中await前后如果不想切換線程可以只用自定義TaskScheduler的方式只用一個Thread執(zhí)行所有任務,示例代碼如下所示。

await Task.Factory.StartNew(async () =>
{
    while (true)
    {
        Console.WriteLine($"Task之前Current Thread:{Thread.CurrentThread.ManagedThreadId}");
        await Task.Delay(2000);
        Console.WriteLine($"Task之后Current Thread:{Thread.CurrentThread.ManagedThreadId}");
    }
}, CancellationToken.None, TaskCreationOptions.None, new SingleThreadScheduler());
public class SingleThreadScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    public SingleThreadScheduler()
    {
        var thread = new Thread(() =>
        {
            foreach (var task in _tasks.GetConsumingEnumerable())
            {
                if (!TryExecuteTask(task))
                { 
                    _tasks.Add(task);
                }
            }
        })
        {
            IsBackground = true
        };
        thread.Start();
    }
    protected override IEnumerable<Task>? GetScheduledTasks()
    {
        return _tasks;
    }
    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }
}

最后兜底的策略就是使用ThreadPool線程池去執(zhí)行異步任務。

好了接下來我們把探索的重心就在線程池的里,我們知道自從有了Task之后ThreadPool就是和Task關聯(lián)起來的,關聯(lián)的核心邏輯就是在ThreadPoolWorkQueue的DispatchWorkItem方法中[點擊查看ThreadPoolWorkQueue.DispatchWorkItem源碼]

private static void DispatchWorkItem(object workItem, Thread currentThread)
{
    //判斷如果線程池執(zhí)行的任務是Task任務則執(zhí)行Task里的ExecuteFromThreadPool方法
    if (workItem is Task task)
    {
        //傳遞當前的線程池里的線程
        task.ExecuteFromThreadPool(currentThread);
    }
    else
    {
        Debug.Assert(workItem is IThreadPoolWorkItem);
        Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
    }
}

通過上面的源碼我們可以看到如果線程池執(zhí)行的任務是Task任務則執(zhí)行Task里的ExecuteFromThreadPool方法里,從這里我們也可以看到TaskThreadPool的關聯(lián)性。需要注意的是這里雖然關聯(lián)的Task類型但是并非是Task類的實例本身,而是實現(xiàn)了Task類的狀態(tài)機類型AsyncStateMachineBox<TStateMachine>,通過跟蹤生成的狀態(tài)機代碼我們可以看到,實際添加到線程池的是IAsyncStateMachineBox實例,而AsyncStateMachineBox<TStateMachine>即繼承了Task也實現(xiàn)了IAsyncStateMachineBox接口,由于邏輯較多只粘貼咱們關注的部分[點擊查看AsyncTaskMethodBuilderT.GetStateMachineBox源碼]

private static IAsyncStateMachineBox GetStateMachineBox<TStateMachine>(
            ref TStateMachine stateMachine,
            [NotNull] ref Task<TResult>? taskField)
            where TStateMachine : IAsyncStateMachine
{
    //捕獲當前線程上下文
    ExecutionContext? currentContext = ExecutionContext.Capture();
    //創(chuàng)建AsyncStateMachineBox實例
    AsyncStateMachineBox<TStateMachine> box = AsyncMethodBuilderCore.TrackAsyncMethodCompletion ?
    CreateDebugFinalizableAsyncStateMachineBox<TStateMachine>() : new AsyncStateMachineBox<TStateMachine>();
    taskField = box; 
    box.StateMachine = stateMachine;
    //傳遞當前捕獲的ExecutionContext執(zhí)行上下文
    box.Context = currentContext;
    return box;
}

在上面的方法中我們看到在初始化AsyncStateMachineBox<TStateMachine>實例之前先使用ExecutionContext.Capture()方法捕獲執(zhí)行上下文傳遞進來,這個時候還不存在被線程池執(zhí)行一說,所以捕獲的肯定是初始化Task的線程,注意這個時候還沒有執(zhí)行Task里的任何邏輯。所以我們關注一下ExecutionContext.Capture()方法的實現(xiàn)[點擊查看EExecutionContext.Capture源碼]

public static ExecutionContext? Capture()
{
    //捕獲當前線程的執(zhí)行上下文
    ExecutionContext? executionContext = Thread.CurrentThread._executionContext;
    if (executionContext == null)
    {
        executionContext = Default;
    }
    //如果設置ExecutionContext.RestoreFlow()則不進行捕獲
    else if (executionContext.m_isFlowSuppressed)
    {
        executionContext = null;
    }
    return executionContext;
}

通過上面的代碼我們看到了ExecutionContext.Capture()就是捕獲當前線程的執(zhí)行上下文,如果設置了ExecutionContext.RestoreFlow()上面邏輯里的m_isFlowSuppressed值則為true這個時候則不進行上下文捕獲。好了我們繼續(xù)往下看,上面的GetStateMachineBox方法返回的正是AsyncStateMachineBox<TStateMachine>類實例,它是線程池線程中真正執(zhí)行的Task實例,我們看一下的定義[點擊查看AsyncStateMachineBox源碼]

private class AsyncStateMachineBox<TStateMachine> :
            Task<TResult>, IAsyncStateMachineBox
            where TStateMachine : IAsyncStateMachine
{
}

這里我們可以看到AsyncStateMachineBox<TStateMachine>類是繼承自Task類也實現(xiàn)了IAsyncStateMachine,所以上面的ThreadPoolWorkQueue.DispatchWorkItem方法中調用的ExecuteFromThreadPool方法,本質是調用的AsyncStateMachineBox<TStateMachine>.ExecuteFromThreadPool方法,我們看一下它的實現(xiàn)方式[點擊查看AsyncStateMachineBox.ExecuteFromThreadPool源碼]

internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread);
public void MoveNext() => MoveNext(threadPoolThread: null);
private void MoveNext(Thread? threadPoolThread)
{
    //獲取之前捕獲的ExecutionContext執(zhí)行上下文
    ExecutionContext? context = Context;
    if (context == null)
    {
        Debug.Assert(StateMachine != null);
        StateMachine.MoveNext();
    }
    else
    {
        //判斷是否是線程池代碼
        if (threadPoolThread is null)
        {
            ExecutionContext.RunInternal(context, s_callback, this);
        }
        else
        {
            //默認是線程池線程,會走到這里的邏輯
            ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, context, s_callback, this);
        }
    }
}

源碼中的s_callback本質是調用狀態(tài)機生成的MoveNext方法,也就是在線程池線程里需要被執(zhí)行的邏輯,我們看一下它的定義

private static readonly ContextCallback s_callback = ExecutionContextCallback;
private static void ExecutionContextCallback(object? s)
{
    //本質調用的狀態(tài)機生成的MoveNext方法
    Unsafe.As<AsyncStateMachineBox<TStateMachine>>(s).StateMachine!.MoveNext();
}

上面的這段代碼可以清楚的看到線程池線程里執(zhí)行的邏輯是async await生成的狀態(tài)機里的代碼,完成了多線程執(zhí)行狀態(tài)機邏輯的關聯(lián)。

咱們再繼續(xù)看AsyncStateMachineBox.MoveNext方法里的執(zhí)行邏輯。由于咱們是默認機制所以這段邏輯肯定是在線程池里的線程執(zhí)行,所以會執(zhí)行到ExecutionContext.RunFromThreadPoolDispatchLoop()方法里,我們看一下它的邏輯[點擊查看ExecutionContext.RunFromThreadPoolDispatchLoop源碼]

internal static void RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, object state)
{
    //threadPoolThread是線程池線程,executionContext是Task.CapturedContext捕獲的執(zhí)行上下文
    if (executionContext != null && !executionContext.m_isDefault)
    {
        //如果線程存在ExecutionContext則把捕獲到的執(zhí)行上下文賦值給當前線程池線程的執(zhí)行上下文ExecutionContext
        RestoreChangedContextToThread(threadPoolThread, contextToRestore: executionContext, currentContext: null);
    }
    ExceptionDispatchInfo? edi = null;
    try
    {
        //執(zhí)行Task里的邏輯
        callback.Invoke(state);
    }
    catch (Exception ex)
    {
        edi = ExceptionDispatchInfo.Capture(ex);
    }
    //捕獲當前線程池線程
    Thread currentThread = threadPoolThread;
    //獲取當前線程池里的執(zhí)行上下文
    ExecutionContext? currentExecutionCtx = currentThread._executionContext;
    currentThread._synchronizationContext = null;
    if (currentExecutionCtx != null)
    {
        //將當前線程池里的執(zhí)行上下文清空,方便下次在線程池里獲取到當前線程處于初始化狀態(tài)
        RestoreChangedContextToThread(currentThread, contextToRestore: null, currentExecutionCtx);
    }
    edi?.Throw();
}
internal static void RestoreChangedContextToThread(Thread currentThread, ExecutionContext? contextToRestore, ExecutionContext? currentContext)
{
    //把捕獲到的執(zhí)行上下文賦值給當前線程池線程的執(zhí)行上下文ExecutionContext
    currentThread._executionContext = contextToRestore;
    if ((currentContext != null && currentContext.HasChangeNotifications) ||
        (contextToRestore != null && contextToRestore.HasChangeNotifications))
    {
        OnValuesChanged(currentContext, contextToRestore);
    }
}

從上面的ExecutionContext.ExecuteFromThreadPool里的邏輯我們可以清楚的看到我們想要的結果,由于上面提供了大片的源碼,看起來容易混亂,老規(guī)矩我們在這里總結一下核心邏輯的執(zhí)行流程

  • 在線程池線程執(zhí)行當前Task里的任務之前即AsyncStateMachineBox實例,因為它就是Task子類。先通過ExecutionContext.Capture()捕獲當前線程的ExecutionContext執(zhí)行上下文,方便給接下來線程池里的線程使用。
  • 把上一步里捕獲到的ExecutionContext執(zhí)行上下文,填充到在ThreadPool里得到的線程的執(zhí)行上下文_executionContext里,這樣就完成了不同線程之間的執(zhí)行上下文流轉。
  • 執(zhí)行完當前Task之后,把當前線程池中捕獲的線程執(zhí)行上下文給還原掉,也就是上面的RestoreChangedContextToThread(currentThread, contextToRestore: null, currentExecutionCtx)使用null賦值。

通過上面的總結相信大家對執(zhí)行上下文數(shù)據流轉有個很好的理解了。先捕獲當前線程執(zhí)行上下文,然后把捕獲的執(zhí)行上下文填充到要執(zhí)行任務的線程池的線程里,這樣就完成了不同線程中執(zhí)行上下文的流轉,執(zhí)行完Task任務之后把線程池里線程的執(zhí)行上下文還原掉方便下次執(zhí)行的時候是初始化狀態(tài)。

一個常見的坑

通過上面的源碼解析我們清楚的了解到了AsyncLocal在異步中是如何傳遞的,其實本質也就是在不同的線程里傳遞。那么接下來我們看一個大家在使用的過程中容易出錯的地方,還是剛開始的例子,我們改造一下示例代碼,如下所示

AsyncLocal<Person> context = new AsyncLocal<Person>();
context.Value = new Person { Id = 1, Name = "張三" };
Console.WriteLine($"Main之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
await Task.Run(() =>
{
    Console.WriteLine($"Task1之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value = new Person { Id = 2, Name = "李四" };
    Console.WriteLine($"Task1之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
await Task.Run(() =>
{
    Console.WriteLine($"Task2之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value = new Person { Id = 3, Name = "王五" };;
    Console.WriteLine($"Task2之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine($"Main之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");

這段代碼的執(zhí)行結果大家猜到了嗎?不賣關子了,上面的示例代碼執(zhí)行結果如下所示

Main之前:張三,ThreadId=1
Task1之前:張三,ThreadId=6
Task1之后:李四,ThreadId=6
Task2之前:張三,ThreadId=8
Task2之后:王五,ThreadId=8
Main之后:張三,ThreadId=8

這里我們可以看到,雖然我們在不同的Task里改變了AsyncLocal里的Value值比如改成了李四王五這種,但是執(zhí)行完Task之后仿佛值又被還原成最初初始化時候的樣子也就是上面說的張三,為什么會這個樣子呢?我們來分析一下

  • 1.初始化線程我們叫線程A,線程A.ExecutionContext存儲的是Person { Id = 1, Name = "張三" }內存區(qū)域的引用。
  • 2.第一個Task中執(zhí)行邏輯之前捕獲了線程A.ExecutionContext賦值給再線程池中線程線程B,現(xiàn)在線程A.ExecutionContext線程B.ExecutionContext都指向內存區(qū)域Person { Id = 1, Name = "張三" },因為數(shù)據是直接流轉過來的,上面的邏輯里我們提到過。
  • 3.在接下來的Task里我們得到線程池線程線程B在這里我們實例化了一個新的Person { Id = 2, Name = "李四" }實例,此時線程B.ExecutionContext的引用讓指向Person { Id = 2, Name = "李四" }內存區(qū)域,線程A.ExecutionContext指向的依然的是Person { Id = 1, Name = "張三" }內存區(qū)域。
  • 4.線程B執(zhí)行完成之后要還原掉執(zhí)行上下文賦值null,這個時候線程B.ExecutionContext的引用讓指向null,線程A.ExecutionContext指向的依然的是Person { Id = 1, Name = "張三" }內存區(qū)域。
  • 5.進入另一個Task之后我們得到線程池線程線程C,接下來線程C重復執(zhí)行上面的2、3、4步驟。

畫個圖簡單的演示一下,首先是初始化的時候這個時候線程A.ExecutionContext線程B.ExecutionContext都指向內存區(qū)域Person { Id = 1, Name = "張三" }如下所示

線程B里重新實例化了一個新的Person實例,此時的引用指向發(fā)生了變化,如下所示

這個時候線程A.ExecutionContext線程B.ExecutionContext已經沒啥關系了,所以你無論怎么操作線程B.ExecutionContext也和線程A.ExecutionContext沒有任何關系了。

總結

通過本文我們探究了AsyncLocal中的數(shù)據如何在異步之間如何流轉數(shù)據的,本質還是在多個線程之間流轉數(shù)據。接下來我們大致的總結一下本文的核心內容

  • 首先我們探究了AsyncLocal知道了它是對ExecutionContext執(zhí)行上下文能力的包裝,每個線程都會包含一個執(zhí)行上下文,即Thread._executionContext屬性。
  • 當使用異步或者線程池線程執(zhí)行Task里的任務之前,即AsyncStateMachineBox實例,因為它就是Task子類。先通過ExecutionContext.Capture()捕獲當前線程的ExecutionContext執(zhí)行上下文,方便給接下來線程池里的線程使用。
  • 然后把上一步里捕獲到的ExecutionContext執(zhí)行上下文,填充到在ThreadPool里得到的線程的執(zhí)行上下文_executionContext里,這樣就完成了不同線程之間的執(zhí)行上下文流轉。
  • 執(zhí)行完當前Task之后,把當前線程池中捕獲的線程執(zhí)行上下文給還原掉,也就是上面的RestoreChangedContextToThread(currentThread, contextToRestore: null, currentExecutionCtx)使用null賦值。

也就是先捕獲當前線程執(zhí)行上下文,然后把捕獲的執(zhí)行上下文填充到要執(zhí)行任務的線程池的線程里,這樣就完成了不同線程中執(zhí)行上下文的流轉,執(zhí)行完Task任務之后把線程池里線程的執(zhí)行上下文還原掉方便下次執(zhí)行的時候是初始化狀態(tài)。

以上就是淺析C# AsyncLocal如何在異步間進行數(shù)據流轉的詳細內容,更多關于C# AsyncLocal的資料請關注腳本之家其它相關文章!

相關文章

最新評論