diff options
Diffstat (limited to 'src/bun.js')
-rw-r--r-- | src/bun.js/api/bun.zig | 34 | ||||
-rw-r--r-- | src/bun.js/api/server.zig | 50 | ||||
-rw-r--r-- | src/bun.js/base.zig | 41 | ||||
-rw-r--r-- | src/bun.js/bindings/JSBuffer.cpp | 7 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 254 | ||||
-rw-r--r-- | src/bun.js/javascript.zig | 19 | ||||
-rw-r--r-- | src/bun.js/unbounded_queue.zig | 149 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 280 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 3 |
9 files changed, 470 insertions, 367 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index a37d5d62c..3fafdc177 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -1101,6 +1101,9 @@ pub const Class = NewClass( .nanoseconds = .{ .rfn = nanoseconds, }, + .DO_NOT_USE_OR_YOU_WILL_BE_FIRED_mimalloc_dump = .{ + .rfn = dump_mimalloc, + }, .gzipSync = .{ .rfn = JSC.wrapWithHasContainer(JSZlib, "gzipSync", false, false, true), }, @@ -1191,6 +1194,18 @@ pub const Class = NewClass( }, ); +fn dump_mimalloc( + _: void, + globalThis: JSC.C.JSContextRef, + _: JSC.C.JSObjectRef, + _: JSC.C.JSObjectRef, + _: []const JSC.C.JSValueRef, + _: JSC.C.ExceptionRef, +) JSC.C.JSValueRef { + globalThis.bunVM().arena.dumpThreadStats(); + return JSC.JSValue.jsUndefined().asObjectRef(); +} + pub const Crypto = struct { const Hashers = @import("../../sha.zig"); @@ -2101,6 +2116,8 @@ pub const Timer = struct { return VirtualMachine.vm.timer.last_id; } + const Pool = bun.ObjectPool(Timeout, null, true, 1000); + pub const Timeout = struct { id: i32 = 0, callback: JSValue, @@ -2134,11 +2151,13 @@ pub const Timer = struct { if (comptime JSC.is_bindgen) unreachable; + var vm = global.bunVM(); + if (!this.cancelled) { if (this.repeat) { this.io_task.?.deinit(); - var task = Timeout.TimeoutTask.createOnJSThread(VirtualMachine.vm.allocator, global, this) catch unreachable; - VirtualMachine.vm.timer.timeouts.put(VirtualMachine.vm.allocator, this.id, this) catch unreachable; + var task = Timeout.TimeoutTask.createOnJSThread(vm.allocator, global, this) catch unreachable; + vm.timer.timeouts.put(vm.allocator, this.id, this) catch unreachable; this.io_task = task; task.schedule(); } @@ -2148,12 +2167,12 @@ pub const Timer = struct { if (this.repeat) return; - VirtualMachine.vm.timer.active -|= 1; - VirtualMachine.vm.active_tasks -|= 1; + vm.timer.active -|= 1; + vm.active_tasks -|= 1; } else { // the active tasks count is already cleared for canceled timeout, // add one here to neutralize the `-|= 1` in event loop. - VirtualMachine.vm.active_tasks +|= 1; + vm.active_tasks +|= 1; } this.clear(global); @@ -2168,8 +2187,9 @@ pub const Timer = struct { _ = VirtualMachine.vm.timer.timeouts.swapRemove(this.id); if (this.io_task) |task| { task.deinit(); + this.io_task = null; } - VirtualMachine.vm.allocator.destroy(this); + Pool.releaseValue(this); } }; @@ -2181,7 +2201,7 @@ pub const Timer = struct { repeat: bool, ) !void { if (comptime is_bindgen) unreachable; - var timeout = try VirtualMachine.vm.allocator.create(Timeout); + var timeout = Pool.first(globalThis.bunVM().allocator); js.JSValueProtect(globalThis.ref(), callback.asObjectRef()); timeout.* = Timeout{ .id = id, .callback = callback, .interval = countdown.toInt32(), .repeat = repeat }; var task = try Timeout.TimeoutTask.createOnJSThread(VirtualMachine.vm.allocator, globalThis, timeout); diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index b79e6c7ab..55b829caa 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1850,6 +1850,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { has_js_deinited: bool = false, listen_callback: JSC.AnyTask = undefined, allocator: std.mem.Allocator, + keeping_js_alive: bool = false, pub const Class = JSC.NewClass( ThisServer, @@ -1917,15 +1918,17 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { } pub fn deinitIfWeCan(this: *ThisServer) void { - if (this.pending_requests == 0 and this.listener == null and this.has_js_deinited) + if (this.pending_requests == 0 and this.listener == null and this.has_js_deinited) { + this.deref(); this.deinit(); + } } pub fn stop(this: *ThisServer) void { if (this.listener) |listener| { - listener.close(); this.listener = null; - this.vm.disable_run_us_loop = false; + this.deref(); + listener.close(); } this.deinitIfWeCan(); @@ -2038,22 +2041,26 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { this.listener = socket; const needs_post_handler = this.vm.uws_event_loop == null; this.vm.uws_event_loop = uws.Loop.get(); - this.listen_callback = JSC.AnyTask.New(ThisServer, run).init(this); - this.vm.eventLoop().enqueueTask(JSC.Task.init(&this.listen_callback)); + this.ref(); + if (needs_post_handler) { _ = this.vm.uws_event_loop.?.addPostHandler(*JSC.EventLoop, this.vm.eventLoop(), JSC.EventLoop.tick); + _ = this.vm.uws_event_loop.?.addPreHandler(*JSC.EventLoop, this.vm.eventLoop(), JSC.EventLoop.tick); } } - pub fn run(this: *ThisServer) void { - // this.app.addServerName(hostname_pattern: [*:0]const u8) + pub fn ref(this: *ThisServer) void { + if (this.keeping_js_alive) return; + + this.vm.us_loop_reference_count +|= 1; + this.keeping_js_alive = true; + } - // we do not increment the reference count here - // uWS manages running the loop, so it is unnecessary - // this.vm.us_loop_reference_count +|= 1; - this.vm.disable_run_us_loop = true; + pub fn deref(this: *ThisServer) void { + if (!this.keeping_js_alive) return; - this.app.run(); + this.vm.us_loop_reference_count -|= 1; + this.keeping_js_alive = false; } pub fn onBunInfoRequest(this: *ThisServer, req: *uws.Request, resp: *App.Response) void { @@ -2286,11 +2293,20 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { this.app.get("/src:/*", *ThisServer, this, onSrcRequest); } - this.app.listenWithConfig(*ThisServer, this, onListen, .{ - .port = this.config.port, - .host = this.config.hostname, - .options = 0, - }); + const hostname = bun.span(this.config.hostname); + + if (!(hostname.len == 0 or strings.eqlComptime(hostname, "0.0.0.0"))) { + this.app.listenWithConfig(*ThisServer, this, onListen, .{ + .port = this.config.port, + .options = 0, + }); + } else { + this.app.listenWithConfig(*ThisServer, this, onListen, .{ + .port = this.config.port, + .host = this.config.hostname, + .options = 0, + }); + } } }; } diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 2ded4e5cb..6a4cc7469 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -2770,7 +2770,6 @@ pub fn castObj(obj: js.JSObjectRef, comptime Type: type) *Type { const JSNode = @import("../js_ast.zig").Macro.JSNode; const LazyPropertiesObject = @import("../js_ast.zig").Macro.LazyPropertiesObject; const ModuleNamespace = @import("../js_ast.zig").Macro.ModuleNamespace; -const FetchTaskletContext = Fetch.FetchTasklet.FetchTaskletContext; const Expect = Test.Expect; const DescribeScope = Test.DescribeScope; const TestScope = Test.TestScope; @@ -2824,7 +2823,6 @@ pub const JSPrivateDataPtr = TaggedPointerUnion(.{ Expect, ExpectPrototype, FetchEvent, - FetchTaskletContext, HTMLRewriter, JSNode, LazyPropertiesObject, @@ -3497,25 +3495,26 @@ pub fn wrapWithHasContainer( } if (comptime maybe_async) { - var vm = ctx.ptr().bunVM(); - vm.tick(); - - var promise = JSC.JSInternalPromise.resolvedPromise(ctx.ptr(), result); - - switch (promise.status(ctx.ptr().vm())) { - JSC.JSPromise.Status.Pending => { - while (promise.status(ctx.ptr().vm()) == .Pending) { - vm.tick(); - } - result = promise.result(ctx.ptr().vm()); - }, - JSC.JSPromise.Status.Rejected => { - result = promise.result(ctx.ptr().vm()); - exception.* = result.asObjectRef(); - }, - JSC.JSPromise.Status.Fulfilled => { - result = promise.result(ctx.ptr().vm()); - }, + if (result.asPromise() != null or result.asInternalPromise() != null) { + var vm = ctx.ptr().bunVM(); + vm.tick(); + var promise = JSC.JSInternalPromise.resolvedPromise(ctx.ptr(), result); + + switch (promise.status(ctx.ptr().vm())) { + JSC.JSPromise.Status.Pending => { + while (promise.status(ctx.ptr().vm()) == .Pending) { + vm.tick(); + } + result = promise.result(ctx.ptr().vm()); + }, + JSC.JSPromise.Status.Rejected => { + result = promise.result(ctx.ptr().vm()); + exception.* = result.asObjectRef(); + }, + JSC.JSPromise.Status.Fulfilled => { + result = promise.result(ctx.ptr().vm()); + }, + } } } diff --git a/src/bun.js/bindings/JSBuffer.cpp b/src/bun.js/bindings/JSBuffer.cpp index 4b509f257..a1b9a5d40 100644 --- a/src/bun.js/bindings/JSBuffer.cpp +++ b/src/bun.js/bindings/JSBuffer.cpp @@ -248,8 +248,8 @@ static EncodedJSValue constructFromEncoding(JSGlobalObject* lexicalGlobalObject, result = Bun__encoding__constructFromLatin1(lexicalGlobalObject, view.characters8(), view.length(), static_cast<uint8_t>(encoding)); break; } - case WebCore::BufferEncodingType::ascii: // ascii is a noop for latin1 - case WebCore::BufferEncodingType::latin1: { // The native encoding is latin1, so we don't need to do any conversion. + case WebCore::BufferEncodingType::ascii: // ascii is a noop for latin1 + case WebCore::BufferEncodingType::latin1: { // The native encoding is latin1, so we don't need to do any conversion. result = JSBuffer__bufferFromPointerAndLength(lexicalGlobalObject, view.characters8(), view.length()); break; } @@ -1216,7 +1216,7 @@ static inline JSC::EncodedJSValue jsBufferPrototypeFunction_writeBody(JSC::JSGlo if (callFrame->argumentCount() > 2) { uint32_t arg_len = 0; arg_len = callFrame->argument(2).toUInt32(lexicalGlobalObject); - length = std::min(arg_len, length-offset); + length = std::min(arg_len, length - offset); } if (callFrame->argumentCount() > 2) { @@ -1329,7 +1329,6 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSBufferConstructor::construc JSC::JSObject* constructor = lexicalGlobalObject->m_typedArrayUint8.constructor(lexicalGlobalObject); - // TODO: avoid this copy MarkedArgumentBuffer args; for (size_t i = 0; i < argsCount; ++i) args.append(callFrame->uncheckedArgument(i)); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 51d55d2e0..a68376872 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -33,6 +33,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type { allocator: std.mem.Allocator, promise: JSValue, globalThis: *JSGlobalObject, + concurrent_task: JSC.ConcurrentTask = .{}, pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { var this = try allocator.create(This); @@ -75,68 +76,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type { } pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); - } - - pub fn deinit(this: *This) void { - this.allocator.destroy(this); - } - }; -} - -pub fn SerialPromiseTask(comptime Context: type) type { - return struct { - const SerialWorkPool = @import("../work_pool.zig").NewWorkPool(1); - const This = @This(); - - ctx: *Context, - task: WorkPoolTask = .{ .callback = runFromThreadPool }, - event_loop: *JSC.EventLoop, - allocator: std.mem.Allocator, - promise: JSValue, - globalThis: *JSGlobalObject, - - pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { - var this = try allocator.create(This); - this.* = .{ - .event_loop = VirtualMachine.vm.event_loop, - .ctx = value, - .allocator = allocator, - .promise = JSValue.createInternalPromise(globalThis), - .globalThis = globalThis, - }; - js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef()); - VirtualMachine.vm.active_tasks +|= 1; - return this; - } - - pub fn runFromThreadPool(task: *WorkPoolTask) void { - var this = @fieldParentPtr(This, "task", task); - Context.run(this.ctx); - this.onFinish(); - } - - pub fn runFromJS(this: This) void { - var promise_value = this.promise; - var promise = promise_value.asInternalPromise() orelse { - if (comptime @hasDecl(Context, "deinit")) { - @call(.{}, Context.deinit, .{this.ctx}); - } - return; - }; - - var ctx = this.ctx; - - js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef()); - ctx.then(promise, this.globalThis); - } - - pub fn schedule(this: *This) void { - SerialWorkPool.schedule(&this.task); - } - - pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this)); } pub fn deinit(this: *This) void { @@ -153,6 +93,7 @@ pub fn IOTask(comptime Context: type) type { event_loop: *JSC.EventLoop, allocator: std.mem.Allocator, globalThis: *JSGlobalObject, + concurrent_task: ConcurrentTask = .{}, pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { var this = try allocator.create(This); @@ -182,53 +123,7 @@ pub fn IOTask(comptime Context: type) type { } pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); - } - - pub fn deinit(this: *This) void { - var allocator = this.allocator; - this.* = undefined; - allocator.destroy(this); - } - }; -} - -pub fn AsyncNativeCallbackTask(comptime Context: type) type { - return struct { - const This = @This(); - ctx: *Context, - task: WorkPoolTask = .{ .callback = runFromThreadPool }, - event_loop: *JSC.EventLoop, - allocator: std.mem.Allocator, - globalThis: *JSGlobalObject, - - pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { - var this = try allocator.create(This); - this.* = .{ - .event_loop = VirtualMachine.vm.eventLoop(), - .ctx = value, - .allocator = allocator, - .globalThis = globalThis, - }; - VirtualMachine.vm.active_tasks +|= 1; - return this; - } - - pub fn runFromThreadPool(task: *WorkPoolTask) void { - var this = @fieldParentPtr(This, "task", task); - Context.run(this.ctx, this); - } - - pub fn runFromJS(this: This) void { - this.ctx.runFromJS(this.globalThis); - } - - pub fn schedule(this: *This) void { - WorkPool.get().schedule(WorkPool.schedule(&this.task)); - } - - pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this)); } pub fn deinit(this: *This) void { @@ -290,95 +185,97 @@ pub const Task = TaggedPointerUnion(.{ // PromiseTask, // TimeoutTasklet, }); +const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue; +pub const ConcurrentTask = struct { + task: Task = undefined, + next: ?*ConcurrentTask = null, + + pub const Queue = UnboundedQueue(ConcurrentTask, .next); + + pub fn from(this: *ConcurrentTask, of: anytype) *ConcurrentTask { + this.* = .{ + .task = Task.init(of), + .next = null, + }; + return this; + } +}; const AsyncIO = @import("io"); pub const EventLoop = struct { - ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), tasks: Queue = undefined, - concurrent_tasks: Queue = undefined, - concurrent_lock: Lock = Lock.init(), + concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{}, global: *JSGlobalObject = undefined, virtual_machine: *VirtualMachine = undefined, waker: ?AsyncIO.Waker = null, - + defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), pub const Queue = std.fifo.LinearFifo(Task, .Dynamic); pub fn tickWithCount(this: *EventLoop) u32 { - var finished: u32 = 0; var global = this.global; var global_vm = global.vm(); var vm_ = this.virtual_machine; + var counter: usize = 0; while (this.tasks.readItem()) |task| { + defer counter += 1; switch (task.tag()) { .Microtask => { var micro: *Microtask = task.as(Microtask); micro.run(global); - finished += 1; }, .MicrotaskForDefaultGlobalObject => { var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject); micro.run(global); - finished += 1; }, .FetchTasklet => { var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?; fetch_task.onDone(); - finished += 1; + fetch_task.deinit(); vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(AsyncTransformTask)) => { var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(CopyFilePromiseTask)) => { var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => { var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?; transform_task.*.runFromJS(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(BunTimerTimeoutTask)) => { var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?; transform_task.*.runFromJS(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(ReadFileTask)) => { var transform_task: *ReadFileTask = task.get(ReadFileTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(WriteFileTask)) => { var transform_task: *WriteFileTask = task.get(WriteFileTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(AnyTask))) => { var any: *AnyTask = task.get(AnyTask).?; any.run(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(CppTask))) => { var any: *CppTask = task.get(CppTask).?; any.run(global); - finished += 1; vm_.active_tasks -|= 1; }, else => if (Environment.allow_assert) { @@ -390,30 +287,39 @@ pub const EventLoop = struct { global_vm.drainMicrotasks(); } - if (finished > 0) { - _ = this.pending_tasks_count.fetchSub(finished, .Monotonic); + if (this.tasks.count == 0) { + this.tasks.head = 0; } - return finished; + return @truncate(u32, counter); } pub fn tickConcurrent(this: *EventLoop) void { - if (this.ready_tasks_count.load(.Monotonic) > 0) { - this.concurrent_lock.lock(); - defer this.concurrent_lock.unlock(); - const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength()); + _ = this.tickConcurrentWithCount(); + } - // TODO: optimzie - this.tasks.ensureUnusedCapacity(add) catch unreachable; + pub fn tickConcurrentWithCount(this: *EventLoop) usize { + var concurrent = this.concurrent_tasks.popBatch(); + const count = concurrent.count; + if (count == 0) + return 0; - { - this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0)); - this.concurrent_tasks.discard(this.concurrent_tasks.count); - } + var iter = concurrent.iterator(); + const start_count = this.tasks.count; + if (start_count == 0) { + this.tasks.head = 0; + } - _ = this.pending_tasks_count.fetchAdd(add, .Monotonic); - _ = this.ready_tasks_count.fetchSub(add, .Monotonic); + this.tasks.ensureUnusedCapacity(count) catch unreachable; + var writable = this.tasks.writableSlice(0); + while (iter.next()) |task| { + writable[0] = task.task; + writable = writable[1..]; + this.tasks.count += 1; + if (writable.len == 0) break; } + + return this.tasks.count - start_count; } // TODO: fix this technical debt @@ -423,7 +329,9 @@ pub const EventLoop = struct { this.tickConcurrent(); var global_vm = ctx.global.vm(); while (true) { - while (this.tickWithCount() > 0) {} else { + while (this.tickWithCount() > 0) { + this.tickConcurrent(); + } else { global_vm.releaseWeakRefs(); global_vm.drainMicrotasks(); this.tickConcurrent(); @@ -436,13 +344,28 @@ pub const EventLoop = struct { break; } - if (!ctx.disable_run_us_loop and ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) { + this.global.handleRejectedPromises(); + } + + pub fn runUSocketsLoop(this: *EventLoop) void { + var ctx = this.virtual_machine; + + ctx.global.vm().releaseWeakRefs(); + ctx.global.vm().drainMicrotasks(); + + if (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) { + if (this.tickConcurrentWithCount() > 0) { + this.tick(); + } else if (ctx.uws_event_loop.?.num_polls > 0) { + if ((@intCast(c_ulonglong, ctx.uws_event_loop.?.internal_loop_data.iteration_nr) % 1_000) == 1) { + _ = ctx.global.vm().runGC(true); + } + } + ctx.is_us_loop_entered = true; ctx.enterUWSLoop(); ctx.is_us_loop_entered = false; } - - this.global.handleRejectedPromises(); } // TODO: fix this technical debt @@ -451,6 +374,10 @@ pub const EventLoop = struct { JSC.JSPromise.Status.Pending => { while (promise.status(this.global.vm()) == .Pending) { this.tick(); + + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } } }, else => {}, @@ -459,13 +386,20 @@ pub const EventLoop = struct { pub fn waitForTasks(this: *EventLoop) void { this.tick(); - while (this.pending_tasks_count.load(.Monotonic) > 0) { + while (this.tasks.count > 0) { this.tick(); + + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } + } else { + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } } } pub fn enqueueTask(this: *EventLoop, task: Task) void { - _ = this.pending_tasks_count.fetchAdd(1, .Monotonic); this.tasks.writeItem(task) catch unreachable; } @@ -476,19 +410,25 @@ pub const EventLoop = struct { } } - pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void { + pub fn onDefer(this: *EventLoop) void { + this.defer_count.store(0, .Monotonic); + this.tick(); + } + + pub fn enqueueTaskConcurrent(this: *EventLoop, task: *ConcurrentTask) void { JSC.markBinding(); - this.concurrent_lock.lock(); - defer this.concurrent_lock.unlock(); - this.concurrent_tasks.writeItem(task) catch unreachable; + + this.concurrent_tasks.push(task); + if (this.virtual_machine.uws_event_loop) |loop| { - loop.nextTick(*EventLoop, this, EventLoop.tick); + const deferCount = this.defer_count.fetchAdd(1, .Monotonic); + if (deferCount == 0) { + loop.nextTick(*EventLoop, this, onDefer); + } } - if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) { - if (this.waker) |*waker| { - waker.wake() catch unreachable; - } + if (this.waker) |*waker| { + waker.wake() catch unreachable; } } }; diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 5eeff1ba8..4f97a79ad 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -368,7 +368,6 @@ pub const VirtualMachine = struct { rare_data: ?*JSC.RareData = null, poller: JSC.Poller = JSC.Poller{}, us_loop_reference_count: usize = 0, - disable_run_us_loop: bool = false, is_us_loop_entered: bool = false, pub fn io(this: *VirtualMachine) *IO { @@ -423,7 +422,7 @@ pub const VirtualMachine = struct { this.eventLoop().enqueueTask(task); } - pub inline fn enqueueTaskConcurrent(this: *VirtualMachine, task: Task) void { + pub inline fn enqueueTaskConcurrent(this: *VirtualMachine, task: JSC.ConcurrentTask) void { this.eventLoop().enqueueTaskConcurrent(task); } @@ -451,7 +450,7 @@ pub const VirtualMachine = struct { this.macro_event_loop.tasks.ensureTotalCapacity(16) catch unreachable; this.macro_event_loop.global = this.global; this.macro_event_loop.virtual_machine = this; - this.macro_event_loop.concurrent_tasks = EventLoop.Queue.init(default_allocator); + this.macro_event_loop.concurrent_tasks = .{}; } this.bundler.options.platform = .bun_macro; @@ -555,8 +554,7 @@ pub const VirtualMachine = struct { default_allocator, ); VirtualMachine.vm.regular_event_loop.tasks.ensureUnusedCapacity(64) catch unreachable; - VirtualMachine.vm.regular_event_loop.concurrent_tasks = EventLoop.Queue.init(default_allocator); - VirtualMachine.vm.regular_event_loop.concurrent_tasks.ensureUnusedCapacity(8) catch unreachable; + VirtualMachine.vm.regular_event_loop.concurrent_tasks = .{}; VirtualMachine.vm.event_loop = &VirtualMachine.vm.regular_event_loop; vm.bundler.macro_context = null; @@ -1435,6 +1433,7 @@ pub const VirtualMachine = struct { pub fn loadEntryPoint(this: *VirtualMachine, entry_path: string) !*JSInternalPromise { this.main = entry_path; try this.entry_point.generate(@TypeOf(this.bundler), &this.bundler, Fs.PathName.init(entry_path), main_file_name); + this.eventLoop().ensureWaker(); var promise: *JSInternalPromise = undefined; @@ -1455,7 +1454,15 @@ pub const VirtualMachine = struct { promise = JSModuleLoader.loadAndEvaluateModule(this.global, &ZigString.init(this.main)); } - this.waitForPromise(promise); + while (promise.status(this.global.vm()) == .Pending) { + this.eventLoop().tick(); + _ = this.eventLoop().waker.?.wait() catch 0; + } + + if (this.us_loop_reference_count > 0) { + _ = this.global.vm().runGC(true); + this.eventLoop().runUSocketsLoop(); + } return promise; } diff --git a/src/bun.js/unbounded_queue.zig b/src/bun.js/unbounded_queue.zig new file mode 100644 index 000000000..fd092290d --- /dev/null +++ b/src/bun.js/unbounded_queue.zig @@ -0,0 +1,149 @@ +const std = @import("std"); + +const os = std.os; +const mem = std.mem; +const meta = std.meta; +const atomic = std.atomic; +const builtin = std.builtin; +const testing = std.testing; + +const assert = std.debug.assert; + +const mpsc = @This(); + +pub const cache_line_length = switch (@import("builtin").target.cpu.arch) { + .x86_64, .aarch64, .powerpc64 => 128, + .arm, .mips, .mips64, .riscv64 => 32, + .s390x => 256, + else => 64, +}; + +pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type { + const next = meta.fieldInfo(T, next_field).name; + + return struct { + const Self = @This(); + + pub const Batch = struct { + pub const Iterator = struct { + batch: Self.Batch, + + pub fn next(self: *Self.Batch.Iterator) ?*T { + if (self.batch.count == 0) return null; + const front = self.batch.front orelse unreachable; + self.batch.front = @field(front, next); + self.batch.count -= 1; + return front; + } + }; + + front: ?*T = null, + last: ?*T = null, + count: usize = 0, + + pub fn iterator(self: Self.Batch) Self.Batch.Iterator { + return .{ .batch = self }; + } + }; + + pub const queue_padding_length = cache_line_length / 2; + + back: ?*T align(queue_padding_length) = null, + count: usize = 0, + front: T align(queue_padding_length) = init: { + var stub: T = undefined; + @field(stub, next) = null; + break :init stub; + }, + + pub fn push(self: *Self, src: *T) void { + assert(@atomicRmw(usize, &self.count, .Add, 1, .Release) >= 0); + + @field(src, next) = null; + const old_back = @atomicRmw(?*T, &self.back, .Xchg, src, .AcqRel) orelse &self.front; + @field(old_back, next) = src; + } + + pub fn pushBatch(self: *Self, first: *T, last: *T, count: usize) void { + assert(@atomicRmw(usize, &self.count, .Add, count, .Release) >= 0); + + @field(last, next) = null; + const old_back = @atomicRmw(?*T, &self.back, .Xchg, last, .AcqRel) orelse &self.front; + @field(old_back, next) = first; + } + + pub fn pop(self: *Self) ?*T { + const first = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return null; + if (@atomicLoad(?*T, &@field(first, next), .Acquire)) |next_item| { + @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic); + assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1); + return first; + } + const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front; + if (first != last) return null; + @atomicStore(?*T, &@field(self.front, next), null, .Monotonic); + if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) { + assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1); + return first; + } + var next_item = @atomicLoad(?*T, &@field(first, next), .Acquire); + while (next_item == null) : (atomic.spinLoopHint()) { + next_item = @atomicLoad(?*T, &@field(first, next), .Acquire); + } + @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic); + assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1); + return first; + } + + pub fn popBatch(self: *Self) Self.Batch { + var batch: Self.Batch = .{}; + + var front = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return batch; + batch.front = front; + + var next_item = @atomicLoad(?*T, &@field(front, next), .Acquire); + while (next_item) |next_node| : (next_item = @atomicLoad(?*T, &@field(next_node, next), .Acquire)) { + batch.count += 1; + batch.last = front; + + front = next_node; + } + + const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front; + if (front != last) { + @atomicStore(?*T, &@field(self.front, next), front, .Release); + assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count); + return batch; + } + + @atomicStore(?*T, &@field(self.front, next), null, .Monotonic); + if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) { + batch.count += 1; + batch.last = front; + assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count); + return batch; + } + + next_item = @atomicLoad(?*T, &@field(front, next), .Acquire); + while (next_item == null) : (atomic.spinLoopHint()) { + next_item = @atomicLoad(?*T, &@field(front, next), .Acquire); + } + + batch.count += 1; + @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic); + batch.last = front; + assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count); + return batch; + } + + pub fn peek(self: *Self) usize { + const count = @atomicLoad(usize, &self.count, .Acquire); + assert(count >= 0); + return count; + } + + pub fn isEmpty(self: *Self) bool { + return self.peek() == 0; + } + }; +} diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 6c1fc49f9..2ef8225f3 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -515,90 +515,79 @@ pub const Fetch = struct { ); pub const FetchTasklet = struct { - promise: *JSPromise = undefined, - http: HTTPClient.AsyncHTTP = undefined, - result: HTTPClient.HTTPClientResult = undefined, - status: Status = Status.pending, + http: ?*HTTPClient.AsyncHTTP = null, + result: HTTPClient.HTTPClientResult = .{}, javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, - - empty_request_body: MutableString = undefined, - - context: FetchTaskletContext = undefined, + request_body: Blob = undefined, response_buffer: MutableString = undefined, - - blob_store: ?*Blob.Store = null, - - const Pool = ObjectPool(FetchTasklet, init, true, 32); - const BodyPool = ObjectPool(MutableString, MutableString.init2048, true, 8); - pub const FetchTaskletContext = struct { - tasklet: *FetchTasklet, - }; + request_headers: Headers = Headers{ .allocator = undefined }, + ref: *JSC.napi.Ref = undefined, + concurrent_task: JSC.ConcurrentTask = .{}, pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet { return FetchTasklet{}; } - pub const Status = enum(u8) { - pending, - running, - done, - }; + fn clearData(this: *FetchTasklet) void { + this.request_headers.entries.deinit(bun.default_allocator); + this.request_headers.buf.deinit(bun.default_allocator); + this.request_headers = Headers{ .allocator = undefined }; + this.http.?.deinit(); + + this.result.deinitMetadata(); + this.response_buffer.deinit(); + this.request_body.detach(); + } + + pub fn deinit(this: *FetchTasklet) void { + if (this.http) |http| this.javascript_vm.allocator.destroy(http); + this.javascript_vm.allocator.destroy(this); + } pub fn onDone(this: *FetchTasklet) void { if (comptime JSC.is_bindgen) unreachable; const globalThis = this.global_this; - const promise = this.promise; - const state = this.http.state.load(.Monotonic); - const result = switch (state) { - .success => this.onResolve(), - .fail => this.onReject(), - else => unreachable, + + var ref = this.ref; + const promise_value = ref.get(globalThis); + defer ref.destroy(globalThis); + + if (promise_value.isEmptyOrUndefinedOrNull()) { + this.clearData(); + return; + } + + var promise = promise_value.asPromise().?; + + const success = this.result.isSuccess(); + const result = switch (success) { + true => this.onResolve(), + false => this.onReject(), }; - this.release(); - const promise_value = promise.asValue(globalThis); - promise_value.unprotect(); + this.clearData(); - switch (state) { - .success => { + promise_value.ensureStillAlive(); + + switch (success) { + true => { promise.resolve(globalThis, result); }, - .fail => { + false => { promise.reject(globalThis, result); }, - else => unreachable, } } - pub fn reset(_: *FetchTasklet) void {} - - pub fn release(this: *FetchTasklet) void { - this.global_this = undefined; - this.javascript_vm = undefined; - this.promise = undefined; - this.status = Status.pending; - // var pooled = this.pooled_body; - // BodyPool.release(pooled); - // this.pooled_body = undefined; - this.http = undefined; - - Pool.release(@fieldParentPtr(Pool.Node, "data", this)); - } - pub fn onReject(this: *FetchTasklet) JSValue { - if (this.blob_store) |store| { - this.blob_store = null; - store.deref(); - } - defer this.result.deinitMetadata(); const fetch_error = std.fmt.allocPrint( default_allocator, "fetch() failed {s}\nurl: \"{s}\"", .{ - @errorName(this.http.err orelse error.HTTPFail), + this.result.fail, this.result.href, }, ) catch unreachable; @@ -606,26 +595,27 @@ pub const Fetch = struct { } pub fn onResolve(this: *FetchTasklet) JSValue { - var allocator = default_allocator; - var http_response = this.http.response.?; + var allocator = this.global_this.bunVM().allocator; + const http_response = this.result.response; var response = allocator.create(Response) catch unreachable; - if (this.blob_store) |store| { - this.blob_store = null; - store.deref(); - } - defer this.result.deinitMetadata(); + const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this); + this.response_buffer = .{ .allocator = default_allocator, .list = .{ + .items = &.{}, + .capacity = 0, + } }; + response.* = Response{ .allocator = allocator, .url = allocator.dupe(u8, this.result.href) catch unreachable, .status_text = allocator.dupe(u8, http_response.status) catch unreachable, - .redirected = this.http.redirected, + .redirected = this.result.redirected, .body = .{ .init = .{ .headers = FetchHeaders.createFromPicoHeaders(this.global_this, http_response.headers), .status_code = @truncate(u16, http_response.status_code), }, .value = .{ - .Blob = Blob.init(this.http.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this), + .Blob = blob, }, }, }; @@ -636,38 +626,50 @@ pub const Fetch = struct { allocator: std.mem.Allocator, method: Method, url: ZigURL, - headers: Headers.Entries, - headers_buf: string, - request_body: ?*MutableString, + headers: Headers, + request_body: Blob, timeout: usize, - request_body_store: ?*Blob.Store, - ) !*FetchTasklet.Pool.Node { - var linked_list = FetchTasklet.Pool.get(allocator); - linked_list.data.javascript_vm = VirtualMachine.vm; - linked_list.data.empty_request_body = MutableString.init(allocator, 0) catch unreachable; - // linked_list.data.pooled_body = BodyPool.get(allocator); - linked_list.data.blob_store = request_body_store; - linked_list.data.response_buffer = MutableString.initEmpty(allocator); - linked_list.data.http = HTTPClient.AsyncHTTP.init( + globalThis: *JSC.JSGlobalObject, + promise: JSValue, + ) !*FetchTasklet { + var jsc_vm = globalThis.bunVM(); + var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet); + if (request_body.store) |store| { + store.ref(); + } + + fetch_tasklet.* = .{ + .response_buffer = MutableString{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }, + .http = try jsc_vm.allocator.create(HTTPClient.AsyncHTTP), + .javascript_vm = jsc_vm, + .request_body = request_body, + .global_this = globalThis, + .request_headers = headers, + .ref = JSC.napi.Ref.create(globalThis, promise), + }; + fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init( allocator, method, url, - headers, - headers_buf, - &linked_list.data.response_buffer, - request_body orelse &linked_list.data.empty_request_body, + headers.entries, + headers.buf.items, + &fetch_tasklet.response_buffer, + request_body.sharedView(), timeout, - undefined, - ); - linked_list.data.context = .{ .tasklet = &linked_list.data }; - linked_list.data.http.completion_callback = HTTPClient.HTTPClientResult.Callback.New( - *FetchTasklet, - FetchTasklet.callback, - ).init( - &linked_list.data, + HTTPClient.HTTPClientResult.Callback.New( + *FetchTasklet, + FetchTasklet.callback, + ).init( + fetch_tasklet, + ), ); - - return linked_list; + return fetch_tasklet; } pub fn queue( @@ -675,27 +677,36 @@ pub const Fetch = struct { global: *JSGlobalObject, method: Method, url: ZigURL, - headers: Headers.Entries, - headers_buf: string, - request_body: ?*MutableString, + headers: Headers, + request_body: Blob, timeout: usize, - request_body_store: ?*Blob.Store, - ) !*FetchTasklet.Pool.Node { + promise: JSValue, + ) !*FetchTasklet { try HTTPClient.HTTPThread.init(); - var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store); + var node = try get( + allocator, + method, + url, + headers, + request_body, + timeout, + global, + promise, + ); - node.data.global_this = global; var batch = NetworkThread.Batch{}; - node.data.http.schedule(allocator, &batch); - HTTPClient.http_thread.schedule(batch); + node.http.?.schedule(allocator, &batch); VirtualMachine.vm.active_tasks +|= 1; + + HTTPClient.http_thread.schedule(batch); + return node; } pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void { task.response_buffer = result.body.?.*; task.result = result; - task.javascript_vm.eventLoop().enqueueTaskConcurrent(Task.init(task)); + task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task)); } }; @@ -715,12 +726,11 @@ pub const Fetch = struct { } var headers: ?Headers = null; - var body: MutableString = MutableString.initEmpty(bun.default_allocator); var method = Method.GET; var args = JSC.Node.ArgumentsSlice.from(ctx.bunVM(), arguments); var url: ZigURL = undefined; var first_arg = args.nextEat().?; - var blob_store: ?*Blob.Store = null; + var body: Blob = Blob.initEmpty(ctx); if (first_arg.isString()) { var url_zig_str = ZigString.init(""); JSValue.fromRef(arguments[0]).toZigString(&url_zig_str, globalThis); @@ -737,7 +747,6 @@ pub const Fetch = struct { url_str = getAllocator(ctx).dupe(u8, url_str) catch unreachable; } - NetworkThread.init() catch @panic("Failed to start network thread"); url = ZigURL.parse(url_str); if (arguments.len >= 2 and arguments[1].?.value().isObject()) { @@ -760,18 +769,7 @@ pub const Fetch = struct { if (options.fastGet(ctx.ptr(), .body)) |body__| { if (Blob.fromJS(ctx.ptr(), body__, true, false)) |new_blob| { - if (new_blob.size > 0) { - body = MutableString{ - .list = std.ArrayListUnmanaged(u8){ - .items = bun.constStrToU8(new_blob.sharedView()), - .capacity = new_blob.size, - }, - .allocator = bun.default_allocator, - }; - blob_store = new_blob.store; - } - // transfer is unnecessary here because this is a new slice - //new_blob.transfer(); + body = new_blob; } else |_| { return JSPromise.rejectedPromiseValue(globalThis, ZigString.init("fetch() received invalid body").toErrorInstance(globalThis)).asRef(); } @@ -783,54 +781,28 @@ pub const Fetch = struct { if (request.headers) |head| { headers = Headers.from(head, bun.default_allocator) catch unreachable; } - var blob = request.body.use(); - // TODO: make RequestBody _NOT_ a MutableString - body = MutableString{ - .list = std.ArrayListUnmanaged(u8){ - .items = bun.constStrToU8(blob.sharedView()), - .capacity = bun.constStrToU8(blob.sharedView()).len, - }, - .allocator = blob.allocator orelse bun.default_allocator, - }; - blob_store = blob.store; + body = request.body.use(); } else { const fetch_error = fetch_type_error_strings.get(js.JSValueGetType(ctx, arguments[0])); return JSPromise.rejectedPromiseValue(globalThis, ZigString.init(fetch_error).toErrorInstance(globalThis)).asRef(); } - var header_entries: Headers.Entries = .{}; - var header_buf: string = ""; - - if (headers) |head| { - header_entries = head.entries; - header_buf = head.buf.items; - } - - var request_body: ?*MutableString = null; - if (body.list.items.len > 0) { - var mutable = bun.default_allocator.create(MutableString) catch unreachable; - mutable.* = body; - request_body = mutable; - } + var deferred_promise = JSC.C.JSObjectMakeDeferredPromise(globalThis, null, null, null); // var resolve = FetchTasklet.FetchResolver.Class.make(ctx: js.JSContextRef, ptr: *ZigType) - var queued = FetchTasklet.queue( + _ = FetchTasklet.queue( default_allocator, globalThis, method, url, - header_entries, - header_buf, - request_body, + headers orelse Headers{ + .allocator = bun.default_allocator, + }, + body, std.time.ns_per_hour, - blob_store, + JSC.JSValue.fromRef(deferred_promise), ) catch unreachable; - const promise = JSC.JSPromise.create(ctx); - queued.data.promise = promise; - const promise_value = promise.asValue(ctx); - promise_value.protect(); - - return promise_value.asObjectRef(); + return deferred_promise; } }; @@ -1991,7 +1963,7 @@ pub const Blob = struct { } } pub fn run(this: *ReadFile, task: *ReadFileTask) void { - var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable; + var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable; _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{ this, task }); } @@ -2020,7 +1992,7 @@ pub const Blob = struct { task.onFinish(); suspend { - HTTPClient.getAllocator().destroy(@frame()); + bun.default_allocator.destroy(@frame()); } } @@ -2236,7 +2208,7 @@ pub const Blob = struct { cb(cb_ctx, .{ .result = @truncate(SizeType, wrote) }); } pub fn run(this: *WriteFile, task: *WriteFileTask) void { - var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable; + var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable; _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{ this, task }); } @@ -2244,7 +2216,7 @@ pub const Blob = struct { this._runAsync(); task.onFinish(); suspend { - HTTPClient.getAllocator().destroy(@frame()); + bun.default_allocator.destroy(@frame()); } } diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 242e3db49..1158a8dd2 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2340,6 +2340,7 @@ pub const FileBlobLoader = struct { read_frame: anyframe = undefined, chunk_size: Blob.SizeType = 0, main_thread_task: JSC.AnyTask = .{ .callback = onJSThread, .ctx = null }, + concurrent_task: JSC.ConcurrentTask = .{}, pub fn taskCallback(task: *NetworkThread.Task) void { var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task)); @@ -2475,7 +2476,7 @@ pub const FileBlobLoader = struct { pub fn scheduleMainThreadTask(this: *FileBlobLoader) void { this.concurrent.main_thread_task.ctx = this; - this.loop.enqueueTaskConcurrent(JSC.Task.init(&this.concurrent.main_thread_task)); + this.loop.enqueueTaskConcurrent(this.concurrent.concurrent_task.from(&this.concurrent.main_thread_task)); } fn runAsync(this: *FileBlobLoader) void { |