Diffstat (limited to 'src')
-rw-r--r--  src/api/schema.peechy          7
-rw-r--r--  src/cli.zig                   24
-rw-r--r--  src/cli/install_command.zig    4
-rw-r--r--  src/fs.zig                    11
-rw-r--r--  src/http_client.zig           41
-rw-r--r--  src/install/install.zig     1113
-rw-r--r--  src/install/semver.zig        75
-rw-r--r--  src/js_ast.zig                 5
-rw-r--r--  src/js_lexer_tables.zig        3
-rw-r--r--  src/lock.zig                   4
-rw-r--r--  src/string_mutable.zig         4
-rw-r--r--  src/tagged_pointer.zig         2
-rw-r--r--  src/thread_pool.zig          794
13 files changed, 1864 insertions(+), 223 deletions(-)
diff --git a/src/api/schema.peechy b/src/api/schema.peechy
index 035a7fa26..150bac007 100644
--- a/src/api/schema.peechy
+++ b/src/api/schema.peechy
@@ -538,16 +538,15 @@ struct WebsocketMessageManifestFailure {
}
message SemverQualifier {
- string pre = 1;
- string build = 2;
+ StringPointer pre = 1;
+ StringPointer build = 2;
}
struct Semver {
uint32 major;
uint32 minor;
uint32 patch;
- StringPointer raw;
- SemverQualifier[] qualifiers;
+ SemverQualifier qualifier;
}
enum NPMPackageDataKind {
diff --git a/src/cli.zig b/src/cli.zig
index 77a110fab..44c6159ca 100644
--- a/src/cli.zig
+++ b/src/cli.zig
@@ -36,6 +36,7 @@ const CreateCommand = @import("./cli/create_command.zig").CreateCommand;
const CreateListExamplesCommand = @import("./cli/create_command.zig").CreateListExamplesCommand;
const RunCommand = @import("./cli/run_command.zig").RunCommand;
const UpgradeCommand = @import("./cli/upgrade_command.zig").UpgradeCommand;
+const InstallCommand = @import("./cli/install_command.zig").InstallCommand;
const InstallCompletionsCommand = @import("./cli/install_completions_command.zig").InstallCompletionsCommand;
const ShellCompletions = @import("./cli/shell_completions.zig");
var start_time: i128 = undefined;
@@ -463,6 +464,7 @@ const HelpCommand = struct {
\\> <r> <b><green>dev <r><d> ./a.ts ./b.jsx<r> Start a Bun Dev Server
\\> <r> <b><magenta>bun <r><d> ./a.ts ./b.jsx<r> Bundle dependencies of input files into a <r><magenta>.bun<r>
\\> <r> <b><green>run <r><d> test <r> Run a package.json script or executable<r>
+ \\> <r> <b><green>install<r> Install dependencies for a package.json<r>
\\> <r> <b><cyan>create <r><d>next ./app<r> Start a new project from a template <d>(shorthand: c)<r>
\\> <r> <b><blue>upgrade <r> Get the latest version of Bun
\\> <r> <b><d>completions<r> Install shell completions for tab-completion
@@ -485,6 +487,7 @@ const HelpCommand = struct {
\\> <r> <green>run <r><d> ./a.ts <r> Run a JavaScript-like file with Bun.js
\\> <r> <b><blue>discord<r> Open Bun's Discord server
\\> <r> <b><blue>upgrade <r> Get the latest version of Bun
+ \\> <r> <b><green>install<r> Install dependencies for a package.json<r>
\\> <r> <b><d>help <r> Print this help menu
\\
;
@@ -589,6 +592,7 @@ pub const Command = struct {
RootCommandMatcher.case("upgrade") => .UpgradeCommand,
RootCommandMatcher.case("completions") => .InstallCompletionsCommand,
RootCommandMatcher.case("getcompletes") => .GetCompletionsCommand,
+ RootCommandMatcher.case("i"), RootCommandMatcher.case("install") => .InstallCommand,
RootCommandMatcher.case("c"), RootCommandMatcher.case("create") => .CreateCommand,
RootCommandMatcher.case("b"), RootCommandMatcher.case("build") => .BuildCommand,
@@ -617,6 +621,7 @@ pub const Command = struct {
// "build",
"run",
"dev",
+ "install",
"create",
"bun",
"upgrade",
@@ -658,6 +663,10 @@ pub const Command = struct {
try InstallCompletionsCommand.exec(allocator);
return;
},
+ .InstallCommand => {
+ const ctx = try Command.Context.create(allocator, log, .InstallCommand);
+ try InstallCommand.exec(ctx);
+ return;
+ },
.GetCompletionsCommand => {
const ctx = try Command.Context.create(allocator, log, .GetCompletionsCommand);
var filter = ctx.positionals;
@@ -770,17 +779,18 @@ pub const Command = struct {
}
pub const Tag = enum {
- InitCommand,
+ AutoCommand,
+ BuildCommand,
BunCommand,
+ CreateCommand,
DevCommand,
DiscordCommand,
- BuildCommand,
- RunCommand,
- AutoCommand,
+ GetCompletionsCommand,
HelpCommand,
- CreateCommand,
- UpgradeCommand,
+ InitCommand,
+ InstallCommand,
InstallCompletionsCommand,
- GetCompletionsCommand,
+ RunCommand,
+ UpgradeCommand,
};
};
diff --git a/src/cli/install_command.zig b/src/cli/install_command.zig
index cd2271f16..f3d25e055 100644
--- a/src/cli/install_command.zig
+++ b/src/cli/install_command.zig
@@ -34,9 +34,9 @@ const NpmArgs = struct {
const yarn_commands: []u64 = @import("./list-of-yarn-commands.zig").all_yarn_commands;
const ShellCompletions = @import("./shell_completions.zig");
-
+const PackageManager = @import("../install/install.zig").PackageManager;
pub const InstallCommand = struct {
pub fn exec(ctx: Command.Context) !void {
-
+ try PackageManager.install(ctx);
}
};
diff --git a/src/fs.zig b/src/fs.zig
index 9c8a0b6d3..e2faa8697 100644
--- a/src/fs.zig
+++ b/src/fs.zig
@@ -998,10 +998,11 @@ pub const FileSystem = struct {
// doNotCacheEntries bool
};
- pub const Implementation = switch (build_target) {
- .wasi, .native => RealFS,
- .wasm => WasmFS,
- };
+ pub const Implementation = RealFS;
+ // pub const Implementation = switch (build_target) {
+ // .wasi, .native => RealFS,
+ // .wasm => WasmFS,
+ // };
};
pub const Directory = struct { path: Path, contents: []string };
@@ -1260,5 +1261,3 @@ test "PathName.init" {
try std.testing.expectEqualStrings(res.base, "file");
try std.testing.expectEqualStrings(res.ext, ".ext");
}
-
-test {}
diff --git a/src/http_client.zig b/src/http_client.zig
index 560bd0797..f75f9a340 100644
--- a/src/http_client.zig
+++ b/src/http_client.zig
@@ -12,6 +12,7 @@ const HTTPClient = @This();
const SOCKET_FLAGS = os.SOCK_CLOEXEC;
const S2n = @import("./s2n.zig");
const Zlib = @import("./zlib.zig");
+const StringBuilder = @import("./string_builder.zig");
fn writeRequest(
comptime Writer: type,
@@ -72,7 +73,7 @@ const Socket = std.x.os.Socket;
const os = std.os;
// lowercase hash header names so that we can be sure
-fn hashHeaderName(name: string) u64 {
+pub fn hashHeaderName(name: string) u64 {
var hasher = std.hash.Wyhash.init(0);
var remain: string = name;
var buf: [32]u8 = undefined;
@@ -84,6 +85,7 @@ fn hashHeaderName(name: string) u64 {
hasher.update(strings.copyLowercase(std.mem.span(remain[0..end]), buf_slice));
remain = remain[end..];
}
+
return hasher.final();
}
@@ -128,6 +130,43 @@ pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string {
return this.header_buf[ptr.offset..][0..ptr.length];
}
+pub const HeaderBuilder = struct {
+ content: StringBuilder = StringBuilder{},
+ header_count: u64 = 0,
+ entries: Headers.Entries = Headers.Entries{},
+
+ pub fn count(this: *HeaderBuilder, name: string, value: string) void {
+ this.header_count += 1;
+ this.content.count(name);
+ this.content.count(value);
+ }
+
+ pub fn allocate(this: *HeaderBuilder, allocator: *std.mem.Allocator) !void {
+ try this.content.allocate(allocator);
+ try this.entries.ensureTotalCapacity(allocator, this.header_count);
+ }
+ pub fn append(this: *HeaderBuilder, name: string, value: string) void {
+ const name_ptr = Api.StringPointer{
+ .offset = @truncate(u32, this.content.len),
+ .length = @truncate(u32, name.len),
+ };
+
+ _ = this.content.append(name);
+
+ const value_ptr = Api.StringPointer{
+ .offset = @truncate(u32, this.content.len),
+ .length = @truncate(u32, value.len),
+ };
+ _ = this.content.append(value);
+ this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr });
+ }
+
+ pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void {
+ client.header_entries = this.entries;
+ client.header_buf = this.content.ptr.?[0..this.content.len];
+ }
+};
+
threadlocal var server_name_buf: [1024]u8 = undefined;
pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request {
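
The HeaderBuilder added above is a two-pass builder: count every header name and value first, allocate one backing buffer, then append and hand the result to the client. A minimal usage sketch, assuming an allocator, an etag string, and an HTTPClient already exist:

    var builder = HeaderBuilder{};
    builder.count("Accept", "application/vnd.npm.install-v1+json");
    builder.count("If-None-Match", etag);
    try builder.allocate(allocator);
    builder.append("Accept", "application/vnd.npm.install-v1+json");
    builder.append("If-None-Match", etag);
    builder.apply(&client); // points the client at the entries and backing buffer
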
diff --git a/src/install/install.zig b/src/install/install.zig
index ebb1b6344..235288d7a 100644
--- a/src/install/install.zig
+++ b/src/install/install.zig
@@ -8,7 +8,7 @@ const options = @import("../options.zig");
const js_parser = @import("../js_parser.zig");
const json_parser = @import("../json_parser.zig");
const js_printer = @import("../js_printer.zig");
-const js_ast = @import("../js_ast.zig");
+const JSAst = @import("../js_ast.zig");
const linker = @import("../linker.zig");
usingnamespace @import("../ast/base.zig");
usingnamespace @import("../defines.zig");
@@ -24,7 +24,9 @@ const DotEnv = @import("../env_loader.zig");
const which = @import("../which.zig").which;
const Run = @import("../bun_js.zig").Run;
const NewBunQueue = @import("../bun_queue.zig").NewBunQueue;
-
+const HTTPClient = @import("../http_client.zig");
+const Fs = @import("../fs.zig");
+const Lock = @import("../lock.zig").Lock;
var path_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
var path_buf2: [std.fs.MAX_PATH_BYTES]u8 = undefined;
const URL = @import("../query_string_map.zig").URL;
@@ -44,45 +46,73 @@ const ExternalString = Semver.ExternalString;
const StringBuilder = @import("../string_builder.zig");
const SlicedString = Semver.SlicedString;
-pub const ExternalStringMap = extern struct {
- name: []const ExternalString = &[_]ExternalString{},
+pub fn ExternalSlice(comptime Type: type) type {
+ return extern struct {
+ const Slice = @This();
- value: []const ExternalString = &[_]ExternalString{},
-};
+ off: u32 = 0,
+ len: u32 = 0,
+
+ pub inline fn get(this: Slice, in: []const Type) []const Type {
+ return in[this.off .. this.off + this.len];
+ }
+
+ pub inline fn mut(this: Slice, in: []Type) []Type {
+ return in[this.off .. this.off + this.len];
+ }
+
+ pub fn init(buf: []const Type, in: []const Type) Slice {
+ // if (comptime isDebug or isTest) {
+ // std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr));
+ // std.debug.assert((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len));
+ // }
+
+ return Slice{
+ .off = @truncate(u32, (@ptrToInt(in.ptr) - @ptrToInt(buf.ptr)) / @sizeOf(Type)),
+ .len = @truncate(u32, in.len),
+ };
+ }
+ };
+}
+
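
ExternalSlice(T) stores an offset/length pair relative to a backing array instead of raw pointers, which keeps structs extern-compatible and trivially serializable. A round-trip sketch (the buffer and indices are hypothetical):

    const slice = VersionSlice.init(all_versions, all_versions[2..5]); // .off = 2, .len = 3
    const same_items = slice.get(all_versions); // yields all_versions[2..5] again
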
+const PackageID = u32;
+const invalid_package_id = std.math.maxInt(PackageID);
+
+const ExternalStringList = ExternalSlice(ExternalString);
+const VersionSlice = ExternalSlice(Semver.Version);
-pub const ExternalDependencyMap = extern struct {
- len: u32,
- versions: []const Semver.Version = &[_]Semver.Version{},
- dependencies: []const ExternalStringMap = &[_]ExternalStringMap{},
+pub const ExternalStringMap = extern struct {
+ name: ExternalStringList = ExternalStringList{},
+ value: ExternalStringList = ExternalStringList{},
};
pub const Dependency = struct {
name: string,
name_hash: u32,
- version: Version,
+ request: DependencyRequest,
pub const Version = union(Tag) {
- pub const Tag = enum {
+ pub const Tag = enum(u8) {
/// Semver range
- npm,
+ npm = 1,
/// NPM dist tag, e.g. "latest"
- dist_tag,
+ dist_tag = 2,
/// URI to a .tgz or .tar.gz
- tarball,
+ tarball = 3,
/// Local folder
- folder,
+ folder = 4,
/// TODO:
- symlink,
+ symlink = 5,
/// TODO:
- workspace,
+ workspace = 6,
/// TODO:
- git,
+ git = 7,
/// TODO:
- github,
+ github = 8,
pub fn isGitHubRepoPath(dependency: string) bool {
var slash_count: u8 = 0;
@@ -339,6 +369,8 @@ pub const Package = struct {
peer_dependencies: Dependency.List = Dependency.List{},
optional_dependencies: Dependency.List = Dependency.List{},
+ npm_count: u32 = 0,
+
pub const Features = struct {
optional_dependencies: bool = false,
dev_dependencies: bool = false,
@@ -349,8 +381,10 @@ pub const Package = struct {
fn parseDependencyList(
allocator: *std.mem.Allocator,
+ package_id: PackageID,
log: *logger.Log,
- expr: js_ast.Expr,
+ npm_count_: *u32,
+ expr: JSAst.Expr,
) ?Dependency.List {
if (expr.data != .e_object) return null;
@@ -360,24 +394,26 @@ pub const Package = struct {
var dependencies = Dependency.List{};
dependencies.ensureTotalCapacity(allocator, properties.len) catch @panic("OOM while parsing dependencies?");
+ var npm_count = npm_count_.*;
+ defer npm_count_.* = npm_count;
for (properties) |prop| {
const name = prop.key.?.asString(allocator) orelse continue;
const value = prop.value.?.asString(allocator) orelse continue;
- if (Dependency.parse(allocator, value, log)) |version| {
- const dependency = Dependency{
- .name = name,
- .name_hash = std.hash.Murmur2_32.hash(name),
- .version = version,
- };
-
- dependencies.appendAssumeCapacity(dependency);
- }
+ const version = Dependency.parse(allocator, value, log) orelse continue;
+ const dependency = Dependency{
+ .name = name,
+ .name_hash = @truncate(u32, std.hash.Wyhash.hash(0, name)),
+ .request = DependencyRequest{ .version = version, .from = package_id },
+ };
+ npm_count += @as(u32, @boolToInt(dependency.request.version == .npm or dependency.request.version == .dist_tag));
+ dependencies.appendAssumeCapacity(dependency);
}
return dependencies;
}
pub fn parse(
+ package_id: PackageID,
allocator: *std.mem.Allocator,
log: *logger.Log,
source: logger.Source,
@@ -416,24 +452,24 @@ pub const Package = struct {
}
if (json.asProperty("dependencies")) |dependencies_q| {
- package.dependencies = parseDependencyList(allocator, dependencies_q.expr) orelse Dependency.List{};
+ package.dependencies = parseDependencyList(allocator, package_id, log, &package.npm_count, dependencies_q.expr) orelse Dependency.List{};
}
if (comptime features.dev_dependencies) {
if (json.asProperty("devDependencies")) |dependencies_q| {
- package.dev_dependencies = parseDependencyList(allocator, dependencies_q.expr) orelse Dependency.List{};
+ package.dev_dependencies = parseDependencyList(allocator, package_id, log, &package.npm_count, dependencies_q.expr) orelse Dependency.List{};
}
}
if (comptime features.optional_dependencies) {
if (json.asProperty("optionalDependencies")) |dependencies_q| {
- package.optional_dependencies = parseDependencyList(allocator, dependencies_q.expr) orelse Dependency.List{};
+ package.optional_dependencies = parseDependencyList(allocator, package_id, log, &package.npm_count, dependencies_q.expr) orelse Dependency.List{};
}
}
if (comptime features.peer_dependencies) {
if (json.asProperty("peerDependencies")) |dependencies_q| {
- package.peer_dependencies = parseDependencyList(allocator, dependencies_q.expr) orelse Dependency.List{};
+ package.peer_dependencies = parseDependencyList(allocator, package_id, log, &package.npm_count, dependencies_q.expr) orelse Dependency.List{};
}
}
@@ -443,74 +479,308 @@ pub const Package = struct {
}
};
+fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: *std.mem.Allocator) anyerror!Type)) type {
+ return struct {
+ const LinkedList = std.SinglyLinkedList(Type);
+ var list: LinkedList = undefined;
+ var loaded: bool = false;
+ var lock: Lock = undefined;
+ pub fn get(allocator: *std.mem.Allocator) *LinkedList.Node {
+ if (loaded) {
+ lock.lock();
+ defer lock.unlock();
+ if (list.popFirst()) |node| {
+ node.data.reset();
+ return node;
+ }
+ }
+
+ var new_node = allocator.create(LinkedList.Node) catch unreachable;
+ new_node.* = LinkedList.Node{
+ .data = Init(
+ allocator,
+ ) catch unreachable,
+ };
+
+ return new_node;
+ }
+
+ pub fn release(node: *LinkedList.Node) void {
+ if (loaded) {
+ lock.lock();
+ defer lock.unlock();
+ list.prepend(node);
+ return;
+ }
+
+ list = LinkedList{ .first = node };
+ loaded = true;
+ lock = Lock.init();
+ }
+ };
+}
+
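
ObjectPool keeps a lock-protected free list of nodes: get() pops and reset()s a pooled value or builds a fresh one via Init, and release() pushes the node back (the first release also initializes the list and lock), so the pooled Type must expose reset(). A hedged sketch modeled on the JSONPool defined below inside Npm.Registry:

    var node = JSONPool.get(allocator); // reused (and reset) or newly created via MutableString.init2048
    defer JSONPool.release(node);       // return the node to the shared free list
    var json_buffer = &node.data;       // work with the pooled MutableString through the node
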
const Npm = struct {
pub const Registry = struct {
- url: URL,
+ url: URL = URL.parse("https://registry.npmjs.org/"),
+ const JSONPool = ObjectPool(MutableString, MutableString.init2048);
+
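+ // one packed buffer: bytes 0.."Accept".len are the header name, the rest is the value; the StringPointer entries below slice into it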
+ const default_headers_buf: string = "Acceptapplication/vnd.npm.install-v1+json";
+
+ const PackageVersionResponse = union(Tag) {
+ pub const Tag = enum {
+ cached,
+ fresh,
+ not_found,
+ };
+
+ cached: void,
+ fresh: PackageManifest,
+ not_found: void,
+ };
+
+ pub fn getPackageMetadata(
+ this: *Registry,
+ allocator: *std.mem.Allocator,
+ log: *logger.Log,
+ package_name: string,
+ last_modified: []const u8,
+ etag: []const u8,
+ ) !PackageVersionResponse {
+ var url_buf = try std.fmt.allocPrint(allocator, "{s}://{s}/{s}", .{ this.url.displayProtocol(), this.url.hostname, package_name });
+ defer allocator.free(url_buf);
+
+ var json_pooled = JSONPool.get(allocator);
+ defer JSONPool.release(json_pooled);
+
+ var header_builder = HTTPClient.HeaderBuilder{};
+
+ if (last_modified.len != 0) {
+ header_builder.count("If-Modified-Since", last_modified);
+ }
+
+ if (etag.len != 0) {
+ header_builder.count("If-None-Match", etag);
+ }
+
+ if (header_builder.header_count > 0) {
+ header_builder.count("Accept", "application/vnd.npm.install-v1+json");
+ try header_builder.allocate(allocator);
+
+ if (last_modified.len != 0) {
+ header_builder.append("If-Modified-Since", last_modified);
+ }
+
+ if (etag.len != 0) {
+ header_builder.append("If-None-Match", etag);
+ }
+
+ header_builder.append("Accept", "application/vnd.npm.install-v1+json");
+ } else {
+ try header_builder.entries.append(
+ allocator,
+ .{
+ .name = .{ .offset = 0, .length = @truncate(u32, "Accept".len) },
+ .value = .{ .offset = "Accept".len, .length = @truncate(u32, default_headers_buf.len - "Accept".len) },
+ },
+ );
+ header_builder.header_count = 1;
+ header_builder.content = StringBuilder{ .ptr = @intToPtr([*]u8, @ptrToInt(std.mem.span(default_headers_buf).ptr)), .len = default_headers_buf.len, .cap = default_headers_buf.len };
+ }
+
+ var client = HTTPClient.init(allocator, .GET, URL.parse(url_buf), header_builder.entries, header_builder.content.ptr.?[0..header_builder.content.len]);
+
+ var response = try client.send("", &json_pooled.data);
+
+ switch (response.status_code) {
+ 429 => return error.TooManyRequests,
+ 404 => return PackageVersionResponse{ .not_found = .{} },
+ 500...599 => return error.HTTPInternalServerError,
+ 304 => return PackageVersionResponse{ .cached = .{} },
+ else => {},
+ }
+
+ var newly_last_modified: string = "";
+ var new_etag: string = "";
+ for (response.headers) |header| {
+ if (!(header.name.len == "last-modified".len or header.name.len == "etag".len)) continue;
+
+ const hashed = HTTPClient.hashHeaderName(header.name);
+
+ switch (hashed) {
+ HTTPClient.hashHeaderName("last-modified") => {
+ newly_last_modified = header.value;
+ },
+ HTTPClient.hashHeaderName("etag") => {
+ new_etag = header.value;
+ },
+ else => {},
+ }
+ }
+
+ var json_body = json_pooled.data.toOwnedSliceLeaky();
+
+ JSAst.Expr.Data.Store.create(default_allocator);
+ JSAst.Stmt.Data.Store.create(default_allocator);
+ defer {
+ JSAst.Expr.Data.Store.reset();
+ JSAst.Stmt.Data.Store.reset();
+ }
+
+ if (try PackageManifest.parse(allocator, log, json_body, package_name, new_etag, newly_last_modified, 300)) |package| {
+ return PackageVersionResponse{ .fresh = package };
+ }
+
+ return error.PackageFailedToParse;
+ }
+ };
+
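
Callers thread the previously stored etag / last-modified values back in, so the registry can answer 304 Not Modified and the client can skip re-downloading and re-parsing. A hedged caller sketch (stored_last_modified and stored_etag are assumptions):

    switch (try registry.getPackageMetadata(allocator, &log, "react", stored_last_modified, stored_etag)) {
        .cached => {}, // 304: keep using the manifest we already have
        .not_found => {}, // 404
        .fresh => |manifest| manifest.reportSize(),
    }
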
+ const VersionMap = std.ArrayHashMapUnmanaged(Semver.Version, PackageVersion, Semver.Version.HashContext, false);
+ const DistTagMap = extern struct {
+ tags: ExternalStringList = ExternalStringList{},
+ versions: VersionSlice = VersionSlice{},
+ };
+
+ const PackageVersionList = ExternalSlice(PackageVersion);
+ const ExternVersionMap = extern struct {
+ keys: VersionSlice = VersionSlice{},
+ values: PackageVersionList = PackageVersionList{},
+
+ pub fn findKeyIndex(this: ExternVersionMap, buf: []const Semver.Version, find: Semver.Version) ?u32 {
+ for (this.keys.get(buf)) |key, i| {
+ if (key.eql(find)) {
+ return @truncate(u32, i);
+ }
+ }
+
+ return null;
+ }
};
- const ResolvedPackage = struct {
+ // ~384 bytes each?
+ pub const PackageVersion = extern struct {
+ // 32 bytes each
+ dependencies: ExternalStringMap = ExternalStringMap{},
+ optional_dependencies: ExternalStringMap = ExternalStringMap{},
+ bins: ExternalStringMap = ExternalStringMap{},
+
+ // 24 bytes each
+ integrity: ExternalString = ExternalString{},
+ shasum: ExternalString = ExternalString{},
+ bin_dir: ExternalString = ExternalString{},
+ man_dir: ExternalString = ExternalString{},
+
+ unpacked_size: u64 = 0,
+ file_count: u64 = 0,
+
+ os_matches: bool = true,
+ cpu_matches: bool = true,
+
+ loaded_dependencies: ?*Dependency.List = null,
+ loaded_optional_dependencies: ?*Dependency.List = null,
+ };
+
+ const BigExternalString = Semver.BigExternalString;
+
+ /// Efficient, serializable NPM package metadata
+ /// All the "content" is stored in three separate arrays,
+ /// Everything inside here is just pointers to one of the three arrays
+ const NpmPackage = extern struct {
name: ExternalString = ExternalString{},
- releases: VersionMap,
- prereleases: VersionMap,
+ releases: ExternVersionMap = ExternVersionMap{},
+ prereleases: ExternVersionMap = ExternVersionMap{},
dist_tags: DistTagMap = DistTagMap{},
+ /// "modified" in the JSON
+ modified: ExternalString = ExternalString{},
+
+ /// HTTP response headers
last_modified: ExternalString = ExternalString{},
etag: ExternalString = ExternalString{},
+ public_max_age: u32 = 0,
+
+ string_buf: BigExternalString = BigExternalString{},
+ versions_buf: VersionSlice = VersionSlice{},
+ string_lists_buf: ExternalStringList = ExternalStringList{},
+ };
+
+ const PackageManifest = struct {
+ name: string,
+
+ pkg: NpmPackage = NpmPackage{},
- string_buf: []u8,
+ string_buf: []const u8 = &[_]u8{},
+ versions: []const Semver.Version = &[_]Semver.Version{},
+ external_strings: []const ExternalString = &[_]ExternalString{},
+ package_versions: []PackageVersion = &[_]PackageVersion{},
- pub fn findBestVersion(this: *const ResolvedPackage, group: Semver.Query.Group) ?*VersionedPackage {
+ pub fn str(self: *const PackageManifest, external: ExternalString) string {
+ return external.slice(self.string_buf);
+ }
+
+ pub fn reportSize(this: *const PackageManifest) void {
+ const versions = std.mem.sliceAsBytes(this.versions);
+ const external_strings = std.mem.sliceAsBytes(this.external_strings);
+ const package_versions = std.mem.sliceAsBytes(this.package_versions);
+ const string_buf = std.mem.sliceAsBytes(this.string_buf);
+
+ Output.prettyErrorln(
+ \\ Versions count: {d}
+ \\ External Strings count: {d}
+ \\ Package Versions count: {d}
+ \\
+ \\ Bytes:
+ \\
+ \\ Versions: {d}
+ \\ External: {d}
+ \\ Packages: {d}
+ \\ Strings: {d}
+ \\ Total: {d}
+ , .{
+ this.versions.len,
+ this.external_strings.len,
+ this.package_versions.len,
+
+ std.mem.sliceAsBytes(this.versions).len,
+ std.mem.sliceAsBytes(this.external_strings).len,
+ std.mem.sliceAsBytes(this.package_versions).len,
+ std.mem.sliceAsBytes(this.string_buf).len,
+ std.mem.sliceAsBytes(this.versions).len +
+ std.mem.sliceAsBytes(this.external_strings).len +
+ std.mem.sliceAsBytes(this.package_versions).len +
+ std.mem.sliceAsBytes(this.string_buf).len,
+ });
+ Output.flush();
+ }
+
+ pub fn findBestVersion(this: *const PackageManifest, group: Semver.Query.Group) ?*PackageVersion {
const left = group.head.head.range.left;
// Fast path: exact version
- if (left.op == .version) {
+ if (left.op == .eql) {
if (!left.version.tag.hasPre()) {
- return this.releases.getPtr(left.version);
+ return &this.pkg.releases.values.mut(this.package_versions)[this.pkg.releases.findKeyIndex(this.versions, left.version) orelse return null];
} else {
- return this.prereleases.getPtr(left.version);
+ return &this.pkg.prereleases.values.mut(this.package_versions)[this.pkg.prereleases.findKeyIndex(this.versions, left.version) orelse return null];
}
}
- // For now, this is the dumb way
- for (this.releases.keys()) |version| {
- if (group.satisfies(version)) {
- return this.releases.getPtr(version);
- }
- }
+ // // For now, this is the dumb way
+ // for (this.pkg.releases.keys) |version| {
+ // if (group.satisfies(version)) {
+ // return this.releases.getPtr(version);
+ // }
+ // }
- for (this.prereleases.keys()) |version| {
- if (group.satisfies(version)) {
- return this.prereleases.getPtr(version);
- }
- }
+ // for (this.prereleases.keys()) |version| {
+ // if (group.satisfies(version)) {
+ // return this.prereleases.getPtr(version);
+ // }
+ // }
return null;
}
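
The commented-out slow path no longer compiles because releases.keys is now a VersionSlice into this.versions rather than a map. An equivalent scan under the new layout might look like this (a sketch, not part of this commit):

    for (this.pkg.releases.keys.get(this.versions)) |version, i| {
        if (group.satisfies(version)) {
            return &this.pkg.releases.values.mut(this.package_versions)[i];
        }
    }
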
- const VersionMap = std.ArrayHashMap(Semver.Version, VersionedPackage, Semver.Version.HashContext, false);
- const DistTagMap = extern struct {
- tags: []const ExternalString = &[_]ExternalString{},
- versions: []const Semver.Version = &[_]Semver.Version{},
- };
-
- pub const VersionedPackage = extern struct {
- dependencies: ExternalStringMap = ExternalStringMap{},
- optional_dependencies: ExternalStringMap = ExternalStringMap{},
- integrity: ExternalString = ExternalString{},
- shasum: ExternalString = ExternalString{},
- bins: ExternalStringMap = ExternalStringMap{},
- bin_dir: ExternalString = ExternalString{},
- man_dir: ExternalString = ExternalString{},
- unpacked_size: u64 = 0,
- file_count: u64 = 0,
-
- os_matches: bool = true,
- cpu_matches: bool = true,
-
- loaded_dependencies: ?*Dependency.List = null,
- loaded_optional_dependencies: ?*Dependency.List = null,
- };
-
+ /// This parses [Abbreviated metadata](https://github.com/npm/registry/blob/master/docs/responses/package-metadata.md#abbreviated-metadata-format)
pub fn parse(
allocator: *std.mem.Allocator,
log: *logger.Log,
@@ -518,42 +788,45 @@ const Npm = struct {
expected_name: []const u8,
etag: []const u8,
last_modified: []const u8,
- ) !?ResolvedPackage {
- // npm package names have a max of 512 chars
- // we add an extra 32 because a little wiggle room is good
- var expected_name_buf: [512 + "/package.json".len + 32]u8 = undefined;
- const source = logger.Source.initPathString(try std.fmt.bufPrint(&expected_name_buf, "{s}/package.json", .{expected_name}), json_buffer);
+ public_max_age: u32,
+ ) !?PackageManifest {
+ const source = logger.Source.initPathString(expected_name, json_buffer);
const json = json_parser.ParseJSON(&source, log, allocator) catch |err| {
return null;
};
if (json.asProperty("error")) |error_q| {
- if (error_q.asString(allocator)) |err| {
+ if (error_q.expr.asString(allocator)) |err| {
log.addErrorFmt(&source, logger.Loc.Empty, allocator, "npm error: {s}", .{err}) catch unreachable;
return null;
}
}
- var result = ResolvedPackage{
- .name = ExternalString{},
- .releases = DependencyMap.initContext(allocator, Semver.Version.HashContext{}),
- .prereleases = DependencyMap.initContext(allocator, Semver.Version.HashContext{}),
- .string_buf = &[_]u8{},
+ var result = PackageManifest{
+ .name = "",
};
var string_builder = StringBuilder{};
+ string_builder.count(last_modified);
+ string_builder.count(etag);
if (json.asProperty("name")) |name_q| {
const name = name_q.expr.asString(allocator) orelse return null;
- if (name != expected_name) {
- Output.panic("<r>internal: <red>package name mismatch<r> expected \"{s}\" but received <b>\"{s}\"<r>", .{ expected_name, name });
+ if (!strings.eql(name, expected_name)) {
+ Output.panic("<r>internal: <red>package name mismatch<r> expected <b>\"{s}\"<r> but received <red>\"{s}\"<r>", .{ expected_name, name });
return null;
}
string_builder.count(name);
}
+ if (json.asProperty("modified")) |name_q| {
+ const name = name_q.expr.asString(allocator) orelse return null;
+
+ string_builder.count(name);
+ }
+
var release_versions_len: usize = 0;
var pre_versions_len: usize = 0;
var dependency_sum: usize = 0;
@@ -600,7 +873,7 @@ const Npm = struct {
for (properties) |property| {
if (property.key.?.asString(allocator)) |key| {
string_builder.count(key);
- string_builder.count(property.value.?.data.e_string.utf8.len);
+ string_builder.count(property.value.?.data.e_string.utf8);
}
}
}
@@ -623,63 +896,103 @@ const Npm = struct {
}
}
+ var versioned_packages = try allocator.alloc(PackageVersion, release_versions_len + pre_versions_len);
+ std.mem.set(
+ PackageVersion,
+ versioned_packages,
+
+ PackageVersion{},
+ );
+ var all_semver_versions = try allocator.alloc(Semver.Version, release_versions_len + pre_versions_len);
+ std.mem.set(Semver.Version, all_semver_versions, Semver.Version{});
var all_extern_strings = try allocator.alloc(ExternalString, extern_string_count);
- try string_builder.allocate();
+ std.mem.set(
+ ExternalString,
+ all_extern_strings,
+
+ ExternalString{},
+ );
+ var versioned_package_releases = versioned_packages[0..release_versions_len];
+ var versioned_package_prereleases = versioned_packages[release_versions_len..][0..pre_versions_len];
+ var all_release_versions = all_semver_versions[0..release_versions_len];
+ var all_prerelease_versions = all_semver_versions[release_versions_len..][0..pre_versions_len];
+ var release_versions = all_release_versions;
+ var prerelease_versions = all_prerelease_versions;
+
+ var extern_strings = all_extern_strings;
+ try string_builder.allocate(allocator);
var string_buf = string_builder.ptr.?[0..string_builder.cap];
if (json.asProperty("name")) |name_q| {
const name = name_q.expr.asString(allocator) orelse return null;
- string_builder.append(name);
- result.name = name;
+ result.name = string_builder.append(name);
+ result.pkg.name = ExternalString.init(string_buf, result.name, std.hash.Wyhash.hash(0, name));
}
+ var unique_string_count: usize = 0;
+ var unique_string_len: usize = 0;
+ var string_slice = SlicedString.init(string_buf, string_buf);
get_versions: {
if (json.asProperty("versions")) |versions_q| {
if (versions_q.expr.data != .e_object) break :get_versions;
const versions = versions_q.expr.data.e_object.properties;
- var package_versions = try allocator.alloc(VersionedPackage, release_versions_len + pre_versions_len);
- try result.releases.ensureTotalCapacity(release_versions_len);
- try result.prereleases.ensureTotalCapacity(pre_versions_len);
-
var all_dependency_names_and_values = all_extern_strings[0 .. dependency_sum * 2];
- all_extern_strings = all_extern_strings[dependency_sum * 2 ..];
+
var dependency_names = all_dependency_names_and_values[0..dependency_sum];
var dependency_values = all_dependency_names_and_values[dependency_sum..];
- var prev_names = dependency_names[0..0];
- var prev_versions = dependency_values[0..0];
+ var prev_names: []ExternalString = &[_]ExternalString{};
+ var prev_versions: []ExternalString = &[_]ExternalString{};
+ const DedupString = std.HashMap(
+ u64,
+ ExternalString,
+ struct {
+ pub fn hash(this: @This(), key: u64) u64 {
+ return key;
+ }
+ pub fn eql(this: @This(), a: u64, b: u64) bool {
+ return a == b;
+ }
+ },
+ 80,
+ );
+ var deduper = DedupString.init(allocator);
+ defer deduper.deinit();
for (versions) |prop, version_i| {
- const version_name = string_builder.append(prop.key.?.asString(allocator) orelse continue);
+ const version_name = prop.key.?.asString(allocator) orelse continue;
- const parsed_version = Semver.Version.parse(SlicedString.init(string_buf, version_name), allocator);
+ const parsed_version = Semver.Version.parse(SlicedString.init(version_name, version_name), allocator);
std.debug.assert(parsed_version.valid);
if (!parsed_version.valid) {
- log.addErrorFmt(source, prop.value.?.loc, allocator, "Failed to parse dependency {s}", .{name}) catch unreachable;
+ log.addErrorFmt(&source, prop.value.?.loc, allocator, "Failed to parse dependency {s}", .{version_name}) catch unreachable;
continue;
}
- var package_version = VersionedPackage{};
+ var package_version = PackageVersion{};
var count: usize = 0;
- if (prop.value.?.asProperty("dependencies")) |versioned_deps| {
+ const versioned_deps_ = prop.value.?.asProperty("dependencies");
+ const cpu_prop = prop.value.?.asProperty("cpu");
+ const os_prop = prop.value.?.asProperty("os");
+ if (versioned_deps_) |versioned_deps| {
if (versioned_deps.expr.data == .e_object) {
count = versioned_deps.expr.data.e_object.properties.len;
}
}
- if (prop.value.?.asProperty("cpu")) |cpu_prop| {
+ if (cpu_prop) |cpu| {
const CPU = comptime if (Environment.isAarch64) "arm64" else "x64";
package_version.cpu_matches = false;
- switch (cpu_prop.expr.data) {
+ switch (cpu.expr.data) {
.e_array => |arr| {
for (arr.items) |item| {
- if (item.asString(allocator)) |cpu| {
- if (cpu.eql(@TypeOf(CPU), CPU)) {
+ if (item.asString(allocator)) |cpu_str| {
+ if (strings.eqlComptime(cpu_str, CPU)) {
package_version.cpu_matches = true;
break;
}
@@ -687,22 +1000,22 @@ const Npm = struct {
}
},
.e_string => |str| {
- package_version.cpu_matches = str.eql(@TypeOf(CPU), CPU);
+ package_version.cpu_matches = strings.eql(str.utf8, CPU);
},
else => {},
}
}
- if (prop.value.?.asProperty("os")) |cpu_prop| {
+ if (os_prop) |os| {
// TODO: musl
const OS = comptime if (Environment.isLinux) "linux" else "darwin";
package_version.os_matches = false;
- switch (cpu_prop.expr.data) {
+ switch (os.expr.data) {
.e_array => |arr| {
for (arr.items) |item| {
- if (item.asString(allocator)) |cpu| {
- if (cpu.eql(@TypeOf(OS), OS)) {
+ if (item.asString(allocator)) |os_str| {
+ if (strings.eqlComptime(os_str, OS)) {
package_version.os_matches = true;
break;
}
@@ -710,47 +1023,46 @@ const Npm = struct {
}
},
.e_string => |str| {
- package_version.os_matches = str.eql(@TypeOf(OS), OS);
+ package_version.os_matches = strings.eql(str.utf8, OS);
},
else => {},
}
}
- var this_names = dependency_names[0..count];
- var this_versions = dependency_values[0..count];
-
integrity: {
if (prop.value.?.asProperty("dist")) |dist| {
if (dist.expr.data == .e_object) {
- if (dist.expr.asProperty("integrity")) |shasum| {
- if (shasum.expr.asString(allocator)) |shasum_str| {
- package_version.integrity = string_builder.append(shasum_str);
- break :integrity;
- }
- }
-
if (dist.expr.asProperty("fileCount")) |file_count_| {
if (file_count_.expr.data == .e_number) {
- package_version.file_count = @floatToInt(u64, @maximum(@floor(file_count_.expr.data.e_number.value), 0));
+ package_version.file_count = file_count_.expr.data.e_number.toU64();
}
}
if (dist.expr.asProperty("unpackedSize")) |file_count_| {
if (file_count_.expr.data == .e_number) {
- package_version.unpacked_size = @floatToInt(u64, @maximum(@floor(file_count_.expr.data.e_number.value), 0));
+ package_version.unpacked_size = file_count_.expr.data.e_number.toU64();
+ }
+ }
+
+ if (dist.expr.asProperty("integrity")) |shasum| {
+ if (shasum.expr.asString(allocator)) |shasum_str| {
+ package_version.integrity = string_slice.sub(string_builder.append(shasum_str)).external();
+ break :integrity;
}
}
if (dist.expr.asProperty("shasum")) |shasum| {
if (shasum.expr.asString(allocator)) |shasum_str| {
- package_version.shasum = string_builder.append(shasum_str);
+ package_version.shasum = string_slice.sub(string_builder.append(shasum_str)).external();
}
}
}
}
}
+ if (versioned_deps_) |versioned_deps| {
+ var this_names = dependency_names[0..count];
+ var this_versions = dependency_values[0..count];
- if (prop.value.?.asProperty("dependencies")) |versioned_deps| {
const items = versioned_deps.expr.data.e_object.properties;
var any_differences = false;
for (items) |item, i| {
@@ -758,24 +1070,33 @@ const Npm = struct {
// Often, npm packages have the same dependency names/versions many times.
// This is a cheap way to usually dedup these dependencies.
const name_str = item.key.?.asString(allocator) orelse continue;
+ const version_str = item.value.?.asString(allocator) orelse continue;
+
const name_hash = std.hash.Wyhash.hash(0, name_str);
+ const version_hash = std.hash.Wyhash.hash(0, version_str);
+ var name_entry = try deduper.getOrPut(name_hash);
+ var version_entry = try deduper.getOrPut(version_hash);
- if (prev_names.len > i) this_names[i] = prev_names[i];
- if (!(prev_names.len > i and prev_names[i].hash == name_hash)) {
+ unique_string_count += @as(usize, @boolToInt(!name_entry.found_existing)) + @as(usize, @boolToInt(!version_entry.found_existing));
+ unique_string_len += @as(usize, @boolToInt(!name_entry.found_existing) * name_str.len) + @as(usize, @boolToInt(!version_entry.found_existing) * version_str.len);
+
+ if (!name_entry.found_existing) {
any_differences = true;
this_names[i] = ExternalString.init(string_buf, string_builder.append(name_str), name_hash);
}
- const version_str = item.value.?.asString(allocator) orelse continue;
- const version_hash = std.hash.Wyhash.hash(0, version_str);
-
if (prev_versions.len > i) this_versions[i] = prev_versions[i];
if (!(prev_versions.len > i and prev_versions[i].hash == version_hash)) {
any_differences = true;
this_versions[i] = ExternalString.init(string_buf, string_builder.append(version_str), version_hash);
}
+
+ count = i;
}
+ this_names = this_names[0..count];
+ this_versions = this_versions[0..count];
+
if (any_differences) {
dependency_names = dependency_names[count..];
dependency_values = dependency_values[count..];
@@ -783,61 +1104,62 @@ const Npm = struct {
this_names = prev_names;
this_versions = prev_versions;
}
- }
- package_version.dependencies = ExternalStringMap{ .name = this_names, .value = this_versions };
- package_versions[0] = package_version;
- package_versions = package_versions[1..];
+ if (this_names.len > 0) {
+ package_version.dependencies = ExternalStringMap{
+ .name = ExternalStringList.init(all_extern_strings, this_names),
+ .value = ExternalStringList.init(all_extern_strings, this_versions),
+ };
+ }
+ }
if (!parsed_version.version.tag.hasPre()) {
- result.releases.putAssumeCapacityNoClobber(
- parsed_version.version,
- list,
- );
+ release_versions[0] = parsed_version.version;
+ versioned_package_releases[0] = package_version;
+ release_versions = release_versions[1..];
+ versioned_package_releases = versioned_package_releases[1..];
} else {
- result.prereleases.putAssumeCapacityNoClobber(
- parsed_version.version,
- list,
- );
- }
-
- if (count > 0) {
- prev_names = this_names;
- prev_versions = this_versions;
+ prerelease_versions[0] = parsed_version.version;
+ versioned_package_prereleases[0] = package_version;
+ prerelease_versions = prerelease_versions[1..];
+ versioned_package_prereleases = versioned_package_prereleases[1..];
}
}
}
}
- // Since dist-tags are what will change most often, we will put them last incase we want to someday do incremental updates
- if (json.asProperty("dist-tags")) |dist| {
- if (dist.expr.data == .e_object) {
- const tags = dist.expr.data.e_object.properties;
- result.dist_tags.tags = all_extern_strings;
- for (tags) |tag, i| {
- if (tag.key.?.asString(allocator)) |key| {
- const tag_str = string_builder.append(key);
- const tag_hash = std.hash.Wyhash.hash(0, tag_str);
- result.dist_tags.tags[i] = ExternalString.init(string_buf, tag_str, tag_hash);
-
- var parse_result = Semver.Version.parse(SlicedString.init(source.contents, tag.value.?.asString(allocator)), allocator);
- if (parse_result.valid) {
- if (parse_result.version.tag.hasPre()) {
- const entry = result.prereleases.getEntry(parse_result.version) orelse unreachable;
- // saves us from copying the semver versions again
- result.dist_tags.versions[i] = entry.key_ptr.*;
- } else {
- const entry = result.releases.getEntry(parse_result.version) orelse unreachable;
- result.dist_tags.versions[i] = entry.key_ptr.*;
- }
- }
- }
- }
- }
+ result.pkg.last_modified = string_slice.sub(string_builder.append(last_modified)).external();
+ result.pkg.etag = string_slice.sub(string_builder.append(etag)).external();
+
+ result.pkg.releases.keys.len = @truncate(u32, release_versions_len);
+ result.pkg.releases.values.len = @truncate(u32, release_versions_len);
+
+ result.pkg.prereleases.keys.off = result.pkg.releases.keys.len;
+ result.pkg.prereleases.keys.len = @truncate(u32, pre_versions_len);
+ result.pkg.prereleases.values.off = result.pkg.releases.values.len;
+ result.pkg.prereleases.values.len = @truncate(u32, pre_versions_len);
+
+ result.pkg.string_lists_buf.off = 0;
+ result.pkg.string_lists_buf.len = @truncate(u32, all_extern_strings.len);
+
+ result.pkg.versions_buf.off = 0;
+ result.pkg.versions_buf.len = @truncate(u32, all_semver_versions.len);
+
+ result.versions = all_semver_versions;
+ result.external_strings = all_extern_strings;
+ result.package_versions = versioned_packages;
+
+ if (json.asProperty("modified")) |name_q| {
+ const name = name_q.expr.asString(allocator) orelse return null;
+
+ result.pkg.modified = string_slice.sub(string_builder.append(name)).external();
}
if (string_builder.ptr) |ptr| {
result.string_buf = ptr[0..string_builder.len];
+ result.pkg.string_buf = BigExternalString{
+ .off = 0,
+ .len = @truncate(u32, string_builder.len),
+ .hash = 0,
+ };
}
return result;
@@ -845,6 +1167,437 @@ const Npm = struct {
};
};
-pub const Install = struct {
- root_package: *Package = undefined,
+pub const DownloadPackageManifestTask = struct {
+ ctx: *PackageManager,
+ name: string,
+
+ task: ThreadPool.Task,
+
+ pub fn download(task: *ThreadPool.Task) void {
+ var this: *DownloadPackageManifestTask = @fieldParentPtr(DownloadPackageManifestTask, "task", task);
+ defer {
+ var node = @fieldParentPtr(Pool.LinkedList.Node, "data", this);
+ this.name = "";
+ Pool.release(node);
+ }
+
+ var log = logger.Log.init(this.ctx.allocator);
+ defer if (log.msgs.items.len > 0) this.ctx.mergeLog(log);
+ var package_manifest = this.ctx.registry.getPackageMetadata(this.ctx.allocator, &log, this.name, "", "") catch |err| {
+ log.addErrorFmt(null, logger.Loc.Empty, this.ctx.allocator, "Error fetching package manifest: {s}", .{@errorName(err)}) catch unreachable;
+ return;
+ };
+ switch (package_manifest) {
+ .not_found => {
+ log.addErrorFmt(null, logger.Loc.Empty, this.ctx.allocator, "Package not found: {s}", .{this.name}) catch unreachable;
+ return;
+ },
+ .fresh => |resolved| {
+ this.ctx.appendPackageResolution(resolved);
+ },
+ else => unreachable,
+ }
+ }
+
+ pub fn initFn(allocator: *std.mem.Allocator) !DownloadPackageManifestTask {
+ return DownloadPackageManifestTask{ .ctx = undefined, .name = "", .task = .{ .callback = download } };
+ }
+
+ pub const Pool = ObjectPool(DownloadPackageManifestTask, initFn);
+};
+
+const Task = union(Tag) {
+ resolve: ResolveTask,
+ install: InstallTask,
+
+ pub const Tag = enum {
+ resolve,
+ install,
+ };
+};
+
+pub const DependencyLevel = enum { dependency, dev, optional, peer };
+pub const Dependents = std.EnumArray(DependencyLevel, std.ArrayListUnmanaged(PackageID));
+
+pub const Installation = struct {
+ tarball_path: string = "",
+ cached_dir: string = "",
+};
+
+const PackageBlock = struct {
+ pub const block_size = 256;
+ items: [block_size]Package = undefined,
+ dependents: [block_size]Dependents = undefined,
+ installations: [block_size]Installation = undefined,
+ next: std.atomic.Atomic(?*PackageBlock) = std.atomic.Atomic(?*PackageBlock).init(null),
+ lock: Lock = Lock.init(),
+ len: std.atomic.Atomic(u16) = std.atomic.Atomic(u16).init(0),
+
+ pub fn append(this: *PackageBlock, package: Package) *Package {
+ this.lock.lock();
+ defer this.lock.unlock();
+ const i = this.len.fetchAdd(1, .Monotonic);
+ this.items[i] = package;
+ this.dependents[i] = Dependents.initFill(std.ArrayListUnmanaged(PackageID){});
+ return &this.items[i];
+ }
+};
+
+const PackageList = struct {
+ head: PackageBlock = PackageBlock{},
+ tail: std.atomic.Atomic(?*PackageBlock) = std.atomic.Atomic(?*PackageBlock).init(null),
+ allocator: *std.mem.Allocator = undefined,
+ pub fn append(this: *PackageList, package: Package) !*Package {
+ var block: *PackageBlock = this.tail.load(.Monotonic) orelse &this.head;
+ std.debug.assert(block.next.load(.Monotonic) == null);
+
+ if (block.len.fetchMin(PackageBlock.block_size, .Monotonic) >= PackageBlock.block_size) {
+ block.lock.lock();
+ defer block.lock.unlock();
+ var tail = try this.allocator.create(PackageBlock);
+ tail.* = PackageBlock{};
+ tail.items[0] = package;
+ tail.dependents[0] = Dependents.initFill(std.ArrayListUnmanaged(PackageID){});
+ tail.len.storeUnchecked(1);
+ block.next.store(tail, .Monotonic);
+ this.tail.store(tail, .Monotonic);
+ return &tail.items[0];
+ } else {
+ return block.append(package);
+ }
+ }
+};
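
Because PackageList grows by chaining fixed 256-entry blocks instead of reallocating, a *Package returned by append() stays valid for the life of the list. A small sketch of that guarantee (some_package is an assumption):

    var list = PackageList{ .allocator = allocator };
    const first: *Package = try list.append(some_package);
    var i: usize = 0;
    while (i < 2 * PackageBlock.block_size) : (i += 1) {
        _ = try list.append(some_package); // growth allocates new blocks; nothing moves
    }
    std.debug.assert(first == &list.head.items[0]); // still a valid pointer
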
+
+const IdentityContext = struct {
+ pub fn hash(this: @This(), key: u32) u64 {
+ return key;
+ }
+
+ pub fn eql(this: @This(), a: u32, b: u32) bool {
+ return a == b;
+ }
+};
+
+const ArrayIdentityContext = struct {
+ pub fn hash(this: @This(), key: u32) u32 {
+ return key;
+ }
+
+ pub fn eql(this: @This(), a: u32, b: u32) bool {
+ return a == b;
+ }
+};
+
+const DependencyRequest = struct {
+ version: Dependency.Version,
+ from: PackageID = invalid_package_id,
+ resolution: PackageID = invalid_package_id,
+};
+
+/// Versions & ranges to resolve for a package
+/// A linked list so that we can append without allocating
+/// We expect individual queues to not be 100s long, so it shouldn't be so bad to use pointers here
+const ResolveQueue = std.SinglyLinkedList(*Dependency);
+// Hash table mapping Manifest.name_hash to
+const ResolveMap = std.ArrayHashMap(u32, ResolveQueue, ArrayIdentityContext, false);
+
+const ThreadPool = @import("../thread_pool.zig");
+
+// We can't know all the packages we need until we've downloaded all the packages
+// The easy way would be:
+// 1. Download all packages, parsing their dependencies and enqueuing all dependencies for resolution
+// 2.
+pub const PackageManager = struct {
+ pub var package_list = PackageList{};
+
+ enable_cache: bool = true,
+ cache_directory_path: string = "",
+ cache_directory: std.fs.Dir = undefined,
+ root_dir: *Fs.FileSystem.DirEntry,
+ env_loader: *DotEnv.Loader,
+ allocator: *std.mem.Allocator,
+ root_package: *Package,
+ log: *logger.Log,
+
+ default_features: Package.Features = Package.Features{},
+
+ registry: Npm.Registry = Npm.Registry{},
+
+ /// Tracks a list of packages we have already enqueued for downloading
+ /// The key is Dependency.name_hash
+ /// The queue for actually downloading is separate
+ seen_npm_packages: PackageDedupeList = PackageDedupeList{},
+ seen_npm_packages_lock: Lock = Lock.init(),
+
+ seen_tarball_urls: PackageDedupeList = PackageDedupeList{},
+ seen_tarball_urls_lock: Lock = Lock.init(),
+ thread_pool: ThreadPool,
+
+ manifests_lock: Lock = Lock.init(),
+ manifests: PackageManifestMap = PackageManifestMap{},
+
+ resolve_lock: Lock = Lock.init(),
+ pending_resolve_queue: ResolveMap = ResolveMap{},
+ pending_resolutions_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+
+ const PackageManifestMap = std.HashMapUnmanaged(u32, Npm.PackageManifest, IdentityContext, 80);
+ const PackageDedupeList = std.HashMapUnmanaged(
+ u32,
+ void,
+ IdentityContext,
+ 80,
+ );
+
+ fn doesNeedToDownloadPackageManifest(this: *PackageManager, name_hash: u32) bool {
+ return !this.seen_npm_packages.getOrPutAssumeCapacity(name_hash).found_existing;
+ }
+
+ inline fn enqueueNpmPackage(this: *PackageManager, name: string) *DownloadPackageManifestTask {
+ var node = DownloadPackageManifestTask.Pool.get(this.allocator);
+ node.data.name = name;
+ node.data.ctx = this;
+ return node;
+ }
+
+ fn enqueuePackages(this: *PackageManager, dependencies: Dependency.List) ThreadPool.Batch {
+ var batch = ThreadPool.Batch{};
+ var count: u32 = 0;
+ var slice = dependencies.unmanaged.entries.slice();
+ const values = slice.items(.value);
+ var i: usize = 0;
+ var last_npm_package: ?*DownloadPackageManifestTask = null;
+ while (i < values.len) : (i += 1) {
+ const dependency: Dependency = values[i];
+ switch (dependency.request.version) {
+ .npm, .dist_tag => {
+ if (this.doesNeedToDownloadPackageManifest(dependency.name_hash)) {
+ var current = this.enqueueNpmPackage(dependency.name);
+ if (last_npm_package != null) {
+ batch.tail.?.node.next = &current.task.node;
+ batch.len += 1;
+ } else {
+ batch = ThreadPool.Batch.from(&current.task);
+ }
+ if (verbose_install) {
+ Output.prettyErrorln("Enqueue dependency: {s}", .{dependency.name});
+ }
+ batch.tail = &current.task;
+ last_npm_package = current;
+ }
+ },
+ else => {},
+ }
+ }
+
+ if (verbose_install) Output.flush();
+
+ return batch;
+ }
+
+ pub fn enqueueDependencyList(this: *PackageManager, package: *const Package, features: Package.Features) void {
+
+ // Step 2. Allocate the list
+ if (package.npm_count > 0) {
+ this.seen_npm_packages_lock.lock();
+ defer this.seen_npm_packages_lock.unlock();
+ this.seen_npm_packages.ensureUnusedCapacity(this.allocator, package.npm_count) catch unreachable;
+ var batch = this.enqueuePackages(package.dependencies);
+
+ if (features.dev_dependencies) {
+ batch = batch.push(this.enqueuePackages(package.dev_dependencies));
+ }
+
+ if (features.peer_dependencies) {
+ batch = batch.push(this.enqueuePackages(package.peer_dependencies));
+ }
+
+ if (features.optional_dependencies) {
+ batch = batch.push(this.enqueuePackages(package.optional_dependencies));
+ }
+
+ this.thread_pool.schedule(batch);
+ }
+ }
+
+ pub fn appendPackageResolution(this: *PackageManager, manifest: Npm.PackageManifest) void {
+ const name_hash = @truncate(u32, manifest.pkg.name.hash);
+ {
+ this.manifests_lock.lock();
+ defer this.manifests_lock.unlock();
+
+ this.manifests.getOrPutValue(this.allocator, name_hash, manifest) catch unreachable;
+ }
+
+ {
+ this.resolve_lock.lock();
+ defer this.resolve_lock.unlock();
+ if (this.pending_resolve_queue.getPtr(name_hash)) |pending| {
+ while (pending.popFirst()) |semver_group| {}
+ }
+ }
+ }
+
+ pub fn fetchCacheDirectoryPath(
+ allocator: *std.mem.Allocator,
+ env_loader: *DotEnv.Loader,
+ root_dir: *Fs.FileSystem.DirEntry,
+ ) ?string {
+ if (env_loader.map.get("BUN_INSTALL_CACHE_DIR")) |dir| {
+ return dir;
+ }
+
+ if (env_loader.map.get("BUN_INSTALL")) |dir| {
+ var parts = [_]string{ dir, "install/", "cache/" };
+ return Fs.FileSystem.instance.joinBuf(&parts);
+ }
+
+ if (env_loader.map.get("HOME")) |dir| {
+ var parts = [_]string{ dir, ".bun/", "install/", "cache/" };
+ return Fs.FileSystem.instance.joinBuf(&parts);
+ }
+
+ if (env_loader.map.get("XDG_CACHE_HOME")) |dir| {
+ var parts = [_]string{ dir, ".bun/", "install/", "cache/" };
+ return Fs.FileSystem.instance.joinBuf(&parts);
+ }
+
+ return null;
+ }
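
The lookup order above is BUN_INSTALL_CACHE_DIR, then BUN_INSTALL, then HOME, then XDG_CACHE_HOME, falling back to null (cache disabled). A usage sketch mirroring install() below:

    const cache_dir = fetchCacheDirectoryPath(ctx.allocator, &env_loader, entries_option.dir) orelse "";
    const enable_cache = cache_dir.len > 0;
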
+
+ fn loadAllDependencies(this: *PackageManager) !void {}
+ fn installDependencies(this: *PackageManager) !void {}
+
+ var cwd_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
+ var package_json_cwd_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
+ pub fn install(
+ ctx: Command.Context,
+ ) !void {
+ var fs = try Fs.FileSystem.init1(ctx.allocator, null);
+ var original_cwd = std.mem.trimRight(u8, fs.top_level_dir, "/");
+
+ std.mem.copy(u8, &cwd_buf, original_cwd);
+
+ // Step 1. Find the nearest package.json directory
+ //
+ // We will walk up from the cwd, calling chdir on each directory until we find a package.json
+ // If we fail to find one, we will report an error saying no packages to install
+ var package_json_file: std.fs.File = brk: {
+ break :brk std.fs.cwd().openFileZ("package.json", .{ .read = true, .write = true }) catch |err2| {
+ var this_cwd = original_cwd;
+ outer: while (std.fs.path.dirname(this_cwd)) |parent| {
+ cwd_buf[parent.len] = 0;
+ var chdir = cwd_buf[0..parent.len :0];
+
+ std.os.chdirZ(chdir) catch break :brk null;
+ break :brk std.fs.cwd().openFileZ("package.json", .{ .read = true, .write = true }) catch |err| {
+ this_cwd = parent;
+ continue :outer;
+ };
+ }
+
+ break :brk null;
+ };
+ } orelse {
+ Output.prettyErrorln("<r><red>Missing package.json<r>! Nothing to install.", .{});
+ Output.flush();
+ return;
+ };
+
+ fs.top_level_dir = try std.os.getcwd(&cwd_buf);
+ cwd_buf[fs.top_level_dir.len] = '/';
+ cwd_buf[fs.top_level_dir.len + 1] = 0;
+ fs.top_level_dir = cwd_buf[0 .. fs.top_level_dir.len + 1];
+ std.mem.copy(u8, &package_json_cwd_buf, fs.top_level_dir);
+ std.mem.copy(u8, package_json_cwd_buf[fs.top_level_dir.len..], "package.json");
+ var package_json_contents = package_json_file.readToEndAlloc(ctx.allocator, std.math.maxInt(usize)) catch |err| {
+ Output.prettyErrorln("<r><red>{s} reading package.json<r>!", .{@errorName(err)});
+ Output.flush();
+ return;
+ };
+ // Step 2. Parse the package.json file
+ //
+ var package_json_source = logger.Source.initPathString(
+ package_json_cwd_buf[0 .. fs.top_level_dir.len + "package.json".len],
+ package_json_contents,
+ );
+ package_list.head.items[0] = try Package.parse(
+ 0,
+ ctx.allocator,
+ ctx.log,
+ package_json_source,
+ Package.Features{
+ .optional_dependencies = true,
+ .dev_dependencies = true,
+ .is_main = true,
+ },
+ );
+ var root = &package_list.head.items[0];
+ package_list.head.len.storeUnchecked(1);
+ var env_loader: DotEnv.Loader = brk: {
+ var map = try ctx.allocator.create(DotEnv.Map);
+ map.* = DotEnv.Map.init(ctx.allocator);
+
+ break :brk DotEnv.Loader.init(map, ctx.allocator);
+ };
+
+ var entries_option = try fs.fs.readDirectory(fs.top_level_dir, std.fs.cwd());
+ var enable_cache = false;
+ var cache_directory_path: string = "";
+
+ if (fetchCacheDirectoryPath(ctx.allocator, &env_loader, entries_option.dir)) |cache_dir_path| {
+ enable_cache = true;
+ cache_directory_path = try fs.dirname_store.append(@TypeOf(cache_dir_path), cache_dir_path);
+ }
+
+ if (verbose_install) {
+ Output.prettyErrorln("Cache Dir: {s}", .{cache_directory_path});
+ Output.flush();
+ }
+
+ var manager = PackageManager{
+ .enable_cache = enable_cache,
+ .cache_directory_path = cache_directory_path,
+ .env_loader = &env_loader,
+ .allocator = ctx.allocator,
+ .log = ctx.log,
+ .root_dir = entries_option.dir,
+ .root_package = root,
+ .thread_pool = ThreadPool.init(.{}),
+ };
+ package_list.allocator = ctx.allocator;
+
+ manager.enqueueDependencyList(
+ root,
+ Package.Features{
+ .optional_dependencies = true,
+ .dev_dependencies = true,
+ .is_main = true,
+ },
+ );
+
+ try manager.loadAllDependencies();
+ try manager.installDependencies();
+ }
};
+
+const verbose_install = true;
+
+test "getPackageMetadata" {
+ Output.initTest();
+
+ var registry = Npm.Registry{};
+ var log = logger.Log.init(default_allocator);
+
+ var response = try registry.getPackageMetadata(default_allocator, &log, "lodash", "", "");
+
+ const react_17 = try Semver.Query.parse(default_allocator, "1.2.0");
+
+ switch (response) {
+ .cached, .not_found => unreachable,
+ .fresh => |package| {
+ package.reportSize();
+ const react = package.findBestVersion(react_17) orelse unreachable;
+
+ const entry = react.dependencies.name.get(package.external_strings)[0];
+ // try std.testing.expectEqualStrings("loose-envify", entry.slice(package.string_buf));
+ },
+ }
+}
diff --git a/src/install/semver.zig b/src/install/semver.zig
index a5ab6a9f6..f3b0ab134 100644
--- a/src/install/semver.zig
+++ b/src/install/semver.zig
@@ -2,14 +2,14 @@ usingnamespace @import("../global.zig");
const std = @import("std");
pub const ExternalString = extern struct {
- off: u64 = 0,
- len: u64 = 0,
+ off: u32 = 0,
+ len: u16 = 0,
hash: u64 = 0,
pub fn from(in: string) ExternalString {
return ExternalString{
.off = 0,
- .len = in.len,
+ .len = @truncate(u16, in.len),
.hash = std.hash.Wyhash.hash(0, in),
};
}
@@ -18,8 +18,36 @@ pub const ExternalString = extern struct {
std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr) and ((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len)));
return ExternalString{
- .off = @ptrToInt(in.ptr) - @ptrToInt(buf.ptr),
- .len = in.len,
+ .off = @truncate(u32, @ptrToInt(in.ptr) - @ptrToInt(buf.ptr)),
+ .len = @truncate(u16, in.len),
+ .hash = hash,
+ };
+ }
+
+ pub fn slice(this: ExternalString, buf: string) string {
+ return buf[this.off..][0..this.len];
+ }
+};
+
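
ExternalString now packs into a u32 offset plus u16 length (with the u64 hash precomputed), which caps any one string at 65,535 bytes. A round-trip sketch:

    const buf: string = "react@17.0.2";
    const version = buf["react@".len..];
    const ext = ExternalString.init(buf, version, std.hash.Wyhash.hash(0, version));
    std.debug.assert(strings.eql(ext.slice(buf), "17.0.2"));
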
+pub const BigExternalString = extern struct {
+ off: u32 = 0,
+ len: u32 = 0,
+ hash: u64 = 0,
+
+ pub fn from(in: string) BigExternalString {
+ return BigExternalString{
+ .off = 0,
+ .len = @truncate(u32, in.len),
+ .hash = std.hash.Wyhash.hash(0, in),
+ };
+ }
+
+ pub inline fn init(buf: string, in: string, hash: u64) BigExternalString {
+ std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr) and ((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len)));
+
+ return BigExternalString{
+ .off = @truncate(u32, @ptrToInt(in.ptr) - @ptrToInt(buf.ptr)),
+ .len = @truncate(u32, in.len),
.hash = hash,
};
}
@@ -40,7 +68,7 @@ pub const SlicedString = struct {
pub inline fn external(this: SlicedString) ExternalString {
std.debug.assert(@ptrToInt(this.buf.ptr) <= @ptrToInt(this.slice.ptr) and ((@ptrToInt(this.slice.ptr) + this.slice.len) <= (@ptrToInt(this.buf.ptr) + this.buf.len)));
- return ExternalString{ .off = @ptrToInt(this.slice.ptr) - @ptrToInt(this.buf.ptr), .len = this.slice.len, .hash = std.hash.Wyhash.hash(0, this.slice) };
+ return ExternalString{ .off = @truncate(u32, @ptrToInt(this.slice.ptr) - @ptrToInt(this.buf.ptr)), .len = @truncate(u16, this.slice.len), .hash = std.hash.Wyhash.hash(0, this.slice) };
}
pub inline fn sub(this: SlicedString, input: string) SlicedString {
@@ -49,12 +77,13 @@ pub const SlicedString = struct {
}
};
+const RawType = void;
pub const Version = extern struct {
major: u32 = 0,
minor: u32 = 0,
patch: u32 = 0,
tag: Tag = Tag{},
- raw: ExternalString = ExternalString{},
+ // raw: RawType = RawType{},
const HashableVersion = extern struct { major: u32, minor: u32, patch: u32, pre: u64, build: u64 };
@@ -230,15 +259,17 @@ pub const Version = extern struct {
.none => {},
.pre => {
result.tag.pre = sliced_string.sub(input[start..i]).external();
+ // a pre can contain multiple consecutive tags
if (comptime Environment.isDebug) {
- std.debug.assert(!strings.containsChar(result.tag.pre.slice(sliced_string.buf), '-'));
+ std.debug.assert(!strings.startsWithChar(result.tag.pre.slice(sliced_string.buf), '-'));
}
state = State.none;
},
.build => {
+ // a build can contain multiple consecutive tags
result.tag.build = sliced_string.sub(input[start..i]).external();
if (comptime Environment.isDebug) {
- std.debug.assert(!strings.containsChar(result.tag.build.slice(sliced_string.buf), '-'));
+ std.debug.assert(!strings.startsWithChar(result.tag.build.slice(sliced_string.buf), '+'));
}
state = State.none;
},
@@ -415,7 +446,11 @@ pub const Version = extern struct {
}
result.stopped_at = @intCast(u32, i);
- result.version.raw = sliced_string.sub(input[0..i]).external();
+
+ if (comptime RawType != void) {
+ result.version.raw = sliced_string.sub(input[0..i]).external();
+ }
+
return result;
}
@@ -487,15 +522,21 @@ pub const Range = struct {
return Range{
.left = Comparator{
.op = Op.gte,
- .version = Version{ .raw = version.raw },
+ .version = Version{
+ // .raw = version.raw
+ },
},
};
},
.minor => {
- var lhs = Version{ .raw = version.raw };
+ var lhs = Version{
+ // .raw = version.raw
+ };
lhs.major = version.major + 1;
- var rhs = Version{ .raw = version.raw };
+ var rhs = Version{
+ // .raw = version.raw
+ };
rhs.major = version.major;
return Range{
@@ -518,8 +559,8 @@ pub const Range = struct {
rhs.major = version.major;
rhs.minor = version.minor;
- rhs.raw = version.raw;
- lhs.raw = version.raw;
+ // rhs.raw = version.raw;
+ // lhs.raw = version.raw;
return Range{
.left = Comparator{
@@ -709,7 +750,7 @@ pub const Query = struct {
.right = .{
.op = .lt,
.version = Version{
- .raw = version.raw,
+ // .raw = version.raw,
.major = major,
.minor = minor,
.patch = patch,
@@ -1047,7 +1088,7 @@ const expect = struct {
pub fn isRangeMatch(input: string, version_str: string) bool {
var parsed = Version.parse(SlicedString.init(version_str, version_str), default_allocator);
std.debug.assert(parsed.valid);
- std.debug.assert(strings.eql(parsed.version.raw.slice(version_str), version_str));
+ // std.debug.assert(strings.eql(parsed.version.raw.slice(version_str), version_str));
var list = Query.parse(default_allocator, input) catch |err| Output.panic("Test fail due to error {s}", .{@errorName(err)});
diff --git a/src/js_ast.zig b/src/js_ast.zig
index 132ff1e05..7650a61a4 100644
--- a/src/js_ast.zig
+++ b/src/js_ast.zig
@@ -1058,6 +1058,11 @@ pub const E = struct {
pub const Number = struct {
value: f64,
+ pub inline fn toU64(self: Number) u64 {
+ @setRuntimeSafety(false);
+ return @floatToInt(u64, @maximum(@floor(self.value), 0));
+ }
+
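+ // For example (illustrative, not in the source): toU64 floors and clamps
+ // at zero, so a Number with .value = 3.9 yields 3 and any negative value
+ // yields 0; values beyond u64 range are deliberately unchecked here.
+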
pub fn jsonStringify(self: *const Number, opts: anytype, o: anytype) !void {
return try std.json.stringify(self.value, opts, o);
}
diff --git a/src/js_lexer_tables.zig b/src/js_lexer_tables.zig
index b594fbc2e..b619b71a1 100644
--- a/src/js_lexer_tables.zig
+++ b/src/js_lexer_tables.zig
@@ -5,8 +5,7 @@ const expectString = std.testing.expectEqualStrings;
const expect = std.testing.expect;
const logger = @import("logger.zig");
const unicode = std.unicode;
-const alloc = @import("alloc.zig");
-
+const default_allocator = @import("./global.zig").default_allocator;
pub const T = enum(u8) {
t_end_of_file,
t_syntax_error,
diff --git a/src/lock.zig b/src/lock.zig
index 16f28bc1e..23552f6ed 100644
--- a/src/lock.zig
+++ b/src/lock.zig
@@ -46,9 +46,7 @@ pub const Mutex = struct {
// Give up spinning if the Mutex is contended.
// This helps acquire() latency under micro-contention.
//
- // Only spin on x86 as other platforms are assumed to
- // prioritize power efficiency over strict performance.
- var spin: u8 = comptime if (is_x86) 100 else 0;
+ var spin: u8 = 100;
while (spin > 0) : (spin -= 1) {
std.atomic.spinLoopHint();
diff --git a/src/string_mutable.zig b/src/string_mutable.zig
index 5dc153c99..acbecba94 100644
--- a/src/string_mutable.zig
+++ b/src/string_mutable.zig
@@ -9,6 +9,10 @@ pub const MutableString = struct {
allocator: *std.mem.Allocator,
list: std.ArrayListUnmanaged(u8),
+ pub fn init2048(allocator: *std.mem.Allocator) !MutableString {
+ return MutableString.init(allocator, 2048);
+ }
+
pub const Writer = std.io.Writer(*@This(), anyerror, MutableString.writeAll);
pub fn writer(self: *MutableString) Writer {
return Writer{
diff --git a/src/tagged_pointer.zig b/src/tagged_pointer.zig
index ee1af487f..0b75b0f29 100644
--- a/src/tagged_pointer.zig
+++ b/src/tagged_pointer.zig
@@ -4,7 +4,7 @@ usingnamespace @import("./global.zig");
const TagSize = u15;
const AddressableSize = u49;
-const TaggedPointer = packed struct {
+pub const TaggedPointer = packed struct {
_ptr: AddressableSize,
data: TagSize,
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
new file mode 100644
index 000000000..38c102a77
--- /dev/null
+++ b/src/thread_pool.zig
@@ -0,0 +1,794 @@
+// Thank you @kprotty.
+// https://github.com/kprotty/zap/blob/blog/src/thread_pool.zig
+
+const std = @import("std");
+const ThreadPool = @This();
+const Futex = @import("./futex.zig");
+
+const assert = std.debug.assert;
+const Atomic = std.atomic.Atomic;
+
+stack_size: u32,
+max_threads: u32,
+sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})),
+idle_event: Event = .{},
+join_event: Event = .{},
+run_queue: Node.Queue = .{},
+threads: Atomic(?*Thread) = Atomic(?*Thread).init(null),
+
+const Sync = packed struct {
+ /// Tracks the number of threads not searching for Tasks
+ idle: u14 = 0,
+ /// Tracks the number of threads spawned
+ spawned: u14 = 0,
+ /// What you see is what you get
+ unused: bool = false,
+ /// Used to not miss notifications while state = waking
+ notified: bool = false,
+ /// The current state of the thread pool
+ state: enum(u2) {
+ /// A notification can be issued to wake up a sleeping thread as the "waking thread".
+ pending = 0,
+ /// The state was notified with a signal. A thread is woken up.
+ /// The first thread to transition to `waking` becomes the "waking thread".
+ signaled,
+ /// There is a "waking thread" among us.
+ /// No other thread should be woken up until the waking thread transitions the state.
+ waking,
+ /// The thread pool was terminated. Each exiting thread decrements `spawned` so the pool can be joined.
+ shutdown,
+ } = .pending,
+};
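+
+// Note (hedged commentary): the fields above pack to exactly 32 bits
+// (14 + 14 + 1 + 1 + 2), which is what makes the @bitCast round-trips with
+// the Atomic(u32) `sync` field legal. One could assert the invariant, e.g.:
+//
+//     comptime {
+//         std.debug.assert(@bitSizeOf(Sync) == 32);
+//     }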
+
+/// Configuration options for the thread pool.
+/// TODO: add CPU core affinity?
+pub const Config = struct {
+ stack_size: u32 = (std.Thread.SpawnConfig{}).stack_size,
+ max_threads: u32,
+};
+
+/// Statically initialize the thread pool using the configuration.
+pub fn init(config: Config) ThreadPool {
+ return .{
+ .stack_size = std.math.max(1, config.stack_size),
+ .max_threads = std.math.max(1, config.max_threads),
+ };
+}
+
+/// Wait for a thread to call shutdown() on the thread pool and kill the worker threads.
+pub fn deinit(self: *ThreadPool) void {
+ self.join();
+ self.* = undefined;
+}
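+
+// A minimal lifecycle sketch (illustrative; `batch` is assumed to be built
+// with Batch.from as shown further below): shutdown() must happen before
+// deinit(), since deinit() joins the worker threads.
+//
+//     var pool = ThreadPool.init(.{ .max_threads = 4 });
+//     pool.schedule(batch);
+//     pool.shutdown();
+//     pool.deinit();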
+
+/// A Task represents the unit of Work / Job / Execution that the ThreadPool schedules.
+/// The user provides a `callback` which is invoked when the *Task can run on a thread.
+pub const Task = struct {
+ node: Node = .{},
+ callback: fn (*Task) void,
+};
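+
+// A sketch of the intrusive pattern (the `PrintTask` name is hypothetical):
+// the callback receives only the *Task, so user state is recovered with
+// @fieldParentPtr on the embedded field.
+//
+//     const PrintTask = struct {
+//         task: Task = .{ .callback = onRun },
+//         value: u32,
+//
+//         fn onRun(task: *Task) void {
+//             const self = @fieldParentPtr(PrintTask, "task", task);
+//             std.debug.print("value={d}\n", .{self.value});
+//         }
+//     };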
+
+/// An unordered collection of Tasks which can be submitted for scheduling as a group.
+pub const Batch = struct {
+ len: usize = 0,
+ head: ?*Task = null,
+ tail: ?*Task = null,
+
+ /// Create a batch from a single task.
+ pub fn from(task: *Task) Batch {
+ return Batch{
+ .len = 1,
+ .head = task,
+ .tail = task,
+ };
+ }
+
+ /// Push another batch into this one, taking ownership of its tasks.
+ pub fn push(self: *Batch, batch: Batch) void {
+ if (batch.len == 0) return;
+ if (self.len == 0) {
+ self.* = batch;
+ } else {
+ self.tail.?.node.next = if (batch.head) |h| &h.node else null;
+ self.tail = batch.tail;
+ self.len += batch.len;
+ }
+ }
+};
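+
+// For example (continuing the hypothetical PrintTask above): single-task
+// batches compose before one schedule() call, which notifies the pool once
+// instead of once per task.
+//
+//     var batch = Batch.from(&a.task);
+//     batch.push(Batch.from(&b.task));
+//     pool.schedule(batch);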
+
+/// Schedule a batch of tasks to be executed by some thread on the thread pool.
+pub fn schedule(self: *ThreadPool, batch: Batch) void {
+ // Sanity check
+ if (batch.len == 0) {
+ return;
+ }
+
+ // Extract the Nodes from the Tasks
+ var list = Node.List{
+ .head = &batch.head.?.node,
+ .tail = &batch.tail.?.node,
+ };
+
+ // Push the task Nodes to the most appropriate queue
+ if (Thread.current) |thread| {
+ thread.run_buffer.push(&list) catch thread.run_queue.push(list);
+ } else {
+ self.run_queue.push(list);
+ }
+
+ // Try to notify a thread
+ const is_waking = false;
+ return self.notify(is_waking);
+}
+
+inline fn notify(self: *ThreadPool, is_waking: bool) void {
+ // Fast path to check the Sync state to avoid calling into notifySlow().
+ // If we're waking, then we need to update the state regardless
+ if (!is_waking) {
+ const sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ if (sync.notified) {
+ return;
+ }
+ }
+
+ return self.notifySlow(is_waking);
+}
+
+noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
+ var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ while (sync.state != .shutdown) {
+ const can_wake = is_waking or (sync.state == .pending);
+ if (is_waking) {
+ assert(sync.state == .waking);
+ }
+
+ var new_sync = sync;
+ new_sync.notified = true;
+ if (can_wake and sync.idle > 0) { // wake up an idle thread
+ new_sync.state = .signaled;
+ } else if (can_wake and sync.spawned < self.max_threads) { // spawn a new thread
+ new_sync.state = .signaled;
+ new_sync.spawned += 1;
+ } else if (is_waking) { // no other thread to pass on "waking" status
+ new_sync.state = .pending;
+ } else if (sync.notified) { // nothing to update
+ return;
+ }
+
+ // Release barrier synchronizes with Acquire in wait()
+ // to ensure pushes to run queues happen before observing a posted notification.
+ sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
+ @bitCast(u32, sync),
+ @bitCast(u32, new_sync),
+ .Release,
+ .Monotonic,
+ ) orelse {
+ // We signaled to notify an idle thread
+ if (can_wake and sync.idle > 0) {
+ return self.idle_event.notify();
+ }
+
+ // We signaled to spawn a new thread
+ if (can_wake and sync.spawned < self.max_threads) {
+ const spawn_config = std.Thread.SpawnConfig{ .stack_size = self.stack_size };
+ const thread = std.Thread.spawn(spawn_config, Thread.run, .{self}) catch return self.unregister(null);
+ return thread.detach();
+ }
+
+ return;
+ });
+ }
+}
+
+noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
+ var is_idle = false;
+ var is_waking = _is_waking;
+ var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+
+ while (true) {
+ if (sync.state == .shutdown) return error.Shutdown;
+ if (is_waking) assert(sync.state == .waking);
+
+ // Consume a notification made by notify().
+ if (sync.notified) {
+ var new_sync = sync;
+ new_sync.notified = false;
+ if (is_idle)
+ new_sync.idle -= 1;
+ if (sync.state == .signaled)
+ new_sync.state = .waking;
+
+ // Acquire barrier synchronizes with notify()
+ // to ensure that pushes to run queue are observed after wait() returns.
+ sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
+ @bitCast(u32, sync),
+ @bitCast(u32, new_sync),
+ .Acquire,
+ .Monotonic,
+ ) orelse {
+ return is_waking or (sync.state == .signaled);
+ });
+
+ // No notification to consume.
+ // Mark this thread as idle before sleeping on the idle_event.
+ } else if (!is_idle) {
+ var new_sync = sync;
+ new_sync.idle += 1;
+ if (is_waking)
+ new_sync.state = .pending;
+
+ sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
+ @bitCast(u32, sync),
+ @bitCast(u32, new_sync),
+ .Monotonic,
+ .Monotonic,
+ ) orelse {
+ is_waking = false;
+ is_idle = true;
+ continue;
+ });
+
+ // Wait for a signal by either notify() or shutdown() without wasting cpu cycles.
+ // TODO: Add I/O polling here.
+ } else {
+ self.idle_event.wait();
+ sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ }
+ }
+}
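+
+// Note: the bool wait() returns is whether the caller is now the "waking
+// thread"; Thread.run() below passes that status on via notify() once it
+// finds work, so at most one thread holds the waking status at a time.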
+
+/// Marks the thread pool as shutdown
+pub noinline fn shutdown(self: *ThreadPool) void {
+ var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ while (sync.state != .shutdown) {
+ var new_sync = sync;
+ new_sync.notified = true;
+ new_sync.state = .shutdown;
+ new_sync.idle = 0;
+
+ // Full barrier to synchronize with both wait() and notify()
+ sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
+ @bitCast(u32, sync),
+ @bitCast(u32, new_sync),
+ .AcqRel,
+ .Monotonic,
+ ) orelse {
+ // Wake up any threads sleeping on the idle_event.
+ // TODO: I/O polling notification here.
+ if (sync.idle > 0) self.idle_event.shutdown();
+ return;
+ });
+ }
+}
+
+fn register(noalias self: *ThreadPool, noalias thread: *Thread) void {
+ // Push the thread onto the threads stack in a lock-free manner.
+ var threads = self.threads.load(.Monotonic);
+ while (true) {
+ thread.next = threads;
+ threads = self.threads.tryCompareAndSwap(
+ threads,
+ thread,
+ .Release,
+ .Monotonic,
+ ) orelse break;
+ }
+}
+
+fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
+ // Un-spawn one thread, either because spawning its OS thread failed or because the thread is exiting.
+ const one_spawned = @bitCast(u32, Sync{ .spawned = 1 });
+ const sync = @bitCast(Sync, self.sync.fetchSub(one_spawned, .Release));
+ assert(sync.spawned > 0);
+
+ // The last thread to exit must wake up the thread pool join()er
+ // who will start the chain to shutdown all the threads.
+ if (sync.state == .shutdown and sync.spawned == 1) {
+ self.join_event.notify();
+ }
+
+ // If this is a thread pool thread, wait for a shutdown signal by the thread pool join()er.
+ const thread = maybe_thread orelse return;
+ thread.join_event.wait();
+
+ // After receiving the shutdown signal, shutdown the next thread in the pool.
+ // We have to do that without touching the thread pool itself since its memory is invalidated by now.
+ // So just follow our .next link.
+ const next_thread = thread.next orelse return;
+ next_thread.join_event.notify();
+}
+
+fn join(self: *ThreadPool) void {
+ // Wait for the thread pool to be shutdown() then for all threads to enter a joinable state
+ var sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ if (!(sync.state == .shutdown and sync.spawned == 0)) {
+ self.join_event.wait();
+ sync = @bitCast(Sync, self.sync.load(.Monotonic));
+ }
+
+ assert(sync.state == .shutdown);
+ assert(sync.spawned == 0);
+
+ // If there are threads, start off the chain sending it the shutdown signal.
+ // The thread receives the shutdown signal and sends it to the next thread, and the next..
+ const thread = self.threads.load(.Acquire) orelse return;
+ thread.join_event.notify();
+}
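+
+// Note: joining is a hand-off chain: the last exiting thread wakes join(),
+// which then signals the head of the thread stack; each thread signals its
+// .next in unregister(), so no thread touches the pool after join() returns.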
+
+const Thread = struct {
+ next: ?*Thread = null,
+ target: ?*Thread = null,
+ join_event: Event = .{},
+ run_queue: Node.Queue = .{},
+ run_buffer: Node.Buffer = .{},
+
+ threadlocal var current: ?*Thread = null;
+
+ /// Thread entry point which runs a worker for the ThreadPool
+ fn run(thread_pool: *ThreadPool) void {
+ var self = Thread{};
+ current = &self;
+
+ thread_pool.register(&self);
+ defer thread_pool.unregister(&self);
+
+ var is_waking = false;
+ while (true) {
+ is_waking = thread_pool.wait(is_waking) catch return;
+
+ while (self.pop(thread_pool)) |result| {
+ if (result.pushed or is_waking)
+ thread_pool.notify(is_waking);
+ is_waking = false;
+
+ const task = @fieldParentPtr(Task, "node", result.node);
+ (task.callback)(task);
+ }
+ }
+ }
+
+ /// Try to dequeue a Node/Task from the ThreadPool.
+ /// Spurious reports of dequeue() returning empty are allowed.
+ fn pop(noalias self: *Thread, noalias thread_pool: *ThreadPool) ?Node.Buffer.Stole {
+ // Check our local buffer first
+ if (self.run_buffer.pop()) |node| {
+ return Node.Buffer.Stole{
+ .node = node,
+ .pushed = false,
+ };
+ }
+
+ // Then check our local queue
+ if (self.run_buffer.consume(&self.run_queue)) |stole| {
+ return stole;
+ }
+
+ // Then the global queue
+ if (self.run_buffer.consume(&thread_pool.run_queue)) |stole| {
+ return stole;
+ }
+
+ // TODO: add optimistic I/O polling here
+
+ // Then try work stealing from other threads
+ var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned;
+ while (num_threads > 0) : (num_threads -= 1) {
+ // Traverse the stack of registered threads on the thread pool
+ const target = self.target orelse thread_pool.threads.load(.Acquire) orelse unreachable;
+ self.target = target.next;
+
+ // Try to steal from their queue first to avoid contention (the target steals from its own queue last).
+ if (self.run_buffer.consume(&target.run_queue)) |stole| {
+ return stole;
+ }
+
+ // Skip stealing from the buffer if we're the target.
+ // We still steal from our own queue above given it may have just been locked the first time we tried.
+ if (target == self) {
+ continue;
+ }
+
+ // Steal from the buffer of a remote thread as a last resort
+ if (self.run_buffer.steal(&target.run_buffer)) |stole| {
+ return stole;
+ }
+ }
+
+ return null;
+ }
+};
+
+/// An event which stores 1 semaphore token and is multi-threaded safe.
+/// The event can be shutdown(), waking up all wait()ing threads and
+/// making subsequent wait()'s return immediately.
+const Event = struct {
+ state: Atomic(u32) = Atomic(u32).init(EMPTY),
+
+ const EMPTY = 0;
+ const WAITING = 1;
+ const NOTIFIED = 2;
+ const SHUTDOWN = 3;
+
+ /// Wait for and consume a notification
+ /// or wait for the event to be shutdown entirely
+ noinline fn wait(self: *Event) void {
+ var acquire_with: u32 = EMPTY;
+ var state = self.state.load(.Monotonic);
+
+ while (true) {
+ // If we're shutdown then exit early.
+ // Acquire barrier to ensure operations before the shutdown() are seen after the wait().
+ // Shutdown is rare so it's better to have an Acquire barrier here instead of on CAS failure + load which are common.
+ if (state == SHUTDOWN) {
+ std.atomic.fence(.Acquire);
+ return;
+ }
+
+ // Consume a notification when it pops up.
+ // Acquire barrier to ensure operations before the notify() appear after the wait().
+ if (state == NOTIFIED) {
+ state = self.state.tryCompareAndSwap(
+ state,
+ acquire_with,
+ .Acquire,
+ .Monotonic,
+ ) orelse return;
+ continue;
+ }
+
+ // There is no notification to consume; wait on the event by ensuring the state is WAITING.
+ if (state != WAITING) blk: {
+ state = self.state.tryCompareAndSwap(
+ state,
+ WAITING,
+ .Monotonic,
+ .Monotonic,
+ ) orelse break :blk;
+ continue;
+ }
+
+ // Wait on the event until a notify() or shutdown().
+ // If we wake up to a notification, we must acquire it with WAITING instead of EMPTY
+ // since there may be other threads sleeping on the Futex who haven't been woken up yet.
+ //
+ // Acquiring to WAITING will make the next notify() or shutdown() wake a sleeping futex thread
+ // who will either exit on SHUTDOWN or acquire with WAITING again, ensuring all threads are awoken.
+ // This unfortunately results in the last notify() or shutdown() doing an extra futex wake but that's fine.
+ Futex.wait(&self.state, WAITING, null) catch unreachable;
+ state = self.state.load(.Monotonic);
+ acquire_with = WAITING;
+ }
+ }
+
+ /// Post a notification to the event if it doesn't have one already
+ /// then wake up a waiting thread if there is one as well.
+ fn notify(self: *Event) void {
+ return self.wake(NOTIFIED, 1);
+ }
+
+ /// Marks the event as shutdown, making all future wait()'s return immediately.
+ /// Then wakes up any threads currently waiting on the Event.
+ fn shutdown(self: *Event) void {
+ return self.wake(SHUTDOWN, std.math.maxInt(u32));
+ }
+
+ fn wake(self: *Event, release_with: u32, wake_threads: u32) void {
+ // Update the Event to notify it with the new `release_with` state (either NOTIFIED or SHUTDOWN).
+ // Release barrier to ensure that operations before this happen before the wait() in other threads.
+ const state = self.state.swap(release_with, .Release);
+
+ // Only wake threads sleeping in futex if the state is WAITING.
+ // Avoids unnecessary wake ups.
+ if (state == WAITING) {
+ Futex.wake(&self.state, wake_threads);
+ }
+ }
+};
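+
+// Summary of the Event states (EMPTY/WAITING/NOTIFIED/SHUTDOWN): notify()
+// swaps in NOTIFIED and wakes one futex waiter if it saw WAITING; shutdown()
+// swaps in SHUTDOWN and wakes all waiters, making future wait()s return
+// immediately; wait() consumes NOTIFIED via CAS or parks on WAITING.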
+
+/// Linked list intrusive memory node and lock-free data structures to operate with it
+const Node = struct {
+ next: ?*Node = null,
+
+ /// A linked list of Nodes
+ const List = struct {
+ head: *Node,
+ tail: *Node,
+ };
+
+ /// An unbounded multi-producer, (non-blocking) multi-consumer queue of Node pointers.
+ const Queue = struct {
+ stack: Atomic(usize) = Atomic(usize).init(0),
+ cache: ?*Node = null,
+
+ const HAS_CACHE: usize = 0b01;
+ const IS_CONSUMING: usize = 0b10;
+ const PTR_MASK: usize = ~(HAS_CACHE | IS_CONSUMING);
+
+ comptime {
+ assert(@alignOf(Node) >= ((IS_CONSUMING | HAS_CACHE) + 1));
+ }
+
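+ // The stack word is a tagged pointer: Node's alignment keeps the two low
+ // bits free, so the head *Node and the two state bits share one usize.
+ // Roughly (illustrative):
+ //
+ //     const raw = @ptrToInt(head_node) | HAS_CACHE | IS_CONSUMING;
+ //     const head = @intToPtr(*Node, raw & PTR_MASK);
+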
+ fn push(noalias self: *Queue, list: List) void {
+ var stack = self.stack.load(.Monotonic);
+ while (true) {
+ // Attach the list to the stack (pt. 1)
+ list.tail.next = @intToPtr(?*Node, stack & PTR_MASK);
+
+ // Update the stack with the list (pt. 2).
+ // Don't change the HAS_CACHE and IS_CONSUMING bits of the consumer.
+ var new_stack = @ptrToInt(list.head);
+ assert(new_stack & ~PTR_MASK == 0);
+ new_stack |= (stack & ~PTR_MASK);
+
+ // Push to the stack with a release barrier for the consumer to see the proper list links.
+ stack = self.stack.tryCompareAndSwap(
+ stack,
+ new_stack,
+ .Release,
+ .Monotonic,
+ ) orelse break;
+ }
+ }
+
+ fn tryAcquireConsumer(self: *Queue) error{ Empty, Contended }!?*Node {
+ var stack = self.stack.load(.Monotonic);
+ while (true) {
+ if (stack & IS_CONSUMING != 0)
+ return error.Contended; // The queue already has a consumer.
+ if (stack & (HAS_CACHE | PTR_MASK) == 0)
+ return error.Empty; // The queue is empty when there's nothing cached and nothing in the stack.
+
+ // When we acquire the consumer, also consume the pushed stack if the cache is empty.
+ var new_stack = stack | HAS_CACHE | IS_CONSUMING;
+ if (stack & HAS_CACHE == 0) {
+ assert(stack & PTR_MASK != 0);
+ new_stack &= ~PTR_MASK;
+ }
+
+ // Acquire barrier on getting the consumer to see cache/Node updates done by previous consumers
+ // and to ensure our cache/Node updates in pop() happen after that of previous consumers.
+ stack = self.stack.tryCompareAndSwap(
+ stack,
+ new_stack,
+ .Acquire,
+ .Monotonic,
+ ) orelse return self.cache orelse @intToPtr(*Node, stack & PTR_MASK);
+ }
+ }
+
+ fn releaseConsumer(noalias self: *Queue, noalias consumer: ?*Node) void {
+ // Stop consuming and remove the HAS_CACHE bit as well if the consumer's cache is empty.
+ // When HAS_CACHE bit is zeroed, the next consumer will acquire the pushed stack nodes.
+ var remove = IS_CONSUMING;
+ if (consumer == null)
+ remove |= HAS_CACHE;
+
+ // Release the consumer with a release barrier to ensure cache/node accesses
+ // happen before the consumer was released and before the next consumer starts using the cache.
+ self.cache = consumer;
+ const stack = self.stack.fetchSub(remove, .Release);
+ assert(stack & remove != 0);
+ }
+
+ fn pop(noalias self: *Queue, noalias consumer_ref: *?*Node) ?*Node {
+ // Check the consumer cache (fast path)
+ if (consumer_ref.*) |node| {
+ consumer_ref.* = node.next;
+ return node;
+ }
+
+ // Load the stack to see if there was anything pushed that we could grab.
+ var stack = self.stack.load(.Monotonic);
+ assert(stack & IS_CONSUMING != 0);
+ if (stack & PTR_MASK == 0) {
+ return null;
+ }
+
+ // Nodes have been pushed to the stack; grab them with an Acquire barrier to see the Node links.
+ stack = self.stack.swap(HAS_CACHE | IS_CONSUMING, .Acquire);
+ assert(stack & IS_CONSUMING != 0);
+ assert(stack & PTR_MASK != 0);
+
+ const node = @intToPtr(*Node, stack & PTR_MASK);
+ consumer_ref.* = node.next;
+ return node;
+ }
+ };
+
+ /// A bounded single-producer, multi-consumer ring buffer for node pointers.
+ const Buffer = struct {
+ head: Atomic(Index) = Atomic(Index).init(0),
+ tail: Atomic(Index) = Atomic(Index).init(0),
+ array: [capacity]Atomic(*Node) = undefined,
+
+ const Index = u32;
+ const capacity = 256; // Appears to be a pretty good trade-off in space vs contended throughput
+ comptime {
+ assert(std.math.maxInt(Index) >= capacity);
+ assert(std.math.isPowerOfTwo(capacity));
+ }
+
+ fn push(noalias self: *Buffer, noalias list: *List) error{Overflow}!void {
+ var head = self.head.load(.Monotonic);
+ var tail = self.tail.loadUnchecked(); // we're the only thread that can change this
+
+ while (true) {
+ var size = tail -% head;
+ assert(size <= capacity);
+
+ // Push nodes from the list into the buffer while it isn't full.
+ if (size < capacity) {
+ var nodes: ?*Node = list.head;
+ while (size < capacity) : (size += 1) {
+ const node = nodes orelse break;
+ nodes = node.next;
+
+ // Array written atomically with weakest ordering since it could be getting atomically read by steal().
+ self.array[tail % capacity].store(node, .Unordered);
+ tail +%= 1;
+ }
+
+ // Release barrier synchronizes with Acquire loads for steal()ers to see the array writes.
+ self.tail.store(tail, .Release);
+
+ // Update the list with the nodes we pushed to the buffer and try again if there's more.
+ list.head = nodes orelse return;
+ std.atomic.spinLoopHint();
+ head = self.head.load(.Monotonic);
+ continue;
+ }
+
+ // Try to steal/overflow half of the tasks in the buffer to make room for future push()es.
+ // Migrating half amortizes the cost of stealing while requiring future pops to still use the buffer.
+ // Acquire barrier to ensure the linked list creation after the steal only happens after we successfully steal.
+ var migrate = size / 2;
+ head = self.head.tryCompareAndSwap(
+ head,
+ head +% migrate,
+ .Acquire,
+ .Monotonic,
+ ) orelse {
+ // Link the migrated Nodes together
+ const first = self.array[head % capacity].loadUnchecked();
+ while (migrate > 0) : (migrate -= 1) {
+ const prev = self.array[head % capacity].loadUnchecked();
+ head +%= 1;
+ prev.next = self.array[head % capacity].loadUnchecked();
+ }
+
+ // Append the list that was supposed to be pushed to the end of the migrated Nodes
+ const last = self.array[(head -% 1) % capacity].loadUnchecked();
+ last.next = list.head;
+ list.tail.next = null;
+
+ // Return the migrated nodes + the original list as overflowed
+ list.head = first;
+ return error.Overflow;
+ };
+ }
+ }
+
+ fn pop(self: *Buffer) ?*Node {
+ var head = self.head.load(.Monotonic);
+ var tail = self.tail.loadUnchecked(); // we're the only thread that can change this
+
+ while (true) {
+ // Quick sanity check; return null when the buffer is empty.
+ var size = tail -% head;
+ assert(size <= capacity);
+ if (size == 0) {
+ return null;
+ }
+
+ // Dequeue with an acquire barrier to ensure any writes done to the Node
+ // only happen after we successfully claim it from the array.
+ head = self.head.tryCompareAndSwap(
+ head,
+ head +% 1,
+ .Acquire,
+ .Monotonic,
+ ) orelse return self.array[head % capacity].loadUnchecked();
+ }
+ }
+
+ const Stole = struct {
+ node: *Node,
+ pushed: bool,
+ };
+
+ fn consume(noalias self: *Buffer, noalias queue: *Queue) ?Stole {
+ var consumer = queue.tryAcquireConsumer() catch return null;
+ defer queue.releaseConsumer(consumer);
+
+ const head = self.head.load(.Monotonic);
+ const tail = self.tail.loadUnchecked(); // we're the only thread that can change this
+
+ const size = tail -% head;
+ assert(size <= capacity);
+ assert(size == 0); // we should only be consuming if our array is empty
+
+ // Pop nodes from the queue and push them to our array.
+ // Atomic stores to the array as steal() threads may be atomically reading from it.
+ var pushed: Index = 0;
+ while (pushed < capacity) : (pushed += 1) {
+ const node = queue.pop(&consumer) orelse break;
+ self.array[(tail +% pushed) % capacity].store(node, .Unordered);
+ }
+
+ // We will be returning one node that we stole from the queue.
+ // Get an extra, and if that's not possible, take one from our array.
+ const node = queue.pop(&consumer) orelse blk: {
+ if (pushed == 0) return null;
+ pushed -= 1;
+ break :blk self.array[(tail +% pushed) % capacity].loadUnchecked();
+ };
+
+ // Update the array tail with the nodes we pushed to it.
+ // Release barrier to synchronize with Acquire barrier in steal()'s to see the written array Nodes.
+ if (pushed > 0) self.tail.store(tail +% pushed, .Release);
+ return Stole{
+ .node = node,
+ .pushed = pushed > 0,
+ };
+ }
+
+ fn steal(noalias self: *Buffer, noalias buffer: *Buffer) ?Stole {
+ const head = self.head.load(.Monotonic);
+ const tail = self.tail.loadUnchecked(); // we're the only thread that can change this
+
+ const size = tail -% head;
+ assert(size <= capacity);
+ assert(size == 0); // we should only be stealing if our array is empty
+
+ while (true) : (std.atomic.spinLoopHint()) {
+ const buffer_head = buffer.head.load(.Acquire);
+ const buffer_tail = buffer.tail.load(.Acquire);
+
+ // Overly large size indicates that the tail was updated a lot after the head was loaded.
+ // Reload both and try again.
+ const buffer_size = buffer_tail -% buffer_head;
+ if (buffer_size > capacity) {
+ continue;
+ }
+
+ // Try to steal half (divCeil) to amortize the cost of stealing from other threads.
+ const steal_size = buffer_size - (buffer_size / 2);
+ if (steal_size == 0) {
+ return null;
+ }
+
+ // Copy the nodes we will steal from the target's array to our own.
+ // Atomically load from the target buffer array as it may be pushing and atomically storing to it.
+ // Atomic store to our array as other steal() threads may be atomically loading from it as above.
+ var i: Index = 0;
+ while (i < steal_size) : (i += 1) {
+ const node = buffer.array[(buffer_head +% i) % capacity].load(.Unordered);
+ self.array[(tail +% i) % capacity].store(node, .Unordered);
+ }
+
+ // Try to commit the steal from the target buffer using:
+ // - an Acquire barrier to ensure that we only interact with the stolen Nodes after the steal was committed.
+ // - a Release barrier to ensure that the Nodes are copied above prior to the committing of the steal
+ // because if they're copied after the steal, they could be getting rewritten by the target's push().
+ _ = buffer.head.compareAndSwap(
+ buffer_head,
+ buffer_head +% steal_size,
+ .AcqRel,
+ .Monotonic,
+ ) orelse {
+ // Pop one from the nodes we stole as we'll be returning it
+ const pushed = steal_size - 1;
+ const node = self.array[(tail +% pushed) % capacity].loadUnchecked();
+
+ // Update the array tail with the nodes we pushed to it.
+ // Release barrier to synchronize with Acquire barrier in steal()'s to see the written array Nodes.
+ if (pushed > 0) self.tail.store(tail +% pushed, .Release);
+ return Stole{
+ .node = node,
+ .pushed = pushed > 0,
+ };
+ };
+ }
+ }
+ };
+};