linux中編寫(xiě)自己的并發(fā)隊(duì)列類(Queue 并發(fā)阻塞隊(duì)列)
設(shè)計(jì)并發(fā)隊(duì)列
#include <pthread.h>
#include <list>
using namespace std;
template <typename T>
class Queue
{
public:
Queue( )
{
pthread_mutex_init(&_lock, NULL);
}
~Queue( )
{
pthread_mutex_destroy(&_lock);
}
void push(const T& data);
T pop( );
private:
list<T> _list;
pthread_mutex_t _lock;
};
template <typename T>
void Queue<T>::push(const T& value )
{
pthread_mutex_lock(&_lock);
_list.push_back(value);
pthread_mutex_unlock(&_lock);
}
template <typename T>
T Queue<T>::pop( )
{
if (_list.empty( ))
{
throw "element not found";
}
pthread_mutex_lock(&_lock);
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(&_lock);
return _temp;
}
上述代碼是有效的。但是,請(qǐng)考慮這樣的情況:您有一個(gè)很長(zhǎng)的隊(duì)列(可能包含超過(guò) 100,000 個(gè)元素),而且在代碼執(zhí)行期間的某個(gè)時(shí)候,從隊(duì)列中讀取數(shù)據(jù)的線程遠(yuǎn)遠(yuǎn)多于添加數(shù)據(jù)的線程。因?yàn)樘砑雍腿〕鰯?shù)據(jù)操作使用相同的互斥鎖,所以讀取數(shù)據(jù)的速度會(huì)影響寫(xiě)數(shù)據(jù)的線程訪問(wèn)鎖。那么,使用兩個(gè)鎖怎么樣?一個(gè)鎖用于讀取操作,另一個(gè)用于寫(xiě)操作。給出修改后的 Queue 類。
template <typename T>
class Queue
{
public:
Queue( )
{
pthread_mutex_init(&_rlock, NULL);
pthread_mutex_init(&_wlock, NULL);
}
~Queue( )
{
pthread_mutex_destroy(&_rlock);
pthread_mutex_destroy(&_wlock);
}
void push(const T& data);
T pop( );
private:
list<T> _list;
pthread_mutex_t _rlock, _wlock;
};
template <typename T>
void Queue<T>::push(const T& value )
{
pthread_mutex_lock(&_wlock);
_list.push_back(value);
pthread_mutex_unlock(&_wlock);
}
template <typename T>
T Queue<T>::pop( )
{
if (_list.empty( ))
{
throw "element not found";
}
pthread_mutex_lock(&_rlock);
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(&_rlock);
return _temp;
}
設(shè)計(jì)并發(fā)阻塞隊(duì)列
目前,如果讀線程試圖從沒(méi)有數(shù)據(jù)的隊(duì)列讀取數(shù)據(jù),僅僅會(huì)拋出異常并繼續(xù)執(zhí)行。但是,這種做法不總是我們想要的,讀線程很可能希望等待(即阻塞自身),直到有數(shù)據(jù)可用時(shí)為止。這種隊(duì)列稱為阻塞的隊(duì)列。如何讓讀線程在發(fā)現(xiàn)隊(duì)列是空的之后等待?一種做法是定期輪詢隊(duì)列。但是,因?yàn)檫@種做法不保證隊(duì)列中有數(shù)據(jù)可用,它可能會(huì)導(dǎo)致浪費(fèi)大量 CPU 周期。推薦的方法是使用條件變量,即 pthread_cond_t 類型的變量。
template <typename T>
class BlockingQueue
{
public:
BlockingQueue ( )
{
pthread_mutexattr_init(&_attr);
// set lock recursive
pthread_mutexattr_settype(&_attr,PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&_lock,&_attr);
pthread_cond_init(&_cond, NULL);
}
~BlockingQueue ( )
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
void push(const T& data);
bool push(const T& data, const int seconds); //time-out push
T pop( );
T pop(const int seconds); // time-out pop
private:
list<T> _list;
pthread_mutex_t _lock;
pthread_mutexattr_t _attr;
pthread_cond_t _cond;
};
template <typename T>
T BlockingQueue<T>::pop( )
{
pthread_mutex_lock(&_lock);
while (_list.empty( ))
{
pthread_cond_wait(&_cond, &_lock) ;
}
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(&_lock);
return _temp;
}
template <typename T>
void BlockingQueue <T>::push(const T& value )
{
pthread_mutex_lock(&_lock);
const bool was_empty = _list.empty( );
_list.push_back(value);
pthread_mutex_unlock(&_lock);
if (was_empty)
pthread_cond_broadcast(&_cond);
}
并發(fā)阻塞隊(duì)列設(shè)計(jì)有兩個(gè)要注意的方面:
1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 會(huì)釋放至少一個(gè)等待條件變量的線程,這個(gè)線程不一定是等待時(shí)間最長(zhǎng)的讀線程。盡管使用 pthread_cond_signal 不會(huì)損害阻塞隊(duì)列的功能,但是這可能會(huì)導(dǎo)致某些讀線程的等待時(shí)間過(guò)長(zhǎng)。
2.可能會(huì)出現(xiàn)虛假的線程喚醒。因此,在喚醒讀線程之后,要確認(rèn)列表非空,然后再繼續(xù)處理。強(qiáng)烈建議使用基于 while 循環(huán)的 pop()。
設(shè)計(jì)有超時(shí)限制的并發(fā)阻塞隊(duì)列
在許多系統(tǒng)中,如果無(wú)法在特定的時(shí)間段內(nèi)處理新數(shù)據(jù),就根本不處理數(shù)據(jù)了。例如,新聞?lì)l道的自動(dòng)收?qǐng)?bào)機(jī)顯示來(lái)自金融交易所的實(shí)時(shí)股票行情,它每 n 秒收到一次新數(shù)據(jù)。如果在 n 秒內(nèi)無(wú)法處理以前的一些數(shù)據(jù),就應(yīng)該丟棄這些數(shù)據(jù)并顯示最新的信息。根據(jù)這個(gè)概念,我們來(lái)看看如何給并發(fā)隊(duì)列的添加和取出操作增加超時(shí)限制。這意味著,如果系統(tǒng)無(wú)法在指定的時(shí)間限制內(nèi)執(zhí)行添加和取出操作,就應(yīng)該根本不執(zhí)行操作。
template <typename T>
bool BlockingQueue <T>::push(const T& data, const int seconds)
{
struct timespec ts1, ts2;
const bool was_empty = _list.empty( );
clock_gettime(CLOCK_REALTIME, &ts1);
pthread_mutex_lock(&_lock);
clock_gettime(CLOCK_REALTIME, &ts2);
if ((ts2.tv_sec – ts1.tv_sec) <seconds)
{
was_empty = _list.empty( );
_list.push_back(value);
}
pthread_mutex_unlock(&_lock);
if (was_empty)
pthread_cond_broadcast(&_cond);
}
template <typename T>
T BlockingQueue <T>::pop(const int seconds)
{
struct timespec ts1, ts2;
clock_gettime(CLOCK_REALTIME, &ts1);
pthread_mutex_lock(&_lock);
clock_gettime(CLOCK_REALTIME, &ts2);
// First Check: if time out when get the _lock
if ((ts1.tv_sec – ts2.tv_sec) < seconds)
{
ts2.tv_sec += seconds; // specify wake up time
while(_list.empty( ) && (result == 0))
{
result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;
}
if (result == 0) // Second Check: if time out when timedwait
{
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(&_lock);
return _temp;
}
}
pthread_mutex_unlock(&lock);
throw "timeout happened";
}
設(shè)計(jì)有大小限制的并發(fā)阻塞隊(duì)列
最后,討論有大小限制的并發(fā)阻塞隊(duì)列。這種隊(duì)列與并發(fā)阻塞隊(duì)列相似,但是對(duì)隊(duì)列的大小有限制。在許多內(nèi)存有限的嵌入式系統(tǒng)中,確實(shí)需要有大小限制的隊(duì)列。
對(duì)于阻塞隊(duì)列,只有讀線程需要在隊(duì)列中沒(méi)有數(shù)據(jù)時(shí)等待。對(duì)于有大小限制的阻塞隊(duì)列,如果隊(duì)列滿了,寫(xiě)線程也需要等待。
template <typename T>
class BoundedBlockingQueue
{
public:
BoundedBlockingQueue (int size) : maxSize(size)
{
pthread_mutex_init(&_lock, NULL);
pthread_cond_init(&_rcond, NULL);
pthread_cond_init(&_wcond, NULL);
_array.reserve(maxSize);
}
~BoundedBlockingQueue ( )
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_rcond);
pthread_cond_destroy(&_wcond);
}
void push(const T& data);
T pop( );
private:
vector<T> _array; // or T* _array if you so prefer
int maxSize;
pthread_mutex_t _lock;
pthread_cond_t _rcond, _wcond;
};
template <typename T>
void BoundedBlockingQueue <T>::push(const T& value )
{
pthread_mutex_lock(&_lock);
const bool was_empty = _array.empty( );
while (_array.size( ) == maxSize)
{
pthread_cond_wait(&_wcond, &_lock);
}
_array.push_back(value);
pthread_mutex_unlock(&_lock);
if (was_empty)
pthread_cond_broadcast(&_rcond);
}
template <typename T>
T BoundedBlockingQueue<T>::pop( )
{
pthread_mutex_lock(&_lock);
const bool was_full = (_array.size( ) == maxSize);
while(_array.empty( ))
{
pthread_cond_wait(&_rcond, &_lock) ;
}
T _temp = _array.front( );
_array.erase( _array.begin( ));
pthread_mutex_unlock(&_lock);
if (was_full)
pthread_cond_broadcast(&_wcond);
return _temp;
}
要注意的第一點(diǎn)是,這個(gè)阻塞隊(duì)列有兩個(gè)條件變量而不是一個(gè)。如果隊(duì)列滿了,寫(xiě)線程等待 _wcond 條件變量;讀線程在從隊(duì)列中取出數(shù)據(jù)之后需要通知所有線程。同樣,如果隊(duì)列是空的,讀線程等待 _rcond 變量,寫(xiě)線程在把數(shù)據(jù)插入隊(duì)列中之后向所有線程發(fā)送廣播消息。如果在發(fā)送廣播通知時(shí)沒(méi)有線程在等待 _wcond 或 _rcond,會(huì)發(fā)生什么?什么也不會(huì)發(fā)生;系統(tǒng)會(huì)忽略這些消息。還要注意,兩個(gè)條件變量使用相同的互斥鎖。
相關(guān)文章
關(guān)于shell的幾個(gè)不為人知卻十分有用的命令分享
這篇文章主要介紹了關(guān)于shell的幾個(gè)不為人知卻十分有用的命令,需要的朋友可以參考下2016-03-03Linux下統(tǒng)計(jì)當(dāng)前文件夾下的文件個(gè)數(shù)、目錄個(gè)數(shù)
這篇文章主要介紹了Linux下統(tǒng)計(jì)當(dāng)前文件夾下的文件個(gè)數(shù)、目錄個(gè)數(shù),本文使用ls命令配合管理、grep命令實(shí)現(xiàn)統(tǒng)計(jì)需求,需要的朋友可以參考下2014-10-10shell 1>&2 2>&1 &>filename重定向的含義和區(qū)別
這篇文章主要介紹了shell 1>&2 2>&1 &>filename重定向的含義和區(qū)別,需要的朋友可以參考下2015-04-04統(tǒng)計(jì)網(wǎng)卡流量的兩段shell腳本(使用ifconfig)
一個(gè)很小巧的shell腳本,使用ifconfig的不間斷輸出來(lái)統(tǒng)計(jì)網(wǎng)卡的流量,有需要的朋友可以參考下2013-02-02linux echo命令以及l(fā)inux echo命令提示權(quán)限不夠的解決辦法
linux的echo命令, 在shell編程中極為常用, 在終端下打印變量value的時(shí)候也是常常用到的, 因此有必要了解下echo的用法。下面通過(guò)本文給大家介紹linux echo命令以及l(fā)inux echo命令提示權(quán)限不夠的解決辦法,感興趣的朋友一起看看吧2017-09-09