aboutsummaryrefslogtreecommitdiff
path: root/src/http/async_bio.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/http/async_bio.zig')
-rw-r--r--src/http/async_bio.zig199
1 files changed, 158 insertions, 41 deletions
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig
index eadc40b2a..6c14c8ed8 100644
--- a/src/http/async_bio.zig
+++ b/src/http/async_bio.zig
@@ -10,7 +10,7 @@ const SOCKET_FLAGS = @import("../http_client_async.zig").SOCKET_FLAGS;
const getAllocator = @import("../http_client_async.zig").getAllocator;
const assert = std.debug.assert;
const BufferPool = AsyncMessage.BufferPool;
-
+const buffer_pool_len = AsyncMessage.buffer_pool_len;
const fail = -3;
const connection_closed = -2;
const pending = -1;
@@ -32,6 +32,9 @@ send_completion: Completion = undefined,
write_error: c_int = 0,
socket_recv_len: c_int = 0,
+socket_send_len: u32 = 0,
+socket_recv_eof: bool = false,
+bio_write_offset: u32 = 0,
bio_read_offset: u32 = 0,
socket_send_error: ?anyerror = null,
@@ -39,20 +42,35 @@ socket_recv_error: ?anyerror = null,
next: ?*AsyncBIO = null,
-pending_frame: PendingFrame = PendingFrame.init(),
-pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 });
+onReady: ?Callback = null,
-pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void {
- this.pending_frame.writeItem(frame) catch {};
-}
+pub const Callback = struct {
+ ctx: *anyopaque,
+ callback: fn (ctx: *anyopaque) void,
-pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe {
- return this.pending_frame.readItem();
-}
+ pub inline fn run(this: Callback) void {
+ this.callback(this.ctx);
+ }
+
+ pub fn Wrap(comptime Context: type, comptime func: anytype) type {
+ return struct {
+ pub fn wrap(context: *anyopaque) void {
+ func(@ptrCast(*Context, @alignCast(@alignOf(*Context), context)));
+ }
+
+ pub fn get(ctx: *Context) Callback {
+ return Callback{
+ .ctx = ctx,
+ .callback = wrap,
+ };
+ }
+ };
+ }
+};
pub fn nextFrame(this: *AsyncBIO) void {
- if (this.pending_frame.readItem()) |frame| {
- resume frame;
+ if (this.onReady) |ready| {
+ ready.run();
}
}
@@ -75,7 +93,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO {
ret.send_wait = .pending;
head = next;
- ret.pending_frame = PendingFrame.init();
return ret;
}
@@ -96,16 +113,22 @@ pub fn release(this: *AsyncBIO) void {
this.read_wait = .pending;
this.send_wait = .pending;
- this.pending_frame = PendingFrame.init();
+ this.socket_send_len = 0;
+ this.socket_recv_len = 0;
+ this.bio_write_offset = 0;
+ this.bio_read_offset = 0;
+ this.socket_recv_error = null;
+ this.socket_send_error = null;
+ this.onReady = null;
if (this.recv_buffer) |recv| {
recv.release();
this.recv_buffer = null;
}
- if (this.write_buffer) |write| {
+ if (this.send_buffer) |write| {
write.release();
- this.write_buffer = null;
+ this.send_buffer = null;
}
head = this;
@@ -132,7 +155,7 @@ const WaitResult = enum {
};
pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void {
- const socket_recv_len = @truncate(
+ const socket_recv_len = @intCast(
c_int,
result_ catch |err| {
this.socket_recv_error = err;
@@ -142,18 +165,48 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!
},
);
this.socket_recv_len += socket_recv_len;
-
+ if (extremely_verbose) {
+ Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
+ Output.flush();
+ }
if (socket_recv_len == 0) {
- this.onSocketReadComplete();
+ this.socket_recv_eof = true;
+ }
+ this.read_wait = .pending;
+
+ // if (socket_recv_len == 0) {
+
+ this.onSocketReadComplete();
+ // return;
+ // }
+
+ // this.read_wait = .pending;
+ // this.scheduleSocketRead();
+}
+
+pub fn doSocketWrite(this: *AsyncBIO, _: *Completion, result_: AsyncIO.SendError!usize) void {
+ const socket_send_len = @truncate(
+ u32,
+ result_ catch |err| {
+ this.socket_send_error = err;
+ this.send_wait = .pending;
+ this.onSocketWriteComplete();
+ return;
+ },
+ );
+ this.socket_send_len += socket_send_len;
+
+ if (socket_send_len == 0 or this.writeBuf().len == 0) {
+ this.send_wait = .pending;
+ this.onSocketWriteComplete();
return;
}
- this.read_wait = .pending;
- this.scheduleSocketRead();
+ this.send_wait = .pending;
+ this.scheduleSocketWrite();
}
fn onSocketReadComplete(this: *AsyncBIO) void {
- assert(this.read_wait == .suspended);
this.handleSocketReadComplete();
this.nextFrame();
@@ -163,17 +216,33 @@ inline fn readBuf(this: *AsyncBIO) []u8 {
return this.recv_buffer.?.data[this.bio_read_offset..];
}
+inline fn writeBuf(this: *AsyncBIO) []const u8 {
+ return this.send_buffer.?.data[this.socket_send_len..this.bio_write_offset];
+}
+
+pub fn hasPendingReadData(this: *AsyncBIO) bool {
+ return this.socket_recv_len > 0;
+}
+
pub fn scheduleSocketRead(this: *AsyncBIO) void {
- assert(this.read_wait == .pending);
this.read_wait = .suspended;
+ if (this.recv_buffer == null) {
+ this.recv_buffer = BufferPool.get(getAllocator());
+ }
+
+ AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf());
+}
- AsyncIO.global.recv(*AsyncBIO, this, this.doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf());
+pub fn scheduleSocketWrite(this: *AsyncBIO) void {
+ this.send_wait = .suspended;
+
+ AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &this.send_completion, this.socket_fd, this.writeBuf(), SOCKET_FLAGS);
}
fn handleSocketReadComplete(
this: *AsyncBIO,
) void {
- this.read_wait = .completed;
+ this.read_wait = .pending;
if (this.socket_recv_len <= 0) {
if (this.recv_buffer) |buf| {
@@ -183,19 +252,31 @@ fn handleSocketReadComplete(
}
}
-pub fn onSocketWriteComplete(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void {
+pub fn onSocketWriteComplete(
+ this: *AsyncBIO,
+) void {
assert(this.send_wait == .pending);
- this.handleSocketWriteComplete(result);
+ this.handleSocketWriteComplete();
this.nextFrame();
}
-pub fn handleSocketWriteComplete(this: *AsyncBIO, result: AsyncIO.SendError!usize) void {
- // this.last_socket_recv_len = result;
- // this.read_wait = .completed;
- // if (extremely_verbose) {
- // const socket_recv_len = result catch @as(usize, 999);
- // Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
- // Output.flush();
+pub fn handleSocketWriteComplete(
+ this: *AsyncBIO,
+) void {
+ this.send_wait = .completed;
+
+ if (extremely_verbose) {
+ Output.prettyErrorln("onWrite: {d}", .{this.socket_send_len});
+ Output.flush();
+ }
+
+ // if (this.send_buffer) |buf| {
+ // buf.release();
+ // this.send_buffer = null;
+
+ // // this might be incorrect!
+ // this.bio_write_offset = 0;
+ // this.socket_send_len = 0;
// }
}
@@ -218,11 +299,18 @@ pub const Bio = struct {
return 0;
}
- pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int {
- if (len < 0) return len;
+ pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len_: c_int) callconv(.C) c_int {
+ if (len_ < 0) return len_;
assert(@ptrToInt(ptr) > 0);
- boring.BIO_clear_retry_flags(this_bio);
+ {
+ const retry_flags = boring.BIO_get_retry_flags(this_bio);
+ boring.BIO_clear_retry_flags(this_bio);
+ if ((retry_flags & boring.BIO_FLAGS_READ) != 0) {
+ boring.BIO_set_retry_read(this_bio);
+ }
+ }
var this = cast(this_bio);
+ const len = @intCast(u32, len_);
if (this.socket_send_error != null) {
if (extremely_verbose) {
@@ -231,14 +319,39 @@ pub const Bio = struct {
}
return -1;
}
+
+ if (this.send_buffer == null) {
+ this.send_buffer = BufferPool.get(getAllocator());
+ }
+
+ var data = this.send_buffer.?.data[this.bio_write_offset..];
+ const to_copy = @minimum(len, @intCast(u32, data.len));
+
+ if (to_copy == 0) {
+ boring.BIO_set_retry_write(this_bio);
+ return -1;
+ }
+
+ @memcpy(data.ptr, ptr, to_copy);
+ this.bio_write_offset += to_copy;
+
+ this.scheduleSocketWrite();
+
+ return @intCast(c_int, to_copy);
}
pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len_: c_int) callconv(.C) c_int {
if (len_ < 0) return len_;
const len__: u32 = @intCast(u32, len_);
assert(@ptrToInt(ptr) > 0);
+ {
+ const retry_flags = boring.BIO_get_retry_flags(this_bio);
+ boring.BIO_clear_retry_flags(this_bio);
+ if ((retry_flags & boring.BIO_FLAGS_WRITE) != 0) {
+ boring.BIO_set_retry_write(this_bio);
+ }
+ }
- boring.BIO_clear_retry_flags(this_bio);
var this = cast(this_bio);
var socket_recv_len = this.socket_recv_len;
@@ -258,7 +371,7 @@ pub const Bio = struct {
// error while writing that would otherwise not be reported until the
// application attempted to write again - which it may never do. See
// https://crbug.com/249848.
- if ((this.write_error != OK or this.write_error != pending) and (socket_recv_len == OK or socket_recv_len == pending)) {
+ if ((this.socket_send_error != null) and (socket_recv_len == OK or socket_recv_len == pending)) {
return -1;
}
@@ -270,7 +383,9 @@ pub const Bio = struct {
// reused after shutdown for non-SSL traffic, so overreading is fine.
assert(bio_read_offset == 0);
this.scheduleSocketRead();
- socket_recv_len = pending;
+
+ boring.BIO_set_retry_read(this_bio);
+ return pending;
}
if (socket_recv_len == pending) {
@@ -287,9 +402,11 @@ pub const Bio = struct {
const socket_recv_len_ = @intCast(u32, socket_recv_len);
// Report the result of the last Read() if non-empty.
- if (!(bio_read_offset < socket_recv_len_)) return 0;
+ if (!(bio_read_offset < socket_recv_len_)) {
+ return 0;
+ }
const len = @minimum(len__, socket_recv_len_ - bio_read_offset);
- var data = @ptrCast([*]const u8, &this.recv_buffer.?.data[bio_read_offset]);
+ var data = @ptrCast([*]const u8, this.recv_buffer.?.data[bio_read_offset..].ptr);
@memcpy(ptr, data, len);
bio_read_offset += len;