aboutsummaryrefslogtreecommitdiff
path: root/src/network_thread.zig
diff options
context:
space:
mode:
authorGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-08-13 06:07:18 -0700
committerGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-08-13 06:07:18 -0700
commit0815c83974729d47ae220a7206e7df2de45981e6 (patch)
treef128e71a8bd6560512fd4800d61357031fea513f /src/network_thread.zig
parent65ca0503a7c1f775638ecbb363c6d423a1cd0d89 (diff)
downloadbun-0815c83974729d47ae220a7206e7df2de45981e6.tar.gz
bun-0815c83974729d47ae220a7206e7df2de45981e6.tar.zst
bun-0815c83974729d47ae220a7206e7df2de45981e6.zip
Improve event loop reliability on Linux
Diffstat (limited to 'src/network_thread.zig')
-rw-r--r--src/network_thread.zig110
1 files changed, 109 insertions, 1 deletions
diff --git a/src/network_thread.zig b/src/network_thread.zig
index 97ee1cadc..24110b6a3 100644
--- a/src/network_thread.zig
+++ b/src/network_thread.zig
@@ -1,6 +1,7 @@
const ThreadPool = @import("thread_pool");
pub const Batch = ThreadPool.Batch;
pub const Task = ThreadPool.Task;
+const Node = ThreadPool.Node;
pub const Completion = AsyncIO.Completion;
const std = @import("std");
pub const AsyncIO = @import("io");
@@ -8,13 +9,110 @@ const Output = @import("./global.zig").Output;
const IdentityContext = @import("./identity_context.zig").IdentityContext;
const HTTP = @import("./http_client_async.zig");
const NetworkThread = @This();
+const Environment = @import("./global.zig").Environment;
+const Lock = @import("./lock.zig").Lock;
+const FIFO = @import("./io/fifo.zig").FIFO;
/// Single-thread in this pool
pool: ThreadPool,
+io: *AsyncIO = undefined,
+thread: std.Thread = undefined,
+event_fd: std.os.fd_t = 0,
+queued_tasks_mutex: Lock = Lock.init(),
+queued_tasks: Batch = .{},
+head: ?*Node = null,
+tail: ?*Node = null,
+timer: std.time.Timer = undefined,
pub var global: NetworkThread = undefined;
pub var global_loaded: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0);
+const log = Output.scoped(.NetworkThread, true);
+
+fn queueEvents(this: *@This()) void {
+ this.queued_tasks_mutex.lock();
+ defer this.queued_tasks_mutex.unlock();
+ if (this.queued_tasks.len == 0)
+ return;
+ log("Received {d} tasks\n", .{this.queued_tasks.len});
+ if (this.tail) |tail| {
+ std.debug.assert(tail.next == null);
+ tail.next = &this.queued_tasks.head.?.node;
+ this.tail = &this.queued_tasks.tail.?.node;
+ } else {
+ this.head = &this.queued_tasks.head.?.node;
+ this.tail = &this.queued_tasks.tail.?.node;
+ }
+ this.queued_tasks = .{};
+}
+
+pub fn processEvents(this: *@This()) void {
+ processEvents_(this) catch {};
+ unreachable;
+}
+/// Should only be called on the HTTP thread!
+fn processEvents_(this: *@This()) !void {
+ {
+ var bytes: [8]u8 = undefined;
+ _ = std.os.read(this.event_fd, &bytes) catch 0;
+ }
+
+ while (true) {
+ this.queueEvents();
+
+ var count: usize = 0;
+
+ while (this.head) |node| {
+ if (node == this.tail) {
+ this.tail = null;
+ }
+ this.head = node.next;
+ node.next = null;
+ var task = @fieldParentPtr(Task, "node", node);
+ var callback = task.callback;
+ callback(task);
+ if (comptime Environment.allow_assert) {
+ count += 1;
+ }
+ }
+
+ if (comptime Environment.allow_assert) {
+ if (count > 0)
+ log("Processed {d} tasks\n", .{count});
+ }
+
+ var start: i128 = 0;
+ if (comptime Environment.isDebug) {
+ start = std.time.nanoTimestamp();
+ }
+ Output.flush();
+ this.io.wait(this, queueEvents);
+ if (comptime Environment.isDebug) {
+ var end = std.time.nanoTimestamp();
+ log("Waited {any}\n", .{std.fmt.fmtDurationSigned(@truncate(i64, end - start))});
+ Output.flush();
+ }
+ }
+}
+
+pub fn schedule(this: *@This(), batch: Batch) void {
+ if (comptime Environment.isLinux) {
+ if (batch.len == 0)
+ return;
+
+ {
+ this.queued_tasks_mutex.lock();
+ defer this.queued_tasks_mutex.unlock();
+ this.queued_tasks.push(batch);
+ }
+
+ const one = @bitCast([8]u8, @as(usize, batch.len));
+ _ = std.os.write(this.event_fd, &one) catch @panic("Failed to write to eventfd");
+ } else {
+ this.pool.schedule(batch);
+ }
+}
+
const CachedAddressList = struct {
address_list: *std.net.AddressList,
expire_after: u64,
@@ -67,7 +165,8 @@ pub fn warmup() !void {
if (has_warmed or global_loaded.load(.Monotonic) > 0) return;
has_warmed = true;
try init();
- global.pool.forceSpawn();
+ if (comptime !Environment.isLinux)
+ global.pool.forceSpawn();
}
pub fn init() !void {
@@ -76,6 +175,15 @@ pub fn init() !void {
global = NetworkThread{
.pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }),
+ .timer = try std.time.Timer.start(),
};
global.pool.on_thread_spawn = HTTP.onThreadStart;
+ if (comptime Environment.isLinux) {
+ const event_fd = try std.os.eventfd(0, std.os.linux.EFD.CLOEXEC | 0);
+ global.event_fd = event_fd;
+ global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{
+ @intCast(std.os.fd_t, event_fd),
+ });
+ global.thread.detach();
+ }
}