aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar cirospaciari <ciro.spaciari@gmail.com> 2023-09-07 15:53:01 -0300
committerGravatar cirospaciari <ciro.spaciari@gmail.com> 2023-09-07 15:53:01 -0300
commitab0cb34cf0fa0ac8d9c89af6994f7a79cd86427e (patch)
treeda72aae73889e73f97c6a548d433bf35d17e8c58
parent78b31e588a055e79dae3e4f8bcddfac6aec4533a (diff)
downloadbun-ab0cb34cf0fa0ac8d9c89af6994f7a79cd86427e.tar.gz
bun-ab0cb34cf0fa0ac8d9c89af6994f7a79cd86427e.tar.zst
bun-ab0cb34cf0fa0ac8d9c89af6994f7a79cd86427e.zip
fix partial body
-rw-r--r--src/bun.js/webcore/response.zig189
1 files changed, 114 insertions, 75 deletions
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 192ddf677..4d73e6dca 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -619,11 +619,11 @@ pub const Fetch = struct {
}
const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue;
-
+
pub const HTTPClientQueueResult = struct {
body: MutableString,
has_more: bool,
- redirected: bool,
+ redirected: bool,
fail: anyerror,
certificate_info: ?HTTPClient.CertificateInfo,
/// For Http Client requests
@@ -637,7 +637,7 @@ pub const Fetch = struct {
pub fn isSuccess(this: *HTTPClientQueueResult) bool {
return this.fail == error.NoError;
}
-
+
pub fn isTimeout(this: *HTTPClientQueueResult) bool {
return this.fail == error.Timeout;
}
@@ -645,7 +645,7 @@ pub const Fetch = struct {
pub fn isAbort(this: *HTTPClientQueueResult) bool {
return this.fail == error.Aborted;
}
-
+
fn getSizeHint(this: *HTTPClientQueueResult) Blob.SizeType {
return switch (this.body_size) {
.content_length => @truncate(this.body_size.content_length),
@@ -669,7 +669,7 @@ pub const Fetch = struct {
const log = Output.scoped(.FetchTasklet, false);
http: ?*HTTPClient.AsyncHTTP = null,
-
+
result_queue: HTTPClientResultQueue = HTTPClientResultQueue{},
metadata: ?HTTPClient.HTTPResponseMetadata = null,
@@ -678,6 +678,8 @@ pub const Fetch = struct {
request_body: HTTPRequestBody = undefined,
/// buffer being used by AsyncHTTP
response_buffer: MutableString = undefined,
+ // all shedule buffers are stored here when not streaming
+ scheduled_response_buffer: MutableString = undefined,
/// response strong ref
response: JSC.Strong = .{},
/// stream strong ref if any is available
@@ -744,6 +746,59 @@ pub const Fetch = struct {
return FetchTasklet{};
}
+ fn getFullResponseBodyValue(this: *FetchTasklet, result: *HTTPClientQueueResult) Body.Value {
+ var body_value: Body.Value = undefined;
+ if (this.scheduled_response_buffer.list.items.len > 0) {
+ _ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
+ this.memory_reporter.discard(this.scheduled_response_buffer.list.allocatedSlice());
+ body_value = Body.Value{
+ .InternalBlob = .{
+ .bytes = this.scheduled_response_buffer.list.toManaged(bun.default_allocator),
+ },
+ };
+ this.scheduled_response_buffer = .{
+ .allocator = this.memory_reporter.allocator(),
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+ } else {
+ body_value = Body.Value{
+ .InternalBlob = .{
+ .bytes = result.body.list.toManaged(bun.default_allocator),
+ },
+ };
+ }
+
+ result.body = MutableString{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+ return body_value;
+ }
+
+ fn writeChunkToReadable(readable: JSC.WebCore.ReadableStream, chunk: []const u8, comptime has_more: bool) void {
+ if (has_more) {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+ } else {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary_and_done = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+ }
+ }
+
fn clearData(this: *FetchTasklet) void {
log("clearData", .{});
const allocator = this.memory_reporter.allocator();
@@ -771,6 +826,7 @@ pub const Fetch = struct {
}
this.response_buffer.deinit();
+ this.scheduled_response_buffer.deinit();
this.response.deinit();
this.readable_stream_ref.deinit();
@@ -841,29 +897,17 @@ pub const Fetch = struct {
if (this.readable_stream_ref.get()) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = result.getSizeHint();
-
-
- // body can be marked as used but we still need to pipe the data
- var scheduled_response_buffer = result.body.list;
-
- const chunk = scheduled_response_buffer.items;
+ if (this.scheduled_response_buffer.list.items.len > 0) {
+ writeChunkToReadable(readable, this.scheduled_response_buffer.list.items, true);
+ this.scheduled_response_buffer.reset();
+ }
if (result.has_more) {
- readable.ptr.Bytes.onData(
- .{
- .temporary = bun.ByteList.initConst(chunk),
- },
- bun.default_allocator,
- );
-
+ writeChunkToReadable(readable, result.body.list.items, true);
} else {
- readable.ptr.Bytes.onData(
- .{
- .temporary_and_done = bun.ByteList.initConst(chunk),
- },
- bun.default_allocator,
- );
+ writeChunkToReadable(readable, result.body.list.items, false);
}
+
return;
}
}
@@ -875,26 +919,15 @@ pub const Fetch = struct {
if (body.value.Locked.readable) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = result.getSizeHint();
-
- var scheduled_response_buffer = result.body.list;
-
- const chunk = scheduled_response_buffer.items;
+ if (this.scheduled_response_buffer.list.items.len > 0) {
+ writeChunkToReadable(readable, this.scheduled_response_buffer.list.items, true);
+ this.scheduled_response_buffer.reset();
+ }
if (result.has_more) {
- readable.ptr.Bytes.onData(
- .{
- .temporary = bun.ByteList.initConst(chunk),
- },
- bun.default_allocator,
- );
-
+ writeChunkToReadable(readable, result.body.list.items, true);
} else {
- readable.ptr.Bytes.onData(
- .{
- .temporary_and_done = bun.ByteList.initConst(chunk),
- },
- bun.default_allocator,
- );
+ writeChunkToReadable(readable, result.body.list.items, false);
}
return;
@@ -902,25 +935,17 @@ pub const Fetch = struct {
} else {
response.body.value.Locked.size_hint = result.getSizeHint();
}
+
// we will reach here when not streaming
- if (!result.has_more) {
- var scheduled_response_buffer = result.body.list;
+ if (result.has_more) {
+ // buffer more data
+ _ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
+ } else {
// done resolve body
var old = body.value;
- var body_value = Body.Value{
- .InternalBlob = .{
- .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
- },
- };
- result.body = MutableString{
- .allocator = bun.default_allocator,
- .list = .{
- .items = &.{},
- .capacity = 0,
- },
- };
- response.body.value = body_value;
+
+ response.body.value = this.getFullResponseBodyValue(result);
if (old == .Locked) {
old.resolve(&response.body.value, this.global_this);
@@ -1028,7 +1053,6 @@ pub const Fetch = struct {
false => this.onReject(result),
};
-
promise_value.ensureStillAlive();
const promise = promise_value.asAnyPromise() orelse return;
@@ -1148,6 +1172,26 @@ pub const Fetch = struct {
};
}
+ var first_packed_buffer = this.scheduled_response_buffer.list;
+ // This means we have received part of the body but not the whole thing
+ if (first_packed_buffer.items.len > 0) {
+ this.memory_reporter.discard(first_packed_buffer.allocatedSlice());
+ this.scheduled_response_buffer = .{
+ .allocator = this.memory_reporter.allocator(),
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+
+ return .{
+ .owned = .{
+ .list = first_packed_buffer.toManaged(bun.default_allocator),
+ .size_hint = this.size_hint,
+ },
+ };
+ }
+
return .{
.estimated_size = this.size_hint,
};
@@ -1164,23 +1208,11 @@ pub const Fetch = struct {
.onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable,
},
};
+ _ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
return response;
}
- var scheduled_response_buffer = result.body.list;
- const response = Body.Value{
- .InternalBlob = .{
- .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
- },
- };
- result.body = MutableString{
- .allocator = bun.default_allocator,
- .list = .{
- .items = &.{},
- .capacity = 0,
- },
- };
- return response;
+ return this.getFullResponseBodyValue(result);
}
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator, result: *HTTPClientQueueResult) Response {
@@ -1227,6 +1259,13 @@ pub const Fetch = struct {
fetch_tasklet.* = .{
.size_hint = 0,
+ .scheduled_response_buffer = MutableString{
+ .allocator = fetch_options.memory_reporter.allocator(),
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ },
.response_buffer = MutableString{
.allocator = fetch_options.memory_reporter.allocator(),
.list = .{
@@ -1382,7 +1421,7 @@ pub const Fetch = struct {
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
log("callback success {} has_more {} bytes {}", .{ result.isSuccess(), result.has_more, result.body.?.list.items.len });
-
+
// metadata should be provided only once so we preserve it until we consume it
if (result.metadata) |metadata| {
log("added callback metadata", .{});
@@ -1392,10 +1431,10 @@ pub const Fetch = struct {
task.response_buffer = result.body.?.*;
var item = bun.default_allocator.create(HTTPClientQueueResult) catch @panic("OOM");
- item.* = .{
+ item.* = .{
.body_size = result.body_size,
- .has_more = result.has_more,
- .redirected = result.redirected,
+ .has_more = result.has_more,
+ .redirected = result.redirected,
.fail = result.fail,
.certificate_info = result.certificate_info,
.body = MutableString.initCopy(bun.default_allocator, result.body.?.*.list.items) catch @panic("OOM"),