diff options
Diffstat (limited to 'src/io/io_linux.zig')
-rw-r--r-- | src/io/io_linux.zig | 74 |
1 files changed, 52 insertions, 22 deletions
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index b4f21dee5..d7a164665 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -448,8 +448,6 @@ const IO = @This(); ring: IO_Uring, -pending_count: usize = 0, - /// Operations not yet submitted to the kernel and waiting on available space in the /// submission queue. unqueued: FIFO(Completion) = .{}, @@ -458,12 +456,49 @@ unqueued: FIFO(Completion) = .{}, completed: FIFO(Completion) = .{}, next_tick: FIFO(Completion) = .{}, +event_fd: linux.fd_t = 0, + +eventfd_buf: [16]u8 = undefined, +has_queued: usize = 0, +wakeup_completion: Completion = undefined, + +fn queueForWakeup(this: *@This(), comptime Type: type, ctx: Type, comptime cb: anytype) void { + @memset(&this.eventfd_buf, 0, this.eventfd_buf.len); + const Callback = struct { + pub fn callback(that: Type, completion: *Completion, _: ReadError!usize) void { + var io = @fieldParentPtr(IO, "wakeup_completion", completion); + io.has_queued -|= 1; + cb(that); + } + }; + this.read( + Type, + ctx, + Callback.callback, + &this.wakeup_completion, + this.event_fd, + &this.eventfd_buf, + null, + ); + this.has_queued +|= 1; +} + +pub fn wait(this: *@This(), ptr: anytype, comptime onReady: anytype) void { + // Subscribe to wakeups + if (this.has_queued == 0) { + this.queueForWakeup(@TypeOf(ptr), ptr, onReady); + } + + this.tick() catch {}; -pub fn hasNoWork(this: *IO) bool { - return this.pending_count == 0; + if (this.has_queued == 0) { + return; + } + const submitted = this.ring.flush_sq(); + _ = this.ring.enter(submitted, 1, linux.IORING_ENTER_GETEVENTS) catch 0; } -pub fn init(entries_: u12, flags: u32) !IO { +pub fn init(entries_: u12, flags: u32, event_fd: os.fd_t) !IO { var ring: IO_Uring = undefined; var entries = entries_; @@ -480,6 +515,7 @@ pub fn init(entries_: u12, flags: u32) !IO { } var limit = linux.rlimit{ .cur = 0, .max = 0 }; + if (linux.getrlimit(.MEMLOCK, &limit) == 0) { if (limit.cur < 16 * 1024) { return error.@"memlock is too low. Please increase it to at least 64k"; @@ -505,7 +541,7 @@ pub fn init(entries_: u12, flags: u32) !IO { break; } - return IO{ .ring = ring }; + return IO{ .ring = ring, .event_fd = event_fd }; } pub fn deinit(self: *IO) void { @@ -542,6 +578,8 @@ pub fn tick(self: *IO) !void { /// The `nanoseconds` argument is a u63 to allow coercion to the i64 used /// in the timespec struct. pub fn run_for_ns(self: *IO, nanoseconds: u63) !void { + assert(nanoseconds > 0); + while (self.next_tick.pop()) |completion| { completion.complete(); } @@ -600,7 +638,9 @@ fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { } fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { - var cqes: [256]io_uring_cqe = undefined; + var cqes: [256]std.os.linux.io_uring_cqe = undefined; + var completion_byttes = std.mem.asBytes(&cqes); + @memset(completion_byttes, 0, completion_byttes.len); var wait_remaining = wait_nr; while (true) { // Guard against waiting indefinitely (if there are too few requests inflight), @@ -1038,7 +1078,6 @@ pub fn accept( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1081,7 +1120,6 @@ pub fn close( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1095,7 +1133,6 @@ pub fn close( if (features.close_blocking) { const rc = linux.close(fd); completion.result = @intCast(i32, rc); - self.pending_count +|= 1; self.next_tick.push(completion); return; } @@ -1139,7 +1176,6 @@ pub fn connect( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1191,7 +1227,6 @@ pub fn fsync( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1238,7 +1273,6 @@ pub fn read( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1290,7 +1324,6 @@ pub fn recv( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1326,7 +1359,6 @@ pub fn readev( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1383,7 +1415,6 @@ pub fn send( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1476,7 +1507,6 @@ pub fn open( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1514,7 +1544,6 @@ pub fn writev( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1553,7 +1582,6 @@ pub fn timeout( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1603,7 +1631,6 @@ pub fn write( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1623,7 +1650,10 @@ pub fn write( } inline fn enqueueNew(self: *IO, completion: *Completion) void { - self.pending_count +|= 1; + self.enqueue(completion); +} + +pub fn wake(self: *IO, completion: *Completion) void { self.enqueue(completion); } @@ -1658,7 +1688,7 @@ const Syscall = struct { }; pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t { - return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC, protocol); + return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC | os.SOCK.NONBLOCK, protocol); } pub var global: IO = undefined; |