diff options
Diffstat (limited to 'src/bun.js/api/server.zig')
-rw-r--r-- | src/bun.js/api/server.zig | 293 |
1 files changed, 269 insertions, 24 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index d7cfbe4c1..939e010f3 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -538,7 +538,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp sendfile: SendfileContext = undefined, request_js_object: JSC.C.JSObjectRef = null, request_body_buf: std.ArrayListUnmanaged(u8) = .{}, + request_body_content_len: usize = 0, sink: ?*ResponseStream.JSSink = null, + byte_stream: ?*JSC.WebCore.ByteStream = null, has_written_status: bool = false, @@ -696,20 +698,67 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } this.response_buf_owned = std.ArrayListUnmanaged(u8){ .items = bb.items, .capacity = bb.capacity }; - this.renderResponseBuffer(); + this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); } pub fn renderResponseBuffer(this: *RequestContext) void { this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this); } - pub fn onWritableResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool { + /// Render a complete response buffer + pub fn renderResponseBufferAndMetadata(this: *RequestContext) void { + this.renderMetadata(); + + if (!this.resp.tryEnd( + this.response_buf_owned.items, + this.response_buf_owned.items.len, + )) { + this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); + this.setAbortHandler(); + return; + } + + this.finalize(); + } + + /// Drain a partial response buffer + pub fn drainResponseBufferAndMetadata(this: *RequestContext) void { + this.renderMetadata(); + this.setAbortHandler(); + + _ = this.resp.write( + this.response_buf_owned.items, + ); + + this.response_buf_owned.items.len = 0; + } + + pub fn renderResponseBufferAndMetadataCorked(this: *RequestContext) void { + this.resp.runCorkedWithType(*RequestContext, renderResponseBufferAndMetadata, this); + } + + pub fn drainResponseBufferAndMetadataCorked(this: *RequestContext) void { + this.resp.runCorkedWithType(*RequestContext, drainResponseBufferAndMetadata, this); + } + + pub fn onWritableResponseBuffer(this: *RequestContext, _: c_ulong, resp: *App.Response) callconv(.C) bool { + std.debug.assert(this.resp == resp); + if (this.aborted) { + this.finalizeForAbort(); + return false; + } + resp.end("", false); + this.finalize(); + return false; + } + + pub fn onWritableCompleteResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool { std.debug.assert(this.resp == resp); if (this.aborted) { this.finalizeForAbort(); return false; } - return this.sendWritableBytes(this.response_buf_owned.items, write_offset, resp); + return this.sendWritableBytesForCompleteResponseBuffer(this.response_buf_owned.items, write_offset, resp); } pub fn create(this: *RequestContext, server: *ThisServer, req: *uws.Request, resp: *App.Response) void { @@ -772,11 +821,24 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (req.body == .Locked and (req.body.Locked.action != .none or req.body.Locked.promise != null)) { this.pending_promises_for_abort += 1; req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); + } else if (req.body == .Locked and (req.body.Locked.readable != null)) { + req.body.Locked.readable.?.abort(this.server.globalThis); + req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); + req.body.Locked.readable = null; } req.uws_request = null; } } + if (this.response_ptr) |response| { + if (response.body.value == .Locked) { + if (response.body.value.Locked.readable) |*readable| { + response.body.value.Locked.readable = null; + readable.abort(this.server.globalThis); + } + } + } + // then, we reject the response promise if (this.promise) |promise| { this.pending_promises_for_abort += 1; @@ -841,6 +903,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } JSC.C.JSValueUnprotect(this.server.globalThis.ref(), promise.asObjectRef()); } + + if (this.byte_stream) |stream| { + this.byte_stream = null; + stream.unpipe(); + } } pub fn finalize(this: *RequestContext) void { this.finalizeWithoutDeinit(); @@ -974,10 +1041,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } var bytes = this.blob.sharedView(); - return this.sendWritableBytes(bytes, write_offset, resp); + _ = this.sendWritableBytesForBlob(bytes, write_offset, resp); + return true; } - pub fn sendWritableBytes(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool { + pub fn sendWritableBytesForBlob(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool { std.debug.assert(this.resp == resp); var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..]; @@ -990,6 +1058,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } + pub fn sendWritableBytesForCompleteResponseBuffer(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool { + std.debug.assert(this.resp == resp); + + var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..]; + if (resp.tryEnd(bytes, bytes_.len)) { + this.response_buf_owned.items.len = 0; + this.finalize(); + } else { + this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); + } + + return true; + } + pub fn onWritableSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool { return this.onSendfile(); } @@ -1125,7 +1207,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } else { this.blob.size = @truncate(Blob.SizeType, result.result.buf.len); this.response_buf_owned = .{ .items = result.result.buf, .capacity = result.result.buf.len }; - this.renderResponseBuffer(); + this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); } } @@ -1481,6 +1563,41 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.resp.runCorkedWithType(*StreamPair, doRenderStream, &pair); return; }, + + .Bytes => |byte_stream| { + std.debug.assert(byte_stream.pipe.ctx == null); + std.debug.assert(this.byte_stream == null); + + stream.detach(this.server.globalThis); + + this.response_buf_owned = byte_stream.buffer.moveToUnmanaged(); + + // If we've received the complete body by the time this function is called + // we can avoid streaming it and just send it all at once. + if (byte_stream.has_received_last_chunk) { + this.blob.size = @truncate(Blob.SizeType, this.response_buf_owned.items.len); + byte_stream.parent().deinit(); + this.renderResponseBufferAndMetadataCorked(); + return; + } + + byte_stream.pipe = JSC.WebCore.ByteStream.Pipe.New(@This(), onPipe).init(this); + this.byte_stream = byte_stream; + + // we don't set size here because even if we have a hint + // uWebSockets won't let us partially write streaming content + this.blob.size = 0; + + // if we've received metadata and part of the body, send everything we can and drain + if (this.response_buf_owned.items.len > 0) { + this.drainResponseBufferAndMetadataCorked(); + } else { + // if we only have metadata to send, send it now + this.resp.runCorkedWithType(*RequestContext, renderMetadata, this); + } + this.setAbortHandler(); + return; + }, } } @@ -1496,6 +1613,42 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.doRenderBlob(); } + pub fn onPipe(this: *RequestContext, stream: JSC.WebCore.StreamResult, allocator: std.mem.Allocator) void { + var stream_needs_deinit = stream == .owned or stream == .owned_and_done; + + defer { + if (stream_needs_deinit) { + if (stream.isDone()) { + stream.owned_and_done.listManaged(allocator).deinit(); + } else { + stream.owned.listManaged(allocator).deinit(); + } + } + } + + if (this.aborted) { + this.finalizeForAbort(); + return; + } + + const chunk = stream.slice(); + // on failure, it will continue to allocate + // we can't do buffering ourselves here or it won't work + // uSockets will append and manage the buffer + // so any write will buffer if the write fails + if (this.resp.write(chunk)) { + if (stream.isDone()) { + this.resp.endStream(false); + this.finalize(); + } + } else { + // when it's the last one, we just want to know if it's done + if (stream.isDone()) { + this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this); + } + } + } + pub fn doRenderBlob(this: *RequestContext) void { // We are not corked // The body is small @@ -1712,34 +1865,123 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.aborted) return; - this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body"); + const request = JSC.JSValue.fromRef(this.request_js_object); + var req = request.as(Request) orelse { + this.request_body_buf.clearAndFree(this.allocator); + return; + }; + + if (req.body == .Locked) { + if (req.body.Locked.readable) |readable| { + if (readable.ptr == .Bytes) { + std.debug.assert(this.request_body_buf.items.len == 0); + + if (!last) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.init(chunk), + }, + bun.default_allocator, + ); + } else { + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.init(chunk), + }, + bun.default_allocator, + ); + } + + return; + } + } + } + if (last) { - const request = JSC.JSValue.fromRef(this.request_js_object); - if (request.as(Request)) |req| { - request.ensureStillAlive(); - var bytes = this.request_body_buf.toOwnedSlice(this.allocator); - var old = req.body; - req.body = .{ - .Blob = if (bytes.len > 0) - Blob.init(bytes, this.allocator, this.server.globalThis) - else - Blob.initEmpty(this.server.globalThis), - }; - if (old == .Locked) - old.resolve(&req.body, this.server.globalThis); - request.unprotect(); + request.ensureStillAlive(); + var bytes = this.request_body_buf; + var old = req.body; + if (bytes.items.len == 0) { + req.body = .{ .Empty = {} }; } else { - this.request_body_buf.clearAndFree(this.allocator); + req.body = .{ .InternalBlob = bytes.toManaged(this.allocator) }; } + + if (old == .Locked) + old.resolve(&req.body, this.server.globalThis); + request.unprotect(); } + + if (this.request_body_buf.capacity == 0) { + this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, @minimum(this.request_body_content_len, max_request_body_preallocate_length)) catch @panic("Out of memory while allocating request body buffer"); + } + + this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body"); } + pub fn onDrainRequestBody(this: *RequestContext) JSC.WebCore.DrainResult { + if (this.aborted) { + return JSC.WebCore.DrainResult{ + .aborted = void{}, + }; + } + + std.debug.assert(!this.resp.hasResponded()); + + // This means we have received part of the body but not the whole thing + if (this.request_body_buf.items.len > 0) { + var emptied = this.request_body_buf; + this.request_body_buf = .{}; + return .{ + .owned = .{ + .list = emptied.toManaged(this.allocator), + .size_hint = if (emptied.capacity < max_request_body_preallocate_length) + emptied.capacity + else + 0, + }, + }; + } + + const content_length = this.req.header("content-length") orelse { + return .{ + .empty = void{}, + }; + }; + + const len = std.fmt.parseInt(usize, content_length, 10) catch 0; + this.request_body_content_len = len; + + if (len == 0) { + return JSC.WebCore.DrainResult{ + .empty = void{}, + }; + } + + if (len > this.server.config.max_request_body_size) { + this.resp.writeStatus("413 Request Entity Too Large"); + this.resp.endWithoutBody(); + + this.finalize(); + return JSC.WebCore.DrainResult{ + .aborted = void{}, + }; + } + + this.resp.onData(*RequestContext, onBufferedBodyChunk, this); + + return .{ + .estimated_size = len, + }; + } + const max_request_body_preallocate_length = 1024 * 256; pub fn onPull(this: *RequestContext) void { const request = JSC.JSValue.c(this.request_js_object); request.ensureStillAlive(); if (this.req.header("content-length")) |content_length| { const len = std.fmt.parseInt(usize, content_length, 10) catch 0; + this.request_body_content_len = len; if (len == 0) { if (request.as(Request)) |req| { var old = req.body; @@ -1766,8 +2008,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.finalize(); return; } - - this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, len) catch @panic("Out of memory while allocating request body buffer"); } else if (this.req.header("transfer-encoding") == null) { // no content-length // no transfer-encoding @@ -1789,6 +2029,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp onPull(bun.cast(*RequestContext, this)); } + pub fn onDrainRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult { + return onDrainRequestBody(bun.cast(*RequestContext, this)); + } + pub const Export = shim.exportFunctions(.{ .onResolve = onResolve, .onReject = onReject, @@ -2243,6 +2487,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { .task = ctx, .global = this.globalThis, .onPull = RequestContext.onPullCallback, + .onDrain = RequestContext.onDrainRequestBodyCallback, }, }, }; |