aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/webcore')
-rw-r--r--src/bun.js/webcore/streams.zig134
1 files changed, 98 insertions, 36 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 9c5ce2c8e..4a10b1062 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -46,6 +46,7 @@ const uws = @import("uws");
const Blob = JSC.WebCore.Blob;
const Response = JSC.WebCore.Response;
const Request = JSC.WebCore.Request;
+const assert = std.debug.assert;
pub const ReadableStream = struct {
value: JSValue,
@@ -882,21 +883,22 @@ pub const ArrayBufferSink = struct {
return .{ .result = {} };
}
- pub fn drain(_: *ArrayBufferSink) JSC.Node.Maybe(void) {
+ pub fn flush(_: *ArrayBufferSink) JSC.Node.Maybe(void) {
return .{ .result = {} };
}
- pub fn drainFromJS(this: *ArrayBufferSink, globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
+ pub fn flushFromJS(this: *ArrayBufferSink, globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
if (this.streaming) {
const value: JSValue = switch (this.as_uint8array) {
true => JSC.ArrayBuffer.create(globalThis, this.bytes.slice(), .Uint8Array),
false => JSC.ArrayBuffer.create(globalThis, this.bytes.slice(), .ArrayBuffer),
};
this.bytes.len = 0;
+ if (wait) {}
return .{ .result = value };
}
- return .{ .result = JSValue.jsUndefined() };
+ return .{ .result = JSValue.jsNumber(0) };
}
pub fn finalize(this: *ArrayBufferSink) void {
@@ -1108,6 +1110,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), ptr));
this.sink.finalize();
+ this.detach();
}
pub fn detach(this: *ThisSink) void {
@@ -1244,7 +1247,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
return this.sink.end(null).toJS(globalThis);
}
- pub fn drain(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
+ pub fn flush(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
@@ -1262,11 +1265,14 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
}
}
- if (comptime @hasDecl(SinkType, "drainFromJS")) {
- return this.sink.drainFromJS(globalThis).result;
+ if (comptime @hasDecl(SinkType, "flushFromJS")) {
+ const wait = callframe.argumentsCount() > 0 and
+ callframe.argument(0).isBoolean() and
+ callframe.argument(0).asBoolean();
+ return this.sink.flushFromJS(globalThis, wait).result;
}
- return this.sink.drain().toJS(globalThis);
+ return this.sink.flush().toJS(globalThis);
}
pub fn start(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
@@ -1346,7 +1352,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
.@"finalize" = finalize,
.@"write" = write,
.@"close" = close,
- .@"drain" = drain,
+ .@"flush" = flush,
.@"start" = start,
.@"end" = end,
.@"construct" = construct,
@@ -1358,7 +1364,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
@export(finalize, .{ .name = Export[0].symbol_name });
@export(write, .{ .name = Export[1].symbol_name });
@export(close, .{ .name = Export[2].symbol_name });
- @export(drain, .{ .name = Export[3].symbol_name });
+ @export(flush, .{ .name = Export[3].symbol_name });
@export(start, .{ .name = Export[4].symbol_name });
@export(end, .{ .name = Export[5].symbol_name });
@export(construct, .{ .name = Export[6].symbol_name });
@@ -1414,7 +1420,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
allocator: std.mem.Allocator,
done: bool = false,
signal: Signal = .{},
- pending_drain: ?*JSC.JSPromise = null,
+ pending_flush: ?*JSC.JSPromise = null,
+ wrote_at_start_of_flush: Blob.SizeType = 0,
globalThis: *JSGlobalObject = undefined,
highWaterMark: Blob.SizeType = 2048,
@@ -1488,7 +1495,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return true;
}
- // flush the javascript promise from calling .drain()
+ // flush the javascript promise from calling .flush()
this.flushPromise();
if (this.has_callback) {
@@ -1500,14 +1507,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
resume callback;
}
- // pending_drain or callback could have caused another send()
+ // pending_flush or callback could have caused another send()
// so we check again if we should report readiness
if (!this.done and !this.requested_end and !this.hasBackpressure()) {
const pending = @truncate(Blob.SizeType, available) - to_write;
- const written_after_drain = this.wrote - initial_wrote;
- const to_report = pending - @minimum(written_after_drain, pending);
+ const written_after_flush = this.wrote - initial_wrote;
+ const to_report = pending - @minimum(written_after_flush, pending);
- if ((written_after_drain == initial_wrote and pending == 0) or to_report > 0) {
+ if ((written_after_flush == initial_wrote and pending == 0) or to_report > 0) {
this.signal.ready(to_report, null);
}
}
@@ -1522,11 +1529,17 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return .{ .result = {} };
}
+ this.wrote = 0;
+ this.wrote_at_start_of_flush = 0;
+ this.flushPromise();
+
if (this.buffer.cap == 0) {
std.debug.assert(this.pooled_buffer == null);
- if (ByteListPool.has()) {
- this.pooled_buffer = ByteListPool.get(this.allocator);
- this.buffer = this.pooled_buffer.?.data;
+ if (comptime FeatureFlags.http_buffer_pooling) {
+ if (ByteListPool.has()) {
+ this.pooled_buffer = ByteListPool.get(this.allocator);
+ this.buffer = this.pooled_buffer.?.data;
+ }
}
}
@@ -1547,6 +1560,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.buffer.update(list);
this.done = false;
+
this.signal.start();
log("start({d})", .{this.highWaterMark});
@@ -1554,27 +1568,61 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return .{ .result = {} };
}
- pub fn drainFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
- log("drainFromJS()", .{});
+ fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) {
+ if (this.hasBackpressure() or this.done) {
+ return .{ .result = JSValue.jsNumberFromInt32(0) };
+ }
- if (!this.hasBackpressure() or this.done) {
+ const slice = this.readableSlice();
+ if (slice.len == 0) {
return .{ .result = JSValue.jsNumberFromInt32(0) };
}
- if (this.pending_drain) |prom| {
+ const success = this.send(slice);
+ if (success) {
+ this.handleWrote(@truncate(Blob.SizeType, slice.len));
+ return .{ .result = JSValue.jsNumber(slice.len) };
+ }
+
+ return .{ .result = JSValue.jsNumberFromInt32(0) };
+ }
+
+ pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
+ log("flushFromJS({s})", .{wait});
+ if (!wait) {
+ return this.flushFromJSNoWait();
+ }
+
+ if (this.pending_flush) |prom| {
return .{ .result = prom.asValue(globalThis) };
}
- this.pending_drain = JSC.JSPromise.create(globalThis);
+ if (this.buffer.len == 0 or this.done) {
+ return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumberFromInt32(0)) };
+ }
+
+ if (!this.hasBackpressure()) {
+ const slice = this.readableSlice();
+ assert(slice.len > 0);
+ const success = this.send(slice);
+ if (success) {
+ this.handleWrote(@truncate(Blob.SizeType, slice.len));
+ return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(slice.len)) };
+ }
+
+ this.res.onWritable(*@This(), onWritable, this);
+ }
+ this.wrote_at_start_of_flush = this.wrote;
+ this.pending_flush = JSC.JSPromise.create(globalThis);
this.globalThis = globalThis;
- var promise_value = this.pending_drain.?.asValue(globalThis);
+ var promise_value = this.pending_flush.?.asValue(globalThis);
promise_value.protect();
return .{ .result = promise_value };
}
- pub fn drain(this: *@This()) JSC.Node.Maybe(void) {
- log("drain()", .{});
+ pub fn flush(this: *@This()) JSC.Node.Maybe(void) {
+ log("flush()", .{});
if (!this.hasBackpressure() or this.done) {
return .{ .result = {} };
}
@@ -1833,14 +1881,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.onWritable(*@This(), onWritable, this);
}
- if (this.pending_drain) |prom| {
- this.pending_drain = null;
+ if (this.pending_flush) |prom| {
+ this.pending_flush = null;
return .{ .result = prom.asValue(globalThis) };
}
- this.pending_drain = JSC.JSPromise.create(globalThis);
+ this.pending_flush = JSC.JSPromise.create(globalThis);
this.globalThis = globalThis;
- const value = this.pending_drain.?.asValue(globalThis);
+ const value = this.pending_flush.?.asValue(globalThis);
value.protect();
return .{ .result = value };
}
@@ -1858,6 +1906,17 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.finalize();
}
+ pub fn destroy(this: *@This()) void {
+ log("destroy()", .{});
+ var bytes = this.buffer.listManaged(this.allocator);
+ if (bytes.capacity > 0) {
+ this.buffer = bun.ByteList.init("");
+ bytes.deinit();
+ }
+
+ this.allocator.destroy(this);
+ }
+
// This can be called _many_ times for the same instance
// so it must zero out state instead of make it
pub fn finalize(this: *@This()) void {
@@ -1868,32 +1927,35 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.endStream(false);
}
+ if (comptime !FeatureFlags.http_buffer_pooling) {
+ assert(this.pooled_buffer == null);
+ }
+
if (this.pooled_buffer) |pooled| {
this.buffer.len = 0;
pooled.data = this.buffer;
this.buffer = bun.ByteList.init("");
this.pooled_buffer = null;
pooled.release();
- } else if (this.buffer.cap == 0) {} else if (!ByteListPool.full()) {
+ } else if (this.buffer.cap == 0) {} else if (FeatureFlags.http_buffer_pooling and !ByteListPool.full()) {
var entry = ByteListPool.get(this.allocator);
entry.data = this.buffer;
this.buffer = bun.ByteList.init("");
entry.release();
} else {
- var bytes = this.buffer.listManaged(this.allocator);
- bytes.deinit();
this.buffer.len = 0;
}
}
pub fn flushPromise(this: *@This()) void {
- if (this.pending_drain) |prom| {
+ if (this.pending_flush) |prom| {
log("flushPromise()", .{});
- this.pending_drain = null;
+ this.pending_flush = null;
const globalThis = this.globalThis;
prom.asValue(globalThis).unprotect();
- prom.resolve(globalThis, JSC.JSValue.jsNumber(0));
+ prom.resolve(globalThis, JSC.JSValue.jsNumber(this.wrote -| this.wrote_at_start_of_flush));
+ this.wrote_at_start_of_flush = this.wrote;
}
}