diff options
Diffstat (limited to 'src/bun.js')
-rw-r--r-- | src/bun.js/base.zig | 30 | ||||
-rw-r--r-- | src/bun.js/bindings/BunWorkerGlobalScope.cpp | 30 | ||||
-rw-r--r-- | src/bun.js/bindings/BunWorkerGlobalScope.h | 7 | ||||
-rw-r--r-- | src/bun.js/bindings/bindings.cpp | 6 | ||||
-rw-r--r-- | src/bun.js/bindings/bindings.zig | 21 | ||||
-rw-r--r-- | src/bun.js/bindings/headers.h | 4 | ||||
-rw-r--r-- | src/bun.js/bindings/headers.zig | 4 | ||||
-rw-r--r-- | src/bun.js/bindings/webcore/EventTarget.cpp | 11 | ||||
-rw-r--r-- | src/bun.js/bindings/webcore/EventTarget.h | 7 | ||||
-rw-r--r-- | src/bun.js/bindings/webcore/MessagePort.cpp | 29 | ||||
-rw-r--r-- | src/bun.js/bindings/webcore/MessagePort.h | 3 | ||||
-rw-r--r-- | src/bun.js/bindings/webcore/Worker.cpp | 29 | ||||
-rw-r--r-- | src/bun.js/bindings/webcore/Worker.h | 1 | ||||
-rw-r--r-- | src/bun.js/module_loader.zig | 6 | ||||
-rw-r--r-- | src/bun.js/node/types.zig | 2 | ||||
-rw-r--r-- | src/bun.js/web_worker.zig | 206 |
16 files changed, 239 insertions, 157 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index c964c1d95..27f40eeab 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -2122,6 +2122,11 @@ pub export fn MarkedArrayBuffer_deallocator(bytes_: *anyopaque, _: *anyopaque) v // zig's memory allocator interface won't work here // mimalloc knows the size of things // but we don't + // if (comptime Environment.allow_assert) { + // std.debug.assert(mimalloc.mi_check_owned(bytes_) or + // mimalloc.mi_heap_check_owned(JSC.VirtualMachine.get().arena.heap.?, bytes_)); + // } + mimalloc.mi_free(bytes_); } @@ -3285,6 +3290,15 @@ pub const PollRef = struct { this.status = .inactive; vm.uws_event_loop.?.unref(); } + + /// From another thread, Prevent a poll from keeping the process alive. + pub fn unrefConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { + if (this.status != .active) + return; + this.status = .inactive; + vm.uws_event_loop.?.unrefConcurrently(); + } + /// Prevent a poll from keeping the process alive on the next tick. pub fn unrefOnNextTick(this: *PollRef, vm: *JSC.VirtualMachine) void { if (this.status != .active) @@ -3293,6 +3307,14 @@ pub const PollRef = struct { vm.pending_unref_counter +|= 1; } + /// From another thread, prevent a poll from keeping the process alive on the next tick. + pub fn unrefOnNextTickConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { + if (this.status != .active) + return; + this.status = .inactive; + _ = @atomicRmw(@TypeOf(vm.pending_unref_counter), &vm.pending_unref_counter, .Add, 1, .Monotonic); + } + /// Allow a poll to keep the process alive. pub fn ref(this: *PollRef, vm: *JSC.VirtualMachine) void { if (this.status != .inactive) @@ -3300,6 +3322,14 @@ pub const PollRef = struct { this.status = .active; vm.uws_event_loop.?.ref(); } + + /// Allow a poll to keep the process alive. + pub fn refConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { + if (this.status != .inactive) + return; + this.status = .active; + vm.uws_event_loop.?.refConcurrently(); + } }; const KQueueGenerationNumber = if (Environment.isMac and Environment.allow_assert) usize else u0; diff --git a/src/bun.js/bindings/BunWorkerGlobalScope.cpp b/src/bun.js/bindings/BunWorkerGlobalScope.cpp index ef1f70fdf..f78111633 100644 --- a/src/bun.js/bindings/BunWorkerGlobalScope.cpp +++ b/src/bun.js/bindings/BunWorkerGlobalScope.cpp @@ -11,4 +11,34 @@ MessagePortChannelProvider& GlobalScope::messagePortChannelProvider() { return *reinterpret_cast<MessagePortChannelProvider*>(&MessagePortChannelProviderImpl::singleton()); } + +void GlobalScope::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind) +{ + if (eventType == eventNames().messageEvent) { + auto& global = static_cast<GlobalScope&>(self); + switch (kind) { + case Add: + if (global.m_messageEventCount == 0) { + global.scriptExecutionContext()->refEventLoop(); + } + global.m_messageEventCount++; + break; + case Remove: + global.m_messageEventCount--; + if (global.m_messageEventCount == 0) { + global.scriptExecutionContext()->unrefEventLoop(); + } + break; + // I dont think clear in this context is ever called. If it is (search OnDidChangeListenerKind::Clear for the impl), + // it may actually call once per event, in a way the Remove code above would suffice. + case Clear: + if (global.m_messageEventCount > 0) { + global.scriptExecutionContext()->unrefEventLoop(); + } + global.m_messageEventCount = 0; + break; + } + } +}; + }
\ No newline at end of file diff --git a/src/bun.js/bindings/BunWorkerGlobalScope.h b/src/bun.js/bindings/BunWorkerGlobalScope.h index fff50d6ec..f8e0be52e 100644 --- a/src/bun.js/bindings/BunWorkerGlobalScope.h +++ b/src/bun.js/bindings/BunWorkerGlobalScope.h @@ -2,6 +2,7 @@ #include "root.h" +#include "EventNames.h" #include "EventTarget.h" #include "ContextDestructionObserver.h" #include "ExceptionOr.h" @@ -17,12 +18,18 @@ class MessagePortChannelProviderImpl; class GlobalScope : public RefCounted<GlobalScope>, public EventTargetWithInlineData { WTF_MAKE_ISO_ALLOCATED(GlobalScope); + uint32_t m_messageEventCount; + + static void onDidChangeListenerImpl(EventTarget&, const AtomString&, OnDidChangeListenerKind); + public: GlobalScope(ScriptExecutionContext* context) : EventTargetWithInlineData() , m_context(context) { + this->onDidChangeListener = &onDidChangeListenerImpl; } + using RefCounted::deref; using RefCounted::ref; diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp index 201fc0959..f7998c83c 100644 --- a/src/bun.js/bindings/bindings.cpp +++ b/src/bun.js/bindings/bindings.cpp @@ -3901,6 +3901,12 @@ bool JSC__VM__isEntered(JSC__VM* arg0) { return (*arg0).isEntered(); } void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1) { (*arg0).setExecutionForbidden(); } +// These may be called concurrently from another thread. +void JSC__VM__notifyNeedTermination(JSC__VM* arg0) { (*arg0).notifyNeedTermination(); } +void JSC__VM__notifyNeedDebuggerBreak(JSC__VM* arg0) { (*arg0).notifyNeedDebuggerBreak(); } +void JSC__VM__notifyNeedShellTimeoutCheck(JSC__VM* arg0) { (*arg0).notifyNeedShellTimeoutCheck(); } +void JSC__VM__notifyNeedWatchdogCheck(JSC__VM* arg0) { (*arg0).notifyNeedWatchdogCheck(); } + void JSC__VM__throwError(JSC__VM* vm_, JSC__JSGlobalObject* arg1, JSC__JSValue value) { JSC::VM& vm = *reinterpret_cast<JSC::VM*>(vm_); diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index 0b787ee42..7f081a4b4 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -5063,6 +5063,26 @@ pub const VM = extern struct { }); } + // These four functions fire VM traps. To understand what that means, see VMTraps.h for a giant explainer. + // These may be called concurrently from another thread. + + /// Fires NeedTermination Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps. + pub fn notifyNeedTermination(vm: *VM) void { + cppFn("notifyNeedTermination", .{vm}); + } + /// Fires NeedWatchdogCheck Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps. + pub fn notifyNeedWatchdogCheck(vm: *VM) void { + cppFn("notifyNeedWatchdogCheck", .{vm}); + } + /// Fires NeedDebuggerBreak Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps. + pub fn notifyNeedDebuggerBreak(vm: *VM) void { + cppFn("notifyNeedDebuggerBreak", .{vm}); + } + /// Fires NeedShellTimeoutCheck Trap. Thread safe. See JSC's "VMTraps.h" for explaination on traps. + pub fn notifyNeedShellTimeoutCheck(vm: *VM) void { + cppFn("notifyNeedShellTimeoutCheck", .{vm}); + } + pub fn isEntered(vm: *VM) bool { return cppFn("isEntered", .{ vm, @@ -5538,7 +5558,6 @@ pub const WTF = struct { // This is any alignment WTF__copyLCharsFromUCharSource(destination, source.ptr, source.len); } - }; pub const Callback = struct { diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index 63ae6c3a4..c809ddcee 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -430,6 +430,10 @@ CPP_DECL void JSC__VM__releaseWeakRefs(JSC__VM* arg0); CPP_DECL JSC__JSValue JSC__VM__runGC(JSC__VM* arg0, bool arg1); CPP_DECL void JSC__VM__setControlFlowProfiler(JSC__VM* arg0, bool arg1); CPP_DECL void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1); +CPP_DECL void JSC__VM__notifyNeedTermination(JSC__VM* arg0); +CPP_DECL void JSC__VM__notifyNeedDebuggerBreak(JSC__VM* arg0); +CPP_DECL void JSC__VM__notifyNeedShellTimeoutCheck(JSC__VM* arg0); +CPP_DECL void JSC__VM__notifyNeedWatchdogCheck(JSC__VM* arg0); CPP_DECL void JSC__VM__setExecutionTimeLimit(JSC__VM* arg0, double arg1); CPP_DECL void JSC__VM__shrinkFootprint(JSC__VM* arg0); CPP_DECL void JSC__VM__throwError(JSC__VM* arg0, JSC__JSGlobalObject* arg1, JSC__JSValue JSValue2); diff --git a/src/bun.js/bindings/headers.zig b/src/bun.js/bindings/headers.zig index 2b25c0f5b..8471cb235 100644 --- a/src/bun.js/bindings/headers.zig +++ b/src/bun.js/bindings/headers.zig @@ -324,6 +324,10 @@ pub extern fn JSC__VM__releaseWeakRefs(arg0: *bindings.VM) void; pub extern fn JSC__VM__runGC(arg0: *bindings.VM, arg1: bool) JSC__JSValue; pub extern fn JSC__VM__setControlFlowProfiler(arg0: *bindings.VM, arg1: bool) void; pub extern fn JSC__VM__setExecutionForbidden(arg0: *bindings.VM, arg1: bool) void; +pub extern fn JSC__VM__notifyNeedTermination(arg0: *bindings.VM) void; +pub extern fn JSC__VM__notifyNeedDebuggerBreak(arg0: *bindings.VM) void; +pub extern fn JSC__VM__notifyNeedShellTimeoutCheck(arg0: *bindings.VM) void; +pub extern fn JSC__VM__notifyNeedWatchdogCheck(arg0: *bindings.VM) void; pub extern fn JSC__VM__setExecutionTimeLimit(arg0: *bindings.VM, arg1: f64) void; pub extern fn JSC__VM__shrinkFootprint(arg0: *bindings.VM) void; pub extern fn JSC__VM__throwError(arg0: *bindings.VM, arg1: *bindings.JSGlobalObject, JSValue2: JSC__JSValue) void; diff --git a/src/bun.js/bindings/webcore/EventTarget.cpp b/src/bun.js/bindings/webcore/EventTarget.cpp index 9fb875595..36adac3d3 100644 --- a/src/bun.js/bindings/webcore/EventTarget.cpp +++ b/src/bun.js/bindings/webcore/EventTarget.cpp @@ -111,6 +111,9 @@ bool EventTarget::addEventListener(const AtomString& eventType, Ref<EventListene // invalidateEventListenerRegions(); eventListenersDidChange(); + if (UNLIKELY(this->onDidChangeListener)) { + this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Add); + } return true; } @@ -146,6 +149,9 @@ bool EventTarget::removeEventListener(const AtomString& eventType, EventListener if (eventNames().isWheelEventType(eventType)) invalidateEventListenerRegions(); + if (UNLIKELY(this->onDidChangeListener)) { + this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Remove); + } eventListenersDidChange(); return true; } @@ -376,6 +382,11 @@ void EventTarget::removeAllEventListeners() // if (data->eventListenerMap.contains(eventNames().wheelEvent) || data->eventListenerMap.contains(eventNames().mousewheelEvent)) // invalidateEventListenerRegions(); + if (UNLIKELY(this->onDidChangeListener)) { + for (auto& eventType : data->eventListenerMap.eventTypes()) { + this->onDidChangeListener(*this, eventType, OnDidChangeListenerKind::Clear); + } + } data->eventListenerMap.clear(); eventListenersDidChange(); } diff --git a/src/bun.js/bindings/webcore/EventTarget.h b/src/bun.js/bindings/webcore/EventTarget.h index b763393d7..f5c4354ee 100644 --- a/src/bun.js/bindings/webcore/EventTarget.h +++ b/src/bun.js/bindings/webcore/EventTarget.h @@ -126,6 +126,13 @@ protected: virtual void eventListenersDidChange() {} + enum OnDidChangeListenerKind { + Add, + Remove, + Clear, + }; + WTF::Function<void(EventTarget&, const AtomString& eventName, OnDidChangeListenerKind kind)> onDidChangeListener = WTF::Function<void(EventTarget&, const AtomString& eventName, OnDidChangeListenerKind kind)>(nullptr); + private: virtual void refEventTarget() = 0; virtual void derefEventTarget() = 0; diff --git a/src/bun.js/bindings/webcore/MessagePort.cpp b/src/bun.js/bindings/webcore/MessagePort.cpp index 2d94060f1..da2bd32e8 100644 --- a/src/bun.js/bindings/webcore/MessagePort.cpp +++ b/src/bun.js/bindings/webcore/MessagePort.cpp @@ -392,10 +392,38 @@ Vector<RefPtr<MessagePort>> MessagePort::entanglePorts(ScriptExecutionContext& c }); } +void MessagePort::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind) +{ + if (eventType == eventNames().messageEvent) { + auto& port = static_cast<MessagePort&>(self); + switch (kind) { + case Add: + if (port.m_messageEventCount == 0) { + port.scriptExecutionContext()->refEventLoop(); + } + port.m_messageEventCount++; + break; + case Remove: + port.m_messageEventCount--; + if (port.m_messageEventCount == 0) { + port.scriptExecutionContext()->unrefEventLoop(); + } + break; + case Clear: + if (port.m_messageEventCount > 0) { + port.scriptExecutionContext()->unrefEventLoop(); + } + port.m_messageEventCount = 0; + break; + } + } +}; + Ref<MessagePort> MessagePort::entangle(ScriptExecutionContext& context, TransferredMessagePort&& transferredPort) { auto port = MessagePort::create(context, transferredPort.first, transferredPort.second); port->entangle(); + port->onDidChangeListener = &MessagePort::onDidChangeListenerImpl; return port; } @@ -406,7 +434,6 @@ bool MessagePort::addEventListener(const AtomString& eventType, Ref<EventListene start(); m_hasMessageEventListener = true; } - return EventTarget::addEventListener(eventType, WTFMove(listener), options); } diff --git a/src/bun.js/bindings/webcore/MessagePort.h b/src/bun.js/bindings/webcore/MessagePort.h index fe577f93e..d4532433f 100644 --- a/src/bun.js/bindings/webcore/MessagePort.h +++ b/src/bun.js/bindings/webcore/MessagePort.h @@ -141,6 +141,9 @@ private: mutable std::atomic<unsigned> m_refCount { 1 }; bool m_hasRef { false }; + + uint32_t m_messageEventCount { 0 }; + static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind); }; WebCoreOpaqueRoot root(MessagePort*); diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp index 92503bbbd..2735770c0 100644 --- a/src/bun.js/bindings/webcore/Worker.cpp +++ b/src/bun.js/bindings/webcore/Worker.cpp @@ -66,7 +66,7 @@ namespace WebCore { WTF_MAKE_ISO_ALLOCATED_IMPL(Worker); -extern "C" void WebWorker__terminate( +extern "C" void WebWorker__requestTerminate( void* worker); static Lock allWorkersLock; @@ -210,12 +210,9 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m void Worker::terminate() { - if (m_wasTerminated) { - return; - } // m_contextProxy.terminateWorkerGlobalScope(); m_wasTerminated = true; - WebWorker__terminate(impl_); + WebWorker__requestTerminate(impl_); } // const char* Worker::activeDOMObjectName() const @@ -259,9 +256,14 @@ bool Worker::hasPendingActivity() const void Worker::dispatchEvent(Event& event) { - if (m_wasTerminated) - return; + if (!m_wasTerminated) + EventTargetWithInlineData::dispatchEvent(event); +} +// The close event gets dispatched even if m_wasTerminated is true. +// This allows new wt.Worker().terminate() to actually resolve +void Worker::dispatchCloseEvent(Event& event) +{ EventTargetWithInlineData::dispatchEvent(event); } @@ -350,11 +352,10 @@ void Worker::dispatchExit(int32_t exitCode) ScriptExecutionContext::postTaskTo(ctx->identifier(), [exitCode, protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void { protectedThis->m_isOnline = false; protectedThis->m_isClosing = true; - protectedThis->setKeepAlive(false); if (protectedThis->hasEventListeners(eventNames().closeEvent)) { auto event = CloseEvent::create(exitCode == 0, static_cast<unsigned short>(exitCode), exitCode == 0 ? "Worker terminated normally"_s : "Worker exited abnormally"_s); - protectedThis->dispatchEvent(event); + protectedThis->dispatchCloseEvent(event); } }); } @@ -388,18 +389,20 @@ extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* } auto& vm = globalObject->vm(); - + vm.notifyNeedTermination(); if (JSC::JSObject* obj = JSC::jsDynamicCast<JSC::JSObject*>(globalObject->moduleLoader())) { auto id = JSC::Identifier::fromString(globalObject->vm(), "registry"_s); - if (auto* registry = JSC::jsDynamicCast<JSC::JSMap*>(obj->getIfPropertyExists(globalObject, id))) { - registry->clear(vm); + auto registryValue = obj->getIfPropertyExists(globalObject, id); + if (registryValue) { + if (auto* registry = JSC::jsDynamicCast<JSC::JSMap*>(registryValue)) { + registry->clear(vm); + } } } gcUnprotect(globalObject); vm.deleteAllCode(JSC::DeleteAllCodeEffort::PreventCollectionAndDeleteAllCode); vm.heap.reportAbandonedObjectGraph(); WTF::releaseFastMallocFreeMemoryForThisThread(); - vm.notifyNeedTermination(); vm.deferredWorkTimer->doWork(vm); } } diff --git a/src/bun.js/bindings/webcore/Worker.h b/src/bun.js/bindings/webcore/Worker.h index a296fa4a8..f7641d28d 100644 --- a/src/bun.js/bindings/webcore/Worker.h +++ b/src/bun.js/bindings/webcore/Worker.h @@ -76,6 +76,7 @@ public: const String& name() const { return m_options.name; } void dispatchEvent(Event&); + void dispatchCloseEvent(Event&); void setKeepAlive(bool); #if ENABLE(WEB_RTC) diff --git a/src/bun.js/module_loader.zig b/src/bun.js/module_loader.zig index 5d9158b58..fc31498b4 100644 --- a/src/bun.js/module_loader.zig +++ b/src/bun.js/module_loader.zig @@ -2004,12 +2004,6 @@ pub const ModuleLoader = struct { } else if (HardcodedModule.Map.getWithEql(specifier, bun.String.eqlComptime)) |hardcoded| { switch (hardcoded) { .@"bun:main" => { - defer { - if (jsc_vm.worker) |worker| { - worker.queueInitialTask(); - } - } - return ResolvedSource{ .allocator = null, .source_code = bun.String.create(jsc_vm.entry_point.source.contents), diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index 02bfe8757..ddb58f0cd 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -2346,7 +2346,7 @@ pub const Process = struct { var vm = globalObject.bunVM(); if (vm.worker) |worker| { vm.exit_handler.exit_code = code; - worker.terminate(); + worker.requestTerminate(); return; } diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index 7fa9c6690..16d7aa34c 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -5,10 +5,12 @@ const log = Output.scoped(.Worker, true); const std = @import("std"); const JSValue = JSC.JSValue; +/// Shared implementation of Web and Node `Worker` pub const WebWorker = struct { - // null when haven't started yet + /// null when haven't started yet vm: ?*JSC.VirtualMachine = null, status: Status = .start, + /// To prevent UAF, the `spin` function (aka the worker's event loop) will call deinit once this is set and properly exit the loop. requested_terminate: bool = false, execution_context_id: u32 = 0, parent_context_id: u32 = 0, @@ -20,15 +22,25 @@ pub const WebWorker = struct { arena: bun.MimallocArena = undefined, name: [:0]const u8 = "Worker", cpp_worker: *anyopaque, - allowed_to_exit: bool = false, mini: bool = false, + + /// `user_keep_alive` is the state of the user's .ref()/.unref() calls + /// if false, then the parent poll will always be unref, otherwise the worker's event loop will keep the poll alive. + user_keep_alive: bool = false, + worker_event_loop_running: bool = true, parent_poll_ref: JSC.PollRef = .{}, - initial_poll_ref: JSC.PollRef = .{}, - did_send_initial_task: bool = false, + + pub const Status = enum { + start, + starting, + running, + terminated, + }; extern fn WebWorker__dispatchExit(?*JSC.JSGlobalObject, *anyopaque, i32) void; extern fn WebWorker__dispatchOnline(this: *anyopaque, *JSC.JSGlobalObject) void; extern fn WebWorker__dispatchError(*JSC.JSGlobalObject, *anyopaque, bun.String, JSValue) void; + export fn WebWorker__getParentWorker(vm: *JSC.VirtualMachine) ?*anyopaque { var worker = vm.worker orelse return null; return worker.cpp_worker; @@ -43,31 +55,12 @@ pub const WebWorker = struct { .{worker}, ) catch { worker.deinit(); - worker.parent_poll_ref.unref(worker.parent); - worker.initial_poll_ref.unref(worker.parent); - bun.default_allocator.destroy(worker); return false; }; thread.detach(); return true; } - pub fn hasPendingActivity(this: *WebWorker) callconv(.C) bool { - JSC.markBinding(@src()); - - if (this.vm == null) { - return !this.requested_terminate; - } - - if (!this.allowed_to_exit) { - return true; - } - - var vm = this.vm.?; - - return vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0; - } - pub fn create( cpp_worker: *void, parent: *JSC.VirtualMachine, @@ -80,6 +73,7 @@ pub const WebWorker = struct { default_unref: bool, ) callconv(.C) ?*WebWorker { JSC.markBinding(@src()); + log("[{d}] WebWorker.create", .{this_context_id}); var spec_slice = specifier_str.toUTF8(bun.default_allocator); defer spec_slice.deinit(); var prev_log = parent.bundler.log; @@ -115,35 +109,15 @@ pub const WebWorker = struct { } break :brk ""; }, + .user_keep_alive = !default_unref, + .worker_event_loop_running = true, }; - worker.initial_poll_ref.ref(parent); - - if (!default_unref) { - worker.allowed_to_exit = false; - worker.parent_poll_ref.ref(parent); - } + worker.parent_poll_ref.refConcurrently(parent); return worker; } - pub fn queueInitialTask(this: *WebWorker) void { - if (this.did_send_initial_task) return; - this.did_send_initial_task = true; - - const Unref = struct { - pub fn unref(worker: *WebWorker) void { - worker.initial_poll_ref.unref(worker.parent); - } - }; - - const AnyTask = JSC.AnyTask.New(WebWorker, Unref.unref); - var any_task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM"); - any_task.* = AnyTask.init(this); - var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM"); - this.parent.eventLoop().enqueueTaskConcurrent(concurrent_task.from(any_task, .auto_deinit)); - } - pub fn startWithErrorHandling( this: *WebWorker, ) void { @@ -162,7 +136,6 @@ pub const WebWorker = struct { } if (this.requested_terminate) { - this.queueInitialTask(); this.deinit(); return; } @@ -182,12 +155,12 @@ pub const WebWorker = struct { b.configureRouter(false) catch { this.flushLogs(); - this.onTerminate(); + this.exitAndDeinit(); return; }; b.configureDefines() catch { this.flushLogs(); - this.onTerminate(); + this.exitAndDeinit(); return; }; @@ -202,8 +175,13 @@ pub const WebWorker = struct { vm.global.vm().holdAPILock(this, callback); } + /// Deinit will clean up vm and everything. + /// Early deinit may be called from caller thread, but full vm deinit will only be called within worker's thread. fn deinit(this: *WebWorker) void { + log("[{d}] deinit", .{this.execution_context_id}); + this.parent_poll_ref.unrefConcurrently(this.parent); bun.default_allocator.free(this.specifier); + bun.default_allocator.destroy(this); } fn flushLogs(this: *WebWorker) void { @@ -250,7 +228,7 @@ pub const WebWorker = struct { } fn setStatus(this: *WebWorker, status: Status) void { - log("status: {s}", .{@tagName(status)}); + log("[{d}] status: {s}", .{ this.execution_context_id, @tagName(status) }); this.status = status; } @@ -265,18 +243,15 @@ pub const WebWorker = struct { var promise = vm.loadEntryPointForWebWorker(this.specifier) catch { this.flushLogs(); - this.onTerminate(); + this.exitAndDeinit(); return; }; - this.queueInitialTask(); - if (promise.status(vm.global.vm()) == .Rejected) { vm.onUnhandledError(vm.global, promise.result(vm.global.vm())); vm.exit_handler.exit_code = 1; - this.onTerminate(); - + this.exitAndDeinit(); return; } @@ -300,89 +275,56 @@ pub const WebWorker = struct { // always doing a first tick so we call CppTask without delay after dispatchOnline vm.tick(); - { - while (true) { - while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) { - vm.tick(); - - vm.eventLoop().autoTickActive(); - } - - if (!this.allowed_to_exit) { - this.flushLogs(); - vm.eventLoop().tickPossiblyForever(); - continue; - } - - vm.onBeforeExit(); - - if (!this.allowed_to_exit) - continue; - - break; - } - - this.flushLogs(); - this.onTerminate(); + while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) { + vm.tick(); + if (this.requested_terminate) break; + vm.eventLoop().autoTickActive(); + if (this.requested_terminate) break; } - } - - pub const Status = enum { - start, - starting, - running, - terminated, - }; - pub fn terminate(this: *WebWorker) callconv(.C) void { - if (this.requested_terminate) { - return; + // Only call "beforeExit" if we weren't from a .terminate + if (!this.requested_terminate) { + // TODO: is this able to allow the event loop to continue? + vm.onBeforeExit(); } - _ = this.requestTerminate(); + + this.flushLogs(); + this.exitAndDeinit(); } + /// This is worker.ref()/.unref() from JS (Caller thread) pub fn setRef(this: *WebWorker, value: bool) callconv(.C) void { - if (this.requested_terminate and !value) { - this.parent_poll_ref.unref(this.parent); + if (this.requested_terminate) { return; } - - this.allowed_to_exit = !value; - if (this.allowed_to_exit) { - this.parent_poll_ref.unref(this.parent); - } else { + if (value) { this.parent_poll_ref.ref(this.parent); - } - - if (this.vm) |vm| { - vm.eventLoop().wakeup(); - } - } - - fn onTerminate(this: *WebWorker) void { - log("onTerminate", .{}); - - this.reallyExit(); - } - - comptime { - if (!JSC.is_bindgen) { - @export(hasPendingActivity, .{ .name = "WebWorker__hasPendingActivity" }); - @export(create, .{ .name = "WebWorker__create" }); - @export(terminate, .{ .name = "WebWorker__terminate" }); - @export(setRef, .{ .name = "WebWorker__setRef" }); - _ = WebWorker__updatePtr; + } else { + this.parent_poll_ref.unref(this.parent); } } - fn reallyExit(this: *WebWorker) void { - JSC.markBinding(@src()); - + /// Request a terminate (Called from main thread from worker.terminate(), or inside worker in process.exit()) + /// The termination will actually happen after the next tick of the worker's loop. + pub fn requestTerminate(this: *WebWorker) callconv(.C) void { if (this.requested_terminate) { return; } + log("[{d}] requestTerminate", .{this.execution_context_id}); + this.setRef(false); this.requested_terminate = true; + if (this.vm) |vm| { + vm.global.vm().notifyNeedTermination(); + vm.eventLoop().wakeup(); + } + } + /// This handles cleanup, emitting the "close" event, and deinit. + /// Only call after the VM is initialized AND on the same thread as the worker. + /// Otherwise, call `requestTerminate` to cause the event loop to safely terminate after the next tick. + fn exitAndDeinit(this: *WebWorker) void { + JSC.markBinding(@src()); + log("[{d}] exitAndDeinit", .{this.execution_context_id}); var cpp_worker = this.cpp_worker; var exit_code: i32 = 0; var globalObject: ?*JSC.JSGlobalObject = null; @@ -392,24 +334,18 @@ pub const WebWorker = struct { exit_code = vm.exit_handler.exit_code; globalObject = vm.global; this.arena.deinit(); + vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm. } WebWorker__dispatchExit(globalObject, cpp_worker, exit_code); - this.deinit(); } - fn requestTerminate(this: *WebWorker) bool { - this.setRef(false); - var vm = this.vm orelse { - this.requested_terminate = true; - return false; - }; - this.allowed_to_exit = true; - log("requesting terminate", .{}); - var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM"); - var task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM"); - task.* = JSC.AnyTask.New(WebWorker, onTerminate).init(this); - vm.eventLoop().enqueueTaskConcurrent(concurrent_task.from(task, .auto_deinit)); - return true; + comptime { + if (!JSC.is_bindgen) { + @export(create, .{ .name = "WebWorker__create" }); + @export(requestTerminate, .{ .name = "WebWorker__requestTerminate" }); + @export(setRef, .{ .name = "WebWorker__setRef" }); + _ = WebWorker__updatePtr; + } } }; |