aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api/server.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/api/server.zig')
-rw-r--r--src/bun.js/api/server.zig293
1 files changed, 269 insertions, 24 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index d7cfbe4c1..939e010f3 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -538,7 +538,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
sendfile: SendfileContext = undefined,
request_js_object: JSC.C.JSObjectRef = null,
request_body_buf: std.ArrayListUnmanaged(u8) = .{},
+ request_body_content_len: usize = 0,
sink: ?*ResponseStream.JSSink = null,
+ byte_stream: ?*JSC.WebCore.ByteStream = null,
has_written_status: bool = false,
@@ -696,20 +698,67 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
this.response_buf_owned = std.ArrayListUnmanaged(u8){ .items = bb.items, .capacity = bb.capacity };
- this.renderResponseBuffer();
+ this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
}
pub fn renderResponseBuffer(this: *RequestContext) void {
this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this);
}
- pub fn onWritableResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool {
+ /// Render a complete response buffer
+ pub fn renderResponseBufferAndMetadata(this: *RequestContext) void {
+ this.renderMetadata();
+
+ if (!this.resp.tryEnd(
+ this.response_buf_owned.items,
+ this.response_buf_owned.items.len,
+ )) {
+ this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
+ this.setAbortHandler();
+ return;
+ }
+
+ this.finalize();
+ }
+
+ /// Drain a partial response buffer
+ pub fn drainResponseBufferAndMetadata(this: *RequestContext) void {
+ this.renderMetadata();
+ this.setAbortHandler();
+
+ _ = this.resp.write(
+ this.response_buf_owned.items,
+ );
+
+ this.response_buf_owned.items.len = 0;
+ }
+
+ pub fn renderResponseBufferAndMetadataCorked(this: *RequestContext) void {
+ this.resp.runCorkedWithType(*RequestContext, renderResponseBufferAndMetadata, this);
+ }
+
+ pub fn drainResponseBufferAndMetadataCorked(this: *RequestContext) void {
+ this.resp.runCorkedWithType(*RequestContext, drainResponseBufferAndMetadata, this);
+ }
+
+ pub fn onWritableResponseBuffer(this: *RequestContext, _: c_ulong, resp: *App.Response) callconv(.C) bool {
+ std.debug.assert(this.resp == resp);
+ if (this.aborted) {
+ this.finalizeForAbort();
+ return false;
+ }
+ resp.end("", false);
+ this.finalize();
+ return false;
+ }
+
+ pub fn onWritableCompleteResponseBuffer(this: *RequestContext, write_offset: c_ulong, resp: *App.Response) callconv(.C) bool {
std.debug.assert(this.resp == resp);
if (this.aborted) {
this.finalizeForAbort();
return false;
}
- return this.sendWritableBytes(this.response_buf_owned.items, write_offset, resp);
+ return this.sendWritableBytesForCompleteResponseBuffer(this.response_buf_owned.items, write_offset, resp);
}
pub fn create(this: *RequestContext, server: *ThisServer, req: *uws.Request, resp: *App.Response) void {
@@ -772,11 +821,24 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (req.body == .Locked and (req.body.Locked.action != .none or req.body.Locked.promise != null)) {
this.pending_promises_for_abort += 1;
req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis);
+ } else if (req.body == .Locked and (req.body.Locked.readable != null)) {
+ req.body.Locked.readable.?.abort(this.server.globalThis);
+ req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis);
+ req.body.Locked.readable = null;
}
req.uws_request = null;
}
}
+ if (this.response_ptr) |response| {
+ if (response.body.value == .Locked) {
+ if (response.body.value.Locked.readable) |*readable| {
+ response.body.value.Locked.readable = null;
+ readable.abort(this.server.globalThis);
+ }
+ }
+ }
+
// then, we reject the response promise
if (this.promise) |promise| {
this.pending_promises_for_abort += 1;
@@ -841,6 +903,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
JSC.C.JSValueUnprotect(this.server.globalThis.ref(), promise.asObjectRef());
}
+
+ if (this.byte_stream) |stream| {
+ this.byte_stream = null;
+ stream.unpipe();
+ }
}
pub fn finalize(this: *RequestContext) void {
this.finalizeWithoutDeinit();
@@ -974,10 +1041,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
var bytes = this.blob.sharedView();
- return this.sendWritableBytes(bytes, write_offset, resp);
+ _ = this.sendWritableBytesForBlob(bytes, write_offset, resp);
+ return true;
}
- pub fn sendWritableBytes(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool {
+ pub fn sendWritableBytesForBlob(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool {
std.debug.assert(this.resp == resp);
var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..];
@@ -990,6 +1058,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
+ pub fn sendWritableBytesForCompleteResponseBuffer(this: *RequestContext, bytes_: []const u8, write_offset: c_ulong, resp: *App.Response) bool {
+ std.debug.assert(this.resp == resp);
+
+ var bytes = bytes_[@minimum(bytes_.len, @truncate(usize, write_offset))..];
+ if (resp.tryEnd(bytes, bytes_.len)) {
+ this.response_buf_owned.items.len = 0;
+ this.finalize();
+ } else {
+ this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
+ }
+
+ return true;
+ }
+
pub fn onWritableSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool {
return this.onSendfile();
}
@@ -1125,7 +1207,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
} else {
this.blob.size = @truncate(Blob.SizeType, result.result.buf.len);
this.response_buf_owned = .{ .items = result.result.buf, .capacity = result.result.buf.len };
- this.renderResponseBuffer();
+ this.resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
}
}
@@ -1481,6 +1563,41 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.resp.runCorkedWithType(*StreamPair, doRenderStream, &pair);
return;
},
+
+ .Bytes => |byte_stream| {
+ std.debug.assert(byte_stream.pipe.ctx == null);
+ std.debug.assert(this.byte_stream == null);
+
+ stream.detach(this.server.globalThis);
+
+ this.response_buf_owned = byte_stream.buffer.moveToUnmanaged();
+
+ // If we've received the complete body by the time this function is called
+ // we can avoid streaming it and just send it all at once.
+ if (byte_stream.has_received_last_chunk) {
+ this.blob.size = @truncate(Blob.SizeType, this.response_buf_owned.items.len);
+ byte_stream.parent().deinit();
+ this.renderResponseBufferAndMetadataCorked();
+ return;
+ }
+
+ byte_stream.pipe = JSC.WebCore.ByteStream.Pipe.New(@This(), onPipe).init(this);
+ this.byte_stream = byte_stream;
+
+ // we don't set size here because even if we have a hint
+ // uWebSockets won't let us partially write streaming content
+ this.blob.size = 0;
+
+ // if we've received metadata and part of the body, send everything we can and drain
+ if (this.response_buf_owned.items.len > 0) {
+ this.drainResponseBufferAndMetadataCorked();
+ } else {
+ // if we only have metadata to send, send it now
+ this.resp.runCorkedWithType(*RequestContext, renderMetadata, this);
+ }
+ this.setAbortHandler();
+ return;
+ },
}
}
@@ -1496,6 +1613,42 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.doRenderBlob();
}
+ pub fn onPipe(this: *RequestContext, stream: JSC.WebCore.StreamResult, allocator: std.mem.Allocator) void {
+ var stream_needs_deinit = stream == .owned or stream == .owned_and_done;
+
+ defer {
+ if (stream_needs_deinit) {
+ if (stream.isDone()) {
+ stream.owned_and_done.listManaged(allocator).deinit();
+ } else {
+ stream.owned.listManaged(allocator).deinit();
+ }
+ }
+ }
+
+ if (this.aborted) {
+ this.finalizeForAbort();
+ return;
+ }
+
+ const chunk = stream.slice();
+ // on failure, it will continue to allocate
+ // we can't do buffering ourselves here or it won't work
+ // uSockets will append and manage the buffer
+ // so any write will buffer if the write fails
+ if (this.resp.write(chunk)) {
+ if (stream.isDone()) {
+ this.resp.endStream(false);
+ this.finalize();
+ }
+ } else {
+ // when it's the last one, we just want to know if it's done
+ if (stream.isDone()) {
+ this.resp.onWritable(*RequestContext, onWritableResponseBuffer, this);
+ }
+ }
+ }
+
pub fn doRenderBlob(this: *RequestContext) void {
// We are not corked
// The body is small
@@ -1712,34 +1865,123 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.aborted) return;
- this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body");
+ const request = JSC.JSValue.fromRef(this.request_js_object);
+ var req = request.as(Request) orelse {
+ this.request_body_buf.clearAndFree(this.allocator);
+ return;
+ };
+
+ if (req.body == .Locked) {
+ if (req.body.Locked.readable) |readable| {
+ if (readable.ptr == .Bytes) {
+ std.debug.assert(this.request_body_buf.items.len == 0);
+
+ if (!last) {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary = bun.ByteList.init(chunk),
+ },
+ bun.default_allocator,
+ );
+ } else {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary_and_done = bun.ByteList.init(chunk),
+ },
+ bun.default_allocator,
+ );
+ }
+
+ return;
+ }
+ }
+ }
+
if (last) {
- const request = JSC.JSValue.fromRef(this.request_js_object);
- if (request.as(Request)) |req| {
- request.ensureStillAlive();
- var bytes = this.request_body_buf.toOwnedSlice(this.allocator);
- var old = req.body;
- req.body = .{
- .Blob = if (bytes.len > 0)
- Blob.init(bytes, this.allocator, this.server.globalThis)
- else
- Blob.initEmpty(this.server.globalThis),
- };
- if (old == .Locked)
- old.resolve(&req.body, this.server.globalThis);
- request.unprotect();
+ request.ensureStillAlive();
+ var bytes = this.request_body_buf;
+ var old = req.body;
+ if (bytes.items.len == 0) {
+ req.body = .{ .Empty = {} };
} else {
- this.request_body_buf.clearAndFree(this.allocator);
+ req.body = .{ .InternalBlob = bytes.toManaged(this.allocator) };
}
+
+ if (old == .Locked)
+ old.resolve(&req.body, this.server.globalThis);
+ request.unprotect();
}
+
+ if (this.request_body_buf.capacity == 0) {
+ this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, @minimum(this.request_body_content_len, max_request_body_preallocate_length)) catch @panic("Out of memory while allocating request body buffer");
+ }
+
+ this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body");
}
+ pub fn onDrainRequestBody(this: *RequestContext) JSC.WebCore.DrainResult {
+ if (this.aborted) {
+ return JSC.WebCore.DrainResult{
+ .aborted = void{},
+ };
+ }
+
+ std.debug.assert(!this.resp.hasResponded());
+
+ // This means we have received part of the body but not the whole thing
+ if (this.request_body_buf.items.len > 0) {
+ var emptied = this.request_body_buf;
+ this.request_body_buf = .{};
+ return .{
+ .owned = .{
+ .list = emptied.toManaged(this.allocator),
+ .size_hint = if (emptied.capacity < max_request_body_preallocate_length)
+ emptied.capacity
+ else
+ 0,
+ },
+ };
+ }
+
+ const content_length = this.req.header("content-length") orelse {
+ return .{
+ .empty = void{},
+ };
+ };
+
+ const len = std.fmt.parseInt(usize, content_length, 10) catch 0;
+ this.request_body_content_len = len;
+
+ if (len == 0) {
+ return JSC.WebCore.DrainResult{
+ .empty = void{},
+ };
+ }
+
+ if (len > this.server.config.max_request_body_size) {
+ this.resp.writeStatus("413 Request Entity Too Large");
+ this.resp.endWithoutBody();
+
+ this.finalize();
+ return JSC.WebCore.DrainResult{
+ .aborted = void{},
+ };
+ }
+
+ this.resp.onData(*RequestContext, onBufferedBodyChunk, this);
+
+ return .{
+ .estimated_size = len,
+ };
+ }
+ const max_request_body_preallocate_length = 1024 * 256;
pub fn onPull(this: *RequestContext) void {
const request = JSC.JSValue.c(this.request_js_object);
request.ensureStillAlive();
if (this.req.header("content-length")) |content_length| {
const len = std.fmt.parseInt(usize, content_length, 10) catch 0;
+ this.request_body_content_len = len;
if (len == 0) {
if (request.as(Request)) |req| {
var old = req.body;
@@ -1766,8 +2008,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.finalize();
return;
}
-
- this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, len) catch @panic("Out of memory while allocating request body buffer");
} else if (this.req.header("transfer-encoding") == null) {
// no content-length
// no transfer-encoding
@@ -1789,6 +2029,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
onPull(bun.cast(*RequestContext, this));
}
+ pub fn onDrainRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult {
+ return onDrainRequestBody(bun.cast(*RequestContext, this));
+ }
+
pub const Export = shim.exportFunctions(.{
.onResolve = onResolve,
.onReject = onReject,
@@ -2243,6 +2487,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
.task = ctx,
.global = this.globalThis,
.onPull = RequestContext.onPullCallback,
+ .onDrain = RequestContext.onDrainRequestBodyCallback,
},
},
};