diff options
-rw-r--r-- | src/bun.js/base.zig | 2 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 76 |
2 files changed, 59 insertions, 19 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 30b218d47..8b4d858c9 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -4146,6 +4146,8 @@ pub const FilePoll = struct { disable, + nonblocking, + pub fn poll(this: Flags) Flags { return switch (this) { .readable => .poll_readable, diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 36d284553..866442b25 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -292,6 +292,7 @@ pub const ReadableStream = struct { .globalThis = globalThis, .context = .{ .buffered_data = buffered_data, + .started = true, .lazy_readable = .{ .readable = .{ .FIFO = fifo.*, @@ -1142,7 +1143,16 @@ pub const FileSink = struct { const log = Output.scoped(.FileSink, false); pub fn isReachable(this: *const FileSink) bool { - return this.reachable_from_js or this.signal.isDead(); + return this.reachable_from_js or !this.signal.isDead(); + } + + pub fn updateRef(this: *FileSink, value: bool) void { + if (this.poll_ref) |poll| { + if (value) + poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm) + else + poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm); + } } const max_fifo_size = 64 * 1024; @@ -1294,7 +1304,7 @@ pub const FileSink = struct { } switch (bun.isWritable(fd)) { - .not_writable => { + .not_ready => { if (this.poll_ref) |poll| { poll.flags.remove(.writable); } @@ -1318,7 +1328,7 @@ pub const FileSink = struct { .done = {}, }; }, - .writable => break :brk this.max_write_size, + .ready => break :brk this.max_write_size, } } else remain.len; @@ -1373,14 +1383,14 @@ pub const FileSink = struct { // lets check if its writable, so we avoid blocking if (is_fifo and remain.len > 0) { switch (bun.isWritable(fd)) { - .writable => { + .ready => { if (this.poll_ref) |poll_ref| { poll_ref.flags.insert(.writable); poll_ref.flags.insert(.fifo); std.debug.assert(poll_ref.flags.contains(.poll_writable)); } }, - .not_writable => { + .not_ready => { if (!this.isWatching()) this.watch(this.fd); @@ -2227,8 +2237,16 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { .@"end" = end, .@"construct" = construct, .@"endWithSink" = endWithSink, + .@"updateRef" = updateRef, }); + pub fn updateRef(ptr: *anyopaque, value: bool) callconv(.C) void { + JSC.markBinding(@src()); + var this = bun.cast(*ThisSink, ptr); + if (comptime @hasDecl(SinkType, "updateRef")) + this.sink.updateRef(value); + } + comptime { if (!JSC.is_bindgen) { @export(finalize, .{ .name = Export[0].symbol_name }); @@ -2239,6 +2257,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { @export(end, .{ .name = Export[5].symbol_name }); @export(construct, .{ .name = Export[6].symbol_name }); @export(endWithSink, .{ .name = Export[7].symbol_name }); + @export(updateRef, .{ .name = Export[8].symbol_name }); } } @@ -3040,8 +3059,9 @@ pub fn ReadableStreamSource( }); comptime { - if (!JSC.is_bindgen) + if (!JSC.is_bindgen) { @export(load, .{ .name = Export[0].symbol_name }); + } } }; }; @@ -3601,19 +3621,36 @@ pub const FIFO = struct { if (!is_readable and (this.close_on_empty_read or poll.isHUP())) { // it might be readable actually this.close_on_empty_read = true; - if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) { - return null; + switch (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) { + .ready => { + this.close_on_empty_read = false; + return null; + }, + else => {}, } return .done; } else if (!is_readable and poll.isWatching()) { + // if the file was opened non-blocking + // we don't risk anything by attempting to read it! + if (poll.flags.contains(.nonblocking)) + return null; + // this happens if we've registered a watcher but we haven't // ticked the event loop since registering it - if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) { - return null; + switch (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) { + .ready => { + poll.flags.insert(.readable); + return null; + }, + .hup => { + poll.flags.insert(.hup); + return .done; + }, + else => { + return .pending; + }, } - - return .pending; } } @@ -3624,13 +3661,10 @@ pub const FIFO = struct { } } else if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) { // we don't know if it's readable or not - if (!bun.isReadable(this.fd)) { - // we hung up - if (this.close_on_empty_read) - return .done; - - return .pending; - } + return switch (bun.isReadable(this.fd)) { + .hup, .ready => null, + else => ReadResult{ .pending = {} }, + }; } return null; @@ -4361,6 +4395,8 @@ pub const FileReader = struct { }, }, }; + this.lazy_readable.readable.FIFO.watch(readable_file.fd); + this.lazy_readable.readable.FIFO.poll_ref.?.flags.insert(.nonblocking); } else { this.lazy_readable = .{ .readable = .{ .File = readable_file }, @@ -4435,6 +4471,8 @@ pub const FileReader = struct { } } + pub const setRef = setRefOrUnref; + pub fn drainInternalBuffer(this: *FileReader) bun.ByteList { const buffered = this.buffered_data; if (buffered.cap > 0) { |