diff options
author | 2022-04-04 23:25:54 -0700 | |
---|---|---|
committer | 2022-04-04 23:25:54 -0700 | |
commit | 30542225c6e140c900078538a6a563561005411b (patch) | |
tree | 9a2c4d1b9b09441df9712d8d410e0810421f5a53 /src/io/io_linux.zig | |
parent | 759b6c18fdbaa3788dad0ccde0115e4dce1a6470 (diff) | |
download | bun-30542225c6e140c900078538a6a563561005411b.tar.gz bun-30542225c6e140c900078538a6a563561005411b.tar.zst bun-30542225c6e140c900078538a6a563561005411b.zip |
fix bug with io sometimes sleeping permanetly
Diffstat (limited to 'src/io/io_linux.zig')
-rw-r--r-- | src/io/io_linux.zig | 50 |
1 files changed, 32 insertions, 18 deletions
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index fa4566b83..374ba9d78 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -448,7 +448,7 @@ const IO = @This(); ring: IO_Uring, -pending_timeouts: u32 = 0, +pending_count: usize = 0, /// Operations not yet submitted to the kernel and waiting on available space in the /// submission queue. @@ -460,9 +460,7 @@ completed: FIFO(Completion) = .{}, next_tick: FIFO(Completion) = .{}, pub fn hasNoWork(this: *IO) bool { - return this.completed.peek() == null and this.unqueued.peek() == null and - this.next_tick.peek() == null and - this.pending_timeouts == 0; + return this.pending_count == 0; } pub fn init(entries_: u12, flags: u32) !IO { @@ -916,7 +914,7 @@ pub const Completion = struct { os.ECANCELED => error.Canceled, os.ETIME => {}, // A success. else => |errno| asError(errno), - } else unreachable; + } else void{}; completion.callback(completion.context, completion, &result); }, .write => { @@ -1040,6 +1038,7 @@ pub fn accept( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1055,7 +1054,7 @@ pub fn accept( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub const CloseError = error{ @@ -1082,6 +1081,7 @@ pub fn close( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1095,11 +1095,12 @@ pub fn close( if (features.close_blocking) { const rc = linux.close(fd); completion.result = @intCast(i32, rc); + self.pending_count +|= 1; self.next_tick.push(completion); return; } - self.enqueue(completion); + self.enqueueNew(completion); } pub const ConnectError = error{ @@ -1138,6 +1139,7 @@ pub fn connect( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1160,7 +1162,7 @@ pub fn connect( return; } - self.enqueue(completion); + self.enqueueNew(completion); } pub const FsyncError = error{ @@ -1189,6 +1191,7 @@ pub fn fsync( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1202,7 +1205,7 @@ pub fn fsync( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub const ReadError = error{ @@ -1235,6 +1238,7 @@ pub fn read( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1250,7 +1254,7 @@ pub fn read( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub const RecvError = error{ @@ -1285,6 +1289,7 @@ pub fn recv( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1299,7 +1304,7 @@ pub fn recv( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub fn readev( @@ -1320,6 +1325,7 @@ pub fn readev( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1334,7 +1340,7 @@ pub fn readev( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub const SendError = error{ @@ -1376,6 +1382,7 @@ pub fn send( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1390,7 +1397,7 @@ pub fn send( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub const OpenError = error{ @@ -1468,6 +1475,7 @@ pub fn open( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1483,7 +1491,7 @@ pub fn open( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub fn writev( @@ -1505,6 +1513,7 @@ pub fn writev( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1521,7 +1530,7 @@ pub fn writev( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub const TimeoutError = error{Canceled} || Errno; @@ -1538,13 +1547,12 @@ pub fn timeout( completion: *Completion, nanoseconds: u63, ) void { - self.pending_timeouts +|= 1; completion.* = .{ .io = self, .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_timeouts -|= 1; + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1558,7 +1566,7 @@ pub fn timeout( }, }, }; - self.enqueue(completion); + self.enqueueNew(completion); } pub const WriteError = error{ @@ -1594,6 +1602,7 @@ pub fn write( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1609,6 +1618,11 @@ pub fn write( }, }, }; + self.enqueueNew(completion); +} + +inline fn enqueueNew(self: *IO, completion: *Completion) void { + self.pending_count +|= 1; self.enqueue(completion); } |