diff options
-rw-r--r-- | src/bun.js/api/bun.classes.ts | 8 | ||||
-rw-r--r-- | src/bun.js/api/bun.zig | 331 | ||||
-rw-r--r-- | src/bun.js/base.zig | 20 | ||||
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 3 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 3 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 50 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 87 |
7 files changed, 384 insertions, 118 deletions
diff --git a/src/bun.js/api/bun.classes.ts b/src/bun.js/api/bun.classes.ts index 5a6ae47cc..bbb1df944 100644 --- a/src/bun.js/api/bun.classes.ts +++ b/src/bun.js/api/bun.classes.ts @@ -19,6 +19,14 @@ export default [ getter: "getStdout", cache: true, }, + writable: { + getter: "getStdin", + cache: true, + }, + readable: { + getter: "getStdout", + cache: true, + }, stderr: { getter: "getStderr", cache: true, diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index 1171d804e..e1df32bc3 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -3514,6 +3514,7 @@ pub const JSZlib = struct { }; pub const Subprocess = struct { + const log = Output.scoped(.Subprocess, true); pub usingnamespace JSC.Codegen.JSSubprocess; pid: std.os.pid_t, @@ -3569,7 +3570,7 @@ pub const Subprocess = struct { ignore: void, closed: void, - pub fn init(stdio: std.meta.Tag(Stdio), fd: i32, globalThis: *JSC.JSGlobalObject) Readable { + pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) Readable { return switch (stdio) { .inherit => Readable{ .inherit = {} }, .ignore => Readable{ .ignore = {} }, @@ -3582,7 +3583,8 @@ pub const Subprocess = struct { out.ptr.File.stored_global_this_ = globalThis; break :brk Readable{ .pipe = out }; }, - .callback, .fd, .path, .blob => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }, + .path, .blob, .fd => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }, + else => unreachable, }; } @@ -3653,32 +3655,43 @@ pub const Subprocess = struct { return JSValue.jsUndefined(); } + switch (this.tryKill(sig)) { + .result => {}, + .err => |err| { + globalThis.throwValue(err.toJSC(globalThis)); + return JSValue.jsUndefined(); + }, + } + + return JSValue.jsUndefined(); + } + + pub fn tryKill(this: *Subprocess, sig: i32) JSC.Node.Maybe(void) { if (this.killed) { - return JSValue.jsUndefined(); + return .{ .result = {} }; } if (comptime Environment.isLinux) { // should this be handled differently? // this effectively shouldn't happen if (this.pidfd == std.math.maxInt(std.os.fd_t)) { - return JSValue.jsUndefined(); + return .{ .result = {} }; } // first appeared in Linux 5.1 const rc = std.os.linux.pidfd_send_signal(this.pidfd, @intCast(u8, sig), null, 0); if (rc != 0) { - globalThis.throwValue(JSC.Node.Syscall.Error.fromCode(std.os.linux.getErrno(rc), .kill).toJSC(globalThis)); - return JSValue.jsUndefined(); + return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.linux.getErrno(rc), .kill) }; } } else { const err = std.c.kill(this.pid, sig); if (err != 0) { - return JSC.Node.Syscall.Error.fromCode(std.c.getErrno(err), .kill).toJSC(globalThis); + return .{ .err = JSC.Node.Syscall.Error.fromCode(std.c.getErrno(err), .kill) }; } } - return JSValue.jsUndefined(); + return .{ .result = {} }; } pub fn onKill( @@ -3701,11 +3714,13 @@ pub const Subprocess = struct { } if (this.stdout == .pipe) { - this.stdout.pipe.cancel(this.globalThis); + if (this.stdout.pipe.isDisturbed(this.globalThis)) + this.stdout.pipe.cancel(this.globalThis); } if (this.stderr == .pipe) { - this.stderr.pipe.cancel(this.globalThis); + if (this.stderr.pipe.isDisturbed(this.globalThis)) + this.stderr.pipe.cancel(this.globalThis); } this.stdin.close(); @@ -3737,15 +3752,112 @@ pub const Subprocess = struct { return JSValue.jsBoolean(this.killed); } + pub const BufferedInput = struct { + remain: []const u8 = "", + fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), + poll_ref: JSC.PollRef = .{}, + written: usize = 0, + + source: union(enum) { + blob: JSC.WebCore.AnyBlob, + array_buffer: JSC.ArrayBuffer.Strong, + }, + + pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady); + + pub fn onReady(this: *BufferedInput, size_or_offset: i64) void { + this.write(@intCast(usize, @maximum(size_or_offset, 0))); + } + + pub fn write(this: *BufferedInput, _: usize) void { + var to_write = this.remain; + + if (to_write.len == 0) { + if (this.poll_ref.isActive()) this.unwatch(this.fd); + // we are done! + this.closeFDIfOpen(); + return; + } + + while (to_write.len > 0) { + switch (JSC.Node.Syscall.write(this.fd, to_write)) { + .err => |e| { + if (e.isRetry()) { + log("write({d}) retry", .{ + to_write.len, + }); + + this.watch(this.fd); + return; + } + + // fail + log("write({d}) fail: {d}", .{ to_write.len, e.errno }); + this.deinit(); + return; + }, + + .result => |bytes_written| { + this.written += bytes_written; + + log( + "write({d}) {d}", + .{ + to_write.len, + bytes_written, + }, + ); + + this.remain = this.remain[@minimum(bytes_written, this.remain.len)..]; + to_write = to_write[bytes_written..]; + + // we are done or it accepts no more input + if (this.remain.len == 0 or bytes_written == 0) { + this.deinit(); + return; + } + }, + } + } + } + + fn closeFDIfOpen(this: *BufferedInput) void { + if (this.poll_ref.isActive()) this.unwatch(this.fd); + + if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + _ = JSC.Node.Syscall.close(this.fd); + this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + } + } + + pub fn deinit(this: *BufferedInput) void { + this.closeFDIfOpen(); + + switch (this.source) { + .blob => |*blob| { + blob.detach(); + }, + .array_buffer => |*array_buffer| { + array_buffer.deinit(); + }, + } + } + }; + const Writable = union(enum) { pipe: *JSC.WebCore.FileSink, + pipe_to_readable_stream: struct { + pipe: *JSC.WebCore.FileSink, + readable_stream: JSC.WebCore.ReadableStream, + }, fd: JSC.Node.FileDescriptor, + buffered_input: BufferedInput, inherit: void, ignore: void, - pub fn init(stdio: std.meta.Tag(Stdio), fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { + pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { switch (stdio) { - .path, .pipe, .callback => { + .path, .pipe => { var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink); sink.* = .{ .fd = fd, @@ -3753,9 +3865,33 @@ pub const Subprocess = struct { .allocator = globalThis.bunVM().allocator, }; + if (stdio == .pipe) { + if (stdio.pipe) |readable| { + return Writable{ + .pipe_to_readable_stream = .{ + .pipe = sink, + .readable_stream = readable, + }, + }; + } + } + return Writable{ .pipe = sink }; }, - .blob, .fd => { + .array_buffer, .blob => { + var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined }; + switch (stdio) { + .array_buffer => |array_buffer| { + buffered_input.source = .{ .array_buffer = array_buffer }; + }, + .blob => |blob| { + buffered_input.source = .{ .blob = blob }; + }, + else => unreachable, + } + return Writable{ .buffered_input = buffered_input }; + }, + .fd => { return Writable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }; }, .inherit => { @@ -3773,6 +3909,8 @@ pub const Subprocess = struct { .fd => |fd| JSValue.jsNumber(fd), .ignore => JSValue.jsUndefined(), .inherit => JSValue.jsUndefined(), + .buffered_input => JSValue.jsUndefined(), + .pipe_to_readable_stream => this.pipe_to_readable_stream.readable_stream.value, }; } @@ -3781,9 +3919,15 @@ pub const Subprocess = struct { .pipe => |pipe| { _ = pipe.end(null); }, + .pipe_to_readable_stream => |*pipe_to_readable_stream| { + _ = pipe_to_readable_stream.pipe.end(null); + }, .fd => |fd| { _ = JSC.Node.Syscall.close(fd); }, + .buffered_input => { + this.buffered_input.deinit(); + }, .ignore => {}, .inherit => {}, }; @@ -3841,7 +3985,7 @@ pub const Subprocess = struct { var stdio = [3]Stdio{ .{ .ignore = .{} }, .{ .inherit = .{} }, - .{ .pipe = .{} }, + .{ .pipe = null }, }; var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse ""; @@ -4007,19 +4151,19 @@ pub const Subprocess = struct { -1, ); - const stdin_pipe = if (stdio[0].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| { + const stdin_pipe = if (stdio[0].isPiped()) os.pipe2(0) catch |err| { globalThis.throw("failed to create stdin pipe: {s}", .{err}); return JSValue.jsUndefined(); } else undefined; errdefer if (stdio[0].isPiped()) destroyPipe(stdin_pipe); - const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| { + const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(0) catch |err| { globalThis.throw("failed to create stdout pipe: {s}", .{err}); return JSValue.jsUndefined(); } else undefined; errdefer if (stdio[1].isPiped()) destroyPipe(stdout_pipe); - const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| { + const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(0) catch |err| { globalThis.throw("failed to create stderr pipe: {s}", .{err}); return JSValue.jsUndefined(); } else undefined; @@ -4096,6 +4240,10 @@ pub const Subprocess = struct { } }; + // set non-blocking stdin + if (stdio[0].isPiped()) + _ = std.os.fcntl(stdin_pipe[1], std.os.F.SETFL, std.os.O.NONBLOCK) catch 0; + var subprocess = globalThis.allocator().create(Subprocess) catch { globalThis.throw("out of memory", .{}); return JSValue.jsUndefined(); @@ -4105,12 +4253,12 @@ pub const Subprocess = struct { .globalThis = globalThis, .pid = pid, .pidfd = pidfd, - .stdin = Writable.init(std.meta.activeTag(stdio[std.os.STDIN_FILENO]), stdin_pipe[1], globalThis) catch { + .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch { globalThis.throw("out of memory", .{}); return JSValue.jsUndefined(); }, - .stdout = Readable.init(std.meta.activeTag(stdio[std.os.STDOUT_FILENO]), stdout_pipe[0], globalThis), - .stderr = Readable.init(std.meta.activeTag(stdio[std.os.STDERR_FILENO]), stderr_pipe[0], globalThis), + .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis), + .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis), }; subprocess.this_jsvalue = subprocess.toJS(globalThis); @@ -4134,6 +4282,14 @@ pub const Subprocess = struct { }, } + if (subprocess.stdin == .buffered_input) { + subprocess.stdin.buffered_input.remain = switch (subprocess.stdin.buffered_input.source) { + .blob => subprocess.stdin.buffered_input.source.blob.slice(), + .array_buffer => |array_buffer| array_buffer.slice(), + }; + subprocess.stdin.buffered_input.write(0); + } + return subprocess.this_jsvalue; } @@ -4206,13 +4362,13 @@ pub const Subprocess = struct { ignore: void, fd: JSC.Node.FileDescriptor, path: JSC.Node.PathLike, - blob: JSC.WebCore.Blob, - pipe: void, - callback: JSC.JSValue, + blob: JSC.WebCore.AnyBlob, + pipe: ?JSC.WebCore.ReadableStream, + array_buffer: JSC.ArrayBuffer.Strong, pub fn isPiped(self: Stdio) bool { return switch (self) { - .blob, .callback, .pipe => true, + .array_buffer, .blob, .pipe => true, else => false, }; } @@ -4225,8 +4381,10 @@ pub const Subprocess = struct { _: i32, ) !void { switch (stdio) { - .blob, .callback, .pipe => { + .array_buffer, .blob, .pipe => { + std.debug.assert(!(stdio == .blob and stdio.blob.needsToReadFile())); const idx: usize = if (std_fileno == 0) 0 else 1; + try actions.dup2(pipe_fd[idx], std_fileno); try actions.close(pipe_fd[1 - idx]); }, @@ -4252,6 +4410,50 @@ pub const Subprocess = struct { } }; + fn extractStdioBlob( + globalThis: *JSC.JSGlobalObject, + blob: JSC.WebCore.AnyBlob, + i: usize, + stdio_array: []Stdio, + ) bool { + if (blob.needsToReadFile()) { + if (blob.store()) |store| { + if (store.data.file.pathlike == .fd) { + if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) { + stdio_array[i] = Stdio{ .inherit = {} }; + } else { + switch (@intCast(std.os.fd_t, i)) { + std.os.STDIN_FILENO => { + if (i == std.os.STDERR_FILENO or i == std.os.STDOUT_FILENO) { + globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); + return false; + } + }, + + std.os.STDOUT_FILENO, std.os.STDERR_FILENO => { + if (i == std.os.STDIN_FILENO) { + globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); + return false; + } + }, + else => {}, + } + + stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd }; + } + + return true; + } + + stdio_array[i] = .{ .path = store.data.file.pathlike.path }; + return true; + } + } + + stdio_array[i] = .{ .blob = blob }; + return true; + } + fn extractStdio( globalThis: *JSC.JSGlobalObject, i: usize, @@ -4269,7 +4471,7 @@ pub const Subprocess = struct { } else if (str.eqlComptime("ignore")) { stdio_array[i] = Stdio{ .ignore = {} }; } else if (str.eqlComptime("pipe")) { - stdio_array[i] = Stdio{ .pipe = {} }; + stdio_array[i] = Stdio{ .pipe = null }; } else { globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{}); return false; @@ -4306,53 +4508,48 @@ pub const Subprocess = struct { return true; } else if (value.as(JSC.WebCore.Blob)) |blob| { - var store = blob.store orelse { - globalThis.throwInvalidArguments("Blob is detached (in stdio)", .{}); - return false; - }; - - if (i == std.os.STDIN_FILENO and store.data == .bytes) { - stdio_array[i] = .{ .blob = blob.dupe() }; - return true; - } - - if (store.data != .file) { - globalThis.throwInvalidArguments("Blob is not a file (in stdio)", .{}); - return false; - } - - if (store.data.file.pathlike == .fd) { - if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) { - stdio_array[i] = Stdio{ .inherit = {} }; - } else { - switch (@intCast(std.os.fd_t, i)) { - std.os.STDIN_FILENO => { - if (i == std.os.STDERR_FILENO or i == std.os.STDOUT_FILENO) { - globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); - return false; - } - }, + return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, stdio_array); + } else if (value.as(JSC.WebCore.Request)) |req| { + req.getBodyValue().toBlobIfPossible(); + return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array); + } else if (value.as(JSC.WebCore.Response)) |req| { + req.getBodyValue().toBlobIfPossible(); + return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array); + } else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |*req| { + if (i == std.os.STDIN_FILENO) { + if (req.toAnyBlob(globalThis)) |blob| { + return extractStdioBlob(globalThis, blob, i, stdio_array); + } - std.os.STDOUT_FILENO, std.os.STDERR_FILENO => { - if (i == std.os.STDIN_FILENO) { - globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); - return false; - } - }, - else => {}, - } + switch (req.ptr) { + .File, .Blob => unreachable, + .Direct, .JavaScript, .Bytes => { + if (req.isLocked(globalThis)) { + globalThis.throwInvalidArguments("ReadableStream cannot be locked", .{}); + return false; + } - stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd }; + stdio_array[i] = .{ .pipe = req.* }; + return true; + }, + else => {}, } - return true; + globalThis.throwInvalidArguments("Unsupported ReadableStream type", .{}); + return false; + } + } else if (value.asArrayBuffer(globalThis)) |array_buffer| { + if (array_buffer.slice().len == 0) { + globalThis.throwInvalidArguments("ArrayBuffer cannot be empty", .{}); + return false; } - stdio_array[i] = .{ .path = store.data.file.pathlike.path }; - return true; - } else if (value.isCallable(globalThis.vm())) { - stdio_array[i] = .{ .callback = value }; - value.ensureStillAlive(); + stdio_array[i] = .{ + .array_buffer = JSC.ArrayBuffer.Strong{ + .array_buffer = array_buffer, + .held = JSC.Strong.create(array_buffer.value, globalThis), + }, + }; return true; } diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 23b00bd8e..59cad571b 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -2384,6 +2384,24 @@ pub const ArrayBuffer = extern struct { value: JSC.JSValue = JSC.JSValue.zero, shared: bool = false, + pub const Strong = struct { + array_buffer: ArrayBuffer, + held: JSC.Strong = .{}, + + pub fn clear(this: *ArrayBuffer.Strong) void { + var ref: *JSC.napi.Ref = this.ref orelse return; + ref.set(JSC.JSValue.zero); + } + + pub fn slice(this: *const ArrayBuffer.Strong) []u8 { + return this.array_buffer.slice(); + } + + pub fn deinit(this: *ArrayBuffer.Strong) void { + this.held.deinit(); + } + }; + pub const empty = ArrayBuffer{ .offset = 0, .len = 0, .byte_len = 0, .typed_array_type = .Uint8Array, .ptr = undefined }; pub const name = "Bun__ArrayBuffer"; @@ -3902,7 +3920,7 @@ pub const Ref = struct { pub const PollRef = struct { status: Status = .inactive, - const log = Output.scoped(.PollRef, true); + const log = Output.scoped(.PollRef, false); const Status = enum { active, inactive, done }; diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 067d10366..5321ca922 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -268,6 +268,9 @@ function readableStreamPipeToWritableStream( ) { "use strict"; + const isDirectStream = !!@getByIdDirectPrivate(source, "start"); + + @assert(@isReadableStream(source)); @assert(@isWritableStream(destination)); @assert(!@isReadableStreamLocked(source)); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 4c36f1c05..76206de54 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -519,7 +519,7 @@ pub const Poller = struct { const FileBlobLoader = JSC.WebCore.FileBlobLoader; const FileSink = JSC.WebCore.FileSink; const Subprocess = JSC.Subprocess; - + const BufferedInput = Subprocess.BufferedInput; /// epoll only allows one pointer /// We unfortunately need two pointers: one for a function call and one for the context /// We use a tagged pointer union and then call the function with the context pointer @@ -527,6 +527,7 @@ pub const Poller = struct { FileBlobLoader, FileSink, Subprocess, + BufferedInput, }); const Kevent = std.os.Kevent; const kevent = std.c.kevent; diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 5d641831f..079fa7f7f 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -3991,7 +3991,7 @@ pub const AnyBlob = union(enum) { }; } - pub fn store(this: *@This()) ?*Blob.Store { + pub fn store(this: *const @This()) ?*Blob.Store { if (this.* == .Blob) { return this.Blob.store; } @@ -4333,50 +4333,14 @@ pub const Body = struct { } pub fn toAnyBlobAllowPromise(this: *PendingValue) ?AnyBlob { - const stream = this.readable orelse return null; - - switch (stream.ptr) { - .Blob => |blobby| { - var blob = JSC.WebCore.Blob.initWithStore(blobby.store, this.global); - blob.offset = blobby.offset; - blob.size = blobby.remain; - blob.store.?.ref(); - stream.detach(this.global); - stream.done(); - blobby.deinit(); - this.readable = null; - return AnyBlob{ .Blob = blob }; - }, - .File => |blobby| { - var blob = JSC.WebCore.Blob.initWithStore(blobby.store, this.global); - blobby.store.ref(); - - // it should be lazy, file shouldn't have opened yet. - std.debug.assert(!blobby.started); - - stream.detach(this.global); - blobby.deinit(); - stream.done(); - this.readable = null; - return AnyBlob{ .Blob = blob }; - }, - .Bytes => |bytes| { - - // If we've received the complete body by the time this function is called - // we can avoid streaming it and convert it to a Blob - if (bytes.has_received_last_chunk) { - stream.detach(this.global); - var blob: JSC.WebCore.AnyBlob = undefined; - blob.from(bytes.buffer); - bytes.parent().deinit(); - this.readable = null; - return blob; - } + var stream = if (this.readable != null) &this.readable.? else return null; - return null; - }, - else => return null, + if (stream.toAnyBlob(this.global)) |blob| { + this.readable = null; + return blob; } + + return null; } pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue { diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index e6f8b2378..0f0577b1b 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -49,6 +49,7 @@ const Request = JSC.WebCore.Request; const assert = std.debug.assert; const Syscall = JSC.Node.Syscall; +const AnyBlob = JSC.WebCore.AnyBlob; pub const ReadableStream = struct { value: JSValue, ptr: Source, @@ -57,6 +58,52 @@ pub const ReadableStream = struct { return this.value; } + pub fn toAnyBlob( + stream: *ReadableStream, + globalThis: *JSC.JSGlobalObject, + ) ?JSC.WebCore.AnyBlob { + switch (stream.ptr) { + .Blob => |blobby| { + var blob = JSC.WebCore.Blob.initWithStore(blobby.store, globalThis); + blob.offset = blobby.offset; + blob.size = blobby.remain; + blob.store.?.ref(); + stream.detach(globalThis); + stream.done(); + blobby.deinit(); + + return AnyBlob{ .Blob = blob }; + }, + .File => |blobby| { + var blob = JSC.WebCore.Blob.initWithStore(blobby.store, globalThis); + blobby.store.ref(); + + // it should be lazy, file shouldn't have opened yet. + std.debug.assert(!blobby.started); + + stream.detach(globalThis); + blobby.deinit(); + stream.done(); + return AnyBlob{ .Blob = blob }; + }, + .Bytes => |bytes| { + + // If we've received the complete body by the time this function is called + // we can avoid streaming it and convert it to a Blob + if (bytes.has_received_last_chunk) { + stream.detach(globalThis); + var blob: JSC.WebCore.AnyBlob = undefined; + blob.from(bytes.buffer); + bytes.parent().deinit(); + return blob; + } + + return null; + }, + else => return null, + } + } + pub fn done(this: *const ReadableStream) void { this.value.unprotect(); } @@ -2973,6 +3020,8 @@ pub const FileBlobLoader = struct { stored_global_this_: ?*JSC.JSGlobalObject = null, poll_ref: JSC.PollRef = .{}, + has_adjusted_pipe_size_on_linux: bool = false, + pub usingnamespace NewReadyWatcher(@This(), .read, ready); pub inline fn globalThis(this: *FileBlobLoader) *JSC.JSGlobalObject { @@ -3358,14 +3407,40 @@ pub const FileBlobLoader = struct { // macOS FIONREAD doesn't seem to work here // but we can get this information from the kqueue callback so we don't need to - if (len == 0) { - const FIONREAD = if (Environment.isLinux) std.os.linux.T.FIONREAD else bun.C.FIONREAD; - const rc: c_int = std.c.ioctl(this.fd, FIONREAD, &len); - if (rc != 0) { - len = 0; + if (comptime Environment.isLinux) { + if (len == 0) { + const FIONREAD = if (Environment.isLinux) std.os.linux.T.FIONREAD else bun.C.FIONREAD; + const rc: c_int = std.c.ioctl(this.fd, FIONREAD, &len); + if (rc != 0) { + len = 0; + } + + // In Linux versions before 2.6.11, the capacity of a + // pipe was the same as the system page size (e.g., 4096 + // bytes on i386). Since Linux 2.6.11, the pipe + // capacity is 16 pages (i.e., 65,536 bytes in a system + // with a page size of 4096 bytes). Since Linux 2.6.35, + // the default pipe capacity is 16 pages, but the + // capacity can be queried and set using the + // fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations. + // See fcntl(2) for more information. + + //:# define F_SETPIPE_SZ 1031 /* Set pipe page size array. + const F_SETPIPE_SZ = 1031; + const F_GETPIPE_SZ = 1032; + + if (!this.has_adjusted_pipe_size_on_linux) { + if (len + 1024 > 16 * std.mem.page_size) { + this.has_adjusted_pipe_size_on_linux = true; + var pipe_len: c_int = 0; + _ = std.c.fcntl(this.fd, F_GETPIPE_SZ, &pipe_len); + if (pipe_len <= 16 * std.mem.page_size) { + _ = std.c.fcntl(this.fd, F_SETPIPE_SZ, 512 * 1024); + } + } + } } } - if (len > read_buf.len * 10 and read_buf.len < std.mem.page_size) { // then we need to allocate a buffer // to read into |