一文帶你搞懂Node中的流
流是什么?
流,通俗來講就是數(shù)據(jù)流動,數(shù)據(jù)從一個地方緩慢的流到另一個地方。
舉個栗子,可以借助水管中的水流來輔助理解,當(dāng)打開水龍頭后,水便可以從源頭流出水龍頭;關(guān)閉水龍頭,水便不再流動。
為什么需要流
那為什么會需要流吶?
其它介質(zhì)和內(nèi)存的數(shù)據(jù)規(guī)模不一致,例如磁盤的內(nèi)存往往遠(yuǎn)遠(yuǎn)大于內(nèi)存,因此磁盤中有可能會出現(xiàn)大于內(nèi)存的文件,此時內(nèi)存無法一次讀入該文件。這種情形可以把水庫比作磁盤,洗碗池比作內(nèi)存,如果不加限制,水庫的水量輕輕拿捏洗碗池,因此就需要水管來進(jìn)行傳輸,限制水的流量。

其他介質(zhì)和內(nèi)存的數(shù)據(jù)處理能力不一致,內(nèi)存的處理速度其他介質(zhì)很難比,內(nèi)存迅速處理數(shù)據(jù),一波流傳給硬盤,硬盤很難吃得消。

為了更深刻得理解流的作用,接下來我們來試一下不使用流需要如何進(jìn)行文件讀寫。
文件讀寫
首先我們來實現(xiàn)最簡單的文件拷貝功能,這個比較簡單,我們可以借助 fs 模塊的 readFile 和 writeFile 方法來實現(xiàn)。
readFile 和 writeFile 并沒有 promise 化,可以借助 util.promiseify 方法將其 promise 化,但這里并不是文章的重點,因此依舊采用回調(diào)的方式
const fs = require("fs");
const path = require("path");
// 利用 path 上的方法組裝路徑
fs.readFile(path.resolve(__dirname, "test.txt"), (err, data) => {
if (err) return console.log("error", err);
fs.writeFile(path.resolve(__dirname, "result.txt"), data, () => {
console.log("拷貝成功");
});
});上面的代碼雖然成功實現(xiàn)了文件拷貝,但問題也很明顯,不適用于大文件,當(dāng)文件大于或接近內(nèi)存時,會淹沒內(nèi)存,這也響應(yīng)了為什么需要流的第一點。
對于大文件,如何進(jìn)行讀寫那: 邊讀邊寫,讀一點寫一點,這樣我們便可以控制文件讀寫的速率 ,也稱作分片讀寫??偟膩碚f就是邊讀邊寫。
分片讀寫
分片讀寫需要使用 fs 模塊中的 read,write,close,open 方法。
既然 fs 有方法可以實現(xiàn)邊讀邊寫,那為什么還會有流的出現(xiàn)的?這幾個方法太麻煩了,參數(shù)太多,這里只做一個演示。
首先來實現(xiàn)單個文字的讀寫。
// 創(chuàng)建一個存儲單位 1 的 Buffer 空間,來存儲中間讀取的數(shù)據(jù)
let buf = Buffer.alloc(1);
// 讀取源文件中的數(shù)據(jù)
fs.open(path.resolve(__dirname, "test.js"), "r", function (err, rfd) {
// rfd 可以理解為文字指針
// 你看到了嗎?6 個參數(shù),麻爪
// 甚至都有點解釋不動
fs.read(rfd, buf, 0, 1, 0, function (err, bytesRead) {
// bytesRead讀取到的字節(jié)長度
// 讀取到的第一個數(shù)據(jù)存入 buf 中
console.log(buf); // <Buffer 31>
// 打開目標(biāo)文件。
fs.open(path.resolve(__dirname, "result.js"), "w", function (err, wfd) {
// 6 個參數(shù)
// 這里做的就是將 buf 內(nèi)容寫入 result
fs.write(wfd, buf, 0, 1, 0, function (err, bytesWritten) {
console.log("拷貝成功");
});
});
});
});上面的方法實現(xiàn)了單次數(shù)據(jù)的讀取,我們只需要重復(fù)這個過程就可以實現(xiàn)大文件的讀寫。
如何重復(fù)實現(xiàn)上述過程那?遞歸,沒錯,就是遞歸,將讀寫部分封裝成函數(shù),在寫成功的回調(diào)函數(shù)中再次調(diào)用該函數(shù)。
// source 源文件
// target 目標(biāo)文件
// cb 回調(diào)函數(shù)
// bufferSize buffer固定長度,即一次讀寫的數(shù)量
function copy(source, target, cb, bufferSize = 3) {
const SOURCE_PATH = path.resolve(__dirname, source);
const TARGET_PATH = path.resolve(__dirname, target);
let buf = Buffer.alloc(bufferSize); // 創(chuàng)建 buffer 實例
let rOffset = 0; // 讀取偏移量
let wOffset = 0; // 寫入偏移量
fs.open(SOURCE_PATH, "r", function (err, rfd) {
if (err) return cb(err);
fs.open(TARGET_PATH, "w", function (err, wfd) {
if (err) return cb(err);
// 遞歸讀寫函數(shù) next
function next() {
fs.read(rfd, buf, 0, bufferSize, rOffset, function (err, bytesRead) {
if (err) return cb(err);
// bytesRead 代表一次讀取的字節(jié)數(shù)
// 當(dāng) bytesRead 為 0 時,代表文件已經(jīng)成功讀完
// 則可以停止讀寫操作,關(guān)閉文件
if (bytesRead == 0) {
let index = 0;
let done = () => {
if (++index == 2) {
cb();
}
};
fs.close(wfd, done);
fs.close(rfd, done);
return;
}
fs.write(
wfd,
buf,
0,
bytesRead,
wOffset,
function (err, bytesWritten) {
if (err) return cb(err);
// 讀取成功,并更新偏移量
rOffset += bytesRead;
wOffset += bytesWritten;
next();
}
);
});
}
next();
});
});
}
copy("test.js", "result.js", function (err) {
if (err) return console.log(err);
console.log("拷貝成功");
});這樣我們就成功地實現(xiàn)大文件分片讀寫,但可以明顯發(fā)現(xiàn):
- write/read 方法參數(shù)多,用起來非常繁瑣
- 上面的代碼有些回調(diào)地獄的傾向,不宜維護和擴展
因此,流就出現(xiàn)了,下面一起來了解一下 nodejs 中的流。
可讀流及源碼編寫
node 中有四種流,下面我們來依次介紹一下,本文主要介紹 Readable 可讀流的使用及其源碼編寫。
Node.js 中的流同樣位于 fs 模塊
EventListener
Nodejs 中的流都繼承于 EventListener ,也就是說其工作原理都是基于發(fā)布訂閱模式。
Readable 可讀流
可讀流用于文件內(nèi)容的讀取,它主要有兩種讀取模式:
- 流動模式: 可讀流自動讀取數(shù)據(jù),通過
EventListener接口將數(shù)據(jù)傳遞給應(yīng)用 - 暫停模式: 這種模式下不會主動通過
EventListener給應(yīng)用傳遞數(shù)據(jù),當(dāng)顯式調(diào)用stream.read后重啟數(shù)據(jù)流動
通過 createReadStream 方法可以創(chuàng)建可讀流,該方法有兩個參數(shù):
- 參數(shù)一讀取文件的路徑
- 參數(shù)二是
options配置項,該項有八個參數(shù),但日常我們只需要常用帶星號的幾個配置。 flags*:標(biāo)識位,默認(rèn)為 r;encoding:字符編碼,默認(rèn)為 null;fd:文件描述符,默認(rèn)為 null;mode:權(quán)限位,默認(rèn)為 0o666;autoClose:是否自動關(guān)閉文件,默認(rèn)為 true;start:讀取文件的起始位置;end:讀取文件的(包含)結(jié)束位置;highWaterMark*:最大讀取文件的字節(jié)數(shù),默認(rèn)64 * 1024。
highWaterMark 是最值得注意的,它表示每次讀取的文件字節(jié)長度。
看起來流的參數(shù)很多,用起來會很復(fù)雜,那你就錯了,下面來看個例子。
// 流是基于發(fā)布訂閱模式實現(xiàn)的
// 因此我們只需要訂閱對應(yīng)事件即可
const fs = require("fs");
const path = require("path");
// 返回一個可讀流
const rs = fs.createReadStream(path.resolve(__dirname, "test.txt"), {
highWaterMark: 3, // 每次讀取 3kb
});
// 文件打開的鉤子函數(shù)
rs.on("open", (fd) => {
console.log(fd); // 3
});
// 當(dāng)可讀流處于流動模式時,data 事件會不斷觸發(fā)
// 在這里我們可以獲取到讀取的數(shù)據(jù),進(jìn)行后續(xù)操作
rs.on("data", (chunk) => {
console.log(chunk);
});
rs.on("end", () => {
console.log("end"); // 結(jié)束事件
});data 事件會一直觸發(fā),也就是說在文件讀取完成前, data 會一直傳遞數(shù)據(jù),有時候我們并非需要一直讀取,例如讀取一下暫停一下,那該如何實現(xiàn)那?
// 借助 pause 和 resume 方法可以實現(xiàn)數(shù)據(jù)讀取的暫停與恢復(fù)
rs.on("data", function (data) {
// 讀取的數(shù)據(jù)為 buffer 類型
console.log(`讀取了 ${data.length} 字節(jié)數(shù)據(jù) : ${data.toString()}`);
//使流動模式的流停止觸發(fā)'data'事件,切換出流動模式,數(shù)據(jù)都會保留在內(nèi)部緩存中。
rs.pause();
//等待3秒后,再恢復(fù)觸發(fā)'data'事件,將流切換回流動模式。
setTimeout(function () {
rs.resume();
}, 3000);
});下面我們來實現(xiàn)一下可讀流的源碼。
源碼實現(xiàn)
Step1: 定義可讀流
可讀流繼承于 EventListener ,因此我們首先建立 ReadStream 類繼承于 EventListener ,這樣 ReadStream 便可以使用 EventListener 類的方法。
EventListener 實現(xiàn)其實并不困難,小包前面的文章也講過 EventListener 源碼的解讀及編寫。
let fs = require("fs");
let EventEmitter = require("events");
class ReadStream extends EventEmitter {}Step2: 參數(shù)配置
可讀流有兩個參數(shù), path 路徑和 options 配置項,我們把對應(yīng)的參數(shù)配置在類上,因此我們需要編寫一下構(gòu)造函數(shù)。
constructor(path, options = {}) {
// 使用繼承,子類必須調(diào)用 super 函數(shù)
super();
this.path = path; //指定要讀取的文件地址
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true; //是否自動關(guān)閉文件
this.start = options.start || 0; // 從文件哪個位置開始讀取
this.end = options.end || null; // null表示沒傳遞
this.encoding = options.encoding || null;// buffer編碼
this.flags = options.flags || 'r';
}除了 ReadStream 所需的參數(shù)外,我們還需要添加幾個控制參數(shù)
pos: 記錄當(dāng)前文件讀取到的位置flowing: 當(dāng)前讀取的模式,true為流動模式buffer: 每次讀取內(nèi)容的存儲位置
constructor() {
// ...
this.pos = this.start;
this.flowing = null;
this.buffer = Buffer.alloc(this.highWaterMark);
}Step3: 打開待讀文件
ReadStream 中分別使用 close、open、error 注冊事件來控制對應(yīng)行為的產(chǎn)生,當(dāng)打開文件后,觸發(fā) open 事件;打開失敗,觸發(fā) error 事件。
這里我們處理一下上面幾個事件的觸發(fā)時機,使用 fs.open 方法來打開文件。
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
if (this.autoClose) { // 如果需要自動關(guān)閉則去關(guān)閉文件
this.destroy(); // 銷毀(關(guān)閉文件,觸發(fā)關(guān)閉事件)
}
this.emit('error', err); // 打開錯誤,觸發(fā) error 事件
return;
}
this.fd = fd; // 保存文件描述符,方便后續(xù)輪詢判斷
this.emit('open', this.fd); // 文件打開,觸發(fā) open 事件
});
}Step4: 讀取文件內(nèi)容
上文提到, ReadStream 有兩種模式: 流動模式和暫停模式,并用 flowing 屬性來標(biāo)識兩種模式。
ReadStream 通過監(jiān)聽 data 事件來啟動文件讀取,即:
rs.on("data", (chunk) => {
console.log(chunk);
});這里實現(xiàn)有兩個難點:
- 當(dāng)監(jiān)聽
data事件后,ReadStream才開啟數(shù)據(jù)讀取,那應(yīng)該如何監(jiān)聽data事件的注冊那? fs.open是異步讀取操作,因此有可能出現(xiàn)data事件觸發(fā)時,文件還未讀取完畢,那我們應(yīng)該如何處理這種情況那?
一個問題一個問題來解決, EventListener 中提供了 newListener 事件,當(dāng)注冊新事件后,該事件的處理函數(shù)觸發(fā),因此我們可以監(jiān)聽該事件,判斷事件類型,如果為 data 事件,打開 flowing ,開始讀取
class ReadStream extends EventEmitter {
constructor(path, options) {
// 監(jiān)聽newListener事件,判斷當(dāng)前監(jiān)聽事件是否為 data 事件
// 如果為 data 事件,開啟文件讀取
this.on("newListener", (type) => {
if (type === "data") {
// 開啟流動模式,開始讀取文件中的內(nèi)容
this.flowing = true;
this.read();
}
});
}
}
由于 data 事件的觸發(fā)可能發(fā)生在 fs.open 讀取之前,因此 read 函數(shù)中要做一個 輪詢操作 ,每次判斷是否成功讀取。
read() {
// 文件如果未打卡,fd 是沒有值的
if (typeof this.fd !== "number") {
// 如果文件未打開,觸發(fā) open 事件
return this.once("open", () => this.read());
}
}Step5: 編寫 read 方法
上面編寫完畢后,我們可以成功的監(jiān)聽到 data 事件,且可以打開文件,后續(xù)就可以進(jìn)行文件的讀取了。
文件讀取的內(nèi)容上文案例中提到過,即利用 fs.read 方法進(jìn)行讀取,下面直接在源碼上進(jìn)行解釋。
class ReadStream extends EventEmitter {
read() {
// 計算當(dāng)前讀取字節(jié)
const howManyToRead = this.end
? Math.min(this.highWaterMark, this.end - this.pos + 1)
: this.highWaterMark;
// 創(chuàng)建 buffer 實例
const buffer = Buffer.alloc(howManyToRead);
// 利用 fs.read 進(jìn)行文件內(nèi)容讀取
fs.read(
this.fd,
buffer,
0,
howManyToRead,
this.offset,
(err, bytesRead) => {
if (err) return this.destory(err);
this.pos += bytesRead;
// 可能存在最后一次的 buffer 大小 大于 實際數(shù)據(jù)大小的情況,所以使用slice來進(jìn)行截取
// 將讀取后的內(nèi)容傳遞給 data 事件
this.emit("data", buffer.slice(0, bytesRead));
}
);
}
}這樣便可以實現(xiàn)一次讀取,一次讀取完畢后,接著調(diào)用 read 方法就可以實現(xiàn)不斷讀取,即流動模式
read() {
// ...
// 流動模式下,循環(huán)進(jìn)行讀取
if (this.flowing) {
this.read();
}
}Step6: 流動模式與暫停模式
ReadStream 使用 flowing 來控制可讀流的讀取與暫停,最后我們來實現(xiàn)可讀流的暫停和恢復(fù)。
pause() {
// 判斷當(dāng)前是否讀取完畢了
if (this.flowing) {
this.flowing = false;
}
}
resume() {
// 判斷當(dāng)前是否讀取完畢了
if (!this.flowing) {
this.flowing = true;
this.read();
}
}總結(jié)
本文詳細(xì)的講解了流的前因后果,流可以說是 node 的核心之一,對此我們需要完美掌握,靈活運用。本文為了讓大家更深入的了解流,從源碼和應(yīng)用出發(fā),帶你全方位了解流??蓪懥鞯木帉懜幸馑?,可以學(xué)到更多東西,后續(xù)小包會繼續(xù)撰寫文章。
以上就是一文帶你搞懂Node中的流的詳細(xì)內(nèi)容,更多關(guān)于Node 流的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
nodejs實現(xiàn)遍歷文件夾并統(tǒng)計文件大小
這篇文章主要介紹了nodejs實現(xiàn)遍歷文件夾并統(tǒng)計文件大小,下面使用nodejs的遍歷文件夾文件內(nèi)容,并且讀取所有的文件,并采取排序往大到小的順序進(jìn)行輸出,需要的朋友可以參考下2015-05-05
node.js實現(xiàn)端口轉(zhuǎn)發(fā)
這篇文章主要為大家詳細(xì)介紹了node.js實現(xiàn)端口轉(zhuǎn)發(fā)的關(guān)鍵代碼,感興趣的小伙伴們可以參考一下2016-04-04
nodejs使用express創(chuàng)建一個簡單web應(yīng)用
這篇文章主要介紹了nodejs使用express創(chuàng)建一個簡單web應(yīng)用的相關(guān)資料,需要的朋友可以參考下2017-03-03

