diff options
Diffstat (limited to 'src/http/async_bio.zig')
-rw-r--r-- | src/http/async_bio.zig | 199 |
1 files changed, 158 insertions, 41 deletions
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index eadc40b2a..6c14c8ed8 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -10,7 +10,7 @@ const SOCKET_FLAGS = @import("../http_client_async.zig").SOCKET_FLAGS; const getAllocator = @import("../http_client_async.zig").getAllocator; const assert = std.debug.assert; const BufferPool = AsyncMessage.BufferPool; - +const buffer_pool_len = AsyncMessage.buffer_pool_len; const fail = -3; const connection_closed = -2; const pending = -1; @@ -32,6 +32,9 @@ send_completion: Completion = undefined, write_error: c_int = 0, socket_recv_len: c_int = 0, +socket_send_len: u32 = 0, +socket_recv_eof: bool = false, +bio_write_offset: u32 = 0, bio_read_offset: u32 = 0, socket_send_error: ?anyerror = null, @@ -39,20 +42,35 @@ socket_recv_error: ?anyerror = null, next: ?*AsyncBIO = null, -pending_frame: PendingFrame = PendingFrame.init(), -pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 }); +onReady: ?Callback = null, -pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void { - this.pending_frame.writeItem(frame) catch {}; -} +pub const Callback = struct { + ctx: *anyopaque, + callback: fn (ctx: *anyopaque) void, -pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe { - return this.pending_frame.readItem(); -} + pub inline fn run(this: Callback) void { + this.callback(this.ctx); + } + + pub fn Wrap(comptime Context: type, comptime func: anytype) type { + return struct { + pub fn wrap(context: *anyopaque) void { + func(@ptrCast(*Context, @alignCast(@alignOf(*Context), context))); + } + + pub fn get(ctx: *Context) Callback { + return Callback{ + .ctx = ctx, + .callback = wrap, + }; + } + }; + } +}; pub fn nextFrame(this: *AsyncBIO) void { - if (this.pending_frame.readItem()) |frame| { - resume frame; + if (this.onReady) |ready| { + ready.run(); } } @@ -75,7 +93,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO { ret.send_wait = .pending; head = next; - ret.pending_frame = PendingFrame.init(); return ret; } @@ -96,16 +113,22 @@ pub fn release(this: *AsyncBIO) void { this.read_wait = .pending; this.send_wait = .pending; - this.pending_frame = PendingFrame.init(); + this.socket_send_len = 0; + this.socket_recv_len = 0; + this.bio_write_offset = 0; + this.bio_read_offset = 0; + this.socket_recv_error = null; + this.socket_send_error = null; + this.onReady = null; if (this.recv_buffer) |recv| { recv.release(); this.recv_buffer = null; } - if (this.write_buffer) |write| { + if (this.send_buffer) |write| { write.release(); - this.write_buffer = null; + this.send_buffer = null; } head = this; @@ -132,7 +155,7 @@ const WaitResult = enum { }; pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void { - const socket_recv_len = @truncate( + const socket_recv_len = @intCast( c_int, result_ catch |err| { this.socket_recv_error = err; @@ -142,18 +165,48 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError! }, ); this.socket_recv_len += socket_recv_len; - + if (extremely_verbose) { + Output.prettyErrorln("onRead: {d}", .{socket_recv_len}); + Output.flush(); + } if (socket_recv_len == 0) { - this.onSocketReadComplete(); + this.socket_recv_eof = true; + } + this.read_wait = .pending; + + // if (socket_recv_len == 0) { + + this.onSocketReadComplete(); + // return; + // } + + // this.read_wait = .pending; + // this.scheduleSocketRead(); +} + +pub fn doSocketWrite(this: *AsyncBIO, _: *Completion, result_: AsyncIO.SendError!usize) void { + const socket_send_len = @truncate( + u32, + result_ catch |err| { + this.socket_send_error = err; + this.send_wait = .pending; + this.onSocketWriteComplete(); + return; + }, + ); + this.socket_send_len += socket_send_len; + + if (socket_send_len == 0 or this.writeBuf().len == 0) { + this.send_wait = .pending; + this.onSocketWriteComplete(); return; } - this.read_wait = .pending; - this.scheduleSocketRead(); + this.send_wait = .pending; + this.scheduleSocketWrite(); } fn onSocketReadComplete(this: *AsyncBIO) void { - assert(this.read_wait == .suspended); this.handleSocketReadComplete(); this.nextFrame(); @@ -163,17 +216,33 @@ inline fn readBuf(this: *AsyncBIO) []u8 { return this.recv_buffer.?.data[this.bio_read_offset..]; } +inline fn writeBuf(this: *AsyncBIO) []const u8 { + return this.send_buffer.?.data[this.socket_send_len..this.bio_write_offset]; +} + +pub fn hasPendingReadData(this: *AsyncBIO) bool { + return this.socket_recv_len > 0; +} + pub fn scheduleSocketRead(this: *AsyncBIO) void { - assert(this.read_wait == .pending); this.read_wait = .suspended; + if (this.recv_buffer == null) { + this.recv_buffer = BufferPool.get(getAllocator()); + } + + AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf()); +} - AsyncIO.global.recv(*AsyncBIO, this, this.doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf()); +pub fn scheduleSocketWrite(this: *AsyncBIO) void { + this.send_wait = .suspended; + + AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &this.send_completion, this.socket_fd, this.writeBuf(), SOCKET_FLAGS); } fn handleSocketReadComplete( this: *AsyncBIO, ) void { - this.read_wait = .completed; + this.read_wait = .pending; if (this.socket_recv_len <= 0) { if (this.recv_buffer) |buf| { @@ -183,19 +252,31 @@ fn handleSocketReadComplete( } } -pub fn onSocketWriteComplete(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { +pub fn onSocketWriteComplete( + this: *AsyncBIO, +) void { assert(this.send_wait == .pending); - this.handleSocketWriteComplete(result); + this.handleSocketWriteComplete(); this.nextFrame(); } -pub fn handleSocketWriteComplete(this: *AsyncBIO, result: AsyncIO.SendError!usize) void { - // this.last_socket_recv_len = result; - // this.read_wait = .completed; - // if (extremely_verbose) { - // const socket_recv_len = result catch @as(usize, 999); - // Output.prettyErrorln("onRead: {d}", .{socket_recv_len}); - // Output.flush(); +pub fn handleSocketWriteComplete( + this: *AsyncBIO, +) void { + this.send_wait = .completed; + + if (extremely_verbose) { + Output.prettyErrorln("onWrite: {d}", .{this.socket_send_len}); + Output.flush(); + } + + // if (this.send_buffer) |buf| { + // buf.release(); + // this.send_buffer = null; + + // // this might be incorrect! + // this.bio_write_offset = 0; + // this.socket_send_len = 0; // } } @@ -218,11 +299,18 @@ pub const Bio = struct { return 0; } - pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int { - if (len < 0) return len; + pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len_: c_int) callconv(.C) c_int { + if (len_ < 0) return len_; assert(@ptrToInt(ptr) > 0); - boring.BIO_clear_retry_flags(this_bio); + { + const retry_flags = boring.BIO_get_retry_flags(this_bio); + boring.BIO_clear_retry_flags(this_bio); + if ((retry_flags & boring.BIO_FLAGS_READ) != 0) { + boring.BIO_set_retry_read(this_bio); + } + } var this = cast(this_bio); + const len = @intCast(u32, len_); if (this.socket_send_error != null) { if (extremely_verbose) { @@ -231,14 +319,39 @@ pub const Bio = struct { } return -1; } + + if (this.send_buffer == null) { + this.send_buffer = BufferPool.get(getAllocator()); + } + + var data = this.send_buffer.?.data[this.bio_write_offset..]; + const to_copy = @minimum(len, @intCast(u32, data.len)); + + if (to_copy == 0) { + boring.BIO_set_retry_write(this_bio); + return -1; + } + + @memcpy(data.ptr, ptr, to_copy); + this.bio_write_offset += to_copy; + + this.scheduleSocketWrite(); + + return @intCast(c_int, to_copy); } pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len_: c_int) callconv(.C) c_int { if (len_ < 0) return len_; const len__: u32 = @intCast(u32, len_); assert(@ptrToInt(ptr) > 0); + { + const retry_flags = boring.BIO_get_retry_flags(this_bio); + boring.BIO_clear_retry_flags(this_bio); + if ((retry_flags & boring.BIO_FLAGS_WRITE) != 0) { + boring.BIO_set_retry_write(this_bio); + } + } - boring.BIO_clear_retry_flags(this_bio); var this = cast(this_bio); var socket_recv_len = this.socket_recv_len; @@ -258,7 +371,7 @@ pub const Bio = struct { // error while writing that would otherwise not be reported until the // application attempted to write again - which it may never do. See // https://crbug.com/249848. - if ((this.write_error != OK or this.write_error != pending) and (socket_recv_len == OK or socket_recv_len == pending)) { + if ((this.socket_send_error != null) and (socket_recv_len == OK or socket_recv_len == pending)) { return -1; } @@ -270,7 +383,9 @@ pub const Bio = struct { // reused after shutdown for non-SSL traffic, so overreading is fine. assert(bio_read_offset == 0); this.scheduleSocketRead(); - socket_recv_len = pending; + + boring.BIO_set_retry_read(this_bio); + return pending; } if (socket_recv_len == pending) { @@ -287,9 +402,11 @@ pub const Bio = struct { const socket_recv_len_ = @intCast(u32, socket_recv_len); // Report the result of the last Read() if non-empty. - if (!(bio_read_offset < socket_recv_len_)) return 0; + if (!(bio_read_offset < socket_recv_len_)) { + return 0; + } const len = @minimum(len__, socket_recv_len_ - bio_read_offset); - var data = @ptrCast([*]const u8, &this.recv_buffer.?.data[bio_read_offset]); + var data = @ptrCast([*]const u8, this.recv_buffer.?.data[bio_read_offset..].ptr); @memcpy(ptr, data, len); bio_read_offset += len; |