C++線程池實現(xiàn)
一、線程池簡介
線程池是一種并發(fā)編程技術(shù),通過預先創(chuàng)建一組線程并復用它們來執(zhí)行多個任務,避免了頻繁創(chuàng)建和銷毀線程的開銷。它特別適合處理大量短生命周期任務的場景(如服務器請求、并行計算)。
線程池的核心組件
1. 任務隊列(Task Queue)
存儲待執(zhí)行的任務(通常是函數(shù)對象或可調(diào)用對象)。
2. 工作線程(Worker Threads)
一組預先創(chuàng)建的線程,不斷從隊列中取出任務并執(zhí)行。
3. 同步機制互斥鎖(Mutex):保護任務隊列的線程安全訪問。
條件變量(Condition Variable):通知線程任務到達或線程池終止。
實現(xiàn)步驟
1. 初始化線程池創(chuàng)建固定數(shù)量的線程,每個線程循環(huán)等待任務。
2. 提交任務將任務包裝成函數(shù)對象,加入任務隊列。
3. 任務執(zhí)行工作線程從隊列中取出任務并執(zhí)行。
4. 終止線程池發(fā)送停止信號,等待所有線程完成當前任務后退出。
二、C++11實現(xiàn)線程池
源碼
#include <vector> #include <queue> #include <future> #include <thread> #include <mutex> #include <condition_variable> #include <functional> #include <stdexcept> class ThreadPool { public: //構(gòu)造函數(shù):根據(jù)輸入的線程數(shù)(默認硬件并發(fā)數(shù))創(chuàng)建工作線程。 //每個工作線程執(zhí)行一個循環(huán),不斷從任務隊列中取出并執(zhí)行任務。 //explicit關(guān)鍵字防止隱式類型轉(zhuǎn)換 explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop(false) { if (threads == 0) { threads = 1; } for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); //等待條件:線程通過條件變量等待任務到來或停止信號。(CPU使用率:休眠時接近0%,僅在任務到來時喚醒) //lambda表達式作為謂詞,當條件(停止信號為true 或 任務隊列非空)為真時,才會解除阻塞。 this->condition.wait(lock, [this] { return (this->stop || !this->tasks.empty()); }); /* 傳統(tǒng)忙等待:while (!(stop || !tasks.empty())) {} // 空循環(huán)消耗CPU */ if (this->stop && this->tasks.empty()) { //如果線程池需要終止且任務隊列為空則直接return return; } //任務提?。簭年犃兄腥〕鋈蝿詹?zhí)行,使用std::move避免拷貝開銷。 task = std::move(this->tasks.front()); this->tasks.pop(); } //執(zhí)行任務 task(); } }); } } //任務提交(enqueue方法) template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; //任務封裝:使用std::packaged_task包裝用戶任務,支持異步返回結(jié)果。 //智能指針管理:shared_ptr確保任務對象的生命周期延續(xù)至執(zhí)行完畢。 //完美轉(zhuǎn)發(fā):通過std::forward保持參數(shù)的左值/右值特性。 auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) { throw std::runtime_error("enqueue on stopped ThreadPool"); } tasks.emplace([task]() { (*task)(); }); /* push傳入的對象需要事先構(gòu)造好,再復制過去插入容器中; 而emplace則可以自己使用構(gòu)造函數(shù)所需的參數(shù)構(gòu)造出對象,并直接插入容器中。 emplace相比于push省去了復制的步驟,則使用emplace會更加節(jié)省內(nèi)存。*/ } condition.notify_one(); return res; } ~ThreadPool() { //設置stop標志,喚醒所有線程,等待任務隊列清空。 { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { worker.join(); } } private: std::vector<std::thread> workers; //存儲工作線程對象 std::queue<std::function<void()>> tasks; //任務隊列,存儲待執(zhí)行的任務 std::mutex queue_mutex; //保護任務隊列的互斥鎖 std::condition_variable condition; //線程間同步的條件變量 bool stop; //線程池是否停止標志 };
三、線程池源碼解析
1. 成員變量
std::vector<std::thread> workers; // 工作線程容器 std::queue<std::function<void()>> tasks; // 任務隊列 std::mutex queue_mutex; // 隊列互斥鎖 std::condition_variable condition; // 條件變量 bool stop; // 停機標志
設計要點:
采用生產(chǎn)者-消費者模式,任務隊列作為共享資源
組合使用
mutex
+condition_variable
實現(xiàn)線程同步vector
存儲線程對象便于統(tǒng)一管理生命周期
2. 構(gòu)造函數(shù)
2.1 線程初始化
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop(false) { if (threads == 0) { threads = 1; } for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { /* 工作線程邏輯 */ }); } }
設計要點:
explicit
防止隱式類型轉(zhuǎn)換(如ThreadPool pool = 4;
)默認使用硬件并發(fā)線程數(shù)(通過
hardware_concurrency()
)最少創(chuàng)建1個線程避免空池
使用
emplace_back
直接構(gòu)造線程對象
2.2 工作線程邏輯
for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) { return; } task = std::move(tasks.front()); tasks.pop(); } task(); }
核心機制:
unique_lock
配合條件變量實現(xiàn)自動鎖管理雙重狀態(tài)檢查(停機標志+隊列非空)
任務提取使用移動語義避免拷貝
任務執(zhí)行在鎖作用域外進行
3. 任務提交(enqueue方法)
3.1 方法簽名
template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
類型推導:
- 使用尾置返回類型聲明
std::result_of
推導可調(diào)用對象的返回類型- 完美轉(zhuǎn)發(fā)參數(shù)(
F&&
+Args&&...
)
3.2 任務封裝
auto task = std::make_shared<std::packaged_task<return_type()>> (std::bind(std::forward<F>(f), std::forward<Args>(args)...));
封裝策略:
packaged_task
包裝任務用于異步獲取結(jié)果shared_ptr
管理任務對象生命周期std::bind
綁定參數(shù)(注意C++11的參數(shù)轉(zhuǎn)發(fā)限制)
3.3 任務入隊
tasks.emplace([task]() { (*task)(); });
優(yōu)化點:
- 使用
emplace
直接構(gòu)造隊列元素 Lambda
捕獲shared_ptr
保持任務有效性- 顯式解引用執(zhí)行
packaged_task
4. 析構(gòu)函數(shù)
4.1 停機控制
~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (auto& worker : workers) { worker.join(); } }
停機協(xié)議:
- 設置停機標志原子操作
- 廣播喚醒所有等待線程
- 等待所有工作線程退出
5. 關(guān)鍵技術(shù)點解析
5.1 完美轉(zhuǎn)發(fā)實現(xiàn)
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- 保持參數(shù)的左右值特性
- 支持移動語義參數(shù)的傳遞
- C++11的限制:無法完美轉(zhuǎn)發(fā)所有參數(shù)類型
5.2 異常傳播機制
- 任務異常通過
future
對象傳播 packaged_task
自動捕獲異常- 用戶通過
future.get()
獲取異常
5.3 內(nèi)存管理模型
[任務提交者] | v [packaged_task] <---- shared_ptr ---- [任務隊列] | v [future]
- 三重生命周期保障:
提交者持有
future
隊列持有任務包裝器
工作線程執(zhí)行任務
四、 性能特征分析
1. 時間復雜度
操作 | 時間復雜度 |
---|---|
任務提交(enqueue) | O(1)(加鎖開銷) |
任務提取 | O(1) |
線程喚醒 | 取決于系統(tǒng)調(diào)度 |
2. 空間復雜度
組件 | 空間占用 |
---|---|
線程棧 | 每線程MB級 |
任務隊列 | 與任務數(shù)成正比 |
同步原語 | 固定大小 |
五、 擴展優(yōu)化方向
1. 任務竊?。╓ork Stealing)
- 實現(xiàn)多個任務隊列
- 空閑線程從其他隊列竊取任務
2. 動態(tài)線程池
void adjust_workers(size_t new_size) { if (new_size > workers.size()) { // 擴容邏輯 } else { // 縮容邏輯 } }
3. 優(yōu)先級隊列
using Task = std::pair<int, std::function<void()>>; // 優(yōu)先級+任務 std::priority_queue<Task> tasks;
4. 無鎖隊列
moodycamel::ConcurrentQueue<std::function<void()>> tasks;
六、 典型問題排查指南
現(xiàn)象 | 可能原因 | 解決方案 |
---|---|---|
任務未執(zhí)行 | 線程池提前析構(gòu) | 延長線程池生命周期 |
future.get() 永久阻塞 | 任務未提交/異常未處理 | 檢查任務提交路徑 |
CPU利用率100% | 忙等待或鎖競爭 | 優(yōu)化任務粒度/使用無鎖結(jié)構(gòu) |
內(nèi)存持續(xù)增長 | 任務對象未正確釋放 | 檢查智能指針使用 |
該實現(xiàn)完整展現(xiàn)了現(xiàn)代C++線程池的核心設計范式,開發(fā)者可根據(jù)具體需求在此基礎進行功能擴展和性能優(yōu)化。理解這個代碼結(jié)構(gòu)是掌握更高級并發(fā)模式的基礎。
七、 測試用例
使用實例(C++11兼容):
#include <iostream> int main() { ThreadPool pool(4); // 提交普通函數(shù) auto future1 = pool.enqueue([](int a, int b) { return a + b; }, 2, 3); // 提交成員函數(shù) struct Calculator { int multiply(int a, int b) { return a * b; } } calc; auto future2 = pool.enqueue(std::bind(&Calculator::multiply, &calc, std::placeholders::_1, std::placeholders::_2), 4, 5); // 異常處理示例 auto future3 = pool.enqueue([]() -> int { throw std::runtime_error("example error"); return 1; }); std::cout << "2+3=" << future1.get() << std::endl; std::cout << "4*5=" << future2.get() << std::endl; try { future3.get(); } catch(const std::exception& e) { std::cout << "Caught exception: " << e.what() << std::endl; } return 0; }
到此這篇關(guān)于C++線程池實現(xiàn)的文章就介紹到這了,更多相關(guān)C++線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺析C++中strlen函數(shù)的使用與模擬實現(xiàn)strlen的方法
這篇文章主要介紹了strlen函數(shù)的使用與模擬實現(xiàn)strlen的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03使用C++ Matlab中的lp2lp函數(shù)教程詳解
本文介紹如何使用C++編寫數(shù)字濾波器設計算法,實現(xiàn)Matlab中的lp2lp函數(shù),將低通濾波器轉(zhuǎn)換為參數(shù)化的低通濾波器,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2023-04-04