Linux進程池實現(xiàn)的詳細(xì)指南
1.為什么要有進程池
如果你了解過STL的底層設(shè)計,你會發(fā)現(xiàn)在其中會有一個叫做內(nèi)存池的設(shè)計。其作用就是先申請出一片空間,如果后續(xù)你需要對你的容器進行擴容,所擴展的空間就從內(nèi)存池里取的。這樣可以提高擴容的效率。
比如你要擴容100次,如果每次擴容都向系統(tǒng)申請空間的話,效率就很低,因為向系統(tǒng)申請空間也是需要時間的,所以內(nèi)存池的作用就是一次性申請一篇空間,當(dāng)需要擴容時就向內(nèi)存池要,可以提高擴容的效率。
既然這樣,我們也可以如此理解進程池,一次性創(chuàng)建一批進程,如果有任務(wù)要執(zhí)行就交給進程池中空閑的進程來做,而不是一有任務(wù)就創(chuàng)建一個新的進程,進程池的目的也是為了提供效率,節(jié)省創(chuàng)建進程的時間消耗。
通過預(yù)先創(chuàng)建和復(fù)用進程,進程池能夠提高任務(wù)執(zhí)行效率,避免頻繁創(chuàng)建和銷毀進程帶來的系統(tǒng)開銷。
2.進程池的工作原理
進程池的核心思想是創(chuàng)建固定數(shù)量的進程,然后將需要執(zhí)行的任務(wù)分配給這些進程來處理。當(dāng)某個任務(wù)完成后,該進程可以繼續(xù)處理下一個任務(wù),而不是銷毀。這樣可以減少頻繁創(chuàng)建和銷毀進程帶來的資源浪費。
2.1 進程池的工作流程
- 初始化:預(yù)處理創(chuàng)建一定數(shù)量的進程,形成進程池。
- 任務(wù)分配:當(dāng)有任務(wù)需要處理時,將任務(wù)分配給某個空閑進程。
- 任務(wù)處理:空閑進程接受任務(wù)并執(zhí)行。
- 復(fù)用進程:任務(wù)執(zhí)行完成后,進程回到池中,等待新的任務(wù)。
- 退出:當(dāng)沒有新的任務(wù)且需要關(guān)閉進程池,池中進程將逐個退出。
3. 進程池的實現(xiàn)(重點)
本文將著重講解進程池的實現(xiàn)步驟。
初始化
通過運行可執(zhí)行文件時傳入的參數(shù)來創(chuàng)建一定數(shù)量的子進程。
如:創(chuàng)建5個子進程
./a.out 5
代碼實現(xiàn):
enum
{
ArgcError = 1,
ArgvError,
PipeError
};
void Usage(const char* tip)
{
cout<<"Usage:"<<tip<<" sub_process_num"<<endl;
}
int main(int argc,char* argv[])
{
if(argc != 2)
{//格式不對
Usage(argv[0]);
return ArgcError;
}
int sub_process_number = stoi(argv[1]);
if(sub_process_number<=0)
{//子進程數(shù)量不能小于1
return ArgvError;
}
//創(chuàng)建子進程...
return 0;
}
現(xiàn)在我們來分析下,我們要實現(xiàn)進程池的功能:創(chuàng)建子進程,發(fā)送任務(wù)給子進程執(zhí)行,子進程的輪詢,殺死進程,等待進程,一些Debug功能。這樣的話我們完全可以創(chuàng)建一個類來封裝這些功能。除此之外,我們還需要描述一下子進程,為此也需要創(chuàng)建一個描述管道的類。
3.1 Channel類
Channel類的功能主要是來描述管道的,具有的屬性有該管道對應(yīng)的子進程的id,名字,寫端描述符。
Channel類的實現(xiàn)比較簡單,直接看代碼吧:
class Channel
{
public:
Channel(int wfd,pid_t sub_process_id,string name)
: wfd_(wfd),sub_process_id_(sub_process_id),name_(name)
{}
void printDebug()
{
cout<<"name:"<<name_<<endl;
cout<<"wfd:"<<wfd_<<endl;
cout<<"pid:"<<sub_process_id_<<endl;
}
string getName()
{
return name_;
}
pid_t getPid()
{
return sub_process_id_;
}
int getWfd()
{
return wfd_;
}
void Close()
{
close(wfd_);
}
private:
int wfd_;
pid_t sub_process_id_;
string name_;
};
3.2 ProcessPool類
ProcessPool類的功能主要是來描述進程池的,具有的屬性有該管道對應(yīng)的子進程的數(shù)量,所有的管道。
類似于這樣:

ProcessPool類的框架:
class ProcessPool
{
public:
ProcessPool(int sub_process_num)
: sub_process_num_(sub_process_num)
{}
//...
private:
int sub_process_num_;
vector<Channel> channels;
};
3.2.1 創(chuàng)建子進程
因為我們需要創(chuàng)建指定數(shù)目的進程,用一個循環(huán)來寫就可以了。在循環(huán)中,父進程每次都會創(chuàng)建一個子進程出來,然后用管道于它們鏈接,注意因為是父進程給子進程分配任務(wù),所以需要把父進程的讀端關(guān)閉,子進程的寫端關(guān)閉。
初版:
int CreateProcess()
{
for(int i = 0;i<sub_process_num_;++i)
{
int pipefd[2];
int n = pipe(pipefd);
if(n == -1)
{
return PipeError;
}
pid_t id = fork();
if(id == 0)
{
//子進程,關(guān)閉寫端
close(pipefd[1]);
//work...
}
//父進程,關(guān)閉讀端
close(pipefd[0]);
string cname = "channel-"+to_string(i);
channels.push_back(Channel(pipefd[1],id,cname));
}
return 0;
}
為了讓子進程執(zhí)行相應(yīng)的任務(wù),我們還可以添加一個回調(diào)函數(shù)worker。worker函數(shù)主要作用是選擇要執(zhí)行的任務(wù),具體的任務(wù),我們還需要自己創(chuàng)建,為此還可以創(chuàng)建3個測試用的任務(wù),用一個函數(shù)指針數(shù)組去保存這些函數(shù)。
代碼如下:
typedef void(* work_t)(int);
typedef void(* task_t)(int,pid_t);
void PrintLog(int fd, pid_t pid)
{
cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl;
}
void ReloadConf(int fd, pid_t pid)
{
cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl;
}
void ConnectMysql(int fd, pid_t pid)
{
cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl;
}
task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};
void worker(int fd)
{
while(true)
{
uint32_t command_code = 0;
ssize_t n = read(0,&command_code,sizeof(command_code));
if(n == sizeof(command_code))
{
if(command_code>=3)
{
continue;
}
tasks[command_code](fd,getpid());
}
else
{
cout<<"sub process:"<<getpid()<<"quit now..."<<endl;
break;
}
}
}
第二版:
第二版相對第一版,多了個回調(diào)函數(shù),這個回調(diào)函數(shù)可以讓我實現(xiàn)相對應(yīng)的工作。同時也多個重定向功能,把原本標(biāo)準(zhǔn)輸入的功能給到了pipefd[0],也就是說當(dāng)子進程去讀標(biāo)準(zhǔn)輸入內(nèi)的數(shù)據(jù)時,會去讀管道中的數(shù)據(jù)。
這是一個典型的標(biāo)準(zhǔn)輸入重定向操作,將管道的讀端作為當(dāng)前進程的輸入來源
int CreateProcess(work_t work)
{
//vector<int> fds;
for(int i = 0;i<sub_process_num_;++i)
{
int pipefd[2];
int n = pipe(pipefd);
if(n == -1)
{
return PipeError;
}
pid_t id = fork();
if(id == 0)
{
//子進程,關(guān)閉寫端
close(pipefd[1]);
dup2(pipefd[0],0);
work(pipefd[0]);
exit(0);
}
//父進程,關(guān)閉讀端
close(pipefd[0]);
string cname = "channel-"+to_string(i);
channels.push_back(Channel(pipefd[1],id,cname));
}
return 0;
}
其實該代碼中還存在bug,有個魔鬼細(xì)節(jié)存在!!!
第三版:
其實對于子進程來說,它的寫端并沒有全部關(guān)閉。下面我們來畫圖:
創(chuàng)建第一個管道,這個圖如果看過我講匿名管道的那篇的話,還是比較熟悉的。

現(xiàn)在我們來創(chuàng)建第二個管道,我們知道文件描述符的創(chuàng)建是遵循當(dāng)前沒有直接使用的最小的一個下標(biāo),作為新的文件描述符。所以呢,新創(chuàng)建的管道的pipefd[0]依舊是在先前的位置,可是寫端就不是了,原先的寫端并沒有被關(guān)閉,我們新管道創(chuàng)建的pipefd[1]會在其下方被創(chuàng)建。
然后要知道的是,子進程是由父進程創(chuàng)建的,它的各項數(shù)據(jù)是由父進程復(fù)制而來,也就會把上一個管道的寫端給復(fù)制過了,但是子進程可是關(guān)閉不了它的,因為它只能拿到新創(chuàng)建管道的寫端pipefd[1]的位置。具體情況如圖:

所以為了關(guān)閉子進程的所有寫端,我們需要用有個數(shù)組去保存父進程中的寫端,然后再子進程中把它們一一關(guān)閉。
代碼如下:
int CreateProcess(work_t work)
{
vector<int> fds;
for(int i = 0;i<sub_process_num_;++i)
{
int pipefd[2];
int n = pipe(pipefd);
if(n == -1)
{
return PipeError;
}
pid_t id = fork();
if(id == 0)
{
//子進程,關(guān)閉寫端
if(!fds.empty())
{
cout<<"close w fd:";
for(auto fd:fds)
{
close(fd);
cout<<fd<<" ";
}
cout<<endl;
}
close(pipefd[1]);
dup2(pipefd[0],0);
work(pipefd[0]);
exit(0);
}
//父進程,關(guān)閉讀端
close(pipefd[0]);
string cname = "channel-"+to_string(i);
channels.push_back(Channel(pipefd[1],id,cname));
fds.push_back(pipefd[1]);
}
return 0;
}
3.2.2 殺死所有進程
進程池也有不需要的時候,當(dāng)進程池不需要了,我們就要回收子進程了,怎么回收呢?當(dāng)然是進程等待了。
殺死子進程也就是等待子進程。
要注意的是別忘了關(guān)閉文件描述符
進程等待是必須的,不然的話子進程會變成僵尸進程的。
void KillAllProcess()
{
//在回收子進程前,我們需要把pipefd[1]全部關(guān)閉
for(auto&channel:channels)
{
channel.Close();
//關(guān)閉完文件描述符后,開始等待。等待進程需要子進程的pid,恰巧我們的Channel中存有子進程的pid
pid_t pid = channel.getPid();
pid_t rid = waitpid(pid,nullptr,0);
if(rid == pid)
{
//回收成功
cout<<"wait sub process:"<<pid<<" success..."<<endl;
}
cout<<channel.getName()<<"close done"<<" sub process quit now:"<<channel.getPid()<<endl;
}
}
3.2.3 其他功能
因為這些功能都比較簡單就一塊講了吧。
子進程的輪詢,我能總不能讓一個子進程一直跑任務(wù)吧,為了合理利用子進程,我們可以設(shè)計也該輪詢函數(shù),讓子進程的任務(wù)分配"雨露均沾"。
int NextChannel()
{
static int next = 0;//static修飾的變量只會初始化一次。
int c = next;
next = (next+1)%sub_process_num_;
return c;
}
發(fā)送任務(wù)代碼:
void SendTaskCode(int index,uint32_t code)
{
cout << "send code: " << code << " to " << channels[index].getName() << " sub prorcess id: " << channels[index].getPid() << endl;
write(channels[index].getPid(), &code, sizeof(code));
}
debug:
void Debug()
{
for(auto&channel:channels)
{
channel.printDebug();
cout<<endl;
}
}
3.3 控制進程池
完成上面的功能就需要我們?nèi)タ刂七M程池的子進程了。
主要包括創(chuàng)建進程池,控制子進程,回收子進程。
void CtrlSubProcess(ProcessPool* processpool,int cnt)
{
while(cnt--)
{
//選擇一個進程和通道
int channel = processpool->NextChannel();
//選擇一個任務(wù)
uint32_t code = NextTask();
processpool->SendTaskCode(channel,code);
sleep(1);
}
}
int main(int argc,char* argv[])
{
if(argc!=2)
{
Usage(argv[0]);
return ArgcError;
}
int sub_process_num = stoi(argv[1]);
if(sub_process_num<1)
{
return ArgvError;
}
srand((unsigned int)time(nullptr));//生成隨機種子
//創(chuàng)建進程池
ProcessPool* processpool = new ProcessPool(sub_process_num);
processpool->CreateProcess(worker);
processpool->Debug();
//sleep(2);
//控制子進程
CtrlSubProcess(processpool,10);
//sleep(2);
//回收子進程
processpool->KillAllProcess();
delete processpool;
return 0;
}
運行結(jié)果:

4. 完整代碼
///processpool.cc//
#include <iostream>
#include <vector>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include "bolg.hpp"
using namespace std;
enum
{
ArgcError = 1,
ArgvError,
PipeError
};
class Channel
{
public:
Channel(int wfd,pid_t sub_process_id,string name)
: wfd_(wfd),sub_process_id_(sub_process_id),name_(name)
{}
void printDebug()
{
cout<<"name:"<<name_<<endl;
cout<<"wfd:"<<wfd_<<endl;
cout<<"pid:"<<sub_process_id_<<endl;
}
string getName()
{
return name_;
}
pid_t getPid()
{
return sub_process_id_;
}
int getWfd()
{
return wfd_;
}
void Close()
{
close(wfd_);
}
private:
int wfd_;
pid_t sub_process_id_;
string name_;
};
class ProcessPool
{
public:
ProcessPool(int sub_process_num)
: sub_process_num_(sub_process_num)
{}
int CreateProcess(work_t work)
{
vector<int> fds;
for(int i = 0;i<sub_process_num_;++i)
{
int pipefd[2];
int n = pipe(pipefd);
if(n == -1)
{
return PipeError;
}
pid_t id = fork();
if(id == 0)
{
//子進程,關(guān)閉寫端
if(!fds.empty())
{
cout<<"close w fd:";
for(auto fd:fds)
{
close(fd);
cout<<fd<<" ";
}
cout<<endl;
}
close(pipefd[1]);
dup2(pipefd[0],0);
work(pipefd[0]);
exit(0);
}
//父進程,關(guān)閉讀端
close(pipefd[0]);
string cname = "channel-"+to_string(i);
channels.push_back(Channel(pipefd[1],id,cname));
fds.push_back(pipefd[1]);
}
return 0;
}
void KillAllProcess()
{
//在回收子進程前,我們需要把pipefd[1]全部關(guān)閉
for(auto&channel:channels)
{
channel.Close();
//關(guān)閉完文件描述符后,開始等待。等待進程需要子進程的pid,恰巧我們的Channel中存有子進程的pid
pid_t pid = channel.getPid();
pid_t rid = waitpid(pid,nullptr,0);
if(rid == pid)
{
//回收成功
cout<<"wait sub process:"<<pid<<" success..."<<endl;
}
cout<<channel.getName()<<"close done"<<" sub process quit now:"<<channel.getPid()<<endl;
}
}
int NextChannel()
{
static int next = 0;
int c = next;
next = (next+1)%sub_process_num_;
return c;
}
void SendTaskCode(int index,uint32_t code)
{
cout << "send code: " << code << " to " << channels[index].getName() << " sub prorcess id: " << channels[index].getPid() << endl;
write(channels[index].getPid(), &code, sizeof(code));
}
void Debug()
{
for(auto&channel:channels)
{
channel.printDebug();
cout<<endl;
}
}
private:
int sub_process_num_;
vector<Channel> channels;
};
void Usage(const char* tip)
{
cout<<"Usage:"<<tip<<" sub_process_num"<<endl;
}
void CtrlSubProcess(ProcessPool* processpool,int cnt)
{
while(cnt--)
{
//選擇一個進程和通道
int channel = processpool->NextChannel();
//選擇一個任務(wù)
uint32_t code = NextTask();
processpool->SendTaskCode(channel,code);
sleep(1);
}
}
int main(int argc,char* argv[])
{
if(argc!=2)
{
Usage(argv[0]);
return ArgcError;
}
int sub_process_num = stoi(argv[1]);
if(sub_process_num<1)
{
return ArgvError;
}
srand((unsigned int)time(nullptr));
//創(chuàng)建進程池
ProcessPool* processpool = new ProcessPool(sub_process_num);
processpool->CreateProcess(worker);
processpool->Debug();
//sleep(2);
//控制子進程
CtrlSubProcess(processpool,10);
//sleep(2);
//回收子進程
processpool->KillAllProcess();
delete processpool;
return 0;
}
///task.hpp
#include <iostream>
#include <vector>
#include <cstdlib>
#include <unistd.h>
using namespace std;
//創(chuàng)建函數(shù)指針
typedef void(* work_t)(int);
typedef void(* task_t)(int,pid_t);
void PrintLog(int fd, pid_t pid)
{
cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl;
}
void ReloadConf(int fd, pid_t pid)
{
cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl;
}
void ConnectMysql(int fd, pid_t pid)
{
cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl;
}
uint32_t NextTask()
{
return rand()%3;
}
task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};
void worker(int fd)
{
while(true)
{
uint32_t command_code = 0;
ssize_t n = read(0,&command_code,sizeof(command_code));
if(n == sizeof(command_code))
{
if(command_code>=3)
{
continue;
}
tasks[command_code](fd,getpid());
}
else
{
cout<<"sub process:"<<getpid()<<"quit now..."<<endl;
break;
}
}
}
5. 總結(jié)
進程池的核心思想是創(chuàng)建固定數(shù)量的進程,然后將需要執(zhí)行的任務(wù)分配給這些進程來處理。當(dāng)某個任務(wù)完成后,該進程可以繼續(xù)處理下一個任務(wù),而不是銷毀。這樣可以減少頻繁創(chuàng)建和銷毀進程帶來的資源浪費。
以上就是Linux進程池實現(xiàn)的詳細(xì)指南的詳細(xì)內(nèi)容,更多關(guān)于Linux進程池實現(xiàn)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
vscode遠(yuǎn)程開發(fā)使用SSH遠(yuǎn)程連接服務(wù)器的方法「內(nèi)網(wǎng)穿透」
這篇文章主要介紹了vscode遠(yuǎn)程開發(fā)使用SSH遠(yuǎn)程連接服務(wù)器?「內(nèi)網(wǎng)穿透」,通過本文學(xué)習(xí)我們將通過vscode實現(xiàn)遠(yuǎn)程開發(fā),并做內(nèi)網(wǎng)穿透實現(xiàn)在公網(wǎng)環(huán)境下的遠(yuǎn)程連接,在外任意地方也可以遠(yuǎn)程連接服務(wù)器進行開發(fā)寫代碼,需要的朋友可以參考下2023-02-02
Apache Flink 任意 Jar 包上傳導(dǎo)致遠(yuǎn)程代碼執(zhí)行漏洞復(fù)現(xiàn)問題(漏洞預(yù)警)
這篇文章主要介紹了Apache Flink 任意 Jar 包上傳導(dǎo)致遠(yuǎn)程代碼執(zhí)行漏洞復(fù)現(xiàn)問題,本文給出了修復(fù)建議和解決方案,需要的朋友可以參考下2019-11-11
SSH遠(yuǎn)程登錄和端口轉(zhuǎn)發(fā)詳解
這篇文章主要介紹了關(guān)于SSH遠(yuǎn)程登錄和端口轉(zhuǎn)發(fā)的相關(guān)資料,文中介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。2017-03-03
Linux下用netstat查看網(wǎng)絡(luò)狀態(tài)、端口狀態(tài)
這篇文章主要介紹了Linux下用netstat查看網(wǎng)絡(luò)狀態(tài)、端口狀態(tài)的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-12-12
linux輸入yum后提示: -bash: /usr/bin/yum: No such file or director
在本篇文章里小編給大家整理的是關(guān)于linux輸入yum后提示: -bash: /usr/bin/yum: No such file or directory的解決方法,有需要的朋友們參考下。2019-11-11

