使用C++實(shí)現(xiàn)一個(gè)高效的線程池
在多線程編程中,線程池是一種常見(jiàn)且高效的設(shè)計(jì)模式。它通過(guò)預(yù)先創(chuàng)建一定數(shù)量的線程來(lái)處理任務(wù),從而避免頻繁創(chuàng)建和銷毀線程帶來(lái)的性能開(kāi)銷。本文將詳細(xì)介紹如何使用C++實(shí)現(xiàn)一個(gè)線程池,并解析相關(guān)代碼實(shí)現(xiàn)細(xì)節(jié)。
線程池簡(jiǎn)介
線程池(Thread Pool)是一種管理和復(fù)用線程的機(jī)制。它通過(guò)維護(hù)一個(gè)線程集合,當(dāng)有任務(wù)需要執(zhí)行時(shí),從線程池中分配一個(gè)空閑線程來(lái)處理任務(wù),任務(wù)完成后線程歸還到池中。這樣可以顯著減少線程創(chuàng)建和銷毀的開(kāi)銷,提高系統(tǒng)的整體性能和響應(yīng)速度。
設(shè)計(jì)思路
本文實(shí)現(xiàn)的線程池主要包含兩個(gè)核心類:
- Thread類:封裝了單個(gè)線程的創(chuàng)建、啟動(dòng)和管理。
- ThreadPool類:管理多個(gè)線程,維護(hù)任務(wù)隊(duì)列,并調(diào)度任務(wù)給線程執(zhí)行。
線程池支持兩種模式:
MODE_CACHED:緩存模式,根據(jù)任務(wù)量動(dòng)態(tài)調(diào)整線程數(shù)量,適用于任務(wù)量不固定的場(chǎng)景。
MODE_FIXED:固定模式,線程數(shù)量固定,適用于任務(wù)量穩(wěn)定的場(chǎng)景。
Thread類實(shí)現(xiàn)
Thread類負(fù)責(zé)封裝單個(gè)線程的創(chuàng)建和管理。以下是Thread.h和Thread.cpp的實(shí)現(xiàn)。
Thread.h
#include <functional> #include <atomic> #include <cstdint> #include <thread> class Thread { public: using ThreadFunc = std::function<void(std::uint32_t)>; public: explicit Thread(ThreadFunc func); void join(); ~Thread(); void start(); [[nodiscard]] std::uint32_t getID() const; [[nodiscard]] static std::uint32_t getNumCreated(); Thread(const Thread &) = delete; Thread &operator=(const Thread &) = delete; private: ThreadFunc m_func; uint32_t m_threadID; std::thread m_thread; static std::atomic<uint32_t> m_numCreateThread; };
Thread.cpp
#include "Thread.h" std::atomic<uint32_t> Thread::m_numCreateThread(0); Thread::Thread(Thread::ThreadFunc func) : m_func(std::move(func)), m_threadID(m_numCreateThread.load()) { m_numCreateThread++; } void Thread::start() { m_thread = std::thread([this]() { m_func(m_threadID); }); m_thread.detach(); } uint32_t Thread::getID() const { return m_threadID; } uint32_t Thread::getNumCreated() { return Thread::m_numCreateThread.load(); } Thread::~Thread() { join(); } void Thread::join() { if (m_thread.joinable()) { m_thread.join(); } }
解析
成員變量:
- m_func:線程執(zhí)行的函數(shù)。
- m_threadID:線程的唯一標(biāo)識(shí)。
- m_thread:std::thread對(duì)象。
- m_numCreateThread:靜態(tài)原子變量,用于記錄已創(chuàng)建的線程數(shù)量。
構(gòu)造函數(shù):
接受一個(gè)函數(shù)作為參數(shù),并分配一個(gè)唯一的線程ID。
start方法:
啟動(dòng)線程,執(zhí)行傳入的函數(shù),并將線程設(shè)為分離狀態(tài),以便在線程結(jié)束時(shí)自動(dòng)回收資源。
join方法和析構(gòu)函數(shù):
如果線程可連接,則執(zhí)行join操作,確保線程資源的正確回收。
ThreadPool類實(shí)現(xiàn)
ThreadPool類負(fù)責(zé)管理多個(gè)線程,維護(hù)任務(wù)隊(duì)列,并調(diào)度任務(wù)給線程執(zhí)行。以下是ThreadPool.h和ThreadPool.cpp的實(shí)現(xiàn)。
ThreadPool.h
#include <mutex> #include <unordered_map> #include <memory> #include <functional> #include <queue> #include <iostream> #include <condition_variable> #include <future> #include <cstdint> #include "Thread.h" enum class THREAD_MODE { MODE_CACHED, MODE_FIXED, }; class ThreadPool { public: explicit ThreadPool(THREAD_MODE mode = THREAD_MODE::MODE_CACHED, std::uint32_t maxThreadSize = 1024, std::uint32_t initThreadSize = 4, std::uint32_t maxTaskSize = 1024); ~ThreadPool(); void setThreadMaxSize(uint32_t maxSize); void setMode(THREAD_MODE mode); void setTaskMaxSize(uint32_t maxSize); void start(uint32_t taskSize = std::thread::hardware_concurrency()); ThreadPool(const ThreadPool &) = delete; ThreadPool &operator=(const ThreadPool &) = delete; template<typename Func, typename ...Args> auto submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type>; protected: [[nodiscard]] bool checkState() const; void ThreadFun(uint32_t threadID); private: using Task = std::function<void()>; std::unordered_map<uint32_t, std::unique_ptr<Thread>> m_threads; uint32_t m_initThreadSize; // 初始線程數(shù)量 std::atomic<std::uint32_t> m_spareThreadSize; // 空閑線程數(shù)量 uint32_t m_maxThreadSize; // 最大線程數(shù)量 std::atomic<bool> m_isRunning; // 線程池運(yùn)行標(biāo)志 THREAD_MODE m_mode; // 線程池運(yùn)行模式 std::deque<Task> m_tasks; std::atomic<uint32_t> m_taskSize; uint32_t m_maxTaskSize; uint32_t m_thread_maxSpareTime; mutable std::mutex m_mutex; // 線程池互斥量 std::condition_variable m_notEmpty; std::condition_variable m_notFull; std::condition_variable m_isExit; };
ThreadPool.cpp
#include "ThreadPool.hpp" #include <thread> ThreadPool::ThreadPool(THREAD_MODE mode, uint32_t maxThreadSize, uint32_t initThreadSize, uint32_t maxTaskSize) : m_initThreadSize(initThreadSize), m_spareThreadSize(0), m_maxThreadSize(maxThreadSize), m_isRunning(false), m_mode(mode), m_taskSize(0), m_maxTaskSize(maxTaskSize), m_thread_maxSpareTime(60) { } bool ThreadPool::checkState() const { return m_isRunning; } void ThreadPool::setThreadMaxSize(uint32_t maxSize) { if (checkState()) std::cerr << "threadPool is running, cannot change!" << std::endl; else this->m_maxThreadSize = maxSize; } void ThreadPool::setMode(THREAD_MODE mode) { if (checkState()) std::cerr << "threadPool is running, cannot change!" << std::endl; else this->m_mode = mode; } void ThreadPool::setTaskMaxSize(uint32_t maxSize) { if (checkState()) std::cerr << "threadPool is running, cannot change!" << std::endl; else this->m_maxTaskSize = maxSize; } void ThreadPool::ThreadFun(uint32_t threadID) { auto last_time = std::chrono::high_resolution_clock::now(); for (;;) { Task task; { std::unique_lock<std::mutex> lock(m_mutex); std::cout << "threadID: " << threadID << " trying to get a task" << std::endl; while (m_tasks.empty() && m_isRunning) { if (m_mode == THREAD_MODE::MODE_CACHED && m_threads.size() > m_initThreadSize) { if (m_notEmpty.wait_for(lock, std::chrono::seconds(3)) == std::cv_status::timeout) { auto now_time = std::chrono::high_resolution_clock::now(); auto dur_time = std::chrono::duration_cast<std::chrono::seconds>(now_time - last_time); if (dur_time.count() > m_thread_maxSpareTime && m_threads.size() > m_initThreadSize) { m_threads.erase(threadID); m_spareThreadSize--; std::cout << "threadID: " << threadID << " exiting due to inactivity!" << std::endl; return; } } } else { m_notEmpty.wait(lock); } } if (!m_isRunning && m_tasks.empty()) { m_threads.erase(threadID); std::cout << "threadID: " << threadID << " exiting!" << std::endl; m_isExit.notify_all(); return; } if (!m_tasks.empty()) { m_spareThreadSize--; task = std::move(m_tasks.front()); m_tasks.pop_front(); std::cout << "threadID: " << threadID << " retrieved a task!" << std::endl; if (!m_tasks.empty()) m_notEmpty.notify_all(); m_notFull.notify_all(); } } if (task) { try { task(); } catch (const std::exception &e) { std::cerr << "Exception in task: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown exception in task." << std::endl; } std::cout << "threadID: " << threadID << " completed a task." << std::endl; m_spareThreadSize++; last_time = std::chrono::high_resolution_clock::now(); } } } void ThreadPool::start(std::uint32_t taskSize) { m_isRunning = true; for (std::uint32_t i = 0; i < taskSize; ++i) { auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1)); auto threadID = ptr->getID(); m_threads.emplace(threadID, std::move(ptr)); } for (auto &it: m_threads) { it.second->start(); m_spareThreadSize++; } } ThreadPool::~ThreadPool() { m_isRunning = false; std::unique_lock<std::mutex> lock(m_mutex); m_notEmpty.notify_all(); m_notFull.notify_all(); m_isExit.wait(lock, [&]() -> bool { return m_threads.empty(); }); }
submitTask模板方法實(shí)現(xiàn)
template<typename Func, typename ...Args> auto ThreadPool::submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type> { using Rtype = typename std::invoke_result<Func, Args...>::type; auto task = std::make_shared<std::packaged_task<Rtype()>>( std::bind(std::forward<Func>(func), std::forward<Args>(args)...)); std::future<Rtype> result = task->get_future(); std::unique_lock lock(m_mutex); if (!m_notFull.wait_for(lock, std::chrono::seconds(3), [&]() -> bool { return m_tasks.size() < m_maxTaskSize; })) { std::cerr << "Task queue is full, submit task failed!" << std::endl; throw std::runtime_error("Task queue is full"); } m_tasks.emplace_back([task] { (*task)(); }); m_notEmpty.notify_all(); if (m_mode == THREAD_MODE::MODE_CACHED && m_tasks.size() > m_spareThreadSize) { if (m_threads.size() >= m_maxThreadSize) { std::cerr << "Thread pool has reached max size, cannot create new thread!" << std::endl; } else { std::cout << "Creating a new thread!" << std::endl; auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1)); u_int64_t threadID = ptr->getID(); m_threads.emplace(threadID, std::move(ptr)); m_threads[threadID]->start(); ++m_spareThreadSize; } } return result; }
解析
成員變量:
- m_threads:存儲(chǔ)所有線程的集合。
- m_tasks:任務(wù)隊(duì)列,存儲(chǔ)待執(zhí)行的任務(wù)。
- m_mutex、m_notEmpty、m_notFull、m_isExit:用于線程同步和任務(wù)調(diào)度的互斥量和條件變量。
- 其他變量用于控制線程池的狀態(tài),如最大線程數(shù)、初始線程數(shù)、任務(wù)隊(duì)列最大長(zhǎng)度等。
構(gòu)造函數(shù):
初始化線程池的各項(xiàng)參數(shù),如模式、最大線程數(shù)、初始線程數(shù)、最大任務(wù)數(shù)等。
start方法:
啟動(dòng)線程池,創(chuàng)建初始數(shù)量的線程,并將其啟動(dòng)。
submitTask模板方法:
- 提交任務(wù)到線程池,支持任意可調(diào)用對(duì)象。
- 使用std::packaged_task和std::future實(shí)現(xiàn)任務(wù)的異步執(zhí)行和結(jié)果獲取。
- 如果任務(wù)隊(duì)列已滿,則在指定時(shí)間內(nèi)等待,若仍滿則拋出異常。
- 在緩存模式下,根據(jù)任務(wù)量動(dòng)態(tài)創(chuàng)建新線程。
ThreadFun方法:
- 線程的工作函數(shù),從任務(wù)隊(duì)列中獲取任務(wù)并執(zhí)行。
- 在緩存模式下,線程在空閑一定時(shí)間后會(huì)自動(dòng)退出,降低資源占用。
析構(gòu)函數(shù):
關(guān)閉線程池,通知所有線程退出,并等待所有線程結(jié)束。
線程池的使用
以下是一個(gè)簡(jiǎn)單的示例,展示如何使用上述實(shí)現(xiàn)的線程池。
#include "ThreadPool.h" #include <iostream> #include <chrono> // 示例任務(wù)函數(shù) void exampleTask(int n) { std::cout << "Task " << n << " is starting." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "Task " << n << " is completed." << std::endl; } int main() { // 創(chuàng)建線程池,使用緩存模式,最大線程數(shù)為8,初始線程數(shù)為4,最大任務(wù)數(shù)為16 ThreadPool pool(THREAD_MODE::MODE_CACHED, 8, 4, 16); pool.start(); // 提交多個(gè)任務(wù) std::vector<std::future<void>> futures; for (int i = 0; i < 10; ++i) { futures.emplace_back(pool.submitTask(exampleTask, i)); } // 等待所有任務(wù)完成 for (auto &fut : futures) { fut.get(); } std::cout << "All tasks have been completed." << std::endl; return 0; }
運(yùn)行結(jié)果
threadID: 0 trying to get a task
threadID: 1 trying to get a task
threadID: 2 trying to get a task
threadID: 3 trying to get a task
Task 0 is starting.
Task 1 is starting.
Task 2 is starting.
Task 3 is starting.
threadID: 0 completed a task.
threadID: 0 trying to get a task
Task 4 is starting.
threadID: 1 completed a task.
threadID: 1 trying to get a task
Task 5 is starting.
...
All tasks have been completed.
以上就是使用C++實(shí)現(xiàn)一個(gè)高效的線程池的詳細(xì)內(nèi)容,更多關(guān)于C++線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
一篇文章帶你了解C++(STL基礎(chǔ)、Vector)
這篇文章主要為大家詳細(xì)介紹了C++ STL基礎(chǔ),vector向量容器使用方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能給你帶來(lái)幫助2021-08-08使用C++的inipp庫(kù)處理配置文件.ini的示例詳解
一個(gè)ini文件由多個(gè)節(jié)section組成,每個(gè)節(jié)由多個(gè)鍵值對(duì)組成,本文給大家介紹了使用第三方庫(kù)inipp來(lái)操作ini文件,文中通過(guò)代碼示例講解的非常詳細(xì),需要的朋友可以參考下2024-01-01C++編程中用put輸出單個(gè)字符和cin輸入流的用法
這篇文章主要介紹了C++編程中用put輸出單個(gè)字符和cin輸入流的用法,是C++入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-09-09C++有限狀態(tài)機(jī)實(shí)現(xiàn)計(jì)算器小程序
這篇文章主要為大家詳細(xì)介紹了C++有限狀態(tài)機(jī)實(shí)現(xiàn)計(jì)算器小程序的相關(guān)資料,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06C++ ffmpeg硬件解碼的實(shí)現(xiàn)方法
這篇文章主要介紹了C++ ffmpeg硬件解碼的實(shí)現(xiàn),對(duì)FFmpeg多媒體解決方案中的視頻編解碼流程進(jìn)行研究。為嵌入式多媒體開(kāi)發(fā)提供參考,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08C++實(shí)現(xiàn)LeetCode(6.字型轉(zhuǎn)換字符串)
這篇文章主要介紹了C++實(shí)現(xiàn)LeetCode(6.字型轉(zhuǎn)換字符串),本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-07-07