欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用C++實(shí)現(xiàn)一個(gè)高效的線程池

 更新時(shí)間:2024年12月31日 09:39:25   作者:過(guò)過(guò)過(guò)呀Glik  
在多線程編程中,線程池是一種常見(jiàn)且高效的設(shè)計(jì)模式,本文將詳細(xì)介紹如何使用C++實(shí)現(xiàn)一個(gè)線程池,并解析相關(guān)代碼實(shí)現(xiàn)細(xì)節(jié),需要的小伙伴可以參考下

在多線程編程中,線程池是一種常見(jiàn)且高效的設(shè)計(jì)模式。它通過(guò)預(yù)先創(chuàng)建一定數(shù)量的線程來(lái)處理任務(wù),從而避免頻繁創(chuàng)建和銷毀線程帶來(lái)的性能開(kāi)銷。本文將詳細(xì)介紹如何使用C++實(shí)現(xiàn)一個(gè)線程池,并解析相關(guān)代碼實(shí)現(xiàn)細(xì)節(jié)。

線程池簡(jiǎn)介

線程池(Thread Pool)是一種管理和復(fù)用線程的機(jī)制。它通過(guò)維護(hù)一個(gè)線程集合,當(dāng)有任務(wù)需要執(zhí)行時(shí),從線程池中分配一個(gè)空閑線程來(lái)處理任務(wù),任務(wù)完成后線程歸還到池中。這樣可以顯著減少線程創(chuàng)建和銷毀的開(kāi)銷,提高系統(tǒng)的整體性能和響應(yīng)速度。

設(shè)計(jì)思路

本文實(shí)現(xiàn)的線程池主要包含兩個(gè)核心類:

  • Thread類:封裝了單個(gè)線程的創(chuàng)建、啟動(dòng)和管理。
  • ThreadPool類:管理多個(gè)線程,維護(hù)任務(wù)隊(duì)列,并調(diào)度任務(wù)給線程執(zhí)行。

線程池支持兩種模式:

MODE_CACHED:緩存模式,根據(jù)任務(wù)量動(dòng)態(tài)調(diào)整線程數(shù)量,適用于任務(wù)量不固定的場(chǎng)景。

MODE_FIXED:固定模式,線程數(shù)量固定,適用于任務(wù)量穩(wěn)定的場(chǎng)景。

Thread類實(shí)現(xiàn)

Thread類負(fù)責(zé)封裝單個(gè)線程的創(chuàng)建和管理。以下是Thread.h和Thread.cpp的實(shí)現(xiàn)。

Thread.h

#include <functional>
#include <atomic>
#include <cstdint>
#include <thread>

class Thread {
public:
    using ThreadFunc = std::function<void(std::uint32_t)>;

public:
    explicit Thread(ThreadFunc func);

    void join();

    ~Thread();

    void start();

    [[nodiscard]] std::uint32_t getID() const;

    [[nodiscard]] static std::uint32_t getNumCreated();

    Thread(const Thread &) = delete;

    Thread &operator=(const Thread &) = delete;

private:
    ThreadFunc m_func;
    uint32_t m_threadID;
    std::thread m_thread;
    static std::atomic<uint32_t> m_numCreateThread;
};

Thread.cpp

#include "Thread.h"

std::atomic<uint32_t> Thread::m_numCreateThread(0);

Thread::Thread(Thread::ThreadFunc func) : m_func(std::move(func)), m_threadID(m_numCreateThread.load()) {
    m_numCreateThread++;
}

void Thread::start() {
    m_thread = std::thread([this]() {
        m_func(m_threadID);
    });
    m_thread.detach();
}

uint32_t Thread::getID() const {
    return m_threadID;
}

uint32_t Thread::getNumCreated() {
    return Thread::m_numCreateThread.load();
}

Thread::~Thread() {
    join();
}

void Thread::join() {
    if (m_thread.joinable()) {
        m_thread.join();
    }
}

解析

成員變量:

  • m_func:線程執(zhí)行的函數(shù)。
  • m_threadID:線程的唯一標(biāo)識(shí)。
  • m_thread:std::thread對(duì)象。
  • m_numCreateThread:靜態(tài)原子變量,用于記錄已創(chuàng)建的線程數(shù)量。

構(gòu)造函數(shù):

接受一個(gè)函數(shù)作為參數(shù),并分配一個(gè)唯一的線程ID。

start方法:

啟動(dòng)線程,執(zhí)行傳入的函數(shù),并將線程設(shè)為分離狀態(tài),以便在線程結(jié)束時(shí)自動(dòng)回收資源。

join方法和析構(gòu)函數(shù):

如果線程可連接,則執(zhí)行join操作,確保線程資源的正確回收。

ThreadPool類實(shí)現(xiàn)

ThreadPool類負(fù)責(zé)管理多個(gè)線程,維護(hù)任務(wù)隊(duì)列,并調(diào)度任務(wù)給線程執(zhí)行。以下是ThreadPool.h和ThreadPool.cpp的實(shí)現(xiàn)。

ThreadPool.h

#include <mutex>
#include <unordered_map>
#include <memory>
#include <functional>
#include <queue>
#include <iostream>
#include <condition_variable>
#include <future>
#include <cstdint>
#include "Thread.h"

enum class THREAD_MODE {
    MODE_CACHED,
    MODE_FIXED,
};

class ThreadPool {
public:
    explicit ThreadPool(THREAD_MODE mode = THREAD_MODE::MODE_CACHED, std::uint32_t maxThreadSize = 1024,
                        std::uint32_t initThreadSize = 4, std::uint32_t maxTaskSize = 1024);

    ~ThreadPool();

    void setThreadMaxSize(uint32_t maxSize);
    void setMode(THREAD_MODE mode);
    void setTaskMaxSize(uint32_t maxSize);
    void start(uint32_t taskSize = std::thread::hardware_concurrency());

    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    template<typename Func, typename ...Args>
    auto submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type>;

protected:
    [[nodiscard]] bool checkState() const;
    void ThreadFun(uint32_t threadID);

private:
    using Task = std::function<void()>;

    std::unordered_map<uint32_t, std::unique_ptr<Thread>> m_threads;
    uint32_t m_initThreadSize; // 初始線程數(shù)量
    std::atomic<std::uint32_t> m_spareThreadSize; // 空閑線程數(shù)量
    uint32_t m_maxThreadSize; // 最大線程數(shù)量
    std::atomic<bool> m_isRunning; // 線程池運(yùn)行標(biāo)志
    THREAD_MODE m_mode; // 線程池運(yùn)行模式

    std::deque<Task> m_tasks;
    std::atomic<uint32_t> m_taskSize;
    uint32_t m_maxTaskSize;

    uint32_t m_thread_maxSpareTime;

    mutable std::mutex m_mutex; // 線程池互斥量
    std::condition_variable m_notEmpty;
    std::condition_variable m_notFull;
    std::condition_variable m_isExit;
};

ThreadPool.cpp

#include "ThreadPool.hpp"
#include <thread>

ThreadPool::ThreadPool(THREAD_MODE mode, uint32_t maxThreadSize, uint32_t initThreadSize,
                       uint32_t maxTaskSize) : m_initThreadSize(initThreadSize), m_spareThreadSize(0),
                                               m_maxThreadSize(maxThreadSize), m_isRunning(false), 
                                               m_mode(mode), m_taskSize(0), m_maxTaskSize(maxTaskSize), 
                                               m_thread_maxSpareTime(60) {
}

bool ThreadPool::checkState() const {
    return m_isRunning;
}

void ThreadPool::setThreadMaxSize(uint32_t maxSize) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_maxThreadSize = maxSize;
}

void ThreadPool::setMode(THREAD_MODE mode) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_mode = mode;
}

void ThreadPool::setTaskMaxSize(uint32_t maxSize) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_maxTaskSize = maxSize;
}

void ThreadPool::ThreadFun(uint32_t threadID) {
    auto last_time = std::chrono::high_resolution_clock::now();
    for (;;) {
        Task task;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            std::cout << "threadID: " << threadID << " trying to get a task" << std::endl;
            while (m_tasks.empty() && m_isRunning) {
                if (m_mode == THREAD_MODE::MODE_CACHED && m_threads.size() > m_initThreadSize) {
                    if (m_notEmpty.wait_for(lock, std::chrono::seconds(3)) == std::cv_status::timeout) {
                        auto now_time = std::chrono::high_resolution_clock::now();
                        auto dur_time = std::chrono::duration_cast<std::chrono::seconds>(now_time - last_time);
                        if (dur_time.count() > m_thread_maxSpareTime && m_threads.size() > m_initThreadSize) {
                            m_threads.erase(threadID);
                            m_spareThreadSize--;
                            std::cout << "threadID: " << threadID << " exiting due to inactivity!" << std::endl;
                            return;
                        }
                    }
                } else {
                    m_notEmpty.wait(lock);
                }
            }
            if (!m_isRunning && m_tasks.empty()) {
                m_threads.erase(threadID);
                std::cout << "threadID: " << threadID << " exiting!" << std::endl;
                m_isExit.notify_all();
                return;
            }

            if (!m_tasks.empty()) {
                m_spareThreadSize--;
                task = std::move(m_tasks.front());
                m_tasks.pop_front();
                std::cout << "threadID: " << threadID << " retrieved a task!" << std::endl;
                if (!m_tasks.empty())
                    m_notEmpty.notify_all();
                m_notFull.notify_all();
            }
        }
        if (task) {
            try {
                task();
            } catch (const std::exception &e) {
                std::cerr << "Exception in task: " << e.what() << std::endl;
            } catch (...) {
                std::cerr << "Unknown exception in task." << std::endl;
            }
            std::cout << "threadID: " << threadID << " completed a task." << std::endl;
            m_spareThreadSize++;
            last_time = std::chrono::high_resolution_clock::now();
        }
    }
}

void ThreadPool::start(std::uint32_t taskSize) {
    m_isRunning = true;
    for (std::uint32_t i = 0; i < taskSize; ++i) {
        auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));
        auto threadID = ptr->getID();
        m_threads.emplace(threadID, std::move(ptr));
    }
    for (auto &it: m_threads) {
        it.second->start();
        m_spareThreadSize++;
    }
}

ThreadPool::~ThreadPool() {
    m_isRunning = false;
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.notify_all();
    m_notFull.notify_all();
    m_isExit.wait(lock, [&]() -> bool { return m_threads.empty(); });
}

submitTask模板方法實(shí)現(xiàn)

template<typename Func, typename ...Args>
auto ThreadPool::submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type> {
    using Rtype = typename std::invoke_result<Func, Args...>::type;
    auto task = std::make_shared<std::packaged_task<Rtype()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    std::future<Rtype> result = task->get_future();
    std::unique_lock lock(m_mutex);
    if (!m_notFull.wait_for(lock, std::chrono::seconds(3),
                            [&]() -> bool { return m_tasks.size() < m_maxTaskSize; })) {
        std::cerr << "Task queue is full, submit task failed!" << std::endl;
        throw std::runtime_error("Task queue is full");
    }
    m_tasks.emplace_back([task] { (*task)(); });
    m_notEmpty.notify_all();

    if (m_mode == THREAD_MODE::MODE_CACHED && m_tasks.size() > m_spareThreadSize) {
        if (m_threads.size() >= m_maxThreadSize) {
            std::cerr << "Thread pool has reached max size, cannot create new thread!" << std::endl;
        } else {
            std::cout << "Creating a new thread!" << std::endl;
            auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));
            u_int64_t threadID = ptr->getID();
            m_threads.emplace(threadID, std::move(ptr));
            m_threads[threadID]->start();
            ++m_spareThreadSize;
        }
    }
    return result;
}

解析

成員變量:

  • m_threads:存儲(chǔ)所有線程的集合。
  • m_tasks:任務(wù)隊(duì)列,存儲(chǔ)待執(zhí)行的任務(wù)。
  • m_mutex、m_notEmpty、m_notFull、m_isExit:用于線程同步和任務(wù)調(diào)度的互斥量和條件變量。
  • 其他變量用于控制線程池的狀態(tài),如最大線程數(shù)、初始線程數(shù)、任務(wù)隊(duì)列最大長(zhǎng)度等。

構(gòu)造函數(shù):

初始化線程池的各項(xiàng)參數(shù),如模式、最大線程數(shù)、初始線程數(shù)、最大任務(wù)數(shù)等。

start方法:

啟動(dòng)線程池,創(chuàng)建初始數(shù)量的線程,并將其啟動(dòng)。

submitTask模板方法:

  • 提交任務(wù)到線程池,支持任意可調(diào)用對(duì)象。
  • 使用std::packaged_task和std::future實(shí)現(xiàn)任務(wù)的異步執(zhí)行和結(jié)果獲取。
  • 如果任務(wù)隊(duì)列已滿,則在指定時(shí)間內(nèi)等待,若仍滿則拋出異常。
  • 在緩存模式下,根據(jù)任務(wù)量動(dòng)態(tài)創(chuàng)建新線程。

ThreadFun方法:

  • 線程的工作函數(shù),從任務(wù)隊(duì)列中獲取任務(wù)并執(zhí)行。
  • 在緩存模式下,線程在空閑一定時(shí)間后會(huì)自動(dòng)退出,降低資源占用。

析構(gòu)函數(shù):

關(guān)閉線程池,通知所有線程退出,并等待所有線程結(jié)束。

線程池的使用

以下是一個(gè)簡(jiǎn)單的示例,展示如何使用上述實(shí)現(xiàn)的線程池。

#include "ThreadPool.h"
#include <iostream>
#include <chrono>

// 示例任務(wù)函數(shù)
void exampleTask(int n) {
    std::cout << "Task " << n << " is starting." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Task " << n << " is completed." << std::endl;
}

int main() {
    // 創(chuàng)建線程池,使用緩存模式,最大線程數(shù)為8,初始線程數(shù)為4,最大任務(wù)數(shù)為16
    ThreadPool pool(THREAD_MODE::MODE_CACHED, 8, 4, 16);
    pool.start();

    // 提交多個(gè)任務(wù)
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 10; ++i) {
        futures.emplace_back(pool.submitTask(exampleTask, i));
    }

    // 等待所有任務(wù)完成
    for (auto &fut : futures) {
        fut.get();
    }

    std::cout << "All tasks have been completed." << std::endl;
    return 0;
}

運(yùn)行結(jié)果

threadID: 0 trying to get a task
threadID: 1 trying to get a task
threadID: 2 trying to get a task
threadID: 3 trying to get a task
Task 0 is starting.
Task 1 is starting.
Task 2 is starting.
Task 3 is starting.
threadID: 0 completed a task.
threadID: 0 trying to get a task
Task 4 is starting.
threadID: 1 completed a task.
threadID: 1 trying to get a task
Task 5 is starting.
...
All tasks have been completed.

以上就是使用C++實(shí)現(xiàn)一個(gè)高效的線程池的詳細(xì)內(nèi)容,更多關(guān)于C++線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • C++預(yù)處理連接的示例詳解

    C++預(yù)處理連接的示例詳解

    C++預(yù)處理連接(Preprocessor?Concatenation)是一種宏定義技巧,用于將兩個(gè)或多個(gè)符號(hào)(如變量、字符串等)連接成一個(gè)符號(hào)。這篇文章主要通過(guò)一些示例為大家講解一下預(yù)處理連接,需要的可以參考一下
    2023-03-03
  • C++第三方庫(kù)jsoncpp超詳細(xì)講解

    C++第三方庫(kù)jsoncpp超詳細(xì)講解

    這篇文章主要介紹了C++第三方庫(kù)jsoncpp的相關(guān)資料,JSONcpp是一個(gè)在C++中用于處理JSON數(shù)據(jù)的庫(kù),支持JSON格式的序列化和反序列化,通過(guò)JSONcpp,可以輕松地將數(shù)據(jù)對(duì)象組織成JSON格式的字符串,需要的朋友可以參考下
    2024-10-10
  • 一篇文章帶你了解C++(STL基礎(chǔ)、Vector)

    一篇文章帶你了解C++(STL基礎(chǔ)、Vector)

    這篇文章主要為大家詳細(xì)介紹了C++ STL基礎(chǔ),vector向量容器使用方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能給你帶來(lái)幫助
    2021-08-08
  • C字符串與C++字符串的深入理解

    C字符串與C++字符串的深入理解

    本篇文章是對(duì)C字符串與C++字符串進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下
    2013-05-05
  • 使用C++的inipp庫(kù)處理配置文件.ini的示例詳解

    使用C++的inipp庫(kù)處理配置文件.ini的示例詳解

    一個(gè)ini文件由多個(gè)節(jié)section組成,每個(gè)節(jié)由多個(gè)鍵值對(duì)組成,本文給大家介紹了使用第三方庫(kù)inipp來(lái)操作ini文件,文中通過(guò)代碼示例講解的非常詳細(xì),需要的朋友可以參考下
    2024-01-01
  • C++編程中用put輸出單個(gè)字符和cin輸入流的用法

    C++編程中用put輸出單個(gè)字符和cin輸入流的用法

    這篇文章主要介紹了C++編程中用put輸出單個(gè)字符和cin輸入流的用法,是C++入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下
    2015-09-09
  • C++有限狀態(tài)機(jī)實(shí)現(xiàn)計(jì)算器小程序

    C++有限狀態(tài)機(jī)實(shí)現(xiàn)計(jì)算器小程序

    這篇文章主要為大家詳細(xì)介紹了C++有限狀態(tài)機(jī)實(shí)現(xiàn)計(jì)算器小程序的相關(guān)資料,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-06-06
  • C++ ffmpeg硬件解碼的實(shí)現(xiàn)方法

    C++ ffmpeg硬件解碼的實(shí)現(xiàn)方法

    這篇文章主要介紹了C++ ffmpeg硬件解碼的實(shí)現(xiàn),對(duì)FFmpeg多媒體解決方案中的視頻編解碼流程進(jìn)行研究。為嵌入式多媒體開(kāi)發(fā)提供參考,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-08-08
  • C語(yǔ)言可變參數(shù)函數(shù)詳解

    C語(yǔ)言可變參數(shù)函數(shù)詳解

    在某些情況下我們希望函數(shù)的參數(shù)個(gè)數(shù)可以根據(jù)需要確定,因此c語(yǔ)言引入可變參數(shù)函數(shù)。典型的可變參數(shù)函數(shù)的例子有printf()、scanf()等,下面我就開(kāi)始講解
    2021-08-08
  • C++實(shí)現(xiàn)LeetCode(6.字型轉(zhuǎn)換字符串)

    C++實(shí)現(xiàn)LeetCode(6.字型轉(zhuǎn)換字符串)

    這篇文章主要介紹了C++實(shí)現(xiàn)LeetCode(6.字型轉(zhuǎn)換字符串),本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-07-07

最新評(píng)論