diff options
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r-- | src/bun.js/event_loop.zig | 236 |
1 files changed, 114 insertions, 122 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 1907838d0..d4688beac 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -23,6 +23,7 @@ const js = JSC.C; pub const WorkPool = @import("../work_pool.zig").WorkPool; pub const WorkPoolTask = @import("../work_pool.zig").Task; const NetworkThread = @import("http").NetworkThread; +const uws = @import("uws"); pub fn ConcurrentPromiseTask(comptime Context: type) type { return struct { @@ -325,7 +326,6 @@ pub const EventLoop = struct { // TODO: fix this technical debt pub fn tick(this: *EventLoop) void { - var poller = &this.virtual_machine.poller; var ctx = this.virtual_machine; this.tickConcurrent(); var global_vm = ctx.global.vm(); @@ -338,11 +338,13 @@ pub const EventLoop = struct { this.tickConcurrent(); if (this.tasks.count > 0) continue; } + break; + } - this.global.vm().doWork(); - poller.tick(); + this.global.vm().doWork(); - break; + while (this.tickWithCount() > 0) { + this.tickConcurrent(); } this.global.handleRejectedPromises(); @@ -354,7 +356,7 @@ pub const EventLoop = struct { ctx.global.vm().releaseWeakRefs(); ctx.global.vm().drainMicrotasks(); - if (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (ctx.uws_event_loop.?.num_polls > 0 or this.start_server_on_next_tick)) { + if (ctx.poller.loop != null and ctx.poller.loop.?.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (ctx.uws_event_loop.?.num_polls > 0 or this.start_server_on_next_tick))) { if (this.tickConcurrentWithCount() > 0) { this.tick(); } else { @@ -380,8 +382,6 @@ pub const EventLoop = struct { if (promise.status(this.global.vm()) == .Pending) { if (this.virtual_machine.uws_event_loop != null) { this.runUSocketsLoop(); - } else if (this.waker) |*waker| { - _ = waker.wait() catch 0; } } } @@ -411,8 +411,11 @@ pub const EventLoop = struct { pub fn ensureWaker(this: *EventLoop) void { JSC.markBinding(); - if (this.waker == null) { - this.waker = AsyncIO.Waker.init(this.virtual_machine.allocator) catch unreachable; + if (this.virtual_machine.uws_event_loop == null) { + var actual = uws.Loop.get().?; + this.virtual_machine.uws_event_loop = actual; + _ = actual.addPostHandler(*JSC.EventLoop, this, JSC.EventLoop.tick); + _ = actual.addPreHandler(*JSC.VM, this.virtual_machine.global.vm(), JSC.VM.drainMicrotasks); } } @@ -432,113 +435,143 @@ pub const EventLoop = struct { loop.nextTick(*EventLoop, this, onDefer); } } - - if (this.waker) |*waker| { - waker.wake() catch unreachable; - } } }; pub const Poller = struct { /// kqueue() or epoll() /// 0 == unset - watch_fd: i32 = 0, - active: u32 = 0, - - pub const PlatformSpecificFlags = struct {}; + loop: ?*uws.Loop = null, - const Completion = fn (ctx: ?*anyopaque, sizeOrOffset: i64, flags: u16) void; - const kevent64 = std.os.system.kevent64_s; - pub fn dispatchKQueueEvent(kqueue_event: *const kevent64) void { + pub fn dispatchKQueueEvent(loop: *uws.Loop, kqueue_event: *const std.os.Kevent) void { if (comptime !Environment.isMac) { unreachable; } + var ptr = Pollable.from(@intToPtr(?*anyopaque, kqueue_event.udata)); - const ptr = @intToPtr(?*anyopaque, kqueue_event.udata); - const callback: Completion = @intToPtr(Completion, kqueue_event.ext[0]); - callback(ptr, @bitCast(i64, kqueue_event.data), kqueue_event.flags); + switch (ptr.tag()) { + @field(Pollable.Tag, "FileBlobLoader") => { + var loader = ptr.as(FileBlobLoader); + loop.active -= 1; + loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags); + }, + else => unreachable, + } + } + + fn dispatchEpollEvent(loop: *uws.Loop, epoll_event: *linux.epoll_event) void { + var ptr = Pollable.from(@intToPtr(?*anyopaque, epoll_event.data.ptr)); + switch (ptr.tag()) { + @field(Pollable.Tag, "FileBlobLoader") => { + var loader = ptr.as(FileBlobLoader); + loop.active -= 1; + loader.onPoll(0, 0); + }, + else => unreachable, + } } const timeout = std.mem.zeroes(std.os.timespec); + const linux = std.os.linux; + + const FileBlobLoader = JSC.WebCore.FileBlobLoader; + + /// epoll only allows one pointer + /// We unfortunately need two pointers: one for a function call and one for the context + /// We use a tagged pointer union and then call the function with the context pointer + pub const Pollable = TaggedPointerUnion(.{ + FileBlobLoader, + AsyncIO.Waker, + }); + const Kevent = std.os.Kevent; + const kevent = std.c.kevent; + + pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) { + if (this.loop == null) { + this.loop = uws.Loop.get(); + JSC.VirtualMachine.vm.uws_event_loop = this.loop.?; + } + const watcher_fd = this.loop.?.fd; - pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, ctx: ?*anyopaque, completion: Completion) JSC.Maybe(void) { if (comptime Environment.isLinux) { - // std.debug.assert(this.watch_fd != 0); - // TODO: - return JSC.Maybe(void).success; - } else if (comptime Environment.isMac) { - if (this.watch_fd == 0) { - this.watch_fd = std.c.kqueue(); - if (this.watch_fd == -1) { - defer this.watch_fd = 0; - return JSC.Maybe(void).errnoSys(this.watch_fd, .kqueue).?; - } + const flags: u32 = switch (flag) { + .read => linux.EPOLL.IN | linux.EPOLL.HUP | linux.EPOLL.ONESHOT, + .write => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | linux.EPOLL.ONESHOT, + }; + + var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @ptrToInt(Pollable.init(ctx).ptr()) } }; + + const ctl = linux.epoll_ctl( + watcher_fd, + linux.EPOLL.CTL_ADD, + fd, + &event, + ); + + if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { + return errno; } - var events_list = std.mem.zeroes([2]kevent64); - events_list[0] = switch (flag) { + this.loop.?.num_polls += 1; + this.loop.?.active += 1; + } else if (comptime Environment.isMac) { + var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + changelist[0] = switch (flag) { .read => .{ .ident = @intCast(u64, fd), .filter = std.os.system.EVFILT_READ, .data = 0, .fflags = 0, - .udata = @ptrToInt(ctx), + .udata = @ptrToInt(Pollable.init(ctx).ptr()), .flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT, - .ext = .{ @ptrToInt(completion), 0 }, + .ext = .{ 0, 0 }, }, .write => .{ .ident = @intCast(u64, fd), .filter = std.os.system.EVFILT_WRITE, .data = 0, .fflags = 0, - .udata = @ptrToInt(ctx), + .udata = @ptrToInt(Pollable.init(ctx).ptr()), .flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT, - .ext = .{ @ptrToInt(completion), 0 }, + .ext = .{ 0, 0 }, }, }; + // output events only include change errors + const KEVENT_FLAG_ERROR_EVENTS = 0x000002; + // The kevent() system call returns the number of events placed in // the eventlist, up to the value given by nevents. If the time // limit expires, then kevent() returns 0. const rc = std.os.system.kevent64( - this.watch_fd, - &events_list, + watcher_fd, + &changelist, 1, // The same array may be used for the changelist and eventlist. - &events_list, + &changelist, 1, - 0, + KEVENT_FLAG_ERROR_EVENTS, &timeout, ); - // If an error occurs while // processing an element of the changelist and there is enough room // in the eventlist, then the event will be placed in the eventlist // with EV_ERROR set in flags and the system error in data. - if (events_list[0].flags == std.c.EV_ERROR) { - return JSC.Maybe(void).errnoSys(events_list[0].data, .kevent).?; + if (changelist[0].flags == std.c.EV_ERROR) { + return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; // Otherwise, -1 will be returned, and errno will be set to // indicate the error condition. } - switch (rc) { - std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(std.c.getErrno(rc)), .kevent).?, - 0 => { - this.active += 1; - return JSC.Maybe(void).success; - }, - 1 => { - // if we immediately get an event, we can skip the reference counting - dispatchKQueueEvent(&events_list[0]); - return JSC.Maybe(void).success; - }, - 2 => { - dispatchKQueueEvent(&events_list[0]); + const errno = std.c.getErrno(rc); + if (errno == .SUCCESS) { + this.loop.?.num_polls += 1; + this.loop.?.active += 1; + return JSC.Maybe(void).success; + } - this.active -= 1; - dispatchKQueueEvent(&events_list[1]); - return JSC.Maybe(void).success; - }, + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, else => unreachable, } } else { @@ -546,67 +579,26 @@ pub const Poller = struct { } } - const kqueue_events_ = std.mem.zeroes([4]kevent64); pub fn tick(this: *Poller) void { - if (comptime Environment.isMac) { - if (this.active == 0) return; - - var events_list = kqueue_events_; - // ub extern "c" fn kevent64( - // kq: c_int, - // changelist: [*]const kevent64_s, - // nchanges: c_int, - // eventlist: [*]kevent64_s, - // nevents: c_int, - // flags: c_uint, - // timeout: ?*const timespec, - // ) c_int; - const rc = std.os.system.kevent64( - this.watch_fd, - &events_list, - 0, - // The same array may be used for the changelist and eventlist. - &events_list, - 4, - 0, - &timeout, - ); + var loop = this.loop orelse return; + if (loop.active == 0) return; + loop.tick(); + } - switch (rc) { - std.math.minInt(@TypeOf(rc))...-1 => { - // EINTR is fine - switch (std.c.getErrno(rc)) { - .INTR => return, - else => |errno| std.debug.panic("kevent64() failed: {d}", .{errno}), - } - }, - 0 => {}, - 1 => { - this.active -= 1; - dispatchKQueueEvent(&events_list[0]); - }, - 2 => { - this.active -= 2; - dispatchKQueueEvent(&events_list[0]); - dispatchKQueueEvent(&events_list[1]); - }, - 3 => { - this.active -= 3; - dispatchKQueueEvent(&events_list[0]); - dispatchKQueueEvent(&events_list[1]); - dispatchKQueueEvent(&events_list[2]); - }, - 4 => { - this.active -= 4; - dispatchKQueueEvent(&events_list[0]); - dispatchKQueueEvent(&events_list[1]); - dispatchKQueueEvent(&events_list[2]); - dispatchKQueueEvent(&events_list[3]); - }, - else => unreachable, - } - } + pub fn onTick(loop: *uws.Loop, tagged_pointer: ?*anyopaque) callconv(.C) void { + _ = loop; + _ = tagged_pointer; + if (comptime Environment.isMac) + dispatchKQueueEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]) + else if (comptime Environment.isLinux) + dispatchEpollEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]); } pub const Flag = enum { read, write }; + + comptime { + if (!JSC.is_bindgen) { + @export(onTick, .{ .name = "Bun__internal_dispatch_ready_poll" }); + } + } }; |