aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar dave caruso <me@paperdave.net> 2023-08-12 13:51:03 -0700
committerGravatar GitHub <noreply@github.com> 2023-08-12 13:51:03 -0700
commit78defe7a87226b5b10766e24fae458a62811dab2 (patch)
tree6f81506a5556ec42c3bfc0b6333ada390bf92d1a
parentb94433ce86017dccb2e13070dcba57c11421c3ce (diff)
downloadbun-78defe7a87226b5b10766e24fae458a62811dab2.tar.gz
bun-78defe7a87226b5b10766e24fae458a62811dab2.tar.zst
bun-78defe7a87226b5b10766e24fae458a62811dab2.zip
Fix worker event loop ref/unref + leak (#4114)
* make more tests pass * worker changes * fix some bugs * remove this * progress * uh * okay * remove console log * a * comment assert for later * mergable state * remove test * remove test
-rw-r--r--CONTRIBUTING.md24
-rw-r--r--Makefile48
-rw-r--r--docs/api/workers.md42
-rw-r--r--docs/project/development.md84
-rw-r--r--src/bun.js/base.zig30
-rw-r--r--src/bun.js/bindings/BunWorkerGlobalScope.cpp30
-rw-r--r--src/bun.js/bindings/BunWorkerGlobalScope.h7
-rw-r--r--src/bun.js/bindings/bindings.cpp6
-rw-r--r--src/bun.js/bindings/bindings.zig21
-rw-r--r--src/bun.js/bindings/headers.h4
-rw-r--r--src/bun.js/bindings/headers.zig4
-rw-r--r--src/bun.js/bindings/webcore/EventTarget.cpp11
-rw-r--r--src/bun.js/bindings/webcore/EventTarget.h7
-rw-r--r--src/bun.js/bindings/webcore/MessagePort.cpp29
-rw-r--r--src/bun.js/bindings/webcore/MessagePort.h3
-rw-r--r--src/bun.js/bindings/webcore/Worker.cpp29
-rw-r--r--src/bun.js/bindings/webcore/Worker.h1
-rw-r--r--src/bun.js/module_loader.zig6
-rw-r--r--src/bun.js/node/types.zig2
-rw-r--r--src/bun.js/web_worker.zig206
-rw-r--r--src/deps/uws.zig5
-rw-r--r--src/js/README.md8
-rw-r--r--src/js/node/worker_threads.ts13
-rw-r--r--src/js/out/InternalModuleRegistryConstants.h6
-rwxr-xr-xtest/bun.lockbbin149700 -> 149737 bytes
-rw-r--r--test/js/bun/util/main-worker-file.js1
-rw-r--r--test/js/third_party/esbuild/esbuild-test.js2
-rw-r--r--test/js/web/many-messages-event-loop.js11
-rw-r--r--test/js/web/many-messages-event-loop.mjs11
-rw-r--r--test/js/web/worker-fixture-hang.js3
-rw-r--r--test/js/web/worker-fixture-many-messages.js4
-rw-r--r--test/js/web/worker-fixture-many-messages2.js12
-rw-r--r--test/js/web/worker-fixture-process-exit.js2
-rw-r--r--test/js/web/worker-fixture-while-true.js4
-rw-r--r--test/js/web/worker.test.ts52
35 files changed, 465 insertions, 263 deletions
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index fc18df32f..84394f654 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -47,33 +47,17 @@ TODO: document this (see [`bindings.zig`](src/bun.js/bindings/bindings.zig) and
Copy from examples like `Subprocess` or `Response`.
-### ESM modules
+### ESM Modules and Builtins JS
Bun implements ESM modules in a mix of native code and JavaScript.
Several Node.js modules are implemented in JavaScript and loosely based on browserify polyfills.
-The ESM modules in Bun are located in [`src/bun.js/*.exports.js`](src/bun.js/). Unlike other code in Bun, these files are NOT transpiled. They are loaded directly into the JavaScriptCore VM. That means `require` does not work in these files. Instead, you must use `import.meta.require`, or ideally, not use require/import other files at all.
+Builtin modules in Bun are located in [`src/js`](src/js/). These files are transpiled and support a JavaScriptCore-only syntax for internal slots, which is explained further in [`src/js/README.md`](src/js/README.md).
-The module loader is in [`src/bun.js/module_loader.zig`](src/bun.js/module_loader.zig).
-
-### JavaScript Builtins
-
-TODO: update this with the new build process that uses TypeScript and `$` instead of `@`.
-
-JavaScript builtins are located in [`src/js/builtins/*.ts`](src/js/builtins).
-
-These files support a JavaScriptCore-only syntax for internal slots. `@` is used to access an internal slot. For example: `new @Array(123)` will create a new `Array` similar to `new Array(123)`, except if a library modifies the `Array` global, it will not affect the internal slot (`@Array`). These names must be allow-listed in `BunBuiltinNames.h` (though JavaScriptCore allowlists some names by default).
+Native C++ modules are in `src/bun.js/modules/`.
-They can not use or reference ESM-modules. The files that end with `*Internals.js` are automatically loaded globally. Most usage of internals right now are the stream implementations (which share a lot of code from Safari/WebKit) and ImportMetaObject (which is how `require` is implemented in the runtime)
-
-To regenerate the builtins:
-
-```sh
-make clean-bindings && make generate-builtins && make bindings -j10
-```
-
-It is recommended that you have ccache installed or else you will spend a lot of time waiting for the bindings to compile.
+The module loader is in [`src/bun.js/module_loader.zig`](src/bun.js/module_loader.zig).
### Memory management in Bun's JavaScript runtime
diff --git a/Makefile b/Makefile
index 2a04a21e4..4b75fbf37 100644
--- a/Makefile
+++ b/Makefile
@@ -555,7 +555,7 @@ tinycc:
PYTHON=$(shell which python 2>/dev/null || which python3 2>/dev/null || which python2 2>/dev/null)
.PHONY: esm
-js:
+js: # to rebundle js (rebuilding binary not needed to reload js code)
NODE_ENV=production bun src/js/_codegen/index.ts
esm-debug:
@@ -660,8 +660,8 @@ else
PKGNAME_NINJA := ninja-build
endif
-.PHONY: require
-require:
+.PHONY: assert-deps
+assert-deps:
@echo "Checking if the required utilities are available..."
@if [ $(CLANG_VERSION) -lt "15" ]; then echo -e "ERROR: clang version >=15 required, found: $(CLANG_VERSION). Install with:\n\n $(POSIX_PKG_MANAGER) install llvm@15"; exit 1; fi
@cmake --version >/dev/null 2>&1 || (echo -e "ERROR: cmake is required."; exit 1)
@@ -673,6 +673,9 @@ require:
@which $(LIBTOOL) > /dev/null || (echo -e "ERROR: libtool is required. Install with:\n\n $(POSIX_PKG_MANAGER) install libtool"; exit 1)
@which ninja > /dev/null || (echo -e "ERROR: Ninja is required. Install with:\n\n $(POSIX_PKG_MANAGER) install $(PKGNAME_NINJA)"; exit 1)
@which pkg-config > /dev/null || (echo -e "ERROR: pkg-config is required. Install with:\n\n $(POSIX_PKG_MANAGER) install pkg-config"; exit 1)
+ @which rustc > /dev/null || (echo -e "ERROR: rustc is required." exit 1)
+ @which cargo > /dev/null || (echo -e "ERROR: cargo is required." exit 1)
+ @test $(shell cargo --version | awk '{print $$2}' | cut -d. -f2) -gt 57 || (echo -e "ERROR: cargo version must be at least 1.57."; exit 1)
@echo "You have the dependencies installed! Woo"
# the following allows you to run `make submodule` to update or init submodules. but we will exclude webkit
@@ -1108,9 +1111,6 @@ endif
dev-obj-linux:
$(ZIG) build obj -Dtarget=x86_64-linux-gnu -Dcpu="$(CPU_TARGET)"
-.PHONY: dev
-dev: mkdir-dev dev-obj link ## compile zig changes + link bun
-
mkdir-dev:
mkdir -p $(DEBUG_PACKAGE_DIR)
@@ -1900,26 +1900,44 @@ vendor-without-npm: node-fallbacks runtime_js fallback_decoder bun_error mimallo
vendor-without-check: npm-install vendor-without-npm
.PHONY: vendor
-vendor: require submodule vendor-without-check
+vendor: assert-deps submodule vendor-without-check
.PHONY: vendor-dev
-vendor-dev: require submodule npm-install-dev vendor-without-npm
+vendor-dev: assert-deps submodule npm-install-dev vendor-without-npm
.PHONY: bun
bun: vendor identifier-cache build-obj bun-link-lld-release bun-codesign-release-local
-.PHONY: regenerate-bindings
-regenerate-bindings: ## compile src/js/builtins + all c++ code, does not link
+.PHONY: cpp
+cpp: ## compile src/js/builtins + all c++ code then link
+ @make clean-bindings js
+ @make bindings -j$(CPU_COUNT)
+ @make link
+
+.PHONY: cpp
+cpp-no-link:
@make clean-bindings js
@make bindings -j$(CPU_COUNT)
+.PHONY: zig
+zig: ## compile zig code then link
+ @make mkdir-dev dev-obj link
+
+.PHONY: zig-no-link
+zig-no-link:
+ @make mkdir-dev dev-obj
+
+.PHONY: dev
+dev: # combo of `make cpp` and `make zig`
+ @make cpp-no-link zig-no-link -j2
+ @make link
+
.PHONY: setup
-setup: vendor-dev identifier-cache clean-bindings js
- make jsc-check
- make bindings -j$(CPU_COUNT)
+setup: vendor-dev identifier-cache clean-bindings
+ make jsc-check cpp zig link
@echo ""
- @echo "Development environment setup complete"
- @echo "Run \`make dev\` to build \`bun-debug\`"
+ @echo "First build complete!"
+ @echo "\"bun-debug\" is available at $(DEBUG_BIN)/bun-debug"
@echo ""
.PHONY: help
diff --git a/docs/api/workers.md b/docs/api/workers.md
index ba45d7cc1..73a925531 100644
--- a/docs/api/workers.md
+++ b/docs/api/workers.md
@@ -88,7 +88,7 @@ worker.addEventListener("message", event => {
## Terminating a worker
-A `Worker` instance terminate automatically when Bun's process exits. To terminate a `Worker` sooner, call `worker.terminate()`.
+A `Worker` instance terminates automatically once it's event loop has no work left to do. Attaching a `"message"` listener on the global or any `MessagePort`s will keep the event loop alive. To forcefully terminate a `Worker`, call `worker.terminate()`.
```ts
const worker = new Worker(new URL("worker.ts", import.meta.url).href);
@@ -97,18 +97,20 @@ const worker = new Worker(new URL("worker.ts", import.meta.url).href);
worker.terminate();
```
+This will cause the worker's to exit as soon as possible.
+
### `process.exit()`
-A worker can terminate itself with `process.exit()`. This does not terminate the main process. Like in Node.js, `process.on('beforeExit', callback)` and `process.on('exit', callback)` are emitted on the worker thread (and not on the main thread).
+A worker can terminate itself with `process.exit()`. This does not terminate the main process. Like in Node.js, `process.on('beforeExit', callback)` and `process.on('exit', callback)` are emitted on the worker thread (and not on the main thread), and the exit code is passed to the `"close"` event.
### `"close"`
-The `"close"` event is emitted when a worker has been terminated. It can take some time for the worker to actually terminate, so this event is emitted when the worker has been marked as terminated.
+The `"close"` event is emitted when a worker has been terminated. It can take some time for the worker to actually terminate, so this event is emitted when the worker has been marked as terminated. The `CloseEvent` will contain the exit code passed to `process.exit()`, or 0 if closed for other reasons.
```ts
const worker = new Worker(new URL("worker.ts", import.meta.url).href);
-worker.addEventListener("close", () => {
+worker.addEventListener("close", event => {
console.log("worker is being closed");
});
```
@@ -117,14 +119,27 @@ This event does not exist in browsers.
## Managing lifetime
-By default, an active `Worker` will _not_ keep the main (spawning) process alive. Once the main script finishes, the main thread will terminate, shutting down any workers it created.
+By default, an active `Worker` will keep the main (spawning) process alive, so async tasks like `setTimeout` and promises will keep the process alive. Attaching `message` listeners will also keep the `Worker` alive.
+
+### `worker.unref`
+
+To stop a running worker from keeping the process alive, call `worker.unref()`. This decouples the lifetime of the worker to the lifetime of the main process, and is equivlent to what Node.js' `worker_threads` does.
+
+```ts
+const worker = new Worker(new URL("worker.ts", import.meta.url).href);
+worker.unref();
+```
+
+Note: `worker.unref()` is not available in browers.
### `worker.ref`
-To keep the process alive until the `Worker` terminates, call `worker.ref()`. This couples the lifetime of the worker to the lifetime of the main process.
+To keep the process alive until the `Worker` terminates, call `worker.ref()`. A ref'd worker is the default behavior, and still needs something going on in the event loop (such as a `"message"` listener) for the worker to continue running.
```ts
const worker = new Worker(new URL("worker.ts", import.meta.url).href);
+worker.unref();
+// later...
worker.ref();
```
@@ -132,22 +147,11 @@ Alternatively, you can also pass an `options` object to `Worker`:
```ts
const worker = new Worker(new URL("worker.ts", import.meta.url).href, {
- ref: true,
+ ref: false,
});
```
-### `worker.unref`
-
-To stop keeping the process alive, call `worker.unref()`.
-
-```ts
-const worker = new Worker(new URL("worker.ts", import.meta.url).href);
-worker.ref();
-// ...later on
-worker.unref();
-```
-
-Note: `worker.ref()` and `worker.unref()` do not exist in browsers.
+Note: `worker.ref()` is not available in browers.
## Memory usage with `smol`
diff --git a/docs/project/development.md b/docs/project/development.md
index 2a3630a12..c635595b3 100644
--- a/docs/project/development.md
+++ b/docs/project/development.md
@@ -115,18 +115,16 @@ $ bun install -g @oven/zig
$ zigup 0.11.0-dev.4006+bf827d0b5
```
-## Building
-
-After cloning the repository, run the following command. The runs
+{% callout %}
+We last updated Zig on **July 18th, 2023**
+{% /callout %}
-```bash
-$ make setup
-```
+## First Build
-Then to build Bun:
+After cloning the repository, run the following command to run the first build. This may take a while as it will clone submodules and build dependencies.
```bash
-$ make dev
+$ make setup
```
The binary will be located at `packages/debug-bun-{platform}-{arch}/bun-debug`. It is recommended to add this to your `$PATH`. To verify the build worked, lets print the version number on the development build of Bun.
@@ -136,16 +134,78 @@ $ packages/debug-bun-*/bun-debug --version
bun 0.x.y__dev
```
+Note: `make setup` is just an alias for the following:
+
+```bash
+$ make assert-deps submodule npm-install-dev node-fallbacks runtime_js fallback_decoder bun_error mimalloc picohttp zlib boringssl libarchive lolhtml sqlite usockets uws tinycc c-ares zstd base64 cpp zig link
+```
+
+## Rebuilding
+
+Bun uses a series of make commands to rebuild parts of the codebase. The general rule for rebuilding is there is `make link` to rerun the linker, and then different make targets for different parts of the codebase. Do not pass `-j` to make as these scripts will break if run out of order, and multiple cores will be used when possible during the builds.
+
+| What changed | Run this command |
+| ------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| Zig Code | `make zig` |
+| C++ Code | `make cpp` |
+| Zig + C++ Code | `make dev` (combination of the above two) |
+| JS/TS Code in `src/js` | `make js` (in bun-debug, js is loaded from disk without a recompile). If you change the names of any file or add/remove anything, you must also run `make dev`. |
+| `*.classes.ts` | `make generate-classes dev` |
+| JSSink | `make generate-sink cpp` |
+| `src/node_fallbacks/*` | `make node-fallbacks zig` |
+| `identifier_data.zig` | `make identifier-cache zig` |
+| Code using `cppFn`/`JSC.markBinding` | `make headers` (TODO: explain explain what this is used for and why it's useful) |
+
+`make setup` cloned a bunch of submodules and built the subprojects. When a submodule is out of date, run `make submodule` to quickly reset/update all your submodules, then you can rebuild individual submodules with their respective command.
+
+| Dependency | Run this command |
+| -------------- | ---------------------------------------- |
+| WebKit | `bun install` (it is a prebuilt package) |
+| uWebSockets | `make uws` |
+| Mimalloc | `make mimalloc` |
+| PicoHTTPParser | `make picohttp` |
+| zlib | `make zlib` |
+| BoringSSL | `make boringssl` |
+| libarchive | `make libarchive` |
+| lolhtml | `make lolhtml` |
+| sqlite | `make sqlite` |
+| TinyCC | `make tinycc` |
+| c-ares | `make c-ares` |
+| zstd | `make zstd` |
+| Base64 | `make base64` |
+
+The above will probably also need Zig and/or C++ code rebuilt.
+
## VSCode
VSCode is the recommended IDE for working on Bun, as it has been configured. Once opening, you can run `Extensions: Show Recommended Extensions` to install the recommended extensions for Zig and C++. ZLS is automatically configured.
+### ZLS
+
+ZLS is the language server for Zig. The latest binary that the extension auto-updates may not function with the version of Zig that Bun uses. It may be more reliable to build ZLS from source:
+
+```bash
+$ git clone https://github.com/zigtools/zls
+$ cd zls
+$ git checkout f91ff831f4959efcb7e648dba4f0132c296d26c0
+$ zig build
+```
+
+Then add absolute paths to Zig and ZLS in your vscode config:
+
+```json
+{
+ "zig.zigPath": "/path/to/zig/install/zig",
+ "zig.zls.path": "/path/to/zls/zig-out/bin/zls"
+}
+```
+
## JavaScript builtins
When you change anything in `src/js/builtins/*` or switch branches, run this:
```bash
-$ make regenerate-bindings
+$ make js cpp
```
That inlines the TypeScript code into C++ headers.
@@ -154,6 +214,8 @@ That inlines the TypeScript code into C++ headers.
Make sure you have `ccache` installed, otherwise regeneration will take much longer than it should.
{% /callout %}
+For more information on how `src/js` works, see `src/js/README.md` in the codebase.
+
## Code generation scripts
Bun leverages a lot of code generation scripts.
@@ -193,7 +255,7 @@ Certain modules like `node:fs`, `node:stream`, `bun:sqlite`, and `ws` are implem
When these are changed, run:
```
-$ make esm
+$ make js
```
In debug builds, Bun automatically loads these from the filesystem, wherever it was compiled, so no need to re-run `make dev`. In release builds, this same behavior can be done via the environment variable `BUN_OVERRIDE_MODULE_PATH`. When set to the repository root, Bun will read from the bundled modules in the repository instead of the ones baked into the binary.
@@ -244,7 +306,7 @@ For performance reasons, `make submodule` does not automatically update the WebK
```bash
$ bun install
-$ make regenerate-bindings
+$ make cpp
```
<!-- Check the [Bun repo](https://github.com/oven-sh/bun/tree/main/src/bun.js) to get the hash of the commit of WebKit is currently being used.
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index c964c1d95..27f40eeab 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -2122,6 +2122,11 @@ pub export fn MarkedArrayBuffer_deallocator(bytes_: *anyopaque, _: *anyopaque) v
// zig's memory allocator interface won't work here
// mimalloc knows the size of things
// but we don't
+ // if (comptime Environment.allow_assert) {
+ // std.debug.assert(mimalloc.mi_check_owned(bytes_) or
+ // mimalloc.mi_heap_check_owned(JSC.VirtualMachine.get().arena.heap.?, bytes_));
+ // }
+
mimalloc.mi_free(bytes_);
}
@@ -3285,6 +3290,15 @@ pub const PollRef = struct {
this.status = .inactive;
vm.uws_event_loop.?.unref();
}
+
+ /// From another thread, Prevent a poll from keeping the process alive.
+ pub fn unrefConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
+ if (this.status != .active)
+ return;
+ this.status = .inactive;
+ vm.uws_event_loop.?.unrefConcurrently();
+ }
+
/// Prevent a poll from keeping the process alive on the next tick.
pub fn unrefOnNextTick(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .active)
@@ -3293,6 +3307,14 @@ pub const PollRef = struct {
vm.pending_unref_counter +|= 1;
}
+ /// From another thread, prevent a poll from keeping the process alive on the next tick.
+ pub fn unrefOnNextTickConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
+ if (this.status != .active)
+ return;
+ this.status = .inactive;
+ _ = @atomicRmw(@TypeOf(vm.pending_unref_counter), &vm.pending_unref_counter, .Add, 1, .Monotonic);
+ }
+
/// Allow a poll to keep the process alive.
pub fn ref(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .inactive)
@@ -3300,6 +3322,14 @@ pub const PollRef = struct {
this.status = .active;
vm.uws_event_loop.?.ref();
}
+
+ /// Allow a poll to keep the process alive.
+ pub fn refConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
+ if (this.status != .inactive)
+ return;
+ this.status = .active;
+ vm.uws_event_loop.?.refConcurrently();
+ }
};
const KQueueGenerationNumber = if (Environment.isMac and Environment.allow_assert) usize else u0;
diff --git a/src/bun.js/bindings/BunWorkerGlobalScope.cpp b/src/bun.js/bindings/BunWorkerGlobalScope.cpp
index ef1f70fdf..f78111633 100644
--- a/src/bun.js/bindings/BunWorkerGlobalScope.cpp
+++ b/src/bun.js/bindings/BunWorkerGlobalScope.cpp
@@ -11,4 +11,34 @@ MessagePortChannelProvider& GlobalScope::messagePortChannelProvider()
{
return *reinterpret_cast<MessagePortChannelProvider*>(&MessagePortChannelProviderImpl::singleton());
}
+
+void GlobalScope::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind)
+{
+ if (eventType == eventNames().messageEvent) {
+ auto& global = static_cast<GlobalScope&>(self);
+ switch (kind) {
+ case Add:
+ if (global.m_messageEventCount == 0) {
+ global.scriptExecutionContext()->refEventLoop();
+ }
+ global.m_messageEventCount++;
+ break;
+ case Remove:
+ global.m_messageEventCount--;
+ if (global.m_messageEventCount == 0) {
+ global.scriptExecutionContext()->unrefEventLoop();
+ }
+ break;
+ // I dont think clear in this context is ever called. If it is (search OnDidChangeListenerKind::Clear for the impl),
+ // it may actually call once per event, in a way the Remove code above would suffice.
+ case Clear:
+ if (global.m_messageEventCount > 0) {
+ global.scriptExecutionContext()->unrefEventLoop();
+ }
+ global.m_messageEventCount = 0;
+ break;
+ }
+ }
+};
+
} \ No newline at end of file
diff --git a/src/bun.js/bindings/BunWorkerGlobalScope.h b/src/bun.js/bindings/BunWorkerGlobalScope.h
index fff50d6ec..f8e0be52e 100644
--- a/src/bun.js/bindings/BunWorkerGlobalScope.h
+++ b/src/bun.js/bindings/BunWorkerGlobalScope.h
@@ -2,6 +2,7 @@
#include "root.h"
+#include "EventNames.h"
#include "EventTarget.h"
#include "ContextDestructionObserver.h"
#include "ExceptionOr.h"
@@ -17,12 +18,18 @@ class MessagePortChannelProviderImpl;
class GlobalScope : public RefCounted<GlobalScope>, public EventTargetWithInlineData {
WTF_MAKE_ISO_ALLOCATED(GlobalScope);
+ uint32_t m_messageEventCount;
+
+ static void onDidChangeListenerImpl(EventTarget&, const AtomString&, OnDidChangeListenerKind);
+
public:
GlobalScope(ScriptExecutionContext* context)
: EventTargetWithInlineData()
, m_context(context)
{
+ this->onDidChangeListener = &onDidChangeListenerImpl;
}
+
using RefCounted::deref;
using RefCounted::ref;
diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp
index 201fc0959..f7998c83c 100644
--- a/src/bun.js/bindings/bindings.cpp
+++ b/src/bun.js/bindings/bindings.cpp
@@ -3901,6 +3901,12 @@ bool JSC__VM__isEntered(JSC__VM* arg0) { return (*arg0).isEntered(); }
void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1) { (*arg0).setExecutionForbidden(); }
+// These may be called concurrently from another thread.
+void JSC__VM__notifyNeedTermination(JSC__VM* arg0) { (*arg0).notifyNeedTermination(); }
+void JSC__VM__notifyNeedDebuggerBreak(JSC__VM* arg0) { (*arg0).notifyNeedDebuggerBreak(); }
+void JSC__VM__notifyNeedShellTimeoutCheck(JSC__VM* arg0) { (*arg0).notifyNeedShellTimeoutCheck(); }
+void JSC__VM__notifyNeedWatchdogCheck(JSC__VM* arg0) { (*arg0).notifyNeedWatchdogCheck(); }
+
void JSC__VM__throwError(JSC__VM* vm_, JSC__JSGlobalObject* arg1, JSC__JSValue value)
{
JSC::VM& vm = *reinterpret_cast<JSC::VM*>(vm_);
diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig
index 0b787ee42..7f081a4b4 100644
--- a/src/bun.js/bindings/bindings.zig
+++ b/src/bun.js/bindings/bindings.zig
@@ -5063,6 +5063,26 @@ pub const VM = extern struct {
});
}
+ // These four functions fire VM traps. To understand what that means, see VMTraps.h for a giant explainer.
+ // These may be called concurrently from another thread.
+
+ /// Fires NeedTermination Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps.
+ pub fn notifyNeedTermination(vm: *VM) void {
+ cppFn("notifyNeedTermination", .{vm});
+ }
+ /// Fires NeedWatchdogCheck Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps.
+ pub fn notifyNeedWatchdogCheck(vm: *VM) void {
+ cppFn("notifyNeedWatchdogCheck", .{vm});
+ }
+ /// Fires NeedDebuggerBreak Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps.
+ pub fn notifyNeedDebuggerBreak(vm: *VM) void {
+ cppFn("notifyNeedDebuggerBreak", .{vm});
+ }
+ /// Fires NeedShellTimeoutCheck Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps.
+ pub fn notifyNeedShellTimeoutCheck(vm: *VM) void {
+ cppFn("notifyNeedShellTimeoutCheck", .{vm});
+ }
+
pub fn isEntered(vm: *VM) bool {
return cppFn("isEntered", .{
vm,
@@ -5538,7 +5558,6 @@ pub const WTF = struct {
// This is any alignment
WTF__copyLCharsFromUCharSource(destination, source.ptr, source.len);
}
-
};
pub const Callback = struct {
diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h
index 63ae6c3a4..c809ddcee 100644
--- a/src/bun.js/bindings/headers.h
+++ b/src/bun.js/bindings/headers.h
@@ -430,6 +430,10 @@ CPP_DECL void JSC__VM__releaseWeakRefs(JSC__VM* arg0);
CPP_DECL JSC__JSValue JSC__VM__runGC(JSC__VM* arg0, bool arg1);
CPP_DECL void JSC__VM__setControlFlowProfiler(JSC__VM* arg0, bool arg1);
CPP_DECL void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1);
+CPP_DECL void JSC__VM__notifyNeedTermination(JSC__VM* arg0);
+CPP_DECL void JSC__VM__notifyNeedDebuggerBreak(JSC__VM* arg0);
+CPP_DECL void JSC__VM__notifyNeedShellTimeoutCheck(JSC__VM* arg0);
+CPP_DECL void JSC__VM__notifyNeedWatchdogCheck(JSC__VM* arg0);
CPP_DECL void JSC__VM__setExecutionTimeLimit(JSC__VM* arg0, double arg1);
CPP_DECL void JSC__VM__shrinkFootprint(JSC__VM* arg0);
CPP_DECL void JSC__VM__throwError(JSC__VM* arg0, JSC__JSGlobalObject* arg1, JSC__JSValue JSValue2);
diff --git a/src/bun.js/bindings/headers.zig b/src/bun.js/bindings/headers.zig
index 2b25c0f5b..8471cb235 100644
--- a/src/bun.js/bindings/headers.zig
+++ b/src/bun.js/bindings/headers.zig
@@ -324,6 +324,10 @@ pub extern fn JSC__VM__releaseWeakRefs(arg0: *bindings.VM) void;
pub extern fn JSC__VM__runGC(arg0: *bindings.VM, arg1: bool) JSC__JSValue;
pub extern fn JSC__VM__setControlFlowProfiler(arg0: *bindings.VM, arg1: bool) void;
pub extern fn JSC__VM__setExecutionForbidden(arg0: *bindings.VM, arg1: bool) void;
+pub extern fn JSC__VM__notifyNeedTermination(arg0: *bindings.VM) void;
+pub extern fn JSC__VM__notifyNeedDebuggerBreak(arg0: *bindings.VM) void;
+pub extern fn JSC__VM__notifyNeedShellTimeoutCheck(arg0: *bindings.VM) void;
+pub extern fn JSC__VM__notifyNeedWatchdogCheck(arg0: *bindings.VM) void;
pub extern fn JSC__VM__setExecutionTimeLimit(arg0: *bindings.VM, arg1: f64) void;
pub extern fn JSC__VM__shrinkFootprint(arg0: *bindings.VM) void;
pub extern fn JSC__VM__throwError(arg0: *bindings.VM, arg1: *bindings.JSGlobalObject, JSValue2: JSC__JSValue) void;
diff --git a/src/bun.js/bindings/webcore/EventTarget.cpp b/src/bun.js/bindings/webcore/EventTarget.cpp
index 9fb875595..36adac3d3 100644
--- a/src/bun.js/bindings/webcore/EventTarget.cpp
+++ b/src/bun.js/bindings/webcore/EventTarget.cpp
@@ -111,6 +111,9 @@ bool EventTarget::addEventListener(const AtomString& eventType, Ref<EventListene
// invalidateEventListenerRegions();
eventListenersDidChange();
+ if (UNLIKELY(this->onDidChangeListener)) {
+ this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Add);
+ }
return true;
}
@@ -146,6 +149,9 @@ bool EventTarget::removeEventListener(const AtomString& eventType, EventListener
if (eventNames().isWheelEventType(eventType))
invalidateEventListenerRegions();
+ if (UNLIKELY(this->onDidChangeListener)) {
+ this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Remove);
+ }
eventListenersDidChange();
return true;
}
@@ -376,6 +382,11 @@ void EventTarget::removeAllEventListeners()
// if (data->eventListenerMap.contains(eventNames().wheelEvent) || data->eventListenerMap.contains(eventNames().mousewheelEvent))
// invalidateEventListenerRegions();
+ if (UNLIKELY(this->onDidChangeListener)) {
+ for (auto& eventType : data->eventListenerMap.eventTypes()) {
+ this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Clear);
+ }
+ }
data->eventListenerMap.clear();
eventListenersDidChange();
}
diff --git a/src/bun.js/bindings/webcore/EventTarget.h b/src/bun.js/bindings/webcore/EventTarget.h
index b763393d7..f5c4354ee 100644
--- a/src/bun.js/bindings/webcore/EventTarget.h
+++ b/src/bun.js/bindings/webcore/EventTarget.h
@@ -126,6 +126,13 @@ protected:
virtual void eventListenersDidChange() {}
+ enum OnDidChangeListenerKind {
+ Add,
+ Remove,
+ Clear,
+ };
+ WTF::Function<void(EventTarget&, const AtomString& eventName, OnDidChangeListenerKind kind)> onDidChangeListener = WTF::Function<void(EventTarget&, const AtomString& eventName, OnDidChangeListenerKind kind)>(nullptr);
+
private:
virtual void refEventTarget() = 0;
virtual void derefEventTarget() = 0;
diff --git a/src/bun.js/bindings/webcore/MessagePort.cpp b/src/bun.js/bindings/webcore/MessagePort.cpp
index 2d94060f1..da2bd32e8 100644
--- a/src/bun.js/bindings/webcore/MessagePort.cpp
+++ b/src/bun.js/bindings/webcore/MessagePort.cpp
@@ -392,10 +392,38 @@ Vector<RefPtr<MessagePort>> MessagePort::entanglePorts(ScriptExecutionContext& c
});
}
+void MessagePort::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind)
+{
+ if (eventType == eventNames().messageEvent) {
+ auto& port = static_cast<MessagePort&>(self);
+ switch (kind) {
+ case Add:
+ if (port.m_messageEventCount == 0) {
+ port.scriptExecutionContext()->refEventLoop();
+ }
+ port.m_messageEventCount++;
+ break;
+ case Remove:
+ port.m_messageEventCount--;
+ if (port.m_messageEventCount == 0) {
+ port.scriptExecutionContext()->unrefEventLoop();
+ }
+ break;
+ case Clear:
+ if (port.m_messageEventCount > 0) {
+ port.scriptExecutionContext()->unrefEventLoop();
+ }
+ port.m_messageEventCount = 0;
+ break;
+ }
+ }
+};
+
Ref<MessagePort> MessagePort::entangle(ScriptExecutionContext& context, TransferredMessagePort&& transferredPort)
{
auto port = MessagePort::create(context, transferredPort.first, transferredPort.second);
port->entangle();
+ port->onDidChangeListener = &MessagePort::onDidChangeListenerImpl;
return port;
}
@@ -406,7 +434,6 @@ bool MessagePort::addEventListener(const AtomString& eventType, Ref<EventListene
start();
m_hasMessageEventListener = true;
}
-
return EventTarget::addEventListener(eventType, WTFMove(listener), options);
}
diff --git a/src/bun.js/bindings/webcore/MessagePort.h b/src/bun.js/bindings/webcore/MessagePort.h
index fe577f93e..d4532433f 100644
--- a/src/bun.js/bindings/webcore/MessagePort.h
+++ b/src/bun.js/bindings/webcore/MessagePort.h
@@ -141,6 +141,9 @@ private:
mutable std::atomic<unsigned> m_refCount { 1 };
bool m_hasRef { false };
+
+ uint32_t m_messageEventCount { 0 };
+ static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind);
};
WebCoreOpaqueRoot root(MessagePort*);
diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp
index 92503bbbd..2735770c0 100644
--- a/src/bun.js/bindings/webcore/Worker.cpp
+++ b/src/bun.js/bindings/webcore/Worker.cpp
@@ -66,7 +66,7 @@ namespace WebCore {
WTF_MAKE_ISO_ALLOCATED_IMPL(Worker);
-extern "C" void WebWorker__terminate(
+extern "C" void WebWorker__requestTerminate(
void* worker);
static Lock allWorkersLock;
@@ -210,12 +210,9 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
void Worker::terminate()
{
- if (m_wasTerminated) {
- return;
- }
// m_contextProxy.terminateWorkerGlobalScope();
m_wasTerminated = true;
- WebWorker__terminate(impl_);
+ WebWorker__requestTerminate(impl_);
}
// const char* Worker::activeDOMObjectName() const
@@ -259,9 +256,14 @@ bool Worker::hasPendingActivity() const
void Worker::dispatchEvent(Event& event)
{
- if (m_wasTerminated)
- return;
+ if (!m_wasTerminated)
+ EventTargetWithInlineData::dispatchEvent(event);
+}
+// The close event gets dispatched even if m_wasTerminated is true.
+// This allows new wt.Worker().terminate() to actually resolve
+void Worker::dispatchCloseEvent(Event& event)
+{
EventTargetWithInlineData::dispatchEvent(event);
}
@@ -350,11 +352,10 @@ void Worker::dispatchExit(int32_t exitCode)
ScriptExecutionContext::postTaskTo(ctx->identifier(), [exitCode, protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void {
protectedThis->m_isOnline = false;
protectedThis->m_isClosing = true;
- protectedThis->setKeepAlive(false);
if (protectedThis->hasEventListeners(eventNames().closeEvent)) {
auto event = CloseEvent::create(exitCode == 0, static_cast<unsigned short>(exitCode), exitCode == 0 ? "Worker terminated normally"_s : "Worker exited abnormally"_s);
- protectedThis->dispatchEvent(event);
+ protectedThis->dispatchCloseEvent(event);
}
});
}
@@ -388,18 +389,20 @@ extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker*
}
auto& vm = globalObject->vm();
-
+ vm.notifyNeedTermination();
if (JSC::JSObject* obj = JSC::jsDynamicCast<JSC::JSObject*>(globalObject->moduleLoader())) {
auto id = JSC::Identifier::fromString(globalObject->vm(), "registry"_s);
- if (auto* registry = JSC::jsDynamicCast<JSC::JSMap*>(obj->getIfPropertyExists(globalObject, id))) {
- registry->clear(vm);
+ auto registryValue = obj->getIfPropertyExists(globalObject, id);
+ if (registryValue) {
+ if (auto* registry = JSC::jsDynamicCast<JSC::JSMap*>(registryValue)) {
+ registry->clear(vm);
+ }
}
}
gcUnprotect(globalObject);
vm.deleteAllCode(JSC::DeleteAllCodeEffort::PreventCollectionAndDeleteAllCode);
vm.heap.reportAbandonedObjectGraph();
WTF::releaseFastMallocFreeMemoryForThisThread();
- vm.notifyNeedTermination();
vm.deferredWorkTimer->doWork(vm);
}
}
diff --git a/src/bun.js/bindings/webcore/Worker.h b/src/bun.js/bindings/webcore/Worker.h
index a296fa4a8..f7641d28d 100644
--- a/src/bun.js/bindings/webcore/Worker.h
+++ b/src/bun.js/bindings/webcore/Worker.h
@@ -76,6 +76,7 @@ public:
const String& name() const { return m_options.name; }
void dispatchEvent(Event&);
+ void dispatchCloseEvent(Event&);
void setKeepAlive(bool);
#if ENABLE(WEB_RTC)
diff --git a/src/bun.js/module_loader.zig b/src/bun.js/module_loader.zig
index 5d9158b58..fc31498b4 100644
--- a/src/bun.js/module_loader.zig
+++ b/src/bun.js/module_loader.zig
@@ -2004,12 +2004,6 @@ pub const ModuleLoader = struct {
} else if (HardcodedModule.Map.getWithEql(specifier, bun.String.eqlComptime)) |hardcoded| {
switch (hardcoded) {
.@"bun:main" => {
- defer {
- if (jsc_vm.worker) |worker| {
- worker.queueInitialTask();
- }
- }
-
return ResolvedSource{
.allocator = null,
.source_code = bun.String.create(jsc_vm.entry_point.source.contents),
diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig
index 02bfe8757..ddb58f0cd 100644
--- a/src/bun.js/node/types.zig
+++ b/src/bun.js/node/types.zig
@@ -2346,7 +2346,7 @@ pub const Process = struct {
var vm = globalObject.bunVM();
if (vm.worker) |worker| {
vm.exit_handler.exit_code = code;
- worker.terminate();
+ worker.requestTerminate();
return;
}
diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig
index 7fa9c6690..16d7aa34c 100644
--- a/src/bun.js/web_worker.zig
+++ b/src/bun.js/web_worker.zig
@@ -5,10 +5,12 @@ const log = Output.scoped(.Worker, true);
const std = @import("std");
const JSValue = JSC.JSValue;
+/// Shared implementation of Web and Node `Worker`
pub const WebWorker = struct {
- // null when haven't started yet
+ /// null when haven't started yet
vm: ?*JSC.VirtualMachine = null,
status: Status = .start,
+ /// To prevent UAF, the `spin` function (aka the worker's event loop) will call deinit once this is set and properly exit the loop.
requested_terminate: bool = false,
execution_context_id: u32 = 0,
parent_context_id: u32 = 0,
@@ -20,15 +22,25 @@ pub const WebWorker = struct {
arena: bun.MimallocArena = undefined,
name: [:0]const u8 = "Worker",
cpp_worker: *anyopaque,
- allowed_to_exit: bool = false,
mini: bool = false,
+
+ /// `user_keep_alive` is the state of the user's .ref()/.unref() calls
+ /// if false, then the parent poll will always be unref, otherwise the worker's event loop will keep the poll alive.
+ user_keep_alive: bool = false,
+ worker_event_loop_running: bool = true,
parent_poll_ref: JSC.PollRef = .{},
- initial_poll_ref: JSC.PollRef = .{},
- did_send_initial_task: bool = false,
+
+ pub const Status = enum {
+ start,
+ starting,
+ running,
+ terminated,
+ };
extern fn WebWorker__dispatchExit(?*JSC.JSGlobalObject, *anyopaque, i32) void;
extern fn WebWorker__dispatchOnline(this: *anyopaque, *JSC.JSGlobalObject) void;
extern fn WebWorker__dispatchError(*JSC.JSGlobalObject, *anyopaque, bun.String, JSValue) void;
+
export fn WebWorker__getParentWorker(vm: *JSC.VirtualMachine) ?*anyopaque {
var worker = vm.worker orelse return null;
return worker.cpp_worker;
@@ -43,31 +55,12 @@ pub const WebWorker = struct {
.{worker},
) catch {
worker.deinit();
- worker.parent_poll_ref.unref(worker.parent);
- worker.initial_poll_ref.unref(worker.parent);
- bun.default_allocator.destroy(worker);
return false;
};
thread.detach();
return true;
}
- pub fn hasPendingActivity(this: *WebWorker) callconv(.C) bool {
- JSC.markBinding(@src());
-
- if (this.vm == null) {
- return !this.requested_terminate;
- }
-
- if (!this.allowed_to_exit) {
- return true;
- }
-
- var vm = this.vm.?;
-
- return vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0;
- }
-
pub fn create(
cpp_worker: *void,
parent: *JSC.VirtualMachine,
@@ -80,6 +73,7 @@ pub const WebWorker = struct {
default_unref: bool,
) callconv(.C) ?*WebWorker {
JSC.markBinding(@src());
+ log("[{d}] WebWorker.create", .{this_context_id});
var spec_slice = specifier_str.toUTF8(bun.default_allocator);
defer spec_slice.deinit();
var prev_log = parent.bundler.log;
@@ -115,35 +109,15 @@ pub const WebWorker = struct {
}
break :brk "";
},
+ .user_keep_alive = !default_unref,
+ .worker_event_loop_running = true,
};
- worker.initial_poll_ref.ref(parent);
-
- if (!default_unref) {
- worker.allowed_to_exit = false;
- worker.parent_poll_ref.ref(parent);
- }
+ worker.parent_poll_ref.refConcurrently(parent);
return worker;
}
- pub fn queueInitialTask(this: *WebWorker) void {
- if (this.did_send_initial_task) return;
- this.did_send_initial_task = true;
-
- const Unref = struct {
- pub fn unref(worker: *WebWorker) void {
- worker.initial_poll_ref.unref(worker.parent);
- }
- };
-
- const AnyTask = JSC.AnyTask.New(WebWorker, Unref.unref);
- var any_task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM");
- any_task.* = AnyTask.init(this);
- var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM");
- this.parent.eventLoop().enqueueTaskConcurrent(concurrent_task.from(any_task, .auto_deinit));
- }
-
pub fn startWithErrorHandling(
this: *WebWorker,
) void {
@@ -162,7 +136,6 @@ pub const WebWorker = struct {
}
if (this.requested_terminate) {
- this.queueInitialTask();
this.deinit();
return;
}
@@ -182,12 +155,12 @@ pub const WebWorker = struct {
b.configureRouter(false) catch {
this.flushLogs();
- this.onTerminate();
+ this.exitAndDeinit();
return;
};
b.configureDefines() catch {
this.flushLogs();
- this.onTerminate();
+ this.exitAndDeinit();
return;
};
@@ -202,8 +175,13 @@ pub const WebWorker = struct {
vm.global.vm().holdAPILock(this, callback);
}
+ /// Deinit will clean up vm and everything.
+ /// Early deinit may be called from caller thread, but full vm deinit will only be called within worker's thread.
fn deinit(this: *WebWorker) void {
+ log("[{d}] deinit", .{this.execution_context_id});
+ this.parent_poll_ref.unrefConcurrently(this.parent);
bun.default_allocator.free(this.specifier);
+ bun.default_allocator.destroy(this);
}
fn flushLogs(this: *WebWorker) void {
@@ -250,7 +228,7 @@ pub const WebWorker = struct {
}
fn setStatus(this: *WebWorker, status: Status) void {
- log("status: {s}", .{@tagName(status)});
+ log("[{d}] status: {s}", .{ this.execution_context_id, @tagName(status) });
this.status = status;
}
@@ -265,18 +243,15 @@ pub const WebWorker = struct {
var promise = vm.loadEntryPointForWebWorker(this.specifier) catch {
this.flushLogs();
- this.onTerminate();
+ this.exitAndDeinit();
return;
};
- this.queueInitialTask();
-
if (promise.status(vm.global.vm()) == .Rejected) {
vm.onUnhandledError(vm.global, promise.result(vm.global.vm()));
vm.exit_handler.exit_code = 1;
- this.onTerminate();
-
+ this.exitAndDeinit();
return;
}
@@ -300,89 +275,56 @@ pub const WebWorker = struct {
// always doing a first tick so we call CppTask without delay after dispatchOnline
vm.tick();
- {
- while (true) {
- while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) {
- vm.tick();
-
- vm.eventLoop().autoTickActive();
- }
-
- if (!this.allowed_to_exit) {
- this.flushLogs();
- vm.eventLoop().tickPossiblyForever();
- continue;
- }
-
- vm.onBeforeExit();
-
- if (!this.allowed_to_exit)
- continue;
-
- break;
- }
-
- this.flushLogs();
- this.onTerminate();
+ while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) {
+ vm.tick();
+ if (this.requested_terminate) break;
+ vm.eventLoop().autoTickActive();
+ if (this.requested_terminate) break;
}
- }
-
- pub const Status = enum {
- start,
- starting,
- running,
- terminated,
- };
- pub fn terminate(this: *WebWorker) callconv(.C) void {
- if (this.requested_terminate) {
- return;
+ // Only call "beforeExit" if we weren't from a .terminate
+ if (!this.requested_terminate) {
+ // TODO: is this able to allow the event loop to continue?
+ vm.onBeforeExit();
}
- _ = this.requestTerminate();
+
+ this.flushLogs();
+ this.exitAndDeinit();
}
+ /// This is worker.ref()/.unref() from JS (Caller thread)
pub fn setRef(this: *WebWorker, value: bool) callconv(.C) void {
- if (this.requested_terminate and !value) {
- this.parent_poll_ref.unref(this.parent);
+ if (this.requested_terminate) {
return;
}
-
- this.allowed_to_exit = !value;
- if (this.allowed_to_exit) {
- this.parent_poll_ref.unref(this.parent);
- } else {
+ if (value) {
this.parent_poll_ref.ref(this.parent);
- }
-
- if (this.vm) |vm| {
- vm.eventLoop().wakeup();
- }
- }
-
- fn onTerminate(this: *WebWorker) void {
- log("onTerminate", .{});
-
- this.reallyExit();
- }
-
- comptime {
- if (!JSC.is_bindgen) {
- @export(hasPendingActivity, .{ .name = "WebWorker__hasPendingActivity" });
- @export(create, .{ .name = "WebWorker__create" });
- @export(terminate, .{ .name = "WebWorker__terminate" });
- @export(setRef, .{ .name = "WebWorker__setRef" });
- _ = WebWorker__updatePtr;
+ } else {
+ this.parent_poll_ref.unref(this.parent);
}
}
- fn reallyExit(this: *WebWorker) void {
- JSC.markBinding(@src());
-
+ /// Request a terminate (Called from main thread from worker.terminate(), or inside worker in process.exit())
+ /// The termination will actually happen after the next tick of the worker's loop.
+ pub fn requestTerminate(this: *WebWorker) callconv(.C) void {
if (this.requested_terminate) {
return;
}
+ log("[{d}] requestTerminate", .{this.execution_context_id});
+ this.setRef(false);
this.requested_terminate = true;
+ if (this.vm) |vm| {
+ vm.global.vm().notifyNeedTermination();
+ vm.eventLoop().wakeup();
+ }
+ }
+ /// This handles cleanup, emitting the "close" event, and deinit.
+ /// Only call after the VM is initialized AND on the same thread as the worker.
+ /// Otherwise, call `requestTerminate` to cause the event loop to safely terminate after the next tick.
+ fn exitAndDeinit(this: *WebWorker) void {
+ JSC.markBinding(@src());
+ log("[{d}] exitAndDeinit", .{this.execution_context_id});
var cpp_worker = this.cpp_worker;
var exit_code: i32 = 0;
var globalObject: ?*JSC.JSGlobalObject = null;
@@ -392,24 +334,18 @@ pub const WebWorker = struct {
exit_code = vm.exit_handler.exit_code;
globalObject = vm.global;
this.arena.deinit();
+ vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm.
}
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);
-
this.deinit();
}
- fn requestTerminate(this: *WebWorker) bool {
- this.setRef(false);
- var vm = this.vm orelse {
- this.requested_terminate = true;
- return false;
- };
- this.allowed_to_exit = true;
- log("requesting terminate", .{});
- var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM");
- var task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM");
- task.* = JSC.AnyTask.New(WebWorker, onTerminate).init(this);
- vm.eventLoop().enqueueTaskConcurrent(concurrent_task.from(task, .auto_deinit));
- return true;
+ comptime {
+ if (!JSC.is_bindgen) {
+ @export(create, .{ .name = "WebWorker__create" });
+ @export(requestTerminate, .{ .name = "WebWorker__requestTerminate" });
+ @export(setRef, .{ .name = "WebWorker__setRef" });
+ _ = WebWorker__updatePtr;
+ }
}
};
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index 10f836bbc..5714eb09f 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -771,15 +771,14 @@ pub const Loop = extern struct {
this.active += 1;
}
pub fn refConcurrently(this: *Loop) void {
- log("ref", .{});
_ = @atomicRmw(@TypeOf(this.num_polls), &this.num_polls, .Add, 1, .Monotonic);
_ = @atomicRmw(@TypeOf(this.active), &this.active, .Add, 1, .Monotonic);
+ log("refConcurrently ({d}, {d})", .{ this.num_polls, this.active });
}
-
pub fn unrefConcurrently(this: *Loop) void {
- log("unref", .{});
_ = @atomicRmw(@TypeOf(this.num_polls), &this.num_polls, .Sub, 1, .Monotonic);
_ = @atomicRmw(@TypeOf(this.active), &this.active, .Sub, 1, .Monotonic);
+ log("unrefConcurrently ({d}, {d})", .{ this.num_polls, this.active });
}
pub fn unref(this: *Loop) void {
diff --git a/src/js/README.md b/src/js/README.md
index 8f270ed2a..82acf7c51 100644
--- a/src/js/README.md
+++ b/src/js/README.md
@@ -34,9 +34,9 @@ console.log($getInternalField)
V8 has a [similar feature](https://v8.dev/blog/embedded-builtins) to this syntax (they use `%` instead)
-On top of this, we have some special functions that are handled by the bundle preprocessor:
+On top of this, we have some special functions that are handled by the builtin preprocessor:
-- `require` works, but it must be a string literal that resolves to a module within src/js. This call gets replaced with `$requireId(id)`, which is a special function that skips the module resolver and directly loads the module by it's generated numerical ID.
+- `require` works, but it must be passed a **string literal** that resolves to a module within `src/js`. This call gets replaced with `$getInternalField($internalModuleRegistery, <number>)`, which directly loads the module by it's generated numerical ID, skipping the resolver for inter-internal modules.
- `$debug` is exactly like console.log, but is stripped in release builds. It is disabled by default, requiring you to pass one of: `BUN_DEBUG_MODULE_NAME=1`, `BUN_DEBUG_JS=1`, or `BUN_DEBUG_ALL=1`. You can also do `if($debug) {}` to check if debug env var is set.
@@ -79,10 +79,6 @@ object->putDirectBuiltinFunction(
);
```
-## Extra Features
-
-`require` is replaced with `$requireId(id)` which allows these modules to import each other in a way that skips the module resolver. Being written in a syncronous format also makes this faster than ESM. All calls to `require` must be statically known or else this transformation is not possible.
-
## Building
Run `make js` to bundle all the builtins. The output is placed in `src/js/out/{modules,functions}/`, where these files are loaded dynamically by `bun-debug` (an exact filepath is inlined into the binary pointing at where you cloned bun, so moving the binary to another machine may not work). In a release build, these get minified and inlined into the binary (Please commit those generated headers).
diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts
index 1cc778fd0..f1f15b64e 100644
--- a/src/js/node/worker_threads.ts
+++ b/src/js/node/worker_threads.ts
@@ -204,6 +204,9 @@ const unsupportedOptions = [
class Worker extends EventEmitter {
#worker: WebWorker;
#performance;
+
+ // this is used by wt.Worker.terminate();
+ // either is the exit code if exited, a promise resolving to the exit code, or undefined if we havent sent .terminate() yet
#onExitPromise: Promise<number> | number | undefined = undefined;
constructor(filename: string, options: NodeWorkerOptions = {}) {
@@ -258,8 +261,9 @@ class Worker extends EventEmitter {
}
terminate() {
- if (this.#onExitPromise) {
- return this.#onExitPromise;
+ var onExitPromise = this.#onExitPromise;
+ if (onExitPromise) {
+ return $isPromise(onExitPromise) ? onExitPromise : Promise.resolve(onExitPromise);
}
const { resolve, promise } = Promise.withResolvers();
@@ -284,9 +288,8 @@ class Worker extends EventEmitter {
this.emit("exit", e.code);
}
- #onError(event: ErrorEvent) {
- // TODO: is this right?
- this.emit("error", event.error);
+ #onError(error: ErrorEvent) {
+ this.emit("error", error);
}
#onMessage(event: MessageEvent) {
diff --git a/src/js/out/InternalModuleRegistryConstants.h b/src/js/out/InternalModuleRegistryConstants.h
index f0029f6ed..67730f64d 100644
--- a/src/js/out/InternalModuleRegistryConstants.h
+++ b/src/js/out/InternalModuleRegistryConstants.h
@@ -189,7 +189,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(event){this.emit(\"error\",event.error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){var onExitPromise=this.#onExitPromise;if(onExitPromise)return @isPromise(onExitPromise)\?onExitPromise:Promise.resolve(onExitPromise);const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(error){this.emit(\"error\",error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
@@ -414,7 +414,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(event){this.emit(\"error\",event.error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){var onExitPromise=this.#onExitPromise;if(onExitPromise)return @isPromise(onExitPromise)\?onExitPromise:Promise.resolve(onExitPromise);const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(error){this.emit(\"error\",error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
@@ -640,7 +640,7 @@ static constexpr ASCIILiteral NodeWasiCode = "(function (){\"use strict\";const
//
//
-static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){if(this.#onExitPromise)return this.#onExitPromise;const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(event){this.emit(\"error\",event.error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
+static constexpr ASCIILiteral NodeWorkerThreadsCode = "(function (){\"use strict\";var $;const EventEmitter=@getInternalField(@internalModuleRegistry,15)||@createInternalModuleById(15),{throwNotImplemented}=@getInternalField(@internalModuleRegistry,2)||@createInternalModuleById(2),{MessageChannel,BroadcastChannel,Worker:WebWorker}=globalThis,SHARE_ENV=Symbol(\"nodejs.worker_threads.SHARE_ENV\"),isMainThread=Bun.isMainThread;let[_workerData,_threadId,_receiveMessageOnPort]=globalThis[globalThis.Symbol.for('Bun.lazy')](\"worker_threads\");const emittedWarnings=new Set;function emitWarning(type,message){if(emittedWarnings.has(type))return;emittedWarnings.add(type),console.warn(\"[bun] Warning:\",message)}function injectFakeEmitter(Class){function messageEventHandler(event){return event.data}function errorEventHandler(event){return event.error}const wrappedListener=Symbol(\"wrappedListener\");function wrapped(run,listener){const callback=function(event){return listener(run(event))};return listener[wrappedListener]=callback,callback}function functionForEventType(event,listener){switch(event){case\"error\":case\"messageerror\":return wrapped(errorEventHandler,listener);default:return wrapped(messageEventHandler,listener)}}Class.prototype.on=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener)),this},Class.prototype.off=function(event,listener){if(listener)this.removeEventListener(event,listener[wrappedListener]||listener);else this.removeEventListener(event);return this},Class.prototype.once=function(event,listener){return this.addEventListener(event,functionForEventType(event,listener),{once:!0}),this};function EventClass(eventName){if(eventName===\"error\"||eventName===\"messageerror\")return ErrorEvent;return MessageEvent}Class.prototype.emit=function(event,...args){return this.dispatchEvent(new(EventClass(event))(event,...args)),this},Class.prototype.prependListener=Class.prototype.on,Class.prototype.prependOnceListener=Class.prototype.once}const _MessagePort=globalThis.MessagePort;injectFakeEmitter(_MessagePort);const MessagePort=_MessagePort;let resourceLimits={},workerData=_workerData,threadId=_threadId;function receiveMessageOnPort(port){let res=_receiveMessageOnPort(port);if(!res)return;return{message:res}}function fakeParentPort(){const fake=Object.create(MessagePort.prototype);return Object.defineProperty(fake,\"onmessage\",{get(){return self.onmessage},set(value){self.onmessage=value}}),Object.defineProperty(fake,\"onmessageerror\",{get(){return self.onmessageerror},set(value){}}),Object.defineProperty(fake,\"postMessage\",{value(...args){return self.postMessage(...args)}}),Object.defineProperty(fake,\"close\",{value(){return process.exit(0)}}),Object.defineProperty(fake,\"start\",{value(){}}),Object.defineProperty(fake,\"unref\",{value(){}}),Object.defineProperty(fake,\"ref\",{value(){}}),Object.defineProperty(fake,\"hasRef\",{value(){return!1}}),Object.defineProperty(fake,\"setEncoding\",{value(){}}),Object.defineProperty(fake,\"addEventListener\",{value:self.addEventListener.bind(self)}),Object.defineProperty(fake,\"removeEventListener\",{value:self.removeEventListener.bind(self)}),fake}let parentPort=isMainThread\?null:fakeParentPort();function getEnvironmentData(){return process.env}function setEnvironmentData(env){process.env=env}function markAsUntransferable(){throwNotImplemented(\"worker_threads.markAsUntransferable\")}function moveMessagePortToContext(){throwNotImplemented(\"worker_threads.moveMessagePortToContext\")}const unsupportedOptions=[\"eval\",\"argv\",\"execArgv\",\"stdin\",\"stdout\",\"stderr\",\"trackedUnmanagedFds\",\"resourceLimits\"];class Worker extends EventEmitter{#worker;#performance;#onExitPromise=void 0;constructor(filename,options={}){super();for(let key of unsupportedOptions)if(key in options)emitWarning(\"option.\"+key,`worker_threads.Worker option \"${key}\" is not implemented.`);this.#worker=new WebWorker(filename,options),this.#worker.addEventListener(\"close\",this.#onClose.bind(this)),this.#worker.addEventListener(\"error\",this.#onError.bind(this)),this.#worker.addEventListener(\"message\",this.#onMessage.bind(this)),this.#worker.addEventListener(\"messageerror\",this.#onMessageError.bind(this)),this.#worker.addEventListener(\"open\",this.#onOpen.bind(this))}ref(){this.#worker.ref()}unref(){this.#worker.unref()}get stdin(){return null}get stdout(){return null}get stderr(){return null}get performance(){return this.#performance\?\?={eventLoopUtilization(){return emitWarning(\"performance\",\"worker_threads.Worker.performance is not implemented.\"),{idle:0,active:0,utilization:0}}}}terminate(){var onExitPromise=this.#onExitPromise;if(onExitPromise)return @isPromise(onExitPromise)\?onExitPromise:Promise.resolve(onExitPromise);const{resolve,promise}=Promise.withResolvers();return this.#worker.addEventListener(\"close\",(event)=>{resolve(event.code)},{once:!0}),this.#worker.terminate(),this.#onExitPromise=promise}postMessage(...args){return this.#worker.postMessage(...args)}#onClose(e){this.#onExitPromise=e.code,this.emit(\"exit\",e.code)}#onError(error){this.emit(\"error\",error)}#onMessage(event){this.emit(\"message\",event.data)}#onMessageError(event){this.emit(\"messageerror\",event.error\?\?event.data\?\?event)}#onOpen(){this.emit(\"online\")}async getHeapSnapshot(){throwNotImplemented(\"worker_threads.Worker.getHeapSnapshot\")}}return $={Worker,workerData,parentPort,resourceLimits,isMainThread,MessageChannel,BroadcastChannel,MessagePort,getEnvironmentData,setEnvironmentData,getHeapSnapshot(){return{}},markAsUntransferable,moveMessagePortToContext,receiveMessageOnPort,SHARE_ENV,threadId},$})\n"_s;
//
//
diff --git a/test/bun.lockb b/test/bun.lockb
index b717f5d1a..a4e2b2fdb 100755
--- a/test/bun.lockb
+++ b/test/bun.lockb
Binary files differ
diff --git a/test/js/bun/util/main-worker-file.js b/test/js/bun/util/main-worker-file.js
index 80f83ba30..2d5c3aedd 100644
--- a/test/js/bun/util/main-worker-file.js
+++ b/test/js/bun/util/main-worker-file.js
@@ -11,5 +11,6 @@ if (isMainThread) {
});
await promise;
+
worker.terminate();
}
diff --git a/test/js/third_party/esbuild/esbuild-test.js b/test/js/third_party/esbuild/esbuild-test.js
index 09152246b..e0ce6f497 100644
--- a/test/js/third_party/esbuild/esbuild-test.js
+++ b/test/js/third_party/esbuild/esbuild-test.js
@@ -82,5 +82,3 @@ import { build, buildSync, transform, transformSync } from "esbuild";
throw new Error("Test failed.");
}
}
-
-process.exit(0);
diff --git a/test/js/web/many-messages-event-loop.js b/test/js/web/many-messages-event-loop.js
deleted file mode 100644
index 2eaba2568..000000000
--- a/test/js/web/many-messages-event-loop.js
+++ /dev/null
@@ -1,11 +0,0 @@
-const worker = new Worker(new URL("worker-fixture-many-messages.js", import.meta.url).href);
-
-worker.postMessage("initial message");
-worker.addEventListener("message", ({ data }) => {
- if (data.done) {
- console.log("done");
- worker.terminate();
- } else {
- worker.postMessage({ i: data.i + 1 });
- }
-});
diff --git a/test/js/web/many-messages-event-loop.mjs b/test/js/web/many-messages-event-loop.mjs
new file mode 100644
index 000000000..deae5f791
--- /dev/null
+++ b/test/js/web/many-messages-event-loop.mjs
@@ -0,0 +1,11 @@
+const worker = new Worker(new URL(process.argv[2], import.meta.url));
+
+worker.postMessage("initial message");
+worker.addEventListener("message", function fn({ data }) {
+ if (data.done) {
+ console.log("done");
+ worker.removeEventListener("message", fn);
+ } else {
+ worker.postMessage({ i: data.i + 1 });
+ }
+});
diff --git a/test/js/web/worker-fixture-hang.js b/test/js/web/worker-fixture-hang.js
new file mode 100644
index 000000000..4db3f65f8
--- /dev/null
+++ b/test/js/web/worker-fixture-hang.js
@@ -0,0 +1,3 @@
+setTimeout(() => {
+ process.exit(2);
+}, 1000000);
diff --git a/test/js/web/worker-fixture-many-messages.js b/test/js/web/worker-fixture-many-messages.js
index 7a8f1d910..5bc4442a9 100644
--- a/test/js/web/worker-fixture-many-messages.js
+++ b/test/js/web/worker-fixture-many-messages.js
@@ -1,11 +1,11 @@
-addEventListener("message", e => {
- const data = e.data;
+addEventListener("message", function fn({ data }) {
// console.log("worker", data);
if (data === "initial message") {
postMessage({ i: 0 });
} else if (data.i > 50) {
postMessage({ done: true });
+ removeEventListener("message", fn);
} else {
postMessage({ i: data.i + 1 });
}
diff --git a/test/js/web/worker-fixture-many-messages2.js b/test/js/web/worker-fixture-many-messages2.js
new file mode 100644
index 000000000..3c371b156
--- /dev/null
+++ b/test/js/web/worker-fixture-many-messages2.js
@@ -0,0 +1,12 @@
+onmessage = ({ data }) => {
+ // console.log("worker", data);
+
+ if (data === "initial message") {
+ postMessage({ i: 0 });
+ } else if (data.i > 50) {
+ postMessage({ done: true });
+ onmessage = null;
+ } else {
+ postMessage({ i: data.i + 1 });
+ }
+};
diff --git a/test/js/web/worker-fixture-process-exit.js b/test/js/web/worker-fixture-process-exit.js
index 0d93217d9..05746973b 100644
--- a/test/js/web/worker-fixture-process-exit.js
+++ b/test/js/web/worker-fixture-process-exit.js
@@ -1,3 +1,3 @@
setTimeout(() => {
process.exit(2);
-}, 100);
+}, 10);
diff --git a/test/js/web/worker-fixture-while-true.js b/test/js/web/worker-fixture-while-true.js
new file mode 100644
index 000000000..ca6233960
--- /dev/null
+++ b/test/js/web/worker-fixture-while-true.js
@@ -0,0 +1,4 @@
+let i = 0;
+while (true) {
+ postMessage({ i: i++ });
+}
diff --git a/test/js/web/worker.test.ts b/test/js/web/worker.test.ts
index 34d5f6f06..e1ab80487 100644
--- a/test/js/web/worker.test.ts
+++ b/test/js/web/worker.test.ts
@@ -98,9 +98,37 @@ test("sending 50 messages should just work", done => {
});
});
-test("worker by default will not close the event loop", done => {
+test("worker with event listeners doesnt close event loop", done => {
const x = Bun.spawn({
- cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.js")],
+ cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.mjs"), "worker-fixture-many-messages.js"],
+ env: bunEnv,
+ stdio: ["inherit", "pipe", "inherit"],
+ });
+
+ const timer = setTimeout(() => {
+ x.kill();
+ done(new Error("timeout"));
+ }, 1000);
+
+ x.exited.then(async code => {
+ clearTimeout(timer);
+ if (code !== 0) {
+ done(new Error("exited with non-zero code"));
+ } else {
+ const text = await new Response(x.stdout).text();
+ if (!text.includes("done")) {
+ console.log({ text });
+ done(new Error("event loop killed early"));
+ } else {
+ done();
+ }
+ }
+ });
+});
+
+test("worker with event listeners doesnt close event loop 2", done => {
+ const x = Bun.spawn({
+ cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.mjs"), "worker-fixture-many-messages2.js"],
env: bunEnv,
stdio: ["inherit", "pipe", "inherit"],
});
@@ -146,7 +174,6 @@ test("worker_threads with process.exit", done => {
});
worker.on("exit", event => {
try {
- console.log({ event });
expect(event).toBe(2);
} catch (e) {
done(e);
@@ -155,19 +182,28 @@ test("worker_threads with process.exit", done => {
});
});
-test.skip("worker_threads with process.exit and terminate", async () => {
- const worker = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
+test("worker_threads terminate", async () => {
+ const worker = new wt.Worker(new URL("worker-fixture-hang.js", import.meta.url).href, {
smol: true,
});
const code = await worker.terminate();
- expect(code).toBe(2);
+ expect(code).toBe(0);
});
-test.skip("worker_threads with process.exit (delay) and terminate", async () => {
+test("worker_threads with process.exit (delay) and terminate", async () => {
const worker2 = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
smol: true,
});
- await Bun.sleep(100);
+ await Bun.sleep(200);
const code2 = await worker2.terminate();
expect(code2).toBe(2);
});
+
+test.skip("terminating forcefully properly interrupts", async () => {
+ const worker2 = new wt.Worker(new URL("worker-fixture-while-true.js", import.meta.url).href, {});
+ await new Promise<void>(done => {
+ worker2.on("message", () => done());
+ });
+ const code2 = await worker2.terminate();
+ expect(code2).toBe(0);
+});