diff options
-rw-r--r-- | Makefile | 11 | ||||
-rw-r--r-- | build.zig | 13 | ||||
-rw-r--r-- | src/builder.zig | 2 | ||||
-rw-r--r-- | src/io/fifo.zig | 104 | ||||
-rw-r--r-- | src/io/io_darwin.zig | 665 | ||||
-rw-r--r-- | src/io/io_linux.zig | 874 | ||||
-rw-r--r-- | src/io/time.zig | 64 |
7 files changed, 1732 insertions, 1 deletions
@@ -826,12 +826,23 @@ endif endif +IO_FILE = + +ifeq ($(OS_NAME),linux) +IO_FILE = "src/io/io_linux.zig" +endif + +ifeq ($(OS_NAME),darwin) +IO_FILE = "src/io/io_darwin.zig" +endif + build-unit: @rm -rf zig-out/bin/$(testname) @mkdir -p zig-out/bin zig test $(realpath $(testpath)) \ $(testfilterflag) \ --pkg-begin picohttp $(DEPS_DIR)/picohttp.zig --pkg-end \ + --pkg-begin io $(IO_FILE) --pkg-end \ --pkg-begin clap $(DEPS_DIR)/zig-clap/clap.zig --pkg-end \ --main-pkg-path $(shell pwd) \ --test-no-exec \ @@ -308,6 +308,19 @@ pub fn build(b: *std.build.Builder) !void { .path = .{ .path = "src/deps/zig-clap/clap.zig" }, }); + var platform_label = if (target.isDarwin()) + "darwin" + else + "linux"; + obj.addPackage(.{ + .name = "io", + .path = .{ .path = try std.fmt.allocPrint(b.allocator, "src/io/io_{s}.zig", .{platform_label}) }, + }); + exe.addPackage(.{ + .name = "io", + .path = .{ .path = try std.fmt.allocPrint(b.allocator, "src/io/io_{s}.zig", .{platform_label}) }, + }); + { obj_step.dependOn(&b.addLog( "Build {s} v{} - v{}\n", diff --git a/src/builder.zig b/src/builder.zig index 3811a7878..19daa0a20 100644 --- a/src/builder.zig +++ b/src/builder.zig @@ -1,7 +1,7 @@ const Allocator = @import("std").mem.Allocator; const assert = @import("std").debug.assert; const copy = @import("std").mem.copy; - +const io = @import("io"); pub fn Builder(comptime Type: type) type { return struct { const This = @This(); diff --git a/src/io/fifo.zig b/src/io/fifo.zig new file mode 100644 index 000000000..b9b3bc961 --- /dev/null +++ b/src/io/fifo.zig @@ -0,0 +1,104 @@ +const std = @import("std"); +const assert = std.debug.assert; + +/// An intrusive first in/first out linked list. +/// The element type T must have a field called "next" of type ?*T +pub fn FIFO(comptime T: type) type { + return struct { + const Self = @This(); + + in: ?*T = null, + out: ?*T = null, + + pub fn push(self: *Self, elem: *T) void { + assert(elem.next == null); + if (self.in) |in| { + in.next = elem; + self.in = elem; + } else { + assert(self.out == null); + self.in = elem; + self.out = elem; + } + } + + pub fn pop(self: *Self) ?*T { + const ret = self.out orelse return null; + self.out = ret.next; + ret.next = null; + if (self.in == ret) self.in = null; + return ret; + } + + pub fn peek(self: Self) ?*T { + return self.out; + } + + /// Remove an element from the FIFO. Asserts that the element is + /// in the FIFO. This operation is O(N), if this is done often you + /// probably want a different data structure. + pub fn remove(self: *Self, to_remove: *T) void { + if (to_remove == self.out) { + _ = self.pop(); + return; + } + var it = self.out; + while (it) |elem| : (it = elem.next) { + if (to_remove == elem.next) { + if (to_remove == self.in) self.in = elem; + elem.next = to_remove.next; + to_remove.next = null; + break; + } + } else unreachable; + } + }; +} + +test "push/pop/peek/remove" { + const testing = @import("std").testing; + + const Foo = struct { next: ?*@This() = null }; + + var one: Foo = .{}; + var two: Foo = .{}; + var three: Foo = .{}; + + var fifo: FIFO(Foo) = .{}; + + fifo.push(&one); + try testing.expectEqual(@as(?*Foo, &one), fifo.peek()); + + fifo.push(&two); + fifo.push(&three); + try testing.expectEqual(@as(?*Foo, &one), fifo.peek()); + + fifo.remove(&one); + try testing.expectEqual(@as(?*Foo, &two), fifo.pop()); + try testing.expectEqual(@as(?*Foo, &three), fifo.pop()); + try testing.expectEqual(@as(?*Foo, null), fifo.pop()); + + fifo.push(&one); + fifo.push(&two); + fifo.push(&three); + fifo.remove(&two); + try testing.expectEqual(@as(?*Foo, &one), fifo.pop()); + try testing.expectEqual(@as(?*Foo, &three), fifo.pop()); + try testing.expectEqual(@as(?*Foo, null), fifo.pop()); + + fifo.push(&one); + fifo.push(&two); + fifo.push(&three); + fifo.remove(&three); + try testing.expectEqual(@as(?*Foo, &one), fifo.pop()); + try testing.expectEqual(@as(?*Foo, &two), fifo.pop()); + try testing.expectEqual(@as(?*Foo, null), fifo.pop()); + + fifo.push(&one); + fifo.push(&two); + fifo.remove(&two); + fifo.push(&three); + try testing.expectEqual(@as(?*Foo, &one), fifo.pop()); + try testing.expectEqual(@as(?*Foo, &three), fifo.pop()); + try testing.expectEqual(@as(?*Foo, null), fifo.pop()); +} diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig new file mode 100644 index 000000000..00186610b --- /dev/null +++ b/src/io/io_darwin.zig @@ -0,0 +1,665 @@ +const std = @import("std"); +const os = std.os; +const mem = std.mem; +const assert = std.debug.assert; + +const FIFO = @import("./fifo.zig").FIFO; +const Time = @import("./time.zig").Time; + +const IO = @This(); + +kq: os.fd_t, +time: Time = .{}, +io_inflight: usize = 0, +timeouts: FIFO(Completion) = .{}, +completed: FIFO(Completion) = .{}, +io_pending: FIFO(Completion) = .{}, + +pub fn init(entries: u12, flags: u32) !IO { + const kq = try os.kqueue(); + assert(kq > -1); + return IO{ .kq = kq }; +} + +pub fn deinit(self: *IO) void { + assert(self.kq > -1); + os.close(self.kq); + self.kq = -1; +} + +/// Pass all queued submissions to the kernel and peek for completions. +pub fn tick(self: *IO) !void { + return self.flush(false); +} + +/// Pass all queued submissions to the kernel and run for `nanoseconds`. +/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used +/// in the __kernel_timespec struct. +pub fn run_for_ns(self: *IO, nanoseconds: u63) !void { + var timed_out = false; + var completion: Completion = undefined; + const on_timeout = struct { + fn callback( + timed_out_ptr: *bool, + _completion: *Completion, + _result: TimeoutError!void, + ) void { + timed_out_ptr.* = true; + } + }.callback; + + // Submit a timeout which sets the timed_out value to true to terminate the loop below. + self.timeout( + *bool, + &timed_out, + on_timeout, + &completion, + nanoseconds, + ); + + // Loop until our timeout completion is processed above, which sets timed_out to true. + // LLVM shouldn't be able to cache timed_out's value here since its address escapes above. + while (!timed_out) { + try self.flush(true); + } +} + +fn flush(self: *IO, wait_for_completions: bool) !void { + var io_pending = self.io_pending.peek(); + var events: [256]os.Kevent = undefined; + + // Check timeouts and fill events with completions in io_pending + // (they will be submitted through kevent). + // Timeouts are expired here and possibly pushed to the completed queue. + const next_timeout = self.flush_timeouts(); + const change_events = self.flush_io(&events, &io_pending); + + // Only call kevent() if we need to submit io events or if we need to wait for completions. + if (change_events > 0 or self.completed.peek() == null) { + // Zero timeouts for kevent() implies a non-blocking poll + var ts = std.mem.zeroes(os.timespec); + + // We need to wait (not poll) on kevent if there's nothing to submit or complete. + // We should never wait indefinitely (timeout_ptr = null for kevent) given: + // - tick() is non-blocking (wait_for_completions = false) + // - run_for_ns() always submits a timeout + if (change_events == 0 and self.completed.peek() == null) { + if (wait_for_completions) { + const timeout_ns = next_timeout orelse @panic("kevent() blocking forever"); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); + } else if (self.io_inflight == 0) { + return; + } + } + + const new_events = try os.kevent( + self.kq, + events[0..change_events], + events[0..events.len], + &ts, + ); + + // Mark the io events submitted only after kevent() successfully processed them + self.io_pending.out = io_pending; + if (io_pending == null) { + self.io_pending.in = null; + } + + self.io_inflight += change_events; + self.io_inflight -= new_events; + + for (events[0..new_events]) |event| { + const completion = @intToPtr(*Completion, event.udata); + completion.next = null; + self.completed.push(completion); + } + } + + var completed = self.completed; + self.completed = .{}; + while (completed.pop()) |completion| { + (completion.callback)(self, completion); + } +} + +fn flush_io(self: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize { + for (events) |*event, flushed| { + const completion = io_pending_top.* orelse return flushed; + io_pending_top.* = completion.next; + + const event_info = switch (completion.operation) { + .accept => |op| [2]c_int{ op.socket, os.EVFILT_READ }, + .connect => |op| [2]c_int{ op.socket, os.EVFILT_WRITE }, + .read => |op| [2]c_int{ op.fd, os.EVFILT_READ }, + .write => |op| [2]c_int{ op.fd, os.EVFILT_WRITE }, + .recv => |op| [2]c_int{ op.socket, os.EVFILT_READ }, + .send => |op| [2]c_int{ op.socket, os.EVFILT_WRITE }, + else => @panic("invalid completion operation queued for io"), + }; + + event.* = .{ + .ident = @intCast(u32, event_info[0]), + .filter = @intCast(i16, event_info[1]), + .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(completion), + }; + } + return events.len; +} + +fn flush_timeouts(self: *IO) ?u64 { + var min_timeout: ?u64 = null; + var timeouts: ?*Completion = self.timeouts.peek(); + while (timeouts) |completion| { + timeouts = completion.next; + + // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call. + const now = self.time.monotonic(); + const expires = completion.operation.timeout.expires; + + // NOTE: remove() could be O(1) here with a doubly-linked-list + // since we know the previous Completion. + if (now >= expires) { + self.timeouts.remove(completion); + self.completed.push(completion); + continue; + } + + const timeout_ns = expires - now; + if (min_timeout) |min_ns| { + min_timeout = std.math.min(min_ns, timeout_ns); + } else { + min_timeout = timeout_ns; + } + } + return min_timeout; +} + +/// This struct holds the data needed for a single IO operation +pub const Completion = struct { + next: ?*Completion, + context: ?*c_void, + callback: fn (*IO, *Completion) void, + operation: Operation, +}; + +const Operation = union(enum) { + accept: struct { + socket: os.socket_t, + }, + close: struct { + fd: os.fd_t, + }, + connect: struct { + socket: os.socket_t, + address: std.net.Address, + initiated: bool, + }, + fsync: struct { + fd: os.fd_t, + }, + read: struct { + fd: os.fd_t, + buf: [*]u8, + len: u32, + offset: u64, + }, + recv: struct { + socket: os.socket_t, + buf: [*]u8, + len: u32, + }, + send: struct { + socket: os.socket_t, + buf: [*]const u8, + len: u32, + }, + timeout: struct { + expires: u64, + }, + write: struct { + fd: os.fd_t, + buf: [*]const u8, + len: u32, + offset: u64, + }, +}; + +fn submit( + self: *IO, + context: anytype, + comptime callback: anytype, + completion: *Completion, + comptime operation_tag: std.meta.Tag(Operation), + operation_data: anytype, + comptime OperationImpl: type, +) void { + const Context = @TypeOf(context); + const onCompleteFn = struct { + fn onComplete(io: *IO, _completion: *Completion) void { + // Perform the actual operaton + const op_data = &@field(_completion.operation, @tagName(operation_tag)); + const result = OperationImpl.doOperation(op_data); + + // Requeue onto io_pending if error.WouldBlock + switch (operation_tag) { + .accept, .connect, .read, .write, .send, .recv => { + _ = result catch |err| switch (err) { + error.WouldBlock => { + _completion.next = null; + io.io_pending.push(_completion); + return; + }, + else => {}, + }; + }, + else => {}, + } + + // Complete the Completion + return callback( + @intToPtr(Context, @ptrToInt(_completion.context)), + _completion, + result, + ); + } + }.onComplete; + + completion.* = .{ + .next = null, + .context = context, + .callback = onCompleteFn, + .operation = @unionInit(Operation, @tagName(operation_tag), operation_data), + }; + + switch (operation_tag) { + .timeout => self.timeouts.push(completion), + else => self.completed.push(completion), + } +} + +pub const AcceptError = os.AcceptError || os.SetSockOptError; + +pub fn accept( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: AcceptError!os.socket_t, + ) void, + completion: *Completion, + socket: os.socket_t, +) void { + self.submit( + context, + callback, + completion, + .accept, + .{ + .socket = socket, + }, + struct { + fn doOperation(op: anytype) AcceptError!os.socket_t { + const fd = try os.accept( + op.socket, + null, + null, + os.SOCK_NONBLOCK | os.SOCK_CLOEXEC, + ); + errdefer os.close(fd); + + // darwin doesn't support os.MSG_NOSIGNAL, + // but instead a socket option to avoid SIGPIPE. + os.setsockopt(fd, os.SOL_SOCKET, os.SO_NOSIGPIPE, &mem.toBytes(@as(c_int, 1))) catch |err| return switch (err) { + error.TimeoutTooBig => unreachable, + error.PermissionDenied => error.NetworkSubsystemFailed, + error.AlreadyConnected => error.NetworkSubsystemFailed, + error.InvalidProtocolOption => error.ProtocolFailure, + else => |e| e, + }; + + return fd; + } + }, + ); +} + +pub const CloseError = error{ + FileDescriptorInvalid, + DiskQuota, + InputOutput, + NoSpaceLeft, +} || os.UnexpectedError; + +pub fn close( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: CloseError!void, + ) void, + completion: *Completion, + fd: os.fd_t, +) void { + self.submit( + context, + callback, + completion, + .close, + .{ + .fd = fd, + }, + struct { + fn doOperation(op: anytype) CloseError!void { + return switch (os.errno(os.system.close(op.fd))) { + 0 => {}, + os.EBADF => error.FileDescriptorInvalid, + os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 + os.EIO => error.InputOutput, + else => |errno| os.unexpectedErrno(errno), + }; + } + }, + ); +} + +pub const ConnectError = os.ConnectError; + +pub fn connect( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: ConnectError!void, + ) void, + completion: *Completion, + socket: os.socket_t, + address: std.net.Address, +) void { + self.submit( + context, + callback, + completion, + .connect, + .{ + .socket = socket, + .address = address, + .initiated = false, + }, + struct { + fn doOperation(op: anytype) ConnectError!void { + // Don't call connect after being rescheduled by io_pending as it gives EISCONN. + // Instead, check the socket error to see if has been connected successfully. + const result = switch (op.initiated) { + true => os.getsockoptError(op.socket), + else => os.connect(op.socket, &op.address.any, op.address.getOsSockLen()), + }; + + op.initiated = true; + return result; + } + }, + ); +} + +pub const FsyncError = os.SyncError; + +pub fn fsync( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: FsyncError!void, + ) void, + completion: *Completion, + fd: os.fd_t, +) void { + self.submit( + context, + callback, + completion, + .fsync, + .{ + .fd = fd, + }, + struct { + fn doOperation(op: anytype) FsyncError!void { + _ = os.fcntl(op.fd, os.F_FULLFSYNC, 1) catch return os.fsync(op.fd); + } + }, + ); +} + +pub const ReadError = error{ + WouldBlock, + NotOpenForReading, + ConnectionResetByPeer, + Alignment, + InputOutput, + IsDir, + SystemResources, + Unseekable, +} || os.UnexpectedError; + +pub fn read( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: ReadError!usize, + ) void, + completion: *Completion, + fd: os.fd_t, + buffer: []u8, + offset: u64, +) void { + self.submit( + context, + callback, + completion, + .read, + .{ + .fd = fd, + .buf = buffer.ptr, + .len = @intCast(u32, buffer_limit(buffer.len)), + .offset = offset, + }, + struct { + fn doOperation(op: anytype) ReadError!usize { + while (true) { + const rc = os.system.pread( + op.fd, + op.buf, + op.len, + @bitCast(isize, op.offset), + ); + return switch (os.errno(rc)) { + 0 => @intCast(usize, rc), + os.EINTR => continue, + os.EAGAIN => error.WouldBlock, + os.EBADF => error.NotOpenForReading, + os.ECONNRESET => error.ConnectionResetByPeer, + os.EFAULT => unreachable, + os.EINVAL => error.Alignment, + os.EIO => error.InputOutput, + os.EISDIR => error.IsDir, + os.ENOBUFS => error.SystemResources, + os.ENOMEM => error.SystemResources, + os.ENXIO => error.Unseekable, + os.EOVERFLOW => error.Unseekable, + os.ESPIPE => error.Unseekable, + else => |err| os.unexpectedErrno(err), + }; + } + } + }, + ); +} + +pub const RecvError = os.RecvFromError; + +pub fn recv( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: RecvError!usize, + ) void, + completion: *Completion, + socket: os.socket_t, + buffer: []u8, +) void { + self.submit( + context, + callback, + completion, + .recv, + .{ + .socket = socket, + .buf = buffer.ptr, + .len = @intCast(u32, buffer_limit(buffer.len)), + }, + struct { + fn doOperation(op: anytype) RecvError!usize { + return os.recv(op.socket, op.buf[0..op.len], 0); + } + }, + ); +} + +pub const SendError = os.SendError; + +pub fn send( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: SendError!usize, + ) void, + completion: *Completion, + socket: os.socket_t, + buffer: []const u8, +) void { + self.submit( + context, + callback, + completion, + .send, + .{ + .socket = socket, + .buf = buffer.ptr, + .len = @intCast(u32, buffer_limit(buffer.len)), + }, + struct { + fn doOperation(op: anytype) SendError!usize { + return os.send(op.socket, op.buf[0..op.len], 0); + } + }, + ); +} + +pub const TimeoutError = error{Canceled} || os.UnexpectedError; + +pub fn timeout( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: TimeoutError!void, + ) void, + completion: *Completion, + nanoseconds: u63, +) void { + self.submit( + context, + callback, + completion, + .timeout, + .{ + .expires = self.time.monotonic() + nanoseconds, + }, + struct { + fn doOperation(_: anytype) TimeoutError!void { + return; // timeouts don't have errors for now + } + }, + ); +} + +pub const WriteError = os.PWriteError; + +pub fn write( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: WriteError!usize, + ) void, + completion: *Completion, + fd: os.fd_t, + buffer: []const u8, + offset: u64, +) void { + self.submit( + context, + callback, + completion, + .write, + .{ + .fd = fd, + .buf = buffer.ptr, + .len = @intCast(u32, buffer_limit(buffer.len)), + .offset = offset, + }, + struct { + fn doOperation(op: anytype) WriteError!usize { + return os.pwrite(op.fd, op.buf[0..op.len], op.offset); + } + }, + ); +} + +pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t { + const fd = try os.socket(family, sock_type | os.SOCK_NONBLOCK, protocol); + errdefer os.close(fd); + + // darwin doesn't support os.MSG_NOSIGNAL, but instead a socket option to avoid SIGPIPE. + try os.setsockopt(fd, os.SOL_SOCKET, os.SO_NOSIGPIPE, &mem.toBytes(@as(c_int, 1))); + return fd; +} + +fn buffer_limit(buffer_len: usize) usize { + + // Linux limits how much may be written in a `pwrite()/pread()` call, which is `0x7ffff000` on + // both 64-bit and 32-bit systems, due to using a signed C int as the return value, as well as + // stuffing the errno codes into the last `4096` values. + // Darwin limits writes to `0x7fffffff` bytes, more than that returns `EINVAL`. + // The corresponding POSIX limit is `std.math.maxInt(isize)`. + const limit = switch (std.Target.current.os.tag) { + .linux => 0x7ffff000, + .macos, .ios, .watchos, .tvos => std.math.maxInt(i32), + else => std.math.maxInt(isize), + }; + return std.math.min(limit, buffer_len); +} diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig new file mode 100644 index 000000000..794f18f90 --- /dev/null +++ b/src/io/io_linux.zig @@ -0,0 +1,874 @@ +const std = @import("std"); +const assert = std.debug.assert; +const os = std.os; +const linux = os.linux; +const IO_Uring = linux.IO_Uring; +const io_uring_cqe = linux.io_uring_cqe; +const io_uring_sqe = linux.io_uring_sqe; + +const FIFO = @import("./fifo.zig").FIFO; +const IO = @This(); + +ring: IO_Uring, + +/// Operations not yet submitted to the kernel and waiting on available space in the +/// submission queue. +unqueued: FIFO(Completion) = .{}, + +/// Completions that are ready to have their callbacks run. +completed: FIFO(Completion) = .{}, + +pub fn init(entries: u12, flags: u32) !IO { + return IO{ .ring = try IO_Uring.init(entries, flags) }; +} + +pub fn deinit(self: *IO) void { + self.ring.deinit(); +} + +/// Pass all queued submissions to the kernel and peek for completions. +pub fn tick(self: *IO) !void { + // We assume that all timeouts submitted by `run_for_ns()` will be reaped by `run_for_ns()` + // and that `tick()` and `run_for_ns()` cannot be run concurrently. + // Therefore `timeouts` here will never be decremented and `etime` will always be false. + var timeouts: usize = 0; + var etime = false; + + try self.flush(0, &timeouts, &etime); + assert(etime == false); + + // Flush any SQEs that were queued while running completion callbacks in `flush()`: + // This is an optimization to avoid delaying submissions until the next tick. + // At the same time, we do not flush any ready CQEs since SQEs may complete synchronously. + // We guard against an io_uring_enter() syscall if we know we do not have any queued SQEs. + // We cannot use `self.ring.sq_ready()` here since this counts flushed and unflushed SQEs. + const queued = self.ring.sq.sqe_tail -% self.ring.sq.sqe_head; + if (queued > 0) { + try self.flush_submissions(0, &timeouts, &etime); + assert(etime == false); + } +} + +/// Pass all queued submissions to the kernel and run for `nanoseconds`. +/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used +/// in the __kernel_timespec struct. +pub fn run_for_ns(self: *IO, nanoseconds: u63) !void { + // We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the + // timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are + // dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC. + var current_ts: os.timespec = undefined; + os.clock_gettime(os.CLOCK_MONOTONIC, ¤t_ts) catch unreachable; + // The absolute CLOCK_MONOTONIC time after which we may return from this function: + const timeout_ts: os.__kernel_timespec = .{ + .tv_sec = current_ts.tv_sec, + .tv_nsec = current_ts.tv_nsec + nanoseconds, + }; + var timeouts: usize = 0; + var etime = false; + while (!etime) { + const timeout_sqe = self.ring.get_sqe() catch blk: { + // The submission queue is full, so flush submissions to make space: + try self.flush_submissions(0, &timeouts, &etime); + break :blk self.ring.get_sqe() catch unreachable; + }; + // Submit an absolute timeout that will be canceled if any other SQE completes first: + linux.io_uring_prep_timeout(timeout_sqe, &timeout_ts, 1, os.IORING_TIMEOUT_ABS); + timeout_sqe.user_data = 0; + timeouts += 1; + // The amount of time this call will block is bounded by the timeout we just submitted: + try self.flush(1, &timeouts, &etime); + } + // Reap any remaining timeouts, which reference the timespec in the current stack frame. + // The busy loop here is required to avoid a potential deadlock, as the kernel determines + // when the timeouts are pushed to the completion queue, not us. + while (timeouts > 0) _ = try self.flush_completions(0, &timeouts, &etime); +} + +fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { + // Flush any queued SQEs and reuse the same syscall to wait for completions if required: + try self.flush_submissions(wait_nr, timeouts, etime); + // We can now just peek for any CQEs without waiting and without another syscall: + try self.flush_completions(0, timeouts, etime); + // Run completions only after all completions have been flushed: + // Loop on a copy of the linked list, having reset the list first, so that any synchronous + // append on running a completion is executed only the next time round the event loop, + // without creating an infinite loop. + { + var copy = self.completed; + self.completed = .{}; + while (copy.pop()) |completion| completion.complete(); + } + // Again, loop on a copy of the list to avoid an infinite loop: + { + var copy = self.unqueued; + self.unqueued = .{}; + while (copy.pop()) |completion| self.enqueue(completion); + } +} + +fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { + var cqes: [256]io_uring_cqe = undefined; + var wait_remaining = wait_nr; + while (true) { + // Guard against waiting indefinitely (if there are too few requests inflight), + // especially if this is not the first time round the loop: + const completed = self.ring.copy_cqes(&cqes, wait_remaining) catch |err| switch (err) { + error.SignalInterrupt => continue, + else => return err, + }; + if (completed > wait_remaining) wait_remaining = 0 else wait_remaining -= completed; + for (cqes[0..completed]) |cqe| { + if (cqe.user_data == 0) { + timeouts.* -= 1; + // We are only done if the timeout submitted was completed due to time, not if + // it was completed due to the completion of an event, in which case `cqe.res` + // would be 0. It is possible for multiple timeout operations to complete at the + // same time if the nanoseconds value passed to `run_for_ns()` is very short. + if (-cqe.res == os.ETIME) etime.* = true; + continue; + } + const completion = @intToPtr(*Completion, @intCast(usize, cqe.user_data)); + completion.result = cqe.res; + // We do not run the completion here (instead appending to a linked list) to avoid: + // * recursion through `flush_submissions()` and `flush_completions()`, + // * unbounded stack usage, and + // * confusing stack traces. + self.completed.push(completion); + } + if (completed < cqes.len) break; + } +} + +fn flush_submissions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { + while (true) { + _ = self.ring.submit_and_wait(wait_nr) catch |err| switch (err) { + error.SignalInterrupt => continue, + // Wait for some completions and then try again: + // See https://github.com/axboe/liburing/issues/281 re: error.SystemResources. + // Be careful also that copy_cqes() will flush before entering to wait (it does): + // https://github.com/axboe/liburing/commit/35c199c48dfd54ad46b96e386882e7ac341314c5 + error.CompletionQueueOvercommitted, error.SystemResources => { + try self.flush_completions(1, timeouts, etime); + continue; + }, + else => return err, + }; + break; + } +} + +fn enqueue(self: *IO, completion: *Completion) void { + const sqe = self.ring.get_sqe() catch |err| switch (err) { + error.SubmissionQueueFull => { + self.unqueued.push(completion); + return; + }, + }; + completion.prep(sqe); +} + +/// This struct holds the data needed for a single io_uring operation +pub const Completion = struct { + io: *IO, + result: i32 = undefined, + next: ?*Completion = null, + operation: Operation, + // This is one of the usecases for c_void outside of C code and as such c_void will + // be replaced with anyopaque eventually: https://github.com/ziglang/zig/issues/323 + context: ?*c_void, + callback: fn (context: ?*c_void, completion: *Completion, result: *const c_void) void, + + fn prep(completion: *Completion, sqe: *io_uring_sqe) void { + switch (completion.operation) { + .accept => |*op| { + linux.io_uring_prep_accept( + sqe, + op.socket, + &op.address, + &op.address_size, + os.SOCK_CLOEXEC, + ); + }, + .close => |op| { + linux.io_uring_prep_close(sqe, op.fd); + }, + .connect => |*op| { + linux.io_uring_prep_connect( + sqe, + op.socket, + &op.address.any, + op.address.getOsSockLen(), + ); + }, + .fsync => |op| { + linux.io_uring_prep_fsync(sqe, op.fd, 0); + }, + .read => |op| { + linux.io_uring_prep_read( + sqe, + op.fd, + op.buffer[0..buffer_limit(op.buffer.len)], + op.offset, + ); + }, + .recv => |op| { + linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG_NOSIGNAL); + }, + .send => |op| { + linux.io_uring_prep_send(sqe, op.socket, op.buffer, os.MSG_NOSIGNAL); + }, + .timeout => |*op| { + linux.io_uring_prep_timeout(sqe, &op.timespec, 0, 0); + }, + .write => |op| { + linux.io_uring_prep_write( + sqe, + op.fd, + op.buffer[0..buffer_limit(op.buffer.len)], + op.offset, + ); + }, + } + sqe.user_data = @ptrToInt(completion); + } + + fn complete(completion: *Completion) void { + switch (completion.operation) { + .accept => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.EAGAIN => error.WouldBlock, + os.EBADF => error.FileDescriptorInvalid, + os.ECONNABORTED => error.ConnectionAborted, + os.EFAULT => unreachable, + os.EINVAL => error.SocketNotListening, + os.EMFILE => error.ProcessFdQuotaExceeded, + os.ENFILE => error.SystemFdQuotaExceeded, + os.ENOBUFS => error.SystemResources, + os.ENOMEM => error.SystemResources, + os.ENOTSOCK => error.FileDescriptorNotASocket, + os.EOPNOTSUPP => error.OperationNotSupported, + os.EPERM => error.PermissionDenied, + os.EPROTO => error.ProtocolFailure, + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else @intCast(os.socket_t, completion.result); + completion.callback(completion.context, completion, &result); + }, + .close => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 + os.EBADF => error.FileDescriptorInvalid, + os.EDQUOT => error.DiskQuota, + os.EIO => error.InputOutput, + os.ENOSPC => error.NoSpaceLeft, + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else assert(completion.result == 0); + completion.callback(completion.context, completion, &result); + }, + .connect => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.EACCES => error.AccessDenied, + os.EADDRINUSE => error.AddressInUse, + os.EADDRNOTAVAIL => error.AddressNotAvailable, + os.EAFNOSUPPORT => error.AddressFamilyNotSupported, + os.EAGAIN, os.EINPROGRESS => error.WouldBlock, + os.EALREADY => error.OpenAlreadyInProgress, + os.EBADF => error.FileDescriptorInvalid, + os.ECONNREFUSED => error.ConnectionRefused, + os.ECONNRESET => error.ConnectionResetByPeer, + os.EFAULT => unreachable, + os.EISCONN => error.AlreadyConnected, + os.ENETUNREACH => error.NetworkUnreachable, + os.ENOENT => error.FileNotFound, + os.ENOTSOCK => error.FileDescriptorNotASocket, + os.EPERM => error.PermissionDenied, + os.EPROTOTYPE => error.ProtocolNotSupported, + os.ETIMEDOUT => error.ConnectionTimedOut, + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else assert(completion.result == 0); + completion.callback(completion.context, completion, &result); + }, + .fsync => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.EBADF => error.FileDescriptorInvalid, + os.EDQUOT => error.DiskQuota, + os.EINVAL => error.ArgumentsInvalid, + os.EIO => error.InputOutput, + os.ENOSPC => error.NoSpaceLeft, + os.EROFS => error.ReadOnlyFileSystem, + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else assert(completion.result == 0); + completion.callback(completion.context, completion, &result); + }, + .read => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.EAGAIN => error.WouldBlock, + os.EBADF => error.NotOpenForReading, + os.ECONNRESET => error.ConnectionResetByPeer, + os.EFAULT => unreachable, + os.EINVAL => error.Alignment, + os.EIO => error.InputOutput, + os.EISDIR => error.IsDir, + os.ENOBUFS => error.SystemResources, + os.ENOMEM => error.SystemResources, + os.ENXIO => error.Unseekable, + os.EOVERFLOW => error.Unseekable, + os.ESPIPE => error.Unseekable, + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else @intCast(usize, completion.result); + completion.callback(completion.context, completion, &result); + }, + .recv => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.EAGAIN => error.WouldBlock, + os.EBADF => error.FileDescriptorInvalid, + os.ECONNREFUSED => error.ConnectionRefused, + os.EFAULT => unreachable, + os.EINVAL => unreachable, + os.ENOMEM => error.SystemResources, + os.ENOTCONN => error.SocketNotConnected, + os.ENOTSOCK => error.FileDescriptorNotASocket, + os.ECONNRESET => error.ConnectionResetByPeer, + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else @intCast(usize, completion.result); + completion.callback(completion.context, completion, &result); + }, + .send => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.EACCES => error.AccessDenied, + os.EAGAIN => error.WouldBlock, + os.EALREADY => error.FastOpenAlreadyInProgress, + os.EAFNOSUPPORT => error.AddressFamilyNotSupported, + os.EBADF => error.FileDescriptorInvalid, + os.ECONNRESET => error.ConnectionResetByPeer, + os.EDESTADDRREQ => unreachable, + os.EFAULT => unreachable, + os.EINVAL => unreachable, + os.EISCONN => unreachable, + os.EMSGSIZE => error.MessageTooBig, + os.ENOBUFS => error.SystemResources, + os.ENOMEM => error.SystemResources, + os.ENOTCONN => error.SocketNotConnected, + os.ENOTSOCK => error.FileDescriptorNotASocket, + os.EOPNOTSUPP => error.OperationNotSupported, + os.EPIPE => error.BrokenPipe, + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else @intCast(usize, completion.result); + completion.callback(completion.context, completion, &result); + }, + .timeout => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.ECANCELED => error.Canceled, + os.ETIME => {}, // A success. + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else unreachable; + completion.callback(completion.context, completion, &result); + }, + .write => { + const result = if (completion.result < 0) switch (-completion.result) { + os.EINTR => { + completion.io.enqueue(completion); + return; + }, + os.EAGAIN => error.WouldBlock, + os.EBADF => error.NotOpenForWriting, + os.EDESTADDRREQ => error.NotConnected, + os.EDQUOT => error.DiskQuota, + os.EFAULT => unreachable, + os.EFBIG => error.FileTooBig, + os.EINVAL => error.Alignment, + os.EIO => error.InputOutput, + os.ENOSPC => error.NoSpaceLeft, + os.ENXIO => error.Unseekable, + os.EOVERFLOW => error.Unseekable, + os.EPERM => error.AccessDenied, + os.EPIPE => error.BrokenPipe, + os.ESPIPE => error.Unseekable, + else => |errno| os.unexpectedErrno(@intCast(usize, errno)), + } else @intCast(usize, completion.result); + completion.callback(completion.context, completion, &result); + }, + } + } +}; + +/// This union encodes the set of operations supported as well as their arguments. +const Operation = union(enum) { + accept: struct { + socket: os.socket_t, + address: os.sockaddr = undefined, + address_size: os.socklen_t = @sizeOf(os.sockaddr), + }, + close: struct { + fd: os.fd_t, + }, + connect: struct { + socket: os.socket_t, + address: std.net.Address, + }, + fsync: struct { + fd: os.fd_t, + }, + read: struct { + fd: os.fd_t, + buffer: []u8, + offset: u64, + }, + recv: struct { + socket: os.socket_t, + buffer: []u8, + }, + send: struct { + socket: os.socket_t, + buffer: []const u8, + }, + timeout: struct { + timespec: os.__kernel_timespec, + }, + write: struct { + fd: os.fd_t, + buffer: []const u8, + offset: u64, + }, +}; + +pub const AcceptError = error{ + WouldBlock, + FileDescriptorInvalid, + ConnectionAborted, + SocketNotListening, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + SystemResources, + FileDescriptorNotASocket, + OperationNotSupported, + PermissionDenied, + ProtocolFailure, +} || os.UnexpectedError; + +pub fn accept( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: AcceptError!os.socket_t, + ) void, + completion: *Completion, + socket: os.socket_t, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const AcceptError!os.socket_t, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .accept = .{ + .socket = socket, + .address = undefined, + .address_size = @sizeOf(os.sockaddr), + }, + }, + }; + self.enqueue(completion); +} + +pub const CloseError = error{ + FileDescriptorInvalid, + DiskQuota, + InputOutput, + NoSpaceLeft, +} || os.UnexpectedError; + +pub fn close( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: CloseError!void, + ) void, + completion: *Completion, + fd: os.fd_t, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const CloseError!void, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .close = .{ .fd = fd }, + }, + }; + self.enqueue(completion); +} + +pub const ConnectError = error{ + AccessDenied, + AddressInUse, + AddressNotAvailable, + AddressFamilyNotSupported, + WouldBlock, + OpenAlreadyInProgress, + FileDescriptorInvalid, + ConnectionRefused, + AlreadyConnected, + NetworkUnreachable, + FileNotFound, + FileDescriptorNotASocket, + PermissionDenied, + ProtocolNotSupported, + ConnectionTimedOut, +} || os.UnexpectedError; + +pub fn connect( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: ConnectError!void, + ) void, + completion: *Completion, + socket: os.socket_t, + address: std.net.Address, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const ConnectError!void, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .connect = .{ + .socket = socket, + .address = address, + }, + }, + }; + self.enqueue(completion); +} + +pub const FsyncError = error{ + FileDescriptorInvalid, + DiskQuota, + ArgumentsInvalid, + InputOutput, + NoSpaceLeft, + ReadOnlyFileSystem, +} || os.UnexpectedError; + +pub fn fsync( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: FsyncError!void, + ) void, + completion: *Completion, + fd: os.fd_t, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const FsyncError!void, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .fsync = .{ + .fd = fd, + }, + }, + }; + self.enqueue(completion); +} + +pub const ReadError = error{ + WouldBlock, + NotOpenForReading, + ConnectionResetByPeer, + Alignment, + InputOutput, + IsDir, + SystemResources, + Unseekable, +} || os.UnexpectedError; + +pub fn read( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: ReadError!usize, + ) void, + completion: *Completion, + fd: os.fd_t, + buffer: []u8, + offset: u64, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const ReadError!usize, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .read = .{ + .fd = fd, + .buffer = buffer, + .offset = offset, + }, + }, + }; + self.enqueue(completion); +} + +pub const RecvError = error{ + WouldBlock, + FileDescriptorInvalid, + ConnectionRefused, + SystemResources, + SocketNotConnected, + FileDescriptorNotASocket, +} || os.UnexpectedError; + +pub fn recv( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: RecvError!usize, + ) void, + completion: *Completion, + socket: os.socket_t, + buffer: []u8, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const RecvError!usize, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .recv = .{ + .socket = socket, + .buffer = buffer, + }, + }, + }; + self.enqueue(completion); +} + +pub const SendError = error{ + AccessDenied, + WouldBlock, + FastOpenAlreadyInProgress, + AddressFamilyNotSupported, + FileDescriptorInvalid, + ConnectionResetByPeer, + MessageTooBig, + SystemResources, + SocketNotConnected, + FileDescriptorNotASocket, + OperationNotSupported, + BrokenPipe, +} || os.UnexpectedError; + +pub fn send( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: SendError!usize, + ) void, + completion: *Completion, + socket: os.socket_t, + buffer: []const u8, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const SendError!usize, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .send = .{ + .socket = socket, + .buffer = buffer, + }, + }, + }; + self.enqueue(completion); +} + +pub const TimeoutError = error{Canceled} || os.UnexpectedError; + +pub fn timeout( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: TimeoutError!void, + ) void, + completion: *Completion, + nanoseconds: u63, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const TimeoutError!void, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .timeout = .{ + .timespec = .{ .tv_sec = 0, .tv_nsec = nanoseconds }, + }, + }, + }; + self.enqueue(completion); +} + +pub const WriteError = error{ + WouldBlock, + NotOpenForWriting, + NotConnected, + DiskQuota, + FileTooBig, + Alignment, + InputOutput, + NoSpaceLeft, + Unseekable, + AccessDenied, + BrokenPipe, +} || os.UnexpectedError; + +pub fn write( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: WriteError!usize, + ) void, + completion: *Completion, + fd: os.fd_t, + buffer: []const u8, + offset: u64, +) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void { + callback( + @intToPtr(Context, @ptrToInt(ctx)), + comp, + @intToPtr(*const WriteError!usize, @ptrToInt(res)).*, + ); + } + }.wrapper, + .operation = .{ + .write = .{ + .fd = fd, + .buffer = buffer, + .offset = offset, + }, + }, + }; + self.enqueue(completion); +} + +pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t { + return os.socket(family, sock_type, protocol); +} diff --git a/src/io/time.zig b/src/io/time.zig new file mode 100644 index 000000000..2bfe24da0 --- /dev/null +++ b/src/io/time.zig @@ -0,0 +1,64 @@ +const std = @import("std"); +const assert = std.debug.assert; +const is_darwin = std.Target.current.isDarwin(); + +pub const Time = struct { + const Self = @This(); + + /// Hardware and/or software bugs can mean that the monotonic clock may regress. + /// One example (of many): https://bugzilla.redhat.com/show_bug.cgi?id=448449 + /// We crash the process for safety if this ever happens, to protect against infinite loops. + /// It's better to crash and come back with a valid monotonic clock than get stuck forever. + monotonic_guard: u64 = 0, + + /// A timestamp to measure elapsed time, meaningful only on the same system, not across reboots. + /// Always use a monotonic timestamp if the goal is to measure elapsed time. + /// This clock is not affected by discontinuous jumps in the system time, for example if the + /// system administrator manually changes the clock. + pub fn monotonic(self: *Self) u64 { + const m = blk: { + // Uses mach_continuous_time() instead of mach_absolute_time() as it counts while suspended. + // https://developer.apple.com/documentation/kernel/1646199-mach_continuous_time + // https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.c.auto.html + if (is_darwin) { + const darwin = struct { + const mach_timebase_info_t = std.os.darwin.mach_timebase_info_data; + extern "c" fn mach_timebase_info(info: *mach_timebase_info_t) std.os.darwin.kern_return_t; + extern "c" fn mach_continuous_time() u64; + }; + + const now = darwin.mach_continuous_time(); + var info: darwin.mach_timebase_info_t = undefined; + if (darwin.mach_timebase_info(&info) != 0) @panic("mach_timebase_info() failed"); + return (now * info.numer) / info.denom; + } + + // The true monotonic clock on Linux is not in fact CLOCK_MONOTONIC: + // CLOCK_MONOTONIC excludes elapsed time while the system is suspended (e.g. VM migration). + // CLOCK_BOOTTIME is the same as CLOCK_MONOTONIC but includes elapsed time during a suspend. + // For more detail and why CLOCK_MONOTONIC_RAW is even worse than CLOCK_MONOTONIC, + // see https://github.com/ziglang/zig/pull/933#discussion_r656021295. + var ts: std.os.timespec = undefined; + std.os.clock_gettime(std.os.CLOCK_BOOTTIME, &ts) catch @panic("CLOCK_BOOTTIME required"); + break :blk @intCast(u64, ts.tv_sec) * std.time.ns_per_s + @intCast(u64, ts.tv_nsec); + }; + + // "Oops!...I Did It Again" + if (m < self.monotonic_guard) @panic("a hardware/kernel bug regressed the monotonic clock"); + self.monotonic_guard = m; + return m; + } + + /// A timestamp to measure real (i.e. wall clock) time, meaningful across systems, and reboots. + /// This clock is affected by discontinuous jumps in the system time. + pub fn realtime(self: *Self) i64 { + // macos has supported clock_gettime() since 10.12: + // https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.3.auto.html + + var ts: std.os.timespec = undefined; + std.os.clock_gettime(std.os.CLOCK_REALTIME, &ts) catch unreachable; + return @as(i64, ts.tv_sec) * std.time.ns_per_s + ts.tv_nsec; + } + + pub fn tick(self: *Self) void {} +}; |