淺談Node.js:理解stream
Stream在node.js中是一個抽象的接口,基于EventEmitter,也是一種Buffer的高級封裝,用來處理流數(shù)據(jù)。流模塊便是提供各種API讓我們可以很簡單的使用Stream。
流分為四種類型,如下所示:
- Readable,可讀流
- Writable,可寫流
- Duplex,讀寫流
- Transform,擴(kuò)展的Duplex,可修改寫入的數(shù)據(jù)
1、Readable可讀流
通過stream.Readable可創(chuàng)建一個可讀流,它有兩種模式:暫停和流動。
在流動模式下,將自動從下游系統(tǒng)讀取數(shù)據(jù)并使用data事件輸出;暫停模式下,必須顯示調(diào)用stream.read()
方法讀取數(shù)據(jù),并觸發(fā)data事件。
所有的可讀流最開始都是暫停模式,可以通過以下方法切換到流動模式:
- 監(jiān)聽'data'事件
- 調(diào)用
stream.resume()
方法 - 調(diào)用
stream.pipe()
方法將數(shù)據(jù)輸出到一個可寫流Writable
同樣地,也可以切換到暫停模式,有兩種方法:
- 如果沒有設(shè)置pipe目標(biāo),調(diào)用
stream.pause()
方法即可。 - 如果設(shè)置了pipe目標(biāo),則需要移除所有的data監(jiān)聽和調(diào)用
stream.unpipe()
方法
在Readable對象中有一個_readableSate
的對象,通過該對象可以得知流當(dāng)前處于什么模式,如下所示:
- readable._readableState.flowing = null,沒有數(shù)據(jù)消費(fèi)者,流不產(chǎn)生數(shù)據(jù)
- readable._readableState.flowing = true,處于流動模式
- readable._readableState.flowing = false,處于暫停模式
為什么使用流取數(shù)據(jù)
對于小文件,使用fs.readFile()
方法讀取數(shù)據(jù)更方便,但需要讀取大文件的時候,比如幾G大小的文件,使用該方法將消耗大量的內(nèi)存,甚至使程序崩潰。這種情況下,使用流來處理是更合適的,采用分段讀取,便不會造成內(nèi)存的'爆倉'問題。
data事件
在stream提供數(shù)據(jù)塊給消費(fèi)者時觸發(fā),有可能是切換到流動模式的時候,也有可能是調(diào)用readable.read()
方法且有有效數(shù)據(jù)塊的時候,使用如下所示:
const fs = require('fs'); const rs = fs.createReadStream('./appbak.js'); var chunkArr = [], chunkLen = 0; rs.on('data',(chunk)=>{ chunkArr.push(chunk); chunkLen+=chunk.length; }); rs.on('end',(chunk)=>{ console.log(Buffer.concat(chunkArr,chunkLen).toString()); });
readable事件
當(dāng)流中有可用數(shù)據(jù)能被讀取時觸發(fā),分為兩種,新的可用的數(shù)據(jù)和到達(dá)流的末尾,前者stream.read()
方法返回可用數(shù)據(jù),后者返回null,如下所示:
const rs = fs.createReadStream('./appbak.js'); var chunkArr = [], chunkLen = 0; rs.on('readable',()=>{ var chunk = null; //這里需要判斷是否到了流的末尾 if((chunk = rs.read()) !== null){ chunkArr.push(chunk); chunkLen+=chunk.length; } }); rs.on('end',(chunk)=>{ console.log(Buffer.concat(chunkArr,chunkLen).toString()); });
pause和resume方法
stream.pause()
方法讓流進(jìn)入暫停模式,并停止'data'事件觸發(fā),stream.resume()
方法使流進(jìn)入流動模式,并恢復(fù)'data'事件觸發(fā),也可以用來消費(fèi)所有數(shù)據(jù),如下所示:
const rs = fs.createReadStream('./下載.png'); rs.on('data',(chunk)=>{ console.log(`接收到${chunk.length}字節(jié)數(shù)據(jù)...`); rs.pause(); console.log(`數(shù)據(jù)接收將暫停1.5秒.`); setTimeout(()=>{ rs.resume(); },1000); }); rs.on('end',(chunk)=>{ console.log(`數(shù)據(jù)接收完畢`); });
pipe(destination[, options])方法
pipe()
方法綁定一個可寫流到可讀流上,并自動切換到流動模式,將所有數(shù)據(jù)輸出到可寫流,以及做好了數(shù)據(jù)流的管理,不會發(fā)生數(shù)據(jù)丟失的問題,使用如下所示:
const rs = fs.createReadStream('./app.js'); rs.pipe(process.stdout);
以上介紹了多種可讀流的數(shù)據(jù)消費(fèi)的方法,但對于一個可讀流,最好只選擇其中的一種,推薦使用pipe()
方法。
2、Writable可寫流
所有的可寫流都是基于stream.Writable
類創(chuàng)建的,創(chuàng)建之后便可將數(shù)據(jù)寫入該流中。
write(chunk[, encoding][, callback])方法
write()
方法向可寫流中寫入數(shù)據(jù),參數(shù)含義:
- chunk,字符串或buffer
- encoding,若chunk為字符串,則是chunk的編碼
- callback,當(dāng)前chunk數(shù)據(jù)寫入磁盤時的回調(diào)函數(shù)
該方法的返回值為布爾值,如果為false,則表示需要寫入的數(shù)據(jù)塊被緩存并且此時緩存的大小超出highWaterMark閥值,否則為true。
使用如下所示:
const ws = fs.createWriteStream('./test.txt'); ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');}); ws.end('done.')
背壓機(jī)制
如果可寫流的寫入速度跟不上可讀流的讀取速度,write方法添加的數(shù)據(jù)將被緩存,逐漸增多,導(dǎo)致占用大量內(nèi)存。我們希望的是消耗一個數(shù)據(jù),再去讀取一個數(shù)據(jù),這樣內(nèi)存就維持在一個水平上。如何做到這一點(diǎn)?可以利用write方法的返回值來判斷可寫流的緩存狀態(tài)和'drain'事件,及時切換可讀流的模式,如下所示:
function copy(src,dest){ src = path.resolve(src); dest = path.resolve(dest); const rs = fs.createReadStream(src); const ws = fs.createWriteStream(dest); console.log('正在復(fù)制中...'); const stime = +new Date(); rs.on('data',(chunk)=>{ if(null === ws.write(chunk)){ rs.pause(); } }); ws.on('drain',()=>{ rs.resume(); }); rs.on('end',()=>{ const etime = +new Date(); console.log(`已完成,用時:${(etime-stime)/1000}秒`); ws.end(); }); function calcProgress(){ } } copy('./CSS權(quán)威指南 第3版.pdf','./javascript.pdf');
drain事件
如果Writable.write()
方法返回false,則drain事件將會被觸發(fā),上面的背壓機(jī)制已經(jīng)使用了該事件。
finish事件
在調(diào)用stream.end()
方法之后且所有緩存區(qū)的數(shù)據(jù)都被寫入到下游系統(tǒng),就會觸發(fā)該事件,如下所示:
const ws = fs.createWriteStream('./alphabet.txt'); const alphabetStr = 'abcdefghijklmnopqrstuvwxyz'; ws.on('finish',()=>{ console.log('done.'); }); for(let letter of alphabetStr.split()){ ws.write(letter); } ws.end();//必須調(diào)用
end([chunk][, encoding][, callback])方法
end()
方法被調(diào)用之后,便不能再調(diào)用stream.write()
方法寫入數(shù)據(jù),負(fù)責(zé)將拋出錯誤。
3、Duplex讀寫流
Duplex流同時實(shí)現(xiàn)了Readable與Writable類的接口,既是可讀流,也是可寫流。例如'zlib streams'、'crypto streams'、'TCP sockets'等都是Duplex流。
4、Transform流
Duplex流的擴(kuò)展,區(qū)別在于,Transform流自動將寫入端的數(shù)據(jù)變換后添加到可讀端。例如:'zlib streams'、'crypto streams'等都是Transform流。
5、四種流的實(shí)現(xiàn)
stream
模塊提供的API可以讓我們很簡單的實(shí)現(xiàn)流,該模塊使用require('stream')
引用,我們只要繼承四種流中的一個基類(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform)
,然后實(shí)現(xiàn)它的接口就可以了,需要實(shí)現(xiàn)的接口如下所示:
| Use-case | Class | Method(s) to implement |
| ------------- |-------------| -----|
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |
Readable流實(shí)現(xiàn)
如上所示,我們只要繼承Readable類并實(shí)現(xiàn)_read接口即可,,如下所示:
const Readable = require('stream').Readable; const util = require('util'); const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split(); /*function AbReadable(){ if(!this instanceof AbReadable){ return new AbReadable(); } Readable.call(this); } util.inherits(AbReadable,Readable); AbReadable.prototype._read = function(){ if(!alphabetArr.length){ this.push(null); }else{ this.push(alphabetArr.shift()); } }; const abReadable = new AbReadable(); abReadable.pipe(process.stdout);*/ /*class AbReadable extends Readable{ constructor(){ super(); } _read(){ if(!alphabetArr.length){ this.push(null); }else{ this.push(alphabetArr.shift()); } } } const abReadable = new AbReadable(); abReadable.pipe(process.stdout);*/ /*const abReadable = new Readable({ read(){ if(!alphabetArr.length){ this.push(null); }else{ this.push(alphabetArr.shift()); } } }); abReadable.pipe(process.stdout);*/ const abReadable = Readable(); abReadable._read = function(){ if (!alphabetArr.length) { this.push(null); } else { this.push(alphabetArr.shift()); } } abReadable.pipe(process.stdout);
以上代碼使用了四種方法創(chuàng)建一個Readable可讀流,必須實(shí)現(xiàn)_read()
方法,以及用到了readable.push()
方法,該方法的作用是將指定的數(shù)據(jù)添加到讀取隊(duì)列。
Writable流實(shí)現(xiàn)
我們只要繼承Writable類并實(shí)現(xiàn)_write或_writev接口,如下所示(只使用兩種方法):
/*class MyWritable extends Writable{ constructor(){ super(); } _write(chunk,encoding,callback){ process.stdout.write(chunk); callback(); } } const myWritable = new MyWritable();*/ const myWritable = new Writable({ write(chunk,encoding,callback){ process.stdout.write(chunk); callback(); } }); myWritable.on('finish',()=>{ process.stdout.write('done'); }) myWritable.write('a'); myWritable.write('b'); myWritable.write('c'); myWritable.end();
Duplex流實(shí)現(xiàn)
實(shí)現(xiàn)Duplex流,需要繼承Duplex類,并實(shí)現(xiàn)_read和_write接口,如下所示:
class MyDuplex extends Duplex{ constructor(){ super(); this.source = []; } _read(){ if (!this.source.length) { this.push(null); } else { this.push(this.source.shift()); } } _write(chunk,encoding,cb){ this.source.push(chunk); cb(); } } const myDuplex = new MyDuplex(); myDuplex.on('finish',()=>{ process.stdout.write('write done.') }); myDuplex.on('end',()=>{ process.stdout.write('read done.') }); myDuplex.write('\na\n'); myDuplex.write('c\n'); myDuplex.end('b\n'); myDuplex.pipe(process.stdout);
上面的代碼實(shí)現(xiàn)了_read()
方法,可作為可讀流來使用,同時實(shí)現(xiàn)了_write()
方法,又可作為可寫流來使用。
Transform流實(shí)現(xiàn)
實(shí)現(xiàn)Transform流,需要繼承Transform類,并實(shí)現(xiàn)_transform接口,如下所示:
class MyTransform extends Transform{ constructor(){ super(); } _transform(chunk, encoding, callback){ chunk = (chunk+'').toUpperCase(); callback(null,chunk); } } const myTransform = new MyTransform(); myTransform.write('hello world!'); myTransform.end(); myTransform.pipe(process.stdout);
上面代碼中的_transform()
方法,其第一個參數(shù),要么為error,要么為null,第二個參數(shù)將被自動轉(zhuǎn)發(fā)給readable.push()
方法,因此該方法也可以使用如下寫法:
_transform(chunk, encoding, callback){ chunk = (chunk+'').toUpperCase() this.push(chunk) callback(); }
Object Mode流實(shí)現(xiàn)
我們知道流中的數(shù)據(jù)默認(rèn)都是Buffer類型,可讀流的數(shù)據(jù)進(jìn)入流中便被轉(zhuǎn)換成buffer,然后被消耗,可寫流寫入數(shù)據(jù)時,底層調(diào)用也將其轉(zhuǎn)化為buffer。但將構(gòu)造函數(shù)的objectMode選擇設(shè)置為true,便可產(chǎn)生原樣的數(shù)據(jù),如下所示:
const rs = Readable(); rs.push('a'); rs.push('b'); rs.push(null); rs.on('data',(chunk)=>{console.log(chunk);});//<Buffer 61>與<Buffer 62> const rs1 = Readable({objectMode:!0}); rs1.push('a'); rs1.push('b'); rs1.push(null); rs1.on('data',(chunk)=>{console.log(chunk);});//a與b
下面利用Transform流實(shí)現(xiàn)一個簡單的CSS壓縮工具,如下所示:
function minify(src,dest){ const transform = new Transform({ transform(chunk,encoding,cb){ cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,'')); } }); fs.createReadStream(src,{encoding:'utf8'}).pipe(transform).pipe(fs.createWriteStream(dest)); } minify('./reset.css','./reset.min.css');
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Node.js?express中的身份認(rèn)證的實(shí)現(xiàn)
本文主要介紹了Node.js?express中的身份認(rèn)證的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01node簡單實(shí)現(xiàn)一個更改頭像功能的示例
本篇文章主要介紹了node簡單實(shí)現(xiàn)一個更改頭像功能的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12Node.JS 循環(huán)遞歸復(fù)制文件夾目錄及其子文件夾下的所有文件
在Node.js中,要實(shí)現(xiàn)目錄文件夾的循環(huán)遞歸復(fù)制也非常簡單,使用fs模塊即可,僅需幾行,而且性能也不錯,我們先來實(shí)現(xiàn)文件的復(fù)制,需要的朋友可以參考下2017-09-09Node.js中的require.resolve方法使用簡介
在Node.js中,可以使用require.resolve函數(shù)來查詢某個模塊文件的帶有完整絕對路徑的文件名,下面這篇文章主要介紹了Node.js中require.resolve方法使用的相關(guān)資料,需要的朋友可以參考借鑒,下面來一起看看吧。2017-04-04