C++線程池實(shí)現(xiàn)
一、線程池簡(jiǎn)介
線程池是一種并發(fā)編程技術(shù),通過(guò)預(yù)先創(chuàng)建一組線程并復(fù)用它們來(lái)執(zhí)行多個(gè)任務(wù),避免了頻繁創(chuàng)建和銷毀線程的開(kāi)銷。它特別適合處理大量短生命周期任務(wù)的場(chǎng)景(如服務(wù)器請(qǐng)求、并行計(jì)算)。
線程池的核心組件
1. 任務(wù)隊(duì)列(Task Queue)
存儲(chǔ)待執(zhí)行的任務(wù)(通常是函數(shù)對(duì)象或可調(diào)用對(duì)象)。
2. 工作線程(Worker Threads)
一組預(yù)先創(chuàng)建的線程,不斷從隊(duì)列中取出任務(wù)并執(zhí)行。
3. 同步機(jī)制互斥鎖(Mutex):保護(hù)任務(wù)隊(duì)列的線程安全訪問(wèn)。
條件變量(Condition Variable):通知線程任務(wù)到達(dá)或線程池終止。
實(shí)現(xiàn)步驟
1. 初始化線程池創(chuàng)建固定數(shù)量的線程,每個(gè)線程循環(huán)等待任務(wù)。
2. 提交任務(wù)將任務(wù)包裝成函數(shù)對(duì)象,加入任務(wù)隊(duì)列。
3. 任務(wù)執(zhí)行工作線程從隊(duì)列中取出任務(wù)并執(zhí)行。
4. 終止線程池發(fā)送停止信號(hào),等待所有線程完成當(dāng)前任務(wù)后退出。
二、C++11實(shí)現(xiàn)線程池
源碼
#include <vector> #include <queue> #include <future> #include <thread> #include <mutex> #include <condition_variable> #include <functional> #include <stdexcept> class ThreadPool { public: //構(gòu)造函數(shù):根據(jù)輸入的線程數(shù)(默認(rèn)硬件并發(fā)數(shù))創(chuàng)建工作線程。 //每個(gè)工作線程執(zhí)行一個(gè)循環(huán),不斷從任務(wù)隊(duì)列中取出并執(zhí)行任務(wù)。 //explicit關(guān)鍵字防止隱式類型轉(zhuǎn)換 explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop(false) { if (threads == 0) { threads = 1; } for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); //等待條件:線程通過(guò)條件變量等待任務(wù)到來(lái)或停止信號(hào)。(CPU使用率:休眠時(shí)接近0%,僅在任務(wù)到來(lái)時(shí)喚醒) //lambda表達(dá)式作為謂詞,當(dāng)條件(停止信號(hào)為true 或 任務(wù)隊(duì)列非空)為真時(shí),才會(huì)解除阻塞。 this->condition.wait(lock, [this] { return (this->stop || !this->tasks.empty()); }); /* 傳統(tǒng)忙等待:while (!(stop || !tasks.empty())) {} // 空循環(huán)消耗CPU */ if (this->stop && this->tasks.empty()) { //如果線程池需要終止且任務(wù)隊(duì)列為空則直接return return; } //任務(wù)提?。簭年?duì)列中取出任務(wù)并執(zhí)行,使用std::move避免拷貝開(kāi)銷。 task = std::move(this->tasks.front()); this->tasks.pop(); } //執(zhí)行任務(wù) task(); } }); } } //任務(wù)提交(enqueue方法) template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; //任務(wù)封裝:使用std::packaged_task包裝用戶任務(wù),支持異步返回結(jié)果。 //智能指針管理:shared_ptr確保任務(wù)對(duì)象的生命周期延續(xù)至執(zhí)行完畢。 //完美轉(zhuǎn)發(fā):通過(guò)std::forward保持參數(shù)的左值/右值特性。 auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) { throw std::runtime_error("enqueue on stopped ThreadPool"); } tasks.emplace([task]() { (*task)(); }); /* push傳入的對(duì)象需要事先構(gòu)造好,再?gòu)?fù)制過(guò)去插入容器中; 而emplace則可以自己使用構(gòu)造函數(shù)所需的參數(shù)構(gòu)造出對(duì)象,并直接插入容器中。 emplace相比于push省去了復(fù)制的步驟,則使用emplace會(huì)更加節(jié)省內(nèi)存。*/ } condition.notify_one(); return res; } ~ThreadPool() { //設(shè)置stop標(biāo)志,喚醒所有線程,等待任務(wù)隊(duì)列清空。 { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { worker.join(); } } private: std::vector<std::thread> workers; //存儲(chǔ)工作線程對(duì)象 std::queue<std::function<void()>> tasks; //任務(wù)隊(duì)列,存儲(chǔ)待執(zhí)行的任務(wù) std::mutex queue_mutex; //保護(hù)任務(wù)隊(duì)列的互斥鎖 std::condition_variable condition; //線程間同步的條件變量 bool stop; //線程池是否停止標(biāo)志 };
三、線程池源碼解析
1. 成員變量
std::vector<std::thread> workers; // 工作線程容器 std::queue<std::function<void()>> tasks; // 任務(wù)隊(duì)列 std::mutex queue_mutex; // 隊(duì)列互斥鎖 std::condition_variable condition; // 條件變量 bool stop; // 停機(jī)標(biāo)志
設(shè)計(jì)要點(diǎn):
采用生產(chǎn)者-消費(fèi)者模式,任務(wù)隊(duì)列作為共享資源
組合使用
mutex
+condition_variable
實(shí)現(xiàn)線程同步vector
存儲(chǔ)線程對(duì)象便于統(tǒng)一管理生命周期
2. 構(gòu)造函數(shù)
2.1 線程初始化
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop(false) { if (threads == 0) { threads = 1; } for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { /* 工作線程邏輯 */ }); } }
設(shè)計(jì)要點(diǎn):
explicit
防止隱式類型轉(zhuǎn)換(如ThreadPool pool = 4;
)默認(rèn)使用硬件并發(fā)線程數(shù)(通過(guò)
hardware_concurrency()
)最少創(chuàng)建1個(gè)線程避免空池
使用
emplace_back
直接構(gòu)造線程對(duì)象
2.2 工作線程邏輯
for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) { return; } task = std::move(tasks.front()); tasks.pop(); } task(); }
核心機(jī)制:
unique_lock
配合條件變量實(shí)現(xiàn)自動(dòng)鎖管理雙重狀態(tài)檢查(停機(jī)標(biāo)志+隊(duì)列非空)
任務(wù)提取使用移動(dòng)語(yǔ)義避免拷貝
任務(wù)執(zhí)行在鎖作用域外進(jìn)行
3. 任務(wù)提交(enqueue方法)
3.1 方法簽名
template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
類型推導(dǎo):
- 使用尾置返回類型聲明
std::result_of
推導(dǎo)可調(diào)用對(duì)象的返回類型- 完美轉(zhuǎn)發(fā)參數(shù)(
F&&
+Args&&...
)
3.2 任務(wù)封裝
auto task = std::make_shared<std::packaged_task<return_type()>> (std::bind(std::forward<F>(f), std::forward<Args>(args)...));
封裝策略:
packaged_task
包裝任務(wù)用于異步獲取結(jié)果shared_ptr
管理任務(wù)對(duì)象生命周期std::bind
綁定參數(shù)(注意C++11的參數(shù)轉(zhuǎn)發(fā)限制)
3.3 任務(wù)入隊(duì)
tasks.emplace([task]() { (*task)(); });
優(yōu)化點(diǎn):
- 使用
emplace
直接構(gòu)造隊(duì)列元素 Lambda
捕獲shared_ptr
保持任務(wù)有效性- 顯式解引用執(zhí)行
packaged_task
4. 析構(gòu)函數(shù)
4.1 停機(jī)控制
~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (auto& worker : workers) { worker.join(); } }
停機(jī)協(xié)議:
- 設(shè)置停機(jī)標(biāo)志原子操作
- 廣播喚醒所有等待線程
- 等待所有工作線程退出
5. 關(guān)鍵技術(shù)點(diǎn)解析
5.1 完美轉(zhuǎn)發(fā)實(shí)現(xiàn)
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- 保持參數(shù)的左右值特性
- 支持移動(dòng)語(yǔ)義參數(shù)的傳遞
- C++11的限制:無(wú)法完美轉(zhuǎn)發(fā)所有參數(shù)類型
5.2 異常傳播機(jī)制
- 任務(wù)異常通過(guò)
future
對(duì)象傳播 packaged_task
自動(dòng)捕獲異常- 用戶通過(guò)
future.get()
獲取異常
5.3 內(nèi)存管理模型
[任務(wù)提交者] | v [packaged_task] <---- shared_ptr ---- [任務(wù)隊(duì)列] | v [future]
- 三重生命周期保障:
提交者持有
future
隊(duì)列持有任務(wù)包裝器
工作線程執(zhí)行任務(wù)
四、 性能特征分析
1. 時(shí)間復(fù)雜度
操作 | 時(shí)間復(fù)雜度 |
---|---|
任務(wù)提交(enqueue) | O(1)(加鎖開(kāi)銷) |
任務(wù)提取 | O(1) |
線程喚醒 | 取決于系統(tǒng)調(diào)度 |
2. 空間復(fù)雜度
組件 | 空間占用 |
---|---|
線程棧 | 每線程MB級(jí) |
任務(wù)隊(duì)列 | 與任務(wù)數(shù)成正比 |
同步原語(yǔ) | 固定大小 |
五、 擴(kuò)展優(yōu)化方向
1. 任務(wù)竊取(Work Stealing)
- 實(shí)現(xiàn)多個(gè)任務(wù)隊(duì)列
- 空閑線程從其他隊(duì)列竊取任務(wù)
2. 動(dòng)態(tài)線程池
void adjust_workers(size_t new_size) { if (new_size > workers.size()) { // 擴(kuò)容邏輯 } else { // 縮容邏輯 } }
3. 優(yōu)先級(jí)隊(duì)列
using Task = std::pair<int, std::function<void()>>; // 優(yōu)先級(jí)+任務(wù) std::priority_queue<Task> tasks;
4. 無(wú)鎖隊(duì)列
moodycamel::ConcurrentQueue<std::function<void()>> tasks;
六、 典型問(wèn)題排查指南
現(xiàn)象 | 可能原因 | 解決方案 |
---|---|---|
任務(wù)未執(zhí)行 | 線程池提前析構(gòu) | 延長(zhǎng)線程池生命周期 |
future.get() 永久阻塞 | 任務(wù)未提交/異常未處理 | 檢查任務(wù)提交路徑 |
CPU利用率100% | 忙等待或鎖競(jìng)爭(zhēng) | 優(yōu)化任務(wù)粒度/使用無(wú)鎖結(jié)構(gòu) |
內(nèi)存持續(xù)增長(zhǎng) | 任務(wù)對(duì)象未正確釋放 | 檢查智能指針使用 |
該實(shí)現(xiàn)完整展現(xiàn)了現(xiàn)代C++線程池的核心設(shè)計(jì)范式,開(kāi)發(fā)者可根據(jù)具體需求在此基礎(chǔ)進(jìn)行功能擴(kuò)展和性能優(yōu)化。理解這個(gè)代碼結(jié)構(gòu)是掌握更高級(jí)并發(fā)模式的基礎(chǔ)。
七、 測(cè)試用例
使用實(shí)例(C++11兼容):
#include <iostream> int main() { ThreadPool pool(4); // 提交普通函數(shù) auto future1 = pool.enqueue([](int a, int b) { return a + b; }, 2, 3); // 提交成員函數(shù) struct Calculator { int multiply(int a, int b) { return a * b; } } calc; auto future2 = pool.enqueue(std::bind(&Calculator::multiply, &calc, std::placeholders::_1, std::placeholders::_2), 4, 5); // 異常處理示例 auto future3 = pool.enqueue([]() -> int { throw std::runtime_error("example error"); return 1; }); std::cout << "2+3=" << future1.get() << std::endl; std::cout << "4*5=" << future2.get() << std::endl; try { future3.get(); } catch(const std::exception& e) { std::cout << "Caught exception: " << e.what() << std::endl; } return 0; }
到此這篇關(guān)于C++線程池實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)C++線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 基于C++17實(shí)現(xiàn)的手寫(xiě)線程池
- 基于C++11實(shí)現(xiàn)手寫(xiě)線程池的示例代碼
- C++ 學(xué)習(xí)筆記實(shí)戰(zhàn)寫(xiě)一個(gè)簡(jiǎn)單的線程池示例
- C++單例模式實(shí)現(xiàn)線程池的示例代碼
- C++實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池的示例代碼
- C++線程池實(shí)現(xiàn)代碼
- C/C++ 原生API實(shí)現(xiàn)線程池的方法
- C++11 簡(jiǎn)單實(shí)現(xiàn)線程池的方法
- C++實(shí)現(xiàn)線程池的簡(jiǎn)單方法示例
- 深入解析C++編程中線程池的使用
- c++實(shí)現(xiàn)簡(jiǎn)單的線程池
- c++線程池實(shí)現(xiàn)方法
相關(guān)文章
c語(yǔ)言string.h頭文件中所有函數(shù)示例詳解
這篇文章詳細(xì)介紹了C語(yǔ)言標(biāo)準(zhǔn)庫(kù)中的字符串和內(nèi)存操作函數(shù),以str開(kāi)頭的字符串處理函數(shù)和以mem開(kāi)頭的內(nèi)存處理函數(shù),每種函數(shù)都有詳細(xì)的原型、功能描述和示例代碼,需要的朋友可以參考下2024-11-11淺析C++中strlen函數(shù)的使用與模擬實(shí)現(xiàn)strlen的方法
這篇文章主要介紹了strlen函數(shù)的使用與模擬實(shí)現(xiàn)strlen的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03C++實(shí)現(xiàn)經(jīng)典24點(diǎn)紙牌益智游戲
這篇文章主要介紹了C++實(shí)現(xiàn)經(jīng)典24點(diǎn)紙牌益智游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-03-03C++實(shí)現(xiàn)打印虛函數(shù)表的地址
對(duì)于存在虛函數(shù)的類,如何打印虛函數(shù)表的地址,并利用這個(gè)虛函數(shù)表的地址來(lái)執(zhí)行該類中的虛函數(shù)呢,下面小編就來(lái)和大家一起簡(jiǎn)單聊聊吧2023-07-07在C語(yǔ)言里單引號(hào)和雙引號(hào)的區(qū)別
這篇文章主要介紹了在C語(yǔ)言里單引號(hào)和雙引號(hào)的區(qū)別,本文通過(guò)代碼的實(shí)例和注釋的詳細(xì)的說(shuō)明了單引號(hào)和雙引號(hào)的概念與區(qū)別,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-07-07使用C++ Matlab中的lp2lp函數(shù)教程詳解
本文介紹如何使用C++編寫(xiě)數(shù)字濾波器設(shè)計(jì)算法,實(shí)現(xiàn)Matlab中的lp2lp函數(shù),將低通濾波器轉(zhuǎn)換為參數(shù)化的低通濾波器,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧2023-04-04