aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-09-30 19:43:03 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-09-30 19:43:03 -0700
commite4e7966d64cb615cc596711213b72e531cd655fc (patch)
tree84049543b430564927f5a45195f81a11829d8ef3 /src
parent48cb526e0bf2b6ffc9884dc7c54fb03077d5f849 (diff)
downloadbun-e4e7966d64cb615cc596711213b72e531cd655fc.tar.gz
bun-e4e7966d64cb615cc596711213b72e531cd655fc.tar.zst
bun-e4e7966d64cb615cc596711213b72e531cd655fc.zip
Fix failing tests from backpressure
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/webcore/streams.zig223
1 files changed, 91 insertions, 132 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index a71f8b9f7..e39d14c93 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1917,12 +1917,18 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
fn send(this: *@This(), buf: []const u8) bool {
std.debug.assert(!this.done);
defer log("send: {d} bytes (backpressure: {d})", .{ buf.len, this.has_backpressure });
+
if (this.requested_end and !this.res.state().isHttpWriteCalled()) {
const success = this.res.tryEnd(buf, this.end_len, false);
this.has_backpressure = !success;
return success;
}
+ // uWebSockets lacks a tryWrite() function
+ // This means that backpressure will be handled by appending to an "infinite" memory buffer
+ // It will do the backpressure handling for us
+ // so in this scenario, we just append to the buffer
+ // and report success
if (this.requested_end) {
this.res.end(buf, false);
this.has_backpressure = false;
@@ -1930,7 +1936,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
} else {
const backpressure = this.res.write(buf);
this.has_backpressure = backpressure;
-
return true;
}
@@ -2119,47 +2124,40 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
const len = @truncate(Blob.SizeType, bytes.len);
log("write({d})", .{bytes.len});
- if (!this.hasBackpressure()) {
- if (this.buffer.len == 0 and len >= this.highWaterMark) {
- // fast path:
- // - large-ish chunk
- // - no backpressure
- if (this.send(bytes)) {
- this.handleWrote(len);
- return .{ .owned = len };
- }
-
- _ = this.buffer.write(this.allocator, bytes) catch {
- return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
- };
- } else if (this.buffer.len + len >= this.highWaterMark) {
- // TODO: attempt to write both in a corked buffer?
- _ = this.buffer.write(this.allocator, bytes) catch {
- return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
- };
- const slice = this.readableSlice();
- if (this.send(slice)) {
- this.handleWrote(slice.len);
- this.buffer.len = 0;
- return .{ .owned = len };
- }
- } else {
- // queue the data
- // do not send it
- _ = this.buffer.write(this.allocator, bytes) catch {
- return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
- };
+ if (this.buffer.len == 0 and len >= this.highWaterMark) {
+ // fast path:
+ // - large-ish chunk
+ // - no backpressure
+ if (this.send(bytes)) {
+ this.handleWrote(len);
return .{ .owned = len };
}
- this.res.onWritable(*@This(), onWritable, this);
+ _ = this.buffer.write(this.allocator, bytes) catch {
+ return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
+ };
+ } else if (this.buffer.len + len >= this.highWaterMark) {
+ // TODO: attempt to write both in a corked buffer?
+ _ = this.buffer.write(this.allocator, bytes) catch {
+ return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
+ };
+ const slice = this.readableSlice();
+ if (this.send(slice)) {
+ this.handleWrote(slice.len);
+ this.buffer.len = 0;
+ return .{ .owned = len };
+ }
} else {
- log("has backpressure", .{});
+ // queue the data
+ // do not send it
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ return .{ .owned = len };
}
+ this.res.onWritable(*@This(), onWritable, this);
+
return .{ .owned = len };
}
pub const writeBytes = write;
@@ -2178,57 +2176,51 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
const len = @truncate(Blob.SizeType, bytes.len);
log("writeLatin1({d})", .{bytes.len});
- if (!this.hasBackpressure()) {
- if (this.buffer.len == 0 and len >= this.highWaterMark) {
- var do_send = true;
- // common case
- if (strings.isAllASCII(bytes)) {
- // fast path:
- // - large-ish chunk
- // - no backpressure
- if (this.send(bytes)) {
- this.handleWrote(bytes.len);
- return .{ .owned = len };
- }
- do_send = false;
+ if (this.buffer.len == 0 and len >= this.highWaterMark) {
+ var do_send = true;
+ // common case
+ if (strings.isAllASCII(bytes)) {
+ // fast path:
+ // - large-ish chunk
+ // - no backpressure
+ if (this.send(bytes)) {
+ this.handleWrote(bytes.len);
+ return .{ .owned = len };
}
+ do_send = false;
+ }
- _ = this.buffer.writeLatin1(this.allocator, bytes) catch {
- return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
- };
+ _ = this.buffer.writeLatin1(this.allocator, bytes) catch {
+ return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
+ };
- if (do_send) {
- if (this.send(this.readableSlice())) {
- this.handleWrote(bytes.len);
- return .{ .owned = len };
- }
- }
- } else if (this.buffer.len + len >= this.highWaterMark) {
- // kinda fast path:
- // - combined chunk is large enough to flush automatically
- // - no backpressure
- _ = this.buffer.writeLatin1(this.allocator, bytes) catch {
- return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
- };
- const readable = this.readableSlice();
- if (this.send(readable)) {
- this.handleWrote(readable.len);
+ if (do_send) {
+ if (this.send(this.readableSlice())) {
+ this.handleWrote(bytes.len);
return .{ .owned = len };
}
- } else {
- _ = this.buffer.writeLatin1(this.allocator, bytes) catch {
- return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
- };
+ }
+ } else if (this.buffer.len + len >= this.highWaterMark) {
+ // kinda fast path:
+ // - combined chunk is large enough to flush automatically
+ // - no backpressure
+ _ = this.buffer.writeLatin1(this.allocator, bytes) catch {
+ return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
+ };
+ const readable = this.readableSlice();
+ if (this.send(readable)) {
+ this.handleWrote(readable.len);
return .{ .owned = len };
}
-
- this.res.onWritable(*@This(), onWritable, this);
} else {
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ return .{ .owned = len };
}
+ this.res.onWritable(*@This(), onWritable, this);
+
return .{ .owned = len };
}
pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable {
@@ -2246,31 +2238,24 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
log("writeUTF16({d})", .{bytes.len});
- var written: usize = undefined;
- if (!this.hasBackpressure()) {
- // we must always buffer UTF-16
- // we assume the case of all-ascii UTF-16 string is pretty uncommon
- written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch {
- return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
- };
-
- const readable = this.readableSlice();
+ // we must always buffer UTF-16
+ // we assume the case of all-ascii UTF-16 string is pretty uncommon
+ const written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch {
+ return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
+ };
- if (readable.len >= this.highWaterMark) {
- if (this.send(readable)) {
- this.handleWrote(readable.len);
- return .{ .owned = @truncate(Blob.SizeType, written) };
- }
+ const readable = this.readableSlice();
- this.res.onWritable(*@This(), onWritable, this);
+ if (readable.len >= this.highWaterMark or this.hasBackpressure()) {
+ if (this.send(readable)) {
+ this.handleWrote(readable.len);
+ return .{ .owned = @intCast(Blob.SizeType, written) };
}
- } else {
- written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch {
- return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
- };
+
+ this.res.onWritable(*@This(), onWritable, this);
}
- return .{ .owned = @truncate(Blob.SizeType, written) };
+ return .{ .owned = @intCast(Blob.SizeType, written) };
}
// In this case, it's always an error
@@ -2301,19 +2286,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return .{ .result = {} };
}
- if (!this.hasBackpressure()) {
- if (this.send(readable)) {
- this.handleWrote(readable.len);
- this.signal.close(err);
- this.done = true;
- this.res.endStream(false);
- this.finalize();
- return .{ .result = {} };
- }
-
- this.res.onWritable(*@This(), onWritable, this);
- }
-
return .{ .result = {} };
}
@@ -2335,38 +2307,25 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
const readable = this.readableSlice();
this.end_len = readable.len;
- if (readable.len == 0) {
- this.done = true;
- this.res.endStream(false);
- this.signal.close(null);
- const wrote = this.wrote;
- this.finalize();
- return .{ .result = JSC.JSValue.jsNumber(wrote) };
- }
-
- if (!this.hasBackpressure()) {
- if (this.send(readable)) {
- this.handleWrote(readable.len);
- this.signal.close(null);
- this.done = true;
- const wrote = this.wrote;
- this.finalize();
- return .{ .result = JSC.JSValue.jsNumber(wrote) };
+ if (readable.len > 0) {
+ if (!this.send(readable)) {
+ this.pending_flush = JSC.JSPromise.create(globalThis);
+ this.globalThis = globalThis;
+ const value = this.pending_flush.?.asValue(globalThis);
+ value.protect();
+ return .{ .result = value };
}
-
- this.res.onWritable(*@This(), onWritable, this);
+ } else {
+ this.res.end("", false);
}
- if (this.pending_flush) |prom| {
- this.pending_flush = null;
- return .{ .result = prom.asValue(globalThis) };
- }
+ this.done = true;
+ this.flushPromise();
+ this.signal.close(null);
+ this.done = true;
+ this.finalize();
- this.pending_flush = JSC.JSPromise.create(globalThis);
- this.globalThis = globalThis;
- const value = this.pending_flush.?.asValue(globalThis);
- value.protect();
- return .{ .result = value };
+ return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
}
pub fn sink(this: *@This()) Sink {