diff options
35 files changed, 465 insertions, 263 deletions
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fc18df32f..84394f654 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -47,33 +47,17 @@ TODO: document this (see [`bindings.zig`](src/bun.js/bindings/bindings.zig) and Copy from examples like `Subprocess` or `Response`. -### ESM modules +### ESM Modules and Builtins JS Bun implements ESM modules in a mix of native code and JavaScript. Several Node.js modules are implemented in JavaScript and loosely based on browserify polyfills. -The ESM modules in Bun are located in [`src/bun.js/*.exports.js`](src/bun.js/). Unlike other code in Bun, these files are NOT transpiled. They are loaded directly into the JavaScriptCore VM. That means `require` does not work in these files. Instead, you must use `import.meta.require`, or ideally, not use require/import other files at all. +Builtin modules in Bun are located in [`src/js`](src/js/). These files are transpiled and support a JavaScriptCore-only syntax for internal slots, which is explained further in [`src/js/README.md`](src/js/README.md). -The module loader is in [`src/bun.js/module_loader.zig`](src/bun.js/module_loader.zig). - -### JavaScript Builtins - -TODO: update this with the new build process that uses TypeScript and `$` instead of `@`. - -JavaScript builtins are located in [`src/js/builtins/*.ts`](src/js/builtins). - -These files support a JavaScriptCore-only syntax for internal slots. `@` is used to access an internal slot. For example: `new @Array(123)` will create a new `Array` similar to `new Array(123)`, except if a library modifies the `Array` global, it will not affect the internal slot (`@Array`). These names must be allow-listed in `BunBuiltinNames.h` (though JavaScriptCore allowlists some names by default). +Native C++ modules are in `src/bun.js/modules/`. -They can not use or reference ESM-modules. The files that end with `*Internals.js` are automatically loaded globally. Most usage of internals right now are the stream implementations (which share a lot of code from Safari/WebKit) and ImportMetaObject (which is how `require` is implemented in the runtime) - -To regenerate the builtins: - -```sh -make clean-bindings && make generate-builtins && make bindings -j10 -``` - -It is recommended that you have ccache installed or else you will spend a lot of time waiting for the bindings to compile. +The module loader is in [`src/bun.js/module_loader.zig`](src/bun.js/module_loader.zig). ### Memory management in Bun's JavaScript runtime @@ -555,7 +555,7 @@ tinycc: PYTHON=$(shell which python 2>/dev/null || which python3 2>/dev/null || which python2 2>/dev/null) .PHONY: esm -js: +js: # to rebundle js (rebuilding binary not needed to reload js code) NODE_ENV=production bun src/js/_codegen/index.ts esm-debug: @@ -660,8 +660,8 @@ else PKGNAME_NINJA := ninja-build endif -.PHONY: require -require: +.PHONY: assert-deps +assert-deps: @echo "Checking if the required utilities are available..." @if [ $(CLANG_VERSION) -lt "15" ]; then echo -e "ERROR: clang version >=15 required, found: $(CLANG_VERSION). Install with:\n\n $(POSIX_PKG_MANAGER) install llvm@15"; exit 1; fi @cmake --version >/dev/null 2>&1 || (echo -e "ERROR: cmake is required."; exit 1) @@ -673,6 +673,9 @@ require: @which $(LIBTOOL) > /dev/null || (echo -e "ERROR: libtool is required. Install with:\n\n $(POSIX_PKG_MANAGER) install libtool"; exit 1) @which ninja > /dev/null || (echo -e "ERROR: Ninja is required. Install with:\n\n $(POSIX_PKG_MANAGER) install $(PKGNAME_NINJA)"; exit 1) @which pkg-config > /dev/null || (echo -e "ERROR: pkg-config is required. Install with:\n\n $(POSIX_PKG_MANAGER) install pkg-config"; exit 1) + @which rustc > /dev/null || (echo -e "ERROR: rustc is required." exit 1) + @which cargo > /dev/null || (echo -e "ERROR: cargo is required." exit 1) + @test $(shell cargo --version | awk '{print $$2}' | cut -d. -f2) -gt 57 || (echo -e "ERROR: cargo version must be at least 1.57."; exit 1) @echo "You have the dependencies installed! Woo" # the following allows you to run `make submodule` to update or init submodules. but we will exclude webkit @@ -1108,9 +1111,6 @@ endif dev-obj-linux: $(ZIG) build obj -Dtarget=x86_64-linux-gnu -Dcpu="$(CPU_TARGET)" -.PHONY: dev -dev: mkdir-dev dev-obj link ## compile zig changes + link bun - mkdir-dev: mkdir -p $(DEBUG_PACKAGE_DIR) @@ -1900,26 +1900,44 @@ vendor-without-npm: node-fallbacks runtime_js fallback_decoder bun_error mimallo vendor-without-check: npm-install vendor-without-npm .PHONY: vendor -vendor: require submodule vendor-without-check +vendor: assert-deps submodule vendor-without-check .PHONY: vendor-dev -vendor-dev: require submodule npm-install-dev vendor-without-npm +vendor-dev: assert-deps submodule npm-install-dev vendor-without-npm .PHONY: bun bun: vendor identifier-cache build-obj bun-link-lld-release bun-codesign-release-local -.PHONY: regenerate-bindings -regenerate-bindings: ## compile src/js/builtins + all c++ code, does not link +.PHONY: cpp +cpp: ## compile src/js/builtins + all c++ code then link + @make clean-bindings js + @make bindings -j$(CPU_COUNT) + @make link + +.PHONY: cpp +cpp-no-link: @make clean-bindings js @make bindings -j$(CPU_COUNT) +.PHONY: zig +zig: ## compile zig code then link + @make mkdir-dev dev-obj link + +.PHONY: zig-no-link +zig-no-link: + @make mkdir-dev dev-obj + +.PHONY: dev +dev: # combo of `make cpp` and `make zig` + @make cpp-no-link zig-no-link -j2 + @make link + .PHONY: setup -setup: vendor-dev identifier-cache clean-bindings js - make jsc-check - make bindings -j$(CPU_COUNT) +setup: vendor-dev identifier-cache clean-bindings + make jsc-check cpp zig link @echo "" - @echo "Development environment setup complete" - @echo "Run \`make dev\` to build \`bun-debug\`" + @echo "First build complete!" + @echo "\"bun-debug\" is available at $(DEBUG_BIN)/bun-debug" @echo "" .PHONY: help diff --git a/docs/api/workers.md b/docs/api/workers.md index ba45d7cc1..73a925531 100644 --- a/docs/api/workers.md +++ b/docs/api/workers.md @@ -88,7 +88,7 @@ worker.addEventListener("message", event => { ## Terminating a worker -A `Worker` instance terminate automatically when Bun's process exits. To terminate a `Worker` sooner, call `worker.terminate()`. +A `Worker` instance terminates automatically once it's event loop has no work left to do. Attaching a `"message"` listener on the global or any `MessagePort`s will keep the event loop alive. To forcefully terminate a `Worker`, call `worker.terminate()`. ```ts const worker = new Worker(new URL("worker.ts", import.meta.url).href); @@ -97,18 +97,20 @@ const worker = new Worker(new URL("worker.ts", import.meta.url).href); worker.terminate(); ``` +This will cause the worker's to exit as soon as possible. + ### `process.exit()` -A worker can terminate itself with `process.exit()`. This does not terminate the main process. Like in Node.js, `process.on('beforeExit', callback)` and `process.on('exit', callback)` are emitted on the worker thread (and not on the main thread). +A worker can terminate itself with `process.exit()`. This does not terminate the main process. Like in Node.js, `process.on('beforeExit', callback)` and `process.on('exit', callback)` are emitted on the worker thread (and not on the main thread), and the exit code is passed to the `"close"` event. ### `"close"` -The `"close"` event is emitted when a worker has been terminated. It can take some time for the worker to actually terminate, so this event is emitted when the worker has been marked as terminated. +The `"close"` event is emitted when a worker has been terminated. It can take some time for the worker to actually terminate, so this event is emitted when the worker has been marked as terminated. The `CloseEvent` will contain the exit code passed to `process.exit()`, or 0 if closed for other reasons. ```ts const worker = new Worker(new URL("worker.ts", import.meta.url).href); -worker.addEventListener("close", () => { +worker.addEventListener("close", event => { console.log("worker is being closed"); }); ``` @@ -117,14 +119,27 @@ This event does not exist in browsers. ## Managing lifetime -By default, an active `Worker` will _not_ keep the main (spawning) process alive. Once the main script finishes, the main thread will terminate, shutting down any workers it created. +By default, an active `Worker` will keep the main (spawning) process alive, so async tasks like `setTimeout` and promises will keep the process alive. Attaching `message` listeners will also keep the `Worker` alive. + +### `worker.unref` + +To stop a running worker from keeping the process alive, call `worker.unref()`. This decouples the lifetime of the worker to the lifetime of the main process, and is equivlent to what Node.js' `worker_threads` does. + +```ts +const worker = new Worker(new URL("worker.ts", import.meta.url).href); +worker.unref(); +``` + +Note: `worker.unref()` is not available in browers. ### `worker.ref` -To keep the process alive until the `Worker` terminates, call `worker.ref()`. This couples the lifetime of the worker to the lifetime of the main process. +To keep the process alive until the `Worker` terminates, call `worker.ref()`. A ref'd worker is the default behavior, and still needs something going on in the event loop (such as a `"message"` listener) for the worker to continue running. ```ts const worker = new Worker(new URL("worker.ts", import.meta.url).href); +worker.unref(); +// later... worker.ref(); ``` @@ -132,22 +147,11 @@ Alternatively, you can also pass an `options` object to `Worker`: ```ts const worker = new Worker(new URL("worker.ts", import.meta.url).href, { - ref: true, + ref: false, }); ``` -### `worker.unref` - -To stop keeping the process alive, call `worker.unref()`. - -```ts -const worker = new Worker(new URL("worker.ts", import.meta.url).href); -worker.ref(); -// ...later on -worker.unref(); -``` - -Note: `worker.ref()` and `worker.unref()` do not exist in browsers. +Note: `worker.ref()` is not available in browers. ## Memory usage with `smol` diff --git a/docs/project/development.md b/docs/project/development.md index 2a3630a12..c635595b3 100644 --- a/docs/project/development.md +++ b/docs/project/development.md @@ -115,18 +115,16 @@ $ bun install -g @oven/zig $ zigup 0.11.0-dev.4006+bf827d0b5 ``` -## Building - -After cloning the repository, run the following command. The runs +{% callout %} +We last updated Zig on **July 18th, 2023** +{% /callout %} -```bash -$ make setup -``` +## First Build -Then to build Bun: +After cloning the repository, run the following command to run the first build. This may take a while as it will clone submodules and build dependencies. ```bash -$ make dev +$ make setup ``` The binary will be located at `packages/debug-bun-{platform}-{arch}/bun-debug`. It is recommended to add this to your `$PATH`. To verify the build worked, lets print the version number on the development build of Bun. @@ -136,16 +134,78 @@ $ packages/debug-bun-*/bun-debug --version bun 0.x.y__dev ``` +Note: `make setup` is just an alias for the following: + +```bash +$ make assert-deps submodule npm-install-dev node-fallbacks runtime_js fallback_decoder bun_error mimalloc picohttp zlib boringssl libarchive lolhtml sqlite usockets uws tinycc c-ares zstd base64 cpp zig link +``` + +## Rebuilding + +Bun uses a series of make commands to rebuild parts of the codebase. The general rule for rebuilding is there is `make link` to rerun the linker, and then different make targets for different parts of the codebase. Do not pass `-j` to make as these scripts will break if run out of order, and multiple cores will be used when possible during the builds. + +| What changed | Run this command | +| ------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| Zig Code | `make zig` | +| C++ Code | `make cpp` | +| Zig + C++ Code | `make dev` (combination of the above two) | +| JS/TS Code in `src/js` | `make js` (in bun-debug, js is loaded from disk without a recompile). If you change the names of any file or add/remove anything, you must also run `make dev`. | +| `*.classes.ts` | `make generate-classes dev` | +| JSSink | `make generate-sink cpp` | +| `src/node_fallbacks/*` | `make node-fallbacks zig` | +| `identifier_data.zig` | `make identifier-cache zig` | +| Code using `cppFn`/`JSC.markBinding` | `make headers` (TODO: explain explain what this is used for and why it's useful) | + +`make setup` cloned a bunch of submodules and built the subprojects. When a submodule is out of date, run `make submodule` to quickly reset/update all your submodules, then you can rebuild individual submodules with their respective command. + +| Dependency | Run this command | +| -------------- | ---------------------------------------- | +| WebKit | `bun install` (it is a prebuilt package) | +| uWebSockets | `make uws` | +| Mimalloc | `make mimalloc` | +| PicoHTTPParser | `make picohttp` | +| zlib | `make zlib` | +| BoringSSL | `make boringssl` | +| libarchive | `make libarchive` | +| lolhtml | `make lolhtml` | +| sqlite | `make sqlite` | +| TinyCC | `make tinycc` | +| c-ares | `make c-ares` | +| zstd | `make zstd` | +| Base64 | `make base64` | + +The above will probably also need Zig and/or C++ code rebuilt. + ## VSCode VSCode is the recommended IDE for working on Bun, as it has been configured. Once opening, you can run `Extensions: Show Recommended Extensions` to install the recommended extensions for Zig and C++. ZLS is automatically configured. +### ZLS + +ZLS is the language server for Zig. The latest binary that the extension auto-updates may not function with the version of Zig that Bun uses. It may be more reliable to build ZLS from source: + +```bash +$ git clone https://github.com/zigtools/zls +$ cd zls +$ git checkout f91ff831f4959efcb7e648dba4f0132c296d26c0 +$ zig build +``` + +Then add absolute paths to Zig and ZLS in your vscode config: + +```json +{ + "zig.zigPath": "/path/to/zig/install/zig", + "zig.zls.path": "/path/to/zls/zig-out/bin/zls" +} +``` + ## JavaScript builtins When you change anything in `src/js/builtins/*` or switch branches, run this: ```bash -$ make regenerate-bindings +$ make js cpp ``` That inlines the TypeScript code into C++ headers. @@ -154,6 +214,8 @@ That inlines the TypeScript code into C++ headers. Make sure you have `ccache` installed, otherwise regeneration will take much longer than it should. {% /callout %} +For more information on how `src/js` works, see `src/js/README.md` in the codebase. + ## Code generation scripts Bun leverages a lot of code generation scripts. @@ -193,7 +255,7 @@ Certain modules like `node:fs`, `node:stream`, `bun:sqlite`, and `ws` are implem When these are changed, run: ``` -$ make esm +$ make js ``` In debug builds, Bun automatically loads these from the filesystem, wherever it was compiled, so no need to re-run `make dev`. In release builds, this same behavior can be done via the environment variable `BUN_OVERRIDE_MODULE_PATH`. When set to the repository root, Bun will read from the bundled modules in the repository instead of the ones baked into the binary. @@ -244,7 +306,7 @@ For performance reasons, `make submodule` does not automatically update the WebK ```bash $ bun install -$ make regenerate-bindings +$ make cpp ``` <!-- Check the [Bun repo](https://github.com/oven-sh/bun/tree/main/src/bun.js) to get the hash of the commit of WebKit is currently being used. diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index c964c1d95..27f40eeab 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -2122,6 +2122,11 @@ pub export fn MarkedArrayBuffer_deallocator(bytes_: *anyopaque, _: *anyopaque) v // zig's memory allocator interface won't work here // mimalloc knows the size of things // but we don't + // if (comptime Environment.allow_assert) { + // std.debug.assert(mimalloc.mi_check_owned(bytes_) or + // mimalloc.mi_heap_check_owned(JSC.VirtualMachine.get().arena.heap.?, bytes_)); + // } + mimalloc.mi_free(bytes_); } @@ -3285,6 +3290,15 @@ pub const PollRef = struct { this.status = .inactive; vm.uws_event_loop.?.unref(); } + + /// From another thread, Prevent a poll from keeping the process alive. + pub fn unrefConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { + if (this.status != .active) + return; + this.status = .inactive; + vm.uws_event_loop.?.unrefConcurrently(); + } + /// Prevent a poll from keeping the process alive on the next tick. pub fn unrefOnNextTick(this: *PollRef, vm: *JSC.VirtualMachine) void { if (this.status != .active) @@ -3293,6 +3307,14 @@ pub const PollRef = struct { vm.pending_unref_counter +|= 1; } + /// From another thread, prevent a poll from keeping the process alive on the next tick. + pub fn unrefOnNextTickConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { + if (this.status != .active) + return; + this.status = .inactive; + _ = @atomicRmw(@TypeOf(vm.pending_unref_counter), &vm.pending_unref_counter, .Add, 1, .Monotonic); + } + /// Allow a poll to keep the process alive. pub fn ref(this: *PollRef, vm: *JSC.VirtualMachine) void { if (this.status != .inactive) @@ -3300,6 +3322,14 @@ pub const PollRef = struct { this.status = .active; vm.uws_event_loop.?.ref(); } + + /// Allow a poll to keep the process alive. + pub fn refConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { + if (this.status != .inactive) + return; + this.status = .active; + vm.uws_event_loop.?.refConcurrently(); + } }; const KQueueGenerationNumber = if (Environment.isMac and Environment.allow_assert) usize else u0; diff --git a/src/bun.js/bindings/BunWorkerGlobalScope.cpp b/src/bun.js/bindings/BunWorkerGlobalScope.cpp index ef1f70fdf..f78111633 100644 --- a/src/bun.js/bindings/BunWorkerGlobalScope.cpp +++ b/src/bun.js/bindings/BunWorkerGlobalScope.cpp @@ -11,4 +11,34 @@ MessagePortChannelProvider& GlobalScope::messagePortChannelProvider() { return *reinterpret_cast<MessagePortChannelProvider*>(&MessagePortChannelProviderImpl::singleton()); } + +void GlobalScope::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind) +{ + if (eventType == eventNames().messageEvent) { + auto& global = static_cast<GlobalScope&>(self); + switch (kind) { + case Add: + if (global.m_messageEventCount == 0) { + global.scriptExecutionContext()->refEventLoop(); + } + global.m_messageEventCount++; + break; + case Remove: + global.m_messageEventCount--; + if (global.m_messageEventCount == 0) { + global.scriptExecutionContext()->unrefEventLoop(); + } + break; + // I dont think clear in this context is ever called. If it is (search OnDidChangeListenerKind::Clear for the impl), + // it may actually call once per event, in a way the Remove code above would suffice. + case Clear: + if (global.m_messageEventCount > 0) { + global.scriptExecutionContext()->unrefEventLoop(); + } + global.m_messageEventCount = 0; + break; + } + } +}; + }
\ No newline at end of file diff --git a/src/bun.js/bindings/BunWorkerGlobalScope.h b/src/bun.js/bindings/BunWorkerGlobalScope.h index fff50d6ec..f8e0be52e 100644 --- a/src/bun.js/bindings/BunWorkerGlobalScope.h +++ b/src/bun.js/bindings/BunWorkerGlobalScope.h @@ -2,6 +2,7 @@ #include "root.h" +#include "EventNames.h" #include "EventTarget.h" #include "ContextDestructionObserver.h" #include "ExceptionOr.h" @@ -17,12 +18,18 @@ class MessagePortChannelProviderImpl; class GlobalScope : public RefCounted<GlobalScope>, public EventTargetWithInlineData { WTF_MAKE_ISO_ALLOCATED(GlobalScope); + uint32_t m_messageEventCount; + + static void onDidChangeListenerImpl(EventTarget&, const AtomString&, OnDidChangeListenerKind); + public: GlobalScope(ScriptExecutionContext* context) : EventTargetWithInlineData() , m_context(context) { + this->onDidChangeListener = &onDidChangeListenerImpl; } + using RefCounted::deref; using RefCounted::ref; diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp index 201fc0959..f7998c83c 100644 --- a/src/bun.js/bindings/bindings.cpp +++ b/src/bun.js/bindings/bindings.cpp @@ -3901,6 +3901,12 @@ bool JSC__VM__isEntered(JSC__VM* arg0) { return (*arg0).isEntered(); } void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1) { (*arg0).setExecutionForbidden(); } +// These may be called concurrently from another thread. +void JSC__VM__notifyNeedTermination(JSC__VM* arg0) { (*arg0).notifyNeedTermination(); } +void JSC__VM__notifyNeedDebuggerBreak(JSC__VM* arg0) { (*arg0).notifyNeedDebuggerBreak(); } +void JSC__VM__notifyNeedShellTimeoutCheck(JSC__VM* arg0) { (*arg0).notifyNeedShellTimeoutCheck(); } +void JSC__VM__notifyNeedWatchdogCheck(JSC__VM* arg0) { (*arg0).notifyNeedWatchdogCheck(); } + void JSC__VM__throwError(JSC__VM* vm_, JSC__JSGlobalObject* arg1, JSC__JSValue value) { JSC::VM& vm = *reinterpret_cast<JSC::VM*>(vm_); diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index 0b787ee42..7f081a4b4 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -5063,6 +5063,26 @@ pub const VM = extern struct { }); } + // These four functions fire VM traps. To understand what that means, see VMTraps.h for a giant explainer. + // These may be called concurrently from another thread. + + /// Fires NeedTermination Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps. + pub fn notifyNeedTermination(vm: *VM) void { + cppFn("notifyNeedTermination", .{vm}); + } + /// Fires NeedWatchdogCheck Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps. + pub fn notifyNeedWatchdogCheck(vm: *VM) void { + cppFn("notifyNeedWatchdogCheck", .{vm}); + } + /// Fires NeedDebuggerBreak Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps. + pub fn notifyNeedDebuggerBreak(vm: *VM) void { + cppFn("notifyNeedDebuggerBreak", .{vm}); + } + /// Fires NeedShellTimeoutCheck Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps. + pub fn notifyNeedShellTimeoutCheck(vm: *VM) void { + cppFn("notifyNeedShellTimeoutCheck", .{vm}); + } + pub fn isEntered(vm: *VM) bool { return cppFn("isEntered", .{ vm, @@ -5538,7 +5558,6 @@ pub const WTF = struct { // This is any alignment WTF__copyLCharsFromUCharSource(destination, source.ptr, source.len); } - }; pub const Callback = struct { diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index 63ae6c3a4..c809ddcee 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -430,6 +430,10 @@ CPP_DECL void JSC__VM__releaseWeakRefs(JSC__VM* arg0); CPP_DECL JSC__JSValue JSC__VM__runGC(JSC__VM* arg0, bool arg1); CPP_DECL void JSC__VM__setControlFlowProfiler(JSC__VM* arg0, bool arg1); CPP_DECL void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1); +CPP_DECL void JSC__VM__notifyNeedTermination(JSC__VM* arg0); +CPP_DECL void JSC__VM__notifyNeedDebuggerBreak(JSC__VM* arg0); +CPP_DECL void JSC__VM__notifyNeedShellTimeoutCheck(JSC__VM* arg0); +CPP_DECL void JSC__VM__notifyNeedWatchdogCheck(JSC__VM* arg0); CPP_DECL void JSC__VM__setExecutionTimeLimit(JSC__VM* arg0, double arg1); CPP_DECL void JSC__VM__shrinkFootprint(JSC__VM* arg0); CPP_DECL void JSC__VM__throwError(JSC__VM* arg0, JSC__JSGlobalObject* arg1, JSC__JSValue JSValue2); diff --git a/src/bun.js/bindings/headers.zig b/src/bun.js/bindings/headers.zig index 2b25c0f5b..8471cb235 100644 --- a/src/bun.js/bindings/headers.zig +++ b/src/bun.js/bindings/headers.zig @@ -324,6 +324,10 @@ pub extern fn JSC__VM__releaseWeakRefs(arg0: *bindings.VM) void; pub extern fn JSC__VM__runGC(arg0: *bindings.VM, arg1: bool) JSC__JSValue; pub extern fn JSC__VM__setControlFlowProfiler(arg0: *bindings.VM, arg1: bool) void; pub extern fn JSC__VM__setExecutionForbidden(arg0: *bindings.VM, arg1: bool) void; +pub extern fn JSC__VM__notifyNeedTermination(arg0: *bindings.VM) void; +pub extern fn JSC__VM__notifyNeedDebuggerBreak(arg0: *bindings.VM) void; +pub extern fn JSC__VM__notifyNeedShellTimeoutCheck(arg0: *bindings.VM) void; +pub extern fn JSC__VM__notifyNeedWatchdogCheck(arg0: *bindings.VM) void; pub extern fn JSC__VM__setExecutionTimeLimit(arg0: *bindings.VM, arg1: f64) void; pub extern fn JSC__VM__shrinkFootprint(arg0: *bindings.VM) void; pub extern fn JSC__VM__throwError(arg0: *bindings.VM, arg1: *bindings.JSGlobalObject, JSValue2: JSC__JSValue) void; diff --git a/src/bun.js/bindings/webcore/EventTarget.cpp b/src/bun.js/bindings/webcore/EventTarget.cpp index 9fb875595..36adac3d3 100644 --- a/src/bun.js/bindings/webcore/EventTarget.cpp +++ b/src/bun.js/bindings/webcore/EventTarget.cpp @@ -111,6 +111,9 @@ bool EventTarget::addEventListener(const AtomString& eventType, Ref<EventListene // invalidateEventListenerRegions(); eventListenersDidChange(); + if (UNLIKELY(this->onDidChangeListener)) { + this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Add); + } return true; } @@ -146,6 +149,9 @@ bool EventTarget::removeEventListener(const AtomString& eventType, EventListener if (eventNames().isWheelEventType(eventType)) invalidateEventListenerRegions(); + if (UNLIKELY(this->onDidChangeListener)) { + this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Remove); + } eventListenersDidChange(); return true; } @@ -376,6 +382,11 @@ void EventTarget::removeAllEventListeners() // if (data->eventListenerMap.contains(eventNames().wheelEvent) || data->eventListenerMap.contains(eventNames().mousewheelEvent)) // invalidateEventListenerRegions(); + if (UNLIKELY(this->onDidChangeListener)) { + for (auto& eventType : data->eventListenerMap.eventTypes()) { + this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Clear); + } + } data->eventListenerMap.clear(); eventListenersDidChange(); } diff --git a/src/bun.js/bindings/webcore/EventTarget.h b/src/bun.js/bindings/webcore/EventTarget.h index b763393d7..f5c4354ee 100644 --- a/src/bun.js/bindings/webcore/EventTarget.h +++ b/src/bun.js/bindings/webcore/EventTarget.h @@ -126,6 +126,13 @@ protected: virtual void eventListenersDidChange() {} + enum OnDidChangeListenerKind { + Add, + Remove, + Clear, + }; + WTF::Function<void(EventTarget&, const AtomString& eventName, OnDidChangeListenerKind kind)> onDidChangeListener = WTF::Function<void(EventTarget&, const AtomString& eventName, OnDidChangeListenerKind kind)>(nullptr); + private: virtual void refEventTarget() = 0; virtual void derefEventTarget() = 0; diff --git a/src/bun.js/bindings/webcore/MessagePort.cpp b/src/bun.js/bindings/webcore/MessagePort.cpp index 2d94060f1..da2bd32e8 100644 --- a/src/bun.js/bindings/webcore/MessagePort.cpp +++ b/src/bun.js/bindings/webcore/MessagePort.cpp @@ -392,10 +392,38 @@ Vector<RefPtr<MessagePort>> MessagePort::entanglePorts(ScriptExecutionContext& c }); } +void MessagePort::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind) +{ + if (eventType == eventNames().messageEvent) { + auto& port = static_cast<MessagePort&>(self); + switch (kind) { + case Add: + if (port.m_messageEventCount == 0) { + port.scriptExecutionContext()->refEventLoop(); + } + port.m_messageEventCount++; + break; + case Remove: + port.m_messageEventCount--; + if (port.m_messageEventCount == 0) { + port.scriptExecutionContext()->unrefEventLoop(); + } + break; + case Clear: + if (port.m_messageEventCount > 0) { + port.scriptExecutionContext()->unrefEventLoop(); + } + port.m_messageEventCount = 0; + break; + } + } +}; + Ref<MessagePort> MessagePort::entangle(ScriptExecutionContext& context, TransferredMessagePort&& transferredPort) { auto port = MessagePort::create(context, transferredPort.first, transferredPort.second); port->entangle(); + port->onDidChangeListener = &MessagePort::onDidChangeListenerImpl; return port; } @@ -406,7 +434,6 @@ bool MessagePort::addEventListener(const AtomString& eventType, Ref<EventListene start(); m_hasMessageEventListener = true; } - return EventTarget::addEventListener(eventType, WTFMove(listener), options); } diff --git a/src/bun.js/bindings/webcore/MessagePort.h b/src/bun.js/bindings/webcore/MessagePort.h index fe577f93e..d4532433f 100644 --- a/src/bun.js/bindings/webcore/MessagePort.h +++ b/src/bun.js/bindings/webcore/MessagePort.h @@ -141,6 +141,9 @@ private: mutable std::atomic<unsigned> m_refCount { 1 }; bool m_hasRef { false }; + + uint32_t m_messageEventCount { 0 }; + static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind); }; WebCoreOpaqueRoot root(MessagePort*); diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp index 92503bbbd..2735770c0 100644 --- a/src/bun.js/bindings/webcore/Worker.cpp +++ b/src/bun.js/bindings/webcore/Worker.cpp @@ -66,7 +66,7 @@ namespace WebCore { WTF_MAKE_ISO_ALLOCATED_IMPL(Worker); -extern "C" void WebWorker__terminate( +extern "C" void WebWorker__requestTerminate( void* worker); static Lock allWorkersLock; @@ -210,12 +210,9 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m void Worker::terminate() { - if (m_wasTerminated) { - return; - } // m_contextProxy.terminateWorkerGlobalScope(); m_wasTerminated = true; - WebWorker__terminate(impl_); + WebWorker__requestTerminate(impl_); } // const char* Worker::activeDOMObjectName() const @@ -259,9 +256,14 @@ bool Worker::hasPendingActivity() const void Worker::dispatchEvent(Event& event) { - if (m_wasTerminated) - return; + if (!m_wasTerminated) + EventTargetWithInlineData::dispatchEvent(event); +} +// The close event gets dispatched even if m_wasTerminated is true. +// This allows new wt.Worker().terminate() to actually resolve +void Worker::dispatchCloseEvent(Event& event) +{ EventTargetWithInlineData::dispatchEvent(event); } @@ -350,11 +352,10 @@ void Worker::dispatchExit(int32_t exitCode) ScriptExecutionContext::postTaskTo(ctx->identifier(), [exitCode, protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void { protectedThis->m_isOnline = false; protectedThis->m_isClosing = true; - protectedThis->setKeepAlive(false); if (protectedThis->hasEventListeners(eventNames().closeEvent)) { auto event = CloseEvent::create(exitCode == 0, static_cast<unsigned short>(exitCode), exitCode == 0 ? "Worker terminated normally"_s : "Worker exited abnormally"_s); - protectedThis->dispatchEvent(event); + protectedThis->dispatchCloseEvent(event); } }); } @@ -388,18 +389,20 @@ extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* } auto& vm = globalObject->vm(); - + vm.notifyNeedTermination(); if (JSC::JSObject* obj = JSC::jsDynamicCast<JSC::JSObject*>(globalObject->moduleLoader())) { auto id = JSC::Identifier::fromString(globalObject->vm(), "registry"_s); - if (auto* registry = JSC::jsDynamicCast<JSC::JSMap*>(obj->getIfPropertyExists(globalObject, id))) { - registry->clear(vm); + auto registryValue = obj->getIfPropertyExists(globalObject, id); + if (registryValue) { + if (auto* registry = JSC::jsDynamicCast<JSC::JSMap*>(registryValue)) { + registry->clear(vm); + } } } gcUnprotect(globalObject); vm.deleteAllCode(JSC::DeleteAllCodeEffort::PreventCollectionAndDeleteAllCode); vm.heap.reportAbandonedObjectGraph(); WTF::releaseFastMallocFreeMemoryForThisThread(); - vm.notifyNeedTermination(); vm.deferredWorkTimer->doWork(vm); } } diff --git a/src/bun.js/bindings/webcore/Worker.h b/src/bun.js/bindings/webcore/Worker.h index a296fa4a8..f7641d28d 100644 --- a/src/bun.js/bindings/webcore/Worker.h +++ b/src/bun.js/bindings/webcore/Worker.h @@ -76,6 +76,7 @@ public: const String& name() const { return m_options.name; } void dispatchEvent(Event&); + void dispatchCloseEvent(Event&); void setKeepAlive(bool); #if ENABLE(WEB_RTC) diff --git a/src/bun.js/module_loader.zig b/src/bun.js/module_loader.zig index 5d9158b58..fc31498b4 100644 --- a/src/bun.js/module_loader.zig +++ b/src/bun.js/module_loader.zig @@ -2004,12 +2004,6 @@ pub const ModuleLoader = struct { } else if (HardcodedModule.Map.getWithEql(specifier, bun.String.eqlComptime)) |hardcoded| { switch (hardcoded) { .@"bun:main" => { - defer { - if (jsc_vm.worker) |worker| { - worker.queueInitialTask(); - } - } - return ResolvedSource{ .allocator = null, .source_code = bun.String.create(jsc_vm.entry_point.source.contents), diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index 02bfe8757..ddb58f0cd 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -2346,7 +2346,7 @@ pub const Process = struct { var vm = globalObject.bunVM(); if (vm.worker) |worker| { vm.exit_handler.exit_code = code; - worker.terminate(); + worker.requestTerminate(); return; } diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index 7fa9c6690..16d7aa34c 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -5,10 +5,12 @@ const log = Output.scoped(.Worker, true); const std = @import("std"); const JSValue = JSC.JSValue; +/// Shared implementation of Web and Node `Worker` pub const WebWorker = struct { - // null when haven't started yet + /// null when haven't started yet vm: ?*JSC.VirtualMachine = null, status: Status = .start, + /// To prevent UAF, the `spin` function (aka the worker's event loop) will call deinit once this is set and properly exit the loop. requested_terminate: bool = false, execution_context_id: u32 = 0, parent_context_id: u32 = 0, @@ -20,15 +22,25 @@ pub const WebWorker = struct { arena: bun.MimallocArena = undefined, name: [:0]const u8 = "Worker", cpp_worker: *anyopaque, - allowed_to_exit: bool = false, mini: bool = false, + + /// `user_keep_alive` is the state of the user's .ref()/.unref() calls + /// if false, then the parent poll will always be unref, otherwise the worker's event loop will keep the poll alive. + user_keep_alive: bool = false, + worker_event_loop_running: bool = true, parent_poll_ref: JSC.PollRef = .{}, - initial_poll_ref: JSC.PollRef = .{}, - did_send_initial_task: bool = false, + + pub const Status = enum { + start, + starting, + running, + terminated, + }; extern fn WebWorker__dispatchExit(?*JSC.JSGlobalObject, *anyopaque, i32) void; extern fn WebWorker__dispatchOnline(this: *anyopaque, *JSC.JSGlobalObject) void; extern fn WebWorker__dispatchError(*JSC.JSGlobalObject, *anyopaque, bun.String, JSValue) void; + export fn WebWorker__getParentWorker(vm: *JSC.VirtualMachine) ?*anyopaque { var worker = vm.worker orelse return null; return worker.cpp_worker; @@ -43,31 +55,12 @@ pub const WebWorker = struct { .{worker}, ) catch { worker.deinit(); - worker.parent_poll_ref.unref(worker.parent); - worker.initial_poll_ref.unref(worker.parent); - bun.default_allocator.destroy(worker); return false; }; thread.detach(); return true; } - pub fn hasPendingActivity(this: *WebWorker) callconv(.C) bool { - JSC.markBinding(@src()); - - if (this.vm == null) { - return !this.requested_terminate; - } - - if (!this.allowed_to_exit) { - return true; - } - - var vm = this.vm.?; - - return vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0; - } - pub fn create( cpp_worker: *void, parent: *JSC.VirtualMachine, @@ -80,6 +73,7 @@ pub const WebWorker = struct { default_unref: bool, ) callconv(.C) ?*WebWorker { JSC.markBinding(@src()); + log("[{d}] WebWorker.create", .{this_context_id}); var spec_slice = specifier_str.toUTF8(bun.default_allocator); defer spec_slice.deinit(); var prev_log = parent.bundler.log; @@ -115,35 +109,15 @@ pub const WebWorker = struct { } break :brk ""; }, + .user_keep_alive = !default_unref, + .worker_event_loop_running = true, }; - worker.initial_poll_ref.ref(parent); - - if (!default_unref) { - worker.allowed_to_exit = false; - worker.parent_poll_ref.ref(parent); - } + worker.parent_poll_ref.refConcurrently(parent); return worker; } - pub fn queueInitialTask(this: *WebWorker) void { - if (this.did_send_initial_task) return; - this.did_send_initial_task = true; - - const Unref = struct { - pub fn unref(worker: *WebWorker) void { - worker.initial_poll_ref.unref(worker.parent); - } - }; - - const AnyTask = JSC.AnyTask.New(WebWorker, Unref.unref); - var any_task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM"); - any_task.* = AnyTask.init(this); - var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM"); - this.parent.eventLoop().enqueueTaskConcurrent(concurrent_task.from(any_task, .auto_deinit)); - } - pub fn startWithErrorHandling( this: *WebWorker, ) void { @@ -162,7 +136,6 @@ pub const WebWorker = struct { } if (this.requested_terminate) { - this.queueInitialTask(); this.deinit(); return; } @@ -182,12 +155,12 @@ pub const WebWorker = struct { b.configureRouter(false) catch { this.flushLogs(); - this.onTerminate(); + this.exitAndDeinit(); return; }; b.configureDefines() catch { this.flushLogs(); - this.onTerminate(); + this.exitAndDeinit(); return; }; @@ -202,8 +175,13 @@ pub const WebWorker = struct { vm.global.vm().holdAPILock(this, callback); } + /// Deinit will clean up vm and everything. + /// Early deinit may be called from caller thread, but full vm deinit will only be called within worker's thread. fn deinit(this: *WebWorker) void { + log("[{d}] deinit", .{this.execution_context_id}); + this.parent_poll_ref.unrefConcurrently(this.parent); bun.default_allocator.free(this.specifier); + bun.default_allocator.destroy(this); } fn flushLogs(this: *WebWorker) void { @@ -250,7 +228,7 @@ pub const WebWorker = struct { } fn setStatus(this: *WebWorker, status: Status) void { - log("status: {s}", .{@tagName(status)}); + log("[{d}] status: {s}", .{ this.execution_context_id, @tagName(status) }); this.status = status; } @@ -265,18 +243,15 @@ pub const WebWorker = struct { var promise = vm.loadEntryPointForWebWorker(this.specifier) catch { this.flushLogs(); - this.onTerminate(); + this.exitAndDeinit(); return; }; - this.queueInitialTask(); - if (promise.status(vm.global.vm()) == .Rejected) { vm.onUnhandledError(vm.global, promise.result(vm.global.vm())); vm.exit_handler.exit_code = 1; - this.onTerminate(); - + this.exitAndDeinit(); return; } @@ -300,89 +275,56 @@ pub const WebWorker = struct { // always doing a first tick so we call CppTask without delay after dispatchOnline vm.tick(); - { - while (true) { - while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) { - vm.tick(); - - vm.eventLoop().autoTickActive(); - } - - if (!this.allowed_to_exit) { - this.flushLogs(); - vm.eventLoop().tickPossiblyForever(); - continue; - } - - vm.onBeforeExit(); - - if (!this.allowed_to_exit) - continue; - - break; - } - - this.flushLogs(); - this.onTerminate(); + while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) { + vm.tick(); + if (this.requested_terminate) break; + vm.eventLoop().autoTickActive(); + if (this.requested_terminate) break; } - } - - pub const Status = enum { - start, - starting, - running, - terminated, - }; - pub fn terminate(this: *WebWorker) callconv(.C) void { - if (this.requested_terminate) { - return; + // Only call "beforeExit" if we weren't from a .terminate + if (!this.requested_terminate) { + // TODO: is this able to allow the event loop to continue? + vm.onBeforeExit(); } - _ = this.requestTerminate(); + + this.flushLogs(); + this.exitAndDeinit(); } + /// This is worker.ref()/.unref() from JS (Caller thread) pub fn setRef(this: *WebWorker, value: bool) callconv(.C) void { - if (this.requested_terminate and !value) { - this.parent_poll_ref.unref(this.parent); + if (this.requested_terminate) { return; } - - this.allowed_to_exit = !value; - if (this.allowed_to_exit) { - this.parent_poll_ref.unref(this.parent); - } else { + if (value) { this.parent_poll_ref.ref(this.parent); - } - - if (this.vm) |vm| { - vm.eventLoop().wakeup(); - } - } - - fn onTerminate(this: *WebWorker) void { - log("onTerminate", .{}); - - this.reallyExit(); - } - - comptime { - if (!JSC.is_bindgen) { - @export(hasPendingActivity, .{ .name = "WebWorker__hasPendingActivity" }); - @export(create, .{ .name = "WebWorker__create" }); - @export(terminate, .{ .name = "WebWorker__terminate" }); - @export(setRef, .{ .name = "WebWorker__setRef" }); - _ = WebWorker__updatePtr; + } else { + this.parent_poll_ref.unref(this.parent); } } - fn reallyExit(this: *WebWorker) void { - JSC.markBinding(@src()); - + /// Request a terminate (Called from main thread from worker.terminate(), or inside worker in process.exit()) + /// The termination will actually happen after the next tick of the worker's loop. + pub fn requestTerminate(this: *WebWorker) callconv(.C) void { if (this.requested_terminate) { return; } + log("[{d}] requestTerminate", .{this.execution_context_id}); + this.setRef(false); this.requested_terminate = true; + if (this.vm) |vm| { + vm.global.vm().notifyNeedTermination(); + vm.eventLoop().wakeup(); + } + } + /// This handles cleanup, emitting the "close" event, and deinit. + /// Only call after the VM is initialized AND on the same thread as the worker. + /// Otherwise, call `requestTerminate` to cause the event loop to safely terminate after the next tick. + fn exitAndDeinit(this: *WebWorker) void { + JSC.markBinding(@src()); + log("[{d}] exitAndDeinit", .{this.execution_context_id}); var cpp_worker = this.cpp_worker; var exit_code: i32 = 0; var globalObject: ?*JSC.JSGlobalObject = null; @@ -392,24 +334,18 @@ pub const WebWorker = struct { exit_code = vm.exit_handler.exit_code; globalObject = vm.global; this.arena.deinit(); + vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm. } WebWorker__dispatchExit(globalObject, cpp_worker, exit_code); - this.deinit(); } - fn requestTerminate(this: *WebWorker) bool { - this.setRef(false); - var vm = this.vm orelse { - this.requested_terminate = true; - return false; - }; - this.allowed_to_exit = true; - log("requesting terminate", .{}); - var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM"); - var task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM"); - task.* = JSC.AnyTask.New(WebWorker, onTerminate).init(this); - vm.eventLoop().enqueueTaskConcurrent(concurrent_task.from(task, .auto_deinit)); - return true; + comptime { + if (!JSC.is_bindgen) { + @export(create, .{ .name = "WebWorker__create" }); + @export(requestTerminate, .{ .name = "WebWorker__requestTerminate" }); + @export(setRef, .{ .name = "WebWorker__setRef" }); + _ = WebWorker__updatePtr; + } } }; diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 10f836bbc..5714eb09f 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -771,15 +771,14 @@ pub const Loop = extern struct { this.active += 1; } pub fn refConcurrently(this: *Loop) void { - log("ref", .{}); _ = @atomicRmw(@TypeOf(this.num_polls), &this.num_polls, .Add, 1, .Monotonic); _ = @atomicRmw(@TypeOf(this.active), &this.active, .Add, 1, .Monotonic); + log("refConcurrently ({d}, {d})", .{ this.num_polls, this.active }); } - pub fn unrefConcurrently(this: *Loop) void { - log("unref", .{}); _ = @atomicRmw(@TypeOf(this.num_polls), &this.num_polls, .Sub, 1, .Monotonic); _ = @atomicRmw(@TypeOf(this.active), &this.active, .Sub, 1, .Monotonic); + log("unrefConcurrently ({d}, {d})", .{ this.num_polls, this.active }); } pub fn unref(this: *Loop) void { diff --git a/src/js/README.md b/src/js/README.md index 8f270ed2a..82acf7c51 100644 --- a/src/js/README.md +++ b/src/js/README.md @@ -34,9 +34,9 @@ console.log($getInternalField) V8 has a [similar feature](https://v8.dev/blog/embedded-builtins) to this syntax (they use `%` instead) -On top of this, we have some special functions that are handled by the bundle preprocessor: +On top of this, we have some special functions that are handled by the builtin preprocessor: -- `require` works, but it must be a string literal that resolves to a module within src/js. This call gets replaced with `$requireId(id)`, which is a special function that skips the module resolver and directly loads the module by it's generated numerical ID. +- `require` works, but it must be passed a **string literal** that resolves to a module within `src/js`. This call gets replaced with `$getInternalField($internalModuleRegistery, <number>)`, which directly loads the module by it's generated numerical ID, skipping the resolver for inter-internal modules. - `$debug` is exactly like console.log, but is stripped in release builds. It is disabled by default, requiring you to pass one of: `BUN_DEBUG_MODULE_NAME=1`, `BUN_DEBUG_JS=1`, or `BUN_DEBUG_ALL=1`. You can also do `if($debug) {}` to check if debug env var is set. @@ -79,10 +79,6 @@ object->putDirectBuiltinFunction( ); ``` -## Extra Features - -`require` is replaced with `$requireId(id)` which allows these modules to import each other in a way that skips the module resolver. Being written in a syncronous format also makes this faster than ESM. All calls to `require` must be statically known or else this transformation is not possible. - ## Building Run `make js` to bundle all the builtins. The output is placed in `src/js/out/{modules,functions}/`, where these files are loaded dynamically by `bun-debug` (an exact filepath is inlined into the binary pointing at where you cloned bun, so moving the binary to another machine may not work). In a release build, these get minified and inlined into the binary (Please commit those generated headers). diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts index 1cc778fd0..f1f15b64e 100644 --- a/src/js/node/worker_threads.ts +++ b/src/js/node/worker_threads.ts @@ -204,6 +204,9 @@ const unsupportedOptions = [ class Worker extends EventEmitter { #worker: WebWorker; #performance; + + // this is used by wt.Worker.terminate(); + // either is the exit code if exited, a promise resolving to the exit code, or undefined if we havent sent .terminate() yet #onExitPromise: Promise<number> | number | undefined = undefined; constructor(filename: string, options: NodeWorkerOptions = {}) { @@ -258,8 +261,9 @@ class Worker extends EventEmitter { } terminate() { - if (this.#onExitPromise) { - return this.#onExitPromise; + var onExitPromise = this.#onExitPromise; + if (onExitPromise) { + return $isPromise(onExitPromise) ? onExitPromise : Promise.resolve(onExitPromise); } const { resolve, promise } = Promise.withResolvers(); @@ -284,9 +288,8 @@ class Worker extends EventEmitter { this.emit("exit", e.code); } - #onError(event: ErrorEvent) { - // TODO: is this right? - this.emit("error", event.error); + #onError(error: ErrorEvent) { + this.emit("error", error); } #onMessage(event: MessageEvent) { diff --git a/src/js/out/InternalModuleRegistryConstants.h b/src/js/out/InternalModuleRegistryConstants.h index f0029f6ed..67730f64d 100644 --- a/src/js/out/InternalModuleRegistryConstants.h +++ b/src/js/out/InternalModuleRegistryConstants.h @@ -189,7 +189,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const // // -static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(event){this.emit(\"error\",event.error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; +static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){var onExitPromise=this.#onExitPromise;if(onExitPromise)return @isPromise(onExitPromise)\?onExitPromise:Promise.resolve(onExitPromise);const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(error){this.emit(\"error\",error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; // // @@ -414,7 +414,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const // // -static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(event){this.emit(\"error\",event.error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; +static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){var onExitPromise=this.#onExitPromise;if(onExitPromise)return @isPromise(onExitPromise)\?onExitPromise:Promise.resolve(onExitPromise);const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(error){this.emit(\"error\",error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; // // @@ -640,7 +640,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const // // -static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(event){this.emit(\"error\",event.error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; +static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){var onExitPromise=this.#onExitPromise;if(onExitPromise)return @isPromise(onExitPromise)\?onExitPromise:Promise.resolve(onExitPromise);const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(error){this.emit(\"error\",error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s; // // diff --git a/test/bun.lockb b/test/bun.lockb Binary files differindex b717f5d1a..a4e2b2fdb 100755 --- a/test/bun.lockb +++ b/test/bun.lockb diff --git a/test/js/bun/util/main-worker-file.js b/test/js/bun/util/main-worker-file.js index 80f83ba30..2d5c3aedd 100644 --- a/test/js/bun/util/main-worker-file.js +++ b/test/js/bun/util/main-worker-file.js @@ -11,5 +11,6 @@ if (isMainThread) { }); await promise; + worker.terminate(); } diff --git a/test/js/third_party/esbuild/esbuild-test.js b/test/js/third_party/esbuild/esbuild-test.js index 09152246b..e0ce6f497 100644 --- a/test/js/third_party/esbuild/esbuild-test.js +++ b/test/js/third_party/esbuild/esbuild-test.js @@ -82,5 +82,3 @@ import { build, buildSync, transform, transformSync } from "esbuild"; throw new Error("Test failed."); } } - -process.exit(0); diff --git a/test/js/web/many-messages-event-loop.js b/test/js/web/many-messages-event-loop.js deleted file mode 100644 index 2eaba2568..000000000 --- a/test/js/web/many-messages-event-loop.js +++ /dev/null @@ -1,11 +0,0 @@ -const worker = new Worker(new URL("worker-fixture-many-messages.js", import.meta.url).href); - -worker.postMessage("initial message"); -worker.addEventListener("message", ({ data }) => { - if (data.done) { - console.log("done"); - worker.terminate(); - } else { - worker.postMessage({ i: data.i + 1 }); - } -}); diff --git a/test/js/web/many-messages-event-loop.mjs b/test/js/web/many-messages-event-loop.mjs new file mode 100644 index 000000000..deae5f791 --- /dev/null +++ b/test/js/web/many-messages-event-loop.mjs @@ -0,0 +1,11 @@ +const worker = new Worker(new URL(process.argv[2], import.meta.url)); + +worker.postMessage("initial message"); +worker.addEventListener("message", function fn({ data }) { + if (data.done) { + console.log("done"); + worker.removeEventListener("message", fn); + } else { + worker.postMessage({ i: data.i + 1 }); + } +}); diff --git a/test/js/web/worker-fixture-hang.js b/test/js/web/worker-fixture-hang.js new file mode 100644 index 000000000..4db3f65f8 --- /dev/null +++ b/test/js/web/worker-fixture-hang.js @@ -0,0 +1,3 @@ +setTimeout(() => { + process.exit(2); +}, 1000000); diff --git a/test/js/web/worker-fixture-many-messages.js b/test/js/web/worker-fixture-many-messages.js index 7a8f1d910..5bc4442a9 100644 --- a/test/js/web/worker-fixture-many-messages.js +++ b/test/js/web/worker-fixture-many-messages.js @@ -1,11 +1,11 @@ -addEventListener("message", e => { - const data = e.data; +addEventListener("message", function fn({ data }) { // console.log("worker", data); if (data === "initial message") { postMessage({ i: 0 }); } else if (data.i > 50) { postMessage({ done: true }); + removeEventListener("message", fn); } else { postMessage({ i: data.i + 1 }); } diff --git a/test/js/web/worker-fixture-many-messages2.js b/test/js/web/worker-fixture-many-messages2.js new file mode 100644 index 000000000..3c371b156 --- /dev/null +++ b/test/js/web/worker-fixture-many-messages2.js @@ -0,0 +1,12 @@ +onmessage = ({ data }) => { + // console.log("worker", data); + + if (data === "initial message") { + postMessage({ i: 0 }); + } else if (data.i > 50) { + postMessage({ done: true }); + onmessage = null; + } else { + postMessage({ i: data.i + 1 }); + } +}; diff --git a/test/js/web/worker-fixture-process-exit.js b/test/js/web/worker-fixture-process-exit.js index 0d93217d9..05746973b 100644 --- a/test/js/web/worker-fixture-process-exit.js +++ b/test/js/web/worker-fixture-process-exit.js @@ -1,3 +1,3 @@ setTimeout(() => { process.exit(2); -}, 100); +}, 10); diff --git a/test/js/web/worker-fixture-while-true.js b/test/js/web/worker-fixture-while-true.js new file mode 100644 index 000000000..ca6233960 --- /dev/null +++ b/test/js/web/worker-fixture-while-true.js @@ -0,0 +1,4 @@ +let i = 0; +while (true) { + postMessage({ i: i++ }); +} diff --git a/test/js/web/worker.test.ts b/test/js/web/worker.test.ts index 34d5f6f06..e1ab80487 100644 --- a/test/js/web/worker.test.ts +++ b/test/js/web/worker.test.ts @@ -98,9 +98,37 @@ test("sending 50 messages should just work", done => { }); }); -test("worker by default will not close the event loop", done => { +test("worker with event listeners doesnt close event loop", done => { const x = Bun.spawn({ - cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.js")], + cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.mjs"), "worker-fixture-many-messages.js"], + env: bunEnv, + stdio: ["inherit", "pipe", "inherit"], + }); + + const timer = setTimeout(() => { + x.kill(); + done(new Error("timeout")); + }, 1000); + + x.exited.then(async code => { + clearTimeout(timer); + if (code !== 0) { + done(new Error("exited with non-zero code")); + } else { + const text = await new Response(x.stdout).text(); + if (!text.includes("done")) { + console.log({ text }); + done(new Error("event loop killed early")); + } else { + done(); + } + } + }); +}); + +test("worker with event listeners doesnt close event loop 2", done => { + const x = Bun.spawn({ + cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.mjs"), "worker-fixture-many-messages2.js"], env: bunEnv, stdio: ["inherit", "pipe", "inherit"], }); @@ -146,7 +174,6 @@ test("worker_threads with process.exit", done => { }); worker.on("exit", event => { try { - console.log({ event }); expect(event).toBe(2); } catch (e) { done(e); @@ -155,19 +182,28 @@ test("worker_threads with process.exit", done => { }); }); -test.skip("worker_threads with process.exit and terminate", async () => { - const worker = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, { +test("worker_threads terminate", async () => { + const worker = new wt.Worker(new URL("worker-fixture-hang.js", import.meta.url).href, { smol: true, }); const code = await worker.terminate(); - expect(code).toBe(2); + expect(code).toBe(0); }); -test.skip("worker_threads with process.exit (delay) and terminate", async () => { +test("worker_threads with process.exit (delay) and terminate", async () => { const worker2 = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, { smol: true, }); - await Bun.sleep(100); + await Bun.sleep(200); const code2 = await worker2.terminate(); expect(code2).toBe(2); }); + +test.skip("terminating forcefully properly interrupts", async () => { + const worker2 = new wt.Worker(new URL("worker-fixture-while-true.js", import.meta.url).href, {}); + await new Promise<void>(done => { + worker2.on("message", () => done()); + }); + const code2 = await worker2.terminate(); + expect(code2).toBe(0); +}); |