diff options
author | 2022-01-05 16:41:58 -0800 | |
---|---|---|
committer | 2022-01-05 16:41:58 -0800 | |
commit | 4b717fe5548fd587b66f0ccdd8dba8160a38077b (patch) | |
tree | 8075a63ad421c007b7fc280d986f1a061d72a050 /src | |
parent | 236a0fde35ba7f9f3b7f6f11457d4b44b92d7c55 (diff) | |
download | bun-4b717fe5548fd587b66f0ccdd8dba8160a38077b.tar.gz bun-4b717fe5548fd587b66f0ccdd8dba8160a38077b.tar.zst bun-4b717fe5548fd587b66f0ccdd8dba8160a38077b.zip |
Fix crash that sometimes happens after 30 seconds
Diffstat (limited to 'src')
-rw-r--r-- | src/analytics/analytics_thread.zig | 5 | ||||
-rw-r--r-- | src/cli/create_command.zig | 8 | ||||
-rw-r--r-- | src/cli/upgrade_command.zig | 8 | ||||
-rw-r--r-- | src/http_client_async.zig | 146 | ||||
-rw-r--r-- | src/pool.zig | 124 |
5 files changed, 185 insertions, 106 deletions
diff --git a/src/analytics/analytics_thread.zig b/src/analytics/analytics_thread.zig index e883a798f..f5caa7f10 100644 --- a/src/analytics/analytics_thread.zig +++ b/src/analytics/analytics_thread.zig @@ -513,7 +513,10 @@ pub const EventList = struct { var retry_remaining: usize = 10; const rand = random.random(); retry: while (retry_remaining > 0) { - const response = this.async_http.sendSync() catch |err| { + this.async_http.max_retry_count = 0; + this.async_http.retries_count = 0; + + const response = this.async_http.sendSync(true) catch |err| { if (FeatureFlags.verbose_analytics) { Output.prettyErrorln("[Analytics] failed due to error {s} ({d} retries remain)", .{ @errorName(err), retry_remaining }); } diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig index b653aa4a7..7e3ea96eb 100644 --- a/src/cli/create_command.zig +++ b/src/cli/create_command.zig @@ -1847,7 +1847,7 @@ pub const Example = struct { var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, api_url, header_entries, headers_buf, mutable, &request_body, 60 * std.time.ns_per_min); async_http.client.progress_node = progress; - const response = try async_http.sendSync(); + const response = try async_http.sendSync(true); switch (response.status_code) { 404 => return error.GitHubRepositoryNotFound, @@ -1913,7 +1913,7 @@ pub const Example = struct { var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min); async_http.client.progress_node = progress; - var response = try async_http.sendSync(); + var response = try async_http.sendSync(true); switch (response.status_code) { 404 => return error.ExampleNotFound, @@ -1992,7 +1992,7 @@ pub const Example = struct { refresher.maybeRefresh(); - response = try async_http.sendSync(); + response = try async_http.sendSync(true); refresher.maybeRefresh(); @@ -2023,7 +2023,7 @@ pub const Example = struct { async_http.client.progress_node = progress_node; } - const response = async_http.sendSync() catch |err| { + const response = async_http.sendSync(true) catch |err| { switch (err) { error.WouldBlock => { Output.prettyErrorln("Request timed out while trying to fetch examples list. Please try again", .{}); diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig index b3445e278..3f6a4840d 100644 --- a/src/cli/upgrade_command.zig +++ b/src/cli/upgrade_command.zig @@ -95,7 +95,9 @@ pub const UpgradeCheckerThread = struct { js_ast.Expr.Data.Store.deinit(); js_ast.Stmt.Data.Store.deinit(); } - const version = (try UpgradeCommand.getLatestVersion(default_allocator, env_loader, undefined, undefined, true)) orelse return; + var arena = std.heap.ArenaAllocator.init(default_allocator); + defer arena.deinit(); + const version = (try UpgradeCommand.getLatestVersion(arena.allocator(), env_loader, undefined, undefined, true)) orelse return; if (!version.isCurrent()) { if (version.name()) |name| { @@ -182,7 +184,7 @@ pub const UpgradeCommand = struct { var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable; async_http.* = try HTTP.AsyncHTTP.init(allocator, .GET, api_url, header_entries, headers_buf, &metadata_body, &request_body, 60 * std.time.ns_per_min); if (!silent) async_http.client.progress_node = progress; - const response = try async_http.sendSync(); + const response = try async_http.sendSync(true); switch (response.status_code) { 404 => return error.HTTP404, @@ -407,7 +409,7 @@ pub const UpgradeCommand = struct { ); async_http.client.timeout = timeout; async_http.client.progress_node = progress; - const response = try async_http.sendSync(); + const response = try async_http.sendSync(true); switch (response.status_code) { 404 => return error.HTTP404, diff --git a/src/http_client_async.zig b/src/http_client_async.zig index df5677ae7..0568591d6 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -78,12 +78,13 @@ tcp_client: tcp.Client = undefined, body_size: u32 = 0, read_count: u32 = 0, remaining_redirect_count: i8 = 127, -redirect: ?*URLBufferPool = null, +redirect: ?*URLBufferPool.Node = null, disable_shutdown: bool = true, timeout: usize = 0, progress_node: ?*std.Progress.Node = null, socket: AsyncSocket.SSL = undefined, gzip_elapsed: u64 = 0, +stage: Stage = Stage.pending, /// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since. /// This is a workaround for that. @@ -117,6 +118,14 @@ pub fn deinit(this: *HTTPClient) !void { } } +const Stage = enum(u8) { + pending, + connect, + request, + response, + done, +}; + // threadlocal var resolver_cache const tcp = std.x.net.tcp; const ip = std.x.net.ip; @@ -329,17 +338,19 @@ pub const AsyncHTTP = struct { sender.release(); } - pub fn sendSync(this: *AsyncHTTP) anyerror!picohttp.Response { + pub fn sendSync(this: *AsyncHTTP, comptime auto_release: bool) anyerror!picohttp.Response { this.callback_ctx = SingleHTTPChannelPool.get(default_allocator); defer { - var pooled_node = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?)); - SingleHTTPChannelPool.release(pooled_node); - this.callback_ctx = null; + if (comptime auto_release) { + var pooled_node = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?)); + SingleHTTPChannelPool.release(pooled_node); + this.callback_ctx = null; + } } this.callback = sendSyncCallback; var batch = NetworkThread.Batch{}; - this.schedule(this.allocator, &batch); + this.schedule(default_allocator, &batch); NetworkThread.global.pool.schedule(batch); while (true) { var pooled = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?)); @@ -442,76 +453,16 @@ pub const AsyncHTTP = struct { } }; -const BufferPool = struct { - pub const len = std.math.maxInt(u16) - 64; - buf: [len]u8 = undefined, - next: ?*BufferPool = null, - allocator: std.mem.Allocator = undefined, - - var head: ?*BufferPool = null; - - pub fn get(allocator: std.mem.Allocator) !*BufferPool { - if (head) |item| { - var this = item; - var head_ = item.next; - head = head_; - this.next = null; - - return this; - } - - var entry = try allocator.create(BufferPool); - entry.* = BufferPool{ .allocator = allocator }; - return entry; - } - - pub fn release(this: *BufferPool) void { - if (head) |item| { - item.next = this; - } else { - head = this; - } - } -}; - -const URLBufferPool = struct { - pub const len = 4096; - buf: [len]u8 = undefined, - next: ?*URLBufferPool = null, - allocator: std.mem.Allocator = undefined, - - var head: ?*URLBufferPool = null; - - pub fn get(allocator: std.mem.Allocator) !*URLBufferPool { - if (head) |item| { - var this = item; - var head_ = item.next; - head = head_; - this.next = null; - - return this; - } - - var entry = try allocator.create(URLBufferPool); - entry.* = URLBufferPool{ .allocator = allocator }; - return entry; - } - - pub fn release(this: *URLBufferPool) void { - if (head) |item| { - item.next = this; - } else { - head = this; - } - } -}; +const buffer_pool_len = std.math.maxInt(u16) - 64; +const BufferPool = ObjectPool([buffer_pool_len]u8, null, false); +const URLBufferPool = ObjectPool([4096]u8, null, false); pub const AsyncMessage = struct { used: u32 = 0, sent: u32 = 0, completion: AsyncIO.Completion = undefined, buf: []u8 = undefined, - pooled: ?*BufferPool = null, + pooled: ?*BufferPool.Node = null, allocator: std.mem.Allocator, next: ?*AsyncMessage = null, context: *anyopaque = undefined, @@ -520,6 +471,7 @@ pub const AsyncMessage = struct { pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage { if (_first_ssl) |first| { var prev = first; + std.debug.assert(prev.released); if (prev.next) |next| { _first_ssl = next; @@ -560,8 +512,8 @@ pub const AsyncMessage = struct { } var msg = allocator.create(AsyncMessage) catch unreachable; - var pooled = BufferPool.get(allocator) catch unreachable; - msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.buf, .pooled = pooled }; + var pooled = BufferPool.get(allocator); + msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled }; return msg; } @@ -689,7 +641,7 @@ const AsyncSocket = struct { // on macOS, getaddrinfo() is very slow // If you send ~200 network requests, about 1.5s is spent on getaddrinfo() // So, we cache this. - var address_list = NetworkThread.getAddressList(this.allocator, name, port) catch |err| { + var address_list = NetworkThread.getAddressList(default_allocator, name, port) catch |err| { return @errSetCast(ConnectError, err); }; @@ -776,7 +728,7 @@ const AsyncSocket = struct { this.queued += resp.written; if (resp.overflow) { - var next = AsyncMessage.get(this.allocator); + var next = AsyncMessage.get(default_allocator); this.tail.next = next; this.tail = next; @@ -886,12 +838,15 @@ const AsyncSocket = struct { send_frame: @Frame(SSL.send) = undefined, read_frame: @Frame(SSL.read) = undefined, hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined, + is_ssl: bool = false, const SSLConnectError = ConnectError || HandshakeError; const HandshakeError = error{OpenSSLError}; pub fn connect(this: *SSL, name: []const u8, port: u16) !void { + this.is_ssl = true; try this.socket.connect(name, port); + this.handshake_complete = false; var ssl = boring.initClient(); @@ -911,7 +866,6 @@ const AsyncSocket = struct { this.ssl = ssl; this.read_bio = AsyncMessage.get(this.socket.allocator); - try this.handshake(); } @@ -1076,6 +1030,9 @@ const AsyncSocket = struct { } pub fn deinit(this: *SSL) void { + this.socket.deinit(); + if (!this.is_ssl) return; + _ = boring.BIO_set_data(this.ssl_bio.bio, null); this.ssl_bio.pending_frame = AsyncBIO.PendingFrame.init(); this.ssl_bio.socket_fd = 0; @@ -1310,7 +1267,7 @@ pub const AsyncBIO = struct { switch (this.send_wait) { .pending => { var write_buffer = this.write_buffer orelse brk: { - this.write_buffer = AsyncMessage.get(this.allocator); + this.write_buffer = AsyncMessage.get(default_allocator); break :brk this.write_buffer.?; }; @@ -1488,7 +1445,7 @@ pub fn connect( try connector.connect(this.url.hostname, port); var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) }; - client.setReadBufferSize(BufferPool.len) catch {}; + client.setReadBufferSize(buffer_pool_len) catch {}; // client.setQuickACK(true) catch {}; this.tcp_client = client; @@ -1503,8 +1460,12 @@ pub fn sendAsync(this: *HTTPClient, body: []const u8, body_out_str: *MutableStri } pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { + defer if (@enumToInt(this.stage) > @enumToInt(Stage.pending)) this.socket.deinit(); // this prevents stack overflow redirect: while (this.remaining_redirect_count >= -1) { + if (@enumToInt(this.stage) > @enumToInt(Stage.pending)) this.socket.deinit(); + + this.stage = Stage.pending; body_out_str.reset(); if (this.url.isHTTPS()) { @@ -1512,7 +1473,6 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) ! switch (err) { error.Redirect => { this.remaining_redirect_count -= 1; - this.socket.deinit(); continue :redirect; }, @@ -1524,7 +1484,6 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) ! switch (err) { error.Redirect => { this.remaining_redirect_count -= 1; - this.socket.socket.deinit(); continue :redirect; }, @@ -1541,11 +1500,12 @@ const Task = ThreadPool.Task; pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { this.socket = AsyncSocket.SSL{ - .socket = try AsyncSocket.init(&AsyncIO.global, 0, this.allocator), + .socket = try AsyncSocket.init(&AsyncIO.global, 0, default_allocator), }; + this.stage = Stage.connect; var socket = &this.socket.socket; try this.connect(*AsyncSocket, socket); - + this.stage = Stage.request; defer this.socket.close(); var request = buildRequest(this, body.len); if (this.verbose) { @@ -1554,7 +1514,7 @@ pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableStrin try writeRequest(@TypeOf(socket), socket, request, body); _ = try socket.send(); - + this.stage = Stage.response; if (this.progress_node == null) { return this.processResponse( false, @@ -1591,8 +1551,8 @@ const ZlibPool = struct { pub fn get(this: *ZlibPool) !*MutableString { switch (this.items.items.len) { 0 => { - var mutable = try this.allocator.create(MutableString); - mutable.* = try MutableString.init(this.allocator, 0); + var mutable = try default_allocator.create(MutableString); + mutable.* = try MutableString.init(default_allocator, 0); return mutable; }, else => { @@ -1704,7 +1664,7 @@ const ZlibPool = struct { pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response { defer if (this.verbose) Output.flush(); var response: picohttp.Response = undefined; - var request_message = AsyncMessage.get(this.allocator); + var request_message = AsyncMessage.get(default_allocator); defer request_message.release(); var request_buffer: []u8 = request_message.buf; var read_length: usize = 0; @@ -1805,21 +1765,21 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti switch (response.status_code) { 302, 301, 307, 308, 303 => { if (strings.indexOf(location, "://")) |i| { - var url_buf = this.redirect orelse try URLBufferPool.get(this.allocator); + var url_buf = this.redirect orelse URLBufferPool.get(default_allocator); const protocol_name = location[0..i]; if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else { return error.UnsupportedRedirectProtocol; } - std.mem.copy(u8, &url_buf.buf, location); - this.url = URL.parse(url_buf.buf[0..location.len]); + std.mem.copy(u8, &url_buf.data, location); + this.url = URL.parse(url_buf.data[0..location.len]); this.redirect = url_buf; } else { - var url_buf = try URLBufferPool.get(this.allocator); + var url_buf = URLBufferPool.get(default_allocator); const original_url = this.url; this.url = URL.parse(std.fmt.bufPrint( - &url_buf.buf, + &url_buf.data, "{s}://{s}{s}", .{ original_url.displayProtocol(), original_url.displayHostname(), location }, ) catch return error.RedirectURLTooLong); @@ -2028,9 +1988,11 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti } pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response { - this.socket = try AsyncSocket.SSL.init(this.allocator, &AsyncIO.global); + this.socket = try AsyncSocket.SSL.init(default_allocator, &AsyncIO.global); var socket = &this.socket; + this.stage = Stage.connect; try this.connect(*AsyncSocket.SSL, socket); + this.stage = Stage.request; defer this.socket.close(); var request = buildRequest(this, body_str.len); @@ -2041,6 +2003,8 @@ pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *Mutable try writeRequest(@TypeOf(socket), socket, request, body_str); _ = try socket.send(); + this.stage = Stage.response; + if (this.progress_node == null) { return this.processResponse( false, diff --git a/src/pool.zig b/src/pool.zig index 4f3396d30..36a3d9d0a 100644 --- a/src/pool.zig +++ b/src/pool.zig @@ -1,8 +1,116 @@ const std = @import("std"); -pub fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: std.mem.Allocator) anyerror!Type), comptime threadsafe: bool) type { +fn SinglyLinkedList(comptime T: type, comptime Parent: type) type { return struct { - const LinkedList = std.SinglyLinkedList(Type); + const Self = @This(); + + /// Node inside the linked list wrapping the actual data. + pub const Node = struct { + next: ?*Node = null, + data: T, + + pub const Data = T; + + /// Insert a new node after the current one. + /// + /// Arguments: + /// new_node: Pointer to the new node to insert. + pub fn insertAfter(node: *Node, new_node: *Node) void { + new_node.next = node.next; + node.next = new_node; + } + + /// Remove a node from the list. + /// + /// Arguments: + /// node: Pointer to the node to be removed. + /// Returns: + /// node removed + pub fn removeNext(node: *Node) ?*Node { + const next_node = node.next orelse return null; + node.next = next_node.next; + return next_node; + } + + /// Iterate over the singly-linked list from this node, until the final node is found. + /// This operation is O(N). + pub fn findLast(node: *Node) *Node { + var it = node; + while (true) { + it = it.next orelse return it; + } + } + + /// Iterate over each next node, returning the count of all nodes except the starting one. + /// This operation is O(N). + pub fn countChildren(node: *const Node) usize { + var count: usize = 0; + var it: ?*const Node = node.next; + while (it) |n| : (it = n.next) { + count += 1; + } + return count; + } + + pub inline fn release(node: *Node) void { + Parent.release(node); + } + }; + + first: ?*Node = null, + + /// Insert a new node at the head. + /// + /// Arguments: + /// new_node: Pointer to the new node to insert. + pub fn prepend(list: *Self, new_node: *Node) void { + new_node.next = list.first; + list.first = new_node; + } + + /// Remove a node from the list. + /// + /// Arguments: + /// node: Pointer to the node to be removed. + pub fn remove(list: *Self, node: *Node) void { + if (list.first == node) { + list.first = node.next; + } else { + var current_elm = list.first.?; + while (current_elm.next != node) { + current_elm = current_elm.next.?; + } + current_elm.next = node.next; + } + } + + /// Remove and return the first node in the list. + /// + /// Returns: + /// A pointer to the first node in the list. + pub fn popFirst(list: *Self) ?*Node { + const first = list.first orelse return null; + list.first = first.next; + return first; + } + + /// Iterate over all nodes, returning the count. + /// This operation is O(N). + pub fn len(list: Self) usize { + if (list.first) |n| { + return 1 + n.countChildren(); + } else { + return 0; + } + } + }; +} + +pub fn ObjectPool(comptime Type: type, comptime Init: (?fn (allocator: std.mem.Allocator) anyerror!Type), comptime threadsafe: bool) type { + return struct { + const Pool = @This(); + const LinkedList = SinglyLinkedList(Type, Pool); + pub const Node = LinkedList.Node; const Data = if (threadsafe) struct { pub threadlocal var list: LinkedList = undefined; @@ -15,21 +123,23 @@ pub fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: std.mem.Al }; const data = Data; - pub const Node = LinkedList.Node; pub fn get(allocator: std.mem.Allocator) *LinkedList.Node { if (data.loaded) { if (data.list.popFirst()) |node| { - if (comptime @hasDecl(Type, "reset")) node.data.reset(); + if (comptime std.meta.trait.isContainer(Type) and @hasDecl(Type, "reset")) node.data.reset(); return node; } } var new_node = allocator.create(LinkedList.Node) catch unreachable; new_node.* = LinkedList.Node{ - .data = Init( - allocator, - ) catch unreachable, + .data = if (comptime Init) |init_| + (init_( + allocator, + ) catch unreachable) + else + undefined, }; return new_node; |