aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-02-01 18:51:25 -0800
committerGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-02-01 18:51:25 -0800
commit170e58a99db61536c345ce2dd41693aea57cd359 (patch)
tree3f82ba8294c445c9e3ab83c2f07a1ae045e1592e
parentf3c4bfcbaeb8a0ea08ceed7c46a7c27150de71c7 (diff)
downloadbun-170e58a99db61536c345ce2dd41693aea57cd359.tar.gz
bun-170e58a99db61536c345ce2dd41693aea57cd359.tar.zst
bun-170e58a99db61536c345ce2dd41693aea57cd359.zip
Fix biggest issue with HTTPS client!
-rw-r--r--src/http/async_bio.zig131
1 files changed, 70 insertions, 61 deletions
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig
index 6c14c8ed8..7ac9bc92d 100644
--- a/src/http/async_bio.zig
+++ b/src/http/async_bio.zig
@@ -15,20 +15,27 @@ const fail = -3;
const connection_closed = -2;
const pending = -1;
const OK = 0;
+const ObjectPool = @import("../pool.zig").ObjectPool;
+
+const Packet = struct {
+ completion: Completion,
+ min: u32 = 0,
+ owned_slice: []u8 = &[_]u8{},
+
+ pub const Pool = ObjectPool(Packet, null, false);
+};
bio: *boring.BIO = undefined,
socket_fd: std.os.socket_t = 0,
allocator: std.mem.Allocator,
-read_wait: Wait = Wait.pending,
-send_wait: Wait = Wait.pending,
+pending_reads: u32 = 0,
+pending_sends: u32 = 0,
recv_buffer: ?*BufferPool.Node = null,
-recv_completion: Completion = undefined,
send_buffer: ?*BufferPool.Node = null,
-send_completion: Completion = undefined,
write_error: c_int = 0,
socket_recv_len: c_int = 0,
@@ -89,8 +96,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO {
if (head) |head_| {
var next = head_.next;
var ret = head_;
- ret.read_wait = .pending;
- ret.send_wait = .pending;
head = next;
return ret;
@@ -99,8 +104,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO {
var bio = allocator.create(AsyncBIO) catch unreachable;
bio.* = AsyncBIO{
.allocator = allocator,
- .read_wait = .pending,
- .send_wait = .pending,
};
return bio;
@@ -111,8 +114,6 @@ pub fn release(this: *AsyncBIO) void {
this.next = head_;
}
- this.read_wait = .pending;
- this.send_wait = .pending;
this.socket_send_len = 0;
this.socket_recv_len = 0;
this.bio_write_offset = 0;
@@ -154,7 +155,10 @@ const WaitResult = enum {
send,
};
-pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void {
+pub fn doSocketRead(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.RecvError!usize) void {
+ var ctx = @fieldParentPtr(Packet.Pool.Node, "data", @fieldParentPtr(Packet, "completion", completion));
+ ctx.release();
+
const socket_recv_len = @intCast(
c_int,
result_ catch |err| {
@@ -169,10 +173,10 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!
Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
Output.flush();
}
+
if (socket_recv_len == 0) {
this.socket_recv_eof = true;
}
- this.read_wait = .pending;
// if (socket_recv_len == 0) {
@@ -184,26 +188,28 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!
// this.scheduleSocketRead();
}
-pub fn doSocketWrite(this: *AsyncBIO, _: *Completion, result_: AsyncIO.SendError!usize) void {
+pub fn doSocketWrite(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.SendError!usize) void {
+ var ctx = @fieldParentPtr(Packet.Pool.Node, "data", @fieldParentPtr(Packet, "completion", completion));
+ defer ctx.release();
+
const socket_send_len = @truncate(
u32,
result_ catch |err| {
this.socket_send_error = err;
- this.send_wait = .pending;
this.onSocketWriteComplete();
return;
},
);
this.socket_send_len += socket_send_len;
- if (socket_send_len == 0 or this.writeBuf().len == 0) {
- this.send_wait = .pending;
+ const remain = ctx.data.min - @minimum(ctx.data.min, socket_send_len);
+
+ if (socket_send_len == 0 or remain == 0) {
this.onSocketWriteComplete();
return;
}
- this.send_wait = .pending;
- this.scheduleSocketWrite();
+ this.scheduleSocketWrite(completion.operation.slice()[remain..]);
}
fn onSocketReadComplete(this: *AsyncBIO) void {
@@ -213,49 +219,39 @@ fn onSocketReadComplete(this: *AsyncBIO) void {
}
inline fn readBuf(this: *AsyncBIO) []u8 {
- return this.recv_buffer.?.data[this.bio_read_offset..];
-}
-
-inline fn writeBuf(this: *AsyncBIO) []const u8 {
- return this.send_buffer.?.data[this.socket_send_len..this.bio_write_offset];
+ return this.recv_buffer.?.data[@intCast(u32, this.socket_recv_len)..];
}
pub fn hasPendingReadData(this: *AsyncBIO) bool {
- return this.socket_recv_len > 0;
+ return this.socket_recv_len - @intCast(c_int, this.bio_read_offset) > 0;
}
-pub fn scheduleSocketRead(this: *AsyncBIO) void {
- this.read_wait = .suspended;
+pub fn scheduleSocketRead(this: *AsyncBIO, min: u32) void {
if (this.recv_buffer == null) {
this.recv_buffer = BufferPool.get(getAllocator());
}
- AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf());
-}
+ var packet = Packet.Pool.get(getAllocator());
+ packet.data.min = @truncate(u32, min);
-pub fn scheduleSocketWrite(this: *AsyncBIO) void {
- this.send_wait = .suspended;
+ AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &packet.data.completion, this.socket_fd, this.readBuf());
+}
- AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &this.send_completion, this.socket_fd, this.writeBuf(), SOCKET_FLAGS);
+pub fn scheduleSocketWrite(this: *AsyncBIO, buf: []const u8) void {
+ var packet = Packet.Pool.get(getAllocator());
+ packet.data.min = @truncate(u32, buf.len);
+ AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &packet.data.completion, this.socket_fd, buf, SOCKET_FLAGS);
}
fn handleSocketReadComplete(
this: *AsyncBIO,
) void {
- this.read_wait = .pending;
-
- if (this.socket_recv_len <= 0) {
- if (this.recv_buffer) |buf| {
- buf.release();
- this.recv_buffer = null;
- }
- }
+ this.pending_reads -|= 1;
}
pub fn onSocketWriteComplete(
this: *AsyncBIO,
) void {
- assert(this.send_wait == .pending);
this.handleSocketWriteComplete();
this.nextFrame();
}
@@ -263,21 +259,23 @@ pub fn onSocketWriteComplete(
pub fn handleSocketWriteComplete(
this: *AsyncBIO,
) void {
- this.send_wait = .completed;
+ this.pending_sends -|= 1;
if (extremely_verbose) {
Output.prettyErrorln("onWrite: {d}", .{this.socket_send_len});
Output.flush();
}
- // if (this.send_buffer) |buf| {
- // buf.release();
- // this.send_buffer = null;
+ if (this.pending_sends == 0) {
+ if (this.send_buffer) |buf| {
+ buf.release();
+ this.send_buffer = null;
- // // this might be incorrect!
- // this.bio_write_offset = 0;
- // this.socket_send_len = 0;
- // }
+ // this might be incorrect!
+ this.bio_write_offset = 0;
+ this.socket_send_len = 0;
+ }
+ }
}
pub const Bio = struct {
@@ -320,6 +318,9 @@ pub const Bio = struct {
return -1;
}
+ const remaining_in_send_buffer = buffer_pool_len - this.bio_write_offset;
+ const total_remaining = remaining_in_send_buffer - @minimum(remaining_in_send_buffer, len);
+
if (this.send_buffer == null) {
this.send_buffer = BufferPool.get(getAllocator());
}
@@ -335,7 +336,8 @@ pub const Bio = struct {
@memcpy(data.ptr, ptr, to_copy);
this.bio_write_offset += to_copy;
- this.scheduleSocketWrite();
+ this.scheduleSocketWrite(data[0..to_copy]);
+ this.pending_sends += 1;
return @intCast(c_int, to_copy);
}
@@ -382,17 +384,15 @@ pub const Bio = struct {
// overreading, but issuing one is more efficient. SSL sockets are not
// reused after shutdown for non-SSL traffic, so overreading is fine.
assert(bio_read_offset == 0);
- this.scheduleSocketRead();
+ if (this.pending_reads == 0) {
+ this.scheduleSocketRead(len__);
+ this.pending_reads += 1;
+ }
boring.BIO_set_retry_read(this_bio);
return pending;
}
- if (socket_recv_len == pending) {
- boring.BIO_set_retry_read(this_bio);
- return -1;
- }
-
// If the last Read() failed, report the error.
if (socket_recv_len < 0) {
if (extremely_verbose) Output.prettyErrorln("Unexpected ssl error: {d}", .{socket_recv_len});
@@ -402,16 +402,25 @@ pub const Bio = struct {
const socket_recv_len_ = @intCast(u32, socket_recv_len);
// Report the result of the last Read() if non-empty.
- if (!(bio_read_offset < socket_recv_len_)) {
- return 0;
- }
const len = @minimum(len__, socket_recv_len_ - bio_read_offset);
- var data = @ptrCast([*]const u8, this.recv_buffer.?.data[bio_read_offset..].ptr);
- @memcpy(ptr, data, len);
+ var bytes = this.recv_buffer.?.data[bio_read_offset..socket_recv_len_];
+
+ if (len__ > @truncate(u32, bytes.len)) {
+ if (this.pending_reads == 0) {
+ this.pending_reads += 1;
+ this.scheduleSocketRead(len);
+ }
+
+ boring.BIO_set_retry_read(this_bio);
+ return -1;
+ }
+ @memcpy(ptr, bytes.ptr, len);
bio_read_offset += len;
- if (bio_read_offset == socket_recv_len_) {
+ if (bio_read_offset == socket_recv_len_ and this.pending_reads == 0) {
// The read buffer is empty.
+ // we can reset the pointer back to the beginning of the buffer
+ // if there is more data to read, we will ask for another
bio_read_offset = 0;
socket_recv_len = 0;