欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C#中使用CAS實(shí)現(xiàn)無(wú)鎖算法的示例詳解

 更新時(shí)間:2023年04月21日 08:38:36   作者:黑洞視界  
CAS(Compare-and-Swap)是一種多線程并發(fā)編程中常用的原子操作,用于實(shí)現(xiàn)多線程間的同步和互斥訪問(wèn)。本文將利用CAS實(shí)現(xiàn)無(wú)鎖算法,需要的可以參考一下

CAS 的基本概念

CAS(Compare-and-Swap)是一種多線程并發(fā)編程中常用的原子操作,用于實(shí)現(xiàn)多線程間的同步和互斥訪問(wèn)。 它操作通常包含三個(gè)參數(shù):一個(gè)內(nèi)存地址(通常是一個(gè)共享變量的地址)、期望的舊值和新值。

CompareAndSwap(內(nèi)存地址,期望的舊值,新值)

CAS 操作會(huì)比較內(nèi)存地址處的值與期望的舊值是否相等,如果相等,則將新值寫(xiě)入該內(nèi)存地址; 如果不相等,則不進(jìn)行任何操作。這個(gè)比較和交換的操作是一個(gè)原子操作,不會(huì)被其他線程中斷。

CAS 通常是通過(guò)硬件層面的CPU指令實(shí)現(xiàn)的,其原子性是由硬件保證的。具體的實(shí)現(xiàn)方式根據(jù)環(huán)境會(huì)有所不同。

CAS 操作通常會(huì)有一個(gè)返回值,用于表示操作是否成功。返回結(jié)果可能是true或false,也可能是內(nèi)存地址處的舊值。

相比于傳統(tǒng)的鎖機(jī)制,CAS 有一些優(yōu)勢(shì):

  • 原子性:CAS 操作是原子的,不需要額外的鎖來(lái)保證多線程環(huán)境下的數(shù)據(jù)一致性,避免了鎖帶來(lái)的性能開(kāi)銷(xiāo)和競(jìng)爭(zhēng)條件。
  • 無(wú)阻塞:CAS 操作是無(wú)阻塞的,不會(huì)因?yàn)橘Y源被鎖定而導(dǎo)致線程的阻塞和上下文切換,提高了系統(tǒng)的并發(fā)性和可伸縮性。
  • 適用性:CAS 操作可以應(yīng)用于廣泛的數(shù)據(jù)結(jié)構(gòu)和算法,如自旋鎖、計(jì)數(shù)器、隊(duì)列等,使得它在實(shí)際應(yīng)用中具有較大的靈活性和適用性。

C# 中如何使用 CAS

在 C# 中,我們可以使用 Interlocked 類(lèi)來(lái)實(shí)現(xiàn) CAS 操作。

Interlocked 類(lèi)提供了一組 CompareExchange 的重載方法,用于實(shí)現(xiàn)不同類(lèi)型的數(shù)據(jù)的 CAS 操作。

public static int CompareExchange(ref int location1, int value, int comparand);
public static long CompareExchange(ref long location1, long value, long comparand);
// ... 省略其他重載方法
public static object CompareExchange(ref object location1, object value, object comparand);
public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;

CompareExchange 方法將 location1 內(nèi)存地址處的值與 comparand 比較,如果相等,則將 value 寫(xiě)入 location1 內(nèi)存地址處,否則不進(jìn)行任何操作。
該方法返回 location1 內(nèi)存地址處的值。

通過(guò)判斷方法返回值與 comparand 是否相等,我們就可以知道 CompareExchange 方法是否執(zhí)行成功。

算法示例

在使用 CAS 實(shí)現(xiàn)無(wú)鎖算法時(shí),通常我們不光是為了比較和更新一個(gè)數(shù)據(jù),還需要在更新成功后進(jìn)行下一步的操作。結(jié)合 while(true) 循環(huán),我們可以不斷地嘗試更新數(shù)據(jù),直到更新成功為止。

偽代碼如下:

while (true)
{
    // 讀取數(shù)據(jù)
    oldValue = ...;
    // 計(jì)算新值
    newValue = ...;
    // CAS 更新數(shù)據(jù)
    result = CompareExchange(ref location, newValue, oldValue);
    // 判斷 CAS 是否成功
    if (result == oldValue)
    {
        // CAS 成功,執(zhí)行后續(xù)操作
        break;
    }
}

在復(fù)雜的無(wú)鎖算法中,因?yàn)槊恳徊讲僮鞫际仟?dú)立的,連續(xù)的操作并非原子,所以我們不光要借助 CAS,每一步操作前都應(yīng)判斷是否有其他線程已經(jīng)修改了數(shù)據(jù)。

示例1:計(jì)數(shù)器

下面是一個(gè)簡(jiǎn)單的計(jì)數(shù)器類(lèi),它使用 CAS 實(shí)現(xiàn)了一個(gè)線程安全的自增操作。

public class Counter
{
    private int _value;

    public int Increment()
    {
        while (true)
        {
            int oldValue = _value;
            int newValue = oldValue + 1;
            int result = Interlocked.CompareExchange(ref _value, newValue, oldValue);
            if (result == oldValue)
            {
                return newValue;
            }
        }
    }
}

CLR 底層源碼中,我們也會(huì)經(jīng)??吹竭@樣的代碼,比如 ThreadPool 增加線程時(shí)的計(jì)數(shù)器。https://github.com/dotnet/runtime/blob/release/6.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L446

internal void EnsureThreadRequested()
{
    //
    // If we have not yet requested #procs threads, then request a new thread.
    //
    // CoreCLR: Note that there is a separate count in the VM which has already been incremented
    // by the VM by the time we reach this point.
    //
    int count = _separated.numOutstandingThreadRequests;
    while (count < Environment.ProcessorCount)
    {
        int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count);
        if (prev == count)
        {
            ThreadPool.RequestWorkerThread();
            break;
        }
        count = prev;
    }
}

示例2:隊(duì)列

下面是一個(gè)簡(jiǎn)單的隊(duì)列類(lèi),它使用 CAS 實(shí)現(xiàn)了一個(gè)線程安全的入隊(duì)和出隊(duì)操作。相較于上面的計(jì)數(shù)器,這里的操作更加復(fù)雜,我們每一步都需要考慮是否有其他線程已經(jīng)修改了數(shù)據(jù)。

這樣的算法有點(diǎn)像薛定諤的貓,你不知道它是死是活,只有當(dāng)你試圖去觀察它的時(shí)候,它才可能會(huì)變成死或者活。

public class ConcurrentQueue<T>
{
    // _head 和 _tail 是兩個(gè)偽節(jié)點(diǎn),_head._next 指向隊(duì)列的第一個(gè)節(jié)點(diǎn),_tail 指向隊(duì)列的最后一個(gè)節(jié)點(diǎn)。
    // _head 和 _tail 會(huì)被多個(gè)線程修改和訪問(wèn),所以要用 volatile 修飾。
    private volatile Node _head;
    private volatile Node _tail;
    
    public ConcurrentQueue()
    {
        _head = new Node(default);
        // _tail 指向 _head 時(shí),隊(duì)列為空。
        _tail = _head;
    }

    public void Enqueue(T item)
    {
        var node = new Node(item);
        while (true)
        {
            Node tail = _tail;
            Node next = tail._next;
            // 判斷給 next 賦值的這段時(shí)間,是否有其他線程修改過(guò) _tail
            if (tail == _tail)
            {
                // 如果 next 為 null,則說(shuō)明從給 tail 賦值到給 next 賦值這段時(shí)間,沒(méi)有其他線程修改過(guò) tail._next,
                if (next == null)
                {
                    // 如果 tail._next 為 null,則說(shuō)明從給 tail 賦值到這里,沒(méi)有其他線程修改過(guò) tail._next,
                    // tail 依舊是隊(duì)列的最后一個(gè)節(jié)點(diǎn),我們就可以直接將 node 賦值給 tail._next。                                
                    if (Interlocked.CompareExchange(ref tail._next, node, null) == null)
                    {
                        // 如果_tail == tail,則說(shuō)明從上一步 CAS 操作到這里,沒(méi)有其他線程修改過(guò) _tail,也就是沒(méi)有其他線程執(zhí)行過(guò) Enqueue 操作。
                        // 那么當(dāng)前線程 Enqueue 的 node 就是隊(duì)列的最后一個(gè)節(jié)點(diǎn),我們就可以直接將 node 賦值給 _tail。
                        Interlocked.CompareExchange(ref _tail, node, tail);
                        break;
                    }
                }
                // 如果 next 不為 null,則說(shuō)明從給 tail 賦值到給 next 賦值這段時(shí)間,有其他線程修改過(guò) tail._next,
                else
                {
                    // 如果沒(méi)有其他線程修改過(guò) _tail,那么 next 就是隊(duì)列的最后一個(gè)節(jié)點(diǎn),我們就可以直接將 next 賦值給 _tail。
                    Interlocked.CompareExchange(ref _tail, next, tail);
                }
            }
        }
    }

    public bool TryDequeue(out T item)
    {
        while (true)
        {
            Node head = _head;
            Node tail = _tail;
            Node next = head._next;
            // 判斷 _head 是否被修改過(guò)
            // 如果沒(méi)有被修改過(guò),說(shuō)明從給 head 賦值到給 next 賦值這段時(shí)間,沒(méi)有其他線程執(zhí)行過(guò) Dequeue 操作。          
            if (head == _head)
            {
                // 如果 head == tail,說(shuō)明隊(duì)列為空
                if (head == tail)
                {
                    // 雖然上面已經(jīng)判斷過(guò)隊(duì)列是否為空,但是在這里再判斷一次
                    // 是為了防止在給 tail 賦值到給 next 賦值這段時(shí)間,有其他線程執(zhí)行過(guò) Enqueue 操作。
                    if (next == null)
                    {
                        item = default;
                        return false;
                    }

                    // 如果 next 不為 null,則說(shuō)明從給 tail 賦值到給 next 賦值這段時(shí)間,有其他線程修改過(guò) tail._next,也就是有其他線程執(zhí)行過(guò) Enqueue 操作。
                    // 那么 next 就可能是隊(duì)列的最后一個(gè)節(jié)點(diǎn),我們嘗試將 next 賦值給 _tail。
                    Interlocked.CompareExchange(ref _tail, next, tail);
                }
                // 如果 head != tail,說(shuō)明隊(duì)列不為空
                else
                {
                    item = next._item;
                    if (Interlocked.CompareExchange(ref _head, next, head) == head)
                    {
                        // 如果 _head 沒(méi)有被修改過(guò)
                        // 說(shuō)明從給 head 賦值到這里,沒(méi)有其他線程執(zhí)行過(guò) Dequeue 操作,上面的 item 就是隊(duì)列的第一個(gè)節(jié)點(diǎn)的值。
                        // 我們就可以直接返回。
                        break;
                    }
                    // 如果 _head 被修改過(guò)
                    // 說(shuō)明從給 head 賦值到這里,有其他線程執(zhí)行過(guò) Dequeue 操作,上面的 item 就不是隊(duì)列的第一個(gè)節(jié)點(diǎn)的值。
                    // 我們就需要重新執(zhí)行 Dequeue 操作。
                }
            }
        }

        return true;
    }

    private class Node
    {
        public readonly T _item;
        public Node _next;

        public Node(T item)
        {
            _item = item;
        }
    }
}

我們可以通過(guò)以下代碼來(lái)進(jìn)行測(cè)試

using System.Collections.Concurrent;

var queue = new ConcurrentQueue<int>();
var results = new ConcurrentBag<int>();
int dequeueRetryCount = 0;

var enqueueTask = Task.Run(() =>
{
    // 確保 Enqueue 前 dequeueTask 已經(jīng)開(kāi)始運(yùn)行
    Thread.Sleep(10);
    Console.WriteLine("Enqueue start");
    Parallel.For(0, 100000, i => queue.Enqueue(i));
    Console.WriteLine("Enqueue done");
});

var dequeueTask = Task.Run(() =>
{
    Thread.Sleep(10);
    Console.WriteLine("Dequeue start");
    Parallel.For(0, 100000, i =>
    {
        while (true)
        {
            if (queue.TryDequeue(out int result))
            {
                results.Add(result);
                break;
            }

            Interlocked.Increment(ref dequeueRetryCount);
        }
    });
    Console.WriteLine("Dequeue done");
});

await Task.WhenAll(enqueueTask, dequeueTask);
Console.WriteLine(
    $"Enqueue and dequeue done, total data count: {results.Count}, dequeue retry count: {dequeueRetryCount}");

var hashSet = results.ToHashSet();
for (int i = 0; i < 100000; i++)
{
    if (!hashSet.Contains(i))
    {
        Console.WriteLine("Error, missing " + i);
        break;
    }
}

Console.WriteLine("Done");

輸出結(jié)果:

Dequeue start
Enqueue start
Enqueue done
Dequeue done
Enqueue and dequeue done, total data count: 100000, dequeue retry count: 10586
Done

上述的 retry count 為 797,說(shuō)明在 100000 次的 Dequeue 操作中,有 10586 次的 Dequeue 操作需要重試,那是因?yàn)樵?Dequeue 操作中,可能暫時(shí)沒(méi)有數(shù)據(jù)可供 Dequeue,需要等待其他線程執(zhí)行 Enqueue 操作。

當(dāng)然這個(gè) retry count 是不穩(wěn)定的,因?yàn)樵诙嗑€程環(huán)境下,每次執(zhí)行的結(jié)果都可能不一樣。

總結(jié)

CAS 操作是一種樂(lè)觀鎖,它假設(shè)沒(méi)有其他線程修改過(guò)數(shù)據(jù),如果沒(méi)有修改過(guò),那么就直接修改數(shù)據(jù),如果修改過(guò),那么就重新獲取數(shù)據(jù),再次嘗試修改。

在借助 CAS 實(shí)現(xiàn)較為復(fù)雜的數(shù)據(jù)結(jié)構(gòu)時(shí),我們不光要依靠 CAS 操作,還需要注意每次操作的數(shù)據(jù)是否被其他線程修改過(guò),考慮各個(gè)可能的分支,以及在不同的分支中,如何處理數(shù)據(jù)。

以上就是C#中使用CAS實(shí)現(xiàn)無(wú)鎖算法的示例詳解的詳細(xì)內(nèi)容,更多關(guān)于C# CAS無(wú)鎖算法的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論