diff options
Diffstat (limited to 'src/bun.js/webcore/response.zig')
-rw-r--r-- | src/bun.js/webcore/response.zig | 260 |
1 files changed, 241 insertions, 19 deletions
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 4b15efc0e..01ecfad36 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -44,6 +44,7 @@ const JSPrinter = bun.js_printer; const picohttp = @import("root").bun.picohttp; const StringJoiner = @import("../../string_joiner.zig"); const uws = @import("root").bun.uws; +const Mutex = @import("../../lock.zig").Lock; const InlineBlob = JSC.WebCore.InlineBlob; const AnyBlob = JSC.WebCore.AnyBlob; @@ -123,6 +124,7 @@ pub const Response = struct { pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void { const Writer = @TypeOf(writer); try writer.print("Response ({}) {{\n", .{bun.fmt.size(this.body.len())}); + { formatter.indent += 1; defer formatter.indent -|= 1; @@ -618,11 +620,21 @@ pub const Fetch = struct { javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, request_body: HTTPRequestBody = undefined, + /// buffer being used by AsyncHTTP response_buffer: MutableString = undefined, + /// buffer used to stream response to JS + scheduled_response_buffer: MutableString = undefined, + /// response strong ref + response: JSC.Strong = .{}, request_headers: Headers = Headers{ .allocator = undefined }, promise: JSC.JSPromise.Strong, concurrent_task: JSC.ConcurrentTask = .{}, poll_ref: JSC.PollRef = .{}, + /// For Http Client requests + /// when Content-Length is provided this represents the whole size of the request + /// If chunked encoded this will represent the total received size (ignoring the chunk headers) + /// If is not chunked encoded and Content-Length is not provided this will be unknown + body_size: HTTPClient.HTTPClientResult.BodySize = .unknown, /// This is url + proxy memory buffer and is owned by FetchTasklet /// We always clone url and proxy (if informed) @@ -630,11 +642,14 @@ pub const Fetch = struct { signal: ?*JSC.WebCore.AbortSignal = null, aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), // must be stored because AbortSignal stores reason weakly abort_reason: JSValue = JSValue.zero, // Custom Hostname hostname: ?[]u8 = null, + is_waiting_body: bool = false, + mutex: Mutex, tracker: JSC.AsyncTaskTracker, @@ -691,6 +706,8 @@ pub const Fetch = struct { this.result.deinitMetadata(); this.response_buffer.deinit(); + this.response.deinit(); + this.scheduled_response_buffer.deinit(); this.request_body.detach(); if (this.abort_reason != .zero) @@ -707,9 +724,122 @@ pub const Fetch = struct { this.javascript_vm.allocator.destroy(this); } - pub fn onDone(this: *FetchTasklet) void { + pub fn onBodyReceived(this: *FetchTasklet) void { + const success = this.result.isSuccess(); + const globalThis = this.global_this; + defer { + if (!success or !this.result.has_more) { + var vm = globalThis.bunVM(); + this.poll_ref.unref(vm); + this.clearData(); + this.deinit(); + } + } + + if (!success) { + const err = this.onReject(); + err.ensureStillAlive(); + if (this.response.get()) |response_js| { + if (response_js.as(Response)) |response| { + const body = response.body; + if (body.value == .Locked) { + if (body.value.Locked.readable) |readable| { + readable.ptr.Bytes.onData( + .{ + .err = .{ .JSValue = err }, + }, + bun.default_allocator, + ); + return; + } + } + + response.body.value.toErrorInstance(err, globalThis); + return; + } + } + + globalThis.throwValue(err); + return; + } + + if (this.response.get()) |response_js| { + if (response_js.as(Response)) |response| { + const body = response.body; + if (body.value == .Locked) { + if (body.value.Locked.readable) |readable| { + if (readable.ptr == .Bytes) { + readable.ptr.Bytes.size_hint = this.getSizeHint(); + + var scheduled_response_buffer = this.scheduled_response_buffer.list; + + const chunk = scheduled_response_buffer.items; + + if (this.result.has_more) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + + // clean for reuse later + this.scheduled_response_buffer.reset(); + } else { + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } + + return; + } + } else { + response.body.value.Locked.size_hint = this.getSizeHint(); + } + // we will reach here when not streaming + if (!this.result.has_more) { + var scheduled_response_buffer = this.scheduled_response_buffer.list; + + // done resolve body + var old = body.value; + var body_value = Body.Value{ + .InternalBlob = .{ + .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), + }, + }; + response.body.value = body_value; + + this.scheduled_response_buffer = .{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; + + if (old == .Locked) { + old.resolve(&response.body.value, this.global_this); + } + } + } + } + } + } + + pub fn onProgressUpdate(this: *FetchTasklet) void { JSC.markBinding(@src()); + this.mutex.lock(); + defer { + this.has_schedule_callback.store(false, .Monotonic); + this.mutex.unlock(); + } + if (this.is_waiting_body) { + return this.onBodyReceived(); + } const globalThis = this.global_this; var ref = this.promise; @@ -718,13 +848,22 @@ pub const Fetch = struct { var poll_ref = this.poll_ref; var vm = globalThis.bunVM(); - defer poll_ref.unref(vm); if (promise_value.isEmptyOrUndefinedOrNull()) { + poll_ref.unref(vm); this.clearData(); + this.deinit(); return; } + defer { + if (!this.is_waiting_body) { + poll_ref.unref(vm); + this.clearData(); + this.deinit(); + } + } + const promise = promise_value.asAnyPromise().?; const tracker = this.tracker; tracker.willDispatch(globalThis); @@ -735,7 +874,6 @@ pub const Fetch = struct { false => this.onReject(), }; result.ensureStillAlive(); - this.clearData(); promise_value.ensureStillAlive(); @@ -784,25 +922,77 @@ pub const Fetch = struct { return fetch_error.toErrorInstance(this.global_this); } - fn toBodyValue(this: *FetchTasklet) Body.Value { - var response_buffer = this.response_buffer.list; - this.response_buffer = .{ - .allocator = default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, + pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult { + const this = bun.cast(*FetchTasklet, ctx); + if (this.http) |http| { + http.enableBodyStreaming(); + } + if (this.aborted.load(.Acquire)) { + return JSC.WebCore.DrainResult{ + .aborted = {}, + }; + } + + this.mutex.lock(); + defer this.mutex.unlock(); + const size_hint = this.getSizeHint(); + + var scheduled_response_buffer = this.scheduled_response_buffer.list; + // This means we have received part of the body but not the whole thing + if (scheduled_response_buffer.items.len > 0) { + this.scheduled_response_buffer = .{ + .allocator = default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; + + return .{ + .owned = .{ + .list = scheduled_response_buffer.toManaged(bun.default_allocator), + .size_hint = size_hint, + }, + }; + } + + return .{ + .estimated_size = size_hint, }; + } + + fn getSizeHint(this: *FetchTasklet) Blob.SizeType { + return switch (this.body_size) { + .content_length => @truncate(this.body_size.content_length), + .total_received => @truncate(this.body_size.total_received), + else => 0, + }; + } - // if (response_buffer.items.len < InlineBlob.available_bytes) { - // const inline_blob = InlineBlob.init(response_buffer.items); - // defer response_buffer.deinit(bun.default_allocator); - // return .{ .InlineBlob = inline_blob }; - // } + fn toBodyValue(this: *FetchTasklet) Body.Value { + if (this.is_waiting_body) { + const response = Body.Value{ + .Locked = .{ + .size_hint = this.getSizeHint(), + .task = this, + .global = this.global_this, + .onStartStreaming = FetchTasklet.onStartStreamingRequestBodyCallback, + }, + }; + return response; + } + var scheduled_response_buffer = this.scheduled_response_buffer.list; const response = Body.Value{ .InternalBlob = .{ - .bytes = response_buffer.toManaged(bun.default_allocator), + .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), + }, + }; + this.scheduled_response_buffer = .{ + .allocator = default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, }, }; @@ -811,6 +1001,7 @@ pub const Fetch = struct { fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response { const http_response = this.result.response; + this.is_waiting_body = this.result.has_more; return Response{ .allocator = allocator, .url = bun.String.createAtomIfPossible(this.result.href), @@ -830,7 +1021,10 @@ pub const Fetch = struct { const allocator = bun.default_allocator; var response = allocator.create(Response) catch unreachable; response.* = this.toResponse(allocator); - return Response.makeMaybePooled(@as(js.JSContextRef, @ptrCast(this.global_this)), response); + const response_js = Response.makeMaybePooled(@as(js.JSContextRef, this.global_this), response); + response_js.ensureStillAlive(); + this.response = JSC.Strong.create(response_js, this.global_this); + return response_js; } pub fn get( @@ -843,6 +1037,14 @@ pub const Fetch = struct { var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet); fetch_tasklet.* = .{ + .mutex = Mutex.init(), + .scheduled_response_buffer = .{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }, .response_buffer = MutableString{ .allocator = bun.default_allocator, .list = .{ @@ -905,6 +1107,8 @@ pub const Fetch = struct { fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout; fetch_tasklet.http.?.client.verbose = fetch_options.verbose; fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive; + // we wanna to return after headers are received + fetch_tasklet.http.?.signalHeaderProgress(); if (fetch_tasklet.request_body == .Sendfile) { std.debug.assert(fetch_options.url.isHTTP()); @@ -973,8 +1177,26 @@ pub const Fetch = struct { } pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void { - task.response_buffer = result.body.?.*; + task.mutex.lock(); + defer task.mutex.unlock(); task.result = result; + task.body_size = result.body_size; + + const success = result.isSuccess(); + task.response_buffer = result.body.?.*; + + if (success) { + _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM"); + } + + // reset for reuse + task.response_buffer.reset(); + + if (task.has_schedule_callback.compareAndSwap(false, true, .Acquire, .Monotonic)) |has_schedule_callback| { + if (has_schedule_callback) { + return; + } + } task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit)); } }; |