Linux基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型詳解
一、POSIX信號(hào)量
1、概述
在我們進(jìn)行環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型的學(xué)習(xí)之前,我們要對(duì)前置條件POSIX信號(hào)量進(jìn)行學(xué)習(xí),這里的POSIX的信號(hào)量與systemV的信號(hào)量是幾乎一致的,都是用于同步操作,達(dá)到無(wú)沖突的訪問(wèn)共享資源的目的,只是POSIX信號(hào)量的使用要更簡(jiǎn)單一些,可以用于線程間同步
信號(hào)量的本質(zhì)就是一個(gè)計(jì)數(shù)器,它的本質(zhì)就是用來(lái)描述資源數(shù)目的,把資源是否就緒放到了臨界區(qū)之外,在申請(qǐng)信號(hào)量的時(shí)候其實(shí)已經(jīng)就是間接在做判斷了
2、調(diào)用接口
(一)初始化信號(hào)量
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
- 返回值:成功返回0,失敗返回-1
sem
:指向要初始化的信號(hào)量對(duì)象的指針pshared
:指定信號(hào)量的共享屬性,如果pshared
為 0,表示信號(hào)量是進(jìn)程內(nèi)共享的,只能在創(chuàng)建它的進(jìn)程內(nèi)的多個(gè)線程之間使用,如果pshared
非 0,表示信號(hào)量可以在多個(gè)進(jìn)程之間共享value
:指定信號(hào)量的初始值,表示可以同時(shí)訪問(wèn)共享資源的線程或進(jìn)程的數(shù)量
(二)銷毀信號(hào)量
#include <semaphore.h> int sem_destroy(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem
:指向要銷毀的信號(hào)量對(duì)象的指針
(三)等待信號(hào)量
#include <semaphore.h> int sem_wait(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem
:指向要操作的信號(hào)量對(duì)象的指針,這個(gè)指針一定要是被初始化過(guò)的
sem_wait
函數(shù)執(zhí)行的是信號(hào)量的 P
操作
- 如果信號(hào)量
sem
的值大于 0,sem_wait
會(huì)將信號(hào)量的值減 1,然后立即返回,調(diào)用線程或進(jìn)程可以繼續(xù)執(zhí)行后續(xù)代碼,意味著該線程或進(jìn)程成功獲取了對(duì)共享資源的訪問(wèn)權(quán) - 如果信號(hào)量
sem
的值等于 0,sem_wait
會(huì)使調(diào)用線程或進(jìn)程進(jìn)入阻塞狀態(tài),直到信號(hào)量的值大于 0 為止。一旦信號(hào)量的值變?yōu)榇笥?0,sem_wait
會(huì)將信號(hào)量的值減 1 并返回,線程或進(jìn)程繼續(xù)執(zhí)行
(四)發(fā)布信號(hào)量
#include <semaphore.h> int sem_post(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem
:指向要操作的信號(hào)量對(duì)象的指針,這個(gè)指針一定要是被初始化過(guò)的
sem_post
函數(shù)執(zhí)行的是信號(hào)量的 V
操作,會(huì)將信號(hào)量 sem 的值加 1
- 如果在調(diào)用
sem_post
之前,有其他線程或進(jìn)程因?yàn)檎{(diào)用sem_wait
而阻塞在該信號(hào)量上(即信號(hào)量的值為 0),那么在信號(hào)量的值加 1 之后,系統(tǒng)會(huì)喚醒其中一個(gè)阻塞的線程或進(jìn)程,被喚醒的線程或進(jìn)程會(huì)將信號(hào)量的值再減 1 并繼續(xù)執(zhí)行后續(xù)代碼
3、在環(huán)形隊(duì)列中的作用
我們?cè)谥皯?yīng)該都接觸過(guò)環(huán)形隊(duì)列,在環(huán)形隊(duì)列中,一般我們是需要一個(gè)計(jì)數(shù)器的,或者在環(huán)形隊(duì)列中留出最后一個(gè)位置,因?yàn)槿绻麤](méi)有這些措施,我們就不知道雙指針誰(shuí)在前誰(shuí)在后了,我們這里使用信號(hào)量替代了這個(gè)計(jì)數(shù)器
二、基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型
1、理論探究
我們通過(guò)數(shù)組以及模運(yùn)算的方式來(lái)模擬環(huán)狀模型,前面的基于阻塞隊(duì)列的生產(chǎn)消費(fèi)者模型底層來(lái)說(shuō)是基于容器queue
的,其空間可以動(dòng)態(tài)分配,現(xiàn)在是基于固定大小的,基于容器vector
其中生產(chǎn)者關(guān)注的是環(huán)形隊(duì)列的空間資源,消費(fèi)者關(guān)心的是環(huán)形隊(duì)列的數(shù)據(jù)資源,而環(huán)形隊(duì)列中的空間資源+數(shù)據(jù)資源=全部資源,只要有空間生產(chǎn)者就可以生產(chǎn)數(shù)據(jù)然后放入,只要有數(shù)據(jù)消費(fèi)者就可以取出數(shù)據(jù)然后加工
2、代碼實(shí)現(xiàn)
(一)RingQueue.hpp
#pragma once #include <iostream> #include <vector> #include <semaphore.h> #include <pthread.h> //環(huán)形隊(duì)列默認(rèn)容量 const static int defaultcap = 8; //環(huán)形隊(duì)列核心接口:PV操作以及加鎖解鎖 template<class T> class RingQueue{ private: void P(sem_t &sem) { sem_wait(&sem); } void V(sem_t &sem) { sem_post(&sem); } void Lock(pthread_mutex_t &mutex) { pthread_mutex_lock(&mutex); } void Unlock(pthread_mutex_t &mutex) { pthread_mutex_unlock(&mutex); } public: //初始化 RingQueue(int cap = defaultcap) :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0) { sem_init(&cdata_sem_, 0, 0); sem_init(&pspace_sem_, 0, cap); //生產(chǎn)者消費(fèi)者的鎖 pthread_mutex_init(&c_mutex_, nullptr); pthread_mutex_init(&p_mutex_, nullptr); } void Push(const T &in) // 生產(chǎn)活動(dòng) { //調(diào)用P函數(shù)檢查隊(duì)列中是否有可用空間,沒(méi)有可用空間線程會(huì)阻塞 P(pspace_sem_); //這里為什么要先P后加鎖,下面詳談 Lock(p_mutex_); ringqueue_[p_step_] = in; // 位置后移,維持環(huán)形特性 p_step_++; p_step_ %= cap_; Unlock(p_mutex_); V(cdata_sem_); } void Pop(T *out) // 消費(fèi)活動(dòng) { P(cdata_sem_); Lock(c_mutex_); *out = ringqueue_[c_step_]; // 位置后移,維持環(huán)形特性 c_step_++; c_step_ %= cap_; Unlock(c_mutex_); V(pspace_sem_); } //析構(gòu)銷毀 ~RingQueue() { sem_destroy(&cdata_sem_); sem_destroy(&pspace_sem_); pthread_mutex_destroy(&c_mutex_); pthread_mutex_destroy(&p_mutex_); } private: std::vector<T> ringqueue_;// 環(huán)形隊(duì)列的底層實(shí)現(xiàn) int cap_; // 隊(duì)列容量 int c_step_; // 消費(fèi)者下標(biāo) int p_step_; // 生產(chǎn)者下標(biāo) sem_t cdata_sem_; // 隊(duì)中可用數(shù)據(jù)資源 sem_t pspace_sem_; // 隊(duì)中可用空間資源 pthread_mutex_t c_mutex_; // 消費(fèi)者鎖 pthread_mutex_t p_mutex_; // 生產(chǎn)者鎖 };
(二)Task.hpp
任務(wù)函數(shù)還是上一次的任務(wù)
#pragma once #include <iostream> #include <string> std::string opers="+-*/%"; enum{ DivZero=1, ModZero, Unknown }; class Task { public: Task() {} Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0) {} void run() { switch (oper_) { case '+': result_ = data1_ + data2_; break; case '-': result_ = data1_ - data2_; break; case '*': result_ = data1_ * data2_; break; case '/': { if(data2_ == 0) exitcode_ = DivZero; else result_ = data1_ / data2_; } break; case '%': { if(data2_ == 0) exitcode_ = ModZero; else result_ = data1_ % data2_; } break; default: exitcode_ = Unknown; break; } } void operator ()() { run(); } std::string GetResult() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "="; r += std::to_string(result_); r += "[code: "; r += std::to_string(exitcode_); r += "]"; return r; } std::string GetTask() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "=?"; return r; } ~Task() {} private: int data1_; int data2_; char oper_; int result_; int exitcode_; };
(三)main.cpp
#include <iostream> #include <pthread.h> #include <unistd.h> #include <ctime> #include "RingQueue.hpp" #include "Task.hpp" using namespace std; //這個(gè)結(jié)構(gòu)體是方便我們打印的時(shí)候查看方便的 struct ThreadData { RingQueue<Task> *rq; //環(huán)形隊(duì)列 std::string threadname;//線程名字 }; void *Productor(void *args) { ThreadData *td = static_cast<ThreadData*>(args); RingQueue<Task> *rq = td->rq; std::string name = td->threadname; int len = opers.size(); while (true) { // 模擬獲取數(shù)據(jù) int data1 = rand() % 10 + 1; usleep(10); int data2 = rand() % 10; char op = opers[rand() % len]; Task t(data1, data2, op); // 生產(chǎn)數(shù)據(jù) rq->Push(t); cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl; sleep(1); } return nullptr; } void *Consumer(void *args) { ThreadData *td = static_cast<ThreadData*>(args); RingQueue<Task> *rq = td->rq; std::string name = td->threadname; while (true) { // 消費(fèi)數(shù)據(jù) Task t; rq->Pop(&t); // 處理數(shù)據(jù) t(); cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl; } return nullptr; } int main() { srand(time(nullptr)); RingQueue<Task> *rq = new RingQueue<Task>(10); pthread_t c[5], p[3]; //這里我們?yōu)榱朔奖悴榭?,統(tǒng)一用單生產(chǎn)單消費(fèi) for (int i = 0; i < 1; i++) { ThreadData *td = new ThreadData(); td->rq = rq; td->threadname = "Productor-" + std::to_string(i); pthread_create(p + i, nullptr, Productor, td); } for (int i = 0; i < 1; i++) { ThreadData *td = new ThreadData(); td->rq = rq; td->threadname = "Consumer-" + std::to_string(i); pthread_create(c + i, nullptr, Consumer, td); } for (int i = 0; i < 1; i++) { pthread_join(p[i], nullptr); } for (int i = 0; i < 1; i++) { pthread_join(c[i], nullptr); } return 0; }
3、PV操作包裹住加解鎖操作的原因
在 Pop
和Push
函數(shù)中,以Push
函數(shù)為例,P(pspace_sem_)
和 V(cdata_sem_)
包裹著 Lock(p_mutex_)
和 Unlock(p_mutex_)
這種設(shè)計(jì)是為了實(shí)現(xiàn)更細(xì)粒度的同步控制,盡可能減少鎖的競(jìng)爭(zhēng),以確保線程安全和高效性,下面詳細(xì)解釋其原因:
P(pspace_sem_)
在 Lock(p_mutex_)
之前:
- 信號(hào)量的作用:
pspace_sem_
信號(hào)量用于表示環(huán)形隊(duì)列中可用的空間資源,P(pspace_sem_)
操作會(huì)檢查信號(hào)量的值,如果值大于 0,則將其減 1 并繼續(xù)執(zhí)行,如果值為 0,則線程會(huì)阻塞,直到有可用空間(即其他線程調(diào)用V(pspace_sem_)
釋放空間) - 避免不必要的加鎖:在嘗試獲取互斥鎖之前先檢查信號(hào)量,可以避免在沒(méi)有可用空間時(shí)加鎖,因?yàn)槿绻麤](méi)有可用空間,即使加了鎖也無(wú)法進(jìn)行生產(chǎn)操作,還會(huì)導(dǎo)致其他線程無(wú)法釋放空間,造成資源浪費(fèi)和性能下降,通過(guò)先檢查信號(hào)量,只有在有可用空間時(shí)才去獲取互斥鎖,減少了鎖的競(jìng)爭(zhēng),提高了程序的效率
V(cdata_sem_)
在 Unlock(p_mutex_)
之后:
- 信號(hào)量的通知機(jī)制:
cdata_sem_
信號(hào)量用于表示環(huán)形隊(duì)列中可用的數(shù)據(jù)資源,V(cdata_sem_)
操作會(huì)將信號(hào)量的值加 1,如果有消費(fèi)者線程因?yàn)榈却龜?shù)據(jù)而阻塞,會(huì)喚醒其中一個(gè)線程 - 避免死鎖和數(shù)據(jù)不一致:在釋放互斥鎖之后再增加
cdata_sem_
信號(hào)量的值,可以確保在通知消費(fèi)者有新數(shù)據(jù)可用之前,生產(chǎn)者已經(jīng)完成了對(duì)共享資源的修改,并且釋放了鎖,如果在加鎖狀態(tài)下就增加信號(hào)量,可能會(huì)導(dǎo)致消費(fèi)者線程被喚醒后嘗試獲取鎖,但由于生產(chǎn)者還持有鎖而無(wú)法進(jìn)入臨界區(qū),從而造成死鎖或數(shù)據(jù)不一致的問(wèn)題
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
apache的AllowOverride以及Options使用詳解
通常利用Apache的rewrite模塊對(duì) URL 進(jìn)行重寫的時(shí)候, rewrite規(guī)則會(huì)寫在 .htaccess 文件里。但要使 apache 能夠正常的讀取.htaccess 文件的內(nèi)容,就必須對(duì).htaccess 所在目錄進(jìn)行配置2012-11-11Linux常用命令之chmod修改文件權(quán)限777和754
這篇文章主要介紹了Linux常用命令之chmod修改文件權(quán)限777和754,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09ubuntu系統(tǒng)theano和keras的安裝方法
這篇文章主要介紹了ubuntu系統(tǒng)theano和keras的安裝方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-12-12Linux 命令行通配符及轉(zhuǎn)義符的實(shí)現(xiàn)
這篇文章主要介紹了Linux 命令行通配符及轉(zhuǎn)義符的實(shí)現(xiàn),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-11-11linux下因?yàn)橄到y(tǒng)編碼問(wèn)題造成亂碼的快速解決方法
下面小編就為大家?guī)?lái)一篇linux下因?yàn)橄到y(tǒng)編碼問(wèn)題造成亂碼的快速解決方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-10-10