diff options
author | 2022-09-30 19:43:03 -0700 | |
---|---|---|
committer | 2022-09-30 19:43:03 -0700 | |
commit | e4e7966d64cb615cc596711213b72e531cd655fc (patch) | |
tree | 84049543b430564927f5a45195f81a11829d8ef3 | |
parent | 48cb526e0bf2b6ffc9884dc7c54fb03077d5f849 (diff) | |
download | bun-e4e7966d64cb615cc596711213b72e531cd655fc.tar.gz bun-e4e7966d64cb615cc596711213b72e531cd655fc.tar.zst bun-e4e7966d64cb615cc596711213b72e531cd655fc.zip |
Fix failing tests from backpressure
-rw-r--r-- | src/bun.js/webcore/streams.zig | 223 |
1 files changed, 91 insertions, 132 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index a71f8b9f7..e39d14c93 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1917,12 +1917,18 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { fn send(this: *@This(), buf: []const u8) bool { std.debug.assert(!this.done); defer log("send: {d} bytes (backpressure: {d})", .{ buf.len, this.has_backpressure }); + if (this.requested_end and !this.res.state().isHttpWriteCalled()) { const success = this.res.tryEnd(buf, this.end_len, false); this.has_backpressure = !success; return success; } + // uWebSockets lacks a tryWrite() function + // This means that backpressure will be handled by appending to an "infinite" memory buffer + // It will do the backpressure handling for us + // so in this scenario, we just append to the buffer + // and report success if (this.requested_end) { this.res.end(buf, false); this.has_backpressure = false; @@ -1930,7 +1936,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } else { const backpressure = this.res.write(buf); this.has_backpressure = backpressure; - return true; } @@ -2119,47 +2124,40 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { const len = @truncate(Blob.SizeType, bytes.len); log("write({d})", .{bytes.len}); - if (!this.hasBackpressure()) { - if (this.buffer.len == 0 and len >= this.highWaterMark) { - // fast path: - // - large-ish chunk - // - no backpressure - if (this.send(bytes)) { - this.handleWrote(len); - return .{ .owned = len }; - } - - _ = this.buffer.write(this.allocator, bytes) catch { - return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; - }; - } else if (this.buffer.len + len >= this.highWaterMark) { - // TODO: attempt to write both in a corked buffer? - _ = this.buffer.write(this.allocator, bytes) catch { - return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; - }; - const slice = this.readableSlice(); - if (this.send(slice)) { - this.handleWrote(slice.len); - this.buffer.len = 0; - return .{ .owned = len }; - } - } else { - // queue the data - // do not send it - _ = this.buffer.write(this.allocator, bytes) catch { - return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; - }; + if (this.buffer.len == 0 and len >= this.highWaterMark) { + // fast path: + // - large-ish chunk + // - no backpressure + if (this.send(bytes)) { + this.handleWrote(len); return .{ .owned = len }; } - this.res.onWritable(*@This(), onWritable, this); + _ = this.buffer.write(this.allocator, bytes) catch { + return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; + }; + } else if (this.buffer.len + len >= this.highWaterMark) { + // TODO: attempt to write both in a corked buffer? + _ = this.buffer.write(this.allocator, bytes) catch { + return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; + }; + const slice = this.readableSlice(); + if (this.send(slice)) { + this.handleWrote(slice.len); + this.buffer.len = 0; + return .{ .owned = len }; + } } else { - log("has backpressure", .{}); + // queue the data + // do not send it _ = this.buffer.write(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + return .{ .owned = len }; } + this.res.onWritable(*@This(), onWritable, this); + return .{ .owned = len }; } pub const writeBytes = write; @@ -2178,57 +2176,51 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { const len = @truncate(Blob.SizeType, bytes.len); log("writeLatin1({d})", .{bytes.len}); - if (!this.hasBackpressure()) { - if (this.buffer.len == 0 and len >= this.highWaterMark) { - var do_send = true; - // common case - if (strings.isAllASCII(bytes)) { - // fast path: - // - large-ish chunk - // - no backpressure - if (this.send(bytes)) { - this.handleWrote(bytes.len); - return .{ .owned = len }; - } - do_send = false; + if (this.buffer.len == 0 and len >= this.highWaterMark) { + var do_send = true; + // common case + if (strings.isAllASCII(bytes)) { + // fast path: + // - large-ish chunk + // - no backpressure + if (this.send(bytes)) { + this.handleWrote(bytes.len); + return .{ .owned = len }; } + do_send = false; + } - _ = this.buffer.writeLatin1(this.allocator, bytes) catch { - return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; - }; + _ = this.buffer.writeLatin1(this.allocator, bytes) catch { + return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; + }; - if (do_send) { - if (this.send(this.readableSlice())) { - this.handleWrote(bytes.len); - return .{ .owned = len }; - } - } - } else if (this.buffer.len + len >= this.highWaterMark) { - // kinda fast path: - // - combined chunk is large enough to flush automatically - // - no backpressure - _ = this.buffer.writeLatin1(this.allocator, bytes) catch { - return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; - }; - const readable = this.readableSlice(); - if (this.send(readable)) { - this.handleWrote(readable.len); + if (do_send) { + if (this.send(this.readableSlice())) { + this.handleWrote(bytes.len); return .{ .owned = len }; } - } else { - _ = this.buffer.writeLatin1(this.allocator, bytes) catch { - return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; - }; + } + } else if (this.buffer.len + len >= this.highWaterMark) { + // kinda fast path: + // - combined chunk is large enough to flush automatically + // - no backpressure + _ = this.buffer.writeLatin1(this.allocator, bytes) catch { + return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; + }; + const readable = this.readableSlice(); + if (this.send(readable)) { + this.handleWrote(readable.len); return .{ .owned = len }; } - - this.res.onWritable(*@This(), onWritable, this); } else { _ = this.buffer.writeLatin1(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + return .{ .owned = len }; } + this.res.onWritable(*@This(), onWritable, this); + return .{ .owned = len }; } pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable { @@ -2246,31 +2238,24 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { log("writeUTF16({d})", .{bytes.len}); - var written: usize = undefined; - if (!this.hasBackpressure()) { - // we must always buffer UTF-16 - // we assume the case of all-ascii UTF-16 string is pretty uncommon - written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch { - return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; - }; - - const readable = this.readableSlice(); + // we must always buffer UTF-16 + // we assume the case of all-ascii UTF-16 string is pretty uncommon + const written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch { + return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; + }; - if (readable.len >= this.highWaterMark) { - if (this.send(readable)) { - this.handleWrote(readable.len); - return .{ .owned = @truncate(Blob.SizeType, written) }; - } + const readable = this.readableSlice(); - this.res.onWritable(*@This(), onWritable, this); + if (readable.len >= this.highWaterMark or this.hasBackpressure()) { + if (this.send(readable)) { + this.handleWrote(readable.len); + return .{ .owned = @intCast(Blob.SizeType, written) }; } - } else { - written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch { - return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; - }; + + this.res.onWritable(*@This(), onWritable, this); } - return .{ .owned = @truncate(Blob.SizeType, written) }; + return .{ .owned = @intCast(Blob.SizeType, written) }; } // In this case, it's always an error @@ -2301,19 +2286,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { return .{ .result = {} }; } - if (!this.hasBackpressure()) { - if (this.send(readable)) { - this.handleWrote(readable.len); - this.signal.close(err); - this.done = true; - this.res.endStream(false); - this.finalize(); - return .{ .result = {} }; - } - - this.res.onWritable(*@This(), onWritable, this); - } - return .{ .result = {} }; } @@ -2335,38 +2307,25 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { const readable = this.readableSlice(); this.end_len = readable.len; - if (readable.len == 0) { - this.done = true; - this.res.endStream(false); - this.signal.close(null); - const wrote = this.wrote; - this.finalize(); - return .{ .result = JSC.JSValue.jsNumber(wrote) }; - } - - if (!this.hasBackpressure()) { - if (this.send(readable)) { - this.handleWrote(readable.len); - this.signal.close(null); - this.done = true; - const wrote = this.wrote; - this.finalize(); - return .{ .result = JSC.JSValue.jsNumber(wrote) }; + if (readable.len > 0) { + if (!this.send(readable)) { + this.pending_flush = JSC.JSPromise.create(globalThis); + this.globalThis = globalThis; + const value = this.pending_flush.?.asValue(globalThis); + value.protect(); + return .{ .result = value }; } - - this.res.onWritable(*@This(), onWritable, this); + } else { + this.res.end("", false); } - if (this.pending_flush) |prom| { - this.pending_flush = null; - return .{ .result = prom.asValue(globalThis) }; - } + this.done = true; + this.flushPromise(); + this.signal.close(null); + this.done = true; + this.finalize(); - this.pending_flush = JSC.JSPromise.create(globalThis); - this.globalThis = globalThis; - const value = this.pending_flush.?.asValue(globalThis); - value.protect(); - return .{ .result = value }; + return .{ .result = JSC.JSValue.jsNumber(this.wrote) }; } pub fn sink(this: *@This()) Sink { |