diff options
Diffstat (limited to 'src/bun.js/api')
-rw-r--r-- | src/bun.js/api/bun.zig | 15 | ||||
-rw-r--r-- | src/bun.js/api/bun/spawn.zig | 6 | ||||
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 527 | ||||
-rw-r--r-- | src/bun.js/api/html_rewriter.zig | 3 | ||||
-rw-r--r-- | src/bun.js/api/server.zig | 15 |
5 files changed, 280 insertions, 286 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index 3a88f7a04..ee26b09f5 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -2448,8 +2448,7 @@ pub const Timer = struct { } pub fn deinit(this: *Timeout) void { - if (comptime JSC.is_bindgen) - unreachable; + JSC.markBinding(@src()); var vm = this.globalThis.bunVM(); this.poll_ref.unref(vm); @@ -2465,7 +2464,7 @@ pub const Timer = struct { countdown: JSValue, repeat: bool, ) !void { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); var vm = globalThis.bunVM(); // We don't deal with nesting levels directly @@ -2534,7 +2533,7 @@ pub const Timer = struct { callback: JSValue, countdown: JSValue, ) callconv(.C) JSValue { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); const id = globalThis.bunVM().timer.last_id; globalThis.bunVM().timer.last_id +%= 1; @@ -2548,7 +2547,7 @@ pub const Timer = struct { callback: JSValue, countdown: JSValue, ) callconv(.C) JSValue { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); const id = globalThis.bunVM().timer.last_id; globalThis.bunVM().timer.last_id +%= 1; @@ -2559,7 +2558,7 @@ pub const Timer = struct { } pub fn clearTimer(timer_id: JSValue, _: *JSGlobalObject, repeats: bool) void { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); var map = if (repeats) &VirtualMachine.vm.timer.interval_map else &VirtualMachine.vm.timer.timeout_map; const id: Timeout.ID = .{ @@ -2580,7 +2579,7 @@ pub const Timer = struct { globalThis: *JSGlobalObject, id: JSValue, ) callconv(.C) JSValue { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); Timer.clearTimer(id, globalThis, false); return JSValue.jsUndefined(); } @@ -2588,7 +2587,7 @@ pub const Timer = struct { globalThis: *JSGlobalObject, id: JSValue, ) callconv(.C) JSValue { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); Timer.clearTimer(id, globalThis, true); return JSValue.jsUndefined(); } diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig index d594d44a7..afcc5509b 100644 --- a/src/bun.js/api/bun/spawn.zig +++ b/src/bun.js/api/bun/spawn.zig @@ -57,8 +57,6 @@ pub const PosixSpawn = struct { } else { _ = system.posix_spawnattr_destroy(&self.attr); } - - self.* = undefined; } pub fn get(self: Attr) !u16 { @@ -207,6 +205,10 @@ pub const PosixSpawn = struct { argv, envp, ); + if (comptime bun.Environment.allow_assert) + JSC.Node.Syscall.syslog("posix_spawn({s}) = {d} ({d})", .{ + path, rc, pid, + }); if (comptime bun.Environment.isLinux) { // rc is negative because it's libc errno diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index e09c5db2b..c85e0396f 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -28,7 +28,6 @@ pub const Subprocess = struct { stdin: Writable, stdout: Readable, stderr: Readable, - killed: bool = false, poll_ref: ?*JSC.FilePoll = null, @@ -47,8 +46,22 @@ pub const Subprocess = struct { finalized: bool = false, globalThis: *JSC.JSGlobalObject, - + observable_getters: std.enums.EnumSet(enum { + stdin, + stdout, + stderr, + }) = .{}, has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true), + is_sync: bool = false, + + pub fn hasExited(this: *const Subprocess) bool { + return this.exit_code != null or this.waitpid_err != null; + } + + pub fn updateHasPendingActivityFlag(this: *Subprocess) void { + @fence(.SeqCst); + this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .SeqCst); + } pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool { @fence(.Acquire); @@ -78,7 +91,7 @@ pub const Subprocess = struct { } const Readable = union(enum) { - fd: JSC.Node.FileDescriptor, + fd: bun.FileDescriptor, pipe: Pipe, inherit: void, @@ -102,44 +115,37 @@ pub const Subprocess = struct { return; } - if (this.buffer.fd != JSC.Node.invalid_fd) { - this.buffer.close(); - } + this.buffer.close(); } pub fn toJS(this: *@This(), readable: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue { - if (this.* == .stream) { - if (this.stream.ptr == .File) { - this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable); - } - return this.stream.toJS(); + if (this.* != .stream) { + const stream = this.buffer.toReadableStream(globalThis, exited); + this.* = .{ .stream = stream }; } - const is_fifo = this.buffer.is_fifo; - const stream = this.buffer.toReadableStream(globalThis, exited); - this.* = .{ .stream = stream }; + if (this.stream.ptr == .File) { - this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable); - this.stream.ptr.File.is_fifo = is_fifo; + this.stream.ptr.File.setSignal(JSC.WebCore.Signal.init(readable)); } - return stream.value; + + return this.stream.toJS(); } }; - pub fn init(stdio: Stdio, fd: i32, _: *JSC.JSGlobalObject) Readable { + pub fn init(stdio: Stdio, fd: i32, other_fd: i32, _: *JSC.JSGlobalObject) Readable { return switch (stdio) { .inherit => Readable{ .inherit = {} }, .ignore => Readable{ .ignore = {} }, .pipe => brk: { + _ = JSC.Node.Syscall.close(other_fd); break :brk .{ .pipe = .{ - .buffer = BufferedOutput{ - .fd = fd, - .is_fifo = true, - }, + .buffer = undefined, }, }; }, - .path, .blob, .fd => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }, + .path => Readable{ .ignore = {} }, + .blob, .fd => Readable{ .fd = @intCast(bun.FileDescriptor, fd) }, else => unreachable, }; } @@ -159,7 +165,7 @@ pub const Subprocess = struct { }, .pipe => { if (this.pipe == .stream and this.pipe.stream.ptr == .File) - this.pipe.stream.ptr.File.signal.clear(); + this.pipe.stream.ptr.File.readable().FIFO.signal.clear(); this.pipe.done(); }, else => {}, @@ -190,10 +196,8 @@ pub const Subprocess = struct { .pipe => { defer this.close(); - if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) { - if (this.pipe.buffer.canRead()) - this.pipe.buffer.readIfPossible(true); - } + if (this.pipe.buffer.canRead()) + this.pipe.buffer.readAll(); var bytes = this.pipe.buffer.internal_buffer.slice(); this.pipe.buffer.internal_buffer = .{}; @@ -216,6 +220,7 @@ pub const Subprocess = struct { this: *Subprocess, globalThis: *JSGlobalObject, ) callconv(.C) JSValue { + this.observable_getters.insert(.stderr); return this.stderr.toJS(globalThis, this.exit_code != null); } @@ -223,6 +228,7 @@ pub const Subprocess = struct { this: *Subprocess, globalThis: *JSGlobalObject, ) callconv(.C) JSValue { + this.observable_getters.insert(.stdin); return this.stdin.toJS(globalThis); } @@ -230,6 +236,7 @@ pub const Subprocess = struct { this: *Subprocess, globalThis: *JSGlobalObject, ) callconv(.C) JSValue { + this.observable_getters.insert(.stdout); return this.stdout.toJS(globalThis, this.exit_code != null); } @@ -291,41 +298,22 @@ pub const Subprocess = struct { return .{ .result = {} }; } - pub fn onKill( - this: *Subprocess, - ) void { - if (this.killed) { - return; - } - - this.killed = true; - this.closePorts(); + fn hasCalledGetter(this: *Subprocess, comptime getter: @Type(.EnumLiteral)) bool { + return this.observable_getters.contains(getter); } - pub fn closePorts(this: *Subprocess) void { - const pidfd = this.pidfd; - - if (comptime Environment.isLinux) { - this.pidfd = std.math.maxInt(std.os.fd_t); + fn closeProcess(this: *Subprocess) void { + if (comptime !Environment.isLinux) { + return; } - defer { - if (comptime Environment.isLinux) { - if (pidfd != std.math.maxInt(std.os.fd_t)) { - _ = std.os.close(pidfd); - } - } - } + const pidfd = this.pidfd; - if (this.stdout == .pipe) { - this.stdout.pipe.finish(); - } + this.pidfd = std.math.maxInt(std.os.fd_t); - if (this.stderr == .pipe) { - this.stderr.pipe.finish(); + if (pidfd != std.math.maxInt(std.os.fd_t)) { + _ = std.os.close(pidfd); } - - this.stdin.close(); } pub fn doRef(this: *Subprocess, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSValue { @@ -354,7 +342,7 @@ pub const Subprocess = struct { pub const BufferedInput = struct { remain: []const u8 = "", - fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, + fd: bun.FileDescriptor = bun.invalid_fd, poll_ref: ?*JSC.FilePoll = null, written: usize = 0, @@ -366,6 +354,10 @@ pub const Subprocess = struct { pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .writable, onReady); pub fn onReady(this: *BufferedInput, _: i64) void { + if (this.fd == bun.invalid_fd) { + return; + } + this.write(); } @@ -395,10 +387,14 @@ pub const Subprocess = struct { } } - this.write(); + this.writeAllowBlocking(is_sync); } pub fn write(this: *BufferedInput) void { + this.writeAllowBlocking(false); + } + + pub fn writeAllowBlocking(this: *BufferedInput, allow_blocking: bool) void { var to_write = this.remain; if (to_write.len == 0) { @@ -450,7 +446,7 @@ pub const Subprocess = struct { to_write = to_write[bytes_written..]; // we are done or it accepts no more input - if (this.remain.len == 0 or bytes_written == 0) { + if (this.remain.len == 0 or (allow_blocking and bytes_written == 0)) { this.deinit(); return; } @@ -465,9 +461,9 @@ pub const Subprocess = struct { poll.deinit(); } - if (this.fd != JSC.Node.invalid_fd) { + if (this.fd != bun.invalid_fd) { _ = JSC.Node.Syscall.close(this.fd); - this.fd = JSC.Node.invalid_fd; + this.fd = bun.invalid_fd; } } @@ -487,186 +483,137 @@ pub const Subprocess = struct { pub const BufferedOutput = struct { internal_buffer: bun.ByteList = .{}, - max_internal_buffer: u32 = default_max_buffer_size, - fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, - received_eof: bool = false, - pending_error: ?JSC.Node.Syscall.Error = null, - poll_ref: ?*JSC.FilePoll = null, - is_fifo: bool = false, + fifo: JSC.WebCore.FIFO = undefined, + auto_sizer: JSC.WebCore.AutoSizer = undefined, + status: Status = .{ + .pending = {}, + }, - pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready); + pub const Status = union(enum) { + pending: void, + done: void, + err: JSC.Node.Syscall.Error, + }; - pub fn ready(this: *BufferedOutput, available_to_read: i64) void { - if (comptime Environment.isMac) { - if (this.poll_ref) |poll| { - if (available_to_read > 0) { - poll.flags.insert(.readable); - } else { - poll.flags.remove(.readable); - } - } - } + pub fn init(fd: bun.FileDescriptor) BufferedOutput { + return BufferedOutput{ + .internal_buffer = .{}, + .fifo = JSC.WebCore.FIFO{ + .fd = fd, + }, + }; + } - // TODO: what happens if the task was already enqueued after unwatch()? - this.readAll(false); + pub fn setup(this: *BufferedOutput, allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) void { + this.* = init(fd); + this.auto_sizer = .{ + .max = max_size, + .allocator = allocator, + .buffer = &this.internal_buffer, + }; + this.watch(); } pub fn canRead(this: *BufferedOutput) bool { - const is_readable = bun.isReadable(this.fd); - - if (is_readable) { - if (this.poll_ref) |poll_ref| { - poll_ref.flags.insert(.readable); - poll_ref.flags.insert(.fifo); - std.debug.assert(poll_ref.flags.contains(.poll_readable)); - } - } - - return is_readable; + return bun.isReadable(this.fifo.fd); } - pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void { - if (comptime !force) { - // we ask, "Is it possible to read right now?" - // we do this rather than epoll or kqueue() - // because we don't want to block the thread waiting for the read - // and because kqueue or epoll might return other unrelated events - // and we don't want this to become an event loop ticking point - if (!this.canRead()) { - this.watch(this.fd); + pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void { + switch (result) { + .pending => { + this.watch(); return; - } - } - - this.readAll(force); - } - - pub fn closeOnEOF(this: *BufferedOutput) bool { - var poll = this.poll_ref orelse return true; - poll.flags.insert(.eof); - return false; - } - - pub fn readAll(this: *BufferedOutput, comptime force: bool) void { - if (this.poll_ref) |poll| { - const is_readable = poll.isReadable(); - if (!is_readable and poll.isEOF()) { - if (poll.isHUP()) { - this.autoCloseFileDescriptor(); - } + }, + .err => |err| { + this.status = .{ .err = err }; + this.fifo.close(); return; - } else if (!is_readable and poll.isHUP()) { - this.autoCloseFileDescriptor(); - return; - } else if (!is_readable) { + }, + .done => { + this.status = .{ .done = {} }; + this.fifo.close(); return; - } + }, + else => { + const slice = result.slice(); + this.internal_buffer.len += @truncate(u32, slice.len); + if (slice.len > 0) + std.debug.assert(this.internal_buffer.contains(slice)); + + if (result.isDone()) { + this.status = .{ .done = {} }; + this.fifo.close(); + } + }, } + } - // read as much as we can from the pipe - while (this.internal_buffer.len < this.max_internal_buffer) { - var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined; - - var buf: []u8 = buffer_[0..]; - - var available = this.internal_buffer.ptr[this.internal_buffer.len..this.internal_buffer.cap]; - if (available.len >= buf.len) { - buf = available; + pub fn readAll(this: *BufferedOutput) void { + while (@as(usize, this.internal_buffer.len) < this.auto_sizer.max and this.status == .pending) { + var stack_buffer: [8096]u8 = undefined; + var stack_buf: []u8 = stack_buffer[0..]; + var buf_to_use = stack_buf; + var available = this.internal_buffer.available(); + if (available.len >= stack_buf.len) { + buf_to_use = available; } - switch (JSC.Node.Syscall.read(this.fd, buf)) { - .err => |e| { - if (e.isRetry()) { - if (!this.isWatching() and this.isFIFO()) - this.watch(this.fd); - this.poll_ref.?.flags.insert(.fifo); - return; - } - - if (comptime Environment.isMac) { - // INTR is returned on macOS when the process is killed - // It probably sent SIGPIPE but we have the handler for - // that disabled. - // We know it's the "real" INTR because we use read$NOCANCEL - if (e.getErrno() == .INTR) { - this.received_eof = true; - this.autoCloseFileDescriptor(); - return; - } - } else { - if (comptime Environment.allow_assert) { - std.debug.assert(e.getErrno() != .INTR); // Bun's read() function should retry on EINTR - } - } + const result = this.fifo.read(buf_to_use, this.fifo.to_read); - // fail - log("readAll() fail: {s}", .{@tagName(e.getErrno())}); - this.pending_error = e; - this.internal_buffer.listManaged(bun.default_allocator).deinit(); - this.internal_buffer = .{}; + switch (result) { + .pending => { + this.watch(); return; }, + .err => |err| { + this.status = .{ .err = err }; + this.fifo.close(); - .result => |bytes_read| { - log("readAll() {d}", .{bytes_read}); - - if (bytes_read > 0) { - if (buf.ptr == available.ptr) { - this.internal_buffer.len += @truncate(u32, bytes_read); - } else { - _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory"); - } - } - - if (comptime !force) { - if (buf[bytes_read..].len > 0 or !this.canRead()) { - if (!this.isWatching()) - this.watch(this.fd); - if (this.is_fifo) - this.poll_ref.?.flags.insert(.fifo) - else - this.received_eof = true; - return; - } + return; + }, + .done => { + this.status = .{ .done = {} }; + this.fifo.close(); + return; + }, + .read => |slice| { + if (slice.ptr == stack_buf.ptr) { + this.internal_buffer.append(this.auto_sizer.allocator, slice) catch @panic("out of memory"); } else { - // we consider a short read as being EOF - this.received_eof = !this.is_fifo and this.received_eof or bytes_read < buf.len; - if (this.received_eof) { - if (this.closeOnEOF()) { - this.autoCloseFileDescriptor(); - } + this.internal_buffer.len += @truncate(u32, slice.len); + } - // do not auto-close the file descriptor here - // it's totally legit to have a short read - return; - } + if (slice.len < buf_to_use.len) { + this.watch(); + return; } }, } } } + fn watch(this: *BufferedOutput) void { + this.fifo.pending.set(BufferedOutput, this, onRead); + if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd); + return; + } + pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob { const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis); this.internal_buffer = bun.ByteList.init(""); - std.debug.assert(this.fd == JSC.Node.invalid_fd); - std.debug.assert(this.received_eof); return blob; } pub fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream { if (exited) { // exited + received EOF => no more read() - if (this.received_eof) { - var poll_ref = this.poll_ref; - this.poll_ref = null; - - this.autoCloseFileDescriptor(); - + if (this.fifo.isClosed()) { // also no data at all if (this.internal_buffer.len == 0) { - this.close(); + if (this.internal_buffer.cap > 0) { + this.internal_buffer.deinitWithAllocator(this.auto_sizer.allocator); + } // so we return an empty stream return JSC.WebCore.ReadableStream.fromJS( JSC.WebCore.ReadableStream.empty(globalThis), @@ -675,70 +622,52 @@ pub const Subprocess = struct { } return JSC.WebCore.ReadableStream.fromJS( - JSC.WebCore.ReadableStream.fromBlobWithPoll( + JSC.WebCore.ReadableStream.fromBlob( globalThis, &this.toBlob(globalThis), 0, - poll_ref, ), globalThis, ).?; + } else { + this.fifo.close_on_empty_read = true; } } - std.debug.assert(this.fd != JSC.Node.invalid_fd); { - var poll_ref = this.poll_ref; - this.poll_ref = null; + const internal_buffer = this.internal_buffer; + this.internal_buffer = bun.ByteList.init(""); // There could still be data waiting to be read in the pipe // so we need to create a new stream that will read from the // pipe and then return the blob. - var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis); const result = JSC.WebCore.ReadableStream.fromJS( - JSC.WebCore.ReadableStream.fromBlobWithPoll( + JSC.WebCore.ReadableStream.fromFIFO( globalThis, - &blob, - 0, - poll_ref, + &this.fifo, + internal_buffer, ), globalThis, ).?; - blob.detach(); - result.ptr.File.buffered_data = this.internal_buffer; - result.ptr.File.stored_global_this_ = globalThis; - result.ptr.File.finished = exited; - this.internal_buffer = bun.ByteList.init(""); - this.fd = JSC.Node.invalid_fd; - this.received_eof = false; return result; } } - pub fn autoCloseFileDescriptor(this: *BufferedOutput) void { - const fd = this.fd; - if (fd == JSC.Node.invalid_fd) - return; - this.fd = JSC.Node.invalid_fd; - - if (this.poll_ref) |poll| { - this.poll_ref = null; - poll.deinit(); - } - - _ = JSC.Node.Syscall.close(fd); - } - pub fn close(this: *BufferedOutput) void { - this.autoCloseFileDescriptor(); + switch (this.status) { + .done => {}, + .pending => { + this.fifo.close(); + this.status = .{ .done = {} }; + }, + .err => {}, + } if (this.internal_buffer.cap > 0) { this.internal_buffer.listManaged(bun.default_allocator).deinit(); this.internal_buffer = .{}; } - - this.received_eof = true; } }; @@ -748,7 +677,7 @@ pub const Subprocess = struct { pipe: *JSC.WebCore.FileSink, readable_stream: JSC.WebCore.ReadableStream, }, - fd: JSC.Node.FileDescriptor, + fd: bun.FileDescriptor, buffered_input: BufferedInput, inherit: void, ignore: void, @@ -763,7 +692,7 @@ pub const Subprocess = struct { pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {} pub fn onStart(_: *Writable) void {} - pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { + pub fn init(stdio: Stdio, fd: i32, other_fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { switch (stdio) { .pipe => { var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink); @@ -771,7 +700,9 @@ pub const Subprocess = struct { .fd = fd, .buffer = bun.ByteList.init(&.{}), .allocator = globalThis.bunVM().allocator, + .auto_close = true, }; + if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd); sink.mode = std.os.S.IFIFO; if (stdio == .pipe) { if (stdio.pipe) |readable| { @@ -787,6 +718,7 @@ pub const Subprocess = struct { return Writable{ .pipe = sink }; }, .array_buffer, .blob => { + if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd); var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined }; switch (stdio) { .array_buffer => |array_buffer| { @@ -800,7 +732,7 @@ pub const Subprocess = struct { return Writable{ .buffered_input = buffered_input }; }, .fd => { - return Writable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }; + return Writable{ .fd = @intCast(bun.FileDescriptor, fd) }; }, .inherit => { return Writable{ .inherit = {} }; @@ -842,15 +774,21 @@ pub const Subprocess = struct { } }; - pub fn finalize(this: *Subprocess) callconv(.C) void { - this.unref(); - this.closePorts(); - this.stdout.close(); + pub fn finalizeSync(this: *Subprocess) void { + this.closeProcess(); + this.stdin.close(); this.stderr.close(); + this.stdin.close(); - this.finalized = true; - bun.default_allocator.destroy(this); + this.exit_promise.deinit(); + this.on_exit_callback.deinit(); + } + + pub fn finalize(this: *Subprocess) callconv(.C) void { + std.debug.assert(!this.hasPendingActivity()); + this.finalizeSync(); log("Finalize", .{}); + bun.default_allocator.destroy(this); } pub fn getExited( @@ -1203,18 +1141,27 @@ pub const Subprocess = struct { .globalThis = globalThis, .pid = pid, .pidfd = pidfd, - .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch { + .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], stdin_pipe[0], globalThis) catch { globalThis.throw("out of memory", .{}); return .zero; }, - .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis), - .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis), + .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], stdout_pipe[1], globalThis), + .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], stderr_pipe[1], globalThis), .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, + .is_sync = is_sync, }; if (subprocess.stdin == .pipe) { subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin); } + if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) { + subprocess.stdout.pipe.buffer.setup(jsc_vm.allocator, stdout_pipe[0], default_max_buffer_size); + } + + if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) { + subprocess.stderr.pipe.buffer.setup(jsc_vm.allocator, stderr_pipe[0], default_max_buffer_size); + } + const out = if (comptime !is_sync) subprocess.toJS(globalThis) else @@ -1223,7 +1170,7 @@ pub const Subprocess = struct { if (comptime !is_sync) { var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess); subprocess.poll_ref = poll; - switch (poll.register( + switch (subprocess.poll_ref.?.register( jsc_vm.uws_event_loop.?, .process, true, @@ -1252,20 +1199,20 @@ pub const Subprocess = struct { if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) { if (comptime is_sync) { if (subprocess.stdout.pipe.buffer.canRead()) { - subprocess.stdout.pipe.buffer.readAll(true); + subprocess.stdout.pipe.buffer.readAll(); } } else if (!lazy) { - subprocess.stdout.pipe.buffer.readIfPossible(false); + subprocess.stdout.pipe.buffer.readAll(); } } if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) { if (comptime is_sync) { if (subprocess.stderr.pipe.buffer.canRead()) { - subprocess.stderr.pipe.buffer.readAll(true); + subprocess.stderr.pipe.buffer.readAll(); } } else if (!lazy) { - subprocess.stderr.pipe.buffer.readIfPossible(false); + subprocess.stderr.pipe.buffer.readAll(); } } @@ -1273,11 +1220,43 @@ pub const Subprocess = struct { return out; } - subprocess.wait(true); + if (subprocess.stdin == .buffered_input) { + while (subprocess.stdin.buffered_input.remain.len > 0) { + subprocess.stdin.buffered_input.writeIfPossible(true); + } + } + + { + var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess); + subprocess.poll_ref = poll; + switch (subprocess.poll_ref.?.register( + jsc_vm.uws_event_loop.?, + .process, + true, + )) { + .result => {}, + .err => |err| { + if (err.getErrno() == .SRCH) { + @panic("This shouldn't happen"); + } + + // process has already exited + // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007 + subprocess.onExitNotification(); + }, + } + } + + while (!subprocess.hasExited()) { + jsc_vm.tick(); + jsc_vm.eventLoop().autoTick(); + } + + // subprocess.wait(true); const exitCode = subprocess.exit_code orelse 1; const stdout = subprocess.stdout.toBufferedValue(globalThis); const stderr = subprocess.stderr.toBufferedValue(globalThis); - subprocess.finalize(); + subprocess.finalizeSync(); const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4); sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@intCast(i32, exitCode))); @@ -1290,16 +1269,22 @@ pub const Subprocess = struct { pub fn onExitNotification( this: *Subprocess, ) void { - this.wait(false); + this.wait(this.is_sync); } pub fn wait(this: *Subprocess, sync: bool) void { if (this.has_waitpid_task) { return; } - + defer this.updateHasPendingActivityFlag(); this.has_waitpid_task = true; const pid = this.pid; + + if (!sync) { + // signal to the other end we are definitely done + this.stdin.close(); + } + switch (PosixSpawn.waitpid(pid, 0)) { .err => |err| { this.waitpid_err = err; @@ -1331,14 +1316,16 @@ pub const Subprocess = struct { poll.deinitWithVM(vm); } - this.onExit(); + this.onExit(this.globalThis); } } - fn onExit(this: *Subprocess) void { - defer this.updateHasPendingActivity(); - this.closePorts(); + fn onExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject) void { + this.stdin.close(); + this.stdout.close(); + this.stderr.close(); + defer this.updateHasPendingActivity(); this.has_waitpid_task = false; if (this.on_exit_callback.trySwap()) |callback| { @@ -1349,7 +1336,7 @@ pub const Subprocess = struct { const waitpid_value: JSValue = if (this.waitpid_err) |err| - err.toJSC(this.globalThis) + err.toJSC(globalThis) else JSC.JSValue.jsUndefined(); @@ -1359,21 +1346,21 @@ pub const Subprocess = struct { }; const result = callback.call( - this.globalThis, + globalThis, args[0 .. @as(usize, @boolToInt(this.exit_code != null)) + @as(usize, @boolToInt(this.waitpid_err != null))], ); - if (result.isAnyError(this.globalThis)) { - this.globalThis.bunVM().onUnhandledError(this.globalThis, result); + if (result.isAnyError(globalThis)) { + globalThis.bunVM().onUnhandledError(globalThis, result); } } if (this.exit_promise.trySwap()) |promise| { if (this.exit_code) |code| { - promise.asPromise().?.resolve(this.globalThis, JSValue.jsNumber(code)); + promise.asPromise().?.resolve(globalThis, JSValue.jsNumber(code)); } else if (this.waitpid_err) |err| { this.waitpid_err = null; - promise.asPromise().?.reject(this.globalThis, err.toJSC(this.globalThis)); + promise.asPromise().?.reject(globalThis, err.toJSC(globalThis)); } else { // crash in debug mode if (comptime Environment.allow_assert) @@ -1395,7 +1382,7 @@ pub const Subprocess = struct { const Stdio = union(enum) { inherit: void, ignore: void, - fd: JSC.Node.FileDescriptor, + fd: bun.FileDescriptor, path: JSC.Node.PathLike, blob: JSC.WebCore.AnyBlob, pipe: ?JSC.WebCore.ReadableStream, @@ -1454,7 +1441,7 @@ pub const Subprocess = struct { if (blob.needsToReadFile()) { if (blob.store()) |store| { if (store.data.file.pathlike == .fd) { - if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) { + if (store.data.file.pathlike.fd == @intCast(bun.FileDescriptor, i)) { stdio_array[i] = Stdio{ .inherit = {} }; } else { switch (@intCast(std.os.fd_t, i)) { @@ -1520,7 +1507,7 @@ pub const Subprocess = struct { return false; } - const fd = @intCast(JSC.Node.FileDescriptor, fd_); + const fd = @intCast(bun.FileDescriptor, fd_); switch (@intCast(std.os.fd_t, i)) { std.os.STDIN_FILENO => { diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig index 91248dc5c..e99e49f07 100644 --- a/src/bun.js/api/html_rewriter.zig +++ b/src/bun.js/api/html_rewriter.zig @@ -817,8 +817,7 @@ fn HandlerCallback( ) (fn (*HandlerType, *LOLHTMLType) bool) { return struct { pub fn callback(this: *HandlerType, value: *LOLHTMLType) bool { - if (comptime JSC.is_bindgen) - unreachable; + JSC.markBinding(@src()); var zig_element = bun.default_allocator.create(ZigType) catch unreachable; @field(zig_element, field_name) = value; // At the end of this scope, the value is no longer valid diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index c4724b1b8..531d4830b 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1428,10 +1428,17 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp stream.value.ensureStillAlive(); - if (!stream.isLocked(this.server.globalThis)) { - streamLog("is not locked", .{}); - this.renderMissing(); - return; + const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and + response_stream.sink.buffer.len == 0); + + if (!stream.isLocked(this.server.globalThis) and !is_in_progress) { + if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| { + if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) { + streamLog("is not locked", .{}); + this.renderMissing(); + return; + } + } } this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); |