diff options
-rw-r--r-- | src/bun.js/api/bun.zig | 3 | ||||
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 230 | ||||
-rw-r--r-- | test/bun.js/log-test.test.ts | 20 | ||||
-rw-r--r-- | test/bun.js/spawn.test.ts | 42 |
4 files changed, 214 insertions, 81 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index 007538a6a..ca84bbad0 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -1198,6 +1198,9 @@ pub const Class = NewClass( .spawn = .{ .rfn = JSC.wrapWithHasContainer(JSC.Subprocess, "spawn", false, false, false), }, + .spawnSync = .{ + .rfn = JSC.wrapWithHasContainer(JSC.Subprocess, "spawnSync", false, false, false), + }, }, .{ .main = .{ diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index b6dfc666d..aec961056 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -174,6 +174,37 @@ pub const Subprocess = struct { }, } } + + pub fn toBufferedValue(this: *Readable, globalThis: *JSC.JSGlobalObject) JSValue { + switch (this.*) { + .fd => |fd| { + return JSValue.jsNumber(fd); + }, + .pipe => { + defer this.close(); + + // TODO: handle when there's pending unread data in the pipe + // For some reason, this currently hangs forever + if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (this.pipe.buffer.canRead()) + this.pipe.buffer.readIfPossible(true); + } + + var bytes = this.pipe.buffer.internal_buffer.slice(); + this.pipe.buffer.internal_buffer = .{}; + + if (bytes.len > 0) { + // Return a Buffer so that they can do .toString() on it + return JSC.JSValue.createBuffer(globalThis, bytes, bun.default_allocator); + } + + return JSC.JSValue.createBuffer(globalThis, &.{}, bun.default_allocator); + }, + else => { + return JSValue.jsUndefined(); + }, + } + } }; pub fn getStderr( @@ -321,27 +352,30 @@ pub const Subprocess = struct { pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady); - pub fn onReady(this: *BufferedInput, size_or_offset: i64) void { - this.write(@intCast(usize, @maximum(size_or_offset, 0))); + pub fn onReady(this: *BufferedInput, _: i64) void { + this.write(); } pub fn canWrite(this: *BufferedInput) bool { return bun.isWritable(this.fd); } - pub fn writeIfPossible(this: *BufferedInput) void { - // we ask, "Is it possible to write right now?" - // we do this rather than epoll or kqueue() - // because we don't want to block the thread waiting for the write - if (!this.canWrite()) { - this.watch(this.fd); - return; + pub fn writeIfPossible(this: *BufferedInput, comptime is_sync: bool) void { + if (comptime !is_sync) { + + // we ask, "Is it possible to write right now?" + // we do this rather than epoll or kqueue() + // because we don't want to block the thread waiting for the write + if (!this.canWrite()) { + this.watch(this.fd); + return; + } } - this.write(0); + this.write(); } - pub fn write(this: *BufferedInput, _: usize) void { + pub fn write(this: *BufferedInput) void { var to_write = this.remain; if (to_write.len == 0) { @@ -367,6 +401,11 @@ pub const Subprocess = struct { return; } + if (e.getErrno() == .PIPE) { + this.deinit(); + return; + } + // fail log("write({d}) fail: {d}", .{ to_write.len, e.errno }); this.deinit(); @@ -432,28 +471,30 @@ pub const Subprocess = struct { pub fn ready(this: *BufferedOutput, _: i64) void { // TODO: what happens if the task was already enqueued after unwatch()? - this.readAll(); + this.readAll(false); } pub fn canRead(this: *BufferedOutput) bool { return bun.isReadable(this.fd); } - pub fn readIfPossible(this: *BufferedOutput) void { - // we ask, "Is it possible to read right now?" - // we do this rather than epoll or kqueue() - // because we don't want to block the thread waiting for the read - if (!this.canRead()) { - this.watch(this.fd); - return; + pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void { + if (comptime !force) { + // we ask, "Is it possible to read right now?" + // we do this rather than epoll or kqueue() + // because we don't want to block the thread waiting for the read + // and because kqueue or epoll might return other unrelated events + // and we don't want this to become an event loop ticking point + if (!this.canRead()) { + this.watch(this.fd); + return; + } } - this.readAll(); + this.readAll(force); } - pub fn readAll( - this: *BufferedOutput, - ) void { + pub fn readAll(this: *BufferedOutput, comptime force: bool) void { // read as much as we can from the pipe while (this.internal_buffer.len <= this.max_internal_buffer) { var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined; @@ -465,10 +506,6 @@ pub const Subprocess = struct { buf = available; } - if (comptime bun.Environment.allow_assert) { - // bun.assertNonBlocking(this.fd); - } - switch (JSC.Node.Syscall.read(this.fd, buf)) { .err => |e| { if (e.isRetry()) { @@ -476,6 +513,16 @@ pub const Subprocess = struct { return; } + // INTR is returned on macOS when the process is killed + // It probably sent SIGPIPE but we have the handler for + // that disabled. + // We know it's the "real" INTR because we use read$NOCANCEL + if (e.getErrno() == .INTR) { + this.received_eof = true; + this.autoCloseFileDescriptor(); + return; + } + // fail log("readAll() fail: {s}", .{@tagName(e.getErrno())}); this.pending_error = e; @@ -495,10 +542,19 @@ pub const Subprocess = struct { } } - if (buf[bytes_read..].len > 0 or !this.canRead()) { - this.watch(this.fd); - this.received_eof = true; - return; + if (comptime !force) { + if (buf[bytes_read..].len > 0 or !this.canRead()) { + this.watch(this.fd); + this.received_eof = true; + return; + } + } else { + // we consider a short read as being EOF + this.received_eof = this.received_eof or bytes_read < buf.len; + if (this.received_eof) { + this.autoCloseFileDescriptor(); + return; + } } }, } @@ -732,6 +788,18 @@ pub const Subprocess = struct { } pub fn spawn(globalThis: *JSC.JSGlobalObject, args: JSValue) JSValue { + return spawnMaybeSync(globalThis, args, false); + } + + pub fn spawnSync(globalThis: *JSC.JSGlobalObject, args: JSValue) JSValue { + return spawnMaybeSync(globalThis, args, true); + } + + pub fn spawnMaybeSync( + globalThis: *JSC.JSGlobalObject, + args: JSValue, + comptime is_sync: bool, + ) JSValue { var arena = std.heap.ArenaAllocator.init(bun.default_allocator); defer arena.deinit(); var allocator = arena.allocator(); @@ -751,6 +819,11 @@ pub const Subprocess = struct { .{ .pipe = null }, }; + if (comptime is_sync) { + stdio[1] = .{ .pipe = null }; + stdio[2] = .{ .pipe = null }; + } + var on_exit_callback = JSValue.zero; var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse ""; var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined; @@ -992,7 +1065,7 @@ pub const Subprocess = struct { const kernel = @import("../../../analytics.zig").GenerateHeader.GeneratePlatform.kernelVersion(); // pidfd_nonblock only supported in 5.10+ - const flags: u32 = if (kernel.orderWithoutTag(.{ .major = 5, .minor = 10, .patch = 0 }).compare(.gte)) + const flags: u32 = if (!is_sync and kernel.orderWithoutTag(.{ .major = 5, .minor = 10, .patch = 0 }).compare(.gte)) std.os.O.NONBLOCK else 0; @@ -1014,15 +1087,12 @@ pub const Subprocess = struct { } }; - // set non-blocking stdin - // if (stdio[0].isPiped()) - // _ = std.os.fcntl(stdin_pipe[1], std.os.F.SETFL, std.os.O.NONBLOCK) catch 0; - var subprocess = globalThis.allocator().create(Subprocess) catch { globalThis.throw("out of memory", .{}); return JSValue.jsUndefined(); }; + // When run synchronously, subprocess isn't garbage collected subprocess.* = Subprocess{ .globalThis = globalThis, .pid = pid, @@ -1039,25 +1109,28 @@ pub const Subprocess = struct { subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin); } - const out = subprocess.toJS(globalThis); - subprocess.this_jsvalue.set(globalThis, out); - - switch (globalThis.bunVM().poller.watch( - @intCast(JSC.Node.FileDescriptor, pidfd), - .process, - Subprocess, - subprocess, - )) { - .result => {}, - .err => |err| { - if (err.getErrno() == .SRCH) { - @panic("This shouldn't happen"); - } + const out = if (comptime !is_sync) subprocess.toJS(globalThis) else JSValue.zero; + if (comptime !is_sync) + subprocess.this_jsvalue.set(globalThis, out); + + if (comptime !is_sync) { + switch (globalThis.bunVM().poller.watch( + @intCast(JSC.Node.FileDescriptor, pidfd), + .process, + Subprocess, + subprocess, + )) { + .result => {}, + .err => |err| { + if (err.getErrno() == .SRCH) { + @panic("This shouldn't happen"); + } - // process has already exited - // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007 - subprocess.onExitNotification(); - }, + // process has already exited + // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007 + subprocess.onExitNotification(); + }, + } } if (subprocess.stdin == .buffered_input) { @@ -1065,29 +1138,54 @@ pub const Subprocess = struct { .blob => subprocess.stdin.buffered_input.source.blob.slice(), .array_buffer => |array_buffer| array_buffer.slice(), }; - subprocess.stdin.buffered_input.writeIfPossible(); + subprocess.stdin.buffered_input.writeIfPossible(is_sync); } if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) { - // bun.ensureNonBlocking(subprocess.stdout.pipe.buffer.fd); - subprocess.stdout.pipe.buffer.readIfPossible(); + if (comptime is_sync) { + if (subprocess.stderr.pipe.buffer.canRead()) { + subprocess.stderr.pipe.buffer.readAll(true); + } + } else { + subprocess.stdout.pipe.buffer.readIfPossible(false); + } } if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) { - // bun.ensureNonBlocking(subprocess.stderr.pipe.buffer.fd); - subprocess.stderr.pipe.buffer.readIfPossible(); + if (comptime is_sync) { + if (subprocess.stderr.pipe.buffer.canRead()) { + subprocess.stderr.pipe.buffer.readAll(true); + } + } else { + subprocess.stderr.pipe.buffer.readIfPossible(false); + } } - return out; + if (comptime !is_sync) { + return out; + } + + subprocess.wait(true); + const exitCode = subprocess.exit_code orelse 1; + const stdout = subprocess.stdout.toBufferedValue(globalThis); + const stderr = subprocess.stderr.toBufferedValue(globalThis); + subprocess.finalize(); + + const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4); + sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@intCast(i32, exitCode) * -1)); + sync_value.put(globalThis, JSC.ZigString.static("stdout"), stdout); + sync_value.put(globalThis, JSC.ZigString.static("stderr"), stderr); + sync_value.put(globalThis, JSC.ZigString.static("success"), JSValue.jsBoolean(exitCode == 0)); + return sync_value; } pub fn onExitNotification( this: *Subprocess, ) void { - this.wait(this.globalThis.bunVM()); + this.wait(false); } - pub fn wait(this: *Subprocess, vm: *JSC.VirtualMachine) void { + pub fn wait(this: *Subprocess, sync: bool) void { if (this.has_waitpid_task) { return; } @@ -1103,9 +1201,11 @@ pub const Subprocess = struct { }, } - this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this); - this.has_waitpid_task = true; - vm.eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task)); + if (!sync) { + this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this); + this.has_waitpid_task = true; + this.globalThis.bunVM().eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task)); + } } fn onExit(this: *Subprocess) void { diff --git a/test/bun.js/log-test.test.ts b/test/bun.js/log-test.test.ts index ecc2c3939..bdb6cbe42 100644 --- a/test/bun.js/log-test.test.ts +++ b/test/bun.js/log-test.test.ts @@ -1,7 +1,7 @@ import { it, expect } from "bun:test"; import { basename, dirname, join } from "path"; import * as fs from "fs"; -import { readableStreamToText, spawn } from "bun"; +import { readableStreamToText, spawnSync } from "bun"; it("should not log .env when quiet", async () => { writeDirectoryTree("/tmp/log-test-silent", { @@ -9,17 +9,12 @@ it("should not log .env when quiet", async () => { "bunfig.toml": `logLevel = "error"`, "index.ts": "export default console.log('Here');", }); - const out = spawn({ + const { stderr } = spawnSync({ cmd: ["bun", "index.ts"], - stdout: "pipe", - stderr: "pipe", cwd: "/tmp/log-test-silent", }); - out.ref(); - await out.exited; - const text = await readableStreamToText(out.stderr); - expect(text).toBe(""); + expect(stderr.toString()).toBe(""); }); it("should log .env by default", async () => { @@ -29,17 +24,12 @@ it("should log .env by default", async () => { "index.ts": "export default console.log('Here');", }); - const out = spawn({ + const { stderr } = spawnSync({ cmd: ["bun", "index.ts"], - stdout: "pipe", - stderr: "pipe", cwd: "/tmp/log-test-silent", }); - out.ref(); - await out.exited; - const text = await readableStreamToText(out.stderr); - expect(text.includes(".env")).toBe(true); + expect(stderr.toString().includes(".env")).toBe(true); }); function writeDirectoryTree(base, paths) { diff --git a/test/bun.js/spawn.test.ts b/test/bun.js/spawn.test.ts index b8e0459c5..6829791ce 100644 --- a/test/bun.js/spawn.test.ts +++ b/test/bun.js/spawn.test.ts @@ -1,4 +1,4 @@ -import { readableStreamToText, spawn, write } from "bun"; +import { readableStreamToText, spawn, spawnSync, write } from "bun"; import { describe, expect, it } from "bun:test"; import { gcTick as _gcTick } from "gc"; import { rmdirSync, unlinkSync, rmSync, writeFileSync } from "node:fs"; @@ -8,9 +8,49 @@ for (let [gcTick, label] of [ [() => {}, "no gc tick"], ]) { describe(label, () => { + describe("spawnSync", () => { + const hugeString = "hello".repeat(10000).slice(); + + it("Uint8Array works as stdin", async () => { + const { stdout, stderr } = spawnSync({ + cmd: ["cat"], + stdin: new TextEncoder().encode(hugeString), + }); + + expect(stdout.toString()).toBe(hugeString); + expect(stderr.byteLength).toBe(0); + }); + }); + describe("spawn", () => { const hugeString = "hello".repeat(10000).slice(); + it("Uint8Array works as stdin", async () => { + rmSync("/tmp/out.123.txt", { force: true }); + gcTick(); + const { exited } = spawn({ + cmd: ["cat"], + stdin: new TextEncoder().encode(hugeString), + stdout: Bun.file("/tmp/out.123.txt"), + }); + + await exited; + expect(await Bun.file("/tmp/out.123.txt").text()).toBe(hugeString); + }); + + it("Blob works as stdin", async () => { + rmSync("/tmp/out.123.txt", { force: true }); + gcTick(); + const { exited } = spawn({ + cmd: ["cat"], + stdin: new Blob([new TextEncoder().encode(hugeString)]), + stdout: Bun.file("/tmp/out.123.txt"), + }); + + await exited; + expect(await Bun.file("/tmp/out.123.txt").text()).toBe(hugeString); + }); + it("Bun.file() works as stdout", async () => { rmSync("/tmp/out.123.txt", { force: true }); gcTick(); |