diff options
author | 2023-03-01 18:15:52 -0300 | |
---|---|---|
committer | 2023-03-01 13:15:52 -0800 | |
commit | cf8568ccff4ba5f6e54e8e3b552468069b68bc9e (patch) | |
tree | da6e4088eda132fd59c3874cd49de8a38af020d1 | |
parent | 56ca48ece88c0de854ec20c5e71a639fee6ccb0f (diff) | |
download | bun-cf8568ccff4ba5f6e54e8e3b552468069b68bc9e.tar.gz bun-cf8568ccff4ba5f6e54e8e3b552468069b68bc9e.tar.zst bun-cf8568ccff4ba5f6e54e8e3b552468069b68bc9e.zip |
fix deinit behavior when connection is aborted using ResponseStream and abort event behavior (#2252)
* fix deinit behavior when connection is aborted using ResponseStream
* fix abort handling on stream, and get better tests
* avoid segfault by trying to deinit 2x when aborted
* make tests more reliable
* more reliable onResolveStream after aborted
* add test case for not firing the abort signal
-rw-r--r-- | src/bun.js/api/server.zig | 92 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 2 | ||||
-rw-r--r-- | test/bun.js/bun-server.test.ts | 114 |
3 files changed, 174 insertions, 34 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 546350679..3a9a31ec9 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -635,6 +635,7 @@ pub fn NewRequestContextStackAllocator(comptime RequestContext: type, comptime c fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comptime ThisServer: type) type { return struct { const RequestContext = @This(); + const ctxLog = Output.scoped(.RequestContext, false); const App = uws.NewApp(ssl_enabled); pub threadlocal var pool: ?*RequestContext.RequestContextStackAllocator = null; pub threadlocal var pool_allocator: std.mem.Allocator = undefined; @@ -738,6 +739,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn finalizeForAbort(this: *RequestContext) void { + streamLog("finalizeForAbort", .{}); this.pending_promises_for_abort -|= 1; if (this.pending_promises_for_abort == 0) this.finalize(); } @@ -988,7 +990,17 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn onAbort(this: *RequestContext, resp: *App.Response) void { std.debug.assert(this.resp == resp); std.debug.assert(!this.aborted); + //mark request as aborted this.aborted = true; + //if have sink, call onAborted on sink + if (this.sink) |wrapper| { + wrapper.detach(); + wrapper.sink.onAborted(resp); + this.sink = null; + wrapper.sink.destroy(); + this.finalizeForAbort(); + return; + } // if we can, free the request now. if (this.isDeadRequest()) { @@ -1010,14 +1022,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // User called .blob(), .json(), text(), or .arrayBuffer() on the Request object // but we received nothing or the connection was aborted if (request_js.as(Request)) |req| { - if (req.signal) |signal| { - // if signal is not aborted, abort the signal - if (!signal.aborted()) { - const reason = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.server.globalThis); - reason.ensureStillAlive(); - _ = signal.signal(reason); - } - } + this._signalAbort(req); // the promise is pending if (req.body == .Locked and (req.body.Locked.action != .none or req.body.Locked.promise != null)) { @@ -1054,6 +1059,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } + pub fn _signalAbort(this: *RequestContext, req: *Request) void { + //only call when actually aborted + if (!this.aborted) return; + //check if have a valid signal + if (req.signal) |signal| { + // if signal is not aborted, abort the signal + if (!signal.aborted()) { + const reason = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.server.globalThis); + reason.ensureStillAlive(); + _ = signal.signal(reason); + } + } + } + pub fn markComplete(this: *RequestContext) void { if (!this.has_marked_complete) this.server.onRequestComplete(); this.has_marked_complete = true; @@ -1062,14 +1081,16 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // This function may be called multiple times // so it's important that we can safely do that pub fn finalizeWithoutDeinit(this: *RequestContext) void { + ctxLog("finalizeWithoutDeinit", .{}); this.blob.detach(); if (comptime Environment.allow_assert) { std.debug.assert(!this.finalized); this.finalized = true; } - + if (!this.response_jsvalue.isEmpty()) { + ctxLog("finalizeWithoutDeinit: response_jsvalue != .zero", .{}); if (this.response_protected) { this.response_jsvalue.unprotect(); this.response_protected = false; @@ -1078,6 +1099,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (this.request_js_object != null) { + ctxLog("finalizeWithoutDeinit: request_js_object != null", .{}); + var request_js = this.request_js_object.?.value(); request_js.ensureStillAlive(); @@ -1087,14 +1110,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // User called .blob(), .json(), text(), or .arrayBuffer() on the Request object // but we received nothing or the connection was aborted if (request_js.as(Request)) |req| { - if (req.signal) |signal| { - // if signal is not aborted, abort the signal - if (!signal.aborted()) { - const reason = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.server.globalThis); - reason.ensureStillAlive(); - _ = signal.signal(reason); - } - } + this._signalAbort(req); // the promise is pending if (req.body == .Locked and req.body.Locked.action != .none and req.body.Locked.promise != null) { req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); @@ -1104,6 +1120,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (this.promise) |promise| { + ctxLog("finalizeWithoutDeinit: this.promise != null", .{}); this.promise = null; if (promise.asAnyPromise()) |prom| { @@ -1113,22 +1130,27 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (this.byte_stream) |stream| { + ctxLog("finalizeWithoutDeinit: stream != null", .{}); + this.byte_stream = null; stream.unpipe(); } if (this.pathname.len > 0) { + ctxLog("finalizeWithoutDeinit: this.pathname.len > 0 null", .{}); this.allocator.free(bun.constStrToU8(this.pathname)); this.pathname = ""; } } pub fn finalize(this: *RequestContext) void { + ctxLog("finalize", .{}); this.finalizeWithoutDeinit(); this.markComplete(); this.deinit(); } pub fn deinit(this: *RequestContext) void { + ctxLog("deinit", .{}); if (comptime Environment.allow_assert) std.debug.assert(this.finalized); @@ -1577,8 +1599,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp streamLog("returned a promise", .{}); switch (promise.status(this.server.globalThis.vm())) { .Pending => { + streamLog("promise still Pending", .{}); // TODO: should this timeout? - this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); + this.setAbortHandler(); this.response_ptr.?.body.value = .{ .Locked = .{ .readable = stream, @@ -1595,9 +1618,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, .Fulfilled => { + streamLog("promise Fulfilled", .{}); this.handleResolveStream(); }, .Rejected => { + streamLog("promise Rejected", .{}); this.handleRejectStream(this.server.globalThis, promise.result(this.server.globalThis.vm())); }, } @@ -1631,7 +1656,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } - this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); + this.setAbortHandler(); streamLog("is in progress, but did not return a Promise. Finalizing request context", .{}); this.finalize(); stream.value.unprotect(); @@ -1790,7 +1815,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn handleResolveStream(req: *RequestContext) void { - streamLog("onResolve", .{}); + streamLog("handleResolveStream", .{}); + //aborted already called finalizeForAbort at this stage + if(req.aborted) return; + var wrote_anything = false; if (req.sink) |wrapper| { wrapper.sink.pending_flush = null; @@ -1809,33 +1837,30 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp resp.body.value = .{ .Used = {} }; } } - - if (req.aborted) { - req.finalizeForAbort(); - return; - } - + const responded = req.resp.hasResponded(); if (!responded and !wrote_anything) { req.resp.clearAborted(); req.renderMissing(); return; - } else if (!responded and wrote_anything and !req.aborted) { + } else if (!responded and wrote_anything) { req.resp.clearAborted(); req.resp.endStream(req.shouldCloseConnection()); } - + req.finalize(); } pub fn onResolveStream(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { + streamLog("onResolveStream", .{}); var args = callframe.arguments(2); var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This()); req.handleResolveStream(); return JSValue.jsUndefined(); } pub fn onRejectStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { + streamLog("onRejectStream", .{}); const args = callframe.arguments(2); var req = args.ptr[args.len - 1].asPromisePtr(@This()); var err = args.ptr[0]; @@ -1844,6 +1869,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn handleRejectStream(req: *@This(), globalThis: *JSC.JSGlobalObject, err: JSValue) void { + streamLog("handleRejectStream", .{}); var wrote_anything = req.has_written_status; if (req.sink) |wrapper| { @@ -1866,11 +1892,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp streamLog("onReject({any})", .{wrote_anything}); - if (req.aborted) { - req.finalizeForAbort(); - return; - } - if (!err.isEmptyOrUndefinedOrNull() and !wrote_anything) { req.response_jsvalue.unprotect(); req.response_jsvalue = JSValue.zero; @@ -1885,6 +1906,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp req.server.vm.runErrorHandler(err, &exception_list); } } + //aborted already called finalizeForAbort at this stage + if(req.aborted) return; req.finalize(); return; } @@ -2056,6 +2079,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn doRender(this: *RequestContext) void { + ctxLog("render", .{}); + if (this.aborted) { this.finalizeForAbort(); return; @@ -2295,6 +2320,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn render(this: *RequestContext, response: *JSC.WebCore.Response) void { + ctxLog("render", .{}); this.response_ptr = response; this.doRender(); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 0a38c7ed0..61a83669d 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2760,9 +2760,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn onAborted(this: *@This(), _: *UWSResponse) void { log("onAborted()", .{}); - this.signal.close(null); this.done = true; this.aborted = true; + this.signal.close(null); this.flushPromise(); this.finalize(); } diff --git a/test/bun.js/bun-server.test.ts b/test/bun.js/bun-server.test.ts index d5aae537e..6e0eab6fd 100644 --- a/test/bun.js/bun-server.test.ts +++ b/test/bun.js/bun-server.test.ts @@ -48,4 +48,118 @@ describe("Server", () => { server.stop(true); } }); + + test("abort signal on server should only fire if aborted", async () => { + { + const abortController = new AbortController(); + + let signalOnServer = false; + const server = Bun.serve({ + async fetch(req) { + req.signal.addEventListener("abort", () => { + signalOnServer = true; + }); + return new Response("Hello"); + }, + port: 0, + }); + + try { + await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal }); + } catch {} + expect(signalOnServer).toBe(false); + server.stop(true); + } + }); + + test("abort signal on server with direct stream", async () => { + { + let signalOnServer = false; + const abortController = new AbortController(); + + const server = Bun.serve({ + async fetch(req) { + req.signal.addEventListener("abort", () => { + signalOnServer = true; + }); + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + abortController.abort(); + + const buffer = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer(); + controller.write(buffer); + + //wait to detect the connection abortion + await Bun.sleep(15); + + controller.close(); + }, + }), + { + headers: { + "Content-Encoding": "gzip", + "Content-Type": "text/html; charset=utf-8", + "Content-Length": "1", + }, + }, + ); + }, + port: 0, + }); + + try { + await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal }); + } catch {} + await Bun.sleep(10); + expect(signalOnServer).toBe(true); + server.stop(true); + } + }); + + test("abort signal on server with stream", async () => { + { + let signalOnServer = false; + const abortController = new AbortController(); + + const server = Bun.serve({ + async fetch(req) { + req.signal.addEventListener("abort", () => { + signalOnServer = true; + }); + return new Response( + new ReadableStream({ + async pull(controller) { + abortController.abort(); + + const buffer = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer(); + controller.enqueue(buffer); + + //wait to detect the connection abortion + await Bun.sleep(15); + + controller.close(); + }, + }), + { + headers: { + "Content-Encoding": "gzip", + "Content-Type": "text/html; charset=utf-8", + "Content-Length": "1", + }, + }, + ); + }, + port: 0, + }); + + try { + await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal }); + } catch {} + await Bun.sleep(10); + expect(signalOnServer).toBe(true); + server.stop(true); + } + }); }); |