-rw-r--r--   .vscode/launch.json             |   4
-rw-r--r--   misctools/http_bench.zig        |   5
-rw-r--r--   src/http/http_client_async.zig  | 328
-rw-r--r--   src/http_client.zig             | 297
-rw-r--r--   src/install/install.zig         | 699
-rw-r--r--   src/install/semver.zig          |   4
-rw-r--r--   src/thread_pool.zig             |  13
7 files changed, 849 insertions, 501 deletions
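
Most of the change below teaches bun install to cache npm package manifests on disk and revalidate them with conditional requests. As a rough orientation, the revalidation rule added in NetworkTask.forManifest can be sketched like this in Zig (conditionalHeaderName is a hypothetical helper; the real code builds its headers through HTTPClient.HeaderBuilder):

    const std = @import("std");

    // A cached ETag wins and is replayed as If-None-Match; only when no ETag
    // was stored does the cached Last-Modified become an If-Modified-Since.
    fn conditionalHeaderName(etag: []const u8, last_modified: []const u8) []const u8 {
        if (etag.len != 0) return "If-None-Match";
        if (last_modified.len != 0) return "If-Modified-Since";
        return ""; // no cached validators: the request is unconditional
    }

    pub fn main() void {
        std.debug.assert(std.mem.eql(u8, conditionalHeaderName("\"abc123\"", "Wed, 01 Sep 2021 00:00:00 GMT"), "If-None-Match"));
        std.debug.assert(std.mem.eql(u8, conditionalHeaderName("", "Wed, 01 Sep 2021 00:00:00 GMT"), "If-Modified-Since"));
        std.debug.assert(conditionalHeaderName("", "").len == 0);
    }

The request also advertises the abbreviated manifest format via Accept: application/vnd.npm.install-v1+json, as the install.zig hunk shows.
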
diff --git a/.vscode/launch.json b/.vscode/launch.json index 9cccbb2c4..be255a9a0 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,7 +6,7 @@ "request": "launch", "name": "HTTP bench", "program": "${workspaceFolder}/misctools/http_bench", - "args": ["https://example.com", "--count=1"], + "args": ["https://registry.npmjs.org/next", "--count=1"], "cwd": "${workspaceFolder}", "console": "internalConsole" }, @@ -349,6 +349,8 @@ "args": ["install"], "cwd": "/tmp/wow-such-npm", "env": { + "GOMAXPROCS": "1", + "BUN_MANIFEST_CACHE": "1" }, "console": "internalConsole" }, diff --git a/misctools/http_bench.zig b/misctools/http_bench.zig index ff937f6db..5d72770ef 100644 --- a/misctools/http_bench.zig +++ b/misctools/http_bench.zig @@ -197,6 +197,8 @@ pub fn main() anyerror!void { }; var groups = try default_allocator.alloc(Group, args.count); var i: usize = 0; + const Batch = @import("../src/thread_pool.zig").Batch; + var batch = Batch{}; while (i < args.count) : (i += 1) { groups[i] = Group{}; var response_body = &groups[i].response_body; @@ -219,8 +221,9 @@ pub fn main() anyerror!void { ), }; ctx.http.callback = HTTP.HTTPChannelContext.callback; - ctx.http.schedule(default_allocator); + ctx.http.schedule(default_allocator, &batch); } + NetworkThread.global.pool.schedule(batch); var read_count: usize = 0; var success_count: usize = 0; diff --git a/src/http/http_client_async.zig b/src/http/http_client_async.zig index 452d44afa..3d02df842 100644 --- a/src/http/http_client_async.zig +++ b/src/http/http_client_async.zig @@ -62,6 +62,14 @@ progress_node: ?*std.Progress.Node = null, socket: AsyncSocket.SSL = undefined, gzip_elapsed: u64 = 0, +/// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since. +/// This is a workaround for that. 
+force_last_modified: bool = false, +if_modified_since: string = "", +request_content_len_buf: ["-4294967295".len]u8 = undefined, +request_headers_buf: [128]picohttp.Header = undefined, +response_headers_buf: [128]picohttp.Header = undefined, + pub fn init( allocator: *std.mem.Allocator, method: Method, @@ -86,9 +94,6 @@ pub fn deinit(this: *HTTPClient) !void { } } -threadlocal var response_headers_buf: [256]picohttp.Header = undefined; -threadlocal var request_content_len_buf: [64]u8 = undefined; -threadlocal var header_name_hashes: [256]u64 = undefined; // threadlocal var resolver_cache const tcp = std.x.net.tcp; const ip = std.x.net.ip; @@ -193,8 +198,6 @@ pub const HeaderBuilder = struct { } }; -threadlocal var server_name_buf: [1024]u8 = undefined; - pub const HTTPChannel = @import("../sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 }); pub const HTTPChannelContext = struct { @@ -1282,15 +1285,15 @@ pub const AsyncBIO = struct { }; }; -threadlocal var request_headers_buf: [256]picohttp.Header = undefined; - -pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { +pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { var header_count: usize = 0; var header_entries = this.header_entries.slice(); var header_names = header_entries.items(.name); var header_values = header_entries.items(.value); + var request_headers_buf = &this.request_headers_buf; var override_accept_encoding = false; + var override_accept_header = false; var override_user_agent = false; for (header_names) |head, i| { @@ -1305,6 +1308,14 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { connection_header_hash, content_length_header_hash, => continue, + hashHeaderName("if-modified-since") => { + if (this.force_last_modified and this.if_modified_since.len == 0) { + this.if_modified_since = this.headerStr(header_values[i]); + } + }, + accept_header_hash => { + override_accept_header = true; + }, else => {}, } @@ -1341,8 +1352,10 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { header_count += 1; } - request_headers_buf[header_count] = accept_header; - header_count += 1; + if (!override_accept_header) { + request_headers_buf[header_count] = accept_header; + header_count += 1; + } request_headers_buf[header_count] = picohttp.Header{ .name = host_header_name, @@ -1358,7 +1371,7 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { if (body_len > 0) { request_headers_buf[header_count] = picohttp.Header{ .name = content_length_header_name, - .value = std.fmt.bufPrint(&request_content_len_buf, "{d}", .{body_len}) catch "0", + .value = std.fmt.bufPrint(&this.request_content_len_buf, "{d}", .{body_len}) catch "0", }; header_count += 1; } @@ -1383,7 +1396,7 @@ pub fn connect( client.setNoDelay(true) catch {}; client.setReadBufferSize(BufferPool.len) catch {}; client.setQuickACK(true) catch {}; - + this.tcp_client = client; if (this.timeout > 0) { client.setReadTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {}; client.setWriteTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {}; @@ -1618,7 +1631,7 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo var request_body = request_buffer[0..read_length]; read_headers_up_to = if (read_headers_up_to > read_length) read_length else read_headers_up_to; - response = picohttp.Response.parseParts(request_body, &response_headers_buf, &read_headers_up_to) catch |err| { + response = 
picohttp.Response.parseParts(request_body, &this.response_headers_buf, &read_headers_up_to) catch |err| { switch (err) { error.ShortRead => { continue :restart; @@ -1642,9 +1655,7 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo var location: string = ""; - if (this.verbose) { - Output.prettyErrorln("Response: {s}", .{response}); - } + var pretend_its_304 = false; for (response.headers) |header| { switch (hashHeaderName(header.name)) { @@ -1679,11 +1690,22 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo location_header_hash => { location = header.value; }, + hashHeaderName("Last-Modified") => { + if (this.force_last_modified and response.status_code > 199 and response.status_code < 300 and this.if_modified_since.len > 0) { + if (strings.eql(this.if_modified_since, header.value)) { + pretend_its_304 = true; + } + } + }, else => {}, } } + if (this.verbose) { + Output.prettyErrorln("Response: {s}", .{response}); + } + if (location.len > 0 and this.remaining_redirect_count > 0) { switch (response.status_code) { 302, 301, 307, 308, 303 => { @@ -1727,169 +1749,177 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo } } - if (transfer_encoding == Encoding.chunked) { - var decoder = std.mem.zeroes(picohttp.phr_chunked_decoder); - var buffer_: *MutableString = body_out_str; + body_getter: { + if (pretend_its_304) { + response.status_code = 304; + } + + if (response.status_code == 304) break :body_getter; - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - if (!ZlibPool.loaded) { - ZlibPool.instance = ZlibPool.init(default_allocator); - ZlibPool.loaded = true; - } + if (transfer_encoding == Encoding.chunked) { + var decoder = std.mem.zeroes(picohttp.phr_chunked_decoder); + var buffer_: *MutableString = body_out_str; - buffer_ = try ZlibPool.instance.get(); - }, - else => {}, - } + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + ZlibPool.loaded = true; + } - var buffer = buffer_.*; + buffer_ = try ZlibPool.instance.get(); + }, + else => {}, + } - var last_read: usize = 0; - { - var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; - last_read = remainder.len; - try buffer.inflate(std.math.max(remainder.len, 2048)); - buffer.list.expandToCapacity(); - std.mem.copy(u8, buffer.list.items, remainder); - } - - // set consume_trailer to 1 to discard the trailing header - // using content-encoding per chunk is not supported - decoder.consume_trailer = 1; - - // these variable names are terrible - // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked - // (but ported from C -> zig) - var rret: usize = 0; - var rsize: usize = last_read; - var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize); - var total_size = rsize; - - while (pret == -2) { - if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) { - try buffer.inflate(std.math.max(total_size * 2, 1024)); + var buffer = buffer_.*; + + var last_read: usize = 0; + { + var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; + last_read = remainder.len; + try buffer.inflate(std.math.max(remainder.len, 2048)); buffer.list.expandToCapacity(); + std.mem.copy(u8, buffer.list.items, remainder); } - rret = try client.read(buffer.list.items, total_size); + // set 
consume_trailer to 1 to discard the trailing header + // using content-encoding per chunk is not supported + decoder.consume_trailer = 1; + + // these variable names are terrible + // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked + // (but ported from C -> zig) + var rret: usize = 0; + var rsize: usize = last_read; + var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize); + var total_size = rsize; + + while (pret == -2) { + if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) { + try buffer.inflate(std.math.max(total_size * 2, 1024)); + buffer.list.expandToCapacity(); + } + + rret = try client.read(buffer.list.items, total_size); + + if (rret == 0) { + return error.ChunkedEncodingError; + } + + rsize = rret; + pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); + if (pret == -1) return error.ChunkedEncodingParseError; - if (rret == 0) { - return error.ChunkedEncodingError; + total_size += rsize; + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(total_size); + this.progress_node.?.context.maybeRefresh(); + } } - rsize = rret; - pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); - if (pret == -1) return error.ChunkedEncodingParseError; + buffer.list.shrinkRetainingCapacity(total_size); + buffer_.* = buffer; - total_size += rsize; + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + var gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { + Output.prettyErrorln("<r><red>Zlib error<r>", .{}); + Output.flush(); + return err; + }; + this.gzip_elapsed = gzip_timer.read(); + }, + else => {}, + } if (comptime report_progress) { this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(total_size); + this.progress_node.?.setCompletedItems(body_out_str.list.items.len); this.progress_node.?.context.maybeRefresh(); } - } - buffer.list.shrinkRetainingCapacity(total_size); - buffer_.* = buffer; - - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - var gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); - body_out_str.list.expandToCapacity(); - defer ZlibPool.instance.put(buffer_) catch unreachable; - ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { - Output.prettyErrorln("<r><red>Zlib error<r>", .{}); - Output.flush(); - return err; - }; - this.gzip_elapsed = gzip_timer.read(); - }, - else => {}, + this.body_size = @intCast(u32, body_out_str.list.items.len); + return response; } - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(body_out_str.list.items.len); - this.progress_node.?.context.maybeRefresh(); - } - - this.body_size = @intCast(u32, body_out_str.list.items.len); - return response; - } + if (content_length > 0) { + var remaining_content_length = content_length; + var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; + remainder = remainder[0..std.math.min(remainder.len, content_length)]; + var buffer_: *MutableString = body_out_str; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + 
ZlibPool.loaded = true; + } - if (content_length > 0) { - var remaining_content_length = content_length; - var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; - remainder = remainder[0..std.math.min(remainder.len, content_length)]; - var buffer_: *MutableString = body_out_str; - - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - if (!ZlibPool.loaded) { - ZlibPool.instance = ZlibPool.init(default_allocator); - ZlibPool.loaded = true; - } + buffer_ = try ZlibPool.instance.get(); + if (buffer_.list.capacity < remaining_content_length) { + try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length); + } + buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length]; + }, + else => {}, + } + var buffer = buffer_.*; + + var body_size: usize = 0; + if (remainder.len > 0) { + std.mem.copy(u8, buffer.list.items, remainder); + body_size = remainder.len; + this.read_count += @intCast(u32, body_size); + remaining_content_length -= @intCast(u32, remainder.len); + } - buffer_ = try ZlibPool.instance.get(); - if (buffer_.list.capacity < remaining_content_length) { - try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length); + while (remaining_content_length > 0) { + const size = @intCast(u32, try client.read( + buffer.list.items, + body_size, + )); + this.read_count += size; + if (size == 0) break; + + body_size += size; + remaining_content_length -= size; + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_size); + this.progress_node.?.context.maybeRefresh(); } - buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length]; - }, - else => {}, - } - var buffer = buffer_.*; - - var body_size: usize = 0; - if (remainder.len > 0) { - std.mem.copy(u8, buffer.list.items, remainder); - body_size = remainder.len; - this.read_count += @intCast(u32, body_size); - remaining_content_length -= @intCast(u32, remainder.len); - } - - while (remaining_content_length > 0) { - const size = @intCast(u32, try client.read( - buffer.list.items, - body_size, - )); - this.read_count += size; - if (size == 0) break; - - body_size += size; - remaining_content_length -= size; + } if (comptime report_progress) { this.progress_node.?.activate(); this.progress_node.?.setCompletedItems(body_size); this.progress_node.?.context.maybeRefresh(); } - } - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(body_size); - this.progress_node.?.context.maybeRefresh(); - } + buffer.list.shrinkRetainingCapacity(body_size); + buffer_.* = buffer; - buffer.list.shrinkRetainingCapacity(body_size); - buffer_.* = buffer; - - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - var gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); - body_out_str.list.expandToCapacity(); - defer ZlibPool.instance.put(buffer_) catch unreachable; - ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { - Output.prettyErrorln("<r><red>Zlib error<r>", .{}); - Output.flush(); - return err; - }; - this.gzip_elapsed = gzip_timer.read(); - }, - else => {}, + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + var gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { + Output.prettyErrorln("<r><red>Zlib error<r>", .{}); + 
Output.flush(); + return err; + }; + this.gzip_elapsed = gzip_timer.read(); + }, + else => {}, + } } } diff --git a/src/http_client.zig b/src/http_client.zig index 4ea9fb3f7..9fa3e2e6c 100644 --- a/src/http_client.zig +++ b/src/http_client.zig @@ -48,6 +48,10 @@ redirect_buf: [2048]u8 = undefined, disable_shutdown: bool = true, timeout: u32 = 0, progress_node: ?*std.Progress.Node = null, +force_last_modified: bool = false, +if_modified_since: string = "", +response_headers_buf: [256]picohttp.Header = undefined, +request_headers_buf: [256]picohttp.Header = undefined, pub fn init(allocator: *std.mem.Allocator, method: Method, url: URL, header_entries: Headers.Entries, header_buf: string) HTTPClient { return HTTPClient{ @@ -59,8 +63,6 @@ pub fn init(allocator: *std.mem.Allocator, method: Method, url: URL, header_entr }; } -threadlocal var response_headers_buf: [256]picohttp.Header = undefined; -threadlocal var request_headers_buf: [256]picohttp.Header = undefined; threadlocal var request_content_len_buf: [64]u8 = undefined; threadlocal var header_name_hashes: [256]u64 = undefined; // threadlocal var resolver_cache @@ -169,11 +171,12 @@ pub const HeaderBuilder = struct { threadlocal var server_name_buf: [1024]u8 = undefined; -pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { +pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { var header_count: usize = 0; var header_entries = this.header_entries.slice(); var header_names = header_entries.items(.name); var header_values = header_entries.items(.value); + var request_headers_buf = &this.request_headers_buf; var override_accept_encoding = false; @@ -190,6 +193,11 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { connection_header_hash, content_length_header_hash, => continue, + hashHeaderName("if-modified-since") => { + if (this.force_last_modified) { + this.if_modified_since = this.headerStr(header_values[i]); + } + }, else => {}, } @@ -425,7 +433,7 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo var request_buffer = http_req_buf[0..read_length]; read_headers_up_to = if (read_headers_up_to > read_length) read_length else read_headers_up_to; - response = picohttp.Response.parseParts(request_buffer, &response_headers_buf, &read_headers_up_to) catch |err| { + response = picohttp.Response.parseParts(request_buffer, &this.response_headers_buf, &read_headers_up_to) catch |err| { switch (err) { error.ShortRead => { continue :restart; @@ -443,6 +451,7 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo var content_length: u32 = 0; var encoding = Encoding.identity; var transfer_encoding = Encoding.identity; + var pretend_its_304 = false; var location: string = ""; @@ -452,6 +461,13 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo for (response.headers) |header| { switch (hashHeaderName(header.name)) { + hashHeaderName("Last-Modified") => { + if (this.force_last_modified and response.status_code > 199 and response.status_code < 300 and this.if_modified_since.len > 0) { + if (strings.eql(this.if_modified_since, header.value)) { + pretend_its_304 = true; + } + } + }, content_length_header_hash => { content_length = std.fmt.parseInt(u32, header.value, 10) catch 0; try body_out_str.inflate(content_length); @@ -519,170 +535,177 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo } } - if (transfer_encoding == Encoding.chunked) { - var decoder 
= std.mem.zeroes(picohttp.phr_chunked_decoder); - var buffer_: *MutableString = body_out_str; + body_getter: { + if (pretend_its_304) { + response.status_code = 304; + break :body_getter; + } - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - if (!ZlibPool.loaded) { - ZlibPool.instance = ZlibPool.init(default_allocator); - ZlibPool.loaded = true; - } + if (transfer_encoding == Encoding.chunked) { + var decoder = std.mem.zeroes(picohttp.phr_chunked_decoder); + var buffer_: *MutableString = body_out_str; - buffer_ = try ZlibPool.instance.get(); - }, - else => {}, - } + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + ZlibPool.loaded = true; + } - var buffer = buffer_.*; + buffer_ = try ZlibPool.instance.get(); + }, + else => {}, + } - var last_read: usize = 0; - { - var remainder = http_req_buf[@intCast(usize, response.bytes_read)..read_length]; - last_read = remainder.len; - try buffer.inflate(std.math.max(remainder.len, 2048)); - buffer.list.expandToCapacity(); - std.mem.copy(u8, buffer.list.items, remainder); - } + var buffer = buffer_.*; - // set consume_trailer to 1 to discard the trailing header - // using content-encoding per chunk is not supported - decoder.consume_trailer = 1; - - // these variable names are terrible - // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked - // (but ported from C -> zig) - var rret: usize = 0; - var rsize: usize = last_read; - var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize); - var total_size = rsize; - - while (pret == -2) { - if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) { - try buffer.inflate(std.math.max(total_size * 2, 1024)); + var last_read: usize = 0; + { + var remainder = http_req_buf[@intCast(usize, response.bytes_read)..read_length]; + last_read = remainder.len; + try buffer.inflate(std.math.max(remainder.len, 2048)); buffer.list.expandToCapacity(); + std.mem.copy(u8, buffer.list.items, remainder); } - rret = try client.read(buffer.list.items[total_size..]); + // set consume_trailer to 1 to discard the trailing header + // using content-encoding per chunk is not supported + decoder.consume_trailer = 1; + + // these variable names are terrible + // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked + // (but ported from C -> zig) + var rret: usize = 0; + var rsize: usize = last_read; + var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize); + var total_size = rsize; + + while (pret == -2) { + if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) { + try buffer.inflate(std.math.max(total_size * 2, 1024)); + buffer.list.expandToCapacity(); + } - if (rret == 0) { - return error.ChunkedEncodingError; - } + rret = try client.read(buffer.list.items[total_size..]); + + if (rret == 0) { + return error.ChunkedEncodingError; + } - rsize = rret; - pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); - if (pret == -1) return error.ChunkedEncodingParseError; + rsize = rret; + pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); + if (pret == -1) return error.ChunkedEncodingParseError; - total_size += rsize; + total_size += rsize; + + if (comptime report_progress) { + 
this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(total_size); + this.progress_node.?.context.maybeRefresh(); + } + } + + buffer.list.shrinkRetainingCapacity(total_size); + buffer_.* = buffer; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + var reader = try Zlib.ZlibReaderArrayList.init(buffer.list.items, &body_out_str.list, default_allocator); + reader.readAll() catch |err| { + if (reader.errorMessage()) |msg| { + Output.prettyErrorln("<r><red>Zlib error<r>: <b>{s}<r>", .{msg}); + Output.flush(); + } + return err; + }; + }, + else => {}, + } if (comptime report_progress) { this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(total_size); + this.progress_node.?.setCompletedItems(body_out_str.list.items.len); this.progress_node.?.context.maybeRefresh(); } - } - buffer.list.shrinkRetainingCapacity(total_size); - buffer_.* = buffer; + this.body_size = @intCast(u32, body_out_str.list.items.len); + return response; + } - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - body_out_str.list.expandToCapacity(); - defer ZlibPool.instance.put(buffer_) catch unreachable; - var reader = try Zlib.ZlibReaderArrayList.init(buffer.list.items, &body_out_str.list, default_allocator); - reader.readAll() catch |err| { - if (reader.errorMessage()) |msg| { - Output.prettyErrorln("<r><red>Zlib error<r>: <b>{s}<r>", .{msg}); - Output.flush(); + if (content_length > 0) { + var remaining_content_length = content_length; + var remainder = http_req_buf[@intCast(usize, response.bytes_read)..read_length]; + remainder = remainder[0..std.math.min(remainder.len, content_length)]; + var buffer_: *MutableString = body_out_str; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + ZlibPool.loaded = true; } - return err; - }; - }, - else => {}, - } - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(body_out_str.list.items.len); - this.progress_node.?.context.maybeRefresh(); - } + buffer_ = try ZlibPool.instance.get(); + if (buffer_.list.capacity < remaining_content_length) { + try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length); + } + buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length]; + }, + else => {}, + } + var buffer = buffer_.*; + + var body_size: usize = 0; + if (remainder.len > 0) { + std.mem.copy(u8, buffer.list.items, remainder); + body_size = remainder.len; + this.read_count += @intCast(u32, body_size); + remaining_content_length -= @intCast(u32, remainder.len); + } - this.body_size = @intCast(u32, body_out_str.list.items.len); - return response; - } + while (remaining_content_length > 0) { + const size = @intCast(u32, try client.read( + buffer.list.items[body_size..], + )); + this.read_count += size; + if (size == 0) break; - if (content_length > 0) { - var remaining_content_length = content_length; - var remainder = http_req_buf[@intCast(usize, response.bytes_read)..read_length]; - remainder = remainder[0..std.math.min(remainder.len, content_length)]; - var buffer_: *MutableString = body_out_str; - - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - if (!ZlibPool.loaded) { - ZlibPool.instance = ZlibPool.init(default_allocator); - ZlibPool.loaded = true; - } + body_size += size; + remaining_content_length -= size; - buffer_ = try 
ZlibPool.instance.get(); - if (buffer_.list.capacity < remaining_content_length) { - try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length); + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_size); + this.progress_node.?.context.maybeRefresh(); } - buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length]; - }, - else => {}, - } - var buffer = buffer_.*; - - var body_size: usize = 0; - if (remainder.len > 0) { - std.mem.copy(u8, buffer.list.items, remainder); - body_size = remainder.len; - this.read_count += @intCast(u32, body_size); - remaining_content_length -= @intCast(u32, remainder.len); - } - - while (remaining_content_length > 0) { - const size = @intCast(u32, try client.read( - buffer.list.items[body_size..], - )); - this.read_count += size; - if (size == 0) break; - - body_size += size; - remaining_content_length -= size; + } if (comptime report_progress) { this.progress_node.?.activate(); this.progress_node.?.setCompletedItems(body_size); this.progress_node.?.context.maybeRefresh(); } - } - - if (comptime report_progress) { - this.progress_node.?.activate(); - this.progress_node.?.setCompletedItems(body_size); - this.progress_node.?.context.maybeRefresh(); - } - - buffer.list.shrinkRetainingCapacity(body_size); - buffer_.* = buffer; - switch (encoding) { - Encoding.gzip, Encoding.deflate => { - body_out_str.list.expandToCapacity(); - defer ZlibPool.instance.put(buffer_) catch unreachable; - var reader = try Zlib.ZlibReaderArrayList.init(buffer.list.items, &body_out_str.list, default_allocator); - reader.readAll() catch |err| { - if (reader.errorMessage()) |msg| { - Output.prettyErrorln("<r><red>Zlib error<r>: <b>{s}<r>", .{msg}); - Output.flush(); - } - return err; - }; - }, - else => {}, + buffer.list.shrinkRetainingCapacity(body_size); + buffer_.* = buffer; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + var reader = try Zlib.ZlibReaderArrayList.init(buffer.list.items, &body_out_str.list, default_allocator); + reader.readAll() catch |err| { + if (reader.errorMessage()) |msg| { + Output.prettyErrorln("<r><red>Zlib error<r>: <b>{s}<r>", .{msg}); + Output.flush(); + } + return err; + }; + }, + else => {}, + } } } diff --git a/src/install/install.zig b/src/install/install.zig index e8caec194..a2116935d 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -72,15 +72,15 @@ pub fn ExternalSlice(comptime Type: type) type { off: u32 = 0, len: u32 = 0, - pub inline fn get(this: Slice, in: []const Type) []const Type { + pub inline fn get(this: Slice, in: []align(1) const Type) []align(1) const Type { return in[this.off..@minimum(in.len, this.off + this.len)]; } - pub inline fn mut(this: Slice, in: []Type) []Type { + pub inline fn mut(this: Slice, in: []align(1) Type) []align(1) Type { return in[this.off..@minimum(in.len, this.off + this.len)]; } - pub fn init(buf: []const Type, in: []const Type) Slice { + pub fn init(buf: []align(1) const Type, in: []align(1) const Type) Slice { // if (comptime isDebug or isTest) { // std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr)); // std.debug.assert((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len)); @@ -102,7 +102,10 @@ const NetworkTask = struct { request_buffer: MutableString = undefined, response_buffer: MutableString = undefined, callback: union(Task.Tag) { - package_manifest: string, + 
package_manifest: struct { + loaded_manifest: ?Npm.PackageManifest = null, + name: string, + }, extract: ExtractTarball, }, @@ -116,32 +119,42 @@ const NetworkTask = struct { name: string, allocator: *std.mem.Allocator, registry_url: URL, + loaded_manifest: ?Npm.PackageManifest, ) !void { this.url_buf = try std.fmt.allocPrint(allocator, "{s}://{s}/{s}", .{ registry_url.displayProtocol(), registry_url.hostname, name }); var last_modified: string = ""; var etag: string = ""; - var header_builder = HTTPClient.HeaderBuilder{}; - - if (last_modified.len != 0) { - header_builder.count("If-Modified-Since", last_modified); + if (loaded_manifest) |manifest| { + last_modified = manifest.pkg.last_modified.slice(manifest.string_buf); + etag = manifest.pkg.etag.slice(manifest.string_buf); } + var header_builder = HTTPClient.HeaderBuilder{}; + if (etag.len != 0) { header_builder.count("If-None-Match", etag); + } else if (last_modified.len != 0) { + header_builder.count("If-Modified-Since", last_modified); } - if (header_builder.content.len > 0) { + if (header_builder.header_count > 0) { header_builder.count("Accept", "application/vnd.npm.install-v1+json"); - - if (last_modified.len != 0) { - header_builder.append("If-Modified-Since", last_modified); + if (last_modified.len > 0 and etag.len > 0) { + header_builder.content.count(last_modified); } + try header_builder.allocate(allocator); if (etag.len != 0) { header_builder.append("If-None-Match", etag); + } else if (last_modified.len != 0) { + header_builder.append("If-Modified-Since", last_modified); } header_builder.append("Accept", "application/vnd.npm.install-v1+json"); + + if (last_modified.len > 0 and etag.len > 0) { + last_modified = header_builder.content.append(last_modified); + } } else { try header_builder.entries.append( allocator, @@ -167,7 +180,24 @@ const NetworkTask = struct { &this.request_buffer, 0, ); - this.callback = .{ .package_manifest = name }; + this.callback = .{ + .package_manifest = .{ + .name = name, + .loaded_manifest = loaded_manifest, + }, + }; + + if (verbose_install) { + this.http.verbose = true; + this.http.client.verbose = true; + } + + // Incase the ETag causes invalidation, we fallback to the last modified date. 
+ if (last_modified.len != 0) { + this.http.client.force_last_modified = true; + this.http.client.if_modified_since = last_modified; + } + this.http.callback = notify; } @@ -540,7 +570,7 @@ pub const Package = struct { log: *logger.Log, manifest: *const Npm.PackageManifest, version: Semver.Version, - package_version: *const Npm.PackageVersion, + package_version: *align(1) const Npm.PackageVersion, features: Features, string_buf: []const u8, ) Package { @@ -614,8 +644,8 @@ pub const Package = struct { package_id: PackageID, log: *logger.Log, npm_count_: *u32, - names: []const ExternalString, - values: []const ExternalString, + names: []align(1) const ExternalString, + values: []align(1) const ExternalString, string_buf: []const u8, required: bool, ) ?Dependency.List { @@ -921,7 +951,7 @@ const Npm = struct { not_found, }; - cached: void, + cached: PackageManifest, fresh: PackageManifest, not_found: void, }; @@ -933,12 +963,16 @@ const Npm = struct { body: []const u8, log: *logger.Log, package_name: string, + loaded_manifest: ?PackageManifest, ) !PackageVersionResponse { switch (response.status_code) { + 400 => return error.BadRequest, 429 => return error.TooManyRequests, 404 => return PackageVersionResponse{ .not_found = .{} }, 500...599 => return error.HTTPInternalServerError, - 304 => return PackageVersionResponse{ .cached = .{} }, + 304 => return PackageVersionResponse{ + .cached = loaded_manifest.?, + }, else => {}, } @@ -967,7 +1001,21 @@ const Npm = struct { JSAst.Stmt.Data.Store.reset(); } - if (try PackageManifest.parse(allocator, log, body, package_name, newly_last_modified, new_etag, 300)) |package| { + if (try PackageManifest.parse( + allocator, + log, + body, + package_name, + newly_last_modified, + new_etag, + @truncate(u32, @intCast(u64, @maximum(0, std.time.timestamp()))) + 300, + )) |package| { + if (PackageManager.instance.enable_manifest_cache) { + var tmpdir = Fs.FileSystem.instance.tmpdir(); + + PackageManifest.Serializer.save(&package, tmpdir, PackageManager.instance.cache_directory) catch {}; + } + return PackageVersionResponse{ .fresh = package }; } @@ -986,7 +1034,7 @@ const Npm = struct { keys: VersionSlice = VersionSlice{}, values: PackageVersionList = PackageVersionList{}, - pub fn findKeyIndex(this: ExternVersionMap, buf: []const Semver.Version, find: Semver.Version) ?u32 { + pub fn findKeyIndex(this: ExternVersionMap, buf: []align(1) const Semver.Version, find: Semver.Version) ?u32 { for (this.keys.get(buf)) |key, i| { if (key.eql(find)) { return @truncate(u32, i); @@ -1024,22 +1072,21 @@ const Npm = struct { /// Everything inside here is just pointers to one of the three arrays const NpmPackage = extern struct { name: ExternalString = ExternalString{}, - - releases: ExternVersionMap = ExternVersionMap{}, - prereleases: ExternVersionMap = ExternVersionMap{}, - dist_tags: DistTagMap = DistTagMap{}, + /// HTTP response headers + last_modified: ExternalString = ExternalString{}, + etag: ExternalString = ExternalString{}, /// "modified" in the JSON modified: ExternalString = ExternalString{}, - /// HTTP response headers - last_modified: ExternalString = ExternalString{}, - etag: ExternalString = ExternalString{}, - public_max_age: u32 = 0, + releases: ExternVersionMap = ExternVersionMap{}, + prereleases: ExternVersionMap = ExternVersionMap{}, + dist_tags: DistTagMap = DistTagMap{}, - string_buf: BigExternalString = BigExternalString{}, versions_buf: VersionSlice = VersionSlice{}, string_lists_buf: ExternalStringList = ExternalStringList{}, + string_buf: 
BigExternalString = BigExternalString{}, + public_max_age: u32 = 0, }; const PackageManifest = struct { @@ -1048,9 +1095,139 @@ const Npm = struct { pkg: NpmPackage = NpmPackage{}, string_buf: []const u8 = &[_]u8{}, - versions: []const Semver.Version = &[_]Semver.Version{}, - external_strings: []const ExternalString = &[_]ExternalString{}, - package_versions: []PackageVersion = &[_]PackageVersion{}, + versions: []align(1) const Semver.Version = &[_]Semver.Version{}, + external_strings: []align(1) const ExternalString = &[_]ExternalString{}, + package_versions: []align(1) const PackageVersion = &[_]PackageVersion{}, + + pub const Serializer = struct { + pub const version = "bun-npm-manifest-cache-v0.0.1\n"; + const header_bytes: string = "#!/usr/bin/env bun\n" ++ version; + + pub fn writeArray(comptime Writer: type, writer: Writer, comptime Type: type, array: []align(1) const Type, pos: *u64) !void { + const bytes = std.mem.sliceAsBytes(array); + if (bytes.len == 0) { + try writer.writeIntNative(u64, 0); + pos.* += 8; + return; + } + + try writer.writeAll(std.mem.asBytes(&array.len)); + pos.* += 8; + try writer.writeAll( + bytes, + ); + pos.* += bytes.len; + } + + pub fn readArray(stream: *std.io.FixedBufferStream([]const u8), comptime Type: type) ![]align(1) const Type { + var reader = stream.reader(); + const len = try reader.readIntNative(u64); + if (len == 0) { + return &[_]Type{}; + } + const result = @ptrCast([*]align(1) const Type, &stream.buffer[stream.pos])[0..len]; + stream.pos += std.mem.sliceAsBytes(result).len; + return result; + } + + pub fn write(this: *const PackageManifest, comptime Writer: type, writer: Writer) !void { + var pos: u64 = 0; + try writer.writeAll(header_bytes); + pos += header_bytes.len; + + // try writer.writeAll(&std.mem.zeroes([header_bytes.len % @alignOf(NpmPackage)]u8)); + + // package metadata first + try writer.writeAll(std.mem.asBytes(&this.pkg)); + pos += std.mem.asBytes(&this.pkg).len; + + try writeArray(Writer, writer, PackageVersion, this.package_versions, &pos); + try writeArray(Writer, writer, Semver.Version, this.versions, &pos); + try writeArray(Writer, writer, ExternalString, this.external_strings, &pos); + + // strings + try writer.writeAll(std.mem.asBytes(&this.string_buf.len)); + if (this.string_buf.len > 0) try writer.writeAll(this.string_buf); + } + + pub fn save(this: *const PackageManifest, tmpdir: std.fs.Dir, cache_dir: std.fs.Dir) !void { + const file_id = std.hash.Wyhash.hash(0, this.name); + var dest_path_buf: [512 + 64]u8 = undefined; + var out_path_buf: ["-18446744073709551615".len + ".npm".len + 1]u8 = undefined; + var dest_path_stream = std.io.fixedBufferStream(&dest_path_buf); + var dest_path_stream_writer = dest_path_stream.writer(); + try dest_path_stream_writer.print("{x}.npm-{x}", .{ file_id, @maximum(std.time.milliTimestamp(), 0) }); + try dest_path_stream_writer.writeByte(0); + var tmp_path: [:0]u8 = dest_path_buf[0 .. 
dest_path_stream.pos - 1 :0]; + { + var tmpfile = try tmpdir.createFileZ(tmp_path, .{ + .truncate = true, + }); + var writer = tmpfile.writer(); + try Serializer.write(this, @TypeOf(writer), writer); + tmpfile.close(); + } + + var out_path = std.fmt.bufPrintZ(&out_path_buf, "{x}.npm", .{file_id}) catch unreachable; + try std.os.renameatZ(tmpdir.fd, tmp_path, cache_dir.fd, out_path); + } + + pub fn load(allocator: *std.mem.Allocator, cache_dir: std.fs.Dir, package_name: string) !?PackageManifest { + const file_id = std.hash.Wyhash.hash(0, package_name); + var file_path_buf: [512 + 64]u8 = undefined; + var file_path = try std.fmt.bufPrintZ(&file_path_buf, "{x}.npm", .{file_id}); + var cache_file = cache_dir.openFileZ( + file_path, + .{ + .read = true, + }, + ) catch return null; + var timer: std.time.Timer = undefined; + if (verbose_install) { + timer = std.time.Timer.start() catch @panic("timer fail"); + } + defer cache_file.close(); + var bytes = try cache_file.readToEndAlloc(allocator, std.math.maxInt(u32)); + errdefer allocator.free(bytes); + if (bytes.len < header_bytes.len) return null; + const result = try readAll(bytes); + if (verbose_install) { + Output.prettyError("\n ", .{}); + Output.printTimer(&timer); + Output.prettyErrorln("<d> [cache hit] {s}<r>", .{package_name}); + } + return result; + } + + pub fn readAll(bytes: []const u8) !PackageManifest { + var remaining = bytes; + if (!strings.eqlComptime(bytes[0..header_bytes.len], header_bytes)) { + return error.InvalidPackageManifest; + } + remaining = remaining[header_bytes.len..]; + var pkg_stream = std.io.fixedBufferStream(remaining); + var pkg_reader = pkg_stream.reader(); + var package_manifest = PackageManifest{ + .name = "", + .pkg = try pkg_reader.readStruct(NpmPackage), + }; + + package_manifest.package_versions = try readArray(&pkg_stream, PackageVersion); + package_manifest.versions = try readArray(&pkg_stream, Semver.Version); + package_manifest.external_strings = try readArray(&pkg_stream, ExternalString); + + { + const len = try pkg_reader.readIntNative(u64); + const start = pkg_stream.pos; + pkg_stream.pos += len; + if (len > 0) package_manifest.string_buf = remaining[start .. 
start + len]; + } + + package_manifest.name = package_manifest.pkg.name.slice(package_manifest.string_buf); + + return package_manifest; + } + }; pub fn str(self: *const PackageManifest, external: ExternalString) string { return external.slice(self.string_buf); @@ -1093,7 +1270,7 @@ const Npm = struct { pub const FindResult = struct { version: Semver.Version, - package: *const PackageVersion, + package: *align(1) const PackageVersion, }; pub fn findByString(this: *const PackageManifest, version: string) ?FindResult { @@ -1114,7 +1291,7 @@ const Npm = struct { pub fn findByVersion(this: *const PackageManifest, version: Semver.Version) ?FindResult { const list = if (!version.tag.hasPre()) this.pkg.releases else this.pkg.prereleases; - const values = list.values.mut(this.package_versions); + const values = list.values.get(this.package_versions); const keys = list.keys.get(this.versions); const index = list.findKeyIndex(this.versions, version) orelse return null; return FindResult{ @@ -1180,8 +1357,8 @@ const Npm = struct { log: *logger.Log, json_buffer: []const u8, expected_name: []const u8, - etag: []const u8, last_modified: []const u8, + etag: []const u8, public_max_age: u32, ) !?PackageManifest { const source = logger.Source.initPathString(expected_name, json_buffer); @@ -1202,8 +1379,6 @@ const Npm = struct { }; var string_builder = StringBuilder{}; - string_builder.count(last_modified); - string_builder.count(etag); if (json.asProperty("name")) |name_q| { const name = name_q.expr.asString(allocator) orelse return null; @@ -1297,22 +1472,18 @@ const Npm = struct { } } - var versioned_packages = try allocator.alloc(PackageVersion, release_versions_len + pre_versions_len); - std.mem.set( - PackageVersion, - versioned_packages, + if (last_modified.len > 0) { + string_builder.count(last_modified); + } + + if (etag.len > 0) { + string_builder.count(etag); + } + + var versioned_packages = try allocator.allocAdvanced(PackageVersion, 1, release_versions_len + pre_versions_len, .exact); + var all_semver_versions = try allocator.allocAdvanced(Semver.Version, 1, release_versions_len + pre_versions_len + dist_tags_count, .exact); + var all_extern_strings = try allocator.allocAdvanced(ExternalString, 1, extern_string_count, .exact); - PackageVersion{}, - ); - var all_semver_versions = try allocator.alloc(Semver.Version, release_versions_len + pre_versions_len + dist_tags_count); - std.mem.set(Semver.Version, all_semver_versions, Semver.Version{}); - var all_extern_strings = try allocator.alloc(ExternalString, extern_string_count); - std.mem.set( - ExternalString, - all_extern_strings, - - ExternalString{}, - ); var versioned_package_releases = versioned_packages[0..release_versions_len]; var all_versioned_package_releases = versioned_package_releases; var versioned_package_prereleases = versioned_packages[release_versions_len..][0..pre_versions_len]; @@ -1618,6 +1789,7 @@ const Npm = struct { result.versions = all_semver_versions; result.external_strings = all_extern_strings; result.package_versions = versioned_packages; + result.pkg.public_max_age = public_max_age; if (string_builder.ptr) |ptr| { result.string_buf = ptr[0..string_builder.len]; @@ -2005,6 +2177,7 @@ const Task = struct { this.request.package_manifest.network.response_buffer.toOwnedSliceLeaky(), &this.log, this.request.package_manifest.name, + this.request.package_manifest.network.callback.package_manifest.loaded_manifest, ) catch |err| { this.status = Status.fail; PackageManager.instance.resolve_tasks.writeItem(this.*) catch 
unreachable; @@ -2088,12 +2261,20 @@ const TaskChannel = sync.Channel(Task, .{ .Static = 4096 }); const NetworkChannel = sync.Channel(*NetworkTask, .{ .Static = 8192 }); const ThreadPool = @import("../thread_pool.zig"); +pub const CacheLevel = struct { + use_cache_control_headers: bool, + use_etag: bool, + use_last_modified: bool, +}; + // We can't know all the package s we need until we've downloaded all the packages // The easy way wouild be: // 1. Download all packages, parsing their dependencies and enqueuing all dependnecies for resolution // 2. pub const PackageManager = struct { enable_cache: bool = true, + enable_manifest_cache: bool = true, + enable_manifest_cache_public: bool = true, cache_directory_path: string = "", cache_directory: std.fs.Dir = undefined, root_dir: *Fs.FileSystem.DirEntry, @@ -2198,6 +2379,83 @@ pub const PackageManager = struct { network_task: ?*NetworkTask = null, }; + pub fn getOrPutResolvedPackageWithFindResult( + this: *PackageManager, + name_hash: u32, + name: string, + version: Dependency.Version, + resolution: *PackageID, + manifest: *const Npm.PackageManifest, + find_result: Npm.PackageManifest.FindResult, + ) !?ResolvedPackageResult { + var resolved_package_entry = try this.resolved_package_index.getOrPut(this.allocator, Package.hash(name, find_result.version)); + + // Was this package already allocated? Let's reuse the existing one. + if (resolved_package_entry.found_existing) { + resolution.* = resolved_package_entry.value_ptr.*.id; + return ResolvedPackageResult{ .package = resolved_package_entry.value_ptr.* }; + } + + const id = package_list.reserveOne() catch unreachable; + resolution.* = id; + var ptr = package_list.at(id).?; + ptr.* = Package.fromNPM( + this.allocator, + id, + this.log, + manifest, + find_result.version, + find_result.package, + this.default_features, + manifest.string_buf, + ); + resolved_package_entry.value_ptr.* = ptr; + + switch (ptr.determinePreinstallState(this)) { + // Is this package already in the cache? + // We don't need to download the tarball, but we should enqueue dependencies + .done => { + return ResolvedPackageResult{ .package = ptr, .is_first_time = true }; + }, + + // Do we need to download the tarball? + .extract => { + const task_id = Task.Id.forPackage(Task.Tag.extract, ptr.name, ptr.version); + const dedupe_entry = try this.network_task_queue.getOrPut(this.allocator, task_id); + + // Assert that we don't end up downloading the tarball twice. + std.debug.assert(!dedupe_entry.found_existing); + var network_task = this.getNetworkTask(); + network_task.* = NetworkTask{ + .task_id = task_id, + .callback = undefined, + .allocator = this.allocator, + }; + + try network_task.forTarball( + this.allocator, + ExtractTarball{ + .name = name, + .version = ptr.version, + .cache_dir = this.cache_directory_path, + .registry = this.registry.url.href, + .package = ptr, + .extracted_file_count = find_result.package.file_count, + }, + ); + + return ResolvedPackageResult{ + .package = ptr, + .is_first_time = true, + .network_task = network_task, + }; + }, + else => unreachable, + } + + return ResolvedPackageResult{ .package = ptr }; + } + pub fn getOrPutResolvedPackage( this: *PackageManager, name_hash: u32, @@ -2224,72 +2482,7 @@ pub const PackageManager = struct { else => unreachable, }; - var resolved_package_entry = try this.resolved_package_index.getOrPut(this.allocator, Package.hash(name, find_result.version)); - - // Was this package already allocated? Let's reuse the existing one. 
- if (resolved_package_entry.found_existing) { - resolution.* = resolved_package_entry.value_ptr.*.id; - return ResolvedPackageResult{ .package = resolved_package_entry.value_ptr.* }; - } - - const id = package_list.reserveOne() catch unreachable; - resolution.* = id; - var ptr = package_list.at(id).?; - ptr.* = Package.fromNPM( - this.allocator, - id, - this.log, - manifest, - find_result.version, - find_result.package, - this.default_features, - manifest.string_buf, - ); - resolved_package_entry.value_ptr.* = ptr; - - switch (ptr.determinePreinstallState(this)) { - // Is this package already in the cache? - // We don't need to download the tarball, but we should enqueue dependencies - .done => { - return ResolvedPackageResult{ .package = ptr, .is_first_time = true }; - }, - - // Do we need to download the tarball? - .extract => { - const task_id = Task.Id.forPackage(Task.Tag.extract, ptr.name, ptr.version); - const dedupe_entry = try this.network_task_queue.getOrPut(this.allocator, task_id); - - // Assert that we don't end up downloading the tarball twice. - std.debug.assert(!dedupe_entry.found_existing); - var network_task = this.getNetworkTask(); - network_task.* = NetworkTask{ - .task_id = task_id, - .callback = undefined, - .allocator = this.allocator, - }; - - try network_task.forTarball( - this.allocator, - ExtractTarball{ - .name = name, - .version = ptr.version, - .cache_dir = this.cache_directory_path, - .registry = this.registry.url.href, - .package = ptr, - .extracted_file_count = find_result.package.file_count, - }, - ); - - return ResolvedPackageResult{ - .package = ptr, - .is_first_time = true, - .network_task = network_task, - }; - }, - else => unreachable, - } - - return ResolvedPackageResult{ .package = ptr }; + return try getOrPutResolvedPackageWithFindResult(this, name_hash, name, version, resolution, manifest, find_result); }, else => return null, @@ -2351,111 +2544,155 @@ pub const PackageManager = struct { const name_hash = dependency.name_hash; const version: Dependency.Version = dependency.version; var batch = ThreadPool.Batch{}; + var loaded_manifest: ?Npm.PackageManifest = null; + switch (dependency.version) { .npm, .dist_tag => { - const resolve_result = this.getOrPutResolvedPackage(name_hash, name, version, &dependency.resolution) catch |err| { - switch (err) { - error.DistTagNotFound => { - if (required) { - this.log.addErrorFmt( - null, - logger.Loc.Empty, - this.allocator, - "Package \"{s}\" with tag \"{s}\" not found, but package exists", - .{ - name, - version.dist_tag, - }, - ) catch unreachable; + retry_from_manifests_ptr: while (true) { + var resolve_result_ = this.getOrPutResolvedPackage(name_hash, name, version, &dependency.resolution); + + retry_with_new_resolve_result: while (true) { + const resolve_result = resolve_result_ catch |err| { + switch (err) { + error.DistTagNotFound => { + if (required) { + this.log.addErrorFmt( + null, + logger.Loc.Empty, + this.allocator, + "Package \"{s}\" with tag \"{s}\" not found, but package exists", + .{ + name, + version.dist_tag, + }, + ) catch unreachable; + } + + return null; + }, + error.NoMatchingVersion => { + if (required) { + this.log.addErrorFmt( + null, + logger.Loc.Empty, + this.allocator, + "No version matching \"{s}\" found for package {s} (but package exists)", + .{ + version.npm.input, + name, + }, + ) catch unreachable; + } + return null; + }, + else => return err, } + }; - return null; - }, - error.NoMatchingVersion => { - if (required) { - this.log.addErrorFmt( - null, - logger.Loc.Empty, - 
this.allocator, - "No version matching \"{s}\" found for package {s} (but package exists)", - .{ - version.npm.input, - name, - }, - ) catch unreachable; + if (resolve_result) |result| { + if (result.package.isDisabled()) return null; + + // First time? + if (result.is_first_time) { + if (verbose_install) { + const label: string = switch (version) { + .npm => version.npm.input, + .dist_tag => version.dist_tag, + else => unreachable, + }; + + Output.prettyErrorln(" -> \"{s}\": \"{s}\" -> {s}@{}", .{ + result.package.name, + label, + result.package.name, + result.package.version.fmt(result.package.string_buf), + }); + } + // Resolve dependencies first + batch.push(this.enqueuePackages(result.package.dependencies, true)); + if (this.default_features.peer_dependencies) batch.push(this.enqueuePackages(result.package.peer_dependencies, true)); + if (this.default_features.optional_dependencies) batch.push(this.enqueuePackages(result.package.optional_dependencies, false)); } - return null; - }, - else => return err, - } - }; - if (resolve_result) |result| { - if (result.package.isDisabled()) return null; + if (result.network_task) |network_task| { + if (result.package.preinstall_state == .extract) { + Output.prettyErrorln(" ↓📦 {s}@{}", .{ + result.package.name, + result.package.version.fmt(result.package.string_buf), + }); + result.package.preinstall_state = .extracting; + network_task.schedule(&this.network_tarball_batch); + } + } - // First time? - if (result.is_first_time) { - if (verbose_install) { - const label: string = switch (version) { - .npm => version.npm.input, - .dist_tag => version.dist_tag, - else => unreachable, - }; + if (batch.len > 0) { + return batch; + } + } else { + const task_id = Task.Id.forManifest(Task.Tag.package_manifest, name); + var network_entry = try this.network_task_queue.getOrPutContext(this.allocator, task_id, .{}); + if (!network_entry.found_existing) { + if (this.enable_manifest_cache) { + if (Npm.PackageManifest.Serializer.load(this.allocator, this.cache_directory, name) catch null) |manifest_| { + const manifest: Npm.PackageManifest = manifest_; + loaded_manifest = manifest; + + if (this.enable_manifest_cache_public and manifest.pkg.public_max_age > @truncate(u32, @intCast(u64, @maximum(std.time.timestamp(), 0)))) { + try this.manifests.put(this.allocator, @truncate(u32, manifest.pkg.name.hash), manifest); + } - Output.prettyErrorln(" -> \"{s}\": \"{s}\" -> {s}@{}", .{ - result.package.name, - label, - result.package.name, - result.package.version.fmt(result.package.string_buf), - }); - } - // Resolve dependencies first - batch.push(this.enqueuePackages(result.package.dependencies, true)); - if (this.default_features.peer_dependencies) batch.push(this.enqueuePackages(result.package.peer_dependencies, true)); - if (this.default_features.optional_dependencies) batch.push(this.enqueuePackages(result.package.optional_dependencies, false)); - } + // If it's an exact package version already living in the cache + // We can skip the network request, even if it's beyond the caching period + if (dependency.version == .npm and dependency.version.npm.isExact()) { + if (loaded_manifest.?.findByVersion(dependency.version.npm.head.head.range.left.version)) |find_result| { + if (this.getOrPutResolvedPackageWithFindResult( + name_hash, + name, + version, + &dependency.resolution, + &loaded_manifest.?, + find_result, + ) catch null) |new_resolve_result| { + resolve_result_ = new_resolve_result; + _ = this.network_task_queue.remove(task_id); + continue 
:retry_with_new_resolve_result; + } + } + } - if (result.network_task) |network_task| { - if (result.package.preinstall_state == .extract) { - Output.prettyErrorln(" ↓📦 {s}@{}", .{ - result.package.name, - result.package.version.fmt(result.package.string_buf), - }); - result.package.preinstall_state = .extracting; - network_task.schedule(&this.network_tarball_batch); - } - } + // Was it recent enough to just load it without the network call? + if (this.enable_manifest_cache_public and manifest.pkg.public_max_age > @truncate(u32, @intCast(u64, @maximum(std.time.timestamp(), 0)))) { + _ = this.network_task_queue.remove(task_id); + continue :retry_from_manifests_ptr; + } + } + } - if (batch.len > 0) { - return batch; - } - } else { - const task_id = Task.Id.forManifest(Task.Tag.package_manifest, name); - var network_entry = try this.network_task_queue.getOrPutContext(this.allocator, task_id, .{}); - if (!network_entry.found_existing) { - if (verbose_install) { - Output.prettyErrorln("Enqueue package manifest for download: {s}", .{name}); - } + if (verbose_install) { + Output.prettyErrorln("Enqueue package manifest for download: {s}", .{name}); + } - var network_task = this.getNetworkTask(); - network_task.* = NetworkTask{ - .callback = undefined, - .task_id = task_id, - .allocator = this.allocator, - }; - try network_task.forManifest(name, this.allocator, this.registry.url); - network_task.schedule(&this.network_resolve_batch); - } + var network_task = this.getNetworkTask(); + network_task.* = NetworkTask{ + .callback = undefined, + .task_id = task_id, + .allocator = this.allocator, + }; + try network_task.forManifest(name, this.allocator, this.registry.url, loaded_manifest); + network_task.schedule(&this.network_resolve_batch); + } - var manifest_entry_parse = try this.task_queue.getOrPutContext(this.allocator, task_id, .{}); - if (!manifest_entry_parse.found_existing) { - manifest_entry_parse.value_ptr.* = TaskCallbackList{}; - } + var manifest_entry_parse = try this.task_queue.getOrPutContext(this.allocator, task_id, .{}); + if (!manifest_entry_parse.found_existing) { + manifest_entry_parse.value_ptr.* = TaskCallbackList{}; + } - try manifest_entry_parse.value_ptr.append(this.allocator, TaskCallbackContext.init(dependency)); - } + try manifest_entry_parse.value_ptr.append(this.allocator, TaskCallbackContext.init(dependency)); + } - return null; + return null; + } + } }, else => {}, } @@ -2672,6 +2909,24 @@ pub const PackageManager = struct { // .progress }; + if (!enable_cache) { + manager.enable_manifest_cache = false; + manager.enable_manifest_cache_public = false; + } + + if (env_loader.map.get("BUN_MANIFEST_CACHE")) |manifest_cache| { + if (strings.eqlComptime(manifest_cache, "1")) { + manager.enable_manifest_cache = true; + manager.enable_manifest_cache_public = false; + } else if (strings.eqlComptime(manifest_cache, "2")) { + manager.enable_manifest_cache = true; + manager.enable_manifest_cache_public = true; + } else { + manager.enable_manifest_cache = false; + manager.enable_manifest_cache_public = false; + } + } + manager.enqueueDependencyList( root, Package.Features{ @@ -2688,7 +2943,8 @@ pub const PackageManager = struct { manager.pending_tasks -= 1; switch (task.callback) { - .package_manifest => |name| { + .package_manifest => |manifest_req| { + const name = manifest_req.name; const response = task.http.response orelse { Output.prettyErrorln("Failed to download package manifest for package {s}", .{name}); Output.flush(); @@ -2714,6 +2970,29 @@ pub const PackageManager = struct 
{ Output.flush(); } + if (response.status_code == 304) { + // The HTTP request was cached + if (manifest_req.loaded_manifest) |manifest| { + var entry = try manager.manifests.getOrPut(ctx.allocator, @truncate(u32, manifest.pkg.name.hash)); + entry.value_ptr.* = manifest; + entry.value_ptr.*.pkg.public_max_age = @truncate(u32, @intCast(u64, @maximum(0, std.time.timestamp()))) + 300; + { + var tmpdir = Fs.FileSystem.instance.tmpdir(); + Npm.PackageManifest.Serializer.save(entry.value_ptr, tmpdir, PackageManager.instance.cache_directory) catch {}; + } + + const dependency_list = manager.task_queue.get(task.task_id).?; + + for (dependency_list.items) |item| { + var dependency: *Dependency = TaskCallbackContext.get(item, Dependency).?; + if (try manager.enqueueDependency(dependency, dependency.required)) |new_batch| { + batch.push(new_batch); + } + } + continue; + } + } + batch.push(ThreadPool.Batch.from(manager.enqueueParseNPMPackage(task.task_id, name, task))); }, .extract => |extract| { diff --git a/src/install/semver.zig b/src/install/semver.zig index d51712e6e..71eb2073c 100644 --- a/src/install/semver.zig +++ b/src/install/semver.zig @@ -693,6 +693,10 @@ pub const Query = struct { pub const FlagsBitSet = std.bit_set.IntegerBitSet(3); + pub fn isExact(this: *const Group) bool { + return this.head.next == null and this.head.head.next == null and !this.head.head.range.hasRight() and this.head.head.range.left.op == .eql; + } + pub fn orVersion(self: *Group, version: Version) !void { if (self.tail == null and !self.head.head.range.hasLeft()) { self.head.head.range.left.version = version; diff --git a/src/thread_pool.zig b/src/thread_pool.zig index 8b171c422..6e9f1ffbc 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -211,11 +211,18 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { .Acquire, .Monotonic, ) orelse { + if (self.io) |io| { + io.tick() catch {}; + } + return is_waking or (sync.state == .signaled); }); // No notification to consume. // Mark this thread as idle before sleeping on the idle_event. + if (self.io) |io| { + io.tick() catch {}; + } } else if (!is_idle) { var new_sync = sync; new_sync.idle += 1; @@ -236,13 +243,13 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { // Wait for a signal by either notify() or shutdown() without wasting cpu cycles. // TODO: Add I/O polling here. if (self.io) |io| { - io.tick() catch unreachable; + io.tick() catch {}; } } else { if (self.io) |io| { const HTTP = @import("./http/http_client_async.zig"); - io.run_for_ns(std.time.ns_per_us * 100) catch {}; - while (HTTP.AsyncHTTP.active_requests_count.load(.Monotonic) > 100) { + io.run_for_ns(std.time.ns_per_us * 10) catch {}; + while (HTTP.AsyncHTTP.active_requests_count.load(.Monotonic) > 255) { io.tick() catch {}; } sync = @bitCast(Sync, self.sync.load(.Monotonic)); |
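
On the HTTP client side, the new force_last_modified / pretend_its_304 path works around servers (npm's registry among them) that report Last-Modified but ignore If-Modified-Since. A minimal sketch of the check added to processResponse, using assumed names rather than the client's actual fields:

    const std = @import("std");

    // A 2xx response whose Last-Modified equals the If-Modified-Since value we
    // sent is treated as if the server had answered 304 Not Modified.
    fn shouldPretend304(
        force_last_modified: bool,
        status_code: u16,
        if_modified_since: []const u8,
        last_modified: []const u8,
    ) bool {
        if (!force_last_modified) return false;
        if (status_code <= 199 or status_code >= 300) return false;
        if (if_modified_since.len == 0) return false;
        return std.mem.eql(u8, if_modified_since, last_modified);
    }

    pub fn main() void {
        const stamp = "Wed, 01 Sep 2021 00:00:00 GMT";
        std.debug.assert(shouldPretend304(true, 200, stamp, stamp));
        std.debug.assert(!shouldPretend304(true, 200, stamp, "Thu, 02 Sep 2021 00:00:00 GMT"));
        std.debug.assert(!shouldPretend304(false, 200, stamp, stamp));
        std.debug.assert(!shouldPretend304(true, 404, stamp, stamp));
    }

When the check fires, processResponse rewrites status_code to 304 and skips reading the body, so the package manager falls back to the manifest it loaded from the cache directory, just as it does for a genuine 304.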