diff options
-rw-r--r-- | .vscode/settings.json | 3 | ||||
-rw-r--r-- | src/api/schema.peechy | 7 | ||||
-rw-r--r-- | src/cli.zig | 24 | ||||
-rw-r--r-- | src/cli/install_command.zig | 4 | ||||
-rw-r--r-- | src/fs.zig | 11 | ||||
-rw-r--r-- | src/http_client.zig | 41 | ||||
-rw-r--r-- | src/install/install.zig | 1113 | ||||
-rw-r--r-- | src/install/semver.zig | 75 | ||||
-rw-r--r-- | src/js_ast.zig | 5 | ||||
-rw-r--r-- | src/js_lexer_tables.zig | 3 | ||||
-rw-r--r-- | src/lock.zig | 4 | ||||
-rw-r--r-- | src/string_mutable.zig | 4 | ||||
-rw-r--r-- | src/tagged_pointer.zig | 2 | ||||
-rw-r--r-- | src/thread_pool.zig | 794 |
14 files changed, 1866 insertions, 224 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json index c0970369d..25426f899 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,9 +10,10 @@ "[zig]": { "editor.defaultFormatter": "tiehuis.zig" }, - "lldb.verboseLogging": true, "zig.beforeDebugCmd": "make build-unit ${file} ${filter} ${bin}", "zig.testCmd": "make test ${file} ${filter} ${bin}", + + "lldb.verboseLogging": false, "files.exclude": { "**/.git": true, "**/.svn": true, diff --git a/src/api/schema.peechy b/src/api/schema.peechy index 035a7fa26..150bac007 100644 --- a/src/api/schema.peechy +++ b/src/api/schema.peechy @@ -538,16 +538,15 @@ struct WebsocketMessageManifestFailure { } message SemverQualifier { - string pre = 1; - string build = 2; + StringPointer pre = 1; + StringPointer build = 2; } struct Semver { uint32 major; uint32 minor; uint32 patch; - StringPointer raw; - SemverQualifier[] qualifiers; + qualifier SemverQualifier; } enum NPMPackageDataKind { diff --git a/src/cli.zig b/src/cli.zig index 77a110fab..44c6159ca 100644 --- a/src/cli.zig +++ b/src/cli.zig @@ -36,6 +36,7 @@ const CreateCommand = @import("./cli/create_command.zig").CreateCommand; const CreateListExamplesCommand = @import("./cli/create_command.zig").CreateListExamplesCommand; const RunCommand = @import("./cli/run_command.zig").RunCommand; const UpgradeCommand = @import("./cli/upgrade_command.zig").UpgradeCommand; +const InstallCommand = @import("./cli/install_command.zig").InstallCommand; const InstallCompletionsCommand = @import("./cli/install_completions_command.zig").InstallCompletionsCommand; const ShellCompletions = @import("./cli/shell_completions.zig"); var start_time: i128 = undefined; @@ -463,6 +464,7 @@ const HelpCommand = struct { \\> <r> <b><green>dev <r><d> ./a.ts ./b.jsx<r> Start a Bun Dev Server \\> <r> <b><magenta>bun <r><d> ./a.ts ./b.jsx<r> Bundle dependencies of input files into a <r><magenta>.bun<r> \\> <r> <b><green>run <r><d> test <r> Run a package.json script or executable<r> + \\> 
<r> <b><green>install<r> Install dependencies for a package.json<r> \\> <r> <b><cyan>create <r><d>next ./app<r> Start a new project from a template <d>(shorthand: c)<r> \\> <r> <b><blue>upgrade <r> Get the latest version of Bun \\> <r> <b><d>completions<r> Install shell completions for tab-completion @@ -485,6 +487,7 @@ const HelpCommand = struct { \\> <r> <green>run <r><d> ./a.ts <r> Run a JavaScript-like file with Bun.js \\> <r> <b><blue>discord<r> Open Bun's Discord server \\> <r> <b><blue>upgrade <r> Get the latest version of Bun + \\> <r> <b><green>install<r> Install dependencies for a package.json<r> \\> <r> <b><d>help <r> Print this help menu \\ ; @@ -589,6 +592,7 @@ pub const Command = struct { RootCommandMatcher.case("upgrade") => .UpgradeCommand, RootCommandMatcher.case("completions") => .InstallCompletionsCommand, RootCommandMatcher.case("getcompletes") => .GetCompletionsCommand, + RootCommandMatcher.case("i"), RootCommandMatcher.case("install") => .InstallCommand, RootCommandMatcher.case("c"), RootCommandMatcher.case("create") => .CreateCommand, RootCommandMatcher.case("b"), RootCommandMatcher.case("build") => .BuildCommand, @@ -617,6 +621,7 @@ pub const Command = struct { // "build", "run", "dev", + "install", "create", "bun", "upgrade", @@ -658,6 +663,10 @@ pub const Command = struct { try InstallCompletionsCommand.exec(allocator); return; }, + .InstallCommand => { + try InstallCommand.exec(allocator); + return; + }, .GetCompletionsCommand => { const ctx = try Command.Context.create(allocator, log, .GetCompletionsCommand); var filter = ctx.positionals; @@ -770,17 +779,18 @@ pub const Command = struct { } pub const Tag = enum { - InitCommand, + AutoCommand, + BuildCommand, BunCommand, + CreateCommand, DevCommand, DiscordCommand, - BuildCommand, - RunCommand, - AutoCommand, + GetCompletionsCommand, HelpCommand, - CreateCommand, - UpgradeCommand, + InitCommand, + InstallCommand, InstallCompletionsCommand, - GetCompletionsCommand, + RunCommand, + 
UpgradeCommand, }; }; diff --git a/src/cli/install_command.zig b/src/cli/install_command.zig index cd2271f16..f3d25e055 100644 --- a/src/cli/install_command.zig +++ b/src/cli/install_command.zig @@ -34,9 +34,9 @@ const NpmArgs = struct { const yarn_commands: []u64 = @import("./list-of-yarn-commands.zig").all_yarn_commands; const ShellCompletions = @import("./shell_completions.zig"); - +const PackageManager = @import("../install/install.zig").PackageManager; pub const InstallCommand = struct { pub fn exec(ctx: Command.Context) !void { - + try PackageManager.install(ctx); } }; diff --git a/src/fs.zig b/src/fs.zig index 9c8a0b6d3..e2faa8697 100644 --- a/src/fs.zig +++ b/src/fs.zig @@ -998,10 +998,11 @@ pub const FileSystem = struct { // doNotCacheEntries bool }; - pub const Implementation = switch (build_target) { - .wasi, .native => RealFS, - .wasm => WasmFS, - }; + pub const Implementation = RealFS; + // pub const Implementation = switch (build_target) { + // .wasi, .native => RealFS, + // .wasm => WasmFS, + // }; }; pub const Directory = struct { path: Path, contents: []string }; @@ -1260,5 +1261,3 @@ test "PathName.init" { try std.testing.expectEqualStrings(res.base, "file"); try std.testing.expectEqualStrings(res.ext, ".ext"); } - -test {} diff --git a/src/http_client.zig b/src/http_client.zig index 560bd0797..f75f9a340 100644 --- a/src/http_client.zig +++ b/src/http_client.zig @@ -12,6 +12,7 @@ const HTTPClient = @This(); const SOCKET_FLAGS = os.SOCK_CLOEXEC; const S2n = @import("./s2n.zig"); const Zlib = @import("./zlib.zig"); +const StringBuilder = @import("./string_builder.zig"); fn writeRequest( comptime Writer: type, @@ -72,7 +73,7 @@ const Socket = std.x.os.Socket; const os = std.os; // lowercase hash header names so that we can be sure -fn hashHeaderName(name: string) u64 { +pub fn hashHeaderName(name: string) u64 { var hasher = std.hash.Wyhash.init(0); var remain: string = name; var buf: [32]u8 = undefined; @@ -84,6 +85,7 @@ fn hashHeaderName(name: 
string) u64 { hasher.update(strings.copyLowercase(std.mem.span(remain[0..end]), buf_slice)); remain = remain[end..]; } + return hasher.final(); } @@ -128,6 +130,43 @@ pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string { return this.header_buf[ptr.offset..][0..ptr.length]; } +pub const HeaderBuilder = struct { + content: StringBuilder = StringBuilder{}, + header_count: u64 = 0, + entries: Headers.Entries = Headers.Entries{}, + + pub fn count(this: *HeaderBuilder, name: string, value: string) void { + this.header_count += 1; + this.content.count(name); + this.content.count(value); + } + + pub fn allocate(this: *HeaderBuilder, allocator: *std.mem.Allocator) !void { + try this.content.allocate(allocator); + try this.entries.ensureTotalCapacity(allocator, this.header_count); + } + pub fn append(this: *HeaderBuilder, name: string, value: string) void { + const name_ptr = Api.StringPointer{ + .offset = @truncate(u32, this.content.len), + .length = @truncate(u32, name.len), + }; + + _ = this.content.append(name); + + const value_ptr = Api.StringPointer{ + .offset = @truncate(u32, this.content.len), + .length = @truncate(u32, value.len), + }; + _ = this.content.append(value); + this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr }); + } + + pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void { + client.header_entries = this.entries; + client.header_buf = this.content.ptr.?[0..this.content.len]; + } +}; + threadlocal var server_name_buf: [1024]u8 = undefined; pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { diff --git a/src/install/install.zig b/src/install/install.zig index ebb1b6344..235288d7a 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -8,7 +8,7 @@ const options = @import("../options.zig"); const js_parser = @import("../js_parser.zig"); const json_parser = @import("../json_parser.zig"); const js_printer = @import("../js_printer.zig"); -const js_ast = 
@import("../js_ast.zig"); +const JSAst = @import("../js_ast.zig"); const linker = @import("../linker.zig"); usingnamespace @import("../ast/base.zig"); usingnamespace @import("../defines.zig"); @@ -24,7 +24,9 @@ const DotEnv = @import("../env_loader.zig"); const which = @import("../which.zig").which; const Run = @import("../bun_js.zig").Run; const NewBunQueue = @import("../bun_queue.zig").NewBunQueue; - +const HTTPClient = @import("../http_client.zig"); +const Fs = @import("../fs.zig"); +const Lock = @import("../lock.zig").Lock; var path_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; var path_buf2: [std.fs.MAX_PATH_BYTES]u8 = undefined; const URL = @import("../query_string_map.zig").URL; @@ -44,45 +46,73 @@ const ExternalString = Semver.ExternalString; const StringBuilder = @import("../string_builder.zig"); const SlicedString = Semver.SlicedString; -pub const ExternalStringMap = extern struct { - name: []const ExternalString = &[_]ExternalString{}, +pub fn ExternalSlice(comptime Type: type) type { + return extern struct { + const Slice = @This(); - value: []const ExternalString = &[_]ExternalString{}, -}; + off: u32 = 0, + len: u32 = 0, + + pub inline fn get(this: Slice, in: []const Type) []const Type { + return in[this.off .. this.off + this.len]; + } + + pub inline fn mut(this: Slice, in: []Type) []Type { + return in[this.off .. 
this.off + this.len]; + } + + pub fn init(buf: []const Type, in: []const Type) Slice { + // if (comptime isDebug or isTest) { + // std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr)); + // std.debug.assert((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len)); + // } + + return Slice{ + .off = @truncate(u32, (@ptrToInt(in.ptr) - @ptrToInt(buf.ptr)) / @sizeOf(Type)), + .len = @truncate(u32, in.len), + }; + } + }; +} + +const PackageID = u32; +const invalid_package_id = std.math.maxInt(PackageID); + +const ExternalStringList = ExternalSlice(ExternalString); +const VersionSlice = ExternalSlice(Semver.Version); -pub const ExternalDependencyMap = extern struct { - len: u32, - versions: []const Semver.Version = &[_]Semver.Version{}, - dependencies: []const ExternalStringMap = &[_]ExternalStringMap{}, +pub const ExternalStringMap = extern struct { + name: ExternalStringList = ExternalStringList{}, + value: ExternalStringList = ExternalStringList{}, }; pub const Dependency = struct { name: string, name_hash: u32, - version: Version, + request: DependencyRequest, pub const Version = union(Tag) { - pub const Tag = enum { + pub const Tag = enum(u8) { /// Semver range - npm, + npm = 1, /// NPM dist tag, e.g. 
"latest" - dist_tag, + dist_tag = 2, /// URI to a .tgz or .tar.gz - tarball, + tarball = 3, /// Local folder - folder, + folder = 4, /// TODO: - symlink, + symlink = 5, /// TODO: - workspace, + workspace = 6, /// TODO: - git, + git = 7, /// TODO: - github, + github = 8, pub fn isGitHubRepoPath(dependency: string) bool { var slash_count: u8 = 0; @@ -339,6 +369,8 @@ pub const Package = struct { peer_dependencies: Dependency.List = Dependency.List{}, optional_dependencies: Dependency.List = Dependency.List{}, + npm_count: u32 = 0, + pub const Features = struct { optional_dependencies: bool = false, dev_dependencies: bool = false, @@ -349,8 +381,10 @@ pub const Package = struct { fn parseDependencyList( allocator: *std.mem.Allocator, + package_id: DependencyRequest, log: *logger.Log, - expr: js_ast.Expr, + npm_count_: *u32, + expr: JSAst.Expr, ) ?Dependency.List { if (expr.data != .e_object) return null; @@ -360,24 +394,26 @@ pub const Package = struct { var dependencies = Dependency.List{}; dependencies.ensureTotalCapacity(allocator, properties.len) catch @panic("OOM while parsing dependencies?"); + var npm_count = npm_count_.*; + defer npm_count_.* = npm_count; for (properties) |prop| { const name = prop.key.?.asString(allocator) orelse continue; const value = prop.value.?.asString(allocator) orelse continue; - if (Dependency.parse(allocator, value, log)) |version| { - const dependency = Dependency{ - .name = name, - .name_hash = std.hash.Murmur2_32.hash(name), - .version = version, - }; - - dependencies.appendAssumeCapacity(dependency); - } + const version = Dependency.parse(allocator, value, log) orelse continue; + const dependency = Dependency{ + .name = name, + .name_hash = @truncate(u32, std.hash.Wyhash.hash(0, name)), + .request = DependencyRequest{ .version = version, .from = package_id }, + }; + npm_count += @as(u32, @boolToInt(@enumToInt(dependency.version) > @enumToInt(Version.Tag.npm))); + dependencies.appendAssumeCapacity(dependency); } return 
dependencies; } pub fn parse( + package_id: PackageID, allocator: *std.mem.Allocator, log: *logger.Log, source: logger.Source, @@ -416,24 +452,24 @@ pub const Package = struct { } if (json.asProperty("dependencies")) |dependencies_q| { - package.dependencies = parseDependencyList(allocator, dependencies_q.expr) orelse Dependency.List{}; + package.dependencies = parseDependencyList(allocator, package_id, log, &package.npm_count, dependencies_q.expr) orelse Dependency.List{}; } if (comptime features.dev_dependencies) { if (json.asProperty("devDependencies")) |dependencies_q| { - package.dev_dependencies = parseDependencyList(allocator, dependencies_q.expr) orelse Dependency.List{}; + package.dev_dependencies = parseDependencyList(allocator, package_id, log, &package.npm_count, dependencies_q.expr) orelse Dependency.List{}; } } if (comptime features.optional_dependencies) { if (json.asProperty("optionalDependencies")) |dependencies_q| { - package.optional_dependencies = parseDependencyList(allocator, dependencies_q.expr) orelse Dependency.List{}; + package.optional_dependencies = parseDependencyList(allocator, package_id, log, &package.npm_count, dependencies_q.expr) orelse Dependency.List{}; } } if (comptime features.peer_dependencies) { if (json.asProperty("peerDependencies")) |dependencies_q| { - package.peer_dependencies = parseDependencyList(allocator, dependencies_q.expr) orelse Dependency.List{}; + package.peer_dependencies = parseDependencyList(allocator, package_id, log, &package.npm_count, dependencies_q.expr) orelse Dependency.List{}; } } @@ -443,74 +479,308 @@ pub const Package = struct { } }; +fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: *std.mem.Allocator) anyerror!Type)) type { + return struct { + const LinkedList = std.SinglyLinkedList(Type); + var list: LinkedList = undefined; + var loaded: bool = false; + var lock: Lock = undefined; + pub fn get(allocator: *std.mem.Allocator) *LinkedList.Node { + if (loaded) { + lock.lock(); + 
defer lock.unlock(); + if (list.popFirst()) |node| { + node.data.reset(); + return node; + } + } + + var new_node = allocator.create(LinkedList.Node) catch unreachable; + new_node.* = LinkedList.Node{ + .data = Init( + allocator, + ) catch unreachable, + }; + + return new_node; + } + + pub fn release(node: *LinkedList.Node) void { + if (loaded) { + lock.lock(); + defer lock.unlock(); + list.prepend(node); + return; + } + + list = LinkedList{ .first = node }; + loaded = true; + lock = Lock.init(); + } + }; +} + const Npm = struct { pub const Registry = struct { - url: URL, + url: URL = URL.parse("https://registry.npmjs.org/"), + const JSONPool = ObjectPool(MutableString, MutableString.init2048); + + const default_headers_buf: string = "Acceptapplication/vnd.npm.install-v1+json"; + + const PackageVersionResponse = union(Tag) { + pub const Tag = enum { + cached, + fresh, + not_found, + }; + + cached: void, + fresh: PackageManifest, + not_found: void, + }; + + pub fn getPackageMetadata( + this: *Registry, + allocator: *std.mem.Allocator, + log: *logger.Log, + package_name: string, + last_modified: []const u8, + etag: []const u8, + ) !PackageVersionResponse { + var url_buf = try std.fmt.allocPrint(allocator, "{s}://{s}/{s}", .{ this.url.displayProtocol(), this.url.hostname, package_name }); + defer allocator.free(url_buf); + + var json_pooled = JSONPool.get(allocator); + defer JSONPool.release(json_pooled); + + var header_builder = HTTPClient.HeaderBuilder{}; + + if (last_modified.len != 0) { + header_builder.count("If-Modified-Since", last_modified); + } + + if (etag.len != 0) { + header_builder.count("If-None-Match", etag); + } + + if (header_builder.content.len > 0) { + header_builder.count("Accept", "application/vnd.npm.install-v1+json"); + + if (last_modified.len != 0) { + header_builder.append("If-Modified-Since", last_modified); + } + + if (etag.len != 0) { + header_builder.append("If-None-Match", etag); + } + + header_builder.append("Accept", 
"application/vnd.npm.install-v1+json"); + } else { + try header_builder.entries.append( + allocator, + .{ + .name = .{ .offset = 0, .length = @truncate(u32, "Accept".len) }, + .value = .{ .offset = "Accept".len, .length = @truncate(u32, default_headers_buf.len - "Accept".len) }, + }, + ); + header_builder.header_count = 1; + header_builder.content = StringBuilder{ .ptr = @intToPtr([*]u8, @ptrToInt(std.mem.span(default_headers_buf).ptr)), .len = default_headers_buf.len, .cap = default_headers_buf.len }; + } + + var client = HTTPClient.init(allocator, .GET, URL.parse(url_buf), header_builder.entries, header_builder.content.ptr.?[0..header_builder.content.len]); + + var response = try client.send("", &json_pooled.data); + + switch (response.status_code) { + 429 => return error.TooManyRequests, + 404 => return PackageVersionResponse{ .not_found = .{} }, + 500...599 => return error.HTTPInternalServerError, + 304 => return PackageVersionResponse{ .cached = .{} }, + else => {}, + } + + var newly_last_modified: string = ""; + var new_etag: string = ""; + for (response.headers) |header| { + if (!(header.name.len == "last-modified".len or header.name.len == "etag".len)) continue; + + const hashed = HTTPClient.hashHeaderName(header.name); + + switch (hashed) { + HTTPClient.hashHeaderName("last-modified") => { + newly_last_modified = header.value; + }, + HTTPClient.hashHeaderName("etag") => { + new_etag = header.value; + }, + else => {}, + } + } + + var json_body = json_pooled.data.toOwnedSliceLeaky(); + + JSAst.Expr.Data.Store.create(default_allocator); + JSAst.Stmt.Data.Store.create(default_allocator); + defer { + JSAst.Expr.Data.Store.reset(); + JSAst.Stmt.Data.Store.reset(); + } + + if (try PackageManifest.parse(allocator, log, json_body, package_name, newly_last_modified, new_etag, 300)) |package| { + return PackageVersionResponse{ .fresh = package }; + } + + return error.PackageFailedToParse; + } + }; + + const VersionMap = std.ArrayHashMapUnmanaged(Semver.Version, 
PackageVersion, Semver.Version.HashContext, false); + const DistTagMap = extern struct { + tags: ExternalStringList = ExternalStringList{}, + versions: VersionSlice = VersionSlice{}, + }; + + const PackageVersionList = ExternalSlice(PackageVersion); + const ExternVersionMap = extern struct { + keys: VersionSlice = VersionSlice{}, + values: PackageVersionList = PackageVersionList{}, + + pub fn findKeyIndex(this: ExternVersionMap, buf: []const Semver.Version, find: Semver.Version) ?u32 { + for (this.keys.get(buf)) |key, i| { + if (key.eql(find)) { + return @truncate(u32, i); + } + } + + return null; + } }; - const ResolvedPackage = struct { + // ~384 bytes each? + pub const PackageVersion = extern struct { + // 32 bytes each + dependencies: ExternalStringMap = ExternalStringMap{}, + optional_dependencies: ExternalStringMap = ExternalStringMap{}, + bins: ExternalStringMap = ExternalStringMap{}, + + // 24 bytes each + integrity: ExternalString = ExternalString{}, + shasum: ExternalString = ExternalString{}, + bin_dir: ExternalString = ExternalString{}, + man_dir: ExternalString = ExternalString{}, + + unpacked_size: u64 = 0, + file_count: u64 = 0, + + os_matches: bool = true, + cpu_matches: bool = true, + + loaded_dependencies: ?*Dependency.List = null, + loaded_optional_dependencies: ?*Dependency.List = null, + }; + + const BigExternalString = Semver.BigExternalString; + + /// Efficient, serializable NPM package metadata + /// All the "content" is stored in three separate arrays, + /// Everything inside here is just pointers to one of the three arrays + const NpmPackage = extern struct { name: ExternalString = ExternalString{}, - releases: VersionMap, - prereleases: VersionMap, + releases: ExternVersionMap = ExternVersionMap{}, + prereleases: ExternVersionMap = ExternVersionMap{}, dist_tags: DistTagMap = DistTagMap{}, + /// "modified" in the JSON + modified: ExternalString = ExternalString{}, + + /// HTTP response headers last_modified: ExternalString = 
ExternalString{}, etag: ExternalString = ExternalString{}, + public_max_age: u32 = 0, + + string_buf: BigExternalString = BigExternalString{}, + versions_buf: VersionSlice = VersionSlice{}, + string_lists_buf: ExternalStringList = ExternalStringList{}, + }; + + const PackageManifest = struct { + name: string, + + pkg: NpmPackage = NpmPackage{}, - string_buf: []u8, + string_buf: []const u8 = &[_]u8{}, + versions: []const Semver.Version = &[_]Semver.Version{}, + external_strings: []const ExternalString = &[_]ExternalString{}, + package_versions: []PackageVersion = &[_]PackageVersion{}, - pub fn findBestVersion(this: *const ResolvedPackage, group: Semver.Query.Group) ?*VersionedPackage { + pub fn str(self: *const PackageManifest, external: ExternalString) string { + return external.slice(self.string_buf); + } + + pub fn reportSize(this: *const PackageManifest) void { + const versions = std.mem.sliceAsBytes(this.versions); + const external_strings = std.mem.sliceAsBytes(this.external_strings); + const package_versions = std.mem.sliceAsBytes(this.package_versions); + const string_buf = std.mem.sliceAsBytes(this.string_buf); + + Output.prettyErrorln( + \\ Versions count: {d} + \\ External Strings count: {d} + \\ Package Versions count: {d} + \\ + \\ Bytes: + \\ + \\ Versions: {d} + \\ External: {d} + \\ Packages: {d} + \\ Strings: {d} + \\ Total: {d} + , .{ + this.versions.len, + this.external_strings.len, + this.package_versions.len, + + std.mem.sliceAsBytes(this.versions).len, + std.mem.sliceAsBytes(this.external_strings).len, + std.mem.sliceAsBytes(this.package_versions).len, + std.mem.sliceAsBytes(this.string_buf).len, + std.mem.sliceAsBytes(this.versions).len + + std.mem.sliceAsBytes(this.external_strings).len + + std.mem.sliceAsBytes(this.package_versions).len + + std.mem.sliceAsBytes(this.string_buf).len, + }); + Output.flush(); + } + + pub fn findBestVersion(this: *const PackageManifest, group: Semver.Query.Group) ?*PackageVersion { const left = 
group.head.head.range.left; // Fast path: exact version - if (left.op == .version) { + if (left.op == .eql) { if (!left.version.tag.hasPre()) { - return this.releases.getPtr(left.version); + return &this.pkg.releases.values.mut(this.package_versions)[this.pkg.releases.findKeyIndex(this.versions, left.version) orelse return null]; } else { - return this.prereleases.getPtr(left.version); + return &this.pkg.prereleases.values.mut(this.package_versions)[this.pkg.prereleases.findKeyIndex(this.versions, left.version) orelse return null]; } } - // For now, this is the dumb way - for (this.releases.keys()) |version| { - if (group.satisfies(version)) { - return this.releases.getPtr(version); - } - } + // // For now, this is the dumb way + // for (this.pkg.releases.keys) |version| { + // if (group.satisfies(version)) { + // return this.releases.getPtr(version); + // } + // } - for (this.prereleases.keys()) |version| { - if (group.satisfies(version)) { - return this.prereleases.getPtr(version); - } - } + // for (this.prereleases.keys()) |version| { + // if (group.satisfies(version)) { + // return this.prereleases.getPtr(version); + // } + // } return null; } - const VersionMap = std.ArrayHashMap(Semver.Version, VersionedPackage, Semver.Version.HashContext, false); - const DistTagMap = extern struct { - tags: []const ExternalString = &[_]ExternalString{}, - versions: []const Semver.Version = &[_]Semver.Version{}, - }; - - pub const VersionedPackage = extern struct { - dependencies: ExternalStringMap = ExternalStringMap{}, - optional_dependencies: ExternalStringMap = ExternalStringMap{}, - integrity: ExternalString = ExternalString{}, - shasum: ExternalString = ExternalString{}, - bins: ExternalStringMap = ExternalStringMap{}, - bin_dir: ExternalString = ExternalString{}, - man_dir: ExternalString = ExternalString{}, - unpacked_size: u64 = 0, - file_count: u64 = 0, - - os_matches: bool = true, - cpu_matches: bool = true, - - loaded_dependencies: ?*Dependency.List = null, - 
loaded_optional_dependencies: ?*Dependency.List = null, - }; - + /// This parses [Abbreviated metadata](https://github.com/npm/registry/blob/master/docs/responses/package-metadata.md#abbreviated-metadata-format) pub fn parse( allocator: *std.mem.Allocator, log: *logger.Log, @@ -518,42 +788,45 @@ const Npm = struct { expected_name: []const u8, etag: []const u8, last_modified: []const u8, - ) !?ResolvedPackage { - // npm package names have a max of 512 chars - // we add an extra 32 because a little wiggle room is good - var expected_name_buf: [512 + "/package.json".len + 32]u8 = undefined; - const source = logger.Source.initPathString(try std.fmt.bufPrint(&expected_name_buf, "{s}/package.json", .{expected_name}), json_buffer); + public_max_age: u32, + ) !?PackageManifest { + const source = logger.Source.initPathString(expected_name, json_buffer); const json = json_parser.ParseJSON(&source, log, allocator) catch |err| { return null; }; if (json.asProperty("error")) |error_q| { - if (error_q.asString(allocator)) |err| { + if (error_q.expr.asString(allocator)) |err| { log.addErrorFmt(&source, logger.Loc.Empty, allocator, "npm error: {s}", .{err}) catch unreachable; return null; } } - var result = ResolvedPackage{ - .name = ExternalString{}, - .releases = DependencyMap.initContext(allocator, Semver.Version.HashContext{}), - .prereleases = DependencyMap.initContext(allocator, Semver.Version.HashContext{}), - .string_buf = &[_]u8{}, + var result = PackageManifest{ + .name = "", }; var string_builder = StringBuilder{}; + string_builder.count(last_modified); + string_builder.count(etag); if (json.asProperty("name")) |name_q| { const name = name_q.expr.asString(allocator) orelse return null; - if (name != expected_name) { - Output.panic("<r>internal: <red>package name mismatch<r> expected \"{s}\" but received <b>\"{s}\"<r>", .{ expected_name, name }); + if (!strings.eql(name, expected_name)) { + Output.panic("<r>internal: <red>package name mismatch<r> expected <b>\"{s}\"<r> 
but received <red>\"{s}\"<r>", .{ expected_name, name }); return null; } string_builder.count(name); } + if (json.asProperty("modified")) |name_q| { + const name = name_q.expr.asString(allocator) orelse return null; + + string_builder.count(name); + } + var release_versions_len: usize = 0; var pre_versions_len: usize = 0; var dependency_sum: usize = 0; @@ -600,7 +873,7 @@ const Npm = struct { for (properties) |property| { if (property.key.?.asString(allocator)) |key| { string_builder.count(key); - string_builder.count(property.value.?.data.e_string.utf8.len); + string_builder.count(property.value.?.data.e_string.utf8); } } } @@ -623,63 +896,103 @@ const Npm = struct { } } + var versioned_packages = try allocator.alloc(PackageVersion, release_versions_len + pre_versions_len); + std.mem.set( + PackageVersion, + versioned_packages, + + PackageVersion{}, + ); + var all_semver_versions = try allocator.alloc(Semver.Version, release_versions_len + pre_versions_len); + std.mem.set(Semver.Version, all_semver_versions, Semver.Version{}); var all_extern_strings = try allocator.alloc(ExternalString, extern_string_count); - try string_builder.allocate(); + std.mem.set( + ExternalString, + all_extern_strings, + + ExternalString{}, + ); + var versioned_package_releases = versioned_packages[0..release_versions_len]; + var versioned_package_prereleases = versioned_packages[release_versions_len..][0..pre_versions_len]; + var all_release_versions = all_semver_versions[0..release_versions_len]; + var all_prerelease_versions = all_semver_versions[release_versions_len..][0..pre_versions_len]; + var release_versions = all_release_versions; + var prerelease_versions = all_prerelease_versions; + + var extern_strings = all_extern_strings; + try string_builder.allocate(allocator); var string_buf = string_builder.ptr.?[0..string_builder.cap]; if (json.asProperty("name")) |name_q| { const name = name_q.expr.asString(allocator) orelse return null; - string_builder.append(name); - result.name = 
name; + result.name = string_builder.append(name); + result.pkg.name = ExternalString.init(string_buf, result.name, std.hash.Wyhash.hash(0, name)); } + var unique_string_count: usize = 0; + var unique_string_len: usize = 0; + var string_slice = SlicedString.init(string_buf, string_buf); get_versions: { if (json.asProperty("versions")) |versions_q| { if (versions_q.expr.data != .e_object) break :get_versions; const versions = versions_q.expr.data.e_object.properties; - var package_versions = try allocator.alloc(VersionedPackage, release_versions_len + pre_versions_len); - try result.releases.ensureTotalCapacity(release_versions_len); - try result.prereleases.ensureTotalCapacity(pre_versions_len); - var all_dependency_names_and_values = all_extern_strings[0 .. dependency_sum * 2]; - all_extern_strings = all_extern_strings[dependency_sum * 2 ..]; + var dependency_names = all_dependency_names_and_values[0..dependency_sum]; var dependency_values = all_dependency_names_and_values[dependency_sum..]; - var prev_names = dependency_names[0..0]; - var prev_versions = dependency_values[0..0]; + var prev_names: []ExternalString = &[_]ExternalString{}; + var prev_versions: []ExternalString = &[_]ExternalString{}; + const DedupString = std.HashMap( + u64, + ExternalString, + struct { + pub fn hash(this: @This(), key: u64) u64 { + return key; + } + pub fn eql(this: @This(), a: u64, b: u64) bool { + return a == b; + } + }, + 80, + ); + var deduper = DedupString.init(allocator); + defer deduper.deinit(); for (versions) |prop, version_i| { - const version_name = string_builder.append(prop.key.?.asString(allocator) orelse continue); + const version_name = prop.key.?.asString(allocator) orelse continue; - const parsed_version = Semver.Version.parse(SlicedString.init(string_buf, version_name), allocator); + const parsed_version = Semver.Version.parse(SlicedString.init(version_name, version_name), allocator); std.debug.assert(parsed_version.valid); if (!parsed_version.valid) { - 
log.addErrorFmt(source, prop.value.?.loc, allocator, "Failed to parse dependency {s}", .{name}) catch unreachable; + log.addErrorFmt(&source, prop.value.?.loc, allocator, "Failed to parse dependency {s}", .{version_name}) catch unreachable; continue; } - var package_version = VersionedPackage{}; + var package_version = PackageVersion{}; var count: usize = 0; - if (prop.value.?.asProperty("dependencies")) |versioned_deps| { + const versioned_deps_ = prop.value.?.asProperty("dependencies"); + const cpu_prop = prop.value.?.asProperty("cpu"); + const os_prop = prop.value.?.asProperty("os"); + if (versioned_deps_) |versioned_deps| { if (versioned_deps.expr.data == .e_object) { count = versioned_deps.expr.data.e_object.properties.len; } } - if (prop.value.?.asProperty("cpu")) |cpu_prop| { + if (cpu_prop) |cpu| { const CPU = comptime if (Environment.isAarch64) "arm64" else "x64"; package_version.cpu_matches = false; - switch (cpu_prop.expr.data) { + switch (cpu.expr.data) { .e_array => |arr| { for (arr.items) |item| { - if (item.asString(allocator)) |cpu| { - if (cpu.eql(@TypeOf(CPU), CPU)) { + if (item.asString(allocator)) |cpu_str| { + if (strings.eqlComptime(cpu_str, CPU)) { package_version.cpu_matches = true; break; } @@ -687,22 +1000,22 @@ const Npm = struct { } }, .e_string => |str| { - package_version.cpu_matches = str.eql(@TypeOf(CPU), CPU); + package_version.cpu_matches = strings.eql(str.utf8, CPU); }, else => {}, } } - if (prop.value.?.asProperty("os")) |cpu_prop| { + if (os_prop) |os| { // TODO: musl const OS = comptime if (Environment.isLinux) "linux" else "darwin"; package_version.os_matches = false; - switch (cpu_prop.expr.data) { + switch (os.expr.data) { .e_array => |arr| { for (arr.items) |item| { - if (item.asString(allocator)) |cpu| { - if (cpu.eql(@TypeOf(OS), OS)) { + if (item.asString(allocator)) |os_str| { + if (strings.eqlComptime(os_str, OS)) { package_version.os_matches = true; break; } @@ -710,47 +1023,46 @@ const Npm = struct { } }, .e_string 
=> |str| { - package_version.os_matches = str.eql(@TypeOf(OS), OS); + package_version.os_matches = strings.eql(str.utf8, OS); }, else => {}, } } - var this_names = dependency_names[0..count]; - var this_versions = dependency_values[0..count]; - integrity: { if (prop.value.?.asProperty("dist")) |dist| { if (dist.expr.data == .e_object) { - if (dist.expr.asProperty("integrity")) |shasum| { - if (shasum.expr.asString(allocator)) |shasum_str| { - package_version.integrity = string_builder.append(shasum_str); - break :integrity; - } - } - if (dist.expr.asProperty("fileCount")) |file_count_| { if (file_count_.expr.data == .e_number) { - package_version.file_count = @floatToInt(u64, @maximum(@floor(file_count_.expr.data.e_number.value), 0)); + package_version.file_count = file_count_.expr.data.e_number.toU64(); } } if (dist.expr.asProperty("unpackedSize")) |file_count_| { if (file_count_.expr.data == .e_number) { - package_version.unpacked_size = @floatToInt(u64, @maximum(@floor(file_count_.expr.data.e_number.value), 0)); + package_version.unpacked_size = file_count_.expr.data.e_number.toU64(); + } + } + + if (dist.expr.asProperty("integrity")) |shasum| { + if (shasum.expr.asString(allocator)) |shasum_str| { + package_version.integrity = string_slice.sub(string_builder.append(shasum_str)).external(); + break :integrity; } } if (dist.expr.asProperty("shasum")) |shasum| { if (shasum.expr.asString(allocator)) |shasum_str| { - package_version.shasum = string_builder.append(shasum_str); + package_version.shasum = string_slice.sub(string_builder.append(shasum_str)).external(); } } } } } + if (versioned_deps_) |versioned_deps| { + var this_names = dependency_names[0..count]; + var this_versions = dependency_values[0..count]; - if (prop.value.?.asProperty("dependencies")) |versioned_deps| { const items = versioned_deps.expr.data.e_object.properties; var any_differences = false; for (items) |item, i| { @@ -758,24 +1070,33 @@ const Npm = struct { // Often, npm packages have the 
same dependency names/versions many times. // This is a cheap way to usually dedup these dependencies. const name_str = item.key.?.asString(allocator) orelse continue; + const version_str = item.value.?.asString(allocator) orelse continue; + const name_hash = std.hash.Wyhash.hash(0, name_str); + const version_hash = std.hash.Wyhash.hash(0, version_str); + var name_entry = try deduper.getOrPut(name_hash); + var version_entry = try deduper.getOrPut(version_hash); - if (prev_names.len > i) this_names[i] = prev_names[i]; - if (!(prev_names.len > i and prev_names[i].hash == name_hash)) { + unique_string_count += @as(usize, @boolToInt(!name_entry.found_existing)) + @as(usize, @boolToInt(!version_entry.found_existing)); + unique_string_len += @as(usize, @boolToInt(!name_entry.found_existing) * name_str.len) + @as(usize, @boolToInt(!version_entry.found_existing) * version_str.len); + + if (!name_entry.found_existing) { any_differences = true; this_names[i] = ExternalString.init(string_buf, string_builder.append(name_str), name_hash); } - const version_str = item.value.?.asString(allocator) orelse continue; - const version_hash = std.hash.Wyhash.hash(0, version_str); - if (prev_versions.len > i) this_versions[i] = prev_versions[i]; if (!(prev_versions.len > i and prev_versions[i].hash == version_hash)) { any_differences = true; this_versions[i] = ExternalString.init(string_buf, string_builder.append(version_str), version_hash); } + + count = i; } + this_names = this_names[0..count]; + this_versions = this_versions[0..count]; + if (any_differences) { dependency_names = dependency_names[count..]; dependency_values = dependency_values[count..]; @@ -783,61 +1104,62 @@ const Npm = struct { this_names = prev_names; this_versions = prev_versions; } - } - package_version.dependencies = ExternalStringMap{ .name = this_names, .value = this_versions }; - package_versions[0] = package_version; - package_versions = package_versions[1..]; + if (this_names.len > 0) { + 
package_version.dependencies = ExternalStringMap{ + .name = ExternalStringList.init(all_extern_strings, this_names), + .value = ExternalStringList.init(all_extern_strings, this_versions), + }; + } + } if (!parsed_version.version.tag.hasPre()) { - result.releases.putAssumeCapacityNoClobber( - parsed_version.version, - list, - ); + release_versions[0] = parsed_version.version; + versioned_package_releases[0] = package_version; + release_versions = release_versions[1..]; + versioned_package_releases = versioned_package_releases[1..]; } else { - result.prereleases.putAssumeCapacityNoClobber( - parsed_version.version, - list, - ); - } - - if (count > 0) { - prev_names = this_names; - prev_versions = this_versions; + prerelease_versions[0] = parsed_version.version; + versioned_package_prereleases[0] = package_version; + prerelease_versions = prerelease_versions[1..]; + versioned_package_prereleases = versioned_package_prereleases[1..]; } } } } - // Since dist-tags are what will change most often, we will put them last incase we want to someday do incremental updates - if (json.asProperty("dist-tags")) |dist| { - if (dist.expr.data == .e_object) { - const tags = dist.expr.data.e_object.properties; - result.dist_tags.tags = all_extern_strings; - for (tags) |tag, i| { - if (tag.key.?.asString(allocator)) |key| { - const tag_str = string_builder.append(key); - const tag_hash = std.hash.Wyhash.hash(0, tag_str); - result.dist_tags.tags[i] = ExternalString.init(string_buf, tag_str, tag_hash); - - var parse_result = Semver.Version.parse(SlicedString.init(source.contents, tag.value.?.asString(allocator)), allocator); - if (parse_result.valid) { - if (parse_result.version.tag.hasPre()) { - const entry = result.prereleases.getEntry(parse_result.version) orelse unreachable; - // saves us from copying the semver versions again - result.dist_tags.versions[i] = entry.key_ptr.*; - } else { - const entry = result.releases.getEntry(parse_result.version) orelse unreachable; - 
result.dist_tags.versions[i] = entry.key_ptr.*; - } - } - } - } - } + result.pkg.last_modified = string_slice.sub(string_builder.append(last_modified)).external(); + result.pkg.etag = string_slice.sub(string_builder.append(etag)).external(); + + result.pkg.releases.keys.len = @truncate(u32, release_versions_len); + result.pkg.releases.values.len = @truncate(u32, release_versions_len); + + result.pkg.prereleases.keys.off = result.pkg.releases.keys.len; + result.pkg.prereleases.values.len = @truncate(u32, pre_versions_len); + + result.pkg.string_lists_buf.off = 0; + result.pkg.string_lists_buf.len = @truncate(u32, all_extern_strings.len); + + result.pkg.versions_buf.off = 0; + result.pkg.versions_buf.len = @truncate(u32, all_semver_versions.len); + + result.versions = all_semver_versions; + result.external_strings = all_extern_strings; + result.package_versions = versioned_packages; + + if (json.asProperty("modified")) |name_q| { + const name = name_q.expr.asString(allocator) orelse return null; + + result.pkg.modified = string_slice.sub(string_builder.append(name)).external(); } if (string_builder.ptr) |ptr| { result.string_buf = ptr[0..string_builder.len]; + result.pkg.string_buf = BigExternalString{ + .off = 0, + .len = @truncate(u32, string_builder.len), + .hash = 0, + }; } return result; @@ -845,6 +1167,437 @@ const Npm = struct { }; }; -pub const Install = struct { - root_package: *Package = undefined, +pub const DownloadPackageManifestTask = struct { + ctx: *PackageManager, + name: string, + + task: ThreadPool.Task, + + pub fn download(task: *ThreadPool.Task) void { + var this: *DownloadPackageManifestTask = @fieldParentPtr(DownloadPackageManifestTask, "task", task); + defer { + var node = @fieldParentPtr(Pool.LinkedList.Node, "data", this); + this.name = ""; + Pool.release(node); + } + + var log = logger.Log.init(this.ctx.allocator); + defer if (log.msgs.items.len > 0) this.ctx.mergeLog(log); + var package_manifest = 
this.ctx.registry.getPackageMetadata(this.ctx.allocator, &log, this.name, "", "") catch |err| { + log.addErrorFmt(null, logger.Loc.Empty, this.ctx.allocator, "Error fetching package manifest: {s}", .{@errorName(err)}) catch unreachable; + return; + }; + switch (package_manifest) { + .not_found => { + log.addErrorFmt(null, logger.Loc.Empty, this.ctx.allocator, "Package not found: {s}", this.name); + return; + }, + .fresh => |resolved| { + this.ctx.appendPackageResolution(resolved); + }, + else => unreachable, + } + } + + pub fn initFn(allocator: *std.mem.Allocator) !DownloadPackageManifestTask { + return DownloadPackageManifestTask{ .ctx = undefined, .name = "", .task = .{ .callback = download } }; + } + + pub const Pool = ObjectPool(DownloadPackageManifestTask, initFn); +}; + +const Task = union(Tag) { + resolve: ResolveTask, + install: InstallTask, + + pub const Tag = enum { + resolve, + install, + }; +}; + +pub const DependencyLevel = enum { dependency, dev, optional, peer }; +pub const Dependents = std.EnumArray(DependencyLevel, std.ArrayListUnmanaged(PackageID)); + +pub const Installation = struct { + tarball_path: string = "", + cached_dir: string = "", +}; + +const PackageBlock = struct { + pub const block_size = 256; + items: [block_size]Package = undefined, + dependents: [block_size]Dependents = undefined, + installations: [block_size]Installation = undefined, + next: std.atomic.Atomic(?*PackageBlock) = std.atomic.Atomic(?*PackageBlock).init(null), + lock: Lock = Lock.init(), + len: std.atomic.Atomic(u16) = std.atomic.Atomic(u16).init(0), + + pub fn append(this: *PackageBlock, package: Package) *Package { + this.lock.lock(); + defer this.lock.unlock(); + const i = this.len.fetchAdd(1, .Monotonic); + this.items[i] = package; + this.dependents[i] = Dependents.initFill(std.ArrayListUnmanaged(PackageID){}); + return &this.items[i]; + } +}; + +const PackageList = struct { + head: PackageBlock = PackageBlock{}, + tail: std.atomic.Atomic(?*PackageBlock) = 
std.atomic.Atomic(?*PackageBlock).init(null), + allocator: *std.mem.Allocator = undefined, + pub fn append(this: *PackageList, package: Package) !*Package { + var block: *PackageBlock = this.tail.load(.Monotonic) orelse &this.head; + std.debug.assert(block.next.load(.Monotonic) == null); + + if (block.len.fetchMin(PackageBlock.block_size, .Monotonic) >= PackageBlock.block_size) { + block.lock.lock(); + defer block.lock.unlock(); + var tail = try this.allocator.create(PackageBlock); + tail.* = PackageBlock{}; + tail.items[0] = package; + tail.dependents[0] = Dependents.initFill(std.ArrayListUnmanaged(PackageID){}); + tail.len.storeUnchecked(1); + block.next = tail; + this.tail.store(tail, .Monotonic); + return &tail.items[0]; + } else { + return block.append(package); + } + } +}; + +const IdentityContext = struct { + pub fn hash(this: @This(), key: u32) u64 { + return key; + } + + pub fn eql(this: @This(), a: u32, b: u32) bool { + return a == b; + } +}; + +const ArrayIdentityContext = struct { + pub fn hash(this: @This(), key: u32) u32 { + return key; + } + + pub fn eql(this: @This(), a: u32, b: u32) bool { + return a == b; + } +}; + +const DependencyRequest = struct { + version: Dependency.Version, + from: PackageID = invalid_package_id, + resolution: PackageID = invalid_package_id, +}; + +/// Versions & ranges to resolve for a package +/// A linked list so that we can append without allocating +/// We expect individual queues to not be 100s long, so it shouldn't be so bad to use pointers here +const ResolveQueue = std.SinglyLinkedList(*Dependency); +// Hash table mapping Manifest.name_hash to +const ResolveMap = std.ArrayHashMap(u32, ResolveQueue, ArrayIdentityContext, false); + +const ThreadPool = @import("../thread_pool.zig"); + +// We can't know all the package s we need until we've downloaded all the packages +// The easy way wouild be: +// 1. Download all packages, parsing their dependencies and enqueuing all dependnecies for resolution +// 2. 
+pub const PackageManager = struct { + pub var package_list = PackageList{}; + + enable_cache: bool = true, + cache_directory_path: string = "", + cache_directory: std.fs.Dir = undefined, + root_dir: *Fs.FileSystem.DirEntry, + env_loader: *DotEnv.Loader, + allocator: *std.mem.Allocator, + root_package: *Package, + log: *logger.Log, + + default_features: Package.Features = Package.Features{}, + + registry: Npm.Registry = Npm.Registry{}, + + /// Tracks a list of packages we have already enqueued for downloading + /// The key is Dependency.name_hash + /// The queue for actually downloading is separate + seen_npm_packages: PackageDedupeList = PackageDedupeList{}, + seen_npm_packages_lock: Lock = Lock.init(), + + seen_tarball_urls: PackageDedupeList = PackageDedupeList{}, + seen_tarball_urls_lock: Lock = Lock.init(), + thread_pool: ThreadPool, + + manifests_lock: Lock = Lock.init(), + manifests: PackageManifestMap = PackageManifestMap{}, + + resolve_lock: Lock = Lock.init(), + pending_resolve_queue: ResolveMap = ResolveMap{}, + pending_resolutions_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), + + const PackageManifestMap = std.StringHashMapUnmanaged(Npm.PackageManifest); + const PackageDedupeList = std.HashMapUnmanaged( + u32, + void, + IdentityContext, + 80, + ); + + fn doesNeedToDownloadPackageManifest(this: *PackageManager, name_hash: u32) bool { + return !this.seen_npm_packages.getOrPutAssumeCapacity(name_hash).found_existing; + } + + inline fn enqueueNpmPackage(this: *PackageManager, batch: *ThreadPool.Batch, name: string) *DownloadPackageManifestTask { + var node = DownloadPackageManifestTask.Pool.get(this.allocator); + node.data.name = name; + node.data.ctx = this; + return node; + } + + fn enqueuePackages(this: *PackageManager, dependencies: Dependency.List) ThreadPool.Batch { + var batch = ThreadPool.Batch{}; + var count: u32 = 0; + var slice = dependencies.unmanaged.entries.slice(); + const values = slice.items(.value); + var i: usize = 0; + 
var last_npm_package: ?*DownloadPackageManifestTask = null; + while (i < values.len) : (i += 1) { + const dependency: Dependency = values[i]; + switch (dependency.version) { + .npm, .dist_tag => { + if (this.doesNeedToDownloadPackageManifest(dependency.name_hash)) { + var current = this.enqueueNpmPackage(dependency); + if (last_npm_package != null) { + batch.tail.?.node.next = ¤t.task.node; + batch.len += 1; + } else { + batch = ThreadPool.Batch.from(current); + } + if (verbose_install) { + Output.prettyErrorln("Enqueue dependency: {s}", .{dependency.name}); + } + batch.tail = current; + last_npm_package = current; + } + }, + else => {}, + } + } + + if (verbose_install) Output.flush(); + + return batch; + } + + pub fn enqueueDependencyList(this: *PackageManager, package: *const Package, features: Package.Features) void { + + // Step 2. Allocate the list + if (package.npm_count > 0) { + this.seen_npm_packages_lock.lock(); + defer this.seen_npm_packages_lock.unlock(); + this.seen_npm_packages.ensureUnusedCapacity(package.npm_count) catch unreachable; + var batch = this.enqueuePackages(package.dependencies); + + if (features.dev_dependencies) { + batch = batch.push(this.enqueuePackages(package.dev_dependencies)); + } + + if (features.peer_dependencies) { + batch = batch.push(this.enqueuePackages(package.peer_dependencies)); + } + + if (features.optional_dependencies) { + batch = batch.push(this.enqueuePackages(package.optional_dependencies)); + } + + this.thread_pool.schedule(batch); + } + } + + pub fn appendPackageResolution(this: *PackageManager, manifest: Npm.PackageManifest) void { + const name_hash = @truncate(u32, manifest.pkg.name.hash); + { + this.manifests_lock.lock(); + defer this.manifests_lock.unlock(); + + this.manifests.getOrPutValue(this.allocator, name_hash, manifest) catch unreachable; + } + + { + this.resolve_lock.lock(); + defer this.resolve_lock.unlock(); + if (this.pending_resolve_queue.get(name_hash)) |pending| { + while (pending.popFirst()) 
|semver_group| {} + } + } + } + + pub fn fetchCacheDirectoryPath( + allocator: *std.mem.Allocator, + env_loader: *DotEnv.Loader, + root_dir: *Fs.FileSystem.DirEntry, + ) ?string { + if (env_loader.map.get("BUN_INSTALL_CACHE_DIR")) |dir| { + return dir; + } + + if (env_loader.map.get("BUN_INSTALL")) |dir| { + var parts = [_]string{ dir, "install/", "cache/" }; + return Fs.FileSystem.instance.joinBuf(&parts); + } + + if (env_loader.map.get("HOME")) |dir| { + var parts = [_]string{ dir, ".bun/", "install/", "cache/" }; + return Fs.FileSystem.instance.joinBuf(&parts); + } + + if (env_loader.map.get("XDG_CACHE_HOME")) |dir| { + var parts = [_]string{ dir, ".bun/", "install/", "cache/" }; + return Fs.FileSystem.instance.joinBuf(&parts); + } + + return null; + } + + fn loadAllDependencies(this: *PackageManager) !void {} + fn installDependencies(this: *PackageManager) !void {} + + var cwd_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; + var package_json_cwd_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; + pub fn install( + ctx: Command.Context, + ) !void { + var fs = try Fs.FileSystem.init1(ctx.allocator, null); + var original_cwd = std.mem.trimRight(u8, fs.top_level_dir, "/"); + + std.mem.copy(u8, &cwd_buf, original_cwd); + + // Step 1. 
Find the nearest package.json directory + // + // We will walk up from the cwd, calling chdir on each directory until we find a package.json + // If we fail to find one, we will report an error saying no packages to install + var package_json_file: std.fs.File = brk: { + break :brk std.fs.cwd().openFileZ("package.json", .{ .read = true, .write = true }) catch |err2| { + var this_cwd = original_cwd; + outer: while (std.fs.path.dirname(this_cwd)) |parent| { + cwd_buf[parent.len + 1] = 0; + var chdir = cwd_buf[0..parent.len :0]; + + std.os.chdirZ(chdir) catch break :brk null; + std.fs.cwd().openFileZ("package.json", .{ .read = true, .write = true }) catch |err| { + this_cwd = parent; + continue :outer; + }; + } + + break :brk null; + }; + } orelse { + Output.prettyErrorln("<r><red>Missing package.json<r>! Nothing to install.", .{}); + Output.flush(); + return; + }; + + fs.top_level_dir = try std.os.getcwd(&cwd_buf); + cwd_buf[fs.top_level_dir.len] = '/'; + cwd_buf[fs.top_level_dir.len + 1] = 0; + fs.top_level_dir = cwd_buf[0 .. fs.top_level_dir.len + 1]; + std.mem.copy(u8, &package_json_cwd_buf, fs.top_level_dir); + std.mem.copy(u8, package_json_cwd_buf[fs.top_level_dir.len..], "package.json"); + var package_json_contents = package_json_file.readToEndAlloc(ctx.allocator, std.math.maxInt(usize)) catch |err| { + Output.prettyErrorln("<r><red>{s} reading package.json<r>!", .{@errorName(err)}); + Output.flush(); + return; + }; + // Step 2. Parse the package.json file + // + var package_json_source = logger.Source.initPathString( + package_json_cwd_buf[0 .. 
fs.top_level_dir.len + "package.json".len], + ); + package_list.items[0] = try Package.parse( + ctx.allocator, + ctx.log, + package_json_source, + Package.Features{ + .optional_dependencies = true, + .dev_dependencies = true, + .is_main = true, + }, + ); + var root = &package_list.items[0]; + package_list.len = 1; + var env_loader: DotEnv.Loader = brk: { + var map = try ctx.allocator.create(DotEnv.Map); + map.* = DotEnv.Map.init(ctx.allocator); + + break :brk DotEnv.Loader.init(map, ctx.allocator); + }; + + var entries_option = try fs.fs.readDirectory(fs.top_Level_dir, std.fs.cwd()); + var enable_cache = false; + var cache_directory_path: string = ""; + + if (Install.fetchCacheDirectoryPath(ctx.allocator, env_loader, entries_option.dir)) |cache_dir_path| { + enable_cache = true; + cache_directory_path = try fs.dirname_store.append(@TypeOf(cache_dir_path), cache_dir_path); + } + + if (verbose_install) { + Output.prettyErrorln("Cache Dir: {s}", .{cache_directory_path}); + Output.flush(); + } + + var manager = PackageManager{ + .enable_cache = enable_cache, + .cache_directory_path = cache_directory_path, + .env_loader = env_loader, + .allocator = ctx.allocator, + .log = ctx.log, + .root_dir = entries_option.dir, + .root_package = root, + .thread_pool = ThreadPool.init(.{}), + }; + package_list.allocator = ctx.allocator; + + try manager.enqueueDependencyList( + &package_list.items[0], + Package.Features{ + .optional_dependencies = true, + .dev_dependencies = true, + .is_main = true, + }, + ); + + try manager.loadAllDependencies(); + try manager.installDependencies(); + } }; + +const verbose_install = true; + +test "getPackageMetadata" { + Output.initTest(); + + var registry = Npm.Registry{}; + var log = logger.Log.init(default_allocator); + + var response = try registry.getPackageMetadata(default_allocator, &log, "lodash", "", ""); + + const react_17 = try Semver.Query.parse(default_allocator, "1.2.0"); + + switch (response) { + .cached, .not_found => unreachable, + 
.fresh => |package| { + package.reportSize(); + const react = package.findBestVersion(react_17) orelse unreachable; + + const entry = react.dependencies.name.get(package.external_strings)[0]; + // try std.testing.expectEqualStrings("loose-envify", entry.slice(package.string_buf)); + }, + } +} diff --git a/src/install/semver.zig b/src/install/semver.zig index a5ab6a9f6..f3b0ab134 100644 --- a/src/install/semver.zig +++ b/src/install/semver.zig @@ -2,14 +2,14 @@ usingnamespace @import("../global.zig"); const std = @import("std"); pub const ExternalString = extern struct { - off: u64 = 0, - len: u64 = 0, + off: u32 = 0, + len: u16 = 0, hash: u64 = 0, pub fn from(in: string) ExternalString { return ExternalString{ .off = 0, - .len = in.len, + .len = @truncate(u16, in.len), .hash = std.hash.Wyhash.hash(0, in), }; } @@ -18,8 +18,36 @@ pub const ExternalString = extern struct { std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr) and ((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len))); return ExternalString{ - .off = @ptrToInt(in.ptr) - @ptrToInt(buf.ptr), - .len = in.len, + .off = @truncate(u32, @ptrToInt(in.ptr) - @ptrToInt(buf.ptr)), + .len = @truncate(u16, in.len), + .hash = hash, + }; + } + + pub fn slice(this: ExternalString, buf: string) string { + return buf[this.off..][0..this.len]; + } +}; + +pub const BigExternalString = extern struct { + off: u32 = 0, + len: u32 = 0, + hash: u64 = 0, + + pub fn from(in: string) ExternalString { + return ExternalString{ + .off = 0, + .len = @truncate(u32, in.len), + .hash = std.hash.Wyhash.hash(0, in), + }; + } + + pub inline fn init(buf: string, in: string, hash: u64) ExternalString { + std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr) and ((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len))); + + return ExternalString{ + .off = @truncate(u32, @ptrToInt(in.ptr) - @ptrToInt(buf.ptr)), + .len = @truncate(u32, in.len), .hash = hash, }; } @@ -40,7 +68,7 @@ pub const SlicedString = struct { pub 
inline fn external(this: SlicedString) ExternalString { std.debug.assert(@ptrToInt(this.buf.ptr) <= @ptrToInt(this.slice.ptr) and ((@ptrToInt(this.slice.ptr) + this.slice.len) <= (@ptrToInt(this.buf.ptr) + this.buf.len))); - return ExternalString{ .off = @ptrToInt(this.slice.ptr) - @ptrToInt(this.buf.ptr), .len = this.slice.len, .hash = std.hash.Wyhash.hash(0, this.slice) }; + return ExternalString{ .off = @truncate(u32, @ptrToInt(this.slice.ptr) - @ptrToInt(this.buf.ptr)), .len = @truncate(u16, this.slice.len), .hash = std.hash.Wyhash.hash(0, this.slice) }; } pub inline fn sub(this: SlicedString, input: string) SlicedString { @@ -49,12 +77,13 @@ pub const SlicedString = struct { } }; +const RawType = void; pub const Version = extern struct { major: u32 = 0, minor: u32 = 0, patch: u32 = 0, tag: Tag = Tag{}, - raw: ExternalString = ExternalString{}, + // raw: RawType = RawType{}, const HashableVersion = extern struct { major: u32, minor: u32, patch: u32, pre: u64, build: u64 }; @@ -230,15 +259,17 @@ pub const Version = extern struct { .none => {}, .pre => { result.tag.pre = sliced_string.sub(input[start..i]).external(); + // a pre can contain multiple consecutive tags if (comptime Environment.isDebug) { - std.debug.assert(!strings.containsChar(result.tag.pre.slice(sliced_string.buf), '-')); + std.debug.assert(!strings.startsWithChar(result.tag.pre.slice(sliced_string.buf), '-')); } state = State.none; }, .build => { + // a build can contain multiple consecutive tags result.tag.build = sliced_string.sub(input[start..i]).external(); if (comptime Environment.isDebug) { - std.debug.assert(!strings.containsChar(result.tag.build.slice(sliced_string.buf), '-')); + std.debug.assert(!strings.startsWithChar(result.tag.build.slice(sliced_string.buf), '+')); } state = State.none; }, @@ -415,7 +446,11 @@ pub const Version = extern struct { } result.stopped_at = @intCast(u32, i); - result.version.raw = sliced_string.sub(input[0..i]).external(); + + if (comptime RawType != void) { 
+ result.version.raw = sliced_string.sub(input[0..i]).external(); + } + return result; } @@ -487,15 +522,21 @@ pub const Range = struct { return Range{ .left = Comparator{ .op = Op.gte, - .version = Version{ .raw = version.raw }, + .version = Version{ + // .raw = version.raw + }, }, }; }, .minor => { - var lhs = Version{ .raw = version.raw }; + var lhs = Version{ + // .raw = version.raw + }; lhs.major = version.major + 1; - var rhs = Version{ .raw = version.raw }; + var rhs = Version{ + // .raw = version.raw + }; rhs.major = version.major; return Range{ @@ -518,8 +559,8 @@ pub const Range = struct { rhs.major = version.major; rhs.minor = version.minor; - rhs.raw = version.raw; - lhs.raw = version.raw; + // rhs.raw = version.raw; + // lhs.raw = version.raw; return Range{ .left = Comparator{ @@ -709,7 +750,7 @@ pub const Query = struct { .right = .{ .op = .lt, .version = Version{ - .raw = version.raw, + // .raw = version.raw, .major = major, .minor = minor, .patch = patch, @@ -1047,7 +1088,7 @@ const expect = struct { pub fn isRangeMatch(input: string, version_str: string) bool { var parsed = Version.parse(SlicedString.init(version_str, version_str), default_allocator); std.debug.assert(parsed.valid); - std.debug.assert(strings.eql(parsed.version.raw.slice(version_str), version_str)); + // std.debug.assert(strings.eql(parsed.version.raw.slice(version_str), version_str)); var list = Query.parse(default_allocator, input) catch |err| Output.panic("Test fail due to error {s}", .{@errorName(err)}); diff --git a/src/js_ast.zig b/src/js_ast.zig index 132ff1e05..7650a61a4 100644 --- a/src/js_ast.zig +++ b/src/js_ast.zig @@ -1058,6 +1058,11 @@ pub const E = struct { pub const Number = struct { value: f64, + pub inline fn toU64(self: Number) u64 { + @setRuntimeSafety(false); + return @floatToInt(u64, @maximum(@floor(self.value), 0)); + } + pub fn jsonStringify(self: *const Number, opts: anytype, o: anytype) !void { return try std.json.stringify(self.value, opts, o); } diff 
--git a/src/js_lexer_tables.zig b/src/js_lexer_tables.zig index b594fbc2e..b619b71a1 100644 --- a/src/js_lexer_tables.zig +++ b/src/js_lexer_tables.zig @@ -5,8 +5,7 @@ const expectString = std.testing.expectEqualStrings; const expect = std.testing.expect; const logger = @import("logger.zig"); const unicode = std.unicode; -const alloc = @import("alloc.zig"); - +const default_allocator = @import("./global.zig").default_allocator; pub const T = enum(u8) { t_end_of_file, t_syntax_error, diff --git a/src/lock.zig b/src/lock.zig index 16f28bc1e..23552f6ed 100644 --- a/src/lock.zig +++ b/src/lock.zig @@ -46,9 +46,7 @@ pub const Mutex = struct { // Give up spinning if the Mutex is contended. // This helps acquire() latency under micro-contention. // - // Only spin on x86 as other platforms are assumed to - // prioritize power efficiency over strict performance. - var spin: u8 = comptime if (is_x86) 100 else 0; + var spin: u8 = 100; while (spin > 0) : (spin -= 1) { std.atomic.spinLoopHint(); diff --git a/src/string_mutable.zig b/src/string_mutable.zig index 5dc153c99..acbecba94 100644 --- a/src/string_mutable.zig +++ b/src/string_mutable.zig @@ -9,6 +9,10 @@ pub const MutableString = struct { allocator: *std.mem.Allocator, list: std.ArrayListUnmanaged(u8), + pub fn init2048(allocator: *std.mem.Allocator) !MutableString { + return MutableString.init(allocator, 2048); + } + pub const Writer = std.io.Writer(*@This(), anyerror, MutableString.writeAll); pub fn writer(self: *MutableString) Writer { return Writer{ diff --git a/src/tagged_pointer.zig b/src/tagged_pointer.zig index ee1af487f..0b75b0f29 100644 --- a/src/tagged_pointer.zig +++ b/src/tagged_pointer.zig @@ -4,7 +4,7 @@ usingnamespace @import("./global.zig"); const TagSize = u15; const AddressableSize = u49; -const TaggedPointer = packed struct { +pub const TaggedPointer = packed struct { _ptr: AddressableSize, data: TagSize, diff --git a/src/thread_pool.zig b/src/thread_pool.zig new file mode 100644 index 
000000000..38c102a77 --- /dev/null +++ b/src/thread_pool.zig @@ -0,0 +1,794 @@ +// Thank you @kprotty. +// https://github.com/kprotty/zap/blob/blog/src/thread_pool.zig + +const std = @import("std"); +const ThreadPool = @This(); +const Futex = @import("./futex.zig"); + +const assert = std.debug.assert; +const Atomic = std.atomic.Atomic; + +stack_size: u32, +max_threads: u32, +sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})), +idle_event: Event = .{}, +join_event: Event = .{}, +run_queue: Node.Queue = .{}, +threads: Atomic(?*Thread) = Atomic(?*Thread).init(null), + +const Sync = packed struct { + /// Tracks the number of threads not searching for Tasks + idle: u14 = 0, + /// Tracks the number of threads spawned + spawned: u14 = 0, + /// What you see is what you get + unused: bool = false, + /// Used to not miss notifications while state = waking + notified: bool = false, + /// The current state of the thread pool + state: enum(u2) { + /// A notification can be issued to wake up a sleeping as the "waking thread". + pending = 0, + /// The state was notifiied with a signal. A thread is woken up. + /// The first thread to transition to `waking` becomes the "waking thread". + signaled, + /// There is a "waking thread" among us. + /// No other thread should be woken up until the waking thread transitions the state. + waking, + /// The thread pool was terminated. Start decremented `spawned` so that it can be joined. + shutdown, + } = .pending, +}; + +/// Configuration options for the thread pool. +/// TODO: add CPU core affinity? +pub const Config = struct { + stack_size: u32 = (std.Thread.SpawnConfig{}).stack_size, + max_threads: u32, +}; + +/// Statically initialize the thread pool using the configuration. +pub fn init(config: Config) ThreadPool { + return .{ + .stack_size = std.math.max(1, config.stack_size), + .max_threads = std.math.max(1, config.max_threads), + }; +} + +/// Wait for a thread to call shutdown() on the thread pool and kill the worker threads. 
+pub fn deinit(self: *ThreadPool) void { + self.join(); + self.* = undefined; +} + +/// A Task represents the unit of Work / Job / Execution that the ThreadPool schedules. +/// The user provides a `callback` which is invoked when the *Task can run on a thread. +pub const Task = struct { + node: Node = .{}, + callback: fn (*Task) void, +}; + +/// An unordered collection of Tasks which can be submitted for scheduling as a group. +pub const Batch = struct { + len: usize = 0, + head: ?*Task = null, + tail: ?*Task = null, + + /// Create a batch from a single task. + pub fn from(task: *Task) Batch { + return Batch{ + .len = 1, + .head = task, + .tail = task, + }; + } + + /// Another batch into this one, taking ownership of its tasks. + pub fn push(self: *Batch, batch: Batch) void { + if (batch.len == 0) return; + if (self.len == 0) { + self.* = batch; + } else { + self.tail.?.node.next = if (batch.head) |h| &h.node else null; + self.tail = batch.tail; + self.len += batch.len; + } + } +}; + +/// Schedule a batch of tasks to be executed by some thread on the thread pool. +pub fn schedule(self: *ThreadPool, batch: Batch) void { + // Sanity check + if (batch.len == 0) { + return; + } + + // Extract out the Node's from the Tasks + var list = Node.List{ + .head = &batch.head.?.node, + .tail = &batch.tail.?.node, + }; + + // Push the task Nodes to the most approriate queue + if (Thread.current) |thread| { + thread.run_buffer.push(&list) catch thread.run_queue.push(list); + } else { + self.run_queue.push(list); + } + + // Try to notify a thread + const is_waking = false; + return self.notify(is_waking); +} + +inline fn notify(self: *ThreadPool, is_waking: bool) void { + // Fast path to check the Sync state to avoid calling into notifySlow(). 
+ // If we're waking, then we need to update the state regardless + if (!is_waking) { + const sync = @bitCast(Sync, self.sync.load(.Monotonic)); + if (sync.notified) { + return; + } + } + + return self.notifySlow(is_waking); +} + +noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void { + var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + while (sync.state != .shutdown) { + const can_wake = is_waking or (sync.state == .pending); + if (is_waking) { + assert(sync.state == .waking); + } + + var new_sync = sync; + new_sync.notified = true; + if (can_wake and sync.idle > 0) { // wake up an idle thread + new_sync.state = .signaled; + } else if (can_wake and sync.spawned < self.max_threads) { // spawn a new thread + new_sync.state = .signaled; + new_sync.spawned += 1; + } else if (is_waking) { // no other thread to pass on "waking" status + new_sync.state = .pending; + } else if (sync.notified) { // nothing to update + return; + } + + // Release barrier synchronizes with Acquire in wait() + // to ensure pushes to run queues happen before observing a posted notification. 
+ sync = @bitCast(Sync, self.sync.tryCompareAndSwap( + @bitCast(u32, sync), + @bitCast(u32, new_sync), + .Release, + .Monotonic, + ) orelse { + // We signaled to notify an idle thread + if (can_wake and sync.idle > 0) { + return self.idle_event.notify(); + } + + // We signaled to spawn a new thread + if (can_wake and sync.spawned < self.max_threads) { + const spawn_config = std.Thread.SpawnConfig{ .stack_size = self.stack_size }; + const thread = std.Thread.spawn(spawn_config, Thread.run, .{self}) catch return self.unregister(null); + return thread.detach(); + } + + return; + }); + } +} + +noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { + var is_idle = false; + var is_waking = _is_waking; + var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + + while (true) { + if (sync.state == .shutdown) return error.Shutdown; + if (is_waking) assert(sync.state == .waking); + + // Consume a notification made by notify(). + if (sync.notified) { + var new_sync = sync; + new_sync.notified = false; + if (is_idle) + new_sync.idle -= 1; + if (sync.state == .signaled) + new_sync.state = .waking; + + // Acquire barrier synchronizes with notify() + // to ensure that pushes to run queue are observed after wait() returns. + sync = @bitCast(Sync, self.sync.tryCompareAndSwap( + @bitCast(u32, sync), + @bitCast(u32, new_sync), + .Acquire, + .Monotonic, + ) orelse { + return is_waking or (sync.state == .signaled); + }); + + // No notification to consume. + // Mark this thread as idle before sleeping on the idle_event. + } else if (!is_idle) { + var new_sync = sync; + new_sync.idle += 1; + if (is_waking) + new_sync.state = .pending; + + sync = @bitCast(Sync, self.sync.tryCompareAndSwap( + @bitCast(u32, sync), + @bitCast(u32, new_sync), + .Monotonic, + .Monotonic, + ) orelse { + is_waking = false; + is_idle = true; + continue; + }); + + // Wait for a signal by either notify() or shutdown() without wasting cpu cycles. + // TODO: Add I/O polling here. 
+ } else { + self.idle_event.wait(); + sync = @bitCast(Sync, self.sync.load(.Monotonic)); + } + } +} + +/// Marks the thread pool as shutdown +pub noinline fn shutdown(self: *ThreadPool) void { + var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + while (sync.state != .shutdown) { + var new_sync = sync; + new_sync.notified = true; + new_sync.state = .shutdown; + new_sync.idle = 0; + + // Full barrier to synchronize with both wait() and notify() + sync = @bitCast(Sync, self.sync.tryCompareAndSwap( + @bitCast(u32, sync), + @bitCast(u32, new_sync), + .AcqRel, + .Monotonic, + ) orelse { + // Wake up any threads sleeping on the idle_event. + // TODO: I/O polling notification here. + if (sync.idle > 0) self.idle_event.shutdown(); + return; + }); + } +} + +fn register(noalias self: *ThreadPool, noalias thread: *Thread) void { + // Push the thread onto the threads stack in a lock-free manner. + var threads = self.threads.load(.Monotonic); + while (true) { + thread.next = threads; + threads = self.threads.tryCompareAndSwap( + threads, + thread, + .Release, + .Monotonic, + ) orelse break; + } +} + +fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void { + // Un-spawn one thread, either due to a failed OS thread spawning or the thread is exiting. + const one_spawned = @bitCast(u32, Sync{ .spawned = 1 }); + const sync = @bitCast(Sync, self.sync.fetchSub(one_spawned, .Release)); + assert(sync.spawned > 0); + + // The last thread to exit must wake up the thread pool join()er + // who will start the chain to shutdown all the threads. + if (sync.state == .shutdown and sync.spawned == 1) { + self.join_event.notify(); + } + + // If this is a thread pool thread, wait for a shutdown signal by the thread pool join()er. + const thread = maybe_thread orelse return; + thread.join_event.wait(); + + // After receiving the shutdown signal, shutdown the next thread in the pool.
+ // We have to do that without touching the thread pool itself since its memory is invalidated by now. + // So just follow our .next link. + const next_thread = thread.next orelse return; + next_thread.join_event.notify(); +} + +fn join(self: *ThreadPool) void { + // Wait for the thread pool to be shutdown() then for all threads to enter a joinable state + var sync = @bitCast(Sync, self.sync.load(.Monotonic)); + if (!(sync.state == .shutdown and sync.spawned == 0)) { + self.join_event.wait(); + sync = @bitCast(Sync, self.sync.load(.Monotonic)); + } + + assert(sync.state == .shutdown); + assert(sync.spawned == 0); + + // If there are threads, start off the chain sending it the shutdown signal. + // The thread receives the shutdown signal and sends it to the next thread, and the next.. + const thread = self.threads.load(.Acquire) orelse return; + thread.join_event.notify(); +} + +const Thread = struct { + next: ?*Thread = null, + target: ?*Thread = null, + join_event: Event = .{}, + run_queue: Node.Queue = .{}, + run_buffer: Node.Buffer = .{}, + + threadlocal var current: ?*Thread = null; + + /// Thread entry point which runs a worker for the ThreadPool + fn run(thread_pool: *ThreadPool) void { + var self = Thread{}; + current = &self; + + thread_pool.register(&self); + defer thread_pool.unregister(&self); + + var is_waking = false; + while (true) { + is_waking = thread_pool.wait(is_waking) catch return; + + while (self.pop(thread_pool)) |result| { + if (result.pushed or is_waking) + thread_pool.notify(is_waking); + is_waking = false; + + const task = @fieldParentPtr(Task, "node", result.node); + (task.callback)(task); + } + } + } + + /// Try to dequeue a Node/Task from the ThreadPool. + /// Spurious reports of dequeue() returning empty are allowed.
+ fn pop(noalias self: *Thread, noalias thread_pool: *ThreadPool) ?Node.Buffer.Stole { + // Check our local buffer first + if (self.run_buffer.pop()) |node| { + return Node.Buffer.Stole{ + .node = node, + .pushed = false, + }; + } + + // Then check our local queue + if (self.run_buffer.consume(&self.run_queue)) |stole| { + return stole; + } + + // Then the global queue + if (self.run_buffer.consume(&thread_pool.run_queue)) |stole| { + return stole; + } + + // TODO: add optimistic I/O polling here + + // Then try work stealing from other threads + var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned; + while (num_threads > 0) : (num_threads -= 1) { + // Traverse the stack of registered threads on the thread pool + const target = self.target orelse thread_pool.threads.load(.Acquire) orelse unreachable; + self.target = target.next; + + // Try to steal from their queue first to avoid contention (the target steal's from queue last). + if (self.run_buffer.consume(&target.run_queue)) |stole| { + return stole; + } + + // Skip stealing from the buffer if we're the target. + // We still steal from our own queue above given it may have just been locked the first time we tried. + if (target == self) { + continue; + } + + // Steal from the buffer of a remote thread as a last resort + if (self.run_buffer.steal(&target.run_buffer)) |stole| { + return stole; + } + } + + return null; + } +}; + +/// An event which stores 1 semaphore token and is multi-threaded safe. +/// The event can be shutdown(), waking up all wait()ing threads and +/// making subsequent wait()'s return immediately. 
+const Event = struct { + state: Atomic(u32) = Atomic(u32).init(EMPTY), + + const EMPTY = 0; + const WAITING = 1; + const NOTIFIED = 2; + const SHUTDOWN = 3; + + /// Wait for and consume a notification + /// or wait for the event to be shutdown entirely + noinline fn wait(self: *Event) void { + var acquire_with: u32 = EMPTY; + var state = self.state.load(.Monotonic); + + while (true) { + // If we're shutdown then exit early. + // Acquire barrier to ensure operations before the shutdown() are seen after the wait(). + // Shutdown is rare so it's better to have an Acquire barrier here instead of on CAS failure + load which are common. + if (state == SHUTDOWN) { + std.atomic.fence(.Acquire); + return; + } + + // Consume a notification when it pops up. + // Acquire barrier to ensure operations before the notify() appear after the wait(). + if (state == NOTIFIED) { + state = self.state.tryCompareAndSwap( + state, + acquire_with, + .Acquire, + .Monotonic, + ) orelse return; + continue; + } + + // There is no notification to consume, we should wait on the event by ensuring its WAITING. + if (state != WAITING) blk: { + state = self.state.tryCompareAndSwap( + state, + WAITING, + .Monotonic, + .Monotonic, + ) orelse break :blk; + continue; + } + + // Wait on the event until a notify() or shutdown(). + // If we wake up to a notification, we must acquire it with WAITING instead of EMPTY + // since there may be other threads sleeping on the Futex who haven't been woken up yet. + // + // Acquiring to WAITING will make the next notify() or shutdown() wake a sleeping futex thread + // who will either exit on SHUTDOWN or acquire with WAITING again, ensuring all threads are awoken. + // This unfortunately results in the last notify() or shutdown() doing an extra futex wake but that's fine. 
+ Futex.wait(&self.state, WAITING, null) catch unreachable; + state = self.state.load(.Monotonic); + acquire_with = WAITING; + } + } + + /// Post a notification to the event if it doesn't have one already + /// then wake up a waiting thread if there is one as well. + fn notify(self: *Event) void { + return self.wake(NOTIFIED, 1); + } + + /// Marks the event as shutdown, making all future wait()'s return immediately. + /// Then wakes up any threads currently waiting on the Event. + fn shutdown(self: *Event) void { + return self.wake(SHUTDOWN, std.math.maxInt(u32)); + } + + fn wake(self: *Event, release_with: u32, wake_threads: u32) void { + // Update the Event to notify it with the new `release_with` state (either NOTIFIED or SHUTDOWN). + // Release barrier to ensure any operations before this happen before the wait() in the other threads. + const state = self.state.swap(release_with, .Release); + + // Only wake threads sleeping in futex if the state is WAITING. + // Avoids unnecessary wake ups. + if (state == WAITING) { + Futex.wake(&self.state, wake_threads); + } + } +}; + +/// Linked list intrusive memory node and lock-free data structures to operate with it +const Node = struct { + next: ?*Node = null, + + /// A linked list of Nodes + const List = struct { + head: *Node, + tail: *Node, + }; + + /// An unbounded multi-producer-(non blocking)-multi-consumer queue of Node pointers. + const Queue = struct { + stack: Atomic(usize) = Atomic(usize).init(0), + cache: ?*Node = null, + + const HAS_CACHE: usize = 0b01; + const IS_CONSUMING: usize = 0b10; + const PTR_MASK: usize = ~(HAS_CACHE | IS_CONSUMING); + + comptime { + assert(@alignOf(Node) >= ((IS_CONSUMING | HAS_CACHE) + 1)); + } + + fn push(noalias self: *Queue, list: List) void { + var stack = self.stack.load(.Monotonic); + while (true) { + // Attach the list to the stack (pt. 1) + list.tail.next = @intToPtr(?*Node, stack & PTR_MASK); + + // Update the stack with the list (pt. 2).
+ // Don't change the HAS_CACHE and IS_CONSUMING bits of the consumer. + var new_stack = @ptrToInt(list.head); + assert(new_stack & ~PTR_MASK == 0); + new_stack |= (stack & ~PTR_MASK); + + // Push to the stack with a release barrier for the consumer to see the proper list links. + stack = self.stack.tryCompareAndSwap( + stack, + new_stack, + .Release, + .Monotonic, + ) orelse break; + } + } + + fn tryAcquireConsumer(self: *Queue) error{ Empty, Contended }!?*Node { + var stack = self.stack.load(.Monotonic); + while (true) { + if (stack & IS_CONSUMING != 0) + return error.Contended; // The queue already has a consumer. + if (stack & (HAS_CACHE | PTR_MASK) == 0) + return error.Empty; // The queue is empty when there's nothing cached and nothing in the stack. + + // When we acquire the consumer, also consume the pushed stack if the cache is empty. + var new_stack = stack | HAS_CACHE | IS_CONSUMING; + if (stack & HAS_CACHE == 0) { + assert(stack & PTR_MASK != 0); + new_stack &= ~PTR_MASK; + } + + // Acquire barrier on getting the consumer to see cache/Node updates done by previous consumers + // and to ensure our cache/Node updates in pop() happen after that of previous consumers. + stack = self.stack.tryCompareAndSwap( + stack, + new_stack, + .Acquire, + .Monotonic, + ) orelse return self.cache orelse @intToPtr(*Node, stack & PTR_MASK); + } + } + + fn releaseConsumer(noalias self: *Queue, noalias consumer: ?*Node) void { + // Stop consuming and remove the HAS_CACHE bit as well if the consumer's cache is empty. + // When HAS_CACHE bit is zeroed, the next consumer will acquire the pushed stack nodes. + var remove = IS_CONSUMING; + if (consumer == null) + remove |= HAS_CACHE; + + // Release the consumer with a release barrier to ensure cache/node accesses + // happen before the consumer was released and before the next consumer starts using the cache. 
+ self.cache = consumer; + const stack = self.stack.fetchSub(remove, .Release); + assert(stack & remove != 0); + } + + fn pop(noalias self: *Queue, noalias consumer_ref: *?*Node) ?*Node { + // Check the consumer cache (fast path) + if (consumer_ref.*) |node| { + consumer_ref.* = node.next; + return node; + } + + // Load the stack to see if there was anything pushed that we could grab. + var stack = self.stack.load(.Monotonic); + assert(stack & IS_CONSUMING != 0); + if (stack & PTR_MASK == 0) { + return null; + } + + // Nodes have been pushed to the stack, grab them with an Acquire barrier to see the Node links. + stack = self.stack.swap(HAS_CACHE | IS_CONSUMING, .Acquire); + assert(stack & IS_CONSUMING != 0); + assert(stack & PTR_MASK != 0); + + const node = @intToPtr(*Node, stack & PTR_MASK); + consumer_ref.* = node.next; + return node; + } + }; + + /// A bounded single-producer, multi-consumer ring buffer for node pointers. + const Buffer = struct { + head: Atomic(Index) = Atomic(Index).init(0), + tail: Atomic(Index) = Atomic(Index).init(0), + array: [capacity]Atomic(*Node) = undefined, + + const Index = u32; + const capacity = 256; // Appears to be a pretty good trade-off in space vs contended throughput + comptime { + assert(std.math.maxInt(Index) >= capacity); + assert(std.math.isPowerOfTwo(capacity)); + } + + fn push(noalias self: *Buffer, noalias list: *List) error{Overflow}!void { + var head = self.head.load(.Monotonic); + var tail = self.tail.loadUnchecked(); // we're the only thread that can change this + + while (true) { + var size = tail -% head; + assert(size <= capacity); + + // Push nodes from the list to the buffer if it's not empty.. + if (size < capacity) { + var nodes: ?*Node = list.head; + while (size < capacity) : (size += 1) { + const node = nodes orelse break; + nodes = node.next; + + // Array written atomically with weakest ordering since it could be getting atomically read by steal().
+ self.array[tail % capacity].store(node, .Unordered); + tail +%= 1; + } + + // Release barrier synchronizes with Acquire loads for steal()ers to see the array writes. + self.tail.store(tail, .Release); + + // Update the list with the nodes we pushed to the buffer and try again if there's more. + list.head = nodes orelse return; + std.atomic.spinLoopHint(); + head = self.head.load(.Monotonic); + continue; + } + + // Try to steal/overflow half of the tasks in the buffer to make room for future push()es. + // Migrating half amortizes the cost of stealing while requiring future pops to still use the buffer. + // Acquire barrier to ensure the linked list creation after the steal only happens after we successfully steal. + var migrate = size / 2; + head = self.head.tryCompareAndSwap( + head, + head +% migrate, + .Acquire, + .Monotonic, + ) orelse { + // Link the migrated Nodes together + const first = self.array[head % capacity].loadUnchecked(); + while (migrate > 0) : (migrate -= 1) { + const prev = self.array[head % capacity].loadUnchecked(); + head +%= 1; + prev.next = self.array[head % capacity].loadUnchecked(); + } + + // Append the list that was supposed to be pushed to the end of the migrated Nodes + const last = self.array[(head -% 1) % capacity].loadUnchecked(); + last.next = list.head; + list.tail.next = null; + + // Return the migrated nodes + the original list as overflowed + list.head = first; + return error.Overflow; + }; + } + } + + fn pop(self: *Buffer) ?*Node { + var head = self.head.load(.Monotonic); + var tail = self.tail.loadUnchecked(); // we're the only thread that can change this + + while (true) { + // Quick sanity check and return null when empty + var size = tail -% head; + assert(size <= capacity); + if (size == 0) { + return null; + } + + // Dequeue with an acquire barrier to ensure any writes done to the Node + // only happen after we successfully claim it from the array.
+ head = self.head.tryCompareAndSwap( + head, + head +% 1, + .Acquire, + .Monotonic, + ) orelse return self.array[head % capacity].loadUnchecked(); + } + } + + const Stole = struct { + node: *Node, + pushed: bool, + }; + + fn consume(noalias self: *Buffer, noalias queue: *Queue) ?Stole { + var consumer = queue.tryAcquireConsumer() catch return null; + defer queue.releaseConsumer(consumer); + + const head = self.head.load(.Monotonic); + const tail = self.tail.loadUnchecked(); // we're the only thread that can change this + + const size = tail -% head; + assert(size <= capacity); + assert(size == 0); // we should only be consuming if our array is empty + + // Pop nodes from the queue and push them to our array. + // Atomic stores to the array as steal() threads may be atomically reading from it. + var pushed: Index = 0; + while (pushed < capacity) : (pushed += 1) { + const node = queue.pop(&consumer) orelse break; + self.array[(tail +% pushed) % capacity].store(node, .Unordered); + } + + // We will be returning one node that we stole from the queue. + // Get an extra, and if that's not possible, take one from our array. + const node = queue.pop(&consumer) orelse blk: { + if (pushed == 0) return null; + pushed -= 1; + break :blk self.array[(tail +% pushed) % capacity].loadUnchecked(); + }; + + // Update the array tail with the nodes we pushed to it. + // Release barrier to synchronize with Acquire barrier in steal()'s to see the written array Nodes. 
+ if (pushed > 0) self.tail.store(tail +% pushed, .Release); + return Stole{ + .node = node, + .pushed = pushed > 0, + }; + } + + fn steal(noalias self: *Buffer, noalias buffer: *Buffer) ?Stole { + const head = self.head.load(.Monotonic); + const tail = self.tail.loadUnchecked(); // we're the only thread that can change this + + const size = tail -% head; + assert(size <= capacity); + assert(size == 0); // we should only be stealing if our array is empty + + while (true) : (std.atomic.spinLoopHint()) { + const buffer_head = buffer.head.load(.Acquire); + const buffer_tail = buffer.tail.load(.Acquire); + + // Overly large size indicates that the tail was updated a lot after the head was loaded. + // Reload both and try again. + const buffer_size = buffer_tail -% buffer_head; + if (buffer_size > capacity) { + continue; + } + + // Try to steal half (divCeil) to amortize the cost of stealing from other threads. + const steal_size = buffer_size - (buffer_size / 2); + if (steal_size == 0) { + return null; + } + + // Copy the nodes we will steal from the target's array to our own. + // Atomically load from the target buffer array as it may be pushing and atomically storing to it. + // Atomic store to our array as other steal() threads may be atomically loading from it as above. + var i: Index = 0; + while (i < steal_size) : (i += 1) { + const node = buffer.array[(buffer_head +% i) % capacity].load(.Unordered); + self.array[(tail +% i) % capacity].store(node, .Unordered); + } + + // Try to commit the steal from the target buffer using: + // - an Acquire barrier to ensure that we only interact with the stolen Nodes after the steal was committed. + // - a Release barrier to ensure that the Nodes are copied above prior to the committing of the steal + // because if they're copied after the steal, they could be getting rewritten by the target's push().
+ _ = buffer.head.compareAndSwap( + buffer_head, + buffer_head +% steal_size, + .AcqRel, + .Monotonic, + ) orelse { + // Pop one from the nodes we stole as we'll be returning it + const pushed = steal_size - 1; + const node = self.array[(tail +% pushed) % capacity].loadUnchecked(); + + // Update the array tail with the nodes we pushed to it. + // Release barrier to synchronize with Acquire barrier in steal()'s to see the written array Nodes. + if (pushed > 0) self.tail.store(tail +% pushed, .Release); + return Stole{ + .node = node, + .pushed = pushed > 0, + }; + }; + } + } + }; +}; |