diff options
author | 2023-09-23 05:57:40 -0700 | |
---|---|---|
committer | 2023-09-23 05:57:40 -0700 | |
commit | 68c8377d76c6e1271e25f862614ce9270a64857d (patch) | |
tree | 593abaf3066e31eba38d96daf6a7e220398b718a | |
parent | ec0e931e9f7934f4f1f7617eac2a880d13794d0c (diff) | |
download | bun-jarred/cjs2.tar.gz bun-jarred/cjs2.tar.zst bun-jarred/cjs2.zip |
WIP concurrent CommonJSjarred/cjs2
-rw-r--r-- | src/bun.js/base.zig | 39 | ||||
-rw-r--r-- | src/bun.js/bindings/ModuleLoader.cpp | 10 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGlobalObject.cpp | 9 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGlobalObject.h | 2 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 6 | ||||
-rw-r--r-- | src/bun.js/module_loader.zig | 182 | ||||
-rw-r--r-- | src/import_record.zig | 3 | ||||
-rw-r--r-- | src/js_parser.zig | 1 |
8 files changed, 206 insertions, 46 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 26d5d74c3..1cb4e2e1f 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -1611,6 +1611,11 @@ pub const PollRef = struct { this.status = .done; } + pub fn disableConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { + this.unrefConcurrently(vm); + @atomicStore(Status, &this.status, Status.done, .Monotonic); + } + /// Only intended to be used from EventLoop.Pollable pub fn deactivate(this: *PollRef, loop: *uws.Loop) void { if (this.status != .active) @@ -1645,10 +1650,15 @@ pub const PollRef = struct { /// From another thread, Prevent a poll from keeping the process alive. pub fn unrefConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { - if (this.status != .active) - return; - this.status = .inactive; - vm.event_loop_handle.?.unrefConcurrently(); + switch (@atomicRmw(Status, &this.status, .Xchg, .inactive, .Monotonic)) { + .active => { + vm.event_loop_handle.?.unrefConcurrently(); + }, + .inactive => {}, + .done => { + @atomicStore(Status, &this.status, .done, .Monotonic); + }, + } } /// Prevent a poll from keeping the process alive on the next tick. @@ -1659,14 +1669,6 @@ pub const PollRef = struct { vm.pending_unref_counter +|= 1; } - /// From another thread, prevent a poll from keeping the process alive on the next tick. - pub fn unrefOnNextTickConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { - if (this.status != .active) - return; - this.status = .inactive; - _ = @atomicRmw(@TypeOf(vm.pending_unref_counter), &vm.pending_unref_counter, .Add, 1, .Monotonic); - } - /// Allow a poll to keep the process alive. pub fn ref(this: *PollRef, vm: *JSC.VirtualMachine) void { if (this.status != .inactive) @@ -1677,10 +1679,15 @@ pub const PollRef = struct { /// Allow a poll to keep the process alive. pub fn refConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void { - if (this.status != .inactive) - return; - this.status = .active; - vm.event_loop_handle.?.refConcurrently(); + switch (@atomicRmw(Status, &this.status, .Xchg, .inactive, .Monotonic)) { + .active => {}, + .inactive => { + vm.event_loop_handle.?.refConcurrently(); + }, + .done => { + @atomicStore(Status, &this.status, .done, .Monotonic); + }, + } } pub fn refConcurrentlyFromEventLoop(this: *PollRef, loop: *JSC.EventLoop) void { diff --git a/src/bun.js/bindings/ModuleLoader.cpp b/src/bun.js/bindings/ModuleLoader.cpp index acda70e0a..8afd81128 100644 --- a/src/bun.js/bindings/ModuleLoader.cpp +++ b/src/bun.js/bindings/ModuleLoader.cpp @@ -63,6 +63,13 @@ static JSC::JSInternalPromise* resolvedInternalPromise(JSC::JSGlobalObject* glob return promise; } +extern "C" bool ModuleLoader__moduleIDExistsInRequireMapOrESMRegistry(Zig::GlobalObject* globalObject, BunString* specifier) +{ + JSC::JSValue specifierValue = Bun::toJS(globalObject, *specifier); + + return globalObject->requireMap()->has(globalObject, specifierValue) || globalObject->esModuleRegistry()->has(globalObject, specifierValue); +} + // Converts an object from InternalModuleRegistry into { ...obj, default: obj } static JSC::SyntheticSourceProvider::SyntheticSourceGenerator generateInternalModuleSourceCode(JSC::JSGlobalObject* globalObject, InternalModuleRegistry::Field moduleId) @@ -484,8 +491,7 @@ JSValue fetchCommonJSModule( } } - auto* loader = globalObject->moduleLoader(); - JSMap* registry = jsCast<JSMap*>(loader->getDirect(vm, Identifier::fromString(vm, "registry"_s))); + JSMap* registry = globalObject->esModuleRegistry(); auto hasAlreadyLoadedESMVersionSoWeShouldntTranspileItTwice = [&]() -> bool { JSValue entry = registry->get(globalObject, specifierValue); diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 54fb58776..13aa7b796 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -3070,6 +3070,14 @@ void GlobalObject::finishCreation(VM& vm) init.set(map); }); + m_esModuleRegistry.initLater( + [](const JSC::LazyProperty<JSC::JSGlobalObject, JSC::JSMap>::Initializer& init) { + auto* globalObject = init.owner; + auto* loader = globalObject->moduleLoader(); + JSMap* registry = jsCast<JSMap*>(loader->getDirect(init.vm, Identifier::fromString(init.vm, "registry"_s))); + init.set(registry); + }); + m_encodeIntoObjectStructure.initLater( [](const JSC::LazyProperty<JSC::JSGlobalObject, JSC::Structure>::Initializer& init) { auto& vm = init.vm; @@ -3796,6 +3804,7 @@ void GlobalObject::visitChildrenImpl(JSCell* cell, Visitor& visitor) thisObject->m_utilInspectStylizeNoColorFunction.visit(visitor); thisObject->m_lazyReadableStreamPrototypeMap.visit(visitor); thisObject->m_requireMap.visit(visitor); + thisObject->m_esModuleRegistry.visit(visitor); thisObject->m_encodeIntoObjectStructure.visit(visitor); thisObject->m_JSArrayBufferControllerPrototype.visit(visitor); thisObject->m_JSFileSinkControllerPrototype.visit(visitor); diff --git a/src/bun.js/bindings/ZigGlobalObject.h b/src/bun.js/bindings/ZigGlobalObject.h index e27b3bffa..144b9ad0c 100644 --- a/src/bun.js/bindings/ZigGlobalObject.h +++ b/src/bun.js/bindings/ZigGlobalObject.h @@ -211,6 +211,7 @@ public: JSC::JSMap* readableStreamNativeMap() { return m_lazyReadableStreamPrototypeMap.getInitializedOnMainThread(this); } JSC::JSMap* requireMap() { return m_requireMap.getInitializedOnMainThread(this); } + JSC::JSMap* esModuleRegistry() { return m_esModuleRegistry.getInitializedOnMainThread(this); } JSC::Structure* encodeIntoObjectStructure() { return m_encodeIntoObjectStructure.getInitializedOnMainThread(this); } JSC::Structure* callSiteStructure() const { return m_callSiteStructure.getInitializedOnMainThread(this); } @@ -478,6 +479,7 @@ public: LazyProperty<JSGlobalObject, JSFunction> m_emitReadableNextTickFunction; LazyProperty<JSGlobalObject, JSMap> m_lazyReadableStreamPrototypeMap; LazyProperty<JSGlobalObject, JSMap> m_requireMap; + LazyProperty<JSGlobalObject, JSMap> m_esModuleRegistry; LazyProperty<JSGlobalObject, Structure> m_encodeIntoObjectStructure; LazyProperty<JSGlobalObject, JSObject> m_JSArrayBufferControllerPrototype; LazyProperty<JSGlobalObject, JSObject> m_JSFileSinkControllerPrototype; diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 7f175a1a1..a11a25601 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -343,6 +343,7 @@ const Futimes = JSC.Node.Async.futimes; const Lchmod = JSC.Node.Async.lchmod; const Lchown = JSC.Node.Async.lchown; const Unlink = JSC.Node.Async.unlink; +const TranspilerJob = JSC.RuntimeTranspilerStore.TranspilerJob; // Task.get(ReadFileTask) -> ?ReadFileTask pub const Task = TaggedPointerUnion(.{ @@ -350,6 +351,7 @@ pub const Task = TaggedPointerUnion(.{ Microtask, MicrotaskForDefaultGlobalObject, AsyncTransformTask, + TranspilerJob, ReadFileTask, CopyFilePromiseTask, WriteFileTask, @@ -756,6 +758,10 @@ pub const EventLoop = struct { var any: *Lstat = task.get(Lstat).?; any.runFromJSThread(); }, + @field(Task.Tag, typeBaseName(@typeName(TranspilerJob))) => { + var job: *TranspilerJob = task.get(TranspilerJob).?; + job.runFromJSThread(); + }, @field(Task.Tag, typeBaseName(@typeName(Fstat))) => { var any: *Fstat = task.get(Fstat).?; any.runFromJSThread(); diff --git a/src/bun.js/module_loader.zig b/src/bun.js/module_loader.zig index ca066450d..9523bd878 100644 --- a/src/bun.js/module_loader.zig +++ b/src/bun.js/module_loader.zig @@ -188,23 +188,21 @@ pub const RuntimeTranspilerStore = struct { store: TranspilerJob.Store, enabled: bool = true, + sync_transpilation_mutex: bun.Lock = bun.Lock.init(), + pending_transpilations: std.StringHashMap(*TranspilerJob) = .{}, + + pub const SyncQueue = bun.UnboundedQueue(TranspilerJob, .next); + pub fn init(allocator: std.mem.Allocator) RuntimeTranspilerStore { return RuntimeTranspilerStore{ .store = TranspilerJob.Store.init(allocator), }; } - pub fn transpile( - this: *RuntimeTranspilerStore, - vm: *JSC.VirtualMachine, - globalObject: *JSC.JSGlobalObject, - path: Fs.Path, - referrer: []const u8, - ) *anyopaque { - debug("transpile({s})", .{path.text}); - var job: *TranspilerJob = this.store.get(); + fn createJob(this: *RuntimeTranspilerStore, vm: *JSC.VirtualMachine, globalObject: *JSC.JSGlobalObject, path: Fs.Path, promise: JSC.JSValue, referrer: []const u8) *TranspilerJob { var owned_path = Fs.Path.init(bun.default_allocator.dupe(u8, path.text) catch unreachable); - var promise = JSC.JSInternalPromise.create(globalObject); + var job: *TranspilerJob = this.store.get(); + job.* = TranspilerJob{ .path = owned_path, .globalThis = globalObject, @@ -212,16 +210,86 @@ pub const RuntimeTranspilerStore = struct { .vm = vm, .log = logger.Log.init(bun.default_allocator), .loader = vm.bundler.options.loader(owned_path.name.ext), - .promise = JSC.Strong.create(JSC.JSValue.fromCell(promise), globalObject), + .promise = if (promise != .zero) JSC.Strong.create(promise, globalObject) else .{}, .poll_ref = .{}, .fetcher = TranspilerJob.Fetcher{ .file = {}, }, }; + + return job; + } + + pub fn transpile( + this: *RuntimeTranspilerStore, + vm: *JSC.VirtualMachine, + globalObject: *JSC.JSGlobalObject, + path: Fs.Path, + referrer: []const u8, + ) *anyopaque { + var hash_entry = this.pending_transpilations.getOrPut(path.text) catch @panic("Out of memory"); + if (hash_entry.found_existing) { + var job: *TranspilerJob = hash_entry.value_ptr.*; + if (job.generation_number == this.generation_number.load(.Monotonic)) { + var promise = job.promise.get() orelse brk: { + job.promise.set(JSC.JSValue.fromCell(JSC.JSInternalPromise.create(globalObject)), globalObject); + break :brk job.promise.get().?; + }; + debug("transpile({s}) - returning existing promise", .{path.text}); + job.onComplete(ModuleLoader.AsyncModule.fulfill); + return promise.asCell(); + } else { + job.cancelled.store(true, .Monotonic); + debug("transpile({s}) - generation number mismatch ({d} vs {d})", .{ path.text, job.generation_number, this.generation_number.loadUnchecked() }); + } + } + + debug("transpile({s})", .{path.text}); + var promise = JSC.JSInternalPromise.create(globalObject); + var job = this.createJob(vm, globalObject, path, JSC.JSValue.fromCell(promise), referrer); + hash_entry.value_ptr.* = job; job.schedule(); + return promise; } + pub const RequireQueue = struct { + queue: bun.StringSet = bun.StringSet.init(bun.default_allocator), + + pub fn deinit(this: *RequireQueue) void { + this.queue.deinit(); + } + + extern fn ModuleLoader__moduleIDExistsInRequireMapOrESMRegistry(globalObject: *JSC.JSGlobalObject, specifier: *bun.String) bool; + + pub fn drain(require_queue: *RequireQueue, path: Fs.Path, vm: *JSC.VirtualMachine, globalThis: *JSC.JSGlobalObject) void { + const source_dir = path.name.dirWithTrailingSlash(); + var resolver = &vm.bundler.resolver; + var store = &vm.transpiler_store; + const referrer = path.text; + + for (require_queue.keys()) |module_id| { + var result = resolver.resolve(source_dir, module_id, .require) catch continue; + if (result.is_external or result.is_standalone_module) continue; + var current_path = result.path() orelse continue; + + if (ModuleLoader__moduleIDExistsInRequireMapOrESMRegistry(globalThis, bun.String.init(current_path.text))) { + continue; + } + + var entry = store.pending_transpilations.getOrPut(current_path.text) catch @panic("Out of memory"); + if (entry.found_existing) { + continue; + } + + var job = store.createJob(vm, globalThis, current_path, .zero, referrer); + entry.value_ptr.* = job; + entry.key_ptr.* = job.path.text; + job.schedule(); + } + } + }; + pub const TranspilerJob = struct { path: Fs.Path, referrer: []const u8, @@ -236,8 +304,32 @@ pub const RuntimeTranspilerStore = struct { parse_error: ?anyerror = null, resolved_source: ResolvedSource = ResolvedSource{}, work_task: JSC.WorkPoolTask = .{ .callback = runFromWorkerThread }, + next: ?*TranspilerJob = null, + ref_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(1), + cancelled: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + require_queue: RequireQueue = RequireQueue{}, + + pub const Status = enum(u32) { + pending, + }; pub const Store = bun.HiveArray(TranspilerJob, 64).Fallback; + pub fn ref(this: *TranspilerJob) void { + std.debug.assert(this.ref_count.fetchAdd(1, .Monotonic) != 0); + } + + pub fn unref(this: *TranspilerJob) void { + this.cancelled.store(true, .Monotonic); + this.poll_ref.disableConcurrently(this.vm); + + const prev_count = this.ref_count.fetchSub(1, .Monotonic); + const needs_deinit = prev_count == 1; + std.debug.assert(prev_count != 0); + + if (needs_deinit) { + this.deinit(); + } + } pub const Fetcher = union(enum) { virtual_module: bun.String, @@ -250,29 +342,49 @@ pub const RuntimeTranspilerStore = struct { } }; - pub fn deinit(this: *TranspilerJob) void { + fn deinit(this: *TranspilerJob) void { + _ = this.vm.transpiler_store.pending_transpilations.remove(this.path.text); bun.default_allocator.free(this.path.text); bun.default_allocator.free(this.referrer); - - this.poll_ref.disable(); this.fetcher.deinit(); this.loader = options.Loader.file; this.path = Fs.Path.empty; this.log.deinit(); this.promise.deinit(); + this.require_queue.deinit(); this.globalThis = undefined; + this.resolved_source.source_code.deref(); + this.resolved_source.specifier.deref(); + this.vm.transpiler_store.store.put(this); } threadlocal var ast_memory_store: ?*js_ast.ASTMemoryAllocator = null; threadlocal var source_code_printer: ?*js_printer.BufferPrinter = null; pub fn dispatchToMainThread(this: *TranspilerJob) void { - this.vm.eventLoop().enqueueTaskConcurrent( - JSC.ConcurrentTask.fromCallback(this, runFromJSThread), - ); + this.vm.eventLoop().enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this))); } pub fn runFromJSThread(this: *TranspilerJob) void { + if (this.isCancelled()) { + this.unref(); + return; + } + + this.onComplete(ModuleLoader.AsyncModule.fulfill); + } + + fn drainRequireQueue(this: *TranspilerJob) void { + if (!this.path.isFile()) { + return; + } + + this.require_queue.drain(this.path, this.vm, this.globalThis); + this.require_queue.deinit(); + this.require_queue = .{}; + } + + pub fn onComplete(this: *TranspilerJob, comptime fulfill: anytype) void { var vm = this.vm; var promise = this.promise.swap(); var globalThis = this.globalThis; @@ -282,6 +394,7 @@ pub const RuntimeTranspilerStore = struct { var log = this.log; this.log = logger.Log.init(bun.default_allocator); var resolved_source = this.resolved_source; + this.resolved_source = ResolvedSource{}; resolved_source.source_url = specifier.toZigString(); resolved_source.tag = brk: { @@ -303,18 +416,19 @@ pub const RuntimeTranspilerStore = struct { }; const parse_error = this.parse_error; - if (!vm.transpiler_store.store.hive.in(this)) { - this.promise.deinit(); + + if (parse_error == null) { + this.drainRequireQueue(); } - this.deinit(); - _ = vm.transpiler_store.store.hive.put(this); + this.unref(); - ModuleLoader.AsyncModule.fulfill(globalThis, promise, resolved_source, parse_error, specifier, referrer, &log); + fulfill(globalThis, promise, resolved_source, parse_error, specifier, referrer, &log); } pub fn schedule(this: *TranspilerJob) void { this.poll_ref.ref(this.vm); + this.ref(); JSC.WorkPool.schedule(&this.work_task); } @@ -322,15 +436,21 @@ pub const RuntimeTranspilerStore = struct { @fieldParentPtr(TranspilerJob, "work_task", work_task).run(); } - pub fn run(this: *TranspilerJob) void { - var arena = bun.ArenaAllocator.init(bun.default_allocator); - defer arena.deinit(); + inline fn isCancelled(this: *const TranspilerJob) bool { + return this.cancelled.load(.Monotonic) or this.generation_number != this.vm.transpiler_store.generation_number.load(.Monotonic); + } + pub fn run(this: *TranspilerJob) void { defer this.dispatchToMainThread(); - if (this.generation_number != this.vm.transpiler_store.generation_number.load(.Monotonic)) { - this.parse_error = error.TranspilerJobGenerationMismatch; + + if (this.isCancelled()) { + // Allow the job to be freed from the main thread return; } + var vm = this.vm; + + var arena = bun.ArenaAllocator.init(bun.default_allocator); + defer arena.deinit(); if (ast_memory_store == null) { ast_memory_store = bun.default_allocator.create(js_ast.ASTMemoryAllocator) catch @panic("out of memory!"); @@ -349,7 +469,6 @@ pub const RuntimeTranspilerStore = struct { const loader = this.loader; this.log = logger.Log.init(bun.default_allocator); - var vm = this.vm; var bundler: bun.Bundler = undefined; bundler = vm.bundler; var allocator = arena.allocator(); @@ -516,6 +635,13 @@ pub const RuntimeTranspilerStore = struct { if (strings.eqlComptime(import_record.path.text, "test")) { import_record.tag = .bun_test; } + + continue; + } + + if (import_record.is_top_level_require and import_record.path.isFile()) { + std.debug.assert(import_record.kind == .require); + this.require_queue.queue.insert(import_record.path.text) catch continue; } } diff --git a/src/import_record.zig b/src/import_record.zig index d48cfb1b0..7577b4af9 100644 --- a/src/import_record.zig +++ b/src/import_record.zig @@ -166,6 +166,9 @@ pub const ImportRecord = struct { /// If true, this import can be removed if it's unused is_external_without_side_effects: bool = false, + /// Used for async transpilation of CommonJS modules at runtime + is_top_level_require: bool = false, + kind: ImportKind, tag: Tag = Tag.none, diff --git a/src/js_parser.zig b/src/js_parser.zig index e4dbaf25b..73a91b225 100644 --- a/src/js_parser.zig +++ b/src/js_parser.zig @@ -11669,6 +11669,7 @@ fn NewParser_( .kind = kind, .range = range, .path = path, + .is_top_level_require = kind == .require and p.current_scope.parent == null, }; p.import_records.append(record) catch unreachable; return @as(u32, @intCast(index)); |