Linux基于阻塞隊列的生產(chǎn)消費者模型詳解
一、什么是生產(chǎn)消費者模型
生產(chǎn)消費者模型就是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題,生產(chǎn)者和消費者彼此之間不直接通訊,而是通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接交給阻塞隊列,消費者不找生產(chǎn)者索要數(shù)據(jù),而是直接從阻塞隊列中去取,這樣一來,阻塞隊列就相當于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力
對于生產(chǎn)消費者模型,我們有一個321規(guī)則,分別是3種關(guān)系,2種角色,1個交易場所
- 三種關(guān)系:生產(chǎn)者和生產(chǎn)者的互斥競爭關(guān)系,消費者和消費者的互斥競爭關(guān)系,生產(chǎn)者和消費者的互斥、同步關(guān)系
- 兩種角色:生產(chǎn)者和消費者
- 一個交易場所:特定結(jié)構(gòu)的內(nèi)存空間(如阻塞隊列)
二、基于阻塞隊列的生產(chǎn)消費者模型
1、理論研究

在多線程編程中,阻塞隊列是一種常用于實現(xiàn)生產(chǎn)者和消費者模型的數(shù)據(jù)結(jié)構(gòu),其與普通的隊列區(qū)別在于,當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中再次被放入元素,當隊列滿時,往隊列中存放元素的操作也會被阻塞,直到有元素從隊列中被獲取
生產(chǎn)消費者模型最大的好處是,也是生產(chǎn)消費者模型效率高的原因是:在消費者獲取數(shù)據(jù)(一般是網(wǎng)絡(luò)數(shù)據(jù))的時候,生產(chǎn)者可以生產(chǎn)數(shù)據(jù),生產(chǎn)者放入數(shù)據(jù)的時候,消費者可以處理數(shù)據(jù),雖然特定內(nèi)存結(jié)構(gòu),也就是臨界資源區(qū)是有鎖的,只能由單線程通過,只要將時間合理化,我們就可以實現(xiàn)生產(chǎn)者和消費者的高效率工作,并且將發(fā)送數(shù)據(jù)的線程和處理數(shù)據(jù)的線程解耦合

2、多生產(chǎn)多消費模型
(一)BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
//定義一個模版類,方便我們使用任何類型進行生產(chǎn)消費
template <class T>
//定義一個阻塞隊列
class BlockQueue
{
//隊列默認最大容量
static const int defalutnum = 20;
public:
BlockQueue(int maxcap = defalutnum) : maxcap_(maxcap)
{
//初始化互斥鎖和生產(chǎn)者和消費者的條件變量
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&c_cond_, nullptr);
pthread_cond_init(&p_cond_, nullptr);
//下面注釋掉的是設(shè)置水位線,設(shè)置最低最高水位線
//在阻塞隊列中的數(shù)據(jù),在低于最低水位線時是不可被獲取的,只能寫入
//在高于最高水位線時是不可被寫入的,只能獲取
// low_water_ = maxcap_/3;
// high_water_ = (maxcap_*2)/3;
}
//從隊列頭取出元素返回
T pop()
{
pthread_mutex_lock(&mutex_);//加鎖
//只能用while不能用if,原因是會出現(xiàn)誤喚醒問題,下面說
while (q_.size() == 0)
{
pthread_cond_wait(&c_cond_, &mutex_);
}
T out = q_.front();
q_.pop();
//這里是加了水位線的版本,在低于水位線的時候要喚醒生產(chǎn)者
// if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
pthread_cond_signal(&p_cond_);
pthread_mutex_unlock(&mutex_);//解鎖
return out;
}
void push(const T &in)
{
pthread_mutex_lock(&mutex_);//加鎖
//同pop函數(shù)
while (q_.size() == maxcap_)
{
pthread_cond_wait(&p_cond_, &mutex_);
}
q_.push(in);
//這里是加了水位線的版本,在高于水位線的時候要喚醒消費者
// if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mutex_);//解鎖
}
//析構(gòu)函數(shù)
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_;
int maxcap_; // 極大值
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
//最低最高水位線
// int low_water_;
// int high_water_;
};(二)Task.hpp
#pragma once
#include <iostream>
#include <string>
//定義運算方法
std::string opers = "+-*/%";
//枚舉錯誤
enum
{
DivZero = 1,
ModZero,
Unknown
};
class Task
{
public:
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if (data2_ == 0)
exitcode_ = DivZero;
else
result_ = data1_ / data2_;
}
break;
case '%':
{
if (data2_ == 0)
exitcode_ = ModZero;
else
result_ = data1_ % data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
//偽函數(shù),通過重載()使run可以像函數(shù)一樣調(diào)用
void operator()()
{
run();
}
//返回的運算結(jié)果以及錯誤代碼
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
//返回運算表達式
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};(三)main.cpp
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 消費
Task t = bq->pop();
// 計算
t();
//模擬消費者處理任務(wù)
std::cout << "處理任務(wù): " << t.GetTask() << " 運算結(jié)果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;
}
}
void *Productor(void *args)
{
int len = opers.size();
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
int x = 10;
int y = 20;
while (true)
{
// 用隨機數(shù)運算模擬生產(chǎn)者生產(chǎn)數(shù)據(jù)
int data1 = rand() % 10 + 1; // [1,10]
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
Task t(data1, data2, op);
// 生產(chǎn)
bq->push(t);
std::cout << "生產(chǎn)了一個任務(wù): " << t.GetTask() << " thread id: " << pthread_self() << std::endl;
sleep(1);
}
}
int main()
{
//隨機數(shù)種子
srand(time(nullptr));
//給阻塞隊列傳一個任務(wù)
BlockQueue<Task> *bq = new BlockQueue<Task>();
//多生產(chǎn)者多消費者
pthread_t c[3], p[5];
for (int i = 0; i < 3; i++)
{
pthread_create(c + i, nullptr, Consumer, bq);
}
for (int i = 0; i < 5; i++)
{
pthread_create(p + i, nullptr, Productor, bq);
}
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
delete bq;
return 0;
}
3、誤喚醒問題
誤喚醒問題就是在調(diào)用pop函數(shù)或者push函數(shù)的時候可能會引起的,下面我們再把代碼貼出來,然后把上面有過的注釋去掉
//...
T pop()
{
pthread_mutex_lock(&mutex_);
while (q_.size() == 0) //不能調(diào)用if而要用while
{
pthread_cond_wait(&c_cond_, &mutex_);
}
T out = q_.front();
q_.pop();
pthread_cond_signal(&p_cond_);
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T &in)
{
pthread_mutex_lock(&mutex_);
while (q_.size() == maxcap_)
{
pthread_cond_wait(&p_cond_, &mutex_);
}
q_.push(in);
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mutex_);
//...在多生產(chǎn)者 - 多消費者并發(fā)編程場景中,誤喚醒現(xiàn)象較為常見,假定隊列當前處于滿狀態(tài),當一個消費者線程成功消費一個數(shù)據(jù)后,隊列中會空出一個位置,隨后,線程可能多次調(diào)用pthread_cond_signal(&p_cond_) 函數(shù),喚醒了一批正在 p_cond_ 條件變量下等待的生產(chǎn)者線程,由于被喚醒的生產(chǎn)者線程需要重新競爭互斥鎖,這些線程之間呈現(xiàn)出互斥關(guān)系,在先前執(zhí)行消費操作的線程釋放鎖之后,僅有一個生產(chǎn)者線程能夠成功獲取鎖,其余雖被喚醒但未能搶到鎖的生產(chǎn)者線程只能在鎖處等待
當成功獲取鎖的生產(chǎn)者線程完成數(shù)據(jù)生產(chǎn)操作后,隊列可能再次達到滿狀態(tài),此時,該線程會調(diào)用 pthread_cond_signal(&c_cond_) 函數(shù)喚醒一個消費者線程,隨后釋放鎖,在此情形下,被喚醒的線程不僅包括剛剛被喚醒的消費者線程,還涵蓋之前被喚醒卻未搶到鎖的生產(chǎn)者線程,它們會同時參與鎖的競爭,若使用 if 語句來判斷隊列是否已滿,當某個生產(chǎn)者線程搶到鎖后,可能不會再次對隊列狀態(tài)進行檢查,直接嘗試向已滿的隊列中添加數(shù)據(jù),從而引發(fā)錯誤
因此,為確保線程安全,應(yīng)使用 while 循環(huán)來包裹 pthread_cond_wait 函數(shù),當一個線程被喚醒并成功獲取鎖后,不應(yīng)直接執(zhí)行隊列操作(無論是生產(chǎn)數(shù)據(jù)還是消費數(shù)據(jù)),而應(yīng)再次檢查資源是否滿足操作條件,若資源就緒,則可繼續(xù)執(zhí)行隊列操作;若資源未就緒,則應(yīng)再次調(diào)用 pthread_cond_wait 函數(shù),使線程進入休眠狀態(tài),等待后續(xù)喚醒
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
systemd添加自定義系統(tǒng)服務(wù)設(shè)置自定義開機啟動的方法
下面小編就為大家?guī)硪黄猻ystemd添加自定義系統(tǒng)服務(wù)設(shè)置自定義開機啟動的方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-12-12
Apache No space left on device: mod_rewrite: could not creat
這篇文章主要介紹了Apache No space left on device: mod_rewrite: could not create rewrite_log_lock Configuration Failed問題的解決方法,需要的朋友可以參考下2014-09-09
淺談Linux系統(tǒng)中的異常堆棧跟蹤的簡單實現(xiàn)
下面小編就為大家?guī)硪黄獪\談Linux系統(tǒng)中的異常堆棧跟蹤的簡單實現(xiàn)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-12-12
使用 chkconfig 和 systemctl 命令啟用或禁用 Linux 服務(wù)的方法
在 Linux 中,無論何時當你安裝任何帶有服務(wù)和守護進程的包,系統(tǒng)默認會把這些服務(wù)的初始化及 systemd 腳本添加進去,不過此時它們并沒有被啟用。下面小編給大家?guī)砹耸褂?chkconfig 和 systemctl 命令啟用或禁用 Linux 服務(wù)的方法,一起看看吧2018-11-11
Linux用戶建立腳本/猜字游戲/網(wǎng)卡流量監(jiān)控介紹
大家好,本篇文章主要講的是Linux用戶建立腳本/猜字游戲/網(wǎng)卡流量監(jiān)控介紹,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下2021-12-12

