diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/webcore/response.zig | 189 |
1 files changed, 114 insertions, 75 deletions
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 192ddf677..4d73e6dca 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -619,11 +619,11 @@ pub const Fetch = struct { } const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue; - + pub const HTTPClientQueueResult = struct { body: MutableString, has_more: bool, - redirected: bool, + redirected: bool, fail: anyerror, certificate_info: ?HTTPClient.CertificateInfo, /// For Http Client requests @@ -637,7 +637,7 @@ pub const Fetch = struct { pub fn isSuccess(this: *HTTPClientQueueResult) bool { return this.fail == error.NoError; } - + pub fn isTimeout(this: *HTTPClientQueueResult) bool { return this.fail == error.Timeout; } @@ -645,7 +645,7 @@ pub const Fetch = struct { pub fn isAbort(this: *HTTPClientQueueResult) bool { return this.fail == error.Aborted; } - + fn getSizeHint(this: *HTTPClientQueueResult) Blob.SizeType { return switch (this.body_size) { .content_length => @truncate(this.body_size.content_length), @@ -669,7 +669,7 @@ pub const Fetch = struct { const log = Output.scoped(.FetchTasklet, false); http: ?*HTTPClient.AsyncHTTP = null, - + result_queue: HTTPClientResultQueue = HTTPClientResultQueue{}, metadata: ?HTTPClient.HTTPResponseMetadata = null, @@ -678,6 +678,8 @@ pub const Fetch = struct { request_body: HTTPRequestBody = undefined, /// buffer being used by AsyncHTTP response_buffer: MutableString = undefined, + // all shedule buffers are stored here when not streaming + scheduled_response_buffer: MutableString = undefined, /// response strong ref response: JSC.Strong = .{}, /// stream strong ref if any is available @@ -744,6 +746,59 @@ pub const Fetch = struct { return FetchTasklet{}; } + fn getFullResponseBodyValue(this: *FetchTasklet, result: *HTTPClientQueueResult) Body.Value { + var body_value: Body.Value = undefined; + if (this.scheduled_response_buffer.list.items.len > 0) { + _ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM"); + this.memory_reporter.discard(this.scheduled_response_buffer.list.allocatedSlice()); + body_value = Body.Value{ + .InternalBlob = .{ + .bytes = this.scheduled_response_buffer.list.toManaged(bun.default_allocator), + }, + }; + this.scheduled_response_buffer = .{ + .allocator = this.memory_reporter.allocator(), + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; + } else { + body_value = Body.Value{ + .InternalBlob = .{ + .bytes = result.body.list.toManaged(bun.default_allocator), + }, + }; + } + + result.body = MutableString{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; + return body_value; + } + + fn writeChunkToReadable(readable: JSC.WebCore.ReadableStream, chunk: []const u8, comptime has_more: bool) void { + if (has_more) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } else { + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } + } + fn clearData(this: *FetchTasklet) void { log("clearData", .{}); const allocator = this.memory_reporter.allocator(); @@ -771,6 +826,7 @@ pub const Fetch = struct { } this.response_buffer.deinit(); + this.scheduled_response_buffer.deinit(); this.response.deinit(); this.readable_stream_ref.deinit(); @@ -841,29 +897,17 @@ pub const Fetch = struct { if (this.readable_stream_ref.get()) |readable| { if (readable.ptr == .Bytes) { readable.ptr.Bytes.size_hint = result.getSizeHint(); - - - // body can be marked as used but we still need to pipe the data - var scheduled_response_buffer = result.body.list; - - const chunk = scheduled_response_buffer.items; + if (this.scheduled_response_buffer.list.items.len > 0) { + writeChunkToReadable(readable, this.scheduled_response_buffer.list.items, true); + this.scheduled_response_buffer.reset(); + } if (result.has_more) { - readable.ptr.Bytes.onData( - .{ - .temporary = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); - + writeChunkToReadable(readable, result.body.list.items, true); } else { - readable.ptr.Bytes.onData( - .{ - .temporary_and_done = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); + writeChunkToReadable(readable, result.body.list.items, false); } + return; } } @@ -875,26 +919,15 @@ pub const Fetch = struct { if (body.value.Locked.readable) |readable| { if (readable.ptr == .Bytes) { readable.ptr.Bytes.size_hint = result.getSizeHint(); - - var scheduled_response_buffer = result.body.list; - - const chunk = scheduled_response_buffer.items; + if (this.scheduled_response_buffer.list.items.len > 0) { + writeChunkToReadable(readable, this.scheduled_response_buffer.list.items, true); + this.scheduled_response_buffer.reset(); + } if (result.has_more) { - readable.ptr.Bytes.onData( - .{ - .temporary = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); - + writeChunkToReadable(readable, result.body.list.items, true); } else { - readable.ptr.Bytes.onData( - .{ - .temporary_and_done = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); + writeChunkToReadable(readable, result.body.list.items, false); } return; @@ -902,25 +935,17 @@ pub const Fetch = struct { } else { response.body.value.Locked.size_hint = result.getSizeHint(); } + // we will reach here when not streaming - if (!result.has_more) { - var scheduled_response_buffer = result.body.list; + if (result.has_more) { + // buffer more data + _ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM"); + } else { // done resolve body var old = body.value; - var body_value = Body.Value{ - .InternalBlob = .{ - .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), - }, - }; - result.body = MutableString{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }; - response.body.value = body_value; + + response.body.value = this.getFullResponseBodyValue(result); if (old == .Locked) { old.resolve(&response.body.value, this.global_this); @@ -1028,7 +1053,6 @@ pub const Fetch = struct { false => this.onReject(result), }; - promise_value.ensureStillAlive(); const promise = promise_value.asAnyPromise() orelse return; @@ -1148,6 +1172,26 @@ pub const Fetch = struct { }; } + var first_packed_buffer = this.scheduled_response_buffer.list; + // This means we have received part of the body but not the whole thing + if (first_packed_buffer.items.len > 0) { + this.memory_reporter.discard(first_packed_buffer.allocatedSlice()); + this.scheduled_response_buffer = .{ + .allocator = this.memory_reporter.allocator(), + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; + + return .{ + .owned = .{ + .list = first_packed_buffer.toManaged(bun.default_allocator), + .size_hint = this.size_hint, + }, + }; + } + return .{ .estimated_size = this.size_hint, }; @@ -1164,23 +1208,11 @@ pub const Fetch = struct { .onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable, }, }; + _ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM"); return response; } - var scheduled_response_buffer = result.body.list; - const response = Body.Value{ - .InternalBlob = .{ - .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), - }, - }; - result.body = MutableString{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }; - return response; + return this.getFullResponseBodyValue(result); } fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator, result: *HTTPClientQueueResult) Response { @@ -1227,6 +1259,13 @@ pub const Fetch = struct { fetch_tasklet.* = .{ .size_hint = 0, + .scheduled_response_buffer = MutableString{ + .allocator = fetch_options.memory_reporter.allocator(), + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }, .response_buffer = MutableString{ .allocator = fetch_options.memory_reporter.allocator(), .list = .{ @@ -1382,7 +1421,7 @@ pub const Fetch = struct { pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void { log("callback success {} has_more {} bytes {}", .{ result.isSuccess(), result.has_more, result.body.?.list.items.len }); - + // metadata should be provided only once so we preserve it until we consume it if (result.metadata) |metadata| { log("added callback metadata", .{}); @@ -1392,10 +1431,10 @@ pub const Fetch = struct { task.response_buffer = result.body.?.*; var item = bun.default_allocator.create(HTTPClientQueueResult) catch @panic("OOM"); - item.* = .{ + item.* = .{ .body_size = result.body_size, - .has_more = result.has_more, - .redirected = result.redirected, + .has_more = result.has_more, + .redirected = result.redirected, .fail = result.fail, .certificate_info = result.certificate_info, .body = MutableString.initCopy(bun.default_allocator, result.body.?.*.list.items) catch @panic("OOM"), |