詳解通過源碼解析Node.js中cluster模塊的主要功能實現(xiàn)
眾所周知,Node.js中的JavaScript代碼執(zhí)行在單線程中,非常脆弱,一旦出現(xiàn)了未捕獲的異常,那么整個應用就會崩潰。這在許多場景下,尤其是web應用中,是無法忍受的。通常的解決方案,便是使用Node.js中自帶的cluster模塊,以master-worker模式啟動多個應用實例。然而大家在享受cluster模塊帶來的福祉的同時,不少人也開始好奇:
- 為什么我的應用代碼中明明有app.listen(port);,但cluter模塊在多次fork這份代碼時,卻沒有報端口已被占用?
- Master是如何將接收的請求傳遞至worker中進行處理然后響應的?
讓我們從Node.js項目的lib/cluster.js中的代碼里,來一勘究竟。
問題一
為了得到這個問題的解答,我們先從worker進程的初始化看起,master進程在fork工作進程時,會為其附上環(huán)境變量NODE_UNIQUE_ID,是一個從零開始的遞增數(shù):
// lib/cluster.js
// ...
function createWorkerProcess(id, env) {
// ...
workerEnv.NODE_UNIQUE_ID = '' + id;
// ...
return fork(cluster.settings.exec, cluster.settings.args, {
env: workerEnv,
silent: cluster.settings.silent,
execArgv: execArgv,
gid: cluster.settings.gid,
uid: cluster.settings.uid
});
}
隨后Node.js在初始化時,會根據(jù)該環(huán)境變量,來判斷該進程是否為cluster模塊fork出的工作進程,若是,則執(zhí)行workerInit()函數(shù)來初始化環(huán)境,否則執(zhí)行masterInit()函數(shù)。
在workerInit()函數(shù)中,定義了cluster._getServer方法,這個方法在任何net.Server實例的listen方法中,會被調(diào)用:
// lib/net.js
// ...
function listen(self, address, port, addressType, backlog, fd, exclusive) {
exclusive = !!exclusive;
if (!cluster) cluster = require('cluster');
if (cluster.isMaster || exclusive) {
self._listen2(address, port, addressType, backlog, fd);
return;
}
cluster._getServer(self, {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags: 0
}, cb);
function cb(err, handle) {
// ...
self._handle = handle;
self._listen2(address, port, addressType, backlog, fd);
}
}
你可能已經(jīng)猜到,問題一的答案,就在這個cluster._getServer函數(shù)的代碼中。它主要干了兩件事:
- 向master進程注冊該worker,若master進程是第一次接收到監(jiān)聽此端口/描述符下的worker,則起一個內(nèi)部TCP服務器,來承擔監(jiān)聽該端口/描述符的職責,隨后在master中記錄下該worker。
- Hack掉worker進程中的net.Server實例的listen方法里監(jiān)聽端口/描述符的部分,使其不再承擔該職責。
對于第一件事,由于master在接收,傳遞請求給worker時,會符合一定的負載均衡規(guī)則(在非Windows平臺下默認為輪詢),這些邏輯被封裝在RoundRobinHandle類中。故,初始化內(nèi)部TCP服務器等操作也在此處:
// lib/cluster.js
// ...
function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
// ...
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
if (fd >= 0)
this.server.listen({ fd: fd });
else if (port >= 0)
this.server.listen(port, address);
else
this.server.listen(address); // UNIX socket path.
/// ...
}
對于第二件事,由于net.Server實例的listen方法,最終會調(diào)用自身_handle屬性下listen方法來完成監(jiān)聽動作,故在代碼中修改之:
// lib/cluster.js
// ...
function rr(message, cb) {
// ...
// 此處的listen函數(shù)不再做任何監(jiān)聽動作
function listen(backlog) {
return 0;
}
function close() {
// ...
}
function ref() {}
function unref() {}
var handle = {
close: close,
listen: listen,
ref: ref,
unref: unref,
};
// ...
handles[key] = handle;
cb(0, handle); // 傳入這個cb中的handle將會被賦值給net.Server實例中的_handle屬性
}
// lib/net.js
// ...
function listen(self, address, port, addressType, backlog, fd, exclusive) {
// ...
if (cluster.isMaster || exclusive) {
self._listen2(address, port, addressType, backlog, fd);
return; // 僅在worker環(huán)境下改變
}
cluster._getServer(self, {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags: 0
}, cb);
function cb(err, handle) {
// ...
self._handle = handle;
// ...
}
}
至此,第一個問題便已豁然開朗了,總結下:
- 端口僅由master進程中的內(nèi)部TCP服務器監(jiān)聽了一次。
- 不會出現(xiàn)端口被重復監(jiān)聽報錯,是由于,worker進程中,最后執(zhí)行監(jiān)聽端口操作的方法,已被cluster模塊主動hack。
問題二
解決了問題一,問題二的解決就明朗輕松許多了。通過問題一我們已得知,監(jiān)聽端口的是master進程中創(chuàng)建的內(nèi)部TCP服務器,所以第二個問題的解決,著手點就是該內(nèi)部TCP服務器接手連接時,執(zhí)行的操作。Cluster模塊的做法是,監(jiān)聽該內(nèi)部TCP服務器的connection事件,在監(jiān)聽器函數(shù)里,有負載均衡地挑選出一個worker,向其發(fā)送newconn內(nèi)部消息(消息體對象中包含cmd: 'NODE_CLUSTER'屬性)以及一個客戶端句柄(即connection事件處理函數(shù)的第二個參數(shù)),相關代碼如下:
// lib/cluster.js
// ...
function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
// ...
this.server = net.createServer(assert.fail);
// ...
var self = this;
this.server.once('listening', function() {
// ...
self.handle.onconnection = self.distribute.bind(self);
});
}
RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
var worker = this.free.shift();
if (worker) this.handoff(worker);
};
RoundRobinHandle.prototype.handoff = function(worker) {
// ...
var message = { act: 'newconn', key: this.key };
var self = this;
sendHelper(worker.process, message, handle, function(reply) {
// ...
});
};
Worker進程在接收到了newconn內(nèi)部消息后,根據(jù)傳遞過來的句柄,調(diào)用實際的業(yè)務邏輯處理并返回:
// lib/cluster.js
// ...
// 該方法會在Node.js初始化時由 src/node.js 調(diào)用
cluster._setupWorker = function() {
// ...
process.on('internalMessage', internal(worker, onmessage));
// ...
function onmessage(message, handle) {
if (message.act === 'newconn')
onconnection(message, handle);
// ...
}
};
function onconnection(message, handle) {
// ...
var accepted = server !== undefined;
// ...
if (accepted) server.onconnection(0, handle);
}
至此,問題二也得到了解決,也總結一下:
- 所有請求先同一經(jīng)過內(nèi)部TCP服務器。
- 在內(nèi)部TCP服務器的請求處理邏輯中,有負載均衡地挑選出一個worker進程,將其發(fā)送一個newconn內(nèi)部消息,隨消息發(fā)送客戶端句柄。
- Worker進程接收到此內(nèi)部消息,根據(jù)客戶端句柄創(chuàng)建net.Socket實例,執(zhí)行具體業(yè)務邏輯,返回。
最后
Node.js中的cluster模塊除了上述提到的功能外,其實還提供了非常豐富的API供master和worker進程之前通信,對于不同的操作系統(tǒng)平臺,也提供了不同的默認行為。本文僅挑選了一條功能線進行了分析闡述。如果大家有閑,非常推薦完整領略一下cluster模塊的代碼實現(xiàn)。
參考:
https://github.com/nodejs/node/blob/master/lib/cluster.js
https://github.com/nodejs/node/blob/master/lib/net.js
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
node.js中的console.timeEnd方法使用說明
這篇文章主要介紹了node.js中的console.timeEnd方法使用說明,本文介紹了console.timeEnd的方法說明、語法、使用實例和實現(xiàn)源碼,需要的朋友可以參考下2014-12-12
Node.js微信 access_token ( jsapi_ticket ) 存取與刷新的示例
本篇文章主要介紹了Node.js微信 access_token ( jsapi_ticket ) 存取與刷新的示例,具有一定的參考價值,有興趣的可以了解一一下‘2017-09-09
nodejs中內(nèi)置模塊fs,path常見的用法說明
這篇文章主要介紹了nodejs中內(nèi)置模塊fs,path常見的用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11
nodejs中簡單實現(xiàn)Javascript Promise機制的實例
這篇文章主要介紹了nodejs中簡單實現(xiàn)Javascript Promise機制的實例,本文在nodejs中簡單實現(xiàn)一個promise/A 規(guī)范,需要的朋友可以參考下2014-12-12
node.js?express和koa中間件機制和錯誤處理機制
這篇文章主要介紹了node.js?express和koa中間件機制和錯誤處理機制,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的朋友可以參考一下2022-07-07

