diff options
author | 2022-11-12 18:30:12 -0800 | |
---|---|---|
committer | 2022-11-12 18:30:12 -0800 | |
commit | 21bf3ddaf23c842dc12a1d76dbd3b48daf08f349 (patch) | |
tree | 06706104877984e9f083fed7c3278c9d007193cc /src/bun.js/event_loop.zig | |
parent | 514f2a8eddf1a1d35a33cc096ed7403a79afe36f (diff) | |
download | bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.tar.gz bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.tar.zst bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.zip |
Redo how we poll pipes (#1496)
* Fix pipe
* Handle unregistered
* Fix failing test
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r-- | src/bun.js/event_loop.zig | 351 |
1 files changed, 2 insertions, 349 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 0c99a949a..b62ac0123 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -377,8 +377,9 @@ pub const EventLoop = struct { ctx.global.vm().releaseWeakRefs(); ctx.global.vm().drainMicrotasks(); + var loop = ctx.uws_event_loop orelse return; - if (ctx.poller.loop != null and ctx.poller.loop.?.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (ctx.uws_event_loop.?.num_polls > 0 or this.start_server_on_next_tick))) { + if (loop.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (loop.num_polls > 0 or this.start_server_on_next_tick))) { if (this.tickConcurrentWithCount() > 0) { this.tick(); } else { @@ -449,351 +450,3 @@ pub const EventLoop = struct { } } }; - -pub const Poller = struct { - /// kqueue() or epoll() - /// 0 == unset - loop: ?*uws.Loop = null, - - pub fn dispatchKQueueEvent(loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void { - if (comptime !Environment.isMac) { - unreachable; - } - var ptr = Pollable.from(@intToPtr(?*anyopaque, kqueue_event.udata)); - - switch (ptr.tag()) { - @field(Pollable.Tag, "FileBlobLoader") => { - var loader = ptr.as(FileBlobLoader); - loader.poll_ref.deactivate(loop); - - loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags); - }, - @field(Pollable.Tag, "Subprocess") => { - var loader = ptr.as(JSC.Subprocess); - - loader.poll_ref.deactivate(loop); - loader.onExitNotification(); - }, - @field(Pollable.Tag, "BufferedInput") => { - var loader = ptr.as(JSC.Subprocess.BufferedInput); - - loader.poll_ref.deactivate(loop); - - loader.onReady(@bitCast(i64, kqueue_event.data)); - }, - @field(Pollable.Tag, "BufferedOutput") => { - var loader = ptr.as(JSC.Subprocess.BufferedOutput); - - loader.poll_ref.deactivate(loop); - - loader.ready(@bitCast(i64, kqueue_event.data)); - }, - @field(Pollable.Tag, "FileSink") => { - var loader = ptr.as(JSC.WebCore.FileSink); - loader.poll_ref.deactivate(loop); - - loader.onPoll(0, 0); - }, - else => |tag| { - bun.Output.panic( - "Internal error\nUnknown pollable tag: {d}\n", - .{@enumToInt(tag)}, - ); - }, - } - } - - fn dispatchEpollEvent(loop: *uws.Loop, epoll_event: *linux.epoll_event) void { - var ptr = Pollable.from(@intToPtr(?*anyopaque, epoll_event.data.ptr)); - switch (ptr.tag()) { - @field(Pollable.Tag, "FileBlobLoader") => { - var loader = ptr.as(FileBlobLoader); - loader.poll_ref.deactivate(loop); - - loader.onPoll(0, 0); - }, - @field(Pollable.Tag, "Subprocess") => { - var loader = ptr.as(JSC.Subprocess); - loader.poll_ref.deactivate(loop); - - loader.onExitNotification(); - }, - @field(Pollable.Tag, "FileSink") => { - var loader = ptr.as(JSC.WebCore.FileSink); - loader.poll_ref.deactivate(loop); - - loader.onPoll(0, 0); - }, - - @field(Pollable.Tag, "BufferedInput") => { - var loader = ptr.as(JSC.Subprocess.BufferedInput); - - loader.poll_ref.deactivate(loop); - - loader.onReady(0); - }, - @field(Pollable.Tag, "BufferedOutput") => { - var loader = ptr.as(JSC.Subprocess.BufferedOutput); - - loader.poll_ref.deactivate(loop); - - loader.ready(0); - }, - else => unreachable, - } - } - - const timeout = std.mem.zeroes(std.os.timespec); - const linux = std.os.linux; - - const FileBlobLoader = JSC.WebCore.FileBlobLoader; - const FileSink = JSC.WebCore.FileSink; - const Subprocess = JSC.Subprocess; - const BufferedInput = Subprocess.BufferedInput; - const BufferedOutput = Subprocess.BufferedOutput; - /// epoll only allows one pointer - /// We unfortunately need two pointers: one for a function call and one for the context - /// We use a tagged pointer union and then call the function with the context pointer - pub const Pollable = TaggedPointerUnion(.{ - FileBlobLoader, - FileSink, - Subprocess, - BufferedInput, - BufferedOutput, - }); - const Kevent = std.os.Kevent; - const kevent = std.c.kevent; - - pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) { - if (this.loop == null) { - this.loop = uws.Loop.get(); - JSC.VirtualMachine.vm.uws_event_loop = this.loop.?; - } - const watcher_fd = this.loop.?.fd; - - if (comptime Environment.isLinux) { - const flags: u32 = switch (flag) { - .process, .read => linux.EPOLL.IN | linux.EPOLL.HUP | linux.EPOLL.ONESHOT, - .write => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | linux.EPOLL.ONESHOT, - }; - - var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @ptrToInt(Pollable.init(ctx).ptr()) } }; - - const ctl = linux.epoll_ctl( - watcher_fd, - linux.EPOLL.CTL_ADD, - fd, - &event, - ); - - if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { - return errno; - } - - ctx.poll_ref.activate(this.loop.?); - - return JSC.Maybe(void).success; - } else if (comptime Environment.isMac) { - var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); - changelist[0] = switch (flag) { - .read => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_READ, - .data = 0, - .fflags = 0, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_ADD | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - .write => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_WRITE, - .data = 0, - .fflags = 0, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_ADD | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - .process => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_PROC, - .data = 0, - .fflags = std.c.NOTE_EXIT, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_ADD, - .ext = .{ 0, 0 }, - }, - }; - - // output events only include change errors - const KEVENT_FLAG_ERROR_EVENTS = 0x000002; - - // The kevent() system call returns the number of events placed in - // the eventlist, up to the value given by nevents. If the time - // limit expires, then kevent() returns 0. - const rc = rc: { - while (true) { - const rc = std.os.system.kevent64( - watcher_fd, - &changelist, - 1, - // The same array may be used for the changelist and eventlist. - &changelist, - 1, - KEVENT_FLAG_ERROR_EVENTS, - &timeout, - ); - - if (std.c.getErrno(rc) == .INTR) continue; - break :rc rc; - } - }; - - // If an error occurs while - // processing an element of the changelist and there is enough room - // in the eventlist, then the event will be placed in the eventlist - // with EV_ERROR set in flags and the system error in data. - if (changelist[0].flags == std.c.EV_ERROR) { - return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; - // Otherwise, -1 will be returned, and errno will be set to - // indicate the error condition. - } - - const errno = std.c.getErrno(rc); - - if (errno == .SUCCESS) { - ctx.poll_ref.activate(this.loop.?); - - return JSC.Maybe(void).success; - } - - switch (rc) { - std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, - else => unreachable, - } - } else { - @compileError("TODO: Poller"); - } - } - - pub fn unwatch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) { - if (this.loop == null) { - this.loop = uws.Loop.get(); - JSC.VirtualMachine.vm.uws_event_loop = this.loop.?; - } - const watcher_fd = this.loop.?.fd; - - if (comptime Environment.isLinux) { - const ctl = linux.epoll_ctl( - watcher_fd, - linux.EPOLL.CTL_DEL, - fd, - null, - ); - - if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { - return errno; - } - - ctx.poll_ref.deactivate(this.loop.?); - - return JSC.Maybe(void).success; - } else if (comptime Environment.isMac) { - var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); - changelist[0] = switch (flag) { - .read => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_READ, - .data = 0, - .fflags = 0, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - .write => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_WRITE, - .data = 0, - .fflags = 0, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - .process => .{ - .ident = @intCast(u64, fd), - .filter = std.os.system.EVFILT_PROC, - .data = 0, - .fflags = std.c.NOTE_EXIT, - .udata = @ptrToInt(Pollable.init(ctx).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, - .ext = .{ 0, 0 }, - }, - }; - - // output events only include change errors - const KEVENT_FLAG_ERROR_EVENTS = 0x000002; - - // The kevent() system call returns the number of events placed in - // the eventlist, up to the value given by nevents. If the time - // limit expires, then kevent() returns 0. - const rc = std.os.system.kevent64( - watcher_fd, - &changelist, - 1, - // The same array may be used for the changelist and eventlist. - &changelist, - 1, - KEVENT_FLAG_ERROR_EVENTS, - &timeout, - ); - // If an error occurs while - // processing an element of the changelist and there is enough room - // in the eventlist, then the event will be placed in the eventlist - // with EV_ERROR set in flags and the system error in data. - if (changelist[0].flags == std.c.EV_ERROR) { - return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; - // Otherwise, -1 will be returned, and errno will be set to - // indicate the error condition. - } - - const errno = std.c.getErrno(rc); - - if (errno == .SUCCESS) { - ctx.poll_ref.deactivate(this.loop.?); - return JSC.Maybe(void).success; - } - - switch (rc) { - std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, - else => unreachable, - } - } else { - @compileError("TODO: Poller"); - } - } - - pub fn tick(this: *Poller) void { - var loop = this.loop orelse return; - if (loop.active == 0) return; - loop.tick(); - } - - pub fn onTick(loop: *uws.Loop, tagged_pointer: ?*anyopaque) callconv(.C) void { - _ = loop; - _ = tagged_pointer; - if (comptime Environment.isMac) - dispatchKQueueEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]) - else if (comptime Environment.isLinux) - dispatchEpollEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]); - } - - pub const Flag = enum { - read, - write, - process, - }; - - comptime { - @export(onTick, .{ .name = "Bun__internal_dispatch_ready_poll" }); - } -}; |