diff options
Diffstat (limited to 'src/bun.js/api/server.zig')
-rw-r--r-- | src/bun.js/api/server.zig | 30 |
1 files changed, 25 insertions, 5 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 5bbf159d2..85d4dadb5 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1170,6 +1170,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp sink: ?*ResponseStream.JSSink = null, byte_stream: ?*JSC.WebCore.ByteStream = null, + // reference to the readable stream / byte_stream alive + readable_stream_ref: JSC.WebCore.ReadableStream.Strong = .{}, /// Used in errors pathname: bun.String = bun.String.empty, @@ -1666,6 +1668,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp stream.unpipe(); } + this.readable_stream_ref.deinit(); + if (!this.pathname.isEmpty()) { this.pathname.deref(); this.pathname = bun.String.empty; @@ -2594,24 +2598,31 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp std.debug.assert(byte_stream.pipe.ctx == null); std.debug.assert(this.byte_stream == null); - stream.detach(this.server.globalThis); - if (this.resp == null) { - byte_stream.parent().deinit(); + // we don't have a response, so we can discard the stream + stream.detachIfPossible(this.server.globalThis); return; } const resp = this.resp.?; - // If we've received the complete body by the time this function is called // we can avoid streaming it and just send it all at once. if (byte_stream.has_received_last_chunk) { this.blob.from(byte_stream.buffer); - byte_stream.parent().deinit(); this.doRenderBlob(); + // is safe to detach here because we're not going to receive any more data + stream.detachIfPossible(this.server.globalThis); return; } byte_stream.pipe = JSC.WebCore.Pipe.New(@This(), onPipe).init(this); + this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(stream, this.server.globalThis) catch { + // Invalid Stream + this.renderMissing(); + return; + }; + // we now hold a reference so we can safely ask to detach and will be detached when the last ref is dropped + stream.detachIfPossible(this.server.globalThis); + this.byte_stream = byte_stream; this.response_buf_owned = byte_stream.buffer.moveToUnmanaged(); @@ -2632,6 +2643,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } + if (lock.onReceiveValue != null or lock.task != null) { + // someone else is waiting for the stream or waiting for `onStartStreaming` + const readable = value.toReadableStream(this.server.globalThis); + readable.ensureStillAlive(); + readable.protect(); + this.doRenderWithBody(value); + return; + } + // when there's no stream, we need to lock.onReceiveValue = doRenderWithBodyLocked; lock.task = this; |