author    Jarred Sumner <jarred@jarredsumner.com> 2022-09-11 13:37:17 -0700
committer GitHub <noreply@github.com> 2022-09-11 13:37:17 -0700
commit    9a5aa059f91ecf14b5dfd28ec610df6d8373b5a1 (patch)
tree      b96909744baddf65bc4175c40bcc0d9d1658dbb1 /src
parent    8b91360a33b782af423c85f9ec7277394e27beb4 (diff)
New HTTP client (#1231)
* wip
* It mostly works!
* Support `bun install`
* Support `bun create`
* Support chunked transfer encoding
* Handle Keep Alive when redirecting to a different domain

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src')
 src/bun.js/event_loop.zig          |    2
 src/bun.js/webcore/response.zig    |   30
 src/cli/create_command.zig         |   28
 src/cli/upgrade_command.zig        |   13
 src/deps/picohttp.zig              |   58
 src/deps/picohttpparser.zig        |    8
 src/deps/uws.zig                   |   88
 src/hive_array.zig                 |  106
 src/http/websocket_http_client.zig |   29
 src/http/zlib.zig                  |    3
 src/http_client_async.zig          | 1824
 src/install/install.zig            |   24
 src/io/io_darwin.zig               |    9
 src/network_thread.zig             |   76
 14 files changed, 1556 insertions(+), 742 deletions(-)
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 54a5af1a8..51d55d2e0 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -486,7 +486,7 @@ pub const EventLoop = struct {
}
if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) {
- if (this.waker) |waker| {
+ if (this.waker) |*waker| {
waker.wake() catch unreachable;
}
}
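
This one-line change is a correctness fix: `if (this.waker) |waker|` unwraps the optional by copying the payload, so a `wake()` that mutates state would act on the copy (or fail to compile, since it needs a mutable pointer). Capturing with `|*waker|` yields a pointer into `this.waker` itself. A minimal sketch of the difference, using a hypothetical Waker type rather than Bun's:

    const std = @import("std");

    const Waker = struct {
        wakes: u32 = 0,
        pub fn wake(self: *Waker) !void {
            self.wakes += 1; // mutates whatever `self` points at
        }
    };

    test "optional capture: by value vs by pointer" {
        var maybe: ?Waker = Waker{};

        if (maybe) |copy| {
            var tmp = copy; // |copy| is a copy of the payload
            try tmp.wake(); // increments the copy, not `maybe`
        }
        try std.testing.expectEqual(@as(u32, 0), maybe.?.wakes);

        if (maybe) |*ptr| {
            try ptr.wake(); // mutates the payload stored inside `maybe`
        }
        try std.testing.expectEqual(@as(u32, 1), maybe.?.wakes);
    }
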
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index e649bb923..6c1fc49f9 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -517,6 +517,7 @@ pub const Fetch = struct {
pub const FetchTasklet = struct {
promise: *JSPromise = undefined,
http: HTTPClient.AsyncHTTP = undefined,
+ result: HTTPClient.HTTPClientResult = undefined,
status: Status = Status.pending,
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
@@ -592,12 +593,13 @@ pub const Fetch = struct {
this.blob_store = null;
store.deref();
}
+ defer this.result.deinitMetadata();
const fetch_error = std.fmt.allocPrint(
default_allocator,
"fetch() failed {s}\nurl: \"{s}\"",
.{
@errorName(this.http.err orelse error.HTTPFail),
- this.http.url.href,
+ this.result.href,
},
) catch unreachable;
return ZigString.init(fetch_error).toErrorInstance(this.global_this);
@@ -611,11 +613,12 @@ pub const Fetch = struct {
this.blob_store = null;
store.deref();
}
+ defer this.result.deinitMetadata();
response.* = Response{
.allocator = allocator,
- .url = allocator.dupe(u8, this.http.url.href) catch unreachable,
+ .url = allocator.dupe(u8, this.result.href) catch unreachable,
.status_text = allocator.dupe(u8, http_response.status) catch unreachable,
- .redirected = this.http.redirect_count > 0,
+ .redirected = this.http.redirected,
.body = .{
.init = .{
.headers = FetchHeaders.createFromPicoHeaders(this.global_this, http_response.headers),
@@ -645,7 +648,7 @@ pub const Fetch = struct {
// linked_list.data.pooled_body = BodyPool.get(allocator);
linked_list.data.blob_store = request_body_store;
linked_list.data.response_buffer = MutableString.initEmpty(allocator);
- linked_list.data.http = try HTTPClient.AsyncHTTP.init(
+ linked_list.data.http = HTTPClient.AsyncHTTP.init(
allocator,
method,
url,
@@ -653,10 +656,16 @@ pub const Fetch = struct {
headers_buf,
&linked_list.data.response_buffer,
request_body orelse &linked_list.data.empty_request_body,
-
timeout,
+ undefined,
);
linked_list.data.context = .{ .tasklet = &linked_list.data };
+ linked_list.data.http.completion_callback = HTTPClient.HTTPClientResult.Callback.New(
+ *FetchTasklet,
+ FetchTasklet.callback,
+ ).init(
+ &linked_list.data,
+ );
return linked_list;
}
@@ -672,21 +681,20 @@ pub const Fetch = struct {
timeout: usize,
request_body_store: ?*Blob.Store,
) !*FetchTasklet.Pool.Node {
- try NetworkThread.init();
+ try HTTPClient.HTTPThread.init();
var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store);
node.data.global_this = global;
- node.data.http.callback = callback;
var batch = NetworkThread.Batch{};
node.data.http.schedule(allocator, &batch);
- NetworkThread.global.schedule(batch);
+ HTTPClient.http_thread.schedule(batch);
VirtualMachine.vm.active_tasks +|= 1;
return node;
}
- pub fn callback(http_: *HTTPClient.AsyncHTTP) void {
- var task: *FetchTasklet = @fieldParentPtr(FetchTasklet, "http", http_);
- @atomicStore(Status, &task.status, Status.done, .Monotonic);
+ pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
+ task.response_buffer = result.body.?.*;
+ task.result = result;
task.javascript_vm.eventLoop().enqueueTaskConcurrent(Task.init(task));
}
};
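
The hunks above capture the new completion model: instead of recovering the tasklet with `@fieldParentPtr` from a raw `*AsyncHTTP`, the caller registers a typed context plus function through `HTTPClientResult.Callback.New(...).init(ctx)`, and the HTTP thread later invokes it with an `HTTPClientResult`. A hedged sketch of that wiring for a caller other than fetch; `MyContext`, `scheduleOne`, and the import paths are illustrative, while the `Callback.New`, `schedule`, and `http_thread.schedule` shapes follow this diff:

    const std = @import("std");
    const HTTP = @import("./http_client_async.zig"); // assumed import path
    const NetworkThread = @import("./network_thread.zig"); // as in this diff's call sites

    const MyContext = struct {
        // Same shape as FetchTasklet.callback above.
        pub fn onResult(this: *MyContext, result: HTTP.HTTPClientResult) void {
            _ = this;
            _ = result; // inspect result.response, result.fail, result.href, ...
        }
    };

    fn scheduleOne(ctx: *MyContext, async_http: *HTTP.AsyncHTTP, allocator: std.mem.Allocator) !void {
        try HTTP.HTTPThread.init(); // idempotent: spawns the HTTP thread once

        async_http.completion_callback = HTTP.HTTPClientResult.Callback.New(
            *MyContext,
            MyContext.onResult,
        ).init(ctx);

        var batch = NetworkThread.Batch{};
        async_http.schedule(allocator, &batch);
        HTTP.http_thread.schedule(batch); // wakes the uws loop if it is sleeping
    }
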
diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig
index a0017dd34..ee6266f40 100644
--- a/src/cli/create_command.zig
+++ b/src/cli/create_command.zig
@@ -251,7 +251,7 @@ pub const CreateCommand = struct {
@setCold(true);
Global.configureAllocator(.{ .long_running = false });
- try NetworkThread.init();
+ try HTTP.HTTPThread.init();
var create_options = try CreateOptions.parse(ctx, false);
const positionals = create_options.positionals;
@@ -1849,7 +1849,16 @@ pub const Example = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
- async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, api_url, header_entries, headers_buf, mutable, &request_body, 60 * std.time.ns_per_min);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ api_url,
+ header_entries,
+ headers_buf,
+ mutable,
+ &request_body,
+ 60 * std.time.ns_per_min,
+ );
async_http.client.progress_node = progress;
const response = try async_http.sendSync(true);
@@ -1912,7 +1921,7 @@ pub const Example = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
- async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
+ async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
async_http.client.progress_node = progress;
var response = try async_http.sendSync(true);
@@ -1984,7 +1993,7 @@ pub const Example = struct {
mutable.reset();
// ensure very stable memory address
- async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, URL.parse(tarball_url), .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
+ async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, URL.parse(tarball_url), .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
async_http.client.progress_node = progress;
refresher.maybeRefresh();
@@ -2013,7 +2022,16 @@ pub const Example = struct {
var mutable = try ctx.allocator.create(MutableString);
mutable.* = try MutableString.init(ctx.allocator, 2048);
- async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ url,
+ .{},
+ "",
+ mutable,
+ &request_body,
+ 60 * std.time.ns_per_min,
+ );
if (Output.enable_ansi_colors) {
async_http.client.progress_node = progress_node;
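
The synchronous call sites above now use `initSync`, which (as the `http_client_async.zig` hunks below show) forwards to `init` with an `undefined` completion callback; `sendSync` then installs a channel-backed callback and blocks until the result arrives. Condensed to its essentials, the blocking pattern used by `bun create` and `bun upgrade` looks like this (URL literal illustrative, everything else per this diff):

    // ensure very stable memory address (the HTTP thread holds this pointer)
    var async_http: *HTTP.AsyncHTTP = try ctx.allocator.create(HTTP.AsyncHTTP);
    async_http.* = HTTP.AsyncHTTP.initSync(
        ctx.allocator,
        .GET,
        URL.parse("https://example.com/"), // illustrative
        .{},
        "",
        mutable, // *MutableString that receives the response body
        &request_body,
        60 * std.time.ns_per_min,
    );
    async_http.client.progress_node = progress;

    // Blocks on a SingleHTTPChannel until the HTTP thread reports a result.
    const response = try async_http.sendSync(true);
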
diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig
index 0d7db71dd..15881e984 100644
--- a/src/cli/upgrade_command.zig
+++ b/src/cli/upgrade_command.zig
@@ -211,7 +211,16 @@ pub const UpgradeCommand = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable;
- async_http.* = try HTTP.AsyncHTTP.init(allocator, .GET, api_url, header_entries, headers_buf, &metadata_body, &request_body, 60 * std.time.ns_per_min);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ allocator,
+ .GET,
+ api_url,
+ header_entries,
+ headers_buf,
+ &metadata_body,
+ &request_body,
+ 60 * std.time.ns_per_min,
+ );
if (!silent) async_http.client.progress_node = progress;
const response = try async_http.sendSync(true);
@@ -434,7 +443,7 @@ pub const UpgradeCommand = struct {
zip_file_buffer.* = try MutableString.init(ctx.allocator, @maximum(version.size, 1024));
var request_buffer = try MutableString.init(ctx.allocator, 0);
- async_http.* = try HTTP.AsyncHTTP.init(
+ async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
URL.parse(version.zip_url),
diff --git a/src/deps/picohttp.zig b/src/deps/picohttp.zig
index ac1a44d35..4a6848749 100644
--- a/src/deps/picohttp.zig
+++ b/src/deps/picohttp.zig
@@ -8,6 +8,7 @@ const Environment = @import("../global.zig").Environment;
const fmt = std.fmt;
const assert = std.debug.assert;
+const StringBuilder = @import("../string_builder.zig");
pub const Header = struct {
name: []const u8,
@@ -33,6 +34,18 @@ pub const Header = struct {
}
}
+ pub fn count(this: *const Header, builder: *StringBuilder) void {
+ builder.count(this.name);
+ builder.count(this.value);
+ }
+
+ pub fn clone(this: *const Header, builder: *StringBuilder) Header {
+ return .{
+ .name = builder.append(this.name),
+ .value = builder.append(this.value),
+ };
+ }
+
comptime {
assert(@sizeOf(Header) == @sizeOf(c.phr_header));
assert(@alignOf(Header) == @alignOf(c.phr_header));
@@ -44,6 +57,21 @@ pub const Request = struct {
path: []const u8,
minor_version: usize,
headers: []const Header,
+ bytes_read: u32 = 0,
+
+ pub fn clone(this: *const Request, headers: []Header, builder: *StringBuilder) Request {
+ for (this.headers) |header, i| {
+ headers[i] = header.clone(builder);
+ }
+
+ return .{
+ .method = builder.append(this.method),
+ .path = builder.append(this.path),
+ .minor_version = this.minor_version,
+ .headers = headers,
+ .bytes_read = this.bytes_read,
+ };
+ }
pub fn format(self: Request, comptime _: []const u8, _: fmt.FormatOptions, writer: anytype) !void {
try fmt.format(writer, "{s} {s}\n", .{ self.method, self.path });
@@ -83,16 +111,17 @@ pub const Request = struct {
.path = path,
.minor_version = @intCast(usize, minor_version),
.headers = src[0..num_headers],
+ .bytes_read = @intCast(u32, rc),
},
};
}
};
pub const Response = struct {
- minor_version: usize,
- status_code: usize,
- status: []const u8,
- headers: []Header,
+ minor_version: usize = 0,
+ status_code: usize = 0,
+ status: []const u8 = "",
+ headers: []Header = &.{},
bytes_read: c_int = 0,
pub fn format(self: Response, comptime _: []const u8, _: fmt.FormatOptions, writer: anytype) !void {
@@ -103,6 +132,27 @@ pub const Response = struct {
}
}
+ pub fn count(this: *const Response, builder: *StringBuilder) void {
+ builder.count(this.status);
+
+ for (this.headers) |header| {
+ header.count(builder);
+ }
+ }
+
+ pub fn clone(this: *const Response, headers: []Header, builder: *StringBuilder) Response {
+ var that = this.*;
+ that.status = builder.append(this.status);
+
+ for (this.headers) |header, i| {
+ headers[i] = header.clone(builder);
+ }
+
+ that.headers = headers[0..this.headers.len];
+
+ return that;
+ }
+
pub fn parseParts(buf: []const u8, src: []Header, offset: ?*usize) !Response {
var minor_version: c_int = 1;
var status_code: c_int = 0;
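
The new `count`/`clone` pairs implement a two-pass copy: pass one counts every string into a `StringBuilder` so a single backing buffer can be sized, and pass two appends into that buffer so the cloned `Request`/`Response` owns stable slices that survive the socket buffer being reused. A minimal self-contained model of the pattern (this is not Bun's `src/string_builder.zig`, just its assumed count/allocate/append shape):

    const std = @import("std");

    const StringBuilder = struct {
        cap: usize = 0,
        len: usize = 0,
        ptr: ?[*]u8 = null,

        pub fn count(this: *StringBuilder, slice: []const u8) void {
            this.cap += slice.len; // pass 1: measure only
        }

        pub fn allocate(this: *StringBuilder, allocator: std.mem.Allocator) !void {
            this.ptr = (try allocator.alloc(u8, this.cap)).ptr;
        }

        pub fn append(this: *StringBuilder, slice: []const u8) []const u8 {
            const dest = this.ptr.?[this.len .. this.len + slice.len];
            std.mem.copy(u8, dest, slice); // pass 2: copy into the one buffer
            this.len += slice.len;
            return dest;
        }
    };

    test "count, allocate once, then clone" {
        var builder = StringBuilder{};
        builder.count("OK"); // e.g. Response.count counting `status`
        builder.count("Connection");
        builder.count("keep-alive");
        try builder.allocate(std.testing.allocator);
        defer std.testing.allocator.free(builder.ptr.?[0..builder.cap]);

        const status = builder.append("OK");
        const name = builder.append("Connection");
        try std.testing.expectEqualStrings("OK", status);
        try std.testing.expectEqualStrings("Connection", name);
    }
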
diff --git a/src/deps/picohttpparser.zig b/src/deps/picohttpparser.zig
index ea9ad9f3a..185ed822e 100644
--- a/src/deps/picohttpparser.zig
+++ b/src/deps/picohttpparser.zig
@@ -10,10 +10,10 @@ pub extern fn phr_parse_request(buf: [*c]const u8, len: usize, method: [*c][*c]c
pub extern fn phr_parse_response(_buf: [*c]const u8, len: usize, minor_version: [*c]c_int, status: [*c]c_int, msg: [*c][*c]const u8, msg_len: [*c]usize, headers: [*c]struct_phr_header, num_headers: [*c]usize, last_len: usize) c_int;
pub extern fn phr_parse_headers(buf: [*c]const u8, len: usize, headers: [*c]struct_phr_header, num_headers: [*c]usize, last_len: usize) c_int;
pub const struct_phr_chunked_decoder = extern struct {
- bytes_left_in_chunk: usize,
- consume_trailer: u8,
- _hex_count: u8,
- _state: u8,
+ bytes_left_in_chunk: usize = 0,
+ consume_trailer: u8 = 0,
+ _hex_count: u8 = 0,
+ _state: u8 = 0,
};
pub extern fn phr_decode_chunked(decoder: *struct_phr_chunked_decoder, buf: [*]u8, bufsz: *usize) isize;
pub extern fn phr_decode_chunked_is_in_data(decoder: *struct_phr_chunked_decoder) c_int;
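
Giving `struct_phr_chunked_decoder` default field values lets the new `InternalState` below embed one as `chunked_decoder: picohttp.phr_chunked_decoder = .{}` with zeroed state instead of an explicit memset. A hedged sketch of driving the decoder in place, with the extern decls above in scope (the return-code conventions follow upstream picohttpparser; the buffer is illustrative):

    const std = @import("std");

    fn decodeChunkedExample() !void {
        var decoder = struct_phr_chunked_decoder{}; // zeroed via the new defaults
        decoder.consume_trailer = 1; // also consume the CRLF after the final chunk

        var buf = "5\r\nhello\r\n0\r\n\r\n".*;
        var size: usize = buf.len;
        const rc = phr_decode_chunked(&decoder, &buf, &size);
        // rc >= 0: complete; `size` decoded bytes sit at the front of `buf`,
        //          with `rc` undecoded trailing bytes remaining in the input.
        // rc == -2: incomplete, call again with more data and the same decoder.
        // rc == -1: parse error.
        if (rc < 0) return error.DecodeFailed;
        std.debug.assert(std.mem.eql(u8, buf[0..size], "hello"));
    }
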
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index c545de2df..a2a7da143 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -25,12 +25,17 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
return us_socket_timeout(comptime ssl_int, this.socket, seconds);
}
pub fn ext(this: ThisSocket, comptime ContextType: type) ?*ContextType {
+ const alignment = if (ContextType == *anyopaque)
+ @sizeOf(usize)
+ else
+ std.meta.alignment(ContextType);
+
var ptr = us_socket_ext(
comptime ssl_int,
this.socket,
) orelse return null;
- return @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), ptr));
+ return @ptrCast(*ContextType, @alignCast(alignment, ptr));
}
pub fn context(this: ThisSocket) *us_socket_context_t {
return us_socket_context(
@@ -126,28 +131,51 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
return holder;
}
+ pub fn connectAnon(
+ host: []const u8,
+ port: c_int,
+ socket_ctx: *us_socket_context_t,
+ ptr: *anyopaque,
+ ) ?ThisSocket {
+ var stack_fallback = std.heap.stackFallback(1024, bun.default_allocator);
+ var allocator = stack_fallback.get();
+ var host_ = allocator.dupeZ(u8, host) catch return null;
+ defer allocator.free(host_);
+
+ var socket = us_socket_context_connect(comptime ssl_int, socket_ctx, host_, port, null, 0, @sizeOf(*anyopaque)) orelse return null;
+ const socket_ = ThisSocket{ .socket = socket };
+ var holder = socket_.ext(*anyopaque) orelse {
+ if (comptime bun.Environment.allow_assert) unreachable;
+ _ = us_socket_close_connecting(comptime ssl_int, socket);
+ return null;
+ };
+ holder.* = ptr;
+ return socket_;
+ }
+
pub fn configure(
ctx: *us_socket_context_t,
comptime ContextType: type,
- comptime onOpen: anytype,
- comptime onClose: anytype,
- comptime onData: anytype,
- comptime onWritable: anytype,
- comptime onTimeout: anytype,
- comptime onConnectError: anytype,
- comptime onEnd: anytype,
+ comptime Fields: anytype,
) void {
+ const field_type = comptime if (@TypeOf(Fields) != type) @TypeOf(Fields) else Fields;
+
const SocketHandler = struct {
+ const alignment = if (ContextType == anyopaque)
+ @sizeOf(usize)
+ else
+ std.meta.alignment(ContextType);
+
pub fn on_open(socket: *Socket, _: c_int, _: [*c]u8, _: c_int) callconv(.C) ?*Socket {
- onOpen(
- @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
+ Fields.onOpen(
+ @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
);
return socket;
}
pub fn on_close(socket: *Socket, code: c_int, reason: ?*anyopaque) callconv(.C) ?*Socket {
- onClose(
- @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
+ Fields.onClose(
+ @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
code,
reason,
@@ -155,57 +183,57 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
return socket;
}
pub fn on_data(socket: *Socket, buf: ?[*]u8, len: c_int) callconv(.C) ?*Socket {
- onData(
- @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
+ Fields.onData(
+ @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
buf.?[0..@intCast(usize, len)],
);
return socket;
}
pub fn on_writable(socket: *Socket) callconv(.C) ?*Socket {
- onWritable(
- @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
+ Fields.onWritable(
+ @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
);
return socket;
}
pub fn on_timeout(socket: *Socket) callconv(.C) ?*Socket {
- onTimeout(
- @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
+ Fields.onTimeout(
+ @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
);
return socket;
}
pub fn on_connect_error(socket: *Socket, code: c_int) callconv(.C) ?*Socket {
- onConnectError(
- @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
+ Fields.onConnectError(
+ @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
code,
);
return socket;
}
pub fn on_end(socket: *Socket) callconv(.C) ?*Socket {
- onEnd(
- @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
+ Fields.onEnd(
+ @ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
);
return socket;
}
};
- if (comptime @typeInfo(@TypeOf(onOpen)) != .Null)
+ if (comptime @hasDecl(field_type, "onOpen") and @typeInfo(@TypeOf(field_type.onOpen)) != .Null)
us_socket_context_on_open(ssl_int, ctx, SocketHandler.on_open);
- if (comptime @typeInfo(@TypeOf(onClose)) != .Null)
+ if (comptime @hasDecl(field_type, "onClose") and @typeInfo(@TypeOf(field_type.onClose)) != .Null)
us_socket_context_on_close(ssl_int, ctx, SocketHandler.on_close);
- if (comptime @typeInfo(@TypeOf(onData)) != .Null)
+ if (comptime @hasDecl(field_type, "onData") and @typeInfo(@TypeOf(field_type.onData)) != .Null)
us_socket_context_on_data(ssl_int, ctx, SocketHandler.on_data);
- if (comptime @typeInfo(@TypeOf(onWritable)) != .Null)
+ if (comptime @hasDecl(field_type, "onWritable") and @typeInfo(@TypeOf(field_type.onWritable)) != .Null)
us_socket_context_on_writable(ssl_int, ctx, SocketHandler.on_writable);
- if (comptime @typeInfo(@TypeOf(onTimeout)) != .Null)
+ if (comptime @hasDecl(field_type, "onTimeout") and @typeInfo(@TypeOf(field_type.onTimeout)) != .Null)
us_socket_context_on_timeout(ssl_int, ctx, SocketHandler.on_timeout);
- if (comptime @typeInfo(@TypeOf(onConnectError)) != .Null)
+ if (comptime @hasDecl(field_type, "onConnectError") and @typeInfo(@TypeOf(field_type.onConnectError)) != .Null)
us_socket_context_on_connect_error(ssl_int, ctx, SocketHandler.on_connect_error);
- if (comptime @typeInfo(@TypeOf(onEnd)) != .Null)
+ if (comptime @hasDecl(field_type, "onEnd") and @typeInfo(@TypeOf(field_type.onEnd)) != .Null)
us_socket_context_on_end(ssl_int, ctx, SocketHandler.on_end);
}
@@ -316,7 +344,7 @@ extern fn us_socket_context_add_server_name(ssl: c_int, context: ?*us_socket_con
extern fn us_socket_context_remove_server_name(ssl: c_int, context: ?*us_socket_context_t, hostname_pattern: [*c]const u8) void;
extern fn us_socket_context_on_server_name(ssl: c_int, context: ?*us_socket_context_t, cb: ?fn (?*us_socket_context_t, [*c]const u8) callconv(.C) void) void;
extern fn us_socket_context_get_native_handle(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque;
-extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t;
+pub extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t;
extern fn us_socket_context_free(ssl: c_int, context: ?*us_socket_context_t) void;
extern fn us_socket_context_on_open(ssl: c_int, context: ?*us_socket_context_t, on_open: fn (*Socket, c_int, [*c]u8, c_int) callconv(.C) ?*Socket) void;
extern fn us_socket_context_on_close(ssl: c_int, context: ?*us_socket_context_t, on_close: fn (*Socket, c_int, ?*anyopaque) callconv(.C) ?*Socket) void;
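
`configure` now takes one comptime `Fields` value instead of seven positional `anytype` callbacks, and each `us_socket_context_on_*` registration is gated on `@hasDecl`, so a handler type simply omits the events it does not care about (the WebSocket hunk below drops `onOpen` this way instead of passing `null`). A simplified, self-contained model of that gating (not the uws bindings themselves):

    const std = @import("std");

    fn countRegistered(comptime Fields: type) usize {
        // Mirrors the checks in configure: wire up a callback only if the
        // handler type actually declares it.
        var n: usize = 0;
        if (@hasDecl(Fields, "onOpen")) n += 1;
        if (@hasDecl(Fields, "onClose")) n += 1;
        if (@hasDecl(Fields, "onData")) n += 1;
        return n;
    }

    test "only declared handlers get registered" {
        const OnlyClose = struct {
            pub fn onClose() void {}
        };
        try std.testing.expectEqual(@as(usize, 1), countRegistered(OnlyClose));
    }
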
diff --git a/src/hive_array.zig b/src/hive_array.zig
new file mode 100644
index 000000000..eb9220e19
--- /dev/null
+++ b/src/hive_array.zig
@@ -0,0 +1,106 @@
+const std = @import("std");
+const assert = std.debug.assert;
+const mem = std.mem;
+const testing = std.testing;
+
+/// An array that efficiently tracks which elements are in use.
+/// The pointers are intended to be stable
+/// Sorta related to https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2021/p0447r15.html
+pub fn HiveArray(comptime T: type, comptime capacity: u16) type {
+ return struct {
+ const Self = @This();
+ buffer: [capacity]T = undefined,
+ available: std.bit_set.IntegerBitSet(capacity) = std.bit_set.IntegerBitSet(capacity).initFull(),
+ pub const size = capacity;
+
+ pub fn init() Self {
+ return .{};
+ }
+
+ pub fn get(self: *Self) ?*T {
+ const index = self.available.findFirstSet() orelse return null;
+ self.available.unset(index);
+ return &self.buffer[index];
+ }
+
+ pub fn at(self: *Self, index: u16) *T {
+ assert(index < capacity);
+ return &self.buffer[index];
+ }
+
+ pub fn claim(self: *Self, index: u16) void {
+ assert(index < capacity);
+ assert(self.available.isSet(index));
+ self.available.unset(index);
+ }
+
+ pub fn indexOf(self: *const Self, value: *const T) ?u63 {
+ const start = &self.buffer;
+ const end = @ptrCast([*]const T, start) + capacity;
+ if (!(@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end)))
+ return null;
+
+ // aligned to the size of T
+ const index = (@ptrToInt(value) - @ptrToInt(start)) / @sizeOf(T);
+ assert(index < capacity);
+ assert(&self.buffer[index] == value);
+ return @truncate(u63, index);
+ }
+
+ pub fn in(self: *const Self, value: *const T) bool {
+ const start = &self.buffer;
+ const end = @ptrCast([*]const T, start) + capacity;
+ return (@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end));
+ }
+
+ pub fn put(self: *Self, value: *T) bool {
+ const index = self.indexOf(value) orelse return false;
+
+ assert(!self.available.isSet(index));
+ assert(&self.buffer[index] == value);
+
+ value.* = undefined;
+
+ self.available.set(index);
+ return true;
+ }
+ };
+}
+
+test "HiveArray" {
+ const size = 64;
+
+ // Choose an integer with a weird alignment
+ const Int = u127;
+
+ var a = HiveArray(Int, size).init();
+
+ {
+ var b = a.get().?;
+ try testing.expect(a.get().? != b);
+ try testing.expectEqual(a.indexOf(b), 0);
+ try testing.expect(a.put(b));
+ try testing.expect(a.get().? == b);
+ var c = a.get().?;
+ c.* = 123;
+ var d: Int = 12345;
+ try testing.expect(a.put(&d) == false);
+ try testing.expect(a.in(&d) == false);
+ }
+
+ a.available = @TypeOf(a.available).initFull();
+ {
+ var i: u63 = 0;
+ while (i < size) {
+ var b = a.get().?;
+ try testing.expectEqual(a.indexOf(b), i);
+ try testing.expect(a.put(b));
+ try testing.expect(a.get().? == b);
+ i = i + 1;
+ }
+ i = 0;
+ while (i < size) : (i += 1) {
+ try testing.expect(a.get() == null);
+ }
+ }
+}
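
The keep-alive pool in `http_client_async.zig` below uses `HiveArray` in exactly this way: `get()` claims the first free slot via the bitset's `findFirstSet` (no allocation, stable address), and `put()` releases a slot only after proving the pointer belongs to the array. A small sketch of that pooling pattern under an assumed `Connection` type:

    const std = @import("std");
    const HiveArray = @import("./hive_array.zig").HiveArray;

    const Connection = struct { fd: i32 = -1 };

    var pool = HiveArray(Connection, 8).init();

    fn acquire() ?*Connection {
        // findFirstSet picks the lowest free index in the bitset.
        return pool.get();
    }

    fn release(conn: *Connection) void {
        // put() returns false for pointers that are not inside the buffer,
        // so foreign pointers can be rejected instead of corrupting state.
        if (!pool.put(conn)) unreachable;
    }
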
diff --git a/src/http/websocket_http_client.zig b/src/http/websocket_http_client.zig
index 7e5bb26ba..f380b38e1 100644
--- a/src/http/websocket_http_client.zig
+++ b/src/http/websocket_http_client.zig
@@ -145,7 +145,19 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type {
vm.uws_event_loop = loop;
- Socket.configure(ctx, HTTPClient, handleOpen, handleClose, handleData, handleWritable, handleTimeout, handleConnectError, handleEnd);
+ Socket.configure(
+ ctx,
+ HTTPClient,
+ .{
+ .onOpen = handleOpen,
+ .onClose = handleClose,
+ .onData = handleData,
+ .onWritable = handleWritable,
+ .onTimeout = handleTimeout,
+ .onConnectError = handleConnectError,
+ .onEnd = handleEnd,
+ },
+ );
if (is_new_loop) {
vm.prepareLoop();
}
@@ -805,13 +817,14 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
Socket.configure(
ctx,
WebSocket,
- null,
- handleClose,
- handleData,
- handleWritable,
- handleTimeout,
- handleConnectError,
- handleEnd,
+ .{
+ .onClose = handleClose,
+ .onData = handleData,
+ .onWritable = handleWritable,
+ .onTimeout = handleTimeout,
+ .onConnectError = handleConnectError,
+ .onEnd = handleEnd,
+ },
);
}
diff --git a/src/http/zlib.zig b/src/http/zlib.zig
index 1bd38777d..f6ada0452 100644
--- a/src/http/zlib.zig
+++ b/src/http/zlib.zig
@@ -20,6 +20,8 @@ pub fn init(allocator: std.mem.Allocator) ZlibPool {
}
pub fn get(this: *ZlibPool) !*MutableString {
+ std.debug.assert(loaded);
+
switch (this.items.items.len) {
0 => {
var mutable = try getAllocator().create(MutableString);
@@ -35,6 +37,7 @@ pub fn get(this: *ZlibPool) !*MutableString {
}
pub fn put(this: *ZlibPool, mutable: *MutableString) !void {
+ std.debug.assert(loaded);
mutable.reset();
try this.items.append(mutable);
}
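
These asserts document that the pool must be lazily initialized before first use; the initialization itself happens on the HTTP thread in `getBodyBuffer` below. The assumed shape of that guard, with `ZlibPool.loaded` and `ZlibPool.instance` as used later in this diff:

    // First use on the HTTP thread (see getBodyBuffer in http_client_async.zig):
    if (!ZlibPool.loaded) {
        ZlibPool.instance = ZlibPool.init(default_allocator);
        ZlibPool.loaded = true;
    }
    var compressed = try ZlibPool.instance.get(); // assert(loaded) now holds
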
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 35c927a13..52f62d473 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -29,98 +29,445 @@ const AsyncBIO = @import("./http/async_bio.zig");
const AsyncSocket = @import("./http/async_socket.zig");
const ZlibPool = @import("./http/zlib.zig");
const URLBufferPool = ObjectPool([4096]u8, null, false, 10);
+const uws = @import("uws");
pub const MimeType = @import("./http/mime_type.zig");
pub const URLPath = @import("./http/url_path.zig");
// This becomes Arena.allocator
pub var default_allocator: std.mem.Allocator = undefined;
pub var default_arena: Arena = undefined;
+pub var http_thread: HTTPThread = undefined;
+const HiveArray = @import("./hive_array.zig").HiveArray;
+const Batch = NetworkThread.Batch;
+const TaggedPointerUnion = @import("./tagged_pointer.zig").TaggedPointerUnion;
+
+fn NewHTTPContext(comptime ssl: bool) type {
+ return struct {
+ const pool_size = 64;
+ const PooledSocket = struct {
+ http_socket: HTTPSocket,
+ hostname_buf: [MAX_KEEPALIVE_HOSTNAME]u8 = undefined,
+ hostname_len: u8 = 0,
+ port: u16 = 0,
+
+ pub fn close(this: *PooledSocket) void {
+ this.* = undefined;
+
+ if (comptime ssl) {
+ http_thread.https_context.keep_alive_sockets.unset(http_thread.https_context.pending_sockets.indexOf(this).?);
+ std.debug.assert(http_thread.https_context.pending_sockets.put(this));
+ } else {
+ http_thread.http_context.keep_alive_sockets.unset(http_thread.http_context.pending_sockets.indexOf(this).?);
+ std.debug.assert(http_thread.http_context.pending_sockets.put(this));
+ }
+ }
+ };
-const log = Output.scoped(.fetch, true);
+ pending_sockets: HiveArray(PooledSocket, pool_size) = HiveArray(PooledSocket, pool_size).init(),
+ keep_alive_sockets: std.bit_set.IntegerBitSet(pool_size + 1) = std.bit_set.IntegerBitSet(pool_size + 1).initEmpty(),
+ us_socket_context: *uws.us_socket_context_t,
-pub fn onThreadStart(_: ?*anyopaque) ?*anyopaque {
- onThreadStartNew(0);
- return null;
-}
+ const Context = @This();
+ pub const HTTPSocket = uws.NewSocketHandler(ssl);
+
+ const ActiveSocket = TaggedPointerUnion(.{
+ HTTPClient,
+ PooledSocket,
+ });
+ const ssl_int = @as(c_int, @boolToInt(ssl));
-pub fn onThreadStartNew(waker: AsyncIO.Waker) void {
- Output.Source.configureNamedThread("HTTP");
-
- default_arena = Arena.init() catch unreachable;
- default_allocator = default_arena.allocator();
- NetworkThread.address_list_cached = NetworkThread.AddressListCache.init(default_allocator);
- AsyncIO.global = AsyncIO.init(1024, 0, waker) catch |err| {
- log: {
- if (comptime Environment.isLinux) {
- if (err == error.SystemOutdated) {
- Output.prettyErrorln(
- \\<red>error<r>: Linux kernel version doesn't support io_uring, which Bun depends on.
- \\
- \\ To fix this error: please upgrade to a newer Linux kernel.
- \\
- \\ If you're using Windows Subsystem for Linux, here's how:
- \\ 1. Open PowerShell as an administrator
- \\ 2. Run this:
- \\ wsl --update
- \\ wsl --shutdown
- \\
- \\ Please make sure you're using WSL version 2 (not WSL 1). To check: wsl -l -v
- \\ If you are on WSL 1, update to WSL 2 with the following commands:
- \\ 1. wsl --set-default-version 2
- \\ 2. wsl --set-version [distro_name] 2
- \\ 3. Now follow the WSL 2 instructions above.
- \\ Where [distro_name] is one of the names from the list given by: wsl -l -v
- \\
- \\ If that doesn't work (and you're on a Windows machine), try this:
- \\ 1. Open Windows Update
- \\ 2. Download any updates to Windows Subsystem for Linux
- \\
- \\ If you're still having trouble, ask for help in bun's discord https://bun.sh/discord
- , .{});
- break :log;
- } else if (err == error.SystemResources) {
- Output.prettyErrorln(
- \\<red>error<r>: memlock limit exceeded
- \\
- \\To fix this error: <b>please increase the memlock limit<r> or upgrade to Linux kernel 5.11+
- \\
- \\If Bun is running inside Docker, make sure to set the memlock limit to unlimited (-1)
- \\
- \\ docker run --rm --init --ulimit memlock=-1:-1 jarredsumner/bun:edge
- \\
- \\To bump the memlock limit, check one of the following:
- \\ /etc/security/limits.conf
- \\ /etc/systemd/user.conf
- \\ /etc/systemd/system.conf
- \\
- \\You can also try running bun as root.
- \\
- \\If running many copies of Bun via exec or spawn, be sure that O_CLOEXEC is set so
- \\that resources are not leaked when the child process exits.
- \\
- \\Why does this happen?
- \\
- \\Bun uses io_uring and io_uring accounts memory it
- \\needs under the rlimit memlocked option, which can be
- \\quite low on some setups (64K).
- \\
- \\
- , .{});
- break :log;
+ const MAX_KEEPALIVE_HOSTNAME = 128;
+
+ pub fn init(this: *@This()) !void {
+ var opts: uws.us_socket_context_options_t = undefined;
+ @memset(@ptrCast([*]u8, &opts), 0, @sizeOf(uws.us_socket_context_options_t));
+ this.us_socket_context = uws.us_create_socket_context(ssl_int, uws.Loop.get(), @sizeOf(usize), opts).?;
+
+ HTTPSocket.configure(
+ this.us_socket_context,
+ anyopaque,
+ Handler,
+ );
+ }
+
+ /// Attempt to keep the socket alive by reusing it for another request.
+ /// If no space is available, close the socket.
+ pub fn releaseSocket(this: *@This(), socket: HTTPSocket, hostname: []const u8, port: u16) void {
+ if (comptime Environment.allow_assert) {
+ std.debug.assert(!socket.isClosed());
+ std.debug.assert(!socket.isShutdown());
+ std.debug.assert(socket.isEstablished());
+ }
+
+ if (hostname.len <= MAX_KEEPALIVE_HOSTNAME) {
+ if (this.pending_sockets.get()) |pending| {
+ pending.http_socket = socket;
+ @memcpy(&pending.hostname_buf, hostname.ptr, hostname.len);
+ pending.hostname_len = @truncate(u8, hostname.len);
+ pending.port = port;
+ this.keep_alive_sockets.set(
+ this.pending_sockets.indexOf(pending).?,
+ );
+ pending.http_socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(pending).ptr());
+ log("Releasing socket for reuse {s}:{d}", .{ hostname, port });
+ return;
}
}
- Output.prettyErrorln("<r><red>error<r>: Failed to initialize network thread: <red><b>{s}<r>.\nHTTP requests will not work. Please file an issue and run strace().", .{@errorName(err)});
+ socket.close(0, null);
}
- Global.exit(1);
+ pub const Handler = struct {
+ pub fn onOpen(
+ ptr: *anyopaque,
+ socket: HTTPSocket,
+ ) void {
+ if (ActiveSocket.from(bun.cast(**anyopaque, ptr).*).get(HTTPClient)) |client| {
+ return client.onOpen(comptime ssl, socket);
+ }
+ }
+ pub fn onClose(
+ ptr: *anyopaque,
+ socket: HTTPSocket,
+ _: c_int,
+ _: ?*anyopaque,
+ ) void {
+ var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ if (tagged.get(HTTPClient)) |client| {
+ return client.onClose(comptime ssl, socket);
+ }
+
+ if (tagged.get(PooledSocket)) |client| {
+ return client.close();
+ }
+
+ unreachable;
+ }
+ pub fn onData(
+ ptr: *anyopaque,
+ socket: HTTPSocket,
+ buf: []const u8,
+ ) void {
+ var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ if (tagged.get(HTTPClient)) |client| {
+ return client.onData(
+ comptime ssl,
+ buf,
+ if (comptime ssl) &http_thread.https_context else &http_thread.http_context,
+ socket,
+ );
+ }
+
+ unreachable;
+ }
+ pub fn onWritable(
+ ptr: *anyopaque,
+ socket: HTTPSocket,
+ ) void {
+ var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ if (tagged.get(HTTPClient)) |client| {
+ return client.onWritable(
+ false,
+ comptime ssl,
+ socket,
+ );
+ }
+
+ unreachable;
+ }
+ pub fn onTimeout(
+ ptr: *anyopaque,
+ socket: HTTPSocket,
+ ) void {
+ var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ if (tagged.get(HTTPClient)) |client| {
+ return client.onTimeout(
+ comptime ssl,
+ socket,
+ );
+ } else if (tagged.get(PooledSocket)) |pooled| {
+ pooled.close();
+ }
+
+ unreachable;
+ }
+ pub fn onConnectError(
+ ptr: *anyopaque,
+ socket: HTTPSocket,
+ _: c_int,
+ ) void {
+ var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ if (tagged.get(HTTPClient)) |client| {
+ return client.onConnectError(
+ comptime ssl,
+ socket,
+ );
+ } else if (tagged.get(PooledSocket)) |pooled| {
+ pooled.close();
+ }
+
+ unreachable;
+ }
+ pub fn onEnd(
+ ptr: *anyopaque,
+ socket: HTTPSocket,
+ ) void {
+ var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ if (tagged.get(HTTPClient)) |client| {
+ return client.onEnd(
+ comptime ssl,
+ socket,
+ );
+ } else if (tagged.get(PooledSocket)) |pooled| {
+ pooled.close();
+ }
+
+ unreachable;
+ }
+ };
+
+ fn existingSocket(this: *@This(), hostname: []const u8, port: u16) ?HTTPSocket {
+ if (hostname.len > MAX_KEEPALIVE_HOSTNAME)
+ return null;
+
+ var iter = this.keep_alive_sockets.iterator(.{
+ .kind = .set,
+ });
+ while (iter.next()) |index_i| {
+ const index = @truncate(u16, index_i);
+ var socket = this.pending_sockets.at(index);
+ if (socket.port != port) {
+ continue;
+ }
+
+ std.debug.assert(!this.pending_sockets.available.isSet(index));
+
+ if (strings.eqlLong(socket.hostname_buf[0..socket.hostname_len], hostname, true)) {
+ const http_socket = socket.http_socket;
+ socket.close();
+ log("Keep-alive socket found for {s}:{d}.", .{ hostname, port });
+ return http_socket;
+ }
+ }
+
+ return null;
+ }
+
+ pub fn connect(this: *@This(), client: *HTTPClient, hostname: []const u8, port: u16) !HTTPSocket {
+ if (this.existingSocket(hostname, port)) |sock| {
+ sock.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr());
+ client.onOpen(comptime ssl, sock);
+ return sock;
+ }
+
+ if (HTTPSocket.connectAnon(
+ hostname,
+ port,
+ this.us_socket_context,
+ undefined,
+ )) |socket| {
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr());
+ return socket;
+ }
+
+ return error.FailedToOpenSocket;
+ }
};
+}
+
+pub const HTTPThread = struct {
+ var http_thread_loaded: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false);
- AsyncIO.global_loaded = true;
- NetworkThread.global.io = &AsyncIO.global;
+ loop: *uws.Loop,
+ http_context: NewHTTPContext(false),
+ https_context: NewHTTPContext(true),
- AsyncBIO.initBoringSSL();
+ queued_tasks_mutex: Lock = Lock.init(),
+ queued_tasks: Batch = .{},
+ processing_tasks: Batch = .{},
+ has_awoken: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ timer: std.time.Timer = undefined,
+ const threadlog = Output.scoped(.HTTPThread, true);
- NetworkThread.global.processEvents();
+ pub fn init() !void {
+ if (http_thread_loaded.swap(true, .SeqCst)) {
+ return;
+ }
+
+ http_thread = .{
+ .loop = undefined,
+ .http_context = .{
+ .us_socket_context = undefined,
+ },
+ .https_context = .{
+ .us_socket_context = undefined,
+ },
+ .timer = std.time.Timer.start() catch unreachable,
+ };
+
+ var thread = try std.Thread.spawn(.{
+ .stack_size = 16 * 1024 * 1024,
+ }, onStart, .{});
+ thread.detach();
+ }
+
+ pub fn onStart() void {
+ Output.Source.configureNamedThread("HTTP Client");
+ default_arena = Arena.init() catch unreachable;
+ default_allocator = default_arena.allocator();
+ var loop = uws.Loop.get().?;
+ _ = loop.addPostHandler(*HTTPThread, &http_thread, drainEvents);
+ http_thread.loop = loop;
+ http_thread.http_context.init() catch @panic("Failed to init http context");
+ http_thread.https_context.init() catch @panic("Failed to init https context");
+ http_thread.has_awoken.store(true, .Monotonic);
+ http_thread.processEvents();
+ }
+
+ fn queueEvents(this: *@This()) void {
+ this.queued_tasks_mutex.lock();
+ defer this.queued_tasks_mutex.unlock();
+ if (this.queued_tasks.len == 0)
+ return;
+ threadlog("Received {d} tasks\n", .{this.queued_tasks.len});
+ this.processing_tasks.push(this.queued_tasks);
+ this.queued_tasks = .{};
+ }
+
+ pub fn connect(this: *@This(), client: *HTTPClient, comptime is_ssl: bool) !NewHTTPContext(is_ssl).HTTPSocket {
+ return try this.context(is_ssl).connect(client, client.url.hostname, client.url.getPortAuto());
+ }
+
+ pub fn context(this: *@This(), comptime is_ssl: bool) *NewHTTPContext(is_ssl) {
+ return if (is_ssl) &this.https_context else &this.http_context;
+ }
+
+ fn drainEvents(this: *@This()) void {
+ this.queueEvents();
+
+ var count: usize = 0;
+
+ while (this.processing_tasks.pop()) |task| {
+ var callback = task.callback;
+ callback(task);
+ if (comptime Environment.allow_assert) {
+ count += 1;
+ }
+ }
+
+ if (comptime Environment.allow_assert) {
+ if (count > 0)
+ log("Processed {d} tasks\n", .{count});
+ }
+ }
+
+ fn processEvents_(this: *@This()) void {
+ while (true) {
+ this.drainEvents();
+
+ var start_time: i128 = 0;
+ if (comptime Environment.isDebug) {
+ start_time = std.time.nanoTimestamp();
+ }
+ Output.flush();
+ this.loop.run();
+ if (comptime Environment.isDebug) {
+ var end = std.time.nanoTimestamp();
+ threadlog("Waited {any}\n", .{std.fmt.fmtDurationSigned(@truncate(i64, end - start_time))});
+ Output.flush();
+ }
+ }
+ }
+
+ pub fn processEvents(this: *@This()) void {
+ processEvents_(this);
+ unreachable;
+ }
+ pub fn schedule(this: *@This(), batch: Batch) void {
+ if (batch.len == 0)
+ return;
+
+ {
+ this.queued_tasks_mutex.lock();
+ defer this.queued_tasks_mutex.unlock();
+ this.queued_tasks.push(batch);
+ }
+
+ if (this.has_awoken.load(.Monotonic))
+ this.loop.wakeup();
+ }
+};
+
+const log = Output.scoped(.fetch, false);
+
+const AnySocket = union {
+ https: NewHTTPContext(true).HTTPSocket,
+ http: NewHTTPContext(false).HTTPSocket,
+ none: void,
+
+ pub inline fn field(self: @This(), comptime is_ssl: bool) *uws.Socket {
+ return switch (is_ssl) {
+ true => self.https.socket,
+ false => self.http.socket,
+ };
+ }
+
+ pub inline fn client(self: *@This()) *HTTPClient {
+ return @fieldParentPtr(HTTPClient, "socket", self);
+ }
+};
+
+pub fn onOpen(
+ client: *HTTPClient,
+ comptime is_ssl: bool,
+ socket: NewHTTPContext(is_ssl).HTTPSocket,
+) void {
+ log("Connected {s} \n", .{client.url.href});
+ if (client.state.request_stage == .pending) {
+ client.onWritable(true, comptime is_ssl, socket);
+ }
+}
+pub fn onClose(
+ client: *HTTPClient,
+ comptime is_ssl: bool,
+ socket: NewHTTPContext(is_ssl).HTTPSocket,
+) void {
+ _ = socket;
+ log("Closed {s}\n", .{client.url.href});
+
+ if (client.state.stage != .done and client.state.stage != .fail)
+ client.fail(error.ConnectionClosed);
+}
+pub fn onTimeout(
+ client: *HTTPClient,
+ comptime is_ssl: bool,
+ socket: NewHTTPContext(is_ssl).HTTPSocket,
+) void {
+ _ = socket;
+ log("Timeout {s}\n", .{client.url.href});
+
+ if (client.state.stage != .done and client.state.stage != .fail)
+ client.fail(error.Timeout);
+}
+pub fn onConnectError(
+ client: *HTTPClient,
+ comptime is_ssl: bool,
+ socket: NewHTTPContext(is_ssl).HTTPSocket,
+) void {
+ _ = socket;
+ log("onConnectError {s}\n", .{client.url.href});
+
+ if (client.state.stage != .done and client.state.stage != .fail)
+ client.fail(error.ConnectionRefused);
+}
+pub fn onEnd(
+ client: *HTTPClient,
+ comptime is_ssl: bool,
+ _: NewHTTPContext(is_ssl).HTTPSocket,
+) void {
+ log("onEnd {s}\n", .{client.url.href});
+
+ if (client.state.stage != .done and client.state.stage != .fail)
+ client.fail(error.ConnectionClosed);
}
pub inline fn getAllocator() std.mem.Allocator {
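
Every socket carries one ext pointer, so the handlers above need a way to tell whether it currently points at a live HTTPClient or an idle PooledSocket; the `ActiveSocket` TaggedPointerUnion encodes that in the pointer itself. A simplified two-variant model of the trick (Bun's real implementation lives in `src/tagged_pointer.zig`; this sketch assumes pointee alignment of at least 2 so the low bit is free):

    const std = @import("std");

    fn TaggedPtr(comptime A: type, comptime B: type) type {
        return struct {
            const Self = @This();
            repr: usize,

            pub fn init(ptr: anytype) Self {
                const T = @TypeOf(ptr);
                const tag: usize = if (T == *A) 0 else if (T == *B) 1 else @compileError("unsupported");
                return .{ .repr = @ptrToInt(ptr) | tag };
            }

            // Returns null when the stored tag does not match T, exactly like
            // `ActiveSocket.from(...).get(HTTPClient)` in the handlers above.
            pub fn get(self: Self, comptime T: type) ?*T {
                const want: usize = if (T == A) 0 else 1;
                if ((self.repr & 1) != want) return null;
                return @intToPtr(*T, self.repr & ~@as(usize, 1));
            }
        };
    }

    test "dispatch on the embedded tag" {
        const Client = struct { id: u32 };
        const Pooled = struct { id: u32 };
        var c = Client{ .id = 1 };
        const tagged = TaggedPtr(Client, Pooled).init(&c);
        try std.testing.expect(tagged.get(Client) != null);
        try std.testing.expect(tagged.get(Pooled) == null);
    }
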
@@ -146,47 +493,147 @@ fn writeRequest(
comptime Writer: type,
writer: Writer,
request: picohttp.Request,
- body: string,
// header_hashes: []u64,
) !void {
- _ = writer.write(request.method);
- _ = writer.write(" ");
- _ = writer.write(request.path);
- _ = writer.write(" HTTP/1.1\r\n");
+ _ = writer.write(request.method) catch 0;
+ _ = writer.write(" ") catch 0;
+ _ = writer.write(request.path) catch 0;
+ _ = writer.write(" HTTP/1.1\r\n") catch 0;
for (request.headers) |header| {
- _ = writer.write(header.name);
- _ = writer.write(": ");
- _ = writer.write(header.value);
- _ = writer.write("\r\n");
+ _ = writer.write(header.name) catch 0;
+ _ = writer.write(": ") catch 0;
+ _ = writer.write(header.value) catch 0;
+ _ = writer.write("\r\n") catch 0;
}
- _ = writer.write("\r\n");
+ _ = writer.write("\r\n") catch 0;
+}
- if (body.len > 0) {
- _ = writer.write(body);
+pub const HTTPStage = enum {
+ pending,
+ headers,
+ body,
+ body_chunk,
+ fail,
+ done,
+};
+
+pub const InternalState = struct {
+ request_message: ?*AsyncMessage = null,
+ pending_response: picohttp.Response = undefined,
+ allow_keepalive: bool = true,
+ transfer_encoding: Encoding = Encoding.identity,
+ encoding: Encoding = Encoding.identity,
+ content_encoding_i: u8 = std.math.maxInt(u8),
+ chunked_decoder: picohttp.phr_chunked_decoder = .{},
+ stage: Stage = Stage.pending,
+ body_out_str: ?*MutableString = null,
+ compressed_body: ?*MutableString = null,
+ body_size: usize = 0,
+ chunked_offset: usize = 0,
+ request_body: []const u8 = "",
+ request_sent_len: usize = 0,
+ fail: anyerror = error.NoError,
+ request_stage: HTTPStage = .pending,
+ response_stage: HTTPStage = .pending,
+
+ pub fn reset(this: *InternalState) void {
+ if (this.request_message) |msg| {
+ msg.release();
+ this.request_message = null;
+ }
+
+ if (this.compressed_body) |body| {
+ ZlibPool.instance.put(body) catch unreachable;
+ this.compressed_body = null;
+ }
+
+ if (this.body_out_str) |body| {
+ body.reset();
+ }
+
+ this.* = .{
+ .body_out_str = this.body_out_str,
+ };
}
-}
+
+ pub fn getBodyBuffer(this: *InternalState) *MutableString {
+ switch (this.encoding) {
+ Encoding.gzip, Encoding.deflate => {
+ if (this.compressed_body == null) {
+ if (!ZlibPool.loaded) {
+ ZlibPool.instance = ZlibPool.init(default_allocator);
+ ZlibPool.loaded = true;
+ }
+
+ this.compressed_body = ZlibPool.instance.get() catch unreachable;
+ }
+
+ return this.compressed_body.?;
+ },
+ else => {
+ return this.body_out_str.?;
+ },
+ }
+ }
+
+ pub fn processBodyBuffer(this: *InternalState, buffer: MutableString) !void {
+ var body_out_str = this.body_out_str.?;
+ var buffer_ = this.getBodyBuffer();
+ buffer_.* = buffer;
+
+ switch (this.encoding) {
+ Encoding.gzip, Encoding.deflate => {
+ var gzip_timer: std.time.Timer = undefined;
+
+ if (extremely_verbose)
+ gzip_timer = std.time.Timer.start() catch @panic("Timer failure");
+
+ body_out_str.list.expandToCapacity();
+ defer ZlibPool.instance.put(buffer_) catch unreachable;
+ ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| {
+ Output.prettyErrorln("<r><red>Zlib error<r>", .{});
+ Output.flush();
+ return err;
+ };
+
+ if (extremely_verbose)
+ this.gzip_elapsed = gzip_timer.read();
+ },
+ else => {},
+ }
+
+ var response = &this.pending_response;
+ // if the body was compressed via this header, it no longer is after decoding, so remove the header
+ if (this.content_encoding_i < response.headers.len) {
+ var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
+ _ = mutable_headers.orderedRemove(this.content_encoding_i);
+ response.headers = mutable_headers.items;
+ this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i));
+ }
+
+ this.body_size = @truncate(usize, body_out_str.list.items.len);
+ }
+};
+
+const default_redirect_count = 127;
method: Method,
header_entries: Headers.Entries,
header_buf: string,
url: URL,
+connected_url: URL = URL{},
allocator: std.mem.Allocator,
verbose: bool = Environment.isTest,
-tcp_client: tcp.Client = undefined,
-body_size: u32 = 0,
-read_count: u32 = 0,
-remaining_redirect_count: i8 = 127,
+remaining_redirect_count: i8 = default_redirect_count,
redirect: ?*URLBufferPool.Node = null,
-disable_shutdown: bool = true,
timeout: usize = 0,
progress_node: ?*std.Progress.Node = null,
-socket: AsyncSocket.SSL = undefined,
-socket_loaded: bool = false,
-gzip_elapsed: u64 = 0,
-stage: Stage = Stage.pending,
received_keep_alive: bool = false,
+state: InternalState = .{},
+
+completion_callback: HTTPClientResult.Callback = undefined,
/// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since.
/// This is a workaround for that.
@@ -202,16 +649,13 @@ pub fn init(
url: URL,
header_entries: Headers.Entries,
header_buf: string,
-) !HTTPClient {
+) HTTPClient {
return HTTPClient{
.allocator = allocator,
.method = method,
.url = url,
.header_entries = header_entries,
.header_buf = header_buf,
- .socket = AsyncSocket.SSL{
- .socket = undefined,
- },
};
}
@@ -220,14 +664,15 @@ pub fn deinit(this: *HTTPClient) !void {
redirect.release();
this.redirect = null;
}
+
+ this.state.reset();
}
const Stage = enum(u8) {
pending,
connect,
- request,
- response,
done,
+ fail,
};
// threadlocal var resolver_cache
@@ -273,7 +718,8 @@ const transfer_encoding_header = hashHeaderName("Transfer-Encoding");
const host_header_name = "Host";
const content_length_header_name = "Content-Length";
const content_length_header_hash = hashHeaderName("Content-Length");
-const connection_header = picohttp.Header{ .name = "Connection", .value = "close" };
+const connection_header = picohttp.Header{ .name = "Connection", .value = "keep-alive" };
+const connection_closing_header = picohttp.Header{ .name = "Connection", .value = "close" };
const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" };
const accept_header_hash = hashHeaderName("Accept");
@@ -299,10 +745,11 @@ pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string {
pub const HeaderBuilder = @import("./http/header_builder.zig");
-pub const HTTPChannel = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 });
+const HTTPCallbackPair = .{ *AsyncHTTP, HTTPClientResult };
+pub const HTTPChannel = @import("./sync.zig").Channel(HTTPCallbackPair, .{ .Static = 1000 });
// 32 pointers much cheaper than 1000 pointers
const SingleHTTPChannel = struct {
- const SingleHTTPCHannel_ = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 8 });
+ const SingleHTTPCHannel_ = @import("./sync.zig").Channel(HTTPClientResult, .{ .Static = 8 });
channel: SingleHTTPCHannel_,
pub fn reset(_: *@This()) void {}
pub fn init() SingleHTTPChannel {
@@ -314,11 +761,9 @@ pub const HTTPChannelContext = struct {
http: AsyncHTTP = undefined,
channel: *HTTPChannel,
- pub fn callback(
- http: *AsyncHTTP,
- ) void {
- var this: *HTTPChannelContext = @fieldParentPtr(HTTPChannelContext, "http", http);
- this.channel.writeItem(http) catch unreachable;
+ pub fn callback(data: HTTPCallbackPair) void {
+ var this: *HTTPChannelContext = @fieldParentPtr(HTTPChannelContext, "http", data.@"0");
+ this.channel.writeItem(data) catch unreachable;
}
};
@@ -391,13 +836,14 @@ pub const AsyncHTTP = struct {
max_retry_count: u32 = 0,
url: URL,
- task: ThreadPool.Task = ThreadPool.Task{ .callback = HTTPSender.callback },
+ task: ThreadPool.Task = ThreadPool.Task{ .callback = startAsyncHTTP },
+ completion_callback: HTTPClientResult.Callback = undefined,
/// Timeout in nanoseconds
timeout: usize = 0,
+ redirected: bool = false,
response_encoding: Encoding = Encoding.identity,
- redirect_count: u32 = 0,
retries_count: u32 = 0,
verbose: bool = false,
@@ -408,12 +854,6 @@ pub const AsyncHTTP = struct {
elapsed: u64 = 0,
gzip_elapsed: u64 = 0,
- /// Callback runs when request finishes
- /// Executes on the network thread
- callback: ?CompletionCallback = null,
- callback_ctx: ?*anyopaque = null,
-
- pub const CompletionCallback = fn (this: *AsyncHTTP) void;
pub var active_requests_count = std.atomic.Atomic(u32).init(0);
pub var max_simultaneous_requests: u16 = 32;
@@ -435,7 +875,8 @@ pub const AsyncHTTP = struct {
response_buffer: *MutableString,
request_body: *MutableString,
timeout: usize,
- ) !AsyncHTTP {
+ callback: HTTPClientResult.Callback,
+ ) AsyncHTTP {
var this = AsyncHTTP{
.allocator = allocator,
.url = url,
@@ -444,13 +885,37 @@ pub const AsyncHTTP = struct {
.request_header_buf = headers_buf,
.request_body = request_body,
.response_buffer = response_buffer,
+ .completion_callback = callback,
};
- this.client = try HTTPClient.init(allocator, method, url, headers, headers_buf);
+ this.client = HTTPClient.init(allocator, method, url, headers, headers_buf);
this.client.timeout = timeout;
this.timeout = timeout;
return this;
}
+ pub fn initSync(
+ allocator: std.mem.Allocator,
+ method: Method,
+ url: URL,
+ headers: Headers.Entries,
+ headers_buf: string,
+ response_buffer: *MutableString,
+ request_body: *MutableString,
+ timeout: usize,
+ ) AsyncHTTP {
+ return @This().init(
+ allocator,
+ method,
+ url,
+ headers,
+ headers_buf,
+ response_buffer,
+ request_body,
+ timeout,
+ undefined,
+ );
+ }
+
fn reset(this: *AsyncHTTP) !void {
const timeout = this.timeout;
this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf);
@@ -459,100 +924,67 @@ pub const AsyncHTTP = struct {
}
pub fn schedule(this: *AsyncHTTP, _: std.mem.Allocator, batch: *ThreadPool.Batch) void {
- NetworkThread.init() catch unreachable;
+ HTTPThread.init() catch unreachable;
this.state.store(.scheduled, .Monotonic);
batch.push(ThreadPool.Batch.from(&this.task));
}
- fn sendSyncCallback(this: *AsyncHTTP) void {
- var single_http_channel = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?));
- single_http_channel.channel.writeItem(this) catch unreachable;
+ fn sendSyncCallback(this: *SingleHTTPChannel, result: HTTPClientResult) void {
+ this.channel.writeItem(result) catch unreachable;
}
pub fn sendSync(this: *AsyncHTTP, comptime _: bool) anyerror!picohttp.Response {
- if (this.callback_ctx == null) {
- var ctx = try bun.default_allocator.create(SingleHTTPChannel);
- ctx.* = SingleHTTPChannel.init();
- this.callback_ctx = ctx;
- } else {
- var ctx = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?));
- ctx.* = SingleHTTPChannel.init();
- }
+ try HTTPThread.init();
- this.callback = sendSyncCallback;
+ var ctx = try bun.default_allocator.create(SingleHTTPChannel);
+ ctx.* = SingleHTTPChannel.init();
+ this.completion_callback = HTTPClientResult.Callback.New(
+ *SingleHTTPChannel,
+ sendSyncCallback,
+ ).init(ctx);
var batch = NetworkThread.Batch{};
this.schedule(bun.default_allocator, &batch);
- NetworkThread.global.schedule(batch);
+ http_thread.schedule(batch);
while (true) {
- var data = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?));
- var async_http: *AsyncHTTP = data.channel.readItem() catch unreachable;
- if (async_http.err) |err| {
- return err;
+ const result: HTTPClientResult = ctx.channel.readItem() catch unreachable;
+ if (result.fail != error.NoError) {
+ return result.fail;
}
- return async_http.response.?;
+ return result.response;
}
unreachable;
}
- pub const HTTPSender = struct {
- frame: @Frame(AsyncHTTP.do) = undefined,
- finisher: ThreadPool.Task = .{ .callback = onFinish },
-
- pub const Pool = ObjectPool(HTTPSender, null, false, 8);
-
- pub fn callback(task: *ThreadPool.Task) void {
- var this = @fieldParentPtr(AsyncHTTP, "task", task);
- var sender = HTTPSender.Pool.get(default_allocator);
- sender.data = .{
- .frame = undefined,
- .finisher = .{ .callback = onFinish },
- };
- sender.data.frame = async do(&sender.data, this);
- }
-
- pub fn onFinish(task: *ThreadPool.Task) void {
- var this = @fieldParentPtr(HTTPSender, "finisher", task);
- @fieldParentPtr(HTTPSender.Pool.Node, "data", this).release();
- }
- };
-
- pub fn do(sender: *HTTPSender, this: *AsyncHTTP) void {
- defer {
- NetworkThread.global.schedule(.{ .head = &sender.finisher, .tail = &sender.finisher, .len = 1 });
- }
-
- outer: {
+ pub fn onAsyncHTTPComplete(this: *AsyncHTTP, result: HTTPClientResult) void {
+ var completion = this.completion_callback;
+ this.response = result.response;
+ this.elapsed = http_thread.timer.read() -| this.elapsed;
+ this.redirected = this.client.remaining_redirect_count != default_redirect_count;
+ if (result.fail != error.NoError) {
+ this.err = result.fail;
+ this.state.store(State.fail, .Monotonic);
+ } else {
this.err = null;
- this.state.store(.sending, .Monotonic);
-
- const start = NetworkThread.global.timer.read();
- defer this.elapsed = NetworkThread.global.timer.read() -| start;
-
- this.response = this.client.send(this.request_body.list.items, this.response_buffer) catch |err| {
- this.state.store(.fail, .Monotonic);
- this.err = err;
+ this.state.store(.success, .Monotonic);
+ }
- if (this.max_retry_count > this.retries_count) {
- this.retries_count += 1;
- this.response_buffer.reset();
+ completion.function(completion.ctx, result);
+ }
- NetworkThread.global.schedule(ThreadPool.Batch.from(&this.task));
- return;
- }
- break :outer;
- };
+ pub fn startAsyncHTTP(task: *Task) void {
+ var this = @fieldParentPtr(AsyncHTTP, "task", task);
+ this.err = null;
+ this.state.store(.sending, .Monotonic);
+ this.client.completion_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPComplete).init(
+ this,
+ );
- this.redirect_count = @intCast(u32, @maximum(127 - this.client.remaining_redirect_count, 0));
- this.state.store(.success, .Monotonic);
- this.gzip_elapsed = this.client.gzip_elapsed;
- }
+ this.elapsed = http_thread.timer.read();
- if (this.callback) |callback| {
- callback(this);
- }
+ this.client.start(this.request_body.list.items, this.response_buffer);
}
};
@@ -660,206 +1092,584 @@ pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
};
}
-pub fn connect(
- this: *HTTPClient,
- comptime ConnectType: type,
- connector: ConnectType,
-) !void {
- const port = this.url.getPortAuto();
- if (this.verbose) Output.prettyErrorln("<d>[HTTP]<r> Connecting to {s}:{d}", .{ this.url.href, port });
- try connector.connect(this.url.hostname, port);
- std.debug.assert(this.socket.socket.socket > 0);
- var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) };
- // client.setQuickACK(true) catch {};
-
- this.tcp_client = client;
- if (this.timeout > 0) {
- client.setReadTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {};
- client.setWriteTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {};
+pub fn doRedirect(this: *HTTPClient) void {
+ var body_out_str = this.state.body_out_str.?;
+ this.remaining_redirect_count -|= 1;
+
+ if (this.remaining_redirect_count == 0) {
+ this.fail(error.TooManyRedirects);
+ return;
}
+ this.state.reset();
+ return this.start("", body_out_str);
}
-pub fn sendAsync(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) @Frame(HTTPClient.send) {
- return async this.send(body, body_out_str);
+pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void {
+ body_out_str.reset();
+
+ std.debug.assert(this.state.request_message == null);
+ this.state = InternalState{
+ .request_body = body,
+ .body_out_str = body_out_str,
+ .stage = Stage.pending,
+ .request_message = null,
+ .pending_response = picohttp.Response{},
+ .compressed_body = null,
+ };
+
+ if (this.url.isHTTPS()) {
+ this.start_(true);
+ } else {
+ this.start_(false);
+ }
}
-fn maybeClearSocket(this: *HTTPClient) void {
- if (this.socket_loaded) {
- this.socket_loaded = false;
+fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
+ this.connected_url = this.url;
+ var socket = http_thread.connect(this, is_ssl) catch |err| {
+ this.fail(err);
+ return;
+ };
- this.socket.deinit();
+ if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) {
+ this.fail(error.ConnectionClosed);
+ std.debug.assert(this.state.fail != error.NoError);
}
}
-pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response {
- defer this.maybeClearSocket();
+const Task = ThreadPool.Task;
- // this prevents stack overflow
- redirect: while (this.remaining_redirect_count >= -1) {
- this.maybeClearSocket();
+pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ switch (this.state.request_stage) {
+ .pending, .headers => {
+ var stack_fallback = std.heap.stackFallback(16384, default_allocator);
+ var allocator = stack_fallback.get();
+ var list = std.ArrayList(u8).initCapacity(allocator, stack_fallback.buffer.len) catch unreachable;
+ defer if (list.capacity > stack_fallback.buffer.len) list.deinit();
+ var writer = &list.writer();
+
+ socket.timeout(60);
+
+ const request = this.buildRequest(this.state.request_body.len);
+ writeRequest(
+ @TypeOf(writer),
+ writer,
+ request,
+ ) catch {
+ this.fail(error.OutOfMemory);
+ socket.close(0, null);
+ return;
+ };
- _ = AsyncHTTP.active_requests_count.fetchAdd(1, .Monotonic);
- defer {
- _ = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic);
- }
+ const headers_len = list.items.len;
+ std.debug.assert(list.items.len == writer.context.items.len);
+ if (this.state.request_body.len > 0 and list.capacity - list.items.len > 0) {
+ var remain = list.items.ptr[list.items.len..list.capacity];
+ @memcpy(remain.ptr, this.state.request_body.ptr, @minimum(remain.len, this.state.request_body.len));
+ }
- this.stage = Stage.pending;
- body_out_str.reset();
+ const to_send = list.items[this.state.request_sent_len..];
+ if (comptime Environment.allow_assert) {
+ std.debug.assert(!socket.isShutdown());
+ std.debug.assert(!socket.isClosed());
+ }
+ const amount = socket.write(to_send, true);
+ if (comptime is_first_call) {
+ if (amount == 0) {
+ // nothing could be written yet; onWritable will fire again once the socket is ready
+ return;
+ }
+ }
- if (this.url.isHTTPS()) {
- return this.sendHTTPS(body, body_out_str) catch |err| {
- switch (err) {
- error.Redirect => {
- this.remaining_redirect_count -= 1;
+ if (amount < 0) {
+ this.fail(error.WriteFailed);
+ socket.close(0, null);
+ return;
+ }
+
+ this.state.request_sent_len += @intCast(usize, amount);
+ const has_sent_headers = this.state.request_sent_len >= headers_len;
+
+ if (has_sent_headers and this.state.request_body.len > 0) {
+ this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..];
+ }
+
+ const has_sent_body = this.state.request_body.len == 0;
+
+ if (has_sent_headers and has_sent_body) {
+ this.state.request_stage = .done;
+ return;
+ }
+
+ if (has_sent_headers) {
+ this.state.request_stage = .body;
+ std.debug.assert(this.state.request_body.len > 0);
+ } else {
+ this.state.request_stage = .headers;
+ }
+ },
+ .body => {
+ socket.timeout(60);
+
+ const to_send = this.state.request_body;
+ const amount = socket.write(to_send, true);
+ if (amount < 0) {
+ this.fail(error.WriteFailed);
+ socket.close(0, null);
+ return;
+ }
- continue :redirect;
+ this.state.request_sent_len += @intCast(usize, amount);
+ this.state.request_body = this.state.request_body[@intCast(usize, amount)..];
+
+ if (this.state.request_body.len == 0) {
+ this.state.request_stage = .done;
+ return;
+ }
+ },
+ else => {},
+ }
+}
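
The .headers branch above serializes the request through std.heap.stackFallback, so typical requests are built in a 16 KB stack buffer and only oversized ones touch the heap (hence the conditional deinit). A minimal sketch, assuming the std API of the Zig toolchain this tree targets:

const std = @import("std");

test "stackFallback only spills to the heap for oversized requests" {
    // 16 KB stack buffer, same shape as the request-serialization path
    var stack_fallback = std.heap.stackFallback(16384, std.testing.allocator);
    var allocator = stack_fallback.get();

    var list = try std.ArrayList(u8).initCapacity(allocator, 64);
    // mirrors the `defer` above: freeing is only required once the list
    // has outgrown the stack buffer and moved to the fallback allocator
    defer if (list.capacity > stack_fallback.buffer.len) list.deinit();

    try list.appendSlice("GET / HTTP/1.1\r\n\r\n");
    try std.testing.expect(list.capacity <= stack_fallback.buffer.len);
}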
+
+pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ switch (this.state.response_stage) {
+ .pending, .headers => {
+ var to_read = incoming_data;
+ var pending_buffers: [2]string = .{ "", "" };
+ var amount_read: usize = 0;
+ var needs_move = true;
+ if (this.state.request_message) |req_msg| {
+ var available = req_msg.available();
+ if (available.len == 0) {
+ this.state.request_message.?.release();
+ this.state.request_message = null;
+ this.fail(error.ResponseHeadersTooLarge);
+ socket.close(0, null);
+ return;
+ }
+
+ @memcpy(
+ req_msg.available().ptr,
+ incoming_data.ptr,
+ @minimum(available.len, incoming_data.len),
+ );
+ req_msg.used += @truncate(u32, incoming_data.len);
+ amount_read = @truncate(u32, req_msg.sent);
+ req_msg.sent = 0;
+ needs_move = false;
+ to_read = req_msg.slice();
+ pending_buffers[1] = incoming_data[@minimum(available.len, incoming_data.len)..];
+ }
+
+ const response = picohttp.Response.parseParts(
+ to_read,
+ &this.response_headers_buf,
+ &amount_read,
+ ) catch |err| {
+ switch (err) {
+ error.ShortRead => {
+ socket.timeout(60);
+ if (needs_move) {
+ std.debug.assert(this.state.request_message == null);
+ this.state.request_message = AsyncMessage.get(default_allocator);
+ if (to_read.len > this.state.request_message.?.buf.len) {
+ this.fail(error.ResponseHeadersTooLarge);
+ socket.close(0, null);
+ return;
+ }
+
+ _ = this.state.request_message.?.writeAll(incoming_data);
+ this.state.request_message.?.sent = @truncate(u32, to_read.len);
+ return;
+ }
+ },
+ else => {
+ socket.close(0, null);
+ this.fail(err);
+ return;
},
- else => return err,
}
+ unreachable;
};
- } else {
- return this.sendHTTP(body, body_out_str) catch |err| {
- switch (err) {
- error.Redirect => {
- this.remaining_redirect_count -= 1;
+ pending_buffers[0] = to_read[@minimum(@intCast(usize, response.bytes_read), to_read.len)..];
+ if (pending_buffers[0].len == 0 and pending_buffers[1].len > 0) {
+ pending_buffers[0] = pending_buffers[1];
+ pending_buffers[1] = "";
+ }
- continue :redirect;
- },
- else => return err,
+ var deferred_redirect: ?*URLBufferPool.Node = null;
+ const can_continue = this.handleResponseMetadata(
+ response,
+ // If there are multiple consecutive redirects
+ // and the redirect differs in hostname,
+ // the new URL buffer may point to invalid memory
+ // after this function is called.
+ // That matters because, for Keep-Alive, the hostname must point to valid memory.
+ &deferred_redirect,
+ ) catch |err| {
+ if (err == error.Redirect) {
+ if (this.state.request_message) |msg| {
+ msg.release();
+ this.state.request_message = null;
+ }
+
+ if (this.state.allow_keepalive) {
+ std.debug.assert(this.connected_url.hostname.len > 0);
+ ctx.releaseSocket(socket, this.connected_url.hostname, this.connected_url.getPortAuto());
+ } else {
+ socket.close(0, null);
+ }
+
+ if (deferred_redirect) |redirect| {
+ std.debug.assert(redirect != this.redirect);
+ // connected_url no longer points to valid memory
+ redirect.release();
+ }
+ this.connected_url = URL{};
+ this.doRedirect();
+ return;
}
+
+ socket.close(0, null);
+ this.fail(err);
+ return;
};
- }
- }
- return error.TooManyRedirects;
-}
+ if (!can_continue) {
+ this.done(is_ssl, ctx, socket);
+ return;
+ }
-const Task = ThreadPool.Task;
+ if (pending_buffers[0].len == 0) {
+ return;
+ }
-pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response {
- this.socket = AsyncSocket.SSL{
- .socket = try AsyncSocket.init(&AsyncIO.global, 0, default_allocator),
- };
- this.socket_loaded = true;
- this.stage = Stage.connect;
- var socket = &this.socket.socket;
- try this.connect(*AsyncSocket, socket);
- this.stage = Stage.request;
- defer this.closeSocket();
-
- var request = buildRequest(this, body.len);
- if (this.verbose) {
- Output.prettyErrorln("{s}", .{request});
+ if (this.state.response_stage == .body) {
+ {
+ const is_done = this.handleResponseBody(pending_buffers[0]) catch |err| {
+ socket.close(0, null);
+ this.fail(err);
+ return;
+ };
+
+ if (is_done) {
+ this.done(is_ssl, ctx, socket);
+ return;
+ }
+ }
+
+ if (pending_buffers[1].len > 0) {
+ const is_done = this.handleResponseBody(pending_buffers[1]) catch |err| {
+ socket.close(0, null);
+ this.fail(err);
+ return;
+ };
+
+ if (is_done) {
+ this.done(is_ssl, ctx, socket);
+ return;
+ }
+ }
+ } else if (this.state.response_stage == .body_chunk) {
+ {
+ const is_done = this.handleResponseBodyChunk(pending_buffers[0]) catch |err| {
+ socket.close(0, null);
+ this.fail(err);
+ return;
+ };
+
+ if (is_done) {
+ this.done(is_ssl, ctx, socket);
+ return;
+ }
+ }
+
+ if (pending_buffers[1].len > 0) {
+ const is_done = this.handleResponseBodyChunk(pending_buffers[1]) catch |err| {
+ socket.close(0, null);
+ this.fail(err);
+ return;
+ };
+
+ if (is_done) {
+ this.done(is_ssl, ctx, socket);
+ return;
+ }
+ }
+
+ socket.timeout(60);
+ }
+ },
+
+ .body => {
+ socket.timeout(60);
+
+ const is_done = this.handleResponseBody(incoming_data) catch |err| {
+ socket.close(0, null);
+ this.fail(err);
+ return;
+ };
+
+ if (is_done) {
+ this.done(is_ssl, ctx, socket);
+ return;
+ }
+ },
+
+ .body_chunk => {
+ socket.timeout(60);
+
+ const is_done = this.handleResponseBodyChunk(incoming_data) catch |err| {
+ socket.close(0, null);
+ this.fail(err);
+ return;
+ };
+
+ if (is_done) {
+ this.done(is_ssl, ctx, socket);
+ return;
+ }
+ },
+
+ .fail => {},
+
+ else => {
+ socket.close(0, null);
+ this.fail(error.UnexpectedData);
+ return;
+ },
}
+}
- try writeRequest(@TypeOf(socket), socket, request, body);
+fn fail(this: *HTTPClient, err: anyerror) void {
+ this.state.request_stage = .fail;
+ this.state.response_stage = .fail;
+ this.state.fail = err;
+ this.state.stage = .fail;
- _ = try socket.send();
- this.stage = Stage.response;
- if (this.progress_node == null) {
- return this.processResponse(
- false,
- @TypeOf(socket),
- socket,
- body_out_str,
- );
- } else {
- return this.processResponse(
- true,
- @TypeOf(socket),
- socket,
- body_out_str,
- );
+ const callback = this.completion_callback;
+ const result = this.toResult();
+ this.state.reset();
+ callback.run(result);
+}
+
+pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ var out_str = this.state.body_out_str.?;
+ var body = out_str.*;
+ const result = this.toResult();
+ const callback = this.completion_callback;
+
+ this.state.response_stage = .done;
+ this.state.request_stage = .done;
+ this.state.stage = .done;
+
+ if (this.state.allow_keepalive and !socket.isClosed()) {
+ socket.timeout(60 * 5);
+ ctx.releaseSocket(socket, this.connected_url.hostname, this.connected_url.getPortAuto());
+ } else if (!socket.isClosed()) {
+ socket.close(0, null);
}
+
+ this.state.reset();
+ result.body.?.* = body;
+ std.debug.assert(this.state.stage != .done);
+ this.state.response_stage = .done;
+ this.state.request_stage = .done;
+ this.state.stage = .done;
+
+ callback.run(result);
}
-pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response {
- defer if (this.verbose) Output.flush();
- var response: picohttp.Response = .{
- .minor_version = 1,
- .status_code = 0,
- .status = "",
- .headers = &[_]picohttp.Header{},
- };
- var request_message = AsyncMessage.get(default_allocator);
- defer request_message.release();
- var request_buffer: []u8 = request_message.buf;
- var read_length: usize = 0;
- {
- var read_headers_up_to: usize = 0;
-
- var req_buf_read: usize = std.math.maxInt(usize);
- defer this.read_count += @intCast(u32, read_length);
-
- restart: while (req_buf_read != 0) {
- req_buf_read = try client.read(request_buffer, read_length);
- read_length += req_buf_read;
- var request_body = request_buffer[0..read_length];
- log("request_body ({d}):\n{s}", .{ read_length, request_body });
- if (comptime report_progress) {
- this.progress_node.?.activate();
- this.progress_node.?.setCompletedItems(read_length);
- this.progress_node.?.context.maybeRefresh();
- }
+pub const HTTPClientResult = struct {
+ body: ?*MutableString = null,
+ response: picohttp.Response,
+ metadata_buf: []u8 = &.{},
+ href: []const u8 = "",
+ fail: anyerror = error.NoError,
+ headers_buf: []picohttp.Header = &.{},
+
+ pub fn deinitMetadata(this: *HTTPClientResult) void {
+ if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf);
+ if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf);
+ this.headers_buf = &.{};
+ this.metadata_buf = &.{};
+ this.href = "";
+ this.response.headers = &.{};
+ this.response.status = "";
+ }
- read_headers_up_to = @minimum(read_headers_up_to, read_length);
+ pub const Callback = struct {
+ ctx: *anyopaque,
+ function: Function,
- response = picohttp.Response.parseParts(request_body, &this.response_headers_buf, &read_headers_up_to) catch |err| {
- log("read_headers_up_to: {d}", .{read_headers_up_to});
- switch (err) {
- error.ShortRead => continue :restart,
- else => return err,
+ pub const Function = fn (*anyopaque, HTTPClientResult) void;
+
+ pub fn run(self: Callback, result: HTTPClientResult) void {
+ self.function(self.ctx, result);
+ }
+
+ pub fn New(comptime Type: type, comptime callback: anytype) type {
+ return struct {
+ pub fn init(this: Type) Callback {
+ return Callback{
+ .ctx = this,
+ .function = @This().wrapped_callback,
+ };
+ }
+
+ pub fn wrapped_callback(ptr: *anyopaque, result: HTTPClientResult) void {
+ var casted = @ptrCast(Type, @alignCast(std.meta.alignment(Type), ptr));
+ @call(.{ .modifier = .always_inline }, callback, .{ casted, result });
}
};
- break :restart;
}
+ };
+};
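
Callback.New erases the caller's type behind *anyopaque plus a comptime-generated trampoline, so a completion handler costs one pointer pair and zero allocations. A hypothetical consumer (MyTask is illustrative; install.zig below wires up NetworkTask the same way):

const MyTask = struct {
    finished: bool = false,

    fn onComplete(this: *MyTask, result: HTTPClientResult) void {
        _ = result;
        this.finished = true; // runs with the original, fully-typed pointer
    }
};

// var task = MyTask{};
// const cb = HTTPClientResult.Callback.New(*MyTask, MyTask.onComplete).init(&task);
// cb.run(result); // trampoline casts *anyopaque back to *MyTask, then inlines onComplete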
+
+pub fn toResult(this: *HTTPClient) HTTPClientResult {
+ var builder_ = StringBuilder{};
+ var builder = &builder_;
+ this.state.pending_response.count(builder);
+ builder.count(this.url.href);
+ builder.allocate(bun.default_allocator) catch unreachable;
+ var headers_buf = bun.default_allocator.alloc(picohttp.Header, this.state.pending_response.headers.len) catch unreachable;
+ const response = this.state.pending_response.clone(headers_buf, builder);
+ const href = builder.append(this.url.href);
+
+ return HTTPClientResult{
+ .body = this.state.body_out_str,
+ .response = response,
+ .metadata_buf = builder.ptr.?[0..builder.cap],
+ .href = href,
+ .fail = this.state.fail,
+ .headers_buf = headers_buf,
+ };
+}
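
toResult relies on the count/allocate/append two-pass idiom: tally every byte first, allocate once, then copy each string in and keep stable slices into the single buffer. Schematically, with the same (bun-internal) StringBuilder calls; cloneHref is a hypothetical helper:

fn cloneHref(href_in: []const u8) []const u8 {
    var builder = StringBuilder{};
    builder.count(href_in); // pass 1: sum the byte lengths
    builder.allocate(bun.default_allocator) catch unreachable; // one buffer
    const href = builder.append(href_in); // pass 2: copy in; slice is stable
    // href points into builder.ptr.?[0..builder.cap]; the whole region is
    // released later in one call (see deinitMetadata above)
    return href;
}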
+
+pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool {
+ var buffer = this.state.getBodyBuffer();
+
+ const remaining_content_length = this.state.body_size - buffer.list.items.len;
+ var remainder = incoming_data[0..@minimum(incoming_data.len, remaining_content_length)];
+
+ _ = try buffer.write(remainder);
+
+ if (this.progress_node) |progress| {
+ progress.activate();
+ progress.setCompletedItems(buffer.list.items.len);
+ progress.context.maybeRefresh();
}
- if (read_length == 0) {
- return error.NoData;
+
+ if (buffer.list.items.len == this.state.body_size) {
+ try this.state.processBodyBuffer(buffer.*);
+
+ if (this.progress_node) |progress| {
+ progress.activate();
+ progress.setCompletedItems(buffer.list.items.len);
+ progress.context.maybeRefresh();
+ }
+ return true;
}
- body_out_str.reset();
- var content_length: u32 = 0;
- var encoding = Encoding.identity;
- var transfer_encoding = Encoding.identity;
+ return false;
+}
- var location: string = "";
+pub fn handleResponseBodyChunk(
+ this: *HTTPClient,
+ incoming_data: []const u8,
+) !bool {
+ var decoder = &this.state.chunked_decoder;
+ var buffer_ = this.state.getBodyBuffer();
+ var buffer = buffer_.*;
+ this.state.chunked_offset += incoming_data.len;
+ try buffer.appendSlice(incoming_data);
+
+ // set consume_trailer to 1 to discard the trailing header
+ // using content-encoding per chunk is not supported
+ decoder.consume_trailer = 1;
+
+ // the decode loop below is adapted from the phr_decode_chunked example in
+ // https://github.com/h2o/picohttpparser#phr_decode_chunked
+ // (ported from C to Zig)
+ const pret = picohttp.phr_decode_chunked(
+ decoder,
+ buffer.list.items.ptr,
+
+ // this represents the position that we are currently at in the buffer
+ &this.state.chunked_offset,
+ );
+
+ switch (pret) {
+ // Invalid HTTP response body
+ -1 => {
+ return error.InvalidHTTPResponse;
+ },
+ // Needs more data
+ -2 => {
+ if (this.progress_node) |progress| {
+ progress.activate();
+ progress.setCompletedItems(buffer.list.items.len);
+ progress.context.maybeRefresh();
+ }
- var pretend_its_304 = false;
- var maybe_keepalive = false;
- errdefer {
- maybe_keepalive = false;
+ if (this.state.compressed_body) |compressed| {
+ compressed.* = buffer;
+ } else {
+ this.state.body_out_str.?.* = buffer;
+ }
+
+ return false;
+ },
+ // Done
+ else => {
+ try this.state.processBodyBuffer(
+ buffer,
+ );
+
+ if (this.progress_node) |progress| {
+ progress.activate();
+ progress.setCompletedItems(buffer.list.items.len);
+ progress.context.maybeRefresh();
+ }
+
+ return true;
+ },
}
- var content_encoding_i = response.headers.len + 1;
+}
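
For reference, the phr_decode_chunked contract exercised above, paraphrased from the picohttpparser README. decodeStep is a hypothetical helper; the decoder state must persist across network reads:

fn decodeStep(decoder: *picohttp.phr_chunked_decoder, raw: []u8) !enum { incomplete, done } {
    var size: usize = raw.len; // in: undecoded bytes at raw.ptr
    const pret = picohttp.phr_decode_chunked(decoder, raw.ptr, &size);
    // out: `size` is now the count of decoded body bytes rewritten
    // in place at raw[0..size]
    if (pret == -1) return error.InvalidHTTPResponse; // malformed chunk framing
    if (pret == -2) return .incomplete; // keep decoder state, feed more data
    return .done; // `pret` trailing bytes (e.g. pipelined data) remain
}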
+pub fn handleResponseMetadata(
+ this: *HTTPClient,
+ response: picohttp.Response,
+ deferred_redirect: *?*URLBufferPool.Node,
+) !bool {
+ var location: string = "";
+ var pretend_304 = false;
for (response.headers) |header, header_i| {
switch (hashHeaderName(header.name)) {
content_length_header_hash => {
- content_length = std.fmt.parseInt(u32, header.value, 10) catch 0;
- try body_out_str.inflate(content_length);
- body_out_str.list.expandToCapacity();
- this.body_size = content_length;
+ const content_length = std.fmt.parseInt(@TypeOf(this.state.body_size), header.value, 10) catch 0;
+ this.state.body_size = content_length;
},
content_encoding_hash => {
if (strings.eqlComptime(header.value, "gzip")) {
- encoding = Encoding.gzip;
- content_encoding_i = header_i;
+ this.state.encoding = Encoding.gzip;
+ this.state.content_encoding_i = @truncate(u8, header_i);
} else if (strings.eqlComptime(header.value, "deflate")) {
- encoding = Encoding.deflate;
- content_encoding_i = header_i;
+ this.state.encoding = Encoding.deflate;
+ this.state.content_encoding_i = @truncate(u8, header_i);
} else if (!strings.eqlComptime(header.value, "identity")) {
return error.UnsupportedContentEncoding;
}
},
transfer_encoding_header => {
if (strings.eqlComptime(header.value, "gzip")) {
- transfer_encoding = Encoding.gzip;
+ this.state.transfer_encoding = Encoding.gzip;
} else if (strings.eqlComptime(header.value, "deflate")) {
- transfer_encoding = Encoding.deflate;
+ this.state.transfer_encoding = Encoding.deflate;
} else if (strings.eqlComptime(header.value, "identity")) {
- transfer_encoding = Encoding.identity;
+ this.state.transfer_encoding = Encoding.identity;
} else if (strings.eqlComptime(header.value, "chunked")) {
- transfer_encoding = Encoding.chunked;
+ this.state.transfer_encoding = Encoding.chunked;
} else {
return error.UnsupportedTransferEncoding;
}
@@ -869,17 +1679,13 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti
},
hashHeaderName("Connection") => {
if (response.status_code >= 200 and response.status_code <= 299 and !KeepAlive.disabled) {
- if (strings.eqlComptime(header.value, "keep-alive")) {
- maybe_keepalive = true;
+ if (!strings.eqlComptime(header.value, "keep-alive")) {
+ this.state.allow_keepalive = false;
}
}
},
hashHeaderName("Last-Modified") => {
- if (this.force_last_modified and response.status_code > 199 and response.status_code < 300 and this.if_modified_since.len > 0) {
- if (strings.eql(this.if_modified_since, header.value)) {
- pretend_its_304 = true;
- }
- }
+ pretend_304 = this.force_last_modified and response.status_code > 199 and response.status_code < 300 and this.if_modified_since.len > 0 and strings.eql(this.if_modified_since, header.value);
},
else => {},
@@ -890,17 +1696,26 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti
Output.prettyErrorln("Response: {s}", .{response});
}
+ this.state.pending_response = response;
+ if (pretend_304) {
+ this.state.pending_response.status_code = 304;
+ }
+
if (location.len > 0 and this.remaining_redirect_count > 0) {
- switch (response.status_code) {
+ switch (this.state.pending_response.status_code) {
302, 301, 307, 308, 303 => {
if (strings.indexOf(location, "://")) |i| {
- var url_buf = this.redirect orelse URLBufferPool.get(default_allocator);
+ var url_buf = URLBufferPool.get(default_allocator);
const protocol_name = location[0..i];
if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else {
return error.UnsupportedRedirectProtocol;
}
+ if (location.len > url_buf.data.len) {
+ return error.RedirectURLTooLong;
+ }
+ deferred_redirect.* = this.redirect;
std.mem.copy(u8, &url_buf.data, location);
this.url = URL.parse(url_buf.data[0..location.len]);
this.redirect = url_buf;
@@ -913,10 +1728,7 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti
.{ original_url.displayProtocol(), original_url.displayHostname(), location },
) catch return error.RedirectURLTooLong);
- if (this.redirect) |red| {
- red.release();
- }
-
+ deferred_redirect.* = this.redirect;
this.redirect = url_buf;
}
@@ -933,323 +1745,9 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti
}
}
- body_getter: {
- if (pretend_its_304) {
- response.status_code = 304;
- }
-
- if (response.status_code == 304) break :body_getter;
-
- if (transfer_encoding == Encoding.chunked) {
- maybe_keepalive = false;
- var decoder = std.mem.zeroes(picohttp.phr_chunked_decoder);
- var buffer_: *MutableString = body_out_str;
-
- switch (encoding) {
- Encoding.gzip, Encoding.deflate => {
- if (!ZlibPool.loaded) {
- ZlibPool.instance = ZlibPool.init(default_allocator);
- ZlibPool.loaded = true;
- }
-
- buffer_ = try ZlibPool.instance.get();
- },
- else => {},
- }
-
- var buffer = buffer_.*;
-
- var last_read: usize = 0;
- {
- const buffered_amount = client.bufferedReadAmount();
- if (buffered_amount > 0) {
- var end = request_buffer[read_length..];
- if (buffered_amount <= end.len) {
- std.debug.assert(client.read(end, buffered_amount) catch unreachable == buffered_amount);
- response.bytes_read += @intCast(c_int, buffered_amount);
- }
- }
- var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length];
- last_read = remainder.len;
- try buffer.inflate(@maximum(remainder.len, 2048));
- buffer.list.expandToCapacity();
- std.mem.copy(u8, buffer.list.items, remainder);
- }
-
- // set consume_trailer to 1 to discard the trailing header
- // using content-encoding per chunk is not supported
- decoder.consume_trailer = 1;
-
- // these variable names are terrible
- // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked
- // (but ported from C -> zig)
- var rret: usize = 0;
- var rsize: usize = last_read;
- var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize);
- var total_size = rsize;
-
- while (pret == -2) {
- var buffered_amount = client.bufferedReadAmount();
- if (buffer.list.items.len < total_size + 512 or buffer.list.items[total_size..].len < @intCast(usize, @maximum(decoder.bytes_left_in_chunk, buffered_amount)) or buffer.list.items[total_size..].len < 512) {
- try buffer.inflate(@maximum((buffered_amount + total_size) * 2, 1024));
- buffer.list.expandToCapacity();
- }
-
- // while (true) {
-
- var remainder = buffer.list.items[total_size..];
- const errorable_read = client.read(remainder, 0);
-
- rret = errorable_read catch |err| {
- if (extremely_verbose) Output.prettyErrorln("Chunked transfer encoding error: {s}", .{@errorName(err)});
- return err;
- };
-
- buffered_amount = client.bufferedReadAmount();
- if (buffered_amount > 0) {
- try buffer.list.ensureTotalCapacity(default_allocator, rret + total_size + buffered_amount);
- buffer.list.expandToCapacity();
- remainder = buffer.list.items[total_size..];
- remainder = remainder[rret..][0..buffered_amount];
- rret += client.read(remainder, 0) catch |err| {
- if (extremely_verbose) Output.prettyErrorln("Chunked transfer encoding error: {s}", .{@errorName(err)});
- return err;
- };
- }
-
- // socket hang up, there was a parsing error, etc
- if (rret == 0) {
- if (extremely_verbose) Output.prettyErrorln("Unexpected 0", .{});
-
- return error.ChunkedEncodingError;
- }
-
- rsize = rret;
- pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize);
- if (pret == -1) {
- if (extremely_verbose)
- Output.prettyErrorln(
- \\ buffered: {d}
- \\ rsize: {d}
- \\ Read: {d} bytes / {d} total ({d} parsed)
- \\ Chunk {d} left
- \\ {}
- , .{
- client.bufferedReadAmount(),
- rsize,
- rret,
- buffer.list.items.len,
- total_size,
- decoder.bytes_left_in_chunk,
-
- decoder,
- });
-
- return error.ChunkedEncodingParseError;
- }
- total_size += rsize;
-
- if (comptime report_progress) {
- this.progress_node.?.activate();
- this.progress_node.?.setCompletedItems(total_size);
- this.progress_node.?.context.maybeRefresh();
- }
- }
-
- buffer.list.shrinkRetainingCapacity(total_size);
- buffer_.* = buffer;
- switch (encoding) {
- Encoding.gzip, Encoding.deflate => {
- var gzip_timer: std.time.Timer = undefined;
-
- if (extremely_verbose)
- gzip_timer = std.time.Timer.start() catch @panic("Timer failure");
-
- body_out_str.list.expandToCapacity();
- defer ZlibPool.instance.put(buffer_) catch unreachable;
- ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| {
- Output.prettyErrorln("<r><red>Zlib error<r>", .{});
- Output.flush();
- return err;
- };
-
- if (extremely_verbose)
- this.gzip_elapsed = gzip_timer.read();
-
- // if it compressed with this header, it is no longer
- if (content_encoding_i < response.headers.len) {
- var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
- _ = mutable_headers.swapRemove(content_encoding_i);
- response.headers = mutable_headers.items;
- }
- },
- else => {},
- }
-
- if (comptime report_progress) {
- this.progress_node.?.activate();
- this.progress_node.?.setCompletedItems(body_out_str.list.items.len);
- this.progress_node.?.context.maybeRefresh();
- }
-
- this.body_size = @truncate(u32, body_out_str.list.items.len);
-
- return response;
- }
-
- if (content_length > 0) {
- var remaining_content_length = content_length;
- var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length];
- remainder = remainder[0..std.math.min(remainder.len, content_length)];
- var buffer_: *MutableString = body_out_str;
-
- switch (encoding) {
- Encoding.gzip, Encoding.deflate => {
- if (!ZlibPool.loaded) {
- ZlibPool.instance = ZlibPool.init(default_allocator);
- ZlibPool.loaded = true;
- }
-
- buffer_ = try ZlibPool.instance.get();
- if (buffer_.list.capacity < remaining_content_length) {
- try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length);
- }
- buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length];
- },
- else => {},
- }
- var buffer = buffer_.*;
-
- var body_size: usize = 0;
- if (remainder.len > 0) {
- std.mem.copy(u8, buffer.list.items, remainder);
- body_size = remainder.len;
- this.read_count += @intCast(u32, body_size);
- remaining_content_length -= @intCast(u32, remainder.len);
- }
-
- while (remaining_content_length > 0) {
- const size = @intCast(u32, try client.read(
- buffer.list.items,
- body_size,
- ));
- this.read_count += size;
- if (size == 0) break;
-
- body_size += size;
- remaining_content_length -= size;
-
- if (comptime report_progress) {
- this.progress_node.?.activate();
- this.progress_node.?.setCompletedItems(body_size);
- this.progress_node.?.context.maybeRefresh();
- }
- }
-
- if (comptime report_progress) {
- this.progress_node.?.activate();
- this.progress_node.?.setCompletedItems(body_size);
- this.progress_node.?.context.maybeRefresh();
- }
-
- buffer.list.shrinkRetainingCapacity(body_size);
- buffer_.* = buffer;
-
- switch (encoding) {
- Encoding.gzip, Encoding.deflate => {
- var gzip_timer: std.time.Timer = undefined;
-
- if (extremely_verbose)
- gzip_timer = std.time.Timer.start() catch @panic("Timer failure");
-
- body_out_str.list.expandToCapacity();
- defer ZlibPool.instance.put(buffer_) catch unreachable;
- ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| {
- Output.prettyErrorln("<r><red>Zlib error<r>", .{});
- Output.flush();
- return err;
- };
-
- if (extremely_verbose)
- this.gzip_elapsed = gzip_timer.read();
-
- // if it compressed with this header, it is no longer
- if (content_encoding_i < response.headers.len) {
- var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
- _ = mutable_headers.swapRemove(content_encoding_i);
- response.headers = mutable_headers.items;
- }
- },
- else => {},
- }
- }
- }
-
- if (comptime report_progress) {
- this.progress_node.?.activate();
- this.progress_node.?.setCompletedItems(body_out_str.list.items.len);
- this.progress_node.?.context.maybeRefresh();
- }
-
- if (maybe_keepalive and response.status_code >= 200 and response.status_code < 300) {
- this.received_keep_alive = true;
- }
-
- return response;
-}
+ this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body;
-pub fn closeSocket(this: *HTTPClient) void {
- if (this.received_keep_alive) {
- this.received_keep_alive = false;
- if (this.url.hostname.len > 0 and this.socket.socket.socket > 0) {
- if (!this.socket.connect_frame.wait and
- (!this.socket.ssl_bio_loaded or
- (this.socket.ssl_bio.pending_sends == 0 and this.socket.ssl_bio.pending_reads == 0)))
- {
- if (KeepAlive.instance.append(this.url.hostname, this.url.getPortAuto(), this.socket.socket.socket)) {
- this.socket.socket.socket = 0;
- }
- }
- }
- }
- this.socket.close();
-}
-
-pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response {
- this.socket = try AsyncSocket.SSL.init(default_allocator, &AsyncIO.global);
- this.socket_loaded = true;
-
- var socket = &this.socket;
- this.stage = Stage.connect;
- try this.connect(*AsyncSocket.SSL, socket);
- this.stage = Stage.request;
- defer this.closeSocket();
-
- var request = buildRequest(this, body_str.len);
- if (this.verbose) {
- Output.prettyErrorln("{s}", .{request});
- }
-
- try writeRequest(@TypeOf(socket), socket, request, body_str);
- _ = try socket.send();
-
- this.stage = Stage.response;
-
- if (this.progress_node == null) {
- return this.processResponse(
- false,
- @TypeOf(socket),
- socket,
- body_out_str,
- );
- } else {
- return this.processResponse(
- true,
- @TypeOf(socket),
- socket,
- body_out_str,
- );
- }
+ return this.method.hasBody() and (this.state.body_size > 0 or this.state.transfer_encoding == .chunked);
}
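
The header loop above dispatches on a hash of each header name, and because hashHeaderName is callable at comptime, literal cases such as hashHeaderName("Connection") fold to integer constants in the switch. A schematic use of the same helper (classify is illustrative):

fn classify(name: []const u8) enum { content_length, connection, other } {
    return switch (hashHeaderName(name)) {
        hashHeaderName("Content-Length") => .content_length, // comptime-folded case
        hashHeaderName("Connection") => .connection,
        else => .other,
    };
}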
// // zig test src/http_client.zig --test-filter "sendHTTP - only" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec
diff --git a/src/install/install.zig b/src/install/install.zig
index a1523b076..f0929cd3d 100644
--- a/src/install/install.zig
+++ b/src/install/install.zig
@@ -40,6 +40,7 @@ const URL = @import("../url.zig").URL;
const AsyncHTTP = @import("http").AsyncHTTP;
const HTTPChannel = @import("http").HTTPChannel;
const NetworkThread = @import("http").NetworkThread;
+const HTTP = @import("http");
const Integrity = @import("./integrity.zig").Integrity;
const clap = @import("clap");
@@ -183,9 +184,9 @@ const NetworkTask = struct {
binlink: void,
},
- pub fn notify(http: *AsyncHTTP) void {
+ pub fn notify(this: *NetworkTask, _: anytype) void {
defer PackageManager.instance.wake();
- PackageManager.instance.network_channel.writeItem(@fieldParentPtr(NetworkTask, "http", http)) catch {};
+ PackageManager.instance.network_channel.writeItem(this) catch {};
}
// We must use a less restrictive Accept header value
@@ -319,7 +320,7 @@ const NetworkTask = struct {
this.request_buffer = try MutableString.init(allocator, 0);
this.response_buffer = try MutableString.init(allocator, 0);
this.allocator = allocator;
- this.http = try AsyncHTTP.init(
+ this.http = AsyncHTTP.init(
allocator,
.GET,
URL.parse(this.url_buf),
@@ -328,6 +329,7 @@ const NetworkTask = struct {
&this.response_buffer,
&this.request_buffer,
0,
+ this.getCompletionCallback(),
);
this.http.max_retry_count = PackageManager.instance.options.max_retry_count;
this.callback = .{
@@ -347,8 +349,10 @@ const NetworkTask = struct {
this.http.client.force_last_modified = true;
this.http.client.if_modified_since = last_modified;
}
+ }
- this.http.callback = notify;
+ pub fn getCompletionCallback(this: *NetworkTask) HTTP.HTTPClientResult.Callback {
+ return HTTP.HTTPClientResult.Callback.New(*NetworkTask, notify).init(this);
}
pub fn schedule(this: *NetworkTask, batch: *ThreadPool.Batch) void {
@@ -399,7 +403,7 @@ const NetworkTask = struct {
header_buf = header_builder.content.ptr.?[0..header_builder.content.len];
}
- this.http = try AsyncHTTP.init(
+ this.http = AsyncHTTP.init(
allocator,
.GET,
URL.parse(this.url_buf),
@@ -408,8 +412,8 @@ const NetworkTask = struct {
&this.response_buffer,
&this.request_buffer,
0,
+ this.getCompletionCallback(),
);
- this.http.callback = notify;
this.http.max_retry_count = PackageManager.instance.options.max_retry_count;
this.callback = .{ .extract = tarball };
}
@@ -2424,7 +2428,7 @@ pub const PackageManager = struct {
manager.pending_tasks += @truncate(u32, count);
manager.total_tasks += @truncate(u32, count);
manager.network_resolve_batch.push(manager.network_tarball_batch);
- NetworkThread.global.schedule(manager.network_resolve_batch);
+ HTTP.http_thread.schedule(manager.network_resolve_batch);
manager.network_tarball_batch = .{};
manager.network_resolve_batch = .{};
return count;
@@ -2463,7 +2467,7 @@ pub const PackageManager = struct {
this.pending_tasks += @truncate(u32, count);
this.total_tasks += @truncate(u32, count);
this.network_resolve_batch.push(this.network_tarball_batch);
- NetworkThread.global.schedule(this.network_resolve_batch);
+ HTTP.http_thread.schedule(this.network_resolve_batch);
this.network_tarball_batch = .{};
this.network_resolve_batch = .{};
}
@@ -2831,7 +2835,7 @@ pub const PackageManager = struct {
manager.total_tasks += @truncate(u32, count);
manager.thread_pool.schedule(batch);
manager.network_resolve_batch.push(manager.network_tarball_batch);
- NetworkThread.global.schedule(manager.network_resolve_batch);
+ HTTP.http_thread.schedule(manager.network_resolve_batch);
manager.network_tarball_batch = .{};
manager.network_resolve_batch = .{};
@@ -3611,7 +3615,7 @@ pub const PackageManager = struct {
cli: CommandLineArguments,
) !*PackageManager {
// assume that spawning a thread will take a lil so we do that asap
- try NetworkThread.warmup();
+ try HTTP.HTTPThread.init();
if (cli.global) {
var explicit_global_dir: string = "";
diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig
index 7906dd655..78bf30290 100644
--- a/src/io/io_darwin.zig
+++ b/src/io/io_darwin.zig
@@ -501,13 +501,16 @@ pub const Waker = struct {
kq: os.fd_t,
machport: *anyopaque = undefined,
machport_buf: []u8 = &.{},
+ has_pending_wake: bool = false,
const zeroed = std.mem.zeroes([16]Kevent64);
- pub fn wake(this: Waker) !void {
- if (!io_darwin_schedule_wakeup(this.machport)) {
- return error.WakeUpFailed;
+ pub fn wake(this: *Waker) !void {
+ if (io_darwin_schedule_wakeup(this.machport)) {
+ this.has_pending_wake = false;
+ return;
}
+ this.has_pending_wake = true;
}
pub fn wait(this: Waker) !usize {
diff --git a/src/network_thread.zig b/src/network_thread.zig
index aa6fcf15d..56ea6a698 100644
--- a/src/network_thread.zig
+++ b/src/network_thread.zig
@@ -26,6 +26,79 @@ pub var global: NetworkThread = undefined;
pub var global_loaded: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0);
const log = Output.scoped(.NetworkThread, true);
+const Global = @import("./global.zig").Global;
+pub fn onStartIOThread(waker: AsyncIO.Waker) void {
+ NetworkThread.address_list_cached = NetworkThread.AddressListCache.init(@import("./global.zig").default_allocator);
+ AsyncIO.global = AsyncIO.init(1024, 0, waker) catch |err| {
+ log: {
+ if (comptime Environment.isLinux) {
+ if (err == error.SystemOutdated) {
+ Output.prettyErrorln(
+ \\<red>error<r>: Linux kernel version doesn't support io_uring, which Bun depends on.
+ \\
+ \\ To fix this error: please upgrade to a newer Linux kernel.
+ \\
+ \\ If you're using Windows Subsystem for Linux, here's how:
+ \\ 1. Open PowerShell as an administrator
+ \\ 2. Run this:
+ \\ wsl --update
+ \\ wsl --shutdown
+ \\
+ \\ Please make sure you're using WSL version 2 (not WSL 1). To check: wsl -l -v
+ \\ If you are on WSL 1, update to WSL 2 with the following commands:
+ \\ 1. wsl --set-default-version 2
+ \\ 2. wsl --set-version [distro_name] 2
+ \\ 3. Now follow the WSL 2 instructions above.
+ \\ Where [distro_name] is one of the names from the list given by: wsl -l -v
+ \\
+ \\ If that doesn't work (and you're on a Windows machine), try this:
+ \\ 1. Open Windows Update
+ \\ 2. Download any updates to Windows Subsystem for Linux
+ \\
+ \\ If you're still having trouble, ask for help in bun's discord https://bun.sh/discord
+ , .{});
+ break :log;
+ } else if (err == error.SystemResources) {
+ Output.prettyErrorln(
+ \\<red>error<r>: memlock limit exceeded
+ \\
+ \\To fix this error: <b>please increase the memlock limit<r> or upgrade to Linux kernel 5.11+
+ \\
+ \\If Bun is running inside Docker, make sure to set the memlock limit to unlimited (-1)
+ \\
+ \\ docker run --rm --init --ulimit memlock=-1:-1 jarredsumner/bun:edge
+ \\
+ \\To bump the memlock limit, check one of the following:
+ \\ /etc/security/limits.conf
+ \\ /etc/systemd/user.conf
+ \\ /etc/systemd/system.conf
+ \\
+ \\You can also try running bun as root.
+ \\
+ \\If running many copies of Bun via exec or spawn, be sure that O_CLOEXEC is set so
+ \\that resources are not leaked when the child process exits.
+ \\
+ \\Why does this happen?
+ \\
+ \\Bun uses io_uring, and io_uring accounts the memory it
+ \\needs against the memlock rlimit (RLIMIT_MEMLOCK), which
+ \\can be quite low on some setups (64 KB).
+ \\
+ , .{});
+ break :log;
+ }
+ }
+
+ Output.prettyErrorln("<r><red>error<r>: Failed to initialize network thread: <red><b>{s}<r>.\nHTTP requests will not work. Please file an issue and include the output of strace.", .{@errorName(err)});
+ }
+
+ Global.exit(1);
+ };
+ AsyncIO.global_loaded = true;
+ NetworkThread.global.io = &AsyncIO.global;
+ NetworkThread.global.processEvents();
+}
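
The error reporting in onStartIOThread leans on Zig's labeled blocks: break :log skips the generic fallthrough message once a specific diagnosis has printed. In isolation:

test "break :label exits a labeled block early" {
    var diagnosed = false;
    var err: anyerror = error.SystemOutdated;
    log: {
        if (err == error.SystemOutdated) {
            diagnosed = true; // the specific message path
            break :log; // skips the generic fallthrough below
        }
        diagnosed = false; // generic "failed to initialize" path
    }
    try @import("std").testing.expect(diagnosed);
}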
fn queueEvents(this: *@This()) void {
this.queued_tasks_mutex.lock();
@@ -41,6 +114,7 @@ pub fn processEvents(this: *@This()) void {
processEvents_(this) catch {};
unreachable;
}
+
/// Should only be called on the HTTP thread!
fn processEvents_(this: *@This()) !void {
while (true) {
@@ -164,7 +238,7 @@ pub fn init() !void {
@compileLog("TODO: Waker");
}
- global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{
+ global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, onStartIOThread, .{
global.waker,
});
global.thread.detach();