欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C語(yǔ)言實(shí)現(xiàn)支持動(dòng)態(tài)拓展和銷毀的線程池

 更新時(shí)間:2016年01月14日 10:31:48   投稿:lijiao  
這篇文章主要為大家介紹了C語(yǔ)言實(shí)現(xiàn)支持動(dòng)態(tài)拓展和銷毀的線程池,感興趣的小伙伴們可以參考一下

本文實(shí)例介紹了C 語(yǔ)言實(shí)現(xiàn)線程池,支持動(dòng)態(tài)拓展和銷毀,分享給大家供大家參考,具體內(nèi)容如下

實(shí)現(xiàn)功能

  • 1.初始化指定個(gè)數(shù)的線程
  • 2.使用鏈表來(lái)管理任務(wù)隊(duì)列
  • 3.支持拓展動(dòng)態(tài)線程
  • 4.如果閑置線程過(guò)多,動(dòng)態(tài)銷毀部分線程
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <signal.h>
 

/*線程的任務(wù)隊(duì)列由,函數(shù)和參數(shù)組成,任務(wù)由鏈表來(lái)進(jìn)行管理*/
typedef struct thread_worker_s{
  void *(*process)(void *arg); //處理函數(shù)
  void *arg;          //參數(shù)
  struct thread_worker_s *next;
}thread_worker_t;
 
#define bool int
#define true 1
#define false 0
 
/*線程池中各線程狀態(tài)描述*/
#define THREAD_STATE_RUN        0
#define THREAD_STATE_TASK_WAITING   1
#define THREAD_STATE_TASK_PROCESSING  2
#define THREAD_STATE_TASK_FINISHED   3
#define THREAD_STATE_EXIT       4   
 
 
typedef struct thread_info_s{
  pthread_t id;
  int    state; 
  struct thread_info_s *next;
}thread_info_t;
 
static char* thread_state_map[] ={"創(chuàng)建","等待任務(wù)","處理中","處理完成","已退出"};
/*線程壓縮的時(shí)候只有 0,1,2,4 狀態(tài)的線程可以銷毀*/
 
 
/*線程池管理器*/
#define THREAD_BUSY_PERCENT 0.5  /*線程:任務(wù) = 1:2 值越小,說(shuō)明任務(wù)多,增加線程*/
#define THREAD_IDLE_PERCENT 2   /*線程:任務(wù) = 2:1 值大于1,線程多于任務(wù),銷毀部分線程*/
 
typedef struct thread_pool_s{
  pthread_mutex_t queue_lock ; //隊(duì)列互斥鎖,即涉及到隊(duì)列修改時(shí)需要加鎖
  pthread_cond_t queue_ready; //隊(duì)列條件鎖,隊(duì)列滿足某個(gè)條件,觸發(fā)等待這個(gè)條件的線程繼續(xù)執(zhí)行,比如說(shuō)隊(duì)列滿了,隊(duì)列空了
 
  thread_worker_t *head   ;    //任務(wù)隊(duì)列頭指針
  bool    is_destroy   ;    //線程池是否已經(jīng)銷毀
  int num;              //線程的個(gè)數(shù)
  int rnum;         ;    //正在跑的線程
  int knum;         ;    //已殺死的線程
  int queue_size       ;    //工作隊(duì)列的大小 
  thread_info_t *threads   ;    //線程組id,通過(guò)pthread_join(thread_ids[0],NULL) 來(lái)執(zhí)行線程
  pthread_t   display   ;    //打印線程
  pthread_t   destroy   ;    //定期銷毀線程的線程id
  pthread_t   extend    ;
  float percent       ;    //線程個(gè)數(shù)于任務(wù)的比例 rnum/queue_size
  int  init_num      ;
  pthread_cond_t  extend_ready     ;    //如果要增加線程
}thread_pool_t;
 
/*-------------------------函數(shù)聲明----------------------*/
/**
 * 1.初始化互斥變量
 * 2.初始化等待變量
 * 3.創(chuàng)建指定個(gè)數(shù)的線程線程
 */
thread_pool_t* thread_pool_create(int num);
void *thread_excute_route(void *arg);
 
 
/*調(diào)試函數(shù)*/
void debug(char *message,int flag){
  if(flag)
    printf("%s\n",message);
}
 
void *display_thread(void *arg);
/**
 * 添加任務(wù)包括以下幾個(gè)操作
 * 1.將任務(wù)添加到隊(duì)列末尾
 * 2.通知等待進(jìn)程來(lái)處理這個(gè)任務(wù) pthread_cond_singal();
*/
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //網(wǎng)線程池的隊(duì)列中增加一個(gè)需要執(zhí)行的函數(shù),也就是任務(wù)
 
/**
 * 銷毀線程池,包括以下幾個(gè)部分
 * 1.通知所有等待的進(jìn)程 pthread_cond_broadcase
 * 2.等待所有的線程執(zhí)行完
 * 3.銷毀任務(wù)列表
 * 4.釋放鎖,釋放條件
 * 4.銷毀線程池對(duì)象
 */
 
 
 
void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);
 
 
thread_pool_t *thread_pool_create(int num){
  if(num<1){
    return NULL;
  }
  thread_pool_t *p;
  p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
  if(p==NULL)
    return NULL;
  p->init_num = num;
  /*初始化互斥變量與條件變量*/
  pthread_mutex_init(&(p->queue_lock),NULL);
  pthread_cond_init(&(p->queue_ready),NULL);
 
  /*設(shè)置線程個(gè)數(shù)*/
  p->num  = num; 
  p->rnum = num;
  p->knum = 0;
 
  p->head = NULL;
  p->queue_size =0; 
  p->is_destroy = false;
 
   
  int i=0;
  thread_info_t *tmp=NULL;
  for(i=0;i<num;i++){
    /*創(chuàng)建線程*/
    tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
    if(tmp==NULL){
      free(p);
      return NULL;
    }else{
      tmp->next = p->threads;
      p->threads = tmp;
    }
    pthread_create(&(tmp->id),NULL,thread_excute_route,p);
    tmp->state = THREAD_STATE_RUN;
  }
 
  /*顯示*/
  pthread_create(&(p->display),NULL,display_thread,p);
  /*檢測(cè)是否需要?jiǎng)討B(tài)線程*/
  //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
  /*動(dòng)態(tài)銷毀*/
  pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
  return p;
}
 
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
  thread_pool_t *p= pool;
  thread_worker_t *worker=NULL,*member=NULL;
  worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));
  int incr=0;
  if(worker==NULL){
    return -1;
  }
  worker->process = process;
  worker->arg   = arg;
  worker->next  = NULL;
  thread_pool_is_need_extend(pool);
  pthread_mutex_lock(&(p->queue_lock));
  member = p->head;
  if(member!=NULL){
    while(member->next!=NULL)
      member = member->next;
    member->next = worker;
  }else{
    p->head = worker;
  }
  p->queue_size ++;
  pthread_mutex_unlock(&(p->queue_lock));
  pthread_cond_signal(&(p->queue_ready));
  return 1;
}
 
 
void thread_pool_wait(thread_pool_t *pool){
  thread_info_t *thread;
  int i=0;
  for(i=0;i<pool->num;i++){
    thread = (thread_info_t*)(pool->threads+i);
    thread->state = THREAD_STATE_EXIT;
    pthread_join(thread->id,NULL);
  }
}
void thread_pool_destory(thread_pool_t *pool){
  thread_pool_t  *p   = pool;
  thread_worker_t *member = NULL;
 
  if(p->is_destroy)
    return ;
  p->is_destroy = true;
  pthread_cond_broadcast(&(p->queue_ready));
  thread_pool_wait(pool);
  free(p->threads);
  p->threads = NULL;
  /*銷毀任務(wù)列表*/
  while(p->head){
    member = p->head;
    p->head = member->next;
    free(member);
  }
  /*銷毀線程列表*/
  thread_info_t *tmp=NULL;
  while(p->threads){
    tmp = p->threads;
    p->threads = tmp->next;
    free(tmp);
  }
 
  pthread_mutex_destroy(&(p->queue_lock));
  pthread_cond_destroy(&(p->queue_ready));
  return ;
}
/*通過(guò)線程id,找到對(duì)應(yīng)的線程*/
thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
  thread_info_t *thread=NULL;
  thread_info_t *p=pool->threads;
  while(p!=NULL){
    if(p->id==id)
      return p;
    p = p->next;
  }
  return NULL;
}
 
 
/*每個(gè)線程入口函數(shù)*/
void *thread_excute_route(void *arg){
  thread_worker_t *worker = NULL;
  thread_info_t  *thread = NULL; 
  thread_pool_t*  p = (thread_pool_t*)arg;
  //printf("thread %lld create success\n",pthread_self());
  while(1){
    pthread_mutex_lock(&(p->queue_lock));
 
    /*獲取當(dāng)前線程的id*/
    pthread_t pthread_id = pthread_self();
    /*設(shè)置當(dāng)前狀態(tài)*/
    thread = get_thread_by_id(p,pthread_id);
 
    /*線程池被銷毀,并且沒(méi)有任務(wù)了*/
    if(p->is_destroy==true && p->queue_size ==0){
      pthread_mutex_unlock(&(p->queue_lock));
      thread->state = THREAD_STATE_EXIT;
      p->knum ++;
      p->rnum --;
      pthread_exit(NULL);
    }
    if(thread){
      thread->state = THREAD_STATE_TASK_WAITING; /*線程正在等待任務(wù)*/
    }
    /*線程池沒(méi)有被銷毀,沒(méi)有任務(wù)到來(lái)就一直等待*/
    while(p->queue_size==0 && !p->is_destroy){
      pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
    }
    p->queue_size--;
    worker = p->head;
    p->head = worker->next;
    pthread_mutex_unlock(&(p->queue_lock));
     
 
    if(thread)
      thread->state = THREAD_STATE_TASK_PROCESSING; /*線程正在執(zhí)行任務(wù)*/
    (*(worker->process))(worker->arg);
    if(thread)
      thread->state = THREAD_STATE_TASK_FINISHED;  /*任務(wù)執(zhí)行完成*/
    free(worker);
    worker = NULL;
  }
}
 
 
 
/*拓展線程*/
void *thread_pool_is_need_extend(void *arg){
  thread_pool_t *p = (thread_pool_t *)arg;
  thread_pool_t *pool = p;
  /*判斷是否需要增加線程,最終目的 線程:任務(wù)=1:2*/
  if(p->queue_size>100){
    int incr =0;
    if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
      incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*計(jì)算需要增加線程個(gè)數(shù)*/
      int i=0;
      thread_info_t *tmp=NULL;
      thread_pool_t *p = pool;
      pthread_mutex_lock(&pool->queue_lock);
      if(p->queue_size<100){
        pthread_mutex_unlock(&pool->queue_lock);
        return ;
      }
      for(i=0;i<incr;i++){
        /*創(chuàng)建線程*/
        tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
        if(tmp==NULL){
          continue;
        }else{
          tmp->next = p->threads;
          p->threads = tmp;
        }
        p->num ++;
        p->rnum ++;
        pthread_create(&(tmp->id),NULL,thread_excute_route,p);
        tmp->state = THREAD_STATE_RUN;
      }
      pthread_mutex_unlock(&pool->queue_lock);
    }
  }
  //pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;
/*恢復(fù)初始線程個(gè)數(shù)*/
void *thread_pool_is_need_recovery(void *arg){
  thread_pool_t *pool = (thread_pool_t *)arg;
  int i=0;
  thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
  /*如果沒(méi)有任務(wù)了,當(dāng)前線程大于初始化的線程個(gè)數(shù)*/
  while(1){
    i=0;
    if(pool->queue_size==0 && pool->rnum > pool->init_num ){
      sleep(5);
      /*5s秒內(nèi)還是這個(gè)狀態(tài)的話就,銷毀部分線程*/
      if(pool->queue_size==0 && pool->rnum > pool->init_num ){
        pthread_mutex_lock(&pool->queue_lock);
        tmp = pool->threads;
        while((pool->rnum != pool->init_num) && tmp){
          /*找到空閑的線程*/
          if(tmp->state != THREAD_STATE_TASK_PROCESSING){
            i++;
            if(prev)
              prev->next  = tmp->next;
            else
              pool->threads = tmp->next;
            pool->rnum --; /*正在運(yùn)行的線程減一*/
            pool->knum ++; /*銷毀的線程加一*/
            kill(tmp->id,SIGKILL); /*銷毀線程*/
            p1 = tmp;
            tmp = tmp->next;
            free(p1);
            continue;
          }
          prev = tmp;
          tmp = tmp->next;
        }
        pthread_mutex_unlock(&pool->queue_lock);
        printf("5s內(nèi)沒(méi)有新任務(wù)銷毀部分線程,銷毀了 %d 個(gè)線程\n",i);
      }
    }
    sleep(5);
  }
}
 
 
 
/*打印一些信息的*/
void *display_thread(void *arg){
  thread_pool_t *p =(thread_pool_t *)arg;
  thread_info_t *thread = NULL;
  int i=0;
  while(1){
    printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum);  /*線程總數(shù),正在跑的,已銷毀的*/
    thread = p->threads;
    while(thread){
      printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
      thread = thread->next;
    }
    sleep(5);
  }
}

希望本文所述對(duì)大家學(xué)習(xí)C語(yǔ)言程序設(shè)計(jì)有所幫助。

相關(guān)文章

  • c/c++靜態(tài)庫(kù)之間相互調(diào)用的實(shí)戰(zhàn)案例

    c/c++靜態(tài)庫(kù)之間相互調(diào)用的實(shí)戰(zhàn)案例

    C++調(diào)用C的函數(shù)比較簡(jiǎn)單,直接使用extern "C" {}告訴編譯器用C的規(guī)則去編譯C代碼就可以了,下面這篇文章主要給大家介紹了關(guān)于c/c++靜態(tài)庫(kù)之間相互調(diào)用的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-08-08
  • C++ 實(shí)現(xiàn)2048游戲示例

    C++ 實(shí)現(xiàn)2048游戲示例

    《2048》是比較流行的一款數(shù)字游戲。原版2048首先在github上發(fā)布,原作者是Gabriele Cirulli。它是基于《1024》和《小3傳奇》的玩法開(kāi)發(fā)而成的新型數(shù)字游戲。
    2014-06-06
  • C語(yǔ)言中g(shù)etchar和putchar的使用方法詳解

    C語(yǔ)言中g(shù)etchar和putchar的使用方法詳解

    我們知道scanf函數(shù)可以從鍵盤輸入信息,而printf則可以輸出信息,同樣地,getchar和putchar也有同樣的功能,下面我來(lái)給大家介紹putchar和getchar的使用方法,需要的朋友可以參考下
    2023-08-08
  • C語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單計(jì)算器

    C語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單計(jì)算器

    這篇文章主要為大家詳細(xì)介紹了C語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單計(jì)算器,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-05-05
  • C++中函數(shù)重載與引用的操作方法

    C++中函數(shù)重載與引用的操作方法

    C++中函數(shù)重載允許同名函數(shù)根據(jù)參數(shù)列表的不同而執(zhí)行不同的功能,這依賴于名字修飾或名字改編(Name Mangling)機(jī)制,而引用則是為變量創(chuàng)建一個(gè)別名,不會(huì)開(kāi)辟新的內(nèi)存空間,本文介紹了C++中函數(shù)重載與引用的操作,感興趣的朋友一起看看吧
    2024-10-10
  • 最新評(píng)論