aboutsummaryrefslogtreecommitdiff
path: root/src/io/io_darwin.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2021-11-27 01:12:58 -0800
committerGravatar Jarred Sumner <jarred@jarredsumner.com> 2021-12-16 19:18:51 -0800
commit809c1e46d8c7a144156e5f6d8ad89351b5f7d740 (patch)
tree9b56e67442c8632d85b0f07641ab949334b637fa /src/io/io_darwin.zig
parentc0560931255c0d303df7478b1778c0e54759f010 (diff)
downloadbun-809c1e46d8c7a144156e5f6d8ad89351b5f7d740.tar.gz
bun-809c1e46d8c7a144156e5f6d8ad89351b5f7d740.tar.zst
bun-809c1e46d8c7a144156e5f6d8ad89351b5f7d740.zip
[npm install] starting to look good!
Diffstat (limited to 'src/io/io_darwin.zig')
-rw-r--r--src/io/io_darwin.zig129
1 files changed, 112 insertions, 17 deletions
diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig
index d3e09cf15..2042a38ea 100644
--- a/src/io/io_darwin.zig
+++ b/src/io/io_darwin.zig
@@ -29,6 +29,7 @@ io_inflight: usize = 0,
timeouts: FIFO(Completion) = .{},
completed: FIFO(Completion) = .{},
io_pending: FIFO(Completion) = .{},
+last_event_fd: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(32),
pub fn init(entries: u12, flags: u32) !IO {
const kq = try os.kqueue();
@@ -124,8 +125,8 @@ fn flush(self: *IO, wait_for_completions: bool) !void {
self.io_inflight += change_events;
self.io_inflight -= new_events;
- for (events[0..new_events]) |event| {
- const completion = @intToPtr(*Completion, event.udata);
+ for (events[0..new_events]) |kevent| {
+ const completion = @intToPtr(*Completion, kevent.udata);
completion.next = null;
self.completed.push(completion);
}
@@ -139,29 +140,59 @@ fn flush(self: *IO, wait_for_completions: bool) !void {
}
fn flush_io(self: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize {
- for (events) |*event, flushed| {
+ for (events) |*kevent, flushed| {
const completion = io_pending_top.* orelse return flushed;
io_pending_top.* = completion.next;
const event_info = switch (completion.operation) {
- .accept => |op| [2]c_int{ op.socket, os.EVFILT_READ },
- .connect => |op| [2]c_int{ op.socket, os.EVFILT_WRITE },
- .read => |op| [2]c_int{ op.fd, os.EVFILT_READ },
- .write => |op| [2]c_int{ op.fd, os.EVFILT_WRITE },
- .recv => |op| [2]c_int{ op.socket, os.EVFILT_READ },
- .send => |op| [2]c_int{ op.socket, os.EVFILT_WRITE },
+ .accept => |op| [3]c_int{
+ op.socket,
+ os.EVFILT_READ,
+ os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ },
+ .connect => |op| [3]c_int{
+ op.socket,
+ os.EVFILT_WRITE,
+ os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ },
+ .read => |op| [3]c_int{
+ op.fd,
+ os.EVFILT_READ,
+ os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ },
+ .write => |op| [3]c_int{
+ op.fd,
+ os.EVFILT_WRITE,
+ os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ },
+ .recv => |op| [3]c_int{
+ op.socket,
+ os.EVFILT_READ,
+ os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ },
+ .send => |op| [3]c_int{
+ op.socket,
+ os.EVFILT_WRITE,
+ os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ },
+ .event => |op| [3]c_int{
+ op.fd,
+ os.EVFILT_USER,
+ os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ },
else => @panic("invalid completion operation queued for io"),
};
- event.* = .{
+ kevent.* = .{
.ident = @intCast(u32, event_info[0]),
.filter = @intCast(i16, event_info[1]),
- .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ .flags = @intCast(u16, event_info[2]),
.fflags = 0,
.data = 0,
.udata = @ptrToInt(completion),
};
}
+
return events.len;
}
@@ -242,6 +273,9 @@ const Operation = union(enum) {
len: u32,
offset: u64,
},
+ event: struct {
+ fd: os.fd_t,
+ },
};
fn submit(
@@ -299,6 +333,67 @@ fn submit(
pub const AcceptError = os.AcceptError || os.SetSockOptError;
+// -- NOT DONE YET
+pub fn eventfd(self: *IO) os.fd_t {
+ return @intCast(os.fd_t, self.last_event_fd.fetchAdd(1, .Monotonic));
+}
+
+pub fn triggerEvent(event_fd: os.fd_t, completion: *Completion) !void {
+ var kevents = [1]os.Kevent{
+ .{
+ .ident = @intCast(usize, event_fd),
+ .filter = os.EVFILT_USER,
+ .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(completion),
+ },
+ };
+
+ var change_events = [1]os.Kevent{
+ .{
+ .ident = @intCast(usize, event_fd),
+ .filter = os.EVFILT_USER,
+ .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(completion),
+ },
+ };
+
+ const ret = try os.kevent(global.kq, &kevents, &change_events, null);
+ if (ret != 1) {
+ @panic("failed to trigger event");
+ }
+}
+
+// -- NOT DONE YET
+pub fn event(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: void,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .event,
+ .{
+ .fd = fd,
+ },
+ struct {
+ fn doOperation(op: anytype) void {}
+ },
+ );
+}
+
pub fn accept(
self: *IO,
comptime Context: type,
@@ -374,12 +469,12 @@ pub fn close(
},
struct {
fn doOperation(op: anytype) CloseError!void {
- return switch (os.errno(os.system.close(op.fd))) {
+ return switch (os.system.close(op.fd)) {
0 => {},
os.EBADF => error.FileDescriptorInvalid,
os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
os.EIO => error.InputOutput,
- else => |errno| os.unexpectedErrno(errno),
+ else => |errno| os.unexpectedErrno(os.errno(errno)),
};
}
},
@@ -558,7 +653,7 @@ pub fn recv(
);
}
-pub const SendError = os.SendError;
+pub const SendError = os.SendToError;
pub fn send(
self: *IO,
@@ -572,7 +667,7 @@ pub fn send(
completion: *Completion,
socket: os.socket_t,
buffer: []const u8,
- flags: u32,
+ _: u32,
) void {
self.submit(
context,
@@ -583,11 +678,11 @@ pub fn send(
.socket = socket,
.buf = buffer.ptr,
.len = @intCast(u32, buffer_limit(buffer.len)),
- .flags = flags,
+ .flags = 0,
},
struct {
fn doOperation(op: anytype) SendError!usize {
- return os.send(op.socket, op.buf[0..op.len], op.flags);
+ return os.sendto(op.socket, op.buf[0..op.len], op.flags, null, 0);
}
},
);