C#集合之并發(fā)集合的用法
.NET 4 開(kāi)始,在System.Collection.Concurrent中提供了幾個(gè)線程安全的集合類(lèi)。線程安全的集合可防止多個(gè)線程以相互沖突的方式訪問(wèn)集合。
為了對(duì)集合進(jìn)行線程安全的訪問(wèn),定義了IProducerConsumerCollection<T>接口。這個(gè)接口中最重要的方法是TryAdd()和TryTake()。TryAdd()方法嘗試給集合添加一項(xiàng),但如果集合禁止添加項(xiàng),這個(gè)操作就可能失敗。TryAdd()方法返回一個(gè)布爾值,以說(shuō)明操作成功還是失敗。TryTake()同樣,在成功時(shí)返回集合中的項(xiàng)。
- *ConcurrentQueue<T>————這個(gè)集合類(lèi)用一種免鎖定的算法實(shí)現(xiàn),使用在內(nèi)部合并到一個(gè)鏈表中的32項(xiàng)數(shù)組。該類(lèi)有Enqueue(),TryDequeue()和TryPeek()方法。因?yàn)檫@個(gè)類(lèi)實(shí)現(xiàn)了 IProducerConsumerCollection<T>接口,所以TryAdd()和TryTake()方法僅調(diào)用Enqueue和TryDequeue方法。
- *ConcurrentStack<T>————類(lèi)似于ConcurrentQueue<T>。該類(lèi)定義了Push(),PushRange(),TryPeek(),TryPop()和TryPopRange()方法。在內(nèi)部這個(gè)類(lèi)使用其元素的鏈表。
- *ConcurrentBag<T>————該類(lèi)沒(méi)有定義添加或提取項(xiàng)的任何順序。這個(gè)類(lèi)使用一個(gè)線程映射到內(nèi)部使用的數(shù)組上的概念,因此嘗試減少鎖定。方法:Add(),TryPeek(),TryTake()。
- *ConcurrentDictionary<TKey,TValue>————這是一個(gè)線程安全的鍵值集合。TryAdd(),TryGetValue(),TryRemove()和TryUpdate()方法以非阻塞的方式訪問(wèn)成員。因?yàn)樵鼗阪I和值,所以ConcurrentDictionary<TKey,TValue>沒(méi)有實(shí)現(xiàn)IProducerConsumerCollection<T>接口。
- *BlockingCollection<T>————這個(gè)集合在可以添加或提取元素之前,會(huì)阻塞線程并一直等待。BlockingCollection<T>集合提供了一個(gè)接口,以使用Add()和Take()方法來(lái)刪除和添加元素。這些方法會(huì)阻塞線程。Add()方法有一個(gè)重載版本,其中可以給該重載版本傳遞一個(gè)CancellationToken令牌。這個(gè)令牌允許取消被阻塞的調(diào)用。如果不希望無(wú)限的等待下去,且不希望從外部取消調(diào)用,就可以使用TryAdd()和TryTake()方法,在這些方法中,也可以指定一個(gè)超時(shí)值。
BlockingCollection<T>是對(duì)實(shí)現(xiàn)了 IProducerConsumerCollection<T>接口的任意類(lèi)的修飾器,它默認(rèn)使用ConcurrentQueue<T>類(lèi)。還可以給構(gòu)造函數(shù)傳遞任何實(shí)現(xiàn)了 IProducerConsumerCollection<T>接口的類(lèi)。
下面使用一個(gè)例子演示BlockingCollection<T>的使用,一個(gè)任務(wù)向一個(gè)集合寫(xiě)入,同時(shí)另一個(gè)任務(wù)從這個(gè)集合讀取。
static void Main(string[] args) { StartPipeline(); Console.ReadLine(); } private static async void StartPipeline() { //存儲(chǔ)文件名 var fileNames = new BlockingCollection<string>(); //存儲(chǔ)文件的每一行內(nèi)容 var lines = new BlockingCollection<string>(); //存儲(chǔ)每一行的每個(gè)單詞,單詞為鍵,單詞個(gè)數(shù)為值 var words = new ConcurrentDictionary<string, int>(); //存儲(chǔ)words信息 var items = new BlockingCollection<Info>(); var coloredItems = new BlockingCollection<Info>(); Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames); ConsoleHelper.WriteLine("started stage 1"); Task t2 = PipelineStages.LoadContentAsync(fileNames, lines); ConsoleHelper.WriteLine("started stage 2"); Task t3 = PipelineStages.ProcessContentAsync(lines, words); await Task.WhenAll(t1, t2, t3); ConsoleHelper.WriteLine("stages 1, 2, 3 completed"); //當(dāng)上面三個(gè)任務(wù)完成時(shí),才執(zhí)行下面的任務(wù) Task t4 = PipelineStages.TransferContentAsync(words, items); Task t5 = PipelineStages.AddColorAsync(items, coloredItems); Task t6 = PipelineStages.ShowContentAsync(coloredItems); ConsoleHelper.WriteLine("stages 4, 5, 6 started"); await Task.WhenAll(t4, t5, t6); ConsoleHelper.WriteLine("all stages finished"); } public static class PipelineStages { public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output) { return Task.Run(() => { foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories)) { output.Add(filename); ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename)); } //調(diào)用CompleteAdding,通知所有讀取器不應(yīng)再等待集合中的任何額外項(xiàng) //如果不調(diào)用該方法,讀取器會(huì)在foreach循環(huán)中等待更多的項(xiàng)被添加 output.CompleteAdding(); }); } public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output) { //使用讀取器讀取集合時(shí),需要使用GetConsumingEnumerable獲取阻塞集合的枚舉器, //如果直接使用input迭代集合,這只會(huì)迭代當(dāng)前狀態(tài)的集合,不會(huì)迭代以后添加的項(xiàng) foreach (var filename in input.GetConsumingEnumerable()) { using (FileStream stream = File.OpenRead(filename)) { var reader = new StreamReader(stream); string line = null; while ((line = await reader.ReadLineAsync()) != null) { output.Add(line); ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line)); } } } output.CompleteAdding(); } public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output) { return Task.Run(() => { foreach (var line in input.GetConsumingEnumerable()) { string[] words = line.Split(' ', ';', '\t', '{', '}', '(', ')', ':', ',', '"'); foreach (var word in words.Where(w => !string.IsNullOrEmpty(w))) { //這里使用了字典的一個(gè)擴(kuò)展方法 output.AddOrIncrementValue(word); ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word)); } } }); } public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output) { return Task.Run(() => { foreach (var word in input.Keys) { int value; if (input.TryGetValue(word, out value)) { var info = new Info { Word = word, Count = value }; output.Add(info); ConsoleHelper.WriteLine(string.Format("stage 4: added {0}", info)); } } output.CompleteAdding(); }); } public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output) { return Task.Run(() => { foreach (var item in input.GetConsumingEnumerable()) { if (item.Count > 40) { item.Color = "Red"; } else if (item.Count > 20) { item.Color = "Yellow"; } else { item.Color = "Green"; } output.Add(item); ConsoleHelper.WriteLine(string.Format("stage 5: added color {1} to {0}", item, item.Color)); } output.CompleteAdding(); }); } public static Task ShowContentAsync(BlockingCollection<Info> input) { return Task.Run(() => { foreach (var item in input.GetConsumingEnumerable()) { ConsoleHelper.WriteLine(string.Format("stage 6: {0}", item), item.Color); } }); } } //創(chuàng)建一個(gè)字典的擴(kuò)展方法 public static class ConcurrentDictionaryExtension { public static void AddOrIncrementValue(this ConcurrentDictionary<string, int> dict, string key) { bool success = false; while (!success) { int value; if (dict.TryGetValue(key, out value)) { if (dict.TryUpdate(key, value + 1, value)) { success = true; } } else { if (dict.TryAdd(key, 1)) { success = true; } } } } }
這里使用了一個(gè)管道模型的編程模式,上面的添加內(nèi)容,下面處理內(nèi)容
到此這篇關(guān)于C#集合之并發(fā)集合的文章就介紹到這了。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
C# Winform實(shí)現(xiàn)石頭剪刀布游戲
這篇文章主要為大家詳細(xì)介紹了Winform實(shí)現(xiàn)石頭剪刀布游戲,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-01-01C#線程執(zhí)行超時(shí)處理與并發(fā)線程數(shù)控制實(shí)例
這篇文章主要介紹了C#線程執(zhí)行超時(shí)處理與并發(fā)線程數(shù)控制的方法,實(shí)例講述了并發(fā)執(zhí)行存儲(chǔ)過(guò)程的最大個(gè)數(shù),讀者可對(duì)程序稍做改動(dòng)即控制并發(fā)線程數(shù),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2014-11-11