aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Ciro Spaciari <ciro.spaciari@gmail.com> 2023-03-01 18:15:52 -0300
committerGravatar GitHub <noreply@github.com> 2023-03-01 13:15:52 -0800
commitcf8568ccff4ba5f6e54e8e3b552468069b68bc9e (patch)
treeda6e4088eda132fd59c3874cd49de8a38af020d1
parent56ca48ece88c0de854ec20c5e71a639fee6ccb0f (diff)
downloadbun-cf8568ccff4ba5f6e54e8e3b552468069b68bc9e.tar.gz
bun-cf8568ccff4ba5f6e54e8e3b552468069b68bc9e.tar.zst
bun-cf8568ccff4ba5f6e54e8e3b552468069b68bc9e.zip
fix deinit behavior when connection is aborted using ResponseStream and abort event behavior (#2252)
* fix deinit behavior when connection is aborted using ResponseStream * fix abort handling on stream, and get better tests * avoid segfault by trying to deinit 2x when aborted * make tests more reliable * more reliable onResolveStream after aborted * add test case for not firing the abort signal
-rw-r--r--src/bun.js/api/server.zig92
-rw-r--r--src/bun.js/webcore/streams.zig2
-rw-r--r--test/bun.js/bun-server.test.ts114
3 files changed, 174 insertions, 34 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 546350679..3a9a31ec9 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -635,6 +635,7 @@ pub fn NewRequestContextStackAllocator(comptime RequestContext: type, comptime c
fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comptime ThisServer: type) type {
return struct {
const RequestContext = @This();
+ const ctxLog = Output.scoped(.RequestContext, false);
const App = uws.NewApp(ssl_enabled);
pub threadlocal var pool: ?*RequestContext.RequestContextStackAllocator = null;
pub threadlocal var pool_allocator: std.mem.Allocator = undefined;
@@ -738,6 +739,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn finalizeForAbort(this: *RequestContext) void {
+ streamLog("finalizeForAbort", .{});
this.pending_promises_for_abort -|= 1;
if (this.pending_promises_for_abort == 0) this.finalize();
}
@@ -988,7 +990,17 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn onAbort(this: *RequestContext, resp: *App.Response) void {
std.debug.assert(this.resp == resp);
std.debug.assert(!this.aborted);
+ //mark request as aborted
this.aborted = true;
+ //if have sink, call onAborted on sink
+ if (this.sink) |wrapper| {
+ wrapper.detach();
+ wrapper.sink.onAborted(resp);
+ this.sink = null;
+ wrapper.sink.destroy();
+ this.finalizeForAbort();
+ return;
+ }
// if we can, free the request now.
if (this.isDeadRequest()) {
@@ -1010,14 +1022,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// User called .blob(), .json(), text(), or .arrayBuffer() on the Request object
// but we received nothing or the connection was aborted
if (request_js.as(Request)) |req| {
- if (req.signal) |signal| {
- // if signal is not aborted, abort the signal
- if (!signal.aborted()) {
- const reason = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.server.globalThis);
- reason.ensureStillAlive();
- _ = signal.signal(reason);
- }
- }
+ this._signalAbort(req);
// the promise is pending
if (req.body == .Locked and (req.body.Locked.action != .none or req.body.Locked.promise != null)) {
@@ -1054,6 +1059,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
+ pub fn _signalAbort(this: *RequestContext, req: *Request) void {
+ //only call when actually aborted
+ if (!this.aborted) return;
+ //check if have a valid signal
+ if (req.signal) |signal| {
+ // if signal is not aborted, abort the signal
+ if (!signal.aborted()) {
+ const reason = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.server.globalThis);
+ reason.ensureStillAlive();
+ _ = signal.signal(reason);
+ }
+ }
+ }
+
pub fn markComplete(this: *RequestContext) void {
if (!this.has_marked_complete) this.server.onRequestComplete();
this.has_marked_complete = true;
@@ -1062,14 +1081,16 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// This function may be called multiple times
// so it's important that we can safely do that
pub fn finalizeWithoutDeinit(this: *RequestContext) void {
+ ctxLog("finalizeWithoutDeinit", .{});
this.blob.detach();
if (comptime Environment.allow_assert) {
std.debug.assert(!this.finalized);
this.finalized = true;
}
-
+
if (!this.response_jsvalue.isEmpty()) {
+ ctxLog("finalizeWithoutDeinit: response_jsvalue != .zero", .{});
if (this.response_protected) {
this.response_jsvalue.unprotect();
this.response_protected = false;
@@ -1078,6 +1099,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (this.request_js_object != null) {
+ ctxLog("finalizeWithoutDeinit: request_js_object != null", .{});
+
var request_js = this.request_js_object.?.value();
request_js.ensureStillAlive();
@@ -1087,14 +1110,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// User called .blob(), .json(), text(), or .arrayBuffer() on the Request object
// but we received nothing or the connection was aborted
if (request_js.as(Request)) |req| {
- if (req.signal) |signal| {
- // if signal is not aborted, abort the signal
- if (!signal.aborted()) {
- const reason = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.server.globalThis);
- reason.ensureStillAlive();
- _ = signal.signal(reason);
- }
- }
+ this._signalAbort(req);
// the promise is pending
if (req.body == .Locked and req.body.Locked.action != .none and req.body.Locked.promise != null) {
req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis);
@@ -1104,6 +1120,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (this.promise) |promise| {
+ ctxLog("finalizeWithoutDeinit: this.promise != null", .{});
this.promise = null;
if (promise.asAnyPromise()) |prom| {
@@ -1113,22 +1130,27 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (this.byte_stream) |stream| {
+ ctxLog("finalizeWithoutDeinit: stream != null", .{});
+
this.byte_stream = null;
stream.unpipe();
}
if (this.pathname.len > 0) {
+ ctxLog("finalizeWithoutDeinit: this.pathname.len > 0 null", .{});
this.allocator.free(bun.constStrToU8(this.pathname));
this.pathname = "";
}
}
pub fn finalize(this: *RequestContext) void {
+ ctxLog("finalize", .{});
this.finalizeWithoutDeinit();
this.markComplete();
this.deinit();
}
pub fn deinit(this: *RequestContext) void {
+ ctxLog("deinit", .{});
if (comptime Environment.allow_assert)
std.debug.assert(this.finalized);
@@ -1577,8 +1599,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("returned a promise", .{});
switch (promise.status(this.server.globalThis.vm())) {
.Pending => {
+ streamLog("promise still Pending", .{});
// TODO: should this timeout?
- this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
+ this.setAbortHandler();
this.response_ptr.?.body.value = .{
.Locked = .{
.readable = stream,
@@ -1595,9 +1618,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.Fulfilled => {
+ streamLog("promise Fulfilled", .{});
this.handleResolveStream();
},
.Rejected => {
+ streamLog("promise Rejected", .{});
this.handleRejectStream(this.server.globalThis, promise.result(this.server.globalThis.vm()));
},
}
@@ -1631,7 +1656,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
- this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
+ this.setAbortHandler();
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
this.finalize();
stream.value.unprotect();
@@ -1790,7 +1815,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn handleResolveStream(req: *RequestContext) void {
- streamLog("onResolve", .{});
+ streamLog("handleResolveStream", .{});
+ //aborted already called finalizeForAbort at this stage
+ if(req.aborted) return;
+
var wrote_anything = false;
if (req.sink) |wrapper| {
wrapper.sink.pending_flush = null;
@@ -1809,33 +1837,30 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
resp.body.value = .{ .Used = {} };
}
}
-
- if (req.aborted) {
- req.finalizeForAbort();
- return;
- }
-
+
const responded = req.resp.hasResponded();
if (!responded and !wrote_anything) {
req.resp.clearAborted();
req.renderMissing();
return;
- } else if (!responded and wrote_anything and !req.aborted) {
+ } else if (!responded and wrote_anything) {
req.resp.clearAborted();
req.resp.endStream(req.shouldCloseConnection());
}
-
+
req.finalize();
}
pub fn onResolveStream(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
+ streamLog("onResolveStream", .{});
var args = callframe.arguments(2);
var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
req.handleResolveStream();
return JSValue.jsUndefined();
}
pub fn onRejectStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
+ streamLog("onRejectStream", .{});
const args = callframe.arguments(2);
var req = args.ptr[args.len - 1].asPromisePtr(@This());
var err = args.ptr[0];
@@ -1844,6 +1869,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn handleRejectStream(req: *@This(), globalThis: *JSC.JSGlobalObject, err: JSValue) void {
+ streamLog("handleRejectStream", .{});
var wrote_anything = req.has_written_status;
if (req.sink) |wrapper| {
@@ -1866,11 +1892,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onReject({any})", .{wrote_anything});
- if (req.aborted) {
- req.finalizeForAbort();
- return;
- }
-
if (!err.isEmptyOrUndefinedOrNull() and !wrote_anything) {
req.response_jsvalue.unprotect();
req.response_jsvalue = JSValue.zero;
@@ -1885,6 +1906,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
req.server.vm.runErrorHandler(err, &exception_list);
}
}
+ //aborted already called finalizeForAbort at this stage
+ if(req.aborted) return;
req.finalize();
return;
}
@@ -2056,6 +2079,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn doRender(this: *RequestContext) void {
+ ctxLog("render", .{});
+
if (this.aborted) {
this.finalizeForAbort();
return;
@@ -2295,6 +2320,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn render(this: *RequestContext, response: *JSC.WebCore.Response) void {
+ ctxLog("render", .{});
this.response_ptr = response;
this.doRender();
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 0a38c7ed0..61a83669d 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -2760,9 +2760,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn onAborted(this: *@This(), _: *UWSResponse) void {
log("onAborted()", .{});
- this.signal.close(null);
this.done = true;
this.aborted = true;
+ this.signal.close(null);
this.flushPromise();
this.finalize();
}
diff --git a/test/bun.js/bun-server.test.ts b/test/bun.js/bun-server.test.ts
index d5aae537e..6e0eab6fd 100644
--- a/test/bun.js/bun-server.test.ts
+++ b/test/bun.js/bun-server.test.ts
@@ -48,4 +48,118 @@ describe("Server", () => {
server.stop(true);
}
});
+
+ test("abort signal on server should only fire if aborted", async () => {
+ {
+ const abortController = new AbortController();
+
+ let signalOnServer = false;
+ const server = Bun.serve({
+ async fetch(req) {
+ req.signal.addEventListener("abort", () => {
+ signalOnServer = true;
+ });
+ return new Response("Hello");
+ },
+ port: 0,
+ });
+
+ try {
+ await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
+ } catch {}
+ expect(signalOnServer).toBe(false);
+ server.stop(true);
+ }
+ });
+
+ test("abort signal on server with direct stream", async () => {
+ {
+ let signalOnServer = false;
+ const abortController = new AbortController();
+
+ const server = Bun.serve({
+ async fetch(req) {
+ req.signal.addEventListener("abort", () => {
+ signalOnServer = true;
+ });
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ abortController.abort();
+
+ const buffer = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer();
+ controller.write(buffer);
+
+ //wait to detect the connection abortion
+ await Bun.sleep(15);
+
+ controller.close();
+ },
+ }),
+ {
+ headers: {
+ "Content-Encoding": "gzip",
+ "Content-Type": "text/html; charset=utf-8",
+ "Content-Length": "1",
+ },
+ },
+ );
+ },
+ port: 0,
+ });
+
+ try {
+ await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
+ } catch {}
+ await Bun.sleep(10);
+ expect(signalOnServer).toBe(true);
+ server.stop(true);
+ }
+ });
+
+ test("abort signal on server with stream", async () => {
+ {
+ let signalOnServer = false;
+ const abortController = new AbortController();
+
+ const server = Bun.serve({
+ async fetch(req) {
+ req.signal.addEventListener("abort", () => {
+ signalOnServer = true;
+ });
+ return new Response(
+ new ReadableStream({
+ async pull(controller) {
+ abortController.abort();
+
+ const buffer = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer();
+ controller.enqueue(buffer);
+
+ //wait to detect the connection abortion
+ await Bun.sleep(15);
+
+ controller.close();
+ },
+ }),
+ {
+ headers: {
+ "Content-Encoding": "gzip",
+ "Content-Type": "text/html; charset=utf-8",
+ "Content-Length": "1",
+ },
+ },
+ );
+ },
+ port: 0,
+ });
+
+ try {
+ await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
+ } catch {}
+ await Bun.sleep(10);
+ expect(signalOnServer).toBe(true);
+ server.stop(true);
+ }
+ });
});