aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/event_loop.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/javascript/jsc/event_loop.zig')
-rw-r--r--src/javascript/jsc/event_loop.zig638
1 files changed, 0 insertions, 638 deletions
diff --git a/src/javascript/jsc/event_loop.zig b/src/javascript/jsc/event_loop.zig
deleted file mode 100644
index 0a32732d0..000000000
--- a/src/javascript/jsc/event_loop.zig
+++ /dev/null
@@ -1,638 +0,0 @@
-const std = @import("std");
-const JSC = @import("javascript_core");
-const JSGlobalObject = JSC.JSGlobalObject;
-const VirtualMachine = JSC.VirtualMachine;
-const Lock = @import("../../lock.zig").Lock;
-const Microtask = JSC.Microtask;
-const bun = @import("../../global.zig");
-const Environment = bun.Environment;
-const Fetch = JSC.WebCore.Fetch;
-const WebCore = JSC.WebCore;
-const Bun = JSC.API.Bun;
-const TaggedPointerUnion = @import("../../tagged_pointer.zig").TaggedPointerUnion;
-const CopyFilePromiseTask = WebCore.Blob.Store.CopyFile.CopyFilePromiseTask;
-const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask;
-const BunTimerTimeoutTask = Bun.Timer.Timeout.TimeoutTask;
-const ReadFileTask = WebCore.Blob.Store.ReadFile.ReadFileTask;
-const WriteFileTask = WebCore.Blob.Store.WriteFile.WriteFileTask;
-const napi_async_work = JSC.napi.napi_async_work;
-const FetchTasklet = Fetch.FetchTasklet;
-const JSValue = JSC.JSValue;
-const js = JSC.C;
-pub const WorkPool = @import("../../work_pool.zig").WorkPool;
-pub const WorkPoolTask = @import("../../work_pool.zig").Task;
-const NetworkThread = @import("http").NetworkThread;
-
-pub fn ConcurrentPromiseTask(comptime Context: type) type {
- return struct {
- const This = @This();
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- promise: JSValue,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.event_loop,
- .ctx = value,
- .allocator = allocator,
- .promise = JSValue.createInternalPromise(globalThis),
- .globalThis = globalThis,
- };
- js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
- VirtualMachine.vm.active_tasks +|= 1;
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx);
- this.onFinish();
- }
-
- pub fn runFromJS(this: This) void {
- var promise_value = this.promise;
- var promise = promise_value.asInternalPromise() orelse {
- if (comptime @hasDecl(Context, "deinit")) {
- @call(.{}, Context.deinit, .{this.ctx});
- }
- return;
- };
-
- var ctx = this.ctx;
-
- js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
- ctx.then(promise);
- }
-
- pub fn schedule(this: *This) void {
- WorkPool.schedule(&this.task);
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- this.allocator.destroy(this);
- }
- };
-}
-
-pub fn SerialPromiseTask(comptime Context: type) type {
- return struct {
- const SerialWorkPool = @import("../../work_pool.zig").NewWorkPool(1);
- const This = @This();
-
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- promise: JSValue,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.event_loop,
- .ctx = value,
- .allocator = allocator,
- .promise = JSValue.createInternalPromise(globalThis),
- .globalThis = globalThis,
- };
- js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
- VirtualMachine.vm.active_tasks +|= 1;
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx);
- this.onFinish();
- }
-
- pub fn runFromJS(this: This) void {
- var promise_value = this.promise;
- var promise = promise_value.asInternalPromise() orelse {
- if (comptime @hasDecl(Context, "deinit")) {
- @call(.{}, Context.deinit, .{this.ctx});
- }
- return;
- };
-
- var ctx = this.ctx;
-
- js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
- ctx.then(promise, this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- SerialWorkPool.schedule(&this.task);
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- this.allocator.destroy(this);
- }
- };
-}
-
-pub fn IOTask(comptime Context: type) type {
- return struct {
- const This = @This();
- ctx: *Context,
- task: NetworkThread.Task = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.eventLoop(),
- .ctx = value,
- .allocator = allocator,
- .globalThis = globalThis,
- };
- return this;
- }
-
- pub fn runFromThreadPool(task: *NetworkThread.Task) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx, this);
- }
-
- pub fn runFromJS(this: This) void {
- var ctx = this.ctx;
- ctx.then(this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- NetworkThread.init() catch return;
- NetworkThread.global.pool.schedule(NetworkThread.Batch.from(&this.task));
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- var allocator = this.allocator;
- this.* = undefined;
- allocator.destroy(this);
- }
- };
-}
-
-pub fn AsyncNativeCallbackTask(comptime Context: type) type {
- return struct {
- const This = @This();
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.eventLoop(),
- .ctx = value,
- .allocator = allocator,
- .globalThis = globalThis,
- };
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx, this);
- }
-
- pub fn runFromJS(this: This) void {
- this.ctx.runFromJS(this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- WorkPool.get().schedule(WorkPool.schedule(&this.task));
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- var allocator = this.allocator;
- this.* = undefined;
- allocator.destroy(this);
- }
- };
-}
-
-pub const AnyTask = struct {
- ctx: ?*anyopaque,
- callback: fn (*anyopaque) void,
-
- pub fn run(this: *AnyTask) void {
- @setRuntimeSafety(false);
- this.callback(this.ctx.?);
- }
-
- pub fn New(comptime Type: type, comptime Callback: anytype) type {
- return struct {
- pub fn init(ctx: *Type) AnyTask {
- return AnyTask{
- .callback = wrap,
- .ctx = ctx,
- };
- }
-
- pub fn wrap(this: ?*anyopaque) void {
- Callback(@ptrCast(*Type, @alignCast(@alignOf(Type), this.?)));
- }
- };
- }
-};
-
-pub const CppTask = opaque {
- extern fn Bun__performTask(globalObject: *JSGlobalObject, task: *CppTask) void;
- pub fn run(this: *CppTask, global: *JSGlobalObject) void {
- JSC.markBinding();
- Bun__performTask(global, this);
- }
-};
-const ThreadSafeFunction = JSC.napi.ThreadSafeFunction;
-const MicrotaskForDefaultGlobalObject = JSC.MicrotaskForDefaultGlobalObject;
-// const PromiseTask = JSInternalPromise.Completion.PromiseTask;
-pub const Task = TaggedPointerUnion(.{
- FetchTasklet,
- Microtask,
- MicrotaskForDefaultGlobalObject,
- AsyncTransformTask,
- BunTimerTimeoutTask,
- ReadFileTask,
- CopyFilePromiseTask,
- WriteFileTask,
- AnyTask,
- napi_async_work,
- ThreadSafeFunction,
- CppTask,
- // PromiseTask,
- // TimeoutTasklet,
-});
-
-pub const EventLoop = struct {
- ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- tasks: Queue = undefined,
- concurrent_tasks: Queue = undefined,
- concurrent_lock: Lock = Lock.init(),
- global: *JSGlobalObject = undefined,
- virtual_machine: *VirtualMachine = undefined,
- pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
-
- pub fn tickWithCount(this: *EventLoop) u32 {
- var finished: u32 = 0;
- var global = this.global;
- var vm_ = this.virtual_machine;
- while (this.tasks.readItem()) |task| {
- switch (task.tag()) {
- .Microtask => {
- var micro: *Microtask = task.as(Microtask);
- micro.run(global);
- finished += 1;
- },
- .MicrotaskForDefaultGlobalObject => {
- var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject);
- micro.run(global);
- finished += 1;
- },
- .FetchTasklet => {
- var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
- fetch_task.onDone();
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- @field(Task.Tag, @typeName(AsyncTransformTask)) => {
- var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
- transform_task.*.runFromJS();
- transform_task.deinit();
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- @field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
- var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
- transform_task.*.runFromJS();
- transform_task.deinit();
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- @field(Task.Tag, @typeName(JSC.napi.napi_async_work)) => {
- var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
- transform_task.*.runFromJS();
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- @field(Task.Tag, @typeName(BunTimerTimeoutTask)) => {
- var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?;
- transform_task.*.runFromJS();
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- @field(Task.Tag, @typeName(ReadFileTask)) => {
- var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
- transform_task.*.runFromJS();
- transform_task.deinit();
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- @field(Task.Tag, @typeName(WriteFileTask)) => {
- var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
- transform_task.*.runFromJS();
- transform_task.deinit();
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- @field(Task.Tag, @typeName(AnyTask)) => {
- var any: *AnyTask = task.get(AnyTask).?;
- any.run();
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- @field(Task.Tag, @typeName(CppTask)) => {
- var any: *CppTask = task.get(CppTask).?;
- any.run(global);
- finished += 1;
- vm_.active_tasks -|= 1;
- },
- else => if (Environment.allow_assert) {
- bun.Output.prettyln("\nUnexpected tag: {s}\n", .{@tagName(task.tag())});
- } else unreachable,
- }
- }
-
- if (finished > 0) {
- _ = this.pending_tasks_count.fetchSub(finished, .Monotonic);
- }
-
- return finished;
- }
-
- pub fn tickConcurrent(this: *EventLoop) void {
- if (this.ready_tasks_count.load(.Monotonic) > 0) {
- this.concurrent_lock.lock();
- defer this.concurrent_lock.unlock();
- const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength());
-
- // TODO: optimzie
- this.tasks.ensureUnusedCapacity(add) catch unreachable;
-
- {
- this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0));
- this.concurrent_tasks.discard(this.concurrent_tasks.count);
- }
-
- _ = this.pending_tasks_count.fetchAdd(add, .Monotonic);
- _ = this.ready_tasks_count.fetchSub(add, .Monotonic);
- }
- }
-
- // TODO: fix this technical debt
- pub fn tick(this: *EventLoop) void {
- var poller = &this.virtual_machine.poller;
- var ctx = this.virtual_machine;
- while (true) {
- this.tickConcurrent();
-
- // this.global.vm().doWork();
-
- while (this.tickWithCount() > 0) {}
- poller.tick();
-
- this.tickConcurrent();
-
- if (this.tickWithCount() == 0) break;
- }
-
- this.global.vm().releaseWeakRefs();
-
- if (!ctx.disable_run_us_loop and ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) {
- ctx.is_us_loop_entered = true;
- ctx.enterUWSLoop();
- ctx.is_us_loop_entered = false;
- }
- }
-
- // TODO: fix this technical debt
- pub fn waitForPromise(this: *EventLoop, promise: *JSC.JSInternalPromise) void {
- switch (promise.status(this.global.vm())) {
- JSC.JSPromise.Status.Pending => {
- while (promise.status(this.global.vm()) == .Pending) {
- this.tick();
- }
- },
- else => {},
- }
- }
-
- pub fn waitForTasks(this: *EventLoop) void {
- this.tick();
- while (this.pending_tasks_count.load(.Monotonic) > 0) {
- this.tick();
- }
- }
-
- pub fn enqueueTask(this: *EventLoop, task: Task) void {
- _ = this.pending_tasks_count.fetchAdd(1, .Monotonic);
- this.tasks.writeItem(task) catch unreachable;
- }
-
- pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void {
- this.concurrent_lock.lock();
- defer this.concurrent_lock.unlock();
- this.concurrent_tasks.writeItem(task) catch unreachable;
- if (this.virtual_machine.uws_event_loop) |loop| {
- loop.nextTick(*EventLoop, this, EventLoop.tick);
- }
- _ = this.ready_tasks_count.fetchAdd(1, .Monotonic);
- }
-};
-
-pub const Poller = struct {
- /// kqueue() or epoll()
- /// 0 == unset
- watch_fd: i32 = 0,
- active: u32 = 0,
-
- pub const PlatformSpecificFlags = struct {};
-
- const Completion = fn (ctx: ?*anyopaque, sizeOrOffset: i64, flags: u16) void;
- const kevent64 = std.os.system.kevent64_s;
- pub fn dispatchKQueueEvent(kqueue_event: *const kevent64) void {
- if (comptime !Environment.isMac) {
- unreachable;
- }
-
- const ptr = @intToPtr(?*anyopaque, kqueue_event.udata);
- const callback: Completion = @intToPtr(Completion, kqueue_event.ext[0]);
- callback(ptr, @bitCast(i64, kqueue_event.data), kqueue_event.flags);
- }
-
- const timeout = std.mem.zeroes(std.os.timespec);
-
- pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, ctx: ?*anyopaque, completion: Completion) JSC.Maybe(void) {
- if (comptime Environment.isLinux) {
- // std.debug.assert(this.watch_fd != 0);
- // TODO:
- return JSC.Maybe(void).success;
- } else if (comptime Environment.isMac) {
- if (this.watch_fd == 0) {
- this.watch_fd = std.c.kqueue();
- if (this.watch_fd == -1) {
- defer this.watch_fd = 0;
- return JSC.Maybe(void).errnoSys(this.watch_fd, .kqueue).?;
- }
- }
-
- var events_list = std.mem.zeroes([2]kevent64);
- events_list[0] = switch (flag) {
- .read => .{
- .ident = @intCast(u64, fd),
- .filter = std.os.system.EVFILT_READ,
- .data = 0,
- .fflags = 0,
- .udata = @ptrToInt(ctx),
- .flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT,
- .ext = .{ @ptrToInt(completion), 0 },
- },
- .write => .{
- .ident = @intCast(u64, fd),
- .filter = std.os.system.EVFILT_WRITE,
- .data = 0,
- .fflags = 0,
- .udata = @ptrToInt(ctx),
- .flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT,
- .ext = .{ @ptrToInt(completion), 0 },
- },
- };
-
- // The kevent() system call returns the number of events placed in
- // the eventlist, up to the value given by nevents. If the time
- // limit expires, then kevent() returns 0.
- const rc = std.os.system.kevent64(
- this.watch_fd,
- &events_list,
- 1,
- // The same array may be used for the changelist and eventlist.
- &events_list,
- 1,
- 0,
- &timeout,
- );
-
- // If an error occurs while
- // processing an element of the changelist and there is enough room
- // in the eventlist, then the event will be placed in the eventlist
- // with EV_ERROR set in flags and the system error in data.
- if (events_list[0].flags == std.c.EV_ERROR) {
- return JSC.Maybe(void).errnoSys(events_list[0].data, .kevent).?;
- // Otherwise, -1 will be returned, and errno will be set to
- // indicate the error condition.
- }
-
- switch (rc) {
- std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(std.c.getErrno(rc)), .kevent).?,
- 0 => {
- this.active += 1;
- return JSC.Maybe(void).success;
- },
- 1 => {
- // if we immediately get an event, we can skip the reference counting
- dispatchKQueueEvent(&events_list[0]);
- return JSC.Maybe(void).success;
- },
- 2 => {
- dispatchKQueueEvent(&events_list[0]);
-
- this.active -= 1;
- dispatchKQueueEvent(&events_list[1]);
- return JSC.Maybe(void).success;
- },
- else => unreachable,
- }
- } else {
- @compileError("TODO: Poller");
- }
- }
-
- const kqueue_events_ = std.mem.zeroes([4]kevent64);
- pub fn tick(this: *Poller) void {
- if (comptime Environment.isMac) {
- if (this.active == 0) return;
-
- var events_list = kqueue_events_;
- // ub extern "c" fn kevent64(
- // kq: c_int,
- // changelist: [*]const kevent64_s,
- // nchanges: c_int,
- // eventlist: [*]kevent64_s,
- // nevents: c_int,
- // flags: c_uint,
- // timeout: ?*const timespec,
- // ) c_int;
- const rc = std.os.system.kevent64(
- this.watch_fd,
- &events_list,
- 0,
- // The same array may be used for the changelist and eventlist.
- &events_list,
- 4,
- 0,
- &timeout,
- );
-
- switch (rc) {
- std.math.minInt(@TypeOf(rc))...-1 => {
- // EINTR is fine
- switch (std.c.getErrno(rc)) {
- .INTR => return,
- else => |errno| std.debug.panic("kevent64() failed: {d}", .{errno}),
- }
- },
- 0 => {},
- 1 => {
- this.active -= 1;
- dispatchKQueueEvent(&events_list[0]);
- },
- 2 => {
- this.active -= 2;
- dispatchKQueueEvent(&events_list[0]);
- dispatchKQueueEvent(&events_list[1]);
- },
- 3 => {
- this.active -= 3;
- dispatchKQueueEvent(&events_list[0]);
- dispatchKQueueEvent(&events_list[1]);
- dispatchKQueueEvent(&events_list[2]);
- },
- 4 => {
- this.active -= 4;
- dispatchKQueueEvent(&events_list[0]);
- dispatchKQueueEvent(&events_list[1]);
- dispatchKQueueEvent(&events_list[2]);
- dispatchKQueueEvent(&events_list[3]);
- },
- else => unreachable,
- }
- }
- }
-
- pub const Flag = enum { read, write };
-};