aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-09-26 20:04:28 -0700
committerGravatar GitHub <noreply@github.com> 2022-09-26 20:04:28 -0700
commit24a9bc23b7e1c7911cb2e146be199d940b9729e6 (patch)
tree852a75cff3950063b405ca3a0dfe22e46d0eecfb /src/bun.js
parent97c3688788a94faffb6bceb4bc6c97fb84307ceb (diff)
downloadbun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.gz
bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.zst
bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.zip
[Web Streams] Add `body` to `Response` and `Request` (#1255)
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src/bun.js')
-rw-r--r--src/bun.js/api/server.zig293
-rw-r--r--src/bun.js/bindings/ZigGeneratedClasses.cpp52
-rw-r--r--src/bun.js/bindings/ZigGeneratedClasses.h6
-rw-r--r--src/bun.js/bindings/exports.zig1
-rw-r--r--src/bun.js/bindings/generated_classes.zig8
-rw-r--r--src/bun.js/bindings/headers-cpp.h2
-rw-r--r--src/bun.js/bindings/headers.h8
-rw-r--r--src/bun.js/bindings/napi.cpp16
-rw-r--r--src/bun.js/webcore/response.classes.ts2
-rw-r--r--src/bun.js/webcore/response.zig340
-rw-r--r--src/bun.js/webcore/streams.zig336
11 files changed, 914 insertions, 150 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index d7cfbe4c1..939e010f3 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -538,7 +538,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
sendfile: SendfileContext = undefined,
request_js_object: JSC.C.JSObjectRef = null,
request_body_buf: std.ArrayListUnmanaged(u8) = .{},
+ request_body_content_len: usize = 0,
sink: ?*ResponseStream.JSSink = null,
+ byte_stream: ?*JSC.WebCore.ByteStream = null,
has_written_status: bool = false,
@@ -696,20 +698,67 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
this.response_buf_owned = std.ArrayListUnmanaged(u8){ .items = bb.items, .capacity = bb.capacity };
- this.renderResponseBuffer();
+ this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
}
pub fn renderResponseBuffer(this: *RequestContext) void {
this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this);
}
- pub fn onWritableResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool {
+ /// Render a complete response buffer
+ pub fn renderResponseBufferAndMetadata(this: *RequestContext) void {
+ this.renderMetadata();
+
+ if (!this.resp.tryEnd(
+ this.response_buf_owned.items,
+ this.response_buf_owned.items.len,
+ )) {
+ this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
+ this.setAbortHandler();
+ return;
+ }
+
+ this.finalize();
+ }
+
+ /// Drain a partial response buffer
+ pub fn drainResponseBufferAndMetadata(this: *RequestContext) void {
+ this.renderMetadata();
+ this.setAbortHandler();
+
+ _ = this.resp.write(
+ this.response_buf_owned.items,
+ );
+
+ this.response_buf_owned.items.len = 0;
+ }
+
+ pub fn renderResponseBufferAndMetadataCorked(this: *RequestContext) void {
+ this.resp.runCorkedWithType(*RequestContext, renderResponseBufferAndMetadata, this);
+ }
+
+ pub fn drainResponseBufferAndMetadataCorked(this: *RequestContext) void {
+ this.resp.runCorkedWithType(*RequestContext, drainResponseBufferAndMetadata, this);
+ }
+
+ pub fn onWritableResponseBuffer(this: *RequestContext, _: c_ulong, resp: *App.Response) callconv(.C) bool {
+ std.debug.assert(this.resp == resp);
+ if (this.aborted) {
+ this.finalizeForAbort();
+ return false;
+ }
+ resp.end("", false);
+ this.finalize();
+ return false;
+ }
+
+ pub fn onWritableCompleteResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool {
std.debug.assert(this.resp == resp);
if (this.aborted) {
this.finalizeForAbort();
return false;
}
- return this.sendWritableBytes(this.response_buf_owned.items, write_offset, resp);
+ return this.sendWritableBytesForCompleteResponseBuffer(this.response_buf_owned.items, write_offset, resp);
}
pub fn create(this: *RequestContext, server: *ThisServer, req: *uws.Request, resp: *App.Response) void {
@@ -772,11 +821,24 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (req.body == .Locked and (req.body.Locked.action != .none or req.body.Locked.promise != null)) {
this.pending_promises_for_abort += 1;
req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis);
+ } else if (req.body == .Locked and (req.body.Locked.readable != null)) {
+ req.body.Locked.readable.?.abort(this.server.globalThis);
+ req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis);
+ req.body.Locked.readable = null;
}
req.uws_request = null;
}
}
+ if (this.response_ptr) |response| {
+ if (response.body.value == .Locked) {
+ if (response.body.value.Locked.readable) |*readable| {
+ response.body.value.Locked.readable = null;
+ readable.abort(this.server.globalThis);
+ }
+ }
+ }
+
// then, we reject the response promise
if (this.promise) |promise| {
this.pending_promises_for_abort += 1;
@@ -841,6 +903,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
JSC.C.JSValueUnprotect(this.server.globalThis.ref(), promise.asObjectRef());
}
+
+ if (this.byte_stream) |stream| {
+ this.byte_stream = null;
+ stream.unpipe();
+ }
}
pub fn finalize(this: *RequestContext) void {
this.finalizeWithoutDeinit();
@@ -974,10 +1041,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
var bytes = this.blob.sharedView();
- return this.sendWritableBytes(bytes, write_offset, resp);
+ _ = this.sendWritableBytesForBlob(bytes, write_offset, resp);
+ return true;
}
- pub fn sendWritableBytes(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool {
+ pub fn sendWritableBytesForBlob(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool {
std.debug.assert(this.resp == resp);
var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..];
@@ -990,6 +1058,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
+ pub fn sendWritableBytesForCompleteResponseBuffer(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool {
+ std.debug.assert(this.resp == resp);
+
+ var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..];
+ if (resp.tryEnd(bytes, bytes_.len)) {
+ this.response_buf_owned.items.len = 0;
+ this.finalize();
+ } else {
+ this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
+ }
+
+ return true;
+ }
+
pub fn onWritableSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool {
return this.onSendfile();
}
@@ -1125,7 +1207,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
} else {
this.blob.size = @truncate(Blob.SizeType, result.result.buf.len);
this.response_buf_owned = .{ .items = result.result.buf, .capacity = result.result.buf.len };
- this.renderResponseBuffer();
+ this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
}
}
@@ -1481,6 +1563,41 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.resp.runCorkedWithType(*StreamPair, doRenderStream, &pair);
return;
},
+
+ .Bytes => |byte_stream| {
+ std.debug.assert(byte_stream.pipe.ctx == null);
+ std.debug.assert(this.byte_stream == null);
+
+ stream.detach(this.server.globalThis);
+
+ this.response_buf_owned = byte_stream.buffer.moveToUnmanaged();
+
+ // If we've received the complete body by the time this function is called
+ // we can avoid streaming it and just send it all at once.
+ if (byte_stream.has_received_last_chunk) {
+ this.blob.size = @truncate(Blob.SizeType, this.response_buf_owned.items.len);
+ byte_stream.parent().deinit();
+ this.renderResponseBufferAndMetadataCorked();
+ return;
+ }
+
+ byte_stream.pipe = JSC.WebCore.ByteStream.Pipe.New(@This(), onPipe).init(this);
+ this.byte_stream = byte_stream;
+
+ // we don't set size here because even if we have a hint
+ // uWebSockets won't let us partially write streaming content
+ this.blob.size = 0;
+
+ // if we've received metadata and part of the body, send everything we can and drain
+ if (this.response_buf_owned.items.len > 0) {
+ this.drainResponseBufferAndMetadataCorked();
+ } else {
+ // if we only have metadata to send, send it now
+ this.resp.runCorkedWithType(*RequestContext, renderMetadata, this);
+ }
+ this.setAbortHandler();
+ return;
+ },
}
}
@@ -1496,6 +1613,42 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.doRenderBlob();
}
+ pub fn onPipe(this: *RequestContext, stream: JSC.WebCore.StreamResult, allocator: std.mem.Allocator) void {
+ var stream_needs_deinit = stream == .owned or stream == .owned_and_done;
+
+ defer {
+ if (stream_needs_deinit) {
+ if (stream.isDone()) {
+ stream.owned_and_done.listManaged(allocator).deinit();
+ } else {
+ stream.owned.listManaged(allocator).deinit();
+ }
+ }
+ }
+
+ if (this.aborted) {
+ this.finalizeForAbort();
+ return;
+ }
+
+ const chunk = stream.slice();
+ // on failure, it will continue to allocate
+ // we can't do buffering ourselves here or it won't work
+ // uSockets will append and manage the buffer
+ // so any write will buffer if the write fails
+ if (this.resp.write(chunk)) {
+ if (stream.isDone()) {
+ this.resp.endStream(false);
+ this.finalize();
+ }
+ } else {
+ // when it's the last one, we just want to know if it's done
+ if (stream.isDone()) {
+ this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this);
+ }
+ }
+ }
+
pub fn doRenderBlob(this: *RequestContext) void {
// We are not corked
// The body is small
@@ -1712,34 +1865,123 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.aborted) return;
- this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body");
+ const request = JSC.JSValue.fromRef(this.request_js_object);
+ var req = request.as(Request) orelse {
+ this.request_body_buf.clearAndFree(this.allocator);
+ return;
+ };
+
+ if (req.body == .Locked) {
+ if (req.body.Locked.readable) |readable| {
+ if (readable.ptr == .Bytes) {
+ std.debug.assert(this.request_body_buf.items.len == 0);
+
+ if (!last) {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary = bun.ByteList.init(chunk),
+ },
+ bun.default_allocator,
+ );
+ } else {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary_and_done = bun.ByteList.init(chunk),
+ },
+ bun.default_allocator,
+ );
+ }
+
+ return;
+ }
+ }
+ }
+
if (last) {
- const request = JSC.JSValue.fromRef(this.request_js_object);
- if (request.as(Request)) |req| {
- request.ensureStillAlive();
- var bytes = this.request_body_buf.toOwnedSlice(this.allocator);
- var old = req.body;
- req.body = .{
- .Blob = if (bytes.len > 0)
- Blob.init(bytes, this.allocator, this.server.globalThis)
- else
- Blob.initEmpty(this.server.globalThis),
- };
- if (old == .Locked)
- old.resolve(&req.body, this.server.globalThis);
- request.unprotect();
+ request.ensureStillAlive();
+ var bytes = this.request_body_buf;
+ var old = req.body;
+ if (bytes.items.len == 0) {
+ req.body = .{ .Empty = {} };
} else {
- this.request_body_buf.clearAndFree(this.allocator);
+ req.body = .{ .InternalBlob = bytes.toManaged(this.allocator) };
}
+
+ if (old == .Locked)
+ old.resolve(&req.body, this.server.globalThis);
+ request.unprotect();
}
+
+ if (this.request_body_buf.capacity == 0) {
+ this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, @minimum(this.request_body_content_len, max_request_body_preallocate_length)) catch @panic("Out of memory while allocating request body buffer");
+ }
+
+ this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body");
}
+ pub fn onDrainRequestBody(this: *RequestContext) JSC.WebCore.DrainResult {
+ if (this.aborted) {
+ return JSC.WebCore.DrainResult{
+ .aborted = void{},
+ };
+ }
+
+ std.debug.assert(!this.resp.hasResponded());
+
+ // This means we have received part of the body but not the whole thing
+ if (this.request_body_buf.items.len > 0) {
+ var emptied = this.request_body_buf;
+ this.request_body_buf = .{};
+ return .{
+ .owned = .{
+ .list = emptied.toManaged(this.allocator),
+ .size_hint = if (emptied.capacity < max_request_body_preallocate_length)
+ emptied.capacity
+ else
+ 0,
+ },
+ };
+ }
+
+ const content_length = this.req.header("content-length") orelse {
+ return .{
+ .empty = void{},
+ };
+ };
+
+ const len = std.fmt.parseInt(usize, content_length, 10) catch 0;
+ this.request_body_content_len = len;
+
+ if (len == 0) {
+ return JSC.WebCore.DrainResult{
+ .empty = void{},
+ };
+ }
+
+ if (len > this.server.config.max_request_body_size) {
+ this.resp.writeStatus("413 Request Entity Too Large");
+ this.resp.endWithoutBody();
+
+ this.finalize();
+ return JSC.WebCore.DrainResult{
+ .aborted = void{},
+ };
+ }
+
+ this.resp.onData(*RequestContext, onBufferedBodyChunk, this);
+
+ return .{
+ .estimated_size = len,
+ };
+ }
+ const max_request_body_preallocate_length = 1024 * 256;
pub fn onPull(this: *RequestContext) void {
const request = JSC.JSValue.c(this.request_js_object);
request.ensureStillAlive();
if (this.req.header("content-length")) |content_length| {
const len = std.fmt.parseInt(usize, content_length, 10) catch 0;
+ this.request_body_content_len = len;
if (len == 0) {
if (request.as(Request)) |req| {
var old = req.body;
@@ -1766,8 +2008,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.finalize();
return;
}
-
- this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, len) catch @panic("Out of memory while allocating request body buffer");
} else if (this.req.header("transfer-encoding") == null) {
// no content-length
// no transfer-encoding
@@ -1789,6 +2029,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
onPull(bun.cast(*RequestContext, this));
}
+ pub fn onDrainRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult {
+ return onDrainRequestBody(bun.cast(*RequestContext, this));
+ }
+
pub const Export = shim.exportFunctions(.{
.onResolve = onResolve,
.onReject = onReject,
@@ -2243,6 +2487,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
.task = ctx,
.global = this.globalThis,
.onPull = RequestContext.onPullCallback,
+ .onDrain = RequestContext.onDrainRequestBodyCallback,
},
},
};
diff --git a/src/bun.js/bindings/ZigGeneratedClasses.cpp b/src/bun.js/bindings/ZigGeneratedClasses.cpp
index 78f1bb179..925814c2d 100644
--- a/src/bun.js/bindings/ZigGeneratedClasses.cpp
+++ b/src/bun.js/bindings/ZigGeneratedClasses.cpp
@@ -2505,6 +2505,10 @@ extern "C" EncodedJSValue RequestPrototype__getBlob(void* ptr, JSC::JSGlobalObje
JSC_DECLARE_HOST_FUNCTION(RequestPrototype__blobCallback);
+extern "C" JSC::EncodedJSValue RequestPrototype__getBody(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject);
+JSC_DECLARE_CUSTOM_GETTER(RequestPrototype__bodyGetterWrap);
+
+
extern "C" JSC::EncodedJSValue RequestPrototype__getBodyUsed(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject);
JSC_DECLARE_CUSTOM_GETTER(RequestPrototype__bodyUsedGetterWrap);
@@ -2571,6 +2575,7 @@ STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSRequestPrototype, JSRequestPrototype::Base
static const HashTableValue JSRequestPrototypeTableValues[] = {
{ "arrayBuffer"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__arrayBufferCallback, 0 } } ,
{ "blob"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__blobCallback, 0 } } ,
+{ "body"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__bodyGetterWrap, 0 } } ,
{ "bodyUsed"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__bodyUsedGetterWrap, 0 } } ,
{ "cache"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__cacheGetterWrap, 0 } } ,
{ "clone"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__cloneCallback, 1 } } ,
@@ -2641,6 +2646,26 @@ JSC_DEFINE_HOST_FUNCTION(RequestPrototype__blobCallback, (JSGlobalObject * lexic
}
+JSC_DEFINE_CUSTOM_GETTER(RequestPrototype__bodyGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName))
+{
+ auto& vm = lexicalGlobalObject->vm();
+ Zig::GlobalObject *globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject);
+ auto throwScope = DECLARE_THROW_SCOPE(vm);
+ JSRequest* thisObject = jsCast<JSRequest*>(JSValue::decode(thisValue));
+ JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject);
+
+ if (JSValue cachedValue = thisObject->m_body.get())
+ return JSValue::encode(cachedValue);
+
+ JSC::JSValue result = JSC::JSValue::decode(
+ RequestPrototype__getBody(thisObject->wrapped(), globalObject)
+ );
+ RETURN_IF_EXCEPTION(throwScope, {});
+ thisObject->m_body.set(vm, thisObject, result);
+ RELEASE_AND_RETURN(throwScope, JSValue::encode(result));
+}
+
+
JSC_DEFINE_CUSTOM_GETTER(RequestPrototype__bodyUsedGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName))
{
auto& vm = lexicalGlobalObject->vm();
@@ -3002,6 +3027,7 @@ void JSRequest::visitChildrenImpl(JSCell* cell, Visitor& visitor)
JSRequest* thisObject = jsCast<JSRequest*>(cell);
ASSERT_GC_OBJECT_INHERITS(thisObject, info());
Base::visitChildren(thisObject, visitor);
+ visitor.append(thisObject->m_body);
visitor.append(thisObject->m_headers);
visitor.append(thisObject->m_url);
}
@@ -3018,6 +3044,10 @@ extern "C" EncodedJSValue ResponsePrototype__getBlob(void* ptr, JSC::JSGlobalObj
JSC_DECLARE_HOST_FUNCTION(ResponsePrototype__blobCallback);
+extern "C" JSC::EncodedJSValue ResponsePrototype__getBody(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject);
+JSC_DECLARE_CUSTOM_GETTER(ResponsePrototype__bodyGetterWrap);
+
+
extern "C" JSC::EncodedJSValue ResponsePrototype__getBodyUsed(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject);
JSC_DECLARE_CUSTOM_GETTER(ResponsePrototype__bodyUsedGetterWrap);
@@ -3064,6 +3094,7 @@ STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSResponsePrototype, JSResponsePrototype::Ba
static const HashTableValue JSResponsePrototypeTableValues[] = {
{ "arrayBuffer"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__arrayBufferCallback, 0 } } ,
{ "blob"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__blobCallback, 0 } } ,
+{ "body"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__bodyGetterWrap, 0 } } ,
{ "bodyUsed"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__bodyUsedGetterWrap, 0 } } ,
{ "clone"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__cloneCallback, 1 } } ,
{ "headers"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__headersGetterWrap, 0 } } ,
@@ -3129,6 +3160,26 @@ JSC_DEFINE_HOST_FUNCTION(ResponsePrototype__blobCallback, (JSGlobalObject * lexi
}
+JSC_DEFINE_CUSTOM_GETTER(ResponsePrototype__bodyGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName))
+{
+ auto& vm = lexicalGlobalObject->vm();
+ Zig::GlobalObject *globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject);
+ auto throwScope = DECLARE_THROW_SCOPE(vm);
+ JSResponse* thisObject = jsCast<JSResponse*>(JSValue::decode(thisValue));
+ JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject);
+
+ if (JSValue cachedValue = thisObject->m_body.get())
+ return JSValue::encode(cachedValue);
+
+ JSC::JSValue result = JSC::JSValue::decode(
+ ResponsePrototype__getBody(thisObject->wrapped(), globalObject)
+ );
+ RETURN_IF_EXCEPTION(throwScope, {});
+ thisObject->m_body.set(vm, thisObject, result);
+ RELEASE_AND_RETURN(throwScope, JSValue::encode(result));
+}
+
+
JSC_DEFINE_CUSTOM_GETTER(ResponsePrototype__bodyUsedGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName))
{
auto& vm = lexicalGlobalObject->vm();
@@ -3443,6 +3494,7 @@ void JSResponse::visitChildrenImpl(JSCell* cell, Visitor& visitor)
JSResponse* thisObject = jsCast<JSResponse*>(cell);
ASSERT_GC_OBJECT_INHERITS(thisObject, info());
Base::visitChildren(thisObject, visitor);
+ visitor.append(thisObject->m_body);
visitor.append(thisObject->m_headers);
visitor.append(thisObject->m_statusText);
visitor.append(thisObject->m_url);
diff --git a/src/bun.js/bindings/ZigGeneratedClasses.h b/src/bun.js/bindings/ZigGeneratedClasses.h
index feae81ae0..ad85dd3f9 100644
--- a/src/bun.js/bindings/ZigGeneratedClasses.h
+++ b/src/bun.js/bindings/ZigGeneratedClasses.h
@@ -1309,7 +1309,8 @@ class JSRequest final : public JSC::JSDestructibleObject {
DECLARE_VISIT_CHILDREN;
- mutable JSC::WriteBarrier<JSC::Unknown> m_headers;
+ mutable JSC::WriteBarrier<JSC::Unknown> m_body;
+mutable JSC::WriteBarrier<JSC::Unknown> m_headers;
mutable JSC::WriteBarrier<JSC::Unknown> m_url;
};
class JSRequestPrototype final : public JSC::JSNonFinalObject {
@@ -1434,7 +1435,8 @@ class JSResponse final : public JSC::JSDestructibleObject {
DECLARE_VISIT_CHILDREN;
- mutable JSC::WriteBarrier<JSC::Unknown> m_headers;
+ mutable JSC::WriteBarrier<JSC::Unknown> m_body;
+mutable JSC::WriteBarrier<JSC::Unknown> m_headers;
mutable JSC::WriteBarrier<JSC::Unknown> m_statusText;
mutable JSC::WriteBarrier<JSC::Unknown> m_url;
};
diff --git a/src/bun.js/bindings/exports.zig b/src/bun.js/bindings/exports.zig
index b2fdd001e..c5ab610b2 100644
--- a/src/bun.js/bindings/exports.zig
+++ b/src/bun.js/bindings/exports.zig
@@ -180,6 +180,7 @@ pub const NodePath = JSC.Node.Path;
// Web Streams
pub const JSReadableStreamBlob = JSC.WebCore.ByteBlobLoader.Source.JSReadableStreamSource;
pub const JSReadableStreamFile = JSC.WebCore.FileBlobLoader.Source.JSReadableStreamSource;
+pub const JSReadableStreamBytes = JSC.WebCore.ByteStream.Source.JSReadableStreamSource;
// Sinks
pub const JSArrayBufferSink = JSC.WebCore.ArrayBufferSink.JSSink;
diff --git a/src/bun.js/bindings/generated_classes.zig b/src/bun.js/bindings/generated_classes.zig
index 425d70a44..7230de74a 100644
--- a/src/bun.js/bindings/generated_classes.zig
+++ b/src/bun.js/bindings/generated_classes.zig
@@ -845,6 +845,9 @@ pub const JSRequest = struct {
@compileLog("Expected Request.getArrayBuffer to be a callback");
if (@TypeOf(Request.getBlob) != CallbackType)
@compileLog("Expected Request.getBlob to be a callback");
+ if (@TypeOf(Request.getBody) != GetterType)
+ @compileLog("Expected Request.getBody to be a getter");
+
if (@TypeOf(Request.getBodyUsed) != GetterType)
@compileLog("Expected Request.getBodyUsed to be a getter");
@@ -893,6 +896,7 @@ pub const JSRequest = struct {
@export(Request.finalize, .{ .name = "RequestClass__finalize" });
@export(Request.getArrayBuffer, .{ .name = "RequestPrototype__getArrayBuffer" });
@export(Request.getBlob, .{ .name = "RequestPrototype__getBlob" });
+ @export(Request.getBody, .{ .name = "RequestPrototype__getBody" });
@export(Request.getBodyUsed, .{ .name = "RequestPrototype__getBodyUsed" });
@export(Request.getCache, .{ .name = "RequestPrototype__getCache" });
@export(Request.getCredentials, .{ .name = "RequestPrototype__getCredentials" });
@@ -968,6 +972,9 @@ pub const JSResponse = struct {
@compileLog("Expected Response.getArrayBuffer to be a callback");
if (@TypeOf(Response.getBlob) != CallbackType)
@compileLog("Expected Response.getBlob to be a callback");
+ if (@TypeOf(Response.getBody) != GetterType)
+ @compileLog("Expected Response.getBody to be a getter");
+
if (@TypeOf(Response.getBodyUsed) != GetterType)
@compileLog("Expected Response.getBodyUsed to be a getter");
@@ -1010,6 +1017,7 @@ pub const JSResponse = struct {
@export(Response.finalize, .{ .name = "ResponseClass__finalize" });
@export(Response.getArrayBuffer, .{ .name = "ResponsePrototype__getArrayBuffer" });
@export(Response.getBlob, .{ .name = "ResponsePrototype__getBlob" });
+ @export(Response.getBody, .{ .name = "ResponsePrototype__getBody" });
@export(Response.getBodyUsed, .{ .name = "ResponsePrototype__getBodyUsed" });
@export(Response.getHeaders, .{ .name = "ResponsePrototype__getHeaders" });
@export(Response.getJSON, .{ .name = "ResponsePrototype__getJSON" });
diff --git a/src/bun.js/bindings/headers-cpp.h b/src/bun.js/bindings/headers-cpp.h
index 93b878c94..5f97769c8 100644
--- a/src/bun.js/bindings/headers-cpp.h
+++ b/src/bun.js/bindings/headers-cpp.h
@@ -1,4 +1,4 @@
-//-- AUTOGENERATED FILE -- 1663900299
+//-- AUTOGENERATED FILE -- 1663581060
// clang-format off
#pragma once
diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h
index b251568f9..49b26d394 100644
--- a/src/bun.js/bindings/headers.h
+++ b/src/bun.js/bindings/headers.h
@@ -1,5 +1,5 @@
// clang-format off
-//-- AUTOGENERATED FILE -- 1663900299
+//-- AUTOGENERATED FILE -- 1663581060
#pragma once
#include <stddef.h>
@@ -822,6 +822,12 @@ ZIG_DECL JSC__JSValue ByteBlob__JSReadableStreamSource__load(JSC__JSGlobalObject
ZIG_DECL JSC__JSValue FileBlobLoader__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0);
#endif
+
+#ifdef __cplusplus
+
+ZIG_DECL JSC__JSValue ByteStream__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0);
+
+#endif
CPP_DECL JSC__JSValue ArrayBufferSink__assignToStream(JSC__JSGlobalObject* arg0, JSC__JSValue JSValue1, void* arg2, void** arg3);
CPP_DECL JSC__JSValue ArrayBufferSink__createObject(JSC__JSGlobalObject* arg0, void* arg1);
CPP_DECL void ArrayBufferSink__detachPtr(JSC__JSValue JSValue0);
diff --git a/src/bun.js/bindings/napi.cpp b/src/bun.js/bindings/napi.cpp
index 009a11f61..b4bef19f9 100644
--- a/src/bun.js/bindings/napi.cpp
+++ b/src/bun.js/bindings/napi.cpp
@@ -751,6 +751,17 @@ extern "C" napi_status napi_create_reference(napi_env env, napi_value value,
return napi_ok;
}
+extern "C" void napi_set_ref(NapiRef* ref, JSC__JSValue val_)
+{
+
+ JSC::JSValue val = JSC::JSValue::decode(val_);
+ if (val) {
+ ref->strongRef.set(ref->globalObject->vm(), val);
+ } else {
+ ref->strongRef.clear();
+ }
+}
+
extern "C" napi_status napi_add_finalizer(napi_env env, napi_value js_object,
void* native_object,
napi_finalize finalize_cb,
@@ -794,6 +805,11 @@ extern "C" napi_status napi_get_reference_value(napi_env env, napi_ref ref,
return napi_ok;
}
+extern "C" JSC__JSValue napi_get_reference_value_internal(NapiRef* napiRef)
+{
+ return JSC::JSValue::encode(napiRef->value());
+}
+
extern "C" napi_status napi_reference_ref(napi_env env, napi_ref ref,
uint32_t* result)
{
diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts
index 3cf695f74..84bad8f24 100644
--- a/src/bun.js/webcore/response.classes.ts
+++ b/src/bun.js/webcore/response.classes.ts
@@ -10,6 +10,7 @@ export default [
proto: {
text: { fn: "getText" },
json: { fn: "getJSON" },
+ body: { getter: "getBody", cache: true },
arrayBuffer: { fn: "getArrayBuffer" },
blob: { fn: "getBlob" },
clone: { fn: "doClone", length: 1 },
@@ -74,6 +75,7 @@ export default [
getter: "getURL",
cache: true,
},
+ body: { getter: "getBody", cache: true },
text: { fn: "getText" },
json: { fn: "getJSON" },
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index dd37537e3..64dfe3dc7 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -145,6 +145,18 @@ pub const Response = struct {
return JSValue.jsBoolean(this.body.value == .Used);
}
+ pub fn getBody(
+ this: *Response,
+ globalThis: *JSC.JSGlobalObject,
+ ) callconv(.C) JSValue {
+ if (this.body.value == .Used) {
+ globalThis.throw("Body already used", .{});
+ return JSValue.jsUndefined();
+ }
+
+ return this.body.value.toReadableStream(globalThis);
+ }
+
pub fn getStatusText(
this: *Response,
globalThis: *JSC.JSGlobalObject,
@@ -194,21 +206,21 @@ pub const Response = struct {
}
pub fn cloneInto(
- this: *const Response,
+ this: *Response,
new_response: *Response,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
) void {
new_response.* = Response{
.allocator = allocator,
- .body = this.body.clone(allocator, globalThis),
+ .body = this.body.clone(globalThis),
.url = allocator.dupe(u8, this.url) catch unreachable,
.status_text = allocator.dupe(u8, this.status_text) catch unreachable,
.redirected = this.redirected,
};
}
- pub fn clone(this: *const Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response {
+ pub fn clone(this: *Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response {
var new_response = allocator.create(Response) catch unreachable;
this.cloneInto(new_response, allocator, globalThis);
return new_response;
@@ -267,7 +279,7 @@ pub const Response = struct {
return default.value;
},
- .Used, .Locked, .Empty, .Error => return default.value,
+ .InternalBlob, .Used, .Locked, .Empty, .Error => return default.value,
}
}
@@ -552,7 +564,7 @@ pub const Fetch = struct {
const globalThis = this.global_this;
var ref = this.ref;
- const promise_value = ref.get(globalThis);
+ const promise_value = ref.get();
defer ref.destroy(globalThis);
if (promise_value.isEmptyOrUndefinedOrNull()) {
@@ -596,7 +608,7 @@ pub const Fetch = struct {
var allocator = this.global_this.bunVM().allocator;
const http_response = this.result.response;
var response = allocator.create(Response) catch unreachable;
- const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this);
+ const internal_blob = this.response_buffer.list.toManaged(this.response_buffer.allocator);
this.response_buffer = .{ .allocator = default_allocator, .list = .{
.items = &.{},
.capacity = 0,
@@ -613,7 +625,7 @@ pub const Fetch = struct {
.status_code = @truncate(u16, http_response.status_code),
},
.value = .{
- .Blob = blob,
+ .InternalBlob = internal_blob,
},
},
};
@@ -937,6 +949,15 @@ pub const Blob = struct {
return this.store == null;
}
+ pub fn writeFormatForSize(size: usize, writer: anytype, comptime enable_ansi_colors: bool) !void {
+ try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors));
+ try writer.print(
+ comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors),
+ .{
+ bun.fmt.size(size),
+ },
+ );
+ }
pub fn writeFormat(this: *const Blob, formatter: *JSC.Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void {
const Writer = @TypeOf(writer);
@@ -947,38 +968,34 @@ pub const Blob = struct {
return;
}
- var store = this.store.?;
- switch (store.data) {
- .file => |file| {
- try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors));
- switch (file.pathlike) {
- .path => |path| {
- try writer.print(
- comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors),
- .{
- path.slice(),
- },
- );
- },
- .fd => |fd| {
- try writer.print(
- comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors),
- .{
- fd,
- },
- );
- },
- }
- },
- .bytes => {
- try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors));
- try writer.print(
- comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors),
- .{
- bun.fmt.size(this.size),
- },
- );
- },
+ {
+ var store = this.store.?;
+ switch (store.data) {
+ .file => |file| {
+ try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors));
+ switch (file.pathlike) {
+ .path => |path| {
+ try writer.print(
+ comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors),
+ .{
+ path.slice(),
+ },
+ );
+ },
+ .fd => |fd| {
+ try writer.print(
+ comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors),
+ .{
+ fd,
+ },
+ );
+ },
+ }
+ },
+ .bytes => {
+ try writeFormatForSize(this.size, writer, enable_ansi_colors);
+ },
+ }
}
if (this.content_type.len > 0 or this.offset > 0) {
@@ -1071,7 +1088,7 @@ pub const Blob = struct {
bun.default_allocator.destroy(this);
promise.reject(globalThis, ZigString.init("Body was used after it was consumed").toErrorInstance(globalThis));
},
- .Empty, .Blob => {
+ .InternalBlob, .Empty, .Blob => {
var blob = value.use();
// TODO: this should be one promise not two!
const new_promise = writeFileWithSourceDestination(globalThis.ref(), &blob, &file_blob);
@@ -1273,7 +1290,7 @@ pub const Blob = struct {
var source_blob: Blob = brk: {
if (data.as(Response)) |response| {
switch (response.body.value) {
- .Used, .Empty, .Blob => {
+ .InternalBlob, .Used, .Empty, .Blob => {
break :brk response.body.use();
},
.Error => {
@@ -1302,7 +1319,7 @@ pub const Blob = struct {
if (data.as(Request)) |request| {
switch (request.body) {
- .Used, .Empty, .Blob => {
+ .InternalBlob, .Used, .Empty, .Blob => {
break :brk request.body.use();
},
.Error => {
@@ -3321,11 +3338,6 @@ pub const Blob = struct {
return slice_[0..@minimum(slice_.len, @as(usize, this.size))];
}
- pub fn view(this: *const Blob) []const u8 {
- if (this.size == 0 or this.store == null) return "";
- return this.store.?.sharedView()[this.offset..][0..this.size];
- }
-
pub const Lifetime = JSC.WebCore.Lifetime;
pub fn setIsASCIIFlag(this: *Blob, is_all_ascii: bool) void {
this.is_all_ascii = is_all_ascii;
@@ -3881,10 +3893,10 @@ pub const Body = struct {
return this.value.use();
}
- pub fn clone(this: Body, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) Body {
+ pub fn clone(this: *Body, globalThis: *JSGlobalObject) Body {
return Body{
.init = this.init.clone(globalThis),
- .value = this.value.clone(allocator),
+ .value = this.value.clone(globalThis),
};
}
@@ -3911,6 +3923,16 @@ pub const Body = struct {
try formatter.printComma(Writer, writer, enable_ansi_colors);
try writer.writeAll("\n");
try this.value.Blob.writeFormat(formatter, writer, enable_ansi_colors);
+ } else if (this.value == .InternalBlob) {
+ try formatter.printComma(Writer, writer, enable_ansi_colors);
+ try writer.writeAll("\n");
+ try Blob.writeFormatForSize(this.value.InternalBlob.items.len, writer, enable_ansi_colors);
+ } else if (this.value == .Locked) {
+ if (this.value.Locked.readable) |stream| {
+ try formatter.printComma(Writer, writer, enable_ansi_colors);
+ try writer.writeAll("\n");
+ formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors);
+ }
}
}
@@ -3982,6 +4004,8 @@ pub const Body = struct {
/// conditionally runs when requesting data
/// used in HTTP server to ignore request bodies unless asked for it
onPull: ?fn (ctx: *anyopaque) void = null,
+ onDrain: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null,
+
deinit: bool = false,
action: Action = Action.none,
@@ -4041,6 +4065,9 @@ pub const Body = struct {
pub const Value = union(Tag) {
Blob: Blob,
+ /// Single-use Blob
+ /// Avoids a heap allocation.
+ InternalBlob: std.ArrayList(u8),
Locked: PendingValue,
Used: void,
Empty: void,
@@ -4048,6 +4075,7 @@ pub const Body = struct {
pub const Tag = enum {
Blob,
+ InternalBlob,
Locked,
Used,
Empty,
@@ -4056,6 +4084,87 @@ pub const Body = struct {
pub const empty = Value{ .Empty = .{} };
+
+ pub fn toReadableStream(this: *Value, globalThis: *JSGlobalObject) JSValue {
+ JSC.markBinding();
+
+ switch (this.*) {
+ .Used, .Empty => {
+ return JSC.WebCore.ReadableStream.empty(globalThis);
+ },
+ .InternalBlob => |*bytes| {
+ var blob = Blob.init(bytes.toOwnedSlice(), bytes.allocator, globalThis);
+ defer blob.detach();
+ var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size);
+ this.* = .{
+ .Locked = .{
+ .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?,
+ .global = globalThis,
+ },
+ };
+ return readable;
+ },
+ .Blob => {
+ var blob = this.Blob;
+ defer blob.detach();
+ var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size);
+ readable.protect();
+ this.* = .{
+ .Locked = .{
+ .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?,
+ .global = globalThis,
+ },
+ };
+ return readable;
+ },
+ .Locked => {
+ var locked = &this.Locked;
+ if (locked.readable) |readable| {
+ return readable.value;
+ }
+ var drain_result: JSC.WebCore.DrainResult = .{
+ .estimated_size = 0,
+ };
+
+ if (locked.onDrain) |drain| {
+ locked.onDrain = null;
+ drain_result = drain(locked.task.?);
+ }
+
+ if (drain_result == .empty or drain_result == .aborted) {
+ this.* = .{ .Empty = void{} };
+ return JSC.WebCore.ReadableStream.empty(globalThis);
+ }
+
+ var reader = bun.default_allocator.create(JSC.WebCore.ByteStream.Source) catch unreachable;
+ reader.* = .{
+ .context = undefined,
+ .globalThis = globalThis,
+ };
+
+ reader.context.setup();
+
+ if (drain_result == .estimated_size) {
+ reader.context.highWaterMark = @truncate(Blob.SizeType, drain_result.estimated_size);
+ reader.context.size_hint = @truncate(Blob.SizeType, drain_result.estimated_size);
+ } else if (drain_result == .owned) {
+ reader.context.buffer = drain_result.owned.list;
+ reader.context.size_hint = @truncate(Blob.SizeType, drain_result.owned.size_hint);
+ }
+
+ locked.readable = .{
+ .ptr = .{ .Bytes = &reader.context },
+ .value = reader.toJS(globalThis),
+ };
+
+ locked.readable.?.value.protect();
+ return locked.readable.?.value;
+ },
+
+ else => unreachable,
+ }
+ }
+
pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) ?Value {
if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |readable| {
switch (readable.ptr) {
@@ -4121,13 +4230,29 @@ pub const Body = struct {
if (locked.promise) |promise| {
locked.promise = null;
- var blob = new.use();
switch (locked.action) {
.getText => {
- promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref()));
+ if (new.* == .InternalBlob) {
+ var bytes = new.InternalBlob;
+ new.* = .{ .Empty = void{} };
+ var str = ZigString.init(bytes.items).withEncoding();
+ str.mark();
+ if (str.is16Bit()) {
+ const out = str.toValueGC(global);
+ bytes.deinit();
+ promise.asPromise().?.resolve(global, out);
+ } else {
+ const out = str.toExternalValue(global);
+ promise.asPromise().?.resolve(global, out);
+ }
+ } else {
+ var blob = new.use();
+ promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref()));
+ }
},
.getJSON => {
+ var blob = new.use();
const json_value = blob.toJSON(global, .share);
blob.detach();
@@ -4138,17 +4263,25 @@ pub const Body = struct {
}
},
.getArrayBuffer => {
- promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global));
+ if (new.* == .InternalBlob) {
+ const marked = JSC.MarkedArrayBuffer.fromBytes(new.InternalBlob.items, new.InternalBlob.allocator, .ArrayBuffer);
+ new.* = .{ .Empty = void{} };
+ var object_ref = marked.toJS(global, null);
+ promise.asPromise().?.resolve(global, JSC.JSValue.c(object_ref));
+ } else {
+ var blob = new.use();
+ promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global));
+ }
},
.getBlob => {
var ptr = bun.default_allocator.create(Blob) catch unreachable;
- ptr.* = blob;
+ ptr.* = new.use();
ptr.allocator = bun.default_allocator;
promise.asPromise().?.resolve(global, ptr.toJS(global));
},
else => {
var ptr = bun.default_allocator.create(Blob) catch unreachable;
- ptr.* = blob;
+ ptr.* = new.use();
ptr.allocator = bun.default_allocator;
promise.asInternalPromise().?.resolve(global, ptr.toJS(global));
},
@@ -4160,6 +4293,7 @@ pub const Body = struct {
pub fn slice(this: Value) []const u8 {
return switch (this) {
.Blob => this.Blob.sharedView(),
+ .InternalBlob => |list| list.items,
else => "",
};
}
@@ -4172,6 +4306,15 @@ pub const Body = struct {
this.* = .{ .Used = .{} };
return new_blob;
},
+ .InternalBlob => {
+ var new_blob = Blob.init(
+ this.InternalBlob.toOwnedSlice(),
+ this.InternalBlob.allocator,
+ JSC.VirtualMachine.vm.global,
+ );
+ this.* = .{ .Used = .{} };
+ return new_blob;
+ },
else => {
return Blob.initEmpty(undefined);
},
@@ -4228,14 +4371,22 @@ pub const Body = struct {
pub fn deinit(this: *Value) void {
const tag = @as(Tag, this.*);
if (tag == .Locked) {
- if (this.Locked.readable) |*readable| {
- readable.done();
+ if (!this.Locked.deinit) {
+ this.Locked.deinit = true;
+
+ if (this.Locked.readable) |*readable| {
+ readable.done();
+ }
}
- this.Locked.deinit = true;
return;
}
+ if (tag == .InternalBlob) {
+ this.InternalBlob.clearAndFree();
+ this.* = Value.empty;
+ }
+
if (tag == .Blob) {
this.Blob.deinit();
this.* = Value.empty;
@@ -4246,8 +4397,18 @@ pub const Body = struct {
}
}
- pub fn clone(this: Value, _: std.mem.Allocator) Value {
- if (this == .Blob) {
+ pub fn clone(this: *Value, globalThis: *JSC.JSGlobalObject) Value {
+ if (this.* == .InternalBlob) {
+ this.* = .{
+ .Blob = Blob.init(
+ this.InternalBlob.toOwnedSlice(),
+ this.InternalBlob.allocator,
+ globalThis,
+ ),
+ };
+ }
+
+ if (this.* == .Blob) {
return Value{ .Blob = this.Blob.dupe() };
}
@@ -4319,10 +4480,16 @@ pub const Body = struct {
} else |_| {}
}
+ if (value.isUndefined()) {
+ body.value = Value.empty;
+ return body;
+ }
+
body.value = Value.fromJS(globalThis, value) orelse return null;
if (body.value == .Blob)
std.debug.assert(body.value.Blob.allocator == null); // owned by Body
+
return body;
}
};
@@ -4358,7 +4525,18 @@ pub const Request = struct {
try writer.writeAll("\"");
if (this.body == .Blob) {
try writer.writeAll("\n");
+ try formatter.writeIndent(Writer, writer);
try this.body.Blob.writeFormat(formatter, writer, enable_ansi_colors);
+ } else if (this.body == .InternalBlob) {
+ try writer.writeAll("\n");
+ try formatter.writeIndent(Writer, writer);
+ try Blob.writeFormatForSize(this.body.InternalBlob.items.len, writer, enable_ansi_colors);
+ } else if (this.body == .Locked) {
+ if (this.body.Locked.readable) |stream| {
+ try writer.writeAll("\n");
+ try formatter.writeIndent(Writer, writer);
+ formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors);
+ }
}
}
try writer.writeAll("\n");
@@ -4392,7 +4570,7 @@ pub const Request = struct {
return MimeType.other.value;
},
- .Error, .Used, .Locked, .Empty => return MimeType.other.value,
+ .InternalBlob, .Error, .Used, .Locked, .Empty => return MimeType.other.value,
}
}
@@ -4415,6 +4593,18 @@ pub const Request = struct {
return ZigString.init("").toValueGC(globalThis);
}
+ pub fn getBody(
+ this: *Request,
+ globalThis: *JSC.JSGlobalObject,
+ ) callconv(.C) JSValue {
+ if (this.body == .Used) {
+ globalThis.throw("Body already used", .{});
+ return JSValue.jsUndefined();
+ }
+
+ return this.body.toReadableStream(globalThis);
+ }
+
pub fn getIntegrity(
_: *Request,
globalThis: *JSC.JSGlobalObject,
@@ -4521,17 +4711,9 @@ pub const Request = struct {
}
if (urlOrObject.fastGet(globalThis, .body)) |body_| {
- if (Blob.get(globalThis, body_, true, false)) |blob| {
- if (blob.size > 0) {
- request.body = Body.Value{ .Blob = blob };
- }
- } else |err| {
- if (err == error.InvalidArguments) {
- globalThis.throwInvalidArguments("Expected an Array", .{});
- return null;
- }
-
- globalThis.throwInvalidArguments("Invalid Body object", .{});
+ if (Body.Value.fromJS(globalThis, body_)) |body| {
+ request.body = body.value;
+ } else {
return null;
}
}
@@ -4546,17 +4728,9 @@ pub const Request = struct {
}
if (arguments[1].fastGet(globalThis, .body)) |body_| {
- if (Blob.get(globalThis, body_, true, false)) |blob| {
- if (blob.size > 0) {
- request.body = Body.Value{ .Blob = blob };
- }
- } else |err| {
- if (err == error.InvalidArguments) {
- globalThis.throwInvalidArguments("Expected an Array", .{});
- return null;
- }
-
- globalThis.throwInvalidArguments("Invalid Body object", .{});
+ if (Body.Value.fromJS(globalThis, body_)) |body| {
+ request.body = body.value;
+ } else {
return null;
}
}
@@ -4614,7 +4788,7 @@ pub const Request = struct {
globalThis: *JSGlobalObject,
) void {
req.* = Request{
- .body = this.body.clone(allocator),
+ .body = this.body.clone(globalThis),
.url = ZigString.init(allocator.dupe(u8, this.url.slice()) catch unreachable),
.method = this.method,
};
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 23aad70ec..681ff6172 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -70,7 +70,7 @@ pub const ReadableStream = struct {
pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding();
this.value.unprotect();
- ReadableStream__abort(this.value, globalThis);
+ ReadableStream__cancel(this.value, globalThis);
}
pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
@@ -98,6 +98,8 @@ pub const ReadableStream = struct {
/// This is a direct readable stream
/// That means we can turn it into whatever we want
Direct = 3,
+
+ Bytes = 4,
};
pub const Source = union(Tag) {
Invalid: void,
@@ -116,6 +118,8 @@ pub const ReadableStream = struct {
/// This is a direct readable stream
/// That means we can turn it into whatever we want
Direct: void,
+
+ Bytes: *ByteStream,
};
extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag;
@@ -165,6 +169,13 @@ pub const ReadableStream = struct {
},
},
+ .Bytes => ReadableStream{
+ .value = value,
+ .ptr = .{
+ .Bytes = ptr.asPtr(ByteStream),
+ },
+ },
+
// .HTTPRequest => ReadableStream{
// .value = value,
// .ptr = .{
@@ -184,6 +195,7 @@ pub const ReadableStream = struct {
extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue;
pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue {
+ JSC.markBinding();
return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id)));
}
pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
@@ -402,6 +414,16 @@ pub const StreamStart = union(Tag) {
}
};
+pub const DrainResult = union(enum) {
+ owned: struct {
+ list: std.ArrayList(u8),
+ size_hint: usize,
+ },
+ estimated_size: usize,
+ empty: void,
+ aborted: void,
+};
+
pub const StreamResult = union(Tag) {
owned: bun.ByteList,
owned_and_done: bun.ByteList,
@@ -2660,65 +2682,301 @@ pub const ByteBlobLoader = struct {
pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit);
};
-pub fn RequestBodyStreamer(
- comptime is_ssl: bool,
-) type {
- return struct {
- response: *uws.NewApp(is_ssl).Response,
+pub const ByteStream = struct {
+ buffer: std.ArrayList(u8) = .{
+ .allocator = bun.default_allocator,
+ .items = &.{},
+ .capacity = 0,
+ },
+ has_received_last_chunk: bool = false,
+ pending: StreamResult.Pending = StreamResult.Pending{
+ .frame = undefined,
+ .used = false,
+ .result = .{ .done = {} },
+ },
+ done: bool = false,
+ pending_buffer: []u8 = &.{},
+ pending_value: ?*JSC.napi.Ref = null,
+ offset: usize = 0,
+ highWaterMark: Blob.SizeType = 0,
+ pipe: Pipe = .{},
+ size_hint: Blob.SizeType = 0,
+
+ pub const Pipe = struct {
+ ctx: ?*anyopaque = null,
+ onPipe: ?PipeFunction = null,
+
+ pub fn New(comptime Type: type, comptime Function: anytype) type {
+ return struct {
+ pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void {
+ Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator);
+ }
- pub const tag = if (is_ssl)
- ReadableStream.Tag.HTTPRequest
- else if (is_ssl)
- ReadableStream.Tag.HTTPSRequest;
+ pub fn init(self: *Type) Pipe {
+ return Pipe{
+ .ctx = self,
+ .onPipe = pipe,
+ };
+ }
+ };
+ }
+ };
- pub fn onStart(this: *ByteBlobLoader) StreamStart {
- return .{ .chunk_size = this.chunk_size };
+ pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
+
+ pub const tag = ReadableStream.Tag.Bytes;
+
+ pub fn setup(this: *ByteStream) void {
+ this.* = .{};
+ }
+
+ pub fn onStart(this: *@This()) StreamStart {
+ if (this.has_received_last_chunk and this.buffer.items.len == 0) {
+ return .{ .empty = void{} };
}
- pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult {
- array.ensureStillAlive();
- defer array.ensureStillAlive();
- if (this.done) {
- return .{ .done = {} };
+ if (this.has_received_last_chunk) {
+ return .{ .chunk_size = @truncate(Blob.SizeType, @minimum(1024 * 1024 * 2, this.buffer.items.len)) };
+ }
+
+ if (this.highWaterMark == 0) {
+ return .{ .ready = void{} };
+ }
+
+ return .{ .chunk_size = @maximum(this.highWaterMark, std.mem.page_size) };
+ }
+
+ pub fn value(this: *@This()) JSValue {
+ if (this.pending_value == null)
+ return .zero;
+
+ const result = this.pending_value.?.get();
+ this.pending_value.?.set(.zero);
+
+ return result;
+ }
+
+ pub fn isCancelled(this: *const @This()) bool {
+ return @fieldParentPtr(Source, "context", this).cancelled;
+ }
+
+ pub fn unpipe(this: *@This()) void {
+ this.pipe.ctx = null;
+ this.pipe.onPipe = null;
+ if (!this.parent().deinited) {
+ this.parent().deinited = true;
+ bun.default_allocator.destroy(this.parent());
+ }
+ }
+
+ pub fn onData(
+ this: *@This(),
+ stream: StreamResult,
+ allocator: std.mem.Allocator,
+ ) void {
+ JSC.markBinding();
+ if (this.done) {
+ if (stream.isDone() and (stream == .owned or stream == .owned_and_done)) {
+ if (stream == .owned) allocator.free(stream.owned.slice());
+ if (stream == .owned_and_done) allocator.free(stream.owned_and_done.slice());
}
- var temporary = this.store.sharedView();
- temporary = temporary[this.offset..];
+ return;
+ }
+
+ std.debug.assert(!this.has_received_last_chunk);
+ this.has_received_last_chunk = stream.isDone();
+
+ if (this.pipe.ctx != null) {
+ this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator);
+ return;
+ }
+
+ const chunk = stream.slice();
+
+ if (!this.pending.used) {
+ std.debug.assert(this.buffer.items.len == 0);
+ var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)];
+ const pending_buffer_len = this.pending_buffer.len;
+ @memcpy(to_copy.ptr, chunk.ptr, to_copy.len);
+ this.pending_buffer = &.{};
+
+ const is_really_done = this.has_received_last_chunk and to_copy.len <= pending_buffer_len;
- temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))];
- if (temporary.len == 0) {
- this.store.deref();
+ if (is_really_done) {
this.done = true;
- return .{ .done = {} };
+ this.pending.result = .{
+ .into_array_and_done = .{
+ .value = this.value(),
+ .len = @truncate(Blob.SizeType, to_copy.len),
+ },
+ };
+ } else {
+ this.pending.result = .{
+ .into_array = .{
+ .value = this.value(),
+ .len = @truncate(Blob.SizeType, to_copy.len),
+ },
+ };
}
- const copied = @intCast(Blob.SizeType, temporary.len);
+ const remaining = chunk[to_copy.len..];
+ if (remaining.len > 0)
+ this.append(stream, to_copy.len, allocator) catch @panic("Out of memory while copying request body");
+ resume this.pending.frame;
+ return;
+ }
- this.remain -|= copied;
- this.offset +|= copied;
- @memcpy(buffer.ptr, temporary.ptr, temporary.len);
- if (this.remain == 0) {
- return .{ .into_array_and_done = .{ .value = array, .len = copied } };
+ this.append(stream, 0, allocator) catch @panic("Out of memory while copying request body");
+ }
+
+ pub fn append(
+ this: *@This(),
+ stream: StreamResult,
+ offset: usize,
+ allocator: std.mem.Allocator,
+ ) !void {
+ const chunk = stream.slice()[offset..];
+
+ if (this.buffer.capacity == 0) {
+ switch (stream) {
+ .owned => |owned| {
+ this.buffer = owned.listManaged(allocator);
+ this.offset += offset;
+ },
+ .owned_and_done => |owned| {
+ this.buffer = owned.listManaged(allocator);
+ this.offset += offset;
+ },
+ .temporary_and_done, .temporary => {
+ this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len);
+ this.buffer.appendSliceAssumeCapacity(chunk);
+ },
+ else => unreachable,
}
+ return;
+ }
+
+ switch (stream) {
+ .temporary_and_done, .temporary => {
+ try this.buffer.appendSlice(chunk);
+ },
+ // We don't support the rest of these yet
+ else => unreachable,
+ }
+ }
- return .{ .into_array = .{ .value = array, .len = copied } };
+ pub fn setValue(this: *@This(), view: JSC.JSValue) void {
+ JSC.markBinding();
+ if (this.pending_value) |pending| {
+ pending.set(view);
+ } else {
+ this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view);
}
+ }
- pub fn onCancel(_: *ByteBlobLoader) void {}
+ pub fn parent(this: *@This()) *Source {
+ return @fieldParentPtr(Source, "context", this);
+ }
- pub fn deinit(this: *ByteBlobLoader) void {
- if (!this.done) {
+ pub fn onPull(this: *@This(), buffer: []u8, view: JSC.JSValue) StreamResult {
+ JSC.markBinding();
+ std.debug.assert(buffer.len > 0);
+
+ if (this.buffer.items.len > 0) {
+ std.debug.assert(this.value() == .zero);
+ const to_write = @minimum(
+ this.buffer.items.len - this.offset,
+ buffer.len,
+ );
+ var remaining_in_buffer = this.buffer.items[this.offset..][0..to_write];
+
+ @memcpy(buffer.ptr, this.buffer.items.ptr + this.offset, to_write);
+
+ if (this.offset + to_write == this.buffer.items.len) {
+ this.offset = 0;
+ this.buffer.items.len = 0;
+ } else {
+ this.offset += to_write;
+ }
+
+ if (this.has_received_last_chunk and remaining_in_buffer.len == 0) {
+ this.buffer.clearAndFree();
this.done = true;
- this.store.deref();
+
+ return .{
+ .into_array_and_done = .{
+ .value = view,
+ .len = @truncate(Blob.SizeType, to_write),
+ },
+ };
+ }
+
+ return .{
+ .into_array = .{
+ .value = view,
+ .len = @truncate(Blob.SizeType, to_write),
+ },
+ };
+ }
+
+ if (this.has_received_last_chunk) {
+ return .{
+ .done = void{},
+ };
+ }
+
+ this.pending_buffer = buffer;
+ this.setValue(view);
+
+ return .{
+ .pending = &this.pending,
+ };
+ }
+
+ pub fn onCancel(this: *@This()) void {
+ JSC.markBinding();
+ const view = this.value();
+ if (this.buffer.capacity > 0) this.buffer.clearAndFree();
+ this.done = true;
+ if (this.pending_value) |ref| {
+ this.pending_value = null;
+ ref.destroy(undefined);
+ }
+
+ if (view != .zero) {
+ this.pending_buffer = &.{};
+ this.pending.result = .{ .done = {} };
+ if (!this.pending.used) {
+ resume this.pending.frame;
}
+ }
+ }
+
+ pub fn deinit(this: *@This()) void {
+ JSC.markBinding();
+ if (this.buffer.capacity > 0) this.buffer.clearAndFree();
- bun.default_allocator.destroy(this);
+ if (this.pending_value) |ref| {
+ this.pending_value = null;
+ ref.destroy(undefined);
}
+ if (!this.done) {
+ this.done = true;
- pub const label = if (is_ssl) "HTTPSRequestBodyStreamer" else "HTTPRequestBodyStreamer";
- pub const Source = ReadableStreamSource(@This(), label, onStart, onPull, onCancel, deinit);
- };
-}
+ this.pending_buffer = &.{};
+ this.pending.result = .{ .done = {} };
+
+ if (!this.pending.used) {
+ resume this.pending.frame;
+ }
+ }
+
+ bun.default_allocator.destroy(this.parent());
+ }
+
+ pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit);
+};
pub const FileBlobLoader = struct {
buf: []u8 = &[_]u8{},