diff options
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 81 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 17 | ||||
-rw-r--r-- | src/global.zig | 28 | ||||
-rw-r--r-- | test/bun.js/body-stream.test.ts | 12 | ||||
-rw-r--r-- | test/bun.js/log-test.test.ts | 4 | ||||
-rw-r--r-- | test/bun.js/spawn.test.ts | 277 |
6 files changed, 245 insertions, 174 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 6bf839ca1..b6dfc666d 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -325,18 +325,15 @@ pub const Subprocess = struct { this.write(@intCast(usize, @maximum(size_or_offset, 0))); } + pub fn canWrite(this: *BufferedInput) bool { + return bun.isWritable(this.fd); + } + pub fn writeIfPossible(this: *BufferedInput) void { // we ask, "Is it possible to write right now?" // we do this rather than epoll or kqueue() // because we don't want to block the thread waiting for the write - var polls = &[_]std.os.pollfd{ - .{ - .fd = this.fd, - .events = std.os.POLL.OUT | std.os.POLL.ERR, - .revents = 0, - }, - }; - if ((std.os.poll(polls, 0) catch 0) == 0) { + if (!this.canWrite()) { this.watch(this.fd); return; } @@ -438,19 +435,16 @@ pub const Subprocess = struct { this.readAll(); } + pub fn canRead(this: *BufferedOutput) bool { + return bun.isReadable(this.fd); + } + pub fn readIfPossible(this: *BufferedOutput) void { // we ask, "Is it possible to read right now?" // we do this rather than epoll or kqueue() // because we don't want to block the thread waiting for the read - var polls = &[_]std.os.pollfd{ - .{ - .fd = this.fd, - .events = std.os.POLL.IN | std.os.POLL.ERR, - .revents = 0, - }, - }; - - if ((std.os.poll(polls, 0) catch 0) == 0) { + if (!this.canRead()) { + this.watch(this.fd); return; } @@ -493,19 +487,19 @@ pub const Subprocess = struct { .result => |bytes_read| { log("readAll() {d}", .{bytes_read}); - if (bytes_read == 0) { + if (bytes_read > 0) { + if (buf.ptr == available.ptr) { + this.internal_buffer.len += @truncate(u32, bytes_read); + } else { + _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory"); + } + } + + if (buf[bytes_read..].len > 0 or !this.canRead()) { this.watch(this.fd); this.received_eof = true; return; } - - if (buf.ptr == available.ptr) { - this.internal_buffer.len += @truncate(u32, bytes_read); - } else { - _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory"); - } - - continue; }, } } @@ -520,12 +514,11 @@ pub const Subprocess = struct { } pub fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream { - if (this.poll_ref.isActive()) - this.unwatch(this.fd); - if (exited) { // exited + received EOF => no more read() if (this.received_eof) { + this.autoCloseFileDescriptor(); + // also no data at all if (this.internal_buffer.len == 0) { this.close(); @@ -549,6 +542,12 @@ pub const Subprocess = struct { std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); + // BufferedOutput is going away + // let's make sure we don't watch it anymore + if (this.poll_ref.isActive()) { + this.unwatch(this.fd); + } + // There could still be data waiting to be read in the pipe // so we need to create a new stream that will read from the // pipe and then return the blob. @@ -571,14 +570,20 @@ pub const Subprocess = struct { return result; } - pub fn close(this: *BufferedOutput) void { - if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { - if (this.poll_ref.isActive()) - this.unwatch(this.fd); + pub fn autoCloseFileDescriptor(this: *BufferedOutput) void { + const fd = this.fd; + if (fd == std.math.maxInt(JSC.Node.FileDescriptor)) + return; + this.fd = std.math.maxInt(JSC.Node.FileDescriptor); - _ = JSC.Node.Syscall.close(this.fd); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); - } + if (this.poll_ref.isActive()) + this.unwatch(fd); + + _ = JSC.Node.Syscall.close(fd); + } + + pub fn close(this: *BufferedOutput) void { + this.autoCloseFileDescriptor(); if (this.internal_buffer.cap > 0) { this.internal_buffer.listManaged(bun.default_allocator).deinit(); @@ -1180,9 +1185,7 @@ pub const Subprocess = struct { const idx: usize = if (std_fileno == 0) 0 else 1; try actions.dup2(pipe_fd[idx], std_fileno); - - if (comptime Environment.isMac) - try actions.close(pipe_fd[1 - idx]); + try actions.close(pipe_fd[1 - idx]); }, .fd => |fd| { try actions.dup2(fd, std_fileno); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 61a232b5c..15eec6262 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -3556,8 +3556,23 @@ pub const FileBlobLoader = struct { switch (rc) { .err => |err| { const retry = std.os.E.AGAIN; + const errno = brk: { + const _errno = err.getErrno(); + if (comptime Environment.isLinux) { + // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already + // sent a 0 + // Let's retry later. + if (std.os.S.ISFIFO(this.mode) and + !this.close_on_eof and _errno == .PERM) + { + break :brk .AGAIN; + } + } + + break :brk _errno; + }; - switch (err.getErrno()) { + switch (errno) { retry => { if (this.finished) { return .{ .done = {} }; diff --git a/src/global.zig b/src/global.zig index a1d33bef5..f0e98861f 100644 --- a/src/global.zig +++ b/src/global.zig @@ -336,6 +336,30 @@ pub fn assertNonBlocking(fd: anytype) void { } pub fn ensureNonBlocking(fd: anytype) void { - const current = std.os.fcntl(fd, std.os.F.GETFL, 0) catch unreachable; - _ = std.os.fcntl(fd, std.os.F.SETFL, current | std.os.O.NONBLOCK) catch unreachable; + const current = std.os.fcntl(fd, std.os.F.GETFL, 0) catch 0; + _ = std.os.fcntl(fd, std.os.F.SETFL, current | std.os.O.NONBLOCK) catch 0; +} + +pub fn isReadable(fd: std.os.fd_t) bool { + var polls = &[_]std.os.pollfd{ + .{ + .fd = fd, + .events = std.os.POLL.IN | std.os.POLL.ERR, + .revents = 0, + }, + }; + + return (std.os.poll(polls, 0) catch 0) != 0; +} + +pub fn isWritable(fd: std.os.fd_t) bool { + var polls = &[_]std.os.pollfd{ + .{ + .fd = fd, + .events = std.os.POLL.OUT | std.os.POLL.ERR, + .revents = 0, + }, + }; + + return (std.os.poll(polls, 0) catch 0) != 0; } diff --git a/test/bun.js/body-stream.test.ts b/test/bun.js/body-stream.test.ts index 7310a6837..e513ce7cb 100644 --- a/test/bun.js/body-stream.test.ts +++ b/test/bun.js/body-stream.test.ts @@ -4,7 +4,7 @@ import { readFileSync } from "fs"; // afterEach(() => Bun.gc(true)); -var port = 40001; +var port = 4020; { const BodyMixin = [ @@ -178,9 +178,11 @@ async function runInServer( cb: (url: string) => void | Promise<void> ) { var server; + var thisPort = port++; + if (port > 4120) port = 4020; server = Bun.serve({ ...opts, - port: port++, + port: thisPort, fetch(req) { try { return opts.fetch(req); @@ -201,10 +203,8 @@ async function runInServer( } catch (e) { throw e; } finally { - queueMicrotask(() => { - server && server.stop(); - server = undefined; - }); + server && server.stop(); + server = undefined; } } diff --git a/test/bun.js/log-test.test.ts b/test/bun.js/log-test.test.ts index 883daa5c4..ecc2c3939 100644 --- a/test/bun.js/log-test.test.ts +++ b/test/bun.js/log-test.test.ts @@ -10,7 +10,7 @@ it("should not log .env when quiet", async () => { "index.ts": "export default console.log('Here');", }); const out = spawn({ - cmd: [process.argv[0], "index.ts"], + cmd: ["bun", "index.ts"], stdout: "pipe", stderr: "pipe", cwd: "/tmp/log-test-silent", @@ -30,7 +30,7 @@ it("should log .env by default", async () => { }); const out = spawn({ - cmd: [process.argv[0], "index.ts"], + cmd: ["bun", "index.ts"], stdout: "pipe", stderr: "pipe", cwd: "/tmp/log-test-silent", diff --git a/test/bun.js/spawn.test.ts b/test/bun.js/spawn.test.ts index e03de2662..b8e0459c5 100644 --- a/test/bun.js/spawn.test.ts +++ b/test/bun.js/spawn.test.ts @@ -1,141 +1,170 @@ import { readableStreamToText, spawn, write } from "bun"; import { describe, expect, it } from "bun:test"; -import { rmdirSync, unlinkSync, rmSync } from "node:fs"; - -describe("spawn", () => { - const hugeString = "hello".repeat(10000).slice(); - - it("Bun.file() works as stdout", async () => { - rmSync("/tmp/out.123.txt", { force: true }); - const { exited } = spawn({ - cmd: ["echo", "hello"], - stdout: Bun.file("/tmp/out.123.txt"), - }); - - await exited; - expect(await Bun.file("/tmp/out.123.txt").text()).toBe("hello\n"); - }); - - it("Bun.file() works as stdin", async () => { - await write(Bun.file("/tmp/out.456.txt"), "hello there!"); - const { stdout } = spawn({ - cmd: ["cat"], - stdout: "pipe", - stdin: Bun.file("/tmp/out.456.txt"), - }); - - expect(await readableStreamToText(stdout)).toBe("hello there!"); - }); - - it("Bun.file() works as stdin and stdout", async () => { - await write(Bun.file("/tmp/out.456.txt"), "hello!"); - await write(Bun.file("/tmp/out.123.txt"), "wrong!"); - - const { exited } = spawn({ - cmd: ["cat"], - stdout: Bun.file("/tmp/out.123.txt"), - stdin: Bun.file("/tmp/out.456.txt"), - }); - - await exited; - expect(await Bun.file("/tmp/out.456.txt").text()).toBe("hello!"); - expect(await Bun.file("/tmp/out.123.txt").text()).toBe("hello!"); - }); +import { gcTick as _gcTick } from "gc"; +import { rmdirSync, unlinkSync, rmSync, writeFileSync } from "node:fs"; + +for (let [gcTick, label] of [ + [_gcTick, "gcTick"], + [() => {}, "no gc tick"], +]) { + describe(label, () => { + describe("spawn", () => { + const hugeString = "hello".repeat(10000).slice(); + + it("Bun.file() works as stdout", async () => { + rmSync("/tmp/out.123.txt", { force: true }); + gcTick(); + const { exited } = spawn({ + cmd: ["echo", "hello"], + stdout: Bun.file("/tmp/out.123.txt"), + }); - it("stdout can be read", async () => { - await Bun.write("/tmp/out.txt", hugeString); - const { stdout } = spawn({ - cmd: ["cat", "/tmp/out.txt"], - stdout: "pipe", - }); + await exited; + gcTick(); + expect(await Bun.file("/tmp/out.123.txt").text()).toBe("hello\n"); + }); - const text = await readableStreamToText(stdout); - expect(text).toBe(hugeString); - }); + it("Bun.file() works as stdin", async () => { + await write(Bun.file("/tmp/out.456.txt"), "hello there!"); + gcTick(); + const { stdout } = spawn({ + cmd: ["cat"], + stdout: "pipe", + stdin: Bun.file("/tmp/out.456.txt"), + }); + gcTick(); + expect(await readableStreamToText(stdout)).toBe("hello there!"); + }); - it("stdin can be read and stdout can be written", async () => { - const { stdout, stdin, exited } = spawn({ - cmd: ["bash", import.meta.dir + "/bash-echo.sh"], - stdout: "pipe", - stdin: "pipe", - stderr: "inherit", - }); + it("Bun.file() works as stdin and stdout", async () => { + writeFileSync("/tmp/out.456.txt", "hello!"); + gcTick(); + writeFileSync("/tmp/out.123.txt", "wrong!"); + gcTick(); - await stdin.write(hugeString); - await stdin.end(); + const { exited } = spawn({ + cmd: ["cat"], + stdout: Bun.file("/tmp/out.123.txt"), + stdin: Bun.file("/tmp/out.456.txt"), + }); + gcTick(); + await exited; + expect(await Bun.file("/tmp/out.456.txt").text()).toBe("hello!"); + gcTick(); + expect(await Bun.file("/tmp/out.123.txt").text()).toBe("hello!"); + }); - const text = await readableStreamToText(stdout); - expect(text.trim()).toBe(hugeString); - await exited; - }); + it("stdout can be read", async () => { + await Bun.write("/tmp/out.txt", hugeString); + gcTick(); + const { stdout } = spawn({ + cmd: ["cat", "/tmp/out.txt"], + stdout: "pipe", + }); + gcTick(); - describe("pipe", () => { - function huge() { - return spawn({ - cmd: ["echo", hugeString], - stdout: "pipe", - stdin: "pipe", - stderr: "inherit", + const text = await readableStreamToText(stdout); + gcTick(); + expect(text).toBe(hugeString); }); - } - function helloWorld() { - return spawn({ - cmd: ["echo", "hello"], - stdout: "pipe", - stdin: "pipe", + it("stdin can be read and stdout can be written", async () => { + const proc = spawn({ + cmd: ["bash", import.meta.dir + "/bash-echo.sh"], + stdout: "pipe", + stdin: "pipe", + stderr: "inherit", + }); + proc.stdin.write(hugeString); + await proc.stdin.end(true); + var text = ""; + var reader = proc.stdout.getReader(); + var done = false; + while (!done) { + var { value, done } = await reader.read(); + if (value) text += new TextDecoder().decode(value); + if (done && text.length === 0) { + reader.releaseLock(); + reader = proc.stdout.getReader(); + done = false; + } + } + + expect(text.trim().length).toBe(hugeString.length); + expect(text.trim()).toBe(hugeString); + gcTick(); + await proc.exited; }); - } - - const fixtures = [ - [helloWorld, "hello"], - [huge, hugeString], - ]; - - for (const [callback, fixture] of fixtures) { - describe(fixture.slice(0, 12), () => { - describe("should should allow reading stdout", () => { - it("before exit", async () => { - const process = callback(); - const output = await readableStreamToText(process.stdout); - const expected = fixture + "\n"; - expect(output.length).toBe(expected.length); - expect(output).toBe(expected); - - await process.exited; - }); - it("before exit (chunked)", async () => { - const process = callback(); - var output = ""; - var reader = process.stdout.getReader(); - var done = false; - while (!done) { - var { value, done } = await reader.read(); - if (value) output += new TextDecoder().decode(value); - } - - const expected = fixture + "\n"; - expect(output.length).toBe(expected.length); - expect(output).toBe(expected); - - await process.exited; + describe("pipe", () => { + function huge() { + return spawn({ + cmd: ["echo", hugeString], + stdout: "pipe", + stdin: "pipe", + stderr: "inherit", }); + } - it("after exit", async () => { - const process = callback(); - await process.stdin.end(); - - const output = await readableStreamToText(process.stdout); - const expected = fixture + "\n"; - - expect(output.length).toBe(expected.length); - expect(output).toBe(expected); - - await process.exited; + function helloWorld() { + return spawn({ + cmd: ["echo", "hello"], + stdout: "pipe", + stdin: "pipe", }); - }); + } + + const fixtures = [ + [helloWorld, "hello"], + [huge, hugeString], + ]; + + for (const [callback, fixture] of fixtures) { + describe(fixture.slice(0, 12), () => { + describe("should should allow reading stdout", () => { + it("before exit", async () => { + const process = callback(); + const output = await readableStreamToText(process.stdout); + const expected = fixture + "\n"; + expect(output.length).toBe(expected.length); + expect(output).toBe(expected); + + await process.exited; + }); + + it("before exit (chunked)", async () => { + const process = callback(); + var output = ""; + var reader = process.stdout.getReader(); + var done = false; + while (!done) { + var { value, done } = await reader.read(); + if (value) output += new TextDecoder().decode(value); + } + + const expected = fixture + "\n"; + expect(output.length).toBe(expected.length); + expect(output).toBe(expected); + + await process.exited; + }); + + it("after exit", async () => { + const process = callback(); + await process.stdin.end(); + + const output = await readableStreamToText(process.stdout); + const expected = fixture + "\n"; + + expect(output.length).toBe(expected.length); + expect(output).toBe(expected); + + await process.exited; + }); + }); + }); + } }); - } + }); }); -}); +} |