aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore/streams.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r--src/bun.js/webcore/streams.zig87
1 files changed, 68 insertions, 19 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 603694f60..1bbd6cdba 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -57,16 +57,19 @@ pub const ReadableStream = struct {
pub fn cancel(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding();
+ this.value.unprotect();
ReadableStream__cancel(this.value, globalThis);
}
pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding();
+ this.value.unprotect();
ReadableStream__abort(this.value, globalThis);
}
pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding();
+ this.value.unprotect();
ReadableStream__detach(this.value, globalThis);
}
@@ -1115,7 +1118,9 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
if (this.sink.signal.isDead())
return;
this.sink.signal.clear();
- detachPtr(@intToEnum(JSValue, @bitCast(JSC.JSValueReprInt, @ptrToInt(ptr))));
+ const value = @intToEnum(JSValue, @bitCast(JSC.JSValueReprInt, @ptrToInt(ptr)));
+ value.unprotect();
+ detachPtr(value);
}
pub fn detachPtr(ptr: JSValue) callconv(.C) void {
@@ -1251,6 +1256,12 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
}
}
+ defer {
+ if (comptime @hasField(SinkType, "done") and this.sink.done) {
+ callframe.this().unprotect();
+ }
+ }
+
if (comptime @hasDecl(SinkType, "drainFromJS")) {
return this.sink.drainFromJS(globalThis).result;
}
@@ -1303,6 +1314,12 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
}
}
+ defer {
+ if (comptime @hasField(SinkType, "done") and this.sink.done) {
+ callframe.this().unprotect();
+ }
+ }
+
return this.sink.endFromJS(globalThis).toJS(globalThis);
}
@@ -1368,17 +1385,25 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
// globalThis.vm().throwError(globalThis, err);
// return JSC.JSValue.jsUndefined();
// }));
-
-// this.socket.connect()
// }
// };
// }
+// TODO: make this JSGlobalObject local
+// for better security
+const ByteListPool = ObjectPool(
+ bun.ByteList,
+ null,
+ true,
+ 8,
+);
+
pub fn HTTPServerWritable(comptime ssl: bool) type {
return struct {
const UWSResponse = uws.NewApp(ssl).Response;
res: *UWSResponse,
buffer: bun.ByteList,
+ pooled_buffer: ?*ByteListPool.Node = null,
offset: Blob.SizeType = 0,
is_listening_for_abort: bool = false,
@@ -1439,6 +1464,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done) {
this.res.endStream(false);
+ this.finalize();
return false;
}
@@ -1463,13 +1489,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
// flush the javascript promise from calling .drain()
- if (this.pending_drain) |prom| {
- this.pending_drain = null;
- log("flush promise ({d})", .{readable.len});
-
- prom.asValue(this.globalThis).unprotect();
- prom.resolve(this.globalThis, JSC.jsNumber(readable.len));
- }
+ this.flushPromise();
if (this.has_callback) {
this.has_callback = false;
@@ -1502,6 +1522,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return .{ .result = {} };
}
+ if (this.buffer.cap == 0) {
+ std.debug.assert(this.pooled_buffer == null);
+ if (ByteListPool.has()) {
+ this.pooled_buffer = ByteListPool.get(this.allocator);
+ this.buffer = this.pooled_buffer.?.data;
+ }
+ }
+
this.buffer.len = 0;
switch (stream_start) {
@@ -1731,6 +1759,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res.hasResponded()) {
this.signal.close(err);
this.done = true;
+ this.finalize();
return .{ .result = {} };
}
@@ -1739,8 +1768,10 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.end_len = readable.len;
if (readable.len == 0) {
+ this.signal.close(err);
this.done = true;
this.res.endStream(false);
+ this.finalize();
return .{ .result = {} };
}
@@ -1766,6 +1797,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res.hasResponded()) {
this.signal.close(null);
this.done = true;
+ this.finalize();
return .{ .result = JSC.JSValue.jsNumber(0) };
}
@@ -1777,8 +1809,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.done = true;
this.res.endStream(false);
this.signal.close(null);
- this.done = true;
- return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
+ const wrote = this.wrote;
+ this.finalize();
+ return .{ .result = JSC.JSValue.jsNumber(wrote) };
}
if (!this.hasBackpressure()) {
@@ -1786,7 +1819,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.handleWrote(readable.len);
this.signal.close(null);
this.done = true;
- return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
+ const wrote = this.wrote;
+ this.finalize();
+ return .{ .result = JSC.JSValue.jsNumber(wrote) };
}
this.res.onWritable(*@This(), onWritable, this);
@@ -1814,6 +1849,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.done = true;
this.aborted = true;
this.flushPromise();
+ this.finalize();
}
pub fn finalize(this: *@This()) void {
@@ -1824,10 +1860,22 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.endStream(false);
}
- var bytes = this.buffer.listManaged(this.allocator);
- bytes.deinit();
- this.buffer.len = 0;
- this.flushPromise();
+ if (this.pooled_buffer) |pooled| {
+ this.buffer.len = 0;
+ pooled.data = this.buffer;
+ this.buffer = bun.ByteList.init("");
+ this.pooled_buffer = null;
+ pooled.release();
+ } else if (!ByteListPool.full()) {
+ var entry = ByteListPool.get(this.allocator);
+ entry.data = this.buffer;
+ this.buffer = bun.ByteList.init("");
+ entry.release();
+ } else {
+ var bytes = this.buffer.listManaged(this.allocator);
+ bytes.deinit();
+ this.buffer.len = 0;
+ }
}
pub fn flushPromise(this: *@This()) void {
@@ -1835,8 +1883,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
log("flushPromise()", .{});
this.pending_drain = null;
- prom.asValue(this.globalThis).unprotect();
- prom.resolve(this.globalThis, JSC.JSValue.jsNumber(0));
+ const globalThis = this.globalThis;
+ prom.asValue(globalThis).unprotect();
+ prom.resolve(globalThis, JSC.JSValue.jsNumber(0));
}
}