diff options
author | 2022-02-01 18:51:25 -0800 | |
---|---|---|
committer | 2022-02-01 18:51:25 -0800 | |
commit | 170e58a99db61536c345ce2dd41693aea57cd359 (patch) | |
tree | 3f82ba8294c445c9e3ab83c2f07a1ae045e1592e | |
parent | f3c4bfcbaeb8a0ea08ceed7c46a7c27150de71c7 (diff) | |
download | bun-170e58a99db61536c345ce2dd41693aea57cd359.tar.gz bun-170e58a99db61536c345ce2dd41693aea57cd359.tar.zst bun-170e58a99db61536c345ce2dd41693aea57cd359.zip |
Fix biggest issue with HTTPS client!
-rw-r--r-- | src/http/async_bio.zig | 131 |
1 files changed, 70 insertions, 61 deletions
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index 6c14c8ed8..7ac9bc92d 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -15,20 +15,27 @@ const fail = -3; const connection_closed = -2; const pending = -1; const OK = 0; +const ObjectPool = @import("../pool.zig").ObjectPool; + +const Packet = struct { + completion: Completion, + min: u32 = 0, + owned_slice: []u8 = &[_]u8{}, + + pub const Pool = ObjectPool(Packet, null, false); +}; bio: *boring.BIO = undefined, socket_fd: std.os.socket_t = 0, allocator: std.mem.Allocator, -read_wait: Wait = Wait.pending, -send_wait: Wait = Wait.pending, +pending_reads: u32 = 0, +pending_sends: u32 = 0, recv_buffer: ?*BufferPool.Node = null, -recv_completion: Completion = undefined, send_buffer: ?*BufferPool.Node = null, -send_completion: Completion = undefined, write_error: c_int = 0, socket_recv_len: c_int = 0, @@ -89,8 +96,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO { if (head) |head_| { var next = head_.next; var ret = head_; - ret.read_wait = .pending; - ret.send_wait = .pending; head = next; return ret; @@ -99,8 +104,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO { var bio = allocator.create(AsyncBIO) catch unreachable; bio.* = AsyncBIO{ .allocator = allocator, - .read_wait = .pending, - .send_wait = .pending, }; return bio; @@ -111,8 +114,6 @@ pub fn release(this: *AsyncBIO) void { this.next = head_; } - this.read_wait = .pending; - this.send_wait = .pending; this.socket_send_len = 0; this.socket_recv_len = 0; this.bio_write_offset = 0; @@ -154,7 +155,10 @@ const WaitResult = enum { send, }; -pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void { +pub fn doSocketRead(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.RecvError!usize) void { + var ctx = @fieldParentPtr(Packet.Pool.Node, "data", @fieldParentPtr(Packet, "completion", completion)); + ctx.release(); + const socket_recv_len = @intCast( c_int, result_ catch |err| { @@ -169,10 +173,10 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError! Output.prettyErrorln("onRead: {d}", .{socket_recv_len}); Output.flush(); } + if (socket_recv_len == 0) { this.socket_recv_eof = true; } - this.read_wait = .pending; // if (socket_recv_len == 0) { @@ -184,26 +188,28 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError! // this.scheduleSocketRead(); } -pub fn doSocketWrite(this: *AsyncBIO, _: *Completion, result_: AsyncIO.SendError!usize) void { +pub fn doSocketWrite(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.SendError!usize) void { + var ctx = @fieldParentPtr(Packet.Pool.Node, "data", @fieldParentPtr(Packet, "completion", completion)); + defer ctx.release(); + const socket_send_len = @truncate( u32, result_ catch |err| { this.socket_send_error = err; - this.send_wait = .pending; this.onSocketWriteComplete(); return; }, ); this.socket_send_len += socket_send_len; - if (socket_send_len == 0 or this.writeBuf().len == 0) { - this.send_wait = .pending; + const remain = ctx.data.min - @minimum(ctx.data.min, socket_send_len); + + if (socket_send_len == 0 or remain == 0) { this.onSocketWriteComplete(); return; } - this.send_wait = .pending; - this.scheduleSocketWrite(); + this.scheduleSocketWrite(completion.operation.slice()[remain..]); } fn onSocketReadComplete(this: *AsyncBIO) void { @@ -213,49 +219,39 @@ fn onSocketReadComplete(this: *AsyncBIO) void { } inline fn readBuf(this: *AsyncBIO) []u8 { - return this.recv_buffer.?.data[this.bio_read_offset..]; -} - -inline fn writeBuf(this: *AsyncBIO) []const u8 { - return this.send_buffer.?.data[this.socket_send_len..this.bio_write_offset]; + return this.recv_buffer.?.data[@intCast(u32, this.socket_recv_len)..]; } pub fn hasPendingReadData(this: *AsyncBIO) bool { - return this.socket_recv_len > 0; + return this.socket_recv_len - @intCast(c_int, this.bio_read_offset) > 0; } -pub fn scheduleSocketRead(this: *AsyncBIO) void { - this.read_wait = .suspended; +pub fn scheduleSocketRead(this: *AsyncBIO, min: u32) void { if (this.recv_buffer == null) { this.recv_buffer = BufferPool.get(getAllocator()); } - AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf()); -} + var packet = Packet.Pool.get(getAllocator()); + packet.data.min = @truncate(u32, min); -pub fn scheduleSocketWrite(this: *AsyncBIO) void { - this.send_wait = .suspended; + AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &packet.data.completion, this.socket_fd, this.readBuf()); +} - AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &this.send_completion, this.socket_fd, this.writeBuf(), SOCKET_FLAGS); +pub fn scheduleSocketWrite(this: *AsyncBIO, buf: []const u8) void { + var packet = Packet.Pool.get(getAllocator()); + packet.data.min = @truncate(u32, buf.len); + AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &packet.data.completion, this.socket_fd, buf, SOCKET_FLAGS); } fn handleSocketReadComplete( this: *AsyncBIO, ) void { - this.read_wait = .pending; - - if (this.socket_recv_len <= 0) { - if (this.recv_buffer) |buf| { - buf.release(); - this.recv_buffer = null; - } - } + this.pending_reads -|= 1; } pub fn onSocketWriteComplete( this: *AsyncBIO, ) void { - assert(this.send_wait == .pending); this.handleSocketWriteComplete(); this.nextFrame(); } @@ -263,21 +259,23 @@ pub fn onSocketWriteComplete( pub fn handleSocketWriteComplete( this: *AsyncBIO, ) void { - this.send_wait = .completed; + this.pending_sends -|= 1; if (extremely_verbose) { Output.prettyErrorln("onWrite: {d}", .{this.socket_send_len}); Output.flush(); } - // if (this.send_buffer) |buf| { - // buf.release(); - // this.send_buffer = null; + if (this.pending_sends == 0) { + if (this.send_buffer) |buf| { + buf.release(); + this.send_buffer = null; - // // this might be incorrect! - // this.bio_write_offset = 0; - // this.socket_send_len = 0; - // } + // this might be incorrect! + this.bio_write_offset = 0; + this.socket_send_len = 0; + } + } } pub const Bio = struct { @@ -320,6 +318,9 @@ pub const Bio = struct { return -1; } + const remaining_in_send_buffer = buffer_pool_len - this.bio_write_offset; + const total_remaining = remaining_in_send_buffer - @minimum(remaining_in_send_buffer, len); + if (this.send_buffer == null) { this.send_buffer = BufferPool.get(getAllocator()); } @@ -335,7 +336,8 @@ pub const Bio = struct { @memcpy(data.ptr, ptr, to_copy); this.bio_write_offset += to_copy; - this.scheduleSocketWrite(); + this.scheduleSocketWrite(data[0..to_copy]); + this.pending_sends += 1; return @intCast(c_int, to_copy); } @@ -382,17 +384,15 @@ pub const Bio = struct { // overreading, but issuing one is more efficient. SSL sockets are not // reused after shutdown for non-SSL traffic, so overreading is fine. assert(bio_read_offset == 0); - this.scheduleSocketRead(); + if (this.pending_reads == 0) { + this.scheduleSocketRead(len__); + this.pending_reads += 1; + } boring.BIO_set_retry_read(this_bio); return pending; } - if (socket_recv_len == pending) { - boring.BIO_set_retry_read(this_bio); - return -1; - } - // If the last Read() failed, report the error. if (socket_recv_len < 0) { if (extremely_verbose) Output.prettyErrorln("Unexpected ssl error: {d}", .{socket_recv_len}); @@ -402,16 +402,25 @@ pub const Bio = struct { const socket_recv_len_ = @intCast(u32, socket_recv_len); // Report the result of the last Read() if non-empty. - if (!(bio_read_offset < socket_recv_len_)) { - return 0; - } const len = @minimum(len__, socket_recv_len_ - bio_read_offset); - var data = @ptrCast([*]const u8, this.recv_buffer.?.data[bio_read_offset..].ptr); - @memcpy(ptr, data, len); + var bytes = this.recv_buffer.?.data[bio_read_offset..socket_recv_len_]; + + if (len__ > @truncate(u32, bytes.len)) { + if (this.pending_reads == 0) { + this.pending_reads += 1; + this.scheduleSocketRead(len); + } + + boring.BIO_set_retry_read(this_bio); + return -1; + } + @memcpy(ptr, bytes.ptr, len); bio_read_offset += len; - if (bio_read_offset == socket_recv_len_) { + if (bio_read_offset == socket_recv_len_ and this.pending_reads == 0) { // The read buffer is empty. + // we can reset the pointer back to the beginning of the buffer + // if there is more data to read, we will ask for another bio_read_offset = 0; socket_recv_len = 0; |