diff options
author | 2022-09-26 20:04:28 -0700 | |
---|---|---|
committer | 2022-09-26 20:04:28 -0700 | |
commit | 24a9bc23b7e1c7911cb2e146be199d940b9729e6 (patch) | |
tree | 852a75cff3950063b405ca3a0dfe22e46d0eecfb | |
parent | 97c3688788a94faffb6bceb4bc6c97fb84307ceb (diff) | |
download | bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.gz bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.zst bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.zip |
[Web Streams] Add `body` to `Response` and `Request` (#1255)
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
-rw-r--r-- | src/bun.js/api/server.zig | 293 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGeneratedClasses.cpp | 52 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGeneratedClasses.h | 6 | ||||
-rw-r--r-- | src/bun.js/bindings/exports.zig | 1 | ||||
-rw-r--r-- | src/bun.js/bindings/generated_classes.zig | 8 | ||||
-rw-r--r-- | src/bun.js/bindings/headers-cpp.h | 2 | ||||
-rw-r--r-- | src/bun.js/bindings/headers.h | 8 | ||||
-rw-r--r-- | src/bun.js/bindings/napi.cpp | 16 | ||||
-rw-r--r-- | src/bun.js/webcore/response.classes.ts | 2 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 340 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 336 | ||||
-rw-r--r-- | src/napi/napi.zig | 19 | ||||
-rw-r--r-- | test/bun.js/streams.test.js | 28 |
13 files changed, 956 insertions, 155 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index d7cfbe4c1..939e010f3 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -538,7 +538,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp sendfile: SendfileContext = undefined, request_js_object: JSC.C.JSObjectRef = null, request_body_buf: std.ArrayListUnmanaged(u8) = .{}, + request_body_content_len: usize = 0, sink: ?*ResponseStream.JSSink = null, + byte_stream: ?*JSC.WebCore.ByteStream = null, has_written_status: bool = false, @@ -696,20 +698,67 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } this.response_buf_owned = std.ArrayListUnmanaged(u8){ .items = bb.items, .capacity = bb.capacity }; - this.renderResponseBuffer(); + this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); } pub fn renderResponseBuffer(this: *RequestContext) void { this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this); } - pub fn onWritableResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool { + /// Render a complete response buffer + pub fn renderResponseBufferAndMetadata(this: *RequestContext) void { + this.renderMetadata(); + + if (!this.resp.tryEnd( + this.response_buf_owned.items, + this.response_buf_owned.items.len, + )) { + this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); + this.setAbortHandler(); + return; + } + + this.finalize(); + } + + /// Drain a partial response buffer + pub fn drainResponseBufferAndMetadata(this: *RequestContext) void { + this.renderMetadata(); + this.setAbortHandler(); + + _ = this.resp.write( + this.response_buf_owned.items, + ); + + this.response_buf_owned.items.len = 0; + } + + pub fn renderResponseBufferAndMetadataCorked(this: *RequestContext) void { + this.resp.runCorkedWithType(*RequestContext, renderResponseBufferAndMetadata, this); + } + + pub fn drainResponseBufferAndMetadataCorked(this: *RequestContext) void { + this.resp.runCorkedWithType(*RequestContext, drainResponseBufferAndMetadata, this); + } + + pub fn onWritableResponseBuffer(this: *RequestContext, _: c_ulong, resp: *App.Response) callconv(.C) bool { + std.debug.assert(this.resp == resp); + if (this.aborted) { + this.finalizeForAbort(); + return false; + } + resp.end("", false); + this.finalize(); + return false; + } + + pub fn onWritableCompleteResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool { std.debug.assert(this.resp == resp); if (this.aborted) { this.finalizeForAbort(); return false; } - return this.sendWritableBytes(this.response_buf_owned.items, write_offset, resp); + return this.sendWritableBytesForCompleteResponseBuffer(this.response_buf_owned.items, write_offset, resp); } pub fn create(this: *RequestContext, server: *ThisServer, req: *uws.Request, resp: *App.Response) void { @@ -772,11 +821,24 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (req.body == .Locked and (req.body.Locked.action != .none or req.body.Locked.promise != null)) { this.pending_promises_for_abort += 1; req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); + } else if (req.body == .Locked and (req.body.Locked.readable != null)) { + req.body.Locked.readable.?.abort(this.server.globalThis); + req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); + req.body.Locked.readable = null; } req.uws_request = null; } } + if (this.response_ptr) |response| { + if (response.body.value == .Locked) { + if (response.body.value.Locked.readable) |*readable| { + response.body.value.Locked.readable = null; + readable.abort(this.server.globalThis); + } + } + } + // then, we reject the response promise if (this.promise) |promise| { this.pending_promises_for_abort += 1; @@ -841,6 +903,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } JSC.C.JSValueUnprotect(this.server.globalThis.ref(), promise.asObjectRef()); } + + if (this.byte_stream) |stream| { + this.byte_stream = null; + stream.unpipe(); + } } pub fn finalize(this: *RequestContext) void { this.finalizeWithoutDeinit(); @@ -974,10 +1041,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } var bytes = this.blob.sharedView(); - return this.sendWritableBytes(bytes, write_offset, resp); + _ = this.sendWritableBytesForBlob(bytes, write_offset, resp); + return true; } - pub fn sendWritableBytes(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool { + pub fn sendWritableBytesForBlob(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool { std.debug.assert(this.resp == resp); var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..]; @@ -990,6 +1058,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } + pub fn sendWritableBytesForCompleteResponseBuffer(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool { + std.debug.assert(this.resp == resp); + + var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..]; + if (resp.tryEnd(bytes, bytes_.len)) { + this.response_buf_owned.items.len = 0; + this.finalize(); + } else { + this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); + } + + return true; + } + pub fn onWritableSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool { return this.onSendfile(); } @@ -1125,7 +1207,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } else { this.blob.size = @truncate(Blob.SizeType, result.result.buf.len); this.response_buf_owned = .{ .items = result.result.buf, .capacity = result.result.buf.len }; - this.renderResponseBuffer(); + this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); } } @@ -1481,6 +1563,41 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.resp.runCorkedWithType(*StreamPair, doRenderStream, &pair); return; }, + + .Bytes => |byte_stream| { + std.debug.assert(byte_stream.pipe.ctx == null); + std.debug.assert(this.byte_stream == null); + + stream.detach(this.server.globalThis); + + this.response_buf_owned = byte_stream.buffer.moveToUnmanaged(); + + // If we've received the complete body by the time this function is called + // we can avoid streaming it and just send it all at once. + if (byte_stream.has_received_last_chunk) { + this.blob.size = @truncate(Blob.SizeType, this.response_buf_owned.items.len); + byte_stream.parent().deinit(); + this.renderResponseBufferAndMetadataCorked(); + return; + } + + byte_stream.pipe = JSC.WebCore.ByteStream.Pipe.New(@This(), onPipe).init(this); + this.byte_stream = byte_stream; + + // we don't set size here because even if we have a hint + // uWebSockets won't let us partially write streaming content + this.blob.size = 0; + + // if we've received metadata and part of the body, send everything we can and drain + if (this.response_buf_owned.items.len > 0) { + this.drainResponseBufferAndMetadataCorked(); + } else { + // if we only have metadata to send, send it now + this.resp.runCorkedWithType(*RequestContext, renderMetadata, this); + } + this.setAbortHandler(); + return; + }, } } @@ -1496,6 +1613,42 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.doRenderBlob(); } + pub fn onPipe(this: *RequestContext, stream: JSC.WebCore.StreamResult, allocator: std.mem.Allocator) void { + var stream_needs_deinit = stream == .owned or stream == .owned_and_done; + + defer { + if (stream_needs_deinit) { + if (stream.isDone()) { + stream.owned_and_done.listManaged(allocator).deinit(); + } else { + stream.owned.listManaged(allocator).deinit(); + } + } + } + + if (this.aborted) { + this.finalizeForAbort(); + return; + } + + const chunk = stream.slice(); + // on failure, it will continue to allocate + // we can't do buffering ourselves here or it won't work + // uSockets will append and manage the buffer + // so any write will buffer if the write fails + if (this.resp.write(chunk)) { + if (stream.isDone()) { + this.resp.endStream(false); + this.finalize(); + } + } else { + // when it's the last one, we just want to know if it's done + if (stream.isDone()) { + this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this); + } + } + } + pub fn doRenderBlob(this: *RequestContext) void { // We are not corked // The body is small @@ -1712,34 +1865,123 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.aborted) return; - this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body"); + const request = JSC.JSValue.fromRef(this.request_js_object); + var req = request.as(Request) orelse { + this.request_body_buf.clearAndFree(this.allocator); + return; + }; + + if (req.body == .Locked) { + if (req.body.Locked.readable) |readable| { + if (readable.ptr == .Bytes) { + std.debug.assert(this.request_body_buf.items.len == 0); + + if (!last) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.init(chunk), + }, + bun.default_allocator, + ); + } else { + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.init(chunk), + }, + bun.default_allocator, + ); + } + + return; + } + } + } + if (last) { - const request = JSC.JSValue.fromRef(this.request_js_object); - if (request.as(Request)) |req| { - request.ensureStillAlive(); - var bytes = this.request_body_buf.toOwnedSlice(this.allocator); - var old = req.body; - req.body = .{ - .Blob = if (bytes.len > 0) - Blob.init(bytes, this.allocator, this.server.globalThis) - else - Blob.initEmpty(this.server.globalThis), - }; - if (old == .Locked) - old.resolve(&req.body, this.server.globalThis); - request.unprotect(); + request.ensureStillAlive(); + var bytes = this.request_body_buf; + var old = req.body; + if (bytes.items.len == 0) { + req.body = .{ .Empty = {} }; } else { - this.request_body_buf.clearAndFree(this.allocator); + req.body = .{ .InternalBlob = bytes.toManaged(this.allocator) }; } + + if (old == .Locked) + old.resolve(&req.body, this.server.globalThis); + request.unprotect(); } + + if (this.request_body_buf.capacity == 0) { + this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, @minimum(this.request_body_content_len, max_request_body_preallocate_length)) catch @panic("Out of memory while allocating request body buffer"); + } + + this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body"); } + pub fn onDrainRequestBody(this: *RequestContext) JSC.WebCore.DrainResult { + if (this.aborted) { + return JSC.WebCore.DrainResult{ + .aborted = void{}, + }; + } + + std.debug.assert(!this.resp.hasResponded()); + + // This means we have received part of the body but not the whole thing + if (this.request_body_buf.items.len > 0) { + var emptied = this.request_body_buf; + this.request_body_buf = .{}; + return .{ + .owned = .{ + .list = emptied.toManaged(this.allocator), + .size_hint = if (emptied.capacity < max_request_body_preallocate_length) + emptied.capacity + else + 0, + }, + }; + } + + const content_length = this.req.header("content-length") orelse { + return .{ + .empty = void{}, + }; + }; + + const len = std.fmt.parseInt(usize, content_length, 10) catch 0; + this.request_body_content_len = len; + + if (len == 0) { + return JSC.WebCore.DrainResult{ + .empty = void{}, + }; + } + + if (len > this.server.config.max_request_body_size) { + this.resp.writeStatus("413 Request Entity Too Large"); + this.resp.endWithoutBody(); + + this.finalize(); + return JSC.WebCore.DrainResult{ + .aborted = void{}, + }; + } + + this.resp.onData(*RequestContext, onBufferedBodyChunk, this); + + return .{ + .estimated_size = len, + }; + } + const max_request_body_preallocate_length = 1024 * 256; pub fn onPull(this: *RequestContext) void { const request = JSC.JSValue.c(this.request_js_object); request.ensureStillAlive(); if (this.req.header("content-length")) |content_length| { const len = std.fmt.parseInt(usize, content_length, 10) catch 0; + this.request_body_content_len = len; if (len == 0) { if (request.as(Request)) |req| { var old = req.body; @@ -1766,8 +2008,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.finalize(); return; } - - this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, len) catch @panic("Out of memory while allocating request body buffer"); } else if (this.req.header("transfer-encoding") == null) { // no content-length // no transfer-encoding @@ -1789,6 +2029,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp onPull(bun.cast(*RequestContext, this)); } + pub fn onDrainRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult { + return onDrainRequestBody(bun.cast(*RequestContext, this)); + } + pub const Export = shim.exportFunctions(.{ .onResolve = onResolve, .onReject = onReject, @@ -2243,6 +2487,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { .task = ctx, .global = this.globalThis, .onPull = RequestContext.onPullCallback, + .onDrain = RequestContext.onDrainRequestBodyCallback, }, }, }; diff --git a/src/bun.js/bindings/ZigGeneratedClasses.cpp b/src/bun.js/bindings/ZigGeneratedClasses.cpp index 78f1bb179..925814c2d 100644 --- a/src/bun.js/bindings/ZigGeneratedClasses.cpp +++ b/src/bun.js/bindings/ZigGeneratedClasses.cpp @@ -2505,6 +2505,10 @@ extern "C" EncodedJSValue RequestPrototype__getBlob(void* ptr, JSC::JSGlobalObje JSC_DECLARE_HOST_FUNCTION(RequestPrototype__blobCallback); +extern "C" JSC::EncodedJSValue RequestPrototype__getBody(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject); +JSC_DECLARE_CUSTOM_GETTER(RequestPrototype__bodyGetterWrap); + + extern "C" JSC::EncodedJSValue RequestPrototype__getBodyUsed(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject); JSC_DECLARE_CUSTOM_GETTER(RequestPrototype__bodyUsedGetterWrap); @@ -2571,6 +2575,7 @@ STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSRequestPrototype, JSRequestPrototype::Base static const HashTableValue JSRequestPrototypeTableValues[] = { { "arrayBuffer"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__arrayBufferCallback, 0 } } , { "blob"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__blobCallback, 0 } } , +{ "body"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__bodyGetterWrap, 0 } } , { "bodyUsed"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__bodyUsedGetterWrap, 0 } } , { "cache"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__cacheGetterWrap, 0 } } , { "clone"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__cloneCallback, 1 } } , @@ -2641,6 +2646,26 @@ JSC_DEFINE_HOST_FUNCTION(RequestPrototype__blobCallback, (JSGlobalObject * lexic } +JSC_DEFINE_CUSTOM_GETTER(RequestPrototype__bodyGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) +{ + auto& vm = lexicalGlobalObject->vm(); + Zig::GlobalObject *globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + JSRequest* thisObject = jsCast<JSRequest*>(JSValue::decode(thisValue)); + JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject); + + if (JSValue cachedValue = thisObject->m_body.get()) + return JSValue::encode(cachedValue); + + JSC::JSValue result = JSC::JSValue::decode( + RequestPrototype__getBody(thisObject->wrapped(), globalObject) + ); + RETURN_IF_EXCEPTION(throwScope, {}); + thisObject->m_body.set(vm, thisObject, result); + RELEASE_AND_RETURN(throwScope, JSValue::encode(result)); +} + + JSC_DEFINE_CUSTOM_GETTER(RequestPrototype__bodyUsedGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) { auto& vm = lexicalGlobalObject->vm(); @@ -3002,6 +3027,7 @@ void JSRequest::visitChildrenImpl(JSCell* cell, Visitor& visitor) JSRequest* thisObject = jsCast<JSRequest*>(cell); ASSERT_GC_OBJECT_INHERITS(thisObject, info()); Base::visitChildren(thisObject, visitor); + visitor.append(thisObject->m_body); visitor.append(thisObject->m_headers); visitor.append(thisObject->m_url); } @@ -3018,6 +3044,10 @@ extern "C" EncodedJSValue ResponsePrototype__getBlob(void* ptr, JSC::JSGlobalObj JSC_DECLARE_HOST_FUNCTION(ResponsePrototype__blobCallback); +extern "C" JSC::EncodedJSValue ResponsePrototype__getBody(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject); +JSC_DECLARE_CUSTOM_GETTER(ResponsePrototype__bodyGetterWrap); + + extern "C" JSC::EncodedJSValue ResponsePrototype__getBodyUsed(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject); JSC_DECLARE_CUSTOM_GETTER(ResponsePrototype__bodyUsedGetterWrap); @@ -3064,6 +3094,7 @@ STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSResponsePrototype, JSResponsePrototype::Ba static const HashTableValue JSResponsePrototypeTableValues[] = { { "arrayBuffer"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__arrayBufferCallback, 0 } } , { "blob"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__blobCallback, 0 } } , +{ "body"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__bodyGetterWrap, 0 } } , { "bodyUsed"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__bodyUsedGetterWrap, 0 } } , { "clone"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__cloneCallback, 1 } } , { "headers"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__headersGetterWrap, 0 } } , @@ -3129,6 +3160,26 @@ JSC_DEFINE_HOST_FUNCTION(ResponsePrototype__blobCallback, (JSGlobalObject * lexi } +JSC_DEFINE_CUSTOM_GETTER(ResponsePrototype__bodyGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) +{ + auto& vm = lexicalGlobalObject->vm(); + Zig::GlobalObject *globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + JSResponse* thisObject = jsCast<JSResponse*>(JSValue::decode(thisValue)); + JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject); + + if (JSValue cachedValue = thisObject->m_body.get()) + return JSValue::encode(cachedValue); + + JSC::JSValue result = JSC::JSValue::decode( + ResponsePrototype__getBody(thisObject->wrapped(), globalObject) + ); + RETURN_IF_EXCEPTION(throwScope, {}); + thisObject->m_body.set(vm, thisObject, result); + RELEASE_AND_RETURN(throwScope, JSValue::encode(result)); +} + + JSC_DEFINE_CUSTOM_GETTER(ResponsePrototype__bodyUsedGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) { auto& vm = lexicalGlobalObject->vm(); @@ -3443,6 +3494,7 @@ void JSResponse::visitChildrenImpl(JSCell* cell, Visitor& visitor) JSResponse* thisObject = jsCast<JSResponse*>(cell); ASSERT_GC_OBJECT_INHERITS(thisObject, info()); Base::visitChildren(thisObject, visitor); + visitor.append(thisObject->m_body); visitor.append(thisObject->m_headers); visitor.append(thisObject->m_statusText); visitor.append(thisObject->m_url); diff --git a/src/bun.js/bindings/ZigGeneratedClasses.h b/src/bun.js/bindings/ZigGeneratedClasses.h index feae81ae0..ad85dd3f9 100644 --- a/src/bun.js/bindings/ZigGeneratedClasses.h +++ b/src/bun.js/bindings/ZigGeneratedClasses.h @@ -1309,7 +1309,8 @@ class JSRequest final : public JSC::JSDestructibleObject { DECLARE_VISIT_CHILDREN; - mutable JSC::WriteBarrier<JSC::Unknown> m_headers; + mutable JSC::WriteBarrier<JSC::Unknown> m_body; +mutable JSC::WriteBarrier<JSC::Unknown> m_headers; mutable JSC::WriteBarrier<JSC::Unknown> m_url; }; class JSRequestPrototype final : public JSC::JSNonFinalObject { @@ -1434,7 +1435,8 @@ class JSResponse final : public JSC::JSDestructibleObject { DECLARE_VISIT_CHILDREN; - mutable JSC::WriteBarrier<JSC::Unknown> m_headers; + mutable JSC::WriteBarrier<JSC::Unknown> m_body; +mutable JSC::WriteBarrier<JSC::Unknown> m_headers; mutable JSC::WriteBarrier<JSC::Unknown> m_statusText; mutable JSC::WriteBarrier<JSC::Unknown> m_url; }; diff --git a/src/bun.js/bindings/exports.zig b/src/bun.js/bindings/exports.zig index b2fdd001e..c5ab610b2 100644 --- a/src/bun.js/bindings/exports.zig +++ b/src/bun.js/bindings/exports.zig @@ -180,6 +180,7 @@ pub const NodePath = JSC.Node.Path; // Web Streams pub const JSReadableStreamBlob = JSC.WebCore.ByteBlobLoader.Source.JSReadableStreamSource; pub const JSReadableStreamFile = JSC.WebCore.FileBlobLoader.Source.JSReadableStreamSource; +pub const JSReadableStreamBytes = JSC.WebCore.ByteStream.Source.JSReadableStreamSource; // Sinks pub const JSArrayBufferSink = JSC.WebCore.ArrayBufferSink.JSSink; diff --git a/src/bun.js/bindings/generated_classes.zig b/src/bun.js/bindings/generated_classes.zig index 425d70a44..7230de74a 100644 --- a/src/bun.js/bindings/generated_classes.zig +++ b/src/bun.js/bindings/generated_classes.zig @@ -845,6 +845,9 @@ pub const JSRequest = struct { @compileLog("Expected Request.getArrayBuffer to be a callback"); if (@TypeOf(Request.getBlob) != CallbackType) @compileLog("Expected Request.getBlob to be a callback"); + if (@TypeOf(Request.getBody) != GetterType) + @compileLog("Expected Request.getBody to be a getter"); + if (@TypeOf(Request.getBodyUsed) != GetterType) @compileLog("Expected Request.getBodyUsed to be a getter"); @@ -893,6 +896,7 @@ pub const JSRequest = struct { @export(Request.finalize, .{ .name = "RequestClass__finalize" }); @export(Request.getArrayBuffer, .{ .name = "RequestPrototype__getArrayBuffer" }); @export(Request.getBlob, .{ .name = "RequestPrototype__getBlob" }); + @export(Request.getBody, .{ .name = "RequestPrototype__getBody" }); @export(Request.getBodyUsed, .{ .name = "RequestPrototype__getBodyUsed" }); @export(Request.getCache, .{ .name = "RequestPrototype__getCache" }); @export(Request.getCredentials, .{ .name = "RequestPrototype__getCredentials" }); @@ -968,6 +972,9 @@ pub const JSResponse = struct { @compileLog("Expected Response.getArrayBuffer to be a callback"); if (@TypeOf(Response.getBlob) != CallbackType) @compileLog("Expected Response.getBlob to be a callback"); + if (@TypeOf(Response.getBody) != GetterType) + @compileLog("Expected Response.getBody to be a getter"); + if (@TypeOf(Response.getBodyUsed) != GetterType) @compileLog("Expected Response.getBodyUsed to be a getter"); @@ -1010,6 +1017,7 @@ pub const JSResponse = struct { @export(Response.finalize, .{ .name = "ResponseClass__finalize" }); @export(Response.getArrayBuffer, .{ .name = "ResponsePrototype__getArrayBuffer" }); @export(Response.getBlob, .{ .name = "ResponsePrototype__getBlob" }); + @export(Response.getBody, .{ .name = "ResponsePrototype__getBody" }); @export(Response.getBodyUsed, .{ .name = "ResponsePrototype__getBodyUsed" }); @export(Response.getHeaders, .{ .name = "ResponsePrototype__getHeaders" }); @export(Response.getJSON, .{ .name = "ResponsePrototype__getJSON" }); diff --git a/src/bun.js/bindings/headers-cpp.h b/src/bun.js/bindings/headers-cpp.h index 93b878c94..5f97769c8 100644 --- a/src/bun.js/bindings/headers-cpp.h +++ b/src/bun.js/bindings/headers-cpp.h @@ -1,4 +1,4 @@ -//-- AUTOGENERATED FILE -- 1663900299 +//-- AUTOGENERATED FILE -- 1663581060 // clang-format off #pragma once diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index b251568f9..49b26d394 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -1,5 +1,5 @@ // clang-format off -//-- AUTOGENERATED FILE -- 1663900299 +//-- AUTOGENERATED FILE -- 1663581060 #pragma once #include <stddef.h> @@ -822,6 +822,12 @@ ZIG_DECL JSC__JSValue ByteBlob__JSReadableStreamSource__load(JSC__JSGlobalObject ZIG_DECL JSC__JSValue FileBlobLoader__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0); #endif + +#ifdef __cplusplus + +ZIG_DECL JSC__JSValue ByteStream__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0); + +#endif CPP_DECL JSC__JSValue ArrayBufferSink__assignToStream(JSC__JSGlobalObject* arg0, JSC__JSValue JSValue1, void* arg2, void** arg3); CPP_DECL JSC__JSValue ArrayBufferSink__createObject(JSC__JSGlobalObject* arg0, void* arg1); CPP_DECL void ArrayBufferSink__detachPtr(JSC__JSValue JSValue0); diff --git a/src/bun.js/bindings/napi.cpp b/src/bun.js/bindings/napi.cpp index 009a11f61..b4bef19f9 100644 --- a/src/bun.js/bindings/napi.cpp +++ b/src/bun.js/bindings/napi.cpp @@ -751,6 +751,17 @@ extern "C" napi_status napi_create_reference(napi_env env, napi_value value, return napi_ok; } +extern "C" void napi_set_ref(NapiRef* ref, JSC__JSValue val_) +{ + + JSC::JSValue val = JSC::JSValue::decode(val_); + if (val) { + ref->strongRef.set(ref->globalObject->vm(), val); + } else { + ref->strongRef.clear(); + } +} + extern "C" napi_status napi_add_finalizer(napi_env env, napi_value js_object, void* native_object, napi_finalize finalize_cb, @@ -794,6 +805,11 @@ extern "C" napi_status napi_get_reference_value(napi_env env, napi_ref ref, return napi_ok; } +extern "C" JSC__JSValue napi_get_reference_value_internal(NapiRef* napiRef) +{ + return JSC::JSValue::encode(napiRef->value()); +} + extern "C" napi_status napi_reference_ref(napi_env env, napi_ref ref, uint32_t* result) { diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts index 3cf695f74..84bad8f24 100644 --- a/src/bun.js/webcore/response.classes.ts +++ b/src/bun.js/webcore/response.classes.ts @@ -10,6 +10,7 @@ export default [ proto: { text: { fn: "getText" }, json: { fn: "getJSON" }, + body: { getter: "getBody", cache: true }, arrayBuffer: { fn: "getArrayBuffer" }, blob: { fn: "getBlob" }, clone: { fn: "doClone", length: 1 }, @@ -74,6 +75,7 @@ export default [ getter: "getURL", cache: true, }, + body: { getter: "getBody", cache: true }, text: { fn: "getText" }, json: { fn: "getJSON" }, diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index dd37537e3..64dfe3dc7 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -145,6 +145,18 @@ pub const Response = struct { return JSValue.jsBoolean(this.body.value == .Used); } + pub fn getBody( + this: *Response, + globalThis: *JSC.JSGlobalObject, + ) callconv(.C) JSValue { + if (this.body.value == .Used) { + globalThis.throw("Body already used", .{}); + return JSValue.jsUndefined(); + } + + return this.body.value.toReadableStream(globalThis); + } + pub fn getStatusText( this: *Response, globalThis: *JSC.JSGlobalObject, @@ -194,21 +206,21 @@ pub const Response = struct { } pub fn cloneInto( - this: *const Response, + this: *Response, new_response: *Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject, ) void { new_response.* = Response{ .allocator = allocator, - .body = this.body.clone(allocator, globalThis), + .body = this.body.clone(globalThis), .url = allocator.dupe(u8, this.url) catch unreachable, .status_text = allocator.dupe(u8, this.status_text) catch unreachable, .redirected = this.redirected, }; } - pub fn clone(this: *const Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response { + pub fn clone(this: *Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response { var new_response = allocator.create(Response) catch unreachable; this.cloneInto(new_response, allocator, globalThis); return new_response; @@ -267,7 +279,7 @@ pub const Response = struct { return default.value; }, - .Used, .Locked, .Empty, .Error => return default.value, + .InternalBlob, .Used, .Locked, .Empty, .Error => return default.value, } } @@ -552,7 +564,7 @@ pub const Fetch = struct { const globalThis = this.global_this; var ref = this.ref; - const promise_value = ref.get(globalThis); + const promise_value = ref.get(); defer ref.destroy(globalThis); if (promise_value.isEmptyOrUndefinedOrNull()) { @@ -596,7 +608,7 @@ pub const Fetch = struct { var allocator = this.global_this.bunVM().allocator; const http_response = this.result.response; var response = allocator.create(Response) catch unreachable; - const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this); + const internal_blob = this.response_buffer.list.toManaged(this.response_buffer.allocator); this.response_buffer = .{ .allocator = default_allocator, .list = .{ .items = &.{}, .capacity = 0, @@ -613,7 +625,7 @@ pub const Fetch = struct { .status_code = @truncate(u16, http_response.status_code), }, .value = .{ - .Blob = blob, + .InternalBlob = internal_blob, }, }, }; @@ -937,6 +949,15 @@ pub const Blob = struct { return this.store == null; } + pub fn writeFormatForSize(size: usize, writer: anytype, comptime enable_ansi_colors: bool) !void { + try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors)); + try writer.print( + comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors), + .{ + bun.fmt.size(size), + }, + ); + } pub fn writeFormat(this: *const Blob, formatter: *JSC.Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void { const Writer = @TypeOf(writer); @@ -947,38 +968,34 @@ pub const Blob = struct { return; } - var store = this.store.?; - switch (store.data) { - .file => |file| { - try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors)); - switch (file.pathlike) { - .path => |path| { - try writer.print( - comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors), - .{ - path.slice(), - }, - ); - }, - .fd => |fd| { - try writer.print( - comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors), - .{ - fd, - }, - ); - }, - } - }, - .bytes => { - try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors)); - try writer.print( - comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors), - .{ - bun.fmt.size(this.size), - }, - ); - }, + { + var store = this.store.?; + switch (store.data) { + .file => |file| { + try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors)); + switch (file.pathlike) { + .path => |path| { + try writer.print( + comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors), + .{ + path.slice(), + }, + ); + }, + .fd => |fd| { + try writer.print( + comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors), + .{ + fd, + }, + ); + }, + } + }, + .bytes => { + try writeFormatForSize(this.size, writer, enable_ansi_colors); + }, + } } if (this.content_type.len > 0 or this.offset > 0) { @@ -1071,7 +1088,7 @@ pub const Blob = struct { bun.default_allocator.destroy(this); promise.reject(globalThis, ZigString.init("Body was used after it was consumed").toErrorInstance(globalThis)); }, - .Empty, .Blob => { + .InternalBlob, .Empty, .Blob => { var blob = value.use(); // TODO: this should be one promise not two! const new_promise = writeFileWithSourceDestination(globalThis.ref(), &blob, &file_blob); @@ -1273,7 +1290,7 @@ pub const Blob = struct { var source_blob: Blob = brk: { if (data.as(Response)) |response| { switch (response.body.value) { - .Used, .Empty, .Blob => { + .InternalBlob, .Used, .Empty, .Blob => { break :brk response.body.use(); }, .Error => { @@ -1302,7 +1319,7 @@ pub const Blob = struct { if (data.as(Request)) |request| { switch (request.body) { - .Used, .Empty, .Blob => { + .InternalBlob, .Used, .Empty, .Blob => { break :brk request.body.use(); }, .Error => { @@ -3321,11 +3338,6 @@ pub const Blob = struct { return slice_[0..@minimum(slice_.len, @as(usize, this.size))]; } - pub fn view(this: *const Blob) []const u8 { - if (this.size == 0 or this.store == null) return ""; - return this.store.?.sharedView()[this.offset..][0..this.size]; - } - pub const Lifetime = JSC.WebCore.Lifetime; pub fn setIsASCIIFlag(this: *Blob, is_all_ascii: bool) void { this.is_all_ascii = is_all_ascii; @@ -3881,10 +3893,10 @@ pub const Body = struct { return this.value.use(); } - pub fn clone(this: Body, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) Body { + pub fn clone(this: *Body, globalThis: *JSGlobalObject) Body { return Body{ .init = this.init.clone(globalThis), - .value = this.value.clone(allocator), + .value = this.value.clone(globalThis), }; } @@ -3911,6 +3923,16 @@ pub const Body = struct { try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); try this.value.Blob.writeFormat(formatter, writer, enable_ansi_colors); + } else if (this.value == .InternalBlob) { + try formatter.printComma(Writer, writer, enable_ansi_colors); + try writer.writeAll("\n"); + try Blob.writeFormatForSize(this.value.InternalBlob.items.len, writer, enable_ansi_colors); + } else if (this.value == .Locked) { + if (this.value.Locked.readable) |stream| { + try formatter.printComma(Writer, writer, enable_ansi_colors); + try writer.writeAll("\n"); + formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors); + } } } @@ -3982,6 +4004,8 @@ pub const Body = struct { /// conditionally runs when requesting data /// used in HTTP server to ignore request bodies unless asked for it onPull: ?fn (ctx: *anyopaque) void = null, + onDrain: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null, + deinit: bool = false, action: Action = Action.none, @@ -4041,6 +4065,9 @@ pub const Body = struct { pub const Value = union(Tag) { Blob: Blob, + /// Single-use Blob + /// Avoids a heap allocation. + InternalBlob: std.ArrayList(u8), Locked: PendingValue, Used: void, Empty: void, @@ -4048,6 +4075,7 @@ pub const Body = struct { pub const Tag = enum { Blob, + InternalBlob, Locked, Used, Empty, @@ -4056,6 +4084,87 @@ pub const Body = struct { pub const empty = Value{ .Empty = .{} }; + + pub fn toReadableStream(this: *Value, globalThis: *JSGlobalObject) JSValue { + JSC.markBinding(); + + switch (this.*) { + .Used, .Empty => { + return JSC.WebCore.ReadableStream.empty(globalThis); + }, + .InternalBlob => |*bytes| { + var blob = Blob.init(bytes.toOwnedSlice(), bytes.allocator, globalThis); + defer blob.detach(); + var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size); + this.* = .{ + .Locked = .{ + .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?, + .global = globalThis, + }, + }; + return readable; + }, + .Blob => { + var blob = this.Blob; + defer blob.detach(); + var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size); + readable.protect(); + this.* = .{ + .Locked = .{ + .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?, + .global = globalThis, + }, + }; + return readable; + }, + .Locked => { + var locked = &this.Locked; + if (locked.readable) |readable| { + return readable.value; + } + var drain_result: JSC.WebCore.DrainResult = .{ + .estimated_size = 0, + }; + + if (locked.onDrain) |drain| { + locked.onDrain = null; + drain_result = drain(locked.task.?); + } + + if (drain_result == .empty or drain_result == .aborted) { + this.* = .{ .Empty = void{} }; + return JSC.WebCore.ReadableStream.empty(globalThis); + } + + var reader = bun.default_allocator.create(JSC.WebCore.ByteStream.Source) catch unreachable; + reader.* = .{ + .context = undefined, + .globalThis = globalThis, + }; + + reader.context.setup(); + + if (drain_result == .estimated_size) { + reader.context.highWaterMark = @truncate(Blob.SizeType, drain_result.estimated_size); + reader.context.size_hint = @truncate(Blob.SizeType, drain_result.estimated_size); + } else if (drain_result == .owned) { + reader.context.buffer = drain_result.owned.list; + reader.context.size_hint = @truncate(Blob.SizeType, drain_result.owned.size_hint); + } + + locked.readable = .{ + .ptr = .{ .Bytes = &reader.context }, + .value = reader.toJS(globalThis), + }; + + locked.readable.?.value.protect(); + return locked.readable.?.value; + }, + + else => unreachable, + } + } + pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) ?Value { if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |readable| { switch (readable.ptr) { @@ -4121,13 +4230,29 @@ pub const Body = struct { if (locked.promise) |promise| { locked.promise = null; - var blob = new.use(); switch (locked.action) { .getText => { - promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref())); + if (new.* == .InternalBlob) { + var bytes = new.InternalBlob; + new.* = .{ .Empty = void{} }; + var str = ZigString.init(bytes.items).withEncoding(); + str.mark(); + if (str.is16Bit()) { + const out = str.toValueGC(global); + bytes.deinit(); + promise.asPromise().?.resolve(global, out); + } else { + const out = str.toExternalValue(global); + promise.asPromise().?.resolve(global, out); + } + } else { + var blob = new.use(); + promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref())); + } }, .getJSON => { + var blob = new.use(); const json_value = blob.toJSON(global, .share); blob.detach(); @@ -4138,17 +4263,25 @@ pub const Body = struct { } }, .getArrayBuffer => { - promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global)); + if (new.* == .InternalBlob) { + const marked = JSC.MarkedArrayBuffer.fromBytes(new.InternalBlob.items, new.InternalBlob.allocator, .ArrayBuffer); + new.* = .{ .Empty = void{} }; + var object_ref = marked.toJS(global, null); + promise.asPromise().?.resolve(global, JSC.JSValue.c(object_ref)); + } else { + var blob = new.use(); + promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global)); + } }, .getBlob => { var ptr = bun.default_allocator.create(Blob) catch unreachable; - ptr.* = blob; + ptr.* = new.use(); ptr.allocator = bun.default_allocator; promise.asPromise().?.resolve(global, ptr.toJS(global)); }, else => { var ptr = bun.default_allocator.create(Blob) catch unreachable; - ptr.* = blob; + ptr.* = new.use(); ptr.allocator = bun.default_allocator; promise.asInternalPromise().?.resolve(global, ptr.toJS(global)); }, @@ -4160,6 +4293,7 @@ pub const Body = struct { pub fn slice(this: Value) []const u8 { return switch (this) { .Blob => this.Blob.sharedView(), + .InternalBlob => |list| list.items, else => "", }; } @@ -4172,6 +4306,15 @@ pub const Body = struct { this.* = .{ .Used = .{} }; return new_blob; }, + .InternalBlob => { + var new_blob = Blob.init( + this.InternalBlob.toOwnedSlice(), + this.InternalBlob.allocator, + JSC.VirtualMachine.vm.global, + ); + this.* = .{ .Used = .{} }; + return new_blob; + }, else => { return Blob.initEmpty(undefined); }, @@ -4228,14 +4371,22 @@ pub const Body = struct { pub fn deinit(this: *Value) void { const tag = @as(Tag, this.*); if (tag == .Locked) { - if (this.Locked.readable) |*readable| { - readable.done(); + if (!this.Locked.deinit) { + this.Locked.deinit = true; + + if (this.Locked.readable) |*readable| { + readable.done(); + } } - this.Locked.deinit = true; return; } + if (tag == .InternalBlob) { + this.InternalBlob.clearAndFree(); + this.* = Value.empty; + } + if (tag == .Blob) { this.Blob.deinit(); this.* = Value.empty; @@ -4246,8 +4397,18 @@ pub const Body = struct { } } - pub fn clone(this: Value, _: std.mem.Allocator) Value { - if (this == .Blob) { + pub fn clone(this: *Value, globalThis: *JSC.JSGlobalObject) Value { + if (this.* == .InternalBlob) { + this.* = .{ + .Blob = Blob.init( + this.InternalBlob.toOwnedSlice(), + this.InternalBlob.allocator, + globalThis, + ), + }; + } + + if (this.* == .Blob) { return Value{ .Blob = this.Blob.dupe() }; } @@ -4319,10 +4480,16 @@ pub const Body = struct { } else |_| {} } + if (value.isUndefined()) { + body.value = Value.empty; + return body; + } + body.value = Value.fromJS(globalThis, value) orelse return null; if (body.value == .Blob) std.debug.assert(body.value.Blob.allocator == null); // owned by Body + return body; } }; @@ -4358,7 +4525,18 @@ pub const Request = struct { try writer.writeAll("\""); if (this.body == .Blob) { try writer.writeAll("\n"); + try formatter.writeIndent(Writer, writer); try this.body.Blob.writeFormat(formatter, writer, enable_ansi_colors); + } else if (this.body == .InternalBlob) { + try writer.writeAll("\n"); + try formatter.writeIndent(Writer, writer); + try Blob.writeFormatForSize(this.body.InternalBlob.items.len, writer, enable_ansi_colors); + } else if (this.body == .Locked) { + if (this.body.Locked.readable) |stream| { + try writer.writeAll("\n"); + try formatter.writeIndent(Writer, writer); + formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors); + } } } try writer.writeAll("\n"); @@ -4392,7 +4570,7 @@ pub const Request = struct { return MimeType.other.value; }, - .Error, .Used, .Locked, .Empty => return MimeType.other.value, + .InternalBlob, .Error, .Used, .Locked, .Empty => return MimeType.other.value, } } @@ -4415,6 +4593,18 @@ pub const Request = struct { return ZigString.init("").toValueGC(globalThis); } + pub fn getBody( + this: *Request, + globalThis: *JSC.JSGlobalObject, + ) callconv(.C) JSValue { + if (this.body == .Used) { + globalThis.throw("Body already used", .{}); + return JSValue.jsUndefined(); + } + + return this.body.toReadableStream(globalThis); + } + pub fn getIntegrity( _: *Request, globalThis: *JSC.JSGlobalObject, @@ -4521,17 +4711,9 @@ pub const Request = struct { } if (urlOrObject.fastGet(globalThis, .body)) |body_| { - if (Blob.get(globalThis, body_, true, false)) |blob| { - if (blob.size > 0) { - request.body = Body.Value{ .Blob = blob }; - } - } else |err| { - if (err == error.InvalidArguments) { - globalThis.throwInvalidArguments("Expected an Array", .{}); - return null; - } - - globalThis.throwInvalidArguments("Invalid Body object", .{}); + if (Body.Value.fromJS(globalThis, body_)) |body| { + request.body = body.value; + } else { return null; } } @@ -4546,17 +4728,9 @@ pub const Request = struct { } if (arguments[1].fastGet(globalThis, .body)) |body_| { - if (Blob.get(globalThis, body_, true, false)) |blob| { - if (blob.size > 0) { - request.body = Body.Value{ .Blob = blob }; - } - } else |err| { - if (err == error.InvalidArguments) { - globalThis.throwInvalidArguments("Expected an Array", .{}); - return null; - } - - globalThis.throwInvalidArguments("Invalid Body object", .{}); + if (Body.Value.fromJS(globalThis, body_)) |body| { + request.body = body.value; + } else { return null; } } @@ -4614,7 +4788,7 @@ pub const Request = struct { globalThis: *JSGlobalObject, ) void { req.* = Request{ - .body = this.body.clone(allocator), + .body = this.body.clone(globalThis), .url = ZigString.init(allocator.dupe(u8, this.url.slice()) catch unreachable), .method = this.method, }; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 23aad70ec..681ff6172 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -70,7 +70,7 @@ pub const ReadableStream = struct { pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void { JSC.markBinding(); this.value.unprotect(); - ReadableStream__abort(this.value, globalThis); + ReadableStream__cancel(this.value, globalThis); } pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void { @@ -98,6 +98,8 @@ pub const ReadableStream = struct { /// This is a direct readable stream /// That means we can turn it into whatever we want Direct = 3, + + Bytes = 4, }; pub const Source = union(Tag) { Invalid: void, @@ -116,6 +118,8 @@ pub const ReadableStream = struct { /// This is a direct readable stream /// That means we can turn it into whatever we want Direct: void, + + Bytes: *ByteStream, }; extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag; @@ -165,6 +169,13 @@ pub const ReadableStream = struct { }, }, + .Bytes => ReadableStream{ + .value = value, + .ptr = .{ + .Bytes = ptr.asPtr(ByteStream), + }, + }, + // .HTTPRequest => ReadableStream{ // .value = value, // .ptr = .{ @@ -184,6 +195,7 @@ pub const ReadableStream = struct { extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue; pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue { + JSC.markBinding(); return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id))); } pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue { @@ -402,6 +414,16 @@ pub const StreamStart = union(Tag) { } }; +pub const DrainResult = union(enum) { + owned: struct { + list: std.ArrayList(u8), + size_hint: usize, + }, + estimated_size: usize, + empty: void, + aborted: void, +}; + pub const StreamResult = union(Tag) { owned: bun.ByteList, owned_and_done: bun.ByteList, @@ -2660,65 +2682,301 @@ pub const ByteBlobLoader = struct { pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit); }; -pub fn RequestBodyStreamer( - comptime is_ssl: bool, -) type { - return struct { - response: *uws.NewApp(is_ssl).Response, +pub const ByteStream = struct { + buffer: std.ArrayList(u8) = .{ + .allocator = bun.default_allocator, + .items = &.{}, + .capacity = 0, + }, + has_received_last_chunk: bool = false, + pending: StreamResult.Pending = StreamResult.Pending{ + .frame = undefined, + .used = false, + .result = .{ .done = {} }, + }, + done: bool = false, + pending_buffer: []u8 = &.{}, + pending_value: ?*JSC.napi.Ref = null, + offset: usize = 0, + highWaterMark: Blob.SizeType = 0, + pipe: Pipe = .{}, + size_hint: Blob.SizeType = 0, + + pub const Pipe = struct { + ctx: ?*anyopaque = null, + onPipe: ?PipeFunction = null, + + pub fn New(comptime Type: type, comptime Function: anytype) type { + return struct { + pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void { + Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator); + } - pub const tag = if (is_ssl) - ReadableStream.Tag.HTTPRequest - else if (is_ssl) - ReadableStream.Tag.HTTPSRequest; + pub fn init(self: *Type) Pipe { + return Pipe{ + .ctx = self, + .onPipe = pipe, + }; + } + }; + } + }; - pub fn onStart(this: *ByteBlobLoader) StreamStart { - return .{ .chunk_size = this.chunk_size }; + pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; + + pub const tag = ReadableStream.Tag.Bytes; + + pub fn setup(this: *ByteStream) void { + this.* = .{}; + } + + pub fn onStart(this: *@This()) StreamStart { + if (this.has_received_last_chunk and this.buffer.items.len == 0) { + return .{ .empty = void{} }; } - pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult { - array.ensureStillAlive(); - defer array.ensureStillAlive(); - if (this.done) { - return .{ .done = {} }; + if (this.has_received_last_chunk) { + return .{ .chunk_size = @truncate(Blob.SizeType, @minimum(1024 * 1024 * 2, this.buffer.items.len)) }; + } + + if (this.highWaterMark == 0) { + return .{ .ready = void{} }; + } + + return .{ .chunk_size = @maximum(this.highWaterMark, std.mem.page_size) }; + } + + pub fn value(this: *@This()) JSValue { + if (this.pending_value == null) + return .zero; + + const result = this.pending_value.?.get(); + this.pending_value.?.set(.zero); + + return result; + } + + pub fn isCancelled(this: *const @This()) bool { + return @fieldParentPtr(Source, "context", this).cancelled; + } + + pub fn unpipe(this: *@This()) void { + this.pipe.ctx = null; + this.pipe.onPipe = null; + if (!this.parent().deinited) { + this.parent().deinited = true; + bun.default_allocator.destroy(this.parent()); + } + } + + pub fn onData( + this: *@This(), + stream: StreamResult, + allocator: std.mem.Allocator, + ) void { + JSC.markBinding(); + if (this.done) { + if (stream.isDone() and (stream == .owned or stream == .owned_and_done)) { + if (stream == .owned) allocator.free(stream.owned.slice()); + if (stream == .owned_and_done) allocator.free(stream.owned_and_done.slice()); } - var temporary = this.store.sharedView(); - temporary = temporary[this.offset..]; + return; + } + + std.debug.assert(!this.has_received_last_chunk); + this.has_received_last_chunk = stream.isDone(); + + if (this.pipe.ctx != null) { + this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator); + return; + } + + const chunk = stream.slice(); + + if (!this.pending.used) { + std.debug.assert(this.buffer.items.len == 0); + var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)]; + const pending_buffer_len = this.pending_buffer.len; + @memcpy(to_copy.ptr, chunk.ptr, to_copy.len); + this.pending_buffer = &.{}; + + const is_really_done = this.has_received_last_chunk and to_copy.len <= pending_buffer_len; - temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))]; - if (temporary.len == 0) { - this.store.deref(); + if (is_really_done) { this.done = true; - return .{ .done = {} }; + this.pending.result = .{ + .into_array_and_done = .{ + .value = this.value(), + .len = @truncate(Blob.SizeType, to_copy.len), + }, + }; + } else { + this.pending.result = .{ + .into_array = .{ + .value = this.value(), + .len = @truncate(Blob.SizeType, to_copy.len), + }, + }; } - const copied = @intCast(Blob.SizeType, temporary.len); + const remaining = chunk[to_copy.len..]; + if (remaining.len > 0) + this.append(stream, to_copy.len, allocator) catch @panic("Out of memory while copying request body"); + resume this.pending.frame; + return; + } - this.remain -|= copied; - this.offset +|= copied; - @memcpy(buffer.ptr, temporary.ptr, temporary.len); - if (this.remain == 0) { - return .{ .into_array_and_done = .{ .value = array, .len = copied } }; + this.append(stream, 0, allocator) catch @panic("Out of memory while copying request body"); + } + + pub fn append( + this: *@This(), + stream: StreamResult, + offset: usize, + allocator: std.mem.Allocator, + ) !void { + const chunk = stream.slice()[offset..]; + + if (this.buffer.capacity == 0) { + switch (stream) { + .owned => |owned| { + this.buffer = owned.listManaged(allocator); + this.offset += offset; + }, + .owned_and_done => |owned| { + this.buffer = owned.listManaged(allocator); + this.offset += offset; + }, + .temporary_and_done, .temporary => { + this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len); + this.buffer.appendSliceAssumeCapacity(chunk); + }, + else => unreachable, } + return; + } + + switch (stream) { + .temporary_and_done, .temporary => { + try this.buffer.appendSlice(chunk); + }, + // We don't support the rest of these yet + else => unreachable, + } + } - return .{ .into_array = .{ .value = array, .len = copied } }; + pub fn setValue(this: *@This(), view: JSC.JSValue) void { + JSC.markBinding(); + if (this.pending_value) |pending| { + pending.set(view); + } else { + this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view); } + } - pub fn onCancel(_: *ByteBlobLoader) void {} + pub fn parent(this: *@This()) *Source { + return @fieldParentPtr(Source, "context", this); + } - pub fn deinit(this: *ByteBlobLoader) void { - if (!this.done) { + pub fn onPull(this: *@This(), buffer: []u8, view: JSC.JSValue) StreamResult { + JSC.markBinding(); + std.debug.assert(buffer.len > 0); + + if (this.buffer.items.len > 0) { + std.debug.assert(this.value() == .zero); + const to_write = @minimum( + this.buffer.items.len - this.offset, + buffer.len, + ); + var remaining_in_buffer = this.buffer.items[this.offset..][0..to_write]; + + @memcpy(buffer.ptr, this.buffer.items.ptr + this.offset, to_write); + + if (this.offset + to_write == this.buffer.items.len) { + this.offset = 0; + this.buffer.items.len = 0; + } else { + this.offset += to_write; + } + + if (this.has_received_last_chunk and remaining_in_buffer.len == 0) { + this.buffer.clearAndFree(); this.done = true; - this.store.deref(); + + return .{ + .into_array_and_done = .{ + .value = view, + .len = @truncate(Blob.SizeType, to_write), + }, + }; + } + + return .{ + .into_array = .{ + .value = view, + .len = @truncate(Blob.SizeType, to_write), + }, + }; + } + + if (this.has_received_last_chunk) { + return .{ + .done = void{}, + }; + } + + this.pending_buffer = buffer; + this.setValue(view); + + return .{ + .pending = &this.pending, + }; + } + + pub fn onCancel(this: *@This()) void { + JSC.markBinding(); + const view = this.value(); + if (this.buffer.capacity > 0) this.buffer.clearAndFree(); + this.done = true; + if (this.pending_value) |ref| { + this.pending_value = null; + ref.destroy(undefined); + } + + if (view != .zero) { + this.pending_buffer = &.{}; + this.pending.result = .{ .done = {} }; + if (!this.pending.used) { + resume this.pending.frame; } + } + } + + pub fn deinit(this: *@This()) void { + JSC.markBinding(); + if (this.buffer.capacity > 0) this.buffer.clearAndFree(); - bun.default_allocator.destroy(this); + if (this.pending_value) |ref| { + this.pending_value = null; + ref.destroy(undefined); } + if (!this.done) { + this.done = true; - pub const label = if (is_ssl) "HTTPSRequestBodyStreamer" else "HTTPRequestBodyStreamer"; - pub const Source = ReadableStreamSource(@This(), label, onStart, onPull, onCancel, deinit); - }; -} + this.pending_buffer = &.{}; + this.pending.result = .{ .done = {} }; + + if (!this.pending.used) { + resume this.pending.frame; + } + } + + bun.default_allocator.destroy(this.parent()); + } + + pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit); +}; pub const FileBlobLoader = struct { buf: []u8 = &[_]u8{}, diff --git a/src/napi/napi.zig b/src/napi/napi.zig index 54e16d153..6b191167f 100644 --- a/src/napi/napi.zig +++ b/src/napi/napi.zig @@ -12,6 +12,7 @@ const Channel = @import("../sync.zig").Channel; pub const napi_env = *JSC.JSGlobalObject; pub const Ref = opaque { pub fn create(globalThis: *JSC.JSGlobalObject, value: JSValue) *Ref { + JSC.markBinding(); var ref: *Ref = undefined; std.debug.assert( napi_create_reference( @@ -22,20 +23,27 @@ pub const Ref = opaque { ) == .ok, ); if (comptime bun.Environment.isDebug) { - std.debug.assert(ref.get(globalThis) == value); + std.debug.assert(ref.get() == value); } return ref; } - pub fn get(ref: *Ref, globalThis: *JSC.JSGlobalObject) JSValue { - var value: JSValue = JSValue.zero; - std.debug.assert(napi_get_reference_value(globalThis, ref, &value) == .ok); - return value; + pub fn get(ref: *Ref) JSValue { + JSC.markBinding(); + return napi_get_reference_value_internal(ref); } pub fn destroy(ref: *Ref, globalThis: *JSC.JSGlobalObject) void { + JSC.markBinding(); std.debug.assert(napi_delete_reference(globalThis, ref) == .ok); } + + pub fn set(this: *Ref, value: JSC.JSValue) void { + JSC.markBinding(); + napi_set_ref(this, value); + } + + extern fn napi_set_ref(ref: *Ref, value: JSC.JSValue) void; }; pub const napi_handle_scope = napi_env; pub const napi_escapable_handle_scope = struct_napi_escapable_handle_scope__; @@ -664,6 +672,7 @@ pub extern fn napi_delete_reference(env: napi_env, ref: *Ref) napi_status; pub extern fn napi_reference_ref(env: napi_env, ref: *Ref, result: [*c]u32) napi_status; pub extern fn napi_reference_unref(env: napi_env, ref: *Ref, result: [*c]u32) napi_status; pub extern fn napi_get_reference_value(env: napi_env, ref: *Ref, result: *napi_value) napi_status; +pub extern fn napi_get_reference_value_internal(ref: *Ref) JSC.JSValue; // JSC scans the stack // we don't need this diff --git a/test/bun.js/streams.test.js b/test/bun.js/streams.test.js index 4e70409a6..1268a04c0 100644 --- a/test/bun.js/streams.test.js +++ b/test/bun.js/streams.test.js @@ -22,6 +22,34 @@ it("exists globally", () => { expect(typeof CountQueuingStrategy).toBe("function"); }); +it("new Response(stream).body", async () => { + var stream = new ReadableStream({ + pull(controller) { + controller.enqueue("hello"); + controller.enqueue("world"); + controller.close(); + }, + cancel() {}, + }); + var response = new Response(stream); + expect(response.body).toBe(stream); + expect(await response.text()).toBe("helloworld"); +}); + +it("new Request({body: stream}).body", async () => { + var stream = new ReadableStream({ + pull(controller) { + controller.enqueue("hello"); + controller.enqueue("world"); + controller.close(); + }, + cancel() {}, + }); + var response = new Request({ body: stream }); + expect(response.body).toBe(stream); + expect(await response.text()).toBe("helloworld"); +}); + it("ReadableStream (readMany)", async () => { var stream = new ReadableStream({ pull(controller) { |