diff options
-rw-r--r-- | packages/bun-usockets/src/eventing/epoll_kqueue.c | 25 | ||||
-rw-r--r-- | src/bun.js/base.zig | 84 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 75 | ||||
-rw-r--r-- | src/bun.js/javascript.zig | 25 | ||||
-rw-r--r-- | src/bun.js/rare_data.zig | 8 | ||||
-rw-r--r-- | src/bun.js/web_worker.zig | 5 | ||||
-rw-r--r-- | src/bun_js.zig | 7 | ||||
-rw-r--r-- | src/cli/test_command.zig | 2 | ||||
-rw-r--r-- | src/deps/uws.zig | 8 |
9 files changed, 167 insertions, 72 deletions
diff --git a/packages/bun-usockets/src/eventing/epoll_kqueue.c b/packages/bun-usockets/src/eventing/epoll_kqueue.c index 0e2c1f92b..7ab2be826 100644 --- a/packages/bun-usockets/src/eventing/epoll_kqueue.c +++ b/packages/bun-usockets/src/eventing/epoll_kqueue.c @@ -24,10 +24,13 @@ void Bun__internal_dispatch_ready_poll(void* loop, void* poll); // void Bun__internal_dispatch_ready_poll(void* loop, void* poll) {} -void us_loop_run_bun_tick(struct us_loop_t *loop); - +#ifndef WIN32 /* Cannot include this one on Windows */ #include <unistd.h> +#include <stdint.h> +#endif + +void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs); /* Pointer tags are used to indicate a Bun pointer versus a uSockets pointer */ #define UNSET_BITS_49_UNTIL_64 0x0000FFFFFFFFFFFF @@ -172,7 +175,7 @@ void us_loop_run(struct us_loop_t *loop) { } -void us_loop_run_bun_tick(struct us_loop_t *loop) { +void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs) { us_loop_integrate(loop); if (loop->num_polls == 0) @@ -183,10 +186,20 @@ void us_loop_run_bun_tick(struct us_loop_t *loop) { /* Fetch ready polls */ #ifdef LIBUS_USE_EPOLL - loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1); + if (timeoutMs > 0) { + loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, (int)timeoutMs); + } else { + loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1); + } #else - struct timespec ts = {0, 0}; - loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL); + if (timeoutMs > 0) { + struct timespec ts = {0, 0}; + ts.tv_sec = timeoutMs / 1000; + ts.tv_nsec = (timeoutMs % 1000) * 1000000; + loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, &ts); + } else { + loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL); + } #endif /* Iterate ready polls, dispatching them by type */ diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index c59d3111f..a6df36c4f 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -1706,6 +1706,7 @@ pub const FilePoll = struct { /// on macOS kevent64 has an extra pointer field so we use it for that /// linux doesn't have a field like that generation_number: KQueueGenerationNumber = 0, + next_to_free: ?*FilePoll = null, const FileReader = JSC.WebCore.FileReader; const FileSink = JSC.WebCore.FileSink; @@ -1789,20 +1790,21 @@ pub const FilePoll = struct { this.deinitWithVM(vm); } - pub fn deinitWithoutVM(this: *FilePoll, loop: *uws.Loop, polls: *JSC.FilePoll.HiveArray) void { + fn deinitPossiblyDefer(this: *FilePoll, vm: *JSC.VirtualMachine, loop: *uws.Loop, polls: *JSC.FilePoll.Store) void { if (this.isRegistered()) { _ = this.unregister(loop); } this.owner = Deactivated.owner; + const was_ever_registered = this.flags.contains(.was_ever_registered); this.flags = Flags.Set{}; this.fd = invalid_fd; - polls.put(this); + polls.put(this, vm, was_ever_registered); } pub fn deinitWithVM(this: *FilePoll, vm: *JSC.VirtualMachine) void { var loop = vm.event_loop_handle.?; - this.deinitWithoutVM(loop, vm.rareData().filePolls(vm)); + this.deinitPossiblyDefer(vm, loop, vm.rareData().filePolls(vm)); } pub fn isRegistered(this: *const FilePoll) bool { @@ -1888,6 +1890,9 @@ pub const FilePoll = struct { nonblocking, + was_ever_registered, + ignore_updates, + pub fn poll(this: Flags) Flags { return switch (this) { .readable => .poll_readable, @@ -1949,7 +1954,64 @@ pub const FilePoll = struct { } }; - pub const HiveArray = bun.HiveArray(FilePoll, 128).Fallback; + const HiveArray = bun.HiveArray(FilePoll, 128).Fallback; + + // We defer freeing FilePoll until the end of the next event loop iteration + // This ensures that we don't free a FilePoll before the next callback is called + pub const Store = struct { + hive: HiveArray, + pending_free_head: ?*FilePoll = null, + pending_free_tail: ?*FilePoll = null, + + const log = Output.scoped(.FilePoll, false); + + pub fn init(allocator: std.mem.Allocator) Store { + return .{ + .hive = HiveArray.init(allocator), + }; + } + + pub fn get(this: *Store) *FilePoll { + return this.hive.get(); + } + + pub fn processDeferredFrees(this: *Store) void { + var next = this.pending_free_head; + while (next) |current| { + next = current.next_to_free; + current.next_to_free = null; + this.hive.put(current); + } + this.pending_free_head = null; + this.pending_free_tail = null; + } + + pub fn put(this: *Store, poll: *FilePoll, vm: *JSC.VirtualMachine, ever_registered: bool) void { + if (!ever_registered) { + this.hive.put(poll); + return; + } + + std.debug.assert(poll.next_to_free == null); + + if (this.pending_free_tail) |tail| { + std.debug.assert(this.pending_free_head != null); + std.debug.assert(tail.next_to_free == null); + tail.next_to_free = poll; + } + + if (this.pending_free_head == null) { + this.pending_free_head = poll; + std.debug.assert(this.pending_free_tail == null); + } + + poll.flags.insert(.ignore_updates); + this.pending_free_tail = poll; + std.debug.assert(vm.after_event_loop_callback == null or vm.after_event_loop_callback == @as(?JSC.OpaqueCallback, @ptrCast(&processDeferredFrees))); + vm.after_event_loop_callback = @ptrCast(&processDeferredFrees); + vm.after_event_loop_callback_ctx = this; + } + }; const log = Output.scoped(.FilePoll, false); @@ -1999,7 +2061,6 @@ pub const FilePoll = struct { pub fn activate(this: *FilePoll, loop: *uws.Loop) void { loop.num_polls += @as(i32, @intFromBool(!this.flags.contains(.has_incremented_poll_count))); loop.active += @as(u32, @intFromBool(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count))); - this.flags.insert(.has_incremented_poll_count); } @@ -2012,6 +2073,8 @@ pub const FilePoll = struct { poll.fd = @intCast(fd); poll.flags = Flags.Set.init(flags); poll.owner = owner; + poll.next_to_free = null; + if (KQueueGenerationNumber != u0) { max_generation_number +%= 1; poll.generation_number = max_generation_number; @@ -2052,7 +2115,11 @@ pub const FilePoll = struct { if (tag.tag() != @field(Pollable.Tag, "FilePoll")) return; - var file_poll = tag.as(FilePoll); + var file_poll: *FilePoll = tag.as(FilePoll); + if (file_poll.flags.contains(.ignore_updates)) { + return; + } + if (comptime Environment.isMac) onKQueueEvent(file_poll, loop, &loop.ready_polls[@as(usize, @intCast(loop.current_ready_poll))]) else if (comptime Environment.isLinux) @@ -2107,7 +2174,7 @@ pub const FilePoll = struct { @as(std.os.fd_t, @intCast(fd)), &event, ); - + this.flags.insert(.was_ever_registered); if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { return errno; } @@ -2180,6 +2247,8 @@ pub const FilePoll = struct { } }; + this.flags.insert(.was_ever_registered); + // If an error occurs while // processing an element of the changelist and there is enough room // in the eventlist, then the event will be placed in the eventlist @@ -2349,7 +2418,6 @@ pub const FilePoll = struct { this.flags.remove(.poll_writable); this.flags.remove(.poll_process); this.flags.remove(.poll_machport); - if (this.isActive()) this.deactivate(loop); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 1a8732318..640a9276c 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -737,6 +737,31 @@ pub const EventLoop = struct { if (loop.num_polls > 0 or loop.active > 0) { loop.tick(); this.processGCTimer(); + ctx.onAfterEventLoop(); + // this.afterUSocketsTick(); + } + } + + pub fn autoTickWithTimeout(this: *EventLoop, timeoutMs: i64) void { + var ctx = this.virtual_machine; + var loop = ctx.event_loop_handle.?; + + // Some tasks need to keep the event loop alive for one more tick. + // We want to keep the event loop alive long enough to process those ticks and any microtasks + // + // BUT. We don't actually have an idle event in that case. + // That means the process will be waiting forever on nothing. + // So we need to drain the counter immediately before entering uSockets loop + const pending_unref = ctx.pending_unref_counter; + if (pending_unref > 0) { + ctx.pending_unref_counter = 0; + loop.unrefCount(pending_unref); + } + + if (loop.num_polls > 0 or loop.active > 0) { + loop.tickWithTimeout(timeoutMs); + this.processGCTimer(); + ctx.onAfterEventLoop(); // this.afterUSocketsTick(); } } @@ -761,6 +786,7 @@ pub const EventLoop = struct { loop.tick(); this.processGCTimer(); + ctx.onAfterEventLoop(); this.tickConcurrent(); this.tick(); } @@ -783,6 +809,7 @@ pub const EventLoop = struct { if (loop.active > 0) { loop.tick(); this.processGCTimer(); + ctx.onAfterEventLoop(); // this.afterUSocketsTick(); } } @@ -817,26 +844,6 @@ pub const EventLoop = struct { this.global.handleRejectedPromises(); } - pub fn runUSocketsLoop(this: *EventLoop) void { - var ctx = this.virtual_machine; - - ctx.global.vm().releaseWeakRefs(); - ctx.global.vm().drainMicrotasks(); - var loop = ctx.event_loop_handle orelse return; - - if (loop.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (loop.num_polls > 0 or this.start_server_on_next_tick))) { - if (this.tickConcurrentWithCount() > 0) { - this.tick(); - } - - ctx.is_us_loop_entered = true; - this.start_server_on_next_tick = false; - ctx.enterUWSLoop(); - ctx.is_us_loop_entered = false; - ctx.autoGarbageCollect(); - } - } - pub fn waitForPromise(this: *EventLoop, promise: JSC.AnyPromise) void { switch (promise.status(this.global.vm())) { JSC.JSPromise.Status.Pending => { @@ -852,6 +859,8 @@ pub const EventLoop = struct { } } + // TODO: this implementation is terrible + // we should not be checking the millitimestamp every time pub fn waitForPromiseWithTimeout(this: *EventLoop, promise: JSC.AnyPromise, timeout: u32) bool { return switch (promise.status(this.global.vm())) { JSC.JSPromise.Status.Pending => { @@ -862,12 +871,13 @@ pub const EventLoop = struct { while (promise.status(this.global.vm()) == .Pending) { this.tick(); - if (std.time.milliTimestamp() - start_time > timeout) { - return false; - } - if (promise.status(this.global.vm()) == .Pending) { - this.autoTick(); + const remaining = std.time.milliTimestamp() - start_time; + if (remaining >= timeout) { + return false; + } + + this.autoTickWithTimeout(remaining); } } return true; @@ -876,21 +886,6 @@ pub const EventLoop = struct { }; } - pub fn waitForTasks(this: *EventLoop) void { - this.tick(); - while (this.tasks.count > 0) { - this.tick(); - - if (this.virtual_machine.event_loop_handle != null) { - this.runUSocketsLoop(); - } - } else { - if (this.virtual_machine.event_loop_handle != null) { - this.runUSocketsLoop(); - } - } - } - pub fn enqueueTask(this: *EventLoop, task: Task) void { this.tasks.writeItem(task) catch unreachable; } diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 59b10a75a..90a3bbc66 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -449,6 +449,9 @@ pub const VirtualMachine = struct { transpiler_store: JSC.RuntimeTranspilerStore, + after_event_loop_callback_ctx: ?*anyopaque = null, + after_event_loop_callback: ?OpaqueCallback = null, + /// The arguments used to launch the process _after_ the script name and bun and any flags applied to Bun /// "bun run foo --bar" /// ["--bar"] @@ -479,7 +482,6 @@ pub const VirtualMachine = struct { active_tasks: usize = 0, rare_data: ?*JSC.RareData = null, - us_loop_reference_count: usize = 0, is_us_loop_entered: bool = false, pending_internal_promise: *JSC.JSInternalPromise = undefined, auto_install_dependencies: bool = false, @@ -533,6 +535,21 @@ pub const VirtualMachine = struct { return this.rareData().mimeTypeFromString(this.allocator, str); } + pub fn onAfterEventLoop(this: *VirtualMachine) void { + if (this.after_event_loop_callback) |cb| { + var ctx = this.after_event_loop_callback_ctx; + this.after_event_loop_callback = null; + this.after_event_loop_callback_ctx = null; + cb(ctx); + } + } + + pub fn isEventLoopAlive(vm: *const VirtualMachine) bool { + return vm.active_tasks > 0 or + vm.event_loop_handle.?.active > 0 or + vm.event_loop.tasks.count > 0; + } + const SourceMapHandlerGetter = struct { vm: *VirtualMachine, printer: *js_printer.BufferPrinter, @@ -732,7 +749,7 @@ pub const VirtualMachine = struct { this.exit_handler.dispatchOnBeforeExit(); var dispatch = false; while (true) { - while (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) : (dispatch = true) { + while (this.isEventLoopAlive()) : (dispatch = true) { this.tick(); this.eventLoop().autoTickActive(); } @@ -741,7 +758,7 @@ pub const VirtualMachine = struct { this.exit_handler.dispatchOnBeforeExit(); dispatch = false; - if (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) continue; + if (this.isEventLoopAlive()) continue; } break; @@ -884,7 +901,7 @@ pub const VirtualMachine = struct { this.eventLoop().tick(); while (true) { - while (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) { + while (this.isEventLoopAlive()) { this.tick(); this.eventLoop().autoTickActive(); } diff --git a/src/bun.js/rare_data.zig b/src/bun.js/rare_data.zig index af2b90f3b..78354676f 100644 --- a/src/bun.js/rare_data.zig +++ b/src/bun.js/rare_data.zig @@ -26,7 +26,7 @@ hot_map: ?HotMap = null, tail_cleanup_hook: ?*CleanupHook = null, cleanup_hook: ?*CleanupHook = null, -file_polls_: ?*JSC.FilePoll.HiveArray = null, +file_polls_: ?*JSC.FilePoll.Store = null, global_dns_data: ?*JSC.DNS.GlobalData = null, @@ -102,10 +102,10 @@ pub const HotMap = struct { } }; -pub fn filePolls(this: *RareData, vm: *JSC.VirtualMachine) *JSC.FilePoll.HiveArray { +pub fn filePolls(this: *RareData, vm: *JSC.VirtualMachine) *JSC.FilePoll.Store { return this.file_polls_ orelse { - this.file_polls_ = vm.allocator.create(JSC.FilePoll.HiveArray) catch unreachable; - this.file_polls_.?.* = JSC.FilePoll.HiveArray.init(vm.allocator); + this.file_polls_ = vm.allocator.create(JSC.FilePoll.Store) catch unreachable; + this.file_polls_.?.* = JSC.FilePoll.Store.init(vm.allocator); return this.file_polls_.?; }; } diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index e9c9637b9..b4de8aad2 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -263,8 +263,7 @@ pub const WebWorker = struct { this.setStatus(.running); // don't run the GC if we don't actually need to - if (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or - vm.event_loop_handle.?.active > 0 or + if (vm.isEventLoopAlive() or vm.eventLoop().tickConcurrentWithCount() > 0) { vm.global.vm().releaseWeakRefs(); @@ -275,7 +274,7 @@ pub const WebWorker = struct { // always doing a first tick so we call CppTask without delay after dispatchOnline vm.tick(); - while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) { + while (vm.isEventLoopAlive()) { vm.tick(); if (this.requested_terminate) break; vm.eventLoop().autoTickActive(); diff --git a/src/bun_js.zig b/src/bun_js.zig index 3578fb2ae..46942b849 100644 --- a/src/bun_js.zig +++ b/src/bun_js.zig @@ -297,8 +297,7 @@ pub const Run = struct { } // don't run the GC if we don't actually need to - if (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or - vm.event_loop_handle.?.active > 0 or + if (vm.isEventLoopAlive() or vm.eventLoop().tickConcurrentWithCount() > 0) { vm.global.vm().releaseWeakRefs(); @@ -315,7 +314,7 @@ pub const Run = struct { } while (true) { - while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) { + while (vm.isEventLoopAlive()) { vm.tick(); // Report exceptions in hot-reloaded modules @@ -343,7 +342,7 @@ pub const Run = struct { vm.onUnhandledError(this.vm.global, this.vm.pending_internal_promise.result(vm.global.vm())); } } else { - while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) { + while (vm.isEventLoopAlive()) { vm.tick(); vm.eventLoop().autoTickActive(); } diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig index 4fce7759a..bbbd36dc7 100644 --- a/src/cli/test_command.zig +++ b/src/cli/test_command.zig @@ -835,7 +835,7 @@ pub const TestCommand = struct { vm.eventLoop().tickPossiblyForever(); while (true) { - while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) { + while (vm.isEventLoopAlive()) { vm.tick(); vm.eventLoop().autoTickActive(); } diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 88c649747..201544a75 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -827,7 +827,11 @@ pub const Loop = extern struct { } pub fn tick(this: *Loop) void { - us_loop_run_bun_tick(this); + us_loop_run_bun_tick(this, 0); + } + + pub fn tickWithTimeout(this: *Loop, timeoutMs: i64) void { + us_loop_run_bun_tick(this, timeoutMs); } pub fn nextTick(this: *Loop, comptime UserType: type, user_data: UserType, comptime deferCallback: fn (ctx: UserType) void) void { @@ -889,7 +893,7 @@ pub const Loop = extern struct { extern fn us_loop_free(loop: ?*Loop) void; extern fn us_loop_ext(loop: ?*Loop) ?*anyopaque; extern fn us_loop_run(loop: ?*Loop) void; - extern fn us_loop_run_bun_tick(loop: ?*Loop) void; + extern fn us_loop_run_bun_tick(loop: ?*Loop, timouetMs: i64) void; extern fn us_wakeup_loop(loop: ?*Loop) void; extern fn us_loop_integrate(loop: ?*Loop) void; extern fn us_loop_iteration_number(loop: ?*Loop) c_longlong; |