diff options
author | 2023-05-31 17:20:30 -0700 | |
---|---|---|
committer | 2023-05-31 17:20:30 -0700 | |
commit | 4c0145437679f879329df69c9a56e395e74e8280 (patch) | |
tree | 77c317891d68344a0f04ac44ed90804793ec3a0e /src/http_client_async.zig | |
parent | 611f1d0e241d264747dda7288c018ab383906ccc (diff) | |
download | bun-4c0145437679f879329df69c9a56e395e74e8280.tar.gz bun-4c0145437679f879329df69c9a56e395e74e8280.tar.zst bun-4c0145437679f879329df69c9a56e395e74e8280.zip |
Make uploading files with `fetch()`fast (#3125)
* Make file uploads fast
* Add benchmark
* Update README.md
* defaults
* print
* prettier
* smaller
* fix(path) fix parse behavior (#3134)
* Add macro docs (#3139)
* Add macro doc
* Updates
* Tweaks
* Update doc
* Update macro serialization doc
* Update macro doc
* `--no-macros` flag, disable macros in node_modules
* invert base/filename internally (#3141)
* always false
* Fix broken test
* Add a test sendfile() test with large file
---------
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: Ciro Spaciari <ciro.spaciari@gmail.com>
Co-authored-by: Colin McDonnell <colinmcd94@gmail.com>
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r-- | src/http_client_async.zig | 174 |
1 files changed, 152 insertions, 22 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 4e0926baa..fe5f34f48 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -71,6 +71,89 @@ pub const FetchRedirect = enum(u8) { }); }; +pub const HTTPRequestBody = union(enum) { + bytes: []const u8, + sendfile: Sendfile, + + pub fn len(this: *const HTTPRequestBody) usize { + return switch (this.*) { + .bytes => this.bytes.len, + .sendfile => this.sendfile.content_size, + }; + } +}; + +pub const Sendfile = struct { + fd: bun.FileDescriptor, + remain: usize = 0, + offset: usize = 0, + content_size: usize = 0, + + pub fn isEligible(url: bun.URL) bool { + return url.isHTTP() and url.href.len > 0 and FeatureFlags.streaming_file_uploads_for_http_client; + } + + pub fn write( + this: *Sendfile, + socket: NewHTTPContext(false).HTTPSocket, + ) Status { + const adjusted_count_temporary = @min(@as(u64, this.remain), @as(u63, std.math.maxInt(u63))); + // TODO we should not need this int cast; improve the return type of `@min` + const adjusted_count = @intCast(u63, adjusted_count_temporary); + + if (Environment.isLinux) { + var signed_offset = @intCast(i64, this.offset); + const begin = this.offset; + const val = + // this does the syscall directly, without libc + std.os.linux.sendfile(socket.fd(), this.fd, &signed_offset, this.remain); + this.offset = @intCast(u64, signed_offset); + + const errcode = std.os.linux.getErrno(val); + + this.remain -|= @intCast(u64, this.offset -| begin); + + if (errcode != .SUCCESS or this.remain == 0 or val == 0) { + if (errcode == .SUCCESS) { + return .{ .done = {} }; + } + + return .{ .err = AsyncIO.asError(errcode) }; + } + } else { + var sbytes: std.os.off_t = adjusted_count; + const signed_offset = @bitCast(i64, @as(u64, this.offset)); + const errcode = std.c.getErrno(std.c.sendfile( + this.fd, + socket.fd(), + + signed_offset, + &sbytes, + null, + 0, + )); + const wrote = @intCast(u64, sbytes); + this.offset +|= wrote; + this.remain -|= wrote; + if (errcode != .AGAIN or this.remain == 0 or sbytes == 0) { + if (errcode == .SUCCESS) { + return .{ .done = {} }; + } + + return .{ .err = AsyncIO.asError(errcode) }; + } + } + + return .{ .again = {} }; + } + + pub const Status = union(enum) { + done: void, + err: anyerror, + again: void, + }; +}; + const ProxySSLData = struct { buffer: std.ArrayList(u8), partial: bool, @@ -738,7 +821,7 @@ pub fn onClose( if (client.allow_retry) { client.allow_retry = false; - client.start(client.state.request_body, client.state.body_out_str.?); + client.start(client.state.original_request_body, client.state.body_out_str.?); return; } @@ -915,14 +998,16 @@ pub const InternalState = struct { compressed_body: MutableString = undefined, body_size: usize = 0, request_body: []const u8 = "", + original_request_body: HTTPRequestBody = .{ .bytes = "" }, request_sent_len: usize = 0, fail: anyerror = error.NoError, request_stage: HTTPStage = .pending, response_stage: HTTPStage = .pending, - pub fn init(body: []const u8, body_out_str: *MutableString) InternalState { + pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState { return .{ - .request_body = body, + .original_request_body = body, + .request_body = if (body == .bytes) body.bytes else "", .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, .body_out_str = body_out_str, @@ -942,6 +1027,7 @@ pub const InternalState = struct { .body_out_str = body_msg, .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, + .original_request_body = .{ .bytes = "" }, .request_body = "", }; } @@ -1191,7 +1277,7 @@ pub const AsyncHTTP = struct { request_headers: Headers.Entries = Headers.Entries{}, response_headers: Headers.Entries = Headers.Entries{}, response_buffer: *MutableString, - request_body: []const u8 = "", + request_body: HTTPRequestBody = .{ .bytes = "" }, allocator: std.mem.Allocator, request_header_buf: string = "", method: Method = Method.GET, @@ -1278,7 +1364,18 @@ pub const AsyncHTTP = struct { hostname: ?[]u8, redirect_type: FetchRedirect, ) AsyncHTTP { - var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy, .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0 }; + var this = AsyncHTTP{ + .allocator = allocator, + .url = url, + .method = method, + .request_headers = headers, + .request_header_buf = headers_buf, + .request_body = .{ .bytes = request_body }, + .response_buffer = response_buffer, + .completion_callback = callback, + .http_proxy = http_proxy, + .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0, + }; this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname); this.client.async_http_id = this.async_http_id; @@ -1648,7 +1745,7 @@ pub fn doRedirect(this: *HTTPClient) void { if (this.aborted != null) { _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); } - return this.start("", body_out_str); + return this.start(.{ .bytes = "" }, body_out_str); } pub fn isHTTPS(this: *HTTPClient) bool { if (this.http_proxy) |proxy| { @@ -1662,7 +1759,7 @@ pub fn isHTTPS(this: *HTTPClient) bool { } return false; } -pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void { +pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableString) void { body_out_str.reset(); std.debug.assert(this.state.response_message_buffer.list.capacity == 0); @@ -1730,7 +1827,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s this.setTimeout(socket, 60); - const request = this.buildRequest(this.state.request_body.len); + const request = this.buildRequest(this.state.original_request_body.len()); if (this.http_proxy) |_| { if (this.url.isHTTPS()) { @@ -1784,7 +1881,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s std.debug.assert(!socket.isShutdown()); std.debug.assert(!socket.isClosed()); } - const amount = socket.write(to_send, false); + const amount = socket.write( + to_send, + false, + ); if (comptime is_first_call) { if (amount == 0) { // don't worry about it @@ -1804,7 +1904,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..]; } - const has_sent_body = this.state.request_body.len == 0; + const has_sent_body = if (this.state.original_request_body == .bytes) + this.state.request_body.len == 0 + else + false; if (has_sent_headers and has_sent_body) { this.state.request_stage = .done; @@ -1813,7 +1916,11 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s if (has_sent_headers) { this.state.request_stage = .body; - std.debug.assert(this.state.request_body.len > 0); + std.debug.assert( + // we should have leftover data OR we use sendfile() + (this.state.original_request_body == .bytes and this.state.request_body.len > 0) or + this.state.original_request_body == .sendfile, + ); // we sent everything, but there's some body leftover if (amount == @intCast(c_int, to_send.len)) { @@ -1826,19 +1933,42 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s .body => { this.setTimeout(socket, 60); - const to_send = this.state.request_body; - const amount = socket.write(to_send, true); - if (amount < 0) { - this.closeAndFail(error.WriteFailed, is_ssl, socket); - return; - } + switch (this.state.original_request_body) { + .bytes => { + const to_send = this.state.request_body; + const amount = socket.write(to_send, true); + if (amount < 0) { + this.closeAndFail(error.WriteFailed, is_ssl, socket); + return; + } - this.state.request_sent_len += @intCast(usize, amount); - this.state.request_body = this.state.request_body[@intCast(usize, amount)..]; + this.state.request_sent_len += @intCast(usize, amount); + this.state.request_body = this.state.request_body[@intCast(usize, amount)..]; - if (this.state.request_body.len == 0) { - this.state.request_stage = .done; - return; + if (this.state.request_body.len == 0) { + this.state.request_stage = .done; + return; + } + }, + .sendfile => |*sendfile| { + if (comptime is_ssl) { + @panic("sendfile is only supported without SSL. This code should never have been reached!"); + } + + switch (sendfile.write(socket)) { + .done => { + this.state.request_stage = .done; + return; + }, + .err => |err| { + this.closeAndFail(err, false, socket); + return; + }, + .again => { + socket.markNeedsMoreForSendfile(); + }, + } + }, } }, .proxy_body => { |