diff options
author | 2022-01-25 19:58:59 -0800 | |
---|---|---|
committer | 2022-01-25 20:00:23 -0800 | |
commit | 9db4d195a7c6c777c37ba72675de08f5179aa5ee (patch) | |
tree | 1b7ce4e5422d3e74d7a137d84bbbd51329f579a8 /src/http_client_async.zig | |
parent | 0808f293756e3cb61a814091cbde03090ee165fb (diff) | |
download | bun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.tar.gz bun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.tar.zst bun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.zip |
Split http into files
Diffstat (limited to '')
-rw-r--r-- | src/http_client_async.zig | 1093 |
1 files changed, 14 insertions, 1079 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 069908322..54c42418f 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -24,6 +24,11 @@ pub const NetworkThread = @import("./network_thread.zig"); const ObjectPool = @import("./pool.zig").ObjectPool; const SOCK = os.SOCK; const Arena = @import("./mimalloc_arena.zig").Arena; +const AsyncMessage = @import("./http/async_message.zig"); +const AsyncBIO = @import("./http/async_bio.zig"); +const AsyncSocket = @import("./http/async_socket.zig"); +const ZlibPool = @import("./http/zlib.zig"); +const URLBufferPool = ObjectPool([4096]u8, null, false); // This becomes Arena.allocator pub var default_allocator: std.mem.Allocator = undefined; @@ -43,22 +48,20 @@ pub fn onThreadStart() void { NetworkThread.global.pool.io = &AsyncIO.global; } -pub const Headers = struct { - pub const Kv = struct { - name: Api.StringPointer, - value: Api.StringPointer, - }; - pub const Entries = std.MultiArrayList(Kv); -}; +pub inline fn getAllocator() std.mem.Allocator { + return default_allocator; +} + +pub const Headers = @import("./http/headers.zig"); -const SOCKET_FLAGS: u32 = if (Environment.isLinux) +pub const SOCKET_FLAGS: u32 = if (Environment.isLinux) SOCK.CLOEXEC | os.MSG.NOSIGNAL else SOCK.CLOEXEC; -const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC; +pub const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC; -const extremely_verbose = false; +pub const extremely_verbose = false; fn writeRequest( comptime Writer: type, @@ -211,42 +214,7 @@ pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string { return this.header_buf[ptr.offset..][0..ptr.length]; } -pub const HeaderBuilder = struct { - content: StringBuilder = StringBuilder{}, - header_count: u64 = 0, - entries: Headers.Entries = Headers.Entries{}, - - pub fn count(this: *HeaderBuilder, name: string, value: string) void { - this.header_count += 1; - this.content.count(name); - this.content.count(value); - } - - pub fn allocate(this: *HeaderBuilder, allocator: std.mem.Allocator) !void { - try this.content.allocate(allocator); - try this.entries.ensureTotalCapacity(allocator, this.header_count); - } - pub fn append(this: *HeaderBuilder, name: string, value: string) void { - const name_ptr = Api.StringPointer{ - .offset = @truncate(u32, this.content.len), - .length = @truncate(u32, name.len), - }; - - _ = this.content.append(name); - - const value_ptr = Api.StringPointer{ - .offset = @truncate(u32, this.content.len), - .length = @truncate(u32, value.len), - }; - _ = this.content.append(value); - this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr }); - } - - pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void { - client.header_entries = this.entries; - client.header_buf = this.content.ptr.?[0..this.content.len]; - } -}; +pub const HeaderBuilder = @import("./http/header_builder.zig"); pub const HTTPChannel = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 }); // 32 pointers much cheaper than 1000 pointers @@ -466,909 +434,6 @@ pub const AsyncHTTP = struct { } }; -const buffer_pool_len = std.math.maxInt(u16) - 64; -const BufferPool = ObjectPool([buffer_pool_len]u8, null, false); -const URLBufferPool = ObjectPool([4096]u8, null, false); - -pub const AsyncMessage = struct { - used: u32 = 0, - sent: u32 = 0, - completion: AsyncIO.Completion = undefined, - buf: []u8 = undefined, - pooled: ?*BufferPool.Node = null, - allocator: std.mem.Allocator, - next: ?*AsyncMessage = null, - context: *anyopaque = undefined, - released: bool = false, - var _first_ssl: ?*AsyncMessage = null; - pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage { - if (_first_ssl) |first| { - var prev = first; - - std.debug.assert(prev.released); - if (prev.next) |next| { - _first_ssl = next; - prev.next = null; - } else { - _first_ssl = null; - } - prev.released = false; - - return prev; - } - - var msg = allocator.create(AsyncMessage) catch unreachable; - msg.* = AsyncMessage{ - .allocator = allocator, - .pooled = null, - .buf = &[_]u8{}, - }; - return msg; - } - - var _first: ?*AsyncMessage = null; - pub fn get(allocator: std.mem.Allocator) *AsyncMessage { - if (_first) |first| { - var prev = first; - std.debug.assert(prev.released); - prev.released = false; - - if (first.next) |next| { - _first = next; - prev.next = null; - return prev; - } else { - _first = null; - } - - return prev; - } - - var msg = allocator.create(AsyncMessage) catch unreachable; - var pooled = BufferPool.get(allocator); - msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled }; - return msg; - } - - pub fn release(self: *AsyncMessage) void { - self.used = 0; - self.sent = 0; - if (self.released) return; - self.released = true; - - if (self.pooled != null) { - var old = _first; - _first = self; - self.next = old; - } else { - var old = _first_ssl; - self.next = old; - _first_ssl = self; - } - } - - const WriteResponse = struct { - written: u32 = 0, - overflow: bool = false, - }; - - pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse { - var remain = this.buf[this.used..]; - var writable = buffer[0..@minimum(buffer.len, remain.len)]; - if (writable.len == 0) { - return .{ .written = 0, .overflow = buffer.len > 0 }; - } - - std.mem.copy(u8, remain, writable); - this.used += @intCast(u16, writable.len); - - return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len }; - } - - pub inline fn slice(this: *const AsyncMessage) []const u8 { - return this.buf[0..this.used][this.sent..]; - } - - pub inline fn available(this: *AsyncMessage) []u8 { - return this.buf[0 .. this.buf.len - this.used]; - } -}; - -const Completion = AsyncIO.Completion; - -const AsyncSocket = struct { - const This = @This(); - io: *AsyncIO = undefined, - socket: std.os.socket_t = 0, - head: *AsyncMessage = undefined, - tail: *AsyncMessage = undefined, - allocator: std.mem.Allocator, - err: ?anyerror = null, - queued: usize = 0, - sent: usize = 0, - send_frame: @Frame(AsyncSocket.send) = undefined, - read_frame: @Frame(AsyncSocket.read) = undefined, - connect_frame: @Frame(AsyncSocket.connectToAddress) = undefined, - close_frame: @Frame(AsyncSocket.close) = undefined, - - read_context: []u8 = undefined, - read_offset: u64 = 0, - read_completion: AsyncIO.Completion = undefined, - connect_completion: AsyncIO.Completion = undefined, - close_completion: AsyncIO.Completion = undefined, - - const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError || error{ - UnknownHostName, - ConnectionRefused, - AddressNotAvailable, - }; - - pub fn init(io: *AsyncIO, socket: std.os.socket_t, allocator: std.mem.Allocator) !AsyncSocket { - var head = AsyncMessage.get(allocator); - - return AsyncSocket{ .io = io, .socket = socket, .head = head, .tail = head, .allocator = allocator }; - } - - fn on_connect(this: *AsyncSocket, _: *Completion, err: ConnectError!void) void { - err catch |resolved_err| { - this.err = resolved_err; - }; - - resume this.connect_frame; - } - - fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!void { - const sockfd = AsyncIO.openSocket(address.any.family, OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP) catch |err| { - if (extremely_verbose) { - Output.prettyErrorln("openSocket error: {s}", .{@errorName(err)}); - } - - return error.ConnectionRefused; - }; - - this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address); - suspend { - this.connect_frame = @frame().*; - } - - if (this.err) |e| { - return @errSetCast(ConnectError, e); - } - - this.socket = sockfd; - return; - } - - fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void { - resume this.close_frame; - } - - pub fn close(this: *AsyncSocket) void { - if (this.socket == 0) return; - this.io.close(*AsyncSocket, this, on_close, &this.close_completion, this.socket); - suspend { - this.close_frame = @frame().*; - } - this.socket = 0; - } - - pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void { - this.socket = 0; - outer: while (true) { - // on macOS, getaddrinfo() is very slow - // If you send ~200 network requests, about 1.5s is spent on getaddrinfo() - // So, we cache this. - var address_list = NetworkThread.getAddressList(default_allocator, name, port) catch |err| { - return @errSetCast(ConnectError, err); - }; - - const list = address_list.address_list; - if (list.addrs.len == 0) return error.ConnectionRefused; - - try_cached_index: { - if (address_list.index) |i| { - const address = list.addrs[i]; - if (address_list.invalidated) continue :outer; - - this.connectToAddress(address) catch |err| { - if (err == error.ConnectionRefused) { - address_list.index = null; - break :try_cached_index; - } - - address_list.invalidate(); - continue :outer; - }; - } - } - - for (list.addrs) |address, i| { - if (address_list.invalidated) continue :outer; - this.connectToAddress(address) catch |err| { - if (err == error.ConnectionRefused) continue; - address_list.invalidate(); - if (err == error.AddressNotAvailable or err == error.UnknownHostName) continue :outer; - return err; - }; - address_list.index = @truncate(u32, i); - return; - } - - if (address_list.invalidated) continue :outer; - - address_list.invalidate(); - return error.ConnectionRefused; - } - } - - fn on_send(msg: *AsyncMessage, _: *Completion, result: SendError!usize) void { - var this = @ptrCast(*AsyncSocket, @alignCast(@alignOf(*AsyncSocket), msg.context)); - const written = result catch |err| { - this.err = err; - resume this.send_frame; - return; - }; - - if (written == 0) { - resume this.send_frame; - return; - } - - msg.sent += @truncate(u16, written); - const has_more = msg.used > msg.sent; - this.sent += written; - - if (has_more) { - this.io.send( - *AsyncMessage, - msg, - on_send, - &msg.completion, - this.socket, - msg.slice(), - SOCKET_FLAGS, - ); - } else { - msg.release(); - } - - // complete - if (this.queued <= this.sent) { - resume this.send_frame; - } - } - - pub fn write(this: *AsyncSocket, buf: []const u8) usize { - this.tail.context = this; - - const resp = this.tail.writeAll(buf); - this.queued += resp.written; - - if (resp.overflow) { - var next = AsyncMessage.get(default_allocator); - this.tail.next = next; - this.tail = next; - - return @as(usize, resp.written) + this.write(buf[resp.written..]); - } - - return @as(usize, resp.written); - } - - pub const SendError = AsyncIO.SendError; - - pub fn deinit(this: *AsyncSocket) void { - this.head.release(); - } - - pub fn send(this: *This) SendError!usize { - const original_sent = this.sent; - this.head.context = this; - - this.io.send( - *AsyncMessage, - this.head, - on_send, - &this.head.completion, - this.socket, - this.head.slice(), - SOCKET_FLAGS, - ); - - var node = this.head; - while (node.next) |element| { - this.io.send( - *AsyncMessage, - element, - on_send, - &element.completion, - this.socket, - element.slice(), - SOCKET_FLAGS, - ); - node = element.next orelse break; - } - - suspend { - this.send_frame = @frame().*; - } - - if (this.err) |err| { - this.err = null; - return @errSetCast(AsyncSocket.SendError, err); - } - - return this.sent - original_sent; - } - - pub const RecvError = AsyncIO.RecvError; - - const Reader = struct { - pub fn on_read(ctx: *AsyncSocket, _: *AsyncIO.Completion, result: RecvError!usize) void { - const len = result catch |err| { - ctx.err = err; - resume ctx.read_frame; - return; - }; - ctx.read_offset += len; - resume ctx.read_frame; - } - }; - - pub fn read( - this: *AsyncSocket, - bytes: []u8, - offset: u64, - ) RecvError!u64 { - this.read_context = bytes; - this.read_offset = offset; - const original_read_offset = this.read_offset; - - this.io.recv( - *AsyncSocket, - this, - Reader.on_read, - &this.read_completion, - this.socket, - bytes, - ); - - suspend { - this.read_frame = @frame().*; - } - - if (this.err) |err| { - this.err = null; - return @errSetCast(RecvError, err); - } - - return this.read_offset - original_read_offset; - } - - pub const SSL = struct { - ssl: *boring.SSL = undefined, - ssl_loaded: bool = false, - socket: AsyncSocket, - handshake_complete: bool = false, - ssl_bio: ?*AsyncBIO = null, - read_bio: ?*AsyncMessage = null, - handshake_frame: @Frame(SSL.handshake) = undefined, - send_frame: @Frame(SSL.send) = undefined, - read_frame: @Frame(SSL.read) = undefined, - hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined, - is_ssl: bool = false, - - const SSLConnectError = ConnectError || HandshakeError; - const HandshakeError = error{OpenSSLError}; - - pub fn connect(this: *SSL, name: []const u8, port: u16) !void { - this.is_ssl = true; - try this.socket.connect(name, port); - - this.handshake_complete = false; - - var ssl = boring.initClient(); - this.ssl = ssl; - this.ssl_loaded = true; - errdefer { - this.ssl_loaded = false; - this.ssl.deinit(); - this.ssl = undefined; - } - - { - std.mem.copy(u8, &this.hostname, name); - this.hostname[name.len] = 0; - var name_ = this.hostname[0..name.len :0]; - ssl.setHostname(name_); - } - - var bio = try AsyncBIO.init(this.socket.allocator); - bio.socket_fd = this.socket.socket; - this.ssl_bio = bio; - - boring.SSL_set_bio(ssl, bio.bio, bio.bio); - - this.read_bio = AsyncMessage.get(this.socket.allocator); - try this.handshake(); - } - - pub fn close(this: *SSL) void { - this.socket.close(); - } - - fn handshake(this: *SSL) HandshakeError!void { - while (!this.ssl.isInitFinished()) { - boring.ERR_clear_error(); - this.ssl_bio.?.enqueueSend(); - const handshake_result = boring.SSL_connect(this.ssl); - if (handshake_result == 0) { - Output.prettyErrorln("ssl accept error", .{}); - Output.flush(); - return error.OpenSSLError; - } - this.handshake_complete = handshake_result == 1 and this.ssl.isInitFinished(); - - if (!this.handshake_complete) { - // accept_result < 0 - const e = boring.SSL_get_error(this.ssl, handshake_result); - if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) { - this.ssl_bio.?.enqueueSend(); - suspend { - this.handshake_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.handshake_frame); - } - - continue; - } - - Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result }); - Output.flush(); - return error.OpenSSLError; - } - } - } - - pub fn write(this: *SSL, buffer_: []const u8) usize { - var buffer = buffer_; - var read_bio = this.read_bio; - while (buffer.len > 0) { - const response = read_bio.?.writeAll(buffer); - buffer = buffer[response.written..]; - if (response.overflow) { - read_bio = read_bio.?.next orelse brk: { - read_bio.?.next = AsyncMessage.get(this.socket.allocator); - break :brk read_bio.?.next.?; - }; - } - } - - return buffer_.len; - } - - pub fn send(this: *SSL) !usize { - var bio_ = this.read_bio; - var len: usize = 0; - while (bio_) |bio| { - var slice = bio.slice(); - len += this.ssl.write(slice) catch |err| { - switch (err) { - error.WantRead => { - suspend { - this.send_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.send_frame); - } - continue; - }, - error.WantWrite => { - this.ssl_bio.?.enqueueSend(); - - suspend { - this.send_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.send_frame); - } - continue; - }, - else => {}, - } - - if (comptime Environment.isDebug) { - Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{ - @errorName(err), - bio.slice(), - }); - Output.flush(); - } - - return err; - }; - - bio_ = bio.next; - } - return len; - } - - pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 { - var buf = buf_[offset..]; - var len: usize = 0; - while (buf.len > 0) { - len = this.ssl.read(buf) catch |err| { - switch (err) { - error.WantWrite => { - this.ssl_bio.?.enqueueSend(); - - if (extremely_verbose) { - Output.prettyErrorln( - "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", - .{ - @errorName(err), - @tagName(this.ssl_bio.?.read_wait), - @tagName(this.ssl_bio.?.send_wait), - }, - ); - Output.flush(); - } - - suspend { - this.read_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.read_frame); - } - continue; - }, - error.WantRead => { - // this.ssl_bio.enqueueSend(); - - if (extremely_verbose) { - Output.prettyErrorln( - "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", - .{ - @errorName(err), - @tagName(this.ssl_bio.?.read_wait), - @tagName(this.ssl_bio.?.send_wait), - }, - ); - Output.flush(); - } - - suspend { - this.read_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.read_frame); - } - continue; - }, - else => return err, - } - unreachable; - }; - - break; - } - - return len; - } - - pub inline fn init(allocator: std.mem.Allocator, io: *AsyncIO) !SSL { - return SSL{ - .socket = try AsyncSocket.init(io, 0, allocator), - }; - } - - pub fn deinit(this: *SSL) void { - this.socket.deinit(); - if (!this.is_ssl) return; - - if (this.ssl_bio) |bio| { - _ = boring.BIO_set_data(bio.bio, null); - bio.pending_frame = AsyncBIO.PendingFrame.init(); - bio.socket_fd = 0; - bio.release(); - this.ssl_bio = null; - } - - if (this.ssl_loaded) { - this.ssl.deinit(); - this.ssl_loaded = false; - } - - this.handshake_complete = false; - - if (this.read_bio) |bio| { - var next_ = bio.next; - while (next_) |next| { - next.release(); - next_ = next.next; - } - - bio.release(); - this.read_bio = null; - } - } - }; -}; - -pub const AsyncBIO = struct { - bio: *boring.BIO = undefined, - socket_fd: std.os.socket_t = 0, - allocator: std.mem.Allocator, - - read_wait: Wait = Wait.pending, - send_wait: Wait = Wait.pending, - recv_completion: AsyncIO.Completion = undefined, - send_completion: AsyncIO.Completion = undefined, - - write_buffer: ?*AsyncMessage = null, - - last_send_result: AsyncIO.SendError!usize = 0, - - last_read_result: AsyncIO.RecvError!usize = 0, - next: ?*AsyncBIO = null, - pending_frame: PendingFrame = PendingFrame.init(), - - pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 }); - - pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void { - this.pending_frame.writeItem(frame) catch {}; - } - - pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe { - return this.pending_frame.readItem(); - } - - var method: ?*boring.BIO_METHOD = null; - var head: ?*AsyncBIO = null; - - const async_bio_name: [:0]const u8 = "AsyncBIO"; - - const Wait = enum { - pending, - suspended, - completed, - }; - - fn instance(allocator: std.mem.Allocator) *AsyncBIO { - if (head) |head_| { - var next = head_.next; - var ret = head_; - ret.read_wait = .pending; - ret.send_wait = .pending; - head = next; - - ret.pending_frame = PendingFrame.init(); - return ret; - } - - var bio = allocator.create(AsyncBIO) catch unreachable; - bio.* = AsyncBIO{ - .allocator = allocator, - .read_wait = .pending, - .send_wait = .pending, - }; - - return bio; - } - - pub fn release(this: *AsyncBIO) void { - if (head) |head_| { - this.next = head_; - } - - this.read_wait = .pending; - this.last_read_result = 0; - this.send_wait = .pending; - this.last_read_result = 0; - this.pending_frame = PendingFrame.init(); - - if (this.write_buffer) |write| { - write.release(); - this.write_buffer = null; - } - - head = this; - } - - pub fn init(allocator: std.mem.Allocator) !*AsyncBIO { - var bio = instance(allocator); - - bio.bio = boring.BIO_new( - method orelse brk: { - method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl); - break :brk method.?; - }, - ) orelse return error.OutOfMemory; - - _ = boring.BIO_set_data(bio.bio, bio); - return bio; - } - - const WaitResult = enum { - none, - read, - send, - }; - - const Sender = struct { - pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { - this.last_send_result = result; - this.send_wait = .completed; - this.write_buffer.?.sent += @truncate(u32, result catch 0); - - if (extremely_verbose) { - const read_result = result catch @as(usize, 999); - Output.prettyErrorln("onSend: {d}", .{read_result}); - Output.flush(); - } - - if (this.pending_frame.readItem()) |frame| { - resume frame; - } - } - }; - - pub fn enqueueSend( - self: *AsyncBIO, - ) void { - if (self.write_buffer == null) return; - var to_write = self.write_buffer.?.slice(); - if (to_write.len == 0) { - return; - } - - self.last_send_result = 0; - - AsyncIO.global.send( - *AsyncBIO, - self, - Sender.onSend, - &self.send_completion, - self.socket_fd, - to_write, - SOCKET_FLAGS, - ); - self.send_wait = .suspended; - if (extremely_verbose) { - Output.prettyErrorln("enqueueSend: {d}", .{to_write.len}); - Output.flush(); - } - } - - const Reader = struct { - pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void { - this.last_read_result = result; - this.read_wait = .completed; - if (extremely_verbose) { - const read_result = result catch @as(usize, 999); - Output.prettyErrorln("onRead: {d}", .{read_result}); - Output.flush(); - } - if (this.pending_frame.readItem()) |frame| { - resume frame; - } - } - }; - - pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void { - var read_buffer = read_buf[off..]; - if (read_buffer.len == 0) { - return; - } - - self.last_read_result = 0; - AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.recv_completion, self.socket_fd, read_buffer); - self.read_wait = .suspended; - if (extremely_verbose) { - Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len}); - Output.flush(); - } - } - - pub const Bio = struct { - inline fn cast(bio: *boring.BIO) *AsyncBIO { - return @ptrCast(*AsyncBIO, @alignCast(@alignOf(*AsyncBIO), boring.BIO_get_data(bio))); - } - - pub fn create(this_bio: *boring.BIO) callconv(.C) c_int { - boring.BIO_set_init(this_bio, 1); - return 1; - } - pub fn destroy(this_bio: *boring.BIO) callconv(.C) c_int { - boring.BIO_set_init(this_bio, 0); - - if (boring.BIO_get_data(this_bio) != null) { - var this = cast(this_bio); - this.release(); - } - - return 0; - } - pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int { - std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); - - var buf = ptr[0..@intCast(usize, len)]; - boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); - - if (len <= 0) { - return 0; - } - - var this = cast(this_bio); - if (this.read_wait == .suspended) { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - } - - switch (this.send_wait) { - .pending => { - var write_buffer = this.write_buffer orelse brk: { - this.write_buffer = AsyncMessage.get(default_allocator); - break :brk this.write_buffer.?; - }; - - _ = write_buffer.writeAll(buf); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - - return -1; - }, - .suspended => { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - - return -1; - }, - .completed => { - this.send_wait = .pending; - const written = this.last_send_result catch |err| { - Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); - Output.flush(); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }; - this.last_send_result = 0; - return @intCast(c_int, written); - }, - } - - unreachable; - } - - pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int { - std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); - var buf = ptr[0..@intCast(usize, len)]; - - boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); - var this = cast(this_bio); - - switch (this.read_wait) { - .pending => { - this.enqueueRead(buf, 0); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }, - .suspended => { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }, - .completed => { - this.read_wait = .pending; - const read_len = this.last_read_result catch |err| { - Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); - Output.flush(); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }; - this.last_read_result = 0; - return @intCast(c_int, read_len); - }, - } - unreachable; - } - pub fn ctrl(_: *boring.BIO, cmd: c_int, _: c_long, _: ?*anyopaque) callconv(.C) c_long { - return switch (cmd) { - boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0, - else => 1, - }; - } - }; -}; - pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { var header_count: usize = 0; var header_entries = this.header_entries.slice(); @@ -1477,7 +542,6 @@ pub fn connect( try connector.connect(this.url.hostname, port); var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) }; - client.setReadBufferSize(buffer_pool_len) catch {}; // client.setQuickACK(true) catch {}; this.tcp_client = client; @@ -1564,135 +628,6 @@ pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableStrin } } -const ZlibPool = struct { - lock: Lock = Lock.init(), - items: std.ArrayList(*MutableString), - allocator: std.mem.Allocator, - pub var instance: ZlibPool = undefined; - pub var loaded: bool = false; - pub var decompression_thread_pool: ThreadPool = undefined; - pub var decompression_thread_pool_loaded: bool = false; - - pub fn init(allocator: std.mem.Allocator) ZlibPool { - return ZlibPool{ - .allocator = allocator, - .items = std.ArrayList(*MutableString).init(allocator), - }; - } - - pub fn get(this: *ZlibPool) !*MutableString { - switch (this.items.items.len) { - 0 => { - var mutable = try default_allocator.create(MutableString); - mutable.* = try MutableString.init(default_allocator, 0); - return mutable; - }, - else => { - return this.items.pop(); - }, - } - - unreachable; - } - - pub fn put(this: *ZlibPool, mutable: *MutableString) !void { - mutable.reset(); - try this.items.append(mutable); - } - - pub fn decompress(compressed_data: []const u8, output: *MutableString) Zlib.ZlibError!void { - // Heuristic: if we have more than 128 KB of data to decompress - // it may take 1ms or so - // We must keep the network thread unblocked as often as possible - // So if we have more than 50 KB of data to decompress, we do it off the network thread - // if (compressed_data.len < 50_000) { - var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, default_allocator); - try reader.readAll(); - return; - // } - - // var task = try DecompressionTask.get(default_allocator); - // defer task.release(); - // task.* = DecompressionTask{ - // .data = compressed_data, - // .output = output, - // .event_fd = AsyncIO.global.eventfd(), - // }; - // task.scheduleAndWait(); - - // if (task.err) |err| { - // return @errSetCast(Zlib.ZlibError, err); - // } - } - - pub const DecompressionTask = struct { - task: ThreadPool.Task = ThreadPool.Task{ .callback = callback }, - frame: @Frame(scheduleAndWait) = undefined, - data: []const u8, - output: *MutableString = undefined, - completion: Completion = undefined, - event_fd: std.os.fd_t = 0, - err: ?anyerror = null, - next: ?*DecompressionTask = null, - - pub var head: ?*DecompressionTask = null; - - pub fn get(allocator: std.mem.Allocator) !*DecompressionTask { - if (head) |head_| { - var this = head_; - head = this.next; - this.next = null; - return this; - } - - return try allocator.create(DecompressionTask); - } - - pub fn scheduleAndWait(task: *DecompressionTask) void { - if (!decompression_thread_pool_loaded) { - decompression_thread_pool_loaded = true; - decompression_thread_pool = ThreadPool.init(.{ .max_threads = 1 }); - } - - AsyncIO.global.event( - *DecompressionTask, - task, - DecompressionTask.finished, - &task.completion, - task.event_fd, - ); - - suspend { - var batch = ThreadPool.Batch.from(&task.task); - decompression_thread_pool.schedule(batch); - task.frame = @frame().*; - } - } - - pub fn release(this: *DecompressionTask) void { - this.next = head; - head = this; - } - - fn callback_(this: *DecompressionTask) Zlib.ZlibError!void { - var reader = try Zlib.ZlibReaderArrayList.init(this.data, &this.output.list, default_allocator); - try reader.readAll(); - } - - pub fn callback(task: *ThreadPool.Task) void { - var this: *DecompressionTask = @fieldParentPtr(DecompressionTask, "task", task); - this.callback_() catch |err| { - this.err = err; - }; - AsyncIO.triggerEvent(this.event_fd, &this.completion) catch {}; - } - - pub fn finished(this: *DecompressionTask, _: *Completion, _: void) void { - resume this.frame; - } - }; -}; - pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response { defer if (this.verbose) Output.flush(); var response: picohttp.Response = undefined; |