C#并行編程之Task任務(wù)
任務(wù),基于線程池。其使我們對并行編程變得更簡單,且不用關(guān)心底層是怎么實現(xiàn)的。
System.Threading.Tasks.Task類是Task Programming Library(TPL)中最核心的一個類。
一、任務(wù)與線程
1:任務(wù)是架構(gòu)在線程之上的,也就是說任務(wù)最終還是要拋給線程去執(zhí)行。
2:任務(wù)跟線程不是一對一的關(guān)系,比如開10個任務(wù)并不是說會開10個線程,這一點任務(wù)有點類似線程池,但是任務(wù)相比線程池有很小的開銷和精確的控制。
我們用VS里面的“并行任務(wù)”看一看,快捷鍵Ctrl+D,K,或者找到“調(diào)試"->"窗口“->"并行任務(wù)“,我們在WaitAll方法處插入一個斷點,最終我們發(fā)現(xiàn)任務(wù)確實托管給了線程。
二、初識Task
兩種構(gòu)建Task的方式,只是StartNew方法直接構(gòu)建出了一個Task之后又調(diào)用了其Start方法。
Task.Factory.StartNew (() => { Console.WriteLine("Hello word!"); }); Task task = new Task (() => { Console.WriteLine("Hello,Word!"); }); task.Start();
在Task內(nèi)部執(zhí)行的內(nèi)容我們稱作為Task的Body,Task提供了多個初始化重載的方法。
public Task(Action action); public Task(Action<object> action, object state );給action傳參數(shù) public Task(Action action, CancellationToken cancellationToken); public Task(Action action, TaskCreationOptions creationOptions);
例如使用了重載方法的State參數(shù):
Task task2 = new Task((obj ) => { Console.WriteLine("Message: {0}", obj); }, "Say \"Hello\" from task2"); task2.Start(); }
補充細(xì)節(jié)
在創(chuàng)建Task的時候,Task有很多的構(gòu)造函數(shù)的重載,一個主要的重載就是傳入TaskCreateOptions的枚舉:
- TaskCreateOptions.None:用默認(rèn)的方式創(chuàng)建一個Task
- TaskCreateOptions.PreferFairness:請求scheduler盡量公平的執(zhí)行Task(Task和線程一樣,有優(yōu)先級的)
- TaskCreateOptions.LongRunning:聲明Task將會長時間的運行。
- TaskCreateOptions.AttachToParent:因為Task是可以嵌套的,所以這個枚舉就是把一個子task附加到一個父task中。
三、任務(wù)的結(jié)果
任務(wù)結(jié)束時,它可以把一些有用的狀態(tài)信總寫到共享對象中。這個共享對象必須是線程安全的。
另一個方式是使用返回某個結(jié)果的任務(wù)。使用Task類的泛型版本,就可以定義返冋某個結(jié)果的任務(wù)的返回類型。
使用返回值的Result屬性可獲取是在一個Task運行完成才會獲取的,所以task2是在task1運行完成后,才開始運行,也就是說上面的兩個result的值不管運行多少次都是不會變的。其中我們也可以通過CurrentId來獲取當(dāng)前運行的Task的編號。
var loop = 0; var task1 = new Task<int>(() => { for (var i = 0; i < 1000; i++) loop += i; return loop; }); task1.Start(); var loopResut = task1.Result; var task2 = new Task<long>(obj=> { long res = 0; var looptimes = (int)obj; for (var i = 0; i < looptimes; i++) res += i; return res; },loopResut); task2.Start(); var resultTask2 = task2.Result; Console.WriteLine("任務(wù)1的結(jié)果':{0}\n任務(wù)2的結(jié)果:{1}", loopResut,resultTask2);
.NET 4.5 :Task.Run
在 .NET Framework 4.5 及更高版本(包括 .NET Core 和 .NET Standard)中,使用靜態(tài) Task.Run 方法作為 TaskFactory.StartNew 的快捷方式。
Task.Run的跟Task.Factory.StarNew和new Task相差不多,不同的是前兩種是放進線程池立即執(zhí)行,而Task.Run則是等線程池空閑后在執(zhí)行。
Run方法只接受無參的Action和Func委托,另外兩個接受一個object類型的參數(shù)。
在msdn中TaskFactory.StartNew的備注信息如下:
四、連續(xù)任務(wù)
所謂的延續(xù)的Task就是在第一個Task完成后自動啟動下一個Task。我們通過ContinueWith方法來創(chuàng)建延續(xù)的Task。我們假設(shè)有一個接受xml解析的服務(wù),首先從某個地方接受文件,然后解析入庫,最后發(fā)送是否解析正確的回執(zhí)。在每次調(diào)用ContinueWith方法時,每次會把上次Task的引用傳入進來,以便檢測上次Task的狀態(tài),比如我們可以使用上次Task的Result屬性來獲取返回值。
var ReceiveTask = new Task(() => ReceiveXml()); var ResolveTask = ReceiveTask .ContinueWith <bool>((r) => ResolveXml()); var SendFeedBackTask = ResolveTask.ContinueWith <string>((s) => SendFeedBack(s.Result)); ReceiveTask.Start(); Console.WriteLine(SendFeedBackTask.Result);
上面的代碼我們也可以這么寫:
var SendFeedBackTask = Task.Factory.StartNew(() => ReceiveXml()) .ContinueWith<bool>(s => ResolveXml()) .ContinueWith<string>(r => SendFeedBack(r.Result)); Console.WriteLine(SendFeedBackTask.Result);
無論前一個任務(wù)是如何結(jié)束的,前面的連續(xù)任務(wù)總是在前一個任務(wù)結(jié)束時啟動。使用 TaskContinuationOptions枚舉中的值,可以指定,連續(xù)任務(wù)只有在起始任務(wù)成功(或失敗)結(jié)束吋啟動。可能的值是 OnlyOnFaulted、NotOoFaulted、Onl)OnCanceIed、NotOnCanceled 和 OnlyOnRanToCompletion
Task t5 = t1.ContinueWith(DoOnError, TaskContinuationOptions.OnlyOnFaulted);
五、分離嵌套任務(wù)
有些情況下我們需要創(chuàng)建嵌套的Task,嵌套里面又分為分離的和不分離的。其創(chuàng)建的方式很簡單,就是在Task的body里面創(chuàng)建一個新的Task。如果新的Task未指定AttachedToParent選項,那么就是分離嵌套的。我們看下面這段代碼。下面的代碼中outTask.Wait()表示等待outTask執(zhí)行完成。
var outTask = Task.Factory.StartNew(() => { Console.WriteLine("Outer task beginning..."); var childTask = Task.Factory.StartNew(() => { Thread.SpinWait(3000000); Console.WriteLine("Detached nested task completed."); }); }); outTask.Wait(); Console.WriteLine("Outer task completed."); Console.ReadKey();
我們可以看到運行結(jié)果是:
六、子任務(wù)
我們將上面的代碼加上TaskCreationOptions選項:
如果父任務(wù)在子任務(wù)之前結(jié)束,父任務(wù)的狀態(tài)就顯示為WaitingForChildrenToComplete。只要子任務(wù)也結(jié)束時,父任務(wù)的狀態(tài)就變成RanToCompletion。.、當(dāng)然,如果父任務(wù)用TaskCreatiooOptions 枚舉中的DetachedFromParent創(chuàng)建子任務(wù)時,這就無效。
var outTask = Task.Factory.StartNew(() => { Console.WriteLine("Outer task beginning..."); var childTask = Task.Factory.StartNew(() => { Thread.SpinWait(3000000); Console.WriteLine("Detached nested task completed."); },TaskCreationOptions.AttachedToParent); }); outTask.Wait(); Console.WriteLine("Outer task completed.");
看到運行結(jié)果:
七、取消任務(wù)
在4.0中給我們提供一個“取消標(biāo)記”叫做CancellationTokenSource.Token,在創(chuàng)建task的時候傳入此參數(shù),就可以將主線程和任務(wù)相關(guān)聯(lián)。我們通過cancellation的tokens來取消一個Task。
有點要特別注意的,當(dāng)我們調(diào)用了Cancel()方法之后,.NET Framework不會強制性的去關(guān)閉運行的Task。我們自己必須去檢測之前在創(chuàng)建Task時候傳入的那個CancellationToken。
一旦cancel被調(diào)用,task將會拋出OperationCanceledException來中斷此任務(wù)的執(zhí)行,最后將當(dāng)前task的Status的IsCanceled屬性設(shè)為true。
1、在很多Task的Body里面包含循環(huán),我們可以在輪詢的時候判斷IsCancellationRequested屬性是否為True,如果是True的話,就可以停止循環(huán)以及釋放資源,同時拋出OperationCanceledException異常出來。
2、或者在任務(wù)中設(shè)置“取消信號“叫做ThrowIfCancellationRequested,來等待主線程使用Cancel來通知。
3、檢測task是否被cancel就是調(diào)用CancellationToken.WaitHandle屬性。CancellationToken的WaitOne()方法會阻止task的運行,只有CancellationToken的cancel()方法被調(diào)用后,這種阻止才會釋放。
var cts = new CancellationTokenSource(); var ct = cts.Token; var task = Task.Factory.StartNew(() => { for (var i = 0; i < 10000000; i++) { if (ct.IsCancellationRequested) { Console.WriteLine("任務(wù)開始取消..."); throw new OperationCanceledException(ct); } //或者直接在檢測到異常時,扔出異常: token.ThrowIfCancellationRequested(); //或者等待 WaitHandle: token.WaitHandle.WaitOne(); } },ct);//傳入CancellationToken作為Task第二個參數(shù) ct.Register(() => { Console.WriteLine("已經(jīng)取消"); }); Thread.Sleep(5000); cts.Cancel();//如果想要取消一個Task的運行,只要調(diào)用CancellationToken實例的Cancel()方法就可以了。 try { task.Wait(); } catch (AggregateException e) { foreach (var v in e.InnerExceptions) Console.WriteLine("msg: " + v.Message); }
八、休眠:等待時間執(zhí)行
在TPL中我們可以通過三種方式進行等待,一是通過CancellTaken的WaitHanle進行等待、第二種則是通過傳統(tǒng)的Tread.Sleep方法、第三種則通過Thread.SpainWait方法。
1、CancellToken方式:每次我們等待十秒鐘之后,再進行下次輸出。
有一點要注意:WaitOne()方法只有在設(shè)定的時間間隔到了,或者Cancel方法被調(diào)用,此時task才會被喚醒。如果如果cancel()方法被調(diào)用而導(dǎo)致task被喚醒,那么CancellationToken.WaitHandle.WaitOne()方法就會返回true,如果是因為設(shè)定的時間到了而導(dǎo)致task喚醒,那么CancellationToken.WaitHandle.WaitOne()方法返回false。
var cts = new CancellationTokenSource(); var ct = cts.Token; var task = new Task(() => { for (var i = 0; i < 100000; i++) { var cancelled = ct.WaitHandle.WaitOne(1000 ); Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled); if (cancelled) { throw new OperationCanceledException(ct); } } }, ct); task.Start();
2、上面的功能如果我們要是通過Tread.Sleep方式實現(xiàn):
var task = new Task(() => { for (var i = 0; i < 100000; i++) { Thread.Sleep(10000); var cancelled =ct.IsCancellationRequested; Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled); if (cancelled) { throw new OperationCanceledException(ct); } } },ct);
3、Thread.SpainWait則跟上面兩種方式完全不同,上面的兩種方式都是會在線程調(diào)度程序不考慮改線程,直等到運行結(jié)束。而Thread.SpainWait的作用實質(zhì)上會將處理器置于十分緊密的循環(huán)中,主要的作用是來實現(xiàn)同步鎖的作用。并不常用,大部分情況下我們可以通過Lock的方式來實現(xiàn)。
Thread.SpinWait(10000);
九、等待任務(wù)執(zhí)行
在很多時候我們也許需要等待同時開啟的幾個線程完成之后再來做其他事,在TPL中提供了幾種方式來等待任務(wù)執(zhí)行。Task.Wait等待單個任務(wù)完成;Task.WaitAll等待所有的Task完成、TaskAny等在其中的任何一個或則多個任務(wù)完成。
1、Task.Wait: 等待單獨的一個Task執(zhí)行完成
共有5個重載:Wait()、Wait(CancellToken)、Wait(Int32)、Wait(TimeSpan)、Wait(TimeSpan、CancellToken)。各個重載方法的含義:
- 1)Wait():等待整個任務(wù)完成或者取消或者出現(xiàn)異常;
- 2)Wait(CancellToken):等待任務(wù)直到CancellToken調(diào)用取消或者完成,或者出現(xiàn)異常;
- 3)Wait(Int32):等待任務(wù),未完成則到指定的時間;
- 4)Wait(TimeSpan):同上;
- 5)Wait(TimeSpan、CancellToken):等待任務(wù)到指定時間,或者CancellToken調(diào)用取消或者任務(wù)完成。
static void Main(string[] args) { var tokenSource = new CancellationTokenSource(); CancellationToken token = tokenSource.Token; Task task = createTask(token,6); task.Start(); Console.WriteLine("Wait() complete."); task.Wait(); Console.WriteLine("Task Completed."); task = createTask(token,3); task.Start(); Console.WriteLine("Wait(2) secs for task to complete."); bool completed = task.Wait(2000); Console.WriteLine("Wait ended - task completed: {0}", completed); task = createTask(token,4); task.Start(); Console.WriteLine("Wait(2,token) for task to complete."); completed = task.Wait(2000, token); Console.WriteLine("Wait ended - task completed: {0} task cancelled {1}", completed, task.IsCanceled); Console.WriteLine("Main method complete. Press enter to finish."); Console.ReadLine(); } static Task createTask(CancellationToken token,int loop) { return new Task(() => { for (int i = 0; i < loop; i++) { token.ThrowIfCancellationRequested(); Console.WriteLine("Task - Int value {0}", i); token.WaitHandle.WaitOne(1000); } }, token); }
循環(huán)都會等待1秒鐘,這樣我們可以看看Wait(2000)的效果,看看運行后的效果:
從上面的例子可以看出,wait方法子task執(zhí)行完成之后會返回true。
注意:當(dāng)在執(zhí)行的task內(nèi)部拋出了異常之后,這個異常在調(diào)用wait方法時會被再次拋出。后面再"異常處理篇"會講述。
2、Task.WaitAll方法: 等待多個task
是等待所有的任務(wù)完成,也有5個重載, 也可以傳遞時間以及Token參數(shù),進行等待時間以及取消Token的控制。
var tokenSource = new CancellationTokenSource(); CancellationToken token = tokenSource.Token; var task1 = createTask(token,2); var task2 = createTask(token, 5); task1.Start(); task2.Start(); Console.WriteLine("Waiting for tasks to complete."); Task.WaitAll(task1, task2); Console.WriteLine("Tasks Completed.");
注意:如果在等在的多個task之中,有一個task拋出了異常,那么調(diào)用WaitAll()方法時就會拋出異常。
ContinueWith結(jié)合WaitAll來玩一把
當(dāng)這兩者結(jié)合起來,我們就可以玩一些復(fù)雜一點的東西,比如說現(xiàn)在有4個任務(wù),其中t1需要串行,t2-t3可以并行,t4需要串行.
ConcurrentStack<int> stack = new ConcurrentStack<int>(); //t1先執(zhí)行 var t1 = Task.Factory.StartNew(() => { stack.Push(1); stack.Push(2); }); //t2,t3并行執(zhí)行 var t2 = t1. ContinueWith (t => { int result; stack.TryPop(out result); }); //t2,t3并行執(zhí)行 var t3 = t1. ContinueWith (t => { int result; stack.TryPop(out result); }); //等待t2和t3執(zhí)行完 Task.WaitAll(t2, t3); //t4z再執(zhí)行 var t4 = Task.Factory.StartNew(() => { Console.WriteLine("當(dāng)前集合元素個數(shù):" + stack.Count); });
3、Task.WaitAny
等待任何一個任務(wù)完成,完成之后返回其完成的任務(wù)的Index:
var tokenSource = new CancellationTokenSource(); CancellationToken token = tokenSource.Token; var task1 = createTask(token,2); var task2 = createTask(token, 5); task1.Start(); task2.Start(); Console.WriteLine("Waiting for tasks to complete."); var index = Task.WaitAny(task1, task2); Console.WriteLine("Tasks Completed.Index is {0}",index);
十、異常處理
在TPL中,異常的觸發(fā)器主要是這幾個:
Task.Wait(), Task.WaitAll(), Task,WaitAny(),Task.Result。而在TPL出現(xiàn)的異常都會以AggregateException的示例拋出,我們在進行基本的異常處理時,可以通過查看AggregateException的InnerExceptions來進行內(nèi)部異常的捕獲:
var tokenSource = new CancellationTokenSource(); var token = tokenSource.Token; var task1 = new Task(() => { throw new NullReferenceException() { Source ="task1"}; }); var task2 = new Task(() => { throw new ArgumentNullException("a", "a para can not be null") { Source="task2"}; }); task1.Start(); task2.Start(); try { Task.WaitAll(task1, task2); } catch(AggregateException ex) { foreach (Exception inner in ex.InnerExceptions) { Console.WriteLine("Exception type {0} from {1}", inner.GetType(), inner.Source); } }
同時,我們還可以通過Task的幾個屬性來判斷Task的狀態(tài),如:IsCompleted, IsFaulted, IsCancelled,Exception。
另外,AggregateException中還提供了Handle方法來給我們方法來給我們處理每個內(nèi)部 異常,每個異常發(fā)生時都會調(diào)用Handle傳入的delegate ,同時我們需要通過返回True,False來告訴異常是否已經(jīng)被處理,比如對于OperationCanceledException我們知道是取消了Task,是肯定可以處理的:
try { Task.WaitAll(task1, task2, task3, task4); } catch(AggregateException ex) { ex.Handle((e) => { if (e is OperationCanceledException) { return true; } else { return false; } }); }
十一、執(zhí)行晚加載的Task(Lazily Task)
晚加載,或者又名延遲初始化,主要的好處就是避免不必要的系統(tǒng)開銷。在并行編程中,可以聯(lián)合使用Lazy變量和Task<>.Factory.StartNew()做到這點。(Lazy變量時.NET 4中的一個新特性,這里大家不用知道Lazy的具體細(xì)節(jié))。
Lazy變量只有在用到的時候才會被初始化。所以我們可以把Lazy變量和task的創(chuàng)建結(jié)合:只有這個task要被執(zhí)行的時候才去初始化。
// do the same thing in a single statement Lazy<Task<string>> lazyData2 = new Lazy<Task<string>>( () => Task<string>.Factory.StartNew(() => { Console.WriteLine("Task body working..."); return "Task Result"; })); Console.WriteLine("Calling second lazy variable"); Console.WriteLine("Result from task: {0}", lazyData2.Value.Result);
首先我們回想一下,在之前的系列文章中我們是怎么定義一個task的:直接new,或者通過task的factory來創(chuàng)建,因為創(chuàng)建task的代碼是在main函數(shù)中的,所以只要new了一個task,那么這個task就被初始化?,F(xiàn)在如果用了Lazy的task,那么現(xiàn)在我們初始化的就是那個Lazy變量了,而沒有初始化task,(初始化Lazy變量的開銷小于初始化task),只有當(dāng)調(diào)用了lazyData.Value時,Lazy變量中包含的那個task才會初始化。
到此這篇關(guān)于C#并行編程之Task任務(wù)的文章就介紹到這了。希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
C#中的DataSet、string、DataTable、對象轉(zhuǎn)換成Json的實現(xiàn)代碼
這篇文章主要介紹了C#中的DataSet、string、DataTable、對象轉(zhuǎn)換成Json的實現(xiàn)代碼,需要的朋友可以參考下2014-09-09C#中FileSystemWatcher類實現(xiàn)監(jiān)控文件夾
在C#中,如果你想要監(jiān)控一個文件夾內(nèi)文件的變動情況,比如文件的創(chuàng)建、刪除、修改等,你可以使用FileSystemWatcher類,下面就來介紹一下FileSystemWatcher監(jiān)控的使用,感興趣的可以了解一下2024-03-03C#實現(xiàn)將音頻PCM數(shù)據(jù)封裝成wav文件
這篇文章主要為大家詳細(xì)介紹了C#如何實現(xiàn)將音頻PCM數(shù)據(jù)封裝成wav文件,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2023-10-10DevExpress之TreeList用法實例總結(jié)
這篇文章主要介紹了DevExpress之TreeList用法,對于C#初學(xué)者有一定的借鑒價值,需要的朋友可以參考下2014-08-08C#中字段、屬性、只讀、構(gòu)造函數(shù)賦值、反射賦值的問題
這篇文章主要介紹了C#中字段、屬性、只讀、構(gòu)造函數(shù)賦值、反射賦值的問題 ,非常不錯,具有一定的參考借鑒借鑒價值,需要的朋友可以參考下2018-08-08C# 實現(xiàn)視頻監(jiān)控系統(tǒng)(附源碼)
這篇文章主要介紹了C# 如何實現(xiàn)視頻監(jiān)控系統(tǒng),幫助大家更好的理解和使用c#,感興趣的朋友可以了解下2021-02-02