Node.js中的流(Stream)的作用詳解
Node.js 中的流(Stream)是用來(lái)比喻數(shù)據(jù)傳輸?shù)囊环N形式,數(shù)據(jù)傳輸?shù)钠瘘c(diǎn)就是流的源頭,數(shù)據(jù)傳輸?shù)慕K點(diǎn)就是流的終點(diǎn)。例如在網(wǎng)頁(yè)發(fā)起一個(gè) HTTP 請(qǐng)求,瀏覽器就是流的源頭,服務(wù)器就是流的終點(diǎn)。等服務(wù)器處理完請(qǐng)求,返回響應(yīng)時(shí),服務(wù)器就變成了流的源頭,瀏覽器變成了流的終點(diǎn)。
數(shù)據(jù)從一端連續(xù)不斷地傳輸?shù)搅硪欢耍拖袼粯訌囊欢肆鞯搅硪欢?,所以用流?lái)比喻數(shù)據(jù)的傳輸形式。只不過計(jì)算機(jī)中的流傳輸?shù)氖菙?shù)據(jù)(字節(jié)),而不是水。
在 Node.js 中,stream 模塊提供了用于實(shí)現(xiàn)流接口的 API。但是很多內(nèi)置模塊都提供了關(guān)于流的 API,所以通常不需要顯式的調(diào)用 stream 模塊來(lái)使用流。
為什么要使用流
v1 版本示例程序
下面看一個(gè)簡(jiǎn)單的示例:
const path = require('path') printMemoryUsage() fs.readFile(resolveFile('./test.txt'), (err, data) => { if (err) throw err printMemoryUsage() fs.writeFile(resolveFile('./test2.txt'), data, err => { if (err) throw err console.log('done') }) }) function resolveFile(filepath) { return path.resolve(__dirname, filepath) } // 打印內(nèi)存占用情況 function printMemoryUsage() { const info = process.memoryUsage(); // heapTotal:對(duì)應(yīng)v8的堆內(nèi)存信息,是堆中總共申請(qǐng)的內(nèi)存量。 // heapUsed:表示堆中使用的內(nèi)存量。 // rss:是resident set size的縮寫,即常駐內(nèi)存的部分。 console.log('rss=%s, heapTotal=%s, heapUsed=%s', formatMemory(info.rss), formatMemory(info.heapTotal), formatMemory(info.heapUsed)); }
v1 版本的程序每次執(zhí)行時(shí)都得把整個(gè) ./test.txt
文件讀取到內(nèi)存,然后再把內(nèi)容寫入到 ./test2.txt
文件。這個(gè) ./test.txt
文件大小為 1.04 GB,下面的信息就是在拷貝過程中打印的內(nèi)存占用信息。
rss=18.09MB, heapTotal=4.68MB, heapUsed=2.64MB rss=1011.52MB, heapTotal=7.18MB, heapUsed=2.36MB done
從這個(gè)信息可以看出,當(dāng)程序讀取的文件越大,內(nèi)存占用就越大(1011.52MB),因此會(huì)導(dǎo)致其他進(jìn)程處理變慢以及過多的垃圾回收,甚至內(nèi)存耗盡,導(dǎo)致程序崩潰。
v2 版本示例程序
如果用流來(lái)重寫 v1 程序,我們就可以避免內(nèi)存占用過大的問題。因?yàn)榱魇强梢砸贿呑x取數(shù)據(jù)一邊消費(fèi)數(shù)據(jù)的,它不需要等到所有的數(shù)據(jù)都準(zhǔn)備好。
// 可讀流 const readStream = fs.createReadStream(resolveFile('./test.txt')); // 可寫流 const writeStream = fs.createWriteStream(resolveFile('./test2.txt')); // 每讀取到一塊數(shù)據(jù),就會(huì)觸發(fā) data 事件 readStream.on('data', data => { printMemoryUsage() writeStream.write(data) }); readStream.on('end', () => { console.log('done') });
... rss=100.89MB, heapTotal=7.98MB, heapUsed=4.18MB rss=100.89MB, heapTotal=7.98MB, heapUsed=4.18MB rss=100.89MB, heapTotal=7.98MB, heapUsed=4.19MB done
從控制臺(tái)打印的信息來(lái)看,內(nèi)存占用一直穩(wěn)定為 100.89 MB,沒有給系統(tǒng)造成太大的負(fù)擔(dān)。因此,在需要處理一些尺寸較大的文件時(shí),使用流是最好的選擇。
v3 版本示例程序
但是 v2 程序也不完美,因?yàn)榭勺x流和可寫流的速率不一定相等。而 v2 程序在每次觸發(fā)可讀流的 data
事件時(shí)就向可寫流寫入數(shù)據(jù),這時(shí)可寫流的緩沖區(qū)有可能已經(jīng)滿了。如果繼續(xù)寫入更多的數(shù)據(jù),會(huì)導(dǎo)致內(nèi)存占用越來(lái)越大,甚至內(nèi)存耗盡,丟失數(shù)據(jù)。這個(gè)現(xiàn)象又叫背壓(Back pressure)。
在數(shù)據(jù)流從上游生產(chǎn)者向下游消費(fèi)者傳輸?shù)倪^程中,上游生產(chǎn)速度大于下游消費(fèi)速度,導(dǎo)致下游的 Buffer 溢出,這種現(xiàn)象就叫做 Backpressure。這句話的重點(diǎn)不在于「上游生產(chǎn)速度大于下游消費(fèi)速度」,而在于「Buffer 溢出」。
如果出現(xiàn)這個(gè)現(xiàn)象,解決方案是什么呢?我們可以在寫入流緩沖區(qū)已經(jīng)滿載的情況下,暫停可讀流讀取數(shù)據(jù)的行為。這可以通過 write()
的返回值來(lái)判斷。
每個(gè)流在創(chuàng)建時(shí)都可以設(shè)置 highWaterMark
屬性的值(默認(rèn)為16384,即 16 KB),這個(gè)值就是緩沖區(qū)閾值的大小。可寫流的緩沖區(qū)如果超過了閾值,再調(diào)用 write()
寫入數(shù)據(jù)時(shí),會(huì)返回 false;如果緩沖區(qū)未超過閾值,則返回 true。
因此我們可以把 v2 版本的程序改寫一下:
const readStream = fs.createReadStream(resolveFile('./test.txt')); const writeStream = fs.createWriteStream(resolveFile('./test2.txt')); readStream.on('data', data => { printMemoryUsage() if (!writeStream.write(data)) { // 暫停讀取數(shù)據(jù) readStream.pause() // 當(dāng)可寫流的緩沖區(qū)排空時(shí),會(huì)觸發(fā) drain 事件 writeStream.once('drain', () => { // 繼續(xù)讀取數(shù)據(jù) readStream.resume() }); } }); readStream.on('end', () => { console.log('done') });
然后看一下內(nèi)存占用的信息:
... rss=84.20MB, heapTotal=7.98MB, heapUsed=4.75MB rss=84.20MB, heapTotal=7.98MB, heapUsed=4.76MB done
從上面的信息可以看出,v3 程序最大內(nèi)存占用為 84.20 MB,比起上一版的內(nèi)存占用更小,這就是優(yōu)化后的效果。
v4 版本示例程序
v3 版本的程序效果很好,但是要寫的代碼稍微有點(diǎn)多。還好流模塊提供了 pipe()
來(lái)幫我們做這件事:
const readStream = fs.createReadStream(resolveFile('../test.txt')); const writeStream = fs.createWriteStream(resolveFile('../test2.txt')); function resolveFile(filepath) { return path.resolve(__dirname, filepath) } readStream.on('data', () => { printMemoryUsage() }); readStream.on('end', () => { console.log('done') }); readStream.pipe(writeStream)
... rss=94.80MB, heapTotal=7.98MB, heapUsed=4.89MB rss=94.80MB, heapTotal=7.98MB, heapUsed=4.90MB rss=94.80MB, heapTotal=7.98MB, heapUsed=4.90MB done
pipe()
將可寫流綁定到可讀流,使其自動(dòng)切換到流動(dòng)模式并將其所有數(shù)據(jù)推送到綁定的可寫流。 數(shù)據(jù)流將被自動(dòng)管理,以便目標(biāo)可寫流不會(huì)被更快的可讀流漫過。也就是說,pipe()
將數(shù)據(jù)緩沖限制在可接受的水平,以便不同速度的來(lái)源和目標(biāo)不會(huì)壓倒可用內(nèi)存。
流的類型
Node.js 中有四種基本的流類型:
- Readable: 可讀流,可以從中讀取數(shù)據(jù)的流(例如,fs.createReadStream())。
- Writable: 可寫流,可以寫入數(shù)據(jù)的流(例如,fs.createWriteStream())。
- Duplex: 雙工流,Readable 和 Writable 的流(例如,net.Socket)。
- Transform: 可以在寫入和讀取數(shù)據(jù)時(shí)修改或轉(zhuǎn)換數(shù)據(jù)的 Duplex 流(例如,zlib.createDeflate())。
緩沖
Writable 和 Readable 流都將數(shù)據(jù)存儲(chǔ)在內(nèi)部緩沖區(qū)中。
允許緩沖的數(shù)據(jù)量取決于傳給流的構(gòu)造函數(shù)的 highWaterMark 選項(xiàng)。 對(duì)于普通的流,highWaterMark 選項(xiàng)指定字節(jié)的總數(shù)。
當(dāng)實(shí)現(xiàn)調(diào)用 stream.push(chunk)
時(shí),數(shù)據(jù)緩存在 Readable 流中。 如果流的消費(fèi)者沒有調(diào)用 stream.read()
,則數(shù)據(jù)會(huì)一直駐留在內(nèi)部隊(duì)列中,直到被消費(fèi)。
一旦內(nèi)部讀取緩沖區(qū)的總大小達(dá)到 highWaterMark 指定的閾值,則流將暫時(shí)停止從底層資源讀取數(shù)據(jù),直到可以消費(fèi)當(dāng)前緩沖的數(shù)據(jù)(也就是,流將停止調(diào)用內(nèi)部的用于填充讀取緩沖區(qū) readable._read()
方法)。
當(dāng)重復(fù)調(diào)用 writable.write(chunk)
方法時(shí),數(shù)據(jù)會(huì)緩存在 Writable 流中。 雖然內(nèi)部的寫入緩沖區(qū)的總大小低于 highWaterMark 設(shè)置的閾值,但對(duì) writable.write()
的調(diào)用將返回 true。 一旦內(nèi)部緩沖區(qū)的大小達(dá)到或超過 highWaterMark,則將返回 false。
stream API 的一個(gè)關(guān)鍵目標(biāo),尤其是 stream.pipe()
方法,是將數(shù)據(jù)緩沖限制在可接受的水平,以便不同速度的來(lái)源和目標(biāo)不會(huì)壓倒可用內(nèi)存。
highWaterMark 選項(xiàng)是閾值,而不是限制:它規(guī)定了流在停止請(qǐng)求更多數(shù)據(jù)之前緩沖的數(shù)據(jù)量。 它通常不強(qiáng)制執(zhí)行嚴(yán)格的內(nèi)存限制。 特定的流實(shí)現(xiàn)可能會(huì)選擇實(shí)施更嚴(yán)格的限制,但這樣做是可選的。
由于 Duplex 和 Transform 流都是 Readable 和 Writable,因此每個(gè)流都維護(hù)兩個(gè)獨(dú)立的內(nèi)部緩沖區(qū),用于讀取和寫入,允許每一端獨(dú)立操作,同時(shí)保持適當(dāng)且高效的數(shù)據(jù)流。 例如,net.Socket 實(shí)例是 Duplex 流,其 Readable 端允許消費(fèi)從套接字接收的數(shù)據(jù),其 Writable 端允許將數(shù)據(jù)寫入套接字。 因?yàn)閿?shù)據(jù)可能以比接收數(shù)據(jù)更快或更慢的速度寫入套接字,所以每一端都應(yīng)該獨(dú)立于另一端進(jìn)行操作(和緩沖)。
Readable
可讀流是對(duì)被消費(fèi)的數(shù)據(jù)的來(lái)源的抽象。
Readable 流的示例包括:
- 客戶端上的 HTTP 響應(yīng)
- 服務(wù)器上的 HTTP 請(qǐng)求
- 文件系統(tǒng)讀取流
- 壓縮流
- 加密流
- TCP 套接字
- 子進(jìn)程的標(biāo)準(zhǔn)輸出和標(biāo)準(zhǔn)錯(cuò)誤
- process.stdin
所有的 Readable 流都實(shí)現(xiàn)了 stream.Readable 類定義的接口。
Readable 流以兩種模式之一有效地運(yùn)行:流動(dòng)和暫停。在流動(dòng)模式下,數(shù)據(jù)會(huì)自動(dòng)從底層系統(tǒng)讀取,并通過 EventEmitter 接口使用事件盡快提供給應(yīng)用程序。在暫停模式下,必須顯式調(diào)用 stream.read()
方法以從流中讀取數(shù)據(jù)塊。
所有的 Readable 流都以暫停模式開始,但可以通過以下方式之一切換到流動(dòng)模式:
- 添加
data
事件句柄。 - 調(diào)用
stream.resume()
方法。 - 調(diào)用
stream.pipe()
方法將數(shù)據(jù)發(fā)送到 Writable。
Readable 可以使用以下方法之一切換回暫停模式:
- 如果沒有管道目標(biāo),則通過調(diào)用
stream.pause()
方法。 - 如果有管道目標(biāo),則刪除所有管道目標(biāo)。 可以通過調(diào)用
stream.unpipe()
方法刪除多個(gè)管道目標(biāo)。
Writable
可寫流是數(shù)據(jù)寫入目標(biāo)的抽象。
Writable 流的示例包括:
- 客戶端上的 HTTP 請(qǐng)求
- 服務(wù)器上的 HTTP 響應(yīng)
- 文件系統(tǒng)寫入流
- 壓縮流
- 加密流
- TCP 套接字
- 子進(jìn)程標(biāo)準(zhǔn)輸入
- process.stdout、process.stderr
其中一些示例實(shí)際上是實(shí)現(xiàn) Writable 接口的 Duplex 流。
所有的 Writable 流都實(shí)現(xiàn)了 stream.Writable 類定義的接口。
雖然 Writable 流的特定實(shí)例可能以各種方式不同,但所有的 Writable 流都遵循相同的基本使用模式,如下例所示:
const myStream = getWritableStreamSomehow(); myStream.write('some data'); myStream.write('some more data'); myStream.end('done writing data');
drain 事件
如果對(duì) stream.write(chunk)
的調(diào)用返回 false,則 drain
事件將在可以繼續(xù)將數(shù)據(jù)寫入流時(shí)觸發(fā)。
Duplex 與 Transform
雙工流是同時(shí)實(shí)現(xiàn) Readable 和 Writable 接口的流。
Duplex 流的示例包括:
- TCP 套接字
- 壓縮流
- 加密流
轉(zhuǎn)換流是可以在寫入和讀取數(shù)據(jù)時(shí)修改或轉(zhuǎn)換數(shù)據(jù)的雙工流。
Transform 流的示例包括:
- 壓縮流
- 加密流
以上就是Node.js中的流(Stream)的作用詳解的詳細(xì)內(nèi)容,更多關(guān)于Node.js 流(Stream)作用的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- node.js同步/異步文件讀寫-fs,Stream文件流操作實(shí)例詳解
- Node.js數(shù)據(jù)流Stream之Duplex流和Transform流用法
- Node.js數(shù)據(jù)流Stream之Readable流和Writable流用法
- node.js中stream流中可讀流和可寫流的實(shí)現(xiàn)與使用方法實(shí)例分析
- node.js使用stream模塊實(shí)現(xiàn)自定義流示例
- Node.js中你不可不精的Stream(流)
- Node.js中流(stream)的使用方法示例
- Node.js中的流(Stream)介紹
- Node.js 中的流Stream模塊簡(jiǎn)介及如何使用流進(jìn)行數(shù)據(jù)處理
相關(guān)文章
Bun入門學(xué)習(xí)教程吊打Node或Deno的現(xiàn)代JS運(yùn)行時(shí)
這篇文章主要為大家介紹了一款吊打Node或Deno的現(xiàn)代JS運(yùn)行時(shí),Bun入門學(xué)習(xí)教程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07Nodejs?連接?mysql時(shí)報(bào)Error:?Cannot?enqueue?Query?after?fa
這篇文章主要介紹了Nodejs?連接?mysql時(shí)報(bào)Error:?Cannot?enqueue?Query?after?fatal?error錯(cuò)誤的處理辦法,需要的朋友可以參考下2023-05-05Node.js實(shí)現(xiàn)用戶身份驗(yàn)證和授權(quán)的示例代碼
在web開發(fā)中,我們常常需要對(duì)一些敏感的url進(jìn)行訪問權(quán)限控制,本文主要介紹了Node.js實(shí)現(xiàn)用戶身份驗(yàn)證和授權(quán)的示例代碼,具有一定的參考價(jià)值,感興趣的了解一下2024-02-02