diff options
author | 2023-05-31 17:20:30 -0700 | |
---|---|---|
committer | 2023-05-31 17:20:30 -0700 | |
commit | 4c0145437679f879329df69c9a56e395e74e8280 (patch) | |
tree | 77c317891d68344a0f04ac44ed90804793ec3a0e /src | |
parent | 611f1d0e241d264747dda7288c018ab383906ccc (diff) | |
download | bun-4c0145437679f879329df69c9a56e395e74e8280.tar.gz bun-4c0145437679f879329df69c9a56e395e74e8280.tar.zst bun-4c0145437679f879329df69c9a56e395e74e8280.zip |
Make uploading files with `fetch()` fast (#3125)
* Make file uploads fast
* Add benchmark
* Update README.md
* defaults
* print
* prettier
* smaller
* fix(path) fix parse behavior (#3134)
* Add macro docs (#3139)
* Add macro doc
* Updates
* Tweaks
* Update doc
* Update macro serialization doc
* Update macro doc
* `--no-macros` flag, disable macros in node_modules
* invert base/filename internally (#3141)
* always false
* Fix broken test
* Add a sendfile() test with a large file
---------
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: Ciro Spaciari <ciro.spaciari@gmail.com>
Co-authored-by: Colin McDonnell <colinmcd94@gmail.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/webcore/response.zig | 198 | ||||
-rw-r--r-- | src/deps/libuwsockets.cpp | 5 | ||||
-rw-r--r-- | src/deps/uws.zig | 19 | ||||
-rw-r--r-- | src/feature_flags.zig | 2 | ||||
-rw-r--r-- | src/http_client_async.zig | 174 |
5 files changed, 342 insertions, 56 deletions
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 1ba0c82cc..ad3857685 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -629,7 +629,7 @@ pub const Fetch = struct { result: HTTPClient.HTTPClientResult = .{}, javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, - request_body: AnyBlob = undefined, + request_body: HTTPRequestBody = undefined, response_buffer: MutableString = undefined, request_headers: Headers = Headers{ .allocator = undefined }, promise: JSC.JSPromise.Strong, @@ -647,6 +647,38 @@ pub const Fetch = struct { abort_reason: JSValue = JSValue.zero, // Custom Hostname hostname: ?[]u8 = null, + + pub const HTTPRequestBody = union(enum) { + AnyBlob: AnyBlob, + Sendfile: HTTPClient.Sendfile, + + pub fn store(this: *HTTPRequestBody) ?*JSC.WebCore.Blob.Store { + return switch (this.*) { + .AnyBlob => this.AnyBlob.store(), + else => null, + }; + } + + pub fn slice(this: *const HTTPRequestBody) []const u8 { + return switch (this.*) { + .AnyBlob => this.AnyBlob.slice(), + else => "", + }; + } + + pub fn detach(this: *HTTPRequestBody) void { + switch (this.*) { + .AnyBlob => this.AnyBlob.detach(), + .Sendfile => { + if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0) + _ = JSC.Node.Syscall.close(this.Sendfile.fd); + this.Sendfile.offset = 0; + this.Sendfile.remain = 0; + }, + } + } + }; + pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet { return FetchTasklet{}; } @@ -850,12 +882,26 @@ pub const Fetch = struct { proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url); } - fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), fetch_options.timeout, HTTPClient.HTTPClientResult.Callback.New( - *FetchTasklet, - FetchTasklet.callback, - ).init( - fetch_tasklet, - ), proxy, 
if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, fetch_options.hostname, fetch_options.redirect_type); + fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init( + allocator, + fetch_options.method, + fetch_options.url, + fetch_options.headers.entries, + fetch_options.headers.buf.items, + &fetch_tasklet.response_buffer, + fetch_tasklet.request_body.slice(), + fetch_options.timeout, + HTTPClient.HTTPClientResult.Callback.New( + *FetchTasklet, + FetchTasklet.callback, + ).init( + fetch_tasklet, + ), + proxy, + if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, + fetch_options.hostname, + fetch_options.redirect_type, + ); if (fetch_options.redirect_type != FetchRedirect.follow) { fetch_tasklet.http.?.client.remaining_redirect_count = 0; @@ -865,6 +911,12 @@ pub const Fetch = struct { fetch_tasklet.http.?.client.verbose = fetch_options.verbose; fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive; + if (fetch_tasklet.request_body == .Sendfile) { + std.debug.assert(fetch_options.url.isHTTP()); + std.debug.assert(fetch_options.proxy == null); + fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile }; + } + if (fetch_tasklet.signal) |signal| { fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener); } @@ -886,7 +938,7 @@ pub const Fetch = struct { const FetchOptions = struct { method: Method, headers: Headers, - body: AnyBlob, + body: HTTPRequestBody, timeout: usize, disable_timeout: bool, disable_keepalive: bool, @@ -1339,36 +1391,114 @@ pub const Fetch = struct { ) catch unreachable; } + var http_body = FetchTasklet.HTTPRequestBody{ + .AnyBlob = body, + }; + if (body.needsToReadFile()) { - // TODO: make this async + lazy - const res = JSC.Node.NodeFS.readFile( - globalThis.bunVM().nodeFS(), - .{ - .encoding = .buffer, - .path = body.Blob.store.?.data.file.pathlike, - .offset = body.Blob.offset, - .max_size = body.Blob.size, - }, - .sync, - ); 
+ prepare_body: { + const opened_fd_res: JSC.Node.Maybe(bun.FileDescriptor) = switch (body.Blob.store.?.data.file.pathlike) { + .fd => |fd| JSC.Node.Maybe(bun.FileDescriptor).errnoSysFd(JSC.Node.Syscall.system.dup(fd), .open, fd) orelse .{ .result = fd }, + .path => |path| JSC.Node.Syscall.open(path.sliceZ(&globalThis.bunVM().nodeFS().sync_error_buf), std.os.O.RDONLY | std.os.O.NOCTTY, 0), + }; + + const opened_fd = switch (opened_fd_res) { + .err => |err| { + bun.default_allocator.free(url_proxy_buffer); + + const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); + body.detach(); + if (headers) |*headers_| { + headers_.buf.deinit(bun.default_allocator); + headers_.entries.deinit(bun.default_allocator); + } - switch (res) { - .err => |err| { - bun.default_allocator.free(url_proxy_buffer); + return rejected_value; + }, + .result => |fd| fd, + }; - const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); - body.detach(); - if (headers) |*headers_| { - headers_.buf.deinit(bun.default_allocator); - headers_.entries.deinit(bun.default_allocator); + if (proxy == null and bun.HTTP.Sendfile.isEligible(url)) { + use_sendfile: { + const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(opened_fd)) { + .result => |result| result, + // bail out for any reason + .err => break :use_sendfile, + }; + + if (Environment.isMac) { + // macOS only supports regular files for sendfile() + if (!std.os.S.ISREG(stat.mode)) { + break :use_sendfile; + } + } + + // if it's < 32 KB, it's not worth it + if (stat.size < 32 * 1024) { + break :use_sendfile; + } + + const original_size = body.Blob.size; + const stat_size = @intCast(Blob.SizeType, stat.size); + const blob_size = if (std.os.S.ISREG(stat.mode)) + stat_size + else + @min(original_size, stat_size); + + http_body = .{ + .Sendfile = .{ + .fd = opened_fd, + .remain = body.Blob.offset + original_size, + .offset = body.Blob.offset, + .content_size = blob_size, + }, + }; + + 
if (std.os.S.ISREG(stat.mode)) { + http_body.Sendfile.offset = @min(http_body.Sendfile.offset, stat_size); + http_body.Sendfile.remain = @min(@max(http_body.Sendfile.remain, http_body.Sendfile.offset), stat_size) -| http_body.Sendfile.offset; + } + body.detach(); + + break :prepare_body; } + } - return rejected_value; - }, - .result => |result| { - body.detach(); - body.from(std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(result.slice()))); - }, + // TODO: make this async + lazy + const res = JSC.Node.NodeFS.readFile( + globalThis.bunVM().nodeFS(), + .{ + .encoding = .buffer, + .path = .{ .fd = opened_fd }, + .offset = body.Blob.offset, + .max_size = body.Blob.size, + }, + .sync, + ); + + if (body.Blob.store.?.data.file.pathlike == .path) { + _ = JSC.Node.Syscall.close(opened_fd); + } + + switch (res) { + .err => |err| { + bun.default_allocator.free(url_proxy_buffer); + + const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); + body.detach(); + if (headers) |*headers_| { + headers_.buf.deinit(bun.default_allocator); + headers_.entries.deinit(bun.default_allocator); + } + + return rejected_value; + }, + .result => |result| { + body.detach(); + body.from(std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(result.slice()))); + http_body = .{ .AnyBlob = body }; + }, + } } } @@ -1388,7 +1518,7 @@ pub const Fetch = struct { .headers = headers orelse Headers{ .allocator = bun.default_allocator, }, - .body = body, + .body = http_body, .timeout = std.time.ns_per_hour, .disable_keepalive = disable_keepalive, .disable_timeout = disable_timeout, diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index ae6443cba..aa4889892 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1572,4 +1572,9 @@ extern "C" return uwsRes->getNativeHandle(); } } + + void us_socket_sendfile_needs_more(us_socket_t *s) { + s->context->loop->data.last_write_failed = 1; + us_poll_change(&s->p, 
s->context->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE); + } } diff --git a/src/deps/uws.zig b/src/deps/uws.zig index c9f350a37..538756b71 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -43,6 +43,23 @@ pub fn NewSocketHandler(comptime ssl: bool) type { pub fn getNativeHandle(this: ThisSocket) *NativeSocketHandleType(ssl) { return @ptrCast(*NativeSocketHandleType(ssl), us_socket_get_native_handle(comptime ssl_int, this.socket).?); } + + pub fn fd(this: ThisSocket) i32 { + if (comptime ssl) { + @compileError("SSL sockets do not have a file descriptor accessible this way"); + } + + return @intCast(i32, @ptrToInt(us_socket_get_native_handle(0, this.socket))); + } + + pub fn markNeedsMoreForSendfile(this: ThisSocket) void { + if (comptime ssl) { + @compileError("SSL sockets do not support sendfile yet"); + } + + us_socket_sendfile_needs_more(this.socket); + } + pub fn ext(this: ThisSocket, comptime ContextType: type) ?*ContextType { const alignment = if (ContextType == *anyopaque) @sizeOf(usize) @@ -1882,3 +1899,5 @@ pub const State = enum(i32) { return @enumToInt(this) & @enumToInt(State.HTTP_CONNECTION_CLOSE) != 0; } }; + +extern fn us_socket_sendfile_needs_more(socket: *Socket) void; diff --git a/src/feature_flags.zig b/src/feature_flags.zig index cdfeacb10..0a0c920a4 100644 --- a/src/feature_flags.zig +++ b/src/feature_flags.zig @@ -170,3 +170,5 @@ pub const source_map_debug_id = true; pub const alignment_tweak = false; pub const export_star_redirect = false; + +pub const streaming_file_uploads_for_http_client = true; diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 4e0926baa..fe5f34f48 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -71,6 +71,89 @@ pub const FetchRedirect = enum(u8) { }); }; +pub const HTTPRequestBody = union(enum) { + bytes: []const u8, + sendfile: Sendfile, + + pub fn len(this: *const HTTPRequestBody) usize { + return switch (this.*) { + .bytes => this.bytes.len, + .sendfile => 
this.sendfile.content_size, + }; + } +}; + +pub const Sendfile = struct { + fd: bun.FileDescriptor, + remain: usize = 0, + offset: usize = 0, + content_size: usize = 0, + + pub fn isEligible(url: bun.URL) bool { + return url.isHTTP() and url.href.len > 0 and FeatureFlags.streaming_file_uploads_for_http_client; + } + + pub fn write( + this: *Sendfile, + socket: NewHTTPContext(false).HTTPSocket, + ) Status { + const adjusted_count_temporary = @min(@as(u64, this.remain), @as(u63, std.math.maxInt(u63))); + // TODO we should not need this int cast; improve the return type of `@min` + const adjusted_count = @intCast(u63, adjusted_count_temporary); + + if (Environment.isLinux) { + var signed_offset = @intCast(i64, this.offset); + const begin = this.offset; + const val = + // this does the syscall directly, without libc + std.os.linux.sendfile(socket.fd(), this.fd, &signed_offset, this.remain); + this.offset = @intCast(u64, signed_offset); + + const errcode = std.os.linux.getErrno(val); + + this.remain -|= @intCast(u64, this.offset -| begin); + + if (errcode != .SUCCESS or this.remain == 0 or val == 0) { + if (errcode == .SUCCESS) { + return .{ .done = {} }; + } + + return .{ .err = AsyncIO.asError(errcode) }; + } + } else { + var sbytes: std.os.off_t = adjusted_count; + const signed_offset = @bitCast(i64, @as(u64, this.offset)); + const errcode = std.c.getErrno(std.c.sendfile( + this.fd, + socket.fd(), + + signed_offset, + &sbytes, + null, + 0, + )); + const wrote = @intCast(u64, sbytes); + this.offset +|= wrote; + this.remain -|= wrote; + if (errcode != .AGAIN or this.remain == 0 or sbytes == 0) { + if (errcode == .SUCCESS) { + return .{ .done = {} }; + } + + return .{ .err = AsyncIO.asError(errcode) }; + } + } + + return .{ .again = {} }; + } + + pub const Status = union(enum) { + done: void, + err: anyerror, + again: void, + }; +}; + const ProxySSLData = struct { buffer: std.ArrayList(u8), partial: bool, @@ -738,7 +821,7 @@ pub fn onClose( if (client.allow_retry) { 
client.allow_retry = false; - client.start(client.state.request_body, client.state.body_out_str.?); + client.start(client.state.original_request_body, client.state.body_out_str.?); return; } @@ -915,14 +998,16 @@ pub const InternalState = struct { compressed_body: MutableString = undefined, body_size: usize = 0, request_body: []const u8 = "", + original_request_body: HTTPRequestBody = .{ .bytes = "" }, request_sent_len: usize = 0, fail: anyerror = error.NoError, request_stage: HTTPStage = .pending, response_stage: HTTPStage = .pending, - pub fn init(body: []const u8, body_out_str: *MutableString) InternalState { + pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState { return .{ - .request_body = body, + .original_request_body = body, + .request_body = if (body == .bytes) body.bytes else "", .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, .body_out_str = body_out_str, @@ -942,6 +1027,7 @@ pub const InternalState = struct { .body_out_str = body_msg, .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, + .original_request_body = .{ .bytes = "" }, .request_body = "", }; } @@ -1191,7 +1277,7 @@ pub const AsyncHTTP = struct { request_headers: Headers.Entries = Headers.Entries{}, response_headers: Headers.Entries = Headers.Entries{}, response_buffer: *MutableString, - request_body: []const u8 = "", + request_body: HTTPRequestBody = .{ .bytes = "" }, allocator: std.mem.Allocator, request_header_buf: string = "", method: Method = Method.GET, @@ -1278,7 +1364,18 @@ pub const AsyncHTTP = struct { hostname: ?[]u8, redirect_type: FetchRedirect, ) AsyncHTTP { - var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = 
request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy, .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0 }; + var this = AsyncHTTP{ + .allocator = allocator, + .url = url, + .method = method, + .request_headers = headers, + .request_header_buf = headers_buf, + .request_body = .{ .bytes = request_body }, + .response_buffer = response_buffer, + .completion_callback = callback, + .http_proxy = http_proxy, + .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0, + }; this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname); this.client.async_http_id = this.async_http_id; @@ -1648,7 +1745,7 @@ pub fn doRedirect(this: *HTTPClient) void { if (this.aborted != null) { _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); } - return this.start("", body_out_str); + return this.start(.{ .bytes = "" }, body_out_str); } pub fn isHTTPS(this: *HTTPClient) bool { if (this.http_proxy) |proxy| { @@ -1662,7 +1759,7 @@ pub fn isHTTPS(this: *HTTPClient) bool { } return false; } -pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void { +pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableString) void { body_out_str.reset(); std.debug.assert(this.state.response_message_buffer.list.capacity == 0); @@ -1730,7 +1827,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s this.setTimeout(socket, 60); - const request = this.buildRequest(this.state.request_body.len); + const request = this.buildRequest(this.state.original_request_body.len()); if (this.http_proxy) |_| { if (this.url.isHTTPS()) { @@ -1784,7 +1881,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s std.debug.assert(!socket.isShutdown()); std.debug.assert(!socket.isClosed()); } - const amount = socket.write(to_send, false); + const amount = 
socket.write( + to_send, + false, + ); if (comptime is_first_call) { if (amount == 0) { // don't worry about it @@ -1804,7 +1904,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..]; } - const has_sent_body = this.state.request_body.len == 0; + const has_sent_body = if (this.state.original_request_body == .bytes) + this.state.request_body.len == 0 + else + false; if (has_sent_headers and has_sent_body) { this.state.request_stage = .done; @@ -1813,7 +1916,11 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s if (has_sent_headers) { this.state.request_stage = .body; - std.debug.assert(this.state.request_body.len > 0); + std.debug.assert( + // we should have leftover data OR we use sendfile() + (this.state.original_request_body == .bytes and this.state.request_body.len > 0) or + this.state.original_request_body == .sendfile, + ); // we sent everything, but there's some body leftover if (amount == @intCast(c_int, to_send.len)) { @@ -1826,19 +1933,42 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s .body => { this.setTimeout(socket, 60); - const to_send = this.state.request_body; - const amount = socket.write(to_send, true); - if (amount < 0) { - this.closeAndFail(error.WriteFailed, is_ssl, socket); - return; - } + switch (this.state.original_request_body) { + .bytes => { + const to_send = this.state.request_body; + const amount = socket.write(to_send, true); + if (amount < 0) { + this.closeAndFail(error.WriteFailed, is_ssl, socket); + return; + } - this.state.request_sent_len += @intCast(usize, amount); - this.state.request_body = this.state.request_body[@intCast(usize, amount)..]; + this.state.request_sent_len += @intCast(usize, amount); + this.state.request_body = this.state.request_body[@intCast(usize, amount)..]; - if (this.state.request_body.len == 0) { - 
this.state.request_stage = .done; - return; + if (this.state.request_body.len == 0) { + this.state.request_stage = .done; + return; + } + }, + .sendfile => |*sendfile| { + if (comptime is_ssl) { + @panic("sendfile is only supported without SSL. This code should never have been reached!"); + } + + switch (sendfile.write(socket)) { + .done => { + this.state.request_stage = .done; + return; + }, + .err => |err| { + this.closeAndFail(err, false, socket); + return; + }, + .again => { + socket.markNeedsMoreForSendfile(); + }, + } + }, } }, .proxy_body => { |