diff options
Diffstat (limited to 'src/js/node/worker_threads.ts')
-rw-r--r-- | src/js/node/worker_threads.ts | 68 |
1 files changed, 43 insertions, 25 deletions
diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts index a0b757708..fd099a023 100644 --- a/src/js/node/worker_threads.ts +++ b/src/js/node/worker_threads.ts @@ -1,4 +1,26 @@ -const { MessageChannel, BroadcastChannel } = globalThis; +// import type { Readable, Writable } from "node:stream"; +// import type { WorkerOptions } from "node:worker_threads"; +declare const self: typeof globalThis; +type WebWorker = InstanceType<typeof globalThis.Worker>; + +const EventEmitter = require("node:events"); +const { throwNotImplemented } = require("../internal/shared"); + +const { MessageChannel, BroadcastChannel, Worker: WebWorker } = globalThis; +const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV"); + +const isMainThread = Bun.isMainThread; +let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads"); + +type NodeWorkerOptions = import("node:worker_threads").WorkerOptions; + +const emittedWarnings = new Set(); +function emitWarning(type, message) { + if (emittedWarnings.has(type)) return; + emittedWarnings.add(type); + // process.emitWarning(message); // our printing is bad + console.warn("[bun] Warning:", message); +} function injectFakeEmitter(Class) { function messageEventHandler(event: MessageEvent) { @@ -76,10 +98,6 @@ injectFakeEmitter(_MessagePort); const MessagePort = _MessagePort; -const EventEmitter = require("node:events"); -const isMainThread = Bun.isMainThread; -let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads"); -let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort(); let resourceLimits = {}; let workerData = _workerData; @@ -111,7 +129,7 @@ function fakeParentPort() { }); Object.defineProperty(fake, "postMessage", { - value(...args: any[]) { + value(...args: [any, any]) { return self.postMessage(...args); }, }); @@ -154,6 +172,7 @@ function fakeParentPort() { return fake; } +let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort(); function getEnvironmentData() { return process.env; @@ -164,27 +183,22 @@ function setEnvironmentData(env: any) { } function markAsUntransferable() { - throw new Error("markAsUntransferable is not implemented"); + throwNotImplemented("worker_threads.markAsUntransferable"); } function moveMessagePortToContext() { - throw new Error("moveMessagePortToContext is not implemented"); + throwNotImplemented("worker_threads.moveMessagePortToContext"); } -const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV"); - -const WebWorker = globalThis.Worker; class Worker extends EventEmitter { - #worker: globalThis.Worker; + #worker: WebWorker; #performance; #onExitPromise = undefined; - constructor(filename: string, options: any = {}) { + constructor(filename: string, options: NodeWorkerOptions = {}) { super(); - - this.#worker = new WebWorker(filename, { - ...options, - }); + // TODO: stdin, stdout, stderr, and other node specific options. + this.#worker = new WebWorker(filename, options); this.#worker.addEventListener("close", this.#onClose.bind(this)); this.#worker.addEventListener("error", this.#onError.bind(this)); this.#worker.addEventListener("message", this.#onMessage.bind(this)); @@ -218,7 +232,12 @@ class Worker extends EventEmitter { get performance() { return (this.#performance ??= { eventLoopUtilization() { - return {}; + emitWarning("performance", "worker_threads.Worker.performance is not implemented."); + return { + idle: 0, + active: 0, + utilization: 0, + }; }, }); } @@ -232,16 +251,16 @@ class Worker extends EventEmitter { this.#worker.addEventListener( "close", event => { - // TODO: exit code - resolve(0); + resolve(event.code); }, { once: true }, ); + this.#worker.terminate(); return (this.#onExitPromise = promise); } - postMessage(...args: any[]) { + postMessage(...args: [any, any]) { return this.#worker.postMessage(...args); } @@ -261,7 +280,7 @@ class Worker extends EventEmitter { #onMessageError(event: Event) { // TODO: is this right? - this.emit("messageerror", event.error || event); + this.emit("messageerror", (event as any).error || event); } #onOpen() { @@ -269,8 +288,8 @@ class Worker extends EventEmitter { this.emit("online"); } - getHeapSnapshot() { - return {}; + async getHeapSnapshot() { + throwNotImplemented("worker_threads.Worker.getHeapSnapshot"); } } export default { @@ -291,6 +310,5 @@ export default { moveMessagePortToContext, receiveMessageOnPort, SHARE_ENV, - threadId, }; |