aboutsummaryrefslogtreecommitdiff
path: root/src/bun_queue.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun_queue.zig')
-rw-r--r--src/bun_queue.zig62
1 files changed, 22 insertions, 40 deletions
diff --git a/src/bun_queue.zig b/src/bun_queue.zig
index e602adc6d..fc3edef52 100644
--- a/src/bun_queue.zig
+++ b/src/bun_queue.zig
@@ -1,7 +1,16 @@
const std = @import("std");
const Mutex = @import("./lock.zig").Mutex;
const WaitGroup = @import("./sync.zig").WaitGroup;
-usingnamespace @import("./global.zig");
+const _global = @import("./global.zig");
+const string = _global.string;
+const Output = _global.Output;
+const Global = _global.Global;
+const Environment = _global.Environment;
+const strings = _global.strings;
+const MutableString = _global.MutableString;
+const stringZ = _global.stringZ;
+const default_allocator = _global.default_allocator;
+const C = _global.C;
const Wyhash = std.hash.Wyhash;
const assert = std.debug.assert;
@@ -21,11 +30,11 @@ pub fn NewBlockQueue(comptime Value: type, comptime block_size: comptime_int, co
write_lock: bool = false,
overflow_write_lock: bool = false,
overflow_readers: std.atomic.Atomic(u8) = std.atomic.Atomic(u8).init(0),
- allocator: *std.mem.Allocator,
+ allocator: std.mem.Allocator,
empty_queue: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(1),
rand: std.rand.DefaultPrng = std.rand.DefaultPrng.init(100),
- pub fn new(this: *BlockQueue, allocator: *std.mem.Allocator) void {
+ pub fn new(this: *BlockQueue, allocator: std.mem.Allocator) void {
this.* = BlockQueue{
.allocator = allocator,
.overflow = std.ArrayList(*Block).init(allocator),
@@ -38,8 +47,10 @@ pub fn NewBlockQueue(comptime Value: type, comptime block_size: comptime_int, co
pub fn get(this: *BlockQueue) ?Value {
if (this.len.fetchMax(-1, .SeqCst) <= 0) return null;
+ const rand = this.rand.random();
+
while (@atomicRmw(bool, &this.write_lock, .Xchg, true, .SeqCst)) {
- const end = this.rand.random.uintAtMost(u8, 64);
+ const end = rand.uintAtMost(u8, 64);
var i: u8 = 0;
while (i < end) : (i += 1) {}
std.atomic.spinLoopHint();
@@ -68,17 +79,14 @@ pub fn NewBlockQueue(comptime Value: type, comptime block_size: comptime_int, co
const ptr = @atomicLoad(*Block, &this.blocks[current_block], .SeqCst);
return ptr[index];
},
- else => {
- const is_overflowing = current_block > block_count;
-
- unreachable;
- },
+ else => unreachable,
}
}
pub fn enqueue(this: *BlockQueue, value: Value) !void {
+ const rand = this.rand.random();
while (@atomicRmw(bool, &this.write_lock, .Xchg, true, .SeqCst)) {
- const end = this.rand.random.uintAtMost(u8, 32);
+ const end = rand.uintAtMost(u8, 32);
var i: u8 = 0;
while (i < end) : (i += 1) {}
std.atomic.spinLoopHint();
@@ -144,12 +152,12 @@ pub fn NewBunQueue(comptime Value: type) type {
const KeyType = u32;
const BunQueue = @This();
const Queue = NewBlockQueue(Value, 64, 48);
- allocator: *std.mem.Allocator,
+ allocator: std.mem.Allocator,
queue: Queue,
keys: Keys,
count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- pub fn init(allocator: *std.mem.Allocator) !*BunQueue {
+ pub fn init(allocator: std.mem.Allocator) !*BunQueue {
var bun = try allocator.create(BunQueue);
bun.* = BunQueue{
.allocator = allocator,
@@ -409,7 +417,7 @@ test "BunQueue: Single-threaded" {
const end_offset = queue.getOffset().len;
- for (greet) |ing, i| {
+ for (greet) |ing| {
const key = @truncate(u32, hash(0, ing));
try queue.upsert(
key,
@@ -464,10 +472,9 @@ test "BunQueue: Dedupes" {
var deduped = std.BufSet.init(default_allocator);
var consumed = std.BufSet.init(default_allocator);
- for (greet) |ing, i| {
+ for (greet) |ing| {
const key = @truncate(u32, hash(0, ing));
- const is_new = !deduped.contains(ing);
try deduped.insert(ing);
try queue.upsert(key, ing);
}
@@ -631,31 +638,6 @@ test "BunQueue: SCMP Threaded" {
try dedup_list.insert(cur);
}
}
-
- pub fn run1(queue: *BunQueue, num: u8, dedup_list: *std.BufSet, wg: *WaitGroup, mut: *Mutex) !void {
- defer wg.done();
- const tasks = more_work[num];
- var remain = tasks;
- try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]);
- remain = tasks[1..];
- loop: while (true) {
- while (queue.next()) |cur| {
- mut.acquire();
- try dedup_list.insert(cur);
- mut.release();
- }
-
- if (remain.len > 0) {
- try queue.upsert(@truncate(u32, std.hash.Wyhash.hash(0, remain[0])), remain[0]);
- remain = tasks[1..];
- var j: usize = 0;
- while (j < 1000) : (j += 1) {}
- continue :loop;
- }
-
- break :loop;
- }
- }
};
var out = try default_allocator.create(std.BufSet);