Linux進(jìn)程池實(shí)現(xiàn)的詳細(xì)指南
1.為什么要有進(jìn)程池
如果你了解過(guò)STL的底層設(shè)計(jì),你會(huì)發(fā)現(xiàn)在其中會(huì)有一個(gè)叫做內(nèi)存池的設(shè)計(jì)。其作用就是先申請(qǐng)出一片空間,如果后續(xù)你需要對(duì)你的容器進(jìn)行擴(kuò)容,所擴(kuò)展的空間就從內(nèi)存池里取的。這樣可以提高擴(kuò)容的效率。
比如你要擴(kuò)容100次,如果每次擴(kuò)容都向系統(tǒng)申請(qǐng)空間的話,效率就很低,因?yàn)橄蛳到y(tǒng)申請(qǐng)空間也是需要時(shí)間的,所以內(nèi)存池的作用就是一次性申請(qǐng)一篇空間,當(dāng)需要擴(kuò)容時(shí)就向內(nèi)存池要,可以提高擴(kuò)容的效率。
既然這樣,我們也可以如此理解進(jìn)程池,一次性創(chuàng)建一批進(jìn)程,如果有任務(wù)要執(zhí)行就交給進(jìn)程池中空閑的進(jìn)程來(lái)做,而不是一有任務(wù)就創(chuàng)建一個(gè)新的進(jìn)程,進(jìn)程池的目的也是為了提供效率,節(jié)省創(chuàng)建進(jìn)程的時(shí)間消耗。
通過(guò)預(yù)先創(chuàng)建和復(fù)用進(jìn)程,進(jìn)程池能夠提高任務(wù)執(zhí)行效率,避免頻繁創(chuàng)建和銷毀進(jìn)程帶來(lái)的系統(tǒng)開(kāi)銷。
2.進(jìn)程池的工作原理
進(jìn)程池的核心思想是創(chuàng)建固定數(shù)量的進(jìn)程,然后將需要執(zhí)行的任務(wù)分配給這些進(jìn)程來(lái)處理。當(dāng)某個(gè)任務(wù)完成后,該進(jìn)程可以繼續(xù)處理下一個(gè)任務(wù),而不是銷毀。這樣可以減少頻繁創(chuàng)建和銷毀進(jìn)程帶來(lái)的資源浪費(fèi)。
2.1 進(jìn)程池的工作流程
- 初始化:預(yù)處理創(chuàng)建一定數(shù)量的進(jìn)程,形成進(jìn)程池。
- 任務(wù)分配:當(dāng)有任務(wù)需要處理時(shí),將任務(wù)分配給某個(gè)空閑進(jìn)程。
- 任務(wù)處理:空閑進(jìn)程接受任務(wù)并執(zhí)行。
- 復(fù)用進(jìn)程:任務(wù)執(zhí)行完成后,進(jìn)程回到池中,等待新的任務(wù)。
- 退出:當(dāng)沒(méi)有新的任務(wù)且需要關(guān)閉進(jìn)程池,池中進(jìn)程將逐個(gè)退出。
3. 進(jìn)程池的實(shí)現(xiàn)(重點(diǎn))
本文將著重講解進(jìn)程池的實(shí)現(xiàn)步驟。
初始化
通過(guò)運(yùn)行可執(zhí)行文件時(shí)傳入的參數(shù)來(lái)創(chuàng)建一定數(shù)量的子進(jìn)程。
如:創(chuàng)建5個(gè)子進(jìn)程
./a.out 5
代碼實(shí)現(xiàn):
enum { ArgcError = 1, ArgvError, PipeError }; void Usage(const char* tip) { cout<<"Usage:"<<tip<<" sub_process_num"<<endl; } int main(int argc,char* argv[]) { if(argc != 2) {//格式不對(duì) Usage(argv[0]); return ArgcError; } int sub_process_number = stoi(argv[1]); if(sub_process_number<=0) {//子進(jìn)程數(shù)量不能小于1 return ArgvError; } //創(chuàng)建子進(jìn)程... return 0; }
現(xiàn)在我們來(lái)分析下,我們要實(shí)現(xiàn)進(jìn)程池的功能:創(chuàng)建子進(jìn)程,發(fā)送任務(wù)給子進(jìn)程執(zhí)行,子進(jìn)程的輪詢,殺死進(jìn)程,等待進(jìn)程,一些Debug功能。這樣的話我們完全可以創(chuàng)建一個(gè)類來(lái)封裝這些功能。除此之外,我們還需要描述一下子進(jìn)程,為此也需要?jiǎng)?chuàng)建一個(gè)描述管道的類。
3.1 Channel類
Channel類的功能主要是來(lái)描述管道的,具有的屬性有該管道對(duì)應(yīng)的子進(jìn)程的id,名字,寫(xiě)端描述符。
Channel類的實(shí)現(xiàn)比較簡(jiǎn)單,直接看代碼吧:
class Channel { public: Channel(int wfd,pid_t sub_process_id,string name) : wfd_(wfd),sub_process_id_(sub_process_id),name_(name) {} void printDebug() { cout<<"name:"<<name_<<endl; cout<<"wfd:"<<wfd_<<endl; cout<<"pid:"<<sub_process_id_<<endl; } string getName() { return name_; } pid_t getPid() { return sub_process_id_; } int getWfd() { return wfd_; } void Close() { close(wfd_); } private: int wfd_; pid_t sub_process_id_; string name_; };
3.2 ProcessPool類
ProcessPool類的功能主要是來(lái)描述進(jìn)程池的,具有的屬性有該管道對(duì)應(yīng)的子進(jìn)程的數(shù)量,所有的管道。
類似于這樣:
ProcessPool類的框架:
class ProcessPool { public: ProcessPool(int sub_process_num) : sub_process_num_(sub_process_num) {} //... private: int sub_process_num_; vector<Channel> channels; };
3.2.1 創(chuàng)建子進(jìn)程
因?yàn)槲覀冃枰獎(jiǎng)?chuàng)建指定數(shù)目的進(jìn)程,用一個(gè)循環(huán)來(lái)寫(xiě)就可以了。在循環(huán)中,父進(jìn)程每次都會(huì)創(chuàng)建一個(gè)子進(jìn)程出來(lái),然后用管道于它們鏈接,注意因?yàn)槭歉高M(jìn)程給子進(jìn)程分配任務(wù),所以需要把父進(jìn)程的讀端關(guān)閉,子進(jìn)程的寫(xiě)端關(guān)閉。
初版:
int CreateProcess() { for(int i = 0;i<sub_process_num_;++i) { int pipefd[2]; int n = pipe(pipefd); if(n == -1) { return PipeError; } pid_t id = fork(); if(id == 0) { //子進(jìn)程,關(guān)閉寫(xiě)端 close(pipefd[1]); //work... } //父進(jìn)程,關(guān)閉讀端 close(pipefd[0]); string cname = "channel-"+to_string(i); channels.push_back(Channel(pipefd[1],id,cname)); } return 0; }
為了讓子進(jìn)程執(zhí)行相應(yīng)的任務(wù),我們還可以添加一個(gè)回調(diào)函數(shù)worker
。worker
函數(shù)主要作用是選擇要執(zhí)行的任務(wù),具體的任務(wù),我們還需要自己創(chuàng)建,為此還可以創(chuàng)建3個(gè)測(cè)試用的任務(wù),用一個(gè)函數(shù)指針數(shù)組去保存這些函數(shù)。
代碼如下:
typedef void(* work_t)(int); typedef void(* task_t)(int,pid_t); void PrintLog(int fd, pid_t pid) { cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl; } void ReloadConf(int fd, pid_t pid) { cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl; } void ConnectMysql(int fd, pid_t pid) { cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl; } task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql}; void worker(int fd) { while(true) { uint32_t command_code = 0; ssize_t n = read(0,&command_code,sizeof(command_code)); if(n == sizeof(command_code)) { if(command_code>=3) { continue; } tasks[command_code](fd,getpid()); } else { cout<<"sub process:"<<getpid()<<"quit now..."<<endl; break; } } }
第二版:
第二版相對(duì)第一版,多了個(gè)回調(diào)函數(shù),這個(gè)回調(diào)函數(shù)可以讓我實(shí)現(xiàn)相對(duì)應(yīng)的工作。同時(shí)也多個(gè)重定向功能,把原本標(biāo)準(zhǔn)輸入的功能給到了pipefd[0],也就是說(shuō)當(dāng)子進(jìn)程去讀標(biāo)準(zhǔn)輸入內(nèi)的數(shù)據(jù)時(shí),會(huì)去讀管道中的數(shù)據(jù)。
這是一個(gè)典型的標(biāo)準(zhǔn)輸入重定向操作,將管道的讀端作為當(dāng)前進(jìn)程的輸入來(lái)源
int CreateProcess(work_t work) { //vector<int> fds; for(int i = 0;i<sub_process_num_;++i) { int pipefd[2]; int n = pipe(pipefd); if(n == -1) { return PipeError; } pid_t id = fork(); if(id == 0) { //子進(jìn)程,關(guān)閉寫(xiě)端 close(pipefd[1]); dup2(pipefd[0],0); work(pipefd[0]); exit(0); } //父進(jìn)程,關(guān)閉讀端 close(pipefd[0]); string cname = "channel-"+to_string(i); channels.push_back(Channel(pipefd[1],id,cname)); } return 0; }
其實(shí)該代碼中還存在bug,有個(gè)魔鬼細(xì)節(jié)存在!!!
第三版:
其實(shí)對(duì)于子進(jìn)程來(lái)說(shuō),它的寫(xiě)端并沒(méi)有全部關(guān)閉。下面我們來(lái)畫(huà)圖:
創(chuàng)建第一個(gè)管道,這個(gè)圖如果看過(guò)我講匿名管道的那篇的話,還是比較熟悉的。
現(xiàn)在我們來(lái)創(chuàng)建第二個(gè)管道,我們知道文件描述符的創(chuàng)建是遵循當(dāng)前沒(méi)有直接使用的最小的一個(gè)下標(biāo),作為新的文件描述符。所以呢,新創(chuàng)建的管道的pipefd[0]
依舊是在先前的位置,可是寫(xiě)端就不是了,原先的寫(xiě)端并沒(méi)有被關(guān)閉,我們新管道創(chuàng)建的pipefd[1]
會(huì)在其下方被創(chuàng)建。
然后要知道的是,子進(jìn)程是由父進(jìn)程創(chuàng)建的,它的各項(xiàng)數(shù)據(jù)是由父進(jìn)程復(fù)制而來(lái),也就會(huì)把上一個(gè)管道的寫(xiě)端給復(fù)制過(guò)了,但是子進(jìn)程可是關(guān)閉不了它的,因?yàn)樗荒苣玫叫聞?chuàng)建管道的寫(xiě)端pipefd[1]
的位置。具體情況如圖:
所以為了關(guān)閉子進(jìn)程的所有寫(xiě)端,我們需要用有個(gè)數(shù)組去保存父進(jìn)程中的寫(xiě)端,然后再子進(jìn)程中把它們一一關(guān)閉。
代碼如下:
int CreateProcess(work_t work) { vector<int> fds; for(int i = 0;i<sub_process_num_;++i) { int pipefd[2]; int n = pipe(pipefd); if(n == -1) { return PipeError; } pid_t id = fork(); if(id == 0) { //子進(jìn)程,關(guān)閉寫(xiě)端 if(!fds.empty()) { cout<<"close w fd:"; for(auto fd:fds) { close(fd); cout<<fd<<" "; } cout<<endl; } close(pipefd[1]); dup2(pipefd[0],0); work(pipefd[0]); exit(0); } //父進(jìn)程,關(guān)閉讀端 close(pipefd[0]); string cname = "channel-"+to_string(i); channels.push_back(Channel(pipefd[1],id,cname)); fds.push_back(pipefd[1]); } return 0; }
3.2.2 殺死所有進(jìn)程
進(jìn)程池也有不需要的時(shí)候,當(dāng)進(jìn)程池不需要了,我們就要回收子進(jìn)程了,怎么回收呢?當(dāng)然是進(jìn)程等待了。
殺死子進(jìn)程也就是等待子進(jìn)程。
要注意的是別忘了關(guān)閉文件描述符
進(jìn)程等待是必須的,不然的話子進(jìn)程會(huì)變成僵尸進(jìn)程的。
void KillAllProcess() { //在回收子進(jìn)程前,我們需要把pipefd[1]全部關(guān)閉 for(auto&channel:channels) { channel.Close(); //關(guān)閉完文件描述符后,開(kāi)始等待。等待進(jìn)程需要子進(jìn)程的pid,恰巧我們的Channel中存有子進(jìn)程的pid pid_t pid = channel.getPid(); pid_t rid = waitpid(pid,nullptr,0); if(rid == pid) { //回收成功 cout<<"wait sub process:"<<pid<<" success..."<<endl; } cout<<channel.getName()<<"close done"<<" sub process quit now:"<<channel.getPid()<<endl; } }
3.2.3 其他功能
因?yàn)檫@些功能都比較簡(jiǎn)單就一塊講了吧。
子進(jìn)程的輪詢,我能總不能讓一個(gè)子進(jìn)程一直跑任務(wù)吧,為了合理利用子進(jìn)程,我們可以設(shè)計(jì)也該輪詢函數(shù),讓子進(jìn)程的任務(wù)分配"雨露均沾"。
int NextChannel() { static int next = 0;//static修飾的變量只會(huì)初始化一次。 int c = next; next = (next+1)%sub_process_num_; return c; }
發(fā)送任務(wù)代碼:
void SendTaskCode(int index,uint32_t code) { cout << "send code: " << code << " to " << channels[index].getName() << " sub prorcess id: " << channels[index].getPid() << endl; write(channels[index].getPid(), &code, sizeof(code)); }
debug:
void Debug() { for(auto&channel:channels) { channel.printDebug(); cout<<endl; } }
3.3 控制進(jìn)程池
完成上面的功能就需要我們?nèi)タ刂七M(jìn)程池的子進(jìn)程了。
主要包括創(chuàng)建進(jìn)程池,控制子進(jìn)程,回收子進(jìn)程。
void CtrlSubProcess(ProcessPool* processpool,int cnt) { while(cnt--) { //選擇一個(gè)進(jìn)程和通道 int channel = processpool->NextChannel(); //選擇一個(gè)任務(wù) uint32_t code = NextTask(); processpool->SendTaskCode(channel,code); sleep(1); } } int main(int argc,char* argv[]) { if(argc!=2) { Usage(argv[0]); return ArgcError; } int sub_process_num = stoi(argv[1]); if(sub_process_num<1) { return ArgvError; } srand((unsigned int)time(nullptr));//生成隨機(jī)種子 //創(chuàng)建進(jìn)程池 ProcessPool* processpool = new ProcessPool(sub_process_num); processpool->CreateProcess(worker); processpool->Debug(); //sleep(2); //控制子進(jìn)程 CtrlSubProcess(processpool,10); //sleep(2); //回收子進(jìn)程 processpool->KillAllProcess(); delete processpool; return 0; }
運(yùn)行結(jié)果:
4. 完整代碼
///processpool.cc// #include <iostream> #include <vector> #include <unistd.h> #include <cstdlib> #include <ctime> #include <sys/wait.h> #include <sys/types.h> #include "bolg.hpp" using namespace std; enum { ArgcError = 1, ArgvError, PipeError }; class Channel { public: Channel(int wfd,pid_t sub_process_id,string name) : wfd_(wfd),sub_process_id_(sub_process_id),name_(name) {} void printDebug() { cout<<"name:"<<name_<<endl; cout<<"wfd:"<<wfd_<<endl; cout<<"pid:"<<sub_process_id_<<endl; } string getName() { return name_; } pid_t getPid() { return sub_process_id_; } int getWfd() { return wfd_; } void Close() { close(wfd_); } private: int wfd_; pid_t sub_process_id_; string name_; }; class ProcessPool { public: ProcessPool(int sub_process_num) : sub_process_num_(sub_process_num) {} int CreateProcess(work_t work) { vector<int> fds; for(int i = 0;i<sub_process_num_;++i) { int pipefd[2]; int n = pipe(pipefd); if(n == -1) { return PipeError; } pid_t id = fork(); if(id == 0) { //子進(jìn)程,關(guān)閉寫(xiě)端 if(!fds.empty()) { cout<<"close w fd:"; for(auto fd:fds) { close(fd); cout<<fd<<" "; } cout<<endl; } close(pipefd[1]); dup2(pipefd[0],0); work(pipefd[0]); exit(0); } //父進(jìn)程,關(guān)閉讀端 close(pipefd[0]); string cname = "channel-"+to_string(i); channels.push_back(Channel(pipefd[1],id,cname)); fds.push_back(pipefd[1]); } return 0; } void KillAllProcess() { //在回收子進(jìn)程前,我們需要把pipefd[1]全部關(guān)閉 for(auto&channel:channels) { channel.Close(); //關(guān)閉完文件描述符后,開(kāi)始等待。等待進(jìn)程需要子進(jìn)程的pid,恰巧我們的Channel中存有子進(jìn)程的pid pid_t pid = channel.getPid(); pid_t rid = waitpid(pid,nullptr,0); if(rid == pid) { //回收成功 cout<<"wait sub process:"<<pid<<" success..."<<endl; } cout<<channel.getName()<<"close done"<<" sub process quit now:"<<channel.getPid()<<endl; } } int NextChannel() { static int next = 0; int c = next; next = (next+1)%sub_process_num_; return c; } void SendTaskCode(int index,uint32_t code) { cout << "send code: " << code << " to " << channels[index].getName() << " sub prorcess id: " << channels[index].getPid() << endl; write(channels[index].getPid(), &code, sizeof(code)); } void Debug() { for(auto&channel:channels) { channel.printDebug(); cout<<endl; } } private: int sub_process_num_; vector<Channel> channels; }; void Usage(const char* tip) { cout<<"Usage:"<<tip<<" sub_process_num"<<endl; } void CtrlSubProcess(ProcessPool* processpool,int cnt) { while(cnt--) { //選擇一個(gè)進(jìn)程和通道 int channel = processpool->NextChannel(); //選擇一個(gè)任務(wù) uint32_t code = NextTask(); processpool->SendTaskCode(channel,code); sleep(1); } } int main(int argc,char* argv[]) { if(argc!=2) { Usage(argv[0]); return ArgcError; } int sub_process_num = stoi(argv[1]); if(sub_process_num<1) { return ArgvError; } srand((unsigned int)time(nullptr)); //創(chuàng)建進(jìn)程池 ProcessPool* processpool = new ProcessPool(sub_process_num); processpool->CreateProcess(worker); processpool->Debug(); //sleep(2); //控制子進(jìn)程 CtrlSubProcess(processpool,10); //sleep(2); //回收子進(jìn)程 processpool->KillAllProcess(); delete processpool; return 0; } ///task.hpp #include <iostream> #include <vector> #include <cstdlib> #include <unistd.h> using namespace std; //創(chuàng)建函數(shù)指針 typedef void(* work_t)(int); typedef void(* task_t)(int,pid_t); void PrintLog(int fd, pid_t pid) { cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl; } void ReloadConf(int fd, pid_t pid) { cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl; } void ConnectMysql(int fd, pid_t pid) { cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl; } uint32_t NextTask() { return rand()%3; } task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql}; void worker(int fd) { while(true) { uint32_t command_code = 0; ssize_t n = read(0,&command_code,sizeof(command_code)); if(n == sizeof(command_code)) { if(command_code>=3) { continue; } tasks[command_code](fd,getpid()); } else { cout<<"sub process:"<<getpid()<<"quit now..."<<endl; break; } } }
5. 總結(jié)
進(jìn)程池的核心思想是創(chuàng)建固定數(shù)量的進(jìn)程,然后將需要執(zhí)行的任務(wù)分配給這些進(jìn)程來(lái)處理。當(dāng)某個(gè)任務(wù)完成后,該進(jìn)程可以繼續(xù)處理下一個(gè)任務(wù),而不是銷毀。這樣可以減少頻繁創(chuàng)建和銷毀進(jìn)程帶來(lái)的資源浪費(fèi)。
以上就是Linux進(jìn)程池實(shí)現(xiàn)的詳細(xì)指南的詳細(xì)內(nèi)容,更多關(guān)于Linux進(jìn)程池實(shí)現(xiàn)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
vscode遠(yuǎn)程開(kāi)發(fā)使用SSH遠(yuǎn)程連接服務(wù)器的方法「內(nèi)網(wǎng)穿透」
這篇文章主要介紹了vscode遠(yuǎn)程開(kāi)發(fā)使用SSH遠(yuǎn)程連接服務(wù)器?「內(nèi)網(wǎng)穿透」,通過(guò)本文學(xué)習(xí)我們將通過(guò)vscode實(shí)現(xiàn)遠(yuǎn)程開(kāi)發(fā),并做內(nèi)網(wǎng)穿透實(shí)現(xiàn)在公網(wǎng)環(huán)境下的遠(yuǎn)程連接,在外任意地方也可以遠(yuǎn)程連接服務(wù)器進(jìn)行開(kāi)發(fā)寫(xiě)代碼,需要的朋友可以參考下2023-02-02Apache Flink 任意 Jar 包上傳導(dǎo)致遠(yuǎn)程代碼執(zhí)行漏洞復(fù)現(xiàn)問(wèn)題(漏洞預(yù)警)
這篇文章主要介紹了Apache Flink 任意 Jar 包上傳導(dǎo)致遠(yuǎn)程代碼執(zhí)行漏洞復(fù)現(xiàn)問(wèn)題,本文給出了修復(fù)建議和解決方案,需要的朋友可以參考下2019-11-11SSH遠(yuǎn)程登錄和端口轉(zhuǎn)發(fā)詳解
這篇文章主要介紹了關(guān)于SSH遠(yuǎn)程登錄和端口轉(zhuǎn)發(fā)的相關(guān)資料,文中介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來(lái)一起看看吧。2017-03-03LINUX中關(guān)于mkdir命令語(yǔ)法和實(shí)例解讀
Linux系統(tǒng)中,mkdir命令用于創(chuàng)建新的目錄,支持多種選項(xiàng)如權(quán)限設(shè)置、遞歸創(chuàng)建等,對(duì)于新手和管理員都非常有用,命令的靈活性可以通過(guò)各種選項(xiàng)來(lái)實(shí)現(xiàn)不同的功能,包括設(shè)置權(quán)限、創(chuàng)建多級(jí)嵌套目錄和處理隱藏目錄等,掌握mkdir命令對(duì)于提高Linux操作效率和管理文件系統(tǒng)至關(guān)重要2024-10-10Linux下用netstat查看網(wǎng)絡(luò)狀態(tài)、端口狀態(tài)
這篇文章主要介紹了Linux下用netstat查看網(wǎng)絡(luò)狀態(tài)、端口狀態(tài)的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-12-12linux輸入yum后提示: -bash: /usr/bin/yum: No such file or director
在本篇文章里小編給大家整理的是關(guān)于linux輸入yum后提示: -bash: /usr/bin/yum: No such file or directory的解決方法,有需要的朋友們參考下。2019-11-11