C# 線程安全詳解
介紹
在 .NET4.0 之前,如果我們需要在多線程環(huán)境下使用 Dictionary 類,除了自己實(shí)現(xiàn)線程同步來(lái)保證線程安全外,我們沒(méi)有其他選擇。很多開(kāi)發(fā)人員肯定都實(shí)現(xiàn)過(guò)類似的線程安全方案,可能是通過(guò)創(chuàng)建全新的線程安全字典,或者僅是簡(jiǎn)單的用一個(gè)類封裝一個(gè) Dictionary 對(duì)象,并在所有方法中加上鎖機(jī)制,我們稱這種方案叫 “Dictionary+Locks” 。
System.Collections.Concurrent 命名空間下提供多個(gè)線程安全集合類,只要多個(gè)線程同時(shí)訪問(wèn)集合,就應(yīng)使用這些類來(lái)代替 System.Collections 和 System.Collections.Generic 命名空間中的相應(yīng)類型。 但是,不保證通過(guò)擴(kuò)展方法或通過(guò)顯式接口實(shí)現(xiàn)訪問(wèn)集合對(duì)象是線程安全的,可能需要由調(diào)用方進(jìn)行同步。
經(jīng)典生產(chǎn)消費(fèi)問(wèn)題
介紹
這個(gè)問(wèn)題是最為經(jīng)典的多線程應(yīng)用問(wèn)題問(wèn)題就是:有一個(gè)或多個(gè)線程(生產(chǎn)者線程)產(chǎn)生一些數(shù)據(jù),還有一個(gè)或者多個(gè)線程(消費(fèi)者線程)要取出這些數(shù)據(jù)并執(zhí)行一些相應(yīng)的工作。
Queue
接下來(lái),我們是使用程序去描述這個(gè)問(wèn)題,看下面代碼
static void Main(string[] args) { int count = 0; // 臨界資源區(qū) var queue = new Queue<string>(); // 生產(chǎn)者線程 Task.Factory.StartNew(() => { while (true) { queue.Enqueue("mesg" + count); count++; } }); // 消費(fèi)者線程1 Task.Factory.StartNew(() => { while (true) { if (queue.Count > 0) { string value = queue.Dequeue(); Console.WriteLine("Worker A: " + value); } } }); // 消費(fèi)者線程2 Task.Factory.StartNew(() => { while (true) { if (queue.Count > 0) { string value = queue.Dequeue(); Console.WriteLine("Worker B: " + value); } } }); Thread.Sleep(50000); }
我們使用 Queue 模擬了一個(gè)簡(jiǎn)單的資源池,一個(gè)生產(chǎn)者放數(shù)據(jù),兩個(gè)消費(fèi)者消費(fèi)數(shù)據(jù)。
這個(gè)程序運(yùn)行以后會(huì)產(chǎn)生異常,異常的原因很簡(jiǎn)單。當(dāng)某時(shí)刻,第一個(gè)消費(fèi)者判斷 queue.Count > 0 為true 時(shí),就會(huì)到 Queue 中取數(shù)據(jù)。但是,此時(shí)這個(gè)數(shù)據(jù)可能會(huì)被第二個(gè)消費(fèi)者拿走了,因?yàn)榈诙€(gè)消費(fèi)者也判斷出此時(shí)有數(shù)據(jù)可取。第一個(gè)消費(fèi)者取取數(shù)據(jù)時(shí)就會(huì)發(fā)生異常,這就是一個(gè)簡(jiǎn)單的臨界資源線程安全問(wèn)題。
知道問(wèn)題了,那么如何解決呢?有兩種方案,接下來(lái)進(jìn)行講解
ConcurrentQueue
1 . 加鎖
這個(gè)方案是可行的,很多時(shí)候我們也是這么做的,包括微軟早期實(shí)現(xiàn)線程安全的 ArrayList 和 Hashtable 內(nèi)部 (Synchronized方法) 也是這么實(shí)現(xiàn)的。這個(gè)方案適用于只有少量的消費(fèi)者,并且每個(gè)消費(fèi)者都會(huì)執(zhí)行大量操作的時(shí)候,這時(shí) lock 并沒(méi)什么太大問(wèn)題,但是,如果是大批量短小精悍的消費(fèi)者存在的話,lock 會(huì)嚴(yán)重影響代碼的執(zhí)行效率。
2 . 線程安全的集合區(qū)
這個(gè)就是 .NET4.0 后 System.Collections.Concurrent 命名空間下提供多個(gè)線程安全集合類方案。
新的線程安全的這些集合內(nèi)部不再使用lock機(jī)制這種比較低效的方式去實(shí)現(xiàn)線程安全,而是轉(zhuǎn)而使用SpinWait 和 Interlocked 等機(jī)制,間接實(shí)現(xiàn)了線程安全,這種方式的效率要高于使用lock的方式。
var queue = new ConcurrentQueue<string>(); Task.Factory.StartNew(() => { while (true) { queue.Enqueue("msg" + count); count++; } }); Task.Factory.StartNew(() => { while (true) { string value; if (queue.TryDequeue(out value)) { Console.WriteLine("Worker A: " + value); } } }); Task.Factory.StartNew(() => { while (true) { string value; if (queue.TryDequeue(out value)) { Console.WriteLine("Worker B: " + value); } } });
ConcurrentQueue.TryDequeue(T) 方法會(huì)嘗試獲取消費(fèi),那能不能不要去判斷集合是否為空,集合當(dāng)自己沒(méi)有元素的時(shí)候自己 Block 一下可以嗎?答案是,可以的
BlockingCollection
針對(duì)上面的問(wèn)題,我們可以使用 BlockingCollection 即可。接下來(lái)我來(lái)看
var blockingCollection = new BlockingCollection<string>(); Task.Factory.StartNew(() => { while (true) { blockingCollection.Add("msg" + count); count++; } }); Task.Factory.StartNew(() => { while (true) { Console.WriteLine("Worker A: " + blockingCollection.Take()); } }); Task.Factory.StartNew(() => { while (true) { Console.WriteLine("Worker B: " + blockingCollection.Take()); } });
BlockingCollection 集合是一個(gè)擁有阻塞功能的集合,它就是完成了經(jīng)典生產(chǎn)者消費(fèi)者的算法功能。它沒(méi)有實(shí)現(xiàn)底層的存儲(chǔ)結(jié)構(gòu),而是使用了實(shí)現(xiàn) IProducerConsumerCollection 接口的幾個(gè)集合作為底層的數(shù)據(jù)結(jié)構(gòu),例如 ConcurrentBag, ConcurrentStack 或者是 ConcurrentQueue。你可以在構(gòu)造BlockingCollection 實(shí)例的時(shí)候傳入這個(gè)參數(shù),如果不指定的話,則默認(rèn)使用 ConcurrentQueue 作為存儲(chǔ)結(jié)構(gòu)。
而對(duì)于生產(chǎn)者來(lái)說(shuō),只需要通過(guò)調(diào)用其Add方法放數(shù)據(jù),消費(fèi)者只需要調(diào)用Take方法來(lái)取數(shù)據(jù)就可以了。
當(dāng)然了上面的消費(fèi)者代碼中還有一點(diǎn)是讓人不爽的,那就是 while 語(yǔ)句,可以更優(yōu)雅一點(diǎn)嗎?答案是,可以的。
Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker A: " + value); } });
BlockingCollection.GetConsumingEnumerable 方法是關(guān)鍵,這個(gè)方法會(huì)遍歷集合取出數(shù)據(jù),一旦發(fā)現(xiàn)集合空了,則阻塞自己,直到集合中又有元素了再開(kāi)始遍歷。
此時(shí),完美了解決了生產(chǎn)者消費(fèi)者問(wèn)題。然而通常來(lái)說(shuō),還有下面兩個(gè)問(wèn)題我們有時(shí)需要去控制
1 . 控制集合中數(shù)據(jù)的最大數(shù)量
這個(gè)問(wèn)題由 BlockingCollection 構(gòu)造函數(shù)解決,構(gòu)造該對(duì)象實(shí)例的時(shí)候,構(gòu)造函數(shù)中的 BoundedCapacity 決定了集合最大的可容納數(shù)據(jù)數(shù)量,這個(gè)比較簡(jiǎn)單。
2 . 何時(shí)停止的問(wèn)題
這個(gè)問(wèn)題由 CompleteAdding 和 IsCompleted 兩個(gè)配合解決。CompleteAdding 方法是直接不允許任何元素被加入集合;當(dāng)使用了 CompleteAdding 方法后且集合內(nèi)沒(méi)有元素的時(shí)候,另一個(gè)屬性 IsCompleted 此時(shí)會(huì)為 True,這個(gè)屬性可以用來(lái)判斷是否當(dāng)前集合內(nèi)的所有元素都被處理完。生產(chǎn)者修改后的代碼:
Task.Factory.StartNew(() => { for (int count = 0; count < 10; count++) { blockingCollection.Add("msg" + count); } blockingCollection.CompleteAdding(); });
當(dāng)使用了 CompleteAdding 方法后,對(duì)象停止往集合中添加數(shù)據(jù),這時(shí)如果是使用 GetConsumingEnumerable 枚舉的,那么這種枚舉會(huì)自然結(jié)束,不會(huì)再 Block 住集合,這種方式最優(yōu)雅,也是推薦的寫(xiě)法。
但是如果是使用 TryTake 訪問(wèn)元素的,則需要使用 IsCompleted 判斷一下,因?yàn)檫@個(gè)時(shí)候使用 TryTake 會(huì)拋InvalidOperationException 異常。接著我們看下最后的完整代碼:
static void Main(string[] args) { var blockingCollection = new BlockingCollection<string>(); var producer = Task.Factory.StartNew(() => { for (int count = 0; count < 10; count++) { blockingCollection.Add("msg" + count); Thread.Sleep(300); } blockingCollection.CompleteAdding(); }); var consumer1 = Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker A: " + value); } }); var consumer2 = Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker B: " + value); } }); Task.WaitAll(producer, consumer1, consumer2); }
BlockingCollection 枚舉
此外,需要注意 BlockingCollection 有兩種枚舉方法,
1 . foreach
首先 BlockingCollection 本身繼承自IEnumerable,所以它自己就可以被 foreach 枚舉,首先 BlockingCollection 包裝了一個(gè)線程安全集合,那么它自己也是線程安全的,而當(dāng)多個(gè)線程在同時(shí)修改或訪問(wèn)線程安全容器時(shí),BlockingCollection 自己作為 IEnumerable 會(huì)返回一個(gè)一定時(shí)間內(nèi)的集合片段,也就是只會(huì)枚舉在那個(gè)時(shí)間點(diǎn)上內(nèi)部集合的元素。使用這種方式枚舉的時(shí)候,不會(huì)有 Block 效果。
2 . GetConsumingEnumerable
另外一種方式就是我們上面使用的 GetConsumingEnumerable 方式的枚舉,這種方式會(huì)有 Block 效果,直到 CompleteAdding 被調(diào)用為止。
BlockingCollection 擴(kuò)展
實(shí)現(xiàn) IProducerConsumerCollection 接口的幾個(gè)集合:ConcurrentBag (線程安全的無(wú)序的元素集合), ConcurrentStack (線程安全的堆棧) 和 ConcurrentQueue (線程安全的隊(duì)列)。這些都很簡(jiǎn)單,功能與非線程安全的那些集合都一樣,只不過(guò)是多了 TryXXX 方法,多線程環(huán)境下使用這些方法就好了。
System.Collections.Concurrent
System.Collections.Concurrent 下面還有一些其他與多線程相關(guān)的集合,有些個(gè)類在原來(lái)的基礎(chǔ)上也添加了一下新的方法,例如:AddOrUpdate,GetOrAdd,TryXXX 等等,都很容易理解。
總結(jié)
本篇文章就到這里了,希望能夠給你帶來(lái)幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
C#隨機(jī)設(shè)置900-1100毫秒延遲的方法
這篇文章主要介紹了C#隨機(jī)設(shè)置900-1100毫秒延遲的方法,涉及C#中Thread.Sleep方法的使用技巧,需要的朋友可以參考下2015-04-04C#中使用ArrayPool和MemoryPool實(shí)例
對(duì)資源的可復(fù)用是提升應(yīng)用程序性能的一個(gè)非常重要的手段,比如本篇要分享的 ArrayPool 和 MemoryPool,它們就有效的減少了內(nèi)存使用和對(duì)GC的壓力,從而提升應(yīng)用程序性能。感興趣的可以了解一下2021-05-05C#基礎(chǔ):Dispose()、Close()、Finalize()的區(qū)別詳解
本篇文章是對(duì)c#中的Dispose()、Close()、Finalize()的區(qū)別進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05

WindowsForm實(shí)現(xiàn)TextBox占位符Placeholder提示功能

C#實(shí)現(xiàn)基于IE內(nèi)核的簡(jiǎn)單瀏覽器完整實(shí)例