aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/event_loop.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-22 23:21:48 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-22 23:21:48 -0700
commit729d445b6885f69dd2c6355f38707bd42851c791 (patch)
treef87a7c408929ea3f57bbb7ace380cf869da83c0e /src/bun.js/event_loop.zig
parent25f820c6bf1d8ec6d444ef579cc036b8c0607b75 (diff)
downloadbun-jarred/rename.tar.gz
bun-jarred/rename.tar.zst
bun-jarred/rename.zip
change the directory structurejarred/rename
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r--src/bun.js/event_loop.zig638
1 files changed, 638 insertions, 0 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
new file mode 100644
index 000000000..d6b7f75da
--- /dev/null
+++ b/src/bun.js/event_loop.zig
@@ -0,0 +1,638 @@
+const std = @import("std");
+const JSC = @import("javascript_core");
+const JSGlobalObject = JSC.JSGlobalObject;
+const VirtualMachine = JSC.VirtualMachine;
+const Lock = @import("../lock.zig").Lock;
+const Microtask = JSC.Microtask;
+const bun = @import("../global.zig");
+const Environment = bun.Environment;
+const Fetch = JSC.WebCore.Fetch;
+const WebCore = JSC.WebCore;
+const Bun = JSC.API.Bun;
+const TaggedPointerUnion = @import("../tagged_pointer.zig").TaggedPointerUnion;
+const CopyFilePromiseTask = WebCore.Blob.Store.CopyFile.CopyFilePromiseTask;
+const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask;
+const BunTimerTimeoutTask = Bun.Timer.Timeout.TimeoutTask;
+const ReadFileTask = WebCore.Blob.Store.ReadFile.ReadFileTask;
+const WriteFileTask = WebCore.Blob.Store.WriteFile.WriteFileTask;
+const napi_async_work = JSC.napi.napi_async_work;
+const FetchTasklet = Fetch.FetchTasklet;
+const JSValue = JSC.JSValue;
+const js = JSC.C;
+pub const WorkPool = @import("../work_pool.zig").WorkPool;
+pub const WorkPoolTask = @import("../work_pool.zig").Task;
+const NetworkThread = @import("http").NetworkThread;
+
+pub fn ConcurrentPromiseTask(comptime Context: type) type {
+ return struct {
+ const This = @This();
+ ctx: *Context,
+ task: WorkPoolTask = .{ .callback = runFromThreadPool },
+ event_loop: *JSC.EventLoop,
+ allocator: std.mem.Allocator,
+ promise: JSValue,
+ globalThis: *JSGlobalObject,
+
+ pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
+ var this = try allocator.create(This);
+ this.* = .{
+ .event_loop = VirtualMachine.vm.event_loop,
+ .ctx = value,
+ .allocator = allocator,
+ .promise = JSValue.createInternalPromise(globalThis),
+ .globalThis = globalThis,
+ };
+ js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
+ VirtualMachine.vm.active_tasks +|= 1;
+ return this;
+ }
+
+ pub fn runFromThreadPool(task: *WorkPoolTask) void {
+ var this = @fieldParentPtr(This, "task", task);
+ Context.run(this.ctx);
+ this.onFinish();
+ }
+
+ pub fn runFromJS(this: This) void {
+ var promise_value = this.promise;
+ var promise = promise_value.asInternalPromise() orelse {
+ if (comptime @hasDecl(Context, "deinit")) {
+ @call(.{}, Context.deinit, .{this.ctx});
+ }
+ return;
+ };
+
+ var ctx = this.ctx;
+
+ js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
+ ctx.then(promise);
+ }
+
+ pub fn schedule(this: *This) void {
+ WorkPool.schedule(&this.task);
+ }
+
+ pub fn onFinish(this: *This) void {
+ this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ }
+
+ pub fn deinit(this: *This) void {
+ this.allocator.destroy(this);
+ }
+ };
+}
+
+pub fn SerialPromiseTask(comptime Context: type) type {
+ return struct {
+ const SerialWorkPool = @import("../work_pool.zig").NewWorkPool(1);
+ const This = @This();
+
+ ctx: *Context,
+ task: WorkPoolTask = .{ .callback = runFromThreadPool },
+ event_loop: *JSC.EventLoop,
+ allocator: std.mem.Allocator,
+ promise: JSValue,
+ globalThis: *JSGlobalObject,
+
+ pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
+ var this = try allocator.create(This);
+ this.* = .{
+ .event_loop = VirtualMachine.vm.event_loop,
+ .ctx = value,
+ .allocator = allocator,
+ .promise = JSValue.createInternalPromise(globalThis),
+ .globalThis = globalThis,
+ };
+ js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
+ VirtualMachine.vm.active_tasks +|= 1;
+ return this;
+ }
+
+ pub fn runFromThreadPool(task: *WorkPoolTask) void {
+ var this = @fieldParentPtr(This, "task", task);
+ Context.run(this.ctx);
+ this.onFinish();
+ }
+
+ pub fn runFromJS(this: This) void {
+ var promise_value = this.promise;
+ var promise = promise_value.asInternalPromise() orelse {
+ if (comptime @hasDecl(Context, "deinit")) {
+ @call(.{}, Context.deinit, .{this.ctx});
+ }
+ return;
+ };
+
+ var ctx = this.ctx;
+
+ js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
+ ctx.then(promise, this.globalThis);
+ }
+
+ pub fn schedule(this: *This) void {
+ SerialWorkPool.schedule(&this.task);
+ }
+
+ pub fn onFinish(this: *This) void {
+ this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ }
+
+ pub fn deinit(this: *This) void {
+ this.allocator.destroy(this);
+ }
+ };
+}
+
+pub fn IOTask(comptime Context: type) type {
+ return struct {
+ const This = @This();
+ ctx: *Context,
+ task: NetworkThread.Task = .{ .callback = runFromThreadPool },
+ event_loop: *JSC.EventLoop,
+ allocator: std.mem.Allocator,
+ globalThis: *JSGlobalObject,
+
+ pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
+ var this = try allocator.create(This);
+ this.* = .{
+ .event_loop = VirtualMachine.vm.eventLoop(),
+ .ctx = value,
+ .allocator = allocator,
+ .globalThis = globalThis,
+ };
+ return this;
+ }
+
+ pub fn runFromThreadPool(task: *NetworkThread.Task) void {
+ var this = @fieldParentPtr(This, "task", task);
+ Context.run(this.ctx, this);
+ }
+
+ pub fn runFromJS(this: This) void {
+ var ctx = this.ctx;
+ ctx.then(this.globalThis);
+ }
+
+ pub fn schedule(this: *This) void {
+ NetworkThread.init() catch return;
+ NetworkThread.global.pool.schedule(NetworkThread.Batch.from(&this.task));
+ }
+
+ pub fn onFinish(this: *This) void {
+ this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ }
+
+ pub fn deinit(this: *This) void {
+ var allocator = this.allocator;
+ this.* = undefined;
+ allocator.destroy(this);
+ }
+ };
+}
+
+pub fn AsyncNativeCallbackTask(comptime Context: type) type {
+ return struct {
+ const This = @This();
+ ctx: *Context,
+ task: WorkPoolTask = .{ .callback = runFromThreadPool },
+ event_loop: *JSC.EventLoop,
+ allocator: std.mem.Allocator,
+ globalThis: *JSGlobalObject,
+
+ pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
+ var this = try allocator.create(This);
+ this.* = .{
+ .event_loop = VirtualMachine.vm.eventLoop(),
+ .ctx = value,
+ .allocator = allocator,
+ .globalThis = globalThis,
+ };
+ return this;
+ }
+
+ pub fn runFromThreadPool(task: *WorkPoolTask) void {
+ var this = @fieldParentPtr(This, "task", task);
+ Context.run(this.ctx, this);
+ }
+
+ pub fn runFromJS(this: This) void {
+ this.ctx.runFromJS(this.globalThis);
+ }
+
+ pub fn schedule(this: *This) void {
+ WorkPool.get().schedule(WorkPool.schedule(&this.task));
+ }
+
+ pub fn onFinish(this: *This) void {
+ this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ }
+
+ pub fn deinit(this: *This) void {
+ var allocator = this.allocator;
+ this.* = undefined;
+ allocator.destroy(this);
+ }
+ };
+}
+
+pub const AnyTask = struct {
+ ctx: ?*anyopaque,
+ callback: fn (*anyopaque) void,
+
+ pub fn run(this: *AnyTask) void {
+ @setRuntimeSafety(false);
+ this.callback(this.ctx.?);
+ }
+
+ pub fn New(comptime Type: type, comptime Callback: anytype) type {
+ return struct {
+ pub fn init(ctx: *Type) AnyTask {
+ return AnyTask{
+ .callback = wrap,
+ .ctx = ctx,
+ };
+ }
+
+ pub fn wrap(this: ?*anyopaque) void {
+ Callback(@ptrCast(*Type, @alignCast(@alignOf(Type), this.?)));
+ }
+ };
+ }
+};
+
+pub const CppTask = opaque {
+ extern fn Bun__performTask(globalObject: *JSGlobalObject, task: *CppTask) void;
+ pub fn run(this: *CppTask, global: *JSGlobalObject) void {
+ JSC.markBinding();
+ Bun__performTask(global, this);
+ }
+};
+const ThreadSafeFunction = JSC.napi.ThreadSafeFunction;
+const MicrotaskForDefaultGlobalObject = JSC.MicrotaskForDefaultGlobalObject;
+// const PromiseTask = JSInternalPromise.Completion.PromiseTask;
+pub const Task = TaggedPointerUnion(.{
+ FetchTasklet,
+ Microtask,
+ MicrotaskForDefaultGlobalObject,
+ AsyncTransformTask,
+ BunTimerTimeoutTask,
+ ReadFileTask,
+ CopyFilePromiseTask,
+ WriteFileTask,
+ AnyTask,
+ napi_async_work,
+ ThreadSafeFunction,
+ CppTask,
+ // PromiseTask,
+ // TimeoutTasklet,
+});
+
+pub const EventLoop = struct {
+ ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+ pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+ io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+ tasks: Queue = undefined,
+ concurrent_tasks: Queue = undefined,
+ concurrent_lock: Lock = Lock.init(),
+ global: *JSGlobalObject = undefined,
+ virtual_machine: *VirtualMachine = undefined,
+ pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
+
+ pub fn tickWithCount(this: *EventLoop) u32 {
+ var finished: u32 = 0;
+ var global = this.global;
+ var vm_ = this.virtual_machine;
+ while (this.tasks.readItem()) |task| {
+ switch (task.tag()) {
+ .Microtask => {
+ var micro: *Microtask = task.as(Microtask);
+ micro.run(global);
+ finished += 1;
+ },
+ .MicrotaskForDefaultGlobalObject => {
+ var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject);
+ micro.run(global);
+ finished += 1;
+ },
+ .FetchTasklet => {
+ var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
+ fetch_task.onDone();
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ @field(Task.Tag, @typeName(AsyncTransformTask)) => {
+ var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
+ transform_task.*.runFromJS();
+ transform_task.deinit();
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ @field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
+ var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
+ transform_task.*.runFromJS();
+ transform_task.deinit();
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ @field(Task.Tag, @typeName(JSC.napi.napi_async_work)) => {
+ var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
+ transform_task.*.runFromJS();
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ @field(Task.Tag, @typeName(BunTimerTimeoutTask)) => {
+ var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?;
+ transform_task.*.runFromJS();
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ @field(Task.Tag, @typeName(ReadFileTask)) => {
+ var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
+ transform_task.*.runFromJS();
+ transform_task.deinit();
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ @field(Task.Tag, @typeName(WriteFileTask)) => {
+ var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
+ transform_task.*.runFromJS();
+ transform_task.deinit();
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ @field(Task.Tag, @typeName(AnyTask)) => {
+ var any: *AnyTask = task.get(AnyTask).?;
+ any.run();
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ @field(Task.Tag, @typeName(CppTask)) => {
+ var any: *CppTask = task.get(CppTask).?;
+ any.run(global);
+ finished += 1;
+ vm_.active_tasks -|= 1;
+ },
+ else => if (Environment.allow_assert) {
+ bun.Output.prettyln("\nUnexpected tag: {s}\n", .{@tagName(task.tag())});
+ } else unreachable,
+ }
+ }
+
+ if (finished > 0) {
+ _ = this.pending_tasks_count.fetchSub(finished, .Monotonic);
+ }
+
+ return finished;
+ }
+
+ pub fn tickConcurrent(this: *EventLoop) void {
+ if (this.ready_tasks_count.load(.Monotonic) > 0) {
+ this.concurrent_lock.lock();
+ defer this.concurrent_lock.unlock();
+ const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength());
+
+ // TODO: optimzie
+ this.tasks.ensureUnusedCapacity(add) catch unreachable;
+
+ {
+ this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0));
+ this.concurrent_tasks.discard(this.concurrent_tasks.count);
+ }
+
+ _ = this.pending_tasks_count.fetchAdd(add, .Monotonic);
+ _ = this.ready_tasks_count.fetchSub(add, .Monotonic);
+ }
+ }
+
+ // TODO: fix this technical debt
+ pub fn tick(this: *EventLoop) void {
+ var poller = &this.virtual_machine.poller;
+ var ctx = this.virtual_machine;
+ while (true) {
+ this.tickConcurrent();
+
+ // this.global.vm().doWork();
+
+ while (this.tickWithCount() > 0) {}
+ poller.tick();
+
+ this.tickConcurrent();
+
+ if (this.tickWithCount() == 0) break;
+ }
+
+ this.global.vm().releaseWeakRefs();
+
+ if (!ctx.disable_run_us_loop and ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) {
+ ctx.is_us_loop_entered = true;
+ ctx.enterUWSLoop();
+ ctx.is_us_loop_entered = false;
+ }
+ }
+
+ // TODO: fix this technical debt
+ pub fn waitForPromise(this: *EventLoop, promise: *JSC.JSInternalPromise) void {
+ switch (promise.status(this.global.vm())) {
+ JSC.JSPromise.Status.Pending => {
+ while (promise.status(this.global.vm()) == .Pending) {
+ this.tick();
+ }
+ },
+ else => {},
+ }
+ }
+
+ pub fn waitForTasks(this: *EventLoop) void {
+ this.tick();
+ while (this.pending_tasks_count.load(.Monotonic) > 0) {
+ this.tick();
+ }
+ }
+
+ pub fn enqueueTask(this: *EventLoop, task: Task) void {
+ _ = this.pending_tasks_count.fetchAdd(1, .Monotonic);
+ this.tasks.writeItem(task) catch unreachable;
+ }
+
+ pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void {
+ this.concurrent_lock.lock();
+ defer this.concurrent_lock.unlock();
+ this.concurrent_tasks.writeItem(task) catch unreachable;
+ if (this.virtual_machine.uws_event_loop) |loop| {
+ loop.nextTick(*EventLoop, this, EventLoop.tick);
+ }
+ _ = this.ready_tasks_count.fetchAdd(1, .Monotonic);
+ }
+};
+
+pub const Poller = struct {
+ /// kqueue() or epoll()
+ /// 0 == unset
+ watch_fd: i32 = 0,
+ active: u32 = 0,
+
+ pub const PlatformSpecificFlags = struct {};
+
+ const Completion = fn (ctx: ?*anyopaque, sizeOrOffset: i64, flags: u16) void;
+ const kevent64 = std.os.system.kevent64_s;
+ pub fn dispatchKQueueEvent(kqueue_event: *const kevent64) void {
+ if (comptime !Environment.isMac) {
+ unreachable;
+ }
+
+ const ptr = @intToPtr(?*anyopaque, kqueue_event.udata);
+ const callback: Completion = @intToPtr(Completion, kqueue_event.ext[0]);
+ callback(ptr, @bitCast(i64, kqueue_event.data), kqueue_event.flags);
+ }
+
+ const timeout = std.mem.zeroes(std.os.timespec);
+
+ pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, ctx: ?*anyopaque, completion: Completion) JSC.Maybe(void) {
+ if (comptime Environment.isLinux) {
+ // std.debug.assert(this.watch_fd != 0);
+ // TODO:
+ return JSC.Maybe(void).success;
+ } else if (comptime Environment.isMac) {
+ if (this.watch_fd == 0) {
+ this.watch_fd = std.c.kqueue();
+ if (this.watch_fd == -1) {
+ defer this.watch_fd = 0;
+ return JSC.Maybe(void).errnoSys(this.watch_fd, .kqueue).?;
+ }
+ }
+
+ var events_list = std.mem.zeroes([2]kevent64);
+ events_list[0] = switch (flag) {
+ .read => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_READ,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(ctx),
+ .flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT,
+ .ext = .{ @ptrToInt(completion), 0 },
+ },
+ .write => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_WRITE,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(ctx),
+ .flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT,
+ .ext = .{ @ptrToInt(completion), 0 },
+ },
+ };
+
+ // The kevent() system call returns the number of events placed in
+ // the eventlist, up to the value given by nevents. If the time
+ // limit expires, then kevent() returns 0.
+ const rc = std.os.system.kevent64(
+ this.watch_fd,
+ &events_list,
+ 1,
+ // The same array may be used for the changelist and eventlist.
+ &events_list,
+ 1,
+ 0,
+ &timeout,
+ );
+
+ // If an error occurs while
+ // processing an element of the changelist and there is enough room
+ // in the eventlist, then the event will be placed in the eventlist
+ // with EV_ERROR set in flags and the system error in data.
+ if (events_list[0].flags == std.c.EV_ERROR) {
+ return JSC.Maybe(void).errnoSys(events_list[0].data, .kevent).?;
+ // Otherwise, -1 will be returned, and errno will be set to
+ // indicate the error condition.
+ }
+
+ switch (rc) {
+ std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(std.c.getErrno(rc)), .kevent).?,
+ 0 => {
+ this.active += 1;
+ return JSC.Maybe(void).success;
+ },
+ 1 => {
+ // if we immediately get an event, we can skip the reference counting
+ dispatchKQueueEvent(&events_list[0]);
+ return JSC.Maybe(void).success;
+ },
+ 2 => {
+ dispatchKQueueEvent(&events_list[0]);
+
+ this.active -= 1;
+ dispatchKQueueEvent(&events_list[1]);
+ return JSC.Maybe(void).success;
+ },
+ else => unreachable,
+ }
+ } else {
+ @compileError("TODO: Poller");
+ }
+ }
+
+ const kqueue_events_ = std.mem.zeroes([4]kevent64);
+ pub fn tick(this: *Poller) void {
+ if (comptime Environment.isMac) {
+ if (this.active == 0) return;
+
+ var events_list = kqueue_events_;
+ // ub extern "c" fn kevent64(
+ // kq: c_int,
+ // changelist: [*]const kevent64_s,
+ // nchanges: c_int,
+ // eventlist: [*]kevent64_s,
+ // nevents: c_int,
+ // flags: c_uint,
+ // timeout: ?*const timespec,
+ // ) c_int;
+ const rc = std.os.system.kevent64(
+ this.watch_fd,
+ &events_list,
+ 0,
+ // The same array may be used for the changelist and eventlist.
+ &events_list,
+ 4,
+ 0,
+ &timeout,
+ );
+
+ switch (rc) {
+ std.math.minInt(@TypeOf(rc))...-1 => {
+ // EINTR is fine
+ switch (std.c.getErrno(rc)) {
+ .INTR => return,
+ else => |errno| std.debug.panic("kevent64() failed: {d}", .{errno}),
+ }
+ },
+ 0 => {},
+ 1 => {
+ this.active -= 1;
+ dispatchKQueueEvent(&events_list[0]);
+ },
+ 2 => {
+ this.active -= 2;
+ dispatchKQueueEvent(&events_list[0]);
+ dispatchKQueueEvent(&events_list[1]);
+ },
+ 3 => {
+ this.active -= 3;
+ dispatchKQueueEvent(&events_list[0]);
+ dispatchKQueueEvent(&events_list[1]);
+ dispatchKQueueEvent(&events_list[2]);
+ },
+ 4 => {
+ this.active -= 4;
+ dispatchKQueueEvent(&events_list[0]);
+ dispatchKQueueEvent(&events_list[1]);
+ dispatchKQueueEvent(&events_list[2]);
+ dispatchKQueueEvent(&events_list[3]);
+ },
+ else => unreachable,
+ }
+ }
+ }
+
+ pub const Flag = enum { read, write };
+};