Diffstat (limited to 'src')
-rw-r--r--  src/analytics/analytics_thread.zig |   2
m---------  src/bun.js/WebKit                  |   0
-rw-r--r--  src/bun.js/api/bun.zig             |   2
-rw-r--r--  src/bun.js/event_loop.zig          |   2
-rw-r--r--  src/bun.js/webcore/response.zig    |   3
-rw-r--r--  src/bun.js/webcore/streams.zig     |   2
-rw-r--r--  src/bun_js.zig                     |   2
-rw-r--r--  src/cli/create_command.zig         |   2
-rw-r--r--  src/http_client_async.zig          |  37
-rw-r--r--  src/install/install.zig            |  49
-rw-r--r--  src/io/io_linux.zig                |  74
-rw-r--r--  src/network_thread.zig             | 110
-rw-r--r--  src/output.zig                     |   2
-rw-r--r--  src/thread_pool.zig                |  28
-rw-r--r--  src/watcher.zig                    |   2
15 files changed, 260 insertions(+), 57 deletions(-)
diff --git a/src/analytics/analytics_thread.zig b/src/analytics/analytics_thread.zig
index c01189c7c..82f435025 100644
--- a/src/analytics/analytics_thread.zig
+++ b/src/analytics/analytics_thread.zig
@@ -389,7 +389,7 @@ var out_buffer: MutableString = undefined;
var event_list: EventList = undefined;
fn readloop() anyerror!void {
defer disabled = true;
- Output.Source.configureNamedThread(thread, "Analytics");
+ Output.Source.configureNamedThread("Analytics");
defer Output.flush();
event_list = EventList.init();
diff --git a/src/bun.js/WebKit b/src/bun.js/WebKit
-Subproject e773c96d39675e9b190c26087d671c933e999c5
+Subproject 7e7774dabf1c2d94fe3604defb3c54a4c989c3b
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index d1260b14b..45e7cb1a1 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -2237,7 +2237,7 @@ pub const Timer = struct {
pub fn run(this: *Timeout, _task: *TimeoutTask) void {
this.io_task = _task;
- NetworkThread.global.pool.io.?.timeout(
+ NetworkThread.global.io.timeout(
*Timeout,
this,
onCallback,
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index baf96937c..132572422 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -177,7 +177,7 @@ pub fn IOTask(comptime Context: type) type {
pub fn schedule(this: *This) void {
NetworkThread.init() catch return;
- NetworkThread.global.pool.schedule(NetworkThread.Batch.from(&this.task));
+ NetworkThread.global.schedule(NetworkThread.Batch.from(&this.task));
}
pub fn onFinish(this: *This) void {
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index a48ca0a2e..d37c36e7c 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -826,13 +826,14 @@ pub const Fetch = struct {
timeout: usize,
request_body_store: ?*Blob.Store,
) !*FetchTasklet.Pool.Node {
+ try NetworkThread.init();
var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store);
node.data.global_this = global;
node.data.http.callback = callback;
var batch = NetworkThread.Batch{};
node.data.http.schedule(allocator, &batch);
- NetworkThread.global.pool.schedule(batch);
+ NetworkThread.global.schedule(batch);
VirtualMachine.vm.active_tasks +|= 1;
return node;
}
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index de9115666..82f27bdb5 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -2491,7 +2491,7 @@ pub const FileBlobLoader = struct {
std.debug.assert(this.started);
NetworkThread.init() catch {};
this.concurrent.chunk_size = chunk_size;
- NetworkThread.global.pool.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 });
+ NetworkThread.global.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 });
}
const default_fifo_chunk_size = 1024;
diff --git a/src/bun_js.zig b/src/bun_js.zig
index 896b25627..3683ea747 100644
--- a/src/bun_js.zig
+++ b/src/bun_js.zig
@@ -141,7 +141,7 @@ pub const Run = struct {
i +%= 1;
if (i > 0 and i % 100 == 0) {
- std.time.sleep(std.time.ns_per_us);
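+ // Spend the idle tick running GC instead of sleeping.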
+ this.vm.global.vm().runGC(true);
}
}
diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig
index d0760bd07..a0017dd34 100644
--- a/src/cli/create_command.zig
+++ b/src/cli/create_command.zig
@@ -2149,7 +2149,7 @@ const GitHandler = struct {
PATH: string,
verbose: bool,
) void {
- Output.Source.configureNamedThread(thread, "git");
+ Output.Source.configureNamedThread("git");
defer Output.flush();
const outcome = if (verbose)
run(destination, PATH, true) catch false
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index f24f27f81..b3a06a721 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -38,10 +38,15 @@ pub var default_arena: Arena = undefined;
const log = Output.scoped(.fetch, true);
pub fn onThreadStart(_: ?*anyopaque) ?*anyopaque {
+ onThreadStartNew(0);
+ return null;
+}
+
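+// Shared startup for the HTTP thread. On Linux, network_thread.zig spawns this directly with a real eventfd; other platforms arrive via onThreadStart with event_fd = 0.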
+pub fn onThreadStartNew(event_fd: os.fd_t) void {
default_arena = Arena.init() catch unreachable;
default_allocator = default_arena.allocator();
NetworkThread.address_list_cached = NetworkThread.AddressListCache.init(default_allocator);
- AsyncIO.global = AsyncIO.init(1024, 0) catch |err| {
+ AsyncIO.global = AsyncIO.init(1024, 0, event_fd) catch |err| {
log: {
if (comptime Environment.isLinux) {
if (err == error.SystemOutdated) {
@@ -105,10 +110,17 @@ pub fn onThreadStart(_: ?*anyopaque) ?*anyopaque {
};
AsyncIO.global_loaded = true;
- NetworkThread.global.pool.io = &AsyncIO.global;
- Global.setThreadName("HTTP");
+ NetworkThread.global.io = &AsyncIO.global;
+ if (comptime !Environment.isLinux) {
+ NetworkThread.global.pool.io = &AsyncIO.global;
+ }
+
+ Output.Source.configureNamedThread("HTTP");
AsyncBIO.initBoringSSL();
- return null;
+
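+ // Never returns on Linux: processEvents() loops forever draining the eventfd and io_uring.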
+ if (comptime Environment.isLinux) {
+ NetworkThread.global.processEvents();
+ }
}
pub inline fn getAllocator() std.mem.Allocator {
@@ -128,7 +140,7 @@ else
pub const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC;
-pub const extremely_verbose = Environment.isDebug;
+pub const extremely_verbose = false;
fn writeRequest(
comptime Writer: type,
@@ -471,7 +483,7 @@ pub const AsyncHTTP = struct {
var batch = NetworkThread.Batch{};
this.schedule(bun.default_allocator, &batch);
- NetworkThread.global.pool.schedule(batch);
+ NetworkThread.global.schedule(batch);
while (true) {
var data = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?));
var async_http: *AsyncHTTP = data.channel.readItem() catch unreachable;
@@ -509,17 +521,17 @@ pub const AsyncHTTP = struct {
pub fn do(sender: *HTTPSender, this: *AsyncHTTP) void {
defer {
- NetworkThread.global.pool.schedule(.{ .head = &sender.finisher, .tail = &sender.finisher, .len = 1 });
+ NetworkThread.global.schedule(.{ .head = &sender.finisher, .tail = &sender.finisher, .len = 1 });
}
outer: {
this.err = null;
this.state.store(.sending, .Monotonic);
- var timer = std.time.Timer.start() catch @panic("Timer failure");
- defer this.elapsed = timer.read();
+ const start = NetworkThread.global.timer.read();
+ defer this.elapsed = NetworkThread.global.timer.read() -| start;
- this.response = await this.client.sendAsync(this.request_body.list.items, this.response_buffer) catch |err| {
+ this.response = this.client.send(this.request_body.list.items, this.response_buffer) catch |err| {
this.state.store(.fail, .Monotonic);
this.err = err;
@@ -527,7 +539,7 @@ pub const AsyncHTTP = struct {
this.retries_count += 1;
this.response_buffer.reset();
- NetworkThread.global.pool.schedule(ThreadPool.Batch.from(&this.task));
+ NetworkThread.global.schedule(ThreadPool.Batch.from(&this.task));
return;
}
break :outer;
@@ -654,7 +666,7 @@ pub fn connect(
connector: ConnectType,
) !void {
const port = this.url.getPortAuto();
-
+ if (this.verbose) Output.prettyErrorln("<d>[HTTP]<r> Connecting to {s}:{d}", .{ this.url.href, port });
try connector.connect(this.url.hostname, port);
std.debug.assert(this.socket.socket.socket > 0);
var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) };
@@ -741,6 +753,7 @@ pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableStrin
}
try writeRequest(@TypeOf(socket), socket, request, body);
+
_ = try socket.send();
this.stage = Stage.response;
if (this.progress_node == null) {
diff --git a/src/install/install.zig b/src/install/install.zig
index 250d325c2..a1523b076 100644
--- a/src/install/install.zig
+++ b/src/install/install.zig
@@ -50,6 +50,7 @@ const z_allocator = @import("../memory_allocator.zig").z_allocator;
const Syscall = @import("javascript_core").Node.Syscall;
const RunCommand = @import("../cli/run_command.zig").RunCommand;
threadlocal var initialized_store = false;
+const Futex = @import("../futex.zig");
pub const Lockfile = @import("./lockfile.zig");
@@ -183,6 +184,7 @@ const NetworkTask = struct {
},
pub fn notify(http: *AsyncHTTP) void {
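+ // Deferred so the futex wake fires only after the task is visible in the channel.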
+ defer PackageManager.instance.wake();
PackageManager.instance.network_channel.writeItem(@fieldParentPtr(NetworkTask, "http", http)) catch {};
}
@@ -515,6 +517,8 @@ const Task = struct {
var this = @fieldParentPtr(Task, "threadpool_task", task);
+ defer PackageManager.instance.wake();
+
switch (this.tag) {
.package_manifest => {
var allocator = PackageManager.instance.allocator;
@@ -1468,6 +1472,9 @@ pub const PackageManager = struct {
global_dir: ?std.fs.Dir = null,
global_link_dir_path: string = "",
+ sleepy: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+ sleep_delay_counter: u32 = 0,
+
const PreallocatedNetworkTasks = std.BoundedArray(NetworkTask, 1024);
const NetworkTaskQueue = std.HashMapUnmanaged(u64, void, IdentityContext(u64), 80);
const PackageIndex = std.AutoHashMapUnmanaged(u64, *Package);
@@ -1480,6 +1487,14 @@ pub const PackageManager = struct {
80,
);
+ pub fn wake(this: *PackageManager) void {
+ Futex.wake(&this.sleepy, 1);
+ }
+
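+ // Blocks for at most ~16ms, and only while `sleepy` is 1; wake() releases it early.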
+ pub fn sleep(this: *PackageManager) void {
+ Futex.wait(&this.sleepy, 1, std.time.ns_per_ms * 16) catch {};
+ }
+
pub fn globalLinkDir(this: *PackageManager) !std.fs.Dir {
return this.global_link_dir orelse brk: {
var global_dir = try Options.openGlobalDir(this.options.explicit_global_directory);
@@ -2409,7 +2424,7 @@ pub const PackageManager = struct {
manager.pending_tasks += @truncate(u32, count);
manager.total_tasks += @truncate(u32, count);
manager.network_resolve_batch.push(manager.network_tarball_batch);
- NetworkThread.global.pool.schedule(manager.network_resolve_batch);
+ NetworkThread.global.schedule(manager.network_resolve_batch);
manager.network_tarball_batch = .{};
manager.network_resolve_batch = .{};
return count;
@@ -2448,7 +2463,7 @@ pub const PackageManager = struct {
this.pending_tasks += @truncate(u32, count);
this.total_tasks += @truncate(u32, count);
this.network_resolve_batch.push(this.network_tarball_batch);
- NetworkThread.global.pool.schedule(this.network_resolve_batch);
+ NetworkThread.global.schedule(this.network_resolve_batch);
this.network_tarball_batch = .{};
this.network_resolve_batch = .{};
}
@@ -2489,7 +2504,10 @@ pub const PackageManager = struct {
) anyerror!void {
var batch = ThreadPool.Batch{};
var has_updated_this_run = false;
+ var maybe_sleep = true;
+
while (manager.network_channel.tryReadItem() catch null) |task_| {
+ maybe_sleep = false;
var task: *NetworkTask = task_;
manager.pending_tasks -|= 1;
@@ -2704,6 +2722,7 @@ pub const PackageManager = struct {
}
while (manager.resolve_tasks.tryReadItem() catch null) |task_| {
+ maybe_sleep = false;
manager.pending_tasks -= 1;
var task: Task = task_;
@@ -2812,7 +2831,7 @@ pub const PackageManager = struct {
manager.total_tasks += @truncate(u32, count);
manager.thread_pool.schedule(batch);
manager.network_resolve_batch.push(manager.network_tarball_batch);
- NetworkThread.global.pool.schedule(manager.network_resolve_batch);
+ NetworkThread.global.schedule(manager.network_resolve_batch);
manager.network_tarball_batch = .{};
manager.network_resolve_batch = .{};
@@ -2829,6 +2848,16 @@ pub const PackageManager = struct {
manager.progress.maybeRefresh();
}
}
+
+ manager.sleep_delay_counter = if (maybe_sleep)
+ manager.sleep_delay_counter + 1
+ else
+ 0;
+
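+ // Require five consecutive idle passes before sleeping, so bursts of completions drain at full speed.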
+ if (manager.sleep_delay_counter >= 5) {
+ manager.sleep_delay_counter = 0;
+ manager.sleep();
+ }
}
pub const Options = struct {
@@ -5191,9 +5220,6 @@ pub const PackageManager = struct {
const cwd = std.fs.cwd();
- // sleep goes off, only need to set it once because it will have an impact on the next network request
- NetworkThread.global.pool.sleep_on_idle_network_thread = false;
-
while (iterator.nextNodeModulesFolder()) |node_modules| {
try cwd.makePath(std.mem.span(node_modules.relative_path));
// We deliberately do not close this folder.
@@ -5360,7 +5386,6 @@ pub const PackageManager = struct {
comptime log_level: Options.LogLevel,
) !void {
// sleep off for maximum network throughput
- NetworkThread.global.pool.sleep_on_idle_network_thread = false;
var load_lockfile_result: Lockfile.LoadFromDiskResult = if (manager.options.do.load_lockfile)
manager.lockfile.loadFromDisk(
@@ -5617,8 +5642,13 @@ pub const PackageManager = struct {
Output.flush();
}
- while (manager.pending_tasks > 0) {
- try manager.runTasks(void, void{}, null, log_level);
+ {
+ manager.sleepy.store(1, .Monotonic);
+ defer manager.sleepy.store(0, .Monotonic);
+
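+ // With `sleepy` set to 1, sleep() inside runTasks can block until a worker thread calls wake().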
+ while (manager.pending_tasks > 0) {
+ try manager.runTasks(void, void{}, null, log_level);
+ }
}
if (comptime log_level.showProgress()) {
@@ -5645,7 +5675,6 @@ pub const PackageManager = struct {
}
// sleep on since we might not need it anymore
- NetworkThread.global.pool.sleep_on_idle_network_thread = true;
const needs_clean_lockfile = had_any_diffs or needs_new_lockfile or manager.package_json_updates.len > 0;
var did_meta_hash_change = needs_clean_lockfile;
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig
index b4f21dee5..d7a164665 100644
--- a/src/io/io_linux.zig
+++ b/src/io/io_linux.zig
@@ -448,8 +448,6 @@ const IO = @This();
ring: IO_Uring,
-pending_count: usize = 0,
-
/// Operations not yet submitted to the kernel and waiting on available space in the
/// submission queue.
unqueued: FIFO(Completion) = .{},
@@ -458,12 +456,49 @@ unqueued: FIFO(Completion) = .{},
completed: FIFO(Completion) = .{},
next_tick: FIFO(Completion) = .{},
+event_fd: linux.fd_t = 0,
+
+eventfd_buf: [16]u8 = undefined,
+has_queued: usize = 0,
+wakeup_completion: Completion = undefined,
+
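+// Re-arms a single pending read on the eventfd; the completion fires when another thread writes to wake us.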
+fn queueForWakeup(this: *@This(), comptime Type: type, ctx: Type, comptime cb: anytype) void {
+ @memset(&this.eventfd_buf, 0, this.eventfd_buf.len);
+ const Callback = struct {
+ pub fn callback(that: Type, completion: *Completion, _: ReadError!usize) void {
+ var io = @fieldParentPtr(IO, "wakeup_completion", completion);
+ io.has_queued -|= 1;
+ cb(that);
+ }
+ };
+ this.read(
+ Type,
+ ctx,
+ Callback.callback,
+ &this.wakeup_completion,
+ this.event_fd,
+ &this.eventfd_buf,
+ null,
+ );
+ this.has_queued +|= 1;
+}
+
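+// Parks the caller in io_uring until a completion (including the eventfd wakeup) arrives.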
+pub fn wait(this: *@This(), ptr: anytype, comptime onReady: anytype) void {
+ // Subscribe to wakeups
+ if (this.has_queued == 0) {
+ this.queueForWakeup(@TypeOf(ptr), ptr, onReady);
+ }
+
+ this.tick() catch {};
-pub fn hasNoWork(this: *IO) bool {
- return this.pending_count == 0;
+ if (this.has_queued == 0) {
+ return;
+ }
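+ // Submit any pending SQEs and block until at least one CQE arrives.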
+ const submitted = this.ring.flush_sq();
+ _ = this.ring.enter(submitted, 1, linux.IORING_ENTER_GETEVENTS) catch 0;
}
-pub fn init(entries_: u12, flags: u32) !IO {
+pub fn init(entries_: u12, flags: u32, event_fd: os.fd_t) !IO {
var ring: IO_Uring = undefined;
var entries = entries_;
@@ -480,6 +515,7 @@ pub fn init(entries_: u12, flags: u32) !IO {
}
var limit = linux.rlimit{ .cur = 0, .max = 0 };
+
if (linux.getrlimit(.MEMLOCK, &limit) == 0) {
if (limit.cur < 16 * 1024) {
return error.@"memlock is too low. Please increase it to at least 64k";
@@ -505,7 +541,7 @@ pub fn init(entries_: u12, flags: u32) !IO {
break;
}
- return IO{ .ring = ring };
+ return IO{ .ring = ring, .event_fd = event_fd };
}
pub fn deinit(self: *IO) void {
@@ -542,6 +578,8 @@ pub fn tick(self: *IO) !void {
/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
/// in the timespec struct.
pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
+ assert(nanoseconds > 0);
+
while (self.next_tick.pop()) |completion| {
completion.complete();
}
@@ -600,7 +638,9 @@ fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
}
fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
- var cqes: [256]io_uring_cqe = undefined;
+ var cqes: [256]std.os.linux.io_uring_cqe = undefined;
+ var completion_bytes = std.mem.asBytes(&cqes);
+ @memset(completion_bytes, 0, completion_bytes.len);
var wait_remaining = wait_nr;
while (true) {
// Guard against waiting indefinitely (if there are too few requests inflight),
@@ -1038,7 +1078,6 @@ pub fn accept(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1081,7 +1120,6 @@ pub fn close(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1095,7 +1133,6 @@ pub fn close(
if (features.close_blocking) {
const rc = linux.close(fd);
completion.result = @intCast(i32, rc);
- self.pending_count +|= 1;
self.next_tick.push(completion);
return;
}
@@ -1139,7 +1176,6 @@ pub fn connect(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1191,7 +1227,6 @@ pub fn fsync(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1238,7 +1273,6 @@ pub fn read(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1290,7 +1324,6 @@ pub fn recv(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1326,7 +1359,6 @@ pub fn readev(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1383,7 +1415,6 @@ pub fn send(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1476,7 +1507,6 @@ pub fn open(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1514,7 +1544,6 @@ pub fn writev(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1553,7 +1582,6 @@ pub fn timeout(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1603,7 +1631,6 @@ pub fn write(
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
- comp.io.pending_count -|= 1;
callback(
@intToPtr(Context, @ptrToInt(ctx)),
comp,
@@ -1623,7 +1650,10 @@ pub fn write(
}
inline fn enqueueNew(self: *IO, completion: *Completion) void {
- self.pending_count +|= 1;
+ self.enqueue(completion);
+}
+
+pub fn wake(self: *IO, completion: *Completion) void {
self.enqueue(completion);
}
@@ -1658,7 +1688,7 @@ const Syscall = struct {
};
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
- return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC, protocol);
+ return Syscall.socket(family, sock_type | os.SOCK.CLOEXEC | os.SOCK.NONBLOCK, protocol);
}
pub var global: IO = undefined;
diff --git a/src/network_thread.zig b/src/network_thread.zig
index 97ee1cadc..24110b6a3 100644
--- a/src/network_thread.zig
+++ b/src/network_thread.zig
@@ -1,6 +1,7 @@
const ThreadPool = @import("thread_pool");
pub const Batch = ThreadPool.Batch;
pub const Task = ThreadPool.Task;
+const Node = ThreadPool.Node;
pub const Completion = AsyncIO.Completion;
const std = @import("std");
pub const AsyncIO = @import("io");
@@ -8,13 +9,110 @@ const Output = @import("./global.zig").Output;
const IdentityContext = @import("./identity_context.zig").IdentityContext;
const HTTP = @import("./http_client_async.zig");
const NetworkThread = @This();
+const Environment = @import("./global.zig").Environment;
+const Lock = @import("./lock.zig").Lock;
+const FIFO = @import("./io/fifo.zig").FIFO;
/// Single-thread in this pool
pool: ThreadPool,
+io: *AsyncIO = undefined,
+thread: std.Thread = undefined,
+event_fd: std.os.fd_t = 0,
+queued_tasks_mutex: Lock = Lock.init(),
+queued_tasks: Batch = .{},
+head: ?*Node = null,
+tail: ?*Node = null,
+timer: std.time.Timer = undefined,
pub var global: NetworkThread = undefined;
pub var global_loaded: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0);
+const log = Output.scoped(.NetworkThread, true);
+
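+// Splices the cross-thread `queued_tasks` batch onto the HTTP thread's private head/tail list under the mutex.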
+fn queueEvents(this: *@This()) void {
+ this.queued_tasks_mutex.lock();
+ defer this.queued_tasks_mutex.unlock();
+ if (this.queued_tasks.len == 0)
+ return;
+ log("Received {d} tasks\n", .{this.queued_tasks.len});
+ if (this.tail) |tail| {
+ std.debug.assert(tail.next == null);
+ tail.next = &this.queued_tasks.head.?.node;
+ this.tail = &this.queued_tasks.tail.?.node;
+ } else {
+ this.head = &this.queued_tasks.head.?.node;
+ this.tail = &this.queued_tasks.tail.?.node;
+ }
+ this.queued_tasks = .{};
+}
+
+pub fn processEvents(this: *@This()) void {
+ processEvents_(this) catch {};
+ unreachable;
+}
+/// Should only be called on the HTTP thread!
+fn processEvents_(this: *@This()) !void {
+ {
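+ // Drain any eventfd ticks that arrived before the loop started; later wakeups re-arm through io.wait().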
+ var bytes: [8]u8 = undefined;
+ _ = std.os.read(this.event_fd, &bytes) catch 0;
+ }
+
+ while (true) {
+ this.queueEvents();
+
+ var count: usize = 0;
+
+ while (this.head) |node| {
+ if (node == this.tail) {
+ this.tail = null;
+ }
+ this.head = node.next;
+ node.next = null;
+ var task = @fieldParentPtr(Task, "node", node);
+ var callback = task.callback;
+ callback(task);
+ if (comptime Environment.allow_assert) {
+ count += 1;
+ }
+ }
+
+ if (comptime Environment.allow_assert) {
+ if (count > 0)
+ log("Processed {d} tasks\n", .{count});
+ }
+
+ var start: i128 = 0;
+ if (comptime Environment.isDebug) {
+ start = std.time.nanoTimestamp();
+ }
+ Output.flush();
+ this.io.wait(this, queueEvents);
+ if (comptime Environment.isDebug) {
+ var end = std.time.nanoTimestamp();
+ log("Waited {any}\n", .{std.fmt.fmtDurationSigned(@truncate(i64, end - start))});
+ Output.flush();
+ }
+ }
+}
+
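+// Safe to call from any thread: on Linux the batch is queued and the eventfd is written to wake the HTTP thread; elsewhere it goes through the pool.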
+pub fn schedule(this: *@This(), batch: Batch) void {
+ if (comptime Environment.isLinux) {
+ if (batch.len == 0)
+ return;
+
+ {
+ this.queued_tasks_mutex.lock();
+ defer this.queued_tasks_mutex.unlock();
+ this.queued_tasks.push(batch);
+ }
+
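+ // The 8-byte write bumps the eventfd counter and wakes the io_uring wait; the reader discards the value.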
+ const one = @bitCast([8]u8, @as(usize, batch.len));
+ _ = std.os.write(this.event_fd, &one) catch @panic("Failed to write to eventfd");
+ } else {
+ this.pool.schedule(batch);
+ }
+}
+
const CachedAddressList = struct {
address_list: *std.net.AddressList,
expire_after: u64,
@@ -67,7 +165,8 @@ pub fn warmup() !void {
if (has_warmed or global_loaded.load(.Monotonic) > 0) return;
has_warmed = true;
try init();
- global.pool.forceSpawn();
+ if (comptime !Environment.isLinux)
+ global.pool.forceSpawn();
}
pub fn init() !void {
@@ -76,6 +175,15 @@ pub fn init() !void {
global = NetworkThread{
.pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }),
+ .timer = try std.time.Timer.start(),
};
global.pool.on_thread_spawn = HTTP.onThreadStart;
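+ // On Linux the pool never spawns: a dedicated thread is created here and parked on the eventfd.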
+ if (comptime Environment.isLinux) {
+ const event_fd = try std.os.eventfd(0, std.os.linux.EFD.CLOEXEC | 0);
+ global.event_fd = event_fd;
+ global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{
+ @intCast(std.os.fd_t, event_fd),
+ });
+ global.thread.detach();
+ }
}
diff --git a/src/output.zig b/src/output.zig
index 4712c97f1..6ae8ae4ff 100644
--- a/src/output.zig
+++ b/src/output.zig
@@ -88,7 +88,7 @@ pub const Source = struct {
source = Source.init(stdout_stream, stderr_stream);
}
- pub fn configureNamedThread(_: std.Thread, name: StringTypes.stringZ) void {
+ pub fn configureNamedThread(name: StringTypes.stringZ) void {
Global.setThreadName(name);
configureThread();
}
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
index 03fe6f211..8839d2090 100644
--- a/src/thread_pool.zig
+++ b/src/thread_pool.zig
@@ -84,7 +84,26 @@ pub const Batch = struct {
head: ?*Task = null,
tail: ?*Task = null,
- /// Create a batch from a single task.
+ pub fn pop(this: *Batch) ?*Task {
+ const len = @atomicLoad(usize, &this.len, .Monotonic);
+ if (len == 0) {
+ return null;
+ }
+ var task = this.head.?;
+ if (task.node.next) |node| {
+ this.head = @fieldParentPtr(Task, "node", node);
+ } else {
+ this.head = null;
+ }
+
+ this.len -= 1;
+ // Check the decremented count, not the pre-pop snapshot, or the tail is never cleared.
+ if (this.len == 0) {
+ this.tail = null;
+ }
+ return task;
+ }
+
+ /// Create a batch from a single task.
pub fn from(task: *Task) Batch {
return Batch{
.len = 1,
@@ -276,6 +295,9 @@ fn _wait(self: *ThreadPool, _is_waking: bool, comptime sleep_on_idle: bool) erro
}
} else {
if (self.io) |io| {
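+ // The Linux network thread now lives outside the pool, so a pool thread must never own `io` here.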
+ if (comptime Environment.isLinux)
+ unreachable;
+
const HTTP = @import("http");
io.tick() catch {};
@@ -483,7 +505,7 @@ pub const Thread = struct {
};
/// An event which stores 1 semaphore token and is multi-threaded safe.
-/// The event can be shutdown(), waking up all wait()ing threads and
+/// The event can be shutdown(), waking up all wait()ing threads and
/// making subsequent wait()'s return immediately.
const Event = struct {
state: Atomic(u32) = Atomic(u32).init(EMPTY),
@@ -621,7 +643,7 @@ const Event = struct {
};
/// Linked list intrusive memory node and lock-free data structures to operate with it
-const Node = struct {
+pub const Node = struct {
next: ?*Node = null,
/// A linked list of Nodes
diff --git a/src/watcher.zig b/src/watcher.zig
index c1b371217..138edca44 100644
--- a/src/watcher.zig
+++ b/src/watcher.zig
@@ -348,7 +348,7 @@ pub fn NewWatcher(comptime ContextType: type) type {
// This must only be called from the watcher thread
pub fn watchLoop(this: *Watcher) !void {
this.watchloop_handle = std.Thread.getCurrentId();
- Output.Source.configureNamedThread(this.thread, "File Watcher");
+ Output.Source.configureNamedThread("File Watcher");
defer Output.flush();
if (FeatureFlags.verbose_watcher) Output.prettyln("Watcher started", .{});