.NET Core 中的并發(fā)編程
并發(fā)編程 - 異步 vs. 多線程代碼
并行編程是一個(gè)廣泛的術(shù)語(yǔ),我們應(yīng)該通過觀察異步方法和實(shí)際的多線程之間的差異展開探討。 盡管 .NET Core 使用了任務(wù)來(lái)表達(dá)同樣的概念,一個(gè)關(guān)鍵的差異是內(nèi)部處理的不同。 調(diào)用線程在做其他事情時(shí),異步方法在后臺(tái)運(yùn)行。這意味著這些方法是 I/O 密集型的,即他們大部分時(shí)間用于輸入和輸出操作,例如文件或網(wǎng)絡(luò)訪問。 只要有可能,使用異步 I/O 方法代替同步操作很有意義。相同的時(shí)間,調(diào)用線程可以在處理桌面應(yīng)用程序中的用戶交互或處理服務(wù)器應(yīng)用程序中的同時(shí)處理其他請(qǐng)求,而不僅僅是等待操作完成。
計(jì)算密集型的方法要求 CPU 周期工作,并且只能運(yùn)行在他們專用的后臺(tái)線程中。CPU 的核心數(shù)限制了并行運(yùn)行時(shí)的可用線程數(shù)量。操作系統(tǒng)負(fù)責(zé)在剩余的線程之間切換,使他們有機(jī)會(huì)執(zhí)行代碼。 這些方法仍然被并發(fā)地執(zhí)行,卻不必被并行地執(zhí)行。盡管這意味著方法不是同時(shí)執(zhí)行,卻可以在其他方法暫停的時(shí)候執(zhí)行。
并行 vs 并發(fā)
本文將在最后一段中重點(diǎn)介紹 在 .NET Core中多線程并發(fā)編程。
任務(wù)并行庫(kù)
.NET Framework 4 引入了任務(wù)并行庫(kù) (TPL) 作為編寫并發(fā)代碼的首選 API。.NET Core采用相同的編程模式。 要在后臺(tái)運(yùn)行一段代碼,需要將其包裝成一個(gè) 任務(wù):
var backgroundTask = Task.Run(() => DoComplexCalculation(42)); // do other work var result = backgroundTask.Result;
當(dāng)需要返回結(jié)果時(shí),Task.Run 方法接收一個(gè) 函數(shù) (Func) ;當(dāng)不需要返回結(jié)果時(shí),方法 Task.Run 接收一個(gè) 動(dòng)作 (Action) 。當(dāng)然,所有的情況下都可以使用 lambda 表達(dá)式,就像我上面例子中調(diào)用帶一個(gè)參數(shù)的長(zhǎng)時(shí)間方法。 線程池中的某個(gè)線程將會(huì)處理任務(wù)。.NET Core 的運(yùn)行時(shí)包含一個(gè)默認(rèn)調(diào)度程序,使用線程池來(lái)處理隊(duì)列并執(zhí)行任務(wù)。您可以通過派生 TaskScheduler 類實(shí)現(xiàn)自己的調(diào)度算法,代替默認(rèn)的,但這超過本文的討論范圍。 正如我們之前所見,我使用 Result 屬性來(lái)合并被調(diào)用的后臺(tái)線程。對(duì)于不需要返回結(jié)果的線程,我可以調(diào)用 Wait() 來(lái)代替。這兩種方式都將被堵塞到后臺(tái)任務(wù)完成。 為了避免堵塞調(diào)用線程 ( 如在ASP.NET Core應(yīng)用程序中) ,可以使用 await 關(guān)鍵字:
var backgroundTask = Task.Run(() => DoComplexCalculation(42)); // do other work var result = await backgroundTask;
這樣被調(diào)用的線程將被釋放以便處理其他傳入請(qǐng)求。一旦任務(wù)完成,一個(gè)可用的工作線程將會(huì)繼續(xù)處理請(qǐng)求。當(dāng)然,控制器動(dòng)作方法必須是異步的:
public async Task<iactionresult> Index() { // method body }
處理異常
將兩個(gè)線程合并在一起的時(shí)候,任務(wù)拋出的任何異常將被傳遞到調(diào)用線程中:
如果使用 Result 或 Wait() ,它們將被打包到 AggregateException 中。實(shí)際的異常將被拋出并存儲(chǔ)在其 InnerException 屬性中。
如果您使用 await,原來(lái)的異常將不會(huì)被打包。
在這兩種情況下,調(diào)用堆棧的信息將保持不變。
取消任務(wù)
由于任務(wù)是可以長(zhǎng)時(shí)間運(yùn)行的,所以你可能想要有一個(gè)可以提前取消任務(wù)的選項(xiàng)。實(shí)現(xiàn)這個(gè)選項(xiàng),需要在任務(wù)創(chuàng)建的時(shí)候傳入取消的令牌 (token),之后再使用令牌觸發(fā)取消任務(wù):
var tokenSource = new CancellationTokenSource(); var cancellableTask = Task.Run(() => { for (int i = 0; i < 100; i++) { if (tokenSource.Token.IsCancellationRequested) { // clean up before exiting tokenSource.Token.ThrowIfCancellationRequested(); } // do long-running processing } return 42; }, tokenSource.Token); // cancel the task tokenSource.Cancel(); try { await cancellableTask; } catch (OperationCanceledException e) { // handle the exception }
實(shí)際上,為了提前取消任務(wù),你需要檢查任務(wù)中的取消令牌,并在需要取消的時(shí)候作出反應(yīng):在執(zhí)行必要的清理操作后,調(diào)用 ThrowIfCancellationRequested()
退出任務(wù)。這個(gè)方法將會(huì)拋出 OperationCanceledException
,以便在調(diào)用線程中執(zhí)行相應(yīng)的處理。
協(xié)調(diào)多任務(wù)
如果你需要運(yùn)行多個(gè)后臺(tái)任務(wù),這里有些方法可以幫助到你。 要同時(shí)運(yùn)行多個(gè)任務(wù),只需連續(xù)啟動(dòng)它們并收集它們的引用,例如在數(shù)組中:
var backgroundTasks = new [] { Task.Run(() => DoComplexCalculation(1)), Task.Run(() => DoComplexCalculation(2)), Task.Run(() => DoComplexCalculation(3)) };
現(xiàn)在你可以使用 Task 類的靜態(tài)方法,等待他們被異步或者同步執(zhí)行完畢。
// wait synchronously Task.WaitAny(backgroundTasks); Task.WaitAll(backgroundTasks); // wait asynchronously await Task.WhenAny(backgroundTasks); await Task.WhenAll(backgroundTasks);
實(shí)際上,這兩個(gè)方法最終都會(huì)返回所有任務(wù)的自身,可以像任何其他任務(wù)一樣再次操作。為了獲取對(duì)應(yīng)任務(wù)的結(jié)果,你可以檢查該任務(wù)的 Result 屬性。 處理多任務(wù)的異常有點(diǎn)棘手。方法 WaitAll 和 WhenAll 不管哪個(gè)任務(wù)被收集到異常時(shí)都會(huì)拋出異常。不過,對(duì)于 WaitAll ,將會(huì)收集所有的異常到對(duì)應(yīng)的 InnerExceptions 屬性;對(duì)于 WhenAll ,只會(huì)拋出第一個(gè)異常。為了確認(rèn)哪個(gè)任務(wù)拋出了哪個(gè)異常,您需要單獨(dú)檢查每個(gè)任務(wù)的 Status 和 Exception 屬性。 在使用 WaitAny 和 WhenAny 時(shí)必須足夠小心。他們會(huì)等到第一個(gè)任務(wù)完成 (成功或失敗),即使某個(gè)任務(wù)出現(xiàn)異常時(shí)也不會(huì)拋出任何異常。他們只會(huì)返回已完成任務(wù)的索引或者分別返回已完成的任務(wù)。你必須等到任務(wù)完成或訪問其 result 屬性時(shí)捕獲異常,例如:
var completedTask = await Task.WhenAny(backgroundTasks); try { var result = await completedTask; } catch (Exception e) { // handle exception }
如果你想連續(xù)運(yùn)行多個(gè)任務(wù),代替并發(fā)任務(wù),可以使用延續(xù) (continuations)的方式:
var compositeTask = Task.Run(() => DoComplexCalculation(42)) .ContinueWith(previous => DoAnotherComplexCalculation(previous.Result), TaskContinuationOptions.OnlyOnRanToCompletion)
ContinueWith()
方法允許你把多個(gè)任務(wù)一個(gè)接著一個(gè)執(zhí)行。這個(gè)延續(xù)的任務(wù)將獲取到前面任務(wù)的結(jié)果或狀態(tài)的引用。 你仍然可以增加條件判斷是否執(zhí)行延續(xù)任務(wù),例如只有在前面任務(wù)成功執(zhí)行或者拋出異常時(shí)。對(duì)比連續(xù)等待多個(gè)任務(wù),提高了靈活性。 當(dāng)然,您可以將延續(xù)任務(wù)與之前討論的所有功能相結(jié)合:異常處理、取消和并行運(yùn)行任務(wù)。這就有了很大的表演空間,以不同的方式進(jìn)行組合:
var multipleTasks = new[] { Task.Run(() => DoComplexCalculation(1)), Task.Run(() => DoComplexCalculation(2)), Task.Run(() => DoComplexCalculation(3)) }; var combinedTask = Task.WhenAll(multipleTasks); var successfulContinuation = combinedTask.ContinueWith(task => CombineResults(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion); var failedContinuation = combinedTask.ContinueWith(task => HandleError(task.Exception), TaskContinuationOptions.NotOnRanToCompletion); await Task.WhenAny(successfulContinuation, failedContinuation);
任務(wù)同步
如果任務(wù)是完全獨(dú)立的,那么我們剛才看到的協(xié)調(diào)方法就已足夠。然而,一旦需要同時(shí)共享數(shù)據(jù),為了防止數(shù)據(jù)損壞,就必須要有額外的同步。 兩個(gè)以及更多的線程同時(shí)更新一個(gè)數(shù)據(jù)結(jié)構(gòu)時(shí),數(shù)據(jù)很快就會(huì)變得不一致。就好像下面這個(gè)示例代碼一樣:
var counters = new Dictionary< int, int >(); if (counters.ContainsKey(key)) { counters[key] ++; } else { counters[key] = 1; }
當(dāng)多個(gè)線程同時(shí)執(zhí)行上述代碼時(shí),不同線程中的特定順序執(zhí)行指令可能導(dǎo)致數(shù)據(jù)不正確,例如:
- 所有線程將會(huì)檢查集合中是否存在同一個(gè) key
- 結(jié)果,他們都會(huì)進(jìn)入 else 分支,并將這個(gè) key 的值設(shè)為1
- 最后結(jié)果將會(huì)是1,而不是2。如果是接連著執(zhí)行代碼的話,將會(huì)是預(yù)期的結(jié)果。
上述代碼中,臨界區(qū) (critical section) 一次只允許一個(gè)線程可以進(jìn)入。在C# 中,可以使用 lock 語(yǔ)句來(lái)實(shí)現(xiàn):
var counters = new Dictionary< int, int >(); lock (syncObject) { if (counters.ContainsKey(key)) { counters[key]++; } else { counters[key] = 1; } }
在這個(gè)方法中,所有線程都必須共享相同的的 syncObject 。作為最佳做法,syncObject 應(yīng)該是一個(gè)專用的 Object 實(shí)例,專門用于保護(hù)對(duì)一個(gè)獨(dú)立的臨界區(qū)的訪問,避免從外部訪問。 在 lock 語(yǔ)句中,只允許一個(gè)線程訪問里面的代碼塊。它將阻止下一個(gè)嘗試訪問它的線程,直到前一個(gè)線程退出。這將確保線程完整執(zhí)行臨界區(qū)代碼,而不會(huì)被另一個(gè)線程中斷。當(dāng)然,這將減少并行性并減慢代碼的整體執(zhí)行速度,因此您最好最小化臨界區(qū)的數(shù)量并使其盡可能的短。
使用 Monitor 類來(lái)簡(jiǎn)化 lock 聲明:
var lockWasTaken = false; var temp = syncObject; try { Monitor.Enter(temp, ref lockWasTaken); // lock statement body } finally { if (lockWasTaken) { Monitor.Exit(temp); } }
盡管大部分時(shí)間您都希望使用 lock 語(yǔ)句,但 Monitor 類可以在需要時(shí)給予額外的控制。例如,您可以使用 TryEnter() 而不是 Enter(),并指定一個(gè)限定時(shí)間,避免無(wú)止境地等待鎖釋放。
其他同步基元
Monitor 只是 .NET Core 中眾多同步基元的一員。根據(jù)實(shí)際情況,其他基元可能更適合。
Mutex 是 Monitor 更重量級(jí)的版本,依賴于底層的操作系統(tǒng),提供跨多個(gè)進(jìn)程同步訪問資源[1], 是針對(duì) Mutex 進(jìn)行同步的推薦替代方案。
SemaphoreSlim 和 Semaphore 可以限制同時(shí)訪問資源的最大線程數(shù)量,而不是像 Monitor 一樣只能限制一個(gè)線程。 SemaphoreSlim 比 Semaphore 更輕量,但僅限于單個(gè)進(jìn)程。如果可能,您最好使用 SemaphoreSlim 而不是 Semaphore。
ReaderWriterLockSlim 可以區(qū)分兩種對(duì)訪問資源的方式。它允許無(wú)限數(shù)量的讀取器 (readers) 同時(shí)訪問資源,并且限制同時(shí)只允許一個(gè)寫入器 (writers) 訪問鎖定資源。讀取時(shí)線程安全,但修改數(shù)據(jù)時(shí)需要獨(dú)占資源,很好地保護(hù)了資源。
AutoResetEvent、ManualResetEvent 和 ManualResetEventSlim 將堵塞傳入的線程,直到它們接收到一個(gè)信號(hào) (即調(diào)用 Set() )。然后等待中的線程將繼續(xù)執(zhí)行。AutoResetEvent 在下一次調(diào)用 Set() 之前,將一直阻塞,并只允許一個(gè)線程繼續(xù)執(zhí)行。ManualResetEvent 和 ManualResetEventSlim 不會(huì)堵塞線程,除非 Reset() 被調(diào)用。ManualResetEventSlim 比前兩者更輕量,更值得推薦。
Interlocked 提供一種選擇——原子操作,這是替代 locking 和其他同步基元更好的選擇(如果適用):
// non-atomic operation with a lock lock (syncObject) { counter++; } // equivalent atomic operation that doesn't require a lock Interlocked.Increment(ref counter);
并發(fā)集合
當(dāng)一個(gè)臨界區(qū)需要確保對(duì)數(shù)據(jù)結(jié)構(gòu)的原子訪問時(shí),用于并發(fā)訪問的專用數(shù)據(jù)結(jié)構(gòu)可能是更好和更有效的替代方案。例如,使用 ConcurrentDictionary 而不是 Dictionary,可以簡(jiǎn)化 lock 語(yǔ)句示例:
var counters = new ConcurrentDictionary< int, int >(); counters.TryAdd(key, 0); lock (syncObject) { counters[key]++; }
自然地,也有可能像下面一樣:
counters.AddOrUpdate(key, 1, (oldKey, oldValue) => oldValue + 1);
因?yàn)?update 的委托是臨界區(qū)外面的方法,因此,第二個(gè)線程可能在第一個(gè)線程更新值之前,讀取到同樣的舊值,使用自己的值有效地覆蓋了第一個(gè)線程的更新值,這就丟失了一個(gè)增量。錯(cuò)誤使用并發(fā)集合也是無(wú)法避免多線程帶來(lái)的問題。 并發(fā)集合的另一個(gè)替代方案是 不變的集合 (immutable collections)。 類似于并發(fā)集合,同樣是線程安全的,但是底層實(shí)現(xiàn)是不一樣的。任何關(guān)改變數(shù)據(jù)結(jié)構(gòu)的操作將不會(huì)改變?cè)瓉?lái)的實(shí)例。相反,它們返回一個(gè)更改后的副本,并保持原始實(shí)例不變:
var original = new Dictionary< int, int >().ToImmutableDictionary(); var modified = original.Add(key, value);
因此在一個(gè)線程中對(duì)集合任何更改對(duì)于其他線程來(lái)說(shuō)都是不可見的。因?yàn)樗鼈內(nèi)匀灰迷瓉?lái)的未修改的集合,這就是不變的集合本質(zhì)上是線程安全的原因。 當(dāng)然,這使得它們對(duì)于解決不同集合的問題很有效。最好的情況是多個(gè)線程在同一個(gè)輸入集合的情況下,獨(dú)立地修改數(shù)據(jù),在最后一步可能為所有線程合并變更。而使用常規(guī)集合,需要提前為每個(gè)線程創(chuàng)建集合的副本。
并行LINQ (PLINQ)
并行LINQ (PLINQ) 是 Task Parallel Library 的替代方案。顧名思義,它很大程度上依賴于 LINQ(語(yǔ)言集成查詢)功能。對(duì)于在大集合中執(zhí)行相同的昂貴操作的場(chǎng)景是很有用的。與所有操作都是順序執(zhí)行的普通 LINQ to Objects 不同的是,PLINQ可以在多個(gè)CPU上并行執(zhí)行這些操作。 發(fā)揮優(yōu)勢(shì)所需要的代碼改動(dòng)也是極小的:
// sequential execution var sequential = Enumerable.Range(0, 40) .Select(n => ExpensiveOperation(n)) .ToArray(); // parallel execution var parallel = Enumerable.Range(0, 40) .AsParallel() .Select(n => ExpensiveOperation(n)) .ToArray();
如你所見,這兩個(gè)代碼片段的不同僅僅是調(diào)用 AsParallel()。
這將IEnumerable 轉(zhuǎn)換為 ParallelQuery,導(dǎo)致查詢的部分并行運(yùn)行。要切換為回順序執(zhí)行,您可以調(diào)用 AsSequential(),
它將再次返回一個(gè)IEnumerable。 默認(rèn)情況下,PLINQ 不保留集合中的順序,以便讓進(jìn)程更有效率。但是當(dāng)順序很重要時(shí),可以調(diào)用 AsOrdered():
var parallel = Enumerable.Range(0, 40) .AsParallel() .AsOrdered() .Select(n => ExpensiveOperation(n)) .ToArray();
同理,你可以通過調(diào)用 AsUnordered()
切換回來(lái)。
在完整的 .NET Framework 中并發(fā)編程
由于 .NET Core 是完整的 .NET Framework 的簡(jiǎn)化實(shí)現(xiàn),所以 .NET Framework 中所有并行編程方法也可以在.NET Core 中使用。唯一的例外是不變的集合,它們不是完整的 .NET Framework 的組成部分。它們作為單獨(dú)的 NuGet 軟件包(System.Collections.Immutable)分發(fā),您需要在項(xiàng)目中安裝使用。
結(jié)論:
每當(dāng)應(yīng)用程序包含可以并行運(yùn)行的 CPU 密集型代碼時(shí),利用并發(fā)編程來(lái)提高性能并提高硬件利用率是很有意義的。 .NET Core 中的 API 抽象了許多細(xì)節(jié),使編寫并發(fā)代碼更容易。然而需要注意某些潛在的問題, 其中大部分涉及從多個(gè)線程訪問共享數(shù)據(jù)。 如果可以的話,你應(yīng)該完全避免這種情況。如果不行,請(qǐng)確保選擇最合適的同步方法或數(shù)據(jù)結(jié)構(gòu)。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
ASP.NET Core基礎(chǔ)之啟動(dòng)設(shè)置
這篇文章介紹了ASP.NET Core基礎(chǔ)之啟動(dòng)設(shè)置,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-02-02.Net?Core微服務(wù)rpc框架GRPC通信基礎(chǔ)
這篇文章介紹了.Net?Core微服務(wù)rpc框架GRPC通信的基礎(chǔ)應(yīng)用,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-01-01詳解ABP框架中領(lǐng)域?qū)拥念I(lǐng)域事件Domain events
ABP是基于ASP.NET框架之上的Web開發(fā)框架(GitHub:https://github.com/aspnetboilerplate),這篇我們來(lái)詳解ABP框架中領(lǐng)域?qū)拥念I(lǐng)域事件Domain events,需要的朋友可以參考下2016-06-06ASP.NET在VS2022中使用Dispose釋放資源實(shí)例
這篇文章介紹了ASP.NET在VS2022中使用Dispose釋放資源實(shí)例,文中通過示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-11-11