aboutsummaryrefslogtreecommitdiff
path: root/src/http_client_async.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-01-25 19:58:59 -0800
committerGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-01-25 20:00:23 -0800
commit9db4d195a7c6c777c37ba72675de08f5179aa5ee (patch)
tree1b7ce4e5422d3e74d7a137d84bbbd51329f579a8 /src/http_client_async.zig
parent0808f293756e3cb61a814091cbde03090ee165fb (diff)
downloadbun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.tar.gz
bun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.tar.zst
bun-9db4d195a7c6c777c37ba72675de08f5179aa5ee.zip
Split http into files
Diffstat (limited to '')
-rw-r--r--src/http_client_async.zig1093
1 files changed, 14 insertions, 1079 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 069908322..54c42418f 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -24,6 +24,11 @@ pub const NetworkThread = @import("./network_thread.zig");
const ObjectPool = @import("./pool.zig").ObjectPool;
const SOCK = os.SOCK;
const Arena = @import("./mimalloc_arena.zig").Arena;
+const AsyncMessage = @import("./http/async_message.zig");
+const AsyncBIO = @import("./http/async_bio.zig");
+const AsyncSocket = @import("./http/async_socket.zig");
+const ZlibPool = @import("./http/zlib.zig");
+const URLBufferPool = ObjectPool([4096]u8, null, false);
// This becomes Arena.allocator
pub var default_allocator: std.mem.Allocator = undefined;
@@ -43,22 +48,20 @@ pub fn onThreadStart() void {
NetworkThread.global.pool.io = &AsyncIO.global;
}
-pub const Headers = struct {
- pub const Kv = struct {
- name: Api.StringPointer,
- value: Api.StringPointer,
- };
- pub const Entries = std.MultiArrayList(Kv);
-};
+pub inline fn getAllocator() std.mem.Allocator {
+ return default_allocator;
+}
+
+pub const Headers = @import("./http/headers.zig");
-const SOCKET_FLAGS: u32 = if (Environment.isLinux)
+pub const SOCKET_FLAGS: u32 = if (Environment.isLinux)
SOCK.CLOEXEC | os.MSG.NOSIGNAL
else
SOCK.CLOEXEC;
-const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC;
+pub const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC;
-const extremely_verbose = false;
+pub const extremely_verbose = false;
fn writeRequest(
comptime Writer: type,
@@ -211,42 +214,7 @@ pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string {
return this.header_buf[ptr.offset..][0..ptr.length];
}
-pub const HeaderBuilder = struct {
- content: StringBuilder = StringBuilder{},
- header_count: u64 = 0,
- entries: Headers.Entries = Headers.Entries{},
-
- pub fn count(this: *HeaderBuilder, name: string, value: string) void {
- this.header_count += 1;
- this.content.count(name);
- this.content.count(value);
- }
-
- pub fn allocate(this: *HeaderBuilder, allocator: std.mem.Allocator) !void {
- try this.content.allocate(allocator);
- try this.entries.ensureTotalCapacity(allocator, this.header_count);
- }
- pub fn append(this: *HeaderBuilder, name: string, value: string) void {
- const name_ptr = Api.StringPointer{
- .offset = @truncate(u32, this.content.len),
- .length = @truncate(u32, name.len),
- };
-
- _ = this.content.append(name);
-
- const value_ptr = Api.StringPointer{
- .offset = @truncate(u32, this.content.len),
- .length = @truncate(u32, value.len),
- };
- _ = this.content.append(value);
- this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr });
- }
-
- pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void {
- client.header_entries = this.entries;
- client.header_buf = this.content.ptr.?[0..this.content.len];
- }
-};
+pub const HeaderBuilder = @import("./http/header_builder.zig");
pub const HTTPChannel = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 });
// 32 pointers much cheaper than 1000 pointers
@@ -466,909 +434,6 @@ pub const AsyncHTTP = struct {
}
};
-const buffer_pool_len = std.math.maxInt(u16) - 64;
-const BufferPool = ObjectPool([buffer_pool_len]u8, null, false);
-const URLBufferPool = ObjectPool([4096]u8, null, false);
-
-pub const AsyncMessage = struct {
- used: u32 = 0,
- sent: u32 = 0,
- completion: AsyncIO.Completion = undefined,
- buf: []u8 = undefined,
- pooled: ?*BufferPool.Node = null,
- allocator: std.mem.Allocator,
- next: ?*AsyncMessage = null,
- context: *anyopaque = undefined,
- released: bool = false,
- var _first_ssl: ?*AsyncMessage = null;
- pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage {
- if (_first_ssl) |first| {
- var prev = first;
-
- std.debug.assert(prev.released);
- if (prev.next) |next| {
- _first_ssl = next;
- prev.next = null;
- } else {
- _first_ssl = null;
- }
- prev.released = false;
-
- return prev;
- }
-
- var msg = allocator.create(AsyncMessage) catch unreachable;
- msg.* = AsyncMessage{
- .allocator = allocator,
- .pooled = null,
- .buf = &[_]u8{},
- };
- return msg;
- }
-
- var _first: ?*AsyncMessage = null;
- pub fn get(allocator: std.mem.Allocator) *AsyncMessage {
- if (_first) |first| {
- var prev = first;
- std.debug.assert(prev.released);
- prev.released = false;
-
- if (first.next) |next| {
- _first = next;
- prev.next = null;
- return prev;
- } else {
- _first = null;
- }
-
- return prev;
- }
-
- var msg = allocator.create(AsyncMessage) catch unreachable;
- var pooled = BufferPool.get(allocator);
- msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled };
- return msg;
- }
-
- pub fn release(self: *AsyncMessage) void {
- self.used = 0;
- self.sent = 0;
- if (self.released) return;
- self.released = true;
-
- if (self.pooled != null) {
- var old = _first;
- _first = self;
- self.next = old;
- } else {
- var old = _first_ssl;
- self.next = old;
- _first_ssl = self;
- }
- }
-
- const WriteResponse = struct {
- written: u32 = 0,
- overflow: bool = false,
- };
-
- pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse {
- var remain = this.buf[this.used..];
- var writable = buffer[0..@minimum(buffer.len, remain.len)];
- if (writable.len == 0) {
- return .{ .written = 0, .overflow = buffer.len > 0 };
- }
-
- std.mem.copy(u8, remain, writable);
- this.used += @intCast(u16, writable.len);
-
- return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len };
- }
-
- pub inline fn slice(this: *const AsyncMessage) []const u8 {
- return this.buf[0..this.used][this.sent..];
- }
-
- pub inline fn available(this: *AsyncMessage) []u8 {
- return this.buf[0 .. this.buf.len - this.used];
- }
-};
-
-const Completion = AsyncIO.Completion;
-
-const AsyncSocket = struct {
- const This = @This();
- io: *AsyncIO = undefined,
- socket: std.os.socket_t = 0,
- head: *AsyncMessage = undefined,
- tail: *AsyncMessage = undefined,
- allocator: std.mem.Allocator,
- err: ?anyerror = null,
- queued: usize = 0,
- sent: usize = 0,
- send_frame: @Frame(AsyncSocket.send) = undefined,
- read_frame: @Frame(AsyncSocket.read) = undefined,
- connect_frame: @Frame(AsyncSocket.connectToAddress) = undefined,
- close_frame: @Frame(AsyncSocket.close) = undefined,
-
- read_context: []u8 = undefined,
- read_offset: u64 = 0,
- read_completion: AsyncIO.Completion = undefined,
- connect_completion: AsyncIO.Completion = undefined,
- close_completion: AsyncIO.Completion = undefined,
-
- const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError || error{
- UnknownHostName,
- ConnectionRefused,
- AddressNotAvailable,
- };
-
- pub fn init(io: *AsyncIO, socket: std.os.socket_t, allocator: std.mem.Allocator) !AsyncSocket {
- var head = AsyncMessage.get(allocator);
-
- return AsyncSocket{ .io = io, .socket = socket, .head = head, .tail = head, .allocator = allocator };
- }
-
- fn on_connect(this: *AsyncSocket, _: *Completion, err: ConnectError!void) void {
- err catch |resolved_err| {
- this.err = resolved_err;
- };
-
- resume this.connect_frame;
- }
-
- fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!void {
- const sockfd = AsyncIO.openSocket(address.any.family, OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP) catch |err| {
- if (extremely_verbose) {
- Output.prettyErrorln("openSocket error: {s}", .{@errorName(err)});
- }
-
- return error.ConnectionRefused;
- };
-
- this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address);
- suspend {
- this.connect_frame = @frame().*;
- }
-
- if (this.err) |e| {
- return @errSetCast(ConnectError, e);
- }
-
- this.socket = sockfd;
- return;
- }
-
- fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void {
- resume this.close_frame;
- }
-
- pub fn close(this: *AsyncSocket) void {
- if (this.socket == 0) return;
- this.io.close(*AsyncSocket, this, on_close, &this.close_completion, this.socket);
- suspend {
- this.close_frame = @frame().*;
- }
- this.socket = 0;
- }
-
- pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void {
- this.socket = 0;
- outer: while (true) {
- // on macOS, getaddrinfo() is very slow
- // If you send ~200 network requests, about 1.5s is spent on getaddrinfo()
- // So, we cache this.
- var address_list = NetworkThread.getAddressList(default_allocator, name, port) catch |err| {
- return @errSetCast(ConnectError, err);
- };
-
- const list = address_list.address_list;
- if (list.addrs.len == 0) return error.ConnectionRefused;
-
- try_cached_index: {
- if (address_list.index) |i| {
- const address = list.addrs[i];
- if (address_list.invalidated) continue :outer;
-
- this.connectToAddress(address) catch |err| {
- if (err == error.ConnectionRefused) {
- address_list.index = null;
- break :try_cached_index;
- }
-
- address_list.invalidate();
- continue :outer;
- };
- }
- }
-
- for (list.addrs) |address, i| {
- if (address_list.invalidated) continue :outer;
- this.connectToAddress(address) catch |err| {
- if (err == error.ConnectionRefused) continue;
- address_list.invalidate();
- if (err == error.AddressNotAvailable or err == error.UnknownHostName) continue :outer;
- return err;
- };
- address_list.index = @truncate(u32, i);
- return;
- }
-
- if (address_list.invalidated) continue :outer;
-
- address_list.invalidate();
- return error.ConnectionRefused;
- }
- }
-
- fn on_send(msg: *AsyncMessage, _: *Completion, result: SendError!usize) void {
- var this = @ptrCast(*AsyncSocket, @alignCast(@alignOf(*AsyncSocket), msg.context));
- const written = result catch |err| {
- this.err = err;
- resume this.send_frame;
- return;
- };
-
- if (written == 0) {
- resume this.send_frame;
- return;
- }
-
- msg.sent += @truncate(u16, written);
- const has_more = msg.used > msg.sent;
- this.sent += written;
-
- if (has_more) {
- this.io.send(
- *AsyncMessage,
- msg,
- on_send,
- &msg.completion,
- this.socket,
- msg.slice(),
- SOCKET_FLAGS,
- );
- } else {
- msg.release();
- }
-
- // complete
- if (this.queued <= this.sent) {
- resume this.send_frame;
- }
- }
-
- pub fn write(this: *AsyncSocket, buf: []const u8) usize {
- this.tail.context = this;
-
- const resp = this.tail.writeAll(buf);
- this.queued += resp.written;
-
- if (resp.overflow) {
- var next = AsyncMessage.get(default_allocator);
- this.tail.next = next;
- this.tail = next;
-
- return @as(usize, resp.written) + this.write(buf[resp.written..]);
- }
-
- return @as(usize, resp.written);
- }
-
- pub const SendError = AsyncIO.SendError;
-
- pub fn deinit(this: *AsyncSocket) void {
- this.head.release();
- }
-
- pub fn send(this: *This) SendError!usize {
- const original_sent = this.sent;
- this.head.context = this;
-
- this.io.send(
- *AsyncMessage,
- this.head,
- on_send,
- &this.head.completion,
- this.socket,
- this.head.slice(),
- SOCKET_FLAGS,
- );
-
- var node = this.head;
- while (node.next) |element| {
- this.io.send(
- *AsyncMessage,
- element,
- on_send,
- &element.completion,
- this.socket,
- element.slice(),
- SOCKET_FLAGS,
- );
- node = element.next orelse break;
- }
-
- suspend {
- this.send_frame = @frame().*;
- }
-
- if (this.err) |err| {
- this.err = null;
- return @errSetCast(AsyncSocket.SendError, err);
- }
-
- return this.sent - original_sent;
- }
-
- pub const RecvError = AsyncIO.RecvError;
-
- const Reader = struct {
- pub fn on_read(ctx: *AsyncSocket, _: *AsyncIO.Completion, result: RecvError!usize) void {
- const len = result catch |err| {
- ctx.err = err;
- resume ctx.read_frame;
- return;
- };
- ctx.read_offset += len;
- resume ctx.read_frame;
- }
- };
-
- pub fn read(
- this: *AsyncSocket,
- bytes: []u8,
- offset: u64,
- ) RecvError!u64 {
- this.read_context = bytes;
- this.read_offset = offset;
- const original_read_offset = this.read_offset;
-
- this.io.recv(
- *AsyncSocket,
- this,
- Reader.on_read,
- &this.read_completion,
- this.socket,
- bytes,
- );
-
- suspend {
- this.read_frame = @frame().*;
- }
-
- if (this.err) |err| {
- this.err = null;
- return @errSetCast(RecvError, err);
- }
-
- return this.read_offset - original_read_offset;
- }
-
- pub const SSL = struct {
- ssl: *boring.SSL = undefined,
- ssl_loaded: bool = false,
- socket: AsyncSocket,
- handshake_complete: bool = false,
- ssl_bio: ?*AsyncBIO = null,
- read_bio: ?*AsyncMessage = null,
- handshake_frame: @Frame(SSL.handshake) = undefined,
- send_frame: @Frame(SSL.send) = undefined,
- read_frame: @Frame(SSL.read) = undefined,
- hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined,
- is_ssl: bool = false,
-
- const SSLConnectError = ConnectError || HandshakeError;
- const HandshakeError = error{OpenSSLError};
-
- pub fn connect(this: *SSL, name: []const u8, port: u16) !void {
- this.is_ssl = true;
- try this.socket.connect(name, port);
-
- this.handshake_complete = false;
-
- var ssl = boring.initClient();
- this.ssl = ssl;
- this.ssl_loaded = true;
- errdefer {
- this.ssl_loaded = false;
- this.ssl.deinit();
- this.ssl = undefined;
- }
-
- {
- std.mem.copy(u8, &this.hostname, name);
- this.hostname[name.len] = 0;
- var name_ = this.hostname[0..name.len :0];
- ssl.setHostname(name_);
- }
-
- var bio = try AsyncBIO.init(this.socket.allocator);
- bio.socket_fd = this.socket.socket;
- this.ssl_bio = bio;
-
- boring.SSL_set_bio(ssl, bio.bio, bio.bio);
-
- this.read_bio = AsyncMessage.get(this.socket.allocator);
- try this.handshake();
- }
-
- pub fn close(this: *SSL) void {
- this.socket.close();
- }
-
- fn handshake(this: *SSL) HandshakeError!void {
- while (!this.ssl.isInitFinished()) {
- boring.ERR_clear_error();
- this.ssl_bio.?.enqueueSend();
- const handshake_result = boring.SSL_connect(this.ssl);
- if (handshake_result == 0) {
- Output.prettyErrorln("ssl accept error", .{});
- Output.flush();
- return error.OpenSSLError;
- }
- this.handshake_complete = handshake_result == 1 and this.ssl.isInitFinished();
-
- if (!this.handshake_complete) {
- // accept_result < 0
- const e = boring.SSL_get_error(this.ssl, handshake_result);
- if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) {
- this.ssl_bio.?.enqueueSend();
- suspend {
- this.handshake_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.handshake_frame);
- }
-
- continue;
- }
-
- Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result });
- Output.flush();
- return error.OpenSSLError;
- }
- }
- }
-
- pub fn write(this: *SSL, buffer_: []const u8) usize {
- var buffer = buffer_;
- var read_bio = this.read_bio;
- while (buffer.len > 0) {
- const response = read_bio.?.writeAll(buffer);
- buffer = buffer[response.written..];
- if (response.overflow) {
- read_bio = read_bio.?.next orelse brk: {
- read_bio.?.next = AsyncMessage.get(this.socket.allocator);
- break :brk read_bio.?.next.?;
- };
- }
- }
-
- return buffer_.len;
- }
-
- pub fn send(this: *SSL) !usize {
- var bio_ = this.read_bio;
- var len: usize = 0;
- while (bio_) |bio| {
- var slice = bio.slice();
- len += this.ssl.write(slice) catch |err| {
- switch (err) {
- error.WantRead => {
- suspend {
- this.send_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.send_frame);
- }
- continue;
- },
- error.WantWrite => {
- this.ssl_bio.?.enqueueSend();
-
- suspend {
- this.send_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.send_frame);
- }
- continue;
- },
- else => {},
- }
-
- if (comptime Environment.isDebug) {
- Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{
- @errorName(err),
- bio.slice(),
- });
- Output.flush();
- }
-
- return err;
- };
-
- bio_ = bio.next;
- }
- return len;
- }
-
- pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 {
- var buf = buf_[offset..];
- var len: usize = 0;
- while (buf.len > 0) {
- len = this.ssl.read(buf) catch |err| {
- switch (err) {
- error.WantWrite => {
- this.ssl_bio.?.enqueueSend();
-
- if (extremely_verbose) {
- Output.prettyErrorln(
- "error: {s}: \n Read Wait: {s}\n Send Wait: {s}",
- .{
- @errorName(err),
- @tagName(this.ssl_bio.?.read_wait),
- @tagName(this.ssl_bio.?.send_wait),
- },
- );
- Output.flush();
- }
-
- suspend {
- this.read_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.read_frame);
- }
- continue;
- },
- error.WantRead => {
- // this.ssl_bio.enqueueSend();
-
- if (extremely_verbose) {
- Output.prettyErrorln(
- "error: {s}: \n Read Wait: {s}\n Send Wait: {s}",
- .{
- @errorName(err),
- @tagName(this.ssl_bio.?.read_wait),
- @tagName(this.ssl_bio.?.send_wait),
- },
- );
- Output.flush();
- }
-
- suspend {
- this.read_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.read_frame);
- }
- continue;
- },
- else => return err,
- }
- unreachable;
- };
-
- break;
- }
-
- return len;
- }
-
- pub inline fn init(allocator: std.mem.Allocator, io: *AsyncIO) !SSL {
- return SSL{
- .socket = try AsyncSocket.init(io, 0, allocator),
- };
- }
-
- pub fn deinit(this: *SSL) void {
- this.socket.deinit();
- if (!this.is_ssl) return;
-
- if (this.ssl_bio) |bio| {
- _ = boring.BIO_set_data(bio.bio, null);
- bio.pending_frame = AsyncBIO.PendingFrame.init();
- bio.socket_fd = 0;
- bio.release();
- this.ssl_bio = null;
- }
-
- if (this.ssl_loaded) {
- this.ssl.deinit();
- this.ssl_loaded = false;
- }
-
- this.handshake_complete = false;
-
- if (this.read_bio) |bio| {
- var next_ = bio.next;
- while (next_) |next| {
- next.release();
- next_ = next.next;
- }
-
- bio.release();
- this.read_bio = null;
- }
- }
- };
-};
-
-pub const AsyncBIO = struct {
- bio: *boring.BIO = undefined,
- socket_fd: std.os.socket_t = 0,
- allocator: std.mem.Allocator,
-
- read_wait: Wait = Wait.pending,
- send_wait: Wait = Wait.pending,
- recv_completion: AsyncIO.Completion = undefined,
- send_completion: AsyncIO.Completion = undefined,
-
- write_buffer: ?*AsyncMessage = null,
-
- last_send_result: AsyncIO.SendError!usize = 0,
-
- last_read_result: AsyncIO.RecvError!usize = 0,
- next: ?*AsyncBIO = null,
- pending_frame: PendingFrame = PendingFrame.init(),
-
- pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 });
-
- pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void {
- this.pending_frame.writeItem(frame) catch {};
- }
-
- pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe {
- return this.pending_frame.readItem();
- }
-
- var method: ?*boring.BIO_METHOD = null;
- var head: ?*AsyncBIO = null;
-
- const async_bio_name: [:0]const u8 = "AsyncBIO";
-
- const Wait = enum {
- pending,
- suspended,
- completed,
- };
-
- fn instance(allocator: std.mem.Allocator) *AsyncBIO {
- if (head) |head_| {
- var next = head_.next;
- var ret = head_;
- ret.read_wait = .pending;
- ret.send_wait = .pending;
- head = next;
-
- ret.pending_frame = PendingFrame.init();
- return ret;
- }
-
- var bio = allocator.create(AsyncBIO) catch unreachable;
- bio.* = AsyncBIO{
- .allocator = allocator,
- .read_wait = .pending,
- .send_wait = .pending,
- };
-
- return bio;
- }
-
- pub fn release(this: *AsyncBIO) void {
- if (head) |head_| {
- this.next = head_;
- }
-
- this.read_wait = .pending;
- this.last_read_result = 0;
- this.send_wait = .pending;
- this.last_read_result = 0;
- this.pending_frame = PendingFrame.init();
-
- if (this.write_buffer) |write| {
- write.release();
- this.write_buffer = null;
- }
-
- head = this;
- }
-
- pub fn init(allocator: std.mem.Allocator) !*AsyncBIO {
- var bio = instance(allocator);
-
- bio.bio = boring.BIO_new(
- method orelse brk: {
- method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl);
- break :brk method.?;
- },
- ) orelse return error.OutOfMemory;
-
- _ = boring.BIO_set_data(bio.bio, bio);
- return bio;
- }
-
- const WaitResult = enum {
- none,
- read,
- send,
- };
-
- const Sender = struct {
- pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void {
- this.last_send_result = result;
- this.send_wait = .completed;
- this.write_buffer.?.sent += @truncate(u32, result catch 0);
-
- if (extremely_verbose) {
- const read_result = result catch @as(usize, 999);
- Output.prettyErrorln("onSend: {d}", .{read_result});
- Output.flush();
- }
-
- if (this.pending_frame.readItem()) |frame| {
- resume frame;
- }
- }
- };
-
- pub fn enqueueSend(
- self: *AsyncBIO,
- ) void {
- if (self.write_buffer == null) return;
- var to_write = self.write_buffer.?.slice();
- if (to_write.len == 0) {
- return;
- }
-
- self.last_send_result = 0;
-
- AsyncIO.global.send(
- *AsyncBIO,
- self,
- Sender.onSend,
- &self.send_completion,
- self.socket_fd,
- to_write,
- SOCKET_FLAGS,
- );
- self.send_wait = .suspended;
- if (extremely_verbose) {
- Output.prettyErrorln("enqueueSend: {d}", .{to_write.len});
- Output.flush();
- }
- }
-
- const Reader = struct {
- pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void {
- this.last_read_result = result;
- this.read_wait = .completed;
- if (extremely_verbose) {
- const read_result = result catch @as(usize, 999);
- Output.prettyErrorln("onRead: {d}", .{read_result});
- Output.flush();
- }
- if (this.pending_frame.readItem()) |frame| {
- resume frame;
- }
- }
- };
-
- pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void {
- var read_buffer = read_buf[off..];
- if (read_buffer.len == 0) {
- return;
- }
-
- self.last_read_result = 0;
- AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.recv_completion, self.socket_fd, read_buffer);
- self.read_wait = .suspended;
- if (extremely_verbose) {
- Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len});
- Output.flush();
- }
- }
-
- pub const Bio = struct {
- inline fn cast(bio: *boring.BIO) *AsyncBIO {
- return @ptrCast(*AsyncBIO, @alignCast(@alignOf(*AsyncBIO), boring.BIO_get_data(bio)));
- }
-
- pub fn create(this_bio: *boring.BIO) callconv(.C) c_int {
- boring.BIO_set_init(this_bio, 1);
- return 1;
- }
- pub fn destroy(this_bio: *boring.BIO) callconv(.C) c_int {
- boring.BIO_set_init(this_bio, 0);
-
- if (boring.BIO_get_data(this_bio) != null) {
- var this = cast(this_bio);
- this.release();
- }
-
- return 0;
- }
- pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int {
- std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0);
-
- var buf = ptr[0..@intCast(usize, len)];
- boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY);
-
- if (len <= 0) {
- return 0;
- }
-
- var this = cast(this_bio);
- if (this.read_wait == .suspended) {
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- }
-
- switch (this.send_wait) {
- .pending => {
- var write_buffer = this.write_buffer orelse brk: {
- this.write_buffer = AsyncMessage.get(default_allocator);
- break :brk this.write_buffer.?;
- };
-
- _ = write_buffer.writeAll(buf);
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
-
- return -1;
- },
- .suspended => {
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
-
- return -1;
- },
- .completed => {
- this.send_wait = .pending;
- const written = this.last_send_result catch |err| {
- Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)});
- Output.flush();
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- };
- this.last_send_result = 0;
- return @intCast(c_int, written);
- },
- }
-
- unreachable;
- }
-
- pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int {
- std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0);
- var buf = ptr[0..@intCast(usize, len)];
-
- boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY);
- var this = cast(this_bio);
-
- switch (this.read_wait) {
- .pending => {
- this.enqueueRead(buf, 0);
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- },
- .suspended => {
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- },
- .completed => {
- this.read_wait = .pending;
- const read_len = this.last_read_result catch |err| {
- Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)});
- Output.flush();
- boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
- return -1;
- };
- this.last_read_result = 0;
- return @intCast(c_int, read_len);
- },
- }
- unreachable;
- }
- pub fn ctrl(_: *boring.BIO, cmd: c_int, _: c_long, _: ?*anyopaque) callconv(.C) c_long {
- return switch (cmd) {
- boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0,
- else => 1,
- };
- }
- };
-};
-
pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
var header_count: usize = 0;
var header_entries = this.header_entries.slice();
@@ -1477,7 +542,6 @@ pub fn connect(
try connector.connect(this.url.hostname, port);
var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) };
- client.setReadBufferSize(buffer_pool_len) catch {};
// client.setQuickACK(true) catch {};
this.tcp_client = client;
@@ -1564,135 +628,6 @@ pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableStrin
}
}
-const ZlibPool = struct {
- lock: Lock = Lock.init(),
- items: std.ArrayList(*MutableString),
- allocator: std.mem.Allocator,
- pub var instance: ZlibPool = undefined;
- pub var loaded: bool = false;
- pub var decompression_thread_pool: ThreadPool = undefined;
- pub var decompression_thread_pool_loaded: bool = false;
-
- pub fn init(allocator: std.mem.Allocator) ZlibPool {
- return ZlibPool{
- .allocator = allocator,
- .items = std.ArrayList(*MutableString).init(allocator),
- };
- }
-
- pub fn get(this: *ZlibPool) !*MutableString {
- switch (this.items.items.len) {
- 0 => {
- var mutable = try default_allocator.create(MutableString);
- mutable.* = try MutableString.init(default_allocator, 0);
- return mutable;
- },
- else => {
- return this.items.pop();
- },
- }
-
- unreachable;
- }
-
- pub fn put(this: *ZlibPool, mutable: *MutableString) !void {
- mutable.reset();
- try this.items.append(mutable);
- }
-
- pub fn decompress(compressed_data: []const u8, output: *MutableString) Zlib.ZlibError!void {
- // Heuristic: if we have more than 128 KB of data to decompress
- // it may take 1ms or so
- // We must keep the network thread unblocked as often as possible
- // So if we have more than 50 KB of data to decompress, we do it off the network thread
- // if (compressed_data.len < 50_000) {
- var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, default_allocator);
- try reader.readAll();
- return;
- // }
-
- // var task = try DecompressionTask.get(default_allocator);
- // defer task.release();
- // task.* = DecompressionTask{
- // .data = compressed_data,
- // .output = output,
- // .event_fd = AsyncIO.global.eventfd(),
- // };
- // task.scheduleAndWait();
-
- // if (task.err) |err| {
- // return @errSetCast(Zlib.ZlibError, err);
- // }
- }
-
- pub const DecompressionTask = struct {
- task: ThreadPool.Task = ThreadPool.Task{ .callback = callback },
- frame: @Frame(scheduleAndWait) = undefined,
- data: []const u8,
- output: *MutableString = undefined,
- completion: Completion = undefined,
- event_fd: std.os.fd_t = 0,
- err: ?anyerror = null,
- next: ?*DecompressionTask = null,
-
- pub var head: ?*DecompressionTask = null;
-
- pub fn get(allocator: std.mem.Allocator) !*DecompressionTask {
- if (head) |head_| {
- var this = head_;
- head = this.next;
- this.next = null;
- return this;
- }
-
- return try allocator.create(DecompressionTask);
- }
-
- pub fn scheduleAndWait(task: *DecompressionTask) void {
- if (!decompression_thread_pool_loaded) {
- decompression_thread_pool_loaded = true;
- decompression_thread_pool = ThreadPool.init(.{ .max_threads = 1 });
- }
-
- AsyncIO.global.event(
- *DecompressionTask,
- task,
- DecompressionTask.finished,
- &task.completion,
- task.event_fd,
- );
-
- suspend {
- var batch = ThreadPool.Batch.from(&task.task);
- decompression_thread_pool.schedule(batch);
- task.frame = @frame().*;
- }
- }
-
- pub fn release(this: *DecompressionTask) void {
- this.next = head;
- head = this;
- }
-
- fn callback_(this: *DecompressionTask) Zlib.ZlibError!void {
- var reader = try Zlib.ZlibReaderArrayList.init(this.data, &this.output.list, default_allocator);
- try reader.readAll();
- }
-
- pub fn callback(task: *ThreadPool.Task) void {
- var this: *DecompressionTask = @fieldParentPtr(DecompressionTask, "task", task);
- this.callback_() catch |err| {
- this.err = err;
- };
- AsyncIO.triggerEvent(this.event_fd, &this.completion) catch {};
- }
-
- pub fn finished(this: *DecompressionTask, _: *Completion, _: void) void {
- resume this.frame;
- }
- };
-};
-
pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response {
defer if (this.verbose) Output.flush();
var response: picohttp.Response = undefined;