欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Node.js高級編程使用RPC通信示例詳解

 更新時間:2023年01月12日 11:45:02   作者:Aaaaaaaaaaayou  
這篇文章主要為大家介紹了Node.js高級編程使用RPC通信示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

在構(gòu)建微服務(wù)時,為了追求極致的效率,服務(wù)間一般會使用 RPC(Remote Procedure Call)來進行通信。本文通過 Node.js 來實踐一下。

Node.js 樸素 RPC

首先我們來構(gòu)建一下 server

// server.js
const net = require('net')
const {msgBuffer} = require('../utils')
const server = net.createServer((clientSocket) => {
  clientSocket.on('data', (data) => {
    msgBuffer.push(data)
    while (!msgBuffer.isFinished()) {
      const message = JSON.parse(msgBuffer.handleData())
      clientSocket.write(
        JSON.stringify(fnMap[message.cmd].apply(null, message.params)) + '\n'
      )
    }
  })
})
server.listen(9999, () => console.log('Listening on 9999'))
const fnMap = {
  add: (...args) => {
    let s = 0
    for (let i = 0; i < args.length; i++) {
      s += args[i]
    }
    return s
  },
  multiply: (...args) => {
    let p = 1
    for (let i = 0; i < args.length; i++) {
      p *= args[i]
    }
    return p
  },
}
// MessageBuffer
class MessageBuffer {
  constructor(delimiter) {
    this.delimiter = delimiter
    this.buffer = ''
  }
  isFinished() {
    if (
      this.buffer.length === 0 ||
      this.buffer.indexOf(this.delimiter) === -1
    ) {
      return true
    }
    return false
  }
  push(data) {
    this.buffer += data
  }
  getMessage() {
    const delimiterIndex = this.buffer.indexOf(this.delimiter)
    if (delimiterIndex !== -1) {
      const message = this.buffer.slice(0, delimiterIndex)
      this.buffer = this.buffer.replace(message + this.delimiter, '')
      return message
    }
    return null
  }
  handleData() {
    const message = this.getMessage()
    return message
  }
}
exports.msgBuffer = new MessageBuffer('\n')

我們新建了一個 TCP 的服務(wù),并監(jiān)聽來自客戶端的數(shù)據(jù),注意這里我們通過一個 MessageBuffer 類來對數(shù)據(jù)進行解析(至于為什么這么做可參考考文末補充內(nèi)容:關(guān)于 TCP “粘包”問題說明),將 TCP 數(shù)據(jù)流解析成我們的消息體。然后調(diào)用服務(wù)端預(yù)先配置好的方法,最后將返回值返回給客戶端。

客戶端相對比較簡單,將函數(shù)調(diào)用相關(guān)數(shù)據(jù)按照事先規(guī)定好的格式發(fā)送給服務(wù)端即可:

const net = require('net')
const {msgBuffer} = require('../utils')
const client = net.connect({port: 9999}, () => {
  client.write(JSON.stringify({cmd: 'add', params: [1, 2, 3]}) + '\n')
  client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]}) + '\n')
})
client.on('data', (data) => {
  msgBuffer.push(data)
  while (!msgBuffer.isFinished()) {
    const message = JSON.parse(msgBuffer.handleData())
    console.log(message)
  }
})

這樣,一個非常簡單的 RPC 雛形就出來了,不過目前這種方式還不是 RPC。所謂的 RPC,就是客戶端必須像調(diào)用本地方法一樣來調(diào)用遠端的方法,而不是還需要自己組裝消息體,并監(jiān)聽事件獲取返回值。理想中的方式應(yīng)該像這樣:

const result = await client.add(1, 2, 3)

我們來改造一下。首先,我們定義一份配置文件,用來描述我們的 services

// services/index.js
class Calculator {
  add(arr) {
    let s = 0
    for (let i = 0; i < arr.length; i++) {
      s += arr[i]
    }
    return s
  }
  multiply(arr) {
    let p = 1
    for (let i = 0; i < arr.length; i++) {
      p *= arr[i]
    }
    return p
  }
}
module.exports = {
  calculator: {
    cls: Calculator,
    methods: {
      add: {
        params: [{type: 'number[]', optional: false}],
        return: {
          type: 'number',
        },
      },
      multiply: {
        params: [{type: 'number[]', optional: false}],
        return: {
          type: 'number',
        },
      },
    },
  },
}

services 描述文件中包含了類以及它擁有的方法,方法參數(shù)(類型,是否可選),返回值類型等信息。為了簡單一點,我們先不校驗參數(shù)和返回值的類型。

然后就是我們的 server

const net = require('net')
const {msgBuffer} = require('../utils')
const services = require('../services')
class Server {
  constructor(services) {
    this.tcpServer = net.createServer((clientSocket) => {
      const serviceMap = this.createServiceMap(services)
      clientSocket.on('data', (data) => {
        msgBuffer.push(data)
        while (!msgBuffer.isFinished()) {
          const {seqId, service, method, params} = JSON.parse(
            msgBuffer.handleData()
          )
          clientSocket.write(
            JSON.stringify({
              seqId,
              result: serviceMap[service][method].apply(null, params),
            }) + '\n'
          )
        }
      })
    })
  }
  createServiceMap(services) {
    const serviceMap = {}
    Object.keys(services).forEach((serviceKey) => {
      serviceMap[serviceKey] = new services[serviceKey].cls()
    })
    return serviceMap
  }
  listen(...args) {
    this.tcpServer.listen(...args)
  }
}
new Server(services).listen(9999)

server 中會監(jiān)聽 client 的連接,一旦有 client 進來,就根據(jù) services 配置文件為其實例化所有 services。之后開始接受 client 的數(shù)據(jù),并根據(jù) client 的消息調(diào)用相應(yīng)的 service 中的方法,并返回結(jié)果。

注意到消息體中有個 seqId,用來標識包的序號,必須將其返回給 client,這樣 client 才能知道返回的結(jié)果是跟哪個請求對應(yīng)的。

最后就是我們的 client

const net = require('net')
const EventEmitter = require('events')
const {msgBuffer} = require('../utils')
const services = require('../services')
class Client {
  constructor({port, services}) {
    this.rspResolve = {}
    this.seqId = 0
    this.port = port
    this.parseServices(services)
  }
  init() {
    return new Promise((resolve, reject) => {
      this.client = net.connect({port: this.port}, () => {
        resolve()
      })
      this.client.on('data', (data) => {
        msgBuffer.push(data)
        while (!msgBuffer.isFinished()) {
          const {seqId, result} = JSON.parse(msgBuffer.handleData())
          this.rspResolve[seqId](result)
        }
      })
    })
  }
  parseServices(services) {
    for (const serviceKey in services) {
      const service = services[serviceKey]
      this[serviceKey] = {}
      for (const method in service.methods) {
        this[serviceKey][method] = (...params) => {
          this.client.write(
            JSON.stringify({
              seqId: this.seqId,
              service: serviceKey,
              method,
              params,
            }) + '\n'
          )
          return new Promise((resolve, reject) => {
            this.rspResolve[this.seqId++] = resolve
          })
        }
      }
    }
  }
}
const client = new Client({port: 9999, services})
client.init().then(async () => {
  console.log(await client.calculator.add([1, 2, 3, 4, 5]))
  console.log(await client.calculator.multiply([1, 2, 3, 4, 5]))
})

初始化一個 client 時,會解析 services,并在當前 client 實例上添加 services 的方法。方法中會將函數(shù)調(diào)用封裝成消息發(fā)送給服務(wù)端并返回 Promise 對象,同時將 Promise 對象的 resolve 方法緩存在 resResolve 這個 Map 中,此時 Promise 對象還處于 pending 狀態(tài)。

server 返回相應(yīng)的 seqId 的結(jié)果時,resResolve 中對應(yīng)的 resolve 方法會調(diào)用,從而將 Promise 對象狀態(tài)設(shè)為 fulfilled,此時 client 則可以獲取到結(jié)果。

這樣我們就實現(xiàn)了一個非常樸素的 RPC 框架。接下來我們簡單看看業(yè)界常用的 RPC 框架是怎么做的吧,這里以 Thrift 為例。

Thrift RPC Demo

我們先準備一個 calculator.thrift 文件,用來描述 service

service Calculator {
  i32 add(1:list&lt;i32&gt; arr),
  i32 multiply(1:list&lt;i32&gt; arr)
}

由于 thrift 文件是語言無關(guān)的,所以我們需要通過它生成對應(yīng) Calculator.js 文件:

thrift -r --gen js:node calculator.thrift

這個文件包含 server 端和 client 相關(guān)的代碼,在 client 端負責將函數(shù)調(diào)用轉(zhuǎn)為消息發(fā)送給 server,在 server 端負責讀取消息,調(diào)用方法,返回結(jié)果給 client。

然后 serverclient 分別按照如下方式進行使用即可:

// server.js
var thrift = require('thrift')
var Calculator = require('./gen-nodejs/Calculator')
var server = thrift.createServer(Calculator, {
  add(arr, result) {
    let s = 0
    for (let i = 0; i &lt; arr.length; i++) {
      s += arr[i]
    }
    result(null, s)
  },
  multiply(arr, result) {
    let p = 1
    for (let i = 0; i &lt; arr.length; i++) {
      p *= arr[i]
    }
    result(p)
  },
})
server.listen(9090)
// client.js
var thrift = require('thrift')
var Calculator = require('./gen-nodejs/Calculator')
var transport = thrift.TBufferedTransport
var protocol = thrift.TBinaryProtocol
var connection = thrift.createConnection('localhost', 9090, {
  transport: transport,
  protocol: protocol,
})
var client = thrift.createClient(Calculator, connection)
client.add([1, 2], function (err, response) {
  console.log(response)
})

下面,我們通過 Wireshark 來看看 thrift 通信的過程。

打開 Wireshark,選擇 Capturing from Loopback: lo0,然后在 filter 中輸入 tcp.port == 9090。分別運行上面的 serverclient,則可抓包到如下內(nèi)容:

我們先來看看第五行,可以看到 Wireshark 自動識別了 thrift 協(xié)議,并解析出這是一個 CALL 類型的消息,調(diào)用的方法為 add。接下來我們再仔細看看 thrift 協(xié)議:

thrift 協(xié)議格式如上圖所示,這里是一個參數(shù)的場景,如果有多個參數(shù)的話則可以在 Data -> List 后面繼續(xù)添加,比如我們給 add 方法增加第二個參數(shù),表示是否打印日志:

i32 add(1:list<i32> arr, 2:bool printLog)

抓包得到的內(nèi)容如下:

返回的消息格式也類似,這里就不贅述了。

關(guān)于 RPC 的內(nèi)容就先介紹到這,后面計劃基于 Nest.js 再實戰(zhàn)一下。

補充內(nèi)容

關(guān)于 TCP “粘包”問題說明

首先聲明一下,所謂的 TCP “粘包問題”其實并不是一個問題。

先看一個簡單的例子:

// server.js
const net = require('net')
const server = net.createServer((clientSocket) => {
  console.log('Client connected')
  clientSocket.on('data', (data) => {
    console.log('-------------------')
    console.log(data.toString())
  })
})
server.listen(9999, () => console.log('Listening on 9999'))
// client.js
const net = require('net')
const client = net.connect({port: 9999}, () => {
  client.write(JSON.stringify({cmd: 'add', params: [1, 2]}))
  client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]}))
})

啟動 server 后再運行 client,則 server可能會打印如下日志:

-------------------
{"cmd":"add","params":[1,2]}{"cmd":"multiply","params":[1,2,3]}

如上所示,客戶端調(diào)用了兩次 write,但是服務(wù)端卻只打印了一次。也就是說,兩次發(fā)送的數(shù)據(jù)在服務(wù)端被一次性取出來了。即,使用方層面的兩個包“粘在”了一起。原因在于 TCP 是面向字節(jié)流的,并沒有包的概念,所以開發(fā)者需要對 data 事件獲取到的數(shù)據(jù)進行解析。

以上就是Node.js高級編程使用RPC通信示例詳解的詳細內(nèi)容,更多關(guān)于Node.js高級編程RPC通信的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 利用node.js+mongodb如何搭建一個簡單登錄注冊的功能詳解

    利用node.js+mongodb如何搭建一個簡單登錄注冊的功能詳解

    這篇文章主要給大家介紹了關(guān)于利用node.js+mongodb如何搭建一個簡單登錄注冊功能的相關(guān)資料,文中通過示例代碼介紹非常詳細,對大家具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-07-07
  • 手把手教你VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境

    手把手教你VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境

    最近在補數(shù)據(jù)結(jié)構(gòu),在用VScode調(diào)試js代碼文件結(jié)果怎么都不行,這篇文章主要給大家介紹了關(guān)于VSCode配置JavaScript基于Node.js的調(diào)試環(huán)境的相關(guān)資料,需要的朋友可以參考下
    2022-12-12
  • 在Node.js中實現(xiàn)視頻收藏功能

    在Node.js中實現(xiàn)視頻收藏功能

    在構(gòu)建視頻分享平臺時,視頻的收藏功能是用戶互動的重要組成部分,本文將介紹如何在Node.js應(yīng)用中實現(xiàn)視頻收藏功能,包括數(shù)據(jù)模型的創(chuàng)建、業(yè)務(wù)邏輯的實現(xiàn)以及接口的驗證測試,需要的朋友可以參考下
    2024-04-04
  • Node發(fā)出HTTP POST請求的方法實例小結(jié)

    Node發(fā)出HTTP POST請求的方法實例小結(jié)

    這篇文章主要介紹了Node發(fā)出HTTP POST請求的方法,結(jié)合實例形式總結(jié)分析了三種常用的post請求操作方法,以及相關(guān)庫操作注意事項,需要的朋友可以參考下
    2023-05-05
  • node.js將MongoDB數(shù)據(jù)同步到MySQL的步驟

    node.js將MongoDB數(shù)據(jù)同步到MySQL的步驟

    這篇文章主要給大家介紹了關(guān)于node.js將MongoDB數(shù)據(jù)同步到MySQL的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-12-12
  • Node.js API詳解之 vm模塊用法實例分析

    Node.js API詳解之 vm模塊用法實例分析

    這篇文章主要介紹了Node.js API詳解之 vm模塊用法,結(jié)合實例形式分析了Node.js API中vm模塊基本功能、函數(shù)、使用方法及相關(guān)操作注意事項,需要的朋友可以參考下
    2020-05-05
  • 總結(jié)Node.js中的一些錯誤類型

    總結(jié)Node.js中的一些錯誤類型

    NodeJS 的錯誤處理讓人痛苦,在很長的一段時間里,大量的錯誤被放任不管。但是要想建立一個健壯的 Node.js 程序就必須正確的處理這些錯誤,而且這并不難學(xué)。下面跟著小編一起來學(xué)習(xí)學(xué)習(xí)。
    2016-08-08
  • nodejs實現(xiàn)UDP組播示例方法

    nodejs實現(xiàn)UDP組播示例方法

    這篇文章主要介紹了nodejs實現(xiàn)UDP組播示例方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • nodejs個人博客開發(fā)第五步 分配數(shù)據(jù)

    nodejs個人博客開發(fā)第五步 分配數(shù)據(jù)

    這篇文章主要為大家詳細介紹了nodejs個人博客開發(fā)的分配數(shù)據(jù),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • 詳解Node.js項目APM監(jiān)控之New Relic

    詳解Node.js項目APM監(jiān)控之New Relic

    本篇文章主要介紹了Node.js項目APM監(jiān)控之New Relic,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05

最新評論