淺談Node異步編程的機(jī)制
本文介紹了Node異步編程,分享給大家,具體如下:
目前的異步編程主要解決方案有:
- 事件發(fā)布/訂閱模式
- Promise/Deferred模式
- 流程控制庫(kù)
事件發(fā)布/訂閱模式
Node自身提供了events模塊,可以輕松實(shí)現(xiàn)事件的發(fā)布/訂閱
//訂閱 emmiter.on("event1",function(message){ console.log(message); }) //發(fā)布 emmiter.emit("event1","I am mesaage!");
偵聽(tīng)器可以很靈活地添加和刪除,使得事件和具體處理邏輯之間可以很輕松的關(guān)聯(lián)和解耦
事件發(fā)布/訂閱模式常常用來(lái)解耦業(yè)務(wù)邏輯,事件發(fā)布者無(wú)需關(guān)注訂閱的偵聽(tīng)器如何實(shí)現(xiàn)業(yè)務(wù)邏輯,甚至不用關(guān)注有多少個(gè)偵聽(tīng)器存在,數(shù)據(jù)通過(guò)消息的方式可以很靈活的進(jìn)行傳遞。
下面的HTTP就是典型的應(yīng)用場(chǎng)景
var req = http.request(options,function(res){ res.on('data',function(chunk){ console.log('Body:'+ chunk); }) res.on('end',function(){ //TODO }) })
如果一個(gè)事件添加了超過(guò)10個(gè)偵聽(tīng)器,將會(huì)得到一條警告,可以通過(guò)調(diào)用emmite.setMaxListeners(0)將這個(gè)限制去掉
繼承events模塊
var events = require('events'); function Stream(){ events.EventEmiiter.call(this); } util.inherits(Stream,events.EventEmitter);
利用事件隊(duì)列解決雪崩問(wèn)題
所謂雪崩問(wèn)題,就是在高訪問(wèn)量,大并發(fā)量的情況下緩存失效的情況,此時(shí)大量的請(qǐng)求同時(shí)融入數(shù)據(jù)庫(kù)中,數(shù)據(jù)庫(kù)無(wú)法同時(shí)承受如此大的查詢請(qǐng)求,進(jìn)而往前影響到網(wǎng)站整體的響應(yīng)速度
解決方案:
var proxy = new events.EventEmitter(); var status = "ready"; var seletc = function(callback){ proxy.once("selected",callback);//為每次請(qǐng)求訂閱這個(gè)查詢時(shí)間,推入事件回調(diào)函數(shù)隊(duì)列 if(status === 'ready'){ status = 'pending';//設(shè)置狀態(tài)為進(jìn)行中以防止引起多次查詢操作 db.select("SQL",function(results){ proxy.emit("selected",results); //查詢操作完成后發(fā)布時(shí)間 status = 'ready';//重新定義為已準(zhǔn)備狀態(tài) }) } }
多異步之間的協(xié)作方案
以上情況事件與偵聽(tīng)器的關(guān)系都是一對(duì)多的,但在異步編程中,也會(huì)出現(xiàn)事件與偵聽(tīng)器多對(duì)一的情況。
這里以渲染頁(yè)面所需要的模板讀取、數(shù)據(jù)讀取和本地化資源讀取為例簡(jiǎn)要介紹一下
var count = 0 ; var results = {}; var done = function(key,value){ result[key] = value; count++; if(count === 3){ render(results); } } fs.readFile(template_path,"utf8",function(err,template){ done('template',template) }) db.query(sql,function(err,data){ done('data',data); }) l10n.get(function(err,resources){ done('resources',resources) })
偏函數(shù)方案
var after = function(times,callback){ var count = 0, result = {}; return function(key,value){ results[key] = value; count++; if(count === times){ callback(results); } } } var done = after(times,render); var emitter = new events.Emitter(); emitter.on('done',done); //一個(gè)偵聽(tīng)器 emitter.on('done',other); //如果業(yè)務(wù)增長(zhǎng),可以完成多對(duì)多的方案 fs.readFile(template_path,"utf8",function(err,template){ emitter.emit('done','template',template); }) db.query(sql,function(err,data){ emitter.emit('done','data',data); }) l10n.get(function(err,resources){ emitter.emit('done','resources',resources) })
引入EventProxy模塊方案
var proxy = new EventProxy(); proxy.all('template','data','resources',function(template,data,resources){ //TODO }) fs.readFile(template_path,'utf8',function(err,template){ proxy.emit('template',template); }) db.query(sql,function(err,data){ proxy.emit('data',data); }) l10n.get(function(err,resources){ proxy.emit('resources',resources); })
Promise/Deferred模式
以上使用事件的方式時(shí),執(zhí)行流程都需要被預(yù)先設(shè)定,這是發(fā)布/訂閱模式的運(yùn)行機(jī)制所決定的。
$.get('/api',{ success:onSuccess, err:onError, complete:onComplete }) //需要嚴(yán)謹(jǐn)設(shè)置目標(biāo)
那么是否有一種先執(zhí)行異步調(diào)用,延遲傳遞處理的方式的?接下來(lái)要說(shuō)的就是針對(duì)這種情況的方式:Promise/Deferred模式
Promise/A
Promise/A提議對(duì)單個(gè)異步操作做出了這樣的抽象定義:
- Promise操作只會(huì)處在三種狀態(tài)的一種:未完成態(tài),完成態(tài)和失敗態(tài)。
- Promise的狀態(tài)只會(huì)出現(xiàn)從未完成態(tài)向完成態(tài)或失敗態(tài)轉(zhuǎn)化,不能逆反,完成態(tài)和失敗態(tài)不能相互轉(zhuǎn)化
- Promise的狀態(tài)一旦轉(zhuǎn)化,就不能被更改。
一個(gè)Promise對(duì)象只要具備then()即可
- 接受完成態(tài)、錯(cuò)誤態(tài)的回調(diào)方法
- 可選地支持progress事件回調(diào)作為第三個(gè)方法
- then()方法只接受function對(duì)象,其余對(duì)象將被忽略
- then()方法繼續(xù)返回Promise對(duì)象,以實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用
通過(guò)Node的events模塊來(lái)模擬一個(gè)Promise的實(shí)現(xiàn)
var Promise = function(){ EventEmitter.call(this) } util.inherits(Promise,EventEmitter); Promise.prototype.then = function(fulfilledHandler,errHandler,progeressHandler){ if(typeof fulfilledHandler === 'function'){ this.once('success',fulfilledHandler); //實(shí)現(xiàn)監(jiān)聽(tīng)對(duì)應(yīng)事件 } if(typeof errorHandler === 'function'){ this.once('error',errorHandler) } if(typeof progressHandler === 'function'){ this.on('progress',progressHandler); } return this; }
以上通過(guò)then()將回調(diào)函數(shù)存放起來(lái),接下來(lái)就是等待success、error、progress事件被觸發(fā),實(shí)現(xiàn)這個(gè)功能的對(duì)象稱為Deferred對(duì)象,即延遲對(duì)象。
var Deferred = function(){ this.state = 'unfulfilled'; this.promise = new Promise(); } Deferred.prototype.resolve = function(obj){ //當(dāng)異步完成后可將resolve作為回調(diào)函數(shù),觸發(fā)相關(guān)事件 this.state = 'fulfilled'; this.promise.emit('success',obj); } Deferred.prototype.reject = function(err){ this.state = 'failed'; this.promise.emit('error',err); } Deferred.prototype.progress = function(data){ this.promise.emit('progress',data) }
因此,可以對(duì)一個(gè)典型的響應(yīng)對(duì)象進(jìn)行封裝
res.setEncoding('utf8'); res.on('data',function(chunk){ console.log("Body:" + chunk); }) res.on('end',function(){ //done }) res.on('error',function(err){ //error }
轉(zhuǎn)換成
res.then(function(){ //done },function(err){ //error },function(chunk){ console.log('Body:' + chunk); })
要完成上面的轉(zhuǎn)換,首先需要對(duì)res對(duì)象進(jìn)行封裝,對(duì)data,end,error等事件進(jìn)行promisify
var promisify = function(res){ var deferred = new Deferred(); //創(chuàng)建一個(gè)延遲對(duì)象來(lái)在res的異步完成回調(diào)中發(fā)布相關(guān)事件 var result = ''; //用來(lái)在progress中持續(xù)接收數(shù)據(jù) res.on('data',function(chunk){ //res的異步操作,回調(diào)中發(fā)布事件 result += chunk; deferred.progress(chunk); }) res.on('end',function(){ deferred.resolve(result); }) res.on('error',function(err){ deferred.reject(err); }); return deferred.promise //返回deferred.promise,讓外界不能改變deferred的狀態(tài),只能讓promise的then()方法去接收外界來(lái)偵聽(tīng)相關(guān)事件。 } promisify(res).then(function(){ //done },function(err){ //error },function(chunk){ console.log('Body:' + chunk); })
以上,它將業(yè)務(wù)中不可變的部分封裝在了Deferred中,將可變的部分交給了Promise
Promise中的多異步協(xié)作
Deferred.prototype.all = function(promises){ var count = promises.length; //記錄傳進(jìn)的promise的個(gè)數(shù) var that = this; //保存調(diào)用all的對(duì)象 var results = [];//存放所有promise完成的結(jié)果 promises.forEach(function(promise,i){//對(duì)promises逐個(gè)進(jìn)行調(diào)用 promise.then(function(data){//每個(gè)promise成功之后,存放結(jié)果到result中,count--,直到所有promise被處理完了,才出發(fā)deferred的resolve方法,發(fā)布事件,傳遞結(jié)果出去 count--; result[i] = data; if(count === 0){ that.resolve(results); } },function(err){ that.reject(err); }); }); return this.promise; //返回promise來(lái)讓外界偵聽(tīng)這個(gè)deferred發(fā)布的事件。 } var promise1 = readFile('foo.txt','utf-8');//這里的文件讀取已經(jīng)經(jīng)過(guò)promise化 var promise2 = readFile('bar.txt','utf-8'); var deferred = new Deferred(); deferred.all([promise1,promise2]).thne(function(results){//promise1和promise2的then方法在deferred內(nèi)部的all方法所調(diào)用,用于同步所有的promise //TODO },function(err){ //TODO })
支持序列執(zhí)行的Promise
嘗試改造一下代碼以實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用
var Deferred = function(){ this.promise = new Promise() } //完成態(tài) Deferred.prototype.resolve = function(obj){ var promise = this.promise; var handler; while((handler = promise.queue.shift())){ if(handler && handler.fulfilled){ var ret = handler.fulfilled(obj); if(ret && ret.isPromise){ ret.queue = promise.queue; this.promise = ret; return; } } } } //失敗態(tài) Deferred.prototype.reject = function(err){ var promise = this.promise; var handler; while((handler = promise.queue.shift())){ if(handler && handler.error){ var ret = handler.error(err); if(ret && ret.isPromise){ ret.queue = promise.queue; this.promise = ret; return } } } } //生成回調(diào)函數(shù) Deferred.prototype.callback = function(){ var that = this; return function(err,file){ if(err){ return that.reject(err); } that.resolve(file) } } var Promise = function(){ this.queue = []; //隊(duì)列用于存儲(chǔ)待執(zhí)行的回到函數(shù) this.isPromise = true; }; Promise.prototype.then = function(fulfilledHandler,errorHandler,progressHandler){ var handler = {}; if(typeof fulfilledHandler === 'function'){ handler.fulfilled = fulfilledHandler; } if(typeof errorHandler === 'function'){ handler.error = errorHandler; } this.queue.push(handler); return this; } var readFile1 = function(file,encoding){ var deferred = new Deferred(); fs.readFile(file,encoding,deferred.callback()); return deferred.promise; } var readFile2 = function(file,encoding){ var deferred = new Deferred(); fs.readFile(file,encoding,deferred.callback()); return deferred.promise; } readFile1('file1.txt','utf8').then(function(file1){ return readFile2(file1.trim(),'utf8') }).then(function(file2){ console.log(file2) })
流程控制庫(kù)另外進(jìn)行總結(jié)
參考《深入淺出node.js》一書(shū),想學(xué)學(xué)習(xí)可以下載電子書(shū),下載地址:http://www.dbjr.com.cn/books/481114.html
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
node.js中的fs.fsyncSync方法使用說(shuō)明
這篇文章主要介紹了node.js中的fs.fsyncSync方法使用說(shuō)明,本文介紹了fs.fsyncSync的方法說(shuō)明、語(yǔ)法、接收參數(shù)、使用實(shí)例和實(shí)現(xiàn)源碼,需要的朋友可以參考下2014-12-12electron原理,以及electron生成可執(zhí)行文件的方法實(shí)例分析
這篇文章主要介紹了electron原理,以及electron生成可執(zhí)行文件的方法,結(jié)合實(shí)例形式分析了electron原理以及electron生成可執(zhí)行文件的具體操作步驟、實(shí)現(xiàn)方法與相關(guān)注意事項(xiàng),需要的朋友可以參考下2023-04-04淺談Express.js解析Post數(shù)據(jù)類型的正確姿勢(shì)
這篇文章主要介紹了Express.js解析Post數(shù)據(jù)類型的正確姿勢(shì),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-05-05node.js中的fs.writeFile方法使用說(shuō)明
這篇文章主要介紹了node.js中的fs.writeFile方法使用說(shuō)明,本文介紹了fs.writeFile的方法說(shuō)明、語(yǔ)法、接收參數(shù)、使用實(shí)例和實(shí)現(xiàn)源碼,需要的朋友可以參考下2014-12-12搭建基于express框架運(yùn)行環(huán)境的方法步驟
Express提供了一個(gè)輕量級(jí)模塊,把Node.js的http模塊功能封裝在一個(gè)簡(jiǎn)單易用的接口中,這篇文章主要介紹了搭建基于express框架運(yùn)行環(huán)境的方法步驟,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-11-11詳解node如何讓一個(gè)端口同時(shí)支持https與http
眾所周知node是一個(gè)高性能的web服務(wù)器,使用它可以很簡(jiǎn)單的創(chuàng)建一個(gè)http或https的服務(wù)器。這篇文章主要介紹了詳解node如何讓一個(gè)端口同時(shí)支持https與http2017-07-07