From 0e138bcc8f82715fcd84289ee7410570ceedd2a2 Mon Sep 17 00:00:00 2001 From: Jarred SUmner Date: Mon, 24 Jan 2022 23:30:23 -0800 Subject: Fallback to readev / writev --- src/io/io_linux.zig | 233 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 226 insertions(+), 7 deletions(-) (limited to 'src/io') diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index 1bf90eec6..c950ee8ea 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -1,5 +1,6 @@ const std = @import("std"); const assert = std.debug.assert; +const Platform = @import("analytics").GenerateHeader.GeneratePlatform; const os = struct { pub usingnamespace std.os; pub const EPERM = 1; @@ -141,6 +142,16 @@ const os = struct { pub const EHWPOISON = 133; }; +pub const pretend_older_kernel = false; +const Features = struct { + connect_poll: bool = pretend_older_kernel, + close_poll: bool = pretend_older_kernel, + replace_recv_with_readv: bool = pretend_older_kernel, + replace_send_with_writev: bool = pretend_older_kernel, +}; + +var features = Features{}; + pub const Errno = error{ EPERM, ENOENT, @@ -444,6 +455,26 @@ completed: FIFO(Completion) = .{}, pub fn init(entries_: u12, flags: u32) !IO { var ring: IO_Uring = undefined; var entries = entries_; + + const kernel = Platform.kernelVersion(); + if (kernel.orderWithoutTag(@TypeOf(kernel){ .major = 5, .minor = 6, .patch = 0 }) == .lt) { + features.close_poll = true; + features.connect_poll = true; + features.replace_recv_with_readv = true; + features.replace_send_with_writev = true; + } + + var limit = linux.rlimit{ .cur = 0, .max = 0 }; + if (linux.getrlimit(.MEMLOCK, &limit) == 0) { + if (limit.cur < 16 * 1024) { + return error.@"memlock is too low. Please increase it to at least 64k"; + } + + if (limit.cur < 128 * 1024) { + entries = @minimum(256, entries); + } + } + while (true) { ring = IO_Uring.init(entries, flags) catch |err| { if (err == error.SystemResources) { @@ -640,6 +671,12 @@ pub const Completion = struct { op.address.getOsSockLen(), ); }, + .close_poll => |op| { + linux.io_uring_prep_poll_add(sqe, op.fd, linux.POLL.HUP | linux.POLL.IN | linux.POLL.OUT); + }, + .connect_poll => |*op| { + linux.io_uring_prep_poll_add(sqe, op.socket, linux.POLL.HUP | linux.POLL.OUT); + }, .fsync => |op| { linux.io_uring_prep_fsync(sqe, op.fd, 0); }, @@ -651,6 +688,14 @@ pub const Completion = struct { op.offset, ); }, + .readev => { + var op = &completion.operation.readev; + linux.io_uring_prep_readv(sqe, op.socket, &op.iovecs, 0); + }, + .writev => { + var op = &completion.operation.writev; + linux.io_uring_prep_writev(sqe, op.socket, &op.iovecs, 0); + }, .recv => |op| { linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL); }, @@ -696,6 +741,21 @@ pub const Completion = struct { } else @intCast(os.socket_t, completion.result); completion.callback(completion.context, completion, &result); }, + .close_poll => { + var op = &completion.operation.close_poll; + const rc = linux.close(op.fd); + completion.result = @intCast(i32, rc); + + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 + os.EBADF => error.FileDescriptorInvalid, + os.EDQUOT => error.DiskQuota, + os.EIO => error.InputOutput, + os.ENOSPC => error.NoSpaceLeft, + else => |errno| asError(errno), + } else assert(completion.result == 0); + completion.callback(completion.context, completion, &result); + }, .close => { const result = if (completion.result < 0) switch (-completion.result) { os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 @@ -707,6 +767,35 @@ pub const Completion = struct { } else assert(completion.result == 0); completion.callback(completion.context, completion, &result); }, + .connect_poll => { + var op = &completion.operation.connect_poll; + const rc = linux.connect(op.socket, &op.address.any, op.address.getOsSockLen()); + completion.result = @intCast(i32, rc); + + const result = if (completion.result < 0) switch (-completion.result) { + os.EAGAIN, os.EWOULDBLOCK, os.EINPROGRESS, os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.EACCES => error.AccessDenied, + os.EADDRINUSE => error.AddressInUse, + os.EADDRNOTAVAIL => error.AddressNotAvailable, + os.EAFNOSUPPORT => error.AddressFamilyNotSupported, + os.EALREADY => error.OpenAlreadyInProgress, + os.EBADF => error.FileDescriptorInvalid, + os.ECONNREFUSED => error.ConnectionRefused, + os.ECONNRESET => error.ConnectionResetByPeer, + os.EISCONN => error.AlreadyConnected, + os.ENETUNREACH => error.NetworkUnreachable, + os.ENOENT => error.FileNotFound, + os.ENOTSOCK => error.FileDescriptorNotASocket, + os.EPERM => error.PermissionDenied, + os.EPROTOTYPE => error.ProtocolNotSupported, + os.ETIMEDOUT => error.ConnectionTimedOut, + else => |errno| asError(errno), + } else assert(completion.result == 0); + completion.callback(completion.context, completion, &result); + }, .connect => { const result = if (completion.result < 0) switch (-completion.result) { os.EINTR => { @@ -770,7 +859,7 @@ pub const Completion = struct { } else @intCast(usize, completion.result); completion.callback(completion.context, completion, &result); }, - .recv => { + .readev, .recv => { const result = if (completion.result < 0) switch (-completion.result) { os.EINTR => { completion.io.enqueue(completion); @@ -787,7 +876,7 @@ pub const Completion = struct { } else @intCast(usize, completion.result); completion.callback(completion.context, completion, &result); }, - .send => { + .writev, .send => { const result = if (completion.result < 0) switch (-completion.result) { os.EINTR => { completion.io.enqueue(completion); @@ -859,10 +948,17 @@ const Operation = union(enum) { close: struct { fd: os.fd_t, }, + close_poll: struct { + fd: os.fd_t, + }, connect: struct { socket: os.socket_t, address: std.net.Address, }, + connect_poll: struct { + socket: os.socket_t, + address: std.net.Address, + }, fsync: struct { fd: os.fd_t, }, @@ -871,6 +967,14 @@ const Operation = union(enum) { buffer: []u8, offset: u64, }, + readev: struct { + socket: os.socket_t, + iovecs: [1]os.iovec, + }, + writev: struct { + socket: os.socket_t, + iovecs: [1]os.iovec_const, + }, recv: struct { socket: os.socket_t, buffer: []u8, @@ -969,10 +1073,25 @@ pub fn close( ); } }.wrapper, - .operation = .{ + .operation = if (features.close_poll) .{ + .close_poll = .{ .fd = fd }, + } else .{ .close = .{ .fd = fd }, }, }; + + if (features.close_poll) { + const rc = linux.close(fd); + switch (linux.getErrno(rc)) { + .AGAIN, .INPROGRESS, .INTR => {}, + else => { + completion.result = @intCast(i32, rc); + self.completed.push(completion); + return; + }, + } + } + self.enqueue(completion); } @@ -1019,13 +1138,30 @@ pub fn connect( ); } }.wrapper, - .operation = .{ + .operation = if (features.connect_poll) .{ + .connect_poll = .{ + .socket = socket, + .address = address, + }, + } else .{ .connect = .{ .socket = socket, .address = address, }, }, }; + + if (features.connect_poll) { + const rc = linux.connect(socket, &address.any, address.getOsSockLen()); + switch (linux.getErrno(rc)) { + .AGAIN, .INPROGRESS, .INTR => {}, + else => { + completion.result = @intCast(i32, rc); + self.completed.push(completion); + return; + }, + } + } self.enqueue(completion); } @@ -1141,6 +1277,11 @@ pub fn recv( socket: os.socket_t, buffer: []u8, ) void { + if (features.replace_recv_with_readv) { + readev(self, Context, context, callback, completion, socket, buffer); + return; + } + completion.* = .{ .io = self, .context = context, @@ -1163,6 +1304,41 @@ pub fn recv( self.enqueue(completion); } +pub fn readev( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: RecvError!usize, + ) void, + completion: *Completion, + socket: os.socket_t, + buffer: []u8, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const RecvError!usize, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .readev = .{ + .socket = socket, + .iovecs = .{.{ .iov_base = buffer.ptr, .iov_len = buffer.len }}, + }, + }, + }; + self.enqueue(completion); +} + pub const SendError = error{ AccessDenied, WouldBlock, @@ -1192,6 +1368,11 @@ pub fn send( buffer: []const u8, _: u32, ) void { + if (features.replace_send_with_writev) { + writev(self, Context, context, callback, completion, socket, buffer, 0); + return; + } + completion.* = .{ .io = self, .context = context, @@ -1214,6 +1395,44 @@ pub fn send( self.enqueue(completion); } +pub fn writev( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: SendError!usize, + ) void, + completion: *Completion, + socket: os.socket_t, + buffer: []const u8, + _: u32, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const SendError!usize, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .writev = .{ + .socket = socket, + .iovecs = .{ + .{ .iov_base = buffer.ptr, .iov_len = buffer.len }, + }, + }, + }, + }; + self.enqueue(completion); +} + pub const TimeoutError = error{Canceled} || Errno; pub fn timeout( @@ -1314,7 +1533,7 @@ const SocketError = error{ const Syscall = struct { pub fn socket(domain: u32, socket_type: u32, protocol: u32) SocketError!os.socket_t { const rc = linux.socket(domain, socket_type, protocol); - return switch (linux.getErrno(rc)) { + return switch (linux.getErrno((rc))) { .SUCCESS => @intCast(os.fd_t, rc), .ACCES => return error.PermissionDenied, .AFNOSUPPORT => return error.AddressFamilyNotSupported, @@ -1325,13 +1544,13 @@ const Syscall = struct { .NOMEM => return error.SystemResources, .PROTONOSUPPORT => return error.ProtocolNotSupported, .PROTOTYPE => return error.SocketTypeNotSupported, - else => |err| return asError(err), + else => |err| return asError(@enumToInt(err)), }; } }; pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t { - return Syscall.socket(family, sock_type, protocol); + return Syscall.socket(family, sock_type | os.O.NONBLOCK | os.O.CLOEXEC, protocol); } pub var global: IO = undefined; -- cgit v1.2.3