diff options
author | 2022-01-25 19:58:59 -0800 | |
---|---|---|
committer | 2022-01-25 20:00:23 -0800 | |
commit | 9db4d195a7c6c777c37ba72675de08f5179aa5ee (patch) | |
tree | 1b7ce4e5422d3e74d7a137d84bbbd51329f579a8 | |
parent | 0808f293756e3cb61a814091cbde03090ee165fb (diff) | |
download | bun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.tar.gz bun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.tar.zst bun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.zip |
Split http into files
Diffstat (limited to '')
-rw-r--r-- | src/http/async_bio.zig | 294 | ||||
-rw-r--r-- | src/http/async_message.zig | 112 | ||||
-rw-r--r-- | src/http/async_socket.zig | 520 | ||||
-rw-r--r-- | src/http/header_builder.zig | 42 | ||||
-rw-r--r-- | src/http/headers.zig | 8 | ||||
-rw-r--r-- | src/http/zlib.zig | 65 | ||||
-rw-r--r-- | src/http_client_async.zig | 1093 |
7 files changed, 1055 insertions, 1079 deletions
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig new file mode 100644 index 000000000..5e86da949 --- /dev/null +++ b/src/http/async_bio.zig @@ -0,0 +1,294 @@ +const boring = @import("boringssl"); +const std = @import("std"); +const AsyncIO = @import("io"); +const Completion = AsyncIO.Completion; +const AsyncMessage = @import("./async_message.zig"); +const AsyncBIO = @This(); +const Output = @import("../global.zig").Output; +const extremely_verbose = @import("../http_client_async.zig").extremely_verbose; +const SOCKET_FLAGS = @import("../http_client_async.zig").SOCKET_FLAGS; +const getAllocator = @import("../http_client_async.zig").getAllocator; + +bio: *boring.BIO = undefined, +socket_fd: std.os.socket_t = 0, +allocator: std.mem.Allocator, + +read_buf_len: usize = 0, + +read_wait: Wait = Wait.pending, +send_wait: Wait = Wait.pending, +recv_completion: AsyncIO.Completion = undefined, +send_completion: AsyncIO.Completion = undefined, + +write_buffer: ?*AsyncMessage = null, + +last_send_result: AsyncIO.SendError!usize = 0, + +last_read_result: AsyncIO.RecvError!usize = 0, +next: ?*AsyncBIO = null, +pending_frame: PendingFrame = PendingFrame.init(), + +pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 }); + +pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void { + this.pending_frame.writeItem(frame) catch {}; +} + +pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe { + return this.pending_frame.readItem(); +} + +var method: ?*boring.BIO_METHOD = null; +var head: ?*AsyncBIO = null; + +const async_bio_name: [:0]const u8 = "AsyncBIO"; + +const Wait = enum { + pending, + suspended, + completed, +}; + +fn instance(allocator: std.mem.Allocator) *AsyncBIO { + if (head) |head_| { + var next = head_.next; + var ret = head_; + ret.read_wait = .pending; + ret.send_wait = .pending; + head = next; + + ret.pending_frame = PendingFrame.init(); + return ret; + } + + var bio = allocator.create(AsyncBIO) catch unreachable; + bio.* = AsyncBIO{ + .allocator = allocator, + .read_wait = .pending, + .send_wait = .pending, + }; + + return bio; +} + +pub fn release(this: *AsyncBIO) void { + if (head) |head_| { + this.next = head_; + } + + this.read_wait = .pending; + this.last_read_result = 0; + this.send_wait = .pending; + this.last_read_result = 0; + this.pending_frame = PendingFrame.init(); + + if (this.write_buffer) |write| { + write.release(); + this.write_buffer = null; + } + + head = this; +} + +pub fn init(allocator: std.mem.Allocator) !*AsyncBIO { + var bio = instance(allocator); + + bio.bio = boring.BIO_new( + method orelse brk: { + method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl); + break :brk method.?; + }, + ) orelse return error.OutOfMemory; + + _ = boring.BIO_set_data(bio.bio, bio); + return bio; +} + +const WaitResult = enum { + none, + read, + send, +}; + +const Sender = struct { + pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { + this.last_send_result = result; + this.send_wait = .completed; + this.write_buffer.?.sent += @truncate(u32, result catch 0); + + if (extremely_verbose) { + const read_result = result catch @as(usize, 999); + Output.prettyErrorln("onSend: {d}", .{read_result}); + Output.flush(); + } + + if (this.pending_frame.readItem()) |frame| { + resume frame; + } + } +}; + +pub fn enqueueSend( + self: *AsyncBIO, +) void { + if (self.write_buffer == null) return; + var to_write = self.write_buffer.?.slice(); + if (to_write.len == 0) { + return; + } + + self.last_send_result = 0; + + AsyncIO.global.send( + *AsyncBIO, + self, + Sender.onSend, + &self.send_completion, + self.socket_fd, + to_write, + SOCKET_FLAGS, + ); + self.send_wait = .suspended; + if (extremely_verbose) { + Output.prettyErrorln("enqueueSend: {d}", .{to_write.len}); + Output.flush(); + } +} + +const Reader = struct { + pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void { + this.last_read_result = result; + this.read_wait = .completed; + if (extremely_verbose) { + const read_result = result catch @as(usize, 999); + Output.prettyErrorln("onRead: {d}", .{read_result}); + Output.flush(); + } + if (this.pending_frame.readItem()) |frame| { + resume frame; + } + } +}; + +pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void { + var read_buffer = read_buf[off..]; + if (read_buffer.len == 0) { + return; + } + + self.last_read_result = 0; + AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.recv_completion, self.socket_fd, read_buffer); + self.read_wait = .suspended; + if (extremely_verbose) { + Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len}); + Output.flush(); + } +} + +pub const Bio = struct { + inline fn cast(bio: *boring.BIO) *AsyncBIO { + return @ptrCast(*AsyncBIO, @alignCast(@alignOf(*AsyncBIO), boring.BIO_get_data(bio))); + } + + pub fn create(this_bio: *boring.BIO) callconv(.C) c_int { + boring.BIO_set_init(this_bio, 1); + return 1; + } + pub fn destroy(this_bio: *boring.BIO) callconv(.C) c_int { + boring.BIO_set_init(this_bio, 0); + + if (boring.BIO_get_data(this_bio) != null) { + var this = cast(this_bio); + this.release(); + } + + return 0; + } + pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int { + std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); + + var buf = ptr[0..@intCast(usize, len)]; + boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); + + if (len <= 0) { + return 0; + } + + var this = cast(this_bio); + if (this.read_wait == .suspended) { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + } + + switch (this.send_wait) { + .pending => { + var write_buffer = this.write_buffer orelse brk: { + this.write_buffer = AsyncMessage.get(getAllocator()); + break :brk this.write_buffer.?; + }; + + _ = write_buffer.writeAll(buf); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + + return -1; + }, + .suspended => { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + + return -1; + }, + .completed => { + this.send_wait = .pending; + const written = this.last_send_result catch |err| { + Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); + Output.flush(); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }; + this.last_send_result = 0; + return @intCast(c_int, written); + }, + } + + unreachable; + } + + pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int { + std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); + var this = cast(this_bio); + + var buf = ptr[0..@maximum(@intCast(usize, len), this.read_buf_len)]; + + boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); + + switch (this.read_wait) { + .pending => { + this.enqueueRead(buf, 0); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }, + .suspended => { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }, + .completed => { + this.read_wait = .pending; + const read_len = this.last_read_result catch |err| { + Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); + Output.flush(); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }; + this.last_read_result = 0; + return @intCast(c_int, read_len); + }, + } + unreachable; + } + pub fn ctrl(_: *boring.BIO, cmd: c_int, _: c_long, _: ?*anyopaque) callconv(.C) c_long { + return switch (cmd) { + boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0, + else => 1, + }; + } +}; diff --git a/src/http/async_message.zig b/src/http/async_message.zig new file mode 100644 index 000000000..c1c11b109 --- /dev/null +++ b/src/http/async_message.zig @@ -0,0 +1,112 @@ +const std = @import("std"); +const ObjectPool = @import("../pool.zig").ObjectPool; +const AsyncIO = @import("io"); + +pub const buffer_pool_len = std.math.maxInt(u16) - 64; +pub const BufferPool = ObjectPool([buffer_pool_len]u8, null, false); + +const AsyncMessage = @This(); + +used: u32 = 0, +sent: u32 = 0, +completion: AsyncIO.Completion = undefined, +buf: []u8 = undefined, +pooled: ?*BufferPool.Node = null, +allocator: std.mem.Allocator, +next: ?*AsyncMessage = null, +context: *anyopaque = undefined, +released: bool = false, + +var _first_ssl: ?*AsyncMessage = null; + +pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage { + if (_first_ssl) |first| { + var prev = first; + + std.debug.assert(prev.released); + if (prev.next) |next| { + _first_ssl = next; + prev.next = null; + } else { + _first_ssl = null; + } + prev.released = false; + + return prev; + } + + var msg = allocator.create(AsyncMessage) catch unreachable; + msg.* = AsyncMessage{ + .allocator = allocator, + .pooled = null, + .buf = &[_]u8{}, + }; + return msg; +} + +var _first: ?*AsyncMessage = null; +pub fn get(allocator: std.mem.Allocator) *AsyncMessage { + if (_first) |first| { + var prev = first; + std.debug.assert(prev.released); + prev.released = false; + + if (first.next) |next| { + _first = next; + prev.next = null; + return prev; + } else { + _first = null; + } + + return prev; + } + + var msg = allocator.create(AsyncMessage) catch unreachable; + var pooled = BufferPool.get(allocator); + msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled }; + return msg; +} + +pub fn release(self: *AsyncMessage) void { + self.used = 0; + self.sent = 0; + if (self.released) return; + self.released = true; + + if (self.pooled != null) { + var old = _first; + _first = self; + self.next = old; + } else { + var old = _first_ssl; + self.next = old; + _first_ssl = self; + } +} + +const WriteResponse = struct { + written: u32 = 0, + overflow: bool = false, +}; + +pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse { + var remain = this.buf[this.used..]; + var writable = buffer[0..@minimum(buffer.len, remain.len)]; + if (writable.len == 0) { + return .{ .written = 0, .overflow = buffer.len > 0 }; + } + + std.mem.copy(u8, remain, writable); + this.used += @intCast(u16, writable.len); + + return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len }; +} + +pub inline fn slice(this: *const AsyncMessage) []const u8 { + return this.buf[0..this.used][this.sent..]; +} + +pub inline fn available(this: *AsyncMessage) []u8 { + return this.buf[0 .. this.buf.len - this.used]; +} diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig new file mode 100644 index 000000000..83ee904aa --- /dev/null +++ b/src/http/async_socket.zig @@ -0,0 +1,520 @@ +const boring = @import("boringssl"); +const std = @import("std"); +const AsyncIO = @import("io"); +const AsyncMessage = @import("./async_message.zig"); +const AsyncBIO = @import("./async_bio.zig"); +const Completion = AsyncIO.Completion; +const AsyncSocket = @This(); + +const Output = @import("../global.zig").Output; +const NetworkThread = @import("../network_thread.zig"); +const Environment = @import("../global.zig").Environment; + +const extremely_verbose = @import("../http_client_async.zig").extremely_verbose; +const SOCKET_FLAGS: u32 = @import("../http_client_async.zig").SOCKET_FLAGS; +const getAllocator = @import("../http_client_async.zig").getAllocator; +const OPEN_SOCKET_FLAGS: u32 = @import("../http_client_async.zig").OPEN_SOCKET_FLAGS; + +io: *AsyncIO = undefined, +socket: std.os.socket_t = 0, +head: *AsyncMessage = undefined, +tail: *AsyncMessage = undefined, +allocator: std.mem.Allocator, +err: ?anyerror = null, +queued: usize = 0, +sent: usize = 0, +send_frame: @Frame(AsyncSocket.send) = undefined, +read_frame: @Frame(AsyncSocket.read) = undefined, +connect_frame: @Frame(AsyncSocket.connectToAddress) = undefined, +close_frame: @Frame(AsyncSocket.close) = undefined, + +read_context: []u8 = undefined, +read_offset: u64 = 0, +read_completion: AsyncIO.Completion = undefined, +connect_completion: AsyncIO.Completion = undefined, +close_completion: AsyncIO.Completion = undefined, + +const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError || error{UnknownHostName}; + +pub fn init(io: *AsyncIO, socket: std.os.socket_t, allocator: std.mem.Allocator) !AsyncSocket { + var head = AsyncMessage.get(allocator); + + return AsyncSocket{ .io = io, .socket = socket, .head = head, .tail = head, .allocator = allocator }; +} + +fn on_connect(this: *AsyncSocket, _: *Completion, err: ConnectError!void) void { + err catch |resolved_err| { + this.err = resolved_err; + }; + + resume this.connect_frame; +} + +fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!void { + const sockfd = AsyncIO.openSocket(address.any.family, OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP) catch |err| { + if (extremely_verbose) { + Output.prettyErrorln("openSocket error: {s}", .{@errorName(err)}); + } + + return error.ConnectionRefused; + }; + + this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address); + suspend { + this.connect_frame = @frame().*; + } + + if (this.err) |e| { + return @errSetCast(ConnectError, e); + } + + this.socket = sockfd; + return; +} + +fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void { + resume this.close_frame; +} + +pub fn close(this: *AsyncSocket) void { + if (this.socket == 0) return; + this.io.close(*AsyncSocket, this, on_close, &this.close_completion, this.socket); + suspend { + this.close_frame = @frame().*; + } + this.socket = 0; +} + +pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void { + this.socket = 0; + outer: while (true) { + // on macOS, getaddrinfo() is very slow + // If you send ~200 network requests, about 1.5s is spent on getaddrinfo() + // So, we cache this. + var address_list = NetworkThread.getAddressList(getAllocator(), name, port) catch |err| { + return @errSetCast(ConnectError, err); + }; + + const list = address_list.address_list; + if (list.addrs.len == 0) return error.ConnectionRefused; + + try_cached_index: { + if (address_list.index) |i| { + const address = list.addrs[i]; + if (address_list.invalidated) continue :outer; + + this.connectToAddress(address) catch |err| { + if (err == error.ConnectionRefused) { + address_list.index = null; + break :try_cached_index; + } + + address_list.invalidate(); + continue :outer; + }; + } + } + + for (list.addrs) |address, i| { + if (address_list.invalidated) continue :outer; + this.connectToAddress(address) catch |err| { + if (err == error.ConnectionRefused) continue; + address_list.invalidate(); + if (err == error.AddressNotAvailable or err == error.UnknownHostName) continue :outer; + return err; + }; + address_list.index = @truncate(u32, i); + return; + } + + if (address_list.invalidated) continue :outer; + + address_list.invalidate(); + return error.ConnectionRefused; + } +} + +fn on_send(msg: *AsyncMessage, _: *Completion, result: SendError!usize) void { + var this = @ptrCast(*AsyncSocket, @alignCast(@alignOf(*AsyncSocket), msg.context)); + const written = result catch |err| { + this.err = err; + resume this.send_frame; + return; + }; + + if (written == 0) { + resume this.send_frame; + return; + } + + msg.sent += @truncate(u16, written); + const has_more = msg.used > msg.sent; + this.sent += written; + + if (has_more) { + this.io.send( + *AsyncMessage, + msg, + on_send, + &msg.completion, + this.socket, + msg.slice(), + SOCKET_FLAGS, + ); + } else { + msg.release(); + } + + // complete + if (this.queued <= this.sent) { + resume this.send_frame; + } +} + +pub fn write(this: *AsyncSocket, buf: []const u8) usize { + this.tail.context = this; + + const resp = this.tail.writeAll(buf); + this.queued += resp.written; + + if (resp.overflow) { + var next = AsyncMessage.get(getAllocator()); + this.tail.next = next; + this.tail = next; + + return @as(usize, resp.written) + this.write(buf[resp.written..]); + } + + return @as(usize, resp.written); +} + +pub const SendError = AsyncIO.SendError; + +pub fn deinit(this: *AsyncSocket) void { + this.head.release(); +} + +pub fn send(this: *AsyncSocket) SendError!usize { + const original_sent = this.sent; + this.head.context = this; + + this.io.send( + *AsyncMessage, + this.head, + on_send, + &this.head.completion, + this.socket, + this.head.slice(), + SOCKET_FLAGS, + ); + + var node = this.head; + while (node.next) |element| { + this.io.send( + *AsyncMessage, + element, + on_send, + &element.completion, + this.socket, + element.slice(), + SOCKET_FLAGS, + ); + node = element.next orelse break; + } + + suspend { + this.send_frame = @frame().*; + } + + if (this.err) |err| { + this.err = null; + return @errSetCast(AsyncSocket.SendError, err); + } + + return this.sent - original_sent; +} + +pub const RecvError = AsyncIO.RecvError; + +const Reader = struct { + pub fn on_read(ctx: *AsyncSocket, _: *AsyncIO.Completion, result: RecvError!usize) void { + const len = result catch |err| { + ctx.err = err; + resume ctx.read_frame; + return; + }; + ctx.read_offset += len; + resume ctx.read_frame; + } +}; + +pub fn read( + this: *AsyncSocket, + bytes: []u8, + offset: u64, +) RecvError!u64 { + this.read_context = bytes; + this.read_offset = offset; + const original_read_offset = this.read_offset; + + this.io.recv( + *AsyncSocket, + this, + Reader.on_read, + &this.read_completion, + this.socket, + bytes, + ); + + suspend { + this.read_frame = @frame().*; + } + + if (this.err) |err| { + this.err = null; + return @errSetCast(RecvError, err); + } + + return this.read_offset - original_read_offset; +} + +pub const SSL = struct { + ssl: *boring.SSL = undefined, + ssl_loaded: bool = false, + socket: AsyncSocket, + handshake_complete: bool = false, + ssl_bio: ?*AsyncBIO = null, + read_bio: ?*AsyncMessage = null, + handshake_frame: @Frame(SSL.handshake) = undefined, + send_frame: @Frame(SSL.send) = undefined, + read_frame: @Frame(SSL.read) = undefined, + hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined, + is_ssl: bool = false, + + const SSLConnectError = ConnectError || HandshakeError; + const HandshakeError = error{OpenSSLError}; + + pub fn connect(this: *SSL, name: []const u8, port: u16) !void { + this.is_ssl = true; + try this.socket.connect(name, port); + + this.handshake_complete = false; + + var ssl = boring.initClient(); + this.ssl = ssl; + this.ssl_loaded = true; + errdefer { + this.ssl_loaded = false; + this.ssl.deinit(); + this.ssl = undefined; + } + + { + std.mem.copy(u8, &this.hostname, name); + this.hostname[name.len] = 0; + var name_ = this.hostname[0..name.len :0]; + ssl.setHostname(name_); + } + + var bio = try AsyncBIO.init(this.socket.allocator); + bio.socket_fd = this.socket.socket; + this.ssl_bio = bio; + + boring.SSL_set_bio(ssl, bio.bio, bio.bio); + + this.read_bio = AsyncMessage.get(this.socket.allocator); + try this.handshake(); + } + + pub fn close(this: *SSL) void { + this.socket.close(); + } + + fn handshake(this: *SSL) HandshakeError!void { + while (!this.ssl.isInitFinished()) { + boring.ERR_clear_error(); + this.ssl_bio.?.enqueueSend(); + const handshake_result = boring.SSL_connect(this.ssl); + if (handshake_result == 0) { + Output.prettyErrorln("ssl accept error", .{}); + Output.flush(); + return error.OpenSSLError; + } + this.handshake_complete = handshake_result == 1 and this.ssl.isInitFinished(); + + if (!this.handshake_complete) { + // accept_result < 0 + const e = boring.SSL_get_error(this.ssl, handshake_result); + if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) { + this.ssl_bio.?.enqueueSend(); + suspend { + this.handshake_frame = @frame().*; + this.ssl_bio.?.pushPendingFrame(&this.handshake_frame); + } + + continue; + } + + Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result }); + Output.flush(); + return error.OpenSSLError; + } + } + } + + pub fn write(this: *SSL, buffer_: []const u8) usize { + var buffer = buffer_; + var read_bio = this.read_bio; + while (buffer.len > 0) { + const response = read_bio.?.writeAll(buffer); + buffer = buffer[response.written..]; + if (response.overflow) { + read_bio = read_bio.?.next orelse brk: { + read_bio.?.next = AsyncMessage.get(this.socket.allocator); + break :brk read_bio.?.next.?; + }; + } + } + + return buffer_.len; + } + + pub fn send(this: *SSL) !usize { + var bio_ = this.read_bio; + var len: usize = 0; + while (bio_) |bio| { + var slice = bio.slice(); + len += this.ssl.write(slice) catch |err| { + switch (err) { + error.WantRead => { + suspend { + this.send_frame = @frame().*; + this.ssl_bio.?.pushPendingFrame(&this.send_frame); + } + continue; + }, + error.WantWrite => { + this.ssl_bio.?.enqueueSend(); + + suspend { + this.send_frame = @frame().*; + this.ssl_bio.?.pushPendingFrame(&this.send_frame); + } + continue; + }, + else => {}, + } + + if (comptime Environment.isDebug) { + Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{ + @errorName(err), + bio.slice(), + }); + Output.flush(); + } + + return err; + }; + + bio_ = bio.next; + } + return len; + } + + pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 { + var buf = buf_[offset..]; + var len: usize = 0; + while (buf.len > 0) { + this.ssl_bio.?.read_buf_len = buf.len; + len = this.ssl.read(buf) catch |err| { + switch (err) { + error.WantWrite => { + this.ssl_bio.?.enqueueSend(); + + if (extremely_verbose) { + Output.prettyErrorln( + "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", + .{ + @errorName(err), + @tagName(this.ssl_bio.?.read_wait), + @tagName(this.ssl_bio.?.send_wait), + }, + ); + Output.flush(); + } + + suspend { + this.read_frame = @frame().*; + this.ssl_bio.?.pushPendingFrame(&this.read_frame); + } + continue; + }, + error.WantRead => { + // this.ssl_bio.enqueueSend(); + + if (extremely_verbose) { + Output.prettyErrorln( + "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", + .{ + @errorName(err), + @tagName(this.ssl_bio.?.read_wait), + @tagName(this.ssl_bio.?.send_wait), + }, + ); + Output.flush(); + } + + suspend { + this.read_frame = @frame().*; + this.ssl_bio.?.pushPendingFrame(&this.read_frame); + } + continue; + }, + else => return err, + } + unreachable; + }; + + break; + } + + return len; + } + + pub inline fn init(allocator: std.mem.Allocator, io: *AsyncIO) !SSL { + return SSL{ + .socket = try AsyncSocket.init(io, 0, allocator), + }; + } + + pub fn deinit(this: *SSL) void { + this.socket.deinit(); + if (!this.is_ssl) return; + + if (this.ssl_bio) |bio| { + _ = boring.BIO_set_data(bio.bio, null); + bio.pending_frame = AsyncBIO.PendingFrame.init(); + bio.socket_fd = 0; + bio.release(); + this.ssl_bio = null; + } + + if (this.ssl_loaded) { + this.ssl.deinit(); + this.ssl_loaded = false; + } + + this.handshake_complete = false; + + if (this.read_bio) |bio| { + var next_ = bio.next; + while (next_) |next| { + next.release(); + next_ = next.next; + } + + bio.release(); + this.read_bio = null; + } + } +}; diff --git a/src/http/header_builder.zig b/src/http/header_builder.zig new file mode 100644 index 000000000..0f33b9d4b --- /dev/null +++ b/src/http/header_builder.zig @@ -0,0 +1,42 @@ +const HeaderBuilder = @This(); +const StringBuilder = @import("../string_builder.zig"); +const Headers = @import("./headers.zig"); +const string = @import("../global.zig").string; +const HTTPClient = @import("../http_client_async.zig"); +const Api = @import("../api/schema.zig").Api; +const std = @import("std"); + +content: StringBuilder = StringBuilder{}, +header_count: u64 = 0, +entries: Headers.Entries = Headers.Entries{}, + +pub fn count(this: *HeaderBuilder, name: string, value: string) void { + this.header_count += 1; + this.content.count(name); + this.content.count(value); +} + +pub fn allocate(this: *HeaderBuilder, allocator: std.mem.Allocator) !void { + try this.content.allocate(allocator); + try this.entries.ensureTotalCapacity(allocator, this.header_count); +} +pub fn append(this: *HeaderBuilder, name: string, value: string) void { + const name_ptr = Api.StringPointer{ + .offset = @truncate(u32, this.content.len), + .length = @truncate(u32, name.len), + }; + + _ = this.content.append(name); + + const value_ptr = Api.StringPointer{ + .offset = @truncate(u32, this.content.len), + .length = @truncate(u32, value.len), + }; + _ = this.content.append(value); + this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr }); +} + +pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void { + client.header_entries = this.entries; + client.header_buf = this.content.ptr.?[0..this.content.len]; +} diff --git a/src/http/headers.zig b/src/http/headers.zig new file mode 100644 index 000000000..4c76f3b4a --- /dev/null +++ b/src/http/headers.zig @@ -0,0 +1,8 @@ +const Api = @import("../api/schema.zig").Api; +const std = @import("std"); + +pub const Kv = struct { + name: Api.StringPointer, + value: Api.StringPointer, +}; +pub const Entries = std.MultiArrayList(Kv); diff --git a/src/http/zlib.zig b/src/http/zlib.zig new file mode 100644 index 000000000..1bd38777d --- /dev/null +++ b/src/http/zlib.zig @@ -0,0 +1,65 @@ +const Lock = @import("../lock.zig").Lock; +const std = @import("std"); +const MutableString = @import("../global.zig").MutableString; +const getAllocator = @import("../http_client_async.zig").getAllocator; +const ZlibPool = @This(); +const Zlib = @import("../zlib.zig"); + +lock: Lock = Lock.init(), +items: std.ArrayList(*MutableString), +allocator: std.mem.Allocator, + +pub var instance: ZlibPool = undefined; +pub var loaded: bool = false; + +pub fn init(allocator: std.mem.Allocator) ZlibPool { + return ZlibPool{ + .allocator = allocator, + .items = std.ArrayList(*MutableString).init(allocator), + }; +} + +pub fn get(this: *ZlibPool) !*MutableString { + switch (this.items.items.len) { + 0 => { + var mutable = try getAllocator().create(MutableString); + mutable.* = try MutableString.init(getAllocator(), 0); + return mutable; + }, + else => { + return this.items.pop(); + }, + } + + unreachable; +} + +pub fn put(this: *ZlibPool, mutable: *MutableString) !void { + mutable.reset(); + try this.items.append(mutable); +} + +pub fn decompress(compressed_data: []const u8, output: *MutableString) Zlib.ZlibError!void { + // Heuristic: if we have more than 128 KB of data to decompress + // it may take 1ms or so + // We must keep the network thread unblocked as often as possible + // So if we have more than 50 KB of data to decompress, we do it off the network thread + // if (compressed_data.len < 50_000) { + var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, getAllocator()); + try reader.readAll(); + return; + // } + + // var task = try DecompressionTask.get(default_allocator); + // defer task.release(); + // task.* = DecompressionTask{ + // .data = compressed_data, + // .output = output, + // .event_fd = AsyncIO.global.eventfd(), + // }; + // task.scheduleAndWait(); + + // if (task.err) |err| { + // return @errSetCast(Zlib.ZlibError, err); + // } +} diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 069908322..54c42418f 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -24,6 +24,11 @@ pub const NetworkThread = @import("./network_thread.zig"); const ObjectPool = @import("./pool.zig").ObjectPool; const SOCK = os.SOCK; const Arena = @import("./mimalloc_arena.zig").Arena; +const AsyncMessage = @import("./http/async_message.zig"); +const AsyncBIO = @import("./http/async_bio.zig"); +const AsyncSocket = @import("./http/async_socket.zig"); +const ZlibPool = @import("./http/zlib.zig"); +const URLBufferPool = ObjectPool([4096]u8, null, false); // This becomes Arena.allocator pub var default_allocator: std.mem.Allocator = undefined; @@ -43,22 +48,20 @@ pub fn onThreadStart() void { NetworkThread.global.pool.io = &AsyncIO.global; } -pub const Headers = struct { - pub const Kv = struct { - name: Api.StringPointer, - value: Api.StringPointer, - }; - pub const Entries = std.MultiArrayList(Kv); -}; +pub inline fn getAllocator() std.mem.Allocator { + return default_allocator; +} + +pub const Headers = @import("./http/headers.zig"); -const SOCKET_FLAGS: u32 = if (Environment.isLinux) +pub const SOCKET_FLAGS: u32 = if (Environment.isLinux) SOCK.CLOEXEC | os.MSG.NOSIGNAL else SOCK.CLOEXEC; -const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC; +pub const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC; -const extremely_verbose = false; +pub const extremely_verbose = false; fn writeRequest( comptime Writer: type, @@ -211,42 +214,7 @@ pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string { return this.header_buf[ptr.offset..][0..ptr.length]; } -pub const HeaderBuilder = struct { - content: StringBuilder = StringBuilder{}, - header_count: u64 = 0, - entries: Headers.Entries = Headers.Entries{}, - - pub fn count(this: *HeaderBuilder, name: string, value: string) void { - this.header_count += 1; - this.content.count(name); - this.content.count(value); - } - - pub fn allocate(this: *HeaderBuilder, allocator: std.mem.Allocator) !void { - try this.content.allocate(allocator); - try this.entries.ensureTotalCapacity(allocator, this.header_count); - } - pub fn append(this: *HeaderBuilder, name: string, value: string) void { - const name_ptr = Api.StringPointer{ - .offset = @truncate(u32, this.content.len), - .length = @truncate(u32, name.len), - }; - - _ = this.content.append(name); - - const value_ptr = Api.StringPointer{ - .offset = @truncate(u32, this.content.len), - .length = @truncate(u32, value.len), - }; - _ = this.content.append(value); - this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr }); - } - - pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void { - client.header_entries = this.entries; - client.header_buf = this.content.ptr.?[0..this.content.len]; - } -}; +pub const HeaderBuilder = @import("./http/header_builder.zig"); pub const HTTPChannel = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 }); // 32 pointers much cheaper than 1000 pointers @@ -466,909 +434,6 @@ pub const AsyncHTTP = struct { } }; -const buffer_pool_len = std.math.maxInt(u16) - 64; -const BufferPool = ObjectPool([buffer_pool_len]u8, null, false); -const URLBufferPool = ObjectPool([4096]u8, null, false); - -pub const AsyncMessage = struct { - used: u32 = 0, - sent: u32 = 0, - completion: AsyncIO.Completion = undefined, - buf: []u8 = undefined, - pooled: ?*BufferPool.Node = null, - allocator: std.mem.Allocator, - next: ?*AsyncMessage = null, - context: *anyopaque = undefined, - released: bool = false, - var _first_ssl: ?*AsyncMessage = null; - pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage { - if (_first_ssl) |first| { - var prev = first; - - std.debug.assert(prev.released); - if (prev.next) |next| { - _first_ssl = next; - prev.next = null; - } else { - _first_ssl = null; - } - prev.released = false; - - return prev; - } - - var msg = allocator.create(AsyncMessage) catch unreachable; - msg.* = AsyncMessage{ - .allocator = allocator, - .pooled = null, - .buf = &[_]u8{}, - }; - return msg; - } - - var _first: ?*AsyncMessage = null; - pub fn get(allocator: std.mem.Allocator) *AsyncMessage { - if (_first) |first| { - var prev = first; - std.debug.assert(prev.released); - prev.released = false; - - if (first.next) |next| { - _first = next; - prev.next = null; - return prev; - } else { - _first = null; - } - - return prev; - } - - var msg = allocator.create(AsyncMessage) catch unreachable; - var pooled = BufferPool.get(allocator); - msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled }; - return msg; - } - - pub fn release(self: *AsyncMessage) void { - self.used = 0; - self.sent = 0; - if (self.released) return; - self.released = true; - - if (self.pooled != null) { - var old = _first; - _first = self; - self.next = old; - } else { - var old = _first_ssl; - self.next = old; - _first_ssl = self; - } - } - - const WriteResponse = struct { - written: u32 = 0, - overflow: bool = false, - }; - - pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse { - var remain = this.buf[this.used..]; - var writable = buffer[0..@minimum(buffer.len, remain.len)]; - if (writable.len == 0) { - return .{ .written = 0, .overflow = buffer.len > 0 }; - } - - std.mem.copy(u8, remain, writable); - this.used += @intCast(u16, writable.len); - - return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len }; - } - - pub inline fn slice(this: *const AsyncMessage) []const u8 { - return this.buf[0..this.used][this.sent..]; - } - - pub inline fn available(this: *AsyncMessage) []u8 { - return this.buf[0 .. this.buf.len - this.used]; - } -}; - -const Completion = AsyncIO.Completion; - -const AsyncSocket = struct { - const This = @This(); - io: *AsyncIO = undefined, - socket: std.os.socket_t = 0, - head: *AsyncMessage = undefined, - tail: *AsyncMessage = undefined, - allocator: std.mem.Allocator, - err: ?anyerror = null, - queued: usize = 0, - sent: usize = 0, - send_frame: @Frame(AsyncSocket.send) = undefined, - read_frame: @Frame(AsyncSocket.read) = undefined, - connect_frame: @Frame(AsyncSocket.connectToAddress) = undefined, - close_frame: @Frame(AsyncSocket.close) = undefined, - - read_context: []u8 = undefined, - read_offset: u64 = 0, - read_completion: AsyncIO.Completion = undefined, - connect_completion: AsyncIO.Completion = undefined, - close_completion: AsyncIO.Completion = undefined, - - const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError || error{ - UnknownHostName, - ConnectionRefused, - AddressNotAvailable, - }; - - pub fn init(io: *AsyncIO, socket: std.os.socket_t, allocator: std.mem.Allocator) !AsyncSocket { - var head = AsyncMessage.get(allocator); - - return AsyncSocket{ .io = io, .socket = socket, .head = head, .tail = head, .allocator = allocator }; - } - - fn on_connect(this: *AsyncSocket, _: *Completion, err: ConnectError!void) void { - err catch |resolved_err| { - this.err = resolved_err; - }; - - resume this.connect_frame; - } - - fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!void { - const sockfd = AsyncIO.openSocket(address.any.family, OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP) catch |err| { - if (extremely_verbose) { - Output.prettyErrorln("openSocket error: {s}", .{@errorName(err)}); - } - - return error.ConnectionRefused; - }; - - this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address); - suspend { - this.connect_frame = @frame().*; - } - - if (this.err) |e| { - return @errSetCast(ConnectError, e); - } - - this.socket = sockfd; - return; - } - - fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void { - resume this.close_frame; - } - - pub fn close(this: *AsyncSocket) void { - if (this.socket == 0) return; - this.io.close(*AsyncSocket, this, on_close, &this.close_completion, this.socket); - suspend { - this.close_frame = @frame().*; - } - this.socket = 0; - } - - pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void { - this.socket = 0; - outer: while (true) { - // on macOS, getaddrinfo() is very slow - // If you send ~200 network requests, about 1.5s is spent on getaddrinfo() - // So, we cache this. - var address_list = NetworkThread.getAddressList(default_allocator, name, port) catch |err| { - return @errSetCast(ConnectError, err); - }; - - const list = address_list.address_list; - if (list.addrs.len == 0) return error.ConnectionRefused; - - try_cached_index: { - if (address_list.index) |i| { - const address = list.addrs[i]; - if (address_list.invalidated) continue :outer; - - this.connectToAddress(address) catch |err| { - if (err == error.ConnectionRefused) { - address_list.index = null; - break :try_cached_index; - } - - address_list.invalidate(); - continue :outer; - }; - } - } - - for (list.addrs) |address, i| { - if (address_list.invalidated) continue :outer; - this.connectToAddress(address) catch |err| { - if (err == error.ConnectionRefused) continue; - address_list.invalidate(); - if (err == error.AddressNotAvailable or err == error.UnknownHostName) continue :outer; - return err; - }; - address_list.index = @truncate(u32, i); - return; - } - - if (address_list.invalidated) continue :outer; - - address_list.invalidate(); - return error.ConnectionRefused; - } - } - - fn on_send(msg: *AsyncMessage, _: *Completion, result: SendError!usize) void { - var this = @ptrCast(*AsyncSocket, @alignCast(@alignOf(*AsyncSocket), msg.context)); - const written = result catch |err| { - this.err = err; - resume this.send_frame; - return; - }; - - if (written == 0) { - resume this.send_frame; - return; - } - - msg.sent += @truncate(u16, written); - const has_more = msg.used > msg.sent; - this.sent += written; - - if (has_more) { - this.io.send( - *AsyncMessage, - msg, - on_send, - &msg.completion, - this.socket, - msg.slice(), - SOCKET_FLAGS, - ); - } else { - msg.release(); - } - - // complete - if (this.queued <= this.sent) { - resume this.send_frame; - } - } - - pub fn write(this: *AsyncSocket, buf: []const u8) usize { - this.tail.context = this; - - const resp = this.tail.writeAll(buf); - this.queued += resp.written; - - if (resp.overflow) { - var next = AsyncMessage.get(default_allocator); - this.tail.next = next; - this.tail = next; - - return @as(usize, resp.written) + this.write(buf[resp.written..]); - } - - return @as(usize, resp.written); - } - - pub const SendError = AsyncIO.SendError; - - pub fn deinit(this: *AsyncSocket) void { - this.head.release(); - } - - pub fn send(this: *This) SendError!usize { - const original_sent = this.sent; - this.head.context = this; - - this.io.send( - *AsyncMessage, - this.head, - on_send, - &this.head.completion, - this.socket, - this.head.slice(), - SOCKET_FLAGS, - ); - - var node = this.head; - while (node.next) |element| { - this.io.send( - *AsyncMessage, - element, - on_send, - &element.completion, - this.socket, - element.slice(), - SOCKET_FLAGS, - ); - node = element.next orelse break; - } - - suspend { - this.send_frame = @frame().*; - } - - if (this.err) |err| { - this.err = null; - return @errSetCast(AsyncSocket.SendError, err); - } - - return this.sent - original_sent; - } - - pub const RecvError = AsyncIO.RecvError; - - const Reader = struct { - pub fn on_read(ctx: *AsyncSocket, _: *AsyncIO.Completion, result: RecvError!usize) void { - const len = result catch |err| { - ctx.err = err; - resume ctx.read_frame; - return; - }; - ctx.read_offset += len; - resume ctx.read_frame; - } - }; - - pub fn read( - this: *AsyncSocket, - bytes: []u8, - offset: u64, - ) RecvError!u64 { - this.read_context = bytes; - this.read_offset = offset; - const original_read_offset = this.read_offset; - - this.io.recv( - *AsyncSocket, - this, - Reader.on_read, - &this.read_completion, - this.socket, - bytes, - ); - - suspend { - this.read_frame = @frame().*; - } - - if (this.err) |err| { - this.err = null; - return @errSetCast(RecvError, err); - } - - return this.read_offset - original_read_offset; - } - - pub const SSL = struct { - ssl: *boring.SSL = undefined, - ssl_loaded: bool = false, - socket: AsyncSocket, - handshake_complete: bool = false, - ssl_bio: ?*AsyncBIO = null, - read_bio: ?*AsyncMessage = null, - handshake_frame: @Frame(SSL.handshake) = undefined, - send_frame: @Frame(SSL.send) = undefined, - read_frame: @Frame(SSL.read) = undefined, - hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined, - is_ssl: bool = false, - - const SSLConnectError = ConnectError || HandshakeError; - const HandshakeError = error{OpenSSLError}; - - pub fn connect(this: *SSL, name: []const u8, port: u16) !void { - this.is_ssl = true; - try this.socket.connect(name, port); - - this.handshake_complete = false; - - var ssl = boring.initClient(); - this.ssl = ssl; - this.ssl_loaded = true; - errdefer { - this.ssl_loaded = false; - this.ssl.deinit(); - this.ssl = undefined; - } - - { - std.mem.copy(u8, &this.hostname, name); - this.hostname[name.len] = 0; - var name_ = this.hostname[0..name.len :0]; - ssl.setHostname(name_); - } - - var bio = try AsyncBIO.init(this.socket.allocator); - bio.socket_fd = this.socket.socket; - this.ssl_bio = bio; - - boring.SSL_set_bio(ssl, bio.bio, bio.bio); - - this.read_bio = AsyncMessage.get(this.socket.allocator); - try this.handshake(); - } - - pub fn close(this: *SSL) void { - this.socket.close(); - } - - fn handshake(this: *SSL) HandshakeError!void { - while (!this.ssl.isInitFinished()) { - boring.ERR_clear_error(); - this.ssl_bio.?.enqueueSend(); - const handshake_result = boring.SSL_connect(this.ssl); - if (handshake_result == 0) { - Output.prettyErrorln("ssl accept error", .{}); - Output.flush(); - return error.OpenSSLError; - } - this.handshake_complete = handshake_result == 1 and this.ssl.isInitFinished(); - - if (!this.handshake_complete) { - // accept_result < 0 - const e = boring.SSL_get_error(this.ssl, handshake_result); - if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) { - this.ssl_bio.?.enqueueSend(); - suspend { - this.handshake_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.handshake_frame); - } - - continue; - } - - Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result }); - Output.flush(); - return error.OpenSSLError; - } - } - } - - pub fn write(this: *SSL, buffer_: []const u8) usize { - var buffer = buffer_; - var read_bio = this.read_bio; - while (buffer.len > 0) { - const response = read_bio.?.writeAll(buffer); - buffer = buffer[response.written..]; - if (response.overflow) { - read_bio = read_bio.?.next orelse brk: { - read_bio.?.next = AsyncMessage.get(this.socket.allocator); - break :brk read_bio.?.next.?; - }; - } - } - - return buffer_.len; - } - - pub fn send(this: *SSL) !usize { - var bio_ = this.read_bio; - var len: usize = 0; - while (bio_) |bio| { - var slice = bio.slice(); - len += this.ssl.write(slice) catch |err| { - switch (err) { - error.WantRead => { - suspend { - this.send_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.send_frame); - } - continue; - }, - error.WantWrite => { - this.ssl_bio.?.enqueueSend(); - - suspend { - this.send_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.send_frame); - } - continue; - }, - else => {}, - } - - if (comptime Environment.isDebug) { - Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{ - @errorName(err), - bio.slice(), - }); - Output.flush(); - } - - return err; - }; - - bio_ = bio.next; - } - return len; - } - - pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 { - var buf = buf_[offset..]; - var len: usize = 0; - while (buf.len > 0) { - len = this.ssl.read(buf) catch |err| { - switch (err) { - error.WantWrite => { - this.ssl_bio.?.enqueueSend(); - - if (extremely_verbose) { - Output.prettyErrorln( - "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", - .{ - @errorName(err), - @tagName(this.ssl_bio.?.read_wait), - @tagName(this.ssl_bio.?.send_wait), - }, - ); - Output.flush(); - } - - suspend { - this.read_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.read_frame); - } - continue; - }, - error.WantRead => { - // this.ssl_bio.enqueueSend(); - - if (extremely_verbose) { - Output.prettyErrorln( - "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", - .{ - @errorName(err), - @tagName(this.ssl_bio.?.read_wait), - @tagName(this.ssl_bio.?.send_wait), - }, - ); - Output.flush(); - } - - suspend { - this.read_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.read_frame); - } - continue; - }, - else => return err, - } - unreachable; - }; - - break; - } - - return len; - } - - pub inline fn init(allocator: std.mem.Allocator, io: *AsyncIO) !SSL { - return SSL{ - .socket = try AsyncSocket.init(io, 0, allocator), - }; - } - - pub fn deinit(this: *SSL) void { - this.socket.deinit(); - if (!this.is_ssl) return; - - if (this.ssl_bio) |bio| { - _ = boring.BIO_set_data(bio.bio, null); - bio.pending_frame = AsyncBIO.PendingFrame.init(); - bio.socket_fd = 0; - bio.release(); - this.ssl_bio = null; - } - - if (this.ssl_loaded) { - this.ssl.deinit(); - this.ssl_loaded = false; - } - - this.handshake_complete = false; - - if (this.read_bio) |bio| { - var next_ = bio.next; - while (next_) |next| { - next.release(); - next_ = next.next; - } - - bio.release(); - this.read_bio = null; - } - } - }; -}; - -pub const AsyncBIO = struct { - bio: *boring.BIO = undefined, - socket_fd: std.os.socket_t = 0, - allocator: std.mem.Allocator, - - read_wait: Wait = Wait.pending, - send_wait: Wait = Wait.pending, - recv_completion: AsyncIO.Completion = undefined, - send_completion: AsyncIO.Completion = undefined, - - write_buffer: ?*AsyncMessage = null, - - last_send_result: AsyncIO.SendError!usize = 0, - - last_read_result: AsyncIO.RecvError!usize = 0, - next: ?*AsyncBIO = null, - pending_frame: PendingFrame = PendingFrame.init(), - - pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 }); - - pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void { - this.pending_frame.writeItem(frame) catch {}; - } - - pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe { - return this.pending_frame.readItem(); - } - - var method: ?*boring.BIO_METHOD = null; - var head: ?*AsyncBIO = null; - - const async_bio_name: [:0]const u8 = "AsyncBIO"; - - const Wait = enum { - pending, - suspended, - completed, - }; - - fn instance(allocator: std.mem.Allocator) *AsyncBIO { - if (head) |head_| { - var next = head_.next; - var ret = head_; - ret.read_wait = .pending; - ret.send_wait = .pending; - head = next; - - ret.pending_frame = PendingFrame.init(); - return ret; - } - - var bio = allocator.create(AsyncBIO) catch unreachable; - bio.* = AsyncBIO{ - .allocator = allocator, - .read_wait = .pending, - .send_wait = .pending, - }; - - return bio; - } - - pub fn release(this: *AsyncBIO) void { - if (head) |head_| { - this.next = head_; - } - - this.read_wait = .pending; - this.last_read_result = 0; - this.send_wait = .pending; - this.last_read_result = 0; - this.pending_frame = PendingFrame.init(); - - if (this.write_buffer) |write| { - write.release(); - this.write_buffer = null; - } - - head = this; - } - - pub fn init(allocator: std.mem.Allocator) !*AsyncBIO { - var bio = instance(allocator); - - bio.bio = boring.BIO_new( - method orelse brk: { - method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl); - break :brk method.?; - }, - ) orelse return error.OutOfMemory; - - _ = boring.BIO_set_data(bio.bio, bio); - return bio; - } - - const WaitResult = enum { - none, - read, - send, - }; - - const Sender = struct { - pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { - this.last_send_result = result; - this.send_wait = .completed; - this.write_buffer.?.sent += @truncate(u32, result catch 0); - - if (extremely_verbose) { - const read_result = result catch @as(usize, 999); - Output.prettyErrorln("onSend: {d}", .{read_result}); - Output.flush(); - } - - if (this.pending_frame.readItem()) |frame| { - resume frame; - } - } - }; - - pub fn enqueueSend( - self: *AsyncBIO, - ) void { - if (self.write_buffer == null) return; - var to_write = self.write_buffer.?.slice(); - if (to_write.len == 0) { - return; - } - - self.last_send_result = 0; - - AsyncIO.global.send( - *AsyncBIO, - self, - Sender.onSend, - &self.send_completion, - self.socket_fd, - to_write, - SOCKET_FLAGS, - ); - self.send_wait = .suspended; - if (extremely_verbose) { - Output.prettyErrorln("enqueueSend: {d}", .{to_write.len}); - Output.flush(); - } - } - - const Reader = struct { - pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void { - this.last_read_result = result; - this.read_wait = .completed; - if (extremely_verbose) { - const read_result = result catch @as(usize, 999); - Output.prettyErrorln("onRead: {d}", .{read_result}); - Output.flush(); - } - if (this.pending_frame.readItem()) |frame| { - resume frame; - } - } - }; - - pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void { - var read_buffer = read_buf[off..]; - if (read_buffer.len == 0) { - return; - } - - self.last_read_result = 0; - AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.recv_completion, self.socket_fd, read_buffer); - self.read_wait = .suspended; - if (extremely_verbose) { - Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len}); - Output.flush(); - } - } - - pub const Bio = struct { - inline fn cast(bio: *boring.BIO) *AsyncBIO { - return @ptrCast(*AsyncBIO, @alignCast(@alignOf(*AsyncBIO), boring.BIO_get_data(bio))); - } - - pub fn create(this_bio: *boring.BIO) callconv(.C) c_int { - boring.BIO_set_init(this_bio, 1); - return 1; - } - pub fn destroy(this_bio: *boring.BIO) callconv(.C) c_int { - boring.BIO_set_init(this_bio, 0); - - if (boring.BIO_get_data(this_bio) != null) { - var this = cast(this_bio); - this.release(); - } - - return 0; - } - pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int { - std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); - - var buf = ptr[0..@intCast(usize, len)]; - boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); - - if (len <= 0) { - return 0; - } - - var this = cast(this_bio); - if (this.read_wait == .suspended) { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - } - - switch (this.send_wait) { - .pending => { - var write_buffer = this.write_buffer orelse brk: { - this.write_buffer = AsyncMessage.get(default_allocator); - break :brk this.write_buffer.?; - }; - - _ = write_buffer.writeAll(buf); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - - return -1; - }, - .suspended => { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - - return -1; - }, - .completed => { - this.send_wait = .pending; - const written = this.last_send_result catch |err| { - Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); - Output.flush(); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }; - this.last_send_result = 0; - return @intCast(c_int, written); - }, - } - - unreachable; - } - - pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int { - std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); - var buf = ptr[0..@intCast(usize, len)]; - - boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); - var this = cast(this_bio); - - switch (this.read_wait) { - .pending => { - this.enqueueRead(buf, 0); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }, - .suspended => { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }, - .completed => { - this.read_wait = .pending; - const read_len = this.last_read_result catch |err| { - Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); - Output.flush(); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }; - this.last_read_result = 0; - return @intCast(c_int, read_len); - }, - } - unreachable; - } - pub fn ctrl(_: *boring.BIO, cmd: c_int, _: c_long, _: ?*anyopaque) callconv(.C) c_long { - return switch (cmd) { - boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0, - else => 1, - }; - } - }; -}; - pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { var header_count: usize = 0; var header_entries = this.header_entries.slice(); @@ -1477,7 +542,6 @@ pub fn connect( try connector.connect(this.url.hostname, port); var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) }; - client.setReadBufferSize(buffer_pool_len) catch {}; // client.setQuickACK(true) catch {}; this.tcp_client = client; @@ -1564,135 +628,6 @@ pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableStrin } } -const ZlibPool = struct { - lock: Lock = Lock.init(), - items: std.ArrayList(*MutableString), - allocator: std.mem.Allocator, - pub var instance: ZlibPool = undefined; - pub var loaded: bool = false; - pub var decompression_thread_pool: ThreadPool = undefined; - pub var decompression_thread_pool_loaded: bool = false; - - pub fn init(allocator: std.mem.Allocator) ZlibPool { - return ZlibPool{ - .allocator = allocator, - .items = std.ArrayList(*MutableString).init(allocator), - }; - } - - pub fn get(this: *ZlibPool) !*MutableString { - switch (this.items.items.len) { - 0 => { - var mutable = try default_allocator.create(MutableString); - mutable.* = try MutableString.init(default_allocator, 0); - return mutable; - }, - else => { - return this.items.pop(); - }, - } - - unreachable; - } - - pub fn put(this: *ZlibPool, mutable: *MutableString) !void { - mutable.reset(); - try this.items.append(mutable); - } - - pub fn decompress(compressed_data: []const u8, output: *MutableString) Zlib.ZlibError!void { - // Heuristic: if we have more than 128 KB of data to decompress - // it may take 1ms or so - // We must keep the network thread unblocked as often as possible - // So if we have more than 50 KB of data to decompress, we do it off the network thread - // if (compressed_data.len < 50_000) { - var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, default_allocator); - try reader.readAll(); - return; - // } - - // var task = try DecompressionTask.get(default_allocator); - // defer task.release(); - // task.* = DecompressionTask{ - // .data = compressed_data, - // .output = output, - // .event_fd = AsyncIO.global.eventfd(), - // }; - // task.scheduleAndWait(); - - // if (task.err) |err| { - // return @errSetCast(Zlib.ZlibError, err); - // } - } - - pub const DecompressionTask = struct { - task: ThreadPool.Task = ThreadPool.Task{ .callback = callback }, - frame: @Frame(scheduleAndWait) = undefined, - data: []const u8, - output: *MutableString = undefined, - completion: Completion = undefined, - event_fd: std.os.fd_t = 0, - err: ?anyerror = null, - next: ?*DecompressionTask = null, - - pub var head: ?*DecompressionTask = null; - - pub fn get(allocator: std.mem.Allocator) !*DecompressionTask { - if (head) |head_| { - var this = head_; - head = this.next; - this.next = null; - return this; - } - - return try allocator.create(DecompressionTask); - } - - pub fn scheduleAndWait(task: *DecompressionTask) void { - if (!decompression_thread_pool_loaded) { - decompression_thread_pool_loaded = true; - decompression_thread_pool = ThreadPool.init(.{ .max_threads = 1 }); - } - - AsyncIO.global.event( - *DecompressionTask, - task, - DecompressionTask.finished, - &task.completion, - task.event_fd, - ); - - suspend { - var batch = ThreadPool.Batch.from(&task.task); - decompression_thread_pool.schedule(batch); - task.frame = @frame().*; - } - } - - pub fn release(this: *DecompressionTask) void { - this.next = head; - head = this; - } - - fn callback_(this: *DecompressionTask) Zlib.ZlibError!void { - var reader = try Zlib.ZlibReaderArrayList.init(this.data, &this.output.list, default_allocator); - try reader.readAll(); - } - - pub fn callback(task: *ThreadPool.Task) void { - var this: *DecompressionTask = @fieldParentPtr(DecompressionTask, "task", task); - this.callback_() catch |err| { - this.err = err; - }; - AsyncIO.triggerEvent(this.event_fd, &this.completion) catch {}; - } - - pub fn finished(this: *DecompressionTask, _: *Completion, _: void) void { - resume this.frame; - } - }; -}; - pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response { defer if (this.verbose) Output.flush(); var response: picohttp.Response = undefined; |