C#并行編程之Task同步機制
在并行計算中,不可避免的會碰到多個任務共享變量,實例,集合。雖然task自帶了兩個方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()來實現(xiàn)任務串行化,但是這些簡單的方法遠遠不能滿足我們實際的開發(fā)需要,從.net 4.0開始,類庫給我們提供了很多的類來幫助我們簡化并行計算中復雜的數(shù)據(jù)同步問題。
一、隔離執(zhí)行:不共享數(shù)據(jù),讓每個task都有一份自己的數(shù)據(jù)拷貝。
對數(shù)據(jù)共享問題處理的方式是“分離執(zhí)行”,我們通過把每個Task執(zhí)行完成后的各自計算的值進行最后的匯總,也就是說多個Task之間不存在數(shù)據(jù)共享了,各自做各自的事,完全分離開來。
1、傳統(tǒng)方式
每個Task執(zhí)行時不存在數(shù)據(jù)共享了,每個Task中計算自己值,最后我們匯總每個Task的Result。我們可以通過Task中傳遞的state參數(shù)來進行隔離執(zhí)行:
int Sum = 0; Task<int>[] tasks = new Task<int>[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task<int>((obj) => { var start = (int)obj; for (int j = 0; j < 1000; j++) { start = start + 1; } return start; }, Sum); tasks[i].Start(); } Task.WaitAll(tasks); for (var i = 0; i < 10; i++) { Sum += tasks[i].Result; } Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
2、ThreadLocal類
在.Net中提供了System.Threading.ThreadLocal來創(chuàng)建分離。
ThreadLocal是一種提供線程本地存儲的類型,它可以給每個線程一個分離的實例,來提供每個線程單獨的數(shù)據(jù)結(jié)果。上面的程序我們可以使用TreadLocal:
int Sum = 0; Task<int>[] tasks = new Task<int>[10]; var tl = new ThreadLocal<int>(); for (int i = 0; i < 10; i++) { tasks[i] = new Task<int>((obj) => { tl.Value = (int)obj; for (int j = 0; j < 1000; j++) { tl.Value++; } returntl.Value; }, Sum); tasks[i].Start(); } Task.WaitAll(tasks); for (var i = 0; i < 10; i++) { Sum += tasks[i].Result; } Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
但是我們要注意的一點TreadLocal是針對每個線程的,不是針對每個Task的。一個Tread中可能有多個Task。
ThreadLocal類舉例:
static ThreadLocal<string> local; static void Main() { //創(chuàng)建ThreadLocal并提供默認值 local = new ThreadLocal<string>(() => "hehe"); //修改TLS的線程 Thread th = new Thread(() => { local.Value = "Mgen"; Display(); }); th.Start(); th.Join(); Display(); } //顯示TLS中數(shù)據(jù)值 static void Display() { Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value); }
二、同步類型:通過調(diào)整task的執(zhí)行,有序的執(zhí)行task。
同步類型是一種用來調(diào)度Task訪問臨界區(qū)域的一種特殊類型。在.Net 4.0中提供了多種同步類型給我們使用,主要分為:輕量級的、重量級的和等待處理型的,在下面我們會介紹常用的同步處理類型。
常用的同步類型
首先來看看.Net 4.0中常見的幾種同步類型以及處理的相關(guān)問題:
同步類型以及解決問題
- lock關(guān)鍵字、Montor類、SpinLock類:有序訪問臨界區(qū)域
- Interlocked類:數(shù)值類型的增加或則減少
- Mutex類:交叉同步
- WaitAll方法:同步多個鎖定(主要是Task之間的調(diào)度)
- 申明性的同步(如Synchronization):使類中的所有的方法同步
1、Lock鎖
其實最簡單同步類型的使用辦法就是使用lock關(guān)鍵字。在使用lock關(guān)鍵字時,首先我們需要創(chuàng)建一個鎖定的object,而且這個object需要所有的task都能訪問,其次能我們需要將我們的臨界區(qū)域包含在lock塊中。我們之前例子中代碼可以這樣加上lock:
int Sum = 0; Task[] tasks = new Task[10]; var obj = new Object(); for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { lock (obj) { Sum = Sum + 1; } } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
其實lock關(guān)鍵字是使用Monitor的一種簡短的方式,lock關(guān)鍵字自動通過調(diào)用Monitor.Enter\Monitor.Exit方法來處理獲得鎖以及釋放鎖。
2、Interlocked 聯(lián)鎖
Interlocked通過使用操作系統(tǒng)或則硬件的一些特性提供了一些列高效的靜態(tài)的同步方法。其中主要提供了這些方法:Exchange、Add、Increment、CompareExchange四種類型的多個方法的重載。我們將上面的例子中使用Interlocked:
int Sum = 0; Task[] tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { Interlocked.Increment(ref Sum); } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
3、Mutex互斥體
Mutex也是一個同步類型,在多個線程進行訪問的時候,它只向一個線程授權(quán)共享數(shù)據(jù)的獨立訪問。我們可以通過Mutex中的WaitOne方法來獲取Mutex的所有權(quán),但是同時我們要注意的是,我們在一個線程中多少次調(diào)用過WaitOne方法,就需要調(diào)用多少次ReleaseMutex方法來釋放Mutex的占有。上面的例子我們通過Mutex這樣實現(xiàn):
int Sum = 0; Task[] tasks = new Task[10]; var mutex = new Mutex(); for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { bool lockAcquired = mutex.WaitOne(); try { Sum++; } finally { if (lockAcquired) mutex.ReleaseMutex(); } } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
三、申明性同步
我們可以通過使用Synchronization 特性來標識一個類,從而使一個類型的字段以及方法都實現(xiàn)同步化。在使用Synchronization 時,我們需要將我們的目標同步的類繼承于System.ContextBoundObject類型。我們來看看之前的例子我們同步標識Synchronization 的實現(xiàn):
static void Main(string[] args) { var sum = new SumClass(); Task[] tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { sum.Increment(); } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum()); } [Synchronization] class SumClass : ContextBoundObject { private int _Sum; public void Increment() { _Sum++; } public int GetSum() { return _Sum; } }
四、并發(fā)集合
當多個線程對某個非線程安全容器并發(fā)地進行讀寫操作時,這些操作將導致不可預估的后果或者會導致報錯。為了解決這個問題我們可以使用lock關(guān)鍵字或者Monitor類來給容器上鎖。但鎖的引入使得我們的代碼更加復雜,同時也帶來了更多的同步消耗。而.NET Framework 4提供的線程安全且可拓展的并發(fā)集合能夠使得我們的并行代碼更加容易編寫,此外,鎖的使用次數(shù)的減少也減少了麻煩的死鎖與競爭條件的問題。.NET Framework 4主要提供了如下幾種并發(fā)集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。這些集合通過使用一種叫做比較并交換(compare and swap, CAS)指令和內(nèi)存屏障的技術(shù)來避免使用重量級的鎖。
在.Net 4.0中提供了很多并發(fā)的集合類型來讓我們處理數(shù)據(jù)同步的集合的問題,這里面包括:
1.ConcurrentQueue:提供并發(fā)安全的隊列集合,以先進先出的方式進行操作;
2.ConcurrentStack:提供并發(fā)安全的堆棧集合,以先進后出的方式進行操作;
3.ConcurrentBag:提供并發(fā)安全的一種無序集合;
4.ConcurrentDictionary:提供并發(fā)安全的一種key-value類型的集合。
我們在這里只做ConcurrentQueue的一個嘗試,并發(fā)隊列是一種線程安全的隊列集合,我們可以通過Enqueue()進行排隊、TryDequeue()進行出隊列操作:
for (var j = 0; j < 10; j++) { var queue = new ConcurrentQueue<int>(); var count = 0; for (var i = 0; i < 1000; i++) { queue.Enqueue(i); } var tasks = new Task[10]; for (var i = 0; i < tasks.Length; i++) { tasks[i] = new Task(() => { while (queue.Count > 0) { int item; var isDequeue = queue.TryDequeue(out item); if (isDequeue) Interlocked.Increment(ref count); } }); tasks[i].Start(); } try { Task.WaitAll(tasks); } catch (AggregateException e) { e.Handle((ex) => { Console.WriteLine("Exception Message:{0}", ex.Message); return true; }); } Console.WriteLine("Dequeue items count :{0}", count); }
五、Barrier(屏障同步)
barrier叫做屏障,就像下圖中的“紅色線”,如果我們的屏障設為4個task就認為已經(jīng)滿了的話,那么執(zhí)行中先到的task必須等待后到的task,通知方式也就是barrier.SignalAndWait(),屏障中線程設置操作為new Barrier(4,(i)=>{})。SignalAndWait給我們提供了超時的重載,為了能夠取消后續(xù)執(zhí)行
//四個task執(zhí)行 static Task[] tasks = new Task[4]; static Barrier barrier = null; static void Main(string[] args) { barrier = new Barrier(tasks.Length, (i) => { Console.WriteLine("**********************************************************"); Console.WriteLine("\n屏障中當前階段編號:{0}\n", i.CurrentPhaseNumber); Console.WriteLine("**********************************************************"); }); for (int j = 0; j < tasks.Length; j++) { tasks[j] = Task.Factory.StartNew((obj) => { var single = Convert.ToInt32(obj); LoadUser(single); barrier.SignalAndWait(); LoadProduct(single); barrier.SignalAndWait(); LoadOrder(single); barrier.SignalAndWait(); }, j); } Task.WaitAll(tasks); Console.WriteLine("指定數(shù)據(jù)庫中所有數(shù)據(jù)已經(jīng)加載完畢!"); Console.Read(); } static void LoadUser(int num) { Console.WriteLine("當前任務:{0}正在加載User部分數(shù)據(jù)!", num); } static void LoadProduct(int num) { Console.WriteLine("當前任務:{0}正在加載Product部分數(shù)據(jù)!", num); } static void LoadOrder(int num) { Console.WriteLine("當前任務:{0}正在加載Order部分數(shù)據(jù)!", num); }
到此這篇關(guān)于C#并行編程之Task同步機制的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
解決WPF附加屬性的Set函數(shù)不調(diào)用的問題
這篇文章介紹了解決WPF附加屬性的Set函數(shù)不調(diào)用的方法,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-06-06