Linux基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型詳解
一、POSIX信號(hào)量
1、概述
在我們進(jìn)行環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型的學(xué)習(xí)之前,我們要對(duì)前置條件POSIX信號(hào)量進(jìn)行學(xué)習(xí),這里的POSIX的信號(hào)量與systemV的信號(hào)量是幾乎一致的,都是用于同步操作,達(dá)到無沖突的訪問共享資源的目的,只是POSIX信號(hào)量的使用要更簡(jiǎn)單一些,可以用于線程間同步
信號(hào)量的本質(zhì)就是一個(gè)計(jì)數(shù)器,它的本質(zhì)就是用來描述資源數(shù)目的,把資源是否就緒放到了臨界區(qū)之外,在申請(qǐng)信號(hào)量的時(shí)候其實(shí)已經(jīng)就是間接在做判斷了
2、調(diào)用接口
(一)初始化信號(hào)量
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
- 返回值:成功返回0,失敗返回-1
sem:指向要初始化的信號(hào)量對(duì)象的指針pshared:指定信號(hào)量的共享屬性,如果pshared為 0,表示信號(hào)量是進(jìn)程內(nèi)共享的,只能在創(chuàng)建它的進(jìn)程內(nèi)的多個(gè)線程之間使用,如果pshared非 0,表示信號(hào)量可以在多個(gè)進(jìn)程之間共享value:指定信號(hào)量的初始值,表示可以同時(shí)訪問共享資源的線程或進(jìn)程的數(shù)量
(二)銷毀信號(hào)量
#include <semaphore.h> int sem_destroy(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem:指向要銷毀的信號(hào)量對(duì)象的指針
(三)等待信號(hào)量
#include <semaphore.h> int sem_wait(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem:指向要操作的信號(hào)量對(duì)象的指針,這個(gè)指針一定要是被初始化過的
sem_wait 函數(shù)執(zhí)行的是信號(hào)量的 P 操作
- 如果信號(hào)量
sem的值大于 0,sem_wait會(huì)將信號(hào)量的值減 1,然后立即返回,調(diào)用線程或進(jìn)程可以繼續(xù)執(zhí)行后續(xù)代碼,意味著該線程或進(jìn)程成功獲取了對(duì)共享資源的訪問權(quán) - 如果信號(hào)量
sem的值等于 0,sem_wait會(huì)使調(diào)用線程或進(jìn)程進(jìn)入阻塞狀態(tài),直到信號(hào)量的值大于 0 為止。一旦信號(hào)量的值變?yōu)榇笥?0,sem_wait會(huì)將信號(hào)量的值減 1 并返回,線程或進(jìn)程繼續(xù)執(zhí)行
(四)發(fā)布信號(hào)量
#include <semaphore.h> int sem_post(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem:指向要操作的信號(hào)量對(duì)象的指針,這個(gè)指針一定要是被初始化過的
sem_post 函數(shù)執(zhí)行的是信號(hào)量的 V 操作,會(huì)將信號(hào)量 sem 的值加 1
- 如果在調(diào)用
sem_post之前,有其他線程或進(jìn)程因?yàn)檎{(diào)用sem_wait而阻塞在該信號(hào)量上(即信號(hào)量的值為 0),那么在信號(hào)量的值加 1 之后,系統(tǒng)會(huì)喚醒其中一個(gè)阻塞的線程或進(jìn)程,被喚醒的線程或進(jìn)程會(huì)將信號(hào)量的值再減 1 并繼續(xù)執(zhí)行后續(xù)代碼
3、在環(huán)形隊(duì)列中的作用
我們?cè)谥皯?yīng)該都接觸過環(huán)形隊(duì)列,在環(huán)形隊(duì)列中,一般我們是需要一個(gè)計(jì)數(shù)器的,或者在環(huán)形隊(duì)列中留出最后一個(gè)位置,因?yàn)槿绻麤]有這些措施,我們就不知道雙指針誰在前誰在后了,我們這里使用信號(hào)量替代了這個(gè)計(jì)數(shù)器
二、基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型
1、理論探究

我們通過數(shù)組以及模運(yùn)算的方式來模擬環(huán)狀模型,前面的基于阻塞隊(duì)列的生產(chǎn)消費(fèi)者模型底層來說是基于容器queue的,其空間可以動(dòng)態(tài)分配,現(xiàn)在是基于固定大小的,基于容器vector
其中生產(chǎn)者關(guān)注的是環(huán)形隊(duì)列的空間資源,消費(fèi)者關(guān)心的是環(huán)形隊(duì)列的數(shù)據(jù)資源,而環(huán)形隊(duì)列中的空間資源+數(shù)據(jù)資源=全部資源,只要有空間生產(chǎn)者就可以生產(chǎn)數(shù)據(jù)然后放入,只要有數(shù)據(jù)消費(fèi)者就可以取出數(shù)據(jù)然后加工
2、代碼實(shí)現(xiàn)
(一)RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
//環(huán)形隊(duì)列默認(rèn)容量
const static int defaultcap = 8;
//環(huán)形隊(duì)列核心接口:PV操作以及加鎖解鎖
template<class T>
class RingQueue{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
//初始化
RingQueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&cdata_sem_, 0, 0);
sem_init(&pspace_sem_, 0, cap);
//生產(chǎn)者消費(fèi)者的鎖
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void Push(const T &in) // 生產(chǎn)活動(dòng)
{
//調(diào)用P函數(shù)檢查隊(duì)列中是否有可用空間,沒有可用空間線程會(huì)阻塞
P(pspace_sem_);
//這里為什么要先P后加鎖,下面詳談
Lock(p_mutex_);
ringqueue_[p_step_] = in;
// 位置后移,維持環(huán)形特性
p_step_++;
p_step_ %= cap_;
Unlock(p_mutex_);
V(cdata_sem_);
}
void Pop(T *out) // 消費(fèi)活動(dòng)
{
P(cdata_sem_);
Lock(c_mutex_);
*out = ringqueue_[c_step_];
// 位置后移,維持環(huán)形特性
c_step_++;
c_step_ %= cap_;
Unlock(c_mutex_);
V(pspace_sem_);
}
//析構(gòu)銷毀
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<T> ringqueue_;// 環(huán)形隊(duì)列的底層實(shí)現(xiàn)
int cap_; // 隊(duì)列容量
int c_step_; // 消費(fèi)者下標(biāo)
int p_step_; // 生產(chǎn)者下標(biāo)
sem_t cdata_sem_; // 隊(duì)中可用數(shù)據(jù)資源
sem_t pspace_sem_; // 隊(duì)中可用空間資源
pthread_mutex_t c_mutex_; // 消費(fèi)者鎖
pthread_mutex_t p_mutex_; // 生產(chǎn)者鎖
};(二)Task.hpp
任務(wù)函數(shù)還是上一次的任務(wù)
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};(三)main.cpp
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"
using namespace std;
//這個(gè)結(jié)構(gòu)體是方便我們打印的時(shí)候查看方便的
struct ThreadData
{
RingQueue<Task> *rq; //環(huán)形隊(duì)列
std::string threadname;//線程名字
};
void *Productor(void *args)
{
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while (true)
{
// 模擬獲取數(shù)據(jù)
int data1 = rand() % 10 + 1;
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
Task t(data1, data2, op);
// 生產(chǎn)數(shù)據(jù)
rq->Push(t);
cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;
sleep(1);
}
return nullptr;
}
void *Consumer(void *args)
{
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
while (true)
{
// 消費(fèi)數(shù)據(jù)
Task t;
rq->Pop(&t);
// 處理數(shù)據(jù)
t();
cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
}
return nullptr;
}
int main()
{
srand(time(nullptr));
RingQueue<Task> *rq = new RingQueue<Task>(10);
pthread_t c[5], p[3];
//這里我們?yōu)榱朔奖悴榭?,統(tǒng)一用單生產(chǎn)單消費(fèi)
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Productor-" + std::to_string(i);
pthread_create(p + i, nullptr, Productor, td);
}
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, Consumer, td);
}
for (int i = 0; i < 1; i++)
{
pthread_join(p[i], nullptr);
}
for (int i = 0; i < 1; i++)
{
pthread_join(c[i], nullptr);
}
return 0;
}
3、PV操作包裹住加解鎖操作的原因
在 Pop和Push 函數(shù)中,以Push 函數(shù)為例,P(pspace_sem_) 和 V(cdata_sem_) 包裹著 Lock(p_mutex_) 和 Unlock(p_mutex_) 這種設(shè)計(jì)是為了實(shí)現(xiàn)更細(xì)粒度的同步控制,盡可能減少鎖的競(jìng)爭(zhēng),以確保線程安全和高效性,下面詳細(xì)解釋其原因:
P(pspace_sem_) 在 Lock(p_mutex_) 之前:
- 信號(hào)量的作用:
pspace_sem_信號(hào)量用于表示環(huán)形隊(duì)列中可用的空間資源,P(pspace_sem_)操作會(huì)檢查信號(hào)量的值,如果值大于 0,則將其減 1 并繼續(xù)執(zhí)行,如果值為 0,則線程會(huì)阻塞,直到有可用空間(即其他線程調(diào)用V(pspace_sem_)釋放空間) - 避免不必要的加鎖:在嘗試獲取互斥鎖之前先檢查信號(hào)量,可以避免在沒有可用空間時(shí)加鎖,因?yàn)槿绻麤]有可用空間,即使加了鎖也無法進(jìn)行生產(chǎn)操作,還會(huì)導(dǎo)致其他線程無法釋放空間,造成資源浪費(fèi)和性能下降,通過先檢查信號(hào)量,只有在有可用空間時(shí)才去獲取互斥鎖,減少了鎖的競(jìng)爭(zhēng),提高了程序的效率
V(cdata_sem_) 在 Unlock(p_mutex_) 之后:
- 信號(hào)量的通知機(jī)制:
cdata_sem_信號(hào)量用于表示環(huán)形隊(duì)列中可用的數(shù)據(jù)資源,V(cdata_sem_)操作會(huì)將信號(hào)量的值加 1,如果有消費(fèi)者線程因?yàn)榈却龜?shù)據(jù)而阻塞,會(huì)喚醒其中一個(gè)線程 - 避免死鎖和數(shù)據(jù)不一致:在釋放互斥鎖之后再增加
cdata_sem_信號(hào)量的值,可以確保在通知消費(fèi)者有新數(shù)據(jù)可用之前,生產(chǎn)者已經(jīng)完成了對(duì)共享資源的修改,并且釋放了鎖,如果在加鎖狀態(tài)下就增加信號(hào)量,可能會(huì)導(dǎo)致消費(fèi)者線程被喚醒后嘗試獲取鎖,但由于生產(chǎn)者還持有鎖而無法進(jìn)入臨界區(qū),從而造成死鎖或數(shù)據(jù)不一致的問題
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
apache的AllowOverride以及Options使用詳解
通常利用Apache的rewrite模塊對(duì) URL 進(jìn)行重寫的時(shí)候, rewrite規(guī)則會(huì)寫在 .htaccess 文件里。但要使 apache 能夠正常的讀取.htaccess 文件的內(nèi)容,就必須對(duì).htaccess 所在目錄進(jìn)行配置2012-11-11
Linux常用命令之chmod修改文件權(quán)限777和754
這篇文章主要介紹了Linux常用命令之chmod修改文件權(quán)限777和754,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
ubuntu系統(tǒng)theano和keras的安裝方法
這篇文章主要介紹了ubuntu系統(tǒng)theano和keras的安裝方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-12-12
Linux 命令行通配符及轉(zhuǎn)義符的實(shí)現(xiàn)
這篇文章主要介紹了Linux 命令行通配符及轉(zhuǎn)義符的實(shí)現(xiàn),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-11-11
linux下因?yàn)橄到y(tǒng)編碼問題造成亂碼的快速解決方法
下面小編就為大家?guī)硪黄猯inux下因?yàn)橄到y(tǒng)編碼問題造成亂碼的快速解決方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-10-10

