aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-24 18:57:58 -0800
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-24 18:57:58 -0800
commit5a95fae533cea2be73d78518ed52cd9ebf304a59 (patch)
treed98fc72766752bbc762f792ba3de1a53ae980eb1
parent47f0e14477721a0c1c9f5458208c632fc2e4a370 (diff)
downloadbun-5a95fae533cea2be73d78518ed52cd9ebf304a59.tar.gz
bun-5a95fae533cea2be73d78518ed52cd9ebf304a59.tar.zst
bun-5a95fae533cea2be73d78518ed52cd9ebf304a59.zip
Improve SIGPIPE handling
Diffstat (limited to '')
-rw-r--r--src/bun.js/api/bun/subprocess.zig33
-rw-r--r--src/bun.js/webcore/response.zig2
-rw-r--r--src/bun.js/webcore/streams.zig156
-rw-r--r--src/global.zig15
4 files changed, 121 insertions, 85 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index a713b28f1..633e12d19 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -358,29 +358,28 @@ pub const Subprocess = struct {
this.write();
}
- pub fn canWrite(this: *BufferedInput) bool {
- const is_writable = bun.isWritable(this.fd);
- if (is_writable) {
- if (this.poll_ref) |poll_ref| {
- poll_ref.flags.insert(.writable);
- poll_ref.flags.insert(.fifo);
- std.debug.assert(poll_ref.flags.contains(.poll_writable));
- }
- }
-
- return is_writable;
- }
-
pub fn writeIfPossible(this: *BufferedInput, comptime is_sync: bool) void {
if (comptime !is_sync) {
// we ask, "Is it possible to write right now?"
// we do this rather than epoll or kqueue()
// because we don't want to block the thread waiting for the write
- if (!this.canWrite()) {
- this.watch(this.fd);
- this.poll_ref.?.flags.insert(.fifo);
- return;
+ switch (bun.isWritable(this.fd)) {
+ .writable => {
+ if (this.poll_ref) |poll_ref| {
+ poll_ref.flags.insert(.writable);
+ poll_ref.flags.insert(.fifo);
+ std.debug.assert(poll_ref.flags.contains(.poll_writable));
+ }
+ },
+ .hup => {
+ this.deinit();
+ return;
+ },
+ .not_writable => {
+ if (!this.isWatching()) this.watch(this.fd);
+ return;
+ },
}
}
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 5c996da45..f10d85c86 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -4153,7 +4153,7 @@ pub const InternalBlob = struct {
}
pub fn deinit(this: *@This()) void {
- this.bytes.deinit();
+ this.bytes.clearAndFree();
}
pub inline fn slice(this: @This()) []u8 {
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index ef86a1a16..36d284553 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1218,8 +1218,8 @@ pub const FileSink = struct {
return .{ .result = {} };
}
- pub fn flush(this: *FileSink) StreamResult.Writable {
- return flushMaybePoll(this);
+ pub fn flush(this: *FileSink, buf: []const u8) StreamResult.Writable {
+ return this.flushMaybePollWithSizeAndBuffer(buf, std.math.maxInt(usize));
}
fn adjustPipeLengthOnLinux(this: *FileSink, fd: bun.FileDescriptor, remain_len: usize) void {
@@ -1236,17 +1236,13 @@ pub const FileSink = struct {
}
}
- pub fn flushMaybePoll(this: *FileSink) StreamResult.Writable {
- return flushMaybePollWithSize(this, std.math.maxInt(usize));
- }
-
- pub fn flushMaybePollWithSize(this: *FileSink, writable_size: usize) StreamResult.Writable {
+ pub fn flushMaybePollWithSizeAndBuffer(this: *FileSink, buffer: []const u8, writable_size: usize) StreamResult.Writable {
std.debug.assert(this.fd != bun.invalid_fd);
var total: usize = this.written;
const initial = total;
const fd = this.fd;
- var remain = this.buffer.slice();
+ var remain = buffer;
remain = remain[@minimum(this.head, remain.len)..];
if (remain.len == 0) return .{ .owned = 0 };
@@ -1297,20 +1293,33 @@ pub const FileSink = struct {
}
}
- if (!bun.isWritable(fd)) {
- if (this.poll_ref) |poll| {
- poll.flags.remove(.writable);
- }
+ switch (bun.isWritable(fd)) {
+ .not_writable => {
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ }
- if (!this.isWatching())
- this.watch(fd);
+ if (!this.isWatching())
+ this.watch(fd);
- return .{
- .pending = &this.pending,
- };
- }
+ return .{
+ .pending = &this.pending,
+ };
+ },
+ .hup => {
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ poll.flags.insert(.hup);
+ }
- break :brk this.max_write_size;
+ this.cleanup();
+
+ return .{
+ .done = {},
+ };
+ },
+ .writable => break :brk this.max_write_size,
+ }
} else remain.len;
if (max_to_write > 0) {
@@ -1334,6 +1343,11 @@ pub const FileSink = struct {
.pending = &this.pending,
};
},
+ .PIPE => {
+ this.cleanup();
+ this.pending.consumed = @truncate(Blob.SizeType, total - initial);
+ return .{ .done = {} };
+ },
else => {},
}
this.pending.result = .{ .err = res.err };
@@ -1358,26 +1372,40 @@ pub const FileSink = struct {
// but we still have more
// lets check if its writable, so we avoid blocking
if (is_fifo and remain.len > 0) {
- const is_writable = bun.isWritable(fd);
- if (is_writable) {
- if (this.poll_ref) |poll_ref| {
- poll_ref.flags.insert(.writable);
- poll_ref.flags.insert(.fifo);
- std.debug.assert(poll_ref.flags.contains(.poll_writable));
- }
- } else {
- if (!this.isWatching())
- this.watch(this.fd);
+ switch (bun.isWritable(fd)) {
+ .writable => {
+ if (this.poll_ref) |poll_ref| {
+ poll_ref.flags.insert(.writable);
+ poll_ref.flags.insert(.fifo);
+ std.debug.assert(poll_ref.flags.contains(.poll_writable));
+ }
+ },
+ .not_writable => {
+ if (!this.isWatching())
+ this.watch(this.fd);
- if (this.poll_ref) |poll| {
- poll.flags.remove(.writable);
- std.debug.assert(poll.flags.contains(.poll_writable));
- }
- this.pending.consumed = @truncate(Blob.SizeType, total - initial);
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ std.debug.assert(poll.flags.contains(.poll_writable));
+ }
+ this.pending.consumed = @truncate(Blob.SizeType, total - initial);
- return .{
- .pending = &this.pending,
- };
+ return .{
+ .pending = &this.pending,
+ };
+ },
+ .hup => {
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ poll.flags.insert(.hup);
+ }
+
+ this.cleanup();
+
+ return .{
+ .done = {},
+ };
+ },
}
}
}
@@ -1415,7 +1443,7 @@ pub const FileSink = struct {
if (this.isPending() or this.done) {
return .{ .result = JSC.JSValue.jsUndefined() };
}
- const result = this.flush();
+ const result = this.flush(this.buffer.slice());
if (result == .err) {
return .{ .err = result.err };
@@ -1448,7 +1476,6 @@ pub const FileSink = struct {
if (this.buffer.cap > 0) {
this.buffer.listManaged(this.allocator).deinit();
this.buffer = bun.ByteList.init("");
- this.done = true;
this.head = 0;
}
@@ -1510,9 +1537,9 @@ pub const FileSink = struct {
}
if (comptime Environment.isMac) {
- _ = this.flushMaybePollWithSize(@intCast(usize, @maximum(writable, 0)));
+ _ = this.flushMaybePollWithSizeAndBuffer(this.buffer.slice(), @intCast(usize, @maximum(writable, 0)));
} else {
- _ = this.flushMaybePollWithSize(std.math.maxInt(usize));
+ _ = this.flushMaybePollWithSizeAndBuffer(this.buffer.slice(), std.math.maxInt(usize));
}
}
@@ -1523,12 +1550,9 @@ pub const FileSink = struct {
const input = data.slice();
if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size) {
- var temp = this.buffer;
- defer this.buffer = temp;
- this.buffer = bun.ByteList.init(input);
- const result = this.flush();
+ const result = this.flush(input);
if (this.isPending()) {
- _ = temp.write(this.allocator, input) catch {
+ _ = this.buffer.write(this.allocator, input) catch {
return .{ .err = Syscall.Error.oom };
};
}
@@ -1541,7 +1565,7 @@ pub const FileSink = struct {
};
if (!this.isPending() and this.buffer.len >= this.chunk_size) {
- return this.flush();
+ return this.flush(this.buffer.slice());
}
this.signal.ready(null, null);
@@ -1556,12 +1580,9 @@ pub const FileSink = struct {
const input = data.slice();
if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size and strings.isAllASCII(input)) {
- var temp = this.buffer;
- defer this.buffer = temp;
- this.buffer = bun.ByteList.init(input);
- const result = this.flush();
+ const result = this.flush(input);
if (this.isPending()) {
- _ = temp.write(this.allocator, input) catch {
+ _ = this.buffer.write(this.allocator, input) catch {
return .{ .err = Syscall.Error.oom };
};
}
@@ -1574,7 +1595,7 @@ pub const FileSink = struct {
};
if (!this.isPending() and this.buffer.len >= this.chunk_size) {
- return this.flush();
+ return this.flush(this.buffer.slice());
}
this.signal.ready(null, null);
@@ -1591,8 +1612,9 @@ pub const FileSink = struct {
const len = this.buffer.writeUTF16(this.allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch {
return .{ .err = Syscall.Error.oom };
};
+
if (!this.isPending() and this.buffer.len >= this.chunk_size) {
- return this.flush();
+ return this.flush(this.buffer.slice());
}
this.signal.ready(null, null);
@@ -1601,8 +1623,7 @@ pub const FileSink = struct {
fn isPending(this: *const FileSink) bool {
if (this.done) return false;
- var poll_ref = this.poll_ref orelse return false;
- return poll_ref.isRegistered() and !poll_ref.flags.contains(.needs_rearm);
+ return this.pending.state == .pending;
}
pub fn close(this: *FileSink) void {
@@ -1641,7 +1662,7 @@ pub const FileSink = struct {
this.requested_end = true;
- const flushy = this.flush();
+ const flushy = this.flush(this.buffer.slice());
if (flushy == .err) {
return .{ .err = flushy.err };
@@ -1668,7 +1689,7 @@ pub const FileSink = struct {
return .{ .result = JSValue.jsNumber(this.written) };
}
- const flushed = this.flush();
+ const flushed = this.flush(this.buffer.slice());
if (flushed == .err) {
return .{ .err = flushed.err };
@@ -3627,19 +3648,28 @@ pub const FIFO = struct {
}
pub fn ready(this: *FIFO, sizeOrOffset: i64) void {
- const available_to_read = this.getAvailableToRead(sizeOrOffset);
if (this.isClosed()) {
- this.unwatch(this.poll_ref.?.fd);
+ if (this.isWatching())
+ this.unwatch(this.poll_ref.?.fd);
return;
}
- if (this.buf.len == 0 and (available_to_read orelse 1) != 0) {
+ if (this.buf.len == 0) {
return;
}
- const read_result = this.read(this.buf, available_to_read);
+ const read_result = this.read(
+ this.buf,
+ // On Linux, we end up calling ioctl() twice if we don't do this
+ if (comptime Environment.isMac)
+ // i33 holds the same amount of unsigned space as a u32, so we truncate it there before casting
+ @intCast(u32, @truncate(i33, sizeOrOffset))
+ else
+ null,
+ );
+
if (read_result == .read and read_result.read.len == 0) {
- if (this.poll_ref != null)
+ if (this.isWatching())
this.unwatch(this.poll_ref.?.fd);
this.close();
return;
diff --git a/src/global.zig b/src/global.zig
index 5312b0c05..1dccb15e7 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -357,18 +357,25 @@ pub fn isReadable(fd: std.os.fd_t) bool {
// return result;
}
-pub fn isWritable(fd: std.os.fd_t) bool {
+pub const WritableFlag = enum { writable, not_writable, hup };
+pub fn isWritable(fd: std.os.fd_t) WritableFlag {
var polls = &[_]std.os.pollfd{
.{
.fd = fd,
- .events = std.os.POLL.OUT | std.os.POLL.ERR,
+ .events = std.os.POLL.OUT,
.revents = 0,
},
};
const result = (std.os.poll(polls, 0) catch 0) != 0;
- global_scope_log("isWritable: {d}", .{result});
- return result;
+ global_scope_log("isWritable: {d} ({d})", .{ result, polls[0].revents });
+ if (result and polls[0].revents & std.os.POLL.HUP != 0) {
+ return WritableFlag.hup;
+ } else if (result) {
+ return WritableFlag.writable;
+ } else {
+ return WritableFlag.not_writable;
+ }
}
pub inline fn unreachablePanic(comptime fmts: []const u8, args: anytype) noreturn {