Linux基于阻塞隊列的生產消費者模型詳解
一、什么是生產消費者模型
生產消費者模型就是通過一個容器來解決生產者和消費者的強耦合問題,生產者和消費者彼此之間不直接通訊,而是通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接交給阻塞隊列,消費者不找生產者索要數據,而是直接從阻塞隊列中去取,這樣一來,阻塞隊列就相當于一個緩沖區(qū),平衡了生產者和消費者的處理能力
對于生產消費者模型,我們有一個321規(guī)則,分別是3種關系,2種角色,1個交易場所
- 三種關系:生產者和生產者的互斥競爭關系,消費者和消費者的互斥競爭關系,生產者和消費者的互斥、同步關系
- 兩種角色:生產者和消費者
- 一個交易場所:特定結構的內存空間(如阻塞隊列)
二、基于阻塞隊列的生產消費者模型
1、理論研究
在多線程編程中,阻塞隊列是一種常用于實現生產者和消費者模型的數據結構,其與普通的隊列區(qū)別在于,當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中再次被放入元素,當隊列滿時,往隊列中存放元素的操作也會被阻塞,直到有元素從隊列中被獲取
生產消費者模型最大的好處是,也是生產消費者模型效率高的原因是:在消費者獲取數據(一般是網絡數據)的時候,生產者可以生產數據,生產者放入數據的時候,消費者可以處理數據,雖然特定內存結構,也就是臨界資源區(qū)是有鎖的,只能由單線程通過,只要將時間合理化,我們就可以實現生產者和消費者的高效率工作,并且將發(fā)送數據的線程和處理數據的線程解耦合
2、多生產多消費模型
(一)BlockQueue.hpp
#pragma once #include <iostream> #include <queue> #include <pthread.h> //定義一個模版類,方便我們使用任何類型進行生產消費 template <class T> //定義一個阻塞隊列 class BlockQueue { //隊列默認最大容量 static const int defalutnum = 20; public: BlockQueue(int maxcap = defalutnum) : maxcap_(maxcap) { //初始化互斥鎖和生產者和消費者的條件變量 pthread_mutex_init(&mutex_, nullptr); pthread_cond_init(&c_cond_, nullptr); pthread_cond_init(&p_cond_, nullptr); //下面注釋掉的是設置水位線,設置最低最高水位線 //在阻塞隊列中的數據,在低于最低水位線時是不可被獲取的,只能寫入 //在高于最高水位線時是不可被寫入的,只能獲取 // low_water_ = maxcap_/3; // high_water_ = (maxcap_*2)/3; } //從隊列頭取出元素返回 T pop() { pthread_mutex_lock(&mutex_);//加鎖 //只能用while不能用if,原因是會出現誤喚醒問題,下面說 while (q_.size() == 0) { pthread_cond_wait(&c_cond_, &mutex_); } T out = q_.front(); q_.pop(); //這里是加了水位線的版本,在低于水位線的時候要喚醒生產者 // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_); pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_);//解鎖 return out; } void push(const T &in) { pthread_mutex_lock(&mutex_);//加鎖 //同pop函數 while (q_.size() == maxcap_) { pthread_cond_wait(&p_cond_, &mutex_); } q_.push(in); //這里是加了水位線的版本,在高于水位線的時候要喚醒消費者 // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_); pthread_cond_signal(&c_cond_); pthread_mutex_unlock(&mutex_);//解鎖 } //析構函數 ~BlockQueue() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&c_cond_); pthread_cond_destroy(&p_cond_); } private: std::queue<T> q_; int maxcap_; // 極大值 pthread_mutex_t mutex_; pthread_cond_t c_cond_; pthread_cond_t p_cond_; //最低最高水位線 // int low_water_; // int high_water_; };
(二)Task.hpp
#pragma once #include <iostream> #include <string> //定義運算方法 std::string opers = "+-*/%"; //枚舉錯誤 enum { DivZero = 1, ModZero, Unknown }; class Task { public: Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0) {} void run() { switch (oper_) { case '+': result_ = data1_ + data2_; break; case '-': result_ = data1_ - data2_; break; case '*': result_ = data1_ * data2_; break; case '/': { if (data2_ == 0) exitcode_ = DivZero; else result_ = data1_ / data2_; } break; case '%': { if (data2_ == 0) exitcode_ = ModZero; else result_ = data1_ % data2_; } break; default: exitcode_ = Unknown; break; } } //偽函數,通過重載()使run可以像函數一樣調用 void operator()() { run(); } //返回的運算結果以及錯誤代碼 std::string GetResult() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "="; r += std::to_string(result_); r += "[code: "; r += std::to_string(exitcode_); r += "]"; return r; } //返回運算表達式 std::string GetTask() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "=?"; return r; } ~Task() {} private: int data1_; int data2_; char oper_; int result_; int exitcode_; };
(三)main.cpp
#include "BlockQueue.hpp" #include "Task.hpp" #include <unistd.h> #include <ctime> void *Consumer(void *args) { BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args); while (true) { // 消費 Task t = bq->pop(); // 計算 t(); //模擬消費者處理任務 std::cout << "處理任務: " << t.GetTask() << " 運算結果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl; } } void *Productor(void *args) { int len = opers.size(); BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args); int x = 10; int y = 20; while (true) { // 用隨機數運算模擬生產者生產數據 int data1 = rand() % 10 + 1; // [1,10] usleep(10); int data2 = rand() % 10; char op = opers[rand() % len]; Task t(data1, data2, op); // 生產 bq->push(t); std::cout << "生產了一個任務: " << t.GetTask() << " thread id: " << pthread_self() << std::endl; sleep(1); } } int main() { //隨機數種子 srand(time(nullptr)); //給阻塞隊列傳一個任務 BlockQueue<Task> *bq = new BlockQueue<Task>(); //多生產者多消費者 pthread_t c[3], p[5]; for (int i = 0; i < 3; i++) { pthread_create(c + i, nullptr, Consumer, bq); } for (int i = 0; i < 5; i++) { pthread_create(p + i, nullptr, Productor, bq); } for (int i = 0; i < 3; i++) { pthread_join(c[i], nullptr); } for (int i = 0; i < 5; i++) { pthread_join(p[i], nullptr); } delete bq; return 0; }
3、誤喚醒問題
誤喚醒問題就是在調用pop
函數或者push
函數的時候可能會引起的,下面我們再把代碼貼出來,然后把上面有過的注釋去掉
//... T pop() { pthread_mutex_lock(&mutex_); while (q_.size() == 0) //不能調用if而要用while { pthread_cond_wait(&c_cond_, &mutex_); } T out = q_.front(); q_.pop(); pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_); return out; } void push(const T &in) { pthread_mutex_lock(&mutex_); while (q_.size() == maxcap_) { pthread_cond_wait(&p_cond_, &mutex_); } q_.push(in); pthread_cond_signal(&c_cond_); pthread_mutex_unlock(&mutex_); //...
在多生產者 - 多消費者并發(fā)編程場景中,誤喚醒現象較為常見,假定隊列當前處于滿狀態(tài),當一個消費者線程成功消費一個數據后,隊列中會空出一個位置,隨后,線程可能多次調用pthread_cond_signal(&p_cond_)
函數,喚醒了一批正在 p_cond_
條件變量下等待的生產者線程,由于被喚醒的生產者線程需要重新競爭互斥鎖,這些線程之間呈現出互斥關系,在先前執(zhí)行消費操作的線程釋放鎖之后,僅有一個生產者線程能夠成功獲取鎖,其余雖被喚醒但未能搶到鎖的生產者線程只能在鎖處等待
當成功獲取鎖的生產者線程完成數據生產操作后,隊列可能再次達到滿狀態(tài),此時,該線程會調用 pthread_cond_signal(&c_cond_)
函數喚醒一個消費者線程,隨后釋放鎖,在此情形下,被喚醒的線程不僅包括剛剛被喚醒的消費者線程,還涵蓋之前被喚醒卻未搶到鎖的生產者線程,它們會同時參與鎖的競爭,若使用 if
語句來判斷隊列是否已滿,當某個生產者線程搶到鎖后,可能不會再次對隊列狀態(tài)進行檢查,直接嘗試向已滿的隊列中添加數據,從而引發(fā)錯誤
因此,為確保線程安全,應使用 while
循環(huán)來包裹 pthread_cond_wait
函數,當一個線程被喚醒并成功獲取鎖后,不應直接執(zhí)行隊列操作(無論是生產數據還是消費數據),而應再次檢查資源是否滿足操作條件,若資源就緒,則可繼續(xù)執(zhí)行隊列操作;若資源未就緒,則應再次調用 pthread_cond_wait
函數,使線程進入休眠狀態(tài),等待后續(xù)喚醒
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Apache No space left on device: mod_rewrite: could not creat
這篇文章主要介紹了Apache No space left on device: mod_rewrite: could not create rewrite_log_lock Configuration Failed問題的解決方法,需要的朋友可以參考下2014-09-09使用 chkconfig 和 systemctl 命令啟用或禁用 Linux 服務的方法
在 Linux 中,無論何時當你安裝任何帶有服務和守護進程的包,系統默認會把這些服務的初始化及 systemd 腳本添加進去,不過此時它們并沒有被啟用。下面小編給大家?guī)砹耸褂?chkconfig 和 systemctl 命令啟用或禁用 Linux 服務的方法,一起看看吧2018-11-11Linux用戶建立腳本/猜字游戲/網卡流量監(jiān)控介紹
大家好,本篇文章主要講的是Linux用戶建立腳本/猜字游戲/網卡流量監(jiān)控介紹,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下2021-12-12