aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Alex Lam S.L <alexlamsl@gmail.com> 2023-04-03 08:32:19 +0300
committerGravatar GitHub <noreply@github.com> 2023-04-02 22:32:19 -0700
commit9b0f12883c2520eecdfb26caec5f48845ccae8af (patch)
tree0b6d43c7455b2d362e2aed493d83ebc8797b3235
parentfcd8b828644cc3cf2bd46bbfc0f6b90789d5dba2 (diff)
downloadbun-9b0f12883c2520eecdfb26caec5f48845ccae8af.tar.gz
bun-9b0f12883c2520eecdfb26caec5f48845ccae8af.tar.zst
bun-9b0f12883c2520eecdfb26caec5f48845ccae8af.zip
[install] reduce parallel HTTP requests under heavy load (#2536)
* [install] reduce parallel HTTP requests under heavy load * make `max_simultaneous_requests` atomic
-rw-r--r--misctools/http_bench.zig2
-rw-r--r--src/analytics/analytics_thread.zig3
-rw-r--r--src/bun_js.zig29
-rw-r--r--src/http_client_async.zig54
-rw-r--r--src/install/install.zig132
5 files changed, 120 insertions, 100 deletions
diff --git a/misctools/http_bench.zig b/misctools/http_bench.zig
index 5e12f0157..efb1069fe 100644
--- a/misctools/http_bench.zig
+++ b/misctools/http_bench.zig
@@ -198,7 +198,7 @@ pub fn main() anyerror!void {
try channel.buffer.ensureTotalCapacity(args.count);
try NetworkThread.init();
- if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests = args.concurrency;
+ if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests.store(args.concurrency, .Monotonic);
const Group = struct {
response_body: MutableString = undefined,
context: HTTP.HTTPChannelContext = undefined,
diff --git a/src/analytics/analytics_thread.zig b/src/analytics/analytics_thread.zig
index 06b882901..04c3c9aff 100644
--- a/src/analytics/analytics_thread.zig
+++ b/src/analytics/analytics_thread.zig
@@ -510,9 +510,6 @@ pub const EventList = struct {
var retry_remaining: usize = 10;
const rand = random.random();
retry: while (retry_remaining > 0) {
- this.async_http.max_retry_count = 0;
- this.async_http.retries_count = 0;
-
const response = this.async_http.sendSync(true) catch |err| {
if (FeatureFlags.verbose_analytics) {
Output.prettyErrorln("[Analytics] failed due to error {s} ({d} retries remain)", .{ @errorName(err), retry_remaining });
diff --git a/src/bun_js.zig b/src/bun_js.zig
index 51764231e..2797096bc 100644
--- a/src/bun_js.zig
+++ b/src/bun_js.zig
@@ -104,33 +104,8 @@ pub const Run = struct {
Output.prettyErrorln("\n", .{});
Global.exit(1);
};
- AsyncHTTP.max_simultaneous_requests = 255;
-
- if (b.env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
- load: {
- AsyncHTTP.max_simultaneous_requests = std.fmt.parseInt(u16, max_http_requests, 10) catch {
- vm.log.addErrorFmt(
- null,
- logger.Loc.Empty,
- vm.allocator,
- "BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
- .{max_http_requests},
- ) catch unreachable;
- break :load;
- };
-
- if (AsyncHTTP.max_simultaneous_requests == 0) {
- vm.log.addWarningFmt(
- null,
- logger.Loc.Empty,
- vm.allocator,
- "BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
- .{},
- ) catch unreachable;
- AsyncHTTP.max_simultaneous_requests = 255;
- }
- }
- }
+
+ AsyncHTTP.loadEnv(vm.allocator, vm.log, b.env);
vm.loadExtraEnv();
vm.is_main_thread = true;
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 0033d8ad0..6e8542479 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -1,5 +1,5 @@
-const picohttp = @import("bun").picohttp;
const bun = @import("bun");
+const picohttp = bun.picohttp;
const JSC = bun.JSC;
const string = bun.string;
const Output = bun.Output;
@@ -10,6 +10,9 @@ const MutableString = bun.MutableString;
const FeatureFlags = bun.FeatureFlags;
const stringZ = bun.stringZ;
const C = bun.C;
+const Loc = bun.logger.Loc;
+const Log = bun.logger.Log;
+const DotEnv = @import("./env_loader.zig");
const std = @import("std");
const URL = @import("./url.zig").URL;
pub const Method = @import("./http/method.zig").Method;
@@ -18,16 +21,16 @@ const Lock = @import("./lock.zig").Lock;
const HTTPClient = @This();
const Zlib = @import("./zlib.zig");
const StringBuilder = @import("./string_builder.zig");
-const AsyncIO = @import("bun").AsyncIO;
-const ThreadPool = @import("bun").ThreadPool;
-const BoringSSL = @import("bun").BoringSSL;
+const AsyncIO = bun.AsyncIO;
+const ThreadPool = bun.ThreadPool;
+const BoringSSL = bun.BoringSSL;
pub const NetworkThread = @import("./network_thread.zig");
const ObjectPool = @import("./pool.zig").ObjectPool;
const SOCK = os.SOCK;
const Arena = @import("./mimalloc_arena.zig").Arena;
const ZlibPool = @import("./http/zlib.zig");
const URLBufferPool = ObjectPool([4096]u8, null, false, 10);
-const uws = @import("bun").uws;
+const uws = bun.uws;
pub const MimeType = @import("./http/mime_type.zig");
pub const URLPath = @import("./http/url_path.zig");
// This becomes Arena.allocator
@@ -570,8 +573,9 @@ pub const HTTPThread = struct {
}
var count: usize = 0;
- var remaining: usize = AsyncHTTP.max_simultaneous_requests - AsyncHTTP.active_requests_count.loadUnchecked();
- if (remaining == 0) return;
+ var active = AsyncHTTP.active_requests_count.load(.Monotonic);
+ const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
+ if (active >= max) return;
defer {
if (comptime Environment.allow_assert) {
if (count > 0)
@@ -588,8 +592,8 @@ pub const HTTPThread = struct {
count += 1;
}
- remaining -= 1;
- if (remaining == 0) break;
+ active += 1;
+ if (active >= max) break;
}
}
@@ -1171,7 +1175,6 @@ pub const AsyncHTTP = struct {
allocator: std.mem.Allocator,
request_header_buf: string = "",
method: Method = Method.GET,
- max_retry_count: u32 = 0,
url: URL,
http_proxy: ?URL = null,
real: ?*AsyncHTTP = null,
@@ -1185,7 +1188,6 @@ pub const AsyncHTTP = struct {
redirected: bool = false,
response_encoding: Encoding = Encoding.identity,
- retries_count: u32 = 0,
verbose: bool = false,
client: HTTPClient = undefined,
@@ -1197,7 +1199,33 @@ pub const AsyncHTTP = struct {
gzip_elapsed: u64 = 0,
pub var active_requests_count = std.atomic.Atomic(usize).init(0);
- pub var max_simultaneous_requests: usize = 256;
+ pub var max_simultaneous_requests = std.atomic.Atomic(usize).init(256);
+
+ pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
+ if (env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
+ const max = std.fmt.parseInt(u16, max_http_requests, 10) catch {
+ logger.addErrorFmt(
+ null,
+ Loc.Empty,
+ allocator,
+ "BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
+ .{max_http_requests},
+ ) catch unreachable;
+ return;
+ };
+ if (max == 0) {
+ logger.addWarningFmt(
+ null,
+ Loc.Empty,
+ allocator,
+ "BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
+ .{},
+ ) catch unreachable;
+ return;
+ }
+ AsyncHTTP.max_simultaneous_requests.store(max, .Monotonic);
+ }
+ }
pub fn deinit(this: *AsyncHTTP) void {
this.response_headers.deinit(this.allocator);
@@ -1357,7 +1385,7 @@ pub const AsyncHTTP = struct {
completion.function(completion.ctx, result);
- if (active_requests == AsyncHTTP.max_simultaneous_requests) {
+ if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) {
http_thread.drainEvents();
}
}
diff --git a/src/install/install.zig b/src/install/install.zig
index 1c1ea124d..e52ca32d2 100644
--- a/src/install/install.zig
+++ b/src/install/install.zig
@@ -11,7 +11,7 @@ const C = bun.C;
const std = @import("std");
const JSLexer = bun.js_lexer;
-const logger = @import("bun").logger;
+const logger = bun.logger;
const js_parser = bun.js_parser;
const json_parser = bun.JSON;
@@ -30,23 +30,23 @@ const NodeModuleBundle = @import("../node_module_bundle.zig").NodeModuleBundle;
const DotEnv = @import("../env_loader.zig");
const which = @import("../which.zig").which;
const Run = @import("../bun_js.zig").Run;
-const HeaderBuilder = @import("bun").HTTP.HeaderBuilder;
const Fs = @import("../fs.zig");
const FileSystem = Fs.FileSystem;
const Lock = @import("../lock.zig").Lock;
const URL = @import("../url.zig").URL;
-const AsyncHTTP = @import("bun").HTTP.AsyncHTTP;
-const HTTPChannel = @import("bun").HTTP.HTTPChannel;
-const NetworkThread = @import("bun").HTTP.NetworkThread;
-const HTTP = @import("bun").HTTP;
+const HTTP = bun.HTTP;
+const AsyncHTTP = HTTP.AsyncHTTP;
+const HTTPChannel = HTTP.HTTPChannel;
+const NetworkThread = HTTP.NetworkThread;
+const HeaderBuilder = HTTP.HeaderBuilder;
const Integrity = @import("./integrity.zig").Integrity;
-const clap = @import("bun").clap;
+const clap = bun.clap;
const ExtractTarball = @import("./extract_tarball.zig");
const Npm = @import("./npm.zig");
const Bitset = @import("./bit_set.zig").DynamicBitSetUnmanaged;
const z_allocator = @import("../memory_allocator.zig").z_allocator;
-const Syscall = @import("bun").JSC.Node.Syscall;
+const Syscall = bun.JSC.Node.Syscall;
const RunCommand = @import("../cli/run_command.zig").RunCommand;
threadlocal var initialized_store = false;
const Futex = @import("../futex.zig");
@@ -160,6 +160,7 @@ const NetworkTask = struct {
http: AsyncHTTP = undefined,
task_id: u64,
url_buf: []const u8 = &[_]u8{},
+ retried: u16 = 0,
allocator: std.mem.Allocator,
request_buffer: MutableString = undefined,
response_buffer: MutableString = undefined,
@@ -339,7 +340,6 @@ const NetworkTask = struct {
this.package_manager.httpProxy(url),
null,
);
- this.http.max_retry_count = this.package_manager.options.max_retry_count;
this.callback = .{
.package_manifest = .{
.name = try strings.StringOrTinyString.initAppendIfNeeded(name, *FileSystem.FilenameStore, &FileSystem.FilenameStore.instance),
@@ -416,7 +416,6 @@ const NetworkTask = struct {
this.package_manager.httpProxy(url),
null,
);
- this.http.max_retry_count = this.package_manager.options.max_retry_count;
this.callback = .{ .extract = tarball };
}
};
@@ -1559,7 +1558,7 @@ const TaskCallbackList = std.ArrayListUnmanaged(TaskCallbackContext);
const TaskDependencyQueue = std.HashMapUnmanaged(u64, TaskCallbackList, IdentityContext(u64), 80);
const TaskChannel = sync.Channel(Task, .{ .Static = 4096 });
const NetworkChannel = sync.Channel(*NetworkTask, .{ .Static = 8192 });
-const ThreadPool = @import("bun").ThreadPool;
+const ThreadPool = bun.ThreadPool;
const PackageManifestMap = std.HashMapUnmanaged(PackageNameHash, Npm.PackageManifest, IdentityContext(PackageNameHash), 80);
const RepositoryMap = std.HashMapUnmanaged(u64, std.os.fd_t, IdentityContext(u64), 80);
@@ -1568,7 +1567,7 @@ pub const CacheLevel = struct {
use_etag: bool,
use_last_modified: bool,
};
-const AsyncIO = @import("bun").AsyncIO;
+const AsyncIO = bun.AsyncIO;
const Waker = AsyncIO.Waker;
// We can't know all the packages we need until we've downloaded all the packages
@@ -3577,6 +3576,7 @@ pub const PackageManager = struct {
comptime log_level: Options.LogLevel,
) anyerror!void {
var has_updated_this_run = false;
+ var has_network_error = false;
var timestamp_this_tick: ?u32 = null;
@@ -3598,7 +3598,28 @@ pub const PackageManager = struct {
const response = task.http.response orelse {
const err = task.http.err orelse error.HTTPError;
- if (@TypeOf(callbacks.onPackageManifestError) != void) {
+ if (task.retried < manager.options.max_retry_count) {
+ task.retried += 1;
+ if (!has_network_error) {
+ has_network_error = true;
+ const min = manager.options.min_simultaneous_requests;
+ const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
+ if (max > min) {
+ AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .Monotonic);
+ }
+ }
+ manager.enqueueNetworkTask(task);
+
+ if (manager.options.log_level.isVerbose()) {
+ manager.log.addWarningFmt(
+ null,
+ logger.Loc.Empty,
+ manager.allocator,
+ "<r><yellow>warn:<r> {s} downloading package manifest <b>{s}<r>",
+ .{ bun.span(@errorName(err)), name.slice() },
+ ) catch unreachable;
+ }
+ } else if (@TypeOf(callbacks.onPackageManifestError) != void) {
callbacks.onPackageManifestError(
extract_ctx,
name.slice(),
@@ -3607,8 +3628,7 @@ pub const PackageManager = struct {
);
} else if (comptime log_level != .silent) {
const fmt = "\n<r><red>error<r>: {s} downloading package manifest <b>{s}<r>\n";
- const error_name: string = bun.span(@errorName(err));
- const args = .{ error_name, name.slice() };
+ const args = .{ bun.span(@errorName(err)), name.slice() };
if (comptime log_level.showProgress()) {
Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
} else {
@@ -3759,9 +3779,34 @@ pub const PackageManager = struct {
.extract => |extract| {
const response = task.http.response orelse {
const err = task.http.err orelse error.TarballFailedToDownload;
- const package_id = manager.lockfile.buffers.resolutions.items[extract.dependency_id];
- if (@TypeOf(callbacks.onPackageDownloadError) != void) {
+ if (task.retried < manager.options.max_retry_count) {
+ task.retried += 1;
+ if (!has_network_error) {
+ has_network_error = true;
+ const min = manager.options.min_simultaneous_requests;
+ const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
+ if (max > min) {
+ AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .Monotonic);
+ }
+ }
+ manager.enqueueNetworkTask(task);
+
+ if (manager.options.log_level.isVerbose()) {
+ manager.log.addWarningFmt(
+ null,
+ logger.Loc.Empty,
+ manager.allocator,
+ "<r><yellow>warn:<r> {s} downloading tarball <b>{s}@{s}<r>",
+ .{
+ bun.span(@errorName(err)),
+ extract.name.slice(),
+ extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items),
+ },
+ ) catch unreachable;
+ }
+ } else if (@TypeOf(callbacks.onPackageDownloadError) != void) {
+ const package_id = manager.lockfile.buffers.resolutions.items[extract.dependency_id];
callbacks.onPackageDownloadError(
extract_ctx,
package_id,
@@ -3770,18 +3815,18 @@ pub const PackageManager = struct {
err,
task.url_buf,
);
- } else {
+ } else if (comptime log_level != .silent) {
const fmt = "\n<r><red>error<r>: {s} downloading tarball <b>{s}@{s}<r>\n";
- const error_name: string = bun.span(@errorName(err));
- const args = .{ error_name, extract.name.slice(), extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items) };
-
- if (comptime log_level != .silent) {
- if (comptime log_level.showProgress()) {
- Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
- } else {
- Output.prettyErrorln(fmt, args);
- Output.flush();
- }
+ const args = .{
+ bun.span(@errorName(err)),
+ extract.name.slice(),
+ extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items),
+ };
+ if (comptime log_level.showProgress()) {
+ Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
+ } else {
+ Output.prettyErrorln(fmt, args);
+ Output.flush();
}
}
@@ -4194,6 +4239,7 @@ pub const PackageManager = struct {
// 2. Has a platform and/or os specified, which evaluates to not disabled
native_bin_link_allowlist: []const PackageNameHash = &default_native_bin_link_allowlist,
max_retry_count: u16 = 5,
+ min_simultaneous_requests: usize = 4,
pub fn shouldPrintCommandName(this: *const Options) bool {
return this.log_level != .silent and this.do.summary;
@@ -4497,9 +4543,7 @@ pub const PackageManager = struct {
}
if (env.map.get("BUN_CONFIG_HTTP_RETRY_COUNT")) |retry_count| {
- if (std.fmt.parseInt(i32, retry_count, 10)) |int| {
- this.max_retry_count = @intCast(u16, @min(@max(int, 0), 65355));
- } else |_| {}
+ if (std.fmt.parseInt(u16, retry_count, 10)) |int| this.max_retry_count = int else |_| {}
}
if (env.map.get("BUN_CONFIG_LINK_NATIVE_BINS")) |native_packages| {
@@ -4522,31 +4566,7 @@ pub const PackageManager = struct {
// this.enable.deduplicate_packages = false;
// }
- if (env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
- load: {
- AsyncHTTP.max_simultaneous_requests = std.fmt.parseInt(u16, max_http_requests, 10) catch {
- log.addErrorFmt(
- null,
- logger.Loc.Empty,
- allocator,
- "BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
- .{max_http_requests},
- ) catch unreachable;
- break :load;
- };
-
- if (AsyncHTTP.max_simultaneous_requests == 0) {
- log.addWarningFmt(
- null,
- logger.Loc.Empty,
- allocator,
- "BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
- .{},
- ) catch unreachable;
- AsyncHTTP.max_simultaneous_requests = 255;
- }
- }
- }
+ AsyncHTTP.loadEnv(allocator, log, env);
if (env.map.get("BUN_CONFIG_SKIP_SAVE_LOCKFILE")) |check_bool| {
this.do.save_lockfile = strings.eqlComptime(check_bool, "0");