C#并行編程之Task同步機(jī)制
在并行計(jì)算中,不可避免的會(huì)碰到多個(gè)任務(wù)共享變量,實(shí)例,集合。雖然task自帶了兩個(gè)方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()來實(shí)現(xiàn)任務(wù)串行化,但是這些簡單的方法遠(yuǎn)遠(yuǎn)不能滿足我們實(shí)際的開發(fā)需要,從.net 4.0開始,類庫給我們提供了很多的類來幫助我們簡化并行計(jì)算中復(fù)雜的數(shù)據(jù)同步問題。
一、隔離執(zhí)行:不共享數(shù)據(jù),讓每個(gè)task都有一份自己的數(shù)據(jù)拷貝。
對數(shù)據(jù)共享問題處理的方式是“分離執(zhí)行”,我們通過把每個(gè)Task執(zhí)行完成后的各自計(jì)算的值進(jìn)行最后的匯總,也就是說多個(gè)Task之間不存在數(shù)據(jù)共享了,各自做各自的事,完全分離開來。
1、傳統(tǒng)方式
每個(gè)Task執(zhí)行時(shí)不存在數(shù)據(jù)共享了,每個(gè)Task中計(jì)算自己值,最后我們匯總每個(gè)Task的Result。我們可以通過Task中傳遞的state參數(shù)來進(jìn)行隔離執(zhí)行:
int Sum = 0; Task<int>[] tasks = new Task<int>[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task<int>((obj) => { var start = (int)obj; for (int j = 0; j < 1000; j++) { start = start + 1; } return start; }, Sum); tasks[i].Start(); } Task.WaitAll(tasks); for (var i = 0; i < 10; i++) { Sum += tasks[i].Result; } Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
2、ThreadLocal類
在.Net中提供了System.Threading.ThreadLocal來創(chuàng)建分離。
ThreadLocal是一種提供線程本地存儲(chǔ)的類型,它可以給每個(gè)線程一個(gè)分離的實(shí)例,來提供每個(gè)線程單獨(dú)的數(shù)據(jù)結(jié)果。上面的程序我們可以使用TreadLocal:
int Sum = 0; Task<int>[] tasks = new Task<int>[10]; var tl = new ThreadLocal<int>(); for (int i = 0; i < 10; i++) { tasks[i] = new Task<int>((obj) => { tl.Value = (int)obj; for (int j = 0; j < 1000; j++) { tl.Value++; } returntl.Value; }, Sum); tasks[i].Start(); } Task.WaitAll(tasks); for (var i = 0; i < 10; i++) { Sum += tasks[i].Result; } Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
但是我們要注意的一點(diǎn)TreadLocal是針對每個(gè)線程的,不是針對每個(gè)Task的。一個(gè)Tread中可能有多個(gè)Task。
ThreadLocal類舉例:
static ThreadLocal<string> local; static void Main() { //創(chuàng)建ThreadLocal并提供默認(rèn)值 local = new ThreadLocal<string>(() => "hehe"); //修改TLS的線程 Thread th = new Thread(() => { local.Value = "Mgen"; Display(); }); th.Start(); th.Join(); Display(); } //顯示TLS中數(shù)據(jù)值 static void Display() { Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value); }
二、同步類型:通過調(diào)整task的執(zhí)行,有序的執(zhí)行task。
同步類型是一種用來調(diào)度Task訪問臨界區(qū)域的一種特殊類型。在.Net 4.0中提供了多種同步類型給我們使用,主要分為:輕量級的、重量級的和等待處理型的,在下面我們會(huì)介紹常用的同步處理類型。
常用的同步類型
首先來看看.Net 4.0中常見的幾種同步類型以及處理的相關(guān)問題:
同步類型以及解決問題
- lock關(guān)鍵字、Montor類、SpinLock類:有序訪問臨界區(qū)域
- Interlocked類:數(shù)值類型的增加或則減少
- Mutex類:交叉同步
- WaitAll方法:同步多個(gè)鎖定(主要是Task之間的調(diào)度)
- 申明性的同步(如Synchronization):使類中的所有的方法同步
1、Lock鎖
其實(shí)最簡單同步類型的使用辦法就是使用lock關(guān)鍵字。在使用lock關(guān)鍵字時(shí),首先我們需要?jiǎng)?chuàng)建一個(gè)鎖定的object,而且這個(gè)object需要所有的task都能訪問,其次能我們需要將我們的臨界區(qū)域包含在lock塊中。我們之前例子中代碼可以這樣加上lock:
int Sum = 0; Task[] tasks = new Task[10]; var obj = new Object(); for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { lock (obj) { Sum = Sum + 1; } } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
其實(shí)lock關(guān)鍵字是使用Monitor的一種簡短的方式,lock關(guān)鍵字自動(dòng)通過調(diào)用Monitor.Enter\Monitor.Exit方法來處理獲得鎖以及釋放鎖。
2、Interlocked 聯(lián)鎖
Interlocked通過使用操作系統(tǒng)或則硬件的一些特性提供了一些列高效的靜態(tài)的同步方法。其中主要提供了這些方法:Exchange、Add、Increment、CompareExchange四種類型的多個(gè)方法的重載。我們將上面的例子中使用Interlocked:
int Sum = 0; Task[] tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { Interlocked.Increment(ref Sum); } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
3、Mutex互斥體
Mutex也是一個(gè)同步類型,在多個(gè)線程進(jìn)行訪問的時(shí)候,它只向一個(gè)線程授權(quán)共享數(shù)據(jù)的獨(dú)立訪問。我們可以通過Mutex中的WaitOne方法來獲取Mutex的所有權(quán),但是同時(shí)我們要注意的是,我們在一個(gè)線程中多少次調(diào)用過WaitOne方法,就需要調(diào)用多少次ReleaseMutex方法來釋放Mutex的占有。上面的例子我們通過Mutex這樣實(shí)現(xiàn):
int Sum = 0; Task[] tasks = new Task[10]; var mutex = new Mutex(); for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { bool lockAcquired = mutex.WaitOne(); try { Sum++; } finally { if (lockAcquired) mutex.ReleaseMutex(); } } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
三、申明性同步
我們可以通過使用Synchronization 特性來標(biāo)識(shí)一個(gè)類,從而使一個(gè)類型的字段以及方法都實(shí)現(xiàn)同步化。在使用Synchronization 時(shí),我們需要將我們的目標(biāo)同步的類繼承于System.ContextBoundObject類型。我們來看看之前的例子我們同步標(biāo)識(shí)Synchronization 的實(shí)現(xiàn):
static void Main(string[] args) { var sum = new SumClass(); Task[] tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { sum.Increment(); } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum()); } [Synchronization] class SumClass : ContextBoundObject { private int _Sum; public void Increment() { _Sum++; } public int GetSum() { return _Sum; } }
四、并發(fā)集合
當(dāng)多個(gè)線程對某個(gè)非線程安全容器并發(fā)地進(jìn)行讀寫操作時(shí),這些操作將導(dǎo)致不可預(yù)估的后果或者會(huì)導(dǎo)致報(bào)錯(cuò)。為了解決這個(gè)問題我們可以使用lock關(guān)鍵字或者M(jìn)onitor類來給容器上鎖。但鎖的引入使得我們的代碼更加復(fù)雜,同時(shí)也帶來了更多的同步消耗。而.NET Framework 4提供的線程安全且可拓展的并發(fā)集合能夠使得我們的并行代碼更加容易編寫,此外,鎖的使用次數(shù)的減少也減少了麻煩的死鎖與競爭條件的問題。.NET Framework 4主要提供了如下幾種并發(fā)集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。這些集合通過使用一種叫做比較并交換(compare and swap, CAS)指令和內(nèi)存屏障的技術(shù)來避免使用重量級的鎖。
在.Net 4.0中提供了很多并發(fā)的集合類型來讓我們處理數(shù)據(jù)同步的集合的問題,這里面包括:
1.ConcurrentQueue:提供并發(fā)安全的隊(duì)列集合,以先進(jìn)先出的方式進(jìn)行操作;
2.ConcurrentStack:提供并發(fā)安全的堆棧集合,以先進(jìn)后出的方式進(jìn)行操作;
3.ConcurrentBag:提供并發(fā)安全的一種無序集合;
4.ConcurrentDictionary:提供并發(fā)安全的一種key-value類型的集合。
我們在這里只做ConcurrentQueue的一個(gè)嘗試,并發(fā)隊(duì)列是一種線程安全的隊(duì)列集合,我們可以通過Enqueue()進(jìn)行排隊(duì)、TryDequeue()進(jìn)行出隊(duì)列操作:
for (var j = 0; j < 10; j++) { var queue = new ConcurrentQueue<int>(); var count = 0; for (var i = 0; i < 1000; i++) { queue.Enqueue(i); } var tasks = new Task[10]; for (var i = 0; i < tasks.Length; i++) { tasks[i] = new Task(() => { while (queue.Count > 0) { int item; var isDequeue = queue.TryDequeue(out item); if (isDequeue) Interlocked.Increment(ref count); } }); tasks[i].Start(); } try { Task.WaitAll(tasks); } catch (AggregateException e) { e.Handle((ex) => { Console.WriteLine("Exception Message:{0}", ex.Message); return true; }); } Console.WriteLine("Dequeue items count :{0}", count); }
五、Barrier(屏障同步)
barrier叫做屏障,就像下圖中的“紅色線”,如果我們的屏障設(shè)為4個(gè)task就認(rèn)為已經(jīng)滿了的話,那么執(zhí)行中先到的task必須等待后到的task,通知方式也就是barrier.SignalAndWait(),屏障中線程設(shè)置操作為new Barrier(4,(i)=>{})。SignalAndWait給我們提供了超時(shí)的重載,為了能夠取消后續(xù)執(zhí)行
//四個(gè)task執(zhí)行 static Task[] tasks = new Task[4]; static Barrier barrier = null; static void Main(string[] args) { barrier = new Barrier(tasks.Length, (i) => { Console.WriteLine("**********************************************************"); Console.WriteLine("\n屏障中當(dāng)前階段編號(hào):{0}\n", i.CurrentPhaseNumber); Console.WriteLine("**********************************************************"); }); for (int j = 0; j < tasks.Length; j++) { tasks[j] = Task.Factory.StartNew((obj) => { var single = Convert.ToInt32(obj); LoadUser(single); barrier.SignalAndWait(); LoadProduct(single); barrier.SignalAndWait(); LoadOrder(single); barrier.SignalAndWait(); }, j); } Task.WaitAll(tasks); Console.WriteLine("指定數(shù)據(jù)庫中所有數(shù)據(jù)已經(jīng)加載完畢!"); Console.Read(); } static void LoadUser(int num) { Console.WriteLine("當(dāng)前任務(wù):{0}正在加載User部分?jǐn)?shù)據(jù)!", num); } static void LoadProduct(int num) { Console.WriteLine("當(dāng)前任務(wù):{0}正在加載Product部分?jǐn)?shù)據(jù)!", num); } static void LoadOrder(int num) { Console.WriteLine("當(dāng)前任務(wù):{0}正在加載Order部分?jǐn)?shù)據(jù)!", num); }
到此這篇關(guān)于C#并行編程之Task同步機(jī)制的文章就介紹到這了。希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
解決WPF附加屬性的Set函數(shù)不調(diào)用的問題
這篇文章介紹了解決WPF附加屬性的Set函數(shù)不調(diào)用的方法,文中通過示例代碼介紹的非常詳細(xì)。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06