PHP網(wǎng)絡(luò)處理模塊FPM源碼分析
PHP-FPM源碼分析
一個請求從瀏覽器到達PHP腳本執(zhí)行中間有個必要模塊是網(wǎng)絡(luò)處理模塊,F(xiàn)PM是這個模塊的一部分,配合fastcgi協(xié)議實現(xiàn)對請求的從監(jiān)聽到轉(zhuǎn)發(fā)到PHP處理,并將結(jié)果返回這條流程。
FPM采用多進程模型,就是創(chuàng)建一個master進程,在master進程中創(chuàng)建并監(jiān)聽socket,然后fork多個子進程,然后子進程各自accept請求,子進程在啟動后阻塞在accept上,有請求到達后開始讀取請求 數(shù)據(jù),讀取完成后開始處理然后再返回,在這期間是不會接收其它請求的,也就是說fpm的子進程同時只能響應(yīng) 一個請求,只有把這個請求處理完成后才會accept下一個請求,這是一種同步阻塞的模型。master進程負責(zé)管理子進程,監(jiān)聽子進程的狀態(tài),控制子進程的數(shù)量。master進程與worker進程之間通過共享變量同步信息。
從main函數(shù)開始
int main(int argc, char *argv[])
{
zend_signal_startup();
// 將全局變量sapi_module設(shè)置為cgi_sapi_module
sapi_startup(&cgi_sapi_module);
fcgi_init();
// 獲取命令行參數(shù),其中php-fpm -D、-i等參數(shù)都是在這里被解析出來的
// ...
cgi_sapi_module.startup(&cgi_sapi_module);
fpm_init(argc, argv, fpm_config ? fpm_config : CGIG(fpm_config), fpm_prefix, fpm_pid, test_conf, php_allow_to_run_as_root, force_daemon, force_stderr);
// master進程會在這一步死循環(huán),后面的流程都是子進程在執(zhí)行。
fcgi_fd = fpm_run(&max_requests);
fcgi_fd = fpm_run(&max_requests);
request = fpm_init_request(fcgi_fd);
// accept請求
// ....
}main()函數(shù)展現(xiàn)了這個fpm運行完整的框架,可見整個fpm主要分為三個部分:
- 1、運行前的fpm_init();
- 2、運行函數(shù)fpm_run();
- 3、子進程accept請求處理。
FPM中的事件監(jiān)聽機制
在詳細了解fpm工作過程前,我們要先了解fpm中的事件機制。在fpm中事件的監(jiān)聽默認使用kqueue來實現(xiàn),關(guān)于kqueue的介紹可以看看我之前整理的這篇文章kqueue用法簡介。
// fpm中的事件結(jié)構(gòu)體
struct fpm_event_s {
// 事件的句柄
int fd;
// 下一次觸發(fā)的事件
struct timeval timeout;
// 頻率:多久執(zhí)行一次
struct timeval frequency;
// 事件觸發(fā)時調(diào)用的函數(shù)
void (*callback)(struct fpm_event_s *, short, void *);
void *arg; // 調(diào)用callback時的參數(shù)
// FPM_EV_READ:讀;FPM_EV_TIMEOUT:;FPM_EV_PERSIST:;FPM_EV_EDGE:;
int flags;
int index; // 在fd句柄數(shù)組中的索引
// 事件的類型 FPM_EV_READ:讀;FPM_EV_TIMEOUT:計時器;FPM_EV_PERSIST:;FPM_EV_EDGE:;
short which;
};
// 事件隊列
typedef struct fpm_event_queue_s {
struct fpm_event_queue_s *prev;
struct fpm_event_queue_s *next;
struct fpm_event_s *ev;
} fpm_event_queue;以fpm_run()中master進程注冊的一個sp[0]的可讀事件為例:
void fpm_event_loop(int err)
{
static struct fpm_event_s signal_fd_event;
// 創(chuàng)建一個事件:管道sp[0]可讀時觸發(fā)
fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);
// 將事件添加進queue
fpm_event_add(&signal_fd_event, 0);
// 處理定時器等邏輯
// 以阻塞的方式獲取事件
// module->wait()是一個接口定義的方法簽名,下面展示kqueue的實現(xiàn)
ret = module->wait(fpm_event_queue_fd, timeout);
}
int fpm_event_add(struct fpm_event_s *ev, unsigned long int frequency)
{
// ...
// 如果事件是觸發(fā)事件則之間添加進queue中
// 對于定時器事件先根據(jù)事件的frequency設(shè)置事件的觸發(fā)頻率和下一次觸發(fā)的事件
if (fpm_event_queue_add(&fpm_event_queue_timer, ev) != 0) {
return -1;
}
return 0;
}
static int fpm_event_queue_add(struct fpm_event_queue_s **queue, struct fpm_event_s *ev)
{
// ...
// 構(gòu)建并將當(dāng)前事件插入事件隊列queue中
if (*queue == fpm_event_queue_fd && module->add) {
// module->add(ev)是一個接口定義的方法簽名,下面展示kqueue的實現(xiàn)
module->add(ev);
}
return 0;
}
// kqueue關(guān)于添加事件到kqueue的實現(xiàn)
static int fpm_event_kqueue_add(struct fpm_event_s *ev) /* {{{ */
{
struct kevent k;
int flags = EV_ADD;
if (ev->flags & FPM_EV_EDGE) {
flags = flags | EV_CLEAR;
}
EV_SET(&k, ev->fd, EVFILT_READ, flags, 0, 0, (void *)ev);
if (kevent(kfd, &k, 1, NULL, 0, NULL) < 0) {
zlog(ZLOG_ERROR, "kevent: unable to add event");
return -1;
}
/* mark the event as registered */
ev->index = ev->fd;
return 0;
}FPM中關(guān)于kqueue的實現(xiàn)
// kqueue關(guān)于從kqueue中監(jiān)聽事件的實現(xiàn)
static int fpm_event_kqueue_wait(struct fpm_event_queue_s *queue, unsigned long int timeout) /* {{{ */
{
struct timespec t;
int ret, i;
/* ensure we have a clean kevents before calling kevent() */
memset(kevents, 0, sizeof(struct kevent) * nkevents);
/* convert ms to timespec struct */
t.tv_sec = timeout / 1000;
t.tv_nsec = (timeout % 1000) * 1000 * 1000;
/* wait for incoming event or timeout */
ret = kevent(kfd, NULL, 0, kevents, nkevents, &t);
if (ret == -1) {
/* trigger error unless signal interrupt */
if (errno != EINTR) {
zlog(ZLOG_WARNING, "epoll_wait() returns %d", errno);
return -1;
}
}
/* fire triggered events */
for (i = 0; i < ret; i++) {
if (kevents[i].udata) {
struct fpm_event_s *ev = (struct fpm_event_s *)kevents[i].udata;
fpm_event_fire(ev);
/* sanity check */
if (fpm_globals.parent_pid != getpid()) {
return -2;
}
}
}
return ret;
}fpm_init
fpm_init()負責(zé)啟動前的初始化工作,包括注冊各個模塊的銷毀時用于清理變量的callback。下面只介紹幾個重要的init。
fpm_conf_init_main
負責(zé)解析php-fpm.conf配置文件,分配worker pool內(nèi)存結(jié)構(gòu)并保存到全局變量fpm_worker_all_pools中,各worker pool配置解析到 fpm_worker_pool_s->config 中。
所謂worker pool 是fpm可以同時監(jiān)聽多個端口,每個端口對應(yīng)一個worker pool。
fpm_scoreboard_init_main
為每個worker pool分配一個fpm_scoreboard_s結(jié)構(gòu)的內(nèi)存空間scoreboard,用于記錄worker進程運行信息。
// fpm_scoreboard_s 結(jié)構(gòu)
struct fpm_scoreboard_s {
union {
atomic_t lock;
char dummy[16];
};
char pool[32];
int pm; // 進程的管理方式 static、dynamic、ondemand
time_t start_epoch;
int idle; // 空閑的worker進程數(shù)
int active; // 繁忙的worker進程數(shù)
int active_max; // 最大繁忙進程數(shù)
unsigned long int requests;
unsigned int max_children_reached;
int lq;
int lq_max;
unsigned int lq_len;
unsigned int nprocs;
int free_proc;
unsigned long int slow_rq;
struct fpm_scoreboard_proc_s *procs[];
};fpm_signals_init_main
fpm注冊自己的信號量,并設(shè)置監(jiān)聽函數(shù)的處理邏輯。
int fpm_signals_init_main() /* {{{ */
{
struct sigaction act;
// 創(chuàng)建一個全雙工套接字
// 全雙工的套接字是一個可以讀、寫的socket通道[0]和[1],每個進程固定一個管道。
// 寫數(shù)據(jù)時:管道不滿不會被阻塞;讀數(shù)據(jù)時:管道里沒有數(shù)據(jù)會阻塞(可設(shè)置)
// 向sp[0]寫入數(shù)據(jù)時,sp[0]的讀取將會被阻塞,sp[1]的寫管道會被阻塞,sp[1]中此時讀取sp[0]的數(shù)據(jù)
if (0 > socketpair(AF_UNIX, SOCK_STREAM, 0, sp)) {
zlog(ZLOG_SYSERROR, "failed to init signals: socketpair()");
return -1;
}
if (0 > fd_set_blocked(sp[0], 0) || 0 > fd_set_blocked(sp[1], 0)) {
zlog(ZLOG_SYSERROR, "failed to init signals: fd_set_blocked()");
return -1;
}
if (0 > fcntl(sp[0], F_SETFD, FD_CLOEXEC) || 0 > fcntl(sp[1], F_SETFD, FD_CLOEXEC)) {
zlog(ZLOG_SYSERROR, "falied to init signals: fcntl(F_SETFD, FD_CLOEXEC)");
return -1;
}
memset(&act, 0, sizeof(act));
act.sa_handler = sig_handler; // 監(jiān)聽到信號調(diào)用這個函數(shù)
sigfillset(&act.sa_mask);
if (0 > sigaction(SIGTERM, &act, 0) ||
0 > sigaction(SIGINT, &act, 0) ||
0 > sigaction(SIGUSR1, &act, 0) ||
0 > sigaction(SIGUSR2, &act, 0) ||
0 > sigaction(SIGCHLD, &act, 0) ||
0 > sigaction(SIGQUIT, &act, 0)) {
zlog(ZLOG_SYSERROR, "failed to init signals: sigaction()");
return -1;
}
return 0;
}
// 所有信號共用同一個處理函數(shù)
static void sig_handler(int signo) /* {{{ */
{
static const char sig_chars[NSIG + 1] = {
[SIGTERM] = 'T',
[SIGINT] = 'I',
[SIGUSR1] = '1',
[SIGUSR2] = '2',
[SIGQUIT] = 'Q',
[SIGCHLD] = 'C'
};
char s;
int saved_errno;
if (fpm_globals.parent_pid != getpid()) {
return;
}
saved_errno = errno;
s = sig_chars[signo];
zend_quiet_write(sp[1], &s, sizeof(s)); // 將信息對應(yīng)的字節(jié)寫進管道sp[1]端,此時sp[1]端的讀數(shù)據(jù)會阻塞;數(shù)據(jù)可以從sp[0]端讀取
errno = saved_errno;
}fpm_sockets_init_main
每個worker pool 開啟一個socket套接字。
fpm_event_init_main
這里啟動master的事件管理器。用于管理IO、定時事件,其中IO事件通過kqueue、epoll、 poll、select等管理,定時事件就是定時器,一定時間后觸發(fā)某個事件。同樣,我們以kqueue的實現(xiàn)為例看下源碼。
int fpm_event_init_main()
{
// ...
if (module->init(max) < 0) {
zlog(ZLOG_ERROR, "Unable to initialize the event module %s", module->name);
return -1;
}
// ...
}
// max用于指定kqueue事件數(shù)組的大小
static int fpm_event_kqueue_init(int max) /* {{{ */
{
if (max < 1) {
return 0;
}
kfd = kqueue();
if (kfd < 0) {
zlog(ZLOG_ERROR, "kqueue: unable to initialize");
return -1;
}
kevents = malloc(sizeof(struct kevent) * max);
if (!kevents) {
zlog(ZLOG_ERROR, "epoll: unable to allocate %d events", max);
return -1;
}
memset(kevents, 0, sizeof(struct kevent) * max);
nkevents = max;
return 0;
}fpm_run
fpm_init到此結(jié)束,下面進入fpm_run階段,在這個階段master進程會根據(jù)配置fork出多個子進程然后master進程會進入fpm_event_loop(0)函數(shù),并在這個函數(shù)內(nèi)部死循環(huán),也就是說master進程將不再執(zhí)行后面的代碼,后面的邏輯全部是子進程執(zhí)行的操作。
master進程在fpm_event_loop里通過管道sp來監(jiān)聽子進程的各個事件,同時也要處理自身產(chǎn)生的一些事件、定時器等任務(wù),來響應(yīng)的管理子進程。內(nèi)部的邏輯在介紹事件監(jiān)聽機制時已經(jīng)詳細說過。
int fpm_run(int *max_requests) /* {{{ */
{
struct fpm_worker_pool_s *wp;
/* create initial children in all pools */
for (wp = fpm_worker_all_pools; wp; wp = wp->next) {
int is_parent;
is_parent = fpm_children_create_initial(wp);
if (!is_parent) {
goto run_child;
}
}
/* run event loop forever */
fpm_event_loop(0);
run_child: /* only workers reach this point */
fpm_cleanups_run(FPM_CLEANUP_CHILD);
*max_requests = fpm_globals.max_requests;
return fpm_globals.listening_socket;
}子進程處理請求
回到main函數(shù),fpm_run后面的邏輯都是子進程在運行。首先會初始化一個fpm的request結(jié)構(gòu)的變量,然后子進程會阻塞在fcgi_accept_request(request)函數(shù)上等待請求。關(guān)于fcgi_accept_request函數(shù)就是死循環(huán)一個socket編程的accept函數(shù)來接收請求,并將請求數(shù)據(jù)全部取出。
...
// 初始化request
request = fpm_init_request(fcgi_fd);
zend_first_try {
// accept接收請求
while (EXPECTED(fcgi_accept_request(request) >= 0)) {
init_request_info();
fpm_request_info();
if (UNEXPECTED(php_request_startup() == FAILURE)) {
// ...
}
if (UNEXPECTED(fpm_status_handle_request())) {
goto fastcgi_request_done;
}
...
// 打開配置文件中DOCUMENT_ROOT設(shè)置的腳本
if (UNEXPECTED(php_fopen_primary_script(&file_handle) == FAILURE)) {
...
}
fpm_request_executing();
// 執(zhí)行腳本
php_execute_script(&file_handle);
...
}
// 銷毀請求request
fcgi_destroy_request(request);
// fcgi退出
fcgi_shutdown();
if (cgi_sapi_module.php_ini_path_override) {
free(cgi_sapi_module.php_ini_path_override);
}
if (cgi_sapi_module.ini_entries) {
free(cgi_sapi_module.ini_entries);
}
} zend_catch {
...
} zend_end_try();以上就是PHP網(wǎng)絡(luò)處理模塊FPM源碼分析的詳細內(nèi)容,更多關(guān)于PHP網(wǎng)絡(luò)處理模塊FPM的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
thinkphp5使用phpmail發(fā)送qq郵件的實現(xiàn)過程
這篇文章主要給大家介紹了thinkphp5使用phpmail發(fā)送qq郵件的實現(xiàn)過程,文中有詳細的代碼示例供大家參考,具有一定的參考價值,感興趣的小伙伴可以自己動手嘗試一下2023-10-10
在Mac OS上編譯安裝Nginx+PHP+MariaDB開發(fā)環(huán)境的教程
這篇文章主要介紹了在Mac OS上編譯安裝Nginx+PHP+MariaDB開發(fā)環(huán)境的教程,包括使用phpize安裝PHP擴展的方法,需要的朋友可以參考下2016-02-02
PHP得到某段時間區(qū)間的時間戳 php定時任務(wù)
linux 定時掃描,若有滿足條件的用戶,則發(fā)送短信2012-04-04
php根據(jù)操作系統(tǒng)轉(zhuǎn)換文件名大小寫的方法
這篇文章主要介紹了php根據(jù)操作系統(tǒng)轉(zhuǎn)換文件名大小寫的方法,需要的朋友可以參考下2014-02-02

