aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bench/stream-file-upload-client/.gitignore1
-rw-r--r--bench/stream-file-upload-client/README.md35
-rw-r--r--bench/stream-file-upload-client/generate-file.js8
-rw-r--r--bench/stream-file-upload-client/server-node.mjs15
-rw-r--r--bench/stream-file-upload-client/stream-file-bun.js9
-rw-r--r--bench/stream-file-upload-client/stream-file-deno.js12
-rw-r--r--bench/stream-file-upload-client/stream-file-node.mjs19
-rw-r--r--src/bun.js/webcore/response.zig198
-rw-r--r--src/deps/libuwsockets.cpp5
-rw-r--r--src/deps/uws.zig19
-rw-r--r--src/feature_flags.zig2
-rw-r--r--src/http_client_async.zig174
-rw-r--r--test/js/bun/http/fetch-file-upload.test.ts31
-rw-r--r--test/regression/issue/02499.test.ts7
14 files changed, 475 insertions, 60 deletions
diff --git a/bench/stream-file-upload-client/.gitignore b/bench/stream-file-upload-client/.gitignore
new file mode 100644
index 000000000..f0ad0dec6
--- /dev/null
+++ b/bench/stream-file-upload-client/.gitignore
@@ -0,0 +1 @@
+hello.txt
diff --git a/bench/stream-file-upload-client/README.md b/bench/stream-file-upload-client/README.md
new file mode 100644
index 000000000..0035cfcf5
--- /dev/null
+++ b/bench/stream-file-upload-client/README.md
@@ -0,0 +1,35 @@
+# HTTP request file upload benchmark
+
+This is a simple benchmark of uploading a file to a web server in different runtimes.
+
+## Usage
+
+Generate a file to upload (default is `hello.txt`):
+
+```bash
+bun generate-file.js
+```
+
+Run the server:
+
+```bash
+node server-node.mjs
+```
+
+Run the benchmark in bun:
+
+```bash
+bun stream-file-bun.js
+```
+
+Run the benchmark in node:
+
+```bash
+node stream-file-node.mjs
+```
+
+Run the benchmark in deno:
+
+```bash
+deno run -A stream-file-deno.js
+```
diff --git a/bench/stream-file-upload-client/generate-file.js b/bench/stream-file-upload-client/generate-file.js
new file mode 100644
index 000000000..b3b2080a1
--- /dev/null
+++ b/bench/stream-file-upload-client/generate-file.js
@@ -0,0 +1,8 @@
+var hey =
+ "abcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghiabcdefghi".repeat(
+ 1024,
+ );
+
+hey += hey.repeat(2);
+require("fs").writeFileSync("hello.txt", Buffer.from(hey));
+console.log("Wrote", hey.length, "bytes", "to", "hello.txt");
diff --git a/bench/stream-file-upload-client/server-node.mjs b/bench/stream-file-upload-client/server-node.mjs
new file mode 100644
index 000000000..10a7b19ed
--- /dev/null
+++ b/bench/stream-file-upload-client/server-node.mjs
@@ -0,0 +1,15 @@
+import { createServer } from "node:http";
+const server = createServer((req, res) => {
+ var chunkSize = 0;
+ req.on("data", chunk => {
+ chunkSize += chunk.byteLength;
+ });
+
+ req.on("end", () => {
+ console.log("Received", chunkSize, "bytes");
+ res.end(`${chunkSize}`);
+ });
+});
+server.listen(parseInt(process.env.PORT ?? "3000"), (err, port) => {
+ console.log(`http://localhost:${server.address().port}`);
+});
diff --git a/bench/stream-file-upload-client/stream-file-bun.js b/bench/stream-file-upload-client/stream-file-bun.js
new file mode 100644
index 000000000..e3499bd29
--- /dev/null
+++ b/bench/stream-file-upload-client/stream-file-bun.js
@@ -0,0 +1,9 @@
+import { file } from "bun";
+console.time("stream-file-bun");
+const response = await fetch(process.env.URL ?? "http://localhost:3000", {
+ method: "POST",
+ body: file(process.env.FILE ?? "hello.txt"),
+});
+console.timeEnd("stream-file-bun");
+
+console.log("Sent", await response.text(), "bytes");
diff --git a/bench/stream-file-upload-client/stream-file-deno.js b/bench/stream-file-upload-client/stream-file-deno.js
new file mode 100644
index 000000000..a87d56252
--- /dev/null
+++ b/bench/stream-file-upload-client/stream-file-deno.js
@@ -0,0 +1,12 @@
+const file = await Deno.open(Deno.env.get("FILE") ?? "hello.txt", {
+ read: true,
+});
+
+console.time("stream-file-deno");
+const response = await fetch(Deno.env.get("URL") ?? "http://localhost:3000", {
+ method: "POST",
+ body: file.readable,
+});
+console.timeEnd("stream-file-deno");
+
+console.log("Sent", await response.text(), "bytes");
diff --git a/bench/stream-file-upload-client/stream-file-node.mjs b/bench/stream-file-upload-client/stream-file-node.mjs
new file mode 100644
index 000000000..9a0957285
--- /dev/null
+++ b/bench/stream-file-upload-client/stream-file-node.mjs
@@ -0,0 +1,19 @@
+import { createReadStream } from "node:fs";
+import http from "node:http";
+
+console.time("stream-file-node");
+createReadStream(process.env.FILE ?? "hello.txt")
+ .pipe(
+ http
+ .request(process.env.URL ?? "http://localhost:3000", {
+ method: "POST",
+ })
+ .on("response", response => {
+ response.on("data", data => {
+ console.log("Sent", parseInt(data.toString(), 10), "bytes");
+ });
+ }),
+ )
+ .on("close", () => {
+ console.timeEnd("stream-file-node");
+ });
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 1ba0c82cc..ad3857685 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -629,7 +629,7 @@ pub const Fetch = struct {
result: HTTPClient.HTTPClientResult = .{},
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
- request_body: AnyBlob = undefined,
+ request_body: HTTPRequestBody = undefined,
response_buffer: MutableString = undefined,
request_headers: Headers = Headers{ .allocator = undefined },
promise: JSC.JSPromise.Strong,
@@ -647,6 +647,38 @@ pub const Fetch = struct {
abort_reason: JSValue = JSValue.zero,
// Custom Hostname
hostname: ?[]u8 = null,
+
+ pub const HTTPRequestBody = union(enum) {
+ AnyBlob: AnyBlob,
+ Sendfile: HTTPClient.Sendfile,
+
+ pub fn store(this: *HTTPRequestBody) ?*JSC.WebCore.Blob.Store {
+ return switch (this.*) {
+ .AnyBlob => this.AnyBlob.store(),
+ else => null,
+ };
+ }
+
+ pub fn slice(this: *const HTTPRequestBody) []const u8 {
+ return switch (this.*) {
+ .AnyBlob => this.AnyBlob.slice(),
+ else => "",
+ };
+ }
+
+ pub fn detach(this: *HTTPRequestBody) void {
+ switch (this.*) {
+ .AnyBlob => this.AnyBlob.detach(),
+ .Sendfile => {
+ if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0)
+ _ = JSC.Node.Syscall.close(this.Sendfile.fd);
+ this.Sendfile.offset = 0;
+ this.Sendfile.remain = 0;
+ },
+ }
+ }
+ };
+
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
return FetchTasklet{};
}
@@ -850,12 +882,26 @@ pub const Fetch = struct {
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
}
- fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), fetch_options.timeout, HTTPClient.HTTPClientResult.Callback.New(
- *FetchTasklet,
- FetchTasklet.callback,
- ).init(
- fetch_tasklet,
- ), proxy, if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, fetch_options.hostname, fetch_options.redirect_type);
+ fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
+ allocator,
+ fetch_options.method,
+ fetch_options.url,
+ fetch_options.headers.entries,
+ fetch_options.headers.buf.items,
+ &fetch_tasklet.response_buffer,
+ fetch_tasklet.request_body.slice(),
+ fetch_options.timeout,
+ HTTPClient.HTTPClientResult.Callback.New(
+ *FetchTasklet,
+ FetchTasklet.callback,
+ ).init(
+ fetch_tasklet,
+ ),
+ proxy,
+ if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null,
+ fetch_options.hostname,
+ fetch_options.redirect_type,
+ );
if (fetch_options.redirect_type != FetchRedirect.follow) {
fetch_tasklet.http.?.client.remaining_redirect_count = 0;
@@ -865,6 +911,12 @@ pub const Fetch = struct {
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
+ if (fetch_tasklet.request_body == .Sendfile) {
+ std.debug.assert(fetch_options.url.isHTTP());
+ std.debug.assert(fetch_options.proxy == null);
+ fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile };
+ }
+
if (fetch_tasklet.signal) |signal| {
fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
}
@@ -886,7 +938,7 @@ pub const Fetch = struct {
const FetchOptions = struct {
method: Method,
headers: Headers,
- body: AnyBlob,
+ body: HTTPRequestBody,
timeout: usize,
disable_timeout: bool,
disable_keepalive: bool,
@@ -1339,36 +1391,114 @@ pub const Fetch = struct {
) catch unreachable;
}
+ var http_body = FetchTasklet.HTTPRequestBody{
+ .AnyBlob = body,
+ };
+
if (body.needsToReadFile()) {
- // TODO: make this async + lazy
- const res = JSC.Node.NodeFS.readFile(
- globalThis.bunVM().nodeFS(),
- .{
- .encoding = .buffer,
- .path = body.Blob.store.?.data.file.pathlike,
- .offset = body.Blob.offset,
- .max_size = body.Blob.size,
- },
- .sync,
- );
+ prepare_body: {
+ const opened_fd_res: JSC.Node.Maybe(bun.FileDescriptor) = switch (body.Blob.store.?.data.file.pathlike) {
+ .fd => |fd| JSC.Node.Maybe(bun.FileDescriptor).errnoSysFd(JSC.Node.Syscall.system.dup(fd), .open, fd) orelse .{ .result = fd },
+ .path => |path| JSC.Node.Syscall.open(path.sliceZ(&globalThis.bunVM().nodeFS().sync_error_buf), std.os.O.RDONLY | std.os.O.NOCTTY, 0),
+ };
+
+ const opened_fd = switch (opened_fd_res) {
+ .err => |err| {
+ bun.default_allocator.free(url_proxy_buffer);
+
+ const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
+ body.detach();
+ if (headers) |*headers_| {
+ headers_.buf.deinit(bun.default_allocator);
+ headers_.entries.deinit(bun.default_allocator);
+ }
- switch (res) {
- .err => |err| {
- bun.default_allocator.free(url_proxy_buffer);
+ return rejected_value;
+ },
+ .result => |fd| fd,
+ };
- const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
- body.detach();
- if (headers) |*headers_| {
- headers_.buf.deinit(bun.default_allocator);
- headers_.entries.deinit(bun.default_allocator);
+ if (proxy == null and bun.HTTP.Sendfile.isEligible(url)) {
+ use_sendfile: {
+ const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(opened_fd)) {
+ .result => |result| result,
+ // bail out for any reason
+ .err => break :use_sendfile,
+ };
+
+ if (Environment.isMac) {
+ // macOS only supports regular files for sendfile()
+ if (!std.os.S.ISREG(stat.mode)) {
+ break :use_sendfile;
+ }
+ }
+
+ // if it's < 32 KB, it's not worth it
+ if (stat.size < 32 * 1024) {
+ break :use_sendfile;
+ }
+
+ const original_size = body.Blob.size;
+ const stat_size = @intCast(Blob.SizeType, stat.size);
+ const blob_size = if (std.os.S.ISREG(stat.mode))
+ stat_size
+ else
+ @min(original_size, stat_size);
+
+ http_body = .{
+ .Sendfile = .{
+ .fd = opened_fd,
+ .remain = body.Blob.offset + original_size,
+ .offset = body.Blob.offset,
+ .content_size = blob_size,
+ },
+ };
+
+ if (std.os.S.ISREG(stat.mode)) {
+ http_body.Sendfile.offset = @min(http_body.Sendfile.offset, stat_size);
+ http_body.Sendfile.remain = @min(@max(http_body.Sendfile.remain, http_body.Sendfile.offset), stat_size) -| http_body.Sendfile.offset;
+ }
+ body.detach();
+
+ break :prepare_body;
}
+ }
- return rejected_value;
- },
- .result => |result| {
- body.detach();
- body.from(std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(result.slice())));
- },
+ // TODO: make this async + lazy
+ const res = JSC.Node.NodeFS.readFile(
+ globalThis.bunVM().nodeFS(),
+ .{
+ .encoding = .buffer,
+ .path = .{ .fd = opened_fd },
+ .offset = body.Blob.offset,
+ .max_size = body.Blob.size,
+ },
+ .sync,
+ );
+
+ if (body.Blob.store.?.data.file.pathlike == .path) {
+ _ = JSC.Node.Syscall.close(opened_fd);
+ }
+
+ switch (res) {
+ .err => |err| {
+ bun.default_allocator.free(url_proxy_buffer);
+
+ const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
+ body.detach();
+ if (headers) |*headers_| {
+ headers_.buf.deinit(bun.default_allocator);
+ headers_.entries.deinit(bun.default_allocator);
+ }
+
+ return rejected_value;
+ },
+ .result => |result| {
+ body.detach();
+ body.from(std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(result.slice())));
+ http_body = .{ .AnyBlob = body };
+ },
+ }
}
}
@@ -1388,7 +1518,7 @@ pub const Fetch = struct {
.headers = headers orelse Headers{
.allocator = bun.default_allocator,
},
- .body = body,
+ .body = http_body,
.timeout = std.time.ns_per_hour,
.disable_keepalive = disable_keepalive,
.disable_timeout = disable_timeout,
diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp
index ae6443cba..aa4889892 100644
--- a/src/deps/libuwsockets.cpp
+++ b/src/deps/libuwsockets.cpp
@@ -1572,4 +1572,9 @@ extern "C"
return uwsRes->getNativeHandle();
}
}
+
+ void us_socket_sendfile_needs_more(us_socket_t *s) {
+ s->context->loop->data.last_write_failed = 1;
+ us_poll_change(&s->p, s->context->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);
+ }
}
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index c9f350a37..538756b71 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -43,6 +43,23 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
pub fn getNativeHandle(this: ThisSocket) *NativeSocketHandleType(ssl) {
return @ptrCast(*NativeSocketHandleType(ssl), us_socket_get_native_handle(comptime ssl_int, this.socket).?);
}
+
+ pub fn fd(this: ThisSocket) i32 {
+ if (comptime ssl) {
+ @compileError("SSL sockets do not have a file descriptor accessible this way");
+ }
+
+ return @intCast(i32, @ptrToInt(us_socket_get_native_handle(0, this.socket)));
+ }
+
+ pub fn markNeedsMoreForSendfile(this: ThisSocket) void {
+ if (comptime ssl) {
+ @compileError("SSL sockets do not support sendfile yet");
+ }
+
+ us_socket_sendfile_needs_more(this.socket);
+ }
+
pub fn ext(this: ThisSocket, comptime ContextType: type) ?*ContextType {
const alignment = if (ContextType == *anyopaque)
@sizeOf(usize)
@@ -1882,3 +1899,5 @@ pub const State = enum(i32) {
return @enumToInt(this) & @enumToInt(State.HTTP_CONNECTION_CLOSE) != 0;
}
};
+
+extern fn us_socket_sendfile_needs_more(socket: *Socket) void;
diff --git a/src/feature_flags.zig b/src/feature_flags.zig
index cdfeacb10..0a0c920a4 100644
--- a/src/feature_flags.zig
+++ b/src/feature_flags.zig
@@ -170,3 +170,5 @@ pub const source_map_debug_id = true;
pub const alignment_tweak = false;
pub const export_star_redirect = false;
+
+pub const streaming_file_uploads_for_http_client = true;
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 4e0926baa..fe5f34f48 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -71,6 +71,89 @@ pub const FetchRedirect = enum(u8) {
});
};
+pub const HTTPRequestBody = union(enum) {
+ bytes: []const u8,
+ sendfile: Sendfile,
+
+ pub fn len(this: *const HTTPRequestBody) usize {
+ return switch (this.*) {
+ .bytes => this.bytes.len,
+ .sendfile => this.sendfile.content_size,
+ };
+ }
+};
+
+pub const Sendfile = struct {
+ fd: bun.FileDescriptor,
+ remain: usize = 0,
+ offset: usize = 0,
+ content_size: usize = 0,
+
+ pub fn isEligible(url: bun.URL) bool {
+ return url.isHTTP() and url.href.len > 0 and FeatureFlags.streaming_file_uploads_for_http_client;
+ }
+
+ pub fn write(
+ this: *Sendfile,
+ socket: NewHTTPContext(false).HTTPSocket,
+ ) Status {
+ const adjusted_count_temporary = @min(@as(u64, this.remain), @as(u63, std.math.maxInt(u63)));
+ // TODO we should not need this int cast; improve the return type of `@min`
+ const adjusted_count = @intCast(u63, adjusted_count_temporary);
+
+ if (Environment.isLinux) {
+ var signed_offset = @intCast(i64, this.offset);
+ const begin = this.offset;
+ const val =
+ // this does the syscall directly, without libc
+ std.os.linux.sendfile(socket.fd(), this.fd, &signed_offset, this.remain);
+ this.offset = @intCast(u64, signed_offset);
+
+ const errcode = std.os.linux.getErrno(val);
+
+ this.remain -|= @intCast(u64, this.offset -| begin);
+
+ if (errcode != .SUCCESS or this.remain == 0 or val == 0) {
+ if (errcode == .SUCCESS) {
+ return .{ .done = {} };
+ }
+
+ return .{ .err = AsyncIO.asError(errcode) };
+ }
+ } else {
+ var sbytes: std.os.off_t = adjusted_count;
+ const signed_offset = @bitCast(i64, @as(u64, this.offset));
+ const errcode = std.c.getErrno(std.c.sendfile(
+ this.fd,
+ socket.fd(),
+
+ signed_offset,
+ &sbytes,
+ null,
+ 0,
+ ));
+ const wrote = @intCast(u64, sbytes);
+ this.offset +|= wrote;
+ this.remain -|= wrote;
+ if (errcode != .AGAIN or this.remain == 0 or sbytes == 0) {
+ if (errcode == .SUCCESS) {
+ return .{ .done = {} };
+ }
+
+ return .{ .err = AsyncIO.asError(errcode) };
+ }
+ }
+
+ return .{ .again = {} };
+ }
+
+ pub const Status = union(enum) {
+ done: void,
+ err: anyerror,
+ again: void,
+ };
+};
+
const ProxySSLData = struct {
buffer: std.ArrayList(u8),
partial: bool,
@@ -738,7 +821,7 @@ pub fn onClose(
if (client.allow_retry) {
client.allow_retry = false;
- client.start(client.state.request_body, client.state.body_out_str.?);
+ client.start(client.state.original_request_body, client.state.body_out_str.?);
return;
}
@@ -915,14 +998,16 @@ pub const InternalState = struct {
compressed_body: MutableString = undefined,
body_size: usize = 0,
request_body: []const u8 = "",
+ original_request_body: HTTPRequestBody = .{ .bytes = "" },
request_sent_len: usize = 0,
fail: anyerror = error.NoError,
request_stage: HTTPStage = .pending,
response_stage: HTTPStage = .pending,
- pub fn init(body: []const u8, body_out_str: *MutableString) InternalState {
+ pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
return .{
- .request_body = body,
+ .original_request_body = body,
+ .request_body = if (body == .bytes) body.bytes else "",
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
.body_out_str = body_out_str,
@@ -942,6 +1027,7 @@ pub const InternalState = struct {
.body_out_str = body_msg,
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
+ .original_request_body = .{ .bytes = "" },
.request_body = "",
};
}
@@ -1191,7 +1277,7 @@ pub const AsyncHTTP = struct {
request_headers: Headers.Entries = Headers.Entries{},
response_headers: Headers.Entries = Headers.Entries{},
response_buffer: *MutableString,
- request_body: []const u8 = "",
+ request_body: HTTPRequestBody = .{ .bytes = "" },
allocator: std.mem.Allocator,
request_header_buf: string = "",
method: Method = Method.GET,
@@ -1278,7 +1364,18 @@ pub const AsyncHTTP = struct {
hostname: ?[]u8,
redirect_type: FetchRedirect,
) AsyncHTTP {
- var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy, .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0 };
+ var this = AsyncHTTP{
+ .allocator = allocator,
+ .url = url,
+ .method = method,
+ .request_headers = headers,
+ .request_header_buf = headers_buf,
+ .request_body = .{ .bytes = request_body },
+ .response_buffer = response_buffer,
+ .completion_callback = callback,
+ .http_proxy = http_proxy,
+ .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0,
+ };
this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname);
this.client.async_http_id = this.async_http_id;
@@ -1648,7 +1745,7 @@ pub fn doRedirect(this: *HTTPClient) void {
if (this.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
- return this.start("", body_out_str);
+ return this.start(.{ .bytes = "" }, body_out_str);
}
pub fn isHTTPS(this: *HTTPClient) bool {
if (this.http_proxy) |proxy| {
@@ -1662,7 +1759,7 @@ pub fn isHTTPS(this: *HTTPClient) bool {
}
return false;
}
-pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void {
+pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableString) void {
body_out_str.reset();
std.debug.assert(this.state.response_message_buffer.list.capacity == 0);
@@ -1730,7 +1827,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
this.setTimeout(socket, 60);
- const request = this.buildRequest(this.state.request_body.len);
+ const request = this.buildRequest(this.state.original_request_body.len());
if (this.http_proxy) |_| {
if (this.url.isHTTPS()) {
@@ -1784,7 +1881,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
std.debug.assert(!socket.isShutdown());
std.debug.assert(!socket.isClosed());
}
- const amount = socket.write(to_send, false);
+ const amount = socket.write(
+ to_send,
+ false,
+ );
if (comptime is_first_call) {
if (amount == 0) {
// don't worry about it
@@ -1804,7 +1904,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..];
}
- const has_sent_body = this.state.request_body.len == 0;
+ const has_sent_body = if (this.state.original_request_body == .bytes)
+ this.state.request_body.len == 0
+ else
+ false;
if (has_sent_headers and has_sent_body) {
this.state.request_stage = .done;
@@ -1813,7 +1916,11 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
if (has_sent_headers) {
this.state.request_stage = .body;
- std.debug.assert(this.state.request_body.len > 0);
+ std.debug.assert(
+ // we should have leftover data OR we use sendfile()
+ (this.state.original_request_body == .bytes and this.state.request_body.len > 0) or
+ this.state.original_request_body == .sendfile,
+ );
// we sent everything, but there's some body leftover
if (amount == @intCast(c_int, to_send.len)) {
@@ -1826,19 +1933,42 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
.body => {
this.setTimeout(socket, 60);
- const to_send = this.state.request_body;
- const amount = socket.write(to_send, true);
- if (amount < 0) {
- this.closeAndFail(error.WriteFailed, is_ssl, socket);
- return;
- }
+ switch (this.state.original_request_body) {
+ .bytes => {
+ const to_send = this.state.request_body;
+ const amount = socket.write(to_send, true);
+ if (amount < 0) {
+ this.closeAndFail(error.WriteFailed, is_ssl, socket);
+ return;
+ }
- this.state.request_sent_len += @intCast(usize, amount);
- this.state.request_body = this.state.request_body[@intCast(usize, amount)..];
+ this.state.request_sent_len += @intCast(usize, amount);
+ this.state.request_body = this.state.request_body[@intCast(usize, amount)..];
- if (this.state.request_body.len == 0) {
- this.state.request_stage = .done;
- return;
+ if (this.state.request_body.len == 0) {
+ this.state.request_stage = .done;
+ return;
+ }
+ },
+ .sendfile => |*sendfile| {
+ if (comptime is_ssl) {
+ @panic("sendfile is only supported without SSL. This code should never have been reached!");
+ }
+
+ switch (sendfile.write(socket)) {
+ .done => {
+ this.state.request_stage = .done;
+ return;
+ },
+ .err => |err| {
+ this.closeAndFail(err, false, socket);
+ return;
+ },
+ .again => {
+ socket.markNeedsMoreForSendfile();
+ },
+ }
+ },
}
},
.proxy_body => {
diff --git a/test/js/bun/http/fetch-file-upload.test.ts b/test/js/bun/http/fetch-file-upload.test.ts
index 33f0d0581..b070fbd6e 100644
--- a/test/js/bun/http/fetch-file-upload.test.ts
+++ b/test/js/bun/http/fetch-file-upload.test.ts
@@ -1,5 +1,7 @@
import { expect, test, describe } from "bun:test";
import { withoutAggressiveGC } from "harness";
+import { tmpdir } from "os";
+import { join } from "path";
test("uploads roundtrip", async () => {
const body = Bun.file(import.meta.dir + "/fetch.js.txt");
@@ -32,6 +34,35 @@ test("uploads roundtrip", async () => {
server.stop(true);
});
+test("uploads roundtrip with sendfile()", async () => {
+ var hugeTxt = "huge".repeat(1024 * 1024 * 32);
+ const path = join(tmpdir(), "huge.txt");
+ require("fs").writeFileSync(path, hugeTxt);
+
+ const server = Bun.serve({
+ maxRequestBodySize: 1024 * 1024 * 1024 * 8,
+ async fetch(req) {
+ var count = 0;
+ for await (let chunk of req.body!) {
+ count += chunk.byteLength;
+ }
+ return new Response(count + "");
+ },
+ });
+
+ const resp = await fetch("http://" + server.hostname + ":" + server.port, {
+ body: Bun.file(path),
+ method: "PUT",
+ });
+
+ expect(resp.status).toBe(200);
+
+ const body = parseInt(await resp.text());
+ expect(body).toBe(hugeTxt.length);
+
+ server.stop(true);
+});
+
test("missing file throws the expected error", async () => {
Bun.gc(true);
// Run this 1000 times to check for GC bugs
diff --git a/test/regression/issue/02499.test.ts b/test/regression/issue/02499.test.ts
index 0e4666b36..f1ee1da80 100644
--- a/test/regression/issue/02499.test.ts
+++ b/test/regression/issue/02499.test.ts
@@ -12,8 +12,7 @@ it("onAborted() and onWritable are not called after receiving an empty response
testDone(new Error("Test timed out, which means it failed"));
};
- const body = new FormData();
- body.append("hey", "hi");
+ const invalidJSON = Buffer.from("invalid json");
// We want to test that the server isn't keeping the connection open in a
// zombie-like state when an error occurs due to an unhandled rejected promise
@@ -69,7 +68,7 @@ it("onAborted() and onWritable are not called after receiving an empty response
try {
await fetch(`http://${hostname}:${port}/upload`, {
- body,
+ body: invalidJSON,
keepalive: false,
method: "POST",
timeout: true,
@@ -91,4 +90,4 @@ it("onAborted() and onWritable are not called after receiving an empty response
}
timeout.onabort = () => {};
testDone();
-});
+}, 30_000);