diff options
Diffstat (limited to 'src/thread_pool.zig')
-rw-r--r-- | src/thread_pool.zig | 74 |
1 files changed, 37 insertions, 37 deletions
diff --git a/src/thread_pool.zig b/src/thread_pool.zig index 3359311d6..402f6b1d6 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -17,7 +17,7 @@ on_thread_spawn: ?OnSpawnCallback = null, threadpool_context: ?*anyopaque = null, stack_size: u32, max_threads: u32, -sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})), +sync: Atomic(u32) = Atomic(u32).init(@as(u32, @bitCast(Sync{}))), idle_event: Event = .{}, join_event: Event = .{}, run_queue: Node.Queue = .{}, @@ -352,7 +352,7 @@ pub fn Do( } } - wait_group.counter += @intCast(u32, values.len); + wait_group.counter += @as(u32, @intCast(values.len)); this.schedule(batch); wait_group.wait(); } @@ -431,7 +431,7 @@ inline fn notify(self: *ThreadPool, is_waking: bool) void { // Fast path to check the Sync state to avoid calling into notifySlow(). // If we're waking, then we need to update the state regardless if (!is_waking) { - const sync = @bitCast(Sync, self.sync.load(.Monotonic)); + const sync = @as(Sync, @bitCast(self.sync.load(.Monotonic))); if (sync.notified) { return; } @@ -443,20 +443,20 @@ inline fn notify(self: *ThreadPool, is_waking: bool) void { /// Warm the thread pool up to the given number of threads. /// https://www.youtube.com/watch?v=ys3qcbO5KWw pub fn warm(self: *ThreadPool, count: u14) void { - var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic))); if (sync.spawned >= count) return; - const to_spawn = @min(count - sync.spawned, @truncate(u14, self.max_threads)); + const to_spawn = @min(count - sync.spawned, @as(u14, @truncate(self.max_threads))); while (sync.spawned < to_spawn) { var new_sync = sync; new_sync.spawned += 1; - sync = @bitCast(Sync, self.sync.tryCompareAndSwap( - @bitCast(u32, sync), - @bitCast(u32, new_sync), + sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap( + @as(u32, @bitCast(sync)), + @as(u32, @bitCast(new_sync)), .Release, .Monotonic, - ) orelse break); + ) orelse break)); const spawn_config = if (Environment.isMac) // stack size must be a multiple of page_size // macOS will fail to spawn a thread if the stack size is not a multiple of page_size @@ -470,7 +470,7 @@ pub fn warm(self: *ThreadPool, count: u14) void { } noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void { - var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic))); while (sync.state != .shutdown) { const can_wake = is_waking or (sync.state == .pending); if (is_waking) { @@ -492,9 +492,9 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void { // Release barrier synchronizes with Acquire in wait() // to ensure pushes to run queues happen before observing a posted notification. - sync = @bitCast(Sync, self.sync.tryCompareAndSwap( - @bitCast(u32, sync), - @bitCast(u32, new_sync), + sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap( + @as(u32, @bitCast(sync)), + @as(u32, @bitCast(new_sync)), .Release, .Monotonic, ) orelse { @@ -518,14 +518,14 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void { } return; - }); + })); } } noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { var is_idle = false; var is_waking = _is_waking; - var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic))); while (true) { if (sync.state == .shutdown) return error.Shutdown; @@ -542,44 +542,44 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { // Acquire barrier synchronizes with notify() // to ensure that pushes to run queue are observed after wait() returns. - sync = @bitCast(Sync, self.sync.tryCompareAndSwap( - @bitCast(u32, sync), - @bitCast(u32, new_sync), + sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap( + @as(u32, @bitCast(sync)), + @as(u32, @bitCast(new_sync)), .Acquire, .Monotonic, ) orelse { return is_waking or (sync.state == .signaled); - }); + })); } else if (!is_idle) { var new_sync = sync; new_sync.idle += 1; if (is_waking) new_sync.state = .pending; - sync = @bitCast(Sync, self.sync.tryCompareAndSwap( - @bitCast(u32, sync), - @bitCast(u32, new_sync), + sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap( + @as(u32, @bitCast(sync)), + @as(u32, @bitCast(new_sync)), .Monotonic, .Monotonic, ) orelse { is_waking = false; is_idle = true; continue; - }); + })); } else { if (Thread.current) |current| { current.drainIdleEvents(); } self.idle_event.wait(); - sync = @bitCast(Sync, self.sync.load(.Monotonic)); + sync = @as(Sync, @bitCast(self.sync.load(.Monotonic))); } } } /// Marks the thread pool as shutdown pub noinline fn shutdown(self: *ThreadPool) void { - var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic))); while (sync.state != .shutdown) { var new_sync = sync; new_sync.notified = true; @@ -587,9 +587,9 @@ pub noinline fn shutdown(self: *ThreadPool) void { new_sync.idle = 0; // Full barrier to synchronize with both wait() and notify() - sync = @bitCast(Sync, self.sync.tryCompareAndSwap( - @bitCast(u32, sync), - @bitCast(u32, new_sync), + sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap( + @as(u32, @bitCast(sync)), + @as(u32, @bitCast(new_sync)), .AcqRel, .Monotonic, ) orelse { @@ -597,7 +597,7 @@ pub noinline fn shutdown(self: *ThreadPool) void { // TODO: I/O polling notification here. if (sync.idle > 0) self.idle_event.shutdown(); return; - }); + })); } } @@ -628,8 +628,8 @@ pub fn setThreadContext(noalias pool: *ThreadPool, ctx: ?*anyopaque) void { fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void { // Un-spawn one thread, either due to a failed OS thread spawning or the thread is exiting. - const one_spawned = @bitCast(u32, Sync{ .spawned = 1 }); - const sync = @bitCast(Sync, self.sync.fetchSub(one_spawned, .Release)); + const one_spawned = @as(u32, @bitCast(Sync{ .spawned = 1 })); + const sync = @as(Sync, @bitCast(self.sync.fetchSub(one_spawned, .Release))); assert(sync.spawned > 0); // The last thread to exit must wake up the thread pool join()er @@ -651,10 +651,10 @@ fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void { fn join(self: *ThreadPool) void { // Wait for the thread pool to be shutdown() then for all threads to enter a joinable state - var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic))); if (!(sync.state == .shutdown and sync.spawned == 0)) { self.join_event.wait(); - sync = @bitCast(Sync, self.sync.load(.Monotonic)); + sync = @as(Sync, @bitCast(self.sync.load(.Monotonic))); } assert(sync.state == .shutdown); @@ -753,7 +753,7 @@ pub const Thread = struct { } // Then try work stealing from other threads - var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned; + var num_threads: u32 = @as(Sync, @bitCast(thread_pool.sync.load(.Monotonic))).spawned; while (num_threads > 0) : (num_threads -= 1) { // Traverse the stack of registered threads on the thread pool const target = self.target orelse thread_pool.threads.load(.Acquire) orelse unreachable; @@ -945,7 +945,7 @@ pub const Node = struct { var stack = self.stack.load(.Monotonic); while (true) { // Attach the list to the stack (pt. 1) - list.tail.next = @ptrFromInt(?*Node, stack & PTR_MASK); + list.tail.next = @as(?*Node, @ptrFromInt(stack & PTR_MASK)); // Update the stack with the list (pt. 2). // Don't change the HAS_CACHE and IS_CONSUMING bits of the consumer. @@ -985,7 +985,7 @@ pub const Node = struct { new_stack, .Acquire, .Monotonic, - ) orelse return self.cache orelse @ptrFromInt(*Node, stack & PTR_MASK); + ) orelse return self.cache orelse @as(*Node, @ptrFromInt(stack & PTR_MASK)); } } @@ -1022,7 +1022,7 @@ pub const Node = struct { assert(stack & IS_CONSUMING != 0); assert(stack & PTR_MASK != 0); - const node = @ptrFromInt(*Node, stack & PTR_MASK); + const node = @as(*Node, @ptrFromInt(stack & PTR_MASK)); consumer_ref.* = node.next; return node; } |