Diffstat (limited to 'src/bun.js/webcore/response.zig')
 src/bun.js/webcore/response.zig | 260 ++++++++++++++++++++++++++++++++++----
 1 file changed, 241 insertions(+), 19 deletions(-)
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 4b15efc0e..01ecfad36 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -44,6 +44,7 @@ const JSPrinter = bun.js_printer;
const picohttp = @import("root").bun.picohttp;
const StringJoiner = @import("../../string_joiner.zig");
const uws = @import("root").bun.uws;
+const Mutex = @import("../../lock.zig").Lock;
const InlineBlob = JSC.WebCore.InlineBlob;
const AnyBlob = JSC.WebCore.AnyBlob;
@@ -123,6 +124,7 @@ pub const Response = struct {
pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void {
const Writer = @TypeOf(writer);
try writer.print("Response ({}) {{\n", .{bun.fmt.size(this.body.len())});
+
{
formatter.indent += 1;
defer formatter.indent -|= 1;
@@ -618,11 +620,21 @@ pub const Fetch = struct {
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
+ /// buffer being used by AsyncHTTP
response_buffer: MutableString = undefined,
+ /// buffer used to stream response to JS
+ scheduled_response_buffer: MutableString = undefined,
+ /// response strong ref
+ response: JSC.Strong = .{},
request_headers: Headers = Headers{ .allocator = undefined },
promise: JSC.JSPromise.Strong,
concurrent_task: JSC.ConcurrentTask = .{},
poll_ref: JSC.PollRef = .{},
+    /// For HTTP client requests:
+    /// when Content-Length is provided, this represents the whole size of the request;
+    /// if chunked encoding is used, this represents the total received size (ignoring the chunk headers);
+    /// if it is not chunked encoded and Content-Length is not provided, this will be unknown
+ body_size: HTTPClient.HTTPClientResult.BodySize = .unknown,
/// This is url + proxy memory buffer and is owned by FetchTasklet
/// We always clone url and proxy (if informed)
@@ -630,11 +642,14 @@ pub const Fetch = struct {
signal: ?*JSC.WebCore.AbortSignal = null,
aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
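+    /// set while an onProgressUpdate task is already queued, so only one is scheduled at a time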
+ has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
// must be stored because AbortSignal stores reason weakly
abort_reason: JSValue = JSValue.zero,
// Custom Hostname
hostname: ?[]u8 = null,
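+    /// set when the response headers have arrived but the body is still being received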
+ is_waiting_body: bool = false,
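+    /// protects the response buffers shared between the HTTP client callback and the JS thread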
+ mutex: Mutex,
tracker: JSC.AsyncTaskTracker,
@@ -691,6 +706,8 @@ pub const Fetch = struct {
this.result.deinitMetadata();
this.response_buffer.deinit();
+ this.response.deinit();
+ this.scheduled_response_buffer.deinit();
this.request_body.detach();
if (this.abort_reason != .zero)
@@ -707,9 +724,122 @@ pub const Fetch = struct {
this.javascript_vm.allocator.destroy(this);
}
- pub fn onDone(this: *FetchTasklet) void {
+ pub fn onBodyReceived(this: *FetchTasklet) void {
+ const success = this.result.isSuccess();
+ const globalThis = this.global_this;
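+            // once the request failed or the last chunk has arrived, release the poll ref and free the tasklet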
+ defer {
+ if (!success or !this.result.has_more) {
+ var vm = globalThis.bunVM();
+ this.poll_ref.unref(vm);
+ this.clearData();
+ this.deinit();
+ }
+ }
+
+ if (!success) {
+ const err = this.onReject();
+ err.ensureStillAlive();
+ if (this.response.get()) |response_js| {
+ if (response_js.as(Response)) |response| {
+ const body = response.body;
+ if (body.value == .Locked) {
+ if (body.value.Locked.readable) |readable| {
+ readable.ptr.Bytes.onData(
+ .{
+ .err = .{ .JSValue = err },
+ },
+ bun.default_allocator,
+ );
+ return;
+ }
+ }
+
+ response.body.value.toErrorInstance(err, globalThis);
+ return;
+ }
+ }
+
+ globalThis.throwValue(err);
+ return;
+ }
+
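+            // success: hand the buffered bytes to the Response, either streaming them or resolving the body when complete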
+ if (this.response.get()) |response_js| {
+ if (response_js.as(Response)) |response| {
+ const body = response.body;
+ if (body.value == .Locked) {
+ if (body.value.Locked.readable) |readable| {
+ if (readable.ptr == .Bytes) {
+ readable.ptr.Bytes.size_hint = this.getSizeHint();
+
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+
+ const chunk = scheduled_response_buffer.items;
+
+ if (this.result.has_more) {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+
+                                    // clear the buffer so it can be reused for the next chunk
+ this.scheduled_response_buffer.reset();
+ } else {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary_and_done = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+ }
+
+ return;
+ }
+ } else {
+ response.body.value.Locked.size_hint = this.getSizeHint();
+ }
+ // we will reach here when not streaming
+ if (!this.result.has_more) {
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+
+                        // done; resolve the body
+ var old = body.value;
+ var body_value = Body.Value{
+ .InternalBlob = .{
+ .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
+ },
+ };
+ response.body.value = body_value;
+
+ this.scheduled_response_buffer = .{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+
+ if (old == .Locked) {
+ old.resolve(&response.body.value, this.global_this);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ pub fn onProgressUpdate(this: *FetchTasklet) void {
JSC.markBinding(@src());
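+            // serialize with callback() on the HTTP thread; clearing the flag lets the next result schedule another update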
+ this.mutex.lock();
+ defer {
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
+ }
+ if (this.is_waiting_body) {
+ return this.onBodyReceived();
+ }
const globalThis = this.global_this;
var ref = this.promise;
@@ -718,13 +848,22 @@ pub const Fetch = struct {
var poll_ref = this.poll_ref;
var vm = globalThis.bunVM();
- defer poll_ref.unref(vm);
if (promise_value.isEmptyOrUndefinedOrNull()) {
+ poll_ref.unref(vm);
this.clearData();
+ this.deinit();
return;
}
+ defer {
+ if (!this.is_waiting_body) {
+ poll_ref.unref(vm);
+ this.clearData();
+ this.deinit();
+ }
+ }
+
const promise = promise_value.asAnyPromise().?;
const tracker = this.tracker;
tracker.willDispatch(globalThis);
@@ -735,7 +874,6 @@ pub const Fetch = struct {
false => this.onReject(),
};
result.ensureStillAlive();
- this.clearData();
promise_value.ensureStillAlive();
@@ -784,25 +922,77 @@ pub const Fetch = struct {
return fetch_error.toErrorInstance(this.global_this);
}
- fn toBodyValue(this: *FetchTasklet) Body.Value {
- var response_buffer = this.response_buffer.list;
- this.response_buffer = .{
- .allocator = default_allocator,
- .list = .{
- .items = &.{},
- .capacity = 0,
- },
+ pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult {
+ const this = bun.cast(*FetchTasklet, ctx);
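+            // JS started consuming the body stream, so ask the HTTP client to deliver chunks as they arrive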
+ if (this.http) |http| {
+ http.enableBodyStreaming();
+ }
+ if (this.aborted.load(.Acquire)) {
+ return JSC.WebCore.DrainResult{
+ .aborted = {},
+ };
+ }
+
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ const size_hint = this.getSizeHint();
+
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+ // This means we have received part of the body but not the whole thing
+ if (scheduled_response_buffer.items.len > 0) {
+ this.scheduled_response_buffer = .{
+ .allocator = default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+
+ return .{
+ .owned = .{
+ .list = scheduled_response_buffer.toManaged(bun.default_allocator),
+ .size_hint = size_hint,
+ },
+ };
+ }
+
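+            // nothing buffered yet; just report the expected size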
+ return .{
+ .estimated_size = size_hint,
};
+ }
+
+ fn getSizeHint(this: *FetchTasklet) Blob.SizeType {
+ return switch (this.body_size) {
+ .content_length => @truncate(this.body_size.content_length),
+ .total_received => @truncate(this.body_size.total_received),
+ else => 0,
+ };
+ }
- // if (response_buffer.items.len < InlineBlob.available_bytes) {
- // const inline_blob = InlineBlob.init(response_buffer.items);
- // defer response_buffer.deinit(bun.default_allocator);
- // return .{ .InlineBlob = inline_blob };
- // }
+ fn toBodyValue(this: *FetchTasklet) Body.Value {
+ if (this.is_waiting_body) {
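+                // headers arrived but the body is still incoming: return a Locked body to be resolved or streamed later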
+ const response = Body.Value{
+ .Locked = .{
+ .size_hint = this.getSizeHint(),
+ .task = this,
+ .global = this.global_this,
+ .onStartStreaming = FetchTasklet.onStartStreamingRequestBodyCallback,
+ },
+ };
+ return response;
+ }
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
const response = Body.Value{
.InternalBlob = .{
- .bytes = response_buffer.toManaged(bun.default_allocator),
+ .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
+ },
+ };
+ this.scheduled_response_buffer = .{
+ .allocator = default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
},
};
@@ -811,6 +1001,7 @@ pub const Fetch = struct {
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
const http_response = this.result.response;
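+        // has_more means only the headers have arrived so far; the body will be delivered in later callbacks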
+ this.is_waiting_body = this.result.has_more;
return Response{
.allocator = allocator,
.url = bun.String.createAtomIfPossible(this.result.href),
@@ -830,7 +1021,10 @@ pub const Fetch = struct {
const allocator = bun.default_allocator;
var response = allocator.create(Response) catch unreachable;
response.* = this.toResponse(allocator);
- return Response.makeMaybePooled(@as(js.JSContextRef, @ptrCast(this.global_this)), response);
+ const response_js = Response.makeMaybePooled(@as(js.JSContextRef, this.global_this), response);
+ response_js.ensureStillAlive();
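+        // keep a strong reference so later body chunks can still reach this Response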
+ this.response = JSC.Strong.create(response_js, this.global_this);
+ return response_js;
}
pub fn get(
@@ -843,6 +1037,14 @@ pub const Fetch = struct {
var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet);
fetch_tasklet.* = .{
+ .mutex = Mutex.init(),
+ .scheduled_response_buffer = .{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ },
.response_buffer = MutableString{
.allocator = bun.default_allocator,
.list = .{
@@ -905,6 +1107,8 @@ pub const Fetch = struct {
fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout;
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
+            // we want to return as soon as the headers are received
+ fetch_tasklet.http.?.signalHeaderProgress();
if (fetch_tasklet.request_body == .Sendfile) {
std.debug.assert(fetch_options.url.isHTTP());
@@ -973,8 +1177,26 @@ pub const Fetch = struct {
}
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
- task.response_buffer = result.body.?.*;
+ task.mutex.lock();
+ defer task.mutex.unlock();
task.result = result;
+ task.body_size = result.body_size;
+
+ const success = result.isSuccess();
+ task.response_buffer = result.body.?.*;
+
+ if (success) {
+ _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM");
+ }
+
+ // reset for reuse
+ task.response_buffer.reset();
+
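+            // only enqueue one progress-update task at a time; onProgressUpdate resets the flag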
+ if (task.has_schedule_callback.compareAndSwap(false, true, .Acquire, .Monotonic)) |has_schedule_callback| {
+ if (has_schedule_callback) {
+ return;
+ }
+ }
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
}
};