diff options
author | 2022-02-18 02:45:30 -0800 | |
---|---|---|
committer | 2022-02-18 02:45:30 -0800 | |
commit | 65f1733d6f82d05edd1fad31f5536a5b8aad7c6f (patch) | |
tree | dd1d8d0d027ccfa0da270b2bf9efa438e2ded73c | |
parent | 37e63eff22766604b3c107916d3f48e5ec709b30 (diff) | |
download | bun-65f1733d6f82d05edd1fad31f5536a5b8aad7c6f.tar.gz bun-65f1733d6f82d05edd1fad31f5536a5b8aad7c6f.tar.zst bun-65f1733d6f82d05edd1fad31f5536a5b8aad7c6f.zip |
[Bundler] Fix race condition, 5% faster bundling
cc @hanford
-rw-r--r-- | src/bundler.zig | 386 |
1 files changed, 209 insertions, 177 deletions
diff --git a/src/bundler.zig b/src/bundler.zig index 19102fd98..d8376da2d 100644 --- a/src/bundler.zig +++ b/src/bundler.zig @@ -22,6 +22,7 @@ const linker = @import("linker.zig"); const Ref = @import("ast/base.zig").Ref; const Define = @import("defines.zig").Define; const DebugOptions = @import("./cli.zig").Command.DebugOptions; +const ThreadPoolLib = @import("./thread_pool.zig"); const panicky = @import("panic_handler.zig"); const Fs = @import("fs.zig"); @@ -474,18 +475,24 @@ pub const Bundler = struct { } pub const GenerateNodeModuleBundle = struct { - const BunQueue = NewBunQueue(_resolver.Result); + const BunQueue = sync.Channel(PendingImports, .Dynamic); pub const ThreadPool = struct { + pool: ThreadPoolLib = undefined, // Hardcode 512 as max number of threads for now. workers: [512]Worker = undefined, - workers_used: u32 = 0, + workers_used: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), cpu_count: u32 = 0, started_workers: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), stopped_workers: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), completed_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), + pending_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), + + generator: *GenerateNodeModuleBundle = undefined, + pub fn start(this: *ThreadPool, generator: *GenerateNodeModuleBundle) !void { generator.bundler.env.loadProcess(); + this.generator = generator; this.cpu_count = @truncate(u32, @divFloor((try std.Thread.getCpuCount()) + 1, 2)); @@ -495,59 +502,47 @@ pub const Bundler = struct { } else |_| {} } - if (this.cpu_count <= 1) return; - - while (this.workers_used < this.cpu_count) : (this.workers_used += 1) { - try this.workers[this.workers_used].init(generator); + this.pool = ThreadPoolLib.init(.{ + .max_threads = this.cpu_count, + }); + this.pool.on_thread_spawn = Worker.onSpawn; + this.pool.threadpool_context = this; + var workers_used: u32 = 0; + while (workers_used < this.cpu_count) : (workers_used += 1) { + try this.workers[workers_used].init(generator); } } pub fn wait(this: *ThreadPool, generator: *GenerateNodeModuleBundle) !void { - if (this.cpu_count <= 1) { - var worker = generator.allocator.create(Worker) catch unreachable; - worker.* = Worker{ - .generator = generator, - .allocator = generator.allocator, - .data = generator.allocator.create(Worker.WorkerData) catch unreachable, - .thread_id = undefined, - .thread = undefined, - }; - worker.data.shared_buffer = try MutableString.init(generator.allocator, 0); - worker.data.scan_pass_result = js_parser.ScanPassResult.init(generator.allocator); - worker.data.log = generator.log; - worker.data.estimated_input_lines_of_code = 0; - worker.data.macro_context = js_ast.Macro.MacroContext.init(generator.bundler); - - defer { - worker.data.deinit(generator.allocator); + while (true) { + while (this.generator.queue.tryReadItem() catch null) |queue| { + var iter = queue.iterator(); + var batch = ThreadPoolLib.Batch{}; + var count: u32 = 0; + while (iter.next()) |entry| { + const module_id: u32 = entry.key_ptr.*; + const exists = generator.enqueued_map.getOrPut(module_id) catch unreachable; + if (exists.found_existing) { + continue; + } + batch.push(ThreadPoolLib.Batch.from(&entry.value_ptr.*.task)); + count += 1; + } + _ = this.pending_count.fetchAdd(count, .Monotonic); + this.pool.schedule(batch); } - while (generator.queue.next()) |item| { - try generator.processFile(worker, generator.bundler, item); + if (this.completed_count.load(.Monotonic) > 0 and this.completed_count.load(.Monotonic) == this.pending_count.load(.Monotonic)) { + break; } - generator.estimated_input_lines_of_code = worker.data.estimated_input_lines_of_code; - return; - } - - while (generator.queue.count.load(.SeqCst) != generator.pool.completed_count.load(.SeqCst)) { - var j: usize = 0; - while (j < 100) : (j += 1) {} - std.atomic.spinLoopHint(); - } - - for (this.workers[0..this.workers_used]) |*worker| { - @atomicStore(bool, &worker.quit, true, .Release); - } - - while (this.stopped_workers.load(.Acquire) != this.workers_used) { - var j: usize = 0; - while (j < 100) : (j += 1) {} std.atomic.spinLoopHint(); } - for (this.workers[0..this.workers_used]) |*worker| { - worker.thread.join(); + const workers: []const Worker = this.workers[0..this.workers_used.loadUnchecked()]; + for (workers) |worker| { + this.generator.estimated_input_lines_of_code += worker.data.estimated_input_lines_of_code; + try worker.data.log.appendTo(this.generator.log); } } @@ -573,6 +568,7 @@ pub const Bundler = struct { log: *logger.Log, estimated_input_lines_of_code: usize = 0, macro_context: js_ast.Macro.MacroContext, + bundler: Bundler = undefined, pub fn deinit(this: *WorkerData, allocator: std.mem.Allocator) void { this.shared_buffer.deinit(); @@ -585,7 +581,14 @@ pub const Bundler = struct { pub fn init(worker: *Worker, generator: *GenerateNodeModuleBundle) !void { worker.generator = generator; worker.allocator = generator.allocator; - worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker}); + } + + pub fn onSpawn(ctx: ?*anyopaque) ?*anyopaque { + var pool = @ptrCast(*ThreadPool, @alignCast(@alignOf(*ThreadPool), ctx.?)); + + const id = pool.workers_used.fetchAdd(1, .Monotonic); + pool.workers[id].run(); + return &pool.workers[id]; } pub fn notifyStarted(this: *Worker) void { @@ -602,25 +605,6 @@ pub const Bundler = struct { if (Environment.isDebug) { Output.prettyln("Thread started.\n", .{}); } - defer { - if (Environment.isDebug) { - Output.prettyln("Thread stopped.\n", .{}); - } - Output.flush(); - } - - this.loop() catch |err| Report.globalError(err); - } - - pub fn loop(this: *Worker) anyerror!void { - defer { - _ = this.generator.pool.stopped_workers.fetchAdd(1, .Release); - this.notifyStarted(); - - std.Thread.Futex.wake(&this.generator.pool.stopped_workers, 1); - // std.Thread.Futex.wake(&this.generator.queue.len, std.math.maxInt(u32)); - } - js_ast.Expr.Data.Store.create(this.generator.allocator); js_ast.Stmt.Data.Store.create(this.generator.allocator); this.data = this.generator.allocator.create(WorkerData) catch unreachable; @@ -630,10 +614,10 @@ pub const Bundler = struct { .macro_context = js_ast.Macro.MacroContext.init(this.generator.bundler), }; this.data.log.* = logger.Log.init(this.generator.allocator); - this.data.shared_buffer = try MutableString.init(this.generator.allocator, 0); + this.data.shared_buffer = MutableString.init(this.generator.allocator, 0) catch unreachable; this.data.scan_pass_result = js_parser.ScanPassResult.init(this.generator.allocator); - var bundler = this.generator.bundler.*; - var bundler_ptr = &bundler; + this.data.bundler = this.generator.bundler.*; + var bundler_ptr = &this.data.bundler; const CacheSet = @import("./cache.zig"); // no funny business mr. cache bundler_ptr.resolver.caches = CacheSet.Set.init(this.allocator); @@ -641,30 +625,30 @@ pub const Bundler = struct { bundler_ptr.log = this.data.log; bundler_ptr.linker.log = this.data.log; bundler_ptr.linker.resolver.log = this.data.log; + } - defer { - { - this.generator.log_lock.lock(); - this.data.log.appendTo(this.generator.log) catch {}; - this.generator.estimated_input_lines_of_code += this.data.estimated_input_lines_of_code; - this.generator.log_lock.unlock(); - } - - this.data.deinit(this.generator.allocator); - } - - this.notifyStarted(); - - while (!@atomicLoad(bool, &this.quit, .Acquire)) { - while (this.generator.queue.next()) |item| { - defer { - _ = this.generator.pool.completed_count.fetchAdd(1, .Release); - } + pub const ProcessFileTask = struct { + resolution: _resolver.Result, + task: ThreadPoolLib.Task = .{ .callback = callback }, - try this.generator.processFile(this, bundler_ptr, item); - } + pub fn callback(task: *ThreadPoolLib.Task) void { + var worker = @ptrCast( + *ThreadPool.Worker, + @alignCast( + @alignOf(*ThreadPool.Worker), + ThreadPoolLib.Thread.current.?.ctx.?, + ), + ); + var process: *ProcessFileTask = @fieldParentPtr(ProcessFileTask, "task", task); + + worker.generator.processFile( + worker, + &worker.data.bundler, + process.resolution, + ) catch {}; + _ = worker.generator.pool.completed_count.fetchAdd(1, .Monotonic); } - } + }; }; }; write_lock: Lock, @@ -675,8 +659,10 @@ pub const Bundler = struct { // Just need to know if we've already enqueued this one package_list_map: std.AutoHashMap(u64, u32), - queue: *BunQueue, + enqueued_map: std.AutoHashMap(u32, void), + queue: BunQueue, bundler: *ThisBundler, + allocator: std.mem.Allocator, tmpfile: std.fs.File, log: *logger.Log, @@ -694,6 +680,7 @@ pub const Bundler = struct { always_bundled_package_hashes: []u32 = &[_]u32{}, always_bundled_package_jsons: []*const PackageJSON = &.{}, + always_bundled_booleans: []bool = &.{}, package_bundle_map: options.BundlePackage.Map = options.BundlePackage.Map{}, const U32Map = std.AutoHashMap(u32, u32); @@ -701,6 +688,16 @@ pub const Bundler = struct { const dist_index_js_string_pointer = Api.StringPointer{ .length = "dist/index.js".len }; const index_js_string_pointer = Api.StringPointer{ .length = "index.js".len, .offset = "dist/".len }; + fn upsert(this: *GenerateNodeModuleBundle, module_id: u32, resolve: _resolver.Result) !void { + var dedupe = try this.enqueued_map.getOrPut(module_id); + if (dedupe.found_existing) return; + var task = try this.allocator.create(ThreadPool.Worker.ProcessFileTask); + task.* = ThreadPool.Worker.ProcessFileTask{ + .resolution = resolve, + }; + _ = this.pool.pending_count.fetchAdd(1, .Monotonic); + this.pool.pool.schedule(ThreadPoolLib.Batch.from(&task.task)); + } pub fn enqueueItem(this: *GenerateNodeModuleBundle, resolve: _resolver.Result) !void { var result = resolve; var path = result.path() orelse return; @@ -710,9 +707,9 @@ pub const Bundler = struct { path.* = try path.dupeAlloc(this.allocator); if (BundledModuleData.get(this, &result)) |mod| { - try this.queue.upsert(mod.module_id, result); + try this.upsert(mod.module_id, result); } else { - try this.queue.upsert(result.hash(this.bundler.fs.top_level_dir, loader), result); + try this.upsert(result.hash(this.bundler.fs.top_level_dir, loader), result); } } @@ -799,13 +796,14 @@ pub const Bundler = struct { errdefer tmpfile.closeAndDelete(tmpname); var generator = try allocator.create(GenerateNodeModuleBundle); - var queue = try BunQueue.init(allocator); + var queue = BunQueue.init(allocator); defer allocator.destroy(generator); generator.* = GenerateNodeModuleBundle{ .module_list = std.ArrayList(Api.JavascriptBundledModule).init(allocator), .package_list = std.ArrayList(Api.JavascriptBundledPackage).init(allocator), .header_string_buffer = try MutableString.init(allocator, "dist/index.js".len), .allocator = allocator, + .enqueued_map = std.AutoHashMap(u32, void).init(allocator), .queue = queue, .estimated_input_lines_of_code = 0, // .resolve_queue = queue, @@ -855,20 +853,16 @@ pub const Bundler = struct { const bundle_keys = package_bundle_map.keys(); const do_always_bundle = package_bundle_map.values(); - var always_bundle_count: u32 = 0; - for (do_always_bundle) |always| { - always_bundle_count += @as(u32, @boolToInt(always == .always)); - } + var always_bundle_count: u32 = @truncate(u32, bundle_keys.len); - if (always_bundle_count > 0) { + if (always_bundle_count != 0) { Analytics.Features.always_bundle = true; var always_bundled_package_jsons = bundler.allocator.alloc(*PackageJSON, always_bundle_count) catch unreachable; var always_bundled_package_hashes = bundler.allocator.alloc(u32, always_bundle_count) catch unreachable; + var always_bundled_booleans = bundler.allocator.alloc(bool, always_bundle_count) catch unreachable; var i: u16 = 0; inner: for (bundle_keys) |name, k| { - if (do_always_bundle[k] != .always) continue; - std.mem.copy(u8, &tmp_buildfile_buf, name); std.mem.copy(u8, tmp_buildfile_buf[name.len..], "/package.json"); const package_json_import = tmp_buildfile_buf[0 .. name.len + "/package.json".len]; @@ -905,10 +899,12 @@ pub const Bundler = struct { always_bundled_package_jsons[i] = package_json; always_bundled_package_hashes[i] = package_json.hash; + always_bundled_booleans[i] = do_always_bundle[k] == .always; i += 1; } generator.always_bundled_package_hashes = always_bundled_package_hashes[0..i]; generator.always_bundled_package_jsons = always_bundled_package_jsons[0..i]; + generator.always_bundled_booleans = always_bundled_booleans[0..i]; } } if (generator.log.errors > 0) return error.BundleFailed; @@ -920,6 +916,27 @@ pub const Bundler = struct { this.bundler.options.jsx.supports_fast_refresh and bundler.options.platform.isWebLike(); + if (framework_config != null) { + defer this.bundler.resetStore(); + + try this.bundler.configureFramework(true); + if (bundler.options.framework) |framework| { + Analytics.Features.framework = true; + + if (framework.override_modules.keys.len > 0) { + bundler.options.framework.?.override_modules_hashes = allocator.alloc(u64, framework.override_modules.keys.len) catch unreachable; + for (framework.override_modules.keys) |key, i| { + bundler.options.framework.?.override_modules_hashes[i] = std.hash.Wyhash.hash(0, key); + } + } + } + } else {} + + this.pool.start(this) catch |err| { + Analytics.enqueue(Analytics.EventName.bundle_fail); + return err; + }; + if (bundler.router) |router| { defer this.bundler.resetStore(); Analytics.Features.filesystem_router = true; @@ -932,6 +949,38 @@ pub const Bundler = struct { this.bundler.resetStore(); } else {} + if (bundler.options.framework) |framework| { + if (bundler.options.platform.isBun()) { + if (framework.server.isEnabled()) { + Analytics.Features.bunjs = true; + const resolved = try bundler.resolver.resolve( + bundler.fs.top_level_dir, + framework.server.path, + .entry_point, + ); + try this.enqueueItem(resolved); + } + } else { + if (framework.client.isEnabled()) { + const resolved = try bundler.resolver.resolve( + bundler.fs.top_level_dir, + framework.client.path, + .entry_point, + ); + try this.enqueueItem(resolved); + } + + if (framework.fallback.isEnabled()) { + const resolved = try bundler.resolver.resolve( + bundler.fs.top_level_dir, + framework.fallback.path, + .entry_point, + ); + try this.enqueueItem(resolved); + } + } + } + for (bundler.options.entry_points) |entry_point| { if (bundler.options.platform.isBun()) continue; defer this.bundler.resetStore(); @@ -941,51 +990,6 @@ pub const Bundler = struct { try this.enqueueItem(resolved); } - if (framework_config != null) { - defer this.bundler.resetStore(); - - try this.bundler.configureFramework(true); - if (bundler.options.framework) |framework| { - Analytics.Features.framework = true; - - if (framework.override_modules.keys.len > 0) { - bundler.options.framework.?.override_modules_hashes = allocator.alloc(u64, framework.override_modules.keys.len) catch unreachable; - for (framework.override_modules.keys) |key, i| { - bundler.options.framework.?.override_modules_hashes[i] = std.hash.Wyhash.hash(0, key); - } - } - if (bundler.options.platform.isBun()) { - if (framework.server.isEnabled()) { - Analytics.Features.bunjs = true; - const resolved = try bundler.resolver.resolve( - bundler.fs.top_level_dir, - framework.server.path, - .entry_point, - ); - try this.enqueueItem(resolved); - } - } else { - if (framework.client.isEnabled()) { - const resolved = try bundler.resolver.resolve( - bundler.fs.top_level_dir, - framework.client.path, - .entry_point, - ); - try this.enqueueItem(resolved); - } - - if (framework.fallback.isEnabled()) { - const resolved = try bundler.resolver.resolve( - bundler.fs.top_level_dir, - framework.fallback.path, - .entry_point, - ); - try this.enqueueItem(resolved); - } - } - } - } else {} - // Normally, this is automatic // However, since we only do the parsing pass, it may not get imported automatically. if (bundler.options.jsx.parse) { @@ -1035,10 +1039,6 @@ pub const Bundler = struct { try generator.appendBytes(comptime runtime.Runtime.sourceContent(false) ++ "\n\n"); } - this.pool.start(this) catch |err| { - Analytics.enqueue(Analytics.EventName.bundle_fail); - return err; - }; this.pool.wait(this) catch |err| { Analytics.enqueue(Analytics.EventName.bundle_fail); return err; @@ -1325,6 +1325,7 @@ pub const Bundler = struct { if (std.mem.indexOfScalar(u32, this.always_bundled_package_hashes, pkg.hash)) |pkg_i| { pkg = this.always_bundled_package_jsons[pkg_i]; + if (!this.always_bundled_booleans[pkg_i]) return null; const key_path_source_dir = pkg.source.key_path.sourceDir(); const default_source_dir = pkg.source.path.sourceDir(); @@ -1358,6 +1359,34 @@ pub const Bundler = struct { var base_path = root.base_path; const package_json = root.package_json; + if (std.mem.indexOfScalar(u32, this.always_bundled_package_hashes, package_json.hash)) |pkg_i| { + var pkg = this.always_bundled_package_jsons[pkg_i]; + if (!this.always_bundled_booleans[pkg_i]) return null; + const key_path_source_dir = pkg.source.key_path.sourceDir(); + const default_source_dir = pkg.source.path.sourceDir(); + + if (strings.startsWith(path.text, key_path_source_dir)) { + import_path = path.text[key_path_source_dir.len..]; + } else if (strings.startsWith(path.text, default_source_dir)) { + import_path = path.text[default_source_dir.len..]; + } else if (strings.startsWith(path.pretty, pkg.name)) { + import_path = path.pretty[pkg.name.len + 1 ..]; + } + + var buf_to_use: []u8 = if (is_main) &normalized_package_path2 else &normalized_package_path; + + std.mem.copy(u8, buf_to_use, pkg.name); + buf_to_use[pkg.name.len] = '/'; + std.mem.copy(u8, buf_to_use[pkg.name.len + 1 ..], import_path); + package_path = buf_to_use[0 .. pkg.name.len + import_path.len + 1]; + return BundledModuleData{ + .import_path = import_path, + .package_path = package_path, + .package = pkg, + .module_id = pkg.hashModule(package_path), + }; + } + // Easymode: the file path doesn't need to be remapped. if (strings.startsWith(file_path, base_path)) { import_path = std.mem.trimLeft(u8, path.text[base_path.len..], "/"); @@ -1386,6 +1415,7 @@ pub const Bundler = struct { if (comptime force) { if (std.mem.indexOfScalar(u32, this.always_bundled_package_hashes, root.package_json.hash)) |pkg_json_i| { const pkg_json = this.always_bundled_package_jsons[pkg_json_i]; + base_path = pkg_json.source.key_path.sourceDir(); if (strings.startsWith(file_path, base_path)) { @@ -1435,7 +1465,6 @@ pub const Bundler = struct { } } } - unreachable; } return null; @@ -1530,6 +1559,7 @@ pub const Bundler = struct { threadlocal var json_e_call: js_ast.E.Call = undefined; threadlocal var json_e_identifier: js_ast.E.Identifier = undefined; threadlocal var json_call_args: [1]js_ast.Expr = undefined; + const PendingImports = std.AutoArrayHashMap(u32, ThreadPool.Worker.ProcessFileTask); pub fn processFile(this: *GenerateNodeModuleBundle, worker: *ThreadPool.Worker, bundler: *Bundler, _resolve: _resolver.Result) !void { const resolve = _resolve; if (resolve.is_external) return; @@ -1538,7 +1568,7 @@ pub const Bundler = struct { var scan_pass_result = &worker.data.scan_pass_result; var file_path = (resolve.pathConst() orelse unreachable).*; - const add_to_bundle = brk: { + var add_to_bundle = brk: { if (resolve.package_json) |package_json| { if (this.package_bundle_map.get(package_json.name)) |result| { break :brk result == .always; @@ -1556,23 +1586,16 @@ pub const Bundler = struct { defer shared_buffer.reset(); defer this.bundler.resetStore(); var log = worker.data.log; - - // If we're in a node_module, build that almost normally + var queue = PendingImports.init(worker.allocator); + var __module_data: ?BundledModuleData = null; if (add_to_bundle) { - var code_offset: u32 = 0; + __module_data = BundledModuleData.getForceBundleForMain(this, &resolve); + } + // If we're in a node_module, build that almost normally + if (add_to_bundle and __module_data != null) { + const module_data = __module_data.?; - const module_data = BundledModuleData.getForceBundleForMain(this, &resolve) orelse { - const fake_path = logger.Source.initPathString(file_path.text, ""); - log.addResolveError( - &fake_path, - logger.Range.None, - this.allocator, - "Bug while resolving: \"{s}\"", - .{file_path.text}, - resolve.import_kind, - ) catch {}; - return error.ResolveError; - }; + var code_offset: u32 = 0; const module_id = module_data.module_id; const package = module_data.package; @@ -1711,11 +1734,12 @@ pub const Bundler = struct { path.* = try path.dupeAlloc(this.allocator); import_record.path = path.*; - - try this.queue.upsert( + _ = queue.getOrPutValue( _module_data.module_id, - _resolved_import.*, - ); + .{ + .resolution = _resolved_import.*, + }, + ) catch unreachable; } else |err| { if (comptime Environment.isDebug) { if (!import_record.handles_import_errors) { @@ -1762,7 +1786,7 @@ pub const Bundler = struct { platform.isBun(), ); } - } else { + } else if (!platform.isBun()) { try addError( log, &source, @@ -2151,18 +2175,22 @@ pub const Bundler = struct { } std.debug.assert(mod.module_id != 0); - try this.queue.upsert( + _ = queue.getOrPutValue( mod.module_id, - _resolved_import.*, - ); + .{ + .resolution = _resolved_import.*, + }, + ) catch unreachable; } else { - try this.queue.upsert( + _ = queue.getOrPutValue( _resolved_import.hash( this.bundler.fs.top_level_dir, loader_, ), - _resolved_import.*, - ); + .{ + .resolution = _resolved_import.*, + }, + ) catch unreachable; } } else |err| { switch (err) { @@ -2193,7 +2221,7 @@ pub const Bundler = struct { platform.isBun(), ); } - } else { + } else if (!platform.isBun()) { try addError( log, &source, @@ -2219,6 +2247,10 @@ pub const Bundler = struct { else => {}, } } + + if (queue.count() > 0) { + try this.queue.writeItem(queue); + } } }; |