Linux基于環(huán)形隊列的生產消費者模型詳解
一、POSIX信號量
1、概述
在我們進行環(huán)形隊列的生產消費者模型的學習之前,我們要對前置條件POSIX信號量進行學習,這里的POSIX的信號量與systemV的信號量是幾乎一致的,都是用于同步操作,達到無沖突的訪問共享資源的目的,只是POSIX信號量的使用要更簡單一些,可以用于線程間同步
信號量的本質就是一個計數器,它的本質就是用來描述資源數目的,把資源是否就緒放到了臨界區(qū)之外,在申請信號量的時候其實已經就是間接在做判斷了
2、調用接口
(一)初始化信號量
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
- 返回值:成功返回0,失敗返回-1
sem
:指向要初始化的信號量對象的指針pshared
:指定信號量的共享屬性,如果pshared
為 0,表示信號量是進程內共享的,只能在創(chuàng)建它的進程內的多個線程之間使用,如果pshared
非 0,表示信號量可以在多個進程之間共享value
:指定信號量的初始值,表示可以同時訪問共享資源的線程或進程的數量
(二)銷毀信號量
#include <semaphore.h> int sem_destroy(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem
:指向要銷毀的信號量對象的指針
(三)等待信號量
#include <semaphore.h> int sem_wait(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem
:指向要操作的信號量對象的指針,這個指針一定要是被初始化過的
sem_wait
函數執(zhí)行的是信號量的 P
操作
- 如果信號量
sem
的值大于 0,sem_wait
會將信號量的值減 1,然后立即返回,調用線程或進程可以繼續(xù)執(zhí)行后續(xù)代碼,意味著該線程或進程成功獲取了對共享資源的訪問權 - 如果信號量
sem
的值等于 0,sem_wait
會使調用線程或進程進入阻塞狀態(tài),直到信號量的值大于 0 為止。一旦信號量的值變?yōu)榇笥?0,sem_wait
會將信號量的值減 1 并返回,線程或進程繼續(xù)執(zhí)行
(四)發(fā)布信號量
#include <semaphore.h> int sem_post(sem_t *sem);
- 返回值:成功返回0,失敗返回-1
sem
:指向要操作的信號量對象的指針,這個指針一定要是被初始化過的
sem_post
函數執(zhí)行的是信號量的 V
操作,會將信號量 sem 的值加 1
- 如果在調用
sem_post
之前,有其他線程或進程因為調用sem_wait
而阻塞在該信號量上(即信號量的值為 0),那么在信號量的值加 1 之后,系統(tǒng)會喚醒其中一個阻塞的線程或進程,被喚醒的線程或進程會將信號量的值再減 1 并繼續(xù)執(zhí)行后續(xù)代碼
3、在環(huán)形隊列中的作用
我們在之前應該都接觸過環(huán)形隊列,在環(huán)形隊列中,一般我們是需要一個計數器的,或者在環(huán)形隊列中留出最后一個位置,因為如果沒有這些措施,我們就不知道雙指針誰在前誰在后了,我們這里使用信號量替代了這個計數器
二、基于環(huán)形隊列的生產消費者模型
1、理論探究
我們通過數組以及模運算的方式來模擬環(huán)狀模型,前面的基于阻塞隊列的生產消費者模型底層來說是基于容器queue
的,其空間可以動態(tài)分配,現在是基于固定大小的,基于容器vector
其中生產者關注的是環(huán)形隊列的空間資源,消費者關心的是環(huán)形隊列的數據資源,而環(huán)形隊列中的空間資源+數據資源=全部資源,只要有空間生產者就可以生產數據然后放入,只要有數據消費者就可以取出數據然后加工
2、代碼實現
(一)RingQueue.hpp
#pragma once #include <iostream> #include <vector> #include <semaphore.h> #include <pthread.h> //環(huán)形隊列默認容量 const static int defaultcap = 8; //環(huán)形隊列核心接口:PV操作以及加鎖解鎖 template<class T> class RingQueue{ private: void P(sem_t &sem) { sem_wait(&sem); } void V(sem_t &sem) { sem_post(&sem); } void Lock(pthread_mutex_t &mutex) { pthread_mutex_lock(&mutex); } void Unlock(pthread_mutex_t &mutex) { pthread_mutex_unlock(&mutex); } public: //初始化 RingQueue(int cap = defaultcap) :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0) { sem_init(&cdata_sem_, 0, 0); sem_init(&pspace_sem_, 0, cap); //生產者消費者的鎖 pthread_mutex_init(&c_mutex_, nullptr); pthread_mutex_init(&p_mutex_, nullptr); } void Push(const T &in) // 生產活動 { //調用P函數檢查隊列中是否有可用空間,沒有可用空間線程會阻塞 P(pspace_sem_); //這里為什么要先P后加鎖,下面詳談 Lock(p_mutex_); ringqueue_[p_step_] = in; // 位置后移,維持環(huán)形特性 p_step_++; p_step_ %= cap_; Unlock(p_mutex_); V(cdata_sem_); } void Pop(T *out) // 消費活動 { P(cdata_sem_); Lock(c_mutex_); *out = ringqueue_[c_step_]; // 位置后移,維持環(huán)形特性 c_step_++; c_step_ %= cap_; Unlock(c_mutex_); V(pspace_sem_); } //析構銷毀 ~RingQueue() { sem_destroy(&cdata_sem_); sem_destroy(&pspace_sem_); pthread_mutex_destroy(&c_mutex_); pthread_mutex_destroy(&p_mutex_); } private: std::vector<T> ringqueue_;// 環(huán)形隊列的底層實現 int cap_; // 隊列容量 int c_step_; // 消費者下標 int p_step_; // 生產者下標 sem_t cdata_sem_; // 隊中可用數據資源 sem_t pspace_sem_; // 隊中可用空間資源 pthread_mutex_t c_mutex_; // 消費者鎖 pthread_mutex_t p_mutex_; // 生產者鎖 };
(二)Task.hpp
任務函數還是上一次的任務
#pragma once #include <iostream> #include <string> std::string opers="+-*/%"; enum{ DivZero=1, ModZero, Unknown }; class Task { public: Task() {} Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0) {} void run() { switch (oper_) { case '+': result_ = data1_ + data2_; break; case '-': result_ = data1_ - data2_; break; case '*': result_ = data1_ * data2_; break; case '/': { if(data2_ == 0) exitcode_ = DivZero; else result_ = data1_ / data2_; } break; case '%': { if(data2_ == 0) exitcode_ = ModZero; else result_ = data1_ % data2_; } break; default: exitcode_ = Unknown; break; } } void operator ()() { run(); } std::string GetResult() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "="; r += std::to_string(result_); r += "[code: "; r += std::to_string(exitcode_); r += "]"; return r; } std::string GetTask() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "=?"; return r; } ~Task() {} private: int data1_; int data2_; char oper_; int result_; int exitcode_; };
(三)main.cpp
#include <iostream> #include <pthread.h> #include <unistd.h> #include <ctime> #include "RingQueue.hpp" #include "Task.hpp" using namespace std; //這個結構體是方便我們打印的時候查看方便的 struct ThreadData { RingQueue<Task> *rq; //環(huán)形隊列 std::string threadname;//線程名字 }; void *Productor(void *args) { ThreadData *td = static_cast<ThreadData*>(args); RingQueue<Task> *rq = td->rq; std::string name = td->threadname; int len = opers.size(); while (true) { // 模擬獲取數據 int data1 = rand() % 10 + 1; usleep(10); int data2 = rand() % 10; char op = opers[rand() % len]; Task t(data1, data2, op); // 生產數據 rq->Push(t); cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl; sleep(1); } return nullptr; } void *Consumer(void *args) { ThreadData *td = static_cast<ThreadData*>(args); RingQueue<Task> *rq = td->rq; std::string name = td->threadname; while (true) { // 消費數據 Task t; rq->Pop(&t); // 處理數據 t(); cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl; } return nullptr; } int main() { srand(time(nullptr)); RingQueue<Task> *rq = new RingQueue<Task>(10); pthread_t c[5], p[3]; //這里我們?yōu)榱朔奖悴榭?,統(tǒng)一用單生產單消費 for (int i = 0; i < 1; i++) { ThreadData *td = new ThreadData(); td->rq = rq; td->threadname = "Productor-" + std::to_string(i); pthread_create(p + i, nullptr, Productor, td); } for (int i = 0; i < 1; i++) { ThreadData *td = new ThreadData(); td->rq = rq; td->threadname = "Consumer-" + std::to_string(i); pthread_create(c + i, nullptr, Consumer, td); } for (int i = 0; i < 1; i++) { pthread_join(p[i], nullptr); } for (int i = 0; i < 1; i++) { pthread_join(c[i], nullptr); } return 0; }
3、PV操作包裹住加解鎖操作的原因
在 Pop
和Push
函數中,以Push
函數為例,P(pspace_sem_)
和 V(cdata_sem_)
包裹著 Lock(p_mutex_)
和 Unlock(p_mutex_)
這種設計是為了實現更細粒度的同步控制,盡可能減少鎖的競爭,以確保線程安全和高效性,下面詳細解釋其原因:
P(pspace_sem_)
在 Lock(p_mutex_)
之前:
- 信號量的作用:
pspace_sem_
信號量用于表示環(huán)形隊列中可用的空間資源,P(pspace_sem_)
操作會檢查信號量的值,如果值大于 0,則將其減 1 并繼續(xù)執(zhí)行,如果值為 0,則線程會阻塞,直到有可用空間(即其他線程調用V(pspace_sem_)
釋放空間) - 避免不必要的加鎖:在嘗試獲取互斥鎖之前先檢查信號量,可以避免在沒有可用空間時加鎖,因為如果沒有可用空間,即使加了鎖也無法進行生產操作,還會導致其他線程無法釋放空間,造成資源浪費和性能下降,通過先檢查信號量,只有在有可用空間時才去獲取互斥鎖,減少了鎖的競爭,提高了程序的效率
V(cdata_sem_)
在 Unlock(p_mutex_)
之后:
- 信號量的通知機制:
cdata_sem_
信號量用于表示環(huán)形隊列中可用的數據資源,V(cdata_sem_)
操作會將信號量的值加 1,如果有消費者線程因為等待數據而阻塞,會喚醒其中一個線程 - 避免死鎖和數據不一致:在釋放互斥鎖之后再增加
cdata_sem_
信號量的值,可以確保在通知消費者有新數據可用之前,生產者已經完成了對共享資源的修改,并且釋放了鎖,如果在加鎖狀態(tài)下就增加信號量,可能會導致消費者線程被喚醒后嘗試獲取鎖,但由于生產者還持有鎖而無法進入臨界區(qū),從而造成死鎖或數據不一致的問題
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
apache的AllowOverride以及Options使用詳解
通常利用Apache的rewrite模塊對 URL 進行重寫的時候, rewrite規(guī)則會寫在 .htaccess 文件里。但要使 apache 能夠正常的讀取.htaccess 文件的內容,就必須對.htaccess 所在目錄進行配置2012-11-11ubuntu系統(tǒng)theano和keras的安裝方法
這篇文章主要介紹了ubuntu系統(tǒng)theano和keras的安裝方法,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-12-12linux下因為系統(tǒng)編碼問題造成亂碼的快速解決方法
下面小編就為大家?guī)硪黄猯inux下因為系統(tǒng)編碼問題造成亂碼的快速解決方法。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-10-10