運用示例簡單講解C#取消令牌CancellationTokenSource
前言
相信大家在使用C#進行開發(fā)的時候,特別是使用異步的場景,多多少少會接觸到CancellationTokenSource??疵志椭浪腿∠惒饺蝿?wù)相關(guān)的,而且一看便知大名鼎鼎的CancellationToken就是它生產(chǎn)出來的。不看不知道,一看嚇一跳。它在取消異步任務(wù)、異步通知等方面效果還是不錯的,不僅好用而且夠強大。無論是微軟底層類庫還是開源項目涉及到Task相關(guān)的,基本上都能看到它的身影,而微軟近幾年也是很重視框架中的異步操作,特別是在.NET Core上基本上能看到Task的地方就能看到CancellationTokenSource的身影。這次我們抱著學(xué)習(xí)的態(tài)度,來揭開它的神秘面紗。
簡單示例
相信對于CancellationTokenSource基本的使用,許多同學(xué)已經(jīng)非常熟悉了。不過為了能夠讓大家?guī)胛恼碌墓?jié)奏,我們還是打算先展示幾個基礎(chǔ)的操作,讓大家找找感覺,回到那個熟悉的年代。
基礎(chǔ)操作
首先呈現(xiàn)一個最基礎(chǔ)的操作。
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); CancellationToken cancellationToken = cancellationTokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("取消了???")); cancellationToken.Register(() => System.Console.WriteLine("取消了?。?!")); cancellationToken.Register(state => System.Console.WriteLine($"取消了。。。{state}"),"啊啊啊"); System.Console.WriteLine("做了點別的,然后取消了."); cancellationTokenSource.Cancel();
這個操作是最簡單的操作,我們上面提到過CancellationTokenSource就是用來生產(chǎn)CancellationToken的,還可以說CancellationToken是CancellationTokenSource的表現(xiàn),這個待會看源碼的時候我們會知道為啥這么說。這里呢我們給CancellationToken注冊幾個操作,然后使用CancellationTokenSource的Cancel方法取消操作,這時候控制臺就會打印結(jié)果如下
做了點別的,然后取消了.
取消了。。。啊啊啊
取消了?。?!
取消了???
通過上面簡單的示例,大家應(yīng)該非常輕松的理解了它的簡單使用。
定時取消
有的時候呢我們可能需要超時操作,比如我不想一直等著,到了一個固定的時間我就要取消操作,這時候我們可以利用CancellationTokenSource的構(gòu)造函數(shù)給定一個限定時間,過了這個時間CancellationTokenSource就會被取消了,操作如下
//設(shè)置3000毫秒(即3秒)后取消 CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(3000); CancellationToken cancellationToken = cancellationTokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("我被取消了.")); System.Console.WriteLine("先等五秒鐘."); await Task.Delay(5000); System.Console.WriteLine("手動取消.") cancellationTokenSource.Cancel();
然后在控制臺打印的結(jié)果是這個樣子的,活脫脫的為我們實現(xiàn)了內(nèi)建的超時操作。
先等五秒鐘.
我被取消了.
手動取消.
上面的寫法是在構(gòu)造CancellationTokenSource的時候設(shè)置超時等待,還有另一種寫法等同于這種寫法,使用的是CancelAfter方法,具體使用如下
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.Token.Register(() => System.Console.WriteLine("我被取消了.")); //五秒之后取消 cancellationTokenSource.CancelAfter(5000); System.Console.WriteLine("不會阻塞,我會執(zhí)行.");
這個操作也是定時取消操作,需要注意的是CancelAfter方法并不會阻塞執(zhí)行,所以打印的結(jié)果是
不會阻塞,我會執(zhí)行.
我被取消了.
關(guān)聯(lián)取消
還有的時候是這樣的場景,就是我們設(shè)置一組關(guān)聯(lián)的CancellationTokenSource,我們期望的是只要這一組里的任意一個CancellationTokenSource被取消了,那么這個被關(guān)聯(lián)的CancellationTokenSource就會被取消。說得通俗一點就是,我們幾個當中只要一個不在了,那么你也可以不在了,具體的實現(xiàn)方式是這樣的
//聲明幾個CancellationTokenSource CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationTokenSource tokenSource2 = new CancellationTokenSource(); CancellationTokenSource tokenSource3 = new CancellationTokenSource(); tokenSource2.Token.Register(() => System.Console.WriteLine("tokenSource2被取消了")); //創(chuàng)建一個關(guān)聯(lián)的CancellationTokenSource CancellationTokenSource tokenSourceNew = CancellationTokenSource.CreateLinkedTokenSource(tokenSource.Token, tokenSource2.Token, tokenSource3.Token); tokenSourceNew.Token.Register(() => System.Console.WriteLine("tokenSourceNew被取消了")); //取消tokenSource2 tokenSource2.Cancel();
上述示例中因為tokenSourceNew關(guān)聯(lián)了tokenSource、tokenSource2、tokenSource3所以只要他們其中有一個被取消那么tokenSourceNew也會被取消,所以上述示例的打印結(jié)果是
tokenSourceNew被取消了
tokenSource2被取消了
判斷取消
上面我們使用的方式,都是通過回調(diào)的方式得知CancellationTokenSource被取消了,沒辦法通過標識去得知CancellationTokenSource是否可用。不過微軟貼心的為我們提供了IsCancellationRequested屬性去判斷,需要注意的是它是CancellationToken的屬性,具體使用方式如下
CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationToken cancellationToken = tokenSource.Token; //打印被取消 cancellationToken.Register(() => System.Console.WriteLine("被取消了.")); //模擬傳遞的場景 Task.Run(async ()=> { while (!cancellationToken.IsCancellationRequested) { System.Console.WriteLine("一直在執(zhí)行..."); await Task.Delay(1000); } }); //5s之后取消 tokenSource.CancelAfter(5000);
上述代碼五秒之后CancellationTokenSource被取消,因此CancellationTokenSource的Token也會被取消。反映到IsCancellationRequested上就是值為true說明被取消,為false說明沒被取消,因此控制臺輸出的結(jié)果是
一直在執(zhí)行...
一直在執(zhí)行...
一直在執(zhí)行...
一直在執(zhí)行...
一直在執(zhí)行...
被取消了.
還有另一種方式,也可以主動判斷任務(wù)是否被取消,不過這種方式簡單粗暴,直接是拋出了異常。如果是使用異步的方式的話,需要注意的是Task內(nèi)部異常的捕獲方式,否則對外可能還沒有感知到具體異常的原因,它的使用方式是這樣的,這里為了演示方便我直接換了一種更直接的方式
CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationToken cancellationToken = tokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("被取消了.")); tokenSource.CancelAfter(5000); while (true) { //如果操作被取消則直接拋出異常 cancellationToken.ThrowIfCancellationRequested(); System.Console.WriteLine("一直在執(zhí)行..."); await Task.Delay(1000); }
執(zhí)行五秒之后則直接拋出 System.OperationCanceledException: The operation was canceled.異常,異步情況下注意異常處理的方式即可。通過上面這些簡單的示例,相信大家對CancellationTokenSource有了一定的認識,大概知道了在什么時候可以使用它,主要是異步取消通知,或者限定時間操作通知等等。CancellationTokenSource是個不錯的神器,使用簡單功能強大。
源碼探究
通過上面的示例,相信大家對CancellationTokenSource有了一個基本的認識,真的是非常強大,而且使用起來也非常的簡單,這也是c#語言的精妙之處,非常實用,讓你用起來的時候非常舒服,有種用著用著就想跪下的沖動。步入正題,接下來讓我們來往深處看看CancellationTokenSource的源碼,看看它的工作機制是啥。本文貼出的源碼是博主精簡過的,畢竟源碼太多不太可能全部粘貼出來,主要是跟著它的思路了解它的工作方式。
構(gòu)造入手
因為這一次呢CancellationTokenSource的初始化函數(shù)中有一個比較重要的構(gòu)造函數(shù),那就是可以設(shè)置定時超時的操作,那么我們就從它的構(gòu)造函數(shù)入手[點擊查看源碼👈]
//全局狀態(tài) private volatile int _state; //未取消狀態(tài)值 private const int NotCanceledState = 1; /// <summary> /// 無參構(gòu)造初始化狀態(tài) /// </summary> public CancellationTokenSource() => _state = NotCanceledState; /// <summary> /// 定時取消構(gòu)造 /// </summary> public CancellationTokenSource(TimeSpan delay) { //獲取timespan的毫秒數(shù) long totalMilliseconds = (long)delay.TotalMilliseconds; if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue) { throw new ArgumentOutOfRangeException(nameof(delay)); } //調(diào)用InitializeWithTimer InitializeWithTimer((int)totalMilliseconds); } public CancellationTokenSource(int millisecondsDelay) { if (millisecondsDelay < -1) { throw new ArgumentOutOfRangeException(nameof(millisecondsDelay)); } //調(diào)用InitializeWithTimer InitializeWithTimer(millisecondsDelay); }
無參構(gòu)造函數(shù)沒啥好說的,就是給全局state狀態(tài)初始化NotCanceledState的初始值,也就是初始化狀態(tài)。我們比較關(guān)注的是可以定時取消的構(gòu)造函數(shù),雖然是兩個構(gòu)造函數(shù),但是殊途同歸,本質(zhì)都是傳遞的毫秒整形參數(shù),而且調(diào)用的核心方法都是InitializeWithTimer,看來是一個定時器操作,這樣不奇怪了,我們看下InitializeWithTimer方法的實現(xiàn)[點擊查看源碼👈]
//任務(wù)完成狀態(tài)值 private const int NotifyingCompleteState = 2; //定時器 private volatile TimerQueueTimer? _timer; //定時器回調(diào)初始化 private static readonly TimerCallback s_timerCallback = TimerCallback; //定時器回調(diào)委托本質(zhì)是調(diào)用的CancellationTokenSource的NotifyCancellation方法 private static void TimerCallback(object? state) => ((CancellationTokenSource)state!).NotifyCancellation(throwOnFirstException: false); private void InitializeWithTimer(uint millisecondsDelay) { if (millisecondsDelay == 0) { //如果定時的毫秒為0,則設(shè)置全局狀態(tài)為NotifyingCompleteState _state = NotifyingCompleteState; } else { //如果超時毫秒不為0則初始化定時器,并設(shè)置定時器定時的回調(diào) _timer = new TimerQueueTimer(s_timerCallback, this, millisecondsDelay, Timeout.UnsignedInfinite, flowExecutionContext: false); } }
通過這個方法,我們可以非常清晰的看到定時初始化的核心操作其實就是初始化一個定時器,而定時的時間就是我們初始化傳遞的毫秒數(shù),其中s_timerCallback是定時的回調(diào)函數(shù),即如果等待超時之后則調(diào)用這個委托,其本質(zhì)正是CancellationTokenSource的NotifyCancellation方法,這個方法正是處理超時之后的操作[點擊查看源碼👈]
//信號控制類,通過信號判斷是否需要繼續(xù)執(zhí)行或阻塞 private volatile ManualResetEvent? _kernelEvent; //throwOnFirstException函數(shù)是指示如果被取消了是否拋出異常 private void NotifyCancellation(bool throwOnFirstException) { //如果任務(wù)已經(jīng)取消則直接直接釋放定時器 if (!IsCancellationRequested && Interlocked.CompareExchange(ref _state, NotifyingState, NotCanceledState) == NotCanceledState) { TimerQueueTimer? timer = _timer; if (timer != null) { _timer = null; timer.Close(); } //信號量涉及到了一個重要的屬性WaitHandle接下來會說 _kernelEvent?.Set(); //執(zhí)行取消操作,是取消操作的核心,講取消操作的時候咱們會著重說這個 ExecuteCallbackHandlers(throwOnFirstException); Debug.Assert(IsCancellationCompleted, "Expected cancellation to have finished"); } }
NotifyCancellation正是處理定時器到時的操作,說白了就是到了指定的時間但是沒有手動取消執(zhí)行的操作,其實也是執(zhí)行的取消操作,這個方法里涉及到了兩個比較重要的點,也是接下來我們會分析的點,這里做一下說明
- 首先是ManualResetEvent這個實例,這個類的功能是通過信號機制控制是否阻塞或執(zhí)行后續(xù)操作,與之相輔的還有另一個類AutoResetEvent。這兩個類實現(xiàn)的效果是一致的,只是ManualResetEvent需要手動重置初始狀態(tài),而AutoResetEvent則會自動重置。有關(guān)兩個類的說明,這里不做過多介紹,有需要了解的同學(xué)們可以自行百度。而CancellationTokenSource類的一個重要屬性WaitHandle正是使用的它。
- 還有一個是ExecuteCallbackHandlers方法,這個是CancellationTokenSource執(zhí)行取消操作的核心操作。為了保證閱讀的順序性,咱們在講取消操作的時候在重點講這個方法。
上面提到了,為了保證閱讀的順序性方便理解,咱們在本文接下來會講解這兩部分,就不再初始化這里講解了,這里做一下標記,以防大家覺得沒講清楚就繼續(xù)了。
小插曲WaitHandle
上面我們提到了CancellationTokenSource的WaitHandle屬性,它是基于ManualResetEvent實現(xiàn)的。這個算是一個稍微獨立的地方,我們可以先進行講解一下[點擊查看源碼👈]
private volatile ManualResetEvent? _kernelEvent; internal WaitHandle WaitHandle { get { ThrowIfDisposed(); //如果初始化過了則直接返回 if (_kernelEvent != null) { return _kernelEvent; } //初始化一個ManualResetEvent,給定初始值為false var mre = new ManualResetEvent(false); //線程安全操作如果有別的線程初始了則釋放上面初始化的操作 if (Interlocked.CompareExchange(ref _kernelEvent, mre, null) != null) { mre.Dispose(); } //如果任務(wù)已取消則后續(xù)操作不阻塞 if (IsCancellationRequested) { _kernelEvent.Set(); } return _kernelEvent; } }
通過這段代碼我們可以看到,如果使用了WaitHandle屬性則可以使用它實現(xiàn)簡單的阻塞通知操作,也就是收到取消通知操作之后我們可以執(zhí)行WaitHandle之后的操作,但是WaitHandle是internal修飾的,我們該怎么使用呢?莫慌,我們知道CancellationTokenSource的Token屬性獲取的是CancellationToken實例[點擊查看源碼👈]
public CancellationToken Token { get { ThrowIfDisposed(); return new CancellationToken(this); } }
直接實例化了一個CancellationToken實例返回去了,并傳遞了當前CancellationTokenSource實例,找到CancellationToken的這個構(gòu)造函數(shù)[點擊查看源碼👈]
private readonly CancellationTokenSource? _source; internal CancellationToken(CancellationTokenSource? source) => _source = source; public WaitHandle WaitHandle => (_source ?? CancellationTokenSource.s_neverCanceledSource).WaitHandle;
通過上面的代碼我們可以看到通過CancellationToken實例便可以使用WaitHandle屬性,實現(xiàn)我們訪問到它的效果,光是說的話可能有點迷糊,通過一個簡單的示例我們來了解WaitHandle的使用方式,簡單來看下
CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationToken cancellationToken = tokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("被取消了.")); tokenSource.CancelAfter(5000); Task.Run(()=> { System.Console.WriteLine("阻塞之前"); cancellationToken.WaitHandle.WaitOne(); System.Console.WriteLine("阻塞取消,執(zhí)行到了."); }); System.Console.WriteLine("執(zhí)行到了這里");
在CancellationTokenSource為被取消之前WaitHandle.WaitOne()方法會阻塞后續(xù)執(zhí)行,也就是下面的輸出暫時不會輸出。等到CancellationTokenSource執(zhí)行了Cancel操作里調(diào)用了ManualResetEvent的Set方法停止阻塞,后續(xù)的輸出才會被執(zhí)行到這是一個同步操作,如果了解ManualResetEvent的同學(xué)相信對這個不難理解。為了演示效果我用Task演示異步的情況,所以執(zhí)行的結(jié)果如下所示
執(zhí)行到了這里
阻塞之前
阻塞取消,執(zhí)行到了.
被取消了.
注冊操作
上面我們大概講解了一些初始化相關(guān)的和一些輔助的操作,接下來我們看一下核心的注冊操作,注冊操作的用途就是注冊CancellationTokenSource取消或超時后需要執(zhí)行的動作,而注冊Register的操作并未由CancellationTokenSource直接進行,而是通過它的Token屬性即CancellationToken實例操作的,話不多說直接找到CancellationToken的Register方法[點擊查看源碼👈]
public CancellationTokenRegistration Register(Action callback) => Register( s_actionToActionObjShunt, callback ?? throw new ArgumentNullException(nameof(callback)), useSynchronizationContext: false, useExecutionContext: true);
它是直接調(diào)用自己的重載方法,注意幾個參數(shù),如果看細節(jié)的話還是要關(guān)注方法參數(shù)的。過程就省略了,直接找到最底層的方法[點擊查看源碼👈]
private CancellationTokenRegistration Register(Action<object?> callback, object? state, bool useSynchronizationContext, bool useExecutionContext) { if (callback == null) throw new ArgumentNullException(nameof(callback)); //_source就是傳遞下來的CancellationTokenSource CancellationTokenSource? source = _source; //本質(zhì)是調(diào)用的CancellationTokenSource的InternalRegister方法 return source != null ? source.InternalRegister(callback, state, useSynchronizationContext ? SynchronizationContext.Current : null, useExecutionContext ? ExecutionContext.Capture() : null) : default;
從這個最底層的方法我們可以得知,其本質(zhì)還是調(diào)用CancellationTokenSource的InternalRegister方法,核心操作都不在CancellationToken還是在CancellationTokenSource類,CancellationToken更像是依賴CancellationTokenSource的表現(xiàn)類,看一下InternalRegister方法[點擊查看源碼👈]
//初始化CallbackPartition數(shù)組 private volatile CallbackPartition?[]? _callbackPartitions; //獲取初始化上面數(shù)組的長度,根據(jù)當前CPU核心數(shù)獲取的 private static readonly int s_numPartitions = GetPartitionCount(); internal CancellationTokenRegistration InternalRegister( Action<object?> callback, object? stateForCallback, SynchronizationContext? syncContext, ExecutionContext? executionContext) { //判斷有沒有被取消 if (!IsCancellationRequested) { //如果已被釋放直接返回 if (_disposed) { return default; } CallbackPartition?[]? partitions = _callbackPartitions; if (partitions == null) { //首次調(diào)用初始化CallbackPartition數(shù)組 partitions = new CallbackPartition[s_numPartitions]; //判斷_callbackPartitions如果為null,則把partitions賦值給_callbackPartitions partitions = Interlocked.CompareExchange(ref _callbackPartitions, partitions, null) ?? partitions; } //獲取當前線程使用的分區(qū)下標 int partitionIndex = Environment.CurrentManagedThreadId & s_numPartitionsMask; //獲取一個CallbackPartition CallbackPartition? partition = partitions[partitionIndex]; if (partition == null) { //初始化CallbackPartition實例 partition = new CallbackPartition(this); //如果partitions的partitionIndex下標位置為null則使用partition填充 partition = Interlocked.CompareExchange(ref partitions[partitionIndex], partition, null) ?? partition; } long id; CallbackNode? node; bool lockTaken = false; //鎖住操作 partition.Lock.Enter(ref lockTaken); try { id = partition.NextAvailableId++; //獲取CallbackNode,這事真正存儲回調(diào)的地方,不要被List名字迷惑,其實是要構(gòu)建鏈表 node = partition.FreeNodeList; if (node != null) { //這個比較有意思如果CallbackNode不是首次,則把最新的賦值給FreeNodeList partition.FreeNodeList = node.Next; } else { //首次的時候初始化一個CallbackNode實例 node = new CallbackNode(partition); } node.Id = id; //Register的回調(diào)操作賦值給了CallbackNode的Callback node.Callback = callback; node.CallbackState = stateForCallback; node.ExecutionContext = executionContext; node.SynchronizationContext = syncContext; //構(gòu)建一個CallbackNode鏈表,從下面的代碼可以看出來構(gòu)建的其實是倒序鏈表,最新的CallbackNode是表頭 node.Next = partition.Callbacks; if (node.Next != null) { node.Next.Prev = node; } //Callbacks記錄的是當前的節(jié)點,如果下一次進來新節(jié)點則作為新節(jié)點的Next節(jié)點 partition.Callbacks = node; } finally { //釋放鎖 partition.Lock.Exit(useMemoryBarrier: false); } //用當前注冊回調(diào)生成的CallbackNode節(jié)點生成CancellationTokenRegistration實例 var ctr = new CancellationTokenRegistration(id, node); //如果未被取消則直接返回 if (!IsCancellationRequested || !partition.Unregister(id, node)) { return ctr; } } //走到這里說明IsCancellationRequested已經(jīng)等于true了也就是被取消了,則直接執(zhí)行該回調(diào) callback(stateForCallback); return default; }
這里涉及到一個比較核心的類那就是CallbackPartition,這是一個內(nèi)部類,它的主要用途就是輔助構(gòu)建執(zhí)行回調(diào)的鏈表操作,其大概實現(xiàn)是這個樣子的[點擊查看源碼👈]
internal sealed class CallbackPartition { public readonly CancellationTokenSource Source; //使用了自旋鎖 public SpinLock Lock = new SpinLock(enableThreadOwnerTracking: false); public CallbackNode? Callbacks; public CallbackNode? FreeNodeList; public long NextAvailableId = 1; public CallbackPartition(CancellationTokenSource source) { Source = source; } internal bool Unregister(long id, CallbackNode node) { //這里面有內(nèi)容,就不羅列了,判斷CallbackNode是否被取消注冊,如果為false說明未被取消注冊 } }
這里面我暫時沒有列出Unregister的內(nèi)容,因為它是和取消相關(guān)的,說到取消的時候咱們再看,如果返回true則說明取消成功。這個類核心就是輔助構(gòu)建Register回調(diào)鏈表的,它的核心都是在操作CallbackNode節(jié)點和其構(gòu)建的回調(diào)鏈表,而CallbackNode則是鏈表的一個節(jié)點定義,其大致結(jié)構(gòu)如下[點擊查看源碼👈]
internal sealed class CallbackNode { public readonly CallbackPartition Partition; //構(gòu)建鏈表的核心Prev和Next public CallbackNode? Prev; public CallbackNode? Next; public long Id; //回調(diào)操作被這個委托記錄 public Action<object?>? Callback; public object? CallbackState; public ExecutionContext? ExecutionContext; public SynchronizationContext? SynchronizationContext; public CallbackNode(CallbackPartition partition) { Partition = partition; } public void ExecuteCallback() { //這里也有代碼,暫時不列出來,講取消的時候單獨講解 } }
到了這里關(guān)于Register涉及到的核心操作都羅列出來了,由于貼出來的是源碼相關(guān)看著是比較蒙圈的,但是如果順著看的話其實還是大致的實現(xiàn)思路還是可以理解的,這里我大致的總結(jié)一下它的實現(xiàn)思路
- 首先是構(gòu)建了CallbackPartition數(shù)組,構(gòu)建這個數(shù)組的長度是根據(jù)CPU的核心數(shù)來決定,每個CallbackPartition是操作的核心,為了防止過多的線程同時操作一個CallbackPartition實例,它采用了為不同線程分區(qū)的思路,CallbackPartition維護了構(gòu)建鏈表節(jié)點的類CallbackNode。
- CallbackNode是組成鏈表的核心,CallbackNode每個實例都是鏈表的一個節(jié)點,從它自包含Prev和Next屬性便可以看出是一個雙向鏈表。
- CallbackPartition的核心功能就是為了構(gòu)建Register進來的回調(diào),從上面的InternalRegister方法里的操作我們可以得知,通過CallbackPartition的輔助將CallbackNode節(jié)點構(gòu)建為一個倒序鏈表,也就是最新的CallbackNode實例是鏈表的首節(jié)點,而最老的CallbackNode實例則是鏈表的尾節(jié)點。每一次Register進來的回調(diào),都被包裝成了CallbackNode添加到這個鏈表中。
上面InternalRegister方法里我們看到操作CallbackNode的時候,使用了SpinLock自旋鎖。短時間鎖定的情況下SpinLock更快,因為自旋鎖本質(zhì)上不會讓線程休眠,而是一直循環(huán)嘗試對資源訪問,直到可用。所以自旋鎖線程被阻塞時,不進行線程上下文切換,而是空轉(zhuǎn)等待。對于多核CPU而言,減少了切換線程上下文的開銷,從而提高了性能。
取消操作
上面我們看到了注冊相關(guān)的操作,注冊還是比較統(tǒng)一的,就一種操作方式。取消卻有兩種方式,一種是超時取消,另一種是主動取消,接下來我們就分別看一下這兩種方式分別是如何操作的。
Cancel操作
首先我們來看主動取消的操作方式這個是最簡單最直接的方式,而且這個方法屬于CancellationTokenSource類,話不多說直接看實現(xiàn)[點擊查看源碼👈]
public void Cancel() => Cancel(false); public void Cancel(bool throwOnFirstException) { ThrowIfDisposed(); NotifyCancellation(throwOnFirstException); }
重點來了Cancel方法居然也是調(diào)用的NotifyCancellation方法,這個方法咱們上面已經(jīng)看過了。在說定時的方式構(gòu)造CancellationTokenSource的時候有一個自動取消的操作,提到了NotifyCancellation方法的核心是ExecuteCallbackHandlers方法,這個是CancellationTokenSource執(zhí)行取消操作的核心操作。還說了為了保證閱讀的順序性,咱們在講取消操作的時候在重點講這個方法??磥磉@個時刻終于還是到來了,直接打開ExecuteCallbackHandlers方法[點擊查看源碼👈]
private volatile int _threadIDExecutingCallbacks = -1; private volatile CallbackPartition?[]? _callbackPartitions; private const int NotifyingCompleteState = 3; private void ExecuteCallbackHandlers(bool throwOnFirstException) { //獲取當前線程ID ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; //將_callbackPartitions置為null,但是partitions不為null,因為Exchange返回的是改變之前的值 CallbackPartition?[]? partitions = Interlocked.Exchange(ref _callbackPartitions, null); //如果partitions為null說明是回調(diào)已經(jīng)通知完成狀態(tài)了直接返回 if (partitions == null) { Interlocked.Exchange(ref _state, NotifyingCompleteState); return; } List<Exception>? exceptionList = null; try { //遍歷CallbackPartition數(shù)組 foreach (CallbackPartition? partition in partitions) { //CallbackPartition實例為null說明這個分區(qū)未被使用直接跳過 if (partition == null) { continue; } //循環(huán)處理CallbackNode鏈表 while (true) { CallbackNode? node; bool lockTaken = false; //鎖住當前操作 partition.Lock.Enter(ref lockTaken); try { //獲取鏈表的節(jié)點 node = partition.Callbacks; //為null說明沒Register過直接中斷 if (node == null) { break; } else { //如果鏈表遍歷不是尾節(jié)點,切斷和下一個節(jié)點的關(guān)聯(lián) if (node.Next != null) node.Next.Prev = null; //把下一個節(jié)點賦值給Callbacks partition.Callbacks = node.Next; } //當前執(zhí)行節(jié)點ID _executingCallbackId = node.Id; node.Id = 0; } finally { //退出鎖 partition.Lock.Exit(useMemoryBarrier: false); } try { //如果當時傳遞了同步上下文則直接在當時的上下文調(diào)用ExecuteCallback委托 if (node.SynchronizationContext != null) { node.SynchronizationContext.Send(static s => { var n = (CallbackNode)s!; n.Partition.Source.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; n.ExecuteCallback(); }, node); ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; } else { //如果沒有傳遞SynchronizationContext則直接調(diào)用ExecuteCallback委托 //即調(diào)用Register的注冊的委托 node.ExecuteCallback(); } } catch (Exception ex) when (!throwOnFirstException) { (exceptionList ??= new List<Exception>()).Add(ex); } } } } finally { //將全局狀態(tài)置為通知完成狀態(tài) //即已經(jīng)調(diào)用過Register回調(diào) _state = NotifyingCompleteState; Volatile.Write(ref _executingCallbackId, 0); Interlocked.MemoryBarrier(); } //如果中途存在異常則拋出 if (exceptionList != null) { Debug.Assert(exceptionList.Count > 0, $"Expected {exceptionList.Count} > 0"); throw new AggregateException(exceptionList); } }
關(guān)于ExecuteCallback方法是CallbackNode類的方法,也就是咱們上面羅列CallbackNode類結(jié)構(gòu)時被省略的方法,它的主要功能就是調(diào)用Register的回調(diào),也就是執(zhí)行Register里的委托。欠下的我會補上來,注意這里是CallbackNode類,接下來看下實現(xiàn)[點擊查看源碼👈]
public ExecutionContext? ExecutionContext; public void ExecuteCallback() { ExecutionContext? context = ExecutionContext; //如果Register的時候允許傳遞ExecutionContext則直接用這個上下文執(zhí)行回調(diào)Callback //Callback委托也就是承載Register的委托操作 if (context != null) { ExecutionContext.RunInternal(context, static s => { Debug.Assert(s is CallbackNode, $"Expected {typeof(CallbackNode)}, got {s}"); CallbackNode n = (CallbackNode)s; Debug.Assert(n.Callback != null); n.Callback(n.CallbackState); }, this); } else { Debug.Assert(Callback != null); //直接在當前線程調(diào)用Callback //Callback委托也就是承載Register的委托操作 Callback(CallbackState); } }
關(guān)于取消的核心方法ExecuteCallbackHandlers的重要操作,咱們已經(jīng)羅列出來了,其實我們看到注冊的思路的時候,就已經(jīng)能猜到執(zhí)行取消回調(diào)的大致思路了,既然Register的時候進行了拉鏈,那么取消執(zhí)行注冊回調(diào)肯定是變量鏈表執(zhí)行里面的Callback了,大致總結(jié)一下
- 執(zhí)行Cancel之后核心操作還是針對構(gòu)建的CallbackNode鏈表進行遍歷,咱們之前說過構(gòu)建的CallbackNode鏈表是倒序鏈表,最新的節(jié)點放在鏈表的首部,這也就解釋了為啥我們上面的示例Register多個委托的時候,最先輸出的是最后注冊委托。
- Register注冊時候有參數(shù)判斷是否需要傳遞當前同步上下文SynchronizationContext和執(zhí)行上下文ExecutionContext,作用就是為了是否在當時的上下文環(huán)境執(zhí)行Callback回調(diào)操作。
- 上面的遍歷代碼我們看到了會執(zhí)行CallbackNode.Next.Prev=null的操作,是為了斷開當前鏈表節(jié)點和上下節(jié)點的關(guān)系,個人感覺是為了切斷對象引用方便釋放的,防止內(nèi)存泄漏,同時也說明了默認情況下Register的的回調(diào)函數(shù)執(zhí)行是一次性的,當執(zhí)行完Cancel操作之后當前CancellationToken實例也就失效了。
CancelAfter操作
之前我們演示的時候說過有兩種方式可以執(zhí)行超時取消操作,一種是在構(gòu)建CancellationTokenSource實例構(gòu)造的時候傳遞超時時間,還有另一種是使用CancelAfter操作,這個方法表示在指定時間之后取消,效果上等同于實例化CancellationTokenSource的時候傳遞超時時間的操作,廢話不多說直接羅列代碼[點擊查看源碼👈]
public void CancelAfter(TimeSpan delay) { long totalMilliseconds = (long)delay.TotalMilliseconds; if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue) { throw new ArgumentOutOfRangeException(nameof(delay)); } //調(diào)用的是重載的CancelAfter方法 CancelAfter((int)totalMilliseconds); } private static readonly TimerCallback s_timerCallback = obj => { ((CancellationTokenSource)obj).NotifyCancellation(throwOnFirstException: false); }; public void CancelAfter(int millisecondsDelay) { //傳遞的毫秒數(shù)不能小于-1 if (millisecondsDelay < -1) { throw new ArgumentOutOfRangeException(nameof(millisecondsDelay)); } //如果已經(jīng)取消則直接返回 if (IsCancellationRequested) { return; } //注冊一個定時器執(zhí)行s_timerCallback //s_timerCallback在上面我們介紹過了 本這就是調(diào)用CancellationTokenSource的NotifyCancellation方法 TimerQueueTimer? timer = _timer; if (timer == null) { timer = new TimerQueueTimer(s_timerCallback, this, Timeout.UnsignedInfinite, Timeout.UnsignedInfinite, flowExecutionContext: false); TimerQueueTimer? currentTimer = Interlocked.CompareExchange(ref _timer, timer, null); if (currentTimer != null) { timer.Close(); timer = currentTimer; } } try { timer.Change((uint)millisecondsDelay, Timeout.UnsignedInfinite); } catch (ObjectDisposedException) { } }
通過上面的源碼我們可以看到CancelAfter的操作代碼和傳遞超時時間構(gòu)造CancellationTokenSource的代碼基本上是一致的,都是通過TimerQueueTimer的方式定時觸發(fā)調(diào)用CancellationTokenSource的NotifyCancellation方法,而NotifyCancellation方法的核心實現(xiàn)就是ExecuteCallbackHandlers方法,這些方法咱們上面都有講解過,就不重復(fù)介紹了,這樣關(guān)于取消相關(guān)的操作我們也就全部講解完成了。
總結(jié)
本文我們主要講解了C#取消令牌CancellationTokenSource,雖然設(shè)計到的類并不多,但是這部分源碼并不少,而且也只是講解核心功能的部分源碼,有興趣的同學(xué)可以自行閱讀這個類相關(guān)代碼,如果你覺得你的GitHub比較不給力推薦一個可以閱讀CoreCLR源碼的網(wǎng)站source.dot.net這個網(wǎng)站看到的是目前CoreCLR最新的源碼,可以直接連接到GitHub非常方便,但是最新版本的源碼和穩(wěn)定版本的有些差別,這個還需要注意。由于文章比較長,再加上筆者技術(shù)能力和文筆能力都有限,這里做一下簡單的總結(jié)
- CancellationTokenSource的用途就是可以感知到取消操作,其中涉及到的Register回調(diào)、WaitHandle、IsCancellationRequested都能實現(xiàn)這個功能,當然它還支持超時取消操作。
- CancellationTokenSource的Register和Cancel相關(guān)成雙成對的,雖然有CancelAfter和構(gòu)造傳遞超時時間的方式,其本質(zhì)和Cancel操作是一樣的。
- CancellationTokenSource的核心操作原理,是通過CallbackPartition和CallbackNode構(gòu)建倒序鏈表,Register的時候通過Callback委托構(gòu)建鏈表,Cancel的時候遍歷構(gòu)建的鏈表執(zhí)行Callback,雖然有一堆額外操作,但是核心工作方式就是鏈表操作。
- 需要注意的是,默認情況下CancellationTokenSource產(chǎn)生的CancellationToken是一次性的,取消了之后是沒有辦法進行重置的,當然微軟已經(jīng)為我們提供了IChangeToken去解決了CancellationToken重復(fù)觸發(fā)的問題,請放心使用。
由于本篇文章篇幅較長,加上筆者能力有限,文筆更是一般,如果講解的不清楚還望諒解,或者感興趣的同學(xué)可以自行閱讀源碼。關(guān)于看源碼每個人都有自己的關(guān)注點,我一般的初衷都是弄明白它的原理,順便學(xué)習(xí)下它代碼風(fēng)格或思路。學(xué)無止境,結(jié)果有時候并不那么重要,過程才重要。就和許多人追求自己能有到達什么樣的高度,成功其實只是成長過程中順便的一種表現(xiàn),就和你如果不滿現(xiàn)狀,說明你在很早之前沒想過改變自己一樣。
到此這篇關(guān)于運用示例簡單講解C#取消令牌CancellationTokenSource的文章就介紹到這了,更多相關(guān)C# CancellationTokenSource內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
c#實現(xiàn)數(shù)據(jù)同步的方法(使用文件監(jiān)控對象filesystemwatcher)
這篇文章主要介紹了C#使用文件監(jiān)控對象FileSystemWatcher實現(xiàn)數(shù)據(jù)同步,大家參考使用吧2013-12-12C# 在PDF文檔中創(chuàng)建表格的實現(xiàn)方法
表格能夠一目了然的讓用戶看到數(shù)據(jù)信息,使信息顯得有條理化,那么在pdf類型的文檔中如何來添加表格并對表格進行格式化操作呢?下面小編給大家?guī)砹薈# 在PDF文檔中創(chuàng)建表格的實現(xiàn)方法,需要的朋友參考下吧2017-12-12分享WCF文件傳輸實現(xiàn)方法---WCFFileTransfer
這篇文章主要介紹了分享WCF文件傳輸實現(xiàn)方法---WCFFileTransfer,需要的朋友可以參考下2015-11-11C#中WPF內(nèi)存回收與釋放LierdaCracker的實現(xiàn)
本文主要介紹了C#中WPF內(nèi)存回收與釋放LierdaCracker的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07C#實現(xiàn)Socket服務(wù)器及多客戶端連接的方式
這篇文章介紹了C#實現(xiàn)Socket服務(wù)器及多客戶端連接的方式,文中通過示例代碼介紹的非常詳細。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-01-01DevExpress設(shè)置FocusedNode背景色的方法
這篇文章主要介紹了DevExpress設(shè)置FocusedNode背景色的方法,很實用的功能,需要的朋友可以參考下2014-08-08