diff options
Diffstat (limited to '')
-rw-r--r-- | src/bun.js/bindings/bindings.zig | 50 | ||||
-rw-r--r-- | src/bun.js/webcore/request.zig | 44 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 118 | ||||
-rw-r--r-- | src/http_client_async.zig | 198 | ||||
-rw-r--r-- | test/bun.js/fetch.test.js | 68 |
5 files changed, 260 insertions, 218 deletions
diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index a43ff8265..48cbe0887 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -1690,6 +1690,26 @@ pub const AbortSignal = extern opaque { pub const name = "JSC::AbortSignal"; pub const namespace = "JSC"; + pub fn listen( + this: *AbortSignal, + comptime Context: type, + ctx: *Context, + comptime cb: *const fn (*Context, JSValue) void, + ) *AbortSignal { + const Wrapper = struct { + const call = cb; + pub fn callback( + ptr: ?*anyopaque, + reason: JSValue, + ) callconv(.C) void { + var val = bun.cast(*Context, ptr.?); + call(val, reason); + } + }; + + return this.addListener(@ptrCast(?*anyopaque, ctx), Wrapper.callback); + } + pub fn addListener( this: *AbortSignal, ctx: ?*anyopaque, @@ -1709,10 +1729,12 @@ pub const AbortSignal = extern opaque { return cppFn("signal", .{ this, reason }); } + /// This function is not threadsafe. aborted is a boolean, not an atomic! pub fn aborted(this: *AbortSignal) bool { return cppFn("aborted", .{this}); } + /// This function is not threadsafe. JSValue cannot safely be passed between threads. pub fn abortReason(this: *AbortSignal) JSValue { return cppFn("abortReason", .{this}); } @@ -1734,11 +1756,11 @@ pub const AbortSignal = extern opaque { } pub fn toJS(this: *AbortSignal, global: *JSGlobalObject) JSValue { - return cppFn("toJS", .{this, global}); + return cppFn("toJS", .{ this, global }); } pub fn create(global: *JSGlobalObject) JSValue { - return cppFn("create", .{ global }); + return cppFn("create", .{global}); } pub fn createAbortError(message: *const ZigString, code: *const ZigString, global: *JSGlobalObject) JSValue { @@ -1749,20 +1771,7 @@ pub const AbortSignal = extern opaque { return cppFn("createTimeoutError", .{ message, code, global }); } - pub const Extern = [_][]const u8{ - "createAbortError", - "createTimeoutError", - "create", - "ref", - "unref", - "signal", - "abortReason", - "aborted", - "addListener", - "fromJS", - "toJS", - "cleanNativeBindings" - }; + pub const Extern = [_][]const u8{ "createAbortError", "createTimeoutError", "create", "ref", "unref", "signal", "abortReason", "aborted", "addListener", "fromJS", "toJS", "cleanNativeBindings" }; }; pub const JSPromise = extern struct { @@ -3567,14 +3576,7 @@ pub const JSValue = enum(JSValueReprInt) { return cppFn("eqlCell", .{ this, other }); } - pub const BuiltinName = enum(u8) { - method, - headers, - status, - url, - body, - data - }; + pub const BuiltinName = enum(u8) { method, headers, status, url, body, data }; // intended to be more lightweight than ZigString pub fn fastGet(this: JSValue, global: *JSGlobalObject, builtin_name: BuiltinName) ?JSValue { diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig index fc5bff2b5..b2ff43a5a 100644 --- a/src/bun.js/webcore/request.zig +++ b/src/bun.js/webcore/request.zig @@ -260,7 +260,7 @@ pub const Request = struct { return ZigString.init(Properties.UTF8.navigate).toValue(globalThis); } - pub fn finalize(this: *Request) callconv(.C) void { + pub fn finalizeWithoutDeinit(this: *Request) void { if (this.headers) |headers| { headers.deref(); this.headers = null; @@ -275,7 +275,10 @@ pub const Request = struct { _ = signal.unref(); this.signal = null; } + } + pub fn finalize(this: *Request) callconv(.C) void { + this.finalizeWithoutDeinit(); bun.default_allocator.destroy(this); } @@ -402,10 +405,7 @@ pub const Request = struct { if (Body.Value.fromJS(globalThis, body_)) |body| { request.body = body; } else { - if (request.headers) |head| { - head.deref(); - } - + request.finalizeWithoutDeinit(); return null; } } @@ -419,23 +419,6 @@ pub const Request = struct { } }, else => { - if (Body.Init.init(getAllocator(globalThis), globalThis, arguments[1], arguments[1].jsType()) catch null) |req_init| { - request.headers = req_init.headers; - request.method = req_init.method; - } - - if (arguments[1].fastGet(globalThis, .body)) |body_| { - if (Body.Value.fromJS(globalThis, body_)) |body| { - request.body = body; - } else { - if (request.headers) |head| { - head.deref(); - } - - return null; - } - } - if (arguments[1].get(globalThis, "signal")) |signal_| { if (AbortSignal.fromJS(signal_)) |signal| { //Keep it alive @@ -444,10 +427,21 @@ pub const Request = struct { } else { globalThis.throw("Failed to construct 'Request': member signal is not of type AbortSignal.", .{}); - if (request.headers) |head| { - head.deref(); - } + request.finalizeWithoutDeinit(); + return null; + } + } + + if (Body.Init.init(getAllocator(globalThis), globalThis, arguments[1], arguments[1].jsType()) catch null) |req_init| { + request.headers = req_init.headers; + request.method = req_init.method; + } + if (arguments[1].fastGet(globalThis, .body)) |body_| { + if (Body.Value.fromJS(globalThis, body_)) |body| { + request.body = body; + } else { + request.finalizeWithoutDeinit(); return null; } } diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 39ee205c2..cd4b1be4c 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -623,6 +623,12 @@ pub const Fetch = struct { /// We always clone url and proxy (if informed) url_proxy_buffer: []const u8 = "", + signal: ?*JSC.AbortSignal = null, + aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + + // must be stored because AbortSignal stores reason weakly + abort_reason: JSValue = JSValue.zero, + pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet { return FetchTasklet{}; } @@ -641,6 +647,14 @@ pub const Fetch = struct { this.result.deinitMetadata(); this.response_buffer.deinit(); this.request_body.detach(); + + if (this.abort_reason != .zero) this.abort_reason.unprotect(); + + if (this.signal) |signal| { + signal.cleanNativeBindings(this); + _ = signal.unref(); + this.signal = null; + } } pub fn deinit(this: *FetchTasklet) void { @@ -688,29 +702,23 @@ pub const Fetch = struct { } pub fn onReject(this: *FetchTasklet) JSValue { + if (this.signal) |signal| { + _ = signal.unref(); + this.signal = null; + } + + if (!this.abort_reason.isEmptyOrUndefinedOrNull()) { + return this.abort_reason; + } + if (this.result.isTimeout()) { - //Timeout with reason - if (this.result.reason) |exception| { - if (!exception.isEmptyOrUndefinedOrNull()) { - return exception; - } - } - //Timeout without reason - const exception = JSC.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this); - return exception; + // Timeout without reason + return JSC.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this); } if (this.result.isAbort()) { - //Abort can be TimeoutError (AbortSignal.timeout(ms)) or AbortError so we need to detect - if (this.result.reason) |exception| { - if (!exception.isEmptyOrUndefinedOrNull()) { - return exception; - } - } - - //Abort without reason - const exception = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this); - return exception; + // Abort without reason + return JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this); } const fetch_error = JSC.SystemError{ @@ -793,6 +801,7 @@ pub const Fetch = struct { .request_headers = fetch_options.headers, .ref = JSC.napi.Ref.create(globalThis, promise), .url_proxy_buffer = fetch_options.url_proxy_buffer, + .signal = fetch_options.signal, }; if (fetch_tasklet.request_body.store()) |store| { @@ -808,12 +817,24 @@ pub const Fetch = struct { proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url); } - fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), fetch_options.timeout, HTTPClient.HTTPClientResult.Callback.New( - *FetchTasklet, - FetchTasklet.callback, - ).init( - fetch_tasklet, - ), proxy, fetch_options.signal); + fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init( + allocator, + fetch_options.method, + fetch_options.url, + fetch_options.headers.entries, + fetch_options.headers.buf.items, + &fetch_tasklet.response_buffer, + fetch_tasklet.request_body.slice(), + fetch_options.timeout, + HTTPClient.HTTPClientResult.Callback.New( + *FetchTasklet, + FetchTasklet.callback, + ).init( + fetch_tasklet, + ), + proxy, + if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, + ); if (!fetch_options.follow_redirects) { fetch_tasklet.http.?.client.remaining_redirect_count = 0; @@ -822,10 +843,35 @@ pub const Fetch = struct { fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout; fetch_tasklet.http.?.client.verbose = fetch_options.verbose; fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive; + + if (fetch_tasklet.signal) |signal| { + fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener); + } return fetch_tasklet; } - const FetchOptions = struct { method: Method, headers: Headers, body: AnyBlob, timeout: usize, disable_timeout: bool, disable_keepalive: bool, url: ZigURL, verbose: bool = false, follow_redirects: bool = true, proxy: ?ZigURL = null, url_proxy_buffer: []const u8 = "", signal: ?*JSC.AbortSignal = null, globalThis: ?*JSGlobalObject }; + pub fn abortListener(this: *FetchTasklet, reason: JSValue) void { + reason.ensureStillAlive(); + this.abort_reason = reason; + reason.protect(); + this.aborted.store(true, .Monotonic); + } + + const FetchOptions = struct { + method: Method, + headers: Headers, + body: AnyBlob, + timeout: usize, + disable_timeout: bool, + disable_keepalive: bool, + url: ZigURL, + verbose: bool = false, + follow_redirects: bool = true, + proxy: ?ZigURL = null, + url_proxy_buffer: []const u8 = "", + signal: ?*JSC.AbortSignal = null, + globalThis: ?*JSGlobalObject, + }; pub fn queue( allocator: std.mem.Allocator, @@ -1205,9 +1251,23 @@ pub const Fetch = struct { _ = FetchTasklet.queue( default_allocator, globalThis, - .{ .method = method, .url = url, .headers = headers orelse Headers{ - .allocator = bun.default_allocator, - }, .body = body, .timeout = std.time.ns_per_hour, .disable_keepalive = disable_keepalive, .disable_timeout = disable_timeout, .follow_redirects = follow_redirects, .verbose = verbose, .proxy = proxy, .url_proxy_buffer = url_proxy_buffer, .signal = signal, .globalThis = globalThis }, + .{ + .method = method, + .url = url, + .headers = headers orelse Headers{ + .allocator = bun.default_allocator, + }, + .body = body, + .timeout = std.time.ns_per_hour, + .disable_keepalive = disable_keepalive, + .disable_timeout = disable_timeout, + .follow_redirects = follow_redirects, + .verbose = verbose, + .proxy = proxy, + .url_proxy_buffer = url_proxy_buffer, + .signal = signal, + .globalThis = globalThis, + }, promise_val, ) catch unreachable; return promise_val.asRef(); diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 0c7395319..670bb2f7c 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -635,6 +635,11 @@ pub fn onOpen( log("Connected {s} \n", .{client.url.href}); + if (client.hasSignalAborted()) { + client.closeAndAbort(comptime is_ssl, socket); + return; + } + if (comptime is_ssl) { var ssl: *BoringSSL.SSL = @ptrCast(*BoringSSL.SSL, socket.getNativeHandle()); if (!ssl.isInitFinished()) { @@ -661,7 +666,7 @@ pub fn onOpen( ssl.configureHTTPClient(hostname); } } - client.addAbortSignalEventListenner(is_ssl, socket); + if (client.state.request_stage == .pending) { client.onWritable(true, comptime is_ssl, socket); } @@ -695,7 +700,7 @@ pub fn onClose( } if (in_progress) { - client.fail(error.ConnectionClosed, null); + client.fail(error.ConnectionClosed); } } pub fn onTimeout( @@ -707,7 +712,7 @@ pub fn onTimeout( log("Timeout {s}\n", .{client.url.href}); if (client.state.stage != .done and client.state.stage != .fail) { - client.fail(error.Timeout, null); + client.fail(error.Timeout); } } pub fn onConnectError( @@ -719,7 +724,7 @@ pub fn onConnectError( log("onConnectError {s}\n", .{client.url.href}); if (client.state.stage != .done and client.state.stage != .fail) - client.fail(error.ConnectionRefused, null); + client.fail(error.ConnectionRefused); } pub fn onEnd( client: *HTTPClient, @@ -729,7 +734,7 @@ pub fn onEnd( log("onEnd {s}\n", .{client.url.href}); if (client.state.stage != .done and client.state.stage != .fail) - client.fail(error.ConnectionClosed, null); + client.fail(error.ConnectionClosed); } pub inline fn getAllocator() std.mem.Allocator { @@ -994,87 +999,26 @@ http_proxy: ?URL = null, proxy_authorization: ?[]u8 = null, proxy_tunneling: bool = false, proxy_tunnel: ?ProxyTunnel = null, -signal: ?*JSC.AbortSignal = null, -abort_handler: ?*anyopaque = null, -abort_handler_deinit: ?*const fn (?*anyopaque) void = null, -pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entries: Headers.Entries, header_buf: string, signal: ?*JSC.AbortSignal) HTTPClient { - return HTTPClient{ .allocator = allocator, .method = method, .url = url, .header_entries = header_entries, .header_buf = header_buf, .signal = signal, .abort_handler = null, .abort_handler_deinit = null }; -} - -pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type { - return struct { - client: *HTTPClient, - socket: NewHTTPContext(is_ssl).HTTPSocket, - - pub fn init(client: *HTTPClient, socket: NewHTTPContext(is_ssl).HTTPSocket) !*@This() { - var ctx = try client.allocator.create(@This()); - ctx.client = client; - ctx.socket = socket; - return ctx; - } - - pub fn onAborted(this: ?*anyopaque, reason: JSC.JSValue) callconv(.C) void { - log("onAborted", .{}); - if (this) |this_| { - const self = bun.cast(*@This(), this_); - if (self.client.state.response_stage != .done and self.client.state.response_stage != .fail) { - self.client.closeAndAbort(reason, is_ssl, self.socket); - } - } - } +aborted: ?*std.atomic.Atomic(bool) = null, - pub fn deinit(this: ?*anyopaque) void { - if (this) |this_| { - var self = bun.cast(*@This(), this_); - const allocator = self.client.allocator; - allocator.destroy(self); - } - } +pub fn init( + allocator: std.mem.Allocator, + method: Method, + url: URL, + header_entries: Headers.Entries, + header_buf: string, + signal: ?*std.atomic.Atomic(bool), +) HTTPClient { + return HTTPClient{ + .allocator = allocator, + .method = method, + .url = url, + .header_entries = header_entries, + .header_buf = header_buf, + .aborted = signal, }; } -pub fn addAbortSignalEventListenner(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - if (this.signal) |signal| { - const aborted = signal.aborted(); - if (aborted) { - log("addAbortSignalEventListenner already aborted!", .{}); - const reason = signal.abortReason(); - this.closeAndAbort(reason, is_ssl, socket); - return; - } - - const handler = ClientSocketAbortHandler(is_ssl).init(this, socket) catch unreachable; - this.abort_handler = bun.cast(*anyopaque, handler); - this.abort_handler_deinit = ClientSocketAbortHandler(is_ssl).deinit; - _ = signal.addListener(this.abort_handler.?, ClientSocketAbortHandler(is_ssl).onAborted); - log("addAbortSignalEventListenner added!", .{}); - return; - } - log("addAbortSignalEventListenner (signal == null)", .{}); -} - -pub fn hasSignalAborted(this: *HTTPClient) ?JSC.JSValue { - if (this.signal) |signal| { - const aborted = signal.aborted(); - log("hasSignalAborted {any}", .{aborted}); - if (aborted) { - return signal.abortReason(); - } - return null; - } - log("hasSignalAborted (signal == null)", .{}); - return null; -} - -pub fn deinitSignal(this: *HTTPClient) void { - if (this.signal != null) { - var signal = this.signal.?; - const ctx = bun.cast(*anyopaque, this); - signal.cleanNativeBindings(ctx); - _ = signal.unref(); - this.signal = null; - } -} pub fn deinit(this: *HTTPClient) void { if (this.redirect) |redirect| { redirect.release(); @@ -1089,13 +1033,6 @@ pub fn deinit(this: *HTTPClient) void { this.proxy_tunnel = null; } - this.deinitSignal(); - - if (this.abort_handler != null and this.abort_handler_deinit != null) { - this.abort_handler_deinit.?(this.abort_handler.?); - this.abort_handler = null; - this.abort_handler_deinit = null; - } this.state.compressed_body.deinit(); this.state.response_message_buffer.deinit(); } @@ -1255,8 +1192,30 @@ pub const AsyncHTTP = struct { }; const AtomicState = std.atomic.Atomic(State); - pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, callback: HTTPClientResult.Callback, http_proxy: ?URL, signal: ?*JSC.AbortSignal) AsyncHTTP { - var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy }; + pub fn init( + allocator: std.mem.Allocator, + method: Method, + url: URL, + headers: Headers.Entries, + headers_buf: string, + response_buffer: *MutableString, + request_body: []const u8, + timeout: usize, + callback: HTTPClientResult.Callback, + http_proxy: ?URL, + signal: ?*std.atomic.Atomic(bool), + ) AsyncHTTP { + var this = AsyncHTTP{ + .allocator = allocator, + .url = url, + .method = method, + .request_headers = headers, + .request_header_buf = headers_buf, + .request_body = request_body, + .response_buffer = response_buffer, + .completion_callback = callback, + .http_proxy = http_proxy, + }; this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal); this.client.timeout = timeout; this.client.http_proxy = this.http_proxy; @@ -1293,7 +1252,8 @@ pub const AsyncHTTP = struct { fn reset(this: *AsyncHTTP) !void { const timeout = this.timeout; - this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf); + var aborted = this.client.aborted; + this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf, aborted); this.client.timeout = timeout; this.client.http_proxy = this.http_proxy; if (this.http_proxy) |proxy| { @@ -1409,6 +1369,10 @@ pub const AsyncHTTP = struct { } }; +pub fn hasSignalAborted(this: *const HTTPClient) bool { + return (this.aborted orelse return false).load(.Monotonic); +} + pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { var header_count: usize = 0; var header_entries = this.header_entries.slice(); @@ -1520,7 +1484,7 @@ pub fn doRedirect(this: *HTTPClient) void { std.debug.assert(this.follow_redirects); if (this.remaining_redirect_count == 0) { - this.fail(error.TooManyRedirects, null); + this.fail(error.TooManyRedirects); return; } this.state.reset(); @@ -1557,18 +1521,18 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) fn start_(this: *HTTPClient, comptime is_ssl: bool) void { // Aborted before connecting - if (this.hasSignalAborted()) |reason| { - this.fail(error.Aborted, reason); + if (this.hasSignalAborted()){ + this.fail(error.Aborted); return; } var socket = http_thread.connect(this, is_ssl) catch |err| { - this.fail(err, null); + this.fail(err); return; }; if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) { - this.fail(error.ConnectionClosed, null); + this.fail(error.ConnectionClosed); std.debug.assert(this.state.fail != error.NoError); } } @@ -1594,8 +1558,8 @@ fn printResponse(response: picohttp.Response) void { } pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - if (this.hasSignalAborted()) |reason| { - this.closeAndAbort(reason, is_ssl, socket); + if (this.hasSignalAborted()) { + this.closeAndAbort(is_ssl, socket); return; } @@ -1840,7 +1804,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { if (this.state.stage != .fail and this.state.stage != .done) { - log("closeAndFail", .{}); + log("closeAndFail: {s}", .{@errorName(err)}); if (!socket.isClosed()) { socket.ext(**anyopaque).?.* = bun.cast( **anyopaque, @@ -1848,7 +1812,7 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc ); socket.close(0, null); } - this.fail(err, null); + this.fail(err); } } @@ -1903,8 +1867,8 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP } pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { - if (this.hasSignalAborted()) |reason| { - this.closeAndAbort(reason, is_ssl, socket); + if (this.hasSignalAborted()) { + this.closeAndAbort(is_ssl, socket); return; } switch (this.state.response_stage) { @@ -2140,34 +2104,22 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } } -pub fn closeAndAbort(this: *HTTPClient, reason: JSC.JSValue, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - if (this.state.stage != .fail and this.state.stage != .done) { - log("closeAndAbort", .{}); - if (!socket.isClosed()) { - socket.ext(**anyopaque).?.* = bun.cast( - **anyopaque, - NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), - ); - socket.close(0, null); - } - this.fail(error.Aborted, reason); - } +pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { + this.closeAndFail(error.Aborted, comptime is_ssl, socket); } -fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void { +fn fail(this: *HTTPClient, err: anyerror) void { this.state.request_stage = .fail; this.state.response_stage = .fail; this.state.fail = err; this.state.stage = .fail; const callback = this.completion_callback; - const result = this.toResult(this.cloned_metadata, reason); + const result = this.toResult(this.cloned_metadata); this.state.reset(); this.proxy_tunneling = false; callback.run(result); - - this.deinitSignal(); } // We have to clone metadata immediately after use @@ -2203,12 +2155,10 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss if (this.state.stage != .done and this.state.stage != .fail) { log("done", .{}); - this.deinitSignal(); - var out_str = this.state.body_out_str.?; var body = out_str.*; this.cloned_metadata.response = this.state.pending_response; - const result = this.toResult(this.cloned_metadata, null); + const result = this.toResult(this.cloned_metadata); const callback = this.completion_callback; socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); @@ -2251,7 +2201,6 @@ pub const HTTPClientResult = struct { fail: anyerror = error.NoError, redirected: bool = false, headers_buf: []picohttp.Header = &.{}, - reason: ?JSC.JSValue = null, pub fn isSuccess(this: *const HTTPClientResult) bool { return this.fail == error.NoError; @@ -2304,7 +2253,7 @@ pub const HTTPClientResult = struct { }; }; -pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata, reason: ?JSC.JSValue) HTTPClientResult { +pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult { return HTTPClientResult{ .body = this.state.body_out_str, .response = metadata.response, @@ -2312,7 +2261,6 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata, reason: ?JSC. .redirected = this.remaining_redirect_count != default_redirect_count, .href = metadata.url, .fail = this.state.fail, - .reason = reason, .headers_buf = metadata.response.headers, }; } diff --git a/test/bun.js/fetch.test.js b/test/bun.js/fetch.test.js index 0ecba0cd8..ec903341f 100644 --- a/test/bun.js/fetch.test.js +++ b/test/bun.js/fetch.test.js @@ -1,21 +1,26 @@ -import { afterAll, beforeAll, describe, expect, it, test } from "bun:test"; +import { afterAll, afterEach, beforeAll, describe, expect, it, test, beforeEach } from "bun:test"; import fs, { chmodSync, unlinkSync } from "fs"; import { mkfifo } from "mkfifo"; import { gc, withoutAggressiveGC } from "./gc"; +const sleep = countdown => { + return Bun.sleep(countdown); +}; + const exampleFixture = fs.readFileSync( import.meta.path.substring(0, import.meta.path.lastIndexOf("/")) + "/fetch.js.txt", "utf8", ); var cachedServer; -function getServer(handler) { - cachedServer ||= Bun.serve(handler); - cachedServer.reload(handler); - return cachedServer; +function getServer({ port, ...rest }) { + return (cachedServer = Bun.serve({ + ...rest, + port: 0, + })); } -afterAll(() => { +afterEach(() => { cachedServer?.stop?.(true); }); @@ -63,9 +68,9 @@ describe("AbortSignalStreamTest", async () => { } catch (ex) { error = ex; } - expect(error instanceof DOMException).toBeTruthy(); expect(error.name).toBe("AbortError"); expect(error.message).toBe("The operation was aborted."); + expect(error instanceof DOMException).toBeTruthy(); } } @@ -77,7 +82,6 @@ describe("AbortSignalStreamTest", async () => { }); describe("AbortSignalDirectStreamTest", () => { - const port = 74322; async function abortOnStage(body, stage) { let error = undefined; var abortController = new AbortController(); @@ -119,9 +123,9 @@ describe("AbortSignalDirectStreamTest", () => { } catch (ex) { error = ex; } - expect(error instanceof DOMException).toBeTruthy(); expect(error.name).toBe("AbortError"); expect(error.message).toBe("The operation was aborted."); + expect(error instanceof DOMException).toBeTruthy(); } } @@ -133,6 +137,39 @@ describe("AbortSignalDirectStreamTest", () => { }); describe("AbortSignal", () => { + var server; + beforeEach(() => { + server = getServer({ + async fetch(request) { + if (request.url.endsWith("/nodelay")) { + return new Response("Hello"); + } + if (request.url.endsWith("/stream")) { + const reader = request.body.getReader(); + const body = new ReadableStream({ + async pull(controller) { + if (!reader) controller.close(); + const { done, value } = await reader.read(); + // When no more data needs to be consumed, close the stream + if (done) { + controller.close(); + return; + } + // Enqueue the next data chunk into our target stream + controller.enqueue(value); + }, + }); + return new Response(body); + } + if (request.method.toUpperCase() === "POST") { + const body = await request.text(); + return new Response(body); + } + await sleep(15); + return new Response("Hello"); + }, + }); + }); it("AbortError", async () => { let name; try { @@ -140,7 +177,7 @@ describe("AbortSignal", () => { const signal = controller.signal; async function manualAbort() { - await Bun.sleep(10); + await sleep(1); controller.abort(); } await Promise.all([ @@ -172,8 +209,9 @@ describe("AbortSignal", () => { try { var controller = new AbortController(); const signal = controller.signal; + async function manualAbort() { - await Bun.sleep(10); + await sleep(10); controller.abort("My Reason"); } await Promise.all([ @@ -197,7 +235,7 @@ describe("AbortSignal", () => { }); async function manualAbort() { - await Bun.sleep(10); + await sleep(10); controller.abort(); } await Promise.all([ @@ -231,9 +269,9 @@ describe("AbortSignal", () => { error = ex; } - expect(error instanceof DOMException).toBeTruthy(); expect(error.name).toBe("AbortError"); expect(error.message).toBe("The operation was aborted."); + expect(error instanceof DOMException).toBeTruthy(); }); it("TimeoutError", async () => { @@ -253,7 +291,7 @@ describe("AbortSignal", () => { const signal = controller.signal; const request = new Request(`http://127.0.0.1:${server.port}`, { signal }); async function manualAbort() { - await Bun.sleep(10); + await sleep(10); controller.abort(); } await Promise.all([fetch(request).then(res => res.text()), manualAbort()]); @@ -443,7 +481,7 @@ describe("fetch", () => { await fetch(url, { body: "buntastic" }); expect(false).toBe(true); } catch (exception) { - expect(exception instanceof TypeError).toBe(true); + expect(exception.name).toBe("TypeError"); } }); }); |