diff options
author | 2022-02-04 20:50:21 -0800 | |
---|---|---|
committer | 2022-02-04 20:50:21 -0800 | |
commit | dabcac2e96e67356e89e7fb57f6acc62cd6a47a9 (patch) | |
tree | 53dc7668c5a2d97dd983711973f99c4df312f80f | |
parent | c03b7a6f19044e37985128a9e1751a8dc82f13fc (diff) | |
download | bun-dabcac2e96e67356e89e7fb57f6acc62cd6a47a9.tar.gz bun-dabcac2e96e67356e89e7fb57f6acc62cd6a47a9.tar.zst bun-dabcac2e96e67356e89e7fb57f6acc62cd6a47a9.zip |
Several reliability improvements to HTTP
-rw-r--r-- | src/deps/boringssl.translated.zig | 4 | ||||
-rw-r--r-- | src/http/async_bio.zig | 20 | ||||
-rw-r--r-- | src/http/async_socket.zig | 85 | ||||
-rw-r--r-- | src/http_client_async.zig | 106 | ||||
-rw-r--r-- | src/install/extract_tarball.zig | 2 | ||||
-rw-r--r-- | src/install/install.zig | 26 | ||||
-rw-r--r-- | src/install/npm.zig | 21 | ||||
-rw-r--r-- | src/javascript/jsc/webcore/response.zig | 3 | ||||
-rw-r--r-- | src/libarchive/libarchive.zig | 26 | ||||
-rw-r--r-- | src/network_thread.zig | 31 | ||||
-rw-r--r-- | src/report.zig | 2 | ||||
-rw-r--r-- | src/thread_pool.zig | 4 |
12 files changed, 189 insertions, 141 deletions
diff --git a/src/deps/boringssl.translated.zig b/src/deps/boringssl.translated.zig index bb0f03136..b30d99e7e 100644 --- a/src/deps/boringssl.translated.zig +++ b/src/deps/boringssl.translated.zig @@ -18702,6 +18702,10 @@ pub const SSL = opaque { HandshakeHintsReady, }; + pub fn shutdown(this: *SSL) void { + _ = SSL_shutdown(this); + } + pub inline fn deinit(this: *SSL) void { _ = SSL_free(this); } diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index ab1c7a645..e6ac5e71c 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -16,6 +16,7 @@ const connection_closed = -2; const pending = -1; const OK = 0; const ObjectPool = @import("../pool.zig").ObjectPool; +const Environment = @import("../env.zig"); const Packet = struct { completion: Completion, @@ -152,6 +153,7 @@ pub fn doSocketWrite(this: *AsyncBIO, completion: *Completion, result_: AsyncIO. return; } + if (this.socket_fd == 0) return; this.scheduleSocketWrite(completion.operation.slice()[remain..]); } @@ -257,6 +259,11 @@ pub const Bio = struct { var this = cast(this_bio); const len = @intCast(u32, len_); + if (this.socket_fd == 0) { + if (comptime Environment.allow_assert) std.debug.assert(false); // socket_fd should never be 0 + return -1; + } + if (this.socket_send_error != null) { if (extremely_verbose) { Output.prettyErrorln("write: {s}", .{@errorName(this.socket_send_error.?)}); @@ -329,9 +336,15 @@ pub const Bio = struct { // overreading, but issuing one is more efficient. SSL sockets are not // reused after shutdown for non-SSL traffic, so overreading is fine. assert(bio_read_offset == 0); + + if (this.socket_fd == 0) { + if (comptime Environment.allow_assert) std.debug.assert(false); // socket_fd should never be 0 + return -1; + } + if (this.pending_reads == 0) { - this.scheduleSocketRead(len__); this.pending_reads += 1; + this.scheduleSocketRead(len__); } boring.BIO_set_retry_read(this_bio); @@ -351,6 +364,11 @@ pub const Bio = struct { var bytes = this.recv_buffer.?.data[bio_read_offset..socket_recv_len_]; if (len__ > @truncate(u32, bytes.len)) { + if (this.socket_fd == 0) { + if (comptime Environment.allow_assert) std.debug.assert(false); // socket_fd should never be 0 + return -1; + } + if (this.pending_reads == 0) { // if this is true, we will never have enough space if (socket_recv_len_ + len__ >= buffer_pool_len and len_ < buffer_pool_len) { diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig index 43ac9f860..205940b2d 100644 --- a/src/http/async_socket.zig +++ b/src/http/async_socket.zig @@ -38,7 +38,7 @@ read_completion: AsyncIO.Completion = undefined, connect_completion: AsyncIO.Completion = undefined, close_completion: AsyncIO.Completion = undefined, -const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError || error{UnknownHostName}; +const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError || error{ UnknownHostName, FailedToOpenSocket }; pub fn init(io: *AsyncIO, socket: std.os.socket_t, allocator: std.mem.Allocator) !AsyncSocket { var head = AsyncMessage.get(allocator); @@ -55,13 +55,18 @@ fn on_connect(this: *AsyncSocket, _: *Completion, err: ConnectError!void) void { } fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!void { - const sockfd = AsyncIO.openSocket(address.any.family, OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP) catch |err| { - if (extremely_verbose) { - Output.prettyErrorln("openSocket error: {s}", .{@errorName(err)}); - } + const sockfd = if (this.socket > 0) + this.socket + else + AsyncIO.openSocket(address.any.family, OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP) catch |err| { + if (extremely_verbose) { + Output.prettyErrorln("openSocket error: {s}", .{@errorName(err)}); + } + this.socket = 0; - return error.ConnectionRefused; - }; + return error.FailedToOpenSocket; + }; + this.socket = sockfd; this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address); suspend { @@ -71,9 +76,6 @@ fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!v if (this.err) |e| { return @errSetCast(ConnectError, e); } - - this.socket = sockfd; - return; } fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void { @@ -82,58 +84,39 @@ fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void pub fn close(this: *AsyncSocket) void { if (this.socket == 0) return; - this.io.close(*AsyncSocket, this, on_close, &this.close_completion, this.socket); + const to_close = this.socket; + this.socket = 0; + this.io.close(*AsyncSocket, this, on_close, &this.close_completion, to_close); suspend { this.close_frame = @frame().*; } - this.socket = 0; } pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void { - this.socket = 0; + this.close(); + outer: while (true) { // on macOS, getaddrinfo() is very slow // If you send ~200 network requests, about 1.5s is spent on getaddrinfo() // So, we cache this. - var address_list = NetworkThread.getAddressList(getAllocator(), name, port) catch |err| { + var list = NetworkThread.getAddressList(getAllocator(), name, port) catch |err| { return @errSetCast(ConnectError, err); }; - const list = address_list.address_list; if (list.addrs.len == 0) return error.ConnectionRefused; - try_cached_index: { - if (address_list.index) |i| { - const address = list.addrs[i]; - if (address_list.invalidated) continue :outer; - - this.connectToAddress(address) catch |err| { - if (err == error.ConnectionRefused) { - address_list.index = null; - break :try_cached_index; - } - - address_list.invalidate(); - continue :outer; - }; - } - } - - for (list.addrs) |address, i| { - if (address_list.invalidated) continue :outer; + for (list.addrs) |address| { this.connectToAddress(address) catch |err| { + this.close(); + if (err == error.ConnectionRefused) continue; - address_list.invalidate(); if (err == error.AddressNotAvailable or err == error.UnknownHostName) continue :outer; - return err; }; - address_list.index = @truncate(u32, i); return; } - if (address_list.invalidated) continue :outer; + this.close(); - address_list.invalidate(); return error.ConnectionRefused; } } @@ -201,6 +184,7 @@ pub fn deinit(this: *AsyncSocket) void { this.sent = 0; this.read_context = &[_]u8{}; this.read_offset = 0; + this.socket = 0; } pub fn send(this: *AsyncSocket) SendError!usize { @@ -423,6 +407,16 @@ pub const SSL = struct { } pub fn close(this: *SSL) void { + if (this.ssl_loaded) { + this.ssl.shutdown(); + this.ssl.deinit(); + this.ssl_loaded = false; + } + + if (this.ssl_bio_loaded) { + this.ssl_bio.socket_fd = 0; + } + this.socket.close(); } @@ -796,7 +790,7 @@ pub const SSL = struct { this.socket.deinit(); if (this.ssl_loaded) { - _ = boring.SSL_shutdown(this.ssl); + this.ssl.shutdown(); this.ssl.deinit(); this.ssl_loaded = false; } @@ -805,13 +799,18 @@ pub const SSL = struct { this.ssl_bio_loaded = false; if (this.ssl_bio.recv_buffer) |recv| { recv.release(); - this.ssl_bio.recv_buffer = null; } + this.ssl_bio.recv_buffer = null; if (this.ssl_bio.send_buffer) |recv| { recv.release(); - this.ssl_bio.send_buffer = null; } + this.ssl_bio.send_buffer = null; + + if (this.ssl_bio.bio) |bio| { + bio.deinit(); + } + this.ssl_bio.bio = null; this.ssl_bio.pending_reads = 0; this.ssl_bio.pending_sends = 0; @@ -824,6 +823,8 @@ pub const SSL = struct { this.ssl_bio.socket_fd = 0; this.ssl_bio.onReady = null; + } else { + this.ssl_bio = undefined; } this.handshake_complete = false; diff --git a/src/http_client_async.zig b/src/http_client_async.zig index bd5c43d2d..b1d50c36c 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -67,7 +67,7 @@ else pub const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC; -pub const extremely_verbose = Environment.isDebug; +pub const extremely_verbose = false; fn writeRequest( comptime Writer: type, @@ -110,6 +110,7 @@ disable_shutdown: bool = true, timeout: usize = 0, progress_node: ?*std.Progress.Node = null, socket: AsyncSocket.SSL = undefined, +socket_loaded: bool = false, gzip_elapsed: u64 = 0, stage: Stage = Stage.pending, @@ -237,10 +238,11 @@ pub const HTTPChannelContext = struct { http: AsyncHTTP = undefined, channel: *HTTPChannel, - pub fn callback(http: *AsyncHTTP, sender: *AsyncHTTP.HTTPSender) void { + pub fn callback( + http: *AsyncHTTP, + ) void { var this: *HTTPChannelContext = @fieldParentPtr(HTTPChannelContext, "http", http); this.channel.writeItem(http) catch unreachable; - sender.onFinish(); } }; @@ -257,6 +259,8 @@ pub const AsyncHTTP = struct { max_retry_count: u32 = 0, url: URL, + task: ThreadPool.Task = ThreadPool.Task{ .callback = HTTPSender.callback }, + /// Timeout in nanoseconds timeout: usize = 0, @@ -277,7 +281,7 @@ pub const AsyncHTTP = struct { callback: ?CompletionCallback = null, callback_ctx: ?*anyopaque = null, - pub const CompletionCallback = fn (this: *AsyncHTTP, sender: *HTTPSender) void; + pub const CompletionCallback = fn (this: *AsyncHTTP) void; pub var active_requests_count = std.atomic.Atomic(u32).init(0); pub var max_simultaneous_requests: u16 = 32; @@ -315,17 +319,15 @@ pub const AsyncHTTP = struct { return this; } - pub fn schedule(this: *AsyncHTTP, allocator: std.mem.Allocator, batch: *ThreadPool.Batch) void { + pub fn schedule(this: *AsyncHTTP, _: std.mem.Allocator, batch: *ThreadPool.Batch) void { std.debug.assert(NetworkThread.global_loaded.load(.Monotonic) == 1); - var sender = HTTPSender.get(this, allocator); this.state.store(.scheduled, .Monotonic); - batch.push(ThreadPool.Batch.from(&sender.task)); + batch.push(ThreadPool.Batch.from(&this.task)); } - fn sendSyncCallback(this: *AsyncHTTP, sender: *HTTPSender) void { + fn sendSyncCallback(this: *AsyncHTTP) void { var single_http_channel = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?)); single_http_channel.channel.writeItem(this) catch unreachable; - sender.release(); } pub fn sendSync(this: *AsyncHTTP, comptime _: bool) anyerror!picohttp.Response { @@ -356,59 +358,30 @@ pub const AsyncHTTP = struct { unreachable; } - var http_sender_head: std.atomic.Atomic(?*HTTPSender) = std.atomic.Atomic(?*HTTPSender).init(null); - pub const HTTPSender = struct { - task: ThreadPool.Task = .{ .callback = callback }, frame: @Frame(AsyncHTTP.do) = undefined, - http: *AsyncHTTP = undefined, - - next: ?*HTTPSender = null, - - pub fn get(http: *AsyncHTTP, allocator: std.mem.Allocator) *HTTPSender { - @fence(.Acquire); - - var head_ = http_sender_head.load(.Monotonic); + finisher: ThreadPool.Task = .{ .callback = onFinish }, - if (head_ == null) { - var new_head = allocator.create(HTTPSender) catch unreachable; - new_head.* = HTTPSender{}; - new_head.next = null; - new_head.task = .{ .callback = callback }; - new_head.http = http; - return new_head; - } - - http_sender_head.store(head_.?.next, .Monotonic); - - head_.?.* = HTTPSender{}; - head_.?.next = null; - head_.?.task = .{ .callback = callback }; - head_.?.http = http; - - return head_.?; - } - - pub fn release(this: *HTTPSender) void { - @fence(.Acquire); - this.task = .{ .callback = callback }; - this.http = undefined; - this.next = http_sender_head.swap(this, .Monotonic); - } + pub const Pool = ObjectPool(HTTPSender, null, false, 8); pub fn callback(task: *ThreadPool.Task) void { - var this = @fieldParentPtr(HTTPSender, "task", task); - this.frame = async AsyncHTTP.do(this); + var this = @fieldParentPtr(AsyncHTTP, "task", task); + var sender = HTTPSender.Pool.get(default_allocator); + sender.data = .{ + .frame = undefined, + .finisher = .{ .callback = onFinish }, + }; + sender.data.frame = async do(&sender.data, this); } - pub fn onFinish(this: *HTTPSender) void { - this.release(); + pub fn onFinish(task: *ThreadPool.Task) void { + var this = @fieldParentPtr(HTTPSender, "finisher", task); + @fieldParentPtr(HTTPSender.Pool.Node, "data", this).release(); } }; - pub fn do(sender: *HTTPSender) void { + pub fn do(sender: *HTTPSender, this: *AsyncHTTP) void { outer: { - var this = sender.http; this.err = null; this.state.store(.sending, .Monotonic); var timer = std.time.Timer.start() catch @panic("Timer failure"); @@ -418,10 +391,10 @@ pub const AsyncHTTP = struct { this.state.store(.fail, .Monotonic); this.err = err; - if (sender.http.max_retry_count > sender.http.retries_count) { - sender.http.retries_count += 1; - sender.http.response_buffer.reset(); - NetworkThread.global.pool.schedule(ThreadPool.Batch.from(&sender.task)); + if (this.max_retry_count > this.retries_count) { + this.retries_count += 1; + this.response_buffer.reset(); + NetworkThread.global.pool.schedule(ThreadPool.Batch.from(&this.task)); return; } break :outer; @@ -432,9 +405,10 @@ pub const AsyncHTTP = struct { this.gzip_elapsed = this.client.gzip_elapsed; } - if (sender.http.callback) |callback| { - callback(sender.http, sender); + if (this.callback) |callback| { + callback(this); } + NetworkThread.global.pool.schedule(.{ .head = &sender.finisher, .tail = &sender.finisher, .len = 1 }); } }; @@ -545,6 +519,7 @@ pub fn connect( const port = this.url.getPortAuto(); try connector.connect(this.url.hostname, port); + std.debug.assert(this.socket.socket.socket > 0); var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) }; // client.setQuickACK(true) catch {}; @@ -560,11 +535,20 @@ pub fn sendAsync(this: *HTTPClient, body: []const u8, body_out_str: *MutableStri } pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { - defer if (@enumToInt(this.stage) > @enumToInt(Stage.pending)) this.socket.deinit(); + defer { + if (this.socket_loaded) { + this.socket_loaded = false; + this.socket.deinit(); + } + } // this prevents stack overflow redirect: while (this.remaining_redirect_count >= -1) { - if (@enumToInt(this.stage) > @enumToInt(Stage.pending)) this.socket.deinit(); + if (this.socket_loaded) { + this.socket_loaded = false; + this.socket.deinit(); + } + _ = AsyncHTTP.active_requests_count.fetchAdd(1, .Monotonic); defer { _ = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic); @@ -607,11 +591,13 @@ pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableStrin this.socket = AsyncSocket.SSL{ .socket = try AsyncSocket.init(&AsyncIO.global, 0, default_allocator), }; + this.socket_loaded = true; this.stage = Stage.connect; var socket = &this.socket.socket; try this.connect(*AsyncSocket, socket); this.stage = Stage.request; defer this.socket.close(); + var request = buildRequest(this, body.len); if (this.verbose) { Output.prettyErrorln("{s}", .{request}); @@ -1039,6 +1025,8 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response { this.socket = try AsyncSocket.SSL.init(default_allocator, &AsyncIO.global); + this.socket_loaded = true; + var socket = &this.socket; this.stage = Stage.connect; try this.connect(*AsyncSocket.SSL, socket); diff --git a/src/install/extract_tarball.zig b/src/install/extract_tarball.zig index 5594ff0b1..22b4d7592 100644 --- a/src/install/extract_tarball.zig +++ b/src/install/extract_tarball.zig @@ -250,7 +250,7 @@ fn extract(this: *const ExtractTarball, tgz_bytes: []const u8) !string { // We return a resolved absolute absolute file path to the cache dir. // To get that directory, we open the directory again. - var final_dir = cache_dir.openDirZ(folder_name, .{ .iterate = true }) catch |err| { + var final_dir = cache_dir.openDirZ(folder_name, .{ .iterate = false }) catch |err| { Output.prettyErrorln( "<r><red>Error {s}<r> failed to verify cache dir for {s}", .{ diff --git a/src/install/install.zig b/src/install/install.zig index a8446efd0..051eec1f4 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -179,9 +179,8 @@ const NetworkTask = struct { binlink: void, }, - pub fn notify(http: *AsyncHTTP, sender: *AsyncHTTP.HTTPSender) void { + pub fn notify(http: *AsyncHTTP) void { PackageManager.instance.network_channel.writeItem(@fieldParentPtr(NetworkTask, "http", http)) catch {}; - sender.onFinish(); } const default_headers_buf: string = "Acceptapplication/vnd.npm.install-v1+json"; @@ -3884,7 +3883,7 @@ pub const PackageManager = struct { pub fn isFolderInCache(this: *PackageManager, folder_path: stringZ) bool { // TODO: is this slow? - var dir = this.getCacheDirectory().openDirZ(folder_path, .{ .iterate = true }) catch return false; + var dir = this.getCacheDirectory().openDirZ(folder_path, .{ .iterate = false }) catch return false; dir.close(); return true; } @@ -4356,11 +4355,6 @@ pub const PackageManager = struct { _ = this.network_dedupe_map.remove(task_id); continue :retry_from_manifests_ptr; } - - // We want to make sure the temporary directory & cache directory are loaded on the main thread - // so that we don't run into weird threading issues - // the call to getCacheDirectory() above handles the cache dir - _ = this.getTemporaryDirectory(); } } @@ -5299,7 +5293,7 @@ pub const PackageManager = struct { comptime params: []const ParamType, ) !*PackageManager { // assume that spawning a thread will take a lil so we do that asap - try NetworkThread.init(); + try NetworkThread.warmup(); var cli = try CommandLineArguments.parse(ctx.allocator, params); @@ -6092,6 +6086,7 @@ pub const PackageManager = struct { std.mem.copy(u8, &node_modules_buf, entry.name); node_modules_buf[entry.name.len] = 0; var buf: [:0]u8 = node_modules_buf[0..entry.name.len :0]; + var file = node_modules_bin.openFileZ(buf, .{ .read = true }) catch { node_modules_bin.deleteFileZ(buf) catch {}; continue :iterator; @@ -6793,6 +6788,9 @@ pub const PackageManager = struct { var deps = &manager.lockfile.buffers.dependencies; var res = &manager.lockfile.buffers.resolutions; + _ = manager.getCacheDirectory(); + _ = manager.getTemporaryDirectory(); + while (std.mem.indexOfScalar(PackageID, remaining, invalid_package_id)) |next_i_| { remaining = remaining[next_i_ + 1 ..]; @@ -6834,6 +6832,11 @@ pub const PackageManager = struct { root = try manager.lockfile.appendPackage(root); manager.root_dependency_list = root.dependencies; + + if (root.dependencies.len > 0) { + _ = manager.getCacheDirectory(); + _ = manager.getTemporaryDirectory(); + } manager.enqueueDependencyList( root.dependencies, true, @@ -6846,6 +6849,11 @@ pub const PackageManager = struct { _ = manager.scheduleNetworkTasks(); if (manager.pending_tasks > 0) { + if (root.dependencies.len > 0) { + _ = manager.getCacheDirectory(); + _ = manager.getTemporaryDirectory(); + } + if (comptime log_level.showProgress()) { manager.downloads_node = try manager.progress.start(ProgressStrings.download(), 0); manager.progress.supports_ansi_escape_codes = Output.enable_ansi_colors_stderr; diff --git a/src/install/npm.zig b/src/install/npm.zig index 61e2b064c..65bf61602 100644 --- a/src/install/npm.zig +++ b/src/install/npm.zig @@ -457,6 +457,16 @@ pub const PackageManifest = struct { } } + fn writeFile(this: *const PackageManifest, tmp_path: [:0]const u8, tmpdir: std.fs.Dir) !void { + var tmpfile = try tmpdir.createFileZ(tmp_path, .{ + .truncate = true, + }); + defer tmpfile.close(); + var writer = tmpfile.writer(); + try Serializer.write(this, @TypeOf(writer), writer); + std.os.fdatasync(tmpfile.handle) catch {}; + } + pub fn save(this: *const PackageManifest, tmpdir: std.fs.Dir, cache_dir: std.fs.Dir) !void { const file_id = std.hash.Wyhash.hash(0, this.name()); var dest_path_buf: [512 + 64]u8 = undefined; @@ -466,16 +476,7 @@ pub const PackageManifest = struct { try dest_path_stream_writer.print("{x}.npm-{x}", .{ file_id, @maximum(std.time.milliTimestamp(), 0) }); try dest_path_stream_writer.writeByte(0); var tmp_path: [:0]u8 = dest_path_buf[0 .. dest_path_stream.pos - 1 :0]; - { - var tmpfile = try tmpdir.createFileZ(tmp_path, .{ - .truncate = true, - }); - var writer = tmpfile.writer(); - try Serializer.write(this, @TypeOf(writer), writer); - std.os.fdatasync(tmpfile.handle) catch {}; - tmpfile.close(); - } - + try writeFile(this, tmp_path, tmpdir); var out_path = std.fmt.bufPrintZ(&out_path_buf, "{x}.npm", .{file_id}) catch unreachable; try std.os.renameatZ(tmpdir.fd, tmp_path, cache_dir.fd, out_path); } diff --git a/src/javascript/jsc/webcore/response.zig b/src/javascript/jsc/webcore/response.zig index 1d5281626..70b87b53a 100644 --- a/src/javascript/jsc/webcore/response.zig +++ b/src/javascript/jsc/webcore/response.zig @@ -639,11 +639,10 @@ pub const Fetch = struct { return node; } - pub fn callback(http_: *HTTPClient.AsyncHTTP, sender: *HTTPClient.AsyncHTTP.HTTPSender) void { + pub fn callback(http_: *HTTPClient.AsyncHTTP) void { var task: *FetchTasklet = @fieldParentPtr(FetchTasklet, "http", http_); @atomicStore(Status, &task.status, Status.done, .Monotonic); task.javascript_vm.eventLoop().enqueueTaskConcurrent(Task.init(task)); - sender.release(); } }; diff --git a/src/libarchive/libarchive.zig b/src/libarchive/libarchive.zig index 9e960d1a6..935c3114b 100644 --- a/src/libarchive/libarchive.zig +++ b/src/libarchive/libarchive.zig @@ -407,7 +407,8 @@ pub const Archive = struct { switch (r) { Status.eof => break :loop, - Status.failed, Status.fatal, Status.retry => return error.Fail, + Status.retry => continue :loop, + Status.failed, Status.fatal => return error.Fail, else => { // do not use the utf8 name there // it will require us to pull in libiconv @@ -481,7 +482,8 @@ pub const Archive = struct { switch (r) { Status.eof => break :loop, - Status.failed, Status.fatal, Status.retry => return error.Fail, + Status.retry => continue :loop, + Status.failed, Status.fatal => return error.Fail, else => { var pathname: [:0]const u8 = std.mem.sliceTo(lib.archive_entry_pathname(entry).?, 0); var tokenizer = std.mem.tokenize(u8, std.mem.span(pathname), std.fs.path.sep_str); @@ -544,7 +546,25 @@ pub const Archive = struct { } } - _ = lib.archive_read_data_into_fd(archive, file.handle); + var retries_remaining: u8 = 5; + possibly_retry: while (retries_remaining != 0) : (retries_remaining -= 1) { + switch (lib.archive_read_data_into_fd(archive, file.handle)) { + lib.ARCHIVE_EOF => break :loop, + lib.ARCHIVE_OK => break :possibly_retry, + lib.ARCHIVE_RETRY => { + if (comptime log) { + Output.prettyErrorln("[libarchive] Error extracting {s}, retry {d} / {d}", .{ std.mem.span(pathname_), retries_remaining, 5 }); + } + }, + else => { + if (comptime log) { + const archive_error = std.mem.span(lib.archive_error_string(archive)); + Output.prettyErrorln("[libarchive] Error extracting {s}: {s}", .{ std.mem.span(pathname_), archive_error }); + } + return error.Fail; + }, + } + } } }, } diff --git a/src/network_thread.zig b/src/network_thread.zig index 0a39f574a..ba1db6524 100644 --- a/src/network_thread.zig +++ b/src/network_thread.zig @@ -47,21 +47,26 @@ const CachedAddressList = struct { pub const AddressListCache = std.HashMap(u64, CachedAddressList, IdentityContext(u64), 80); pub var address_list_cached: AddressListCache = undefined; -pub fn getAddressList(allocator: std.mem.Allocator, name: []const u8, port: u16) !*CachedAddressList { - const hash = CachedAddressList.hash(name, port); - const now = @intCast(u64, @maximum(0, std.time.milliTimestamp())); - if (address_list_cached.getPtr(hash)) |cached| { - if (cached.expire_after > now) { - return cached; - } +pub fn getAddressList(allocator: std.mem.Allocator, name: []const u8, port: u16) !*std.net.AddressList { + // const hash = CachedAddressList.hash(name, port); + // const now = @intCast(u64, @maximum(0, std.time.milliTimestamp())); + // if (address_list_cached.getPtr(hash)) |cached| { + // if (cached.expire_after > now) { + // return cached; + // } - cached.address_list.deinit(); - } + // cached.address_list.deinit(); + // } + + return try std.net.getAddressList(allocator, name, port); +} - const address_list = try std.net.getAddressList(allocator, name, port); - var entry = try address_list_cached.getOrPut(hash); - entry.value_ptr.* = CachedAddressList.init(hash, address_list, now); - return entry.value_ptr; +pub var has_warmed = false; +pub fn warmup() !void { + if (has_warmed) return; + has_warmed = true; + try init(); + global.pool.forceSpawn(); } pub fn init() !void { diff --git a/src/report.zig b/src/report.zig index 4cb7bb6c5..ed2b697ff 100644 --- a/src/report.zig +++ b/src/report.zig @@ -28,7 +28,7 @@ pub const CrashReportWriter = struct { pub fn printFrame(_: ?*anyopaque, frame: CrashReporter.StackFrame) void { const function_name = if (frame.function_name.len > 0) frame.function_name else "[function ?]"; const filename = if (frame.filename.len > 0) frame.function_name else "[file ?]"; - crash_report_writer.print("[{d}] - <b>{s}<r> {s}:{d}\n", .{ frame.pc, function_name, filename, frame.line_number }); + crash_report_writer.print("[0x{X}] - <b>{s}<r> {s}:{d}\n", .{ frame.pc, function_name, filename, frame.line_number }); } pub fn dump() void { diff --git a/src/thread_pool.zig b/src/thread_pool.zig index 166705a81..c398151ca 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -122,6 +122,10 @@ pub fn schedule(self: *ThreadPool, batch: Batch) void { self.run_queue.push(list); } + forceSpawn(self); +} + +pub fn forceSpawn(self: *ThreadPool) void { // Try to notify a thread const is_waking = false; return self.notify(is_waking); |