diff options
| author | 2023-02-22 21:29:43 -0800 | |
|---|---|---|
| committer | 2023-02-22 21:29:43 -0800 | |
| commit | 0143eccb27ef0c9a93bd39636d5f3ecfd08f7eef (patch) | |
| tree | c16f4292aea038ab2101ab30ffeb41e8fbebc446 /src/bun.js | |
| parent | 583b29632c6006ad4cdce75d1609f087c6bd2173 (diff) | |
| download | bun-0143eccb27ef0c9a93bd39636d5f3ecfd08f7eef.tar.gz bun-0143eccb27ef0c9a93bd39636d5f3ecfd08f7eef.tar.zst bun-0143eccb27ef0c9a93bd39636d5f3ecfd08f7eef.zip | |
Fix flaky request.signal implementation
Diffstat (limited to 'src/bun.js')
| -rw-r--r-- | src/bun.js/bindings/bindings.zig | 50 | ||||
| -rw-r--r-- | src/bun.js/webcore/request.zig | 44 | ||||
| -rw-r--r-- | src/bun.js/webcore/response.zig | 118 |
3 files changed, 134 insertions, 78 deletions
diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index a43ff8265..48cbe0887 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -1690,6 +1690,26 @@ pub const AbortSignal = extern opaque { pub const name = "JSC::AbortSignal"; pub const namespace = "JSC"; + pub fn listen( + this: *AbortSignal, + comptime Context: type, + ctx: *Context, + comptime cb: *const fn (*Context, JSValue) void, + ) *AbortSignal { + const Wrapper = struct { + const call = cb; + pub fn callback( + ptr: ?*anyopaque, + reason: JSValue, + ) callconv(.C) void { + var val = bun.cast(*Context, ptr.?); + call(val, reason); + } + }; + + return this.addListener(@ptrCast(?*anyopaque, ctx), Wrapper.callback); + } + pub fn addListener( this: *AbortSignal, ctx: ?*anyopaque, @@ -1709,10 +1729,12 @@ pub const AbortSignal = extern opaque { return cppFn("signal", .{ this, reason }); } + /// This function is not threadsafe. aborted is a boolean, not an atomic! pub fn aborted(this: *AbortSignal) bool { return cppFn("aborted", .{this}); } + /// This function is not threadsafe. JSValue cannot safely be passed between threads. pub fn abortReason(this: *AbortSignal) JSValue { return cppFn("abortReason", .{this}); } @@ -1734,11 +1756,11 @@ pub const AbortSignal = extern opaque { } pub fn toJS(this: *AbortSignal, global: *JSGlobalObject) JSValue { - return cppFn("toJS", .{this, global}); + return cppFn("toJS", .{ this, global }); } pub fn create(global: *JSGlobalObject) JSValue { - return cppFn("create", .{ global }); + return cppFn("create", .{global}); } pub fn createAbortError(message: *const ZigString, code: *const ZigString, global: *JSGlobalObject) JSValue { @@ -1749,20 +1771,7 @@ pub const AbortSignal = extern opaque { return cppFn("createTimeoutError", .{ message, code, global }); } - pub const Extern = [_][]const u8{ - "createAbortError", - "createTimeoutError", - "create", - "ref", - "unref", - "signal", - "abortReason", - "aborted", - "addListener", - "fromJS", - "toJS", - "cleanNativeBindings" - }; + pub const Extern = [_][]const u8{ "createAbortError", "createTimeoutError", "create", "ref", "unref", "signal", "abortReason", "aborted", "addListener", "fromJS", "toJS", "cleanNativeBindings" }; }; pub const JSPromise = extern struct { @@ -3567,14 +3576,7 @@ pub const JSValue = enum(JSValueReprInt) { return cppFn("eqlCell", .{ this, other }); } - pub const BuiltinName = enum(u8) { - method, - headers, - status, - url, - body, - data - }; + pub const BuiltinName = enum(u8) { method, headers, status, url, body, data }; // intended to be more lightweight than ZigString pub fn fastGet(this: JSValue, global: *JSGlobalObject, builtin_name: BuiltinName) ?JSValue { diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig index fc5bff2b5..b2ff43a5a 100644 --- a/src/bun.js/webcore/request.zig +++ b/src/bun.js/webcore/request.zig @@ -260,7 +260,7 @@ pub const Request = struct { return ZigString.init(Properties.UTF8.navigate).toValue(globalThis); } - pub fn finalize(this: *Request) callconv(.C) void { + pub fn finalizeWithoutDeinit(this: *Request) void { if (this.headers) |headers| { headers.deref(); this.headers = null; @@ -275,7 +275,10 @@ pub const Request = struct { _ = signal.unref(); this.signal = null; } + } + pub fn finalize(this: *Request) callconv(.C) void { + this.finalizeWithoutDeinit(); bun.default_allocator.destroy(this); } @@ -402,10 +405,7 @@ pub const Request = struct { if (Body.Value.fromJS(globalThis, body_)) |body| { request.body = body; } else { - if (request.headers) |head| { - head.deref(); - } - + request.finalizeWithoutDeinit(); return null; } } @@ -419,23 +419,6 @@ pub const Request = struct { } }, else => { - if (Body.Init.init(getAllocator(globalThis), globalThis, arguments[1], arguments[1].jsType()) catch null) |req_init| { - request.headers = req_init.headers; - request.method = req_init.method; - } - - if (arguments[1].fastGet(globalThis, .body)) |body_| { - if (Body.Value.fromJS(globalThis, body_)) |body| { - request.body = body; - } else { - if (request.headers) |head| { - head.deref(); - } - - return null; - } - } - if (arguments[1].get(globalThis, "signal")) |signal_| { if (AbortSignal.fromJS(signal_)) |signal| { //Keep it alive @@ -444,10 +427,21 @@ pub const Request = struct { } else { globalThis.throw("Failed to construct 'Request': member signal is not of type AbortSignal.", .{}); - if (request.headers) |head| { - head.deref(); - } + request.finalizeWithoutDeinit(); + return null; + } + } + + if (Body.Init.init(getAllocator(globalThis), globalThis, arguments[1], arguments[1].jsType()) catch null) |req_init| { + request.headers = req_init.headers; + request.method = req_init.method; + } + if (arguments[1].fastGet(globalThis, .body)) |body_| { + if (Body.Value.fromJS(globalThis, body_)) |body| { + request.body = body; + } else { + request.finalizeWithoutDeinit(); return null; } } diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 39ee205c2..cd4b1be4c 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -623,6 +623,12 @@ pub const Fetch = struct { /// We always clone url and proxy (if informed) url_proxy_buffer: []const u8 = "", + signal: ?*JSC.AbortSignal = null, + aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + + // must be stored because AbortSignal stores reason weakly + abort_reason: JSValue = JSValue.zero, + pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet { return FetchTasklet{}; } @@ -641,6 +647,14 @@ pub const Fetch = struct { this.result.deinitMetadata(); this.response_buffer.deinit(); this.request_body.detach(); + + if (this.abort_reason != .zero) this.abort_reason.unprotect(); + + if (this.signal) |signal| { + signal.cleanNativeBindings(this); + _ = signal.unref(); + this.signal = null; + } } pub fn deinit(this: *FetchTasklet) void { @@ -688,29 +702,23 @@ pub const Fetch = struct { } pub fn onReject(this: *FetchTasklet) JSValue { + if (this.signal) |signal| { + _ = signal.unref(); + this.signal = null; + } + + if (!this.abort_reason.isEmptyOrUndefinedOrNull()) { + return this.abort_reason; + } + if (this.result.isTimeout()) { - //Timeout with reason - if (this.result.reason) |exception| { - if (!exception.isEmptyOrUndefinedOrNull()) { - return exception; - } - } - //Timeout without reason - const exception = JSC.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this); - return exception; + // Timeout without reason + return JSC.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this); } if (this.result.isAbort()) { - //Abort can be TimeoutError (AbortSignal.timeout(ms)) or AbortError so we need to detect - if (this.result.reason) |exception| { - if (!exception.isEmptyOrUndefinedOrNull()) { - return exception; - } - } - - //Abort without reason - const exception = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this); - return exception; + // Abort without reason + return JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this); } const fetch_error = JSC.SystemError{ @@ -793,6 +801,7 @@ pub const Fetch = struct { .request_headers = fetch_options.headers, .ref = JSC.napi.Ref.create(globalThis, promise), .url_proxy_buffer = fetch_options.url_proxy_buffer, + .signal = fetch_options.signal, }; if (fetch_tasklet.request_body.store()) |store| { @@ -808,12 +817,24 @@ pub const Fetch = struct { proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url); } - fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), fetch_options.timeout, HTTPClient.HTTPClientResult.Callback.New( - *FetchTasklet, - FetchTasklet.callback, - ).init( - fetch_tasklet, - ), proxy, fetch_options.signal); + fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init( + allocator, + fetch_options.method, + fetch_options.url, + fetch_options.headers.entries, + fetch_options.headers.buf.items, + &fetch_tasklet.response_buffer, + fetch_tasklet.request_body.slice(), + fetch_options.timeout, + HTTPClient.HTTPClientResult.Callback.New( + *FetchTasklet, + FetchTasklet.callback, + ).init( + fetch_tasklet, + ), + proxy, + if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, + ); if (!fetch_options.follow_redirects) { fetch_tasklet.http.?.client.remaining_redirect_count = 0; @@ -822,10 +843,35 @@ pub const Fetch = struct { fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout; fetch_tasklet.http.?.client.verbose = fetch_options.verbose; fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive; + + if (fetch_tasklet.signal) |signal| { + fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener); + } return fetch_tasklet; } - const FetchOptions = struct { method: Method, headers: Headers, body: AnyBlob, timeout: usize, disable_timeout: bool, disable_keepalive: bool, url: ZigURL, verbose: bool = false, follow_redirects: bool = true, proxy: ?ZigURL = null, url_proxy_buffer: []const u8 = "", signal: ?*JSC.AbortSignal = null, globalThis: ?*JSGlobalObject }; + pub fn abortListener(this: *FetchTasklet, reason: JSValue) void { + reason.ensureStillAlive(); + this.abort_reason = reason; + reason.protect(); + this.aborted.store(true, .Monotonic); + } + + const FetchOptions = struct { + method: Method, + headers: Headers, + body: AnyBlob, + timeout: usize, + disable_timeout: bool, + disable_keepalive: bool, + url: ZigURL, + verbose: bool = false, + follow_redirects: bool = true, + proxy: ?ZigURL = null, + url_proxy_buffer: []const u8 = "", + signal: ?*JSC.AbortSignal = null, + globalThis: ?*JSGlobalObject, + }; pub fn queue( allocator: std.mem.Allocator, @@ -1205,9 +1251,23 @@ pub const Fetch = struct { _ = FetchTasklet.queue( default_allocator, globalThis, - .{ .method = method, .url = url, .headers = headers orelse Headers{ - .allocator = bun.default_allocator, - }, .body = body, .timeout = std.time.ns_per_hour, .disable_keepalive = disable_keepalive, .disable_timeout = disable_timeout, .follow_redirects = follow_redirects, .verbose = verbose, .proxy = proxy, .url_proxy_buffer = url_proxy_buffer, .signal = signal, .globalThis = globalThis }, + .{ + .method = method, + .url = url, + .headers = headers orelse Headers{ + .allocator = bun.default_allocator, + }, + .body = body, + .timeout = std.time.ns_per_hour, + .disable_keepalive = disable_keepalive, + .disable_timeout = disable_timeout, + .follow_redirects = follow_redirects, + .verbose = verbose, + .proxy = proxy, + .url_proxy_buffer = url_proxy_buffer, + .signal = signal, + .globalThis = globalThis, + }, promise_val, ) catch unreachable; return promise_val.asRef(); |
