aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/bun.js/webcore/streams.zig164
1 files changed, 89 insertions, 75 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index dcbe12ce2..253deda55 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1139,10 +1139,13 @@ pub const FileSink = struct {
var total: usize = this.written;
const initial = total;
- defer this.written = total;
const fd = this.fd;
var remain = this.buffer.slice();
remain = remain[@minimum(this.head, remain.len)..];
+ if (remain.len == 0) return .{ .owned = 0 };
+
+ defer this.written = total;
+
const initial_remain = remain;
defer {
std.debug.assert(total - initial == @ptrToInt(remain.ptr) - @ptrToInt(initial_remain.ptr));
@@ -1155,89 +1158,115 @@ pub const FileSink = struct {
}
}
const is_fifo = this.isFIFO();
-
+ var did_adjust_pipe_size_on_linux_this_tick = false;
if (comptime Environment.isLinux) {
if (is_fifo and !this.has_adjusted_pipe_size_on_linux and remain.len >= (max_fifo_size - 1024)) {
this.adjustPipeLengthOnLinux(fd, remain.len);
+ did_adjust_pipe_size_on_linux_this_tick = true;
}
}
const max_to_write =
if (is_fifo)
brk: {
- if (comptime Environment.isMac) {
- break :brk if (writable_size == std.math.maxInt(usize))
- max_fifo_size
- else
- writable_size;
+ if (comptime Environment.isLinux) {
+ if (did_adjust_pipe_size_on_linux_this_tick)
+ break :brk this.max_write_size;
}
- break :brk this.max_write_size;
- } else remain.len;
- while (remain.len > 0) {
- const write_buf = remain[0..@minimum(remain.len, max_to_write)];
- const res = JSC.Node.Syscall.write(fd, write_buf);
+ // The caller may have informed us of the size
+ // in which case we should use that.
+ if (writable_size != std.math.maxInt(usize))
+ break :brk writable_size;
- if (res == .err) {
- const retry =
- std.os.E.AGAIN;
-
- switch (res.err.getErrno()) {
- retry => {
- if (this.poll_ref) |poll| {
- poll.flags.remove(.writable);
- }
+ if (this.poll_ref) |poll| {
+ if (poll.isWritable()) {
+ break :brk this.max_write_size;
+ }
+ }
- if (!this.isWatching())
- this.watch(fd);
- return .{
- .pending = &this.pending,
- };
- },
- else => {},
+ if (!bun.isWritable(fd)) {
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
}
- this.pending.result = .{ .err = res.err };
- this.pending.consumed = @truncate(Blob.SizeType, total - initial);
- return .{ .err = res.err };
+ if (!this.isWatching())
+ this.watch(fd);
+
+ return .{
+ .pending = &this.pending,
+ };
}
- remain = remain[res.result..];
- total += res.result;
+ break :brk this.max_write_size;
+ } else remain.len;
- log("Wrote {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ res.result, fd, this.head, remain.len, total });
+ if (max_to_write > 0) {
+ while (remain.len > 0) {
+ const write_buf = remain[0..@minimum(remain.len, max_to_write)];
+ const res = JSC.Node.Syscall.write(fd, write_buf);
- if (res.result == 0) {
- if (this.poll_ref) |poll| {
- poll.flags.remove(.writable);
- }
- break;
- }
-
- // we flushed an entire fifo
- // but we still have more
- // lets check if its writable, so we avoid blocking
- if (is_fifo and remain.len > 0) {
- const is_writable = bun.isWritable(fd);
- if (is_writable) {
- if (this.poll_ref) |poll_ref| {
- poll_ref.flags.insert(.writable);
- poll_ref.flags.insert(.fifo);
- std.debug.assert(poll_ref.flags.contains(.poll_writable));
+ if (res == .err) {
+ const retry =
+ std.os.E.AGAIN;
+
+ switch (res.err.getErrno()) {
+ retry => {
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ }
+
+ if (!this.isWatching())
+ this.watch(fd);
+ return .{
+ .pending = &this.pending,
+ };
+ },
+ else => {},
}
- } else {
- if (!this.isWatching())
- this.watch(this.fd);
+ this.pending.result = .{ .err = res.err };
+ this.pending.consumed = @truncate(Blob.SizeType, total - initial);
+
+ return .{ .err = res.err };
+ }
+
+ remain = remain[res.result..];
+ total += res.result;
+ log("Wrote {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ res.result, fd, this.head, remain.len, total });
+
+ if (res.result == 0) {
if (this.poll_ref) |poll| {
poll.flags.remove(.writable);
- std.debug.assert(poll.flags.contains(.poll_writable));
}
- this.pending.consumed = @truncate(Blob.SizeType, total - initial);
+ break;
+ }
- return .{
- .pending = &this.pending,
- };
+ // we flushed an entire fifo
+ // but we still have more
+ // lets check if its writable, so we avoid blocking
+ if (is_fifo and remain.len > 0) {
+ const is_writable = bun.isWritable(fd);
+ if (is_writable) {
+ if (this.poll_ref) |poll_ref| {
+ poll_ref.flags.insert(.writable);
+ poll_ref.flags.insert(.fifo);
+ std.debug.assert(poll_ref.flags.contains(.poll_writable));
+ }
+ } else {
+ if (!this.isWatching())
+ this.watch(this.fd);
+
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.writable);
+ std.debug.assert(poll.flags.contains(.poll_writable));
+ }
+ this.pending.consumed = @truncate(Blob.SizeType, total - initial);
+
+ return .{
+ .pending = &this.pending,
+ };
+ }
}
}
}
@@ -3769,21 +3798,6 @@ pub const FileReader = struct {
}
}
- // const rc: JSC.Node.Maybe(usize) = if (comptime Environment.isLinux) brk: {
- // if (len == 65536 and this.has_adjusted_pipe_size_on_linux and buf_to_use.len > len) {
- // var iovecs = [_]std.os.iovec{.{ .iov_base = @intToPtr([*]u8, @ptrToInt(buf_to_use.ptr)), .iov_len = @intCast(usize, buf_to_use.len) }};
- // const rc = bun.C.linux.vmsplice(fd, &iovecs, 1, 0);
- // Output.debug("vmsplice({d}, {d}) = {d}", .{ fd, buf_to_use.len, rc });
- // if (JSC.Node.Maybe(usize).errnoSys(rc, .read)) |err| {
- // break :brk err;
- // }
-
- // break :brk JSC.Node.Maybe(usize){ .result = @intCast(usize, rc) };
- // }
-
- // break :brk Syscall.read(fd, buf_to_use);
- // } else Syscall.read(fd, buf_to_use);
-
switch (Syscall.read(fd, buf_to_use)) {
.err => |err| {
const retry = std.os.E.AGAIN;
@@ -4039,7 +4053,7 @@ pub fn NewReadyWatcher(
}
if (comptime @hasField(Context, "mode")) {
- return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode);
+ return std.os.S.ISFIFO(this.mode);
}
return false;