aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/api/server.zig485
1 files changed, 247 insertions, 238 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 80565e84b..6c666a36e 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1152,6 +1152,251 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.doRenderBlob();
}
+
+ const StreamPair = struct { this: *RequestContext, stream: JSC.WebCore.ReadableStream };
+
+ fn doRenderStream(pair: *StreamPair) void {
+ var this = pair.this;
+ var stream = pair.stream;
+ // uWS automatically adds the status line if needed
+ // we want to batch network calls as much as possible
+ if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) {
+ this.renderMetadata();
+ }
+
+ stream.value.ensureStillAlive();
+
+ var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable;
+ response_stream.* = ResponseStream.JSSink{
+ .sink = .{
+ .res = this.resp,
+ .allocator = this.allocator,
+ .buffer = bun.ByteList.init(""),
+ },
+ };
+ var signal = &response_stream.sink.signal;
+ this.sink = response_stream;
+
+ signal.* = ResponseStream.JSSink.SinkSignal.init(JSValue.zero);
+
+ // explicitly set it to a dead pointer
+ // we use this memory address to disable signals being sent
+ signal.clear();
+ std.debug.assert(signal.isDead());
+
+ // We are already corked!
+ const assignment_result: JSValue = ResponseStream.JSSink.assignToStream(
+ this.server.globalThis,
+ stream.value,
+ response_stream,
+ @ptrCast(**anyopaque, &signal.ptr),
+ );
+
+ assignment_result.ensureStillAlive();
+ // assert that it was updated
+ std.debug.assert(!signal.isDead());
+
+ if (comptime Environment.allow_assert) {
+ if (this.resp.hasResponded()) {
+ streamLog("responded", .{});
+ }
+ }
+
+ this.aborted = this.aborted or response_stream.sink.aborted;
+
+ if (assignment_result.isAnyError(this.server.globalThis)) {
+ streamLog("returned an error", .{});
+ if (!this.aborted) this.resp.clearAborted();
+ response_stream.detach();
+ this.sink = null;
+ response_stream.sink.destroy();
+ stream.value.unprotect();
+ return this.handleReject(assignment_result);
+ }
+
+ if (response_stream.sink.done or
+ // TODO: is there a condition where resp could be freed before done?
+ this.resp.hasResponded())
+ {
+ if (!this.aborted) this.resp.clearAborted();
+ const wrote_anything = response_stream.sink.wrote > 0;
+ streamLog("is done", .{});
+ const responded = this.resp.hasResponded();
+
+ response_stream.detach();
+ this.sink = null;
+ response_stream.sink.destroy();
+ if (!responded and !wrote_anything and !this.aborted) {
+ this.renderMissing();
+ return;
+ } else if (wrote_anything and !responded and !this.aborted) {
+ this.resp.endStream(false);
+ }
+
+ this.finalize();
+ stream.value.unprotect();
+
+ return;
+ }
+
+ if (!assignment_result.isEmptyOrUndefinedOrNull()) {
+ assignment_result.ensureStillAlive();
+ // it returns a Promise when it goes through ReadableStreamDefaultReader
+ if (assignment_result.asPromise()) |promise| {
+ const AwaitPromise = struct {
+ pub fn onResolve(req: *RequestContext, _: *JSGlobalObject, _: []const JSC.JSValue) void {
+ streamLog("onResolve", .{});
+ var wrote_anything = false;
+
+ if (req.sink) |wrapper| {
+ wrapper.sink.pending_flush = null;
+ wrapper.sink.done = true;
+ req.aborted = req.aborted or wrapper.sink.aborted;
+ wrote_anything = wrapper.sink.wrote > 0;
+ wrapper.sink.finalize();
+ wrapper.detach();
+ req.sink = null;
+ wrapper.sink.destroy();
+ }
+
+ if (req.response_ptr) |resp| {
+ if (resp.body.value == .Locked) {
+ resp.body.value.Locked.readable.?.done();
+ resp.body.value = .{ .Used = {} };
+ }
+ }
+
+ if (req.aborted) {
+ req.finalizeForAbort();
+ return;
+ }
+
+ const responded = req.resp.hasResponded();
+
+ if (!responded and !wrote_anything) {
+ req.resp.clearAborted();
+ req.renderMissing();
+ return;
+ } else if (!responded and wrote_anything and !req.aborted) {
+ req.resp.clearAborted();
+ req.resp.endStream(false);
+ }
+
+ req.finalize();
+ }
+ pub fn onReject(req: *RequestContext, globalThis: *JSGlobalObject, args: []const JSC.JSValue) void {
+ var wrote_anything = req.has_written_status;
+
+ if (req.sink) |wrapper| {
+ wrapper.sink.pending_flush = null;
+ wrapper.sink.done = true;
+ wrote_anything = wrote_anything or wrapper.sink.wrote > 0;
+ req.aborted = req.aborted or wrapper.sink.aborted;
+ wrapper.sink.finalize();
+ wrapper.detach();
+ req.sink = null;
+ wrapper.sink.destroy();
+ }
+
+ if (req.response_ptr) |resp| {
+ if (resp.body.value == .Locked) {
+ resp.body.value.Locked.readable.?.done();
+ resp.body.value = .{ .Used = {} };
+ }
+ }
+
+ streamLog("onReject({s})", .{wrote_anything});
+
+ if (req.aborted) {
+ req.finalizeForAbort();
+ return;
+ }
+
+ if (args.len > 0 and !wrote_anything) {
+ req.response_jsvalue.unprotect();
+ req.response_jsvalue = JSValue.zero;
+ req.handleReject(args[0]);
+ return;
+ } else if (wrote_anything) {
+ req.resp.endStream(true);
+ if (comptime debug_mode) {
+ if (args.len > 0) {
+ var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator);
+ defer exception_list.deinit();
+ req.server.vm.runErrorHandler(args[0], &exception_list);
+ }
+ }
+ req.finalize();
+ return;
+ }
+
+ const fallback = JSC.SystemError{
+ .code = ZigString.init(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))),
+ .message = ZigString.init("Unhandled error in ReadableStream"),
+ };
+ req.handleReject(fallback.toErrorInstance(globalThis));
+ }
+ };
+
+ streamLog("returned a promise", .{});
+ switch (promise.status(this.server.globalThis.vm())) {
+ .Pending => {
+ // TODO: should this timeout?
+ this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
+ this.response_ptr.?.body.value = .{
+ .Locked = .{
+ .readable = stream,
+ .global = this.server.globalThis,
+ },
+ };
+ assignment_result.then(
+ this.server.globalThis,
+ RequestContext,
+ this,
+ AwaitPromise.onResolve,
+ AwaitPromise.onReject,
+ );
+ // the response_stream should be GC'd
+
+ },
+ .Fulfilled => {
+ AwaitPromise.onResolve(this, this.server.globalThis, &.{promise.result(this.server.globalThis.vm())});
+ },
+ .Rejected => {
+ AwaitPromise.onReject(this, this.server.globalThis, &.{promise.result(this.server.globalThis.vm())});
+ },
+ }
+ return;
+ }
+ }
+
+ if (this.aborted) {
+ response_stream.detach();
+ stream.cancel(this.server.globalThis);
+ response_stream.sink.done = true;
+ this.finalizeForAbort();
+
+ response_stream.sink.finalize();
+ stream.value.unprotect();
+ return;
+ }
+
+ stream.value.ensureStillAlive();
+
+ if (!stream.isLocked(this.server.globalThis)) {
+ streamLog("is not locked", .{});
+ this.renderMissing();
+ return;
+ }
+
+ this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
+ streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
+ this.finalize();
+ stream.value.unprotect();
+ }
+
+ const streamLog = Output.scoped(.ReadableStream, false);
+
pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void {
switch (value.*) {
.Error => {
@@ -1180,7 +1425,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
stream.value.ensureStillAlive();
value.* = .{ .Used = {} };
- const streamLog = Output.scoped(.ReadableStream, false);
if (stream.isLocked(this.server.globalThis)) {
streamLog("was locked but it shouldn't be", .{});
@@ -1226,243 +1470,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.JavaScript, .Direct => {
- // uWS automatically adds the status line if needed
- // we want to batch network calls as much as possible
- if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) {
- this.renderMetadata();
- }
-
- stream.value.ensureStillAlive();
-
- var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable;
- response_stream.* = ResponseStream.JSSink{
- .sink = .{
- .res = this.resp,
- .allocator = this.allocator,
- .buffer = bun.ByteList.init(""),
- },
- };
- var signal = &response_stream.sink.signal;
- this.sink = response_stream;
-
- signal.* = ResponseStream.JSSink.SinkSignal.init(JSValue.zero);
-
- // explicitly set it to a dead pointer
- // we use this memory address to disable signals being sent
- signal.clear();
- std.debug.assert(signal.isDead());
-
- const assignment_result: JSValue = this.resp.corked(
- ResponseStream.JSSink.assignToStream,
- .{
- this.server.globalThis,
- stream.value,
- response_stream,
- @ptrCast(**anyopaque, &signal.ptr),
- },
- );
-
- assignment_result.ensureStillAlive();
- // assert that it was updated
- std.debug.assert(!signal.isDead());
-
- if (comptime Environment.allow_assert) {
- if (this.resp.hasResponded()) {
- streamLog("responded", .{});
- }
- }
-
- this.aborted = this.aborted or response_stream.sink.aborted;
-
- if (assignment_result.isAnyError(this.server.globalThis)) {
- streamLog("returned an error", .{});
- if (!this.aborted) this.resp.clearAborted();
- response_stream.detach();
- this.sink = null;
- response_stream.sink.destroy();
- stream.value.unprotect();
- return this.handleReject(assignment_result);
- }
-
- if (response_stream.sink.done or
- // TODO: is there a condition where resp could be freed before done?
- this.resp.hasResponded())
- {
- if (!this.aborted) this.resp.clearAborted();
- const wrote_anything = response_stream.sink.wrote > 0;
- streamLog("is done", .{});
- const responded = this.resp.hasResponded();
-
- response_stream.detach();
- this.sink = null;
- response_stream.sink.destroy();
- if (!responded and !wrote_anything and !this.aborted) {
- this.renderMissing();
- return;
- } else if (wrote_anything and !responded and !this.aborted) {
- this.resp.endStream(false);
- }
-
- this.finalize();
- stream.value.unprotect();
-
- return;
- }
-
- if (!assignment_result.isEmptyOrUndefinedOrNull()) {
- assignment_result.ensureStillAlive();
- // it returns a Promise when it goes through ReadableStreamDefaultReader
- if (assignment_result.asPromise()) |promise| {
- const AwaitPromise = struct {
- pub fn onResolve(req: *RequestContext, _: *JSGlobalObject, _: []const JSC.JSValue) void {
- streamLog("onResolve", .{});
- var wrote_anything = false;
-
- if (req.sink) |wrapper| {
- wrapper.sink.pending_flush = null;
- wrapper.sink.done = true;
- req.aborted = req.aborted or wrapper.sink.aborted;
- wrote_anything = wrapper.sink.wrote > 0;
- wrapper.sink.finalize();
- wrapper.detach();
- req.sink = null;
- wrapper.sink.destroy();
- }
-
- if (req.response_ptr) |resp| {
- if (resp.body.value == .Locked) {
- resp.body.value.Locked.readable.?.done();
- resp.body.value = .{ .Used = {} };
- }
- }
-
- if (req.aborted) {
- req.finalizeForAbort();
- return;
- }
-
- const responded = req.resp.hasResponded();
-
- if (!responded and !wrote_anything) {
- req.resp.clearAborted();
- req.renderMissing();
- return;
- } else if (!responded and wrote_anything and !req.aborted) {
- req.resp.clearAborted();
- req.resp.endStream(false);
- }
-
- req.finalize();
- }
- pub fn onReject(req: *RequestContext, globalThis: *JSGlobalObject, args: []const JSC.JSValue) void {
- var wrote_anything = req.has_written_status;
-
- if (req.sink) |wrapper| {
- wrapper.sink.pending_flush = null;
- wrapper.sink.done = true;
- wrote_anything = wrote_anything or wrapper.sink.wrote > 0;
- req.aborted = req.aborted or wrapper.sink.aborted;
- wrapper.sink.finalize();
- wrapper.detach();
- req.sink = null;
- wrapper.sink.destroy();
- }
-
- if (req.response_ptr) |resp| {
- if (resp.body.value == .Locked) {
- resp.body.value.Locked.readable.?.done();
- resp.body.value = .{ .Used = {} };
- }
- }
-
- streamLog("onReject({s})", .{wrote_anything});
-
- if (req.aborted) {
- req.finalizeForAbort();
- return;
- }
-
- if (args.len > 0 and !wrote_anything) {
- req.response_jsvalue.unprotect();
- req.response_jsvalue = JSValue.zero;
- req.handleReject(args[0]);
- return;
- } else if (wrote_anything) {
- req.resp.endStream(true);
- if (comptime debug_mode) {
- if (args.len > 0) {
- var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator);
- defer exception_list.deinit();
- req.server.vm.runErrorHandler(args[0], &exception_list);
- }
- }
- req.finalize();
- return;
- }
-
- const fallback = JSC.SystemError{
- .code = ZigString.init(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))),
- .message = ZigString.init("Unhandled error in ReadableStream"),
- };
- req.handleReject(fallback.toErrorInstance(globalThis));
- }
- };
-
- streamLog("returned a promise", .{});
- switch (promise.status(this.server.globalThis.vm())) {
- .Pending => {
- // TODO: should this timeout?
- this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
- this.response_ptr.?.body.value = .{
- .Locked = .{
- .readable = stream,
- .global = this.server.globalThis,
- },
- };
- assignment_result.then(
- this.server.globalThis,
- RequestContext,
- this,
- AwaitPromise.onResolve,
- AwaitPromise.onReject,
- );
- // the response_stream should be GC'd
-
- },
- .Fulfilled => {
- AwaitPromise.onResolve(this, this.server.globalThis, &.{promise.result(this.server.globalThis.vm())});
- },
- .Rejected => {
- AwaitPromise.onReject(this, this.server.globalThis, &.{promise.result(this.server.globalThis.vm())});
- },
- }
- return;
- }
- }
-
- if (this.aborted) {
- response_stream.detach();
- stream.cancel(this.server.globalThis);
- response_stream.sink.done = true;
- this.finalizeForAbort();
-
- response_stream.sink.finalize();
- stream.value.unprotect();
- return;
- }
-
- stream.value.ensureStillAlive();
-
- if (!stream.isLocked(this.server.globalThis)) {
- streamLog("is not locked", .{});
- this.renderMissing();
- return;
- }
-
- this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
- streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
- this.finalize();
- stream.value.unprotect();
+ var pair = StreamPair{ .stream = stream, .this = this };
+ this.resp.runCorkedWithType(*StreamPair, doRenderStream, &pair);
return;
},
}