diff options
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r-- | src/bun.js/event_loop.zig | 216 |
1 files changed, 212 insertions, 4 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 6928cd2b8..35ba93eaa 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -12,7 +12,7 @@ const Bun = JSC.API.Bun; const TaggedPointerUnion = @import("../tagged_pointer.zig").TaggedPointerUnion; const typeBaseName = @import("../meta.zig").typeBaseName; const CopyFilePromiseTask = WebCore.Blob.Store.CopyFile.CopyFilePromiseTask; -const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask; +const AsyncTransformTask = JSC.API.JSTranspiler.TransformTask.AsyncTransformTask; const ReadFileTask = WebCore.Blob.Store.ReadFile.ReadFileTask; const WriteFileTask = WebCore.Blob.Store.WriteFile.WriteFileTask; const napi_async_work = JSC.napi.napi_async_work; @@ -154,7 +154,9 @@ pub const AnyTask = struct { pub fn run(this: *AnyTask) void { @setRuntimeSafety(false); - this.callback(this.ctx.?); + var callback = this.callback; + var ctx = this.ctx; + callback(ctx.?); } pub fn New(comptime Type: type, comptime Callback: anytype) type { @@ -167,7 +169,42 @@ pub const AnyTask = struct { } pub fn wrap(this: ?*anyopaque) void { - Callback(@ptrCast(*Type, @alignCast(@alignOf(Type), this.?))); + @call(.always_inline, Callback, .{@ptrCast(*Type, @alignCast(@alignOf(Type), this.?))}); + } + }; + } +}; + +pub const AnyTaskWithExtraContext = struct { + ctx: ?*anyopaque, + callback: *const (fn (*anyopaque, *anyopaque) void), + next: ?*AnyTaskWithExtraContext = null, + + pub fn run(this: *AnyTaskWithExtraContext, extra: *anyopaque) void { + @setRuntimeSafety(false); + var callback = this.callback; + var ctx = this.ctx; + callback(ctx.?, extra); + } + + pub fn New(comptime Type: type, comptime ContextType: type, comptime Callback: anytype) type { + return struct { + pub fn init(ctx: *Type) AnyTaskWithExtraContext { + return AnyTaskWithExtraContext{ + .callback = wrap, + .ctx = ctx, + }; + } + + pub fn wrap(this: ?*anyopaque, extra: ?*anyopaque) void { + @call( + .always_inline, + Callback, + .{ + @ptrCast(*Type, @alignCast(@alignOf(Type), this.?)), + @ptrCast(*ContextType, @alignCast(@alignOf(ContextType), extra.?)), + }, + ); } }; } @@ -411,7 +448,7 @@ pub const EventLoop = struct { transform_task.*.runFromJS(); transform_task.deinit(); }, - .HotReloadTask => { + @field(Task.Tag, @typeName(HotReloadTask)) => { var transform_task: *HotReloadTask = task.get(HotReloadTask).?; transform_task.*.run(); transform_task.deinit(); @@ -673,3 +710,174 @@ pub const EventLoop = struct { } } }; + +pub const MiniEventLoop = struct { + tasks: Queue, + concurrent_tasks: UnboundedQueue(AnyTaskWithExtraContext, .next) = .{}, + loop: *uws.Loop, + allocator: std.mem.Allocator, + + const Queue = std.fifo.LinearFifo(*AnyTaskWithExtraContext, .Dynamic); + + pub const Task = AnyTaskWithExtraContext; + + pub fn init( + allocator: std.mem.Allocator, + ) MiniEventLoop { + return .{ + .tasks = Queue.init(allocator), + .allocator = allocator, + .loop = uws.Loop.get().?, + }; + } + + pub fn deinit(this: *MiniEventLoop) void { + this.tasks.deinit(); + std.debug.assert(this.concurrent_tasks.isEmpty()); + } + + pub fn tickConcurrentWithCount(this: *MiniEventLoop) usize { + var concurrent = this.concurrent_tasks.popBatch(); + const count = concurrent.count; + if (count == 0) + return 0; + + var iter = concurrent.iterator(); + const start_count = this.tasks.count; + if (start_count == 0) { + this.tasks.head = 0; + } + + this.tasks.ensureUnusedCapacity(count) catch unreachable; + var writable = this.tasks.writableSlice(0); + while (iter.next()) |task| { + writable[0] = task; + writable = writable[1..]; + this.tasks.count += 1; + if (writable.len == 0) break; + } + + return this.tasks.count - start_count; + } + + pub fn tick( + this: *MiniEventLoop, + context: *anyopaque, + ) void { + while (true) { + _ = this.tickConcurrentWithCount(); + while (this.tasks.readItem()) |task| { + task.run(context); + } + + if (this.tickConcurrentWithCount() == 0) { + if (this.loop.active > 0 or this.loop.num_polls > 0) { + this.loop.run(); + continue; + } + + break; + } + } + } + + pub fn enqueueTask( + this: *MiniEventLoop, + comptime Context: type, + ctx: *Context, + comptime Callback: fn (*Context) void, + comptime field: std.meta.FieldEnum(Context), + ) void { + const TaskType = MiniEventLoop.Task.New(Context, Callback); + @field(ctx, @tagName(field)) = TaskType.init(ctx); + this.enqueueJSCTask(&@field(ctx, @tagName(field))); + } + + pub fn enqueueTaskConcurrent( + this: *MiniEventLoop, + comptime Context: type, + comptime ParentContext: type, + ctx: *Context, + comptime Callback: fn (*Context, *ParentContext) void, + comptime field: std.meta.FieldEnum(Context), + ) void { + JSC.markBinding(@src()); + const TaskType = MiniEventLoop.Task.New(Context, ParentContext, Callback); + @field(ctx, @tagName(field)) = TaskType.init(ctx); + + this.concurrent_tasks.push(&@field(ctx, @tagName(field))); + + this.loop.wakeup(); + } +}; + +pub const AnyEventLoop = union(enum) { + jsc: *EventLoop, + mini: MiniEventLoop, + + pub const Task = AnyTaskWithExtraContext; + + pub fn fromJSC( + this: *AnyEventLoop, + jsc: *EventLoop, + ) void { + this.* = .{ .jsc = jsc }; + } + + pub fn init( + allocator: std.mem.Allocator, + ) AnyEventLoop { + return .{ .mini = MiniEventLoop.init(allocator) }; + } + + // pub fn enqueueTask( + // this: *AnyEventLoop, + // comptime Context: type, + // ctx: *Context, + // comptime Callback: fn (*Context) void, + // comptime field: std.meta.FieldEnum(Context), + // ) void { + // const TaskType = MiniEventLoop.Task.New(Context, Callback); + // @field(ctx, field) = TaskType.init(ctx); + // this.enqueueTaskConcurrent(&@field(ctx, field)); + // } + + pub fn tick( + this: *AnyEventLoop, + context: *anyopaque, + ) void { + switch (this.*) { + .jsc => { + this.jsc.tick(); + this.jsc.autoTick(); + }, + .mini => { + this.mini.tick(context); + }, + } + } + + pub fn enqueueTaskConcurrent( + this: *AnyEventLoop, + comptime Context: type, + comptime ParentContext: type, + ctx: *Context, + comptime Callback: fn (*Context, *ParentContext) void, + comptime field: std.meta.FieldEnum(Context), + ) void { + switch (this.*) { + .jsc => { + unreachable; // TODO: + // const TaskType = AnyTask.New(Context, Callback); + // @field(ctx, field) = TaskType.init(ctx); + // var concurrent = bun.default_allocator.create(ConcurrentTask) catch unreachable; + // _ = concurrent.from(JSC.Task.init(&@field(ctx, field))); + // concurrent.auto_delete = true; + // this.jsc.enqueueTaskConcurrent(concurrent); + }, + .mini => { + this.mini.enqueueTaskConcurrent(Context, ParentContext, ctx, Callback, field); + }, + } + } +}; |