aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-11-12 20:27:51 -0800
committerGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-11-12 20:28:10 -0800
commita78b6f920d5042c583d8e2c8ab12132ba1a5e982 (patch)
tree9a79ef5f83e2693432c1193353dd7057aa91b7b0
parent7da520b22e2d6216a28e67382bcf6e0450e851b5 (diff)
downloadbun-a78b6f920d5042c583d8e2c8ab12132ba1a5e982.tar.gz
bun-a78b6f920d5042c583d8e2c8ab12132ba1a5e982.tar.zst
bun-a78b6f920d5042c583d8e2c8ab12132ba1a5e982.zip
Fix infinite write loop on Linux
Diffstat (limited to '')
-rw-r--r--src/bun.js/webcore/streams.zig203
-rw-r--r--src/global.zig16
-rw-r--r--test/bun.js/spawn.test.ts10
3 files changed, 185 insertions, 44 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 2f9c509d3..4ec4f9f9c 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -299,7 +299,7 @@ pub const ReadableStream = struct {
pub fn fd(this: StreamTag) JSC.Node.FileDescriptor {
var bytes = @bitCast([8]u8, @enumToInt(this));
if (bytes[0] != 1) {
- return std.math.maxInt(JSC.Node.FileDescriptor);
+ return JSC.Node.invalid_fd;
}
var out: u64 = 0;
@bitCast([8]u8, out)[0..7].* = bytes[1..8].*;
@@ -439,7 +439,7 @@ pub const StreamStart = union(Tag) {
return .{
.FileSink = .{
- .input_path = .{ .fd = std.math.maxInt(JSC.Node.FileDescriptor) },
+ .input_path = .{ .fd = JSC.Node.invalid_fd },
.chunk_size = chunk_size,
},
};
@@ -1018,7 +1018,7 @@ pub const FileSink = struct {
next: ?Sink = null,
auto_close: bool = false,
auto_truncate: bool = false,
- fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor),
+ fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd,
mode: JSC.Node.Mode = 0,
chunk_size: usize = 0,
pending: StreamResult.Writable.Pending = StreamResult.Writable.Pending{
@@ -1030,12 +1030,14 @@ pub const FileSink = struct {
written: usize = 0,
head: usize = 0,
requested_end: bool = false,
-
+ has_adjusted_pipe_size_on_linux: bool = false,
+ max_write_size: usize = std.math.maxInt(usize),
prevent_process_exit: bool = false,
reachable_from_js: bool = true,
poll_ref: ?*JSC.FilePoll = null,
pub usingnamespace NewReadyWatcher(@This(), .writable, ready);
+ const log = Output.scoped(.FileSink, false);
const max_fifo_size = 64 * 1024;
pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) {
@@ -1063,6 +1065,7 @@ pub const FileSink = struct {
this.auto_truncate = this.auto_truncate and (std.os.S.ISREG(this.mode));
} else {
this.auto_truncate = false;
+ this.max_write_size = max_fifo_size;
}
this.fd = fd;
@@ -1076,9 +1079,9 @@ pub const FileSink = struct {
}
pub fn start(this: *FileSink, stream_start: StreamStart) JSC.Node.Maybe(void) {
- if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ if (this.fd != JSC.Node.invalid_fd) {
_ = JSC.Node.Syscall.close(this.fd);
- this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ this.fd = JSC.Node.invalid_fd;
}
this.done = false;
@@ -1113,8 +1116,32 @@ pub const FileSink = struct {
return flushMaybePoll(this);
}
+ fn adjustPipeLengthOnLinux(this: *FileSink, fd: JSC.Node.FileDescriptor, remain_len: usize) void {
+ const F_SETPIPE_SZ = 1031;
+ const F_GETPIPE_SZ = 1032;
+
+ // On Linux, we can adjust the pipe size to avoid blocking.
+ this.has_adjusted_pipe_size_on_linux = true;
+ var pipe_len: c_int = 0;
+ _ = std.c.fcntl(fd, F_GETPIPE_SZ, &pipe_len);
+ if (pipe_len < 0) return;
+
+ // If we have a valid pipe_len, then pessimistically set it to that.
+ this.max_write_size = @intCast(usize, pipe_len);
+
+ if (pipe_len < remain_len) {
+ // If our real pipe length is less than the amount of data we have left to write,
+ // let's figure out what the maximum pipe size is and grow it to that.
+ var out_size = getMaxPipeSizeOnLinux();
+ _ = std.c.fcntl(fd, F_SETPIPE_SZ, &out_size);
+ if (out_size > 0) {
+ this.max_write_size = @intCast(usize, out_size);
+ }
+ }
+ }
+
pub fn flushMaybePoll(this: *FileSink) StreamResult.Writable {
- std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor));
+ std.debug.assert(this.fd != JSC.Node.invalid_fd);
var total: usize = this.written;
const initial = total;
@@ -1133,9 +1160,20 @@ pub const FileSink = struct {
this.head += total - initial;
}
}
+ const is_fifo = this.isFIFO();
+
+ if (comptime Environment.isLinux) {
+ if (is_fifo and !this.has_adjusted_pipe_size_on_linux and remain.len >= (max_fifo_size - 1024)) {
+ this.adjustPipeLengthOnLinux(fd, remain.len);
+ }
+ }
+
+ const max_to_write = if (is_fifo) this.max_write_size else remain.len;
+
while (remain.len > 0) {
- const max_to_write = if (std.os.S.ISFIFO(this.mode)) max_fifo_size else remain.len;
const write_buf = remain[0..@minimum(remain.len, max_to_write)];
+
+ log("Write {d} bytes (fd: {d})", .{ write_buf.len, fd });
const res = JSC.Node.Syscall.write(fd, write_buf);
if (res == .err) {
const retry =
@@ -1143,7 +1181,12 @@ pub const FileSink = struct {
switch (res.err.getErrno()) {
retry => {
- this.watch(fd);
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ }
+
+ if (!this.isWatching())
+ this.watch(fd);
return .{
.pending = &this.pending,
};
@@ -1158,26 +1201,37 @@ pub const FileSink = struct {
remain = remain[res.result..];
total += res.result;
- if (res.result == 0) break;
+
+ log("Wrote {d} bytes (fd: {d})", .{ res.result, fd });
+
+ if (res.result == 0) {
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ }
+ break;
+ }
// we flushed an entire fifo
// but we still have more
// lets check if its writable, so we avoid blocking
- if (std.os.S.ISFIFO(this.mode) and
- max_to_write == max_fifo_size and
- res.result == max_fifo_size and
- remain.len > 0)
- {
- var polls = [_]std.os.pollfd{
- .{
- .fd = fd,
- .events = std.os.POLL.IN | std.os.POLL.ERR,
- .revents = 0,
- },
- };
+ if (is_fifo and remain.len > 0) {
+ const is_writable = bun.isWritable(fd);
+ if (is_writable) {
+ if (this.poll_ref) |poll_ref| {
+ poll_ref.flags.insert(.writable);
+ poll_ref.flags.insert(.fifo);
+ std.debug.assert(poll_ref.flags.contains(.poll_writable));
+ }
+ } else {
+ if (!this.isWatching())
+ this.watch(this.fd);
+
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ std.debug.assert(poll.flags.contains(.poll_writable));
+ }
+ this.pending.consumed = @truncate(Blob.SizeType, total - initial);
- if ((std.os.poll(&polls, 0) catch 0) == 0) {
- this.watch(this.fd);
return .{
.pending = &this.pending,
};
@@ -1189,14 +1243,24 @@ pub const FileSink = struct {
.owned = @truncate(Blob.SizeType, total),
};
this.pending.consumed = @truncate(Blob.SizeType, total - initial);
+
+ if (is_fifo and remain.len == 0 and this.isWatching()) {
+ this.unwatch(fd);
+ }
+
if (this.requested_end) {
this.done = true;
+
+ if (is_fifo and this.isWatching()) {
+ this.unwatch(fd);
+ }
+
if (this.auto_truncate)
std.os.ftruncate(this.fd, total) catch {};
if (this.auto_close) {
_ = JSC.Node.Syscall.close(this.fd);
- this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ this.fd = JSC.Node.invalid_fd;
}
}
this.pending.run();
@@ -1219,7 +1283,7 @@ pub const FileSink = struct {
}
fn cleanup(this: *FileSink) void {
- if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ if (this.fd != JSC.Node.invalid_fd) {
if (this.scheduled_count > 0) {
this.scheduled_count = 0;
if (this.poll_ref) |poll| {
@@ -1229,7 +1293,7 @@ pub const FileSink = struct {
}
_ = JSC.Node.Syscall.close(this.fd);
- this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ this.fd = JSC.Node.invalid_fd;
}
if (this.buffer.cap > 0) {
@@ -1278,6 +1342,16 @@ pub const FileSink = struct {
}
pub fn ready(this: *FileSink, _: i64) void {
+ var remain = this.buffer.slice();
+ const pending = remain[@minimum(this.head, remain.len)..].len;
+ if (pending == 0) {
+ if (this.isWatching()) {
+ this.unwatch(this.fd);
+ }
+
+ return;
+ }
+
_ = this.flushMaybePoll();
}
@@ -3508,7 +3582,9 @@ pub const FileBlobLoader = struct {
std.debug.assert(this.started);
std.debug.assert(read_buf.len > 0);
- if (this.fd == std.math.maxInt(JSC.Node.FileDescriptor)) {
+ const fd = this.fd;
+
+ if (fd == JSC.Node.invalid_fd) {
std.debug.assert(this.poll_ref == null);
return .{ .done = {} };
}
@@ -3528,7 +3604,7 @@ pub const FileBlobLoader = struct {
if (comptime Environment.isLinux) {
if (len == 0) {
const FIONREAD = if (Environment.isLinux) std.os.linux.T.FIONREAD else bun.C.FIONREAD;
- const rc: c_int = std.c.ioctl(this.fd, FIONREAD, &len);
+ const rc: c_int = std.c.ioctl(fd, FIONREAD, &len);
if (rc != 0) {
len = 0;
}
@@ -3556,14 +3632,14 @@ pub const FileBlobLoader = struct {
}
if (!this.has_adjusted_pipe_size_on_linux) {
- if (len + 1024 > 16 * std.mem.page_size) {
+ if (len >= std.mem.page_size * 16) {
this.has_adjusted_pipe_size_on_linux = true;
var pipe_len: c_int = 0;
- _ = std.c.fcntl(this.fd, F_GETPIPE_SZ, &pipe_len);
+ _ = std.c.fcntl(fd, F_GETPIPE_SZ, &pipe_len);
- if (pipe_len <= 16 * std.mem.page_size) {
- var out_size: c_int = 512 * 1024;
- _ = std.c.fcntl(this.fd, F_SETPIPE_SZ, &out_size);
+ if (pipe_len > 0 and pipe_len < std.mem.page_size * 16) {
+ var out_size: c_int = getMaxPipeSizeOnLinux();
+ _ = std.c.fcntl(fd, F_SETPIPE_SZ, &out_size);
}
}
}
@@ -3613,7 +3689,7 @@ pub const FileBlobLoader = struct {
}
}
- const rc = Syscall.read(this.fd, buf_to_use);
+ const rc = Syscall.read(fd, buf_to_use);
switch (rc) {
.err => |err| {
@@ -3637,6 +3713,11 @@ pub const FileBlobLoader = struct {
switch (errno) {
retry => {
if (this.finished) {
+ if (this.poll_ref) |poll| {
+ this.poll_ref = null;
+ poll.deinit();
+ }
+
return .{ .done = {} };
}
@@ -3702,14 +3783,6 @@ pub const FileBlobLoader = struct {
}
}
- pub fn isFIFO(this: *const FileBlobLoader) bool {
- if (this.poll_ref) |poll| {
- return poll.flags.contains(.fifo) or poll.flags.contains(.tty);
- }
-
- return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode);
- }
-
/// Called from Poller
pub fn ready(this: *FileBlobLoader, sizeOrOffset: i64) void {
std.debug.assert(this.started);
@@ -3832,6 +3905,18 @@ pub fn NewReadyWatcher(
const Watcher = @This();
+ pub inline fn isFIFO(this: *const Context) bool {
+ if (this.poll_ref) |poll| {
+ return poll.flags.contains(.fifo) or poll.flags.contains(.tty);
+ }
+
+ if (@hasField(Context, "mode")) {
+ return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode);
+ }
+
+ return false;
+ }
+
pub fn onPoll(this: *Context, sizeOrOffset: i64, _: u16) void {
ready(this, sizeOrOffset);
}
@@ -3856,7 +3941,7 @@ pub fn NewReadyWatcher(
};
}
- pub fn isWatching(this: *Context) bool {
+ pub fn isWatching(this: *const Context) bool {
if (this.poll_ref) |poll| {
return poll.flags.contains(flag.poll());
}
@@ -3898,3 +3983,33 @@ pub fn NewReadyWatcher(
// };
// }
+fn getMaxPipeSizeOnLinux() c_int {
+ return bun.once(struct {
+ fn once() c_int {
+ const default_out_size = 512 * 1024;
+ const pipe_max_size_fd = switch (JSC.Node.Syscall.open("/proc/sys/fs/pipe-max-size", std.os.O.RDONLY, 0)) {
+ .result => |fd2| fd2,
+ .err => |err| {
+ Output.debug("Failed to open /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno});
+ return default_out_size;
+ },
+ };
+ defer _ = JSC.Node.Syscall.close(pipe_max_size_fd);
+ var max_pipe_size_buf: [128]u8 = undefined;
+ const max_pipe_size = switch (JSC.Node.Syscall.read(pipe_max_size_fd, max_pipe_size_buf[0..])) {
+ .result => |bytes_read| std.fmt.parseInt(i64, strings.trim(max_pipe_size_buf[0..bytes_read], "\n"), 10) catch |err| {
+ Output.debug("Failed to parse /proc/sys/fs/pipe-max-size: {any}\n", .{@errorName(err)});
+ return default_out_size;
+ },
+ .err => |err| {
+ Output.debug("Failed to read /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno});
+ return default_out_size;
+ },
+ };
+
+ // we set the absolute max to 8 MB because honestly that's a huge pipe
+ // my current linux machine only goes up to 1 MB, so that's very unlikely to be hit
+ return @minimum(@truncate(c_int, max_pipe_size), 1024 * 1024 * 8);
+ }
+ }.once, c_int);
+}
diff --git a/src/global.zig b/src/global.zig
index 7a9ef2657..6135e807d 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -380,3 +380,19 @@ pub const HTTPThead = @import("./http_client_async.zig").HTTPThread;
pub const Analytics = @import("./analytics/analytics_thread.zig");
pub usingnamespace @import("./tagged_pointer.zig");
+
+pub fn once(comptime function: anytype, comptime ReturnType: type) ReturnType {
+ const Result = struct {
+ var value: ReturnType = undefined;
+ var ran = false;
+
+ pub fn execute() ReturnType {
+ if (ran) return value;
+ ran = true;
+ value = function();
+ return value;
+ }
+ };
+
+ return Result.execute();
+}
diff --git a/test/bun.js/spawn.test.ts b/test/bun.js/spawn.test.ts
index bc00964c1..5be0b1ddc 100644
--- a/test/bun.js/spawn.test.ts
+++ b/test/bun.js/spawn.test.ts
@@ -92,6 +92,16 @@ for (let [gcTick, label] of [
expect(exitCode2).toBe(1);
});
+ it("nothing to stdout and sleeping doesn't keep process open 4ever", async () => {
+ const proc = spawn({
+ cmd: ["sleep", "0.1"],
+ });
+
+ for await (const _ of proc.stdout!) {
+ throw new Error("should not happen");
+ }
+ });
+
it("check exit code from onExit", async () => {
var exitCode1, exitCode2;
await new Promise<void>((resolve) => {