aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api/server.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/api/server.zig')
-rw-r--r--src/bun.js/api/server.zig275
1 files changed, 254 insertions, 21 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index cb3c4387a..35abc0a7f 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -503,7 +503,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const App = uws.NewApp(ssl_enabled);
pub threadlocal var pool: ?*RequestContext.RequestContextStackAllocator = null;
pub threadlocal var pool_allocator: std.mem.Allocator = undefined;
-
+ pub const ResponseStream = JSC.WebCore.HTTPServerWritable(ssl_enabled);
pub const RequestContextStackAllocator = NewRequestContextStackAllocator(RequestContext, 2048);
pub const name = "HTTPRequestContext" ++ (if (debug_mode) "Debug" else "") ++ (if (ThisServer.ssl_enabled) "TLS" else "");
pub const shim = JSC.Shimmer("Bun", name, @This());
@@ -537,6 +537,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
request_js_object: JSC.C.JSObjectRef = null,
request_body_buf: std.ArrayListUnmanaged(u8) = .{},
+ has_written_status: bool = false,
+
/// Used either for temporary blob data or fallback
/// When the response body is a temporary value
response_buf_owned: std.ArrayListUnmanaged(u8) = .{},
@@ -628,11 +630,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn renderMissing(ctx: *RequestContext) void {
if (comptime !debug_mode) {
- ctx.resp.writeStatus("204 No Content");
+ if (!ctx.has_written_status)
+ ctx.resp.writeStatus("204 No Content");
+ ctx.has_written_status = true;
ctx.resp.endWithoutBody();
ctx.finalize();
} else {
- ctx.resp.writeStatus("200 OK");
+ if (!ctx.has_written_status)
+ ctx.resp.writeStatus("200 OK");
+ ctx.has_written_status = true;
ctx.resp.end("Welcome to Bun! To get started, return a Response object.", false);
ctx.finalize();
}
@@ -646,8 +652,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
comptime fmt: string,
args: anytype,
) void {
- this.resp.writeStatus("500 Internal Server Error");
- this.resp.writeHeader("content-type", MimeType.html.value);
+ if (!this.has_written_status) {
+ this.has_written_status = true;
+
+ this.resp.writeStatus("500 Internal Server Error");
+ this.resp.writeHeader("content-type", MimeType.html.value);
+ }
const allocator = this.allocator;
@@ -1122,6 +1132,21 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
doRenderWithBody(bun.cast(*RequestContext, this), value);
}
+ fn renderWithBlobFromBodyValue(this: *RequestContext) void {
+ if (this.aborted) {
+ this.finalizeForAbort();
+ return;
+ }
+
+ if (this.blob.needsToReadFile()) {
+ this.req.setYield(false);
+ if (!this.has_sendfile_ctx)
+ this.doSendfile(this.blob);
+ return;
+ }
+
+ this.doRenderBlob();
+ }
pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void {
switch (value.*) {
.Error => {
@@ -1136,23 +1161,216 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.Blob => {
this.blob = value.use();
-
+ this.renderWithBlobFromBodyValue();
+ return;
+ },
+ .Locked => |*lock| {
if (this.aborted) {
this.finalizeForAbort();
return;
}
- if (this.blob.needsToReadFile()) {
- this.req.setYield(false);
- if (!this.has_sendfile_ctx)
- this.doSendfile(this.blob);
- return;
- }
- },
- // TODO: this needs to support streaming!
- .Locked => |*lock| {
lock.callback = doRenderWithBodyLocked;
lock.task = this;
+ if (lock.readable) |stream_| {
+ const stream: JSC.WebCore.ReadableStream = stream_;
+ stream.value.ensureStillAlive();
+ value.* = .{ .Used = {} };
+
+ if (stream.isLocked(this.server.globalThis)) {
+ Output.debug("response_stream was locked but it shouldn't be", .{});
+ var err = JSC.SystemError{
+ .code = ZigString.init(@as(string, @tagName(JSC.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE))),
+ .message = ZigString.init("Stream already used, please create a new one"),
+ };
+
+ this.runErrorHandler(err.toErrorInstance(this.server.globalThis));
+ return;
+ }
+
+ switch (stream.ptr) {
+ .Invalid => {},
+
+ // fast path for Blob
+ .Blob => |val| {
+ Output.debug("response_stream was Blob", .{});
+ this.blob = JSC.WebCore.Blob.initWithStore(val.store, this.server.globalThis);
+ this.blob.offset = val.offset;
+ this.blob.size = val.remain;
+
+ val.store.ref();
+ stream.detach(this.server.globalThis);
+ val.deinit();
+ this.renderWithBlobFromBodyValue();
+ return;
+ },
+
+ // fast path for File
+ .File => |val| {
+ Output.debug("response_stream was File Blob", .{});
+ this.blob = JSC.WebCore.Blob.initWithStore(val.store, this.server.globalThis);
+ val.store.ref();
+
+ // it should be lazy, file shouldn't have opened yet.
+ std.debug.assert(!val.started);
+
+ stream.detach(this.server.globalThis);
+ val.deinit();
+ this.renderWithBlobFromBodyValue();
+ return;
+ },
+
+ .JavaScript, .Direct => {
+ if (this.has_abort_handler)
+ this.resp.runCorked(*RequestContext, renderMetadata, this)
+ else
+ this.renderMetadata();
+
+ stream.value.ensureStillAlive();
+ var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable;
+ response_stream.* = ResponseStream.JSSink{
+ .sink = .{
+ .res = this.resp,
+ .allocator = this.allocator,
+ .buffer = bun.ByteList.init(""),
+ },
+ };
+ var signal = &response_stream.sink.signal;
+ signal.* = ResponseStream.JSSink.SinkSignal.init(JSValue.zero);
+
+ // explicitly set it to a dead pointer
+ // we use this memory address to disable signals being sent
+ signal.clear();
+ std.debug.assert(signal.isDead());
+
+ const assignment_result: JSValue = ResponseStream.JSSink.assignToStream(
+ this.server.globalThis,
+ stream.value,
+ response_stream,
+ @ptrCast(**anyopaque, &signal.ptr),
+ );
+
+ // assert that it was updated
+ std.debug.assert(!signal.isDead());
+
+ if (comptime Environment.allow_assert) {
+ if (this.resp.hasResponded()) {
+ Output.debug("response_stream responded", .{});
+ }
+ }
+
+ this.aborted = this.aborted or response_stream.sink.aborted;
+
+ if (assignment_result.isAnyError(this.server.globalThis)) {
+ Output.debug("response_stream returned an error", .{});
+ response_stream.detach();
+ this.allocator.destroy(response_stream);
+ return this.handleReject(assignment_result);
+ }
+
+ if (response_stream.sink.done or
+ // TODO: is there a condition where resp could be freed before done?
+ this.resp.hasResponded())
+ {
+ Output.debug("response_stream is done", .{});
+ response_stream.detach();
+ this.allocator.destroy(response_stream);
+
+ if (!this.resp.hasResponded()) {
+ this.renderMissing();
+ return;
+ }
+
+ this.finalize();
+ return;
+ }
+
+ // it returns a Promise when it goes through ReadableStreamDefaultReader
+ if (assignment_result.asPromise()) |promise| {
+ Output.debug("response_stream returned a promise", .{});
+ switch (promise.status(this.server.globalThis.vm())) {
+ .Pending => {
+ // TODO: should this timeout?
+ this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
+ const AwaitPromise = struct {
+ pub fn onResolve(req: *RequestContext, _: *JSGlobalObject, _: []const JSC.JSValue) void {
+ Output.debug("response_stream promise resolved", .{});
+ if (!req.resp.hasResponded()) {
+ req.renderMissing();
+ return;
+ }
+ req.finalize();
+ }
+ pub fn onReject(req: *RequestContext, globalThis: *JSGlobalObject, args: []const JSC.JSValue) void {
+ Output.debug("response_stream promise rejected", .{});
+ if (args.len > 0) {
+ req.handleReject(args[0]);
+ return;
+ }
+
+ const fallback = JSC.SystemError{
+ .code = ZigString.init(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))),
+ .message = ZigString.init("Unhandled error in ReadableStream"),
+ };
+ req.handleReject(fallback.toErrorInstance(globalThis));
+ }
+ };
+ assignment_result.then(
+ this.server.globalThis,
+ RequestContext,
+ this,
+ AwaitPromise.onResolve,
+ AwaitPromise.onReject,
+ );
+ // the response_stream should be GC'd
+ return;
+ },
+ .Fulfilled => {
+ this.aborted = this.aborted or response_stream.sink.aborted;
+ response_stream.detach();
+
+ this.allocator.destroy(response_stream);
+
+ _ = promise.result(this.server.globalThis.vm());
+ if (!this.resp.hasResponded()) {
+ this.renderMissing();
+ return;
+ }
+ this.finalize();
+ return;
+ },
+ .Rejected => {
+ this.aborted = this.aborted or response_stream.sink.aborted;
+ response_stream.detach();
+ this.allocator.destroy(response_stream);
+
+ this.handleReject(promise.result(this.server.globalThis.vm()));
+ return;
+ },
+ }
+ return;
+ }
+
+ if (this.aborted) {
+ this.finalizeForAbort();
+ return;
+ }
+
+ stream.value.ensureStillAlive();
+
+ if (!stream.isLocked(this.server.globalThis)) {
+ Output.debug("response_stream is not locked", .{});
+ this.renderMissing();
+ return;
+ }
+
+ this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
+ Output.debug("response_stream is in progress, but did not return a Promise. Finalizing request context", .{});
+ this.finalize();
+ return;
+ },
+ }
+ }
return;
},
else => {},
@@ -1182,12 +1400,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn renderProductionError(this: *RequestContext, status: u16) void {
switch (status) {
404 => {
- this.resp.writeStatus("404 Not Found");
+ if (!this.has_written_status) {
+ this.resp.writeStatus("404 Not Found");
+ this.has_written_status = true;
+ }
+
this.resp.endWithoutBody();
},
else => {
- this.resp.writeStatus("500 Internal Server Error");
- this.resp.writeHeader("content-type", "text/plain");
+ if (!this.has_written_status) {
+ this.resp.writeStatus("500 Internal Server Error");
+ this.resp.writeHeader("content-type", "text/plain");
+ this.has_written_status = true;
+ }
+
this.resp.end("Something went wrong!", true);
},
}
@@ -1236,7 +1462,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
error.ExceptionOcurred,
exception_list.toOwnedSlice(),
"<r><red>{s}<r> - <b>{s}<r> failed",
- .{ std.mem.span(@tagName(this.method)), this.url },
+ .{ @as(string, @tagName(this.method)), this.url },
);
} else {
if (status != 404)
@@ -1251,11 +1477,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
var response: *JSC.WebCore.Response = this.response_ptr.?;
var status = response.statusCode();
const size = this.blob.size;
- status = if (status == 200 and size == 0)
+ status = if (status == 200 and size == 0 and !this.blob.isDetached())
204
else
status;
+ std.debug.assert(!this.has_written_status);
+ this.has_written_status = true;
+
this.writeStatus(status);
var needs_content_type = true;
const content_type: MimeType = brk: {
@@ -1284,7 +1513,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
headers_.deref();
}
- if (needs_content_type) {
+ if (needs_content_type and
+ // do not insert the content type if it is the fallback value
+ // we may not know the content-type when streaming
+ (!this.blob.isDetached() or content_type.value.ptr != MimeType.other.value.ptr))
+ {
this.resp.writeHeader("content-type", content_type.value);
}