const std = @import("std");
const os = std.os;
const mem = std.mem;
const meta = std.meta;
const atomic = std.atomic;
const builtin = std.builtin;
const testing = std.testing;
const assert = std.debug.assert;
const mpsc = @This();
pub const cache_line_length = switch (@import("builtin").target.cpu.arch) {
.x86_64, .aarch64, .powerpc64 => 128,
.arm, .mips, .mips64, .riscv64 => 32,
.s390x => 256,
else => 64,
};
pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type {
const next_name = meta.fieldInfo(T, next_field).name;
return struct {
const Self = @This();
pub const Batch = struct {
pub const Iterator = struct {
batch: Self.Batch,
pub fn next(self: *Self.Batch.Iterator) ?*T {
if (self.batch.count == 0) return null;
const front = self.batch.front orelse unreachable;
self.batch.front = @field(front, next_name);
self.batch.count -= 1;
return front;
}
};
front: ?*T = null,
last: ?*T = null,
count: usize = 0,
pub fn iterator(self: Self.Batch) Self.Batch.Iterator {
return .{ .batch = self };
}
};
const next = next_name;
pub const queue_padding_length = cache_line_length / 2;
back: ?*T align(queue_padding_length) = null,
count: usize = 0,
front: T align(queue_padding_length) = init: {
var stub: T = undefined;
@field(stub, next) = null;
break :init stub;
},
pub fn push(self: *Self, src: *T) void {
assert(@atomicRmw(usize, &self.count, .Add, 1, .Release) >= 0);
@field(src, next) = null;
const old_back = @atomicRmw(?*T, &self.back, .Xchg, src, .AcqRel) orelse &self.front;
@field(old_back, next) = src;
}
pub fn pushBatch(self: *Self, first: *T, last: *T, count: usize) void {
assert(@atomicRmw(usize, &self.count, .Add, count, .Release) >= 0);
@field(last, next) = null;
const old_back = @atomicRmw(?*T, &self.back, .Xchg, last, .AcqRel) orelse &self.front;
@field(old_back, next) = first;
}
pub fn pop(self: *Self) ?*T {
const first = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return null;
if (@atomicLoad(?*T, &@field(first, next), .Acquire)) |next_item| {
@atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
return first;
}
const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
if (first != last) return null;
@atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
return first;
}
var next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
while (next_item == null) : (atomic.spinLoopHint()) {
next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
}
@atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
return first;
}
pub fn popBatch(self: *Self) Self.Batch {
var batch: Self.Batch = .{};
var front = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return batch;
batch.front = front;
var next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
while (next_item) |next_node| : (next_item = @atomicLoad(?*T, &@field(next_node, next), .Acquire)) {
batch.count += 1;
batch.last = front;
front = next_node;
}
const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
if (front != last) {
@atomicStore(?*T, &@field(self.front, next), front, .Release);
assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
return batch;
}
@atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
batch.count += 1;
batch.last = front;
assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
return batch;
}
next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
while (next_item == null) : (atomic.spinLoopHint()) {
next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
}
batch.count += 1;
@atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
batch.last = front;
assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
return batch;
}
pub fn peek(self: *Self) usize {
const count = @atomicLoad(usize, &self.count, .Acquire);
assert(count >= 0);
return count;
}
pub fn isEmpty(self: *Self) bool {
return self.peek() == 0;
}
};
}
tion>
Unnamed repository; edit this file 'description' to name the repository. | |