diff options
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r-- | src/bun.js/event_loop.zig | 254 |
1 files changed, 97 insertions, 157 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 51d55d2e0..a68376872 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -33,6 +33,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type { allocator: std.mem.Allocator, promise: JSValue, globalThis: *JSGlobalObject, + concurrent_task: JSC.ConcurrentTask = .{}, pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { var this = try allocator.create(This); @@ -75,68 +76,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type { } pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); - } - - pub fn deinit(this: *This) void { - this.allocator.destroy(this); - } - }; -} - -pub fn SerialPromiseTask(comptime Context: type) type { - return struct { - const SerialWorkPool = @import("../work_pool.zig").NewWorkPool(1); - const This = @This(); - - ctx: *Context, - task: WorkPoolTask = .{ .callback = runFromThreadPool }, - event_loop: *JSC.EventLoop, - allocator: std.mem.Allocator, - promise: JSValue, - globalThis: *JSGlobalObject, - - pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { - var this = try allocator.create(This); - this.* = .{ - .event_loop = VirtualMachine.vm.event_loop, - .ctx = value, - .allocator = allocator, - .promise = JSValue.createInternalPromise(globalThis), - .globalThis = globalThis, - }; - js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef()); - VirtualMachine.vm.active_tasks +|= 1; - return this; - } - - pub fn runFromThreadPool(task: *WorkPoolTask) void { - var this = @fieldParentPtr(This, "task", task); - Context.run(this.ctx); - this.onFinish(); - } - - pub fn runFromJS(this: This) void { - var promise_value = this.promise; - var promise = promise_value.asInternalPromise() orelse { - if (comptime @hasDecl(Context, "deinit")) { - @call(.{}, Context.deinit, .{this.ctx}); - } - return; - }; - - var ctx = this.ctx; - - js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef()); - ctx.then(promise, this.globalThis); - } - - pub fn schedule(this: *This) void { - SerialWorkPool.schedule(&this.task); - } - - pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this)); } pub fn deinit(this: *This) void { @@ -153,6 +93,7 @@ pub fn IOTask(comptime Context: type) type { event_loop: *JSC.EventLoop, allocator: std.mem.Allocator, globalThis: *JSGlobalObject, + concurrent_task: ConcurrentTask = .{}, pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { var this = try allocator.create(This); @@ -182,53 +123,7 @@ pub fn IOTask(comptime Context: type) type { } pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); - } - - pub fn deinit(this: *This) void { - var allocator = this.allocator; - this.* = undefined; - allocator.destroy(this); - } - }; -} - -pub fn AsyncNativeCallbackTask(comptime Context: type) type { - return struct { - const This = @This(); - ctx: *Context, - task: WorkPoolTask = .{ .callback = runFromThreadPool }, - event_loop: *JSC.EventLoop, - allocator: std.mem.Allocator, - globalThis: *JSGlobalObject, - - pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { - var this = try allocator.create(This); - this.* = .{ - .event_loop = VirtualMachine.vm.eventLoop(), - .ctx = value, - .allocator = allocator, - .globalThis = globalThis, - }; - VirtualMachine.vm.active_tasks +|= 1; - return this; - } - - pub fn runFromThreadPool(task: *WorkPoolTask) void { - var this = @fieldParentPtr(This, "task", task); - Context.run(this.ctx, this); - } - - pub fn runFromJS(this: This) void { - this.ctx.runFromJS(this.globalThis); - } - - pub fn schedule(this: *This) void { - WorkPool.get().schedule(WorkPool.schedule(&this.task)); - } - - pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this)); } pub fn deinit(this: *This) void { @@ -290,95 +185,97 @@ pub const Task = TaggedPointerUnion(.{ // PromiseTask, // TimeoutTasklet, }); +const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue; +pub const ConcurrentTask = struct { + task: Task = undefined, + next: ?*ConcurrentTask = null, + + pub const Queue = UnboundedQueue(ConcurrentTask, .next); + + pub fn from(this: *ConcurrentTask, of: anytype) *ConcurrentTask { + this.* = .{ + .task = Task.init(of), + .next = null, + }; + return this; + } +}; const AsyncIO = @import("io"); pub const EventLoop = struct { - ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), tasks: Queue = undefined, - concurrent_tasks: Queue = undefined, - concurrent_lock: Lock = Lock.init(), + concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{}, global: *JSGlobalObject = undefined, virtual_machine: *VirtualMachine = undefined, waker: ?AsyncIO.Waker = null, - + defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), pub const Queue = std.fifo.LinearFifo(Task, .Dynamic); pub fn tickWithCount(this: *EventLoop) u32 { - var finished: u32 = 0; var global = this.global; var global_vm = global.vm(); var vm_ = this.virtual_machine; + var counter: usize = 0; while (this.tasks.readItem()) |task| { + defer counter += 1; switch (task.tag()) { .Microtask => { var micro: *Microtask = task.as(Microtask); micro.run(global); - finished += 1; }, .MicrotaskForDefaultGlobalObject => { var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject); micro.run(global); - finished += 1; }, .FetchTasklet => { var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?; fetch_task.onDone(); - finished += 1; + fetch_task.deinit(); vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(AsyncTransformTask)) => { var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(CopyFilePromiseTask)) => { var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => { var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?; transform_task.*.runFromJS(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(BunTimerTimeoutTask)) => { var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?; transform_task.*.runFromJS(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(ReadFileTask)) => { var transform_task: *ReadFileTask = task.get(ReadFileTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(WriteFileTask)) => { var transform_task: *WriteFileTask = task.get(WriteFileTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(AnyTask))) => { var any: *AnyTask = task.get(AnyTask).?; any.run(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(CppTask))) => { var any: *CppTask = task.get(CppTask).?; any.run(global); - finished += 1; vm_.active_tasks -|= 1; }, else => if (Environment.allow_assert) { @@ -390,30 +287,39 @@ pub const EventLoop = struct { global_vm.drainMicrotasks(); } - if (finished > 0) { - _ = this.pending_tasks_count.fetchSub(finished, .Monotonic); + if (this.tasks.count == 0) { + this.tasks.head = 0; } - return finished; + return @truncate(u32, counter); } pub fn tickConcurrent(this: *EventLoop) void { - if (this.ready_tasks_count.load(.Monotonic) > 0) { - this.concurrent_lock.lock(); - defer this.concurrent_lock.unlock(); - const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength()); + _ = this.tickConcurrentWithCount(); + } - // TODO: optimzie - this.tasks.ensureUnusedCapacity(add) catch unreachable; + pub fn tickConcurrentWithCount(this: *EventLoop) usize { + var concurrent = this.concurrent_tasks.popBatch(); + const count = concurrent.count; + if (count == 0) + return 0; - { - this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0)); - this.concurrent_tasks.discard(this.concurrent_tasks.count); - } + var iter = concurrent.iterator(); + const start_count = this.tasks.count; + if (start_count == 0) { + this.tasks.head = 0; + } - _ = this.pending_tasks_count.fetchAdd(add, .Monotonic); - _ = this.ready_tasks_count.fetchSub(add, .Monotonic); + this.tasks.ensureUnusedCapacity(count) catch unreachable; + var writable = this.tasks.writableSlice(0); + while (iter.next()) |task| { + writable[0] = task.task; + writable = writable[1..]; + this.tasks.count += 1; + if (writable.len == 0) break; } + + return this.tasks.count - start_count; } // TODO: fix this technical debt @@ -423,7 +329,9 @@ pub const EventLoop = struct { this.tickConcurrent(); var global_vm = ctx.global.vm(); while (true) { - while (this.tickWithCount() > 0) {} else { + while (this.tickWithCount() > 0) { + this.tickConcurrent(); + } else { global_vm.releaseWeakRefs(); global_vm.drainMicrotasks(); this.tickConcurrent(); @@ -436,13 +344,28 @@ pub const EventLoop = struct { break; } - if (!ctx.disable_run_us_loop and ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) { + this.global.handleRejectedPromises(); + } + + pub fn runUSocketsLoop(this: *EventLoop) void { + var ctx = this.virtual_machine; + + ctx.global.vm().releaseWeakRefs(); + ctx.global.vm().drainMicrotasks(); + + if (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) { + if (this.tickConcurrentWithCount() > 0) { + this.tick(); + } else if (ctx.uws_event_loop.?.num_polls > 0) { + if ((@intCast(c_ulonglong, ctx.uws_event_loop.?.internal_loop_data.iteration_nr) % 1_000) == 1) { + _ = ctx.global.vm().runGC(true); + } + } + ctx.is_us_loop_entered = true; ctx.enterUWSLoop(); ctx.is_us_loop_entered = false; } - - this.global.handleRejectedPromises(); } // TODO: fix this technical debt @@ -451,6 +374,10 @@ pub const EventLoop = struct { JSC.JSPromise.Status.Pending => { while (promise.status(this.global.vm()) == .Pending) { this.tick(); + + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } } }, else => {}, @@ -459,13 +386,20 @@ pub const EventLoop = struct { pub fn waitForTasks(this: *EventLoop) void { this.tick(); - while (this.pending_tasks_count.load(.Monotonic) > 0) { + while (this.tasks.count > 0) { this.tick(); + + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } + } else { + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } } } pub fn enqueueTask(this: *EventLoop, task: Task) void { - _ = this.pending_tasks_count.fetchAdd(1, .Monotonic); this.tasks.writeItem(task) catch unreachable; } @@ -476,19 +410,25 @@ pub const EventLoop = struct { } } - pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void { + pub fn onDefer(this: *EventLoop) void { + this.defer_count.store(0, .Monotonic); + this.tick(); + } + + pub fn enqueueTaskConcurrent(this: *EventLoop, task: *ConcurrentTask) void { JSC.markBinding(); - this.concurrent_lock.lock(); - defer this.concurrent_lock.unlock(); - this.concurrent_tasks.writeItem(task) catch unreachable; + + this.concurrent_tasks.push(task); + if (this.virtual_machine.uws_event_loop) |loop| { - loop.nextTick(*EventLoop, this, EventLoop.tick); + const deferCount = this.defer_count.fetchAdd(1, .Monotonic); + if (deferCount == 0) { + loop.nextTick(*EventLoop, this, onDefer); + } } - if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) { - if (this.waker) |*waker| { - waker.wake() catch unreachable; - } + if (this.waker) |*waker| { + waker.wake() catch unreachable; } } }; |