Node.js多文件Stream合并,串行和并發(fā)兩種模式的實(shí)現(xiàn)方式
將多個(gè)文件合并為一個(gè)文件,常見的場(chǎng)景是類似于大文件分片上傳,事先根據(jù)一定的文件大小拆分為多個(gè)小文件上傳到服務(wù)端,最后服務(wù)端在合并起來。
怎么合并?一種簡(jiǎn)單的辦法是使用 fs.readFile 讀取,fs.writeFile 追加寫入,這種方式是將文件數(shù)據(jù)先讀入應(yīng)用內(nèi)存再寫入,不是很推薦,Node.js 本身提供了 Stream 模塊可以更好的處理這種場(chǎng)景。
在 Stream 中合并文件之前一個(gè)比較常用的 api 是 pipe,但是這個(gè) API 對(duì)于錯(cuò)誤處理不是很友好,一不小心還能搞出文件句柄內(nèi)存泄漏問題。
本文先介紹 pipe 方法的使用及什么情況下會(huì)遇到文件句柄的內(nèi)存泄漏問題,之后再分別介紹 Stream 合并的兩種實(shí)現(xiàn)模式。
pipe VS pipeline
pipe
創(chuàng)建一個(gè)可讀流 readable 和一個(gè)可寫流 writeable,通過管道 pipe 將可寫流綁定到可讀流,一個(gè)簡(jiǎn)單的 Stream 操作就完成了。
const fs = require('fs'); const readable = fs.createReadStream('./test1.txt'); const writeable = fs.createWriteStream('./test2.txt'); readable.pipe(writeable);
pipe 方法的兩個(gè)參數(shù):
destination:是一個(gè)可寫流對(duì)象,也就是一個(gè)數(shù)據(jù)寫入的目標(biāo)對(duì)象,例如,上面我們創(chuàng)建的 writeable 就是一個(gè)可寫流對(duì)象
options:
- end:讀取結(jié)束時(shí)終止寫入流,默認(rèn)值是 true
readable.pipe(destination[, options])
默認(rèn)情況下我們是不需要手動(dòng)調(diào)用寫入流的 end 方法關(guān)閉的。
現(xiàn)在我們改一下, 設(shè)置 end 為 false 寫入的目標(biāo)流將會(huì)一直處于打開狀態(tài), 此時(shí)就需要監(jiān)聽可讀流的 end 事件,結(jié)束之后手動(dòng)調(diào)用可寫流的 end 方法結(jié)束( 為什么要這樣做?下文 Stream 串行合并會(huì)再用到這一特性 )。
// readable.pipe(writeable); readable.pipe(writeable, { end: false, }); readable.on('end', function() { writeable.end('結(jié)束'); });
還需要注意一點(diǎn) 如果可讀流期間發(fā)生什么錯(cuò)誤,則寫入的目標(biāo)流將不會(huì)關(guān)閉 ,例如:process.stderr 和 process.stdout 可寫流在 Nodejs 進(jìn)程退出前將永遠(yuǎn)不會(huì)關(guān)閉,所以 需要監(jiān)聽錯(cuò)誤事件,手動(dòng)關(guān)閉可寫流,防止內(nèi)存泄漏 。
Linux 下一切皆文件,為了測(cè)試,在創(chuàng)建可讀流時(shí),你可以不創(chuàng)建 test1.txt 文件,讓可讀流自動(dòng)觸發(fā) error 事件并且將 writeable 的 close 方法注釋掉,通過 linux 命令 ls -l /proc/${pid}/fd 查看 error 和非 error 前后的文件句柄變化。
readable.on('error', function(err) { console.log('error', err); // writeable.close(); }); console.log(process.pid); // 打印進(jìn)程 ID setInterval(function(){}, 5000) // 讓程序不中斷,進(jìn)程不退出
以下為觸發(fā) error 錯(cuò)誤下 test2.txt 這個(gè)文件 fd 將會(huì)一直打開,除非進(jìn)程退出,所以重要的事情再說一遍, 如果使用 pipe 一定要做好錯(cuò)誤監(jiān)聽手動(dòng)關(guān)閉每個(gè)寫入流 ,以防止 “ 內(nèi)存泄漏 ”。
...
l-wx------ 1 root root 64 Apr 10 15:47 19 -> /root/study/test2.txt
...
注意,Mac 下沒有 /proc 文件,可通過 docker 測(cè)試。
不想開兩個(gè)終端的,也可以在程序 setInterval 定時(shí)器函數(shù)里使用 child_process 模塊的 exec 函數(shù)執(zhí)行 ls -l /proc/${process.pid}/fd 命令。
const { exec } = require('child_process'); setInterval(function(){ exec(`ls -l /proc/${process.pid}/fd`, (error, stdout, stderr) => { console.log(`stdout: \n`, stdout); }) }, 5000) // 讓程序不中斷,進(jìn)程不退出
pipeline
Stream 模塊的一個(gè)新 API pipeline 方法,添加于 Node.js v10.0,Promise 風(fēng)格需要 Node.js v15.0+ 支持。
相比較于 pipe 方法增加了錯(cuò)誤處理機(jī)制,當(dāng)管道中的某個(gè)流發(fā)生錯(cuò)誤,它會(huì)自動(dòng)處理并釋放掉相應(yīng)的資源。
try { await pipeline( readable, writable ); console.log('Pipeline succeeded.'); } catch (err) { console.log('error', err); }
串行模式 Stream 合并
使用 pipe 方法實(shí)現(xiàn)串行模式的流合并,根據(jù)前面講的,設(shè)置可讀流的 end 為 false 保持寫入流一直處于打開狀態(tài),直到所有的可讀流結(jié)束(待合并的文件完成后),我們?cè)賹⒖蓪懥鹘o關(guān)閉。
- streamMerge 函數(shù)為入口函數(shù)
- streamMergeRecursive 函數(shù)遞歸調(diào)用合并文件
const fs = require('fs'); const path = require('path'); /** * Stream 合并 * @param { String } sourceFileDirectory 源文件目錄 * @param { String } targetFile 目標(biāo)文件 */ function streamMerge(sourceFileDirectory, targetFile) { const scripts = fs.readdirSync(path.resolve(__dirname, sourceFileDirectory)); // 獲取源文件目錄下的所有文件 const fileWriteStream = fs.createWriteStream(path.resolve(__dirname, targetFile)); // 創(chuàng)建一個(gè)可寫流 // fs.readdir 讀取出來的結(jié)果,根據(jù)具體的規(guī)則做下排序,防止因?yàn)轫樞虿粚?duì)導(dǎo)致最終合并之后的文件無效。 return streamMergeRecursive(scripts, fileWriteStream, sourceFileDirectory); } /** * Stream 合并的遞歸調(diào)用 * @param { Array } scripts * @param { Stream } fileWriteStream */ function streamMergeRecursive(scripts=[], fileWriteStream, sourceFileDirectory) { // 遞歸到尾部情況判斷 if (!scripts.length) { return fileWriteStream.end("console.log('Stream 合并完成')"); // 最后關(guān)閉可寫流,防止內(nèi)存泄漏 } const currentFile = path.resolve(__dirname, sourceFileDirectory, scripts.shift()); const currentReadStream = fs.createReadStream(currentFile); // 獲取當(dāng)前的可讀流 currentReadStream.pipe(fileWriteStream, { end: false }); currentReadStream.on('end', function() { streamMergeRecursive(scripts, fileWriteStream, sourceFileDirectory); }); currentReadStream.on('error', function(error) { // 監(jiān)聽錯(cuò)誤事件,關(guān)閉可寫流,防止內(nèi)存泄漏 console.error(error); fileWriteStream.close(); }); } streamMerge('./files', './file.js');
并發(fā)模式 Stream 合并
流合并也是可以采用并發(fā)模式的,核心是通過可寫流的 start、end 屬性控制。
start 有點(diǎn)類似于數(shù)據(jù)庫(kù)查詢的 skip,在 計(jì)算時(shí)要求文件分塊的下標(biāo)必須是 0、1、2... 這樣的規(guī)則 ,這種方式可以不用關(guān)注每一個(gè)流分塊在文件中的存儲(chǔ)順序,也可以將可讀流傳輸至可寫流的指定位置。
例如,有一個(gè)大文件 dec47b76e3220432100a1155eff7f402(文件 md5 后的 hash 值) 根據(jù) chunkSize(1048576)拆分為 3 個(gè)小文件。
/chunks └── dec47b76e3220432100a1155eff7f402-1048576 ├── dec47b76e3220432100a1155eff7f402-0 ├── dec47b76e3220432100a1155eff7f402-1 └── dec47b76e3220432100a1155eff7f402-2
并發(fā)模式的 Stream 合并代碼實(shí)現(xiàn)如下:
/** * Stream concurrent merge * @param {String} sourceFileDirectory * @param {String} targetFile * @param {Number} chunkSize */ export const streamConcurrentMerge = async (sourceFileDirectory, targetFile, chunkSize) => { const filenames = await fs.readdir(sourceFileDirectory); await Promise.all(filenames.map(filename => { const index = filename.split('-').pop(); const start = index * chunkSize; const end = (index + 1) * chunkSize; return pipeline( createReadStream(path.join(sourceFileDirectory, filename)), createWriteStream(targetFile, { start, end, }) ); })) }
總結(jié)
使用 pipe 時(shí)錯(cuò)誤處理是件需要注意的事情,特別是出現(xiàn)這種情況 readable.pipe(a).pipe(b).pipe(writable) 其中任何一個(gè)流關(guān)閉或出錯(cuò)都會(huì)導(dǎo)致整個(gè)管道停止工作,這個(gè)時(shí)候就要銷毀所有的流,這種復(fù)雜的處理起來極其麻煩, 推薦使用 stream API pipeline 處理,或使用社區(qū) npm 庫(kù) pump。
將多個(gè)文件合并為一個(gè)文件,使用流的方式有兩種:
- 第一種是串行模式依次讀取每個(gè)文件的內(nèi)容,通過 pipe 方法寫入可寫流,直到最后一個(gè)文件讀取完成關(guān)閉寫入流。
- 另一種是并發(fā)模式,核心實(shí)現(xiàn)是利用寫入流的 start、end 屬性將可讀流傳輸至可寫流的指定位置,上面的實(shí)現(xiàn)還可以在優(yōu)化,比如控制下并發(fā)的數(shù)量。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
mac安裝nvm(node.js)多版本管理實(shí)踐步驟
這篇文章主要介紹了mac安裝nvm(node.js)多版本管理的相關(guān)資料,NVM是一個(gè)用于管理多個(gè)Node.js版本的命令行工具,它允許開發(fā)者在同一臺(tái)機(jī)器上安裝、切換和卸載不同版本的Node.js,從而解決版本不兼容的問題,需要的朋友可以參考下2025-02-02nodejs構(gòu)建本地web測(cè)試服務(wù)器 如何解決訪問靜態(tài)資源問題
這篇文章主要為大家詳細(xì)介紹了nodejs構(gòu)建本地web測(cè)試服務(wù)器,教大家如何解決訪問靜態(tài)資源問題,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-07-07