aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/base.zig
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/bun.js/base.zig546
1 files changed, 544 insertions, 2 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index 225536591..37eb3e626 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -3935,7 +3935,7 @@ pub const PollRef = struct {
this.status = .done;
}
- /// Only intended to be used from EventLoop.Poller
+ /// Only intended to be used from EventLoop.Pollable
pub fn deactivate(this: *PollRef, loop: *uws.Loop) void {
if (this.status != .active)
return;
@@ -3945,7 +3945,7 @@ pub const PollRef = struct {
loop.active -= 1;
}
- /// Only intended to be used from EventLoop.Poller
+ /// Only intended to be used from EventLoop.Pollable
pub fn activate(this: *PollRef, loop: *uws.Loop) void {
if (this.status != .inactive)
return;
@@ -3980,6 +3980,548 @@ pub const PollRef = struct {
}
};
+pub const FilePoll = struct {
+ fd: u32 = invalid_fd,
+ flags: Flags.Set = Flags.Set{},
+ owner: Owner = Deactivated.owner,
+
+ const FileBlobLoader = JSC.WebCore.FileBlobLoader;
+ const FileSink = JSC.WebCore.FileSink;
+ const Subprocess = JSC.Subprocess;
+ const BufferedInput = Subprocess.BufferedInput;
+ const BufferedOutput = Subprocess.BufferedOutput;
+ const Deactivated = opaque {
+ pub var owner = Owner.init(@intToPtr(*Deactivated, @as(usize, 0xDEADBEEF)));
+ };
+
+ pub const Owner = bun.TaggedPointerUnion(.{
+ FileBlobLoader,
+ FileSink,
+ Subprocess,
+ BufferedInput,
+ BufferedOutput,
+ Deactivated,
+ });
+
+ fn updateFlags(poll: *FilePoll, updated: Flags.Set) void {
+ var flags = poll.flags;
+ flags.remove(.readable);
+ flags.remove(.writable);
+ flags.remove(.process);
+ flags.remove(.eof);
+
+ flags.setUnion(updated);
+ poll.flags = flags;
+ }
+
+ pub fn onKQueueEvent(poll: *FilePoll, loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void {
+ poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*));
+ poll.onUpdate(loop);
+ }
+
+ pub fn onEpollEvent(poll: *FilePoll, loop: *uws.Loop, epoll_event: *std.os.linux.epoll_event) void {
+ poll.updateFlags(Flags.fromEpollEvent(epoll_event.*));
+ poll.onUpdate(loop);
+ }
+
+ pub fn clearEvent(poll: *FilePoll, flag: Flags) void {
+ poll.flags.remove(flag);
+ }
+
+ pub fn isReadable(this: *FilePoll) bool {
+ const readable = this.flags.contains(.readable);
+ this.flags.remove(.readable);
+ return readable;
+ }
+
+ pub fn isHUP(this: *FilePoll) bool {
+ const readable = this.flags.contains(.hup);
+ this.flags.remove(.hup);
+ return readable;
+ }
+
+ pub fn isEOF(this: *FilePoll) bool {
+ const readable = this.flags.contains(.eof);
+ this.flags.remove(.eof);
+ return readable;
+ }
+
+ pub fn isWritable(this: *FilePoll) bool {
+ const readable = this.flags.contains(.writable);
+ this.flags.remove(.writable);
+ return readable;
+ }
+
+ pub fn deinit(this: *FilePoll) void {
+ var vm = JSC.VirtualMachine.vm;
+ this.deinitWithVM(vm);
+ }
+
+ pub fn deinitWithVM(this: *FilePoll, vm: *JSC.VirtualMachine) void {
+ if (this.isRegistered()) {
+ _ = this.unregister(vm.uws_event_loop.?);
+ }
+
+ this.owner = Deactivated.owner;
+ this.flags = Flags.Set{};
+ this.fd = invalid_fd;
+ vm.rareData().filePolls(vm).put(this);
+ }
+
+ pub fn isRegistered(this: *const FilePoll) bool {
+ return this.flags.contains(.poll_writable) or this.flags.contains(.poll_readable) or this.flags.contains(.poll_process);
+ }
+
+ pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop) void {
+ if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) {
+ if (poll.flags.contains(.has_incremented_poll_count)) poll.deactivate(loop);
+ poll.flags.insert(.needs_rearm);
+ }
+ var ptr = poll.owner;
+ switch (ptr.tag()) {
+ @field(Owner.Tag, "FileBlobLoader") => {
+ log("onUpdate: FileBlobLoader", .{});
+ ptr.as(FileBlobLoader).onPoll(0, 0);
+ },
+ @field(Owner.Tag, "Subprocess") => {
+ log("onUpdate: Subprocess", .{});
+ var loader = ptr.as(JSC.Subprocess);
+
+ loader.onExitNotification();
+ },
+ @field(Owner.Tag, "FileSink") => {
+ log("onUpdate: FileSink", .{});
+ var loader = ptr.as(JSC.WebCore.FileSink);
+ loader.onPoll(0, 0);
+ },
+
+ @field(Owner.Tag, "BufferedInput") => {
+ log("onUpdate: BufferedInput", .{});
+ var loader = ptr.as(JSC.Subprocess.BufferedInput);
+ loader.onReady(0);
+ },
+ @field(Owner.Tag, "BufferedOutput") => {
+ log("onUpdate: BufferedOutput", .{});
+ var loader = ptr.as(JSC.Subprocess.BufferedOutput);
+ loader.ready(0);
+ },
+ else => {},
+ }
+ }
+
+ pub const Flags = enum {
+ // What are we asking the event loop about?
+
+ /// Poll for readable events
+ poll_readable,
+
+ /// Poll for writable events
+ poll_writable,
+
+ /// Poll for process-related events
+ poll_process,
+
+ // What did the event loop tell us?
+ readable,
+ writable,
+ process,
+ eof,
+ hup,
+
+ // What is the type of file descriptor?
+ fifo,
+ tty,
+
+ one_shot,
+ needs_rearm,
+
+ has_incremented_poll_count,
+
+ disable,
+
+ pub fn poll(this: Flags) Flags {
+ return switch (this) {
+ .readable => .poll_readable,
+ .writable => .poll_writable,
+ .process => .poll_process,
+ else => this,
+ };
+ }
+
+ pub const Set = std.EnumSet(Flags);
+ pub const Struct = std.enums.EnumFieldStruct(Flags, bool, false);
+
+ pub fn fromKQueueEvent(kqueue_event: std.os.system.kevent64_s) Flags.Set {
+ var flags = Flags.Set{};
+ if (kqueue_event.filter == std.os.system.EVFILT_READ) {
+ flags.insert(Flags.readable);
+ if (kqueue_event.flags & std.os.system.EV_EOF != 0) {
+ flags.insert(Flags.eof);
+ }
+ } else if (kqueue_event.filter == std.os.system.EVFILT_WRITE) {
+ flags.insert(Flags.writable);
+ if (kqueue_event.flags & std.os.system.EV_EOF != 0) {
+ flags.insert(Flags.hup);
+ }
+ } else if (kqueue_event.filter == std.os.system.EVFILT_PROC) {
+ flags.insert(Flags.process);
+ }
+ return flags;
+ }
+
+ pub fn fromEpollEvent(epoll: std.os.linux.epoll_event) Flags.Set {
+ var flags = Flags.Set{};
+ if (epoll.events & std.os.linux.EPOLL.IN != 0) {
+ flags.insert(Flags.readable);
+ log("readable", .{});
+ }
+ if (epoll.events & std.os.linux.EPOLL.OUT != 0) {
+ flags.insert(Flags.writable);
+ log("writable", .{});
+ }
+ if (epoll.events & std.os.linux.EPOLL.ERR != 0) {
+ flags.insert(Flags.eof);
+ log("eof", .{});
+ }
+ if (epoll.events & std.os.linux.EPOLL.HUP != 0) {
+ flags.insert(Flags.hup);
+ log("hup", .{});
+ }
+ return flags;
+ }
+ };
+
+ pub const HiveArray = bun.HiveArray(FilePoll, 128).Fallback;
+
+ const log = Output.scoped(.FilePoll, false);
+
+ pub inline fn isActive(this: *const FilePoll) bool {
+ return this.flags.contains(.has_incremented_poll_count) and !this.flags.contains(.disable);
+ }
+
+ /// Make calling ref() on this poll into a no-op.
+ pub fn disable(this: *FilePoll) void {
+ if (this.isRegistered()) {
+ this.unregister(JSC.VirtualMachine.vm.uws_event_loop.?);
+ }
+
+ this.unref();
+ this.flags.insert(.disable);
+ }
+
+ /// Only intended to be used from EventLoop.Pollable
+ pub fn deactivate(this: *FilePoll, loop: *uws.Loop) void {
+ std.debug.assert(this.flags.contains(.has_incremented_poll_count));
+ this.flags.remove(.has_incremented_poll_count);
+ loop.num_polls -= 1;
+ loop.active -= 1;
+ }
+
+ /// Only intended to be used from EventLoop.Pollable
+ pub fn activate(this: *FilePoll, loop: *uws.Loop) void {
+ std.debug.assert(!this.flags.contains(.has_incremented_poll_count));
+ std.debug.assert(!this.flags.contains(.disable));
+ this.flags.insert(.has_incremented_poll_count);
+ loop.num_polls += 1;
+ loop.active += 1;
+ }
+
+ pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll {
+ return initWithOwner(vm, fd, flags, Owner.init(owner));
+ }
+
+ pub fn initWithOwner(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, owner: Owner) *FilePoll {
+ var poll = vm.rareData().filePolls(vm).get();
+ poll.* = .{
+ .fd = @intCast(u32, fd),
+ .flags = Flags.Set.init(flags),
+ .owner = owner,
+ };
+ return poll;
+ }
+
+ /// Prevent a poll from keeping the process alive.
+ pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void {
+ if (!this.isActive())
+ return;
+ log("unref", .{});
+ this.deactivate(vm.uws_event_loop.?);
+ }
+
+ /// Allow a poll to keep the process alive.
+ pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void {
+ if (this.isActive())
+ return;
+ log("ref", .{});
+ this.activate(vm.uws_event_loop.?);
+ }
+
+ pub fn onTick(loop: *uws.Loop, tagged_pointer: ?*anyopaque) callconv(.C) void {
+ var tag = Pollable.from(tagged_pointer);
+
+ if (tag.tag() != @field(Pollable.Tag, "FilePoll"))
+ return;
+
+ var file_poll = tag.as(FilePoll);
+ if (comptime Environment.isMac)
+ onKQueueEvent(file_poll, loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)])
+ else if (comptime Environment.isLinux)
+ onEpollEvent(file_poll, loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]);
+ }
+
+ const Pollable = bun.TaggedPointerUnion(
+ .{
+ FilePoll,
+ Deactivated,
+ },
+ );
+
+ comptime {
+ @export(onTick, .{ .name = "Bun__internal_dispatch_ready_poll" });
+ }
+
+ const timeout = std.mem.zeroes(std.os.timespec);
+ const kevent = std.c.kevent;
+ const linux = std.os.linux;
+ pub fn register(this: *FilePoll, loop: *uws.Loop, flag: Flags, one_shot: bool) JSC.Maybe(void) {
+ const watcher_fd = loop.fd;
+ const fd = this.fd;
+
+ log("register: {s} ({d})", .{ @tagName(flag), fd });
+
+ if (one_shot) {
+ this.flags.insert(.one_shot);
+ }
+
+ std.debug.assert(this.fd != invalid_fd);
+
+ if (comptime Environment.isLinux) {
+ const flags: u32 = switch (flag) {
+ .process,
+ .readable,
+ => linux.EPOLL.IN | linux.EPOLL.HUP | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT),
+ .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT),
+ else => unreachable,
+ };
+
+ var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @ptrToInt(Pollable.init(this).ptr()) } };
+
+ const ctl = linux.epoll_ctl(
+ watcher_fd,
+ if (this.isRegistered() or this.flags.contains(.needs_rearm)) linux.EPOLL.CTL_MOD else linux.EPOLL.CTL_ADD,
+ @intCast(std.os.fd_t, fd),
+ &event,
+ );
+
+ if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
+ return errno;
+ }
+ } else if (comptime Environment.isMac) {
+ var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
+ changelist[0] = switch (flag) {
+ .readable => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_READ,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(Pollable.init(this).ptr()),
+ .flags = std.c.EV_ADD | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ .writable => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_WRITE,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(Pollable.init(this).ptr()),
+ .flags = std.c.EV_ADD | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ .process => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_PROC,
+ .data = 0,
+ .fflags = std.c.NOTE_EXIT,
+ .udata = @ptrToInt(Pollable.init(this).ptr()),
+ .flags = std.c.EV_ADD,
+ .ext = .{ 0, 0 },
+ },
+ };
+
+ // output events only include change errors
+ const KEVENT_FLAG_ERROR_EVENTS = 0x000002;
+
+ // The kevent() system call returns the number of events placed in
+ // the eventlist, up to the value given by nevents. If the time
+ // limit expires, then kevent() returns 0.
+ const rc = rc: {
+ while (true) {
+ const rc = std.os.system.kevent64(
+ watcher_fd,
+ &changelist,
+ 1,
+ // The same array may be used for the changelist and eventlist.
+ &changelist,
+ 1,
+ KEVENT_FLAG_ERROR_EVENTS,
+ &timeout,
+ );
+
+ if (std.c.getErrno(rc) == .INTR) continue;
+ break :rc rc;
+ }
+ };
+
+ // If an error occurs while
+ // processing an element of the changelist and there is enough room
+ // in the eventlist, then the event will be placed in the eventlist
+ // with EV_ERROR set in flags and the system error in data.
+ if (changelist[0].flags == std.c.EV_ERROR) {
+ return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
+ // Otherwise, -1 will be returned, and errno will be set to
+ // indicate the error condition.
+ }
+
+ const errno = std.c.getErrno(rc);
+
+ if (errno != .SUCCESS) {
+ switch (rc) {
+ std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
+ else => unreachable,
+ }
+ }
+ } else {
+ @compileError("TODO: Pollable");
+ }
+
+ if (!this.isActive()) this.activate(loop);
+ this.flags.insert(switch (flag) {
+ .process, .readable => .poll_readable,
+ .writable => .poll_writable,
+ else => unreachable,
+ });
+ return JSC.Maybe(void).success;
+ }
+
+ pub const invalid_fd = std.math.maxInt(u32);
+
+ pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) {
+ if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) {
+ // no-op
+ return JSC.Maybe(void).success;
+ }
+
+ const fd = this.fd;
+ std.debug.assert(fd != invalid_fd);
+ const watcher_fd = loop.fd;
+ const flag: Flags = brk: {
+ if (this.flags.contains(.poll_readable))
+ break :brk .readable;
+ if (this.flags.contains(.poll_writable))
+ break :brk .writable;
+ if (this.flags.contains(.poll_process))
+ break :brk .process;
+ return JSC.Maybe(void).success;
+ };
+
+ log("unregister: {s} ({d})", .{ @tagName(flag), fd });
+
+ if (comptime Environment.isLinux) {
+ const ctl = linux.epoll_ctl(
+ watcher_fd,
+ linux.EPOLL.CTL_DEL,
+ @intCast(std.os.fd_t, fd),
+ null,
+ );
+
+ if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
+ return errno;
+ }
+ } else if (comptime Environment.isMac) {
+ var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
+
+ changelist[0] = switch (flag) {
+ .read => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_READ,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(Pollable.init(this).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ .write => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_WRITE,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(Pollable.init(this).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ .process => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_PROC,
+ .data = 0,
+ .fflags = std.c.NOTE_EXIT,
+ .udata = @ptrToInt(Pollable.init(this).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ else => unreachable,
+ };
+
+ // output events only include change errors
+ const KEVENT_FLAG_ERROR_EVENTS = 0x000002;
+
+ // The kevent() system call returns the number of events placed in
+ // the eventlist, up to the value given by nevents. If the time
+ // limit expires, then kevent() returns 0.
+ const rc = std.os.system.kevent64(
+ watcher_fd,
+ &changelist,
+ 1,
+ // The same array may be used for the changelist and eventlist.
+ &changelist,
+ 1,
+ KEVENT_FLAG_ERROR_EVENTS,
+ &timeout,
+ );
+ // If an error occurs while
+ // processing an element of the changelist and there is enough room
+ // in the eventlist, then the event will be placed in the eventlist
+ // with EV_ERROR set in flags and the system error in data.
+ if (changelist[0].flags == std.c.EV_ERROR) {
+ return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
+ // Otherwise, -1 will be returned, and errno will be set to
+ // indicate the error condition.
+ }
+
+ const errno = std.c.getErrno(rc);
+
+ switch (rc) {
+ std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
+ else => unreachable,
+ }
+ } else {
+ @compileError("TODO: Pollable");
+ }
+
+ this.flags.remove(.needs_rearm);
+ this.flags.remove(.one_shot);
+ // we don't support both right now
+ std.debug.assert(!(this.flags.contains(.poll_readable) and this.flags.contains(.poll_writable)));
+ this.flags.remove(.poll_readable);
+ this.flags.remove(.poll_writable);
+ this.flags.remove(.poll_process);
+
+ if (this.isActive())
+ this.deactivate(loop);
+
+ return JSC.Maybe(void).success;
+ }
+};
+
pub const Strong = extern struct {
ref: ?*JSC.napi.Ref = null,