C語言實現(xiàn)支持動態(tài)拓展和銷毀的線程池
更新時間:2016年01月14日 10:31:48 投稿:lijiao
這篇文章主要為大家介紹了C語言實現(xiàn)支持動態(tài)拓展和銷毀的線程池,感興趣的小伙伴們可以參考一下
本文實例介紹了C 語言實現(xiàn)線程池,支持動態(tài)拓展和銷毀,分享給大家供大家參考,具體內(nèi)容如下
實現(xiàn)功能
- 1.初始化指定個數(shù)的線程
- 2.使用鏈表來管理任務(wù)隊列
- 3.支持拓展動態(tài)線程
- 4.如果閑置線程過多,動態(tài)銷毀部分線程
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <signal.h>
/*線程的任務(wù)隊列由,函數(shù)和參數(shù)組成,任務(wù)由鏈表來進行管理*/
typedef struct thread_worker_s{
void *(*process)(void *arg); //處理函數(shù)
void *arg; //參數(shù)
struct thread_worker_s *next;
}thread_worker_t;
#define bool int
#define true 1
#define false 0
/*線程池中各線程狀態(tài)描述*/
#define THREAD_STATE_RUN 0
#define THREAD_STATE_TASK_WAITING 1
#define THREAD_STATE_TASK_PROCESSING 2
#define THREAD_STATE_TASK_FINISHED 3
#define THREAD_STATE_EXIT 4
typedef struct thread_info_s{
pthread_t id;
int state;
struct thread_info_s *next;
}thread_info_t;
static char* thread_state_map[] ={"創(chuàng)建","等待任務(wù)","處理中","處理完成","已退出"};
/*線程壓縮的時候只有 0,1,2,4 狀態(tài)的線程可以銷毀*/
/*線程池管理器*/
#define THREAD_BUSY_PERCENT 0.5 /*線程:任務(wù) = 1:2 值越小,說明任務(wù)多,增加線程*/
#define THREAD_IDLE_PERCENT 2 /*線程:任務(wù) = 2:1 值大于1,線程多于任務(wù),銷毀部分線程*/
typedef struct thread_pool_s{
pthread_mutex_t queue_lock ; //隊列互斥鎖,即涉及到隊列修改時需要加鎖
pthread_cond_t queue_ready; //隊列條件鎖,隊列滿足某個條件,觸發(fā)等待這個條件的線程繼續(xù)執(zhí)行,比如說隊列滿了,隊列空了
thread_worker_t *head ; //任務(wù)隊列頭指針
bool is_destroy ; //線程池是否已經(jīng)銷毀
int num; //線程的個數(shù)
int rnum; ; //正在跑的線程
int knum; ; //已殺死的線程
int queue_size ; //工作隊列的大小
thread_info_t *threads ; //線程組id,通過pthread_join(thread_ids[0],NULL) 來執(zhí)行線程
pthread_t display ; //打印線程
pthread_t destroy ; //定期銷毀線程的線程id
pthread_t extend ;
float percent ; //線程個數(shù)于任務(wù)的比例 rnum/queue_size
int init_num ;
pthread_cond_t extend_ready ; //如果要增加線程
}thread_pool_t;
/*-------------------------函數(shù)聲明----------------------*/
/**
* 1.初始化互斥變量
* 2.初始化等待變量
* 3.創(chuàng)建指定個數(shù)的線程線程
*/
thread_pool_t* thread_pool_create(int num);
void *thread_excute_route(void *arg);
/*調(diào)試函數(shù)*/
void debug(char *message,int flag){
if(flag)
printf("%s\n",message);
}
void *display_thread(void *arg);
/**
* 添加任務(wù)包括以下幾個操作
* 1.將任務(wù)添加到隊列末尾
* 2.通知等待進程來處理這個任務(wù) pthread_cond_singal();
*/
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //網(wǎng)線程池的隊列中增加一個需要執(zhí)行的函數(shù),也就是任務(wù)
/**
* 銷毀線程池,包括以下幾個部分
* 1.通知所有等待的進程 pthread_cond_broadcase
* 2.等待所有的線程執(zhí)行完
* 3.銷毀任務(wù)列表
* 4.釋放鎖,釋放條件
* 4.銷毀線程池對象
*/
void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);
thread_pool_t *thread_pool_create(int num){
if(num<1){
return NULL;
}
thread_pool_t *p;
p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
if(p==NULL)
return NULL;
p->init_num = num;
/*初始化互斥變量與條件變量*/
pthread_mutex_init(&(p->queue_lock),NULL);
pthread_cond_init(&(p->queue_ready),NULL);
/*設(shè)置線程個數(shù)*/
p->num = num;
p->rnum = num;
p->knum = 0;
p->head = NULL;
p->queue_size =0;
p->is_destroy = false;
int i=0;
thread_info_t *tmp=NULL;
for(i=0;i<num;i++){
/*創(chuàng)建線程*/
tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
if(tmp==NULL){
free(p);
return NULL;
}else{
tmp->next = p->threads;
p->threads = tmp;
}
pthread_create(&(tmp->id),NULL,thread_excute_route,p);
tmp->state = THREAD_STATE_RUN;
}
/*顯示*/
pthread_create(&(p->display),NULL,display_thread,p);
/*檢測是否需要動態(tài)線程*/
//pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
/*動態(tài)銷毀*/
pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
return p;
}
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
thread_pool_t *p= pool;
thread_worker_t *worker=NULL,*member=NULL;
worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));
int incr=0;
if(worker==NULL){
return -1;
}
worker->process = process;
worker->arg = arg;
worker->next = NULL;
thread_pool_is_need_extend(pool);
pthread_mutex_lock(&(p->queue_lock));
member = p->head;
if(member!=NULL){
while(member->next!=NULL)
member = member->next;
member->next = worker;
}else{
p->head = worker;
}
p->queue_size ++;
pthread_mutex_unlock(&(p->queue_lock));
pthread_cond_signal(&(p->queue_ready));
return 1;
}
void thread_pool_wait(thread_pool_t *pool){
thread_info_t *thread;
int i=0;
for(i=0;i<pool->num;i++){
thread = (thread_info_t*)(pool->threads+i);
thread->state = THREAD_STATE_EXIT;
pthread_join(thread->id,NULL);
}
}
void thread_pool_destory(thread_pool_t *pool){
thread_pool_t *p = pool;
thread_worker_t *member = NULL;
if(p->is_destroy)
return ;
p->is_destroy = true;
pthread_cond_broadcast(&(p->queue_ready));
thread_pool_wait(pool);
free(p->threads);
p->threads = NULL;
/*銷毀任務(wù)列表*/
while(p->head){
member = p->head;
p->head = member->next;
free(member);
}
/*銷毀線程列表*/
thread_info_t *tmp=NULL;
while(p->threads){
tmp = p->threads;
p->threads = tmp->next;
free(tmp);
}
pthread_mutex_destroy(&(p->queue_lock));
pthread_cond_destroy(&(p->queue_ready));
return ;
}
/*通過線程id,找到對應(yīng)的線程*/
thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
thread_info_t *thread=NULL;
thread_info_t *p=pool->threads;
while(p!=NULL){
if(p->id==id)
return p;
p = p->next;
}
return NULL;
}
/*每個線程入口函數(shù)*/
void *thread_excute_route(void *arg){
thread_worker_t *worker = NULL;
thread_info_t *thread = NULL;
thread_pool_t* p = (thread_pool_t*)arg;
//printf("thread %lld create success\n",pthread_self());
while(1){
pthread_mutex_lock(&(p->queue_lock));
/*獲取當(dāng)前線程的id*/
pthread_t pthread_id = pthread_self();
/*設(shè)置當(dāng)前狀態(tài)*/
thread = get_thread_by_id(p,pthread_id);
/*線程池被銷毀,并且沒有任務(wù)了*/
if(p->is_destroy==true && p->queue_size ==0){
pthread_mutex_unlock(&(p->queue_lock));
thread->state = THREAD_STATE_EXIT;
p->knum ++;
p->rnum --;
pthread_exit(NULL);
}
if(thread){
thread->state = THREAD_STATE_TASK_WAITING; /*線程正在等待任務(wù)*/
}
/*線程池沒有被銷毀,沒有任務(wù)到來就一直等待*/
while(p->queue_size==0 && !p->is_destroy){
pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
}
p->queue_size--;
worker = p->head;
p->head = worker->next;
pthread_mutex_unlock(&(p->queue_lock));
if(thread)
thread->state = THREAD_STATE_TASK_PROCESSING; /*線程正在執(zhí)行任務(wù)*/
(*(worker->process))(worker->arg);
if(thread)
thread->state = THREAD_STATE_TASK_FINISHED; /*任務(wù)執(zhí)行完成*/
free(worker);
worker = NULL;
}
}
/*拓展線程*/
void *thread_pool_is_need_extend(void *arg){
thread_pool_t *p = (thread_pool_t *)arg;
thread_pool_t *pool = p;
/*判斷是否需要增加線程,最終目的 線程:任務(wù)=1:2*/
if(p->queue_size>100){
int incr =0;
if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*計算需要增加線程個數(shù)*/
int i=0;
thread_info_t *tmp=NULL;
thread_pool_t *p = pool;
pthread_mutex_lock(&pool->queue_lock);
if(p->queue_size<100){
pthread_mutex_unlock(&pool->queue_lock);
return ;
}
for(i=0;i<incr;i++){
/*創(chuàng)建線程*/
tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
if(tmp==NULL){
continue;
}else{
tmp->next = p->threads;
p->threads = tmp;
}
p->num ++;
p->rnum ++;
pthread_create(&(tmp->id),NULL,thread_excute_route,p);
tmp->state = THREAD_STATE_RUN;
}
pthread_mutex_unlock(&pool->queue_lock);
}
}
//pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;
/*恢復(fù)初始線程個數(shù)*/
void *thread_pool_is_need_recovery(void *arg){
thread_pool_t *pool = (thread_pool_t *)arg;
int i=0;
thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
/*如果沒有任務(wù)了,當(dāng)前線程大于初始化的線程個數(shù)*/
while(1){
i=0;
if(pool->queue_size==0 && pool->rnum > pool->init_num ){
sleep(5);
/*5s秒內(nèi)還是這個狀態(tài)的話就,銷毀部分線程*/
if(pool->queue_size==0 && pool->rnum > pool->init_num ){
pthread_mutex_lock(&pool->queue_lock);
tmp = pool->threads;
while((pool->rnum != pool->init_num) && tmp){
/*找到空閑的線程*/
if(tmp->state != THREAD_STATE_TASK_PROCESSING){
i++;
if(prev)
prev->next = tmp->next;
else
pool->threads = tmp->next;
pool->rnum --; /*正在運行的線程減一*/
pool->knum ++; /*銷毀的線程加一*/
kill(tmp->id,SIGKILL); /*銷毀線程*/
p1 = tmp;
tmp = tmp->next;
free(p1);
continue;
}
prev = tmp;
tmp = tmp->next;
}
pthread_mutex_unlock(&pool->queue_lock);
printf("5s內(nèi)沒有新任務(wù)銷毀部分線程,銷毀了 %d 個線程\n",i);
}
}
sleep(5);
}
}
/*打印一些信息的*/
void *display_thread(void *arg){
thread_pool_t *p =(thread_pool_t *)arg;
thread_info_t *thread = NULL;
int i=0;
while(1){
printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum); /*線程總數(shù),正在跑的,已銷毀的*/
thread = p->threads;
while(thread){
printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
thread = thread->next;
}
sleep(5);
}
}
希望本文所述對大家學(xué)習(xí)C語言程序設(shè)計有所幫助。
您可能感興趣的文章:
相關(guān)文章
簡要對比C語言中的truncate()函數(shù)與ftruncate()函數(shù)
這篇文章主要介紹了C語言中的truncate()函數(shù)與ftruncate()函數(shù)的簡要對比,注意其之間的區(qū)別,需要的朋友可以參考下2015-09-09
c/c++靜態(tài)庫之間相互調(diào)用的實戰(zhàn)案例
C++調(diào)用C的函數(shù)比較簡單,直接使用extern "C" {}告訴編譯器用C的規(guī)則去編譯C代碼就可以了,下面這篇文章主要給大家介紹了關(guān)于c/c++靜態(tài)庫之間相互調(diào)用的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
2022-08-08
C語言中g(shù)etchar和putchar的使用方法詳解
我們知道scanf函數(shù)可以從鍵盤輸入信息,而printf則可以輸出信息,同樣地,getchar和putchar也有同樣的功能,下面我來給大家介紹putchar和getchar的使用方法,需要的朋友可以參考下
2023-08-08 
