aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api/server.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/api/server.zig')
-rw-r--r--src/bun.js/api/server.zig30
1 files changed, 25 insertions, 5 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 5bbf159d2..85d4dadb5 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1170,6 +1170,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
sink: ?*ResponseStream.JSSink = null,
byte_stream: ?*JSC.WebCore.ByteStream = null,
+ // reference to the readable stream / byte_stream alive
+ readable_stream_ref: JSC.WebCore.ReadableStream.Strong = .{},
/// Used in errors
pathname: bun.String = bun.String.empty,
@@ -1666,6 +1668,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
stream.unpipe();
}
+ this.readable_stream_ref.deinit();
+
if (!this.pathname.isEmpty()) {
this.pathname.deref();
this.pathname = bun.String.empty;
@@ -2594,24 +2598,31 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
std.debug.assert(byte_stream.pipe.ctx == null);
std.debug.assert(this.byte_stream == null);
- stream.detach(this.server.globalThis);
-
if (this.resp == null) {
- byte_stream.parent().deinit();
+ // we don't have a response, so we can discard the stream
+ stream.detachIfPossible(this.server.globalThis);
return;
}
const resp = this.resp.?;
-
// If we've received the complete body by the time this function is called
// we can avoid streaming it and just send it all at once.
if (byte_stream.has_received_last_chunk) {
this.blob.from(byte_stream.buffer);
- byte_stream.parent().deinit();
this.doRenderBlob();
+ // is safe to detach here because we're not going to receive any more data
+ stream.detachIfPossible(this.server.globalThis);
return;
}
byte_stream.pipe = JSC.WebCore.Pipe.New(@This(), onPipe).init(this);
+ this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(stream, this.server.globalThis) catch {
+ // Invalid Stream
+ this.renderMissing();
+ return;
+ };
+ // we now hold a reference so we can safely ask to detach and will be detached when the last ref is dropped
+ stream.detachIfPossible(this.server.globalThis);
+
this.byte_stream = byte_stream;
this.response_buf_owned = byte_stream.buffer.moveToUnmanaged();
@@ -2632,6 +2643,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
+ if (lock.onReceiveValue != null or lock.task != null) {
+ // someone else is waiting for the stream or waiting for `onStartStreaming`
+ const readable = value.toReadableStream(this.server.globalThis);
+ readable.ensureStillAlive();
+ readable.protect();
+ this.doRenderWithBody(value);
+ return;
+ }
+
// when there's no stream, we need to
lock.onReceiveValue = doRenderWithBodyLocked;
lock.task = this;