aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/webcore')
-rw-r--r--src/bun.js/webcore/streams.zig60
1 files changed, 47 insertions, 13 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 7f82d694e..263525ab8 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -308,6 +308,7 @@ pub const ReadableStream = struct {
reader.context.lazy_readable.readable.FIFO.pending.future = undefined;
reader.context.lazy_readable.readable.FIFO.auto_sizer = null;
reader.context.lazy_readable.readable.FIFO.pending.state = .none;
+ reader.context.lazy_readable.readable.FIFO.drained = buffered_data.len == 0;
return reader.toJS(globalThis);
}
@@ -3517,6 +3518,7 @@ pub const FIFO = struct {
is_first_read: bool = true,
auto_close: bool = true,
has_adjusted_pipe_size_on_linux: bool = false,
+ drained: bool = true,
pub usingnamespace NewReadyWatcher(@This(), .readable, ready);
@@ -3610,6 +3612,12 @@ pub const FIFO = struct {
this.close_on_empty_read = false;
return null;
},
+ // we need to read the 0 at the end or else we are not truly done
+ .hup => {
+ this.close_on_empty_read = true;
+ poll.flags.insert(.hup);
+ return null;
+ },
else => {},
}
@@ -3647,7 +3655,11 @@ pub const FIFO = struct {
} else if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) {
// we don't know if it's readable or not
return switch (bun.isReadable(this.fd)) {
- .hup, .ready => null,
+ .hup => {
+ this.close_on_empty_read = true;
+ return null;
+ },
+ .ready => null,
else => ReadResult{ .pending = {} },
};
}
@@ -3666,17 +3678,34 @@ pub const FIFO = struct {
return this.to_read;
}
- pub fn ready(this: *FIFO, sizeOrOffset: i64) void {
+ pub fn ready(this: *FIFO, sizeOrOffset: i64, is_hup: bool) void {
if (this.isClosed()) {
if (this.isWatching())
this.unwatch(this.poll_ref.?.fd);
return;
}
- if (this.buf.len == 0) {
+ if (comptime Environment.isMac) {
+ if (sizeOrOffset == 0 and is_hup and this.drained) {
+ this.close();
+ return;
+ }
+ } else if (is_hup and this.drained and this.getAvailableToReadOnLinux() == 0) {
+ this.close();
return;
}
+ if (this.buf.len == 0) {
+ var auto_sizer = this.auto_sizer orelse return;
+ if (comptime Environment.isMac) {
+ if (sizeOrOffset > 0) {
+ this.buf = auto_sizer.resize(@intCast(usize, sizeOrOffset)) catch return;
+ } else {
+ this.buf = auto_sizer.resize(8096) catch return;
+ }
+ }
+ }
+
const read_result = this.read(
this.buf,
// On Linux, we end up calling ioctl() twice if we don't do this
@@ -3687,13 +3716,6 @@ pub const FIFO = struct {
null,
);
- if (read_result == .read and read_result.read.len == 0) {
- if (this.isWatching())
- this.unwatch(this.poll_ref.?.fd);
- this.close();
- return;
- }
-
if (read_result == .read) {
if (this.to_read) |*to_read| {
to_read.* = to_read.* -| @truncate(u32, read_result.read.len);
@@ -3768,6 +3790,7 @@ pub const FIFO = struct {
}
var buf = buf_;
+ std.debug.assert(buf.len > 0);
if (available_to_read) |amt| {
if (amt >= buf.len) {
@@ -3828,9 +3851,9 @@ pub const FIFO = struct {
}
}
- if (result == 0)
+ if (result == 0) {
return .{ .read = buf[0..0] };
-
+ }
return .{ .read = buf[0..result] };
},
}
@@ -4238,6 +4261,14 @@ pub const FileReader = struct {
blob: *Blob.Store,
empty: void,
+ pub fn onDrain(this: *Lazy) void {
+ if (this.* == .readable) {
+ if (this.readable == .FIFO) {
+ this.readable.FIFO.drained = true;
+ }
+ }
+ }
+
pub fn finish(this: *Lazy) void {
switch (this.readable) {
.FIFO => {
@@ -4380,6 +4411,7 @@ pub const FileReader = struct {
.FIFO = FIFO{
.fd = readable_file.fd,
.auto_close = readable_file.auto_close,
+ .drained = this.buffered_data.len == 0,
},
},
};
@@ -4394,7 +4426,8 @@ pub const FileReader = struct {
.readable => {},
.empty => return .{ .empty = {} },
}
- }
+ } else if (this.lazy_readable == .empty)
+ return .{ .empty = {} };
if (this.readable().* == .File) {
const chunk_size = this.readable().File.calculateChunkSize(std.math.maxInt(usize));
@@ -4463,6 +4496,7 @@ pub const FileReader = struct {
pub fn drainInternalBuffer(this: *FileReader) bun.ByteList {
const buffered = this.buffered_data;
+ this.lazy_readable.onDrain();
if (buffered.cap > 0) {
this.buffered_data = .{};
}