diff options
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r-- | src/bun.js/webcore/streams.zig | 336 |
1 files changed, 297 insertions, 39 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 23aad70ec..681ff6172 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -70,7 +70,7 @@ pub const ReadableStream = struct { pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void { JSC.markBinding(); this.value.unprotect(); - ReadableStream__abort(this.value, globalThis); + ReadableStream__cancel(this.value, globalThis); } pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void { @@ -98,6 +98,8 @@ pub const ReadableStream = struct { /// This is a direct readable stream /// That means we can turn it into whatever we want Direct = 3, + + Bytes = 4, }; pub const Source = union(Tag) { Invalid: void, @@ -116,6 +118,8 @@ pub const ReadableStream = struct { /// This is a direct readable stream /// That means we can turn it into whatever we want Direct: void, + + Bytes: *ByteStream, }; extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag; @@ -165,6 +169,13 @@ pub const ReadableStream = struct { }, }, + .Bytes => ReadableStream{ + .value = value, + .ptr = .{ + .Bytes = ptr.asPtr(ByteStream), + }, + }, + // .HTTPRequest => ReadableStream{ // .value = value, // .ptr = .{ @@ -184,6 +195,7 @@ pub const ReadableStream = struct { extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue; pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue { + JSC.markBinding(); return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id))); } pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue { @@ -402,6 +414,16 @@ pub const StreamStart = union(Tag) { } }; +pub const DrainResult = union(enum) { + owned: struct { + list: std.ArrayList(u8), + size_hint: usize, + }, + estimated_size: usize, + empty: void, + aborted: void, +}; + pub const StreamResult = union(Tag) { owned: bun.ByteList, owned_and_done: bun.ByteList, @@ -2660,65 +2682,301 @@ pub const ByteBlobLoader = struct { pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit); }; -pub fn RequestBodyStreamer( - comptime is_ssl: bool, -) type { - return struct { - response: *uws.NewApp(is_ssl).Response, +pub const ByteStream = struct { + buffer: std.ArrayList(u8) = .{ + .allocator = bun.default_allocator, + .items = &.{}, + .capacity = 0, + }, + has_received_last_chunk: bool = false, + pending: StreamResult.Pending = StreamResult.Pending{ + .frame = undefined, + .used = false, + .result = .{ .done = {} }, + }, + done: bool = false, + pending_buffer: []u8 = &.{}, + pending_value: ?*JSC.napi.Ref = null, + offset: usize = 0, + highWaterMark: Blob.SizeType = 0, + pipe: Pipe = .{}, + size_hint: Blob.SizeType = 0, + + pub const Pipe = struct { + ctx: ?*anyopaque = null, + onPipe: ?PipeFunction = null, + + pub fn New(comptime Type: type, comptime Function: anytype) type { + return struct { + pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void { + Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator); + } - pub const tag = if (is_ssl) - ReadableStream.Tag.HTTPRequest - else if (is_ssl) - ReadableStream.Tag.HTTPSRequest; + pub fn init(self: *Type) Pipe { + return Pipe{ + .ctx = self, + .onPipe = pipe, + }; + } + }; + } + }; - pub fn onStart(this: *ByteBlobLoader) StreamStart { - return .{ .chunk_size = this.chunk_size }; + pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; + + pub const tag = ReadableStream.Tag.Bytes; + + pub fn setup(this: *ByteStream) void { + this.* = .{}; + } + + pub fn onStart(this: *@This()) StreamStart { + if (this.has_received_last_chunk and this.buffer.items.len == 0) { + return .{ .empty = void{} }; } - pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult { - array.ensureStillAlive(); - defer array.ensureStillAlive(); - if (this.done) { - return .{ .done = {} }; + if (this.has_received_last_chunk) { + return .{ .chunk_size = @truncate(Blob.SizeType, @minimum(1024 * 1024 * 2, this.buffer.items.len)) }; + } + + if (this.highWaterMark == 0) { + return .{ .ready = void{} }; + } + + return .{ .chunk_size = @maximum(this.highWaterMark, std.mem.page_size) }; + } + + pub fn value(this: *@This()) JSValue { + if (this.pending_value == null) + return .zero; + + const result = this.pending_value.?.get(); + this.pending_value.?.set(.zero); + + return result; + } + + pub fn isCancelled(this: *const @This()) bool { + return @fieldParentPtr(Source, "context", this).cancelled; + } + + pub fn unpipe(this: *@This()) void { + this.pipe.ctx = null; + this.pipe.onPipe = null; + if (!this.parent().deinited) { + this.parent().deinited = true; + bun.default_allocator.destroy(this.parent()); + } + } + + pub fn onData( + this: *@This(), + stream: StreamResult, + allocator: std.mem.Allocator, + ) void { + JSC.markBinding(); + if (this.done) { + if (stream.isDone() and (stream == .owned or stream == .owned_and_done)) { + if (stream == .owned) allocator.free(stream.owned.slice()); + if (stream == .owned_and_done) allocator.free(stream.owned_and_done.slice()); } - var temporary = this.store.sharedView(); - temporary = temporary[this.offset..]; + return; + } + + std.debug.assert(!this.has_received_last_chunk); + this.has_received_last_chunk = stream.isDone(); + + if (this.pipe.ctx != null) { + this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator); + return; + } + + const chunk = stream.slice(); + + if (!this.pending.used) { + std.debug.assert(this.buffer.items.len == 0); + var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)]; + const pending_buffer_len = this.pending_buffer.len; + @memcpy(to_copy.ptr, chunk.ptr, to_copy.len); + this.pending_buffer = &.{}; + + const is_really_done = this.has_received_last_chunk and to_copy.len <= pending_buffer_len; - temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))]; - if (temporary.len == 0) { - this.store.deref(); + if (is_really_done) { this.done = true; - return .{ .done = {} }; + this.pending.result = .{ + .into_array_and_done = .{ + .value = this.value(), + .len = @truncate(Blob.SizeType, to_copy.len), + }, + }; + } else { + this.pending.result = .{ + .into_array = .{ + .value = this.value(), + .len = @truncate(Blob.SizeType, to_copy.len), + }, + }; } - const copied = @intCast(Blob.SizeType, temporary.len); + const remaining = chunk[to_copy.len..]; + if (remaining.len > 0) + this.append(stream, to_copy.len, allocator) catch @panic("Out of memory while copying request body"); + resume this.pending.frame; + return; + } - this.remain -|= copied; - this.offset +|= copied; - @memcpy(buffer.ptr, temporary.ptr, temporary.len); - if (this.remain == 0) { - return .{ .into_array_and_done = .{ .value = array, .len = copied } }; + this.append(stream, 0, allocator) catch @panic("Out of memory while copying request body"); + } + + pub fn append( + this: *@This(), + stream: StreamResult, + offset: usize, + allocator: std.mem.Allocator, + ) !void { + const chunk = stream.slice()[offset..]; + + if (this.buffer.capacity == 0) { + switch (stream) { + .owned => |owned| { + this.buffer = owned.listManaged(allocator); + this.offset += offset; + }, + .owned_and_done => |owned| { + this.buffer = owned.listManaged(allocator); + this.offset += offset; + }, + .temporary_and_done, .temporary => { + this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len); + this.buffer.appendSliceAssumeCapacity(chunk); + }, + else => unreachable, } + return; + } + + switch (stream) { + .temporary_and_done, .temporary => { + try this.buffer.appendSlice(chunk); + }, + // We don't support the rest of these yet + else => unreachable, + } + } - return .{ .into_array = .{ .value = array, .len = copied } }; + pub fn setValue(this: *@This(), view: JSC.JSValue) void { + JSC.markBinding(); + if (this.pending_value) |pending| { + pending.set(view); + } else { + this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view); } + } - pub fn onCancel(_: *ByteBlobLoader) void {} + pub fn parent(this: *@This()) *Source { + return @fieldParentPtr(Source, "context", this); + } - pub fn deinit(this: *ByteBlobLoader) void { - if (!this.done) { + pub fn onPull(this: *@This(), buffer: []u8, view: JSC.JSValue) StreamResult { + JSC.markBinding(); + std.debug.assert(buffer.len > 0); + + if (this.buffer.items.len > 0) { + std.debug.assert(this.value() == .zero); + const to_write = @minimum( + this.buffer.items.len - this.offset, + buffer.len, + ); + var remaining_in_buffer = this.buffer.items[this.offset..][0..to_write]; + + @memcpy(buffer.ptr, this.buffer.items.ptr + this.offset, to_write); + + if (this.offset + to_write == this.buffer.items.len) { + this.offset = 0; + this.buffer.items.len = 0; + } else { + this.offset += to_write; + } + + if (this.has_received_last_chunk and remaining_in_buffer.len == 0) { + this.buffer.clearAndFree(); this.done = true; - this.store.deref(); + + return .{ + .into_array_and_done = .{ + .value = view, + .len = @truncate(Blob.SizeType, to_write), + }, + }; + } + + return .{ + .into_array = .{ + .value = view, + .len = @truncate(Blob.SizeType, to_write), + }, + }; + } + + if (this.has_received_last_chunk) { + return .{ + .done = void{}, + }; + } + + this.pending_buffer = buffer; + this.setValue(view); + + return .{ + .pending = &this.pending, + }; + } + + pub fn onCancel(this: *@This()) void { + JSC.markBinding(); + const view = this.value(); + if (this.buffer.capacity > 0) this.buffer.clearAndFree(); + this.done = true; + if (this.pending_value) |ref| { + this.pending_value = null; + ref.destroy(undefined); + } + + if (view != .zero) { + this.pending_buffer = &.{}; + this.pending.result = .{ .done = {} }; + if (!this.pending.used) { + resume this.pending.frame; } + } + } + + pub fn deinit(this: *@This()) void { + JSC.markBinding(); + if (this.buffer.capacity > 0) this.buffer.clearAndFree(); - bun.default_allocator.destroy(this); + if (this.pending_value) |ref| { + this.pending_value = null; + ref.destroy(undefined); } + if (!this.done) { + this.done = true; - pub const label = if (is_ssl) "HTTPSRequestBodyStreamer" else "HTTPRequestBodyStreamer"; - pub const Source = ReadableStreamSource(@This(), label, onStart, onPull, onCancel, deinit); - }; -} + this.pending_buffer = &.{}; + this.pending.result = .{ .done = {} }; + + if (!this.pending.used) { + resume this.pending.frame; + } + } + + bun.default_allocator.destroy(this.parent()); + } + + pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit); +}; pub const FileBlobLoader = struct { buf: []u8 = &[_]u8{}, |