aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2023-02-22 21:29:43 -0800
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2023-02-22 21:29:43 -0800
commit0143eccb27ef0c9a93bd39636d5f3ecfd08f7eef (patch)
treec16f4292aea038ab2101ab30ffeb41e8fbebc446
parent583b29632c6006ad4cdce75d1609f087c6bd2173 (diff)
downloadbun-0143eccb27ef0c9a93bd39636d5f3ecfd08f7eef.tar.gz
bun-0143eccb27ef0c9a93bd39636d5f3ecfd08f7eef.tar.zst
bun-0143eccb27ef0c9a93bd39636d5f3ecfd08f7eef.zip
Fix flaky request.signal implementation
Diffstat (limited to '')
-rw-r--r--src/bun.js/bindings/bindings.zig50
-rw-r--r--src/bun.js/webcore/request.zig44
-rw-r--r--src/bun.js/webcore/response.zig118
-rw-r--r--src/http_client_async.zig198
-rw-r--r--test/bun.js/fetch.test.js68
5 files changed, 260 insertions, 218 deletions
diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig
index a43ff8265..48cbe0887 100644
--- a/src/bun.js/bindings/bindings.zig
+++ b/src/bun.js/bindings/bindings.zig
@@ -1690,6 +1690,26 @@ pub const AbortSignal = extern opaque {
pub const name = "JSC::AbortSignal";
pub const namespace = "JSC";
+ pub fn listen(
+ this: *AbortSignal,
+ comptime Context: type,
+ ctx: *Context,
+ comptime cb: *const fn (*Context, JSValue) void,
+ ) *AbortSignal {
+ const Wrapper = struct {
+ const call = cb;
+ pub fn callback(
+ ptr: ?*anyopaque,
+ reason: JSValue,
+ ) callconv(.C) void {
+ var val = bun.cast(*Context, ptr.?);
+ call(val, reason);
+ }
+ };
+
+ return this.addListener(@ptrCast(?*anyopaque, ctx), Wrapper.callback);
+ }
+
pub fn addListener(
this: *AbortSignal,
ctx: ?*anyopaque,
@@ -1709,10 +1729,12 @@ pub const AbortSignal = extern opaque {
return cppFn("signal", .{ this, reason });
}
+ /// This function is not threadsafe. aborted is a boolean, not an atomic!
pub fn aborted(this: *AbortSignal) bool {
return cppFn("aborted", .{this});
}
+ /// This function is not threadsafe. JSValue cannot safely be passed between threads.
pub fn abortReason(this: *AbortSignal) JSValue {
return cppFn("abortReason", .{this});
}
@@ -1734,11 +1756,11 @@ pub const AbortSignal = extern opaque {
}
pub fn toJS(this: *AbortSignal, global: *JSGlobalObject) JSValue {
- return cppFn("toJS", .{this, global});
+ return cppFn("toJS", .{ this, global });
}
pub fn create(global: *JSGlobalObject) JSValue {
- return cppFn("create", .{ global });
+ return cppFn("create", .{global});
}
pub fn createAbortError(message: *const ZigString, code: *const ZigString, global: *JSGlobalObject) JSValue {
@@ -1749,20 +1771,7 @@ pub const AbortSignal = extern opaque {
return cppFn("createTimeoutError", .{ message, code, global });
}
- pub const Extern = [_][]const u8{
- "createAbortError",
- "createTimeoutError",
- "create",
- "ref",
- "unref",
- "signal",
- "abortReason",
- "aborted",
- "addListener",
- "fromJS",
- "toJS",
- "cleanNativeBindings"
- };
+ pub const Extern = [_][]const u8{ "createAbortError", "createTimeoutError", "create", "ref", "unref", "signal", "abortReason", "aborted", "addListener", "fromJS", "toJS", "cleanNativeBindings" };
};
pub const JSPromise = extern struct {
@@ -3567,14 +3576,7 @@ pub const JSValue = enum(JSValueReprInt) {
return cppFn("eqlCell", .{ this, other });
}
- pub const BuiltinName = enum(u8) {
- method,
- headers,
- status,
- url,
- body,
- data
- };
+ pub const BuiltinName = enum(u8) { method, headers, status, url, body, data };
// intended to be more lightweight than ZigString
pub fn fastGet(this: JSValue, global: *JSGlobalObject, builtin_name: BuiltinName) ?JSValue {
diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig
index fc5bff2b5..b2ff43a5a 100644
--- a/src/bun.js/webcore/request.zig
+++ b/src/bun.js/webcore/request.zig
@@ -260,7 +260,7 @@ pub const Request = struct {
return ZigString.init(Properties.UTF8.navigate).toValue(globalThis);
}
- pub fn finalize(this: *Request) callconv(.C) void {
+ pub fn finalizeWithoutDeinit(this: *Request) void {
if (this.headers) |headers| {
headers.deref();
this.headers = null;
@@ -275,7 +275,10 @@ pub const Request = struct {
_ = signal.unref();
this.signal = null;
}
+ }
+ pub fn finalize(this: *Request) callconv(.C) void {
+ this.finalizeWithoutDeinit();
bun.default_allocator.destroy(this);
}
@@ -402,10 +405,7 @@ pub const Request = struct {
if (Body.Value.fromJS(globalThis, body_)) |body| {
request.body = body;
} else {
- if (request.headers) |head| {
- head.deref();
- }
-
+ request.finalizeWithoutDeinit();
return null;
}
}
@@ -419,23 +419,6 @@ pub const Request = struct {
}
},
else => {
- if (Body.Init.init(getAllocator(globalThis), globalThis, arguments[1], arguments[1].jsType()) catch null) |req_init| {
- request.headers = req_init.headers;
- request.method = req_init.method;
- }
-
- if (arguments[1].fastGet(globalThis, .body)) |body_| {
- if (Body.Value.fromJS(globalThis, body_)) |body| {
- request.body = body;
- } else {
- if (request.headers) |head| {
- head.deref();
- }
-
- return null;
- }
- }
-
if (arguments[1].get(globalThis, "signal")) |signal_| {
if (AbortSignal.fromJS(signal_)) |signal| {
//Keep it alive
@@ -444,10 +427,21 @@ pub const Request = struct {
} else {
globalThis.throw("Failed to construct 'Request': member signal is not of type AbortSignal.", .{});
- if (request.headers) |head| {
- head.deref();
- }
+ request.finalizeWithoutDeinit();
+ return null;
+ }
+ }
+
+ if (Body.Init.init(getAllocator(globalThis), globalThis, arguments[1], arguments[1].jsType()) catch null) |req_init| {
+ request.headers = req_init.headers;
+ request.method = req_init.method;
+ }
+ if (arguments[1].fastGet(globalThis, .body)) |body_| {
+ if (Body.Value.fromJS(globalThis, body_)) |body| {
+ request.body = body;
+ } else {
+ request.finalizeWithoutDeinit();
return null;
}
}
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 39ee205c2..cd4b1be4c 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -623,6 +623,12 @@ pub const Fetch = struct {
/// We always clone url and proxy (if informed)
url_proxy_buffer: []const u8 = "",
+ signal: ?*JSC.AbortSignal = null,
+ aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+
+ // must be stored because AbortSignal stores reason weakly
+ abort_reason: JSValue = JSValue.zero,
+
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
return FetchTasklet{};
}
@@ -641,6 +647,14 @@ pub const Fetch = struct {
this.result.deinitMetadata();
this.response_buffer.deinit();
this.request_body.detach();
+
+ if (this.abort_reason != .zero) this.abort_reason.unprotect();
+
+ if (this.signal) |signal| {
+ signal.cleanNativeBindings(this);
+ _ = signal.unref();
+ this.signal = null;
+ }
}
pub fn deinit(this: *FetchTasklet) void {
@@ -688,29 +702,23 @@ pub const Fetch = struct {
}
pub fn onReject(this: *FetchTasklet) JSValue {
+ if (this.signal) |signal| {
+ _ = signal.unref();
+ this.signal = null;
+ }
+
+ if (!this.abort_reason.isEmptyOrUndefinedOrNull()) {
+ return this.abort_reason;
+ }
+
if (this.result.isTimeout()) {
- //Timeout with reason
- if (this.result.reason) |exception| {
- if (!exception.isEmptyOrUndefinedOrNull()) {
- return exception;
- }
- }
- //Timeout without reason
- const exception = JSC.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this);
- return exception;
+ // Timeout without reason
+ return JSC.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this);
}
if (this.result.isAbort()) {
- //Abort can be TimeoutError (AbortSignal.timeout(ms)) or AbortError so we need to detect
- if (this.result.reason) |exception| {
- if (!exception.isEmptyOrUndefinedOrNull()) {
- return exception;
- }
- }
-
- //Abort without reason
- const exception = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
- return exception;
+ // Abort without reason
+ return JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
}
const fetch_error = JSC.SystemError{
@@ -793,6 +801,7 @@ pub const Fetch = struct {
.request_headers = fetch_options.headers,
.ref = JSC.napi.Ref.create(globalThis, promise),
.url_proxy_buffer = fetch_options.url_proxy_buffer,
+ .signal = fetch_options.signal,
};
if (fetch_tasklet.request_body.store()) |store| {
@@ -808,12 +817,24 @@ pub const Fetch = struct {
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
}
- fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), fetch_options.timeout, HTTPClient.HTTPClientResult.Callback.New(
- *FetchTasklet,
- FetchTasklet.callback,
- ).init(
- fetch_tasklet,
- ), proxy, fetch_options.signal);
+ fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
+ allocator,
+ fetch_options.method,
+ fetch_options.url,
+ fetch_options.headers.entries,
+ fetch_options.headers.buf.items,
+ &fetch_tasklet.response_buffer,
+ fetch_tasklet.request_body.slice(),
+ fetch_options.timeout,
+ HTTPClient.HTTPClientResult.Callback.New(
+ *FetchTasklet,
+ FetchTasklet.callback,
+ ).init(
+ fetch_tasklet,
+ ),
+ proxy,
+ if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null,
+ );
if (!fetch_options.follow_redirects) {
fetch_tasklet.http.?.client.remaining_redirect_count = 0;
@@ -822,10 +843,35 @@ pub const Fetch = struct {
fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout;
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
+
+ if (fetch_tasklet.signal) |signal| {
+ fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
+ }
return fetch_tasklet;
}
- const FetchOptions = struct { method: Method, headers: Headers, body: AnyBlob, timeout: usize, disable_timeout: bool, disable_keepalive: bool, url: ZigURL, verbose: bool = false, follow_redirects: bool = true, proxy: ?ZigURL = null, url_proxy_buffer: []const u8 = "", signal: ?*JSC.AbortSignal = null, globalThis: ?*JSGlobalObject };
+ pub fn abortListener(this: *FetchTasklet, reason: JSValue) void {
+ reason.ensureStillAlive();
+ this.abort_reason = reason;
+ reason.protect();
+ this.aborted.store(true, .Monotonic);
+ }
+
+ const FetchOptions = struct {
+ method: Method,
+ headers: Headers,
+ body: AnyBlob,
+ timeout: usize,
+ disable_timeout: bool,
+ disable_keepalive: bool,
+ url: ZigURL,
+ verbose: bool = false,
+ follow_redirects: bool = true,
+ proxy: ?ZigURL = null,
+ url_proxy_buffer: []const u8 = "",
+ signal: ?*JSC.AbortSignal = null,
+ globalThis: ?*JSGlobalObject,
+ };
pub fn queue(
allocator: std.mem.Allocator,
@@ -1205,9 +1251,23 @@ pub const Fetch = struct {
_ = FetchTasklet.queue(
default_allocator,
globalThis,
- .{ .method = method, .url = url, .headers = headers orelse Headers{
- .allocator = bun.default_allocator,
- }, .body = body, .timeout = std.time.ns_per_hour, .disable_keepalive = disable_keepalive, .disable_timeout = disable_timeout, .follow_redirects = follow_redirects, .verbose = verbose, .proxy = proxy, .url_proxy_buffer = url_proxy_buffer, .signal = signal, .globalThis = globalThis },
+ .{
+ .method = method,
+ .url = url,
+ .headers = headers orelse Headers{
+ .allocator = bun.default_allocator,
+ },
+ .body = body,
+ .timeout = std.time.ns_per_hour,
+ .disable_keepalive = disable_keepalive,
+ .disable_timeout = disable_timeout,
+ .follow_redirects = follow_redirects,
+ .verbose = verbose,
+ .proxy = proxy,
+ .url_proxy_buffer = url_proxy_buffer,
+ .signal = signal,
+ .globalThis = globalThis,
+ },
promise_val,
) catch unreachable;
return promise_val.asRef();
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 0c7395319..670bb2f7c 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -635,6 +635,11 @@ pub fn onOpen(
log("Connected {s} \n", .{client.url.href});
+ if (client.hasSignalAborted()) {
+ client.closeAndAbort(comptime is_ssl, socket);
+ return;
+ }
+
if (comptime is_ssl) {
var ssl: *BoringSSL.SSL = @ptrCast(*BoringSSL.SSL, socket.getNativeHandle());
if (!ssl.isInitFinished()) {
@@ -661,7 +666,7 @@ pub fn onOpen(
ssl.configureHTTPClient(hostname);
}
}
- client.addAbortSignalEventListenner(is_ssl, socket);
+
if (client.state.request_stage == .pending) {
client.onWritable(true, comptime is_ssl, socket);
}
@@ -695,7 +700,7 @@ pub fn onClose(
}
if (in_progress) {
- client.fail(error.ConnectionClosed, null);
+ client.fail(error.ConnectionClosed);
}
}
pub fn onTimeout(
@@ -707,7 +712,7 @@ pub fn onTimeout(
log("Timeout {s}\n", .{client.url.href});
if (client.state.stage != .done and client.state.stage != .fail) {
- client.fail(error.Timeout, null);
+ client.fail(error.Timeout);
}
}
pub fn onConnectError(
@@ -719,7 +724,7 @@ pub fn onConnectError(
log("onConnectError {s}\n", .{client.url.href});
if (client.state.stage != .done and client.state.stage != .fail)
- client.fail(error.ConnectionRefused, null);
+ client.fail(error.ConnectionRefused);
}
pub fn onEnd(
client: *HTTPClient,
@@ -729,7 +734,7 @@ pub fn onEnd(
log("onEnd {s}\n", .{client.url.href});
if (client.state.stage != .done and client.state.stage != .fail)
- client.fail(error.ConnectionClosed, null);
+ client.fail(error.ConnectionClosed);
}
pub inline fn getAllocator() std.mem.Allocator {
@@ -994,87 +999,26 @@ http_proxy: ?URL = null,
proxy_authorization: ?[]u8 = null,
proxy_tunneling: bool = false,
proxy_tunnel: ?ProxyTunnel = null,
-signal: ?*JSC.AbortSignal = null,
-abort_handler: ?*anyopaque = null,
-abort_handler_deinit: ?*const fn (?*anyopaque) void = null,
-pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entries: Headers.Entries, header_buf: string, signal: ?*JSC.AbortSignal) HTTPClient {
- return HTTPClient{ .allocator = allocator, .method = method, .url = url, .header_entries = header_entries, .header_buf = header_buf, .signal = signal, .abort_handler = null, .abort_handler_deinit = null };
-}
-
-pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type {
- return struct {
- client: *HTTPClient,
- socket: NewHTTPContext(is_ssl).HTTPSocket,
-
- pub fn init(client: *HTTPClient, socket: NewHTTPContext(is_ssl).HTTPSocket) !*@This() {
- var ctx = try client.allocator.create(@This());
- ctx.client = client;
- ctx.socket = socket;
- return ctx;
- }
-
- pub fn onAborted(this: ?*anyopaque, reason: JSC.JSValue) callconv(.C) void {
- log("onAborted", .{});
- if (this) |this_| {
- const self = bun.cast(*@This(), this_);
- if (self.client.state.response_stage != .done and self.client.state.response_stage != .fail) {
- self.client.closeAndAbort(reason, is_ssl, self.socket);
- }
- }
- }
+aborted: ?*std.atomic.Atomic(bool) = null,
- pub fn deinit(this: ?*anyopaque) void {
- if (this) |this_| {
- var self = bun.cast(*@This(), this_);
- const allocator = self.client.allocator;
- allocator.destroy(self);
- }
- }
+pub fn init(
+ allocator: std.mem.Allocator,
+ method: Method,
+ url: URL,
+ header_entries: Headers.Entries,
+ header_buf: string,
+ signal: ?*std.atomic.Atomic(bool),
+) HTTPClient {
+ return HTTPClient{
+ .allocator = allocator,
+ .method = method,
+ .url = url,
+ .header_entries = header_entries,
+ .header_buf = header_buf,
+ .aborted = signal,
};
}
-pub fn addAbortSignalEventListenner(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- if (this.signal) |signal| {
- const aborted = signal.aborted();
- if (aborted) {
- log("addAbortSignalEventListenner already aborted!", .{});
- const reason = signal.abortReason();
- this.closeAndAbort(reason, is_ssl, socket);
- return;
- }
-
- const handler = ClientSocketAbortHandler(is_ssl).init(this, socket) catch unreachable;
- this.abort_handler = bun.cast(*anyopaque, handler);
- this.abort_handler_deinit = ClientSocketAbortHandler(is_ssl).deinit;
- _ = signal.addListener(this.abort_handler.?, ClientSocketAbortHandler(is_ssl).onAborted);
- log("addAbortSignalEventListenner added!", .{});
- return;
- }
- log("addAbortSignalEventListenner (signal == null)", .{});
-}
-
-pub fn hasSignalAborted(this: *HTTPClient) ?JSC.JSValue {
- if (this.signal) |signal| {
- const aborted = signal.aborted();
- log("hasSignalAborted {any}", .{aborted});
- if (aborted) {
- return signal.abortReason();
- }
- return null;
- }
- log("hasSignalAborted (signal == null)", .{});
- return null;
-}
-
-pub fn deinitSignal(this: *HTTPClient) void {
- if (this.signal != null) {
- var signal = this.signal.?;
- const ctx = bun.cast(*anyopaque, this);
- signal.cleanNativeBindings(ctx);
- _ = signal.unref();
- this.signal = null;
- }
-}
pub fn deinit(this: *HTTPClient) void {
if (this.redirect) |redirect| {
redirect.release();
@@ -1089,13 +1033,6 @@ pub fn deinit(this: *HTTPClient) void {
this.proxy_tunnel = null;
}
- this.deinitSignal();
-
- if (this.abort_handler != null and this.abort_handler_deinit != null) {
- this.abort_handler_deinit.?(this.abort_handler.?);
- this.abort_handler = null;
- this.abort_handler_deinit = null;
- }
this.state.compressed_body.deinit();
this.state.response_message_buffer.deinit();
}
@@ -1255,8 +1192,30 @@ pub const AsyncHTTP = struct {
};
const AtomicState = std.atomic.Atomic(State);
- pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, callback: HTTPClientResult.Callback, http_proxy: ?URL, signal: ?*JSC.AbortSignal) AsyncHTTP {
- var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy };
+ pub fn init(
+ allocator: std.mem.Allocator,
+ method: Method,
+ url: URL,
+ headers: Headers.Entries,
+ headers_buf: string,
+ response_buffer: *MutableString,
+ request_body: []const u8,
+ timeout: usize,
+ callback: HTTPClientResult.Callback,
+ http_proxy: ?URL,
+ signal: ?*std.atomic.Atomic(bool),
+ ) AsyncHTTP {
+ var this = AsyncHTTP{
+ .allocator = allocator,
+ .url = url,
+ .method = method,
+ .request_headers = headers,
+ .request_header_buf = headers_buf,
+ .request_body = request_body,
+ .response_buffer = response_buffer,
+ .completion_callback = callback,
+ .http_proxy = http_proxy,
+ };
this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal);
this.client.timeout = timeout;
this.client.http_proxy = this.http_proxy;
@@ -1293,7 +1252,8 @@ pub const AsyncHTTP = struct {
fn reset(this: *AsyncHTTP) !void {
const timeout = this.timeout;
- this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf);
+ var aborted = this.client.aborted;
+ this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf, aborted);
this.client.timeout = timeout;
this.client.http_proxy = this.http_proxy;
if (this.http_proxy) |proxy| {
@@ -1409,6 +1369,10 @@ pub const AsyncHTTP = struct {
}
};
+pub fn hasSignalAborted(this: *const HTTPClient) bool {
+ return (this.aborted orelse return false).load(.Monotonic);
+}
+
pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
var header_count: usize = 0;
var header_entries = this.header_entries.slice();
@@ -1520,7 +1484,7 @@ pub fn doRedirect(this: *HTTPClient) void {
std.debug.assert(this.follow_redirects);
if (this.remaining_redirect_count == 0) {
- this.fail(error.TooManyRedirects, null);
+ this.fail(error.TooManyRedirects);
return;
}
this.state.reset();
@@ -1557,18 +1521,18 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString)
fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
// Aborted before connecting
- if (this.hasSignalAborted()) |reason| {
- this.fail(error.Aborted, reason);
+ if (this.hasSignalAborted()){
+ this.fail(error.Aborted);
return;
}
var socket = http_thread.connect(this, is_ssl) catch |err| {
- this.fail(err, null);
+ this.fail(err);
return;
};
if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) {
- this.fail(error.ConnectionClosed, null);
+ this.fail(error.ConnectionClosed);
std.debug.assert(this.state.fail != error.NoError);
}
}
@@ -1594,8 +1558,8 @@ fn printResponse(response: picohttp.Response) void {
}
pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- if (this.hasSignalAborted()) |reason| {
- this.closeAndAbort(reason, is_ssl, socket);
+ if (this.hasSignalAborted()) {
+ this.closeAndAbort(is_ssl, socket);
return;
}
@@ -1840,7 +1804,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
if (this.state.stage != .fail and this.state.stage != .done) {
- log("closeAndFail", .{});
+ log("closeAndFail: {s}", .{@errorName(err)});
if (!socket.isClosed()) {
socket.ext(**anyopaque).?.* = bun.cast(
**anyopaque,
@@ -1848,7 +1812,7 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc
);
socket.close(0, null);
}
- this.fail(err, null);
+ this.fail(err);
}
}
@@ -1903,8 +1867,8 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP
}
pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- if (this.hasSignalAborted()) |reason| {
- this.closeAndAbort(reason, is_ssl, socket);
+ if (this.hasSignalAborted()) {
+ this.closeAndAbort(is_ssl, socket);
return;
}
switch (this.state.response_stage) {
@@ -2140,34 +2104,22 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
}
-pub fn closeAndAbort(this: *HTTPClient, reason: JSC.JSValue, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- if (this.state.stage != .fail and this.state.stage != .done) {
- log("closeAndAbort", .{});
- if (!socket.isClosed()) {
- socket.ext(**anyopaque).?.* = bun.cast(
- **anyopaque,
- NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
- );
- socket.close(0, null);
- }
- this.fail(error.Aborted, reason);
- }
+pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ this.closeAndFail(error.Aborted, comptime is_ssl, socket);
}
-fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void {
+fn fail(this: *HTTPClient, err: anyerror) void {
this.state.request_stage = .fail;
this.state.response_stage = .fail;
this.state.fail = err;
this.state.stage = .fail;
const callback = this.completion_callback;
- const result = this.toResult(this.cloned_metadata, reason);
+ const result = this.toResult(this.cloned_metadata);
this.state.reset();
this.proxy_tunneling = false;
callback.run(result);
-
- this.deinitSignal();
}
// We have to clone metadata immediately after use
@@ -2203,12 +2155,10 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss
if (this.state.stage != .done and this.state.stage != .fail) {
log("done", .{});
- this.deinitSignal();
-
var out_str = this.state.body_out_str.?;
var body = out_str.*;
this.cloned_metadata.response = this.state.pending_response;
- const result = this.toResult(this.cloned_metadata, null);
+ const result = this.toResult(this.cloned_metadata);
const callback = this.completion_callback;
socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr());
@@ -2251,7 +2201,6 @@ pub const HTTPClientResult = struct {
fail: anyerror = error.NoError,
redirected: bool = false,
headers_buf: []picohttp.Header = &.{},
- reason: ?JSC.JSValue = null,
pub fn isSuccess(this: *const HTTPClientResult) bool {
return this.fail == error.NoError;
@@ -2304,7 +2253,7 @@ pub const HTTPClientResult = struct {
};
};
-pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata, reason: ?JSC.JSValue) HTTPClientResult {
+pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult {
return HTTPClientResult{
.body = this.state.body_out_str,
.response = metadata.response,
@@ -2312,7 +2261,6 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata, reason: ?JSC.
.redirected = this.remaining_redirect_count != default_redirect_count,
.href = metadata.url,
.fail = this.state.fail,
- .reason = reason,
.headers_buf = metadata.response.headers,
};
}
diff --git a/test/bun.js/fetch.test.js b/test/bun.js/fetch.test.js
index 0ecba0cd8..ec903341f 100644
--- a/test/bun.js/fetch.test.js
+++ b/test/bun.js/fetch.test.js
@@ -1,21 +1,26 @@
-import { afterAll, beforeAll, describe, expect, it, test } from "bun:test";
+import { afterAll, afterEach, beforeAll, describe, expect, it, test, beforeEach } from "bun:test";
import fs, { chmodSync, unlinkSync } from "fs";
import { mkfifo } from "mkfifo";
import { gc, withoutAggressiveGC } from "./gc";
+const sleep = countdown => {
+ return Bun.sleep(countdown);
+};
+
const exampleFixture = fs.readFileSync(
import.meta.path.substring(0, import.meta.path.lastIndexOf("/")) + "/fetch.js.txt",
"utf8",
);
var cachedServer;
-function getServer(handler) {
- cachedServer ||= Bun.serve(handler);
- cachedServer.reload(handler);
- return cachedServer;
+function getServer({ port, ...rest }) {
+ return (cachedServer = Bun.serve({
+ ...rest,
+ port: 0,
+ }));
}
-afterAll(() => {
+afterEach(() => {
cachedServer?.stop?.(true);
});
@@ -63,9 +68,9 @@ describe("AbortSignalStreamTest", async () => {
} catch (ex) {
error = ex;
}
- expect(error instanceof DOMException).toBeTruthy();
expect(error.name).toBe("AbortError");
expect(error.message).toBe("The operation was aborted.");
+ expect(error instanceof DOMException).toBeTruthy();
}
}
@@ -77,7 +82,6 @@ describe("AbortSignalStreamTest", async () => {
});
describe("AbortSignalDirectStreamTest", () => {
- const port = 74322;
async function abortOnStage(body, stage) {
let error = undefined;
var abortController = new AbortController();
@@ -119,9 +123,9 @@ describe("AbortSignalDirectStreamTest", () => {
} catch (ex) {
error = ex;
}
- expect(error instanceof DOMException).toBeTruthy();
expect(error.name).toBe("AbortError");
expect(error.message).toBe("The operation was aborted.");
+ expect(error instanceof DOMException).toBeTruthy();
}
}
@@ -133,6 +137,39 @@ describe("AbortSignalDirectStreamTest", () => {
});
describe("AbortSignal", () => {
+ var server;
+ beforeEach(() => {
+ server = getServer({
+ async fetch(request) {
+ if (request.url.endsWith("/nodelay")) {
+ return new Response("Hello");
+ }
+ if (request.url.endsWith("/stream")) {
+ const reader = request.body.getReader();
+ const body = new ReadableStream({
+ async pull(controller) {
+ if (!reader) controller.close();
+ const { done, value } = await reader.read();
+ // When no more data needs to be consumed, close the stream
+ if (done) {
+ controller.close();
+ return;
+ }
+ // Enqueue the next data chunk into our target stream
+ controller.enqueue(value);
+ },
+ });
+ return new Response(body);
+ }
+ if (request.method.toUpperCase() === "POST") {
+ const body = await request.text();
+ return new Response(body);
+ }
+ await sleep(15);
+ return new Response("Hello");
+ },
+ });
+ });
it("AbortError", async () => {
let name;
try {
@@ -140,7 +177,7 @@ describe("AbortSignal", () => {
const signal = controller.signal;
async function manualAbort() {
- await Bun.sleep(10);
+ await sleep(1);
controller.abort();
}
await Promise.all([
@@ -172,8 +209,9 @@ describe("AbortSignal", () => {
try {
var controller = new AbortController();
const signal = controller.signal;
+
async function manualAbort() {
- await Bun.sleep(10);
+ await sleep(10);
controller.abort("My Reason");
}
await Promise.all([
@@ -197,7 +235,7 @@ describe("AbortSignal", () => {
});
async function manualAbort() {
- await Bun.sleep(10);
+ await sleep(10);
controller.abort();
}
await Promise.all([
@@ -231,9 +269,9 @@ describe("AbortSignal", () => {
error = ex;
}
- expect(error instanceof DOMException).toBeTruthy();
expect(error.name).toBe("AbortError");
expect(error.message).toBe("The operation was aborted.");
+ expect(error instanceof DOMException).toBeTruthy();
});
it("TimeoutError", async () => {
@@ -253,7 +291,7 @@ describe("AbortSignal", () => {
const signal = controller.signal;
const request = new Request(`http://127.0.0.1:${server.port}`, { signal });
async function manualAbort() {
- await Bun.sleep(10);
+ await sleep(10);
controller.abort();
}
await Promise.all([fetch(request).then(res => res.text()), manualAbort()]);
@@ -443,7 +481,7 @@ describe("fetch", () => {
await fetch(url, { body: "buntastic" });
expect(false).toBe(true);
} catch (exception) {
- expect(exception instanceof TypeError).toBe(true);
+ expect(exception.name).toBe("TypeError");
}
});
});