diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/analytics/analytics_thread.zig | 2 | ||||
m--------- | src/bun.js/WebKit | 0 | ||||
-rw-r--r-- | src/bun.js/api/bun.zig | 2 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 2 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 3 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 2 | ||||
-rw-r--r-- | src/bun_js.zig | 2 | ||||
-rw-r--r-- | src/cli/create_command.zig | 2 | ||||
-rw-r--r-- | src/http_client_async.zig | 37 | ||||
-rw-r--r-- | src/install/install.zig | 49 | ||||
-rw-r--r-- | src/io/io_linux.zig | 74 | ||||
-rw-r--r-- | src/network_thread.zig | 110 | ||||
-rw-r--r-- | src/output.zig | 2 | ||||
-rw-r--r-- | src/thread_pool.zig | 28 | ||||
-rw-r--r-- | src/watcher.zig | 2 |
15 files changed, 260 insertions, 57 deletions
diff --git a/src/analytics/analytics_thread.zig b/src/analytics/analytics_thread.zig index c01189c7c..82f435025 100644 --- a/src/analytics/analytics_thread.zig +++ b/src/analytics/analytics_thread.zig @@ -389,7 +389,7 @@ var out_buffer: MutableString = undefined; var event_list: EventList = undefined; fn readloop() anyerror!void { defer disabled = true; - Output.Source.configureNamedThread(thread, "Analytics"); + Output.Source.configureNamedThread("Analytics"); defer Output.flush(); event_list = EventList.init(); diff --git a/src/bun.js/WebKit b/src/bun.js/WebKit -Subproject e773c96d39675e9b190c26087d671c933e999c5 +Subproject 7e7774dabf1c2d94fe3604defb3c54a4c989c3b diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index d1260b14b..45e7cb1a1 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -2237,7 +2237,7 @@ pub const Timer = struct { pub fn run(this: *Timeout, _task: *TimeoutTask) void { this.io_task = _task; - NetworkThread.global.pool.io.?.timeout( + NetworkThread.global.io.timeout( *Timeout, this, onCallback, diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index baf96937c..132572422 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -177,7 +177,7 @@ pub fn IOTask(comptime Context: type) type { pub fn schedule(this: *This) void { NetworkThread.init() catch return; - NetworkThread.global.pool.schedule(NetworkThread.Batch.from(&this.task)); + NetworkThread.global.schedule(NetworkThread.Batch.from(&this.task)); } pub fn onFinish(this: *This) void { diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index a48ca0a2e..d37c36e7c 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -826,13 +826,14 @@ pub const Fetch = struct { timeout: usize, request_body_store: ?*Blob.Store, ) !*FetchTasklet.Pool.Node { + try NetworkThread.init(); var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store); node.data.global_this = global; node.data.http.callback = callback; var batch = NetworkThread.Batch{}; node.data.http.schedule(allocator, &batch); - NetworkThread.global.pool.schedule(batch); + NetworkThread.global.schedule(batch); VirtualMachine.vm.active_tasks +|= 1; return node; } diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index de9115666..82f27bdb5 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2491,7 +2491,7 @@ pub const FileBlobLoader = struct { std.debug.assert(this.started); NetworkThread.init() catch {}; this.concurrent.chunk_size = chunk_size; - NetworkThread.global.pool.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 }); + NetworkThread.global.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 }); } const default_fifo_chunk_size = 1024; diff --git a/src/bun_js.zig b/src/bun_js.zig index 896b25627..3683ea747 100644 --- a/src/bun_js.zig +++ b/src/bun_js.zig @@ -141,7 +141,7 @@ pub const Run = struct { i +%= 1; if (i > 0 and i % 100 == 0) { - std.time.sleep(std.time.ns_per_us); + this.vm.global.vm().runGC(true); } } diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig index d0760bd07..a0017dd34 100644 --- a/src/cli/create_command.zig +++ b/src/cli/create_command.zig @@ -2149,7 +2149,7 @@ const GitHandler = struct { PATH: string, verbose: bool, ) void { - Output.Source.configureNamedThread(thread, "git"); + Output.Source.configureNamedThread("git"); defer Output.flush(); const outcome = if (verbose) run(destination, PATH, true) catch false diff --git a/src/http_client_async.zig b/src/http_client_async.zig index f24f27f81..b3a06a721 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -38,10 +38,15 @@ pub var default_arena: Arena = undefined; const log = Output.scoped(.fetch, true); pub fn onThreadStart(_: ?*anyopaque) ?*anyopaque { + onThreadStartNew(0); + return null; +} + +pub fn onThreadStartNew(event_fd: os.fd_t) void { default_arena = Arena.init() catch unreachable; default_allocator = default_arena.allocator(); NetworkThread.address_list_cached = NetworkThread.AddressListCache.init(default_allocator); - AsyncIO.global = AsyncIO.init(1024, 0) catch |err| { + AsyncIO.global = AsyncIO.init(1024, 0, event_fd) catch |err| { log: { if (comptime Environment.isLinux) { if (err == error.SystemOutdated) { @@ -105,10 +110,17 @@ pub fn onThreadStart(_: ?*anyopaque) ?*anyopaque { }; AsyncIO.global_loaded = true; - NetworkThread.global.pool.io = &AsyncIO.global; - Global.setThreadName("HTTP"); + NetworkThread.global.io = &AsyncIO.global; + if (comptime !Environment.isLinux) { + NetworkThread.global.pool.io = &AsyncIO.global; + } + + Output.Source.configureNamedThread("HTTP"); AsyncBIO.initBoringSSL(); - return null; + + if (comptime Environment.isLinux) { + NetworkThread.global.processEvents(); + } } pub inline fn getAllocator() std.mem.Allocator { @@ -128,7 +140,7 @@ else pub const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC; -pub const extremely_verbose = Environment.isDebug; +pub const extremely_verbose = false; fn writeRequest( comptime Writer: type, @@ -471,7 +483,7 @@ pub const AsyncHTTP = struct { var batch = NetworkThread.Batch{}; this.schedule(bun.default_allocator, &batch); - NetworkThread.global.pool.schedule(batch); + NetworkThread.global.schedule(batch); while (true) { var data = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?)); var async_http: *AsyncHTTP = data.channel.readItem() catch unreachable; @@ -509,17 +521,17 @@ pub const AsyncHTTP = struct { pub fn do(sender: *HTTPSender, this: *AsyncHTTP) void { defer { - NetworkThread.global.pool.schedule(.{ .head = &sender.finisher, .tail = &sender.finisher, .len = 1 }); + NetworkThread.global.schedule(.{ .head = &sender.finisher, .tail = &sender.finisher, .len = 1 }); } outer: { this.err = null; this.state.store(.sending, .Monotonic); - var timer = std.time.Timer.start() catch @panic("Timer failure"); - defer this.elapsed = timer.read(); + const start = NetworkThread.global.timer.read(); + defer this.elapsed = NetworkThread.global.timer.read() -| start; - this.response = await this.client.sendAsync(this.request_body.list.items, this.response_buffer) catch |err| { + this.response = this.client.send(this.request_body.list.items, this.response_buffer) catch |err| { this.state.store(.fail, .Monotonic); this.err = err; @@ -527,7 +539,7 @@ pub const AsyncHTTP = struct { this.retries_count += 1; this.response_buffer.reset(); - NetworkThread.global.pool.schedule(ThreadPool.Batch.from(&this.task)); + NetworkThread.global.schedule(ThreadPool.Batch.from(&this.task)); return; } break :outer; @@ -654,7 +666,7 @@ pub fn connect( connector: ConnectType, ) !void { const port = this.url.getPortAuto(); - + if (this.verbose) Output.prettyErrorln("<d>[HTTP]<r> Connecting to {s}:{d}", .{ this.url.href, port }); try connector.connect(this.url.hostname, port); std.debug.assert(this.socket.socket.socket > 0); var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) }; @@ -741,6 +753,7 @@ pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableStrin } try writeRequest(@TypeOf(socket), socket, request, body); + _ = try socket.send(); this.stage = Stage.response; if (this.progress_node == null) { diff --git a/src/install/install.zig b/src/install/install.zig index 250d325c2..a1523b076 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -50,6 +50,7 @@ const z_allocator = @import("../memory_allocator.zig").z_allocator; const Syscall = @import("javascript_core").Node.Syscall; const RunCommand = @import("../cli/run_command.zig").RunCommand; threadlocal var initialized_store = false; +const Futex = @import("../futex.zig"); pub const Lockfile = @import("./lockfile.zig"); @@ -183,6 +184,7 @@ const NetworkTask = struct { }, pub fn notify(http: *AsyncHTTP) void { + defer PackageManager.instance.wake(); PackageManager.instance.network_channel.writeItem(@fieldParentPtr(NetworkTask, "http", http)) catch {}; } @@ -515,6 +517,8 @@ const Task = struct { var this = @fieldParentPtr(Task, "threadpool_task", task); + defer PackageManager.instance.wake(); + switch (this.tag) { .package_manifest => { var allocator = PackageManager.instance.allocator; @@ -1468,6 +1472,9 @@ pub const PackageManager = struct { global_dir: ?std.fs.Dir = null, global_link_dir_path: string = "", + sleepy: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), + sleep_delay_counter: u32 = 0, + const PreallocatedNetworkTasks = std.BoundedArray(NetworkTask, 1024); const NetworkTaskQueue = std.HashMapUnmanaged(u64, void, IdentityContext(u64), 80); const PackageIndex = std.AutoHashMapUnmanaged(u64, *Package); @@ -1480,6 +1487,14 @@ pub const PackageManager = struct { 80, ); + pub fn wake(this: *PackageManager) void { + Futex.wake(&this.sleepy, 1); + } + + pub fn sleep(this: *PackageManager) void { + Futex.wait(&this.sleepy, 1, std.time.ns_per_ms * 16) catch {}; + } + pub fn globalLinkDir(this: *PackageManager) !std.fs.Dir { return this.global_link_dir orelse brk: { var global_dir = try Options.openGlobalDir(this.options.explicit_global_directory); @@ -2409,7 +2424,7 @@ pub const PackageManager = struct { manager.pending_tasks += @truncate(u32, count); manager.total_tasks += @truncate(u32, count); manager.network_resolve_batch.push(manager.network_tarball_batch); - NetworkThread.global.pool.schedule(manager.network_resolve_batch); + NetworkThread.global.schedule(manager.network_resolve_batch); manager.network_tarball_batch = .{}; manager.network_resolve_batch = .{}; return count; @@ -2448,7 +2463,7 @@ pub const PackageManager = struct { this.pending_tasks += @truncate(u32, count); this.total_tasks += @truncate(u32, count); this.network_resolve_batch.push(this.network_tarball_batch); - NetworkThread.global.pool.schedule(this.network_resolve_batch); + NetworkThread.global.schedule(this.network_resolve_batch); this.network_tarball_batch = .{}; this.network_resolve_batch = .{}; } @@ -2489,7 +2504,10 @@ pub const PackageManager = struct { ) anyerror!void { var batch = ThreadPool.Batch{}; var has_updated_this_run = false; + var maybe_sleep = true; + while (manager.network_channel.tryReadItem() catch null) |task_| { + maybe_sleep = false; var task: *NetworkTask = task_; manager.pending_tasks -|= 1; @@ -2704,6 +2722,7 @@ pub const PackageManager = struct { } while (manager.resolve_tasks.tryReadItem() catch null) |task_| { + maybe_sleep = false; manager.pending_tasks -= 1; var task: Task = task_; @@ -2812,7 +2831,7 @@ pub const PackageManager = struct { manager.total_tasks += @truncate(u32, count); manager.thread_pool.schedule(batch); manager.network_resolve_batch.push(manager.network_tarball_batch); - NetworkThread.global.pool.schedule(manager.network_resolve_batch); + NetworkThread.global.schedule(manager.network_resolve_batch); manager.network_tarball_batch = .{}; manager.network_resolve_batch = .{}; @@ -2829,6 +2848,16 @@ pub const PackageManager = struct { manager.progress.maybeRefresh(); } } + + manager.sleep_delay_counter = if (maybe_sleep) + manager.sleep_delay_counter + 1 + else + 0; + + if (manager.sleep_delay_counter >= 5) { + manager.sleep_delay_counter = 0; + manager.sleep(); + } } pub const Options = struct { @@ -5191,9 +5220,6 @@ pub const PackageManager = struct { const cwd = std.fs.cwd(); - // sleep goes off, only need to set it once because it will have an impact on the next network request - NetworkThread.global.pool.sleep_on_idle_network_thread = false; - while (iterator.nextNodeModulesFolder()) |node_modules| { try cwd.makePath(std.mem.span(node_modules.relative_path)); // We deliberately do not close this folder. @@ -5360,7 +5386,6 @@ pub const PackageManager = struct { comptime log_level: Options.LogLevel, ) !void { // sleep off for maximum network throughput - NetworkThread.global.pool.sleep_on_idle_network_thread = false; var load_lockfile_result: Lockfile.LoadFromDiskResult = if (manager.options.do.load_lockfile) manager.lockfile.loadFromDisk( @@ -5617,8 +5642,13 @@ pub const PackageManager = struct { Output.flush(); } - while (manager.pending_tasks > 0) { - try manager.runTasks(void, void{}, null, log_level); + { + manager.sleepy.store(1, .Monotonic); + defer manager.sleepy.store(0, .Monotonic); + + while (manager.pending_tasks > 0) { + try manager.runTasks(void, void{}, null, log_level); + } } if (comptime log_level.showProgress()) { @@ -5645,7 +5675,6 @@ pub const PackageManager = struct { } // sleep on since we might not need it anymore - NetworkThread.global.pool.sleep_on_idle_network_thread = true; const needs_clean_lockfile = had_any_diffs or needs_new_lockfile or manager.package_json_updates.len > 0; var did_meta_hash_change = needs_clean_lockfile; diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index b4f21dee5..d7a164665 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -448,8 +448,6 @@ const IO = @This(); ring: IO_Uring, -pending_count: usize = 0, - /// Operations not yet submitted to the kernel and waiting on available space in the /// submission queue. unqueued: FIFO(Completion) = .{}, @@ -458,12 +456,49 @@ unqueued: FIFO(Completion) = .{}, completed: FIFO(Completion) = .{}, next_tick: FIFO(Completion) = .{}, +event_fd: linux.fd_t = 0, + +eventfd_buf: [16]u8 = undefined, +has_queued: usize = 0, +wakeup_completion: Completion = undefined, + +fn queueForWakeup(this: *@This(), comptime Type: type, ctx: Type, comptime cb: anytype) void { + @memset(&this.eventfd_buf, 0, this.eventfd_buf.len); + const Callback = struct { + pub fn callback(that: Type, completion: *Completion, _: ReadError!usize) void { + var io = @fieldParentPtr(IO, "wakeup_completion", completion); + io.has_queued -|= 1; + cb(that); + } + }; + this.read( + Type, + ctx, + Callback.callback, + &this.wakeup_completion, + this.event_fd, + &this.eventfd_buf, + null, + ); + this.has_queued +|= 1; +} + +pub fn wait(this: *@This(), ptr: anytype, comptime onReady: anytype) void { + // Subscribe to wakeups + if (this.has_queued == 0) { + this.queueForWakeup(@TypeOf(ptr), ptr, onReady); + } + + this.tick() catch {}; -pub fn hasNoWork(this: *IO) bool { - return this.pending_count == 0; + if (this.has_queued == 0) { + return; + } + const submitted = this.ring.flush_sq(); + _ = this.ring.enter(submitted, 1, linux.IORING_ENTER_GETEVENTS) catch 0; } -pub fn init(entries_: u12, flags: u32) !IO { +pub fn init(entries_: u12, flags: u32, event_fd: os.fd_t) !IO { var ring: IO_Uring = undefined; var entries = entries_; @@ -480,6 +515,7 @@ pub fn init(entries_: u12, flags: u32) !IO { } var limit = linux.rlimit{ .cur = 0, .max = 0 }; + if (linux.getrlimit(.MEMLOCK, &limit) == 0) { if (limit.cur < 16 * 1024) { return error.@"memlock is too low. Please increase it to at least 64k"; @@ -505,7 +541,7 @@ pub fn init(entries_: u12, flags: u32) !IO { break; } - return IO{ .ring = ring }; + return IO{ .ring = ring, .event_fd = event_fd }; } pub fn deinit(self: *IO) void { @@ -542,6 +578,8 @@ pub fn tick(self: *IO) !void { /// The `nanoseconds` argument is a u63 to allow coercion to the i64 used /// in the timespec struct. pub fn run_for_ns(self: *IO, nanoseconds: u63) !void { + assert(nanoseconds > 0); + while (self.next_tick.pop()) |completion| { completion.complete(); } @@ -600,7 +638,9 @@ fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { } fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void { - var cqes: [256]io_uring_cqe = undefined; + var cqes: [256]std.os.linux.io_uring_cqe = undefined; + var completion_byttes = std.mem.asBytes(&cqes); + @memset(completion_byttes, 0, completion_byttes.len); var wait_remaining = wait_nr; while (true) { // Guard against waiting indefinitely (if there are too few requests inflight), @@ -1038,7 +1078,6 @@ pub fn accept( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1081,7 +1120,6 @@ pub fn close( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1095,7 +1133,6 @@ pub fn close( if (features.close_blocking) { const rc = linux.close(fd); completion.result = @intCast(i32, rc); - self.pending_count +|= 1; self.next_tick.push(completion); return; } @@ -1139,7 +1176,6 @@ pub fn connect( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1191,7 +1227,6 @@ pub fn fsync( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1238,7 +1273,6 @@ pub fn read( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1290,7 +1324,6 @@ pub fn recv( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1326,7 +1359,6 @@ pub fn readev( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1383,7 +1415,6 @@ pub fn send( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1476,7 +1507,6 @@ pub fn open( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1514,7 +1544,6 @@ pub fn writev( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1553,7 +1582,6 @@ pub fn timeout( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1603,7 +1631,6 @@ pub fn write( .context = context, .callback = struct { fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { - comp.io.pending_count -|= 1; callback( @intToPtr(Context, @ptrToInt(ctx)), comp, @@ -1623,7 +1650,10 @@ pub fn write( } inline fn enqueueNew(self: *IO, completion: *Completion) void { - self.pending_count +|= 1; + self.enqueue(completion); +} + +pub fn wake(self: *IO, completion: *Completion) void { self.enqueue(completion); } @@ -1658,7 +1688,7 @@ const Syscall = struct { }; pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t { - return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC, protocol); + return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC | os.SOCK.NONBLOCK, protocol); } pub var global: IO = undefined; diff --git a/src/network_thread.zig b/src/network_thread.zig index 97ee1cadc..24110b6a3 100644 --- a/src/network_thread.zig +++ b/src/network_thread.zig @@ -1,6 +1,7 @@ const ThreadPool = @import("thread_pool"); pub const Batch = ThreadPool.Batch; pub const Task = ThreadPool.Task; +const Node = ThreadPool.Node; pub const Completion = AsyncIO.Completion; const std = @import("std"); pub const AsyncIO = @import("io"); @@ -8,13 +9,110 @@ const Output = @import("./global.zig").Output; const IdentityContext = @import("./identity_context.zig").IdentityContext; const HTTP = @import("./http_client_async.zig"); const NetworkThread = @This(); +const Environment = @import("./global.zig").Environment; +const Lock = @import("./lock.zig").Lock; +const FIFO = @import("./io/fifo.zig").FIFO; /// Single-thread in this pool pool: ThreadPool, +io: *AsyncIO = undefined, +thread: std.Thread = undefined, +event_fd: std.os.fd_t = 0, +queued_tasks_mutex: Lock = Lock.init(), +queued_tasks: Batch = .{}, +head: ?*Node = null, +tail: ?*Node = null, +timer: std.time.Timer = undefined, pub var global: NetworkThread = undefined; pub var global_loaded: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0); +const log = Output.scoped(.NetworkThread, true); + +fn queueEvents(this: *@This()) void { + this.queued_tasks_mutex.lock(); + defer this.queued_tasks_mutex.unlock(); + if (this.queued_tasks.len == 0) + return; + log("Received {d} tasks\n", .{this.queued_tasks.len}); + if (this.tail) |tail| { + std.debug.assert(tail.next == null); + tail.next = &this.queued_tasks.head.?.node; + this.tail = &this.queued_tasks.tail.?.node; + } else { + this.head = &this.queued_tasks.head.?.node; + this.tail = &this.queued_tasks.tail.?.node; + } + this.queued_tasks = .{}; +} + +pub fn processEvents(this: *@This()) void { + processEvents_(this) catch {}; + unreachable; +} +/// Should only be called on the HTTP thread! +fn processEvents_(this: *@This()) !void { + { + var bytes: [8]u8 = undefined; + _ = std.os.read(this.event_fd, &bytes) catch 0; + } + + while (true) { + this.queueEvents(); + + var count: usize = 0; + + while (this.head) |node| { + if (node == this.tail) { + this.tail = null; + } + this.head = node.next; + node.next = null; + var task = @fieldParentPtr(Task, "node", node); + var callback = task.callback; + callback(task); + if (comptime Environment.allow_assert) { + count += 1; + } + } + + if (comptime Environment.allow_assert) { + if (count > 0) + log("Processed {d} tasks\n", .{count}); + } + + var start: i128 = 0; + if (comptime Environment.isDebug) { + start = std.time.nanoTimestamp(); + } + Output.flush(); + this.io.wait(this, queueEvents); + if (comptime Environment.isDebug) { + var end = std.time.nanoTimestamp(); + log("Waited {any}\n", .{std.fmt.fmtDurationSigned(@truncate(i64, end - start))}); + Output.flush(); + } + } +} + +pub fn schedule(this: *@This(), batch: Batch) void { + if (comptime Environment.isLinux) { + if (batch.len == 0) + return; + + { + this.queued_tasks_mutex.lock(); + defer this.queued_tasks_mutex.unlock(); + this.queued_tasks.push(batch); + } + + const one = @bitCast([8]u8, @as(usize, batch.len)); + _ = std.os.write(this.event_fd, &one) catch @panic("Failed to write to eventfd"); + } else { + this.pool.schedule(batch); + } +} + const CachedAddressList = struct { address_list: *std.net.AddressList, expire_after: u64, @@ -67,7 +165,8 @@ pub fn warmup() !void { if (has_warmed or global_loaded.load(.Monotonic) > 0) return; has_warmed = true; try init(); - global.pool.forceSpawn(); + if (comptime !Environment.isLinux) + global.pool.forceSpawn(); } pub fn init() !void { @@ -76,6 +175,15 @@ pub fn init() !void { global = NetworkThread{ .pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }), + .timer = try std.time.Timer.start(), }; global.pool.on_thread_spawn = HTTP.onThreadStart; + if (comptime Environment.isLinux) { + const event_fd = try std.os.eventfd(0, std.os.linux.EFD.CLOEXEC | 0); + global.event_fd = event_fd; + global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{ + @intCast(std.os.fd_t, event_fd), + }); + global.thread.detach(); + } } diff --git a/src/output.zig b/src/output.zig index 4712c97f1..6ae8ae4ff 100644 --- a/src/output.zig +++ b/src/output.zig @@ -88,7 +88,7 @@ pub const Source = struct { source = Source.init(stdout_stream, stderr_stream); } - pub fn configureNamedThread(_: std.Thread, name: StringTypes.stringZ) void { + pub fn configureNamedThread(name: StringTypes.stringZ) void { Global.setThreadName(name); configureThread(); } diff --git a/src/thread_pool.zig b/src/thread_pool.zig index 03fe6f211..8839d2090 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -84,7 +84,26 @@ pub const Batch = struct { head: ?*Task = null, tail: ?*Task = null, - /// Create a batch from a single task. + pub fn pop(this: *Batch) ?*Task { + const len = @atomicLoad(usize, &this.len, .Monotonic); + if (len == 0) { + return null; + } + var task = this.head.?; + if (task.node.next) |node| { + this.head = @fieldParentPtr(Task, "node", node); + } else { + this.head = null; + } + + this.len -= 1; + if (len == 0) { + this.tail = null; + } + return task; + } + + /// Create a batch from a single task. pub fn from(task: *Task) Batch { return Batch{ .len = 1, @@ -276,6 +295,9 @@ fn _wait(self: *ThreadPool, _is_waking: bool, comptime sleep_on_idle: bool) erro } } else { if (self.io) |io| { + if (comptime Environment.isLinux) + unreachable; + const HTTP = @import("http"); io.tick() catch {}; @@ -483,7 +505,7 @@ pub const Thread = struct { }; /// An event which stores 1 semaphore token and is multi-threaded safe. -/// The event can be shutdown(), waking up all wait()ing threads and +/// The event can be shutdown(), waking up all wait()ing threads and /// making subsequent wait()'s return immediately. const Event = struct { state: Atomic(u32) = Atomic(u32).init(EMPTY), @@ -621,7 +643,7 @@ const Event = struct { }; /// Linked list intrusive memory node and lock-free data structures to operate with it -const Node = struct { +pub const Node = struct { next: ?*Node = null, /// A linked list of Nodes diff --git a/src/watcher.zig b/src/watcher.zig index c1b371217..138edca44 100644 --- a/src/watcher.zig +++ b/src/watcher.zig @@ -348,7 +348,7 @@ pub fn NewWatcher(comptime ContextType: type) type { // This must only be called from the watcher thread pub fn watchLoop(this: *Watcher) !void { this.watchloop_handle = std.Thread.getCurrentId(); - Output.Source.configureNamedThread(this.thread, "File Watcher"); + Output.Source.configureNamedThread("File Watcher"); defer Output.flush(); if (FeatureFlags.verbose_watcher) Output.prettyln("Watcher started", .{}); |