c++實(shí)現(xiàn)簡(jiǎn)單的線程池
這是對(duì)pthread線程的一個(gè)簡(jiǎn)單應(yīng)用
1. 實(shí)現(xiàn)了線程池的概念,線程可以重復(fù)使用。
2. 對(duì)信號(hào)量,互斥鎖等進(jìn)行封裝,業(yè)務(wù)處理函數(shù)中只需寫(xiě)和業(yè)務(wù)相關(guān)的代碼。
3. 移植性好。如果想把這個(gè)線程池代碼應(yīng)用到自己的實(shí)現(xiàn)中去,只要寫(xiě)自己的業(yè)務(wù)處理函數(shù)和改寫(xiě)工作隊(duì)列數(shù)據(jù)的處理方法就可以了。
Sample代碼主要包括一個(gè)主程序和兩個(gè)線程實(shí)現(xiàn)類
ThreadTest.cpp:主程序
CThreadManager:線程管理Class,線程池的實(shí)現(xiàn)類
CThread:線程Class.
主程序?qū)崿F(xiàn)方法。
1. 實(shí)現(xiàn)main函數(shù)和一個(gè)需要線程處理的業(yè)務(wù)函數(shù)(例子代碼中業(yè)務(wù)函數(shù)是一個(gè)簡(jiǎn)單的計(jì)算函數(shù)Count)。在main函數(shù)中創(chuàng)建CThreadManager的實(shí)例,產(chǎn)生線程池。這個(gè)時(shí)候,把業(yè)務(wù)函數(shù)作為函數(shù)指針傳到CThreadManager里面,最終會(huì)被線程調(diào)用。
2. 向工作隊(duì)列中放入業(yè)務(wù)函數(shù)要處理的數(shù)據(jù)。
3. 設(shè)置信號(hào)量,喚醒線程。
// 線程要執(zhí)行的函數(shù) int Count(int nWork) { int nResult = nWork * nWork; printf("count result is %d\n",nResult); return 0; } int main() { // 創(chuàng)建線程管理類的實(shí)例,把要執(zhí)行的線程函數(shù)和最大線程數(shù)傳進(jìn)去 CThreadManager* pManager = new CThreadManager(Count, 3); // 把要進(jìn)行計(jì)算的數(shù)放到工作隊(duì)列中 pManager->PushWorkQue(5); pManager->PushWorkQue(20); // 設(shè)置信號(hào)量,喚醒線程 pManager->PostSem(); pManager->PostSem(); // 等待子線程執(zhí)行 sleep(1); return 0; }
CThreadManager實(shí)現(xiàn)的方法
1. 把信號(hào)量和互斥鎖等封裝成自己的函數(shù)
2. 在new方法里,循環(huán)調(diào)用CThread的new方法,啟動(dòng)一定數(shù)量(可設(shè)定)的線程,產(chǎn)生線程池。
3. 這些線程啟動(dòng)后,就會(huì)執(zhí)行CThreadManager中的ManageFuction函數(shù)。這個(gè)函數(shù)是無(wú)限循環(huán)的,保證了線程在整個(gè)程序的生命周期中不銷毀。
4. 在循環(huán)處理里面,第一行代碼就是等待一個(gè)信號(hào)量,這個(gè)信號(hào)量是由主程序進(jìn)行設(shè)置的,這個(gè)信號(hào)信號(hào)量如果沒(méi)有被設(shè)置(代表暫時(shí)沒(méi)有需要處理的工作),所有線程都在這里阻塞著。
4. 一旦信號(hào)量被設(shè)置,根據(jù)Linux線程調(diào)度機(jī)制,在阻塞的線程隊(duì)列中,其中一個(gè)線程被喚醒,可以執(zhí)行后面的代碼。
5. 從工作隊(duì)列中取出要進(jìn)行處理的數(shù)據(jù)(使用互斥鎖進(jìn)行排他)
6. 通過(guò)函數(shù)指針調(diào)用main函數(shù)傳過(guò)來(lái)的業(yè)務(wù)函數(shù),處理數(shù)據(jù)。
7. 業(yè)務(wù)函數(shù)執(zhí)行完之后,線程進(jìn)入下一個(gè)循環(huán),等待新的信號(hào)量。
class CThreadManager { friend void* ManageFuction(void*); private: sem_t m_sem; // 信號(hào)量 pthread_mutex_t m_mutex; // 互斥鎖 queue<int> m_queWork; // 工作隊(duì)列 list<CThread*> m_lstThread; // 線程list int (*m_threadFuction)(int); //函數(shù)指針,指向main函數(shù)傳過(guò)來(lái)的線程執(zhí)行函數(shù) public: CThreadManager(int (*threadFuction)(int), int nMaxThreadCnt); virtual ~CThreadManager(); int WaitSem(); int PostSem(); int LockMutex(); int UnlockMutex(); void PushWorkQue(int nWork); int PopWorkQue(); int RunThreadFunction(int nWork); }; // 線程執(zhí)行函數(shù),它只是個(gè)殼子,處理信號(hào)量和互斥鎖等, // 最后調(diào)用main函數(shù)傳過(guò)來(lái)的線程執(zhí)行函數(shù)來(lái)實(shí)現(xiàn)業(yè)務(wù)處理 void* ManageFuction(void* argv) { CThreadManager* pManager = (CThreadManager*)argv; // 進(jìn)行無(wú)限循環(huán)(意味著線程是不銷毀的,重復(fù)利用) while(true) { // 線程開(kāi)啟后,就在這里阻塞著,直到main函數(shù)設(shè)置了信號(hào)量 pManager->WaitSem(); printf("thread wakeup.\n"); // 從工作隊(duì)列中取出要處理的數(shù) pManager->LockMutex(); int nWork = pManager->PopWorkQue(); pManager->UnlockMutex(); printf("call Count function.\n"); pManager->RunThreadFunction(nWork); } return 0; } // 構(gòu)造方法 CThreadManager::CThreadManager(int (*threadFuction)(int), int nMaxThreadCnt) { sem_init(&m_sem, 0, 0); pthread_mutex_init(&m_mutex, NULL); m_threadFuction = threadFuction; for(int i=0; i<nMaxThreadCnt; i++) { CThread* pThread = new CThread(ManageFuction, this); printf("thread started.\n"); m_lstThread.push_back(pThread); } }
CThread實(shí)現(xiàn)的方法
CThreadManager比較簡(jiǎn)單,封裝了創(chuàng)建線程和join線程的函數(shù)。
CThread::CThread(void* (*threadFuction)(void*),void* threadArgv) { // 初始化線程屬性 pthread_attr_t threadAttr; pthread_attr_init(&threadAttr); pthread_create(&m_thread, &threadAttr, threadFuction, threadArgv); }
c++線程池,繼承CDoit,實(shí)現(xiàn)其中的start和end
/* * 多線程管理類 * */ #ifndef CTHREADPOOLMANAGE_H #define CTHREADPOOLMANAGE_H #include <iostream> #include <pthread.h> #include <unistd.h> #include <list> #include <vector> #include <time.h> #include <asm/errno.h> #define USLEEP_TIME 100 #define CHECK_TIME 1 using namespace std; class CDoit { public: virtual int start(void *){}; virtual int end(){}; }; class CthreadPoolManage { private: int _minThreads; //最少保留幾個(gè)線程 int _maxThreads; //最多可以有幾個(gè)線程 int _waitSec; //空閑多少秒后將線程關(guān)閉 class threadInfo{ public: threadInfo(){ isbusy = false; doFlag = true; } // pthread_mutex_t mtx=PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond=PTHREAD_COND_INITIALIZER; bool isbusy; //是否空閑 bool doFlag; // time_t beginTime; //線程不工作開(kāi)始時(shí)間 pthread_t cThreadPid; //線程id pthread_attr_t cThreadAttr; //線程屬性 CDoit * doit; //任務(wù)類 void * value; //需要傳遞的值 }; //線程函數(shù) static void* startThread(void*); //任務(wù)隊(duì)列鎖 pthread_mutex_t _duty_mutex; //任務(wù)隊(duì)列 list<threadInfo*> _dutyList; //線程隊(duì)列鎖 pthread_mutex_t _thread_mutex; //線程隊(duì)列 list<threadInfo*> _threadList; ///初始化,創(chuàng)建最小個(gè)數(shù)線程/// void initThread(); ///任務(wù)分配線程/// static void* taskAllocation(void*arg); pthread_t tasktPid; ///線程銷毀、狀態(tài)檢查線程/// static void* checkThread(void* arg); pthread_t checktPid; bool checkrun; //線程異常退出清理 static void threadCleanUp(void* arg); // int addThread(list<threadInfo*> *plist,threadInfo* ptinfo); public: CthreadPoolManage(); /* 保留的最少線程,最多線程數(shù),空閑多久銷毀,保留幾個(gè)線程的冗余 */ CthreadPoolManage(int min,int max,int waitSec); ~CthreadPoolManage(); int start(); //任務(wù)注入器 int putDuty(CDoit *,void *); int getNowThreadNum(); }; #endif // CTHREADPOOLMANAGE_H
CPP
/* * 線程池,線程管理類 * */ #include "cthreadpoolmanage.h" CthreadPoolManage::CthreadPoolManage() { _minThreads = 5; //最少保留幾個(gè)線程 _maxThreads = 5; //最多可以有幾個(gè)線程 _waitSec = 10; //空閑多少秒后將線程關(guān)閉 pthread_mutex_init(&_duty_mutex, NULL); pthread_mutex_init(&_thread_mutex, NULL); checkrun = true; } CthreadPoolManage::CthreadPoolManage(int min, int max, int waitSec) { CthreadPoolManage(); _minThreads = min; //最少保留幾個(gè)線程 _maxThreads = max; //最多可以有幾個(gè)線程 _waitSec = waitSec; //空閑多少秒后將線程關(guān)閉 } CthreadPoolManage::~CthreadPoolManage() { } void CthreadPoolManage::threadCleanUp(void* arg) { threadInfo* tinfo = (threadInfo*)arg; tinfo->isbusy = false; pthread_mutex_unlock(&tinfo->mtx); pthread_attr_destroy (&tinfo->cThreadAttr); delete tinfo; } void* CthreadPoolManage::startThread(void* arg) { cout<<"線程開(kāi)始工作"<<endl; threadInfo* tinfo = (threadInfo*)arg; pthread_cleanup_push(threadCleanUp,arg); while(tinfo->doFlag){ pthread_mutex_lock(&tinfo->mtx); if(tinfo->doit == NULL) { cout<<"開(kāi)始等待任務(wù)"<<endl; pthread_cond_wait(&tinfo->cond,&tinfo->mtx); cout<<"有任務(wù)了"<<endl; } tinfo->isbusy = true; tinfo->doit->start(tinfo->value); tinfo->doit->end(); tinfo->doit=NULL; tinfo->isbusy = false; time( &tinfo->beginTime); pthread_mutex_unlock(&tinfo->mtx); } //0正常執(zhí)行到這兒不執(zhí)行清理函數(shù),異常會(huì)執(zhí)行 pthread_cleanup_pop(0); pthread_attr_destroy (&tinfo->cThreadAttr); delete tinfo; cout<<"線程結(jié)束"<<endl; } void CthreadPoolManage::initThread() { int i = 0; for(i = 0;i<this->_minThreads;i++) { threadInfo *tinfo = new threadInfo; tinfo->doit = NULL; tinfo->value = NULL; tinfo->isbusy = false; tinfo->doFlag = true; // PTHREAD_CREATE_DETACHED (分離線程) 和 PTHREAD _CREATE_JOINABLE (非分離線程) pthread_attr_init(&tinfo->cThreadAttr); pthread_attr_setdetachstate(&tinfo->cThreadAttr,PTHREAD_CREATE_DETACHED ); cout<<"初始化了一個(gè)線程"<<endl; if(pthread_create(&tinfo->cThreadPid,&tinfo->cThreadAttr,startThread,(void *)tinfo) != 0) { cout<<"創(chuàng)建線程失敗"<<endl; break; } this->_threadList.push_back(tinfo); } } int CthreadPoolManage::addThread(std::list< CthreadPoolManage::threadInfo* >* plist, CthreadPoolManage::threadInfo* ptinfo) { threadInfo *tinfo = new threadInfo; tinfo->doit = ptinfo->doit; tinfo->value = ptinfo->value; tinfo->isbusy = true; if(pthread_create(&tinfo->cThreadPid,NULL,startThread,(void *)tinfo) != 0) { cout<<"創(chuàng)建線程失敗"<<endl; return -1; } plist->push_back(tinfo); return 0; } int CthreadPoolManage::putDuty(CDoit* doit, void* value) { threadInfo *tinfo = new threadInfo; time( &tinfo->beginTime); tinfo->doit= doit; tinfo->value = value; pthread_mutex_lock(&_duty_mutex); this->_dutyList.push_back(tinfo); pthread_mutex_unlock(&_duty_mutex); return 0; } void* CthreadPoolManage::taskAllocation(void*arg) { CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg; int size_1 = 0; int size_2 = 0; int i_1 = 0; int i_2 = 0; bool a_1 = true; bool a_2 = true; threadInfo* ptinfo; threadInfo* ptinfoTmp; while(true){ size_1 = 0; size_2 = 0; pthread_mutex_lock(&ptmanage->_duty_mutex); pthread_mutex_lock(&ptmanage->_thread_mutex); size_1 = ptmanage->_dutyList.size(); size_2 =ptmanage->_threadList.size(); for(list<threadInfo*>::iterator itorti1 = ptmanage->_dutyList.begin();itorti1 !=ptmanage->_dutyList.end();) { ptinfo = *itorti1; a_1 = true; for(list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();itorti2++){ ptinfoTmp = *itorti2; if(EBUSY == pthread_mutex_trylock(&ptinfoTmp->mtx)) { continue; } if(!ptinfoTmp->isbusy) { ptinfoTmp->doit = ptinfo->doit; ptinfoTmp->value = ptinfo->value; ptinfoTmp->isbusy = true; pthread_cond_signal(&ptinfoTmp->cond); pthread_mutex_unlock(&ptinfoTmp->mtx); a_1 = false; delete ptinfo; break; } pthread_mutex_unlock(&ptinfoTmp->mtx); } if(a_1){ if(ptmanage->_threadList.size()>ptmanage->_maxThreads||ptmanage->addThread(&ptmanage->_threadList,ptinfo)!=0) { itorti1++; continue; }else{ itorti1 = ptmanage->_dutyList.erase(itorti1); } delete ptinfo; }else{ itorti1 = ptmanage->_dutyList.erase(itorti1); } } pthread_mutex_unlock(&ptmanage->_duty_mutex); pthread_mutex_unlock(&ptmanage->_thread_mutex); usleep(USLEEP_TIME); } return 0; } void* CthreadPoolManage::checkThread(void* arg) { CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg; threadInfo* ptinfo; time_t nowtime; while(ptmanage->checkrun){ sleep(CHECK_TIME); pthread_mutex_lock(&ptmanage->_thread_mutex); if(ptmanage->_threadList.size()<=ptmanage->_minThreads) { pthread_mutex_unlock(&ptmanage->_thread_mutex); continue; } for(list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();){ ptinfo = *itorti2; if(EBUSY == pthread_mutex_trylock(&ptinfo->mtx)) { itorti2++; continue; } time(&nowtime); if(ptinfo->isbusy == false && nowtime-ptinfo->beginTime>ptmanage->_waitSec) { ptinfo->doFlag = false; itorti2 = ptmanage->_threadList.erase(itorti2); }else{ itorti2++; } pthread_mutex_unlock(&ptinfo->mtx); } pthread_mutex_unlock(&ptmanage->_thread_mutex); } } int CthreadPoolManage::start() { //初始化 this->initThread(); //啟動(dòng)任務(wù)分配線程 if(pthread_create(&tasktPid,NULL,taskAllocation,(void *)this) != 0) { cout<<"創(chuàng)建任務(wù)分配線程失敗"<<endl; return -1; } //創(chuàng)建現(xiàn)程狀態(tài)分配管理線程 if(pthread_create(&checktPid,NULL,checkThread,(void *)this) != 0) { cout<<"創(chuàng)建線程狀態(tài)分配管理線程失敗"<<endl; return -1; } return 0; } /////////////////////////////// int CthreadPoolManage::getNowThreadNum() { int num = 0; pthread_mutex_lock(&this->_thread_mutex); num = this->_threadList.size(); pthread_mutex_unlock(&this->_thread_mutex); return num ; }
- 基于C++17實(shí)現(xiàn)的手寫(xiě)線程池
- 基于C++11實(shí)現(xiàn)手寫(xiě)線程池的示例代碼
- C++ 學(xué)習(xí)筆記實(shí)戰(zhàn)寫(xiě)一個(gè)簡(jiǎn)單的線程池示例
- C++單例模式實(shí)現(xiàn)線程池的示例代碼
- C++實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池的示例代碼
- C++線程池實(shí)現(xiàn)代碼
- C/C++ 原生API實(shí)現(xiàn)線程池的方法
- C++11 簡(jiǎn)單實(shí)現(xiàn)線程池的方法
- C++實(shí)現(xiàn)線程池的簡(jiǎn)單方法示例
- 深入解析C++編程中線程池的使用
- c++線程池實(shí)現(xiàn)方法
- C++線程池實(shí)現(xiàn)
相關(guān)文章
c語(yǔ)言實(shí)現(xiàn)php的trim標(biāo)簽
本文給大家介紹的是使用C語(yǔ)言實(shí)現(xiàn)php的trim標(biāo)簽功能的代碼,非常的實(shí)用,其主要作用是清除字符串開(kāi)頭結(jié)尾除空白,有需要的小伙伴可以參考下。2016-01-01C++學(xué)校運(yùn)動(dòng)會(huì)管理系統(tǒng)的實(shí)現(xiàn)
這篇文章主要為大家詳細(xì)介紹了C++如何實(shí)現(xiàn)學(xué)校運(yùn)動(dòng)會(huì)管理系統(tǒng),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-10-10讓?xiě)?yīng)用程序只運(yùn)行一個(gè)實(shí)例的實(shí)現(xiàn)方法
我們?cè)谑褂谩?60軟件管家》時(shí)發(fā)現(xiàn),在《360軟件管家》已經(jīng)運(yùn)行了的情況下,再次點(diǎn)擊《360軟件管家》的圖標(biāo),那么它不會(huì)再運(yùn)行另外一個(gè)《360軟件管家》,而是將已有的《360軟件管家》給激活,始終只能運(yùn)行一個(gè)《360軟件管家》的實(shí)例2013-05-05QT使用QML實(shí)現(xiàn)地圖繪制虛線的示例代碼
QML提供了MapPolyline用于在地圖上繪制線段,這篇文章主要為大家詳細(xì)介紹了QT如何使用QML實(shí)現(xiàn)在地圖上繪制虛線,需要的小伙伴可以參考一下2023-07-07