aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-28 23:00:22 -0800
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-28 23:00:22 -0800
commit887496bcf9bc3e87ca18637f4cd059eecc324102 (patch)
tree98d391fa46ac7cba84a5743131bef5c6a4dda979 /src/bun.js/api
parentda4376103205bc9bdb810fee5cc8d343d04f36ef (diff)
downloadbun-887496bcf9bc3e87ca18637f4cd059eecc324102.tar.gz
bun-887496bcf9bc3e87ca18637f4cd059eecc324102.tar.zst
bun-887496bcf9bc3e87ca18637f4cd059eecc324102.zip
Fix failing spawn() and spawnSync() tests
cc @ThatOneBro
Diffstat (limited to 'src/bun.js/api')
-rw-r--r--src/bun.js/api/bun/subprocess.zig167
1 files changed, 142 insertions, 25 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index 14febbd00..d803704f3 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -51,6 +51,11 @@ pub const Subprocess = struct {
stdout,
stderr,
}) = .{},
+ closed: std.enums.EnumSet(enum {
+ stdin,
+ stdout,
+ stderr,
+ }) = .{},
has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
is_sync: bool = false,
@@ -76,11 +81,33 @@ pub const Subprocess = struct {
pub fn ref(this: *Subprocess) void {
var vm = this.globalThis.bunVM();
if (this.poll_ref) |poll| poll.enableKeepingProcessAlive(vm);
+ if (!this.hasCalledGetter(.stdin)) {
+ this.stdin.ref();
+ }
+
+ if (!this.hasCalledGetter(.stdout)) {
+ this.stdout.ref();
+ }
+
+ if (!this.hasCalledGetter(.stderr)) {
+ this.stdout.ref();
+ }
}
pub fn unref(this: *Subprocess) void {
var vm = this.globalThis.bunVM();
if (this.poll_ref) |poll| poll.disableKeepingProcessAlive(vm);
+ if (!this.hasCalledGetter(.stdin)) {
+ this.stdin.unref();
+ }
+
+ if (!this.hasCalledGetter(.stdout)) {
+ this.stdout.unref();
+ }
+
+ if (!this.hasCalledGetter(.stderr)) {
+ this.stdout.unref();
+ }
}
pub fn constructor(
@@ -98,6 +125,32 @@ pub const Subprocess = struct {
ignore: void,
closed: void,
+ pub fn ref(this: *Readable) void {
+ switch (this.*) {
+ .pipe => {
+ if (this.pipe == .buffer) {
+ if (this.pipe.buffer.fifo.poll_ref) |poll| {
+ poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
+ }
+ },
+ else => {},
+ }
+ }
+
+ pub fn unref(this: *Readable) void {
+ switch (this.*) {
+ .pipe => {
+ if (this.pipe == .buffer) {
+ if (this.pipe.buffer.fifo.poll_ref) |poll| {
+ poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
+ }
+ },
+ else => {},
+ }
+ }
+
pub const Pipe = union(enum) {
stream: JSC.WebCore.ReadableStream,
buffer: BufferedOutput,
@@ -167,8 +220,23 @@ pub const Subprocess = struct {
},
else => {},
}
+ }
- this.* = .closed;
+ pub fn finalize(this: *Readable) void {
+ switch (this.*) {
+ .fd => |fd| {
+ _ = JSC.Node.Syscall.close(fd);
+ },
+ .pipe => {
+ if (this.pipe == .stream and this.pipe.stream.ptr == .File) {
+ this.close();
+ return;
+ }
+
+ this.pipe.buffer.close();
+ },
+ else => {},
+ }
}
pub fn toJS(this: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue {
@@ -191,10 +259,8 @@ pub const Subprocess = struct {
return JSValue.jsNumber(fd);
},
.pipe => {
- defer this.close();
-
- if (this.pipe.buffer.canRead())
- this.pipe.buffer.readAll();
+ this.pipe.buffer.fifo.close_on_empty_read = true;
+ this.pipe.buffer.readAll();
var bytes = this.pipe.buffer.internal_buffer.slice();
this.pipe.buffer.internal_buffer = .{};
@@ -519,11 +585,6 @@ pub const Subprocess = struct {
.allocator = allocator,
.buffer = &this.internal_buffer,
};
- this.watch();
- }
-
- pub fn canRead(this: *BufferedOutput) bool {
- return bun.isReadable(this.fifo.fd) == .ready;
}
pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void {
@@ -549,7 +610,7 @@ pub const Subprocess = struct {
if (slice.len > 0)
std.debug.assert(this.internal_buffer.contains(slice));
- if (result.isDone()) {
+ if (result.isDone() or (slice.len == 0 and this.fifo.poll_ref != null and this.fifo.poll_ref.?.isHUP())) {
this.status = .{ .done = {} };
this.fifo.close();
}
@@ -602,6 +663,8 @@ pub const Subprocess = struct {
}
fn watch(this: *BufferedOutput) void {
+ std.debug.assert(this.fifo.fd != bun.invalid_fd);
+
this.fifo.pending.set(BufferedOutput, this, onRead);
if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd);
return;
@@ -637,8 +700,6 @@ pub const Subprocess = struct {
),
globalThis,
).?;
- } else {
- this.fifo.close_on_empty_read = true;
}
}
@@ -657,7 +718,8 @@ pub const Subprocess = struct {
),
globalThis,
).?;
-
+ this.fifo.fd = bun.invalid_fd;
+ this.fifo.poll_ref = null;
return result;
}
}
@@ -690,6 +752,28 @@ pub const Subprocess = struct {
inherit: void,
ignore: void,
+ pub fn ref(this: *Writable) void {
+ switch (this.*) {
+ .pipe => {
+ if (this.pipe.poll_ref) |poll| {
+ poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
+ },
+ else => {},
+ }
+ }
+
+ pub fn unref(this: *Writable) void {
+ switch (this.*) {
+ .pipe => {
+ if (this.pipe.poll_ref) |poll| {
+ poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
+ },
+ else => {},
+ }
+ }
+
// When the stream has closed we need to be notified to prevent a use-after-free
// We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice
pub fn onClose(this: *Writable, _: ?JSC.Node.Syscall.Error) void {
@@ -761,7 +845,7 @@ pub const Subprocess = struct {
};
}
- pub fn close(this: *Writable) void {
+ pub fn finalize(this: *Writable) void {
return switch (this.*) {
.pipe => |pipe| {
pipe.close();
@@ -780,13 +864,50 @@ pub const Subprocess = struct {
.inherit => {},
};
}
+
+ pub fn close(this: *Writable) void {
+ return switch (this.*) {
+ .pipe => {},
+ .pipe_to_readable_stream => |*pipe_to_readable_stream| {
+ _ = pipe_to_readable_stream.pipe.end(null);
+ },
+ .fd => |fd| {
+ _ = JSC.Node.Syscall.close(fd);
+ this.* = .{ .ignore = {} };
+ },
+ .buffered_input => {
+ this.buffered_input.deinit();
+ },
+ .ignore => {},
+ .inherit => {},
+ };
+ }
};
+ fn closeIO(this: *Subprocess, comptime io: @Type(.EnumLiteral)) void {
+ if (this.closed.contains(io)) return;
+ this.closed.insert(io);
+
+ // If you never referenced stdout/stderr, they won't be garbage collected.
+ //
+ // That means:
+ // 1. We need to stop watching them
+ // 2. We need to free the memory
+ // 3. We need to halt any pending reads (1)
+ if (!this.hasCalledGetter(io)) {
+ @field(this, @tagName(io)).finalize();
+ } else {
+ @field(this, @tagName(io)).close();
+ }
+ }
+
+ // This must only be run once per Subprocess
pub fn finalizeSync(this: *Subprocess) void {
this.closeProcess();
- this.stdin.close();
- this.stderr.close();
- this.stdout.close();
+
+ this.closeIO(.stdin);
+ this.closeIO(.stdout);
+ this.closeIO(.stderr);
this.exit_promise.deinit();
this.on_exit_callback.deinit();
@@ -1220,9 +1341,7 @@ pub const Subprocess = struct {
if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
if (comptime is_sync) {
- if (subprocess.stdout.pipe.buffer.canRead()) {
- subprocess.stdout.pipe.buffer.readAll();
- }
+ subprocess.stdout.pipe.buffer.readAll();
} else if (!lazy) {
subprocess.stdout.pipe.buffer.readAll();
}
@@ -1230,9 +1349,7 @@ pub const Subprocess = struct {
if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
if (comptime is_sync) {
- if (subprocess.stderr.pipe.buffer.canRead()) {
- subprocess.stderr.pipe.buffer.readAll();
- }
+ subprocess.stderr.pipe.buffer.readAll();
} else if (!lazy) {
subprocess.stderr.pipe.buffer.readAll();
}
@@ -1298,7 +1415,7 @@ pub const Subprocess = struct {
if (this.has_waitpid_task) {
return;
}
- defer this.updateHasPendingActivityFlag();
+ defer if (sync) this.updateHasPendingActivityFlag();
this.has_waitpid_task = true;
const pid = this.pid;