aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
authorGravatar dave caruso <me@paperdave.net> 2023-08-12 13:51:03 -0700
committerGravatar GitHub <noreply@github.com> 2023-08-12 13:51:03 -0700
commit78defe7a87226b5b10766e24fae458a62811dab2 (patch)
tree6f81506a5556ec42c3bfc0b6333ada390bf92d1a /src/bun.js
parentb94433ce86017dccb2e13070dcba57c11421c3ce (diff)
downloadbun-78defe7a87226b5b10766e24fae458a62811dab2.tar.gz
bun-78defe7a87226b5b10766e24fae458a62811dab2.tar.zst
bun-78defe7a87226b5b10766e24fae458a62811dab2.zip
Fix worker event loop ref/unref + leak (#4114)
* make more tests pass * worker changes * fix some bugs * remove this * progress * uh * okay * remove console log * a * comment assert for later * mergable state * remove test * remove test
Diffstat (limited to 'src/bun.js')
-rw-r--r--src/bun.js/base.zig30
-rw-r--r--src/bun.js/bindings/BunWorkerGlobalScope.cpp30
-rw-r--r--src/bun.js/bindings/BunWorkerGlobalScope.h7
-rw-r--r--src/bun.js/bindings/bindings.cpp6
-rw-r--r--src/bun.js/bindings/bindings.zig21
-rw-r--r--src/bun.js/bindings/headers.h4
-rw-r--r--src/bun.js/bindings/headers.zig4
-rw-r--r--src/bun.js/bindings/webcore/EventTarget.cpp11
-rw-r--r--src/bun.js/bindings/webcore/EventTarget.h7
-rw-r--r--src/bun.js/bindings/webcore/MessagePort.cpp29
-rw-r--r--src/bun.js/bindings/webcore/MessagePort.h3
-rw-r--r--src/bun.js/bindings/webcore/Worker.cpp29
-rw-r--r--src/bun.js/bindings/webcore/Worker.h1
-rw-r--r--src/bun.js/module_loader.zig6
-rw-r--r--src/bun.js/node/types.zig2
-rw-r--r--src/bun.js/web_worker.zig206
16 files changed, 239 insertions, 157 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index c964c1d95..27f40eeab 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -2122,6 +2122,11 @@ pub export fn MarkedArrayBuffer_deallocator(bytes_: *anyopaque, _: *anyopaque) v
// zig's memory allocator interface won't work here
// mimalloc knows the size of things
// but we don't
+ // if (comptime Environment.allow_assert) {
+ // std.debug.assert(mimalloc.mi_check_owned(bytes_) or
+ // mimalloc.mi_heap_check_owned(JSC.VirtualMachine.get().arena.heap.?, bytes_));
+ // }
+
mimalloc.mi_free(bytes_);
}
@@ -3285,6 +3290,15 @@ pub const PollRef = struct {
this.status = .inactive;
vm.uws_event_loop.?.unref();
}
+
+ /// From another thread, Prevent a poll from keeping the process alive.
+ pub fn unrefConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
+ if (this.status != .active)
+ return;
+ this.status = .inactive;
+ vm.uws_event_loop.?.unrefConcurrently();
+ }
+
/// Prevent a poll from keeping the process alive on the next tick.
pub fn unrefOnNextTick(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .active)
@@ -3293,6 +3307,14 @@ pub const PollRef = struct {
vm.pending_unref_counter +|= 1;
}
+ /// From another thread, prevent a poll from keeping the process alive on the next tick.
+ pub fn unrefOnNextTickConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
+ if (this.status != .active)
+ return;
+ this.status = .inactive;
+ _ = @atomicRmw(@TypeOf(vm.pending_unref_counter), &vm.pending_unref_counter, .Add, 1, .Monotonic);
+ }
+
/// Allow a poll to keep the process alive.
pub fn ref(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .inactive)
@@ -3300,6 +3322,14 @@ pub const PollRef = struct {
this.status = .active;
vm.uws_event_loop.?.ref();
}
+
+ /// Allow a poll to keep the process alive.
+ pub fn refConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
+ if (this.status != .inactive)
+ return;
+ this.status = .active;
+ vm.uws_event_loop.?.refConcurrently();
+ }
};
const KQueueGenerationNumber = if (Environment.isMac and Environment.allow_assert) usize else u0;
diff --git a/src/bun.js/bindings/BunWorkerGlobalScope.cpp b/src/bun.js/bindings/BunWorkerGlobalScope.cpp
index ef1f70fdf..f78111633 100644
--- a/src/bun.js/bindings/BunWorkerGlobalScope.cpp
+++ b/src/bun.js/bindings/BunWorkerGlobalScope.cpp
@@ -11,4 +11,34 @@ MessagePortChannelProvider& GlobalScope::messagePortChannelProvider()
{
return *reinterpret_cast<MessagePortChannelProvider*>(&MessagePortChannelProviderImpl::singleton());
}
+
+void GlobalScope::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind)
+{
+ if (eventType == eventNames().messageEvent) {
+ auto& global = static_cast<GlobalScope&>(self);
+ switch (kind) {
+ case Add:
+ if (global.m_messageEventCount == 0) {
+ global.scriptExecutionContext()->refEventLoop();
+ }
+ global.m_messageEventCount++;
+ break;
+ case Remove:
+ global.m_messageEventCount--;
+ if (global.m_messageEventCount == 0) {
+ global.scriptExecutionContext()->unrefEventLoop();
+ }
+ break;
+ // I dont think clear in this context is ever called. If it is (search OnDidChangeListenerKind::Clear for the impl),
+ // it may actually call once per event, in a way the Remove code above would suffice.
+ case Clear:
+ if (global.m_messageEventCount > 0) {
+ global.scriptExecutionContext()->unrefEventLoop();
+ }
+ global.m_messageEventCount = 0;
+ break;
+ }
+ }
+};
+
} \ No newline at end of file
diff --git a/src/bun.js/bindings/BunWorkerGlobalScope.h b/src/bun.js/bindings/BunWorkerGlobalScope.h
index fff50d6ec..f8e0be52e 100644
--- a/src/bun.js/bindings/BunWorkerGlobalScope.h
+++ b/src/bun.js/bindings/BunWorkerGlobalScope.h
@@ -2,6 +2,7 @@
#include "root.h"
+#include "EventNames.h"
#include "EventTarget.h"
#include "ContextDestructionObserver.h"
#include "ExceptionOr.h"
@@ -17,12 +18,18 @@ class MessagePortChannelProviderImpl;
class GlobalScope : public RefCounted<GlobalScope>, public EventTargetWithInlineData {
WTF_MAKE_ISO_ALLOCATED(GlobalScope);
+ uint32_t m_messageEventCount;
+
+ static void onDidChangeListenerImpl(EventTarget&, const AtomString&, OnDidChangeListenerKind);
+
public:
GlobalScope(ScriptExecutionContext* context)
: EventTargetWithInlineData()
, m_context(context)
{
+ this->onDidChangeListener = &onDidChangeListenerImpl;
}
+
using RefCounted::deref;
using RefCounted::ref;
diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp
index 201fc0959..f7998c83c 100644
--- a/src/bun.js/bindings/bindings.cpp
+++ b/src/bun.js/bindings/bindings.cpp
@@ -3901,6 +3901,12 @@ bool JSC__VM__isEntered(JSC__VM* arg0) { return (*arg0).isEntered(); }
void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1) { (*arg0).setExecutionForbidden(); }
+// These may be called concurrently from another thread.
+void JSC__VM__notifyNeedTermination(JSC__VM* arg0) { (*arg0).notifyNeedTermination(); }
+void JSC__VM__notifyNeedDebuggerBreak(JSC__VM* arg0) { (*arg0).notifyNeedDebuggerBreak(); }
+void JSC__VM__notifyNeedShellTimeoutCheck(JSC__VM* arg0) { (*arg0).notifyNeedShellTimeoutCheck(); }
+void JSC__VM__notifyNeedWatchdogCheck(JSC__VM* arg0) { (*arg0).notifyNeedWatchdogCheck(); }
+
void JSC__VM__throwError(JSC__VM* vm_, JSC__JSGlobalObject* arg1, JSC__JSValue value)
{
JSC::VM& vm = *reinterpret_cast<JSC::VM*>(vm_);
diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig
index 0b787ee42..7f081a4b4 100644
--- a/src/bun.js/bindings/bindings.zig
+++ b/src/bun.js/bindings/bindings.zig
@@ -5063,6 +5063,26 @@ pub const VM = extern struct {
});
}
+ // These four functions fire VM traps. To understand what that means, see VMTraps.h for a giant explainer.
+ // These may be called concurrently from another thread.
+
+ /// Fires NeedTermination Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps.
+ pub fn notifyNeedTermination(vm: *VM) void {
+ cppFn("notifyNeedTermination", .{vm});
+ }
+ /// Fires NeedWatchdogCheck Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps.
+ pub fn notifyNeedWatchdogCheck(vm: *VM) void {
+ cppFn("notifyNeedWatchdogCheck", .{vm});
+ }
+ /// Fires NeedDebuggerBreak Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps.
+ pub fn notifyNeedDebuggerBreak(vm: *VM) void {
+ cppFn("notifyNeedDebuggerBreak", .{vm});
+ }
+ /// Fires NeedShellTimeoutCheck Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps.
+ pub fn notifyNeedShellTimeoutCheck(vm: *VM) void {
+ cppFn("notifyNeedShellTimeoutCheck", .{vm});
+ }
+
pub fn isEntered(vm: *VM) bool {
return cppFn("isEntered", .{
vm,
@@ -5538,7 +5558,6 @@ pub const WTF = struct {
// This is any alignment
WTF__copyLCharsFromUCharSource(destination, source.ptr, source.len);
}
-
};
pub const Callback = struct {
diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h
index 63ae6c3a4..c809ddcee 100644
--- a/src/bun.js/bindings/headers.h
+++ b/src/bun.js/bindings/headers.h
@@ -430,6 +430,10 @@ CPP_DECL void JSC__VM__releaseWeakRefs(JSC__VM* arg0);
CPP_DECL JSC__JSValue JSC__VM__runGC(JSC__VM* arg0, bool arg1);
CPP_DECL void JSC__VM__setControlFlowProfiler(JSC__VM* arg0, bool arg1);
CPP_DECL void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1);
+CPP_DECL void JSC__VM__notifyNeedTermination(JSC__VM* arg0);
+CPP_DECL void JSC__VM__notifyNeedDebuggerBreak(JSC__VM* arg0);
+CPP_DECL void JSC__VM__notifyNeedShellTimeoutCheck(JSC__VM* arg0);
+CPP_DECL void JSC__VM__notifyNeedWatchdogCheck(JSC__VM* arg0);
CPP_DECL void JSC__VM__setExecutionTimeLimit(JSC__VM* arg0, double arg1);
CPP_DECL void JSC__VM__shrinkFootprint(JSC__VM* arg0);
CPP_DECL void JSC__VM__throwError(JSC__VM* arg0, JSC__JSGlobalObject* arg1, JSC__JSValue JSValue2);
diff --git a/src/bun.js/bindings/headers.zig b/src/bun.js/bindings/headers.zig
index 2b25c0f5b..8471cb235 100644
--- a/src/bun.js/bindings/headers.zig
+++ b/src/bun.js/bindings/headers.zig
@@ -324,6 +324,10 @@ pub extern fn JSC__VM__releaseWeakRefs(arg0: *bindings.VM) void;
pub extern fn JSC__VM__runGC(arg0: *bindings.VM, arg1: bool) JSC__JSValue;
pub extern fn JSC__VM__setControlFlowProfiler(arg0: *bindings.VM, arg1: bool) void;
pub extern fn JSC__VM__setExecutionForbidden(arg0: *bindings.VM, arg1: bool) void;
+pub extern fn JSC__VM__notifyNeedTermination(arg0: *bindings.VM) void;
+pub extern fn JSC__VM__notifyNeedDebuggerBreak(arg0: *bindings.VM) void;
+pub extern fn JSC__VM__notifyNeedShellTimeoutCheck(arg0: *bindings.VM) void;
+pub extern fn JSC__VM__notifyNeedWatchdogCheck(arg0: *bindings.VM) void;
pub extern fn JSC__VM__setExecutionTimeLimit(arg0: *bindings.VM, arg1: f64) void;
pub extern fn JSC__VM__shrinkFootprint(arg0: *bindings.VM) void;
pub extern fn JSC__VM__throwError(arg0: *bindings.VM, arg1: *bindings.JSGlobalObject, JSValue2: JSC__JSValue) void;
diff --git a/src/bun.js/bindings/webcore/EventTarget.cpp b/src/bun.js/bindings/webcore/EventTarget.cpp
index 9fb875595..36adac3d3 100644
--- a/src/bun.js/bindings/webcore/EventTarget.cpp
+++ b/src/bun.js/bindings/webcore/EventTarget.cpp
@@ -111,6 +111,9 @@ bool EventTarget::addEventListener(const AtomString& eventType, Ref<EventListene
// invalidateEventListenerRegions();
eventListenersDidChange();
+ if (UNLIKELY(this->onDidChangeListener)) {
+ this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Add);
+ }
return true;
}
@@ -146,6 +149,9 @@ bool EventTarget::removeEventListener(const AtomString& eventType, EventListener
if (eventNames().isWheelEventType(eventType))
invalidateEventListenerRegions();
+ if (UNLIKELY(this->onDidChangeListener)) {
+ this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Remove);
+ }
eventListenersDidChange();
return true;
}
@@ -376,6 +382,11 @@ void EventTarget::removeAllEventListeners()
// if (data->eventListenerMap.contains(eventNames().wheelEvent) || data->eventListenerMap.contains(eventNames().mousewheelEvent))
// invalidateEventListenerRegions();
+ if (UNLIKELY(this->onDidChangeListener)) {
+ for (auto& eventType : data->eventListenerMap.eventTypes()) {
+ this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Clear);
+ }
+ }
data->eventListenerMap.clear();
eventListenersDidChange();
}
diff --git a/src/bun.js/bindings/webcore/EventTarget.h b/src/bun.js/bindings/webcore/EventTarget.h
index b763393d7..f5c4354ee 100644
--- a/src/bun.js/bindings/webcore/EventTarget.h
+++ b/src/bun.js/bindings/webcore/EventTarget.h
@@ -126,6 +126,13 @@ protected:
virtual void eventListenersDidChange() {}
+ enum OnDidChangeListenerKind {
+ Add,
+ Remove,
+ Clear,
+ };
+ WTF::Function<void(EventTarget&, const AtomString& eventName, OnDidChangeListenerKind kind)> onDidChangeListener = WTF::Function<void(EventTarget&, const AtomString& eventName, OnDidChangeListenerKind kind)>(nullptr);
+
private:
virtual void refEventTarget() = 0;
virtual void derefEventTarget() = 0;
diff --git a/src/bun.js/bindings/webcore/MessagePort.cpp b/src/bun.js/bindings/webcore/MessagePort.cpp
index 2d94060f1..da2bd32e8 100644
--- a/src/bun.js/bindings/webcore/MessagePort.cpp
+++ b/src/bun.js/bindings/webcore/MessagePort.cpp
@@ -392,10 +392,38 @@ Vector<RefPtr<MessagePort>> MessagePort::entanglePorts(ScriptExecutionContext& c
});
}
+void MessagePort::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind)
+{
+ if (eventType == eventNames().messageEvent) {
+ auto& port = static_cast<MessagePort&>(self);
+ switch (kind) {
+ case Add:
+ if (port.m_messageEventCount == 0) {
+ port.scriptExecutionContext()->refEventLoop();
+ }
+ port.m_messageEventCount++;
+ break;
+ case Remove:
+ port.m_messageEventCount--;
+ if (port.m_messageEventCount == 0) {
+ port.scriptExecutionContext()->unrefEventLoop();
+ }
+ break;
+ case Clear:
+ if (port.m_messageEventCount > 0) {
+ port.scriptExecutionContext()->unrefEventLoop();
+ }
+ port.m_messageEventCount = 0;
+ break;
+ }
+ }
+};
+
Ref<MessagePort> MessagePort::entangle(ScriptExecutionContext& context, TransferredMessagePort&& transferredPort)
{
auto port = MessagePort::create(context, transferredPort.first, transferredPort.second);
port->entangle();
+ port->onDidChangeListener = &MessagePort::onDidChangeListenerImpl;
return port;
}
@@ -406,7 +434,6 @@ bool MessagePort::addEventListener(const AtomString& eventType, Ref<EventListene
start();
m_hasMessageEventListener = true;
}
-
return EventTarget::addEventListener(eventType, WTFMove(listener), options);
}
diff --git a/src/bun.js/bindings/webcore/MessagePort.h b/src/bun.js/bindings/webcore/MessagePort.h
index fe577f93e..d4532433f 100644
--- a/src/bun.js/bindings/webcore/MessagePort.h
+++ b/src/bun.js/bindings/webcore/MessagePort.h
@@ -141,6 +141,9 @@ private:
mutable std::atomic<unsigned> m_refCount { 1 };
bool m_hasRef { false };
+
+ uint32_t m_messageEventCount { 0 };
+ static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind);
};
WebCoreOpaqueRoot root(MessagePort*);
diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp
index 92503bbbd..2735770c0 100644
--- a/src/bun.js/bindings/webcore/Worker.cpp
+++ b/src/bun.js/bindings/webcore/Worker.cpp
@@ -66,7 +66,7 @@ namespace WebCore {
WTF_MAKE_ISO_ALLOCATED_IMPL(Worker);
-extern "C" void WebWorker__terminate(
+extern "C" void WebWorker__requestTerminate(
void* worker);
static Lock allWorkersLock;
@@ -210,12 +210,9 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
void Worker::terminate()
{
- if (m_wasTerminated) {
- return;
- }
// m_contextProxy.terminateWorkerGlobalScope();
m_wasTerminated = true;
- WebWorker__terminate(impl_);
+ WebWorker__requestTerminate(impl_);
}
// const char* Worker::activeDOMObjectName() const
@@ -259,9 +256,14 @@ bool Worker::hasPendingActivity() const
void Worker::dispatchEvent(Event& event)
{
- if (m_wasTerminated)
- return;
+ if (!m_wasTerminated)
+ EventTargetWithInlineData::dispatchEvent(event);
+}
+// The close event gets dispatched even if m_wasTerminated is true.
+// This allows new wt.Worker().terminate() to actually resolve
+void Worker::dispatchCloseEvent(Event& event)
+{
EventTargetWithInlineData::dispatchEvent(event);
}
@@ -350,11 +352,10 @@ void Worker::dispatchExit(int32_t exitCode)
ScriptExecutionContext::postTaskTo(ctx->identifier(), [exitCode, protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void {
protectedThis->m_isOnline = false;
protectedThis->m_isClosing = true;
- protectedThis->setKeepAlive(false);
if (protectedThis->hasEventListeners(eventNames().closeEvent)) {
auto event = CloseEvent::create(exitCode == 0, static_cast<unsigned short>(exitCode), exitCode == 0 ? "Worker terminated normally"_s : "Worker exited abnormally"_s);
- protectedThis->dispatchEvent(event);
+ protectedThis->dispatchCloseEvent(event);
}
});
}
@@ -388,18 +389,20 @@ extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker*
}
auto& vm = globalObject->vm();
-
+ vm.notifyNeedTermination();
if (JSC::JSObject* obj = JSC::jsDynamicCast<JSC::JSObject*>(globalObject->moduleLoader())) {
auto id = JSC::Identifier::fromString(globalObject->vm(), "registry"_s);
- if (auto* registry = JSC::jsDynamicCast<JSC::JSMap*>(obj->getIfPropertyExists(globalObject, id))) {
- registry->clear(vm);
+ auto registryValue = obj->getIfPropertyExists(globalObject, id);
+ if (registryValue) {
+ if (auto* registry = JSC::jsDynamicCast<JSC::JSMap*>(registryValue)) {
+ registry->clear(vm);
+ }
}
}
gcUnprotect(globalObject);
vm.deleteAllCode(JSC::DeleteAllCodeEffort::PreventCollectionAndDeleteAllCode);
vm.heap.reportAbandonedObjectGraph();
WTF::releaseFastMallocFreeMemoryForThisThread();
- vm.notifyNeedTermination();
vm.deferredWorkTimer->doWork(vm);
}
}
diff --git a/src/bun.js/bindings/webcore/Worker.h b/src/bun.js/bindings/webcore/Worker.h
index a296fa4a8..f7641d28d 100644
--- a/src/bun.js/bindings/webcore/Worker.h
+++ b/src/bun.js/bindings/webcore/Worker.h
@@ -76,6 +76,7 @@ public:
const String& name() const { return m_options.name; }
void dispatchEvent(Event&);
+ void dispatchCloseEvent(Event&);
void setKeepAlive(bool);
#if ENABLE(WEB_RTC)
diff --git a/src/bun.js/module_loader.zig b/src/bun.js/module_loader.zig
index 5d9158b58..fc31498b4 100644
--- a/src/bun.js/module_loader.zig
+++ b/src/bun.js/module_loader.zig
@@ -2004,12 +2004,6 @@ pub const ModuleLoader = struct {
} else if (HardcodedModule.Map.getWithEql(specifier, bun.String.eqlComptime)) |hardcoded| {
switch (hardcoded) {
.@"bun:main" => {
- defer {
- if (jsc_vm.worker) |worker| {
- worker.queueInitialTask();
- }
- }
-
return ResolvedSource{
.allocator = null,
.source_code = bun.String.create(jsc_vm.entry_point.source.contents),
diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig
index 02bfe8757..ddb58f0cd 100644
--- a/src/bun.js/node/types.zig
+++ b/src/bun.js/node/types.zig
@@ -2346,7 +2346,7 @@ pub const Process = struct {
var vm = globalObject.bunVM();
if (vm.worker) |worker| {
vm.exit_handler.exit_code = code;
- worker.terminate();
+ worker.requestTerminate();
return;
}
diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig
index 7fa9c6690..16d7aa34c 100644
--- a/src/bun.js/web_worker.zig
+++ b/src/bun.js/web_worker.zig
@@ -5,10 +5,12 @@ const log = Output.scoped(.Worker, true);
const std = @import("std");
const JSValue = JSC.JSValue;
+/// Shared implementation of Web and Node `Worker`
pub const WebWorker = struct {
- // null when haven't started yet
+ /// null when haven't started yet
vm: ?*JSC.VirtualMachine = null,
status: Status = .start,
+ /// To prevent UAF, the `spin` function (aka the worker's event loop) will call deinit once this is set and properly exit the loop.
requested_terminate: bool = false,
execution_context_id: u32 = 0,
parent_context_id: u32 = 0,
@@ -20,15 +22,25 @@ pub const WebWorker = struct {
arena: bun.MimallocArena = undefined,
name: [:0]const u8 = "Worker",
cpp_worker: *anyopaque,
- allowed_to_exit: bool = false,
mini: bool = false,
+
+ /// `user_keep_alive` is the state of the user's .ref()/.unref() calls
+ /// if false, then the parent poll will always be unref, otherwise the worker's event loop will keep the poll alive.
+ user_keep_alive: bool = false,
+ worker_event_loop_running: bool = true,
parent_poll_ref: JSC.PollRef = .{},
- initial_poll_ref: JSC.PollRef = .{},
- did_send_initial_task: bool = false,
+
+ pub const Status = enum {
+ start,
+ starting,
+ running,
+ terminated,
+ };
extern fn WebWorker__dispatchExit(?*JSC.JSGlobalObject, *anyopaque, i32) void;
extern fn WebWorker__dispatchOnline(this: *anyopaque, *JSC.JSGlobalObject) void;
extern fn WebWorker__dispatchError(*JSC.JSGlobalObject, *anyopaque, bun.String, JSValue) void;
+
export fn WebWorker__getParentWorker(vm: *JSC.VirtualMachine) ?*anyopaque {
var worker = vm.worker orelse return null;
return worker.cpp_worker;
@@ -43,31 +55,12 @@ pub const WebWorker = struct {
.{worker},
) catch {
worker.deinit();
- worker.parent_poll_ref.unref(worker.parent);
- worker.initial_poll_ref.unref(worker.parent);
- bun.default_allocator.destroy(worker);
return false;
};
thread.detach();
return true;
}
- pub fn hasPendingActivity(this: *WebWorker) callconv(.C) bool {
- JSC.markBinding(@src());
-
- if (this.vm == null) {
- return !this.requested_terminate;
- }
-
- if (!this.allowed_to_exit) {
- return true;
- }
-
- var vm = this.vm.?;
-
- return vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0;
- }
-
pub fn create(
cpp_worker: *void,
parent: *JSC.VirtualMachine,
@@ -80,6 +73,7 @@ pub const WebWorker = struct {
default_unref: bool,
) callconv(.C) ?*WebWorker {
JSC.markBinding(@src());
+ log("[{d}] WebWorker.create", .{this_context_id});
var spec_slice = specifier_str.toUTF8(bun.default_allocator);
defer spec_slice.deinit();
var prev_log = parent.bundler.log;
@@ -115,35 +109,15 @@ pub const WebWorker = struct {
}
break :brk "";
},
+ .user_keep_alive = !default_unref,
+ .worker_event_loop_running = true,
};
- worker.initial_poll_ref.ref(parent);
-
- if (!default_unref) {
- worker.allowed_to_exit = false;
- worker.parent_poll_ref.ref(parent);
- }
+ worker.parent_poll_ref.refConcurrently(parent);
return worker;
}
- pub fn queueInitialTask(this: *WebWorker) void {
- if (this.did_send_initial_task) return;
- this.did_send_initial_task = true;
-
- const Unref = struct {
- pub fn unref(worker: *WebWorker) void {
- worker.initial_poll_ref.unref(worker.parent);
- }
- };
-
- const AnyTask = JSC.AnyTask.New(WebWorker, Unref.unref);
- var any_task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM");
- any_task.* = AnyTask.init(this);
- var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM");
- this.parent.eventLoop().enqueueTaskConcurrent(concurrent_task.from(any_task, .auto_deinit));
- }
-
pub fn startWithErrorHandling(
this: *WebWorker,
) void {
@@ -162,7 +136,6 @@ pub const WebWorker = struct {
}
if (this.requested_terminate) {
- this.queueInitialTask();
this.deinit();
return;
}
@@ -182,12 +155,12 @@ pub const WebWorker = struct {
b.configureRouter(false) catch {
this.flushLogs();
- this.onTerminate();
+ this.exitAndDeinit();
return;
};
b.configureDefines() catch {
this.flushLogs();
- this.onTerminate();
+ this.exitAndDeinit();
return;
};
@@ -202,8 +175,13 @@ pub const WebWorker = struct {
vm.global.vm().holdAPILock(this, callback);
}
+ /// Deinit will clean up vm and everything.
+ /// Early deinit may be called from caller thread, but full vm deinit will only be called within worker's thread.
fn deinit(this: *WebWorker) void {
+ log("[{d}] deinit", .{this.execution_context_id});
+ this.parent_poll_ref.unrefConcurrently(this.parent);
bun.default_allocator.free(this.specifier);
+ bun.default_allocator.destroy(this);
}
fn flushLogs(this: *WebWorker) void {
@@ -250,7 +228,7 @@ pub const WebWorker = struct {
}
fn setStatus(this: *WebWorker, status: Status) void {
- log("status: {s}", .{@tagName(status)});
+ log("[{d}] status: {s}", .{ this.execution_context_id, @tagName(status) });
this.status = status;
}
@@ -265,18 +243,15 @@ pub const WebWorker = struct {
var promise = vm.loadEntryPointForWebWorker(this.specifier) catch {
this.flushLogs();
- this.onTerminate();
+ this.exitAndDeinit();
return;
};
- this.queueInitialTask();
-
if (promise.status(vm.global.vm()) == .Rejected) {
vm.onUnhandledError(vm.global, promise.result(vm.global.vm()));
vm.exit_handler.exit_code = 1;
- this.onTerminate();
-
+ this.exitAndDeinit();
return;
}
@@ -300,89 +275,56 @@ pub const WebWorker = struct {
// always doing a first tick so we call CppTask without delay after dispatchOnline
vm.tick();
- {
- while (true) {
- while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) {
- vm.tick();
-
- vm.eventLoop().autoTickActive();
- }
-
- if (!this.allowed_to_exit) {
- this.flushLogs();
- vm.eventLoop().tickPossiblyForever();
- continue;
- }
-
- vm.onBeforeExit();
-
- if (!this.allowed_to_exit)
- continue;
-
- break;
- }
-
- this.flushLogs();
- this.onTerminate();
+ while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) {
+ vm.tick();
+ if (this.requested_terminate) break;
+ vm.eventLoop().autoTickActive();
+ if (this.requested_terminate) break;
}
- }
-
- pub const Status = enum {
- start,
- starting,
- running,
- terminated,
- };
- pub fn terminate(this: *WebWorker) callconv(.C) void {
- if (this.requested_terminate) {
- return;
+ // Only call "beforeExit" if we weren't from a .terminate
+ if (!this.requested_terminate) {
+ // TODO: is this able to allow the event loop to continue?
+ vm.onBeforeExit();
}
- _ = this.requestTerminate();
+
+ this.flushLogs();
+ this.exitAndDeinit();
}
+ /// This is worker.ref()/.unref() from JS (Caller thread)
pub fn setRef(this: *WebWorker, value: bool) callconv(.C) void {
- if (this.requested_terminate and !value) {
- this.parent_poll_ref.unref(this.parent);
+ if (this.requested_terminate) {
return;
}
-
- this.allowed_to_exit = !value;
- if (this.allowed_to_exit) {
- this.parent_poll_ref.unref(this.parent);
- } else {
+ if (value) {
this.parent_poll_ref.ref(this.parent);
- }
-
- if (this.vm) |vm| {
- vm.eventLoop().wakeup();
- }
- }
-
- fn onTerminate(this: *WebWorker) void {
- log("onTerminate", .{});
-
- this.reallyExit();
- }
-
- comptime {
- if (!JSC.is_bindgen) {
- @export(hasPendingActivity, .{ .name = "WebWorker__hasPendingActivity" });
- @export(create, .{ .name = "WebWorker__create" });
- @export(terminate, .{ .name = "WebWorker__terminate" });
- @export(setRef, .{ .name = "WebWorker__setRef" });
- _ = WebWorker__updatePtr;
+ } else {
+ this.parent_poll_ref.unref(this.parent);
}
}
- fn reallyExit(this: *WebWorker) void {
- JSC.markBinding(@src());
-
+ /// Request a terminate (Called from main thread from worker.terminate(), or inside worker in process.exit())
+ /// The termination will actually happen after the next tick of the worker's loop.
+ pub fn requestTerminate(this: *WebWorker) callconv(.C) void {
if (this.requested_terminate) {
return;
}
+ log("[{d}] requestTerminate", .{this.execution_context_id});
+ this.setRef(false);
this.requested_terminate = true;
+ if (this.vm) |vm| {
+ vm.global.vm().notifyNeedTermination();
+ vm.eventLoop().wakeup();
+ }
+ }
+ /// This handles cleanup, emitting the "close" event, and deinit.
+ /// Only call after the VM is initialized AND on the same thread as the worker.
+ /// Otherwise, call `requestTerminate` to cause the event loop to safely terminate after the next tick.
+ fn exitAndDeinit(this: *WebWorker) void {
+ JSC.markBinding(@src());
+ log("[{d}] exitAndDeinit", .{this.execution_context_id});
var cpp_worker = this.cpp_worker;
var exit_code: i32 = 0;
var globalObject: ?*JSC.JSGlobalObject = null;
@@ -392,24 +334,18 @@ pub const WebWorker = struct {
exit_code = vm.exit_handler.exit_code;
globalObject = vm.global;
this.arena.deinit();
+ vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm.
}
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);
-
this.deinit();
}
- fn requestTerminate(this: *WebWorker) bool {
- this.setRef(false);
- var vm = this.vm orelse {
- this.requested_terminate = true;
- return false;
- };
- this.allowed_to_exit = true;
- log("requesting terminate", .{});
- var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM");
- var task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM");
- task.* = JSC.AnyTask.New(WebWorker, onTerminate).init(this);
- vm.eventLoop().enqueueTaskConcurrent(concurrent_task.from(task, .auto_deinit));
- return true;
+ comptime {
+ if (!JSC.is_bindgen) {
+ @export(create, .{ .name = "WebWorker__create" });
+ @export(requestTerminate, .{ .name = "WebWorker__requestTerminate" });
+ @export(setRef, .{ .name = "WebWorker__setRef" });
+ _ = WebWorker__updatePtr;
+ }
}
};