aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore/streams.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r--src/bun.js/webcore/streams.zig336
1 files changed, 297 insertions, 39 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 23aad70ec..681ff6172 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -70,7 +70,7 @@ pub const ReadableStream = struct {
pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding();
this.value.unprotect();
- ReadableStream__abort(this.value, globalThis);
+ ReadableStream__cancel(this.value, globalThis);
}
pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
@@ -98,6 +98,8 @@ pub const ReadableStream = struct {
/// This is a direct readable stream
/// That means we can turn it into whatever we want
Direct = 3,
+
+ Bytes = 4,
};
pub const Source = union(Tag) {
Invalid: void,
@@ -116,6 +118,8 @@ pub const ReadableStream = struct {
/// This is a direct readable stream
/// That means we can turn it into whatever we want
Direct: void,
+
+ Bytes: *ByteStream,
};
extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag;
@@ -165,6 +169,13 @@ pub const ReadableStream = struct {
},
},
+ .Bytes => ReadableStream{
+ .value = value,
+ .ptr = .{
+ .Bytes = ptr.asPtr(ByteStream),
+ },
+ },
+
// .HTTPRequest => ReadableStream{
// .value = value,
// .ptr = .{
@@ -184,6 +195,7 @@ pub const ReadableStream = struct {
extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue;
pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue {
+ JSC.markBinding();
return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id)));
}
pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
@@ -402,6 +414,16 @@ pub const StreamStart = union(Tag) {
}
};
+pub const DrainResult = union(enum) {
+ owned: struct {
+ list: std.ArrayList(u8),
+ size_hint: usize,
+ },
+ estimated_size: usize,
+ empty: void,
+ aborted: void,
+};
+
pub const StreamResult = union(Tag) {
owned: bun.ByteList,
owned_and_done: bun.ByteList,
@@ -2660,65 +2682,301 @@ pub const ByteBlobLoader = struct {
pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit);
};
-pub fn RequestBodyStreamer(
- comptime is_ssl: bool,
-) type {
- return struct {
- response: *uws.NewApp(is_ssl).Response,
+pub const ByteStream = struct {
+ buffer: std.ArrayList(u8) = .{
+ .allocator = bun.default_allocator,
+ .items = &.{},
+ .capacity = 0,
+ },
+ has_received_last_chunk: bool = false,
+ pending: StreamResult.Pending = StreamResult.Pending{
+ .frame = undefined,
+ .used = false,
+ .result = .{ .done = {} },
+ },
+ done: bool = false,
+ pending_buffer: []u8 = &.{},
+ pending_value: ?*JSC.napi.Ref = null,
+ offset: usize = 0,
+ highWaterMark: Blob.SizeType = 0,
+ pipe: Pipe = .{},
+ size_hint: Blob.SizeType = 0,
+
+ pub const Pipe = struct {
+ ctx: ?*anyopaque = null,
+ onPipe: ?PipeFunction = null,
+
+ pub fn New(comptime Type: type, comptime Function: anytype) type {
+ return struct {
+ pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void {
+ Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator);
+ }
- pub const tag = if (is_ssl)
- ReadableStream.Tag.HTTPRequest
- else if (is_ssl)
- ReadableStream.Tag.HTTPSRequest;
+ pub fn init(self: *Type) Pipe {
+ return Pipe{
+ .ctx = self,
+ .onPipe = pipe,
+ };
+ }
+ };
+ }
+ };
- pub fn onStart(this: *ByteBlobLoader) StreamStart {
- return .{ .chunk_size = this.chunk_size };
+ pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
+
+ pub const tag = ReadableStream.Tag.Bytes;
+
+ pub fn setup(this: *ByteStream) void {
+ this.* = .{};
+ }
+
+ pub fn onStart(this: *@This()) StreamStart {
+ if (this.has_received_last_chunk and this.buffer.items.len == 0) {
+ return .{ .empty = void{} };
}
- pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult {
- array.ensureStillAlive();
- defer array.ensureStillAlive();
- if (this.done) {
- return .{ .done = {} };
+ if (this.has_received_last_chunk) {
+ return .{ .chunk_size = @truncate(Blob.SizeType, @minimum(1024 * 1024 * 2, this.buffer.items.len)) };
+ }
+
+ if (this.highWaterMark == 0) {
+ return .{ .ready = void{} };
+ }
+
+ return .{ .chunk_size = @maximum(this.highWaterMark, std.mem.page_size) };
+ }
+
+ pub fn value(this: *@This()) JSValue {
+ if (this.pending_value == null)
+ return .zero;
+
+ const result = this.pending_value.?.get();
+ this.pending_value.?.set(.zero);
+
+ return result;
+ }
+
+ pub fn isCancelled(this: *const @This()) bool {
+ return @fieldParentPtr(Source, "context", this).cancelled;
+ }
+
+ pub fn unpipe(this: *@This()) void {
+ this.pipe.ctx = null;
+ this.pipe.onPipe = null;
+ if (!this.parent().deinited) {
+ this.parent().deinited = true;
+ bun.default_allocator.destroy(this.parent());
+ }
+ }
+
+ pub fn onData(
+ this: *@This(),
+ stream: StreamResult,
+ allocator: std.mem.Allocator,
+ ) void {
+ JSC.markBinding();
+ if (this.done) {
+ if (stream.isDone() and (stream == .owned or stream == .owned_and_done)) {
+ if (stream == .owned) allocator.free(stream.owned.slice());
+ if (stream == .owned_and_done) allocator.free(stream.owned_and_done.slice());
}
- var temporary = this.store.sharedView();
- temporary = temporary[this.offset..];
+ return;
+ }
+
+ std.debug.assert(!this.has_received_last_chunk);
+ this.has_received_last_chunk = stream.isDone();
+
+ if (this.pipe.ctx != null) {
+ this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator);
+ return;
+ }
+
+ const chunk = stream.slice();
+
+ if (!this.pending.used) {
+ std.debug.assert(this.buffer.items.len == 0);
+ var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)];
+ const pending_buffer_len = this.pending_buffer.len;
+ @memcpy(to_copy.ptr, chunk.ptr, to_copy.len);
+ this.pending_buffer = &.{};
+
+ const is_really_done = this.has_received_last_chunk and to_copy.len <= pending_buffer_len;
- temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))];
- if (temporary.len == 0) {
- this.store.deref();
+ if (is_really_done) {
this.done = true;
- return .{ .done = {} };
+ this.pending.result = .{
+ .into_array_and_done = .{
+ .value = this.value(),
+ .len = @truncate(Blob.SizeType, to_copy.len),
+ },
+ };
+ } else {
+ this.pending.result = .{
+ .into_array = .{
+ .value = this.value(),
+ .len = @truncate(Blob.SizeType, to_copy.len),
+ },
+ };
}
- const copied = @intCast(Blob.SizeType, temporary.len);
+ const remaining = chunk[to_copy.len..];
+ if (remaining.len > 0)
+ this.append(stream, to_copy.len, allocator) catch @panic("Out of memory while copying request body");
+ resume this.pending.frame;
+ return;
+ }
- this.remain -|= copied;
- this.offset +|= copied;
- @memcpy(buffer.ptr, temporary.ptr, temporary.len);
- if (this.remain == 0) {
- return .{ .into_array_and_done = .{ .value = array, .len = copied } };
+ this.append(stream, 0, allocator) catch @panic("Out of memory while copying request body");
+ }
+
+ pub fn append(
+ this: *@This(),
+ stream: StreamResult,
+ offset: usize,
+ allocator: std.mem.Allocator,
+ ) !void {
+ const chunk = stream.slice()[offset..];
+
+ if (this.buffer.capacity == 0) {
+ switch (stream) {
+ .owned => |owned| {
+ this.buffer = owned.listManaged(allocator);
+ this.offset += offset;
+ },
+ .owned_and_done => |owned| {
+ this.buffer = owned.listManaged(allocator);
+ this.offset += offset;
+ },
+ .temporary_and_done, .temporary => {
+ this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len);
+ this.buffer.appendSliceAssumeCapacity(chunk);
+ },
+ else => unreachable,
}
+ return;
+ }
+
+ switch (stream) {
+ .temporary_and_done, .temporary => {
+ try this.buffer.appendSlice(chunk);
+ },
+ // We don't support the rest of these yet
+ else => unreachable,
+ }
+ }
- return .{ .into_array = .{ .value = array, .len = copied } };
+ pub fn setValue(this: *@This(), view: JSC.JSValue) void {
+ JSC.markBinding();
+ if (this.pending_value) |pending| {
+ pending.set(view);
+ } else {
+ this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view);
}
+ }
- pub fn onCancel(_: *ByteBlobLoader) void {}
+ pub fn parent(this: *@This()) *Source {
+ return @fieldParentPtr(Source, "context", this);
+ }
- pub fn deinit(this: *ByteBlobLoader) void {
- if (!this.done) {
+ pub fn onPull(this: *@This(), buffer: []u8, view: JSC.JSValue) StreamResult {
+ JSC.markBinding();
+ std.debug.assert(buffer.len > 0);
+
+ if (this.buffer.items.len > 0) {
+ std.debug.assert(this.value() == .zero);
+ const to_write = @minimum(
+ this.buffer.items.len - this.offset,
+ buffer.len,
+ );
+ var remaining_in_buffer = this.buffer.items[this.offset..][0..to_write];
+
+ @memcpy(buffer.ptr, this.buffer.items.ptr + this.offset, to_write);
+
+ if (this.offset + to_write == this.buffer.items.len) {
+ this.offset = 0;
+ this.buffer.items.len = 0;
+ } else {
+ this.offset += to_write;
+ }
+
+ if (this.has_received_last_chunk and remaining_in_buffer.len == 0) {
+ this.buffer.clearAndFree();
this.done = true;
- this.store.deref();
+
+ return .{
+ .into_array_and_done = .{
+ .value = view,
+ .len = @truncate(Blob.SizeType, to_write),
+ },
+ };
+ }
+
+ return .{
+ .into_array = .{
+ .value = view,
+ .len = @truncate(Blob.SizeType, to_write),
+ },
+ };
+ }
+
+ if (this.has_received_last_chunk) {
+ return .{
+ .done = void{},
+ };
+ }
+
+ this.pending_buffer = buffer;
+ this.setValue(view);
+
+ return .{
+ .pending = &this.pending,
+ };
+ }
+
+ pub fn onCancel(this: *@This()) void {
+ JSC.markBinding();
+ const view = this.value();
+ if (this.buffer.capacity > 0) this.buffer.clearAndFree();
+ this.done = true;
+ if (this.pending_value) |ref| {
+ this.pending_value = null;
+ ref.destroy(undefined);
+ }
+
+ if (view != .zero) {
+ this.pending_buffer = &.{};
+ this.pending.result = .{ .done = {} };
+ if (!this.pending.used) {
+ resume this.pending.frame;
}
+ }
+ }
+
+ pub fn deinit(this: *@This()) void {
+ JSC.markBinding();
+ if (this.buffer.capacity > 0) this.buffer.clearAndFree();
- bun.default_allocator.destroy(this);
+ if (this.pending_value) |ref| {
+ this.pending_value = null;
+ ref.destroy(undefined);
}
+ if (!this.done) {
+ this.done = true;
- pub const label = if (is_ssl) "HTTPSRequestBodyStreamer" else "HTTPRequestBodyStreamer";
- pub const Source = ReadableStreamSource(@This(), label, onStart, onPull, onCancel, deinit);
- };
-}
+ this.pending_buffer = &.{};
+ this.pending.result = .{ .done = {} };
+
+ if (!this.pending.used) {
+ resume this.pending.frame;
+ }
+ }
+
+ bun.default_allocator.destroy(this.parent());
+ }
+
+ pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit);
+};
pub const FileBlobLoader = struct {
buf: []u8 = &[_]u8{},