C#集合之并發(fā)集合的用法
.NET 4 開始,在System.Collection.Concurrent中提供了幾個線程安全的集合類。線程安全的集合可防止多個線程以相互沖突的方式訪問集合。
為了對集合進行線程安全的訪問,定義了IProducerConsumerCollection<T>接口。這個接口中最重要的方法是TryAdd()和TryTake()。TryAdd()方法嘗試給集合添加一項,但如果集合禁止添加項,這個操作就可能失敗。TryAdd()方法返回一個布爾值,以說明操作成功還是失敗。TryTake()同樣,在成功時返回集合中的項。
- *ConcurrentQueue<T>————這個集合類用一種免鎖定的算法實現(xiàn),使用在內部合并到一個鏈表中的32項數(shù)組。該類有Enqueue(),TryDequeue()和TryPeek()方法。因為這個類實現(xiàn)了 IProducerConsumerCollection<T>接口,所以TryAdd()和TryTake()方法僅調用Enqueue和TryDequeue方法。
- *ConcurrentStack<T>————類似于ConcurrentQueue<T>。該類定義了Push(),PushRange(),TryPeek(),TryPop()和TryPopRange()方法。在內部這個類使用其元素的鏈表。
- *ConcurrentBag<T>————該類沒有定義添加或提取項的任何順序。這個類使用一個線程映射到內部使用的數(shù)組上的概念,因此嘗試減少鎖定。方法:Add(),TryPeek(),TryTake()。
- *ConcurrentDictionary<TKey,TValue>————這是一個線程安全的鍵值集合。TryAdd(),TryGetValue(),TryRemove()和TryUpdate()方法以非阻塞的方式訪問成員。因為元素基于鍵和值,所以ConcurrentDictionary<TKey,TValue>沒有實現(xiàn)IProducerConsumerCollection<T>接口。
- *BlockingCollection<T>————這個集合在可以添加或提取元素之前,會阻塞線程并一直等待。BlockingCollection<T>集合提供了一個接口,以使用Add()和Take()方法來刪除和添加元素。這些方法會阻塞線程。Add()方法有一個重載版本,其中可以給該重載版本傳遞一個CancellationToken令牌。這個令牌允許取消被阻塞的調用。如果不希望無限的等待下去,且不希望從外部取消調用,就可以使用TryAdd()和TryTake()方法,在這些方法中,也可以指定一個超時值。
BlockingCollection<T>是對實現(xiàn)了 IProducerConsumerCollection<T>接口的任意類的修飾器,它默認使用ConcurrentQueue<T>類。還可以給構造函數(shù)傳遞任何實現(xiàn)了 IProducerConsumerCollection<T>接口的類。
下面使用一個例子演示BlockingCollection<T>的使用,一個任務向一個集合寫入,同時另一個任務從這個集合讀取。
static void Main(string[] args)
{
StartPipeline();
Console.ReadLine();
}
private static async void StartPipeline()
{
//存儲文件名
var fileNames = new BlockingCollection<string>();
//存儲文件的每一行內容
var lines = new BlockingCollection<string>();
//存儲每一行的每個單詞,單詞為鍵,單詞個數(shù)為值
var words = new ConcurrentDictionary<string, int>();
//存儲words信息
var items = new BlockingCollection<Info>();
var coloredItems = new BlockingCollection<Info>();
Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames);
ConsoleHelper.WriteLine("started stage 1");
Task t2 = PipelineStages.LoadContentAsync(fileNames, lines);
ConsoleHelper.WriteLine("started stage 2");
Task t3 = PipelineStages.ProcessContentAsync(lines, words);
await Task.WhenAll(t1, t2, t3);
ConsoleHelper.WriteLine("stages 1, 2, 3 completed");
//當上面三個任務完成時,才執(zhí)行下面的任務
Task t4 = PipelineStages.TransferContentAsync(words, items);
Task t5 = PipelineStages.AddColorAsync(items, coloredItems);
Task t6 = PipelineStages.ShowContentAsync(coloredItems);
ConsoleHelper.WriteLine("stages 4, 5, 6 started");
await Task.WhenAll(t4, t5, t6);
ConsoleHelper.WriteLine("all stages finished");
}
public static class PipelineStages
{
public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
{
return Task.Run(() =>
{
foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
{
output.Add(filename);
ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename));
}
//調用CompleteAdding,通知所有讀取器不應再等待集合中的任何額外項
//如果不調用該方法,讀取器會在foreach循環(huán)中等待更多的項被添加
output.CompleteAdding();
});
}
public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
{
//使用讀取器讀取集合時,需要使用GetConsumingEnumerable獲取阻塞集合的枚舉器,
//如果直接使用input迭代集合,這只會迭代當前狀態(tài)的集合,不會迭代以后添加的項
foreach (var filename in input.GetConsumingEnumerable())
{
using (FileStream stream = File.OpenRead(filename))
{
var reader = new StreamReader(stream);
string line = null;
while ((line = await reader.ReadLineAsync()) != null)
{
output.Add(line);
ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line));
}
}
}
output.CompleteAdding();
}
public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
{
return Task.Run(() =>
{
foreach (var line in input.GetConsumingEnumerable())
{
string[] words = line.Split(' ', ';', '\t', '{', '}', '(', ')', ':', ',', '"');
foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
{
//這里使用了字典的一個擴展方法
output.AddOrIncrementValue(word);
ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
}
}
});
}
public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
{
return Task.Run(() =>
{
foreach (var word in input.Keys)
{
int value;
if (input.TryGetValue(word, out value))
{
var info = new Info { Word = word, Count = value };
output.Add(info);
ConsoleHelper.WriteLine(string.Format("stage 4: added {0}", info));
}
}
output.CompleteAdding();
});
}
public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
{
return Task.Run(() =>
{
foreach (var item in input.GetConsumingEnumerable())
{
if (item.Count > 40)
{
item.Color = "Red";
}
else if (item.Count > 20)
{
item.Color = "Yellow";
}
else
{
item.Color = "Green";
}
output.Add(item);
ConsoleHelper.WriteLine(string.Format("stage 5: added color {1} to {0}", item, item.Color));
}
output.CompleteAdding();
});
}
public static Task ShowContentAsync(BlockingCollection<Info> input)
{
return Task.Run(() =>
{
foreach (var item in input.GetConsumingEnumerable())
{
ConsoleHelper.WriteLine(string.Format("stage 6: {0}", item), item.Color);
}
});
}
}
//創(chuàng)建一個字典的擴展方法
public static class ConcurrentDictionaryExtension
{
public static void AddOrIncrementValue(this ConcurrentDictionary<string, int> dict, string key)
{
bool success = false;
while (!success)
{
int value;
if (dict.TryGetValue(key, out value))
{
if (dict.TryUpdate(key, value + 1, value))
{
success = true;
}
}
else
{
if (dict.TryAdd(key, 1))
{
success = true;
}
}
}
}
}這里使用了一個管道模型的編程模式,上面的添加內容,下面處理內容

到此這篇關于C#集合之并發(fā)集合的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
C#線程執(zhí)行超時處理與并發(fā)線程數(shù)控制實例
這篇文章主要介紹了C#線程執(zhí)行超時處理與并發(fā)線程數(shù)控制的方法,實例講述了并發(fā)執(zhí)行存儲過程的最大個數(shù),讀者可對程序稍做改動即控制并發(fā)線程數(shù),具有一定的參考借鑒價值,需要的朋友可以參考下2014-11-11

