diff options
author | 2022-11-12 20:27:51 -0800 | |
---|---|---|
committer | 2022-11-12 20:28:10 -0800 | |
commit | a78b6f920d5042c583d8e2c8ab12132ba1a5e982 (patch) | |
tree | 9a79ef5f83e2693432c1193353dd7057aa91b7b0 | |
parent | 7da520b22e2d6216a28e67382bcf6e0450e851b5 (diff) | |
download | bun-a78b6f920d5042c583d8e2c8ab12132ba1a5e982.tar.gz bun-a78b6f920d5042c583d8e2c8ab12132ba1a5e982.tar.zst bun-a78b6f920d5042c583d8e2c8ab12132ba1a5e982.zip |
Fix infinite write loop on Linux
Diffstat (limited to '')
-rw-r--r-- | src/bun.js/webcore/streams.zig | 203 | ||||
-rw-r--r-- | src/global.zig | 16 | ||||
-rw-r--r-- | test/bun.js/spawn.test.ts | 10 |
3 files changed, 185 insertions, 44 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 2f9c509d3..4ec4f9f9c 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -299,7 +299,7 @@ pub const ReadableStream = struct { pub fn fd(this: StreamTag) JSC.Node.FileDescriptor { var bytes = @bitCast([8]u8, @enumToInt(this)); if (bytes[0] != 1) { - return std.math.maxInt(JSC.Node.FileDescriptor); + return JSC.Node.invalid_fd; } var out: u64 = 0; @bitCast([8]u8, out)[0..7].* = bytes[1..8].*; @@ -439,7 +439,7 @@ pub const StreamStart = union(Tag) { return .{ .FileSink = .{ - .input_path = .{ .fd = std.math.maxInt(JSC.Node.FileDescriptor) }, + .input_path = .{ .fd = JSC.Node.invalid_fd }, .chunk_size = chunk_size, }, }; @@ -1018,7 +1018,7 @@ pub const FileSink = struct { next: ?Sink = null, auto_close: bool = false, auto_truncate: bool = false, - fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), + fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, mode: JSC.Node.Mode = 0, chunk_size: usize = 0, pending: StreamResult.Writable.Pending = StreamResult.Writable.Pending{ @@ -1030,12 +1030,14 @@ pub const FileSink = struct { written: usize = 0, head: usize = 0, requested_end: bool = false, - + has_adjusted_pipe_size_on_linux: bool = false, + max_write_size: usize = std.math.maxInt(usize), prevent_process_exit: bool = false, reachable_from_js: bool = true, poll_ref: ?*JSC.FilePoll = null, pub usingnamespace NewReadyWatcher(@This(), .writable, ready); + const log = Output.scoped(.FileSink, false); const max_fifo_size = 64 * 1024; pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) { @@ -1063,6 +1065,7 @@ pub const FileSink = struct { this.auto_truncate = this.auto_truncate and (std.os.S.ISREG(this.mode)); } else { this.auto_truncate = false; + this.max_write_size = max_fifo_size; } this.fd = fd; @@ -1076,9 +1079,9 @@ pub const FileSink = struct { } pub fn start(this: *FileSink, stream_start: StreamStart) JSC.Node.Maybe(void) { - if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (this.fd != JSC.Node.invalid_fd) { _ = JSC.Node.Syscall.close(this.fd); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + this.fd = JSC.Node.invalid_fd; } this.done = false; @@ -1113,8 +1116,32 @@ pub const FileSink = struct { return flushMaybePoll(this); } + fn adjustPipeLengthOnLinux(this: *FileSink, fd: JSC.Node.FileDescriptor, remain_len: usize) void { + const F_SETPIPE_SZ = 1031; + const F_GETPIPE_SZ = 1032; + + // On Linux, we can adjust the pipe size to avoid blocking. + this.has_adjusted_pipe_size_on_linux = true; + var pipe_len: c_int = 0; + _ = std.c.fcntl(fd, F_GETPIPE_SZ, &pipe_len); + if (pipe_len < 0) return; + + // If we have a valid pipe_len, then pessimistically set it to that. + this.max_write_size = @intCast(usize, pipe_len); + + if (pipe_len < remain_len) { + // If our real pipe length is less than the amount of data we have left to write, + // let's figure out what the maximum pipe size is and grow it to that. + var out_size = getMaxPipeSizeOnLinux(); + _ = std.c.fcntl(fd, F_SETPIPE_SZ, &out_size); + if (out_size > 0) { + this.max_write_size = @intCast(usize, out_size); + } + } + } + pub fn flushMaybePoll(this: *FileSink) StreamResult.Writable { - std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); + std.debug.assert(this.fd != JSC.Node.invalid_fd); var total: usize = this.written; const initial = total; @@ -1133,9 +1160,20 @@ pub const FileSink = struct { this.head += total - initial; } } + const is_fifo = this.isFIFO(); + + if (comptime Environment.isLinux) { + if (is_fifo and !this.has_adjusted_pipe_size_on_linux and remain.len >= (max_fifo_size - 1024)) { + this.adjustPipeLengthOnLinux(fd, remain.len); + } + } + + const max_to_write = if (is_fifo) this.max_write_size else remain.len; + while (remain.len > 0) { - const max_to_write = if (std.os.S.ISFIFO(this.mode)) max_fifo_size else remain.len; const write_buf = remain[0..@minimum(remain.len, max_to_write)]; + + log("Write {d} bytes (fd: {d})", .{ write_buf.len, fd }); const res = JSC.Node.Syscall.write(fd, write_buf); if (res == .err) { const retry = @@ -1143,7 +1181,12 @@ pub const FileSink = struct { switch (res.err.getErrno()) { retry => { - this.watch(fd); + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + } + + if (!this.isWatching()) + this.watch(fd); return .{ .pending = &this.pending, }; @@ -1158,26 +1201,37 @@ pub const FileSink = struct { remain = remain[res.result..]; total += res.result; - if (res.result == 0) break; + + log("Wrote {d} bytes (fd: {d})", .{ res.result, fd }); + + if (res.result == 0) { + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + } + break; + } // we flushed an entire fifo // but we still have more // lets check if its writable, so we avoid blocking - if (std.os.S.ISFIFO(this.mode) and - max_to_write == max_fifo_size and - res.result == max_fifo_size and - remain.len > 0) - { - var polls = [_]std.os.pollfd{ - .{ - .fd = fd, - .events = std.os.POLL.IN | std.os.POLL.ERR, - .revents = 0, - }, - }; + if (is_fifo and remain.len > 0) { + const is_writable = bun.isWritable(fd); + if (is_writable) { + if (this.poll_ref) |poll_ref| { + poll_ref.flags.insert(.writable); + poll_ref.flags.insert(.fifo); + std.debug.assert(poll_ref.flags.contains(.poll_writable)); + } + } else { + if (!this.isWatching()) + this.watch(this.fd); + + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + std.debug.assert(poll.flags.contains(.poll_writable)); + } + this.pending.consumed = @truncate(Blob.SizeType, total - initial); - if ((std.os.poll(&polls, 0) catch 0) == 0) { - this.watch(this.fd); return .{ .pending = &this.pending, }; @@ -1189,14 +1243,24 @@ pub const FileSink = struct { .owned = @truncate(Blob.SizeType, total), }; this.pending.consumed = @truncate(Blob.SizeType, total - initial); + + if (is_fifo and remain.len == 0 and this.isWatching()) { + this.unwatch(fd); + } + if (this.requested_end) { this.done = true; + + if (is_fifo and this.isWatching()) { + this.unwatch(fd); + } + if (this.auto_truncate) std.os.ftruncate(this.fd, total) catch {}; if (this.auto_close) { _ = JSC.Node.Syscall.close(this.fd); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + this.fd = JSC.Node.invalid_fd; } } this.pending.run(); @@ -1219,7 +1283,7 @@ pub const FileSink = struct { } fn cleanup(this: *FileSink) void { - if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (this.fd != JSC.Node.invalid_fd) { if (this.scheduled_count > 0) { this.scheduled_count = 0; if (this.poll_ref) |poll| { @@ -1229,7 +1293,7 @@ pub const FileSink = struct { } _ = JSC.Node.Syscall.close(this.fd); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + this.fd = JSC.Node.invalid_fd; } if (this.buffer.cap > 0) { @@ -1278,6 +1342,16 @@ pub const FileSink = struct { } pub fn ready(this: *FileSink, _: i64) void { + var remain = this.buffer.slice(); + const pending = remain[@minimum(this.head, remain.len)..].len; + if (pending == 0) { + if (this.isWatching()) { + this.unwatch(this.fd); + } + + return; + } + _ = this.flushMaybePoll(); } @@ -3508,7 +3582,9 @@ pub const FileBlobLoader = struct { std.debug.assert(this.started); std.debug.assert(read_buf.len > 0); - if (this.fd == std.math.maxInt(JSC.Node.FileDescriptor)) { + const fd = this.fd; + + if (fd == JSC.Node.invalid_fd) { std.debug.assert(this.poll_ref == null); return .{ .done = {} }; } @@ -3528,7 +3604,7 @@ pub const FileBlobLoader = struct { if (comptime Environment.isLinux) { if (len == 0) { const FIONREAD = if (Environment.isLinux) std.os.linux.T.FIONREAD else bun.C.FIONREAD; - const rc: c_int = std.c.ioctl(this.fd, FIONREAD, &len); + const rc: c_int = std.c.ioctl(fd, FIONREAD, &len); if (rc != 0) { len = 0; } @@ -3556,14 +3632,14 @@ pub const FileBlobLoader = struct { } if (!this.has_adjusted_pipe_size_on_linux) { - if (len + 1024 > 16 * std.mem.page_size) { + if (len >= std.mem.page_size * 16) { this.has_adjusted_pipe_size_on_linux = true; var pipe_len: c_int = 0; - _ = std.c.fcntl(this.fd, F_GETPIPE_SZ, &pipe_len); + _ = std.c.fcntl(fd, F_GETPIPE_SZ, &pipe_len); - if (pipe_len <= 16 * std.mem.page_size) { - var out_size: c_int = 512 * 1024; - _ = std.c.fcntl(this.fd, F_SETPIPE_SZ, &out_size); + if (pipe_len > 0 and pipe_len < std.mem.page_size * 16) { + var out_size: c_int = getMaxPipeSizeOnLinux(); + _ = std.c.fcntl(fd, F_SETPIPE_SZ, &out_size); } } } @@ -3613,7 +3689,7 @@ pub const FileBlobLoader = struct { } } - const rc = Syscall.read(this.fd, buf_to_use); + const rc = Syscall.read(fd, buf_to_use); switch (rc) { .err => |err| { @@ -3637,6 +3713,11 @@ pub const FileBlobLoader = struct { switch (errno) { retry => { if (this.finished) { + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } + return .{ .done = {} }; } @@ -3702,14 +3783,6 @@ pub const FileBlobLoader = struct { } } - pub fn isFIFO(this: *const FileBlobLoader) bool { - if (this.poll_ref) |poll| { - return poll.flags.contains(.fifo) or poll.flags.contains(.tty); - } - - return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode); - } - /// Called from Poller pub fn ready(this: *FileBlobLoader, sizeOrOffset: i64) void { std.debug.assert(this.started); @@ -3832,6 +3905,18 @@ pub fn NewReadyWatcher( const Watcher = @This(); + pub inline fn isFIFO(this: *const Context) bool { + if (this.poll_ref) |poll| { + return poll.flags.contains(.fifo) or poll.flags.contains(.tty); + } + + if (@hasField(Context, "mode")) { + return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode); + } + + return false; + } + pub fn onPoll(this: *Context, sizeOrOffset: i64, _: u16) void { ready(this, sizeOrOffset); } @@ -3856,7 +3941,7 @@ pub fn NewReadyWatcher( }; } - pub fn isWatching(this: *Context) bool { + pub fn isWatching(this: *const Context) bool { if (this.poll_ref) |poll| { return poll.flags.contains(flag.poll()); } @@ -3898,3 +3983,33 @@ pub fn NewReadyWatcher( // }; // } +fn getMaxPipeSizeOnLinux() c_int { + return bun.once(struct { + fn once() c_int { + const default_out_size = 512 * 1024; + const pipe_max_size_fd = switch (JSC.Node.Syscall.open("/proc/sys/fs/pipe-max-size", std.os.O.RDONLY, 0)) { + .result => |fd2| fd2, + .err => |err| { + Output.debug("Failed to open /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno}); + return default_out_size; + }, + }; + defer _ = JSC.Node.Syscall.close(pipe_max_size_fd); + var max_pipe_size_buf: [128]u8 = undefined; + const max_pipe_size = switch (JSC.Node.Syscall.read(pipe_max_size_fd, max_pipe_size_buf[0..])) { + .result => |bytes_read| std.fmt.parseInt(i64, strings.trim(max_pipe_size_buf[0..bytes_read], "\n"), 10) catch |err| { + Output.debug("Failed to parse /proc/sys/fs/pipe-max-size: {any}\n", .{@errorName(err)}); + return default_out_size; + }, + .err => |err| { + Output.debug("Failed to read /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno}); + return default_out_size; + }, + }; + + // we set the absolute max to 8 MB because honestly that's a huge pipe + // my current linux machine only goes up to 1 MB, so that's very unlikely to be hit + return @minimum(@truncate(c_int, max_pipe_size), 1024 * 1024 * 8); + } + }.once, c_int); +} diff --git a/src/global.zig b/src/global.zig index 7a9ef2657..6135e807d 100644 --- a/src/global.zig +++ b/src/global.zig @@ -380,3 +380,19 @@ pub const HTTPThead = @import("./http_client_async.zig").HTTPThread; pub const Analytics = @import("./analytics/analytics_thread.zig"); pub usingnamespace @import("./tagged_pointer.zig"); + +pub fn once(comptime function: anytype, comptime ReturnType: type) ReturnType { + const Result = struct { + var value: ReturnType = undefined; + var ran = false; + + pub fn execute() ReturnType { + if (ran) return value; + ran = true; + value = function(); + return value; + } + }; + + return Result.execute(); +} diff --git a/test/bun.js/spawn.test.ts b/test/bun.js/spawn.test.ts index bc00964c1..5be0b1ddc 100644 --- a/test/bun.js/spawn.test.ts +++ b/test/bun.js/spawn.test.ts @@ -92,6 +92,16 @@ for (let [gcTick, label] of [ expect(exitCode2).toBe(1); }); + it("nothing to stdout and sleeping doesn't keep process open 4ever", async () => { + const proc = spawn({ + cmd: ["sleep", "0.1"], + }); + + for await (const _ of proc.stdout!) { + throw new Error("should not happen"); + } + }); + it("check exit code from onExit", async () => { var exitCode1, exitCode2; await new Promise<void>((resolve) => { |