aboutsummaryrefslogtreecommitdiff
path: root/src/http_client_async.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r--src/http_client_async.zig156
1 files changed, 116 insertions, 40 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 853ab8f3d..b6dd0e828 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -1,5 +1,6 @@
const picohttp = @import("bun").picohttp;
const bun = @import("bun");
+const JSC = bun.JSC;
const string = bun.string;
const Output = bun.Output;
const Global = bun.Global;
@@ -658,6 +659,7 @@ pub fn onOpen(
ssl.configureHTTPClient(hostname);
}
}
+ client.addAbortSignalEventListenner(is_ssl, socket);
if (client.state.request_stage == .pending) {
client.onWritable(true, comptime is_ssl, socket);
}
@@ -691,7 +693,7 @@ pub fn onClose(
}
if (in_progress) {
- client.fail(error.ConnectionClosed);
+ client.fail(error.ConnectionClosed, null);
}
}
pub fn onTimeout(
@@ -702,8 +704,9 @@ pub fn onTimeout(
_ = socket;
log("Timeout {s}\n", .{client.url.href});
- if (client.state.stage != .done and client.state.stage != .fail)
- client.fail(error.Timeout);
+ if (client.state.stage != .done and client.state.stage != .fail) {
+ client.fail(error.Timeout, null);
+ }
}
pub fn onConnectError(
client: *HTTPClient,
@@ -714,7 +717,7 @@ pub fn onConnectError(
log("onConnectError {s}\n", .{client.url.href});
if (client.state.stage != .done and client.state.stage != .fail)
- client.fail(error.ConnectionRefused);
+ client.fail(error.ConnectionRefused, null);
}
pub fn onEnd(
client: *HTTPClient,
@@ -724,7 +727,7 @@ pub fn onEnd(
log("onEnd {s}\n", .{client.url.href});
if (client.state.stage != .done and client.state.stage != .fail)
- client.fail(error.ConnectionClosed);
+ client.fail(error.ConnectionClosed, null);
}
pub inline fn getAllocator() std.mem.Allocator {
@@ -989,23 +992,68 @@ http_proxy: ?URL = null,
proxy_authorization: ?[]u8 = null,
proxy_tunneling: bool = false,
proxy_tunnel: ?ProxyTunnel = null,
+signal: ?*JSC.AbortSignal = null,
+abort_handler: ?*anyopaque = null,
+abort_handler_deinit: ?*const fn (?*anyopaque) void = null,
+pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entries: Headers.Entries, header_buf: string, signal: ?*JSC.AbortSignal) HTTPClient {
+ return HTTPClient{ .allocator = allocator, .method = method, .url = url, .header_entries = header_entries, .header_buf = header_buf, .signal = signal, .abort_handler = null, .abort_handler_deinit = null };
+}
-pub fn init(
- allocator: std.mem.Allocator,
- method: Method,
- url: URL,
- header_entries: Headers.Entries,
- header_buf: string,
-) HTTPClient {
- return HTTPClient{
- .allocator = allocator,
- .method = method,
- .url = url,
- .header_entries = header_entries,
- .header_buf = header_buf,
+pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type {
+ return struct {
+ client: *HTTPClient,
+ socket: NewHTTPContext(is_ssl).HTTPSocket,
+
+ pub fn init(client: *HTTPClient, socket: NewHTTPContext(is_ssl).HTTPSocket) !*@This() {
+ var ctx = try client.allocator.create(@This());
+ ctx.client = client;
+ ctx.socket = socket;
+ return ctx;
+ }
+
+ pub fn onAborted(this: ?*anyopaque, reason: JSC.JSValue) callconv(.C) void {
+ log("onAborted", .{});
+ if (this) |this_| {
+ const self = bun.cast(*@This(), this_);
+ self.client.closeAndAbort(reason, is_ssl, self.socket);
+ }
+ }
+
+ pub fn deinit(this: ?*anyopaque) void {
+ if (this) |this_| {
+ var self = bun.cast(*@This(), this_);
+ const allocator = self.client.allocator;
+ allocator.destroy(self);
+ }
+ }
};
}
+pub fn addAbortSignalEventListenner(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ if (this.signal) |signal| {
+ const handler = ClientSocketAbortHandler(is_ssl).init(this, socket) catch unreachable;
+ this.abort_handler = bun.cast(*anyopaque, handler);
+ this.abort_handler_deinit = ClientSocketAbortHandler(is_ssl).deinit;
+ _ = signal.addListener(this.abort_handler.?, ClientSocketAbortHandler(is_ssl).onAborted);
+ log("addAbortSignalEventListenner added!", .{});
+ return;
+ }
+ log("addAbortSignalEventListenner (signal == null)", .{});
+}
+
+pub fn hasSignalAborted(this: *HTTPClient) ?JSC.JSValue {
+ if (this.signal) |signal| {
+ const aborted = signal.aborted();
+ log("hasSignalAborted {any}", .{aborted});
+ if (aborted) {
+ return signal.abortReason();
+ }
+ return null;
+ }
+ log("hasSignalAborted (signal == null)", .{});
+ return null;
+}
+
pub fn deinit(this: *HTTPClient) void {
if (this.redirect) |redirect| {
redirect.release();
@@ -1019,6 +1067,17 @@ pub fn deinit(this: *HTTPClient) void {
tunnel.deinit();
this.proxy_tunnel = null;
}
+
+ if (this.signal != null) {
+ var signal = this.signal.?;
+ _ = signal.unref();
+ this.signal = null;
+ }
+ if (this.abort_handler != null and this.abort_handler_deinit != null) {
+ this.abort_handler_deinit.?(this.abort_handler.?);
+ this.abort_handler = null;
+ this.abort_handler_deinit = null;
+ }
this.state.compressed_body.deinit();
this.state.response_message_buffer.deinit();
}
@@ -1178,20 +1237,9 @@ pub const AsyncHTTP = struct {
};
const AtomicState = std.atomic.Atomic(State);
- pub fn init(
- allocator: std.mem.Allocator,
- method: Method,
- url: URL,
- headers: Headers.Entries,
- headers_buf: string,
- response_buffer: *MutableString,
- request_body: []const u8,
- timeout: usize,
- callback: HTTPClientResult.Callback,
- http_proxy: ?URL,
- ) AsyncHTTP {
+ pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, callback: HTTPClientResult.Callback, http_proxy: ?URL, signal: ?*JSC.AbortSignal) AsyncHTTP {
var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy };
- this.client = HTTPClient.init(allocator, method, url, headers, headers_buf);
+ this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal);
this.client.timeout = timeout;
this.client.http_proxy = this.http_proxy;
if (http_proxy) |proxy| {
@@ -1222,7 +1270,7 @@ pub const AsyncHTTP = struct {
}
pub fn initSync(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, http_proxy: ?URL) AsyncHTTP {
- return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, timeout, undefined, http_proxy);
+ return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, timeout, undefined, http_proxy, null);
}
fn reset(this: *AsyncHTTP) !void {
@@ -1454,7 +1502,7 @@ pub fn doRedirect(this: *HTTPClient) void {
std.debug.assert(this.follow_redirects);
if (this.remaining_redirect_count == 0) {
- this.fail(error.TooManyRedirects);
+ this.fail(error.TooManyRedirects, null);
return;
}
this.state.reset();
@@ -1491,12 +1539,12 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString)
fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
var socket = http_thread.connect(this, is_ssl) catch |err| {
- this.fail(err);
+ this.fail(err, null);
return;
};
if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) {
- this.fail(error.ConnectionClosed);
+ this.fail(error.ConnectionClosed, null);
std.debug.assert(this.state.fail != error.NoError);
}
}
@@ -1522,6 +1570,11 @@ fn printResponse(response: picohttp.Response) void {
}
pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ if (this.hasSignalAborted()) |reason| {
+ this.closeAndAbort(reason, is_ssl, socket);
+ return;
+ }
+
switch (this.state.request_stage) {
.pending, .headers => {
var stack_fallback = std.heap.stackFallback(16384, default_allocator);
@@ -1766,8 +1819,8 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc
**anyopaque,
NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
);
- this.fail(err);
socket.close(0, null);
+ this.fail(err, null);
}
fn startProxySendHeaders(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
@@ -1821,6 +1874,10 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP
}
pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ if (this.hasSignalAborted()) |reason| {
+ this.closeAndAbort(reason, is_ssl, socket);
+ return;
+ }
switch (this.state.response_stage) {
.pending, .headers, .proxy_decoded_headers => {
var to_read = incoming_data;
@@ -2054,14 +2111,23 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
}
-fn fail(this: *HTTPClient, err: anyerror) void {
+pub fn closeAndAbort(this: *HTTPClient, reason: JSC.JSValue, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ socket.ext(**anyopaque).?.* = bun.cast(
+ **anyopaque,
+ NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
+ );
+ socket.close(0, null);
+ this.fail(error.Aborted, reason);
+}
+
+fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void {
this.state.request_stage = .fail;
this.state.response_stage = .fail;
this.state.fail = err;
this.state.stage = .fail;
const callback = this.completion_callback;
- const result = this.toResult(this.cloned_metadata);
+ const result = this.toResult(this.cloned_metadata, reason);
this.state.reset();
this.proxy_tunneling = false;
@@ -2101,7 +2167,7 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss
var out_str = this.state.body_out_str.?;
var body = out_str.*;
this.cloned_metadata.response = this.state.pending_response;
- const result = this.toResult(this.cloned_metadata);
+ const result = this.toResult(this.cloned_metadata, null);
const callback = this.completion_callback;
this.state.response_stage = .done;
@@ -2147,11 +2213,20 @@ pub const HTTPClientResult = struct {
fail: anyerror = error.NoError,
redirected: bool = false,
headers_buf: []picohttp.Header = &.{},
+ reason: ?JSC.JSValue = null,
pub fn isSuccess(this: *const HTTPClientResult) bool {
return this.fail == error.NoError;
}
+ pub fn isTimeout(this: *const HTTPClientResult) bool {
+ return this.fail == error.Timeout;
+ }
+
+ pub fn isAbort(this: *const HTTPClientResult) bool {
+ return this.fail == error.Aborted;
+ }
+
pub fn deinitMetadata(this: *HTTPClientResult) void {
if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf);
if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf);
@@ -2191,7 +2266,7 @@ pub const HTTPClientResult = struct {
};
};
-pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult {
+pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata, reason: ?JSC.JSValue) HTTPClientResult {
return HTTPClientResult{
.body = this.state.body_out_str,
.response = metadata.response,
@@ -2199,6 +2274,7 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes
.redirected = this.remaining_redirect_count != default_redirect_count,
.href = metadata.url,
.fail = this.state.fail,
+ .reason = reason,
.headers_buf = metadata.response.headers,
};
}