利用C++11原子量如何實(shí)現(xiàn)自旋鎖詳解
一、自旋鎖
自旋鎖是一種基礎(chǔ)的同步原語(yǔ),用于保障對(duì)共享數(shù)據(jù)的互斥訪問(wèn)。與互斥鎖的相比,在獲取鎖失敗的時(shí)候不會(huì)使得線程阻塞而是一直自旋嘗試獲取鎖。當(dāng)線程等待自旋鎖的時(shí)候,CPU不能做其他事情,而是一直處于輪詢忙等的狀態(tài)。自旋鎖主要適用于被持有時(shí)間短,線程不希望在重新調(diào)度上花過(guò)多時(shí)間的情況。實(shí)際上許多其他類型的鎖在底層使用了自旋鎖實(shí)現(xiàn),例如多數(shù)互斥鎖在試圖獲取鎖的時(shí)候會(huì)先自旋一小段時(shí)間,然后才會(huì)休眠。如果在持鎖時(shí)間很長(zhǎng)的場(chǎng)景下使用自旋鎖,則會(huì)導(dǎo)致CPU在這個(gè)線程的時(shí)間片用盡之前一直消耗在無(wú)意義的忙等上,造成計(jì)算資源的浪費(fèi)。
使用自旋鎖時(shí)要注意:
由于自旋時(shí)不釋放CPU,因而持有自旋鎖的線程應(yīng)該盡快釋放自旋鎖,否則等待該自旋鎖的線程會(huì)一直在哪里自旋,這就會(huì)浪費(fèi)CPU時(shí)間。
持有自旋鎖的線程在sleep之前應(yīng)該釋放自旋鎖以便其他咸亨可以獲得該自旋鎖
二、CAS操作實(shí)現(xiàn)自旋鎖
CAS(Compare and Swap),即比較并替換,實(shí)現(xiàn)并發(fā)算法時(shí)常用到的一種技術(shù),這種操作提供了硬件級(jí)別的原子操作(通過(guò)鎖總線的方式)。CAS操作的原型可以認(rèn)為是:
bool CAS(V, A, B)
其中V代表內(nèi)存中的變量,A代表期待的值,B表示新值。當(dāng)V的值與A相等時(shí),將V與B的值交換。邏輯上可以用下面的偽代碼表示:
bool CAS(V, A, B)
{
if (V == A)
{
swap(V, B);
return true;
}
return false;
}
需要強(qiáng)調(diào)的是上面的操作是原子的,要么不做,要么全部完成。
那么已經(jīng)擁有CAS操作的情況下如何實(shí)現(xiàn)一個(gè)自旋鎖呢?首先回憶自旋鎖的用途,本質(zhì)上我們是希望能夠讓一個(gè)線程在不滿足進(jìn)入臨界區(qū)的條件時(shí),不停的忙等輪詢,直到可以運(yùn)行的時(shí)候再繼續(xù)(進(jìn)入臨界區(qū))執(zhí)行。那么,我們可能自然的想到使用一個(gè)bool變量來(lái)表示是否可以進(jìn)入臨界區(qū),例如以下面的偽代碼的邏輯:
while(flag == true); flag = true; /* do something ... */ flag = false; ...
這樣做的直觀想法是當(dāng)flag為true的時(shí)候表示已經(jīng)有線程處于臨界區(qū)內(nèi),只有當(dāng)flag為fasle時(shí)才能進(jìn)入,而在進(jìn)入的時(shí)候立即將flag置為true。但是這樣做明顯存在一個(gè)問(wèn)題,判斷flag為false和設(shè)置flag為true并不是一個(gè)不可分割的整體,有可能出現(xiàn)類似下面這樣的時(shí)序, 假設(shè)最初flag為false:
| step | thread 1 | thread 2 |
|---|---|---|
| 1 | while(flag == true); | |
| 2 | while(flag == true); | |
| 3 | flag = true | |
| 4 | flag = true | |
| 5 | do something | do something |
| 6 | flag = false | |
| 7 | flag = false |
step是虛構(gòu)的步驟,do something為一系列指令,這里寫(xiě)在一起表示并發(fā)執(zhí)行。這里可以看出由于thread1讀取判斷flag的值與修改flag的值是兩個(gè)獨(dú)立的操作,中間插入了thread2的判斷操作,最終使得有兩個(gè)線程同時(shí)進(jìn)入了臨界區(qū),這與我們的期望相悖。那么如何解決呢?如果能將讀取判斷與修改的操作合二為一,變成一個(gè)不可分割的整體,那么自然就不可能出現(xiàn)這種交錯(cuò)的場(chǎng)景。對(duì)于這樣一個(gè)整體操作,我們希望它能讀取內(nèi)存中變量的值,并且當(dāng)其等于特定值的時(shí)候,修改它為我們需要的另一個(gè)值。嗯......沒(méi)錯(cuò),這樣我們就得到了CAS操作。
現(xiàn)在可以重新修改我們的同步方式,不停的進(jìn)行期望flag為false的CAS操作 CAS(flag, flase, b) (這里b為true),直到其返回成功為止,再進(jìn)行臨界區(qū)中的操作,離開(kāi)臨界區(qū)時(shí)將flag置為false。
b = true; while(!CAS(flag, false, b)); //do something flag = false;
現(xiàn)在,判斷操作與寫(xiě)入操作已經(jīng)成為了一個(gè)整體,當(dāng)一個(gè)線程的CAS操作成功的時(shí)候會(huì)阻止其他線程進(jìn)入臨界區(qū),到達(dá)互斥訪問(wèn)的目的。
現(xiàn)在我們已經(jīng)可以使用CAS操作來(lái)解決臨界區(qū)的互斥訪問(wèn)的問(wèn)題了,但是如果每次都這樣寫(xiě)一遍實(shí)在太過(guò)麻煩,因此可以進(jìn)行一些封裝使得使用更加方便,也就是說(shuō)...可以封裝成自旋鎖。我們可以用一個(gè)類來(lái)表示,將一個(gè)bool值作為類的數(shù)據(jù)成員,同時(shí)將CAS操作和賦值操作作為其成員函數(shù),CAS操作其實(shí)就是加鎖操作,而后面的賦值操作就是解鎖操作。
三、用C++原子量實(shí)現(xiàn)
按照上面的思路,接下來(lái)用 C++ 11 引入標(biāo)準(zhǔn)庫(kù)的原子量來(lái)實(shí)現(xiàn)一個(gè)自旋鎖并且進(jìn)行測(cè)試。
首先,我們需要一個(gè)bool值來(lái)表示鎖的狀態(tài),這里直接使用標(biāo)準(zhǔn)庫(kù)中的原子量 atomic<bool> (C++ 11的原子量可以參考:http://www.dbjr.com.cn/article/141896.htm ,在我的平臺(tái)(Cygwin64、GCC7.3)上 atomic<bool> 的成員函數(shù)is_lock_free()返回值為true,是無(wú)鎖的實(shí)現(xiàn)(如果內(nèi)部使用了鎖來(lái)實(shí)現(xiàn)的話那還叫什么自旋鎖 = =)。實(shí)際上在大多數(shù)平臺(tái)上 atomic<bool> 都是無(wú)鎖的,如果不確定的話也可以使用C++標(biāo)準(zhǔn)規(guī)定必須為無(wú)鎖實(shí)現(xiàn)的atomic_flag。
接下來(lái),我們需要兩個(gè)原子操作,CAS和賦值,C++11標(biāo)準(zhǔn)庫(kù)在原子量的成員函數(shù)中直接提供了這兩個(gè)操作。
//CAS
std::atomic::compare_exchange_weak( T& expected, T desired,
std::memory_order order =
std::memory_order_seq_cst ),
std::atomic::compare_exchange_strong( T& expected, T desired,
std::memory_order order =
std::memory_order_seq_cst )
//賦值
void store( T desired, std::memory_order order = std::memory_order_seq_cst )
compare_exchange_weak 與 compare_exchange_strong 主要的區(qū)別在于內(nèi)存中的值與expected相等的時(shí)候,CAS操作是否一定能成功,compare_exchange_weak有概率會(huì)返回失敗,而compare_exchange_strong則一定會(huì)成功。因此,compare_exchange_weak必須與循環(huán)搭配使用來(lái)保證在失敗的時(shí)候重試CAS操作。得到的好處是在某些平臺(tái)上compare_exchange_weak性能更好。按照上面的模型,我們本來(lái)就要和while搭配使用,可以使用compare_exchange_weak。最后內(nèi)存序的選擇沒(méi)有特殊需求直接使用默認(rèn)的std::memory_order_seq_cst。而賦值操作非常簡(jiǎn)單直接,這個(gè)調(diào)用一定會(huì)成功(只是賦值而已 = =),沒(méi)有返回值。
實(shí)現(xiàn)代碼非常短,下面是源代碼:
#include <atomic>
class SpinLock {
public:
SpinLock() : flag_(false)
{}
void lock()
{
bool expect = false;
while (!flag_.compare_exchange_weak(expect, true))
{
//這里一定要將expect復(fù)原,執(zhí)行失敗時(shí)expect結(jié)果是未定的
expect = false;
}
}
void unlock()
{
flag_.store(false);
}
private:
std::atomic<bool> flag_;
};
如上面所說(shuō),lock操作不停的嘗試CAS操作直到成功為止,unlock操作則將bool標(biāo)志位復(fù)原。使用方式如下:
SpinLock myLock; myLock.lock(); //do something myLock.unlock();
接下來(lái),我們進(jìn)行正確性測(cè)試,以經(jīng)典的i++ 問(wèn)題為例:
#include <atomic>
#include <thread>
#include <vector>
//自旋鎖類定義
class SpinLock {
public:
SpinLock() : flag_(false)
{}
void lock()
{
bool expect = false;
while (!flag_.compare_exchange_weak(expect, true))
{
expect = false;
}
}
void unlock()
{
flag_.store(false);
}
private:
std::atomic<bool> flag_;
};
//每個(gè)線程自增次數(shù)
const int kIncNum = 1000000;
//線程數(shù)
const int kWorkerNum = 10;
//自增計(jì)數(shù)器
int count = 0;
//自旋鎖
SpinLock spinLock;
//每個(gè)線程的工作函數(shù)
void IncCounter()
{
for (int i = 0; i < kIncNum; ++i)
{
spinLock.lock();
count++;
spinLock.unlock();
}
}
int main()
{
std::vector<std::thread> workers;
std::cout << "SpinLock inc MyTest start" << std::endl;
count = 0;
std::cout << "start " << kWorkerNum << " workers_" << "every worker inc " << kIncNum << std::endl;
std::cout << "count_: " << count << std::endl;
//創(chuàng)建10個(gè)工作線程進(jìn)行自增操作
for (int i = 0; i < kWorkerNum; ++i)
workers.push_back(std::move(std::thread(IncCounter)));
for (auto it = workers.begin(); it != workers.end(); it++)
it->join();
std::cout << "workers_ end" << std::endl;
std::cout << "count_: " << count << std::endl;
//驗(yàn)證結(jié)果
if (count == kIncNum * kWorkerNum)
{
std::cout << "SpinLock inc MyTest passed" << std::endl;
return true;
}
else
{
std::cout << "SpinLock inc MyTest failed" << std::endl;
return false;
}
return 0;
}
上面的代碼中創(chuàng)建了10個(gè)線程對(duì)共享的全局變量count分別進(jìn)行一百萬(wàn)次++操作,然后驗(yàn)證結(jié)果是否正確,最終執(zhí)行的輸出為:
SpinLock inc MyTest start start 10 workers_every worker inc 1000000 count_: 0 workers_ end count_: 10000000 SpinLock inc MyTest passed
從結(jié)果中可以看出我們實(shí)現(xiàn)的自旋鎖起到了保護(hù)臨界區(qū)(這里就是i++ )的作用,count最后的值等于每個(gè)線程執(zhí)行自增的數(shù)目之和。作為對(duì)比,可以去掉IncCounter中的加鎖解鎖操作:
void IncCounter()
{
for (int i = 0; i < kIncNum; ++i)
{
//spinLock.lock();
count++;
//spinLock.unlock();
}
}
執(zhí)行后的輸出為:
SpinLock inc MyTest start start 10 workers_every worker inc 1000000 count_: 0 workers_ end count_: 7254522 SpinLock inc MyTest failed
結(jié)果由于多個(gè)線程同時(shí)執(zhí)行 i++ 造成結(jié)果錯(cuò)誤。
到這里,我們就通過(guò) C++ 11的原子量實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的自旋鎖。這里只是對(duì)C++原子量的一個(gè)小使用,無(wú)論是自旋鎖本身還是原子量都還有許多值得探究的地方。
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
C語(yǔ)言數(shù)據(jù)結(jié)構(gòu)之堆排序詳解
堆是計(jì)算機(jī)科學(xué)中一類特殊的數(shù)據(jù)結(jié)構(gòu)的統(tǒng)稱,通常是一個(gè)可以被看做一棵完全二叉樹(shù)的數(shù)組對(duì)象。而堆排序是利用堆這種數(shù)據(jù)結(jié)構(gòu)所設(shè)計(jì)的一種排序算法。本文將通過(guò)圖片詳細(xì)介紹堆排序,需要的可以參考一下2022-03-03
C語(yǔ)言簡(jiǎn)單實(shí)現(xiàn)三子棋游戲
這篇文章主要為大家詳細(xì)介紹了C語(yǔ)言簡(jiǎn)單實(shí)現(xiàn)三子棋游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
Matlab實(shí)現(xiàn)灰色預(yù)測(cè)的示例代碼
這篇文章主要為大家詳細(xì)介紹了如何利用Matlab實(shí)現(xiàn)灰色預(yù)測(cè),文中的示例代碼講解詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴可以了解一下2022-05-05
詳談c++11 final與override說(shuō)明符
下面小編就為大家?guī)?lái)一篇詳談c++11 final與override說(shuō)明符。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-01-01
C++中4種強(qiáng)制類型轉(zhuǎn)換的區(qū)別總結(jié)
C++風(fēng)格的類型轉(zhuǎn)換提供了4種類型轉(zhuǎn)換操作符來(lái)應(yīng)對(duì)不同場(chǎng)合的應(yīng)用。下面這篇文章主要給大家介紹了C++中4種強(qiáng)制類型轉(zhuǎn)換的區(qū)別,有需要的朋友們可以參考借鑒,下面來(lái)一起看看吧。2016-12-12
使用C語(yǔ)言求二叉樹(shù)結(jié)點(diǎn)的最低公共祖先的方法
這篇文章主要介紹了使用C語(yǔ)言求二叉樹(shù)結(jié)點(diǎn)的最低公共祖先的方法,文中還給出了ACM的練習(xí)題目,需要的朋友可以參考下2015-08-08

