使用C++實(shí)現(xiàn)一個(gè)高效的線程池
在多線程編程中,線程池是一種常見且高效的設(shè)計(jì)模式。它通過預(yù)先創(chuàng)建一定數(shù)量的線程來處理任務(wù),從而避免頻繁創(chuàng)建和銷毀線程帶來的性能開銷。本文將詳細(xì)介紹如何使用C++實(shí)現(xiàn)一個(gè)線程池,并解析相關(guān)代碼實(shí)現(xiàn)細(xì)節(jié)。
線程池簡介
線程池(Thread Pool)是一種管理和復(fù)用線程的機(jī)制。它通過維護(hù)一個(gè)線程集合,當(dāng)有任務(wù)需要執(zhí)行時(shí),從線程池中分配一個(gè)空閑線程來處理任務(wù),任務(wù)完成后線程歸還到池中。這樣可以顯著減少線程創(chuàng)建和銷毀的開銷,提高系統(tǒng)的整體性能和響應(yīng)速度。
設(shè)計(jì)思路
本文實(shí)現(xiàn)的線程池主要包含兩個(gè)核心類:
- Thread類:封裝了單個(gè)線程的創(chuàng)建、啟動和管理。
- ThreadPool類:管理多個(gè)線程,維護(hù)任務(wù)隊(duì)列,并調(diào)度任務(wù)給線程執(zhí)行。
線程池支持兩種模式:
MODE_CACHED:緩存模式,根據(jù)任務(wù)量動態(tài)調(diào)整線程數(shù)量,適用于任務(wù)量不固定的場景。
MODE_FIXED:固定模式,線程數(shù)量固定,適用于任務(wù)量穩(wěn)定的場景。
Thread類實(shí)現(xiàn)
Thread類負(fù)責(zé)封裝單個(gè)線程的創(chuàng)建和管理。以下是Thread.h和Thread.cpp的實(shí)現(xiàn)。
Thread.h
#include <functional>
#include <atomic>
#include <cstdint>
#include <thread>
class Thread {
public:
using ThreadFunc = std::function<void(std::uint32_t)>;
public:
explicit Thread(ThreadFunc func);
void join();
~Thread();
void start();
[[nodiscard]] std::uint32_t getID() const;
[[nodiscard]] static std::uint32_t getNumCreated();
Thread(const Thread &) = delete;
Thread &operator=(const Thread &) = delete;
private:
ThreadFunc m_func;
uint32_t m_threadID;
std::thread m_thread;
static std::atomic<uint32_t> m_numCreateThread;
};
Thread.cpp
#include "Thread.h"
std::atomic<uint32_t> Thread::m_numCreateThread(0);
Thread::Thread(Thread::ThreadFunc func) : m_func(std::move(func)), m_threadID(m_numCreateThread.load()) {
m_numCreateThread++;
}
void Thread::start() {
m_thread = std::thread([this]() {
m_func(m_threadID);
});
m_thread.detach();
}
uint32_t Thread::getID() const {
return m_threadID;
}
uint32_t Thread::getNumCreated() {
return Thread::m_numCreateThread.load();
}
Thread::~Thread() {
join();
}
void Thread::join() {
if (m_thread.joinable()) {
m_thread.join();
}
}
解析
成員變量:
- m_func:線程執(zhí)行的函數(shù)。
- m_threadID:線程的唯一標(biāo)識。
- m_thread:std::thread對象。
- m_numCreateThread:靜態(tài)原子變量,用于記錄已創(chuàng)建的線程數(shù)量。
構(gòu)造函數(shù):
接受一個(gè)函數(shù)作為參數(shù),并分配一個(gè)唯一的線程ID。
start方法:
啟動線程,執(zhí)行傳入的函數(shù),并將線程設(shè)為分離狀態(tài),以便在線程結(jié)束時(shí)自動回收資源。
join方法和析構(gòu)函數(shù):
如果線程可連接,則執(zhí)行join操作,確保線程資源的正確回收。
ThreadPool類實(shí)現(xiàn)
ThreadPool類負(fù)責(zé)管理多個(gè)線程,維護(hù)任務(wù)隊(duì)列,并調(diào)度任務(wù)給線程執(zhí)行。以下是ThreadPool.h和ThreadPool.cpp的實(shí)現(xiàn)。
ThreadPool.h
#include <mutex>
#include <unordered_map>
#include <memory>
#include <functional>
#include <queue>
#include <iostream>
#include <condition_variable>
#include <future>
#include <cstdint>
#include "Thread.h"
enum class THREAD_MODE {
MODE_CACHED,
MODE_FIXED,
};
class ThreadPool {
public:
explicit ThreadPool(THREAD_MODE mode = THREAD_MODE::MODE_CACHED, std::uint32_t maxThreadSize = 1024,
std::uint32_t initThreadSize = 4, std::uint32_t maxTaskSize = 1024);
~ThreadPool();
void setThreadMaxSize(uint32_t maxSize);
void setMode(THREAD_MODE mode);
void setTaskMaxSize(uint32_t maxSize);
void start(uint32_t taskSize = std::thread::hardware_concurrency());
ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
template<typename Func, typename ...Args>
auto submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type>;
protected:
[[nodiscard]] bool checkState() const;
void ThreadFun(uint32_t threadID);
private:
using Task = std::function<void()>;
std::unordered_map<uint32_t, std::unique_ptr<Thread>> m_threads;
uint32_t m_initThreadSize; // 初始線程數(shù)量
std::atomic<std::uint32_t> m_spareThreadSize; // 空閑線程數(shù)量
uint32_t m_maxThreadSize; // 最大線程數(shù)量
std::atomic<bool> m_isRunning; // 線程池運(yùn)行標(biāo)志
THREAD_MODE m_mode; // 線程池運(yùn)行模式
std::deque<Task> m_tasks;
std::atomic<uint32_t> m_taskSize;
uint32_t m_maxTaskSize;
uint32_t m_thread_maxSpareTime;
mutable std::mutex m_mutex; // 線程池互斥量
std::condition_variable m_notEmpty;
std::condition_variable m_notFull;
std::condition_variable m_isExit;
};
ThreadPool.cpp
#include "ThreadPool.hpp"
#include <thread>
ThreadPool::ThreadPool(THREAD_MODE mode, uint32_t maxThreadSize, uint32_t initThreadSize,
uint32_t maxTaskSize) : m_initThreadSize(initThreadSize), m_spareThreadSize(0),
m_maxThreadSize(maxThreadSize), m_isRunning(false),
m_mode(mode), m_taskSize(0), m_maxTaskSize(maxTaskSize),
m_thread_maxSpareTime(60) {
}
bool ThreadPool::checkState() const {
return m_isRunning;
}
void ThreadPool::setThreadMaxSize(uint32_t maxSize) {
if (checkState())
std::cerr << "threadPool is running, cannot change!" << std::endl;
else
this->m_maxThreadSize = maxSize;
}
void ThreadPool::setMode(THREAD_MODE mode) {
if (checkState())
std::cerr << "threadPool is running, cannot change!" << std::endl;
else
this->m_mode = mode;
}
void ThreadPool::setTaskMaxSize(uint32_t maxSize) {
if (checkState())
std::cerr << "threadPool is running, cannot change!" << std::endl;
else
this->m_maxTaskSize = maxSize;
}
void ThreadPool::ThreadFun(uint32_t threadID) {
auto last_time = std::chrono::high_resolution_clock::now();
for (;;) {
Task task;
{
std::unique_lock<std::mutex> lock(m_mutex);
std::cout << "threadID: " << threadID << " trying to get a task" << std::endl;
while (m_tasks.empty() && m_isRunning) {
if (m_mode == THREAD_MODE::MODE_CACHED && m_threads.size() > m_initThreadSize) {
if (m_notEmpty.wait_for(lock, std::chrono::seconds(3)) == std::cv_status::timeout) {
auto now_time = std::chrono::high_resolution_clock::now();
auto dur_time = std::chrono::duration_cast<std::chrono::seconds>(now_time - last_time);
if (dur_time.count() > m_thread_maxSpareTime && m_threads.size() > m_initThreadSize) {
m_threads.erase(threadID);
m_spareThreadSize--;
std::cout << "threadID: " << threadID << " exiting due to inactivity!" << std::endl;
return;
}
}
} else {
m_notEmpty.wait(lock);
}
}
if (!m_isRunning && m_tasks.empty()) {
m_threads.erase(threadID);
std::cout << "threadID: " << threadID << " exiting!" << std::endl;
m_isExit.notify_all();
return;
}
if (!m_tasks.empty()) {
m_spareThreadSize--;
task = std::move(m_tasks.front());
m_tasks.pop_front();
std::cout << "threadID: " << threadID << " retrieved a task!" << std::endl;
if (!m_tasks.empty())
m_notEmpty.notify_all();
m_notFull.notify_all();
}
}
if (task) {
try {
task();
} catch (const std::exception &e) {
std::cerr << "Exception in task: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Unknown exception in task." << std::endl;
}
std::cout << "threadID: " << threadID << " completed a task." << std::endl;
m_spareThreadSize++;
last_time = std::chrono::high_resolution_clock::now();
}
}
}
void ThreadPool::start(std::uint32_t taskSize) {
m_isRunning = true;
for (std::uint32_t i = 0; i < taskSize; ++i) {
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));
auto threadID = ptr->getID();
m_threads.emplace(threadID, std::move(ptr));
}
for (auto &it: m_threads) {
it.second->start();
m_spareThreadSize++;
}
}
ThreadPool::~ThreadPool() {
m_isRunning = false;
std::unique_lock<std::mutex> lock(m_mutex);
m_notEmpty.notify_all();
m_notFull.notify_all();
m_isExit.wait(lock, [&]() -> bool { return m_threads.empty(); });
}
submitTask模板方法實(shí)現(xiàn)
template<typename Func, typename ...Args>
auto ThreadPool::submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type> {
using Rtype = typename std::invoke_result<Func, Args...>::type;
auto task = std::make_shared<std::packaged_task<Rtype()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<Rtype> result = task->get_future();
std::unique_lock lock(m_mutex);
if (!m_notFull.wait_for(lock, std::chrono::seconds(3),
[&]() -> bool { return m_tasks.size() < m_maxTaskSize; })) {
std::cerr << "Task queue is full, submit task failed!" << std::endl;
throw std::runtime_error("Task queue is full");
}
m_tasks.emplace_back([task] { (*task)(); });
m_notEmpty.notify_all();
if (m_mode == THREAD_MODE::MODE_CACHED && m_tasks.size() > m_spareThreadSize) {
if (m_threads.size() >= m_maxThreadSize) {
std::cerr << "Thread pool has reached max size, cannot create new thread!" << std::endl;
} else {
std::cout << "Creating a new thread!" << std::endl;
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));
u_int64_t threadID = ptr->getID();
m_threads.emplace(threadID, std::move(ptr));
m_threads[threadID]->start();
++m_spareThreadSize;
}
}
return result;
}
解析
成員變量:
- m_threads:存儲所有線程的集合。
- m_tasks:任務(wù)隊(duì)列,存儲待執(zhí)行的任務(wù)。
- m_mutex、m_notEmpty、m_notFull、m_isExit:用于線程同步和任務(wù)調(diào)度的互斥量和條件變量。
- 其他變量用于控制線程池的狀態(tài),如最大線程數(shù)、初始線程數(shù)、任務(wù)隊(duì)列最大長度等。
構(gòu)造函數(shù):
初始化線程池的各項(xiàng)參數(shù),如模式、最大線程數(shù)、初始線程數(shù)、最大任務(wù)數(shù)等。
start方法:
啟動線程池,創(chuàng)建初始數(shù)量的線程,并將其啟動。
submitTask模板方法:
- 提交任務(wù)到線程池,支持任意可調(diào)用對象。
- 使用std::packaged_task和std::future實(shí)現(xiàn)任務(wù)的異步執(zhí)行和結(jié)果獲取。
- 如果任務(wù)隊(duì)列已滿,則在指定時(shí)間內(nèi)等待,若仍滿則拋出異常。
- 在緩存模式下,根據(jù)任務(wù)量動態(tài)創(chuàng)建新線程。
ThreadFun方法:
- 線程的工作函數(shù),從任務(wù)隊(duì)列中獲取任務(wù)并執(zhí)行。
- 在緩存模式下,線程在空閑一定時(shí)間后會自動退出,降低資源占用。
析構(gòu)函數(shù):
關(guān)閉線程池,通知所有線程退出,并等待所有線程結(jié)束。
線程池的使用
以下是一個(gè)簡單的示例,展示如何使用上述實(shí)現(xiàn)的線程池。
#include "ThreadPool.h"
#include <iostream>
#include <chrono>
// 示例任務(wù)函數(shù)
void exampleTask(int n) {
std::cout << "Task " << n << " is starting." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Task " << n << " is completed." << std::endl;
}
int main() {
// 創(chuàng)建線程池,使用緩存模式,最大線程數(shù)為8,初始線程數(shù)為4,最大任務(wù)數(shù)為16
ThreadPool pool(THREAD_MODE::MODE_CACHED, 8, 4, 16);
pool.start();
// 提交多個(gè)任務(wù)
std::vector<std::future<void>> futures;
for (int i = 0; i < 10; ++i) {
futures.emplace_back(pool.submitTask(exampleTask, i));
}
// 等待所有任務(wù)完成
for (auto &fut : futures) {
fut.get();
}
std::cout << "All tasks have been completed." << std::endl;
return 0;
}
運(yùn)行結(jié)果
threadID: 0 trying to get a task
threadID: 1 trying to get a task
threadID: 2 trying to get a task
threadID: 3 trying to get a task
Task 0 is starting.
Task 1 is starting.
Task 2 is starting.
Task 3 is starting.
threadID: 0 completed a task.
threadID: 0 trying to get a task
Task 4 is starting.
threadID: 1 completed a task.
threadID: 1 trying to get a task
Task 5 is starting.
...
All tasks have been completed.
以上就是使用C++實(shí)現(xiàn)一個(gè)高效的線程池的詳細(xì)內(nèi)容,更多關(guān)于C++線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
一篇文章帶你了解C++(STL基礎(chǔ)、Vector)
這篇文章主要為大家詳細(xì)介紹了C++ STL基礎(chǔ),vector向量容器使用方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能給你帶來幫助2021-08-08
C++編程中用put輸出單個(gè)字符和cin輸入流的用法
這篇文章主要介紹了C++編程中用put輸出單個(gè)字符和cin輸入流的用法,是C++入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下2015-09-09
C++有限狀態(tài)機(jī)實(shí)現(xiàn)計(jì)算器小程序
這篇文章主要為大家詳細(xì)介紹了C++有限狀態(tài)機(jī)實(shí)現(xiàn)計(jì)算器小程序的相關(guān)資料,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06
C++ ffmpeg硬件解碼的實(shí)現(xiàn)方法
這篇文章主要介紹了C++ ffmpeg硬件解碼的實(shí)現(xiàn),對FFmpeg多媒體解決方案中的視頻編解碼流程進(jìn)行研究。為嵌入式多媒體開發(fā)提供參考,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08
C++實(shí)現(xiàn)LeetCode(6.字型轉(zhuǎn)換字符串)
這篇文章主要介紹了C++實(shí)現(xiàn)LeetCode(6.字型轉(zhuǎn)換字符串),本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-07-07

