利用C++如何實現(xiàn)一個阻塞隊列詳解
阻塞隊列是多線程中常用的數(shù)據(jù)結(jié)構(gòu),對于實現(xiàn)多線程之間的數(shù)據(jù)交換、同步等有很大作用。
阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是向隊列里添加元素的線程,消費者是從隊列里取元素的線程。簡而言之,阻塞隊列是生產(chǎn)者用來存放元素、消費者獲取元素的容器。
考慮下,這樣一個多線程模型,程序有一個主線程 master 和一些 worker 線程,master 線程負責接收到數(shù)據(jù),給 worker 線程分配數(shù)據(jù),worker 線程取得一個任務(wù)后便可以開始工作,如果沒有任務(wù)便阻塞住,節(jié)約 cpu 資源。
- master 線程 (生產(chǎn)者):負責往阻塞隊列中塞入數(shù)據(jù),并喚醒正在阻塞的 worker 線程。
- workder 線程(消費者):負責從阻塞隊列中取數(shù)據(jù),如果沒有數(shù)據(jù)便阻塞,直到被 master 線程喚醒。
那么怎樣的數(shù)據(jù)結(jié)構(gòu)比較適合做這樣的喚醒呢?顯而易見,是條件變量,在 c++ 11 中,stl 已經(jīng)引入了線程支持庫。
C++11 中條件變量
條件變量一般與一個 互斥量 同時使用,使用時需要先給互斥量上鎖,然后條件變量會檢測是否滿足條件,如果不滿足條件便會暫時釋放鎖,然后阻塞線程。
c++ 11使用方法主要如下:
#include <mutex> #include <condition_value> // 互斥量與條件變量 std::mutex m_mutex; std::condition_value m_condition; // 請求信號的一方 std::unique_lock<std::mutex> lock(mutex); while(xxx) { // 這里會先釋放 lock, // 如果有信號喚醒的話,會重新加鎖。 m_condition.wait(lock); } // 發(fā)送消息進行同步的一方 { std::unique_lock<std::mutex> lock(mutex); // 喚醒其他正在 wait 的線程 m_condition.notify_all(); }
用 C++11 實現(xiàn)阻塞隊列
我們使用條件變量包裝 STL 中的 queue 就可以實現(xiàn)阻塞隊列功能,如果有興趣,甚至可以自己實現(xiàn)一個效率更高的隊列數(shù)據(jù)結(jié)構(gòu)。
我們先假設(shè)一下阻塞隊列需要如下接口:
- push 將一個變量塞入隊列;
- take 從隊列中取出一個元素;
- size 查看隊列有多少個元素;
template <typename T> class BlockingQueue { public: BlockingQueue(); void push(T&& value); T take(); size_t size() const; private: // 實際使用的數(shù)據(jù)結(jié)構(gòu)隊列 std::queue<T> m_data; // 條件變量 std::mutex m_mutex; std::condition_variable m_condition; };
push 一個變量時,我們需要先加鎖,加鎖成功后才可以壓入變量,這是為了線程安全。壓入變量后,就可以發(fā)送信號通知正在阻塞的條件變量。
void push(T&& value) { // 往隊列中塞數(shù)據(jù)前要先加鎖 std::unique_lock<std::mutex> lock(m_mutex); m_data.push(value); // 喚醒正在阻塞的條件變量 m_condition.notify_all(); }
take 一個變量時,就要有些不一樣:
- 先加鎖,加鎖成功后,如果隊列不為空,可以直接取數(shù)據(jù),不需要 wait;
- 如果隊列為空呢,則 wait 等待,直到被喚醒;
- 考慮特殊情況,喚醒后隊列依然是空的……
T take() { std::unique_lock<std::mutex> lock(m_mutex); while(m_data.empty()) { // 等待 m_condition.wait(lock); } assert(!m_data.empty()); T value(std::move(m_data.front())); m_data.pop(); return value; }
總結(jié)下,代碼如下:
#ifndef BLOCKINGQUEUE_H #define BLOCKINGQUEUE_H #include <queue> #include <mutex> #include <condition_variable> #include <assert.h> template <typename T> class BlockingQueue { public: BlockingQueue() :m_mutex(), m_condition(), m_data() { } // 禁止拷貝構(gòu)造 BlockingQueue(BlockingQueue&) = delete; ~BlockingQueue() { } void push(T&& value) { // 往隊列中塞數(shù)據(jù)前要先加鎖 std::unique_lock<std::mutex> lock(m_mutex); m_data.push(value); m_condition.notify_all(); } void push(const T& value) { std::unique_lock<std::mutex> lock(m_mutex); m_data.push(value); m_condition.notify_all(); } T take() { std::unique_lock<std::mutex> lock(m_mutex); while(m_data.empty()) { m_condition.wait(lock); } assert(!m_data.empty()); T value(std::move(m_data.front())); m_data.pop(); return value; } size_t size() const { std::unique_lock<std::mutex> lock(m_mutex); return m_data.size(); } private: // 實際使用的數(shù)據(jù)結(jié)構(gòu)隊列 std::queue<T> m_data; // 條件變量的鎖 std::mutex m_mutex; std::condition_variable m_condition; }; #endif // BLOCKINGQUEUE_H
實驗代碼
我們寫個簡單的程序?qū)嶒炓幌?,下面程序?一個 master 線程,5個 worker 線程,master線程生成一個隨機數(shù),求 0-隨機數(shù) 的和。
#include <iostream> #include <thread> #include <mutex> #include <random> #include <windows.h> #include <blockingqueue.h> using namespace std; int task=100; BlockingQueue<int> blockingqueue; std::mutex mutex_cout; void worker() { int value; thread::id this_id = this_thread::get_id(); while(true) { value = blockingqueue.take(); uint64_t sum = 0; for(int i = 0; i < value; i++) { sum += i; } // 模擬耗時操作 Sleep(100); std::lock_guard<mutex> lock(mutex_cout); std::cout << "workder: " << this_id << " " << __FUNCTION__ << " line: " << __LINE__ << " sum: " << sum << std::endl; } } void master() { srand(time(nullptr)); for(int i = 0; i < task; i++) { blockingqueue.push(rand()%10000); printf("%s %d %i\n",__FUNCTION__, __LINE__, i); Sleep(20); } } int main() { thread th_master(master); std::vector<thread> th_workers; for(int i =0; i < 5; i++) { th_workers.emplace_back(thread(worker)); } th_master.join(); return 0; }
從輸出結(jié)果可以看出,master 線程將任務(wù)分配給了正在空閑的 worker 線程,具體是哪個線程就看操作系統(tǒng)的隨機調(diào)度了。
master 46 5
worker: 3 worker line: 34 sum: 20998440
master 46 6
worker: 7 worker line: 34 sum: 3308878
master 46 7
worker: 4 worker line: 34 sum: 34598721
master 46 8
worker: 6 worker line: 34 sum: 1563796
master 46 9
worker: 5 worker line: 34 sum: 27978940
Reference
總結(jié)
到此這篇關(guān)于利用C++如何實現(xiàn)一個阻塞隊列的文章就介紹到這了,更多相關(guān)C++實現(xiàn)阻塞隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C語言數(shù)據(jù)結(jié)構(gòu)二叉樹先序、中序、后序及層次四種遍歷
這篇文章主要介紹了C語言數(shù)據(jù)結(jié)構(gòu)二叉樹先序、中序、后序及層次四種遍歷方式,具有一定的知識性參考價值,需要的小伙伴可以先看一下2022-02-02C語言使用鏈表實現(xiàn)學(xué)生籍貫管理系統(tǒng)
這篇文章主要為大家詳細介紹了C語言使用鏈表實現(xiàn)學(xué)生籍貫管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02C語言匯編分析傳遞結(jié)構(gòu)體指針比傳遞結(jié)構(gòu)體變量高效的深層原因
本文章使用的工具是vs2010,本篇文章主要講解結(jié)構(gòu)體指針作為參數(shù)傳遞與結(jié)構(gòu)體變量作為參數(shù)傳遞的對比,不談值傳遞與址傳遞的概念2022-10-10C語言數(shù)據(jù)結(jié)構(gòu)之堆排序源代碼
這篇文章主要為大家詳細介紹了C語言數(shù)據(jù)結(jié)構(gòu)之堆排序源代碼,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01