aboutsummaryrefslogtreecommitdiff
path: root/src/io/io_linux.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2021-11-16 15:06:21 -0800
committerGravatar Jarred Sumner <jarred@jarredsumner.com> 2021-12-16 19:18:51 -0800
commit4af743766d9e32789b90e39c761d5e896426d2f3 (patch)
tree7ae22cb8852e674d428091d4200bee414091980f /src/io/io_linux.zig
parent8f35f16c7ed97c50ee188f13ab9d5ea1008341ae (diff)
downloadbun-4af743766d9e32789b90e39c761d5e896426d2f3.tar.gz
bun-4af743766d9e32789b90e39c761d5e896426d2f3.tar.zst
bun-4af743766d9e32789b90e39c761d5e896426d2f3.zip
[bun install] Begin to add io_uring (via tigerbeetle/@kprotty's interface)
Diffstat (limited to 'src/io/io_linux.zig')
-rw-r--r--src/io/io_linux.zig874
1 files changed, 874 insertions, 0 deletions
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig
new file mode 100644
index 000000000..794f18f90
--- /dev/null
+++ b/src/io/io_linux.zig
@@ -0,0 +1,874 @@
+const std = @import("std");
+const assert = std.debug.assert;
+const os = std.os;
+const linux = os.linux;
+const IO_Uring = linux.IO_Uring;
+const io_uring_cqe = linux.io_uring_cqe;
+const io_uring_sqe = linux.io_uring_sqe;
+
+const FIFO = @import("./fifo.zig").FIFO;
+const IO = @This();
+
/// The io_uring instance through which all operations are submitted and reaped.
ring: IO_Uring,

/// Operations not yet submitted to the kernel and waiting on available space in the
/// submission queue.
unqueued: FIFO(Completion) = .{},

/// Completions that are ready to have their callbacks run.
completed: FIFO(Completion) = .{},
+
/// Initialize an IO instance backed by an io_uring with `entries` submission queue entries.
/// `flags` is passed through to io_uring_setup(2). The caller must call `deinit()`.
pub fn init(entries: u12, flags: u32) !IO {
    return IO{ .ring = try IO_Uring.init(entries, flags) };
}
+
/// Tear down the io_uring and release its kernel resources.
pub fn deinit(self: *IO) void {
    self.ring.deinit();
}
+
/// Pass all queued submissions to the kernel and peek for completions.
pub fn tick(self: *IO) !void {
    // We assume that all timeouts submitted by `run_for_ns()` will be reaped by `run_for_ns()`
    // and that `tick()` and `run_for_ns()` cannot be run concurrently.
    // Therefore `timeouts` here will never be decremented and `etime` will always be false.
    var timeouts: usize = 0;
    var etime = false;

    try self.flush(0, &timeouts, &etime);
    assert(etime == false);

    // Flush any SQEs that were queued while running completion callbacks in `flush()`:
    // This is an optimization to avoid delaying submissions until the next tick.
    // At the same time, we do not flush any ready CQEs since SQEs may complete synchronously.
    // We guard against an io_uring_enter() syscall if we know we do not have any queued SQEs.
    // We cannot use `self.ring.sq_ready()` here since this counts flushed and unflushed SQEs.
    // Wrapping subtraction: the submission queue head/tail are free-running counters that
    // may legitimately overflow.
    const queued = self.ring.sq.sqe_tail -% self.ring.sq.sqe_head;
    if (queued > 0) {
        try self.flush_submissions(0, &timeouts, &etime);
        assert(etime == false);
    }
}
+
/// Pass all queued submissions to the kernel and run for `nanoseconds`.
/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
/// in the __kernel_timespec struct.
pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
    // We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the
    // timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are
    // dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC.
    var current_ts: os.timespec = undefined;
    os.clock_gettime(os.CLOCK_MONOTONIC, &current_ts) catch unreachable;
    // The absolute CLOCK_MONOTONIC time after which we may return from this function.
    // Normalize the timespec so that tv_nsec always stays below one second: adding the full
    // `nanoseconds` value to tv_nsec produces an un-normalized (and, for very large values,
    // overflowing) timespec, which kernel code paths that validate timespecs reject with
    // EINVAL. Normalization preserves the total duration exactly.
    const ns_per_s = std.time.ns_per_s;
    var timeout_ts: os.__kernel_timespec = .{
        .tv_sec = current_ts.tv_sec + @intCast(i64, nanoseconds / ns_per_s),
        .tv_nsec = current_ts.tv_nsec + @intCast(i64, nanoseconds % ns_per_s),
    };
    // Carry the sub-second remainder if it overflowed into a full second.
    if (timeout_ts.tv_nsec >= ns_per_s) {
        timeout_ts.tv_sec += 1;
        timeout_ts.tv_nsec -= ns_per_s;
    }
    var timeouts: usize = 0;
    var etime = false;
    while (!etime) {
        const timeout_sqe = self.ring.get_sqe() catch blk: {
            // The submission queue is full, so flush submissions to make space:
            try self.flush_submissions(0, &timeouts, &etime);
            break :blk self.ring.get_sqe() catch unreachable;
        };
        // Submit an absolute timeout that will be canceled if any other SQE completes first:
        linux.io_uring_prep_timeout(timeout_sqe, &timeout_ts, 1, os.IORING_TIMEOUT_ABS);
        timeout_sqe.user_data = 0;
        timeouts += 1;
        // The amount of time this call will block is bounded by the timeout we just submitted:
        try self.flush(1, &timeouts, &etime);
    }
    // Reap any remaining timeouts, which reference the timespec in the current stack frame.
    // The busy loop here is required to avoid a potential deadlock, as the kernel determines
    // when the timeouts are pushed to the completion queue, not us.
    while (timeouts > 0) _ = try self.flush_completions(0, &timeouts, &etime);
}
+
/// Flush queued SQEs to the kernel (waiting for up to `wait_nr` completions in the same
/// syscall), reap any ready CQEs, then run completion callbacks and finally retry
/// submissions that previously found the submission queue full.
/// `timeouts`/`etime` carry `run_for_ns()` timeout state through the flush helpers.
fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
    // Flush any queued SQEs and reuse the same syscall to wait for completions if required:
    try self.flush_submissions(wait_nr, timeouts, etime);
    // We can now just peek for any CQEs without waiting and without another syscall:
    try self.flush_completions(0, timeouts, etime);
    // Run completions only after all completions have been flushed:
    // Loop on a copy of the linked list, having reset the list first, so that any synchronous
    // append on running a completion is executed only the next time round the event loop,
    // without creating an infinite loop.
    {
        var copy = self.completed;
        self.completed = .{};
        while (copy.pop()) |completion| completion.complete();
    }
    // Again, loop on a copy of the list to avoid an infinite loop:
    {
        var copy = self.unqueued;
        self.unqueued = .{};
        while (copy.pop()) |completion| self.enqueue(completion);
    }
}
+
/// Copy CQEs out of the completion ring (waiting for up to `wait_nr` of them) and push
/// the corresponding Completions onto `completed` for later callback dispatch.
/// CQEs with `user_data == 0` are the internal timeouts submitted by `run_for_ns()`.
fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
    var cqes: [256]io_uring_cqe = undefined;
    var wait_remaining = wait_nr;
    while (true) {
        // Guard against waiting indefinitely (if there are too few requests inflight),
        // especially if this is not the first time round the loop:
        const completed = self.ring.copy_cqes(&cqes, wait_remaining) catch |err| switch (err) {
            error.SignalInterrupt => continue,
            else => return err,
        };
        if (completed > wait_remaining) wait_remaining = 0 else wait_remaining -= completed;
        for (cqes[0..completed]) |cqe| {
            if (cqe.user_data == 0) {
                timeouts.* -= 1;
                // We are only done if the timeout submitted was completed due to time, not if
                // it was completed due to the completion of an event, in which case `cqe.res`
                // would be 0. It is possible for multiple timeout operations to complete at the
                // same time if the nanoseconds value passed to `run_for_ns()` is very short.
                if (-cqe.res == os.ETIME) etime.* = true;
                continue;
            }
            // `user_data` was set to the Completion's address in `Completion.prep()`.
            const completion = @intToPtr(*Completion, @intCast(usize, cqe.user_data));
            completion.result = cqe.res;
            // We do not run the completion here (instead appending to a linked list) to avoid:
            // * recursion through `flush_submissions()` and `flush_completions()`,
            // * unbounded stack usage, and
            // * confusing stack traces.
            self.completed.push(completion);
        }
        // A partially filled buffer means the completion queue has been drained.
        if (completed < cqes.len) break;
    }
}
+
/// Submit all flushed SQEs to the kernel, retrying on signal interruption and backing
/// off by reaping one completion whenever the kernel reports completion-queue
/// overcommit or resource pressure.
fn flush_submissions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
    while (true) {
        _ = self.ring.submit_and_wait(wait_nr) catch |err| switch (err) {
            error.SignalInterrupt => continue,
            // Wait for some completions and then try again:
            // See https://github.com/axboe/liburing/issues/281 re: error.SystemResources.
            // Be careful also that copy_cqes() will flush before entering to wait (it does):
            // https://github.com/axboe/liburing/commit/35c199c48dfd54ad46b96e386882e7ac341314c5
            error.CompletionQueueOvercommitted, error.SystemResources => {
                try self.flush_completions(1, timeouts, etime);
                continue;
            },
            else => return err,
        };
        break;
    }
}
+
/// Queue a completion for submission: prepare an SQE for it if the submission queue has
/// room, otherwise park it on `unqueued` to be retried by the next `flush()`.
fn enqueue(self: *IO, completion: *Completion) void {
    const sqe = self.ring.get_sqe() catch |err| switch (err) {
        error.SubmissionQueueFull => {
            self.unqueued.push(completion);
            return;
        },
    };
    completion.prep(sqe);
}
+
/// This struct holds the data needed for a single io_uring operation
pub const Completion = struct {
    /// The IO instance that owns this completion.
    io: *IO,
    /// Raw result from the kernel CQE: >= 0 on success, a negated errno on failure.
    result: i32 = undefined,
    /// Intrusive link used by the `unqueued`/`completed` FIFOs.
    next: ?*Completion = null,
    operation: Operation,
    // This is one of the usecases for c_void outside of C code and as such c_void will
    // be replaced with anyopaque eventually: https://github.com/ziglang/zig/issues/323
    context: ?*c_void,
    /// Type-erased user callback; `result` points at an operation-specific error union.
    callback: fn (context: ?*c_void, completion: *Completion, result: *const c_void) void,

    /// Fill in `sqe` for this completion's operation and tag it with our address so that
    /// `flush_completions()` can map the CQE back to this Completion.
    fn prep(completion: *Completion, sqe: *io_uring_sqe) void {
        switch (completion.operation) {
            .accept => |*op| {
                linux.io_uring_prep_accept(
                    sqe,
                    op.socket,
                    &op.address,
                    &op.address_size,
                    os.SOCK_CLOEXEC,
                );
            },
            .close => |op| {
                linux.io_uring_prep_close(sqe, op.fd);
            },
            .connect => |*op| {
                linux.io_uring_prep_connect(
                    sqe,
                    op.socket,
                    &op.address.any,
                    op.address.getOsSockLen(),
                );
            },
            .fsync => |op| {
                linux.io_uring_prep_fsync(sqe, op.fd, 0);
            },
            .read => |op| {
                // NOTE(review): `buffer_limit` is not defined anywhere in this file —
                // confirm it is provided by a sibling file or the build.
                linux.io_uring_prep_read(
                    sqe,
                    op.fd,
                    op.buffer[0..buffer_limit(op.buffer.len)],
                    op.offset,
                );
            },
            .recv => |op| {
                linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG_NOSIGNAL);
            },
            .send => |op| {
                linux.io_uring_prep_send(sqe, op.socket, op.buffer, os.MSG_NOSIGNAL);
            },
            .timeout => |*op| {
                linux.io_uring_prep_timeout(sqe, &op.timespec, 0, 0);
            },
            .write => |op| {
                linux.io_uring_prep_write(
                    sqe,
                    op.fd,
                    op.buffer[0..buffer_limit(op.buffer.len)],
                    op.offset,
                );
            },
        }
        sqe.user_data = @ptrToInt(completion);
    }

    /// Map the raw kernel result into the operation's typed error union and invoke the
    /// user callback. The kernel reports failure as a negated errno in `result`, hence
    /// the `-completion.result` switches below. Operations interrupted by a signal
    /// (EINTR) are transparently resubmitted instead of completing — except `close`,
    /// which treats EINTR as success (see the linked Zig issue).
    fn complete(completion: *Completion) void {
        switch (completion.operation) {
            .accept => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EAGAIN => error.WouldBlock,
                    os.EBADF => error.FileDescriptorInvalid,
                    os.ECONNABORTED => error.ConnectionAborted,
                    os.EFAULT => unreachable,
                    os.EINVAL => error.SocketNotListening,
                    os.EMFILE => error.ProcessFdQuotaExceeded,
                    os.ENFILE => error.SystemFdQuotaExceeded,
                    os.ENOBUFS => error.SystemResources,
                    os.ENOMEM => error.SystemResources,
                    os.ENOTSOCK => error.FileDescriptorNotASocket,
                    os.EOPNOTSUPP => error.OperationNotSupported,
                    os.EPERM => error.PermissionDenied,
                    os.EPROTO => error.ProtocolFailure,
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else @intCast(os.socket_t, completion.result);
                completion.callback(completion.context, completion, &result);
            },
            .close => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
                    os.EBADF => error.FileDescriptorInvalid,
                    os.EDQUOT => error.DiskQuota,
                    os.EIO => error.InputOutput,
                    os.ENOSPC => error.NoSpaceLeft,
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else assert(completion.result == 0);
                completion.callback(completion.context, completion, &result);
            },
            .connect => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EACCES => error.AccessDenied,
                    os.EADDRINUSE => error.AddressInUse,
                    os.EADDRNOTAVAIL => error.AddressNotAvailable,
                    os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
                    os.EAGAIN, os.EINPROGRESS => error.WouldBlock,
                    os.EALREADY => error.OpenAlreadyInProgress,
                    os.EBADF => error.FileDescriptorInvalid,
                    os.ECONNREFUSED => error.ConnectionRefused,
                    os.ECONNRESET => error.ConnectionResetByPeer,
                    os.EFAULT => unreachable,
                    os.EISCONN => error.AlreadyConnected,
                    os.ENETUNREACH => error.NetworkUnreachable,
                    os.ENOENT => error.FileNotFound,
                    os.ENOTSOCK => error.FileDescriptorNotASocket,
                    os.EPERM => error.PermissionDenied,
                    os.EPROTOTYPE => error.ProtocolNotSupported,
                    os.ETIMEDOUT => error.ConnectionTimedOut,
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else assert(completion.result == 0);
                completion.callback(completion.context, completion, &result);
            },
            .fsync => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EBADF => error.FileDescriptorInvalid,
                    os.EDQUOT => error.DiskQuota,
                    os.EINVAL => error.ArgumentsInvalid,
                    os.EIO => error.InputOutput,
                    os.ENOSPC => error.NoSpaceLeft,
                    os.EROFS => error.ReadOnlyFileSystem,
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else assert(completion.result == 0);
                completion.callback(completion.context, completion, &result);
            },
            .read => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EAGAIN => error.WouldBlock,
                    os.EBADF => error.NotOpenForReading,
                    os.ECONNRESET => error.ConnectionResetByPeer,
                    os.EFAULT => unreachable,
                    os.EINVAL => error.Alignment,
                    os.EIO => error.InputOutput,
                    os.EISDIR => error.IsDir,
                    os.ENOBUFS => error.SystemResources,
                    os.ENOMEM => error.SystemResources,
                    os.ENXIO => error.Unseekable,
                    os.EOVERFLOW => error.Unseekable,
                    os.ESPIPE => error.Unseekable,
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else @intCast(usize, completion.result);
                completion.callback(completion.context, completion, &result);
            },
            .recv => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EAGAIN => error.WouldBlock,
                    os.EBADF => error.FileDescriptorInvalid,
                    os.ECONNREFUSED => error.ConnectionRefused,
                    os.EFAULT => unreachable,
                    os.EINVAL => unreachable,
                    os.ENOMEM => error.SystemResources,
                    os.ENOTCONN => error.SocketNotConnected,
                    os.ENOTSOCK => error.FileDescriptorNotASocket,
                    os.ECONNRESET => error.ConnectionResetByPeer,
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else @intCast(usize, completion.result);
                completion.callback(completion.context, completion, &result);
            },
            .send => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EACCES => error.AccessDenied,
                    os.EAGAIN => error.WouldBlock,
                    os.EALREADY => error.FastOpenAlreadyInProgress,
                    os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
                    os.EBADF => error.FileDescriptorInvalid,
                    os.ECONNRESET => error.ConnectionResetByPeer,
                    os.EDESTADDRREQ => unreachable,
                    os.EFAULT => unreachable,
                    os.EINVAL => unreachable,
                    os.EISCONN => unreachable,
                    os.EMSGSIZE => error.MessageTooBig,
                    os.ENOBUFS => error.SystemResources,
                    os.ENOMEM => error.SystemResources,
                    os.ENOTCONN => error.SocketNotConnected,
                    os.ENOTSOCK => error.FileDescriptorNotASocket,
                    os.EOPNOTSUPP => error.OperationNotSupported,
                    os.EPIPE => error.BrokenPipe,
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else @intCast(usize, completion.result);
                completion.callback(completion.context, completion, &result);
            },
            .timeout => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.ECANCELED => error.Canceled,
                    os.ETIME => {}, // A success.
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else unreachable;
                completion.callback(completion.context, completion, &result);
            },
            .write => {
                const result = if (completion.result < 0) switch (-completion.result) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EAGAIN => error.WouldBlock,
                    os.EBADF => error.NotOpenForWriting,
                    os.EDESTADDRREQ => error.NotConnected,
                    os.EDQUOT => error.DiskQuota,
                    os.EFAULT => unreachable,
                    os.EFBIG => error.FileTooBig,
                    os.EINVAL => error.Alignment,
                    os.EIO => error.InputOutput,
                    os.ENOSPC => error.NoSpaceLeft,
                    os.ENXIO => error.Unseekable,
                    os.EOVERFLOW => error.Unseekable,
                    os.EPERM => error.AccessDenied,
                    os.EPIPE => error.BrokenPipe,
                    os.ESPIPE => error.Unseekable,
                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
                } else @intCast(usize, completion.result);
                completion.callback(completion.context, completion, &result);
            },
        }
    }
};
+
/// This union encodes the set of operations supported as well as their arguments.
const Operation = union(enum) {
    accept: struct {
        socket: os.socket_t,
        // Filled in by the kernel with the peer's address on completion.
        address: os.sockaddr = undefined,
        address_size: os.socklen_t = @sizeOf(os.sockaddr),
    },
    close: struct {
        fd: os.fd_t,
    },
    connect: struct {
        socket: os.socket_t,
        address: std.net.Address,
    },
    fsync: struct {
        fd: os.fd_t,
    },
    read: struct {
        fd: os.fd_t,
        buffer: []u8,
        offset: u64,
    },
    recv: struct {
        socket: os.socket_t,
        buffer: []u8,
    },
    send: struct {
        socket: os.socket_t,
        buffer: []const u8,
    },
    timeout: struct {
        // Relative timeout passed to IORING_OP_TIMEOUT; must outlive the SQE.
        timespec: os.__kernel_timespec,
    },
    write: struct {
        fd: os.fd_t,
        buffer: []const u8,
        offset: u64,
    },
};
+
/// Errors a completed accept operation can report (mapped in `Completion.complete()`).
pub const AcceptError = error{
    WouldBlock,
    FileDescriptorInvalid,
    ConnectionAborted,
    SocketNotListening,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    SystemResources,
    FileDescriptorNotASocket,
    OperationNotSupported,
    PermissionDenied,
    ProtocolFailure,
} || os.UnexpectedError;

/// Queue an asynchronous accept(2) on `socket`.
/// `completion` must remain valid (and its memory pinned) until `callback` runs;
/// `context` is passed back to `callback` unmodified. The accepted socket is opened
/// with SOCK_CLOEXEC (see `Completion.prep()`).
pub fn accept(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: AcceptError!os.socket_t,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const AcceptError!os.socket_t, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .accept = .{
                .socket = socket,
                .address = undefined,
                .address_size = @sizeOf(os.sockaddr),
            },
        },
    };
    self.enqueue(completion);
}
+
/// Errors a completed close operation can report (mapped in `Completion.complete()`).
pub const CloseError = error{
    FileDescriptorInvalid,
    DiskQuota,
    InputOutput,
    NoSpaceLeft,
} || os.UnexpectedError;

/// Queue an asynchronous close(2) of `fd`.
/// `completion` must remain valid until `callback` runs. EINTR during close is treated
/// as success rather than retried (see `Completion.complete()`).
pub fn close(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: CloseError!void,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const CloseError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .close = .{ .fd = fd },
        },
    };
    self.enqueue(completion);
}
+
/// Errors a completed connect operation can report (mapped in `Completion.complete()`).
pub const ConnectError = error{
    AccessDenied,
    AddressInUse,
    AddressNotAvailable,
    AddressFamilyNotSupported,
    WouldBlock,
    OpenAlreadyInProgress,
    FileDescriptorInvalid,
    ConnectionRefused,
    AlreadyConnected,
    NetworkUnreachable,
    FileNotFound,
    FileDescriptorNotASocket,
    PermissionDenied,
    ProtocolNotSupported,
    ConnectionTimedOut,
} || os.UnexpectedError;

/// Queue an asynchronous connect(2) of `socket` to `address`.
/// `completion` must remain valid until `callback` runs; the address is copied into the
/// operation, so the caller's `address` need not outlive this call.
pub fn connect(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: ConnectError!void,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    address: std.net.Address,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const ConnectError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .connect = .{
                .socket = socket,
                .address = address,
            },
        },
    };
    self.enqueue(completion);
}
+
/// Errors a completed fsync operation can report (mapped in `Completion.complete()`).
pub const FsyncError = error{
    FileDescriptorInvalid,
    DiskQuota,
    ArgumentsInvalid,
    InputOutput,
    NoSpaceLeft,
    ReadOnlyFileSystem,
} || os.UnexpectedError;

/// Queue an asynchronous fsync(2) of `fd` (full sync; no IORING_FSYNC_DATASYNC flag).
/// `completion` must remain valid until `callback` runs.
pub fn fsync(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: FsyncError!void,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const FsyncError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .fsync = .{
                .fd = fd,
            },
        },
    };
    self.enqueue(completion);
}
+
/// Errors a completed read operation can report (mapped in `Completion.complete()`).
pub const ReadError = error{
    WouldBlock,
    NotOpenForReading,
    ConnectionResetByPeer,
    Alignment,
    InputOutput,
    IsDir,
    SystemResources,
    Unseekable,
} || os.UnexpectedError;

/// Queue an asynchronous positional read of `fd` at `offset` into `buffer`.
/// `completion` and `buffer` must both remain valid until `callback` runs;
/// the callback receives the number of bytes read.
pub fn read(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: ReadError!usize,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
    buffer: []u8,
    offset: u64,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const ReadError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .read = .{
                .fd = fd,
                .buffer = buffer,
                .offset = offset,
            },
        },
    };
    self.enqueue(completion);
}
+
/// Errors a completed recv operation can report (mapped in `Completion.complete()`).
pub const RecvError = error{
    WouldBlock,
    FileDescriptorInvalid,
    ConnectionRefused,
    SystemResources,
    SocketNotConnected,
    FileDescriptorNotASocket,
} || os.UnexpectedError;

/// Queue an asynchronous recv(2) on `socket` into `buffer` (MSG_NOSIGNAL).
/// `completion` and `buffer` must both remain valid until `callback` runs;
/// the callback receives the number of bytes received.
pub fn recv(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: RecvError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []u8,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const RecvError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .recv = .{
                .socket = socket,
                .buffer = buffer,
            },
        },
    };
    self.enqueue(completion);
}
+
/// Errors a completed send operation can report (mapped in `Completion.complete()`).
pub const SendError = error{
    AccessDenied,
    WouldBlock,
    FastOpenAlreadyInProgress,
    AddressFamilyNotSupported,
    FileDescriptorInvalid,
    ConnectionResetByPeer,
    MessageTooBig,
    SystemResources,
    SocketNotConnected,
    FileDescriptorNotASocket,
    OperationNotSupported,
    BrokenPipe,
} || os.UnexpectedError;

/// Queue an asynchronous send(2) of `buffer` on `socket` (MSG_NOSIGNAL, so a broken
/// pipe surfaces as error.BrokenPipe instead of SIGPIPE).
/// `completion` and `buffer` must both remain valid until `callback` runs;
/// the callback receives the number of bytes sent.
pub fn send(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: SendError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []const u8,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const SendError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .send = .{
                .socket = socket,
                .buffer = buffer,
            },
        },
    };
    self.enqueue(completion);
}
+
/// Errors a completed timeout operation can report (mapped in `Completion.complete()`).
pub const TimeoutError = error{Canceled} || os.UnexpectedError;

/// Queue a relative timeout that completes after `nanoseconds`.
/// `completion` must remain valid until `callback` runs; the callback receives `{}`
/// when the timeout expired (ETIME) or error.Canceled if it was canceled.
pub fn timeout(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: TimeoutError!void,
    ) void,
    completion: *Completion,
    nanoseconds: u63,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const TimeoutError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .timeout = .{
                // Split the duration into whole seconds plus a sub-second remainder:
                // a timespec with tv_nsec >= 1e9 is not normalized, and kernel paths
                // that validate timespecs reject it with EINVAL — which previously
                // broke every timeout of one second or longer. Normalization is
                // value-preserving for paths that merely convert to ktime.
                .timespec = .{
                    .tv_sec = @intCast(i64, nanoseconds / std.time.ns_per_s),
                    .tv_nsec = @intCast(i64, nanoseconds % std.time.ns_per_s),
                },
            },
        },
    };
    self.enqueue(completion);
}
+
/// Errors a completed write operation can report (mapped in `Completion.complete()`).
pub const WriteError = error{
    WouldBlock,
    NotOpenForWriting,
    NotConnected,
    DiskQuota,
    FileTooBig,
    Alignment,
    InputOutput,
    NoSpaceLeft,
    Unseekable,
    AccessDenied,
    BrokenPipe,
} || os.UnexpectedError;

/// Queue an asynchronous positional write of `buffer` to `fd` at `offset`.
/// `completion` and `buffer` must both remain valid until `callback` runs;
/// the callback receives the number of bytes written.
pub fn write(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: WriteError!usize,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
    buffer: []const u8,
    offset: u64,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            // Type-erasing wrapper: recovers the concrete Context and result types.
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const WriteError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .write = .{
                .fd = fd,
                .buffer = buffer,
                .offset = offset,
            },
        },
    };
    self.enqueue(completion);
}
+
/// Create a socket via the blocking socket(2) syscall.
/// NOTE(review): this bypasses io_uring — presumably acceptable since socket creation
/// does not block on I/O, but confirm callers expect a synchronous call here.
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
    return os.socket(family, sock_type, protocol);
}