aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--packages/bun-usockets/src/eventing/epoll_kqueue.c25
-rw-r--r--src/bun.js/base.zig84
-rw-r--r--src/bun.js/event_loop.zig75
-rw-r--r--src/bun.js/javascript.zig25
-rw-r--r--src/bun.js/rare_data.zig8
-rw-r--r--src/bun.js/web_worker.zig5
-rw-r--r--src/bun_js.zig7
-rw-r--r--src/cli/test_command.zig2
-rw-r--r--src/deps/uws.zig8
9 files changed, 167 insertions, 72 deletions
diff --git a/packages/bun-usockets/src/eventing/epoll_kqueue.c b/packages/bun-usockets/src/eventing/epoll_kqueue.c
index 0e2c1f92b..7ab2be826 100644
--- a/packages/bun-usockets/src/eventing/epoll_kqueue.c
+++ b/packages/bun-usockets/src/eventing/epoll_kqueue.c
@@ -24,10 +24,13 @@
void Bun__internal_dispatch_ready_poll(void* loop, void* poll);
// void Bun__internal_dispatch_ready_poll(void* loop, void* poll) {}
-void us_loop_run_bun_tick(struct us_loop_t *loop);
-
+#ifndef WIN32
/* Cannot include this one on Windows */
#include <unistd.h>
+#include <stdint.h>
+#endif
+
+void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs);
/* Pointer tags are used to indicate a Bun pointer versus a uSockets pointer */
#define UNSET_BITS_49_UNTIL_64 0x0000FFFFFFFFFFFF
@@ -172,7 +175,7 @@ void us_loop_run(struct us_loop_t *loop) {
}
-void us_loop_run_bun_tick(struct us_loop_t *loop) {
+void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs) {
us_loop_integrate(loop);
if (loop->num_polls == 0)
@@ -183,10 +186,20 @@ void us_loop_run_bun_tick(struct us_loop_t *loop) {
/* Fetch ready polls */
#ifdef LIBUS_USE_EPOLL
- loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
+ if (timeoutMs > 0) {
+ loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, (int)timeoutMs);
+ } else {
+ loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
+ }
#else
- struct timespec ts = {0, 0};
- loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL);
+ if (timeoutMs > 0) {
+ struct timespec ts = {0, 0};
+ ts.tv_sec = timeoutMs / 1000;
+ ts.tv_nsec = (timeoutMs % 1000) * 1000000;
+ loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, &ts);
+ } else {
+ loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL);
+ }
#endif
/* Iterate ready polls, dispatching them by type */
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index c59d3111f..a6df36c4f 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -1706,6 +1706,7 @@ pub const FilePoll = struct {
/// on macOS kevent64 has an extra pointer field so we use it for that
/// linux doesn't have a field like that
generation_number: KQueueGenerationNumber = 0,
+ next_to_free: ?*FilePoll = null,
const FileReader = JSC.WebCore.FileReader;
const FileSink = JSC.WebCore.FileSink;
@@ -1789,20 +1790,21 @@ pub const FilePoll = struct {
this.deinitWithVM(vm);
}
- pub fn deinitWithoutVM(this: *FilePoll, loop: *uws.Loop, polls: *JSC.FilePoll.HiveArray) void {
+ fn deinitPossiblyDefer(this: *FilePoll, vm: *JSC.VirtualMachine, loop: *uws.Loop, polls: *JSC.FilePoll.Store) void {
if (this.isRegistered()) {
_ = this.unregister(loop);
}
this.owner = Deactivated.owner;
+ const was_ever_registered = this.flags.contains(.was_ever_registered);
this.flags = Flags.Set{};
this.fd = invalid_fd;
- polls.put(this);
+ polls.put(this, vm, was_ever_registered);
}
pub fn deinitWithVM(this: *FilePoll, vm: *JSC.VirtualMachine) void {
var loop = vm.event_loop_handle.?;
- this.deinitWithoutVM(loop, vm.rareData().filePolls(vm));
+ this.deinitPossiblyDefer(vm, loop, vm.rareData().filePolls(vm));
}
pub fn isRegistered(this: *const FilePoll) bool {
@@ -1888,6 +1890,9 @@ pub const FilePoll = struct {
nonblocking,
+ was_ever_registered,
+ ignore_updates,
+
pub fn poll(this: Flags) Flags {
return switch (this) {
.readable => .poll_readable,
@@ -1949,7 +1954,64 @@ pub const FilePoll = struct {
}
};
- pub const HiveArray = bun.HiveArray(FilePoll, 128).Fallback;
+ const HiveArray = bun.HiveArray(FilePoll, 128).Fallback;
+
+ // We defer freeing FilePoll until the end of the next event loop iteration
+ // This ensures that we don't free a FilePoll before the next callback is called
+ pub const Store = struct {
+ hive: HiveArray,
+ pending_free_head: ?*FilePoll = null,
+ pending_free_tail: ?*FilePoll = null,
+
+ const log = Output.scoped(.FilePoll, false);
+
+ pub fn init(allocator: std.mem.Allocator) Store {
+ return .{
+ .hive = HiveArray.init(allocator),
+ };
+ }
+
+ pub fn get(this: *Store) *FilePoll {
+ return this.hive.get();
+ }
+
+ pub fn processDeferredFrees(this: *Store) void {
+ var next = this.pending_free_head;
+ while (next) |current| {
+ next = current.next_to_free;
+ current.next_to_free = null;
+ this.hive.put(current);
+ }
+ this.pending_free_head = null;
+ this.pending_free_tail = null;
+ }
+
+ pub fn put(this: *Store, poll: *FilePoll, vm: *JSC.VirtualMachine, ever_registered: bool) void {
+ if (!ever_registered) {
+ this.hive.put(poll);
+ return;
+ }
+
+ std.debug.assert(poll.next_to_free == null);
+
+ if (this.pending_free_tail) |tail| {
+ std.debug.assert(this.pending_free_head != null);
+ std.debug.assert(tail.next_to_free == null);
+ tail.next_to_free = poll;
+ }
+
+ if (this.pending_free_head == null) {
+ this.pending_free_head = poll;
+ std.debug.assert(this.pending_free_tail == null);
+ }
+
+ poll.flags.insert(.ignore_updates);
+ this.pending_free_tail = poll;
+ std.debug.assert(vm.after_event_loop_callback == null or vm.after_event_loop_callback == @as(?JSC.OpaqueCallback, @ptrCast(&processDeferredFrees)));
+ vm.after_event_loop_callback = @ptrCast(&processDeferredFrees);
+ vm.after_event_loop_callback_ctx = this;
+ }
+ };
const log = Output.scoped(.FilePoll, false);
@@ -1999,7 +2061,6 @@ pub const FilePoll = struct {
pub fn activate(this: *FilePoll, loop: *uws.Loop) void {
loop.num_polls += @as(i32, @intFromBool(!this.flags.contains(.has_incremented_poll_count)));
loop.active += @as(u32, @intFromBool(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count)));
-
this.flags.insert(.has_incremented_poll_count);
}
@@ -2012,6 +2073,8 @@ pub const FilePoll = struct {
poll.fd = @intCast(fd);
poll.flags = Flags.Set.init(flags);
poll.owner = owner;
+ poll.next_to_free = null;
+
if (KQueueGenerationNumber != u0) {
max_generation_number +%= 1;
poll.generation_number = max_generation_number;
@@ -2052,7 +2115,11 @@ pub const FilePoll = struct {
if (tag.tag() != @field(Pollable.Tag, "FilePoll"))
return;
- var file_poll = tag.as(FilePoll);
+ var file_poll: *FilePoll = tag.as(FilePoll);
+ if (file_poll.flags.contains(.ignore_updates)) {
+ return;
+ }
+
if (comptime Environment.isMac)
onKQueueEvent(file_poll, loop, &loop.ready_polls[@as(usize, @intCast(loop.current_ready_poll))])
else if (comptime Environment.isLinux)
@@ -2107,7 +2174,7 @@ pub const FilePoll = struct {
@as(std.os.fd_t, @intCast(fd)),
&event,
);
-
+ this.flags.insert(.was_ever_registered);
if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
return errno;
}
@@ -2180,6 +2247,8 @@ pub const FilePoll = struct {
}
};
+ this.flags.insert(.was_ever_registered);
+
// If an error occurs while
// processing an element of the changelist and there is enough room
// in the eventlist, then the event will be placed in the eventlist
@@ -2349,7 +2418,6 @@ pub const FilePoll = struct {
this.flags.remove(.poll_writable);
this.flags.remove(.poll_process);
this.flags.remove(.poll_machport);
-
if (this.isActive())
this.deactivate(loop);
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 1a8732318..640a9276c 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -737,6 +737,31 @@ pub const EventLoop = struct {
if (loop.num_polls > 0 or loop.active > 0) {
loop.tick();
this.processGCTimer();
+ ctx.onAfterEventLoop();
+ // this.afterUSocketsTick();
+ }
+ }
+
+ pub fn autoTickWithTimeout(this: *EventLoop, timeoutMs: i64) void {
+ var ctx = this.virtual_machine;
+ var loop = ctx.event_loop_handle.?;
+
+ // Some tasks need to keep the event loop alive for one more tick.
+ // We want to keep the event loop alive long enough to process those ticks and any microtasks
+ //
+ // BUT. We don't actually have an idle event in that case.
+ // That means the process will be waiting forever on nothing.
+ // So we need to drain the counter immediately before entering uSockets loop
+ const pending_unref = ctx.pending_unref_counter;
+ if (pending_unref > 0) {
+ ctx.pending_unref_counter = 0;
+ loop.unrefCount(pending_unref);
+ }
+
+ if (loop.num_polls > 0 or loop.active > 0) {
+ loop.tickWithTimeout(timeoutMs);
+ this.processGCTimer();
+ ctx.onAfterEventLoop();
// this.afterUSocketsTick();
}
}
@@ -761,6 +786,7 @@ pub const EventLoop = struct {
loop.tick();
this.processGCTimer();
+ ctx.onAfterEventLoop();
this.tickConcurrent();
this.tick();
}
@@ -783,6 +809,7 @@ pub const EventLoop = struct {
if (loop.active > 0) {
loop.tick();
this.processGCTimer();
+ ctx.onAfterEventLoop();
// this.afterUSocketsTick();
}
}
@@ -817,26 +844,6 @@ pub const EventLoop = struct {
this.global.handleRejectedPromises();
}
- pub fn runUSocketsLoop(this: *EventLoop) void {
- var ctx = this.virtual_machine;
-
- ctx.global.vm().releaseWeakRefs();
- ctx.global.vm().drainMicrotasks();
- var loop = ctx.event_loop_handle orelse return;
-
- if (loop.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (loop.num_polls > 0 or this.start_server_on_next_tick))) {
- if (this.tickConcurrentWithCount() > 0) {
- this.tick();
- }
-
- ctx.is_us_loop_entered = true;
- this.start_server_on_next_tick = false;
- ctx.enterUWSLoop();
- ctx.is_us_loop_entered = false;
- ctx.autoGarbageCollect();
- }
- }
-
pub fn waitForPromise(this: *EventLoop, promise: JSC.AnyPromise) void {
switch (promise.status(this.global.vm())) {
JSC.JSPromise.Status.Pending => {
@@ -852,6 +859,8 @@ pub const EventLoop = struct {
}
}
+ // TODO: this implementation is terrible
+ // we should not be checking the millitimestamp every time
pub fn waitForPromiseWithTimeout(this: *EventLoop, promise: JSC.AnyPromise, timeout: u32) bool {
return switch (promise.status(this.global.vm())) {
JSC.JSPromise.Status.Pending => {
@@ -862,12 +871,13 @@ pub const EventLoop = struct {
while (promise.status(this.global.vm()) == .Pending) {
this.tick();
- if (std.time.milliTimestamp() - start_time > timeout) {
- return false;
- }
-
if (promise.status(this.global.vm()) == .Pending) {
- this.autoTick();
+ const remaining = std.time.milliTimestamp() - start_time;
+ if (remaining >= timeout) {
+ return false;
+ }
+
+ this.autoTickWithTimeout(remaining);
}
}
return true;
@@ -876,21 +886,6 @@ pub const EventLoop = struct {
};
}
- pub fn waitForTasks(this: *EventLoop) void {
- this.tick();
- while (this.tasks.count > 0) {
- this.tick();
-
- if (this.virtual_machine.event_loop_handle != null) {
- this.runUSocketsLoop();
- }
- } else {
- if (this.virtual_machine.event_loop_handle != null) {
- this.runUSocketsLoop();
- }
- }
- }
-
pub fn enqueueTask(this: *EventLoop, task: Task) void {
this.tasks.writeItem(task) catch unreachable;
}
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index 59b10a75a..90a3bbc66 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -449,6 +449,9 @@ pub const VirtualMachine = struct {
transpiler_store: JSC.RuntimeTranspilerStore,
+ after_event_loop_callback_ctx: ?*anyopaque = null,
+ after_event_loop_callback: ?OpaqueCallback = null,
+
/// The arguments used to launch the process _after_ the script name and bun and any flags applied to Bun
/// "bun run foo --bar"
/// ["--bar"]
@@ -479,7 +482,6 @@ pub const VirtualMachine = struct {
active_tasks: usize = 0,
rare_data: ?*JSC.RareData = null,
- us_loop_reference_count: usize = 0,
is_us_loop_entered: bool = false,
pending_internal_promise: *JSC.JSInternalPromise = undefined,
auto_install_dependencies: bool = false,
@@ -533,6 +535,21 @@ pub const VirtualMachine = struct {
return this.rareData().mimeTypeFromString(this.allocator, str);
}
+ pub fn onAfterEventLoop(this: *VirtualMachine) void {
+ if (this.after_event_loop_callback) |cb| {
+ var ctx = this.after_event_loop_callback_ctx;
+ this.after_event_loop_callback = null;
+ this.after_event_loop_callback_ctx = null;
+ cb(ctx);
+ }
+ }
+
+ pub fn isEventLoopAlive(vm: *const VirtualMachine) bool {
+ return vm.active_tasks > 0 or
+ vm.event_loop_handle.?.active > 0 or
+ vm.event_loop.tasks.count > 0;
+ }
+
const SourceMapHandlerGetter = struct {
vm: *VirtualMachine,
printer: *js_printer.BufferPrinter,
@@ -732,7 +749,7 @@ pub const VirtualMachine = struct {
this.exit_handler.dispatchOnBeforeExit();
var dispatch = false;
while (true) {
- while (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) : (dispatch = true) {
+ while (this.isEventLoopAlive()) : (dispatch = true) {
this.tick();
this.eventLoop().autoTickActive();
}
@@ -741,7 +758,7 @@ pub const VirtualMachine = struct {
this.exit_handler.dispatchOnBeforeExit();
dispatch = false;
- if (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) continue;
+ if (this.isEventLoopAlive()) continue;
}
break;
@@ -884,7 +901,7 @@ pub const VirtualMachine = struct {
this.eventLoop().tick();
while (true) {
- while (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) {
+ while (this.isEventLoopAlive()) {
this.tick();
this.eventLoop().autoTickActive();
}
diff --git a/src/bun.js/rare_data.zig b/src/bun.js/rare_data.zig
index af2b90f3b..78354676f 100644
--- a/src/bun.js/rare_data.zig
+++ b/src/bun.js/rare_data.zig
@@ -26,7 +26,7 @@ hot_map: ?HotMap = null,
tail_cleanup_hook: ?*CleanupHook = null,
cleanup_hook: ?*CleanupHook = null,
-file_polls_: ?*JSC.FilePoll.HiveArray = null,
+file_polls_: ?*JSC.FilePoll.Store = null,
global_dns_data: ?*JSC.DNS.GlobalData = null,
@@ -102,10 +102,10 @@ pub const HotMap = struct {
}
};
-pub fn filePolls(this: *RareData, vm: *JSC.VirtualMachine) *JSC.FilePoll.HiveArray {
+pub fn filePolls(this: *RareData, vm: *JSC.VirtualMachine) *JSC.FilePoll.Store {
return this.file_polls_ orelse {
- this.file_polls_ = vm.allocator.create(JSC.FilePoll.HiveArray) catch unreachable;
- this.file_polls_.?.* = JSC.FilePoll.HiveArray.init(vm.allocator);
+ this.file_polls_ = vm.allocator.create(JSC.FilePoll.Store) catch unreachable;
+ this.file_polls_.?.* = JSC.FilePoll.Store.init(vm.allocator);
return this.file_polls_.?;
};
}
diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig
index e9c9637b9..b4de8aad2 100644
--- a/src/bun.js/web_worker.zig
+++ b/src/bun.js/web_worker.zig
@@ -263,8 +263,7 @@ pub const WebWorker = struct {
this.setStatus(.running);
// don't run the GC if we don't actually need to
- if (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or
- vm.event_loop_handle.?.active > 0 or
+ if (vm.isEventLoopAlive() or
vm.eventLoop().tickConcurrentWithCount() > 0)
{
vm.global.vm().releaseWeakRefs();
@@ -275,7 +274,7 @@ pub const WebWorker = struct {
// always doing a first tick so we call CppTask without delay after dispatchOnline
vm.tick();
- while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) {
+ while (vm.isEventLoopAlive()) {
vm.tick();
if (this.requested_terminate) break;
vm.eventLoop().autoTickActive();
diff --git a/src/bun_js.zig b/src/bun_js.zig
index 3578fb2ae..46942b849 100644
--- a/src/bun_js.zig
+++ b/src/bun_js.zig
@@ -297,8 +297,7 @@ pub const Run = struct {
}
// don't run the GC if we don't actually need to
- if (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or
- vm.event_loop_handle.?.active > 0 or
+ if (vm.isEventLoopAlive() or
vm.eventLoop().tickConcurrentWithCount() > 0)
{
vm.global.vm().releaseWeakRefs();
@@ -315,7 +314,7 @@ pub const Run = struct {
}
while (true) {
- while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) {
+ while (vm.isEventLoopAlive()) {
vm.tick();
// Report exceptions in hot-reloaded modules
@@ -343,7 +342,7 @@ pub const Run = struct {
vm.onUnhandledError(this.vm.global, this.vm.pending_internal_promise.result(vm.global.vm()));
}
} else {
- while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) {
+ while (vm.isEventLoopAlive()) {
vm.tick();
vm.eventLoop().autoTickActive();
}
diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig
index 4fce7759a..bbbd36dc7 100644
--- a/src/cli/test_command.zig
+++ b/src/cli/test_command.zig
@@ -835,7 +835,7 @@ pub const TestCommand = struct {
vm.eventLoop().tickPossiblyForever();
while (true) {
- while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) {
+ while (vm.isEventLoopAlive()) {
vm.tick();
vm.eventLoop().autoTickActive();
}
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index 88c649747..201544a75 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -827,7 +827,11 @@ pub const Loop = extern struct {
}
pub fn tick(this: *Loop) void {
- us_loop_run_bun_tick(this);
+ us_loop_run_bun_tick(this, 0);
+ }
+
+ pub fn tickWithTimeout(this: *Loop, timeoutMs: i64) void {
+ us_loop_run_bun_tick(this, timeoutMs);
}
pub fn nextTick(this: *Loop, comptime UserType: type, user_data: UserType, comptime deferCallback: fn (ctx: UserType) void) void {
@@ -889,7 +893,7 @@ pub const Loop = extern struct {
extern fn us_loop_free(loop: ?*Loop) void;
extern fn us_loop_ext(loop: ?*Loop) ?*anyopaque;
extern fn us_loop_run(loop: ?*Loop) void;
- extern fn us_loop_run_bun_tick(loop: ?*Loop) void;
+ extern fn us_loop_run_bun_tick(loop: ?*Loop, timouetMs: i64) void;
extern fn us_wakeup_loop(loop: ?*Loop) void;
extern fn us_loop_integrate(loop: ?*Loop) void;
extern fn us_loop_iteration_number(loop: ?*Loop) c_longlong;