From 2d80f94edafe09329b027424b32908632694553d Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Mon, 4 Sep 2023 16:26:49 -0300 Subject: fix(HTMLRewriter) buffer response before transform (#4418) * html rewriter response buffering * pipe the data when marked as used * fix empty response * add some fetch tests * deinit parent stream * fix decompression * keep byte_reader alive * update builds * remove nonsense * was not nonsense after all * protect tmp ret value from GC, fix readable strong ref deinit/init * fmt * if we detach the stream we cannot update the fetch stream * detach checking source * more tests, progress with javascript and Direct sink * drop support for pure readable stream for now * more fixes --------- Co-authored-by: Jarred Sumner --- src/bun.js/api/server.zig | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) (limited to 'src/bun.js/api/server.zig') diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 5bbf159d2..85d4dadb5 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1170,6 +1170,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp sink: ?*ResponseStream.JSSink = null, byte_stream: ?*JSC.WebCore.ByteStream = null, + // reference to the readable stream / byte_stream alive + readable_stream_ref: JSC.WebCore.ReadableStream.Strong = .{}, /// Used in errors pathname: bun.String = bun.String.empty, @@ -1666,6 +1668,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp stream.unpipe(); } + this.readable_stream_ref.deinit(); + if (!this.pathname.isEmpty()) { this.pathname.deref(); this.pathname = bun.String.empty; @@ -2594,24 +2598,31 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp std.debug.assert(byte_stream.pipe.ctx == null); std.debug.assert(this.byte_stream == null); - stream.detach(this.server.globalThis); - if (this.resp == null) { - byte_stream.parent().deinit(); + // we don't have a response, so we can discard the stream + stream.detachIfPossible(this.server.globalThis); return; } const resp = this.resp.?; - // If we've received the complete body by the time this function is called // we can avoid streaming it and just send it all at once. if (byte_stream.has_received_last_chunk) { this.blob.from(byte_stream.buffer); - byte_stream.parent().deinit(); this.doRenderBlob(); + // is safe to detach here because we're not going to receive any more data + stream.detachIfPossible(this.server.globalThis); return; } byte_stream.pipe = JSC.WebCore.Pipe.New(@This(), onPipe).init(this); + this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(stream, this.server.globalThis) catch { + // Invalid Stream + this.renderMissing(); + return; + }; + // we now hold a reference so we can safely ask to detach and will be detached when the last ref is dropped + stream.detachIfPossible(this.server.globalThis); + this.byte_stream = byte_stream; this.response_buf_owned = byte_stream.buffer.moveToUnmanaged(); @@ -2632,6 +2643,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } + if (lock.onReceiveValue != null or lock.task != null) { + // someone else is waiting for the stream or waiting for `onStartStreaming` + const readable = value.toReadableStream(this.server.globalThis); + readable.ensureStillAlive(); + readable.protect(); + this.doRenderWithBody(value); + return; + } + // when there's no stream, we need to lock.onReceiveValue = doRenderWithBodyLocked; lock.task = this; -- cgit v1.2.3