aboutsummaryrefslogtreecommitdiff
path: root/src/io/io_linux.zig
diff options
context:
space:
mode:
authorGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-04-04 23:25:54 -0700
committerGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-04-04 23:25:54 -0700
commit30542225c6e140c900078538a6a563561005411b (patch)
tree9a2c4d1b9b09441df9712d8d410e0810421f5a53 /src/io/io_linux.zig
parent759b6c18fdbaa3788dad0ccde0115e4dce1a6470 (diff)
downloadbun-30542225c6e140c900078538a6a563561005411b.tar.gz
bun-30542225c6e140c900078538a6a563561005411b.tar.zst
bun-30542225c6e140c900078538a6a563561005411b.zip
fix bug with io sometimes sleeping permanetly
Diffstat (limited to 'src/io/io_linux.zig')
-rw-r--r--src/io/io_linux.zig50
1 files changed, 32 insertions, 18 deletions
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig
index fa4566b83..374ba9d78 100644
--- a/src/io/io_linux.zig
+++ b/src/io/io_linux.zig
@@ -448,7 +448,7 @@ const IO = @This();
ring: IO_Uring,
-pending_timeouts: u32 = 0,
+pending_count: usize = 0,
/// Operations not yet submitted to the kernel and waiting on available space in the
/// submission queue.
@@ -460,9 +460,7 @@ completed: FIFO(Completion) = .{},
next_tick: FIFO(Completion) = .{},
pub fn hasNoWork(this: *IO) bool {
- return this.completed.peek() == null and this.unqueued.peek() == null and
- this.next_tick.peek() == null and
- this.pending_timeouts == 0;
+ return this.pending_count == 0;
}
pub fn init(entries_: u12, flags: u32) !IO {
@@ -916,7 +914,7 @@ pub const Completion = struct {
os.ECANCELED => error.Canceled,
os.ETIME => {}, // A success.
else => |errno| asError(errno),
- } else unreachable;
+ } else void{};
completion.callback(completion.context, completion, &result);
},
.write => {
@@ -1040,6 +1038,7 @@ pub fn accept(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1055,7 +1054,7 @@ pub fn accept(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const CloseError = error{
@@ -1082,6 +1081,7 @@ pub fn close(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1095,11 +1095,12 @@ pub fn close(
if (features.close_blocking) {
const rc = linux.close(fd);
completion.result = @intCast(i32, rc);
+ self.pending_count +|= 1;
self.next_tick.push(completion);
return;
}
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const ConnectError = error{
@@ -1138,6 +1139,7 @@ pub fn connect(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1160,7 +1162,7 @@ pub fn connect(
return;
}
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const FsyncError = error{
@@ -1189,6 +1191,7 @@ pub fn fsync(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1202,7 +1205,7 @@ pub fn fsync(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const ReadError = error{
@@ -1235,6 +1238,7 @@ pub fn read(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1250,7 +1254,7 @@ pub fn read(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const RecvError = error{
@@ -1285,6 +1289,7 @@ pub fn recv(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1299,7 +1304,7 @@ pub fn recv(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub fn readev(
@@ -1320,6 +1325,7 @@ pub fn readev(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1334,7 +1340,7 @@ pub fn readev(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const SendError = error{
@@ -1376,6 +1382,7 @@ pub fn send(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1390,7 +1397,7 @@ pub fn send(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const OpenError = error{
@@ -1468,6 +1475,7 @@ pub fn open(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1483,7 +1491,7 @@ pub fn open(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub fn writev(
@@ -1505,6 +1513,7 @@ pub fn writev(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1521,7 +1530,7 @@ pub fn writev(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const TimeoutError = error{Canceled} || Errno;
@@ -1538,13 +1547,12 @@ pub fn timeout(
completion: *Completion,
nanoseconds: u63,
) void {
- self.pending_timeouts +|= 1;
completion.* = .{
.io = self,
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_timeouts -|= 1;
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1558,7 +1566,7 @@ pub fn timeout(
},
},
};
- self.enqueue(completion);
+ self.enqueueNew(completion);
}
pub const WriteError = error{
@@ -1594,6 +1602,7 @@ pub fn write(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1609,6 +1618,11 @@ pub fn write(
},
},
};
+ self.enqueueNew(completion);
+}
+
+inline fn enqueueNew(self: *IO, completion: *Completion) void {
+ self.pending_count +|= 1;
self.enqueue(completion);
}