aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bun.js/api/bun.zig3
-rw-r--r--src/bun.js/api/bun/subprocess.zig230
-rw-r--r--test/bun.js/log-test.test.ts20
-rw-r--r--test/bun.js/spawn.test.ts42
4 files changed, 214 insertions, 81 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index 007538a6a..ca84bbad0 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -1198,6 +1198,9 @@ pub const Class = NewClass(
.spawn = .{
.rfn = JSC.wrapWithHasContainer(JSC.Subprocess, "spawn", false, false, false),
},
+ .spawnSync = .{
+ .rfn = JSC.wrapWithHasContainer(JSC.Subprocess, "spawnSync", false, false, false),
+ },
},
.{
.main = .{
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index b6dfc666d..aec961056 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -174,6 +174,37 @@ pub const Subprocess = struct {
},
}
}
+
+ pub fn toBufferedValue(this: *Readable, globalThis: *JSC.JSGlobalObject) JSValue {
+ switch (this.*) {
+ .fd => |fd| {
+ return JSValue.jsNumber(fd);
+ },
+ .pipe => {
+ defer this.close();
+
+ // TODO: handle when there's pending unread data in the pipe
+ // For some reason, this currently hangs forever
+ if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ if (this.pipe.buffer.canRead())
+ this.pipe.buffer.readIfPossible(true);
+ }
+
+ var bytes = this.pipe.buffer.internal_buffer.slice();
+ this.pipe.buffer.internal_buffer = .{};
+
+ if (bytes.len > 0) {
+ // Return a Buffer so that they can do .toString() on it
+ return JSC.JSValue.createBuffer(globalThis, bytes, bun.default_allocator);
+ }
+
+ return JSC.JSValue.createBuffer(globalThis, &.{}, bun.default_allocator);
+ },
+ else => {
+ return JSValue.jsUndefined();
+ },
+ }
+ }
};
pub fn getStderr(
@@ -321,27 +352,30 @@ pub const Subprocess = struct {
pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady);
- pub fn onReady(this: *BufferedInput, size_or_offset: i64) void {
- this.write(@intCast(usize, @maximum(size_or_offset, 0)));
+ pub fn onReady(this: *BufferedInput, _: i64) void {
+ this.write();
}
pub fn canWrite(this: *BufferedInput) bool {
return bun.isWritable(this.fd);
}
- pub fn writeIfPossible(this: *BufferedInput) void {
- // we ask, "Is it possible to write right now?"
- // we do this rather than epoll or kqueue()
- // because we don't want to block the thread waiting for the write
- if (!this.canWrite()) {
- this.watch(this.fd);
- return;
+ pub fn writeIfPossible(this: *BufferedInput, comptime is_sync: bool) void {
+ if (comptime !is_sync) {
+
+ // we ask, "Is it possible to write right now?"
+ // we do this rather than epoll or kqueue()
+ // because we don't want to block the thread waiting for the write
+ if (!this.canWrite()) {
+ this.watch(this.fd);
+ return;
+ }
}
- this.write(0);
+ this.write();
}
- pub fn write(this: *BufferedInput, _: usize) void {
+ pub fn write(this: *BufferedInput) void {
var to_write = this.remain;
if (to_write.len == 0) {
@@ -367,6 +401,11 @@ pub const Subprocess = struct {
return;
}
+ if (e.getErrno() == .PIPE) {
+ this.deinit();
+ return;
+ }
+
// fail
log("write({d}) fail: {d}", .{ to_write.len, e.errno });
this.deinit();
@@ -432,28 +471,30 @@ pub const Subprocess = struct {
pub fn ready(this: *BufferedOutput, _: i64) void {
// TODO: what happens if the task was already enqueued after unwatch()?
- this.readAll();
+ this.readAll(false);
}
pub fn canRead(this: *BufferedOutput) bool {
return bun.isReadable(this.fd);
}
- pub fn readIfPossible(this: *BufferedOutput) void {
- // we ask, "Is it possible to read right now?"
- // we do this rather than epoll or kqueue()
- // because we don't want to block the thread waiting for the read
- if (!this.canRead()) {
- this.watch(this.fd);
- return;
+ pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void {
+ if (comptime !force) {
+ // we ask, "Is it possible to read right now?"
+ // we do this rather than epoll or kqueue()
+ // because we don't want to block the thread waiting for the read
+ // and because kqueue or epoll might return other unrelated events
+ // and we don't want this to become an event loop ticking point
+ if (!this.canRead()) {
+ this.watch(this.fd);
+ return;
+ }
}
- this.readAll();
+ this.readAll(force);
}
- pub fn readAll(
- this: *BufferedOutput,
- ) void {
+ pub fn readAll(this: *BufferedOutput, comptime force: bool) void {
// read as much as we can from the pipe
while (this.internal_buffer.len <= this.max_internal_buffer) {
var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined;
@@ -465,10 +506,6 @@ pub const Subprocess = struct {
buf = available;
}
- if (comptime bun.Environment.allow_assert) {
- // bun.assertNonBlocking(this.fd);
- }
-
switch (JSC.Node.Syscall.read(this.fd, buf)) {
.err => |e| {
if (e.isRetry()) {
@@ -476,6 +513,16 @@ pub const Subprocess = struct {
return;
}
+ // INTR is returned on macOS when the process is killed
+ // It probably sent SIGPIPE but we have the handler for
+ // that disabled.
+ // We know it's the "real" INTR because we use read$NOCANCEL
+ if (e.getErrno() == .INTR) {
+ this.received_eof = true;
+ this.autoCloseFileDescriptor();
+ return;
+ }
+
// fail
log("readAll() fail: {s}", .{@tagName(e.getErrno())});
this.pending_error = e;
@@ -495,10 +542,19 @@ pub const Subprocess = struct {
}
}
- if (buf[bytes_read..].len > 0 or !this.canRead()) {
- this.watch(this.fd);
- this.received_eof = true;
- return;
+ if (comptime !force) {
+ if (buf[bytes_read..].len > 0 or !this.canRead()) {
+ this.watch(this.fd);
+ this.received_eof = true;
+ return;
+ }
+ } else {
+ // we consider a short read as being EOF
+ this.received_eof = this.received_eof or bytes_read < buf.len;
+ if (this.received_eof) {
+ this.autoCloseFileDescriptor();
+ return;
+ }
}
},
}
@@ -732,6 +788,18 @@ pub const Subprocess = struct {
}
pub fn spawn(globalThis: *JSC.JSGlobalObject, args: JSValue) JSValue {
+ return spawnMaybeSync(globalThis, args, false);
+ }
+
+ pub fn spawnSync(globalThis: *JSC.JSGlobalObject, args: JSValue) JSValue {
+ return spawnMaybeSync(globalThis, args, true);
+ }
+
+ pub fn spawnMaybeSync(
+ globalThis: *JSC.JSGlobalObject,
+ args: JSValue,
+ comptime is_sync: bool,
+ ) JSValue {
var arena = std.heap.ArenaAllocator.init(bun.default_allocator);
defer arena.deinit();
var allocator = arena.allocator();
@@ -751,6 +819,11 @@ pub const Subprocess = struct {
.{ .pipe = null },
};
+ if (comptime is_sync) {
+ stdio[1] = .{ .pipe = null };
+ stdio[2] = .{ .pipe = null };
+ }
+
var on_exit_callback = JSValue.zero;
var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse "";
var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined;
@@ -992,7 +1065,7 @@ pub const Subprocess = struct {
const kernel = @import("../../../analytics.zig").GenerateHeader.GeneratePlatform.kernelVersion();
// pidfd_nonblock only supported in 5.10+
- const flags: u32 = if (kernel.orderWithoutTag(.{ .major = 5, .minor = 10, .patch = 0 }).compare(.gte))
+ const flags: u32 = if (!is_sync and kernel.orderWithoutTag(.{ .major = 5, .minor = 10, .patch = 0 }).compare(.gte))
std.os.O.NONBLOCK
else
0;
@@ -1014,15 +1087,12 @@ pub const Subprocess = struct {
}
};
- // set non-blocking stdin
- // if (stdio[0].isPiped())
- // _ = std.os.fcntl(stdin_pipe[1], std.os.F.SETFL, std.os.O.NONBLOCK) catch 0;
-
var subprocess = globalThis.allocator().create(Subprocess) catch {
globalThis.throw("out of memory", .{});
return JSValue.jsUndefined();
};
+ // When run synchronously, subprocess isn't garbage collected
subprocess.* = Subprocess{
.globalThis = globalThis,
.pid = pid,
@@ -1039,25 +1109,28 @@ pub const Subprocess = struct {
subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin);
}
- const out = subprocess.toJS(globalThis);
- subprocess.this_jsvalue.set(globalThis, out);
-
- switch (globalThis.bunVM().poller.watch(
- @intCast(JSC.Node.FileDescriptor, pidfd),
- .process,
- Subprocess,
- subprocess,
- )) {
- .result => {},
- .err => |err| {
- if (err.getErrno() == .SRCH) {
- @panic("This shouldn't happen");
- }
+ const out = if (comptime !is_sync) subprocess.toJS(globalThis) else JSValue.zero;
+ if (comptime !is_sync)
+ subprocess.this_jsvalue.set(globalThis, out);
+
+ if (comptime !is_sync) {
+ switch (globalThis.bunVM().poller.watch(
+ @intCast(JSC.Node.FileDescriptor, pidfd),
+ .process,
+ Subprocess,
+ subprocess,
+ )) {
+ .result => {},
+ .err => |err| {
+ if (err.getErrno() == .SRCH) {
+ @panic("This shouldn't happen");
+ }
- // process has already exited
- // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007
- subprocess.onExitNotification();
- },
+ // process has already exited
+ // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007
+ subprocess.onExitNotification();
+ },
+ }
}
if (subprocess.stdin == .buffered_input) {
@@ -1065,29 +1138,54 @@ pub const Subprocess = struct {
.blob => subprocess.stdin.buffered_input.source.blob.slice(),
.array_buffer => |array_buffer| array_buffer.slice(),
};
- subprocess.stdin.buffered_input.writeIfPossible();
+ subprocess.stdin.buffered_input.writeIfPossible(is_sync);
}
if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
- // bun.ensureNonBlocking(subprocess.stdout.pipe.buffer.fd);
- subprocess.stdout.pipe.buffer.readIfPossible();
+ if (comptime is_sync) {
+ if (subprocess.stderr.pipe.buffer.canRead()) {
+ subprocess.stderr.pipe.buffer.readAll(true);
+ }
+ } else {
+ subprocess.stdout.pipe.buffer.readIfPossible(false);
+ }
}
if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
- // bun.ensureNonBlocking(subprocess.stderr.pipe.buffer.fd);
- subprocess.stderr.pipe.buffer.readIfPossible();
+ if (comptime is_sync) {
+ if (subprocess.stderr.pipe.buffer.canRead()) {
+ subprocess.stderr.pipe.buffer.readAll(true);
+ }
+ } else {
+ subprocess.stderr.pipe.buffer.readIfPossible(false);
+ }
}
- return out;
+ if (comptime !is_sync) {
+ return out;
+ }
+
+ subprocess.wait(true);
+ const exitCode = subprocess.exit_code orelse 1;
+ const stdout = subprocess.stdout.toBufferedValue(globalThis);
+ const stderr = subprocess.stderr.toBufferedValue(globalThis);
+ subprocess.finalize();
+
+ const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4);
+ sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@intCast(i32, exitCode) * -1));
+ sync_value.put(globalThis, JSC.ZigString.static("stdout"), stdout);
+ sync_value.put(globalThis, JSC.ZigString.static("stderr"), stderr);
+ sync_value.put(globalThis, JSC.ZigString.static("success"), JSValue.jsBoolean(exitCode == 0));
+ return sync_value;
}
pub fn onExitNotification(
this: *Subprocess,
) void {
- this.wait(this.globalThis.bunVM());
+ this.wait(false);
}
- pub fn wait(this: *Subprocess, vm: *JSC.VirtualMachine) void {
+ pub fn wait(this: *Subprocess, sync: bool) void {
if (this.has_waitpid_task) {
return;
}
@@ -1103,9 +1201,11 @@ pub const Subprocess = struct {
},
}
- this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this);
- this.has_waitpid_task = true;
- vm.eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task));
+ if (!sync) {
+ this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this);
+ this.has_waitpid_task = true;
+ this.globalThis.bunVM().eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task));
+ }
}
fn onExit(this: *Subprocess) void {
diff --git a/test/bun.js/log-test.test.ts b/test/bun.js/log-test.test.ts
index ecc2c3939..bdb6cbe42 100644
--- a/test/bun.js/log-test.test.ts
+++ b/test/bun.js/log-test.test.ts
@@ -1,7 +1,7 @@
import { it, expect } from "bun:test";
import { basename, dirname, join } from "path";
import * as fs from "fs";
-import { readableStreamToText, spawn } from "bun";
+import { readableStreamToText, spawnSync } from "bun";
it("should not log .env when quiet", async () => {
writeDirectoryTree("/tmp/log-test-silent", {
@@ -9,17 +9,12 @@ it("should not log .env when quiet", async () => {
"bunfig.toml": `logLevel = "error"`,
"index.ts": "export default console.log('Here');",
});
- const out = spawn({
+ const { stderr } = spawnSync({
cmd: ["bun", "index.ts"],
- stdout: "pipe",
- stderr: "pipe",
cwd: "/tmp/log-test-silent",
});
- out.ref();
- await out.exited;
- const text = await readableStreamToText(out.stderr);
- expect(text).toBe("");
+ expect(stderr.toString()).toBe("");
});
it("should log .env by default", async () => {
@@ -29,17 +24,12 @@ it("should log .env by default", async () => {
"index.ts": "export default console.log('Here');",
});
- const out = spawn({
+ const { stderr } = spawnSync({
cmd: ["bun", "index.ts"],
- stdout: "pipe",
- stderr: "pipe",
cwd: "/tmp/log-test-silent",
});
- out.ref();
- await out.exited;
- const text = await readableStreamToText(out.stderr);
- expect(text.includes(".env")).toBe(true);
+ expect(stderr.toString().includes(".env")).toBe(true);
});
function writeDirectoryTree(base, paths) {
diff --git a/test/bun.js/spawn.test.ts b/test/bun.js/spawn.test.ts
index b8e0459c5..6829791ce 100644
--- a/test/bun.js/spawn.test.ts
+++ b/test/bun.js/spawn.test.ts
@@ -1,4 +1,4 @@
-import { readableStreamToText, spawn, write } from "bun";
+import { readableStreamToText, spawn, spawnSync, write } from "bun";
import { describe, expect, it } from "bun:test";
import { gcTick as _gcTick } from "gc";
import { rmdirSync, unlinkSync, rmSync, writeFileSync } from "node:fs";
@@ -8,9 +8,49 @@ for (let [gcTick, label] of [
[() => {}, "no gc tick"],
]) {
describe(label, () => {
+ describe("spawnSync", () => {
+ const hugeString = "hello".repeat(10000).slice();
+
+ it("Uint8Array works as stdin", async () => {
+ const { stdout, stderr } = spawnSync({
+ cmd: ["cat"],
+ stdin: new TextEncoder().encode(hugeString),
+ });
+
+ expect(stdout.toString()).toBe(hugeString);
+ expect(stderr.byteLength).toBe(0);
+ });
+ });
+
describe("spawn", () => {
const hugeString = "hello".repeat(10000).slice();
+ it("Uint8Array works as stdin", async () => {
+ rmSync("/tmp/out.123.txt", { force: true });
+ gcTick();
+ const { exited } = spawn({
+ cmd: ["cat"],
+ stdin: new TextEncoder().encode(hugeString),
+ stdout: Bun.file("/tmp/out.123.txt"),
+ });
+
+ await exited;
+ expect(await Bun.file("/tmp/out.123.txt").text()).toBe(hugeString);
+ });
+
+ it("Blob works as stdin", async () => {
+ rmSync("/tmp/out.123.txt", { force: true });
+ gcTick();
+ const { exited } = spawn({
+ cmd: ["cat"],
+ stdin: new Blob([new TextEncoder().encode(hugeString)]),
+ stdout: Bun.file("/tmp/out.123.txt"),
+ });
+
+ await exited;
+ expect(await Bun.file("/tmp/out.123.txt").text()).toBe(hugeString);
+ });
+
it("Bun.file() works as stdout", async () => {
rmSync("/tmp/out.123.txt", { force: true });
gcTick();