diff options
author | 2022-09-26 20:04:28 -0700 | |
---|---|---|
committer | 2022-09-26 20:04:28 -0700 | |
commit | 24a9bc23b7e1c7911cb2e146be199d940b9729e6 (patch) | |
tree | 852a75cff3950063b405ca3a0dfe22e46d0eecfb /src/bun.js/webcore | |
parent | 97c3688788a94faffb6bceb4bc6c97fb84307ceb (diff) | |
download | bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.gz bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.zst bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.zip |
[Web Streams] Add `body` to `Response` and `Request` (#1255)
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src/bun.js/webcore')
-rw-r--r-- | src/bun.js/webcore/response.classes.ts | 2 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 340 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 336 |
3 files changed, 556 insertions, 122 deletions
diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts index 3cf695f74..84bad8f24 100644 --- a/src/bun.js/webcore/response.classes.ts +++ b/src/bun.js/webcore/response.classes.ts @@ -10,6 +10,7 @@ export default [ proto: { text: { fn: "getText" }, json: { fn: "getJSON" }, + body: { getter: "getBody", cache: true }, arrayBuffer: { fn: "getArrayBuffer" }, blob: { fn: "getBlob" }, clone: { fn: "doClone", length: 1 }, @@ -74,6 +75,7 @@ export default [ getter: "getURL", cache: true, }, + body: { getter: "getBody", cache: true }, text: { fn: "getText" }, json: { fn: "getJSON" }, diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index dd37537e3..64dfe3dc7 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -145,6 +145,18 @@ pub const Response = struct { return JSValue.jsBoolean(this.body.value == .Used); } + pub fn getBody( + this: *Response, + globalThis: *JSC.JSGlobalObject, + ) callconv(.C) JSValue { + if (this.body.value == .Used) { + globalThis.throw("Body already used", .{}); + return JSValue.jsUndefined(); + } + + return this.body.value.toReadableStream(globalThis); + } + pub fn getStatusText( this: *Response, globalThis: *JSC.JSGlobalObject, @@ -194,21 +206,21 @@ pub const Response = struct { } pub fn cloneInto( - this: *const Response, + this: *Response, new_response: *Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject, ) void { new_response.* = Response{ .allocator = allocator, - .body = this.body.clone(allocator, globalThis), + .body = this.body.clone(globalThis), .url = allocator.dupe(u8, this.url) catch unreachable, .status_text = allocator.dupe(u8, this.status_text) catch unreachable, .redirected = this.redirected, }; } - pub fn clone(this: *const Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response { + pub fn clone(this: *Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response { var new_response = allocator.create(Response) catch unreachable; this.cloneInto(new_response, allocator, globalThis); return new_response; @@ -267,7 +279,7 @@ pub const Response = struct { return default.value; }, - .Used, .Locked, .Empty, .Error => return default.value, + .InternalBlob, .Used, .Locked, .Empty, .Error => return default.value, } } @@ -552,7 +564,7 @@ pub const Fetch = struct { const globalThis = this.global_this; var ref = this.ref; - const promise_value = ref.get(globalThis); + const promise_value = ref.get(); defer ref.destroy(globalThis); if (promise_value.isEmptyOrUndefinedOrNull()) { @@ -596,7 +608,7 @@ pub const Fetch = struct { var allocator = this.global_this.bunVM().allocator; const http_response = this.result.response; var response = allocator.create(Response) catch unreachable; - const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this); + const internal_blob = this.response_buffer.list.toManaged(this.response_buffer.allocator); this.response_buffer = .{ .allocator = default_allocator, .list = .{ .items = &.{}, .capacity = 0, @@ -613,7 +625,7 @@ pub const Fetch = struct { .status_code = @truncate(u16, http_response.status_code), }, .value = .{ - .Blob = blob, + .InternalBlob = internal_blob, }, }, }; @@ -937,6 +949,15 @@ pub const Blob = struct { return this.store == null; } + pub fn writeFormatForSize(size: usize, writer: anytype, comptime enable_ansi_colors: bool) !void { + try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors)); + try writer.print( + comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors), + .{ + bun.fmt.size(size), + }, + ); + } pub fn writeFormat(this: *const Blob, formatter: *JSC.Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void { const Writer = @TypeOf(writer); @@ -947,38 +968,34 @@ pub const Blob = struct { return; } - var store = this.store.?; - switch (store.data) { - .file => |file| { - try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors)); - switch (file.pathlike) { - .path => |path| { - try writer.print( - comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors), - .{ - path.slice(), - }, - ); - }, - .fd => |fd| { - try writer.print( - comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors), - .{ - fd, - }, - ); - }, - } - }, - .bytes => { - try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors)); - try writer.print( - comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors), - .{ - bun.fmt.size(this.size), - }, - ); - }, + { + var store = this.store.?; + switch (store.data) { + .file => |file| { + try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors)); + switch (file.pathlike) { + .path => |path| { + try writer.print( + comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors), + .{ + path.slice(), + }, + ); + }, + .fd => |fd| { + try writer.print( + comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors), + .{ + fd, + }, + ); + }, + } + }, + .bytes => { + try writeFormatForSize(this.size, writer, enable_ansi_colors); + }, + } } if (this.content_type.len > 0 or this.offset > 0) { @@ -1071,7 +1088,7 @@ pub const Blob = struct { bun.default_allocator.destroy(this); promise.reject(globalThis, ZigString.init("Body was used after it was consumed").toErrorInstance(globalThis)); }, - .Empty, .Blob => { + .InternalBlob, .Empty, .Blob => { var blob = value.use(); // TODO: this should be one promise not two! const new_promise = writeFileWithSourceDestination(globalThis.ref(), &blob, &file_blob); @@ -1273,7 +1290,7 @@ pub const Blob = struct { var source_blob: Blob = brk: { if (data.as(Response)) |response| { switch (response.body.value) { - .Used, .Empty, .Blob => { + .InternalBlob, .Used, .Empty, .Blob => { break :brk response.body.use(); }, .Error => { @@ -1302,7 +1319,7 @@ pub const Blob = struct { if (data.as(Request)) |request| { switch (request.body) { - .Used, .Empty, .Blob => { + .InternalBlob, .Used, .Empty, .Blob => { break :brk request.body.use(); }, .Error => { @@ -3321,11 +3338,6 @@ pub const Blob = struct { return slice_[0..@minimum(slice_.len, @as(usize, this.size))]; } - pub fn view(this: *const Blob) []const u8 { - if (this.size == 0 or this.store == null) return ""; - return this.store.?.sharedView()[this.offset..][0..this.size]; - } - pub const Lifetime = JSC.WebCore.Lifetime; pub fn setIsASCIIFlag(this: *Blob, is_all_ascii: bool) void { this.is_all_ascii = is_all_ascii; @@ -3881,10 +3893,10 @@ pub const Body = struct { return this.value.use(); } - pub fn clone(this: Body, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) Body { + pub fn clone(this: *Body, globalThis: *JSGlobalObject) Body { return Body{ .init = this.init.clone(globalThis), - .value = this.value.clone(allocator), + .value = this.value.clone(globalThis), }; } @@ -3911,6 +3923,16 @@ pub const Body = struct { try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); try this.value.Blob.writeFormat(formatter, writer, enable_ansi_colors); + } else if (this.value == .InternalBlob) { + try formatter.printComma(Writer, writer, enable_ansi_colors); + try writer.writeAll("\n"); + try Blob.writeFormatForSize(this.value.InternalBlob.items.len, writer, enable_ansi_colors); + } else if (this.value == .Locked) { + if (this.value.Locked.readable) |stream| { + try formatter.printComma(Writer, writer, enable_ansi_colors); + try writer.writeAll("\n"); + formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors); + } } } @@ -3982,6 +4004,8 @@ pub const Body = struct { /// conditionally runs when requesting data /// used in HTTP server to ignore request bodies unless asked for it onPull: ?fn (ctx: *anyopaque) void = null, + onDrain: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null, + deinit: bool = false, action: Action = Action.none, @@ -4041,6 +4065,9 @@ pub const Body = struct { pub const Value = union(Tag) { Blob: Blob, + /// Single-use Blob + /// Avoids a heap allocation. + InternalBlob: std.ArrayList(u8), Locked: PendingValue, Used: void, Empty: void, @@ -4048,6 +4075,7 @@ pub const Body = struct { pub const Tag = enum { Blob, + InternalBlob, Locked, Used, Empty, @@ -4056,6 +4084,87 @@ pub const Body = struct { pub const empty = Value{ .Empty = .{} }; + + pub fn toReadableStream(this: *Value, globalThis: *JSGlobalObject) JSValue { + JSC.markBinding(); + + switch (this.*) { + .Used, .Empty => { + return JSC.WebCore.ReadableStream.empty(globalThis); + }, + .InternalBlob => |*bytes| { + var blob = Blob.init(bytes.toOwnedSlice(), bytes.allocator, globalThis); + defer blob.detach(); + var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size); + this.* = .{ + .Locked = .{ + .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?, + .global = globalThis, + }, + }; + return readable; + }, + .Blob => { + var blob = this.Blob; + defer blob.detach(); + var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size); + readable.protect(); + this.* = .{ + .Locked = .{ + .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?, + .global = globalThis, + }, + }; + return readable; + }, + .Locked => { + var locked = &this.Locked; + if (locked.readable) |readable| { + return readable.value; + } + var drain_result: JSC.WebCore.DrainResult = .{ + .estimated_size = 0, + }; + + if (locked.onDrain) |drain| { + locked.onDrain = null; + drain_result = drain(locked.task.?); + } + + if (drain_result == .empty or drain_result == .aborted) { + this.* = .{ .Empty = void{} }; + return JSC.WebCore.ReadableStream.empty(globalThis); + } + + var reader = bun.default_allocator.create(JSC.WebCore.ByteStream.Source) catch unreachable; + reader.* = .{ + .context = undefined, + .globalThis = globalThis, + }; + + reader.context.setup(); + + if (drain_result == .estimated_size) { + reader.context.highWaterMark = @truncate(Blob.SizeType, drain_result.estimated_size); + reader.context.size_hint = @truncate(Blob.SizeType, drain_result.estimated_size); + } else if (drain_result == .owned) { + reader.context.buffer = drain_result.owned.list; + reader.context.size_hint = @truncate(Blob.SizeType, drain_result.owned.size_hint); + } + + locked.readable = .{ + .ptr = .{ .Bytes = &reader.context }, + .value = reader.toJS(globalThis), + }; + + locked.readable.?.value.protect(); + return locked.readable.?.value; + }, + + else => unreachable, + } + } + pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) ?Value { if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |readable| { switch (readable.ptr) { @@ -4121,13 +4230,29 @@ pub const Body = struct { if (locked.promise) |promise| { locked.promise = null; - var blob = new.use(); switch (locked.action) { .getText => { - promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref())); + if (new.* == .InternalBlob) { + var bytes = new.InternalBlob; + new.* = .{ .Empty = void{} }; + var str = ZigString.init(bytes.items).withEncoding(); + str.mark(); + if (str.is16Bit()) { + const out = str.toValueGC(global); + bytes.deinit(); + promise.asPromise().?.resolve(global, out); + } else { + const out = str.toExternalValue(global); + promise.asPromise().?.resolve(global, out); + } + } else { + var blob = new.use(); + promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref())); + } }, .getJSON => { + var blob = new.use(); const json_value = blob.toJSON(global, .share); blob.detach(); @@ -4138,17 +4263,25 @@ pub const Body = struct { } }, .getArrayBuffer => { - promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global)); + if (new.* == .InternalBlob) { + const marked = JSC.MarkedArrayBuffer.fromBytes(new.InternalBlob.items, new.InternalBlob.allocator, .ArrayBuffer); + new.* = .{ .Empty = void{} }; + var object_ref = marked.toJS(global, null); + promise.asPromise().?.resolve(global, JSC.JSValue.c(object_ref)); + } else { + var blob = new.use(); + promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global)); + } }, .getBlob => { var ptr = bun.default_allocator.create(Blob) catch unreachable; - ptr.* = blob; + ptr.* = new.use(); ptr.allocator = bun.default_allocator; promise.asPromise().?.resolve(global, ptr.toJS(global)); }, else => { var ptr = bun.default_allocator.create(Blob) catch unreachable; - ptr.* = blob; + ptr.* = new.use(); ptr.allocator = bun.default_allocator; promise.asInternalPromise().?.resolve(global, ptr.toJS(global)); }, @@ -4160,6 +4293,7 @@ pub const Body = struct { pub fn slice(this: Value) []const u8 { return switch (this) { .Blob => this.Blob.sharedView(), + .InternalBlob => |list| list.items, else => "", }; } @@ -4172,6 +4306,15 @@ pub const Body = struct { this.* = .{ .Used = .{} }; return new_blob; }, + .InternalBlob => { + var new_blob = Blob.init( + this.InternalBlob.toOwnedSlice(), + this.InternalBlob.allocator, + JSC.VirtualMachine.vm.global, + ); + this.* = .{ .Used = .{} }; + return new_blob; + }, else => { return Blob.initEmpty(undefined); }, @@ -4228,14 +4371,22 @@ pub const Body = struct { pub fn deinit(this: *Value) void { const tag = @as(Tag, this.*); if (tag == .Locked) { - if (this.Locked.readable) |*readable| { - readable.done(); + if (!this.Locked.deinit) { + this.Locked.deinit = true; + + if (this.Locked.readable) |*readable| { + readable.done(); + } } - this.Locked.deinit = true; return; } + if (tag == .InternalBlob) { + this.InternalBlob.clearAndFree(); + this.* = Value.empty; + } + if (tag == .Blob) { this.Blob.deinit(); this.* = Value.empty; @@ -4246,8 +4397,18 @@ pub const Body = struct { } } - pub fn clone(this: Value, _: std.mem.Allocator) Value { - if (this == .Blob) { + pub fn clone(this: *Value, globalThis: *JSC.JSGlobalObject) Value { + if (this.* == .InternalBlob) { + this.* = .{ + .Blob = Blob.init( + this.InternalBlob.toOwnedSlice(), + this.InternalBlob.allocator, + globalThis, + ), + }; + } + + if (this.* == .Blob) { return Value{ .Blob = this.Blob.dupe() }; } @@ -4319,10 +4480,16 @@ pub const Body = struct { } else |_| {} } + if (value.isUndefined()) { + body.value = Value.empty; + return body; + } + body.value = Value.fromJS(globalThis, value) orelse return null; if (body.value == .Blob) std.debug.assert(body.value.Blob.allocator == null); // owned by Body + return body; } }; @@ -4358,7 +4525,18 @@ pub const Request = struct { try writer.writeAll("\""); if (this.body == .Blob) { try writer.writeAll("\n"); + try formatter.writeIndent(Writer, writer); try this.body.Blob.writeFormat(formatter, writer, enable_ansi_colors); + } else if (this.body == .InternalBlob) { + try writer.writeAll("\n"); + try formatter.writeIndent(Writer, writer); + try Blob.writeFormatForSize(this.body.InternalBlob.items.len, writer, enable_ansi_colors); + } else if (this.body == .Locked) { + if (this.body.Locked.readable) |stream| { + try writer.writeAll("\n"); + try formatter.writeIndent(Writer, writer); + formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors); + } } } try writer.writeAll("\n"); @@ -4392,7 +4570,7 @@ pub const Request = struct { return MimeType.other.value; }, - .Error, .Used, .Locked, .Empty => return MimeType.other.value, + .InternalBlob, .Error, .Used, .Locked, .Empty => return MimeType.other.value, } } @@ -4415,6 +4593,18 @@ pub const Request = struct { return ZigString.init("").toValueGC(globalThis); } + pub fn getBody( + this: *Request, + globalThis: *JSC.JSGlobalObject, + ) callconv(.C) JSValue { + if (this.body == .Used) { + globalThis.throw("Body already used", .{}); + return JSValue.jsUndefined(); + } + + return this.body.toReadableStream(globalThis); + } + pub fn getIntegrity( _: *Request, globalThis: *JSC.JSGlobalObject, @@ -4521,17 +4711,9 @@ pub const Request = struct { } if (urlOrObject.fastGet(globalThis, .body)) |body_| { - if (Blob.get(globalThis, body_, true, false)) |blob| { - if (blob.size > 0) { - request.body = Body.Value{ .Blob = blob }; - } - } else |err| { - if (err == error.InvalidArguments) { - globalThis.throwInvalidArguments("Expected an Array", .{}); - return null; - } - - globalThis.throwInvalidArguments("Invalid Body object", .{}); + if (Body.Value.fromJS(globalThis, body_)) |body| { + request.body = body.value; + } else { return null; } } @@ -4546,17 +4728,9 @@ pub const Request = struct { } if (arguments[1].fastGet(globalThis, .body)) |body_| { - if (Blob.get(globalThis, body_, true, false)) |blob| { - if (blob.size > 0) { - request.body = Body.Value{ .Blob = blob }; - } - } else |err| { - if (err == error.InvalidArguments) { - globalThis.throwInvalidArguments("Expected an Array", .{}); - return null; - } - - globalThis.throwInvalidArguments("Invalid Body object", .{}); + if (Body.Value.fromJS(globalThis, body_)) |body| { + request.body = body.value; + } else { return null; } } @@ -4614,7 +4788,7 @@ pub const Request = struct { globalThis: *JSGlobalObject, ) void { req.* = Request{ - .body = this.body.clone(allocator), + .body = this.body.clone(globalThis), .url = ZigString.init(allocator.dupe(u8, this.url.slice()) catch unreachable), .method = this.method, }; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 23aad70ec..681ff6172 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -70,7 +70,7 @@ pub const ReadableStream = struct { pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void { JSC.markBinding(); this.value.unprotect(); - ReadableStream__abort(this.value, globalThis); + ReadableStream__cancel(this.value, globalThis); } pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void { @@ -98,6 +98,8 @@ pub const ReadableStream = struct { /// This is a direct readable stream /// That means we can turn it into whatever we want Direct = 3, + + Bytes = 4, }; pub const Source = union(Tag) { Invalid: void, @@ -116,6 +118,8 @@ pub const ReadableStream = struct { /// This is a direct readable stream /// That means we can turn it into whatever we want Direct: void, + + Bytes: *ByteStream, }; extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag; @@ -165,6 +169,13 @@ pub const ReadableStream = struct { }, }, + .Bytes => ReadableStream{ + .value = value, + .ptr = .{ + .Bytes = ptr.asPtr(ByteStream), + }, + }, + // .HTTPRequest => ReadableStream{ // .value = value, // .ptr = .{ @@ -184,6 +195,7 @@ pub const ReadableStream = struct { extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue; pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue { + JSC.markBinding(); return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id))); } pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue { @@ -402,6 +414,16 @@ pub const StreamStart = union(Tag) { } }; +pub const DrainResult = union(enum) { + owned: struct { + list: std.ArrayList(u8), + size_hint: usize, + }, + estimated_size: usize, + empty: void, + aborted: void, +}; + pub const StreamResult = union(Tag) { owned: bun.ByteList, owned_and_done: bun.ByteList, @@ -2660,65 +2682,301 @@ pub const ByteBlobLoader = struct { pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit); }; -pub fn RequestBodyStreamer( - comptime is_ssl: bool, -) type { - return struct { - response: *uws.NewApp(is_ssl).Response, +pub const ByteStream = struct { + buffer: std.ArrayList(u8) = .{ + .allocator = bun.default_allocator, + .items = &.{}, + .capacity = 0, + }, + has_received_last_chunk: bool = false, + pending: StreamResult.Pending = StreamResult.Pending{ + .frame = undefined, + .used = false, + .result = .{ .done = {} }, + }, + done: bool = false, + pending_buffer: []u8 = &.{}, + pending_value: ?*JSC.napi.Ref = null, + offset: usize = 0, + highWaterMark: Blob.SizeType = 0, + pipe: Pipe = .{}, + size_hint: Blob.SizeType = 0, + + pub const Pipe = struct { + ctx: ?*anyopaque = null, + onPipe: ?PipeFunction = null, + + pub fn New(comptime Type: type, comptime Function: anytype) type { + return struct { + pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void { + Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator); + } - pub const tag = if (is_ssl) - ReadableStream.Tag.HTTPRequest - else if (is_ssl) - ReadableStream.Tag.HTTPSRequest; + pub fn init(self: *Type) Pipe { + return Pipe{ + .ctx = self, + .onPipe = pipe, + }; + } + }; + } + }; - pub fn onStart(this: *ByteBlobLoader) StreamStart { - return .{ .chunk_size = this.chunk_size }; + pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; + + pub const tag = ReadableStream.Tag.Bytes; + + pub fn setup(this: *ByteStream) void { + this.* = .{}; + } + + pub fn onStart(this: *@This()) StreamStart { + if (this.has_received_last_chunk and this.buffer.items.len == 0) { + return .{ .empty = void{} }; } - pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult { - array.ensureStillAlive(); - defer array.ensureStillAlive(); - if (this.done) { - return .{ .done = {} }; + if (this.has_received_last_chunk) { + return .{ .chunk_size = @truncate(Blob.SizeType, @minimum(1024 * 1024 * 2, this.buffer.items.len)) }; + } + + if (this.highWaterMark == 0) { + return .{ .ready = void{} }; + } + + return .{ .chunk_size = @maximum(this.highWaterMark, std.mem.page_size) }; + } + + pub fn value(this: *@This()) JSValue { + if (this.pending_value == null) + return .zero; + + const result = this.pending_value.?.get(); + this.pending_value.?.set(.zero); + + return result; + } + + pub fn isCancelled(this: *const @This()) bool { + return @fieldParentPtr(Source, "context", this).cancelled; + } + + pub fn unpipe(this: *@This()) void { + this.pipe.ctx = null; + this.pipe.onPipe = null; + if (!this.parent().deinited) { + this.parent().deinited = true; + bun.default_allocator.destroy(this.parent()); + } + } + + pub fn onData( + this: *@This(), + stream: StreamResult, + allocator: std.mem.Allocator, + ) void { + JSC.markBinding(); + if (this.done) { + if (stream.isDone() and (stream == .owned or stream == .owned_and_done)) { + if (stream == .owned) allocator.free(stream.owned.slice()); + if (stream == .owned_and_done) allocator.free(stream.owned_and_done.slice()); } - var temporary = this.store.sharedView(); - temporary = temporary[this.offset..]; + return; + } + + std.debug.assert(!this.has_received_last_chunk); + this.has_received_last_chunk = stream.isDone(); + + if (this.pipe.ctx != null) { + this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator); + return; + } + + const chunk = stream.slice(); + + if (!this.pending.used) { + std.debug.assert(this.buffer.items.len == 0); + var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)]; + const pending_buffer_len = this.pending_buffer.len; + @memcpy(to_copy.ptr, chunk.ptr, to_copy.len); + this.pending_buffer = &.{}; + + const is_really_done = this.has_received_last_chunk and to_copy.len <= pending_buffer_len; - temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))]; - if (temporary.len == 0) { - this.store.deref(); + if (is_really_done) { this.done = true; - return .{ .done = {} }; + this.pending.result = .{ + .into_array_and_done = .{ + .value = this.value(), + .len = @truncate(Blob.SizeType, to_copy.len), + }, + }; + } else { + this.pending.result = .{ + .into_array = .{ + .value = this.value(), + .len = @truncate(Blob.SizeType, to_copy.len), + }, + }; } - const copied = @intCast(Blob.SizeType, temporary.len); + const remaining = chunk[to_copy.len..]; + if (remaining.len > 0) + this.append(stream, to_copy.len, allocator) catch @panic("Out of memory while copying request body"); + resume this.pending.frame; + return; + } - this.remain -|= copied; - this.offset +|= copied; - @memcpy(buffer.ptr, temporary.ptr, temporary.len); - if (this.remain == 0) { - return .{ .into_array_and_done = .{ .value = array, .len = copied } }; + this.append(stream, 0, allocator) catch @panic("Out of memory while copying request body"); + } + + pub fn append( + this: *@This(), + stream: StreamResult, + offset: usize, + allocator: std.mem.Allocator, + ) !void { + const chunk = stream.slice()[offset..]; + + if (this.buffer.capacity == 0) { + switch (stream) { + .owned => |owned| { + this.buffer = owned.listManaged(allocator); + this.offset += offset; + }, + .owned_and_done => |owned| { + this.buffer = owned.listManaged(allocator); + this.offset += offset; + }, + .temporary_and_done, .temporary => { + this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len); + this.buffer.appendSliceAssumeCapacity(chunk); + }, + else => unreachable, } + return; + } + + switch (stream) { + .temporary_and_done, .temporary => { + try this.buffer.appendSlice(chunk); + }, + // We don't support the rest of these yet + else => unreachable, + } + } - return .{ .into_array = .{ .value = array, .len = copied } }; + pub fn setValue(this: *@This(), view: JSC.JSValue) void { + JSC.markBinding(); + if (this.pending_value) |pending| { + pending.set(view); + } else { + this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view); } + } - pub fn onCancel(_: *ByteBlobLoader) void {} + pub fn parent(this: *@This()) *Source { + return @fieldParentPtr(Source, "context", this); + } - pub fn deinit(this: *ByteBlobLoader) void { - if (!this.done) { + pub fn onPull(this: *@This(), buffer: []u8, view: JSC.JSValue) StreamResult { + JSC.markBinding(); + std.debug.assert(buffer.len > 0); + + if (this.buffer.items.len > 0) { + std.debug.assert(this.value() == .zero); + const to_write = @minimum( + this.buffer.items.len - this.offset, + buffer.len, + ); + var remaining_in_buffer = this.buffer.items[this.offset..][0..to_write]; + + @memcpy(buffer.ptr, this.buffer.items.ptr + this.offset, to_write); + + if (this.offset + to_write == this.buffer.items.len) { + this.offset = 0; + this.buffer.items.len = 0; + } else { + this.offset += to_write; + } + + if (this.has_received_last_chunk and remaining_in_buffer.len == 0) { + this.buffer.clearAndFree(); this.done = true; - this.store.deref(); + + return .{ + .into_array_and_done = .{ + .value = view, + .len = @truncate(Blob.SizeType, to_write), + }, + }; + } + + return .{ + .into_array = .{ + .value = view, + .len = @truncate(Blob.SizeType, to_write), + }, + }; + } + + if (this.has_received_last_chunk) { + return .{ + .done = void{}, + }; + } + + this.pending_buffer = buffer; + this.setValue(view); + + return .{ + .pending = &this.pending, + }; + } + + pub fn onCancel(this: *@This()) void { + JSC.markBinding(); + const view = this.value(); + if (this.buffer.capacity > 0) this.buffer.clearAndFree(); + this.done = true; + if (this.pending_value) |ref| { + this.pending_value = null; + ref.destroy(undefined); + } + + if (view != .zero) { + this.pending_buffer = &.{}; + this.pending.result = .{ .done = {} }; + if (!this.pending.used) { + resume this.pending.frame; } + } + } + + pub fn deinit(this: *@This()) void { + JSC.markBinding(); + if (this.buffer.capacity > 0) this.buffer.clearAndFree(); - bun.default_allocator.destroy(this); + if (this.pending_value) |ref| { + this.pending_value = null; + ref.destroy(undefined); } + if (!this.done) { + this.done = true; - pub const label = if (is_ssl) "HTTPSRequestBodyStreamer" else "HTTPRequestBodyStreamer"; - pub const Source = ReadableStreamSource(@This(), label, onStart, onPull, onCancel, deinit); - }; -} + this.pending_buffer = &.{}; + this.pending.result = .{ .done = {} }; + + if (!this.pending.used) { + resume this.pending.frame; + } + } + + bun.default_allocator.destroy(this.parent()); + } + + pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit); +}; pub const FileBlobLoader = struct { buf: []u8 = &[_]u8{}, |