diff options
author | 2021-11-20 19:27:18 -0800 | |
---|---|---|
committer | 2021-12-16 19:18:51 -0800 | |
commit | b1660fa46873a9579180f9f02c1f287a97dd8228 (patch) | |
tree | fffd40c5cf6df8544856bd9093c5d2a5b60b1148 | |
parent | 4af743766d9e32789b90e39c761d5e896426d2f3 (diff) | |
download | bun-b1660fa46873a9579180f9f02c1f287a97dd8228.tar.gz bun-b1660fa46873a9579180f9f02c1f287a97dd8228.tar.zst bun-b1660fa46873a9579180f9f02c1f287a97dd8228.zip |
[bun install] async http request works I think?
-rw-r--r-- | .vscode/launch.json | 9 | ||||
-rw-r--r-- | Makefile | 11 | ||||
-rw-r--r-- | misctools/.gitignore | 2 | ||||
-rw-r--r-- | misctools/fetch.zig | 2 | ||||
-rw-r--r-- | misctools/http_bench.zig | 250 | ||||
-rw-r--r-- | src/http.zig | 32 | ||||
-rw-r--r-- | src/http/http_client_async.zig | 1310 | ||||
-rw-r--r-- | src/http/network_thread.zig | 25 | ||||
-rw-r--r-- | src/http_client.zig | 21 | ||||
-rw-r--r-- | src/io/io_darwin.zig | 24 | ||||
-rw-r--r-- | src/io/io_linux.zig | 3 | ||||
-rw-r--r-- | src/main.zig | 2 | ||||
-rw-r--r-- | src/pool.zig | 37 | ||||
-rw-r--r-- | src/thread_pool.zig | 20 |
14 files changed, 1718 insertions, 30 deletions
diff --git a/.vscode/launch.json b/.vscode/launch.json index 002036fce..b4c7735e2 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,15 @@ { "type": "lldb", "request": "launch", + "name": "HTTP bench", + "program": "${workspaceFolder}/misctools/http_bench", + "args": ["http://example.com"], + "cwd": "${workspaceFolder}", + "console": "internalConsole" + }, + { + "type": "lldb", + "request": "launch", "name": "fetch debug", "program": "${workspaceFolder}/misctools/fetch", "args": ["https://api.github.com/repos/hanford/trends/tarball"], @@ -425,6 +425,17 @@ fetch-debug: src/deps/picohttpparser.o \ $(LIBCRYPTO_STATIC_LIB) + +httpbench-debug: + cd misctools; $(ZIG) build-obj ./http_bench.zig -fcompiler-rt -lc --main-pkg-path ../ --pkg-begin io ../$(IO_FILE) --pkg-end + $(CXX) ./misctools/http_bench.o -g -o ./misctools/http_bench $(DEFAULT_LINKER_FLAGS) -lc \ + src/deps/mimalloc/libmimalloc.a \ + src/deps/zlib/libz.a \ + src/deps/libarchive.a \ + src/deps/libs2n.a \ + src/deps/picohttpparser.o \ + $(LIBCRYPTO_STATIC_LIB) + s2n-mac: cd $(DEPS_DIR)/s2n-tls; \ make clean; \ diff --git a/misctools/.gitignore b/misctools/.gitignore index 41cdc1338..bb8850ab6 100644 --- a/misctools/.gitignore +++ b/misctools/.gitignore @@ -1,3 +1,5 @@ *.tgz tgz readlink-getfd +readlink-realpath +http_bench diff --git a/misctools/fetch.zig b/misctools/fetch.zig index 59bd3320a..e41906245 100644 --- a/misctools/fetch.zig +++ b/misctools/fetch.zig @@ -2,7 +2,7 @@ const std = @import("std"); usingnamespace @import("../src/global.zig"); const clap = @import("../src/deps/zig-clap/clap.zig"); -const HTTPClient = @import("../src/http_client.zig"); +const HTTPClient = @import("../src/http/http_client_async.zig"); const URL = @import("../src/query_string_map.zig").URL; const Headers = @import("../src/javascript/jsc/webcore/response.zig").Headers; const Method = @import("../src/http/method.zig").Method; diff --git a/misctools/http_bench.zig b/misctools/http_bench.zig new file mode 100644 index 000000000..9100d3614 --- /dev/null +++ b/misctools/http_bench.zig @@ -0,0 +1,250 @@ +const std = @import("std"); +usingnamespace @import("../src/global.zig"); +const clap = @import("../src/deps/zig-clap/clap.zig"); + +const URL = @import("../src/query_string_map.zig").URL; +const Headers = @import("../src/javascript/jsc/webcore/response.zig").Headers; +const Method = @import("../src/http/method.zig").Method; +const ColonListType = @import("../src/cli/colon_list_type.zig").ColonListType; +const HeadersTuple = ColonListType(string, noop_resolver); +const path_handler = @import("../src/resolver/resolve_path.zig"); + +fn noop_resolver(in: string) !string { + return in; +} + +const VERSION = "0.0.0"; + +const params = [_]clap.Param(clap.Help){ + clap.parseParam("-v, --verbose Show headers & status code") catch unreachable, + clap.parseParam("-H, --header <STR>... Add a header") catch unreachable, + clap.parseParam("-r, --max-redirects <STR> Maximum number of redirects to follow (default: 128)") catch unreachable, + clap.parseParam("-b, --body <STR> HTTP request body as a string") catch unreachable, + clap.parseParam("-f, --file <STR> File path to load as body") catch unreachable, + clap.parseParam("-n, --count <INT> How many runs? Default 10") catch unreachable, + clap.parseParam("-t, --timeout <INT> Max duration per request") catch unreachable, + clap.parseParam("-r, --retry <INT> Max retry count") catch unreachable, + clap.parseParam("--no-gzip Disable gzip") catch unreachable, + clap.parseParam("--no-deflate Disable deflate") catch unreachable, + clap.parseParam("--no-compression Disable gzip & deflate") catch unreachable, + clap.parseParam("--version Print the version and exit") catch unreachable, + clap.parseParam("--turbo Skip sending TLS shutdown signals") catch unreachable, + clap.parseParam("<POS>... ") catch unreachable, +}; + +const MethodNames = std.ComptimeStringMap(Method, .{ + .{ "GET", Method.GET }, + .{ "get", Method.GET }, + + .{ "POST", Method.POST }, + .{ "post", Method.POST }, + + .{ "PUT", Method.PUT }, + .{ "put", Method.PUT }, + + .{ "PATCH", Method.PATCH }, + .{ "patch", Method.PATCH }, + + .{ "OPTIONS", Method.OPTIONS }, + .{ "options", Method.OPTIONS }, + + .{ "HEAD", Method.HEAD }, + .{ "head", Method.HEAD }, +}); + +var file_path_buf: [std.fs.MAX_PATH_BYTES + 1]u8 = undefined; +var cwd_buf: [std.fs.MAX_PATH_BYTES + 1]u8 = undefined; + +pub const Arguments = struct { + url: URL, + method: Method, + verbose: bool = false, + headers: Headers.Entries, + headers_buf: string, + body: string = "", + turbo: bool = false, + count: usize = 10, + timeout: usize = 0, + + pub fn parse(allocator: *std.mem.Allocator) !Arguments { + var diag = clap.Diagnostic{}; + + var args = clap.parse(clap.Help, ¶ms, .{ + .diagnostic = &diag, + .allocator = allocator, + }) catch |err| { + // Report useful error and exit + diag.report(Output.errorWriter(), err) catch {}; + return err; + }; + + var positionals = args.positionals(); + var raw_args: std.ArrayListUnmanaged(string) = undefined; + + if (positionals.len > 0) { + raw_args = .{ .capacity = positionals.len, .items = @intToPtr([*][]const u8, @ptrToInt(positionals.ptr))[0..positionals.len] }; + } else { + raw_args = .{}; + } + + if (args.flag("--version")) { + try Output.writer().writeAll(VERSION); + std.os.exit(0); + } + + var method = Method.GET; + var url: URL = .{}; + var body_string: string = args.option("--body") orelse ""; + + if (args.option("--file")) |file_path| { + if (file_path.len > 0) { + var cwd = try std.process.getCwd(&cwd_buf); + var parts = [_]string{std.mem.span(file_path)}; + var absolute_path = path_handler.joinAbsStringBuf(cwd, &file_path_buf, &parts, .auto); + file_path_buf[absolute_path.len] = 0; + file_path_buf[absolute_path.len + 1] = 0; + var absolute_path_len = absolute_path.len; + var absolute_path_ = file_path_buf[0..absolute_path_len :0]; + + var body_file = std.fs.openFileAbsoluteZ(absolute_path_, .{ .read = true }) catch |err| { + Output.printErrorln("<r><red>{s}<r> opening file {s}", .{ @errorName(err), absolute_path }); + Output.flush(); + std.os.exit(1); + }; + + var file_contents = body_file.readToEndAlloc(allocator, try body_file.getEndPos()) catch |err| { + Output.printErrorln("<r><red>{s}<r> reading file {s}", .{ @errorName(err), absolute_path }); + Output.flush(); + std.os.exit(1); + }; + body_string = file_contents; + } + } + + { + var raw_arg_i: usize = 0; + while (raw_arg_i < raw_args.items.len) : (raw_arg_i += 1) { + const arg = raw_args.items[raw_arg_i]; + if (MethodNames.get(std.mem.span(arg))) |method_| { + method = method_; + _ = raw_args.swapRemove(raw_arg_i); + } + } + + if (raw_args.items.len == 0) { + Output.prettyErrorln("<r><red>error<r><d>:<r> <b>Missing URL<r>\n\nExample:\n<r><b>fetch GET https://example.com<r>\n\n<b>fetch example.com/foo<r>\n\n", .{}); + Output.flush(); + std.os.exit(1); + } + + const url_position = raw_args.items.len - 1; + url = URL.parse(raw_args.swapRemove(url_position)); + if (!url.isAbsolute()) { + Output.prettyErrorln("<r><red>error<r><d>:<r> <b>Invalid URL<r>\n\nExample:\n<r><b>fetch GET https://example.com<r>\n\n<b>fetch example.com/foo<r>\n\n", .{}); + Output.flush(); + std.os.exit(1); + } + } + + return Arguments{ + .url = url, + .method = method, + .verbose = args.flag("--verbose"), + .headers = .{}, + .headers_buf = "", + .body = body_string, + .turbo = args.flag("--turbo"), + .timeout = std.fmt.parseInt(usize, args.option("--timeout") orelse "0", 10) catch |err| { + Output.prettyErrorln("<r><red>{s}<r> parsing timeout", .{@errorName(err)}); + Output.flush(); + std.os.exit(1); + }, + .count = std.fmt.parseInt(usize, args.option("--count") orelse "10", 10) catch |err| { + Output.prettyErrorln("<r><red>{s}<r> parsing count", .{@errorName(err)}); + Output.flush(); + std.os.exit(1); + }, + }; + } +}; + +const NetworkThread = @import("../src/http/network_thread.zig"); +const HTTP = @import("../src/http/http_client_async.zig"); + +var stdout_: std.fs.File = undefined; +var stderr_: std.fs.File = undefined; +pub fn main() anyerror!void { + stdout_ = std.io.getStdOut(); + stderr_ = std.io.getStdErr(); + var output_source = Output.Source.init(stdout_, stderr_); + Output.Source.set(&output_source); + + defer Output.flush(); + + var args = try Arguments.parse(default_allocator); + + var channel = try default_allocator.create(HTTP.HTTPChannel); + channel.* = HTTP.HTTPChannel.init(); + + try channel.buffer.ensureCapacity(args.count); + + try NetworkThread.init(); + + var i: usize = 0; + while (i < args.count) : (i += 1) { + var response_body = try default_allocator.create(MutableString); + response_body.* = try MutableString.init(default_allocator, 1024); + var request_body = try default_allocator.create(MutableString); + request_body.* = try MutableString.init(default_allocator, 0); + + var async_http = try default_allocator.create(HTTP.AsyncHTTP); + async_http.* = try HTTP.AsyncHTTP.init( + default_allocator, + args.method, + args.url, + args.headers, + args.headers_buf, + request_body, + response_body, + args.timeout, + ); + async_http.channel = channel; + async_http.schedule(default_allocator); + } + + var read_count: usize = 0; + var success_count: usize = 0; + var fail_count: usize = 0; + var timer = try std.time.Timer.start(); + while (read_count < args.count) { + while (channel.tryReadItem() catch null) |http| { + read_count += 1; + + Output.printElapsed(@floatCast(f64, @intToFloat(f128, http.elapsed) / std.time.ns_per_ms)); + if (http.response) |resp| { + if (resp.status_code == 200) { + success_count += 1; + } else { + fail_count += 1; + } + Output.printError(" {}\n", .{resp}); + } else if (http.err) |err| { + fail_count += 1; + Output.printError(" err: {s}\n", .{@errorName(err)}); + } else { + fail_count += 1; + Output.prettyError(" Uh-oh: {s}\n", .{@tagName(http.state.loadUnchecked())}); + } + + Output.flush(); + } + } + + Output.printElapsed(@floatCast(f64, @intToFloat(f128, timer.read()) / std.time.ns_per_ms)); + Output.prettyErrorln("Completed {d}\n Success: <green>{d}<r>\n Failure: <red>{d}<r>\n", .{ + read_count, + success_count, + fail_count, + }); + Output.flush(); +} diff --git a/src/http.zig b/src/http.zig index 4f978d44d..22bc278bf 100644 --- a/src/http.zig +++ b/src/http.zig @@ -1540,7 +1540,7 @@ pub const RequestContext = struct { ctx.appendHeader("Sec-WebSocket-Protocol", "bun-hmr"); try ctx.writeStatus(101); try ctx.flushHeaders(); - // Output.prettyln("<r><green>101<r><d> Hot Module Reloading connected.<r>", .{}); + // Output.prettyErrorln("<r><green>101<r><d> Hot Module Reloading connected.<r>", .{}); // Output.flush(); Analytics.Features.hot_module_reloading = true; @@ -1591,7 +1591,7 @@ pub const RequestContext = struct { var frame = handler.websocket.read() catch |err| { switch (err) { error.ConnectionClosed => { - // Output.prettyln("Websocket closed.", .{}); + // Output.prettyErrorln("Websocket closed.", .{}); handler.tombstone = true; is_socket_closed = true; continue; @@ -1604,7 +1604,7 @@ pub const RequestContext = struct { }; switch (frame.header.opcode) { .Close => { - // Output.prettyln("Websocket closed.", .{}); + // Output.prettyErrorln("Websocket closed.", .{}); is_socket_closed = true; return; }, @@ -1639,7 +1639,7 @@ pub const RequestContext = struct { }, .success => { if (build_result.timestamp > cmd.timestamp) { - Output.prettyln( + Output.prettyErrorln( "<r><b><green>{d}ms<r> <d>built<r> <b>{s}<r><b> <r><d>({d}+ LOC)", .{ build_result.timestamp - cmd.timestamp, @@ -2553,7 +2553,7 @@ pub const Server = struct { ); if (comptime FeatureFlags.verbose_watcher) { - Output.prettyln("<r><d>File changed: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); + Output.prettyErrorln("<r><d>File changed: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); } } else { const change_message = Api.WebsocketMessageFileChangeNotification{ @@ -2566,12 +2566,12 @@ pub const Server = struct { const change_buf = content_fbs.getWritten(); const written_buf = filechange_buf[0 .. header.len + change_buf.len]; RequestContext.WebsocketHandler.broadcast(written_buf) catch |err| { - Output.prettyln("Error writing change notification: {s}<r>", .{@errorName(err)}); + Output.prettyErrorln("Error writing change notification: {s}<r>", .{@errorName(err)}); }; if (comptime is_emoji_enabled) { - Output.prettyln("<r>📜 <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); + Output.prettyErrorln("<r>📜 <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); } else { - Output.prettyln("<r> <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); + Output.prettyErrorln("<r> <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); } } }, @@ -2583,9 +2583,9 @@ pub const Server = struct { // ctx.watcher.removeAtIndex(event.index, hashes[event.index], parent_hashes, .directory); if (comptime is_emoji_enabled) { - Output.prettyln("<r>📁 <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); + Output.prettyErrorln("<r>📁 <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); } else { - Output.prettyln("<r> <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); + Output.prettyErrorln("<r> <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)}); } }, } @@ -2839,13 +2839,13 @@ pub const Server = struct { 200, 304, 101 => {}, 201...303, 305...399 => { - Output.prettyln("<r><green>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); + Output.prettyErrorln("<r><green>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); }, 400...499 => { - Output.prettyln("<r><yellow>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); + Output.prettyErrorln("<r><yellow>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); }, else => { - Output.prettyln("<r><red>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); + Output.prettyErrorln("<r><red>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); }, } } @@ -2861,13 +2861,13 @@ pub const Server = struct { 200, 304, 101 => {}, 201...303, 305...399 => { - Output.prettyln("<r><green>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); + Output.prettyErrorln("<r><green>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); }, 400...499 => { - Output.prettyln("<r><yellow>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); + Output.prettyErrorln("<r><yellow>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); }, else => { - Output.prettyln("<r><red>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); + Output.prettyErrorln("<r><red>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value }); }, } } diff --git a/src/http/http_client_async.zig b/src/http/http_client_async.zig new file mode 100644 index 000000000..4bb1339c4 --- /dev/null +++ b/src/http/http_client_async.zig @@ -0,0 +1,1310 @@ +const picohttp = @import("../deps/picohttp.zig"); +usingnamespace @import("../global.zig"); +const std = @import("std"); +const Headers = @import("../javascript/jsc/webcore/response.zig").Headers; +const URL = @import("../query_string_map.zig").URL; +const Method = @import("../http/method.zig").Method; +const Api = @import("../api/schema.zig").Api; +const Lock = @import("../lock.zig").Lock; +const HTTPClient = @This(); +const SOCKET_FLAGS = os.SOCK_CLOEXEC | os.SOCK_NONBLOCK; +const S2n = @import("../s2n.zig"); +const Zlib = @import("../zlib.zig"); +const StringBuilder = @import("../string_builder.zig"); +const AsyncIO = @import("io"); +const ThreadPool = @import("../thread_pool.zig"); + +const NetoworkThread = @import("./network_thread.zig"); + +fn writeRequest( + writer: *AsyncSocket, + request: picohttp.Request, + body: string, + // header_hashes: []u64, +) !void { + _ = try writer.write(request.method); + _ = try writer.write(" "); + _ = try writer.write(request.path); + _ = try writer.write(" HTTP/1.1\r\n"); + + for (request.headers) |header, i| { + _ = try writer.write(header.name); + _ = try writer.write(": "); + _ = try writer.write(header.value); + _ = try writer.write("\r\n"); + } + + _ = try writer.write("\r\n"); + + if (body.len > 0) { + _ = try writer.write(body); + } +} + +method: Method, +header_entries: Headers.Entries, +header_buf: string, +url: URL, +allocator: *std.mem.Allocator, +verbose: bool = isTest, +tcp_client: tcp.Client = undefined, +body_size: u32 = 0, +read_count: u32 = 0, +remaining_redirect_count: i8 = 127, +redirect_buf: [2048]u8 = undefined, +disable_shutdown: bool = true, +timeout: usize = 0, +progress_node: ?*std.Progress.Node = null, +socket: AsyncSocket = undefined, + +pub fn init( + allocator: *std.mem.Allocator, + method: Method, + url: URL, + header_entries: Headers.Entries, + header_buf: string, +) !HTTPClient { + return HTTPClient{ + .allocator = allocator, + .method = method, + .url = url, + .header_entries = header_entries, + .header_buf = header_buf, + .socket = try AsyncSocket.init(&AsyncIO.global, 0, allocator), + }; +} + +threadlocal var response_headers_buf: [256]picohttp.Header = undefined; +threadlocal var request_content_len_buf: [64]u8 = undefined; +threadlocal var header_name_hashes: [256]u64 = undefined; +// threadlocal var resolver_cache +const tcp = std.x.net.tcp; +const ip = std.x.net.ip; + +const IPv4 = std.x.os.IPv4; +const IPv6 = std.x.os.IPv6; +const Socket = std.x.os.Socket; +const os = std.os; + +// lowercase hash header names so that we can be sure +pub fn hashHeaderName(name: string) u64 { + var hasher = std.hash.Wyhash.init(0); + var remain: string = name; + var buf: [32]u8 = undefined; + var buf_slice: []u8 = std.mem.span(&buf); + + while (remain.len > 0) { + var end = std.math.min(hasher.buf.len, remain.len); + + hasher.update(strings.copyLowercase(std.mem.span(remain[0..end]), buf_slice)); + remain = remain[end..]; + } + + return hasher.final(); +} + +const host_header_hash = hashHeaderName("Host"); +const connection_header_hash = hashHeaderName("Connection"); + +pub const Encoding = enum { + identity, + gzip, + deflate, + brotli, + chunked, +}; + +const content_encoding_hash = hashHeaderName("Content-Encoding"); +const transfer_encoding_header = hashHeaderName("Transfer-Encoding"); + +const host_header_name = "Host"; +const content_length_header_name = "Content-Length"; +const content_length_header_hash = hashHeaderName("Content-Length"); +const connection_header = picohttp.Header{ .name = "Connection", .value = "close" }; +const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" }; +const accept_header_hash = hashHeaderName("Accept"); + +const accept_encoding_no_compression = "identity"; +const accept_encoding_compression = "deflate, gzip"; +const accept_encoding_header_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_compression }; +const accept_encoding_header_no_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_no_compression }; + +const accept_encoding_header = if (FeatureFlags.disable_compression_in_http_client) + accept_encoding_header_no_compression +else + accept_encoding_header_compression; + +const accept_encoding_header_hash = hashHeaderName("Accept-Encoding"); + +const user_agent_header = picohttp.Header{ .name = "User-Agent", .value = "Bun.js " ++ Global.package_json_version }; +const user_agent_header_hash = hashHeaderName("User-Agent"); +const location_header_hash = hashHeaderName("Location"); + +pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string { + return this.header_buf[ptr.offset..][0..ptr.length]; +} + +pub const HeaderBuilder = struct { + content: StringBuilder = StringBuilder{}, + header_count: u64 = 0, + entries: Headers.Entries = Headers.Entries{}, + + pub fn count(this: *HeaderBuilder, name: string, value: string) void { + this.header_count += 1; + this.content.count(name); + this.content.count(value); + } + + pub fn allocate(this: *HeaderBuilder, allocator: *std.mem.Allocator) !void { + try this.content.allocate(allocator); + try this.entries.ensureTotalCapacity(allocator, this.header_count); + } + pub fn append(this: *HeaderBuilder, name: string, value: string) void { + const name_ptr = Api.StringPointer{ + .offset = @truncate(u32, this.content.len), + .length = @truncate(u32, name.len), + }; + + _ = this.content.append(name); + + const value_ptr = Api.StringPointer{ + .offset = @truncate(u32, this.content.len), + .length = @truncate(u32, value.len), + }; + _ = this.content.append(value); + this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr }); + } + + pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void { + client.header_entries = this.entries; + client.header_buf = this.content.ptr.?[0..this.content.len]; + } +}; + +threadlocal var server_name_buf: [1024]u8 = undefined; + +pub const HTTPChannel = @import("../sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 }); + +pub const AsyncHTTP = struct { + request: ?picohttp.Request = null, + response: ?picohttp.Response = null, + request_headers: Headers.Entries = Headers.Entries{}, + response_headers: Headers.Entries = Headers.Entries{}, + response_buffer: *MutableString, + request_body: *MutableString, + allocator: *std.mem.Allocator, + request_header_buf: string = "", + method: Method = Method.GET, + max_retry_count: u32 = 0, + url: URL, + + /// Timeout in nanoseconds + timeout: usize = 0, + + response_encoding: Encoding = Encoding.identity, + redirect_count: u32 = 0, + retries_count: u32 = 0, + + client: HTTPClient = undefined, + err: ?anyerror = null, + + state: AtomicState = AtomicState.init(State.pending), + channel: ?*HTTPChannel = undefined, + elapsed: u64 = 0, + + pub var active_requests_count = std.atomic.Atomic(u32).init(0); + + pub const State = enum(u32) { + pending = 0, + scheduled = 1, + sending = 2, + success = 3, + fail = 4, + }; + const AtomicState = std.atomic.Atomic(State); + + pub fn init( + allocator: *std.mem.Allocator, + method: Method, + url: URL, + headers: Headers.Entries, + headers_buf: string, + response_buffer: *MutableString, + request_body: *MutableString, + timeout: usize, + ) !AsyncHTTP { + var this = AsyncHTTP{ + .allocator = allocator, + .url = url, + .method = method, + .request_headers = headers, + .request_header_buf = headers_buf, + .request_body = request_body, + .response_buffer = response_buffer, + }; + this.client = try HTTPClient.init(allocator, method, url, headers, headers_buf); + this.client.timeout = timeout; + this.timeout = timeout; + return this; + } + + pub fn schedule(this: *AsyncHTTP, allocator: *std.mem.Allocator) void { + std.debug.assert(NetoworkThread.global_loaded); + var sender = HTTPSender.get(this, allocator); + this.state.store(.scheduled, .Monotonic); + NetoworkThread.global.pool.schedule(ThreadPool.Batch.from(&sender.task)); + } + + const HTTPSender = struct { + task: ThreadPool.Task = .{ .callback = callback }, + frame: @Frame(AsyncHTTP.do) = undefined, + http: *AsyncHTTP = undefined, + + next: ?*HTTPSender = null, + + var head: ?*HTTPSender = null; + + pub fn get(http: *AsyncHTTP, allocator: *std.mem.Allocator) *HTTPSender { + if (head == null) { + head = allocator.create(HTTPSender) catch unreachable; + head.?.* = HTTPSender{}; + } + + var head_ = head.?; + head = head.?.next; + head_.next = null; + head_.task = .{ .callback = callback }; + head_.http = http; + + return head_; + } + + pub fn release(this: *HTTPSender) void { + // head = this; + } + + pub fn callback(task: *ThreadPool.Task) void { + var this = @fieldParentPtr(HTTPSender, "task", task); + this.frame = async AsyncHTTP.do(this); + } + + pub fn onFinish(this: *HTTPSender) void {} + }; + + pub fn do(sender: *HTTPSender) void { + { + var this = sender.http; + this.err = null; + this.state.store(.sending, .Monotonic); + var timer = std.time.Timer.start() catch @panic("Timer failure"); + defer this.elapsed = timer.read(); + _ = active_requests_count.fetchAdd(1, .Monotonic); + defer _ = active_requests_count.fetchSub(1, .Monotonic); + this.response = await this.client.sendAsync(this.request_body.list.items, this.response_buffer) catch |err| { + this.state.store(.fail, .Monotonic); + this.err = err; + return; + }; + this.redirect_count = @intCast(u32, @maximum(127 - this.client.remaining_redirect_count, 0)); + this.state.store(.success, .Monotonic); + } + + switch (sender.http.state.load(.Monotonic)) { + .fail => { + if (sender.http.max_retry_count > sender.http.retries_count) { + sender.http.retries_count += 1; + NetoworkThread.global.pool.schedule(ThreadPool.Batch.from(&sender.task)); + return; + } + }, + else => {}, + } + + if (sender.http.channel) |channel| { + std.debug.assert(channel.tryWriteItem(sender.http) catch false); + } + + sender.release(); + } +}; + +const AsyncMessage = struct { + const buffer_size = std.math.maxInt(u16) - 64; + used: u16 = 0, + sent: u16 = 0, + completion: AsyncIO.Completion = undefined, + buf: [buffer_size]u8 = undefined, + allocator: *std.mem.Allocator, + next: ?*AsyncMessage = null, + context: *c_void = undefined, + + var _first: ?*AsyncMessage = null; + pub fn get(allocator: *std.mem.Allocator) *AsyncMessage { + if (_first) |first| { + var prev = first; + if (first.next) |next| { + _first = next; + prev.next = null; + return prev; + } + + return prev; + } + + var msg = allocator.create(AsyncMessage) catch unreachable; + msg.* = AsyncMessage{ .allocator = allocator }; + return msg; + } + + pub fn release(self: *AsyncMessage) void { + self.next = _first; + self.used = 0; + self.sent = 0; + _first = self; + } + + const WriteResponse = struct { + written: u32 = 0, + overflow: bool = false, + }; + + pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse { + var remain = this.buf[this.used..]; + var writable = buffer[0..@minimum(buffer.len, remain.len)]; + if (writable.len == 0) { + return .{ .written = 0, .overflow = buffer.len > 0 }; + } + + std.mem.copy(u8, remain, writable); + this.used += @intCast(u16, writable.len); + + return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len }; + } + + pub inline fn slice(this: *const AsyncMessage) []const u8 { + return this.buf[0..this.used][this.sent..]; + } + + pub inline fn available(this: *AsyncMessage) []u8 { + return this.buf[0 .. this.buf.len - this.used]; + } +}; + +const Completion = AsyncIO.Completion; + +const AsyncSocket = struct { + const This = @This(); + io: *AsyncIO = undefined, + socket: std.os.socket_t = 0, + head: *AsyncMessage = undefined, + tail: *AsyncMessage = undefined, + allocator: *std.mem.Allocator, + err: ?anyerror = null, + queued: usize = 0, + sent: usize = 0, + send_frame: @Frame(AsyncSocket.send) = undefined, + read_frame: @Frame(AsyncSocket.read) = undefined, + connect_frame: @Frame(AsyncSocket.connect) = undefined, + read_context: []u8 = undefined, + read_offset: u64 = 0, + read_completion: AsyncIO.Completion = undefined, + connect_completion: AsyncIO.Completion = undefined, + + const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError; + + pub fn init(io: *AsyncIO, socket: std.os.socket_t, allocator: *std.mem.Allocator) !AsyncSocket { + var head = AsyncMessage.get(allocator); + + return AsyncSocket{ .io = io, .socket = socket, .head = head, .tail = head, .allocator = allocator }; + } + + fn on_connect(this: *AsyncSocket, completion: *Completion, err: ConnectError!void) void { + err catch |resolved_err| { + this.err = resolved_err; + }; + + resume this.connect_frame; + } + + pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void { + this.socket = 0; + + const list = std.net.getAddressList(this.allocator, name, port) catch |err| { + return @errSetCast(ConnectError, err); + }; + defer list.deinit(); + + if (list.addrs.len == 0) return error.ConnectionRefused; + + for (list.addrs) |address| { + const sockfd = try AsyncIO.openSocket(address.any.family, SOCKET_FLAGS | std.os.SOCK_STREAM, std.os.IPPROTO_TCP); + + this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address); + suspend { + this.connect_frame = @frame().*; + } + + if (this.err) |e| { + std.os.closeSocket(sockfd); + + switch (e) { + error.ConnectionRefused => { + this.err = null; + continue; + }, + else => return @errSetCast(ConnectError, e), + } + } + + this.socket = sockfd; + return; + } + + return error.ConnectionRefused; + } + + fn on_send(msg: *AsyncMessage, completion: *Completion, result: SendError!usize) void { + var this = @ptrCast(*AsyncSocket, @alignCast(@alignOf(*AsyncSocket), msg.context)); + const written = result catch |err| { + this.err = err; + resume this.send_frame; + return; + }; + + if (written == 0) { + resume this.send_frame; + return; + } + + msg.sent += @truncate(u16, written); + const has_more = msg.used > msg.sent; + this.sent += written; + + if (has_more) { + this.io.send(*AsyncMessage, msg, on_send, &msg.completion, this.socket, msg.slice()); + } else { + msg.release(); + } + + // complete + if (this.queued <= this.sent) { + resume this.send_frame; + } + } + + pub fn write(this: *AsyncSocket, buf: []const u8) AsyncSocket.SendError!usize { + this.tail.context = this; + + const resp = this.tail.writeAll(buf); + this.queued += resp.written; + + if (resp.overflow) { + var next = AsyncMessage.get(this.allocator); + this.tail.next = next; + this.tail = next; + + return @as(usize, resp.written) + try this.write(buf[resp.written..]); + } + + return @as(usize, resp.written); + } + + pub const SendError = AsyncIO.SendError; + + pub fn deinit(this: *AsyncSocket) void { + var node = this.head; + while (node.next) |element| { + element.release(); + node = element.next orelse break; + } + this.head.release(); + } + + pub fn send(this: *This) SendError!usize { + const original_sent = this.sent; + this.head.context = this; + + this.io.send(*AsyncMessage, this.head, on_send, &this.head.completion, this.socket, this.head.slice()); + + var node = this.head; + while (node.next) |element| { + this.io.send(*AsyncMessage, element, on_send, &element.completion, this.socket, element.slice()); + node = element.next orelse break; + } + + suspend { + this.send_frame = @frame().*; + } + + if (this.err) |err| { + this.err = null; + return @errSetCast(AsyncSocket.SendError, err); + } + + return this.sent - original_sent; + } + + pub const RecvError = AsyncIO.RecvError; + + pub fn read( + this: *AsyncSocket, + bytes: []u8, + offset: u64, + ) RecvError!u64 { + this.read_context = bytes; + this.read_offset = offset; + const original_read_offset = this.read_offset; + const Reader = struct { + pub fn on_read(ctx: *AsyncSocket, completion: *AsyncIO.Completion, result: RecvError!usize) void { + const len = result catch |err| { + ctx.err = err; + resume ctx.read_frame; + return; + }; + ctx.read_offset += len; + resume ctx.read_frame; + } + }; + + this.io.recv( + *AsyncSocket, + this, + Reader.on_read, + &this.read_completion, + this.socket, + bytes, + ); + + suspend { + this.read_frame = @frame().*; + } + + if (this.err) |err| { + this.err = null; + return @errSetCast(RecvError, err); + } + + return this.read_offset - original_read_offset; + } +}; + +threadlocal var request_headers_buf: [256]picohttp.Header = undefined; + +pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { + var header_count: usize = 0; + var header_entries = this.header_entries.slice(); + var header_names = header_entries.items(.name); + var header_values = header_entries.items(.value); + + var override_accept_encoding = false; + + var override_user_agent = false; + for (header_names) |head, i| { + const name = this.headerStr(head); + // Hash it as lowercase + const hash = hashHeaderName(name); + + // Skip host and connection header + // we manage those + switch (hash) { + host_header_hash, + connection_header_hash, + content_length_header_hash, + => continue, + else => {}, + } + + override_user_agent = override_user_agent or hash == user_agent_header_hash; + + override_accept_encoding = override_accept_encoding or hash == accept_encoding_header_hash; + + request_headers_buf[header_count] = (picohttp.Header{ + .name = name, + .value = this.headerStr(header_values[i]), + }); + + // header_name_hashes[header_count] = hash; + + // // ensure duplicate headers come after each other + // if (header_count > 2) { + // var head_i: usize = header_count - 1; + // while (head_i > 0) : (head_i -= 1) { + // if (header_name_hashes[head_i] == header_name_hashes[header_count]) { + // std.mem.swap(picohttp.Header, &header_name_hashes[header_count], &header_name_hashes[head_i + 1]); + // std.mem.swap(u64, &request_headers_buf[header_count], &request_headers_buf[head_i + 1]); + // break; + // } + // } + // } + header_count += 1; + } + + // request_headers_buf[header_count] = connection_header; + // header_count += 1; + + if (!override_user_agent) { + request_headers_buf[header_count] = user_agent_header; + header_count += 1; + } + + request_headers_buf[header_count] = accept_header; + header_count += 1; + + request_headers_buf[header_count] = picohttp.Header{ + .name = host_header_name, + .value = this.url.hostname, + }; + header_count += 1; + + if (!override_accept_encoding) { + request_headers_buf[header_count] = accept_encoding_header; + header_count += 1; + } + + if (body_len > 0) { + request_headers_buf[header_count] = picohttp.Header{ + .name = content_length_header_name, + .value = std.fmt.bufPrint(&request_content_len_buf, "{d}", .{body_len}) catch "0", + }; + header_count += 1; + } + + return picohttp.Request{ + .method = @tagName(this.method), + .path = this.url.pathname, + .minor_version = 1, + .headers = request_headers_buf[0..header_count], + }; +} + +pub fn connect( + this: *HTTPClient, +) !void { + const port = this.url.getPortAuto(); + + try this.socket.connect(this.url.hostname, port); + var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket) }; + client.setNoDelay(true) catch {}; + client.setReadBufferSize(AsyncMessage.buffer_size) catch {}; + client.setQuickACK(true) catch {}; + + if (this.timeout > 0) { + client.setReadTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {}; + client.setWriteTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {}; + } +} + +pub fn sendAsync(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) @Frame(HTTPClient.send) { + return async this.send(body, body_out_str); +} + +pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { + // this prevents stack overflow + redirect: while (this.remaining_redirect_count >= -1) { + if (this.url.isHTTPS()) { + return error.NotImplementedYet; + // return this.sendHTTPS(body, body_out_str) catch |err| { + // switch (err) { + // error.Redirect => { + // this.remaining_redirect_count -= 1; + // continue :redirect; + // }, + // else => return err, + // } + // }; + } else { + return this.sendHTTP(body, body_out_str) catch |err| { + switch (err) { + error.Redirect => { + this.remaining_redirect_count -= 1; + continue :redirect; + }, + else => return err, + } + }; + } + } + + return error.TooManyRedirects; +} + +const Task = ThreadPool.Task; + +pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { + try this.connect(); + defer if (this.socket.socket > 0) std.os.closeSocket(this.socket.socket); + var request = buildRequest(this, body.len); + if (this.verbose) { + Output.prettyErrorln("{s}", .{request}); + } + + try writeRequest(&this.socket, request, body); + _ = try this.socket.send(); + var client_reader = &this.socket; + + if (this.progress_node == null) { + return this.processResponse( + false, + false, + @TypeOf(client_reader), + client_reader, + body_out_str, + ); + } else { + return this.processResponse( + false, + true, + @TypeOf(client_reader), + client_reader, + body_out_str, + ); + } +} + +const ZlibPool = struct { + lock: Lock = Lock.init(), + items: std.ArrayList(*MutableString), + allocator: *std.mem.Allocator, + pub var instance: ZlibPool = undefined; + pub var loaded: bool = false; + + pub fn init(allocator: *std.mem.Allocator) ZlibPool { + return ZlibPool{ + .allocator = allocator, + .items = std.ArrayList(*MutableString).init(allocator), + }; + } + + pub fn get(this: *ZlibPool) !*MutableString { + this.lock.lock(); + defer this.lock.unlock(); + switch (this.items.items.len) { + 0 => { + var mutable = try this.allocator.create(MutableString); + mutable.* = try MutableString.init(this.allocator, 0); + return mutable; + }, + else => { + return this.items.pop(); + }, + } + + return item; + } + + pub fn put(this: *ZlibPool, mutable: *MutableString) !void { + this.lock.lock(); + defer this.lock.unlock(); + mutable.reset(); + try this.items.append(mutable); + } +}; + +pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response { + var response: picohttp.Response = undefined; + var request_message = AsyncMessage.get(this.allocator); + defer request_message.release(); + var request_buffer: []u8 = &request_message.buf; + var read_length: usize = 0; + { + var read_headers_up_to: usize = 0; + + var req_buf_read: usize = std.math.maxInt(usize); + defer this.read_count += @intCast(u32, read_length); + + restart: while (req_buf_read != 0) { + req_buf_read = try client.read(request_buffer, read_length); + read_length += req_buf_read; + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(read_length); + this.progress_node.?.context.maybeRefresh(); + } + + var request_body = request_buffer[0..read_length]; + read_headers_up_to = if (read_headers_up_to > read_length) read_length else read_headers_up_to; + + response = picohttp.Response.parseParts(request_body, &response_headers_buf, &read_headers_up_to) catch |err| { + switch (err) { + error.ShortRead => { + continue :restart; + }, + else => { + return err; + }, + } + }; + break :restart; + } + } + + body_out_str.reset(); + var content_length: u32 = 0; + var encoding = Encoding.identity; + var transfer_encoding = Encoding.identity; + + var location: string = ""; + + if (this.verbose) { + Output.prettyErrorln("Response: {s}", .{response}); + } + + for (response.headers) |header| { + switch (hashHeaderName(header.name)) { + content_length_header_hash => { + content_length = std.fmt.parseInt(u32, header.value, 10) catch 0; + try body_out_str.inflate(content_length); + body_out_str.list.expandToCapacity(); + this.body_size = content_length; + }, + content_encoding_hash => { + if (strings.eqlComptime(header.value, "gzip")) { + encoding = Encoding.gzip; + } else if (strings.eqlComptime(header.value, "deflate")) { + encoding = Encoding.deflate; + } else if (!strings.eqlComptime(header.value, "identity")) { + return error.UnsupportedContentEncoding; + } + }, + transfer_encoding_header => { + if (strings.eqlComptime(header.value, "gzip")) { + transfer_encoding = Encoding.gzip; + } else if (strings.eqlComptime(header.value, "deflate")) { + transfer_encoding = Encoding.deflate; + } else if (strings.eqlComptime(header.value, "identity")) { + transfer_encoding = Encoding.identity; + } else if (strings.eqlComptime(header.value, "chunked")) { + transfer_encoding = Encoding.chunked; + } else { + return error.UnsupportedTransferEncoding; + } + }, + location_header_hash => { + location = header.value; + }, + + else => {}, + } + } + + if (location.len > 0 and this.remaining_redirect_count > 0) { + switch (response.status_code) { + 302, 301, 307, 308, 303 => { + if (strings.indexOf(location, "://")) |i| { + const protocol_name = location[0..i]; + if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else { + return error.UnsupportedRedirectProtocol; + } + + std.mem.copy(u8, &this.redirect_buf, location); + this.url = URL.parse(location); + } else { + const original_url = this.url; + this.url = URL.parse(std.fmt.bufPrint( + &this.redirect_buf, + "{s}://{s}{s}", + .{ original_url.displayProtocol(), original_url.displayHostname(), location }, + ) catch return error.RedirectURLTooLong); + } + + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/303 + if (response.status_code == 303) { + this.method = .GET; + } + + return error.Redirect; + }, + else => {}, + } + } + + if (transfer_encoding == Encoding.chunked) { + var decoder = std.mem.zeroes(picohttp.phr_chunked_decoder); + var buffer_: *MutableString = body_out_str; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + ZlibPool.loaded = true; + } + + buffer_ = try ZlibPool.instance.get(); + }, + else => {}, + } + + var buffer = buffer_.*; + + var last_read: usize = 0; + { + var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; + last_read = remainder.len; + try buffer.inflate(std.math.max(remainder.len, 2048)); + buffer.list.expandToCapacity(); + std.mem.copy(u8, buffer.list.items, remainder); + } + + // set consume_trailer to 1 to discard the trailing header + // using content-encoding per chunk is not supported + decoder.consume_trailer = 1; + + // these variable names are terrible + // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked + // (but ported from C -> zig) + var rret: usize = 0; + var rsize: usize = last_read; + var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize); + var total_size = rsize; + + while (pret == -2) { + if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) { + try buffer.inflate(std.math.max(total_size * 2, 1024)); + buffer.list.expandToCapacity(); + } + + rret = try client.read(buffer.list.items, total_size); + + if (rret == 0) { + return error.ChunkedEncodingError; + } + + rsize = rret; + pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); + if (pret == -1) return error.ChunkedEncodingParseError; + + total_size += rsize; + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(total_size); + this.progress_node.?.context.maybeRefresh(); + } + } + + buffer.list.shrinkRetainingCapacity(total_size); + buffer_.* = buffer; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + var reader = try Zlib.ZlibReaderArrayList.init(buffer.list.items, &body_out_str.list, default_allocator); + reader.readAll() catch |err| { + if (reader.errorMessage()) |msg| { + Output.prettyErrorln("<r><red>Zlib error<r>: <b>{s}<r>", .{msg}); + Output.flush(); + } + return err; + }; + }, + else => {}, + } + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_out_str.list.items.len); + this.progress_node.?.context.maybeRefresh(); + } + + this.body_size = @intCast(u32, body_out_str.list.items.len); + return response; + } + + if (content_length > 0) { + var remaining_content_length = content_length; + var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; + remainder = remainder[0..std.math.min(remainder.len, content_length)]; + var buffer_: *MutableString = body_out_str; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + ZlibPool.loaded = true; + } + + buffer_ = try ZlibPool.instance.get(); + if (buffer_.list.capacity < remaining_content_length) { + try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length); + } + buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length]; + }, + else => {}, + } + var buffer = buffer_.*; + + var body_size: usize = 0; + if (remainder.len > 0) { + std.mem.copy(u8, buffer.list.items, remainder); + body_size = remainder.len; + this.read_count += @intCast(u32, body_size); + remaining_content_length -= @intCast(u32, remainder.len); + } + + while (remaining_content_length > 0) { + const size = @intCast(u32, try client.read( + buffer.list.items, + body_size, + )); + this.read_count += size; + if (size == 0) break; + + body_size += size; + remaining_content_length -= size; + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_size); + this.progress_node.?.context.maybeRefresh(); + } + } + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_size); + this.progress_node.?.context.maybeRefresh(); + } + + buffer.list.shrinkRetainingCapacity(body_size); + buffer_.* = buffer; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + var reader = try Zlib.ZlibReaderArrayList.init(buffer.list.items, &body_out_str.list, default_allocator); + reader.readAll() catch |err| { + if (reader.errorMessage()) |msg| { + Output.prettyErrorln("<r><red>Zlib error<r>: <b>{s}<r>", .{msg}); + Output.flush(); + } + return err; + }; + }, + else => {}, + } + } + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_out_str.list.items.len); + this.progress_node.?.context.maybeRefresh(); + } + + return response; +} + +pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response { + var connection = try this.connect(); + S2n.boot(default_allocator); + const hostname = this.url.displayHostname(); + std.mem.copy(u8, &server_name_buf, hostname); + server_name_buf[hostname.len] = 0; + var server_name = server_name_buf[0..hostname.len :0]; + + var client = S2n.Connection.init(connection.socket.fd); + try client.start(server_name); + client.disable_shutdown = this.disable_shutdown; + defer client.close() catch {}; + + var request = buildRequest(this, body_str.len); + if (this.verbose) { + Output.prettyErrorln("{s}", .{request}); + } + const body = body_str; + + var client_writer = client.writer(); + { + var client_writer_buffered = std.io.bufferedWriter(client_writer); + var client_writer_buffered_writer = client_writer_buffered.writer(); + + try writeRequest(@TypeOf(&client_writer_buffered_writer), &client_writer_buffered_writer, request, body); + try client_writer_buffered_writer.writeAll("\r\n"); + try client_writer_buffered.flush(); + } + + if (body.len > 0) { + try client_writer.writeAll(body); + } + + if (this.progress_node == null) { + return try this.processResponse(true, false, @TypeOf(&client), &client, body_out_str); + } else { + return try this.processResponse(true, true, @TypeOf(&client), &client, body_out_str); + } +} + +// zig test src/http_client.zig --test-filter "sendHTTP - only" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec +test "sendHTTP - only" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + // headers.appendHeader("X-What", "ok", true, true, false); + headers.appendHeader("Accept-Encoding", "identity", true, true, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("http://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTP("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqual(body_out_str.list.items.len, 1256); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +// zig test src/http_client.zig --test-filter "sendHTTP - gzip" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec +test "sendHTTP - gzip" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + // headers.appendHeader("X-What", "ok", true, true, false); + headers.appendHeader("Accept-Encoding", "gzip", true, true, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("http://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTP("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +// zig test src/http_client.zig --test-filter "sendHTTPS - identity" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec +test "sendHTTPS - identity" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + headers.appendHeader("X-What", "ok", true, true, false); + headers.appendHeader("Accept-Encoding", "identity", true, true, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("https://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTPS("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +test "sendHTTPS - gzip" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + headers.appendHeader("Accept-Encoding", "gzip", false, false, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("https://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTPS("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +// zig test src/http_client.zig --test-filter "sendHTTPS - deflate" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test +test "sendHTTPS - deflate" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + headers.appendHeader("Accept-Encoding", "deflate", false, false, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("https://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTPS("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +// zig test src/http_client.zig --test-filter "sendHTTP" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test + +test "send - redirect" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + headers.appendHeader("Accept-Encoding", "gzip", false, false, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("https://www.bun.sh/"), + headers.entries, + headers.buf.items, + ); + try std.testing.expectEqualStrings(client.url.hostname, "www.bun.sh"); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.send("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqual(client.url.hostname, "bun.sh"); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} diff --git a/src/http/network_thread.zig b/src/http/network_thread.zig new file mode 100644 index 000000000..d7f9c4409 --- /dev/null +++ b/src/http/network_thread.zig @@ -0,0 +1,25 @@ +const ThreadPool = @import("../thread_pool.zig"); +const Batch = ThreadPool.Batch; +const std = @import("std"); +const AsyncIO = @import("io"); + +const NetworkThread = @This(); + +/// Single-thread in this pool +pool: ThreadPool, + +pub var global: NetworkThread = undefined; +pub var global_loaded: bool = false; + +pub fn init() !void { + AsyncIO.global = try AsyncIO.init(0, 0); + AsyncIO.global_loaded = true; + + global = NetworkThread{ + .pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }), + }; + + global.pool.io = &AsyncIO.global; + + global_loaded = true; +} diff --git a/src/http_client.zig b/src/http_client.zig index f75f9a340..3b78e625e 100644 --- a/src/http_client.zig +++ b/src/http_client.zig @@ -259,16 +259,8 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { pub fn connect( this: *HTTPClient, ) !tcp.Client { - var client: tcp.Client = try tcp.Client.init(tcp.Domain.ip, .{ .close_on_exec = true }); const port = this.url.getPortAuto(); - client.setNoDelay(true) catch {}; - client.setReadBufferSize(http_req_buf.len) catch {}; - client.setQuickACK(true) catch {}; - - if (this.timeout > 0) { - client.setReadTimeout(this.timeout) catch {}; - client.setWriteTimeout(this.timeout) catch {}; - } + var client: tcp.Client = undefined; // if (this.url.isLocalhost()) { // try client.connect( @@ -277,7 +269,16 @@ pub fn connect( // } else { // } else if (this.url.isDomainName()) { var stream = try std.net.tcpConnectToHost(default_allocator, this.url.hostname, port); - client.socket = std.x.os.Socket.from(stream.handle); + client = tcp.Client{ .socket = std.x.os.Socket.from(stream.handle) }; + + if (this.timeout > 0) { + client.setReadTimeout(this.timeout) catch {}; + client.setWriteTimeout(this.timeout) catch {}; + } + + client.setNoDelay(true) catch {}; + client.setReadBufferSize(http_req_buf.len) catch {}; + client.setQuickACK(true) catch {}; // } // } else if (this.url.getIPv4Address()) |ip_addr| { diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig index 00186610b..f1ff5a06e 100644 --- a/src/io/io_darwin.zig +++ b/src/io/io_darwin.zig @@ -1,5 +1,20 @@ const std = @import("std"); -const os = std.os; +const os = struct { + pub usingnamespace std.os; + pub const EINTR = 4; + pub const EAGAIN = 35; + pub const EBADF = 9; + pub const ECONNRESET = 54; + pub const EFAULT = 14; + pub const EINVAL = 22; + pub const EIO = 5; + pub const EISDIR = 21; + pub const ENOBUFS = 55; + pub const ENOMEM = 12; + pub const ENXIO = 6; + pub const EOVERFLOW = 84; + pub const ESPIPE = 29; +}; const mem = std.mem; const assert = std.debug.assert; @@ -486,7 +501,7 @@ pub fn read( op.len, @bitCast(isize, op.offset), ); - return switch (os.errno(rc)) { + return switch (@enumToInt(os.errno(rc))) { 0 => @intCast(usize, rc), os.EINTR => continue, os.EAGAIN => error.WouldBlock, @@ -501,7 +516,7 @@ pub fn read( os.ENXIO => error.Unseekable, os.EOVERFLOW => error.Unseekable, os.ESPIPE => error.Unseekable, - else => |err| os.unexpectedErrno(err), + else => error.Unexpected, }; } } @@ -663,3 +678,6 @@ fn buffer_limit(buffer_len: usize) usize { }; return std.math.min(limit, buffer_len); } + +pub var global: IO = undefined; +pub var global_loaded: bool = false; diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index 794f18f90..0cf6b7c58 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -872,3 +872,6 @@ pub fn write( pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t { return os.socket(family, sock_type, protocol); } + +pub var global: IO = undefined; +pub var global_loaded: bool = false; diff --git a/src/main.zig b/src/main.zig index 1635f5ad1..d70602827 100644 --- a/src/main.zig +++ b/src/main.zig @@ -17,6 +17,8 @@ pub const MainPanicHandler = panicky.NewPanicHandler(std.builtin.default_panic); const js = @import("javascript/jsc/bindings/bindings.zig"); usingnamespace @import("javascript/jsc/javascript.zig"); +pub const io_mode = .blocking; + pub fn panic(msg: []const u8, error_return_trace: ?*std.builtin.StackTrace) noreturn { MainPanicHandler.handle_panic(msg, error_return_trace); } diff --git a/src/pool.zig b/src/pool.zig new file mode 100644 index 000000000..b4fe0eb29 --- /dev/null +++ b/src/pool.zig @@ -0,0 +1,37 @@ +const std = @import("std"); + +pub fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: *std.mem.Allocator) anyerror!Type)) type { + return struct { + const LinkedList = std.SinglyLinkedList(Type); + // mimalloc crashes on realloc across threads + threadlocal var list: LinkedList = undefined; + threadlocal var loaded: bool = false; + pub fn get(allocator: *std.mem.Allocator) *LinkedList.Node { + if (loaded) { + if (list.popFirst()) |node| { + node.data.reset(); + return node; + } + } + + var new_node = allocator.create(LinkedList.Node) catch unreachable; + new_node.* = LinkedList.Node{ + .data = Init( + allocator, + ) catch unreachable, + }; + + return new_node; + } + + pub fn release(node: *LinkedList.Node) void { + if (loaded) { + list.prepend(node); + return; + } + + list = LinkedList{ .first = node }; + loaded = true; + } + }; +} diff --git a/src/thread_pool.zig b/src/thread_pool.zig index 38c102a77..6305590db 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -4,10 +4,13 @@ const std = @import("std"); const ThreadPool = @This(); const Futex = @import("./futex.zig"); +const AsyncIO = @import("io"); const assert = std.debug.assert; const Atomic = std.atomic.Atomic; +io: ?*AsyncIO = null, + stack_size: u32, max_threads: u32, sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})), @@ -15,6 +18,7 @@ idle_event: Event = .{}, join_event: Event = .{}, run_queue: Node.Queue = .{}, threads: Atomic(?*Thread) = Atomic(?*Thread).init(null), +name: []const u8 = "", const Sync = packed struct { /// Tracks the number of threads not searching for Tasks @@ -172,6 +176,7 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void { if (can_wake and sync.spawned < self.max_threads) { const spawn_config = std.Thread.SpawnConfig{ .stack_size = self.stack_size }; const thread = std.Thread.spawn(spawn_config, Thread.run, .{self}) catch return self.unregister(null); + // if (self.name.len > 0) thread.setName(self.name) catch {}; return thread.detach(); } @@ -230,7 +235,14 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { // Wait for a signal by either notify() or shutdown() without wasting cpu cycles. // TODO: Add I/O polling here. + if (self.io) |io| { + io.tick() catch unreachable; + } } else { + if (self.io) |io| { + while (true) io.run_for_ns(std.time.ns_per_ms * 1000) catch {}; + } + self.idle_event.wait(); sync = @bitCast(Sync, self.sync.load(.Monotonic)); } @@ -315,6 +327,8 @@ fn join(self: *ThreadPool) void { thread.join_event.notify(); } +const Output = @import("./global.zig").Output; + const Thread = struct { next: ?*Thread = null, target: ?*Thread = null, @@ -326,6 +340,8 @@ const Thread = struct { /// Thread entry point which runs a worker for the ThreadPool fn run(thread_pool: *ThreadPool) void { + Output.Source.configureThread(); + var self = Thread{}; current = &self; @@ -344,6 +360,7 @@ const Thread = struct { const task = @fieldParentPtr(Task, "node", result.node); (task.callback)(task); } + Output.flush(); } } @@ -369,6 +386,9 @@ const Thread = struct { } // TODO: add optimistic I/O polling here + if (thread_pool.io) |io| { + io.tick() catch {}; + } // Then try work stealing from other threads var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned; |