aboutsummaryrefslogtreecommitdiff
path: root/src/http
diff options
context:
space:
mode:
Diffstat (limited to 'src/http')
-rw-r--r--src/http/async_bio.zig306
-rw-r--r--src/http/async_message.zig3
2 files changed, 167 insertions, 142 deletions
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig
index 5e86da949..eadc40b2a 100644
--- a/src/http/async_bio.zig
+++ b/src/http/async_bio.zig
@@ -8,26 +8,38 @@ const Output = @import("../global.zig").Output;
const extremely_verbose = @import("../http_client_async.zig").extremely_verbose;
const SOCKET_FLAGS = @import("../http_client_async.zig").SOCKET_FLAGS;
const getAllocator = @import("../http_client_async.zig").getAllocator;
+const assert = std.debug.assert;
+const BufferPool = AsyncMessage.BufferPool;
+
+const fail = -3;
+const connection_closed = -2;
+const pending = -1;
+const OK = 0;
bio: *boring.BIO = undefined,
socket_fd: std.os.socket_t = 0,
-allocator: std.mem.Allocator,
-read_buf_len: usize = 0,
+allocator: std.mem.Allocator,
read_wait: Wait = Wait.pending,
send_wait: Wait = Wait.pending,
-recv_completion: AsyncIO.Completion = undefined,
-send_completion: AsyncIO.Completion = undefined,
-write_buffer: ?*AsyncMessage = null,
+recv_buffer: ?*BufferPool.Node = null,
+recv_completion: Completion = undefined,
+
+send_buffer: ?*BufferPool.Node = null,
+send_completion: Completion = undefined,
-last_send_result: AsyncIO.SendError!usize = 0,
+write_error: c_int = 0,
+socket_recv_len: c_int = 0,
+bio_read_offset: u32 = 0,
+
+socket_send_error: ?anyerror = null,
+socket_recv_error: ?anyerror = null,
-last_read_result: AsyncIO.RecvError!usize = 0,
next: ?*AsyncBIO = null,
-pending_frame: PendingFrame = PendingFrame.init(),
+pending_frame: PendingFrame = PendingFrame.init(),
pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 });
pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void {
@@ -38,6 +50,12 @@ pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe {
return this.pending_frame.readItem();
}
+pub fn nextFrame(this: *AsyncBIO) void {
+ if (this.pending_frame.readItem()) |frame| {
+ resume frame;
+ }
+}
+
var method: ?*boring.BIO_METHOD = null;
var head: ?*AsyncBIO = null;
@@ -77,11 +95,14 @@ pub fn release(this: *AsyncBIO) void {
}
this.read_wait = .pending;
- this.last_read_result = 0;
this.send_wait = .pending;
- this.last_read_result = 0;
this.pending_frame = PendingFrame.init();
+ if (this.recv_buffer) |recv| {
+ recv.release();
+ this.recv_buffer = null;
+ }
+
if (this.write_buffer) |write| {
write.release();
this.write_buffer = null;
@@ -110,79 +131,72 @@ const WaitResult = enum {
send,
};
-const Sender = struct {
- pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void {
- this.last_send_result = result;
- this.send_wait = .completed;
- this.write_buffer.?.sent += @truncate(u32, result catch 0);
-
- if (extremely_verbose) {
- const read_result = result catch @as(usize, 999);
- Output.prettyErrorln("onSend: {d}", .{read_result});
- Output.flush();
- }
-
- if (this.pending_frame.readItem()) |frame| {
- resume frame;
- }
- }
-};
+pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void {
+ const socket_recv_len = @truncate(
+ c_int,
+ result_ catch |err| {
+ this.socket_recv_error = err;
+ this.socket_recv_len = fail;
+ this.onSocketReadComplete();
+ return;
+ },
+ );
+ this.socket_recv_len += socket_recv_len;
-pub fn enqueueSend(
- self: *AsyncBIO,
-) void {
- if (self.write_buffer == null) return;
- var to_write = self.write_buffer.?.slice();
- if (to_write.len == 0) {
+ if (socket_recv_len == 0) {
+ this.onSocketReadComplete();
return;
}
- self.last_send_result = 0;
+ this.read_wait = .pending;
+ this.scheduleSocketRead();
+}
- AsyncIO.global.send(
- *AsyncBIO,
- self,
- Sender.onSend,
- &self.send_completion,
- self.socket_fd,
- to_write,
- SOCKET_FLAGS,
- );
- self.send_wait = .suspended;
- if (extremely_verbose) {
- Output.prettyErrorln("enqueueSend: {d}", .{to_write.len});
- Output.flush();
- }
+fn onSocketReadComplete(this: *AsyncBIO) void {
+ assert(this.read_wait == .suspended);
+ this.handleSocketReadComplete();
+
+ this.nextFrame();
}
-const Reader = struct {
- pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void {
- this.last_read_result = result;
- this.read_wait = .completed;
- if (extremely_verbose) {
- const read_result = result catch @as(usize, 999);
- Output.prettyErrorln("onRead: {d}", .{read_result});
- Output.flush();
- }
- if (this.pending_frame.readItem()) |frame| {
- resume frame;
+inline fn readBuf(this: *AsyncBIO) []u8 {
+ return this.recv_buffer.?.data[this.bio_read_offset..];
+}
+
+pub fn scheduleSocketRead(this: *AsyncBIO) void {
+ assert(this.read_wait == .pending);
+ this.read_wait = .suspended;
+
+ AsyncIO.global.recv(*AsyncBIO, this, this.doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf());
+}
+
+fn handleSocketReadComplete(
+ this: *AsyncBIO,
+) void {
+ this.read_wait = .completed;
+
+ if (this.socket_recv_len <= 0) {
+ if (this.recv_buffer) |buf| {
+ buf.release();
+ this.recv_buffer = null;
}
}
-};
+}
-pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void {
- var read_buffer = read_buf[off..];
- if (read_buffer.len == 0) {
- return;
- }
+pub fn onSocketWriteComplete(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void {
+ assert(this.send_wait == .pending);
+ this.handleSocketWriteComplete(result);
+ this.nextFrame();
+}
- self.last_read_result = 0;
- AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.recv_completion, self.socket_fd, read_buffer);
- self.read_wait = .suspended;
- if (extremely_verbose) {
- Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len});
- Output.flush();
- }
+pub fn handleSocketWriteComplete(this: *AsyncBIO, result: AsyncIO.SendError!usize) void {
+ // this.last_socket_recv_len = result;
+ // this.read_wait = .completed;
+ // if (extremely_verbose) {
+ // const socket_recv_len = result catch @as(usize, 999);
+ // Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
+ // Output.flush();
+ // }
}
pub const Bio = struct {
@@ -205,90 +219,100 @@ pub const Bio = struct {
return 0;
}
pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int {
- std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0);
-
- var buf = ptr[0..@intCast(usize, len)];
- boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY);
+ if (len < 0) return len;
+ assert(@ptrToInt(ptr) > 0);
+ boring.BIO_clear_retry_flags(this_bio);
+ var this = cast(this_bio);
- if (len <= 0) {
- return 0;
+ if (this.socket_send_error != null) {
+ if (extremely_verbose) {
+ Output.prettyErrorln("write: {s}", .{@errorName(this.socket_send_error.?)});
+ Output.flush();
+ }
+ return -1;
}
+ }
+
+ pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len_: c_int) callconv(.C) c_int {
+ if (len_ < 0) return len_;
+ const len__: u32 = @intCast(u32, len_);
+ assert(@ptrToInt(ptr) > 0);
+ boring.BIO_clear_retry_flags(this_bio);
var this = cast(this_bio);
- if (this.read_wait == .suspended) {
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
+
+ var socket_recv_len = this.socket_recv_len;
+ var bio_read_offset = this.bio_read_offset;
+ defer {
+ this.bio_read_offset = bio_read_offset;
+ this.socket_recv_len = socket_recv_len;
+ }
+
+ if (this.socket_recv_error) |socket_err| {
+ if (extremely_verbose) Output.prettyErrorln("SSL read error: {s}", .{@errorName(socket_err)});
return -1;
}
- switch (this.send_wait) {
- .pending => {
- var write_buffer = this.write_buffer orelse brk: {
- this.write_buffer = AsyncMessage.get(getAllocator());
- break :brk this.write_buffer.?;
- };
-
- _ = write_buffer.writeAll(buf);
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
-
- return -1;
- },
- .suspended => {
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
-
- return -1;
- },
- .completed => {
- this.send_wait = .pending;
- const written = this.last_send_result catch |err| {
- Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)});
- Output.flush();
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- };
- this.last_send_result = 0;
- return @intCast(c_int, written);
- },
+ // If there is no result available synchronously, report any Write() errors
+ // that were observed. Otherwise the application may have encountered a socket
+ // error while writing that would otherwise not be reported until the
+ // application attempted to write again - which it may never do. See
+ // https://crbug.com/249848.
+ if ((this.write_error != OK or this.write_error != pending) and (socket_recv_len == OK or socket_recv_len == pending)) {
+ return -1;
}
- unreachable;
- }
+ if (socket_recv_len == 0) {
+ // Instantiate the read buffer and read from the socket. Although only |len|
+ // bytes were requested, intentionally read to the full buffer size. The SSL
+ // layer reads the record header and body in separate reads to avoid
+ // overreading, but issuing one is more efficient. SSL sockets are not
+ // reused after shutdown for non-SSL traffic, so overreading is fine.
+ assert(bio_read_offset == 0);
+ this.scheduleSocketRead();
+ socket_recv_len = pending;
+ }
- pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int {
- std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0);
- var this = cast(this_bio);
+ if (socket_recv_len == pending) {
+ boring.BIO_set_retry_read(this_bio);
+ return -1;
+ }
- var buf = ptr[0..@maximum(@intCast(usize, len), this.read_buf_len)];
-
- boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY);
-
- switch (this.read_wait) {
- .pending => {
- this.enqueueRead(buf, 0);
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- },
- .suspended => {
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- },
- .completed => {
- this.read_wait = .pending;
- const read_len = this.last_read_result catch |err| {
- Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)});
- Output.flush();
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- };
- this.last_read_result = 0;
- return @intCast(c_int, read_len);
- },
+ // If the last Read() failed, report the error.
+ if (socket_recv_len < 0) {
+ if (extremely_verbose) Output.prettyErrorln("Unexpected ssl error: {d}", .{socket_recv_len});
+ return -1;
}
- unreachable;
+
+ const socket_recv_len_ = @intCast(u32, socket_recv_len);
+
+ // Report the result of the last Read() if non-empty.
+ if (!(bio_read_offset < socket_recv_len_)) return 0;
+ const len = @minimum(len__, socket_recv_len_ - bio_read_offset);
+ var data = @ptrCast([*]const u8, &this.recv_buffer.?.data[bio_read_offset]);
+ @memcpy(ptr, data, len);
+ bio_read_offset += len;
+
+ if (bio_read_offset == socket_recv_len_) {
+ // The read buffer is empty.
+ bio_read_offset = 0;
+ socket_recv_len = 0;
+
+ if (this.recv_buffer) |buf| {
+ buf.release();
+ this.recv_buffer = null;
+ }
+ }
+
+ return @intCast(c_int, len);
}
+
+ // https://chromium.googlesource.com/chromium/src/+/refs/heads/main/net/socket/socket_bio_adapter.cc#376
pub fn ctrl(_: *boring.BIO, cmd: c_int, _: c_long, _: ?*anyopaque) callconv(.C) c_long {
return switch (cmd) {
- boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0,
- else => 1,
+ // The SSL stack requires BIOs handle BIO_flush.
+ boring.BIO_CTRL_FLUSH => 1,
+ else => 0,
};
}
};
diff --git a/src/http/async_message.zig b/src/http/async_message.zig
index c1c11b109..d68c58cb0 100644
--- a/src/http/async_message.zig
+++ b/src/http/async_message.zig
@@ -3,7 +3,8 @@ const ObjectPool = @import("../pool.zig").ObjectPool;
const AsyncIO = @import("io");
pub const buffer_pool_len = std.math.maxInt(u16) - 64;
-pub const BufferPool = ObjectPool([buffer_pool_len]u8, null, false);
+pub const BufferPoolBytes = [buffer_pool_len]u8;
+pub const BufferPool = ObjectPool(BufferPoolBytes, null, false);
const AsyncMessage = @This();