aboutsummaryrefslogtreecommitdiff
path: root/src/http_client_async.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2023-05-31 17:20:30 -0700
committerGravatar GitHub <noreply@github.com> 2023-05-31 17:20:30 -0700
commit4c0145437679f879329df69c9a56e395e74e8280 (patch)
tree77c317891d68344a0f04ac44ed90804793ec3a0e /src/http_client_async.zig
parent611f1d0e241d264747dda7288c018ab383906ccc (diff)
downloadbun-4c0145437679f879329df69c9a56e395e74e8280.tar.gz
bun-4c0145437679f879329df69c9a56e395e74e8280.tar.zst
bun-4c0145437679f879329df69c9a56e395e74e8280.zip
Make uploading files with `fetch()`fast (#3125)
* Make file uploads fast * Add benchmark * Update README.md * defaults * print * prettier * smaller * fix(path) fix parse behavior (#3134) * Add macro docs (#3139) * Add macro doc * Updates * Tweaks * Update doc * Update macro serialization doc * Update macro doc * `--no-macros` flag, disable macros in node_modules * invert base/filename internally (#3141) * always false * Fix broken test * Add a test sendfile() test with large file --------- Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Co-authored-by: Ciro Spaciari <ciro.spaciari@gmail.com> Co-authored-by: Colin McDonnell <colinmcd94@gmail.com>
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r--src/http_client_async.zig174
1 files changed, 152 insertions, 22 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 4e0926baa..fe5f34f48 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -71,6 +71,89 @@ pub const FetchRedirect = enum(u8) {
});
};
+pub const HTTPRequestBody = union(enum) {
+ bytes: []const u8,
+ sendfile: Sendfile,
+
+ pub fn len(this: *const HTTPRequestBody) usize {
+ return switch (this.*) {
+ .bytes => this.bytes.len,
+ .sendfile => this.sendfile.content_size,
+ };
+ }
+};
+
+pub const Sendfile = struct {
+ fd: bun.FileDescriptor,
+ remain: usize = 0,
+ offset: usize = 0,
+ content_size: usize = 0,
+
+ pub fn isEligible(url: bun.URL) bool {
+ return url.isHTTP() and url.href.len > 0 and FeatureFlags.streaming_file_uploads_for_http_client;
+ }
+
+ pub fn write(
+ this: *Sendfile,
+ socket: NewHTTPContext(false).HTTPSocket,
+ ) Status {
+ const adjusted_count_temporary = @min(@as(u64, this.remain), @as(u63, std.math.maxInt(u63)));
+ // TODO we should not need this int cast; improve the return type of `@min`
+ const adjusted_count = @intCast(u63, adjusted_count_temporary);
+
+ if (Environment.isLinux) {
+ var signed_offset = @intCast(i64, this.offset);
+ const begin = this.offset;
+ const val =
+ // this does the syscall directly, without libc
+ std.os.linux.sendfile(socket.fd(), this.fd, &signed_offset, this.remain);
+ this.offset = @intCast(u64, signed_offset);
+
+ const errcode = std.os.linux.getErrno(val);
+
+ this.remain -|= @intCast(u64, this.offset -| begin);
+
+ if (errcode != .SUCCESS or this.remain == 0 or val == 0) {
+ if (errcode == .SUCCESS) {
+ return .{ .done = {} };
+ }
+
+ return .{ .err = AsyncIO.asError(errcode) };
+ }
+ } else {
+ var sbytes: std.os.off_t = adjusted_count;
+ const signed_offset = @bitCast(i64, @as(u64, this.offset));
+ const errcode = std.c.getErrno(std.c.sendfile(
+ this.fd,
+ socket.fd(),
+
+ signed_offset,
+ &sbytes,
+ null,
+ 0,
+ ));
+ const wrote = @intCast(u64, sbytes);
+ this.offset +|= wrote;
+ this.remain -|= wrote;
+ if (errcode != .AGAIN or this.remain == 0 or sbytes == 0) {
+ if (errcode == .SUCCESS) {
+ return .{ .done = {} };
+ }
+
+ return .{ .err = AsyncIO.asError(errcode) };
+ }
+ }
+
+ return .{ .again = {} };
+ }
+
+ pub const Status = union(enum) {
+ done: void,
+ err: anyerror,
+ again: void,
+ };
+};
+
const ProxySSLData = struct {
buffer: std.ArrayList(u8),
partial: bool,
@@ -738,7 +821,7 @@ pub fn onClose(
if (client.allow_retry) {
client.allow_retry = false;
- client.start(client.state.request_body, client.state.body_out_str.?);
+ client.start(client.state.original_request_body, client.state.body_out_str.?);
return;
}
@@ -915,14 +998,16 @@ pub const InternalState = struct {
compressed_body: MutableString = undefined,
body_size: usize = 0,
request_body: []const u8 = "",
+ original_request_body: HTTPRequestBody = .{ .bytes = "" },
request_sent_len: usize = 0,
fail: anyerror = error.NoError,
request_stage: HTTPStage = .pending,
response_stage: HTTPStage = .pending,
- pub fn init(body: []const u8, body_out_str: *MutableString) InternalState {
+ pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
return .{
- .request_body = body,
+ .original_request_body = body,
+ .request_body = if (body == .bytes) body.bytes else "",
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
.body_out_str = body_out_str,
@@ -942,6 +1027,7 @@ pub const InternalState = struct {
.body_out_str = body_msg,
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
+ .original_request_body = .{ .bytes = "" },
.request_body = "",
};
}
@@ -1191,7 +1277,7 @@ pub const AsyncHTTP = struct {
request_headers: Headers.Entries = Headers.Entries{},
response_headers: Headers.Entries = Headers.Entries{},
response_buffer: *MutableString,
- request_body: []const u8 = "",
+ request_body: HTTPRequestBody = .{ .bytes = "" },
allocator: std.mem.Allocator,
request_header_buf: string = "",
method: Method = Method.GET,
@@ -1278,7 +1364,18 @@ pub const AsyncHTTP = struct {
hostname: ?[]u8,
redirect_type: FetchRedirect,
) AsyncHTTP {
- var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy, .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0 };
+ var this = AsyncHTTP{
+ .allocator = allocator,
+ .url = url,
+ .method = method,
+ .request_headers = headers,
+ .request_header_buf = headers_buf,
+ .request_body = .{ .bytes = request_body },
+ .response_buffer = response_buffer,
+ .completion_callback = callback,
+ .http_proxy = http_proxy,
+ .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0,
+ };
this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname);
this.client.async_http_id = this.async_http_id;
@@ -1648,7 +1745,7 @@ pub fn doRedirect(this: *HTTPClient) void {
if (this.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
- return this.start("", body_out_str);
+ return this.start(.{ .bytes = "" }, body_out_str);
}
pub fn isHTTPS(this: *HTTPClient) bool {
if (this.http_proxy) |proxy| {
@@ -1662,7 +1759,7 @@ pub fn isHTTPS(this: *HTTPClient) bool {
}
return false;
}
-pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void {
+pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableString) void {
body_out_str.reset();
std.debug.assert(this.state.response_message_buffer.list.capacity == 0);
@@ -1730,7 +1827,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
this.setTimeout(socket, 60);
- const request = this.buildRequest(this.state.request_body.len);
+ const request = this.buildRequest(this.state.original_request_body.len());
if (this.http_proxy) |_| {
if (this.url.isHTTPS()) {
@@ -1784,7 +1881,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
std.debug.assert(!socket.isShutdown());
std.debug.assert(!socket.isClosed());
}
- const amount = socket.write(to_send, false);
+ const amount = socket.write(
+ to_send,
+ false,
+ );
if (comptime is_first_call) {
if (amount == 0) {
// don't worry about it
@@ -1804,7 +1904,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..];
}
- const has_sent_body = this.state.request_body.len == 0;
+ const has_sent_body = if (this.state.original_request_body == .bytes)
+ this.state.request_body.len == 0
+ else
+ false;
if (has_sent_headers and has_sent_body) {
this.state.request_stage = .done;
@@ -1813,7 +1916,11 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
if (has_sent_headers) {
this.state.request_stage = .body;
- std.debug.assert(this.state.request_body.len > 0);
+ std.debug.assert(
+ // we should have leftover data OR we use sendfile()
+ (this.state.original_request_body == .bytes and this.state.request_body.len > 0) or
+ this.state.original_request_body == .sendfile,
+ );
// we sent everything, but there's some body leftover
if (amount == @intCast(c_int, to_send.len)) {
@@ -1826,19 +1933,42 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
.body => {
this.setTimeout(socket, 60);
- const to_send = this.state.request_body;
- const amount = socket.write(to_send, true);
- if (amount < 0) {
- this.closeAndFail(error.WriteFailed, is_ssl, socket);
- return;
- }
+ switch (this.state.original_request_body) {
+ .bytes => {
+ const to_send = this.state.request_body;
+ const amount = socket.write(to_send, true);
+ if (amount < 0) {
+ this.closeAndFail(error.WriteFailed, is_ssl, socket);
+ return;
+ }
- this.state.request_sent_len += @intCast(usize, amount);
- this.state.request_body = this.state.request_body[@intCast(usize, amount)..];
+ this.state.request_sent_len += @intCast(usize, amount);
+ this.state.request_body = this.state.request_body[@intCast(usize, amount)..];
- if (this.state.request_body.len == 0) {
- this.state.request_stage = .done;
- return;
+ if (this.state.request_body.len == 0) {
+ this.state.request_stage = .done;
+ return;
+ }
+ },
+ .sendfile => |*sendfile| {
+ if (comptime is_ssl) {
+ @panic("sendfile is only supported without SSL. This code should never have been reached!");
+ }
+
+ switch (sendfile.write(socket)) {
+ .done => {
+ this.state.request_stage = .done;
+ return;
+ },
+ .err => |err| {
+ this.closeAndFail(err, false, socket);
+ return;
+ },
+ .again => {
+ socket.markNeedsMoreForSendfile();
+ },
+ }
+ },
}
},
.proxy_body => {