一文帶你搞懂Node中的流
流是什么?
流,通俗來講就是數(shù)據(jù)流動(dòng),數(shù)據(jù)從一個(gè)地方緩慢的流到另一個(gè)地方。
舉個(gè)栗子,可以借助水管中的水流來輔助理解,當(dāng)打開水龍頭后,水便可以從源頭流出水龍頭;關(guān)閉水龍頭,水便不再流動(dòng)。
為什么需要流
那為什么會(huì)需要流吶?
其它介質(zhì)和內(nèi)存的數(shù)據(jù)規(guī)模不一致,例如磁盤的內(nèi)存往往遠(yuǎn)遠(yuǎn)大于內(nèi)存,因此磁盤中有可能會(huì)出現(xiàn)大于內(nèi)存的文件,此時(shí)內(nèi)存無法一次讀入該文件。這種情形可以把水庫(kù)比作磁盤,洗碗池比作內(nèi)存,如果不加限制,水庫(kù)的水量輕輕拿捏洗碗池,因此就需要水管來進(jìn)行傳輸,限制水的流量。
其他介質(zhì)和內(nèi)存的數(shù)據(jù)處理能力不一致,內(nèi)存的處理速度其他介質(zhì)很難比,內(nèi)存迅速處理數(shù)據(jù),一波流傳給硬盤,硬盤很難吃得消。
為了更深刻得理解流的作用,接下來我們來試一下不使用流需要如何進(jìn)行文件讀寫。
文件讀寫
首先我們來實(shí)現(xiàn)最簡(jiǎn)單的文件拷貝功能,這個(gè)比較簡(jiǎn)單,我們可以借助 fs
模塊的 readFile
和 writeFile
方法來實(shí)現(xiàn)。
readFile 和 writeFile 并沒有 promise 化,可以借助 util.promiseify 方法將其 promise 化,但這里并不是文章的重點(diǎn),因此依舊采用回調(diào)的方式
const fs = require("fs"); const path = require("path"); // 利用 path 上的方法組裝路徑 fs.readFile(path.resolve(__dirname, "test.txt"), (err, data) => { if (err) return console.log("error", err); fs.writeFile(path.resolve(__dirname, "result.txt"), data, () => { console.log("拷貝成功"); }); });
上面的代碼雖然成功實(shí)現(xiàn)了文件拷貝,但問題也很明顯,不適用于大文件,當(dāng)文件大于或接近內(nèi)存時(shí),會(huì)淹沒內(nèi)存,這也響應(yīng)了為什么需要流的第一點(diǎn)。
對(duì)于大文件,如何進(jìn)行讀寫那: 邊讀邊寫,讀一點(diǎn)寫一點(diǎn),這樣我們便可以控制文件讀寫的速率 ,也稱作分片讀寫??偟膩碚f就是邊讀邊寫。
分片讀寫
分片讀寫需要使用 fs
模塊中的 read,write,close,open
方法。
既然 fs 有方法可以實(shí)現(xiàn)邊讀邊寫,那為什么還會(huì)有流的出現(xiàn)的?這幾個(gè)方法太麻煩了,參數(shù)太多,這里只做一個(gè)演示。
首先來實(shí)現(xiàn)單個(gè)文字的讀寫。
// 創(chuàng)建一個(gè)存儲(chǔ)單位 1 的 Buffer 空間,來存儲(chǔ)中間讀取的數(shù)據(jù) let buf = Buffer.alloc(1); // 讀取源文件中的數(shù)據(jù) fs.open(path.resolve(__dirname, "test.js"), "r", function (err, rfd) { // rfd 可以理解為文字指針 // 你看到了嗎?6 個(gè)參數(shù),麻爪 // 甚至都有點(diǎn)解釋不動(dòng) fs.read(rfd, buf, 0, 1, 0, function (err, bytesRead) { // bytesRead讀取到的字節(jié)長(zhǎng)度 // 讀取到的第一個(gè)數(shù)據(jù)存入 buf 中 console.log(buf); // <Buffer 31> // 打開目標(biāo)文件。 fs.open(path.resolve(__dirname, "result.js"), "w", function (err, wfd) { // 6 個(gè)參數(shù) // 這里做的就是將 buf 內(nèi)容寫入 result fs.write(wfd, buf, 0, 1, 0, function (err, bytesWritten) { console.log("拷貝成功"); }); }); }); });
上面的方法實(shí)現(xiàn)了單次數(shù)據(jù)的讀取,我們只需要重復(fù)這個(gè)過程就可以實(shí)現(xiàn)大文件的讀寫。
如何重復(fù)實(shí)現(xiàn)上述過程那?遞歸,沒錯(cuò),就是遞歸,將讀寫部分封裝成函數(shù),在寫成功的回調(diào)函數(shù)中再次調(diào)用該函數(shù)。
// source 源文件 // target 目標(biāo)文件 // cb 回調(diào)函數(shù) // bufferSize buffer固定長(zhǎng)度,即一次讀寫的數(shù)量 function copy(source, target, cb, bufferSize = 3) { const SOURCE_PATH = path.resolve(__dirname, source); const TARGET_PATH = path.resolve(__dirname, target); let buf = Buffer.alloc(bufferSize); // 創(chuàng)建 buffer 實(shí)例 let rOffset = 0; // 讀取偏移量 let wOffset = 0; // 寫入偏移量 fs.open(SOURCE_PATH, "r", function (err, rfd) { if (err) return cb(err); fs.open(TARGET_PATH, "w", function (err, wfd) { if (err) return cb(err); // 遞歸讀寫函數(shù) next function next() { fs.read(rfd, buf, 0, bufferSize, rOffset, function (err, bytesRead) { if (err) return cb(err); // bytesRead 代表一次讀取的字節(jié)數(shù) // 當(dāng) bytesRead 為 0 時(shí),代表文件已經(jīng)成功讀完 // 則可以停止讀寫操作,關(guān)閉文件 if (bytesRead == 0) { let index = 0; let done = () => { if (++index == 2) { cb(); } }; fs.close(wfd, done); fs.close(rfd, done); return; } fs.write( wfd, buf, 0, bytesRead, wOffset, function (err, bytesWritten) { if (err) return cb(err); // 讀取成功,并更新偏移量 rOffset += bytesRead; wOffset += bytesWritten; next(); } ); }); } next(); }); }); } copy("test.js", "result.js", function (err) { if (err) return console.log(err); console.log("拷貝成功"); });
這樣我們就成功地實(shí)現(xiàn)大文件分片讀寫,但可以明顯發(fā)現(xiàn):
- write/read 方法參數(shù)多,用起來非常繁瑣
- 上面的代碼有些回調(diào)地獄的傾向,不宜維護(hù)和擴(kuò)展
因此,流就出現(xiàn)了,下面一起來了解一下 nodejs 中的流。
可讀流及源碼編寫
node
中有四種流,下面我們來依次介紹一下,本文主要介紹 Readable
可讀流的使用及其源碼編寫。
Node.js 中的流同樣位于 fs 模塊
EventListener
Nodejs
中的流都繼承于 EventListener
,也就是說其工作原理都是基于發(fā)布訂閱模式。
Readable 可讀流
可讀流用于文件內(nèi)容的讀取,它主要有兩種讀取模式:
- 流動(dòng)模式: 可讀流自動(dòng)讀取數(shù)據(jù),通過
EventListener
接口將數(shù)據(jù)傳遞給應(yīng)用 - 暫停模式: 這種模式下不會(huì)主動(dòng)通過
EventListener
給應(yīng)用傳遞數(shù)據(jù),當(dāng)顯式調(diào)用stream.read
后重啟數(shù)據(jù)流動(dòng)
通過 createReadStream
方法可以創(chuàng)建可讀流,該方法有兩個(gè)參數(shù):
- 參數(shù)一讀取文件的路徑
- 參數(shù)二是
options
配置項(xiàng),該項(xiàng)有八個(gè)參數(shù),但日常我們只需要常用帶星號(hào)的幾個(gè)配置。 flags*
:標(biāo)識(shí)位,默認(rèn)為 r;encoding
:字符編碼,默認(rèn)為 null;fd
:文件描述符,默認(rèn)為 null;mode
:權(quán)限位,默認(rèn)為 0o666;autoClose
:是否自動(dòng)關(guān)閉文件,默認(rèn)為 true;start
:讀取文件的起始位置;end
:讀取文件的(包含)結(jié)束位置;highWaterMark*
:最大讀取文件的字節(jié)數(shù),默認(rèn)64 * 1024
。
highWaterMark
是最值得注意的,它表示每次讀取的文件字節(jié)長(zhǎng)度。
看起來流的參數(shù)很多,用起來會(huì)很復(fù)雜,那你就錯(cuò)了,下面來看個(gè)例子。
// 流是基于發(fā)布訂閱模式實(shí)現(xiàn)的 // 因此我們只需要訂閱對(duì)應(yīng)事件即可 const fs = require("fs"); const path = require("path"); // 返回一個(gè)可讀流 const rs = fs.createReadStream(path.resolve(__dirname, "test.txt"), { highWaterMark: 3, // 每次讀取 3kb }); // 文件打開的鉤子函數(shù) rs.on("open", (fd) => { console.log(fd); // 3 }); // 當(dāng)可讀流處于流動(dòng)模式時(shí),data 事件會(huì)不斷觸發(fā) // 在這里我們可以獲取到讀取的數(shù)據(jù),進(jìn)行后續(xù)操作 rs.on("data", (chunk) => { console.log(chunk); }); rs.on("end", () => { console.log("end"); // 結(jié)束事件 });
data
事件會(huì)一直觸發(fā),也就是說在文件讀取完成前, data
會(huì)一直傳遞數(shù)據(jù),有時(shí)候我們并非需要一直讀取,例如讀取一下暫停一下,那該如何實(shí)現(xiàn)那?
// 借助 pause 和 resume 方法可以實(shí)現(xiàn)數(shù)據(jù)讀取的暫停與恢復(fù) rs.on("data", function (data) { // 讀取的數(shù)據(jù)為 buffer 類型 console.log(`讀取了 ${data.length} 字節(jié)數(shù)據(jù) : ${data.toString()}`); //使流動(dòng)模式的流停止觸發(fā)'data'事件,切換出流動(dòng)模式,數(shù)據(jù)都會(huì)保留在內(nèi)部緩存中。 rs.pause(); //等待3秒后,再恢復(fù)觸發(fā)'data'事件,將流切換回流動(dòng)模式。 setTimeout(function () { rs.resume(); }, 3000); });
下面我們來實(shí)現(xiàn)一下可讀流的源碼。
源碼實(shí)現(xiàn)
Step1: 定義可讀流
可讀流繼承于 EventListener
,因此我們首先建立 ReadStream
類繼承于 EventListener
,這樣 ReadStream
便可以使用 EventListener
類的方法。
EventListener 實(shí)現(xiàn)其實(shí)并不困難,小包前面的文章也講過 EventListener 源碼的解讀及編寫。
let fs = require("fs"); let EventEmitter = require("events"); class ReadStream extends EventEmitter {}
Step2: 參數(shù)配置
可讀流有兩個(gè)參數(shù), path
路徑和 options
配置項(xiàng),我們把對(duì)應(yīng)的參數(shù)配置在類上,因此我們需要編寫一下構(gòu)造函數(shù)。
constructor(path, options = {}) { // 使用繼承,子類必須調(diào)用 super 函數(shù) super(); this.path = path; //指定要讀取的文件地址 this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; //是否自動(dòng)關(guān)閉文件 this.start = options.start || 0; // 從文件哪個(gè)位置開始讀取 this.end = options.end || null; // null表示沒傳遞 this.encoding = options.encoding || null;// buffer編碼 this.flags = options.flags || 'r'; }
除了 ReadStream
所需的參數(shù)外,我們還需要添加幾個(gè)控制參數(shù)
pos
: 記錄當(dāng)前文件讀取到的位置flowing
: 當(dāng)前讀取的模式,true
為流動(dòng)模式buffer
: 每次讀取內(nèi)容的存儲(chǔ)位置
constructor() { // ... this.pos = this.start; this.flowing = null; this.buffer = Buffer.alloc(this.highWaterMark); }
Step3: 打開待讀文件
ReadStream 中分別使用 close、open、error 注冊(cè)事件來控制對(duì)應(yīng)行為的產(chǎn)生,當(dāng)打開文件后,觸發(fā) open 事件;打開失敗,觸發(fā) error 事件。
這里我們處理一下上面幾個(gè)事件的觸發(fā)時(shí)機(jī),使用 fs.open 方法來打開文件。
open() { fs.open(this.path, this.flags, (err, fd) => { if (err) { if (this.autoClose) { // 如果需要自動(dòng)關(guān)閉則去關(guān)閉文件 this.destroy(); // 銷毀(關(guān)閉文件,觸發(fā)關(guān)閉事件) } this.emit('error', err); // 打開錯(cuò)誤,觸發(fā) error 事件 return; } this.fd = fd; // 保存文件描述符,方便后續(xù)輪詢判斷 this.emit('open', this.fd); // 文件打開,觸發(fā) open 事件 }); }
Step4: 讀取文件內(nèi)容
上文提到, ReadStream
有兩種模式: 流動(dòng)模式和暫停模式,并用 flowing
屬性來標(biāo)識(shí)兩種模式。
ReadStream
通過監(jiān)聽 data
事件來啟動(dòng)文件讀取,即:
rs.on("data", (chunk) => { console.log(chunk); });
這里實(shí)現(xiàn)有兩個(gè)難點(diǎn):
- 當(dāng)監(jiān)聽
data
事件后,ReadStream
才開啟數(shù)據(jù)讀取,那應(yīng)該如何監(jiān)聽data
事件的注冊(cè)那? fs.open
是異步讀取操作,因此有可能出現(xiàn)data
事件觸發(fā)時(shí),文件還未讀取完畢,那我們應(yīng)該如何處理這種情況那?
一個(gè)問題一個(gè)問題來解決, EventListener
中提供了 newListener
事件,當(dāng)注冊(cè)新事件后,該事件的處理函數(shù)觸發(fā),因此我們可以監(jiān)聽該事件,判斷事件類型,如果為 data
事件,打開 flowing
,開始讀取
class ReadStream extends EventEmitter { constructor(path, options) { // 監(jiān)聽newListener事件,判斷當(dāng)前監(jiān)聽事件是否為 data 事件 // 如果為 data 事件,開啟文件讀取 this.on("newListener", (type) => { if (type === "data") { // 開啟流動(dòng)模式,開始讀取文件中的內(nèi)容 this.flowing = true; this.read(); } }); } }
由于 data
事件的觸發(fā)可能發(fā)生在 fs.open
讀取之前,因此 read
函數(shù)中要做一個(gè) 輪詢操作 ,每次判斷是否成功讀取。
read() { // 文件如果未打卡,fd 是沒有值的 if (typeof this.fd !== "number") { // 如果文件未打開,觸發(fā) open 事件 return this.once("open", () => this.read()); } }
Step5: 編寫 read 方法
上面編寫完畢后,我們可以成功的監(jiān)聽到 data
事件,且可以打開文件,后續(xù)就可以進(jìn)行文件的讀取了。
文件讀取的內(nèi)容上文案例中提到過,即利用 fs.read
方法進(jìn)行讀取,下面直接在源碼上進(jìn)行解釋。
class ReadStream extends EventEmitter { read() { // 計(jì)算當(dāng)前讀取字節(jié) const howManyToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark; // 創(chuàng)建 buffer 實(shí)例 const buffer = Buffer.alloc(howManyToRead); // 利用 fs.read 進(jìn)行文件內(nèi)容讀取 fs.read( this.fd, buffer, 0, howManyToRead, this.offset, (err, bytesRead) => { if (err) return this.destory(err); this.pos += bytesRead; // 可能存在最后一次的 buffer 大小 大于 實(shí)際數(shù)據(jù)大小的情況,所以使用slice來進(jìn)行截取 // 將讀取后的內(nèi)容傳遞給 data 事件 this.emit("data", buffer.slice(0, bytesRead)); } ); } }
這樣便可以實(shí)現(xiàn)一次讀取,一次讀取完畢后,接著調(diào)用 read
方法就可以實(shí)現(xiàn)不斷讀取,即流動(dòng)模式
read() { // ... // 流動(dòng)模式下,循環(huán)進(jìn)行讀取 if (this.flowing) { this.read(); } }
Step6: 流動(dòng)模式與暫停模式
ReadStream
使用 flowing
來控制可讀流的讀取與暫停,最后我們來實(shí)現(xiàn)可讀流的暫停和恢復(fù)。
pause() { // 判斷當(dāng)前是否讀取完畢了 if (this.flowing) { this.flowing = false; } } resume() { // 判斷當(dāng)前是否讀取完畢了 if (!this.flowing) { this.flowing = true; this.read(); } }
總結(jié)
本文詳細(xì)的講解了流的前因后果,流可以說是 node 的核心之一,對(duì)此我們需要完美掌握,靈活運(yùn)用。本文為了讓大家更深入的了解流,從源碼和應(yīng)用出發(fā),帶你全方位了解流??蓪懥鞯木帉懜幸馑?,可以學(xué)到更多東西,后續(xù)小包會(huì)繼續(xù)撰寫文章。
以上就是一文帶你搞懂Node中的流的詳細(xì)內(nèi)容,更多關(guān)于Node 流的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用pkg打包nodejs項(xiàng)目并解決本地文件讀取的問題
這篇文章主要介紹了使用pkg打包nodejs項(xiàng)目并解決本地文件讀取的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-10-10nodejs實(shí)現(xiàn)遍歷文件夾并統(tǒng)計(jì)文件大小
這篇文章主要介紹了nodejs實(shí)現(xiàn)遍歷文件夾并統(tǒng)計(jì)文件大小,下面使用nodejs的遍歷文件夾文件內(nèi)容,并且讀取所有的文件,并采取排序往大到小的順序進(jìn)行輸出,需要的朋友可以參考下2015-05-05node.js實(shí)現(xiàn)端口轉(zhuǎn)發(fā)
這篇文章主要為大家詳細(xì)介紹了node.js實(shí)現(xiàn)端口轉(zhuǎn)發(fā)的關(guān)鍵代碼,感興趣的小伙伴們可以參考一下2016-04-04nodejs使用express創(chuàng)建一個(gè)簡(jiǎn)單web應(yīng)用
這篇文章主要介紹了nodejs使用express創(chuàng)建一個(gè)簡(jiǎn)單web應(yīng)用的相關(guān)資料,需要的朋友可以參考下2017-03-03