aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/event_loop.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-11-12 18:30:12 -0800
committerGravatar GitHub <noreply@github.com> 2022-11-12 18:30:12 -0800
commit21bf3ddaf23c842dc12a1d76dbd3b48daf08f349 (patch)
tree06706104877984e9f083fed7c3278c9d007193cc /src/bun.js/event_loop.zig
parent514f2a8eddf1a1d35a33cc096ed7403a79afe36f (diff)
downloadbun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.tar.gz
bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.tar.zst
bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.zip
Redo how we poll pipes (#1496)
* Fix pipe * Handle unregistered * Fix failing test
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r--src/bun.js/event_loop.zig351
1 files changed, 2 insertions, 349 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 0c99a949a..b62ac0123 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -377,8 +377,9 @@ pub const EventLoop = struct {
ctx.global.vm().releaseWeakRefs();
ctx.global.vm().drainMicrotasks();
+ var loop = ctx.uws_event_loop orelse return;
- if (ctx.poller.loop != null and ctx.poller.loop.?.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (ctx.uws_event_loop.?.num_polls > 0 or this.start_server_on_next_tick))) {
+ if (loop.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (loop.num_polls > 0 or this.start_server_on_next_tick))) {
if (this.tickConcurrentWithCount() > 0) {
this.tick();
} else {
@@ -449,351 +450,3 @@ pub const EventLoop = struct {
}
}
};
-
-pub const Poller = struct {
- /// kqueue() or epoll()
- /// 0 == unset
- loop: ?*uws.Loop = null,
-
- pub fn dispatchKQueueEvent(loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void {
- if (comptime !Environment.isMac) {
- unreachable;
- }
- var ptr = Pollable.from(@intToPtr(?*anyopaque, kqueue_event.udata));
-
- switch (ptr.tag()) {
- @field(Pollable.Tag, "FileBlobLoader") => {
- var loader = ptr.as(FileBlobLoader);
- loader.poll_ref.deactivate(loop);
-
- loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags);
- },
- @field(Pollable.Tag, "Subprocess") => {
- var loader = ptr.as(JSC.Subprocess);
-
- loader.poll_ref.deactivate(loop);
- loader.onExitNotification();
- },
- @field(Pollable.Tag, "BufferedInput") => {
- var loader = ptr.as(JSC.Subprocess.BufferedInput);
-
- loader.poll_ref.deactivate(loop);
-
- loader.onReady(@bitCast(i64, kqueue_event.data));
- },
- @field(Pollable.Tag, "BufferedOutput") => {
- var loader = ptr.as(JSC.Subprocess.BufferedOutput);
-
- loader.poll_ref.deactivate(loop);
-
- loader.ready(@bitCast(i64, kqueue_event.data));
- },
- @field(Pollable.Tag, "FileSink") => {
- var loader = ptr.as(JSC.WebCore.FileSink);
- loader.poll_ref.deactivate(loop);
-
- loader.onPoll(0, 0);
- },
- else => |tag| {
- bun.Output.panic(
- "Internal error\nUnknown pollable tag: {d}\n",
- .{@enumToInt(tag)},
- );
- },
- }
- }
-
- fn dispatchEpollEvent(loop: *uws.Loop, epoll_event: *linux.epoll_event) void {
- var ptr = Pollable.from(@intToPtr(?*anyopaque, epoll_event.data.ptr));
- switch (ptr.tag()) {
- @field(Pollable.Tag, "FileBlobLoader") => {
- var loader = ptr.as(FileBlobLoader);
- loader.poll_ref.deactivate(loop);
-
- loader.onPoll(0, 0);
- },
- @field(Pollable.Tag, "Subprocess") => {
- var loader = ptr.as(JSC.Subprocess);
- loader.poll_ref.deactivate(loop);
-
- loader.onExitNotification();
- },
- @field(Pollable.Tag, "FileSink") => {
- var loader = ptr.as(JSC.WebCore.FileSink);
- loader.poll_ref.deactivate(loop);
-
- loader.onPoll(0, 0);
- },
-
- @field(Pollable.Tag, "BufferedInput") => {
- var loader = ptr.as(JSC.Subprocess.BufferedInput);
-
- loader.poll_ref.deactivate(loop);
-
- loader.onReady(0);
- },
- @field(Pollable.Tag, "BufferedOutput") => {
- var loader = ptr.as(JSC.Subprocess.BufferedOutput);
-
- loader.poll_ref.deactivate(loop);
-
- loader.ready(0);
- },
- else => unreachable,
- }
- }
-
- const timeout = std.mem.zeroes(std.os.timespec);
- const linux = std.os.linux;
-
- const FileBlobLoader = JSC.WebCore.FileBlobLoader;
- const FileSink = JSC.WebCore.FileSink;
- const Subprocess = JSC.Subprocess;
- const BufferedInput = Subprocess.BufferedInput;
- const BufferedOutput = Subprocess.BufferedOutput;
- /// epoll only allows one pointer
- /// We unfortunately need two pointers: one for a function call and one for the context
- /// We use a tagged pointer union and then call the function with the context pointer
- pub const Pollable = TaggedPointerUnion(.{
- FileBlobLoader,
- FileSink,
- Subprocess,
- BufferedInput,
- BufferedOutput,
- });
- const Kevent = std.os.Kevent;
- const kevent = std.c.kevent;
-
- pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) {
- if (this.loop == null) {
- this.loop = uws.Loop.get();
- JSC.VirtualMachine.vm.uws_event_loop = this.loop.?;
- }
- const watcher_fd = this.loop.?.fd;
-
- if (comptime Environment.isLinux) {
- const flags: u32 = switch (flag) {
- .process, .read => linux.EPOLL.IN | linux.EPOLL.HUP | linux.EPOLL.ONESHOT,
- .write => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | linux.EPOLL.ONESHOT,
- };
-
- var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @ptrToInt(Pollable.init(ctx).ptr()) } };
-
- const ctl = linux.epoll_ctl(
- watcher_fd,
- linux.EPOLL.CTL_ADD,
- fd,
- &event,
- );
-
- if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
- return errno;
- }
-
- ctx.poll_ref.activate(this.loop.?);
-
- return JSC.Maybe(void).success;
- } else if (comptime Environment.isMac) {
- var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
- changelist[0] = switch (flag) {
- .read => .{
- .ident = @intCast(u64, fd),
- .filter = std.os.system.EVFILT_READ,
- .data = 0,
- .fflags = 0,
- .udata = @ptrToInt(Pollable.init(ctx).ptr()),
- .flags = std.c.EV_ADD | std.c.EV_ONESHOT,
- .ext = .{ 0, 0 },
- },
- .write => .{
- .ident = @intCast(u64, fd),
- .filter = std.os.system.EVFILT_WRITE,
- .data = 0,
- .fflags = 0,
- .udata = @ptrToInt(Pollable.init(ctx).ptr()),
- .flags = std.c.EV_ADD | std.c.EV_ONESHOT,
- .ext = .{ 0, 0 },
- },
- .process => .{
- .ident = @intCast(u64, fd),
- .filter = std.os.system.EVFILT_PROC,
- .data = 0,
- .fflags = std.c.NOTE_EXIT,
- .udata = @ptrToInt(Pollable.init(ctx).ptr()),
- .flags = std.c.EV_ADD,
- .ext = .{ 0, 0 },
- },
- };
-
- // output events only include change errors
- const KEVENT_FLAG_ERROR_EVENTS = 0x000002;
-
- // The kevent() system call returns the number of events placed in
- // the eventlist, up to the value given by nevents. If the time
- // limit expires, then kevent() returns 0.
- const rc = rc: {
- while (true) {
- const rc = std.os.system.kevent64(
- watcher_fd,
- &changelist,
- 1,
- // The same array may be used for the changelist and eventlist.
- &changelist,
- 1,
- KEVENT_FLAG_ERROR_EVENTS,
- &timeout,
- );
-
- if (std.c.getErrno(rc) == .INTR) continue;
- break :rc rc;
- }
- };
-
- // If an error occurs while
- // processing an element of the changelist and there is enough room
- // in the eventlist, then the event will be placed in the eventlist
- // with EV_ERROR set in flags and the system error in data.
- if (changelist[0].flags == std.c.EV_ERROR) {
- return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
- // Otherwise, -1 will be returned, and errno will be set to
- // indicate the error condition.
- }
-
- const errno = std.c.getErrno(rc);
-
- if (errno == .SUCCESS) {
- ctx.poll_ref.activate(this.loop.?);
-
- return JSC.Maybe(void).success;
- }
-
- switch (rc) {
- std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
- else => unreachable,
- }
- } else {
- @compileError("TODO: Poller");
- }
- }
-
- pub fn unwatch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) {
- if (this.loop == null) {
- this.loop = uws.Loop.get();
- JSC.VirtualMachine.vm.uws_event_loop = this.loop.?;
- }
- const watcher_fd = this.loop.?.fd;
-
- if (comptime Environment.isLinux) {
- const ctl = linux.epoll_ctl(
- watcher_fd,
- linux.EPOLL.CTL_DEL,
- fd,
- null,
- );
-
- if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
- return errno;
- }
-
- ctx.poll_ref.deactivate(this.loop.?);
-
- return JSC.Maybe(void).success;
- } else if (comptime Environment.isMac) {
- var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
- changelist[0] = switch (flag) {
- .read => .{
- .ident = @intCast(u64, fd),
- .filter = std.os.system.EVFILT_READ,
- .data = 0,
- .fflags = 0,
- .udata = @ptrToInt(Pollable.init(ctx).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
- .ext = .{ 0, 0 },
- },
- .write => .{
- .ident = @intCast(u64, fd),
- .filter = std.os.system.EVFILT_WRITE,
- .data = 0,
- .fflags = 0,
- .udata = @ptrToInt(Pollable.init(ctx).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
- .ext = .{ 0, 0 },
- },
- .process => .{
- .ident = @intCast(u64, fd),
- .filter = std.os.system.EVFILT_PROC,
- .data = 0,
- .fflags = std.c.NOTE_EXIT,
- .udata = @ptrToInt(Pollable.init(ctx).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
- .ext = .{ 0, 0 },
- },
- };
-
- // output events only include change errors
- const KEVENT_FLAG_ERROR_EVENTS = 0x000002;
-
- // The kevent() system call returns the number of events placed in
- // the eventlist, up to the value given by nevents. If the time
- // limit expires, then kevent() returns 0.
- const rc = std.os.system.kevent64(
- watcher_fd,
- &changelist,
- 1,
- // The same array may be used for the changelist and eventlist.
- &changelist,
- 1,
- KEVENT_FLAG_ERROR_EVENTS,
- &timeout,
- );
- // If an error occurs while
- // processing an element of the changelist and there is enough room
- // in the eventlist, then the event will be placed in the eventlist
- // with EV_ERROR set in flags and the system error in data.
- if (changelist[0].flags == std.c.EV_ERROR) {
- return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
- // Otherwise, -1 will be returned, and errno will be set to
- // indicate the error condition.
- }
-
- const errno = std.c.getErrno(rc);
-
- if (errno == .SUCCESS) {
- ctx.poll_ref.deactivate(this.loop.?);
- return JSC.Maybe(void).success;
- }
-
- switch (rc) {
- std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
- else => unreachable,
- }
- } else {
- @compileError("TODO: Poller");
- }
- }
-
- pub fn tick(this: *Poller) void {
- var loop = this.loop orelse return;
- if (loop.active == 0) return;
- loop.tick();
- }
-
- pub fn onTick(loop: *uws.Loop, tagged_pointer: ?*anyopaque) callconv(.C) void {
- _ = loop;
- _ = tagged_pointer;
- if (comptime Environment.isMac)
- dispatchKQueueEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)])
- else if (comptime Environment.isLinux)
- dispatchEpollEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]);
- }
-
- pub const Flag = enum {
- read,
- write,
- process,
- };
-
- comptime {
- @export(onTick, .{ .name = "Bun__internal_dispatch_ready_poll" });
- }
-};