aboutsummaryrefslogtreecommitdiff
path: root/src/io/io_darwin.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-08-28 21:28:05 -0700
committerGravatar GitHub <noreply@github.com> 2022-08-28 21:28:05 -0700
commitc1734c6ec5ef709ee4126b3474c7bee0a377a1fa (patch)
tree097710a13a1d85228efadf6d57823bb3a4f1c011 /src/io/io_darwin.zig
parentb2141a204fbc351a40467037138168aea23a6930 (diff)
downloadbun-c1734c6ec5ef709ee4126b3474c7bee0a377a1fa.tar.gz
bun-c1734c6ec5ef709ee4126b3474c7bee0a377a1fa.tar.zst
bun-c1734c6ec5ef709ee4126b3474c7bee0a377a1fa.zip
More reliable macOS event loop (#1166)
* More reliable macOS event loop * Reduce CPU usage of idling * Add another implementation * Add benchmark Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src/io/io_darwin.zig')
-rw-r--r--src/io/io_darwin.zig304
1 files changed, 225 insertions, 79 deletions
diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig
index 5157b0198..7906dd655 100644
--- a/src/io/io_darwin.zig
+++ b/src/io/io_darwin.zig
@@ -478,7 +478,6 @@ const Time = @import("./time.zig").Time;
const IO = @This();
-kq: os.fd_t,
time: Time = .{},
io_inflight: usize = 0,
timeouts: FIFO(Completion) = .{},
@@ -486,28 +485,176 @@ completed: FIFO(Completion) = .{},
io_pending: FIFO(Completion) = .{},
last_event_fd: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(32),
pending_count: usize = 0,
+waker: Waker = undefined,
pub fn hasNoWork(this: *IO) bool {
return this.pending_count == 0 and this.io_inflight == 0 and this.io_pending.peek() == null and this.completed.peek() == null and this.timeouts.peek() == null;
}
-pub fn init(_: u12, _: u32, _: anytype) !IO {
- const kq = try os.kqueue();
- assert(kq > -1);
- return IO{ .kq = kq };
+pub fn init(_: u12, _: u32, waker: Waker) !IO {
+ return IO{
+ .waker = waker,
+ };
}
+pub const Waker = struct {
+ kq: os.fd_t,
+ machport: *anyopaque = undefined,
+ machport_buf: []u8 = &.{},
+
+ const zeroed = std.mem.zeroes([16]Kevent64);
+
+ pub fn wake(this: Waker) !void {
+ if (!io_darwin_schedule_wakeup(this.machport)) {
+ return error.WakeUpFailed;
+ }
+ }
+
+ pub fn wait(this: Waker) !usize {
+ var events = zeroed;
+
+ const count = std.os.system.kevent64(
+ this.kq,
+ &events,
+ 0,
+ &events,
+ events.len,
+ 0,
+ null,
+ );
+
+ if (count < 0) {
+ return asError(std.c.getErrno(count));
+ }
+
+ return @intCast(usize, count);
+ }
+
+ extern fn io_darwin_create_machport(
+ *anyopaque,
+ os.fd_t,
+ *anyopaque,
+ usize,
+ ) ?*anyopaque;
+
+ extern fn io_darwin_schedule_wakeup(
+ *anyopaque,
+ ) bool;
+
+ pub fn init(allocator: std.mem.Allocator) !Waker {
+ const kq = try os.kqueue();
+ assert(kq > -1);
+ var machport_buf = try allocator.alloc(u8, 1024);
+ const machport = io_darwin_create_machport(
+ machport_buf.ptr,
+ kq,
+ machport_buf.ptr,
+ 1024,
+ ) orelse return error.MachportCreationFailed;
+
+ return Waker{
+ .kq = kq,
+ .machport = machport,
+ .machport_buf = machport_buf,
+ };
+ }
+};
+
+pub const UserFilterWaker = struct {
+ kq: os.fd_t,
+ ident: u64 = undefined,
+
+ pub fn wake(this: UserFilterWaker) !void {
+ var events = zeroed;
+ events[0].ident = this.ident;
+ events[0].filter = c.EVFILT_USER;
+ events[0].data = 0;
+ events[0].fflags = c.NOTE_TRIGGER;
+ events[0].udata = 0;
+ const errno = std.os.system.kevent64(
+ this.kq,
+ &events,
+ 1,
+ &events,
+ events.len,
+ 0,
+ null,
+ );
+
+ if (errno < 0) {
+ return asError(std.c.getErrno(errno));
+ }
+ }
+
+ const zeroed = std.mem.zeroes([16]Kevent64);
+
+ pub fn wait(this: UserFilterWaker) !u64 {
+ var events = zeroed;
+ events[0].ident = 123;
+ events[0].filter = c.EVFILT_USER;
+ events[0].flags = c.EV_ADD | c.EV_ENABLE;
+ events[0].data = 0;
+ events[0].udata = 0;
+
+ const errno = std.os.system.kevent64(
+ this.kq,
+ &events,
+ 1,
+ &events,
+ events.len,
+ 0,
+ null,
+ );
+ if (errno < 0) {
+ return asError(std.c.getErrno(errno));
+ }
+
+ return @intCast(u64, errno);
+ }
+
+ pub fn init(_: std.mem.Allocator) !UserFilterWaker {
+ const kq = try os.kqueue();
+ assert(kq > -1);
+
+ var events = [1]Kevent64{std.mem.zeroes(Kevent64)};
+ events[0].ident = 123;
+ events[0].filter = c.EVFILT_USER;
+ events[0].flags = c.EV_ADD | c.EV_ENABLE;
+ events[0].data = 0;
+ events[0].udata = 0;
+ var timespec = default_timespec;
+ const errno = std.os.system.kevent64(
+ kq,
+ &events,
+ 1,
+ &events,
+ @intCast(c_int, events.len),
+ 0,
+ &timespec,
+ );
+
+ std.debug.assert(errno == 0);
+
+ return UserFilterWaker{
+ .kq = kq,
+ .ident = 123,
+ };
+ }
+};
+
pub fn deinit(self: *IO) void {
- assert(self.kq > -1);
- os.close(self.kq);
- self.kq = -1;
+ assert(self.waker.kq > -1);
+ os.close(self.waker.kq);
+ self.waker.kq = -1;
}
/// Pass all queued submissions to the kernel and peek for completions.
pub fn tick(self: *IO) !void {
- return self.flush(false);
+ return self.flush(.no_wait);
}
+const Kevent64 = std.os.system.kevent64_s;
+
/// Pass all queued submissions to the kernel and run for `nanoseconds`.
/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
/// in the __kernel_timespec struct.
@@ -536,64 +683,89 @@ pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
// Loop until our timeout completion is processed above, which sets timed_out to true.
// LLVM shouldn't be able to cache timed_out's value here since its address escapes above.
while (!timed_out) {
- try self.flush(true);
+ try self.flush(.wait_for_completion);
}
}
const default_timespec = std.mem.zeroInit(os.timespec, .{});
-fn flush(self: *IO, wait_for_completions: bool) !void {
+pub fn wait(self: *IO, context: anytype, comptime function: anytype) void {
+ self.flush(.block) catch unreachable;
+ function(context);
+}
+
+fn flush(self: *IO, comptime _: @Type(.EnumLiteral)) !void {
var io_pending = self.io_pending.peek();
- var events: [512]os.Kevent = undefined;
+ var events: [2048]Kevent64 = undefined;
// Check timeouts and fill events with completions in io_pending
// (they will be submitted through kevent).
// Timeouts are expired here and possibly pushed to the completed queue.
const next_timeout = self.flush_timeouts();
- const change_events = self.flush_io(&events, &io_pending);
- // Only call kevent() if we need to submit io events or if we need to wait for completions.
- if (change_events > 0 or self.completed.peek() == null) {
- // Zero timeouts for kevent() implies a non-blocking poll
- var ts = default_timespec;
-
- // We need to wait (not poll) on kevent if there's nothing to submit or complete.
- // We should never wait indefinitely (timeout_ptr = null for kevent) given:
- // - tick() is non-blocking (wait_for_completions = false)
- // - run_for_ns() always submits a timeout
- if (change_events == 0 and self.completed.peek() == null) {
- if (wait_for_completions) {
- const timeout_ns = next_timeout orelse @panic("kevent() blocking forever");
- ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
- ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
- } else if (self.io_inflight == 0) {
- return;
- }
- }
+ // Flush any timeouts
+ {
+ var completed = self.completed;
+ self.completed = .{};
+ if (completed.pop()) |first| {
+ (first.callback)(self, first);
- const new_events = try os.kevent(
- self.kq,
- events[0..change_events],
- events[0..events.len],
- &ts,
- );
+ while (completed.pop()) |completion|
+ (completion.callback)(self, completion);
- // Mark the io events submitted only after kevent() successfully processed them
- self.io_pending.out = io_pending;
- if (io_pending == null) {
- self.io_pending.in = null;
+ return;
}
+ }
- self.io_inflight += change_events;
- self.io_inflight -= new_events;
+ const change_events = self.flush_io(&events, &io_pending);
- for (events[0..new_events]) |kevent| {
- const completion = @intToPtr(*Completion, kevent.udata);
- completion.next = null;
- self.completed.push(completion);
+ // Zero timeouts for kevent() implies a non-blocking poll
+ var ts = default_timespec;
+
+ // We need to wait (not poll) on kevent if there's nothing to submit or complete.
+ if (next_timeout) |timeout_ns| {
+ ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
+ ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
+ }
+
+ const new_events_ = std.os.system.kevent64(
+ self.waker.kq,
+ &events,
+ @intCast(c_int, change_events),
+ &events,
+ @intCast(c_int, events.len),
+ 0,
+ if (next_timeout != null) &ts else null,
+ );
+
+ if (new_events_ < 0) {
+ return std.debug.panic("kevent() failed {s}", .{@tagName(std.c.getErrno(new_events_))});
+ }
+ const new_events = @intCast(usize, new_events_);
+
+ // Mark the io events submitted only after kevent() successfully processed them
+ self.io_pending.out = io_pending;
+ if (io_pending == null) {
+ self.io_pending.in = null;
+ }
+
+ var new_io_inflight_events = new_events;
+ self.io_inflight += change_events;
+
+ for (events[0..new_events]) |kevent| {
+ if (kevent.filter == c.EVFILT_MACHPORT) {
+ new_io_inflight_events -= 1;
+ continue;
}
+
+ const completion = @intToPtr(*Completion, kevent.udata);
+ completion.next = null;
+ self.completed.push(completion);
}
+ // subtract machport events from io_inflight
+ self.io_inflight -= new_io_inflight_events;
+
var completed = self.completed;
self.completed = .{};
while (completed.pop()) |completion| {
@@ -601,7 +773,7 @@ fn flush(self: *IO, wait_for_completions: bool) !void {
}
}
-fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize {
+fn flush_io(_: *IO, events: []Kevent64, io_pending_top: *?*Completion) usize {
for (events) |*kevent, flushed| {
const completion = io_pending_top.* orelse return flushed;
io_pending_top.* = completion.next;
@@ -645,6 +817,7 @@ fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize {
};
kevent.* = .{
+ .ext = [2]u64{ 0, 0 },
.ident = @intCast(u32, event_info[0]),
.filter = @intCast(i16, event_info[1]),
.flags = @intCast(u16, event_info[2]),
@@ -660,11 +833,13 @@ fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize {
fn flush_timeouts(self: *IO) ?u64 {
var min_timeout: ?u64 = null;
var timeouts: ?*Completion = self.timeouts.peek();
+
+ // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call.
+ const now: u64 = if (timeouts != null) self.time.monotonic() else 0;
+
while (timeouts) |completion| {
timeouts = completion.next;
- // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call.
- const now = self.time.monotonic();
const expires = completion.operation.timeout.expires;
// NOTE: remove() could be O(1) here with a doubly-linked-list
@@ -833,35 +1008,6 @@ pub fn eventfd(self: *IO) os.fd_t {
return @intCast(os.fd_t, self.last_event_fd.fetchAdd(1, .Monotonic));
}
-pub fn triggerEvent(event_fd: os.fd_t, completion: *Completion) !void {
- var kevents = [1]os.Kevent{
- .{
- .ident = @intCast(usize, event_fd),
- .filter = c.EVFILT_USER,
- .flags = c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT,
- .fflags = 0,
- .data = 0,
- .udata = @ptrToInt(completion),
- },
- };
-
- var change_events = [1]os.Kevent{
- .{
- .ident = @intCast(usize, event_fd),
- .filter = c.EVFILT_USER,
- .flags = c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT,
- .fflags = 0,
- .data = 0,
- .udata = @ptrToInt(completion),
- },
- };
-
- const ret = try os.kevent(global.kq, &kevents, &change_events, null);
- if (ret != 1) {
- @panic("failed to trigger event");
- }
-}
-
// -- NOT DONE YET
pub fn event(
self: *IO,