diff options
author | 2023-08-02 18:12:12 -0700 | |
---|---|---|
committer | 2023-08-02 18:12:12 -0700 | |
commit | 207c7eb50955eb6f8f4b92fbf5a5955c23ec7e65 (patch) | |
tree | 4c0a1cdb53ec617e2c0ca13db161d6755752fd31 /src/js/node | |
parent | 505e77c2d0a5cafb0b2b321e30086de7e9944302 (diff) | |
download | bun-207c7eb50955eb6f8f4b92fbf5a5955c23ec7e65.tar.gz bun-207c7eb50955eb6f8f4b92fbf5a5955c23ec7e65.tar.zst bun-207c7eb50955eb6f8f4b92fbf5a5955c23ec7e65.zip |
Implement `node:worker_threads` (#3923)
* Start to implement `worker_threads`
* more
* more!!
* more
* Update bundle_v2.zig
* delete outdated tests
* `receiveMessageOnPort`
* props test and export default
* fix merge
* not implemented tests
* individual imports
* `receiveMessageOnPort` tests
---------
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: Dylan Conway <dylan.conway567@gmail.com>
Diffstat (limited to 'src/js/node')
-rw-r--r-- | src/js/node/worker_threads.ts | 297 |
1 files changed, 296 insertions, 1 deletions
diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts index 2db3a9446..a0b757708 100644 --- a/src/js/node/worker_threads.ts +++ b/src/js/node/worker_threads.ts @@ -1 +1,296 @@ -export default $lazy("masqueradesAsUndefined"); +const { MessageChannel, BroadcastChannel } = globalThis; + +function injectFakeEmitter(Class) { + function messageEventHandler(event: MessageEvent) { + return event.data; + } + + function errorEventHandler(event: ErrorEvent) { + return event.error; + } + + const wrappedListener = Symbol("wrappedListener"); + + function wrapped(run, listener) { + const callback = function (event) { + return listener(run(event)); + }; + listener[wrappedListener] = callback; + return callback; + } + + function functionForEventType(event, listener) { + switch (event) { + case "error": + case "messageerror": { + return wrapped(errorEventHandler, listener); + } + + default: { + return wrapped(messageEventHandler, listener); + } + } + } + + Class.prototype.on = function (event, listener) { + this.addEventListener(event, functionForEventType(event, listener)); + + return this; + }; + + Class.prototype.off = function (event, listener) { + if (listener) { + this.removeEventListener(event, listener[wrappedListener] || listener); + } else { + this.removeEventListener(event); + } + + return this; + }; + + Class.prototype.once = function (event, listener) { + this.addEventListener(event, functionForEventType(event, listener), { once: true }); + + return this; + }; + + function EventClass(eventName) { + if (eventName === "error" || eventName === "messageerror") { + return ErrorEvent; + } + + return MessageEvent; + } + + Class.prototype.emit = function (event, ...args) { + this.dispatchEvent(new (EventClass(event))(event, ...args)); + return this; + }; + + Class.prototype.prependListener = Class.prototype.on; + Class.prototype.prependOnceListener = Class.prototype.once; +} + +const _MessagePort = globalThis.MessagePort; +injectFakeEmitter(_MessagePort); + +const MessagePort = _MessagePort; + +const EventEmitter = require("node:events"); +const isMainThread = Bun.isMainThread; +let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads"); +let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort(); +let resourceLimits = {}; + +let workerData = _workerData; +let threadId = _threadId; +function receiveMessageOnPort(port: MessagePort) { + let res = _receiveMessageOnPort(port); + if (!res) return undefined; + return { + message: res, + }; +} + +function fakeParentPort() { + const fake = Object.create(MessagePort.prototype); + Object.defineProperty(fake, "onmessage", { + get() { + return self.onmessage; + }, + set(value) { + self.onmessage = value; + }, + }); + + Object.defineProperty(fake, "onmessageerror", { + get() { + return self.onmessageerror; + }, + set(value) {}, + }); + + Object.defineProperty(fake, "postMessage", { + value(...args: any[]) { + return self.postMessage(...args); + }, + }); + + Object.defineProperty(fake, "close", { + value() { + return process.exit(0); + }, + }); + + Object.defineProperty(fake, "start", { + value() {}, + }); + + Object.defineProperty(fake, "unref", { + value() {}, + }); + + Object.defineProperty(fake, "ref", { + value() {}, + }); + + Object.defineProperty(fake, "hasRef", { + value() { + return false; + }, + }); + + Object.defineProperty(fake, "setEncoding", { + value() {}, + }); + + Object.defineProperty(fake, "addEventListener", { + value: self.addEventListener.bind(self), + }); + + Object.defineProperty(fake, "removeEventListener", { + value: self.removeEventListener.bind(self), + }); + + return fake; +} + +function getEnvironmentData() { + return process.env; +} + +function setEnvironmentData(env: any) { + process.env = env; +} + +function markAsUntransferable() { + throw new Error("markAsUntransferable is not implemented"); +} + +function moveMessagePortToContext() { + throw new Error("moveMessagePortToContext is not implemented"); +} + +const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV"); + +const WebWorker = globalThis.Worker; +class Worker extends EventEmitter { + #worker: globalThis.Worker; + #performance; + #onExitPromise = undefined; + + constructor(filename: string, options: any = {}) { + super(); + + this.#worker = new WebWorker(filename, { + ...options, + }); + this.#worker.addEventListener("close", this.#onClose.bind(this)); + this.#worker.addEventListener("error", this.#onError.bind(this)); + this.#worker.addEventListener("message", this.#onMessage.bind(this)); + this.#worker.addEventListener("messageerror", this.#onMessageError.bind(this)); + this.#worker.addEventListener("open", this.#onOpen.bind(this)); + } + + ref() { + this.#worker.ref(); + } + + unref() { + this.#worker.unref(); + } + + get stdin() { + // TODO: + return null; + } + + get stdout() { + // TODO: + return null; + } + + get stderr() { + // TODO: + return null; + } + + get performance() { + return (this.#performance ??= { + eventLoopUtilization() { + return {}; + }, + }); + } + + terminate() { + if (this.#onExitPromise) { + return this.#onExitPromise; + } + + const { resolve, promise } = Promise.withResolvers(); + this.#worker.addEventListener( + "close", + event => { + // TODO: exit code + resolve(0); + }, + { once: true }, + ); + + return (this.#onExitPromise = promise); + } + + postMessage(...args: any[]) { + return this.#worker.postMessage(...args); + } + + #onClose() { + this.emit("exit"); + } + + #onError(event: ErrorEvent) { + // TODO: is this right? + this.emit("error", event); + } + + #onMessage(event: MessageEvent) { + // TODO: is this right? + this.emit("message", event.data); + } + + #onMessageError(event: Event) { + // TODO: is this right? + this.emit("messageerror", event.error || event); + } + + #onOpen() { + // TODO: is this right? + this.emit("online"); + } + + getHeapSnapshot() { + return {}; + } +} +export default { + Worker, + workerData, + parentPort, + resourceLimits, + isMainThread, + MessageChannel, + BroadcastChannel, + MessagePort, + getEnvironmentData, + setEnvironmentData, + getHeapSnapshot() { + return {}; + }, + markAsUntransferable, + moveMessagePortToContext, + receiveMessageOnPort, + SHARE_ENV, + + threadId, +}; |