diff options
author | 2022-11-24 18:57:58 -0800 | |
---|---|---|
committer | 2022-11-24 18:57:58 -0800 | |
commit | 5a95fae533cea2be73d78518ed52cd9ebf304a59 (patch) | |
tree | d98fc72766752bbc762f792ba3de1a53ae980eb1 | |
parent | 47f0e14477721a0c1c9f5458208c632fc2e4a370 (diff) | |
download | bun-5a95fae533cea2be73d78518ed52cd9ebf304a59.tar.gz bun-5a95fae533cea2be73d78518ed52cd9ebf304a59.tar.zst bun-5a95fae533cea2be73d78518ed52cd9ebf304a59.zip |
Improve SIGPIPE handling
Diffstat (limited to '')
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 33 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 2 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 156 | ||||
-rw-r--r-- | src/global.zig | 15 |
4 files changed, 121 insertions, 85 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index a713b28f1..633e12d19 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -358,29 +358,28 @@ pub const Subprocess = struct { this.write(); } - pub fn canWrite(this: *BufferedInput) bool { - const is_writable = bun.isWritable(this.fd); - if (is_writable) { - if (this.poll_ref) |poll_ref| { - poll_ref.flags.insert(.writable); - poll_ref.flags.insert(.fifo); - std.debug.assert(poll_ref.flags.contains(.poll_writable)); - } - } - - return is_writable; - } - pub fn writeIfPossible(this: *BufferedInput, comptime is_sync: bool) void { if (comptime !is_sync) { // we ask, "Is it possible to write right now?" // we do this rather than epoll or kqueue() // because we don't want to block the thread waiting for the write - if (!this.canWrite()) { - this.watch(this.fd); - this.poll_ref.?.flags.insert(.fifo); - return; + switch (bun.isWritable(this.fd)) { + .writable => { + if (this.poll_ref) |poll_ref| { + poll_ref.flags.insert(.writable); + poll_ref.flags.insert(.fifo); + std.debug.assert(poll_ref.flags.contains(.poll_writable)); + } + }, + .hup => { + this.deinit(); + return; + }, + .not_writable => { + if (!this.isWatching()) this.watch(this.fd); + return; + }, } } diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 5c996da45..f10d85c86 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -4153,7 +4153,7 @@ pub const InternalBlob = struct { } pub fn deinit(this: *@This()) void { - this.bytes.deinit(); + this.bytes.clearAndFree(); } pub inline fn slice(this: @This()) []u8 { diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index ef86a1a16..36d284553 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1218,8 +1218,8 @@ pub const FileSink = struct { return .{ .result = {} }; } - pub fn flush(this: *FileSink) StreamResult.Writable { - return flushMaybePoll(this); + pub fn flush(this: *FileSink, buf: []const u8) StreamResult.Writable { + return this.flushMaybePollWithSizeAndBuffer(buf, std.math.maxInt(usize)); } fn adjustPipeLengthOnLinux(this: *FileSink, fd: bun.FileDescriptor, remain_len: usize) void { @@ -1236,17 +1236,13 @@ pub const FileSink = struct { } } - pub fn flushMaybePoll(this: *FileSink) StreamResult.Writable { - return flushMaybePollWithSize(this, std.math.maxInt(usize)); - } - - pub fn flushMaybePollWithSize(this: *FileSink, writable_size: usize) StreamResult.Writable { + pub fn flushMaybePollWithSizeAndBuffer(this: *FileSink, buffer: []const u8, writable_size: usize) StreamResult.Writable { std.debug.assert(this.fd != bun.invalid_fd); var total: usize = this.written; const initial = total; const fd = this.fd; - var remain = this.buffer.slice(); + var remain = buffer; remain = remain[@minimum(this.head, remain.len)..]; if (remain.len == 0) return .{ .owned = 0 }; @@ -1297,20 +1293,33 @@ pub const FileSink = struct { } } - if (!bun.isWritable(fd)) { - if (this.poll_ref) |poll| { - poll.flags.remove(.writable); - } + switch (bun.isWritable(fd)) { + .not_writable => { + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + } - if (!this.isWatching()) - this.watch(fd); + if (!this.isWatching()) + this.watch(fd); - return .{ - .pending = &this.pending, - }; - } + return .{ + .pending = &this.pending, + }; + }, + .hup => { + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + poll.flags.insert(.hup); + } - break :brk this.max_write_size; + this.cleanup(); + + return .{ + .done = {}, + }; + }, + .writable => break :brk this.max_write_size, + } } else remain.len; if (max_to_write > 0) { @@ -1334,6 +1343,11 @@ pub const FileSink = struct { .pending = &this.pending, }; }, + .PIPE => { + this.cleanup(); + this.pending.consumed = @truncate(Blob.SizeType, total - initial); + return .{ .done = {} }; + }, else => {}, } this.pending.result = .{ .err = res.err }; @@ -1358,26 +1372,40 @@ pub const FileSink = struct { // but we still have more // lets check if its writable, so we avoid blocking if (is_fifo and remain.len > 0) { - const is_writable = bun.isWritable(fd); - if (is_writable) { - if (this.poll_ref) |poll_ref| { - poll_ref.flags.insert(.writable); - poll_ref.flags.insert(.fifo); - std.debug.assert(poll_ref.flags.contains(.poll_writable)); - } - } else { - if (!this.isWatching()) - this.watch(this.fd); + switch (bun.isWritable(fd)) { + .writable => { + if (this.poll_ref) |poll_ref| { + poll_ref.flags.insert(.writable); + poll_ref.flags.insert(.fifo); + std.debug.assert(poll_ref.flags.contains(.poll_writable)); + } + }, + .not_writable => { + if (!this.isWatching()) + this.watch(this.fd); - if (this.poll_ref) |poll| { - poll.flags.remove(.writable); - std.debug.assert(poll.flags.contains(.poll_writable)); - } - this.pending.consumed = @truncate(Blob.SizeType, total - initial); + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + std.debug.assert(poll.flags.contains(.poll_writable)); + } + this.pending.consumed = @truncate(Blob.SizeType, total - initial); - return .{ - .pending = &this.pending, - }; + return .{ + .pending = &this.pending, + }; + }, + .hup => { + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + poll.flags.insert(.hup); + } + + this.cleanup(); + + return .{ + .done = {}, + }; + }, } } } @@ -1415,7 +1443,7 @@ pub const FileSink = struct { if (this.isPending() or this.done) { return .{ .result = JSC.JSValue.jsUndefined() }; } - const result = this.flush(); + const result = this.flush(this.buffer.slice()); if (result == .err) { return .{ .err = result.err }; @@ -1448,7 +1476,6 @@ pub const FileSink = struct { if (this.buffer.cap > 0) { this.buffer.listManaged(this.allocator).deinit(); this.buffer = bun.ByteList.init(""); - this.done = true; this.head = 0; } @@ -1510,9 +1537,9 @@ pub const FileSink = struct { } if (comptime Environment.isMac) { - _ = this.flushMaybePollWithSize(@intCast(usize, @maximum(writable, 0))); + _ = this.flushMaybePollWithSizeAndBuffer(this.buffer.slice(), @intCast(usize, @maximum(writable, 0))); } else { - _ = this.flushMaybePollWithSize(std.math.maxInt(usize)); + _ = this.flushMaybePollWithSizeAndBuffer(this.buffer.slice(), std.math.maxInt(usize)); } } @@ -1523,12 +1550,9 @@ pub const FileSink = struct { const input = data.slice(); if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size) { - var temp = this.buffer; - defer this.buffer = temp; - this.buffer = bun.ByteList.init(input); - const result = this.flush(); + const result = this.flush(input); if (this.isPending()) { - _ = temp.write(this.allocator, input) catch { + _ = this.buffer.write(this.allocator, input) catch { return .{ .err = Syscall.Error.oom }; }; } @@ -1541,7 +1565,7 @@ pub const FileSink = struct { }; if (!this.isPending() and this.buffer.len >= this.chunk_size) { - return this.flush(); + return this.flush(this.buffer.slice()); } this.signal.ready(null, null); @@ -1556,12 +1580,9 @@ pub const FileSink = struct { const input = data.slice(); if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size and strings.isAllASCII(input)) { - var temp = this.buffer; - defer this.buffer = temp; - this.buffer = bun.ByteList.init(input); - const result = this.flush(); + const result = this.flush(input); if (this.isPending()) { - _ = temp.write(this.allocator, input) catch { + _ = this.buffer.write(this.allocator, input) catch { return .{ .err = Syscall.Error.oom }; }; } @@ -1574,7 +1595,7 @@ pub const FileSink = struct { }; if (!this.isPending() and this.buffer.len >= this.chunk_size) { - return this.flush(); + return this.flush(this.buffer.slice()); } this.signal.ready(null, null); @@ -1591,8 +1612,9 @@ pub const FileSink = struct { const len = this.buffer.writeUTF16(this.allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch { return .{ .err = Syscall.Error.oom }; }; + if (!this.isPending() and this.buffer.len >= this.chunk_size) { - return this.flush(); + return this.flush(this.buffer.slice()); } this.signal.ready(null, null); @@ -1601,8 +1623,7 @@ pub const FileSink = struct { fn isPending(this: *const FileSink) bool { if (this.done) return false; - var poll_ref = this.poll_ref orelse return false; - return poll_ref.isRegistered() and !poll_ref.flags.contains(.needs_rearm); + return this.pending.state == .pending; } pub fn close(this: *FileSink) void { @@ -1641,7 +1662,7 @@ pub const FileSink = struct { this.requested_end = true; - const flushy = this.flush(); + const flushy = this.flush(this.buffer.slice()); if (flushy == .err) { return .{ .err = flushy.err }; @@ -1668,7 +1689,7 @@ pub const FileSink = struct { return .{ .result = JSValue.jsNumber(this.written) }; } - const flushed = this.flush(); + const flushed = this.flush(this.buffer.slice()); if (flushed == .err) { return .{ .err = flushed.err }; @@ -3627,19 +3648,28 @@ pub const FIFO = struct { } pub fn ready(this: *FIFO, sizeOrOffset: i64) void { - const available_to_read = this.getAvailableToRead(sizeOrOffset); if (this.isClosed()) { - this.unwatch(this.poll_ref.?.fd); + if (this.isWatching()) + this.unwatch(this.poll_ref.?.fd); return; } - if (this.buf.len == 0 and (available_to_read orelse 1) != 0) { + if (this.buf.len == 0) { return; } - const read_result = this.read(this.buf, available_to_read); + const read_result = this.read( + this.buf, + // On Linux, we end up calling ioctl() twice if we don't do this + if (comptime Environment.isMac) + // i33 holds the same amount of unsigned space as a u32, so we truncate it there before casting + @intCast(u32, @truncate(i33, sizeOrOffset)) + else + null, + ); + if (read_result == .read and read_result.read.len == 0) { - if (this.poll_ref != null) + if (this.isWatching()) this.unwatch(this.poll_ref.?.fd); this.close(); return; diff --git a/src/global.zig b/src/global.zig index 5312b0c05..1dccb15e7 100644 --- a/src/global.zig +++ b/src/global.zig @@ -357,18 +357,25 @@ pub fn isReadable(fd: std.os.fd_t) bool { // return result; } -pub fn isWritable(fd: std.os.fd_t) bool { +pub const WritableFlag = enum { writable, not_writable, hup }; +pub fn isWritable(fd: std.os.fd_t) WritableFlag { var polls = &[_]std.os.pollfd{ .{ .fd = fd, - .events = std.os.POLL.OUT | std.os.POLL.ERR, + .events = std.os.POLL.OUT, .revents = 0, }, }; const result = (std.os.poll(polls, 0) catch 0) != 0; - global_scope_log("isWritable: {d}", .{result}); - return result; + global_scope_log("isWritable: {d} ({d})", .{ result, polls[0].revents }); + if (result and polls[0].revents & std.os.POLL.HUP != 0) { + return WritableFlag.hup; + } else if (result) { + return WritableFlag.writable; + } else { + return WritableFlag.not_writable; + } } pub inline fn unreachablePanic(comptime fmts: []const u8, args: anytype) noreturn { |