diff options
| author | 2022-09-26 20:04:28 -0700 | |
|---|---|---|
| committer | 2022-09-26 20:04:28 -0700 | |
| commit | 24a9bc23b7e1c7911cb2e146be199d940b9729e6 (patch) | |
| tree | 852a75cff3950063b405ca3a0dfe22e46d0eecfb /src | |
| parent | 97c3688788a94faffb6bceb4bc6c97fb84307ceb (diff) | |
| download | bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.gz bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.tar.zst bun-24a9bc23b7e1c7911cb2e146be199d940b9729e6.zip | |
[Web Streams] Add `body` to `Response` and `Request` (#1255)
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src')
| -rw-r--r-- | src/bun.js/api/server.zig | 293 | ||||
| -rw-r--r-- | src/bun.js/bindings/ZigGeneratedClasses.cpp | 52 | ||||
| -rw-r--r-- | src/bun.js/bindings/ZigGeneratedClasses.h | 6 | ||||
| -rw-r--r-- | src/bun.js/bindings/exports.zig | 1 | ||||
| -rw-r--r-- | src/bun.js/bindings/generated_classes.zig | 8 | ||||
| -rw-r--r-- | src/bun.js/bindings/headers-cpp.h | 2 | ||||
| -rw-r--r-- | src/bun.js/bindings/headers.h | 8 | ||||
| -rw-r--r-- | src/bun.js/bindings/napi.cpp | 16 | ||||
| -rw-r--r-- | src/bun.js/webcore/response.classes.ts | 2 | ||||
| -rw-r--r-- | src/bun.js/webcore/response.zig | 340 | ||||
| -rw-r--r-- | src/bun.js/webcore/streams.zig | 336 | ||||
| -rw-r--r-- | src/napi/napi.zig | 19 | 
12 files changed, 928 insertions, 155 deletions
| diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index d7cfbe4c1..939e010f3 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -538,7 +538,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp          sendfile: SendfileContext = undefined,          request_js_object: JSC.C.JSObjectRef = null,          request_body_buf: std.ArrayListUnmanaged(u8) = .{}, +        request_body_content_len: usize = 0,          sink: ?*ResponseStream.JSSink = null, +        byte_stream: ?*JSC.WebCore.ByteStream = null,          has_written_status: bool = false, @@ -696,20 +698,67 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp              }              this.response_buf_owned = std.ArrayListUnmanaged(u8){ .items = bb.items, .capacity = bb.capacity }; -            this.renderResponseBuffer(); +            this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);          }          pub fn renderResponseBuffer(this: *RequestContext) void {              this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this);          } -        pub fn onWritableResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool { +        /// Render a complete response buffer +        pub fn renderResponseBufferAndMetadata(this: *RequestContext) void { +            this.renderMetadata(); + +            if (!this.resp.tryEnd( +                this.response_buf_owned.items, +                this.response_buf_owned.items.len, +            )) { +                this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); +                this.setAbortHandler(); +                return; +            } + +            this.finalize(); +        } + +        /// Drain a partial response buffer +        pub fn drainResponseBufferAndMetadata(this: *RequestContext) void { +            this.renderMetadata(); +            this.setAbortHandler(); + +            _ = this.resp.write( +                this.response_buf_owned.items, +            ); + +            this.response_buf_owned.items.len = 0; +        } + +        pub fn renderResponseBufferAndMetadataCorked(this: *RequestContext) void { +            this.resp.runCorkedWithType(*RequestContext, renderResponseBufferAndMetadata, this); +        } + +        pub fn drainResponseBufferAndMetadataCorked(this: *RequestContext) void { +            this.resp.runCorkedWithType(*RequestContext, drainResponseBufferAndMetadata, this); +        } + +        pub fn onWritableResponseBuffer(this: *RequestContext, _: c_ulong, resp: *App.Response) callconv(.C) bool { +            std.debug.assert(this.resp == resp); +            if (this.aborted) { +                this.finalizeForAbort(); +                return false; +            } +            resp.end("", false); +            this.finalize(); +            return false; +        } + +        pub fn onWritableCompleteResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool {              std.debug.assert(this.resp == resp);              if (this.aborted) {                  this.finalizeForAbort();                  return false;              } -            return this.sendWritableBytes(this.response_buf_owned.items, write_offset, resp); +            return this.sendWritableBytesForCompleteResponseBuffer(this.response_buf_owned.items, write_offset, resp);          }          pub fn create(this: *RequestContext, server: *ThisServer, req: *uws.Request, resp: *App.Response) void { @@ -772,11 +821,24 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp                          if (req.body == .Locked and (req.body.Locked.action != .none or req.body.Locked.promise != null)) {                              this.pending_promises_for_abort += 1;                              req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); +                        } else if (req.body == .Locked and (req.body.Locked.readable != null)) { +                            req.body.Locked.readable.?.abort(this.server.globalThis); +                            req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); +                            req.body.Locked.readable = null;                          }                          req.uws_request = null;                      }                  } +                if (this.response_ptr) |response| { +                    if (response.body.value == .Locked) { +                        if (response.body.value.Locked.readable) |*readable| { +                            response.body.value.Locked.readable = null; +                            readable.abort(this.server.globalThis); +                        } +                    } +                } +                  // then, we reject the response promise                  if (this.promise) |promise| {                      this.pending_promises_for_abort += 1; @@ -841,6 +903,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp                  }                  JSC.C.JSValueUnprotect(this.server.globalThis.ref(), promise.asObjectRef());              } + +            if (this.byte_stream) |stream| { +                this.byte_stream = null; +                stream.unpipe(); +            }          }          pub fn finalize(this: *RequestContext) void {              this.finalizeWithoutDeinit(); @@ -974,10 +1041,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp              }              var bytes = this.blob.sharedView(); -            return this.sendWritableBytes(bytes, write_offset, resp); +            _ = this.sendWritableBytesForBlob(bytes, write_offset, resp); +            return true;          } -        pub fn sendWritableBytes(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool { +        pub fn sendWritableBytesForBlob(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool {              std.debug.assert(this.resp == resp);              var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..]; @@ -990,6 +1058,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp              }          } +        pub fn sendWritableBytesForCompleteResponseBuffer(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool { +            std.debug.assert(this.resp == resp); + +            var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..]; +            if (resp.tryEnd(bytes, bytes_.len)) { +                this.response_buf_owned.items.len = 0; +                this.finalize(); +            } else { +                this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); +            } + +            return true; +        } +          pub fn onWritableSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool {              return this.onSendfile();          } @@ -1125,7 +1207,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp              } else {                  this.blob.size = @truncate(Blob.SizeType, result.result.buf.len);                  this.response_buf_owned = .{ .items = result.result.buf, .capacity = result.result.buf.len }; -                this.renderResponseBuffer(); +                this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);              }          } @@ -1481,6 +1563,41 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp                                  this.resp.runCorkedWithType(*StreamPair, doRenderStream, &pair);                                  return;                              }, + +                            .Bytes => |byte_stream| { +                                std.debug.assert(byte_stream.pipe.ctx == null); +                                std.debug.assert(this.byte_stream == null); + +                                stream.detach(this.server.globalThis); + +                                this.response_buf_owned = byte_stream.buffer.moveToUnmanaged(); + +                                // If we've received the complete body by the time this function is called +                                // we can avoid streaming it and just send it all at once. +                                if (byte_stream.has_received_last_chunk) { +                                    this.blob.size = @truncate(Blob.SizeType, this.response_buf_owned.items.len); +                                    byte_stream.parent().deinit(); +                                    this.renderResponseBufferAndMetadataCorked(); +                                    return; +                                } + +                                byte_stream.pipe = JSC.WebCore.ByteStream.Pipe.New(@This(), onPipe).init(this); +                                this.byte_stream = byte_stream; + +                                // we don't set size here because even if we have a hint +                                // uWebSockets won't let us partially write streaming content +                                this.blob.size = 0; + +                                // if we've received metadata and part of the body, send everything we can and drain +                                if (this.response_buf_owned.items.len > 0) { +                                    this.drainResponseBufferAndMetadataCorked(); +                                } else { +                                    // if we only have metadata to send, send it now +                                    this.resp.runCorkedWithType(*RequestContext, renderMetadata, this); +                                } +                                this.setAbortHandler(); +                                return; +                            },                          }                      } @@ -1496,6 +1613,42 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp              this.doRenderBlob();          } +        pub fn onPipe(this: *RequestContext, stream: JSC.WebCore.StreamResult, allocator: std.mem.Allocator) void { +            var stream_needs_deinit = stream == .owned or stream == .owned_and_done; + +            defer { +                if (stream_needs_deinit) { +                    if (stream.isDone()) { +                        stream.owned_and_done.listManaged(allocator).deinit(); +                    } else { +                        stream.owned.listManaged(allocator).deinit(); +                    } +                } +            } + +            if (this.aborted) { +                this.finalizeForAbort(); +                return; +            } + +            const chunk = stream.slice(); +            // on failure, it will continue to allocate +            // we can't do buffering ourselves here or it won't work +            // uSockets will append and manage the buffer +            // so any write will buffer if the write fails +            if (this.resp.write(chunk)) { +                if (stream.isDone()) { +                    this.resp.endStream(false); +                    this.finalize(); +                } +            } else { +                // when it's the last one, we just want to know if it's done +                if (stream.isDone()) { +                    this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this); +                } +            } +        } +          pub fn doRenderBlob(this: *RequestContext) void {              // We are not corked              // The body is small @@ -1712,34 +1865,123 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp              if (this.aborted) return; -            this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body"); +            const request = JSC.JSValue.fromRef(this.request_js_object); +            var req = request.as(Request) orelse { +                this.request_body_buf.clearAndFree(this.allocator); +                return; +            }; + +            if (req.body == .Locked) { +                if (req.body.Locked.readable) |readable| { +                    if (readable.ptr == .Bytes) { +                        std.debug.assert(this.request_body_buf.items.len == 0); + +                        if (!last) { +                            readable.ptr.Bytes.onData( +                                .{ +                                    .temporary = bun.ByteList.init(chunk), +                                }, +                                bun.default_allocator, +                            ); +                        } else { +                            readable.ptr.Bytes.onData( +                                .{ +                                    .temporary_and_done = bun.ByteList.init(chunk), +                                }, +                                bun.default_allocator, +                            ); +                        } + +                        return; +                    } +                } +            } +              if (last) { -                const request = JSC.JSValue.fromRef(this.request_js_object); -                if (request.as(Request)) |req| { -                    request.ensureStillAlive(); -                    var bytes = this.request_body_buf.toOwnedSlice(this.allocator); -                    var old = req.body; -                    req.body = .{ -                        .Blob = if (bytes.len > 0) -                            Blob.init(bytes, this.allocator, this.server.globalThis) -                        else -                            Blob.initEmpty(this.server.globalThis), -                    }; -                    if (old == .Locked) -                        old.resolve(&req.body, this.server.globalThis); -                    request.unprotect(); +                request.ensureStillAlive(); +                var bytes = this.request_body_buf; +                var old = req.body; +                if (bytes.items.len == 0) { +                    req.body = .{ .Empty = {} };                  } else { -                    this.request_body_buf.clearAndFree(this.allocator); +                    req.body = .{ .InternalBlob = bytes.toManaged(this.allocator) };                  } + +                if (old == .Locked) +                    old.resolve(&req.body, this.server.globalThis); +                request.unprotect();              } + +            if (this.request_body_buf.capacity == 0) { +                this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, @minimum(this.request_body_content_len, max_request_body_preallocate_length)) catch @panic("Out of memory while allocating request body buffer"); +            } + +            this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body");          } +        pub fn onDrainRequestBody(this: *RequestContext) JSC.WebCore.DrainResult { +            if (this.aborted) { +                return JSC.WebCore.DrainResult{ +                    .aborted = void{}, +                }; +            } + +            std.debug.assert(!this.resp.hasResponded()); + +            // This means we have received part of the body but not the whole thing +            if (this.request_body_buf.items.len > 0) { +                var emptied = this.request_body_buf; +                this.request_body_buf = .{}; +                return .{ +                    .owned = .{ +                        .list = emptied.toManaged(this.allocator), +                        .size_hint = if (emptied.capacity < max_request_body_preallocate_length) +                            emptied.capacity +                        else +                            0, +                    }, +                }; +            } + +            const content_length = this.req.header("content-length") orelse { +                return .{ +                    .empty = void{}, +                }; +            }; + +            const len = std.fmt.parseInt(usize, content_length, 10) catch 0; +            this.request_body_content_len = len; + +            if (len == 0) { +                return JSC.WebCore.DrainResult{ +                    .empty = void{}, +                }; +            } + +            if (len > this.server.config.max_request_body_size) { +                this.resp.writeStatus("413 Request Entity Too Large"); +                this.resp.endWithoutBody(); + +                this.finalize(); +                return JSC.WebCore.DrainResult{ +                    .aborted = void{}, +                }; +            } + +            this.resp.onData(*RequestContext, onBufferedBodyChunk, this); + +            return .{ +                .estimated_size = len, +            }; +        } +        const max_request_body_preallocate_length = 1024 * 256;          pub fn onPull(this: *RequestContext) void {              const request = JSC.JSValue.c(this.request_js_object);              request.ensureStillAlive();              if (this.req.header("content-length")) |content_length| {                  const len = std.fmt.parseInt(usize, content_length, 10) catch 0; +                this.request_body_content_len = len;                  if (len == 0) {                      if (request.as(Request)) |req| {                          var old = req.body; @@ -1766,8 +2008,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp                      this.finalize();                      return;                  } - -                this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, len) catch @panic("Out of memory while allocating request body buffer");              } else if (this.req.header("transfer-encoding") == null) {                  // no content-length                  // no transfer-encoding @@ -1789,6 +2029,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp              onPull(bun.cast(*RequestContext, this));          } +        pub fn onDrainRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult { +            return onDrainRequestBody(bun.cast(*RequestContext, this)); +        } +          pub const Export = shim.exportFunctions(.{              .onResolve = onResolve,              .onReject = onReject, @@ -2243,6 +2487,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {                          .task = ctx,                          .global = this.globalThis,                          .onPull = RequestContext.onPullCallback, +                        .onDrain = RequestContext.onDrainRequestBodyCallback,                      },                  },              }; diff --git a/src/bun.js/bindings/ZigGeneratedClasses.cpp b/src/bun.js/bindings/ZigGeneratedClasses.cpp index 78f1bb179..925814c2d 100644 --- a/src/bun.js/bindings/ZigGeneratedClasses.cpp +++ b/src/bun.js/bindings/ZigGeneratedClasses.cpp @@ -2505,6 +2505,10 @@ extern "C" EncodedJSValue RequestPrototype__getBlob(void* ptr, JSC::JSGlobalObje  JSC_DECLARE_HOST_FUNCTION(RequestPrototype__blobCallback); +extern "C" JSC::EncodedJSValue RequestPrototype__getBody(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject); +JSC_DECLARE_CUSTOM_GETTER(RequestPrototype__bodyGetterWrap); + +  extern "C" JSC::EncodedJSValue RequestPrototype__getBodyUsed(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject);  JSC_DECLARE_CUSTOM_GETTER(RequestPrototype__bodyUsedGetterWrap); @@ -2571,6 +2575,7 @@ STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSRequestPrototype, JSRequestPrototype::Base    static const HashTableValue JSRequestPrototypeTableValues[] = {  { "arrayBuffer"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__arrayBufferCallback, 0 } }  ,  { "blob"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__blobCallback, 0 } }  , +{ "body"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__bodyGetterWrap, 0 } }  ,  { "bodyUsed"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__bodyUsedGetterWrap, 0 } }  ,  { "cache"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, RequestPrototype__cacheGetterWrap, 0 } }  ,  { "clone"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, RequestPrototype__cloneCallback, 1 } }  , @@ -2641,6 +2646,26 @@ JSC_DEFINE_HOST_FUNCTION(RequestPrototype__blobCallback, (JSGlobalObject * lexic  } +JSC_DEFINE_CUSTOM_GETTER(RequestPrototype__bodyGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) +{ +    auto& vm = lexicalGlobalObject->vm(); +    Zig::GlobalObject *globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject); +    auto throwScope = DECLARE_THROW_SCOPE(vm); +    JSRequest* thisObject = jsCast<JSRequest*>(JSValue::decode(thisValue)); +      JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject); +     +    if (JSValue cachedValue = thisObject->m_body.get()) +        return JSValue::encode(cachedValue); +     +    JSC::JSValue result = JSC::JSValue::decode( +        RequestPrototype__getBody(thisObject->wrapped(), globalObject) +    ); +    RETURN_IF_EXCEPTION(throwScope, {}); +    thisObject->m_body.set(vm, thisObject, result); +    RELEASE_AND_RETURN(throwScope, JSValue::encode(result)); +} + +  JSC_DEFINE_CUSTOM_GETTER(RequestPrototype__bodyUsedGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName))  {      auto& vm = lexicalGlobalObject->vm(); @@ -3002,6 +3027,7 @@ void JSRequest::visitChildrenImpl(JSCell* cell, Visitor& visitor)      JSRequest* thisObject = jsCast<JSRequest*>(cell);      ASSERT_GC_OBJECT_INHERITS(thisObject, info());      Base::visitChildren(thisObject, visitor); +    visitor.append(thisObject->m_body);      visitor.append(thisObject->m_headers);      visitor.append(thisObject->m_url);  } @@ -3018,6 +3044,10 @@ extern "C" EncodedJSValue ResponsePrototype__getBlob(void* ptr, JSC::JSGlobalObj  JSC_DECLARE_HOST_FUNCTION(ResponsePrototype__blobCallback); +extern "C" JSC::EncodedJSValue ResponsePrototype__getBody(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject); +JSC_DECLARE_CUSTOM_GETTER(ResponsePrototype__bodyGetterWrap); + +  extern "C" JSC::EncodedJSValue ResponsePrototype__getBodyUsed(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject);  JSC_DECLARE_CUSTOM_GETTER(ResponsePrototype__bodyUsedGetterWrap); @@ -3064,6 +3094,7 @@ STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSResponsePrototype, JSResponsePrototype::Ba    static const HashTableValue JSResponsePrototypeTableValues[] = {  { "arrayBuffer"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__arrayBufferCallback, 0 } }  ,  { "blob"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__blobCallback, 0 } }  , +{ "body"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__bodyGetterWrap, 0 } }  ,  { "bodyUsed"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__bodyUsedGetterWrap, 0 } }  ,  { "clone"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, ResponsePrototype__cloneCallback, 1 } }  ,  { "headers"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, ResponsePrototype__headersGetterWrap, 0 } }  , @@ -3129,6 +3160,26 @@ JSC_DEFINE_HOST_FUNCTION(ResponsePrototype__blobCallback, (JSGlobalObject * lexi  } +JSC_DEFINE_CUSTOM_GETTER(ResponsePrototype__bodyGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) +{ +    auto& vm = lexicalGlobalObject->vm(); +    Zig::GlobalObject *globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject); +    auto throwScope = DECLARE_THROW_SCOPE(vm); +    JSResponse* thisObject = jsCast<JSResponse*>(JSValue::decode(thisValue)); +      JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject); +     +    if (JSValue cachedValue = thisObject->m_body.get()) +        return JSValue::encode(cachedValue); +     +    JSC::JSValue result = JSC::JSValue::decode( +        ResponsePrototype__getBody(thisObject->wrapped(), globalObject) +    ); +    RETURN_IF_EXCEPTION(throwScope, {}); +    thisObject->m_body.set(vm, thisObject, result); +    RELEASE_AND_RETURN(throwScope, JSValue::encode(result)); +} + +  JSC_DEFINE_CUSTOM_GETTER(ResponsePrototype__bodyUsedGetterWrap, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName))  {      auto& vm = lexicalGlobalObject->vm(); @@ -3443,6 +3494,7 @@ void JSResponse::visitChildrenImpl(JSCell* cell, Visitor& visitor)      JSResponse* thisObject = jsCast<JSResponse*>(cell);      ASSERT_GC_OBJECT_INHERITS(thisObject, info());      Base::visitChildren(thisObject, visitor); +    visitor.append(thisObject->m_body);      visitor.append(thisObject->m_headers);      visitor.append(thisObject->m_statusText);      visitor.append(thisObject->m_url); diff --git a/src/bun.js/bindings/ZigGeneratedClasses.h b/src/bun.js/bindings/ZigGeneratedClasses.h index feae81ae0..ad85dd3f9 100644 --- a/src/bun.js/bindings/ZigGeneratedClasses.h +++ b/src/bun.js/bindings/ZigGeneratedClasses.h @@ -1309,7 +1309,8 @@ class JSRequest final : public JSC::JSDestructibleObject {          DECLARE_VISIT_CHILDREN; -        mutable JSC::WriteBarrier<JSC::Unknown> m_headers; +        mutable JSC::WriteBarrier<JSC::Unknown> m_body; +mutable JSC::WriteBarrier<JSC::Unknown> m_headers;  mutable JSC::WriteBarrier<JSC::Unknown> m_url;      };  class JSRequestPrototype final : public JSC::JSNonFinalObject { @@ -1434,7 +1435,8 @@ class JSResponse final : public JSC::JSDestructibleObject {          DECLARE_VISIT_CHILDREN; -        mutable JSC::WriteBarrier<JSC::Unknown> m_headers; +        mutable JSC::WriteBarrier<JSC::Unknown> m_body; +mutable JSC::WriteBarrier<JSC::Unknown> m_headers;  mutable JSC::WriteBarrier<JSC::Unknown> m_statusText;  mutable JSC::WriteBarrier<JSC::Unknown> m_url;      }; diff --git a/src/bun.js/bindings/exports.zig b/src/bun.js/bindings/exports.zig index b2fdd001e..c5ab610b2 100644 --- a/src/bun.js/bindings/exports.zig +++ b/src/bun.js/bindings/exports.zig @@ -180,6 +180,7 @@ pub const NodePath = JSC.Node.Path;  // Web Streams  pub const JSReadableStreamBlob = JSC.WebCore.ByteBlobLoader.Source.JSReadableStreamSource;  pub const JSReadableStreamFile = JSC.WebCore.FileBlobLoader.Source.JSReadableStreamSource; +pub const JSReadableStreamBytes = JSC.WebCore.ByteStream.Source.JSReadableStreamSource;  // Sinks  pub const JSArrayBufferSink = JSC.WebCore.ArrayBufferSink.JSSink; diff --git a/src/bun.js/bindings/generated_classes.zig b/src/bun.js/bindings/generated_classes.zig index 425d70a44..7230de74a 100644 --- a/src/bun.js/bindings/generated_classes.zig +++ b/src/bun.js/bindings/generated_classes.zig @@ -845,6 +845,9 @@ pub const JSRequest = struct {              @compileLog("Expected Request.getArrayBuffer to be a callback");          if (@TypeOf(Request.getBlob) != CallbackType)              @compileLog("Expected Request.getBlob to be a callback"); +        if (@TypeOf(Request.getBody) != GetterType) +            @compileLog("Expected Request.getBody to be a getter"); +          if (@TypeOf(Request.getBodyUsed) != GetterType)              @compileLog("Expected Request.getBodyUsed to be a getter"); @@ -893,6 +896,7 @@ pub const JSRequest = struct {              @export(Request.finalize, .{ .name = "RequestClass__finalize" });              @export(Request.getArrayBuffer, .{ .name = "RequestPrototype__getArrayBuffer" });              @export(Request.getBlob, .{ .name = "RequestPrototype__getBlob" }); +            @export(Request.getBody, .{ .name = "RequestPrototype__getBody" });              @export(Request.getBodyUsed, .{ .name = "RequestPrototype__getBodyUsed" });              @export(Request.getCache, .{ .name = "RequestPrototype__getCache" });              @export(Request.getCredentials, .{ .name = "RequestPrototype__getCredentials" }); @@ -968,6 +972,9 @@ pub const JSResponse = struct {              @compileLog("Expected Response.getArrayBuffer to be a callback");          if (@TypeOf(Response.getBlob) != CallbackType)              @compileLog("Expected Response.getBlob to be a callback"); +        if (@TypeOf(Response.getBody) != GetterType) +            @compileLog("Expected Response.getBody to be a getter"); +          if (@TypeOf(Response.getBodyUsed) != GetterType)              @compileLog("Expected Response.getBodyUsed to be a getter"); @@ -1010,6 +1017,7 @@ pub const JSResponse = struct {              @export(Response.finalize, .{ .name = "ResponseClass__finalize" });              @export(Response.getArrayBuffer, .{ .name = "ResponsePrototype__getArrayBuffer" });              @export(Response.getBlob, .{ .name = "ResponsePrototype__getBlob" }); +            @export(Response.getBody, .{ .name = "ResponsePrototype__getBody" });              @export(Response.getBodyUsed, .{ .name = "ResponsePrototype__getBodyUsed" });              @export(Response.getHeaders, .{ .name = "ResponsePrototype__getHeaders" });              @export(Response.getJSON, .{ .name = "ResponsePrototype__getJSON" }); diff --git a/src/bun.js/bindings/headers-cpp.h b/src/bun.js/bindings/headers-cpp.h index 93b878c94..5f97769c8 100644 --- a/src/bun.js/bindings/headers-cpp.h +++ b/src/bun.js/bindings/headers-cpp.h @@ -1,4 +1,4 @@ -//-- AUTOGENERATED FILE -- 1663900299 +//-- AUTOGENERATED FILE -- 1663581060  // clang-format off  #pragma once diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index b251568f9..49b26d394 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -1,5 +1,5 @@  // clang-format off -//-- AUTOGENERATED FILE -- 1663900299 +//-- AUTOGENERATED FILE -- 1663581060  #pragma once  #include <stddef.h> @@ -822,6 +822,12 @@ ZIG_DECL JSC__JSValue ByteBlob__JSReadableStreamSource__load(JSC__JSGlobalObject  ZIG_DECL JSC__JSValue FileBlobLoader__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0);  #endif + +#ifdef __cplusplus + +ZIG_DECL JSC__JSValue ByteStream__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0); + +#endif  CPP_DECL JSC__JSValue ArrayBufferSink__assignToStream(JSC__JSGlobalObject* arg0, JSC__JSValue JSValue1, void* arg2, void** arg3);  CPP_DECL JSC__JSValue ArrayBufferSink__createObject(JSC__JSGlobalObject* arg0, void* arg1);  CPP_DECL void ArrayBufferSink__detachPtr(JSC__JSValue JSValue0); diff --git a/src/bun.js/bindings/napi.cpp b/src/bun.js/bindings/napi.cpp index 009a11f61..b4bef19f9 100644 --- a/src/bun.js/bindings/napi.cpp +++ b/src/bun.js/bindings/napi.cpp @@ -751,6 +751,17 @@ extern "C" napi_status napi_create_reference(napi_env env, napi_value value,      return napi_ok;  } +extern "C" void napi_set_ref(NapiRef* ref, JSC__JSValue val_) +{ + +    JSC::JSValue val = JSC::JSValue::decode(val_); +    if (val) { +        ref->strongRef.set(ref->globalObject->vm(), val); +    } else { +        ref->strongRef.clear(); +    } +} +  extern "C" napi_status napi_add_finalizer(napi_env env, napi_value js_object,      void* native_object,      napi_finalize finalize_cb, @@ -794,6 +805,11 @@ extern "C" napi_status napi_get_reference_value(napi_env env, napi_ref ref,      return napi_ok;  } +extern "C" JSC__JSValue napi_get_reference_value_internal(NapiRef* napiRef) +{ +    return JSC::JSValue::encode(napiRef->value()); +} +  extern "C" napi_status napi_reference_ref(napi_env env, napi_ref ref,      uint32_t* result)  { diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts index 3cf695f74..84bad8f24 100644 --- a/src/bun.js/webcore/response.classes.ts +++ b/src/bun.js/webcore/response.classes.ts @@ -10,6 +10,7 @@ export default [      proto: {        text: { fn: "getText" },        json: { fn: "getJSON" }, +      body: { getter: "getBody", cache: true },        arrayBuffer: { fn: "getArrayBuffer" },        blob: { fn: "getBlob" },        clone: { fn: "doClone", length: 1 }, @@ -74,6 +75,7 @@ export default [          getter: "getURL",          cache: true,        }, +      body: { getter: "getBody", cache: true },        text: { fn: "getText" },        json: { fn: "getJSON" }, diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index dd37537e3..64dfe3dc7 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -145,6 +145,18 @@ pub const Response = struct {          return JSValue.jsBoolean(this.body.value == .Used);      } +    pub fn getBody( +        this: *Response, +        globalThis: *JSC.JSGlobalObject, +    ) callconv(.C) JSValue { +        if (this.body.value == .Used) { +            globalThis.throw("Body already used", .{}); +            return JSValue.jsUndefined(); +        } + +        return this.body.value.toReadableStream(globalThis); +    } +      pub fn getStatusText(          this: *Response,          globalThis: *JSC.JSGlobalObject, @@ -194,21 +206,21 @@ pub const Response = struct {      }      pub fn cloneInto( -        this: *const Response, +        this: *Response,          new_response: *Response,          allocator: std.mem.Allocator,          globalThis: *JSGlobalObject,      ) void {          new_response.* = Response{              .allocator = allocator, -            .body = this.body.clone(allocator, globalThis), +            .body = this.body.clone(globalThis),              .url = allocator.dupe(u8, this.url) catch unreachable,              .status_text = allocator.dupe(u8, this.status_text) catch unreachable,              .redirected = this.redirected,          };      } -    pub fn clone(this: *const Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response { +    pub fn clone(this: *Response, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) *Response {          var new_response = allocator.create(Response) catch unreachable;          this.cloneInto(new_response, allocator, globalThis);          return new_response; @@ -267,7 +279,7 @@ pub const Response = struct {                  return default.value;              }, -            .Used, .Locked, .Empty, .Error => return default.value, +            .InternalBlob, .Used, .Locked, .Empty, .Error => return default.value,          }      } @@ -552,7 +564,7 @@ pub const Fetch = struct {              const globalThis = this.global_this;              var ref = this.ref; -            const promise_value = ref.get(globalThis); +            const promise_value = ref.get();              defer ref.destroy(globalThis);              if (promise_value.isEmptyOrUndefinedOrNull()) { @@ -596,7 +608,7 @@ pub const Fetch = struct {              var allocator = this.global_this.bunVM().allocator;              const http_response = this.result.response;              var response = allocator.create(Response) catch unreachable; -            const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this); +            const internal_blob = this.response_buffer.list.toManaged(this.response_buffer.allocator);              this.response_buffer = .{ .allocator = default_allocator, .list = .{                  .items = &.{},                  .capacity = 0, @@ -613,7 +625,7 @@ pub const Fetch = struct {                          .status_code = @truncate(u16, http_response.status_code),                      },                      .value = .{ -                        .Blob = blob, +                        .InternalBlob = internal_blob,                      },                  },              }; @@ -937,6 +949,15 @@ pub const Blob = struct {          return this.store == null;      } +    pub fn writeFormatForSize(size: usize, writer: anytype, comptime enable_ansi_colors: bool) !void { +        try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors)); +        try writer.print( +            comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors), +            .{ +                bun.fmt.size(size), +            }, +        ); +    }      pub fn writeFormat(this: *const Blob, formatter: *JSC.Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void {          const Writer = @TypeOf(writer); @@ -947,38 +968,34 @@ pub const Blob = struct {              return;          } -        var store = this.store.?; -        switch (store.data) { -            .file => |file| { -                try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors)); -                switch (file.pathlike) { -                    .path => |path| { -                        try writer.print( -                            comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors), -                            .{ -                                path.slice(), -                            }, -                        ); -                    }, -                    .fd => |fd| { -                        try writer.print( -                            comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors), -                            .{ -                                fd, -                            }, -                        ); -                    }, -                } -            }, -            .bytes => { -                try writer.writeAll(comptime Output.prettyFmt("<r>Blob<r>", enable_ansi_colors)); -                try writer.print( -                    comptime Output.prettyFmt(" (<yellow>{any}<r>)", enable_ansi_colors), -                    .{ -                        bun.fmt.size(this.size), -                    }, -                ); -            }, +        { +            var store = this.store.?; +            switch (store.data) { +                .file => |file| { +                    try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors)); +                    switch (file.pathlike) { +                        .path => |path| { +                            try writer.print( +                                comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors), +                                .{ +                                    path.slice(), +                                }, +                            ); +                        }, +                        .fd => |fd| { +                            try writer.print( +                                comptime Output.prettyFmt(" (<r>fd: <yellow>{d}<r>)<r>", enable_ansi_colors), +                                .{ +                                    fd, +                                }, +                            ); +                        }, +                    } +                }, +                .bytes => { +                    try writeFormatForSize(this.size, writer, enable_ansi_colors); +                }, +            }          }          if (this.content_type.len > 0 or this.offset > 0) { @@ -1071,7 +1088,7 @@ pub const Blob = struct {                      bun.default_allocator.destroy(this);                      promise.reject(globalThis, ZigString.init("Body was used after it was consumed").toErrorInstance(globalThis));                  }, -                .Empty, .Blob => { +                .InternalBlob, .Empty, .Blob => {                      var blob = value.use();                      // TODO: this should be one promise not two!                      const new_promise = writeFileWithSourceDestination(globalThis.ref(), &blob, &file_blob); @@ -1273,7 +1290,7 @@ pub const Blob = struct {          var source_blob: Blob = brk: {              if (data.as(Response)) |response| {                  switch (response.body.value) { -                    .Used, .Empty, .Blob => { +                    .InternalBlob, .Used, .Empty, .Blob => {                          break :brk response.body.use();                      },                      .Error => { @@ -1302,7 +1319,7 @@ pub const Blob = struct {              if (data.as(Request)) |request| {                  switch (request.body) { -                    .Used, .Empty, .Blob => { +                    .InternalBlob, .Used, .Empty, .Blob => {                          break :brk request.body.use();                      },                      .Error => { @@ -3321,11 +3338,6 @@ pub const Blob = struct {          return slice_[0..@minimum(slice_.len, @as(usize, this.size))];      } -    pub fn view(this: *const Blob) []const u8 { -        if (this.size == 0 or this.store == null) return ""; -        return this.store.?.sharedView()[this.offset..][0..this.size]; -    } -      pub const Lifetime = JSC.WebCore.Lifetime;      pub fn setIsASCIIFlag(this: *Blob, is_all_ascii: bool) void {          this.is_all_ascii = is_all_ascii; @@ -3881,10 +3893,10 @@ pub const Body = struct {          return this.value.use();      } -    pub fn clone(this: Body, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) Body { +    pub fn clone(this: *Body, globalThis: *JSGlobalObject) Body {          return Body{              .init = this.init.clone(globalThis), -            .value = this.value.clone(allocator), +            .value = this.value.clone(globalThis),          };      } @@ -3911,6 +3923,16 @@ pub const Body = struct {              try formatter.printComma(Writer, writer, enable_ansi_colors);              try writer.writeAll("\n");              try this.value.Blob.writeFormat(formatter, writer, enable_ansi_colors); +        } else if (this.value == .InternalBlob) { +            try formatter.printComma(Writer, writer, enable_ansi_colors); +            try writer.writeAll("\n"); +            try Blob.writeFormatForSize(this.value.InternalBlob.items.len, writer, enable_ansi_colors); +        } else if (this.value == .Locked) { +            if (this.value.Locked.readable) |stream| { +                try formatter.printComma(Writer, writer, enable_ansi_colors); +                try writer.writeAll("\n"); +                formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors); +            }          }      } @@ -3982,6 +4004,8 @@ pub const Body = struct {          /// conditionally runs when requesting data          /// used in HTTP server to ignore request bodies unless asked for it          onPull: ?fn (ctx: *anyopaque) void = null, +        onDrain: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null, +          deinit: bool = false,          action: Action = Action.none, @@ -4041,6 +4065,9 @@ pub const Body = struct {      pub const Value = union(Tag) {          Blob: Blob, +        /// Single-use Blob +        /// Avoids a heap allocation. +        InternalBlob: std.ArrayList(u8),          Locked: PendingValue,          Used: void,          Empty: void, @@ -4048,6 +4075,7 @@ pub const Body = struct {          pub const Tag = enum {              Blob, +            InternalBlob,              Locked,              Used,              Empty, @@ -4056,6 +4084,87 @@ pub const Body = struct {          pub const empty = Value{ .Empty = .{} }; + +        pub fn toReadableStream(this: *Value, globalThis: *JSGlobalObject) JSValue { +            JSC.markBinding(); + +            switch (this.*) { +                .Used, .Empty => { +                    return JSC.WebCore.ReadableStream.empty(globalThis); +                }, +                .InternalBlob => |*bytes| { +                    var blob = Blob.init(bytes.toOwnedSlice(), bytes.allocator, globalThis); +                    defer blob.detach(); +                    var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size); +                    this.* = .{ +                        .Locked = .{ +                            .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?, +                            .global = globalThis, +                        }, +                    }; +                    return readable; +                }, +                .Blob => { +                    var blob = this.Blob; +                    defer blob.detach(); +                    var readable = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size); +                    readable.protect(); +                    this.* = .{ +                        .Locked = .{ +                            .readable = JSC.WebCore.ReadableStream.fromJS(readable, globalThis).?, +                            .global = globalThis, +                        }, +                    }; +                    return readable; +                }, +                .Locked => { +                    var locked = &this.Locked; +                    if (locked.readable) |readable| { +                        return readable.value; +                    } +                    var drain_result: JSC.WebCore.DrainResult = .{ +                        .estimated_size = 0, +                    }; + +                    if (locked.onDrain) |drain| { +                        locked.onDrain = null; +                        drain_result = drain(locked.task.?); +                    } + +                    if (drain_result == .empty or drain_result == .aborted) { +                        this.* = .{ .Empty = void{} }; +                        return JSC.WebCore.ReadableStream.empty(globalThis); +                    } + +                    var reader = bun.default_allocator.create(JSC.WebCore.ByteStream.Source) catch unreachable; +                    reader.* = .{ +                        .context = undefined, +                        .globalThis = globalThis, +                    }; + +                    reader.context.setup(); + +                    if (drain_result == .estimated_size) { +                        reader.context.highWaterMark = @truncate(Blob.SizeType, drain_result.estimated_size); +                        reader.context.size_hint = @truncate(Blob.SizeType, drain_result.estimated_size); +                    } else if (drain_result == .owned) { +                        reader.context.buffer = drain_result.owned.list; +                        reader.context.size_hint = @truncate(Blob.SizeType, drain_result.owned.size_hint); +                    } + +                    locked.readable = .{ +                        .ptr = .{ .Bytes = &reader.context }, +                        .value = reader.toJS(globalThis), +                    }; + +                    locked.readable.?.value.protect(); +                    return locked.readable.?.value; +                }, + +                else => unreachable, +            } +        } +          pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) ?Value {              if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |readable| {                  switch (readable.ptr) { @@ -4121,13 +4230,29 @@ pub const Body = struct {                  if (locked.promise) |promise| {                      locked.promise = null; -                    var blob = new.use();                      switch (locked.action) {                          .getText => { -                            promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref())); +                            if (new.* == .InternalBlob) { +                                var bytes = new.InternalBlob; +                                new.* = .{ .Empty = void{} }; +                                var str = ZigString.init(bytes.items).withEncoding(); +                                str.mark(); +                                if (str.is16Bit()) { +                                    const out = str.toValueGC(global); +                                    bytes.deinit(); +                                    promise.asPromise().?.resolve(global, out); +                                } else { +                                    const out = str.toExternalValue(global); +                                    promise.asPromise().?.resolve(global, out); +                                } +                            } else { +                                var blob = new.use(); +                                promise.asPromise().?.resolve(global, blob.getTextTransfer(global.ref())); +                            }                          },                          .getJSON => { +                            var blob = new.use();                              const json_value = blob.toJSON(global, .share);                              blob.detach(); @@ -4138,17 +4263,25 @@ pub const Body = struct {                              }                          },                          .getArrayBuffer => { -                            promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global)); +                            if (new.* == .InternalBlob) { +                                const marked = JSC.MarkedArrayBuffer.fromBytes(new.InternalBlob.items, new.InternalBlob.allocator, .ArrayBuffer); +                                new.* = .{ .Empty = void{} }; +                                var object_ref = marked.toJS(global, null); +                                promise.asPromise().?.resolve(global, JSC.JSValue.c(object_ref)); +                            } else { +                                var blob = new.use(); +                                promise.asPromise().?.resolve(global, blob.getArrayBufferTransfer(global)); +                            }                          },                          .getBlob => {                              var ptr = bun.default_allocator.create(Blob) catch unreachable; -                            ptr.* = blob; +                            ptr.* = new.use();                              ptr.allocator = bun.default_allocator;                              promise.asPromise().?.resolve(global, ptr.toJS(global));                          },                          else => {                              var ptr = bun.default_allocator.create(Blob) catch unreachable; -                            ptr.* = blob; +                            ptr.* = new.use();                              ptr.allocator = bun.default_allocator;                              promise.asInternalPromise().?.resolve(global, ptr.toJS(global));                          }, @@ -4160,6 +4293,7 @@ pub const Body = struct {          pub fn slice(this: Value) []const u8 {              return switch (this) {                  .Blob => this.Blob.sharedView(), +                .InternalBlob => |list| list.items,                  else => "",              };          } @@ -4172,6 +4306,15 @@ pub const Body = struct {                      this.* = .{ .Used = .{} };                      return new_blob;                  }, +                .InternalBlob => { +                    var new_blob = Blob.init( +                        this.InternalBlob.toOwnedSlice(), +                        this.InternalBlob.allocator, +                        JSC.VirtualMachine.vm.global, +                    ); +                    this.* = .{ .Used = .{} }; +                    return new_blob; +                },                  else => {                      return Blob.initEmpty(undefined);                  }, @@ -4228,14 +4371,22 @@ pub const Body = struct {          pub fn deinit(this: *Value) void {              const tag = @as(Tag, this.*);              if (tag == .Locked) { -                if (this.Locked.readable) |*readable| { -                    readable.done(); +                if (!this.Locked.deinit) { +                    this.Locked.deinit = true; + +                    if (this.Locked.readable) |*readable| { +                        readable.done(); +                    }                  } -                this.Locked.deinit = true;                  return;              } +            if (tag == .InternalBlob) { +                this.InternalBlob.clearAndFree(); +                this.* = Value.empty; +            } +              if (tag == .Blob) {                  this.Blob.deinit();                  this.* = Value.empty; @@ -4246,8 +4397,18 @@ pub const Body = struct {              }          } -        pub fn clone(this: Value, _: std.mem.Allocator) Value { -            if (this == .Blob) { +        pub fn clone(this: *Value, globalThis: *JSC.JSGlobalObject) Value { +            if (this.* == .InternalBlob) { +                this.* = .{ +                    .Blob = Blob.init( +                        this.InternalBlob.toOwnedSlice(), +                        this.InternalBlob.allocator, +                        globalThis, +                    ), +                }; +            } + +            if (this.* == .Blob) {                  return Value{ .Blob = this.Blob.dupe() };              } @@ -4319,10 +4480,16 @@ pub const Body = struct {              } else |_| {}          } +        if (value.isUndefined()) { +            body.value = Value.empty; +            return body; +        } +          body.value = Value.fromJS(globalThis, value) orelse return null;          if (body.value == .Blob)              std.debug.assert(body.value.Blob.allocator == null); // owned by Body +          return body;      }  }; @@ -4358,7 +4525,18 @@ pub const Request = struct {              try writer.writeAll("\"");              if (this.body == .Blob) {                  try writer.writeAll("\n"); +                try formatter.writeIndent(Writer, writer);                  try this.body.Blob.writeFormat(formatter, writer, enable_ansi_colors); +            } else if (this.body == .InternalBlob) { +                try writer.writeAll("\n"); +                try formatter.writeIndent(Writer, writer); +                try Blob.writeFormatForSize(this.body.InternalBlob.items.len, writer, enable_ansi_colors); +            } else if (this.body == .Locked) { +                if (this.body.Locked.readable) |stream| { +                    try writer.writeAll("\n"); +                    try formatter.writeIndent(Writer, writer); +                    formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors); +                }              }          }          try writer.writeAll("\n"); @@ -4392,7 +4570,7 @@ pub const Request = struct {                  return MimeType.other.value;              }, -            .Error, .Used, .Locked, .Empty => return MimeType.other.value, +            .InternalBlob, .Error, .Used, .Locked, .Empty => return MimeType.other.value,          }      } @@ -4415,6 +4593,18 @@ pub const Request = struct {          return ZigString.init("").toValueGC(globalThis);      } +    pub fn getBody( +        this: *Request, +        globalThis: *JSC.JSGlobalObject, +    ) callconv(.C) JSValue { +        if (this.body == .Used) { +            globalThis.throw("Body already used", .{}); +            return JSValue.jsUndefined(); +        } + +        return this.body.toReadableStream(globalThis); +    } +      pub fn getIntegrity(          _: *Request,          globalThis: *JSC.JSGlobalObject, @@ -4521,17 +4711,9 @@ pub const Request = struct {                      }                      if (urlOrObject.fastGet(globalThis, .body)) |body_| { -                        if (Blob.get(globalThis, body_, true, false)) |blob| { -                            if (blob.size > 0) { -                                request.body = Body.Value{ .Blob = blob }; -                            } -                        } else |err| { -                            if (err == error.InvalidArguments) { -                                globalThis.throwInvalidArguments("Expected an Array", .{}); -                                return null; -                            } - -                            globalThis.throwInvalidArguments("Invalid Body object", .{}); +                        if (Body.Value.fromJS(globalThis, body_)) |body| { +                            request.body = body.value; +                        } else {                              return null;                          }                      } @@ -4546,17 +4728,9 @@ pub const Request = struct {                  }                  if (arguments[1].fastGet(globalThis, .body)) |body_| { -                    if (Blob.get(globalThis, body_, true, false)) |blob| { -                        if (blob.size > 0) { -                            request.body = Body.Value{ .Blob = blob }; -                        } -                    } else |err| { -                        if (err == error.InvalidArguments) { -                            globalThis.throwInvalidArguments("Expected an Array", .{}); -                            return null; -                        } - -                        globalThis.throwInvalidArguments("Invalid Body object", .{}); +                    if (Body.Value.fromJS(globalThis, body_)) |body| { +                        request.body = body.value; +                    } else {                          return null;                      }                  } @@ -4614,7 +4788,7 @@ pub const Request = struct {          globalThis: *JSGlobalObject,      ) void {          req.* = Request{ -            .body = this.body.clone(allocator), +            .body = this.body.clone(globalThis),              .url = ZigString.init(allocator.dupe(u8, this.url.slice()) catch unreachable),              .method = this.method,          }; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 23aad70ec..681ff6172 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -70,7 +70,7 @@ pub const ReadableStream = struct {      pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {          JSC.markBinding();          this.value.unprotect(); -        ReadableStream__abort(this.value, globalThis); +        ReadableStream__cancel(this.value, globalThis);      }      pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void { @@ -98,6 +98,8 @@ pub const ReadableStream = struct {          /// This is a direct readable stream          /// That means we can turn it into whatever we want          Direct = 3, + +        Bytes = 4,      };      pub const Source = union(Tag) {          Invalid: void, @@ -116,6 +118,8 @@ pub const ReadableStream = struct {          /// This is a direct readable stream          /// That means we can turn it into whatever we want          Direct: void, + +        Bytes: *ByteStream,      };      extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag; @@ -165,6 +169,13 @@ pub const ReadableStream = struct {                  },              }, +            .Bytes => ReadableStream{ +                .value = value, +                .ptr = .{ +                    .Bytes = ptr.asPtr(ByteStream), +                }, +            }, +              // .HTTPRequest => ReadableStream{              //     .value = value,              //     .ptr = .{ @@ -184,6 +195,7 @@ pub const ReadableStream = struct {      extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue;      pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue { +        JSC.markBinding();          return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id)));      }      pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue { @@ -402,6 +414,16 @@ pub const StreamStart = union(Tag) {      }  }; +pub const DrainResult = union(enum) { +    owned: struct { +        list: std.ArrayList(u8), +        size_hint: usize, +    }, +    estimated_size: usize, +    empty: void, +    aborted: void, +}; +  pub const StreamResult = union(Tag) {      owned: bun.ByteList,      owned_and_done: bun.ByteList, @@ -2660,65 +2682,301 @@ pub const ByteBlobLoader = struct {      pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit);  }; -pub fn RequestBodyStreamer( -    comptime is_ssl: bool, -) type { -    return struct { -        response: *uws.NewApp(is_ssl).Response, +pub const ByteStream = struct { +    buffer: std.ArrayList(u8) = .{ +        .allocator = bun.default_allocator, +        .items = &.{}, +        .capacity = 0, +    }, +    has_received_last_chunk: bool = false, +    pending: StreamResult.Pending = StreamResult.Pending{ +        .frame = undefined, +        .used = false, +        .result = .{ .done = {} }, +    }, +    done: bool = false, +    pending_buffer: []u8 = &.{}, +    pending_value: ?*JSC.napi.Ref = null, +    offset: usize = 0, +    highWaterMark: Blob.SizeType = 0, +    pipe: Pipe = .{}, +    size_hint: Blob.SizeType = 0, + +    pub const Pipe = struct { +        ctx: ?*anyopaque = null, +        onPipe: ?PipeFunction = null, + +        pub fn New(comptime Type: type, comptime Function: anytype) type { +            return struct { +                pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void { +                    Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator); +                } -        pub const tag = if (is_ssl) -            ReadableStream.Tag.HTTPRequest -        else if (is_ssl) -            ReadableStream.Tag.HTTPSRequest; +                pub fn init(self: *Type) Pipe { +                    return Pipe{ +                        .ctx = self, +                        .onPipe = pipe, +                    }; +                } +            }; +        } +    }; -        pub fn onStart(this: *ByteBlobLoader) StreamStart { -            return .{ .chunk_size = this.chunk_size }; +    pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; + +    pub const tag = ReadableStream.Tag.Bytes; + +    pub fn setup(this: *ByteStream) void { +        this.* = .{}; +    } + +    pub fn onStart(this: *@This()) StreamStart { +        if (this.has_received_last_chunk and this.buffer.items.len == 0) { +            return .{ .empty = void{} };          } -        pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult { -            array.ensureStillAlive(); -            defer array.ensureStillAlive(); -            if (this.done) { -                return .{ .done = {} }; +        if (this.has_received_last_chunk) { +            return .{ .chunk_size = @truncate(Blob.SizeType, @minimum(1024 * 1024 * 2, this.buffer.items.len)) }; +        } + +        if (this.highWaterMark == 0) { +            return .{ .ready = void{} }; +        } + +        return .{ .chunk_size = @maximum(this.highWaterMark, std.mem.page_size) }; +    } + +    pub fn value(this: *@This()) JSValue { +        if (this.pending_value == null) +            return .zero; + +        const result = this.pending_value.?.get(); +        this.pending_value.?.set(.zero); + +        return result; +    } + +    pub fn isCancelled(this: *const @This()) bool { +        return @fieldParentPtr(Source, "context", this).cancelled; +    } + +    pub fn unpipe(this: *@This()) void { +        this.pipe.ctx = null; +        this.pipe.onPipe = null; +        if (!this.parent().deinited) { +            this.parent().deinited = true; +            bun.default_allocator.destroy(this.parent()); +        } +    } + +    pub fn onData( +        this: *@This(), +        stream: StreamResult, +        allocator: std.mem.Allocator, +    ) void { +        JSC.markBinding(); +        if (this.done) { +            if (stream.isDone() and (stream == .owned or stream == .owned_and_done)) { +                if (stream == .owned) allocator.free(stream.owned.slice()); +                if (stream == .owned_and_done) allocator.free(stream.owned_and_done.slice());              } -            var temporary = this.store.sharedView(); -            temporary = temporary[this.offset..]; +            return; +        } + +        std.debug.assert(!this.has_received_last_chunk); +        this.has_received_last_chunk = stream.isDone(); + +        if (this.pipe.ctx != null) { +            this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator); +            return; +        } + +        const chunk = stream.slice(); + +        if (!this.pending.used) { +            std.debug.assert(this.buffer.items.len == 0); +            var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)]; +            const pending_buffer_len = this.pending_buffer.len; +            @memcpy(to_copy.ptr, chunk.ptr, to_copy.len); +            this.pending_buffer = &.{}; + +            const is_really_done = this.has_received_last_chunk and to_copy.len <= pending_buffer_len; -            temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))]; -            if (temporary.len == 0) { -                this.store.deref(); +            if (is_really_done) {                  this.done = true; -                return .{ .done = {} }; +                this.pending.result = .{ +                    .into_array_and_done = .{ +                        .value = this.value(), +                        .len = @truncate(Blob.SizeType, to_copy.len), +                    }, +                }; +            } else { +                this.pending.result = .{ +                    .into_array = .{ +                        .value = this.value(), +                        .len = @truncate(Blob.SizeType, to_copy.len), +                    }, +                };              } -            const copied = @intCast(Blob.SizeType, temporary.len); +            const remaining = chunk[to_copy.len..]; +            if (remaining.len > 0) +                this.append(stream, to_copy.len, allocator) catch @panic("Out of memory while copying request body"); +            resume this.pending.frame; +            return; +        } -            this.remain -|= copied; -            this.offset +|= copied; -            @memcpy(buffer.ptr, temporary.ptr, temporary.len); -            if (this.remain == 0) { -                return .{ .into_array_and_done = .{ .value = array, .len = copied } }; +        this.append(stream, 0, allocator) catch @panic("Out of memory while copying request body"); +    } + +    pub fn append( +        this: *@This(), +        stream: StreamResult, +        offset: usize, +        allocator: std.mem.Allocator, +    ) !void { +        const chunk = stream.slice()[offset..]; + +        if (this.buffer.capacity == 0) { +            switch (stream) { +                .owned => |owned| { +                    this.buffer = owned.listManaged(allocator); +                    this.offset += offset; +                }, +                .owned_and_done => |owned| { +                    this.buffer = owned.listManaged(allocator); +                    this.offset += offset; +                }, +                .temporary_and_done, .temporary => { +                    this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len); +                    this.buffer.appendSliceAssumeCapacity(chunk); +                }, +                else => unreachable,              } +            return; +        } + +        switch (stream) { +            .temporary_and_done, .temporary => { +                try this.buffer.appendSlice(chunk); +            }, +            // We don't support the rest of these yet +            else => unreachable, +        } +    } -            return .{ .into_array = .{ .value = array, .len = copied } }; +    pub fn setValue(this: *@This(), view: JSC.JSValue) void { +        JSC.markBinding(); +        if (this.pending_value) |pending| { +            pending.set(view); +        } else { +            this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view);          } +    } -        pub fn onCancel(_: *ByteBlobLoader) void {} +    pub fn parent(this: *@This()) *Source { +        return @fieldParentPtr(Source, "context", this); +    } -        pub fn deinit(this: *ByteBlobLoader) void { -            if (!this.done) { +    pub fn onPull(this: *@This(), buffer: []u8, view: JSC.JSValue) StreamResult { +        JSC.markBinding(); +        std.debug.assert(buffer.len > 0); + +        if (this.buffer.items.len > 0) { +            std.debug.assert(this.value() == .zero); +            const to_write = @minimum( +                this.buffer.items.len - this.offset, +                buffer.len, +            ); +            var remaining_in_buffer = this.buffer.items[this.offset..][0..to_write]; + +            @memcpy(buffer.ptr, this.buffer.items.ptr + this.offset, to_write); + +            if (this.offset + to_write == this.buffer.items.len) { +                this.offset = 0; +                this.buffer.items.len = 0; +            } else { +                this.offset += to_write; +            } + +            if (this.has_received_last_chunk and remaining_in_buffer.len == 0) { +                this.buffer.clearAndFree();                  this.done = true; -                this.store.deref(); + +                return .{ +                    .into_array_and_done = .{ +                        .value = view, +                        .len = @truncate(Blob.SizeType, to_write), +                    }, +                }; +            } + +            return .{ +                .into_array = .{ +                    .value = view, +                    .len = @truncate(Blob.SizeType, to_write), +                }, +            }; +        } + +        if (this.has_received_last_chunk) { +            return .{ +                .done = void{}, +            }; +        } + +        this.pending_buffer = buffer; +        this.setValue(view); + +        return .{ +            .pending = &this.pending, +        }; +    } + +    pub fn onCancel(this: *@This()) void { +        JSC.markBinding(); +        const view = this.value(); +        if (this.buffer.capacity > 0) this.buffer.clearAndFree(); +        this.done = true; +        if (this.pending_value) |ref| { +            this.pending_value = null; +            ref.destroy(undefined); +        } + +        if (view != .zero) { +            this.pending_buffer = &.{}; +            this.pending.result = .{ .done = {} }; +            if (!this.pending.used) { +                resume this.pending.frame;              } +        } +    } + +    pub fn deinit(this: *@This()) void { +        JSC.markBinding(); +        if (this.buffer.capacity > 0) this.buffer.clearAndFree(); -            bun.default_allocator.destroy(this); +        if (this.pending_value) |ref| { +            this.pending_value = null; +            ref.destroy(undefined);          } +        if (!this.done) { +            this.done = true; -        pub const label = if (is_ssl) "HTTPSRequestBodyStreamer" else "HTTPRequestBodyStreamer"; -        pub const Source = ReadableStreamSource(@This(), label, onStart, onPull, onCancel, deinit); -    }; -} +            this.pending_buffer = &.{}; +            this.pending.result = .{ .done = {} }; + +            if (!this.pending.used) { +                resume this.pending.frame; +            } +        } + +        bun.default_allocator.destroy(this.parent()); +    } + +    pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit); +};  pub const FileBlobLoader = struct {      buf: []u8 = &[_]u8{}, diff --git a/src/napi/napi.zig b/src/napi/napi.zig index 54e16d153..6b191167f 100644 --- a/src/napi/napi.zig +++ b/src/napi/napi.zig @@ -12,6 +12,7 @@ const Channel = @import("../sync.zig").Channel;  pub const napi_env = *JSC.JSGlobalObject;  pub const Ref = opaque {      pub fn create(globalThis: *JSC.JSGlobalObject, value: JSValue) *Ref { +        JSC.markBinding();          var ref: *Ref = undefined;          std.debug.assert(              napi_create_reference( @@ -22,20 +23,27 @@ pub const Ref = opaque {              ) == .ok,          );          if (comptime bun.Environment.isDebug) { -            std.debug.assert(ref.get(globalThis) == value); +            std.debug.assert(ref.get() == value);          }          return ref;      } -    pub fn get(ref: *Ref, globalThis: *JSC.JSGlobalObject) JSValue { -        var value: JSValue = JSValue.zero; -        std.debug.assert(napi_get_reference_value(globalThis, ref, &value) == .ok); -        return value; +    pub fn get(ref: *Ref) JSValue { +        JSC.markBinding(); +        return napi_get_reference_value_internal(ref);      }      pub fn destroy(ref: *Ref, globalThis: *JSC.JSGlobalObject) void { +        JSC.markBinding();          std.debug.assert(napi_delete_reference(globalThis, ref) == .ok);      } + +    pub fn set(this: *Ref, value: JSC.JSValue) void { +        JSC.markBinding(); +        napi_set_ref(this, value); +    } + +    extern fn napi_set_ref(ref: *Ref, value: JSC.JSValue) void;  };  pub const napi_handle_scope = napi_env;  pub const napi_escapable_handle_scope = struct_napi_escapable_handle_scope__; @@ -664,6 +672,7 @@ pub extern fn napi_delete_reference(env: napi_env, ref: *Ref) napi_status;  pub extern fn napi_reference_ref(env: napi_env, ref: *Ref, result: [*c]u32) napi_status;  pub extern fn napi_reference_unref(env: napi_env, ref: *Ref, result: [*c]u32) napi_status;  pub extern fn napi_get_reference_value(env: napi_env, ref: *Ref, result: *napi_value) napi_status; +pub extern fn napi_get_reference_value_internal(ref: *Ref) JSC.JSValue;  // JSC scans the stack  // we don't need this | 
