diff options
23 files changed, 723 insertions, 185 deletions
diff --git a/packages/bun-types/globals.d.ts b/packages/bun-types/globals.d.ts index 5ab3ac6fa..d03fe0232 100644 --- a/packages/bun-types/globals.d.ts +++ b/packages/bun-types/globals.d.ts @@ -373,8 +373,170 @@ declare type MessageChannel = import("worker_threads").MessageChannel; declare var BroadcastChannel: typeof import("worker_threads").BroadcastChannel; declare type BroadcastChannel = import("worker_threads").BroadcastChannel; -declare var Worker: typeof import("worker_threads").Worker; -declare type Worker = typeof import("worker_threads").Worker; +interface AbstractWorkerEventMap { + error: ErrorEvent; +} + +interface WorkerEventMap extends AbstractWorkerEventMap { + message: MessageEvent; + messageerror: MessageEvent; + close: CloseEvent; +} + +interface AbstractWorker { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ServiceWorker/error_event) */ + onerror: ((this: AbstractWorker, ev: ErrorEvent) => any) | null; + addEventListener<K extends keyof AbstractWorkerEventMap>( + type: K, + listener: (this: AbstractWorker, ev: AbstractWorkerEventMap[K]) => any, + options?: boolean | AddEventListenerOptions, + ): void; + addEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ): void; + removeEventListener<K extends keyof AbstractWorkerEventMap>( + type: K, + listener: (this: AbstractWorker, ev: AbstractWorkerEventMap[K]) => any, + options?: boolean | EventListenerOptions, + ): void; + removeEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | EventListenerOptions, + ): void; +} + +/** + * Bun's Web Worker constructor supports some extra options on top of the API browsers have. + */ +interface WorkerOptions { + /** + * A string specifying an identifying name for the DedicatedWorkerGlobalScope representing the scope of + * the worker, which is mainly useful for debugging purposes. + */ + name?: string; + + /** + * Use less memory, but make the worker slower. + * + * Internally, this sets the heap size configuration in JavaScriptCore to be + * the small heap instead of the large heap. + */ + smol?: boolean; + + /** + * When `true`, the worker will keep the parent thread alive until the worker is terminated or `unref`'d. + * When `false`, the worker will not keep the parent thread alive. + * + * By default, this is `false`. + */ + ref?: boolean; + + /** + * In Bun, this does nothing. + */ + type?: string; + + /** + * List of arguments which would be stringified and appended to + * `Bun.argv` / `process.argv` in the worker. This is mostly similar to the `data` + * but the values will be available on the global `Bun.argv` as if they + * were passed as CLI options to the script. + */ + // argv?: any[] | undefined; + + /** If `true` and the first argument is a string, interpret the first argument to the constructor as a script that is executed once the worker is online. */ + // eval?: boolean | undefined; + + /** + * If set, specifies the initial value of process.env inside the Worker thread. As a special value, worker.SHARE_ENV may be used to specify that the parent thread and the child thread should share their environment variables; in that case, changes to one thread's process.env object affect the other thread as well. Default: process.env. + */ + env?: + | Record<string, string> + | typeof import("node:worker_threads")["SHARE_ENV"] + | undefined; + + /** + * In Bun, this does nothing. + */ + credentials?: string; + + /** + * @default true + */ + // trackUnmanagedFds?: boolean; + + // resourceLimits?: import("worker_threads").ResourceLimits; +} + +interface Worker extends EventTarget, AbstractWorker { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Worker/message_event) */ + onmessage: ((this: Worker, ev: MessageEvent) => any) | null; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Worker/messageerror_event) */ + onmessageerror: ((this: Worker, ev: MessageEvent) => any) | null; + /** + * Clones message and transmits it to worker's global environment. transfer can be passed as a list of objects that are to be transferred rather than cloned. + * + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/Worker/postMessage) + */ + postMessage(message: any, transfer: Transferable[]): void; + postMessage(message: any, options?: StructuredSerializeOptions): void; + /** + * Aborts worker's associated global environment. + * + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/Worker/terminate) + */ + terminate(): void; + addEventListener<K extends keyof WorkerEventMap>( + type: K, + listener: (this: Worker, ev: WorkerEventMap[K]) => any, + options?: boolean | AddEventListenerOptions, + ): void; + addEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ): void; + removeEventListener<K extends keyof WorkerEventMap>( + type: K, + listener: (this: Worker, ev: WorkerEventMap[K]) => any, + options?: boolean | EventListenerOptions, + ): void; + removeEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | EventListenerOptions, + ): void; + + /** + * Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker does _not_ let the program exit if it's the only active handle left (the default + * behavior). If the worker is `ref()`ed, calling `ref()` again has + * no effect. + * @since v10.5.0 + */ + ref(): void; + /** + * Calling `unref()` on a worker allows the thread to exit if this is the only + * active handle in the event system. If the worker is already `unref()`ed calling`unref()` again has no effect. + * @since v10.5.0 + */ + unref(): void; + + threadId: number; +} + +declare var Worker: { + prototype: Worker; + new (scriptURL: string | URL, options?: WorkerOptions): Worker; + /** + * This is the cloned value of the `data` property passed to `new Worker()` + * + * This is Bun's equivalent of `workerData` in Node.js. + */ + data: any; +}; interface EncodeIntoResult { /** diff --git a/packages/bun-types/perf_hooks.d.ts b/packages/bun-types/perf_hooks.d.ts index 792223e6b..587602a26 100644 --- a/packages/bun-types/perf_hooks.d.ts +++ b/packages/bun-types/perf_hooks.d.ts @@ -145,16 +145,19 @@ declare module "perf_hooks" { // */ // readonly v8Start: number; // } - // interface EventLoopUtilization { - // idle: number; - // active: number; - // utilization: number; - // } + interface EventLoopUtilization { + idle: number; + active: number; + utilization: number; + } // /** // * @param util1 The result of a previous call to eventLoopUtilization() // * @param util2 The result of a previous call to eventLoopUtilization() prior to util1 // */ - // type EventLoopUtilityFunction = (util1?: EventLoopUtilization, util2?: EventLoopUtilization) => EventLoopUtilization; + type EventLoopUtilityFunction = ( + util1?: EventLoopUtilization, + util2?: EventLoopUtilization, + ) => EventLoopUtilization; // interface MarkOptions { // /** // * Additional optional detail to include with the mark. diff --git a/packages/bun-types/tests/worker.test-d.ts b/packages/bun-types/tests/worker.test-d.ts index 8a2ec7883..dc457ccaa 100644 --- a/packages/bun-types/tests/worker.test-d.ts +++ b/packages/bun-types/tests/worker.test-d.ts @@ -1,18 +1,29 @@ -import { Worker } from "node:worker_threads"; +import { Worker as NodeWorker } from "node:worker_threads"; +import * as tsd from "tsd"; -const _workerthread = new Worker("./worker.js"); -_workerthread; -const worker = new Worker("./worker.ts"); -worker.addEventListener("message", (event: MessageEvent) => { - console.log("Message from worker:", event.data); +const webWorker = new Worker("./worker.js"); + +webWorker.addEventListener("message", event => { + tsd.expectType<MessageEvent>(event); +}); +webWorker.addEventListener("error", event => { + tsd.expectType<ErrorEvent>(event); +}); +webWorker.addEventListener("messageerror", event => { + tsd.expectType<MessageEvent>(event); +}); + +const nodeWorker = new NodeWorker("./worker.ts"); +nodeWorker.on("message", event => { + console.log("Message from worker:", event); }); -worker.postMessage("Hello from main thread!"); +nodeWorker.postMessage("Hello from main thread!"); const workerURL = new URL("worker.ts", import.meta.url).href; const _worker2 = new Worker(workerURL); -worker.postMessage("hello"); -worker.onmessage = event => { +nodeWorker.postMessage("hello"); +webWorker.onmessage = event => { console.log(event.data); }; @@ -20,15 +31,20 @@ worker.onmessage = event => { postMessage({ hello: "world" }); // On the main thread -worker.postMessage({ hello: "world" }); +nodeWorker.postMessage({ hello: "world" }); // ...some time later -worker.terminate(); +nodeWorker.terminate(); // Bun.pathToFileURL const _worker3 = new Worker(new URL("worker.ts", import.meta.url).href, { ref: true, smol: true, + credentials: "", + name: "a name", + env: { + envValue: "hello", + }, }); -export { worker, _worker2, _worker3 }; +export { nodeWorker as worker, _worker2, _worker3 }; diff --git a/packages/bun-types/worker_threads.d.ts b/packages/bun-types/worker_threads.d.ts index 49f844ab0..ddc59fec5 100644 --- a/packages/bun-types/worker_threads.d.ts +++ b/packages/bun-types/worker_threads.d.ts @@ -53,9 +53,10 @@ */ declare module "worker_threads" { // import { Blob } from "node:buffer"; + import { Readable, Writable } from "node:stream"; import { Context } from "node:vm"; import { EventEmitter } from "node:events"; - // import { EventLoopUtilityFunction } from "node:perf_hooks"; + import { EventLoopUtilityFunction } from "node:perf_hooks"; // import { FileHandle } from "node:fs/promises"; // import { Readable, Writable } from "node:stream"; import { URL } from "node:url"; @@ -67,9 +68,9 @@ declare module "worker_threads" { const threadId: number; const workerData: any; - // interface WorkerPerformance { - // eventLoopUtilization: EventLoopUtilityFunction; - // } + interface WorkerPerformance { + eventLoopUtilization: EventLoopUtilityFunction; + } type TransferListItem = | ArrayBuffer | MessagePort @@ -251,28 +252,73 @@ declare module "worker_threads" { } interface WorkerOptions { /** + * A string specifying an identifying name for the DedicatedWorkerGlobalScope representing the scope of + * the worker, which is mainly useful for debugging purposes. + */ + name?: string; + + /** + * Use less memory, but make the worker slower. + * + * Internally, this sets the heap size configuration in JavaScriptCore to be + * the small heap instead of the large heap. + */ + smol?: boolean; + + /** + * When `true`, the worker will keep the parent thread alive until the worker is terminated or `unref`'d. + * When `false`, the worker will not keep the parent thread alive. + * + * By default, this is `false`. + */ + ref?: boolean; + + /** + * In Bun, this does nothing. + */ + type?: string; + + /** * List of arguments which would be stringified and appended to - * `process.argv` in the worker. This is mostly similar to the `workerData` - * but the values will be available on the global `process.argv` as if they + * `Bun.argv` / `process.argv` in the worker. This is mostly similar to the `data` + * but the values will be available on the global `Bun.argv` as if they * were passed as CLI options to the script. */ - argv?: any[] | undefined; - env?: Record<string, string> | typeof SHARE_ENV | undefined; - eval?: boolean | undefined; - workerData?: any; - stdin?: boolean | undefined; - stdout?: boolean | undefined; - stderr?: boolean | undefined; - execArgv?: string[] | undefined; - resourceLimits?: ResourceLimits | undefined; + // argv?: any[] | undefined; + + /** If `true` and the first argument is a string, interpret the first argument to the constructor as a script that is executed once the worker is online. */ + // eval?: boolean | undefined; + /** - * Additional data to send in the first worker message. + * If set, specifies the initial value of process.env inside the Worker thread. As a special value, worker.SHARE_ENV may be used to specify that the parent thread and the child thread should share their environment variables; in that case, changes to one thread's process.env object affect the other thread as well. Default: process.env. */ - transferList?: TransferListItem[] | undefined; + env?: + | Record<string, string> + | typeof import("node:worker_threads")["SHARE_ENV"] + | undefined; + + /** + * In Bun, this does nothing. + */ + credentials?: string; + /** * @default true */ - trackUnmanagedFds?: boolean | undefined; + // trackUnmanagedFds?: boolean; + + workerData?: any; + + /** + * An array of objects that are transferred rather than cloned when being passed between threads. + */ + transferList?: import("worker_threads").TransferListItem[]; + + // resourceLimits?: import("worker_threads").ResourceLimits; + // stdin?: boolean | undefined; + // stdout?: boolean | undefined; + // stderr?: boolean | undefined; + // execArgv?: string[] | undefined; } interface ResourceLimits { /** @@ -356,76 +402,164 @@ declare module "worker_threads" { * ``` * @since v10.5.0 */ - interface Worker extends EventTarget { - onerror: ((this: Worker, ev: ErrorEvent) => any) | null; - onmessage: ((this: Worker, ev: MessageEvent) => any) | null; - onmessageerror: ((this: Worker, ev: MessageEvent) => any) | null; - - addEventListener<K extends keyof WorkerEventMap>( - type: K, - listener: (this: Worker, ev: WorkerEventMap[K]) => any, - options?: boolean | AddEventListenerOptions, - ): void; - - removeEventListener<K extends keyof WorkerEventMap>( - type: K, - listener: (this: Worker, ev: WorkerEventMap[K]) => any, - options?: boolean | EventListenerOptions, - ): void; - - terminate(): void; - - postMessage(message: any, transfer?: Transferable[]): void; - + class Worker extends EventEmitter { /** - * Keep the process alive until the worker is terminated or `unref`'d + * If `stdin: true` was passed to the `Worker` constructor, this is a + * writable stream. The data written to this stream will be made available in + * the worker thread as `process.stdin`. + * @since v10.5.0 */ - ref(): void; + readonly stdin: Writable | null; /** - * Undo a previous `ref()` + * This is a readable stream which contains data written to `process.stdout` inside the worker thread. If `stdout: true` was not passed to the `Worker` constructor, then data is piped to the + * parent thread's `process.stdout` stream. + * @since v10.5.0 */ - unref(): void; - + readonly stdout: Readable; /** - * Unique per-process thread ID. Main thread ID is always `0`. + * This is a readable stream which contains data written to `process.stderr` inside the worker thread. If `stderr: true` was not passed to the `Worker` constructor, then data is piped to the + * parent thread's `process.stderr` stream. + * @since v10.5.0 + */ + readonly stderr: Readable; + /** + * An integer identifier for the referenced thread. Inside the worker thread, + * it is available as `require('node:worker_threads').threadId`. + * This value is unique for each `Worker` instance inside a single process. + * @since v10.5.0 */ readonly threadId: number; - } - var Worker: { - prototype: Worker; - new (stringUrl: string | URL, options?: WorkerOptions): Worker; - }; - interface WorkerOptions { - name?: string; - /** - * Use less memory, but make the worker slower. + * Provides the set of JS engine resource constraints for this Worker thread. + * If the `resourceLimits` option was passed to the `Worker` constructor, + * this matches its values. * - * Internally, this sets the heap size configuration in JavaScriptCore to be - * the small heap instead of the large heap. + * If the worker has stopped, the return value is an empty object. + * @since v13.2.0, v12.16.0 */ - smol?: boolean; - + readonly resourceLimits?: ResourceLimits | undefined; /** - * When `true`, the worker will keep the parent thread alive until the worker is terminated or `unref`'d. - * When `false`, the worker will not keep the parent thread alive. - * - * By default, this is `false`. + * An object that can be used to query performance information from a worker + * instance. Similar to `perf_hooks.performance`. + * @since v15.1.0, v14.17.0, v12.22.0 */ - ref?: boolean; - + readonly performance: WorkerPerformance; /** - * Does nothing in Bun + * @param filename The path to the Worker’s main script or module. + * Must be either an absolute path or a relative path (i.e. relative to the current working directory) starting with ./ or ../, + * or a WHATWG URL object using file: protocol. If options.eval is true, this is a string containing JavaScript code rather than a path. */ - type?: string; - } - - interface WorkerEventMap { - message: MessageEvent; - messageerror: MessageEvent; - error: ErrorEvent; - open: Event; - close: Event; + constructor(filename: string | URL, options?: WorkerOptions); + /** + * Send a message to the worker that is received via `require('node:worker_threads').parentPort.on('message')`. + * See `port.postMessage()` for more details. + * @since v10.5.0 + */ + postMessage( + value: any, + transferList?: ReadonlyArray<TransferListItem>, + ): void; + /** + * Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker does _not_ let the program exit if it's the only active handle left (the default + * behavior). If the worker is `ref()`ed, calling `ref()` again has + * no effect. + * @since v10.5.0 + */ + ref(): void; + /** + * Calling `unref()` on a worker allows the thread to exit if this is the only + * active handle in the event system. If the worker is already `unref()`ed calling`unref()` again has no effect. + * @since v10.5.0 + */ + unref(): void; + /** + * Stop all JavaScript execution in the worker thread as soon as possible. + * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted. + * @since v10.5.0 + */ + terminate(): Promise<number>; + /** + * Returns a readable stream for a V8 snapshot of the current state of the Worker. + * See `v8.getHeapSnapshot()` for more details. + * + * If the Worker thread is no longer running, which may occur before the `'exit' event` is emitted, the returned `Promise` is rejected + * immediately with an `ERR_WORKER_NOT_RUNNING` error. + * @since v13.9.0, v12.17.0 + * @return A promise for a Readable Stream containing a V8 heap snapshot + */ + getHeapSnapshot(): Promise<Readable>; + addListener(event: "error", listener: (err: Error) => void): this; + addListener(event: "exit", listener: (exitCode: number) => void): this; + addListener(event: "message", listener: (value: any) => void): this; + addListener(event: "messageerror", listener: (error: Error) => void): this; + addListener(event: "online", listener: () => void): this; + addListener( + event: string | symbol, + listener: (...args: any[]) => void, + ): this; + emit(event: "error", err: Error): boolean; + emit(event: "exit", exitCode: number): boolean; + emit(event: "message", value: any): boolean; + emit(event: "messageerror", error: Error): boolean; + emit(event: "online"): boolean; + emit(event: string | symbol, ...args: any[]): boolean; + on(event: "error", listener: (err: Error) => void): this; + on(event: "exit", listener: (exitCode: number) => void): this; + on(event: "message", listener: (value: any) => void): this; + on(event: "messageerror", listener: (error: Error) => void): this; + on(event: "online", listener: () => void): this; + on(event: string | symbol, listener: (...args: any[]) => void): this; + once(event: "error", listener: (err: Error) => void): this; + once(event: "exit", listener: (exitCode: number) => void): this; + once(event: "message", listener: (value: any) => void): this; + once(event: "messageerror", listener: (error: Error) => void): this; + once(event: "online", listener: () => void): this; + once(event: string | symbol, listener: (...args: any[]) => void): this; + prependListener(event: "error", listener: (err: Error) => void): this; + prependListener(event: "exit", listener: (exitCode: number) => void): this; + prependListener(event: "message", listener: (value: any) => void): this; + prependListener( + event: "messageerror", + listener: (error: Error) => void, + ): this; + prependListener(event: "online", listener: () => void): this; + prependListener( + event: string | symbol, + listener: (...args: any[]) => void, + ): this; + prependOnceListener(event: "error", listener: (err: Error) => void): this; + prependOnceListener( + event: "exit", + listener: (exitCode: number) => void, + ): this; + prependOnceListener(event: "message", listener: (value: any) => void): this; + prependOnceListener( + event: "messageerror", + listener: (error: Error) => void, + ): this; + prependOnceListener(event: "online", listener: () => void): this; + prependOnceListener( + event: string | symbol, + listener: (...args: any[]) => void, + ): this; + removeListener(event: "error", listener: (err: Error) => void): this; + removeListener(event: "exit", listener: (exitCode: number) => void): this; + removeListener(event: "message", listener: (value: any) => void): this; + removeListener( + event: "messageerror", + listener: (error: Error) => void, + ): this; + removeListener(event: "online", listener: () => void): this; + removeListener( + event: string | symbol, + listener: (...args: any[]) => void, + ): this; + off(event: "error", listener: (err: Error) => void): this; + off(event: "exit", listener: (exitCode: number) => void): this; + off(event: "message", listener: (value: any) => void): this; + off(event: "messageerror", listener: (error: Error) => void): this; + off(event: "online", listener: () => void): this; + off(event: string | symbol, listener: (...args: any[]) => void): this; } interface BroadcastChannelEventMap { diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 3f4a8d044..3de7d3daa 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -429,7 +429,7 @@ static String computeErrorInfo(JSC::VM& vm, Vector<StackFrame>& stackTrace, unsi } extern "C" JSC__JSGlobalObject* Zig__GlobalObject__create(JSClassRef* globalObjectClass, int count, - void* console_client, int32_t executionContextId, bool miniMode) + void* console_client, int32_t executionContextId, bool miniMode, void* worker_ptr) { auto heapSize = miniMode ? JSC::HeapType::Small : JSC::HeapType::Large; @@ -448,6 +448,20 @@ extern "C" JSC__JSGlobalObject* Zig__GlobalObject__create(JSClassRef* globalObje vm, Zig::GlobalObject::createStructure(vm, JSC::JSGlobalObject::create(vm, JSC::JSGlobalObject::createStructure(vm, JSC::jsNull())), JSC::jsNull()), static_cast<ScriptExecutionContextIdentifier>(executionContextId)); + + if (auto* worker = static_cast<WebCore::Worker*>(worker_ptr)) { + auto& options = worker->options(); + if (options.bun.env) { + auto map = *options.bun.env; + auto size = map.size(); + auto env = JSC::constructEmptyObject(globalObject, globalObject->objectPrototype(), size > 63 ? 63 : size); + for (auto k : map) { + env->putDirect(vm, JSC::Identifier::fromString(vm, WTFMove(k.key)), JSC::jsString(vm, WTFMove(k.value))); + } + map.clear(); + globalObject->m_processEnvObject.set(vm, globalObject, env); + } + } } else { globalObject = Zig::GlobalObject::create( vm, diff --git a/src/bun.js/bindings/ZigGlobalObject.h b/src/bun.js/bindings/ZigGlobalObject.h index 9d84e5214..1d82cd0f3 100644 --- a/src/bun.js/bindings/ZigGlobalObject.h +++ b/src/bun.js/bindings/ZigGlobalObject.h @@ -457,6 +457,8 @@ public: Bun::JSMockModule mockModule; + LazyProperty<JSGlobalObject, JSObject> m_processEnvObject; + #include "ZigGeneratedClasses+lazyStructureHeader.h" private: @@ -519,7 +521,6 @@ private: LazyProperty<JSGlobalObject, JSObject> m_JSHTTPSResponseControllerPrototype; LazyProperty<JSGlobalObject, JSObject> m_navigatorObject; LazyProperty<JSGlobalObject, JSObject> m_performanceObject; - LazyProperty<JSGlobalObject, JSObject> m_processEnvObject; LazyProperty<JSGlobalObject, JSObject> m_processObject; LazyProperty<JSGlobalObject, JSObject> m_subtleCryptoObject; LazyProperty<JSGlobalObject, Structure> m_JSHTTPResponseController; diff --git a/src/bun.js/bindings/exports.zig b/src/bun.js/bindings/exports.zig index 6d57798fd..265ada40a 100644 --- a/src/bun.js/bindings/exports.zig +++ b/src/bun.js/bindings/exports.zig @@ -46,8 +46,9 @@ pub const ZigGlobalObject = extern struct { console: *anyopaque, context_id: i32, mini_mode: bool, + worker_ptr: ?*anyopaque, ) *JSGlobalObject { - var global = shim.cppFn("create", .{ class_ref, count, console, context_id, mini_mode }); + var global = shim.cppFn("create", .{ class_ref, count, console, context_id, mini_mode, worker_ptr }); Backtrace.reloadHandlers() catch unreachable; return global; } diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index 9a6d1b72a..63ae6c3a4 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -579,7 +579,7 @@ ZIG_DECL JSC__JSValue Crypto__timingSafeEqual__slowpath(JSC__JSGlobalObject* arg #pragma mark - Zig::GlobalObject -CPP_DECL JSC__JSGlobalObject* Zig__GlobalObject__create(JSClassRef* arg0, int32_t arg1, void* arg2, int32_t arg3, bool arg4); +CPP_DECL JSC__JSGlobalObject* Zig__GlobalObject__create(JSClassRef* arg0, int32_t arg1, void* arg2, int32_t arg3, bool arg4, void* arg5); CPP_DECL void* Zig__GlobalObject__getModuleRegistryMap(JSC__JSGlobalObject* arg0); CPP_DECL bool Zig__GlobalObject__resetModuleRegistryMap(JSC__JSGlobalObject* arg0, void* arg1); diff --git a/src/bun.js/bindings/headers.zig b/src/bun.js/bindings/headers.zig index d39793c07..2b25c0f5b 100644 --- a/src/bun.js/bindings/headers.zig +++ b/src/bun.js/bindings/headers.zig @@ -351,7 +351,7 @@ pub extern fn Reader__intptr__put(arg0: *bindings.JSGlobalObject, JSValue1: JSC_ pub extern fn Crypto__getRandomValues__put(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) void; pub extern fn Crypto__randomUUID__put(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) void; pub extern fn Crypto__timingSafeEqual__put(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) void; -pub extern fn Zig__GlobalObject__create(arg0: [*c]JSClassRef, arg1: i32, arg2: ?*anyopaque, arg3: i32, arg4: bool) *bindings.JSGlobalObject; +pub extern fn Zig__GlobalObject__create(arg0: [*c]JSClassRef, arg1: i32, arg2: ?*anyopaque, arg3: i32, arg4: bool, arg5: ?*anyopaque) *bindings.JSGlobalObject; pub extern fn Zig__GlobalObject__getModuleRegistryMap(arg0: *bindings.JSGlobalObject) ?*anyopaque; pub extern fn Zig__GlobalObject__resetModuleRegistryMap(arg0: *bindings.JSGlobalObject, arg1: ?*anyopaque) bool; pub extern fn Bun__Path__create(arg0: *bindings.JSGlobalObject, arg1: bool) JSC__JSValue; diff --git a/src/bun.js/bindings/webcore/JSWorker.cpp b/src/bun.js/bindings/webcore/JSWorker.cpp index 434249068..1b65cba47 100644 --- a/src/bun.js/bindings/webcore/JSWorker.cpp +++ b/src/bun.js/bindings/webcore/JSWorker.cpp @@ -128,7 +128,7 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor::const EnsureStillAliveScope argument1 = callFrame->argument(1); auto options = WorkerOptions {}; - options.bun.unref = true; + options.bun.unref = false; if (JSObject* optionsObject = JSC::jsDynamicCast<JSC::JSObject*>(argument1.value())) { if (auto nameValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "name"_s))) { @@ -138,61 +138,93 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor::const } } - if (auto* bunObject = optionsObject) { - if (auto miniModeValue = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "smol"_s))) { - options.bun.mini = miniModeValue.toBoolean(lexicalGlobalObject); - RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); - } + if (auto miniModeValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "smol"_s))) { + options.bun.mini = miniModeValue.toBoolean(lexicalGlobalObject); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + } - if (auto ref = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "ref"_s))) { - options.bun.unref = !ref.toBoolean(lexicalGlobalObject); - RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); - } + if (auto ref = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "ref"_s))) { + options.bun.unref = !ref.toBoolean(lexicalGlobalObject); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + } - auto workerData = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "workerData"_s)); - if (!workerData) { - workerData = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "data"_s)); - } + auto workerData = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "workerData"_s)); + if (!workerData) { + workerData = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "data"_s)); + } - if (workerData) { - Vector<RefPtr<MessagePort>> ports; - Vector<JSC::Strong<JSC::JSObject>> transferList; - - if (JSValue transferListValue = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "transferList"_s))) { - if (transferListValue.isObject()) { - JSC::JSObject* transferListObject = transferListValue.getObject(); - if (auto* transferListArray = jsDynamicCast<JSC::JSArray*>(transferListObject)) { - for (unsigned i = 0; i < transferListArray->length(); i++) { - JSC::JSValue transferListValue = transferListArray->get(lexicalGlobalObject, i); - if (transferListValue.isObject()) { - JSC::JSObject* transferListObject = transferListValue.getObject(); - transferList.append(JSC::Strong<JSC::JSObject>(vm, transferListObject)); - } + if (workerData) { + Vector<RefPtr<MessagePort>> ports; + Vector<JSC::Strong<JSC::JSObject>> transferList; + + if (JSValue transferListValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "transferList"_s))) { + if (transferListValue.isObject()) { + JSC::JSObject* transferListObject = transferListValue.getObject(); + if (auto* transferListArray = jsDynamicCast<JSC::JSArray*>(transferListObject)) { + for (unsigned i = 0; i < transferListArray->length(); i++) { + JSC::JSValue transferListValue = transferListArray->get(lexicalGlobalObject, i); + if (transferListValue.isObject()) { + JSC::JSObject* transferListObject = transferListValue.getObject(); + transferList.append(JSC::Strong<JSC::JSObject>(vm, transferListObject)); } } } } + } + + ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*lexicalGlobalObject, workerData, WTFMove(transferList), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage); + if (serialized.hasException()) { + WebCore::propagateException(*lexicalGlobalObject, throwScope, serialized.releaseException()); + return encodedJSValue(); + } - ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*lexicalGlobalObject, workerData, WTFMove(transferList), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage); - if (serialized.hasException()) { - WebCore::propagateException(*lexicalGlobalObject, throwScope, serialized.releaseException()); + Vector<TransferredMessagePort> transferredPorts; + + if (!ports.isEmpty()) { + auto disentangleResult = MessagePort::disentanglePorts(WTFMove(ports)); + if (disentangleResult.hasException()) { + WebCore::propagateException(*lexicalGlobalObject, throwScope, disentangleResult.releaseException()); return encodedJSValue(); } + transferredPorts = disentangleResult.releaseReturnValue(); + } - Vector<TransferredMessagePort> transferredPorts; + options.bun.data = WTFMove(serialized.releaseReturnValue()); + options.bun.dataMessagePorts = WTFMove(transferredPorts); + } - if (!ports.isEmpty()) { - auto disentangleResult = MessagePort::disentanglePorts(WTFMove(ports)); - if (disentangleResult.hasException()) { - WebCore::propagateException(*lexicalGlobalObject, throwScope, disentangleResult.releaseException()); - return encodedJSValue(); - } - transferredPorts = disentangleResult.releaseReturnValue(); - } + auto* globalObject = jsCast<Zig::GlobalObject*>(lexicalGlobalObject); + auto envValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "env"_s)); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + JSObject* envObject = nullptr; + + if (envValue && envValue.isCell()) { + envObject = jsDynamicCast<JSC::JSObject*>(envValue); + } else if (globalObject->m_processEnvObject.isInitialized()) { + envObject = globalObject->processEnvObject(); + } - options.bun.data = WTFMove(serialized.releaseReturnValue()); - options.bun.dataMessagePorts = WTFMove(transferredPorts); + if (envObject) { + if (!envObject->staticPropertiesReified()) { + envObject->reifyAllStaticProperties(globalObject); + RETURN_IF_EXCEPTION(throwScope, {}); } + + JSC::PropertyNameArray keys(vm, JSC::PropertyNameMode::Strings, JSC::PrivateSymbolMode::Exclude); + envObject->methodTable()->getOwnPropertyNames(envObject, lexicalGlobalObject, keys, JSC::DontEnumPropertiesMode::Exclude); + RETURN_IF_EXCEPTION(throwScope, {}); + + HashMap<String, String> env; + + for (const auto& key : keys) { + JSValue value = envObject->get(lexicalGlobalObject, key); + RETURN_IF_EXCEPTION(throwScope, {}); + String str = value.toWTFString(lexicalGlobalObject).isolatedCopy(); + RETURN_IF_EXCEPTION(throwScope, {}); + env.add(key.impl()->isolatedCopy(), str); + } + + options.bun.env = std::make_unique<HashMap<String, String>>(WTFMove(env)); } } @@ -390,6 +422,7 @@ JSC_DEFINE_CUSTOM_SETTER(setJSWorker_onerror, (JSGlobalObject * lexicalGlobalObj static inline JSC::EncodedJSValue jsWorkerPrototypeFunction_terminateBody(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWorker>::ClassParameter castedThis) { + printf("bruh \n"); auto& vm = JSC::getVM(lexicalGlobalObject); auto throwScope = DECLARE_THROW_SCOPE(vm); UNUSED_PARAM(throwScope); diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp index ea64811e0..f2694b59f 100644 --- a/src/bun.js/bindings/webcore/Worker.cpp +++ b/src/bun.js/bindings/webcore/Worker.cpp @@ -61,7 +61,7 @@ #include "MessageEvent.h" #include <JavaScriptCore/HashMapImplInlines.h> #include "BunWorkerGlobalScope.h" - +#include "CloseEvent.h" namespace WebCore { WTF_MAKE_ISO_ALLOCATED_IMPL(Worker); @@ -210,6 +210,7 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m void Worker::terminate() { + printf("terminate\n"); // m_contextProxy.terminateWorkerGlobalScope(); m_wasTerminated = true; WebWorker__terminate(impl_); @@ -339,19 +340,19 @@ void Worker::dispatchError(WTF::String message) protectedThis->dispatchEvent(event); }); } -void Worker::dispatchExit() +void Worker::dispatchExit(int32_t exitCode) { auto* ctx = scriptExecutionContext(); if (!ctx) return; - ScriptExecutionContext::postTaskTo(ctx->identifier(), [protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void { + ScriptExecutionContext::postTaskTo(ctx->identifier(), [exitCode, protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void { protectedThis->m_isOnline = false; protectedThis->m_isClosing = true; protectedThis->setKeepAlive(false); if (protectedThis->hasEventListeners(eventNames().closeEvent)) { - auto event = Event::create(eventNames().closeEvent, Event::CanBubble::No, Event::IsCancelable::No); + auto event = CloseEvent::create(exitCode == 0, static_cast<unsigned short>(exitCode), exitCode == 0 ? "Worker terminated normally"_s : "Worker exited abnormally"_s); protectedThis->dispatchEvent(event); } }); @@ -377,6 +378,8 @@ void Worker::forEachWorker(const Function<Function<void(ScriptExecutionContext&) extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* worker, int32_t exitCode) { + worker->dispatchExit(exitCode); + if (globalObject) { auto* ctx = globalObject->scriptExecutionContext(); if (ctx) { @@ -398,8 +401,6 @@ extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* vm.notifyNeedTermination(); vm.deferredWorkTimer->doWork(vm); } - - worker->dispatchExit(); } extern "C" void WebWorker__dispatchOnline(Worker* worker, Zig::GlobalObject* globalObject) { diff --git a/src/bun.js/bindings/webcore/Worker.h b/src/bun.js/bindings/webcore/Worker.h index e0f328dbb..a296fa4a8 100644 --- a/src/bun.js/bindings/webcore/Worker.h +++ b/src/bun.js/bindings/webcore/Worker.h @@ -94,7 +94,7 @@ public: void drainEvents(); void dispatchOnline(Zig::GlobalObject* workerGlobalObject); void dispatchError(WTF::String message); - void dispatchExit(); + void dispatchExit(int32_t exitCode); ScriptExecutionContext* scriptExecutionContext() const final { return ContextDestructionObserver::scriptExecutionContext(); } ScriptExecutionContextIdentifier clientIdentifier() const { return m_clientIdentifier; } WorkerOptions& options() { return m_options; } diff --git a/src/bun.js/bindings/webcore/WorkerOptions.h b/src/bun.js/bindings/webcore/WorkerOptions.h index 0c307d18e..79eabef98 100644 --- a/src/bun.js/bindings/webcore/WorkerOptions.h +++ b/src/bun.js/bindings/webcore/WorkerOptions.h @@ -12,6 +12,7 @@ struct BunOptions { bool unref { false }; RefPtr<SerializedScriptValue> data; Vector<TransferredMessagePort> dataMessagePorts; + std::unique_ptr<HashMap<String, String>> env { nullptr }; }; struct WorkerOptions { diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 1c8d91d52..741affa6a 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -910,6 +910,7 @@ pub const VirtualMachine = struct { vm.console, -1, false, + null, ); vm.regular_event_loop.global = vm.global; vm.regular_event_loop.virtual_machine = vm; @@ -1014,6 +1015,7 @@ pub const VirtualMachine = struct { vm.console, -1, smol, + null, ); vm.regular_event_loop.global = vm.global; vm.regular_event_loop.virtual_machine = vm; @@ -1118,6 +1120,7 @@ pub const VirtualMachine = struct { vm.console, @as(i32, @intCast(worker.execution_context_id)), worker.mini, + worker.cpp_worker, ); vm.regular_event_loop.global = vm.global; vm.regular_event_loop.virtual_machine = vm; diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index 73f42b089..bbe708d18 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -341,22 +341,20 @@ pub const WebWorker = struct { if (this.requested_terminate) { return; } - - this.allowed_to_exit = false; _ = this.requestTerminate(); } pub fn setRef(this: *WebWorker, value: bool) callconv(.C) void { - if (this.requested_terminate and value) { + if (this.requested_terminate and !value) { this.parent_poll_ref.unref(this.parent); return; } this.allowed_to_exit = !value; - if (value) { - this.parent_poll_ref.ref(this.parent); - } else { + if (this.allowed_to_exit) { this.parent_poll_ref.unref(this.parent); + } else { + this.parent_poll_ref.ref(this.parent); } if (this.vm) |vm| { @@ -404,6 +402,7 @@ pub const WebWorker = struct { } fn requestTerminate(this: *WebWorker) bool { + this.setRef(false); var vm = this.vm orelse { this.requested_terminate = true; return false; diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts index a0b757708..fd099a023 100644 --- a/src/js/node/worker_threads.ts +++ b/src/js/node/worker_threads.ts @@ -1,4 +1,26 @@ -const { MessageChannel, BroadcastChannel } = globalThis; +// import type { Readable, Writable } from "node:stream"; +// import type { WorkerOptions } from "node:worker_threads"; +declare const self: typeof globalThis; +type WebWorker = InstanceType<typeof globalThis.Worker>; + +const EventEmitter = require("node:events"); +const { throwNotImplemented } = require("../internal/shared"); + +const { MessageChannel, BroadcastChannel, Worker: WebWorker } = globalThis; +const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV"); + +const isMainThread = Bun.isMainThread; +let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads"); + +type NodeWorkerOptions = import("node:worker_threads").WorkerOptions; + +const emittedWarnings = new Set(); +function emitWarning(type, message) { + if (emittedWarnings.has(type)) return; + emittedWarnings.add(type); + // process.emitWarning(message); // our printing is bad + console.warn("[bun] Warning:", message); +} function injectFakeEmitter(Class) { function messageEventHandler(event: MessageEvent) { @@ -76,10 +98,6 @@ injectFakeEmitter(_MessagePort); const MessagePort = _MessagePort; -const EventEmitter = require("node:events"); -const isMainThread = Bun.isMainThread; -let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads"); -let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort(); let resourceLimits = {}; let workerData = _workerData; @@ -111,7 +129,7 @@ function fakeParentPort() { }); Object.defineProperty(fake, "postMessage", { - value(...args: any[]) { + value(...args: [any, any]) { return self.postMessage(...args); }, }); @@ -154,6 +172,7 @@ function fakeParentPort() { return fake; } +let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort(); function getEnvironmentData() { return process.env; @@ -164,27 +183,22 @@ function setEnvironmentData(env: any) { } function markAsUntransferable() { - throw new Error("markAsUntransferable is not implemented"); + throwNotImplemented("worker_threads.markAsUntransferable"); } function moveMessagePortToContext() { - throw new Error("moveMessagePortToContext is not implemented"); + throwNotImplemented("worker_threads.moveMessagePortToContext"); } -const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV"); - -const WebWorker = globalThis.Worker; class Worker extends EventEmitter { - #worker: globalThis.Worker; + #worker: WebWorker; #performance; #onExitPromise = undefined; - constructor(filename: string, options: any = {}) { + constructor(filename: string, options: NodeWorkerOptions = {}) { super(); - - this.#worker = new WebWorker(filename, { - ...options, - }); + // TODO: stdin, stdout, stderr, and other node specific options. + this.#worker = new WebWorker(filename, options); this.#worker.addEventListener("close", this.#onClose.bind(this)); this.#worker.addEventListener("error", this.#onError.bind(this)); this.#worker.addEventListener("message", this.#onMessage.bind(this)); @@ -218,7 +232,12 @@ class Worker extends EventEmitter { get performance() { return (this.#performance ??= { eventLoopUtilization() { - return {}; + emitWarning("performance", "worker_threads.Worker.performance is not implemented."); + return { + idle: 0, + active: 0, + utilization: 0, + }; }, }); } @@ -232,16 +251,16 @@ class Worker extends EventEmitter { this.#worker.addEventListener( "close", event => { - // TODO: exit code - resolve(0); + resolve(event.code); }, { once: true }, ); + this.#worker.terminate(); return (this.#onExitPromise = promise); } - postMessage(...args: any[]) { + postMessage(...args: [any, any]) { return this.#worker.postMessage(...args); } @@ -261,7 +280,7 @@ class Worker extends EventEmitter { #onMessageError(event: Event) { // TODO: is this right? - this.emit("messageerror", event.error || event); + this.emit("messageerror", (event as any).error || event); } #onOpen() { @@ -269,8 +288,8 @@ class Worker extends EventEmitter { this.emit("online"); } - getHeapSnapshot() { - return {}; + async getHeapSnapshot() { + throwNotImplemented("worker_threads.Worker.getHeapSnapshot"); } } export default { @@ -291,6 +310,5 @@ export default { moveMessagePortToContext, receiveMessageOnPort, SHARE_ENV, - threadId, }; diff --git a/src/js/out/InternalModuleRegistryConstants.h b/src/js/out/InternalModuleRegistryConstants.h index 0ad5c4c97..7969c4cc4 100644 --- a/src/js/out/InternalModuleRegistryConstants.h +++ b/src/js/out/InternalModuleRegistryConstants.h @@ -189,7 +189,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const // // -static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; +static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; // // @@ -414,7 +414,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const // // -static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; +static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; // // @@ -640,7 +640,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const // // -static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; +static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; // // diff --git a/src/js/private.d.ts b/src/js/private.d.ts index 276ac136c..2667df631 100644 --- a/src/js/private.d.ts +++ b/src/js/private.d.ts @@ -193,6 +193,12 @@ interface BunLazyModules { set: typeof import("./builtins/AsyncContext").setAsyncContext; cleanupLater: () => void; }; + "worker_threads": [ + // + workerData: any, + threadId: number, + _receiveMessageOnPort: (port: MessagePort) => any, + ]; // ReadableStream related [1]: any; diff --git a/test/js/node/worker_threads/worker_threads.test.ts b/test/js/node/worker_threads/worker_threads.test.ts index 3bee1a50e..ae88add17 100644 --- a/test/js/node/worker_threads/worker_threads.test.ts +++ b/test/js/node/worker_threads/worker_threads.test.ts @@ -50,12 +50,14 @@ test("all properties are present", () => { expect(Worker).toBeDefined(); expect(() => { + // @ts-expect-error no args wt.markAsUntransferable(); - }).toThrow("not implemented"); + }).toThrow("not yet implemented"); expect(() => { + // @ts-expect-error no args wt.moveMessagePortToContext(); - }).toThrow("not implemented"); + }).toThrow("not yet implemented"); }); test("receiveMessageOnPort works across threads", () => { diff --git a/test/js/web/many-messages-event-loop.js b/test/js/web/many-messages-event-loop.js new file mode 100644 index 000000000..2eaba2568 --- /dev/null +++ b/test/js/web/many-messages-event-loop.js @@ -0,0 +1,11 @@ +const worker = new Worker(new URL("worker-fixture-many-messages.js", import.meta.url).href); + +worker.postMessage("initial message"); +worker.addEventListener("message", ({ data }) => { + if (data.done) { + console.log("done"); + worker.terminate(); + } else { + worker.postMessage({ i: data.i + 1 }); + } +}); diff --git a/test/js/web/worker-fixture-env.js b/test/js/web/worker-fixture-env.js new file mode 100644 index 000000000..203ed3141 --- /dev/null +++ b/test/js/web/worker-fixture-env.js @@ -0,0 +1,12 @@ +import * as worker_threads from "worker_threads"; + +if (worker_threads.isMainThread) throw new Error("worker_threads.isMainThread is wrong"); + +Bun.inspect(process.env); + +onmessage = ({}) => { + postMessage({ + env: process.env, + hello: process.env.hello, + }); +}; diff --git a/test/js/web/worker-fixture-many-messages.js b/test/js/web/worker-fixture-many-messages.js new file mode 100644 index 000000000..7a8f1d910 --- /dev/null +++ b/test/js/web/worker-fixture-many-messages.js @@ -0,0 +1,12 @@ +addEventListener("message", e => { + const data = e.data; + // console.log("worker", data); + + if (data === "initial message") { + postMessage({ i: 0 }); + } else if (data.i > 50) { + postMessage({ done: true }); + } else { + postMessage({ i: data.i + 1 }); + } +}); diff --git a/test/js/web/worker.test.ts b/test/js/web/worker.test.ts index 87dcf0911..1babfbcc3 100644 --- a/test/js/web/worker.test.ts +++ b/test/js/web/worker.test.ts @@ -1,4 +1,6 @@ import { expect, test } from "bun:test"; +import { bunEnv, bunExe } from "harness"; +import path from "path"; test("worker", done => { const worker = new Worker(new URL("worker-fixture.js", import.meta.url).href, { @@ -10,8 +12,115 @@ test("worker", done => { done(e.error); }; worker.onmessage = e => { - expect(e.data).toEqual("initial message"); + try { + expect(e.data).toEqual("initial message"); + } catch (e) { + done(e); + } finally { + worker.terminate(); + done(); + } worker.terminate(); done(); }; }); + +test("worker-env", done => { + const worker = new Worker(new URL("worker-fixture-env.js", import.meta.url).href, { + env: { + hello: "world", + another_key: 123 as any, + }, + }); + worker.postMessage("hello"); + worker.onerror = e => { + done(e.error); + }; + worker.onmessage = e => { + try { + expect(e.data).toEqual({ + env: { + hello: "world", + another_key: "123", + }, + hello: "world", + }); + } catch (e) { + done(e); + } finally { + worker.terminate(); + done(); + } + }; +}); + +test("worker-env with a lot of properties", done => { + const obj: any = {}; + + for (let i = 0; i < 1000; i++) { + obj["prop " + i] = Math.random().toString(); + } + + const worker = new Worker(new URL("worker-fixture-env.js", import.meta.url).href, { + env: obj, + }); + worker.postMessage("hello"); + worker.onerror = e => { + done(e.error); + }; + worker.onmessage = e => { + try { + expect(e.data).toEqual({ + env: obj, + hello: undefined, + }); + } catch (e) { + done(e); + } finally { + worker.terminate(); + done(); + } + }; +}); + +test("sending 50 messages should just work", done => { + const worker = new Worker(new URL("worker-fixture-many-messages.js", import.meta.url).href, {}); + + worker.postMessage("initial message"); + worker.addEventListener("message", ({ data }) => { + if (data.done) { + worker.terminate(); + done(); + } else { + worker.postMessage({ i: data.i + 1 }); + } + }); +}); + +test("worker by default will not close the event loop", done => { + const x = Bun.spawn({ + cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.js")], + env: bunEnv, + stdio: ["inherit", "pipe", "inherit"], + }); + + const timer = setTimeout(() => { + x.kill(); + done(new Error("timeout")); + }, 1000); + + x.exited.then(async code => { + clearTimeout(timer); + if (code !== 0) { + done(new Error("exited with non-zero code")); + } else { + const text = await new Response(x.stdout).text(); + if (text.includes("done")) { + console.log({ text }); + done(new Error("event loop killed early")); + } else { + done(); + } + } + }); +}); |