diff options
author | 2022-09-11 13:37:17 -0700 | |
---|---|---|
committer | 2022-09-11 13:37:17 -0700 | |
commit | 9a5aa059f91ecf14b5dfd28ec610df6d8373b5a1 (patch) | |
tree | b96909744baddf65bc4175c40bcc0d9d1658dbb1 /src | |
parent | 8b91360a33b782af423c85f9ec7277394e27beb4 (diff) | |
download | bun-9a5aa059f91ecf14b5dfd28ec610df6d8373b5a1.tar.gz bun-9a5aa059f91ecf14b5dfd28ec610df6d8373b5a1.tar.zst bun-9a5aa059f91ecf14b5dfd28ec610df6d8373b5a1.zip |
New HTTP client (#1231)
* wip
* It mostly works!
* Support `bun install`
* Support `bun create`
* Support chunked transfer encoding
* Handle Keep Alive when redirecting to a different domain
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/event_loop.zig | 2 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 30 | ||||
-rw-r--r-- | src/cli/create_command.zig | 28 | ||||
-rw-r--r-- | src/cli/upgrade_command.zig | 13 | ||||
-rw-r--r-- | src/deps/picohttp.zig | 58 | ||||
-rw-r--r-- | src/deps/picohttpparser.zig | 8 | ||||
-rw-r--r-- | src/deps/uws.zig | 88 | ||||
-rw-r--r-- | src/hive_array.zig | 106 | ||||
-rw-r--r-- | src/http/websocket_http_client.zig | 29 | ||||
-rw-r--r-- | src/http/zlib.zig | 3 | ||||
-rw-r--r-- | src/http_client_async.zig | 1824 | ||||
-rw-r--r-- | src/install/install.zig | 24 | ||||
-rw-r--r-- | src/io/io_darwin.zig | 9 | ||||
-rw-r--r-- | src/network_thread.zig | 76 |
14 files changed, 1556 insertions, 742 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 54a5af1a8..51d55d2e0 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -486,7 +486,7 @@ pub const EventLoop = struct { } if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) { - if (this.waker) |waker| { + if (this.waker) |*waker| { waker.wake() catch unreachable; } } diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index e649bb923..6c1fc49f9 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -517,6 +517,7 @@ pub const Fetch = struct { pub const FetchTasklet = struct { promise: *JSPromise = undefined, http: HTTPClient.AsyncHTTP = undefined, + result: HTTPClient.HTTPClientResult = undefined, status: Status = Status.pending, javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, @@ -592,12 +593,13 @@ pub const Fetch = struct { this.blob_store = null; store.deref(); } + defer this.result.deinitMetadata(); const fetch_error = std.fmt.allocPrint( default_allocator, "fetch() failed {s}\nurl: \"{s}\"", .{ @errorName(this.http.err orelse error.HTTPFail), - this.http.url.href, + this.result.href, }, ) catch unreachable; return ZigString.init(fetch_error).toErrorInstance(this.global_this); @@ -611,11 +613,12 @@ pub const Fetch = struct { this.blob_store = null; store.deref(); } + defer this.result.deinitMetadata(); response.* = Response{ .allocator = allocator, - .url = allocator.dupe(u8, this.http.url.href) catch unreachable, + .url = allocator.dupe(u8, this.result.href) catch unreachable, .status_text = allocator.dupe(u8, http_response.status) catch unreachable, - .redirected = this.http.redirect_count > 0, + .redirected = this.http.redirected, .body = .{ .init = .{ .headers = FetchHeaders.createFromPicoHeaders(this.global_this, http_response.headers), @@ -645,7 +648,7 @@ pub const Fetch = struct { // linked_list.data.pooled_body = BodyPool.get(allocator); linked_list.data.blob_store = request_body_store; linked_list.data.response_buffer = MutableString.initEmpty(allocator); - linked_list.data.http = try HTTPClient.AsyncHTTP.init( + linked_list.data.http = HTTPClient.AsyncHTTP.init( allocator, method, url, @@ -653,10 +656,16 @@ pub const Fetch = struct { headers_buf, &linked_list.data.response_buffer, request_body orelse &linked_list.data.empty_request_body, - timeout, + undefined, ); linked_list.data.context = .{ .tasklet = &linked_list.data }; + linked_list.data.http.completion_callback = HTTPClient.HTTPClientResult.Callback.New( + *FetchTasklet, + FetchTasklet.callback, + ).init( + &linked_list.data, + ); return linked_list; } @@ -672,21 +681,20 @@ pub const Fetch = struct { timeout: usize, request_body_store: ?*Blob.Store, ) !*FetchTasklet.Pool.Node { - try NetworkThread.init(); + try HTTPClient.HTTPThread.init(); var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store); node.data.global_this = global; - node.data.http.callback = callback; var batch = NetworkThread.Batch{}; node.data.http.schedule(allocator, &batch); - NetworkThread.global.schedule(batch); + HTTPClient.http_thread.schedule(batch); VirtualMachine.vm.active_tasks +|= 1; return node; } - pub fn callback(http_: *HTTPClient.AsyncHTTP) void { - var task: *FetchTasklet = @fieldParentPtr(FetchTasklet, "http", http_); - @atomicStore(Status, &task.status, Status.done, .Monotonic); + pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void { + task.response_buffer = result.body.?.*; + task.result = result; task.javascript_vm.eventLoop().enqueueTaskConcurrent(Task.init(task)); } }; diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig index a0017dd34..ee6266f40 100644 --- a/src/cli/create_command.zig +++ b/src/cli/create_command.zig @@ -251,7 +251,7 @@ pub const CreateCommand = struct { @setCold(true); Global.configureAllocator(.{ .long_running = false }); - try NetworkThread.init(); + try HTTP.HTTPThread.init(); var create_options = try CreateOptions.parse(ctx, false); const positionals = create_options.positionals; @@ -1849,7 +1849,16 @@ pub const Example = struct { // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; - async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, api_url, header_entries, headers_buf, mutable, &request_body, 60 * std.time.ns_per_min); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + api_url, + header_entries, + headers_buf, + mutable, + &request_body, + 60 * std.time.ns_per_min, + ); async_http.client.progress_node = progress; const response = try async_http.sendSync(true); @@ -1912,7 +1921,7 @@ pub const Example = struct { // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; - async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min); + async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min); async_http.client.progress_node = progress; var response = try async_http.sendSync(true); @@ -1984,7 +1993,7 @@ pub const Example = struct { mutable.reset(); // ensure very stable memory address - async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, URL.parse(tarball_url), .{}, "", mutable, &request_body, 60 * std.time.ns_per_min); + async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, URL.parse(tarball_url), .{}, "", mutable, &request_body, 60 * std.time.ns_per_min); async_http.client.progress_node = progress; refresher.maybeRefresh(); @@ -2013,7 +2022,16 @@ pub const Example = struct { var mutable = try ctx.allocator.create(MutableString); mutable.* = try MutableString.init(ctx.allocator, 2048); - async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + url, + .{}, + "", + mutable, + &request_body, + 60 * std.time.ns_per_min, + ); if (Output.enable_ansi_colors) { async_http.client.progress_node = progress_node; diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig index 0d7db71dd..15881e984 100644 --- a/src/cli/upgrade_command.zig +++ b/src/cli/upgrade_command.zig @@ -211,7 +211,16 @@ pub const UpgradeCommand = struct { // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable; - async_http.* = try HTTP.AsyncHTTP.init(allocator, .GET, api_url, header_entries, headers_buf, &metadata_body, &request_body, 60 * std.time.ns_per_min); + async_http.* = HTTP.AsyncHTTP.initSync( + allocator, + .GET, + api_url, + header_entries, + headers_buf, + &metadata_body, + &request_body, + 60 * std.time.ns_per_min, + ); if (!silent) async_http.client.progress_node = progress; const response = try async_http.sendSync(true); @@ -434,7 +443,7 @@ pub const UpgradeCommand = struct { zip_file_buffer.* = try MutableString.init(ctx.allocator, @maximum(version.size, 1024)); var request_buffer = try MutableString.init(ctx.allocator, 0); - async_http.* = try HTTP.AsyncHTTP.init( + async_http.* = HTTP.AsyncHTTP.initSync( ctx.allocator, .GET, URL.parse(version.zip_url), diff --git a/src/deps/picohttp.zig b/src/deps/picohttp.zig index ac1a44d35..4a6848749 100644 --- a/src/deps/picohttp.zig +++ b/src/deps/picohttp.zig @@ -8,6 +8,7 @@ const Environment = @import("../global.zig").Environment; const fmt = std.fmt; const assert = std.debug.assert; +const StringBuilder = @import("../string_builder.zig"); pub const Header = struct { name: []const u8, @@ -33,6 +34,18 @@ pub const Header = struct { } } + pub fn count(this: *const Header, builder: *StringBuilder) void { + builder.count(this.name); + builder.count(this.value); + } + + pub fn clone(this: *const Header, builder: *StringBuilder) Header { + return .{ + .name = builder.append(this.name), + .value = builder.append(this.value), + }; + } + comptime { assert(@sizeOf(Header) == @sizeOf(c.phr_header)); assert(@alignOf(Header) == @alignOf(c.phr_header)); @@ -44,6 +57,21 @@ pub const Request = struct { path: []const u8, minor_version: usize, headers: []const Header, + bytes_read: u32 = 0, + + pub fn clone(this: *const Request, headers: []Header, builder: *StringBuilder) Request { + for (this.headers) |header, i| { + headers[i] = header.clone(builder); + } + + return .{ + .method = builder.append(this.method), + .path = builder.append(this.path), + .minor_version = this.minor_version, + .headers = headers, + .bytes_read = this.bytes_read, + }; + } pub fn format(self: Request, comptime _: []const u8, _: fmt.FormatOptions, writer: anytype) !void { try fmt.format(writer, "{s} {s}\n", .{ self.method, self.path }); @@ -83,16 +111,17 @@ pub const Request = struct { .path = path, .minor_version = @intCast(usize, minor_version), .headers = src[0..num_headers], + .bytes_read = @intCast(u32, rc), }, }; } }; pub const Response = struct { - minor_version: usize, - status_code: usize, - status: []const u8, - headers: []Header, + minor_version: usize = 0, + status_code: usize = 0, + status: []const u8 = "", + headers: []Header = &.{}, bytes_read: c_int = 0, pub fn format(self: Response, comptime _: []const u8, _: fmt.FormatOptions, writer: anytype) !void { @@ -103,6 +132,27 @@ pub const Response = struct { } } + pub fn count(this: *const Response, builder: *StringBuilder) void { + builder.count(this.status); + + for (this.headers) |header| { + header.count(builder); + } + } + + pub fn clone(this: *const Response, headers: []Header, builder: *StringBuilder) Response { + var that = this.*; + that.status = builder.append(this.status); + + for (this.headers) |header, i| { + headers[i] = header.clone(builder); + } + + that.headers = headers[0..this.headers.len]; + + return that; + } + pub fn parseParts(buf: []const u8, src: []Header, offset: ?*usize) !Response { var minor_version: c_int = 1; var status_code: c_int = 0; diff --git a/src/deps/picohttpparser.zig b/src/deps/picohttpparser.zig index ea9ad9f3a..185ed822e 100644 --- a/src/deps/picohttpparser.zig +++ b/src/deps/picohttpparser.zig @@ -10,10 +10,10 @@ pub extern fn phr_parse_request(buf: [*c]const u8, len: usize, method: [*c][*c]c pub extern fn phr_parse_response(_buf: [*c]const u8, len: usize, minor_version: [*c]c_int, status: [*c]c_int, msg: [*c][*c]const u8, msg_len: [*c]usize, headers: [*c]struct_phr_header, num_headers: [*c]usize, last_len: usize) c_int; pub extern fn phr_parse_headers(buf: [*c]const u8, len: usize, headers: [*c]struct_phr_header, num_headers: [*c]usize, last_len: usize) c_int; pub const struct_phr_chunked_decoder = extern struct { - bytes_left_in_chunk: usize, - consume_trailer: u8, - _hex_count: u8, - _state: u8, + bytes_left_in_chunk: usize = 0, + consume_trailer: u8 = 0, + _hex_count: u8 = 0, + _state: u8 = 0, }; pub extern fn phr_decode_chunked(decoder: *struct_phr_chunked_decoder, buf: [*]u8, bufsz: *usize) isize; pub extern fn phr_decode_chunked_is_in_data(decoder: *struct_phr_chunked_decoder) c_int; diff --git a/src/deps/uws.zig b/src/deps/uws.zig index c545de2df..a2a7da143 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -25,12 +25,17 @@ pub fn NewSocketHandler(comptime ssl: bool) type { return us_socket_timeout(comptime ssl_int, this.socket, seconds); } pub fn ext(this: ThisSocket, comptime ContextType: type) ?*ContextType { + const alignment = if (ContextType == *anyopaque) + @sizeOf(usize) + else + std.meta.alignment(ContextType); + var ptr = us_socket_ext( comptime ssl_int, this.socket, ) orelse return null; - return @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), ptr)); + return @ptrCast(*ContextType, @alignCast(alignment, ptr)); } pub fn context(this: ThisSocket) *us_socket_context_t { return us_socket_context( @@ -126,28 +131,51 @@ pub fn NewSocketHandler(comptime ssl: bool) type { return holder; } + pub fn connectAnon( + host: []const u8, + port: c_int, + socket_ctx: *us_socket_context_t, + ptr: *anyopaque, + ) ?ThisSocket { + var stack_fallback = std.heap.stackFallback(1024, bun.default_allocator); + var allocator = stack_fallback.get(); + var host_ = allocator.dupeZ(u8, host) catch return null; + defer allocator.free(host_); + + var socket = us_socket_context_connect(comptime ssl_int, socket_ctx, host_, port, null, 0, @sizeOf(*anyopaque)) orelse return null; + const socket_ = ThisSocket{ .socket = socket }; + var holder = socket_.ext(*anyopaque) orelse { + if (comptime bun.Environment.allow_assert) unreachable; + _ = us_socket_close_connecting(comptime ssl_int, socket); + return null; + }; + holder.* = ptr; + return socket_; + } + pub fn configure( ctx: *us_socket_context_t, comptime ContextType: type, - comptime onOpen: anytype, - comptime onClose: anytype, - comptime onData: anytype, - comptime onWritable: anytype, - comptime onTimeout: anytype, - comptime onConnectError: anytype, - comptime onEnd: anytype, + comptime Fields: anytype, ) void { + const field_type = comptime if (@TypeOf(Fields) != type) @TypeOf(Fields) else Fields; + const SocketHandler = struct { + const alignment = if (ContextType == anyopaque) + @sizeOf(usize) + else + std.meta.alignment(ContextType); + pub fn on_open(socket: *Socket, _: c_int, _: [*c]u8, _: c_int) callconv(.C) ?*Socket { - onOpen( - @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)), + Fields.onOpen( + @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)), ThisSocket{ .socket = socket }, ); return socket; } pub fn on_close(socket: *Socket, code: c_int, reason: ?*anyopaque) callconv(.C) ?*Socket { - onClose( - @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)), + Fields.onClose( + @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)), ThisSocket{ .socket = socket }, code, reason, @@ -155,57 +183,57 @@ pub fn NewSocketHandler(comptime ssl: bool) type { return socket; } pub fn on_data(socket: *Socket, buf: ?[*]u8, len: c_int) callconv(.C) ?*Socket { - onData( - @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)), + Fields.onData( + @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)), ThisSocket{ .socket = socket }, buf.?[0..@intCast(usize, len)], ); return socket; } pub fn on_writable(socket: *Socket) callconv(.C) ?*Socket { - onWritable( - @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)), + Fields.onWritable( + @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)), ThisSocket{ .socket = socket }, ); return socket; } pub fn on_timeout(socket: *Socket) callconv(.C) ?*Socket { - onTimeout( - @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)), + Fields.onTimeout( + @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)), ThisSocket{ .socket = socket }, ); return socket; } pub fn on_connect_error(socket: *Socket, code: c_int) callconv(.C) ?*Socket { - onConnectError( - @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)), + Fields.onConnectError( + @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)), ThisSocket{ .socket = socket }, code, ); return socket; } pub fn on_end(socket: *Socket) callconv(.C) ?*Socket { - onEnd( - @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)), + Fields.onEnd( + @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)), ThisSocket{ .socket = socket }, ); return socket; } }; - if (comptime @typeInfo(@TypeOf(onOpen)) != .Null) + if (comptime @hasDecl(field_type, "onOpen") and @typeInfo(@TypeOf(field_type.onOpen)) != .Null) us_socket_context_on_open(ssl_int, ctx, SocketHandler.on_open); - if (comptime @typeInfo(@TypeOf(onClose)) != .Null) + if (comptime @hasDecl(field_type, "onClose") and @typeInfo(@TypeOf(field_type.onClose)) != .Null) us_socket_context_on_close(ssl_int, ctx, SocketHandler.on_close); - if (comptime @typeInfo(@TypeOf(onData)) != .Null) + if (comptime @hasDecl(field_type, "onData") and @typeInfo(@TypeOf(field_type.onData)) != .Null) us_socket_context_on_data(ssl_int, ctx, SocketHandler.on_data); - if (comptime @typeInfo(@TypeOf(onWritable)) != .Null) + if (comptime @hasDecl(field_type, "onWritable") and @typeInfo(@TypeOf(field_type.onWritable)) != .Null) us_socket_context_on_writable(ssl_int, ctx, SocketHandler.on_writable); - if (comptime @typeInfo(@TypeOf(onTimeout)) != .Null) + if (comptime @hasDecl(field_type, "onTimeout") and @typeInfo(@TypeOf(field_type.onTimeout)) != .Null) us_socket_context_on_timeout(ssl_int, ctx, SocketHandler.on_timeout); - if (comptime @typeInfo(@TypeOf(onConnectError)) != .Null) + if (comptime @hasDecl(field_type, "onConnectError") and @typeInfo(@TypeOf(field_type.onConnectError)) != .Null) us_socket_context_on_connect_error(ssl_int, ctx, SocketHandler.on_connect_error); - if (comptime @typeInfo(@TypeOf(onEnd)) != .Null) + if (comptime @hasDecl(field_type, "onEnd") and @typeInfo(@TypeOf(field_type.onEnd)) != .Null) us_socket_context_on_end(ssl_int, ctx, SocketHandler.on_end); } @@ -316,7 +344,7 @@ extern fn us_socket_context_add_server_name(ssl: c_int, context: ?*us_socket_con extern fn us_socket_context_remove_server_name(ssl: c_int, context: ?*us_socket_context_t, hostname_pattern: [*c]const u8) void; extern fn us_socket_context_on_server_name(ssl: c_int, context: ?*us_socket_context_t, cb: ?fn (?*us_socket_context_t, [*c]const u8) callconv(.C) void) void; extern fn us_socket_context_get_native_handle(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque; -extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t; +pub extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t; extern fn us_socket_context_free(ssl: c_int, context: ?*us_socket_context_t) void; extern fn us_socket_context_on_open(ssl: c_int, context: ?*us_socket_context_t, on_open: fn (*Socket, c_int, [*c]u8, c_int) callconv(.C) ?*Socket) void; extern fn us_socket_context_on_close(ssl: c_int, context: ?*us_socket_context_t, on_close: fn (*Socket, c_int, ?*anyopaque) callconv(.C) ?*Socket) void; diff --git a/src/hive_array.zig b/src/hive_array.zig new file mode 100644 index 000000000..eb9220e19 --- /dev/null +++ b/src/hive_array.zig @@ -0,0 +1,106 @@ +const std = @import("std"); +const assert = std.debug.assert; +const mem = std.mem; +const testing = std.testing; + +/// An array that efficiently tracks which elements are in use. +/// The pointers are intended to be stable +/// Sorta related to https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2021/p0447r15.html +pub fn HiveArray(comptime T: type, comptime capacity: u16) type { + return struct { + const Self = @This(); + buffer: [capacity]T = undefined, + available: std.bit_set.IntegerBitSet(capacity) = std.bit_set.IntegerBitSet(capacity).initFull(), + pub const size = capacity; + + pub fn init() Self { + return .{}; + } + + pub fn get(self: *Self) ?*T { + const index = self.available.findFirstSet() orelse return null; + self.available.unset(index); + return &self.buffer[index]; + } + + pub fn at(self: *Self, index: u16) *T { + assert(index < capacity); + return &self.buffer[index]; + } + + pub fn claim(self: *Self, index: u16) void { + assert(index < capacity); + assert(self.available.isSet(index)); + self.available.unset(index); + } + + pub fn indexOf(self: *const Self, value: *const T) ?u63 { + const start = &self.buffer; + const end = @ptrCast([*]const T, start) + capacity; + if (!(@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end))) + return null; + + // aligned to the size of T + const index = (@ptrToInt(value) - @ptrToInt(start)) / @sizeOf(T); + assert(index < capacity); + assert(&self.buffer[index] == value); + return @truncate(u63, index); + } + + pub fn in(self: *const Self, value: *const T) bool { + const start = &self.buffer; + const end = @ptrCast([*]const T, start) + capacity; + return (@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end)); + } + + pub fn put(self: *Self, value: *T) bool { + const index = self.indexOf(value) orelse return false; + + assert(!self.available.isSet(index)); + assert(&self.buffer[index] == value); + + value.* = undefined; + + self.available.set(index); + return true; + } + }; +} + +test "HiveArray" { + const size = 64; + + // Choose an integer with a weird alignment + const Int = u127; + + var a = HiveArray(Int, size).init(); + + { + var b = a.get().?; + try testing.expect(a.get().? != b); + try testing.expectEqual(a.indexOf(b), 0); + try testing.expect(a.put(b)); + try testing.expect(a.get().? == b); + var c = a.get().?; + c.* = 123; + var d: Int = 12345; + try testing.expect(a.put(&d) == false); + try testing.expect(a.in(&d) == false); + } + + a.available = @TypeOf(a.available).initFull(); + { + var i: u63 = 0; + while (i < size) { + var b = a.get().?; + try testing.expectEqual(a.indexOf(b), i); + try testing.expect(a.put(b)); + try testing.expect(a.get().? == b); + i = i + 1; + } + i = 0; + while (i < size) : (i += 1) { + try testing.expect(a.get() == null); + } + } +} diff --git a/src/http/websocket_http_client.zig b/src/http/websocket_http_client.zig index 7e5bb26ba..f380b38e1 100644 --- a/src/http/websocket_http_client.zig +++ b/src/http/websocket_http_client.zig @@ -145,7 +145,19 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type { vm.uws_event_loop = loop; - Socket.configure(ctx, HTTPClient, handleOpen, handleClose, handleData, handleWritable, handleTimeout, handleConnectError, handleEnd); + Socket.configure( + ctx, + HTTPClient, + .{ + .onOpen = handleOpen, + .onClose = handleClose, + .onData = handleData, + .onWritable = handleWritable, + .onTimeout = handleTimeout, + .onConnectError = handleConnectError, + .onEnd = handleEnd, + }, + ); if (is_new_loop) { vm.prepareLoop(); } @@ -805,13 +817,14 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { Socket.configure( ctx, WebSocket, - null, - handleClose, - handleData, - handleWritable, - handleTimeout, - handleConnectError, - handleEnd, + .{ + .onClose = handleClose, + .onData = handleData, + .onWritable = handleWritable, + .onTimeout = handleTimeout, + .onConnectError = handleConnectError, + .onEnd = handleEnd, + }, ); } diff --git a/src/http/zlib.zig b/src/http/zlib.zig index 1bd38777d..f6ada0452 100644 --- a/src/http/zlib.zig +++ b/src/http/zlib.zig @@ -20,6 +20,8 @@ pub fn init(allocator: std.mem.Allocator) ZlibPool { } pub fn get(this: *ZlibPool) !*MutableString { + std.debug.assert(loaded); + switch (this.items.items.len) { 0 => { var mutable = try getAllocator().create(MutableString); @@ -35,6 +37,7 @@ pub fn get(this: *ZlibPool) !*MutableString { } pub fn put(this: *ZlibPool, mutable: *MutableString) !void { + std.debug.assert(loaded); mutable.reset(); try this.items.append(mutable); } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 35c927a13..52f62d473 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -29,98 +29,445 @@ const AsyncBIO = @import("./http/async_bio.zig"); const AsyncSocket = @import("./http/async_socket.zig"); const ZlibPool = @import("./http/zlib.zig"); const URLBufferPool = ObjectPool([4096]u8, null, false, 10); +const uws = @import("uws"); pub const MimeType = @import("./http/mime_type.zig"); pub const URLPath = @import("./http/url_path.zig"); // This becomes Arena.allocator pub var default_allocator: std.mem.Allocator = undefined; pub var default_arena: Arena = undefined; +pub var http_thread: HTTPThread = undefined; +const HiveArray = @import("./hive_array.zig").HiveArray; +const Batch = NetworkThread.Batch; +const TaggedPointerUnion = @import("./tagged_pointer.zig").TaggedPointerUnion; + +fn NewHTTPContext(comptime ssl: bool) type { + return struct { + const pool_size = 64; + const PooledSocket = struct { + http_socket: HTTPSocket, + hostname_buf: [MAX_KEEPALIVE_HOSTNAME]u8 = undefined, + hostname_len: u8 = 0, + port: u16 = 0, + + pub fn close(this: *PooledSocket) void { + this.* = undefined; + + if (comptime ssl) { + http_thread.https_context.keep_alive_sockets.unset(http_thread.https_context.pending_sockets.indexOf(this).?); + std.debug.assert(http_thread.https_context.pending_sockets.put(this)); + } else { + http_thread.http_context.keep_alive_sockets.unset(http_thread.http_context.pending_sockets.indexOf(this).?); + std.debug.assert(http_thread.http_context.pending_sockets.put(this)); + } + } + }; -const log = Output.scoped(.fetch, true); + pending_sockets: HiveArray(PooledSocket, pool_size) = HiveArray(PooledSocket, pool_size).init(), + keep_alive_sockets: std.bit_set.IntegerBitSet(pool_size + 1) = std.bit_set.IntegerBitSet(pool_size + 1).initEmpty(), + us_socket_context: *uws.us_socket_context_t, -pub fn onThreadStart(_: ?*anyopaque) ?*anyopaque { - onThreadStartNew(0); - return null; -} + const Context = @This(); + pub const HTTPSocket = uws.NewSocketHandler(ssl); + + const ActiveSocket = TaggedPointerUnion(.{ + HTTPClient, + PooledSocket, + }); + const ssl_int = @as(c_int, @boolToInt(ssl)); -pub fn onThreadStartNew(waker: AsyncIO.Waker) void { - Output.Source.configureNamedThread("HTTP"); - - default_arena = Arena.init() catch unreachable; - default_allocator = default_arena.allocator(); - NetworkThread.address_list_cached = NetworkThread.AddressListCache.init(default_allocator); - AsyncIO.global = AsyncIO.init(1024, 0, waker) catch |err| { - log: { - if (comptime Environment.isLinux) { - if (err == error.SystemOutdated) { - Output.prettyErrorln( - \\<red>error<r>: Linux kernel version doesn't support io_uring, which Bun depends on. - \\ - \\ To fix this error: please upgrade to a newer Linux kernel. - \\ - \\ If you're using Windows Subsystem for Linux, here's how: - \\ 1. Open PowerShell as an administrator - \\ 2. Run this: - \\ wsl --update - \\ wsl --shutdown - \\ - \\ Please make sure you're using WSL version 2 (not WSL 1). To check: wsl -l -v - \\ If you are on WSL 1, update to WSL 2 with the following commands: - \\ 1. wsl --set-default-version 2 - \\ 2. wsl --set-version [distro_name] 2 - \\ 3. Now follow the WSL 2 instructions above. - \\ Where [distro_name] is one of the names from the list given by: wsl -l -v - \\ - \\ If that doesn't work (and you're on a Windows machine), try this: - \\ 1. Open Windows Update - \\ 2. Download any updates to Windows Subsystem for Linux - \\ - \\ If you're still having trouble, ask for help in bun's discord https://bun.sh/discord - , .{}); - break :log; - } else if (err == error.SystemResources) { - Output.prettyErrorln( - \\<red>error<r>: memlock limit exceeded - \\ - \\To fix this error: <b>please increase the memlock limit<r> or upgrade to Linux kernel 5.11+ - \\ - \\If Bun is running inside Docker, make sure to set the memlock limit to unlimited (-1) - \\ - \\ docker run --rm --init --ulimit memlock=-1:-1 jarredsumner/bun:edge - \\ - \\To bump the memlock limit, check one of the following: - \\ /etc/security/limits.conf - \\ /etc/systemd/user.conf - \\ /etc/systemd/system.conf - \\ - \\You can also try running bun as root. - \\ - \\If running many copies of Bun via exec or spawn, be sure that O_CLOEXEC is set so - \\that resources are not leaked when the child process exits. - \\ - \\Why does this happen? - \\ - \\Bun uses io_uring and io_uring accounts memory it - \\needs under the rlimit memlocked option, which can be - \\quite low on some setups (64K). - \\ - \\ - , .{}); - break :log; + const MAX_KEEPALIVE_HOSTNAME = 128; + + pub fn init(this: *@This()) !void { + var opts: uws.us_socket_context_options_t = undefined; + @memset(@ptrCast([*]u8, &opts), 0, @sizeOf(uws.us_socket_context_options_t)); + this.us_socket_context = uws.us_create_socket_context(ssl_int, uws.Loop.get(), @sizeOf(usize), opts).?; + + HTTPSocket.configure( + this.us_socket_context, + anyopaque, + Handler, + ); + } + + /// Attempt to keep the socket alive by reusing it for another request. + /// If no space is available, close the socket. + pub fn releaseSocket(this: *@This(), socket: HTTPSocket, hostname: []const u8, port: u16) void { + if (comptime Environment.allow_assert) { + std.debug.assert(!socket.isClosed()); + std.debug.assert(!socket.isShutdown()); + std.debug.assert(socket.isEstablished()); + } + + if (hostname.len <= MAX_KEEPALIVE_HOSTNAME) { + if (this.pending_sockets.get()) |pending| { + pending.http_socket = socket; + @memcpy(&pending.hostname_buf, hostname.ptr, hostname.len); + pending.hostname_len = @truncate(u8, hostname.len); + pending.port = port; + this.keep_alive_sockets.set( + this.pending_sockets.indexOf(pending).?, + ); + pending.http_socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(pending).ptr()); + log("Releasing socket for reuse {s}:{d}", .{ hostname, port }); + return; } } - Output.prettyErrorln("<r><red>error<r>: Failed to initialize network thread: <red><b>{s}<r>.\nHTTP requests will not work. Please file an issue and run strace().", .{@errorName(err)}); + socket.close(0, null); } - Global.exit(1); + pub const Handler = struct { + pub fn onOpen( + ptr: *anyopaque, + socket: HTTPSocket, + ) void { + if (ActiveSocket.from(bun.cast(**anyopaque, ptr).*).get(HTTPClient)) |client| { + return client.onOpen(comptime ssl, socket); + } + } + pub fn onClose( + ptr: *anyopaque, + socket: HTTPSocket, + _: c_int, + _: ?*anyopaque, + ) void { + var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + if (tagged.get(HTTPClient)) |client| { + return client.onClose(comptime ssl, socket); + } + + if (tagged.get(PooledSocket)) |client| { + return client.close(); + } + + unreachable; + } + pub fn onData( + ptr: *anyopaque, + socket: HTTPSocket, + buf: []const u8, + ) void { + var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + if (tagged.get(HTTPClient)) |client| { + return client.onData( + comptime ssl, + buf, + if (comptime ssl) &http_thread.https_context else &http_thread.http_context, + socket, + ); + } + + unreachable; + } + pub fn onWritable( + ptr: *anyopaque, + socket: HTTPSocket, + ) void { + var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + if (tagged.get(HTTPClient)) |client| { + return client.onWritable( + false, + comptime ssl, + socket, + ); + } + + unreachable; + } + pub fn onTimeout( + ptr: *anyopaque, + socket: HTTPSocket, + ) void { + var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + if (tagged.get(HTTPClient)) |client| { + return client.onTimeout( + comptime ssl, + socket, + ); + } else if (tagged.get(PooledSocket)) |pooled| { + pooled.close(); + } + + unreachable; + } + pub fn onConnectError( + ptr: *anyopaque, + socket: HTTPSocket, + _: c_int, + ) void { + var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + if (tagged.get(HTTPClient)) |client| { + return client.onConnectError( + comptime ssl, + socket, + ); + } else if (tagged.get(PooledSocket)) |pooled| { + pooled.close(); + } + + unreachable; + } + pub fn onEnd( + ptr: *anyopaque, + socket: HTTPSocket, + ) void { + var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + if (tagged.get(HTTPClient)) |client| { + return client.onEnd( + comptime ssl, + socket, + ); + } else if (tagged.get(PooledSocket)) |pooled| { + pooled.close(); + } + + unreachable; + } + }; + + fn existingSocket(this: *@This(), hostname: []const u8, port: u16) ?HTTPSocket { + if (hostname.len > MAX_KEEPALIVE_HOSTNAME) + return null; + + var iter = this.keep_alive_sockets.iterator(.{ + .kind = .set, + }); + while (iter.next()) |index_i| { + const index = @truncate(u16, index_i); + var socket = this.pending_sockets.at(index); + if (socket.port != port) { + continue; + } + + std.debug.assert(!this.pending_sockets.available.isSet(index)); + + if (strings.eqlLong(socket.hostname_buf[0..socket.hostname_len], hostname, true)) { + const http_socket = socket.http_socket; + socket.close(); + log("Keep-alive socket found for {s}:{d}.", .{ hostname, port }); + return http_socket; + } + } + + return null; + } + + pub fn connect(this: *@This(), client: *HTTPClient, hostname: []const u8, port: u16) !HTTPSocket { + if (this.existingSocket(hostname, port)) |sock| { + sock.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr()); + client.onOpen(comptime ssl, sock); + return sock; + } + + if (HTTPSocket.connectAnon( + hostname, + port, + this.us_socket_context, + undefined, + )) |socket| { + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr()); + return socket; + } + + return error.FailedToOpenSocket; + } }; +} + +pub const HTTPThread = struct { + var http_thread_loaded: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false); - AsyncIO.global_loaded = true; - NetworkThread.global.io = &AsyncIO.global; + loop: *uws.Loop, + http_context: NewHTTPContext(false), + https_context: NewHTTPContext(true), - AsyncBIO.initBoringSSL(); + queued_tasks_mutex: Lock = Lock.init(), + queued_tasks: Batch = .{}, + processing_tasks: Batch = .{}, + has_awoken: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + timer: std.time.Timer = undefined, + const threadlog = Output.scoped(.HTTPThread, true); - NetworkThread.global.processEvents(); + pub fn init() !void { + if (http_thread_loaded.swap(true, .SeqCst)) { + return; + } + + http_thread = .{ + .loop = undefined, + .http_context = .{ + .us_socket_context = undefined, + }, + .https_context = .{ + .us_socket_context = undefined, + }, + .timer = std.time.Timer.start() catch unreachable, + }; + + var thread = try std.Thread.spawn(.{ + .stack_size = 16 * 1024 * 1024, + }, onStart, .{}); + thread.detach(); + } + + pub fn onStart() void { + Output.Source.configureNamedThread("HTTP Client"); + default_arena = Arena.init() catch unreachable; + default_allocator = default_arena.allocator(); + var loop = uws.Loop.get().?; + _ = loop.addPostHandler(*HTTPThread, &http_thread, drainEvents); + http_thread.loop = loop; + http_thread.http_context.init() catch @panic("Failed to init http context"); + http_thread.https_context.init() catch @panic("Failed to init https context"); + http_thread.has_awoken.store(true, .Monotonic); + http_thread.processEvents(); + } + + fn queueEvents(this: *@This()) void { + this.queued_tasks_mutex.lock(); + defer this.queued_tasks_mutex.unlock(); + if (this.queued_tasks.len == 0) + return; + threadlog("Received {d} tasks\n", .{this.queued_tasks.len}); + this.processing_tasks.push(this.queued_tasks); + this.queued_tasks = .{}; + } + + pub fn connect(this: *@This(), client: *HTTPClient, comptime is_ssl: bool) !NewHTTPContext(is_ssl).HTTPSocket { + return try this.context(is_ssl).connect(client, client.url.hostname, client.url.getPortAuto()); + } + + pub fn context(this: *@This(), comptime is_ssl: bool) *NewHTTPContext(is_ssl) { + return if (is_ssl) &this.https_context else &this.http_context; + } + + fn drainEvents(this: *@This()) void { + this.queueEvents(); + + var count: usize = 0; + + while (this.processing_tasks.pop()) |task| { + var callback = task.callback; + callback(task); + if (comptime Environment.allow_assert) { + count += 1; + } + } + + if (comptime Environment.allow_assert) { + if (count > 0) + log("Processed {d} tasks\n", .{count}); + } + } + + fn processEvents_(this: *@This()) void { + while (true) { + this.drainEvents(); + + var start_time: i128 = 0; + if (comptime Environment.isDebug) { + start_time = std.time.nanoTimestamp(); + } + Output.flush(); + this.loop.run(); + if (comptime Environment.isDebug) { + var end = std.time.nanoTimestamp(); + threadlog("Waited {any}\n", .{std.fmt.fmtDurationSigned(@truncate(i64, end - start_time))}); + Output.flush(); + } + } + } + + pub fn processEvents(this: *@This()) void { + processEvents_(this); + unreachable; + } + pub fn schedule(this: *@This(), batch: Batch) void { + if (batch.len == 0) + return; + + { + this.queued_tasks_mutex.lock(); + defer this.queued_tasks_mutex.unlock(); + this.queued_tasks.push(batch); + } + + if (this.has_awoken.load(.Monotonic)) + this.loop.wakeup(); + } +}; + +const log = Output.scoped(.fetch, false); + +const AnySocket = union { + https: NewHTTPContext(true).HTTPSocket, + http: NewHTTPContext(false).HTTPSocket, + none: void, + + pub inline fn field(self: @This(), comptime is_ssl: bool) *uws.Socket { + return switch (is_ssl) { + true => self.https.socket, + false => self.http.socket, + }; + } + + pub inline fn client(self: *@This()) *HTTPClient { + return @fieldParentPtr(HTTPClient, "socket", self); + } +}; + +pub fn onOpen( + client: *HTTPClient, + comptime is_ssl: bool, + socket: NewHTTPContext(is_ssl).HTTPSocket, +) void { + log("Connected {s} \n", .{client.url.href}); + if (client.state.request_stage == .pending) { + client.onWritable(true, comptime is_ssl, socket); + } +} +pub fn onClose( + client: *HTTPClient, + comptime is_ssl: bool, + socket: NewHTTPContext(is_ssl).HTTPSocket, +) void { + _ = socket; + log("Closed {s}\n", .{client.url.href}); + + if (client.state.stage != .done and client.state.stage != .fail) + client.fail(error.ConnectionClosed); +} +pub fn onTimeout( + client: *HTTPClient, + comptime is_ssl: bool, + socket: NewHTTPContext(is_ssl).HTTPSocket, +) void { + _ = socket; + log("Timeout {s}\n", .{client.url.href}); + + if (client.state.stage != .done and client.state.stage != .fail) + client.fail(error.Timeout); +} +pub fn onConnectError( + client: *HTTPClient, + comptime is_ssl: bool, + socket: NewHTTPContext(is_ssl).HTTPSocket, +) void { + _ = socket; + log("onConnectError {s}\n", .{client.url.href}); + + if (client.state.stage != .done and client.state.stage != .fail) + client.fail(error.ConnectionRefused); +} +pub fn onEnd( + client: *HTTPClient, + comptime is_ssl: bool, + _: NewHTTPContext(is_ssl).HTTPSocket, +) void { + log("onEnd {s}\n", .{client.url.href}); + + if (client.state.stage != .done and client.state.stage != .fail) + client.fail(error.ConnectionClosed); } pub inline fn getAllocator() std.mem.Allocator { @@ -146,47 +493,147 @@ fn writeRequest( comptime Writer: type, writer: Writer, request: picohttp.Request, - body: string, // header_hashes: []u64, ) !void { - _ = writer.write(request.method); - _ = writer.write(" "); - _ = writer.write(request.path); - _ = writer.write(" HTTP/1.1\r\n"); + _ = writer.write(request.method) catch 0; + _ = writer.write(" ") catch 0; + _ = writer.write(request.path) catch 0; + _ = writer.write(" HTTP/1.1\r\n") catch 0; for (request.headers) |header| { - _ = writer.write(header.name); - _ = writer.write(": "); - _ = writer.write(header.value); - _ = writer.write("\r\n"); + _ = writer.write(header.name) catch 0; + _ = writer.write(": ") catch 0; + _ = writer.write(header.value) catch 0; + _ = writer.write("\r\n") catch 0; } - _ = writer.write("\r\n"); + _ = writer.write("\r\n") catch 0; +} - if (body.len > 0) { - _ = writer.write(body); +pub const HTTPStage = enum { + pending, + headers, + body, + body_chunk, + fail, + done, +}; + +pub const InternalState = struct { + request_message: ?*AsyncMessage = null, + pending_response: picohttp.Response = undefined, + allow_keepalive: bool = true, + transfer_encoding: Encoding = Encoding.identity, + encoding: Encoding = Encoding.identity, + content_encoding_i: u8 = std.math.maxInt(u8), + chunked_decoder: picohttp.phr_chunked_decoder = .{}, + stage: Stage = Stage.pending, + body_out_str: ?*MutableString = null, + compressed_body: ?*MutableString = null, + body_size: usize = 0, + chunked_offset: usize = 0, + request_body: []const u8 = "", + request_sent_len: usize = 0, + fail: anyerror = error.NoError, + request_stage: HTTPStage = .pending, + response_stage: HTTPStage = .pending, + + pub fn reset(this: *InternalState) void { + if (this.request_message) |msg| { + msg.release(); + this.request_message = null; + } + + if (this.compressed_body) |body| { + ZlibPool.instance.put(body) catch unreachable; + this.compressed_body = null; + } + + if (this.body_out_str) |body| { + body.reset(); + } + + this.* = .{ + .body_out_str = this.body_out_str, + }; } -} + + pub fn getBodyBuffer(this: *InternalState) *MutableString { + switch (this.encoding) { + Encoding.gzip, Encoding.deflate => { + if (this.compressed_body == null) { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + ZlibPool.loaded = true; + } + + this.compressed_body = ZlibPool.instance.get() catch unreachable; + } + + return this.compressed_body.?; + }, + else => { + return this.body_out_str.?; + }, + } + } + + pub fn processBodyBuffer(this: *InternalState, buffer: MutableString) !void { + var body_out_str = this.body_out_str.?; + var buffer_ = this.getBodyBuffer(); + buffer_.* = buffer; + + switch (this.encoding) { + Encoding.gzip, Encoding.deflate => { + var gzip_timer: std.time.Timer = undefined; + + if (extremely_verbose) + gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); + + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { + Output.prettyErrorln("<r><red>Zlib error<r>", .{}); + Output.flush(); + return err; + }; + + if (extremely_verbose) + this.gzip_elapsed = gzip_timer.read(); + }, + else => {}, + } + + var response = &this.pending_response; + // if it compressed with this header, it is no longer + if (this.content_encoding_i < response.headers.len) { + var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len }; + _ = mutable_headers.orderedRemove(this.content_encoding_i); + response.headers = mutable_headers.items; + this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i)); + } + + this.body_size = @truncate(usize, body_out_str.list.items.len); + } +}; + +const default_redirect_count = 127; method: Method, header_entries: Headers.Entries, header_buf: string, url: URL, +connected_url: URL = URL{}, allocator: std.mem.Allocator, verbose: bool = Environment.isTest, -tcp_client: tcp.Client = undefined, -body_size: u32 = 0, -read_count: u32 = 0, -remaining_redirect_count: i8 = 127, +remaining_redirect_count: i8 = default_redirect_count, redirect: ?*URLBufferPool.Node = null, -disable_shutdown: bool = true, timeout: usize = 0, progress_node: ?*std.Progress.Node = null, -socket: AsyncSocket.SSL = undefined, -socket_loaded: bool = false, -gzip_elapsed: u64 = 0, -stage: Stage = Stage.pending, received_keep_alive: bool = false, +state: InternalState = .{}, + +completion_callback: HTTPClientResult.Callback = undefined, /// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since. /// This is a workaround for that. @@ -202,16 +649,13 @@ pub fn init( url: URL, header_entries: Headers.Entries, header_buf: string, -) !HTTPClient { +) HTTPClient { return HTTPClient{ .allocator = allocator, .method = method, .url = url, .header_entries = header_entries, .header_buf = header_buf, - .socket = AsyncSocket.SSL{ - .socket = undefined, - }, }; } @@ -220,14 +664,15 @@ pub fn deinit(this: *HTTPClient) !void { redirect.release(); this.redirect = null; } + + this.state.reset(); } const Stage = enum(u8) { pending, connect, - request, - response, done, + fail, }; // threadlocal var resolver_cache @@ -273,7 +718,8 @@ const transfer_encoding_header = hashHeaderName("Transfer-Encoding"); const host_header_name = "Host"; const content_length_header_name = "Content-Length"; const content_length_header_hash = hashHeaderName("Content-Length"); -const connection_header = picohttp.Header{ .name = "Connection", .value = "close" }; +const connection_header = picohttp.Header{ .name = "Connection", .value = "keep-alive" }; +const connection_closing_header = picohttp.Header{ .name = "Connection", .value = "close" }; const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" }; const accept_header_hash = hashHeaderName("Accept"); @@ -299,10 +745,11 @@ pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string { pub const HeaderBuilder = @import("./http/header_builder.zig"); -pub const HTTPChannel = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 }); +const HTTPCallbackPair = .{ *AsyncHTTP, HTTPClientResult }; +pub const HTTPChannel = @import("./sync.zig").Channel(HTTPCallbackPair, .{ .Static = 1000 }); // 32 pointers much cheaper than 1000 pointers const SingleHTTPChannel = struct { - const SingleHTTPCHannel_ = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 8 }); + const SingleHTTPCHannel_ = @import("./sync.zig").Channel(HTTPClientResult, .{ .Static = 8 }); channel: SingleHTTPCHannel_, pub fn reset(_: *@This()) void {} pub fn init() SingleHTTPChannel { @@ -314,11 +761,9 @@ pub const HTTPChannelContext = struct { http: AsyncHTTP = undefined, channel: *HTTPChannel, - pub fn callback( - http: *AsyncHTTP, - ) void { - var this: *HTTPChannelContext = @fieldParentPtr(HTTPChannelContext, "http", http); - this.channel.writeItem(http) catch unreachable; + pub fn callback(data: HTTPCallbackPair) void { + var this: *HTTPChannelContext = @fieldParentPtr(HTTPChannelContext, "http", data.@"0"); + this.channel.writeItem(data) catch unreachable; } }; @@ -391,13 +836,14 @@ pub const AsyncHTTP = struct { max_retry_count: u32 = 0, url: URL, - task: ThreadPool.Task = ThreadPool.Task{ .callback = HTTPSender.callback }, + task: ThreadPool.Task = ThreadPool.Task{ .callback = startAsyncHTTP }, + completion_callback: HTTPClientResult.Callback = undefined, /// Timeout in nanoseconds timeout: usize = 0, + redirected: bool = false, response_encoding: Encoding = Encoding.identity, - redirect_count: u32 = 0, retries_count: u32 = 0, verbose: bool = false, @@ -408,12 +854,6 @@ pub const AsyncHTTP = struct { elapsed: u64 = 0, gzip_elapsed: u64 = 0, - /// Callback runs when request finishes - /// Executes on the network thread - callback: ?CompletionCallback = null, - callback_ctx: ?*anyopaque = null, - - pub const CompletionCallback = fn (this: *AsyncHTTP) void; pub var active_requests_count = std.atomic.Atomic(u32).init(0); pub var max_simultaneous_requests: u16 = 32; @@ -435,7 +875,8 @@ pub const AsyncHTTP = struct { response_buffer: *MutableString, request_body: *MutableString, timeout: usize, - ) !AsyncHTTP { + callback: HTTPClientResult.Callback, + ) AsyncHTTP { var this = AsyncHTTP{ .allocator = allocator, .url = url, @@ -444,13 +885,37 @@ pub const AsyncHTTP = struct { .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, + .completion_callback = callback, }; - this.client = try HTTPClient.init(allocator, method, url, headers, headers_buf); + this.client = HTTPClient.init(allocator, method, url, headers, headers_buf); this.client.timeout = timeout; this.timeout = timeout; return this; } + pub fn initSync( + allocator: std.mem.Allocator, + method: Method, + url: URL, + headers: Headers.Entries, + headers_buf: string, + response_buffer: *MutableString, + request_body: *MutableString, + timeout: usize, + ) AsyncHTTP { + return @This().init( + allocator, + method, + url, + headers, + headers_buf, + response_buffer, + request_body, + timeout, + undefined, + ); + } + fn reset(this: *AsyncHTTP) !void { const timeout = this.timeout; this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf); @@ -459,100 +924,67 @@ pub const AsyncHTTP = struct { } pub fn schedule(this: *AsyncHTTP, _: std.mem.Allocator, batch: *ThreadPool.Batch) void { - NetworkThread.init() catch unreachable; + HTTPThread.init() catch unreachable; this.state.store(.scheduled, .Monotonic); batch.push(ThreadPool.Batch.from(&this.task)); } - fn sendSyncCallback(this: *AsyncHTTP) void { - var single_http_channel = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?)); - single_http_channel.channel.writeItem(this) catch unreachable; + fn sendSyncCallback(this: *SingleHTTPChannel, result: HTTPClientResult) void { + this.channel.writeItem(result) catch unreachable; } pub fn sendSync(this: *AsyncHTTP, comptime _: bool) anyerror!picohttp.Response { - if (this.callback_ctx == null) { - var ctx = try bun.default_allocator.create(SingleHTTPChannel); - ctx.* = SingleHTTPChannel.init(); - this.callback_ctx = ctx; - } else { - var ctx = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?)); - ctx.* = SingleHTTPChannel.init(); - } + try HTTPThread.init(); - this.callback = sendSyncCallback; + var ctx = try bun.default_allocator.create(SingleHTTPChannel); + ctx.* = SingleHTTPChannel.init(); + this.completion_callback = HTTPClientResult.Callback.New( + *SingleHTTPChannel, + sendSyncCallback, + ).init(ctx); var batch = NetworkThread.Batch{}; this.schedule(bun.default_allocator, &batch); - NetworkThread.global.schedule(batch); + http_thread.schedule(batch); while (true) { - var data = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?)); - var async_http: *AsyncHTTP = data.channel.readItem() catch unreachable; - if (async_http.err) |err| { - return err; + const result: HTTPClientResult = ctx.channel.readItem() catch unreachable; + if (result.fail != error.NoError) { + return result.fail; } - return async_http.response.?; + return result.response; } unreachable; } - pub const HTTPSender = struct { - frame: @Frame(AsyncHTTP.do) = undefined, - finisher: ThreadPool.Task = .{ .callback = onFinish }, - - pub const Pool = ObjectPool(HTTPSender, null, false, 8); - - pub fn callback(task: *ThreadPool.Task) void { - var this = @fieldParentPtr(AsyncHTTP, "task", task); - var sender = HTTPSender.Pool.get(default_allocator); - sender.data = .{ - .frame = undefined, - .finisher = .{ .callback = onFinish }, - }; - sender.data.frame = async do(&sender.data, this); - } - - pub fn onFinish(task: *ThreadPool.Task) void { - var this = @fieldParentPtr(HTTPSender, "finisher", task); - @fieldParentPtr(HTTPSender.Pool.Node, "data", this).release(); - } - }; - - pub fn do(sender: *HTTPSender, this: *AsyncHTTP) void { - defer { - NetworkThread.global.schedule(.{ .head = &sender.finisher, .tail = &sender.finisher, .len = 1 }); - } - - outer: { + pub fn onAsyncHTTPComplete(this: *AsyncHTTP, result: HTTPClientResult) void { + var completion = this.completion_callback; + this.response = result.response; + this.elapsed = http_thread.timer.read() -| this.elapsed; + this.redirected = this.client.remaining_redirect_count != default_redirect_count; + if (result.fail != error.NoError) { + this.err = result.fail; + this.state.store(State.fail, .Monotonic); + } else { this.err = null; - this.state.store(.sending, .Monotonic); - - const start = NetworkThread.global.timer.read(); - defer this.elapsed = NetworkThread.global.timer.read() -| start; - - this.response = this.client.send(this.request_body.list.items, this.response_buffer) catch |err| { - this.state.store(.fail, .Monotonic); - this.err = err; + this.state.store(.success, .Monotonic); + } - if (this.max_retry_count > this.retries_count) { - this.retries_count += 1; - this.response_buffer.reset(); + completion.function(completion.ctx, result); + } - NetworkThread.global.schedule(ThreadPool.Batch.from(&this.task)); - return; - } - break :outer; - }; + pub fn startAsyncHTTP(task: *Task) void { + var this = @fieldParentPtr(AsyncHTTP, "task", task); + this.err = null; + this.state.store(.sending, .Monotonic); + this.client.completion_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPComplete).init( + this, + ); - this.redirect_count = @intCast(u32, @maximum(127 - this.client.remaining_redirect_count, 0)); - this.state.store(.success, .Monotonic); - this.gzip_elapsed = this.client.gzip_elapsed; - } + this.elapsed = http_thread.timer.read(); - if (this.callback) |callback| { - callback(this); - } + this.client.start(this.request_body.list.items, this.response_buffer); } }; @@ -660,206 +1092,584 @@ pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { }; } -pub fn connect( - this: *HTTPClient, - comptime ConnectType: type, - connector: ConnectType, -) !void { - const port = this.url.getPortAuto(); - if (this.verbose) Output.prettyErrorln("<d>[HTTP]<r> Connecting to {s}:{d}", .{ this.url.href, port }); - try connector.connect(this.url.hostname, port); - std.debug.assert(this.socket.socket.socket > 0); - var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) }; - // client.setQuickACK(true) catch {}; - - this.tcp_client = client; - if (this.timeout > 0) { - client.setReadTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {}; - client.setWriteTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {}; +pub fn doRedirect(this: *HTTPClient) void { + var body_out_str = this.state.body_out_str.?; + this.remaining_redirect_count -|= 1; + + if (this.remaining_redirect_count == 0) { + this.fail(error.TooManyRedirects); + return; } + this.state.reset(); + return this.start("", body_out_str); } -pub fn sendAsync(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) @Frame(HTTPClient.send) { - return async this.send(body, body_out_str); +pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void { + body_out_str.reset(); + + std.debug.assert(this.state.request_message == null); + this.state = InternalState{ + .request_body = body, + .body_out_str = body_out_str, + .stage = Stage.pending, + .request_message = null, + .pending_response = picohttp.Response{}, + .compressed_body = null, + }; + + if (this.url.isHTTPS()) { + this.start_(true); + } else { + this.start_(false); + } } -fn maybeClearSocket(this: *HTTPClient) void { - if (this.socket_loaded) { - this.socket_loaded = false; +fn start_(this: *HTTPClient, comptime is_ssl: bool) void { + this.connected_url = this.url; + var socket = http_thread.connect(this, is_ssl) catch |err| { + this.fail(err); + return; + }; - this.socket.deinit(); + if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) { + this.fail(error.ConnectionClosed); + std.debug.assert(this.state.fail != error.NoError); } } -pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { - defer this.maybeClearSocket(); +const Task = ThreadPool.Task; - // this prevents stack overflow - redirect: while (this.remaining_redirect_count >= -1) { - this.maybeClearSocket(); +pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { + switch (this.state.request_stage) { + .pending, .headers => { + var stack_fallback = std.heap.stackFallback(16384, default_allocator); + var allocator = stack_fallback.get(); + var list = std.ArrayList(u8).initCapacity(allocator, stack_fallback.buffer.len) catch unreachable; + defer if (list.capacity > stack_fallback.buffer.len) list.deinit(); + var writer = &list.writer(); + + socket.timeout(60); + + const request = this.buildRequest(this.state.request_body.len); + writeRequest( + @TypeOf(writer), + writer, + request, + ) catch { + this.fail(error.OutOfMemory); + socket.close(0, null); + return; + }; - _ = AsyncHTTP.active_requests_count.fetchAdd(1, .Monotonic); - defer { - _ = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic); - } + const headers_len = list.items.len; + std.debug.assert(list.items.len == writer.context.items.len); + if (this.state.request_body.len > 0 and list.capacity - list.items.len > 0) { + var remain = list.items.ptr[list.items.len..list.capacity]; + @memcpy(remain.ptr, this.state.request_body.ptr, @minimum(remain.len, this.state.request_body.len)); + } - this.stage = Stage.pending; - body_out_str.reset(); + const to_send = list.items[this.state.request_sent_len..]; + if (comptime Environment.allow_assert) { + std.debug.assert(!socket.isShutdown()); + std.debug.assert(!socket.isClosed()); + } + const amount = socket.write(to_send, true); + if (comptime is_first_call) { + if (amount == 0) { + // don't worry about it + return; + } + } - if (this.url.isHTTPS()) { - return this.sendHTTPS(body, body_out_str) catch |err| { - switch (err) { - error.Redirect => { - this.remaining_redirect_count -= 1; + if (amount < 0) { + this.fail(error.WriteFailed); + socket.close(0, null); + return; + } + + this.state.request_sent_len += @intCast(usize, amount); + const has_sent_headers = this.state.request_sent_len >= headers_len; + + if (has_sent_headers and this.state.request_body.len > 0) { + this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..]; + } + + const has_sent_body = this.state.request_body.len == 0; + + if (has_sent_headers and has_sent_body) { + this.state.request_stage = .done; + return; + } + + if (has_sent_headers) { + this.state.request_stage = .body; + std.debug.assert(this.state.request_body.len > 0); + } else { + this.state.request_stage = .headers; + } + }, + .body => { + socket.timeout(60); + + const to_send = this.state.request_body; + const amount = socket.write(to_send, true); + if (amount < 0) { + this.fail(error.WriteFailed); + socket.close(0, null); + return; + } - continue :redirect; + this.state.request_sent_len += @intCast(usize, amount); + this.state.request_body = this.state.request_body[@intCast(usize, amount)..]; + + if (this.state.request_body.len == 0) { + this.state.request_stage = .done; + return; + } + }, + else => {}, + } +} + +pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { + switch (this.state.response_stage) { + .pending, .headers => { + var to_read = incoming_data; + var pending_buffers: [2]string = .{ "", "" }; + var amount_read: usize = 0; + var needs_move = true; + if (this.state.request_message) |req_msg| { + var available = req_msg.available(); + if (available.len == 0) { + this.state.request_message.?.release(); + this.state.request_message = null; + this.fail(error.ResponseHeadersTooLarge); + socket.close(0, null); + return; + } + + @memcpy( + req_msg.available().ptr, + incoming_data.ptr, + @minimum(available.len, incoming_data.len), + ); + req_msg.used += @truncate(u32, incoming_data.len); + amount_read = @truncate(u32, req_msg.sent); + req_msg.sent = 0; + needs_move = false; + to_read = req_msg.slice(); + pending_buffers[1] = incoming_data[@minimum(available.len, incoming_data.len)..]; + } + + const response = picohttp.Response.parseParts( + to_read, + &this.response_headers_buf, + &amount_read, + ) catch |err| { + switch (err) { + error.ShortRead => { + socket.timeout(60); + if (needs_move) { + std.debug.assert(this.state.request_message == null); + this.state.request_message = AsyncMessage.get(default_allocator); + if (to_read.len > this.state.request_message.?.buf.len) { + this.fail(error.ResponseHeadersTooLarge); + socket.close(0, null); + return; + } + + _ = this.state.request_message.?.writeAll(incoming_data); + this.state.request_message.?.sent = @truncate(u32, to_read.len); + return; + } + }, + else => { + socket.close(0, null); + this.fail(err); + return; }, - else => return err, } + unreachable; }; - } else { - return this.sendHTTP(body, body_out_str) catch |err| { - switch (err) { - error.Redirect => { - this.remaining_redirect_count -= 1; + pending_buffers[0] = to_read[@minimum(@intCast(usize, response.bytes_read), to_read.len)..]; + if (pending_buffers[0].len == 0 and pending_buffers[1].len > 0) { + pending_buffers[0] = pending_buffers[1]; + pending_buffers[1] = ""; + } - continue :redirect; - }, - else => return err, + var deferred_redirect: ?*URLBufferPool.Node = null; + const can_continue = this.handleResponseMetadata( + response, + // If there are multiple consecutive redirects + // and the redirect differs in hostname + // the new URL buffer may point to invalid memory after + // this function is called + // That matters because for Keep Alive, the hostname must point to valid memory + &deferred_redirect, + ) catch |err| { + if (err == error.Redirect) { + if (this.state.request_message) |msg| { + msg.release(); + this.state.request_message = null; + } + + if (this.state.allow_keepalive) { + std.debug.assert(this.connected_url.hostname.len > 0); + ctx.releaseSocket(socket, this.connected_url.hostname, this.connected_url.getPortAuto()); + } else { + socket.close(0, null); + } + + if (deferred_redirect) |redirect| { + std.debug.assert(redirect != this.redirect); + // connected_url no longer points to valid memory + redirect.release(); + } + this.connected_url = URL{}; + this.doRedirect(); + return; } + + socket.close(0, null); + this.fail(err); + return; }; - } - } - return error.TooManyRedirects; -} + if (!can_continue) { + this.done(is_ssl, ctx, socket); + return; + } -const Task = ThreadPool.Task; + if (pending_buffers[0].len == 0) { + return; + } -pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { - this.socket = AsyncSocket.SSL{ - .socket = try AsyncSocket.init(&AsyncIO.global, 0, default_allocator), - }; - this.socket_loaded = true; - this.stage = Stage.connect; - var socket = &this.socket.socket; - try this.connect(*AsyncSocket, socket); - this.stage = Stage.request; - defer this.closeSocket(); - - var request = buildRequest(this, body.len); - if (this.verbose) { - Output.prettyErrorln("{s}", .{request}); + if (this.state.response_stage == .body) { + { + const is_done = this.handleResponseBody(pending_buffers[0]) catch |err| { + socket.close(0, null); + this.fail(err); + return; + }; + + if (is_done) { + this.done(is_ssl, ctx, socket); + return; + } + } + + if (pending_buffers[1].len > 0) { + const is_done = this.handleResponseBody(pending_buffers[1]) catch |err| { + socket.close(0, null); + this.fail(err); + return; + }; + + if (is_done) { + this.done(is_ssl, ctx, socket); + return; + } + } + } else if (this.state.response_stage == .body_chunk) { + { + const is_done = this.handleResponseBodyChunk(pending_buffers[0]) catch |err| { + socket.close(0, null); + this.fail(err); + return; + }; + + if (is_done) { + this.done(is_ssl, ctx, socket); + return; + } + } + + if (pending_buffers[1].len > 0) { + const is_done = this.handleResponseBodyChunk(pending_buffers[1]) catch |err| { + socket.close(0, null); + this.fail(err); + return; + }; + + if (is_done) { + this.done(is_ssl, ctx, socket); + return; + } + } + + socket.timeout(60); + } + }, + + .body => { + socket.timeout(60); + + const is_done = this.handleResponseBody(incoming_data) catch |err| { + socket.close(0, null); + this.fail(err); + return; + }; + + if (is_done) { + this.done(is_ssl, ctx, socket); + return; + } + }, + + .body_chunk => { + socket.timeout(60); + + const is_done = this.handleResponseBodyChunk(incoming_data) catch |err| { + socket.close(0, null); + this.fail(err); + return; + }; + + if (is_done) { + this.done(is_ssl, ctx, socket); + return; + } + }, + + .fail => {}, + + else => { + socket.close(0, null); + this.fail(error.UnexpectedData); + return; + }, } +} - try writeRequest(@TypeOf(socket), socket, request, body); +fn fail(this: *HTTPClient, err: anyerror) void { + this.state.request_stage = .fail; + this.state.response_stage = .fail; + this.state.fail = err; + this.state.stage = .fail; - _ = try socket.send(); - this.stage = Stage.response; - if (this.progress_node == null) { - return this.processResponse( - false, - @TypeOf(socket), - socket, - body_out_str, - ); - } else { - return this.processResponse( - true, - @TypeOf(socket), - socket, - body_out_str, - ); + const callback = this.completion_callback; + const result = this.toResult(); + this.state.reset(); + callback.run(result); +} + +pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { + var out_str = this.state.body_out_str.?; + var body = out_str.*; + const result = this.toResult(); + const callback = this.completion_callback; + + this.state.response_stage = .done; + this.state.request_stage = .done; + this.state.stage = .done; + + if (this.state.allow_keepalive and !socket.isClosed()) { + socket.timeout(60 * 5); + ctx.releaseSocket(socket, this.connected_url.hostname, this.connected_url.getPortAuto()); + } else if (!socket.isClosed()) { + socket.close(0, null); } + + this.state.reset(); + result.body.?.* = body; + std.debug.assert(this.state.stage != .done); + this.state.response_stage = .done; + this.state.request_stage = .done; + this.state.stage = .done; + + callback.run(result); } -pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response { - defer if (this.verbose) Output.flush(); - var response: picohttp.Response = .{ - .minor_version = 1, - .status_code = 0, - .status = "", - .headers = &[_]picohttp.Header{}, - }; - var request_message = AsyncMessage.get(default_allocator); - defer request_message.release(); - var request_buffer: []u8 = request_message.buf; - var read_length: usize = 0; - { - var read_headers_up_to: usize = 0; - - var req_buf_read: usize = std.math.maxInt(usize); - defer this.read_count += @intCast(u32, read_length); - - restart: while (req_buf_read != 0) { - req_buf_read = try client.read(request_buffer, read_length); - read_length += req_buf_read; - var request_body = request_buffer[0..read_length]; - log("request_body ({d}):\n{s}", .{ read_length, request_body }); - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(read_length); - this.progress_node.?.context.maybeRefresh(); - } +pub const HTTPClientResult = struct { + body: ?*MutableString = null, + response: picohttp.Response, + metadata_buf: []u8 = &.{}, + href: []const u8 = "", + fail: anyerror = error.NoError, + headers_buf: []picohttp.Header = &.{}, + + pub fn deinitMetadata(this: *HTTPClientResult) void { + if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf); + if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf); + this.headers_buf = &.{}; + this.metadata_buf = &.{}; + this.href = ""; + this.response.headers = &.{}; + this.response.status = ""; + } - read_headers_up_to = @minimum(read_headers_up_to, read_length); + pub const Callback = struct { + ctx: *anyopaque, + function: Function, - response = picohttp.Response.parseParts(request_body, &this.response_headers_buf, &read_headers_up_to) catch |err| { - log("read_headers_up_to: {d}", .{read_headers_up_to}); - switch (err) { - error.ShortRead => continue :restart, - else => return err, + pub const Function = fn (*anyopaque, HTTPClientResult) void; + + pub fn run(self: Callback, result: HTTPClientResult) void { + self.function(self.ctx, result); + } + + pub fn New(comptime Type: type, comptime callback: anytype) type { + return struct { + pub fn init(this: Type) Callback { + return Callback{ + .ctx = this, + .function = @This().wrapped_callback, + }; + } + + pub fn wrapped_callback(ptr: *anyopaque, result: HTTPClientResult) void { + var casted = @ptrCast(Type, @alignCast(std.meta.alignment(Type), ptr)); + @call(.{ .modifier = .always_inline }, callback, .{ casted, result }); } }; - break :restart; } + }; +}; + +pub fn toResult(this: *HTTPClient) HTTPClientResult { + var builder_ = StringBuilder{}; + var builder = &builder_; + this.state.pending_response.count(builder); + builder.count(this.url.href); + builder.allocate(bun.default_allocator) catch unreachable; + var headers_buf = bun.default_allocator.alloc(picohttp.Header, this.state.pending_response.headers.len) catch unreachable; + const response = this.state.pending_response.clone(headers_buf, builder); + const href = builder.append(this.url.href); + + return HTTPClientResult{ + .body = this.state.body_out_str, + .response = response, + .metadata_buf = builder.ptr.?[0..builder.cap], + .href = href, + .fail = this.state.fail, + .headers_buf = headers_buf, + }; +} + +pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool { + var buffer = this.state.getBodyBuffer(); + + const remaining_content_length = this.state.body_size - buffer.list.items.len; + var remainder = incoming_data[0..@minimum(incoming_data.len, remaining_content_length)]; + + _ = try buffer.write(remainder); + + if (this.progress_node) |progress| { + progress.activate(); + progress.setCompletedItems(buffer.list.items.len); + progress.context.maybeRefresh(); } - if (read_length == 0) { - return error.NoData; + + if (buffer.list.items.len == this.state.body_size) { + try this.state.processBodyBuffer(buffer.*); + + if (this.progress_node) |progress| { + progress.activate(); + progress.setCompletedItems(buffer.list.items.len); + progress.context.maybeRefresh(); + } + return true; } - body_out_str.reset(); - var content_length: u32 = 0; - var encoding = Encoding.identity; - var transfer_encoding = Encoding.identity; + return false; +} - var location: string = ""; +pub fn handleResponseBodyChunk( + this: *HTTPClient, + incoming_data: []const u8, +) !bool { + var decoder = &this.state.chunked_decoder; + var buffer_ = this.state.getBodyBuffer(); + var buffer = buffer_.*; + this.state.chunked_offset += incoming_data.len; + try buffer.appendSlice(incoming_data); + + // set consume_trailer to 1 to discard the trailing header + // using content-encoding per chunk is not supported + decoder.consume_trailer = 1; + + // these variable names are terrible + // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked + // (but ported from C -> zig) + const pret = picohttp.phr_decode_chunked( + decoder, + buffer.list.items.ptr, + + // this represents the position that we are currently at in the buffer + &this.state.chunked_offset, + ); + + switch (pret) { + // Invalid HTTP response body + -1 => { + return error.InvalidHTTPResponse; + }, + // Needs more data + -2 => { + if (this.progress_node) |progress| { + progress.activate(); + progress.setCompletedItems(buffer.list.items.len); + progress.context.maybeRefresh(); + } - var pretend_its_304 = false; - var maybe_keepalive = false; - errdefer { - maybe_keepalive = false; + if (this.state.compressed_body) |compressed| { + compressed.* = buffer; + } else { + this.state.body_out_str.?.* = buffer; + } + + return false; + }, + // Done + else => { + try this.state.processBodyBuffer( + buffer, + ); + + if (this.progress_node) |progress| { + progress.activate(); + progress.setCompletedItems(buffer.list.items.len); + progress.context.maybeRefresh(); + } + + return true; + }, } - var content_encoding_i = response.headers.len + 1; +} +pub fn handleResponseMetadata( + this: *HTTPClient, + response: picohttp.Response, + deferred_redirect: *?*URLBufferPool.Node, +) !bool { + var location: string = ""; + var pretend_304 = false; for (response.headers) |header, header_i| { switch (hashHeaderName(header.name)) { content_length_header_hash => { - content_length = std.fmt.parseInt(u32, header.value, 10) catch 0; - try body_out_str.inflate(content_length); - body_out_str.list.expandToCapacity(); - this.body_size = content_length; + const content_length = std.fmt.parseInt(@TypeOf(this.state.body_size), header.value, 10) catch 0; + this.state.body_size = content_length; }, content_encoding_hash => { if (strings.eqlComptime(header.value, "gzip")) { - encoding = Encoding.gzip; - content_encoding_i = header_i; + this.state.encoding = Encoding.gzip; + this.state.content_encoding_i = @truncate(u8, header_i); } else if (strings.eqlComptime(header.value, "deflate")) { - encoding = Encoding.deflate; - content_encoding_i = header_i; + this.state.encoding = Encoding.deflate; + this.state.content_encoding_i = @truncate(u8, header_i); } else if (!strings.eqlComptime(header.value, "identity")) { return error.UnsupportedContentEncoding; } }, transfer_encoding_header => { if (strings.eqlComptime(header.value, "gzip")) { - transfer_encoding = Encoding.gzip; + this.state.transfer_encoding = Encoding.gzip; } else if (strings.eqlComptime(header.value, "deflate")) { - transfer_encoding = Encoding.deflate; + this.state.transfer_encoding = Encoding.deflate; } else if (strings.eqlComptime(header.value, "identity")) { - transfer_encoding = Encoding.identity; + this.state.transfer_encoding = Encoding.identity; } else if (strings.eqlComptime(header.value, "chunked")) { - transfer_encoding = Encoding.chunked; + this.state.transfer_encoding = Encoding.chunked; } else { return error.UnsupportedTransferEncoding; } @@ -869,17 +1679,13 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti }, hashHeaderName("Connection") => { if (response.status_code >= 200 and response.status_code <= 299 and !KeepAlive.disabled) { - if (strings.eqlComptime(header.value, "keep-alive")) { - maybe_keepalive = true; + if (!strings.eqlComptime(header.value, "keep-alive")) { + this.state.allow_keepalive = false; } } }, hashHeaderName("Last-Modified") => { - if (this.force_last_modified and response.status_code > 199 and response.status_code < 300 and this.if_modified_since.len > 0) { - if (strings.eql(this.if_modified_since, header.value)) { - pretend_its_304 = true; - } - } + pretend_304 = this.force_last_modified and response.status_code > 199 and response.status_code < 300 and this.if_modified_since.len > 0 and strings.eql(this.if_modified_since, header.value); }, else => {}, @@ -890,17 +1696,26 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti Output.prettyErrorln("Response: {s}", .{response}); } + this.state.pending_response = response; + if (pretend_304) { + this.state.pending_response.status_code = 304; + } + if (location.len > 0 and this.remaining_redirect_count > 0) { - switch (response.status_code) { + switch (this.state.pending_response.status_code) { 302, 301, 307, 308, 303 => { if (strings.indexOf(location, "://")) |i| { - var url_buf = this.redirect orelse URLBufferPool.get(default_allocator); + var url_buf = URLBufferPool.get(default_allocator); const protocol_name = location[0..i]; if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else { return error.UnsupportedRedirectProtocol; } + if (location.len > url_buf.data.len) { + return error.RedirectURLTooLong; + } + deferred_redirect.* = this.redirect; std.mem.copy(u8, &url_buf.data, location); this.url = URL.parse(url_buf.data[0..location.len]); this.redirect = url_buf; @@ -913,10 +1728,7 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti .{ original_url.displayProtocol(), original_url.displayHostname(), location }, ) catch return error.RedirectURLTooLong); - if (this.redirect) |red| { - red.release(); - } - + deferred_redirect.* = this.redirect; this.redirect = url_buf; } @@ -933,323 +1745,9 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti } } - body_getter: { - if (pretend_its_304) { - response.status_code = 304; - } - - if (response.status_code == 304) break :body_getter; - - if (transfer_encoding == Encoding.chunked) { - maybe_keepalive = false; - var decoder = std.mem.zeroes(picohttp.phr_chunked_decoder); - var buffer_: *MutableString = body_out_str; - - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - if (!ZlibPool.loaded) { - ZlibPool.instance = ZlibPool.init(default_allocator); - ZlibPool.loaded = true; - } - - buffer_ = try ZlibPool.instance.get(); - }, - else => {}, - } - - var buffer = buffer_.*; - - var last_read: usize = 0; - { - const buffered_amount = client.bufferedReadAmount(); - if (buffered_amount > 0) { - var end = request_buffer[read_length..]; - if (buffered_amount <= end.len) { - std.debug.assert(client.read(end, buffered_amount) catch unreachable == buffered_amount); - response.bytes_read += @intCast(c_int, buffered_amount); - } - } - var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; - last_read = remainder.len; - try buffer.inflate(@maximum(remainder.len, 2048)); - buffer.list.expandToCapacity(); - std.mem.copy(u8, buffer.list.items, remainder); - } - - // set consume_trailer to 1 to discard the trailing header - // using content-encoding per chunk is not supported - decoder.consume_trailer = 1; - - // these variable names are terrible - // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked - // (but ported from C -> zig) - var rret: usize = 0; - var rsize: usize = last_read; - var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize); - var total_size = rsize; - - while (pret == -2) { - var buffered_amount = client.bufferedReadAmount(); - if (buffer.list.items.len < total_size + 512 or buffer.list.items[total_size..].len < @intCast(usize, @maximum(decoder.bytes_left_in_chunk, buffered_amount)) or buffer.list.items[total_size..].len < 512) { - try buffer.inflate(@maximum((buffered_amount + total_size) * 2, 1024)); - buffer.list.expandToCapacity(); - } - - // while (true) { - - var remainder = buffer.list.items[total_size..]; - const errorable_read = client.read(remainder, 0); - - rret = errorable_read catch |err| { - if (extremely_verbose) Output.prettyErrorln("Chunked transfer encoding error: {s}", .{@errorName(err)}); - return err; - }; - - buffered_amount = client.bufferedReadAmount(); - if (buffered_amount > 0) { - try buffer.list.ensureTotalCapacity(default_allocator, rret + total_size + buffered_amount); - buffer.list.expandToCapacity(); - remainder = buffer.list.items[total_size..]; - remainder = remainder[rret..][0..buffered_amount]; - rret += client.read(remainder, 0) catch |err| { - if (extremely_verbose) Output.prettyErrorln("Chunked transfer encoding error: {s}", .{@errorName(err)}); - return err; - }; - } - - // socket hang up, there was a parsing error, etc - if (rret == 0) { - if (extremely_verbose) Output.prettyErrorln("Unexpected 0", .{}); - - return error.ChunkedEncodingError; - } - - rsize = rret; - pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); - if (pret == -1) { - if (extremely_verbose) - Output.prettyErrorln( - \\ buffered: {d} - \\ rsize: {d} - \\ Read: {d} bytes / {d} total ({d} parsed) - \\ Chunk {d} left - \\ {} - , .{ - client.bufferedReadAmount(), - rsize, - rret, - buffer.list.items.len, - total_size, - decoder.bytes_left_in_chunk, - - decoder, - }); - - return error.ChunkedEncodingParseError; - } - total_size += rsize; - - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(total_size); - this.progress_node.?.context.maybeRefresh(); - } - } - - buffer.list.shrinkRetainingCapacity(total_size); - buffer_.* = buffer; - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - var gzip_timer: std.time.Timer = undefined; - - if (extremely_verbose) - gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); - - body_out_str.list.expandToCapacity(); - defer ZlibPool.instance.put(buffer_) catch unreachable; - ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { - Output.prettyErrorln("<r><red>Zlib error<r>", .{}); - Output.flush(); - return err; - }; - - if (extremely_verbose) - this.gzip_elapsed = gzip_timer.read(); - - // if it compressed with this header, it is no longer - if (content_encoding_i < response.headers.len) { - var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len }; - _ = mutable_headers.swapRemove(content_encoding_i); - response.headers = mutable_headers.items; - } - }, - else => {}, - } - - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(body_out_str.list.items.len); - this.progress_node.?.context.maybeRefresh(); - } - - this.body_size = @truncate(u32, body_out_str.list.items.len); - - return response; - } - - if (content_length > 0) { - var remaining_content_length = content_length; - var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; - remainder = remainder[0..std.math.min(remainder.len, content_length)]; - var buffer_: *MutableString = body_out_str; - - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - if (!ZlibPool.loaded) { - ZlibPool.instance = ZlibPool.init(default_allocator); - ZlibPool.loaded = true; - } - - buffer_ = try ZlibPool.instance.get(); - if (buffer_.list.capacity < remaining_content_length) { - try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length); - } - buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length]; - }, - else => {}, - } - var buffer = buffer_.*; - - var body_size: usize = 0; - if (remainder.len > 0) { - std.mem.copy(u8, buffer.list.items, remainder); - body_size = remainder.len; - this.read_count += @intCast(u32, body_size); - remaining_content_length -= @intCast(u32, remainder.len); - } - - while (remaining_content_length > 0) { - const size = @intCast(u32, try client.read( - buffer.list.items, - body_size, - )); - this.read_count += size; - if (size == 0) break; - - body_size += size; - remaining_content_length -= size; - - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(body_size); - this.progress_node.?.context.maybeRefresh(); - } - } - - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(body_size); - this.progress_node.?.context.maybeRefresh(); - } - - buffer.list.shrinkRetainingCapacity(body_size); - buffer_.* = buffer; - - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - var gzip_timer: std.time.Timer = undefined; - - if (extremely_verbose) - gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); - - body_out_str.list.expandToCapacity(); - defer ZlibPool.instance.put(buffer_) catch unreachable; - ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { - Output.prettyErrorln("<r><red>Zlib error<r>", .{}); - Output.flush(); - return err; - }; - - if (extremely_verbose) - this.gzip_elapsed = gzip_timer.read(); - - // if it compressed with this header, it is no longer - if (content_encoding_i < response.headers.len) { - var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len }; - _ = mutable_headers.swapRemove(content_encoding_i); - response.headers = mutable_headers.items; - } - }, - else => {}, - } - } - } - - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(body_out_str.list.items.len); - this.progress_node.?.context.maybeRefresh(); - } - - if (maybe_keepalive and response.status_code >= 200 and response.status_code < 300) { - this.received_keep_alive = true; - } - - return response; -} + this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body; -pub fn closeSocket(this: *HTTPClient) void { - if (this.received_keep_alive) { - this.received_keep_alive = false; - if (this.url.hostname.len > 0 and this.socket.socket.socket > 0) { - if (!this.socket.connect_frame.wait and - (!this.socket.ssl_bio_loaded or - (this.socket.ssl_bio.pending_sends == 0 and this.socket.ssl_bio.pending_reads == 0))) - { - if (KeepAlive.instance.append(this.url.hostname, this.url.getPortAuto(), this.socket.socket.socket)) { - this.socket.socket.socket = 0; - } - } - } - } - this.socket.close(); -} - -pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response { - this.socket = try AsyncSocket.SSL.init(default_allocator, &AsyncIO.global); - this.socket_loaded = true; - - var socket = &this.socket; - this.stage = Stage.connect; - try this.connect(*AsyncSocket.SSL, socket); - this.stage = Stage.request; - defer this.closeSocket(); - - var request = buildRequest(this, body_str.len); - if (this.verbose) { - Output.prettyErrorln("{s}", .{request}); - } - - try writeRequest(@TypeOf(socket), socket, request, body_str); - _ = try socket.send(); - - this.stage = Stage.response; - - if (this.progress_node == null) { - return this.processResponse( - false, - @TypeOf(socket), - socket, - body_out_str, - ); - } else { - return this.processResponse( - true, - @TypeOf(socket), - socket, - body_out_str, - ); - } + return this.method.hasBody() and (this.state.body_size > 0 or this.state.transfer_encoding == .chunked); } // // zig test src/http_client.zig --test-filter "sendHTTP - only" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec diff --git a/src/install/install.zig b/src/install/install.zig index a1523b076..f0929cd3d 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -40,6 +40,7 @@ const URL = @import("../url.zig").URL; const AsyncHTTP = @import("http").AsyncHTTP; const HTTPChannel = @import("http").HTTPChannel; const NetworkThread = @import("http").NetworkThread; +const HTTP = @import("http"); const Integrity = @import("./integrity.zig").Integrity; const clap = @import("clap"); @@ -183,9 +184,9 @@ const NetworkTask = struct { binlink: void, }, - pub fn notify(http: *AsyncHTTP) void { + pub fn notify(this: *NetworkTask, _: anytype) void { defer PackageManager.instance.wake(); - PackageManager.instance.network_channel.writeItem(@fieldParentPtr(NetworkTask, "http", http)) catch {}; + PackageManager.instance.network_channel.writeItem(this) catch {}; } // We must use a less restrictive Accept header value @@ -319,7 +320,7 @@ const NetworkTask = struct { this.request_buffer = try MutableString.init(allocator, 0); this.response_buffer = try MutableString.init(allocator, 0); this.allocator = allocator; - this.http = try AsyncHTTP.init( + this.http = AsyncHTTP.init( allocator, .GET, URL.parse(this.url_buf), @@ -328,6 +329,7 @@ const NetworkTask = struct { &this.response_buffer, &this.request_buffer, 0, + this.getCompletionCallback(), ); this.http.max_retry_count = PackageManager.instance.options.max_retry_count; this.callback = .{ @@ -347,8 +349,10 @@ const NetworkTask = struct { this.http.client.force_last_modified = true; this.http.client.if_modified_since = last_modified; } + } - this.http.callback = notify; + pub fn getCompletionCallback(this: *NetworkTask) HTTP.HTTPClientResult.Callback { + return HTTP.HTTPClientResult.Callback.New(*NetworkTask, notify).init(this); } pub fn schedule(this: *NetworkTask, batch: *ThreadPool.Batch) void { @@ -399,7 +403,7 @@ const NetworkTask = struct { header_buf = header_builder.content.ptr.?[0..header_builder.content.len]; } - this.http = try AsyncHTTP.init( + this.http = AsyncHTTP.init( allocator, .GET, URL.parse(this.url_buf), @@ -408,8 +412,8 @@ const NetworkTask = struct { &this.response_buffer, &this.request_buffer, 0, + this.getCompletionCallback(), ); - this.http.callback = notify; this.http.max_retry_count = PackageManager.instance.options.max_retry_count; this.callback = .{ .extract = tarball }; } @@ -2424,7 +2428,7 @@ pub const PackageManager = struct { manager.pending_tasks += @truncate(u32, count); manager.total_tasks += @truncate(u32, count); manager.network_resolve_batch.push(manager.network_tarball_batch); - NetworkThread.global.schedule(manager.network_resolve_batch); + HTTP.http_thread.schedule(manager.network_resolve_batch); manager.network_tarball_batch = .{}; manager.network_resolve_batch = .{}; return count; @@ -2463,7 +2467,7 @@ pub const PackageManager = struct { this.pending_tasks += @truncate(u32, count); this.total_tasks += @truncate(u32, count); this.network_resolve_batch.push(this.network_tarball_batch); - NetworkThread.global.schedule(this.network_resolve_batch); + HTTP.http_thread.schedule(this.network_resolve_batch); this.network_tarball_batch = .{}; this.network_resolve_batch = .{}; } @@ -2831,7 +2835,7 @@ pub const PackageManager = struct { manager.total_tasks += @truncate(u32, count); manager.thread_pool.schedule(batch); manager.network_resolve_batch.push(manager.network_tarball_batch); - NetworkThread.global.schedule(manager.network_resolve_batch); + HTTP.http_thread.schedule(manager.network_resolve_batch); manager.network_tarball_batch = .{}; manager.network_resolve_batch = .{}; @@ -3611,7 +3615,7 @@ pub const PackageManager = struct { cli: CommandLineArguments, ) !*PackageManager { // assume that spawning a thread will take a lil so we do that asap - try NetworkThread.warmup(); + try HTTP.HTTPThread.init(); if (cli.global) { var explicit_global_dir: string = ""; diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig index 7906dd655..78bf30290 100644 --- a/src/io/io_darwin.zig +++ b/src/io/io_darwin.zig @@ -501,13 +501,16 @@ pub const Waker = struct { kq: os.fd_t, machport: *anyopaque = undefined, machport_buf: []u8 = &.{}, + has_pending_wake: bool = false, const zeroed = std.mem.zeroes([16]Kevent64); - pub fn wake(this: Waker) !void { - if (!io_darwin_schedule_wakeup(this.machport)) { - return error.WakeUpFailed; + pub fn wake(this: *Waker) !void { + if (io_darwin_schedule_wakeup(this.machport)) { + this.has_pending_wake = false; + return; } + this.has_pending_wake = true; } pub fn wait(this: Waker) !usize { diff --git a/src/network_thread.zig b/src/network_thread.zig index aa6fcf15d..56ea6a698 100644 --- a/src/network_thread.zig +++ b/src/network_thread.zig @@ -26,6 +26,79 @@ pub var global: NetworkThread = undefined; pub var global_loaded: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0); const log = Output.scoped(.NetworkThread, true); +const Global = @import("./global.zig").Global; +pub fn onStartIOThread(waker: AsyncIO.Waker) void { + NetworkThread.address_list_cached = NetworkThread.AddressListCache.init(@import("./global.zig").default_allocator); + AsyncIO.global = AsyncIO.init(1024, 0, waker) catch |err| { + log: { + if (comptime Environment.isLinux) { + if (err == error.SystemOutdated) { + Output.prettyErrorln( + \\<red>error<r>: Linux kernel version doesn't support io_uring, which Bun depends on. + \\ + \\ To fix this error: please upgrade to a newer Linux kernel. + \\ + \\ If you're using Windows Subsystem for Linux, here's how: + \\ 1. Open PowerShell as an administrator + \\ 2. Run this: + \\ wsl --update + \\ wsl --shutdown + \\ + \\ Please make sure you're using WSL version 2 (not WSL 1). To check: wsl -l -v + \\ If you are on WSL 1, update to WSL 2 with the following commands: + \\ 1. wsl --set-default-version 2 + \\ 2. wsl --set-version [distro_name] 2 + \\ 3. Now follow the WSL 2 instructions above. + \\ Where [distro_name] is one of the names from the list given by: wsl -l -v + \\ + \\ If that doesn't work (and you're on a Windows machine), try this: + \\ 1. Open Windows Update + \\ 2. Download any updates to Windows Subsystem for Linux + \\ + \\ If you're still having trouble, ask for help in bun's discord https://bun.sh/discord + , .{}); + break :log; + } else if (err == error.SystemResources) { + Output.prettyErrorln( + \\<red>error<r>: memlock limit exceeded + \\ + \\To fix this error: <b>please increase the memlock limit<r> or upgrade to Linux kernel 5.11+ + \\ + \\If Bun is running inside Docker, make sure to set the memlock limit to unlimited (-1) + \\ + \\ docker run --rm --init --ulimit memlock=-1:-1 jarredsumner/bun:edge + \\ + \\To bump the memlock limit, check one of the following: + \\ /etc/security/limits.conf + \\ /etc/systemd/user.conf + \\ /etc/systemd/system.conf + \\ + \\You can also try running bun as root. + \\ + \\If running many copies of Bun via exec or spawn, be sure that O_CLOEXEC is set so + \\that resources are not leaked when the child process exits. + \\ + \\Why does this happen? + \\ + \\Bun uses io_uring and io_uring accounts memory it + \\needs under the rlimit memlocked option, which can be + \\quite low on some setups (64K). + \\ + \\ + , .{}); + break :log; + } + } + + Output.prettyErrorln("<r><red>error<r>: Failed to initialize network thread: <red><b>{s}<r>.\nHTTP requests will not work. Please file an issue and run strace().", .{@errorName(err)}); + } + + Global.exit(1); + }; + AsyncIO.global_loaded = true; + NetworkThread.global.io = &AsyncIO.global; + NetworkThread.global.processEvents(); +} fn queueEvents(this: *@This()) void { this.queued_tasks_mutex.lock(); @@ -41,6 +114,7 @@ pub fn processEvents(this: *@This()) void { processEvents_(this) catch {}; unreachable; } + /// Should only be called on the HTTP thread! fn processEvents_(this: *@This()) !void { while (true) { @@ -164,7 +238,7 @@ pub fn init() !void { @compileLog("TODO: Waker"); } - global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{ + global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, onStartIOThread, .{ global.waker, }); global.thread.detach(); |