diff options
author | 2021-11-27 01:12:58 -0800 | |
---|---|---|
committer | 2021-12-16 19:18:51 -0800 | |
commit | 809c1e46d8c7a144156e5f6d8ad89351b5f7d740 (patch) | |
tree | 9b56e67442c8632d85b0f07641ab949334b637fa /src/io/io_darwin.zig | |
parent | c0560931255c0d303df7478b1778c0e54759f010 (diff) | |
download | bun-809c1e46d8c7a144156e5f6d8ad89351b5f7d740.tar.gz bun-809c1e46d8c7a144156e5f6d8ad89351b5f7d740.tar.zst bun-809c1e46d8c7a144156e5f6d8ad89351b5f7d740.zip |
[npm install] starting to look good!
Diffstat (limited to 'src/io/io_darwin.zig')
-rw-r--r-- | src/io/io_darwin.zig | 129 |
1 files changed, 112 insertions, 17 deletions
diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig index d3e09cf15..2042a38ea 100644 --- a/src/io/io_darwin.zig +++ b/src/io/io_darwin.zig @@ -29,6 +29,7 @@ io_inflight: usize = 0, timeouts: FIFO(Completion) = .{}, completed: FIFO(Completion) = .{}, io_pending: FIFO(Completion) = .{}, +last_event_fd: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(32), pub fn init(entries: u12, flags: u32) !IO { const kq = try os.kqueue(); @@ -124,8 +125,8 @@ fn flush(self: *IO, wait_for_completions: bool) !void { self.io_inflight += change_events; self.io_inflight -= new_events; - for (events[0..new_events]) |event| { - const completion = @intToPtr(*Completion, event.udata); + for (events[0..new_events]) |kevent| { + const completion = @intToPtr(*Completion, kevent.udata); completion.next = null; self.completed.push(completion); } @@ -139,29 +140,59 @@ fn flush(self: *IO, wait_for_completions: bool) !void { } fn flush_io(self: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize { - for (events) |*event, flushed| { + for (events) |*kevent, flushed| { const completion = io_pending_top.* orelse return flushed; io_pending_top.* = completion.next; const event_info = switch (completion.operation) { - .accept => |op| [2]c_int{ op.socket, os.EVFILT_READ }, - .connect => |op| [2]c_int{ op.socket, os.EVFILT_WRITE }, - .read => |op| [2]c_int{ op.fd, os.EVFILT_READ }, - .write => |op| [2]c_int{ op.fd, os.EVFILT_WRITE }, - .recv => |op| [2]c_int{ op.socket, os.EVFILT_READ }, - .send => |op| [2]c_int{ op.socket, os.EVFILT_WRITE }, + .accept => |op| [3]c_int{ + op.socket, + os.EVFILT_READ, + os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + }, + .connect => |op| [3]c_int{ + op.socket, + os.EVFILT_WRITE, + os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + }, + .read => |op| [3]c_int{ + op.fd, + os.EVFILT_READ, + os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + }, + .write => |op| [3]c_int{ + op.fd, + os.EVFILT_WRITE, + os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + }, + .recv => |op| [3]c_int{ + op.socket, + os.EVFILT_READ, + os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + }, + .send => |op| [3]c_int{ + op.socket, + os.EVFILT_WRITE, + os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + }, + .event => |op| [3]c_int{ + op.fd, + os.EVFILT_USER, + os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + }, else => @panic("invalid completion operation queued for io"), }; - event.* = .{ + kevent.* = .{ .ident = @intCast(u32, event_info[0]), .filter = @intCast(i16, event_info[1]), - .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + .flags = @intCast(u16, event_info[2]), .fflags = 0, .data = 0, .udata = @ptrToInt(completion), }; } + return events.len; } @@ -242,6 +273,9 @@ const Operation = union(enum) { len: u32, offset: u64, }, + event: struct { + fd: os.fd_t, + }, }; fn submit( @@ -299,6 +333,67 @@ fn submit( pub const AcceptError = os.AcceptError || os.SetSockOptError; +// -- NOT DONE YET +pub fn eventfd(self: *IO) os.fd_t { + return @intCast(os.fd_t, self.last_event_fd.fetchAdd(1, .Monotonic)); +} + +pub fn triggerEvent(event_fd: os.fd_t, completion: *Completion) !void { + var kevents = [1]os.Kevent{ + .{ + .ident = @intCast(usize, event_fd), + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(completion), + }, + }; + + var change_events = [1]os.Kevent{ + .{ + .ident = @intCast(usize, event_fd), + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(completion), + }, + }; + + const ret = try os.kevent(global.kq, &kevents, &change_events, null); + if (ret != 1) { + @panic("failed to trigger event"); + } +} + +// -- NOT DONE YET +pub fn event( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: void, + ) void, + completion: *Completion, + fd: os.fd_t, +) void { + self.submit( + context, + callback, + completion, + .event, + .{ + .fd = fd, + }, + struct { + fn doOperation(op: anytype) void {} + }, + ); +} + pub fn accept( self: *IO, comptime Context: type, @@ -374,12 +469,12 @@ pub fn close( }, struct { fn doOperation(op: anytype) CloseError!void { - return switch (os.errno(os.system.close(op.fd))) { + return switch (os.system.close(op.fd)) { 0 => {}, os.EBADF => error.FileDescriptorInvalid, os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 os.EIO => error.InputOutput, - else => |errno| os.unexpectedErrno(errno), + else => |errno| os.unexpectedErrno(os.errno(errno)), }; } }, @@ -558,7 +653,7 @@ pub fn recv( ); } -pub const SendError = os.SendError; +pub const SendError = os.SendToError; pub fn send( self: *IO, @@ -572,7 +667,7 @@ pub fn send( completion: *Completion, socket: os.socket_t, buffer: []const u8, - flags: u32, + _: u32, ) void { self.submit( context, @@ -583,11 +678,11 @@ pub fn send( .socket = socket, .buf = buffer.ptr, .len = @intCast(u32, buffer_limit(buffer.len)), - .flags = flags, + .flags = 0, }, struct { fn doOperation(op: anytype) SendError!usize { - return os.send(op.socket, op.buf[0..op.len], op.flags); + return os.sendto(op.socket, op.buf[0..op.len], op.flags, null, 0); } }, ); |