diff options
Diffstat (limited to 'src/bun_queue.zig')
-rw-r--r-- | src/bun_queue.zig | 62 |
1 files changed, 22 insertions, 40 deletions
diff --git a/src/bun_queue.zig b/src/bun_queue.zig index e602adc6d..fc3edef52 100644 --- a/src/bun_queue.zig +++ b/src/bun_queue.zig @@ -1,7 +1,16 @@ const std = @import("std"); const Mutex = @import("./lock.zig").Mutex; const WaitGroup = @import("./sync.zig").WaitGroup; -usingnamespace @import("./global.zig"); +const _global = @import("./global.zig"); +const string = _global.string; +const Output = _global.Output; +const Global = _global.Global; +const Environment = _global.Environment; +const strings = _global.strings; +const MutableString = _global.MutableString; +const stringZ = _global.stringZ; +const default_allocator = _global.default_allocator; +const C = _global.C; const Wyhash = std.hash.Wyhash; const assert = std.debug.assert; @@ -21,11 +30,11 @@ pub fn NewBlockQueue(comptime Value: type, comptime block_size: comptime_int, co write_lock: bool = false, overflow_write_lock: bool = false, overflow_readers: std.atomic.Atomic(u8) = std.atomic.Atomic(u8).init(0), - allocator: *std.mem.Allocator, + allocator: std.mem.Allocator, empty_queue: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(1), rand: std.rand.DefaultPrng = std.rand.DefaultPrng.init(100), - pub fn new(this: *BlockQueue, allocator: *std.mem.Allocator) void { + pub fn new(this: *BlockQueue, allocator: std.mem.Allocator) void { this.* = BlockQueue{ .allocator = allocator, .overflow = std.ArrayList(*Block).init(allocator), @@ -38,8 +47,10 @@ pub fn NewBlockQueue(comptime Value: type, comptime block_size: comptime_int, co pub fn get(this: *BlockQueue) ?Value { if (this.len.fetchMax(-1, .SeqCst) <= 0) return null; + const rand = this.rand.random(); + while (@atomicRmw(bool, &this.write_lock, .Xchg, true, .SeqCst)) { - const end = this.rand.random.uintAtMost(u8, 64); + const end = rand.uintAtMost(u8, 64); var i: u8 = 0; while (i < end) : (i += 1) {} std.atomic.spinLoopHint(); @@ -68,17 +79,14 @@ pub fn NewBlockQueue(comptime Value: type, comptime block_size: comptime_int, co const ptr = @atomicLoad(*Block, &this.blocks[current_block], .SeqCst); return ptr[index]; }, - else => { - const is_overflowing = current_block > block_count; - - unreachable; - }, + else => unreachable, } } pub fn enqueue(this: *BlockQueue, value: Value) !void { + const rand = this.rand.random(); while (@atomicRmw(bool, &this.write_lock, .Xchg, true, .SeqCst)) { - const end = this.rand.random.uintAtMost(u8, 32); + const end = rand.uintAtMost(u8, 32); var i: u8 = 0; while (i < end) : (i += 1) {} std.atomic.spinLoopHint(); @@ -144,12 +152,12 @@ pub fn NewBunQueue(comptime Value: type) type { const KeyType = u32; const BunQueue = @This(); const Queue = NewBlockQueue(Value, 64, 48); - allocator: *std.mem.Allocator, + allocator: std.mem.Allocator, queue: Queue, keys: Keys, count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - pub fn init(allocator: *std.mem.Allocator) !*BunQueue { + pub fn init(allocator: std.mem.Allocator) !*BunQueue { var bun = try allocator.create(BunQueue); bun.* = BunQueue{ .allocator = allocator, @@ -409,7 +417,7 @@ test "BunQueue: Single-threaded" { const end_offset = queue.getOffset().len; - for (greet) |ing, i| { + for (greet) |ing| { const key = @truncate(u32, hash(0, ing)); try queue.upsert( key, @@ -464,10 +472,9 @@ test "BunQueue: Dedupes" { var deduped = std.BufSet.init(default_allocator); var consumed = std.BufSet.init(default_allocator); - for (greet) |ing, i| { + for (greet) |ing| { const key = @truncate(u32, hash(0, ing)); - const is_new = !deduped.contains(ing); try deduped.insert(ing); try queue.upsert(key, ing); } @@ -631,31 +638,6 @@ test "BunQueue: SCMP Threaded" { try dedup_list.insert(cur); } } - - pub fn run1(queue: *BunQueue, num: u8, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void { - defer wg.done(); - const tasks = more_work[num]; - var remain = tasks; - try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]); - remain = tasks[1..]; - loop: while (true) { - while (queue.next()) |cur| { - mut.acquire(); - try dedup_list.insert(cur); - mut.release(); - } - - if (remain.len > 0) { - try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]); - remain = tasks[1..]; - var j: usize = 0; - while (j < 1000) : (j += 1) {} - continue :loop; - } - - break :loop; - } - } }; var out = try default_allocator.create(std.BufSet); |