diff options
Diffstat (limited to '')
-rw-r--r-- | src/thread_pool.zig | 58 |
1 files changed, 49 insertions, 9 deletions
diff --git a/src/thread_pool.zig b/src/thread_pool.zig index f287dd866..9b6951cbb 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -63,6 +63,11 @@ pub fn init(config: Config) ThreadPool { }; } +pub fn wakeForIdleEvents(this: *ThreadPool) void { + // Wake all the threads to check for idle events. + this.idle_event.wake(Event.NOTIFIED, std.math.maxInt(u32)); +} + /// Wait for a thread to call shutdown() on the thread pool and kill the worker threads. pub fn deinit(self: *ThreadPool) void { self.join(); @@ -483,8 +488,6 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void { } } -// sleep_on_idle seems to impact `bun install` performance negatively -// so we can just not sleep for that noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { var is_idle = false; var is_waking = _is_waking; @@ -530,6 +533,10 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { continue; }); } else { + if (Thread.current) |current| { + current.drainIdleEvents(); + } + self.idle_event.wait(); sync = @bitCast(Sync, self.sync.load(.Monotonic)); } @@ -574,6 +581,17 @@ fn register(noalias self: *ThreadPool, noalias thread: *Thread) void { } } +pub fn setThreadContext(noalias pool: *ThreadPool, ctx: ?*anyopaque) void { + pool.threadpool_context = ctx; + + var thread = pool.threads.load(.Monotonic) orelse return; + thread.ctx = pool.threadpool_context; + while (thread.next) |next| { + next.ctx = pool.threadpool_context; + thread = next; + } +} + fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void { // Un-spawn one thread, either due to a failed OS thread spawning or the thread is exiting. const one_spawned = @bitCast(u32, Sync{ .spawned = 1 }); @@ -621,25 +639,35 @@ pub const Thread = struct { target: ?*Thread = null, join_event: Event = .{}, run_queue: Node.Queue = .{}, + idle_queue: Node.Queue = .{}, run_buffer: Node.Buffer = .{}, ctx: ?*anyopaque = null, pub threadlocal var current: ?*Thread = null; + pub fn pushIdleTask(self: *Thread, task: *Task) void { + const list = Node.List{ + .head = &task.node, + .tail = &task.node, + }; + self.idle_queue.push(list); + } + /// Thread entry point which runs a worker for the ThreadPool fn run(thread_pool: *ThreadPool) void { - Output.Source.configureThread(); + Output.Source.configureNamedThread("Bun Pool"); - var self = Thread{}; - current = &self; + var self_ = Thread{}; + var self = &self_; + current = self; if (thread_pool.on_thread_spawn) |spawn| { current.?.ctx = spawn(thread_pool.threadpool_context); } - thread_pool.register(&self); + thread_pool.register(self); - defer thread_pool.unregister(&self); + defer thread_pool.unregister(self); var is_waking = false; while (true) { @@ -653,13 +681,25 @@ pub const Thread = struct { const task = @fieldParentPtr(Task, "node", result.node); (task.callback)(task); } + Output.flush(); + + self.drainIdleEvents(); + } + } + + pub fn drainIdleEvents(noalias self: *Thread) void { + var consumer = self.idle_queue.tryAcquireConsumer() catch return; + defer self.idle_queue.releaseConsumer(consumer); + while (self.idle_queue.pop(&consumer)) |node| { + const task = @fieldParentPtr(Task, "node", node); + (task.callback)(task); } } /// Try to dequeue a Node/Task from the ThreadPool. /// Spurious reports of dequeue() returning empty are allowed. - fn pop(noalias self: *Thread, noalias thread_pool: *ThreadPool) ?Node.Buffer.Stole { + pub fn pop(noalias self: *Thread, noalias thread_pool: *ThreadPool) ?Node.Buffer.Stole { // Check our local buffer first if (self.run_buffer.pop()) |node| { return Node.Buffer.Stole{ @@ -714,7 +754,7 @@ const Event = struct { const EMPTY = 0; const WAITING = 1; - const NOTIFIED = 2; + pub const NOTIFIED = 2; const SHUTDOWN = 3; /// Wait for and consume a notification |