aboutsummaryrefslogtreecommitdiff
path: root/src/bun_queue.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun_queue.zig')
-rw-r--r--src/bun_queue.zig845
1 files changed, 845 insertions, 0 deletions
diff --git a/src/bun_queue.zig b/src/bun_queue.zig
new file mode 100644
index 000000000..4c289d511
--- /dev/null
+++ b/src/bun_queue.zig
@@ -0,0 +1,845 @@
+const std = @import("std");
+const Mutex = @import("./lock.zig").Mutex;
+const Channel = @import("./sync.zig").Channel;
+const WaitGroup = @import("./sync.zig").WaitGroup;
+usingnamespace @import("./global.zig");
+const Wyhash = std.hash.Wyhash;
+const assert = std.debug.assert;
+
+const VerboseQueue = false;
+
+pub fn NewBlockQueue(comptime Value: type, comptime block_size: comptime_int, comptime block_count: usize) type {
+ return struct {
+ const BlockQueue = @This();
+ const Block = [block_size]Value;
+
+ blocks: [block_count]*Block = undefined,
+ overflow: std.ArrayList(*Block) = undefined,
+ first: Block = undefined,
+ len: std.atomic.Atomic(i32) = std.atomic.Atomic(i32).init(0),
+ allocated_blocks: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+
+ write_lock: bool = false,
+ overflow_write_lock: bool = false,
+ overflow_readers: std.atomic.Atomic(u8) = std.atomic.Atomic(u8).init(0),
+ allocator: *std.mem.Allocator,
+ empty_queue: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(1),
+ rand: std.rand.DefaultPrng = std.rand.DefaultPrng.init(100),
+
+ pub fn new(this: *BlockQueue, allocator: *std.mem.Allocator) void {
+ this.* = BlockQueue{
+ .allocator = allocator,
+ .overflow = std.ArrayList(*Block).init(allocator),
+ .len = std.atomic.Atomic(i32).init(0),
+ };
+ this.blocks[0] = &this.first;
+ this.allocator = allocator;
+ }
+
+ pub fn get(this: *BlockQueue) ?Value {
+ if (this.len.fetchMax(-1, .SeqCst) <= 0) return null;
+
+ while (@atomicRmw(bool, &this.write_lock, .Xchg, true, .SeqCst)) {
+ const end = this.rand.random.uintAtMost(u8, 64);
+ var i: u8 = 0;
+ while (i < end) : (i += 1) {}
+ std.atomic.spinLoopHint();
+ }
+ defer assert(@atomicRmw(bool, &this.write_lock, .Xchg, false, .SeqCst));
+
+ if (this.len.fetchMax(-1, .SeqCst) <= 0) return null;
+ const current_len_ = this.len.fetchSub(1, .SeqCst);
+ if (current_len_ <= 0) return null;
+
+ const current_len = @intCast(u32, current_len_);
+ if (current_len == 0) {
+ return null;
+ }
+
+ const current_block = @floatToInt(u32, std.math.floor(@intToFloat(f32, (current_len - 1) / block_size)));
+ const index = (current_len - 1) % block_size;
+
+ if (comptime VerboseQueue) std.debug.print("[GET] {d}, {d}\n", .{ current_block, index });
+
+ switch (current_block) {
+ 0 => {
+ return this.first[index];
+ },
+ 1...block_count => {
+ const ptr = @atomicLoad(*Block, &this.blocks[current_block], .SeqCst);
+ return ptr[index];
+ },
+ else => {
+ const is_overflowing = current_block > block_count;
+
+ unreachable;
+ },
+ }
+ }
+
+ pub fn enqueue(this: *BlockQueue, value: Value) !void {
+ while (@atomicRmw(bool, &this.write_lock, .Xchg, true, .SeqCst)) {
+ const end = this.rand.random.uintAtMost(u8, 32);
+ var i: u8 = 0;
+ while (i < end) : (i += 1) {}
+ std.atomic.spinLoopHint();
+ }
+ defer assert(@atomicRmw(bool, &this.write_lock, .Xchg, false, .SeqCst));
+ defer {
+ const old = this.empty_queue.swap(0, .SeqCst);
+ if (old == 1) std.Thread.Futex.wake(&this.empty_queue, std.math.maxInt(u32));
+ }
+
+ const current_len = @intCast(u32, std.math.max(this.len.fetchAdd(1, .SeqCst), 0));
+ const next_len = current_len + 1;
+
+ const current_block = @floatToInt(u32, std.math.floor(@intToFloat(f32, current_len) / block_size));
+ const next_block = @floatToInt(u32, std.math.floor(@intToFloat(f32, next_len) / block_size));
+ const index = (current_len % block_size);
+ const next_index = (next_len % block_size);
+
+ if (comptime VerboseQueue) std.debug.print("\n[PUT] {d}, {d} - {d} \n", .{ current_block, index, current_len });
+
+ const allocated_block = this.allocated_blocks.load(.SeqCst);
+ const needs_new_block = next_index == 0;
+ const needs_to_allocate_block = needs_new_block and allocated_block < next_block;
+ const overflowing = current_block >= block_count;
+
+ if (needs_to_allocate_block) {
+ defer {
+ _ = this.allocated_blocks.fetchAdd(1, .SeqCst);
+ }
+ var new_list = try this.allocator.create(Block);
+ if (next_block >= block_count) {
+ const needs_lock = this.overflow.items.len + 1 >= this.overflow.capacity;
+ if (needs_lock) {
+ while (this.overflow_readers.load(.SeqCst) > 0) {
+ std.atomic.spinLoopHint();
+ }
+ @atomicStore(bool, &this.overflow_write_lock, true, .SeqCst);
+ }
+ defer {
+ if (needs_lock) {
+ @atomicStore(bool, &this.overflow_write_lock, false, .SeqCst);
+ }
+ }
+ try this.overflow.append(new_list);
+ } else {
+ @atomicStore(*Block, &this.blocks[next_block], new_list, .SeqCst);
+ }
+ }
+
+ var block_ptr = if (!overflowing)
+ @atomicLoad(*Block, &this.blocks[current_block], .SeqCst)
+ else
+ @atomicLoad(*Block, &this.overflow.items[current_block - block_count], .SeqCst);
+
+ block_ptr[index] = value;
+ if (current_len < 10) std.Thread.Futex.wake(@ptrCast(*const std.atomic.Atomic(u32), &this.len), std.math.maxInt(u32));
+ }
+ };
+}
+
+pub fn NewBunQueue(comptime Value: type) type {
+ return struct {
+ const KeyType = u32;
+ const BunQueue = @This();
+ const Queue = NewBlockQueue(Value, 64, 48);
+ // pub const Fifo = NewFifo(Value);
+
+ allocator: *std.mem.Allocator,
+ queue: Queue,
+ keys: Keys,
+ count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+
+ pub fn init(allocator: *std.mem.Allocator) !*BunQueue {
+ var bun = try allocator.create(BunQueue);
+ bun.* = BunQueue{
+ .allocator = allocator,
+ .queue = undefined,
+ .keys = Keys{
+ .offset = AtomicOffset.init(Offset.bits(.{ .used = 0, .len = 0 })),
+ .block_overflow = Keys.OverflowList.init(allocator),
+ },
+ };
+ bun.queue.new(allocator);
+
+ bun.keys.blocks[0] = &bun.keys.first_key_list;
+ return bun;
+ }
+
+ pub const Keys = struct {
+ pub const OverflowList = std.ArrayList([*]KeyType);
+ // Half a page of memory
+
+ blocks: [overflow_size][*]KeyType = undefined,
+ offset: AtomicOffset,
+ block_overflow: OverflowList,
+ block_overflow_lock: bool = false,
+ first_key_list: [block_size]KeyType = undefined,
+ mutex: Mutex = Mutex{},
+ write_lock: bool = false,
+ append_readers: u8 = 0,
+ append_lock: bool = false,
+ pending_write: KeyType = 0,
+ };
+
+ pub const Offset = packed struct {
+ used: u16,
+ len: u16,
+
+ pub const Int = std.meta.Int(.unsigned, @bitSizeOf(@This()));
+
+ pub inline fn bits(this: Offset) Int {
+ return @bitCast(Int, this);
+ }
+ };
+
+ pub const block_size = 2048 / @sizeOf(KeyType);
+ pub const overflow_size = 32;
+
+ // In one atomic load/store, get the length and offset of the keys
+ pub const AtomicOffset = std.atomic.Atomic(Offset.Int);
+
+ fn pushList(this: *BunQueue, used: u16) !void {
+
+ // this.keys.mutex.acquire();
+ // defer this.keys.mutex.release();
+
+ var block = try this.allocator.alloc(KeyType, block_size);
+
+ if (used < overflow_size) {
+ @atomicStore([*]KeyType, &this.keys.blocks[used], block.ptr, .Release);
+ } else {
+ const needs_lock = this.keys.block_overflow.items.len + 1 >= this.keys.block_overflow.capacity;
+ if (needs_lock) {
+ while (@atomicLoad(u8, &this.keys.append_readers, .SeqCst) > 0) {
+ std.atomic.spinLoopHint();
+ }
+ @atomicStore(bool, &this.keys.append_lock, true, .SeqCst);
+ }
+ defer {
+ if (needs_lock) @atomicStore(bool, &this.keys.append_lock, false, .SeqCst);
+ }
+ try this.keys.block_overflow.append(block.ptr);
+ }
+ }
+
+ inline fn contains(this: *BunQueue, key: KeyType) bool {
+ @fence(.Acquire);
+ if (@atomicLoad(KeyType, &this.keys.pending_write, .SeqCst) == key) return true;
+ // this.keys.mutex.tryAcquire()
+
+ var offset = this.getOffset();
+ std.debug.assert(&this.keys.first_key_list == this.keys.blocks[0]);
+
+ // Heuristic #1: the first files you import are probably the most common in your app
+ // e.g. "react"
+ if (offset.used != 0) {
+ for (this.keys.first_key_list) |_key| {
+ if (key == _key) return true;
+ }
+ }
+
+ if (offset.used < overflow_size) {
+ // Heuristic #2: you import files near each other
+ const block_ptr = @atomicLoad([*]KeyType, &this.keys.blocks[offset.used], .SeqCst);
+ for (block_ptr[0..offset.len]) |_key| {
+ if (key == _key) return true;
+ }
+ } else {
+ while (@atomicLoad(bool, &this.keys.append_lock, .SeqCst)) {
+ std.atomic.spinLoopHint();
+ }
+ _ = @atomicRmw(u8, &this.keys.append_readers, .Add, 1, .SeqCst);
+ defer {
+ _ = @atomicRmw(u8, &this.keys.append_readers, .Sub, 1, .SeqCst);
+ }
+ const latest = @atomicLoad([*]KeyType, &this.keys.block_overflow.items[offset.used - overflow_size], .SeqCst);
+
+ for (latest[0..offset.len]) |_key| {
+ if (key == _key) return true;
+ }
+ }
+
+ if (offset.used > 0) {
+ var j: usize = 1;
+ while (j < std.math.min(overflow_size, offset.used)) : (j += 1) {
+ const block_ptr = @atomicLoad([*]KeyType, &this.keys.blocks[j], .SeqCst);
+ for (block_ptr[0..block_size]) |_key| {
+ if (key == _key) return true;
+ }
+ }
+
+ if (offset.used > overflow_size) {
+ var end = offset.used - overflow_size;
+ j = 0;
+ while (j < end) : (j += 1) {
+ while (@atomicLoad(bool, &this.keys.append_lock, .SeqCst)) {
+ std.atomic.spinLoopHint();
+ }
+
+ _ = @atomicRmw(u8, &this.keys.append_readers, .Add, 1, .SeqCst);
+ defer {
+ _ = @atomicRmw(u8, &this.keys.append_readers, .Sub, 1, .SeqCst);
+ }
+
+ const block = @atomicLoad([*]KeyType, &this.keys.block_overflow.items[j], .SeqCst);
+ for (block[0..block_size]) |_key| {
+ if (key == _key) return true;
+ }
+ }
+ }
+ }
+
+ return @atomicLoad(KeyType, &this.keys.pending_write, .Acquire) == key;
+ }
+
+ pub inline fn getOffset(this: *BunQueue) Offset {
+ return @bitCast(Offset, this.keys.offset.load(std.atomic.Ordering.Acquire));
+ }
+
+ pub fn hasItem(this: *BunQueue, key: KeyType) bool {
+ @fence(.SeqCst);
+
+ if (this.contains(key)) return true;
+ while (@atomicRmw(bool, &this.keys.write_lock, .Xchg, true, .SeqCst)) {
+ std.atomic.spinLoopHint();
+ }
+ defer assert(@atomicRmw(bool, &this.keys.write_lock, .Xchg, false, .SeqCst));
+
+ if (@atomicRmw(KeyType, &this.keys.pending_write, .Xchg, key, .SeqCst) == key) return true;
+
+ const offset = this.getOffset();
+
+ const new_len = (offset.len + 1) % block_size;
+ const is_new_list = new_len == 0;
+ const new_offset = Offset{ .used = @intCast(u16, @boolToInt(is_new_list)) + offset.used, .len = new_len };
+
+ {
+ var latest_list = if (offset.used < overflow_size)
+ @atomicLoad([*]KeyType, &this.keys.blocks[offset.used], .SeqCst)
+ else
+ @atomicLoad([*]KeyType, &this.keys.block_overflow.items[offset.used - overflow_size], .SeqCst);
+
+ assert(@atomicRmw(KeyType, &latest_list[offset.len], .Xchg, key, .Release) != key);
+ }
+
+ // We only should need to lock when we're allocating memory
+ if (is_new_list) {
+ this.pushList(new_offset.used) catch unreachable;
+ }
+
+ this.keys.offset.store(new_offset.bits(), .Release);
+
+ return false;
+ }
+
+ inline fn _writeItem(this: *BunQueue, value: Value) !void {
+ _ = this.count.fetchAdd(1, .Release);
+ try this.queue.enqueue(value);
+ }
+
+ pub fn upsert(this: *BunQueue, key: KeyType, value: Value) !void {
+ if (!this.hasItem(key)) {
+ try this._writeItem(value);
+ }
+ }
+
+ pub fn upsertWithResult(this: *BunQueue, key: KeyType, value: Value) !bool {
+ if (!this.hasItem(key)) {
+ try this._writeItem(value);
+ return true;
+ }
+
+ return false;
+ }
+ pub inline fn next(this: *BunQueue) ?Value {
+ return this.queue.get();
+ }
+ };
+}
+
+test "BunQueue: Single-threaded" {
+ const BunQueue = NewBunQueue([]const u8);
+ const hash = Wyhash.hash;
+ const expect = std.testing.expect;
+
+ var queue = try BunQueue.init(default_allocator);
+
+ var greet = [_]string{
+ "hello", "how", "are", "you",
+ "https://", "ton.local.twitter.com", "/responsive-web-internal/", "sourcemaps",
+ "/client-web/", "loader.Typeahead.7c3b3805.js.map:", "ERR_BLOCKED_BY_CLIENT", "etch failed loading: POST ",
+ "ondemand.LottieWeb.08803c45.js", "ondemand.InlinePlayer.4990ef15.js", "ondemand.BranchSdk.bb99d145.js", "ondemand.Dropdown.011d5045.js",
+ };
+ var greeted: [greet.len]bool = undefined;
+ std.mem.set(bool, &greeted, false);
+
+ for (greet) |ing, i| {
+ const key = @truncate(u32, hash(0, ing));
+ try expect(!queue.contains(
+ key,
+ ));
+ try queue.upsert(
+ key,
+ ing,
+ );
+ try expect(queue.hasItem(
+ key,
+ ));
+ try expect(queue.getOffset().len == i + 1);
+ }
+
+ {
+ var i: usize = 0;
+ while (i < greet.len) : (i += 1) {
+ const item = (queue.next()) orelse return try std.testing.expect(false);
+ try expect(strings.containsAny(&greet, item));
+ const index = strings.indexAny(&greet, item) orelse unreachable;
+ try expect(!greeted[index]);
+ greeted[index] = true;
+ }
+ i = 0;
+ while (i < greet.len) : (i += 1) {
+ try expect(queue.next() == null);
+ }
+ i = 0;
+ while (i < greet.len) : (i += 1) {
+ try expect(greeted[i]);
+ }
+ i = 0;
+ }
+
+ const end_offset = queue.getOffset().len;
+
+ for (greet) |ing, i| {
+ const key = @truncate(u32, hash(0, ing));
+ try queue.upsert(
+ key,
+ ing,
+ );
+
+ try expect(end_offset == queue.getOffset().len);
+ }
+}
+
+test "BunQueue: Dedupes" {
+ const BunQueue = NewBunQueue([]const u8);
+ const hash = Wyhash.hash;
+ const expect = std.testing.expect;
+
+ var queue = try BunQueue.init(default_allocator);
+
+ var greet = [_]string{
+ "uniq1",
+ "uniq2",
+ "uniq3",
+ "uniq4",
+ "uniq5",
+ "uniq6",
+ "uniq7",
+ "uniq8",
+ "uniq9",
+ "uniq10",
+ "uniq11",
+ "uniq12",
+ "uniq13",
+ "uniq14",
+ "uniq15",
+ "uniq16",
+ "uniq17",
+ "uniq18",
+ "uniq19",
+ "uniq20",
+ "uniq21",
+ "uniq22",
+ "uniq23",
+ "uniq24",
+ "uniq25",
+ "uniq26",
+ "uniq27",
+ "uniq28",
+ "uniq29",
+ "uniq30",
+ } ++ [_]string{ "dup20", "dup21", "dup27", "dup2", "dup12", "dup15", "dup4", "dup12", "dup10", "dup7", "dup26", "dup22", "dup1", "dup23", "dup11", "dup8", "dup11", "dup29", "dup28", "dup25", "dup20", "dup2", "dup6", "dup16", "dup22", "dup13", "dup30", "dup9", "dup3", "dup17", "dup14", "dup18", "dup8", "dup3", "dup28", "dup30", "dup24", "dup18", "dup24", "dup5", "dup23", "dup10", "dup13", "dup26", "dup27", "dup29", "dup25", "dup4", "dup19", "dup15", "dup6", "dup17", "dup1", "dup16", "dup19", "dup7", "dup9", "dup21", "dup14", "dup5" };
+ var prng = std.rand.DefaultPrng.init(100);
+ prng.random.shuffle(string, &greet);
+ var deduped = std.BufSet.init(default_allocator);
+ var consumed = std.BufSet.init(default_allocator);
+
+ for (greet) |ing, i| {
+ const key = @truncate(u32, hash(0, ing));
+
+ const is_new = !deduped.contains(ing);
+ try deduped.insert(ing);
+ try queue.upsert(key, ing);
+ }
+
+ while (queue.next()) |i| {
+ try expect(consumed.contains(i) == false);
+ try consumed.insert(i);
+ }
+
+ try std.testing.expectEqual(consumed.count(), deduped.count());
+ try expect(deduped.count() > 0);
+}
+
+test "BunQueue: SCMP Threaded" {
+ const BunQueue = NewBunQueue([]const u8);
+ const expect = std.testing.expect;
+
+ var _queue = try BunQueue.init(default_allocator);
+
+ var greet = [_]string{
+ "uniq1",
+ "uniq2",
+ "uniq3",
+ "uniq4",
+ "uniq5",
+ "uniq6",
+ "uniq7",
+ "uniq8",
+ "uniq9",
+ "uniq10",
+ "uniq11",
+ "uniq12",
+ "uniq13",
+ "uniq14",
+ "uniq15",
+ "uniq16",
+ "uniq17",
+ "uniq18",
+ "uniq19",
+ "uniq20",
+ "uniq21",
+ "uniq22",
+ "uniq23",
+ "uniq24",
+ "uniq25",
+ "uniq26",
+ "uniq27",
+ "uniq28",
+ "uniq29",
+ "uniq30",
+ "uniq31",
+ "uniq32",
+ "uniq33",
+ "uniq34",
+ "uniq35",
+ "uniq36",
+ "uniq37",
+ "uniq38",
+ "uniq39",
+ "uniq40",
+ "uniq41",
+ "uniq42",
+ "uniq43",
+ "uniq44",
+ "uniq45",
+ "uniq46",
+ "uniq47",
+ "uniq48",
+ "uniq49",
+ "uniq50",
+ "uniq51",
+ "uniq52",
+ "uniq53",
+ "uniq54",
+ "uniq55",
+ "uniq56",
+ "uniq57",
+ "uniq58",
+ "uniq59",
+ "uniq60",
+ "uniq61",
+ "uniq62",
+ "uniq63",
+ "uniq64",
+ "uniq65",
+ "uniq66",
+ "uniq67",
+ "uniq68",
+ "uniq69",
+ "uniq70",
+ "uniq71",
+ "uniq72",
+ "uniq73",
+ "uniq74",
+ "uniq75",
+ "uniq76",
+ "uniq77",
+ "uniq78",
+ "uniq79",
+ "uniq80",
+ "uniq81",
+ "uniq82",
+ "uniq83",
+ "uniq84",
+ "uniq85",
+ "uniq86",
+ "uniq87",
+ "uniq88",
+ "uniq89",
+ "uniq90",
+ "uniq91",
+ "uniq92",
+ "uniq93",
+ "uniq94",
+ "uniq95",
+ "uniq96",
+ "uniq97",
+ "uniq98",
+ "uniq99",
+ "uniq100",
+ "uniq101",
+ "uniq102",
+ "uniq103",
+ "uniq104",
+ "uniq105",
+ "uniq106",
+ "uniq107",
+ "uniq108",
+ "uniq109",
+ "uniq110",
+ "uniq111",
+ "uniq112",
+ "uniq113",
+ "uniq114",
+ "uniq115",
+ "uniq116",
+ "uniq117",
+ "uniq118",
+ "uniq119",
+ "uniq120",
+ } ++ [_]string{ "dup1", "dup1", "dup10", "dup10", "dup11", "dup11", "dup12", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup12", "dup13", "dup13", "dup14", "dup14", "dup15", "dup15", "dup16", "dup16", "dup17", "dup17", "dup18", "dup18", "dup19", "dup19", "dup2", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup24", "dup24", "dup25", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup23", "dup23", "dup12", "dup13", "dup13", "dup14", "dup14", "dup15", "dup15", "dup16", "dup16", "dup17", "dup17", "dup18", "dup18", "dup19", "dup19", "dup2", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup24", "dup24", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9", "dup25", "dup26", "dup26", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9", "dup27", "dup27", "dup28", "dup28", "dup29", "dup29", "dup3", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9" };
+ var prng = std.rand.DefaultPrng.init(100);
+ prng.random.shuffle(string, &greet);
+ var in = try default_allocator.create(std.BufSet);
+ in.* = std.BufSet.init(default_allocator);
+ for (greet) |i| {
+ try in.insert(i);
+ try _queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, i)), i);
+ }
+
+ const Worker = struct {
+ index: u8 = 0,
+
+ pub fn run(queue: *BunQueue, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void {
+ defer wg.done();
+ // const tasks = more_work[num];
+ // var remain = tasks;
+ while (queue.next()) |cur| {
+ mut.acquire();
+ defer mut.release();
+ try dedup_list.insert(cur);
+ }
+ }
+
+ pub fn run1(queue: *BunQueue, num: u8, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void {
+ defer wg.done();
+ const tasks = more_work[num];
+ var remain = tasks;
+ try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]);
+ remain = tasks[1..];
+ loop: while (true) {
+ while (queue.next()) |cur| {
+ mut.acquire();
+ try dedup_list.insert(cur);
+ mut.release();
+ }
+
+ if (remain.len > 0) {
+ try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]);
+ remain = tasks[1..];
+ var j: usize = 0;
+ while (j < 1000) : (j += 1) {}
+ continue :loop;
+ }
+
+ break :loop;
+ }
+ }
+ };
+
+ var out = try default_allocator.create(std.BufSet);
+ out.* = std.BufSet.init(default_allocator);
+
+ var waitgroup = try default_allocator.create(WaitGroup);
+ waitgroup.* = WaitGroup.init();
+
+ var worker1 = try default_allocator.create(Worker);
+ worker1.* = Worker{};
+ var worker2 = try default_allocator.create(Worker);
+ worker2.* = Worker{};
+ waitgroup.add();
+ waitgroup.add();
+ var mutex = try default_allocator.create(Mutex);
+ mutex.* = Mutex{};
+
+ var thread1 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, out, waitgroup, mutex });
+ var thread2 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, out, waitgroup, mutex });
+
+ waitgroup.wait();
+ thread1.join();
+ thread2.join();
+
+ try std.testing.expectEqual(out.count(), in.count());
+ var iter = in.hash_map.iterator();
+
+ while (iter.next()) |entry| {
+ try expect(in.contains(entry.key_ptr.*));
+ }
+}
+
+test "BunQueue: MPMC Threaded" {
+ const BunQueue = NewBunQueue([]const u8);
+ const expect = std.testing.expect;
+ var _queue = try BunQueue.init(default_allocator);
+
+ var in = try default_allocator.create(std.BufSet);
+ in.* = std.BufSet.init(default_allocator);
+
+ const Worker = struct {
+ index: u8 = 0,
+ const WorkerCount = 2;
+ const lodash_all = shuffle(@TypeOf(@import("./test/project.zig").lodash), @import("./test/project.zig").lodash);
+ const lodash1 = lodash_all[0 .. lodash_all.len / 3];
+ const lodash2 = lodash_all[lodash1.len..][0 .. lodash_all.len / 3];
+ const lodash3 = lodash_all[lodash1.len + lodash2.len ..];
+
+ pub fn shuffle(comptime Type: type, comptime val: Type) Type {
+ var copy = val;
+ @setEvalBranchQuota(99999);
+ var rand = std.rand.DefaultPrng.init(100);
+ rand.random.shuffle(string, &copy);
+ return copy;
+ }
+ const three_all = shuffle(@TypeOf(@import("./test/project.zig").three), @import("./test/project.zig").three);
+ const three1 = three_all[0 .. three_all.len / 3];
+ const three2 = three_all[three1.len..][0 .. three_all.len / 3];
+ const three3 = three_all[three1.len + three2.len ..];
+
+ fn run1(queue: *BunQueue, num: u8, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void {
+ defer wg.done();
+ const tasks = switch (num) {
+ 0 => lodash1,
+ 1 => lodash2,
+ 2 => lodash3,
+ 3 => three1,
+ 4 => three2,
+ 5 => three3,
+ else => unreachable,
+ };
+
+ var remain = tasks;
+ try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]);
+ remain = tasks[1..];
+ loop: while (true) {
+ while (queue.next()) |cur| {
+ mut.acquire();
+ defer mut.release();
+ try expect(!dedup_list.contains(cur));
+ try dedup_list.insert(cur);
+ }
+
+ if (remain.len > 0) {
+ try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]);
+ remain = remain[1..];
+ var j: usize = 0;
+ while (j < 10000) : (j += 1) {}
+ continue :loop;
+ }
+
+ break :loop;
+ }
+ }
+
+ pub fn run(queue: *BunQueue, num: u8, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void {
+ try run1(queue, num, dedup_list, wg, mut);
+ }
+ };
+
+ var greet = [_]string{
+ "uniq1",
+ "uniq2",
+ "uniq3",
+ "uniq4",
+ "uniq5",
+ "uniq6",
+ "uniq7",
+ "uniq8",
+ "uniq9",
+ "uniq10",
+ "uniq11",
+ "uniq12",
+ "uniq13",
+ "uniq14",
+ "uniq15",
+ "uniq16",
+ "uniq17",
+ "uniq18",
+ "uniq19",
+ "uniq20",
+ "uniq21",
+ "uniq22",
+ "uniq23",
+ "uniq24",
+ "uniq25",
+ "uniq26",
+ "uniq27",
+ "uniq28",
+ "uniq29",
+ "uniq30",
+ } ++ [_]string{ "dup1", "dup1", "dup10", "dup10", "dup11", "dup11", "dup12", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup12", "dup13", "dup13", "dup14", "dup14", "dup15", "dup15", "dup16", "dup16", "dup17", "dup17", "dup18", "dup18", "dup19", "dup19", "dup2", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup24", "dup24", "dup25", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup23", "dup23", "dup12", "dup13", "dup13", "dup14", "dup14", "dup15", "dup15", "dup16", "dup16", "dup17", "dup17", "dup18", "dup18", "dup19", "dup19", "dup2", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup24", "dup24", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9", "dup25", "dup26", "dup26", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9", "dup27", "dup27", "dup28", "dup28", "dup29", "dup29", "dup3", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9" };
+
+ for (greet) |a| {
+ try in.insert(a);
+ try _queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, a)), a);
+ }
+
+ for (Worker.lodash_all) |a| {
+ try in.insert(a);
+ }
+
+ for (Worker.three_all) |a| {
+ try in.insert(a);
+ }
+
+ var out = try default_allocator.create(std.BufSet);
+ out.* = std.BufSet.init(default_allocator);
+
+ var waitgroup = try default_allocator.create(WaitGroup);
+ waitgroup.* = WaitGroup.init();
+
+ waitgroup.add();
+ waitgroup.add();
+ waitgroup.add();
+ waitgroup.add();
+ waitgroup.add();
+ waitgroup.add();
+ var mutex = try default_allocator.create(Mutex);
+ mutex.* = Mutex{};
+
+ var thread1 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 0, out, waitgroup, mutex });
+ var thread2 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 1, out, waitgroup, mutex });
+ var thread3 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 2, out, waitgroup, mutex });
+ var thread4 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 3, out, waitgroup, mutex });
+ var thread5 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 4, out, waitgroup, mutex });
+ var thread6 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 5, out, waitgroup, mutex });
+
+ waitgroup.wait();
+ thread1.join();
+ thread2.join();
+ thread3.join();
+ thread4.join();
+ thread5.join();
+ thread6.join();
+
+ try std.testing.expectEqual(out.count(), in.count());
+ var iter = in.hash_map.iterator();
+
+ while (iter.next()) |entry| {
+ try expect(out.contains(entry.key_ptr.*));
+ }
+}