diff options
-rw-r--r-- | bench/stream-file-upload-client/.gitignore | 1 | ||||
-rw-r--r-- | bench/stream-file-upload-client/README.md | 35 | ||||
-rw-r--r-- | bench/stream-file-upload-client/generate-file.js | 8 | ||||
-rw-r--r-- | bench/stream-file-upload-client/server-node.mjs | 15 | ||||
-rw-r--r-- | bench/stream-file-upload-client/stream-file-bun.js | 9 | ||||
-rw-r--r-- | bench/stream-file-upload-client/stream-file-deno.js | 12 | ||||
-rw-r--r-- | bench/stream-file-upload-client/stream-file-node.mjs | 19 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 198 | ||||
-rw-r--r-- | src/deps/libuwsockets.cpp | 5 | ||||
-rw-r--r-- | src/deps/uws.zig | 19 | ||||
-rw-r--r-- | src/feature_flags.zig | 2 | ||||
-rw-r--r-- | src/http_client_async.zig | 174 | ||||
-rw-r--r-- | test/js/bun/http/fetch-file-upload.test.ts | 31 | ||||
-rw-r--r-- | test/regression/issue/02499.test.ts | 7 |
14 files changed, 475 insertions, 60 deletions
diff --git a/bench/stream-file-upload-client/.gitignore b/bench/stream-file-upload-client/.gitignore new file mode 100644 index 000000000..f0ad0dec6 --- /dev/null +++ b/bench/stream-file-upload-client/.gitignore @@ -0,0 +1 @@ +hello.txt diff --git a/bench/stream-file-upload-client/README.md b/bench/stream-file-upload-client/README.md new file mode 100644 index 000000000..0035cfcf5 --- /dev/null +++ b/bench/stream-file-upload-client/README.md @@ -0,0 +1,35 @@ +# HTTP request file upload benchmark + +This is a simple benchmark of uploading a file to a web server in different runtimes. + +## Usage + +Generate a file to upload (default is `hello.txt`): + +```bash +bun generate-file.js +``` + +Run the server: + +```bash +node server-node.mjs +``` + +Run the benchmark in bun: + +```bash +bun stream-file-bun.js +``` + +Run the benchmark in node: + +```bash +node stream-file-node.mjs +``` + +Run the benchmark in deno: + +```bash +deno run -A stream-file-deno.js +``` diff --git a/bench/stream-file-upload-client/generate-file.js b/bench/stream-file-upload-client/generate-file.js new file mode 100644 index 000000000..b3b2080a1 --- /dev/null +++ b/bench/stream-file-upload-client/generate-file.js @@ -0,0 +1,8 @@ +var hey = + "abcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghi".repeat( + 1024, + ); + +hey += hey.repeat(2); +require("fs").writeFileSync("hello.txt", Buffer.from(hey)); +console.log("Wrote", hey.length, "bytes", "to", "hello.txt"); diff --git a/bench/stream-file-upload-client/server-node.mjs b/bench/stream-file-upload-client/server-node.mjs new file mode 100644 index 000000000..10a7b19ed --- /dev/null +++ b/bench/stream-file-upload-client/server-node.mjs @@ -0,0 +1,15 @@ +import { createServer } from "node:http"; +const server = createServer((req, res) => { + var chunkSize = 0; + req.on("data", chunk => { + chunkSize += chunk.byteLength; + }); + + req.on("end", () => { + console.log("Received", chunkSize, "bytes"); + res.end(`${chunkSize}`); + }); +}); +server.listen(parseInt(process.env.PORT ?? "3000"), (err, port) => { + console.log(`http://localhost:${server.address().port}`); +}); diff --git a/bench/stream-file-upload-client/stream-file-bun.js b/bench/stream-file-upload-client/stream-file-bun.js new file mode 100644 index 000000000..e3499bd29 --- /dev/null +++ b/bench/stream-file-upload-client/stream-file-bun.js @@ -0,0 +1,9 @@ +import { file } from "bun"; +console.time("stream-file-bun"); +const response = await fetch(process.env.URL ?? "http://localhost:3000", { + method: "POST", + body: file(process.env.FILE ?? "hello.txt"), +}); +console.timeEnd("stream-file-bun"); + +console.log("Sent", await response.text(), "bytes"); diff --git a/bench/stream-file-upload-client/stream-file-deno.js b/bench/stream-file-upload-client/stream-file-deno.js new file mode 100644 index 000000000..a87d56252 --- /dev/null +++ b/bench/stream-file-upload-client/stream-file-deno.js @@ -0,0 +1,12 @@ +const file = await Deno.open(Deno.env.get("FILE") ?? "hello.txt", { + read: true, +}); + +console.time("stream-file-deno"); +const response = await fetch(Deno.env.get("URL") ?? "http://localhost:3000", { + method: "POST", + body: file.readable, +}); +console.timeEnd("stream-file-deno"); + +console.log("Sent", await response.text(), "bytes"); diff --git a/bench/stream-file-upload-client/stream-file-node.mjs b/bench/stream-file-upload-client/stream-file-node.mjs new file mode 100644 index 000000000..9a0957285 --- /dev/null +++ b/bench/stream-file-upload-client/stream-file-node.mjs @@ -0,0 +1,19 @@ +import { createReadStream } from "node:fs"; +import http from "node:http"; + +console.time("stream-file-node"); +createReadStream(process.env.FILE ?? "hello.txt") + .pipe( + http + .request(process.env.URL ?? "http://localhost:3000", { + method: "POST", + }) + .on("response", response => { + response.on("data", data => { + console.log("Sent", parseInt(data.toString(), 10), "bytes"); + }); + }), + ) + .on("close", () => { + console.timeEnd("stream-file-node"); + }); diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 1ba0c82cc..ad3857685 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -629,7 +629,7 @@ pub const Fetch = struct { result: HTTPClient.HTTPClientResult = .{}, javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, - request_body: AnyBlob = undefined, + request_body: HTTPRequestBody = undefined, response_buffer: MutableString = undefined, request_headers: Headers = Headers{ .allocator = undefined }, promise: JSC.JSPromise.Strong, @@ -647,6 +647,38 @@ pub const Fetch = struct { abort_reason: JSValue = JSValue.zero, // Custom Hostname hostname: ?[]u8 = null, + + pub const HTTPRequestBody = union(enum) { + AnyBlob: AnyBlob, + Sendfile: HTTPClient.Sendfile, + + pub fn store(this: *HTTPRequestBody) ?*JSC.WebCore.Blob.Store { + return switch (this.*) { + .AnyBlob => this.AnyBlob.store(), + else => null, + }; + } + + pub fn slice(this: *const HTTPRequestBody) []const u8 { + return switch (this.*) { + .AnyBlob => this.AnyBlob.slice(), + else => "", + }; + } + + pub fn detach(this: *HTTPRequestBody) void { + switch (this.*) { + .AnyBlob => this.AnyBlob.detach(), + .Sendfile => { + if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0) + _ = JSC.Node.Syscall.close(this.Sendfile.fd); + this.Sendfile.offset = 0; + this.Sendfile.remain = 0; + }, + } + } + }; + pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet { return FetchTasklet{}; } @@ -850,12 +882,26 @@ pub const Fetch = struct { proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url); } - fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), fetch_options.timeout, HTTPClient.HTTPClientResult.Callback.New( - *FetchTasklet, - FetchTasklet.callback, - ).init( - fetch_tasklet, - ), proxy, if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, fetch_options.hostname, fetch_options.redirect_type); + fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init( + allocator, + fetch_options.method, + fetch_options.url, + fetch_options.headers.entries, + fetch_options.headers.buf.items, + &fetch_tasklet.response_buffer, + fetch_tasklet.request_body.slice(), + fetch_options.timeout, + HTTPClient.HTTPClientResult.Callback.New( + *FetchTasklet, + FetchTasklet.callback, + ).init( + fetch_tasklet, + ), + proxy, + if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, + fetch_options.hostname, + fetch_options.redirect_type, + ); if (fetch_options.redirect_type != FetchRedirect.follow) { fetch_tasklet.http.?.client.remaining_redirect_count = 0; @@ -865,6 +911,12 @@ pub const Fetch = struct { fetch_tasklet.http.?.client.verbose = fetch_options.verbose; fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive; + if (fetch_tasklet.request_body == .Sendfile) { + std.debug.assert(fetch_options.url.isHTTP()); + std.debug.assert(fetch_options.proxy == null); + fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile }; + } + if (fetch_tasklet.signal) |signal| { fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener); } @@ -886,7 +938,7 @@ pub const Fetch = struct { const FetchOptions = struct { method: Method, headers: Headers, - body: AnyBlob, + body: HTTPRequestBody, timeout: usize, disable_timeout: bool, disable_keepalive: bool, @@ -1339,36 +1391,114 @@ pub const Fetch = struct { ) catch unreachable; } + var http_body = FetchTasklet.HTTPRequestBody{ + .AnyBlob = body, + }; + if (body.needsToReadFile()) { - // TODO: make this async + lazy - const res = JSC.Node.NodeFS.readFile( - globalThis.bunVM().nodeFS(), - .{ - .encoding = .buffer, - .path = body.Blob.store.?.data.file.pathlike, - .offset = body.Blob.offset, - .max_size = body.Blob.size, - }, - .sync, - ); + prepare_body: { + const opened_fd_res: JSC.Node.Maybe(bun.FileDescriptor) = switch (body.Blob.store.?.data.file.pathlike) { + .fd => |fd| JSC.Node.Maybe(bun.FileDescriptor).errnoSysFd(JSC.Node.Syscall.system.dup(fd), .open, fd) orelse .{ .result = fd }, + .path => |path| JSC.Node.Syscall.open(path.sliceZ(&globalThis.bunVM().nodeFS().sync_error_buf), std.os.O.RDONLY | std.os.O.NOCTTY, 0), + }; + + const opened_fd = switch (opened_fd_res) { + .err => |err| { + bun.default_allocator.free(url_proxy_buffer); + + const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); + body.detach(); + if (headers) |*headers_| { + headers_.buf.deinit(bun.default_allocator); + headers_.entries.deinit(bun.default_allocator); + } - switch (res) { - .err => |err| { - bun.default_allocator.free(url_proxy_buffer); + return rejected_value; + }, + .result => |fd| fd, + }; - const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); - body.detach(); - if (headers) |*headers_| { - headers_.buf.deinit(bun.default_allocator); - headers_.entries.deinit(bun.default_allocator); + if (proxy == null and bun.HTTP.Sendfile.isEligible(url)) { + use_sendfile: { + const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(opened_fd)) { + .result => |result| result, + // bail out for any reason + .err => break :use_sendfile, + }; + + if (Environment.isMac) { + // macOS only supports regular files for sendfile() + if (!std.os.S.ISREG(stat.mode)) { + break :use_sendfile; + } + } + + // if it's < 32 KB, it's not worth it + if (stat.size < 32 * 1024) { + break :use_sendfile; + } + + const original_size = body.Blob.size; + const stat_size = @intCast(Blob.SizeType, stat.size); + const blob_size = if (std.os.S.ISREG(stat.mode)) + stat_size + else + @min(original_size, stat_size); + + http_body = .{ + .Sendfile = .{ + .fd = opened_fd, + .remain = body.Blob.offset + original_size, + .offset = body.Blob.offset, + .content_size = blob_size, + }, + }; + + if (std.os.S.ISREG(stat.mode)) { + http_body.Sendfile.offset = @min(http_body.Sendfile.offset, stat_size); + http_body.Sendfile.remain = @min(@max(http_body.Sendfile.remain, http_body.Sendfile.offset), stat_size) -| http_body.Sendfile.offset; + } + body.detach(); + + break :prepare_body; } + } - return rejected_value; - }, - .result => |result| { - body.detach(); - body.from(std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(result.slice()))); - }, + // TODO: make this async + lazy + const res = JSC.Node.NodeFS.readFile( + globalThis.bunVM().nodeFS(), + .{ + .encoding = .buffer, + .path = .{ .fd = opened_fd }, + .offset = body.Blob.offset, + .max_size = body.Blob.size, + }, + .sync, + ); + + if (body.Blob.store.?.data.file.pathlike == .path) { + _ = JSC.Node.Syscall.close(opened_fd); + } + + switch (res) { + .err => |err| { + bun.default_allocator.free(url_proxy_buffer); + + const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); + body.detach(); + if (headers) |*headers_| { + headers_.buf.deinit(bun.default_allocator); + headers_.entries.deinit(bun.default_allocator); + } + + return rejected_value; + }, + .result => |result| { + body.detach(); + body.from(std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(result.slice()))); + http_body = .{ .AnyBlob = body }; + }, + } } } @@ -1388,7 +1518,7 @@ pub const Fetch = struct { .headers = headers orelse Headers{ .allocator = bun.default_allocator, }, - .body = body, + .body = http_body, .timeout = std.time.ns_per_hour, .disable_keepalive = disable_keepalive, .disable_timeout = disable_timeout, diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index ae6443cba..aa4889892 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1572,4 +1572,9 @@ extern "C" return uwsRes->getNativeHandle(); } } + + void us_socket_sendfile_needs_more(us_socket_t *s) { + s->context->loop->data.last_write_failed = 1; + us_poll_change(&s->p, s->context->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE); + } } diff --git a/src/deps/uws.zig b/src/deps/uws.zig index c9f350a37..538756b71 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -43,6 +43,23 @@ pub fn NewSocketHandler(comptime ssl: bool) type { pub fn getNativeHandle(this: ThisSocket) *NativeSocketHandleType(ssl) { return @ptrCast(*NativeSocketHandleType(ssl), us_socket_get_native_handle(comptime ssl_int, this.socket).?); } + + pub fn fd(this: ThisSocket) i32 { + if (comptime ssl) { + @compileError("SSL sockets do not have a file descriptor accessible this way"); + } + + return @intCast(i32, @ptrToInt(us_socket_get_native_handle(0, this.socket))); + } + + pub fn markNeedsMoreForSendfile(this: ThisSocket) void { + if (comptime ssl) { + @compileError("SSL sockets do not support sendfile yet"); + } + + us_socket_sendfile_needs_more(this.socket); + } + pub fn ext(this: ThisSocket, comptime ContextType: type) ?*ContextType { const alignment = if (ContextType == *anyopaque) @sizeOf(usize) @@ -1882,3 +1899,5 @@ pub const State = enum(i32) { return @enumToInt(this) & @enumToInt(State.HTTP_CONNECTION_CLOSE) != 0; } }; + +extern fn us_socket_sendfile_needs_more(socket: *Socket) void; diff --git a/src/feature_flags.zig b/src/feature_flags.zig index cdfeacb10..0a0c920a4 100644 --- a/src/feature_flags.zig +++ b/src/feature_flags.zig @@ -170,3 +170,5 @@ pub const source_map_debug_id = true; pub const alignment_tweak = false; pub const export_star_redirect = false; + +pub const streaming_file_uploads_for_http_client = true; diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 4e0926baa..fe5f34f48 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -71,6 +71,89 @@ pub const FetchRedirect = enum(u8) { }); }; +pub const HTTPRequestBody = union(enum) { + bytes: []const u8, + sendfile: Sendfile, + + pub fn len(this: *const HTTPRequestBody) usize { + return switch (this.*) { + .bytes => this.bytes.len, + .sendfile => this.sendfile.content_size, + }; + } +}; + +pub const Sendfile = struct { + fd: bun.FileDescriptor, + remain: usize = 0, + offset: usize = 0, + content_size: usize = 0, + + pub fn isEligible(url: bun.URL) bool { + return url.isHTTP() and url.href.len > 0 and FeatureFlags.streaming_file_uploads_for_http_client; + } + + pub fn write( + this: *Sendfile, + socket: NewHTTPContext(false).HTTPSocket, + ) Status { + const adjusted_count_temporary = @min(@as(u64, this.remain), @as(u63, std.math.maxInt(u63))); + // TODO we should not need this int cast; improve the return type of `@min` + const adjusted_count = @intCast(u63, adjusted_count_temporary); + + if (Environment.isLinux) { + var signed_offset = @intCast(i64, this.offset); + const begin = this.offset; + const val = + // this does the syscall directly, without libc + std.os.linux.sendfile(socket.fd(), this.fd, &signed_offset, this.remain); + this.offset = @intCast(u64, signed_offset); + + const errcode = std.os.linux.getErrno(val); + + this.remain -|= @intCast(u64, this.offset -| begin); + + if (errcode != .SUCCESS or this.remain == 0 or val == 0) { + if (errcode == .SUCCESS) { + return .{ .done = {} }; + } + + return .{ .err = AsyncIO.asError(errcode) }; + } + } else { + var sbytes: std.os.off_t = adjusted_count; + const signed_offset = @bitCast(i64, @as(u64, this.offset)); + const errcode = std.c.getErrno(std.c.sendfile( + this.fd, + socket.fd(), + + signed_offset, + &sbytes, + null, + 0, + )); + const wrote = @intCast(u64, sbytes); + this.offset +|= wrote; + this.remain -|= wrote; + if (errcode != .AGAIN or this.remain == 0 or sbytes == 0) { + if (errcode == .SUCCESS) { + return .{ .done = {} }; + } + + return .{ .err = AsyncIO.asError(errcode) }; + } + } + + return .{ .again = {} }; + } + + pub const Status = union(enum) { + done: void, + err: anyerror, + again: void, + }; +}; + const ProxySSLData = struct { buffer: std.ArrayList(u8), partial: bool, @@ -738,7 +821,7 @@ pub fn onClose( if (client.allow_retry) { client.allow_retry = false; - client.start(client.state.request_body, client.state.body_out_str.?); + client.start(client.state.original_request_body, client.state.body_out_str.?); return; } @@ -915,14 +998,16 @@ pub const InternalState = struct { compressed_body: MutableString = undefined, body_size: usize = 0, request_body: []const u8 = "", + original_request_body: HTTPRequestBody = .{ .bytes = "" }, request_sent_len: usize = 0, fail: anyerror = error.NoError, request_stage: HTTPStage = .pending, response_stage: HTTPStage = .pending, - pub fn init(body: []const u8, body_out_str: *MutableString) InternalState { + pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState { return .{ - .request_body = body, + .original_request_body = body, + .request_body = if (body == .bytes) body.bytes else "", .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, .body_out_str = body_out_str, @@ -942,6 +1027,7 @@ pub const InternalState = struct { .body_out_str = body_msg, .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, + .original_request_body = .{ .bytes = "" }, .request_body = "", }; } @@ -1191,7 +1277,7 @@ pub const AsyncHTTP = struct { request_headers: Headers.Entries = Headers.Entries{}, response_headers: Headers.Entries = Headers.Entries{}, response_buffer: *MutableString, - request_body: []const u8 = "", + request_body: HTTPRequestBody = .{ .bytes = "" }, allocator: std.mem.Allocator, request_header_buf: string = "", method: Method = Method.GET, @@ -1278,7 +1364,18 @@ pub const AsyncHTTP = struct { hostname: ?[]u8, redirect_type: FetchRedirect, ) AsyncHTTP { - var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy, .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0 }; + var this = AsyncHTTP{ + .allocator = allocator, + .url = url, + .method = method, + .request_headers = headers, + .request_header_buf = headers_buf, + .request_body = .{ .bytes = request_body }, + .response_buffer = response_buffer, + .completion_callback = callback, + .http_proxy = http_proxy, + .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0, + }; this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname); this.client.async_http_id = this.async_http_id; @@ -1648,7 +1745,7 @@ pub fn doRedirect(this: *HTTPClient) void { if (this.aborted != null) { _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); } - return this.start("", body_out_str); + return this.start(.{ .bytes = "" }, body_out_str); } pub fn isHTTPS(this: *HTTPClient) bool { if (this.http_proxy) |proxy| { @@ -1662,7 +1759,7 @@ pub fn isHTTPS(this: *HTTPClient) bool { } return false; } -pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void { +pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableString) void { body_out_str.reset(); std.debug.assert(this.state.response_message_buffer.list.capacity == 0); @@ -1730,7 +1827,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s this.setTimeout(socket, 60); - const request = this.buildRequest(this.state.request_body.len); + const request = this.buildRequest(this.state.original_request_body.len()); if (this.http_proxy) |_| { if (this.url.isHTTPS()) { @@ -1784,7 +1881,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s std.debug.assert(!socket.isShutdown()); std.debug.assert(!socket.isClosed()); } - const amount = socket.write(to_send, false); + const amount = socket.write( + to_send, + false, + ); if (comptime is_first_call) { if (amount == 0) { // don't worry about it @@ -1804,7 +1904,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..]; } - const has_sent_body = this.state.request_body.len == 0; + const has_sent_body = if (this.state.original_request_body == .bytes) + this.state.request_body.len == 0 + else + false; if (has_sent_headers and has_sent_body) { this.state.request_stage = .done; @@ -1813,7 +1916,11 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s if (has_sent_headers) { this.state.request_stage = .body; - std.debug.assert(this.state.request_body.len > 0); + std.debug.assert( + // we should have leftover data OR we use sendfile() + (this.state.original_request_body == .bytes and this.state.request_body.len > 0) or + this.state.original_request_body == .sendfile, + ); // we sent everything, but there's some body leftover if (amount == @intCast(c_int, to_send.len)) { @@ -1826,19 +1933,42 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s .body => { this.setTimeout(socket, 60); - const to_send = this.state.request_body; - const amount = socket.write(to_send, true); - if (amount < 0) { - this.closeAndFail(error.WriteFailed, is_ssl, socket); - return; - } + switch (this.state.original_request_body) { + .bytes => { + const to_send = this.state.request_body; + const amount = socket.write(to_send, true); + if (amount < 0) { + this.closeAndFail(error.WriteFailed, is_ssl, socket); + return; + } - this.state.request_sent_len += @intCast(usize, amount); - this.state.request_body = this.state.request_body[@intCast(usize, amount)..]; + this.state.request_sent_len += @intCast(usize, amount); + this.state.request_body = this.state.request_body[@intCast(usize, amount)..]; - if (this.state.request_body.len == 0) { - this.state.request_stage = .done; - return; + if (this.state.request_body.len == 0) { + this.state.request_stage = .done; + return; + } + }, + .sendfile => |*sendfile| { + if (comptime is_ssl) { + @panic("sendfile is only supported without SSL. This code should never have been reached!"); + } + + switch (sendfile.write(socket)) { + .done => { + this.state.request_stage = .done; + return; + }, + .err => |err| { + this.closeAndFail(err, false, socket); + return; + }, + .again => { + socket.markNeedsMoreForSendfile(); + }, + } + }, } }, .proxy_body => { diff --git a/test/js/bun/http/fetch-file-upload.test.ts b/test/js/bun/http/fetch-file-upload.test.ts index 33f0d0581..b070fbd6e 100644 --- a/test/js/bun/http/fetch-file-upload.test.ts +++ b/test/js/bun/http/fetch-file-upload.test.ts @@ -1,5 +1,7 @@ import { expect, test, describe } from "bun:test"; import { withoutAggressiveGC } from "harness"; +import { tmpdir } from "os"; +import { join } from "path"; test("uploads roundtrip", async () => { const body = Bun.file(import.meta.dir + "/fetch.js.txt"); @@ -32,6 +34,35 @@ test("uploads roundtrip", async () => { server.stop(true); }); +test("uploads roundtrip with sendfile()", async () => { + var hugeTxt = "huge".repeat(1024 * 1024 * 32); + const path = join(tmpdir(), "huge.txt"); + require("fs").writeFileSync(path, hugeTxt); + + const server = Bun.serve({ + maxRequestBodySize: 1024 * 1024 * 1024 * 8, + async fetch(req) { + var count = 0; + for await (let chunk of req.body!) { + count += chunk.byteLength; + } + return new Response(count + ""); + }, + }); + + const resp = await fetch("http://" + server.hostname + ":" + server.port, { + body: Bun.file(path), + method: "PUT", + }); + + expect(resp.status).toBe(200); + + const body = parseInt(await resp.text()); + expect(body).toBe(hugeTxt.length); + + server.stop(true); +}); + test("missing file throws the expected error", async () => { Bun.gc(true); // Run this 1000 times to check for GC bugs diff --git a/test/regression/issue/02499.test.ts b/test/regression/issue/02499.test.ts index 0e4666b36..f1ee1da80 100644 --- a/test/regression/issue/02499.test.ts +++ b/test/regression/issue/02499.test.ts @@ -12,8 +12,7 @@ it("onAborted() and onWritable are not called after receiving an empty response testDone(new Error("Test timed out, which means it failed")); }; - const body = new FormData(); - body.append("hey", "hi"); + const invalidJSON = Buffer.from("invalid json"); // We want to test that the server isn't keeping the connection open in a // zombie-like state when an error occurs due to an unhandled rejected promise @@ -69,7 +68,7 @@ it("onAborted() and onWritable are not called after receiving an empty response try { await fetch(`http://${hostname}:${port}/upload`, { - body, + body: invalidJSON, keepalive: false, method: "POST", timeout: true, @@ -91,4 +90,4 @@ it("onAborted() and onWritable are not called after receiving an empty response } timeout.onabort = () => {}; testDone(); -}); +}, 30_000); |