diff options
Diffstat (limited to 'src/bun.js/webcore')
-rw-r--r-- | src/bun.js/webcore/streams.zig | 134 |
1 files changed, 98 insertions, 36 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 9c5ce2c8e..4a10b1062 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -46,6 +46,7 @@ const uws = @import("uws"); const Blob = JSC.WebCore.Blob; const Response = JSC.WebCore.Response; const Request = JSC.WebCore.Request; +const assert = std.debug.assert; pub const ReadableStream = struct { value: JSValue, @@ -882,21 +883,22 @@ pub const ArrayBufferSink = struct { return .{ .result = {} }; } - pub fn drain(_: *ArrayBufferSink) JSC.Node.Maybe(void) { + pub fn flush(_: *ArrayBufferSink) JSC.Node.Maybe(void) { return .{ .result = {} }; } - pub fn drainFromJS(this: *ArrayBufferSink, globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) { + pub fn flushFromJS(this: *ArrayBufferSink, globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) { if (this.streaming) { const value: JSValue = switch (this.as_uint8array) { true => JSC.ArrayBuffer.create(globalThis, this.bytes.slice(), .Uint8Array), false => JSC.ArrayBuffer.create(globalThis, this.bytes.slice(), .ArrayBuffer), }; this.bytes.len = 0; + if (wait) {} return .{ .result = value }; } - return .{ .result = JSValue.jsUndefined() }; + return .{ .result = JSValue.jsNumber(0) }; } pub fn finalize(this: *ArrayBufferSink) void { @@ -1108,6 +1110,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), ptr)); this.sink.finalize(); + this.detach(); } pub fn detach(this: *ThisSink) void { @@ -1244,7 +1247,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { return this.sink.end(null).toJS(globalThis); } - pub fn drain(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { + pub fn flush(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { JSC.markBinding(); var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis); @@ -1262,11 +1265,14 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { } } - if (comptime @hasDecl(SinkType, "drainFromJS")) { - return this.sink.drainFromJS(globalThis).result; + if (comptime @hasDecl(SinkType, "flushFromJS")) { + const wait = callframe.argumentsCount() > 0 and + callframe.argument(0).isBoolean() and + callframe.argument(0).asBoolean(); + return this.sink.flushFromJS(globalThis, wait).result; } - return this.sink.drain().toJS(globalThis); + return this.sink.flush().toJS(globalThis); } pub fn start(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue { @@ -1346,7 +1352,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { .@"finalize" = finalize, .@"write" = write, .@"close" = close, - .@"drain" = drain, + .@"flush" = flush, .@"start" = start, .@"end" = end, .@"construct" = construct, @@ -1358,7 +1364,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { @export(finalize, .{ .name = Export[0].symbol_name }); @export(write, .{ .name = Export[1].symbol_name }); @export(close, .{ .name = Export[2].symbol_name }); - @export(drain, .{ .name = Export[3].symbol_name }); + @export(flush, .{ .name = Export[3].symbol_name }); @export(start, .{ .name = Export[4].symbol_name }); @export(end, .{ .name = Export[5].symbol_name }); @export(construct, .{ .name = Export[6].symbol_name }); @@ -1414,7 +1420,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { allocator: std.mem.Allocator, done: bool = false, signal: Signal = .{}, - pending_drain: ?*JSC.JSPromise = null, + pending_flush: ?*JSC.JSPromise = null, + wrote_at_start_of_flush: Blob.SizeType = 0, globalThis: *JSGlobalObject = undefined, highWaterMark: Blob.SizeType = 2048, @@ -1488,7 +1495,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { return true; } - // flush the javascript promise from calling .drain() + // flush the javascript promise from calling .flush() this.flushPromise(); if (this.has_callback) { @@ -1500,14 +1507,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { resume callback; } - // pending_drain or callback could have caused another send() + // pending_flush or callback could have caused another send() // so we check again if we should report readiness if (!this.done and !this.requested_end and !this.hasBackpressure()) { const pending = @truncate(Blob.SizeType, available) - to_write; - const written_after_drain = this.wrote - initial_wrote; - const to_report = pending - @minimum(written_after_drain, pending); + const written_after_flush = this.wrote - initial_wrote; + const to_report = pending - @minimum(written_after_flush, pending); - if ((written_after_drain == initial_wrote and pending == 0) or to_report > 0) { + if ((written_after_flush == initial_wrote and pending == 0) or to_report > 0) { this.signal.ready(to_report, null); } } @@ -1522,11 +1529,17 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { return .{ .result = {} }; } + this.wrote = 0; + this.wrote_at_start_of_flush = 0; + this.flushPromise(); + if (this.buffer.cap == 0) { std.debug.assert(this.pooled_buffer == null); - if (ByteListPool.has()) { - this.pooled_buffer = ByteListPool.get(this.allocator); - this.buffer = this.pooled_buffer.?.data; + if (comptime FeatureFlags.http_buffer_pooling) { + if (ByteListPool.has()) { + this.pooled_buffer = ByteListPool.get(this.allocator); + this.buffer = this.pooled_buffer.?.data; + } } } @@ -1547,6 +1560,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.buffer.update(list); this.done = false; + this.signal.start(); log("start({d})", .{this.highWaterMark}); @@ -1554,27 +1568,61 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { return .{ .result = {} }; } - pub fn drainFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) { - log("drainFromJS()", .{}); + fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) { + if (this.hasBackpressure() or this.done) { + return .{ .result = JSValue.jsNumberFromInt32(0) }; + } - if (!this.hasBackpressure() or this.done) { + const slice = this.readableSlice(); + if (slice.len == 0) { return .{ .result = JSValue.jsNumberFromInt32(0) }; } - if (this.pending_drain) |prom| { + const success = this.send(slice); + if (success) { + this.handleWrote(@truncate(Blob.SizeType, slice.len)); + return .{ .result = JSValue.jsNumber(slice.len) }; + } + + return .{ .result = JSValue.jsNumberFromInt32(0) }; + } + + pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) { + log("flushFromJS({s})", .{wait}); + if (!wait) { + return this.flushFromJSNoWait(); + } + + if (this.pending_flush) |prom| { return .{ .result = prom.asValue(globalThis) }; } - this.pending_drain = JSC.JSPromise.create(globalThis); + if (this.buffer.len == 0 or this.done) { + return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumberFromInt32(0)) }; + } + + if (!this.hasBackpressure()) { + const slice = this.readableSlice(); + assert(slice.len > 0); + const success = this.send(slice); + if (success) { + this.handleWrote(@truncate(Blob.SizeType, slice.len)); + return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(slice.len)) }; + } + + this.res.onWritable(*@This(), onWritable, this); + } + this.wrote_at_start_of_flush = this.wrote; + this.pending_flush = JSC.JSPromise.create(globalThis); this.globalThis = globalThis; - var promise_value = this.pending_drain.?.asValue(globalThis); + var promise_value = this.pending_flush.?.asValue(globalThis); promise_value.protect(); return .{ .result = promise_value }; } - pub fn drain(this: *@This()) JSC.Node.Maybe(void) { - log("drain()", .{}); + pub fn flush(this: *@This()) JSC.Node.Maybe(void) { + log("flush()", .{}); if (!this.hasBackpressure() or this.done) { return .{ .result = {} }; } @@ -1833,14 +1881,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.res.onWritable(*@This(), onWritable, this); } - if (this.pending_drain) |prom| { - this.pending_drain = null; + if (this.pending_flush) |prom| { + this.pending_flush = null; return .{ .result = prom.asValue(globalThis) }; } - this.pending_drain = JSC.JSPromise.create(globalThis); + this.pending_flush = JSC.JSPromise.create(globalThis); this.globalThis = globalThis; - const value = this.pending_drain.?.asValue(globalThis); + const value = this.pending_flush.?.asValue(globalThis); value.protect(); return .{ .result = value }; } @@ -1858,6 +1906,17 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.finalize(); } + pub fn destroy(this: *@This()) void { + log("destroy()", .{}); + var bytes = this.buffer.listManaged(this.allocator); + if (bytes.capacity > 0) { + this.buffer = bun.ByteList.init(""); + bytes.deinit(); + } + + this.allocator.destroy(this); + } + // This can be called _many_ times for the same instance // so it must zero out state instead of make it pub fn finalize(this: *@This()) void { @@ -1868,32 +1927,35 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.res.endStream(false); } + if (comptime !FeatureFlags.http_buffer_pooling) { + assert(this.pooled_buffer == null); + } + if (this.pooled_buffer) |pooled| { this.buffer.len = 0; pooled.data = this.buffer; this.buffer = bun.ByteList.init(""); this.pooled_buffer = null; pooled.release(); - } else if (this.buffer.cap == 0) {} else if (!ByteListPool.full()) { + } else if (this.buffer.cap == 0) {} else if (FeatureFlags.http_buffer_pooling and !ByteListPool.full()) { var entry = ByteListPool.get(this.allocator); entry.data = this.buffer; this.buffer = bun.ByteList.init(""); entry.release(); } else { - var bytes = this.buffer.listManaged(this.allocator); - bytes.deinit(); this.buffer.len = 0; } } pub fn flushPromise(this: *@This()) void { - if (this.pending_drain) |prom| { + if (this.pending_flush) |prom| { log("flushPromise()", .{}); - this.pending_drain = null; + this.pending_flush = null; const globalThis = this.globalThis; prom.asValue(globalThis).unprotect(); - prom.resolve(globalThis, JSC.JSValue.jsNumber(0)); + prom.resolve(globalThis, JSC.JSValue.jsNumber(this.wrote -| this.wrote_at_start_of_flush)); + this.wrote_at_start_of_flush = this.wrote; } } |