C++中的并行與并發(fā)基礎與使用詳解
1. 并行基礎
std::thread 用于創(chuàng)建一個執(zhí)行的線程實例,所以它是一切并發(fā)編程的基礎,使用時需要包含 <thread> 頭文件, 它提供了很多基本的線程操作,例如 get_id() 來獲取所創(chuàng)建線程的線程 ID,使用 join() 來加入一個線程等等,例如:
#include <iostream> #include <thread> int main() { std::thread t([](){ std::cout << "hello world." << std::endl; }); t.join(); return 0; }
2. 互斥量與臨界區(qū)
我們在操作系統(tǒng)、亦或是數(shù)據(jù)庫的相關知識中已經了解過了有關并發(fā)技術的基本知識,mutex 就是其中的核心之一。 C++11 引入了 mutex 相關的類,其所有相關的函數(shù)都放在 <mutex> 頭文件中。
std::mutex 是 C++11 中最基本的 mutex 類,通過實例化 std::mutex 可以創(chuàng)建互斥量, 而通過其成員函數(shù) lock() 可以進行上鎖,unlock() 可以進行解鎖。 但是在實際編寫代碼的過程中,最好不去直接調用成員函數(shù), 因為調用成員函數(shù)就需要在每個臨界區(qū)的出口處調用 unlock(),當然,還包括異常。 這時候 C++11 還為互斥量提供了一個 RAII 語法的模板類 std::lock_guard。 RAII 在不失代碼簡潔性的同時,很好的保證了代碼的異常安全性。
在 RAII 用法下,對于臨界區(qū)的互斥量的創(chuàng)建只需要在作用域的開始部分,例如:
#include <iostream> #include <mutex> #include <thread> int v = 1; void critical_section(int change_v) { static std::mutex mtx; std::lock_guard<std::mutex> lock(mtx); // 執(zhí)行競爭操作 v = change_v; // 離開此作用域后 mtx 會被釋放 } int main() { std::thread t1(critical_section, 2), t2(critical_section, 3); t1.join(); t2.join(); std::cout << v << std::endl; return 0; }
由于 C++ 保證了所有棧對象在生命周期結束時會被銷毀,所以這樣的代碼也是異常安全的。 無論 critical_section() 正常返回、還是在中途拋出異常,都會引發(fā)堆棧回退,也就自動調用了 unlock()。
而 std::unique_lock 則是相對于 std::lock_guard 出現(xiàn)的,std::unique_lock 更加靈活, std::unique_lock 的對象會以獨占所有權(沒有其他的 unique_lock 對象同時擁有某個 mutex 對象的所有權) 的方式管理 mutex 對象上的上鎖和解鎖的操作。所以在并發(fā)編程中,推薦使用 std::unique_lock。
std::lock_guard 不能顯式的調用 lock 和 unlock, 而 std::unique_lock 可以在聲明后的任意位置調用, 可以縮小鎖的作用范圍,提供更高的并發(fā)度。
如果你用到了條件變量 std::condition_variable::wait 則必須使用 std::unique_lock 作為參數(shù)。
例如:
#include <iostream> #include <mutex> #include <thread> int v = 1; void critical_section(int change_v) { static std::mutex mtx; std::unique_lock<std::mutex> lock(mtx); // 執(zhí)行競爭操作 v = change_v; std::cout << v << std::endl; // 將鎖進行釋放 lock.unlock(); // 在此期間,任何人都可以搶奪 v 的持有權 // 開始另一組競爭操作,再次加鎖 lock.lock(); v += 1; std::cout << v << std::endl; } int main() { std::thread t1(critical_section, 2), t2(critical_section, 3); t1.join(); t2.join(); return 0; }
3. 期物
期物(Future)表現(xiàn)為 std::future,它提供了一個訪問異步操作結果的途徑,這句話很不好理解。 為了理解這個特性,我們需要先理解一下在 C++11 之前的多線程行為。
試想,如果我們的主線程 A 希望新開辟一個線程 B 去執(zhí)行某個我們預期的任務,并返回我一個結果。 而這時候,線程 A 可能正在忙其他的事情,無暇顧及 B 的結果, 所以我們會很自然的希望能夠在某個特定的時間獲得線程 B 的結果。
在 C++11 的 std::future 被引入之前,通常的做法是: 創(chuàng)建一個線程 A,在線程 A 里啟動任務 B,當準備完畢后發(fā)送一個事件,并將結果保存在全局變量中。 而主函數(shù)線程 A 里正在做其他的事情,當需要結果的時候,調用一個線程等待函數(shù)來獲得執(zhí)行的結果。
而 C++11 提供的 std::future 簡化了這個流程,可以用來獲取異步任務的結果。 自然地,我們很容易能夠想象到把它作為一種簡單的線程同步手段,即屏障(barrier)。
為了看一個例子,我們這里額外使用 std::packaged_task,它可以用來封裝任何可以調用的目標,從而用于實現(xiàn)異步的調用。 舉例來說:
#include <iostream> #include <future> #include <thread> int main() { // 將一個返回值為7的 lambda 表達式封裝到 task 中 // std::packaged_task 的模板參數(shù)為要封裝函數(shù)的類型 std::packaged_task<int()> task([](){return 7;}); // 獲得 task 的期物 std::future<int> result = task.get_future(); // 在一個線程中執(zhí)行 task std::thread(std::move(task)).detach(); std::cout << "waiting..."; result.wait(); // 在此設置屏障,阻塞到期物的完成 // 輸出執(zhí)行結果 std::cout << "done!" << std:: endl << "future result is " << result.get() << std::endl; return 0; }
在封裝好要調用的目標后,可以使用 get_future() 來獲得一個 std::future 對象,以便之后實施線程同步。
4. 條件變量
條件變量 std::condition_variable 是為了解決死鎖而生,當互斥操作不夠用而引入的。 比如,線程可能需要等待某個條件為真才能繼續(xù)執(zhí)行, 而一個忙等待循環(huán)中可能會導致所有其他線程都無法進入臨界區(qū)使得條件為真時,就會發(fā)生死鎖。 所以,condition_variable 實例被創(chuàng)建出現(xiàn)主要就是用于喚醒等待線程從而避免死鎖。 std::condition_variable的 notify_one() 用于喚醒一個線程; notify_all() 則是通知所有線程。下面是一個生產者和消費者模型的例子:
#include <queue> #include <chrono> #include <mutex> #include <thread> #include <iostream> #include <condition_variable> int main() { std::queue<int> produced_nums; std::mutex mtx; std::condition_variable cv; bool notified = false; // 通知信號 // 生產者 auto producer = [&]() { for (int i = 0; ; i++) { std::this_thread::sleep_for(std::chrono::milliseconds(900)); std::unique_lock<std::mutex> lock(mtx); std::cout << "producing " << i << std::endl; produced_nums.push(i); notified = true; cv.notify_all(); // 此處也可以使用 notify_one } }; // 消費者 auto consumer = [&]() { while (true) { std::unique_lock<std::mutex> lock(mtx); while (!notified) { // 避免虛假喚醒 cv.wait(lock); } // 短暫取消鎖,使得生產者有機會在消費者消費空前繼續(xù)生產 lock.unlock(); // 消費者慢于生產者 std::this_thread::sleep_for(std::chrono::milliseconds(1000)); lock.lock(); while (!produced_nums.empty()) { std::cout << "consuming " << produced_nums.front() << std::endl; produced_nums.pop(); } notified = false; } }; // 分別在不同的線程中運行 std::thread p(producer); std::thread cs[2]; for (int i = 0; i < 2; ++i) { cs[i] = std::thread(consumer); } p.join(); for (int i = 0; i < 2; ++i) { cs[i].join(); } return 0; }
值得一提的是,在生產者中我們雖然可以使用 notify_one(),但實際上并不建議在此處使用, 因為在多消費者的情況下,我們的消費者實現(xiàn)中簡單放棄了鎖的持有,這使得可能讓其他消費者 爭奪此鎖,從而更好的利用多個消費者之間的并發(fā)。話雖如此,但實際上因為 std::mutex 的排他性, 我們根本無法期待多個消費者能真正意義上的并行消費隊列的中生產的內容,我們仍需要粒度更細的手段。
5. 原子操作與內存模型
細心的讀者可能會對前一小節(jié)中生產者消費者模型的例子可能存在編譯器優(yōu)化導致程序出錯的情況產生疑惑。 例如,布爾值 notified 沒有被 volatile 修飾,編譯器可能對此變量存在優(yōu)化,例如將其作為一個寄存器的值, 從而導致消費者線程永遠無法觀察到此值的變化。這是一個好問題,為了解釋清楚這個問題,我們需要進一步討論 從 C++ 11 起引入的內存模型這一概念。我們首先來看一個問題,下面這段代碼輸出結果是多少?
#include <thread> #include <iostream> int main() { int a = 0; int flag = 0; std::thread t1([&]() { while (flag != 1); int b = a; std::cout << "b = " << b << std::endl; }); std::thread t2([&]() { a = 5; flag = 1; }); t1.join(); t2.join(); return 0; }
從直觀上看,t2 中 a = 5; 這一條語句似乎總在 flag = 1; 之前得到執(zhí)行,而 t1 中 while (flag != 1) 似乎保證了 std::cout << "b = " << b << std::endl; 不會再標記被改變前執(zhí)行。從邏輯上看,似乎 b 的值應該等于 5。 但實際情況遠比此復雜得多,或者說這段代碼本身屬于未定義的行為,因為對于 a 和 flag 而言,他們在兩個并行的線程中被讀寫, 出現(xiàn)了競爭。除此之外,即便我們忽略競爭讀寫,仍然可能受 CPU 的亂序執(zhí)行,編譯器對指令的重排的影響, 導致 a = 5 發(fā)生在 flag = 1 之后。從而 b 可能輸出 0。
5.1原子操作
std::mutex 可以解決上面出現(xiàn)的并發(fā)讀寫的問題,但互斥鎖是操作系統(tǒng)級的功能, 這是因為一個互斥鎖的實現(xiàn)通常包含兩條基本原理:
提供線程間自動的狀態(tài)轉換,即『鎖住』這個狀態(tài)
保障在互斥鎖操作期間,所操作變量的內存與臨界區(qū)外進行隔離
這是一組非常強的同步條件,換句話說當最終編譯為 CPU 指令時會表現(xiàn)為非常多的指令(我們之后再來看如何實現(xiàn)一個簡單的互斥鎖)。 這對于一個僅需原子級操作(沒有中間態(tài))的變量,似乎太苛刻了。
關于同步條件的研究有著非常久遠的歷史,我們在這里不進行贅述。讀者應該明白,現(xiàn)代 CPU 體系結構提供了 CPU 指令級的原子操作, 因此在 C++11 中多線程下共享變量的讀寫這一問題上,還引入了 std::atomic 模板,使得我們實例化一個原子類型,將一個 原子類型讀寫操作從一組指令,最小化到單個 CPU 指令。例如:
std::atomic<int> counter;
并為整數(shù)或浮點數(shù)的原子類型提供了基本的數(shù)值成員函數(shù),舉例來說, 包括 fetch_add, fetch_sub 等,同時通過重載方便的提供了對應的 +,- 版本。 比如下面的例子:
#include <atomic> #include <thread> #include <iostream> std::atomic<int> count = {0}; int main() { std::thread t1([](){ count.fetch_add(1); }); std::thread t2([](){ count++; // 等價于 fetch_add count += 1; // 等價于 fetch_add }); t1.join(); t2.join(); std::cout << count << std::endl; return 0; }
當然,并非所有的類型都能提供原子操作,這是因為原子操作的可行性取決于具體的 CPU 架構,以及所實例化的類型結構是否能夠滿足該 CPU 架構對內存對齊 條件的要求,因而我們總是可以通過 std::atomic<T>::is_lock_free 來檢查該原子類型是否需支持原子操作,例如:
#include <atomic> #include <iostream> struct A { float x; int y; long long z; }; int main() { std::atomic<A> a; std::cout << std::boolalpha << a.is_lock_free() << std::endl; return 0; }
5.2一致性模型
并行執(zhí)行的多個線程,從某種宏觀層面上討論,可以粗略的視為一種分布式系統(tǒng)。 在分布式系統(tǒng)中,任何通信乃至本地操作都需要消耗一定時間,甚至出現(xiàn)不可靠的通信。
如果我們強行將一個變量 v 在多個線程之間的操作設為原子操作,即任何一個線程在操作完 v 后, 其他線程均能同步感知到 v 的變化,則對于變量 v 而言,表現(xiàn)為順序執(zhí)行的程序,它并沒有由于引入多線程 而得到任何效率上的收益。對此有什么辦法能夠適當?shù)募铀倌??答案便是削弱原子操作的在進程間的同步條件。
從原理上看,每個線程可以對應為一個集群節(jié)點,而線程間的通信也幾乎等價于集群節(jié)點間的通信。 削弱進程間的同步條件,通常我們會考慮四種不同的一致性模型:
線性一致性:又稱強一致性或原子一致性。它要求任何一次讀操作都能讀到某個數(shù)據(jù)的最近一次寫的數(shù)據(jù),并且所有線程的操作順序與全局時鐘下的順序是一致的。
x.store(1) x.load()
T1 ---------+----------------+------>
T2 -------------------+------------->
x.store(2)
在這種情況下線程 T1, T2 對 x 的兩次寫操作是原子的,且 x.store(1) 是嚴格的發(fā)生在 x.store(2) 之前,x.store(2) 嚴格的發(fā)生在 x.load() 之前。 值得一提的是,線性一致性對全局時鐘的要求是難以實現(xiàn)的,這也是人們不斷研究比這個一致性更弱條件下其他一致性的算法的原因。
順序一致性:同樣要求任何一次讀操作都能讀到數(shù)據(jù)最近一次寫入的數(shù)據(jù),但未要求與全局時鐘的順序一致。
x.store(1) x.store(3) x.load()
T1 ---------+-----------+----------+----->
T2 ---------------+---------------------->
x.store(2)
或者
x.store(1) x.store(3) x.load()
T1 ---------+-----------+----------+----->
T2 ------+------------------------------->
x.store(2)
在順序一致性的要求下,x.load() 必須讀到最近一次寫入的數(shù)據(jù),因此 x.store(2) 與 x.store(1) 并無任何先后保障,即 只要 T2 的 x.store(2) 發(fā)生在 x.store(3) 之前即可。
因果一致性:它的要求進一步降低,只需要有因果關系的操作順序得到保障,而非因果關系的操作順序則不做要求。
a = 1 b = 2
T1 ----+-----------+---------------------------->
T2 ------+--------------------+--------+-------->
x.store(3) c = a + b y.load()
或者
a = 1 b = 2
T1 ----+-----------+---------------------------->
T2 ------+--------------------+--------+-------->
x.store(3) y.load() c = a + b
亦或者
b = 2 a = 1
T1 ----+-----------+---------------------------->
T2 ------+--------------------+--------+-------->
y.load() c = a + b x.store(3)
上面給出的三種例子都是屬于因果一致的,因為整個過程中,只有 c 對 a 和 b 產生依賴,而 x 和 y 在此例子中表現(xiàn)為沒有關系(但實際情況中我們需要更詳細的信息才能確定 x 與 y 確實無關)
最終一致性:是最弱的一致性要求,它只保障某個操作在未來的某個時間節(jié)點上會被觀察到,但并未要求被觀察到的時間。因此我們甚至可以對此條件稍作加強,例如規(guī)定某個操作被觀察到的時間總是有界的。當然這已經不在我們的討論范圍之內了。
x.store(3) x.store(4)
T1 ----+-----------+-------------------------------------------->
T2 ---------+------------+--------------------+--------+-------->
x.read x.read() x.read() x.read()
在上面的情況中,如果我們假設 x 的初始值為 0,則 T2 中四次 x.read() 結果可能但不限于以下情況:
3 4 4 4 // x 的寫操作被很快觀察到
0 3 3 4 // x 的寫操作被觀察到的時間存在一定延遲
0 0 0 4 // 最后一次讀操作讀到了 x 的最終值,但此前的變化并未觀察到
0 0 0 0 // 在當前時間段內 x 的寫操作均未被觀察到,
// 但未來某個時間點上一定能觀察到 x 為 4 的情況
5.3內存順序
為了追求極致的性能,實現(xiàn)各種強度要求的一致性,C++11 為原子操作定義了六種不同的內存順序 std::memory_order 的選項,表達了四種多線程間的同步模型:
寬松模型:在此模型下,單個線程內的原子操作都是順序執(zhí)行的,不允許指令重排,但不同線程間原子操作的順序是任意的。類型通過 std::memory_order_relaxed 指定。我們來看一個例子:
std::atomic<int> counter = {0}; std::vector<std::thread> vt; for (int i = 0; i < 100; ++i) { vt.emplace_back([&](){ counter.fetch_add(1, std::memory_order_relaxed); }); } for (auto& t : vt) { t.join(); } std::cout << "current counter:" << counter << std::endl;
釋放/消費模型:在此模型中,我們開始限制進程間的操作順序,如果某個線程需要修改某個值,但另一個線程會對該值的某次操作產生依賴,即后者依賴前者。具體而言,線程 A 完成了三次對 x 的寫操作,線程 B 僅依賴其中第三次 x 的寫操作,與 x 的前兩次寫行為無關,則當 A 主動 x.release() 時候(即使用 std::memory_order_release),選項 std::memory_order_consume 能夠確保 B 在調用 x.load() 時候觀察到 A 中第三次對 x 的寫操作。我們來看一個例子:
// 初始化為 nullptr 防止 consumer 線程從野指針進行讀取 std::atomic<int*> ptr(nullptr); int v; std::thread producer([&]() { int* p = new int(42); v = 1024; ptr.store(p, std::memory_order_release); }); std::thread consumer([&]() { int* p; while(!(p = ptr.load(std::memory_order_consume))); std::cout << "p: " << *p << std::endl; std::cout << "v: " << v << std::endl; }); producer.join(); consumer.join();
釋放/獲取模型:在此模型下,我們可以進一步加緊對不同線程間原子操作的順序的限制,在釋放 std::memory_order_release 和獲取 std::memory_order_acquire 之間規(guī)定時序,即發(fā)生在釋放(release)操作之前的所有寫操作,對其他線程的任何獲?。╝cquire)操作都是可見的,亦即發(fā)生順序(happens-before)。
可以看到,std::memory_order_release 確保了它之前的寫操作不會發(fā)生在釋放操作之后,是一個向后的屏障(backward),而 std::memory_order_acquire 確保了它之前的寫行為不會發(fā)生在該獲取操作之后,是一個向前的屏障(forward)。對于選項 std::memory_order_acq_rel 而言,則結合了這兩者的特點,唯一確定了一個內存屏障,使得當前線程對內存的讀寫不會被重排并越過此操作的前后:
我們來看一個例子:
std::vector<int> v; std::atomic<int> flag = {0}; std::thread release([&]() { v.push_back(42); flag.store(1, std::memory_order_release); }); std::thread acqrel([&]() { int expected = 1; // must before compare_exchange_strong while(!flag.compare_exchange_strong(expected, 2, std::memory_order_acq_rel)) expected = 1; // must after compare_exchange_strong // flag has changed to 2 }); std::thread acquire([&]() { while(flag.load(std::memory_order_acquire) < 2); std::cout << v.at(0) << std::endl; // must be 42 }); release.join(); acqrel.join(); acquire.join();
在此例中我們使用了 compare_exchange_strong 比較交換原語(Compare-and-swap primitive),它有一個更弱的版本,即 compare_exchange_weak,它允許即便交換成功,也仍然返回 false 失敗。其原因是因為在某些平臺上虛假故障導致的,具體而言,當 CPU 進行上下文切換時,另一線程加載同一地址產生的不一致。除此之外,compare_exchange_strong 的性能可能稍差于 compare_exchange_weak,但大部分情況下,鑒于其使用的復雜度而言,compare_exchange_weak 應該被有限考慮。
順序一致模型:在此模型下,原子操作滿足順序一致性,進而可能對性能產生損耗??娠@式的通過 std::memory_order_seq_cst 進行指定。最后來看一個例子: std::atomic<int> counter = {0}; std::vector<std::thread> vt; for (int i = 0; i < 100; ++i) { vt.emplace_back([&](){ counter.fetch_add(1, std::memory_order_seq_cst); }); } for (auto& t : vt) { t.join(); } std::cout << "current counter:" << counter << std::endl;
這個例子與第一個寬松模型的例子本質上沒有區(qū)別,僅僅只是將原子操作的內存順序修改為了 memory_order_seq_cst,有興趣的讀者可以自行編寫程序測量這兩種不同內存順序導致的性能差異。
到此這篇關于C++中的并行與并發(fā)基礎與使用詳解的文章就介紹到這了,更多相關C++并行與并發(fā)內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
VC++ 字符串String MD5計算小工具 VS2008工程
基于字符串加密的MD5算法,VS2008 VC++,多字節(jié)編譯工程。主要代碼如下,實現(xiàn)了ANSI字符串加密與Unicode字符串加密,需要的朋友可以參考下2017-07-07iostream與iostream.h的區(qū)別詳細解析
以下是對C++中iostream與iostream.h的區(qū)別進行了詳細的分析介紹,需要的朋友可以過來參考下2013-09-09