nginx線(xiàn)程池源碼分析
周末看了nginx線(xiàn)程池部分的代碼,順手照抄了一遍,寫(xiě)成了自己的版本。實(shí)現(xiàn)上某些地方還是有差異的,不過(guò)基本結(jié)構(gòu)全部摘抄。
在這里分享一下。如果你看懂了我的版本,也就證明你看懂了nginx的線(xiàn)程池。
本文只列出了關(guān)鍵數(shù)據(jù)結(jié)構(gòu)和API,重在理解nginx線(xiàn)程池設(shè)計(jì)思路。完整代碼在最后的鏈接里。
1.任務(wù)節(jié)點(diǎn)
typedef void (*CB_FUN)(void *); //任務(wù)結(jié)構(gòu)體 typedef struct task { void *argv; //任務(wù)函數(shù)的參數(shù)(任務(wù)執(zhí)行結(jié)束前,要保證參數(shù)地址有效) CB_FUN handler; //任務(wù)函數(shù)(返回值必須為0 非0值用作增加線(xiàn)程,和銷(xiāo)毀線(xiàn)程池) struct task *next; //任務(wù)鏈指針 }zoey_task_t;
handler為函數(shù)指針,是實(shí)際的任務(wù)函數(shù),argv為該函數(shù)的參數(shù),next指向下一個(gè)任務(wù)。
2.任務(wù)隊(duì)列
typedef struct task_queue { zoey_task_t *head; //隊(duì)列頭 zoey_task_t **tail; //隊(duì)列尾 unsigned int maxtasknum; //最大任務(wù)限制 unsigned int curtasknum; //當(dāng)前任務(wù)數(shù) }zoey_task_queue_t;
head為任務(wù)隊(duì)列頭指針,tail為任務(wù)隊(duì)列尾指針,maxtasknum為隊(duì)列最大任務(wù)數(shù)限制,curtasknum為隊(duì)列當(dāng)前任務(wù)數(shù)。
3.線(xiàn)程池
typedef struct threadpool { pthread_mutex_t mutex; //互斥鎖 pthread_cond_t cond; //條件鎖 zoey_task_queue_t tasks;//任務(wù)隊(duì)列 unsigned int threadnum; //線(xiàn)程數(shù) unsigned int thread_stack_size; //線(xiàn)程堆棧大小 }zoey_threadpool_t;
mutex為互斥鎖 cond為條件鎖。mutex和cond共同保證線(xiàn)程池任務(wù)的互斥領(lǐng)取或者添加。
tasks指向任務(wù)隊(duì)列。
threadnum為線(xiàn)程池的線(xiàn)程數(shù)
thread_stack_size為線(xiàn)程堆棧大小
4.啟動(dòng)配置
//配置參數(shù) typedef struct threadpool_conf { unsigned int threadnum; //線(xiàn)程數(shù) unsigned int thread_stack_size;//線(xiàn)程堆棧大小 unsigned int maxtasknum;//最大任務(wù)限制 }zoey_threadpool_conf_t;
啟動(dòng)配置結(jié)構(gòu)體是初始化線(xiàn)程池時(shí)的一些參數(shù)。
5.初始化線(xiàn)程池
首先檢查參數(shù)是否合法,然后初始化mutex,cond,key(pthread_key_t)。key用來(lái)讀寫(xiě)線(xiàn)程全局變量,此全局變量控制線(xiàn)程是否退出。
最后創(chuàng)建線(xiàn)程。
zoey_threadpool_t* zoey_threadpool_init(zoey_threadpool_conf_t *conf) { zoey_threadpool_t *pool = NULL; int error_flag_mutex = 0; int error_flag_cond = 0; pthread_attr_t attr; do{ if (z_conf_check(conf) == -1){ //檢查參數(shù)是否合法 break; } pool = (zoey_threadpool_t *)malloc(sizeof(zoey_threadpool_t));//申請(qǐng)線(xiàn)程池句柄 if (pool == NULL){ break; } //初始化線(xiàn)程池基本參數(shù) pool->threadnum = conf->threadnum; pool->thread_stack_size = conf->thread_stack_size; pool->tasks.maxtasknum = conf->maxtasknum; pool->tasks.curtasknum = 0; z_task_queue_init(&pool->tasks); if (z_thread_key_create() != 0){//創(chuàng)建一個(gè)pthread_key_t,用以訪(fǎng)問(wèn)線(xiàn)程全局變量。 free(pool); break; } if (z_thread_mutex_create(&pool->mutex) != 0){ //初始化互斥鎖 z_thread_key_destroy(); free(pool); break; } if (z_thread_cond_create(&pool->cond) != 0){ //初始化條件鎖 z_thread_key_destroy(); z_thread_mutex_destroy(&pool->mutex); free(pool); break; } if (z_threadpool_create(pool) != 0){ //創(chuàng)建線(xiàn)程池 z_thread_key_destroy(); z_thread_mutex_destroy(&pool->mutex); z_thread_cond_destroy(&pool->cond); free(pool); break; } return pool; }while(0); return NULL; }
6.添加任務(wù)
首先申請(qǐng)一個(gè)任務(wù)節(jié)點(diǎn),實(shí)例化后將節(jié)點(diǎn)加入任務(wù)隊(duì)列,并將當(dāng)前任務(wù)隊(duì)列數(shù)++并通知其他進(jìn)程有新任務(wù)。整個(gè)過(guò)程加鎖。
int zoey_threadpool_add_task(zoey_threadpool_t *pool, CB_FUN handler, void* argv) { zoey_task_t *task = NULL; //申請(qǐng)一個(gè)任務(wù)節(jié)點(diǎn)并賦值 task = (zoey_task_t *)malloc(sizeof(zoey_task_t)); if (task == NULL){ return -1; } task->handler = handler; task->argv = argv; task->next = NULL; if (pthread_mutex_lock(&pool->mutex) != 0){ //加鎖 free(task); return -1; } do{ if (pool->tasks.curtasknum >= pool->tasks.maxtasknum){//判斷工作隊(duì)列中的任務(wù)數(shù)是否達(dá)到限制 break; } //將任務(wù)節(jié)點(diǎn)尾插到任務(wù)隊(duì)列 *(pool->tasks.tail) = task; pool->tasks.tail = &task->next; pool->tasks.curtasknum++; //通知阻塞的線(xiàn)程 if (pthread_cond_signal(&pool->cond) != 0){ break; } //解鎖 pthread_mutex_unlock(&pool->mutex); return 0; }while(0); pthread_mutex_unlock(&pool->mutex); free(task); return -1; }
7.銷(xiāo)毀線(xiàn)程池
銷(xiāo)毀線(xiàn)程池其實(shí)也是向任務(wù)隊(duì)列添加任務(wù),只不過(guò)添加的任務(wù)是讓線(xiàn)程退出。z_threadpool_exit_cb函數(shù)會(huì)將lock置0后退出線(xiàn)程,lock為0表示此線(xiàn)程
已經(jīng)退出,接著退出下一個(gè)線(xiàn)程。退出完線(xiàn)程釋放所有資源。
void zoey_threadpool_destroy(zoey_threadpool_t *pool) { unsigned int n = 0; volatile unsigned int lock; //z_threadpool_exit_cb函數(shù)會(huì)使對(duì)應(yīng)線(xiàn)程退出 for (; n < pool->threadnum; n++){ lock = 1; if (zoey_threadpool_add_task(pool, z_threadpool_exit_cb, &lock) != 0){ return; } while (lock){ usleep(1); } } z_thread_mutex_destroy(&pool->mutex); z_thread_cond_destroy(&pool->cond); z_thread_key_destroy(); free(pool); }
8.增加一個(gè)線(xiàn)程
很簡(jiǎn)單,再生成一個(gè)線(xiàn)程以及線(xiàn)程數(shù)++即可。加鎖。
int zoey_thread_add(zoey_threadpool_t *pool) { int ret = 0; if (pthread_mutex_lock(&pool->mutex) != 0){ return -1; } ret = z_thread_add(pool); pthread_mutex_unlock(&pool->mutex); return ret; }
9.改變?nèi)蝿?wù)隊(duì)列最大任務(wù)限制
當(dāng)num=0時(shí)設(shè)置線(xiàn)程數(shù)為無(wú)限大。
void zoey_set_max_tasknum(zoey_threadpool_t *pool,unsigned int num) { if (pthread_mutex_lock(&pool->mutex) != 0){ return -1; } z_change_maxtask_num(pool, num); //改變最大任務(wù)限制 pthread_mutex_unlock(&pool->mutex); }
10.使用示例
int main() { int array[10000] = {0}; int i = 0; zoey_threadpool_conf_t conf = {5,0,5}; //實(shí)例化啟動(dòng)參數(shù) zoey_threadpool_t *pool = zoey_threadpool_init(&conf);//初始化線(xiàn)程池 if (pool == NULL){ return 0; } for (; i < 10000; i++){ array[i] = i; if (i == 80){ zoey_thread_add(pool); //增加線(xiàn)程 zoey_thread_add(pool); } if (i == 100){ zoey_set_max_tasknum(pool, 0); //改變最大任務(wù)數(shù) 0為不做上限 } while(1){ if (zoey_threadpool_add_task(pool, testfun, &array[i]) == 0){ break; } printf("error in i = %d\n",i); } } zoey_threadpool_destroy(pool); while(1){ sleep(5); } return 0; }
11.源碼
https://github.com/unlikewashface/zoey_threadpool.git
線(xiàn)程池可以發(fā)揮更多作用,比如可以把連接放到線(xiàn)程池里。nginx的異步加lua的協(xié)程是個(gè)非常好的組合,現(xiàn)在有了線(xiàn)程池后,線(xiàn)程池加協(xié)程將是另一個(gè)選擇。總而言之,如果在保證性能的情況下,讓nginx開(kāi)發(fā)變得非常簡(jiǎn)單,這是非常利好的消息。
相關(guān)文章
為高負(fù)載網(wǎng)絡(luò)優(yōu)化Nginx和Node.js的方法
如果不先對(duì)Nginx和Node.js的底層傳輸機(jī)制有所了解,并進(jìn)行針對(duì)性?xún)?yōu)化,可能對(duì)兩者再細(xì)致的調(diào)優(yōu)也會(huì)徒勞無(wú)功。一般情況下,Nginx通過(guò)TCP socket來(lái)連接客戶(hù)端與上游應(yīng)用2013-02-02nginx配置location總結(jié)location正則寫(xiě)法及rewrite規(guī)則寫(xiě)法
本文詳細(xì)講述了Nginx location正則寫(xiě)法,Nginx 的Rewrite規(guī)則以及Nginx.conf中if指令與全局變量2018-10-10Nginx緩存Cache的配置方案以及相關(guān)內(nèi)存占用問(wèn)題解決
這篇文章主要介紹了Nginx緩存Cache的配置方案以及相關(guān)內(nèi)存占用問(wèn)題解決,Cashe配置最主要的還是需要注意內(nèi)存資源的使用問(wèn)題,需要的朋友可以參考下2016-01-01Nginx優(yōu)化配置和內(nèi)核優(yōu)化 實(shí)現(xiàn)突破十萬(wàn)并發(fā)
Nginx是一個(gè)高性能的 HTTP 和 反向代理 服務(wù)器,也是一個(gè) IMAP/POP3/SMTP 代理服務(wù)器。本文介紹一些Nginx優(yōu)化代碼參數(shù)等2013-06-06Nginx、Apache、Lighttpd禁止目錄執(zhí)行php配置示例
這篇文章主要介紹了Nginx、Apache、Lighttpd禁止目錄執(zhí)行php配置示例,本文給出了單個(gè)目錄、多個(gè)目錄的禁止執(zhí)行PHP的方法,需要的朋友可以參考下2014-09-09詳解Nginx proxy_pass的一個(gè)/斜杠引發(fā)的血案
這篇文章主要介紹了詳解Nginx proxy_pass的一個(gè)/斜杠引發(fā)的血案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11nginx與apache限制ip并發(fā)訪(fǎng)問(wèn) 限制ip連接的設(shè)置方法
nginx限制ip并發(fā)數(shù),也是說(shuō)限制同一個(gè)ip同時(shí)連接服務(wù)器的數(shù)量,要使apache服務(wù)器做對(duì)同一IP地址的連接限制,需要mod_limitipconn來(lái)實(shí)現(xiàn)。一般需要手動(dòng)編譯。不過(guò)模塊作者也提供了一些編譯好的模塊,根據(jù)自己的apache版本可以直接使用2012-11-11