aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar dave caruso <me@paperdave.net> 2023-08-07 23:58:38 -0700
committerGravatar GitHub <noreply@github.com> 2023-08-07 23:58:38 -0700
commit5497accbdb14da9e361175ad1cd074731b7f8eeb (patch)
tree994424356f9059b1171f410a9689f2d00bf89f6b
parent182e600eb79655e85b3f0371bc46fc4de8e70094 (diff)
downloadbun-5497accbdb14da9e361175ad1cd074731b7f8eeb.tar.gz
bun-5497accbdb14da9e361175ad1cd074731b7f8eeb.tar.zst
bun-5497accbdb14da9e361175ad1cd074731b7f8eeb.zip
Add `env` option for `node:worker_threads` (#4052)
* almost works * env stuff * test fixes * wtfmove * ok * ok * ref by default * it now does the ref stuff by default * cool
-rw-r--r--packages/bun-types/globals.d.ts166
-rw-r--r--packages/bun-types/perf_hooks.d.ts15
-rw-r--r--packages/bun-types/tests/worker.test-d.ts40
-rw-r--r--packages/bun-types/worker_threads.d.ts282
-rw-r--r--src/bun.js/bindings/ZigGlobalObject.cpp16
-rw-r--r--src/bun.js/bindings/ZigGlobalObject.h3
-rw-r--r--src/bun.js/bindings/exports.zig3
-rw-r--r--src/bun.js/bindings/headers.h2
-rw-r--r--src/bun.js/bindings/headers.zig2
-rw-r--r--src/bun.js/bindings/webcore/JSWorker.cpp117
-rw-r--r--src/bun.js/bindings/webcore/Worker.cpp13
-rw-r--r--src/bun.js/bindings/webcore/Worker.h2
-rw-r--r--src/bun.js/bindings/webcore/WorkerOptions.h1
-rw-r--r--src/bun.js/javascript.zig3
-rw-r--r--src/bun.js/web_worker.zig11
-rw-r--r--src/js/node/worker_threads.ts68
-rw-r--r--src/js/out/InternalModuleRegistryConstants.h6
-rw-r--r--src/js/private.d.ts6
-rw-r--r--test/js/node/worker_threads/worker_threads.test.ts6
-rw-r--r--test/js/web/many-messages-event-loop.js11
-rw-r--r--test/js/web/worker-fixture-env.js12
-rw-r--r--test/js/web/worker-fixture-many-messages.js12
-rw-r--r--test/js/web/worker.test.ts111
23 files changed, 723 insertions, 185 deletions
diff --git a/packages/bun-types/globals.d.ts b/packages/bun-types/globals.d.ts
index 5ab3ac6fa..d03fe0232 100644
--- a/packages/bun-types/globals.d.ts
+++ b/packages/bun-types/globals.d.ts
@@ -373,8 +373,170 @@ declare type MessageChannel = import("worker_threads").MessageChannel;
declare var BroadcastChannel: typeof import("worker_threads").BroadcastChannel;
declare type BroadcastChannel = import("worker_threads").BroadcastChannel;
-declare var Worker: typeof import("worker_threads").Worker;
-declare type Worker = typeof import("worker_threads").Worker;
+interface AbstractWorkerEventMap {
+ error: ErrorEvent;
+}
+
+interface WorkerEventMap extends AbstractWorkerEventMap {
+ message: MessageEvent;
+ messageerror: MessageEvent;
+ close: CloseEvent;
+}
+
+interface AbstractWorker {
+ /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ServiceWorker/error_event) */
+ onerror: ((this: AbstractWorker, ev: ErrorEvent) => any) | null;
+ addEventListener<K extends keyof AbstractWorkerEventMap>(
+ type: K,
+ listener: (this: AbstractWorker, ev: AbstractWorkerEventMap[K]) => any,
+ options?: boolean | AddEventListenerOptions,
+ ): void;
+ addEventListener(
+ type: string,
+ listener: EventListenerOrEventListenerObject,
+ options?: boolean | AddEventListenerOptions,
+ ): void;
+ removeEventListener<K extends keyof AbstractWorkerEventMap>(
+ type: K,
+ listener: (this: AbstractWorker, ev: AbstractWorkerEventMap[K]) => any,
+ options?: boolean | EventListenerOptions,
+ ): void;
+ removeEventListener(
+ type: string,
+ listener: EventListenerOrEventListenerObject,
+ options?: boolean | EventListenerOptions,
+ ): void;
+}
+
+/**
+ * Bun's Web Worker constructor supports some extra options on top of the API browsers have.
+ */
+interface WorkerOptions {
+ /**
+ * A string specifying an identifying name for the DedicatedWorkerGlobalScope representing the scope of
+ * the worker, which is mainly useful for debugging purposes.
+ */
+ name?: string;
+
+ /**
+ * Use less memory, but make the worker slower.
+ *
+ * Internally, this sets the heap size configuration in JavaScriptCore to be
+ * the small heap instead of the large heap.
+ */
+ smol?: boolean;
+
+ /**
+ * When `true`, the worker will keep the parent thread alive until the worker is terminated or `unref`'d.
+ * When `false`, the worker will not keep the parent thread alive.
+ *
+ * By default, this is `false`.
+ */
+ ref?: boolean;
+
+ /**
+ * In Bun, this does nothing.
+ */
+ type?: string;
+
+ /**
+ * List of arguments which would be stringified and appended to
+ * `Bun.argv` / `process.argv` in the worker. This is mostly similar to the `data`
+ * but the values will be available on the global `Bun.argv` as if they
+ * were passed as CLI options to the script.
+ */
+ // argv?: any[] | undefined;
+
+ /** If `true` and the first argument is a string, interpret the first argument to the constructor as a script that is executed once the worker is online. */
+ // eval?: boolean | undefined;
+
+ /**
+ * If set, specifies the initial value of process.env inside the Worker thread. As a special value, worker.SHARE_ENV may be used to specify that the parent thread and the child thread should share their environment variables; in that case, changes to one thread's process.env object affect the other thread as well. Default: process.env.
+ */
+ env?:
+ | Record<string, string>
+ | typeof import("node:worker_threads")["SHARE_ENV"]
+ | undefined;
+
+ /**
+ * In Bun, this does nothing.
+ */
+ credentials?: string;
+
+ /**
+ * @default true
+ */
+ // trackUnmanagedFds?: boolean;
+
+ // resourceLimits?: import("worker_threads").ResourceLimits;
+}
+
+interface Worker extends EventTarget, AbstractWorker {
+ /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Worker/message_event) */
+ onmessage: ((this: Worker, ev: MessageEvent) => any) | null;
+ /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Worker/messageerror_event) */
+ onmessageerror: ((this: Worker, ev: MessageEvent) => any) | null;
+ /**
+ * Clones message and transmits it to worker's global environment. transfer can be passed as a list of objects that are to be transferred rather than cloned.
+ *
+ * [MDN Reference](https://developer.mozilla.org/docs/Web/API/Worker/postMessage)
+ */
+ postMessage(message: any, transfer: Transferable[]): void;
+ postMessage(message: any, options?: StructuredSerializeOptions): void;
+ /**
+ * Aborts worker's associated global environment.
+ *
+ * [MDN Reference](https://developer.mozilla.org/docs/Web/API/Worker/terminate)
+ */
+ terminate(): void;
+ addEventListener<K extends keyof WorkerEventMap>(
+ type: K,
+ listener: (this: Worker, ev: WorkerEventMap[K]) => any,
+ options?: boolean | AddEventListenerOptions,
+ ): void;
+ addEventListener(
+ type: string,
+ listener: EventListenerOrEventListenerObject,
+ options?: boolean | AddEventListenerOptions,
+ ): void;
+ removeEventListener<K extends keyof WorkerEventMap>(
+ type: K,
+ listener: (this: Worker, ev: WorkerEventMap[K]) => any,
+ options?: boolean | EventListenerOptions,
+ ): void;
+ removeEventListener(
+ type: string,
+ listener: EventListenerOrEventListenerObject,
+ options?: boolean | EventListenerOptions,
+ ): void;
+
+ /**
+ * Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker does _not_ let the program exit if it's the only active handle left (the default
+ * behavior). If the worker is `ref()`ed, calling `ref()` again has
+ * no effect.
+ * @since v10.5.0
+ */
+ ref(): void;
+ /**
+ * Calling `unref()` on a worker allows the thread to exit if this is the only
+ * active handle in the event system. If the worker is already `unref()`ed calling`unref()` again has no effect.
+ * @since v10.5.0
+ */
+ unref(): void;
+
+ threadId: number;
+}
+
+declare var Worker: {
+ prototype: Worker;
+ new (scriptURL: string | URL, options?: WorkerOptions): Worker;
+ /**
+ * This is the cloned value of the `data` property passed to `new Worker()`
+ *
+ * This is Bun's equivalent of `workerData` in Node.js.
+ */
+ data: any;
+};
interface EncodeIntoResult {
/**
diff --git a/packages/bun-types/perf_hooks.d.ts b/packages/bun-types/perf_hooks.d.ts
index 792223e6b..587602a26 100644
--- a/packages/bun-types/perf_hooks.d.ts
+++ b/packages/bun-types/perf_hooks.d.ts
@@ -145,16 +145,19 @@ declare module "perf_hooks" {
// */
// readonly v8Start: number;
// }
- // interface EventLoopUtilization {
- // idle: number;
- // active: number;
- // utilization: number;
- // }
+ interface EventLoopUtilization {
+ idle: number;
+ active: number;
+ utilization: number;
+ }
// /**
// * @param util1 The result of a previous call to eventLoopUtilization()
// * @param util2 The result of a previous call to eventLoopUtilization() prior to util1
// */
- // type EventLoopUtilityFunction = (util1?: EventLoopUtilization, util2?: EventLoopUtilization) => EventLoopUtilization;
+ type EventLoopUtilityFunction = (
+ util1?: EventLoopUtilization,
+ util2?: EventLoopUtilization,
+ ) => EventLoopUtilization;
// interface MarkOptions {
// /**
// * Additional optional detail to include with the mark.
diff --git a/packages/bun-types/tests/worker.test-d.ts b/packages/bun-types/tests/worker.test-d.ts
index 8a2ec7883..dc457ccaa 100644
--- a/packages/bun-types/tests/worker.test-d.ts
+++ b/packages/bun-types/tests/worker.test-d.ts
@@ -1,18 +1,29 @@
-import { Worker } from "node:worker_threads";
+import { Worker as NodeWorker } from "node:worker_threads";
+import * as tsd from "tsd";
-const _workerthread = new Worker("./worker.js");
-_workerthread;
-const worker = new Worker("./worker.ts");
-worker.addEventListener("message", (event: MessageEvent) => {
- console.log("Message from worker:", event.data);
+const webWorker = new Worker("./worker.js");
+
+webWorker.addEventListener("message", event => {
+ tsd.expectType<MessageEvent>(event);
+});
+webWorker.addEventListener("error", event => {
+ tsd.expectType<ErrorEvent>(event);
+});
+webWorker.addEventListener("messageerror", event => {
+ tsd.expectType<MessageEvent>(event);
+});
+
+const nodeWorker = new NodeWorker("./worker.ts");
+nodeWorker.on("message", event => {
+ console.log("Message from worker:", event);
});
-worker.postMessage("Hello from main thread!");
+nodeWorker.postMessage("Hello from main thread!");
const workerURL = new URL("worker.ts", import.meta.url).href;
const _worker2 = new Worker(workerURL);
-worker.postMessage("hello");
-worker.onmessage = event => {
+nodeWorker.postMessage("hello");
+webWorker.onmessage = event => {
console.log(event.data);
};
@@ -20,15 +31,20 @@ worker.onmessage = event => {
postMessage({ hello: "world" });
// On the main thread
-worker.postMessage({ hello: "world" });
+nodeWorker.postMessage({ hello: "world" });
// ...some time later
-worker.terminate();
+nodeWorker.terminate();
// Bun.pathToFileURL
const _worker3 = new Worker(new URL("worker.ts", import.meta.url).href, {
ref: true,
smol: true,
+ credentials: "",
+ name: "a name",
+ env: {
+ envValue: "hello",
+ },
});
-export { worker, _worker2, _worker3 };
+export { nodeWorker as worker, _worker2, _worker3 };
diff --git a/packages/bun-types/worker_threads.d.ts b/packages/bun-types/worker_threads.d.ts
index 49f844ab0..ddc59fec5 100644
--- a/packages/bun-types/worker_threads.d.ts
+++ b/packages/bun-types/worker_threads.d.ts
@@ -53,9 +53,10 @@
*/
declare module "worker_threads" {
// import { Blob } from "node:buffer";
+ import { Readable, Writable } from "node:stream";
import { Context } from "node:vm";
import { EventEmitter } from "node:events";
- // import { EventLoopUtilityFunction } from "node:perf_hooks";
+ import { EventLoopUtilityFunction } from "node:perf_hooks";
// import { FileHandle } from "node:fs/promises";
// import { Readable, Writable } from "node:stream";
import { URL } from "node:url";
@@ -67,9 +68,9 @@ declare module "worker_threads" {
const threadId: number;
const workerData: any;
- // interface WorkerPerformance {
- // eventLoopUtilization: EventLoopUtilityFunction;
- // }
+ interface WorkerPerformance {
+ eventLoopUtilization: EventLoopUtilityFunction;
+ }
type TransferListItem =
| ArrayBuffer
| MessagePort
@@ -251,28 +252,73 @@ declare module "worker_threads" {
}
interface WorkerOptions {
/**
+ * A string specifying an identifying name for the DedicatedWorkerGlobalScope representing the scope of
+ * the worker, which is mainly useful for debugging purposes.
+ */
+ name?: string;
+
+ /**
+ * Use less memory, but make the worker slower.
+ *
+ * Internally, this sets the heap size configuration in JavaScriptCore to be
+ * the small heap instead of the large heap.
+ */
+ smol?: boolean;
+
+ /**
+ * When `true`, the worker will keep the parent thread alive until the worker is terminated or `unref`'d.
+ * When `false`, the worker will not keep the parent thread alive.
+ *
+ * By default, this is `false`.
+ */
+ ref?: boolean;
+
+ /**
+ * In Bun, this does nothing.
+ */
+ type?: string;
+
+ /**
* List of arguments which would be stringified and appended to
- * `process.argv` in the worker. This is mostly similar to the `workerData`
- * but the values will be available on the global `process.argv` as if they
+ * `Bun.argv` / `process.argv` in the worker. This is mostly similar to the `data`
+ * but the values will be available on the global `Bun.argv` as if they
* were passed as CLI options to the script.
*/
- argv?: any[] | undefined;
- env?: Record<string, string> | typeof SHARE_ENV | undefined;
- eval?: boolean | undefined;
- workerData?: any;
- stdin?: boolean | undefined;
- stdout?: boolean | undefined;
- stderr?: boolean | undefined;
- execArgv?: string[] | undefined;
- resourceLimits?: ResourceLimits | undefined;
+ // argv?: any[] | undefined;
+
+ /** If `true` and the first argument is a string, interpret the first argument to the constructor as a script that is executed once the worker is online. */
+ // eval?: boolean | undefined;
+
/**
- * Additional data to send in the first worker message.
+ * If set, specifies the initial value of process.env inside the Worker thread. As a special value, worker.SHARE_ENV may be used to specify that the parent thread and the child thread should share their environment variables; in that case, changes to one thread's process.env object affect the other thread as well. Default: process.env.
*/
- transferList?: TransferListItem[] | undefined;
+ env?:
+ | Record<string, string>
+ | typeof import("node:worker_threads")["SHARE_ENV"]
+ | undefined;
+
+ /**
+ * In Bun, this does nothing.
+ */
+ credentials?: string;
+
/**
* @default true
*/
- trackUnmanagedFds?: boolean | undefined;
+ // trackUnmanagedFds?: boolean;
+
+ workerData?: any;
+
+ /**
+ * An array of objects that are transferred rather than cloned when being passed between threads.
+ */
+ transferList?: import("worker_threads").TransferListItem[];
+
+ // resourceLimits?: import("worker_threads").ResourceLimits;
+ // stdin?: boolean | undefined;
+ // stdout?: boolean | undefined;
+ // stderr?: boolean | undefined;
+ // execArgv?: string[] | undefined;
}
interface ResourceLimits {
/**
@@ -356,76 +402,164 @@ declare module "worker_threads" {
* ```
* @since v10.5.0
*/
- interface Worker extends EventTarget {
- onerror: ((this: Worker, ev: ErrorEvent) => any) | null;
- onmessage: ((this: Worker, ev: MessageEvent) => any) | null;
- onmessageerror: ((this: Worker, ev: MessageEvent) => any) | null;
-
- addEventListener<K extends keyof WorkerEventMap>(
- type: K,
- listener: (this: Worker, ev: WorkerEventMap[K]) => any,
- options?: boolean | AddEventListenerOptions,
- ): void;
-
- removeEventListener<K extends keyof WorkerEventMap>(
- type: K,
- listener: (this: Worker, ev: WorkerEventMap[K]) => any,
- options?: boolean | EventListenerOptions,
- ): void;
-
- terminate(): void;
-
- postMessage(message: any, transfer?: Transferable[]): void;
-
+ class Worker extends EventEmitter {
/**
- * Keep the process alive until the worker is terminated or `unref`'d
+ * If `stdin: true` was passed to the `Worker` constructor, this is a
+ * writable stream. The data written to this stream will be made available in
+ * the worker thread as `process.stdin`.
+ * @since v10.5.0
*/
- ref(): void;
+ readonly stdin: Writable | null;
/**
- * Undo a previous `ref()`
+ * This is a readable stream which contains data written to `process.stdout` inside the worker thread. If `stdout: true` was not passed to the `Worker` constructor, then data is piped to the
+ * parent thread's `process.stdout` stream.
+ * @since v10.5.0
*/
- unref(): void;
-
+ readonly stdout: Readable;
/**
- * Unique per-process thread ID. Main thread ID is always `0`.
+ * This is a readable stream which contains data written to `process.stderr` inside the worker thread. If `stderr: true` was not passed to the `Worker` constructor, then data is piped to the
+ * parent thread's `process.stderr` stream.
+ * @since v10.5.0
+ */
+ readonly stderr: Readable;
+ /**
+ * An integer identifier for the referenced thread. Inside the worker thread,
+ * it is available as `require('node:worker_threads').threadId`.
+ * This value is unique for each `Worker` instance inside a single process.
+ * @since v10.5.0
*/
readonly threadId: number;
- }
- var Worker: {
- prototype: Worker;
- new (stringUrl: string | URL, options?: WorkerOptions): Worker;
- };
- interface WorkerOptions {
- name?: string;
-
/**
- * Use less memory, but make the worker slower.
+ * Provides the set of JS engine resource constraints for this Worker thread.
+ * If the `resourceLimits` option was passed to the `Worker` constructor,
+ * this matches its values.
*
- * Internally, this sets the heap size configuration in JavaScriptCore to be
- * the small heap instead of the large heap.
+ * If the worker has stopped, the return value is an empty object.
+ * @since v13.2.0, v12.16.0
*/
- smol?: boolean;
-
+ readonly resourceLimits?: ResourceLimits | undefined;
/**
- * When `true`, the worker will keep the parent thread alive until the worker is terminated or `unref`'d.
- * When `false`, the worker will not keep the parent thread alive.
- *
- * By default, this is `false`.
+ * An object that can be used to query performance information from a worker
+ * instance. Similar to `perf_hooks.performance`.
+ * @since v15.1.0, v14.17.0, v12.22.0
*/
- ref?: boolean;
-
+ readonly performance: WorkerPerformance;
/**
- * Does nothing in Bun
+ * @param filename The path to the Worker’s main script or module.
+ * Must be either an absolute path or a relative path (i.e. relative to the current working directory) starting with ./ or ../,
+ * or a WHATWG URL object using file: protocol. If options.eval is true, this is a string containing JavaScript code rather than a path.
*/
- type?: string;
- }
-
- interface WorkerEventMap {
- message: MessageEvent;
- messageerror: MessageEvent;
- error: ErrorEvent;
- open: Event;
- close: Event;
+ constructor(filename: string | URL, options?: WorkerOptions);
+ /**
+ * Send a message to the worker that is received via `require('node:worker_threads').parentPort.on('message')`.
+ * See `port.postMessage()` for more details.
+ * @since v10.5.0
+ */
+ postMessage(
+ value: any,
+ transferList?: ReadonlyArray<TransferListItem>,
+ ): void;
+ /**
+ * Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker does _not_ let the program exit if it's the only active handle left (the default
+ * behavior). If the worker is `ref()`ed, calling `ref()` again has
+ * no effect.
+ * @since v10.5.0
+ */
+ ref(): void;
+ /**
+ * Calling `unref()` on a worker allows the thread to exit if this is the only
+ * active handle in the event system. If the worker is already `unref()`ed calling`unref()` again has no effect.
+ * @since v10.5.0
+ */
+ unref(): void;
+ /**
+ * Stop all JavaScript execution in the worker thread as soon as possible.
+ * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted.
+ * @since v10.5.0
+ */
+ terminate(): Promise<number>;
+ /**
+ * Returns a readable stream for a V8 snapshot of the current state of the Worker.
+ * See `v8.getHeapSnapshot()` for more details.
+ *
+ * If the Worker thread is no longer running, which may occur before the `'exit' event` is emitted, the returned `Promise` is rejected
+ * immediately with an `ERR_WORKER_NOT_RUNNING` error.
+ * @since v13.9.0, v12.17.0
+ * @return A promise for a Readable Stream containing a V8 heap snapshot
+ */
+ getHeapSnapshot(): Promise<Readable>;
+ addListener(event: "error", listener: (err: Error) => void): this;
+ addListener(event: "exit", listener: (exitCode: number) => void): this;
+ addListener(event: "message", listener: (value: any) => void): this;
+ addListener(event: "messageerror", listener: (error: Error) => void): this;
+ addListener(event: "online", listener: () => void): this;
+ addListener(
+ event: string | symbol,
+ listener: (...args: any[]) => void,
+ ): this;
+ emit(event: "error", err: Error): boolean;
+ emit(event: "exit", exitCode: number): boolean;
+ emit(event: "message", value: any): boolean;
+ emit(event: "messageerror", error: Error): boolean;
+ emit(event: "online"): boolean;
+ emit(event: string | symbol, ...args: any[]): boolean;
+ on(event: "error", listener: (err: Error) => void): this;
+ on(event: "exit", listener: (exitCode: number) => void): this;
+ on(event: "message", listener: (value: any) => void): this;
+ on(event: "messageerror", listener: (error: Error) => void): this;
+ on(event: "online", listener: () => void): this;
+ on(event: string | symbol, listener: (...args: any[]) => void): this;
+ once(event: "error", listener: (err: Error) => void): this;
+ once(event: "exit", listener: (exitCode: number) => void): this;
+ once(event: "message", listener: (value: any) => void): this;
+ once(event: "messageerror", listener: (error: Error) => void): this;
+ once(event: "online", listener: () => void): this;
+ once(event: string | symbol, listener: (...args: any[]) => void): this;
+ prependListener(event: "error", listener: (err: Error) => void): this;
+ prependListener(event: "exit", listener: (exitCode: number) => void): this;
+ prependListener(event: "message", listener: (value: any) => void): this;
+ prependListener(
+ event: "messageerror",
+ listener: (error: Error) => void,
+ ): this;
+ prependListener(event: "online", listener: () => void): this;
+ prependListener(
+ event: string | symbol,
+ listener: (...args: any[]) => void,
+ ): this;
+ prependOnceListener(event: "error", listener: (err: Error) => void): this;
+ prependOnceListener(
+ event: "exit",
+ listener: (exitCode: number) => void,
+ ): this;
+ prependOnceListener(event: "message", listener: (value: any) => void): this;
+ prependOnceListener(
+ event: "messageerror",
+ listener: (error: Error) => void,
+ ): this;
+ prependOnceListener(event: "online", listener: () => void): this;
+ prependOnceListener(
+ event: string | symbol,
+ listener: (...args: any[]) => void,
+ ): this;
+ removeListener(event: "error", listener: (err: Error) => void): this;
+ removeListener(event: "exit", listener: (exitCode: number) => void): this;
+ removeListener(event: "message", listener: (value: any) => void): this;
+ removeListener(
+ event: "messageerror",
+ listener: (error: Error) => void,
+ ): this;
+ removeListener(event: "online", listener: () => void): this;
+ removeListener(
+ event: string | symbol,
+ listener: (...args: any[]) => void,
+ ): this;
+ off(event: "error", listener: (err: Error) => void): this;
+ off(event: "exit", listener: (exitCode: number) => void): this;
+ off(event: "message", listener: (value: any) => void): this;
+ off(event: "messageerror", listener: (error: Error) => void): this;
+ off(event: "online", listener: () => void): this;
+ off(event: string | symbol, listener: (...args: any[]) => void): this;
}
interface BroadcastChannelEventMap {
diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp
index 3f4a8d044..3de7d3daa 100644
--- a/src/bun.js/bindings/ZigGlobalObject.cpp
+++ b/src/bun.js/bindings/ZigGlobalObject.cpp
@@ -429,7 +429,7 @@ static String computeErrorInfo(JSC::VM& vm, Vector<StackFrame>& stackTrace, unsi
}
extern "C" JSC__JSGlobalObject* Zig__GlobalObject__create(JSClassRef* globalObjectClass, int count,
- void* console_client, int32_t executionContextId, bool miniMode)
+ void* console_client, int32_t executionContextId, bool miniMode, void* worker_ptr)
{
auto heapSize = miniMode ? JSC::HeapType::Small : JSC::HeapType::Large;
@@ -448,6 +448,20 @@ extern "C" JSC__JSGlobalObject* Zig__GlobalObject__create(JSClassRef* globalObje
vm,
Zig::GlobalObject::createStructure(vm, JSC::JSGlobalObject::create(vm, JSC::JSGlobalObject::createStructure(vm, JSC::jsNull())), JSC::jsNull()),
static_cast<ScriptExecutionContextIdentifier>(executionContextId));
+
+ if (auto* worker = static_cast<WebCore::Worker*>(worker_ptr)) {
+ auto& options = worker->options();
+ if (options.bun.env) {
+ auto map = *options.bun.env;
+ auto size = map.size();
+ auto env = JSC::constructEmptyObject(globalObject, globalObject->objectPrototype(), size > 63 ? 63 : size);
+ for (auto k : map) {
+ env->putDirect(vm, JSC::Identifier::fromString(vm, WTFMove(k.key)), JSC::jsString(vm, WTFMove(k.value)));
+ }
+ map.clear();
+ globalObject->m_processEnvObject.set(vm, globalObject, env);
+ }
+ }
} else {
globalObject = Zig::GlobalObject::create(
vm,
diff --git a/src/bun.js/bindings/ZigGlobalObject.h b/src/bun.js/bindings/ZigGlobalObject.h
index 9d84e5214..1d82cd0f3 100644
--- a/src/bun.js/bindings/ZigGlobalObject.h
+++ b/src/bun.js/bindings/ZigGlobalObject.h
@@ -457,6 +457,8 @@ public:
Bun::JSMockModule mockModule;
+ LazyProperty<JSGlobalObject, JSObject> m_processEnvObject;
+
#include "ZigGeneratedClasses+lazyStructureHeader.h"
private:
@@ -519,7 +521,6 @@ private:
LazyProperty<JSGlobalObject, JSObject> m_JSHTTPSResponseControllerPrototype;
LazyProperty<JSGlobalObject, JSObject> m_navigatorObject;
LazyProperty<JSGlobalObject, JSObject> m_performanceObject;
- LazyProperty<JSGlobalObject, JSObject> m_processEnvObject;
LazyProperty<JSGlobalObject, JSObject> m_processObject;
LazyProperty<JSGlobalObject, JSObject> m_subtleCryptoObject;
LazyProperty<JSGlobalObject, Structure> m_JSHTTPResponseController;
diff --git a/src/bun.js/bindings/exports.zig b/src/bun.js/bindings/exports.zig
index 6d57798fd..265ada40a 100644
--- a/src/bun.js/bindings/exports.zig
+++ b/src/bun.js/bindings/exports.zig
@@ -46,8 +46,9 @@ pub const ZigGlobalObject = extern struct {
console: *anyopaque,
context_id: i32,
mini_mode: bool,
+ worker_ptr: ?*anyopaque,
) *JSGlobalObject {
- var global = shim.cppFn("create", .{ class_ref, count, console, context_id, mini_mode });
+ var global = shim.cppFn("create", .{ class_ref, count, console, context_id, mini_mode, worker_ptr });
Backtrace.reloadHandlers() catch unreachable;
return global;
}
diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h
index 9a6d1b72a..63ae6c3a4 100644
--- a/src/bun.js/bindings/headers.h
+++ b/src/bun.js/bindings/headers.h
@@ -579,7 +579,7 @@ ZIG_DECL JSC__JSValue Crypto__timingSafeEqual__slowpath(JSC__JSGlobalObject* arg
#pragma mark - Zig::GlobalObject
-CPP_DECL JSC__JSGlobalObject* Zig__GlobalObject__create(JSClassRef* arg0, int32_t arg1, void* arg2, int32_t arg3, bool arg4);
+CPP_DECL JSC__JSGlobalObject* Zig__GlobalObject__create(JSClassRef* arg0, int32_t arg1, void* arg2, int32_t arg3, bool arg4, void* arg5);
CPP_DECL void* Zig__GlobalObject__getModuleRegistryMap(JSC__JSGlobalObject* arg0);
CPP_DECL bool Zig__GlobalObject__resetModuleRegistryMap(JSC__JSGlobalObject* arg0, void* arg1);
diff --git a/src/bun.js/bindings/headers.zig b/src/bun.js/bindings/headers.zig
index d39793c07..2b25c0f5b 100644
--- a/src/bun.js/bindings/headers.zig
+++ b/src/bun.js/bindings/headers.zig
@@ -351,7 +351,7 @@ pub extern fn Reader__intptr__put(arg0: *bindings.JSGlobalObject, JSValue1: JSC_
pub extern fn Crypto__getRandomValues__put(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) void;
pub extern fn Crypto__randomUUID__put(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) void;
pub extern fn Crypto__timingSafeEqual__put(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) void;
-pub extern fn Zig__GlobalObject__create(arg0: [*c]JSClassRef, arg1: i32, arg2: ?*anyopaque, arg3: i32, arg4: bool) *bindings.JSGlobalObject;
+pub extern fn Zig__GlobalObject__create(arg0: [*c]JSClassRef, arg1: i32, arg2: ?*anyopaque, arg3: i32, arg4: bool, arg5: ?*anyopaque) *bindings.JSGlobalObject;
pub extern fn Zig__GlobalObject__getModuleRegistryMap(arg0: *bindings.JSGlobalObject) ?*anyopaque;
pub extern fn Zig__GlobalObject__resetModuleRegistryMap(arg0: *bindings.JSGlobalObject, arg1: ?*anyopaque) bool;
pub extern fn Bun__Path__create(arg0: *bindings.JSGlobalObject, arg1: bool) JSC__JSValue;
diff --git a/src/bun.js/bindings/webcore/JSWorker.cpp b/src/bun.js/bindings/webcore/JSWorker.cpp
index 434249068..1b65cba47 100644
--- a/src/bun.js/bindings/webcore/JSWorker.cpp
+++ b/src/bun.js/bindings/webcore/JSWorker.cpp
@@ -128,7 +128,7 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor::const
EnsureStillAliveScope argument1 = callFrame->argument(1);
auto options = WorkerOptions {};
- options.bun.unref = true;
+ options.bun.unref = false;
if (JSObject* optionsObject = JSC::jsDynamicCast<JSC::JSObject*>(argument1.value())) {
if (auto nameValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "name"_s))) {
@@ -138,61 +138,93 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor::const
}
}
- if (auto* bunObject = optionsObject) {
- if (auto miniModeValue = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "smol"_s))) {
- options.bun.mini = miniModeValue.toBoolean(lexicalGlobalObject);
- RETURN_IF_EXCEPTION(throwScope, encodedJSValue());
- }
+ if (auto miniModeValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "smol"_s))) {
+ options.bun.mini = miniModeValue.toBoolean(lexicalGlobalObject);
+ RETURN_IF_EXCEPTION(throwScope, encodedJSValue());
+ }
- if (auto ref = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "ref"_s))) {
- options.bun.unref = !ref.toBoolean(lexicalGlobalObject);
- RETURN_IF_EXCEPTION(throwScope, encodedJSValue());
- }
+ if (auto ref = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "ref"_s))) {
+ options.bun.unref = !ref.toBoolean(lexicalGlobalObject);
+ RETURN_IF_EXCEPTION(throwScope, encodedJSValue());
+ }
- auto workerData = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "workerData"_s));
- if (!workerData) {
- workerData = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "data"_s));
- }
+ auto workerData = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "workerData"_s));
+ if (!workerData) {
+ workerData = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "data"_s));
+ }
- if (workerData) {
- Vector<RefPtr<MessagePort>> ports;
- Vector<JSC::Strong<JSC::JSObject>> transferList;
-
- if (JSValue transferListValue = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "transferList"_s))) {
- if (transferListValue.isObject()) {
- JSC::JSObject* transferListObject = transferListValue.getObject();
- if (auto* transferListArray = jsDynamicCast<JSC::JSArray*>(transferListObject)) {
- for (unsigned i = 0; i < transferListArray->length(); i++) {
- JSC::JSValue transferListValue = transferListArray->get(lexicalGlobalObject, i);
- if (transferListValue.isObject()) {
- JSC::JSObject* transferListObject = transferListValue.getObject();
- transferList.append(JSC::Strong<JSC::JSObject>(vm, transferListObject));
- }
+ if (workerData) {
+ Vector<RefPtr<MessagePort>> ports;
+ Vector<JSC::Strong<JSC::JSObject>> transferList;
+
+ if (JSValue transferListValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "transferList"_s))) {
+ if (transferListValue.isObject()) {
+ JSC::JSObject* transferListObject = transferListValue.getObject();
+ if (auto* transferListArray = jsDynamicCast<JSC::JSArray*>(transferListObject)) {
+ for (unsigned i = 0; i < transferListArray->length(); i++) {
+ JSC::JSValue transferListValue = transferListArray->get(lexicalGlobalObject, i);
+ if (transferListValue.isObject()) {
+ JSC::JSObject* transferListObject = transferListValue.getObject();
+ transferList.append(JSC::Strong<JSC::JSObject>(vm, transferListObject));
}
}
}
}
+ }
+
+ ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*lexicalGlobalObject, workerData, WTFMove(transferList), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage);
+ if (serialized.hasException()) {
+ WebCore::propagateException(*lexicalGlobalObject, throwScope, serialized.releaseException());
+ return encodedJSValue();
+ }
- ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*lexicalGlobalObject, workerData, WTFMove(transferList), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage);
- if (serialized.hasException()) {
- WebCore::propagateException(*lexicalGlobalObject, throwScope, serialized.releaseException());
+ Vector<TransferredMessagePort> transferredPorts;
+
+ if (!ports.isEmpty()) {
+ auto disentangleResult = MessagePort::disentanglePorts(WTFMove(ports));
+ if (disentangleResult.hasException()) {
+ WebCore::propagateException(*lexicalGlobalObject, throwScope, disentangleResult.releaseException());
return encodedJSValue();
}
+ transferredPorts = disentangleResult.releaseReturnValue();
+ }
- Vector<TransferredMessagePort> transferredPorts;
+ options.bun.data = WTFMove(serialized.releaseReturnValue());
+ options.bun.dataMessagePorts = WTFMove(transferredPorts);
+ }
- if (!ports.isEmpty()) {
- auto disentangleResult = MessagePort::disentanglePorts(WTFMove(ports));
- if (disentangleResult.hasException()) {
- WebCore::propagateException(*lexicalGlobalObject, throwScope, disentangleResult.releaseException());
- return encodedJSValue();
- }
- transferredPorts = disentangleResult.releaseReturnValue();
- }
+ auto* globalObject = jsCast<Zig::GlobalObject*>(lexicalGlobalObject);
+ auto envValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "env"_s));
+ RETURN_IF_EXCEPTION(throwScope, encodedJSValue());
+ JSObject* envObject = nullptr;
+
+ if (envValue && envValue.isCell()) {
+ envObject = jsDynamicCast<JSC::JSObject*>(envValue);
+ } else if (globalObject->m_processEnvObject.isInitialized()) {
+ envObject = globalObject->processEnvObject();
+ }
- options.bun.data = WTFMove(serialized.releaseReturnValue());
- options.bun.dataMessagePorts = WTFMove(transferredPorts);
+ if (envObject) {
+ if (!envObject->staticPropertiesReified()) {
+ envObject->reifyAllStaticProperties(globalObject);
+ RETURN_IF_EXCEPTION(throwScope, {});
}
+
+ JSC::PropertyNameArray keys(vm, JSC::PropertyNameMode::Strings, JSC::PrivateSymbolMode::Exclude);
+ envObject->methodTable()->getOwnPropertyNames(envObject, lexicalGlobalObject, keys, JSC::DontEnumPropertiesMode::Exclude);
+ RETURN_IF_EXCEPTION(throwScope, {});
+
+ HashMap<String, String> env;
+
+ for (const auto& key : keys) {
+ JSValue value = envObject->get(lexicalGlobalObject, key);
+ RETURN_IF_EXCEPTION(throwScope, {});
+ String str = value.toWTFString(lexicalGlobalObject).isolatedCopy();
+ RETURN_IF_EXCEPTION(throwScope, {});
+ env.add(key.impl()->isolatedCopy(), str);
+ }
+
+ options.bun.env = std::make_unique<HashMap<String, String>>(WTFMove(env));
}
}
@@ -390,6 +422,7 @@ JSC_DEFINE_CUSTOM_SETTER(setJSWorker_onerror, (JSGlobalObject * lexicalGlobalObj
static inline JSC::EncodedJSValue jsWorkerPrototypeFunction_terminateBody(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWorker>::ClassParameter castedThis)
{
+ printf("bruh \n");
auto& vm = JSC::getVM(lexicalGlobalObject);
auto throwScope = DECLARE_THROW_SCOPE(vm);
UNUSED_PARAM(throwScope);
diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp
index ea64811e0..f2694b59f 100644
--- a/src/bun.js/bindings/webcore/Worker.cpp
+++ b/src/bun.js/bindings/webcore/Worker.cpp
@@ -61,7 +61,7 @@
#include "MessageEvent.h"
#include <JavaScriptCore/HashMapImplInlines.h>
#include "BunWorkerGlobalScope.h"
-
+#include "CloseEvent.h"
namespace WebCore {
WTF_MAKE_ISO_ALLOCATED_IMPL(Worker);
@@ -210,6 +210,7 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
void Worker::terminate()
{
+ printf("terminate\n");
// m_contextProxy.terminateWorkerGlobalScope();
m_wasTerminated = true;
WebWorker__terminate(impl_);
@@ -339,19 +340,19 @@ void Worker::dispatchError(WTF::String message)
protectedThis->dispatchEvent(event);
});
}
-void Worker::dispatchExit()
+void Worker::dispatchExit(int32_t exitCode)
{
auto* ctx = scriptExecutionContext();
if (!ctx)
return;
- ScriptExecutionContext::postTaskTo(ctx->identifier(), [protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void {
+ ScriptExecutionContext::postTaskTo(ctx->identifier(), [exitCode, protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void {
protectedThis->m_isOnline = false;
protectedThis->m_isClosing = true;
protectedThis->setKeepAlive(false);
if (protectedThis->hasEventListeners(eventNames().closeEvent)) {
- auto event = Event::create(eventNames().closeEvent, Event::CanBubble::No, Event::IsCancelable::No);
+ auto event = CloseEvent::create(exitCode == 0, static_cast<unsigned short>(exitCode), exitCode == 0 ? "Worker terminated normally"_s : "Worker exited abnormally"_s);
protectedThis->dispatchEvent(event);
}
});
@@ -377,6 +378,8 @@ void Worker::forEachWorker(const Function<Function<void(ScriptExecutionContext&)
extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* worker, int32_t exitCode)
{
+ worker->dispatchExit(exitCode);
+
if (globalObject) {
auto* ctx = globalObject->scriptExecutionContext();
if (ctx) {
@@ -398,8 +401,6 @@ extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker*
vm.notifyNeedTermination();
vm.deferredWorkTimer->doWork(vm);
}
-
- worker->dispatchExit();
}
extern "C" void WebWorker__dispatchOnline(Worker* worker, Zig::GlobalObject* globalObject)
{
diff --git a/src/bun.js/bindings/webcore/Worker.h b/src/bun.js/bindings/webcore/Worker.h
index e0f328dbb..a296fa4a8 100644
--- a/src/bun.js/bindings/webcore/Worker.h
+++ b/src/bun.js/bindings/webcore/Worker.h
@@ -94,7 +94,7 @@ public:
void drainEvents();
void dispatchOnline(Zig::GlobalObject* workerGlobalObject);
void dispatchError(WTF::String message);
- void dispatchExit();
+ void dispatchExit(int32_t exitCode);
ScriptExecutionContext* scriptExecutionContext() const final { return ContextDestructionObserver::scriptExecutionContext(); }
ScriptExecutionContextIdentifier clientIdentifier() const { return m_clientIdentifier; }
WorkerOptions& options() { return m_options; }
diff --git a/src/bun.js/bindings/webcore/WorkerOptions.h b/src/bun.js/bindings/webcore/WorkerOptions.h
index 0c307d18e..79eabef98 100644
--- a/src/bun.js/bindings/webcore/WorkerOptions.h
+++ b/src/bun.js/bindings/webcore/WorkerOptions.h
@@ -12,6 +12,7 @@ struct BunOptions {
bool unref { false };
RefPtr<SerializedScriptValue> data;
Vector<TransferredMessagePort> dataMessagePorts;
+ std::unique_ptr<HashMap<String, String>> env { nullptr };
};
struct WorkerOptions {
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index 1c8d91d52..741affa6a 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -910,6 +910,7 @@ pub const VirtualMachine = struct {
vm.console,
-1,
false,
+ null,
);
vm.regular_event_loop.global = vm.global;
vm.regular_event_loop.virtual_machine = vm;
@@ -1014,6 +1015,7 @@ pub const VirtualMachine = struct {
vm.console,
-1,
smol,
+ null,
);
vm.regular_event_loop.global = vm.global;
vm.regular_event_loop.virtual_machine = vm;
@@ -1118,6 +1120,7 @@ pub const VirtualMachine = struct {
vm.console,
@as(i32, @intCast(worker.execution_context_id)),
worker.mini,
+ worker.cpp_worker,
);
vm.regular_event_loop.global = vm.global;
vm.regular_event_loop.virtual_machine = vm;
diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig
index 73f42b089..bbe708d18 100644
--- a/src/bun.js/web_worker.zig
+++ b/src/bun.js/web_worker.zig
@@ -341,22 +341,20 @@ pub const WebWorker = struct {
if (this.requested_terminate) {
return;
}
-
- this.allowed_to_exit = false;
_ = this.requestTerminate();
}
pub fn setRef(this: *WebWorker, value: bool) callconv(.C) void {
- if (this.requested_terminate and value) {
+ if (this.requested_terminate and !value) {
this.parent_poll_ref.unref(this.parent);
return;
}
this.allowed_to_exit = !value;
- if (value) {
- this.parent_poll_ref.ref(this.parent);
- } else {
+ if (this.allowed_to_exit) {
this.parent_poll_ref.unref(this.parent);
+ } else {
+ this.parent_poll_ref.ref(this.parent);
}
if (this.vm) |vm| {
@@ -404,6 +402,7 @@ pub const WebWorker = struct {
}
fn requestTerminate(this: *WebWorker) bool {
+ this.setRef(false);
var vm = this.vm orelse {
this.requested_terminate = true;
return false;
diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts
index a0b757708..fd099a023 100644
--- a/src/js/node/worker_threads.ts
+++ b/src/js/node/worker_threads.ts
@@ -1,4 +1,26 @@
-const { MessageChannel, BroadcastChannel } = globalThis;
+// import type { Readable, Writable } from "node:stream";
+// import type { WorkerOptions } from "node:worker_threads";
+declare const self: typeof globalThis;
+type WebWorker = InstanceType<typeof globalThis.Worker>;
+
+const EventEmitter = require("node:events");
+const { throwNotImplemented } = require("../internal/shared");
+
+const { MessageChannel, BroadcastChannel, Worker: WebWorker } = globalThis;
+const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV");
+
+const isMainThread = Bun.isMainThread;
+let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads");
+
+type NodeWorkerOptions = import("node:worker_threads").WorkerOptions;
+
+const emittedWarnings = new Set();
+function emitWarning(type, message) {
+ if (emittedWarnings.has(type)) return;
+ emittedWarnings.add(type);
+ // process.emitWarning(message); // our printing is bad
+ console.warn("[bun] Warning:", message);
+}
function injectFakeEmitter(Class) {
function messageEventHandler(event: MessageEvent) {
@@ -76,10 +98,6 @@ injectFakeEmitter(_MessagePort);
const MessagePort = _MessagePort;
-const EventEmitter = require("node:events");
-const isMainThread = Bun.isMainThread;
-let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads");
-let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
let resourceLimits = {};
let workerData = _workerData;
@@ -111,7 +129,7 @@ function fakeParentPort() {
});
Object.defineProperty(fake, "postMessage", {
- value(...args: any[]) {
+ value(...args: [any, any]) {
return self.postMessage(...args);
},
});
@@ -154,6 +172,7 @@ function fakeParentPort() {
return fake;
}
+let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
function getEnvironmentData() {
return process.env;
@@ -164,27 +183,22 @@ function setEnvironmentData(env: any) {
}
function markAsUntransferable() {
- throw new Error("markAsUntransferable is not implemented");
+ throwNotImplemented("worker_threads.markAsUntransferable");
}
function moveMessagePortToContext() {
- throw new Error("moveMessagePortToContext is not implemented");
+ throwNotImplemented("worker_threads.moveMessagePortToContext");
}
-const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV");
-
-const WebWorker = globalThis.Worker;
class Worker extends EventEmitter {
- #worker: globalThis.Worker;
+ #worker: WebWorker;
#performance;
#onExitPromise = undefined;
- constructor(filename: string, options: any = {}) {
+ constructor(filename: string, options: NodeWorkerOptions = {}) {
super();
-
- this.#worker = new WebWorker(filename, {
- ...options,
- });
+ // TODO: stdin, stdout, stderr, and other node specific options.
+ this.#worker = new WebWorker(filename, options);
this.#worker.addEventListener("close", this.#onClose.bind(this));
this.#worker.addEventListener("error", this.#onError.bind(this));
this.#worker.addEventListener("message", this.#onMessage.bind(this));
@@ -218,7 +232,12 @@ class Worker extends EventEmitter {
get performance() {
return (this.#performance ??= {
eventLoopUtilization() {
- return {};
+ emitWarning("performance", "worker_threads.Worker.performance is not implemented.");
+ return {
+ idle: 0,
+ active: 0,
+ utilization: 0,
+ };
},
});
}
@@ -232,16 +251,16 @@ class Worker extends EventEmitter {
this.#worker.addEventListener(
"close",
event => {
- // TODO: exit code
- resolve(0);
+ resolve(event.code);
},
{ once: true },
);
+ this.#worker.terminate();
return (this.#onExitPromise = promise);
}
- postMessage(...args: any[]) {
+ postMessage(...args: [any, any]) {
return this.#worker.postMessage(...args);
}
@@ -261,7 +280,7 @@ class Worker extends EventEmitter {
#onMessageError(event: Event) {
// TODO: is this right?
- this.emit("messageerror", event.error || event);
+ this.emit("messageerror", (event as any).error || event);
}
#onOpen() {
@@ -269,8 +288,8 @@ class Worker extends EventEmitter {
this.emit("online");
}
- getHeapSnapshot() {
- return {};
+ async getHeapSnapshot() {
+ throwNotImplemented("worker_threads.Worker.getHeapSnapshot");
}
}
export default {
@@ -291,6 +310,5 @@ export default {
moveMessagePortToContext,
receiveMessageOnPort,
SHARE_ENV,
-
threadId,
};
diff --git a/src/js/out/InternalModuleRegistryConstants.h b/src/js/out/InternalModuleRegistryConstants.h
index 0ad5c4c97..7969c4cc4 100644
--- a/src/js/out/InternalModuleRegistryConstants.h
+++ b/src/js/out/InternalModuleRegistryConstants.h
@@ -189,7 +189,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
@@ -414,7 +414,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
@@ -640,7 +640,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
diff --git a/src/js/private.d.ts b/src/js/private.d.ts
index 276ac136c..2667df631 100644
--- a/src/js/private.d.ts
+++ b/src/js/private.d.ts
@@ -193,6 +193,12 @@ interface BunLazyModules {
set: typeof import("./builtins/AsyncContext").setAsyncContext;
cleanupLater: () => void;
};
+ "worker_threads": [
+ //
+ workerData: any,
+ threadId: number,
+ _receiveMessageOnPort: (port: MessagePort) => any,
+ ];
// ReadableStream related
[1]: any;
diff --git a/test/js/node/worker_threads/worker_threads.test.ts b/test/js/node/worker_threads/worker_threads.test.ts
index 3bee1a50e..ae88add17 100644
--- a/test/js/node/worker_threads/worker_threads.test.ts
+++ b/test/js/node/worker_threads/worker_threads.test.ts
@@ -50,12 +50,14 @@ test("all properties are present", () => {
expect(Worker).toBeDefined();
expect(() => {
+ // @ts-expect-error no args
wt.markAsUntransferable();
- }).toThrow("not implemented");
+ }).toThrow("not yet implemented");
expect(() => {
+ // @ts-expect-error no args
wt.moveMessagePortToContext();
- }).toThrow("not implemented");
+ }).toThrow("not yet implemented");
});
test("receiveMessageOnPort works across threads", () => {
diff --git a/test/js/web/many-messages-event-loop.js b/test/js/web/many-messages-event-loop.js
new file mode 100644
index 000000000..2eaba2568
--- /dev/null
+++ b/test/js/web/many-messages-event-loop.js
@@ -0,0 +1,11 @@
+const worker = new Worker(new URL("worker-fixture-many-messages.js", import.meta.url).href);
+
+worker.postMessage("initial message");
+worker.addEventListener("message", ({ data }) => {
+ if (data.done) {
+ console.log("done");
+ worker.terminate();
+ } else {
+ worker.postMessage({ i: data.i + 1 });
+ }
+});
diff --git a/test/js/web/worker-fixture-env.js b/test/js/web/worker-fixture-env.js
new file mode 100644
index 000000000..203ed3141
--- /dev/null
+++ b/test/js/web/worker-fixture-env.js
@@ -0,0 +1,12 @@
+import * as worker_threads from "worker_threads";
+
+if (worker_threads.isMainThread) throw new Error("worker_threads.isMainThread is wrong");
+
+Bun.inspect(process.env);
+
+onmessage = ({}) => {
+ postMessage({
+ env: process.env,
+ hello: process.env.hello,
+ });
+};
diff --git a/test/js/web/worker-fixture-many-messages.js b/test/js/web/worker-fixture-many-messages.js
new file mode 100644
index 000000000..7a8f1d910
--- /dev/null
+++ b/test/js/web/worker-fixture-many-messages.js
@@ -0,0 +1,12 @@
+addEventListener("message", e => {
+ const data = e.data;
+ // console.log("worker", data);
+
+ if (data === "initial message") {
+ postMessage({ i: 0 });
+ } else if (data.i > 50) {
+ postMessage({ done: true });
+ } else {
+ postMessage({ i: data.i + 1 });
+ }
+});
diff --git a/test/js/web/worker.test.ts b/test/js/web/worker.test.ts
index 87dcf0911..1babfbcc3 100644
--- a/test/js/web/worker.test.ts
+++ b/test/js/web/worker.test.ts
@@ -1,4 +1,6 @@
import { expect, test } from "bun:test";
+import { bunEnv, bunExe } from "harness";
+import path from "path";
test("worker", done => {
const worker = new Worker(new URL("worker-fixture.js", import.meta.url).href, {
@@ -10,8 +12,115 @@ test("worker", done => {
done(e.error);
};
worker.onmessage = e => {
- expect(e.data).toEqual("initial message");
+ try {
+ expect(e.data).toEqual("initial message");
+ } catch (e) {
+ done(e);
+ } finally {
+ worker.terminate();
+ done();
+ }
worker.terminate();
done();
};
});
+
+test("worker-env", done => {
+ const worker = new Worker(new URL("worker-fixture-env.js", import.meta.url).href, {
+ env: {
+ hello: "world",
+ another_key: 123 as any,
+ },
+ });
+ worker.postMessage("hello");
+ worker.onerror = e => {
+ done(e.error);
+ };
+ worker.onmessage = e => {
+ try {
+ expect(e.data).toEqual({
+ env: {
+ hello: "world",
+ another_key: "123",
+ },
+ hello: "world",
+ });
+ } catch (e) {
+ done(e);
+ } finally {
+ worker.terminate();
+ done();
+ }
+ };
+});
+
+test("worker-env with a lot of properties", done => {
+ const obj: any = {};
+
+ for (let i = 0; i < 1000; i++) {
+ obj["prop " + i] = Math.random().toString();
+ }
+
+ const worker = new Worker(new URL("worker-fixture-env.js", import.meta.url).href, {
+ env: obj,
+ });
+ worker.postMessage("hello");
+ worker.onerror = e => {
+ done(e.error);
+ };
+ worker.onmessage = e => {
+ try {
+ expect(e.data).toEqual({
+ env: obj,
+ hello: undefined,
+ });
+ } catch (e) {
+ done(e);
+ } finally {
+ worker.terminate();
+ done();
+ }
+ };
+});
+
+test("sending 50 messages should just work", done => {
+ const worker = new Worker(new URL("worker-fixture-many-messages.js", import.meta.url).href, {});
+
+ worker.postMessage("initial message");
+ worker.addEventListener("message", ({ data }) => {
+ if (data.done) {
+ worker.terminate();
+ done();
+ } else {
+ worker.postMessage({ i: data.i + 1 });
+ }
+ });
+});
+
+test("worker by default will not close the event loop", done => {
+ const x = Bun.spawn({
+ cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.js")],
+ env: bunEnv,
+ stdio: ["inherit", "pipe", "inherit"],
+ });
+
+ const timer = setTimeout(() => {
+ x.kill();
+ done(new Error("timeout"));
+ }, 1000);
+
+ x.exited.then(async code => {
+ clearTimeout(timer);
+ if (code !== 0) {
+ done(new Error("exited with non-zero code"));
+ } else {
+ const text = await new Response(x.stdout).text();
+ if (text.includes("done")) {
+ console.log({ text });
+ done(new Error("event loop killed early"));
+ } else {
+ done();
+ }
+ }
+ });
+});