C#并行編程之?dāng)?shù)據(jù)并行Tasks.Parallel類
一、并行概念
1、并行編程
在.NET 4中的并行編程是依賴Task Parallel Library(后面簡稱為TPL) 實(shí)現(xiàn)的。在TPL中,最基本的執(zhí)行單元是task(中文可以理解為"任務(wù)"),一個task就代表了你要執(zhí)行的一個操作。你可以為你所要執(zhí)行的每一個操作定義一個task,TPL就負(fù)責(zé)創(chuàng)建線程來執(zhí)行你所定義的task,并且管理線程。TPL是面向task的,自動的;而傳統(tǒng)的多線程是以人工為導(dǎo)向的。
現(xiàn)在已經(jīng)進(jìn)入了多核的時代,我們的程序如何更多的利用好硬件cpu,答案是并行處理。在.net4.0之前我們要開發(fā)并行的程序是非常的困難,在.net4.0中,在命名空間System.Threading.Tasks提供了方便的并行開發(fā)的類庫。
2、數(shù)據(jù)并行
數(shù)據(jù)并行指的是對源集合或數(shù)組的元素同時(即,并行)執(zhí)行相同操作的場景。 在數(shù)據(jù)并行操作中,對源集合進(jìn)行分區(qū),以便多個線程能夠同時在不同的網(wǎng)段上操作。
任務(wù)并行庫 (TPL) 支持通過 System.Threading.Tasks.Parallel 類實(shí)現(xiàn)的數(shù)據(jù)并行。 此類對 for 循環(huán)和 foreach 循環(huán)提供了基于方法的并行執(zhí)行。你為Parallel.For 或 Parallel.ForEach 循環(huán)編寫的循環(huán)邏輯與編寫連續(xù)循環(huán)的相似。 無需創(chuàng)建線程或列工作項(xiàng)。 在基本循環(huán)中,不需要加鎖。TPL 為你處理所有低級別的工作。
Parallel.For()和Parallel.ForEach()方法多次調(diào)用同一個方法,而Parallel.Invoke()方法允許同時調(diào)用不同的方法。
二、Parallel.Invoke():并行調(diào)用多個任務(wù) 。
例1:同時調(diào)用2個任務(wù)
static void Main(string[] args) { var watch = Stopwatch.StartNew(); Parallel.Invoke(Run1, Run2); watch.Stop(); Console.WriteLine("我是并行開發(fā),總共耗時:{0}", watch.ElapsedMilliseconds) } static void Run1() { Console.WriteLine("我是任務(wù)一,我跑了3s"); Thread.Sleep(3000); } static void Run2() { Console.WriteLine("我是任務(wù)二,我跑了5s"); Thread.Sleep(5000); }
例2:說明并不是每個任務(wù)一個線程。
// 定義一個線程局部變量,返回其線程名 ThreadLocal<string> ThreadName = new ThreadLocal<string>(() => { return "Thread" + Thread.CurrentThread.ManagedThreadId; }); // 打印出當(dāng)前線程名的方法。 Action action = () => { // 如果 ThreadName.IsValueCreated 為true,在這個線程上不是第一次運(yùn)行這個方法。 bool repeat = ThreadName.IsValueCreated; Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : ""); }; // 調(diào)用8個方法,你應(yīng)該會看到一些重復(fù)的線程名 Parallel.Invoke(action, action, action, action, action, action, action, action); ThreadName.Dispose();
三、Parallel.For(): for 循環(huán)的并行運(yùn)算
我們知道串行代碼中也有一個for,但是那個for并沒有用到多核,而Paraller.for它會在底層根據(jù)硬件線程的運(yùn)行狀況來充分的使用所有的可利用的硬件線程,注意這里的Parallel.for的步行是1。
在For()方法中,前兩個參數(shù)定義了循環(huán)的開頭和結(jié)束。示例從0迭代到9。第3個參數(shù)是一個 Action<int>委托。整數(shù)參數(shù)是循環(huán)的迭代次數(shù),該參數(shù)被傳遞給Action < int >委托引用的方法。 Parallel.For方法的返回類型是ParallelLoopResult結(jié)構(gòu),它提供了循環(huán)是否結(jié)束的信息。
ParallelLoopResult result = Parallel.For(0, 10, i => { Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId); Thread.Sleep(10); }); Console.WriteLine(result.IsCompleted);
首先先寫一個普通的循環(huán):
private void NormalFor() { for (var i = 0; i < 10000; i++) { for (var j = 0; j < 1000; j++) { for (var k = 0; k < 100; k++) { DoSomething(); } } } }
再看一個并行的For語句:
private void ParallelFor() { Parallel.For(0, 10000, i => { for (int j = 0; j < 1000; j++) { for (var k = 0; k < 100; k++) { DoSomething(); } } }); }
上面的例子中,只是將最外層的For語句替換成了Parallel.For,Parallel執(zhí)行速度可以提高近一倍。
四、Parallel.ForEach():foreach 循環(huán)的并行運(yùn)算
private void NormalForeach() { foreach (var file in GetFiles()) { DoSomething(); } } private void ParallelForeach() { Parallel.ForEach(GetFiles(), file => { DoSomething(); }); }
ForEach的使用跟For使用幾乎是差不多了,只是在對非泛型的Collection進(jìn)行操作的時候,需要通過Cast方法進(jìn)行轉(zhuǎn)換。
ForEach的獨(dú)到之處就是可以將數(shù)據(jù)進(jìn)行分區(qū),每一個小區(qū)內(nèi)實(shí)現(xiàn)串行計算,分區(qū)采用Partitioner.Create實(shí)現(xiàn)。
for (int j = 1; j < 4; j++) { Console.WriteLine("\n第{0}次比較", j); ConcurrentBag<int> bag = new ConcurrentBag<int>(); var watch = Stopwatch.StartNew(); watch.Start(); for (int i = 0; i < 3000000; i++) { bag.Add(i); } Console.WriteLine("串行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); bag = new ConcurrentBag<int>(); watch = Stopwatch.StartNew(); watch.Start(); Parallel.ForEach(Partitioner.Create(0, 3000000), i => { for (int m = i.Item1; m < i.Item2; m++) { bag.Add(m); } }); Console.WriteLine("并行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); }
五、線程局部變量
下面這段代碼多次運(yùn)行每次的結(jié)果都不一樣,因?yàn)閠otal變量是公共的,而我們的程序是多個線程的加,而多個線程之間是不能把數(shù)據(jù)共享的。
public void NormalParallelTest() { int[] nums = Enumerable.Range(0, 1000000).ToArray(); long total = 0; Parallel.For(0,nums.Length,i=> { total += nums[i]; }); Console.WriteLine("The total is {0}", total); }
其實(shí)我們需要的是在每個線程中計算出一個和值,然后再進(jìn)行累加。我們來看看線程局部變量:
泛型方法Parallel.For<T>的原型:
public static ParallelLoopResult For<TLocal> (int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally );
- TLocal:線程變量的類型;第一個、第二個參數(shù)就不必多說了,就是起始值跟結(jié)束值。
- localInit:每個線程的線程局部變量初始值的設(shè)置;
- body:每次循環(huán)執(zhí)行的方法,其中方法的最后一個參數(shù)就是線程局部變量;
- localFinally:每個線程之后執(zhí)行的方法。
1、Parallel.For中定義局部變量:
從2開始,累加2個,得49.
int[] nums = Enumerable.Range(0, 10).ToArray(); long total = 0; Parallel.For<long>(0, nums.Length, () => { return 2; }, (j, loop, subtotal) =>//1、每次循環(huán)執(zhí)行的方法 { subtotal += nums[j]; Console.WriteLine("主體: thread {1}, task {2},結(jié)果:{0}", j+ ":" +nums[j] + "-" + subtotal, Thread.CurrentThread.ManagedThreadId, Task.CurrentId); return subtotal; }, (x) =>//2、每個線程執(zhí)行之后執(zhí)行的方法 { Console.WriteLine(" 最終執(zhí)行:thread {1}, task {2},結(jié)果:{0} ", x, Thread.CurrentThread.ManagedThreadId, Task.CurrentId); Interlocked.Add(ref total, x); }); Console.WriteLine("The total is {0}", total);
2、Parallel.Each中定義局部變量:
要注意的是,我們必須要使用ForEach<TSource, TLocal>,因?yàn)榈谝粋€參數(shù)表示的是迭代源的類型,第二個表示的是線程局部變量的類型,其方法的參數(shù)跟For是差不多的。
public void ForeachThreadLocalTest() { int[] nums = Enumerable.Range(0, 1000000).ToArray(); long total = 0; Parallel.ForEach<int,long>(nums,()=>0, (member,loopState,subTotal)=>//1、每次循環(huán)執(zhí)行的方法 { subTotal += member; return subTotal; }, (perLocal)=>//2、每個線程執(zhí)行之后執(zhí)行的方法 Interlocked.Add(ref total,perLocal) ); Console.WriteLine("The total is {0}", total); }
六、Break、Stop中斷與停止線程
在并行循環(huán)的委托參數(shù)中提供了一個ParallelLoopState,該實(shí)例提供了Break和Stop方法來幫我們實(shí)現(xiàn)。
- Break“中斷”:表示完成當(dāng)前線程上當(dāng)前迭代之前的所有線程上的所有迭代,然后退出循環(huán)。(比如并行計算正在迭代100,那么break后程序還會迭代所有小于100的。)
- Stop“停止”:表示在方便的情況下盡快停止所有迭代。(比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。)
首先我們可以看到在Parallel.For的一個重載方法:
public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState > body)
在委托的最后一個參數(shù)類型為ParallelLoopState,而ParallelLoopState里面提供給我們兩個方法:Break、Stop來終止迭代。
private void StopLoop() { var Stack = new ConcurrentStack<string>(); Parallel.For(0, 10000, (i, loopState ) => { if (i < 1000) Stack.Push(i.ToString()); else { loopState.Stop(); return; } }); Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count); }
七、Cancel取消循環(huán)
在并行的循環(huán)中支持通過傳遞ParallelOptions參數(shù)中的CancellationToken進(jìn)行取消循環(huán)的控制,我們可以CancellationTokenSource實(shí)例化之后傳遞給ParallelOptions對象Cancellation值。下面來看個示例:
在For循環(huán)的實(shí)現(xiàn)代碼內(nèi)部,Parallel類驗(yàn)證CancellationToken 的結(jié)果,并取消操作。一旦取消操作,F(xiàn)or()方法就拋出個OperationCanceledException類型的異常,這是本例捕獲的異常。使用 CancellationTokeri可以注冊取消操作時的信息。為此,需要調(diào)用Register方法,并傳遞一個在取消 操作時調(diào)用的委托。
var cts = new CancellationTokenSource(); cts.Token.Register(() =>Console.WriteLine("*** token canceled")); // start a task that sends a cancel after 500 ms new Task(() => { Thread.Sleep(500); cts.Cancel(false); }).Start(); try { ParallelLoopResult result = Parallel.For(0, 100, new <strong>ParallelOptions</strong>() { CancellationToken = cts.Token, }, x => { Console.WriteLine("loop {0} started", x); int sum = 0; for (int i = 0; i < 100; i++) { Thread.Sleep(2); sum += i; } Console.WriteLine("loop {0} finished", x); }); } catch (OperationCanceledException ex) { Console.WriteLine(ex.Message); }
八、Handel Exceptions異常處理
在處理并行循環(huán)的異常的與順序循環(huán)異常的處理是有所不同的,并行循環(huán)里面可能會一個異常在多個循環(huán)中出現(xiàn),或則一個線程上的異常導(dǎo)致另外一個線程上也出現(xiàn)異常。比較好的處理方式就是,首先獲取所有的異常最后通過AggregateException來包裝所有的循環(huán)的異常,循環(huán)結(jié)束后進(jìn)行throw。看一段示例代碼:
private void HandleNumbers(int[] numbers) { var exceptions = new ConcurrentQueue<Exception>(); Parallel.For(0, numbers.Length, i => { try { if (numbers[i] > 10 && numbers[i] < 20) { throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i)); } } catch (Exception e) { exceptions.Enqueue(e); } }); if (exceptions.Count > 0) throw new AggregateException(exceptions); }
測試方法:
public void HandleExceptions() { var numbers = Enumerable.Range(0, 10000).ToArray(); try { this.HandleNumbers(numbers); } catch(AggregateException exceptions) { foreach (var ex in exceptions.InnerExceptions) { Console.WriteLine(ex.Message); } } }
對上面的方法說明下,在HandleNumbers方法中,就是一個小的demo如果元素的值出現(xiàn)在10-20之間就拋出異常。在上面我們的處理方法就是:在循環(huán)時通過隊列將所有的異常都集中起來,循環(huán)結(jié)束后來拋出一個AggregateException。
到此這篇關(guān)于C#并行編程之?dāng)?shù)據(jù)并行Tasks.Parallel類的文章就介紹到這了。希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
C#窗體實(shí)現(xiàn)點(diǎn)餐系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了C#窗體實(shí)現(xiàn)點(diǎn)餐系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-08-08C# WebApi+Webrtc局域網(wǎng)音視頻通話實(shí)例
這篇文章主要為大家詳細(xì)介紹了C# WebApi+Webrtc局域網(wǎng)音視頻通話實(shí)例,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-07-07協(xié)定需要會話,但是綁定“BasicHttpBinding”不支持它或者因配置不正確而無法支持它
在IIS7及以上版本服務(wù)器中提供了基于WAS的無.SVC文件的WCF服務(wù)激活功能,能夠提供基于HTTP和非HTTP協(xié)議的訪問,通過添加Windows Server AppFabric可以更方便的管理WCF服務(wù)2012-12-12C#+RedisSearch實(shí)現(xiàn)高性能全文搜索
Redis?Search是一個Redis模塊,它使用壓縮的倒排索引來實(shí)現(xiàn)快速的索引和低內(nèi)存占用,本文主要介紹了C#如何使用RedisSearch實(shí)現(xiàn)高性能全文搜索,希望對大家有所幫助2023-07-07unity使用socket編程實(shí)現(xiàn)聊天室功能
這篇文章主要為大家詳細(xì)介紹了unity使用socket編程實(shí)現(xiàn)聊天室功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-11-11詳解Unity使用ParticleSystem粒子系統(tǒng)模擬藥水在血管中流動(粒子碰撞)
這篇文章主要介紹了Unity使用ParticleSystem粒子系統(tǒng)模擬藥水在血管中流動(粒子碰撞),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-05-05將ocx文件轉(zhuǎn)換成C#程序引用的DLL文件的辦法
將ocx文件轉(zhuǎn)換成C#程序引用的DLL文件的辦法,需要的朋友可以參考一下2013-03-03