Node.js高級編程使用RPC通信示例詳解
前言
在構(gòu)建微服務(wù)時,為了追求極致的效率,服務(wù)間一般會使用 RPC(Remote Procedure Call)來進行通信。本文通過 Node.js 來實踐一下。
Node.js 樸素 RPC
首先我們來構(gòu)建一下 server:
// server.js
const net = require('net')
const {msgBuffer} = require('../utils')
const server = net.createServer((clientSocket) => {
clientSocket.on('data', (data) => {
msgBuffer.push(data)
while (!msgBuffer.isFinished()) {
const message = JSON.parse(msgBuffer.handleData())
clientSocket.write(
JSON.stringify(fnMap[message.cmd].apply(null, message.params)) + '\n'
)
}
})
})
server.listen(9999, () => console.log('Listening on 9999'))
const fnMap = {
add: (...args) => {
let s = 0
for (let i = 0; i < args.length; i++) {
s += args[i]
}
return s
},
multiply: (...args) => {
let p = 1
for (let i = 0; i < args.length; i++) {
p *= args[i]
}
return p
},
}
// MessageBuffer
class MessageBuffer {
constructor(delimiter) {
this.delimiter = delimiter
this.buffer = ''
}
isFinished() {
if (
this.buffer.length === 0 ||
this.buffer.indexOf(this.delimiter) === -1
) {
return true
}
return false
}
push(data) {
this.buffer += data
}
getMessage() {
const delimiterIndex = this.buffer.indexOf(this.delimiter)
if (delimiterIndex !== -1) {
const message = this.buffer.slice(0, delimiterIndex)
this.buffer = this.buffer.replace(message + this.delimiter, '')
return message
}
return null
}
handleData() {
const message = this.getMessage()
return message
}
}
exports.msgBuffer = new MessageBuffer('\n')
我們新建了一個 TCP 的服務(wù),并監(jiān)聽來自客戶端的數(shù)據(jù),注意這里我們通過一個 MessageBuffer 類來對數(shù)據(jù)進行解析(至于為什么這么做可參考考文末補充內(nèi)容:關(guān)于 TCP “粘包”問題說明),將 TCP 數(shù)據(jù)流解析成我們的消息體。然后調(diào)用服務(wù)端預(yù)先配置好的方法,最后將返回值返回給客戶端。
客戶端相對比較簡單,將函數(shù)調(diào)用相關(guān)數(shù)據(jù)按照事先規(guī)定好的格式發(fā)送給服務(wù)端即可:
const net = require('net')
const {msgBuffer} = require('../utils')
const client = net.connect({port: 9999}, () => {
client.write(JSON.stringify({cmd: 'add', params: [1, 2, 3]}) + '\n')
client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]}) + '\n')
})
client.on('data', (data) => {
msgBuffer.push(data)
while (!msgBuffer.isFinished()) {
const message = JSON.parse(msgBuffer.handleData())
console.log(message)
}
})
這樣,一個非常簡單的 RPC 雛形就出來了,不過目前這種方式還不是 RPC。所謂的 RPC,就是客戶端必須像調(diào)用本地方法一樣來調(diào)用遠端的方法,而不是還需要自己組裝消息體,并監(jiān)聽事件獲取返回值。理想中的方式應(yīng)該像這樣:
const result = await client.add(1, 2, 3)
我們來改造一下。首先,我們定義一份配置文件,用來描述我們的 services:
// services/index.js
class Calculator {
add(arr) {
let s = 0
for (let i = 0; i < arr.length; i++) {
s += arr[i]
}
return s
}
multiply(arr) {
let p = 1
for (let i = 0; i < arr.length; i++) {
p *= arr[i]
}
return p
}
}
module.exports = {
calculator: {
cls: Calculator,
methods: {
add: {
params: [{type: 'number[]', optional: false}],
return: {
type: 'number',
},
},
multiply: {
params: [{type: 'number[]', optional: false}],
return: {
type: 'number',
},
},
},
},
}
services 描述文件中包含了類以及它擁有的方法,方法參數(shù)(類型,是否可選),返回值類型等信息。為了簡單一點,我們先不校驗參數(shù)和返回值的類型。
然后就是我們的 server:
const net = require('net')
const {msgBuffer} = require('../utils')
const services = require('../services')
class Server {
constructor(services) {
this.tcpServer = net.createServer((clientSocket) => {
const serviceMap = this.createServiceMap(services)
clientSocket.on('data', (data) => {
msgBuffer.push(data)
while (!msgBuffer.isFinished()) {
const {seqId, service, method, params} = JSON.parse(
msgBuffer.handleData()
)
clientSocket.write(
JSON.stringify({
seqId,
result: serviceMap[service][method].apply(null, params),
}) + '\n'
)
}
})
})
}
createServiceMap(services) {
const serviceMap = {}
Object.keys(services).forEach((serviceKey) => {
serviceMap[serviceKey] = new services[serviceKey].cls()
})
return serviceMap
}
listen(...args) {
this.tcpServer.listen(...args)
}
}
new Server(services).listen(9999)
server 中會監(jiān)聽 client 的連接,一旦有 client 進來,就根據(jù) services 配置文件為其實例化所有 services。之后開始接受 client 的數(shù)據(jù),并根據(jù) client 的消息調(diào)用相應(yīng)的 service 中的方法,并返回結(jié)果。
注意到消息體中有個 seqId,用來標識包的序號,必須將其返回給 client,這樣 client 才能知道返回的結(jié)果是跟哪個請求對應(yīng)的。
最后就是我們的 client:
const net = require('net')
const EventEmitter = require('events')
const {msgBuffer} = require('../utils')
const services = require('../services')
class Client {
constructor({port, services}) {
this.rspResolve = {}
this.seqId = 0
this.port = port
this.parseServices(services)
}
init() {
return new Promise((resolve, reject) => {
this.client = net.connect({port: this.port}, () => {
resolve()
})
this.client.on('data', (data) => {
msgBuffer.push(data)
while (!msgBuffer.isFinished()) {
const {seqId, result} = JSON.parse(msgBuffer.handleData())
this.rspResolve[seqId](result)
}
})
})
}
parseServices(services) {
for (const serviceKey in services) {
const service = services[serviceKey]
this[serviceKey] = {}
for (const method in service.methods) {
this[serviceKey][method] = (...params) => {
this.client.write(
JSON.stringify({
seqId: this.seqId,
service: serviceKey,
method,
params,
}) + '\n'
)
return new Promise((resolve, reject) => {
this.rspResolve[this.seqId++] = resolve
})
}
}
}
}
}
const client = new Client({port: 9999, services})
client.init().then(async () => {
console.log(await client.calculator.add([1, 2, 3, 4, 5]))
console.log(await client.calculator.multiply([1, 2, 3, 4, 5]))
})
初始化一個 client 時,會解析 services,并在當前 client 實例上添加 services 的方法。方法中會將函數(shù)調(diào)用封裝成消息發(fā)送給服務(wù)端并返回 Promise 對象,同時將 Promise 對象的 resolve 方法緩存在 resResolve 這個 Map 中,此時 Promise 對象還處于 pending 狀態(tài)。
當 server 返回相應(yīng)的 seqId 的結(jié)果時,resResolve 中對應(yīng)的 resolve 方法會調(diào)用,從而將 Promise 對象狀態(tài)設(shè)為 fulfilled,此時 client 則可以獲取到結(jié)果。
這樣我們就實現(xiàn)了一個非常樸素的 RPC 框架。接下來我們簡單看看業(yè)界常用的 RPC 框架是怎么做的吧,這里以 Thrift 為例。
Thrift RPC Demo
我們先準備一個 calculator.thrift 文件,用來描述 service:
service Calculator {
i32 add(1:list<i32> arr),
i32 multiply(1:list<i32> arr)
}
由于 thrift 文件是語言無關(guān)的,所以我們需要通過它生成對應(yīng) Calculator.js 文件:
thrift -r --gen js:node calculator.thrift
這個文件包含 server 端和 client 相關(guān)的代碼,在 client 端負責將函數(shù)調(diào)用轉(zhuǎn)為消息發(fā)送給 server,在 server 端負責讀取消息,調(diào)用方法,返回結(jié)果給 client。
然后 server 和 client 分別按照如下方式進行使用即可:
// server.js
var thrift = require('thrift')
var Calculator = require('./gen-nodejs/Calculator')
var server = thrift.createServer(Calculator, {
add(arr, result) {
let s = 0
for (let i = 0; i < arr.length; i++) {
s += arr[i]
}
result(null, s)
},
multiply(arr, result) {
let p = 1
for (let i = 0; i < arr.length; i++) {
p *= arr[i]
}
result(p)
},
})
server.listen(9090)
// client.js
var thrift = require('thrift')
var Calculator = require('./gen-nodejs/Calculator')
var transport = thrift.TBufferedTransport
var protocol = thrift.TBinaryProtocol
var connection = thrift.createConnection('localhost', 9090, {
transport: transport,
protocol: protocol,
})
var client = thrift.createClient(Calculator, connection)
client.add([1, 2], function (err, response) {
console.log(response)
})
下面,我們通過 Wireshark 來看看 thrift 通信的過程。
打開 Wireshark,選擇 Capturing from Loopback: lo0,然后在 filter 中輸入 tcp.port == 9090。分別運行上面的 server 和 client,則可抓包到如下內(nèi)容:

我們先來看看第五行,可以看到 Wireshark 自動識別了 thrift 協(xié)議,并解析出這是一個 CALL 類型的消息,調(diào)用的方法為 add。接下來我們再仔細看看 thrift 協(xié)議:

thrift 協(xié)議格式如上圖所示,這里是一個參數(shù)的場景,如果有多個參數(shù)的話則可以在 Data -> List 后面繼續(xù)添加,比如我們給 add 方法增加第二個參數(shù),表示是否打印日志:
i32 add(1:list<i32> arr, 2:bool printLog)
抓包得到的內(nèi)容如下:

返回的消息格式也類似,這里就不贅述了。
關(guān)于 RPC 的內(nèi)容就先介紹到這,后面計劃基于 Nest.js 再實戰(zhàn)一下。
補充內(nèi)容
關(guān)于 TCP “粘包”問題說明
首先聲明一下,所謂的 TCP “粘包問題”其實并不是一個問題。
先看一個簡單的例子:
// server.js
const net = require('net')
const server = net.createServer((clientSocket) => {
console.log('Client connected')
clientSocket.on('data', (data) => {
console.log('-------------------')
console.log(data.toString())
})
})
server.listen(9999, () => console.log('Listening on 9999'))
// client.js
const net = require('net')
const client = net.connect({port: 9999}, () => {
client.write(JSON.stringify({cmd: 'add', params: [1, 2]}))
client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]}))
})
啟動 server 后再運行 client,則 server 有可能會打印如下日志:
-------------------
{"cmd":"add","params":[1,2]}{"cmd":"multiply","params":[1,2,3]}
如上所示,客戶端調(diào)用了兩次 write,但是服務(wù)端卻只打印了一次。也就是說,兩次發(fā)送的數(shù)據(jù)在服務(wù)端被一次性取出來了。即,使用方層面的兩個包“粘在”了一起。原因在于 TCP 是面向字節(jié)流的,并沒有包的概念,所以開發(fā)者需要對 data 事件獲取到的數(shù)據(jù)進行解析。
以上就是Node.js高級編程使用RPC通信示例詳解的詳細內(nèi)容,更多關(guān)于Node.js高級編程RPC通信的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
利用node.js+mongodb如何搭建一個簡單登錄注冊的功能詳解
這篇文章主要給大家介紹了關(guān)于利用node.js+mongodb如何搭建一個簡單登錄注冊功能的相關(guān)資料,文中通過示例代碼介紹非常詳細,對大家具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-07-07
手把手教你VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境
最近在補數(shù)據(jù)結(jié)構(gòu),在用VScode調(diào)試js代碼文件結(jié)果怎么都不行,這篇文章主要給大家介紹了關(guān)于VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境的相關(guān)資料,需要的朋友可以參考下2022-12-12
Node發(fā)出HTTP POST請求的方法實例小結(jié)
這篇文章主要介紹了Node發(fā)出HTTP POST請求的方法,結(jié)合實例形式總結(jié)分析了三種常用的post請求操作方法,以及相關(guān)庫操作注意事項,需要的朋友可以參考下2023-05-05
node.js將MongoDB數(shù)據(jù)同步到MySQL的步驟
這篇文章主要給大家介紹了關(guān)于node.js將MongoDB數(shù)據(jù)同步到MySQL的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12
nodejs個人博客開發(fā)第五步 分配數(shù)據(jù)
這篇文章主要為大家詳細介紹了nodejs個人博客開發(fā)的分配數(shù)據(jù),具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04
詳解Node.js項目APM監(jiān)控之New Relic
本篇文章主要介紹了Node.js項目APM監(jiān)控之New Relic,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-05-05

