aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile11
-rw-r--r--build.zig13
-rw-r--r--src/builder.zig2
-rw-r--r--src/io/fifo.zig104
-rw-r--r--src/io/io_darwin.zig665
-rw-r--r--src/io/io_linux.zig874
-rw-r--r--src/io/time.zig64
7 files changed, 1732 insertions, 1 deletions
diff --git a/Makefile b/Makefile
index bd1b2a105..f1c5ca68a 100644
--- a/Makefile
+++ b/Makefile
@@ -826,12 +826,23 @@ endif
endif
+IO_FILE =
+
+ifeq ($(OS_NAME),linux)
+IO_FILE = "src/io/io_linux.zig"
+endif
+
+ifeq ($(OS_NAME),darwin)
+IO_FILE = "src/io/io_darwin.zig"
+endif
+
build-unit:
@rm -rf zig-out/bin/$(testname)
@mkdir -p zig-out/bin
zig test $(realpath $(testpath)) \
$(testfilterflag) \
--pkg-begin picohttp $(DEPS_DIR)/picohttp.zig --pkg-end \
+ --pkg-begin io $(IO_FILE) --pkg-end \
--pkg-begin clap $(DEPS_DIR)/zig-clap/clap.zig --pkg-end \
--main-pkg-path $(shell pwd) \
--test-no-exec \
diff --git a/build.zig b/build.zig
index f754909e9..ba2288e09 100644
--- a/build.zig
+++ b/build.zig
@@ -308,6 +308,19 @@ pub fn build(b: *std.build.Builder) !void {
.path = .{ .path = "src/deps/zig-clap/clap.zig" },
});
+ var platform_label = if (target.isDarwin())
+ "darwin"
+ else
+ "linux";
+ obj.addPackage(.{
+ .name = "io",
+ .path = .{ .path = try std.fmt.allocPrint(b.allocator, "src/io/io_{s}.zig", .{platform_label}) },
+ });
+ exe.addPackage(.{
+ .name = "io",
+ .path = .{ .path = try std.fmt.allocPrint(b.allocator, "src/io/io_{s}.zig", .{platform_label}) },
+ });
+
{
obj_step.dependOn(&b.addLog(
"Build {s} v{} - v{}\n",
diff --git a/src/builder.zig b/src/builder.zig
index 3811a7878..19daa0a20 100644
--- a/src/builder.zig
+++ b/src/builder.zig
@@ -1,7 +1,7 @@
const Allocator = @import("std").mem.Allocator;
const assert = @import("std").debug.assert;
const copy = @import("std").mem.copy;
-
+const io = @import("io");
pub fn Builder(comptime Type: type) type {
return struct {
const This = @This();
diff --git a/src/io/fifo.zig b/src/io/fifo.zig
new file mode 100644
index 000000000..b9b3bc961
--- /dev/null
+++ b/src/io/fifo.zig
@@ -0,0 +1,104 @@
+const std = @import("std");
+const assert = std.debug.assert;
+
+/// An intrusive first in/first out linked list.
+/// The element type T must have a field called "next" of type ?*T
+pub fn FIFO(comptime T: type) type {
+ return struct {
+ const Self = @This();
+
+ in: ?*T = null,
+ out: ?*T = null,
+
+ pub fn push(self: *Self, elem: *T) void {
+ assert(elem.next == null);
+ if (self.in) |in| {
+ in.next = elem;
+ self.in = elem;
+ } else {
+ assert(self.out == null);
+ self.in = elem;
+ self.out = elem;
+ }
+ }
+
+ pub fn pop(self: *Self) ?*T {
+ const ret = self.out orelse return null;
+ self.out = ret.next;
+ ret.next = null;
+ if (self.in == ret) self.in = null;
+ return ret;
+ }
+
+ pub fn peek(self: Self) ?*T {
+ return self.out;
+ }
+
+ /// Remove an element from the FIFO. Asserts that the element is
+ /// in the FIFO. This operation is O(N), if this is done often you
+ /// probably want a different data structure.
+ pub fn remove(self: *Self, to_remove: *T) void {
+ if (to_remove == self.out) {
+ _ = self.pop();
+ return;
+ }
+ var it = self.out;
+ while (it) |elem| : (it = elem.next) {
+ if (to_remove == elem.next) {
+ if (to_remove == self.in) self.in = elem;
+ elem.next = to_remove.next;
+ to_remove.next = null;
+ break;
+ }
+ } else unreachable;
+ }
+ };
+}
+
+test "push/pop/peek/remove" {
+ const testing = @import("std").testing;
+
+ const Foo = struct { next: ?*@This() = null };
+
+ var one: Foo = .{};
+ var two: Foo = .{};
+ var three: Foo = .{};
+
+ var fifo: FIFO(Foo) = .{};
+
+ fifo.push(&one);
+ try testing.expectEqual(@as(?*Foo, &one), fifo.peek());
+
+ fifo.push(&two);
+ fifo.push(&three);
+ try testing.expectEqual(@as(?*Foo, &one), fifo.peek());
+
+ fifo.remove(&one);
+ try testing.expectEqual(@as(?*Foo, &two), fifo.pop());
+ try testing.expectEqual(@as(?*Foo, &three), fifo.pop());
+ try testing.expectEqual(@as(?*Foo, null), fifo.pop());
+
+ fifo.push(&one);
+ fifo.push(&two);
+ fifo.push(&three);
+ fifo.remove(&two);
+ try testing.expectEqual(@as(?*Foo, &one), fifo.pop());
+ try testing.expectEqual(@as(?*Foo, &three), fifo.pop());
+ try testing.expectEqual(@as(?*Foo, null), fifo.pop());
+
+ fifo.push(&one);
+ fifo.push(&two);
+ fifo.push(&three);
+ fifo.remove(&three);
+ try testing.expectEqual(@as(?*Foo, &one), fifo.pop());
+ try testing.expectEqual(@as(?*Foo, &two), fifo.pop());
+ try testing.expectEqual(@as(?*Foo, null), fifo.pop());
+
+ fifo.push(&one);
+ fifo.push(&two);
+ fifo.remove(&two);
+ fifo.push(&three);
+ try testing.expectEqual(@as(?*Foo, &one), fifo.pop());
+ try testing.expectEqual(@as(?*Foo, &three), fifo.pop());
+ try testing.expectEqual(@as(?*Foo, null), fifo.pop());
+}
diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig
new file mode 100644
index 000000000..00186610b
--- /dev/null
+++ b/src/io/io_darwin.zig
@@ -0,0 +1,665 @@
+const std = @import("std");
+const os = std.os;
+const mem = std.mem;
+const assert = std.debug.assert;
+
+const FIFO = @import("./fifo.zig").FIFO;
+const Time = @import("./time.zig").Time;
+
+const IO = @This();
+
+kq: os.fd_t,
+time: Time = .{},
+io_inflight: usize = 0,
+timeouts: FIFO(Completion) = .{},
+completed: FIFO(Completion) = .{},
+io_pending: FIFO(Completion) = .{},
+
+pub fn init(entries: u12, flags: u32) !IO {
+ const kq = try os.kqueue();
+ assert(kq > -1);
+ return IO{ .kq = kq };
+}
+
+pub fn deinit(self: *IO) void {
+ assert(self.kq > -1);
+ os.close(self.kq);
+ self.kq = -1;
+}
+
+/// Pass all queued submissions to the kernel and peek for completions.
+pub fn tick(self: *IO) !void {
+ return self.flush(false);
+}
+
+/// Pass all queued submissions to the kernel and run for `nanoseconds`.
+/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
+/// in the __kernel_timespec struct.
+pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
+ var timed_out = false;
+ var completion: Completion = undefined;
+ const on_timeout = struct {
+ fn callback(
+ timed_out_ptr: *bool,
+ _completion: *Completion,
+ _result: TimeoutError!void,
+ ) void {
+ timed_out_ptr.* = true;
+ }
+ }.callback;
+
+ // Submit a timeout which sets the timed_out value to true to terminate the loop below.
+ self.timeout(
+ *bool,
+ &timed_out,
+ on_timeout,
+ &completion,
+ nanoseconds,
+ );
+
+ // Loop until our timeout completion is processed above, which sets timed_out to true.
+ // LLVM shouldn't be able to cache timed_out's value here since its address escapes above.
+ while (!timed_out) {
+ try self.flush(true);
+ }
+}
+
+fn flush(self: *IO, wait_for_completions: bool) !void {
+ var io_pending = self.io_pending.peek();
+ var events: [256]os.Kevent = undefined;
+
+ // Check timeouts and fill events with completions in io_pending
+ // (they will be submitted through kevent).
+ // Timeouts are expired here and possibly pushed to the completed queue.
+ const next_timeout = self.flush_timeouts();
+ const change_events = self.flush_io(&events, &io_pending);
+
+ // Only call kevent() if we need to submit io events or if we need to wait for completions.
+ if (change_events > 0 or self.completed.peek() == null) {
+ // Zero timeouts for kevent() implies a non-blocking poll
+ var ts = std.mem.zeroes(os.timespec);
+
+ // We need to wait (not poll) on kevent if there's nothing to submit or complete.
+ // We should never wait indefinitely (timeout_ptr = null for kevent) given:
+ // - tick() is non-blocking (wait_for_completions = false)
+ // - run_for_ns() always submits a timeout
+ if (change_events == 0 and self.completed.peek() == null) {
+ if (wait_for_completions) {
+ const timeout_ns = next_timeout orelse @panic("kevent() blocking forever");
+ ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
+ ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
+ } else if (self.io_inflight == 0) {
+ return;
+ }
+ }
+
+ const new_events = try os.kevent(
+ self.kq,
+ events[0..change_events],
+ events[0..events.len],
+ &ts,
+ );
+
+ // Mark the io events submitted only after kevent() successfully processed them
+ self.io_pending.out = io_pending;
+ if (io_pending == null) {
+ self.io_pending.in = null;
+ }
+
+ self.io_inflight += change_events;
+ self.io_inflight -= new_events;
+
+ for (events[0..new_events]) |event| {
+ const completion = @intToPtr(*Completion, event.udata);
+ completion.next = null;
+ self.completed.push(completion);
+ }
+ }
+
+ var completed = self.completed;
+ self.completed = .{};
+ while (completed.pop()) |completion| {
+ (completion.callback)(self, completion);
+ }
+}
+
+fn flush_io(self: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize {
+ for (events) |*event, flushed| {
+ const completion = io_pending_top.* orelse return flushed;
+ io_pending_top.* = completion.next;
+
+ const event_info = switch (completion.operation) {
+ .accept => |op| [2]c_int{ op.socket, os.EVFILT_READ },
+ .connect => |op| [2]c_int{ op.socket, os.EVFILT_WRITE },
+ .read => |op| [2]c_int{ op.fd, os.EVFILT_READ },
+ .write => |op| [2]c_int{ op.fd, os.EVFILT_WRITE },
+ .recv => |op| [2]c_int{ op.socket, os.EVFILT_READ },
+ .send => |op| [2]c_int{ op.socket, os.EVFILT_WRITE },
+ else => @panic("invalid completion operation queued for io"),
+ };
+
+ event.* = .{
+ .ident = @intCast(u32, event_info[0]),
+ .filter = @intCast(i16, event_info[1]),
+ .flags = os.EV_ADD | os.EV_ENABLE | os.EV_ONESHOT,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(completion),
+ };
+ }
+ return events.len;
+}
+
+fn flush_timeouts(self: *IO) ?u64 {
+ var min_timeout: ?u64 = null;
+ var timeouts: ?*Completion = self.timeouts.peek();
+ while (timeouts) |completion| {
+ timeouts = completion.next;
+
+ // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call.
+ const now = self.time.monotonic();
+ const expires = completion.operation.timeout.expires;
+
+ // NOTE: remove() could be O(1) here with a doubly-linked-list
+ // since we know the previous Completion.
+ if (now >= expires) {
+ self.timeouts.remove(completion);
+ self.completed.push(completion);
+ continue;
+ }
+
+ const timeout_ns = expires - now;
+ if (min_timeout) |min_ns| {
+ min_timeout = std.math.min(min_ns, timeout_ns);
+ } else {
+ min_timeout = timeout_ns;
+ }
+ }
+ return min_timeout;
+}
+
+/// This struct holds the data needed for a single IO operation
+pub const Completion = struct {
+ next: ?*Completion,
+ context: ?*c_void,
+ callback: fn (*IO, *Completion) void,
+ operation: Operation,
+};
+
+const Operation = union(enum) {
+ accept: struct {
+ socket: os.socket_t,
+ },
+ close: struct {
+ fd: os.fd_t,
+ },
+ connect: struct {
+ socket: os.socket_t,
+ address: std.net.Address,
+ initiated: bool,
+ },
+ fsync: struct {
+ fd: os.fd_t,
+ },
+ read: struct {
+ fd: os.fd_t,
+ buf: [*]u8,
+ len: u32,
+ offset: u64,
+ },
+ recv: struct {
+ socket: os.socket_t,
+ buf: [*]u8,
+ len: u32,
+ },
+ send: struct {
+ socket: os.socket_t,
+ buf: [*]const u8,
+ len: u32,
+ },
+ timeout: struct {
+ expires: u64,
+ },
+ write: struct {
+ fd: os.fd_t,
+ buf: [*]const u8,
+ len: u32,
+ offset: u64,
+ },
+};
+
+fn submit(
+ self: *IO,
+ context: anytype,
+ comptime callback: anytype,
+ completion: *Completion,
+ comptime operation_tag: std.meta.Tag(Operation),
+ operation_data: anytype,
+ comptime OperationImpl: type,
+) void {
+ const Context = @TypeOf(context);
+ const onCompleteFn = struct {
+ fn onComplete(io: *IO, _completion: *Completion) void {
+ // Perform the actual operaton
+ const op_data = &@field(_completion.operation, @tagName(operation_tag));
+ const result = OperationImpl.doOperation(op_data);
+
+ // Requeue onto io_pending if error.WouldBlock
+ switch (operation_tag) {
+ .accept, .connect, .read, .write, .send, .recv => {
+ _ = result catch |err| switch (err) {
+ error.WouldBlock => {
+ _completion.next = null;
+ io.io_pending.push(_completion);
+ return;
+ },
+ else => {},
+ };
+ },
+ else => {},
+ }
+
+ // Complete the Completion
+ return callback(
+ @intToPtr(Context, @ptrToInt(_completion.context)),
+ _completion,
+ result,
+ );
+ }
+ }.onComplete;
+
+ completion.* = .{
+ .next = null,
+ .context = context,
+ .callback = onCompleteFn,
+ .operation = @unionInit(Operation, @tagName(operation_tag), operation_data),
+ };
+
+ switch (operation_tag) {
+ .timeout => self.timeouts.push(completion),
+ else => self.completed.push(completion),
+ }
+}
+
+pub const AcceptError = os.AcceptError || os.SetSockOptError;
+
+pub fn accept(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: AcceptError!os.socket_t,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .accept,
+ .{
+ .socket = socket,
+ },
+ struct {
+ fn doOperation(op: anytype) AcceptError!os.socket_t {
+ const fd = try os.accept(
+ op.socket,
+ null,
+ null,
+ os.SOCK_NONBLOCK | os.SOCK_CLOEXEC,
+ );
+ errdefer os.close(fd);
+
+ // darwin doesn't support os.MSG_NOSIGNAL,
+ // but instead a socket option to avoid SIGPIPE.
+ os.setsockopt(fd, os.SOL_SOCKET, os.SO_NOSIGPIPE, &mem.toBytes(@as(c_int, 1))) catch |err| return switch (err) {
+ error.TimeoutTooBig => unreachable,
+ error.PermissionDenied => error.NetworkSubsystemFailed,
+ error.AlreadyConnected => error.NetworkSubsystemFailed,
+ error.InvalidProtocolOption => error.ProtocolFailure,
+ else => |e| e,
+ };
+
+ return fd;
+ }
+ },
+ );
+}
+
+pub const CloseError = error{
+ FileDescriptorInvalid,
+ DiskQuota,
+ InputOutput,
+ NoSpaceLeft,
+} || os.UnexpectedError;
+
+pub fn close(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: CloseError!void,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .close,
+ .{
+ .fd = fd,
+ },
+ struct {
+ fn doOperation(op: anytype) CloseError!void {
+ return switch (os.errno(os.system.close(op.fd))) {
+ 0 => {},
+ os.EBADF => error.FileDescriptorInvalid,
+ os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
+ os.EIO => error.InputOutput,
+ else => |errno| os.unexpectedErrno(errno),
+ };
+ }
+ },
+ );
+}
+
+pub const ConnectError = os.ConnectError;
+
+pub fn connect(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: ConnectError!void,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ address: std.net.Address,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .connect,
+ .{
+ .socket = socket,
+ .address = address,
+ .initiated = false,
+ },
+ struct {
+ fn doOperation(op: anytype) ConnectError!void {
+ // Don't call connect after being rescheduled by io_pending as it gives EISCONN.
+ // Instead, check the socket error to see if has been connected successfully.
+ const result = switch (op.initiated) {
+ true => os.getsockoptError(op.socket),
+ else => os.connect(op.socket, &op.address.any, op.address.getOsSockLen()),
+ };
+
+ op.initiated = true;
+ return result;
+ }
+ },
+ );
+}
+
+pub const FsyncError = os.SyncError;
+
+pub fn fsync(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: FsyncError!void,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .fsync,
+ .{
+ .fd = fd,
+ },
+ struct {
+ fn doOperation(op: anytype) FsyncError!void {
+ _ = os.fcntl(op.fd, os.F_FULLFSYNC, 1) catch return os.fsync(op.fd);
+ }
+ },
+ );
+}
+
+pub const ReadError = error{
+ WouldBlock,
+ NotOpenForReading,
+ ConnectionResetByPeer,
+ Alignment,
+ InputOutput,
+ IsDir,
+ SystemResources,
+ Unseekable,
+} || os.UnexpectedError;
+
+pub fn read(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: ReadError!usize,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+ buffer: []u8,
+ offset: u64,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .read,
+ .{
+ .fd = fd,
+ .buf = buffer.ptr,
+ .len = @intCast(u32, buffer_limit(buffer.len)),
+ .offset = offset,
+ },
+ struct {
+ fn doOperation(op: anytype) ReadError!usize {
+ while (true) {
+ const rc = os.system.pread(
+ op.fd,
+ op.buf,
+ op.len,
+ @bitCast(isize, op.offset),
+ );
+ return switch (os.errno(rc)) {
+ 0 => @intCast(usize, rc),
+ os.EINTR => continue,
+ os.EAGAIN => error.WouldBlock,
+ os.EBADF => error.NotOpenForReading,
+ os.ECONNRESET => error.ConnectionResetByPeer,
+ os.EFAULT => unreachable,
+ os.EINVAL => error.Alignment,
+ os.EIO => error.InputOutput,
+ os.EISDIR => error.IsDir,
+ os.ENOBUFS => error.SystemResources,
+ os.ENOMEM => error.SystemResources,
+ os.ENXIO => error.Unseekable,
+ os.EOVERFLOW => error.Unseekable,
+ os.ESPIPE => error.Unseekable,
+ else => |err| os.unexpectedErrno(err),
+ };
+ }
+ }
+ },
+ );
+}
+
+pub const RecvError = os.RecvFromError;
+
+pub fn recv(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: RecvError!usize,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ buffer: []u8,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .recv,
+ .{
+ .socket = socket,
+ .buf = buffer.ptr,
+ .len = @intCast(u32, buffer_limit(buffer.len)),
+ },
+ struct {
+ fn doOperation(op: anytype) RecvError!usize {
+ return os.recv(op.socket, op.buf[0..op.len], 0);
+ }
+ },
+ );
+}
+
+pub const SendError = os.SendError;
+
+pub fn send(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: SendError!usize,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ buffer: []const u8,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .send,
+ .{
+ .socket = socket,
+ .buf = buffer.ptr,
+ .len = @intCast(u32, buffer_limit(buffer.len)),
+ },
+ struct {
+ fn doOperation(op: anytype) SendError!usize {
+ return os.send(op.socket, op.buf[0..op.len], 0);
+ }
+ },
+ );
+}
+
+pub const TimeoutError = error{Canceled} || os.UnexpectedError;
+
+pub fn timeout(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: TimeoutError!void,
+ ) void,
+ completion: *Completion,
+ nanoseconds: u63,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .timeout,
+ .{
+ .expires = self.time.monotonic() + nanoseconds,
+ },
+ struct {
+ fn doOperation(_: anytype) TimeoutError!void {
+ return; // timeouts don't have errors for now
+ }
+ },
+ );
+}
+
+pub const WriteError = os.PWriteError;
+
+pub fn write(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: WriteError!usize,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+ buffer: []const u8,
+ offset: u64,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .write,
+ .{
+ .fd = fd,
+ .buf = buffer.ptr,
+ .len = @intCast(u32, buffer_limit(buffer.len)),
+ .offset = offset,
+ },
+ struct {
+ fn doOperation(op: anytype) WriteError!usize {
+ return os.pwrite(op.fd, op.buf[0..op.len], op.offset);
+ }
+ },
+ );
+}
+
+pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
+ const fd = try os.socket(family, sock_type | os.SOCK_NONBLOCK, protocol);
+ errdefer os.close(fd);
+
+ // darwin doesn't support os.MSG_NOSIGNAL, but instead a socket option to avoid SIGPIPE.
+ try os.setsockopt(fd, os.SOL_SOCKET, os.SO_NOSIGPIPE, &mem.toBytes(@as(c_int, 1)));
+ return fd;
+}
+
+fn buffer_limit(buffer_len: usize) usize {
+
+ // Linux limits how much may be written in a `pwrite()/pread()` call, which is `0x7ffff000` on
+ // both 64-bit and 32-bit systems, due to using a signed C int as the return value, as well as
+ // stuffing the errno codes into the last `4096` values.
+ // Darwin limits writes to `0x7fffffff` bytes, more than that returns `EINVAL`.
+ // The corresponding POSIX limit is `std.math.maxInt(isize)`.
+ const limit = switch (std.Target.current.os.tag) {
+ .linux => 0x7ffff000,
+ .macos, .ios, .watchos, .tvos => std.math.maxInt(i32),
+ else => std.math.maxInt(isize),
+ };
+ return std.math.min(limit, buffer_len);
+}
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig
new file mode 100644
index 000000000..794f18f90
--- /dev/null
+++ b/src/io/io_linux.zig
@@ -0,0 +1,874 @@
+const std = @import("std");
+const assert = std.debug.assert;
+const os = std.os;
+const linux = os.linux;
+const IO_Uring = linux.IO_Uring;
+const io_uring_cqe = linux.io_uring_cqe;
+const io_uring_sqe = linux.io_uring_sqe;
+
+const FIFO = @import("./fifo.zig").FIFO;
+const IO = @This();
+
+ring: IO_Uring,
+
+/// Operations not yet submitted to the kernel and waiting on available space in the
+/// submission queue.
+unqueued: FIFO(Completion) = .{},
+
+/// Completions that are ready to have their callbacks run.
+completed: FIFO(Completion) = .{},
+
+pub fn init(entries: u12, flags: u32) !IO {
+ return IO{ .ring = try IO_Uring.init(entries, flags) };
+}
+
+pub fn deinit(self: *IO) void {
+ self.ring.deinit();
+}
+
+/// Pass all queued submissions to the kernel and peek for completions.
+pub fn tick(self: *IO) !void {
+ // We assume that all timeouts submitted by `run_for_ns()` will be reaped by `run_for_ns()`
+ // and that `tick()` and `run_for_ns()` cannot be run concurrently.
+ // Therefore `timeouts` here will never be decremented and `etime` will always be false.
+ var timeouts: usize = 0;
+ var etime = false;
+
+ try self.flush(0, &timeouts, &etime);
+ assert(etime == false);
+
+ // Flush any SQEs that were queued while running completion callbacks in `flush()`:
+ // This is an optimization to avoid delaying submissions until the next tick.
+ // At the same time, we do not flush any ready CQEs since SQEs may complete synchronously.
+ // We guard against an io_uring_enter() syscall if we know we do not have any queued SQEs.
+ // We cannot use `self.ring.sq_ready()` here since this counts flushed and unflushed SQEs.
+ const queued = self.ring.sq.sqe_tail -% self.ring.sq.sqe_head;
+ if (queued > 0) {
+ try self.flush_submissions(0, &timeouts, &etime);
+ assert(etime == false);
+ }
+}
+
+/// Pass all queued submissions to the kernel and run for `nanoseconds`.
+/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
+/// in the __kernel_timespec struct.
+pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
+ // We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the
+ // timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are
+ // dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC.
+ var current_ts: os.timespec = undefined;
+ os.clock_gettime(os.CLOCK_MONOTONIC, &current_ts) catch unreachable;
+ // The absolute CLOCK_MONOTONIC time after which we may return from this function:
+ const timeout_ts: os.__kernel_timespec = .{
+ .tv_sec = current_ts.tv_sec,
+ .tv_nsec = current_ts.tv_nsec + nanoseconds,
+ };
+ var timeouts: usize = 0;
+ var etime = false;
+ while (!etime) {
+ const timeout_sqe = self.ring.get_sqe() catch blk: {
+ // The submission queue is full, so flush submissions to make space:
+ try self.flush_submissions(0, &timeouts, &etime);
+ break :blk self.ring.get_sqe() catch unreachable;
+ };
+ // Submit an absolute timeout that will be canceled if any other SQE completes first:
+ linux.io_uring_prep_timeout(timeout_sqe, &timeout_ts, 1, os.IORING_TIMEOUT_ABS);
+ timeout_sqe.user_data = 0;
+ timeouts += 1;
+ // The amount of time this call will block is bounded by the timeout we just submitted:
+ try self.flush(1, &timeouts, &etime);
+ }
+ // Reap any remaining timeouts, which reference the timespec in the current stack frame.
+ // The busy loop here is required to avoid a potential deadlock, as the kernel determines
+ // when the timeouts are pushed to the completion queue, not us.
+ while (timeouts > 0) _ = try self.flush_completions(0, &timeouts, &etime);
+}
+
+fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
+ // Flush any queued SQEs and reuse the same syscall to wait for completions if required:
+ try self.flush_submissions(wait_nr, timeouts, etime);
+ // We can now just peek for any CQEs without waiting and without another syscall:
+ try self.flush_completions(0, timeouts, etime);
+ // Run completions only after all completions have been flushed:
+ // Loop on a copy of the linked list, having reset the list first, so that any synchronous
+ // append on running a completion is executed only the next time round the event loop,
+ // without creating an infinite loop.
+ {
+ var copy = self.completed;
+ self.completed = .{};
+ while (copy.pop()) |completion| completion.complete();
+ }
+ // Again, loop on a copy of the list to avoid an infinite loop:
+ {
+ var copy = self.unqueued;
+ self.unqueued = .{};
+ while (copy.pop()) |completion| self.enqueue(completion);
+ }
+}
+
+fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
+ var cqes: [256]io_uring_cqe = undefined;
+ var wait_remaining = wait_nr;
+ while (true) {
+ // Guard against waiting indefinitely (if there are too few requests inflight),
+ // especially if this is not the first time round the loop:
+ const completed = self.ring.copy_cqes(&cqes, wait_remaining) catch |err| switch (err) {
+ error.SignalInterrupt => continue,
+ else => return err,
+ };
+ if (completed > wait_remaining) wait_remaining = 0 else wait_remaining -= completed;
+ for (cqes[0..completed]) |cqe| {
+ if (cqe.user_data == 0) {
+ timeouts.* -= 1;
+ // We are only done if the timeout submitted was completed due to time, not if
+ // it was completed due to the completion of an event, in which case `cqe.res`
+ // would be 0. It is possible for multiple timeout operations to complete at the
+ // same time if the nanoseconds value passed to `run_for_ns()` is very short.
+ if (-cqe.res == os.ETIME) etime.* = true;
+ continue;
+ }
+ const completion = @intToPtr(*Completion, @intCast(usize, cqe.user_data));
+ completion.result = cqe.res;
+ // We do not run the completion here (instead appending to a linked list) to avoid:
+ // * recursion through `flush_submissions()` and `flush_completions()`,
+ // * unbounded stack usage, and
+ // * confusing stack traces.
+ self.completed.push(completion);
+ }
+ if (completed < cqes.len) break;
+ }
+}
+
+fn flush_submissions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
+ while (true) {
+ _ = self.ring.submit_and_wait(wait_nr) catch |err| switch (err) {
+ error.SignalInterrupt => continue,
+ // Wait for some completions and then try again:
+ // See https://github.com/axboe/liburing/issues/281 re: error.SystemResources.
+ // Be careful also that copy_cqes() will flush before entering to wait (it does):
+ // https://github.com/axboe/liburing/commit/35c199c48dfd54ad46b96e386882e7ac341314c5
+ error.CompletionQueueOvercommitted, error.SystemResources => {
+ try self.flush_completions(1, timeouts, etime);
+ continue;
+ },
+ else => return err,
+ };
+ break;
+ }
+}
+
+fn enqueue(self: *IO, completion: *Completion) void {
+ const sqe = self.ring.get_sqe() catch |err| switch (err) {
+ error.SubmissionQueueFull => {
+ self.unqueued.push(completion);
+ return;
+ },
+ };
+ completion.prep(sqe);
+}
+
+/// This struct holds the data needed for a single io_uring operation
+pub const Completion = struct {
+ io: *IO,
+ result: i32 = undefined,
+ next: ?*Completion = null,
+ operation: Operation,
+ // This is one of the usecases for c_void outside of C code and as such c_void will
+ // be replaced with anyopaque eventually: https://github.com/ziglang/zig/issues/323
+ context: ?*c_void,
+ callback: fn (context: ?*c_void, completion: *Completion, result: *const c_void) void,
+
+ fn prep(completion: *Completion, sqe: *io_uring_sqe) void {
+ switch (completion.operation) {
+ .accept => |*op| {
+ linux.io_uring_prep_accept(
+ sqe,
+ op.socket,
+ &op.address,
+ &op.address_size,
+ os.SOCK_CLOEXEC,
+ );
+ },
+ .close => |op| {
+ linux.io_uring_prep_close(sqe, op.fd);
+ },
+ .connect => |*op| {
+ linux.io_uring_prep_connect(
+ sqe,
+ op.socket,
+ &op.address.any,
+ op.address.getOsSockLen(),
+ );
+ },
+ .fsync => |op| {
+ linux.io_uring_prep_fsync(sqe, op.fd, 0);
+ },
+ .read => |op| {
+ linux.io_uring_prep_read(
+ sqe,
+ op.fd,
+ op.buffer[0..buffer_limit(op.buffer.len)],
+ op.offset,
+ );
+ },
+ .recv => |op| {
+ linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG_NOSIGNAL);
+ },
+ .send => |op| {
+ linux.io_uring_prep_send(sqe, op.socket, op.buffer, os.MSG_NOSIGNAL);
+ },
+ .timeout => |*op| {
+ linux.io_uring_prep_timeout(sqe, &op.timespec, 0, 0);
+ },
+ .write => |op| {
+ linux.io_uring_prep_write(
+ sqe,
+ op.fd,
+ op.buffer[0..buffer_limit(op.buffer.len)],
+ op.offset,
+ );
+ },
+ }
+ sqe.user_data = @ptrToInt(completion);
+ }
+
+ fn complete(completion: *Completion) void {
+ switch (completion.operation) {
+ .accept => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.EAGAIN => error.WouldBlock,
+ os.EBADF => error.FileDescriptorInvalid,
+ os.ECONNABORTED => error.ConnectionAborted,
+ os.EFAULT => unreachable,
+ os.EINVAL => error.SocketNotListening,
+ os.EMFILE => error.ProcessFdQuotaExceeded,
+ os.ENFILE => error.SystemFdQuotaExceeded,
+ os.ENOBUFS => error.SystemResources,
+ os.ENOMEM => error.SystemResources,
+ os.ENOTSOCK => error.FileDescriptorNotASocket,
+ os.EOPNOTSUPP => error.OperationNotSupported,
+ os.EPERM => error.PermissionDenied,
+ os.EPROTO => error.ProtocolFailure,
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else @intCast(os.socket_t, completion.result);
+ completion.callback(completion.context, completion, &result);
+ },
+ .close => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
+ os.EBADF => error.FileDescriptorInvalid,
+ os.EDQUOT => error.DiskQuota,
+ os.EIO => error.InputOutput,
+ os.ENOSPC => error.NoSpaceLeft,
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else assert(completion.result == 0);
+ completion.callback(completion.context, completion, &result);
+ },
+ .connect => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.EACCES => error.AccessDenied,
+ os.EADDRINUSE => error.AddressInUse,
+ os.EADDRNOTAVAIL => error.AddressNotAvailable,
+ os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
+ os.EAGAIN, os.EINPROGRESS => error.WouldBlock,
+ os.EALREADY => error.OpenAlreadyInProgress,
+ os.EBADF => error.FileDescriptorInvalid,
+ os.ECONNREFUSED => error.ConnectionRefused,
+ os.ECONNRESET => error.ConnectionResetByPeer,
+ os.EFAULT => unreachable,
+ os.EISCONN => error.AlreadyConnected,
+ os.ENETUNREACH => error.NetworkUnreachable,
+ os.ENOENT => error.FileNotFound,
+ os.ENOTSOCK => error.FileDescriptorNotASocket,
+ os.EPERM => error.PermissionDenied,
+ os.EPROTOTYPE => error.ProtocolNotSupported,
+ os.ETIMEDOUT => error.ConnectionTimedOut,
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else assert(completion.result == 0);
+ completion.callback(completion.context, completion, &result);
+ },
+ .fsync => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.EBADF => error.FileDescriptorInvalid,
+ os.EDQUOT => error.DiskQuota,
+ os.EINVAL => error.ArgumentsInvalid,
+ os.EIO => error.InputOutput,
+ os.ENOSPC => error.NoSpaceLeft,
+ os.EROFS => error.ReadOnlyFileSystem,
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else assert(completion.result == 0);
+ completion.callback(completion.context, completion, &result);
+ },
+ .read => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.EAGAIN => error.WouldBlock,
+ os.EBADF => error.NotOpenForReading,
+ os.ECONNRESET => error.ConnectionResetByPeer,
+ os.EFAULT => unreachable,
+ os.EINVAL => error.Alignment,
+ os.EIO => error.InputOutput,
+ os.EISDIR => error.IsDir,
+ os.ENOBUFS => error.SystemResources,
+ os.ENOMEM => error.SystemResources,
+ os.ENXIO => error.Unseekable,
+ os.EOVERFLOW => error.Unseekable,
+ os.ESPIPE => error.Unseekable,
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else @intCast(usize, completion.result);
+ completion.callback(completion.context, completion, &result);
+ },
+ .recv => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.EAGAIN => error.WouldBlock,
+ os.EBADF => error.FileDescriptorInvalid,
+ os.ECONNREFUSED => error.ConnectionRefused,
+ os.EFAULT => unreachable,
+ os.EINVAL => unreachable,
+ os.ENOMEM => error.SystemResources,
+ os.ENOTCONN => error.SocketNotConnected,
+ os.ENOTSOCK => error.FileDescriptorNotASocket,
+ os.ECONNRESET => error.ConnectionResetByPeer,
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else @intCast(usize, completion.result);
+ completion.callback(completion.context, completion, &result);
+ },
+ .send => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.EACCES => error.AccessDenied,
+ os.EAGAIN => error.WouldBlock,
+ os.EALREADY => error.FastOpenAlreadyInProgress,
+ os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
+ os.EBADF => error.FileDescriptorInvalid,
+ os.ECONNRESET => error.ConnectionResetByPeer,
+ os.EDESTADDRREQ => unreachable,
+ os.EFAULT => unreachable,
+ os.EINVAL => unreachable,
+ os.EISCONN => unreachable,
+ os.EMSGSIZE => error.MessageTooBig,
+ os.ENOBUFS => error.SystemResources,
+ os.ENOMEM => error.SystemResources,
+ os.ENOTCONN => error.SocketNotConnected,
+ os.ENOTSOCK => error.FileDescriptorNotASocket,
+ os.EOPNOTSUPP => error.OperationNotSupported,
+ os.EPIPE => error.BrokenPipe,
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else @intCast(usize, completion.result);
+ completion.callback(completion.context, completion, &result);
+ },
+ .timeout => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.ECANCELED => error.Canceled,
+ os.ETIME => {}, // A success.
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else unreachable;
+ completion.callback(completion.context, completion, &result);
+ },
+ .write => {
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.EAGAIN => error.WouldBlock,
+ os.EBADF => error.NotOpenForWriting,
+ os.EDESTADDRREQ => error.NotConnected,
+ os.EDQUOT => error.DiskQuota,
+ os.EFAULT => unreachable,
+ os.EFBIG => error.FileTooBig,
+ os.EINVAL => error.Alignment,
+ os.EIO => error.InputOutput,
+ os.ENOSPC => error.NoSpaceLeft,
+ os.ENXIO => error.Unseekable,
+ os.EOVERFLOW => error.Unseekable,
+ os.EPERM => error.AccessDenied,
+ os.EPIPE => error.BrokenPipe,
+ os.ESPIPE => error.Unseekable,
+ else => |errno| os.unexpectedErrno(@intCast(usize, errno)),
+ } else @intCast(usize, completion.result);
+ completion.callback(completion.context, completion, &result);
+ },
+ }
+ }
+};
+
+/// This union encodes the set of operations supported as well as their arguments.
+const Operation = union(enum) {
+ accept: struct {
+ socket: os.socket_t,
+ address: os.sockaddr = undefined,
+ address_size: os.socklen_t = @sizeOf(os.sockaddr),
+ },
+ close: struct {
+ fd: os.fd_t,
+ },
+ connect: struct {
+ socket: os.socket_t,
+ address: std.net.Address,
+ },
+ fsync: struct {
+ fd: os.fd_t,
+ },
+ read: struct {
+ fd: os.fd_t,
+ buffer: []u8,
+ offset: u64,
+ },
+ recv: struct {
+ socket: os.socket_t,
+ buffer: []u8,
+ },
+ send: struct {
+ socket: os.socket_t,
+ buffer: []const u8,
+ },
+ timeout: struct {
+ timespec: os.__kernel_timespec,
+ },
+ write: struct {
+ fd: os.fd_t,
+ buffer: []const u8,
+ offset: u64,
+ },
+};
+
+pub const AcceptError = error{
+ WouldBlock,
+ FileDescriptorInvalid,
+ ConnectionAborted,
+ SocketNotListening,
+ ProcessFdQuotaExceeded,
+ SystemFdQuotaExceeded,
+ SystemResources,
+ FileDescriptorNotASocket,
+ OperationNotSupported,
+ PermissionDenied,
+ ProtocolFailure,
+} || os.UnexpectedError;
+
+pub fn accept(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: AcceptError!os.socket_t,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const AcceptError!os.socket_t, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .accept = .{
+ .socket = socket,
+ .address = undefined,
+ .address_size = @sizeOf(os.sockaddr),
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub const CloseError = error{
+ FileDescriptorInvalid,
+ DiskQuota,
+ InputOutput,
+ NoSpaceLeft,
+} || os.UnexpectedError;
+
+pub fn close(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: CloseError!void,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const CloseError!void, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .close = .{ .fd = fd },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub const ConnectError = error{
+ AccessDenied,
+ AddressInUse,
+ AddressNotAvailable,
+ AddressFamilyNotSupported,
+ WouldBlock,
+ OpenAlreadyInProgress,
+ FileDescriptorInvalid,
+ ConnectionRefused,
+ AlreadyConnected,
+ NetworkUnreachable,
+ FileNotFound,
+ FileDescriptorNotASocket,
+ PermissionDenied,
+ ProtocolNotSupported,
+ ConnectionTimedOut,
+} || os.UnexpectedError;
+
+pub fn connect(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: ConnectError!void,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ address: std.net.Address,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const ConnectError!void, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .connect = .{
+ .socket = socket,
+ .address = address,
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub const FsyncError = error{
+ FileDescriptorInvalid,
+ DiskQuota,
+ ArgumentsInvalid,
+ InputOutput,
+ NoSpaceLeft,
+ ReadOnlyFileSystem,
+} || os.UnexpectedError;
+
+pub fn fsync(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: FsyncError!void,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const FsyncError!void, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .fsync = .{
+ .fd = fd,
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub const ReadError = error{
+ WouldBlock,
+ NotOpenForReading,
+ ConnectionResetByPeer,
+ Alignment,
+ InputOutput,
+ IsDir,
+ SystemResources,
+ Unseekable,
+} || os.UnexpectedError;
+
+pub fn read(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: ReadError!usize,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+ buffer: []u8,
+ offset: u64,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const ReadError!usize, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .read = .{
+ .fd = fd,
+ .buffer = buffer,
+ .offset = offset,
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub const RecvError = error{
+ WouldBlock,
+ FileDescriptorInvalid,
+ ConnectionRefused,
+ SystemResources,
+ SocketNotConnected,
+ FileDescriptorNotASocket,
+} || os.UnexpectedError;
+
+pub fn recv(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: RecvError!usize,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ buffer: []u8,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const RecvError!usize, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .recv = .{
+ .socket = socket,
+ .buffer = buffer,
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub const SendError = error{
+ AccessDenied,
+ WouldBlock,
+ FastOpenAlreadyInProgress,
+ AddressFamilyNotSupported,
+ FileDescriptorInvalid,
+ ConnectionResetByPeer,
+ MessageTooBig,
+ SystemResources,
+ SocketNotConnected,
+ FileDescriptorNotASocket,
+ OperationNotSupported,
+ BrokenPipe,
+} || os.UnexpectedError;
+
+pub fn send(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: SendError!usize,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ buffer: []const u8,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const SendError!usize, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .send = .{
+ .socket = socket,
+ .buffer = buffer,
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub const TimeoutError = error{Canceled} || os.UnexpectedError;
+
+pub fn timeout(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: TimeoutError!void,
+ ) void,
+ completion: *Completion,
+ nanoseconds: u63,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const TimeoutError!void, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .timeout = .{
+ .timespec = .{ .tv_sec = 0, .tv_nsec = nanoseconds },
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub const WriteError = error{
+ WouldBlock,
+ NotOpenForWriting,
+ NotConnected,
+ DiskQuota,
+ FileTooBig,
+ Alignment,
+ InputOutput,
+ NoSpaceLeft,
+ Unseekable,
+ AccessDenied,
+ BrokenPipe,
+} || os.UnexpectedError;
+
+pub fn write(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: WriteError!usize,
+ ) void,
+ completion: *Completion,
+ fd: os.fd_t,
+ buffer: []const u8,
+ offset: u64,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*c_void, comp: *Completion, res: *const c_void) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const WriteError!usize, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .write = .{
+ .fd = fd,
+ .buffer = buffer,
+ .offset = offset,
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
+pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
+ return os.socket(family, sock_type, protocol);
+}
diff --git a/src/io/time.zig b/src/io/time.zig
new file mode 100644
index 000000000..2bfe24da0
--- /dev/null
+++ b/src/io/time.zig
@@ -0,0 +1,64 @@
+const std = @import("std");
+const assert = std.debug.assert;
+const is_darwin = std.Target.current.isDarwin();
+
+pub const Time = struct {
+ const Self = @This();
+
+ /// Hardware and/or software bugs can mean that the monotonic clock may regress.
+ /// One example (of many): https://bugzilla.redhat.com/show_bug.cgi?id=448449
+ /// We crash the process for safety if this ever happens, to protect against infinite loops.
+ /// It's better to crash and come back with a valid monotonic clock than get stuck forever.
+ monotonic_guard: u64 = 0,
+
+ /// A timestamp to measure elapsed time, meaningful only on the same system, not across reboots.
+ /// Always use a monotonic timestamp if the goal is to measure elapsed time.
+ /// This clock is not affected by discontinuous jumps in the system time, for example if the
+ /// system administrator manually changes the clock.
+ pub fn monotonic(self: *Self) u64 {
+ const m = blk: {
+ // Uses mach_continuous_time() instead of mach_absolute_time() as it counts while suspended.
+ // https://developer.apple.com/documentation/kernel/1646199-mach_continuous_time
+ // https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.c.auto.html
+ if (is_darwin) {
+ const darwin = struct {
+ const mach_timebase_info_t = std.os.darwin.mach_timebase_info_data;
+ extern "c" fn mach_timebase_info(info: *mach_timebase_info_t) std.os.darwin.kern_return_t;
+ extern "c" fn mach_continuous_time() u64;
+ };
+
+ const now = darwin.mach_continuous_time();
+ var info: darwin.mach_timebase_info_t = undefined;
+ if (darwin.mach_timebase_info(&info) != 0) @panic("mach_timebase_info() failed");
+ return (now * info.numer) / info.denom;
+ }
+
+ // The true monotonic clock on Linux is not in fact CLOCK_MONOTONIC:
+ // CLOCK_MONOTONIC excludes elapsed time while the system is suspended (e.g. VM migration).
+ // CLOCK_BOOTTIME is the same as CLOCK_MONOTONIC but includes elapsed time during a suspend.
+ // For more detail and why CLOCK_MONOTONIC_RAW is even worse than CLOCK_MONOTONIC,
+ // see https://github.com/ziglang/zig/pull/933#discussion_r656021295.
+ var ts: std.os.timespec = undefined;
+ std.os.clock_gettime(std.os.CLOCK_BOOTTIME, &ts) catch @panic("CLOCK_BOOTTIME required");
+ break :blk @intCast(u64, ts.tv_sec) * std.time.ns_per_s + @intCast(u64, ts.tv_nsec);
+ };
+
+ // "Oops!...I Did It Again"
+ if (m < self.monotonic_guard) @panic("a hardware/kernel bug regressed the monotonic clock");
+ self.monotonic_guard = m;
+ return m;
+ }
+
+ /// A timestamp to measure real (i.e. wall clock) time, meaningful across systems, and reboots.
+ /// This clock is affected by discontinuous jumps in the system time.
+ pub fn realtime(self: *Self) i64 {
+ // macos has supported clock_gettime() since 10.12:
+ // https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.3.auto.html
+
+ var ts: std.os.timespec = undefined;
+ std.os.clock_gettime(std.os.CLOCK_REALTIME, &ts) catch unreachable;
+ return @as(i64, ts.tv_sec) * std.time.ns_per_s + ts.tv_nsec;
+ }
+
+ pub fn tick(self: *Self) void {}
+};