diff options
Diffstat (limited to '')
-rw-r--r-- | src/bun.js/base.zig | 546 |
1 files changed, 544 insertions, 2 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 225536591..37eb3e626 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -3935,7 +3935,7 @@ pub const PollRef = struct { this.status = .done; } - /// Only intended to be used from EventLoop.Poller + /// Only intended to be used from EventLoop.Pollable pub fn deactivate(this: *PollRef, loop: *uws.Loop) void { if (this.status != .active) return; @@ -3945,7 +3945,7 @@ pub const PollRef = struct { loop.active -= 1; } - /// Only intended to be used from EventLoop.Poller + /// Only intended to be used from EventLoop.Pollable pub fn activate(this: *PollRef, loop: *uws.Loop) void { if (this.status != .inactive) return; @@ -3980,6 +3980,548 @@ pub const PollRef = struct { } }; +pub const FilePoll = struct { + fd: u32 = invalid_fd, + flags: Flags.Set = Flags.Set{}, + owner: Owner = Deactivated.owner, + + const FileBlobLoader = JSC.WebCore.FileBlobLoader; + const FileSink = JSC.WebCore.FileSink; + const Subprocess = JSC.Subprocess; + const BufferedInput = Subprocess.BufferedInput; + const BufferedOutput = Subprocess.BufferedOutput; + const Deactivated = opaque { + pub var owner = Owner.init(@intToPtr(*Deactivated, @as(usize, 0xDEADBEEF))); + }; + + pub const Owner = bun.TaggedPointerUnion(.{ + FileBlobLoader, + FileSink, + Subprocess, + BufferedInput, + BufferedOutput, + Deactivated, + }); + + fn updateFlags(poll: *FilePoll, updated: Flags.Set) void { + var flags = poll.flags; + flags.remove(.readable); + flags.remove(.writable); + flags.remove(.process); + flags.remove(.eof); + + flags.setUnion(updated); + poll.flags = flags; + } + + pub fn onKQueueEvent(poll: *FilePoll, loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void { + poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*)); + poll.onUpdate(loop); + } + + pub fn onEpollEvent(poll: *FilePoll, loop: *uws.Loop, epoll_event: *std.os.linux.epoll_event) void { + poll.updateFlags(Flags.fromEpollEvent(epoll_event.*)); + poll.onUpdate(loop); + } + + pub fn clearEvent(poll: *FilePoll, flag: Flags) void { + poll.flags.remove(flag); + } + + pub fn isReadable(this: *FilePoll) bool { + const readable = this.flags.contains(.readable); + this.flags.remove(.readable); + return readable; + } + + pub fn isHUP(this: *FilePoll) bool { + const readable = this.flags.contains(.hup); + this.flags.remove(.hup); + return readable; + } + + pub fn isEOF(this: *FilePoll) bool { + const readable = this.flags.contains(.eof); + this.flags.remove(.eof); + return readable; + } + + pub fn isWritable(this: *FilePoll) bool { + const readable = this.flags.contains(.writable); + this.flags.remove(.writable); + return readable; + } + + pub fn deinit(this: *FilePoll) void { + var vm = JSC.VirtualMachine.vm; + this.deinitWithVM(vm); + } + + pub fn deinitWithVM(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (this.isRegistered()) { + _ = this.unregister(vm.uws_event_loop.?); + } + + this.owner = Deactivated.owner; + this.flags = Flags.Set{}; + this.fd = invalid_fd; + vm.rareData().filePolls(vm).put(this); + } + + pub fn isRegistered(this: *const FilePoll) bool { + return this.flags.contains(.poll_writable) or this.flags.contains(.poll_readable) or this.flags.contains(.poll_process); + } + + pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop) void { + if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) { + if (poll.flags.contains(.has_incremented_poll_count)) poll.deactivate(loop); + poll.flags.insert(.needs_rearm); + } + var ptr = poll.owner; + switch (ptr.tag()) { + @field(Owner.Tag, "FileBlobLoader") => { + log("onUpdate: FileBlobLoader", .{}); + ptr.as(FileBlobLoader).onPoll(0, 0); + }, + @field(Owner.Tag, "Subprocess") => { + log("onUpdate: Subprocess", .{}); + var loader = ptr.as(JSC.Subprocess); + + loader.onExitNotification(); + }, + @field(Owner.Tag, "FileSink") => { + log("onUpdate: FileSink", .{}); + var loader = ptr.as(JSC.WebCore.FileSink); + loader.onPoll(0, 0); + }, + + @field(Owner.Tag, "BufferedInput") => { + log("onUpdate: BufferedInput", .{}); + var loader = ptr.as(JSC.Subprocess.BufferedInput); + loader.onReady(0); + }, + @field(Owner.Tag, "BufferedOutput") => { + log("onUpdate: BufferedOutput", .{}); + var loader = ptr.as(JSC.Subprocess.BufferedOutput); + loader.ready(0); + }, + else => {}, + } + } + + pub const Flags = enum { + // What are we asking the event loop about? + + /// Poll for readable events + poll_readable, + + /// Poll for writable events + poll_writable, + + /// Poll for process-related events + poll_process, + + // What did the event loop tell us? + readable, + writable, + process, + eof, + hup, + + // What is the type of file descriptor? + fifo, + tty, + + one_shot, + needs_rearm, + + has_incremented_poll_count, + + disable, + + pub fn poll(this: Flags) Flags { + return switch (this) { + .readable => .poll_readable, + .writable => .poll_writable, + .process => .poll_process, + else => this, + }; + } + + pub const Set = std.EnumSet(Flags); + pub const Struct = std.enums.EnumFieldStruct(Flags, bool, false); + + pub fn fromKQueueEvent(kqueue_event: std.os.system.kevent64_s) Flags.Set { + var flags = Flags.Set{}; + if (kqueue_event.filter == std.os.system.EVFILT_READ) { + flags.insert(Flags.readable); + if (kqueue_event.flags & std.os.system.EV_EOF != 0) { + flags.insert(Flags.eof); + } + } else if (kqueue_event.filter == std.os.system.EVFILT_WRITE) { + flags.insert(Flags.writable); + if (kqueue_event.flags & std.os.system.EV_EOF != 0) { + flags.insert(Flags.hup); + } + } else if (kqueue_event.filter == std.os.system.EVFILT_PROC) { + flags.insert(Flags.process); + } + return flags; + } + + pub fn fromEpollEvent(epoll: std.os.linux.epoll_event) Flags.Set { + var flags = Flags.Set{}; + if (epoll.events & std.os.linux.EPOLL.IN != 0) { + flags.insert(Flags.readable); + log("readable", .{}); + } + if (epoll.events & std.os.linux.EPOLL.OUT != 0) { + flags.insert(Flags.writable); + log("writable", .{}); + } + if (epoll.events & std.os.linux.EPOLL.ERR != 0) { + flags.insert(Flags.eof); + log("eof", .{}); + } + if (epoll.events & std.os.linux.EPOLL.HUP != 0) { + flags.insert(Flags.hup); + log("hup", .{}); + } + return flags; + } + }; + + pub const HiveArray = bun.HiveArray(FilePoll, 128).Fallback; + + const log = Output.scoped(.FilePoll, false); + + pub inline fn isActive(this: *const FilePoll) bool { + return this.flags.contains(.has_incremented_poll_count) and !this.flags.contains(.disable); + } + + /// Make calling ref() on this poll into a no-op. + pub fn disable(this: *FilePoll) void { + if (this.isRegistered()) { + this.unregister(JSC.VirtualMachine.vm.uws_event_loop.?); + } + + this.unref(); + this.flags.insert(.disable); + } + + /// Only intended to be used from EventLoop.Pollable + pub fn deactivate(this: *FilePoll, loop: *uws.Loop) void { + std.debug.assert(this.flags.contains(.has_incremented_poll_count)); + this.flags.remove(.has_incremented_poll_count); + loop.num_polls -= 1; + loop.active -= 1; + } + + /// Only intended to be used from EventLoop.Pollable + pub fn activate(this: *FilePoll, loop: *uws.Loop) void { + std.debug.assert(!this.flags.contains(.has_incremented_poll_count)); + std.debug.assert(!this.flags.contains(.disable)); + this.flags.insert(.has_incremented_poll_count); + loop.num_polls += 1; + loop.active += 1; + } + + pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll { + return initWithOwner(vm, fd, flags, Owner.init(owner)); + } + + pub fn initWithOwner(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, owner: Owner) *FilePoll { + var poll = vm.rareData().filePolls(vm).get(); + poll.* = .{ + .fd = @intCast(u32, fd), + .flags = Flags.Set.init(flags), + .owner = owner, + }; + return poll; + } + + /// Prevent a poll from keeping the process alive. + pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (!this.isActive()) + return; + log("unref", .{}); + this.deactivate(vm.uws_event_loop.?); + } + + /// Allow a poll to keep the process alive. + pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (this.isActive()) + return; + log("ref", .{}); + this.activate(vm.uws_event_loop.?); + } + + pub fn onTick(loop: *uws.Loop, tagged_pointer: ?*anyopaque) callconv(.C) void { + var tag = Pollable.from(tagged_pointer); + + if (tag.tag() != @field(Pollable.Tag, "FilePoll")) + return; + + var file_poll = tag.as(FilePoll); + if (comptime Environment.isMac) + onKQueueEvent(file_poll, loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]) + else if (comptime Environment.isLinux) + onEpollEvent(file_poll, loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]); + } + + const Pollable = bun.TaggedPointerUnion( + .{ + FilePoll, + Deactivated, + }, + ); + + comptime { + @export(onTick, .{ .name = "Bun__internal_dispatch_ready_poll" }); + } + + const timeout = std.mem.zeroes(std.os.timespec); + const kevent = std.c.kevent; + const linux = std.os.linux; + pub fn register(this: *FilePoll, loop: *uws.Loop, flag: Flags, one_shot: bool) JSC.Maybe(void) { + const watcher_fd = loop.fd; + const fd = this.fd; + + log("register: {s} ({d})", .{ @tagName(flag), fd }); + + if (one_shot) { + this.flags.insert(.one_shot); + } + + std.debug.assert(this.fd != invalid_fd); + + if (comptime Environment.isLinux) { + const flags: u32 = switch (flag) { + .process, + .readable, + => linux.EPOLL.IN | linux.EPOLL.HUP | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT), + .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT), + else => unreachable, + }; + + var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @ptrToInt(Pollable.init(this).ptr()) } }; + + const ctl = linux.epoll_ctl( + watcher_fd, + if (this.isRegistered() or this.flags.contains(.needs_rearm)) linux.EPOLL.CTL_MOD else linux.EPOLL.CTL_ADD, + @intCast(std.os.fd_t, fd), + &event, + ); + + if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { + return errno; + } + } else if (comptime Environment.isMac) { + var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + changelist[0] = switch (flag) { + .readable => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_READ, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_ADD | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .writable => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_WRITE, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_ADD | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .process => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_PROC, + .data = 0, + .fflags = std.c.NOTE_EXIT, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_ADD, + .ext = .{ 0, 0 }, + }, + }; + + // output events only include change errors + const KEVENT_FLAG_ERROR_EVENTS = 0x000002; + + // The kevent() system call returns the number of events placed in + // the eventlist, up to the value given by nevents. If the time + // limit expires, then kevent() returns 0. + const rc = rc: { + while (true) { + const rc = std.os.system.kevent64( + watcher_fd, + &changelist, + 1, + // The same array may be used for the changelist and eventlist. + &changelist, + 1, + KEVENT_FLAG_ERROR_EVENTS, + &timeout, + ); + + if (std.c.getErrno(rc) == .INTR) continue; + break :rc rc; + } + }; + + // If an error occurs while + // processing an element of the changelist and there is enough room + // in the eventlist, then the event will be placed in the eventlist + // with EV_ERROR set in flags and the system error in data. + if (changelist[0].flags == std.c.EV_ERROR) { + return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; + // Otherwise, -1 will be returned, and errno will be set to + // indicate the error condition. + } + + const errno = std.c.getErrno(rc); + + if (errno != .SUCCESS) { + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, + else => unreachable, + } + } + } else { + @compileError("TODO: Pollable"); + } + + if (!this.isActive()) this.activate(loop); + this.flags.insert(switch (flag) { + .process, .readable => .poll_readable, + .writable => .poll_writable, + else => unreachable, + }); + return JSC.Maybe(void).success; + } + + pub const invalid_fd = std.math.maxInt(u32); + + pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) { + if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) { + // no-op + return JSC.Maybe(void).success; + } + + const fd = this.fd; + std.debug.assert(fd != invalid_fd); + const watcher_fd = loop.fd; + const flag: Flags = brk: { + if (this.flags.contains(.poll_readable)) + break :brk .readable; + if (this.flags.contains(.poll_writable)) + break :brk .writable; + if (this.flags.contains(.poll_process)) + break :brk .process; + return JSC.Maybe(void).success; + }; + + log("unregister: {s} ({d})", .{ @tagName(flag), fd }); + + if (comptime Environment.isLinux) { + const ctl = linux.epoll_ctl( + watcher_fd, + linux.EPOLL.CTL_DEL, + @intCast(std.os.fd_t, fd), + null, + ); + + if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { + return errno; + } + } else if (comptime Environment.isMac) { + var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + + changelist[0] = switch (flag) { + .read => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_READ, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .write => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_WRITE, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .process => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_PROC, + .data = 0, + .fflags = std.c.NOTE_EXIT, + .udata = @ptrToInt(Pollable.init(this).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + else => unreachable, + }; + + // output events only include change errors + const KEVENT_FLAG_ERROR_EVENTS = 0x000002; + + // The kevent() system call returns the number of events placed in + // the eventlist, up to the value given by nevents. If the time + // limit expires, then kevent() returns 0. + const rc = std.os.system.kevent64( + watcher_fd, + &changelist, + 1, + // The same array may be used for the changelist and eventlist. + &changelist, + 1, + KEVENT_FLAG_ERROR_EVENTS, + &timeout, + ); + // If an error occurs while + // processing an element of the changelist and there is enough room + // in the eventlist, then the event will be placed in the eventlist + // with EV_ERROR set in flags and the system error in data. + if (changelist[0].flags == std.c.EV_ERROR) { + return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; + // Otherwise, -1 will be returned, and errno will be set to + // indicate the error condition. + } + + const errno = std.c.getErrno(rc); + + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, + else => unreachable, + } + } else { + @compileError("TODO: Pollable"); + } + + this.flags.remove(.needs_rearm); + this.flags.remove(.one_shot); + // we don't support both right now + std.debug.assert(!(this.flags.contains(.poll_readable) and this.flags.contains(.poll_writable))); + this.flags.remove(.poll_readable); + this.flags.remove(.poll_writable); + this.flags.remove(.poll_process); + + if (this.isActive()) + this.deactivate(loop); + + return JSC.Maybe(void).success; + } +}; + pub const Strong = extern struct { ref: ?*JSC.napi.Ref = null, |