aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.vscode/launch.json9
-rw-r--r--Makefile11
-rw-r--r--misctools/.gitignore2
-rw-r--r--misctools/fetch.zig2
-rw-r--r--misctools/http_bench.zig250
-rw-r--r--src/http.zig32
-rw-r--r--src/http/http_client_async.zig1310
-rw-r--r--src/http/network_thread.zig25
-rw-r--r--src/http_client.zig21
-rw-r--r--src/io/io_darwin.zig24
-rw-r--r--src/io/io_linux.zig3
-rw-r--r--src/main.zig2
-rw-r--r--src/pool.zig37
-rw-r--r--src/thread_pool.zig20
14 files changed, 1718 insertions, 30 deletions
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 002036fce..b4c7735e2 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -4,6 +4,15 @@
{
"type": "lldb",
"request": "launch",
+ "name": "HTTP bench",
+ "program": "${workspaceFolder}/misctools/http_bench",
+ "args": ["http://example.com"],
+ "cwd": "${workspaceFolder}",
+ "console": "internalConsole"
+ },
+ {
+ "type": "lldb",
+ "request": "launch",
"name": "fetch debug",
"program": "${workspaceFolder}/misctools/fetch",
"args": ["https://api.github.com/repos/hanford/trends/tarball"],
diff --git a/Makefile b/Makefile
index f1c5ca68a..f902913c4 100644
--- a/Makefile
+++ b/Makefile
@@ -425,6 +425,17 @@ fetch-debug:
src/deps/picohttpparser.o \
$(LIBCRYPTO_STATIC_LIB)
+
+httpbench-debug:
+ cd misctools; $(ZIG) build-obj ./http_bench.zig -fcompiler-rt -lc --main-pkg-path ../ --pkg-begin io ../$(IO_FILE) --pkg-end
+ $(CXX) ./misctools/http_bench.o -g -o ./misctools/http_bench $(DEFAULT_LINKER_FLAGS) -lc \
+ src/deps/mimalloc/libmimalloc.a \
+ src/deps/zlib/libz.a \
+ src/deps/libarchive.a \
+ src/deps/libs2n.a \
+ src/deps/picohttpparser.o \
+ $(LIBCRYPTO_STATIC_LIB)
+
s2n-mac:
cd $(DEPS_DIR)/s2n-tls; \
make clean; \
diff --git a/misctools/.gitignore b/misctools/.gitignore
index 41cdc1338..bb8850ab6 100644
--- a/misctools/.gitignore
+++ b/misctools/.gitignore
@@ -1,3 +1,5 @@
*.tgz
tgz
readlink-getfd
+readlink-realpath
+http_bench
diff --git a/misctools/fetch.zig b/misctools/fetch.zig
index 59bd3320a..e41906245 100644
--- a/misctools/fetch.zig
+++ b/misctools/fetch.zig
@@ -2,7 +2,7 @@ const std = @import("std");
usingnamespace @import("../src/global.zig");
const clap = @import("../src/deps/zig-clap/clap.zig");
-const HTTPClient = @import("../src/http_client.zig");
+const HTTPClient = @import("../src/http/http_client_async.zig");
const URL = @import("../src/query_string_map.zig").URL;
const Headers = @import("../src/javascript/jsc/webcore/response.zig").Headers;
const Method = @import("../src/http/method.zig").Method;
diff --git a/misctools/http_bench.zig b/misctools/http_bench.zig
new file mode 100644
index 000000000..9100d3614
--- /dev/null
+++ b/misctools/http_bench.zig
@@ -0,0 +1,250 @@
+const std = @import("std");
+usingnamespace @import("../src/global.zig");
+const clap = @import("../src/deps/zig-clap/clap.zig");
+
+const URL = @import("../src/query_string_map.zig").URL;
+const Headers = @import("../src/javascript/jsc/webcore/response.zig").Headers;
+const Method = @import("../src/http/method.zig").Method;
+const ColonListType = @import("../src/cli/colon_list_type.zig").ColonListType;
+const HeadersTuple = ColonListType(string, noop_resolver);
+const path_handler = @import("../src/resolver/resolve_path.zig");
+
+fn noop_resolver(in: string) !string {
+ return in;
+}
+
+const VERSION = "0.0.0";
+
+const params = [_]clap.Param(clap.Help){
+ clap.parseParam("-v, --verbose Show headers & status code") catch unreachable,
+ clap.parseParam("-H, --header <STR>... Add a header") catch unreachable,
+ clap.parseParam("-r, --max-redirects <STR> Maximum number of redirects to follow (default: 128)") catch unreachable,
+ clap.parseParam("-b, --body <STR> HTTP request body as a string") catch unreachable,
+ clap.parseParam("-f, --file <STR> File path to load as body") catch unreachable,
+ clap.parseParam("-n, --count <INT> How many runs? Default 10") catch unreachable,
+ clap.parseParam("-t, --timeout <INT> Max duration per request") catch unreachable,
+ clap.parseParam("-r, --retry <INT> Max retry count") catch unreachable,
+ clap.parseParam("--no-gzip Disable gzip") catch unreachable,
+ clap.parseParam("--no-deflate Disable deflate") catch unreachable,
+ clap.parseParam("--no-compression Disable gzip & deflate") catch unreachable,
+ clap.parseParam("--version Print the version and exit") catch unreachable,
+ clap.parseParam("--turbo Skip sending TLS shutdown signals") catch unreachable,
+ clap.parseParam("<POS>... ") catch unreachable,
+};
+
+const MethodNames = std.ComptimeStringMap(Method, .{
+ .{ "GET", Method.GET },
+ .{ "get", Method.GET },
+
+ .{ "POST", Method.POST },
+ .{ "post", Method.POST },
+
+ .{ "PUT", Method.PUT },
+ .{ "put", Method.PUT },
+
+ .{ "PATCH", Method.PATCH },
+ .{ "patch", Method.PATCH },
+
+ .{ "OPTIONS", Method.OPTIONS },
+ .{ "options", Method.OPTIONS },
+
+ .{ "HEAD", Method.HEAD },
+ .{ "head", Method.HEAD },
+});
+
+var file_path_buf: [std.fs.MAX_PATH_BYTES + 1]u8 = undefined;
+var cwd_buf: [std.fs.MAX_PATH_BYTES + 1]u8 = undefined;
+
+pub const Arguments = struct {
+ url: URL,
+ method: Method,
+ verbose: bool = false,
+ headers: Headers.Entries,
+ headers_buf: string,
+ body: string = "",
+ turbo: bool = false,
+ count: usize = 10,
+ timeout: usize = 0,
+
+ pub fn parse(allocator: *std.mem.Allocator) !Arguments {
+ var diag = clap.Diagnostic{};
+
+ var args = clap.parse(clap.Help, &params, .{
+ .diagnostic = &diag,
+ .allocator = allocator,
+ }) catch |err| {
+ // Report useful error and exit
+ diag.report(Output.errorWriter(), err) catch {};
+ return err;
+ };
+
+ var positionals = args.positionals();
+ var raw_args: std.ArrayListUnmanaged(string) = undefined;
+
+ if (positionals.len > 0) {
+ raw_args = .{ .capacity = positionals.len, .items = @intToPtr([*][]const u8, @ptrToInt(positionals.ptr))[0..positionals.len] };
+ } else {
+ raw_args = .{};
+ }
+
+ if (args.flag("--version")) {
+ try Output.writer().writeAll(VERSION);
+ std.os.exit(0);
+ }
+
+ var method = Method.GET;
+ var url: URL = .{};
+ var body_string: string = args.option("--body") orelse "";
+
+ if (args.option("--file")) |file_path| {
+ if (file_path.len > 0) {
+ var cwd = try std.process.getCwd(&cwd_buf);
+ var parts = [_]string{std.mem.span(file_path)};
+ var absolute_path = path_handler.joinAbsStringBuf(cwd, &file_path_buf, &parts, .auto);
+ file_path_buf[absolute_path.len] = 0;
+ file_path_buf[absolute_path.len + 1] = 0;
+ var absolute_path_len = absolute_path.len;
+ var absolute_path_ = file_path_buf[0..absolute_path_len :0];
+
+ var body_file = std.fs.openFileAbsoluteZ(absolute_path_, .{ .read = true }) catch |err| {
+ Output.printErrorln("<r><red>{s}<r> opening file {s}", .{ @errorName(err), absolute_path });
+ Output.flush();
+ std.os.exit(1);
+ };
+
+ var file_contents = body_file.readToEndAlloc(allocator, try body_file.getEndPos()) catch |err| {
+ Output.printErrorln("<r><red>{s}<r> reading file {s}", .{ @errorName(err), absolute_path });
+ Output.flush();
+ std.os.exit(1);
+ };
+ body_string = file_contents;
+ }
+ }
+
+ {
+ var raw_arg_i: usize = 0;
+ while (raw_arg_i < raw_args.items.len) : (raw_arg_i += 1) {
+ const arg = raw_args.items[raw_arg_i];
+ if (MethodNames.get(std.mem.span(arg))) |method_| {
+ method = method_;
+ _ = raw_args.swapRemove(raw_arg_i);
+ }
+ }
+
+ if (raw_args.items.len == 0) {
+ Output.prettyErrorln("<r><red>error<r><d>:<r> <b>Missing URL<r>\n\nExample:\n<r><b>fetch GET https://example.com<r>\n\n<b>fetch example.com/foo<r>\n\n", .{});
+ Output.flush();
+ std.os.exit(1);
+ }
+
+ const url_position = raw_args.items.len - 1;
+ url = URL.parse(raw_args.swapRemove(url_position));
+ if (!url.isAbsolute()) {
+ Output.prettyErrorln("<r><red>error<r><d>:<r> <b>Invalid URL<r>\n\nExample:\n<r><b>fetch GET https://example.com<r>\n\n<b>fetch example.com/foo<r>\n\n", .{});
+ Output.flush();
+ std.os.exit(1);
+ }
+ }
+
+ return Arguments{
+ .url = url,
+ .method = method,
+ .verbose = args.flag("--verbose"),
+ .headers = .{},
+ .headers_buf = "",
+ .body = body_string,
+ .turbo = args.flag("--turbo"),
+ .timeout = std.fmt.parseInt(usize, args.option("--timeout") orelse "0", 10) catch |err| {
+ Output.prettyErrorln("<r><red>{s}<r> parsing timeout", .{@errorName(err)});
+ Output.flush();
+ std.os.exit(1);
+ },
+ .count = std.fmt.parseInt(usize, args.option("--count") orelse "10", 10) catch |err| {
+ Output.prettyErrorln("<r><red>{s}<r> parsing count", .{@errorName(err)});
+ Output.flush();
+ std.os.exit(1);
+ },
+ };
+ }
+};
+
+const NetworkThread = @import("../src/http/network_thread.zig");
+const HTTP = @import("../src/http/http_client_async.zig");
+
+var stdout_: std.fs.File = undefined;
+var stderr_: std.fs.File = undefined;
+pub fn main() anyerror!void {
+ stdout_ = std.io.getStdOut();
+ stderr_ = std.io.getStdErr();
+ var output_source = Output.Source.init(stdout_, stderr_);
+ Output.Source.set(&output_source);
+
+ defer Output.flush();
+
+ var args = try Arguments.parse(default_allocator);
+
+ var channel = try default_allocator.create(HTTP.HTTPChannel);
+ channel.* = HTTP.HTTPChannel.init();
+
+ try channel.buffer.ensureCapacity(args.count);
+
+ try NetworkThread.init();
+
+ var i: usize = 0;
+ while (i < args.count) : (i += 1) {
+ var response_body = try default_allocator.create(MutableString);
+ response_body.* = try MutableString.init(default_allocator, 1024);
+ var request_body = try default_allocator.create(MutableString);
+ request_body.* = try MutableString.init(default_allocator, 0);
+
+ var async_http = try default_allocator.create(HTTP.AsyncHTTP);
+ async_http.* = try HTTP.AsyncHTTP.init(
+ default_allocator,
+ args.method,
+ args.url,
+ args.headers,
+ args.headers_buf,
+ request_body,
+ response_body,
+ args.timeout,
+ );
+ async_http.channel = channel;
+ async_http.schedule(default_allocator);
+ }
+
+ var read_count: usize = 0;
+ var success_count: usize = 0;
+ var fail_count: usize = 0;
+ var timer = try std.time.Timer.start();
+ while (read_count < args.count) {
+ while (channel.tryReadItem() catch null) |http| {
+ read_count += 1;
+
+ Output.printElapsed(@floatCast(f64, @intToFloat(f128, http.elapsed) / std.time.ns_per_ms));
+ if (http.response) |resp| {
+ if (resp.status_code == 200) {
+ success_count += 1;
+ } else {
+ fail_count += 1;
+ }
+ Output.printError(" {}\n", .{resp});
+ } else if (http.err) |err| {
+ fail_count += 1;
+ Output.printError(" err: {s}\n", .{@errorName(err)});
+ } else {
+ fail_count += 1;
+ Output.prettyError(" Uh-oh: {s}\n", .{@tagName(http.state.loadUnchecked())});
+ }
+
+ Output.flush();
+ }
+ }
+
+ Output.printElapsed(@floatCast(f64, @intToFloat(f128, timer.read()) / std.time.ns_per_ms));
+ Output.prettyErrorln("Completed {d}\n Success: <green>{d}<r>\n Failure: <red>{d}<r>\n", .{
+ read_count,
+ success_count,
+ fail_count,
+ });
+ Output.flush();
+}
diff --git a/src/http.zig b/src/http.zig
index 4f978d44d..22bc278bf 100644
--- a/src/http.zig
+++ b/src/http.zig
@@ -1540,7 +1540,7 @@ pub const RequestContext = struct {
ctx.appendHeader("Sec-WebSocket-Protocol", "bun-hmr");
try ctx.writeStatus(101);
try ctx.flushHeaders();
- // Output.prettyln("<r><green>101<r><d> Hot Module Reloading connected.<r>", .{});
+ // Output.prettyErrorln("<r><green>101<r><d> Hot Module Reloading connected.<r>", .{});
// Output.flush();
Analytics.Features.hot_module_reloading = true;
@@ -1591,7 +1591,7 @@ pub const RequestContext = struct {
var frame = handler.websocket.read() catch |err| {
switch (err) {
error.ConnectionClosed => {
- // Output.prettyln("Websocket closed.", .{});
+ // Output.prettyErrorln("Websocket closed.", .{});
handler.tombstone = true;
is_socket_closed = true;
continue;
@@ -1604,7 +1604,7 @@ pub const RequestContext = struct {
};
switch (frame.header.opcode) {
.Close => {
- // Output.prettyln("Websocket closed.", .{});
+ // Output.prettyErrorln("Websocket closed.", .{});
is_socket_closed = true;
return;
},
@@ -1639,7 +1639,7 @@ pub const RequestContext = struct {
},
.success => {
if (build_result.timestamp > cmd.timestamp) {
- Output.prettyln(
+ Output.prettyErrorln(
"<r><b><green>{d}ms<r> <d>built<r> <b>{s}<r><b> <r><d>({d}+ LOC)",
.{
build_result.timestamp - cmd.timestamp,
@@ -2553,7 +2553,7 @@ pub const Server = struct {
);
if (comptime FeatureFlags.verbose_watcher) {
- Output.prettyln("<r><d>File changed: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
+ Output.prettyErrorln("<r><d>File changed: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
}
} else {
const change_message = Api.WebsocketMessageFileChangeNotification{
@@ -2566,12 +2566,12 @@ pub const Server = struct {
const change_buf = content_fbs.getWritten();
const written_buf = filechange_buf[0 .. header.len + change_buf.len];
RequestContext.WebsocketHandler.broadcast(written_buf) catch |err| {
- Output.prettyln("Error writing change notification: {s}<r>", .{@errorName(err)});
+ Output.prettyErrorln("Error writing change notification: {s}<r>", .{@errorName(err)});
};
if (comptime is_emoji_enabled) {
- Output.prettyln("<r>📜 <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
+ Output.prettyErrorln("<r>📜 <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
} else {
- Output.prettyln("<r> <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
+ Output.prettyErrorln("<r> <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
}
}
},
@@ -2583,9 +2583,9 @@ pub const Server = struct {
// ctx.watcher.removeAtIndex(event.index, hashes[event.index], parent_hashes, .directory);
if (comptime is_emoji_enabled) {
- Output.prettyln("<r>📁 <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
+ Output.prettyErrorln("<r>📁 <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
} else {
- Output.prettyln("<r> <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
+ Output.prettyErrorln("<r> <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
}
},
}
@@ -2839,13 +2839,13 @@ pub const Server = struct {
200, 304, 101 => {},
201...303, 305...399 => {
- Output.prettyln("<r><green>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
+ Output.prettyErrorln("<r><green>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
},
400...499 => {
- Output.prettyln("<r><yellow>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
+ Output.prettyErrorln("<r><yellow>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
},
else => {
- Output.prettyln("<r><red>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
+ Output.prettyErrorln("<r><red>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
},
}
}
@@ -2861,13 +2861,13 @@ pub const Server = struct {
200, 304, 101 => {},
201...303, 305...399 => {
- Output.prettyln("<r><green>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
+ Output.prettyErrorln("<r><green>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
},
400...499 => {
- Output.prettyln("<r><yellow>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
+ Output.prettyErrorln("<r><yellow>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
},
else => {
- Output.prettyln("<r><red>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
+ Output.prettyErrorln("<r><red>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
},
}
}
diff --git a/src/http/http_client_async.zig b/src/http/http_client_async.zig
new file mode 100644
index 000000000..4bb1339c4
--- /dev/null
+++ b/src/http/http_client_async.zig
@@ -0,0 +1,1310 @@
+const picohttp = @import("../deps/picohttp.zig");
+usingnamespace @import("../global.zig");
+const std = @import("std");
+const Headers = @import("../javascript/jsc/webcore/response.zig").Headers;
+const URL = @import("../query_string_map.zig").URL;
+const Method = @import("../http/method.zig").Method;
+const Api = @import("../api/schema.zig").Api;
+const Lock = @import("../lock.zig").Lock;
+const HTTPClient = @This();
+const SOCKET_FLAGS = os.SOCK_CLOEXEC | os.SOCK_NONBLOCK;
+const S2n = @import("../s2n.zig");
+const Zlib = @import("../zlib.zig");
+const StringBuilder = @import("../string_builder.zig");
+const AsyncIO = @import("io");
+const ThreadPool = @import("../thread_pool.zig");
+
+const NetoworkThread = @import("./network_thread.zig");
+
+fn writeRequest(
+ writer: *AsyncSocket,
+ request: picohttp.Request,
+ body: string,
+ // header_hashes: []u64,
+) !void {
+ _ = try writer.write(request.method);
+ _ = try writer.write(" ");
+ _ = try writer.write(request.path);
+ _ = try writer.write(" HTTP/1.1\r\n");
+
+ for (request.headers) |header, i| {
+ _ = try writer.write(header.name);
+ _ = try writer.write(": ");
+ _ = try writer.write(header.value);
+ _ = try writer.write("\r\n");
+ }
+
+ _ = try writer.write("\r\n");
+
+ if (body.len > 0) {
+ _ = try writer.write(body);
+ }
+}
+
+method: Method,
+header_entries: Headers.Entries,
+header_buf: string,
+url: URL,
+allocator: *std.mem.Allocator,
+verbose: bool = isTest,
+tcp_client: tcp.Client = undefined,
+body_size: u32 = 0,
+read_count: u32 = 0,
+remaining_redirect_count: i8 = 127,
+redirect_buf: [2048]u8 = undefined,
+disable_shutdown: bool = true,
+timeout: usize = 0,
+progress_node: ?*std.Progress.Node = null,
+socket: AsyncSocket = undefined,
+
+pub fn init(
+ allocator: *std.mem.Allocator,
+ method: Method,
+ url: URL,
+ header_entries: Headers.Entries,
+ header_buf: string,
+) !HTTPClient {
+ return HTTPClient{
+ .allocator = allocator,
+ .method = method,
+ .url = url,
+ .header_entries = header_entries,
+ .header_buf = header_buf,
+ .socket = try AsyncSocket.init(&AsyncIO.global, 0, allocator),
+ };
+}
+
+threadlocal var response_headers_buf: [256]picohttp.Header = undefined;
+threadlocal var request_content_len_buf: [64]u8 = undefined;
+threadlocal var header_name_hashes: [256]u64 = undefined;
+// threadlocal var resolver_cache
+const tcp = std.x.net.tcp;
+const ip = std.x.net.ip;
+
+const IPv4 = std.x.os.IPv4;
+const IPv6 = std.x.os.IPv6;
+const Socket = std.x.os.Socket;
+const os = std.os;
+
+// lowercase hash header names so that we can be sure
+pub fn hashHeaderName(name: string) u64 {
+ var hasher = std.hash.Wyhash.init(0);
+ var remain: string = name;
+ var buf: [32]u8 = undefined;
+ var buf_slice: []u8 = std.mem.span(&buf);
+
+ while (remain.len > 0) {
+ var end = std.math.min(hasher.buf.len, remain.len);
+
+ hasher.update(strings.copyLowercase(std.mem.span(remain[0..end]), buf_slice));
+ remain = remain[end..];
+ }
+
+ return hasher.final();
+}
+
+const host_header_hash = hashHeaderName("Host");
+const connection_header_hash = hashHeaderName("Connection");
+
+pub const Encoding = enum {
+ identity,
+ gzip,
+ deflate,
+ brotli,
+ chunked,
+};
+
+const content_encoding_hash = hashHeaderName("Content-Encoding");
+const transfer_encoding_header = hashHeaderName("Transfer-Encoding");
+
+const host_header_name = "Host";
+const content_length_header_name = "Content-Length";
+const content_length_header_hash = hashHeaderName("Content-Length");
+const connection_header = picohttp.Header{ .name = "Connection", .value = "close" };
+const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" };
+const accept_header_hash = hashHeaderName("Accept");
+
+const accept_encoding_no_compression = "identity";
+const accept_encoding_compression = "deflate, gzip";
+const accept_encoding_header_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_compression };
+const accept_encoding_header_no_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_no_compression };
+
+const accept_encoding_header = if (FeatureFlags.disable_compression_in_http_client)
+ accept_encoding_header_no_compression
+else
+ accept_encoding_header_compression;
+
+const accept_encoding_header_hash = hashHeaderName("Accept-Encoding");
+
+const user_agent_header = picohttp.Header{ .name = "User-Agent", .value = "Bun.js " ++ Global.package_json_version };
+const user_agent_header_hash = hashHeaderName("User-Agent");
+const location_header_hash = hashHeaderName("Location");
+
+pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string {
+ return this.header_buf[ptr.offset..][0..ptr.length];
+}
+
+pub const HeaderBuilder = struct {
+ content: StringBuilder = StringBuilder{},
+ header_count: u64 = 0,
+ entries: Headers.Entries = Headers.Entries{},
+
+ pub fn count(this: *HeaderBuilder, name: string, value: string) void {
+ this.header_count += 1;
+ this.content.count(name);
+ this.content.count(value);
+ }
+
+ pub fn allocate(this: *HeaderBuilder, allocator: *std.mem.Allocator) !void {
+ try this.content.allocate(allocator);
+ try this.entries.ensureTotalCapacity(allocator, this.header_count);
+ }
+ pub fn append(this: *HeaderBuilder, name: string, value: string) void {
+ const name_ptr = Api.StringPointer{
+ .offset = @truncate(u32, this.content.len),
+ .length = @truncate(u32, name.len),
+ };
+
+ _ = this.content.append(name);
+
+ const value_ptr = Api.StringPointer{
+ .offset = @truncate(u32, this.content.len),
+ .length = @truncate(u32, value.len),
+ };
+ _ = this.content.append(value);
+ this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr });
+ }
+
+ pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void {
+ client.header_entries = this.entries;
+ client.header_buf = this.content.ptr.?[0..this.content.len];
+ }
+};
+
+threadlocal var server_name_buf: [1024]u8 = undefined;
+
+pub const HTTPChannel = @import("../sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 });
+
+pub const AsyncHTTP = struct {
+ request: ?picohttp.Request = null,
+ response: ?picohttp.Response = null,
+ request_headers: Headers.Entries = Headers.Entries{},
+ response_headers: Headers.Entries = Headers.Entries{},
+ response_buffer: *MutableString,
+ request_body: *MutableString,
+ allocator: *std.mem.Allocator,
+ request_header_buf: string = "",
+ method: Method = Method.GET,
+ max_retry_count: u32 = 0,
+ url: URL,
+
+ /// Timeout in nanoseconds
+ timeout: usize = 0,
+
+ response_encoding: Encoding = Encoding.identity,
+ redirect_count: u32 = 0,
+ retries_count: u32 = 0,
+
+ client: HTTPClient = undefined,
+ err: ?anyerror = null,
+
+ state: AtomicState = AtomicState.init(State.pending),
+ channel: ?*HTTPChannel = undefined,
+ elapsed: u64 = 0,
+
+ pub var active_requests_count = std.atomic.Atomic(u32).init(0);
+
+ pub const State = enum(u32) {
+ pending = 0,
+ scheduled = 1,
+ sending = 2,
+ success = 3,
+ fail = 4,
+ };
+ const AtomicState = std.atomic.Atomic(State);
+
+ pub fn init(
+ allocator: *std.mem.Allocator,
+ method: Method,
+ url: URL,
+ headers: Headers.Entries,
+ headers_buf: string,
+ response_buffer: *MutableString,
+ request_body: *MutableString,
+ timeout: usize,
+ ) !AsyncHTTP {
+ var this = AsyncHTTP{
+ .allocator = allocator,
+ .url = url,
+ .method = method,
+ .request_headers = headers,
+ .request_header_buf = headers_buf,
+ .request_body = request_body,
+ .response_buffer = response_buffer,
+ };
+ this.client = try HTTPClient.init(allocator, method, url, headers, headers_buf);
+ this.client.timeout = timeout;
+ this.timeout = timeout;
+ return this;
+ }
+
+ pub fn schedule(this: *AsyncHTTP, allocator: *std.mem.Allocator) void {
+ std.debug.assert(NetoworkThread.global_loaded);
+ var sender = HTTPSender.get(this, allocator);
+ this.state.store(.scheduled, .Monotonic);
+ NetoworkThread.global.pool.schedule(ThreadPool.Batch.from(&sender.task));
+ }
+
+ const HTTPSender = struct {
+ task: ThreadPool.Task = .{ .callback = callback },
+ frame: @Frame(AsyncHTTP.do) = undefined,
+ http: *AsyncHTTP = undefined,
+
+ next: ?*HTTPSender = null,
+
+ var head: ?*HTTPSender = null;
+
+ pub fn get(http: *AsyncHTTP, allocator: *std.mem.Allocator) *HTTPSender {
+ if (head == null) {
+ head = allocator.create(HTTPSender) catch unreachable;
+ head.?.* = HTTPSender{};
+ }
+
+ var head_ = head.?;
+ head = head.?.next;
+ head_.next = null;
+ head_.task = .{ .callback = callback };
+ head_.http = http;
+
+ return head_;
+ }
+
+ pub fn release(this: *HTTPSender) void {
+ // head = this;
+ }
+
+ pub fn callback(task: *ThreadPool.Task) void {
+ var this = @fieldParentPtr(HTTPSender, "task", task);
+ this.frame = async AsyncHTTP.do(this);
+ }
+
+ pub fn onFinish(this: *HTTPSender) void {}
+ };
+
+ pub fn do(sender: *HTTPSender) void {
+ {
+ var this = sender.http;
+ this.err = null;
+ this.state.store(.sending, .Monotonic);
+ var timer = std.time.Timer.start() catch @panic("Timer failure");
+ defer this.elapsed = timer.read();
+ _ = active_requests_count.fetchAdd(1, .Monotonic);
+ defer _ = active_requests_count.fetchSub(1, .Monotonic);
+ this.response = await this.client.sendAsync(this.request_body.list.items, this.response_buffer) catch |err| {
+ this.state.store(.fail, .Monotonic);
+ this.err = err;
+ return;
+ };
+ this.redirect_count = @intCast(u32, @maximum(127 - this.client.remaining_redirect_count, 0));
+ this.state.store(.success, .Monotonic);
+ }
+
+ switch (sender.http.state.load(.Monotonic)) {
+ .fail => {
+ if (sender.http.max_retry_count > sender.http.retries_count) {
+ sender.http.retries_count += 1;
+ NetoworkThread.global.pool.schedule(ThreadPool.Batch.from(&sender.task));
+ return;
+ }
+ },
+ else => {},
+ }
+
+ if (sender.http.channel) |channel| {
+ std.debug.assert(channel.tryWriteItem(sender.http) catch false);
+ }
+
+ sender.release();
+ }
+};
+
+const AsyncMessage = struct {
+ const buffer_size = std.math.maxInt(u16) - 64;
+ used: u16 = 0,
+ sent: u16 = 0,
+ completion: AsyncIO.Completion = undefined,
+ buf: [buffer_size]u8 = undefined,
+ allocator: *std.mem.Allocator,
+ next: ?*AsyncMessage = null,
+ context: *c_void = undefined,
+
+ var _first: ?*AsyncMessage = null;
+ pub fn get(allocator: *std.mem.Allocator) *AsyncMessage {
+ if (_first) |first| {
+ var prev = first;
+ if (first.next) |next| {
+ _first = next;
+ prev.next = null;
+ return prev;
+ }
+
+ return prev;
+ }
+
+ var msg = allocator.create(AsyncMessage) catch unreachable;
+ msg.* = AsyncMessage{ .allocator = allocator };
+ return msg;
+ }
+
+ pub fn release(self: *AsyncMessage) void {
+ self.next = _first;
+ self.used = 0;
+ self.sent = 0;
+ _first = self;
+ }
+
+ const WriteResponse = struct {
+ written: u32 = 0,
+ overflow: bool = false,
+ };
+
+ pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse {
+ var remain = this.buf[this.used..];
+ var writable = buffer[0..@minimum(buffer.len, remain.len)];
+ if (writable.len == 0) {
+ return .{ .written = 0, .overflow = buffer.len > 0 };
+ }
+
+ std.mem.copy(u8, remain, writable);
+ this.used += @intCast(u16, writable.len);
+
+ return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len };
+ }
+
+ pub inline fn slice(this: *const AsyncMessage) []const u8 {
+ return this.buf[0..this.used][this.sent..];
+ }
+
+ pub inline fn available(this: *AsyncMessage) []u8 {
+ return this.buf[0 .. this.buf.len - this.used];
+ }
+};
+
+const Completion = AsyncIO.Completion;
+
+const AsyncSocket = struct {
+ const This = @This();
+ io: *AsyncIO = undefined,
+ socket: std.os.socket_t = 0,
+ head: *AsyncMessage = undefined,
+ tail: *AsyncMessage = undefined,
+ allocator: *std.mem.Allocator,
+ err: ?anyerror = null,
+ queued: usize = 0,
+ sent: usize = 0,
+ send_frame: @Frame(AsyncSocket.send) = undefined,
+ read_frame: @Frame(AsyncSocket.read) = undefined,
+ connect_frame: @Frame(AsyncSocket.connect) = undefined,
+ read_context: []u8 = undefined,
+ read_offset: u64 = 0,
+ read_completion: AsyncIO.Completion = undefined,
+ connect_completion: AsyncIO.Completion = undefined,
+
+ const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError;
+
+ pub fn init(io: *AsyncIO, socket: std.os.socket_t, allocator: *std.mem.Allocator) !AsyncSocket {
+ var head = AsyncMessage.get(allocator);
+
+ return AsyncSocket{ .io = io, .socket = socket, .head = head, .tail = head, .allocator = allocator };
+ }
+
+ fn on_connect(this: *AsyncSocket, completion: *Completion, err: ConnectError!void) void {
+ err catch |resolved_err| {
+ this.err = resolved_err;
+ };
+
+ resume this.connect_frame;
+ }
+
+ pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void {
+ this.socket = 0;
+
+ const list = std.net.getAddressList(this.allocator, name, port) catch |err| {
+ return @errSetCast(ConnectError, err);
+ };
+ defer list.deinit();
+
+ if (list.addrs.len == 0) return error.ConnectionRefused;
+
+ for (list.addrs) |address| {
+ const sockfd = try AsyncIO.openSocket(address.any.family, SOCKET_FLAGS | std.os.SOCK_STREAM, std.os.IPPROTO_TCP);
+
+ this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address);
+ suspend {
+ this.connect_frame = @frame().*;
+ }
+
+ if (this.err) |e| {
+ std.os.closeSocket(sockfd);
+
+ switch (e) {
+ error.ConnectionRefused => {
+ this.err = null;
+ continue;
+ },
+ else => return @errSetCast(ConnectError, e),
+ }
+ }
+
+ this.socket = sockfd;
+ return;
+ }
+
+ return error.ConnectionRefused;
+ }
+
+ fn on_send(msg: *AsyncMessage, completion: *Completion, result: SendError!usize) void {
+ var this = @ptrCast(*AsyncSocket, @alignCast(@alignOf(*AsyncSocket), msg.context));
+ const written = result catch |err| {
+ this.err = err;
+ resume this.send_frame;
+ return;
+ };
+
+ if (written == 0) {
+ resume this.send_frame;
+ return;
+ }
+
+ msg.sent += @truncate(u16, written);
+ const has_more = msg.used > msg.sent;
+ this.sent += written;
+
+ if (has_more) {
+ this.io.send(*AsyncMessage, msg, on_send, &msg.completion, this.socket, msg.slice());
+ } else {
+ msg.release();
+ }
+
+ // complete
+ if (this.queued <= this.sent) {
+ resume this.send_frame;
+ }
+ }
+
+ pub fn write(this: *AsyncSocket, buf: []const u8) AsyncSocket.SendError!usize {
+ this.tail.context = this;
+
+ const resp = this.tail.writeAll(buf);
+ this.queued += resp.written;
+
+ if (resp.overflow) {
+ var next = AsyncMessage.get(this.allocator);
+ this.tail.next = next;
+ this.tail = next;
+
+ return @as(usize, resp.written) + try this.write(buf[resp.written..]);
+ }
+
+ return @as(usize, resp.written);
+ }
+
+ pub const SendError = AsyncIO.SendError;
+
+ pub fn deinit(this: *AsyncSocket) void {
+ var node = this.head;
+ while (node.next) |element| {
+ element.release();
+ node = element.next orelse break;
+ }
+ this.head.release();
+ }
+
+ pub fn send(this: *This) SendError!usize {
+ const original_sent = this.sent;
+ this.head.context = this;
+
+ this.io.send(*AsyncMessage, this.head, on_send, &this.head.completion, this.socket, this.head.slice());
+
+ var node = this.head;
+ while (node.next) |element| {
+ this.io.send(*AsyncMessage, element, on_send, &element.completion, this.socket, element.slice());
+ node = element.next orelse break;
+ }
+
+ suspend {
+ this.send_frame = @frame().*;
+ }
+
+ if (this.err) |err| {
+ this.err = null;
+ return @errSetCast(AsyncSocket.SendError, err);
+ }
+
+ return this.sent - original_sent;
+ }
+
+ pub const RecvError = AsyncIO.RecvError;
+
+ pub fn read(
+ this: *AsyncSocket,
+ bytes: []u8,
+ offset: u64,
+ ) RecvError!u64 {
+ this.read_context = bytes;
+ this.read_offset = offset;
+ const original_read_offset = this.read_offset;
+ const Reader = struct {
+ pub fn on_read(ctx: *AsyncSocket, completion: *AsyncIO.Completion, result: RecvError!usize) void {
+ const len = result catch |err| {
+ ctx.err = err;
+ resume ctx.read_frame;
+ return;
+ };
+ ctx.read_offset += len;
+ resume ctx.read_frame;
+ }
+ };
+
+ this.io.recv(
+ *AsyncSocket,
+ this,
+ Reader.on_read,
+ &this.read_completion,
+ this.socket,
+ bytes,
+ );
+
+ suspend {
+ this.read_frame = @frame().*;
+ }
+
+ if (this.err) |err| {
+ this.err = null;
+ return @errSetCast(RecvError, err);
+ }
+
+ return this.read_offset - original_read_offset;
+ }
+};
+
+threadlocal var request_headers_buf: [256]picohttp.Header = undefined;
+
+pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request {
+ var header_count: usize = 0;
+ var header_entries = this.header_entries.slice();
+ var header_names = header_entries.items(.name);
+ var header_values = header_entries.items(.value);
+
+ var override_accept_encoding = false;
+
+ var override_user_agent = false;
+ for (header_names) |head, i| {
+ const name = this.headerStr(head);
+ // Hash it as lowercase
+ const hash = hashHeaderName(name);
+
+ // Skip host and connection header
+ // we manage those
+ switch (hash) {
+ host_header_hash,
+ connection_header_hash,
+ content_length_header_hash,
+ => continue,
+ else => {},
+ }
+
+ override_user_agent = override_user_agent or hash == user_agent_header_hash;
+
+ override_accept_encoding = override_accept_encoding or hash == accept_encoding_header_hash;
+
+ request_headers_buf[header_count] = (picohttp.Header{
+ .name = name,
+ .value = this.headerStr(header_values[i]),
+ });
+
+ // header_name_hashes[header_count] = hash;
+
+ // // ensure duplicate headers come after each other
+ // if (header_count > 2) {
+ // var head_i: usize = header_count - 1;
+ // while (head_i > 0) : (head_i -= 1) {
+ // if (header_name_hashes[head_i] == header_name_hashes[header_count]) {
+ // std.mem.swap(picohttp.Header, &header_name_hashes[header_count], &header_name_hashes[head_i + 1]);
+ // std.mem.swap(u64, &request_headers_buf[header_count], &request_headers_buf[head_i + 1]);
+ // break;
+ // }
+ // }
+ // }
+ header_count += 1;
+ }
+
+ // request_headers_buf[header_count] = connection_header;
+ // header_count += 1;
+
+ if (!override_user_agent) {
+ request_headers_buf[header_count] = user_agent_header;
+ header_count += 1;
+ }
+
+ request_headers_buf[header_count] = accept_header;
+ header_count += 1;
+
+ request_headers_buf[header_count] = picohttp.Header{
+ .name = host_header_name,
+ .value = this.url.hostname,
+ };
+ header_count += 1;
+
+ if (!override_accept_encoding) {
+ request_headers_buf[header_count] = accept_encoding_header;
+ header_count += 1;
+ }
+
+ if (body_len > 0) {
+ request_headers_buf[header_count] = picohttp.Header{
+ .name = content_length_header_name,
+ .value = std.fmt.bufPrint(&request_content_len_buf, "{d}", .{body_len}) catch "0",
+ };
+ header_count += 1;
+ }
+
+ return picohttp.Request{
+ .method = @tagName(this.method),
+ .path = this.url.pathname,
+ .minor_version = 1,
+ .headers = request_headers_buf[0..header_count],
+ };
+}
+
+pub fn connect(
+ this: *HTTPClient,
+) !void {
+ const port = this.url.getPortAuto();
+
+ try this.socket.connect(this.url.hostname, port);
+ var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket) };
+ client.setNoDelay(true) catch {};
+ client.setReadBufferSize(AsyncMessage.buffer_size) catch {};
+ client.setQuickACK(true) catch {};
+
+ if (this.timeout > 0) {
+ client.setReadTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {};
+ client.setWriteTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {};
+ }
+}
+
+pub fn sendAsync(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) @Frame(HTTPClient.send) {
+ return async this.send(body, body_out_str);
+}
+
+pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response {
+ // this prevents stack overflow
+ redirect: while (this.remaining_redirect_count >= -1) {
+ if (this.url.isHTTPS()) {
+ return error.NotImplementedYet;
+ // return this.sendHTTPS(body, body_out_str) catch |err| {
+ // switch (err) {
+ // error.Redirect => {
+ // this.remaining_redirect_count -= 1;
+ // continue :redirect;
+ // },
+ // else => return err,
+ // }
+ // };
+ } else {
+ return this.sendHTTP(body, body_out_str) catch |err| {
+ switch (err) {
+ error.Redirect => {
+ this.remaining_redirect_count -= 1;
+ continue :redirect;
+ },
+ else => return err,
+ }
+ };
+ }
+ }
+
+ return error.TooManyRedirects;
+}
+
+const Task = ThreadPool.Task;
+
+pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response {
+ try this.connect();
+ defer if (this.socket.socket > 0) std.os.closeSocket(this.socket.socket);
+ var request = buildRequest(this, body.len);
+ if (this.verbose) {
+ Output.prettyErrorln("{s}", .{request});
+ }
+
+ try writeRequest(&this.socket, request, body);
+ _ = try this.socket.send();
+ var client_reader = &this.socket;
+
+ if (this.progress_node == null) {
+ return this.processResponse(
+ false,
+ false,
+ @TypeOf(client_reader),
+ client_reader,
+ body_out_str,
+ );
+ } else {
+ return this.processResponse(
+ false,
+ true,
+ @TypeOf(client_reader),
+ client_reader,
+ body_out_str,
+ );
+ }
+}
+
+const ZlibPool = struct {
+ lock: Lock = Lock.init(),
+ items: std.ArrayList(*MutableString),
+ allocator: *std.mem.Allocator,
+ pub var instance: ZlibPool = undefined;
+ pub var loaded: bool = false;
+
+ pub fn init(allocator: *std.mem.Allocator) ZlibPool {
+ return ZlibPool{
+ .allocator = allocator,
+ .items = std.ArrayList(*MutableString).init(allocator),
+ };
+ }
+
+ pub fn get(this: *ZlibPool) !*MutableString {
+ this.lock.lock();
+ defer this.lock.unlock();
+ switch (this.items.items.len) {
+ 0 => {
+ var mutable = try this.allocator.create(MutableString);
+ mutable.* = try MutableString.init(this.allocator, 0);
+ return mutable;
+ },
+ else => {
+ return this.items.pop();
+ },
+ }
+
+ return item;
+ }
+
+ pub fn put(this: *ZlibPool, mutable: *MutableString) !void {
+ this.lock.lock();
+ defer this.lock.unlock();
+ mutable.reset();
+ try this.items.append(mutable);
+ }
+};
+
+pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response {
+ var response: picohttp.Response = undefined;
+ var request_message = AsyncMessage.get(this.allocator);
+ defer request_message.release();
+ var request_buffer: []u8 = &request_message.buf;
+ var read_length: usize = 0;
+ {
+ var read_headers_up_to: usize = 0;
+
+ var req_buf_read: usize = std.math.maxInt(usize);
+ defer this.read_count += @intCast(u32, read_length);
+
+ restart: while (req_buf_read != 0) {
+ req_buf_read = try client.read(request_buffer, read_length);
+ read_length += req_buf_read;
+ if (comptime report_progress) {
+ this.progress_node.?.activate();
+ this.progress_node.?.setCompletedItems(read_length);
+ this.progress_node.?.context.maybeRefresh();
+ }
+
+ var request_body = request_buffer[0..read_length];
+ read_headers_up_to = if (read_headers_up_to > read_length) read_length else read_headers_up_to;
+
+ response = picohttp.Response.parseParts(request_body, &response_headers_buf, &read_headers_up_to) catch |err| {
+ switch (err) {
+ error.ShortRead => {
+ continue :restart;
+ },
+ else => {
+ return err;
+ },
+ }
+ };
+ break :restart;
+ }
+ }
+
+ body_out_str.reset();
+ var content_length: u32 = 0;
+ var encoding = Encoding.identity;
+ var transfer_encoding = Encoding.identity;
+
+ var location: string = "";
+
+ if (this.verbose) {
+ Output.prettyErrorln("Response: {s}", .{response});
+ }
+
+ for (response.headers) |header| {
+ switch (hashHeaderName(header.name)) {
+ content_length_header_hash => {
+ content_length = std.fmt.parseInt(u32, header.value, 10) catch 0;
+ try body_out_str.inflate(content_length);
+ body_out_str.list.expandToCapacity();
+ this.body_size = content_length;
+ },
+ content_encoding_hash => {
+ if (strings.eqlComptime(header.value, "gzip")) {
+ encoding = Encoding.gzip;
+ } else if (strings.eqlComptime(header.value, "deflate")) {
+ encoding = Encoding.deflate;
+ } else if (!strings.eqlComptime(header.value, "identity")) {
+ return error.UnsupportedContentEncoding;
+ }
+ },
+ transfer_encoding_header => {
+ if (strings.eqlComptime(header.value, "gzip")) {
+ transfer_encoding = Encoding.gzip;
+ } else if (strings.eqlComptime(header.value, "deflate")) {
+ transfer_encoding = Encoding.deflate;
+ } else if (strings.eqlComptime(header.value, "identity")) {
+ transfer_encoding = Encoding.identity;
+ } else if (strings.eqlComptime(header.value, "chunked")) {
+ transfer_encoding = Encoding.chunked;
+ } else {
+ return error.UnsupportedTransferEncoding;
+ }
+ },
+ location_header_hash => {
+ location = header.value;
+ },
+
+ else => {},
+ }
+ }
+
+ if (location.len > 0 and this.remaining_redirect_count > 0) {
+ switch (response.status_code) {
+ 302, 301, 307, 308, 303 => {
+ if (strings.indexOf(location, "://")) |i| {
+ const protocol_name = location[0..i];
+ if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else {
+ return error.UnsupportedRedirectProtocol;
+ }
+
+ std.mem.copy(u8, &this.redirect_buf, location);
+ this.url = URL.parse(location);
+ } else {
+ const original_url = this.url;
+ this.url = URL.parse(std.fmt.bufPrint(
+ &this.redirect_buf,
+ "{s}://{s}{s}",
+ .{ original_url.displayProtocol(), original_url.displayHostname(), location },
+ ) catch return error.RedirectURLTooLong);
+ }
+
+ // https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/303
+ if (response.status_code == 303) {
+ this.method = .GET;
+ }
+
+ return error.Redirect;
+ },
+ else => {},
+ }
+ }
+
+ if (transfer_encoding == Encoding.chunked) {
+ var decoder = std.mem.zeroes(picohttp.phr_chunked_decoder);
+ var buffer_: *MutableString = body_out_str;
+
+ switch (encoding) {
+ Encoding.gzip, Encoding.deflate => {
+ if (!ZlibPool.loaded) {
+ ZlibPool.instance = ZlibPool.init(default_allocator);
+ ZlibPool.loaded = true;
+ }
+
+ buffer_ = try ZlibPool.instance.get();
+ },
+ else => {},
+ }
+
+ var buffer = buffer_.*;
+
+ var last_read: usize = 0;
+ {
+ var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length];
+ last_read = remainder.len;
+ try buffer.inflate(std.math.max(remainder.len, 2048));
+ buffer.list.expandToCapacity();
+ std.mem.copy(u8, buffer.list.items, remainder);
+ }
+
+ // set consume_trailer to 1 to discard the trailing header
+ // using content-encoding per chunk is not supported
+ decoder.consume_trailer = 1;
+
+ // these variable names are terrible
+ // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked
+ // (but ported from C -> zig)
+ var rret: usize = 0;
+ var rsize: usize = last_read;
+ var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize);
+ var total_size = rsize;
+
+ while (pret == -2) {
+ if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) {
+ try buffer.inflate(std.math.max(total_size * 2, 1024));
+ buffer.list.expandToCapacity();
+ }
+
+ rret = try client.read(buffer.list.items, total_size);
+
+ if (rret == 0) {
+ return error.ChunkedEncodingError;
+ }
+
+ rsize = rret;
+ pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize);
+ if (pret == -1) return error.ChunkedEncodingParseError;
+
+ total_size += rsize;
+
+ if (comptime report_progress) {
+ this.progress_node.?.activate();
+ this.progress_node.?.setCompletedItems(total_size);
+ this.progress_node.?.context.maybeRefresh();
+ }
+ }
+
+ buffer.list.shrinkRetainingCapacity(total_size);
+ buffer_.* = buffer;
+
+ switch (encoding) {
+ Encoding.gzip, Encoding.deflate => {
+ body_out_str.list.expandToCapacity();
+ defer ZlibPool.instance.put(buffer_) catch unreachable;
+ var reader = try Zlib.ZlibReaderArrayList.init(buffer.list.items, &body_out_str.list, default_allocator);
+ reader.readAll() catch |err| {
+ if (reader.errorMessage()) |msg| {
+ Output.prettyErrorln("<r><red>Zlib error<r>: <b>{s}<r>", .{msg});
+ Output.flush();
+ }
+ return err;
+ };
+ },
+ else => {},
+ }
+
+ if (comptime report_progress) {
+ this.progress_node.?.activate();
+ this.progress_node.?.setCompletedItems(body_out_str.list.items.len);
+ this.progress_node.?.context.maybeRefresh();
+ }
+
+ this.body_size = @intCast(u32, body_out_str.list.items.len);
+ return response;
+ }
+
+ if (content_length > 0) {
+ var remaining_content_length = content_length;
+ var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length];
+ remainder = remainder[0..std.math.min(remainder.len, content_length)];
+ var buffer_: *MutableString = body_out_str;
+
+ switch (encoding) {
+ Encoding.gzip, Encoding.deflate => {
+ if (!ZlibPool.loaded) {
+ ZlibPool.instance = ZlibPool.init(default_allocator);
+ ZlibPool.loaded = true;
+ }
+
+ buffer_ = try ZlibPool.instance.get();
+ if (buffer_.list.capacity < remaining_content_length) {
+ try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length);
+ }
+ buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length];
+ },
+ else => {},
+ }
+ var buffer = buffer_.*;
+
+ var body_size: usize = 0;
+ if (remainder.len > 0) {
+ std.mem.copy(u8, buffer.list.items, remainder);
+ body_size = remainder.len;
+ this.read_count += @intCast(u32, body_size);
+ remaining_content_length -= @intCast(u32, remainder.len);
+ }
+
+ while (remaining_content_length > 0) {
+ const size = @intCast(u32, try client.read(
+ buffer.list.items,
+ body_size,
+ ));
+ this.read_count += size;
+ if (size == 0) break;
+
+ body_size += size;
+ remaining_content_length -= size;
+
+ if (comptime report_progress) {
+ this.progress_node.?.activate();
+ this.progress_node.?.setCompletedItems(body_size);
+ this.progress_node.?.context.maybeRefresh();
+ }
+ }
+
+ if (comptime report_progress) {
+ this.progress_node.?.activate();
+ this.progress_node.?.setCompletedItems(body_size);
+ this.progress_node.?.context.maybeRefresh();
+ }
+
+ buffer.list.shrinkRetainingCapacity(body_size);
+ buffer_.* = buffer;
+
+ switch (encoding) {
+ Encoding.gzip, Encoding.deflate => {
+ body_out_str.list.expandToCapacity();
+ defer ZlibPool.instance.put(buffer_) catch unreachable;
+ var reader = try Zlib.ZlibReaderArrayList.init(buffer.list.items, &body_out_str.list, default_allocator);
+ reader.readAll() catch |err| {
+ if (reader.errorMessage()) |msg| {
+ Output.prettyErrorln("<r><red>Zlib error<r>: <b>{s}<r>", .{msg});
+ Output.flush();
+ }
+ return err;
+ };
+ },
+ else => {},
+ }
+ }
+
+ if (comptime report_progress) {
+ this.progress_node.?.activate();
+ this.progress_node.?.setCompletedItems(body_out_str.list.items.len);
+ this.progress_node.?.context.maybeRefresh();
+ }
+
+ return response;
+}
+
+pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response {
+ var connection = try this.connect();
+ S2n.boot(default_allocator);
+ const hostname = this.url.displayHostname();
+ std.mem.copy(u8, &server_name_buf, hostname);
+ server_name_buf[hostname.len] = 0;
+ var server_name = server_name_buf[0..hostname.len :0];
+
+ var client = S2n.Connection.init(connection.socket.fd);
+ try client.start(server_name);
+ client.disable_shutdown = this.disable_shutdown;
+ defer client.close() catch {};
+
+ var request = buildRequest(this, body_str.len);
+ if (this.verbose) {
+ Output.prettyErrorln("{s}", .{request});
+ }
+ const body = body_str;
+
+ var client_writer = client.writer();
+ {
+ var client_writer_buffered = std.io.bufferedWriter(client_writer);
+ var client_writer_buffered_writer = client_writer_buffered.writer();
+
+ try writeRequest(@TypeOf(&client_writer_buffered_writer), &client_writer_buffered_writer, request, body);
+ try client_writer_buffered_writer.writeAll("\r\n");
+ try client_writer_buffered.flush();
+ }
+
+ if (body.len > 0) {
+ try client_writer.writeAll(body);
+ }
+
+ if (this.progress_node == null) {
+ return try this.processResponse(true, false, @TypeOf(&client), &client, body_out_str);
+ } else {
+ return try this.processResponse(true, true, @TypeOf(&client), &client, body_out_str);
+ }
+}
+
+// zig test src/http_client.zig --test-filter "sendHTTP - only" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec
+test "sendHTTP - only" {
+ Output.initTest();
+ defer Output.flush();
+
+ var headers = try std.heap.c_allocator.create(Headers);
+ headers.* = Headers{
+ .entries = @TypeOf(headers.entries){},
+ .buf = @TypeOf(headers.buf){},
+ .used = 0,
+ .allocator = std.heap.c_allocator,
+ };
+
+ // headers.appendHeader("X-What", "ok", true, true, false);
+ headers.appendHeader("Accept-Encoding", "identity", true, true, false);
+
+ var client = HTTPClient.init(
+ std.heap.c_allocator,
+ .GET,
+ URL.parse("http://example.com/"),
+ headers.entries,
+ headers.buf.items,
+ );
+ var body_out_str = try MutableString.init(std.heap.c_allocator, 0);
+ var response = try client.sendHTTP("", &body_out_str);
+ try std.testing.expectEqual(response.status_code, 200);
+ try std.testing.expectEqual(body_out_str.list.items.len, 1256);
+ try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html"));
+}
+
+// zig test src/http_client.zig --test-filter "sendHTTP - gzip" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec
+test "sendHTTP - gzip" {
+ Output.initTest();
+ defer Output.flush();
+
+ var headers = try std.heap.c_allocator.create(Headers);
+ headers.* = Headers{
+ .entries = @TypeOf(headers.entries){},
+ .buf = @TypeOf(headers.buf){},
+ .used = 0,
+ .allocator = std.heap.c_allocator,
+ };
+
+ // headers.appendHeader("X-What", "ok", true, true, false);
+ headers.appendHeader("Accept-Encoding", "gzip", true, true, false);
+
+ var client = HTTPClient.init(
+ std.heap.c_allocator,
+ .GET,
+ URL.parse("http://example.com/"),
+ headers.entries,
+ headers.buf.items,
+ );
+ var body_out_str = try MutableString.init(std.heap.c_allocator, 0);
+ var response = try client.sendHTTP("", &body_out_str);
+ try std.testing.expectEqual(response.status_code, 200);
+ try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html"));
+}
+
+// zig test src/http_client.zig --test-filter "sendHTTPS - identity" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec
+test "sendHTTPS - identity" {
+ Output.initTest();
+ defer Output.flush();
+
+ var headers = try std.heap.c_allocator.create(Headers);
+ headers.* = Headers{
+ .entries = @TypeOf(headers.entries){},
+ .buf = @TypeOf(headers.buf){},
+ .used = 0,
+ .allocator = std.heap.c_allocator,
+ };
+
+ headers.appendHeader("X-What", "ok", true, true, false);
+ headers.appendHeader("Accept-Encoding", "identity", true, true, false);
+
+ var client = HTTPClient.init(
+ std.heap.c_allocator,
+ .GET,
+ URL.parse("https://example.com/"),
+ headers.entries,
+ headers.buf.items,
+ );
+ var body_out_str = try MutableString.init(std.heap.c_allocator, 0);
+ var response = try client.sendHTTPS("", &body_out_str);
+ try std.testing.expectEqual(response.status_code, 200);
+ try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html"));
+}
+
+test "sendHTTPS - gzip" {
+ Output.initTest();
+ defer Output.flush();
+
+ var headers = try std.heap.c_allocator.create(Headers);
+ headers.* = Headers{
+ .entries = @TypeOf(headers.entries){},
+ .buf = @TypeOf(headers.buf){},
+ .used = 0,
+ .allocator = std.heap.c_allocator,
+ };
+
+ headers.appendHeader("Accept-Encoding", "gzip", false, false, false);
+
+ var client = HTTPClient.init(
+ std.heap.c_allocator,
+ .GET,
+ URL.parse("https://example.com/"),
+ headers.entries,
+ headers.buf.items,
+ );
+ var body_out_str = try MutableString.init(std.heap.c_allocator, 0);
+ var response = try client.sendHTTPS("", &body_out_str);
+ try std.testing.expectEqual(response.status_code, 200);
+ try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html"));
+}
+
+// zig test src/http_client.zig --test-filter "sendHTTPS - deflate" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test
+test "sendHTTPS - deflate" {
+ Output.initTest();
+ defer Output.flush();
+
+ var headers = try std.heap.c_allocator.create(Headers);
+ headers.* = Headers{
+ .entries = @TypeOf(headers.entries){},
+ .buf = @TypeOf(headers.buf){},
+ .used = 0,
+ .allocator = std.heap.c_allocator,
+ };
+
+ headers.appendHeader("Accept-Encoding", "deflate", false, false, false);
+
+ var client = HTTPClient.init(
+ std.heap.c_allocator,
+ .GET,
+ URL.parse("https://example.com/"),
+ headers.entries,
+ headers.buf.items,
+ );
+ var body_out_str = try MutableString.init(std.heap.c_allocator, 0);
+ var response = try client.sendHTTPS("", &body_out_str);
+ try std.testing.expectEqual(response.status_code, 200);
+ try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html"));
+}
+
+// zig test src/http_client.zig --test-filter "sendHTTP" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test
+
+test "send - redirect" {
+ Output.initTest();
+ defer Output.flush();
+
+ var headers = try std.heap.c_allocator.create(Headers);
+ headers.* = Headers{
+ .entries = @TypeOf(headers.entries){},
+ .buf = @TypeOf(headers.buf){},
+ .used = 0,
+ .allocator = std.heap.c_allocator,
+ };
+
+ headers.appendHeader("Accept-Encoding", "gzip", false, false, false);
+
+ var client = HTTPClient.init(
+ std.heap.c_allocator,
+ .GET,
+ URL.parse("https://www.bun.sh/"),
+ headers.entries,
+ headers.buf.items,
+ );
+ try std.testing.expectEqualStrings(client.url.hostname, "www.bun.sh");
+ var body_out_str = try MutableString.init(std.heap.c_allocator, 0);
+ var response = try client.send("", &body_out_str);
+ try std.testing.expectEqual(response.status_code, 200);
+ try std.testing.expectEqual(client.url.hostname, "bun.sh");
+ try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html"));
+}
diff --git a/src/http/network_thread.zig b/src/http/network_thread.zig
new file mode 100644
index 000000000..d7f9c4409
--- /dev/null
+++ b/src/http/network_thread.zig
@@ -0,0 +1,25 @@
+const ThreadPool = @import("../thread_pool.zig");
+const Batch = ThreadPool.Batch;
+const std = @import("std");
+const AsyncIO = @import("io");
+
+const NetworkThread = @This();
+
+/// Single-thread in this pool
+pool: ThreadPool,
+
+pub var global: NetworkThread = undefined;
+pub var global_loaded: bool = false;
+
+pub fn init() !void {
+ AsyncIO.global = try AsyncIO.init(0, 0);
+ AsyncIO.global_loaded = true;
+
+ global = NetworkThread{
+ .pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }),
+ };
+
+ global.pool.io = &AsyncIO.global;
+
+ global_loaded = true;
+}
diff --git a/src/http_client.zig b/src/http_client.zig
index f75f9a340..3b78e625e 100644
--- a/src/http_client.zig
+++ b/src/http_client.zig
@@ -259,16 +259,8 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request {
pub fn connect(
this: *HTTPClient,
) !tcp.Client {
- var client: tcp.Client = try tcp.Client.init(tcp.Domain.ip, .{ .close_on_exec = true });
const port = this.url.getPortAuto();
- client.setNoDelay(true) catch {};
- client.setReadBufferSize(http_req_buf.len) catch {};
- client.setQuickACK(true) catch {};
-
- if (this.timeout > 0) {
- client.setReadTimeout(this.timeout) catch {};
- client.setWriteTimeout(this.timeout) catch {};
- }
+ var client: tcp.Client = undefined;
// if (this.url.isLocalhost()) {
// try client.connect(
@@ -277,7 +269,16 @@ pub fn connect(
// } else {
// } else if (this.url.isDomainName()) {
var stream = try std.net.tcpConnectToHost(default_allocator, this.url.hostname, port);
- client.socket = std.x.os.Socket.from(stream.handle);
+ client = tcp.Client{ .socket = std.x.os.Socket.from(stream.handle) };
+
+ if (this.timeout > 0) {
+ client.setReadTimeout(this.timeout) catch {};
+ client.setWriteTimeout(this.timeout) catch {};
+ }
+
+ client.setNoDelay(true) catch {};
+ client.setReadBufferSize(http_req_buf.len) catch {};
+ client.setQuickACK(true) catch {};
// }
// } else if (this.url.getIPv4Address()) |ip_addr| {
diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig
index 00186610b..f1ff5a06e 100644
--- a/src/io/io_darwin.zig
+++ b/src/io/io_darwin.zig
@@ -1,5 +1,20 @@
const std = @import("std");
-const os = std.os;
+const os = struct {
+ pub usingnamespace std.os;
+ pub const EINTR = 4;
+ pub const EAGAIN = 35;
+ pub const EBADF = 9;
+ pub const ECONNRESET = 54;
+ pub const EFAULT = 14;
+ pub const EINVAL = 22;
+ pub const EIO = 5;
+ pub const EISDIR = 21;
+ pub const ENOBUFS = 55;
+ pub const ENOMEM = 12;
+ pub const ENXIO = 6;
+ pub const EOVERFLOW = 84;
+ pub const ESPIPE = 29;
+};
const mem = std.mem;
const assert = std.debug.assert;
@@ -486,7 +501,7 @@ pub fn read(
op.len,
@bitCast(isize, op.offset),
);
- return switch (os.errno(rc)) {
+ return switch (@enumToInt(os.errno(rc))) {
0 => @intCast(usize, rc),
os.EINTR => continue,
os.EAGAIN => error.WouldBlock,
@@ -501,7 +516,7 @@ pub fn read(
os.ENXIO => error.Unseekable,
os.EOVERFLOW => error.Unseekable,
os.ESPIPE => error.Unseekable,
- else => |err| os.unexpectedErrno(err),
+ else => error.Unexpected,
};
}
}
@@ -663,3 +678,6 @@ fn buffer_limit(buffer_len: usize) usize {
};
return std.math.min(limit, buffer_len);
}
+
+pub var global: IO = undefined;
+pub var global_loaded: bool = false;
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig
index 794f18f90..0cf6b7c58 100644
--- a/src/io/io_linux.zig
+++ b/src/io/io_linux.zig
@@ -872,3 +872,6 @@ pub fn write(
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
return os.socket(family, sock_type, protocol);
}
+
+pub var global: IO = undefined;
+pub var global_loaded: bool = false;
diff --git a/src/main.zig b/src/main.zig
index 1635f5ad1..d70602827 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -17,6 +17,8 @@ pub const MainPanicHandler = panicky.NewPanicHandler(std.builtin.default_panic);
const js = @import("javascript/jsc/bindings/bindings.zig");
usingnamespace @import("javascript/jsc/javascript.zig");
+pub const io_mode = .blocking;
+
pub fn panic(msg: []const u8, error_return_trace: ?*std.builtin.StackTrace) noreturn {
MainPanicHandler.handle_panic(msg, error_return_trace);
}
diff --git a/src/pool.zig b/src/pool.zig
new file mode 100644
index 000000000..b4fe0eb29
--- /dev/null
+++ b/src/pool.zig
@@ -0,0 +1,37 @@
+const std = @import("std");
+
+pub fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: *std.mem.Allocator) anyerror!Type)) type {
+ return struct {
+ const LinkedList = std.SinglyLinkedList(Type);
+ // mimalloc crashes on realloc across threads
+ threadlocal var list: LinkedList = undefined;
+ threadlocal var loaded: bool = false;
+ pub fn get(allocator: *std.mem.Allocator) *LinkedList.Node {
+ if (loaded) {
+ if (list.popFirst()) |node| {
+ node.data.reset();
+ return node;
+ }
+ }
+
+ var new_node = allocator.create(LinkedList.Node) catch unreachable;
+ new_node.* = LinkedList.Node{
+ .data = Init(
+ allocator,
+ ) catch unreachable,
+ };
+
+ return new_node;
+ }
+
+ pub fn release(node: *LinkedList.Node) void {
+ if (loaded) {
+ list.prepend(node);
+ return;
+ }
+
+ list = LinkedList{ .first = node };
+ loaded = true;
+ }
+ };
+}
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
index 38c102a77..6305590db 100644
--- a/src/thread_pool.zig
+++ b/src/thread_pool.zig
@@ -4,10 +4,13 @@
const std = @import("std");
const ThreadPool = @This();
const Futex = @import("./futex.zig");
+const AsyncIO = @import("io");
const assert = std.debug.assert;
const Atomic = std.atomic.Atomic;
+io: ?*AsyncIO = null,
+
stack_size: u32,
max_threads: u32,
sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})),
@@ -15,6 +18,7 @@ idle_event: Event = .{},
join_event: Event = .{},
run_queue: Node.Queue = .{},
threads: Atomic(?*Thread) = Atomic(?*Thread).init(null),
+name: []const u8 = "",
const Sync = packed struct {
/// Tracks the number of threads not searching for Tasks
@@ -172,6 +176,7 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
if (can_wake and sync.spawned < self.max_threads) {
const spawn_config = std.Thread.SpawnConfig{ .stack_size = self.stack_size };
const thread = std.Thread.spawn(spawn_config, Thread.run, .{self}) catch return self.unregister(null);
+ // if (self.name.len > 0) thread.setName(self.name) catch {};
return thread.detach();
}
@@ -230,7 +235,14 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
// Wait for a signal by either notify() or shutdown() without wasting cpu cycles.
// TODO: Add I/O polling here.
+ if (self.io) |io| {
+ io.tick() catch unreachable;
+ }
} else {
+ if (self.io) |io| {
+ while (true) io.run_for_ns(std.time.ns_per_ms * 1000) catch {};
+ }
+
self.idle_event.wait();
sync = @bitCast(Sync, self.sync.load(.Monotonic));
}
@@ -315,6 +327,8 @@ fn join(self: *ThreadPool) void {
thread.join_event.notify();
}
+const Output = @import("./global.zig").Output;
+
const Thread = struct {
next: ?*Thread = null,
target: ?*Thread = null,
@@ -326,6 +340,8 @@ const Thread = struct {
/// Thread entry point which runs a worker for the ThreadPool
fn run(thread_pool: *ThreadPool) void {
+ Output.Source.configureThread();
+
var self = Thread{};
current = &self;
@@ -344,6 +360,7 @@ const Thread = struct {
const task = @fieldParentPtr(Task, "node", result.node);
(task.callback)(task);
}
+ Output.flush();
}
}
@@ -369,6 +386,9 @@ const Thread = struct {
}
// TODO: add optimistic I/O polling here
+ if (thread_pool.io) |io| {
+ io.tick() catch {};
+ }
// Then try work stealing from other threads
var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned;