diff options
author | 2022-08-13 06:07:18 -0700 | |
---|---|---|
committer | 2022-08-13 06:07:18 -0700 | |
commit | 0815c83974729d47ae220a7206e7df2de45981e6 (patch) | |
tree | f128e71a8bd6560512fd4800d61357031fea513f /src/network_thread.zig | |
parent | 65ca0503a7c1f775638ecbb363c6d423a1cd0d89 (diff) | |
download | bun-0815c83974729d47ae220a7206e7df2de45981e6.tar.gz bun-0815c83974729d47ae220a7206e7df2de45981e6.tar.zst bun-0815c83974729d47ae220a7206e7df2de45981e6.zip |
Improve event loop reliability on Linux
Diffstat (limited to 'src/network_thread.zig')
-rw-r--r-- | src/network_thread.zig | 110 |
1 files changed, 109 insertions, 1 deletions
diff --git a/src/network_thread.zig b/src/network_thread.zig index 97ee1cadc..24110b6a3 100644 --- a/src/network_thread.zig +++ b/src/network_thread.zig @@ -1,6 +1,7 @@ const ThreadPool = @import("thread_pool"); pub const Batch = ThreadPool.Batch; pub const Task = ThreadPool.Task; +const Node = ThreadPool.Node; pub const Completion = AsyncIO.Completion; const std = @import("std"); pub const AsyncIO = @import("io"); @@ -8,13 +9,110 @@ const Output = @import("./global.zig").Output; const IdentityContext = @import("./identity_context.zig").IdentityContext; const HTTP = @import("./http_client_async.zig"); const NetworkThread = @This(); +const Environment = @import("./global.zig").Environment; +const Lock = @import("./lock.zig").Lock; +const FIFO = @import("./io/fifo.zig").FIFO; /// Single-thread in this pool pool: ThreadPool, +io: *AsyncIO = undefined, +thread: std.Thread = undefined, +event_fd: std.os.fd_t = 0, +queued_tasks_mutex: Lock = Lock.init(), +queued_tasks: Batch = .{}, +head: ?*Node = null, +tail: ?*Node = null, +timer: std.time.Timer = undefined, pub var global: NetworkThread = undefined; pub var global_loaded: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0); +const log = Output.scoped(.NetworkThread, true); + +fn queueEvents(this: *@This()) void { + this.queued_tasks_mutex.lock(); + defer this.queued_tasks_mutex.unlock(); + if (this.queued_tasks.len == 0) + return; + log("Received {d} tasks\n", .{this.queued_tasks.len}); + if (this.tail) |tail| { + std.debug.assert(tail.next == null); + tail.next = &this.queued_tasks.head.?.node; + this.tail = &this.queued_tasks.tail.?.node; + } else { + this.head = &this.queued_tasks.head.?.node; + this.tail = &this.queued_tasks.tail.?.node; + } + this.queued_tasks = .{}; +} + +pub fn processEvents(this: *@This()) void { + processEvents_(this) catch {}; + unreachable; +} +/// Should only be called on the HTTP thread! +fn processEvents_(this: *@This()) !void { + { + var bytes: [8]u8 = undefined; + _ = std.os.read(this.event_fd, &bytes) catch 0; + } + + while (true) { + this.queueEvents(); + + var count: usize = 0; + + while (this.head) |node| { + if (node == this.tail) { + this.tail = null; + } + this.head = node.next; + node.next = null; + var task = @fieldParentPtr(Task, "node", node); + var callback = task.callback; + callback(task); + if (comptime Environment.allow_assert) { + count += 1; + } + } + + if (comptime Environment.allow_assert) { + if (count > 0) + log("Processed {d} tasks\n", .{count}); + } + + var start: i128 = 0; + if (comptime Environment.isDebug) { + start = std.time.nanoTimestamp(); + } + Output.flush(); + this.io.wait(this, queueEvents); + if (comptime Environment.isDebug) { + var end = std.time.nanoTimestamp(); + log("Waited {any}\n", .{std.fmt.fmtDurationSigned(@truncate(i64, end - start))}); + Output.flush(); + } + } +} + +pub fn schedule(this: *@This(), batch: Batch) void { + if (comptime Environment.isLinux) { + if (batch.len == 0) + return; + + { + this.queued_tasks_mutex.lock(); + defer this.queued_tasks_mutex.unlock(); + this.queued_tasks.push(batch); + } + + const one = @bitCast([8]u8, @as(usize, batch.len)); + _ = std.os.write(this.event_fd, &one) catch @panic("Failed to write to eventfd"); + } else { + this.pool.schedule(batch); + } +} + const CachedAddressList = struct { address_list: *std.net.AddressList, expire_after: u64, @@ -67,7 +165,8 @@ pub fn warmup() !void { if (has_warmed or global_loaded.load(.Monotonic) > 0) return; has_warmed = true; try init(); - global.pool.forceSpawn(); + if (comptime !Environment.isLinux) + global.pool.forceSpawn(); } pub fn init() !void { @@ -76,6 +175,15 @@ pub fn init() !void { global = NetworkThread{ .pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }), + .timer = try std.time.Timer.start(), }; global.pool.on_thread_spawn = HTTP.onThreadStart; + if (comptime Environment.isLinux) { + const event_fd = try std.os.eventfd(0, std.os.linux.EFD.CLOEXEC | 0); + global.event_fd = event_fd; + global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{ + @intCast(std.os.fd_t, event_fd), + }); + global.thread.detach(); + } } |