diff options
author | 2022-11-23 21:31:38 -0800 | |
---|---|---|
committer | 2022-11-23 21:31:38 -0800 | |
commit | bddf484c2c7c8d3aadf715f0d908516ed45caeeb (patch) | |
tree | ee7ce4eeb80b44dd44c112bf0af7bfc176f68185 | |
parent | 21531f1e80327f3c64e17fb3069c6bb38e8aad0c (diff) | |
download | bun-bddf484c2c7c8d3aadf715f0d908516ed45caeeb.tar.gz bun-bddf484c2c7c8d3aadf715f0d908516ed45caeeb.tar.zst bun-bddf484c2c7c8d3aadf715f0d908516ed45caeeb.zip |
Close the streams more
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 59 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 6 | ||||
-rw-r--r-- | test/bun.js/spawn-streaming-stdin.test.ts | 53 | ||||
-rw-r--r-- | test/bun.js/stdin-repro.js | 5 |
4 files changed, 94 insertions, 29 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index c85e0396f..e78f3a48b 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -110,7 +110,7 @@ pub const Subprocess = struct { pub fn done(this: *@This()) void { if (this.* == .stream) { - if (this.stream.ptr == .File) this.stream.ptr.File.finish(); + if (this.stream.ptr == .File) this.stream.ptr.File.setSignal(JSC.WebCore.Signal{}); this.stream.done(); return; } @@ -132,12 +132,11 @@ pub const Subprocess = struct { } }; - pub fn init(stdio: Stdio, fd: i32, other_fd: i32, _: *JSC.JSGlobalObject) Readable { + pub fn init(stdio: Stdio, fd: i32, _: *JSC.JSGlobalObject) Readable { return switch (stdio) { .inherit => Readable{ .inherit = {} }, .ignore => Readable{ .ignore = {} }, .pipe => brk: { - _ = JSC.Node.Syscall.close(other_fd); break :brk .{ .pipe = .{ .buffer = undefined, @@ -164,8 +163,6 @@ pub const Subprocess = struct { _ = JSC.Node.Syscall.close(fd); }, .pipe => { - if (this.pipe == .stream and this.pipe.stream.ptr == .File) - this.pipe.stream.ptr.File.readable().FIFO.signal.clear(); this.pipe.done(); }, else => {}, @@ -692,7 +689,7 @@ pub const Subprocess = struct { pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {} pub fn onStart(_: *Writable) void {} - pub fn init(stdio: Stdio, fd: i32, other_fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { + pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { switch (stdio) { .pipe => { var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink); @@ -702,7 +699,6 @@ pub const Subprocess = struct { .allocator = globalThis.bunVM().allocator, .auto_close = true, }; - if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd); sink.mode = std.os.S.IFIFO; if (stdio == .pipe) { if (stdio.pipe) |readable| { @@ -718,7 +714,6 @@ pub const Subprocess = struct { return Writable{ .pipe = sink }; }, .array_buffer, .blob => { - if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd); var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined }; switch (stdio) { .array_buffer => |array_buffer| { @@ -757,13 +752,14 @@ pub const Subprocess = struct { pub fn close(this: *Writable) void { return switch (this.*) { .pipe => |pipe| { - _ = pipe.end(null); + pipe.close(); }, .pipe_to_readable_stream => |*pipe_to_readable_stream| { _ = pipe_to_readable_stream.pipe.end(null); }, .fd => |fd| { _ = JSC.Node.Syscall.close(fd); + this.* = .{ .ignore = {} }; }, .buffered_input => { this.buffered_input.deinit(); @@ -778,7 +774,7 @@ pub const Subprocess = struct { this.closeProcess(); this.stdin.close(); this.stderr.close(); - this.stdin.close(); + this.stdout.close(); this.exit_promise.deinit(); this.on_exit_callback.deinit(); @@ -1049,19 +1045,16 @@ pub const Subprocess = struct { globalThis.throw("failed to create stdin pipe: {s}", .{err}); return .zero; } else undefined; - errdefer if (stdio[0].isPiped()) destroyPipe(stdin_pipe); const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(0) catch |err| { globalThis.throw("failed to create stdout pipe: {s}", .{err}); return .zero; } else undefined; - errdefer if (stdio[1].isPiped()) destroyPipe(stdout_pipe); const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(0) catch |err| { globalThis.throw("failed to create stderr pipe: {s}", .{err}); return .zero; } else undefined; - errdefer if (stdio[2].isPiped()) destroyPipe(stderr_pipe); stdio[0].setUpChildIoPosixSpawn( &actions, @@ -1096,9 +1089,25 @@ pub const Subprocess = struct { env = @ptrCast(@TypeOf(env), env_array.items.ptr); } - const pid = switch (PosixSpawn.spawnZ(argv.items[0].?, actions, attr, @ptrCast([*:null]?[*:0]const u8, argv.items[0..].ptr), env)) { - .err => |err| return err.toJSC(globalThis), - .result => |pid_| pid_, + const pid = brk: { + defer { + if (stdio[0].isPiped()) { + _ = JSC.Node.Syscall.close(stdin_pipe[0]); + } + + if (stdio[1].isPiped()) { + _ = JSC.Node.Syscall.close(stdout_pipe[1]); + } + + if (stdio[2].isPiped()) { + _ = JSC.Node.Syscall.close(stderr_pipe[1]); + } + } + + break :brk switch (PosixSpawn.spawnZ(argv.items[0].?, actions, attr, @ptrCast([*:null]?[*:0]const u8, argv.items[0..].ptr), env)) { + .err => |err| return err.toJSC(globalThis), + .result => |pid_| pid_, + }; }; const pidfd: std.os.fd_t = brk: { @@ -1141,15 +1150,16 @@ pub const Subprocess = struct { .globalThis = globalThis, .pid = pid, .pidfd = pidfd, - .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], stdin_pipe[0], globalThis) catch { + .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch { globalThis.throw("out of memory", .{}); return .zero; }, - .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], stdout_pipe[1], globalThis), - .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], stderr_pipe[1], globalThis), + .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis), + .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis), .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .is_sync = is_sync, }; + if (subprocess.stdin == .pipe) { subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin); } @@ -1280,11 +1290,6 @@ pub const Subprocess = struct { this.has_waitpid_task = true; const pid = this.pid; - if (!sync) { - // signal to the other end we are definitely done - this.stdin.close(); - } - switch (PosixSpawn.waitpid(pid, 0)) { .err => |err| { this.waitpid_err = err; @@ -1321,9 +1326,9 @@ pub const Subprocess = struct { } fn onExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject) void { - this.stdin.close(); - this.stdout.close(); - this.stderr.close(); + // this.stdin.close(); + // this.stdout.close(); + // this.stderr.close(); defer this.updateHasPendingActivity(); this.has_waitpid_task = false; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 8c3f3dfb1..ed5cce8d9 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1427,6 +1427,8 @@ pub const FileSink = struct { } fn cleanup(this: *FileSink) void { + this.done = true; + if (this.poll_ref) |poll| { this.poll_ref = null; poll.deinit(); @@ -1464,7 +1466,6 @@ pub const FileSink = struct { } pub fn onHangup(this: *FileSink) void { - this.done = true; this.signal.clear(); this.cleanup(); @@ -3634,7 +3635,8 @@ pub const FIFO = struct { const read_result = this.read(this.buf, available_to_read); if (read_result == .read and read_result.read.len == 0) { - this.unwatch(this.poll_ref.?.fd); + if (this.poll_ref != null) + this.unwatch(this.poll_ref.?.fd); this.close(); return; } diff --git a/test/bun.js/spawn-streaming-stdin.test.ts b/test/bun.js/spawn-streaming-stdin.test.ts new file mode 100644 index 000000000..953548071 --- /dev/null +++ b/test/bun.js/spawn-streaming-stdin.test.ts @@ -0,0 +1,53 @@ +import { it, test, expect } from "bun:test"; +import { spawn } from "bun"; +import { bunExe } from "./bunExe"; +import { gcTick } from "gc"; + +const N = 100; +test("spawn can write to stdin multiple chunks", async () => { + for (let i = 0; i < N; i++) { + var exited; + await (async function () { + const proc = spawn({ + cmd: [bunExe(), import.meta.dir + "/stdin-repro.js"], + stdout: "pipe", + stdin: "pipe", + stderr: "inherit", + env: { + BUN_DEBUG_QUIET_LOGS: 1, + }, + }); + exited = proc.exited; + var counter = 0; + var inCounter = 0; + const prom2 = (async function () { + while (inCounter++ < 4) { + await new Promise((resolve, reject) => setTimeout(resolve, 8)); + proc.stdin.write("Wrote to stdin!"); + await proc.stdin.flush(); + } + await proc.stdin.end(); + })(); + + const prom = (async function () { + try { + for await (var chunk of proc.stdout) { + expect(new TextDecoder().decode(chunk)).toBe("Wrote to stdin!\n"); + counter++; + + if (counter > 3) break; + } + } catch (e) { + console.log(e.stack); + throw e; + } + })(); + await Promise.all([prom, prom2]); + expect(counter).toBe(4); + // proc.kill(); + })(); + await exited; + } + + gcTick(true); +}); diff --git a/test/bun.js/stdin-repro.js b/test/bun.js/stdin-repro.js new file mode 100644 index 000000000..05daf0637 --- /dev/null +++ b/test/bun.js/stdin-repro.js @@ -0,0 +1,5 @@ +while (true) { + for await (let chunk of Bun.stdin.stream()) { + console.log(new Buffer(chunk).toString()); + } +} |