aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-27 05:32:46 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-27 05:32:46 -0700
commit628cbc8eb347edbafdacacaa7ff90b41ca104526 (patch)
tree41a29b402bd54b7b2ab268b3061942ee3f749950
parentf66c277e5475399d1eb0102f7de5870ed327a2bc (diff)
downloadbun-628cbc8eb347edbafdacacaa7ff90b41ca104526.tar.gz
bun-628cbc8eb347edbafdacacaa7ff90b41ca104526.tar.zst
bun-628cbc8eb347edbafdacacaa7ff90b41ca104526.zip
Cork streams when possible
-rw-r--r--src/bun.js/api/server.zig29
-rw-r--r--src/bun.js/webcore/streams.zig218
2 files changed, 99 insertions, 148 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 35abc0a7f..1e3e0c91b 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -875,6 +875,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn writeStatus(this: *RequestContext, status: u16) void {
var status_text_buf: [48]u8 = undefined;
+ std.debug.assert(!this.has_written_status);
+ this.has_written_status = true;
if (status == 302) {
this.resp.writeStatus("302 Found");
@@ -1073,7 +1075,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
.socket_fd = if (!this.aborted) this.resp.getNativeHandle() else -999,
};
- this.resp.runCorked(*RequestContext, renderMetadataAndNewline, this);
+ this.resp.runCorkedWithType(*RequestContext, renderMetadataAndNewline, this);
if (this.blob.size == 0) {
this.cleanupAndFinalizeAfterSendfile();
@@ -1221,10 +1223,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.JavaScript, .Direct => {
- if (this.has_abort_handler)
- this.resp.runCorked(*RequestContext, renderMetadata, this)
- else
+ // uWS automatically adds the status line if needed
+ // we want to batch network calls as much as possible
+ if (!(this.response_ptr.?.statusCode() == 200 or this.response_headers == null)) {
this.renderMetadata();
+ }
stream.value.ensureStillAlive();
var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable;
@@ -1243,11 +1246,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
signal.clear();
std.debug.assert(signal.isDead());
- const assignment_result: JSValue = ResponseStream.JSSink.assignToStream(
- this.server.globalThis,
- stream.value,
- response_stream,
- @ptrCast(**anyopaque, &signal.ptr),
+ const assignment_result: JSValue = this.resp.corked(
+ ResponseStream.JSSink.assignToStream,
+ .{
+ this.server.globalThis,
+ stream.value,
+ response_stream,
+ @ptrCast(**anyopaque, &signal.ptr),
+ },
);
// assert that it was updated
@@ -1381,7 +1387,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn doRenderBlob(this: *RequestContext) void {
if (this.has_abort_handler)
- this.resp.runCorked(*RequestContext, renderMetadata, this)
+ this.resp.runCorkedWithType(*RequestContext, renderMetadata, this)
else
this.renderMetadata();
@@ -1482,9 +1488,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
else
status;
- std.debug.assert(!this.has_written_status);
- this.has_written_status = true;
-
this.writeStatus(status);
var needs_content_type = true;
const content_type: MimeType = brk: {
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 5d568e459..654bf4b47 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -837,8 +837,9 @@ pub const ArrayBufferSink = struct {
}
pub fn start(this: *ArrayBufferSink, stream_start: StreamStart) JSC.Node.Maybe(void) {
+ this.bytes.len = 0;
var list = this.bytes.listManaged(this.allocator);
- list.clearAndFree();
+ list.clearRetainingCapacity();
switch (stream_start) {
.ArrayBufferSink => |config| {
@@ -882,6 +883,8 @@ pub const ArrayBufferSink = struct {
this.bytes = bun.ByteList.init("");
this.done = true;
}
+
+ this.allocator.destroy(this);
}
pub fn init(allocator: std.mem.Allocator, next: ?Sink) !*ArrayBufferSink {
@@ -1100,13 +1103,28 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
shim.cppFn("detachPtr", .{ptr});
}
+ fn getThis(globalThis: *JSGlobalObject, callframe: *const JSC.CallFrame) ?*ThisSink {
+ return @ptrCast(
+ *ThisSink,
+ @alignCast(
+ std.meta.alignment(ThisSink),
+ fromJS(
+ globalThis,
+ callframe.this(),
+ ) orelse return null,
+ ),
+ );
+ }
+
+ fn invalidThis(globalThis: *JSGlobalObject) JSValue {
+ const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
+ globalThis.vm().throwError(globalThis, err);
+ return JSC.JSValue.jsUndefined();
+ }
+
pub fn write(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
- var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
- const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
- globalThis.vm().throwError(globalThis, err);
- return JSC.JSValue.jsUndefined();
- }));
+ var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1153,11 +1171,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn writeString(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
- var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
- const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
- globalThis.vm().throwError(globalThis, err);
- return JSC.JSValue.jsUndefined();
- }));
+ var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1194,11 +1208,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn close(globalThis: *JSGlobalObject, sink_ptr: ?*anyopaque) callconv(.C) JSValue {
JSC.markBinding();
- var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), sink_ptr) orelse {
- const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
- globalThis.vm().throwError(globalThis, err);
- return JSC.JSValue.jsUndefined();
- });
+ var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), sink_ptr orelse return invalidThis(globalThis)));
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1213,11 +1223,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn drain(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
- var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
- const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
- globalThis.vm().throwError(globalThis, err);
- return JSC.JSValue.jsUndefined();
- }));
+ var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1236,11 +1242,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn start(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
- var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
- const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
- globalThis.vm().throwError(globalThis, err);
- return JSC.JSValue.jsUndefined();
- }));
+ var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1273,11 +1275,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn end(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
- var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
- const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
- globalThis.vm().throwError(globalThis, err);
- return JSC.JSValue.jsUndefined();
- }));
+ var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1336,109 +1334,30 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
};
}
-pub fn WritableStreamSink(
- comptime Context: type,
- comptime onStart: ?fn (this: Context) void,
- comptime onWrite: fn (this: Context, bytes: []const u8) JSC.Maybe(Blob.SizeType),
- comptime onAbort: ?fn (this: Context) void,
- comptime onClose: ?fn (this: Context) void,
- comptime deinit: ?fn (this: Context) void,
-) type {
- return struct {
- context: Context,
- closed: bool = false,
- deinited: bool = false,
- pending_err: ?JSC.Node.Syscall.Error = null,
- aborted: bool = false,
-
- abort_signaler: ?*anyopaque = null,
- onAbortCallback: ?fn (?*anyopaque) void = null,
-
- close_signaler: ?*anyopaque = null,
- onCloseCallback: ?fn (?*anyopaque) void = null,
-
- pub const This = @This();
-
- pub fn write(this: *This, bytes: []const u8) JSC.Maybe(Blob.SizeType) {
- if (this.pending_err) |err| {
- this.pending_err = null;
- return .{ .err = err };
- }
-
- if (this.closed or this.aborted or this.deinited) {
- return .{ .result = 0 };
- }
- return onWrite(&this.context, bytes);
- }
-
- pub fn start(this: *This) StreamStart {
- return onStart(&this.context);
- }
-
- pub fn abort(this: *This) void {
- if (this.closed or this.deinited or this.aborted) {
- return;
- }
-
- this.aborted = true;
- onAbort(&this.context);
- }
-
- pub fn didAbort(this: *This) void {
- if (this.closed or this.deinited or this.aborted) {
- return;
- }
- this.aborted = true;
-
- if (this.onAbortCallback) |cb| {
- this.onAbortCallback = null;
- cb(this.abort_signaler);
- }
- }
-
- pub fn didClose(this: *This) void {
- if (this.closed or this.deinited or this.aborted) {
- return;
- }
- this.closed = true;
-
- if (this.onCloseCallback) |cb| {
- this.onCloseCallback = null;
- cb(this.close_signaler);
- }
- }
+// pub fn NetworkSocket(comptime tls: bool) type {
+// return struct {
+// const Socket = uws.NewSocketHandler(tls);
+// const ThisSocket = @This();
- pub fn close(this: *This) void {
- if (this.closed or this.deinited or this.aborted) {
- return;
- }
+// socket: Socket,
- this.closed = true;
- onClose(this.context);
- }
+// pub fn connect(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
+// JSC.markBinding();
- pub fn deinit(this: *This) void {
- if (this.deinited) {
- return;
- }
- this.deinited = true;
- deinit(this.context);
- }
+// var this = @ptrCast(*ThisSocket, @alignCast(std.meta.alignment(ThisSocket), fromJS(globalThis, callframe.this()) orelse {
+// const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Socket", .{}, globalThis);
+// globalThis.vm().throwError(globalThis, err);
+// return JSC.JSValue.jsUndefined();
+// }));
- pub fn getError(this: *This) ?JSC.Node.Syscall.Error {
- if (this.pending_err) |err| {
- this.pending_err = null;
- return err;
- }
-
- return null;
- }
- };
-}
+// this.socket.connect()
+// }
+// };
+// }
pub fn HTTPServerWritable(comptime ssl: bool) type {
return struct {
- pub const UWSResponse = uws.NewApp(ssl).Response;
+ const UWSResponse = uws.NewApp(ssl).Response;
res: *UWSResponse,
buffer: bun.ByteList,
offset: Blob.SizeType = 0,
@@ -1460,6 +1379,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
end_len: usize = 0,
aborted: bool = false,
+ const log = Output.scoped(.HTTPServerWritable, false);
+
pub fn connect(this: *@This(), signal: Signal) void {
this.signal = signal;
}
@@ -1485,16 +1406,19 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
std.debug.assert(!this.done);
const success = if (!this.requested_end) this.res.write(buf) else this.res.tryEnd(buf, this.end_len);
this.has_backpressure = !success;
+ log("send: {d} bytes ({d})", .{ buf.len, this.has_backpressure });
return success;
}
fn readableSlice(this: *@This()) []const u8 {
- return this.buffer.ptr[this.offset..this.buffer.cap];
+ return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len];
}
pub fn onWritable(this: *@This(), available: c_ulong, _: *UWSResponse) callconv(.C) bool {
+ log("onWritable ({d})", .{available});
+
if (this.done) {
- this.res.end("", false);
+ this.res.endStream(false);
return false;
}
@@ -1521,6 +1445,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// flush the javascript promise from calling .drain()
if (this.pending_drain) |prom| {
this.pending_drain = null;
+ log("flush promise ({d})", .{readable.len});
+
prom.asValue(this.globalThis).unprotect();
prom.resolve(this.globalThis, JSC.jsNumber(readable.len));
}
@@ -1550,6 +1476,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn start(this: *@This(), _: StreamStart) JSC.Node.Maybe(void) {
+ log("start()", .{});
+
if (this.res.hasResponded()) {
this.done = true;
this.signal.close(null);
@@ -1562,7 +1490,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn drainFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
- if (this.buffer.len == 0 or this.done) {
+ log("drainFromJS()", .{});
+
+ if (!this.hasBackpressure() or this.done) {
return .{ .result = JSValue.jsNumberFromInt32(0) };
}
@@ -1579,7 +1509,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn drain(this: *@This()) JSC.Node.Maybe(void) {
- if (this.buffer.len == 0 or this.done) {
+ log("drain()", .{});
+ if (!this.hasBackpressure() or this.done) {
return .{ .result = {} };
}
@@ -1597,6 +1528,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
const bytes = data.slice();
+
+ log("write({d})", .{bytes.len});
+
if (!this.hasBackpressure()) {
if (this.buffer.len == 0) {
// fast path:
@@ -1646,6 +1580,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
const bytes = data.slice();
+ log("writeLatin1({d})", .{bytes.len});
+
if (!this.hasBackpressure()) {
if (this.buffer.len == 0 and strings.isAllASCII(bytes)) {
// fast path:
@@ -1695,6 +1631,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
const bytes = data.slice();
+
+ log("writeUTF16({d})", .{bytes.len});
+
var written: usize = undefined;
if (!this.hasBackpressure()) {
// we must always buffer UTF-16
@@ -1720,6 +1659,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn end(this: *@This(), err: ?JSC.Node.Syscall.Error) JSC.Node.Maybe(void) {
+ log("end({s})", .{err});
+
if (this.requested_end) {
return .{ .result = {} };
}
@@ -1736,7 +1677,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (readable.len == 0) {
this.done = true;
- this.res.end("", false);
+ this.res.endStream(false);
return .{ .result = {} };
}
@@ -1753,6 +1694,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn endFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
+ log("endFromJS()", .{});
+
if (this.requested_end) {
return .{ .result = JSC.JSValue.jsNumber(0) };
}
@@ -1769,10 +1712,10 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (readable.len == 0) {
this.done = true;
- this.res.end("", false);
+ this.res.endStream(false);
this.signal.close(null);
this.done = true;
- return .{ .result = JSC.JSValue.jsNumber(0) };
+ return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
}
if (!this.hasBackpressure()) {
@@ -1803,6 +1746,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn onAborted(this: *@This(), _: *UWSResponse) void {
+ log("onAborted()", .{});
this.signal.close(null);
this.done = true;
this.aborted = true;
@@ -1810,9 +1754,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn finalize(this: *@This()) void {
+ log("finalize()", .{});
+
if (!this.done) {
this.done = true;
- this.res.end("", false);
+ this.res.endStream(false);
}
var bytes = this.buffer.listManaged(this.allocator);
@@ -1823,6 +1769,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn flushPromise(this: *@This()) void {
if (this.pending_drain) |prom| {
+ log("flushPromise()", .{});
+
this.pending_drain = null;
prom.asValue(this.globalThis).unprotect();
prom.resolve(this.globalThis, JSC.JSValue.jsNumber(0));