aboutsummaryrefslogtreecommitdiff
path: root/src/io/io_linux.zig
diff options
context:
space:
mode:
authorGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-08-13 06:07:18 -0700
committerGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-08-13 06:07:18 -0700
commit0815c83974729d47ae220a7206e7df2de45981e6 (patch)
treef128e71a8bd6560512fd4800d61357031fea513f /src/io/io_linux.zig
parent65ca0503a7c1f775638ecbb363c6d423a1cd0d89 (diff)
downloadbun-0815c83974729d47ae220a7206e7df2de45981e6.tar.gz
bun-0815c83974729d47ae220a7206e7df2de45981e6.tar.zst
bun-0815c83974729d47ae220a7206e7df2de45981e6.zip
Improve event loop reliability on Linux
Diffstat (limited to 'src/io/io_linux.zig')
-rw-r--r--src/io/io_linux.zig74
1 files changed, 52 insertions, 22 deletions
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig
index b4f21dee5..d7a164665 100644
--- a/src/io/io_linux.zig
+++ b/src/io/io_linux.zig
@@ -448,8 +448,6 @@ const IO = @This();
ring: IO_Uring,
-pending_count: usize = 0,
-
/// Operations not yet submitted to the kernel and waiting on available space in the
/// submission queue.
unqueued: FIFO(Completion) = .{},
@@ -458,12 +456,49 @@ unqueued: FIFO(Completion) = .{},
completed: FIFO(Completion) = .{},
next_tick: FIFO(Completion) = .{},
+event_fd: linux.fd_t = 0,
+
+eventfd_buf: [16]u8 = undefined,
+has_queued: usize = 0,
+wakeup_completion: Completion = undefined,
+
+fn queueForWakeup(this: *@This(), comptime Type: type, ctx: Type, comptime cb: anytype) void {
+ @memset(&this.eventfd_buf, 0, this.eventfd_buf.len);
+ const Callback = struct {
+ pub fn callback(that: Type, completion: *Completion, _: ReadError!usize) void {
+ var io = @fieldParentPtr(IO, "wakeup_completion", completion);
+ io.has_queued -|= 1;
+ cb(that);
+ }
+ };
+ this.read(
+ Type,
+ ctx,
+ Callback.callback,
+ &this.wakeup_completion,
+ this.event_fd,
+ &this.eventfd_buf,
+ null,
+ );
+ this.has_queued +|= 1;
+}
+
+pub fn wait(this: *@This(), ptr: anytype, comptime onReady: anytype) void {
+ // Subscribe to wakeups
+ if (this.has_queued == 0) {
+ this.queueForWakeup(@TypeOf(ptr), ptr, onReady);
+ }
+
+ this.tick() catch {};
-pub fn hasNoWork(this: *IO) bool {
- return this.pending_count == 0;
+ if (this.has_queued == 0) {
+ return;
+ }
+ const submitted = this.ring.flush_sq();
+ _ = this.ring.enter(submitted, 1, linux.IORING_ENTER_GETEVENTS) catch 0;
}
-pub fn init(entries_: u12, flags: u32) !IO {
+pub fn init(entries_: u12, flags: u32, event_fd: os.fd_t) !IO {
var ring: IO_Uring = undefined;
var entries = entries_;
@@ -480,6 +515,7 @@ pub fn init(entries_: u12, flags: u32) !IO {
}
var limit = linux.rlimit{ .cur = 0, .max = 0 };
+
if (linux.getrlimit(.MEMLOCK, &limit) == 0) {
if (limit.cur < 16 * 1024) {
return error.@"memlock is too low. Please increase it to at least 64k";
@@ -505,7 +541,7 @@ pub fn init(entries_: u12, flags: u32) !IO {
break;
}
- return IO{ .ring = ring };
+ return IO{ .ring = ring, .event_fd = event_fd };
}
pub fn deinit(self: *IO) void {
@@ -542,6 +578,8 @@ pub fn tick(self: *IO) !void {
/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
/// in the timespec struct.
pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
+ assert(nanoseconds > 0);
+
while (self.next_tick.pop()) |completion| {
completion.complete();
}
@@ -600,7 +638,9 @@ fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
}
fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
- var cqes: [256]io_uring_cqe = undefined;
+ var cqes: [256]std.os.linux.io_uring_cqe = undefined;
+ var completion_byttes = std.mem.asBytes(&cqes);
+ @memset(completion_byttes, 0, completion_byttes.len);
var wait_remaining = wait_nr;
while (true) {
// Guard against waiting indefinitely (if there are too few requests inflight),
@@ -1038,7 +1078,6 @@ pub fn accept(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1081,7 +1120,6 @@ pub fn close(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1095,7 +1133,6 @@ pub fn close(
if (features.close_blocking) {
const rc = linux.close(fd);
completion.result = @intCast(i32, rc);
- self.pending_count +|= 1;
self.next_tick.push(completion);
return;
}
@@ -1139,7 +1176,6 @@ pub fn connect(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1191,7 +1227,6 @@ pub fn fsync(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1238,7 +1273,6 @@ pub fn read(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1290,7 +1324,6 @@ pub fn recv(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1326,7 +1359,6 @@ pub fn readev(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1383,7 +1415,6 @@ pub fn send(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1476,7 +1507,6 @@ pub fn open(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1514,7 +1544,6 @@ pub fn writev(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1553,7 +1582,6 @@ pub fn timeout(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1603,7 +1631,6 @@ pub fn write(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1623,7 +1650,10 @@ pub fn write(
}
inline fn enqueueNew(self: *IO, completion: *Completion) void {
- self.pending_count +|= 1;
+ self.enqueue(completion);
+}
+
+pub fn wake(self: *IO, completion: *Completion) void {
self.enqueue(completion);
}
@@ -1658,7 +1688,7 @@ const Syscall = struct {
};
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
- return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC, protocol);
+ return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC | os.SOCK.NONBLOCK, protocol);
}
pub var global: IO = undefined;