aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bun.js/api/bun/subprocess.zig81
-rw-r--r--src/bun.js/webcore/streams.zig17
-rw-r--r--src/global.zig28
-rw-r--r--test/bun.js/body-stream.test.ts12
-rw-r--r--test/bun.js/log-test.test.ts4
-rw-r--r--test/bun.js/spawn.test.ts277
6 files changed, 245 insertions, 174 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index 6bf839ca1..b6dfc666d 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -325,18 +325,15 @@ pub const Subprocess = struct {
this.write(@intCast(usize, @maximum(size_or_offset, 0)));
}
+ pub fn canWrite(this: *BufferedInput) bool {
+ return bun.isWritable(this.fd);
+ }
+
pub fn writeIfPossible(this: *BufferedInput) void {
// we ask, "Is it possible to write right now?"
// we do this rather than epoll or kqueue()
// because we don't want to block the thread waiting for the write
- var polls = &[_]std.os.pollfd{
- .{
- .fd = this.fd,
- .events = std.os.POLL.OUT | std.os.POLL.ERR,
- .revents = 0,
- },
- };
- if ((std.os.poll(polls, 0) catch 0) == 0) {
+ if (!this.canWrite()) {
this.watch(this.fd);
return;
}
@@ -438,19 +435,16 @@ pub const Subprocess = struct {
this.readAll();
}
+ pub fn canRead(this: *BufferedOutput) bool {
+ return bun.isReadable(this.fd);
+ }
+
pub fn readIfPossible(this: *BufferedOutput) void {
// we ask, "Is it possible to read right now?"
// we do this rather than epoll or kqueue()
// because we don't want to block the thread waiting for the read
- var polls = &[_]std.os.pollfd{
- .{
- .fd = this.fd,
- .events = std.os.POLL.IN | std.os.POLL.ERR,
- .revents = 0,
- },
- };
-
- if ((std.os.poll(polls, 0) catch 0) == 0) {
+ if (!this.canRead()) {
+ this.watch(this.fd);
return;
}
@@ -493,19 +487,19 @@ pub const Subprocess = struct {
.result => |bytes_read| {
log("readAll() {d}", .{bytes_read});
- if (bytes_read == 0) {
+ if (bytes_read > 0) {
+ if (buf.ptr == available.ptr) {
+ this.internal_buffer.len += @truncate(u32, bytes_read);
+ } else {
+ _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory");
+ }
+ }
+
+ if (buf[bytes_read..].len > 0 or !this.canRead()) {
this.watch(this.fd);
this.received_eof = true;
return;
}
-
- if (buf.ptr == available.ptr) {
- this.internal_buffer.len += @truncate(u32, bytes_read);
- } else {
- _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory");
- }
-
- continue;
},
}
}
@@ -520,12 +514,11 @@ pub const Subprocess = struct {
}
pub fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream {
- if (this.poll_ref.isActive())
- this.unwatch(this.fd);
-
if (exited) {
// exited + received EOF => no more read()
if (this.received_eof) {
+ this.autoCloseFileDescriptor();
+
// also no data at all
if (this.internal_buffer.len == 0) {
this.close();
@@ -549,6 +542,12 @@ pub const Subprocess = struct {
std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor));
+ // BufferedOutput is going away
+ // let's make sure we don't watch it anymore
+ if (this.poll_ref.isActive()) {
+ this.unwatch(this.fd);
+ }
+
// There could still be data waiting to be read in the pipe
// so we need to create a new stream that will read from the
// pipe and then return the blob.
@@ -571,14 +570,20 @@ pub const Subprocess = struct {
return result;
}
- pub fn close(this: *BufferedOutput) void {
- if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
- if (this.poll_ref.isActive())
- this.unwatch(this.fd);
+ pub fn autoCloseFileDescriptor(this: *BufferedOutput) void {
+ const fd = this.fd;
+ if (fd == std.math.maxInt(JSC.Node.FileDescriptor))
+ return;
+ this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
- _ = JSC.Node.Syscall.close(this.fd);
- this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
- }
+ if (this.poll_ref.isActive())
+ this.unwatch(fd);
+
+ _ = JSC.Node.Syscall.close(fd);
+ }
+
+ pub fn close(this: *BufferedOutput) void {
+ this.autoCloseFileDescriptor();
if (this.internal_buffer.cap > 0) {
this.internal_buffer.listManaged(bun.default_allocator).deinit();
@@ -1180,9 +1185,7 @@ pub const Subprocess = struct {
const idx: usize = if (std_fileno == 0) 0 else 1;
try actions.dup2(pipe_fd[idx], std_fileno);
-
- if (comptime Environment.isMac)
- try actions.close(pipe_fd[1 - idx]);
+ try actions.close(pipe_fd[1 - idx]);
},
.fd => |fd| {
try actions.dup2(fd, std_fileno);
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 61a232b5c..15eec6262 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -3556,8 +3556,23 @@ pub const FileBlobLoader = struct {
switch (rc) {
.err => |err| {
const retry = std.os.E.AGAIN;
+ const errno = brk: {
+ const _errno = err.getErrno();
+ if (comptime Environment.isLinux) {
+ // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already
+ // sent a 0
+ // Let's retry later.
+ if (std.os.S.ISFIFO(this.mode) and
+ !this.close_on_eof and _errno == .PERM)
+ {
+ break :brk .AGAIN;
+ }
+ }
+
+ break :brk _errno;
+ };
- switch (err.getErrno()) {
+ switch (errno) {
retry => {
if (this.finished) {
return .{ .done = {} };
diff --git a/src/global.zig b/src/global.zig
index a1d33bef5..f0e98861f 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -336,6 +336,30 @@ pub fn assertNonBlocking(fd: anytype) void {
}
pub fn ensureNonBlocking(fd: anytype) void {
- const current = std.os.fcntl(fd, std.os.F.GETFL, 0) catch unreachable;
- _ = std.os.fcntl(fd, std.os.F.SETFL, current | std.os.O.NONBLOCK) catch unreachable;
+ const current = std.os.fcntl(fd, std.os.F.GETFL, 0) catch 0;
+ _ = std.os.fcntl(fd, std.os.F.SETFL, current | std.os.O.NONBLOCK) catch 0;
+}
+
+pub fn isReadable(fd: std.os.fd_t) bool {
+ var polls = &[_]std.os.pollfd{
+ .{
+ .fd = fd,
+ .events = std.os.POLL.IN | std.os.POLL.ERR,
+ .revents = 0,
+ },
+ };
+
+ return (std.os.poll(polls, 0) catch 0) != 0;
+}
+
+pub fn isWritable(fd: std.os.fd_t) bool {
+ var polls = &[_]std.os.pollfd{
+ .{
+ .fd = fd,
+ .events = std.os.POLL.OUT | std.os.POLL.ERR,
+ .revents = 0,
+ },
+ };
+
+ return (std.os.poll(polls, 0) catch 0) != 0;
}
diff --git a/test/bun.js/body-stream.test.ts b/test/bun.js/body-stream.test.ts
index 7310a6837..e513ce7cb 100644
--- a/test/bun.js/body-stream.test.ts
+++ b/test/bun.js/body-stream.test.ts
@@ -4,7 +4,7 @@ import { readFileSync } from "fs";
// afterEach(() => Bun.gc(true));
-var port = 40001;
+var port = 4020;
{
const BodyMixin = [
@@ -178,9 +178,11 @@ async function runInServer(
cb: (url: string) => void | Promise<void>
) {
var server;
+ var thisPort = port++;
+ if (port > 4120) port = 4020;
server = Bun.serve({
...opts,
- port: port++,
+ port: thisPort,
fetch(req) {
try {
return opts.fetch(req);
@@ -201,10 +203,8 @@ async function runInServer(
} catch (e) {
throw e;
} finally {
- queueMicrotask(() => {
- server && server.stop();
- server = undefined;
- });
+ server && server.stop();
+ server = undefined;
}
}
diff --git a/test/bun.js/log-test.test.ts b/test/bun.js/log-test.test.ts
index 883daa5c4..ecc2c3939 100644
--- a/test/bun.js/log-test.test.ts
+++ b/test/bun.js/log-test.test.ts
@@ -10,7 +10,7 @@ it("should not log .env when quiet", async () => {
"index.ts": "export default console.log('Here');",
});
const out = spawn({
- cmd: [process.argv[0], "index.ts"],
+ cmd: ["bun", "index.ts"],
stdout: "pipe",
stderr: "pipe",
cwd: "/tmp/log-test-silent",
@@ -30,7 +30,7 @@ it("should log .env by default", async () => {
});
const out = spawn({
- cmd: [process.argv[0], "index.ts"],
+ cmd: ["bun", "index.ts"],
stdout: "pipe",
stderr: "pipe",
cwd: "/tmp/log-test-silent",
diff --git a/test/bun.js/spawn.test.ts b/test/bun.js/spawn.test.ts
index e03de2662..b8e0459c5 100644
--- a/test/bun.js/spawn.test.ts
+++ b/test/bun.js/spawn.test.ts
@@ -1,141 +1,170 @@
import { readableStreamToText, spawn, write } from "bun";
import { describe, expect, it } from "bun:test";
-import { rmdirSync, unlinkSync, rmSync } from "node:fs";
-
-describe("spawn", () => {
- const hugeString = "hello".repeat(10000).slice();
-
- it("Bun.file() works as stdout", async () => {
- rmSync("/tmp/out.123.txt", { force: true });
- const { exited } = spawn({
- cmd: ["echo", "hello"],
- stdout: Bun.file("/tmp/out.123.txt"),
- });
-
- await exited;
- expect(await Bun.file("/tmp/out.123.txt").text()).toBe("hello\n");
- });
-
- it("Bun.file() works as stdin", async () => {
- await write(Bun.file("/tmp/out.456.txt"), "hello there!");
- const { stdout } = spawn({
- cmd: ["cat"],
- stdout: "pipe",
- stdin: Bun.file("/tmp/out.456.txt"),
- });
-
- expect(await readableStreamToText(stdout)).toBe("hello there!");
- });
-
- it("Bun.file() works as stdin and stdout", async () => {
- await write(Bun.file("/tmp/out.456.txt"), "hello!");
- await write(Bun.file("/tmp/out.123.txt"), "wrong!");
-
- const { exited } = spawn({
- cmd: ["cat"],
- stdout: Bun.file("/tmp/out.123.txt"),
- stdin: Bun.file("/tmp/out.456.txt"),
- });
-
- await exited;
- expect(await Bun.file("/tmp/out.456.txt").text()).toBe("hello!");
- expect(await Bun.file("/tmp/out.123.txt").text()).toBe("hello!");
- });
+import { gcTick as _gcTick } from "gc";
+import { rmdirSync, unlinkSync, rmSync, writeFileSync } from "node:fs";
+
+for (let [gcTick, label] of [
+ [_gcTick, "gcTick"],
+ [() => {}, "no gc tick"],
+]) {
+ describe(label, () => {
+ describe("spawn", () => {
+ const hugeString = "hello".repeat(10000).slice();
+
+ it("Bun.file() works as stdout", async () => {
+ rmSync("/tmp/out.123.txt", { force: true });
+ gcTick();
+ const { exited } = spawn({
+ cmd: ["echo", "hello"],
+ stdout: Bun.file("/tmp/out.123.txt"),
+ });
- it("stdout can be read", async () => {
- await Bun.write("/tmp/out.txt", hugeString);
- const { stdout } = spawn({
- cmd: ["cat", "/tmp/out.txt"],
- stdout: "pipe",
- });
+ await exited;
+ gcTick();
+ expect(await Bun.file("/tmp/out.123.txt").text()).toBe("hello\n");
+ });
- const text = await readableStreamToText(stdout);
- expect(text).toBe(hugeString);
- });
+ it("Bun.file() works as stdin", async () => {
+ await write(Bun.file("/tmp/out.456.txt"), "hello there!");
+ gcTick();
+ const { stdout } = spawn({
+ cmd: ["cat"],
+ stdout: "pipe",
+ stdin: Bun.file("/tmp/out.456.txt"),
+ });
+ gcTick();
+ expect(await readableStreamToText(stdout)).toBe("hello there!");
+ });
- it("stdin can be read and stdout can be written", async () => {
- const { stdout, stdin, exited } = spawn({
- cmd: ["bash", import.meta.dir + "/bash-echo.sh"],
- stdout: "pipe",
- stdin: "pipe",
- stderr: "inherit",
- });
+ it("Bun.file() works as stdin and stdout", async () => {
+ writeFileSync("/tmp/out.456.txt", "hello!");
+ gcTick();
+ writeFileSync("/tmp/out.123.txt", "wrong!");
+ gcTick();
- await stdin.write(hugeString);
- await stdin.end();
+ const { exited } = spawn({
+ cmd: ["cat"],
+ stdout: Bun.file("/tmp/out.123.txt"),
+ stdin: Bun.file("/tmp/out.456.txt"),
+ });
+ gcTick();
+ await exited;
+ expect(await Bun.file("/tmp/out.456.txt").text()).toBe("hello!");
+ gcTick();
+ expect(await Bun.file("/tmp/out.123.txt").text()).toBe("hello!");
+ });
- const text = await readableStreamToText(stdout);
- expect(text.trim()).toBe(hugeString);
- await exited;
- });
+ it("stdout can be read", async () => {
+ await Bun.write("/tmp/out.txt", hugeString);
+ gcTick();
+ const { stdout } = spawn({
+ cmd: ["cat", "/tmp/out.txt"],
+ stdout: "pipe",
+ });
+ gcTick();
- describe("pipe", () => {
- function huge() {
- return spawn({
- cmd: ["echo", hugeString],
- stdout: "pipe",
- stdin: "pipe",
- stderr: "inherit",
+ const text = await readableStreamToText(stdout);
+ gcTick();
+ expect(text).toBe(hugeString);
});
- }
- function helloWorld() {
- return spawn({
- cmd: ["echo", "hello"],
- stdout: "pipe",
- stdin: "pipe",
+ it("stdin can be read and stdout can be written", async () => {
+ const proc = spawn({
+ cmd: ["bash", import.meta.dir + "/bash-echo.sh"],
+ stdout: "pipe",
+ stdin: "pipe",
+ stderr: "inherit",
+ });
+ proc.stdin.write(hugeString);
+ await proc.stdin.end(true);
+ var text = "";
+ var reader = proc.stdout.getReader();
+ var done = false;
+ while (!done) {
+ var { value, done } = await reader.read();
+ if (value) text += new TextDecoder().decode(value);
+ if (done && text.length === 0) {
+ reader.releaseLock();
+ reader = proc.stdout.getReader();
+ done = false;
+ }
+ }
+
+ expect(text.trim().length).toBe(hugeString.length);
+ expect(text.trim()).toBe(hugeString);
+ gcTick();
+ await proc.exited;
});
- }
-
- const fixtures = [
- [helloWorld, "hello"],
- [huge, hugeString],
- ];
-
- for (const [callback, fixture] of fixtures) {
- describe(fixture.slice(0, 12), () => {
- describe("should should allow reading stdout", () => {
- it("before exit", async () => {
- const process = callback();
- const output = await readableStreamToText(process.stdout);
- const expected = fixture + "\n";
- expect(output.length).toBe(expected.length);
- expect(output).toBe(expected);
-
- await process.exited;
- });
- it("before exit (chunked)", async () => {
- const process = callback();
- var output = "";
- var reader = process.stdout.getReader();
- var done = false;
- while (!done) {
- var { value, done } = await reader.read();
- if (value) output += new TextDecoder().decode(value);
- }
-
- const expected = fixture + "\n";
- expect(output.length).toBe(expected.length);
- expect(output).toBe(expected);
-
- await process.exited;
+ describe("pipe", () => {
+ function huge() {
+ return spawn({
+ cmd: ["echo", hugeString],
+ stdout: "pipe",
+ stdin: "pipe",
+ stderr: "inherit",
});
+ }
- it("after exit", async () => {
- const process = callback();
- await process.stdin.end();
-
- const output = await readableStreamToText(process.stdout);
- const expected = fixture + "\n";
-
- expect(output.length).toBe(expected.length);
- expect(output).toBe(expected);
-
- await process.exited;
+ function helloWorld() {
+ return spawn({
+ cmd: ["echo", "hello"],
+ stdout: "pipe",
+ stdin: "pipe",
});
- });
+ }
+
+ const fixtures = [
+ [helloWorld, "hello"],
+ [huge, hugeString],
+ ];
+
+ for (const [callback, fixture] of fixtures) {
+ describe(fixture.slice(0, 12), () => {
+ describe("should should allow reading stdout", () => {
+ it("before exit", async () => {
+ const process = callback();
+ const output = await readableStreamToText(process.stdout);
+ const expected = fixture + "\n";
+ expect(output.length).toBe(expected.length);
+ expect(output).toBe(expected);
+
+ await process.exited;
+ });
+
+ it("before exit (chunked)", async () => {
+ const process = callback();
+ var output = "";
+ var reader = process.stdout.getReader();
+ var done = false;
+ while (!done) {
+ var { value, done } = await reader.read();
+ if (value) output += new TextDecoder().decode(value);
+ }
+
+ const expected = fixture + "\n";
+ expect(output.length).toBe(expected.length);
+ expect(output).toBe(expected);
+
+ await process.exited;
+ });
+
+ it("after exit", async () => {
+ const process = callback();
+ await process.stdin.end();
+
+ const output = await readableStreamToText(process.stdout);
+ const expected = fixture + "\n";
+
+ expect(output.length).toBe(expected.length);
+ expect(output).toBe(expected);
+
+ await process.exited;
+ });
+ });
+ });
+ }
});
- }
+ });
});
-});
+}