diff options
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r-- | src/http_client_async.zig | 156 |
1 files changed, 116 insertions, 40 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 853ab8f3d..b6dd0e828 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -1,5 +1,6 @@ const picohttp = @import("bun").picohttp; const bun = @import("bun"); +const JSC = bun.JSC; const string = bun.string; const Output = bun.Output; const Global = bun.Global; @@ -658,6 +659,7 @@ pub fn onOpen( ssl.configureHTTPClient(hostname); } } + client.addAbortSignalEventListenner(is_ssl, socket); if (client.state.request_stage == .pending) { client.onWritable(true, comptime is_ssl, socket); } @@ -691,7 +693,7 @@ pub fn onClose( } if (in_progress) { - client.fail(error.ConnectionClosed); + client.fail(error.ConnectionClosed, null); } } pub fn onTimeout( @@ -702,8 +704,9 @@ pub fn onTimeout( _ = socket; log("Timeout {s}\n", .{client.url.href}); - if (client.state.stage != .done and client.state.stage != .fail) - client.fail(error.Timeout); + if (client.state.stage != .done and client.state.stage != .fail) { + client.fail(error.Timeout, null); + } } pub fn onConnectError( client: *HTTPClient, @@ -714,7 +717,7 @@ pub fn onConnectError( log("onConnectError {s}\n", .{client.url.href}); if (client.state.stage != .done and client.state.stage != .fail) - client.fail(error.ConnectionRefused); + client.fail(error.ConnectionRefused, null); } pub fn onEnd( client: *HTTPClient, @@ -724,7 +727,7 @@ pub fn onEnd( log("onEnd {s}\n", .{client.url.href}); if (client.state.stage != .done and client.state.stage != .fail) - client.fail(error.ConnectionClosed); + client.fail(error.ConnectionClosed, null); } pub inline fn getAllocator() std.mem.Allocator { @@ -989,23 +992,68 @@ http_proxy: ?URL = null, proxy_authorization: ?[]u8 = null, proxy_tunneling: bool = false, proxy_tunnel: ?ProxyTunnel = null, +signal: ?*JSC.AbortSignal = null, +abort_handler: ?*anyopaque = null, +abort_handler_deinit: ?*const fn (?*anyopaque) void = null, +pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entries: Headers.Entries, header_buf: string, signal: ?*JSC.AbortSignal) HTTPClient { + return HTTPClient{ .allocator = allocator, .method = method, .url = url, .header_entries = header_entries, .header_buf = header_buf, .signal = signal, .abort_handler = null, .abort_handler_deinit = null }; +} -pub fn init( - allocator: std.mem.Allocator, - method: Method, - url: URL, - header_entries: Headers.Entries, - header_buf: string, -) HTTPClient { - return HTTPClient{ - .allocator = allocator, - .method = method, - .url = url, - .header_entries = header_entries, - .header_buf = header_buf, +pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type { + return struct { + client: *HTTPClient, + socket: NewHTTPContext(is_ssl).HTTPSocket, + + pub fn init(client: *HTTPClient, socket: NewHTTPContext(is_ssl).HTTPSocket) !*@This() { + var ctx = try client.allocator.create(@This()); + ctx.client = client; + ctx.socket = socket; + return ctx; + } + + pub fn onAborted(this: ?*anyopaque, reason: JSC.JSValue) callconv(.C) void { + log("onAborted", .{}); + if (this) |this_| { + const self = bun.cast(*@This(), this_); + self.client.closeAndAbort(reason, is_ssl, self.socket); + } + } + + pub fn deinit(this: ?*anyopaque) void { + if (this) |this_| { + var self = bun.cast(*@This(), this_); + const allocator = self.client.allocator; + allocator.destroy(self); + } + } }; } +pub fn addAbortSignalEventListenner(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { + if (this.signal) |signal| { + const handler = ClientSocketAbortHandler(is_ssl).init(this, socket) catch unreachable; + this.abort_handler = bun.cast(*anyopaque, handler); + this.abort_handler_deinit = ClientSocketAbortHandler(is_ssl).deinit; + _ = signal.addListener(this.abort_handler.?, ClientSocketAbortHandler(is_ssl).onAborted); + log("addAbortSignalEventListenner added!", .{}); + return; + } + log("addAbortSignalEventListenner (signal == null)", .{}); +} + +pub fn hasSignalAborted(this: *HTTPClient) ?JSC.JSValue { + if (this.signal) |signal| { + const aborted = signal.aborted(); + log("hasSignalAborted {any}", .{aborted}); + if (aborted) { + return signal.abortReason(); + } + return null; + } + log("hasSignalAborted (signal == null)", .{}); + return null; +} + pub fn deinit(this: *HTTPClient) void { if (this.redirect) |redirect| { redirect.release(); @@ -1019,6 +1067,17 @@ pub fn deinit(this: *HTTPClient) void { tunnel.deinit(); this.proxy_tunnel = null; } + + if (this.signal != null) { + var signal = this.signal.?; + _ = signal.unref(); + this.signal = null; + } + if (this.abort_handler != null and this.abort_handler_deinit != null) { + this.abort_handler_deinit.?(this.abort_handler.?); + this.abort_handler = null; + this.abort_handler_deinit = null; + } this.state.compressed_body.deinit(); this.state.response_message_buffer.deinit(); } @@ -1178,20 +1237,9 @@ pub const AsyncHTTP = struct { }; const AtomicState = std.atomic.Atomic(State); - pub fn init( - allocator: std.mem.Allocator, - method: Method, - url: URL, - headers: Headers.Entries, - headers_buf: string, - response_buffer: *MutableString, - request_body: []const u8, - timeout: usize, - callback: HTTPClientResult.Callback, - http_proxy: ?URL, - ) AsyncHTTP { + pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, callback: HTTPClientResult.Callback, http_proxy: ?URL, signal: ?*JSC.AbortSignal) AsyncHTTP { var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy }; - this.client = HTTPClient.init(allocator, method, url, headers, headers_buf); + this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal); this.client.timeout = timeout; this.client.http_proxy = this.http_proxy; if (http_proxy) |proxy| { @@ -1222,7 +1270,7 @@ pub const AsyncHTTP = struct { } pub fn initSync(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, http_proxy: ?URL) AsyncHTTP { - return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, timeout, undefined, http_proxy); + return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, timeout, undefined, http_proxy, null); } fn reset(this: *AsyncHTTP) !void { @@ -1454,7 +1502,7 @@ pub fn doRedirect(this: *HTTPClient) void { std.debug.assert(this.follow_redirects); if (this.remaining_redirect_count == 0) { - this.fail(error.TooManyRedirects); + this.fail(error.TooManyRedirects, null); return; } this.state.reset(); @@ -1491,12 +1539,12 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) fn start_(this: *HTTPClient, comptime is_ssl: bool) void { var socket = http_thread.connect(this, is_ssl) catch |err| { - this.fail(err); + this.fail(err, null); return; }; if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) { - this.fail(error.ConnectionClosed); + this.fail(error.ConnectionClosed, null); std.debug.assert(this.state.fail != error.NoError); } } @@ -1522,6 +1570,11 @@ fn printResponse(response: picohttp.Response) void { } pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { + if (this.hasSignalAborted()) |reason| { + this.closeAndAbort(reason, is_ssl, socket); + return; + } + switch (this.state.request_stage) { .pending, .headers => { var stack_fallback = std.heap.stackFallback(16384, default_allocator); @@ -1766,8 +1819,8 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc **anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), ); - this.fail(err); socket.close(0, null); + this.fail(err, null); } fn startProxySendHeaders(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { @@ -1821,6 +1874,10 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP } pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { + if (this.hasSignalAborted()) |reason| { + this.closeAndAbort(reason, is_ssl, socket); + return; + } switch (this.state.response_stage) { .pending, .headers, .proxy_decoded_headers => { var to_read = incoming_data; @@ -2054,14 +2111,23 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } } -fn fail(this: *HTTPClient, err: anyerror) void { +pub fn closeAndAbort(this: *HTTPClient, reason: JSC.JSValue, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { + socket.ext(**anyopaque).?.* = bun.cast( + **anyopaque, + NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), + ); + socket.close(0, null); + this.fail(error.Aborted, reason); +} + +fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void { this.state.request_stage = .fail; this.state.response_stage = .fail; this.state.fail = err; this.state.stage = .fail; const callback = this.completion_callback; - const result = this.toResult(this.cloned_metadata); + const result = this.toResult(this.cloned_metadata, reason); this.state.reset(); this.proxy_tunneling = false; @@ -2101,7 +2167,7 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss var out_str = this.state.body_out_str.?; var body = out_str.*; this.cloned_metadata.response = this.state.pending_response; - const result = this.toResult(this.cloned_metadata); + const result = this.toResult(this.cloned_metadata, null); const callback = this.completion_callback; this.state.response_stage = .done; @@ -2147,11 +2213,20 @@ pub const HTTPClientResult = struct { fail: anyerror = error.NoError, redirected: bool = false, headers_buf: []picohttp.Header = &.{}, + reason: ?JSC.JSValue = null, pub fn isSuccess(this: *const HTTPClientResult) bool { return this.fail == error.NoError; } + pub fn isTimeout(this: *const HTTPClientResult) bool { + return this.fail == error.Timeout; + } + + pub fn isAbort(this: *const HTTPClientResult) bool { + return this.fail == error.Aborted; + } + pub fn deinitMetadata(this: *HTTPClientResult) void { if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf); if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf); @@ -2191,7 +2266,7 @@ pub const HTTPClientResult = struct { }; }; -pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult { +pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata, reason: ?JSC.JSValue) HTTPClientResult { return HTTPClientResult{ .body = this.state.body_out_str, .response = metadata.response, @@ -2199,6 +2274,7 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes .redirected = this.remaining_redirect_count != default_redirect_count, .href = metadata.url, .fail = this.state.fail, + .reason = reason, .headers_buf = metadata.response.headers, }; } |