about summary refs log tree commit diff
path: root/src/thread_pool.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/thread_pool.zig')
-rw-r--r--  src/thread_pool.zig  74
1 file changed, 37 insertions, 37 deletions
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
index 3359311d6..402f6b1d6 100644
--- a/src/thread_pool.zig
+++ b/src/thread_pool.zig
@@ -17,7 +17,7 @@ on_thread_spawn: ?OnSpawnCallback = null,
threadpool_context: ?*anyopaque = null,
stack_size: u32,
max_threads: u32,
-sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})),
+sync: Atomic(u32) = Atomic(u32).init(@as(u32, @bitCast(Sync{}))),
idle_event: Event = .{},
join_event: Event = .{},
run_queue: Node.Queue = .{},
@@ -352,7 +352,7 @@ pub fn Do(
}
}
- wait_group.counter += @intCast(u32, values.len);
+ wait_group.counter += @as(u32, @intCast(values.len));
this.schedule(batch);
wait_group.wait();
}
@@ -431,7 +431,7 @@ inline fn notify(self: *ThreadPool, is_waking: bool) void {
// Fast path to check the Sync state to avoid calling into notifySlow().
// If we're waking, then we need to update the state regardless
if (!is_waking) {
- const sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ const sync = @as(Sync, @bitCast(self.sync.load(.Monotonic)));
if (sync.notified) {
return;
}
@@ -443,20 +443,20 @@ inline fn notify(self: *ThreadPool, is_waking: bool) void {
/// Warm the thread pool up to the given number of threads.
/// https://www.youtube.com/watch?v=ys3qcbO5KWw
pub fn warm(self: *ThreadPool, count: u14) void {
- var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic)));
if (sync.spawned >= count)
return;
- const to_spawn = @min(count - sync.spawned, @truncate(u14, self.max_threads));
+ const to_spawn = @min(count - sync.spawned, @as(u14, @truncate(self.max_threads)));
while (sync.spawned < to_spawn) {
var new_sync = sync;
new_sync.spawned += 1;
- sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
- @bitCast(u32, sync),
- @bitCast(u32, new_sync),
+ sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap(
+ @as(u32, @bitCast(sync)),
+ @as(u32, @bitCast(new_sync)),
.Release,
.Monotonic,
- ) orelse break);
+ ) orelse break));
const spawn_config = if (Environment.isMac)
// stack size must be a multiple of page_size
// macOS will fail to spawn a thread if the stack size is not a multiple of page_size
@@ -470,7 +470,7 @@ pub fn warm(self: *ThreadPool, count: u14) void {
}
noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
- var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic)));
while (sync.state != .shutdown) {
const can_wake = is_waking or (sync.state == .pending);
if (is_waking) {
@@ -492,9 +492,9 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
// Release barrier synchronizes with Acquire in wait()
// to ensure pushes to run queues happen before observing a posted notification.
- sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
- @bitCast(u32, sync),
- @bitCast(u32, new_sync),
+ sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap(
+ @as(u32, @bitCast(sync)),
+ @as(u32, @bitCast(new_sync)),
.Release,
.Monotonic,
) orelse {
@@ -518,14 +518,14 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
}
return;
- });
+ }));
}
}
noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
var is_idle = false;
var is_waking = _is_waking;
- var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic)));
while (true) {
if (sync.state == .shutdown) return error.Shutdown;
@@ -542,44 +542,44 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
// Acquire barrier synchronizes with notify()
// to ensure that pushes to run queue are observed after wait() returns.
- sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
- @bitCast(u32, sync),
- @bitCast(u32, new_sync),
+ sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap(
+ @as(u32, @bitCast(sync)),
+ @as(u32, @bitCast(new_sync)),
.Acquire,
.Monotonic,
) orelse {
return is_waking or (sync.state == .signaled);
- });
+ }));
} else if (!is_idle) {
var new_sync = sync;
new_sync.idle += 1;
if (is_waking)
new_sync.state = .pending;
- sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
- @bitCast(u32, sync),
- @bitCast(u32, new_sync),
+ sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap(
+ @as(u32, @bitCast(sync)),
+ @as(u32, @bitCast(new_sync)),
.Monotonic,
.Monotonic,
) orelse {
is_waking = false;
is_idle = true;
continue;
- });
+ }));
} else {
if (Thread.current) |current| {
current.drainIdleEvents();
}
self.idle_event.wait();
- sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ sync = @as(Sync, @bitCast(self.sync.load(.Monotonic)));
}
}
}
/// Marks the thread pool as shutdown
pub noinline fn shutdown(self: *ThreadPool) void {
- var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic)));
while (sync.state != .shutdown) {
var new_sync = sync;
new_sync.notified = true;
@@ -587,9 +587,9 @@ pub noinline fn shutdown(self: *ThreadPool) void {
new_sync.idle = 0;
// Full barrier to synchronize with both wait() and notify()
- sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
- @bitCast(u32, sync),
- @bitCast(u32, new_sync),
+ sync = @as(Sync, @bitCast(self.sync.tryCompareAndSwap(
+ @as(u32, @bitCast(sync)),
+ @as(u32, @bitCast(new_sync)),
.AcqRel,
.Monotonic,
) orelse {
@@ -597,7 +597,7 @@ pub noinline fn shutdown(self: *ThreadPool) void {
// TODO: I/O polling notification here.
if (sync.idle > 0) self.idle_event.shutdown();
return;
- });
+ }));
}
}
@@ -628,8 +628,8 @@ pub fn setThreadContext(noalias pool: *ThreadPool, ctx: ?*anyopaque) void {
fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
// Un-spawn one thread, either due to a failed OS thread spawning or the thread is exiting.
- const one_spawned = @bitCast(u32, Sync{ .spawned = 1 });
- const sync = @bitCast(Sync, self.sync.fetchSub(one_spawned, .Release));
+ const one_spawned = @as(u32, @bitCast(Sync{ .spawned = 1 }));
+ const sync = @as(Sync, @bitCast(self.sync.fetchSub(one_spawned, .Release)));
assert(sync.spawned > 0);
// The last thread to exit must wake up the thread pool join()er
@@ -651,10 +651,10 @@ fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
fn join(self: *ThreadPool) void {
// Wait for the thread pool to be shutdown() then for all threads to enter a joinable state
- var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ var sync = @as(Sync, @bitCast(self.sync.load(.Monotonic)));
if (!(sync.state == .shutdown and sync.spawned == 0)) {
self.join_event.wait();
- sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ sync = @as(Sync, @bitCast(self.sync.load(.Monotonic)));
}
assert(sync.state == .shutdown);
@@ -753,7 +753,7 @@ pub const Thread = struct {
}
// Then try work stealing from other threads
- var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned;
+ var num_threads: u32 = @as(Sync, @bitCast(thread_pool.sync.load(.Monotonic))).spawned;
while (num_threads > 0) : (num_threads -= 1) {
// Traverse the stack of registered threads on the thread pool
const target = self.target orelse thread_pool.threads.load(.Acquire) orelse unreachable;
@@ -945,7 +945,7 @@ pub const Node = struct {
var stack = self.stack.load(.Monotonic);
while (true) {
// Attach the list to the stack (pt. 1)
- list.tail.next = @ptrFromInt(?*Node, stack & PTR_MASK);
+ list.tail.next = @as(?*Node, @ptrFromInt(stack & PTR_MASK));
// Update the stack with the list (pt. 2).
// Don't change the HAS_CACHE and IS_CONSUMING bits of the consumer.
@@ -985,7 +985,7 @@ pub const Node = struct {
new_stack,
.Acquire,
.Monotonic,
- ) orelse return self.cache orelse @ptrFromInt(*Node, stack & PTR_MASK);
+ ) orelse return self.cache orelse @as(*Node, @ptrFromInt(stack & PTR_MASK));
}
}
@@ -1022,7 +1022,7 @@ pub const Node = struct {
assert(stack & IS_CONSUMING != 0);
assert(stack & PTR_MASK != 0);
- const node = @ptrFromInt(*Node, stack & PTR_MASK);
+ const node = @as(*Node, @ptrFromInt(stack & PTR_MASK));
consumer_ref.* = node.next;
return node;
}