aboutsummaryrefslogtreecommitdiff
path: root/src/thread_pool.zig
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/thread_pool.zig58
1 files changed, 49 insertions, 9 deletions
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
index f287dd866..9b6951cbb 100644
--- a/src/thread_pool.zig
+++ b/src/thread_pool.zig
@@ -63,6 +63,11 @@ pub fn init(config: Config) ThreadPool {
};
}
+pub fn wakeForIdleEvents(this: *ThreadPool) void {
+ // Wake all the threads to check for idle events.
+ this.idle_event.wake(Event.NOTIFIED, std.math.maxInt(u32));
+}
+
/// Wait for a thread to call shutdown() on the thread pool and kill the worker threads.
pub fn deinit(self: *ThreadPool) void {
self.join();
@@ -483,8 +488,6 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
}
}
-// sleep_on_idle seems to impact `bun install` performance negatively
-// so we can just not sleep for that
noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
var is_idle = false;
var is_waking = _is_waking;
@@ -530,6 +533,10 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
continue;
});
} else {
+ if (Thread.current) |current| {
+ current.drainIdleEvents();
+ }
+
self.idle_event.wait();
sync = @bitCast(Sync, self.sync.load(.Monotonic));
}
@@ -574,6 +581,17 @@ fn register(noalias self: *ThreadPool, noalias thread: *Thread) void {
}
}
+pub fn setThreadContext(noalias pool: *ThreadPool, ctx: ?*anyopaque) void {
+ pool.threadpool_context = ctx;
+
+ var thread = pool.threads.load(.Monotonic) orelse return;
+ thread.ctx = pool.threadpool_context;
+ while (thread.next) |next| {
+ next.ctx = pool.threadpool_context;
+ thread = next;
+ }
+}
+
fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
// Un-spawn one thread, either due to a failed OS thread spawning or the thread is exiting.
const one_spawned = @bitCast(u32, Sync{ .spawned = 1 });
@@ -621,25 +639,35 @@ pub const Thread = struct {
target: ?*Thread = null,
join_event: Event = .{},
run_queue: Node.Queue = .{},
+ idle_queue: Node.Queue = .{},
run_buffer: Node.Buffer = .{},
ctx: ?*anyopaque = null,
pub threadlocal var current: ?*Thread = null;
+ pub fn pushIdleTask(self: *Thread, task: *Task) void {
+ const list = Node.List{
+ .head = &task.node,
+ .tail = &task.node,
+ };
+ self.idle_queue.push(list);
+ }
+
/// Thread entry point which runs a worker for the ThreadPool
fn run(thread_pool: *ThreadPool) void {
- Output.Source.configureThread();
+ Output.Source.configureNamedThread("Bun Pool");
- var self = Thread{};
- current = &self;
+ var self_ = Thread{};
+ var self = &self_;
+ current = self;
if (thread_pool.on_thread_spawn) |spawn| {
current.?.ctx = spawn(thread_pool.threadpool_context);
}
- thread_pool.register(&self);
+ thread_pool.register(self);
- defer thread_pool.unregister(&self);
+ defer thread_pool.unregister(self);
var is_waking = false;
while (true) {
@@ -653,13 +681,25 @@ pub const Thread = struct {
const task = @fieldParentPtr(Task, "node", result.node);
(task.callback)(task);
}
+
Output.flush();
+
+ self.drainIdleEvents();
+ }
+ }
+
+ pub fn drainIdleEvents(noalias self: *Thread) void {
+ var consumer = self.idle_queue.tryAcquireConsumer() catch return;
+ defer self.idle_queue.releaseConsumer(consumer);
+ while (self.idle_queue.pop(&consumer)) |node| {
+ const task = @fieldParentPtr(Task, "node", node);
+ (task.callback)(task);
}
}
/// Try to dequeue a Node/Task from the ThreadPool.
/// Spurious reports of dequeue() returning empty are allowed.
- fn pop(noalias self: *Thread, noalias thread_pool: *ThreadPool) ?Node.Buffer.Stole {
+ pub fn pop(noalias self: *Thread, noalias thread_pool: *ThreadPool) ?Node.Buffer.Stole {
// Check our local buffer first
if (self.run_buffer.pop()) |node| {
return Node.Buffer.Stole{
@@ -714,7 +754,7 @@ const Event = struct {
const EMPTY = 0;
const WAITING = 1;
- const NOTIFIED = 2;
+ pub const NOTIFIED = 2;
const SHUTDOWN = 3;
/// Wait for and consume a notification