aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-25 00:05:14 -0800
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-25 00:05:14 -0800
commit7b23cb5cd7413564dae58a44e511d0640fb339d4 (patch)
treeac8f5ffdced0ec3d899ab4d3c2bb9f66f9046fc2
parent04328c163b92723f37ec57a3f7c9b4e80deeaf5b (diff)
downloadbun-7b23cb5cd7413564dae58a44e511d0640fb339d4.tar.gz
bun-7b23cb5cd7413564dae58a44e511d0640fb339d4.tar.zst
bun-7b23cb5cd7413564dae58a44e511d0640fb339d4.zip
Fix reading FIFO files
-rw-r--r--src/bun.js/base.zig2
-rw-r--r--src/bun.js/webcore/streams.zig76
2 files changed, 59 insertions, 19 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index 30b218d47..8b4d858c9 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -4146,6 +4146,8 @@ pub const FilePoll = struct {
disable,
+ nonblocking,
+
pub fn poll(this: Flags) Flags {
return switch (this) {
.readable => .poll_readable,
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 36d284553..866442b25 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -292,6 +292,7 @@ pub const ReadableStream = struct {
.globalThis = globalThis,
.context = .{
.buffered_data = buffered_data,
+ .started = true,
.lazy_readable = .{
.readable = .{
.FIFO = fifo.*,
@@ -1142,7 +1143,16 @@ pub const FileSink = struct {
const log = Output.scoped(.FileSink, false);
pub fn isReachable(this: *const FileSink) bool {
- return this.reachable_from_js or this.signal.isDead();
+ return this.reachable_from_js or !this.signal.isDead();
+ }
+
+ pub fn updateRef(this: *FileSink, value: bool) void {
+ if (this.poll_ref) |poll| {
+ if (value)
+ poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm)
+ else
+ poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
}
const max_fifo_size = 64 * 1024;
@@ -1294,7 +1304,7 @@ pub const FileSink = struct {
}
switch (bun.isWritable(fd)) {
- .not_writable => {
+ .not_ready => {
if (this.poll_ref) |poll| {
poll.flags.remove(.writable);
}
@@ -1318,7 +1328,7 @@ pub const FileSink = struct {
.done = {},
};
},
- .writable => break :brk this.max_write_size,
+ .ready => break :brk this.max_write_size,
}
} else remain.len;
@@ -1373,14 +1383,14 @@ pub const FileSink = struct {
// lets check if its writable, so we avoid blocking
if (is_fifo and remain.len > 0) {
switch (bun.isWritable(fd)) {
- .writable => {
+ .ready => {
if (this.poll_ref) |poll_ref| {
poll_ref.flags.insert(.writable);
poll_ref.flags.insert(.fifo);
std.debug.assert(poll_ref.flags.contains(.poll_writable));
}
},
- .not_writable => {
+ .not_ready => {
if (!this.isWatching())
this.watch(this.fd);
@@ -2227,8 +2237,16 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
.@"end" = end,
.@"construct" = construct,
.@"endWithSink" = endWithSink,
+ .@"updateRef" = updateRef,
});
+ pub fn updateRef(ptr: *anyopaque, value: bool) callconv(.C) void {
+ JSC.markBinding(@src());
+ var this = bun.cast(*ThisSink, ptr);
+ if (comptime @hasDecl(SinkType, "updateRef"))
+ this.sink.updateRef(value);
+ }
+
comptime {
if (!JSC.is_bindgen) {
@export(finalize, .{ .name = Export[0].symbol_name });
@@ -2239,6 +2257,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
@export(end, .{ .name = Export[5].symbol_name });
@export(construct, .{ .name = Export[6].symbol_name });
@export(endWithSink, .{ .name = Export[7].symbol_name });
+ @export(updateRef, .{ .name = Export[8].symbol_name });
}
}
@@ -3040,8 +3059,9 @@ pub fn ReadableStreamSource(
});
comptime {
- if (!JSC.is_bindgen)
+ if (!JSC.is_bindgen) {
@export(load, .{ .name = Export[0].symbol_name });
+ }
}
};
};
@@ -3601,19 +3621,36 @@ pub const FIFO = struct {
if (!is_readable and (this.close_on_empty_read or poll.isHUP())) {
// it might be readable actually
this.close_on_empty_read = true;
- if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
- return null;
+ switch (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
+ .ready => {
+ this.close_on_empty_read = false;
+ return null;
+ },
+ else => {},
}
return .done;
} else if (!is_readable and poll.isWatching()) {
+ // if the file was opened non-blocking
+ // we don't risk anything by attempting to read it!
+ if (poll.flags.contains(.nonblocking))
+ return null;
+
// this happens if we've registered a watcher but we haven't
// ticked the event loop since registering it
- if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
- return null;
+ switch (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
+ .ready => {
+ poll.flags.insert(.readable);
+ return null;
+ },
+ .hup => {
+ poll.flags.insert(.hup);
+ return .done;
+ },
+ else => {
+ return .pending;
+ },
}
-
- return .pending;
}
}
@@ -3624,13 +3661,10 @@ pub const FIFO = struct {
}
} else if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) {
// we don't know if it's readable or not
- if (!bun.isReadable(this.fd)) {
- // we hung up
- if (this.close_on_empty_read)
- return .done;
-
- return .pending;
- }
+ return switch (bun.isReadable(this.fd)) {
+ .hup, .ready => null,
+ else => ReadResult{ .pending = {} },
+ };
}
return null;
@@ -4361,6 +4395,8 @@ pub const FileReader = struct {
},
},
};
+ this.lazy_readable.readable.FIFO.watch(readable_file.fd);
+ this.lazy_readable.readable.FIFO.poll_ref.?.flags.insert(.nonblocking);
} else {
this.lazy_readable = .{
.readable = .{ .File = readable_file },
@@ -4435,6 +4471,8 @@ pub const FileReader = struct {
}
}
+ pub const setRef = setRefOrUnref;
+
pub fn drainInternalBuffer(this: *FileReader) bun.ByteList {
const buffered = this.buffered_data;
if (buffered.cap > 0) {