欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

深入解析C++編程中線程池的使用

 更新時間:2015年11月23日 14:58:08   作者:turkeyzhou  
這篇文章主要介紹了深入解析C++編程中線程池的使用,包括線程池的封裝實現(xiàn)等內(nèi)容,需要的朋友可以參考下

為什么需要線程池
目前的大多數(shù)網(wǎng)絡(luò)服務(wù)器,包括Web服務(wù)器、Email服務(wù)器以及數(shù)據(jù)庫服務(wù)器等都具有一個共同點,就是單位時間內(nèi)必須處理數(shù)目巨大的連接請求,但處理時間卻相對較短。
傳 統(tǒng)多線程方案中我們采用的服務(wù)器模型則是一旦接受到請求之后,即創(chuàng)建一個新的線程,由該線程執(zhí)行任務(wù)。任務(wù)執(zhí)行完畢后,線程退出,這就是是“即時創(chuàng)建,即 時銷毀”的策略。盡管與創(chuàng)建進程相比,創(chuàng)建線程的時間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于 不停的創(chuàng)建線程,銷毀線程的狀態(tài)。
我們將傳統(tǒng)方案中的線程執(zhí)行過程分為三個過程:T1、T2、T3。

  1. T1:線程創(chuàng)建時間
  2. T2:線程執(zhí)行時間,包括線程的同步等時間
  3.  T3:線程銷毀時間

那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執(zhí)行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務(wù)執(zhí)行時間很頻繁的話,這筆開銷將是不可忽略的。

 
除此之外,線程池能夠減少創(chuàng)建的線程個數(shù)。通常線程池所允許的并發(fā)線程是有上界的,如果同時需要并發(fā)的線程數(shù)超過上界,那么一部分線程將會等待。而傳統(tǒng)方案中,如果同時請求數(shù)目為2000,那么最壞情況下,系統(tǒng)可能需要產(chǎn)生2000個線程。盡管這不是一個很大的數(shù)目,但是也有部分機器可能達不到這種要求。
 
因此線程池的出現(xiàn)正是著眼于減少線程池本身帶來的開銷。線程池采用預創(chuàng)建的技術(shù),在應(yīng)用程序啟動之后,將立即創(chuàng)建一定數(shù)量的線程(N1),放入空閑隊列中。這些線程都是處于阻塞(Suspended)狀態(tài),不消耗CPU,但占用較小的內(nèi)存空間。當任務(wù)到來后,緩沖池選擇一個空閑線程,把任務(wù)傳入此線程中運行。當N1個線程都在處理任務(wù)后,緩沖池自動創(chuàng)建一定數(shù)量的新線程,用于處理更多的任務(wù)。在任務(wù)執(zhí)行完畢后線程也不退出,而是繼續(xù)保持在池中等待下一次的任務(wù)。當系統(tǒng)比較空閑時,大部分線程都一直處于暫停狀態(tài),線程池自動銷毀一部分線程,回收系統(tǒng)資源。
 
基于這種預創(chuàng)建技術(shù),線程池將線程創(chuàng)建和銷毀本身所帶來的開銷分攤到了各個具體的任務(wù)上,執(zhí)行次數(shù)越多,每個任務(wù)所分擔到的線程本身開銷則越小,不過我們另外可能需要考慮進去線程之間同步所帶來的開銷

構(gòu)建線程池框架
一般線程池都必須具備下面幾個組成部分:

  • 線程池管理器:用于創(chuàng)建并管理線程池
  • 工作線程: 線程池中實際執(zhí)行的線程
  • 任務(wù)接口: 盡管線程池大多數(shù)情況下是用來支持網(wǎng)絡(luò)服務(wù)器,但是我們將線程執(zhí)行的任務(wù)抽象出來,形成任務(wù)接口,從而是的線程池與具體的任務(wù)無關(guān)。
  • 任務(wù)隊列: 線程池的概念具體到實現(xiàn)則可能是隊列,鏈表之類的數(shù)據(jù)結(jié)構(gòu),其中保存執(zhí)行線程。

我們實現(xiàn)的通用線程池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。
 

  • CJob是所有的任務(wù)的基類,其提供一個接口Run,所有的任務(wù)類都必須從該類繼承,同時實現(xiàn)Run方法。該方法中實現(xiàn)具體的任務(wù)邏輯。
  •  
  • CThread是Linux中線程的包裝,其封裝了Linux線程最經(jīng)常使用的屬性和方法,它也是一個抽象類,是所有線程類的基類,具有一個接口Run。
  •  
  • CWorkerThread是實際被調(diào)度和執(zhí)行的線程類,其從CThread繼承而來,實現(xiàn)了CThread中的Run方法。
  •  
  • CThreadPool是線程池類,其負責保存線程,釋放線程以及調(diào)度線程。
  •  
  • CThreadManage是線程池與用戶的直接接口,其屏蔽了內(nèi)部的具體實現(xiàn)。
  •  
  • CThreadMutex用于線程之間的互斥。
  •  
  • CCondition則是條件變量的封裝,用于線程之間的同步。

CThreadManage直接跟客戶端打交道,其接受需要創(chuàng)建的線程初始個數(shù),并接受客戶端提交的任務(wù)。這兒的任務(wù)是具體的非抽象的任務(wù)。CThreadManage的內(nèi)部實際上調(diào)用的都是CThreadPool的相關(guān)操作。CThreadPool創(chuàng)建具體的線程,并把客戶端提交的任務(wù)分發(fā)給CWorkerThread,CWorkerThread實際執(zhí)行具體的任務(wù)。
 
理解系統(tǒng)組件
下面我們分開來了解系統(tǒng)中的各個組件。
 
CThreadManage
CThreadManage的功能非常簡單,其提供最簡單的方法,其類定義如下:
 

class CThreadManage
{
private:
  CThreadPool*  m_Pool;
  int     m_NumOfThread;
 
protected:
 
public:
  CThreadManage();
  CThreadManage(int num);
  virtual ~CThreadManage();
 
  void   SetParallelNum(int num);  
  void  Run(CJob* job,void* jobdata);
  void  TerminateAll(void);
};

 
其中m_Pool指向?qū)嶋H的線程池;m_NumOfThread是初始創(chuàng)建時候允許創(chuàng)建的并發(fā)的線程個數(shù)。另外Run和TerminateAll方法也非常簡單,只是簡單的調(diào)用CThreadPool的一些相關(guān)方法而已。其具體的實現(xiàn)如下:
 

CThreadManage::CThreadManage()
{
  m_NumOfThread = 10;
  m_Pool = new CThreadPool(m_NumOfThread);
}
 
CThreadManage::CThreadManage(int num)
{
  m_NumOfThread = num;
  m_Pool = new CThreadPool(m_NumOfThread);
}
 
CThreadManage::~CThreadManage()
{
  if(NULL != m_Pool)
  delete m_Pool;
}
 
void CThreadManage::SetParallelNum(int num)
{
  m_NumOfThread = num;
}
 
void CThreadManage::Run(CJob* job,void* jobdata)
{
  m_Pool->Run(job,jobdata);
}
 
void CThreadManage::TerminateAll(void)
{
  m_Pool->TerminateAll();
}

 
CThread
CThread 類實現(xiàn)了對Linux中線程操作的封裝,它是所有線程的基類,也是一個抽象類,提供了一個抽象接口Run,所有的CThread都必須實現(xiàn)該Run方法。CThread的定義如下所示:
 

class CThread
{
private:
  int     m_ErrCode;
  Semaphore  m_ThreadSemaphore; //the inner semaphore, which is used to realize
  unsigned   long m_ThreadID;  
  bool     m_Detach;    //The thread is detached
  bool     m_CreateSuspended; //if suspend after creating
  char*    m_ThreadName;
  ThreadState m_ThreadState;   //the state of the thread
 
protected:
  void   SetErrcode(int errcode){m_ErrCode = errcode;}
  static void* ThreadFunction(void*);
 
public:
  CThread();
  CThread(bool createsuspended,bool detach);
  virtual ~CThread();
 
  virtual void Run(void) = 0;
  void   SetThreadState(ThreadState state){m_ThreadState = state;}
   bool   Terminate(void);  //Terminate the threa
  bool   Start(void);    //Start to execute the thread
  void   Exit(void);
  bool   Wakeup(void);
  ThreadState GetThreadState(void){return m_ThreadState;}
  int   GetLastError(void){return m_ErrCode;}
  void   SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);}
  char*  GetThreadName(void){return m_ThreadName;}
  int   GetThreadID(void){return m_ThreadID;}
  bool   SetPriority(int priority);
  int   GetPriority(void);
  int   GetConcurrency(void);
  void   SetConcurrency(int num);
  bool   Detach(void);
  bool   Join(void);
  bool   Yield(void);
  int   Self(void);
};

 
線程的狀態(tài)可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創(chuàng)建后不想立即執(zhí)行任務(wù),那么我們可以將其“暫停”,如果需要運行,則喚醒。有一點必須注意的是,一旦線程開始執(zhí)行任務(wù),將不能被掛起,其將一直執(zhí)行任務(wù)至完畢。
 
線程類的相關(guān)操作均十分簡單。線程的執(zhí)行入口是從Start()函數(shù)開始,其將調(diào)用函數(shù)ThreadFunction,ThreadFunction再調(diào)用實際的Run函數(shù),執(zhí)行實際的任務(wù)。
 
CThreadPool
CThreadPool是線程的承載容器,一般可以將其實現(xiàn)為堆棧、單向隊列或者雙向隊列。在我們的系統(tǒng)中我們使用STL Vector對線程進行保存。CThreadPool的實現(xiàn)代碼如下:
 

class CThreadPool
{
friend class CWorkerThread;
 
private:
  unsigned int m_MaxNum;  //the max thread num that can create at the same time
  unsigned int m_AvailLow; //The min num of idle thread that shoule kept
  unsigned int m_AvailHigh;  //The max num of idle thread that kept at the same time
  unsigned int m_AvailNum; //the normal thread num of idle num;
  unsigned int m_InitNum; //Normal thread num;
 
protected:
  CWorkerThread* GetIdleThread(void); 
  void  AppendToIdleList(CWorkerThread* jobthread);
  void  MoveToBusyList(CWorkerThread* idlethread);
  void  MoveToIdleList(CWorkerThread* busythread);
  void  DeleteIdleThread(int num);
  void  CreateIdleThread(int num);
 
public:
  CThreadMutex m_BusyMutex;  //when visit busy list,use m_BusyMutex to lock and unlock
  CThreadMutex m_IdleMutex;  //when visit idle list,use m_IdleMutex to lock and unlock
  CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
  CThreadMutex m_VarMutex;
  CCondition    m_BusyCond; //m_BusyCond is used to sync busy thread list
  CCondition    m_IdleCond; //m_IdleCond is used to sync idle thread list
  CCondition    m_IdleJobCond; //m_JobCond is used to sync job list
  CCondition    m_MaxNumCond;

  vector<CWorkerThread*>  m_ThreadList;
  vector<CWorkerThread*>  m_BusyList;   //Thread List
  vector<CWorkerThread*>  m_IdleList; //Idle List

  CThreadPool();
  CThreadPool(int initnum);
  virtual ~CThreadPool(); 

  void  SetMaxNum(int maxnum){m_MaxNum = maxnum;}
  int   GetMaxNum(void){return m_MaxNum;}
  void  SetAvailLowNum(int minnum){m_AvailLow = minnum;}
  int   GetAvailLowNum(void){return m_AvailLow;}
  void  SetAvailHighNum(int highnum){m_AvailHigh = highnum;}
  int   GetAvailHighNum(void){return m_AvailHigh;}
  int   GetActualAvailNum(void){return m_AvailNum;}
  int   GetAllNum(void){return m_ThreadList.size();}
  int   GetBusyNum(void){return m_BusyList.size();}
  void  SetInitNum(int initnum){m_InitNum = initnum;}
  int   GetInitNum(void){return m_InitNum;}
  void  TerminateAll(void);
  void  Run(CJob* job,void* jobdata);
};
 
 
 
CWorkerThread* CThreadPool::GetIdleThread(void)
 
{
 
  while(m_IdleList.size() ==0 )
 
  m_IdleCond.Wait();
 
  
 
  m_IdleMutex.Lock();
 
  if(m_IdleList.size() > 0 )
 
  {
 
  CWorkerThread* thr = (CWorkerThread*)m_IdleList.front();
 
  printf("Get Idle thread %d\n",thr->GetThreadID());
 
  m_IdleMutex.Unlock();
 
  return thr;
 
  }
 
  m_IdleMutex.Unlock();
 
  return NULL; 
}
  

//create num idle thread and put them to idlelist
 
void CThreadPool::CreateIdleThread(int num)
 
{
 
  for(int i=0;i<num;i++){
 
  CWorkerThread* thr = new CWorkerThread();
 
  thr->SetThreadPool(this);
 
  AppendToIdleList(thr);
 
  m_VarMutex.Lock();
 
  m_AvailNum++;
 
  m_VarMutex.Unlock();
 
  thr->Start();    //begin the thread,the thread wait for job
 
  }
 
}
 

 
void CThreadPool::Run(CJob* job,void* jobdata)
 
{
 
  assert(job!=NULL);
 
  
 
  //if the busy thread num adds to m_MaxNum,so we should wait
 
  if(GetBusyNum() == m_MaxNum)
 
    m_MaxNumCond.Wait();
 
 
 
  if(m_IdleList.size()<m_AvailLow)
 
  {
 
  if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
 
    CreateIdleThread(m_InitNum-m_IdleList.size());
 
  else
 
    CreateIdleThread(m_MaxNum-GetAllNum());
 
  }
 
 
 
  CWorkerThread* idlethr = GetIdleThread();
 
  if(idlethr !=NULL)
 
  {
 
  idlethr->m_WorkMutex.Lock();
 
  MoveToBusyList(idlethr);
 
  idlethr->SetThreadPool(this);
 
  job->SetWorkThread(idlethr);
 
  printf("Job is set to thread %d \n",idlethr->GetThreadID());
 
  idlethr->SetJob(job,jobdata);
 
  }
 
}

 
在CThreadPool中存在兩個鏈表,一個是空閑鏈表,一個是忙碌鏈表。Idle鏈表中存放所有的空閑進程,當線程執(zhí)行任務(wù)時候,其狀態(tài)變?yōu)槊β禒顟B(tài),同時從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構(gòu)造函數(shù)中,我們將執(zhí)行下面的代碼:
 

for(int i=0;i<m_InitNum;i++)
 
  {
 
  CWorkerThread* thr = new CWorkerThread();
 
  AppendToIdleList(thr);
 
  thr->SetThreadPool(this);
 
  thr->Start();    //begin the thread,the thread wait for job
 
  }

 
在該代碼中,我們將創(chuàng)建m_InitNum個線程,創(chuàng)建之后即調(diào)用AppendToIdleList放入Idle鏈表中,由于目前沒有任務(wù)分發(fā)給這些線程,因此線程執(zhí)行Start后將自己掛起。
 
事實上,線程池中容納的線程數(shù)目并不是一成不變的,其會根據(jù)執(zhí)行負載進行自動伸縮。為此在CThreadPool中設(shè)定四個變量:
 

m_InitNum:處世創(chuàng)建時線程池中的線程的個數(shù)。

m_MaxNum:當前線程池中所允許并發(fā)存在的線程的最大數(shù)目。

m_AvailLow:當前線程池中所允許存在的空閑線程的最小數(shù)目,如果空閑數(shù)目低于該值,表明負載可能過重,此時有必要增加空閑線程池的數(shù)目。實現(xiàn)中我們總是將線程調(diào)整為m_InitNum個。

m_AvailHigh:當前線程池中所允許的空閑的線程的最大數(shù)目,如果空閑數(shù)目高于該值,表明當前負載可能較輕,此時將刪除多余的空閑線程,刪除后調(diào)整數(shù)也為m_InitNum個。

m_AvailNum:目前線程池中實際存在的線程的個數(shù),其值介于m_AvailHigh和m_AvailLow之間。如果線程的個數(shù)始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要創(chuàng)建,也不需要刪除,保持平衡狀態(tài)。因此如何設(shè)定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態(tài),是線程池設(shè)計必須考慮的問題。
 
線程池在接受到新的任務(wù)之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個步驟:
 
(1)檢查當前處于忙碌狀態(tài)的線程是否達到了設(shè)定的最大值m_MaxNum,如果達到了,表明目前沒有空閑線程可用,而且也不能創(chuàng)建新的線程,因此必須等待直到有線程執(zhí)行完畢返回到空閑隊列中。
 
(2)如果當前的空閑線程數(shù)目小于我們設(shè)定的最小的空閑數(shù)目m_AvailLow,則我們必須創(chuàng)建新的線程,默認情況下,創(chuàng)建后的線程數(shù)目應(yīng)該為m_InitNum,因此創(chuàng)建的線程數(shù)目應(yīng)該為( 當前空閑線程數(shù)與m_InitNum);但是有一種特殊情況必須考慮,就是現(xiàn)有的線程總數(shù)加上創(chuàng)建后的線程數(shù)可能超過m_MaxNum,因此我們必須對線程的創(chuàng)建區(qū)別對待。
 

  if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
 
    CreateIdleThread(m_InitNum-m_IdleList.size());
 
  else
 
    CreateIdleThread(m_MaxNum-GetAllNum());

 
如果創(chuàng)建后總數(shù)不超過m_MaxNum,則創(chuàng)建后的線程為m_InitNum;如果超過了,則只創(chuàng)建( m_MaxNum-當前線程總數(shù) )個。
 
(3)調(diào)用GetIdleThread方法查找空閑線程。如果當前沒有空閑線程,則掛起;否則將任務(wù)指派給該線程,同時將其移入忙碌隊列。
 
當線程執(zhí)行完畢后,其會調(diào)用MoveToIdleList方法移入空閑鏈表中,其中還調(diào)用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。
 
CJob
CJob類相對簡單,其封裝了任務(wù)的基本的屬性和方法,其中最重要的是Run方法,代碼如下:

class CJob
{
 
private:
 
  int   m_JobNo;    //The num was assigned to the job
 
  char*  m_JobName;   //The job name
 
  CThread *m_pWorkThread;   //The thread associated with the job
 
public:
 
  CJob( void );
 
  virtual ~CJob();  
 
  int   GetJobNo(void) const { return m_JobNo; }
 
  void   SetJobNo(int jobno){ m_JobNo = jobno;}
 
  char*  GetJobName(void) const { return m_JobName; }
 
  void   SetJobName(char* jobname);
 
  CThread *GetWorkThread(void){ return m_pWorkThread; }
 
  void   SetWorkThread ( CThread *pWorkThread ){
 
    m_pWorkThread = pWorkThread;
 
  }
 
  virtual void Run ( void *ptr ) = 0;
 
}; 

 
線程池使用示例
至此我們給出了一個簡單的與具體任務(wù)無關(guān)的線程池框架。使用該框架非常的簡單,我們所需要的做的就是派生CJob類,將需要完成的任務(wù)實現(xiàn)在Run方法中。然后將該Job交由CThreadManage去執(zhí)行。下面我們給出一個簡單的示例程序
 

class CXJob:public CJob
 {
 
public:
 
  CXJob(){i=0;}
 
  ~CXJob(){}
 
  void Run(void* jobdata)  {
 
    printf("The Job comes from CXJOB\n");
 
    sleep(2);
 
  }
 
};
 
 
 
class CYJob:public CJob
 
{
 
public:
 
  CYJob(){i=0;}
 
  ~CYJob(){}
 
  void Run(void* jobdata)  {
 
    printf("The Job comes from CYJob\n");
 
  }
 
};
 
 
 
main()
 
{
 
  CThreadManage* manage = new CThreadManage(10);
 
  for(int i=0;i<40;i++)
 
  {
 
    CXJob*  job = new CXJob();
 
    manage->Run(job,NULL);
 
  }
 
  sleep(2);
 
  CYJob* job = new CYJob();
 
  manage->Run(job,NULL);
 
  manage->TerminateAll();
 
} 

CXJob和CYJob都是從Job類繼承而來,其都實現(xiàn)了Run接口。CXJob只是簡單的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒鐘。在主程序中我們初始創(chuàng)建10個工作線程。然后分別執(zhí)行40次CXJob和一次CYJob。
 

C++ 線程池的封裝實現(xiàn)
為了充分利用多核的優(yōu)勢,我們利用多線程來進行任務(wù)處理,但線程也同樣不能濫用,會帶來一下幾個問題:
1)線程本身存在開銷,系統(tǒng)必須為每個線程分配如棧,TLS(線程局部存儲),寄存器等。
2)線程管理會給系統(tǒng)帶來開銷,context切換同樣會給系統(tǒng)帶來成本。
3)線程本身是可以重用的資源,不需要每次都進行初始化。

所以往往在使用中,我們無需把線程與task任務(wù)進行一對一對應(yīng),只需要預先初始化有限的線程個數(shù)來處理無限的task任務(wù)即可,線程池應(yīng)運而生,原理也就是如此。

20151123145708426.png (508×534)

主要含有三個隊列

  1. 工作隊列
  2. 工作線程隊列
  3. 忙碌線程隊列

工作隊列是一個阻塞隊列,任務(wù)(仿函數(shù))任務(wù)不算被push進來(notify阻塞獲取的工作線程),工作線程隊列(一直不變)則從該隊列中獲取任務(wù)執(zhí)行(wait獲取,當任務(wù)隊列為空時阻塞等待通知),如果獲取到任務(wù),則將線程會進入忙碌線程隊列中,執(zhí)行任務(wù)的仿函數(shù),當工作完成,重新移出工作線程隊列。


定義線程池專屬異常:

struct TC_ThreadPool_Exception : public TC_Exception
{
  TC_ThreadPool_Exception(const string &buffer) : TC_Exception(buffer){};
  TC_ThreadPool_Exception(const string &buffer, int err) : TC_Exception(buffer, err){};
  ~TC_ThreadPool_Exception () throw (){};
};


/**
 * @brief 用通線程池類, 與tc_functor, tc_functorwrapper配合使用.
 * 
 * 使用方式說明:
 * 1 采用tc_functorwrapper封裝一個調(diào)用
 * 2 用tc_threadpool對調(diào)用進行執(zhí)行
 * 具體示例代碼請參見:test/test_tc_thread_pool.cpp
 */

/**線程池本身繼承自鎖,可以幫助鎖定**/
class TC_ThreadPool : public TC_ThreadLock
{
public:

  /**
   * @brief 構(gòu)造函數(shù)
   *
   */
  TC_ThreadPool ();

  /**
   * @brief 析構(gòu), 會停止所有線程
   */
  ~TC_ThreadPool ();

  /**
   * @brief 初始化.
   * 
   * @param num 工作線程個數(shù)
   */
  void init(size_t num);

  /**
   * @brief 獲取線程個數(shù).
   *
   * @return size_t 線程個數(shù)
   */
  size_t getThreadNum()  { Lock sync(* this); return _jobthread. size(); }

  /**
   * @brief 獲取線程池的任務(wù)數(shù)( exec添加進去的).
   *
   * @return size_t 線程池的任務(wù)數(shù)
   */
  size_t getJobNum()   { return _jobqueue. size(); }

  /**
   * @brief 停止所有線程
   */
  void stop();

  /**
   * @brief 啟動所有線程
   */
  void start();

  /**
   * @brief 啟動所有線程并, 執(zhí)行初始化對象.
   * 
   * @param ParentFunctor
   * @param tf
   */
  template<class ParentFunctor>
  void start(const TC_FunctorWrapper< ParentFunctor> &tf)
  {
    for(size_t i = 0; i < _jobthread .size(); i++)
    {
      _startqueue. push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
    }

    start();
  }

  /**
   * @brief 添加對象到線程池執(zhí)行,該函數(shù)馬上返回,
   *   線程池的線程執(zhí)行對象
   */
  template<class ParentFunctor>
   void exec(const TC_FunctorWrapper< ParentFunctor> &tf)
  {
    _jobqueue.push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
  }

  /**
   * @brief 等待所有工作全部結(jié)束(隊列無任務(wù), 無空閑線程).
   *
   * @param millsecond 等待的時間( ms), -1:永遠等待
   * @return      true, 所有工作都處理完畢
   *            false,超時退出
   */
  bool waitForAllDone(int millsecond = -1);

public:

  /**
   * @brief 線程數(shù)據(jù)基類,所有線程的私有數(shù)據(jù)繼承于該類
   */
  class ThreadData
  {
  public:
    /**
     * @brief 構(gòu)造
     */
    ThreadData(){};
    /**
     * @brief 析夠
     */
    virtual ~ThreadData(){};

    /**
      * @brief 生成數(shù)據(jù).
      * 
      * @ param T
     * @return ThreadData*
     */
    template<typename T>
    static T* makeThreadData()
    {
      return new T;
    }
  };

  /**
   * @brief 設(shè)置線程數(shù)據(jù).
   * 
   * @param p 線程數(shù)據(jù)
   */
  static void setThreadData(ThreadData *p);

  /**
   * @brief 獲取線程數(shù)據(jù).
   *
   * @return ThreadData* 線程數(shù)據(jù)
   */
  static ThreadData* getThreadData();

  /**
   * @brief 設(shè)置線程數(shù)據(jù), key需要自己維護.
   * 
   * @param pkey 線程私有數(shù)據(jù)key
   * @param p  線程指針
   */
  static void setThreadData(pthread_key_t pkey, ThreadData *p);

  /**
   * @brief 獲取線程數(shù)據(jù), key需要自己維護.
   * 
   * @param pkey 線程私有數(shù)據(jù)key
   * @return   指向線程的ThreadData*指針
   */
  static ThreadData* getThreadData(pthread_key_t pkey);

protected:

  /**
   * @brief 釋放資源.
   * 
   * @param p
   */
  static void destructor(void *p);

  /**
   * @brief 初始化key
   */
  class KeyInitialize
  {
  public:
    /**
     * @brief 初始化key
     */
    KeyInitialize()
    {
      int ret = pthread_key_create(&TC_ThreadPool::g_key, TC_ThreadPool::destructor);
      if(ret != 0)
      {
        throw TC_ThreadPool_Exception("[TC_ThreadPool::KeyInitialize] pthread_key_create error", ret);
      }
    }

    /**
     * @brief 釋放key
     */
    ~KeyInitialize()
    {
      pthread_key_delete(TC_ThreadPool::g_key);
    }
  };

  /**
   * @brief 初始化key的控制
   */
  static KeyInitialize g_key_initialize;

  /**
   * @brief 數(shù)據(jù)key
   */
  static pthread_key_t g_key;

protected:
  /**
   * @brief 線程池中的工作線程
   */
  class ThreadWorker : public TC_Thread
  {
  public:
    /**
      * @brief 工作線程構(gòu)造函數(shù).
      * 
     * @ param tpool
     */
    ThreadWorker(TC_ThreadPool *tpool);

    /**
     * @brief 通知工作線程結(jié)束
     */
    void terminate();

  protected:
    /**
     * @brief 運行
     */
    virtual void run();

  protected:
    /**
     * 線程池指針
     */
    TC_ThreadPool  * _tpool;

    /**
     * 是否結(jié)束線程
     */
    bool      _bTerminate;
  };

protected:

  /**
   * @brief 清除
   */
  void clear();

  /**
   * @brief 獲取任務(wù), 如果沒有任務(wù), 則為NULL.
   *
   * @return TC_FunctorWrapperInterface*
   */
  TC_FunctorWrapperInterface * get(ThreadWorker *ptw);

  /**
   * @brief 獲取啟動任務(wù).
   *
   * @return TC_FunctorWrapperInterface*
   */
  TC_FunctorWrapperInterface * get();

  /**
   * @brief 空閑了一個線程.
   * 
   * @param ptw
   */
  void idle(ThreadWorker *ptw);

  /**
   * @brief 通知等待在任務(wù)隊列上的工作線程醒來
   */
  void notifyT();

  /**
   * @brief 是否處理結(jié)束.
   *
   * @return bool
   */
  bool finish();

  /**
   * @brief 線程退出時調(diào)用
   */
  void exit();

  friend class ThreadWorker;
protected:

  /**
   * 任務(wù)隊列
   */
  TC_ThreadQueue< TC_FunctorWrapperInterface*> _jobqueue;

  /**
   * 啟動任務(wù)
   */
  TC_ThreadQueue< TC_FunctorWrapperInterface*> _startqueue;

  /**
   * 工作線程
   */
  std::vector<ThreadWorker *>         _jobthread;

  /**
   * 繁忙線程
   */
  std::set<ThreadWorker *>           _busthread;

  /**
   * 任務(wù)隊列的鎖
   */
  TC_ThreadLock                _tmutex;

   /**
   * 是否所有任務(wù)都執(zhí)行完畢
   */
   bool                    _bAllDone;
};

工作線程設(shè)計如下:

TC_ThreadPool ::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool)
: _tpool (tpool)
, _bTerminate ( false)
{
}

void TC_ThreadPool ::ThreadWorker::terminate()
{
  _bTerminate = true;
  _tpool->notifyT();
}

void TC_ThreadPool ::ThreadWorker::run()
{
  //調(diào)用初始化部分
  TC_FunctorWrapperInterface *pst = _tpool->get();
  if(pst)
  {
    try
    {
      (*pst)();
    }
    catch ( ... )
    {
    }
    delete pst;
    pst = NULL;
  }

  //調(diào)用處理部分
  while (! _bTerminate)
  {
    TC_FunctorWrapperInterface *pfw = _tpool->get( this);
    if(pfw != NULL)
    {
      auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);

      try
      {
        (*pfw)();
      }
      catch ( ... )
      {
      }

      _tpool->idle( this);
    }
  }

  //結(jié)束
  _tpool->exit();
}

每個工作線程在剛開始時都會執(zhí)行一下初始化操作,并進入一個無限循環(huán)的部分//調(diào)用處理部分
  while (! _bTerminate)
  {
    TC_FunctorWrapperInterface *pfw = _tpool->get( this);
    if(pfw != NULL)
    {
      auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);

      try
      {
        (*pfw)();
      }
      catch ( ... )
      {
      }

      _tpool->idle( this);
    }
  }

該工作主要是無限的從線程池的工作隊列中獲取任務(wù)并執(zhí)行,如果成功獲取任務(wù),則會將線程移進忙碌隊列:

TC_FunctorWrapperInterface *TC_ThreadPool:: get(ThreadWorker *ptw)
{

  TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
  if(! _jobqueue. pop_front(pFunctorWrapper, 1000))
  {
    return NULL;
  }

   {
      Lock sync( _tmutex);
     _busthread. insert(ptw);
  }
  return pFunctorWrapper;
}

執(zhí)行完,移回工作線程隊列:_tpool->idle( this);

void TC_ThreadPool:: idle(ThreadWorker *ptw)
{
  Lock sync( _tmutex);
  _busthread. erase(ptw);

  //無繁忙線程, 通知等待在線程池結(jié)束的線程醒過來
  if( _busthread. empty())
  {
    _bAllDone = true;
    _tmutex.notifyAll();
  }
}


此處jobThread隊列初始化后不會改變(因為沒有實現(xiàn)自增長功能),所以非線程安全的vector隊列即可,busthread的忙碌線程隊列會被移進移出,但是操作會自帶Lock sync( _tmutex),該互斥量是線程池本身繼承的,所以是共有的,也無需另外使用線程安全的TC_ThreadQueue,使用vector即可。

TC_ThreadPool:: idle中的

  if( _busthread. empty())
  {
    _bAllDone = true;
    _tmutex.notifyAll();
  }

主要用于當線程池工作起來后的waitForAllDone方法:

bool TC_ThreadPool:: waitForAllDone( int millsecond)
{
  Lock sync( _tmutex);

start1:
  //任務(wù)隊列和繁忙線程都是空的
  if (finish())
  {
    return true;
  }

  //永遠等待
  if(millsecond < 0)
  {
    _tmutex.timedWait(1000);
    goto start1;
  }

  int64_t iNow = TC_Common:: now2ms();
  int m    = millsecond;
start2:

  bool b = _tmutex.timedWait(millsecond);
  //完成處理了
  if(finish())
  {
    return true;
  }

  if(!b)
  {
    return false;
  }

  millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));
  goto start2;

  return false;
}

_tmutex.timedWait(millsecond)方法喚醒。反復判斷是否所有的工作是否完成:

bool TC_ThreadPool:: finish()
{
  return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;
}


整體cpp實現(xiàn)如下:

TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize;
pthread_key_t TC_ThreadPool::g_key ;

void TC_ThreadPool::destructor( void *p)
{
  ThreadData *ttd = ( ThreadData*)p;
  if(ttd)
  {
    delete ttd;
  }
}

void TC_ThreadPool::exit()
{
  TC_ThreadPool:: ThreadData *p = getThreadData();
  if(p)
  {
    delete p;
    int ret = pthread_setspecific( g_key, NULL );
    if(ret != 0)
    {
      throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
    }
  }

  _jobqueue. clear();
}

void TC_ThreadPool::setThreadData( TC_ThreadPool:: ThreadData *p)
{
  TC_ThreadPool:: ThreadData *pOld = getThreadData();
  if(pOld != NULL && pOld != p)
  {
    delete pOld;
  }

  int ret = pthread_setspecific( g_key, ( void *)p);
  if(ret != 0)
  {
    throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
  }
}

TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData ()
{
  return ( ThreadData *) pthread_getspecific( g_key);
}

void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p)
{
  TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey);
  if(pOld != NULL && pOld != p)
  {
    delete pOld;
  }

  int ret = pthread_setspecific(pkey, ( void *)p);
  if(ret != 0)
  {
    throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
  }
}

TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData( pthread_key_t pkey)
{
  return ( ThreadData *) pthread_getspecific(pkey);
}

TC_ThreadPool::TC_ThreadPool()
: _bAllDone ( true)
{
}

TC_ThreadPool::~TC_ThreadPool()
{
  stop();
  clear();
}

void TC_ThreadPool::clear()
{
  std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
  while(it != _jobthread. end())
  {
    delete (*it);
    ++it;
  }

  _jobthread. clear();
  _busthread. clear();
}

void TC_ThreadPool::init( size_t num)
{
  stop();

  Lock sync(* this);

  clear();

  for( size_t i = 0; i < num; i++)
  {
    _jobthread. push_back( new ThreadWorker( this));
  }
}

void TC_ThreadPool::stop()
{
  Lock sync(* this);

  std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
  while(it != _jobthread. end())
  {
    if ((*it)-> isAlive())
    {
      (*it)-> terminate();
      (*it)-> getThreadControl().join ();
    }
    ++it;
  }
  _bAllDone = true;
}

void TC_ThreadPool::start()
{
  Lock sync(* this);

  std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
  while(it != _jobthread. end())
  {
    (*it)-> start();
    ++it;
  }
  _bAllDone = false;
}

bool TC_ThreadPool:: finish()
{
  return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;
}

bool TC_ThreadPool::waitForAllDone( int millsecond)
{
  Lock sync( _tmutex);

start1:
  //任務(wù)隊列和繁忙線程都是空的
  if (finish ())
  {
    return true;
  }

  //永遠等待
  if(millsecond < 0)
  {
    _tmutex.timedWait(1000);
    goto start1;
  }

  int64_t iNow = TC_Common:: now2ms();
  int m    = millsecond;
start2:

  bool b = _tmutex.timedWait(millsecond);
  //完成處理了
  if(finish ())
  {
    return true;
  }

  if(!b)
  {
    return false;
  }

  millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));
  goto start2;

  return false;
}

TC_FunctorWrapperInterface *TC_ThreadPool::get( ThreadWorker *ptw)
{

  TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
  if(! _jobqueue. pop_front(pFunctorWrapper, 1000))
  {
    return NULL;
  }

   {
      Lock sync( _tmutex);
     _busthread. insert(ptw);
  }
  return pFunctorWrapper;
}

TC_FunctorWrapperInterface *TC_ThreadPool::get()
{
  TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
  if(! _startqueue. pop_front(pFunctorWrapper))
  {
    return NULL;
  }

  return pFunctorWrapper;
}

void TC_ThreadPool::idle( ThreadWorker *ptw)
{
  Lock sync( _tmutex);
  _busthread. erase(ptw);

  //無繁忙線程, 通知等待在線程池結(jié)束的線程醒過來
  if( _busthread. empty())
  {
      _bAllDone = true;
    _tmutex.notifyAll();
  }
}

void TC_ThreadPool::notifyT()
{
  _jobqueue. notifyT();
}

線程池使用后記
線程池適合場合
事 實上,線程池并不是萬能的。它有其特定的使用場合。線程池致力于減少線程本身的開銷對應(yīng)用所產(chǎn)生的影響,這是有前提的,前提就是線程本身開銷與線程執(zhí)行任 務(wù)相比不可忽略。如果線程本身的開銷相對于線程任務(wù)執(zhí)行開銷而言是可以忽略不計的,那么此時線程池所帶來的好處是不明顯的,比如對于FTP服務(wù)器以及Telnet服務(wù)器,通常傳送文件的時間較長,開銷較大,那么此時,我們采用線程池未必是理想的方法,我們可以選擇“即時創(chuàng)建,即時銷毀”的策略。
 總之線程池通常適合下面的幾個場合:
 
(1)  單位時間內(nèi)處理任務(wù)頻繁而且任務(wù)處理時間短
 
(2)  對實時性要求較高。如果接受到任務(wù)后在創(chuàng)建線程,可能滿足不了實時要求,因此必須采用線程池進行預創(chuàng)建。
 
(3)  必須經(jīng)常面對高突發(fā)性事件,比如Web服務(wù)器,如果有足球轉(zhuǎn)播,則服務(wù)器將產(chǎn)生巨大的沖擊。此時如果采取傳統(tǒng)方法,則必須不停的大量產(chǎn)生線程,銷毀線程。此時采用動態(tài)線程池可以避免這種情況的發(fā)生。

相關(guān)文章

  • win10環(huán)境下vscode Linux C++開發(fā)代碼自動提示配置(基于WSL)

    win10環(huán)境下vscode Linux C++開發(fā)代碼自動提示配置(基于WSL)

    這篇文章主要介紹了win10環(huán)境下vscode Linux C++開發(fā)代碼自動提示配置(基于WSL),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-05-05
  • 獲取一個文件行數(shù)的方法

    獲取一個文件行數(shù)的方法

    獲取一個文件行數(shù)的方法,需要的朋友可以參考一下
    2013-03-03
  • C++的sstream標準庫詳細介紹

    C++的sstream標準庫詳細介紹

    以下是對C++中的的sstream標準庫進行了詳細的介紹,需要的朋友可以過來參考下
    2013-09-09
  • Qt QMessageBox類使用教程

    Qt QMessageBox類使用教程

    QMessageBox類提供一個模態(tài)對話框,用于通知用戶或詢問用戶一個問題并接收答案。這篇文章主要介紹了QMessageBox的一些常用用法,需要的小伙伴快來學習一下
    2021-12-12
  • C++設(shè)計模式之裝飾模式

    C++設(shè)計模式之裝飾模式

    這篇文章主要介紹了C++設(shè)計模式之裝飾模式,裝飾模式能夠?qū)崿F(xiàn)動態(tài)的為對象添加功能,是從一個對象外部來給對象添加功能,需要的朋友可以參考下
    2014-10-10
  • c語言中的文件加密與解密

    c語言中的文件加密與解密

    這篇文章主要介紹了c語言中的文件加密與解密方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • C語言分支循環(huán)其嵌套語句的使用

    C語言分支循環(huán)其嵌套語句的使用

    本文主要介紹了switch 嵌套和循環(huán)嵌套,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • C語言中的一維數(shù)組與二維數(shù)組的實現(xiàn)

    C語言中的一維數(shù)組與二維數(shù)組的實現(xiàn)

    數(shù)組可以幫我們巧妙解決生活中的問題,使我們的代碼簡潔,本文主要介紹了C語言中的一維數(shù)組與二維數(shù)組,具有一定的參考價值,感興趣的可以了解一下
    2023-12-12
  • C++while和do-while語句求和詳解

    C++while和do-while語句求和詳解

    對于C語言中的while與do-while,相信很多都再熟悉不過了,最近在工作中就用到了,所以想著總結(jié)一下,方便自己或者有需要的朋友們參考借鑒,文中通過示例代碼介紹的很詳細,感興趣的朋友們下面來一起學習學習吧
    2021-08-08
  • C++ 數(shù)字的反轉(zhuǎn)實現(xiàn)實例

    C++ 數(shù)字的反轉(zhuǎn)實現(xiàn)實例

    這篇文章主要介紹了C++ 數(shù)字的反轉(zhuǎn)實現(xiàn)實例的相關(guān)資料,需要的朋友可以參考下
    2017-06-06

最新評論