diff options
-rw-r--r-- | src/bun.js/api/bun/spawn.zig | 13 | ||||
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 199 | ||||
-rw-r--r-- | src/bun.js/base.zig | 546 | ||||
-rw-r--r-- | src/bun.js/bindings/wtf-bindings.cpp | 22 | ||||
-rw-r--r-- | src/bun.js/child_process.exports.js | 53 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 351 | ||||
-rw-r--r-- | src/bun.js/javascript.zig | 1 | ||||
-rw-r--r-- | src/bun.js/node/node_fs.zig | 8 | ||||
-rw-r--r-- | src/bun.js/node/syscall.zig | 1 | ||||
-rw-r--r-- | src/bun.js/rare_data.zig | 10 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 2 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 300 | ||||
-rw-r--r-- | src/global.zig | 2 | ||||
-rw-r--r-- | src/hive_array.zig | 28 | ||||
-rw-r--r-- | src/http_client_async.zig | 5 | ||||
-rw-r--r-- | src/jsc.zig | 1 | ||||
-rw-r--r-- | src/report.zig | 2 | ||||
-rw-r--r-- | src/tagged_pointer.zig | 4 | ||||
-rw-r--r-- | test/bun.js/child_process-node.test.js | 241 | ||||
-rw-r--r-- | test/bun.js/child_process.test.ts | 33 | ||||
-rw-r--r-- | test/bun.js/filesink.test.ts | 133 | ||||
-rw-r--r-- | test/bun.js/streams.test.js | 3 |
22 files changed, 1251 insertions, 707 deletions
diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig index c1deebd3a..d594d44a7 100644 --- a/src/bun.js/api/bun/spawn.zig +++ b/src/bun.js/api/bun/spawn.zig @@ -208,8 +208,17 @@ pub const PosixSpawn = struct { envp, ); - if (Maybe(pid_t).errno(rc)) |err| { - return err; + if (comptime bun.Environment.isLinux) { + // rc is negative because it's libc errno + if (rc > 0) { + if (Maybe(pid_t).errnoSysP(-rc, .posix_spawn, path)) |err| { + return err; + } + } + } else { + if (Maybe(pid_t).errnoSysP(rc, .posix_spawn, path)) |err| { + return err; + } } return Maybe(pid_t){ .result = pid }; diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index c82b4744f..dcfa88a40 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -31,7 +31,7 @@ pub const Subprocess = struct { killed: bool = false, reffer: JSC.Ref = JSC.Ref.init(), - poll_ref: JSC.PollRef = JSC.PollRef.init(), + poll_ref: ?*JSC.FilePoll = null, exit_promise: JSC.Strong = .{}, @@ -56,7 +56,7 @@ pub const Subprocess = struct { pub fn ref(this: *Subprocess) void { this.reffer.ref(this.globalThis.bunVM()); - this.poll_ref.ref(this.globalThis.bunVM()); + if (this.poll_ref) |poll| poll.ref(this.globalThis.bunVM()); } pub fn unref(this: *Subprocess) void { @@ -96,7 +96,7 @@ pub const Subprocess = struct { return; } - if (this.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (this.buffer.fd != JSC.Node.invalid_fd) { this.buffer.close(); } } @@ -184,7 +184,7 @@ pub const Subprocess = struct { // TODO: handle when there's pending unread data in the pipe // For some reason, this currently hangs forever - if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) { if (this.pipe.buffer.canRead()) this.pipe.buffer.readIfPossible(true); } @@ -349,8 +349,8 @@ pub const Subprocess = struct { pub const BufferedInput = struct { remain: []const u8 = "", - fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), - poll_ref: JSC.PollRef = .{}, + fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, + poll_ref: ?*JSC.FilePoll = null, written: usize = 0, source: union(enum) { @@ -358,14 +358,23 @@ pub const Subprocess = struct { array_buffer: JSC.ArrayBuffer.Strong, }, - pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady); + pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .writable, onReady); pub fn onReady(this: *BufferedInput, _: i64) void { this.write(); } pub fn canWrite(this: *BufferedInput) bool { - return bun.isWritable(this.fd); + const is_writable = bun.isWritable(this.fd); + if (is_writable) { + if (this.poll_ref) |poll_ref| { + poll_ref.flags.insert(.writable); + poll_ref.flags.insert(.fifo); + std.debug.assert(poll_ref.flags.contains(.poll_writable)); + } + } + + return is_writable; } pub fn writeIfPossible(this: *BufferedInput, comptime is_sync: bool) void { @@ -376,6 +385,7 @@ pub const Subprocess = struct { // because we don't want to block the thread waiting for the write if (!this.canWrite()) { this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); return; } } @@ -387,7 +397,6 @@ pub const Subprocess = struct { var to_write = this.remain; if (to_write.len == 0) { - if (this.poll_ref.isActive()) this.unwatch(this.fd); // we are done! this.closeFDIfOpen(); return; @@ -406,6 +415,7 @@ pub const Subprocess = struct { }); this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); return; } @@ -445,11 +455,14 @@ pub const Subprocess = struct { } fn closeFDIfOpen(this: *BufferedInput) void { - if (this.poll_ref.isActive()) this.unwatch(this.fd); + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } - if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (this.fd != JSC.Node.invalid_fd) { _ = JSC.Node.Syscall.close(this.fd); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + this.fd = JSC.Node.invalid_fd; } } @@ -470,12 +483,12 @@ pub const Subprocess = struct { pub const BufferedOutput = struct { internal_buffer: bun.ByteList = .{}, max_internal_buffer: u32 = default_max_buffer_size, - fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), + fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, received_eof: bool = false, pending_error: ?JSC.Node.Syscall.Error = null, - poll_ref: JSC.PollRef = .{}, + poll_ref: ?*JSC.FilePoll = null, - pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .read, ready); + pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready); pub fn ready(this: *BufferedOutput, _: i64) void { // TODO: what happens if the task was already enqueued after unwatch()? @@ -483,7 +496,17 @@ pub const Subprocess = struct { } pub fn canRead(this: *BufferedOutput) bool { - return bun.isReadable(this.fd); + const is_readable = bun.isReadable(this.fd); + + if (is_readable) { + if (this.poll_ref) |poll_ref| { + poll_ref.flags.insert(.readable); + poll_ref.flags.insert(.fifo); + std.debug.assert(poll_ref.flags.contains(.poll_readable)); + } + } + + return is_readable; } pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void { @@ -495,6 +518,7 @@ pub const Subprocess = struct { // and we don't want this to become an event loop ticking point if (!this.canRead()) { this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); return; } } @@ -502,9 +526,33 @@ pub const Subprocess = struct { this.readAll(force); } + pub fn closeOnEOF(this: *BufferedOutput) bool { + var poll = this.poll_ref orelse return true; + poll.flags.insert(.eof); + return false; + } + pub fn readAll(this: *BufferedOutput, comptime force: bool) void { + if (this.poll_ref) |poll| { + const is_readable = poll.isReadable(); + if (!is_readable and poll.isEOF()) { + if (poll.isHUP()) { + this.autoCloseFileDescriptor(); + } + + return; + } else if (!is_readable and poll.isHUP()) { + this.autoCloseFileDescriptor(); + return; + } else if (!is_readable) { + if (comptime !force) { + return; + } + } + } + // read as much as we can from the pipe - while (this.internal_buffer.len <= this.max_internal_buffer) { + while (this.internal_buffer.len < this.max_internal_buffer) { var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined; var buf: []u8 = buffer_[0..]; @@ -517,7 +565,9 @@ pub const Subprocess = struct { switch (JSC.Node.Syscall.read(this.fd, buf)) { .err => |e| { if (e.isRetry()) { - this.watch(this.fd); + if (!this.isWatching()) + this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); return; } @@ -558,7 +608,9 @@ pub const Subprocess = struct { if (comptime !force) { if (buf[bytes_read..].len > 0 or !this.canRead()) { - this.watch(this.fd); + if (!this.isWatching()) + this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); this.received_eof = true; return; } @@ -566,6 +618,10 @@ pub const Subprocess = struct { // we consider a short read as being EOF this.received_eof = this.received_eof or bytes_read < buf.len; if (this.received_eof) { + if (this.closeOnEOF()) { + this.autoCloseFileDescriptor(); + } + // do not auto-close the file descriptor here // it's totally legit to have a short read return; @@ -579,7 +635,7 @@ pub const Subprocess = struct { pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob { const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis); this.internal_buffer = bun.ByteList.init(""); - std.debug.assert(this.fd == std.math.maxInt(JSC.Node.FileDescriptor)); + std.debug.assert(this.fd == JSC.Node.invalid_fd); std.debug.assert(this.received_eof); return blob; } @@ -600,55 +656,61 @@ pub const Subprocess = struct { ).?; } + var poll_ref = this.poll_ref; + this.poll_ref = null; + return JSC.WebCore.ReadableStream.fromJS( - JSC.WebCore.ReadableStream.fromBlob( + JSC.WebCore.ReadableStream.fromBlobWithPoll( globalThis, &this.toBlob(globalThis), 0, + poll_ref, ), globalThis, ).?; } } - std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); - - // BufferedOutput is going away - // let's make sure we don't watch it anymore - if (this.poll_ref.isActive()) { - this.unwatch(this.fd); - } - - // There could still be data waiting to be read in the pipe - // so we need to create a new stream that will read from the - // pipe and then return the blob. - var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis); - const result = JSC.WebCore.ReadableStream.fromJS( - JSC.WebCore.ReadableStream.fromBlob( + std.debug.assert(this.fd != JSC.Node.invalid_fd); + { + var poll_ref = this.poll_ref; + this.poll_ref = null; + + // There could still be data waiting to be read in the pipe + // so we need to create a new stream that will read from the + // pipe and then return the blob. + var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis); + const result = JSC.WebCore.ReadableStream.fromJS( + JSC.WebCore.ReadableStream.fromBlobWithPoll( + globalThis, + &blob, + 0, + poll_ref, + ), globalThis, - &blob, - 0, - ), - globalThis, - ).?; - blob.detach(); - result.ptr.File.buffered_data = this.internal_buffer; - result.ptr.File.stored_global_this_ = globalThis; - result.ptr.File.finished = exited; - this.internal_buffer = bun.ByteList.init(""); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); - this.received_eof = false; - return result; + ).?; + blob.detach(); + + result.ptr.File.buffered_data = this.internal_buffer; + result.ptr.File.stored_global_this_ = globalThis; + result.ptr.File.finished = exited; + this.internal_buffer = bun.ByteList.init(""); + this.fd = JSC.Node.invalid_fd; + this.received_eof = false; + return result; + } } pub fn autoCloseFileDescriptor(this: *BufferedOutput) void { const fd = this.fd; - if (fd == std.math.maxInt(JSC.Node.FileDescriptor)) + if (fd == JSC.Node.invalid_fd) return; - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + this.fd = JSC.Node.invalid_fd; - if (this.poll_ref.isActive()) - this.unwatch(fd); + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } _ = JSC.Node.Syscall.close(fd); } @@ -826,8 +888,9 @@ pub const Subprocess = struct { .items = &.{}, .capacity = 0, }; + var jsc_vm = globalThis.bunVM(); - var cwd = globalThis.bunVM().bundler.fs.top_level_dir; + var cwd = jsc_vm.bundler.fs.top_level_dir; var stdio = [3]Stdio{ .{ .ignore = .{} }, @@ -841,13 +904,13 @@ pub const Subprocess = struct { } var on_exit_callback = JSValue.zero; - var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse ""; + var PATH = jsc_vm.bundler.env.get("PATH") orelse ""; var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined; var cmd_value = JSValue.zero; var args = args_; { if (args.isEmptyOrUndefinedOrNull()) { - globalThis.throwInvalidArguments("cmds must be an array", .{}); + globalThis.throwInvalidArguments("cmd must be an array", .{}); return .zero; } @@ -858,7 +921,7 @@ pub const Subprocess = struct { } else if (args.get(globalThis, "cmd")) |cmd_value_| { cmd_value = cmd_value_; } else { - globalThis.throwInvalidArguments("cmds must be an array", .{}); + globalThis.throwInvalidArguments("cmd must be an array", .{}); return .zero; } @@ -1015,7 +1078,7 @@ pub const Subprocess = struct { defer actions.deinit(); if (env_array.items.len == 0) { - env_array.items = globalThis.bunVM().bundler.env.map.createNullDelimitedEnvMap(allocator) catch |err| return globalThis.handleError(err, "in posix_spawn"); + env_array.items = jsc_vm.bundler.env.map.createNullDelimitedEnvMap(allocator) catch |err| return globalThis.handleError(err, "in posix_spawn"); env_array.capacity = env_array.items.len; } @@ -1100,7 +1163,7 @@ pub const Subprocess = struct { var status: u32 = 0; // ensure we don't leak the child process on error _ = std.os.linux.waitpid(pid, &status, 0); - return JSValue.jsUndefined(); + return .zero; }, } }; @@ -1135,11 +1198,12 @@ pub const Subprocess = struct { subprocess.this_jsvalue.set(globalThis, out); if (comptime !is_sync) { - switch (globalThis.bunVM().poller.watch( - @intCast(JSC.Node.FileDescriptor, pidfd), + var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess); + subprocess.poll_ref = poll; + switch (poll.register( + jsc_vm.uws_event_loop.?, .process, - Subprocess, - subprocess, + true, )) { .result => {}, .err => |err| { @@ -1237,7 +1301,14 @@ pub const Subprocess = struct { if (!sync) { var vm = this.globalThis.bunVM(); - this.unrefWithoutGC(vm); + this.reffer.unref(vm); + + // prevent duplicate notifications + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinitWithVM(vm); + } + this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this); this.has_waitpid_task = true; vm.eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task)); @@ -1245,7 +1316,7 @@ pub const Subprocess = struct { } pub fn unrefWithoutGC(this: *Subprocess, vm: *JSC.VirtualMachine) void { - this.poll_ref.unref(vm); + if (this.poll_ref) |poll| poll.unref(vm); this.reffer.unref(vm); } diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 225536591..37eb3e626 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -3935,7 +3935,7 @@ pub const PollRef = struct { this.status = .done; } - /// Only intended to be used from EventLoop.Poller + /// Only intended to be used from EventLoop.Pollable pub fn deactivate(this: *PollRef, loop: *uws.Loop) void { if (this.status != .active) return; @@ -3945,7 +3945,7 @@ pub const PollRef = struct { loop.active -= 1; } - /// Only intended to be used from EventLoop.Poller + /// Only intended to be used from EventLoop.Pollable pub fn activate(this: *PollRef, loop: *uws.Loop) void { if (this.status != .inactive) return; @@ -3980,6 +3980,548 @@ pub const PollRef = struct { } }; +pub const FilePoll = struct { + fd: u32 = invalid_fd, + flags: Flags.Set = Flags.Set{}, + owner: Owner = Deactivated.owner, + + const FileBlobLoader = JSC.WebCore.FileBlobLoader; + const FileSink = JSC.WebCore.FileSink; + const Subprocess = JSC.Subprocess; + const BufferedInput = Subprocess.BufferedInput; + const BufferedOutput = Subprocess.BufferedOutput; + const Deactivated = opaque { + pub var owner = Owner.init(@intToPtr(*Deactivated, @as(usize, 0xDEADBEEF))); + }; + + pub const Owner = bun.TaggedPointerUnion(.{ + FileBlobLoader, + FileSink, + Subprocess, + BufferedInput, + BufferedOutput, + Deactivated, + }); + + fn updateFlags(poll: *FilePoll, updated: Flags.Set) void { + var flags = poll.flags; + flags.remove(.readable); + flags.remove(.writable); + flags.remove(.process); + flags.remove(.eof); + + flags.setUnion(updated); + poll.flags = flags; + } + + pub fn onKQueueEvent(poll: *FilePoll, loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void { + poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*)); + poll.onUpdate(loop); + } + + pub fn onEpollEvent(poll: *FilePoll, loop: *uws.Loop, epoll_event: *std.os.linux.epoll_event) void { + poll.updateFlags(Flags.fromEpollEvent(epoll_event.*)); + poll.onUpdate(loop); + } + + pub fn clearEvent(poll: *FilePoll, flag: Flags) void { + poll.flags.remove(flag); + } + + pub fn isReadable(this: *FilePoll) bool { + const readable = this.flags.contains(.readable); + this.flags.remove(.readable); + return readable; + } + + pub fn isHUP(this: *FilePoll) bool { + const readable = this.flags.contains(.hup); + this.flags.remove(.hup); + return readable; + } + + pub fn isEOF(this: *FilePoll) bool { + const readable = this.flags.contains(.eof); + this.flags.remove(.eof); + return readable; + } + + pub fn isWritable(this: *FilePoll) bool { + const readable = this.flags.contains(.writable); + this.flags.remove(.writable); + return readable; + } + + pub fn deinit(this: *FilePoll) void { + var vm = JSC.VirtualMachine.vm; + this.deinitWithVM(vm); + } + + pub fn deinitWithVM(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (this.isRegistered()) { + _ = this.unregister(vm.uws_event_loop.?); + } + + this.owner = Deactivated.owner; + this.flags = Flags.Set{}; + this.fd = invalid_fd; + vm.rareData().filePolls(vm).put(this); + } + + pub fn isRegistered(this: *const FilePoll) bool { + return this.flags.contains(.poll_writable) or this.flags.contains(.poll_readable) or this.flags.contains(.poll_process); + } + + pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop) void { + if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) { + if (poll.flags.contains(.has_incremented_poll_count)) poll.deactivate(loop); + poll.flags.insert(.needs_rearm); + } + var ptr = poll.owner; + switch (ptr.tag()) { + @field(Owner.Tag, "FileBlobLoader") => { + log("onUpdate: FileBlobLoader", .{}); + ptr.as(FileBlobLoader).onPoll(0, 0); + }, + @field(Owner.Tag, "Subprocess") => { + log("onUpdate: Subprocess", .{}); + var loader = ptr.as(JSC.Subprocess); + + loader.onExitNotification(); + }, + @field(Owner.Tag, "FileSink") => { + log("onUpdate: FileSink", .{}); + var loader = ptr.as(JSC.WebCore.FileSink); + loader.onPoll(0, 0); + }, + + @field(Owner.Tag, "BufferedInput") => { + log("onUpdate: BufferedInput", .{}); + var loader = ptr.as(JSC.Subprocess.BufferedInput); + loader.onReady(0); + }, + @field(Owner.Tag, "BufferedOutput") => { + log("onUpdate: BufferedOutput", .{}); + var loader = ptr.as(JSC.Subprocess.BufferedOutput); + loader.ready(0); + }, + else => {}, + } + } + + pub const Flags = enum { + // What are we asking the event loop about? + + /// Poll for readable events + poll_readable, + + /// Poll for writable events + poll_writable, + + /// Poll for process-related events + poll_process, + + // What did the event loop tell us? + readable, + writable, + process, + eof, + hup, + + // What is the type of file descriptor? + fifo, + tty, + + one_shot, + needs_rearm, + + has_incremented_poll_count, + + disable, + + pub fn poll(this: Flags) Flags { + return switch (this) { + .readable => .poll_readable, + .writable => .poll_writable, + .process => .poll_process, + else => this, + }; + } + + pub const Set = std.EnumSet(Flags); + pub const Struct = std.enums.EnumFieldStruct(Flags, bool, false); + + pub fn fromKQueueEvent(kqueue_event: std.os.system.kevent64_s) Flags.Set { + var flags = Flags.Set{}; + if (kqueue_event.filter == std.os.system.EVFILT_READ) { + flags.insert(Flags.readable); + if (kqueue_event.flags & std.os.system.EV_EOF != 0) { + flags.insert(Flags.eof); + } + } else if (kqueue_event.filter == std.os.system.EVFILT_WRITE) { + flags.insert(Flags.writable); + if (kqueue_event.flags & std.os.system.EV_EOF != 0) { + flags.insert(Flags.hup); + } + } else if (kqueue_event.filter == std.os.system.EVFILT_PROC) { + flags.insert(Flags.process); + } + return flags; + } + + pub fn fromEpollEvent(epoll: std.os.linux.epoll_event) Flags.Set { + var flags = Flags.Set{}; + if (epoll.events & std.os.linux.EPOLL.IN != 0) { + flags.insert(Flags.readable); + log("readable", .{}); + } + if (epoll.events & std.os.linux.EPOLL.OUT != 0) { + flags.insert(Flags.writable); + log("writable", .{}); + } + if (epoll.events & std.os.linux.EPOLL.ERR != 0) { + flags.insert(Flags.eof); + log("eof", .{}); + } + if (epoll.events & std.os.linux.EPOLL.HUP != 0) { + flags.insert(Flags.hup); + log("hup", .{}); + } + return flags; + } + }; + + pub const HiveArray = bun.HiveArray(FilePoll, 128).Fallback; + + const log = Output.scoped(.FilePoll, false); + + pub inline fn isActive(this: *const FilePoll) bool { + return this.flags.contains(.has_incremented_poll_count) and !this.flags.contains(.disable); + } + + /// Make calling ref() on this poll into a no-op. + pub fn disable(this: *FilePoll) void { + if (this.isRegistered()) { + this.unregister(JSC.VirtualMachine.vm.uws_event_loop.?); + } + + this.unref(); + this.flags.insert(.disable); + } + + /// Only intended to be used from EventLoop.Pollable + pub fn deactivate(this: *FilePoll, loop: *uws.Loop) void { + std.debug.assert(this.flags.contains(.has_incremented_poll_count)); + this.flags.remove(.has_incremented_poll_count); + loop.num_polls -= 1; + loop.active -= 1; + } + + /// Only intended to be used from EventLoop.Pollable + pub fn activate(this: *FilePoll, loop: *uws.Loop) void { + std.debug.assert(!this.flags.contains(.has_incremented_poll_count)); + std.debug.assert(!this.flags.contains(.disable)); + this.flags.insert(.has_incremented_poll_count); + loop.num_polls += 1; + loop.active += 1; + } + + pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll { + return initWithOwner(vm, fd, flags, Owner.init(owner)); + } + + pub fn initWithOwner(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, owner: Owner) *FilePoll { + var poll = vm.rareData().filePolls(vm).get(); + poll.* = .{ + .fd = @intCast(u32, fd), + .flags = Flags.Set.init(flags), + .owner = owner, + }; + return poll; + } + + /// Prevent a poll from keeping the process alive. + pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (!this.isActive()) + return; + log("unref", .{}); + this.deactivate(vm.uws_event_loop.?); + } + + /// Allow a poll to keep the process alive. + pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (this.isActive()) + return; + log("ref", .{}); + this.activate(vm.uws_event_loop.?); + } + + pub fn onTick(loop: *uws.Loop, tagged_pointer: ?*anyopaque) callconv(.C) void { + var tag = Pollable.from(tagged_pointer); + + if (tag.tag() != @field(Pollable.Tag, "FilePoll")) + return; + + var file_poll = tag.as(FilePoll); + if (comptime Environment.isMac) + onKQueueEvent(file_poll, loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]) + else if (comptime Environment.isLinux) + onEpollEvent(file_poll, loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]); + } + + const Pollable = bun.TaggedPointerUnion( + .{ + FilePoll, + Deactivated, + }, + ); + + comptime { + @export(onTick, .{ .name = "Bun__internal_dispatch_ready_poll" }); + } + + const timeout = std.mem.zeroes(std.os.timespec); + const kevent = std.c.kevent; + const linux = std.os.linux; + pub fn register(this: *FilePoll, loop: *uws.Loop, flag: Flags, one_shot: bool) JSC.Maybe(void) { + const watcher_fd = loop.fd; + const fd = this.fd; + + log("register: {s} ({d})", .{ @tagName(flag), fd }); + + if (one_shot) { + this.flags.insert(.one_shot); + } + + std.debug.assert(this.fd != invalid_fd); + + if (comptime Environment.isLinux) { + const flags: u32 = switch (flag) { + .process, + .readable, + => linux.EPOLL.IN | linux.EPOLL.HUP | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT), + .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT), + else => unreachable, + }; + + var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @ptrToInt(Pollable.init(this).ptr()) } }; + + const ctl = linux.epoll_ctl( + watcher_fd, + if (this.isRegistered() or this.flags.contains(.needs_rearm)) linux.EPOLL.CTL_MOD else linux.EPOLL.CTL_ADD, + @intCast(std.os.fd_t, fd), + &event, + ); + + if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { + return errno; + } + } else if (comptime Environment.isMac) { + var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + changelist[0] = switch (flag) { + .readable => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_READ, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_ADD | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .writable => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_WRITE, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_ADD | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .process => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_PROC, + .data = 0, + .fflags = std.c.NOTE_EXIT, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_ADD, + .ext = .{ 0, 0 }, + }, + }; + + // output events only include change errors + const KEVENT_FLAG_ERROR_EVENTS = 0x000002; + + // The kevent() system call returns the number of events placed in + // the eventlist, up to the value given by nevents. If the time + // limit expires, then kevent() returns 0. + const rc = rc: { + while (true) { + const rc = std.os.system.kevent64( + watcher_fd, + &changelist, + 1, + // The same array may be used for the changelist and eventlist. + &changelist, + 1, + KEVENT_FLAG_ERROR_EVENTS, + &timeout, + ); + + if (std.c.getErrno(rc) == .INTR) continue; + break :rc rc; + } + }; + + // If an error occurs while + // processing an element of the changelist and there is enough room + // in the eventlist, then the event will be placed in the eventlist + // with EV_ERROR set in flags and the system error in data. + if (changelist[0].flags == std.c.EV_ERROR) { + return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; + // Otherwise, -1 will be returned, and errno will be set to + // indicate the error condition. + } + + const errno = std.c.getErrno(rc); + + if (errno != .SUCCESS) { + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, + else => unreachable, + } + } + } else { + @compileError("TODO: Pollable"); + } + + if (!this.isActive()) this.activate(loop); + this.flags.insert(switch (flag) { + .process, .readable => .poll_readable, + .writable => .poll_writable, + else => unreachable, + }); + return JSC.Maybe(void).success; + } + + pub const invalid_fd = std.math.maxInt(u32); + + pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) { + if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) { + // no-op + return JSC.Maybe(void).success; + } + + const fd = this.fd; + std.debug.assert(fd != invalid_fd); + const watcher_fd = loop.fd; + const flag: Flags = brk: { + if (this.flags.contains(.poll_readable)) + break :brk .readable; + if (this.flags.contains(.poll_writable)) + break :brk .writable; + if (this.flags.contains(.poll_process)) + break :brk .process; + return JSC.Maybe(void).success; + }; + + log("unregister: {s} ({d})", .{ @tagName(flag), fd }); + + if (comptime Environment.isLinux) { + const ctl = linux.epoll_ctl( + watcher_fd, + linux.EPOLL.CTL_DEL, + @intCast(std.os.fd_t, fd), + null, + ); + + if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { + return errno; + } + } else if (comptime Environment.isMac) { + var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + + changelist[0] = switch (flag) { + .read => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_READ, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .write => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_WRITE, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .process => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_PROC, + .data = 0, + .fflags = std.c.NOTE_EXIT, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + else => unreachable, + }; + + // output events only include change errors + const KEVENT_FLAG_ERROR_EVENTS = 0x000002; + + // The kevent() system call returns the number of events placed in + // the eventlist, up to the value given by nevents. If the time + // limit expires, then kevent() returns 0. + const rc = std.os.system.kevent64( + watcher_fd, + &changelist, + 1, + // The same array may be used for the changelist and eventlist. + &changelist, + 1, + KEVENT_FLAG_ERROR_EVENTS, + &timeout, + ); + // If an error occurs while + // processing an element of the changelist and there is enough room + // in the eventlist, then the event will be placed in the eventlist + // with EV_ERROR set in flags and the system error in data. + if (changelist[0].flags == std.c.EV_ERROR) { + return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; + // Otherwise, -1 will be returned, and errno will be set to + // indicate the error condition. + } + + const errno = std.c.getErrno(rc); + + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, + else => unreachable, + } + } else { + @compileError("TODO: Pollable"); + } + + this.flags.remove(.needs_rearm); + this.flags.remove(.one_shot); + // we don't support both right now + std.debug.assert(!(this.flags.contains(.poll_readable) and this.flags.contains(.poll_writable))); + this.flags.remove(.poll_readable); + this.flags.remove(.poll_writable); + this.flags.remove(.poll_process); + + if (this.isActive()) + this.deactivate(loop); + + return JSC.Maybe(void).success; + } +}; + pub const Strong = extern struct { ref: ?*JSC.napi.Ref = null, diff --git a/src/bun.js/bindings/wtf-bindings.cpp b/src/bun.js/bindings/wtf-bindings.cpp index 78b88ad5e..699e3db5b 100644 --- a/src/bun.js/bindings/wtf-bindings.cpp +++ b/src/bun.js/bindings/wtf-bindings.cpp @@ -23,26 +23,26 @@ extern "C" void Bun__crashReportDumpStackTrace(void* ctx) void* stack[framesToShow + framesToSkip]; int frames = framesToShow + framesToSkip; WTFGetBacktrace(stack, &frames); + int size = frames - framesToSkip; bool isFirst = true; - WTF::StackTraceSymbolResolver { { stack, static_cast<size_t>(frames) } }.forEach([&](int frameNumber, void* stackFrame, const char* name) { - if (frameNumber < framesToSkip) - return; - + for (int frameNumber = 0; frameNumber < size; ++frameNumber) { + auto demangled = WTF::StackTraceSymbolResolver::demangle(stack[frameNumber]); + StringPrintStream out; if (isFirst) { isFirst = false; - if (name) - out.printf("\n%-3d %p %s", frameNumber, stackFrame, name); + if (demangled) + out.printf("\n%-3d %p %s", frameNumber, stack[frameNumber], demangled->demangledName() ? demangled->demangledName() : demangled->mangledName()); else - out.printf("\n%-3d %p", frameNumber, stackFrame); + out.printf("\n%-3d %p", frameNumber, stack[frameNumber]); } else { - if (name) - out.printf("%-3d %p %s", frameNumber, stackFrame, name); + if (demangled) + out.printf("%-3d ??? %s", frameNumber, demangled->demangledName() ? demangled->demangledName() : demangled->mangledName()); else - out.printf("%-3d %p", frameNumber, stackFrame); + out.printf("%-3d ???", frameNumber); } auto str = out.toCString(); Bun__crashReportWrite(ctx, str.data(), str.length()); - }); + } }
\ No newline at end of file diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js index 49750878c..10a538333 100644 --- a/src/bun.js/child_process.exports.js +++ b/src/bun.js/child_process.exports.js @@ -536,8 +536,13 @@ export function spawnSync(file, args, options) { result.stderr = result.output[2]; if (!success) { - result.error = errnoException(result.stderr, "spawnSync " + options.file); - result.error.path = options.file; + result.error = new SystemError( + result.stderr, + options.file, + "spawnSync", + -1, + result.status, + ); result.error.spawnargs = ArrayPrototypeSlice.call(options.args, 1); } @@ -844,7 +849,6 @@ export class ChildProcess extends EventEmitter { connected = false; signalCode = null; exitCode = null; - killed = false; spawnfile; spawnargs; pid; @@ -854,6 +858,10 @@ export class ChildProcess extends EventEmitter { stdio; channel; + get killed() { + if (this.#handle == null) return false; + } + // constructor(options) { // super(options); // this.#handle[owner_symbol] = this; @@ -877,7 +885,14 @@ export class ChildProcess extends EventEmitter { if (exitCode < 0) { const syscall = this.spawnfile ? "spawn " + this.spawnfile : "spawn"; - const err = errnoException(exitCode, syscall); + + const err = new SystemError( + `Spawned process exited with error code: ${exitCode}`, + undefined, + "spawn", + "EUNKNOWN", + "ERR_CHILD_PROCESS_UNKNOWN_ERROR", + ); if (this.spawnfile) err.path = this.spawnfile; @@ -906,15 +921,17 @@ export class ChildProcess extends EventEmitter { } #getBunSpawnIo(stdio, options) { - const result = []; + const result = [null]; switch (stdio[0]) { case "pipe": result[0] = new WrappedFileSink(this.#handle.stdin); break; case "inherit": result[0] = process.stdin; + break; default: result[0] = null; + break; } let i = 1; for (; i < stdio.length; i++) { @@ -959,7 +976,7 @@ export class ChildProcess extends EventEmitter { validateString(options.file, "options.file"); this.spawnfile = options.file; - if (options.args === undefined) { + if (options.args == null) { this.spawnargs = []; } else { validateArray(options.args, "options.args"); @@ -969,9 +986,8 @@ export class ChildProcess extends EventEmitter { const stdio = options.stdio || "pipe"; const bunStdio = getBunStdioFromOptions(stdio); - const cmd = options.args; this.#handle = Bun.spawn({ - cmd, + cmd: [options.file, ...this.spawnargs], stdin: bunStdio[0], stdout: bunStdio[1], stderr: bunStdio[2], @@ -1066,14 +1082,13 @@ export class ChildProcess extends EventEmitter { this.#handle.kill(signal); } - this.killed = true; this.emit("exit", null, signal); this.#maybeClose(); // TODO: Make this actually ensure the process has exited before returning // await this.#handle.exited() // return this.#handle.killed; - return this.killed; + return this.#handle?.killed ?? true; } #maybeClose() { @@ -1698,8 +1713,22 @@ function ERR_INVALID_ARG_VALUE(name, value, reason) { } // TODO: Add actual proper error implementation here -function errnoException(err, name) { - return new Error(`Error: ${name}. Internal error: ${err.message}`); +class SystemError extends Error { + path; + syscall; + errno; + code; + constructor(message, path, syscall, errno, code) { + super(message); + this.path = path; + this.syscall = syscall; + this.errno = errno; + this.code = code; + } + + get name() { + return "SystemError"; + } } export default { diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 0c99a949a..b62ac0123 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -377,8 +377,9 @@ pub const EventLoop = struct { ctx.global.vm().releaseWeakRefs(); ctx.global.vm().drainMicrotasks(); + var loop = ctx.uws_event_loop orelse return; - if (ctx.poller.loop != null and ctx.poller.loop.?.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (ctx.uws_event_loop.?.num_polls > 0 or this.start_server_on_next_tick))) { + if (loop.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (loop.num_polls > 0 or this.start_server_on_next_tick))) { if (this.tickConcurrentWithCount() > 0) { this.tick(); } else { @@ -449,351 +450,3 @@ pub const EventLoop = struct { } } }; - -pub const Poller = struct { - /// kqueue() or epoll() - /// 0 == unset - loop: ?*uws.Loop = null, - - pub fn dispatchKQueueEvent(loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void { - if (comptime !Environment.isMac) { - unreachable; - } - var ptr = Pollable.from(@intToPtr(?*anyopaque, kqueue_event.udata)); - - switch (ptr.tag()) { - @field(Pollable.Tag, "FileBlobLoader") => { - var loader = ptr.as(FileBlobLoader); - loader.poll_ref.deactivate(loop); - - loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags); - }, - @field(Pollable.Tag, "Subprocess") => { - var loader = ptr.as(JSC.Subprocess); - - loader.poll_ref.deactivate(loop); - loader.onExitNotification(); - }, - @field(Pollable.Tag, "BufferedInput") => { - var loader = ptr.as(JSC.Subprocess.BufferedInput); - - loader.poll_ref.deactivate(loop); - - loader.onReady(@bitCast(i64, kqueue_event.data)); - }, - @field(Pollable.Tag, "BufferedOutput") => { - var loader = ptr.as(JSC.Subprocess.BufferedOutput); - - loader.poll_ref.deactivate(loop); - - loader.ready(@bitCast(i64, kqueue_event.data)); - }, - @field(Pollable.Tag, "FileSink") => { - var loader = ptr.as(JSC.WebCore.FileSink); - loader.poll_ref.deactivate(loop); - - loader.onPoll(0, 0); - }, - else => |tag| { - bun.Output.panic( - "Internal error\nUnknown pollable tag: {d}\n", - .{@enumToInt(tag)}, - ); - }, - } - } - - fn dispatchEpollEvent(loop: *uws.Loop, epoll_event: *linux.epoll_event) void { - var ptr = Pollable.from(@intToPtr(?*anyopaque, epoll_event.data.ptr)); - switch (ptr.tag()) { - @field(Pollable.Tag, "FileBlobLoader") => { - var loader = ptr.as(FileBlobLoader); - loader.poll_ref.deactivate(loop); - - loader.onPoll(0, 0); - }, - @field(Pollable.Tag, "Subprocess") => { - var loader = ptr.as(JSC.Subprocess); - loader.poll_ref.deactivate(loop); - - loader.onExitNotification(); - }, - @field(Pollable.Tag, "FileSink") => { - var loader = ptr.as(JSC.WebCore.FileSink); - loader.poll_ref.deactivate(loop); - - loader.onPoll(0, 0); - }, - - @field(Pollable.Tag, "BufferedInput") => { - var loader = ptr.as(JSC.Subprocess.BufferedInput); - - loader.poll_ref.deactivate(loop); - - loader.onReady(0); - }, - @field(Pollable.Tag, "BufferedOutput") => { - var loader = ptr.as(JSC.Subprocess.BufferedOutput); - - loader.poll_ref.deactivate(loop); - - loader.ready(0); - }, - else => unreachable, - } - } - - const timeout = std.mem.zeroes(std.os.timespec); - const linux = std.os.linux; - - const FileBlobLoader = JSC.WebCore.FileBlobLoader; - const FileSink = JSC.WebCore.FileSink; - const Subprocess = JSC.Subprocess; - const BufferedInput = Subprocess.BufferedInput; - const BufferedOutput = Subprocess.BufferedOutput; - /// epoll only allows one pointer - /// We unfortunately need two pointers: one for a function call and one for the context - /// We use a tagged pointer union and then call the function with the context pointer - pub const Pollable = TaggedPointerUnion(.{ - FileBlobLoader, - FileSink, - Subprocess, - BufferedInput, - BufferedOutput, - }); - const Kevent = std.os.Kevent; - const kevent = std.c.kevent; - - pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) { - if (this.loop == null) { - this.loop = uws.Loop.get(); - JSC.VirtualMachine.vm.uws_event_loop = this.loop.?; - } - const watcher_fd = this.loop.?.fd; - - if (comptime Environment.isLinux) { - const flags: u32 = switch (flag) { - .process, .read => linux.EPOLL.IN | linux.EPOLL.HUP | linux.EPOLL.ONESHOT, - .write => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | linux.EPOLL.ONESHOT, - }; - - var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @ptrToInt(Pollable.init(ctx).ptr()) } }; - - const ctl = linux.epoll_ctl( - watcher_fd, - linux.EPOLL.CTL_ADD, - fd, - &event, - ); - - if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { - return errno; - } - - ctx.poll_ref.activate(this.loop.?); - - return JSC.Maybe(void).success; - } else if (comptime Environment.isMac) { - var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); - changelist[0] = switch (flag) { - .read => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_READ, - .data = 0, - .fflags = 0, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_ADD | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - .write => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_WRITE, - .data = 0, - .fflags = 0, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_ADD | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - .process => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_PROC, - .data = 0, - .fflags = std.c.NOTE_EXIT, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_ADD, - .ext = .{ 0, 0 }, - }, - }; - - // output events only include change errors - const KEVENT_FLAG_ERROR_EVENTS = 0x000002; - - // The kevent() system call returns the number of events placed in - // the eventlist, up to the value given by nevents. If the time - // limit expires, then kevent() returns 0. - const rc = rc: { - while (true) { - const rc = std.os.system.kevent64( - watcher_fd, - &changelist, - 1, - // The same array may be used for the changelist and eventlist. - &changelist, - 1, - KEVENT_FLAG_ERROR_EVENTS, - &timeout, - ); - - if (std.c.getErrno(rc) == .INTR) continue; - break :rc rc; - } - }; - - // If an error occurs while - // processing an element of the changelist and there is enough room - // in the eventlist, then the event will be placed in the eventlist - // with EV_ERROR set in flags and the system error in data. - if (changelist[0].flags == std.c.EV_ERROR) { - return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; - // Otherwise, -1 will be returned, and errno will be set to - // indicate the error condition. - } - - const errno = std.c.getErrno(rc); - - if (errno == .SUCCESS) { - ctx.poll_ref.activate(this.loop.?); - - return JSC.Maybe(void).success; - } - - switch (rc) { - std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, - else => unreachable, - } - } else { - @compileError("TODO: Poller"); - } - } - - pub fn unwatch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) { - if (this.loop == null) { - this.loop = uws.Loop.get(); - JSC.VirtualMachine.vm.uws_event_loop = this.loop.?; - } - const watcher_fd = this.loop.?.fd; - - if (comptime Environment.isLinux) { - const ctl = linux.epoll_ctl( - watcher_fd, - linux.EPOLL.CTL_DEL, - fd, - null, - ); - - if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { - return errno; - } - - ctx.poll_ref.deactivate(this.loop.?); - - return JSC.Maybe(void).success; - } else if (comptime Environment.isMac) { - var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); - changelist[0] = switch (flag) { - .read => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_READ, - .data = 0, - .fflags = 0, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - .write => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_WRITE, - .data = 0, - .fflags = 0, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - .process => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_PROC, - .data = 0, - .fflags = std.c.NOTE_EXIT, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - }; - - // output events only include change errors - const KEVENT_FLAG_ERROR_EVENTS = 0x000002; - - // The kevent() system call returns the number of events placed in - // the eventlist, up to the value given by nevents. If the time - // limit expires, then kevent() returns 0. - const rc = std.os.system.kevent64( - watcher_fd, - &changelist, - 1, - // The same array may be used for the changelist and eventlist. - &changelist, - 1, - KEVENT_FLAG_ERROR_EVENTS, - &timeout, - ); - // If an error occurs while - // processing an element of the changelist and there is enough room - // in the eventlist, then the event will be placed in the eventlist - // with EV_ERROR set in flags and the system error in data. - if (changelist[0].flags == std.c.EV_ERROR) { - return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; - // Otherwise, -1 will be returned, and errno will be set to - // indicate the error condition. - } - - const errno = std.c.getErrno(rc); - - if (errno == .SUCCESS) { - ctx.poll_ref.deactivate(this.loop.?); - return JSC.Maybe(void).success; - } - - switch (rc) { - std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, - else => unreachable, - } - } else { - @compileError("TODO: Poller"); - } - } - - pub fn tick(this: *Poller) void { - var loop = this.loop orelse return; - if (loop.active == 0) return; - loop.tick(); - } - - pub fn onTick(loop: *uws.Loop, tagged_pointer: ?*anyopaque) callconv(.C) void { - _ = loop; - _ = tagged_pointer; - if (comptime Environment.isMac) - dispatchKQueueEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]) - else if (comptime Environment.isLinux) - dispatchEpollEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]); - } - - pub const Flag = enum { - read, - write, - process, - }; - - comptime { - @export(onTick, .{ .name = "Bun__internal_dispatch_ready_poll" }); - } -}; diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 82d34a986..23bcf542f 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -417,7 +417,6 @@ pub const VirtualMachine = struct { active_tasks: usize = 0, rare_data: ?*JSC.RareData = null, - poller: JSC.Poller = JSC.Poller{}, us_loop_reference_count: usize = 0, is_us_loop_entered: bool = false, pending_internal_promise: *JSC.JSInternalPromise = undefined, diff --git a/src/bun.js/node/node_fs.zig b/src/bun.js/node/node_fs.zig index 23596d7b5..ff9f1ba1f 100644 --- a/src/bun.js/node/node_fs.zig +++ b/src/bun.js/node/node_fs.zig @@ -1823,7 +1823,7 @@ const Arguments = struct { .file = undefined, .global_object = ctx.ptr(), }; - var fd: FileDescriptor = std.math.maxInt(FileDescriptor); + var fd: FileDescriptor = JSC.Node.invalid_fd; if (arguments.next()) |arg| { arguments.eat(); @@ -1918,7 +1918,7 @@ const Arguments = struct { } } - if (fd != std.math.maxInt(FileDescriptor)) { + if (fd != JSC.Node.invalid_fd) { stream.file = .{ .fd = fd }; } else if (path) |path_| { stream.file = .{ .path = path_ }; @@ -1957,7 +1957,7 @@ const Arguments = struct { .file = undefined, .global_object = ctx.ptr(), }; - var fd: FileDescriptor = std.math.maxInt(FileDescriptor); + var fd: FileDescriptor = JSC.Node.invalid_fd; if (arguments.next()) |arg| { arguments.eat(); @@ -2044,7 +2044,7 @@ const Arguments = struct { } } - if (fd != std.math.maxInt(FileDescriptor)) { + if (fd != JSC.Node.invalid_fd) { stream.file = .{ .fd = fd }; } else if (path) |path_| { stream.file = .{ .path = path_ }; diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig index c2d13ea59..dd961b510 100644 --- a/src/bun.js/node/syscall.zig +++ b/src/bun.js/node/syscall.zig @@ -101,6 +101,7 @@ pub const Tag = enum(u8) { epoll_ctl, kill, waitpid, + posix_spawn, pub var strings = std.EnumMap(Tag, JSC.C.JSStringRef).initFull(null); }; const PathString = @import("../../global.zig").PathString; diff --git a/src/bun.js/rare_data.zig b/src/bun.js/rare_data.zig index 8686fa1b4..895dac40f 100644 --- a/src/bun.js/rare_data.zig +++ b/src/bun.js/rare_data.zig @@ -23,6 +23,16 @@ entropy_cache: ?*EntropyCache = null, tail_cleanup_hook: ?*CleanupHook = null, cleanup_hook: ?*CleanupHook = null, +file_polls_: ?*JSC.FilePoll.HiveArray = null, + +pub fn filePolls(this: *RareData, vm: *JSC.VirtualMachine) *JSC.FilePoll.HiveArray { + return this.file_polls_ orelse { + this.file_polls_ = vm.allocator.create(JSC.FilePoll.HiveArray) catch unreachable; + this.file_polls_.?.* = JSC.FilePoll.HiveArray.init(vm.allocator); + return this.file_polls_.?; + }; +} + pub fn nextUUID(this: *RareData) [16]u8 { if (this.entropy_cache == null) { this.entropy_cache = default_allocator.create(EntropyCache) catch unreachable; diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 36ed271fd..81e3c6a51 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -471,7 +471,7 @@ pub const Response = struct { } }; -const null_fd = std.math.maxInt(JSC.Node.FileDescriptor); +const null_fd = JSC.Node.invalid_fd; pub const Fetch = struct { const headers_string = "headers"; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 2d7a3badd..2f9c509d3 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -245,7 +245,12 @@ pub const ReadableStream = struct { JSC.markBinding(@src()); return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id))); } + pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue { + return fromBlobWithPoll(globalThis, blob, recommended_chunk_size, null); + } + + pub fn fromBlobWithPoll(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) JSC.JSValue { if (comptime JSC.is_bindgen) unreachable; var store = blob.store orelse { @@ -265,7 +270,7 @@ pub const ReadableStream = struct { reader.* = .{ .context = undefined, }; - reader.context.setup(store, recommended_chunk_size); + reader.context.setupWithPoll(store, recommended_chunk_size, poll); return reader.toJS(globalThis); }, } @@ -1028,9 +1033,9 @@ pub const FileSink = struct { prevent_process_exit: bool = false, reachable_from_js: bool = true, - poll_ref: JSC.PollRef = .{}, + poll_ref: ?*JSC.FilePoll = null, - pub usingnamespace NewReadyWatcher(@This(), .write, ready); + pub usingnamespace NewReadyWatcher(@This(), .writable, ready); const max_fifo_size = 64 * 1024; pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) { @@ -1043,20 +1048,24 @@ pub const FileSink = struct { .err => |err| return .{ .err = err.withPath(input_path.path.slice()) }, }; - const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(fd)) { - .result => |result| result, - .err => |err| { - if (auto_close) { - _ = JSC.Node.Syscall.close(fd); - } - return .{ .err = err.withPathLike(input_path) }; - }, - }; + if (this.poll_ref == null) { + const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(fd)) { + .result => |result| result, + .err => |err| { + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return .{ .err = err.withPathLike(input_path) }; + }, + }; - this.mode = stat.mode; - this.fd = fd; + this.mode = stat.mode; + this.auto_truncate = this.auto_truncate and (std.os.S.ISREG(this.mode)); + } else { + this.auto_truncate = false; + } - this.auto_truncate = this.auto_truncate and (std.os.S.ISREG(this.mode)); + this.fd = fd; return .{ .result = {} }; } @@ -1213,7 +1222,10 @@ pub const FileSink = struct { if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { if (this.scheduled_count > 0) { this.scheduled_count = 0; - this.unwatch(this.fd); + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } } _ = JSC.Node.Syscall.close(this.fd); @@ -1342,7 +1354,8 @@ pub const FileSink = struct { } fn isPending(this: *const FileSink) bool { - return this.poll_ref.status == .active; + var poll_ref = this.poll_ref orelse return false; + return poll_ref.isActive(); } pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { @@ -3055,6 +3068,7 @@ pub const FileBlobLoader = struct { finalized: bool = false, callback: anyframe = undefined, buffered_data: bun.ByteList = .{}, + buffered_data_max: u32 = 0, pending: StreamResult.Pending = StreamResult.Pending{ .frame = undefined, .state = .none, @@ -3066,7 +3080,7 @@ pub const FileBlobLoader = struct { concurrent: Concurrent = Concurrent{}, started: bool = false, stored_global_this_: ?*JSC.JSGlobalObject = null, - poll_ref: JSC.PollRef = .{}, + poll_ref: ?*JSC.FilePoll = null, has_adjusted_pipe_size_on_linux: bool = false, finished: bool = false, @@ -3079,7 +3093,7 @@ pub const FileBlobLoader = struct { signal: JSC.WebCore.Signal = .{}, - pub usingnamespace NewReadyWatcher(@This(), .read, ready); + pub usingnamespace NewReadyWatcher(@This(), .readable, ready); pub inline fn globalThis(this: *FileBlobLoader) *JSC.JSGlobalObject { return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis; @@ -3091,14 +3105,22 @@ pub const FileBlobLoader = struct { pub const tag = ReadableStream.Tag.File; - pub fn setup(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType) void { + pub fn setupWithPoll(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) void { store.ref(); this.* = .{ .loop = JSC.VirtualMachine.vm.eventLoop(), .auto_close = store.data.file.pathlike == .path, .store = store, .user_chunk_size = chunk_size, + .poll_ref = poll, }; + if (this.poll_ref) |poll_| { + poll_.owner.set(this); + } + } + + pub fn setup(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType) void { + this.setupWithPoll(store, chunk_size, null); } pub fn finish(this: *FileBlobLoader) void { @@ -3274,7 +3296,7 @@ pub const FileBlobLoader = struct { pub fn scheduleAsync(this: *FileReader, chunk_size: Blob.SizeType) void { this.scheduled_count += 1; - this.poll_ref.ref(this.globalThis().bunVM()); + this.pollRef().ref(this.globalThis().bunVM()); std.debug.assert(this.started); NetworkThread.init() catch {}; this.concurrent.chunk_size = chunk_size; @@ -3303,65 +3325,70 @@ pub const FileBlobLoader = struct { }, }; - if (!auto_close) { - // ensure we have non-blocking IO set - switch (Syscall.fcntl(fd, std.os.F.GETFL, 0)) { - .err => return .{ .err = Syscall.Error.fromCode(std.os.E.BADF, .fcntl) }, - .result => |flags| { - // if we do not, clone the descriptor and set non-blocking - // it is important for us to clone it so we don't cause Weird Things to happen - if ((flags & std.os.O.NONBLOCK) == 0) { - auto_close = true; - fd = switch (Syscall.fcntl(fd, std.os.F.DUPFD, 0)) { - .result => |_fd| @intCast(@TypeOf(fd), _fd), - .err => |err| return .{ .err = err }, - }; - - switch (Syscall.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK)) { - .err => |err| return .{ .err = err }, - .result => |_| {}, + if (this.poll_ref) |poll| { + file.seekable = false; + std.debug.assert(poll.fd == @intCast(@TypeOf(poll.fd), fd)); + } else { + if (!auto_close) { + // ensure we have non-blocking IO set + switch (Syscall.fcntl(fd, std.os.F.GETFL, 0)) { + .err => return .{ .err = Syscall.Error.fromCode(std.os.E.BADF, .fcntl) }, + .result => |flags| { + // if we do not, clone the descriptor and set non-blocking + // it is important for us to clone it so we don't cause Weird Things to happen + if ((flags & std.os.O.NONBLOCK) == 0) { + auto_close = true; + fd = switch (Syscall.fcntl(fd, std.os.F.DUPFD, 0)) { + .result => |_fd| @intCast(@TypeOf(fd), _fd), + .err => |err| return .{ .err = err }, + }; + + switch (Syscall.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK)) { + .err => |err| return .{ .err = err }, + .result => |_| {}, + } } + }, + } + } + + const stat: std.os.Stat = switch (Syscall.fstat(fd)) { + .result => |result| result, + .err => |err| { + if (auto_close) { + _ = Syscall.close(fd); } + this.deinit(); + return .{ .err = err.withPath(file.pathlike.path.slice()) }; }, - } - } + }; - const stat: std.os.Stat = switch (Syscall.fstat(fd)) { - .result => |result| result, - .err => |err| { + if (std.os.S.ISDIR(stat.mode)) { + const err = Syscall.Error.fromCode(.ISDIR, .fstat); if (auto_close) { _ = Syscall.close(fd); } this.deinit(); - return .{ .err = err.withPath(file.pathlike.path.slice()) }; - }, - }; - - if (std.os.S.ISDIR(stat.mode)) { - const err = Syscall.Error.fromCode(.ISDIR, .fstat); - if (auto_close) { - _ = Syscall.close(fd); + return .{ .err = err }; } - this.deinit(); - return .{ .err = err }; - } - if (std.os.S.ISSOCK(stat.mode)) { - const err = Syscall.Error.fromCode(.INVAL, .fstat); + if (std.os.S.ISSOCK(stat.mode)) { + const err = Syscall.Error.fromCode(.INVAL, .fstat); - if (auto_close) { - _ = Syscall.close(fd); + if (auto_close) { + _ = Syscall.close(fd); + } + this.deinit(); + return .{ .err = err }; } - this.deinit(); - return .{ .err = err }; - } - file.seekable = std.os.S.ISREG(stat.mode); - file.mode = @intCast(JSC.Node.Mode, stat.mode); - this.mode = file.mode; + file.seekable = std.os.S.ISREG(stat.mode); + file.mode = @intCast(JSC.Node.Mode, stat.mode); + this.mode = file.mode; - if (file.seekable orelse false) - file.max_size = @intCast(Blob.SizeType, stat.size); + if (file.seekable orelse false) + file.max_size = @intCast(Blob.SizeType, stat.size); + } if ((file.seekable orelse false) and file.max_size == 0) { if (auto_close) { @@ -3482,6 +3509,7 @@ pub const FileBlobLoader = struct { std.debug.assert(read_buf.len > 0); if (this.fd == std.math.maxInt(JSC.Node.FileDescriptor)) { + std.debug.assert(this.poll_ref == null); return .{ .done = {} }; } @@ -3491,7 +3519,7 @@ pub const FileBlobLoader = struct { // if it's a pipe, we really don't know what to expect what the max size will be // if the pipe is sending us WAY bigger data than what we can fit in the buffer // we allocate a new buffer of up to 4 MB - if (std.os.S.ISFIFO(this.mode) and view != .zero) { + if (this.isFIFO() and view != .zero) { outer: { var len: c_int = available_to_read orelse 0; @@ -3519,6 +3547,14 @@ pub const FileBlobLoader = struct { const F_SETPIPE_SZ = 1031; const F_GETPIPE_SZ = 1032; + if (len > 0) { + if (this.poll_ref) |poll| { + poll.flags.insert(.readable); + } + } else if (this.poll_ref) |poll| { + poll.flags.remove(.readable); + } + if (!this.has_adjusted_pipe_size_on_linux) { if (len + 1024 > 16 * std.mem.page_size) { this.has_adjusted_pipe_size_on_linux = true; @@ -3552,6 +3588,31 @@ pub const FileBlobLoader = struct { } } + if (this.poll_ref) |poll| { + const is_readable = poll.isReadable(); + if (!is_readable and poll.isEOF()) { + if (poll.isHUP()) { + this.finalize(); + } + + return .{ .done = {} }; + } else if (!is_readable and poll.isHUP()) { + this.finalize(); + return .{ .done = {} }; + } else if (!is_readable and poll.isRegistered()) { + if (view != .zero) { + this.view.set(this.globalThis(), view); + this.buf = read_buf; + if (!this.isWatching()) + this.watch(this.fd); + } + + return .{ + .pending = &this.pending, + }; + } + } + const rc = Syscall.read(this.fd, buf_to_use); switch (rc) { @@ -3563,7 +3624,7 @@ pub const FileBlobLoader = struct { // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already // sent a 0 // Let's retry later. - if (std.os.S.ISFIFO(this.mode) and + if (this.isFIFO() and !this.close_on_eof and _errno == .PERM) { break :brk .AGAIN; @@ -3587,7 +3648,8 @@ pub const FileBlobLoader = struct { if (view != .zero) { this.view.set(this.globalThis(), view); this.buf = read_buf; - this.watch(this.fd); + if (!this.isWatching()) + this.watch(this.fd); } return .{ @@ -3605,6 +3667,15 @@ pub const FileBlobLoader = struct { return .{ .err = sys }; }, .result => |result| { + if (this.poll_ref) |poll| { + if (this.isFIFO()) { + if (result < buf_to_use.len) { + // do not insert .eof here + poll.flags.remove(.readable); + } + } + } + if (result == 0 and free_buffer_on_error) { bun.default_allocator.free(buf_to_use); buf_to_use = read_buf; @@ -3614,10 +3685,13 @@ pub const FileBlobLoader = struct { return this.handleReadChunk(result, view, true, buf_to_use); } - if (result == 0 and !this.finished and !this.close_on_eof and std.os.S.ISFIFO(this.mode)) { + if (result == 0 and !this.finished and !this.close_on_eof and this.isFIFO()) { this.view.set(this.globalThis(), view); this.buf = read_buf; - this.watch(this.fd); + if (!this.isWatching()) + this.watch(this.fd); + this.poll_ref.?.flags.remove(.readable); + return .{ .pending = &this.pending, }; @@ -3628,6 +3702,14 @@ pub const FileBlobLoader = struct { } } + pub fn isFIFO(this: *const FileBlobLoader) bool { + if (this.poll_ref) |poll| { + return poll.flags.contains(.fifo) or poll.flags.contains(.tty); + } + + return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode); + } + /// Called from Poller pub fn ready(this: *FileBlobLoader, sizeOrOffset: i64) void { std.debug.assert(this.started); @@ -3637,13 +3719,13 @@ pub const FileBlobLoader = struct { var available_to_read: usize = std.math.maxInt(usize); if (comptime Environment.isMac) { - if (std.os.S.ISREG(this.mode)) { + if (this.isFIFO()) { + available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0)); + } else if (std.os.S.ISREG(this.mode)) { // Returns when the file pointer is not at the end of // file. data contains the offset from current position // to end of file, and may be negative. available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0)); - } else if (std.os.S.ISCHR(this.mode) or std.os.S.ISFIFO(this.mode)) { - available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0)); } } if (this.finalized and this.scheduled_count == 0) { @@ -3657,8 +3739,19 @@ pub const FileBlobLoader = struct { this.deinit(); return; } - if (this.cancelled) + + // If we do nothing here, stop watching the file descriptor + var unschedule = this.poll_ref != null; + defer { + if (unschedule) { + if (this.poll_ref) |ref| { + _ = ref.unregister(this.globalThis().bunVM().uws_event_loop.?); + } + } + } + if (this.cancelled) { return; + } if (this.buf.len == 0) { return; @@ -3674,6 +3767,7 @@ pub const FileBlobLoader = struct { else @truncate(c_int, @intCast(isize, available_to_read)), ); + unschedule = false; this.pending.run(); } @@ -3685,12 +3779,16 @@ pub const FileBlobLoader = struct { if (this.buffered_data.cap > 0) { this.buffered_data.listManaged(bun.default_allocator).deinit(); + this.buffered_data.cap = 0; } this.finished = true; - if (this.poll_ref.isActive()) - this.unwatch(this.fd); + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } + this.finalized = true; this.pending.result = .{ .done = {} }; @@ -3725,7 +3823,7 @@ pub const FileBlobLoader = struct { pub fn NewReadyWatcher( comptime Context: type, - comptime flag_: JSC.Poller.Flag, + comptime flag_: JSC.FilePoll.Flags, comptime onReady: anytype, ) type { return struct { @@ -3739,13 +3837,51 @@ pub fn NewReadyWatcher( } pub fn unwatch(this: *Context, fd: JSC.Node.FileDescriptor) void { - std.debug.assert(fd != std.math.maxInt(JSC.Node.FileDescriptor)); - _ = JSC.VirtualMachine.vm.poller.unwatch(fd, flag, Context, this); + std.debug.assert(@intCast(JSC.Node.FileDescriptor, this.poll_ref.?.fd) == fd); + std.debug.assert( + this.poll_ref.?.unregister(JSC.VirtualMachine.vm.uws_event_loop.?) == .result, + ); + } + + pub fn pollRef(this: *Context) *JSC.FilePoll { + return this.poll_ref orelse brk: { + this.poll_ref = JSC.FilePoll.init( + JSC.VirtualMachine.vm, + this.fd, + .{}, + Context, + this, + ); + break :brk this.poll_ref.?; + }; + } + + pub fn isWatching(this: *Context) bool { + if (this.poll_ref) |poll| { + return poll.flags.contains(flag.poll()); + } + + return false; } pub fn watch(this: *Context, fd: JSC.Node.FileDescriptor) void { - std.debug.assert(fd != std.math.maxInt(JSC.Node.FileDescriptor)); - _ = JSC.VirtualMachine.vm.poller.watch(fd, flag, Context, this); + var poll_ref: *JSC.FilePoll = this.poll_ref orelse brk: { + this.poll_ref = JSC.FilePoll.init( + JSC.VirtualMachine.vm, + fd, + .{}, + Context, + this, + ); + break :brk this.poll_ref.?; + }; + std.debug.assert(poll_ref.fd == fd); + switch (poll_ref.register(JSC.VirtualMachine.vm.uws_event_loop.?, flag, true)) { + .err => |err| { + bun.unreachablePanic("FilePoll.register failed: {d}", .{err.errno}); + }, + .result => {}, + } } }; } diff --git a/src/global.zig b/src/global.zig index fe4f5246a..7a9ef2657 100644 --- a/src/global.zig +++ b/src/global.zig @@ -378,3 +378,5 @@ pub const Bunfig = @import("./bunfig.zig").Bunfig; pub const HTTPThead = @import("./http_client_async.zig").HTTPThread; pub const Analytics = @import("./analytics/analytics_thread.zig"); + +pub usingnamespace @import("./tagged_pointer.zig"); diff --git a/src/hive_array.zig b/src/hive_array.zig index eb9220e19..39f10d324 100644 --- a/src/hive_array.zig +++ b/src/hive_array.zig @@ -64,6 +64,34 @@ pub fn HiveArray(comptime T: type, comptime capacity: u16) type { self.available.set(index); return true; } + + pub const Fallback = struct { + hive: HiveArray(T, capacity), + allocator: std.mem.Allocator, + + pub const This = @This(); + + pub fn init(allocator: std.mem.Allocator) This { + return .{ + .allocator = allocator, + .hive = HiveArray(T, capacity).init(), + }; + } + + pub fn get(self: *This) *T { + if (self.hive.get()) |value| { + return value; + } + + return self.allocator.create(T) catch unreachable; + } + + pub fn put(self: *This, value: *T) void { + if (self.hive.put(value)) return; + + self.allocator.destroy(value); + } + }; }; } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 114181c60..495a3889f 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -190,6 +190,11 @@ fn NewHTTPContext(comptime ssl: bool) type { socket, ); } else { + // trailing zero is fine to ignore + if (strings.eqlComptime(buf, "0\r\n")) { + return; + } + log("Unexpected data on socket", .{}); } } diff --git a/src/jsc.zig b/src/jsc.zig index 07f5d4d26..bd450b2a2 100644 --- a/src/jsc.zig +++ b/src/jsc.zig @@ -46,6 +46,7 @@ pub const Node = struct { pub usingnamespace @import("./bun.js/node/node_os.zig"); pub const Syscall = @import("./bun.js/node/syscall.zig"); pub const fs = @import("./bun.js/node/node_fs_constant.zig"); + pub const invalid_fd = std.math.maxInt(i32); }; pub const Maybe = Node.Maybe; pub const jsNumber = @This().JSValue.jsNumber; diff --git a/src/report.zig b/src/report.zig index 8037c2899..b1ad68594 100644 --- a/src/report.zig +++ b/src/report.zig @@ -202,7 +202,7 @@ pub fn fatal(err_: ?anyerror, msg_: ?string) void { ); } else { crash_report_writer.print( - "\n<r>an uh-oh: {s}\n\n", + "\n<r>Panic: {s}\n\n", .{msg[0..len]}, ); } diff --git a/src/tagged_pointer.zig b/src/tagged_pointer.zig index dd6215439..564569c7e 100644 --- a/src/tagged_pointer.zig +++ b/src/tagged_pointer.zig @@ -129,6 +129,10 @@ pub fn TaggedPointerUnion(comptime Types: anytype) type { return this.repr.data == comptime @enumToInt(@field(Tag, typeBaseName(@typeName(Type)))); } + pub fn set(this: *@This(), _ptr: anytype) void { + this.* = @This().init(_ptr); + } + pub inline fn isValidPtr(_ptr: ?*anyopaque) bool { return This.isValid(This.from(_ptr)); } diff --git a/test/bun.js/child_process-node.test.js b/test/bun.js/child_process-node.test.js index 17b2c4cfb..0f04bb2cd 100644 --- a/test/bun.js/child_process-node.test.js +++ b/test/bun.js/child_process-node.test.js @@ -8,12 +8,11 @@ import { createCallCheckCtx, createDoneDotAll, } from "node-test-helpers"; +import { tmpdir } from "node:os"; const debug = process.env.DEBUG ? console.log : () => {}; -const platformTmpDir = `${process.platform === "darwin" ? "/private" : ""}${ - process.env.TMPDIR -}`.slice(0, -1); // remove trailing slash +const platformTmpDir = tmpdir(); // Copyright Joyent, Inc. and other Node contributors. // @@ -131,23 +130,29 @@ describe("ChildProcess.spawn()", () => { }); describe("ChildProcess.spawn", () => { - const child = new ChildProcess(); - child.spawn({ - file: "bun", - // file: process.execPath, - args: ["--interactive"], - cwd: process.cwd(), - stdio: "pipe", - }); + function getChild() { + const child = new ChildProcess(); + child.spawn({ + file: "node", + // file: process.execPath, + args: ["--interactive"], + cwd: process.cwd(), + stdio: ["ignore", "ignore", "ignore", "ipc"], + }); + return child; + } it("should spawn a process", () => { + const child = getChild(); // Test that we can call spawn strictEqual(Object.hasOwn(child, "pid"), true); assert(Number.isInteger(child.pid)); + child.kill(); }); it("should throw error on invalid signal", () => { + const child = getChild(); // Try killing with invalid signal throws( () => { @@ -158,6 +163,7 @@ describe("ChildProcess.spawn", () => { }); it("should die when killed", () => { + const child = getChild(); strictEqual(child.kill(), true); }); }); @@ -250,6 +256,7 @@ describe("child_process cwd", () => { let data = ""; child.stdout.on("data", (chunk) => { data += chunk; + console.trace("here"); }); // TODO: Test exit events @@ -315,7 +322,7 @@ describe("child_process cwd", () => { // }); it("should work for valid given cwd", (done) => { - const tmpdir = { path: Bun.env.TMPDIR }; + const tmpdir = { path: platformTmpDir }; const createDone = createDoneDotAll(done); // Assume these exist, and 'pwd' gives us the right directory back @@ -373,115 +380,115 @@ describe("child_process default options", () => { // because the process can exit before the stream is closed and the data is read child.stdout.on("close", () => { assertOk( - response.includes(`TMPDIR=${process.env.TMPDIR}`), + response.includes(`TMPDIR=${platformTmpDir}`), "spawn did not use process.env as default " + - `(process.env.TMPDIR = ${process.env.TMPDIR})`, + `(process.env.TMPDIR=${platformTmpDir})`, ); done(); }); }); }); -describe("child_process double pipe", () => { - it("should allow two pipes to be used at once", (done) => { - const { mustCallAtLeast, mustCall } = createCallCheckCtx(done); - let grep, sed, echo; - grep = spawn("grep", ["o"]); - sed = spawn("sed", ["s/o/O/"]); - echo = spawn("echo", ["hello\nnode\nand\nworld\n"]); - - // pipe echo | grep - echo.stdout.on( - "data", - mustCallAtLeast((data) => { - debug(`grep stdin write ${data.length}`); - if (!grep.stdin.write(data)) { - echo.stdout.pause(); - } - }), - ); - - // TODO(Derrick): We don't implement the full API for this yet, - // So stdin has no 'drain' event. - // // TODO(@jasnell): This does not appear to ever be - // // emitted. It's not clear if it is necessary. - // grep.stdin.on("drain", (data) => { - // echo.stdout.resume(); - // }); - - // Propagate end from echo to grep - echo.stdout.on( - "end", - mustCall(() => { - debug("echo stdout end"); - grep.stdin.end(); - }), - ); - - echo.on( - "exit", - mustCall(() => { - debug("echo exit"); - }), - ); - - grep.on( - "exit", - mustCall(() => { - debug("grep exit"); - }), - ); - - sed.on( - "exit", - mustCall(() => { - debug("sed exit"); - }), - ); - - // pipe grep | sed - grep.stdout.on( - "data", - mustCallAtLeast((data) => { - debug(`grep stdout ${data.length}`); - if (!sed.stdin.write(data)) { - grep.stdout.pause(); - } - }), - ); - - // // TODO(@jasnell): This does not appear to ever be - // // emitted. It's not clear if it is necessary. - // sed.stdin.on("drain", (data) => { - // grep.stdout.resume(); - // }); - - // Propagate end from grep to sed - grep.stdout.on( - "end", - mustCall((code) => { - debug("grep stdout end"); - sed.stdin.end(); - }), - ); - - let result = ""; - - // print sed's output - sed.stdout.on( - "data", - mustCallAtLeast((data) => { - result += data.toString("utf8"); - debug(data); - }), - ); - - sed.stdout.on( - "end", - mustCall(() => { - debug("result: " + result); - strictEqual(result, `hellO\nnOde\nwOrld\n`); - }), - ); - }); -}); +// describe("child_process double pipe", () => { +// it("should allow two pipes to be used at once", (done) => { +// const { mustCallAtLeast, mustCall } = createCallCheckCtx(done); +// let grep, sed, echo; +// grep = spawn("grep", ["o"]); +// sed = spawn("sed", ["s/o/O/"]); +// echo = spawn("echo", ["hello\nnode\nand\nworld\n"]); + +// // pipe echo | grep +// echo.stdout.on( +// "data", +// mustCallAtLeast((data) => { +// debug(`grep stdin write ${data.length}`); +// if (!grep.stdin.write(data)) { +// echo.stdout.pause(); +// } +// }), +// ); + +// // TODO(Derrick): We don't implement the full API for this yet, +// // So stdin has no 'drain' event. +// // // TODO(@jasnell): This does not appear to ever be +// // // emitted. It's not clear if it is necessary. +// // grep.stdin.on("drain", (data) => { +// // echo.stdout.resume(); +// // }); + +// // Propagate end from echo to grep +// echo.stdout.on( +// "end", +// mustCall(() => { +// debug("echo stdout end"); +// grep.stdin.end(); +// }), +// ); + +// echo.on( +// "exit", +// mustCall(() => { +// debug("echo exit"); +// }), +// ); + +// grep.on( +// "exit", +// mustCall(() => { +// debug("grep exit"); +// }), +// ); + +// sed.on( +// "exit", +// mustCall(() => { +// debug("sed exit"); +// }), +// ); + +// // pipe grep | sed +// grep.stdout.on( +// "data", +// mustCallAtLeast((data) => { +// debug(`grep stdout ${data.length}`); +// if (!sed.stdin.write(data)) { +// grep.stdout.pause(); +// } +// }), +// ); + +// // // TODO(@jasnell): This does not appear to ever be +// // // emitted. It's not clear if it is necessary. +// // sed.stdin.on("drain", (data) => { +// // grep.stdout.resume(); +// // }); + +// // Propagate end from grep to sed +// grep.stdout.on( +// "end", +// mustCall((code) => { +// debug("grep stdout end"); +// sed.stdin.end(); +// }), +// ); + +// let result = ""; + +// // print sed's output +// sed.stdout.on( +// "data", +// mustCallAtLeast((data) => { +// result += data.toString("utf8"); +// debug(data); +// }), +// ); + +// sed.stdout.on( +// "end", +// mustCall(() => { +// debug("result: " + result); +// strictEqual(result, `hellO\nnOde\nwOrld\n`); +// }), +// ); +// }); +// }); diff --git a/test/bun.js/child_process.test.ts b/test/bun.js/child_process.test.ts index f39629169..c862ff1b4 100644 --- a/test/bun.js/child_process.test.ts +++ b/test/bun.js/child_process.test.ts @@ -9,12 +9,11 @@ import { execFileSync, execSync, } from "node:child_process"; +import { tmpdir } from "node:os"; const debug = process.env.DEBUG ? console.log : () => {}; -const platformTmpDir = `${process.platform === "darwin" ? "/private" : ""}${ - process.env.TMPDIR -}`.slice(0, -1); // remove trailing slash +const platformTmpDir = tmpdir(); // Semver regex: https://gist.github.com/jhorsman/62eeea161a13b80e39f5249281e17c39?permalink_comment_id=2896416#gistcomment-2896416 // Not 100% accurate, but good enough for this test @@ -122,7 +121,7 @@ describe("spawn()", () => { }); it("should allow us to set cwd", async () => { - const child = spawn("pwd", { cwd: process.env.TMPDIR }); + const child = spawn("pwd", { cwd: platformTmpDir }); const result: string = await new Promise((resolve) => { child.stdout.on("data", (data) => { resolve(data.toString()); @@ -261,10 +260,14 @@ describe("execFileSync()", () => { }); it("should allow us to pass input to the command", () => { - const result = execFileSync("node", ["spawned-child.js", "STDIN"], { - input: "hello world!", - encoding: "utf8", - }); + const result = execFileSync( + "node", + [import.meta.dir + "/spawned-child.js", "STDIN"], + { + input: "hello world!", + encoding: "utf8", + }, + ); expect(result.trim()).toBe("hello world!"); }); }); @@ -278,11 +281,17 @@ describe("execSync()", () => { describe("Bun.spawn()", () => { it("should return exit code 0 on successful execution", async () => { + const proc = Bun.spawn({ + cmd: ["echo", "hello"], + stdout: "pipe", + }); + + for await (const chunk of proc.stdout!) { + const text = new TextDecoder().decode(chunk); + expect(text.trim()).toBe("hello"); + } + const result = await new Promise((resolve) => { - const proc = Bun.spawn({ - cmd: ["echo", "hello"], - stdout: "inherit", - }); const maybeExited = Bun.peek(proc.exited); if (maybeExited === proc.exited) { proc.exited.then((code) => resolve(code)); diff --git a/test/bun.js/filesink.test.ts b/test/bun.js/filesink.test.ts index b3b3a7e74..b4a178613 100644 --- a/test/bun.js/filesink.test.ts +++ b/test/bun.js/filesink.test.ts @@ -1,5 +1,6 @@ import { ArrayBufferSink } from "bun"; import { describe, expect, it } from "bun:test"; +import { mkfifo } from "mkfifo"; describe("FileSink", () => { const fixtures = [ @@ -66,64 +67,100 @@ describe("FileSink", () => { ], ] as const; - for (const [input, expected, label] of fixtures) { - it(`${JSON.stringify(label)}`, async () => { - const path = `/tmp/bun-test-${Bun.hash(label).toString(10)}.txt`; - try { - require("fs").unlinkSync(path); - } catch (e) {} + function getPath(label) { + const path = `/tmp/bun-test-${Bun.hash(label).toString(10)}.txt`; + try { + require("fs").unlinkSync(path); + } catch (e) {} + return path; + } - const sink = Bun.file(path).writer(); - for (let i = 0; i < input.length; i++) { - sink.write(input[i]); - } - await sink.end(); + var activeFIFO: Promise<string>; + var decoder = new TextDecoder(); - const output = new Uint8Array(await Bun.file(path).arrayBuffer()); - for (let i = 0; i < expected.length; i++) { - expect(output[i]).toBe(expected[i]); + function getFd(label) { + const path = `/tmp/bun-test-${Bun.hash(label).toString(10)}.txt`; + try { + require("fs").unlinkSync(path); + } catch (e) {} + mkfifo(path, 0o666); + activeFIFO = (async function (stream: ReadableStream<Uint8Array>) { + var chunks = []; + for await (const chunk of stream) { + chunks.push(chunk); } - expect(output.byteLength).toBe(expected.byteLength); - }); + return Buffer.concat(chunks).toString(); + // test it on a small chunk size + })(Bun.file(path).stream(4)); + return path; + } - it(`flushing -> ${JSON.stringify(label)}`, async () => { - const path = `/tmp/bun-test-${Bun.hash(label).toString(10)}.txt`; - try { - require("fs").unlinkSync(path); - } catch (e) {} + for (let isPipe of [true, false] as const) { + describe(isPipe ? "pipe" : "file", () => { + for (const [input, expected, label] of fixtures) { + var getPathOrFd = () => (isPipe ? getFd(label) : getPath(label)); - const sink = Bun.file(path).writer(); - for (let i = 0; i < input.length; i++) { - sink.write(input[i]); - await sink.flush(); - } - await sink.end(); + it(`${JSON.stringify(label)}`, async () => { + const path = getPathOrFd(); + const sink = Bun.file(path).writer(); + for (let i = 0; i < input.length; i++) { + sink.write(input[i]); + } + await sink.end(); - const output = new Uint8Array(await Bun.file(path).arrayBuffer()); - for (let i = 0; i < expected.length; i++) { - expect(output[i]).toBe(expected[i]); - } - expect(output.byteLength).toBe(expected.byteLength); - }); + if (!isPipe) { + const output = new Uint8Array(await Bun.file(path).arrayBuffer()); + for (let i = 0; i < expected.length; i++) { + expect(output[i]).toBe(expected[i]); + } + expect(output.byteLength).toBe(expected.byteLength); + } else { + const output = await activeFIFO; + expect(output).toBe(decoder.decode(expected)); + } + }); - it(`highWaterMark -> ${JSON.stringify(label)}`, async () => { - const path = `/tmp/bun-test-${Bun.hash(label).toString(10)}.txt`; - try { - require("fs").unlinkSync(path); - } catch (e) {} + it(`flushing -> ${JSON.stringify(label)}`, async () => { + const path = getPathOrFd(); + const sink = Bun.file(path).writer(); + for (let i = 0; i < input.length; i++) { + sink.write(input[i]); + await sink.flush(); + } + await sink.end(); + if (!isPipe) { + const output = new Uint8Array(await Bun.file(path).arrayBuffer()); + for (let i = 0; i < expected.length; i++) { + expect(output[i]).toBe(expected[i]); + } + expect(output.byteLength).toBe(expected.byteLength); + } else { + const output = await activeFIFO; + expect(output).toBe(decoder.decode(expected)); + } + }); - const sink = Bun.file(path).writer({ highWaterMark: 1 }); - for (let i = 0; i < input.length; i++) { - sink.write(input[i]); - await sink.flush(); - } - await sink.end(); + it(`highWaterMark -> ${JSON.stringify(label)}`, async () => { + const path = getPathOrFd(); + const sink = Bun.file(path).writer({ highWaterMark: 1 }); + for (let i = 0; i < input.length; i++) { + sink.write(input[i]); + await sink.flush(); + } + await sink.end(); - const output = new Uint8Array(await Bun.file(path).arrayBuffer()); - for (let i = 0; i < expected.length; i++) { - expect(output[i]).toBe(expected[i]); + if (!isPipe) { + const output = new Uint8Array(await Bun.file(path).arrayBuffer()); + for (let i = 0; i < expected.length; i++) { + expect(output[i]).toBe(expected[i]); + } + expect(output.byteLength).toBe(expected.byteLength); + } else { + const output = await activeFIFO; + expect(output).toBe(decoder.decode(expected)); + } + }); } - expect(output.byteLength).toBe(expected.byteLength); }); } }); diff --git a/test/bun.js/streams.test.js b/test/bun.js/streams.test.js index c6d69ab08..75ac964ca 100644 --- a/test/bun.js/streams.test.js +++ b/test/bun.js/streams.test.js @@ -218,7 +218,8 @@ it("Bun.file() read text from pipe", async () => { mkfifo("/tmp/fifo", 0o666); - const large = "HELLO!".repeat((((1024 * 512) / "HELLO!".length) | 0) + 1); + // 65k so its less than the max on linux + const large = "HELLO!".repeat((((1024 * 65) / "HELLO!".length) | 0) + 1); const chunks = []; var out = Bun.file("/tmp/fifo").stream(); |