aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/event_loop.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r--src/bun.js/event_loop.zig115
1 files changed, 115 insertions, 0 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 238b3907f..747bf01e0 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -463,17 +463,28 @@ pub const Poller = struct {
@field(Pollable.Tag, "FileBlobLoader") => {
var loader = ptr.as(FileBlobLoader);
loop.active -= 1;
+ loop.num_polls -= 1;
+
loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags);
},
@field(Pollable.Tag, "Subprocess") => {
var loader = ptr.as(JSC.Subprocess);
loop.num_polls -= 1;
+ loop.active -= 1;
// kqueue sends the same notification multiple times in the same tick potentially
// so we have to dedupe it
_ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable;
},
+ @field(Pollable.Tag, "FileSink") => {
+ var loader = ptr.as(JSC.WebCore.FileSink);
+
+ loop.num_polls -= 1;
+ loop.active -= 1;
+
+ loader.onPoll(0, 0);
+ },
else => |tag| {
bun.Output.panic(
"Internal error\nUnknown pollable tag: {d}\n",
@@ -489,17 +500,28 @@ pub const Poller = struct {
@field(Pollable.Tag, "FileBlobLoader") => {
var loader = ptr.as(FileBlobLoader);
loop.active -= 1;
+ loop.num_polls -= 1;
+
loader.onPoll(0, 0);
},
@field(Pollable.Tag, "Subprocess") => {
var loader = ptr.as(JSC.Subprocess);
loop.num_polls -= 1;
+ loop.active -= 1;
// kqueue sends the same notification multiple times in the same tick potentially
// so we have to dedupe it
_ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable;
},
+ @field(Pollable.Tag, "FileSink") => {
+ var loader = ptr.as(JSC.WebCore.FileSink);
+
+ loop.num_polls -= 1;
+ loop.active -= 1;
+
+ loader.onPoll(0, 0);
+ },
else => unreachable,
}
}
@@ -627,6 +649,99 @@ pub const Poller = struct {
}
}
+ pub fn unwatch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) {
+ if (this.loop == null) {
+ this.loop = uws.Loop.get();
+ JSC.VirtualMachine.vm.uws_event_loop = this.loop.?;
+ }
+ const watcher_fd = this.loop.?.fd;
+
+ if (comptime Environment.isLinux) {
+ const ctl = linux.epoll_ctl(
+ watcher_fd,
+ linux.EPOLL.CTL_DEL,
+ fd,
+ null,
+ );
+
+ if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
+ return errno;
+ }
+
+ return JSC.Maybe(void).success;
+ } else if (comptime Environment.isMac) {
+ var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
+ changelist[0] = switch (flag) {
+ .read => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_READ,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(Pollable.init(ctx).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ .write => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_WRITE,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(Pollable.init(ctx).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ .process => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_PROC,
+ .data = 0,
+ .fflags = std.c.NOTE_EXIT,
+ .udata = @ptrToInt(Pollable.init(ctx).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ };
+
+ // output events only include change errors
+ const KEVENT_FLAG_ERROR_EVENTS = 0x000002;
+
+ // The kevent() system call returns the number of events placed in
+ // the eventlist, up to the value given by nevents. If the time
+ // limit expires, then kevent() returns 0.
+ const rc = std.os.system.kevent64(
+ watcher_fd,
+ &changelist,
+ 1,
+ // The same array may be used for the changelist and eventlist.
+ &changelist,
+ 1,
+ KEVENT_FLAG_ERROR_EVENTS,
+ &timeout,
+ );
+ // If an error occurs while
+ // processing an element of the changelist and there is enough room
+ // in the eventlist, then the event will be placed in the eventlist
+ // with EV_ERROR set in flags and the system error in data.
+ if (changelist[0].flags == std.c.EV_ERROR) {
+ return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
+ // Otherwise, -1 will be returned, and errno will be set to
+ // indicate the error condition.
+ }
+
+ const errno = std.c.getErrno(rc);
+
+ if (errno == .SUCCESS) {
+ return JSC.Maybe(void).success;
+ }
+
+ switch (rc) {
+ std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
+ else => unreachable,
+ }
+ } else {
+ @compileError("TODO: Poller");
+ }
+ }
+
pub fn tick(this: *Poller) void {
var loop = this.loop orelse return;
if (loop.active == 0) return;