diff options
| author | 2021-11-16 15:06:21 -0800 | |
|---|---|---|
| committer | 2021-12-16 19:18:51 -0800 | |
| commit | 4af743766d9e32789b90e39c761d5e896426d2f3 (patch) | |
| tree | 7ae22cb8852e674d428091d4200bee414091980f /src | |
| parent | 8f35f16c7ed97c50ee188f13ab9d5ea1008341ae (diff) | |
| download | bun-4af743766d9e32789b90e39c761d5e896426d2f3.tar.gz bun-4af743766d9e32789b90e39c761d5e896426d2f3.tar.zst bun-4af743766d9e32789b90e39c761d5e896426d2f3.zip | |
[bun install] Begin to add io_uring (via tigerbeetle/@kprotty's interface)
Diffstat (limited to 'src')
| -rw-r--r-- | src/builder.zig | 2 | ||||
| -rw-r--r-- | src/io/fifo.zig | 104 | ||||
| -rw-r--r-- | src/io/io_darwin.zig | 665 | ||||
| -rw-r--r-- | src/io/io_linux.zig | 874 | ||||
| -rw-r--r-- | src/io/time.zig | 64 | 
5 files changed, 1708 insertions, 1 deletions
| diff --git a/src/builder.zig b/src/builder.zig index 3811a7878..19daa0a20 100644 --- a/src/builder.zig +++ b/src/builder.zig @@ -1,7 +1,7 @@  const Allocator = @import("std").mem.Allocator;  const assert = @import("std").debug.assert;  const copy = @import("std").mem.copy; - +const io = @import("io");  pub fn Builder(comptime Type: type) type {      return struct {          const This = @This(); diff --git a/src/io/fifo.zig b/src/io/fifo.zig new file mode 100644 index 000000000..b9b3bc961 --- /dev/null +++ b/src/io/fifo.zig @@ -0,0 +1,104 @@ +const std = @import("std"); +const assert = std.debug.assert; + +/// An intrusive first in/first out linked list. +/// The element type T must have a field called "next" of type ?*T +pub fn FIFO(comptime T: type) type { +    return struct { +        const Self = @This(); + +        in: ?*T = null, +        out: ?*T = null, + +        pub fn push(self: *Self, elem: *T) void { +            assert(elem.next == null); +            if (self.in) |in| { +                in.next = elem; +                self.in = elem; +            } else { +                assert(self.out == null); +                self.in = elem; +                self.out = elem; +            } +        } + +        pub fn pop(self: *Self) ?*T { +            const ret = self.out orelse return null; +            self.out = ret.next; +            ret.next = null; +            if (self.in == ret) self.in = null; +            return ret; +        } + +        pub fn peek(self: Self) ?*T { +            return self.out; +        } + +        /// Remove an element from the FIFO. Asserts that the element is +        /// in the FIFO. This operation is O(N), if this is done often you +        /// probably want a different data structure. 
+        pub fn remove(self: *Self, to_remove: *T) void { +            if (to_remove == self.out) { +                _ = self.pop(); +                return; +            } +            var it = self.out; +            while (it) |elem| : (it = elem.next) { +                if (to_remove == elem.next) { +                    if (to_remove == self.in) self.in = elem; +                    elem.next = to_remove.next; +                    to_remove.next = null; +                    break; +                } +            } else unreachable; +        } +    }; +} + +test "push/pop/peek/remove" { +    const testing = @import("std").testing; + +    const Foo = struct { next: ?*@This() = null }; + +    var one: Foo = .{}; +    var two: Foo = .{}; +    var three: Foo = .{}; + +    var fifo: FIFO(Foo) = .{}; + +    fifo.push(&one); +    try testing.expectEqual(@as(?*Foo, &one), fifo.peek()); + +    fifo.push(&two); +    fifo.push(&three); +    try testing.expectEqual(@as(?*Foo, &one), fifo.peek()); + +    fifo.remove(&one); +    try testing.expectEqual(@as(?*Foo, &two), fifo.pop()); +    try testing.expectEqual(@as(?*Foo, &three), fifo.pop()); +    try testing.expectEqual(@as(?*Foo, null), fifo.pop()); + +    fifo.push(&one); +    fifo.push(&two); +    fifo.push(&three); +    fifo.remove(&two); +    try testing.expectEqual(@as(?*Foo, &one), fifo.pop()); +    try testing.expectEqual(@as(?*Foo, &three), fifo.pop()); +    try testing.expectEqual(@as(?*Foo, null), fifo.pop()); + +    fifo.push(&one); +    fifo.push(&two); +    fifo.push(&three); +    fifo.remove(&three); +    try testing.expectEqual(@as(?*Foo, &one), fifo.pop()); +    try testing.expectEqual(@as(?*Foo, &two), fifo.pop()); +    try testing.expectEqual(@as(?*Foo, null), fifo.pop()); + +    fifo.push(&one); +    fifo.push(&two); +    fifo.remove(&two); +    fifo.push(&three); +    try testing.expectEqual(@as(?*Foo, &one), fifo.pop()); +    try testing.expectEqual(@as(?*Foo, &three), fifo.pop()); +    try 
testing.expectEqual(@as(?*Foo, null), fifo.pop()); +} diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig new file mode 100644 index 000000000..00186610b --- /dev/null +++ b/src/io/io_darwin.zig @@ -0,0 +1,665 @@ +const std = @import("std"); +const os = std.os; +const mem = std.mem; +const assert = std.debug.assert; + +const FIFO = @import("./fifo.zig").FIFO; +const Time = @import("./time.zig").Time; + +const IO = @This(); + +kq: os.fd_t, +time: Time = .{}, +io_inflight: usize = 0, +timeouts: FIFO(Completion) = .{}, +completed: FIFO(Completion) = .{}, +io_pending: FIFO(Completion) = .{}, + +pub fn init(entries: u12, flags: u32) !IO { +    const kq = try os.kqueue(); +    assert(kq > -1); +    return IO{ .kq = kq }; +} + +pub fn deinit(self: *IO) void { +    assert(self.kq > -1); +    os.close(self.kq); +    self.kq = -1; +} + +/// Pass all queued submissions to the kernel and peek for completions. +pub fn tick(self: *IO) !void { +    return self.flush(false); +} + +/// Pass all queued submissions to the kernel and run for `nanoseconds`. +/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used +/// in the __kernel_timespec struct. +pub fn run_for_ns(self: *IO, nanoseconds: u63) !void { +    var timed_out = false; +    var completion: Completion = undefined; +    const on_timeout = struct { +        fn callback( +            timed_out_ptr: *bool, +            _completion: *Completion, +            _result: TimeoutError!void, +        ) void { +            timed_out_ptr.* = true; +        } +    }.callback; + +    // Submit a timeout which sets the timed_out value to true to terminate the loop below. +    self.timeout( +        *bool, +        &timed_out, +        on_timeout, +        &completion, +        nanoseconds, +    ); + +    // Loop until our timeout completion is processed above, which sets timed_out to true. +    // LLVM shouldn't be able to cache timed_out's value here since its address escapes above. 
+    while (!timed_out) { +        try self.flush(true); +    } +} + +fn flush(self: *IO, wait_for_completions: bool) !void { +    var io_pending = self.io_pending.peek(); +    var events: [256]os.Kevent = undefined; + +    // Check timeouts and fill events with completions in io_pending +    // (they will be submitted through kevent). +    // Timeouts are expired here and possibly pushed to the completed queue. +    const next_timeout = self.flush_timeouts(); +    const change_events = self.flush_io(&events, &io_pending); + +    // Only call kevent() if we need to submit io events or if we need to wait for completions. +    if (change_events > 0 or self.completed.peek() == null) { +        // Zero timeouts for kevent() implies a non-blocking poll +        var ts = std.mem.zeroes(os.timespec); + +        // We need to wait (not poll) on kevent if there's nothing to submit or complete. +        // We should never wait indefinitely (timeout_ptr = null for kevent) given: +        // - tick() is non-blocking (wait_for_completions = false) +        // - run_for_ns() always submits a timeout +        if (change_events == 0 and self.completed.peek() == null) { +            if (wait_for_completions) { +                const timeout_ns = next_timeout orelse @panic("kevent() blocking forever"); +                ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); +                ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); +            } else if (self.io_inflight == 0) { +                return; +            } +        } + +        const new_events = try os.kevent( +            self.kq, +            events[0..change_events], +            events[0..events.len], +            &ts, +        ); + +        // Mark the io events submitted only after kevent() successfully processed them +        self.io_pending.out = io_pending; +        if (io_pending == null) { +            self.io_pending.in = null; +        } + +        
self.io_inflight += change_events; +        self.io_inflight -= new_events; + +        for (events[0..new_events]) |event| { +            const completion = @intToPtr(*Completion, event.udata); +            completion.next = null; +            self.completed.push(completion); +        } +    } + +    var completed = self.completed; +    self.completed = .{}; +    while (completed.pop()) |completion| { +        (completion.callback)(self, completion); +    } +} + +fn flush_io(self: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize { +    for (events) |*event, flushed| { +        const completion = io_pending_top.* orelse return flushed; +        io_pending_top.* = completion.next; + +        const event_info = switch (completion.operation) { +            .accept => |op| [2]c_int{ op.socket, os.EVFILT_READ }, +            .connect => |op| [2]c_int{ op.socket, os.EVFILT_WRITE }, +            .read => |op| [2]c_int{ op.fd, os.EVFILT_READ }, +            .write => |op| [2]c_int{ op.fd, os.EVFILT_WRITE }, +            .recv => |op| [2]c_int{ op.socket, os.EVFILT_READ }, +            .send => |op| [2]c_int{ op.socket, os.EVFILT_WRITE }, +            else => @panic("invalid completion operation queued for io"), +        }; + +        event.* = .{ +            .ident = @intCast(u32, event_info[0]), +            .filter = @intCast(i16, event_info[1]), +            .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, +            .fflags = 0, +            .data = 0, +            .udata = @ptrToInt(completion), +        }; +    } +    return events.len; +} + +fn flush_timeouts(self: *IO) ?u64 { +    var min_timeout: ?u64 = null; +    var timeouts: ?*Completion = self.timeouts.peek(); +    while (timeouts) |completion| { +        timeouts = completion.next; + +        // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call. 
+        const now = self.time.monotonic(); +        const expires = completion.operation.timeout.expires; + +        // NOTE: remove() could be O(1) here with a doubly-linked-list +        // since we know the previous Completion. +        if (now >= expires) { +            self.timeouts.remove(completion); +            self.completed.push(completion); +            continue; +        } + +        const timeout_ns = expires - now; +        if (min_timeout) |min_ns| { +            min_timeout = std.math.min(min_ns, timeout_ns); +        } else { +            min_timeout = timeout_ns; +        } +    } +    return min_timeout; +} + +/// This struct holds the data needed for a single IO operation +pub const Completion = struct { +    next: ?*Completion, +    context: ?*c_void, +    callback: fn (*IO, *Completion) void, +    operation: Operation, +}; + +const Operation = union(enum) { +    accept: struct { +        socket: os.socket_t, +    }, +    close: struct { +        fd: os.fd_t, +    }, +    connect: struct { +        socket: os.socket_t, +        address: std.net.Address, +        initiated: bool, +    }, +    fsync: struct { +        fd: os.fd_t, +    }, +    read: struct { +        fd: os.fd_t, +        buf: [*]u8, +        len: u32, +        offset: u64, +    }, +    recv: struct { +        socket: os.socket_t, +        buf: [*]u8, +        len: u32, +    }, +    send: struct { +        socket: os.socket_t, +        buf: [*]const u8, +        len: u32, +    }, +    timeout: struct { +        expires: u64, +    }, +    write: struct { +        fd: os.fd_t, +        buf: [*]const u8, +        len: u32, +        offset: u64, +    }, +}; + +fn submit( +    self: *IO, +    context: anytype, +    comptime callback: anytype, +    completion: *Completion, +    comptime operation_tag: std.meta.Tag(Operation), +    operation_data: anytype, +    comptime OperationImpl: type, +) void { +    const Context = @TypeOf(context); +    const onCompleteFn = struct { +        fn 
onComplete(io: *IO, _completion: *Completion) void { +            // Perform the actual operation +            const op_data = &@field(_completion.operation, @tagName(operation_tag)); +            const result = OperationImpl.doOperation(op_data); + +            // Requeue onto io_pending if error.WouldBlock +            switch (operation_tag) { +                .accept, .connect, .read, .write, .send, .recv => { +                    _ = result catch |err| switch (err) { +                        error.WouldBlock => { +                            _completion.next = null; +                            io.io_pending.push(_completion); +                            return; +                        }, +                        else => {}, +                    }; +                }, +                else => {}, +            } + +            // Complete the Completion +            return callback( +                @intToPtr(Context, @ptrToInt(_completion.context)), +                _completion, +                result, +            ); +        } +    }.onComplete; + +    completion.* = .{ +        .next = null, +        .context = context, +        .callback = onCompleteFn, +        .operation = @unionInit(Operation, @tagName(operation_tag), operation_data), +    }; + +    switch (operation_tag) { +        .timeout => self.timeouts.push(completion), +        else => self.completed.push(completion), +    } +} + +pub const AcceptError = os.AcceptError || os.SetSockOptError; + +pub fn accept( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: AcceptError!os.socket_t, +    ) void, +    completion: *Completion, +    socket: os.socket_t, +) void { +    self.submit( +        context, +        callback, +        completion, +        .accept, +        .{ +            .socket = socket, +        }, +        struct { +            fn doOperation(op: anytype) 
AcceptError!os.socket_t { +                const fd = try os.accept( +                    op.socket, +                    null, +                    null, +                    os.SOCK_NONBLOCK | os.SOCK_CLOEXEC, +                ); +                errdefer os.close(fd); + +                // darwin doesn't support os.MSG_NOSIGNAL, +                // but instead a socket option to avoid SIGPIPE. +                os.setsockopt(fd, os.SOL_SOCKET, os.SO_NOSIGPIPE, &mem.toBytes(@as(c_int, 1))) catch |err| return switch (err) { +                    error.TimeoutTooBig => unreachable, +                    error.PermissionDenied => error.NetworkSubsystemFailed, +                    error.AlreadyConnected => error.NetworkSubsystemFailed, +                    error.InvalidProtocolOption => error.ProtocolFailure, +                    else => |e| e, +                }; + +                return fd; +            } +        }, +    ); +} + +pub const CloseError = error{ +    FileDescriptorInvalid, +    DiskQuota, +    InputOutput, +    NoSpaceLeft, +} || os.UnexpectedError; + +pub fn close( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: CloseError!void, +    ) void, +    completion: *Completion, +    fd: os.fd_t, +) void { +    self.submit( +        context, +        callback, +        completion, +        .close, +        .{ +            .fd = fd, +        }, +        struct { +            fn doOperation(op: anytype) CloseError!void { +                return switch (os.errno(os.system.close(op.fd))) { +                    0 => {}, +                    os.EBADF => error.FileDescriptorInvalid, +                    os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 +                    os.EIO => error.InputOutput, +                    else => |errno| os.unexpectedErrno(errno), +                }; +            } +        }, +    
); +} + +pub const ConnectError = os.ConnectError; + +pub fn connect( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: ConnectError!void, +    ) void, +    completion: *Completion, +    socket: os.socket_t, +    address: std.net.Address, +) void { +    self.submit( +        context, +        callback, +        completion, +        .connect, +        .{ +            .socket = socket, +            .address = address, +            .initiated = false, +        }, +        struct { +            fn doOperation(op: anytype) ConnectError!void { +                // Don't call connect after being rescheduled by io_pending as it gives EISCONN. +                // Instead, check the socket error to see if has been connected successfully. +                const result = switch (op.initiated) { +                    true => os.getsockoptError(op.socket), +                    else => os.connect(op.socket, &op.address.any, op.address.getOsSockLen()), +                }; + +                op.initiated = true; +                return result; +            } +        }, +    ); +} + +pub const FsyncError = os.SyncError; + +pub fn fsync( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: FsyncError!void, +    ) void, +    completion: *Completion, +    fd: os.fd_t, +) void { +    self.submit( +        context, +        callback, +        completion, +        .fsync, +        .{ +            .fd = fd, +        }, +        struct { +            fn doOperation(op: anytype) FsyncError!void { +                _ = os.fcntl(op.fd, os.F_FULLFSYNC, 1) catch return os.fsync(op.fd); +            } +        }, +    ); +} + +pub const ReadError = error{ +    WouldBlock, +    NotOpenForReading, +    ConnectionResetByPeer, +    Alignment, +    InputOutput, 
+    IsDir, +    SystemResources, +    Unseekable, +} || os.UnexpectedError; + +pub fn read( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: ReadError!usize, +    ) void, +    completion: *Completion, +    fd: os.fd_t, +    buffer: []u8, +    offset: u64, +) void { +    self.submit( +        context, +        callback, +        completion, +        .read, +        .{ +            .fd = fd, +            .buf = buffer.ptr, +            .len = @intCast(u32, buffer_limit(buffer.len)), +            .offset = offset, +        }, +        struct { +            fn doOperation(op: anytype) ReadError!usize { +                while (true) { +                    const rc = os.system.pread( +                        op.fd, +                        op.buf, +                        op.len, +                        @bitCast(isize, op.offset), +                    ); +                    return switch (os.errno(rc)) { +                        0 => @intCast(usize, rc), +                        os.EINTR => continue, +                        os.EAGAIN => error.WouldBlock, +                        os.EBADF => error.NotOpenForReading, +                        os.ECONNRESET => error.ConnectionResetByPeer, +                        os.EFAULT => unreachable, +                        os.EINVAL => error.Alignment, +                        os.EIO => error.InputOutput, +                        os.EISDIR => error.IsDir, +                        os.ENOBUFS => error.SystemResources, +                        os.ENOMEM => error.SystemResources, +                        os.ENXIO => error.Unseekable, +                        os.EOVERFLOW => error.Unseekable, +                        os.ESPIPE => error.Unseekable, +                        else => |err| os.unexpectedErrno(err), +                    }; +                } +            } +        }, +    ); +} + +pub const RecvError 
= os.RecvFromError; + +pub fn recv( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: RecvError!usize, +    ) void, +    completion: *Completion, +    socket: os.socket_t, +    buffer: []u8, +) void { +    self.submit( +        context, +        callback, +        completion, +        .recv, +        .{ +            .socket = socket, +            .buf = buffer.ptr, +            .len = @intCast(u32, buffer_limit(buffer.len)), +        }, +        struct { +            fn doOperation(op: anytype) RecvError!usize { +                return os.recv(op.socket, op.buf[0..op.len], 0); +            } +        }, +    ); +} + +pub const SendError = os.SendError; + +pub fn send( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: SendError!usize, +    ) void, +    completion: *Completion, +    socket: os.socket_t, +    buffer: []const u8, +) void { +    self.submit( +        context, +        callback, +        completion, +        .send, +        .{ +            .socket = socket, +            .buf = buffer.ptr, +            .len = @intCast(u32, buffer_limit(buffer.len)), +        }, +        struct { +            fn doOperation(op: anytype) SendError!usize { +                return os.send(op.socket, op.buf[0..op.len], 0); +            } +        }, +    ); +} + +pub const TimeoutError = error{Canceled} || os.UnexpectedError; + +pub fn timeout( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: TimeoutError!void, +    ) void, +    completion: *Completion, +    nanoseconds: u63, +) void { +    self.submit( +        context, +        callback, +        completion, +        .timeout, +        .{ +            
.expires = self.time.monotonic() + nanoseconds, +        }, +        struct { +            fn doOperation(_: anytype) TimeoutError!void { +                return; // timeouts don't have errors for now +            } +        }, +    ); +} + +pub const WriteError = os.PWriteError; + +pub fn write( +    self: *IO, +    comptime Context: type, +    context: Context, +    comptime callback: fn ( +        context: Context, +        completion: *Completion, +        result: WriteError!usize, +    ) void, +    completion: *Completion, +    fd: os.fd_t, +    buffer: []const u8, +    offset: u64, +) void { +    self.submit( +        context, +        callback, +        completion, +        .write, +        .{ +            .fd = fd, +            .buf = buffer.ptr, +            .len = @intCast(u32, buffer_limit(buffer.len)), +            .offset = offset, +        }, +        struct { +            fn doOperation(op: anytype) WriteError!usize { +                return os.pwrite(op.fd, op.buf[0..op.len], op.offset); +            } +        }, +    ); +} + +pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t { +    const fd = try os.socket(family, sock_type | os.SOCK_NONBLOCK, protocol); +    errdefer os.close(fd); + +    // darwin doesn't support os.MSG_NOSIGNAL, but instead a socket option to avoid SIGPIPE. +    try os.setsockopt(fd, os.SOL_SOCKET, os.SO_NOSIGPIPE, &mem.toBytes(@as(c_int, 1))); +    return fd; +} + +fn buffer_limit(buffer_len: usize) usize { + +    // Linux limits how much may be written in a `pwrite()/pread()` call, which is `0x7ffff000` on +    // both 64-bit and 32-bit systems, due to using a signed C int as the return value, as well as +    // stuffing the errno codes into the last `4096` values. +    // Darwin limits writes to `0x7fffffff` bytes, more than that returns `EINVAL`. +    // The corresponding POSIX limit is `std.math.maxInt(isize)`. 
+    const limit = switch (std.Target.current.os.tag) { +        .linux => 0x7ffff000, +        .macos, .ios, .watchos, .tvos => std.math.maxInt(i32), +        else => std.math.maxInt(isize), +    }; +    return std.math.min(limit, buffer_len); +} diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig new file mode 100644 index 000000000..794f18f90 --- /dev/null +++ b/src/io/io_linux.zig @@ -0,0 +1,874 @@ +const std = @import("std"); +const assert = std.debug.assert; +const os = std.os; +const linux = os.linux; +const IO_Uring = linux.IO_Uring; +const io_uring_cqe = linux.io_uring_cqe; +const io_uring_sqe = linux.io_uring_sqe; + +const FIFO = @import("./fifo.zig").FIFO; +const IO = @This(); + +ring: IO_Uring, + +/// Operations not yet submitted to the kernel and waiting on available space in the +/// submission queue. +unqueued: FIFO(Completion) = .{}, + +/// Completions that are ready to have their callbacks run. +completed: FIFO(Completion) = .{}, + +pub fn init(entries: u12, flags: u32) !IO { +    return IO{ .ring = try IO_Uring.init(entries, flags) }; +} + +pub fn deinit(self: *IO) void { +    self.ring.deinit(); +} + +/// Pass all queued submissions to the kernel and peek for completions. +pub fn tick(self: *IO) !void { +    // We assume that all timeouts submitted by `run_for_ns()` will be reaped by `run_for_ns()` +    // and that `tick()` and `run_for_ns()` cannot be run concurrently. +    // Therefore `timeouts` here will never be decremented and `etime` will always be false. +    var timeouts: usize = 0; +    var etime = false; + +    try self.flush(0, &timeouts, &etime); +    assert(etime == false); + +    // Flush any SQEs that were queued while running completion callbacks in `flush()`: +    // This is an optimization to avoid delaying submissions until the next tick. +    // At the same time, we do not flush any ready CQEs since SQEs may complete synchronously. 
+    // We guard against an io_uring_enter() syscall if we know we do not have any queued SQEs. +    // We cannot use `self.ring.sq_ready()` here since this counts flushed and unflushed SQEs. +    const queued = self.ring.sq.sqe_tail -% self.ring.sq.sqe_head; +    if (queued > 0) { +        try self.flush_submissions(0, &timeouts, &etime); +        assert(etime == false); +    } +} + +/// Pass all queued submissions to the kernel and run for `nanoseconds`. +/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used +/// in the __kernel_timespec struct. +pub fn run_for_ns(self: *IO, nanoseconds: u63) !void { +    // We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the +    // timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are +    // dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC. +    var current_ts: os.timespec = undefined; +    os.clock_gettime(os.CLOCK_MONOTONIC, &current_ts) catch unreachable; +    // The absolute CLOCK_MONOTONIC time after which we may return from this function: +    const timeout_ts: os.__kernel_timespec = .{ +        .tv_sec = current_ts.tv_sec, +        .tv_nsec = current_ts.tv_nsec + nanoseconds, +    }; +    var timeouts: usize = 0; +    var etime = false; +    while (!etime) { +        const timeout_sqe = self.ring.get_sqe() catch blk: { +            // The submission queue is full, so flush submissions to make space: +            try self.flush_submissions(0, &timeouts, &etime); +            break :blk self.ring.get_sqe() catch unreachable; +        }; +        // Submit an absolute timeout that will be canceled if any other SQE completes first: +        linux.io_uring_prep_timeout(timeout_sqe, &timeout_ts, 1, os.IORING_TIMEOUT_ABS); +        timeout_sqe.user_data = 0; +        timeouts += 1; +        // The amount of time this call will block is bounded by the timeout we just submitted: +        try self.flush(1, 
&timeouts, &etime); +    } +    // Reap any remaining timeouts, which reference the timespec in the current stack frame. +    // The busy loop here is required to avoid a potential deadlock, as the kernel determines +    // when the timeouts are pushed to the completion queue, not us. +    while (timeouts > 0) _ = try self.flush_completions(0, &timeouts, &etime); +} + +fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { +    // Flush any queued SQEs and reuse the same syscall to wait for completions if required: +    try self.flush_submissions(wait_nr, timeouts, etime); +    // We can now just peek for any CQEs without waiting and without another syscall: +    try self.flush_completions(0, timeouts, etime); +    // Run completions only after all completions have been flushed: +    // Loop on a copy of the linked list, having reset the list first, so that any synchronous +    // append on running a completion is executed only the next time round the event loop, +    // without creating an infinite loop. 
+    { +        var copy = self.completed; +        self.completed = .{}; +        while (copy.pop()) |completion| completion.complete(); +    } +    // Again, loop on a copy of the list to avoid an infinite loop: +    { +        var copy = self.unqueued; +        self.unqueued = .{}; +        while (copy.pop()) |completion| self.enqueue(completion); +    } +} + +fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { +    var cqes: [256]io_uring_cqe = undefined; +    var wait_remaining = wait_nr; +    while (true) { +        // Guard against waiting indefinitely (if there are too few requests inflight), +        // especially if this is not the first time round the loop: +        const completed = self.ring.copy_cqes(&cqes, wait_remaining) catch |err| switch (err) { +            error.SignalInterrupt => continue, +            else => return err, +        }; +        if (completed > wait_remaining) wait_remaining = 0 else wait_remaining -= completed; +        for (cqes[0..completed]) |cqe| { +            if (cqe.user_data == 0) { +                timeouts.* -= 1; +                // We are only done if the timeout submitted was completed due to time, not if +                // it was completed due to the completion of an event, in which case `cqe.res` +                // would be 0. It is possible for multiple timeout operations to complete at the +                // same time if the nanoseconds value passed to `run_for_ns()` is very short. +                if (-cqe.res == os.ETIME) etime.* = true; +                continue; +            } +            const completion = @intToPtr(*Completion, @intCast(usize, cqe.user_data)); +            completion.result = cqe.res; +            // We do not run the completion here (instead appending to a linked list) to avoid: +            // * recursion through `flush_submissions()` and `flush_completions()`, +            // * unbounded stack usage, and +            // * confusing stack traces. 
+            self.completed.push(completion); +        } +        if (completed < cqes.len) break; +    } +} + +fn flush_submissions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { +    while (true) { +        _ = self.ring.submit_and_wait(wait_nr) catch |err| switch (err) { +            error.SignalInterrupt => continue, +            // Wait for some completions and then try again: +            // See https://github.com/axboe/liburing/issues/281 re: error.SystemResources. +            // Be careful also that copy_cqes() will flush before entering to wait (it does): +            // https://github.com/axboe/liburing/commit/35c199c48dfd54ad46b96e386882e7ac341314c5 +            error.CompletionQueueOvercommitted, error.SystemResources => { +                try self.flush_completions(1, timeouts, etime); +                continue; +            }, +            else => return err, +        }; +        break; +    } +} + +fn enqueue(self: *IO, completion: *Completion) void { +    const sqe = self.ring.get_sqe() catch |err| switch (err) { +        error.SubmissionQueueFull => { +            self.unqueued.push(completion); +            return; +        }, +    }; +    completion.prep(sqe); +} + +/// This struct holds the data needed for a single io_uring operation +pub const Completion = struct { +    io: *IO, +    result: i32 = undefined, +    next: ?*Completion = null, +    operation: Operation, +    // This is one of the usecases for c_void outside of C code and as such c_void will +    // be replaced with anyopaque eventually: https://github.com/ziglang/zig/issues/323 +    context: ?*c_void, +    callback: fn (context: ?*c_void, completion: *Completion, result: *const c_void) void, + +    fn prep(completion: *Completion, sqe: *io_uring_sqe) void { +        switch (completion.operation) { +            .accept => |*op| { +                linux.io_uring_prep_accept( +                    sqe, +                    op.socket, +                    &op.address, 
+                    &op.address_size, +                    os.SOCK_CLOEXEC, +                ); +            }, +            .close => |op| { +                linux.io_uring_prep_close(sqe, op.fd); +            }, +            .connect => |*op| { +                linux.io_uring_prep_connect( +                    sqe, +                    op.socket, +                    &op.address.any, +                    op.address.getOsSockLen(), +                ); +            }, +            .fsync => |op| { +                linux.io_uring_prep_fsync(sqe, op.fd, 0); +            }, +            .read => |op| { +                linux.io_uring_prep_read( +                    sqe, +                    op.fd, +                    op.buffer[0..buffer_limit(op.buffer.len)], +                    op.offset, +                ); +            }, +            .recv => |op| { +                linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG_NOSIGNAL); +            }, +            .send => |op| { +                linux.io_uring_prep_send(sqe, op.socket, op.buffer, os.MSG_NOSIGNAL); +            }, +            .timeout => |*op| { +                linux.io_uring_prep_timeout(sqe, &op.timespec, 0, 0); +            }, +            .write => |op| { +                linux.io_uring_prep_write( +                    sqe, +                    op.fd, +                    op.buffer[0..buffer_limit(op.buffer.len)], +                    op.offset, +                ); +            }, +        } +        sqe.user_data = @ptrToInt(completion); +    } + +    fn complete(completion: *Completion) void { +        switch (completion.operation) { +            .accept => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => { +                        completion.io.enqueue(completion); +                        return; +                    }, +                    os.EAGAIN => error.WouldBlock, +                    os.EBADF => 
error.FileDescriptorInvalid, +                    os.ECONNABORTED => error.ConnectionAborted, +                    os.EFAULT => unreachable, +                    os.EINVAL => error.SocketNotListening, +                    os.EMFILE => error.ProcessFdQuotaExceeded, +                    os.ENFILE => error.SystemFdQuotaExceeded, +                    os.ENOBUFS => error.SystemResources, +                    os.ENOMEM => error.SystemResources, +                    os.ENOTSOCK => error.FileDescriptorNotASocket, +                    os.EOPNOTSUPP => error.OperationNotSupported, +                    os.EPERM => error.PermissionDenied, +                    os.EPROTO => error.ProtocolFailure, +                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)), +                } else @intCast(os.socket_t, completion.result); +                completion.callback(completion.context, completion, &result); +            }, +            .close => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 +                    os.EBADF => error.FileDescriptorInvalid, +                    os.EDQUOT => error.DiskQuota, +                    os.EIO => error.InputOutput, +                    os.ENOSPC => error.NoSpaceLeft, +                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)), +                } else assert(completion.result == 0); +                completion.callback(completion.context, completion, &result); +            }, +            .connect => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => { +                        completion.io.enqueue(completion); +                        return; +                    }, +                    os.EACCES => error.AccessDenied, +                    os.EADDRINUSE => error.AddressInUse, +                    
os.EADDRNOTAVAIL => error.AddressNotAvailable, +                    os.EAFNOSUPPORT => error.AddressFamilyNotSupported, +                    os.EAGAIN, os.EINPROGRESS => error.WouldBlock, +                    os.EALREADY => error.OpenAlreadyInProgress, +                    os.EBADF => error.FileDescriptorInvalid, +                    os.ECONNREFUSED => error.ConnectionRefused, +                    os.ECONNRESET => error.ConnectionResetByPeer, +                    os.EFAULT => unreachable, +                    os.EISCONN => error.AlreadyConnected, +                    os.ENETUNREACH => error.NetworkUnreachable, +                    os.ENOENT => error.FileNotFound, +                    os.ENOTSOCK => error.FileDescriptorNotASocket, +                    os.EPERM => error.PermissionDenied, +                    os.EPROTOTYPE => error.ProtocolNotSupported, +                    os.ETIMEDOUT => error.ConnectionTimedOut, +                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)), +                } else assert(completion.result == 0); +                completion.callback(completion.context, completion, &result); +            }, +            .fsync => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => { +                        completion.io.enqueue(completion); +                        return; +                    }, +                    os.EBADF => error.FileDescriptorInvalid, +                    os.EDQUOT => error.DiskQuota, +                    os.EINVAL => error.ArgumentsInvalid, +                    os.EIO => error.InputOutput, +                    os.ENOSPC => error.NoSpaceLeft, +                    os.EROFS => error.ReadOnlyFileSystem, +                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)), +                } else assert(completion.result == 0); +                completion.callback(completion.context, completion, &result); +            }, +          
  .read => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => { +                        completion.io.enqueue(completion); +                        return; +                    }, +                    os.EAGAIN => error.WouldBlock, +                    os.EBADF => error.NotOpenForReading, +                    os.ECONNRESET => error.ConnectionResetByPeer, +                    os.EFAULT => unreachable, +                    os.EINVAL => error.Alignment, +                    os.EIO => error.InputOutput, +                    os.EISDIR => error.IsDir, +                    os.ENOBUFS => error.SystemResources, +                    os.ENOMEM => error.SystemResources, +                    os.ENXIO => error.Unseekable, +                    os.EOVERFLOW => error.Unseekable, +                    os.ESPIPE => error.Unseekable, +                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)), +                } else @intCast(usize, completion.result); +                completion.callback(completion.context, completion, &result); +            }, +            .recv => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => { +                        completion.io.enqueue(completion); +                        return; +                    }, +                    os.EAGAIN => error.WouldBlock, +                    os.EBADF => error.FileDescriptorInvalid, +                    os.ECONNREFUSED => error.ConnectionRefused, +                    os.EFAULT => unreachable, +                    os.EINVAL => unreachable, +                    os.ENOMEM => error.SystemResources, +                    os.ENOTCONN => error.SocketNotConnected, +                    os.ENOTSOCK => error.FileDescriptorNotASocket, +                    os.ECONNRESET => error.ConnectionResetByPeer, +                    else => |errno| os.unexpectedErrno(@intCast(usize, 
errno)), +                } else @intCast(usize, completion.result); +                completion.callback(completion.context, completion, &result); +            }, +            .send => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => { +                        completion.io.enqueue(completion); +                        return; +                    }, +                    os.EACCES => error.AccessDenied, +                    os.EAGAIN => error.WouldBlock, +                    os.EALREADY => error.FastOpenAlreadyInProgress, +                    os.EAFNOSUPPORT => error.AddressFamilyNotSupported, +                    os.EBADF => error.FileDescriptorInvalid, +                    os.ECONNRESET => error.ConnectionResetByPeer, +                    os.EDESTADDRREQ => unreachable, +                    os.EFAULT => unreachable, +                    os.EINVAL => unreachable, +                    os.EISCONN => unreachable, +                    os.EMSGSIZE => error.MessageTooBig, +                    os.ENOBUFS => error.SystemResources, +                    os.ENOMEM => error.SystemResources, +                    os.ENOTCONN => error.SocketNotConnected, +                    os.ENOTSOCK => error.FileDescriptorNotASocket, +                    os.EOPNOTSUPP => error.OperationNotSupported, +                    os.EPIPE => error.BrokenPipe, +                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)), +                } else @intCast(usize, completion.result); +                completion.callback(completion.context, completion, &result); +            }, +            .timeout => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => { +                        completion.io.enqueue(completion); +                        return; +                    }, +                    os.ECANCELED => error.Canceled, +                    
os.ETIME => {}, // A success. +                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)), +                } else unreachable; +                completion.callback(completion.context, completion, &result); +            }, +            .write => { +                const result = if (completion.result < 0) switch (-completion.result) { +                    os.EINTR => { +                        completion.io.enqueue(completion); +                        return; +                    }, +                    os.EAGAIN => error.WouldBlock, +                    os.EBADF => error.NotOpenForWriting, +                    os.EDESTADDRREQ => error.NotConnected, +                    os.EDQUOT => error.DiskQuota, +                    os.EFAULT => unreachable, +                    os.EFBIG => error.FileTooBig, +                    os.EINVAL => error.Alignment, +                    os.EIO => error.InputOutput, +                    os.ENOSPC => error.NoSpaceLeft, +                    os.ENXIO => error.Unseekable, +                    os.EOVERFLOW => error.Unseekable, +                    os.EPERM => error.AccessDenied, +                    os.EPIPE => error.BrokenPipe, +                    os.ESPIPE => error.Unseekable, +                    else => |errno| os.unexpectedErrno(@intCast(usize, errno)), +                } else @intCast(usize, completion.result); +                completion.callback(completion.context, completion, &result); +            }, +        } +    } +}; + +/// This union encodes the set of operations supported as well as their arguments. 
const Operation = union(enum) {
    accept: struct {
        socket: os.socket_t,
        address: os.sockaddr = undefined,
        address_size: os.socklen_t = @sizeOf(os.sockaddr),
    },
    close: struct {
        fd: os.fd_t,
    },
    connect: struct {
        socket: os.socket_t,
        address: std.net.Address,
    },
    fsync: struct {
        fd: os.fd_t,
    },
    read: struct {
        fd: os.fd_t,
        buffer: []u8,
        offset: u64,
    },
    recv: struct {
        socket: os.socket_t,
        buffer: []u8,
    },
    send: struct {
        socket: os.socket_t,
        buffer: []const u8,
    },
    timeout: struct {
        timespec: os.__kernel_timespec,
    },
    write: struct {
        fd: os.fd_t,
        buffer: []const u8,
        offset: u64,
    },
};

/// Errors an accept completion can deliver to its callback.
pub const AcceptError = error{
    WouldBlock,
    FileDescriptorInvalid,
    ConnectionAborted,
    SocketNotListening,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    SystemResources,
    FileDescriptorNotASocket,
    OperationNotSupported,
    PermissionDenied,
    ProtocolFailure,
} || os.UnexpectedError;

/// Initializes `completion` as an accept on `socket` and enqueues it.
/// `callback` later receives `context`, the completion and either the accepted
/// socket or an `AcceptError`.
pub fn accept(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: AcceptError!os.socket_t,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        // Type-erased trampoline: restores the typed context and result for `callback`.
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const AcceptError!os.socket_t, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .accept = .{
                .socket = socket,
                .address = undefined,
                .address_size = @sizeOf(os.sockaddr),
            },
        },
    };
    self.enqueue(completion);
}

/// Errors a close completion can deliver to its callback.
pub const CloseError = error{
    FileDescriptorInvalid,
    DiskQuota,
    InputOutput,
    NoSpaceLeft,
} || os.UnexpectedError;

/// Initializes `completion` as a close of `fd` and enqueues it.
pub fn close(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: CloseError!void,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const CloseError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .close = .{ .fd = fd },
        },
    };
    self.enqueue(completion);
}

/// Errors a connect completion can deliver to its callback.
pub const ConnectError = error{
    AccessDenied,
    AddressInUse,
    AddressNotAvailable,
    AddressFamilyNotSupported,
    WouldBlock,
    OpenAlreadyInProgress,
    FileDescriptorInvalid,
    ConnectionRefused,
    // Produced for ECONNRESET by Completion.complete(); it must be declared here because
    // the wrapper below reinterprets the result as `ConnectError!void`.
    ConnectionResetByPeer,
    AlreadyConnected,
    NetworkUnreachable,
    FileNotFound,
    FileDescriptorNotASocket,
    PermissionDenied,
    ProtocolNotSupported,
    ConnectionTimedOut,
} || os.UnexpectedError;

/// Initializes `completion` as a connect of `socket` to `address` and enqueues it.
pub fn connect(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: ConnectError!void,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    address: std.net.Address,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const ConnectError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .connect = .{
                .socket = socket,
                .address = address,
            },
        },
    };
    self.enqueue(completion);
}

/// Errors an fsync completion can deliver to its callback.
pub const FsyncError = error{
    FileDescriptorInvalid,
    DiskQuota,
    ArgumentsInvalid,
    InputOutput,
    NoSpaceLeft,
    ReadOnlyFileSystem,
} || os.UnexpectedError;

/// Initializes `completion` as an fsync of `fd` and enqueues it.
pub fn fsync(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: FsyncError!void,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const FsyncError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .fsync = .{
                .fd = fd,
            },
        },
    };
    self.enqueue(completion);
}

/// Errors a read completion can deliver to its callback.
pub const ReadError = error{
    WouldBlock,
    NotOpenForReading,
    ConnectionResetByPeer,
    Alignment,
    InputOutput,
    IsDir,
    SystemResources,
    Unseekable,
} || os.UnexpectedError;

/// Initializes `completion` as a read from `fd` at `offset` into `buffer` and enqueues it.
/// On success the callback receives the number of bytes read.
pub fn read(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: ReadError!usize,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
    buffer: []u8,
    offset: u64,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const ReadError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .read = .{
                .fd = fd,
                .buffer = buffer,
                .offset = offset,
            },
        },
    };
    self.enqueue(completion);
}

/// Errors a recv completion can deliver to its callback.
pub const RecvError = error{
    WouldBlock,
    FileDescriptorInvalid,
    ConnectionRefused,
    // Produced for ECONNRESET by Completion.complete(); it must be declared here because
    // the wrapper below reinterprets the result as `RecvError!usize`.
    ConnectionResetByPeer,
    SystemResources,
    SocketNotConnected,
    FileDescriptorNotASocket,
} || os.UnexpectedError;

/// Initializes `completion` as a recv on `socket` into `buffer` and enqueues it.
/// On success the callback receives the number of bytes received.
pub fn recv(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: RecvError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []u8,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const RecvError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .recv = .{
                .socket = socket,
                .buffer = buffer,
            },
        },
    };
    self.enqueue(completion);
}

/// Errors a send completion can deliver to its callback.
pub const SendError = error{
    AccessDenied,
    WouldBlock,
    FastOpenAlreadyInProgress,
    AddressFamilyNotSupported,
    FileDescriptorInvalid,
    ConnectionResetByPeer,
    MessageTooBig,
    SystemResources,
    SocketNotConnected,
    FileDescriptorNotASocket,
    OperationNotSupported,
    BrokenPipe,
} || os.UnexpectedError;

/// Initializes `completion` as a send of `buffer` on `socket` and enqueues it.
/// On success the callback receives the number of bytes sent.
pub fn send(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: SendError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []const u8,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const SendError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .send = .{
                .socket = socket,
                .buffer = buffer,
            },
        },
    };
    self.enqueue(completion);
}

/// Errors a timeout completion can deliver to its callback.
pub const TimeoutError = error{Canceled} || os.UnexpectedError;

/// Initializes `completion` as a timeout of `nanoseconds` and enqueues it.
///
/// NOTE(review): the whole duration is stored in `tv_nsec` with `tv_sec = 0`; confirm the
/// kernel accepts `tv_nsec` values of one second or more for io_uring timeouts.
pub fn timeout(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: TimeoutError!void,
    ) void,
    completion: *Completion,
    nanoseconds: u63,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const TimeoutError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .timeout = .{
                .timespec = .{ .tv_sec = 0, .tv_nsec = nanoseconds },
            },
        },
    };
    self.enqueue(completion);
}

/// Errors a write completion can deliver to its callback.
pub const WriteError = error{
    WouldBlock,
    NotOpenForWriting,
    NotConnected,
    DiskQuota,
    FileTooBig,
    Alignment,
    InputOutput,
    NoSpaceLeft,
    Unseekable,
    AccessDenied,
    BrokenPipe,
} || os.UnexpectedError;

/// Initializes `completion` as a write of `buffer` to `fd` at `offset` and enqueues it.
/// On success the callback receives the number of bytes written.
pub fn write(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: WriteError!usize,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
    buffer: []const u8,
    offset: u64,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const WriteError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .write = .{
                .fd = fd,
                .buffer = buffer,
                .offset = offset,
            },
        },
    };
    self.enqueue(completion);
}

/// Thin wrapper around `os.socket`.
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
    return os.socket(family, sock_type, protocol);
}

// diff --git a/src/io/time.zig b/src/io/time.zig
// new file mode 100644
// index 000000000..2bfe24da0
// --- /dev/null
// +++ b/src/io/time.zig
// @@ -0,0 +1,64 @@

const std = @import("std");
const assert = std.debug.assert;
const is_darwin = std.Target.current.isDarwin();

pub const Time = struct {
    const Self = @This();

    /// Hardware and/or software bugs can mean that the monotonic clock may regress.
    /// One example (of many): https://bugzilla.redhat.com/show_bug.cgi?id=448449
    /// We crash the process for safety if this ever happens, to protect against infinite loops.
    /// It's better to crash and come back with a valid monotonic clock than get stuck forever.
    monotonic_guard: u64 = 0,

    /// A timestamp to measure elapsed time, meaningful only on the same system, not across reboots.
    /// Always use a monotonic timestamp if the goal is to measure elapsed time.
    /// This clock is not affected by discontinuous jumps in the system time, for example if the
    /// system administrator manually changes the clock.
    ///
    /// Panics if the value read is lower than the previously returned one.
    pub fn monotonic(self: *Self) u64 {
        const m = blk: {
            // Uses mach_continuous_time() instead of mach_absolute_time() as it counts while suspended.
            // https://developer.apple.com/documentation/kernel/1646199-mach_continuous_time
            // https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.c.auto.html
            if (is_darwin) {
                const darwin = struct {
                    const mach_timebase_info_t = std.os.darwin.mach_timebase_info_data;
                    extern "c" fn mach_timebase_info(info: *mach_timebase_info_t) std.os.darwin.kern_return_t;
                    extern "c" fn mach_continuous_time() u64;
                };

                const now = darwin.mach_continuous_time();
                var info: darwin.mach_timebase_info_t = undefined;
                if (darwin.mach_timebase_info(&info) != 0) @panic("mach_timebase_info() failed");
                // Break out of the block rather than returning from the function, so the
                // regression guard below also runs (and is updated) on Darwin.
                break :blk (now * info.numer) / info.denom;
            }

            // The true monotonic clock on Linux is not in fact CLOCK_MONOTONIC:
            // CLOCK_MONOTONIC excludes elapsed time while the system is suspended (e.g. VM migration).
            // CLOCK_BOOTTIME is the same as CLOCK_MONOTONIC but includes elapsed time during a suspend.
            // For more detail and why CLOCK_MONOTONIC_RAW is even worse than CLOCK_MONOTONIC,
            // see https://github.com/ziglang/zig/pull/933#discussion_r656021295.
            var ts: std.os.timespec = undefined;
            std.os.clock_gettime(std.os.CLOCK_BOOTTIME, &ts) catch @panic("CLOCK_BOOTTIME required");
            break :blk @intCast(u64, ts.tv_sec) * std.time.ns_per_s + @intCast(u64, ts.tv_nsec);
        };

        // "Oops!...I Did It Again"
        if (m < self.monotonic_guard) @panic("a hardware/kernel bug regressed the monotonic clock");
        self.monotonic_guard = m;
        return m;
    }

    /// A timestamp to measure real (i.e. wall clock) time, meaningful across systems, and reboots.
    /// This clock is affected by discontinuous jumps in the system time.
    /// The value is CLOCK_REALTIME expressed in nanoseconds.
    pub fn realtime(self: *Self) i64 {
        // macos has supported clock_gettime() since 10.12:
        // https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.3.auto.html

        var ts: std.os.timespec = undefined;
        std.os.clock_gettime(std.os.CLOCK_REALTIME, &ts) catch unreachable;
        return @as(i64, ts.tv_sec) * std.time.ns_per_s + ts.tv_nsec;
    }

    /// Does nothing in this implementation.
    pub fn tick(self: *Self) void {}
};
