diff options
-rw-r--r-- | src/io/io_linux.zig | 143 |
1 files changed, 31 insertions, 112 deletions
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index 2d8a992fe..2ba2b3e36 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -142,12 +142,11 @@ const os = struct { pub const EHWPOISON = 133; }; -pub const pretend_older_kernel = true; const Features = struct { - connect_poll: bool = pretend_older_kernel, - close_poll: bool = pretend_older_kernel, - replace_recv_with_readv: bool = pretend_older_kernel, - replace_send_with_writev: bool = pretend_older_kernel, + connect_blocking: bool = false, + close_blocking: bool = false, + replace_recv_with_readv: bool = false, + replace_send_with_writev: bool = false, }; var features = Features{}; @@ -452,14 +451,20 @@ unqueued: FIFO(Completion) = .{}, /// Completions that are ready to have their callbacks run. completed: FIFO(Completion) = .{}, +next_tick: FIFO(Completion) = .{}, + pub fn init(entries_: u12, flags: u32) !IO { var ring: IO_Uring = undefined; var entries = entries_; const kernel = Platform.kernelVersion(); + + if (kernel.orderWithoutTag(@TypeOf(kernel){ .major = 5, .minor = 5, .patch = 0 }) == .lt) { + features.connect_blocking = true; + } + if (kernel.orderWithoutTag(@TypeOf(kernel){ .major = 5, .minor = 6, .patch = 0 }) == .lt) { - features.close_poll = true; - features.connect_poll = true; + features.close_blocking = true; features.replace_recv_with_readv = true; features.replace_send_with_writev = true; } @@ -499,6 +504,9 @@ pub fn deinit(self: *IO) void { /// Pass all queued submissions to the kernel and peek for completions. pub fn tick(self: *IO) !void { + while (self.next_tick.pop()) |completion| { + completion.complete(); + } // We assume that all timeouts submitted by `run_for_ns()` will be reaped by `run_for_ns()` // and that `tick()` and `run_for_ns()` cannot be run concurrently. // Therefore `timeouts` here will never be decremented and `etime` will always be false. @@ -524,6 +532,10 @@ pub fn tick(self: *IO) !void { /// The `nanoseconds` argument is a u63 to allow coercion to the i64 used /// in the timespec struct. pub fn run_for_ns(self: *IO, nanoseconds: u63) !void { + while (self.next_tick.pop()) |completion| { + completion.complete(); + } + // We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the // timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are // dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC. @@ -671,12 +683,6 @@ pub const Completion = struct { op.address.getOsSockLen(), ); }, - .close_poll => |op| { - linux.io_uring_prep_poll_add(sqe, op.fd, linux.POLL.HUP | linux.POLL.IN | linux.POLL.OUT); - }, - .connect_poll => |*op| { - linux.io_uring_prep_poll_add(sqe, op.socket, linux.POLL.HUP | linux.POLL.OUT | linux.POLL.IN); - }, .fsync => |op| { linux.io_uring_prep_fsync(sqe, op.fd, 0); }, @@ -741,21 +747,7 @@ pub const Completion = struct { } else @intCast(os.socket_t, completion.result); completion.callback(completion.context, completion, &result); }, - .close_poll => { - var op = &completion.operation.close_poll; - const rc = linux.close(op.fd); - completion.result = @intCast(i32, rc); - const result = if (completion.result < 0) switch (-completion.result) { - os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 - os.EBADF => error.FileDescriptorInvalid, - os.EDQUOT => error.DiskQuota, - os.EIO => error.InputOutput, - os.ENOSPC => error.NoSpaceLeft, - else => |errno| asError(errno), - } else assert(completion.result == 0); - completion.callback(completion.context, completion, &result); - }, .close => { const result = if (completion.result < 0) switch (-completion.result) { os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 @@ -767,47 +759,6 @@ pub const Completion = struct { } else assert(completion.result == 0); completion.callback(completion.context, completion, &result); }, - .connect_poll => { - var op = &completion.operation.connect_poll; - var err_code: i32 = undefined; - var size: u32 = @sizeOf(u32); - const rc = linux.getsockopt(op.socket, os.SOL.SOCKET, os.SO.ERROR, @ptrCast([*]u8, &err_code), &size); - completion.result = @intCast(i32, rc); - - const result = if (completion.result < 0) switch (-completion.result) { - os.EAGAIN, os.EWOULDBLOCK, os.EINPROGRESS, os.EINTR => { - _ = linux.connect(op.socket, &op.address.any, op.address.getOsSockLen()); - completion.io.enqueue(completion); - return; - }, - os.EACCES => error.AccessDenied, - os.EADDRINUSE => error.AddressInUse, - os.EADDRNOTAVAIL => error.AddressNotAvailable, - os.EAFNOSUPPORT => error.AddressFamilyNotSupported, - os.EALREADY => error.OpenAlreadyInProgress, - os.EBADF => error.FileDescriptorInvalid, - os.ECONNREFUSED => error.ConnectionRefused, - os.ECONNRESET => error.ConnectionResetByPeer, - os.EISCONN => error.AlreadyConnected, - os.ENETUNREACH => error.NetworkUnreachable, - os.ENOENT => error.FileNotFound, - os.ENOTSOCK => error.FileDescriptorNotASocket, - os.EPERM => error.PermissionDenied, - os.EPROTOTYPE => error.ProtocolNotSupported, - os.ETIMEDOUT => error.ConnectionTimedOut, - else => |errno| asError(errno), - } else { - // go back to blocking mode - var opts = linux.fcntl(op.socket, os.F.GETFL, 0); - if ((opts & os.O.NONBLOCK) != 0) { - opts &= ~(@as(u32, os.O.NONBLOCK)); - _ = linux.fcntl(op.socket, os.F.SETFL, opts); - } - - assert(completion.result == 0); - }; - completion.callback(completion.context, completion, &result); - }, .connect => { const result = if (completion.result < 0) switch (-completion.result) { os.EAGAIN, os.EINPROGRESS, os.EINTR => { @@ -956,17 +907,10 @@ const Operation = union(enum) { close: struct { fd: os.fd_t, }, - close_poll: struct { - fd: os.fd_t, - }, connect: struct { socket: os.socket_t, address: std.net.Address, }, - connect_poll: struct { - socket: os.socket_t, - address: std.net.Address, - }, fsync: struct { fd: os.fd_t, }, @@ -1091,30 +1035,14 @@ pub fn close( ); } }.wrapper, - .operation = if (features.close_poll) .{ - .close_poll = .{ .fd = fd }, - } else .{ - .close = .{ .fd = fd }, - }, + .operation = .{ .close = .{ .fd = fd } }, }; - if (features.close_poll) { - // go to non-blocking mode - var opts = linux.fcntl(fd, os.F.GETFL, 0); - if ((opts & os.O.NONBLOCK) == 0) { - opts |= os.O.NONBLOCK; - _ = linux.fcntl(fd, os.F.SETFL, opts); - } - + if (features.close_blocking) { const rc = linux.close(fd); - switch (linux.getErrno(rc)) { - .AGAIN, .INPROGRESS, .INTR => {}, - else => { - completion.result = @intCast(i32, rc); - self.completed.push(completion); - return; - }, - } + completion.result = @intCast(i32, rc); + self.next_tick.push(completion); + return; } self.enqueue(completion); @@ -1163,12 +1091,7 @@ pub fn connect( ); } }.wrapper, - .operation = if (features.connect_poll) .{ - .connect_poll = .{ - .socket = socket, - .address = address, - }, - } else .{ + .operation = .{ .connect = .{ .socket = socket, .address = address, @@ -1176,17 +1099,13 @@ pub fn connect( }, }; - if (features.connect_poll) { + if (features.connect_blocking) { const rc = linux.connect(socket, &address.any, address.getOsSockLen()); - switch (linux.getErrno(rc)) { - .AGAIN, .INPROGRESS, .INTR => {}, - else => { - completion.result = @intCast(i32, rc); - self.completed.push(completion); - return; - }, - } + completion.result = @intCast(i32, rc); + self.completed.push(completion); + return; } + self.enqueue(completion); } @@ -1575,7 +1494,7 @@ const Syscall = struct { }; pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t { - return Syscall.socket(family, sock_type | os.O.NONBLOCK | os.SOCK.CLOEXEC, protocol); + return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC, protocol); } pub var global: IO = undefined; |