diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/api/server.zig | 485 |
1 files changed, 247 insertions, 238 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 80565e84b..6c666a36e 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1152,6 +1152,251 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.doRenderBlob(); } + + const StreamPair = struct { this: *RequestContext, stream: JSC.WebCore.ReadableStream }; + + fn doRenderStream(pair: *StreamPair) void { + var this = pair.this; + var stream = pair.stream; + // uWS automatically adds the status line if needed + // we want to batch network calls as much as possible + if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) { + this.renderMetadata(); + } + + stream.value.ensureStillAlive(); + + var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable; + response_stream.* = ResponseStream.JSSink{ + .sink = .{ + .res = this.resp, + .allocator = this.allocator, + .buffer = bun.ByteList.init(""), + }, + }; + var signal = &response_stream.sink.signal; + this.sink = response_stream; + + signal.* = ResponseStream.JSSink.SinkSignal.init(JSValue.zero); + + // explicitly set it to a dead pointer + // we use this memory address to disable signals being sent + signal.clear(); + std.debug.assert(signal.isDead()); + + // We are already corked! + const assignment_result: JSValue = ResponseStream.JSSink.assignToStream( + this.server.globalThis, + stream.value, + response_stream, + @ptrCast(**anyopaque, &signal.ptr), + ); + + assignment_result.ensureStillAlive(); + // assert that it was updated + std.debug.assert(!signal.isDead()); + + if (comptime Environment.allow_assert) { + if (this.resp.hasResponded()) { + streamLog("responded", .{}); + } + } + + this.aborted = this.aborted or response_stream.sink.aborted; + + if (assignment_result.isAnyError(this.server.globalThis)) { + streamLog("returned an error", .{}); + if (!this.aborted) this.resp.clearAborted(); + response_stream.detach(); + this.sink = null; + response_stream.sink.destroy(); + stream.value.unprotect(); + return this.handleReject(assignment_result); + } + + if (response_stream.sink.done or + // TODO: is there a condition where resp could be freed before done? + this.resp.hasResponded()) + { + if (!this.aborted) this.resp.clearAborted(); + const wrote_anything = response_stream.sink.wrote > 0; + streamLog("is done", .{}); + const responded = this.resp.hasResponded(); + + response_stream.detach(); + this.sink = null; + response_stream.sink.destroy(); + if (!responded and !wrote_anything and !this.aborted) { + this.renderMissing(); + return; + } else if (wrote_anything and !responded and !this.aborted) { + this.resp.endStream(false); + } + + this.finalize(); + stream.value.unprotect(); + + return; + } + + if (!assignment_result.isEmptyOrUndefinedOrNull()) { + assignment_result.ensureStillAlive(); + // it returns a Promise when it goes through ReadableStreamDefaultReader + if (assignment_result.asPromise()) |promise| { + const AwaitPromise = struct { + pub fn onResolve(req: *RequestContext, _: *JSGlobalObject, _: []const JSC.JSValue) void { + streamLog("onResolve", .{}); + var wrote_anything = false; + + if (req.sink) |wrapper| { + wrapper.sink.pending_flush = null; + wrapper.sink.done = true; + req.aborted = req.aborted or wrapper.sink.aborted; + wrote_anything = wrapper.sink.wrote > 0; + wrapper.sink.finalize(); + wrapper.detach(); + req.sink = null; + wrapper.sink.destroy(); + } + + if (req.response_ptr) |resp| { + if (resp.body.value == .Locked) { + resp.body.value.Locked.readable.?.done(); + resp.body.value = .{ .Used = {} }; + } + } + + if (req.aborted) { + req.finalizeForAbort(); + return; + } + + const responded = req.resp.hasResponded(); + + if (!responded and !wrote_anything) { + req.resp.clearAborted(); + req.renderMissing(); + return; + } else if (!responded and wrote_anything and !req.aborted) { + req.resp.clearAborted(); + req.resp.endStream(false); + } + + req.finalize(); + } + pub fn onReject(req: *RequestContext, globalThis: *JSGlobalObject, args: []const JSC.JSValue) void { + var wrote_anything = req.has_written_status; + + if (req.sink) |wrapper| { + wrapper.sink.pending_flush = null; + wrapper.sink.done = true; + wrote_anything = wrote_anything or wrapper.sink.wrote > 0; + req.aborted = req.aborted or wrapper.sink.aborted; + wrapper.sink.finalize(); + wrapper.detach(); + req.sink = null; + wrapper.sink.destroy(); + } + + if (req.response_ptr) |resp| { + if (resp.body.value == .Locked) { + resp.body.value.Locked.readable.?.done(); + resp.body.value = .{ .Used = {} }; + } + } + + streamLog("onReject({s})", .{wrote_anything}); + + if (req.aborted) { + req.finalizeForAbort(); + return; + } + + if (args.len > 0 and !wrote_anything) { + req.response_jsvalue.unprotect(); + req.response_jsvalue = JSValue.zero; + req.handleReject(args[0]); + return; + } else if (wrote_anything) { + req.resp.endStream(true); + if (comptime debug_mode) { + if (args.len > 0) { + var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator); + defer exception_list.deinit(); + req.server.vm.runErrorHandler(args[0], &exception_list); + } + } + req.finalize(); + return; + } + + const fallback = JSC.SystemError{ + .code = ZigString.init(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))), + .message = ZigString.init("Unhandled error in ReadableStream"), + }; + req.handleReject(fallback.toErrorInstance(globalThis)); + } + }; + + streamLog("returned a promise", .{}); + switch (promise.status(this.server.globalThis.vm())) { + .Pending => { + // TODO: should this timeout? + this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); + this.response_ptr.?.body.value = .{ + .Locked = .{ + .readable = stream, + .global = this.server.globalThis, + }, + }; + assignment_result.then( + this.server.globalThis, + RequestContext, + this, + AwaitPromise.onResolve, + AwaitPromise.onReject, + ); + // the response_stream should be GC'd + + }, + .Fulfilled => { + AwaitPromise.onResolve(this, this.server.globalThis, &.{promise.result(this.server.globalThis.vm())}); + }, + .Rejected => { + AwaitPromise.onReject(this, this.server.globalThis, &.{promise.result(this.server.globalThis.vm())}); + }, + } + return; + } + } + + if (this.aborted) { + response_stream.detach(); + stream.cancel(this.server.globalThis); + response_stream.sink.done = true; + this.finalizeForAbort(); + + response_stream.sink.finalize(); + stream.value.unprotect(); + return; + } + + stream.value.ensureStillAlive(); + + if (!stream.isLocked(this.server.globalThis)) { + streamLog("is not locked", .{}); + this.renderMissing(); + return; + } + + this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); + streamLog("is in progress, but did not return a Promise. Finalizing request context", .{}); + this.finalize(); + stream.value.unprotect(); + } + + const streamLog = Output.scoped(.ReadableStream, false); + pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void { switch (value.*) { .Error => { @@ -1180,7 +1425,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp stream.value.ensureStillAlive(); value.* = .{ .Used = {} }; - const streamLog = Output.scoped(.ReadableStream, false); if (stream.isLocked(this.server.globalThis)) { streamLog("was locked but it shouldn't be", .{}); @@ -1226,243 +1470,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, .JavaScript, .Direct => { - // uWS automatically adds the status line if needed - // we want to batch network calls as much as possible - if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) { - this.renderMetadata(); - } - - stream.value.ensureStillAlive(); - - var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable; - response_stream.* = ResponseStream.JSSink{ - .sink = .{ - .res = this.resp, - .allocator = this.allocator, - .buffer = bun.ByteList.init(""), - }, - }; - var signal = &response_stream.sink.signal; - this.sink = response_stream; - - signal.* = ResponseStream.JSSink.SinkSignal.init(JSValue.zero); - - // explicitly set it to a dead pointer - // we use this memory address to disable signals being sent - signal.clear(); - std.debug.assert(signal.isDead()); - - const assignment_result: JSValue = this.resp.corked( - ResponseStream.JSSink.assignToStream, - .{ - this.server.globalThis, - stream.value, - response_stream, - @ptrCast(**anyopaque, &signal.ptr), - }, - ); - - assignment_result.ensureStillAlive(); - // assert that it was updated - std.debug.assert(!signal.isDead()); - - if (comptime Environment.allow_assert) { - if (this.resp.hasResponded()) { - streamLog("responded", .{}); - } - } - - this.aborted = this.aborted or response_stream.sink.aborted; - - if (assignment_result.isAnyError(this.server.globalThis)) { - streamLog("returned an error", .{}); - if (!this.aborted) this.resp.clearAborted(); - response_stream.detach(); - this.sink = null; - response_stream.sink.destroy(); - stream.value.unprotect(); - return this.handleReject(assignment_result); - } - - if (response_stream.sink.done or - // TODO: is there a condition where resp could be freed before done? - this.resp.hasResponded()) - { - if (!this.aborted) this.resp.clearAborted(); - const wrote_anything = response_stream.sink.wrote > 0; - streamLog("is done", .{}); - const responded = this.resp.hasResponded(); - - response_stream.detach(); - this.sink = null; - response_stream.sink.destroy(); - if (!responded and !wrote_anything and !this.aborted) { - this.renderMissing(); - return; - } else if (wrote_anything and !responded and !this.aborted) { - this.resp.endStream(false); - } - - this.finalize(); - stream.value.unprotect(); - - return; - } - - if (!assignment_result.isEmptyOrUndefinedOrNull()) { - assignment_result.ensureStillAlive(); - // it returns a Promise when it goes through ReadableStreamDefaultReader - if (assignment_result.asPromise()) |promise| { - const AwaitPromise = struct { - pub fn onResolve(req: *RequestContext, _: *JSGlobalObject, _: []const JSC.JSValue) void { - streamLog("onResolve", .{}); - var wrote_anything = false; - - if (req.sink) |wrapper| { - wrapper.sink.pending_flush = null; - wrapper.sink.done = true; - req.aborted = req.aborted or wrapper.sink.aborted; - wrote_anything = wrapper.sink.wrote > 0; - wrapper.sink.finalize(); - wrapper.detach(); - req.sink = null; - wrapper.sink.destroy(); - } - - if (req.response_ptr) |resp| { - if (resp.body.value == .Locked) { - resp.body.value.Locked.readable.?.done(); - resp.body.value = .{ .Used = {} }; - } - } - - if (req.aborted) { - req.finalizeForAbort(); - return; - } - - const responded = req.resp.hasResponded(); - - if (!responded and !wrote_anything) { - req.resp.clearAborted(); - req.renderMissing(); - return; - } else if (!responded and wrote_anything and !req.aborted) { - req.resp.clearAborted(); - req.resp.endStream(false); - } - - req.finalize(); - } - pub fn onReject(req: *RequestContext, globalThis: *JSGlobalObject, args: []const JSC.JSValue) void { - var wrote_anything = req.has_written_status; - - if (req.sink) |wrapper| { - wrapper.sink.pending_flush = null; - wrapper.sink.done = true; - wrote_anything = wrote_anything or wrapper.sink.wrote > 0; - req.aborted = req.aborted or wrapper.sink.aborted; - wrapper.sink.finalize(); - wrapper.detach(); - req.sink = null; - wrapper.sink.destroy(); - } - - if (req.response_ptr) |resp| { - if (resp.body.value == .Locked) { - resp.body.value.Locked.readable.?.done(); - resp.body.value = .{ .Used = {} }; - } - } - - streamLog("onReject({s})", .{wrote_anything}); - - if (req.aborted) { - req.finalizeForAbort(); - return; - } - - if (args.len > 0 and !wrote_anything) { - req.response_jsvalue.unprotect(); - req.response_jsvalue = JSValue.zero; - req.handleReject(args[0]); - return; - } else if (wrote_anything) { - req.resp.endStream(true); - if (comptime debug_mode) { - if (args.len > 0) { - var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator); - defer exception_list.deinit(); - req.server.vm.runErrorHandler(args[0], &exception_list); - } - } - req.finalize(); - return; - } - - const fallback = JSC.SystemError{ - .code = ZigString.init(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))), - .message = ZigString.init("Unhandled error in ReadableStream"), - }; - req.handleReject(fallback.toErrorInstance(globalThis)); - } - }; - - streamLog("returned a promise", .{}); - switch (promise.status(this.server.globalThis.vm())) { - .Pending => { - // TODO: should this timeout? - this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); - this.response_ptr.?.body.value = .{ - .Locked = .{ - .readable = stream, - .global = this.server.globalThis, - }, - }; - assignment_result.then( - this.server.globalThis, - RequestContext, - this, - AwaitPromise.onResolve, - AwaitPromise.onReject, - ); - // the response_stream should be GC'd - - }, - .Fulfilled => { - AwaitPromise.onResolve(this, this.server.globalThis, &.{promise.result(this.server.globalThis.vm())}); - }, - .Rejected => { - AwaitPromise.onReject(this, this.server.globalThis, &.{promise.result(this.server.globalThis.vm())}); - }, - } - return; - } - } - - if (this.aborted) { - response_stream.detach(); - stream.cancel(this.server.globalThis); - response_stream.sink.done = true; - this.finalizeForAbort(); - - response_stream.sink.finalize(); - stream.value.unprotect(); - return; - } - - stream.value.ensureStillAlive(); - - if (!stream.isLocked(this.server.globalThis)) { - streamLog("is not locked", .{}); - this.renderMissing(); - return; - } - - this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink); - streamLog("is in progress, but did not return a Promise. Finalizing request context", .{}); - this.finalize(); - stream.value.unprotect(); + var pair = StreamPair{ .stream = stream, .this = this }; + this.resp.runCorkedWithType(*StreamPair, doRenderStream, &pair); return; }, } |