diff options
Diffstat (limited to 'src/bun_queue.zig')
-rw-r--r-- | src/bun_queue.zig | 822 |
1 files changed, 0 insertions, 822 deletions
diff --git a/src/bun_queue.zig b/src/bun_queue.zig deleted file mode 100644 index 3f20ca430..000000000 --- a/src/bun_queue.zig +++ /dev/null @@ -1,822 +0,0 @@ -const std = @import("std"); -const Mutex = @import("./lock.zig").Mutex; -const WaitGroup = @import("./sync.zig").WaitGroup; -const _global = @import("./global.zig"); -const string = _global.string; -const Output = _global.Output; -const Global = _global.Global; -const Environment = _global.Environment; -const strings = _global.strings; -const MutableString = _global.MutableString; -const stringZ = _global.stringZ; -const default_allocator = _global.default_allocator; -const C = _global.C; -const Wyhash = std.hash.Wyhash; -const assert = std.debug.assert; - -const VerboseQueue = false; - -pub fn NewBlockQueue(comptime Value: type, comptime block_size: comptime_int, comptime block_count: usize) type { - return struct { - const BlockQueue = @This(); - const Block = [block_size]Value; - - blocks: [block_count]*Block = undefined, - overflow: std.ArrayList(*Block) = undefined, - first: Block = undefined, - len: std.atomic.Atomic(i32) = std.atomic.Atomic(i32).init(0), - allocated_blocks: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - - write_lock: bool = false, - overflow_write_lock: bool = false, - overflow_readers: std.atomic.Atomic(u8) = std.atomic.Atomic(u8).init(0), - allocator: std.mem.Allocator, - empty_queue: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(1), - rand: std.rand.DefaultPrng = std.rand.DefaultPrng.init(100), - random: std.rand.Random = undefined, - pub fn new(this: *BlockQueue, allocator: std.mem.Allocator) void { - this.* = BlockQueue{ - .allocator = allocator, - .overflow = std.ArrayList(*Block).init(allocator), - .len = std.atomic.Atomic(i32).init(0), - }; - this.blocks[0] = &this.first; - this.allocator = allocator; - this.random = this.rand.random(); - } - - pub fn get(this: *BlockQueue) ?Value { - if (this.len.fetchMax(-1, .SeqCst) <= 0) return null; - - while (@atomicRmw(bool, &this.write_lock, .Xchg, true, .SeqCst)) { - const end = this.random.uintAtMost(u8, 64); - var i: u8 = 0; - while (i < end) : (i += 1) {} - std.atomic.spinLoopHint(); - } - defer assert(@atomicRmw(bool, &this.write_lock, .Xchg, false, .SeqCst)); - - if (this.len.fetchMax(-1, .SeqCst) <= 0) return null; - const current_len_ = this.len.fetchSub(1, .SeqCst); - if (current_len_ <= 0) return null; - - const current_len = @intCast(u32, current_len_); - if (current_len == 0) { - return null; - } - - const current_block = @floatToInt(u32, std.math.floor(@intToFloat(f32, (current_len - 1) / block_size))); - const index = (current_len - 1) % block_size; - - if (comptime VerboseQueue) std.debug.print("[GET] {d}, {d}\n", .{ current_block, index }); - - switch (current_block) { - 0 => { - return this.first[index]; - }, - 1...block_count => { - const ptr = @atomicLoad(*Block, &this.blocks[current_block], .SeqCst); - return ptr[index]; - }, - else => unreachable, - } - } - - pub fn enqueue(this: *BlockQueue, value: Value) !void { - const rand = this.rand.random(); - while (@atomicRmw(bool, &this.write_lock, .Xchg, true, .SeqCst)) { - const end = rand.uintAtMost(u8, 32); - var i: u8 = 0; - while (i < end) : (i += 1) {} - std.atomic.spinLoopHint(); - } - defer assert(@atomicRmw(bool, &this.write_lock, .Xchg, false, .SeqCst)); - defer { - const old = this.empty_queue.swap(0, .SeqCst); - if (old == 1) std.Thread.Futex.wake(&this.empty_queue, std.math.maxInt(u32)); - } - - const current_len = @intCast(u32, std.math.max(this.len.fetchAdd(1, .SeqCst), 0)); - const next_len = current_len + 1; - - const current_block = @floatToInt(u32, std.math.floor(@intToFloat(f32, current_len) / block_size)); - const next_block = @floatToInt(u32, std.math.floor(@intToFloat(f32, next_len) / block_size)); - const index = (current_len % block_size); - const next_index = (next_len % block_size); - - if (comptime VerboseQueue) std.debug.print("\n[PUT] {d}, {d} - {d} \n", .{ current_block, index, current_len }); - - const allocated_block = this.allocated_blocks.load(.SeqCst); - const needs_new_block = next_index == 0; - const needs_to_allocate_block = needs_new_block and allocated_block < next_block; - const overflowing = current_block >= block_count; - - if (needs_to_allocate_block) { - defer { - _ = this.allocated_blocks.fetchAdd(1, .SeqCst); - } - var new_list = try this.allocator.create(Block); - if (next_block >= block_count) { - const needs_lock = this.overflow.items.len + 1 >= this.overflow.capacity; - if (needs_lock) { - while (this.overflow_readers.load(.SeqCst) > 0) { - std.atomic.spinLoopHint(); - } - @atomicStore(bool, &this.overflow_write_lock, true, .SeqCst); - } - defer { - if (needs_lock) { - @atomicStore(bool, &this.overflow_write_lock, false, .SeqCst); - } - } - try this.overflow.append(new_list); - } else { - @atomicStore(*Block, &this.blocks[next_block], new_list, .SeqCst); - } - } - - var block_ptr = if (!overflowing) - @atomicLoad(*Block, &this.blocks[current_block], .SeqCst) - else - @atomicLoad(*Block, &this.overflow.items[current_block - block_count], .SeqCst); - - block_ptr[index] = value; - if (current_len < 10) std.Thread.Futex.wake(@ptrCast(*const std.atomic.Atomic(u32), &this.len), std.math.maxInt(u32)); - } - }; -} - -pub fn NewBunQueue(comptime Value: type) type { - return struct { - const KeyType = u32; - const BunQueue = @This(); - const Queue = NewBlockQueue(Value, 64, 48); - allocator: std.mem.Allocator, - queue: Queue, - keys: Keys, - count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - - pub fn init(allocator: std.mem.Allocator) !*BunQueue { - var bun = try allocator.create(BunQueue); - bun.* = BunQueue{ - .allocator = allocator, - .queue = undefined, - .keys = Keys{ - .offset = AtomicOffset.init(Offset.bits(.{ .used = 0, .len = 0 })), - .block_overflow = Keys.OverflowList.init(allocator), - }, - }; - bun.queue.new(allocator); - - bun.keys.blocks[0] = &bun.keys.first_key_list; - return bun; - } - - pub const Keys = struct { - pub const OverflowList = std.ArrayList([*]KeyType); - - blocks: [overflow_size][*]KeyType = undefined, - offset: AtomicOffset, - block_overflow: OverflowList, - block_overflow_lock: bool = false, - first_key_list: [block_size]KeyType = undefined, - write_lock: bool = false, - append_readers: u8 = 0, - append_lock: bool = false, - pending_write: KeyType = 0, - }; - - pub const Offset = packed struct { - used: u16, - len: u16, - - pub const Int = std.meta.Int(.unsigned, @bitSizeOf(@This())); - - pub inline fn bits(this: Offset) Int { - return @bitCast(Int, this); - } - }; - - // Half a page of memory - pub const block_size = 2048 / @sizeOf(KeyType); - // 32 is arbitrary - pub const overflow_size = 32; - - // In one atomic load/store, get the length and offset of the keys - pub const AtomicOffset = std.atomic.Atomic(Offset.Int); - - fn pushList(this: *BunQueue, used: u16) !void { - - // this.keys.mutex.acquire(); - // defer this.keys.mutex.release(); - - var block = try this.allocator.alloc(KeyType, block_size); - - if (used < overflow_size) { - @atomicStore([*]KeyType, &this.keys.blocks[used], block.ptr, .Release); - } else { - const needs_lock = this.keys.block_overflow.items.len + 1 >= this.keys.block_overflow.capacity; - if (needs_lock) { - while (@atomicLoad(u8, &this.keys.append_readers, .SeqCst) > 0) { - std.atomic.spinLoopHint(); - } - @atomicStore(bool, &this.keys.append_lock, true, .SeqCst); - } - defer { - if (needs_lock) @atomicStore(bool, &this.keys.append_lock, false, .SeqCst); - } - try this.keys.block_overflow.append(block.ptr); - } - } - - inline fn contains(this: *BunQueue, key: KeyType) bool { - @fence(.Acquire); - if (@atomicLoad(KeyType, &this.keys.pending_write, .SeqCst) == key) return true; - - var offset = this.getOffset(); - std.debug.assert(&this.keys.first_key_list == this.keys.blocks[0]); - - // Heuristic #1: the first files you import are probably the most common in your app - // e.g. "react" - if (offset.used != 0) { - for (this.keys.first_key_list) |_key| { - if (key == _key) return true; - } - } - - if (offset.used < overflow_size) { - // Heuristic #2: you import files near each other - const block_ptr = @atomicLoad([*]KeyType, &this.keys.blocks[offset.used], .SeqCst); - for (block_ptr[0..offset.len]) |_key| { - if (key == _key) return true; - } - } else { - while (@atomicLoad(bool, &this.keys.append_lock, .SeqCst)) { - std.atomic.spinLoopHint(); - } - _ = @atomicRmw(u8, &this.keys.append_readers, .Add, 1, .SeqCst); - defer { - _ = @atomicRmw(u8, &this.keys.append_readers, .Sub, 1, .SeqCst); - } - const latest = @atomicLoad([*]KeyType, &this.keys.block_overflow.items[offset.used - overflow_size], .SeqCst); - - for (latest[0..offset.len]) |_key| { - if (key == _key) return true; - } - } - - if (offset.used > 0) { - var j: usize = 1; - while (j < std.math.min(overflow_size, offset.used)) : (j += 1) { - const block_ptr = @atomicLoad([*]KeyType, &this.keys.blocks[j], .SeqCst); - for (block_ptr[0..block_size]) |_key| { - if (key == _key) return true; - } - } - - if (offset.used > overflow_size) { - var end = offset.used - overflow_size; - j = 0; - while (j < end) : (j += 1) { - while (@atomicLoad(bool, &this.keys.append_lock, .SeqCst)) { - std.atomic.spinLoopHint(); - } - - _ = @atomicRmw(u8, &this.keys.append_readers, .Add, 1, .SeqCst); - defer { - _ = @atomicRmw(u8, &this.keys.append_readers, .Sub, 1, .SeqCst); - } - - const block = @atomicLoad([*]KeyType, &this.keys.block_overflow.items[j], .SeqCst); - for (block[0..block_size]) |_key| { - if (key == _key) return true; - } - } - } - } - - return @atomicLoad(KeyType, &this.keys.pending_write, .Acquire) == key; - } - - pub inline fn getOffset(this: *BunQueue) Offset { - return @bitCast(Offset, this.keys.offset.load(std.atomic.Ordering.Acquire)); - } - - pub fn hasItem(this: *BunQueue, key: KeyType) bool { - @fence(.SeqCst); - - if (this.contains(key)) return true; - while (@atomicRmw(bool, &this.keys.write_lock, .Xchg, true, .SeqCst)) { - std.atomic.spinLoopHint(); - } - defer assert(@atomicRmw(bool, &this.keys.write_lock, .Xchg, false, .SeqCst)); - - if (@atomicRmw(KeyType, &this.keys.pending_write, .Xchg, key, .SeqCst) == key) return true; - - const offset = this.getOffset(); - - const new_len = (offset.len + 1) % block_size; - const is_new_list = new_len == 0; - const new_offset = Offset{ .used = @intCast(u16, @boolToInt(is_new_list)) + offset.used, .len = new_len }; - - { - var latest_list = if (offset.used < overflow_size) - @atomicLoad([*]KeyType, &this.keys.blocks[offset.used], .SeqCst) - else - @atomicLoad([*]KeyType, &this.keys.block_overflow.items[offset.used - overflow_size], .SeqCst); - - assert(@atomicRmw(KeyType, &latest_list[offset.len], .Xchg, key, .Release) != key); - } - - // We only should need to lock when we're allocating memory - if (is_new_list) { - this.pushList(new_offset.used) catch unreachable; - } - - this.keys.offset.store(new_offset.bits(), .Release); - - return false; - } - - inline fn _writeItem(this: *BunQueue, value: Value) !void { - _ = this.count.fetchAdd(1, .Release); - try this.queue.enqueue(value); - } - - pub fn upsert(this: *BunQueue, key: KeyType, value: Value) !void { - if (!this.hasItem(key)) { - try this._writeItem(value); - } - } - - pub fn upsertWithResult(this: *BunQueue, key: KeyType, value: Value) !bool { - if (!this.hasItem(key)) { - try this._writeItem(value); - return true; - } - - return false; - } - pub inline fn next(this: *BunQueue) ?Value { - return this.queue.get(); - } - }; -} - -test "BunQueue: Single-threaded" { - const BunQueue = NewBunQueue([]const u8); - const hash = Wyhash.hash; - const expect = std.testing.expect; - - var queue = try BunQueue.init(default_allocator); - - var greet = [_]string{ - "hello", "how", "are", "you", - "https://", "ton.local.twitter.com", "/responsive-web-internal/", "sourcemaps", - "/client-web/", "loader.Typeahead.7c3b3805.js.map:", "ERR_BLOCKED_BY_CLIENT", "etch failed loading: POST ", - "ondemand.LottieWeb.08803c45.js", "ondemand.InlinePlayer.4990ef15.js", "ondemand.BranchSdk.bb99d145.js", "ondemand.Dropdown.011d5045.js", - }; - var greeted: [greet.len]bool = undefined; - std.mem.set(bool, &greeted, false); - - for (greet) |ing, i| { - const key = @truncate(u32, hash(0, ing)); - try expect(!queue.contains( - key, - )); - try queue.upsert( - key, - ing, - ); - try expect(queue.hasItem( - key, - )); - try expect(queue.getOffset().len == i + 1); - } - - { - var i: usize = 0; - while (i < greet.len) : (i += 1) { - const item = (queue.next()) orelse return try std.testing.expect(false); - try expect(strings.containsAny(&greet, item)); - const index = strings.indexAny(&greet, item) orelse unreachable; - try expect(!greeted[index]); - greeted[index] = true; - } - i = 0; - while (i < greet.len) : (i += 1) { - try expect(queue.next() == null); - } - i = 0; - while (i < greet.len) : (i += 1) { - try expect(greeted[i]); - } - i = 0; - } - - const end_offset = queue.getOffset().len; - - for (greet) |ing| { - const key = @truncate(u32, hash(0, ing)); - try queue.upsert( - key, - ing, - ); - - try expect(end_offset == queue.getOffset().len); - } -} - -test "BunQueue: Dedupes" { - const BunQueue = NewBunQueue([]const u8); - const hash = Wyhash.hash; - const expect = std.testing.expect; - - var queue = try BunQueue.init(default_allocator); - - var greet = [_]string{ - "uniq1", - "uniq2", - "uniq3", - "uniq4", - "uniq5", - "uniq6", - "uniq7", - "uniq8", - "uniq9", - "uniq10", - "uniq11", - "uniq12", - "uniq13", - "uniq14", - "uniq15", - "uniq16", - "uniq17", - "uniq18", - "uniq19", - "uniq20", - "uniq21", - "uniq22", - "uniq23", - "uniq24", - "uniq25", - "uniq26", - "uniq27", - "uniq28", - "uniq29", - "uniq30", - } ++ [_]string{ "dup20", "dup21", "dup27", "dup2", "dup12", "dup15", "dup4", "dup12", "dup10", "dup7", "dup26", "dup22", "dup1", "dup23", "dup11", "dup8", "dup11", "dup29", "dup28", "dup25", "dup20", "dup2", "dup6", "dup16", "dup22", "dup13", "dup30", "dup9", "dup3", "dup17", "dup14", "dup18", "dup8", "dup3", "dup28", "dup30", "dup24", "dup18", "dup24", "dup5", "dup23", "dup10", "dup13", "dup26", "dup27", "dup29", "dup25", "dup4", "dup19", "dup15", "dup6", "dup17", "dup1", "dup16", "dup19", "dup7", "dup9", "dup21", "dup14", "dup5" }; - var prng = std.rand.DefaultPrng.init(100); - prng.random().shuffle(string, &greet); - var deduped = std.BufSet.init(default_allocator); - var consumed = std.BufSet.init(default_allocator); - - for (greet) |ing| { - const key = @truncate(u32, hash(0, ing)); - - try deduped.insert(ing); - try queue.upsert(key, ing); - } - - while (queue.next()) |i| { - try expect(consumed.contains(i) == false); - try consumed.insert(i); - } - - try std.testing.expectEqual(consumed.count(), deduped.count()); - try expect(deduped.count() > 0); -} - -test "BunQueue: SCMP Threaded" { - const BunQueue = NewBunQueue([]const u8); - const expect = std.testing.expect; - - var _queue = try BunQueue.init(default_allocator); - - var greet = [_]string{ - "uniq1", - "uniq2", - "uniq3", - "uniq4", - "uniq5", - "uniq6", - "uniq7", - "uniq8", - "uniq9", - "uniq10", - "uniq11", - "uniq12", - "uniq13", - "uniq14", - "uniq15", - "uniq16", - "uniq17", - "uniq18", - "uniq19", - "uniq20", - "uniq21", - "uniq22", - "uniq23", - "uniq24", - "uniq25", - "uniq26", - "uniq27", - "uniq28", - "uniq29", - "uniq30", - "uniq31", - "uniq32", - "uniq33", - "uniq34", - "uniq35", - "uniq36", - "uniq37", - "uniq38", - "uniq39", - "uniq40", - "uniq41", - "uniq42", - "uniq43", - "uniq44", - "uniq45", - "uniq46", - "uniq47", - "uniq48", - "uniq49", - "uniq50", - "uniq51", - "uniq52", - "uniq53", - "uniq54", - "uniq55", - "uniq56", - "uniq57", - "uniq58", - "uniq59", - "uniq60", - "uniq61", - "uniq62", - "uniq63", - "uniq64", - "uniq65", - "uniq66", - "uniq67", - "uniq68", - "uniq69", - "uniq70", - "uniq71", - "uniq72", - "uniq73", - "uniq74", - "uniq75", - "uniq76", - "uniq77", - "uniq78", - "uniq79", - "uniq80", - "uniq81", - "uniq82", - "uniq83", - "uniq84", - "uniq85", - "uniq86", - "uniq87", - "uniq88", - "uniq89", - "uniq90", - "uniq91", - "uniq92", - "uniq93", - "uniq94", - "uniq95", - "uniq96", - "uniq97", - "uniq98", - "uniq99", - "uniq100", - "uniq101", - "uniq102", - "uniq103", - "uniq104", - "uniq105", - "uniq106", - "uniq107", - "uniq108", - "uniq109", - "uniq110", - "uniq111", - "uniq112", - "uniq113", - "uniq114", - "uniq115", - "uniq116", - "uniq117", - "uniq118", - "uniq119", - "uniq120", - } ++ [_]string{ "dup1", "dup1", "dup10", "dup10", "dup11", "dup11", "dup12", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup12", "dup13", "dup13", "dup14", "dup14", "dup15", "dup15", "dup16", "dup16", "dup17", "dup17", "dup18", "dup18", "dup19", "dup19", "dup2", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup24", "dup24", "dup25", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup23", "dup23", "dup12", "dup13", "dup13", "dup14", "dup14", "dup15", "dup15", "dup16", "dup16", "dup17", "dup17", "dup18", "dup18", "dup19", "dup19", "dup2", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup24", "dup24", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9", "dup25", "dup26", "dup26", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9", "dup27", "dup27", "dup28", "dup28", "dup29", "dup29", "dup3", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9" }; - var prng = std.rand.DefaultPrng.init(100).random(); - prng.shuffle(string, &greet); - var in = try default_allocator.create(std.BufSet); - in.* = std.BufSet.init(default_allocator); - for (greet) |i| { - try in.insert(i); - try _queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, i)), i); - } - - const Worker = struct { - index: u8 = 0, - - pub fn run(queue: *BunQueue, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void { - defer wg.done(); - // const tasks = more_work[num]; - // var remain = tasks; - while (queue.next()) |cur| { - mut.acquire(); - defer mut.release(); - try dedup_list.insert(cur); - } - } - }; - - var out = try default_allocator.create(std.BufSet); - out.* = std.BufSet.init(default_allocator); - - var waitgroup = try default_allocator.create(WaitGroup); - waitgroup.* = WaitGroup.init(); - - var worker1 = try default_allocator.create(Worker); - worker1.* = Worker{}; - var worker2 = try default_allocator.create(Worker); - worker2.* = Worker{}; - waitgroup.add(); - waitgroup.add(); - var mutex = try default_allocator.create(Mutex); - mutex.* = Mutex{}; - - var thread1 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, out, waitgroup, mutex }); - var thread2 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, out, waitgroup, mutex }); - - waitgroup.wait(); - thread1.join(); - thread2.join(); - - try std.testing.expectEqual(out.count(), in.count()); - var iter = in.hash_map.iterator(); - - while (iter.next()) |entry| { - try expect(in.contains(entry.key_ptr.*)); - } -} - -test "BunQueue: MPMC Threaded" { - const BunQueue = NewBunQueue([]const u8); - const expect = std.testing.expect; - var _queue = try BunQueue.init(default_allocator); - - var in = try default_allocator.create(std.BufSet); - in.* = std.BufSet.init(default_allocator); - - const Worker = struct { - index: u8 = 0, - const WorkerCount = 2; - const lodash_all = shuffle(@TypeOf(@import("./test/project.zig").lodash), @import("./test/project.zig").lodash); - const lodash1 = lodash_all[0 .. lodash_all.len / 3]; - const lodash2 = lodash_all[lodash1.len..][0 .. lodash_all.len / 3]; - const lodash3 = lodash_all[lodash1.len + lodash2.len ..]; - - pub fn shuffle(comptime Type: type, comptime val: Type) Type { - var copy = val; - @setEvalBranchQuota(99999); - var rand = std.rand.DefaultPrng.init(100); - rand.random().shuffle(string, ©); - return copy; - } - const three_all = shuffle(@TypeOf(@import("./test/project.zig").three), @import("./test/project.zig").three); - const three1 = three_all[0 .. three_all.len / 3]; - const three2 = three_all[three1.len..][0 .. three_all.len / 3]; - const three3 = three_all[three1.len + three2.len ..]; - - fn run1(queue: *BunQueue, num: u8, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void { - defer wg.done(); - const tasks = switch (num) { - 0 => lodash1, - 1 => lodash2, - 2 => lodash3, - 3 => three1, - 4 => three2, - 5 => three3, - else => unreachable, - }; - - var remain = tasks; - try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]); - remain = tasks[1..]; - loop: while (true) { - while (queue.next()) |cur| { - mut.acquire(); - defer mut.release(); - try expect(!dedup_list.contains(cur)); - try dedup_list.insert(cur); - } - - if (remain.len > 0) { - try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]); - remain = remain[1..]; - var j: usize = 0; - while (j < 10000) : (j += 1) {} - continue :loop; - } - - break :loop; - } - } - - pub fn run(queue: *BunQueue, num: u8, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void { - try run1(queue, num, dedup_list, wg, mut); - } - }; - - var greet = [_]string{ - "uniq1", - "uniq2", - "uniq3", - "uniq4", - "uniq5", - "uniq6", - "uniq7", - "uniq8", - "uniq9", - "uniq10", - "uniq11", - "uniq12", - "uniq13", - "uniq14", - "uniq15", - "uniq16", - "uniq17", - "uniq18", - "uniq19", - "uniq20", - "uniq21", - "uniq22", - "uniq23", - "uniq24", - "uniq25", - "uniq26", - "uniq27", - "uniq28", - "uniq29", - "uniq30", - } ++ [_]string{ "dup1", "dup1", "dup10", "dup10", "dup11", "dup11", "dup12", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup12", "dup13", "dup13", "dup14", "dup14", "dup15", "dup15", "dup16", "dup16", "dup17", "dup17", "dup18", "dup18", "dup19", "dup19", "dup2", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup24", "dup24", "dup25", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup23", "dup23", "dup12", "dup13", "dup13", "dup14", "dup14", "dup15", "dup15", "dup16", "dup16", "dup17", "dup17", "dup18", "dup18", "dup19", "dup19", "dup2", "dup2", "dup20", "dup20", "dup21", "dup21", "dup22", "dup22", "dup23", "dup23", "dup24", "dup24", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9", "dup25", "dup26", "dup26", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9", "dup27", "dup27", "dup28", "dup28", "dup29", "dup29", "dup3", "dup3", "dup30", "dup30", "dup4", "dup4", "dup5", "dup5", "dup6", "dup6", "dup7", "dup7", "dup8", "dup8", "dup9", "dup9" }; - - for (greet) |a| { - try in.insert(a); - try _queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, a)), a); - } - - for (Worker.lodash_all) |a| { - try in.insert(a); - } - - for (Worker.three_all) |a| { - try in.insert(a); - } - - var out = try default_allocator.create(std.BufSet); - out.* = std.BufSet.init(default_allocator); - - var waitgroup = try default_allocator.create(WaitGroup); - waitgroup.* = WaitGroup.init(); - - waitgroup.add(); - waitgroup.add(); - waitgroup.add(); - waitgroup.add(); - waitgroup.add(); - waitgroup.add(); - var mutex = try default_allocator.create(Mutex); - mutex.* = Mutex{}; - - var thread1 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 0, out, waitgroup, mutex }); - var thread2 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 1, out, waitgroup, mutex }); - var thread3 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 2, out, waitgroup, mutex }); - var thread4 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 3, out, waitgroup, mutex }); - var thread5 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 4, out, waitgroup, mutex }); - var thread6 = try std.Thread.spawn(.{}, Worker.run, .{ _queue, 5, out, waitgroup, mutex }); - - waitgroup.wait(); - thread1.join(); - thread2.join(); - thread3.join(); - thread4.join(); - thread5.join(); - thread6.join(); - - try std.testing.expectEqual(out.count(), in.count()); - var iter = in.hash_map.iterator(); - - while (iter.next()) |entry| { - try expect(out.contains(entry.key_ptr.*)); - } -} |