欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

.NET?Core?線程池(ThreadPool)底層原理源碼解析

 更新時(shí)間:2024年11月26日 09:12:38   作者:叫我安不理  
文章介紹了.NET?Core線程池的結(jié)構(gòu)和工作原理,包括生產(chǎn)者-消費(fèi)者模型、線程饑餓問(wèn)題、線程池增長(zhǎng)策略等,同時(shí),對(duì)比了Task和線程池在并發(fā)編程中的優(yōu)缺點(diǎn),并推薦使用Task來(lái)優(yōu)化線程池的使用,感興趣的朋友一起看看吧

簡(jiǎn)介

上文提到,創(chuàng)建線程在操作系統(tǒng)層面有4大無(wú)法避免的開(kāi)銷。因此復(fù)用線程明顯是一個(gè)更優(yōu)的策略,切降低了使用線程的門檻,提高程序員的下限。

.NET Core線程池日新月異,不同版本實(shí)現(xiàn)都有差別,在.NET 6之前,ThreadPool底層由C++承載。在之后由C#承載。本文以.NET 8.0.8為藍(lán)本,如有出入,請(qǐng)參考源碼.

ThreadPool結(jié)構(gòu)模型圖

眼見(jiàn)為實(shí)

https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs上源碼 and windbg

internal sealed partial class ThreadPoolWorkQueue
{
        internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();//全局隊(duì)列
        internal readonly ConcurrentQueue<object> highPriorityWorkItems = new ConcurrentQueue<object>();//高優(yōu)先級(jí)隊(duì)列,比如Timer產(chǎn)生的定時(shí)任務(wù)
        internal readonly ConcurrentQueue<object> lowPriorityWorkItems =
            s_prioritizationExperiment ? new ConcurrentQueue<object>() : null!;//低優(yōu)先級(jí)隊(duì)列,比如回調(diào)
        internal readonly ConcurrentQueue<object>[] _assignableWorkItemQueues =
            new ConcurrentQueue<object>[s_assignableWorkItemQueueCount];//CPU 核心大于32個(gè),全局隊(duì)列會(huì)分裂為好幾個(gè),目的是降低CPU核心對(duì)全局隊(duì)列的鎖競(jìng)爭(zhēng)
}

ThreadPool生產(chǎn)者模型

眼見(jiàn)為實(shí)

        public void Enqueue(object callback, bool forceGlobal)
        {
            Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));
            if (_loggingEnabled && FrameworkEventSource.Log.IsEnabled())
                FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
#if CORECLR
            if (s_prioritizationExperiment)//lowPriorityWorkItems目前還是實(shí)驗(yàn)階段,CLR代碼比較偷懶,這一段代碼很不優(yōu)雅,沒(méi)有連續(xù)性。
            {
                EnqueueForPrioritizationExperiment(callback, forceGlobal);
            }
            else
#endif
            {
                ThreadPoolWorkQueueThreadLocals? tl;
                if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
                {
                    tl.workStealingQueue.LocalPush(callback);//如果沒(méi)有特殊情況,默認(rèn)加入本地隊(duì)列
                }
                else
                {
                    ConcurrentQueue<object> queue =
                        s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
                            ? tl.assignedGlobalWorkItemQueue//CPU>32 加入分裂的全局隊(duì)列
                            : workItems;//CPU<=32 加入全局隊(duì)列
                    queue.Enqueue(callback);
                }
            }
            EnsureThreadRequested();
        }

細(xì)心的朋友,會(huì)發(fā)現(xiàn)highPriorityWorkItems的注入判斷哪里去了?目前CLR對(duì)于高優(yōu)先級(jí)隊(duì)列只開(kāi)放給內(nèi)部,比如timer/Task使用

ThreadPool消費(fèi)者模型

眼見(jiàn)為實(shí)

public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
        {
            // Check for local work items
            object? workItem = tl.workStealingQueue.LocalPop();
            if (workItem != null)
            {
                return workItem;
            }
            // Check for high-priority work items
            if (tl.isProcessingHighPriorityWorkItems)
            {
                if (highPriorityWorkItems.TryDequeue(out workItem))
                {
                    return workItem;
                }
                tl.isProcessingHighPriorityWorkItems = false;
            }
            else if (
                _mayHaveHighPriorityWorkItems != 0 &&
                Interlocked.CompareExchange(ref _mayHaveHighPriorityWorkItems, 0, 1) != 0 &&
                TryStartProcessingHighPriorityWorkItemsAndDequeue(tl, out workItem))
            {
                return workItem;
            }
            // Check for work items from the assigned global queue
            if (s_assignableWorkItemQueueCount > 0 && tl.assignedGlobalWorkItemQueue.TryDequeue(out workItem))
            {
                return workItem;
            }
            // Check for work items from the global queue
            if (workItems.TryDequeue(out workItem))
            {
                return workItem;
            }
            // Check for work items in other assignable global queues
            uint randomValue = tl.random.NextUInt32();
            if (s_assignableWorkItemQueueCount > 0)
            {
                int queueIndex = tl.queueIndex;
                int c = s_assignableWorkItemQueueCount;
                int maxIndex = c - 1;
                for (int i = (int)(randomValue % (uint)c); c > 0; i = i < maxIndex ? i + 1 : 0, c--)
                {
                    if (i != queueIndex && _assignableWorkItemQueues[i].TryDequeue(out workItem))
                    {
                        return workItem;
                    }
                }
            }
#if CORECLR
            // Check for low-priority work items
            if (s_prioritizationExperiment && lowPriorityWorkItems.TryDequeue(out workItem))
            {
                return workItem;
            }
#endif
            // Try to steal from other threads' local work items
            {
                WorkStealingQueue localWsq = tl.workStealingQueue;
                WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
                int c = queues.Length;
                Debug.Assert(c > 0, "There must at least be a queue for this thread.");
                int maxIndex = c - 1;
                for (int i = (int)(randomValue % (uint)c); c > 0; i = i < maxIndex ? i + 1 : 0, c--)
                {
                    WorkStealingQueue otherQueue = queues[i];
                    if (otherQueue != localWsq && otherQueue.CanSteal)
                    {
                        workItem = otherQueue.TrySteal(ref missedSteal);
                        if (workItem != null)
                        {
                            return workItem;
                        }
                    }
                }
            }
            return null;
        }

什么是線程饑餓?

線程饑餓(Thread Starvation)是指線程長(zhǎng)時(shí)間得不到調(diào)度(時(shí)間片),從而無(wú)法完成任務(wù)。

  • 線程被無(wú)限阻塞
    當(dāng)某個(gè)線程獲取鎖后長(zhǎng)期不釋放,其它線程一直在等待
  • 線程優(yōu)先級(jí)降低
    操作系統(tǒng)鎖競(jìng)爭(zhēng)中,高優(yōu)先級(jí)線程,搶占低優(yōu)先級(jí)線程的CPU時(shí)間
  • 線程在等待
    比如線程Wait/Result時(shí),線程池資源不夠,導(dǎo)致得不到執(zhí)行

眼見(jiàn)為實(shí)

@一線碼農(nóng) 使用大佬的案例

http://www.dbjr.com.cn/program/3313770o1.htm

http://www.dbjr.com.cn/aspnet/3313810g7.htm

windbg sos bug依舊存在

大佬的文章中,描述sos存在bug,無(wú)法顯示線程堆積情況

經(jīng)實(shí)測(cè),在.net 8中依舊存在此bug

99851個(gè)積壓隊(duì)列,沒(méi)有顯示出來(lái)

ThreadPool如何改善線程饑餓

CLR線程池使用爬山算法來(lái)動(dòng)態(tài)調(diào)整線程池的大小來(lái)來(lái)改善線程饑餓的問(wèn)題。本人水平有限,放出地址,有興趣的同學(xué)可以自行研究https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs

ThreadPool如何增加線程

在 PortableThreadPool 中有一個(gè)子類叫 GateThread,它就是專門用來(lái)增減線程的類

其底層使用一個(gè)while (true) 每隔500ms來(lái)輪詢線程數(shù)量是否足夠,以及一個(gè)AutoResetEvent來(lái)接收注入線程Event.如果不夠就新增

《CLR vir C#》 一書中,提過(guò)一句 CLR線程池每秒最多新增1~2個(gè)線程。結(jié)論的源頭就是在這里注意:是線程池注入線程每秒1~2個(gè),不是每秒只能創(chuàng)建1~2個(gè)線程。OS創(chuàng)建線程的速度塊多了。

眼見(jiàn)為實(shí)

https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs

眼見(jiàn)為實(shí)

        static void Main(string[] args)
        {
            for (int i = 0;i<=100000;i++)
            {
                ThreadPool.QueueUserWorkItem((x) =>
                {
                    Console.WriteLine($"當(dāng)前線程Id:{Thread.CurrentThread.ManagedThreadId}");
                    Thread.Sleep(int.MaxValue);
                });
            }
            Console.ReadLine();
        }

可以觀察輸出,判斷是不是每秒注入1~2個(gè)線程

Task

不用多說(shuō)什么了吧?

Task的底層調(diào)用模型圖

Task的底層實(shí)現(xiàn)主要取決于TaskSchedule,一般來(lái)說(shuō),除了UI線程外,默認(rèn)是調(diào)度到線程池

眼見(jiàn)為實(shí)

Task.Run(() => { { Console.WriteLine("Test"); } });

其底層會(huì)自動(dòng)調(diào)用Start(),Start()底層調(diào)用的TaskShedule.QueueTask().而作為實(shí)現(xiàn)類ThreadPoolTaskScheduler.QueueTask底層調(diào)用如下。

可以看到,默認(rèn)情況下(除非你自己實(shí)現(xiàn)一個(gè)TaskShedule抽象類).Task的底層使用ThreadPool來(lái)管理。

有意思的是,對(duì)于長(zhǎng)任務(wù)(Long Task),直接是用一個(gè)單獨(dú)的后臺(tái)線程來(lái)管理,完全不參與調(diào)度。

Task對(duì)線程池的優(yōu)化

既然Task的底層是使用ThreadPool,而線程池注入速度是比較慢的。Task作為線程池的高度封裝,有沒(méi)有優(yōu)化呢?答案是Yes當(dāng)使用Task.Result時(shí),底層會(huì)調(diào)用InternalWaitCore(),如果Task還未完成,會(huì)調(diào)用ThreadPool.NotifyThreadBlocked()來(lái)通知ThreadPool當(dāng)前線程已經(jīng)被阻塞,必須馬上注入一個(gè)新線程來(lái)代替被阻塞的線程。相對(duì)每500ms來(lái)輪詢注入線程,該方式采用事件驅(qū)動(dòng),注入線程池的速度會(huì)更快。

眼見(jiàn)為實(shí)

  static void Main(string[] args)
        {
            var client = new HttpClient();
            for(int i = 0; i < 100000; i++)
            {
                ThreadPool.QueueUserWorkItem(x =>
                {
                    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")} -> {x}: 這是耗時(shí)任務(wù)");
                    try
                    {
                        var content = client.GetStringAsync("https://youtube.com").Result;
                        Console.WriteLine(content);
                    }
                    catch (Exception)
                    {
                        throw;
                    }
                });
            }
            Console.ReadLine();
        }

其底層通過(guò)AutoResetEvent來(lái)觸發(fā)注入線程的Event消息

結(jié)論

多用Task,它更完善。對(duì)線程池優(yōu)化更好。沒(méi)有不使用Task的理由

到此這篇關(guān)于.NET Core 線程池(ThreadPool)底層原理淺談的文章就介紹到這了,更多相關(guān).NET Core 線程池ThreadPool內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論