nodejs中使用worker_threads來創(chuàng)建新的線程的方法
簡介
之前的文章中提到了,nodejs中有兩種線程,一種是event loop用來相應(yīng)用戶的請求和處理各種callback。另一種就是worker pool用來處理各種耗時操作。
nodejs的官網(wǎng)提到了一個能夠使用nodejs本地woker pool的lib叫做webworker-threads。
可惜的是webworker-threads的最后一次更新還是在2年前,而在最新的nodejs 12中,根本無法使用。
而webworker-threads的作者則推薦了一個新的lib叫做web-worker。
web-worker是構(gòu)建于nodejs的worker_threads之上的,本文將會詳細(xì)講解worker_threads和web-worker的使用。
worker_threads
worker_threads模塊的源代碼源自lib/worker_threads.js,它指的是工作線程,可以開啟一個新的線程來并行執(zhí)行javascript程序。
worker_threads主要用來處理CPU密集型操作,而不是IO操作,因為nodejs本身的異步IO已經(jīng)非常強大了。
worker_threads中主要有5個屬性,3個class和3個主要的方法。接下來我們將會一一講解。
isMainThread
isMainThread用來判斷代碼是否在主線程中運行,我們看一個使用的例子:
const { Worker, isMainThread } = require('worker_threads');
if (isMainThread) {
console.log('在主線程中');
new Worker(__filename);
} else {
console.log('在工作線程中');
console.log(isMainThread); // 打印 'false'。
}
上面的例子中,我們從worker_threads模塊中引入了Worker和isMainThread,Worker就是工作線程的主類,我們將會在后面詳細(xì)講解,這里我們使用Worker創(chuàng)建了一個工作線程。
MessageChannel
MessageChannel代表的是一個異步雙向通信channel。MessageChannel中沒有方法,主要通過MessageChannel來連接兩端的MessagePort。
class MessageChannel {
readonly port1: MessagePort;
readonly port2: MessagePort;
}
當(dāng)我們使用new MessageChannel()的時候,會自動創(chuàng)建兩個MessagePort。
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener
通過MessageChannel,我們可以進(jìn)行MessagePort間的通信。
parentPort和MessagePort
parentPort是一個MessagePort類型,parentPort主要用于worker線程和主線程進(jìn)行消息交互。
通過parentPort.postMessage()發(fā)送的消息在主線程中將可以通過worker.on(‘message')接收。
主線程中通過worker.postMessage()發(fā)送的消息將可以在工作線程中通過parentPort.on(‘message')接收。
我們看一下MessagePort的定義:
class MessagePort extends EventEmitter {
close(): void;
postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
ref(): void;
unref(): void;
start(): void;
addListener(event: "close", listener: () => void): this;
addListener(event: "message", listener: (value: any) => void): this;
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
emit(event: "close"): boolean;
emit(event: "message", value: any): boolean;
emit(event: string | symbol, ...args: any[]): boolean;
on(event: "close", listener: () => void): this;
on(event: "message", listener: (value: any) => void): this;
on(event: string | symbol, listener: (...args: any[]) => void): this;
once(event: "close", listener: () => void): this;
once(event: "message", listener: (value: any) => void): this;
once(event: string | symbol, listener: (...args: any[]) => void): this;
prependListener(event: "close", listener: () => void): this;
prependListener(event: "message", listener: (value: any) => void): this;
prependListener(event: string | symbol, listener: (...args: any[]) => void): this;
prependOnceListener(event: "close", listener: () => void): this;
prependOnceListener(event: "message", listener: (value: any) => void): this;
prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;
removeListener(event: "close", listener: () => void): this;
removeListener(event: "message", listener: (value: any) => void): this;
removeListener(event: string | symbol, listener: (...args: any[]) => void): this;
off(event: "close", listener: () => void): this;
off(event: "message", listener: (value: any) => void): this;
off(event: string | symbol, listener: (...args: any[]) => void): this;
}
MessagePort繼承自EventEmitter,它表示的是異步雙向通信channel的一端。這個channel就叫做MessageChannel,MessagePort通過MessageChannel來進(jìn)行通信。
我們可以通過MessagePort來傳輸結(jié)構(gòu)體數(shù)據(jù),內(nèi)存區(qū)域或者其他的MessagePorts。
從源代碼中,我們可以看到MessagePort中有兩個事件,close和message。
close事件將會在channel的中任何一端斷開連接的時候觸發(fā),而message事件將會在port.postMessage時候觸發(fā),下面我們看一個例子:
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
// Prints:
// foobar
// closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));
port1.postMessage('foobar');
port1.close();
port.on(‘message')實際上為message事件添加了一個listener,port還提供了addListener方法來手動添加listener。
port.on(‘message')會自動觸發(fā)port.start()方法,表示啟動一個port。
當(dāng)port有l(wèi)istener存在的時候,這表示port存在一個ref,當(dāng)存在ref的時候,程序是不會結(jié)束的。我們可以通過調(diào)用port.unref方法來取消這個ref。
接下來我們看一下怎么通過port來傳輸消息:
port.postMessage(value[, transferList])
postMessage可以接受兩個參數(shù),第一個參數(shù)是value,這是一個JavaScript對象。第二個參數(shù)是transferList。
先看一個傳遞一個參數(shù)的情況:
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log(message));
const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);
通常來說postMessage發(fā)送的對象都是value的拷貝,但是如果你指定了transferList,那么在transferList中的對象將會被transfer到channel的接受端,并且不再存在于發(fā)送端,就好像把對象傳送出去一樣。
transferList是一個list,list中的對象可以是ArrayBuffer, MessagePort 和 FileHandle。
如果value中包含SharedArrayBuffer對象,那么該對象不能被包含在transferList中。
看一個包含兩個參數(shù)的例子:
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log(message));
const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// post uint8Array的拷貝:
port2.postMessage(uint8Array);
port2.postMessage(uint8Array, [ uint8Array.buffer ]);
//port2.postMessage(uint8Array);
上面的例子將輸出:
Uint8Array(4) [ 1, 2, 3, 4 ]
Uint8Array(4) [ 1, 2, 3, 4 ]
第一個postMessage是拷貝,第二個postMessage是transfer Uint8Array底層的buffer。
如果我們再次調(diào)用port2.postMessage(uint8Array),我們會得到下面的錯誤:
DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.
buffer是TypedArray的底層存儲結(jié)構(gòu),如果buffer被transfer,那么之前的TypedArray將會變得不可用。
markAsUntransferable
要想避免這個問題,我們可以調(diào)用markAsUntransferable將buffer標(biāo)記為不可transferable. 我們看一個markAsUntransferable的例子:
const { MessageChannel, markAsUntransferable } = require('worker_threads');
const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);
markAsUntransferable(pooledBuffer);
const { port1 } = new MessageChannel();
port1.postMessage(typedArray1, [ typedArray1.buffer ]);
console.log(typedArray1);
console.log(typedArray2);
SHARE_ENV
SHARE_ENV是傳遞給worker構(gòu)造函數(shù)的一個env變量,通過設(shè)置這個變量,我們可以在主線程與工作線程進(jìn)行共享環(huán)境變量的讀寫。
const { Worker, SHARE_ENV } = require('worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
.on('exit', () => {
console.log(process.env.SET_IN_WORKER); // Prints 'foo'.
});
workerData
除了postMessage(),還可以通過在主線程中傳遞workerData給worker的構(gòu)造函數(shù),從而將主線程中的數(shù)據(jù)傳遞給worker:
const { Worker, isMainThread, workerData } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} else {
console.log(workerData); // Prints 'Hello, world!'.
}
worker類
先看一下worker的定義:
class Worker extends EventEmitter {
readonly stdin: Writable | null;
readonly stdout: Readable;
readonly stderr: Readable;
readonly threadId: number;
readonly resourceLimits?: ResourceLimits;
constructor(filename: string | URL, options?: WorkerOptions);
postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
ref(): void;
unref(): void;
terminate(): Promise<number>;
getHeapSnapshot(): Promise<Readable>;
addListener(event: "error", listener: (err: Error) => void): this;
addListener(event: "exit", listener: (exitCode: number) => void): this;
addListener(event: "message", listener: (value: any) => void): this;
addListener(event: "online", listener: () => void): this;
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
...
}
worker繼承自EventEmitter,并且包含了4個重要的事件:error,exit,message和online。
worker表示的是一個獨立的 JavaScript 執(zhí)行線程,我們可以通過傳遞filename或者URL來構(gòu)造worker。
每一個worker都有一對內(nèi)置的MessagePort,在worker創(chuàng)建的時候就會相互關(guān)聯(lián)。worker使用這對內(nèi)置的MessagePort來和父線程進(jìn)行通信。
通過parentPort.postMessage()發(fā)送的消息在主線程中將可以通過worker.on(‘message')接收。
主線程中通過worker.postMessage()發(fā)送的消息將可以在工作線程中通過parentPort.on(‘message')接收。
當(dāng)然,你也可以顯式的創(chuàng)建MessageChannel 對象,然后將MessagePort作為消息傳遞給其他線程,我們看一個例子:
const assert = require('assert');
const {
Worker, MessageChannel, MessagePort, isMainThread, parentPort
} = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
const subChannel = new MessageChannel();
worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
subChannel.port2.on('message', (value) => {
console.log('接收到:', value);
});
} else {
parentPort.once('message', (value) => {
assert(value.hereIsYourPort instanceof MessagePort);
value.hereIsYourPort.postMessage('工作線程正在發(fā)送此消息');
value.hereIsYourPort.close();
});
}
上面的例子中,我們借助了worker和parentPort本身的消息傳遞功能,傳遞了一個顯式的MessageChannel中的MessagePort。
然后又通過該MessagePort來進(jìn)行消息的分發(fā)。
receiveMessageOnPort
除了port的on(‘message')方法之外,我們還可以使用receiveMessageOnPort來手動接收消息:
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });
console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined
moveMessagePortToContext
先了解一下nodejs中的Context的概念,我們可以從vm中創(chuàng)建context,它是一個隔離的上下文環(huán)境,從而保證不同運行環(huán)境的安全性,我們看一個context的例子:
const vm = require('vm');
const x = 1;
const context = { x: 2 };
vm.createContext(context); // 上下文隔離化對象。
const code = 'x += 40; var y = 17;';
// `x` and `y` 是上下文中的全局變量。
// 最初,x 的值為 2,因為這是 context.x 的值。
vm.runInContext(code, context);
console.log(context.x); // 42
console.log(context.y); // 17
console.log(x); // 1; y 沒有定義。
在worker中,我們可以將一個MessagePort move到其他的context中。
worker.moveMessagePortToContext(port, contextifiedSandbox)
這個方法接收兩個參數(shù),第一個參數(shù)就是要move的MessagePort,第二個參數(shù)就是vm.createContext()創(chuàng)建的context對象。
worker_threads的線程池
上面我們提到了使用單個的worker thread,但是現(xiàn)在程序中一個線程往往是不夠的,我們需要創(chuàng)建一個線程池來維護(hù)worker thread對象。
nodejs提供了AsyncResource類,來作為對異步資源的擴(kuò)展。
AsyncResource類是async_hooks模塊中的。
下面我們看下怎么使用AsyncResource類來創(chuàng)建worker的線程池。
假設(shè)我們有一個task,使用來執(zhí)行兩個數(shù)相加,腳本名字叫做task_processor.js:
const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
下面是worker pool的實現(xiàn):
const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
module.exports = WorkerPool;
我們給worker創(chuàng)建了一個新的kTaskInfo屬性,并且將異步的callback封裝到WorkerPoolTaskInfo中,賦值給worker.kTaskInfo.
接下來我們就可以使用workerPool了:
const WorkerPool = require('./worker_pool.js');
const os = require('os');
const pool = new WorkerPool(os.cpus().length);
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
到此這篇關(guān)于nodejs中使用worker_threads來創(chuàng)建新的線程的方法的文章就介紹到這了,更多相關(guān)nodejs使用worker_threads創(chuàng)建線程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
手把手教你更優(yōu)雅的修改node_modules里的代碼
這篇文章主要給大家介紹了關(guān)于如何更優(yōu)雅的修改node_modules里的代碼的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2023-02-02
使用nodejs實現(xiàn)JSON文件自動轉(zhuǎn)Excel的工具(推薦)
這篇文章主要介紹了使用nodejs實現(xiàn),JSON文件自動轉(zhuǎn)Excel的工具,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-06-06

