diff options
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r-- | src/bun.js/event_loop.zig | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 238b3907f..747bf01e0 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -463,17 +463,28 @@ pub const Poller = struct { @field(Pollable.Tag, "FileBlobLoader") => { var loader = ptr.as(FileBlobLoader); loop.active -= 1; + loop.num_polls -= 1; + loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags); }, @field(Pollable.Tag, "Subprocess") => { var loader = ptr.as(JSC.Subprocess); loop.num_polls -= 1; + loop.active -= 1; // kqueue sends the same notification multiple times in the same tick potentially // so we have to dedupe it _ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable; }, + @field(Pollable.Tag, "FileSink") => { + var loader = ptr.as(JSC.WebCore.FileSink); + + loop.num_polls -= 1; + loop.active -= 1; + + loader.onPoll(0, 0); + }, else => |tag| { bun.Output.panic( "Internal error\nUnknown pollable tag: {d}\n", @@ -489,17 +500,28 @@ pub const Poller = struct { @field(Pollable.Tag, "FileBlobLoader") => { var loader = ptr.as(FileBlobLoader); loop.active -= 1; + loop.num_polls -= 1; + loader.onPoll(0, 0); }, @field(Pollable.Tag, "Subprocess") => { var loader = ptr.as(JSC.Subprocess); loop.num_polls -= 1; + loop.active -= 1; // kqueue sends the same notification multiple times in the same tick potentially // so we have to dedupe it _ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable; }, + @field(Pollable.Tag, "FileSink") => { + var loader = ptr.as(JSC.WebCore.FileSink); + + loop.num_polls -= 1; + loop.active -= 1; + + loader.onPoll(0, 0); + }, else => unreachable, } } @@ -627,6 +649,99 @@ pub const Poller = struct { } } + pub fn unwatch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) { + if (this.loop == null) { + this.loop = uws.Loop.get(); + JSC.VirtualMachine.vm.uws_event_loop = this.loop.?; + } + const watcher_fd = this.loop.?.fd; + + if (comptime Environment.isLinux) { + const ctl = linux.epoll_ctl( + watcher_fd, + linux.EPOLL.CTL_DEL, + fd, + null, + ); + + if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { + return errno; + } + + return JSC.Maybe(void).success; + } else if (comptime Environment.isMac) { + var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + changelist[0] = switch (flag) { + .read => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_READ, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(ctx).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .write => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_WRITE, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(ctx).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .process => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_PROC, + .data = 0, + .fflags = std.c.NOTE_EXIT, + .udata = @ptrToInt(Pollable.init(ctx).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + }; + + // output events only include change errors + const KEVENT_FLAG_ERROR_EVENTS = 0x000002; + + // The kevent() system call returns the number of events placed in + // the eventlist, up to the value given by nevents. If the time + // limit expires, then kevent() returns 0. + const rc = std.os.system.kevent64( + watcher_fd, + &changelist, + 1, + // The same array may be used for the changelist and eventlist. + &changelist, + 1, + KEVENT_FLAG_ERROR_EVENTS, + &timeout, + ); + // If an error occurs while + // processing an element of the changelist and there is enough room + // in the eventlist, then the event will be placed in the eventlist + // with EV_ERROR set in flags and the system error in data. + if (changelist[0].flags == std.c.EV_ERROR) { + return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; + // Otherwise, -1 will be returned, and errno will be set to + // indicate the error condition. + } + + const errno = std.c.getErrno(rc); + + if (errno == .SUCCESS) { + return JSC.Maybe(void).success; + } + + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, + else => unreachable, + } + } else { + @compileError("TODO: Poller"); + } + } + pub fn tick(this: *Poller) void { var loop = this.loop orelse return; if (loop.active == 0) return; |