aboutsummaryrefslogtreecommitdiff
path: root/src/js
diff options
context:
space:
mode:
authorGravatar dave caruso <me@paperdave.net> 2023-08-07 23:58:38 -0700
committerGravatar GitHub <noreply@github.com> 2023-08-07 23:58:38 -0700
commit5497accbdb14da9e361175ad1cd074731b7f8eeb (patch)
tree994424356f9059b1171f410a9689f2d00bf89f6b /src/js
parent182e600eb79655e85b3f0371bc46fc4de8e70094 (diff)
downloadbun-5497accbdb14da9e361175ad1cd074731b7f8eeb.tar.gz
bun-5497accbdb14da9e361175ad1cd074731b7f8eeb.tar.zst
bun-5497accbdb14da9e361175ad1cd074731b7f8eeb.zip
Add `env` option for `node:worker_threads` (#4052)
* almost works * env stuff * test fixes * wtfmove * ok * ok * ref by default * it now does the ref stuff by default * cool
Diffstat (limited to 'src/js')
-rw-r--r--src/js/node/worker_threads.ts68
-rw-r--r--src/js/out/InternalModuleRegistryConstants.h6
-rw-r--r--src/js/private.d.ts6
3 files changed, 52 insertions, 28 deletions
diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts
index a0b757708..fd099a023 100644
--- a/src/js/node/worker_threads.ts
+++ b/src/js/node/worker_threads.ts
@@ -1,4 +1,26 @@
-const { MessageChannel, BroadcastChannel } = globalThis;
+// import type { Readable, Writable } from "node:stream";
+// import type { WorkerOptions } from "node:worker_threads";
+declare const self: typeof globalThis;
+type WebWorker = InstanceType<typeof globalThis.Worker>;
+
+const EventEmitter = require("node:events");
+const { throwNotImplemented } = require("../internal/shared");
+
+const { MessageChannel, BroadcastChannel, Worker: WebWorker } = globalThis;
+const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV");
+
+const isMainThread = Bun.isMainThread;
+let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads");
+
+type NodeWorkerOptions = import("node:worker_threads").WorkerOptions;
+
+const emittedWarnings = new Set();
+function emitWarning(type, message) {
+ if (emittedWarnings.has(type)) return;
+ emittedWarnings.add(type);
+ // process.emitWarning(message); // our printing is bad
+ console.warn("[bun] Warning:", message);
+}
function injectFakeEmitter(Class) {
function messageEventHandler(event: MessageEvent) {
@@ -76,10 +98,6 @@ injectFakeEmitter(_MessagePort);
const MessagePort = _MessagePort;
-const EventEmitter = require("node:events");
-const isMainThread = Bun.isMainThread;
-let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads");
-let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
let resourceLimits = {};
let workerData = _workerData;
@@ -111,7 +129,7 @@ function fakeParentPort() {
});
Object.defineProperty(fake, "postMessage", {
- value(...args: any[]) {
+ value(...args: [any, any]) {
return self.postMessage(...args);
},
});
@@ -154,6 +172,7 @@ function fakeParentPort() {
return fake;
}
+let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
function getEnvironmentData() {
return process.env;
@@ -164,27 +183,22 @@ function setEnvironmentData(env: any) {
}
function markAsUntransferable() {
- throw new Error("markAsUntransferable is not implemented");
+ throwNotImplemented("worker_threads.markAsUntransferable");
}
function moveMessagePortToContext() {
- throw new Error("moveMessagePortToContext is not implemented");
+ throwNotImplemented("worker_threads.moveMessagePortToContext");
}
-const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV");
-
-const WebWorker = globalThis.Worker;
class Worker extends EventEmitter {
- #worker: globalThis.Worker;
+ #worker: WebWorker;
#performance;
#onExitPromise = undefined;
- constructor(filename: string, options: any = {}) {
+ constructor(filename: string, options: NodeWorkerOptions = {}) {
super();
-
- this.#worker = new WebWorker(filename, {
- ...options,
- });
+ // TODO: stdin, stdout, stderr, and other node specific options.
+ this.#worker = new WebWorker(filename, options);
this.#worker.addEventListener("close", this.#onClose.bind(this));
this.#worker.addEventListener("error", this.#onError.bind(this));
this.#worker.addEventListener("message", this.#onMessage.bind(this));
@@ -218,7 +232,12 @@ class Worker extends EventEmitter {
get performance() {
return (this.#performance ??= {
eventLoopUtilization() {
- return {};
+ emitWarning("performance", "worker_threads.Worker.performance is not implemented.");
+ return {
+ idle: 0,
+ active: 0,
+ utilization: 0,
+ };
},
});
}
@@ -232,16 +251,16 @@ class Worker extends EventEmitter {
this.#worker.addEventListener(
"close",
event => {
- // TODO: exit code
- resolve(0);
+ resolve(event.code);
},
{ once: true },
);
+ this.#worker.terminate();
return (this.#onExitPromise = promise);
}
- postMessage(...args: any[]) {
+ postMessage(...args: [any, any]) {
return this.#worker.postMessage(...args);
}
@@ -261,7 +280,7 @@ class Worker extends EventEmitter {
#onMessageError(event: Event) {
// TODO: is this right?
- this.emit("messageerror", event.error || event);
+ this.emit("messageerror", (event as any).error || event);
}
#onOpen() {
@@ -269,8 +288,8 @@ class Worker extends EventEmitter {
this.emit("online");
}
- getHeapSnapshot() {
- return {};
+ async getHeapSnapshot() {
+ throwNotImplemented("worker_threads.Worker.getHeapSnapshot");
}
}
export default {
@@ -291,6 +310,5 @@ export default {
moveMessagePortToContext,
receiveMessageOnPort,
SHARE_ENV,
-
threadId,
};
diff --git a/src/js/out/InternalModuleRegistryConstants.h b/src/js/out/InternalModuleRegistryConstants.h
index 0ad5c4c97..7969c4cc4 100644
--- a/src/js/out/InternalModuleRegistryConstants.h
+++ b/src/js/out/InternalModuleRegistryConstants.h
@@ -189,7 +189,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
@@ -414,7 +414,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
@@ -640,7 +640,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const{MessageChannel,BroadcastChannel}=globalThis;function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort,EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\"),parentPort=isMainThread\?null:fakeParentPort(),resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throw new Error(\"markAsUntransferable is not implemented\")}function moveMessagePortToContext(){throw new Error(\"moveMessagePortToContext is not implemented\")}const SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),WebWorker=globalThis.Worker;class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,{...options}),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return{}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(0)},{once:!0}),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}getHeapSnapshot(){return{}}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(){this.emit(\"exit\")}#onError(event){this.emit(\"error\",event)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error||event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
diff --git a/src/js/private.d.ts b/src/js/private.d.ts
index 276ac136c..2667df631 100644
--- a/src/js/private.d.ts
+++ b/src/js/private.d.ts
@@ -193,6 +193,12 @@ interface BunLazyModules {
set: typeof import("./builtins/AsyncContext").setAsyncContext;
cleanupLater: () => void;
};
+ "worker_threads": [
+ //
+ workerData: any,
+ threadId: number,
+ _receiveMessageOnPort: (port: MessagePort) => any,
+ ];
// ReadableStream related
[1]: any;