diff options
Diffstat (limited to 'src/bun.js/api/server.zig')
-rw-r--r-- | src/bun.js/api/server.zig | 275 |
1 files changed, 254 insertions, 21 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index cb3c4387a..35abc0a7f 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -503,7 +503,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const App = uws.NewApp(ssl_enabled); pub threadlocal var pool: ?*RequestContext.RequestContextStackAllocator = null; pub threadlocal var pool_allocator: std.mem.Allocator = undefined; - + pub const ResponseStream = JSC.WebCore.HTTPServerWritable(ssl_enabled); pub const RequestContextStackAllocator = NewRequestContextStackAllocator(RequestContext, 2048); pub const name = "HTTPRequestContext" ++ (if (debug_mode) "Debug" else "") ++ (if (ThisServer.ssl_enabled) "TLS" else ""); pub const shim = JSC.Shimmer("Bun", name, @This()); @@ -537,6 +537,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp request_js_object: JSC.C.JSObjectRef = null, request_body_buf: std.ArrayListUnmanaged(u8) = .{}, + has_written_status: bool = false, + /// Used either for temporary blob data or fallback /// When the response body is a temporary value response_buf_owned: std.ArrayListUnmanaged(u8) = .{}, @@ -628,11 +630,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn renderMissing(ctx: *RequestContext) void { if (comptime !debug_mode) { - ctx.resp.writeStatus("204 No Content"); + if (!ctx.has_written_status) + ctx.resp.writeStatus("204 No Content"); + ctx.has_written_status = true; ctx.resp.endWithoutBody(); ctx.finalize(); } else { - ctx.resp.writeStatus("200 OK"); + if (!ctx.has_written_status) + ctx.resp.writeStatus("200 OK"); + ctx.has_written_status = true; ctx.resp.end("Welcome to Bun! To get started, return a Response object.", false); ctx.finalize(); } @@ -646,8 +652,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp comptime fmt: string, args: anytype, ) void { - this.resp.writeStatus("500 Internal Server Error"); - this.resp.writeHeader("content-type", MimeType.html.value); + if (!this.has_written_status) { + this.has_written_status = true; + + this.resp.writeStatus("500 Internal Server Error"); + this.resp.writeHeader("content-type", MimeType.html.value); + } const allocator = this.allocator; @@ -1122,6 +1132,21 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp doRenderWithBody(bun.cast(*RequestContext, this), value); } + fn renderWithBlobFromBodyValue(this: *RequestContext) void { + if (this.aborted) { + this.finalizeForAbort(); + return; + } + + if (this.blob.needsToReadFile()) { + this.req.setYield(false); + if (!this.has_sendfile_ctx) + this.doSendfile(this.blob); + return; + } + + this.doRenderBlob(); + } pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void { switch (value.*) { .Error => { @@ -1136,23 +1161,216 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, .Blob => { this.blob = value.use(); - + this.renderWithBlobFromBodyValue(); + return; + }, + .Locked => |*lock| { if (this.aborted) { this.finalizeForAbort(); return; } - if (this.blob.needsToReadFile()) { - this.req.setYield(false); - if (!this.has_sendfile_ctx) - this.doSendfile(this.blob); - return; - } - }, - // TODO: this needs to support streaming! - .Locked => |*lock| { lock.callback = doRenderWithBodyLocked; lock.task = this; + if (lock.readable) |stream_| { + const stream: JSC.WebCore.ReadableStream = stream_; + stream.value.ensureStillAlive(); + value.* = .{ .Used = {} }; + + if (stream.isLocked(this.server.globalThis)) { + Output.debug("response_stream was locked but it shouldn't be", .{}); + var err = JSC.SystemError{ + .code = ZigString.init(@as(string, @tagName(JSC.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE))), + .message = ZigString.init("Stream already used, please create a new one"), + }; + + this.runErrorHandler(err.toErrorInstance(this.server.globalThis)); + return; + } + + switch (stream.ptr) { + .Invalid => {}, + + // fast path for Blob + .Blob => |val| { + Output.debug("response_stream was Blob", .{}); + this.blob = JSC.WebCore.Blob.initWithStore(val.store, this.server.globalThis); + this.blob.offset = val.offset; + this.blob.size = val.remain; + + val.store.ref(); + stream.detach(this.server.globalThis); + val.deinit(); + this.renderWithBlobFromBodyValue(); + return; + }, + + // fast path for File + .File => |val| { + Output.debug("response_stream was File Blob", .{}); + this.blob = JSC.WebCore.Blob.initWithStore(val.store, this.server.globalThis); + val.store.ref(); + + // it should be lazy, file shouldn't have opened yet. + std.debug.assert(!val.started); + + stream.detach(this.server.globalThis); + val.deinit(); + this.renderWithBlobFromBodyValue(); + return; + }, + + .JavaScript, .Direct => { + if (this.has_abort_handler) + this.resp.runCorked(*RequestContext, renderMetadata, this) + else + this.renderMetadata(); + + stream.value.ensureStillAlive(); + var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable; + response_stream.* = ResponseStream.JSSink{ + .sink = .{ + .res = this.resp, + .allocator = this.allocator, + .buffer = bun.ByteList.init(""), + }, + }; + var signal = &response_stream.sink.signal; + signal.* = ResponseStream.JSSink.SinkSignal.init(JSValue.zero); + + // explicitly set it to a dead pointer + // we use this memory address to disable signals being sent + signal.clear(); + std.debug.assert(signal.isDead()); + + const assignment_result: JSValue = ResponseStream.JSSink.assignToStream( + this.server.globalThis, + stream.value, + response_stream, + @ptrCast(**anyopaque, &signal.ptr), + ); + + // assert that it was updated + std.debug.assert(!signal.isDead()); + + if (comptime Environment.allow_assert) { + if (this.resp.hasResponded()) { + Output.debug("response_stream responded", .{}); + } + } + + this.aborted = this.aborted or response_stream.sink.aborted; + + if (assignment_result.isAnyError(this.server.globalThis)) { + Output.debug("response_stream returned an error", .{}); + response_stream.detach(); + this.allocator.destroy(response_stream); + return this.handleReject(assignment_result); + } + + if (response_stream.sink.done or + // TODO: is there a condition where resp could be freed before done? + this.resp.hasResponded()) + { + Output.debug("response_stream is done", .{}); + response_stream.detach(); + this.allocator.destroy(response_stream); + + if (!this.resp.hasResponded()) { + this.renderMissing(); + return; + } + + this.finalize(); + return; + } + + // it returns a Promise when it goes through ReadableStreamDefaultReader + if (assignment_result.asPromise()) |promise| { + Output.debug("response_stream returned a promise", .{}); + switch (promise.status(this.server.globalThis.vm())) { + .Pending => { + // TODO: should this timeout? + this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); + const AwaitPromise = struct { + pub fn onResolve(req: *RequestContext, _: *JSGlobalObject, _: []const JSC.JSValue) void { + Output.debug("response_stream promise resolved", .{}); + if (!req.resp.hasResponded()) { + req.renderMissing(); + return; + } + req.finalize(); + } + pub fn onReject(req: *RequestContext, globalThis: *JSGlobalObject, args: []const JSC.JSValue) void { + Output.debug("response_stream promise rejected", .{}); + if (args.len > 0) { + req.handleReject(args[0]); + return; + } + + const fallback = JSC.SystemError{ + .code = ZigString.init(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))), + .message = ZigString.init("Unhandled error in ReadableStream"), + }; + req.handleReject(fallback.toErrorInstance(globalThis)); + } + }; + assignment_result.then( + this.server.globalThis, + RequestContext, + this, + AwaitPromise.onResolve, + AwaitPromise.onReject, + ); + // the response_stream should be GC'd + return; + }, + .Fulfilled => { + this.aborted = this.aborted or response_stream.sink.aborted; + response_stream.detach(); + + this.allocator.destroy(response_stream); + + _ = promise.result(this.server.globalThis.vm()); + if (!this.resp.hasResponded()) { + this.renderMissing(); + return; + } + this.finalize(); + return; + }, + .Rejected => { + this.aborted = this.aborted or response_stream.sink.aborted; + response_stream.detach(); + this.allocator.destroy(response_stream); + + this.handleReject(promise.result(this.server.globalThis.vm())); + return; + }, + } + return; + } + + if (this.aborted) { + this.finalizeForAbort(); + return; + } + + stream.value.ensureStillAlive(); + + if (!stream.isLocked(this.server.globalThis)) { + Output.debug("response_stream is not locked", .{}); + this.renderMissing(); + return; + } + + this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); + Output.debug("response_stream is in progress, but did not return a Promise. Finalizing request context", .{}); + this.finalize(); + return; + }, + } + } return; }, else => {}, @@ -1182,12 +1400,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn renderProductionError(this: *RequestContext, status: u16) void { switch (status) { 404 => { - this.resp.writeStatus("404 Not Found"); + if (!this.has_written_status) { + this.resp.writeStatus("404 Not Found"); + this.has_written_status = true; + } + this.resp.endWithoutBody(); }, else => { - this.resp.writeStatus("500 Internal Server Error"); - this.resp.writeHeader("content-type", "text/plain"); + if (!this.has_written_status) { + this.resp.writeStatus("500 Internal Server Error"); + this.resp.writeHeader("content-type", "text/plain"); + this.has_written_status = true; + } + this.resp.end("Something went wrong!", true); }, } @@ -1236,7 +1462,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp error.ExceptionOcurred, exception_list.toOwnedSlice(), "<r><red>{s}<r> - <b>{s}<r> failed", - .{ std.mem.span(@tagName(this.method)), this.url }, + .{ @as(string, @tagName(this.method)), this.url }, ); } else { if (status != 404) @@ -1251,11 +1477,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp var response: *JSC.WebCore.Response = this.response_ptr.?; var status = response.statusCode(); const size = this.blob.size; - status = if (status == 200 and size == 0) + status = if (status == 200 and size == 0 and !this.blob.isDetached()) 204 else status; + std.debug.assert(!this.has_written_status); + this.has_written_status = true; + this.writeStatus(status); var needs_content_type = true; const content_type: MimeType = brk: { @@ -1284,7 +1513,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp headers_.deref(); } - if (needs_content_type) { + if (needs_content_type and + // do not insert the content type if it is the fallback value + // we may not know the content-type when streaming + (!this.blob.isDetached() or content_type.value.ptr != MimeType.other.value.ptr)) + { this.resp.writeHeader("content-type", content_type.value); } |