Node.js高級(jí)編程使用RPC通信示例詳解
前言
在構(gòu)建微服務(wù)時(shí),為了追求極致的效率,服務(wù)間一般會(huì)使用 RPC(Remote Procedure Call)來(lái)進(jìn)行通信。本文通過(guò) Node.js 來(lái)實(shí)踐一下。
Node.js 樸素 RPC
首先我們來(lái)構(gòu)建一下 server
:
// server.js const net = require('net') const {msgBuffer} = require('../utils') const server = net.createServer((clientSocket) => { clientSocket.on('data', (data) => { msgBuffer.push(data) while (!msgBuffer.isFinished()) { const message = JSON.parse(msgBuffer.handleData()) clientSocket.write( JSON.stringify(fnMap[message.cmd].apply(null, message.params)) + '\n' ) } }) }) server.listen(9999, () => console.log('Listening on 9999')) const fnMap = { add: (...args) => { let s = 0 for (let i = 0; i < args.length; i++) { s += args[i] } return s }, multiply: (...args) => { let p = 1 for (let i = 0; i < args.length; i++) { p *= args[i] } return p }, } // MessageBuffer class MessageBuffer { constructor(delimiter) { this.delimiter = delimiter this.buffer = '' } isFinished() { if ( this.buffer.length === 0 || this.buffer.indexOf(this.delimiter) === -1 ) { return true } return false } push(data) { this.buffer += data } getMessage() { const delimiterIndex = this.buffer.indexOf(this.delimiter) if (delimiterIndex !== -1) { const message = this.buffer.slice(0, delimiterIndex) this.buffer = this.buffer.replace(message + this.delimiter, '') return message } return null } handleData() { const message = this.getMessage() return message } } exports.msgBuffer = new MessageBuffer('\n')
我們新建了一個(gè) TCP 的服務(wù),并監(jiān)聽(tīng)來(lái)自客戶端的數(shù)據(jù),注意這里我們通過(guò)一個(gè) MessageBuffer
類(lèi)來(lái)對(duì)數(shù)據(jù)進(jìn)行解析(至于為什么這么做可參考考文末補(bǔ)充內(nèi)容:關(guān)于 TCP “粘包”問(wèn)題說(shuō)明),將 TCP 數(shù)據(jù)流解析成我們的消息體。然后調(diào)用服務(wù)端預(yù)先配置好的方法,最后將返回值返回給客戶端。
客戶端相對(duì)比較簡(jiǎn)單,將函數(shù)調(diào)用相關(guān)數(shù)據(jù)按照事先規(guī)定好的格式發(fā)送給服務(wù)端即可:
const net = require('net') const {msgBuffer} = require('../utils') const client = net.connect({port: 9999}, () => { client.write(JSON.stringify({cmd: 'add', params: [1, 2, 3]}) + '\n') client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]}) + '\n') }) client.on('data', (data) => { msgBuffer.push(data) while (!msgBuffer.isFinished()) { const message = JSON.parse(msgBuffer.handleData()) console.log(message) } })
這樣,一個(gè)非常簡(jiǎn)單的 RPC 雛形就出來(lái)了,不過(guò)目前這種方式還不是 RPC。所謂的 RPC,就是客戶端必須像調(diào)用本地方法一樣來(lái)調(diào)用遠(yuǎn)端的方法,而不是還需要自己組裝消息體,并監(jiān)聽(tīng)事件獲取返回值。理想中的方式應(yīng)該像這樣:
const result = await client.add(1, 2, 3)
我們來(lái)改造一下。首先,我們定義一份配置文件,用來(lái)描述我們的 services
:
// services/index.js class Calculator { add(arr) { let s = 0 for (let i = 0; i < arr.length; i++) { s += arr[i] } return s } multiply(arr) { let p = 1 for (let i = 0; i < arr.length; i++) { p *= arr[i] } return p } } module.exports = { calculator: { cls: Calculator, methods: { add: { params: [{type: 'number[]', optional: false}], return: { type: 'number', }, }, multiply: { params: [{type: 'number[]', optional: false}], return: { type: 'number', }, }, }, }, }
services
描述文件中包含了類(lèi)以及它擁有的方法,方法參數(shù)(類(lèi)型,是否可選),返回值類(lèi)型等信息。為了簡(jiǎn)單一點(diǎn),我們先不校驗(yàn)參數(shù)和返回值的類(lèi)型。
然后就是我們的 server
:
const net = require('net') const {msgBuffer} = require('../utils') const services = require('../services') class Server { constructor(services) { this.tcpServer = net.createServer((clientSocket) => { const serviceMap = this.createServiceMap(services) clientSocket.on('data', (data) => { msgBuffer.push(data) while (!msgBuffer.isFinished()) { const {seqId, service, method, params} = JSON.parse( msgBuffer.handleData() ) clientSocket.write( JSON.stringify({ seqId, result: serviceMap[service][method].apply(null, params), }) + '\n' ) } }) }) } createServiceMap(services) { const serviceMap = {} Object.keys(services).forEach((serviceKey) => { serviceMap[serviceKey] = new services[serviceKey].cls() }) return serviceMap } listen(...args) { this.tcpServer.listen(...args) } } new Server(services).listen(9999)
server
中會(huì)監(jiān)聽(tīng) client
的連接,一旦有 client
進(jìn)來(lái),就根據(jù) services
配置文件為其實(shí)例化所有 services
。之后開(kāi)始接受 client
的數(shù)據(jù),并根據(jù) client
的消息調(diào)用相應(yīng)的 service
中的方法,并返回結(jié)果。
注意到消息體中有個(gè) seqId
,用來(lái)標(biāo)識(shí)包的序號(hào),必須將其返回給 client
,這樣 client
才能知道返回的結(jié)果是跟哪個(gè)請(qǐng)求對(duì)應(yīng)的。
最后就是我們的 client
:
const net = require('net') const EventEmitter = require('events') const {msgBuffer} = require('../utils') const services = require('../services') class Client { constructor({port, services}) { this.rspResolve = {} this.seqId = 0 this.port = port this.parseServices(services) } init() { return new Promise((resolve, reject) => { this.client = net.connect({port: this.port}, () => { resolve() }) this.client.on('data', (data) => { msgBuffer.push(data) while (!msgBuffer.isFinished()) { const {seqId, result} = JSON.parse(msgBuffer.handleData()) this.rspResolve[seqId](result) } }) }) } parseServices(services) { for (const serviceKey in services) { const service = services[serviceKey] this[serviceKey] = {} for (const method in service.methods) { this[serviceKey][method] = (...params) => { this.client.write( JSON.stringify({ seqId: this.seqId, service: serviceKey, method, params, }) + '\n' ) return new Promise((resolve, reject) => { this.rspResolve[this.seqId++] = resolve }) } } } } } const client = new Client({port: 9999, services}) client.init().then(async () => { console.log(await client.calculator.add([1, 2, 3, 4, 5])) console.log(await client.calculator.multiply([1, 2, 3, 4, 5])) })
初始化一個(gè) client
時(shí),會(huì)解析 services
,并在當(dāng)前 client
實(shí)例上添加 services
的方法。方法中會(huì)將函數(shù)調(diào)用封裝成消息發(fā)送給服務(wù)端并返回 Promise
對(duì)象,同時(shí)將 Promise
對(duì)象的 resolve
方法緩存在 resResolve
這個(gè) Map
中,此時(shí) Promise
對(duì)象還處于 pending
狀態(tài)。
當(dāng) server
返回相應(yīng)的 seqId
的結(jié)果時(shí),resResolve
中對(duì)應(yīng)的 resolve
方法會(huì)調(diào)用,從而將 Promise
對(duì)象狀態(tài)設(shè)為 fulfilled
,此時(shí) client
則可以獲取到結(jié)果。
這樣我們就實(shí)現(xiàn)了一個(gè)非常樸素的 RPC 框架。接下來(lái)我們簡(jiǎn)單看看業(yè)界常用的 RPC 框架是怎么做的吧,這里以 Thrift 為例。
Thrift RPC Demo
我們先準(zhǔn)備一個(gè) calculator.thrift
文件,用來(lái)描述 service
:
service Calculator { i32 add(1:list<i32> arr), i32 multiply(1:list<i32> arr) }
由于 thrift
文件是語(yǔ)言無(wú)關(guān)的,所以我們需要通過(guò)它生成對(duì)應(yīng) Calculator.js
文件:
thrift -r --gen js:node calculator.thrift
這個(gè)文件包含 server
端和 client
相關(guān)的代碼,在 client
端負(fù)責(zé)將函數(shù)調(diào)用轉(zhuǎn)為消息發(fā)送給 server
,在 server
端負(fù)責(zé)讀取消息,調(diào)用方法,返回結(jié)果給 client
。
然后 server
和 client
分別按照如下方式進(jìn)行使用即可:
// server.js var thrift = require('thrift') var Calculator = require('./gen-nodejs/Calculator') var server = thrift.createServer(Calculator, { add(arr, result) { let s = 0 for (let i = 0; i < arr.length; i++) { s += arr[i] } result(null, s) }, multiply(arr, result) { let p = 1 for (let i = 0; i < arr.length; i++) { p *= arr[i] } result(p) }, }) server.listen(9090) // client.js var thrift = require('thrift') var Calculator = require('./gen-nodejs/Calculator') var transport = thrift.TBufferedTransport var protocol = thrift.TBinaryProtocol var connection = thrift.createConnection('localhost', 9090, { transport: transport, protocol: protocol, }) var client = thrift.createClient(Calculator, connection) client.add([1, 2], function (err, response) { console.log(response) })
下面,我們通過(guò) Wireshark
來(lái)看看 thrift
通信的過(guò)程。
打開(kāi) Wireshark
,選擇 Capturing from Loopback: lo0
,然后在 filter 中輸入 tcp.port == 9090
。分別運(yùn)行上面的 server
和 client
,則可抓包到如下內(nèi)容:
我們先來(lái)看看第五行,可以看到 Wireshark
自動(dòng)識(shí)別了 thrift
協(xié)議,并解析出這是一個(gè) CALL
類(lèi)型的消息,調(diào)用的方法為 add
。接下來(lái)我們?cè)僮屑?xì)看看 thrift
協(xié)議:
thrift
協(xié)議格式如上圖所示,這里是一個(gè)參數(shù)的場(chǎng)景,如果有多個(gè)參數(shù)的話則可以在 Data -> List
后面繼續(xù)添加,比如我們給 add
方法增加第二個(gè)參數(shù),表示是否打印日志:
i32 add(1:list<i32> arr, 2:bool printLog)
抓包得到的內(nèi)容如下:
返回的消息格式也類(lèi)似,這里就不贅述了。
關(guān)于 RPC 的內(nèi)容就先介紹到這,后面計(jì)劃基于 Nest.js 再實(shí)戰(zhàn)一下。
補(bǔ)充內(nèi)容
關(guān)于 TCP “粘包”問(wèn)題說(shuō)明
首先聲明一下,所謂的 TCP “粘包問(wèn)題”其實(shí)并不是一個(gè)問(wèn)題。
先看一個(gè)簡(jiǎn)單的例子:
// server.js const net = require('net') const server = net.createServer((clientSocket) => { console.log('Client connected') clientSocket.on('data', (data) => { console.log('-------------------') console.log(data.toString()) }) }) server.listen(9999, () => console.log('Listening on 9999')) // client.js const net = require('net') const client = net.connect({port: 9999}, () => { client.write(JSON.stringify({cmd: 'add', params: [1, 2]})) client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]})) })
啟動(dòng) server
后再運(yùn)行 client
,則 server
有可能會(huì)打印如下日志:
-------------------
{"cmd":"add","params":[1,2]}{"cmd":"multiply","params":[1,2,3]}
如上所示,客戶端調(diào)用了兩次 write
,但是服務(wù)端卻只打印了一次。也就是說(shuō),兩次發(fā)送的數(shù)據(jù)在服務(wù)端被一次性取出來(lái)了。即,使用方層面的兩個(gè)包“粘在”了一起。原因在于 TCP 是面向字節(jié)流的,并沒(méi)有包的概念,所以開(kāi)發(fā)者需要對(duì) data
事件獲取到的數(shù)據(jù)進(jìn)行解析。
以上就是Node.js高級(jí)編程使用RPC通信示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Node.js高級(jí)編程RPC通信的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
利用node.js+mongodb如何搭建一個(gè)簡(jiǎn)單登錄注冊(cè)的功能詳解
這篇文章主要給大家介紹了關(guān)于利用node.js+mongodb如何搭建一個(gè)簡(jiǎn)單登錄注冊(cè)功能的相關(guān)資料,文中通過(guò)示例代碼介紹非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-07-07手把手教你VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境
最近在補(bǔ)數(shù)據(jù)結(jié)構(gòu),在用VScode調(diào)試js代碼文件結(jié)果怎么都不行,這篇文章主要給大家介紹了關(guān)于VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境的相關(guān)資料,需要的朋友可以參考下2022-12-12Node發(fā)出HTTP POST請(qǐng)求的方法實(shí)例小結(jié)
這篇文章主要介紹了Node發(fā)出HTTP POST請(qǐng)求的方法,結(jié)合實(shí)例形式總結(jié)分析了三種常用的post請(qǐng)求操作方法,以及相關(guān)庫(kù)操作注意事項(xiàng),需要的朋友可以參考下2023-05-05node.js將MongoDB數(shù)據(jù)同步到MySQL的步驟
這篇文章主要給大家介紹了關(guān)于node.js將MongoDB數(shù)據(jù)同步到MySQL的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12Node.js API詳解之 vm模塊用法實(shí)例分析
這篇文章主要介紹了Node.js API詳解之 vm模塊用法,結(jié)合實(shí)例形式分析了Node.js API中vm模塊基本功能、函數(shù)、使用方法及相關(guān)操作注意事項(xiàng),需要的朋友可以參考下2020-05-05總結(jié)Node.js中的一些錯(cuò)誤類(lèi)型
NodeJS 的錯(cuò)誤處理讓人痛苦,在很長(zhǎng)的一段時(shí)間里,大量的錯(cuò)誤被放任不管。但是要想建立一個(gè)健壯的 Node.js 程序就必須正確的處理這些錯(cuò)誤,而且這并不難學(xué)。下面跟著小編一起來(lái)學(xué)習(xí)學(xué)習(xí)。2016-08-08nodejs個(gè)人博客開(kāi)發(fā)第五步 分配數(shù)據(jù)
這篇文章主要為大家詳細(xì)介紹了nodejs個(gè)人博客開(kāi)發(fā)的分配數(shù)據(jù),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04詳解Node.js項(xiàng)目APM監(jiān)控之New Relic
本篇文章主要介紹了Node.js項(xiàng)目APM監(jiān)控之New Relic,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-05-05