| author | 2023-04-03 08:32:19 +0300 |
| --- | --- |
| committer | 2023-04-02 22:32:19 -0700 |
| commit | 9b0f12883c2520eecdfb26caec5f48845ccae8af (patch) |
| tree | 0b6d43c7455b2d362e2aed493d83ebc8797b3235 |
| parent | fcd8b828644cc3cf2bd46bbfc0f6b90789d5dba2 (diff) |
[install] reduce parallel HTTP requests under heavy load (#2536)
* [install] reduce parallel HTTP requests under heavy load
* make `max_simultaneous_requests` atomic
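
The core of the change: `AsyncHTTP.max_simultaneous_requests` becomes an atomic value that the installer halves (down to a small floor) the first time a network request fails during a scheduling tick, so retries go out over fewer parallel connections. Below is a minimal sketch of that pattern, written against the same era-specific `std.atomic.Atomic` / `.Monotonic` API the diff uses (newer Zig spells these `std.atomic.Value` and `.monotonic`); the helper names `reduceConcurrencyOnNetworkError` and `canScheduleMore` are invented here for illustration, not Bun's actual functions.

```zig
const std = @import("std");

// Shared limits, mirroring the commit's `AsyncHTTP.max_simultaneous_requests`
// and `active_requests_count` (atomics so any thread can read or shrink them
// without taking a lock).
var max_simultaneous_requests = std.atomic.Atomic(usize).init(256);
var active_requests_count = std.atomic.Atomic(usize).init(0);

// Floor for the cap; the commit adds `Options.min_simultaneous_requests = 4`.
const min_simultaneous_requests: usize = 4;

/// On the first network error seen in a tick, halve the cap, never below the floor.
fn reduceConcurrencyOnNetworkError() void {
    const max = max_simultaneous_requests.load(.Monotonic);
    if (max > min_simultaneous_requests) {
        max_simultaneous_requests.store(@max(min_simultaneous_requests, max / 2), .Monotonic);
    }
}

/// The HTTP thread only drains more queued work while the active count is below the cap.
fn canScheduleMore() bool {
    return active_requests_count.load(.Monotonic) < max_simultaneous_requests.load(.Monotonic);
}

pub fn main() void {
    std.debug.print("cap before failure: {d}\n", .{max_simultaneous_requests.load(.Monotonic)});
    reduceConcurrencyOnNetworkError(); // e.g. a manifest download timed out
    std.debug.print("cap after failure:  {d}\n", .{max_simultaneous_requests.load(.Monotonic)});
    std.debug.print("can schedule more:  {}\n", .{canScheduleMore()});
}
```

`BUN_CONFIG_MAX_HTTP_REQUESTS` still overrides the starting cap, now through the shared `AsyncHTTP.loadEnv` helper introduced in this diff instead of duplicated parsing in `bun_js.zig` and `install.zig`.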
| mode | file | lines changed |
| --- | --- | --- |
| -rw-r--r-- | misctools/http_bench.zig | 2 |
| -rw-r--r-- | src/analytics/analytics_thread.zig | 3 |
| -rw-r--r-- | src/bun_js.zig | 29 |
| -rw-r--r-- | src/http_client_async.zig | 54 |
| -rw-r--r-- | src/install/install.zig | 132 |

5 files changed, 120 insertions, 100 deletions
diff --git a/misctools/http_bench.zig b/misctools/http_bench.zig
index 5e12f0157..efb1069fe 100644
--- a/misctools/http_bench.zig
+++ b/misctools/http_bench.zig
@@ -198,7 +198,7 @@ pub fn main() anyerror!void {
     try channel.buffer.ensureTotalCapacity(args.count);
 
     try NetworkThread.init();
-    if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests = args.concurrency;
+    if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests.store(args.concurrency, .Monotonic);
     const Group = struct {
         response_body: MutableString = undefined,
         context: HTTP.HTTPChannelContext = undefined,
diff --git a/src/analytics/analytics_thread.zig b/src/analytics/analytics_thread.zig
index 06b882901..04c3c9aff 100644
--- a/src/analytics/analytics_thread.zig
+++ b/src/analytics/analytics_thread.zig
@@ -510,9 +510,6 @@ pub const EventList = struct {
         var retry_remaining: usize = 10;
         const rand = random.random();
         retry: while (retry_remaining > 0) {
-            this.async_http.max_retry_count = 0;
-            this.async_http.retries_count = 0;
-
             const response = this.async_http.sendSync(true) catch |err| {
                 if (FeatureFlags.verbose_analytics) {
                     Output.prettyErrorln("[Analytics] failed due to error {s} ({d} retries remain)", .{ @errorName(err), retry_remaining });
diff --git a/src/bun_js.zig b/src/bun_js.zig
index 51764231e..2797096bc 100644
--- a/src/bun_js.zig
+++ b/src/bun_js.zig
@@ -104,33 +104,8 @@ pub const Run = struct {
             Output.prettyErrorln("\n", .{});
             Global.exit(1);
         };
-        AsyncHTTP.max_simultaneous_requests = 255;
-
-        if (b.env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
-            load: {
-                AsyncHTTP.max_simultaneous_requests = std.fmt.parseInt(u16, max_http_requests, 10) catch {
-                    vm.log.addErrorFmt(
-                        null,
-                        logger.Loc.Empty,
-                        vm.allocator,
-                        "BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
-                        .{max_http_requests},
-                    ) catch unreachable;
-                    break :load;
-                };
-
-                if (AsyncHTTP.max_simultaneous_requests == 0) {
-                    vm.log.addWarningFmt(
-                        null,
-                        logger.Loc.Empty,
-                        vm.allocator,
-                        "BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
-                        .{},
-                    ) catch unreachable;
-                    AsyncHTTP.max_simultaneous_requests = 255;
-                }
-            }
-        }
+
+        AsyncHTTP.loadEnv(vm.allocator, vm.log, b.env);
 
         vm.loadExtraEnv();
         vm.is_main_thread = true;
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 0033d8ad0..6e8542479 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -1,5 +1,5 @@
-const picohttp = @import("bun").picohttp;
 const bun = @import("bun");
+const picohttp = bun.picohttp;
 const JSC = bun.JSC;
 const string = bun.string;
 const Output = bun.Output;
@@ -10,6 +10,9 @@ const MutableString = bun.MutableString;
 const FeatureFlags = bun.FeatureFlags;
 const stringZ = bun.stringZ;
 const C = bun.C;
+const Loc = bun.logger.Loc;
+const Log = bun.logger.Log;
+const DotEnv = @import("./env_loader.zig");
 const std = @import("std");
 const URL = @import("./url.zig").URL;
 pub const Method = @import("./http/method.zig").Method;
@@ -18,16 +21,16 @@ const Lock = @import("./lock.zig").Lock;
 const HTTPClient = @This();
 const Zlib = @import("./zlib.zig");
 const StringBuilder = @import("./string_builder.zig");
-const AsyncIO = @import("bun").AsyncIO;
-const ThreadPool = @import("bun").ThreadPool;
-const BoringSSL = @import("bun").BoringSSL;
+const AsyncIO = bun.AsyncIO;
+const ThreadPool = bun.ThreadPool;
+const BoringSSL = bun.BoringSSL;
 pub const NetworkThread = @import("./network_thread.zig");
 const ObjectPool = @import("./pool.zig").ObjectPool;
 const SOCK = os.SOCK;
 const Arena = @import("./mimalloc_arena.zig").Arena;
 const ZlibPool = @import("./http/zlib.zig");
 const URLBufferPool = ObjectPool([4096]u8, null, false, 10);
-const uws = @import("bun").uws;
+const uws = bun.uws;
 pub const MimeType = @import("./http/mime_type.zig");
 pub const URLPath = @import("./http/url_path.zig");
 // This becomes Arena.allocator
@@ -570,8 +573,9 @@ pub const HTTPThread = struct {
         }
 
         var count: usize = 0;
-        var remaining: usize = AsyncHTTP.max_simultaneous_requests - AsyncHTTP.active_requests_count.loadUnchecked();
-        if (remaining == 0) return;
+        var active = AsyncHTTP.active_requests_count.load(.Monotonic);
+        const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
+        if (active >= max) return;
         defer {
             if (comptime Environment.allow_assert) {
                 if (count > 0)
@@ -588,8 +592,8 @@ pub const HTTPThread = struct {
                 count += 1;
             }
 
-            remaining -= 1;
-            if (remaining == 0) break;
+            active += 1;
+            if (active >= max) break;
         }
     }
 
@@ -1171,7 +1175,6 @@ pub const AsyncHTTP = struct {
     allocator: std.mem.Allocator,
     request_header_buf: string = "",
    method: Method = Method.GET,
-    max_retry_count: u32 = 0,
     url: URL,
     http_proxy: ?URL = null,
     real: ?*AsyncHTTP = null,
@@ -1185,7 +1188,6 @@ pub const AsyncHTTP = struct {
     redirected: bool = false,
 
     response_encoding: Encoding = Encoding.identity,
-    retries_count: u32 = 0,
     verbose: bool = false,
 
     client: HTTPClient = undefined,
@@ -1197,7 +1199,33 @@ pub const AsyncHTTP = struct {
     gzip_elapsed: u64 = 0,
 
     pub var active_requests_count = std.atomic.Atomic(usize).init(0);
-    pub var max_simultaneous_requests: usize = 256;
+    pub var max_simultaneous_requests = std.atomic.Atomic(usize).init(256);
+
+    pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
+        if (env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
+            const max = std.fmt.parseInt(u16, max_http_requests, 10) catch {
+                logger.addErrorFmt(
+                    null,
+                    Loc.Empty,
+                    allocator,
+                    "BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
+                    .{max_http_requests},
+                ) catch unreachable;
+                return;
+            };
+            if (max == 0) {
+                logger.addWarningFmt(
+                    null,
+                    Loc.Empty,
+                    allocator,
+                    "BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
+                    .{},
+                ) catch unreachable;
+                return;
+            }
+            AsyncHTTP.max_simultaneous_requests.store(max, .Monotonic);
+        }
+    }
 
     pub fn deinit(this: *AsyncHTTP) void {
         this.response_headers.deinit(this.allocator);
@@ -1357,7 +1385,7 @@ pub const AsyncHTTP = struct {
         completion.function(completion.ctx, result);
 
-        if (active_requests == AsyncHTTP.max_simultaneous_requests) {
+        if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) {
            http_thread.drainEvents();
         }
     }
 
diff --git a/src/install/install.zig b/src/install/install.zig
index 1c1ea124d..e52ca32d2 100644
--- a/src/install/install.zig
+++ b/src/install/install.zig
@@ -11,7 +11,7 @@ const C = bun.C;
 const std = @import("std");
 
 const JSLexer = bun.js_lexer;
-const logger = @import("bun").logger;
+const logger = bun.logger;
 
 const js_parser = bun.js_parser;
 const json_parser = bun.JSON;
@@ -30,23 +30,23 @@ const NodeModuleBundle = @import("../node_module_bundle.zig").NodeModuleBundle;
 const DotEnv = @import("../env_loader.zig");
 const which = @import("../which.zig").which;
 const Run = @import("../bun_js.zig").Run;
-const HeaderBuilder = @import("bun").HTTP.HeaderBuilder;
 const Fs = @import("../fs.zig");
 const FileSystem = Fs.FileSystem;
 const Lock = @import("../lock.zig").Lock;
 const URL = @import("../url.zig").URL;
-const AsyncHTTP = @import("bun").HTTP.AsyncHTTP;
-const HTTPChannel = @import("bun").HTTP.HTTPChannel;
-const NetworkThread = @import("bun").HTTP.NetworkThread;
-const HTTP = @import("bun").HTTP;
+const HTTP = bun.HTTP;
+const AsyncHTTP = HTTP.AsyncHTTP;
+const HTTPChannel = HTTP.HTTPChannel;
+const NetworkThread = HTTP.NetworkThread;
+const HeaderBuilder = HTTP.HeaderBuilder;
 const Integrity = @import("./integrity.zig").Integrity;
-const clap = @import("bun").clap;
+const clap = bun.clap;
 const ExtractTarball = @import("./extract_tarball.zig");
 const Npm = @import("./npm.zig");
 const Bitset = @import("./bit_set.zig").DynamicBitSetUnmanaged;
 const z_allocator = @import("../memory_allocator.zig").z_allocator;
-const Syscall = @import("bun").JSC.Node.Syscall;
+const Syscall = bun.JSC.Node.Syscall;
 const RunCommand = @import("../cli/run_command.zig").RunCommand;
 threadlocal var initialized_store = false;
 const Futex = @import("../futex.zig");
 
@@ -160,6 +160,7 @@ const NetworkTask = struct {
     http: AsyncHTTP = undefined,
     task_id: u64,
     url_buf: []const u8 = &[_]u8{},
+    retried: u16 = 0,
     allocator: std.mem.Allocator,
     request_buffer: MutableString = undefined,
     response_buffer: MutableString = undefined,
@@ -339,7 +340,6 @@ const NetworkTask = struct {
             this.package_manager.httpProxy(url),
             null,
         );
-        this.http.max_retry_count = this.package_manager.options.max_retry_count;
         this.callback = .{
             .package_manifest = .{
                 .name = try strings.StringOrTinyString.initAppendIfNeeded(name, *FileSystem.FilenameStore, &FileSystem.FilenameStore.instance),
@@ -416,7 +416,6 @@ const NetworkTask = struct {
             this.package_manager.httpProxy(url),
             null,
         );
-        this.http.max_retry_count = this.package_manager.options.max_retry_count;
         this.callback = .{ .extract = tarball };
     }
 };
@@ -1559,7 +1558,7 @@ const TaskCallbackList = std.ArrayListUnmanaged(TaskCallbackContext);
 const TaskDependencyQueue = std.HashMapUnmanaged(u64, TaskCallbackList, IdentityContext(u64), 80);
 const TaskChannel = sync.Channel(Task, .{ .Static = 4096 });
 const NetworkChannel = sync.Channel(*NetworkTask, .{ .Static = 8192 });
-const ThreadPool = @import("bun").ThreadPool;
+const ThreadPool = bun.ThreadPool;
 const PackageManifestMap = std.HashMapUnmanaged(PackageNameHash, Npm.PackageManifest, IdentityContext(PackageNameHash), 80);
 const RepositoryMap = std.HashMapUnmanaged(u64, std.os.fd_t, IdentityContext(u64), 80);
 
@@ -1568,7 +1567,7 @@ pub const CacheLevel = struct {
     use_etag: bool,
     use_last_modified: bool,
 };
-const AsyncIO = @import("bun").AsyncIO;
+const AsyncIO = bun.AsyncIO;
 const Waker = AsyncIO.Waker;
 
 // We can't know all the packages we need until we've downloaded all the packages
@@ -3577,6 +3576,7 @@ pub const PackageManager = struct {
         comptime log_level: Options.LogLevel,
     ) anyerror!void {
         var has_updated_this_run = false;
+        var has_network_error = false;
 
         var timestamp_this_tick: ?u32 = null;
 
@@ -3598,7 +3598,28 @@ pub const PackageManager = struct {
                     const response = task.http.response orelse {
                         const err = task.http.err orelse error.HTTPError;
 
-                        if (@TypeOf(callbacks.onPackageManifestError) != void) {
+                        if (task.retried < manager.options.max_retry_count) {
+                            task.retried += 1;
+                            if (!has_network_error) {
+                                has_network_error = true;
+                                const min = manager.options.min_simultaneous_requests;
+                                const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
+                                if (max > min) {
+                                    AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .Monotonic);
+                                }
+                            }
+                            manager.enqueueNetworkTask(task);
+
+                            if (manager.options.log_level.isVerbose()) {
+                                manager.log.addWarningFmt(
+                                    null,
+                                    logger.Loc.Empty,
+                                    manager.allocator,
+                                    "<r><yellow>warn:<r> {s} downloading package manifest <b>{s}<r>",
+                                    .{ bun.span(@errorName(err)), name.slice() },
+                                ) catch unreachable;
+                            }
+                        } else if (@TypeOf(callbacks.onPackageManifestError) != void) {
                             callbacks.onPackageManifestError(
                                 extract_ctx,
                                 name.slice(),
@@ -3607,8 +3628,7 @@ pub const PackageManager = struct {
                             );
                         } else if (comptime log_level != .silent) {
                             const fmt = "\n<r><red>error<r>: {s} downloading package manifest <b>{s}<r>\n";
-                            const error_name: string = bun.span(@errorName(err));
-                            const args = .{ error_name, name.slice() };
+                            const args = .{ bun.span(@errorName(err)), name.slice() };
                             if (comptime log_level.showProgress()) {
                                 Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
                             } else {
@@ -3759,9 +3779,34 @@ pub const PackageManager = struct {
                 .extract => |extract| {
                     const response = task.http.response orelse {
                         const err = task.http.err orelse error.TarballFailedToDownload;
 
-                        const package_id = manager.lockfile.buffers.resolutions.items[extract.dependency_id];
-                        if (@TypeOf(callbacks.onPackageDownloadError) != void) {
+                        if (task.retried < manager.options.max_retry_count) {
+                            task.retried += 1;
+                            if (!has_network_error) {
+                                has_network_error = true;
+                                const min = manager.options.min_simultaneous_requests;
+                                const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
+                                if (max > min) {
+                                    AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .Monotonic);
+                                }
+                            }
+                            manager.enqueueNetworkTask(task);
+
+                            if (manager.options.log_level.isVerbose()) {
+                                manager.log.addWarningFmt(
+                                    null,
+                                    logger.Loc.Empty,
+                                    manager.allocator,
+                                    "<r><yellow>warn:<r> {s} downloading tarball <b>{s}@{s}<r>",
+                                    .{
+                                        bun.span(@errorName(err)),
+                                        extract.name.slice(),
+                                        extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items),
+                                    },
+                                ) catch unreachable;
+                            }
+                        } else if (@TypeOf(callbacks.onPackageDownloadError) != void) {
+                            const package_id = manager.lockfile.buffers.resolutions.items[extract.dependency_id];
                             callbacks.onPackageDownloadError(
                                 extract_ctx,
                                 package_id,
@@ -3770,18 +3815,18 @@ pub const PackageManager = struct {
                                 err,
                                 task.url_buf,
                             );
-                        } else {
+                        } else if (comptime log_level != .silent) {
                             const fmt = "\n<r><red>error<r>: {s} downloading tarball <b>{s}@{s}<r>\n";
-                            const error_name: string = bun.span(@errorName(err));
-                            const args = .{ error_name, extract.name.slice(), extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items) };
-
-                            if (comptime log_level != .silent) {
-                                if (comptime log_level.showProgress()) {
-                                    Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
-                                } else {
-                                    Output.prettyErrorln(fmt, args);
-                                    Output.flush();
-                                }
+                            const args = .{
+                                bun.span(@errorName(err)),
+                                extract.name.slice(),
+                                extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items),
+                            };
+                            if (comptime log_level.showProgress()) {
+                                Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
+                            } else {
+                                Output.prettyErrorln(fmt, args);
+                                Output.flush();
                             }
                         }
 
@@ -4194,6 +4239,7 @@ pub const PackageManager = struct {
         // 2. Has a platform and/or os specified, which evaluates to not disabled
         native_bin_link_allowlist: []const PackageNameHash = &default_native_bin_link_allowlist,
         max_retry_count: u16 = 5,
+        min_simultaneous_requests: usize = 4,
 
         pub fn shouldPrintCommandName(this: *const Options) bool {
             return this.log_level != .silent and this.do.summary;
@@ -4497,9 +4543,7 @@ pub const PackageManager = struct {
             }
 
             if (env.map.get("BUN_CONFIG_HTTP_RETRY_COUNT")) |retry_count| {
-                if (std.fmt.parseInt(i32, retry_count, 10)) |int| {
-                    this.max_retry_count = @intCast(u16, @min(@max(int, 0), 65355));
-                } else |_| {}
+                if (std.fmt.parseInt(u16, retry_count, 10)) |int| this.max_retry_count = int else |_| {}
             }
 
             if (env.map.get("BUN_CONFIG_LINK_NATIVE_BINS")) |native_packages| {
@@ -4522,31 +4566,7 @@ pub const PackageManager = struct {
             // this.enable.deduplicate_packages = false;
             // }
 
-            if (env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
-                load: {
-                    AsyncHTTP.max_simultaneous_requests = std.fmt.parseInt(u16, max_http_requests, 10) catch {
-                        log.addErrorFmt(
-                            null,
-                            logger.Loc.Empty,
-                            allocator,
-                            "BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
-                            .{max_http_requests},
-                        ) catch unreachable;
-                        break :load;
-                    };
-
-                    if (AsyncHTTP.max_simultaneous_requests == 0) {
-                        log.addWarningFmt(
-                            null,
-                            logger.Loc.Empty,
-                            allocator,
-                            "BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
-                            .{},
-                        ) catch unreachable;
-                        AsyncHTTP.max_simultaneous_requests = 255;
-                    }
-                }
-            }
+            AsyncHTTP.loadEnv(allocator, log, env);
 
             if (env.map.get("BUN_CONFIG_SKIP_SAVE_LOCKFILE")) |check_bool| {
                 this.do.save_lockfile = strings.eqlComptime(check_bool, "0");
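
For the retry half of the change, each `NetworkTask` now carries a `retried` counter, and failed manifest or tarball downloads are re-enqueued until `Options.max_retry_count` (default 5) is exhausted instead of surfacing an error on the first failure. Below is a rough, self-contained sketch of that bookkeeping under a simplified task queue; `retryOrGiveUp` and the toy `NetworkTask` struct here are illustrative stand-ins, not Bun's actual types, and the code targets the 2023-era Zig standard library used by the diff.

```zig
const std = @import("std");

// Toy stand-ins for install.zig's types: the real `NetworkTask` gains a
// `retried: u16` field and `Options.max_retry_count` defaults to 5.
const NetworkTask = struct {
    url: []const u8,
    retried: u16 = 0,
};

const max_retry_count: u16 = 5;

/// Returns true if the task was put back on the queue for another attempt
/// (standing in for the commit's `manager.enqueueNetworkTask(task)`); false
/// means the caller should report the error through the usual callbacks.
fn retryOrGiveUp(task: *NetworkTask, queue: *std.ArrayList(*NetworkTask), err: anyerror) !bool {
    if (task.retried < max_retry_count) {
        task.retried += 1;
        try queue.append(task);
        std.debug.print("warn: {s} downloading {s} (attempt {d}/{d})\n", .{
            @errorName(err), task.url, task.retried, max_retry_count,
        });
        return true;
    }
    return false;
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    var queue = std.ArrayList(*NetworkTask).init(gpa.allocator());
    defer queue.deinit();

    var task = NetworkTask{ .url = "https://registry.npmjs.org/is-even" };
    while (try retryOrGiveUp(&task, &queue, error.TarballFailedToDownload)) {}
    std.debug.print("giving up after {d} attempts\n", .{task.retried});
}
```

Re-enqueueing through the same channel keeps retries on the normal scheduling path, so the freshly halved `max_simultaneous_requests` cap applies to the retried requests as well.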