aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-23 21:31:38 -0800
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-23 21:31:38 -0800
commitbddf484c2c7c8d3aadf715f0d908516ed45caeeb (patch)
treeee7ce4eeb80b44dd44c112bf0af7bfc176f68185
parent21531f1e80327f3c64e17fb3069c6bb38e8aad0c (diff)
downloadbun-bddf484c2c7c8d3aadf715f0d908516ed45caeeb.tar.gz
bun-bddf484c2c7c8d3aadf715f0d908516ed45caeeb.tar.zst
bun-bddf484c2c7c8d3aadf715f0d908516ed45caeeb.zip
Close the streams more
-rw-r--r--src/bun.js/api/bun/subprocess.zig59
-rw-r--r--src/bun.js/webcore/streams.zig6
-rw-r--r--test/bun.js/spawn-streaming-stdin.test.ts53
-rw-r--r--test/bun.js/stdin-repro.js5
4 files changed, 94 insertions, 29 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index c85e0396f..e78f3a48b 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -110,7 +110,7 @@ pub const Subprocess = struct {
pub fn done(this: *@This()) void {
if (this.* == .stream) {
- if (this.stream.ptr == .File) this.stream.ptr.File.finish();
+ if (this.stream.ptr == .File) this.stream.ptr.File.setSignal(JSC.WebCore.Signal{});
this.stream.done();
return;
}
@@ -132,12 +132,11 @@ pub const Subprocess = struct {
}
};
- pub fn init(stdio: Stdio, fd: i32, other_fd: i32, _: *JSC.JSGlobalObject) Readable {
+ pub fn init(stdio: Stdio, fd: i32, _: *JSC.JSGlobalObject) Readable {
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore => Readable{ .ignore = {} },
.pipe => brk: {
- _ = JSC.Node.Syscall.close(other_fd);
break :brk .{
.pipe = .{
.buffer = undefined,
@@ -164,8 +163,6 @@ pub const Subprocess = struct {
_ = JSC.Node.Syscall.close(fd);
},
.pipe => {
- if (this.pipe == .stream and this.pipe.stream.ptr == .File)
- this.pipe.stream.ptr.File.readable().FIFO.signal.clear();
this.pipe.done();
},
else => {},
@@ -692,7 +689,7 @@ pub const Subprocess = struct {
pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Writable) void {}
- pub fn init(stdio: Stdio, fd: i32, other_fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
+ pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
switch (stdio) {
.pipe => {
var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink);
@@ -702,7 +699,6 @@ pub const Subprocess = struct {
.allocator = globalThis.bunVM().allocator,
.auto_close = true,
};
- if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd);
sink.mode = std.os.S.IFIFO;
if (stdio == .pipe) {
if (stdio.pipe) |readable| {
@@ -718,7 +714,6 @@ pub const Subprocess = struct {
return Writable{ .pipe = sink };
},
.array_buffer, .blob => {
- if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd);
var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined };
switch (stdio) {
.array_buffer => |array_buffer| {
@@ -757,13 +752,14 @@ pub const Subprocess = struct {
pub fn close(this: *Writable) void {
return switch (this.*) {
.pipe => |pipe| {
- _ = pipe.end(null);
+ pipe.close();
},
.pipe_to_readable_stream => |*pipe_to_readable_stream| {
_ = pipe_to_readable_stream.pipe.end(null);
},
.fd => |fd| {
_ = JSC.Node.Syscall.close(fd);
+ this.* = .{ .ignore = {} };
},
.buffered_input => {
this.buffered_input.deinit();
@@ -778,7 +774,7 @@ pub const Subprocess = struct {
this.closeProcess();
this.stdin.close();
this.stderr.close();
- this.stdin.close();
+ this.stdout.close();
this.exit_promise.deinit();
this.on_exit_callback.deinit();
@@ -1049,19 +1045,16 @@ pub const Subprocess = struct {
globalThis.throw("failed to create stdin pipe: {s}", .{err});
return .zero;
} else undefined;
- errdefer if (stdio[0].isPiped()) destroyPipe(stdin_pipe);
const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(0) catch |err| {
globalThis.throw("failed to create stdout pipe: {s}", .{err});
return .zero;
} else undefined;
- errdefer if (stdio[1].isPiped()) destroyPipe(stdout_pipe);
const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(0) catch |err| {
globalThis.throw("failed to create stderr pipe: {s}", .{err});
return .zero;
} else undefined;
- errdefer if (stdio[2].isPiped()) destroyPipe(stderr_pipe);
stdio[0].setUpChildIoPosixSpawn(
&actions,
@@ -1096,9 +1089,25 @@ pub const Subprocess = struct {
env = @ptrCast(@TypeOf(env), env_array.items.ptr);
}
- const pid = switch (PosixSpawn.spawnZ(argv.items[0].?, actions, attr, @ptrCast([*:null]?[*:0]const u8, argv.items[0..].ptr), env)) {
- .err => |err| return err.toJSC(globalThis),
- .result => |pid_| pid_,
+ const pid = brk: {
+ defer {
+ if (stdio[0].isPiped()) {
+ _ = JSC.Node.Syscall.close(stdin_pipe[0]);
+ }
+
+ if (stdio[1].isPiped()) {
+ _ = JSC.Node.Syscall.close(stdout_pipe[1]);
+ }
+
+ if (stdio[2].isPiped()) {
+ _ = JSC.Node.Syscall.close(stderr_pipe[1]);
+ }
+ }
+
+ break :brk switch (PosixSpawn.spawnZ(argv.items[0].?, actions, attr, @ptrCast([*:null]?[*:0]const u8, argv.items[0..].ptr), env)) {
+ .err => |err| return err.toJSC(globalThis),
+ .result => |pid_| pid_,
+ };
};
const pidfd: std.os.fd_t = brk: {
@@ -1141,15 +1150,16 @@ pub const Subprocess = struct {
.globalThis = globalThis,
.pid = pid,
.pidfd = pidfd,
- .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], stdin_pipe[0], globalThis) catch {
+ .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch {
globalThis.throw("out of memory", .{});
return .zero;
},
- .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], stdout_pipe[1], globalThis),
- .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], stderr_pipe[1], globalThis),
+ .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis),
+ .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.is_sync = is_sync,
};
+
if (subprocess.stdin == .pipe) {
subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin);
}
@@ -1280,11 +1290,6 @@ pub const Subprocess = struct {
this.has_waitpid_task = true;
const pid = this.pid;
- if (!sync) {
- // signal to the other end we are definitely done
- this.stdin.close();
- }
-
switch (PosixSpawn.waitpid(pid, 0)) {
.err => |err| {
this.waitpid_err = err;
@@ -1321,9 +1326,9 @@ pub const Subprocess = struct {
}
fn onExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject) void {
- this.stdin.close();
- this.stdout.close();
- this.stderr.close();
+ // this.stdin.close();
+ // this.stdout.close();
+ // this.stderr.close();
defer this.updateHasPendingActivity();
this.has_waitpid_task = false;
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 8c3f3dfb1..ed5cce8d9 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1427,6 +1427,8 @@ pub const FileSink = struct {
}
fn cleanup(this: *FileSink) void {
+ this.done = true;
+
if (this.poll_ref) |poll| {
this.poll_ref = null;
poll.deinit();
@@ -1464,7 +1466,6 @@ pub const FileSink = struct {
}
pub fn onHangup(this: *FileSink) void {
- this.done = true;
this.signal.clear();
this.cleanup();
@@ -3634,7 +3635,8 @@ pub const FIFO = struct {
const read_result = this.read(this.buf, available_to_read);
if (read_result == .read and read_result.read.len == 0) {
- this.unwatch(this.poll_ref.?.fd);
+ if (this.poll_ref != null)
+ this.unwatch(this.poll_ref.?.fd);
this.close();
return;
}
diff --git a/test/bun.js/spawn-streaming-stdin.test.ts b/test/bun.js/spawn-streaming-stdin.test.ts
new file mode 100644
index 000000000..953548071
--- /dev/null
+++ b/test/bun.js/spawn-streaming-stdin.test.ts
@@ -0,0 +1,53 @@
+import { it, test, expect } from "bun:test";
+import { spawn } from "bun";
+import { bunExe } from "./bunExe";
+import { gcTick } from "gc";
+
+const N = 100;
+test("spawn can write to stdin multiple chunks", async () => {
+ for (let i = 0; i < N; i++) {
+ var exited;
+ await (async function () {
+ const proc = spawn({
+ cmd: [bunExe(), import.meta.dir + "/stdin-repro.js"],
+ stdout: "pipe",
+ stdin: "pipe",
+ stderr: "inherit",
+ env: {
+ BUN_DEBUG_QUIET_LOGS: 1,
+ },
+ });
+ exited = proc.exited;
+ var counter = 0;
+ var inCounter = 0;
+ const prom2 = (async function () {
+ while (inCounter++ < 4) {
+ await new Promise((resolve, reject) => setTimeout(resolve, 8));
+ proc.stdin.write("Wrote to stdin!");
+ await proc.stdin.flush();
+ }
+ await proc.stdin.end();
+ })();
+
+ const prom = (async function () {
+ try {
+ for await (var chunk of proc.stdout) {
+ expect(new TextDecoder().decode(chunk)).toBe("Wrote to stdin!\n");
+ counter++;
+
+ if (counter > 3) break;
+ }
+ } catch (e) {
+ console.log(e.stack);
+ throw e;
+ }
+ })();
+ await Promise.all([prom, prom2]);
+ expect(counter).toBe(4);
+ // proc.kill();
+ })();
+ await exited;
+ }
+
+ gcTick(true);
+});
diff --git a/test/bun.js/stdin-repro.js b/test/bun.js/stdin-repro.js
new file mode 100644
index 000000000..05daf0637
--- /dev/null
+++ b/test/bun.js/stdin-repro.js
@@ -0,0 +1,5 @@
+while (true) {
+ for await (let chunk of Bun.stdin.stream()) {
+ console.log(new Buffer(chunk).toString());
+ }
+}