欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

node.js中TCP Socket多進(jìn)程間的消息推送示例詳解

 更新時(shí)間:2018年07月10日 09:33:50   作者:子匠_Zijor  
這篇文章主要給大家介紹了關(guān)于node.js中TCP Socket多進(jìn)程間的消息推送的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

前言

前段時(shí)間接到了一個(gè)支付中轉(zhuǎn)服務(wù)的需求,即支付數(shù)據(jù)通過(guò)http接口傳到中轉(zhuǎn)服務(wù)器,中轉(zhuǎn)服務(wù)器將支付數(shù)據(jù)發(fā)送到異構(gòu)后臺(tái)(Lua)的指定tcp socket。

一開(kāi)始評(píng)估的時(shí)候感覺(jué)蠻簡(jiǎn)單的,就是http server和tcp server間的通信,不是一個(gè)Event實(shí)例就能解決的狀態(tài)管理問(wèn)題嗎?注冊(cè)一個(gè)事件A用于消息傳遞,在socket連接時(shí)注冊(cè)唯一的ID,然后在http接收到數(shù)據(jù)時(shí),emit事件A;在監(jiān)聽(tīng)到事件A時(shí),在tcp server中尋找指定ID對(duì)應(yīng)的socket處理該數(shù)據(jù)即可。

盡管node.js在高并發(fā)方面有不錯(cuò)的性能,但是單個(gè)tcp server實(shí)例的承載能力有限,為避免服務(wù)器過(guò)載,node.js 單進(jìn)程的內(nèi)存有上限(默認(rèn)2G),能容納的長(zhǎng)連接客戶端數(shù)不多。但隨著業(yè)務(wù)的擴(kuò)大,我們需要考慮多機(jī)集群部署,客戶端可以連接到任一節(jié)點(diǎn),并發(fā)送消息。如何做到多節(jié)點(diǎn)的同時(shí)推送,我們需要建立一套多節(jié)點(diǎn)之間的消息分發(fā)/訂閱架構(gòu)。常用的第三方消息管理庫(kù)有 RabbitMQ和Redis等。在這里,我用的是Redis的訂閱發(fā)布服務(wù)。

redis.io有一個(gè)比較成熟的redis消息中轉(zhuǎn)庫(kù)socket.io-redis (本地下載)。但我們項(xiàng)目中異構(gòu)后臺(tái)用到的并非websocket,而是原生的TCP原生的Socket。用原生redis的sub/pubs實(shí)現(xiàn)并不難,就手寫(xiě)了。

redis在該項(xiàng)目中主要起到一個(gè)消息分發(fā)中心(publish/subscribe)的作用。當(dāng)http請(qǐng)求的支付數(shù)據(jù)發(fā)送過(guò)來(lái)時(shí),則通過(guò)redis的publish功能往所有的channel推送消息,這樣所有訂閱該channel的socket server就能收到回調(diào),然后推送到指定客戶端。在應(yīng)用層看跟Event事件消息的處理差不多。

const redis = require("redis"),
 redisClient = redis.createClient,
 REDIS_CFG = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisClient(REDIS_CFG),
 pub = redisClient(REDIS_CFG),
 PAY_MQ_CHANNEL = 'pay_mq_channel';

// 監(jiān)聽(tīng)頻道的消息回調(diào)
sub.on('message', function(channel, message) {
 switch (channle){
  case PAY_MQ_CHANNEL:
   console.log('notification received:', message);

   // 廣播消息到指定socket

   break;
 }
});
// 訂閱頻道
sub.subscribe(PAY_MQ_CHANNEL);

// 當(dāng)接收到支付數(shù)據(jù)時(shí),推送頻道消息
pub.publish(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

由于redis的sub/pub的channel訂閱數(shù)有上限,所以建議一類消息使用一個(gè)channel,一個(gè)channel下使用map、set或數(shù)組來(lái)存儲(chǔ)訂閱時(shí)的回調(diào)函數(shù),在接收到訂閱消息時(shí)遍歷執(zhí)行回調(diào)函數(shù)。

下面是我封裝好的Redis組件(RedisMQProxy.js):

/*
 * redis 訂閱/發(fā)布
 */
const _ = require('lodash'),
 redis = require("redis"),
 REDIS_CFG = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisClient(REDIS_CFG),
 pub = redisClient(REDIS_CFG);

let SubListenerFuns = {}; // channel的回調(diào)函數(shù)列表

let RedisMQProxy = {

 // 訂閱channel
 on(channel, cb, errorCb, once = false) {
  sub.subscribe(channel); // 訂閱channel消息

  // 將回調(diào)函數(shù)存放數(shù)組中
  SubListenerFuns[channel] = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
  SubListenerFuns[channel].push({
   once, cb, errorCb
  });
 },

 // 監(jiān)聽(tīng)一次性的channel回調(diào)函數(shù)
 once(channel, cb, errorCb) {
  this.on(channel, cb, errorCb, true);
 },

 // 發(fā)送channel消息
 emit(channel, message) {
  if(!_.isString(message)) {
   message = JSON.stringify(message);
  }
  pub.publish(channel, message);
 },

 // 移除channel上的監(jiān)聽(tīng)函數(shù)
 removeListener(channel, func) {
  let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
  for(let i = 0, l = channelHandlers.length; i < l; i++) {
   let handler = channelHandlers[i] || {};
   let cb = handler.cb;
   if(func && func == cb) {
    channelHandlers.splice(i, 1);
    return false;
   }
  }
 }
};

RedisMQProxy.SubListeners = SubListenerFuns;

pub.on('error', onError);
sub.on('error', onError);

// 監(jiān)聽(tīng)redis的訂閱消息
sub.on("message", function(channel, message) {
 // 遍歷執(zhí)行channel的回調(diào)函數(shù)
 try {
  message = JSON.parse(message);
 } catch(e) {}
 broadcastToChannel(channel, message);
});

// 廣播消息到指定頻道
function broadcastToChannel(channel, message, isError) {
 let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
 for(let i = 0, l = channelHandlers.length; i < l; i++) {
  let handler = channelHandlers[i] || {};
  let isOnce = handler.once || false;
  let func = handler.cb;
  let errorFunc = handler.errorCb;

  _.isFunction(func) && func(message);
  isError && _.isFunction(errorFunc) && errorFunc(message);

  isOnce && channelHandlers.splice(i, 1); // 移除一次性監(jiān)聽(tīng)的函數(shù)
 }
}

function broadcastToAllChannels(message, isError) {
 for(let channel in SubListenerFuns) {
  broadcastToChannel(channel, message, isError);
 }
}

function onError(err) {
 err = err || {};
 err.msg = err.msg || 'redis sub/pub fail';

 // 通知所有channel執(zhí)行錯(cuò)誤回調(diào)函數(shù)
 broadcastToAllChannels(err, true);
}

module.exports = RedisMQProxy;

在使用時(shí)就可以比較方便地調(diào)用了:

const RedisMQProxy = require('./RedisMQProxy'),
 PAY_MQ_CHANNEL = 'pay_mq_channel';

// 訂閱channel
RedisMQ.on(PAY_MQ_CHANNEL, function(message) {
 console.log('notification received:', message);
 // 廣播消息到指定socket
 // ...
});

// 訂閱一次性的channel
RedisMQ.once(PAY_MQ_CHANNEL, function(message) {
 // ...
});

// 當(dāng)接收到支付數(shù)據(jù)時(shí),推送頻道消息
RedisMQ.emit(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

目前該項(xiàng)目已經(jīng)健康運(yùn)行了一個(gè)多月。由于socket server的多進(jìn)程間消息推送依賴于redis的消息中轉(zhuǎn),而Redis使用的是單進(jìn)程,未能充分利用CPU。當(dāng)業(yè)務(wù)膨脹的時(shí)候,redis就要考慮分布集群了。

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • Node.js HTTP服務(wù)器中的文件、圖片上傳的方法

    Node.js HTTP服務(wù)器中的文件、圖片上傳的方法

    這篇文章主要介紹了Node.js HTTP服務(wù)器中的文件、圖片上傳的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09
  • Node.js如何通過(guò)http調(diào)用外部接口

    Node.js如何通過(guò)http調(diào)用外部接口

    這篇文章主要介紹了Node.js如何通過(guò)http調(diào)用外部接口問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-10-10
  • 淺談nodejs中的類定義和繼承的套路

    淺談nodejs中的類定義和繼承的套路

    本篇文章主要介紹了淺談nodejs中的類定義和繼承的套路,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-07-07
  • 對(duì)node通過(guò)fs模塊判斷文件是否是文件夾的實(shí)例講解

    對(duì)node通過(guò)fs模塊判斷文件是否是文件夾的實(shí)例講解

    今天小編就為大家分享一篇對(duì)node通過(guò)fs模塊判斷文件是否是文件夾的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-06-06
  • Node.js之HTTP服務(wù)端和客戶端實(shí)現(xiàn)方式

    Node.js之HTTP服務(wù)端和客戶端實(shí)現(xiàn)方式

    這篇文章主要介紹了Node.js之HTTP服務(wù)端和客戶端實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-09-09
  • node.js配置Token驗(yàn)證的2種方式總結(jié)

    node.js配置Token驗(yàn)證的2種方式總結(jié)

    token驗(yàn)證,在設(shè)計(jì)登錄注冊(cè)和一些權(quán)限接口時(shí)發(fā)揮作用,下面這篇文章主要給大家介紹了關(guān)于node.js配置Token驗(yàn)證的2種方式,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-02-02
  • 解決Node.js包管理器安裝報(bào)錯(cuò)npm?ERR!?code?1的問(wèn)題

    解決Node.js包管理器安裝報(bào)錯(cuò)npm?ERR!?code?1的問(wèn)題

    在開(kāi)發(fā)過(guò)程中,我們經(jīng)常需要使用各種Node.js包來(lái)擴(kuò)展我們的應(yīng)用程序功能,這些包通常通過(guò)npm(Node.js包管理器)進(jìn)行安裝和管理,有時(shí)候我們可能會(huì)遇到一些關(guān)于npm的錯(cuò)誤,本文將詳細(xì)介紹如何解決這個(gè)問(wèn)題,并提供一個(gè)詳細(xì)的實(shí)例,需要的朋友可以參考下
    2024-03-03
  • Zabbix添加Node.js監(jiān)控的方法

    Zabbix添加Node.js監(jiān)控的方法

    這篇文章主要介紹了Zabbix添加Node.js監(jiān)控的方法,非常不錯(cuò)具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2016-10-10
  • nvm、nrm、npm 安裝和使用詳解(小結(jié))

    nvm、nrm、npm 安裝和使用詳解(小結(jié))

    這篇文章主要介紹了nvm、nrm、npm 安裝和使用詳解(小結(jié)),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-01-01
  • Node.js 使用 gRPC從定義到實(shí)現(xiàn)過(guò)程詳解

    Node.js 使用 gRPC從定義到實(shí)現(xiàn)過(guò)程詳解

    gRPC是一個(gè)高性能、開(kāi)源的遠(yuǎn)程過(guò)程調(diào)用(RPC)框架,由 Google 開(kāi)發(fā),它支持多種編程語(yǔ)言,旨在簡(jiǎn)化和優(yōu)化分布式系統(tǒng)中的服務(wù)通信,本文給大家介紹Node.js 使用 gRPC從定義到實(shí)現(xiàn)過(guò)程,感興趣的朋友跟隨小編一起看看吧
    2024-07-07

最新評(píng)論