diff options
Diffstat (limited to 'src/bun.js/api/bun.zig')
-rw-r--r-- | src/bun.js/api/bun.zig | 331 |
1 files changed, 264 insertions, 67 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index 1171d804e..e1df32bc3 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -3514,6 +3514,7 @@ pub const JSZlib = struct { }; pub const Subprocess = struct { + const log = Output.scoped(.Subprocess, true); pub usingnamespace JSC.Codegen.JSSubprocess; pid: std.os.pid_t, @@ -3569,7 +3570,7 @@ pub const Subprocess = struct { ignore: void, closed: void, - pub fn init(stdio: std.meta.Tag(Stdio), fd: i32, globalThis: *JSC.JSGlobalObject) Readable { + pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) Readable { return switch (stdio) { .inherit => Readable{ .inherit = {} }, .ignore => Readable{ .ignore = {} }, @@ -3582,7 +3583,8 @@ pub const Subprocess = struct { out.ptr.File.stored_global_this_ = globalThis; break :brk Readable{ .pipe = out }; }, - .callback, .fd, .path, .blob => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }, + .path, .blob, .fd => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }, + else => unreachable, }; } @@ -3653,32 +3655,43 @@ pub const Subprocess = struct { return JSValue.jsUndefined(); } + switch (this.tryKill(sig)) { + .result => {}, + .err => |err| { + globalThis.throwValue(err.toJSC(globalThis)); + return JSValue.jsUndefined(); + }, + } + + return JSValue.jsUndefined(); + } + + pub fn tryKill(this: *Subprocess, sig: i32) JSC.Node.Maybe(void) { if (this.killed) { - return JSValue.jsUndefined(); + return .{ .result = {} }; } if (comptime Environment.isLinux) { // should this be handled differently? // this effectively shouldn't happen if (this.pidfd == std.math.maxInt(std.os.fd_t)) { - return JSValue.jsUndefined(); + return .{ .result = {} }; } // first appeared in Linux 5.1 const rc = std.os.linux.pidfd_send_signal(this.pidfd, @intCast(u8, sig), null, 0); if (rc != 0) { - globalThis.throwValue(JSC.Node.Syscall.Error.fromCode(std.os.linux.getErrno(rc), .kill).toJSC(globalThis)); - return JSValue.jsUndefined(); + return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.linux.getErrno(rc), .kill) }; } } else { const err = std.c.kill(this.pid, sig); if (err != 0) { - return JSC.Node.Syscall.Error.fromCode(std.c.getErrno(err), .kill).toJSC(globalThis); + return .{ .err = JSC.Node.Syscall.Error.fromCode(std.c.getErrno(err), .kill) }; } } - return JSValue.jsUndefined(); + return .{ .result = {} }; } pub fn onKill( @@ -3701,11 +3714,13 @@ pub const Subprocess = struct { } if (this.stdout == .pipe) { - this.stdout.pipe.cancel(this.globalThis); + if (this.stdout.pipe.isDisturbed(this.globalThis)) + this.stdout.pipe.cancel(this.globalThis); } if (this.stderr == .pipe) { - this.stderr.pipe.cancel(this.globalThis); + if (this.stderr.pipe.isDisturbed(this.globalThis)) + this.stderr.pipe.cancel(this.globalThis); } this.stdin.close(); @@ -3737,15 +3752,112 @@ pub const Subprocess = struct { return JSValue.jsBoolean(this.killed); } + pub const BufferedInput = struct { + remain: []const u8 = "", + fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), + poll_ref: JSC.PollRef = .{}, + written: usize = 0, + + source: union(enum) { + blob: JSC.WebCore.AnyBlob, + array_buffer: JSC.ArrayBuffer.Strong, + }, + + pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady); + + pub fn onReady(this: *BufferedInput, size_or_offset: i64) void { + this.write(@intCast(usize, @maximum(size_or_offset, 0))); + } + + pub fn write(this: *BufferedInput, _: usize) void { + var to_write = this.remain; + + if (to_write.len == 0) { + if (this.poll_ref.isActive()) this.unwatch(this.fd); + // we are done! + this.closeFDIfOpen(); + return; + } + + while (to_write.len > 0) { + switch (JSC.Node.Syscall.write(this.fd, to_write)) { + .err => |e| { + if (e.isRetry()) { + log("write({d}) retry", .{ + to_write.len, + }); + + this.watch(this.fd); + return; + } + + // fail + log("write({d}) fail: {d}", .{ to_write.len, e.errno }); + this.deinit(); + return; + }, + + .result => |bytes_written| { + this.written += bytes_written; + + log( + "write({d}) {d}", + .{ + to_write.len, + bytes_written, + }, + ); + + this.remain = this.remain[@minimum(bytes_written, this.remain.len)..]; + to_write = to_write[bytes_written..]; + + // we are done or it accepts no more input + if (this.remain.len == 0 or bytes_written == 0) { + this.deinit(); + return; + } + }, + } + } + } + + fn closeFDIfOpen(this: *BufferedInput) void { + if (this.poll_ref.isActive()) this.unwatch(this.fd); + + if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + _ = JSC.Node.Syscall.close(this.fd); + this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + } + } + + pub fn deinit(this: *BufferedInput) void { + this.closeFDIfOpen(); + + switch (this.source) { + .blob => |*blob| { + blob.detach(); + }, + .array_buffer => |*array_buffer| { + array_buffer.deinit(); + }, + } + } + }; + const Writable = union(enum) { pipe: *JSC.WebCore.FileSink, + pipe_to_readable_stream: struct { + pipe: *JSC.WebCore.FileSink, + readable_stream: JSC.WebCore.ReadableStream, + }, fd: JSC.Node.FileDescriptor, + buffered_input: BufferedInput, inherit: void, ignore: void, - pub fn init(stdio: std.meta.Tag(Stdio), fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { + pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { switch (stdio) { - .path, .pipe, .callback => { + .path, .pipe => { var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink); sink.* = .{ .fd = fd, @@ -3753,9 +3865,33 @@ pub const Subprocess = struct { .allocator = globalThis.bunVM().allocator, }; + if (stdio == .pipe) { + if (stdio.pipe) |readable| { + return Writable{ + .pipe_to_readable_stream = .{ + .pipe = sink, + .readable_stream = readable, + }, + }; + } + } + return Writable{ .pipe = sink }; }, - .blob, .fd => { + .array_buffer, .blob => { + var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined }; + switch (stdio) { + .array_buffer => |array_buffer| { + buffered_input.source = .{ .array_buffer = array_buffer }; + }, + .blob => |blob| { + buffered_input.source = .{ .blob = blob }; + }, + else => unreachable, + } + return Writable{ .buffered_input = buffered_input }; + }, + .fd => { return Writable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }; }, .inherit => { @@ -3773,6 +3909,8 @@ pub const Subprocess = struct { .fd => |fd| JSValue.jsNumber(fd), .ignore => JSValue.jsUndefined(), .inherit => JSValue.jsUndefined(), + .buffered_input => JSValue.jsUndefined(), + .pipe_to_readable_stream => this.pipe_to_readable_stream.readable_stream.value, }; } @@ -3781,9 +3919,15 @@ pub const Subprocess = struct { .pipe => |pipe| { _ = pipe.end(null); }, + .pipe_to_readable_stream => |*pipe_to_readable_stream| { + _ = pipe_to_readable_stream.pipe.end(null); + }, .fd => |fd| { _ = JSC.Node.Syscall.close(fd); }, + .buffered_input => { + this.buffered_input.deinit(); + }, .ignore => {}, .inherit => {}, }; @@ -3841,7 +3985,7 @@ pub const Subprocess = struct { var stdio = [3]Stdio{ .{ .ignore = .{} }, .{ .inherit = .{} }, - .{ .pipe = .{} }, + .{ .pipe = null }, }; var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse ""; @@ -4007,19 +4151,19 @@ pub const Subprocess = struct { -1, ); - const stdin_pipe = if (stdio[0].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| { + const stdin_pipe = if (stdio[0].isPiped()) os.pipe2(0) catch |err| { globalThis.throw("failed to create stdin pipe: {s}", .{err}); return JSValue.jsUndefined(); } else undefined; errdefer if (stdio[0].isPiped()) destroyPipe(stdin_pipe); - const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| { + const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(0) catch |err| { globalThis.throw("failed to create stdout pipe: {s}", .{err}); return JSValue.jsUndefined(); } else undefined; errdefer if (stdio[1].isPiped()) destroyPipe(stdout_pipe); - const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| { + const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(0) catch |err| { globalThis.throw("failed to create stderr pipe: {s}", .{err}); return JSValue.jsUndefined(); } else undefined; @@ -4096,6 +4240,10 @@ pub const Subprocess = struct { } }; + // set non-blocking stdin + if (stdio[0].isPiped()) + _ = std.os.fcntl(stdin_pipe[1], std.os.F.SETFL, std.os.O.NONBLOCK) catch 0; + var subprocess = globalThis.allocator().create(Subprocess) catch { globalThis.throw("out of memory", .{}); return JSValue.jsUndefined(); @@ -4105,12 +4253,12 @@ pub const Subprocess = struct { .globalThis = globalThis, .pid = pid, .pidfd = pidfd, - .stdin = Writable.init(std.meta.activeTag(stdio[std.os.STDIN_FILENO]), stdin_pipe[1], globalThis) catch { + .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch { globalThis.throw("out of memory", .{}); return JSValue.jsUndefined(); }, - .stdout = Readable.init(std.meta.activeTag(stdio[std.os.STDOUT_FILENO]), stdout_pipe[0], globalThis), - .stderr = Readable.init(std.meta.activeTag(stdio[std.os.STDERR_FILENO]), stderr_pipe[0], globalThis), + .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis), + .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis), }; subprocess.this_jsvalue = subprocess.toJS(globalThis); @@ -4134,6 +4282,14 @@ pub const Subprocess = struct { }, } + if (subprocess.stdin == .buffered_input) { + subprocess.stdin.buffered_input.remain = switch (subprocess.stdin.buffered_input.source) { + .blob => subprocess.stdin.buffered_input.source.blob.slice(), + .array_buffer => |array_buffer| array_buffer.slice(), + }; + subprocess.stdin.buffered_input.write(0); + } + return subprocess.this_jsvalue; } @@ -4206,13 +4362,13 @@ pub const Subprocess = struct { ignore: void, fd: JSC.Node.FileDescriptor, path: JSC.Node.PathLike, - blob: JSC.WebCore.Blob, - pipe: void, - callback: JSC.JSValue, + blob: JSC.WebCore.AnyBlob, + pipe: ?JSC.WebCore.ReadableStream, + array_buffer: JSC.ArrayBuffer.Strong, pub fn isPiped(self: Stdio) bool { return switch (self) { - .blob, .callback, .pipe => true, + .array_buffer, .blob, .pipe => true, else => false, }; } @@ -4225,8 +4381,10 @@ pub const Subprocess = struct { _: i32, ) !void { switch (stdio) { - .blob, .callback, .pipe => { + .array_buffer, .blob, .pipe => { + std.debug.assert(!(stdio == .blob and stdio.blob.needsToReadFile())); const idx: usize = if (std_fileno == 0) 0 else 1; + try actions.dup2(pipe_fd[idx], std_fileno); try actions.close(pipe_fd[1 - idx]); }, @@ -4252,6 +4410,50 @@ pub const Subprocess = struct { } }; + fn extractStdioBlob( + globalThis: *JSC.JSGlobalObject, + blob: JSC.WebCore.AnyBlob, + i: usize, + stdio_array: []Stdio, + ) bool { + if (blob.needsToReadFile()) { + if (blob.store()) |store| { + if (store.data.file.pathlike == .fd) { + if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) { + stdio_array[i] = Stdio{ .inherit = {} }; + } else { + switch (@intCast(std.os.fd_t, i)) { + std.os.STDIN_FILENO => { + if (i == std.os.STDERR_FILENO or i == std.os.STDOUT_FILENO) { + globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); + return false; + } + }, + + std.os.STDOUT_FILENO, std.os.STDERR_FILENO => { + if (i == std.os.STDIN_FILENO) { + globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); + return false; + } + }, + else => {}, + } + + stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd }; + } + + return true; + } + + stdio_array[i] = .{ .path = store.data.file.pathlike.path }; + return true; + } + } + + stdio_array[i] = .{ .blob = blob }; + return true; + } + fn extractStdio( globalThis: *JSC.JSGlobalObject, i: usize, @@ -4269,7 +4471,7 @@ pub const Subprocess = struct { } else if (str.eqlComptime("ignore")) { stdio_array[i] = Stdio{ .ignore = {} }; } else if (str.eqlComptime("pipe")) { - stdio_array[i] = Stdio{ .pipe = {} }; + stdio_array[i] = Stdio{ .pipe = null }; } else { globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{}); return false; @@ -4306,53 +4508,48 @@ pub const Subprocess = struct { return true; } else if (value.as(JSC.WebCore.Blob)) |blob| { - var store = blob.store orelse { - globalThis.throwInvalidArguments("Blob is detached (in stdio)", .{}); - return false; - }; - - if (i == std.os.STDIN_FILENO and store.data == .bytes) { - stdio_array[i] = .{ .blob = blob.dupe() }; - return true; - } - - if (store.data != .file) { - globalThis.throwInvalidArguments("Blob is not a file (in stdio)", .{}); - return false; - } - - if (store.data.file.pathlike == .fd) { - if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) { - stdio_array[i] = Stdio{ .inherit = {} }; - } else { - switch (@intCast(std.os.fd_t, i)) { - std.os.STDIN_FILENO => { - if (i == std.os.STDERR_FILENO or i == std.os.STDOUT_FILENO) { - globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); - return false; - } - }, + return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, stdio_array); + } else if (value.as(JSC.WebCore.Request)) |req| { + req.getBodyValue().toBlobIfPossible(); + return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array); + } else if (value.as(JSC.WebCore.Response)) |req| { + req.getBodyValue().toBlobIfPossible(); + return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array); + } else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |*req| { + if (i == std.os.STDIN_FILENO) { + if (req.toAnyBlob(globalThis)) |blob| { + return extractStdioBlob(globalThis, blob, i, stdio_array); + } - std.os.STDOUT_FILENO, std.os.STDERR_FILENO => { - if (i == std.os.STDIN_FILENO) { - globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); - return false; - } - }, - else => {}, - } + switch (req.ptr) { + .File, .Blob => unreachable, + .Direct, .JavaScript, .Bytes => { + if (req.isLocked(globalThis)) { + globalThis.throwInvalidArguments("ReadableStream cannot be locked", .{}); + return false; + } - stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd }; + stdio_array[i] = .{ .pipe = req.* }; + return true; + }, + else => {}, } - return true; + globalThis.throwInvalidArguments("Unsupported ReadableStream type", .{}); + return false; + } + } else if (value.asArrayBuffer(globalThis)) |array_buffer| { + if (array_buffer.slice().len == 0) { + globalThis.throwInvalidArguments("ArrayBuffer cannot be empty", .{}); + return false; } - stdio_array[i] = .{ .path = store.data.file.pathlike.path }; - return true; - } else if (value.isCallable(globalThis.vm())) { - stdio_array[i] = .{ .callback = value }; - value.ensureStillAlive(); + stdio_array[i] = .{ + .array_buffer = JSC.ArrayBuffer.Strong{ + .array_buffer = array_buffer, + .held = JSC.Strong.create(array_buffer.value, globalThis), + }, + }; return true; } |