aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-09-26 20:04:28 -0700
committerGravatar GitHub <noreply@github.com> 2022-09-26 20:04:28 -0700
commit24a9bc23b7e1c7911cb2e146be199d940b9729e6 (patch)
tree852a75cff3950063b405ca3a0dfe22e46d0eecfb /src/bun.js/webcore
parent97c3688788a94faffb6bceb4bc6c97fb84307ceb (diff)
downloadbun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.gz
bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.zst
bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.zip
[Web Streams] Add `body` to `Response` and `Request` (#1255)
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src/bun.js/webcore')
-rw-r--r--src/bun.js/webcore/response.classes.ts2
-rw-r--r--src/bun.js/webcore/response.zig340
-rw-r--r--src/bun.js/webcore/streams.zig336
3 files changed, 556 insertions, 122 deletions
diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts
index 3cf695f74..84bad8f24 100644
--- a/src/bun.js/webcore/response.classes.ts
+++ b/src/bun.js/webcore/response.classes.ts
@@ -10,6 +10,7 @@ export default [
proto: {
text: { fn: "getText" },
json: { fn: "getJSON" },
+ body: { getter: "getBody", cache: true },
arrayBuffer: { fn: "getArrayBuffer" },
blob: { fn: "getBlob" },
clone: { fn: "doClone", length: 1 },
@@ -74,6 +75,7 @@ export default [
getter: "getURL",
cache: true,
},
+ body: { getter: "getBody", cache: true },
text: { fn: "getText" },
json: { fn: "getJSON" },
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index dd37537e3..64dfe3dc7 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -145,6 +145,18 @@ pub const Response = struct {
return JSValue.jsBoolean(this.body.value == .Used);
}
+ pub fn getBody(
+ this: *Response,
+ globalThis: *JSC.JSGlobalObject,
+ ) callconv(.C) JSValue {
+ if (this.body.value == .Used) {
+ globalThis.throw("Body already used", .{});
+ return JSValue.jsUndefined();
+ }
+
+ return this.body.value.toReadableStream(globalThis);
+ }
+
pub fn getStatusText(
this: *Response,
globalThis: *JSC.JSGlobalObject,
@@ -194,21 +206,21 @@ pub const Response = struct {
}
pub fn cloneInto(
- this: *const Response,
+ this: *Response,
new_response: *Response,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
) void {
new_response.* = Response{
.allocator = allocator,
- .body = this.body.clone(allocator, globalThis),
+ .body = this.body.clone(globalThis),
.url = allocator.dupe(u8, this.url) catch unreachable,
.status_text = allocator.dupe(u8, this.status_text) catch unreachable,
.redirected = this.redirected,
};
}
- pub fn clone(this: *const Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response {
+ pub fn clone(this: *Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response {
var new_response = allocator.create(Response) catch unreachable;
this.cloneInto(new_response, allocator, globalThis);
return new_response;
@@ -267,7 +279,7 @@ pub const Response = struct {
return default.value;
},
- .Used, .Locked, .Empty, .Error => return default.value,
+ .InternalBlob, .Used, .Locked, .Empty, .Error => return default.value,
}
}
@@ -552,7 +564,7 @@ pub const Fetch = struct {
const globalThis = this.global_this;
var ref = this.ref;
- const promise_value = ref.get(globalThis);
+ const promise_value = ref.get();
defer ref.destroy(globalThis);
if (promise_value.isEmptyOrUndefinedOrNull()) {
@@ -596,7 +608,7 @@ pub const Fetch = struct {
var allocator = this.global_this.bunVM().allocator;
const http_response = this.result.response;
var response = allocator.create(Response) catch unreachable;
- const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this);
+ const internal_blob = this.response_buffer.list.toManaged(this.response_buffer.allocator);
this.response_buffer = .{ .allocator = default_allocator, .list = .{
.items = &.{},
.capacity = 0,
@@ -613,7 +625,7 @@ pub const Fetch = struct {
.status_code = @truncate(u16, http_response.status_code),
},
.value = .{
- .Blob = blob,
+ .InternalBlob = internal_blob,
},
},
};
@@ -937,6 +949,15 @@ pub const Blob = struct {
return this.store == null;
}
+ pub fn writeFormatForSize(size: usize, writer: anytype, comptime enable_ansi_colors: bool) !void {
+ try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors));
+ try writer.print(
+ comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors),
+ .{
+ bun.fmt.size(size),
+ },
+ );
+ }
pub fn writeFormat(this: *const Blob, formatter: *JSC.Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void {
const Writer = @TypeOf(writer);
@@ -947,38 +968,34 @@ pub const Blob = struct {
return;
}
- var store = this.store.?;
- switch (store.data) {
- .file => |file| {
- try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors));
- switch (file.pathlike) {
- .path => |path| {
- try writer.print(
- comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors),
- .{
- path.slice(),
- },
- );
- },
- .fd => |fd| {
- try writer.print(
- comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors),
- .{
- fd,
- },
- );
- },
- }
- },
- .bytes => {
- try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors));
- try writer.print(
- comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors),
- .{
- bun.fmt.size(this.size),
- },
- );
- },
+ {
+ var store = this.store.?;
+ switch (store.data) {
+ .file => |file| {
+ try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors));
+ switch (file.pathlike) {
+ .path => |path| {
+ try writer.print(
+ comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors),
+ .{
+ path.slice(),
+ },
+ );
+ },
+ .fd => |fd| {
+ try writer.print(
+ comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors),
+ .{
+ fd,
+ },
+ );
+ },
+ }
+ },
+ .bytes => {
+ try writeFormatForSize(this.size, writer, enable_ansi_colors);
+ },
+ }
}
if (this.content_type.len > 0 or this.offset > 0) {
@@ -1071,7 +1088,7 @@ pub const Blob = struct {
bun.default_allocator.destroy(this);
promise.reject(globalThis, ZigString.init("Body was used after it was consumed").toErrorInstance(globalThis));
},
- .Empty, .Blob => {
+ .InternalBlob, .Empty, .Blob => {
var blob = value.use();
// TODO: this should be one promise not two!
const new_promise = writeFileWithSourceDestination(globalThis.ref(), &blob, &file_blob);
@@ -1273,7 +1290,7 @@ pub const Blob = struct {
var source_blob: Blob = brk: {
if (data.as(Response)) |response| {
switch (response.body.value) {
- .Used, .Empty, .Blob => {
+ .InternalBlob, .Used, .Empty, .Blob => {
break :brk response.body.use();
},
.Error => {
@@ -1302,7 +1319,7 @@ pub const Blob = struct {
if (data.as(Request)) |request| {
switch (request.body) {
- .Used, .Empty, .Blob => {
+ .InternalBlob, .Used, .Empty, .Blob => {
break :brk request.body.use();
},
.Error => {
@@ -3321,11 +3338,6 @@ pub const Blob = struct {
return slice_[0..@minimum(slice_.len, @as(usize, this.size))];
}
- pub fn view(this: *const Blob) []const u8 {
- if (this.size == 0 or this.store == null) return "";
- return this.store.?.sharedView()[this.offset..][0..this.size];
- }
-
pub const Lifetime = JSC.WebCore.Lifetime;
pub fn setIsASCIIFlag(this: *Blob, is_all_ascii: bool) void {
this.is_all_ascii = is_all_ascii;
@@ -3881,10 +3893,10 @@ pub const Body = struct {
return this.value.use();
}
- pub fn clone(this: Body, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) Body {
+ pub fn clone(this: *Body, globalThis: *JSGlobalObject) Body {
return Body{
.init = this.init.clone(globalThis),
- .value = this.value.clone(allocator),
+ .value = this.value.clone(globalThis),
};
}
@@ -3911,6 +3923,16 @@ pub const Body = struct {
try formatter.printComma(Writer, writer, enable_ansi_colors);
try writer.writeAll("\n");
try this.value.Blob.writeFormat(formatter, writer, enable_ansi_colors);
+ } else if (this.value == .InternalBlob) {
+ try formatter.printComma(Writer, writer, enable_ansi_colors);
+ try writer.writeAll("\n");
+ try Blob.writeFormatForSize(this.value.InternalBlob.items.len, writer, enable_ansi_colors);
+ } else if (this.value == .Locked) {
+ if (this.value.Locked.readable) |stream| {
+ try formatter.printComma(Writer, writer, enable_ansi_colors);
+ try writer.writeAll("\n");
+ formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors);
+ }
}
}
@@ -3982,6 +4004,8 @@ pub const Body = struct {
/// conditionally runs when requesting data
/// used in HTTP server to ignore request bodies unless asked for it
onPull: ?fn (ctx: *anyopaque) void = null,
+ onDrain: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null,
+
deinit: bool = false,
action: Action = Action.none,
@@ -4041,6 +4065,9 @@ pub const Body = struct {
pub const Value = union(Tag) {
Blob: Blob,
+ /// Single-use Blob
+ /// Avoids a heap allocation.
+ InternalBlob: std.ArrayList(u8),
Locked: PendingValue,
Used: void,
Empty: void,
@@ -4048,6 +4075,7 @@ pub const Body = struct {
pub const Tag = enum {
Blob,
+ InternalBlob,
Locked,
Used,
Empty,
@@ -4056,6 +4084,87 @@ pub const Body = struct {
pub const empty = Value{ .Empty = .{} };
+
+ pub fn toReadableStream(this: *Value, globalThis: *JSGlobalObject) JSValue {
+ JSC.markBinding();
+
+ switch (this.*) {
+ .Used, .Empty => {
+ return JSC.WebCore.ReadableStream.empty(globalThis);
+ },
+ .InternalBlob => |*bytes| {
+ var blob = Blob.init(bytes.toOwnedSlice(), bytes.allocator, globalThis);
+ defer blob.detach();
+ var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size);
+ this.* = .{
+ .Locked = .{
+ .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?,
+ .global = globalThis,
+ },
+ };
+ return readable;
+ },
+ .Blob => {
+ var blob = this.Blob;
+ defer blob.detach();
+ var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size);
+ readable.protect();
+ this.* = .{
+ .Locked = .{
+ .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?,
+ .global = globalThis,
+ },
+ };
+ return readable;
+ },
+ .Locked => {
+ var locked = &this.Locked;
+ if (locked.readable) |readable| {
+ return readable.value;
+ }
+ var drain_result: JSC.WebCore.DrainResult = .{
+ .estimated_size = 0,
+ };
+
+ if (locked.onDrain) |drain| {
+ locked.onDrain = null;
+ drain_result = drain(locked.task.?);
+ }
+
+ if (drain_result == .empty or drain_result == .aborted) {
+ this.* = .{ .Empty = void{} };
+ return JSC.WebCore.ReadableStream.empty(globalThis);
+ }
+
+ var reader = bun.default_allocator.create(JSC.WebCore.ByteStream.Source) catch unreachable;
+ reader.* = .{
+ .context = undefined,
+ .globalThis = globalThis,
+ };
+
+ reader.context.setup();
+
+ if (drain_result == .estimated_size) {
+ reader.context.highWaterMark = @truncate(Blob.SizeType, drain_result.estimated_size);
+ reader.context.size_hint = @truncate(Blob.SizeType, drain_result.estimated_size);
+ } else if (drain_result == .owned) {
+ reader.context.buffer = drain_result.owned.list;
+ reader.context.size_hint = @truncate(Blob.SizeType, drain_result.owned.size_hint);
+ }
+
+ locked.readable = .{
+ .ptr = .{ .Bytes = &reader.context },
+ .value = reader.toJS(globalThis),
+ };
+
+ locked.readable.?.value.protect();
+ return locked.readable.?.value;
+ },
+
+ else => unreachable,
+ }
+ }
+
pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) ?Value {
if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |readable| {
switch (readable.ptr) {
@@ -4121,13 +4230,29 @@ pub const Body = struct {
if (locked.promise) |promise| {
locked.promise = null;
- var blob = new.use();
switch (locked.action) {
.getText => {
- promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref()));
+ if (new.* == .InternalBlob) {
+ var bytes = new.InternalBlob;
+ new.* = .{ .Empty = void{} };
+ var str = ZigString.init(bytes.items).withEncoding();
+ str.mark();
+ if (str.is16Bit()) {
+ const out = str.toValueGC(global);
+ bytes.deinit();
+ promise.asPromise().?.resolve(global, out);
+ } else {
+ const out = str.toExternalValue(global);
+ promise.asPromise().?.resolve(global, out);
+ }
+ } else {
+ var blob = new.use();
+ promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref()));
+ }
},
.getJSON => {
+ var blob = new.use();
const json_value = blob.toJSON(global, .share);
blob.detach();
@@ -4138,17 +4263,25 @@ pub const Body = struct {
}
},
.getArrayBuffer => {
- promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global));
+ if (new.* == .InternalBlob) {
+ const marked = JSC.MarkedArrayBuffer.fromBytes(new.InternalBlob.items, new.InternalBlob.allocator, .ArrayBuffer);
+ new.* = .{ .Empty = void{} };
+ var object_ref = marked.toJS(global, null);
+ promise.asPromise().?.resolve(global, JSC.JSValue.c(object_ref));
+ } else {
+ var blob = new.use();
+ promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global));
+ }
},
.getBlob => {
var ptr = bun.default_allocator.create(Blob) catch unreachable;
- ptr.* = blob;
+ ptr.* = new.use();
ptr.allocator = bun.default_allocator;
promise.asPromise().?.resolve(global, ptr.toJS(global));
},
else => {
var ptr = bun.default_allocator.create(Blob) catch unreachable;
- ptr.* = blob;
+ ptr.* = new.use();
ptr.allocator = bun.default_allocator;
promise.asInternalPromise().?.resolve(global, ptr.toJS(global));
},
@@ -4160,6 +4293,7 @@ pub const Body = struct {
pub fn slice(this: Value) []const u8 {
return switch (this) {
.Blob => this.Blob.sharedView(),
+ .InternalBlob => |list| list.items,
else => "",
};
}
@@ -4172,6 +4306,15 @@ pub const Body = struct {
this.* = .{ .Used = .{} };
return new_blob;
},
+ .InternalBlob => {
+ var new_blob = Blob.init(
+ this.InternalBlob.toOwnedSlice(),
+ this.InternalBlob.allocator,
+ JSC.VirtualMachine.vm.global,
+ );
+ this.* = .{ .Used = .{} };
+ return new_blob;
+ },
else => {
return Blob.initEmpty(undefined);
},
@@ -4228,14 +4371,22 @@ pub const Body = struct {
pub fn deinit(this: *Value) void {
const tag = @as(Tag, this.*);
if (tag == .Locked) {
- if (this.Locked.readable) |*readable| {
- readable.done();
+ if (!this.Locked.deinit) {
+ this.Locked.deinit = true;
+
+ if (this.Locked.readable) |*readable| {
+ readable.done();
+ }
}
- this.Locked.deinit = true;
return;
}
+ if (tag == .InternalBlob) {
+ this.InternalBlob.clearAndFree();
+ this.* = Value.empty;
+ }
+
if (tag == .Blob) {
this.Blob.deinit();
this.* = Value.empty;
@@ -4246,8 +4397,18 @@ pub const Body = struct {
}
}
- pub fn clone(this: Value, _: std.mem.Allocator) Value {
- if (this == .Blob) {
+ pub fn clone(this: *Value, globalThis: *JSC.JSGlobalObject) Value {
+ if (this.* == .InternalBlob) {
+ this.* = .{
+ .Blob = Blob.init(
+ this.InternalBlob.toOwnedSlice(),
+ this.InternalBlob.allocator,
+ globalThis,
+ ),
+ };
+ }
+
+ if (this.* == .Blob) {
return Value{ .Blob = this.Blob.dupe() };
}
@@ -4319,10 +4480,16 @@ pub const Body = struct {
} else |_| {}
}
+ if (value.isUndefined()) {
+ body.value = Value.empty;
+ return body;
+ }
+
body.value = Value.fromJS(globalThis, value) orelse return null;
if (body.value == .Blob)
std.debug.assert(body.value.Blob.allocator == null); // owned by Body
+
return body;
}
};
@@ -4358,7 +4525,18 @@ pub const Request = struct {
try writer.writeAll("\"");
if (this.body == .Blob) {
try writer.writeAll("\n");
+ try formatter.writeIndent(Writer, writer);
try this.body.Blob.writeFormat(formatter, writer, enable_ansi_colors);
+ } else if (this.body == .InternalBlob) {
+ try writer.writeAll("\n");
+ try formatter.writeIndent(Writer, writer);
+ try Blob.writeFormatForSize(this.body.InternalBlob.items.len, writer, enable_ansi_colors);
+ } else if (this.body == .Locked) {
+ if (this.body.Locked.readable) |stream| {
+ try writer.writeAll("\n");
+ try formatter.writeIndent(Writer, writer);
+ formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors);
+ }
}
}
try writer.writeAll("\n");
@@ -4392,7 +4570,7 @@ pub const Request = struct {
return MimeType.other.value;
},
- .Error, .Used, .Locked, .Empty => return MimeType.other.value,
+ .InternalBlob, .Error, .Used, .Locked, .Empty => return MimeType.other.value,
}
}
@@ -4415,6 +4593,18 @@ pub const Request = struct {
return ZigString.init("").toValueGC(globalThis);
}
+ pub fn getBody(
+ this: *Request,
+ globalThis: *JSC.JSGlobalObject,
+ ) callconv(.C) JSValue {
+ if (this.body == .Used) {
+ globalThis.throw("Body already used", .{});
+ return JSValue.jsUndefined();
+ }
+
+ return this.body.toReadableStream(globalThis);
+ }
+
pub fn getIntegrity(
_: *Request,
globalThis: *JSC.JSGlobalObject,
@@ -4521,17 +4711,9 @@ pub const Request = struct {
}
if (urlOrObject.fastGet(globalThis, .body)) |body_| {
- if (Blob.get(globalThis, body_, true, false)) |blob| {
- if (blob.size > 0) {
- request.body = Body.Value{ .Blob = blob };
- }
- } else |err| {
- if (err == error.InvalidArguments) {
- globalThis.throwInvalidArguments("Expected an Array", .{});
- return null;
- }
-
- globalThis.throwInvalidArguments("Invalid Body object", .{});
+ if (Body.Value.fromJS(globalThis, body_)) |body| {
+ request.body = body.value;
+ } else {
return null;
}
}
@@ -4546,17 +4728,9 @@ pub const Request = struct {
}
if (arguments[1].fastGet(globalThis, .body)) |body_| {
- if (Blob.get(globalThis, body_, true, false)) |blob| {
- if (blob.size > 0) {
- request.body = Body.Value{ .Blob = blob };
- }
- } else |err| {
- if (err == error.InvalidArguments) {
- globalThis.throwInvalidArguments("Expected an Array", .{});
- return null;
- }
-
- globalThis.throwInvalidArguments("Invalid Body object", .{});
+ if (Body.Value.fromJS(globalThis, body_)) |body| {
+ request.body = body.value;
+ } else {
return null;
}
}
@@ -4614,7 +4788,7 @@ pub const Request = struct {
globalThis: *JSGlobalObject,
) void {
req.* = Request{
- .body = this.body.clone(allocator),
+ .body = this.body.clone(globalThis),
.url = ZigString.init(allocator.dupe(u8, this.url.slice()) catch unreachable),
.method = this.method,
};
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 23aad70ec..681ff6172 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -70,7 +70,7 @@ pub const ReadableStream = struct {
pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding();
this.value.unprotect();
- ReadableStream__abort(this.value, globalThis);
+ ReadableStream__cancel(this.value, globalThis);
}
pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
@@ -98,6 +98,8 @@ pub const ReadableStream = struct {
/// This is a direct readable stream
/// That means we can turn it into whatever we want
Direct = 3,
+
+ Bytes = 4,
};
pub const Source = union(Tag) {
Invalid: void,
@@ -116,6 +118,8 @@ pub const ReadableStream = struct {
/// This is a direct readable stream
/// That means we can turn it into whatever we want
Direct: void,
+
+ Bytes: *ByteStream,
};
extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag;
@@ -165,6 +169,13 @@ pub const ReadableStream = struct {
},
},
+ .Bytes => ReadableStream{
+ .value = value,
+ .ptr = .{
+ .Bytes = ptr.asPtr(ByteStream),
+ },
+ },
+
// .HTTPRequest => ReadableStream{
// .value = value,
// .ptr = .{
@@ -184,6 +195,7 @@ pub const ReadableStream = struct {
extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue;
pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue {
+ JSC.markBinding();
return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id)));
}
pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
@@ -402,6 +414,16 @@ pub const StreamStart = union(Tag) {
}
};
+pub const DrainResult = union(enum) {
+ owned: struct {
+ list: std.ArrayList(u8),
+ size_hint: usize,
+ },
+ estimated_size: usize,
+ empty: void,
+ aborted: void,
+};
+
pub const StreamResult = union(Tag) {
owned: bun.ByteList,
owned_and_done: bun.ByteList,
@@ -2660,65 +2682,301 @@ pub const ByteBlobLoader = struct {
pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit);
};
-pub fn RequestBodyStreamer(
- comptime is_ssl: bool,
-) type {
- return struct {
- response: *uws.NewApp(is_ssl).Response,
+pub const ByteStream = struct {
+ buffer: std.ArrayList(u8) = .{
+ .allocator = bun.default_allocator,
+ .items = &.{},
+ .capacity = 0,
+ },
+ has_received_last_chunk: bool = false,
+ pending: StreamResult.Pending = StreamResult.Pending{
+ .frame = undefined,
+ .used = false,
+ .result = .{ .done = {} },
+ },
+ done: bool = false,
+ pending_buffer: []u8 = &.{},
+ pending_value: ?*JSC.napi.Ref = null,
+ offset: usize = 0,
+ highWaterMark: Blob.SizeType = 0,
+ pipe: Pipe = .{},
+ size_hint: Blob.SizeType = 0,
+
+ pub const Pipe = struct {
+ ctx: ?*anyopaque = null,
+ onPipe: ?PipeFunction = null,
+
+ pub fn New(comptime Type: type, comptime Function: anytype) type {
+ return struct {
+ pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void {
+ Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator);
+ }
- pub const tag = if (is_ssl)
- ReadableStream.Tag.HTTPRequest
- else if (is_ssl)
- ReadableStream.Tag.HTTPSRequest;
+ pub fn init(self: *Type) Pipe {
+ return Pipe{
+ .ctx = self,
+ .onPipe = pipe,
+ };
+ }
+ };
+ }
+ };
- pub fn onStart(this: *ByteBlobLoader) StreamStart {
- return .{ .chunk_size = this.chunk_size };
+ pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
+
+ pub const tag = ReadableStream.Tag.Bytes;
+
+ pub fn setup(this: *ByteStream) void {
+ this.* = .{};
+ }
+
+ pub fn onStart(this: *@This()) StreamStart {
+ if (this.has_received_last_chunk and this.buffer.items.len == 0) {
+ return .{ .empty = void{} };
}
- pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult {
- array.ensureStillAlive();
- defer array.ensureStillAlive();
- if (this.done) {
- return .{ .done = {} };
+ if (this.has_received_last_chunk) {
+ return .{ .chunk_size = @truncate(Blob.SizeType, @minimum(1024 * 1024 * 2, this.buffer.items.len)) };
+ }
+
+ if (this.highWaterMark == 0) {
+ return .{ .ready = void{} };
+ }
+
+ return .{ .chunk_size = @maximum(this.highWaterMark, std.mem.page_size) };
+ }
+
+ pub fn value(this: *@This()) JSValue {
+ if (this.pending_value == null)
+ return .zero;
+
+ const result = this.pending_value.?.get();
+ this.pending_value.?.set(.zero);
+
+ return result;
+ }
+
+ pub fn isCancelled(this: *const @This()) bool {
+ return @fieldParentPtr(Source, "context", this).cancelled;
+ }
+
+ pub fn unpipe(this: *@This()) void {
+ this.pipe.ctx = null;
+ this.pipe.onPipe = null;
+ if (!this.parent().deinited) {
+ this.parent().deinited = true;
+ bun.default_allocator.destroy(this.parent());
+ }
+ }
+
+ pub fn onData(
+ this: *@This(),
+ stream: StreamResult,
+ allocator: std.mem.Allocator,
+ ) void {
+ JSC.markBinding();
+ if (this.done) {
+ if (stream.isDone() and (stream == .owned or stream == .owned_and_done)) {
+ if (stream == .owned) allocator.free(stream.owned.slice());
+ if (stream == .owned_and_done) allocator.free(stream.owned_and_done.slice());
}
- var temporary = this.store.sharedView();
- temporary = temporary[this.offset..];
+ return;
+ }
+
+ std.debug.assert(!this.has_received_last_chunk);
+ this.has_received_last_chunk = stream.isDone();
+
+ if (this.pipe.ctx != null) {
+ this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator);
+ return;
+ }
+
+ const chunk = stream.slice();
+
+ if (!this.pending.used) {
+ std.debug.assert(this.buffer.items.len == 0);
+ var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)];
+ const pending_buffer_len = this.pending_buffer.len;
+ @memcpy(to_copy.ptr, chunk.ptr, to_copy.len);
+ this.pending_buffer = &.{};
+
+ const is_really_done = this.has_received_last_chunk and to_copy.len <= pending_buffer_len;
- temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))];
- if (temporary.len == 0) {
- this.store.deref();
+ if (is_really_done) {
this.done = true;
- return .{ .done = {} };
+ this.pending.result = .{
+ .into_array_and_done = .{
+ .value = this.value(),
+ .len = @truncate(Blob.SizeType, to_copy.len),
+ },
+ };
+ } else {
+ this.pending.result = .{
+ .into_array = .{
+ .value = this.value(),
+ .len = @truncate(Blob.SizeType, to_copy.len),
+ },
+ };
}
- const copied = @intCast(Blob.SizeType, temporary.len);
+ const remaining = chunk[to_copy.len..];
+ if (remaining.len > 0)
+ this.append(stream, to_copy.len, allocator) catch @panic("Out of memory while copying request body");
+ resume this.pending.frame;
+ return;
+ }
- this.remain -|= copied;
- this.offset +|= copied;
- @memcpy(buffer.ptr, temporary.ptr, temporary.len);
- if (this.remain == 0) {
- return .{ .into_array_and_done = .{ .value = array, .len = copied } };
+ this.append(stream, 0, allocator) catch @panic("Out of memory while copying request body");
+ }
+
+ pub fn append(
+ this: *@This(),
+ stream: StreamResult,
+ offset: usize,
+ allocator: std.mem.Allocator,
+ ) !void {
+ const chunk = stream.slice()[offset..];
+
+ if (this.buffer.capacity == 0) {
+ switch (stream) {
+ .owned => |owned| {
+ this.buffer = owned.listManaged(allocator);
+ this.offset += offset;
+ },
+ .owned_and_done => |owned| {
+ this.buffer = owned.listManaged(allocator);
+ this.offset += offset;
+ },
+ .temporary_and_done, .temporary => {
+ this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len);
+ this.buffer.appendSliceAssumeCapacity(chunk);
+ },
+ else => unreachable,
}
+ return;
+ }
+
+ switch (stream) {
+ .temporary_and_done, .temporary => {
+ try this.buffer.appendSlice(chunk);
+ },
+ // We don't support the rest of these yet
+ else => unreachable,
+ }
+ }
- return .{ .into_array = .{ .value = array, .len = copied } };
+ pub fn setValue(this: *@This(), view: JSC.JSValue) void {
+ JSC.markBinding();
+ if (this.pending_value) |pending| {
+ pending.set(view);
+ } else {
+ this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view);
}
+ }
- pub fn onCancel(_: *ByteBlobLoader) void {}
+ pub fn parent(this: *@This()) *Source {
+ return @fieldParentPtr(Source, "context", this);
+ }
- pub fn deinit(this: *ByteBlobLoader) void {
- if (!this.done) {
+ pub fn onPull(this: *@This(), buffer: []u8, view: JSC.JSValue) StreamResult {
+ JSC.markBinding();
+ std.debug.assert(buffer.len > 0);
+
+ if (this.buffer.items.len > 0) {
+ std.debug.assert(this.value() == .zero);
+ const to_write = @minimum(
+ this.buffer.items.len - this.offset,
+ buffer.len,
+ );
+ var remaining_in_buffer = this.buffer.items[this.offset..][0..to_write];
+
+ @memcpy(buffer.ptr, this.buffer.items.ptr + this.offset, to_write);
+
+ if (this.offset + to_write == this.buffer.items.len) {
+ this.offset = 0;
+ this.buffer.items.len = 0;
+ } else {
+ this.offset += to_write;
+ }
+
+ if (this.has_received_last_chunk and remaining_in_buffer.len == 0) {
+ this.buffer.clearAndFree();
this.done = true;
- this.store.deref();
+
+ return .{
+ .into_array_and_done = .{
+ .value = view,
+ .len = @truncate(Blob.SizeType, to_write),
+ },
+ };
+ }
+
+ return .{
+ .into_array = .{
+ .value = view,
+ .len = @truncate(Blob.SizeType, to_write),
+ },
+ };
+ }
+
+ if (this.has_received_last_chunk) {
+ return .{
+ .done = void{},
+ };
+ }
+
+ this.pending_buffer = buffer;
+ this.setValue(view);
+
+ return .{
+ .pending = &this.pending,
+ };
+ }
+
+ pub fn onCancel(this: *@This()) void {
+ JSC.markBinding();
+ const view = this.value();
+ if (this.buffer.capacity > 0) this.buffer.clearAndFree();
+ this.done = true;
+ if (this.pending_value) |ref| {
+ this.pending_value = null;
+ ref.destroy(undefined);
+ }
+
+ if (view != .zero) {
+ this.pending_buffer = &.{};
+ this.pending.result = .{ .done = {} };
+ if (!this.pending.used) {
+ resume this.pending.frame;
}
+ }
+ }
+
+ pub fn deinit(this: *@This()) void {
+ JSC.markBinding();
+ if (this.buffer.capacity > 0) this.buffer.clearAndFree();
- bun.default_allocator.destroy(this);
+ if (this.pending_value) |ref| {
+ this.pending_value = null;
+ ref.destroy(undefined);
}
+ if (!this.done) {
+ this.done = true;
- pub const label = if (is_ssl) "HTTPSRequestBodyStreamer" else "HTTPRequestBodyStreamer";
- pub const Source = ReadableStreamSource(@This(), label, onStart, onPull, onCancel, deinit);
- };
-}
+ this.pending_buffer = &.{};
+ this.pending.result = .{ .done = {} };
+
+ if (!this.pending.used) {
+ resume this.pending.frame;
+ }
+ }
+
+ bun.default_allocator.destroy(this.parent());
+ }
+
+ pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit);
+};
pub const FileBlobLoader = struct {
buf: []u8 = &[_]u8{},