aboutsummaryrefslogtreecommitdiff
path: root/src/http_client_async.zig
diff options
context:
space:
mode:
authorGravatar Ciro Spaciari <ciro.spaciari@gmail.com> 2023-02-23 00:27:25 -0300
committerGravatar GitHub <noreply@github.com> 2023-02-22 19:27:25 -0800
commit24d624b176df241936d4ec82b2d6f93861de6229 (patch)
treee01d2ecc4bdb20cf12c9ed629b4b11065996e2f4 /src/http_client_async.zig
parent9c5f02e120bbfe76b45d036db3544cf47cf1354f (diff)
downloadbun-24d624b176df241936d4ec82b2d6f93861de6229.tar.gz
bun-24d624b176df241936d4ec82b2d6f93861de6229.tar.zst
bun-24d624b176df241936d4ec82b2d6f93861de6229.zip
feat(Request.signal) Initial support for signal in Request + fetch and Request + Bun.serve (#2097)
* add fetch abort signal * get aborted (still segfaults) * bidings.zig u0 error * still GC/memory error * fix start crash * fix AbortSignal fromJS * change fromJS to obj.as * addAbortSignalEventListenner * handle abort types, and add tests * fix tests * add custom reason test * merge 2 substring methods, use MAKE_STATIC_STRING_IMPL * fix create AbortError and TimeoutError, move globalThis and exception creation to main thread * fix tests and rebuild headers * no need to check with substring reason is already an exception * no need to check with substring reason is already an exception * fix dumb error inverting conditions for check reason * fix custom reason behavior * Request signal * remove package-lock.json * Remove JSC.Strong from Request signal * fix globals for fetch abort signal * more tests, clone signal crashs * fix AbortSignal.toJS * fix toJS bidings for AbortSignal * add streaming tests * fix abortion before connecting * fix tests and segfault * add fetch testing abort after finish * fix signal handler cleanup * support signal event Bun.serve * pull tests (failing) * remove unsupported test * formating * fix server Request.signal, fix cleanNativeBindings * add direct tests * more pull tests * fix stream tests * fix fetch, pending onAborted fix in HTTPServerWritable --------- Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r--src/http_client_async.zig140
1 files changed, 89 insertions, 51 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index b6dd0e828..0c7395319 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -235,6 +235,8 @@ fn NewHTTPContext(comptime ssl: bool) type {
/// Attempt to keep the socket alive by reusing it for another request.
/// If no space is available, close the socket.
pub fn releaseSocket(this: *@This(), socket: HTTPSocket, hostname: []const u8, port: u16) void {
+ log("releaseSocket", .{});
+
if (comptime Environment.allow_assert) {
std.debug.assert(!socket.isClosed());
std.debug.assert(!socket.isShutdown());
@@ -1015,7 +1017,9 @@ pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type {
log("onAborted", .{});
if (this) |this_| {
const self = bun.cast(*@This(), this_);
- self.client.closeAndAbort(reason, is_ssl, self.socket);
+ if (self.client.state.response_stage != .done and self.client.state.response_stage != .fail) {
+ self.client.closeAndAbort(reason, is_ssl, self.socket);
+ }
}
}
@@ -1031,6 +1035,14 @@ pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type {
pub fn addAbortSignalEventListenner(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
if (this.signal) |signal| {
+ const aborted = signal.aborted();
+ if (aborted) {
+ log("addAbortSignalEventListenner already aborted!", .{});
+ const reason = signal.abortReason();
+ this.closeAndAbort(reason, is_ssl, socket);
+ return;
+ }
+
const handler = ClientSocketAbortHandler(is_ssl).init(this, socket) catch unreachable;
this.abort_handler = bun.cast(*anyopaque, handler);
this.abort_handler_deinit = ClientSocketAbortHandler(is_ssl).deinit;
@@ -1054,6 +1066,15 @@ pub fn hasSignalAborted(this: *HTTPClient) ?JSC.JSValue {
return null;
}
+pub fn deinitSignal(this: *HTTPClient) void {
+ if (this.signal != null) {
+ var signal = this.signal.?;
+ const ctx = bun.cast(*anyopaque, this);
+ signal.cleanNativeBindings(ctx);
+ _ = signal.unref();
+ this.signal = null;
+ }
+}
pub fn deinit(this: *HTTPClient) void {
if (this.redirect) |redirect| {
redirect.release();
@@ -1068,11 +1089,8 @@ pub fn deinit(this: *HTTPClient) void {
this.proxy_tunnel = null;
}
- if (this.signal != null) {
- var signal = this.signal.?;
- _ = signal.unref();
- this.signal = null;
- }
+ this.deinitSignal();
+
if (this.abort_handler != null and this.abort_handler_deinit != null) {
this.abort_handler_deinit.?(this.abort_handler.?);
this.abort_handler = null;
@@ -1538,6 +1556,12 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString)
}
fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
+ // Aborted before connecting
+ if (this.hasSignalAborted()) |reason| {
+ this.fail(error.Aborted, reason);
+ return;
+ }
+
var socket = http_thread.connect(this, is_ssl) catch |err| {
this.fail(err, null);
return;
@@ -1815,12 +1839,17 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
}
pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- socket.ext(**anyopaque).?.* = bun.cast(
- **anyopaque,
- NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
- );
- socket.close(0, null);
- this.fail(err, null);
+ if (this.state.stage != .fail and this.state.stage != .done) {
+ log("closeAndFail", .{});
+ if (!socket.isClosed()) {
+ socket.ext(**anyopaque).?.* = bun.cast(
+ **anyopaque,
+ NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
+ );
+ socket.close(0, null);
+ }
+ this.fail(err, null);
+ }
}
fn startProxySendHeaders(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
@@ -2112,12 +2141,17 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
pub fn closeAndAbort(this: *HTTPClient, reason: JSC.JSValue, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- socket.ext(**anyopaque).?.* = bun.cast(
- **anyopaque,
- NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
- );
- socket.close(0, null);
- this.fail(error.Aborted, reason);
+ if (this.state.stage != .fail and this.state.stage != .done) {
+ log("closeAndAbort", .{});
+ if (!socket.isClosed()) {
+ socket.ext(**anyopaque).?.* = bun.cast(
+ **anyopaque,
+ NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
+ );
+ socket.close(0, null);
+ }
+ this.fail(error.Aborted, reason);
+ }
}
fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void {
@@ -2132,6 +2166,8 @@ fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void {
this.proxy_tunneling = false;
callback.run(result);
+
+ this.deinitSignal();
}
// We have to clone metadata immediately after use
@@ -2164,45 +2200,47 @@ pub fn setTimeout(this: *HTTPClient, socket: anytype, amount: c_uint) void {
}
pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- var out_str = this.state.body_out_str.?;
- var body = out_str.*;
- this.cloned_metadata.response = this.state.pending_response;
- const result = this.toResult(this.cloned_metadata, null);
- const callback = this.completion_callback;
+ if (this.state.stage != .done and this.state.stage != .fail) {
+ log("done", .{});
- this.state.response_stage = .done;
- this.state.request_stage = .done;
- this.state.stage = .done;
+ this.deinitSignal();
- socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr());
+ var out_str = this.state.body_out_str.?;
+ var body = out_str.*;
+ this.cloned_metadata.response = this.state.pending_response;
+ const result = this.toResult(this.cloned_metadata, null);
+ const callback = this.completion_callback;
- if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) {
- ctx.releaseSocket(
- socket,
- this.connected_url.hostname,
- this.connected_url.getPortAuto(),
- );
- } else if (!socket.isClosed()) {
- socket.close(0, null);
- }
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr());
- this.state.reset();
- result.body.?.* = body;
- std.debug.assert(this.state.stage != .done);
- this.state.response_stage = .done;
- this.state.request_stage = .done;
- this.state.stage = .done;
- this.proxy_tunneling = false;
- if (comptime print_every > 0) {
- print_every_i += 1;
- if (print_every_i % print_every == 0) {
- Output.prettyln("Heap stats for HTTP thread\n", .{});
- Output.flush();
- default_arena.dumpThreadStats();
- print_every_i = 0;
+ if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) {
+ ctx.releaseSocket(
+ socket,
+ this.connected_url.hostname,
+ this.connected_url.getPortAuto(),
+ );
+ } else if (!socket.isClosed()) {
+ socket.close(0, null);
+ }
+
+ this.state.reset();
+ result.body.?.* = body;
+ std.debug.assert(this.state.stage != .done);
+ this.state.response_stage = .done;
+ this.state.request_stage = .done;
+ this.state.stage = .done;
+ this.proxy_tunneling = false;
+ if (comptime print_every > 0) {
+ print_every_i += 1;
+ if (print_every_i % print_every == 0) {
+ Output.prettyln("Heap stats for HTTP thread\n", .{});
+ Output.flush();
+ default_arena.dumpThreadStats();
+ print_every_i = 0;
+ }
}
+ callback.run(result);
}
- callback.run(result);
}
pub const HTTPClientResult = struct {