diff options
author | 2022-06-27 05:32:46 -0700 | |
---|---|---|
committer | 2022-06-27 05:32:46 -0700 | |
commit | 628cbc8eb347edbafdacacaa7ff90b41ca104526 (patch) | |
tree | 41a29b402bd54b7b2ab268b3061942ee3f749950 | |
parent | f66c277e5475399d1eb0102f7de5870ed327a2bc (diff) | |
download | bun-628cbc8eb347edbafdacacaa7ff90b41ca104526.tar.gz bun-628cbc8eb347edbafdacacaa7ff90b41ca104526.tar.zst bun-628cbc8eb347edbafdacacaa7ff90b41ca104526.zip |
Cork streams when possible
-rw-r--r-- | src/bun.js/api/server.zig | 29 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 218 |
2 files changed, 99 insertions, 148 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 35abc0a7f..1e3e0c91b 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -875,6 +875,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn writeStatus(this: *RequestContext, status: u16) void { var status_text_buf: [48]u8 = undefined; + std.debug.assert(!this.has_written_status); + this.has_written_status = true; if (status == 302) { this.resp.writeStatus("302 Found"); @@ -1073,7 +1075,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp .socket_fd = if (!this.aborted) this.resp.getNativeHandle() else -999, }; - this.resp.runCorked(*RequestContext, renderMetadataAndNewline, this); + this.resp.runCorkedWithType(*RequestContext, renderMetadataAndNewline, this); if (this.blob.size == 0) { this.cleanupAndFinalizeAfterSendfile(); @@ -1221,10 +1223,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, .JavaScript, .Direct => { - if (this.has_abort_handler) - this.resp.runCorked(*RequestContext, renderMetadata, this) - else + // uWS automatically adds the status line if needed + // we want to batch network calls as much as possible + if (!(this.response_ptr.?.statusCode() == 200 or this.response_headers == null)) { this.renderMetadata(); + } stream.value.ensureStillAlive(); var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable; @@ -1243,11 +1246,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp signal.clear(); std.debug.assert(signal.isDead()); - const assignment_result: JSValue = ResponseStream.JSSink.assignToStream( - this.server.globalThis, - stream.value, - response_stream, - @ptrCast(**anyopaque, &signal.ptr), + const assignment_result: JSValue = this.resp.corked( + ResponseStream.JSSink.assignToStream, + .{ + this.server.globalThis, + stream.value, + response_stream, + @ptrCast(**anyopaque, &signal.ptr), + }, ); // assert that it was updated @@ -1381,7 +1387,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn doRenderBlob(this: *RequestContext) void { if (this.has_abort_handler) - this.resp.runCorked(*RequestContext, renderMetadata, this) + this.resp.runCorkedWithType(*RequestContext, renderMetadata, this) else this.renderMetadata(); @@ -1482,9 +1488,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp else status; - std.debug.assert(!this.has_written_status); - this.has_written_status = true; - this.writeStatus(status); var needs_content_type = true; const content_type: MimeType = brk: { diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 5d568e459..654bf4b47 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -837,8 +837,9 @@ pub const ArrayBufferSink = struct { } pub fn start(this: *ArrayBufferSink, stream_start: StreamStart) JSC.Node.Maybe(void) { + this.bytes.len = 0; var list = this.bytes.listManaged(this.allocator); - list.clearAndFree(); + list.clearRetainingCapacity(); switch (stream_start) { .ArrayBufferSink => |config| { @@ -882,6 +883,8 @@ pub const ArrayBufferSink = struct { this.bytes = bun.ByteList.init(""); this.done = true; } + + this.allocator.destroy(this); } pub fn init(allocator: std.mem.Allocator, next: ?Sink) !*ArrayBufferSink { @@ -1100,13 +1103,28 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { shim.cppFn("detachPtr", .{ptr}); } + fn getThis(globalThis: *JSGlobalObject, callframe: *const JSC.CallFrame) ?*ThisSink { + return @ptrCast( + *ThisSink, + @alignCast( + std.meta.alignment(ThisSink), + fromJS( + globalThis, + callframe.this(), + ) orelse return null, + ), + ); + } + + fn invalidThis(globalThis: *JSGlobalObject) JSValue { + const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis); + globalThis.vm().throwError(globalThis, err); + return JSC.JSValue.jsUndefined(); + } + pub fn write(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { JSC.markBinding(); - var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse { - const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis); - globalThis.vm().throwError(globalThis, err); - return JSC.JSValue.jsUndefined(); - })); + var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis); if (comptime @hasDecl(SinkType, "getPendingError")) { if (this.sink.getPendingError()) |err| { @@ -1153,11 +1171,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { pub fn writeString(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { JSC.markBinding(); - var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse { - const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis); - globalThis.vm().throwError(globalThis, err); - return JSC.JSValue.jsUndefined(); - })); + var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis); if (comptime @hasDecl(SinkType, "getPendingError")) { if (this.sink.getPendingError()) |err| { @@ -1194,11 +1208,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { pub fn close(globalThis: *JSGlobalObject, sink_ptr: ?*anyopaque) callconv(.C) JSValue { JSC.markBinding(); - var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), sink_ptr) orelse { - const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis); - globalThis.vm().throwError(globalThis, err); - return JSC.JSValue.jsUndefined(); - }); + var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), sink_ptr orelse return invalidThis(globalThis))); if (comptime @hasDecl(SinkType, "getPendingError")) { if (this.sink.getPendingError()) |err| { @@ -1213,11 +1223,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { pub fn drain(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { JSC.markBinding(); - var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse { - const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis); - globalThis.vm().throwError(globalThis, err); - return JSC.JSValue.jsUndefined(); - })); + var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis); if (comptime @hasDecl(SinkType, "getPendingError")) { if (this.sink.getPendingError()) |err| { @@ -1236,11 +1242,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { pub fn start(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { JSC.markBinding(); - var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse { - const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis); - globalThis.vm().throwError(globalThis, err); - return JSC.JSValue.jsUndefined(); - })); + var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis); if (comptime @hasDecl(SinkType, "getPendingError")) { if (this.sink.getPendingError()) |err| { @@ -1273,11 +1275,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { pub fn end(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { JSC.markBinding(); - var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse { - const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis); - globalThis.vm().throwError(globalThis, err); - return JSC.JSValue.jsUndefined(); - })); + var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis); if (comptime @hasDecl(SinkType, "getPendingError")) { if (this.sink.getPendingError()) |err| { @@ -1336,109 +1334,30 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { }; } -pub fn WritableStreamSink( - comptime Context: type, - comptime onStart: ?fn (this: Context) void, - comptime onWrite: fn (this: Context, bytes: []const u8) JSC.Maybe(Blob.SizeType), - comptime onAbort: ?fn (this: Context) void, - comptime onClose: ?fn (this: Context) void, - comptime deinit: ?fn (this: Context) void, -) type { - return struct { - context: Context, - closed: bool = false, - deinited: bool = false, - pending_err: ?JSC.Node.Syscall.Error = null, - aborted: bool = false, - - abort_signaler: ?*anyopaque = null, - onAbortCallback: ?fn (?*anyopaque) void = null, - - close_signaler: ?*anyopaque = null, - onCloseCallback: ?fn (?*anyopaque) void = null, - - pub const This = @This(); - - pub fn write(this: *This, bytes: []const u8) JSC.Maybe(Blob.SizeType) { - if (this.pending_err) |err| { - this.pending_err = null; - return .{ .err = err }; - } - - if (this.closed or this.aborted or this.deinited) { - return .{ .result = 0 }; - } - return onWrite(&this.context, bytes); - } - - pub fn start(this: *This) StreamStart { - return onStart(&this.context); - } - - pub fn abort(this: *This) void { - if (this.closed or this.deinited or this.aborted) { - return; - } - - this.aborted = true; - onAbort(&this.context); - } - - pub fn didAbort(this: *This) void { - if (this.closed or this.deinited or this.aborted) { - return; - } - this.aborted = true; - - if (this.onAbortCallback) |cb| { - this.onAbortCallback = null; - cb(this.abort_signaler); - } - } - - pub fn didClose(this: *This) void { - if (this.closed or this.deinited or this.aborted) { - return; - } - this.closed = true; - - if (this.onCloseCallback) |cb| { - this.onCloseCallback = null; - cb(this.close_signaler); - } - } +// pub fn NetworkSocket(comptime tls: bool) type { +// return struct { +// const Socket = uws.NewSocketHandler(tls); +// const ThisSocket = @This(); - pub fn close(this: *This) void { - if (this.closed or this.deinited or this.aborted) { - return; - } +// socket: Socket, - this.closed = true; - onClose(this.context); - } +// pub fn connect(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { +// JSC.markBinding(); - pub fn deinit(this: *This) void { - if (this.deinited) { - return; - } - this.deinited = true; - deinit(this.context); - } +// var this = @ptrCast(*ThisSocket, @alignCast(std.meta.alignment(ThisSocket), fromJS(globalThis, callframe.this()) orelse { +// const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Socket", .{}, globalThis); +// globalThis.vm().throwError(globalThis, err); +// return JSC.JSValue.jsUndefined(); +// })); - pub fn getError(this: *This) ?JSC.Node.Syscall.Error { - if (this.pending_err) |err| { - this.pending_err = null; - return err; - } - - return null; - } - }; -} +// this.socket.connect() +// } +// }; +// } pub fn HTTPServerWritable(comptime ssl: bool) type { return struct { - pub const UWSResponse = uws.NewApp(ssl).Response; + const UWSResponse = uws.NewApp(ssl).Response; res: *UWSResponse, buffer: bun.ByteList, offset: Blob.SizeType = 0, @@ -1460,6 +1379,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { end_len: usize = 0, aborted: bool = false, + const log = Output.scoped(.HTTPServerWritable, false); + pub fn connect(this: *@This(), signal: Signal) void { this.signal = signal; } @@ -1485,16 +1406,19 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { std.debug.assert(!this.done); const success = if (!this.requested_end) this.res.write(buf) else this.res.tryEnd(buf, this.end_len); this.has_backpressure = !success; + log("send: {d} bytes ({d})", .{ buf.len, this.has_backpressure }); return success; } fn readableSlice(this: *@This()) []const u8 { - return this.buffer.ptr[this.offset..this.buffer.cap]; + return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len]; } pub fn onWritable(this: *@This(), available: c_ulong, _: *UWSResponse) callconv(.C) bool { + log("onWritable ({d})", .{available}); + if (this.done) { - this.res.end("", false); + this.res.endStream(false); return false; } @@ -1521,6 +1445,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { // flush the javascript promise from calling .drain() if (this.pending_drain) |prom| { this.pending_drain = null; + log("flush promise ({d})", .{readable.len}); + prom.asValue(this.globalThis).unprotect(); prom.resolve(this.globalThis, JSC.jsNumber(readable.len)); } @@ -1550,6 +1476,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } pub fn start(this: *@This(), _: StreamStart) JSC.Node.Maybe(void) { + log("start()", .{}); + if (this.res.hasResponded()) { this.done = true; this.signal.close(null); @@ -1562,7 +1490,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } pub fn drainFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) { - if (this.buffer.len == 0 or this.done) { + log("drainFromJS()", .{}); + + if (!this.hasBackpressure() or this.done) { return .{ .result = JSValue.jsNumberFromInt32(0) }; } @@ -1579,7 +1509,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } pub fn drain(this: *@This()) JSC.Node.Maybe(void) { - if (this.buffer.len == 0 or this.done) { + log("drain()", .{}); + if (!this.hasBackpressure() or this.done) { return .{ .result = {} }; } @@ -1597,6 +1528,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } const bytes = data.slice(); + + log("write({d})", .{bytes.len}); + if (!this.hasBackpressure()) { if (this.buffer.len == 0) { // fast path: @@ -1646,6 +1580,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } const bytes = data.slice(); + log("writeLatin1({d})", .{bytes.len}); + if (!this.hasBackpressure()) { if (this.buffer.len == 0 and strings.isAllASCII(bytes)) { // fast path: @@ -1695,6 +1631,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } const bytes = data.slice(); + + log("writeUTF16({d})", .{bytes.len}); + var written: usize = undefined; if (!this.hasBackpressure()) { // we must always buffer UTF-16 @@ -1720,6 +1659,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } pub fn end(this: *@This(), err: ?JSC.Node.Syscall.Error) JSC.Node.Maybe(void) { + log("end({s})", .{err}); + if (this.requested_end) { return .{ .result = {} }; } @@ -1736,7 +1677,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (readable.len == 0) { this.done = true; - this.res.end("", false); + this.res.endStream(false); return .{ .result = {} }; } @@ -1753,6 +1694,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } pub fn endFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) { + log("endFromJS()", .{}); + if (this.requested_end) { return .{ .result = JSC.JSValue.jsNumber(0) }; } @@ -1769,10 +1712,10 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (readable.len == 0) { this.done = true; - this.res.end("", false); + this.res.endStream(false); this.signal.close(null); this.done = true; - return .{ .result = JSC.JSValue.jsNumber(0) }; + return .{ .result = JSC.JSValue.jsNumber(this.wrote) }; } if (!this.hasBackpressure()) { @@ -1803,6 +1746,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } pub fn onAborted(this: *@This(), _: *UWSResponse) void { + log("onAborted()", .{}); this.signal.close(null); this.done = true; this.aborted = true; @@ -1810,9 +1754,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } pub fn finalize(this: *@This()) void { + log("finalize()", .{}); + if (!this.done) { this.done = true; - this.res.end("", false); + this.res.endStream(false); } var bytes = this.buffer.listManaged(this.allocator); @@ -1823,6 +1769,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn flushPromise(this: *@This()) void { if (this.pending_drain) |prom| { + log("flushPromise()", .{}); + this.pending_drain = null; prom.asValue(this.globalThis).unprotect(); prom.resolve(this.globalThis, JSC.JSValue.jsNumber(0)); |