diff options
Diffstat (limited to '')
-rw-r--r-- | src/bun.js/webcore/streams.zig | 164 |
1 files changed, 89 insertions, 75 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index dcbe12ce2..253deda55 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1139,10 +1139,13 @@ pub const FileSink = struct { var total: usize = this.written; const initial = total; - defer this.written = total; const fd = this.fd; var remain = this.buffer.slice(); remain = remain[@minimum(this.head, remain.len)..]; + if (remain.len == 0) return .{ .owned = 0 }; + + defer this.written = total; + const initial_remain = remain; defer { std.debug.assert(total - initial == @ptrToInt(remain.ptr) - @ptrToInt(initial_remain.ptr)); @@ -1155,89 +1158,115 @@ pub const FileSink = struct { } } const is_fifo = this.isFIFO(); - + var did_adjust_pipe_size_on_linux_this_tick = false; if (comptime Environment.isLinux) { if (is_fifo and !this.has_adjusted_pipe_size_on_linux and remain.len >= (max_fifo_size - 1024)) { this.adjustPipeLengthOnLinux(fd, remain.len); + did_adjust_pipe_size_on_linux_this_tick = true; } } const max_to_write = if (is_fifo) brk: { - if (comptime Environment.isMac) { - break :brk if (writable_size == std.math.maxInt(usize)) - max_fifo_size - else - writable_size; + if (comptime Environment.isLinux) { + if (did_adjust_pipe_size_on_linux_this_tick) + break :brk this.max_write_size; } - break :brk this.max_write_size; - } else remain.len; - while (remain.len > 0) { - const write_buf = remain[0..@minimum(remain.len, max_to_write)]; - const res = JSC.Node.Syscall.write(fd, write_buf); + // The caller may have informed us of the size + // in which case we should use that. + if (writable_size != std.math.maxInt(usize)) + break :brk writable_size; - if (res == .err) { - const retry = - std.os.E.AGAIN; - - switch (res.err.getErrno()) { - retry => { - if (this.poll_ref) |poll| { - poll.flags.remove(.writable); - } + if (this.poll_ref) |poll| { + if (poll.isWritable()) { + break :brk this.max_write_size; + } + } - if (!this.isWatching()) - this.watch(fd); - return .{ - .pending = &this.pending, - }; - }, - else => {}, + if (!bun.isWritable(fd)) { + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); } - this.pending.result = .{ .err = res.err }; - this.pending.consumed = @truncate(Blob.SizeType, total - initial); - return .{ .err = res.err }; + if (!this.isWatching()) + this.watch(fd); + + return .{ + .pending = &this.pending, + }; } - remain = remain[res.result..]; - total += res.result; + break :brk this.max_write_size; + } else remain.len; - log("Wrote {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ res.result, fd, this.head, remain.len, total }); + if (max_to_write > 0) { + while (remain.len > 0) { + const write_buf = remain[0..@minimum(remain.len, max_to_write)]; + const res = JSC.Node.Syscall.write(fd, write_buf); - if (res.result == 0) { - if (this.poll_ref) |poll| { - poll.flags.remove(.writable); - } - break; - } - - // we flushed an entire fifo - // but we still have more - // lets check if its writable, so we avoid blocking - if (is_fifo and remain.len > 0) { - const is_writable = bun.isWritable(fd); - if (is_writable) { - if (this.poll_ref) |poll_ref| { - poll_ref.flags.insert(.writable); - poll_ref.flags.insert(.fifo); - std.debug.assert(poll_ref.flags.contains(.poll_writable)); + if (res == .err) { + const retry = + std.os.E.AGAIN; + + switch (res.err.getErrno()) { + retry => { + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + } + + if (!this.isWatching()) + this.watch(fd); + return .{ + .pending = &this.pending, + }; + }, + else => {}, } - } else { - if (!this.isWatching()) - this.watch(this.fd); + this.pending.result = .{ .err = res.err }; + this.pending.consumed = @truncate(Blob.SizeType, total - initial); + + return .{ .err = res.err }; + } + + remain = remain[res.result..]; + total += res.result; + log("Wrote {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ res.result, fd, this.head, remain.len, total }); + + if (res.result == 0) { if (this.poll_ref) |poll| { poll.flags.remove(.writable); - std.debug.assert(poll.flags.contains(.poll_writable)); } - this.pending.consumed = @truncate(Blob.SizeType, total - initial); + break; + } - return .{ - .pending = &this.pending, - }; + // we flushed an entire fifo + // but we still have more + // lets check if its writable, so we avoid blocking + if (is_fifo and remain.len > 0) { + const is_writable = bun.isWritable(fd); + if (is_writable) { + if (this.poll_ref) |poll_ref| { + poll_ref.flags.insert(.writable); + poll_ref.flags.insert(.fifo); + std.debug.assert(poll_ref.flags.contains(.poll_writable)); + } + } else { + if (!this.isWatching()) + this.watch(this.fd); + + if (this.poll_ref) |poll| { + poll.flags.remove(.writable); + std.debug.assert(poll.flags.contains(.poll_writable)); + } + this.pending.consumed = @truncate(Blob.SizeType, total - initial); + + return .{ + .pending = &this.pending, + }; + } } } } @@ -3769,21 +3798,6 @@ pub const FileReader = struct { } } - // const rc: JSC.Node.Maybe(usize) = if (comptime Environment.isLinux) brk: { - // if (len == 65536 and this.has_adjusted_pipe_size_on_linux and buf_to_use.len > len) { - // var iovecs = [_]std.os.iovec{.{ .iov_base = @intToPtr([*]u8, @ptrToInt(buf_to_use.ptr)), .iov_len = @intCast(usize, buf_to_use.len) }}; - // const rc = bun.C.linux.vmsplice(fd, &iovecs, 1, 0); - // Output.debug("vmsplice({d}, {d}) = {d}", .{ fd, buf_to_use.len, rc }); - // if (JSC.Node.Maybe(usize).errnoSys(rc, .read)) |err| { - // break :brk err; - // } - - // break :brk JSC.Node.Maybe(usize){ .result = @intCast(usize, rc) }; - // } - - // break :brk Syscall.read(fd, buf_to_use); - // } else Syscall.read(fd, buf_to_use); - switch (Syscall.read(fd, buf_to_use)) { .err => |err| { const retry = std.os.E.AGAIN; @@ -4039,7 +4053,7 @@ pub fn NewReadyWatcher( } if (comptime @hasField(Context, "mode")) { - return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode); + return std.os.S.ISFIFO(this.mode); } return false; |