C++線(xiàn)程池實(shí)現(xiàn)代碼
前言
這段時(shí)間看了《C++并發(fā)編程實(shí)戰(zhàn)》的基礎(chǔ)內(nèi)容,想著利用最近學(xué)的知識(shí)自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線(xiàn)程池。
什么是線(xiàn)程池
線(xiàn)程池(thread pool)是一種線(xiàn)程使用模式。線(xiàn)程過(guò)多或者頻繁創(chuàng)建和銷(xiāo)毀線(xiàn)程會(huì)帶來(lái)調(diào)度開(kāi)銷(xiāo),進(jìn)而影響緩存局部性和整體性能。而線(xiàn)程池維護(hù)著多個(gè)線(xiàn)程,等待著管理器分配可并發(fā)執(zhí)行的任務(wù)。這避免了在處理短時(shí)間任務(wù)時(shí)創(chuàng)建與銷(xiāo)毀線(xiàn)程的代價(jià),以及保證了線(xiàn)程的可復(fù)用性。線(xiàn)程池不僅能夠保證內(nèi)核的充分利用,還能防止過(guò)分調(diào)度。
思路
個(gè)人對(duì)線(xiàn)程池的理解是:利用已經(jīng)創(chuàng)建的固定數(shù)量的線(xiàn)程去執(zhí)行指定的任務(wù),從而避免線(xiàn)程重復(fù)創(chuàng)建和銷(xiāo)毀帶來(lái)的額外開(kāi)銷(xiāo)。
C++11中,線(xiàn)程我們可以理解為對(duì)應(yīng)一個(gè)thread對(duì)象,任務(wù)可以理解為要執(zhí)行的函數(shù),通常是耗時(shí)的函數(shù)。
我們的任務(wù)多少和順序并非固定的,因此需要有一個(gè)方法能添加指定的任務(wù),任務(wù)存放的地方應(yīng)該是一個(gè)任務(wù)隊(duì)列,因?yàn)槲覀兊木€(xiàn)程數(shù)量有限,當(dāng)任務(wù)很多時(shí)同時(shí)執(zhí)行的任務(wù)數(shù)量也有限,因此任務(wù)需要排隊(duì),遵循先來(lái)后到的原則。
當(dāng)要執(zhí)行一個(gè)任務(wù)時(shí),意味著先將這個(gè)任務(wù)從隊(duì)列取出,再執(zhí)行相應(yīng)任務(wù),而“取出”動(dòng)作的執(zhí)行者是線(xiàn)程池中的線(xiàn)程,這意味我們的隊(duì)列需要考慮多個(gè)線(xiàn)程在同一隊(duì)列上執(zhí)行“取出”操作的問(wèn)題,實(shí)際上,取出任務(wù)操作和添加任務(wù)操作也不能同時(shí)進(jìn)行,否則會(huì)產(chǎn)生競(jìng)爭(zhēng)條件;另一方面,程序本身如果就是多線(xiàn)程的,多個(gè)線(xiàn)程同時(shí)添加任務(wù)的操作也應(yīng)該是互斥的。
當(dāng)沒(méi)有任務(wù)可以執(zhí)行時(shí),所有線(xiàn)程應(yīng)該什么也不做,當(dāng)出現(xiàn)了一個(gè)任務(wù)時(shí),應(yīng)該將這個(gè)任務(wù)分配到任一線(xiàn)程中執(zhí)行。實(shí)現(xiàn)上我們固然可以使用輪詢(xún)的方式判斷當(dāng)前隊(duì)列是否有任務(wù),有則取出(即使加了互斥鎖似乎也無(wú)法避免競(jìng)爭(zhēng)條件?),但這樣會(huì)消耗無(wú)謂的CPU資源,寫(xiě)輪詢(xún)周期難以選取。其實(shí),我們可以使用condition_variable代替輪詢(xún)。
上述任務(wù)的創(chuàng)建和取出其實(shí)就是經(jīng)典的生產(chǎn)者消費(fèi)者模型。
我們將上面的內(nèi)容都封裝在一個(gè)類(lèi)中,取名ThreadPool,用戶(hù)可以在構(gòu)造ThreadPool對(duì)象時(shí)指定線(xiàn)程池大小,之后可以隨時(shí)添加要執(zhí)行的任務(wù)。
實(shí)現(xiàn)
class ThreadPool
{
public:
ThreadPool(int n);
~ThreadPool();
void pushTask(packaged_task<void()> &&task);
private:
vector<thread*> threadPool;
deque<packaged_task<void()>> taskQueue;
void taskConsumer();
mutex taskMutex;
condition_variable taskQueueCond;
};
ThreadPool::ThreadPool(int n)
{
for (int i = 0; i < n; i++)
{
thread *t = new thread(&ThreadPool::taskConsumer,this);
threadPool.push_back(t);
t->detach();
}
}
ThreadPool::~ThreadPool()
{
while (!threadPool.empty())
{
thread *t=threadPool.back();
threadPool.pop_back();
delete t;
}
}
void ThreadPool::pushTask(packaged_task<void()> &&task)
{
{
lock_guard<mutex> guard(taskMutex);
taskQueue.push_back(std::move(task));
}
taskQueueCond.notify_one();
}
void ThreadPool::taskConsumer()
{
while (true)
{
unique_lock<mutex> lk(taskMutex);
taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
packaged_task<void()> task=std::move(taskQueue.front());
taskQueue.pop_front();
lk.unlock();
task();
}
}
這里我使用packaged_task作為任務(wù),每當(dāng)添加一個(gè)任務(wù),就調(diào)用condition_variable::notify_one方法,調(diào)用condition_variable::wait的線(xiàn)程就會(huì)被喚醒,并檢查等待條件。這里有個(gè)小細(xì)節(jié)是notify_one在解鎖后執(zhí)行,這樣避免線(xiàn)程喚醒后還要等待互斥鎖解鎖。
使用示例:
void Task1()
{
Sleep(1000);
cout << "Task1"<<endl;
}
void Task5()
{
Sleep(5000);
cout << "Task5" << endl;
}
class Worker
{
public:
void run();
};
void Worker::run()
{
cout << "Worker::run start" << endl;
Sleep(5000);
cout << "Worker::run end" << endl;
}
int main()
{
ThreadPool pool(2);
pool.pushTask(packaged_task<void()>(Task5));
pool.pushTask(packaged_task<void()>(Task1));
pool.pushTask(packaged_task<void()>(Task1));
Worker worker;
pool.pushTask(packaged_task<void()>(bind(&Worker::run,&worker)));
pool.pushTask(packaged_task<void()>([&](){worker.run();}));
Sleep(20000);
}
這個(gè)線(xiàn)程池目前有幾個(gè)缺點(diǎn):
- 只能傳入調(diào)用形式為void()形式的函數(shù)或可調(diào)用對(duì)象,不能返回任務(wù)執(zhí)行的值,只能通過(guò)其他方式同步任務(wù)執(zhí)行結(jié)果(如果有)
- 傳入?yún)?shù)較為復(fù)雜,必須封裝一層packaged_task,調(diào)用對(duì)象方法時(shí)需要使用bind或者lambda表達(dá)式的方法封裝
以上缺點(diǎn)在當(dāng)前版本的實(shí)現(xiàn)不予解決,日后另寫(xiě)博文優(yōu)化。
2021/12/29 更新之一:
事實(shí)上,我們只要將packaged_task改為funtion模板類(lèi),就可以簡(jiǎn)化我們的調(diào)用參數(shù):
class ThreadPool
{
public:
ThreadPool(int n);
~ThreadPool();
void pushTask(function<void()> task);
private:
vector<thread*> threadPool;
deque<function<void()>> taskQueue;
void taskConsumer();
mutex taskMutex;
condition_variable taskQueueCond;
};
ThreadPool::ThreadPool(int n)
{
for (int i = 0; i < n; i++)
{
thread *t = new thread(&ThreadPool::taskConsumer,this);
threadPool.push_back(t);
t->detach();
}
}
ThreadPool::~ThreadPool()
{
while (!threadPool.empty())
{
thread *t=threadPool.back();
threadPool.pop_back();
delete t;
}
}
void ThreadPool::pushTask(function<void()> task)
{
{
lock_guard<mutex> guard(taskMutex);
taskQueue.push_back(std::move(task));
}
taskQueueCond.notify_one();
}
void ThreadPool::taskConsumer()
{
while (true)
{
unique_lock<mutex> lk(taskMutex);
taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
function<void()> task=taskQueue.front();
taskQueue.pop_front();
lk.unlock();
task();
}
}
調(diào)用代碼改為如下:
ThreadPool pool(2);
pool.pushTask(&Task5);
pool.pushTask(&Task1);
pool.pushTask(&Task1);
Worker worker;
pool.pushTask((bind(&Worker::run, &worker)));
pool.pushTask([&](){worker.run(); });//1
Sleep(15000);
我們可以執(zhí)行指定的函數(shù),也可以將要執(zhí)行的代碼放入lambda表達(dá)式的函數(shù)體中,正如1處所示,這樣就能在其他線(xiàn)程中執(zhí)行指定的代碼了。
2021/12/29 更新之二:
我們發(fā)現(xiàn),main最后都要調(diào)用sleep函數(shù)來(lái)避免主線(xiàn)程在線(xiàn)程任務(wù)完成之前就退出,因此我們希望添加一個(gè)接口,等待線(xiàn)程所有任務(wù)完成,改進(jìn)如下,其他函數(shù)同前:
class ThreadPool
{
public:
ThreadPool(int n);
~ThreadPool();
void pushTask(function<void()> task);
void waitAllTask();
private:
vector<thread*> threadPool;
deque<function<void()>> taskQueue;
atomic<int> busyCount;
bool bStop;
void taskConsumer();
mutex taskQueueMutex;
condition_variable taskQueueCond;
condition_variable taskFinishedCond;
};
void ThreadPool::taskConsumer()
{
while (!bStop)
{
unique_lock<mutex> lk(taskQueueMutex);
taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
busyCount++;
function<void()> task=taskQueue.front();
taskQueue.pop_front();
lk.unlock();
task();
busyCount--;
taskFinishedCond.notify_one();
}
}
void ThreadPool::waitAllTask()
{
unique_lock<mutex> lk(taskQueueMutex);
taskFinishedCond.wait(lk, [&] {return taskQueue.empty() && busyCount==0; });//所有任務(wù)均已完成
}
這樣我們只要調(diào)用waitAllTask就可以等待所有任務(wù)完成啦。
到此這篇關(guān)于C++線(xiàn)程池實(shí)現(xiàn)代碼的文章就介紹到這了,更多相關(guān)C++線(xiàn)程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 基于C++17實(shí)現(xiàn)的手寫(xiě)線(xiàn)程池
- 基于C++11實(shí)現(xiàn)手寫(xiě)線(xiàn)程池的示例代碼
- C++ 學(xué)習(xí)筆記實(shí)戰(zhàn)寫(xiě)一個(gè)簡(jiǎn)單的線(xiàn)程池示例
- C++單例模式實(shí)現(xiàn)線(xiàn)程池的示例代碼
- C++實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線(xiàn)程池的示例代碼
- C/C++ 原生API實(shí)現(xiàn)線(xiàn)程池的方法
- C++11 簡(jiǎn)單實(shí)現(xiàn)線(xiàn)程池的方法
- C++實(shí)現(xiàn)線(xiàn)程池的簡(jiǎn)單方法示例
- 深入解析C++編程中線(xiàn)程池的使用
- c++實(shí)現(xiàn)簡(jiǎn)單的線(xiàn)程池
- c++線(xiàn)程池實(shí)現(xiàn)方法
- C++線(xiàn)程池實(shí)現(xiàn)
相關(guān)文章
OpenCV實(shí)現(xiàn)幀差法檢測(cè)運(yùn)動(dòng)目標(biāo)
這篇文章主要為大家詳細(xì)介紹了OpenCV實(shí)現(xiàn)幀差法檢測(cè)運(yùn)動(dòng)目標(biāo),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-03-03
c++實(shí)現(xiàn)通用參數(shù)解析類(lèi)示例
使用命令行執(zhí)行程序的時(shí)候在程序后可跟多個(gè)參數(shù)列表,而main函數(shù)的argc和argv分別存儲(chǔ)了相關(guān)的參數(shù)個(gè)數(shù)和參數(shù)內(nèi)容,而循環(huán)輸入相關(guān)的時(shí)候就需要用戶(hù)自己來(lái)解析相關(guān)參數(shù)。以下代碼用c++的方式實(shí)現(xiàn)了相關(guān)解析的封裝,使用起來(lái)非常方便2014-03-03
C++設(shè)計(jì)模式編程中使用Bridge橋接模式的完全攻略
這篇文章主要介紹了C++設(shè)計(jì)模式編程中使用Bridge橋接模式的完全攻略,Bridge將抽象部分與它的實(shí)現(xiàn)部分分離,使它們都可以獨(dú)立地變化需要的朋友可以參考下2016-03-03
C++報(bào)錯(cuò):Id?returned?1exit?status的解決辦法
最近剛學(xué)c語(yǔ)言,不止一次遇到了同一種報(bào)錯(cuò),經(jīng)過(guò)總結(jié)分享給大家,下面這篇文章主要給大家介紹了關(guān)于C++報(bào)錯(cuò):Id?returned?1exit?status的解決辦法,需要的朋友可以參考下2023-04-04
C語(yǔ)言入門(mén)學(xué)習(xí)筆記之typedef簡(jiǎn)介
typedef為C語(yǔ)言的關(guān)鍵字,作用是為一種數(shù)據(jù)類(lèi)型定義一個(gè)新名字,下面這篇文章主要給大家介紹了關(guān)于C語(yǔ)言入門(mén)學(xué)習(xí)筆記之typedef簡(jiǎn)介的相關(guān)資料,需要的朋友可以參考下2021-11-11

