author     2022-08-28 21:28:05 -0700
committer  2022-08-28 21:28:05 -0700
commit     c1734c6ec5ef709ee4126b3474c7bee0a377a1fa (patch)
tree       097710a13a1d85228efadf6d57823bb3a4f1c011 /src/io/io_darwin.zig
parent     b2141a204fbc351a40467037138168aea23a6930 (diff)
download   bun-c1734c6ec5ef709ee4126b3474c7bee0a377a1fa.tar.gz
           bun-c1734c6ec5ef709ee4126b3474c7bee0a377a1fa.tar.zst
           bun-c1734c6ec5ef709ee4126b3474c7bee0a377a1fa.zip
More reliable macOS event loop (#1166)
* More reliable macOS event loop
* Reduce CPU usage of idling
* Add another implementation
* Add benchmark
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
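For context on how the new pieces fit together: the commit drops the raw kq field on IO and instead threads a Waker through IO.init. The Waker owns a kqueue plus a mach port created by io_darwin_create_machport, so an idle loop can block inside kevent64() and be woken from another thread via io_darwin_schedule_wakeup. The sketch below is illustrative only and is not part of the commit; the main scaffolding, allocator choice, and extra thread are assumptions, while the IO.Waker and IO.init names come from the diff further down.

// Hypothetical usage sketch (Zig of this era), assuming a file that sits next to
// src/io/io_darwin.zig and can @import it.
const std = @import("std");
const IO = @import("./io_darwin.zig");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};

    // The waker owns the kqueue and the mach port registered on it.
    const waker = try IO.Waker.init(gpa.allocator());

    // IO.init now takes the waker instead of creating its own kqueue.
    var io = try IO.init(0, 0, waker);
    defer io.deinit();

    // Any other thread can interrupt a blocking kevent64() wait.
    const t = try std.Thread.spawn(.{}, struct {
        fn run(w: IO.Waker) void {
            w.wake() catch {};
        }
    }.run, .{waker});

    _ = try waker.wait(); // returns once the mach port message is delivered
    t.join();
}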
Diffstat (limited to 'src/io/io_darwin.zig')
-rw-r--r--  src/io/io_darwin.zig  304
1 file changed, 225 insertions, 79 deletions
diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig
index 5157b0198..7906dd655 100644
--- a/src/io/io_darwin.zig
+++ b/src/io/io_darwin.zig
@@ -478,7 +478,6 @@ const Time = @import("./time.zig").Time;
 
 const IO = @This();
 
-kq: os.fd_t,
 time: Time = .{},
 io_inflight: usize = 0,
 timeouts: FIFO(Completion) = .{},
@@ -486,28 +485,176 @@ completed: FIFO(Completion) = .{},
 io_pending: FIFO(Completion) = .{},
 last_event_fd: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(32),
 pending_count: usize = 0,
+waker: Waker = undefined,
 
 pub fn hasNoWork(this: *IO) bool {
     return this.pending_count == 0 and this.io_inflight == 0 and this.io_pending.peek() == null and this.completed.peek() == null and this.timeouts.peek() == null;
 }
 
-pub fn init(_: u12, _: u32, _: anytype) !IO {
-    const kq = try os.kqueue();
-    assert(kq > -1);
-    return IO{ .kq = kq };
+pub fn init(_: u12, _: u32, waker: Waker) !IO {
+    return IO{
+        .waker = waker,
+    };
 }
 
+pub const Waker = struct {
+    kq: os.fd_t,
+    machport: *anyopaque = undefined,
+    machport_buf: []u8 = &.{},
+
+    const zeroed = std.mem.zeroes([16]Kevent64);
+
+    pub fn wake(this: Waker) !void {
+        if (!io_darwin_schedule_wakeup(this.machport)) {
+            return error.WakeUpFailed;
+        }
+    }
+
+    pub fn wait(this: Waker) !usize {
+        var events = zeroed;
+
+        const count = std.os.system.kevent64(
+            this.kq,
+            &events,
+            0,
+            &events,
+            events.len,
+            0,
+            null,
+        );
+
+        if (count < 0) {
+            return asError(std.c.getErrno(count));
+        }
+
+        return @intCast(usize, count);
+    }
+
+    extern fn io_darwin_create_machport(
+        *anyopaque,
+        os.fd_t,
+        *anyopaque,
+        usize,
+    ) ?*anyopaque;
+
+    extern fn io_darwin_schedule_wakeup(
+        *anyopaque,
+    ) bool;
+
+    pub fn init(allocator: std.mem.Allocator) !Waker {
+        const kq = try os.kqueue();
+        assert(kq > -1);
+        var machport_buf = try allocator.alloc(u8, 1024);
+        const machport = io_darwin_create_machport(
+            machport_buf.ptr,
+            kq,
+            machport_buf.ptr,
+            1024,
+        ) orelse return error.MachportCreationFailed;
+
+        return Waker{
+            .kq = kq,
+            .machport = machport,
+            .machport_buf = machport_buf,
+        };
+    }
+};
+
+pub const UserFilterWaker = struct {
+    kq: os.fd_t,
+    ident: u64 = undefined,
+
+    pub fn wake(this: UserFilterWaker) !void {
+        var events = zeroed;
+        events[0].ident = this.ident;
+        events[0].filter = c.EVFILT_USER;
+        events[0].data = 0;
+        events[0].fflags = c.NOTE_TRIGGER;
+        events[0].udata = 0;
+        const errno = std.os.system.kevent64(
+            this.kq,
+            &events,
+            1,
+            &events,
+            events.len,
+            0,
+            null,
+        );
+
+        if (errno < 0) {
+            return asError(std.c.getErrno(errno));
+        }
+    }
+
+    const zeroed = std.mem.zeroes([16]Kevent64);
+
+    pub fn wait(this: UserFilterWaker) !u64 {
+        var events = zeroed;
+        events[0].ident = 123;
+        events[0].filter = c.EVFILT_USER;
+        events[0].flags = c.EV_ADD | c.EV_ENABLE;
+        events[0].data = 0;
+        events[0].udata = 0;
+
+        const errno = std.os.system.kevent64(
+            this.kq,
+            &events,
+            1,
+            &events,
+            events.len,
+            0,
+            null,
+        );
+        if (errno < 0) {
+            return asError(std.c.getErrno(errno));
+        }
+
+        return @intCast(u64, errno);
+    }
+
+    pub fn init(_: std.mem.Allocator) !UserFilterWaker {
+        const kq = try os.kqueue();
+        assert(kq > -1);
+
+        var events = [1]Kevent64{std.mem.zeroes(Kevent64)};
+        events[0].ident = 123;
+        events[0].filter = c.EVFILT_USER;
+        events[0].flags = c.EV_ADD | c.EV_ENABLE;
+        events[0].data = 0;
+        events[0].udata = 0;
+        var timespec = default_timespec;
+        const errno = std.os.system.kevent64(
+            kq,
+            &events,
+            1,
+            &events,
+            @intCast(c_int, events.len),
+            0,
+            &timespec,
+        );
+
+        std.debug.assert(errno == 0);
+
+        return UserFilterWaker{
+            .kq = kq,
+            .ident = 123,
+        };
+    }
+};
+
 pub fn deinit(self: *IO) void {
-    assert(self.kq > -1);
-    os.close(self.kq);
-    self.kq = -1;
+    assert(self.waker.kq > -1);
+    os.close(self.waker.kq);
+    self.waker.kq = -1;
 }
 
 /// Pass all queued submissions to the kernel and peek for completions.
 pub fn tick(self: *IO) !void {
-    return self.flush(false);
+    return self.flush(.no_wait);
 }
 
+const Kevent64 = std.os.system.kevent64_s;
+
 /// Pass all queued submissions to the kernel and run for `nanoseconds`.
 /// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
 /// in the __kernel_timespec struct.
@@ -536,64 +683,89 @@ pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
     // Loop until our timeout completion is processed above, which sets timed_out to true.
     // LLVM shouldn't be able to cache timed_out's value here since its address escapes above.
     while (!timed_out) {
-        try self.flush(true);
+        try self.flush(.wait_for_completion);
     }
 }
 
 const default_timespec = std.mem.zeroInit(os.timespec, .{});
 
-fn flush(self: *IO, wait_for_completions: bool) !void {
+pub fn wait(self: *IO, context: anytype, comptime function: anytype) void {
+    self.flush(.block) catch unreachable;
+    function(context);
+}
+
+fn flush(self: *IO, comptime _: @Type(.EnumLiteral)) !void {
     var io_pending = self.io_pending.peek();
-    var events: [512]os.Kevent = undefined;
+    var events: [2048]Kevent64 = undefined;
 
     // Check timeouts and fill events with completions in io_pending
     // (they will be submitted through kevent).
     // Timeouts are expired here and possibly pushed to the completed queue.
     const next_timeout = self.flush_timeouts();
-    const change_events = self.flush_io(&events, &io_pending);
 
-    // Only call kevent() if we need to submit io events or if we need to wait for completions.
-    if (change_events > 0 or self.completed.peek() == null) {
-        // Zero timeouts for kevent() implies a non-blocking poll
-        var ts = default_timespec;
-
-        // We need to wait (not poll) on kevent if there's nothing to submit or complete.
-        // We should never wait indefinitely (timeout_ptr = null for kevent) given:
-        // - tick() is non-blocking (wait_for_completions = false)
-        // - run_for_ns() always submits a timeout
-        if (change_events == 0 and self.completed.peek() == null) {
-            if (wait_for_completions) {
-                const timeout_ns = next_timeout orelse @panic("kevent() blocking forever");
-                ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
-                ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
-            } else if (self.io_inflight == 0) {
-                return;
-            }
-        }
+    // Flush any timeouts
+    {
+        var completed = self.completed;
+        self.completed = .{};
+        if (completed.pop()) |first| {
+            (first.callback)(self, first);
 
-        const new_events = try os.kevent(
-            self.kq,
-            events[0..change_events],
-            events[0..events.len],
-            &ts,
-        );
+            while (completed.pop()) |completion|
+                (completion.callback)(self, completion);
 
-        // Mark the io events submitted only after kevent() successfully processed them
-        self.io_pending.out = io_pending;
-        if (io_pending == null) {
-            self.io_pending.in = null;
+            return;
         }
+    }
 
-        self.io_inflight += change_events;
-        self.io_inflight -= new_events;
+    const change_events = self.flush_io(&events, &io_pending);
 
-        for (events[0..new_events]) |kevent| {
-            const completion = @intToPtr(*Completion, kevent.udata);
-            completion.next = null;
-            self.completed.push(completion);
+    // Zero timeouts for kevent() implies a non-blocking poll
+    var ts = default_timespec;
+
+    // We need to wait (not poll) on kevent if there's nothing to submit or complete.
+    if (next_timeout) |timeout_ns| {
+        ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
+        ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
+    }
+
+    const new_events_ = std.os.system.kevent64(
+        self.waker.kq,
+        &events,
+        @intCast(c_int, change_events),
+        &events,
+        @intCast(c_int, events.len),
+        0,
+        if (next_timeout != null) &ts else null,
+    );
+
+    if (new_events_ < 0) {
+        return std.debug.panic("kevent() failed {s}", .{@tagName(std.c.getErrno(new_events_))});
+    }
+    const new_events = @intCast(usize, new_events_);
+
+    // Mark the io events submitted only after kevent() successfully processed them
+    self.io_pending.out = io_pending;
+    if (io_pending == null) {
+        self.io_pending.in = null;
+    }
+
+    var new_io_inflight_events = new_events;
+    self.io_inflight += change_events;
+
+    for (events[0..new_events]) |kevent| {
+        if (kevent.filter == c.EVFILT_MACHPORT) {
+            new_io_inflight_events -= 1;
+            continue;
         }
+
+        const completion = @intToPtr(*Completion, kevent.udata);
+        completion.next = null;
+        self.completed.push(completion);
     }
+
+    // subtract machport events from io_inflight
+    self.io_inflight -= new_io_inflight_events;
+
     var completed = self.completed;
     self.completed = .{};
     while (completed.pop()) |completion| {
@@ -601,7 +773,7 @@ fn flush(self: *IO, wait_for_completions: bool) !void {
     }
 }
 
-fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize {
+fn flush_io(_: *IO, events: []Kevent64, io_pending_top: *?*Completion) usize {
     for (events) |*kevent, flushed| {
         const completion = io_pending_top.* orelse return flushed;
         io_pending_top.* = completion.next;
@@ -645,6 +817,7 @@ fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize {
         };
 
         kevent.* = .{
+            .ext = [2]u64{ 0, 0 },
             .ident = @intCast(u32, event_info[0]),
             .filter = @intCast(i16, event_info[1]),
             .flags = @intCast(u16, event_info[2]),
@@ -660,11 +833,13 @@ fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize {
 fn flush_timeouts(self: *IO) ?u64 {
     var min_timeout: ?u64 = null;
     var timeouts: ?*Completion = self.timeouts.peek();
+
+    // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call.
+    const now: u64 = if (timeouts != null) self.time.monotonic() else 0;
+
     while (timeouts) |completion| {
         timeouts = completion.next;
 
-        // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call.
-        const now = self.time.monotonic();
         const expires = completion.operation.timeout.expires;
 
         // NOTE: remove() could be O(1) here with a doubly-linked-list
@@ -833,35 +1008,6 @@ pub fn eventfd(self: *IO) os.fd_t {
     return @intCast(os.fd_t, self.last_event_fd.fetchAdd(1, .Monotonic));
 }
 
-pub fn triggerEvent(event_fd: os.fd_t, completion: *Completion) !void {
-    var kevents = [1]os.Kevent{
-        .{
-            .ident = @intCast(usize, event_fd),
-            .filter = c.EVFILT_USER,
-            .flags = c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT,
-            .fflags = 0,
-            .data = 0,
-            .udata = @ptrToInt(completion),
-        },
-    };
-
-    var change_events = [1]os.Kevent{
-        .{
-            .ident = @intCast(usize, event_fd),
-            .filter = c.EVFILT_USER,
-            .flags = c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT,
-            .fflags = 0,
-            .data = 0,
-            .udata = @ptrToInt(completion),
-        },
-    };
-
-    const ret = try os.kevent(global.kq, &kevents, &change_events, null);
-    if (ret != 1) {
-        @panic("failed to trigger event");
-    }
-}
-
 // -- NOT DONE YET
 pub fn event(
     self: *IO,
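The second implementation mentioned in the commit message is UserFilterWaker, which wakes a kqueue through an EVFILT_USER event (ident 123) instead of a mach port. A rough usage sketch follows, with the same caveat: the main scaffolding, allocator, and extra thread are illustrative assumptions, and only the UserFilterWaker API itself comes from the diff above.

// Hypothetical usage sketch of the EVFILT_USER-based waker, assuming a file
// next to src/io/io_darwin.zig that can @import it.
const std = @import("std");
const IO = @import("./io_darwin.zig");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};

    // init() creates a kqueue and registers a user event with ident 123.
    const waker = try IO.UserFilterWaker.init(gpa.allocator());

    const t = try std.Thread.spawn(.{}, struct {
        fn run(w: IO.UserFilterWaker) void {
            // NOTE_TRIGGER fires the registered EVFILT_USER event.
            w.wake() catch {};
        }
    }.run, .{waker});

    // Blocks in kevent64() until the user event fires.
    _ = try waker.wait();
    t.join();
}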