diff options
author | 2023-02-23 00:27:25 -0300 | |
---|---|---|
committer | 2023-02-22 19:27:25 -0800 | |
commit | 24d624b176df241936d4ec82b2d6f93861de6229 (patch) | |
tree | e01d2ecc4bdb20cf12c9ed629b4b11065996e2f4 /src/http_client_async.zig | |
parent | 9c5f02e120bbfe76b45d036db3544cf47cf1354f (diff) | |
download | bun-24d624b176df241936d4ec82b2d6f93861de6229.tar.gz bun-24d624b176df241936d4ec82b2d6f93861de6229.tar.zst bun-24d624b176df241936d4ec82b2d6f93861de6229.zip |
feat(Request.signal) Initial support for signal in Request + fetch and Request + Bun.serve (#2097)
* add fetch abort signal
* get aborted (still segfaults)
* bidings.zig u0 error
* still GC/memory error
* fix start crash
* fix AbortSignal fromJS
* change fromJS to obj.as
* addAbortSignalEventListenner
* handle abort types, and add tests
* fix tests
* add custom reason test
* merge 2 substring methods, use MAKE_STATIC_STRING_IMPL
* fix create AbortError and TimeoutError, move globalThis and exception creation to main thread
* fix tests and rebuild headers
* no need to check with substring reason is already an exception
* no need to check with substring reason is already an exception
* fix dumb error inverting conditions for check reason
* fix custom reason behavior
* Request signal
* remove package-lock.json
* Remove JSC.Strong from Request signal
* fix globals for fetch abort signal
* more tests, clone signal crashs
* fix AbortSignal.toJS
* fix toJS bidings for AbortSignal
* add streaming tests
* fix abortion before connecting
* fix tests and segfault
* add fetch testing abort after finish
* fix signal handler cleanup
* support signal event Bun.serve
* pull tests (failing)
* remove unsupported test
* formating
* fix server Request.signal, fix cleanNativeBindings
* add direct tests
* more pull tests
* fix stream tests
* fix fetch, pending onAborted fix in HTTPServerWritable
---------
Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r-- | src/http_client_async.zig | 140 |
1 files changed, 89 insertions, 51 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig index b6dd0e828..0c7395319 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -235,6 +235,8 @@ fn NewHTTPContext(comptime ssl: bool) type { /// Attempt to keep the socket alive by reusing it for another request. /// If no space is available, close the socket. pub fn releaseSocket(this: *@This(), socket: HTTPSocket, hostname: []const u8, port: u16) void { + log("releaseSocket", .{}); + if (comptime Environment.allow_assert) { std.debug.assert(!socket.isClosed()); std.debug.assert(!socket.isShutdown()); @@ -1015,7 +1017,9 @@ pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type { log("onAborted", .{}); if (this) |this_| { const self = bun.cast(*@This(), this_); - self.client.closeAndAbort(reason, is_ssl, self.socket); + if (self.client.state.response_stage != .done and self.client.state.response_stage != .fail) { + self.client.closeAndAbort(reason, is_ssl, self.socket); + } } } @@ -1031,6 +1035,14 @@ pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type { pub fn addAbortSignalEventListenner(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { if (this.signal) |signal| { + const aborted = signal.aborted(); + if (aborted) { + log("addAbortSignalEventListenner already aborted!", .{}); + const reason = signal.abortReason(); + this.closeAndAbort(reason, is_ssl, socket); + return; + } + const handler = ClientSocketAbortHandler(is_ssl).init(this, socket) catch unreachable; this.abort_handler = bun.cast(*anyopaque, handler); this.abort_handler_deinit = ClientSocketAbortHandler(is_ssl).deinit; @@ -1054,6 +1066,15 @@ pub fn hasSignalAborted(this: *HTTPClient) ?JSC.JSValue { return null; } +pub fn deinitSignal(this: *HTTPClient) void { + if (this.signal != null) { + var signal = this.signal.?; + const ctx = bun.cast(*anyopaque, this); + signal.cleanNativeBindings(ctx); + _ = signal.unref(); + this.signal = null; + } +} pub fn deinit(this: *HTTPClient) void { if (this.redirect) |redirect| { redirect.release(); @@ -1068,11 +1089,8 @@ pub fn deinit(this: *HTTPClient) void { this.proxy_tunnel = null; } - if (this.signal != null) { - var signal = this.signal.?; - _ = signal.unref(); - this.signal = null; - } + this.deinitSignal(); + if (this.abort_handler != null and this.abort_handler_deinit != null) { this.abort_handler_deinit.?(this.abort_handler.?); this.abort_handler = null; @@ -1538,6 +1556,12 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) } fn start_(this: *HTTPClient, comptime is_ssl: bool) void { + // Aborted before connecting + if (this.hasSignalAborted()) |reason| { + this.fail(error.Aborted, reason); + return; + } + var socket = http_thread.connect(this, is_ssl) catch |err| { this.fail(err, null); return; @@ -1815,12 +1839,17 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s } pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - socket.ext(**anyopaque).?.* = bun.cast( - **anyopaque, - NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), - ); - socket.close(0, null); - this.fail(err, null); + if (this.state.stage != .fail and this.state.stage != .done) { + log("closeAndFail", .{}); + if (!socket.isClosed()) { + socket.ext(**anyopaque).?.* = bun.cast( + **anyopaque, + NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), + ); + socket.close(0, null); + } + this.fail(err, null); + } } fn startProxySendHeaders(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { @@ -2112,12 +2141,17 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } pub fn closeAndAbort(this: *HTTPClient, reason: JSC.JSValue, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - socket.ext(**anyopaque).?.* = bun.cast( - **anyopaque, - NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), - ); - socket.close(0, null); - this.fail(error.Aborted, reason); + if (this.state.stage != .fail and this.state.stage != .done) { + log("closeAndAbort", .{}); + if (!socket.isClosed()) { + socket.ext(**anyopaque).?.* = bun.cast( + **anyopaque, + NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), + ); + socket.close(0, null); + } + this.fail(error.Aborted, reason); + } } fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void { @@ -2132,6 +2166,8 @@ fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void { this.proxy_tunneling = false; callback.run(result); + + this.deinitSignal(); } // We have to clone metadata immediately after use @@ -2164,45 +2200,47 @@ pub fn setTimeout(this: *HTTPClient, socket: anytype, amount: c_uint) void { } pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { - var out_str = this.state.body_out_str.?; - var body = out_str.*; - this.cloned_metadata.response = this.state.pending_response; - const result = this.toResult(this.cloned_metadata, null); - const callback = this.completion_callback; + if (this.state.stage != .done and this.state.stage != .fail) { + log("done", .{}); - this.state.response_stage = .done; - this.state.request_stage = .done; - this.state.stage = .done; + this.deinitSignal(); - socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); + var out_str = this.state.body_out_str.?; + var body = out_str.*; + this.cloned_metadata.response = this.state.pending_response; + const result = this.toResult(this.cloned_metadata, null); + const callback = this.completion_callback; - if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) { - ctx.releaseSocket( - socket, - this.connected_url.hostname, - this.connected_url.getPortAuto(), - ); - } else if (!socket.isClosed()) { - socket.close(0, null); - } + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); - this.state.reset(); - result.body.?.* = body; - std.debug.assert(this.state.stage != .done); - this.state.response_stage = .done; - this.state.request_stage = .done; - this.state.stage = .done; - this.proxy_tunneling = false; - if (comptime print_every > 0) { - print_every_i += 1; - if (print_every_i % print_every == 0) { - Output.prettyln("Heap stats for HTTP thread\n", .{}); - Output.flush(); - default_arena.dumpThreadStats(); - print_every_i = 0; + if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) { + ctx.releaseSocket( + socket, + this.connected_url.hostname, + this.connected_url.getPortAuto(), + ); + } else if (!socket.isClosed()) { + socket.close(0, null); + } + + this.state.reset(); + result.body.?.* = body; + std.debug.assert(this.state.stage != .done); + this.state.response_stage = .done; + this.state.request_stage = .done; + this.state.stage = .done; + this.proxy_tunneling = false; + if (comptime print_every > 0) { + print_every_i += 1; + if (print_every_i % print_every == 0) { + Output.prettyln("Heap stats for HTTP thread\n", .{}); + Output.flush(); + default_arena.dumpThreadStats(); + print_every_i = 0; + } } + callback.run(result); } - callback.run(result); } pub const HTTPClientResult = struct { |