Diffstat (limited to 'src/bun.js/unbounded_queue.zig')
-rw-r--r--   src/bun.js/unbounded_queue.zig   149
1 file changed, 149 insertions, 0 deletions
diff --git a/src/bun.js/unbounded_queue.zig b/src/bun.js/unbounded_queue.zig
new file mode 100644
index 000000000..fd092290d
--- /dev/null
+++ b/src/bun.js/unbounded_queue.zig
@@ -0,0 +1,149 @@
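+//! Intrusive, unbounded multi-producer single-consumer (MPSC) queue.
+//! Nodes carry their own `next` pointer, so the queue never allocates:
+//! producers publish with a single atomic exchange on `back`, and the single
+//! consumer drains from `front`, briefly spinning if a producer has swapped
+//! `back` but not yet linked its node.
+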
+const std = @import("std");
+
+const os = std.os;
+const mem = std.mem;
+const meta = std.meta;
+const atomic = std.atomic;
+const builtin = std.builtin;
+const testing = std.testing;
+
+const assert = std.debug.assert;
+
+const mpsc = @This();
+
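+/// Assumed cache line size per target architecture; used to pad the queue's
+/// hot fields so producers and the consumer do not false-share a cache line.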
+pub const cache_line_length = switch (@import("builtin").target.cpu.arch) {
+    .x86_64, .aarch64, .powerpc64 => 128,
+    .arm, .mips, .mips64, .riscv64 => 32,
+    .s390x => 256,
+    else => 64,
+};
+
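+/// Returns an intrusive MPSC queue of `T`. `T` must contain an optional
+/// pointer field (`?*T`) identified by `next_field`, which the queue uses to
+/// link nodes; the caller owns the nodes' memory.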
+pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type {
+    const next = meta.fieldInfo(T, next_field).name;
+
+    return struct {
+        const Self = @This();
+
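+        /// A chain of nodes detached from the queue by `popBatch`, consumed via `iterator`.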
+        pub const Batch = struct {
+            pub const Iterator = struct {
+                batch: Self.Batch,
+
+                pub fn next(self: *Self.Batch.Iterator) ?*T {
+                    if (self.batch.count == 0) return null;
+                    const front = self.batch.front orelse unreachable;
+                    self.batch.front = @field(front, next);
+                    self.batch.count -= 1;
+                    return front;
+                }
+            };
+
+            front: ?*T = null,
+            last: ?*T = null,
+            count: usize = 0,
+
+            pub fn iterator(self: Self.Batch) Self.Batch.Iterator {
+                return .{ .batch = self };
+            }
+        };
+
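+        /// Alignment applied to the `back` and `front` fields below to reduce
+        /// false sharing between producer and consumer threads.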
+        pub const queue_padding_length = cache_line_length / 2;
+
+        back: ?*T align(queue_padding_length) = null,
+        count: usize = 0,
+        front: T align(queue_padding_length) = init: {
+            var stub: T = undefined;
+            @field(stub, next) = null;
+            break :init stub;
+        },
+
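+        /// Enqueues one node. Safe to call concurrently from multiple producers.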
+        pub fn push(self: *Self, src: *T) void {
+            assert(@atomicRmw(usize, &self.count, .Add, 1, .Release) >= 0);
+
+            @field(src, next) = null;
+            const old_back = @atomicRmw(?*T, &self.back, .Xchg, src, .AcqRel) orelse &self.front;
+            @field(old_back, next) = src;
+        }
+
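+        /// Enqueues a pre-linked chain of `count` nodes running from `first` to `last`.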
+        pub fn pushBatch(self: *Self, first: *T, last: *T, count: usize) void {
+            assert(@atomicRmw(usize, &self.count, .Add, count, .Release) >= 0);
+
+            @field(last, next) = null;
+            const old_back = @atomicRmw(?*T, &self.back, .Xchg, last, .AcqRel) orelse &self.front;
+            @field(old_back, next) = first;
+        }
+
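+        /// Dequeues one node, or returns null if the queue appears empty.
+        /// Must only be called from the single consumer thread.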
+        pub fn pop(self: *Self) ?*T {
+            const first = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return null;
+            if (@atomicLoad(?*T, &@field(first, next), .Acquire)) |next_item| {
+                @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+                assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+                return first;
+            }
+            const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
+            if (first != last) return null;
+            @atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
+            if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
+                assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+                return first;
+            }
+            var next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
+            while (next_item == null) : (atomic.spinLoopHint()) {
+                next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
+            }
+            @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+            assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+            return first;
+        }
+
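+        /// Dequeues every node currently linked into the queue as one `Batch`.
+        /// Must only be called from the single consumer thread.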
+        pub fn popBatch(self: *Self) Self.Batch {
+            var batch: Self.Batch = .{};
+
+            var front = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return batch;
+            batch.front = front;
+
+            var next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+            while (next_item) |next_node| : (next_item = @atomicLoad(?*T, &@field(next_node, next), .Acquire)) {
+                batch.count += 1;
+                batch.last = front;
+
+                front = next_node;
+            }
+
+            const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
+            if (front != last) {
+                @atomicStore(?*T, &@field(self.front, next), front, .Release);
+                assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+                return batch;
+            }
+
+            @atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
+            if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
+                batch.count += 1;
+                batch.last = front;
+                assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+                return batch;
+            }
+
+            next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+            while (next_item == null) : (atomic.spinLoopHint()) {
+                next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+            }
+
+            batch.count += 1;
+            @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+            batch.last = front;
+            assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+            return batch;
+        }
+
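+        /// Returns the number of queued nodes at the moment of the load; the
+        /// value may be stale by the time the caller acts on it.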
+        pub fn peek(self: *Self) usize {
+            const count = @atomicLoad(usize, &self.count, .Acquire);
+            assert(count >= 0);
+            return count;
+        }
+
+        pub fn isEmpty(self: *Self) bool {
+            return self.peek() == 0;
+        }
+    };
+}
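
For reference, a minimal, hypothetical usage sketch (not part of this change): the `Task` node type, its `next` field, and the relative import path are illustrative assumptions; any struct with a `?*T` field named by `next_field` works the same way.

    const std = @import("std");
    const UnboundedQueue = @import("unbounded_queue.zig").UnboundedQueue;

    // Hypothetical intrusive node: the queue links nodes through the field
    // named by the enum literal passed as `next_field` (here, `.next`).
    const Task = struct {
        value: u32 = 0,
        next: ?*Task = null,
    };

    test "push, pop, and popBatch round trip" {
        var queue: UnboundedQueue(Task, .next) = .{};

        var a = Task{ .value = 1 };
        var b = Task{ .value = 2 };

        // Any number of producer threads may push; here both pushes come
        // from the test thread.
        queue.push(&a);
        queue.push(&b);
        try std.testing.expectEqual(@as(usize, 2), queue.peek());

        // The single consumer pops nodes in the order this thread pushed them.
        try std.testing.expectEqual(&a, queue.pop().?);

        // popBatch detaches everything currently linked and hands back a
        // Batch that is consumed through its iterator.
        var batch = queue.popBatch();
        var it = batch.iterator();
        try std.testing.expectEqual(&b, it.next().?);
        try std.testing.expect(it.next() == null);
        try std.testing.expect(queue.isEmpty());
    }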