欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C++中線程池ThreadPool源碼解析

 更新時(shí)間:2022年09月05日 08:53:27   作者:喂喂喂–學(xué)編程  
線程池(threadpool)作為五大池之一(內(nèi)存池、連接池、線程池、進(jìn)程池、協(xié)程池),線程池的應(yīng)用非常廣泛,不管事客戶端程序還是后臺(tái)服務(wù)端,都是提高業(yè)務(wù)處理能力的必備模塊

什么是線程

線程是進(jìn)程中的?個(gè)執(zhí)?單元,負(fù)責(zé)當(dāng)前進(jìn)程中程序的執(zhí)?,?個(gè)進(jìn)程中?少有?個(gè)線程。?個(gè)進(jìn)程中是可以有多個(gè)線程的,這個(gè)應(yīng)?程序也可以稱之為多線程程序。多線程程序作為一種多任務(wù)、并發(fā)的工作方式

并發(fā)與并?

早期計(jì)算機(jī)的 CPU 都是單核的,一個(gè) CPU 在同一時(shí)間只能執(zhí)行一個(gè)進(jìn)程/線程,當(dāng)系統(tǒng)中有多個(gè)進(jìn)程/線程等待執(zhí)行時(shí),CPU 只能執(zhí)行完一個(gè)再執(zhí)行下一個(gè)。為了提高 CPU 利用率,減少等待時(shí)間,人們提出了一種 CPU 并發(fā)工作的理論.

并發(fā):指兩個(gè)或多個(gè)事件在同?個(gè)時(shí)間段內(nèi)發(fā)?,當(dāng)系統(tǒng)中有多個(gè)進(jìn)程/線程等待執(zhí)行時(shí),CPU只能執(zhí)行完一個(gè)再執(zhí)行下一個(gè)。

并?:指兩個(gè)或多個(gè)事件在同?時(shí)刻發(fā)?(同時(shí)發(fā)?),多核 CPU 的每個(gè)核心都可以獨(dú)立地執(zhí)行一個(gè)任務(wù),而且多個(gè)核心之間不會(huì)相互干擾。在不同核心上執(zhí)行的多個(gè)任務(wù),是真正地同時(shí)運(yùn)行,這種狀態(tài)就叫做并行。。

什么是線程池

顧名思義:線程池就是線程的池子,有很多線程,但是數(shù)量不會(huì)超過(guò)池子的限制。需要用到多執(zhí)行流進(jìn)行任務(wù)出路的時(shí)候,就從池子中取出一個(gè)線程去處理,線程池就類(lèi)似于一個(gè)實(shí)現(xiàn)了消費(fèi)者業(yè)務(wù)的生產(chǎn)者與消費(fèi)者模型。

本質(zhì)上:這就是一個(gè)基于生產(chǎn)者消費(fèi)者模型來(lái)實(shí)現(xiàn)的線程池,那么同樣遵守三種規(guī)則,生產(chǎn)者和生產(chǎn)者之間存在互斥,處理任務(wù)的線程之間存在互斥關(guān)系,生產(chǎn)者和消費(fèi)者之間存在同步和互斥關(guān)系

線程池解決什么問(wèn)題

線程池維護(hù)者多個(gè)線程,等待著分配可并發(fā)執(zhí)行的任務(wù),可以避免在短時(shí)間創(chuàng)建和銷(xiāo)毀大量線程帶來(lái)時(shí)間成本。

總結(jié)為三點(diǎn):

1.避免線程因?yàn)椴幌拗苿?chuàng)建數(shù)量導(dǎo)致的資源耗盡風(fēng)險(xiǎn)

2.任務(wù)隊(duì)列緩沖任務(wù),支持忙線不均的作用

3.節(jié)省了大量頻繁創(chuàng)建/銷(xiāo)毀線程的時(shí)間成本

怎么用線程池

下面展示一些 threadpool實(shí)現(xiàn),源碼來(lái)自openharmony。

/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef NETSTACK_THREAD_POOL
#define NETSTACK_THREAD_POOL
#include <atomic>
#include <condition_variable>
#include <queue>
#include <thread>
#include <vector>
namespace OHOS::NetStack {
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
    /**
     * disallow default constructor
     */
    ThreadPool() = delete;
    /**
     * disallow copy and move
     */
    ThreadPool(const ThreadPool &) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool &operator=(const ThreadPool &) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool(ThreadPool &&) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool &operator=(ThreadPool &&) = delete;
    /**
     * make DEFAULT_THREAD_NUM threads
     * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
     */
    explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
    {
        for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
            std::thread([this] { RunTask(); }).detach();
        }
    }
    /**
     * if ~ThreadPool, terminate all thread
     */
    ~ThreadPool()
    {
        // set needRun_ = false, and notify all the thread to wake and terminate
        needRun_ = false;
        while (runningNum_ > 0) {
            needRunCondition_.notify_all();
        }
    }
    /**
     * push it to taskQueue_ and notify a thread to run it
     * @param task new task to Execute
     */
    void Push(const Task &task)
    {
        PushTask(task);
        if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
            std::thread([this] { RunTask(); }).detach();
        }
        needRunCondition_.notify_all();
    }
private:
    bool IsQueueEmpty()
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return taskQueue_.empty();
    }
    bool GetTask(Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        // if taskQueue_ is empty, means timeout
        if (taskQueue_.empty()) {
            return false;
        }
        // if run to this line, means that taskQueue_ is not empty
        task = taskQueue_.top();
        taskQueue_.pop();
        return true;
    }
    void PushTask(const Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        taskQueue_.push(task);
    }
    class NumWrapper {
    public:
        NumWrapper() = delete;
        explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
        {
            ++num_;
        }
        ~NumWrapper()
        {
            --num_;
        }
    private:
        std::atomic<uint32_t> &num_;
    };
    void Sleep()
    {
        std::mutex needRunMutex;
        std::unique_lock<std::mutex> lock(needRunMutex);
        /**
         * if the thread is waiting, it is idle
         * if wake up, this thread is not idle:
         *     1 this thread should return
         *     2 this thread should run task
         *     3 this thread should go to next loop
         */
        NumWrapper idleWrapper(idleThreadNum_);
        (void)idleWrapper;
        needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
                                   [this] { return !needRun_ || !IsQueueEmpty(); });
    }
    void RunTask()
    {
        NumWrapper runningWrapper(runningNum_);
        (void)runningWrapper;
        while (needRun_) {
            Task task;
            if (GetTask(task)) {
                task.Execute();
                continue;
            }
            Sleep();
            if (!needRun_) {
                return;
            }
            if (GetTask(task)) {
                task.Execute();
                continue;
            }
            if (runningNum_ > DEFAULT_THREAD_NUM) {
                return;
            }
        }
    }
private:
    /**
     * other thread put a task to the taskQueue_
     */
    std::mutex mutex_;
    std::priority_queue<Task> taskQueue_;
    /**
     * 1 terminate the thread if it is idle for timeout_ seconds
     * 2 wait for the thread started util timeout_
     * 3 wait for the thread notified util timeout_
     * 4 wait for the thread terminated util timeout_
     */
    uint32_t timeout_;
    /**
     * if idleThreadNum_ is zero, make a new thread
     */
    std::atomic<uint32_t> idleThreadNum_;
    /**
     * when ThreadPool object is deleted, wait until runningNum_ is zero.
     */
    std::atomic<uint32_t> runningNum_;
    /**
     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
     */
    std::atomic_bool needRun_;
    std::condition_variable needRunCondition_;
};
} // namespace OHOS::NetStack
#endif /* NETSTACK_THREAD_POOL */

這份源碼的實(shí)現(xiàn),沒(méi)有使用一些較難理解的語(yǔ)法,基本上就是使用線程+優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的。提前創(chuàng)建指定數(shù)目的線程,每次取一個(gè)任務(wù)并執(zhí)行。任務(wù)隊(duì)列負(fù)責(zé)存放線程需要處理的任務(wù),工作線程負(fù)責(zé)從任務(wù)隊(duì)列中取出和運(yùn)行任務(wù),可以看成是一個(gè)生產(chǎn)者和多個(gè)消費(fèi)者的模型。

#include "doctest.h"
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN
#include <stdexcept>
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END
//#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
//#define DOCTEST_CONFIG_DISABLE
#include <string>
#include <iostream>
#include "thread_pool.h"
//
// Created by Administrator on 2022/8/10.
//
class Task {
public:
    Task() = default;
    explicit Task(std::string context){
        mContext = context;
    }
    bool operator<(const Task &e) const{
        return priority_ < e.priority_;
    }
    void Execute(){
        std::lock_guard<std::mutex> guard(mutex_);
        std::cout <<  "task is execute,name is:"<<mContext<<std::endl;
    }
public:
    uint32_t priority_;
private:
    std::string mContext;
    static std::mutex mutex_;
};
#define DEFAULT_THREAD_NUM 3
#define MAX_THREAD_NUM 6
#define TIME_OUT 500
std::mutex Task::mutex_;
static int threadpoolTest(){
    static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT);
    Task task1("name_1");
    Task task2("name_2");
    Task task3("name_3");
    Task task4("name_4");
    threadPool_.Push(task1);
    threadPool_.Push(task2);
    threadPool_.Push(task3);
    threadPool_.Push(task4);
    return 0;
}
TEST_CASE("threadPool simple use example, test by doctest unit tool") {
    threadpoolTest();
}

以上該版本thread_pool的簡(jiǎn)單使用示例,可以看到使用稍微麻煩了些。必須定義格式如下的task類(lèi),必須實(shí)現(xiàn)operator<和Execute()方法,不過(guò)整體實(shí)現(xiàn)還是很不錯(cuò)的,通俗易懂!

總結(jié)

線程池的應(yīng)用場(chǎng)景:當(dāng)有大量的數(shù)據(jù)請(qǐng)求,需要多執(zhí)行流并發(fā)/并行處理時(shí),可以采用線程池來(lái)處理任務(wù),可避免大量線程頻繁創(chuàng)建或銷(xiāo)毀所帶來(lái)的時(shí)間成本,也可避免在峰值壓力下,系統(tǒng)資源耗盡的風(fēng)險(xiǎn)。

到此這篇關(guān)于C++中線程池ThreadPool源碼解析的文章就介紹到這了,更多相關(guān)C++ ThreadPool內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論