Node.js高級編程使用RPC通信示例詳解
前言
在構(gòu)建微服務(wù)時,為了追求極致的效率,服務(wù)間一般會使用 RPC(Remote Procedure Call)來進行通信。本文通過 Node.js 來實踐一下。
Node.js 樸素 RPC
首先我們來構(gòu)建一下 server
:
// server.js const net = require('net') const {msgBuffer} = require('../utils') const server = net.createServer((clientSocket) => { clientSocket.on('data', (data) => { msgBuffer.push(data) while (!msgBuffer.isFinished()) { const message = JSON.parse(msgBuffer.handleData()) clientSocket.write( JSON.stringify(fnMap[message.cmd].apply(null, message.params)) + '\n' ) } }) }) server.listen(9999, () => console.log('Listening on 9999')) const fnMap = { add: (...args) => { let s = 0 for (let i = 0; i < args.length; i++) { s += args[i] } return s }, multiply: (...args) => { let p = 1 for (let i = 0; i < args.length; i++) { p *= args[i] } return p }, } // MessageBuffer class MessageBuffer { constructor(delimiter) { this.delimiter = delimiter this.buffer = '' } isFinished() { if ( this.buffer.length === 0 || this.buffer.indexOf(this.delimiter) === -1 ) { return true } return false } push(data) { this.buffer += data } getMessage() { const delimiterIndex = this.buffer.indexOf(this.delimiter) if (delimiterIndex !== -1) { const message = this.buffer.slice(0, delimiterIndex) this.buffer = this.buffer.replace(message + this.delimiter, '') return message } return null } handleData() { const message = this.getMessage() return message } } exports.msgBuffer = new MessageBuffer('\n')
我們新建了一個 TCP 的服務(wù),并監(jiān)聽來自客戶端的數(shù)據(jù),注意這里我們通過一個 MessageBuffer
類來對數(shù)據(jù)進行解析(至于為什么這么做可參考考文末補充內(nèi)容:關(guān)于 TCP “粘包”問題說明),將 TCP 數(shù)據(jù)流解析成我們的消息體。然后調(diào)用服務(wù)端預(yù)先配置好的方法,最后將返回值返回給客戶端。
客戶端相對比較簡單,將函數(shù)調(diào)用相關(guān)數(shù)據(jù)按照事先規(guī)定好的格式發(fā)送給服務(wù)端即可:
const net = require('net') const {msgBuffer} = require('../utils') const client = net.connect({port: 9999}, () => { client.write(JSON.stringify({cmd: 'add', params: [1, 2, 3]}) + '\n') client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]}) + '\n') }) client.on('data', (data) => { msgBuffer.push(data) while (!msgBuffer.isFinished()) { const message = JSON.parse(msgBuffer.handleData()) console.log(message) } })
這樣,一個非常簡單的 RPC 雛形就出來了,不過目前這種方式還不是 RPC。所謂的 RPC,就是客戶端必須像調(diào)用本地方法一樣來調(diào)用遠端的方法,而不是還需要自己組裝消息體,并監(jiān)聽事件獲取返回值。理想中的方式應(yīng)該像這樣:
const result = await client.add(1, 2, 3)
我們來改造一下。首先,我們定義一份配置文件,用來描述我們的 services
:
// services/index.js class Calculator { add(arr) { let s = 0 for (let i = 0; i < arr.length; i++) { s += arr[i] } return s } multiply(arr) { let p = 1 for (let i = 0; i < arr.length; i++) { p *= arr[i] } return p } } module.exports = { calculator: { cls: Calculator, methods: { add: { params: [{type: 'number[]', optional: false}], return: { type: 'number', }, }, multiply: { params: [{type: 'number[]', optional: false}], return: { type: 'number', }, }, }, }, }
services
描述文件中包含了類以及它擁有的方法,方法參數(shù)(類型,是否可選),返回值類型等信息。為了簡單一點,我們先不校驗參數(shù)和返回值的類型。
然后就是我們的 server
:
const net = require('net') const {msgBuffer} = require('../utils') const services = require('../services') class Server { constructor(services) { this.tcpServer = net.createServer((clientSocket) => { const serviceMap = this.createServiceMap(services) clientSocket.on('data', (data) => { msgBuffer.push(data) while (!msgBuffer.isFinished()) { const {seqId, service, method, params} = JSON.parse( msgBuffer.handleData() ) clientSocket.write( JSON.stringify({ seqId, result: serviceMap[service][method].apply(null, params), }) + '\n' ) } }) }) } createServiceMap(services) { const serviceMap = {} Object.keys(services).forEach((serviceKey) => { serviceMap[serviceKey] = new services[serviceKey].cls() }) return serviceMap } listen(...args) { this.tcpServer.listen(...args) } } new Server(services).listen(9999)
server
中會監(jiān)聽 client
的連接,一旦有 client
進來,就根據(jù) services
配置文件為其實例化所有 services
。之后開始接受 client
的數(shù)據(jù),并根據(jù) client
的消息調(diào)用相應(yīng)的 service
中的方法,并返回結(jié)果。
注意到消息體中有個 seqId
,用來標識包的序號,必須將其返回給 client
,這樣 client
才能知道返回的結(jié)果是跟哪個請求對應(yīng)的。
最后就是我們的 client
:
const net = require('net') const EventEmitter = require('events') const {msgBuffer} = require('../utils') const services = require('../services') class Client { constructor({port, services}) { this.rspResolve = {} this.seqId = 0 this.port = port this.parseServices(services) } init() { return new Promise((resolve, reject) => { this.client = net.connect({port: this.port}, () => { resolve() }) this.client.on('data', (data) => { msgBuffer.push(data) while (!msgBuffer.isFinished()) { const {seqId, result} = JSON.parse(msgBuffer.handleData()) this.rspResolve[seqId](result) } }) }) } parseServices(services) { for (const serviceKey in services) { const service = services[serviceKey] this[serviceKey] = {} for (const method in service.methods) { this[serviceKey][method] = (...params) => { this.client.write( JSON.stringify({ seqId: this.seqId, service: serviceKey, method, params, }) + '\n' ) return new Promise((resolve, reject) => { this.rspResolve[this.seqId++] = resolve }) } } } } } const client = new Client({port: 9999, services}) client.init().then(async () => { console.log(await client.calculator.add([1, 2, 3, 4, 5])) console.log(await client.calculator.multiply([1, 2, 3, 4, 5])) })
初始化一個 client
時,會解析 services
,并在當前 client
實例上添加 services
的方法。方法中會將函數(shù)調(diào)用封裝成消息發(fā)送給服務(wù)端并返回 Promise
對象,同時將 Promise
對象的 resolve
方法緩存在 resResolve
這個 Map
中,此時 Promise
對象還處于 pending
狀態(tài)。
當 server
返回相應(yīng)的 seqId
的結(jié)果時,resResolve
中對應(yīng)的 resolve
方法會調(diào)用,從而將 Promise
對象狀態(tài)設(shè)為 fulfilled
,此時 client
則可以獲取到結(jié)果。
這樣我們就實現(xiàn)了一個非常樸素的 RPC 框架。接下來我們簡單看看業(yè)界常用的 RPC 框架是怎么做的吧,這里以 Thrift 為例。
Thrift RPC Demo
我們先準備一個 calculator.thrift
文件,用來描述 service
:
service Calculator { i32 add(1:list<i32> arr), i32 multiply(1:list<i32> arr) }
由于 thrift
文件是語言無關(guān)的,所以我們需要通過它生成對應(yīng) Calculator.js
文件:
thrift -r --gen js:node calculator.thrift
這個文件包含 server
端和 client
相關(guān)的代碼,在 client
端負責將函數(shù)調(diào)用轉(zhuǎn)為消息發(fā)送給 server
,在 server
端負責讀取消息,調(diào)用方法,返回結(jié)果給 client
。
然后 server
和 client
分別按照如下方式進行使用即可:
// server.js var thrift = require('thrift') var Calculator = require('./gen-nodejs/Calculator') var server = thrift.createServer(Calculator, { add(arr, result) { let s = 0 for (let i = 0; i < arr.length; i++) { s += arr[i] } result(null, s) }, multiply(arr, result) { let p = 1 for (let i = 0; i < arr.length; i++) { p *= arr[i] } result(p) }, }) server.listen(9090) // client.js var thrift = require('thrift') var Calculator = require('./gen-nodejs/Calculator') var transport = thrift.TBufferedTransport var protocol = thrift.TBinaryProtocol var connection = thrift.createConnection('localhost', 9090, { transport: transport, protocol: protocol, }) var client = thrift.createClient(Calculator, connection) client.add([1, 2], function (err, response) { console.log(response) })
下面,我們通過 Wireshark
來看看 thrift
通信的過程。
打開 Wireshark
,選擇 Capturing from Loopback: lo0
,然后在 filter 中輸入 tcp.port == 9090
。分別運行上面的 server
和 client
,則可抓包到如下內(nèi)容:
我們先來看看第五行,可以看到 Wireshark
自動識別了 thrift
協(xié)議,并解析出這是一個 CALL
類型的消息,調(diào)用的方法為 add
。接下來我們再仔細看看 thrift
協(xié)議:
thrift
協(xié)議格式如上圖所示,這里是一個參數(shù)的場景,如果有多個參數(shù)的話則可以在 Data -> List
后面繼續(xù)添加,比如我們給 add
方法增加第二個參數(shù),表示是否打印日志:
i32 add(1:list<i32> arr, 2:bool printLog)
抓包得到的內(nèi)容如下:
返回的消息格式也類似,這里就不贅述了。
關(guān)于 RPC 的內(nèi)容就先介紹到這,后面計劃基于 Nest.js 再實戰(zhàn)一下。
補充內(nèi)容
關(guān)于 TCP “粘包”問題說明
首先聲明一下,所謂的 TCP “粘包問題”其實并不是一個問題。
先看一個簡單的例子:
// server.js const net = require('net') const server = net.createServer((clientSocket) => { console.log('Client connected') clientSocket.on('data', (data) => { console.log('-------------------') console.log(data.toString()) }) }) server.listen(9999, () => console.log('Listening on 9999')) // client.js const net = require('net') const client = net.connect({port: 9999}, () => { client.write(JSON.stringify({cmd: 'add', params: [1, 2]})) client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]})) })
啟動 server
后再運行 client
,則 server
有可能會打印如下日志:
-------------------
{"cmd":"add","params":[1,2]}{"cmd":"multiply","params":[1,2,3]}
如上所示,客戶端調(diào)用了兩次 write
,但是服務(wù)端卻只打印了一次。也就是說,兩次發(fā)送的數(shù)據(jù)在服務(wù)端被一次性取出來了。即,使用方層面的兩個包“粘在”了一起。原因在于 TCP 是面向字節(jié)流的,并沒有包的概念,所以開發(fā)者需要對 data
事件獲取到的數(shù)據(jù)進行解析。
以上就是Node.js高級編程使用RPC通信示例詳解的詳細內(nèi)容,更多關(guān)于Node.js高級編程RPC通信的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
利用node.js+mongodb如何搭建一個簡單登錄注冊的功能詳解
這篇文章主要給大家介紹了關(guān)于利用node.js+mongodb如何搭建一個簡單登錄注冊功能的相關(guān)資料,文中通過示例代碼介紹非常詳細,對大家具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-07-07手把手教你VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境
最近在補數(shù)據(jù)結(jié)構(gòu),在用VScode調(diào)試js代碼文件結(jié)果怎么都不行,這篇文章主要給大家介紹了關(guān)于VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境的相關(guān)資料,需要的朋友可以參考下2022-12-12Node發(fā)出HTTP POST請求的方法實例小結(jié)
這篇文章主要介紹了Node發(fā)出HTTP POST請求的方法,結(jié)合實例形式總結(jié)分析了三種常用的post請求操作方法,以及相關(guān)庫操作注意事項,需要的朋友可以參考下2023-05-05node.js將MongoDB數(shù)據(jù)同步到MySQL的步驟
這篇文章主要給大家介紹了關(guān)于node.js將MongoDB數(shù)據(jù)同步到MySQL的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12nodejs個人博客開發(fā)第五步 分配數(shù)據(jù)
這篇文章主要為大家詳細介紹了nodejs個人博客開發(fā)的分配數(shù)據(jù),具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04詳解Node.js項目APM監(jiān)控之New Relic
本篇文章主要介紹了Node.js項目APM監(jiān)控之New Relic,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-05-05