aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api
diff options
context:
space:
mode:
author Ciro Spaciari <ciro.spaciari@gmail.com> 2023-09-04 16:26:49 -0300
committer GitHub <noreply@github.com> 2023-09-04 12:26:49 -0700
commit 2d80f94edafe09329b027424b32908632694553d (patch)
tree e7de3a4fa31096a26d4dda37eedf18cb51a902a1 /src/bun.js/api
parent 18767906db0dd29b2d898f84a023d403c3084d6e (diff)
download bun-2d80f94edafe09329b027424b32908632694553d.tar.gz
bun-2d80f94edafe09329b027424b32908632694553d.tar.zst
bun-2d80f94edafe09329b027424b32908632694553d.zip
fix(HTMLRewriter) buffer response before transform (#4418)
* html rewriter response buffering
* pipe the data when marked as used
* fix empty response
* add some fetch tests
* deinit parent stream
* fix decompression
* keep byte_reader alive
* update builds
* remove nonsense
* was not nonsense after all
* protect tmp ret value from GC, fix readable strong ref deinit/init
* fmt
* if we detach the stream we cannot update the fetch stream
* detach checking source
* more tests, progress with javascript and Direct sink
* drop support for pure readable stream for now
* more fixes

---------

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
Diffstat (limited to 'src/bun.js/api')
-rw-r--r-- src/bun.js/api/html_rewriter.zig 138
-rw-r--r-- src/bun.js/api/server.zig 30
2 files changed, 114 insertions, 54 deletions
diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig
index 86c2c4b53..1bda47512 100644
--- a/src/bun.js/api/html_rewriter.zig
+++ b/src/bun.js/api/html_rewriter.zig
@@ -170,10 +170,6 @@ pub const HTMLRewriter = struct {
}
pub fn transform_(this: *HTMLRewriter, global: *JSGlobalObject, response: *Response) JSValue {
- if (response.body.len() == 0 and !(response.body.value == .Blob and response.body.value.Blob.needsToReadFile())) {
- return this.returnEmptyResponse(global, response);
- }
-
return this.beginTransform(global, response);
}
@@ -344,7 +340,9 @@ pub const HTMLRewriter = struct {
rewriter: *LOLHTML.HTMLRewriter,
context: LOLHTMLContext,
response: *Response,
- input: JSC.WebCore.AnyBlob = undefined,
+ bodyValueBufferer: ?JSC.WebCore.BodyValueBufferer = null,
+ tmp_sync_error: ?JSC.JSValue = null,
+ // const log = bun.Output.scoped(.BufferOutputSink, false);
pub fn init(context: LOLHTMLContext, global: *JSGlobalObject, original: *Response, builder: *LOLHTML.HTMLRewriter.Builder) JSValue {
var result = bun.default_allocator.create(Response) catch unreachable;
var sink = bun.default_allocator.create(BufferOutputSink) catch unreachable;
@@ -409,47 +407,77 @@ pub const HTMLRewriter = struct {
result.url = original.url.clone();
result.status_text = original.status_text.clone();
-
- var input = original.body.value.useAsAnyBlob();
- sink.input = input;
-
- const is_pending = input.needsToReadFile();
- defer if (!is_pending) input.detach();
-
- if (is_pending) {
- sink.input.Blob.doReadFileInternal(*BufferOutputSink, sink, onFinishedLoading, global);
- } else if (sink.runOutputSink(input.slice(), false, false)) |error_value| {
- return error_value;
+ var value = original.getBodyValue();
+ sink.bodyValueBufferer = JSC.WebCore.BodyValueBufferer.init(sink, onFinishedBuffering, sink.global, bun.default_allocator);
+ sink.bodyValueBufferer.?.run(value) catch |buffering_error| {
+ return switch (buffering_error) {
+ error.StreamAlreadyUsed => {
+ var err = JSC.SystemError{
+ .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE))),
+ .message = bun.String.static("Stream already used, please create a new one"),
+ };
+ return err.toErrorInstance(sink.global);
+ },
+ error.InvalidStream => {
+ var err = JSC.SystemError{
+ .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE))),
+ .message = bun.String.static("Invalid stream"),
+ };
+ return err.toErrorInstance(sink.global);
+ },
+ else => {
+ var err = JSC.SystemError{
+ .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE))),
+ .message = bun.String.static("Failed to pipe stream"),
+ };
+ return err.toErrorInstance(sink.global);
+ },
+ };
+ };
+ // sync error occurs
+ if (sink.tmp_sync_error) |err| {
+ err.ensureStillAlive();
+ err.unprotect();
+ sink.tmp_sync_error = null;
+ return err;
}
// Hold off on cloning until we're actually done.
return sink.response.toJS(sink.global);
}
- pub fn onFinishedLoading(sink: *BufferOutputSink, bytes: JSC.WebCore.Blob.Store.ReadFile.ResultType) void {
- switch (bytes) {
- .err => |err| {
- if (sink.response.body.value == .Locked and @intFromPtr(sink.response.body.value.Locked.task) == @intFromPtr(sink) and
- sink.response.body.value.Locked.promise == null)
- {
- sink.response.body.value = .{ .Empty = {} };
- // is there a pending promise?
- // we will need to reject it
- } else if (sink.response.body.value == .Locked and @intFromPtr(sink.response.body.value.Locked.task) == @intFromPtr(sink) and
- sink.response.body.value.Locked.promise != null)
- {
- sink.response.body.value.Locked.onReceiveValue = null;
- sink.response.body.value.Locked.task = null;
- }
+ pub fn onFinishedBuffering(ctx: *anyopaque, bytes: []const u8, js_err: ?JSC.JSValue, is_async: bool) void {
+ const sink = bun.cast(*BufferOutputSink, ctx);
+ if (js_err) |err| {
+ if (sink.response.body.value == .Locked and @intFromPtr(sink.response.body.value.Locked.task) == @intFromPtr(sink) and
+ sink.response.body.value.Locked.promise == null)
+ {
+ sink.response.body.value = .{ .Empty = {} };
+ // is there a pending promise?
+ // we will need to reject it
+ } else if (sink.response.body.value == .Locked and @intFromPtr(sink.response.body.value.Locked.task) == @intFromPtr(sink) and
+ sink.response.body.value.Locked.promise != null)
+ {
+ sink.response.body.value.Locked.onReceiveValue = null;
+ sink.response.body.value.Locked.task = null;
+ }
+ if (is_async) {
+ sink.response.body.value.toErrorInstance(err, sink.global);
+ } else {
+ var ret_err = throwLOLHTMLError(sink.global);
+ ret_err.ensureStillAlive();
+ ret_err.protect();
+ sink.tmp_sync_error = ret_err;
+ }
+ sink.rewriter.end() catch {};
+ sink.deinit();
+ return;
+ }
- sink.response.body.value.toErrorInstance(err.toErrorInstance(sink.global), sink.global);
- sink.rewriter.end() catch {};
- sink.deinit();
- return;
- },
- .result => |data| {
- _ = sink.runOutputSink(data.buf, true, data.is_temporary);
- },
+ if (sink.runOutputSink(bytes, is_async)) |ret_err| {
+ ret_err.ensureStillAlive();
+ ret_err.protect();
+ sink.tmp_sync_error = ret_err;
}
}
@@ -457,11 +485,7 @@ pub const HTMLRewriter = struct {
sink: *BufferOutputSink,
bytes: []const u8,
is_async: bool,
- free_bytes_on_end: bool,
) ?JSValue {
- defer if (free_bytes_on_end)
- bun.default_allocator.free(bun.constStrToU8(bytes));
-
sink.bytes.growBy(bytes.len) catch unreachable;
var global = sink.global;
var response = sink.response;
@@ -499,13 +523,19 @@ pub const HTMLRewriter = struct {
pub fn done(this: *BufferOutputSink) void {
var prev_value = this.response.body.value;
- var bytes = this.bytes.toOwnedSliceLeaky();
- this.response.body.value = JSC.WebCore.Body.Value.createBlobValue(
- bytes,
- bun.default_allocator,
+ this.response.body.value = JSC.WebCore.Body.Value{
+ .InternalBlob = .{
+ .bytes = this.bytes.list.toManaged(bun.default_allocator),
+ },
+ };
- true,
- );
+ this.bytes = .{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
prev_value.resolve(
&this.response.body.value,
this.global,
@@ -518,6 +548,16 @@ pub const HTMLRewriter = struct {
pub fn deinit(this: *BufferOutputSink) void {
this.bytes.deinit();
+ if (this.bodyValueBufferer != null) {
+ var bufferer = this.bodyValueBufferer.?;
+ bufferer.deinit();
+ }
+
+ if (this.tmp_sync_error) |ret_err| {
+ // this should never happens, but still we avoid future leaks
+ ret_err.unprotect();
+ this.tmp_sync_error = null;
+ }
this.context.deinit(bun.default_allocator);
}
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 5bbf159d2..85d4dadb5 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1170,6 +1170,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
sink: ?*ResponseStream.JSSink = null,
byte_stream: ?*JSC.WebCore.ByteStream = null,
+ // reference to the readable stream / byte_stream alive
+ readable_stream_ref: JSC.WebCore.ReadableStream.Strong = .{},
/// Used in errors
pathname: bun.String = bun.String.empty,
@@ -1666,6 +1668,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
stream.unpipe();
}
+ this.readable_stream_ref.deinit();
+
if (!this.pathname.isEmpty()) {
this.pathname.deref();
this.pathname = bun.String.empty;
@@ -2594,24 +2598,31 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
std.debug.assert(byte_stream.pipe.ctx == null);
std.debug.assert(this.byte_stream == null);
- stream.detach(this.server.globalThis);
-
if (this.resp == null) {
- byte_stream.parent().deinit();
+ // we don't have a response, so we can discard the stream
+ stream.detachIfPossible(this.server.globalThis);
return;
}
const resp = this.resp.?;
-
// If we've received the complete body by the time this function is called
// we can avoid streaming it and just send it all at once.
if (byte_stream.has_received_last_chunk) {
this.blob.from(byte_stream.buffer);
- byte_stream.parent().deinit();
this.doRenderBlob();
+ // is safe to detach here because we're not going to receive any more data
+ stream.detachIfPossible(this.server.globalThis);
return;
}
byte_stream.pipe = JSC.WebCore.Pipe.New(@This(), onPipe).init(this);
+ this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(stream, this.server.globalThis) catch {
+ // Invalid Stream
+ this.renderMissing();
+ return;
+ };
+ // we now hold a reference so we can safely ask to detach and will be detached when the last ref is dropped
+ stream.detachIfPossible(this.server.globalThis);
+
this.byte_stream = byte_stream;
this.response_buf_owned = byte_stream.buffer.moveToUnmanaged();
@@ -2632,6 +2643,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
+ if (lock.onReceiveValue != null or lock.task != null) {
+ // someone else is waiting for the stream or waiting for `onStartStreaming`
+ const readable = value.toReadableStream(this.server.globalThis);
+ readable.ensureStillAlive();
+ readable.protect();
+ this.doRenderWithBody(value);
+ return;
+ }
+
// when there's no stream, we need to
lock.onReceiveValue = doRenderWithBodyLocked;
lock.task = this;