基于C++11實現(xiàn)手寫線程池的示例代碼
在實際的項目中,使用線程池是非常廣泛的,所以最近學(xué)習(xí)了線程池的開發(fā),在此做一個總結(jié)。
源碼:https://github.com/Cheeron955/Handwriting-threadpool-based-on-C-17
項目介紹
項目分為兩個部分,在初版的時候,使用了C++11中的知識,自己實現(xiàn)了Any類,Semaphore類以及Result類的開發(fā),代碼比較繞,但是有很多細節(jié)是值得學(xué)習(xí)的;最終版使用了C++17提供的future類,使得代碼輕量化。接下來先看初版:
test.cpp
先從test.cpp開始剖析項目構(gòu)造:
#include <iostream>
#include <chrono>
#include <thread>
#include "threadpool.h"
using ULong = unsigned long long;
class MyTask : public Task
{
public:
MyTask(int begin, int end)
:begin_(begin)
,end_(end)
{
}
Any run()
{
std::cout << "tid:" << std::this_thread::get_id() << "begin!" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
ULong sum = 0;
for (ULong i = begin_; i < end_; i++)
{
sum += i;
}
std::cout << "tid:" << std::this_thread::get_id() << "end!" << std::endl;
return sum;
}
private:
int begin_;
int end_;
};
int main()
{
{
ThreadPool pool;
pool.setMode(PoolMode::MODE_CACHED);
pool.start(4);
Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 1000000));
Result res2 = pool.submitTask(std::make_shared<MyTask>(1, 1000000));
pool.submitTask(std::make_shared<MyTask>(1, 1000000));
pool.submitTask(std::make_shared<MyTask>(1, 1000000));
pool.submitTask(std::make_shared<MyTask>(1, 1000000));
pool.submitTask(std::make_shared<MyTask>(1, 1000000));
ULong sum1 = res1.get().cast_<ULong>();
std::cout << sum1 << std::endl;
}
std::cout << "main over!" << std::endl;
getchar();
}
在main函數(shù)中,創(chuàng)建了一個ThreadPool對象,進入ThreadPool中:
ThreadPool
重要成員變量
std::unordered_map<int, std::unique_ptr<Thread>> threads_; //初始的線程數(shù)量 int initThreadSize_; //記錄當(dāng)前線程池里面線程的總數(shù)量 std::atomic_int curThreadSize_; //線程數(shù)量上限閾值 int threadSizeThresHold_; //記錄空閑線程的數(shù)量 std::atomic_int idleThreadSize_; //任務(wù)隊列 std::queue<std::shared_ptr<Task>> taskQue_; //任務(wù)數(shù)量 需要保證線程安全 std::atomic_int taskSize_; //任務(wù)隊列數(shù)量上限閾值 int taskQueMaxThresHold_; //任務(wù)隊列互斥鎖,保證任務(wù)隊列的線程安全 std::mutex taskQueMtx_; //表示任務(wù)隊列不滿 std::condition_variable notFull_; //表示任務(wù)隊列不空 std::condition_variable notEmpty_; //等待線程資源全部回收 std::condition_variable exitCond_; //當(dāng)前線程池的工作模式 PoolMode poolMode_; //表示當(dāng)前線程池的啟動狀態(tài) std::atomic_bool isPoolRuning_; const int TASK_MAX_THRESHHOLD = INT32_MAX; const int THREAD_MAX_THRESHHOLD = 100; const int THREAD_MAX_IDLE_TIME = 60; //60s
具體含義請看代碼中注釋
重要成員函數(shù)
構(gòu)造函數(shù)
//線程池構(gòu)造
ThreadPool::ThreadPool()
: initThreadSize_(4)
, taskSize_(0)
, idleThreadSize_(0)
, curThreadSize_(0)
, threadSizeThresHold_(THREAD_MAX_THRESHHOLD)
, taskQueMaxThresHold_(TASK_MAX_THRESHHOLD)
, poolMode_(PoolMode::MODE_FIXED)
, isPoolRuning_(false)
{
}
- 進行了一系列的初始化,包括線程數(shù)量,闕值等等
析構(gòu)函數(shù)
ThreadPool::~ThreadPool()
{
isPoolRuning_ = false;
//notEmpty_.notify_all();//把等待的叫醒 進入阻塞 會死鎖
std::unique_lock<std::mutex> lock(taskQueMtx_);
//等待線程池里面所有的線程返回用戶調(diào)用ThreadPool退出 兩種狀態(tài):阻塞 正在執(zhí)行任務(wù)中
notEmpty_.notify_all();//把等待的叫醒 進入阻塞
exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
- 析構(gòu)函數(shù)中,主要是回收線程池的資源,但是這里要注意
notEmpty_.notify_all();位置,如果在獲得鎖之前就喚醒,可能會發(fā)生死鎖問題,這個在下面還會在提到。
設(shè)置線程池的工作模式
void ThreadPool::setMode(PoolMode mode)
{
if (checkRunningState()) return;
poolMode_ = mode;
}
設(shè)置task任務(wù)隊列上限閾值
void ThreadPool::setTaskQueMaxThreshHold(int threshhold)
{
if (checkRunningState()) return;
taskQueMaxThresHold_ = threshhold;
}
設(shè)置線程池的工作模式,支持fixed以及cached模式
enum class PoolMode
{
MODE_FIXED, //固定數(shù)量的線程
MODE_CACHED, //線程數(shù)量可動態(tài)增長
};
void ThreadPool::setMode(PoolMode mode)
{
if (checkRunningState()) return;
poolMode_ = mode;
}
設(shè)置task任務(wù)隊列上限閾值
void ThreadPool::setTaskQueMaxThreshHold(int threshhold)
{
if (checkRunningState()) return;
taskQueMaxThresHold_ = threshhold;
}
設(shè)置線程池cached模式下線程閾值
void ThreadPool::setThreadSizeThreshHold(int threshhold)
{
if (checkRunningState()) return;
if (poolMode_ == PoolMode::MODE_CACHED)
{
threadSizeThresHold_ = threshhold;
}
}
給線程池提交任務(wù),這是重中之重,用來生產(chǎn)任務(wù)
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
//獲取鎖
std::unique_lock<std::mutex> lock(taskQueMtx_);
//線程通信 等待任務(wù)隊列有空余 并且用戶提交任務(wù)最長不能阻塞超過1s 否則判斷提交失敗,返回
if(!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool {return taskQue_.size() < (size_t)taskQueMaxThresHold_; }))
{
std::cerr << "task queue is full,submit task fail." << std::endl;
return Result(sp, false);
}
//如果有空余,把任務(wù)放入任務(wù)隊列中
taskQue_.emplace(sp);
taskSize_++;
notEmpty_.notify_all();
if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_>idleThreadSize_
&& curThreadSize_ < threadSizeThresHold_)
{
std::cout << ">>> create new thread" << std::endl;
//創(chuàng)建thread線程對象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
//threads_.emplace_back(std::move(ptr)); //資源轉(zhuǎn)移
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
threads_[threadId]->start(); //啟動線程
//修改線程個數(shù)相關(guān)的變量
curThreadSize_++;
idleThreadSize_++;
}
//返回任務(wù)的Result對象
return Result(sp);
}
- 在submitTask函數(shù)中,首先這是生產(chǎn)任務(wù)的函數(shù),所以我們要保證線程安全,獲取鎖;
- 考慮到了如果有耗時嚴重的任務(wù)一直占用,線程,導(dǎo)致提交任務(wù)一直失敗,所以等待1s提交失敗以后會通知用戶;
- 此時隊列里面的任務(wù)沒有超過闕值,就把任務(wù)放入任務(wù)隊列中,更新任務(wù)數(shù);
- 因為新放了任務(wù),任務(wù)隊列不空了,在notEmpty_上進行通知,趕快分配線程執(zhí)行任務(wù);
- cached模式下,需要根據(jù)任務(wù)數(shù)量和空閑線程的數(shù)量,判斷是否需要創(chuàng)建新的線程出來,如果任務(wù)數(shù)大于現(xiàn)有的空閑線程數(shù)并且沒有超過闕值,就增加線程,修改相關(guān)數(shù)量;
- 返回任務(wù)的Result對象
開啟線程池
void ThreadPool::start(int initThreadSize)
{
//設(shè)置線程池的運行狀態(tài)
isPoolRuning_ = true;
//記錄初始線程個數(shù)
initThreadSize_ = initThreadSize;
curThreadSize_ = initThreadSize;
//創(chuàng)建線程對象
for (int i = 0; i < initThreadSize_; i++)
{
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
}
//啟動所有線程 std::vector<Thread*> threads_;
for (int i = 0; i < initThreadSize_; i++)
{
threads_[i]->start(); //需要執(zhí)行一個線程函數(shù)
//記錄初始空閑線程的數(shù)量
idleThreadSize_++;
}
}
- 設(shè)置線程池的運行狀態(tài),如果線程在運行狀態(tài)了,之前所有的設(shè)置相關(guān)的函數(shù)都不能運行了,記錄初始相關(guān)數(shù)量
- 創(chuàng)建線程對象,把線程函數(shù)threadFunc給到thread線程對象,使用綁定器,獲取線程id,方便回收線程資源;
- 加入線程列表
std::unordered_map<int, std::unique_ptr<Thread>>類型; - 啟動所有線程,執(zhí)行線程函數(shù),threadFunc
void Thread::start()
{
std::thread t(func_,threadId_);
t.detach();
}
線程函數(shù),從任務(wù)隊列里面消費任務(wù)
void ThreadPool::threadFunc(int threadid) //線程函數(shù)返回,相應(yīng)的線程就結(jié)束了
{
auto lastTime = std::chrono::high_resolution_clock().now();
for(;;)
{
std::shared_ptr<Task> task;
{
//獲取鎖
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "tid:" << std::this_thread::get_id()
<< "嘗試獲取任務(wù)..." << std::endl;
while ( taskQue_.size() == 0 )
{
if (!isPoolRuning_)
{
threads_.erase(threadid);
std::cout << "threadid:" << std::this_thread::get_id()
<< "exit!" << std::endl;
//通知主線程線程被回收了,再次查看是否滿足條件
exitCond_.notify_all();
return;
}
if (poolMode_ == PoolMode::MODE_CACHED)
{ //超時返回std::cv_status::timeout
if (std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if (dur.count() >= THREAD_MAX_IDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
threads_.erase(threadid);
curThreadSize_--;
idleThreadSize_--;
std::cout << "threadid:" << std::this_thread::get_id()
<< "exit!" << std::endl;
return;
}
}
}
else
{
//等待notEmpty_條件
notEmpty_.wait(lock);
}
/*if (!isPoolRuning_)
{
threads_.erase(threadid);
std::cout << "threadid:" << std::this_thread::get_id()
<< "exit!" << std::endl;
exitCond_.notify_all();
return;
}*/
}
idleThreadSize_--;
std::cout << "tid:" << std::this_thread::get_id()
<< "獲取任務(wù)成功..." << std::endl;
//從任務(wù)隊列中取一個任務(wù)出來
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
//若依然有剩余任務(wù),繼續(xù)通知其他線程執(zhí)行任務(wù)
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}
notFull_.notify_all();
}//釋放鎖,使其他線程獲取任務(wù)或者提交任務(wù)
if (task != nullptr)
{
task->exec();
}
idleThreadSize_++;
auto lastTime = std::chrono::high_resolution_clock().now();
}
}
- 獲取任務(wù)開始的時間,便于在cached模式下,判斷是否需要回收線程
- 創(chuàng)造一個Task類,獲取鎖
class Task
{
public:
Task();
~Task()=default;
void exec();
void setResult(Result*res);
//用戶可以自定義任意任務(wù)類型,從Task繼承,重寫run方法,實現(xiàn)自定義任務(wù)處理
virtual Any run() = 0;
private:
Result* result_; //Result的生命周期》Task的
};
- 如果此時任務(wù)隊列里沒有任務(wù),并且主函數(shù)退出了,此時會在ThreadPool析構(gòu)中設(shè)置isPoolRuning_為false,這時候就該回收線程資源了,并通知析構(gòu)函數(shù)是否滿足條件;
- 如果isPoolRuning_為ture,但是在cached模式下,根據(jù)當(dāng)前時間和上一次線程使用時間,判斷有沒有超過60s,如果超過了,并且當(dāng)前線程數(shù)大于初始定義,說明不需要那么多線程了就需要回收線程資源;
- 如果不在cached模式,就阻塞等待任務(wù)隊列里面有任務(wù)
- 獲取成功任務(wù),取出,如果隊列里面還有任務(wù),繼續(xù)通知。并且取完任務(wù),消費了一個任務(wù) 進行通知可以繼續(xù)提交生產(chǎn)任務(wù)了,釋放鎖,使其他線程獲取任務(wù)或者提交任務(wù);
- 執(zhí)行任務(wù),把任務(wù)的返回值通過setVal方法給到Result;
- 線程處理完了,更新線程執(zhí)行完任務(wù)調(diào)度的時間
檢查線程池狀態(tài)
bool ThreadPool::checkRunningState() const
{
return isPoolRuning_;
}
執(zhí)行任務(wù)
void Task::exec()
{
if (result_ != nullptr)
{
result_->setVal(run()); //多態(tài)調(diào)用,run是用戶的任務(wù)
}
}
void Task::setResult(Result* res)
{
result_ = res;
}
把任務(wù)的返回值通過setVal方法給到Result
信號量類
class Semaphore
{
public:
Semaphore(int limit = 0)
:resLimit_(limit)
{}
~Semaphore() = default;
void wait()
{
std::unique_lock<std::mutex> lock(mtx_);
//等待信號量有資源 沒有資源的話 會阻塞當(dāng)前線程
cond_.wait(lock, [&]()->bool { return resLimit_ > 0; });
resLimit_--;
}
void post()
{
std::unique_lock<std::mutex> lock(mtx_);
resLimit_++;
cond_.notify_all();
}
private:
int resLimit_;
std::mutex mtx_;
std::condition_variable cond_;
};
在信號量類中使用了條件變量和互斥鎖實現(xiàn)了信號量的實現(xiàn),等待信號量資源和釋放信號量資源。
Any類
class Any
{
public:
Any() = default;
~Any() = default;
//左值
Any(const Any&) = delete;
Any& operator=(const Any&) = delete;
//右值
Any(Any&&) = default;
Any& operator=(Any&&) = default;
template<typename T>
Any(T data) :base_(std::make_unique<Derive<T>>(data))
{}
template<typename T>
T cast_()
{
Derive<T> *pd = dynamic_cast<Derive<T>*>(base_.get());
if (pd == nullptr)
{
throw "type is unmatch";
}
return pd->data_;
}
private:
//基類類型
class Base
{
public:
virtual ~Base() = default;
};
//派生類類型
template<typename T>//模板
class Derive :public Base
{
public:
Derive(T data) : data_(data)
{}
T data_; //保存了任意的其他類型
};
private:
//定義一個基類指針,基類指針可以指向派生類對象
std::unique_ptr<Base> base_;
};
- 定義了一個基類Base
- 定義了一個模板類的派生類類型,繼承Base,其中保存了任意的其他類型;
- 對象包在派生類對象里面,通過基類指針指向派生類對象,構(gòu)造函數(shù)可以讓Any類型接收任意其他的數(shù)據(jù)類型,用戶就可以使用任意期望的類型;
cast_()方法把Any對象里面存儲的data數(shù)據(jù)提取出來,基類指針指向 派生類指針 ,使用強轉(zhuǎn)dynamic_cast將基類指針或引用轉(zhuǎn)換為派生類指針或引用,獲取了指向的Derive對象,然后提取出來data_;
Result方法的實現(xiàn)
class Result
{
public:
Result(std::shared_ptr<Task> task, bool isValid = true);
~Result() = default;
//setVal方法,獲取任務(wù)執(zhí)行完的返回值
void setVal(Any any);
//用戶調(diào)用get方法,獲取task的返回值
Any get();
private:
//存儲任務(wù)的返回值
Any any_;
//線程通信信號量
Semaphore sem_;
//指向?qū)?yīng)獲取返回值的任務(wù)對象
std::shared_ptr<Task> task_;
//返回值是否有效
std::atomic_bool isValid_;
};
Result::Result(std::shared_ptr<Task> task, bool isValid)
:isValid_(isValid)
,task_(task)
{
task_->setResult(this);
}
Any Result::get()
{
if (!isValid_)
{
return " ";
}
//task任務(wù)如果沒有執(zhí)行完,這里會阻塞用戶的線程
sem_.wait();
return std::move(any_);
}
void Result::setVal(Any any)
{
//存儲task的返回值
this->any_ = std::move(any);
//已經(jīng)獲取了任務(wù)的返回值,增加信號量資源
sem_.post();
}
- Result 實現(xiàn)接受提交到線程池的task任務(wù)執(zhí)行完成后的返回值類型result;
- 設(shè)置了setVal方法,獲取任務(wù)執(zhí)行完的返回值和用戶調(diào)用get方法,獲取task的返回值,使用了信號量等到setVal設(shè)置成功,才能獲取值,否則會進入阻塞;
回到test.cpp
- 定義了一個ThreadPool對象,默認是固定的,可以修改為cached模式,然后開啟線程(可以使用hardware_concurrency()獲取cpu核心數(shù)量);
- 提交任務(wù)submitTask;
- 出 } 進行析構(gòu)
舉個栗子~
在cached模式,代碼如上test.cpp

可以看到,目前四個線程,六個任務(wù),所以創(chuàng)建了兩個線程;
六個線程獲取任務(wù)成功,然后釋放資源成功;
固定線程:
int main()
{
{
ThreadPool pool;
pool.start(4);
Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 1000000));
Result res2 = pool.submitTask(std::make_shared<MyTask>(1, 1000000));
pool.submitTask(std::make_shared<MyTask>(1, 1000000));
pool.submitTask(std::make_shared<MyTask>(1, 1000000));
pool.submitTask(std::make_shared<MyTask>(1, 1000000));
ULong sum1 = res1.get().cast_<ULong>();
std::cout << sum1 << std::endl;
}
std::cout << "main over!" << std::endl;
getchar();
}

有四個線程,五個任務(wù),11676線程獲取了兩次任務(wù),最后回收線程資源。
好了~基于C++11實現(xiàn)的手寫線程池,就到此結(jié)束了。除此之外,在GitHub上,提供了linux下的使用方法,感興趣的小伙伴可以按照步驟實現(xiàn)一下 ~ 注意死鎖問題!下一節(jié)會剖析基于C++17實現(xiàn)的手寫線程池,代碼會看起來很輕便。
到此這篇關(guān)于基于C++11實現(xiàn)手寫線程池的示例代碼的文章就介紹到這了,更多相關(guān)C++11 手寫線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C語言數(shù)據(jù)結(jié)構(gòu)之棧和隊列的實現(xiàn)及應(yīng)用
棧和隊列是一種數(shù)據(jù)結(jié)構(gòu),只規(guī)定了性質(zhì),并沒有規(guī)定實現(xiàn)方式。本文將以順序結(jié)構(gòu)實現(xiàn)棧,鏈表方式實現(xiàn)隊列,感興趣的小伙伴快跟隨小編一起學(xué)習(xí)一下吧2022-08-08
C語言中枚舉與聯(lián)合體的使用方法(enum union)
枚舉的意思就是列舉,將每一個可能的取值都進行一一列舉,下面這篇文章主要給大家介紹了關(guān)于C語言中枚舉與聯(lián)合體的使用方法,需要的朋友可以參考下2021-09-09
VSCode配置C++環(huán)境的方法步驟(MSVC)
這篇文章主要介紹了VSCode配置C++環(huán)境的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05
C++課程設(shè)計之學(xué)生成績管理系統(tǒng)
這篇文章主要為大家詳細介紹了C++課程設(shè)計之學(xué)生成績管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2020-12-12
關(guān)于Visual Studio無法打開源文件"stdio.h"問題
這篇文章主要介紹了關(guān)于Visual Studio無法打開源文件"stdio.h"問題,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-04-04

