欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

.NET Framework 4.8 多線程編程最佳實(shí)踐

 更新時(shí)間:2025年09月18日 09:50:16   作者:yz123lucky  
本文全面介紹.NET Framework 4.8多線程編程,涵蓋Task創(chuàng)建、Async/Await異步模式、并行處理、數(shù)據(jù)流TPLDataflow,同步機(jī)制及性能優(yōu)化,指導(dǎo)開發(fā)者構(gòu)建高效可靠的應(yīng)用,感興趣的朋友跟隨小編一起看看吧

.NET Framework 4.8 多線程編程

1.Task基礎(chǔ)編程

  • Task的多種創(chuàng)建方式(Task.Run、Task.Factory.StartNew等)
  • 連續(xù)任務(wù)(ContinueWith)和任務(wù)鏈
  • Task異常處理機(jī)制
  • Task的狀態(tài)管理

2.Async/Await模式

  • 基礎(chǔ)async/await使用方法
  • 異步方法的返回值處理
  • 并發(fā)執(zhí)行多個(gè)異步操作(Task.WhenAll、Task.WhenAny)
  • 異步異常處理
  • CancellationToken取消機(jī)制

3.高級(jí)異步模式

  • TaskCompletionSource創(chuàng)建自定義異步操作
  • 異步信號(hào)量(SemaphoreSlim)
  • 自定義異步鎖實(shí)現(xiàn)
  • 異步生產(chǎn)者-消費(fèi)者模式
  • 限流的批量異步操作

4.并行編程(Parallel和PLINQ)

  • Parallel.For和Parallel.ForEach循環(huán)
  • Parallel.Invoke并行執(zhí)行多個(gè)操作
  • ParallelOptions控制并行度
  • PLINQ查詢和數(shù)據(jù)處理
  • 并行聚合操作
  • 性能對(duì)比示例

5.數(shù)據(jù)流(TPL Dataflow)

  • TransformBlock、ActionBlock等數(shù)據(jù)流塊
  • 構(gòu)建數(shù)據(jù)處理管道
  • BroadcastBlock一對(duì)多廣播
  • BatchBlock批處理
  • JoinBlock數(shù)據(jù)合并

6.異步編程最佳實(shí)踐

  • 避免async void(除事件處理器外)
  • ConfigureAwait的正確使用
  • 避免阻塞異步代碼(防死鎖)
  • ValueTask性能優(yōu)化
  • 正確處理多個(gè)異步操作
  • IAsyncDisposable資源清理

關(guān)鍵技術(shù)對(duì)比:

傳統(tǒng)Thread vs 現(xiàn)代Task

特性ThreadTask
創(chuàng)建開銷低(使用線程池)
返回值不支持Task支持
異常處理困難內(nèi)置支持
組合操作手動(dòng)實(shí)現(xiàn)WhenAll/WhenAny
取消機(jī)制手動(dòng)實(shí)現(xiàn)CancellationToken

同步 vs 異步編程模式

場(chǎng)景同步(Thread/Lock)異步(async/await)
I/O操作阻塞線程不阻塞,高效
CPU密集適合配合Task.Run
可讀性復(fù)雜簡(jiǎn)潔
調(diào)試困難相對(duì)簡(jiǎn)單
性能線程開銷大資源利用率高

使用場(chǎng)景建議:

使用async/await的場(chǎng)景:

  • Web API調(diào)用
  • 數(shù)據(jù)庫(kù)操作
  • 文件I/O
  • 網(wǎng)絡(luò)通信
  • UI響應(yīng)性要求高的應(yīng)用

使用Parallel/PLINQ的場(chǎng)景:

  • 大數(shù)據(jù)集處理
  • CPU密集型計(jì)算
  • 批量數(shù)據(jù)轉(zhuǎn)換
  • 并行算法實(shí)現(xiàn)

使用傳統(tǒng)Thread的場(chǎng)景:

  • 需要精確控制線程屬性
  • 長(zhǎng)時(shí)間運(yùn)行的后臺(tái)任務(wù)
  • 與舊代碼庫(kù)集成
  • 特殊的線程優(yōu)先級(jí)需求

性能優(yōu)化建議:

  1. 優(yōu)先使用async/await:避免阻塞線程,提高資源利用率
  2. 使用ConfigureAwait(false):在庫(kù)代碼中避免捕獲上下文
  3. 控制并發(fā)度:使用SemaphoreSlim或ParallelOptions限制并發(fā)
  4. 選擇合適的數(shù)據(jù)結(jié)構(gòu):使用Concurrent集合處理并發(fā)訪問(wèn)
  5. 避免過(guò)度并行化:評(píng)估并行化的收益是否大于開銷

這份完整的指南現(xiàn)在涵蓋了.NET Framework 4.8中從基礎(chǔ)Thread到現(xiàn)代async/await的全部并發(fā)編程技術(shù)

一、線程基礎(chǔ)概念

在.NET Framework 4.8中,多線程編程主要涉及以下幾個(gè)核心類:

  • Thread - 線程的基本類
  • ThreadPool - 線程池
  • Task - 任務(wù)并行庫(kù)(TPL)
  • 各種同步原語(yǔ)(lock、Monitor、Mutex、Semaphore等)

二、線程的生命周期控制

1. 線程的創(chuàng)建與啟動(dòng)

using System;
using System.Threading;
public class ThreadLifecycleExample
{
    // 線程工作方法
    private static void WorkerMethod(object state)
    {
        string threadName = (string)state;
        Console.WriteLine($"[{threadName}] 線程開始執(zhí)行,線程ID: {Thread.CurrentThread.ManagedThreadId}");
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine($"[{threadName}] 工作中... {i}");
            Thread.Sleep(1000);
        }
        Console.WriteLine($"[{threadName}] 線程執(zhí)行完成");
    }
    public static void BasicThreadExample()
    {
        // 創(chuàng)建線程的幾種方式
        // 方式1:使用ThreadStart委托
        Thread thread1 = new Thread(new ThreadStart(() => WorkerMethod("Thread1")));
        // 方式2:使用ParameterizedThreadStart委托
        Thread thread2 = new Thread(new ParameterizedThreadStart(WorkerMethod));
        // 方式3:直接傳遞方法
        Thread thread3 = new Thread(() => WorkerMethod("Thread3"));
        // 設(shè)置線程屬性
        thread1.Name = "Worker-1";
        thread1.IsBackground = false; // 前臺(tái)線程
        thread2.Name = "Worker-2";
        thread2.IsBackground = true;  // 后臺(tái)線程
        // 啟動(dòng)線程
        thread1.Start();
        thread2.Start("Thread2");
        thread3.Start();
        // 等待線程完成
        thread1.Join();
        thread2.Join();
        thread3.Join();
    }
}

2. 線程的暫停、繼續(xù)與停止

using System;
using System.Threading;
public class ThreadControlExample
{
    private static ManualResetEvent pauseEvent = new ManualResetEvent(true);
    private static ManualResetEvent shutdownEvent = new ManualResetEvent(false);
    private static CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    // 可控制的工作線程
    private static void ControllableWorker(object state)
    {
        string workerName = (string)state;
        CancellationToken token = cancellationTokenSource.Token;
        Console.WriteLine($"[{workerName}] 線程啟動(dòng)");
        try
        {
            while (!token.IsCancellationRequested)
            {
                // 等待暫停信號(hào)
                pauseEvent.WaitOne();
                // 檢查是否需要退出
                if (token.IsCancellationRequested)
                    break;
                // 執(zhí)行工作
                Console.WriteLine($"[{workerName}] 正在工作... 時(shí)間: {DateTime.Now:HH:mm:ss}");
                // 模擬工作負(fù)載
                Thread.Sleep(1000);
                // 檢查停止信號(hào)
                if (WaitHandle.WaitAny(new WaitHandle[] { shutdownEvent }, 0) == 0)
                {
                    Console.WriteLine($"[{workerName}] 收到停止信號(hào)");
                    break;
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"[{workerName}] 操作被取消");
        }
        finally
        {
            Console.WriteLine($"[{workerName}] 線程退出");
        }
    }
    public static void ThreadControlDemo()
    {
        Thread workerThread = new Thread(ControllableWorker)
        {
            Name = "ControlledWorker",
            IsBackground = false
        };
        // 啟動(dòng)線程
        workerThread.Start("Worker");
        // 讓線程運(yùn)行3秒
        Thread.Sleep(3000);
        // 暫停線程
        Console.WriteLine("\n[主線程] 暫停工作線程...");
        pauseEvent.Reset();
        Thread.Sleep(2000);
        // 繼續(xù)線程
        Console.WriteLine("[主線程] 恢復(fù)工作線程...");
        pauseEvent.Set();
        Thread.Sleep(3000);
        // 停止線程(推薦方式:使用CancellationToken)
        Console.WriteLine("\n[主線程] 停止工作線程...");
        cancellationTokenSource.Cancel();
        // 等待線程結(jié)束
        workerThread.Join(5000);
        if (workerThread.IsAlive)
        {
            Console.WriteLine("[主線程] 強(qiáng)制終止線程");
            workerThread.Abort(); // 注意:不推薦使用,僅作為最后手段
        }
    }
}

三、線程間通信機(jī)制

1. 共享內(nèi)存通信

using System;
using System.Threading;
using System.Collections.Concurrent;
public class SharedMemoryCommunication
{
    // 共享數(shù)據(jù)結(jié)構(gòu)
    private static readonly ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();
    private static readonly object lockObject = new object();
    private static int sharedCounter = 0;
    // 生產(chǎn)者線程
    private static void Producer(object state)
    {
        string producerName = (string)state;
        Random random = new Random();
        for (int i = 0; i < 10; i++)
        {
            string message = $"{producerName}-Message-{i}";
            messageQueue.Enqueue(message);
            // 使用lock保護(hù)共享變量
            lock (lockObject)
            {
                sharedCounter++;
                Console.WriteLine($"[生產(chǎn)者 {producerName}] 發(fā)送: {message}, 總計(jì)數(shù): {sharedCounter}");
            }
            Thread.Sleep(random.Next(100, 500));
        }
    }
    // 消費(fèi)者線程
    private static void Consumer(object state)
    {
        string consumerName = (string)state;
        while (true)
        {
            if (messageQueue.TryDequeue(out string message))
            {
                Console.WriteLine($"[消費(fèi)者 {consumerName}] 接收: {message}");
                Thread.Sleep(200);
            }
            else
            {
                // 隊(duì)列為空時(shí)等待
                Thread.Sleep(100);
                // 檢查是否所有生產(chǎn)者都完成了
                lock (lockObject)
                {
                    if (sharedCounter >= 20 && messageQueue.IsEmpty)
                        break;
                }
            }
        }
        Console.WriteLine($"[消費(fèi)者 {consumerName}] 退出");
    }
    public static void RunSharedMemoryExample()
    {
        Thread producer1 = new Thread(Producer) { Name = "Producer1" };
        Thread producer2 = new Thread(Producer) { Name = "Producer2" };
        Thread consumer1 = new Thread(Consumer) { Name = "Consumer1" };
        Thread consumer2 = new Thread(Consumer) { Name = "Consumer2" };
        producer1.Start("P1");
        producer2.Start("P2");
        consumer1.Start("C1");
        consumer2.Start("C2");
        producer1.Join();
        producer2.Join();
        consumer1.Join();
        consumer2.Join();
    }
}

2. 事件通信機(jī)制

using System;
using System.Threading;
public class EventCommunication
{
    // 自動(dòng)重置事件
    private static AutoResetEvent autoEvent = new AutoResetEvent(false);
    // 手動(dòng)重置事件
    private static ManualResetEvent manualEvent = new ManualResetEvent(false);
    // 倒計(jì)時(shí)事件
    private static CountdownEvent countdownEvent = new CountdownEvent(3);
    // 等待事件的工作線程
    private static void WaitingWorker(object state)
    {
        string workerName = (string)state;
        Console.WriteLine($"[{workerName}] 等待事件信號(hào)...");
        // 等待自動(dòng)重置事件
        autoEvent.WaitOne();
        Console.WriteLine($"[{workerName}] 收到自動(dòng)重置事件信號(hào)");
        // 等待手動(dòng)重置事件
        manualEvent.WaitOne();
        Console.WriteLine($"[{workerName}] 收到手動(dòng)重置事件信號(hào)");
        // 通知倒計(jì)時(shí)事件
        countdownEvent.Signal();
        Console.WriteLine($"[{workerName}] 完成工作");
    }
    public static void RunEventExample()
    {
        Thread[] workers = new Thread[3];
        for (int i = 0; i < 3; i++)
        {
            workers[i] = new Thread(WaitingWorker);
            workers[i].Start($"Worker{i + 1}");
        }
        Thread.Sleep(1000);
        // 觸發(fā)自動(dòng)重置事件(只有一個(gè)線程會(huì)收到信號(hào))
        Console.WriteLine("\n[主線程] 觸發(fā)自動(dòng)重置事件");
        autoEvent.Set();
        Thread.Sleep(500);
        autoEvent.Set();
        Thread.Sleep(500);
        autoEvent.Set();
        Thread.Sleep(1000);
        // 觸發(fā)手動(dòng)重置事件(所有等待的線程都會(huì)收到信號(hào))
        Console.WriteLine("\n[主線程] 觸發(fā)手動(dòng)重置事件");
        manualEvent.Set();
        // 等待所有線程完成
        countdownEvent.Wait();
        Console.WriteLine("\n[主線程] 所有工作線程已完成");
    }
}

3. 信號(hào)量通信

using System;
using System.Threading;
public class SemaphoreCommunication
{
    // 信號(hào)量:最多允許3個(gè)線程同時(shí)訪問(wèn)
    private static Semaphore semaphore = new Semaphore(3, 3);
    // 互斥量
    private static Mutex mutex = new Mutex();
    // 使用信號(hào)量控制的工作方法
    private static void SemaphoreWorker(object state)
    {
        string workerName = (string)state;
        Console.WriteLine($"[{workerName}] 等待進(jìn)入臨界區(qū)...");
        semaphore.WaitOne();
        try
        {
            Console.WriteLine($"[{workerName}] 進(jìn)入臨界區(qū),開始工作");
            Thread.Sleep(2000); // 模擬工作
            Console.WriteLine($"[{workerName}] 完成工作");
        }
        finally
        {
            Console.WriteLine($"[{workerName}] 離開臨界區(qū)");
            semaphore.Release();
        }
    }
    // 使用互斥量的工作方法
    private static void MutexWorker(object state)
    {
        string workerName = (string)state;
        Console.WriteLine($"[{workerName}] 嘗試獲取互斥量...");
        mutex.WaitOne();
        try
        {
            Console.WriteLine($"[{workerName}] 獲得互斥量,獨(dú)占訪問(wèn)資源");
            Thread.Sleep(1000);
            Console.WriteLine($"[{workerName}] 完成獨(dú)占訪問(wèn)");
        }
        finally
        {
            mutex.ReleaseMutex();
            Console.WriteLine($"[{workerName}] 釋放互斥量");
        }
    }
    public static void RunSemaphoreExample()
    {
        Console.WriteLine("=== 信號(hào)量示例 ===");
        Thread[] semaphoreThreads = new Thread[6];
        for (int i = 0; i < 6; i++)
        {
            semaphoreThreads[i] = new Thread(SemaphoreWorker);
            semaphoreThreads[i].Start($"SemWorker{i + 1}");
        }
        foreach (var thread in semaphoreThreads)
            thread.Join();
        Console.WriteLine("\n=== 互斥量示例 ===");
        Thread[] mutexThreads = new Thread[3];
        for (int i = 0; i < 3; i++)
        {
            mutexThreads[i] = new Thread(MutexWorker);
            mutexThreads[i].Start($"MutexWorker{i + 1}");
        }
        foreach (var thread in mutexThreads)
            thread.Join();
    }
}

四、線程保護(hù)與同步

1. Lock和Monitor

using System;
using System.Threading;
public class ThreadSynchronization
{
    private static readonly object lockObject = new object();
    private static int balance = 1000;
    // 使用lock語(yǔ)句
    private static void TransferWithLock(int amount, string from, string to)
    {
        lock (lockObject)
        {
            Console.WriteLine($"[{Thread.CurrentThread.Name}] 轉(zhuǎn)賬開始: {from} -> {to}, 金額: {amount}");
            if (balance >= amount)
            {
                balance -= amount;
                Thread.Sleep(100); // 模擬處理時(shí)間
                Console.WriteLine($"[{Thread.CurrentThread.Name}] 轉(zhuǎn)賬成功,余額: {balance}");
            }
            else
            {
                Console.WriteLine($"[{Thread.CurrentThread.Name}] 余額不足,轉(zhuǎn)賬失敗");
            }
        }
    }
    // 使用Monitor類(更靈活的控制)
    private static void TransferWithMonitor(int amount, string from, string to)
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(lockObject, TimeSpan.FromSeconds(1), ref lockTaken);
            if (lockTaken)
            {
                Console.WriteLine($"[{Thread.CurrentThread.Name}] 獲得鎖,執(zhí)行轉(zhuǎn)賬");
                if (balance >= amount)
                {
                    balance -= amount;
                    Thread.Sleep(100);
                    Console.WriteLine($"[{Thread.CurrentThread.Name}] 轉(zhuǎn)賬成功,余額: {balance}");
                    // 通知等待的線程
                    Monitor.Pulse(lockObject);
                }
                else
                {
                    Console.WriteLine($"[{Thread.CurrentThread.Name}] 余額不足,等待...");
                    Monitor.Wait(lockObject, 1000); // 等待最多1秒
                }
            }
            else
            {
                Console.WriteLine($"[{Thread.CurrentThread.Name}] 無(wú)法獲得鎖");
            }
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(lockObject);
            }
        }
    }
    public static void RunSynchronizationExample()
    {
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++)
        {
            int amount = (i + 1) * 100;
            threads[i] = new Thread(() => TransferWithLock(amount, "AccountA", "AccountB"))
            {
                Name = $"Transfer-Thread-{i + 1}"
            };
        }
        foreach (var thread in threads)
            thread.Start();
        foreach (var thread in threads)
            thread.Join();
        Console.WriteLine($"\n最終余額: {balance}");
    }
}

2. ReaderWriterLock(讀寫鎖)

using System;
using System.Threading;
using System.Collections.Generic;
public class ReaderWriterExample
{
    private static ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private static Dictionary<string, int> cache = new Dictionary<string, int>();
    private static Random random = new Random();
    // 讀操作
    private static void ReadData(object state)
    {
        string readerName = (string)state;
        for (int i = 0; i < 5; i++)
        {
            rwLock.EnterReadLock();
            try
            {
                Console.WriteLine($"[{readerName}] 讀取數(shù)據(jù)開始");
                foreach (var item in cache)
                {
                    Console.WriteLine($"[{readerName}] 讀取: {item.Key} = {item.Value}");
                }
                Thread.Sleep(100);
            }
            finally
            {
                Console.WriteLine($"[{readerName}] 讀取完成");
                rwLock.ExitReadLock();
            }
            Thread.Sleep(random.Next(100, 300));
        }
    }
    // 寫操作
    private static void WriteData(object state)
    {
        string writerName = (string)state;
        for (int i = 0; i < 3; i++)
        {
            rwLock.EnterWriteLock();
            try
            {
                string key = $"Data-{random.Next(100)}";
                int value = random.Next(1000);
                Console.WriteLine($"[{writerName}] 寫入數(shù)據(jù): {key} = {value}");
                cache[key] = value;
                Thread.Sleep(200);
            }
            finally
            {
                Console.WriteLine($"[{writerName}] 寫入完成");
                rwLock.ExitWriteLock();
            }
            Thread.Sleep(random.Next(200, 500));
        }
    }
    // 可升級(jí)讀鎖示例
    private static void UpgradeableReadExample()
    {
        rwLock.EnterUpgradeableReadLock();
        try
        {
            Console.WriteLine("[UpgradeableReader] 進(jìn)入可升級(jí)讀鎖");
            // 先讀取
            foreach (var item in cache)
            {
                Console.WriteLine($"[UpgradeableReader] 讀取: {item.Key} = {item.Value}");
            }
            // 根據(jù)條件升級(jí)為寫鎖
            if (cache.Count < 5)
            {
                rwLock.EnterWriteLock();
                try
                {
                    Console.WriteLine("[UpgradeableReader] 升級(jí)為寫鎖");
                    cache[$"Upgraded-{DateTime.Now.Ticks}"] = random.Next(1000);
                }
                finally
                {
                    rwLock.ExitWriteLock();
                }
            }
        }
        finally
        {
            rwLock.ExitUpgradeableReadLock();
        }
    }
    public static void RunReaderWriterExample()
    {
        // 初始化一些數(shù)據(jù)
        cache["Initial1"] = 100;
        cache["Initial2"] = 200;
        Thread reader1 = new Thread(ReadData) { Name = "Reader1" };
        Thread reader2 = new Thread(ReadData) { Name = "Reader2" };
        Thread writer1 = new Thread(WriteData) { Name = "Writer1" };
        Thread writer2 = new Thread(WriteData) { Name = "Writer2" };
        Thread upgrader = new Thread(UpgradeableReadExample) { Name = "Upgrader" };
        reader1.Start("Reader1");
        reader2.Start("Reader2");
        writer1.Start("Writer1");
        writer2.Start("Writer2");
        Thread.Sleep(500);
        upgrader.Start();
        reader1.Join();
        reader2.Join();
        writer1.Join();
        writer2.Join();
        upgrader.Join();
        Console.WriteLine("\n最終緩存內(nèi)容:");
        foreach (var item in cache)
        {
            Console.WriteLine($"{item.Key} = {item.Value}");
        }
    }
}

3. 線程安全集合

using System;
using System.Threading;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class ThreadSafeCollections
{
    // 線程安全的集合
    private static ConcurrentDictionary<string, int> concurrentDict = new ConcurrentDictionary<string, int>();
    private static ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>();
    private static ConcurrentStack<int> concurrentStack = new ConcurrentStack<int>();
    private static ConcurrentBag<string> concurrentBag = new ConcurrentBag<string>();
    private static BlockingCollection<string> blockingCollection = new BlockingCollection<string>(10);
    // 使用BlockingCollection的生產(chǎn)者-消費(fèi)者模式
    private static void BlockingProducer()
    {
        for (int i = 0; i < 20; i++)
        {
            string item = $"Item-{i}";
            if (blockingCollection.TryAdd(item, TimeSpan.FromSeconds(1)))
            {
                Console.WriteLine($"[生產(chǎn)者] 添加: {item}");
            }
            else
            {
                Console.WriteLine($"[生產(chǎn)者] 添加失?。ǔ瑫r(shí)): {item}");
            }
            Thread.Sleep(100);
        }
        blockingCollection.CompleteAdding();
        Console.WriteLine("[生產(chǎn)者] 完成添加");
    }
    private static void BlockingConsumer(object state)
    {
        string consumerName = (string)state;
        try
        {
            foreach (var item in blockingCollection.GetConsumingEnumerable())
            {
                Console.WriteLine($"[{consumerName}] 消費(fèi): {item}");
                Thread.Sleep(200);
            }
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine($"[{consumerName}] 集合已完成");
        }
        Console.WriteLine($"[{consumerName}] 退出");
    }
    // 并發(fā)字典操作示例
    private static void ConcurrentDictionaryExample()
    {
        // 添加或更新
        concurrentDict.AddOrUpdate("key1", 1, (key, oldValue) => oldValue + 1);
        // 獲取或添加
        int value = concurrentDict.GetOrAdd("key2", 2);
        // 嘗試獲取
        if (concurrentDict.TryGetValue("key1", out int result))
        {
            Console.WriteLine($"獲取值: key1 = {result}");
        }
        // 嘗試移除
        if (concurrentDict.TryRemove("key1", out int removed))
        {
            Console.WriteLine($"移除值: key1 = {removed}");
        }
    }
    public static void RunThreadSafeCollectionExample()
    {
        Console.WriteLine("=== BlockingCollection 示例 ===");
        Thread producer = new Thread(BlockingProducer);
        Thread consumer1 = new Thread(BlockingConsumer);
        Thread consumer2 = new Thread(BlockingConsumer);
        producer.Start();
        consumer1.Start("Consumer1");
        consumer2.Start("Consumer2");
        producer.Join();
        consumer1.Join();
        consumer2.Join();
        Console.WriteLine("\n=== ConcurrentDictionary 示例 ===");
        ConcurrentDictionaryExample();
    }
}

五、高級(jí)線程同步:Barrier和SpinLock

using System;
using System.Threading;
using System.Threading.Tasks;
public class AdvancedSynchronization
{
    // Barrier:同步多個(gè)線程的執(zhí)行階段
    private static Barrier barrier = new Barrier(3, (b) =>
    {
        Console.WriteLine($"\n[Barrier] 階段 {b.CurrentPhaseNumber} 完成,所有線程已同步\n");
    });
    // SpinLock:自旋鎖(適用于短時(shí)間的鎖定)
    private static SpinLock spinLock = new SpinLock();
    private static int spinLockCounter = 0;
    // 使用Barrier的多階段任務(wù)
    private static void BarrierTask(object state)
    {
        string taskName = (string)state;
        for (int phase = 0; phase < 3; phase++)
        {
            Console.WriteLine($"[{taskName}] 階段 {phase} 開始工作");
            Thread.Sleep(new Random().Next(1000, 3000));
            Console.WriteLine($"[{taskName}] 階段 {phase} 工作完成,等待其他線程...");
            // 等待所有線程完成當(dāng)前階段
            barrier.SignalAndWait();
        }
        Console.WriteLine($"[{taskName}] 所有階段完成");
    }
    // 使用SpinLock的快速操作
    private static void SpinLockTask(object state)
    {
        string taskName = (string)state;
        bool lockTaken = false;
        for (int i = 0; i < 1000; i++)
        {
            try
            {
                spinLock.Enter(ref lockTaken);
                // 快速的臨界區(qū)操作
                spinLockCounter++;
                if (i % 100 == 0)
                {
                    Console.WriteLine($"[{taskName}] 計(jì)數(shù)器: {spinLockCounter}");
                }
            }
            finally
            {
                if (lockTaken)
                {
                    spinLock.Exit();
                    lockTaken = false;
                }
            }
            // 模擬其他工作
            if (i % 10 == 0)
                Thread.Yield();
        }
    }
    public static void RunAdvancedSynchronizationExample()
    {
        Console.WriteLine("=== Barrier 示例 ===");
        Thread[] barrierThreads = new Thread[3];
        for (int i = 0; i < 3; i++)
        {
            barrierThreads[i] = new Thread(BarrierTask);
            barrierThreads[i].Start($"Task{i + 1}");
        }
        foreach (var thread in barrierThreads)
            thread.Join();
        Console.WriteLine("\n=== SpinLock 示例 ===");
        Thread[] spinThreads = new Thread[4];
        for (int i = 0; i < 4; i++)
        {
            spinThreads[i] = new Thread(SpinLockTask);
            spinThreads[i].Start($"SpinTask{i + 1}");
        }
        foreach (var thread in spinThreads)
            thread.Join();
        Console.WriteLine($"\n最終計(jì)數(shù)器值: {spinLockCounter}");
    }
}

六、完整示例程序

using System;
using System.Threading;
public class Program
{
    public static void Main(string[] args)
    {
        while (true)
        {
            Console.Clear();
            Console.WriteLine("===== .NET Framework 4.8 多線程編程示例 =====");
            Console.WriteLine("1. 基礎(chǔ)線程操作");
            Console.WriteLine("2. 線程控制(暫停/繼續(xù)/停止)");
            Console.WriteLine("3. 共享內(nèi)存通信");
            Console.WriteLine("4. 事件通信機(jī)制");
            Console.WriteLine("5. 信號(hào)量和互斥量");
            Console.WriteLine("6. 線程同步(Lock/Monitor)");
            Console.WriteLine("7. 讀寫鎖示例");
            Console.WriteLine("8. 線程安全集合");
            Console.WriteLine("9. 高級(jí)同步(Barrier/SpinLock)");
            Console.WriteLine("0. 退出");
            Console.Write("\n請(qǐng)選擇示例 (0-9): ");
            string choice = Console.ReadLine();
            Console.Clear();
            switch (choice)
            {
                case "1":
                    ThreadLifecycleExample.BasicThreadExample();
                    break;
                case "2":
                    ThreadControlExample.ThreadControlDemo();
                    break;
                case "3":
                    SharedMemoryCommunication.RunSharedMemoryExample();
                    break;
                case "4":
                    EventCommunication.RunEventExample();
                    break;
                case "5":
                    SemaphoreCommunication.RunSemaphoreExample();
                    break;
                case "6":
                    ThreadSynchronization.RunSynchronizationExample();
                    break;
                case "7":
                    ReaderWriterExample.RunReaderWriterExample();
                    break;
                case "8":
                    ThreadSafeCollections.RunThreadSafeCollectionExample();
                    break;
                case "9":
                    AdvancedSynchronization.RunAdvancedSynchronizationExample();
                    break;
                case "0":
                    return;
                default:
                    Console.WriteLine("無(wú)效選擇");
                    break;
            }
            Console.WriteLine("\n按任意鍵繼續(xù)...");
            Console.ReadKey();
        }
    }
}

七、最佳實(shí)踐與注意事項(xiàng)

1. 線程安全最佳實(shí)踐

  • 優(yōu)先使用高級(jí)同步原語(yǔ):Task、async/await比直接使用Thread更安全
  • 避免死鎖:始終以相同的順序獲取多個(gè)鎖
  • 最小化鎖的范圍:只在必要的代碼段使用鎖
  • 使用不可變對(duì)象:減少同步需求
  • 優(yōu)先使用線程安全集合:ConcurrentCollection系列

2. 性能優(yōu)化建議

  • 使用線程池:避免頻繁創(chuàng)建銷毀線程
  • 合理設(shè)置線程數(shù)量:通常為CPU核心數(shù)的1-2倍
  • 避免過(guò)度同步:評(píng)估是否真的需要線程保護(hù)
  • 使用SpinLock處理短時(shí)間鎖定:減少上下文切換
  • 使用ReaderWriterLock處理讀多寫少場(chǎng)景

3. 避免的常見錯(cuò)誤

  • 不要使用Thread.Abort():可能導(dǎo)致資源泄漏
  • 避免使用Thread.Suspend/Resume:已過(guò)時(shí)且不安全
  • 不要忽略線程異常:使用try-catch保護(hù)線程代碼
  • 避免在鎖內(nèi)調(diào)用未知代碼:可能導(dǎo)致死鎖
  • 不要在構(gòu)造函數(shù)中啟動(dòng)線程:對(duì)象可能未完全初始化

4. 調(diào)試技巧

// 使用線程名稱便于調(diào)試
Thread.CurrentThread.Name = "WorkerThread";
// 使用Trace輸出線程信息
System.Diagnostics.Trace.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: Starting work");
// 使用條件斷點(diǎn)調(diào)試特定線程
if (Thread.CurrentThread.Name == "SpecificThread")
{
    System.Diagnostics.Debugger.Break();
}

八、異步編程(Async/Await)

1. Task基礎(chǔ)

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;
using System.IO;
public class TaskBasics
{
    // 創(chuàng)建和運(yùn)行Task的不同方式
    public static void TaskCreationExamples()
    {
        Console.WriteLine("=== Task創(chuàng)建示例 ===");
        // 方式1:使用Task.Run
        Task task1 = Task.Run(() =>
        {
            Console.WriteLine($"Task1 運(yùn)行在線程 {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(1000);
            Console.WriteLine("Task1 完成");
        });
        // 方式2:使用Task.Factory.StartNew(更多控制選項(xiàng))
        Task task2 = Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"Task2 運(yùn)行在線程 {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(500);
        }, CancellationToken.None, 
           TaskCreationOptions.LongRunning, 
           TaskScheduler.Default);
        // 方式3:使用new Task(需要手動(dòng)啟動(dòng))
        Task task3 = new Task(() =>
        {
            Console.WriteLine($"Task3 運(yùn)行在線程 {Thread.CurrentThread.ManagedThreadId}");
        });
        task3.Start();
        // 方式4:返回結(jié)果的Task
        Task<int> task4 = Task.Run(() =>
        {
            Console.WriteLine("Task4 計(jì)算中...");
            Thread.Sleep(1000);
            return 42;
        });
        // 等待所有任務(wù)完成
        Task.WaitAll(task1, task2, task3, task4);
        Console.WriteLine($"Task4 結(jié)果: {task4.Result}");
    }
    // Task的連續(xù)任務(wù)(ContinueWith)
    public static void TaskContinuationExample()
    {
        Console.WriteLine("\n=== Task連續(xù)任務(wù)示例 ===");
        Task<int> firstTask = Task.Run(() =>
        {
            Console.WriteLine("第一個(gè)任務(wù)執(zhí)行中...");
            Thread.Sleep(1000);
            return 10;
        });
        // 單個(gè)連續(xù)任務(wù)
        Task<int> continuationTask = firstTask.ContinueWith(antecedent =>
        {
            Console.WriteLine($"第一個(gè)任務(wù)結(jié)果: {antecedent.Result}");
            return antecedent.Result * 2;
        });
        // 多個(gè)連續(xù)任務(wù)鏈
        continuationTask
            .ContinueWith(t => 
            {
                Console.WriteLine($"第二個(gè)連續(xù)任務(wù),結(jié)果: {t.Result}");
                return t.Result + 5;
            })
            .ContinueWith(t => 
            {
                Console.WriteLine($"最終結(jié)果: {t.Result}");
            });
        // 條件連續(xù)任務(wù)
        Task faultedTask = Task.Run(() => 
        {
            throw new Exception("任務(wù)失敗");
        });
        faultedTask.ContinueWith(t =>
        {
            Console.WriteLine($"任務(wù)失敗: {t.Exception?.GetBaseException().Message}");
        }, TaskContinuationOptions.OnlyOnFaulted);
        faultedTask.ContinueWith(t =>
        {
            Console.WriteLine("任務(wù)成功完成");
        }, TaskContinuationOptions.OnlyOnRanToCompletion);
        Thread.Sleep(3000);
    }
    // Task異常處理
    public static void TaskExceptionHandling()
    {
        Console.WriteLine("\n=== Task異常處理示例 ===");
        // 方式1:使用Wait或Result捕獲異常
        Task taskWithError = Task.Run(() =>
        {
            throw new InvalidOperationException("任務(wù)中的異常");
        });
        try
        {
            taskWithError.Wait();
        }
        catch (AggregateException ae)
        {
            foreach (var ex in ae.InnerExceptions)
            {
                Console.WriteLine($"捕獲異常: {ex.Message}");
            }
        }
        // 方式2:使用ContinueWith處理異常
        Task.Run(() =>
        {
            throw new Exception("另一個(gè)異常");
        }).ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                Console.WriteLine($"任務(wù)失敗: {t.Exception?.Flatten().InnerException?.Message}");
            }
        });
        Thread.Sleep(1000);
    }
}

2. Async/Await模式

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Diagnostics;
public class AsyncAwaitPatterns
{
    // 基礎(chǔ)async/await使用
    public static async Task BasicAsyncExample()
    {
        Console.WriteLine("=== 基礎(chǔ)Async/Await示例 ===");
        Console.WriteLine($"開始方法,線程: {Thread.CurrentThread.ManagedThreadId}");
        // 異步延遲
        await Task.Delay(1000);
        Console.WriteLine($"延遲后,線程: {Thread.CurrentThread.ManagedThreadId}");
        // 異步運(yùn)行CPU密集型任務(wù)
        int result = await Task.Run(() =>
        {
            Console.WriteLine($"Task.Run內(nèi)部,線程: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(500);
            return 42;
        });
        Console.WriteLine($"結(jié)果: {result}");
    }
    // 異步方法返回值
    public static async Task<string> GetDataAsync(string input)
    {
        Console.WriteLine($"獲取數(shù)據(jù): {input}");
        await Task.Delay(1000);
        return $"處理結(jié)果: {input.ToUpper()}";
    }
    // 并發(fā)執(zhí)行多個(gè)異步操作
    public static async Task ConcurrentAsyncOperations()
    {
        Console.WriteLine("\n=== 并發(fā)異步操作示例 ===");
        Stopwatch sw = Stopwatch.StartNew();
        // 順序執(zhí)行(較慢)
        string result1 = await GetDataAsync("first");
        string result2 = await GetDataAsync("second");
        string result3 = await GetDataAsync("third");
        sw.Stop();
        Console.WriteLine($"順序執(zhí)行耗時(shí): {sw.ElapsedMilliseconds}ms");
        sw.Restart();
        // 并發(fā)執(zhí)行(較快)
        Task<string> task1 = GetDataAsync("first");
        Task<string> task2 = GetDataAsync("second");
        Task<string> task3 = GetDataAsync("third");
        string[] results = await Task.WhenAll(task1, task2, task3);
        sw.Stop();
        Console.WriteLine($"并發(fā)執(zhí)行耗時(shí): {sw.ElapsedMilliseconds}ms");
        Console.WriteLine($"結(jié)果: {string.Join(", ", results)}");
    }
    // 異步流處理(使用IAsyncEnumerable需要.NET Core 3.0+,這里使用Task<IEnumerable>模擬)
    public static async Task<IEnumerable<int>> GetNumbersAsync()
    {
        List<int> numbers = new List<int>();
        for (int i = 0; i < 5; i++)
        {
            await Task.Delay(200);
            numbers.Add(i);
            Console.WriteLine($"生成數(shù)字: {i}");
        }
        return numbers;
    }
    // 異步異常處理
    public static async Task AsyncExceptionHandling()
    {
        Console.WriteLine("\n=== 異步異常處理示例 ===");
        try
        {
            await ThrowExceptionAsync();
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine($"捕獲異步異常: {ex.Message}");
        }
        // 處理多個(gè)任務(wù)的異常
        Task failedTask1 = ThrowExceptionAsync();
        Task failedTask2 = ThrowExceptionAsync();
        Task successTask = Task.Delay(100);
        try
        {
            await Task.WhenAll(failedTask1, failedTask2, successTask);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"WhenAll異常: {ex.Message}");
            // 獲取所有異常
            if (failedTask1.IsFaulted)
                Console.WriteLine($"Task1異常: {failedTask1.Exception?.GetBaseException().Message}");
            if (failedTask2.IsFaulted)
                Console.WriteLine($"Task2異常: {failedTask2.Exception?.GetBaseException().Message}");
        }
    }
    private static async Task ThrowExceptionAsync()
    {
        await Task.Delay(100);
        throw new InvalidOperationException("異步方法中的異常");
    }
    // 取消異步操作
    public static async Task CancellationExample()
    {
        Console.WriteLine("\n=== 異步取消示例 ===");
        CancellationTokenSource cts = new CancellationTokenSource();
        // 設(shè)置超時(shí)
        cts.CancelAfter(2000);
        try
        {
            await LongRunningOperationAsync(cts.Token);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作被取消");
        }
        // 手動(dòng)取消
        CancellationTokenSource manualCts = new CancellationTokenSource();
        Task longTask = LongRunningOperationAsync(manualCts.Token);
        await Task.Delay(500);
        Console.WriteLine("手動(dòng)取消操作...");
        manualCts.Cancel();
        try
        {
            await longTask;
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作被手動(dòng)取消");
        }
    }
    private static async Task LongRunningOperationAsync(CancellationToken cancellationToken)
    {
        for (int i = 0; i < 10; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            Console.WriteLine($"工作進(jìn)度: {i + 1}/10");
            await Task.Delay(500, cancellationToken);
        }
        Console.WriteLine("操作完成");
    }
}

3. 高級(jí)異步模式

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Collections.Generic;
using System.Linq;
public class AdvancedAsyncPatterns
{
    // 使用TaskCompletionSource創(chuàng)建自定義異步操作
    public static Task<int> CreateCustomAsyncOperation()
    {
        TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
        // 模擬異步操作
        Timer timer = null;
        timer = new Timer(_ =>
        {
            tcs.SetResult(42);
            timer?.Dispose();
        }, null, 1000, Timeout.Infinite);
        return tcs.Task;
    }
    // 異步信號(hào)量(SemaphoreSlim)
    private static SemaphoreSlim semaphore = new SemaphoreSlim(2, 2);
    public static async Task AsyncSemaphoreExample()
    {
        Console.WriteLine("=== 異步信號(hào)量示例 ===");
        List<Task> tasks = new List<Task>();
        for (int i = 0; i < 5; i++)
        {
            int taskId = i;
            tasks.Add(Task.Run(async () =>
            {
                await semaphore.WaitAsync();
                try
                {
                    Console.WriteLine($"任務(wù) {taskId} 進(jìn)入臨界區(qū)");
                    await Task.Delay(2000);
                    Console.WriteLine($"任務(wù) {taskId} 離開臨界區(qū)");
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
    // 異步鎖(AsyncLock實(shí)現(xiàn))
    public class AsyncLock
    {
        private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
        public async Task<IDisposable> LockAsync()
        {
            await semaphore.WaitAsync();
            return new LockReleaser(semaphore);
        }
        private class LockReleaser : IDisposable
        {
            private readonly SemaphoreSlim semaphore;
            public LockReleaser(SemaphoreSlim semaphore)
            {
                this.semaphore = semaphore;
            }
            public void Dispose()
            {
                semaphore.Release();
            }
        }
    }
    // 使用異步鎖
    private static AsyncLock asyncLock = new AsyncLock();
    private static int sharedResource = 0;
    public static async Task AsyncLockExample()
    {
        Console.WriteLine("\n=== 異步鎖示例 ===");
        List<Task> tasks = new List<Task>();
        for (int i = 0; i < 10; i++)
        {
            tasks.Add(Task.Run(async () =>
            {
                using (await asyncLock.LockAsync())
                {
                    int temp = sharedResource;
                    await Task.Delay(10);
                    sharedResource = temp + 1;
                    Console.WriteLine($"共享資源值: {sharedResource}");
                }
            }));
        }
        await Task.WhenAll(tasks);
        Console.WriteLine($"最終值: {sharedResource}");
    }
    // 異步生產(chǎn)者-消費(fèi)者模式
    public static async Task ProducerConsumerAsync()
    {
        Console.WriteLine("\n=== 異步生產(chǎn)者-消費(fèi)者模式 ===");
        // 使用Channel作為隊(duì)列(.NET Core 3.0+)
        // 這里使用BlockingCollection模擬
        var queue = new System.Collections.Concurrent.BlockingCollection<int>(10);
        // 生產(chǎn)者
        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < 20; i++)
            {
                queue.Add(i);
                Console.WriteLine($"生產(chǎn): {i}");
                await Task.Delay(100);
            }
            queue.CompleteAdding();
        });
        // 消費(fèi)者
        Task consumer = Task.Run(async () =>
        {
            while (!queue.IsCompleted)
            {
                if (queue.TryTake(out int item, 100))
                {
                    Console.WriteLine($"消費(fèi): {item}");
                    await Task.Delay(200);
                }
            }
        });
        await Task.WhenAll(producer, consumer);
    }
    // 批量異步操作with限流
    public static async Task ThrottledAsyncOperations()
    {
        Console.WriteLine("\n=== 限流異步操作示例 ===");
        // 要處理的數(shù)據(jù)
        List<int> data = Enumerable.Range(1, 20).ToList();
        // 最大并發(fā)數(shù)
        int maxConcurrency = 3;
        SemaphoreSlim throttler = new SemaphoreSlim(maxConcurrency);
        List<Task> tasks = new List<Task>();
        foreach (var item in data)
        {
            tasks.Add(Task.Run(async () =>
            {
                await throttler.WaitAsync();
                try
                {
                    Console.WriteLine($"開始處理: {item}");
                    await ProcessItemAsync(item);
                    Console.WriteLine($"完成處理: {item}");
                }
                finally
                {
                    throttler.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
    private static async Task ProcessItemAsync(int item)
    {
        await Task.Delay(1000);
    }
}

4. 并行編程(Parallel類和PLINQ)

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using System.Collections.Concurrent;
using System.Diagnostics;
public class ParallelProgramming
{
    // Parallel.For和Parallel.ForEach
    public static void ParallelLoops()
    {
        Console.WriteLine("=== Parallel循環(huán)示例 ===");
        // Parallel.For
        Console.WriteLine("\nParallel.For示例:");
        Parallel.For(0, 10, i =>
        {
            Console.WriteLine($"處理索引 {i}, 線程: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(100);
        });
        // Parallel.ForEach
        Console.WriteLine("\nParallel.ForEach示例:");
        List<string> items = new List<string> { "A", "B", "C", "D", "E", "F", "G", "H" };
        Parallel.ForEach(items, item =>
        {
            Console.WriteLine($"處理項(xiàng)目 {item}, 線程: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(200);
        });
        // 帶有ParallelOptions的控制
        Console.WriteLine("\n帶選項(xiàng)的Parallel.ForEach:");
        ParallelOptions options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 2,
            CancellationToken = CancellationToken.None
        };
        Parallel.ForEach(items, options, item =>
        {
            Console.WriteLine($"限制并發(fā)處理 {item}, 線程: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(500);
        });
    }
    // Parallel.Invoke
    public static void ParallelInvoke()
    {
        Console.WriteLine("\n=== Parallel.Invoke示例 ===");
        Parallel.Invoke(
            () => DoWork("任務(wù)1", 1000),
            () => DoWork("任務(wù)2", 1500),
            () => DoWork("任務(wù)3", 800),
            () => DoWork("任務(wù)4", 1200)
        );
        Console.WriteLine("所有并行任務(wù)完成");
    }
    private static void DoWork(string taskName, int delay)
    {
        Console.WriteLine($"{taskName} 開始, 線程: {Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(delay);
        Console.WriteLine($"{taskName} 完成");
    }
    // PLINQ(并行LINQ)
    public static void PLINQExamples()
    {
        Console.WriteLine("\n=== PLINQ示例 ===");
        // 生成測(cè)試數(shù)據(jù)
        List<int> numbers = Enumerable.Range(1, 100).ToList();
        // 基本PLINQ查詢
        var parallelQuery = numbers
            .AsParallel()
            .Where(n => n % 2 == 0)
            .Select(n => n * n)
            .ToList();
        Console.WriteLine($"偶數(shù)平方結(jié)果數(shù)量: {parallelQuery.Count}");
        // 控制并行度
        var controlledQuery = numbers
            .AsParallel()
            .WithDegreeOfParallelism(2)
            .Where(n => IsPrime(n))
            .ToList();
        Console.WriteLine($"質(zhì)數(shù)數(shù)量: {controlledQuery.Count}");
        // 保持順序
        var orderedQuery = numbers
            .AsParallel()
            .AsOrdered()
            .Where(n => n > 50)
            .Select(n => n * 2)
            .Take(10)
            .ToList();
        Console.WriteLine($"有序結(jié)果: {string.Join(", ", orderedQuery)}");
        // 聚合操作
        int sum = numbers
            .AsParallel()
            .Where(n => n % 3 == 0)
            .Sum();
        Console.WriteLine($"能被3整除的數(shù)之和: {sum}");
        // 性能比較
        Stopwatch sw = Stopwatch.StartNew();
        // 順序執(zhí)行
        var sequentialResult = numbers
            .Where(n => ExpensiveOperation(n))
            .ToList();
        sw.Stop();
        Console.WriteLine($"順序執(zhí)行時(shí)間: {sw.ElapsedMilliseconds}ms");
        sw.Restart();
        // 并行執(zhí)行
        var parallelResult = numbers
            .AsParallel()
            .Where(n => ExpensiveOperation(n))
            .ToList();
        sw.Stop();
        Console.WriteLine($"并行執(zhí)行時(shí)間: {sw.ElapsedMilliseconds}ms");
    }
    private static bool IsPrime(int number)
    {
        if (number <= 1) return false;
        if (number == 2) return true;
        if (number % 2 == 0) return false;
        for (int i = 3; i * i <= number; i += 2)
        {
            if (number % i == 0) return false;
        }
        return true;
    }
    private static bool ExpensiveOperation(int number)
    {
        Thread.Sleep(10);
        return number % 7 == 0;
    }
    // 并行聚合
    public static void ParallelAggregation()
    {
        Console.WriteLine("\n=== 并行聚合示例 ===");
        int[] numbers = Enumerable.Range(1, 1000000).ToArray();
        object lockObj = new object();
        double total = 0;
        // 使用Parallel.ForEach with local state
        Parallel.ForEach(
            numbers,
            () => 0.0, // 局部初始值
            (number, loop, localTotal) => // 局部計(jì)算
            {
                return localTotal + Math.Sqrt(number);
            },
            localTotal => // 合并局部結(jié)果
            {
                lock (lockObj)
                {
                    total += localTotal;
                }
            }
        );
        Console.WriteLine($"并行聚合結(jié)果: {total:F2}");
        // 使用PLINQ聚合
        double plinqTotal = numbers
            .AsParallel()
            .Select(n => Math.Sqrt(n))
            .Sum();
        Console.WriteLine($"PLINQ聚合結(jié)果: {plinqTotal:F2}");
    }
}

5. 數(shù)據(jù)流(Dataflow)和管道模式

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Collections.Generic;
public class DataflowPipeline
{
    // 簡(jiǎn)單的數(shù)據(jù)流管道
    public static async Task SimpleDataflowExample()
    {
        Console.WriteLine("=== 數(shù)據(jù)流管道示例 ===");
        // 創(chuàng)建數(shù)據(jù)流塊
        var transformBlock = new TransformBlock<int, string>(
            async n =>
            {
                await Task.Delay(100);
                Console.WriteLine($"轉(zhuǎn)換: {n} -> {n * n}");
                return $"Result: {n * n}";
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 2
            }
        );
        var actionBlock = new ActionBlock<string>(
            async s =>
            {
                await Task.Delay(50);
                Console.WriteLine($"處理: {s}");
            }
        );
        // 鏈接塊
        transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
        // 發(fā)送數(shù)據(jù)
        for (int i = 1; i <= 10; i++)
        {
            await transformBlock.SendAsync(i);
        }
        // 標(biāo)記完成
        transformBlock.Complete();
        // 等待完成
        await actionBlock.Completion;
        Console.WriteLine("數(shù)據(jù)流管道完成");
    }
    // 復(fù)雜的數(shù)據(jù)處理管道
    public static async Task ComplexPipelineExample()
    {
        Console.WriteLine("\n=== 復(fù)雜數(shù)據(jù)處理管道 ===");
        // 廣播塊(一對(duì)多)
        var broadcastBlock = new BroadcastBlock<int>(n => n);
        // 批處理塊
        var batchBlock = new BatchBlock<int>(3);
        // 轉(zhuǎn)換塊
        var multiplyBlock = new TransformBlock<int, int>(n => n * 2);
        var addBlock = new TransformBlock<int, int>(n => n + 100);
        // 合并塊
        var joinBlock = new JoinBlock<int, int>();
        // 動(dòng)作塊
        var finalBlock = new ActionBlock<Tuple<int, int>>(
            tuple => Console.WriteLine($"結(jié)果: {tuple.Item1}, {tuple.Item2}")
        );
        var batchPrintBlock = new ActionBlock<int[]>(
            batch => Console.WriteLine($"批次: [{string.Join(", ", batch)}]")
        );
        // 構(gòu)建管道
        broadcastBlock.LinkTo(multiplyBlock);
        broadcastBlock.LinkTo(addBlock);
        broadcastBlock.LinkTo(batchBlock);
        multiplyBlock.LinkTo(joinBlock.Target1);
        addBlock.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(finalBlock);
        batchBlock.LinkTo(batchPrintBlock);
        // 發(fā)送數(shù)據(jù)
        for (int i = 1; i <= 9; i++)
        {
            await broadcastBlock.SendAsync(i);
        }
        broadcastBlock.Complete();
        await Task.Delay(2000);
    }
}

6. 異步編程最佳實(shí)踐

using System;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using System.Net.Http;
public class AsyncBestPractices
{
    // 避免async void(除了事件處理器)
    // 錯(cuò)誤示例
    public async void BadAsyncMethod()
    {
        await Task.Delay(1000);
        throw new Exception("無(wú)法捕獲的異常");
    }
    // 正確示例
    public async Task GoodAsyncMethod()
    {
        await Task.Delay(1000);
        throw new Exception("可以捕獲的異常");
    }
    // 配置await
    public async Task ConfigureAwaitExample()
    {
        // 不需要捕獲上下文時(shí)使用ConfigureAwait(false)
        await Task.Delay(1000).ConfigureAwait(false);
        // 這在庫(kù)代碼中特別重要,可以提高性能
        await SomeLibraryMethodAsync().ConfigureAwait(false);
    }
    private Task SomeLibraryMethodAsync() => Task.CompletedTask;
    // 避免阻塞異步代碼
    public class AvoidBlockingExample
    {
        // 錯(cuò)誤:可能導(dǎo)致死鎖
        public void BadExample()
        {
            var result = GetDataAsync().Result; // 避免這樣做
            GetDataAsync().Wait(); // 也要避免這樣
        }
        // 正確:使用async/await
        public async Task GoodExample()
        {
            var result = await GetDataAsync();
        }
        private async Task<string> GetDataAsync()
        {
            await Task.Delay(100);
            return "data";
        }
    }
    // 使用ValueTask優(yōu)化性能
    public async ValueTask<int> GetCachedValueAsync()
    {
        // 如果有緩存值,直接返回(沒有分配)
        if (_cache.ContainsKey("key"))
            return _cache["key"];
        // 否則異步獲取
        var value = await ComputeValueAsync();
        _cache["key"] = value;
        return value;
    }
    private Dictionary<string, int> _cache = new Dictionary<string, int>();
    private async Task<int> ComputeValueAsync()
    {
        await Task.Delay(1000);
        return 42;
    }
    // 正確處理多個(gè)異步操作
    public async Task HandleMultipleOperationsCorrectly()
    {
        // 錯(cuò)誤:順序等待(慢)
        var result1 = await Operation1Async();
        var result2 = await Operation2Async();
        var result3 = await Operation3Async();
        // 正確:并發(fā)執(zhí)行(快)
        var task1 = Operation1Async();
        var task2 = Operation2Async();
        var task3 = Operation3Async();
        await Task.WhenAll(task1, task2, task3);
        var results = new[] { task1.Result, task2.Result, task3.Result };
    }
    private Task<int> Operation1Async() => Task.FromResult(1);
    private Task<int> Operation2Async() => Task.FromResult(2);
    private Task<int> Operation3Async() => Task.FromResult(3);
    // 使用IAsyncDisposable(.NET Standard 2.1+)
    public class AsyncDisposableExample : IDisposable
    {
        private readonly HttpClient httpClient = new HttpClient();
        private readonly FileStream fileStream;
        public AsyncDisposableExample(string filePath)
        {
            fileStream = new FileStream(filePath, FileMode.Create);
        }
        public async Task WriteAsync(byte[] data)
        {
            await fileStream.WriteAsync(data, 0, data.Length);
        }
        // 異步清理資源
        public async ValueTask DisposeAsync()
        {
            if (fileStream != null)
            {
                await fileStream.FlushAsync();
                fileStream.Dispose();
            }
            httpClient?.Dispose();
        }
        public void Dispose()
        {
            fileStream?.Dispose();
            httpClient?.Dispose();
        }
    }
}

7. 完整的異步編程示例程序

public class AsyncProgrammingDemo
{
    public static async Task Main(string[] args)
    {
        while (true)
        {
            Console.Clear();
            Console.WriteLine("===== .NET異步編程示例 =====");
            Console.WriteLine("1. Task基礎(chǔ)操作");
            Console.WriteLine("2. Task連續(xù)和異常處理");
            Console.WriteLine("3. 基礎(chǔ)Async/Await");
            Console.WriteLine("4. 并發(fā)異步操作");
            Console.WriteLine("5. 異步取消操作");
            Console.WriteLine("6. 高級(jí)異步模式");
            Console.WriteLine("7. Parallel循環(huán)");
            Console.WriteLine("8. PLINQ示例");
            Console.WriteLine("9. 數(shù)據(jù)流管道");
            Console.WriteLine("0. 返回主菜單");
            Console.Write("\n請(qǐng)選擇 (0-9): ");
            string choice = Console.ReadLine();
            Console.Clear();
            try
            {
                switch (choice)
                {
                    case "1":
                        TaskBasics.TaskCreationExamples();
                        break;
                    case "2":
                        TaskBasics.TaskContinuationExample();
                        TaskBasics.TaskExceptionHandling();
                        break;
                    case "3":
                        await AsyncAwaitPatterns.BasicAsyncExample();
                        break;
                    case "4":
                        await AsyncAwaitPatterns.ConcurrentAsyncOperations();
                        break;
                    case "5":
                        await AsyncAwaitPatterns.CancellationExample();
                        break;
                    case "6":
                        await AdvancedAsyncPatterns.AsyncSemaphoreExample();
                        await AdvancedAsyncPatterns.AsyncLockExample();
                        break;
                    case "7":
                        ParallelProgramming.ParallelLoops();
                        ParallelProgramming.ParallelInvoke();
                        break;
                    case "8":
                        ParallelProgramming.PLINQExamples();
                        break;
                    case "9":
                        await DataflowPipeline.SimpleDataflowExample();
                        await DataflowPipeline.ComplexPipelineExample();
                        break;
                    case "0":
                        return;
                    default:
                        Console.WriteLine("無(wú)效選擇");
                        break;
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"發(fā)生錯(cuò)誤: {ex.Message}");
            }
            Console.WriteLine("\n按任意鍵繼續(xù)...");
            Console.ReadKey();
        }
    }
}

總結(jié)

.NET Framework 4.8提供了豐富的多線程和異步編程工具:

傳統(tǒng)多線程編程

  1. 基礎(chǔ)控制:Thread類提供了線程的創(chuàng)建、啟動(dòng)、暫停、繼續(xù)和停止
  2. 線程間通信:通過(guò)共享內(nèi)存、事件、信號(hào)量等多種機(jī)制實(shí)現(xiàn)
  3. 線程保護(hù):Lock、Monitor、Mutex等確保線程安全
  4. 高級(jí)特性:Barrier、SpinLock、線程安全集合等提供更精細(xì)的控制

現(xiàn)代異步編程

  1. Task和Task:更高級(jí)的線程抽象,支持連續(xù)任務(wù)和異常處理
  2. Async/Await:簡(jiǎn)化異步編程,使代碼更易讀寫和維護(hù)
  3. 并行編程:Parallel類和PLINQ提供數(shù)據(jù)并行和任務(wù)并行
  4. 數(shù)據(jù)流:TPL Dataflow提供構(gòu)建復(fù)雜數(shù)據(jù)處理管道的能力
  5. 異步同步原語(yǔ):SemaphoreSlim、異步鎖等支持異步場(chǎng)景

選擇建議

  • 優(yōu)先使用Task和async/await:更現(xiàn)代、更安全、性能更好
  • 使用Thread類的場(chǎng)景:需要精細(xì)控制線程屬性或與舊代碼集成
  • 使用Parallel和PLINQ:處理數(shù)據(jù)并行和CPU密集型任務(wù)
  • 使用Dataflow:構(gòu)建復(fù)雜的數(shù)據(jù)處理管道

選擇合適的技術(shù)對(duì)于構(gòu)建高效、可靠的多線程應(yīng)用程序至關(guān)重要。建議從async/await開始,逐步掌握各種并發(fā)編程模式。

到此這篇關(guān)于.NET Framework 4.8 多線程編程的文章就介紹到這了,更多相關(guān).NET Framework多線程編程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論