diff options
Diffstat (limited to 'src/bun.js/api')
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 167 |
1 files changed, 142 insertions, 25 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 14febbd00..d803704f3 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -51,6 +51,11 @@ pub const Subprocess = struct { stdout, stderr, }) = .{}, + closed: std.enums.EnumSet(enum { + stdin, + stdout, + stderr, + }) = .{}, has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true), is_sync: bool = false, @@ -76,11 +81,33 @@ pub const Subprocess = struct { pub fn ref(this: *Subprocess) void { var vm = this.globalThis.bunVM(); if (this.poll_ref) |poll| poll.enableKeepingProcessAlive(vm); + if (!this.hasCalledGetter(.stdin)) { + this.stdin.ref(); + } + + if (!this.hasCalledGetter(.stdout)) { + this.stdout.ref(); + } + + if (!this.hasCalledGetter(.stderr)) { + this.stdout.ref(); + } } pub fn unref(this: *Subprocess) void { var vm = this.globalThis.bunVM(); if (this.poll_ref) |poll| poll.disableKeepingProcessAlive(vm); + if (!this.hasCalledGetter(.stdin)) { + this.stdin.unref(); + } + + if (!this.hasCalledGetter(.stdout)) { + this.stdout.unref(); + } + + if (!this.hasCalledGetter(.stderr)) { + this.stdout.unref(); + } } pub fn constructor( @@ -98,6 +125,32 @@ pub const Subprocess = struct { ignore: void, closed: void, + pub fn ref(this: *Readable) void { + switch (this.*) { + .pipe => { + if (this.pipe == .buffer) { + if (this.pipe.buffer.fifo.poll_ref) |poll| { + poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm); + } + } + }, + else => {}, + } + } + + pub fn unref(this: *Readable) void { + switch (this.*) { + .pipe => { + if (this.pipe == .buffer) { + if (this.pipe.buffer.fifo.poll_ref) |poll| { + poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm); + } + } + }, + else => {}, + } + } + pub const Pipe = union(enum) { stream: JSC.WebCore.ReadableStream, buffer: BufferedOutput, @@ -167,8 +220,23 @@ pub const Subprocess = struct { }, else => {}, } + } - this.* = .closed; + pub fn finalize(this: *Readable) void { + switch (this.*) { + .fd => |fd| { + _ = JSC.Node.Syscall.close(fd); + }, + .pipe => { + if (this.pipe == .stream and this.pipe.stream.ptr == .File) { + this.close(); + return; + } + + this.pipe.buffer.close(); + }, + else => {}, + } } pub fn toJS(this: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue { @@ -191,10 +259,8 @@ pub const Subprocess = struct { return JSValue.jsNumber(fd); }, .pipe => { - defer this.close(); - - if (this.pipe.buffer.canRead()) - this.pipe.buffer.readAll(); + this.pipe.buffer.fifo.close_on_empty_read = true; + this.pipe.buffer.readAll(); var bytes = this.pipe.buffer.internal_buffer.slice(); this.pipe.buffer.internal_buffer = .{}; @@ -519,11 +585,6 @@ pub const Subprocess = struct { .allocator = allocator, .buffer = &this.internal_buffer, }; - this.watch(); - } - - pub fn canRead(this: *BufferedOutput) bool { - return bun.isReadable(this.fifo.fd) == .ready; } pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void { @@ -549,7 +610,7 @@ pub const Subprocess = struct { if (slice.len > 0) std.debug.assert(this.internal_buffer.contains(slice)); - if (result.isDone()) { + if (result.isDone() or (slice.len == 0 and this.fifo.poll_ref != null and this.fifo.poll_ref.?.isHUP())) { this.status = .{ .done = {} }; this.fifo.close(); } @@ -602,6 +663,8 @@ pub const Subprocess = struct { } fn watch(this: *BufferedOutput) void { + std.debug.assert(this.fifo.fd != bun.invalid_fd); + this.fifo.pending.set(BufferedOutput, this, onRead); if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd); return; @@ -637,8 +700,6 @@ pub const Subprocess = struct { ), globalThis, ).?; - } else { - this.fifo.close_on_empty_read = true; } } @@ -657,7 +718,8 @@ pub const Subprocess = struct { ), globalThis, ).?; - + this.fifo.fd = bun.invalid_fd; + this.fifo.poll_ref = null; return result; } } @@ -690,6 +752,28 @@ pub const Subprocess = struct { inherit: void, ignore: void, + pub fn ref(this: *Writable) void { + switch (this.*) { + .pipe => { + if (this.pipe.poll_ref) |poll| { + poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm); + } + }, + else => {}, + } + } + + pub fn unref(this: *Writable) void { + switch (this.*) { + .pipe => { + if (this.pipe.poll_ref) |poll| { + poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm); + } + }, + else => {}, + } + } + // When the stream has closed we need to be notified to prevent a use-after-free // We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice pub fn onClose(this: *Writable, _: ?JSC.Node.Syscall.Error) void { @@ -761,7 +845,7 @@ pub const Subprocess = struct { }; } - pub fn close(this: *Writable) void { + pub fn finalize(this: *Writable) void { return switch (this.*) { .pipe => |pipe| { pipe.close(); @@ -780,13 +864,50 @@ pub const Subprocess = struct { .inherit => {}, }; } + + pub fn close(this: *Writable) void { + return switch (this.*) { + .pipe => {}, + .pipe_to_readable_stream => |*pipe_to_readable_stream| { + _ = pipe_to_readable_stream.pipe.end(null); + }, + .fd => |fd| { + _ = JSC.Node.Syscall.close(fd); + this.* = .{ .ignore = {} }; + }, + .buffered_input => { + this.buffered_input.deinit(); + }, + .ignore => {}, + .inherit => {}, + }; + } }; + fn closeIO(this: *Subprocess, comptime io: @Type(.EnumLiteral)) void { + if (this.closed.contains(io)) return; + this.closed.insert(io); + + // If you never referenced stdout/stderr, they won't be garbage collected. + // + // That means: + // 1. We need to stop watching them + // 2. We need to free the memory + // 3. We need to halt any pending reads (1) + if (!this.hasCalledGetter(io)) { + @field(this, @tagName(io)).finalize(); + } else { + @field(this, @tagName(io)).close(); + } + } + + // This must only be run once per Subprocess pub fn finalizeSync(this: *Subprocess) void { this.closeProcess(); - this.stdin.close(); - this.stderr.close(); - this.stdout.close(); + + this.closeIO(.stdin); + this.closeIO(.stdout); + this.closeIO(.stderr); this.exit_promise.deinit(); this.on_exit_callback.deinit(); @@ -1220,9 +1341,7 @@ pub const Subprocess = struct { if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) { if (comptime is_sync) { - if (subprocess.stdout.pipe.buffer.canRead()) { - subprocess.stdout.pipe.buffer.readAll(); - } + subprocess.stdout.pipe.buffer.readAll(); } else if (!lazy) { subprocess.stdout.pipe.buffer.readAll(); } @@ -1230,9 +1349,7 @@ pub const Subprocess = struct { if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) { if (comptime is_sync) { - if (subprocess.stderr.pipe.buffer.canRead()) { - subprocess.stderr.pipe.buffer.readAll(); - } + subprocess.stderr.pipe.buffer.readAll(); } else if (!lazy) { subprocess.stderr.pipe.buffer.readAll(); } @@ -1298,7 +1415,7 @@ pub const Subprocess = struct { if (this.has_waitpid_task) { return; } - defer this.updateHasPendingActivityFlag(); + defer if (sync) this.updateHasPendingActivityFlag(); this.has_waitpid_task = true; const pid = this.pid; |