Diffstat (limited to 'src/bun.js')
23 files changed, 1587 insertions, 1129 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index 3a88f7a04..ee26b09f5 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -2448,8 +2448,7 @@ pub const Timer = struct { } pub fn deinit(this: *Timeout) void { - if (comptime JSC.is_bindgen) - unreachable; + JSC.markBinding(@src()); var vm = this.globalThis.bunVM(); this.poll_ref.unref(vm); @@ -2465,7 +2464,7 @@ pub const Timer = struct { countdown: JSValue, repeat: bool, ) !void { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); var vm = globalThis.bunVM(); // We don't deal with nesting levels directly @@ -2534,7 +2533,7 @@ pub const Timer = struct { callback: JSValue, countdown: JSValue, ) callconv(.C) JSValue { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); const id = globalThis.bunVM().timer.last_id; globalThis.bunVM().timer.last_id +%= 1; @@ -2548,7 +2547,7 @@ pub const Timer = struct { callback: JSValue, countdown: JSValue, ) callconv(.C) JSValue { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); const id = globalThis.bunVM().timer.last_id; globalThis.bunVM().timer.last_id +%= 1; @@ -2559,7 +2558,7 @@ pub const Timer = struct { } pub fn clearTimer(timer_id: JSValue, _: *JSGlobalObject, repeats: bool) void { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); var map = if (repeats) &VirtualMachine.vm.timer.interval_map else &VirtualMachine.vm.timer.timeout_map; const id: Timeout.ID = .{ @@ -2580,7 +2579,7 @@ pub const Timer = struct { globalThis: *JSGlobalObject, id: JSValue, ) callconv(.C) JSValue { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); Timer.clearTimer(id, globalThis, false); return JSValue.jsUndefined(); } @@ -2588,7 +2587,7 @@ pub const Timer = struct { globalThis: *JSGlobalObject, id: JSValue, ) callconv(.C) JSValue { - if (comptime is_bindgen) unreachable; + JSC.markBinding(@src()); Timer.clearTimer(id, globalThis, true); return JSValue.jsUndefined(); } diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig index d594d44a7..afcc5509b 100644 --- a/src/bun.js/api/bun/spawn.zig +++ b/src/bun.js/api/bun/spawn.zig @@ -57,8 +57,6 @@ pub const PosixSpawn = struct { } else { _ = system.posix_spawnattr_destroy(&self.attr); } - - self.* = undefined; } pub fn get(self: Attr) !u16 { @@ -207,6 +205,10 @@ pub const PosixSpawn = struct { argv, envp, ); + if (comptime bun.Environment.allow_assert) + JSC.Node.Syscall.syslog("posix_spawn({s}) = {d} ({d})", .{ + path, rc, pid, + }); if (comptime bun.Environment.isLinux) { // rc is negative because it's libc errno diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index e09c5db2b..c85e0396f 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -28,7 +28,6 @@ pub const Subprocess = struct { stdin: Writable, stdout: Readable, stderr: Readable, - killed: bool = false, poll_ref: ?*JSC.FilePoll = null, @@ -47,8 +46,22 @@ pub const Subprocess = struct { finalized: bool = false, globalThis: *JSC.JSGlobalObject, - + observable_getters: std.enums.EnumSet(enum { + stdin, + stdout, + stderr, + }) = .{}, has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true), + is_sync: bool = false, + + pub fn hasExited(this: *const Subprocess) bool { + return this.exit_code != null or this.waitpid_err != null; + } + + pub fn updateHasPendingActivityFlag(this: *Subprocess) void { + @fence(.SeqCst); + this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, 
.SeqCst); + } pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool { @fence(.Acquire); @@ -78,7 +91,7 @@ pub const Subprocess = struct { } const Readable = union(enum) { - fd: JSC.Node.FileDescriptor, + fd: bun.FileDescriptor, pipe: Pipe, inherit: void, @@ -102,44 +115,37 @@ pub const Subprocess = struct { return; } - if (this.buffer.fd != JSC.Node.invalid_fd) { - this.buffer.close(); - } + this.buffer.close(); } pub fn toJS(this: *@This(), readable: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue { - if (this.* == .stream) { - if (this.stream.ptr == .File) { - this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable); - } - return this.stream.toJS(); + if (this.* != .stream) { + const stream = this.buffer.toReadableStream(globalThis, exited); + this.* = .{ .stream = stream }; } - const is_fifo = this.buffer.is_fifo; - const stream = this.buffer.toReadableStream(globalThis, exited); - this.* = .{ .stream = stream }; + if (this.stream.ptr == .File) { - this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable); - this.stream.ptr.File.is_fifo = is_fifo; + this.stream.ptr.File.setSignal(JSC.WebCore.Signal.init(readable)); } - return stream.value; + + return this.stream.toJS(); } }; - pub fn init(stdio: Stdio, fd: i32, _: *JSC.JSGlobalObject) Readable { + pub fn init(stdio: Stdio, fd: i32, other_fd: i32, _: *JSC.JSGlobalObject) Readable { return switch (stdio) { .inherit => Readable{ .inherit = {} }, .ignore => Readable{ .ignore = {} }, .pipe => brk: { + _ = JSC.Node.Syscall.close(other_fd); break :brk .{ .pipe = .{ - .buffer = BufferedOutput{ - .fd = fd, - .is_fifo = true, - }, + .buffer = undefined, }, }; }, - .path, .blob, .fd => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }, + .path => Readable{ .ignore = {} }, + .blob, .fd => Readable{ .fd = @intCast(bun.FileDescriptor, fd) }, else => unreachable, }; } @@ -159,7 +165,7 @@ pub const Subprocess = struct { }, .pipe => { if (this.pipe == .stream and this.pipe.stream.ptr == .File) - this.pipe.stream.ptr.File.signal.clear(); + this.pipe.stream.ptr.File.readable().FIFO.signal.clear(); this.pipe.done(); }, else => {}, @@ -190,10 +196,8 @@ pub const Subprocess = struct { .pipe => { defer this.close(); - if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) { - if (this.pipe.buffer.canRead()) - this.pipe.buffer.readIfPossible(true); - } + if (this.pipe.buffer.canRead()) + this.pipe.buffer.readAll(); var bytes = this.pipe.buffer.internal_buffer.slice(); this.pipe.buffer.internal_buffer = .{}; @@ -216,6 +220,7 @@ pub const Subprocess = struct { this: *Subprocess, globalThis: *JSGlobalObject, ) callconv(.C) JSValue { + this.observable_getters.insert(.stderr); return this.stderr.toJS(globalThis, this.exit_code != null); } @@ -223,6 +228,7 @@ pub const Subprocess = struct { this: *Subprocess, globalThis: *JSGlobalObject, ) callconv(.C) JSValue { + this.observable_getters.insert(.stdin); return this.stdin.toJS(globalThis); } @@ -230,6 +236,7 @@ pub const Subprocess = struct { this: *Subprocess, globalThis: *JSGlobalObject, ) callconv(.C) JSValue { + this.observable_getters.insert(.stdout); return this.stdout.toJS(globalThis, this.exit_code != null); } @@ -291,41 +298,22 @@ pub const Subprocess = struct { return .{ .result = {} }; } - pub fn onKill( - this: *Subprocess, - ) void { - if (this.killed) { - return; - } - - this.killed = true; - this.closePorts(); + fn hasCalledGetter(this: *Subprocess, comptime getter: @Type(.EnumLiteral)) bool { + return 
this.observable_getters.contains(getter); } - pub fn closePorts(this: *Subprocess) void { - const pidfd = this.pidfd; - - if (comptime Environment.isLinux) { - this.pidfd = std.math.maxInt(std.os.fd_t); + fn closeProcess(this: *Subprocess) void { + if (comptime !Environment.isLinux) { + return; } - defer { - if (comptime Environment.isLinux) { - if (pidfd != std.math.maxInt(std.os.fd_t)) { - _ = std.os.close(pidfd); - } - } - } + const pidfd = this.pidfd; - if (this.stdout == .pipe) { - this.stdout.pipe.finish(); - } + this.pidfd = std.math.maxInt(std.os.fd_t); - if (this.stderr == .pipe) { - this.stderr.pipe.finish(); + if (pidfd != std.math.maxInt(std.os.fd_t)) { + _ = std.os.close(pidfd); } - - this.stdin.close(); } pub fn doRef(this: *Subprocess, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSValue { @@ -354,7 +342,7 @@ pub const Subprocess = struct { pub const BufferedInput = struct { remain: []const u8 = "", - fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, + fd: bun.FileDescriptor = bun.invalid_fd, poll_ref: ?*JSC.FilePoll = null, written: usize = 0, @@ -366,6 +354,10 @@ pub const Subprocess = struct { pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .writable, onReady); pub fn onReady(this: *BufferedInput, _: i64) void { + if (this.fd == bun.invalid_fd) { + return; + } + this.write(); } @@ -395,10 +387,14 @@ pub const Subprocess = struct { } } - this.write(); + this.writeAllowBlocking(is_sync); } pub fn write(this: *BufferedInput) void { + this.writeAllowBlocking(false); + } + + pub fn writeAllowBlocking(this: *BufferedInput, allow_blocking: bool) void { var to_write = this.remain; if (to_write.len == 0) { @@ -450,7 +446,7 @@ pub const Subprocess = struct { to_write = to_write[bytes_written..]; // we are done or it accepts no more input - if (this.remain.len == 0 or bytes_written == 0) { + if (this.remain.len == 0 or (allow_blocking and bytes_written == 0)) { this.deinit(); return; } @@ -465,9 +461,9 @@ pub const Subprocess = struct { poll.deinit(); } - if (this.fd != JSC.Node.invalid_fd) { + if (this.fd != bun.invalid_fd) { _ = JSC.Node.Syscall.close(this.fd); - this.fd = JSC.Node.invalid_fd; + this.fd = bun.invalid_fd; } } @@ -487,186 +483,137 @@ pub const Subprocess = struct { pub const BufferedOutput = struct { internal_buffer: bun.ByteList = .{}, - max_internal_buffer: u32 = default_max_buffer_size, - fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, - received_eof: bool = false, - pending_error: ?JSC.Node.Syscall.Error = null, - poll_ref: ?*JSC.FilePoll = null, - is_fifo: bool = false, + fifo: JSC.WebCore.FIFO = undefined, + auto_sizer: JSC.WebCore.AutoSizer = undefined, + status: Status = .{ + .pending = {}, + }, - pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready); + pub const Status = union(enum) { + pending: void, + done: void, + err: JSC.Node.Syscall.Error, + }; - pub fn ready(this: *BufferedOutput, available_to_read: i64) void { - if (comptime Environment.isMac) { - if (this.poll_ref) |poll| { - if (available_to_read > 0) { - poll.flags.insert(.readable); - } else { - poll.flags.remove(.readable); - } - } - } + pub fn init(fd: bun.FileDescriptor) BufferedOutput { + return BufferedOutput{ + .internal_buffer = .{}, + .fifo = JSC.WebCore.FIFO{ + .fd = fd, + }, + }; + } - // TODO: what happens if the task was already enqueued after unwatch()? 
- this.readAll(false); + pub fn setup(this: *BufferedOutput, allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) void { + this.* = init(fd); + this.auto_sizer = .{ + .max = max_size, + .allocator = allocator, + .buffer = &this.internal_buffer, + }; + this.watch(); } pub fn canRead(this: *BufferedOutput) bool { - const is_readable = bun.isReadable(this.fd); - - if (is_readable) { - if (this.poll_ref) |poll_ref| { - poll_ref.flags.insert(.readable); - poll_ref.flags.insert(.fifo); - std.debug.assert(poll_ref.flags.contains(.poll_readable)); - } - } - - return is_readable; + return bun.isReadable(this.fifo.fd); } - pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void { - if (comptime !force) { - // we ask, "Is it possible to read right now?" - // we do this rather than epoll or kqueue() - // because we don't want to block the thread waiting for the read - // and because kqueue or epoll might return other unrelated events - // and we don't want this to become an event loop ticking point - if (!this.canRead()) { - this.watch(this.fd); + pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void { + switch (result) { + .pending => { + this.watch(); return; - } - } - - this.readAll(force); - } - - pub fn closeOnEOF(this: *BufferedOutput) bool { - var poll = this.poll_ref orelse return true; - poll.flags.insert(.eof); - return false; - } - - pub fn readAll(this: *BufferedOutput, comptime force: bool) void { - if (this.poll_ref) |poll| { - const is_readable = poll.isReadable(); - if (!is_readable and poll.isEOF()) { - if (poll.isHUP()) { - this.autoCloseFileDescriptor(); - } + }, + .err => |err| { + this.status = .{ .err = err }; + this.fifo.close(); return; - } else if (!is_readable and poll.isHUP()) { - this.autoCloseFileDescriptor(); - return; - } else if (!is_readable) { + }, + .done => { + this.status = .{ .done = {} }; + this.fifo.close(); return; - } + }, + else => { + const slice = result.slice(); + this.internal_buffer.len += @truncate(u32, slice.len); + if (slice.len > 0) + std.debug.assert(this.internal_buffer.contains(slice)); + + if (result.isDone()) { + this.status = .{ .done = {} }; + this.fifo.close(); + } + }, } + } - // read as much as we can from the pipe - while (this.internal_buffer.len < this.max_internal_buffer) { - var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined; - - var buf: []u8 = buffer_[0..]; - - var available = this.internal_buffer.ptr[this.internal_buffer.len..this.internal_buffer.cap]; - if (available.len >= buf.len) { - buf = available; + pub fn readAll(this: *BufferedOutput) void { + while (@as(usize, this.internal_buffer.len) < this.auto_sizer.max and this.status == .pending) { + var stack_buffer: [8096]u8 = undefined; + var stack_buf: []u8 = stack_buffer[0..]; + var buf_to_use = stack_buf; + var available = this.internal_buffer.available(); + if (available.len >= stack_buf.len) { + buf_to_use = available; } - switch (JSC.Node.Syscall.read(this.fd, buf)) { - .err => |e| { - if (e.isRetry()) { - if (!this.isWatching() and this.isFIFO()) - this.watch(this.fd); - this.poll_ref.?.flags.insert(.fifo); - return; - } - - if (comptime Environment.isMac) { - // INTR is returned on macOS when the process is killed - // It probably sent SIGPIPE but we have the handler for - // that disabled. 
- // We know it's the "real" INTR because we use read$NOCANCEL - if (e.getErrno() == .INTR) { - this.received_eof = true; - this.autoCloseFileDescriptor(); - return; - } - } else { - if (comptime Environment.allow_assert) { - std.debug.assert(e.getErrno() != .INTR); // Bun's read() function should retry on EINTR - } - } + const result = this.fifo.read(buf_to_use, this.fifo.to_read); - // fail - log("readAll() fail: {s}", .{@tagName(e.getErrno())}); - this.pending_error = e; - this.internal_buffer.listManaged(bun.default_allocator).deinit(); - this.internal_buffer = .{}; + switch (result) { + .pending => { + this.watch(); return; }, + .err => |err| { + this.status = .{ .err = err }; + this.fifo.close(); - .result => |bytes_read| { - log("readAll() {d}", .{bytes_read}); - - if (bytes_read > 0) { - if (buf.ptr == available.ptr) { - this.internal_buffer.len += @truncate(u32, bytes_read); - } else { - _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory"); - } - } - - if (comptime !force) { - if (buf[bytes_read..].len > 0 or !this.canRead()) { - if (!this.isWatching()) - this.watch(this.fd); - if (this.is_fifo) - this.poll_ref.?.flags.insert(.fifo) - else - this.received_eof = true; - return; - } + return; + }, + .done => { + this.status = .{ .done = {} }; + this.fifo.close(); + return; + }, + .read => |slice| { + if (slice.ptr == stack_buf.ptr) { + this.internal_buffer.append(this.auto_sizer.allocator, slice) catch @panic("out of memory"); } else { - // we consider a short read as being EOF - this.received_eof = !this.is_fifo and this.received_eof or bytes_read < buf.len; - if (this.received_eof) { - if (this.closeOnEOF()) { - this.autoCloseFileDescriptor(); - } + this.internal_buffer.len += @truncate(u32, slice.len); + } - // do not auto-close the file descriptor here - // it's totally legit to have a short read - return; - } + if (slice.len < buf_to_use.len) { + this.watch(); + return; } }, } } } + fn watch(this: *BufferedOutput) void { + this.fifo.pending.set(BufferedOutput, this, onRead); + if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd); + return; + } + pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob { const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis); this.internal_buffer = bun.ByteList.init(""); - std.debug.assert(this.fd == JSC.Node.invalid_fd); - std.debug.assert(this.received_eof); return blob; } pub fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream { if (exited) { // exited + received EOF => no more read() - if (this.received_eof) { - var poll_ref = this.poll_ref; - this.poll_ref = null; - - this.autoCloseFileDescriptor(); - + if (this.fifo.isClosed()) { // also no data at all if (this.internal_buffer.len == 0) { - this.close(); + if (this.internal_buffer.cap > 0) { + this.internal_buffer.deinitWithAllocator(this.auto_sizer.allocator); + } // so we return an empty stream return JSC.WebCore.ReadableStream.fromJS( JSC.WebCore.ReadableStream.empty(globalThis), @@ -675,70 +622,52 @@ pub const Subprocess = struct { } return JSC.WebCore.ReadableStream.fromJS( - JSC.WebCore.ReadableStream.fromBlobWithPoll( + JSC.WebCore.ReadableStream.fromBlob( globalThis, &this.toBlob(globalThis), 0, - poll_ref, ), globalThis, ).?; + } else { + this.fifo.close_on_empty_read = true; } } - std.debug.assert(this.fd != JSC.Node.invalid_fd); { - var poll_ref = this.poll_ref; - this.poll_ref = 
null; + const internal_buffer = this.internal_buffer; + this.internal_buffer = bun.ByteList.init(""); // There could still be data waiting to be read in the pipe // so we need to create a new stream that will read from the // pipe and then return the blob. - var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis); const result = JSC.WebCore.ReadableStream.fromJS( - JSC.WebCore.ReadableStream.fromBlobWithPoll( + JSC.WebCore.ReadableStream.fromFIFO( globalThis, - &blob, - 0, - poll_ref, + &this.fifo, + internal_buffer, ), globalThis, ).?; - blob.detach(); - result.ptr.File.buffered_data = this.internal_buffer; - result.ptr.File.stored_global_this_ = globalThis; - result.ptr.File.finished = exited; - this.internal_buffer = bun.ByteList.init(""); - this.fd = JSC.Node.invalid_fd; - this.received_eof = false; return result; } } - pub fn autoCloseFileDescriptor(this: *BufferedOutput) void { - const fd = this.fd; - if (fd == JSC.Node.invalid_fd) - return; - this.fd = JSC.Node.invalid_fd; - - if (this.poll_ref) |poll| { - this.poll_ref = null; - poll.deinit(); - } - - _ = JSC.Node.Syscall.close(fd); - } - pub fn close(this: *BufferedOutput) void { - this.autoCloseFileDescriptor(); + switch (this.status) { + .done => {}, + .pending => { + this.fifo.close(); + this.status = .{ .done = {} }; + }, + .err => {}, + } if (this.internal_buffer.cap > 0) { this.internal_buffer.listManaged(bun.default_allocator).deinit(); this.internal_buffer = .{}; } - - this.received_eof = true; } }; @@ -748,7 +677,7 @@ pub const Subprocess = struct { pipe: *JSC.WebCore.FileSink, readable_stream: JSC.WebCore.ReadableStream, }, - fd: JSC.Node.FileDescriptor, + fd: bun.FileDescriptor, buffered_input: BufferedInput, inherit: void, ignore: void, @@ -763,7 +692,7 @@ pub const Subprocess = struct { pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {} pub fn onStart(_: *Writable) void {} - pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { + pub fn init(stdio: Stdio, fd: i32, other_fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { switch (stdio) { .pipe => { var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink); @@ -771,7 +700,9 @@ pub const Subprocess = struct { .fd = fd, .buffer = bun.ByteList.init(&.{}), .allocator = globalThis.bunVM().allocator, + .auto_close = true, }; + if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd); sink.mode = std.os.S.IFIFO; if (stdio == .pipe) { if (stdio.pipe) |readable| { @@ -787,6 +718,7 @@ pub const Subprocess = struct { return Writable{ .pipe = sink }; }, .array_buffer, .blob => { + if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd); var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined }; switch (stdio) { .array_buffer => |array_buffer| { @@ -800,7 +732,7 @@ pub const Subprocess = struct { return Writable{ .buffered_input = buffered_input }; }, .fd => { - return Writable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) }; + return Writable{ .fd = @intCast(bun.FileDescriptor, fd) }; }, .inherit => { return Writable{ .inherit = {} }; @@ -842,15 +774,21 @@ pub const Subprocess = struct { } }; - pub fn finalize(this: *Subprocess) callconv(.C) void { - this.unref(); - this.closePorts(); - this.stdout.close(); + pub fn finalizeSync(this: *Subprocess) void { + this.closeProcess(); + this.stdin.close(); this.stderr.close(); + this.stdin.close(); - this.finalized = true; - bun.default_allocator.destroy(this); + 
this.exit_promise.deinit(); + this.on_exit_callback.deinit(); + } + + pub fn finalize(this: *Subprocess) callconv(.C) void { + std.debug.assert(!this.hasPendingActivity()); + this.finalizeSync(); log("Finalize", .{}); + bun.default_allocator.destroy(this); } pub fn getExited( @@ -1203,18 +1141,27 @@ pub const Subprocess = struct { .globalThis = globalThis, .pid = pid, .pidfd = pidfd, - .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch { + .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], stdin_pipe[0], globalThis) catch { globalThis.throw("out of memory", .{}); return .zero; }, - .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis), - .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis), + .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], stdout_pipe[1], globalThis), + .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], stderr_pipe[1], globalThis), .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, + .is_sync = is_sync, }; if (subprocess.stdin == .pipe) { subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin); } + if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) { + subprocess.stdout.pipe.buffer.setup(jsc_vm.allocator, stdout_pipe[0], default_max_buffer_size); + } + + if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) { + subprocess.stderr.pipe.buffer.setup(jsc_vm.allocator, stderr_pipe[0], default_max_buffer_size); + } + const out = if (comptime !is_sync) subprocess.toJS(globalThis) else @@ -1223,7 +1170,7 @@ pub const Subprocess = struct { if (comptime !is_sync) { var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess); subprocess.poll_ref = poll; - switch (poll.register( + switch (subprocess.poll_ref.?.register( jsc_vm.uws_event_loop.?, .process, true, @@ -1252,20 +1199,20 @@ pub const Subprocess = struct { if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) { if (comptime is_sync) { if (subprocess.stdout.pipe.buffer.canRead()) { - subprocess.stdout.pipe.buffer.readAll(true); + subprocess.stdout.pipe.buffer.readAll(); } } else if (!lazy) { - subprocess.stdout.pipe.buffer.readIfPossible(false); + subprocess.stdout.pipe.buffer.readAll(); } } if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) { if (comptime is_sync) { if (subprocess.stderr.pipe.buffer.canRead()) { - subprocess.stderr.pipe.buffer.readAll(true); + subprocess.stderr.pipe.buffer.readAll(); } } else if (!lazy) { - subprocess.stderr.pipe.buffer.readIfPossible(false); + subprocess.stderr.pipe.buffer.readAll(); } } @@ -1273,11 +1220,43 @@ pub const Subprocess = struct { return out; } - subprocess.wait(true); + if (subprocess.stdin == .buffered_input) { + while (subprocess.stdin.buffered_input.remain.len > 0) { + subprocess.stdin.buffered_input.writeIfPossible(true); + } + } + + { + var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess); + subprocess.poll_ref = poll; + switch (subprocess.poll_ref.?.register( + jsc_vm.uws_event_loop.?, + .process, + true, + )) { + .result => {}, + .err => |err| { + if (err.getErrno() == .SRCH) { + @panic("This shouldn't happen"); + } + + // process has already exited + // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007 + subprocess.onExitNotification(); + }, + } + } + + while (!subprocess.hasExited()) { + 
jsc_vm.tick(); + jsc_vm.eventLoop().autoTick(); + } + + // subprocess.wait(true); const exitCode = subprocess.exit_code orelse 1; const stdout = subprocess.stdout.toBufferedValue(globalThis); const stderr = subprocess.stderr.toBufferedValue(globalThis); - subprocess.finalize(); + subprocess.finalizeSync(); const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4); sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@intCast(i32, exitCode))); @@ -1290,16 +1269,22 @@ pub const Subprocess = struct { pub fn onExitNotification( this: *Subprocess, ) void { - this.wait(false); + this.wait(this.is_sync); } pub fn wait(this: *Subprocess, sync: bool) void { if (this.has_waitpid_task) { return; } - + defer this.updateHasPendingActivityFlag(); this.has_waitpid_task = true; const pid = this.pid; + + if (!sync) { + // signal to the other end we are definitely done + this.stdin.close(); + } + switch (PosixSpawn.waitpid(pid, 0)) { .err => |err| { this.waitpid_err = err; @@ -1331,14 +1316,16 @@ pub const Subprocess = struct { poll.deinitWithVM(vm); } - this.onExit(); + this.onExit(this.globalThis); } } - fn onExit(this: *Subprocess) void { - defer this.updateHasPendingActivity(); - this.closePorts(); + fn onExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject) void { + this.stdin.close(); + this.stdout.close(); + this.stderr.close(); + defer this.updateHasPendingActivity(); this.has_waitpid_task = false; if (this.on_exit_callback.trySwap()) |callback| { @@ -1349,7 +1336,7 @@ pub const Subprocess = struct { const waitpid_value: JSValue = if (this.waitpid_err) |err| - err.toJSC(this.globalThis) + err.toJSC(globalThis) else JSC.JSValue.jsUndefined(); @@ -1359,21 +1346,21 @@ pub const Subprocess = struct { }; const result = callback.call( - this.globalThis, + globalThis, args[0 .. 
@as(usize, @boolToInt(this.exit_code != null)) + @as(usize, @boolToInt(this.waitpid_err != null))], ); - if (result.isAnyError(this.globalThis)) { - this.globalThis.bunVM().onUnhandledError(this.globalThis, result); + if (result.isAnyError(globalThis)) { + globalThis.bunVM().onUnhandledError(globalThis, result); } } if (this.exit_promise.trySwap()) |promise| { if (this.exit_code) |code| { - promise.asPromise().?.resolve(this.globalThis, JSValue.jsNumber(code)); + promise.asPromise().?.resolve(globalThis, JSValue.jsNumber(code)); } else if (this.waitpid_err) |err| { this.waitpid_err = null; - promise.asPromise().?.reject(this.globalThis, err.toJSC(this.globalThis)); + promise.asPromise().?.reject(globalThis, err.toJSC(globalThis)); } else { // crash in debug mode if (comptime Environment.allow_assert) @@ -1395,7 +1382,7 @@ pub const Subprocess = struct { const Stdio = union(enum) { inherit: void, ignore: void, - fd: JSC.Node.FileDescriptor, + fd: bun.FileDescriptor, path: JSC.Node.PathLike, blob: JSC.WebCore.AnyBlob, pipe: ?JSC.WebCore.ReadableStream, @@ -1454,7 +1441,7 @@ pub const Subprocess = struct { if (blob.needsToReadFile()) { if (blob.store()) |store| { if (store.data.file.pathlike == .fd) { - if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) { + if (store.data.file.pathlike.fd == @intCast(bun.FileDescriptor, i)) { stdio_array[i] = Stdio{ .inherit = {} }; } else { switch (@intCast(std.os.fd_t, i)) { @@ -1520,7 +1507,7 @@ pub const Subprocess = struct { return false; } - const fd = @intCast(JSC.Node.FileDescriptor, fd_); + const fd = @intCast(bun.FileDescriptor, fd_); switch (@intCast(std.os.fd_t, i)) { std.os.STDIN_FILENO => { diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig index 91248dc5c..e99e49f07 100644 --- a/src/bun.js/api/html_rewriter.zig +++ b/src/bun.js/api/html_rewriter.zig @@ -817,8 +817,7 @@ fn HandlerCallback( ) (fn (*HandlerType, *LOLHTMLType) bool) { return struct { pub fn callback(this: *HandlerType, value: *LOLHTMLType) bool { - if (comptime JSC.is_bindgen) - unreachable; + JSC.markBinding(@src()); var zig_element = bun.default_allocator.create(ZigType) catch unreachable; @field(zig_element, field_name) = value; // At the end of this scope, the value is no longer valid diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index c4724b1b8..531d4830b 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1428,10 +1428,17 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp stream.value.ensureStillAlive(); - if (!stream.isLocked(this.server.globalThis)) { - streamLog("is not locked", .{}); - this.renderMissing(); - return; + const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and + response_stream.sink.buffer.len == 0); + + if (!stream.isLocked(this.server.globalThis) and !is_in_progress) { + if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| { + if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) { + streamLog("is not locked", .{}); + this.renderMissing(); + return; + } + } } this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index e456a7113..981c0d2e4 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -3931,7 +3931,7 @@ pub const PollRef = struct { /// Make calling ref() on this poll into a no-op. 
pub fn disable(this: *PollRef) void { - this.unref(); + this.unref(JSC.VirtualMachine.vm); this.status = .done; } @@ -3987,6 +3987,7 @@ pub const FilePoll = struct { const FileReader = JSC.WebCore.FileReader; const FileSink = JSC.WebCore.FileSink; + const FIFO = JSC.WebCore.FIFO; const Subprocess = JSC.Subprocess; const BufferedInput = Subprocess.BufferedInput; const BufferedOutput = Subprocess.BufferedOutput; @@ -3999,7 +4000,7 @@ pub const FilePoll = struct { FileSink, Subprocess, BufferedInput, - BufferedOutput, + FIFO, Deactivated, }); @@ -4009,6 +4010,7 @@ pub const FilePoll = struct { flags.remove(.writable); flags.remove(.process); flags.remove(.eof); + flags.remove(.hup); flags.setUnion(updated); poll.flags = flags; @@ -4079,9 +4081,9 @@ pub const FilePoll = struct { } var ptr = poll.owner; switch (ptr.tag()) { - @field(Owner.Tag, "FileReader") => { - log("onUpdate: FileReader", .{}); - ptr.as(FileReader).onPoll(size_or_offset, 0); + @field(Owner.Tag, "FIFO") => { + log("onUpdate: FIFO", .{}); + ptr.as(FIFO).ready(size_or_offset); }, @field(Owner.Tag, "Subprocess") => { log("onUpdate: Subprocess", .{}); @@ -4095,17 +4097,9 @@ pub const FilePoll = struct { loader.onPoll(size_or_offset, 0); }, - @field(Owner.Tag, "BufferedInput") => { - log("onUpdate: BufferedInput", .{}); - var loader = ptr.as(JSC.Subprocess.BufferedInput); - loader.onReady(size_or_offset); + else => { + log("onUpdate: disconnected?", .{}); }, - @field(Owner.Tag, "BufferedOutput") => { - log("onUpdate: BufferedOutput", .{}); - var loader = ptr.as(JSC.Subprocess.BufferedOutput); - loader.ready(size_or_offset); - }, - else => {}, } } @@ -4155,15 +4149,20 @@ pub const FilePoll = struct { var flags = Flags.Set{}; if (kqueue_event.filter == std.os.system.EVFILT_READ) { flags.insert(Flags.readable); + log("readable", .{}); if (kqueue_event.flags & std.os.system.EV_EOF != 0) { - flags.insert(Flags.eof); + flags.insert(Flags.hup); + log("hup", .{}); } } else if (kqueue_event.filter == std.os.system.EVFILT_WRITE) { flags.insert(Flags.writable); + log("writable", .{}); if (kqueue_event.flags & std.os.system.EV_EOF != 0) { flags.insert(Flags.hup); + log("hup", .{}); } } else if (kqueue_event.filter == std.os.system.EVFILT_PROC) { + log("proc", .{}); flags.insert(Flags.process); } return flags; @@ -4245,17 +4244,15 @@ pub const FilePoll = struct { this.flags.insert(.has_incremented_poll_count); } - pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll { + pub fn init(vm: *JSC.VirtualMachine, fd: bun.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll { return initWithOwner(vm, fd, flags, Owner.init(owner)); } - pub fn initWithOwner(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, owner: Owner) *FilePoll { + pub fn initWithOwner(vm: *JSC.VirtualMachine, fd: bun.FileDescriptor, flags: Flags.Struct, owner: Owner) *FilePoll { var poll = vm.rareData().filePolls(vm).get(); - poll.* = .{ - .fd = @intCast(u32, fd), - .flags = Flags.Set.init(flags), - .owner = owner, - }; + poll.fd = @intCast(u32, fd); + poll.flags = Flags.Set.init(flags); + poll.owner = owner; return poll; } @@ -4350,7 +4347,7 @@ pub const FilePoll = struct { } } else if (comptime Environment.isMac) { var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); - const one_shot_flag: @TypeOf(changelist[0].flags) = if (!this.flags.contains(.one_shot)) 0 else std.c.EV_ONESHOT; + const one_shot_flag: u16 = if 
(!this.flags.contains(.one_shot)) 0 else std.c.EV_ONESHOT; changelist[0] = switch (flag) { .readable => .{ .ident = @intCast(u64, fd), @@ -4410,7 +4407,7 @@ pub const FilePoll = struct { // processing an element of the changelist and there is enough room // in the eventlist, then the event will be placed in the eventlist // with EV_ERROR set in flags and the system error in data. - if (changelist[0].flags == std.c.EV_ERROR) { + if (changelist[0].flags == std.c.EV_ERROR and changelist[0].data != 0) { return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; // Otherwise, -1 will be returned, and errno will be set to // indicate the error condition. @@ -4440,7 +4437,7 @@ pub const FilePoll = struct { return JSC.Maybe(void).success; } - pub const invalid_fd = JSC.Node.invalid_fd; + const invalid_fd = bun.invalid_fd; pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) { if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) { diff --git a/src/bun.js/bindings/URLSearchParams.cpp b/src/bun.js/bindings/URLSearchParams.cpp index 0ce9554c7..9c168b4f6 100644 --- a/src/bun.js/bindings/URLSearchParams.cpp +++ b/src/bun.js/bindings/URLSearchParams.cpp @@ -96,9 +96,11 @@ void URLSearchParams::set(const String& name, const String& value) return false; }); updateURL(); + needsSorting = true; return; } m_pairs.append({ name, value }); + needsSorting = true; updateURL(); } @@ -106,6 +108,7 @@ void URLSearchParams::append(const String& name, const String& value) { m_pairs.append({ name, value }); updateURL(); + needsSorting = true; } Vector<String> URLSearchParams::getAll(const String& name) const @@ -122,10 +125,12 @@ Vector<String> URLSearchParams::getAll(const String& name) const void URLSearchParams::remove(const String& name) { - m_pairs.removeAllMatching([&](const auto& pair) { - return pair.key == name; - }); + if (!m_pairs.removeAllMatching([&](const auto& pair) { + return pair.key == name; + })) + return; updateURL(); + needsSorting = true; } String URLSearchParams::toString() const diff --git a/src/bun.js/bindings/URLSearchParams.h b/src/bun.js/bindings/URLSearchParams.h index 3749ed9f9..486098adc 100644 --- a/src/bun.js/bindings/URLSearchParams.h +++ b/src/bun.js/bindings/URLSearchParams.h @@ -72,6 +72,7 @@ private: WeakPtr<DOMURL> m_associatedURL; Vector<KeyValuePair<String, String>> m_pairs; + bool needsSorting { true }; }; } // namespace WebCore diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 2ea4bc65a..5d7c49292 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -2382,6 +2382,7 @@ void GlobalObject::finishCreation(VM& vm) auto& global = *reinterpret_cast<Zig::GlobalObject*>(init.owner); if (global.crypto == nullptr) { global.crypto = WebCore::SubtleCrypto::createPtr(global.scriptExecutionContext()); + global.crypto->ref(); } init.set( diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp index 977f1ca26..8c0df075e 100644 --- a/src/bun.js/bindings/bindings.cpp +++ b/src/bun.js/bindings/bindings.cpp @@ -2277,7 +2277,7 @@ static void populateStackFramePosition(const JSC::StackFrame* stackFrame, ZigStr // Make sure the range is valid WTF::StringView sourceString = m_codeBlock->source().provider()->source(); - if (!expressionStop || expressionStart > static_cast<int>(sourceString.length())) { + if (expressionStop < 1 || expressionStart > static_cast<int>(sourceString.length())) { 
return; } diff --git a/src/bun.js/bindings/headers-cpp.h b/src/bun.js/bindings/headers-cpp.h index 05368907e..e5f4cb8f8 100644 --- a/src/bun.js/bindings/headers-cpp.h +++ b/src/bun.js/bindings/headers-cpp.h @@ -1,4 +1,4 @@ -//-- AUTOGENERATED FILE -- 1668983536 +//-- AUTOGENERATED FILE -- 1669191472 // clang-format off #pragma once diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index 99b046b59..a9d106029 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -1,5 +1,5 @@ // clang-format off -//-- AUTOGENERATED FILE -- 1668983536 +//-- AUTOGENERATED FILE -- 1669191472 #pragma once #include <stddef.h> diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp index 90b9eeada..f2a3d5a55 100644 --- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp +++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp @@ -1039,7 +1039,7 @@ const char* const s_readableStreamInternalsReadDirectStreamCode = "\n" \ " if (highWaterMark) {\n" \ " sink.start({\n" \ - " highWaterMark,\n" \ + " highWaterMark: highWaterMark < 64 ? 64 : highWaterMark,\n" \ " });\n" \ " }\n" \ "\n" \ @@ -2270,7 +2270,7 @@ const char* const s_readableStreamInternalsReadableStreamDefaultControllerCanClo const JSC::ConstructAbility s_readableStreamInternalsLazyLoadStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct; const JSC::ConstructorKind s_readableStreamInternalsLazyLoadStreamCodeConstructorKind = JSC::ConstructorKind::None; const JSC::ImplementationVisibility s_readableStreamInternalsLazyLoadStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public; -const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2614; +const int s_readableStreamInternalsLazyLoadStreamCodeLength = 3647; static const JSC::Intrinsic s_readableStreamInternalsLazyLoadStreamCodeIntrinsic = JSC::NoIntrinsic; const char* const s_readableStreamInternalsLazyLoadStreamCode = "(function (stream, autoAllocateChunkSize) {\n" \ @@ -2280,7 +2280,7 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode = " var nativePtr = @getByIdDirectPrivate(stream, \"bunNativePtr\");\n" \ " var Prototype = @lazyStreamPrototypeMap.@get(nativeType);\n" \ " if (Prototype === @undefined) {\n" \ - " var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);\n" \ + " var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType);\n" \ " var closer = [false];\n" \ " var handleResult;\n" \ " function handleNativeReadableStreamPromiseResult(val) {\n" \ @@ -2293,8 +2293,6 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode = "\n" \ " handleResult = function handleResult(result, controller, view) {\n" \ " \"use strict\";\n" \ - "\n" \ - " \n" \ " if (result && @isPromise(result)) {\n" \ " return result.then(\n" \ " handleNativeReadableStreamPromiseResult.bind({\n" \ @@ -2319,53 +2317,96 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode = " }\n" \ " };\n" \ "\n" \ + " function createResult(tag, controller, view, closer) {\n" \ + " closer[0] = false;\n" \ + "\n" \ + " var result;\n" \ + " try {\n" \ + " result = pull(tag, view, closer);\n" \ + " } catch (err) {\n" \ + " return controller.error(err);\n" \ + " }\n" \ + "\n" \ + " return handleResult(result, controller, view);\n" \ + " }\n" \ + "\n" \ " Prototype = class NativeReadableStreamSource {\n" \ - " constructor(tag, autoAllocateChunkSize) {\n" \ - " this.pull = 
this.pull_.bind(tag);\n" \ - " this.cancel = this.cancel_.bind(tag);\n" \ + " constructor(tag, autoAllocateChunkSize, drainValue) {\n" \ + " this.#tag = tag;\n" \ + " this.pull = this.#pull.bind(this);\n" \ + " this.cancel = this.#cancel.bind(this);\n" \ " this.autoAllocateChunkSize = autoAllocateChunkSize;\n" \ + "\n" \ + " if (drainValue !== @undefined) {\n" \ + " this.start = (controller) => {\n" \ + " controller.enqueue(drainValue);\n" \ + " console.log(\"chunkSize\", chunkSize);\n" \ + " };\n" \ + " }\n" \ " }\n" \ "\n" \ " pull;\n" \ " cancel;\n" \ + " start;\n" \ "\n" \ + " #tag;\n" \ " type = \"bytes\";\n" \ " autoAllocateChunkSize = 0;\n" \ - "\n" \ + " \n" \ " static startSync = start;\n" \ + " \n" \ + " \n" \ + " #pull(controller) {\n" \ + " var tag = this.#tag;\n" \ "\n" \ - " pull_(controller) {\n" \ - " closer[0] = false;\n" \ - "\n" \ - " var result;\n" \ - "\n" \ - " const view = controller.byobRequest.view;\n" \ - " try {\n" \ - " result = pull(this, view, closer);\n" \ - " } catch (err) {\n" \ - " return controller.error(err);\n" \ + " if (!tag) {\n" \ + " controller.close();\n" \ + " return;\n" \ " }\n" \ "\n" \ - " return handleResult(result, controller, view);\n" \ + " createResult(tag, controller, controller.byobRequest.view, closer);\n" \ " }\n" \ "\n" \ - " cancel_(reason) {\n" \ - " cancel(this, reason);\n" \ + " #cancel(reason) {\n" \ + " var tag = this.#tag;\n" \ + " setRefOrUnref && setRefOrUnref(tag, false);\n" \ + " cancel(tag, reason);\n" \ " }\n" \ " static deinit = deinit;\n" \ " static registry = new FinalizationRegistry(deinit);\n" \ + " static drain = drain;\n" \ " };\n" \ " @lazyStreamPrototypeMap.@set(nativeType, Prototype);\n" \ " }\n" \ "\n" \ " const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);\n" \ + " var drainValue;\n" \ + " const drainFn = Prototype.drain;\n" \ + " if (drainFn) {\n" \ + " drainValue = drainFn(nativePtr);\n" \ + " }\n" \ "\n" \ " //\n" \ " if (chunkSize === 0) {\n" \ - " @readableStreamClose(stream);\n" \ - " return null;\n" \ + " if ((drainValue?.byteLength ?? 0) > 0) {\n" \ + " deinit && nativePtr && @enqueueJob(deinit, nativePtr);\n" \ + " return {\n" \ + " start(controller) {\n" \ + " controller.enqueue(drainValue);\n" \ + " controller.close();\n" \ + " },\n" \ + " type: \"bytes\",\n" \ + " };\n" \ + " }\n" \ + "\n" \ + " return {\n" \ + " start(controller) {\n" \ + " controller.close();\n" \ + " },\n" \ + " type: \"bytes\",\n" \ + " };\n" \ " }\n" \ - " var instance = new Prototype(nativePtr, chunkSize);\n" \ + " var instance = new Prototype(nativePtr, chunkSize, drainValue);\n" \ " Prototype.registry.register(instance, nativePtr);\n" \ " return instance;\n" \ "})\n" \ diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index e8c9667a0..def4d51a3 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -839,7 +839,7 @@ function readDirectStream(stream, sink, underlyingSource) { if (highWaterMark) { sink.start({ - highWaterMark, + highWaterMark: highWaterMark < 64 ? 
64 : highWaterMark, }); } @@ -1857,7 +1857,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); var Prototype = @lazyStreamPrototypeMap.@get(nativeType); if (Prototype === @undefined) { - var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); + var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType); var closer = [false]; var handleResult; function handleNativeReadableStreamPromiseResult(val) { @@ -1870,8 +1870,6 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { handleResult = function handleResult(result, controller, view) { "use strict"; - - if (result && @isPromise(result)) { return result.then( handleNativeReadableStreamPromiseResult.bind({ @@ -1896,53 +1894,96 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { } }; + function createResult(tag, controller, view, closer) { + closer[0] = false; + + var result; + try { + result = pull(tag, view, closer); + } catch (err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + Prototype = class NativeReadableStreamSource { - constructor(tag, autoAllocateChunkSize) { - this.pull = this.pull_.bind(tag); - this.cancel = this.cancel_.bind(tag); + constructor(tag, autoAllocateChunkSize, drainValue) { + this.#tag = tag; + this.pull = this.#pull.bind(this); + this.cancel = this.#cancel.bind(this); this.autoAllocateChunkSize = autoAllocateChunkSize; + + if (drainValue !== @undefined) { + this.start = (controller) => { + controller.enqueue(drainValue); + console.log("chunkSize", chunkSize); + }; + } } pull; cancel; + start; + #tag; type = "bytes"; autoAllocateChunkSize = 0; - + static startSync = start; + + + #pull(controller) { + var tag = this.#tag; - pull_(controller) { - closer[0] = false; - - var result; - - const view = controller.byobRequest.view; - try { - result = pull(this, view, closer); - } catch (err) { - return controller.error(err); + if (!tag) { + controller.close(); + return; } - return handleResult(result, controller, view); + createResult(tag, controller, controller.byobRequest.view, closer); } - cancel_(reason) { - cancel(this, reason); + #cancel(reason) { + var tag = this.#tag; + setRefOrUnref && setRefOrUnref(tag, false); + cancel(tag, reason); } static deinit = deinit; static registry = new FinalizationRegistry(deinit); + static drain = drain; }; @lazyStreamPrototypeMap.@set(nativeType, Prototype); } const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + var drainValue; + const drainFn = Prototype.drain; + if (drainFn) { + drainValue = drainFn(nativePtr); + } // empty file, no need for native back-and-forth on this if (chunkSize === 0) { - @readableStreamClose(stream); - return null; + if ((drainValue?.byteLength ?? 
0) > 0) { + deinit && nativePtr && @enqueueJob(deinit, nativePtr); + return { + start(controller) { + controller.enqueue(drainValue); + controller.close(); + }, + type: "bytes", + }; + } + + return { + start(controller) { + controller.close(); + }, + type: "bytes", + }; } - var instance = new Prototype(nativePtr, chunkSize); + var instance = new Prototype(nativePtr, chunkSize, drainValue); Prototype.registry.register(instance, nativePtr); return instance; } diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js index a9a1589dd..4819ebda0 100644 --- a/src/bun.js/child_process.exports.js +++ b/src/bun.js/child_process.exports.js @@ -1033,13 +1033,16 @@ export class ChildProcess extends EventEmitter { const stdio = options.stdio || ["pipe", "pipe", "pipe"]; const bunStdio = getBunStdioFromOptions(stdio); + var env = options.envPairs || undefined; + if (env === process.env) env = undefined; + this.#handle = Bun.spawn({ cmd: spawnargs, stdin: bunStdio[0], stdout: bunStdio[1], stderr: bunStdio[2], cwd: options.cwd || undefined, - env: options.envPairs || undefined, + env, onExit: this.#handleOnExit.bind(this), lazy: true, }); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 28e12fb84..d83bd3575 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -132,7 +132,7 @@ pub fn IOTask(comptime Context: type) type { pub fn deinit(this: *This) void { var allocator = this.allocator; this.ref.unref(this.event_loop.virtual_machine); - this.* = undefined; + allocator.destroy(this); } }; diff --git a/src/bun.js/node/node_fs.zig b/src/bun.js/node/node_fs.zig index b5ba2b983..394e705db 100644 --- a/src/bun.js/node/node_fs.zig +++ b/src/bun.js/node/node_fs.zig @@ -23,7 +23,7 @@ const linux = os.linux; const PathOrBuffer = JSC.Node.PathOrBuffer; const PathLike = JSC.Node.PathLike; const PathOrFileDescriptor = JSC.Node.PathOrFileDescriptor; -const FileDescriptor = JSC.Node.FileDescriptor; +const FileDescriptor = bun.FileDescriptor; const DirIterator = @import("./dir_iterator.zig"); const Path = @import("../../resolver/resolve_path.zig"); const FileSystem = @import("../../fs.zig").FileSystem; @@ -1823,7 +1823,7 @@ const Arguments = struct { .file = undefined, .global_object = ctx.ptr(), }; - var fd: FileDescriptor = JSC.Node.invalid_fd; + var fd: FileDescriptor = bun.invalid_fd; if (arguments.next()) |arg| { arguments.eat(); @@ -1918,7 +1918,7 @@ const Arguments = struct { } } - if (fd != JSC.Node.invalid_fd) { + if (fd != bun.invalid_fd) { stream.file = .{ .fd = fd }; } else if (path) |path_| { stream.file = .{ .path = path_ }; @@ -1957,7 +1957,7 @@ const Arguments = struct { .file = undefined, .global_object = ctx.ptr(), }; - var fd: FileDescriptor = JSC.Node.invalid_fd; + var fd: FileDescriptor = bun.invalid_fd; if (arguments.next()) |arg| { arguments.eat(); @@ -2044,7 +2044,7 @@ const Arguments = struct { } } - if (fd != JSC.Node.invalid_fd) { + if (fd != bun.invalid_fd) { stream.file = .{ .fd = fd }; } else if (path) |path_| { stream.file = .{ .path = path_ }; diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig index e2d197073..1554186bc 100644 --- a/src/bun.js/node/syscall.zig +++ b/src/bun.js/node/syscall.zig @@ -11,12 +11,13 @@ const JSC = @import("../../jsc.zig"); const SystemError = JSC.SystemError; const bun = @import("../../global.zig"); const MAX_PATH_BYTES = bun.MAX_PATH_BYTES; -const fd_t = bun.FileDescriptorType; +const fd_t = bun.FileDescriptor; const C = @import("../../global.zig").C; const linux = 
os.linux; const Maybe = JSC.Maybe; const log = bun.Output.scoped(.SYS, false); +pub const syslog = log; // On Linux AARCh64, zig is missing stat & lstat syscalls const use_libc = (Environment.isLinux and Environment.isAarch64) or Environment.isMac; @@ -124,7 +125,7 @@ pub fn getcwd(buf: *[bun.MAX_PATH_BYTES]u8) Maybe([]const u8) { Result.errnoSys(0, .getcwd).?; } -pub fn fchmod(fd: JSC.Node.FileDescriptor, mode: JSC.Node.Mode) Maybe(void) { +pub fn fchmod(fd: bun.FileDescriptor, mode: JSC.Node.Mode) Maybe(void) { return Maybe(void).errnoSys(C.fchmod(fd, mode), .fchmod) orelse Maybe(void).success; } @@ -146,7 +147,7 @@ pub fn lstat(path: [:0]const u8) Maybe(os.Stat) { return Maybe(os.Stat){ .result = stat_ }; } -pub fn fstat(fd: JSC.Node.FileDescriptor) Maybe(os.Stat) { +pub fn fstat(fd: bun.FileDescriptor) Maybe(os.Stat) { var stat_ = mem.zeroes(os.Stat); if (Maybe(os.Stat).errnoSys(fstatSym(fd, &stat_), .fstat)) |err| return err; return Maybe(os.Stat){ .result = stat_ }; @@ -162,7 +163,7 @@ pub fn mkdir(file_path: [:0]const u8, flags: JSC.Node.Mode) Maybe(void) { } } -pub fn fcntl(fd: JSC.Node.FileDescriptor, cmd: i32, arg: usize) Maybe(usize) { +pub fn fcntl(fd: bun.FileDescriptor, cmd: i32, arg: usize) Maybe(usize) { const result = fcntl_symbol(fd, cmd, arg); if (Maybe(usize).errnoSys(result, .fcntl)) |err| return err; return .{ .result = @intCast(usize, result) }; @@ -179,12 +180,12 @@ pub fn getErrno(rc: anytype) std.os.E { }; } -pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) Maybe(JSC.Node.FileDescriptor) { +pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) Maybe(bun.FileDescriptor) { while (true) { const rc = Syscall.system.open(file_path, flags, perm); log("open({s}): {d}", .{ file_path, rc }); return switch (Syscall.getErrno(rc)) { - .SUCCESS => .{ .result = @intCast(JSC.Node.FileDescriptor, rc) }, + .SUCCESS => .{ .result = @intCast(bun.FileDescriptor, rc) }, .INTR => continue, else => |err| { return Maybe(std.os.fd_t){ @@ -204,7 +205,10 @@ pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) // That error is not unreachable for us pub fn close(fd: std.os.fd_t) ?Syscall.Error { log("close({d})", .{fd}); - std.debug.assert(fd != JSC.Node.invalid_fd); + std.debug.assert(fd != bun.invalid_fd); + if (comptime std.meta.trait.isSignedInt(@TypeOf(fd))) + std.debug.assert(fd > -1); + if (comptime Environment.isMac) { // This avoids the EINTR problem. 
return switch (system.getErrno(system.@"close$NOCANCEL"(fd))) { @@ -685,7 +689,7 @@ pub const Error = struct { } }; -pub fn setPipeCapacityOnLinux(fd: JSC.Node.FileDescriptor, capacity: usize) Maybe(usize) { +pub fn setPipeCapacityOnLinux(fd: bun.FileDescriptor, capacity: usize) Maybe(usize) { if (comptime !Environment.isLinux) @compileError("Linux-only"); std.debug.assert(capacity > 0); diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index 680425ddc..020fd5f2d 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -43,7 +43,6 @@ pub fn DeclEnum(comptime T: type) type { }); } -pub const FileDescriptor = os.fd_t; pub const Flavor = enum { sync, promise, @@ -602,7 +601,7 @@ pub const PathLike = union(Tag) { }; pub const Valid = struct { - pub fn fileDescriptor(fd: FileDescriptor, ctx: JSC.C.JSContextRef, exception: JSC.C.ExceptionRef) bool { + pub fn fileDescriptor(fd: bun.FileDescriptor, ctx: JSC.C.JSContextRef, exception: JSC.C.ExceptionRef) bool { if (fd < 0) { JSC.throwInvalidArguments("Invalid file descriptor, must not be negative number", .{}, ctx, exception); return false; @@ -756,14 +755,14 @@ pub const ArgumentsSlice = struct { } }; -pub fn fileDescriptorFromJS(ctx: JSC.C.JSContextRef, value: JSC.JSValue, exception: JSC.C.ExceptionRef) ?FileDescriptor { +pub fn fileDescriptorFromJS(ctx: JSC.C.JSContextRef, value: JSC.JSValue, exception: JSC.C.ExceptionRef) ?bun.FileDescriptor { if (!value.isNumber() or value.isBigInt()) return null; const fd = value.toInt32(); if (!Valid.fileDescriptor(fd, ctx, exception)) { return null; } - return @truncate(FileDescriptor, fd); + return @truncate(bun.FileDescriptor, fd); } var _get_time_prop_string: ?JSC.C.JSStringRef = null; @@ -826,18 +825,18 @@ pub fn modeFromJS(ctx: JSC.C.JSContextRef, value: JSC.JSValue, exception: JSC.C. 
pub const PathOrFileDescriptor = union(Tag) { path: PathLike, - fd: FileDescriptor, + fd: bun.FileDescriptor, pub const Tag = enum { fd, path }; - pub fn hash(this: PathOrFileDescriptor) u64 { + pub fn hash(this: JSC.Node.PathOrFileDescriptor) u64 { return switch (this) { .path => std.hash.Wyhash.hash(0, this.path.slice()), .fd => std.hash.Wyhash.hash(0, std.mem.asBytes(&this.fd)), }; } - pub fn format(this: PathOrFileDescriptor, comptime fmt: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void { + pub fn format(this: JSC.Node.PathOrFileDescriptor, comptime fmt: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void { if (fmt.len != 0 and fmt != "s") { @compileError("Unsupported format argument: '" ++ fmt ++ "'."); } @@ -847,20 +846,20 @@ pub const PathOrFileDescriptor = union(Tag) { } } - pub fn fromJS(ctx: JSC.C.JSContextRef, arguments: *ArgumentsSlice, allocator: std.mem.Allocator, exception: JSC.C.ExceptionRef) ?PathOrFileDescriptor { + pub fn fromJS(ctx: JSC.C.JSContextRef, arguments: *ArgumentsSlice, allocator: std.mem.Allocator, exception: JSC.C.ExceptionRef) ?JSC.Node.PathOrFileDescriptor { const first = arguments.next() orelse return null; if (fileDescriptorFromJS(ctx, first, exception)) |fd| { arguments.eat(); - return PathOrFileDescriptor{ .fd = fd }; + return JSC.Node.PathOrFileDescriptor{ .fd = fd }; } if (exception.* != null) return null; - return PathOrFileDescriptor{ .path = PathLike.fromJSWithAllocator(ctx, arguments, allocator, exception) orelse return null }; + return JSC.Node.PathOrFileDescriptor{ .path = PathLike.fromJSWithAllocator(ctx, arguments, allocator, exception) orelse return null }; } - pub fn toJS(this: PathOrFileDescriptor, ctx: JSC.C.JSContextRef, exception: JSC.C.ExceptionRef) JSC.C.JSValueRef { + pub fn toJS(this: JSC.Node.PathOrFileDescriptor, ctx: JSC.C.JSContextRef, exception: JSC.C.ExceptionRef) JSC.C.JSValueRef { return switch (this) { .path => this.path.toJS(ctx, exception), .fd => JSC.JSValue.jsNumberFromInt32(@intCast(i32, this.fd)).asRef(), @@ -1578,6 +1577,7 @@ pub const Path = struct { if (name_.isEmpty()) { return JSC.ZigString.Empty.toValue(globalThis); } + const out = std.fmt.allocPrint(allocator, "{s}{s}", .{ name_, ext }) catch unreachable; defer allocator.free(out); diff --git a/src/bun.js/node_timers.exports.js b/src/bun.js/node_timers.exports.js index 58e660412..d46916ac5 100644 --- a/src/bun.js/node_timers.exports.js +++ b/src/bun.js/node_timers.exports.js @@ -1,3 +1,31 @@ +class Timeout { + #id; + #refCount = 1; + #clearFunction; + + constructor(id, clearFunction) { + this.#id = id; + this.#refCount = 1; + this.#clearFunction = clearFunction; + } + + ref() { + this.#refCount += 1; + } + + hasRef() { + return this.#refCount > 0; + } + + unref() { + this.#refCount -= 1; + var clearFunction = this.#clearFunction; + if (clearFunction && this.#refCount === 0) { + this.#clearFunction = null; + clearFunction(this.#id); + } + } +} export const setInterval = globalThis.setInterval; export const setImmediate = globalThis.queueMicrotask; export const setTimeout = globalThis.setTimeout; diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 28481dbb3..991c07ad0 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -2039,7 +2039,7 @@ var require_destroy = __commonJS({ r.destroyed = true; } if (!s.constructed) { - this.once(kDestroy, function (er) { + this.once(kDestroy, (er) => { _destroy(this, aggregateTwoErrors(er, err), cb); }); } else { @@ -5725,6 +5725,8 @@ function 
createNativeStream(nativeType, Readable) { var DYNAMICALLY_ADJUST_CHUNK_SIZE = process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1"; + const finalizer = new FinalizationRegistry((ptr) => ptr && deinit(ptr)); + var NativeReadable = class NativeReadable extends Readable { #ptr; #refCount = 1; @@ -5733,6 +5735,7 @@ function createNativeStream(nativeType, Readable) { #highWaterMark; #pendingRead = false; #hasResized = !DYNAMICALLY_ADJUST_CHUNK_SIZE; + #unregisterToken; constructor(ptr, options = {}) { super(options); if (typeof options.highWaterMark === "number") { @@ -5744,6 +5747,8 @@ function createNativeStream(nativeType, Readable) { this.#constructed = false; this.#remainingChunk = undefined; this.#pendingRead = false; + this.#unregisterToken = {}; + finalizer.register(this, this.#ptr, this.#unregisterToken); } _read(highWaterMark) { @@ -5789,6 +5794,10 @@ function createNativeStream(nativeType, Readable) { return chunk; } + push(result, encoding) { + return super.push(...arguments); + } + #handleResult(result, view, isClosed) { if (typeof result === "number") { if (result >= this.#highWaterMark && !this.#hasResized && !isClosed) { @@ -5844,6 +5853,7 @@ function createNativeStream(nativeType, Readable) { return; } + finalizer.unregister(this.#unregisterToken); this.#ptr = 0; if (updateRef) { updateRef(ptr, false); diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 0d01a218f..5c996da45 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -470,7 +470,7 @@ pub const Response = struct { } }; -const null_fd = JSC.Node.invalid_fd; +const null_fd = bun.invalid_fd; pub const Fetch = struct { const headers_string = "headers"; @@ -590,8 +590,7 @@ pub const Fetch = struct { } pub fn onDone(this: *FetchTasklet) void { - if (comptime JSC.is_bindgen) - unreachable; + JSC.markBinding(@src()); const globalThis = this.global_this; @@ -641,6 +640,11 @@ pub const Fetch = struct { fn toBodyValue(this: *FetchTasklet) Body.Value { var response_buffer = this.response_buffer.list; + const response = Body.Value{ + .InternalBlob = .{ + .bytes = response_buffer.toManaged(bun.default_allocator), + }, + }; this.response_buffer = .{ .allocator = default_allocator, .list = .{ @@ -654,12 +658,7 @@ pub const Fetch = struct { // defer response_buffer.deinit(bun.default_allocator); // return .{ .InlineBlob = inline_blob }; // } - - return .{ - .InternalBlob = .{ - .bytes = response_buffer.toManaged(bun.default_allocator), - }, - }; + return response; } fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response { @@ -1488,7 +1487,7 @@ pub const Blob = struct { needs_async: *bool, comptime needs_open: bool, ) JSC.JSValue { - const fd: JSC.Node.FileDescriptor = if (comptime !needs_open) pathlike.fd else brk: { + const fd: bun.FileDescriptor = if (comptime !needs_open) pathlike.fd else brk: { var file_path: [bun.MAX_PATH_BYTES]u8 = undefined; switch (JSC.Node.Syscall.open( pathlike.path.sliceZ(&file_path), @@ -1608,7 +1607,7 @@ pub const Blob = struct { needs_async: *bool, comptime needs_open: bool, ) JSC.JSValue { - const fd: JSC.Node.FileDescriptor = if (comptime !needs_open) pathlike.fd else brk: { + const fd: bun.FileDescriptor = if (comptime !needs_open) pathlike.fd else brk: { var file_path: [bun.MAX_PATH_BYTES]u8 = undefined; switch (JSC.Node.Syscall.open( pathlike.path.sliceZ(&file_path), @@ -1819,16 +1818,21 @@ pub const Blob = struct { } pub fn deinit(this: *Blob.Store) void { + const allocator = this.allocator; + switch (this.data) { 
.bytes => |*bytes| { bytes.deinit(); }, .file => |file| { VirtualMachine.vm.removeFileBlob(file.pathlike); + if (file.pathlike == .path) { + allocator.free(bun.constStrToU8(file.pathlike.path.slice())); + } }, } - this.allocator.destroy(this); + allocator.destroy(this); } pub fn fromArrayList(list: std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator) !*Blob.Store { @@ -1843,7 +1847,7 @@ pub const Blob = struct { else std.os.O.RDONLY | __opener_flags; - pub fn getFdMac(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor { + pub fn getFdMac(this: *This) AsyncIO.OpenError!bun.FileDescriptor { var buf: [bun.MAX_PATH_BYTES]u8 = undefined; var path_string = if (@hasField(This, "file_store")) this.file_store.pathlike.path @@ -1865,7 +1869,7 @@ pub const Blob = struct { return this.opened_fd; } - pub fn getFd(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor { + pub fn getFd(this: *This) AsyncIO.OpenError!bun.FileDescriptor { if (this.opened_fd != null_fd) { return this.opened_fd; } @@ -1877,7 +1881,7 @@ pub const Blob = struct { } } - pub fn getFdLinux(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor { + pub fn getFdLinux(this: *This) AsyncIO.OpenError!bun.FileDescriptor { var aio = &AsyncIO.global; var buf: [bun.MAX_PATH_BYTES]u8 = undefined; @@ -1915,7 +1919,7 @@ pub const Blob = struct { return this.opened_fd; } - pub fn onOpen(this: *This, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.OpenError!JSC.Node.FileDescriptor) void { + pub fn onOpen(this: *This, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.OpenError!bun.FileDescriptor) void { this.opened_fd = result catch { this.errno = AsyncIO.asError(-completion.result); @@ -1981,7 +1985,7 @@ pub const Blob = struct { read_frame: @Frame(ReadFile.doRead) = undefined, close_frame: @Frame(ReadFile.doClose) = undefined, open_completion: HTTPClient.NetworkThread.Completion = undefined, - opened_fd: JSC.Node.FileDescriptor = null_fd, + opened_fd: bun.FileDescriptor = null_fd, read_completion: HTTPClient.NetworkThread.Completion = undefined, read_len: SizeType = 0, read_off: SizeType = 0, @@ -1995,8 +1999,6 @@ pub const Blob = struct { onCompleteCtx: *anyopaque = undefined, onCompleteCallback: OnReadFileCallback = undefined, - convert_to_byte_blob: bool = false, - pub const Read = struct { buf: []u8, is_temporary: bool = false, @@ -2103,44 +2105,22 @@ pub const Blob = struct { } }); return; } - var store = this.store.?; - if (this.convert_to_byte_blob and this.file_store.pathlike == .path) { - VirtualMachine.vm.removeFileBlob(this.file_store.pathlike); - } + var store = this.store.?; + var buf = this.buffer; + defer store.deref(); + defer bun.default_allocator.destroy(this); if (this.system_error) |err| { - bun.default_allocator.destroy(this); - store.deref(); cb(cb_ctx, ResultType{ .err = err }); return; } - var buf = this.buffer; - const is_temporary = !this.convert_to_byte_blob; - if (this.convert_to_byte_blob) { - if (store.data == .bytes) { - bun.default_allocator.free(this.buffer); - buf = store.data.bytes.slice(); - } else if (store.data == .file) { - if (this.file_store.pathlike == .path) { - if (this.file_store.pathlike.path == .string) { - bun.default_allocator.free(this.file_store.pathlike.path.slice()); - } - } - store.data = .{ .bytes = ByteStore.init(buf, bun.default_allocator) }; - } - } - - bun.default_allocator.destroy(this); - // Attempt to free it as soon as possible if (store.ref_count > 1) { - store.deref(); - cb(cb_ctx, .{ .result = .{ .buf = buf, .is_temporary = 
is_temporary } }); + cb(cb_ctx, .{ .result = .{ .buf = buf, .is_temporary = true } }); } else { - cb(cb_ctx, .{ .result = .{ .buf = buf, .is_temporary = is_temporary } }); - store.deref(); + cb(cb_ctx, .{ .result = .{ .buf = buf, .is_temporary = true } }); } } pub fn run(this: *ReadFile, task: *ReadFileTask) void { @@ -2238,7 +2218,6 @@ pub const Blob = struct { return; }; this.buffer = bytes; - this.convert_to_byte_blob = std.os.S.ISREG(stat.mode) and this.file_store.pathlike == .path; var remain = bytes; while (remain.len > 0) { @@ -2268,7 +2247,7 @@ pub const Blob = struct { file_blob: Blob, bytes_blob: Blob, - opened_fd: JSC.Node.FileDescriptor = null_fd, + opened_fd: bun.FileDescriptor = null_fd, open_frame: OpenFrameType = undefined, write_frame: @Frame(WriteFile.doWrite) = undefined, close_frame: @Frame(WriteFile.doClose) = undefined, @@ -2477,8 +2456,8 @@ pub const Blob = struct { offset: SizeType = 0, size: SizeType = 0, max_length: SizeType = Blob.max_size, - destination_fd: JSC.Node.FileDescriptor = null_fd, - source_fd: JSC.Node.FileDescriptor = null_fd, + destination_fd: bun.FileDescriptor = null_fd, + source_fd: bun.FileDescriptor = null_fd, system_error: ?SystemError = null, @@ -3458,7 +3437,7 @@ pub const Blob = struct { } } - pub fn NewReadFileHandler(comptime Function: anytype, comptime lifetime: Lifetime) type { + pub fn NewReadFileHandler(comptime Function: anytype) type { return struct { context: Blob, promise: JSPromise.Strong = .{}, @@ -3472,14 +3451,9 @@ pub const Blob = struct { switch (bytes_) { .result => |result| { const bytes = result.buf; - const is_temporary = result.is_temporary; if (blob.size > 0) blob.size = @minimum(@truncate(u32, bytes.len), blob.size); - if (!is_temporary) { - promise.resolve(globalThis, Function(&blob, globalThis, bytes, comptime lifetime)); - } else { - promise.resolve(globalThis, Function(&blob, globalThis, bytes, .temporary)); - } + promise.resolve(globalThis, Function(&blob, globalThis, bytes, .transfer)); }, .err => |err| { promise.reject(globalThis, err.toErrorInstance(globalThis)); @@ -3530,8 +3504,8 @@ pub const Blob = struct { read_file_task.schedule(); } - pub fn doReadFile(this: *Blob, comptime Function: anytype, comptime lifetime: Lifetime, global: *JSGlobalObject) JSValue { - const Handler = NewReadFileHandler(Function, lifetime); + pub fn doReadFile(this: *Blob, comptime Function: anytype, global: *JSGlobalObject) JSValue { + const Handler = NewReadFileHandler(Function); var promise = JSPromise.create(global); var handler = Handler{ @@ -3618,7 +3592,7 @@ pub const Blob = struct { pub fn toString(this: *Blob, global: *JSGlobalObject, comptime lifetime: Lifetime) JSValue { if (this.needsToReadFile()) { - return this.doReadFile(toStringWithBytes, lifetime, global); + return this.doReadFile(toStringWithBytes, global); } const view_: []u8 = @@ -3632,7 +3606,7 @@ pub const Blob = struct { pub fn toJSON(this: *Blob, global: *JSGlobalObject, comptime lifetime: Lifetime) JSValue { if (this.needsToReadFile()) { - return this.doReadFile(toJSONWithBytes, lifetime, global); + return this.doReadFile(toJSONWithBytes, global); } var view_ = this.sharedView(); @@ -3704,7 +3678,7 @@ pub const Blob = struct { pub fn toArrayBuffer(this: *Blob, global: *JSGlobalObject, comptime lifetime: Lifetime) JSValue { if (this.needsToReadFile()) { - return this.doReadFile(toArrayBufferWithBytes, lifetime, global); + return this.doReadFile(toArrayBufferWithBytes, global); } var view_ = this.sharedView(); diff --git a/src/bun.js/webcore/streams.zig 
b/src/bun.js/webcore/streams.zig index 7a20b8562..54819a4e5 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -75,16 +75,18 @@ pub const ReadableStream = struct { return AnyBlob{ .Blob = blob }; }, .File => |blobby| { - var blob = JSC.WebCore.Blob.initWithStore(blobby.store, globalThis); - blobby.store.ref(); + if (blobby.lazy_readable == .blob) { + var blob = JSC.WebCore.Blob.initWithStore(blobby.lazy_readable.blob, globalThis); + blob.store.?.ref(); - // it should be lazy, file shouldn't have opened yet. - std.debug.assert(!blobby.started); + // it should be lazy, file shouldn't have opened yet. + std.debug.assert(!blobby.started); - stream.detach(globalThis); - blobby.deinit(); - stream.done(); - return AnyBlob{ .Blob = blob }; + stream.detach(globalThis); + blobby.deinit(); + stream.done(); + return AnyBlob{ .Blob = blob }; + } }, .Bytes => |bytes| { @@ -100,8 +102,10 @@ pub const ReadableStream = struct { return null; }, - else => return null, + else => {}, } + + return null; } pub fn done(this: *const ReadableStream) void { @@ -247,12 +251,7 @@ pub const ReadableStream = struct { } pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue { - return fromBlobWithPoll(globalThis, blob, recommended_chunk_size, null); - } - - pub fn fromBlobWithPoll(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) JSC.JSValue { - if (comptime JSC.is_bindgen) - unreachable; + JSC.markBinding(@src()); var store = blob.store orelse { return ReadableStream.empty(globalThis); }; @@ -260,6 +259,7 @@ pub const ReadableStream = struct { .bytes => { var reader = bun.default_allocator.create(ByteBlobLoader.Source) catch unreachable; reader.* = .{ + .globalThis = globalThis, .context = undefined, }; reader.context.setup(blob, recommended_chunk_size); @@ -268,17 +268,51 @@ pub const ReadableStream = struct { .file => { var reader = bun.default_allocator.create(FileReader.Source) catch unreachable; reader.* = .{ - .context = undefined, + .globalThis = globalThis, + .context = .{ + .lazy_readable = .{ + .blob = store, + }, + }, }; - reader.context.setupWithPoll(store, recommended_chunk_size, poll); + store.ref(); return reader.toJS(globalThis); }, } } + pub fn fromFIFO( + globalThis: *JSGlobalObject, + fifo: *FIFO, + buffered_data: bun.ByteList, + ) JSC.JSValue { + JSC.markBinding(@src()); + var reader = bun.default_allocator.create(FileReader.Source) catch unreachable; + reader.* = .{ + .globalThis = globalThis, + .context = .{ + .buffered_data = buffered_data, + .lazy_readable = .{ + .readable = .{ + .FIFO = fifo.*, + }, + }, + }, + }; + + if (reader.context.lazy_readable.readable.FIFO.poll_ref) |poll| { + poll.owner.set(&reader.context.lazy_readable.readable.FIFO); + fifo.poll_ref = null; + } + reader.context.lazy_readable.readable.FIFO.pending.future = undefined; + reader.context.lazy_readable.readable.FIFO.auto_sizer = null; + reader.context.lazy_readable.readable.FIFO.pending.state = .none; + + return reader.toJS(globalThis); + } + pub fn empty(globalThis: *JSGlobalObject) JSC.JSValue { - if (comptime JSC.is_bindgen) - unreachable; + JSC.markBinding(@src()); return ReadableStream__empty(globalThis); } @@ -288,7 +322,7 @@ pub const ReadableStream = struct { invalid = 0, _, - pub fn init(filedes: JSC.Node.FileDescriptor) StreamTag { + pub fn init(filedes: bun.FileDescriptor) StreamTag { var bytes = [8]u8{ 1, 0, 0, 0, 0, 0, 0, 0 }; const filedes_ = @bitCast([8]u8, 
@as(usize, @truncate(u56, @intCast(usize, filedes)))); bytes[1..8].* = filedes_[0..7].*; @@ -296,14 +330,14 @@ pub const ReadableStream = struct { return @intToEnum(StreamTag, @bitCast(u64, bytes)); } - pub fn fd(this: StreamTag) JSC.Node.FileDescriptor { + pub fn fd(this: StreamTag) bun.FileDescriptor { var bytes = @bitCast([8]u8, @enumToInt(this)); if (bytes[0] != 1) { - return JSC.Node.invalid_fd; + return bun.invalid_fd; } var out: u64 = 0; @bitCast([8]u8, out)[0..7].* = bytes[1..8].*; - return @intCast(JSC.Node.FileDescriptor, out); + return @intCast(bun.FileDescriptor, out); } }; }; @@ -439,7 +473,7 @@ pub const StreamStart = union(Tag) { return .{ .FileSink = .{ - .input_path = .{ .fd = JSC.Node.invalid_fd }, + .input_path = .{ .fd = bun.invalid_fd }, .chunk_size = chunk_size, }, }; @@ -523,39 +557,72 @@ pub const StreamResult = union(Tag) { into_array_and_done: Blob.SizeType, pub const Pending = struct { - frame: anyframe, + future: Future = undefined, result: Writable, consumed: Blob.SizeType = 0, state: StreamResult.Pending.State = .none, - pub fn run(this: *Writable.Pending) void { - if (this.state != .pending) { - return; + pub const Future = union(enum) { + promise: struct { + promise: *JSPromise, + globalThis: *JSC.JSGlobalObject, + }, + handler: Handler, + }; + + pub fn promise(this: *Writable.Pending, globalThis: *JSC.JSGlobalObject) *JSPromise { + var prom = JSPromise.create(globalThis); + this.future = .{ + .promise = .{ .promise = prom, .globalThis = globalThis }, + }; + this.state = .pending; + return prom; + } + + pub const Handler = struct { + ctx: *anyopaque, + handler: Fn, + + pub const Fn = fn (ctx: *anyopaque, result: StreamResult.Writable) void; + + pub fn init(this: *Handler, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult.Writable) void) void { + this.ctx = ctx; + this.handler = struct { + const handler = handler_fn; + pub fn onHandle(ctx_: *anyopaque, result: StreamResult.Writable) void { + @call(.{ .modifier = .always_inline }, handler, .{ bun.cast(*Context, ctx_), result }); + } + }.onHandle; } + }; + pub fn run(this: *Writable.Pending) void { + if (this.state != .pending) return; this.state = .used; - resume this.frame; + switch (this.future) { + .promise => |p| { + Writable.fulfillPromise(this.result, p.promise, p.globalThis); + }, + .handler => |h| { + h.handler(h.ctx, this.result); + }, + } } }; - pub fn toPromised(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Writable.Pending) void { - var frame = bun.default_allocator.create(@Frame(Writable.toPromisedWrap)) catch unreachable; - pending.state = .pending; - frame.* = async Writable.toPromisedWrap(globalThis, promise, pending); - pending.frame = frame; - } - pub fn isDone(this: *const Writable) bool { return switch (this.*) { .owned_and_done, .temporary_and_done, .into_array_and_done, .done, .err => true, else => false, }; } - fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Writable.Pending) void { - suspend {} - - const result: Writable = pending.result; + pub fn fulfillPromise( + result: Writable, + promise: *JSPromise, + globalThis: *JSGlobalObject, + ) void { + promise.asValue(globalThis).unprotect(); switch (result) { .err => |err| { promise.reject(globalThis, err.toJSC(globalThis)); @@ -585,9 +652,9 @@ pub const StreamResult = union(Tag) { .done => JSC.JSValue.jsBoolean(true), .pending => |pending| brk: { - var promise = JSC.JSPromise.create(globalThis); - Writable.toPromised(globalThis, promise, pending); - break 
:brk promise.asValue(globalThis); + const promise_value = pending.promise(globalThis).asValue(globalThis); + promise_value.protect(); + break :brk promise_value; }, }; } @@ -599,10 +666,59 @@ pub const StreamResult = union(Tag) { }; pub const Pending = struct { - frame: anyframe, - result: StreamResult, + future: Future = undefined, + result: StreamResult = .{ .done = {} }, state: State = .none, + pub fn set(this: *Pending, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult) void) void { + this.future.init(Context, ctx, handler_fn); + this.state = .pending; + } + + pub fn promise(this: *Pending, globalObject: *JSC.JSGlobalObject) *JSC.JSPromise { + var prom = JSC.JSPromise.create(globalObject); + this.future = .{ + .promise = .{ + .promise = prom, + .globalThis = globalObject, + }, + }; + this.state = .pending; + return prom; + } + + pub const Future = union(enum) { + promise: struct { + promise: *JSPromise, + globalThis: *JSC.JSGlobalObject, + }, + handler: Handler, + + pub fn init(this: *Future, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult) void) void { + this.* = .{ + .handler = undefined, + }; + this.handler.init(Context, ctx, handler_fn); + } + }; + + pub const Handler = struct { + ctx: *anyopaque, + handler: Fn, + + pub const Fn = fn (ctx: *anyopaque, result: StreamResult) void; + + pub fn init(this: *Handler, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult) void) void { + this.ctx = ctx; + this.handler = struct { + const handler = handler_fn; + pub fn onHandle(ctx_: *anyopaque, result: StreamResult) void { + @call(.{ .modifier = .always_inline }, handler, .{ bun.cast(*Context, ctx_), result }); + } + }.onHandle; + } + }; + pub const State = enum { none, pending, @@ -612,7 +728,14 @@ pub const StreamResult = union(Tag) { pub fn run(this: *Pending) void { if (this.state != .pending) return; this.state = .used; - resume this.frame; + switch (this.future) { + .promise => |p| { + StreamResult.fulfillPromise(this.result, p.promise, p.globalThis); + }, + .handler => |h| { + h.handler(h.ctx, this.result); + }, + } } }; @@ -623,11 +746,8 @@ pub const StreamResult = union(Tag) { }; } - fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void { - suspend {} - - const result: StreamResult = pending.result; - + pub fn fulfillPromise(result: StreamResult, promise: *JSC.JSPromise, globalThis: *JSC.JSGlobalObject) void { + promise.asValue(globalThis).unprotect(); switch (result) { .err => |err| { promise.reject(globalThis, err.toJSC(globalThis)); @@ -641,13 +761,6 @@ pub const StreamResult = union(Tag) { } } - pub fn toPromised(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void { - var frame = bun.default_allocator.create(@Frame(toPromisedWrap)) catch unreachable; - pending.state = .pending; - frame.* = async toPromisedWrap(globalThis, promise, pending); - pending.frame = frame; - } - pub fn toJS(this: *const StreamResult, globalThis: *JSGlobalObject) JSValue { switch (this.*) { .owned => |list| { @@ -675,9 +788,9 @@ pub const StreamResult = union(Tag) { return JSC.JSValue.jsNumberFromInt64(array.len); }, .pending => |pending| { - var promise = JSC.JSPromise.create(globalThis); - toPromised(globalThis, promise, pending); - return promise.asValue(globalThis); + const promise = pending.promise(globalThis).asValue(globalThis); + promise.protect(); + return promise; }, .err => |err| { @@ -1001,15 +1114,6 @@ pub const Sink = struct 
{ } }; -pub const PathOrFileDescriptor = union(enum) { - path: ZigString.Slice, - fd: JSC.Node.FileDescriptor, - - pub fn deinit(this: *const PathOrFileDescriptor) void { - if (this.* == .path) this.path.deinit(); - } -}; - pub const FileSink = struct { buffer: bun.ByteList, allocator: std.mem.Allocator, @@ -1018,11 +1122,10 @@ pub const FileSink = struct { next: ?Sink = null, auto_close: bool = false, auto_truncate: bool = false, - fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, + fd: bun.FileDescriptor = bun.invalid_fd, mode: JSC.Node.Mode = 0, chunk_size: usize = 0, pending: StreamResult.Writable.Pending = StreamResult.Writable.Pending{ - .frame = undefined, .result = .{ .done = {} }, }, @@ -1032,13 +1135,16 @@ pub const FileSink = struct { requested_end: bool = false, has_adjusted_pipe_size_on_linux: bool = false, max_write_size: usize = std.math.maxInt(usize), - prevent_process_exit: bool = false, reachable_from_js: bool = true, poll_ref: ?*JSC.FilePoll = null, pub usingnamespace NewReadyWatcher(@This(), .writable, ready); const log = Output.scoped(.FileSink, false); + pub fn isReachable(this: *const FileSink) bool { + return this.reachable_from_js or this.signal.isDead(); + } + const max_fifo_size = 64 * 1024; pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) { var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; @@ -1079,9 +1185,9 @@ pub const FileSink = struct { } pub fn start(this: *FileSink, stream_start: StreamStart) JSC.Node.Maybe(void) { - if (this.fd != JSC.Node.invalid_fd) { + if (this.fd != bun.invalid_fd) { _ = JSC.Node.Syscall.close(this.fd); - this.fd = JSC.Node.invalid_fd; + this.fd = bun.invalid_fd; } this.done = false; @@ -1116,7 +1222,7 @@ pub const FileSink = struct { return flushMaybePoll(this); } - fn adjustPipeLengthOnLinux(this: *FileSink, fd: JSC.Node.FileDescriptor, remain_len: usize) void { + fn adjustPipeLengthOnLinux(this: *FileSink, fd: bun.FileDescriptor, remain_len: usize) void { // On Linux, we can adjust the pipe size to avoid blocking. 
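A standalone sketch of the primitive behind Syscall.setPipeCapacityOnLinux (the helper whose signature changes earlier in this diff): on Linux a pipe's buffer is resized with the fcntl F_SETPIPE_SZ command, and the kernel caps the request at /proc/sys/fs/pipe-max-size. This is an illustration under those assumptions, not Bun's actual helper; the constant is written out by hand and error handling is simplified.

const std = @import("std");

// F_SETPIPE_SZ = F_LINUX_SPECIFIC_BASE (1024) + 7
const F_SETPIPE_SZ: i32 = 1031;

// Ask the kernel to grow fd's pipe buffer; returns the capacity actually
// granted, which the kernel rounds up to a multiple of the page size.
fn setPipeCapacity(fd: std.os.fd_t, capacity: usize) !usize {
    return std.os.fcntl(fd, F_SETPIPE_SZ, capacity);
}

The FIFO.adjustCapacityOnLinux hunk later in this file applies the same idea through Syscall.setPipeCapacityOnLinux, clamping the request to Syscall.getMaxPipeSizeOnLinux().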
this.has_adjusted_pipe_size_on_linux = true; @@ -1135,7 +1241,7 @@ pub const FileSink = struct { } pub fn flushMaybePollWithSize(this: *FileSink, writable_size: usize) StreamResult.Writable { - std.debug.assert(this.fd != JSC.Node.invalid_fd); + std.debug.assert(this.fd != bun.invalid_fd); var total: usize = this.written; const initial = total; @@ -1180,6 +1286,12 @@ pub const FileSink = struct { break :brk writable_size; if (this.poll_ref) |poll| { + if (poll.isHUP()) { + this.done = true; + this.cleanup(); + return .{ .done = {} }; + } + if (poll.isWritable()) { break :brk this.max_write_size; } @@ -1288,11 +1400,11 @@ pub const FileSink = struct { } if (this.auto_truncate) - std.os.ftruncate(this.fd, total) catch {}; + std.os.ftruncate(fd, total) catch {}; if (this.auto_close) { - _ = JSC.Node.Syscall.close(this.fd); - this.fd = JSC.Node.invalid_fd; + _ = JSC.Node.Syscall.close(fd); + this.fd = bun.invalid_fd; } } this.pending.run(); @@ -1300,7 +1412,7 @@ pub const FileSink = struct { } pub fn flushFromJS(this: *FileSink, globalThis: *JSGlobalObject, _: bool) JSC.Node.Maybe(JSValue) { - if (this.isPending()) { + if (this.isPending() or this.done) { return .{ .result = JSC.JSValue.jsUndefined() }; } const result = this.flush(); @@ -1320,13 +1432,15 @@ pub const FileSink = struct { poll.deinit(); } - if (this.fd != JSC.Node.invalid_fd) { - if (this.scheduled_count > 0) { - this.scheduled_count = 0; - } + if (this.auto_close) { + if (this.fd != bun.invalid_fd) { + if (this.scheduled_count > 0) { + this.scheduled_count = 0; + } - _ = JSC.Node.Syscall.close(this.fd); - this.fd = JSC.Node.invalid_fd; + _ = JSC.Node.Syscall.close(this.fd); + this.fd = bun.invalid_fd; + } } if (this.buffer.cap > 0) { @@ -1345,7 +1459,16 @@ pub const FileSink = struct { this.cleanup(); this.reachable_from_js = false; - if (!this.prevent_process_exit) + if (!this.isReachable()) + this.allocator.destroy(this); + } + + pub fn onHangup(this: *FileSink) void { + this.done = true; + this.signal.clear(); + this.cleanup(); + + if (!this.isReachable()) this.allocator.destroy(this); } @@ -1393,6 +1516,9 @@ pub const FileSink = struct { } pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable { + if (this.done) { + return .{ .done = {} }; + } const input = data.slice(); if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size) { @@ -1422,6 +1548,10 @@ pub const FileSink = struct { } pub const writeBytes = write; pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable { + if (this.done) { + return .{ .done = {} }; + } + const input = data.slice(); if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size and strings.isAllASCII(input)) { @@ -1450,6 +1580,10 @@ pub const FileSink = struct { return .{ .owned = len }; } pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable { + if (this.done) { + return .{ .done = {} }; + } + if (this.next) |*next| { return next.writeUTF16(data); } @@ -1465,11 +1599,38 @@ pub const FileSink = struct { } fn isPending(this: *const FileSink) bool { + if (this.done) return false; var poll_ref = this.poll_ref orelse return false; return poll_ref.isRegistered() and !poll_ref.flags.contains(.needs_rearm); } + pub fn close(this: *FileSink) void { + if (this.done) + return; + + this.done = true; + const fd = this.fd; + if (fd != bun.invalid_fd) { + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } + + this.fd = bun.invalid_fd; + if (this.auto_close) + _ = 
JSC.Node.Syscall.close(fd); + this.signal.close(null); + } + + this.pending.result = .done; + this.pending.run(); + } + pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { + if (this.done) { + return .{ .result = {} }; + } + if (this.next) |*next| { return next.end(err); } @@ -1501,6 +1662,11 @@ pub const FileSink = struct { std.debug.assert(this.next == null); this.requested_end = true; + if (this.fd == bun.invalid_fd) { + this.cleanup(); + return .{ .result = JSValue.jsNumber(this.written) }; + } + const flushed = this.flush(); if (flushed == .err) { @@ -2737,6 +2903,7 @@ pub fn ReadableStreamSource( pub const name = std.fmt.comptimePrint("{s}_JSReadableStreamSource", .{std.mem.span(name_)}); pub fn pull(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + JSC.markBinding(@src()); const arguments = callFrame.arguments(3); var this = arguments.ptr[0].asPtr(ReadableStreamSourceType); const view = arguments.ptr[1]; @@ -2750,6 +2917,7 @@ pub fn ReadableStreamSource( ); } pub fn start(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + JSC.markBinding(@src()); var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); this.globalThis = globalThis; switch (this.startFromJS()) { @@ -2778,11 +2946,13 @@ pub fn ReadableStreamSource( } } pub fn cancel(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + JSC.markBinding(@src()); var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); this.cancel(); return JSC.JSValue.jsUndefined(); } pub fn setClose(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + JSC.markBinding(@src()); var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); this.close_ctx = this; this.close_handler = JSReadableStreamSource.onClose; @@ -2792,6 +2962,7 @@ pub fn ReadableStreamSource( } pub fn updateRef(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + JSC.markBinding(@src()); var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); const ref_or_unref = callFrame.argument(1).asBoolean(); this.setRef(ref_or_unref); @@ -2799,18 +2970,21 @@ pub fn ReadableStreamSource( } fn onClose(ptr: *anyopaque) void { + JSC.markBinding(@src()); var this = bun.cast(*ReadableStreamSourceType, ptr); _ = this.close_jsvalue.call(this.globalThis, &.{}); // this.closer } pub fn deinit(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + JSC.markBinding(@src()); var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); this.deinit(); return JSValue.jsUndefined(); } pub fn drain(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + JSC.markBinding(@src()); var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); var list = this.drain(); if (list.len > 0) { @@ -2844,15 +3018,8 @@ pub fn ReadableStreamSource( }); comptime { - if (!JSC.is_bindgen) { + if (!JSC.is_bindgen) @export(load, .{ .name = Export[0].symbol_name }); - _ = JSReadableStreamSource.pull; - _ = JSReadableStreamSource.start; - _ = JSReadableStreamSource.cancel; - _ = JSReadableStreamSource.setClose; - _ = JSReadableStreamSource.load; - _ = JSReadableStreamSource.deinit; - } } }; }; @@ -2955,6 +3122,15 @@ pub const ByteBlobLoader = struct { pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; +pub const PathOrFileDescriptor = union(enum) { + path: ZigString.Slice, + fd: bun.FileDescriptor, + + pub fn deinit(this: *const 
PathOrFileDescriptor) void { + if (this.* == .path) this.path.deinit(); + } +}; + pub const Pipe = struct { ctx: ?*anyopaque = null, onPipe: ?PipeFunction = null, @@ -2983,7 +3159,6 @@ pub const ByteStream = struct { }, has_received_last_chunk: bool = false, pending: StreamResult.Pending = StreamResult.Pending{ - .frame = undefined, .result = .{ .done = {} }, }, done: bool = false, @@ -3242,109 +3417,534 @@ pub const ByteStream = struct { ); }; -/// **Not** the Web "FileReader" API -pub const FileReader = struct { +pub const ReadResult = union(enum) { + pending: void, + err: Syscall.Error, + done: void, + read: []u8, + + pub fn toStream(this: ReadResult, pending: *StreamResult.Pending, buf: []u8, view: JSValue, close_on_empty: bool) StreamResult { + return toStreamWithIsDone( + this, + pending, + buf, + view, + close_on_empty, + false, + ); + } + pub fn toStreamWithIsDone(this: ReadResult, pending: *StreamResult.Pending, buf: []u8, view: JSValue, close_on_empty: bool, is_done: bool) StreamResult { + return switch (this) { + .pending => .{ .pending = pending }, + .err => .{ .err = this.err }, + .done => .{ .done = {} }, + .read => |slice| brk: { + const owned = slice.ptr != buf.ptr; + const done = is_done or (close_on_empty and slice.len == 0); + + break :brk if (owned and done) + StreamResult{ .owned_and_done = bun.ByteList.init(slice) } + else if (owned) + StreamResult{ .owned = bun.ByteList.init(slice) } + else if (done) + StreamResult{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, slice.len), .value = view } } + else + StreamResult{ .into_array = .{ .len = @truncate(Blob.SizeType, slice.len), .value = view } }; + }, + }; + } +}; + +pub const AutoSizer = struct { + buffer: *bun.ByteList, + allocator: std.mem.Allocator, + max: usize, + + pub fn resize(this: *AutoSizer, size: usize) ![]u8 { + const available = this.buffer.cap - this.buffer.len; + if (available >= size) return this.buffer.ptr[this.buffer.len..this.buffer.cap][0..size]; + const to_grow = size -| available; + if (to_grow + @as(usize, this.buffer.cap) > this.max) + return this.buffer.ptr[this.buffer.len..this.buffer.cap]; + + var list = this.buffer.listManaged(this.allocator); + const prev_len = list.items.len; + try list.ensureTotalCapacity(to_grow + @as(usize, this.buffer.cap)); + this.buffer.update(list); + return this.buffer.ptr[prev_len..@as(usize, this.buffer.cap)]; + } +}; + +pub const FIFO = struct { buf: []u8 = &[_]u8{}, view: JSC.Strong = .{}, - fd: JSC.Node.FileDescriptor = 0, - auto_close: bool = false, - loop: *JSC.EventLoop = undefined, - mode: JSC.Node.Mode = 0, - store: *Blob.Store, - total_read: Blob.SizeType = 0, - finalized: bool = false, - callback: anyframe = undefined, - buffered_data: bun.ByteList = .{}, - buffered_data_max: u32 = 0, + poll_ref: ?*JSC.FilePoll = null, + fd: bun.FileDescriptor = 0, + to_read: ?u32 = null, + close_on_empty_read: bool = false, + auto_sizer: ?*AutoSizer = null, pending: StreamResult.Pending = StreamResult.Pending{ - .frame = undefined, + .future = undefined, .state = .none, .result = .{ .done = {} }, }, - cancelled: bool = false, + signal: JSC.WebCore.Signal = .{}, + is_first_read: bool = true, + auto_close: bool = true, + + pub usingnamespace NewReadyWatcher(@This(), .readable, ready); + + pub fn finish(this: *FIFO) void { + this.close_on_empty_read = true; + if (this.poll_ref) |poll| { + poll.flags.insert(.hup); + } + + this.pending.result = .{ .done = {} }; + this.pending.run(); + } + + pub fn close(this: *FIFO) void { + if (this.poll_ref) |poll| { + 
this.poll_ref = null; + poll.deinit(); + } + + const fd = this.fd; + if (fd != bun.invalid_fd) { + this.fd = bun.invalid_fd; + if (this.auto_close) + _ = JSC.Node.Syscall.close(fd); + this.signal.close(null); + } + + this.to_read = null; + this.pending.result = .{ .done = {} }; + + this.pending.run(); + } + + pub fn isClosed(this: *FIFO) bool { + return this.fd == bun.invalid_fd; + } + + pub fn getAvailableToReadOnLinux(this: *FIFO) u32 { + var len: c_int = 0; + const rc: c_int = std.c.ioctl(this.fd, std.os.linux.T.FIONREAD, &len); + if (rc != 0) { + len = 0; + } + + if (len > 0) { + if (this.poll_ref) |poll| { + poll.flags.insert(.readable); + } + } else { + if (this.poll_ref) |poll| { + poll.flags.remove(.readable); + } + + return @as(u32, 0); + } + + return @intCast(u32, @maximum(len, 0)); + } + + pub fn adjustCapacityOnLinux(this: *FIFO, current: u32, max: u32) u32 { + // we do not un-mark it as readable if there's nothing in the pipe + if (!this.has_adjusted_pipe_size_on_linux) { + if (current > 0 and max >= std.mem.page_size * 16) { + this.has_adjusted_pipe_size_on_linux = true; + _ = Syscall.setPipeCapacityOnLinux(this.fd, @minimum(max * 4, Syscall.getMaxPipeSizeOnLinux())); + } + } + } + + pub fn cannotRead(this: *FIFO, available: u32) ?ReadResult { + if (comptime Environment.isLinux) { + if (available > 0 and available != std.math.maxInt(u32)) { + return null; + } + } + + if (this.poll_ref) |poll| { + if (comptime Environment.isMac) { + if (available > 0 and available != std.math.maxInt(u32)) { + poll.flags.insert(.readable); + } + } + + const is_readable = poll.isReadable(); + if (!is_readable and (this.close_on_empty_read or poll.isHUP())) { + // it might be readable actually + this.close_on_empty_read = true; + if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) { + return null; + } + + return .done; + } else if (!is_readable and poll.isWatching()) { + // this happens if we've registered a watcher but we haven't + // ticked the event loop since registering it + if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) { + return null; + } + + return .pending; + } + } + + if (comptime Environment.isLinux) { + if (available == 0) { + std.debug.assert(this.poll_ref == null); + return .pending; + } + } else if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) { + // we don't know if it's readable or not + if (!bun.isReadable(this.fd)) { + // we hung up + if (this.close_on_empty_read) + return .done; + + return .pending; + } + } + + return null; + } + + pub fn getAvailableToRead(this: *FIFO, size_or_offset: i64) ?u32 { + if (comptime Environment.isLinux) { + return this.getAvailableToReadOnLinux(); + } + + if (size_or_offset != std.math.maxInt(@TypeOf(size_or_offset))) + this.to_read = @intCast(u32, @maximum(size_or_offset, 0)); + + return this.to_read; + } + + pub fn ready(this: *FIFO, sizeOrOffset: i64) void { + const available_to_read = this.getAvailableToRead(sizeOrOffset); + if (this.isClosed()) { + this.unwatch(this.poll_ref.?.fd); + return; + } + + const read_result = this.read(this.buf, available_to_read); + if (read_result == .read and read_result.read.len == 0) { + this.unwatch(this.poll_ref.?.fd); + this.close(); + return; + } + + if (read_result == .read) { + if (this.to_read) |*to_read| { + to_read.* = to_read.* -| @truncate(u32, read_result.read.len); + } + } + + this.pending.result = read_result.toStream( + &this.pending, + this.buf, + this.view.get() orelse .zero, + this.close_on_empty_read, + ); + this.pending.run(); + } + + pub fn readFromJS( + 
this: *FIFO, + buf_: []u8, + view: JSValue, + globalThis: *JSC.JSGlobalObject, + ) StreamResult { + if (this.isClosed()) { + return .{ .done = {} }; + } + + if (!this.isWatching()) { + this.watch(this.fd); + } + + const read_result = this.read(buf_, this.to_read); + if (read_result == .read and read_result.read.len == 0) { + this.close(); + return .{ .done = {} }; + } + + if (read_result == .read) { + if (this.to_read) |*to_read| { + to_read.* = to_read.* -| @truncate(u32, read_result.read.len); + } + } + + if (read_result == .pending) { + this.buf = buf_; + this.view.set(globalThis, view); + if (!this.isWatching()) this.watch(this.fd); + std.debug.assert(this.isWatching()); + return .{ .pending = &this.pending }; + } + + return read_result.toStream(&this.pending, buf_, view, this.close_on_empty_read); + } + + pub fn read( + this: *FIFO, + buf_: []u8, + /// provided via kqueue(), only on macOS + kqueue_read_amt: ?u32, + ) ReadResult { + const available_to_read = this.getAvailableToRead( + if (kqueue_read_amt != null) + @intCast(i64, kqueue_read_amt.?) + else + std.math.maxInt(i64), + ); + + if (this.cannotRead(available_to_read orelse std.math.maxInt(u32))) |res| { + return switch (res) { + .pending => .{ .pending = {} }, + .done => .{ .done = {} }, + else => unreachable, + }; + } + + var buf = buf_; + + if (available_to_read) |amt| { + if (amt >= buf.len) { + if (comptime Environment.isLinux) { + this.adjustCapacityOnLinux(amt, buf.len); + } + + if (this.auto_sizer) |sizer| { + buf = sizer.resize(amt) catch buf_; + } + } + } + + return this.doRead(buf); + } + + fn doRead( + this: *FIFO, + buf: []u8, + ) ReadResult { + switch (Syscall.read(this.fd, buf)) { + .err => |err| { + const retry = std.os.E.AGAIN; + const errno = brk: { + const _errno = err.getErrno(); + if (comptime Environment.isLinux) { + // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already + // sent a 0 + // Let's retry later. 
+ break :brk .AGAIN; + } + + break :brk _errno; + }; + + switch (errno) { + retry => { + return .{ .pending = {} }; + }, + else => {}, + } + + return .{ .err = err }; + }, + .result => |result| { + if (this.poll_ref) |poll| { + if (comptime Environment.isLinux) { + // do not insert .eof here + if (result < buf.len) + poll.flags.remove(.readable); + } else { + // Since we have no way of querying FIFO capacity + // its only okay to read when kqueue says its readable + // otherwise we might block the process + poll.flags.remove(.readable); + } + } + + if (result == 0) + return .{ .read = buf[0..0] }; + + return .{ .read = buf[0..result] }; + }, + } + } +}; + +pub const File = struct { + buf: []u8 = &[_]u8{}, + view: JSC.Strong = .{}, + + poll_ref: JSC.PollRef = .{}, + fd: bun.FileDescriptor = bun.invalid_fd, + concurrent: Concurrent = .{}, + loop: *JSC.EventLoop, + seekable: bool = false, + auto_close: bool = false, + remaining_bytes: Blob.SizeType = std.math.maxInt(Blob.SizeType), user_chunk_size: Blob.SizeType = 0, + total_read: Blob.SizeType = 0, + mode: JSC.Node.Mode = 0, + pending: StreamResult.Pending = .{}, scheduled_count: u32 = 0, - concurrent: Concurrent = Concurrent{}, - started: bool = false, - stored_global_this_: ?*JSC.JSGlobalObject = null, - poll_ref: ?*JSC.FilePoll = null, - has_adjusted_pipe_size_on_linux: bool = false, - is_fifo: bool = false, - finished: bool = false, - /// When we have some way of knowing that EOF truly is the write end of the - /// pipe being closed - /// Set this to true so we automatically mark it as done - /// This is used in Bun.spawn() to automatically close stdout and stderr - /// when the process exits - close_on_eof: bool = false, + pub fn close(this: *File) void { + if (this.auto_close) { + this.auto_close = false; + const fd = this.fd; + if (fd != bun.invalid_fd) { + this.fd = bun.invalid_fd; + _ = Syscall.close(fd); + } + } - signal: JSC.WebCore.Signal = .{}, + this.poll_ref.disable(); - pub usingnamespace NewReadyWatcher(@This(), .readable, ready); + this.view.clear(); + this.buf.len = 0; - pub inline fn globalThis(this: *FileReader) *JSC.JSGlobalObject { - return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis; + this.pending.result = .{ .done = {} }; + this.pending.run(); } - const run_on_different_thread_size = bun.huge_allocator_threshold; + pub fn deinit(this: *File) void { + this.close(); + } - pub const tag = ReadableStream.Tag.File; + pub fn isClosed(this: *const File) bool { + return this.fd == bun.invalid_fd; + } - pub fn setupWithPoll(this: *FileReader, store: *Blob.Store, chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) void { - store.ref(); - this.* = .{ - .loop = JSC.VirtualMachine.vm.eventLoop(), - .auto_close = store.data.file.pathlike == .path, - .store = store, - .user_chunk_size = chunk_size, - .poll_ref = poll, + fn calculateChunkSize(this: *File, available_to_read: usize) usize { + const chunk_size: usize = if (this.user_chunk_size > 0) + @as(usize, this.user_chunk_size) + else if (this.isSeekable()) + @as(usize, default_file_chunk_size) + else + @as(usize, default_fifo_chunk_size); + + return if (this.remaining_bytes > 0 and this.isSeekable()) + if (available_to_read != std.math.maxInt(usize)) + @minimum(chunk_size, available_to_read) + else + @minimum(this.remaining_bytes -| this.total_read, chunk_size) + else + @minimum(available_to_read, chunk_size); + } + + pub fn start( + this: *File, + file: *Blob.FileStore, + ) StreamStart { + var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; 
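FIFO.doRead above is the heart of the non-blocking read path: EAGAIN (and, per its comment, EPERM on a Linux FIFO) is mapped to "pending, try again when the poll fires", a zero-byte read to "done", and anything else to the bytes read. A minimal standalone sketch of that mapping, using a hypothetical Result type in place of Bun's ReadResult:

const std = @import("std");

const Result = union(enum) {
    pending: void, // nothing available yet; wait for the poller to fire again
    done: void, // read() returned 0: the write end closed
    read: []u8, // the bytes actually read
    err: anyerror,
};

fn readNonblocking(fd: std.os.fd_t, buf: []u8) Result {
    const n = std.os.read(fd, buf) catch |err| {
        if (err == error.WouldBlock) return .{ .pending = {} }; // EAGAIN
        return .{ .err = err };
    };
    if (n == 0) return .{ .done = {} };
    return .{ .read = buf[0..n] };
}

ReadResult.toStream, added earlier in this file, then turns the equivalent of these cases into a StreamResult for the JavaScript side.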
+ var auto_close = file.pathlike == .path; + + var fd = if (!auto_close) + file.pathlike.fd + else switch (Syscall.open(file.pathlike.path.sliceZ(&file_buf), std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC, 0)) { + .result => |_fd| _fd, + .err => |err| { + return .{ .err = err.withPath(file.pathlike.path.slice()) }; + }, }; - if (this.poll_ref) |poll_| { - poll_.owner.set(this); + + if (!auto_close) { + // ensure we have non-blocking IO set + switch (Syscall.fcntl(fd, std.os.F.GETFL, 0)) { + .err => return .{ .err = Syscall.Error.fromCode(std.os.E.BADF, .fcntl) }, + .result => |flags| { + // if we do not, clone the descriptor and set non-blocking + // it is important for us to clone it so we don't cause Weird Things to happen + if ((flags & std.os.O.NONBLOCK) == 0) { + auto_close = true; + fd = switch (Syscall.fcntl(fd, std.os.F.DUPFD, 0)) { + .result => |_fd| @intCast(@TypeOf(fd), _fd), + .err => |err| return .{ .err = err }, + }; + + switch (Syscall.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK)) { + .err => |err| return .{ .err = err }, + .result => |_| {}, + } + } + }, + } } - } - pub fn setup(this: *FileReader, store: *Blob.Store, chunk_size: Blob.SizeType) void { - this.setupWithPoll(store, chunk_size, null); - } + const stat: std.os.Stat = switch (Syscall.fstat(fd)) { + .result => |result| result, + .err => |err| { + if (auto_close) { + _ = Syscall.close(fd); + } + return .{ .err = err }; + }, + }; - pub fn finish(this: *FileReader) void { - if (this.finished) return; - this.finished = true; - this.close_on_eof = true; - - // we are done - // resolve any promises with done - // but there could still be data in the pipe - // so we shouldn't actually end it, just means that we no longer will retry on EGAGAIN - if (this.pending.state == .pending) { - if (this.buffered_data.len > 0) { - this.pending.result = .{ .owned = this.buffered_data }; - this.buffered_data = .{}; - } else { - this.pending.result = .{ .done = {} }; + if (std.os.S.ISDIR(stat.mode)) { + if (auto_close) { + _ = Syscall.close(fd); + } + return .{ .err = Syscall.Error.fromCode(.ISDIR, .fstat) }; + } + + if (std.os.S.ISSOCK(stat.mode)) { + if (auto_close) { + _ = Syscall.close(fd); + } + return .{ .err = Syscall.Error.fromCode(.INVAL, .fstat) }; + } + + file.mode = @intCast(JSC.Node.Mode, stat.mode); + this.mode = file.mode; + + this.seekable = std.os.S.ISREG(stat.mode); + file.seekable = this.seekable; + + if (this.seekable) { + this.remaining_bytes = @intCast(Blob.SizeType, stat.size); + + if (this.remaining_bytes == 0) { + if (auto_close) { + _ = Syscall.close(fd); + } + + return .{ .empty = {} }; } - this.pending.run(); } + + this.fd = fd; + this.auto_close = auto_close; + + return StreamStart{ .ready = {} }; + } + + pub fn isSeekable(this: File) bool { + return this.seekable; } const Concurrent = struct { read: Blob.SizeType = 0, task: NetworkThread.Task = .{ .callback = Concurrent.taskCallback }, completion: AsyncIO.Completion = undefined, - read_frame: anyframe = undefined, chunk_size: Blob.SizeType = 0, main_thread_task: JSC.AnyTask = .{ .callback = onJSThread, .ctx = null }, concurrent_task: JSC.ConcurrentTask = .{}, pub fn taskCallback(task: *NetworkThread.Task) void { - var this = @fieldParentPtr(FileReader, "concurrent", @fieldParentPtr(Concurrent, "task", task)); - var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable; - _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{this}); + var this = @fieldParentPtr(File, "concurrent", @fieldParentPtr(Concurrent, "task", 
task)); + runAsync(this); } - pub fn onRead(this: *FileReader, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void { + pub fn onRead(this: *File, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void { this.concurrent.read = @truncate(Blob.SizeType, result catch |err| { if (@hasField(HTTPClient.NetworkThread.Completion, "result")) { this.pending.result = .{ @@ -3363,14 +3963,14 @@ pub const FileReader = struct { }; } this.concurrent.read = 0; - resume this.concurrent.read_frame; + scheduleMainThreadTask(this); return; }); - resume this.concurrent.read_frame; + scheduleMainThreadTask(this); } - pub fn scheduleRead(this: *FileReader) void { + pub fn scheduleRead(this: *File) void { if (comptime Environment.isMac) { var remaining = this.buf[this.concurrent.read..]; @@ -3408,7 +4008,7 @@ pub const FileReader = struct { } AsyncIO.global.read( - *FileReader, + *File, this, onRead, &this.concurrent.completion, @@ -3417,602 +4017,345 @@ pub const FileReader = struct { null, ); - suspend { - var _frame = @frame(); - var this_frame = bun.default_allocator.create(std.meta.Child(@TypeOf(_frame))) catch unreachable; - this_frame.* = _frame.*; - this.concurrent.read_frame = this_frame; - } - scheduleMainThreadTask(this); } pub fn onJSThread(task_ctx: *anyopaque) void { - var this: *FileReader = bun.cast(*FileReader, task_ctx); + var this: *File = bun.cast(*File, task_ctx); const view = this.view.get().?; defer this.view.clear(); - if (this.finalized and this.scheduled_count > 0) { - this.pending.run(); - this.scheduled_count -= 1; - + if (this.isClosed()) { this.deinit(); return; } - if (this.pending.state == .pending and this.pending.result == .err and this.concurrent.read == 0) { - resume this.pending.frame; - this.scheduled_count -= 1; - this.finalize(); - return; - } - if (this.concurrent.read == 0) { this.pending.result = .{ .done = {} }; - resume this.pending.frame; - this.scheduled_count -= 1; - this.finalize(); - return; + } else if (view != .zero) { + this.pending.result = .{ + .into_array = .{ + .value = view, + .len = @truncate(Blob.SizeType, this.concurrent.read), + }, + }; + } else { + this.pending.result = .{ + .owned = bun.ByteList.init(this.buf), + }; } - this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read), view, false, this.buf); this.pending.run(); - this.scheduled_count -= 1; - if (this.pending.result.isDone()) { - this.finalize(); - } } - pub fn scheduleMainThreadTask(this: *FileReader) void { + pub fn scheduleMainThreadTask(this: *File) void { this.concurrent.main_thread_task.ctx = this; this.loop.enqueueTaskConcurrent(this.concurrent.concurrent_task.from(&this.concurrent.main_thread_task)); } - fn runAsync(this: *FileReader) void { + fn runAsync(this: *File) void { this.concurrent.read = 0; Concurrent.scheduleRead(this); - - suspend { - bun.default_allocator.destroy(@frame()); - } } }; - pub fn scheduleAsync(this: *FileReader, chunk_size: Blob.SizeType) void { + pub fn scheduleAsync( + this: *File, + chunk_size: Blob.SizeType, + globalThis: *JSC.JSGlobalObject, + ) void { this.scheduled_count += 1; - this.pollRef().ref(this.globalThis().bunVM()); - std.debug.assert(this.started); + this.poll_ref.ref(globalThis.bunVM()); NetworkThread.init() catch {}; + this.concurrent.chunk_size = chunk_size; NetworkThread.global.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 }); } - // macOS default pipe size is page_size, 16k, or 64k. 
It changes based on how much was written - // Linux default pipe size is 16 pages of memory - const default_fifo_chunk_size = 64 * 1024; - const default_file_chunk_size = 1024 * 1024 * 2; - - pub fn onStart(this: *FileReader) StreamStart { - var file = &this.store.data.file; - std.debug.assert(!this.started); - this.started = true; - var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; - var auto_close = this.auto_close; - defer this.auto_close = auto_close; - var fd = if (!auto_close) - file.pathlike.fd - else switch (Syscall.open(file.pathlike.path.sliceZ(&file_buf), std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC, 0)) { - .result => |_fd| _fd, - .err => |err| { - this.deinit(); - return .{ .err = err.withPath(file.pathlike.path.slice()) }; - }, - }; - - if (this.poll_ref != null or this.is_fifo) { - file.seekable = false; - this.is_fifo = true; - if (this.poll_ref) |poll| - std.debug.assert(poll.fd == @intCast(@TypeOf(poll.fd), fd)); - } else { - if (!auto_close) { - // ensure we have non-blocking IO set - switch (Syscall.fcntl(fd, std.os.F.GETFL, 0)) { - .err => return .{ .err = Syscall.Error.fromCode(std.os.E.BADF, .fcntl) }, - .result => |flags| { - // if we do not, clone the descriptor and set non-blocking - // it is important for us to clone it so we don't cause Weird Things to happen - if ((flags & std.os.O.NONBLOCK) == 0) { - auto_close = true; - fd = switch (Syscall.fcntl(fd, std.os.F.DUPFD, 0)) { - .result => |_fd| @intCast(@TypeOf(fd), _fd), - .err => |err| return .{ .err = err }, - }; - - switch (Syscall.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK)) { - .err => |err| return .{ .err = err }, - .result => |_| {}, - } - } - }, - } - } - - const stat: std.os.Stat = switch (Syscall.fstat(fd)) { - .result => |result| result, - .err => |err| { - if (auto_close) { - _ = Syscall.close(fd); - } - this.deinit(); - return .{ .err = err.withPath(file.pathlike.path.slice()) }; - }, - }; - - if (std.os.S.ISDIR(stat.mode)) { - const err = Syscall.Error.fromCode(.ISDIR, .fstat); - if (auto_close) { - _ = Syscall.close(fd); - } - this.deinit(); - return .{ .err = err }; - } - - if (std.os.S.ISSOCK(stat.mode)) { - const err = Syscall.Error.fromCode(.INVAL, .fstat); + pub fn read(this: *File, buf: []u8) ReadResult { + if (this.fd == bun.invalid_fd) + return .{ .done = {} }; - if (auto_close) { - _ = Syscall.close(fd); - } - this.deinit(); - return .{ .err = err }; - } + if (this.seekable and this.remaining_bytes == 0) + return .{ .done = {} }; - file.seekable = std.os.S.ISREG(stat.mode); - file.mode = @intCast(JSC.Node.Mode, stat.mode); - this.mode = file.mode; - this.is_fifo = std.os.S.ISFIFO(stat.mode); + return this.doRead(buf); + } - if (file.seekable orelse false) - file.max_size = @intCast(Blob.SizeType, stat.size); + pub fn readFromJS(this: *File, buf: []u8, view: JSValue, globalThis: *JSC.JSGlobalObject) StreamResult { + const read_result = this.read(buf); + if (read_result == .read and read_result.read.len == 0) { + this.close(); + return .{ .done = {} }; } - if ((file.seekable orelse false) and file.max_size == 0) { - if (auto_close) { - _ = Syscall.close(fd); - } - this.deinit(); - return .{ .empty = {} }; + if (read_result == .read) { + this.remaining_bytes -|= @intCast(Blob.SizeType, read_result.read.len); } - this.fd = fd; - this.auto_close = auto_close; - - const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); - this.signal.start(); - return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) }; - } - - fn calculateChunkSize(this: *FileReader, 
available_to_read: usize) usize { - const file = &this.store.data.file; + if (read_result == .pending) { + if (this.scheduled_count == 0) { + this.buf = buf; + this.view.set(globalThis, view); + this.scheduleAsync(@truncate(Blob.SizeType, buf.len), globalThis); + } - const chunk_size: usize = if (this.user_chunk_size > 0) - @as(usize, this.user_chunk_size) - else if (file.seekable orelse false) - @as(usize, default_file_chunk_size) - else - @as(usize, default_fifo_chunk_size); + return .{ .pending = &this.pending }; + } - return if (file.max_size > 0) - if (available_to_read != std.math.maxInt(usize)) @minimum(chunk_size, available_to_read) else @minimum(@maximum(this.total_read, file.max_size) - this.total_read, chunk_size) - else - @minimum(available_to_read, chunk_size); + return read_result.toStream(&this.pending, buf, view, false); } - pub fn onPullInto(this: *FileReader, buffer: []u8, view: JSC.JSValue) StreamResult { - const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); - std.debug.assert(this.started); - - switch (chunk_size) { - 0 => { - std.debug.assert(this.store.data.file.seekable orelse false); - this.finalize(); - return .{ .done = {} }; - }, - run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => { - if (!this.isFIFO()) { - this.view.set(this.globalThis(), view); - // should never be reached - this.pending.result = .{ - .err = Syscall.Error.todo, - }; - this.buf = buffer; - - this.scheduleAsync(@truncate(Blob.SizeType, chunk_size)); + pub fn doRead(this: *File, buf: []u8) ReadResult { + switch (Syscall.read(this.fd, buf)) { + .err => |err| { + const retry = std.os.E.AGAIN; + const errno = err.getErrno(); - return .{ .pending = &this.pending }; + switch (errno) { + retry => { + return .{ .pending = {} }; + }, + else => { + return .{ .err = err }; + }, } }, - else => {}, - } + .result => |result| { + this.remaining_bytes -|= @truncate(@TypeOf(this.remaining_bytes), result); - return this.read(buffer, view, null); - } + if (result == 0) { + return .{ .done = {} }; + } - fn maybeAutoClose(this: *FileReader) void { - if (this.auto_close) { - _ = Syscall.close(this.fd); - this.auto_close = false; + return .{ .read = buf[0..result] }; + }, } } +}; - fn handleReadChunk(this: *FileReader, result: usize, view: JSC.JSValue, owned: bool, buf: []u8) StreamResult { - std.debug.assert(this.started); - - this.total_read += @intCast(Blob.SizeType, result); - const remaining: Blob.SizeType = if (this.store.data.file.seekable orelse false) - this.store.data.file.max_size -| this.total_read - else - @as(Blob.SizeType, std.math.maxInt(Blob.SizeType)); +// macOS default pipe size is page_size, 16k, or 64k. 
It changes based on how much was written +// Linux default pipe size is 16 pages of memory +const default_fifo_chunk_size = 64 * 1024; +const default_file_chunk_size = 1024 * 1024 * 2; - // this handles: - // - empty file - // - stream closed for some reason - // - FIFO returned EOF - if ((result == 0 and (remaining == 0 or this.close_on_eof))) { - this.finalize(); - return .{ .done = {} }; - } +/// **Not** the Web "FileReader" API +pub const FileReader = struct { + buffered_data: bun.ByteList = .{}, - const has_more = remaining > 0; + total_read: Blob.SizeType = 0, + max_read: Blob.SizeType = 0, - if (!has_more) { - if (owned) { - return .{ .owned_and_done = bun.ByteList.init(buf) }; - } - return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result), .value = view } }; - } + cancelled: bool = false, + started: bool = false, + stored_global_this_: ?*JSC.JSGlobalObject = null, + user_chunk_size: Blob.SizeType = 0, + lazy_readable: Readable.Lazy = undefined, - if (owned) { - return .{ .owned = bun.ByteList.init(buf) }; + pub fn setSignal(this: *FileReader, signal: Signal) void { + switch (this.lazy_readable) { + .readable => { + if (this.lazy_readable.readable == .FIFO) + this.lazy_readable.readable.FIFO.signal = signal; + }, + else => {}, } - - return .{ .into_array = .{ .len = @truncate(Blob.SizeType, result), .value = view } }; } - pub fn read( - this: *FileReader, - read_buf: []u8, - view: JSC.JSValue, - /// provided via kqueue(), only on macOS - available_to_read: ?c_int, - ) StreamResult { - std.debug.assert(this.started); - std.debug.assert(read_buf.len > 0); - - const fd = this.fd; - - if (fd == JSC.Node.invalid_fd) { - std.debug.assert(this.poll_ref == null); - return .{ .done = {} }; - } - - var buf_to_use = read_buf; - var free_buffer_on_error: bool = false; - var pipe_is_empty_on_linux = false; - var len: c_int = available_to_read orelse 0; - - // if it's a pipe, we really don't know what to expect what the max size will be - // if the pipe is sending us WAY bigger data than what we can fit in the buffer - // we allocate a new buffer of up to 4 MB - if (this.isFIFO() and view != .zero) { - outer: { - // macOS FIONREAD doesn't seem to work here - // Kernel code implies it only is enabled for FIFOs which exist - // in the filesystem - if (comptime Environment.isLinux) { - if (len == 0) { - const rc: c_int = std.c.ioctl(fd, std.os.linux.T.FIONREAD, &len); - if (rc != 0) { - len = 0; - } + pub fn readable(this: *FileReader) *Readable { + return &this.lazy_readable.readable; + } - if (len > 0) { - if (this.poll_ref) |poll| { - poll.flags.insert(.readable); - } - } else { - if (this.poll_ref) |poll| { - poll.flags.remove(.readable); - } + pub const Readable = union(enum) { + FIFO: FIFO, + File: File, - pipe_is_empty_on_linux = true; - } + pub const Lazy = union(enum) { + readable: Readable, + blob: *Blob.Store, + empty: void, - // we do not un-mark it as readable if there's nothing in the pipe - if (!this.has_adjusted_pipe_size_on_linux) { - if (len > 0 and buf_to_use.len >= std.mem.page_size * 16) { - this.has_adjusted_pipe_size_on_linux = true; - _ = Syscall.setPipeCapacityOnLinux(fd, @minimum(buf_to_use.len * 4, Syscall.getMaxPipeSizeOnLinux())); - } - } - } - } - if (len > read_buf.len * 10 and read_buf.len < std.mem.page_size) { - // then we need to allocate a buffer - // to read into - // this - buf_to_use = bun.default_allocator.alloc( - u8, - @intCast( - usize, - @minimum( - len, - 1024 * 1024 * 4, - ), - ), - ) catch break :outer; - free_buffer_on_error = true; 
+ pub fn finish(this: *Lazy) void { + switch (this.readable) { + .FIFO => { + this.readable.FIFO.finish(); + }, + .File => {}, } } - } - if (this.poll_ref) |poll| { - if (comptime Environment.isMac) { - if ((available_to_read orelse 0) > 0) { - poll.flags.insert(.readable); + pub fn isClosed(this: *Lazy) bool { + switch (this.*) { + .empty, .blob => { + return true; + }, + .readable => { + return this.readable.isClosed(); + }, } } - const is_readable = poll.isReadable(); - if (!is_readable and poll.isEOF()) { - if (poll.isHUP()) { - this.finalize(); - } - - return .{ .done = {} }; - } else if (!is_readable and poll.isHUP()) { - this.finalize(); - return .{ .done = {} }; - } else if (!is_readable) { - if (this.finished) { - this.finalize(); - return .{ .done = {} }; - } - - if (view != .zero) { - this.view.set(this.globalThis(), view); - this.buf = read_buf; - if (!this.isWatching()) - this.watch(fd); + pub fn deinit(this: *Lazy) void { + switch (this.*) { + .blob => |blob| { + blob.deref(); + }, + .readable => { + this.readable.deinit(); + }, + .empty => {}, } - - return .{ - .pending = &this.pending, - }; + this.* = .{ .empty = {} }; } - } - - if (comptime Environment.isLinux) { - if (pipe_is_empty_on_linux) { - std.debug.assert(this.poll_ref == null); - if (view != .zero) { - this.view.set(this.globalThis(), view); - this.buf = read_buf; - } + }; - this.watch(fd); - return .{ - .pending = &this.pending, - }; + pub fn deinit(this: *Readable) void { + switch (this.*) { + .FIFO => { + this.FIFO.close(); + }, + .File => { + this.File.deinit(); + }, } - } else if (this.isFIFO() and this.poll_ref == null and available_to_read == null) { - // we don't know if it's readable or not - if (!bun.isReadable(fd)) { - if (free_buffer_on_error) { - bun.default_allocator.free(buf_to_use); - buf_to_use = read_buf; - } - - if (view != .zero) { - this.view.set(this.globalThis(), view); - this.buf = read_buf; - } + } - this.watch(fd); - return .{ - .pending = &this.pending, - }; + pub fn isClosed(this: *Readable) bool { + switch (this.*) { + .FIFO => { + return this.FIFO.isClosed(); + }, + .File => { + return this.File.isClosed(); + }, } } - switch (Syscall.read(fd, buf_to_use)) { - .err => |err| { - const retry = std.os.E.AGAIN; - const errno = brk: { - const _errno = err.getErrno(); - if (comptime Environment.isLinux) { - // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already - // sent a 0 - // Let's retry later. 
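
The error handling that continues below treats EAGAIN as "nothing to read yet: stash the view, re-arm the poll, and report a pending result", a zero-byte read as end-of-stream, and, on Linux, a spurious EPERM on an already-drained FIFO as another reason to retry. A minimal C sketch of that non-blocking read contract, with illustrative names and without the Linux EPERM special case:

#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

typedef enum { READ_PENDING, READ_EOF, READ_CHUNK, READ_ERROR } read_status;

/* One non-blocking read attempt on a FIFO: a positive count is a chunk (more
 * may still be buffered), 0 means the writer hung up, and EAGAIN means the
 * caller should wait for the fd to become readable and try again. */
static read_status read_once(int fd, char *buf, size_t len, ssize_t *out_len) {
    ssize_t n = read(fd, buf, len);
    if (n > 0) { *out_len = n; return READ_CHUNK; }
    if (n == 0) return READ_EOF;
    if (errno == EAGAIN || errno == EWOULDBLOCK) return READ_PENDING;
    return READ_ERROR;
}
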
- if (this.isFIFO() and - !this.close_on_eof and _errno == .PERM) - { - break :brk .AGAIN; - } + pub fn close(this: *Readable) void { + switch (this.*) { + .FIFO => { + this.FIFO.close(); + }, + .File => { + if (this.File.concurrent) |concurrent| { + this.File.concurrent = null; + concurrent.close(); } - break :brk _errno; - }; - - switch (errno) { - retry => { - if (this.finished) { - if (this.poll_ref) |poll| { - this.poll_ref = null; - poll.deinit(); - } - - return .{ .done = {} }; - } + this.File.close(); + }, + } + } - if (free_buffer_on_error) { - bun.default_allocator.free(buf_to_use); - buf_to_use = read_buf; - } + pub fn read( + this: *Readable, + read_buf: []u8, + view: JSC.JSValue, + global: *JSC.JSGlobalObject, + ) StreamResult { + return switch (std.meta.activeTag(this.*)) { + .FIFO => this.FIFO.readFromJS(read_buf, view, global), + .File => this.File.readFromJS(read_buf, view, global), + }; + } - if (view != .zero) { - this.view.set(this.globalThis(), view); - this.buf = read_buf; - if (!this.isWatching()) - this.watch(this.fd); - } + pub fn isSeekable(this: Readable) bool { + if (this == .File) { + return this.File.isSeekable(); + } - return .{ - .pending = &this.pending, - }; - }, - else => {}, - } - const sys = if (this.store.data.file.pathlike == .path and this.store.data.file.pathlike.path.slice().len > 0) - err.withPath(this.store.data.file.pathlike.path.slice()) - else - err; + return false; + } - this.finalize(); - return .{ .err = sys }; - }, - .result => |result| { - if (this.isFIFO()) { - if (this.poll_ref) |poll| { - if (comptime Environment.isLinux) { - // do not insert .eof here - if (result < buf_to_use.len) - poll.flags.remove(.readable); - } else { - // Since we have no way of querying FIFO capacity - // its only okay to read when kqueue says its readable - // otherwise we might block the process - poll.flags.remove(.readable); - } - } + pub fn watch(this: *Readable) void { + switch (this.*) { + .FIFO => { + if (!this.FIFO.isWatching()) + this.FIFO.watch(this.FIFO.fd); + }, + } + } + }; - if (!this.finished and !this.isWatching()) - this.watch(fd); - } + pub inline fn globalThis(this: *FileReader) *JSC.JSGlobalObject { + return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis; + } - if (result == 0 and free_buffer_on_error) { - bun.default_allocator.free(buf_to_use); - buf_to_use = read_buf; - } else if (free_buffer_on_error) { - this.view.clear(); - this.buf = &.{}; - return this.handleReadChunk(result, view, true, buf_to_use); - } + const run_on_different_thread_size = bun.huge_allocator_threshold; - if (result == 0 and this.isFIFO() and view != .zero) { - this.view.set(this.globalThis(), view); - this.buf = read_buf; - return .{ - .pending = &this.pending, - }; - } + pub const tag = ReadableStream.Tag.File; - return this.handleReadChunk(result, view, false, buf_to_use); + pub fn fromReadable(this: *FileReader, chunk_size: Blob.SizeType, readable_: *Readable) void { + this.* = .{ + .lazy_readable = .{ + .readable = readable_.*, }, - } + }; + this.user_chunk_size = chunk_size; } - /// Called from Poller - pub fn ready(this: *FileReader, sizeOrOffset: i64) void { - const view = this.view.get() orelse .zero; - defer this.view.clear(); + pub fn finish(this: *FileReader) void { + this.lazy_readable.finish(); + } - const available_to_read: usize = if (comptime Environment.isMac) brk: { - if (this.isFIFO()) { - break :brk @intCast(usize, @maximum(sizeOrOffset, 0)); - } else if (std.os.S.ISREG(this.mode)) { - // Returns when the 
file pointer is not at the end of - // file. data contains the offset from current position - // to end of file, and may be negative. - break :brk @intCast(usize, @maximum(sizeOrOffset, 0)); - } - break :brk std.math.maxInt(usize); - } else std.math.maxInt(usize); - if (this.finalized and this.scheduled_count == 0) { - if (this.pending.state == .pending) { - // should never be reached - this.pending.result = .{ - .err = Syscall.Error.todo, - }; - resume this.pending.frame; - } - this.deinit(); - return; - } + pub fn onStart(this: *FileReader) StreamStart { + if (!this.started) { + this.started = true; + + switch (this.lazy_readable) { + .blob => |blob| { + defer blob.deref(); + var readable_file: File = .{ .loop = this.globalThis().bunVM().eventLoop() }; + const result = readable_file.start(&blob.data.file); + if (result != .ready) { + return result; + } - // If we do nothing here, stop watching the file descriptor - var unschedule = this.poll_ref != null; - defer { - if (unschedule) { - if (this.poll_ref) |ref| { - _ = ref.unregister(this.globalThis().bunVM().uws_event_loop.?); - } + if (std.os.S.ISFIFO(readable_file.mode)) { + this.lazy_readable = .{ + .readable = .{ + .FIFO = FIFO{ + .fd = readable_file.fd, + .auto_close = readable_file.auto_close, + }, + }, + }; + } else { + this.lazy_readable = .{ + .readable = .{ .File = readable_file }, + }; + } + }, + .readable => {}, + .empty => return .{ .empty = {} }, } } - if (this.cancelled) { - return; - } - if (this.buf.len == 0) { - return; - } else { - this.buf.len = @minimum(this.buf.len, available_to_read); + if (this.readable().* == .File) { + const chunk_size = this.readable().File.calculateChunkSize(std.math.maxInt(usize)); + return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) }; } - this.pending.result = this.read( - this.buf, - view, - if (available_to_read == std.math.maxInt(usize)) - null - else - @truncate(c_int, @intCast(isize, available_to_read)), - ); - unschedule = false; - this.pending.run(); + return .{ .chunk_size = if (this.user_chunk_size == 0) default_fifo_chunk_size else this.user_chunk_size }; } - pub fn finalize(this: *FileReader) void { - if (this.finalized) - return; - - this.signal.close(null); - - if (this.buffered_data.cap > 0) { - this.buffered_data.listManaged(bun.default_allocator).deinit(); - this.buffered_data.cap = 0; - } - - this.finished = true; + pub fn onPullInto(this: *FileReader, buffer: []u8, view: JSC.JSValue) StreamResult { + std.debug.assert(this.started); + return this.readable().read(buffer, view, this.globalThis()); + } - if (this.poll_ref) |poll| { - this.poll_ref = null; - poll.deinit(); + fn isFIFO(this: *const FileReader) bool { + if (this.lazy_readable == .readable) { + return this.lazy_readable.readable == .FIFO; } - this.finalized = true; - - this.pending.result = .{ .done = {} }; - this.pending.run(); - - this.view.deinit(); - this.buf = &.{}; - - this.maybeAutoClose(); + return false; + } - this.store.deref(); + pub fn finalize(this: *FileReader) void { + this.lazy_readable.deinit(); } pub fn onCancel(this: *FileReader) void { @@ -4022,7 +4365,7 @@ pub const FileReader = struct { pub fn deinit(this: *FileReader) void { this.finalize(); - if (this.scheduled_count == 0 and this.pending.state == .pending) { + if (this.lazy_readable.isClosed()) { this.destroy(); } } @@ -4032,18 +4375,30 @@ pub const FileReader = struct { } pub fn setRefOrUnref(this: *FileReader, value: bool) void { - if (this.poll_ref) |poll| { - if (value) { - 
poll.enableKeepingProcessAlive(this.globalThis().bunVM()); - } else { - poll.disableKeepingProcessAlive(this.globalThis().bunVM()); + if (this.lazy_readable == .readable) { + switch (this.lazy_readable.readable) { + .FIFO => { + if (this.lazy_readable.readable.FIFO.poll_ref) |poll| { + if (value) { + poll.enableKeepingProcessAlive(this.globalThis().bunVM()); + } else { + poll.disableKeepingProcessAlive(this.globalThis().bunVM()); + } + } + }, + .File => { + if (value) + this.lazy_readable.readable.File.poll_ref.ref(JSC.VirtualMachine.vm) + else + this.lazy_readable.readable.File.poll_ref.unref(JSC.VirtualMachine.vm); + }, } } } pub fn drainInternalBuffer(this: *FileReader) bun.ByteList { - var buffered = this.buffered_data; - if (buffered.len > 0) { + const buffered = this.buffered_data; + if (buffered.cap > 0) { this.buffered_data = .{}; } @@ -4093,8 +4448,9 @@ pub fn NewReadyWatcher( ready(this, sizeOrOffset); } - pub fn unwatch(this: *Context, fd: JSC.Node.FileDescriptor) void { - std.debug.assert(@intCast(JSC.Node.FileDescriptor, this.poll_ref.?.fd) == fd); + pub fn unwatch(this: *Context, fd_: anytype) void { + const fd = @intCast(c_int, fd_); + std.debug.assert(@intCast(c_int, this.poll_ref.?.fd) == fd); std.debug.assert( this.poll_ref.?.unregister(JSC.VirtualMachine.vm.uws_event_loop.?) == .result, ); @@ -4121,7 +4477,8 @@ pub fn NewReadyWatcher( return false; } - pub fn watch(this: *Context, fd: JSC.Node.FileDescriptor) void { + pub fn watch(this: *Context, fd_: anytype) void { + const fd = @intCast(c_int, fd_); var poll_ref: *JSC.FilePoll = this.poll_ref orelse brk: { this.poll_ref = JSC.FilePoll.init( JSC.VirtualMachine.vm, @@ -4133,6 +4490,7 @@ pub fn NewReadyWatcher( break :brk this.poll_ref.?; }; std.debug.assert(poll_ref.fd == fd); + std.debug.assert(!this.isWatching()); switch (poll_ref.register(JSC.VirtualMachine.vm.uws_event_loop.?, flag, true)) { .err => |err| { bun.unreachablePanic("FilePoll.register failed: {d}", .{err.errno}); @@ -4154,3 +4512,4 @@ pub fn NewReadyWatcher( // pub fn onError(this: *Streamer): anytype, // }; // } + |
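
For context on the macOS-only available_to_read hint threaded through ready() and read() above: with kqueue, an EVFILT_READ event reports the number of readable bytes in kevent.data and sets EV_EOF when the writer side closes, which is why the macOS path takes its byte count from the poller instead of FIONREAD. A minimal C sketch of that mechanism, assuming an existing kqueue instance and a pipe fd; it is illustrative only and not how Bun's FilePoll is actually wired up.

#include <stdint.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/types.h>

/* Block until fd is readable (or hits EOF) and return how many bytes kqueue
 * says can be read right now; 0 means the writer closed, -1 means an error. */
static long wait_for_readable(int kq, int fd) {
    struct kevent change, event;
    EV_SET(&change, (uintptr_t)fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL);
    if (kevent(kq, &change, 1, &event, 1, NULL) != 1)
        return -1;
    if ((event.flags & EV_EOF) && event.data == 0)
        return 0;
    return (long)event.data;
}
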