aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/event_loop.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/event_loop.zig')
-rw-r--r--src/bun.js/event_loop.zig254
1 files changed, 97 insertions, 157 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 51d55d2e0..a68376872 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -33,6 +33,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type {
allocator: std.mem.Allocator,
promise: JSValue,
globalThis: *JSGlobalObject,
+ concurrent_task: JSC.ConcurrentTask = .{},
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
@@ -75,68 +76,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type {
}
pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- this.allocator.destroy(this);
- }
- };
-}
-
-pub fn SerialPromiseTask(comptime Context: type) type {
- return struct {
- const SerialWorkPool = @import("../work_pool.zig").NewWorkPool(1);
- const This = @This();
-
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- promise: JSValue,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.event_loop,
- .ctx = value,
- .allocator = allocator,
- .promise = JSValue.createInternalPromise(globalThis),
- .globalThis = globalThis,
- };
- js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
- VirtualMachine.vm.active_tasks +|= 1;
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx);
- this.onFinish();
- }
-
- pub fn runFromJS(this: This) void {
- var promise_value = this.promise;
- var promise = promise_value.asInternalPromise() orelse {
- if (comptime @hasDecl(Context, "deinit")) {
- @call(.{}, Context.deinit, .{this.ctx});
- }
- return;
- };
-
- var ctx = this.ctx;
-
- js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
- ctx.then(promise, this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- SerialWorkPool.schedule(&this.task);
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn deinit(this: *This) void {
@@ -153,6 +93,7 @@ pub fn IOTask(comptime Context: type) type {
event_loop: *JSC.EventLoop,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
+ concurrent_task: ConcurrentTask = .{},
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
@@ -182,53 +123,7 @@ pub fn IOTask(comptime Context: type) type {
}
pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- var allocator = this.allocator;
- this.* = undefined;
- allocator.destroy(this);
- }
- };
-}
-
-pub fn AsyncNativeCallbackTask(comptime Context: type) type {
- return struct {
- const This = @This();
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.eventLoop(),
- .ctx = value,
- .allocator = allocator,
- .globalThis = globalThis,
- };
- VirtualMachine.vm.active_tasks +|= 1;
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx, this);
- }
-
- pub fn runFromJS(this: This) void {
- this.ctx.runFromJS(this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- WorkPool.get().schedule(WorkPool.schedule(&this.task));
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn deinit(this: *This) void {
@@ -290,95 +185,97 @@ pub const Task = TaggedPointerUnion(.{
// PromiseTask,
// TimeoutTasklet,
});
+const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue;
+pub const ConcurrentTask = struct {
+ task: Task = undefined,
+ next: ?*ConcurrentTask = null,
+
+ pub const Queue = UnboundedQueue(ConcurrentTask, .next);
+
+ pub fn from(this: *ConcurrentTask, of: anytype) *ConcurrentTask {
+ this.* = .{
+ .task = Task.init(of),
+ .next = null,
+ };
+ return this;
+ }
+};
const AsyncIO = @import("io");
pub const EventLoop = struct {
- ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
tasks: Queue = undefined,
- concurrent_tasks: Queue = undefined,
- concurrent_lock: Lock = Lock.init(),
+ concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
global: *JSGlobalObject = undefined,
virtual_machine: *VirtualMachine = undefined,
waker: ?AsyncIO.Waker = null,
-
+ defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
pub fn tickWithCount(this: *EventLoop) u32 {
- var finished: u32 = 0;
var global = this.global;
var global_vm = global.vm();
var vm_ = this.virtual_machine;
+ var counter: usize = 0;
while (this.tasks.readItem()) |task| {
+ defer counter += 1;
switch (task.tag()) {
.Microtask => {
var micro: *Microtask = task.as(Microtask);
micro.run(global);
- finished += 1;
},
.MicrotaskForDefaultGlobalObject => {
var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject);
micro.run(global);
- finished += 1;
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onDone();
- finished += 1;
+ fetch_task.deinit();
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => {
var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
transform_task.*.runFromJS();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(BunTimerTimeoutTask)) => {
var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?;
transform_task.*.runFromJS();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(ReadFileTask)) => {
var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(WriteFileTask)) => {
var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(AnyTask))) => {
var any: *AnyTask = task.get(AnyTask).?;
any.run();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(CppTask))) => {
var any: *CppTask = task.get(CppTask).?;
any.run(global);
- finished += 1;
vm_.active_tasks -|= 1;
},
else => if (Environment.allow_assert) {
@@ -390,30 +287,39 @@ pub const EventLoop = struct {
global_vm.drainMicrotasks();
}
- if (finished > 0) {
- _ = this.pending_tasks_count.fetchSub(finished, .Monotonic);
+ if (this.tasks.count == 0) {
+ this.tasks.head = 0;
}
- return finished;
+ return @truncate(u32, counter);
}
pub fn tickConcurrent(this: *EventLoop) void {
- if (this.ready_tasks_count.load(.Monotonic) > 0) {
- this.concurrent_lock.lock();
- defer this.concurrent_lock.unlock();
- const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength());
+ _ = this.tickConcurrentWithCount();
+ }
- // TODO: optimzie
- this.tasks.ensureUnusedCapacity(add) catch unreachable;
+ pub fn tickConcurrentWithCount(this: *EventLoop) usize {
+ var concurrent = this.concurrent_tasks.popBatch();
+ const count = concurrent.count;
+ if (count == 0)
+ return 0;
- {
- this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0));
- this.concurrent_tasks.discard(this.concurrent_tasks.count);
- }
+ var iter = concurrent.iterator();
+ const start_count = this.tasks.count;
+ if (start_count == 0) {
+ this.tasks.head = 0;
+ }
- _ = this.pending_tasks_count.fetchAdd(add, .Monotonic);
- _ = this.ready_tasks_count.fetchSub(add, .Monotonic);
+ this.tasks.ensureUnusedCapacity(count) catch unreachable;
+ var writable = this.tasks.writableSlice(0);
+ while (iter.next()) |task| {
+ writable[0] = task.task;
+ writable = writable[1..];
+ this.tasks.count += 1;
+ if (writable.len == 0) break;
}
+
+ return this.tasks.count - start_count;
}
// TODO: fix this technical debt
@@ -423,7 +329,9 @@ pub const EventLoop = struct {
this.tickConcurrent();
var global_vm = ctx.global.vm();
while (true) {
- while (this.tickWithCount() > 0) {} else {
+ while (this.tickWithCount() > 0) {
+ this.tickConcurrent();
+ } else {
global_vm.releaseWeakRefs();
global_vm.drainMicrotasks();
this.tickConcurrent();
@@ -436,13 +344,28 @@ pub const EventLoop = struct {
break;
}
- if (!ctx.disable_run_us_loop and ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) {
+ this.global.handleRejectedPromises();
+ }
+
+ pub fn runUSocketsLoop(this: *EventLoop) void {
+ var ctx = this.virtual_machine;
+
+ ctx.global.vm().releaseWeakRefs();
+ ctx.global.vm().drainMicrotasks();
+
+ if (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) {
+ if (this.tickConcurrentWithCount() > 0) {
+ this.tick();
+ } else if (ctx.uws_event_loop.?.num_polls > 0) {
+ if ((@intCast(c_ulonglong, ctx.uws_event_loop.?.internal_loop_data.iteration_nr) % 1_000) == 1) {
+ _ = ctx.global.vm().runGC(true);
+ }
+ }
+
ctx.is_us_loop_entered = true;
ctx.enterUWSLoop();
ctx.is_us_loop_entered = false;
}
-
- this.global.handleRejectedPromises();
}
// TODO: fix this technical debt
@@ -451,6 +374,10 @@ pub const EventLoop = struct {
JSC.JSPromise.Status.Pending => {
while (promise.status(this.global.vm()) == .Pending) {
this.tick();
+
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
}
},
else => {},
@@ -459,13 +386,20 @@ pub const EventLoop = struct {
pub fn waitForTasks(this: *EventLoop) void {
this.tick();
- while (this.pending_tasks_count.load(.Monotonic) > 0) {
+ while (this.tasks.count > 0) {
this.tick();
+
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
+ } else {
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
}
}
pub fn enqueueTask(this: *EventLoop, task: Task) void {
- _ = this.pending_tasks_count.fetchAdd(1, .Monotonic);
this.tasks.writeItem(task) catch unreachable;
}
@@ -476,19 +410,25 @@ pub const EventLoop = struct {
}
}
- pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void {
+ pub fn onDefer(this: *EventLoop) void {
+ this.defer_count.store(0, .Monotonic);
+ this.tick();
+ }
+
+ pub fn enqueueTaskConcurrent(this: *EventLoop, task: *ConcurrentTask) void {
JSC.markBinding();
- this.concurrent_lock.lock();
- defer this.concurrent_lock.unlock();
- this.concurrent_tasks.writeItem(task) catch unreachable;
+
+ this.concurrent_tasks.push(task);
+
if (this.virtual_machine.uws_event_loop) |loop| {
- loop.nextTick(*EventLoop, this, EventLoop.tick);
+ const deferCount = this.defer_count.fetchAdd(1, .Monotonic);
+ if (deferCount == 0) {
+ loop.nextTick(*EventLoop, this, onDefer);
+ }
}
- if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) {
- if (this.waker) |*waker| {
- waker.wake() catch unreachable;
- }
+ if (this.waker) |*waker| {
+ waker.wake() catch unreachable;
}
}
};