diff options
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r-- | src/http_client_async.zig | 140 |
1 files changed, 89 insertions, 51 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig index b6dd0e828..0c7395319 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -235,6 +235,8 @@ fn NewHTTPContext(comptime ssl: bool) type { /// Attempt to keep the socket alive by reusing it for another request. /// If no space is available, close the socket. pub fn releaseSocket(this: *@This(), socket: HTTPSocket, hostname: []const u8, port: u16) void { + log("releaseSocket", .{}); + if (comptime Environment.allow_assert) { std.debug.assert(!socket.isClosed()); std.debug.assert(!socket.isShutdown()); @@ -1015,7 +1017,9 @@ pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type { log("onAborted", .{}); if (this) |this_| { const self = bun.cast(*@This(), this_); - self.client.closeAndAbort(reason, is_ssl, self.socket); + if (self.client.state.response_stage != .done and self.client.state.response_stage != .fail) { + self.client.closeAndAbort(reason, is_ssl, self.socket); + } } } @@ -1031,6 +1035,14 @@ pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type { pub fn addAbortSignalEventListenner(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { if (this.signal) |signal| { + const aborted = signal.aborted(); + if (aborted) { + log("addAbortSignalEventListenner already aborted!", .{}); + const reason = signal.abortReason(); + this.closeAndAbort(reason, is_ssl, socket); + return; + } + const handler = ClientSocketAbortHandler(is_ssl).init(this, socket) catch unreachable; this.abort_handler = bun.cast(*anyopaque, handler); this.abort_handler_deinit = ClientSocketAbortHandler(is_ssl).deinit; @@ -1054,6 +1066,15 @@ pub fn hasSignalAborted(this: *HTTPClient) ?JSC.JSValue { return null; } +pub fn deinitSignal(this: *HTTPClient) void { + if (this.signal != null) { + var signal = this.signal.?; + const ctx = bun.cast(*anyopaque, this); + signal.cleanNativeBindings(ctx); + _ = signal.unref(); + this.signal = null; + } +} pub fn deinit(this: *HTTPClient) void { if (this.redirect) |redirect| { redirect.release(); @@ -1068,11 +1089,8 @@ pub fn deinit(this: *HTTPClient) void { this.proxy_tunnel = null; } - if (this.signal != null) { - var signal = this.signal.?; - _ = signal.unref(); - this.signal = null; - } + this.deinitSignal(); + if (this.abort_handler != null and this.abort_handler_deinit != null) { this.abort_handler_deinit.?(this.abort_handler.?); this.abort_handler = null; @@ -1538,6 +1556,12 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) } fn start_(this: *HTTPClient, comptime is_ssl: bool) void { + // Aborted before connecting + if (this.hasSignalAborted()) |reason| { + this.fail(error.Aborted, reason); + return; + } + var socket = http_thread.connect(this, is_ssl) catch |err| { this.fail(err, null); return; @@ -1815,12 +1839,17 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s } pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - socket.ext(**anyopaque).?.* = bun.cast( - **anyopaque, - NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), - ); - socket.close(0, null); - this.fail(err, null); + if (this.state.stage != .fail and this.state.stage != .done) { + log("closeAndFail", .{}); + if (!socket.isClosed()) { + socket.ext(**anyopaque).?.* = bun.cast( + **anyopaque, + NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), + ); + socket.close(0, null); + } + this.fail(err, null); + } } fn startProxySendHeaders(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { @@ -2112,12 +2141,17 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } pub fn closeAndAbort(this: *HTTPClient, reason: JSC.JSValue, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - socket.ext(**anyopaque).?.* = bun.cast( - **anyopaque, - NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), - ); - socket.close(0, null); - this.fail(error.Aborted, reason); + if (this.state.stage != .fail and this.state.stage != .done) { + log("closeAndAbort", .{}); + if (!socket.isClosed()) { + socket.ext(**anyopaque).?.* = bun.cast( + **anyopaque, + NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), + ); + socket.close(0, null); + } + this.fail(error.Aborted, reason); + } } fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void { @@ -2132,6 +2166,8 @@ fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void { this.proxy_tunneling = false; callback.run(result); + + this.deinitSignal(); } // We have to clone metadata immediately after use @@ -2164,45 +2200,47 @@ pub fn setTimeout(this: *HTTPClient, socket: anytype, amount: c_uint) void { } pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { - var out_str = this.state.body_out_str.?; - var body = out_str.*; - this.cloned_metadata.response = this.state.pending_response; - const result = this.toResult(this.cloned_metadata, null); - const callback = this.completion_callback; + if (this.state.stage != .done and this.state.stage != .fail) { + log("done", .{}); - this.state.response_stage = .done; - this.state.request_stage = .done; - this.state.stage = .done; + this.deinitSignal(); - socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); + var out_str = this.state.body_out_str.?; + var body = out_str.*; + this.cloned_metadata.response = this.state.pending_response; + const result = this.toResult(this.cloned_metadata, null); + const callback = this.completion_callback; - if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) { - ctx.releaseSocket( - socket, - this.connected_url.hostname, - this.connected_url.getPortAuto(), - ); - } else if (!socket.isClosed()) { - socket.close(0, null); - } + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); - this.state.reset(); - result.body.?.* = body; - std.debug.assert(this.state.stage != .done); - this.state.response_stage = .done; - this.state.request_stage = .done; - this.state.stage = .done; - this.proxy_tunneling = false; - if (comptime print_every > 0) { - print_every_i += 1; - if (print_every_i % print_every == 0) { - Output.prettyln("Heap stats for HTTP thread\n", .{}); - Output.flush(); - default_arena.dumpThreadStats(); - print_every_i = 0; + if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) { + ctx.releaseSocket( + socket, + this.connected_url.hostname, + this.connected_url.getPortAuto(), + ); + } else if (!socket.isClosed()) { + socket.close(0, null); + } + + this.state.reset(); + result.body.?.* = body; + std.debug.assert(this.state.stage != .done); + this.state.response_stage = .done; + this.state.request_stage = .done; + this.state.stage = .done; + this.proxy_tunneling = false; + if (comptime print_every > 0) { + print_every_i += 1; + if (print_every_i % print_every == 0) { + Output.prettyln("Heap stats for HTTP thread\n", .{}); + Output.flush(); + default_arena.dumpThreadStats(); + print_every_i = 0; + } } + callback.run(result); } - callback.run(result); } pub const HTTPClientResult = struct { |