diff options
author | 2021-12-18 21:07:07 -0800 | |
---|---|---|
committer | 2021-12-18 21:07:07 -0800 | |
commit | 0cee57f1d997fe21e519d5e771df0877ab489d5f (patch) | |
tree | 417d044ebbc47cc9b6ef49213620c07ae2927e0a /src/http_client_async.zig | |
parent | d1783babd99ff2a8020765837b3b9b3099137024 (diff) | |
parent | eab99b3bae9a810d76e6eb16afd9fb32cd7672bd (diff) | |
download | bun-0cee57f1d997fe21e519d5e771df0877ab489d5f.tar.gz bun-0cee57f1d997fe21e519d5e771df0877ab489d5f.tar.zst bun-0cee57f1d997fe21e519d5e771df0877ab489d5f.zip |
Merge pull request #80 from Jarred-Sumner/jarred/npm-install
bun install
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r-- | src/http_client_async.zig | 2190 |
1 files changed, 2190 insertions, 0 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig new file mode 100644 index 000000000..8b8bd6082 --- /dev/null +++ b/src/http_client_async.zig @@ -0,0 +1,2190 @@ +const picohttp = @import("picohttp"); +usingnamespace @import("./global.zig"); +const std = @import("std"); +const Headers = @import("./javascript/jsc/webcore/response.zig").Headers; +const URL = @import("./query_string_map.zig").URL; +const Method = @import("./http/method.zig").Method; +const Api = @import("./api/schema.zig").Api; +const Lock = @import("./lock.zig").Lock; +const HTTPClient = @This(); +const SOCKET_FLAGS = os.SOCK_CLOEXEC; +const Zlib = @import("./zlib.zig"); +const StringBuilder = @import("./string_builder.zig"); +const AsyncIO = @import("io"); +const ThreadPool = @import("thread_pool"); +const boring = @import("boringssl"); + +const NetworkThread = @import("network_thread"); + +const extremely_verbose = false; + +fn writeRequest( + comptime Writer: type, + writer: Writer, + request: picohttp.Request, + body: string, + // header_hashes: []u64, +) !void { + _ = writer.write(request.method); + _ = writer.write(" "); + _ = writer.write(request.path); + _ = writer.write(" HTTP/1.1\r\n"); + + for (request.headers) |header, i| { + _ = writer.write(header.name); + _ = writer.write(": "); + _ = writer.write(header.value); + _ = writer.write("\r\n"); + } + + _ = writer.write("\r\n"); + + if (body.len > 0) { + _ = writer.write(body); + } +} + +method: Method, +header_entries: Headers.Entries, +header_buf: string, +url: URL, +allocator: *std.mem.Allocator, +verbose: bool = isTest, +tcp_client: tcp.Client = undefined, +body_size: u32 = 0, +read_count: u32 = 0, +remaining_redirect_count: i8 = 127, +redirect: ?*URLBufferPool = null, +disable_shutdown: bool = true, +timeout: usize = 0, +progress_node: ?*std.Progress.Node = null, +socket: AsyncSocket.SSL = undefined, +gzip_elapsed: u64 = 0, + +/// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since. +/// This is a workaround for that. +force_last_modified: bool = false, +if_modified_since: string = "", +request_content_len_buf: ["-4294967295".len]u8 = undefined, +request_headers_buf: [128]picohttp.Header = undefined, +response_headers_buf: [128]picohttp.Header = undefined, + +pub fn init( + allocator: *std.mem.Allocator, + method: Method, + url: URL, + header_entries: Headers.Entries, + header_buf: string, +) !HTTPClient { + return HTTPClient{ + .allocator = allocator, + .method = method, + .url = url, + .header_entries = header_entries, + .header_buf = header_buf, + .socket = undefined, + }; +} + +pub fn deinit(this: *HTTPClient) !void { + if (this.redirect) |redirect| { + redirect.release(); + this.redirect = null; + } +} + +// threadlocal var resolver_cache +const tcp = std.x.net.tcp; +const ip = std.x.net.ip; + +const IPv4 = std.x.os.IPv4; +const IPv6 = std.x.os.IPv6; +const Socket = std.x.os.Socket; +const os = std.os; + +// lowercase hash header names so that we can be sure +pub fn hashHeaderName(name: string) u64 { + var hasher = std.hash.Wyhash.init(0); + var remain: string = name; + var buf: [32]u8 = undefined; + var buf_slice: []u8 = std.mem.span(&buf); + + while (remain.len > 0) { + var end = std.math.min(hasher.buf.len, remain.len); + + hasher.update(strings.copyLowercase(std.mem.span(remain[0..end]), buf_slice)); + remain = remain[end..]; + } + + return hasher.final(); +} + +const host_header_hash = hashHeaderName("Host"); +const connection_header_hash = hashHeaderName("Connection"); + +pub const Encoding = enum { + identity, + gzip, + deflate, + brotli, + chunked, +}; + +const content_encoding_hash = hashHeaderName("Content-Encoding"); +const transfer_encoding_header = hashHeaderName("Transfer-Encoding"); + +const host_header_name = "Host"; +const content_length_header_name = "Content-Length"; +const content_length_header_hash = hashHeaderName("Content-Length"); +const connection_header = picohttp.Header{ .name = "Connection", .value = "close" }; +const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" }; +const accept_header_hash = hashHeaderName("Accept"); + +const accept_encoding_no_compression = "identity"; +const accept_encoding_compression = "deflate, gzip"; +const accept_encoding_header_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_compression }; +const accept_encoding_header_no_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_no_compression }; + +const accept_encoding_header = if (FeatureFlags.disable_compression_in_http_client) + accept_encoding_header_no_compression +else + accept_encoding_header_compression; + +const accept_encoding_header_hash = hashHeaderName("Accept-Encoding"); + +const user_agent_header = picohttp.Header{ .name = "User-Agent", .value = "Bun.js " ++ Global.package_json_version }; +const user_agent_header_hash = hashHeaderName("User-Agent"); +const location_header_hash = hashHeaderName("Location"); + +pub fn headerStr(this: *const HTTPClient, ptr: Api.StringPointer) string { + return this.header_buf[ptr.offset..][0..ptr.length]; +} + +pub const HeaderBuilder = struct { + content: StringBuilder = StringBuilder{}, + header_count: u64 = 0, + entries: Headers.Entries = Headers.Entries{}, + + pub fn count(this: *HeaderBuilder, name: string, value: string) void { + this.header_count += 1; + this.content.count(name); + this.content.count(value); + } + + pub fn allocate(this: *HeaderBuilder, allocator: *std.mem.Allocator) !void { + try this.content.allocate(allocator); + try this.entries.ensureTotalCapacity(allocator, this.header_count); + } + pub fn append(this: *HeaderBuilder, name: string, value: string) void { + const name_ptr = Api.StringPointer{ + .offset = @truncate(u32, this.content.len), + .length = @truncate(u32, name.len), + }; + + _ = this.content.append(name); + + const value_ptr = Api.StringPointer{ + .offset = @truncate(u32, this.content.len), + .length = @truncate(u32, value.len), + }; + _ = this.content.append(value); + this.entries.appendAssumeCapacity(Headers.Kv{ .name = name_ptr, .value = value_ptr }); + } + + pub fn apply(this: *HeaderBuilder, client: *HTTPClient) void { + client.header_entries = this.entries; + client.header_buf = this.content.ptr.?[0..this.content.len]; + } +}; + +pub const HTTPChannel = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 1000 }); + +// 32 pointers much cheaper than 1000 pointers +const SingleHTTPChannel = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 32 }); +var send_sync_channel: SingleHTTPChannel = undefined; +var send_sync_channel_loaded: bool = false; + +pub const HTTPChannelContext = struct { + http: AsyncHTTP = undefined, + channel: *HTTPChannel, + + pub fn callback(http: *AsyncHTTP, sender: *AsyncHTTP.HTTPSender) void { + var this: *HTTPChannelContext = @fieldParentPtr(HTTPChannelContext, "http", http); + this.channel.writeItem(http) catch unreachable; + sender.onFinish(); + } +}; + +pub const AsyncHTTP = struct { + request: ?picohttp.Request = null, + response: ?picohttp.Response = null, + request_headers: Headers.Entries = Headers.Entries{}, + response_headers: Headers.Entries = Headers.Entries{}, + response_buffer: *MutableString, + request_body: *MutableString, + allocator: *std.mem.Allocator, + request_header_buf: string = "", + method: Method = Method.GET, + max_retry_count: u32 = 0, + url: URL, + + /// Timeout in nanoseconds + timeout: usize = 0, + + response_encoding: Encoding = Encoding.identity, + redirect_count: u32 = 0, + retries_count: u32 = 0, + verbose: bool = false, + + client: HTTPClient = undefined, + err: ?anyerror = null, + + state: AtomicState = AtomicState.init(State.pending), + elapsed: u64 = 0, + gzip_elapsed: u64 = 0, + + /// Callback runs when request finishes + /// Executes on the network thread + callback: ?CompletionCallback = null, + + pub const CompletionCallback = fn (this: *AsyncHTTP, sender: *HTTPSender) void; + pub var active_requests_count = std.atomic.Atomic(u32).init(0); + + pub const State = enum(u32) { + pending = 0, + scheduled = 1, + sending = 2, + success = 3, + fail = 4, + }; + const AtomicState = std.atomic.Atomic(State); + + pub fn init( + allocator: *std.mem.Allocator, + method: Method, + url: URL, + headers: Headers.Entries, + headers_buf: string, + response_buffer: *MutableString, + request_body: *MutableString, + timeout: usize, + ) !AsyncHTTP { + var this = AsyncHTTP{ + .allocator = allocator, + .url = url, + .method = method, + .request_headers = headers, + .request_header_buf = headers_buf, + .request_body = request_body, + .response_buffer = response_buffer, + }; + this.client = try HTTPClient.init(allocator, method, url, headers, headers_buf); + this.client.timeout = timeout; + this.timeout = timeout; + return this; + } + + pub fn schedule(this: *AsyncHTTP, allocator: *std.mem.Allocator, batch: *ThreadPool.Batch) void { + std.debug.assert(NetworkThread.global_loaded.load(.Monotonic) == 1); + var sender = HTTPSender.get(this, allocator); + this.state.store(.scheduled, .Monotonic); + batch.push(ThreadPool.Batch.from(&sender.task)); + } + + fn sendSyncCallback(this: *AsyncHTTP, sender: *HTTPSender) void { + send_sync_channel.writeItem(this) catch unreachable; + sender.release(); + } + + pub fn sendSync(this: *AsyncHTTP) anyerror!picohttp.Response { + if (!send_sync_channel_loaded) { + send_sync_channel_loaded = true; + send_sync_channel = SingleHTTPChannel.init(); + } + + this.callback = sendSyncCallback; + var batch = NetworkThread.Batch{}; + this.schedule(this.allocator, &batch); + NetworkThread.global.pool.schedule(batch); + while (true) { + var async_http: *AsyncHTTP = (send_sync_channel.tryReadItem() catch unreachable) orelse { + std.atomic.spinLoopHint(); + std.time.sleep(std.time.ns_per_us * 100); + continue; + }; + if (async_http.err) |err| { + return err; + } + + return async_http.response.?; + } + + unreachable; + } + + var http_sender_head: std.atomic.Atomic(?*HTTPSender) = std.atomic.Atomic(?*HTTPSender).init(null); + + pub const HTTPSender = struct { + task: ThreadPool.Task = .{ .callback = callback }, + frame: @Frame(AsyncHTTP.do) = undefined, + http: *AsyncHTTP = undefined, + + next: ?*HTTPSender = null, + + pub fn get(http: *AsyncHTTP, allocator: *std.mem.Allocator) *HTTPSender { + @fence(.Acquire); + + var head_ = http_sender_head.load(.Monotonic); + + if (head_ == null) { + var new_head = allocator.create(HTTPSender) catch unreachable; + new_head.* = HTTPSender{}; + new_head.next = null; + new_head.task = .{ .callback = callback }; + new_head.http = http; + return new_head; + } + + http_sender_head.store(head_.?.next, .Monotonic); + + head_.?.* = HTTPSender{}; + head_.?.next = null; + head_.?.task = .{ .callback = callback }; + head_.?.http = http; + + return head_.?; + } + + pub fn release(this: *HTTPSender) void { + @fence(.Acquire); + this.task = .{ .callback = callback }; + this.http = undefined; + this.next = http_sender_head.swap(this, .Monotonic); + } + + pub fn callback(task: *ThreadPool.Task) void { + var this = @fieldParentPtr(HTTPSender, "task", task); + this.frame = async AsyncHTTP.do(this); + } + + pub fn onFinish(this: *HTTPSender) void { + this.release(); + } + }; + + pub fn do(sender: *HTTPSender) void { + outer: { + var this = sender.http; + this.err = null; + this.state.store(.sending, .Monotonic); + var timer = std.time.Timer.start() catch @panic("Timer failure"); + defer this.elapsed = timer.read(); + _ = active_requests_count.fetchAdd(1, .Monotonic); + defer _ = active_requests_count.fetchSub(1, .Monotonic); + + this.response = await this.client.sendAsync(this.request_body.list.items, this.response_buffer) catch |err| { + this.state.store(.fail, .Monotonic); + this.err = err; + + if (sender.http.max_retry_count > sender.http.retries_count) { + sender.http.retries_count += 1; + NetworkThread.global.pool.schedule(ThreadPool.Batch.from(&sender.task)); + return; + } + break :outer; + }; + + this.redirect_count = @intCast(u32, @maximum(127 - this.client.remaining_redirect_count, 0)); + this.state.store(.success, .Monotonic); + this.gzip_elapsed = this.client.gzip_elapsed; + } + + if (sender.http.callback) |callback| { + callback(sender.http, sender); + } + } +}; + +const BufferPool = struct { + pub const len = std.math.maxInt(u16) - 64; + buf: [len]u8 = undefined, + next: ?*BufferPool = null, + allocator: *std.mem.Allocator = undefined, + + var head: ?*BufferPool = null; + + pub fn get(allocator: *std.mem.Allocator) !*BufferPool { + if (head) |item| { + var this = item; + var head_ = item.next; + head = head_; + this.next = null; + + return this; + } + + var entry = try allocator.create(BufferPool); + entry.* = BufferPool{ .allocator = allocator }; + return entry; + } + + pub fn release(this: *BufferPool) void { + if (head) |item| { + item.next = this; + } else { + head = this; + } + } +}; + +const URLBufferPool = struct { + pub const len = 4096; + buf: [len]u8 = undefined, + next: ?*URLBufferPool = null, + allocator: *std.mem.Allocator = undefined, + + var head: ?*URLBufferPool = null; + + pub fn get(allocator: *std.mem.Allocator) !*URLBufferPool { + if (head) |item| { + var this = item; + var head_ = item.next; + head = head_; + this.next = null; + + return this; + } + + var entry = try allocator.create(URLBufferPool); + entry.* = URLBufferPool{ .allocator = allocator }; + return entry; + } + + pub fn release(this: *URLBufferPool) void { + if (head) |item| { + item.next = this; + } else { + head = this; + } + } +}; + +pub const AsyncMessage = struct { + used: u32 = 0, + sent: u32 = 0, + completion: AsyncIO.Completion = undefined, + buf: []u8 = undefined, + pooled: ?*BufferPool = null, + allocator: *std.mem.Allocator, + next: ?*AsyncMessage = null, + context: *c_void = undefined, + released: bool = false, + var _first_ssl: ?*AsyncMessage = null; + pub fn getSSL(allocator: *std.mem.Allocator) *AsyncMessage { + if (_first_ssl) |first| { + var prev = first; + std.debug.assert(prev.released); + if (prev.next) |next| { + _first_ssl = next; + prev.next = null; + } else { + _first_ssl = null; + } + prev.released = false; + + return prev; + } + + var msg = allocator.create(AsyncMessage) catch unreachable; + msg.* = AsyncMessage{ + .allocator = allocator, + .pooled = null, + .buf = &[_]u8{}, + }; + return msg; + } + + var _first: ?*AsyncMessage = null; + pub fn get(allocator: *std.mem.Allocator) *AsyncMessage { + if (_first) |first| { + var prev = first; + std.debug.assert(prev.released); + prev.released = false; + + if (first.next) |next| { + _first = next; + prev.next = null; + return prev; + } else { + _first = null; + } + + return prev; + } + + var msg = allocator.create(AsyncMessage) catch unreachable; + var pooled = BufferPool.get(allocator) catch unreachable; + msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.buf, .pooled = pooled }; + return msg; + } + + pub fn release(self: *AsyncMessage) void { + self.used = 0; + self.sent = 0; + std.debug.assert(!self.released); + self.released = true; + + if (self.pooled != null) { + var old = _first; + _first = self; + self.next = old; + } else { + var old = _first_ssl; + self.next = old; + _first_ssl = self; + } + } + + const WriteResponse = struct { + written: u32 = 0, + overflow: bool = false, + }; + + pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse { + var remain = this.buf[this.used..]; + var writable = buffer[0..@minimum(buffer.len, remain.len)]; + if (writable.len == 0) { + return .{ .written = 0, .overflow = buffer.len > 0 }; + } + + std.mem.copy(u8, remain, writable); + this.used += @intCast(u16, writable.len); + + return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len }; + } + + pub inline fn slice(this: *const AsyncMessage) []const u8 { + return this.buf[0..this.used][this.sent..]; + } + + pub inline fn available(this: *AsyncMessage) []u8 { + return this.buf[0 .. this.buf.len - this.used]; + } +}; + +const Completion = AsyncIO.Completion; + +const AsyncSocket = struct { + const This = @This(); + io: *AsyncIO = undefined, + socket: std.os.socket_t = 0, + head: *AsyncMessage = undefined, + tail: *AsyncMessage = undefined, + allocator: *std.mem.Allocator, + err: ?anyerror = null, + queued: usize = 0, + sent: usize = 0, + send_frame: @Frame(AsyncSocket.send) = undefined, + read_frame: @Frame(AsyncSocket.read) = undefined, + connect_frame: @Frame(AsyncSocket.connectToAddress) = undefined, + close_frame: @Frame(AsyncSocket.close) = undefined, + + read_context: []u8 = undefined, + read_offset: u64 = 0, + read_completion: AsyncIO.Completion = undefined, + connect_completion: AsyncIO.Completion = undefined, + close_completion: AsyncIO.Completion = undefined, + + const ConnectError = AsyncIO.ConnectError || std.os.SocketError || std.os.SetSockOptError; + + pub fn init(io: *AsyncIO, socket: std.os.socket_t, allocator: *std.mem.Allocator) !AsyncSocket { + var head = AsyncMessage.get(allocator); + + return AsyncSocket{ .io = io, .socket = socket, .head = head, .tail = head, .allocator = allocator }; + } + + fn on_connect(this: *AsyncSocket, _: *Completion, err: ConnectError!void) void { + err catch |resolved_err| { + this.err = resolved_err; + }; + + resume this.connect_frame; + } + + fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!void { + const sockfd = AsyncIO.openSocket(address.any.family, SOCKET_FLAGS | std.os.SOCK_STREAM, std.os.IPPROTO_TCP) catch return error.ConnectionRefused; + + this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address); + suspend { + this.connect_frame = @frame().*; + } + + if (this.err) |e| { + return @errSetCast(ConnectError, e); + } + + this.socket = sockfd; + return; + } + + fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void { + resume this.close_frame; + } + + pub fn close(this: *AsyncSocket) void { + if (this.socket == 0) return; + this.io.close(*AsyncSocket, this, on_close, &this.close_completion, this.socket); + suspend { + this.close_frame = @frame().*; + } + this.socket = 0; + } + + pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void { + this.socket = 0; + outer: while (true) { + // on macOS, getaddrinfo() is very slow + // If you send ~200 network requests, about 1.5s is spent on getaddrinfo() + // So, we cache this. + var address_list = NetworkThread.getAddressList(this.allocator, name, port) catch |err| { + return @errSetCast(ConnectError, err); + }; + + const list = address_list.address_list; + if (list.addrs.len == 0) return error.ConnectionRefused; + + try_cached_index: { + if (address_list.index) |i| { + const address = list.addrs[i]; + this.connectToAddress(address) catch |err| { + if (err == error.ConnectionRefused) { + address_list.index = null; + break :try_cached_index; + } + + address_list.invalidate(); + continue :outer; + }; + } + } + + for (list.addrs) |address, i| { + this.connectToAddress(address) catch |err| { + if (err == error.ConnectionRefused) continue; + address_list.invalidate(); + return err; + }; + address_list.index = @truncate(u32, i); + return; + } + + address_list.invalidate(); + return error.ConnectionRefused; + } + } + + fn on_send(msg: *AsyncMessage, completion: *Completion, result: SendError!usize) void { + var this = @ptrCast(*AsyncSocket, @alignCast(@alignOf(*AsyncSocket), msg.context)); + const written = result catch |err| { + this.err = err; + resume this.send_frame; + return; + }; + + if (written == 0) { + resume this.send_frame; + return; + } + + msg.sent += @truncate(u16, written); + const has_more = msg.used > msg.sent; + this.sent += written; + + if (has_more) { + this.io.send( + *AsyncMessage, + msg, + on_send, + &msg.completion, + this.socket, + msg.slice(), + SOCKET_FLAGS, + ); + } else { + msg.release(); + } + + // complete + if (this.queued <= this.sent) { + resume this.send_frame; + } + } + + pub fn write(this: *AsyncSocket, buf: []const u8) usize { + this.tail.context = this; + + const resp = this.tail.writeAll(buf); + this.queued += resp.written; + + if (resp.overflow) { + var next = AsyncMessage.get(this.allocator); + this.tail.next = next; + this.tail = next; + + return @as(usize, resp.written) + this.write(buf[resp.written..]); + } + + return @as(usize, resp.written); + } + + pub const SendError = AsyncIO.SendError; + + pub fn deinit(this: *AsyncSocket) void { + var node = this.head; + this.head.release(); + } + + pub fn send(this: *This) SendError!usize { + const original_sent = this.sent; + this.head.context = this; + + this.io.send( + *AsyncMessage, + this.head, + on_send, + &this.head.completion, + this.socket, + this.head.slice(), + SOCKET_FLAGS, + ); + + var node = this.head; + while (node.next) |element| { + this.io.send( + *AsyncMessage, + element, + on_send, + &element.completion, + this.socket, + element.slice(), + SOCKET_FLAGS, + ); + node = element.next orelse break; + } + + suspend { + this.send_frame = @frame().*; + } + + if (this.err) |err| { + this.err = null; + return @errSetCast(AsyncSocket.SendError, err); + } + + return this.sent - original_sent; + } + + pub const RecvError = AsyncIO.RecvError; + + const Reader = struct { + pub fn on_read(ctx: *AsyncSocket, completion: *AsyncIO.Completion, result: RecvError!usize) void { + const len = result catch |err| { + ctx.err = err; + resume ctx.read_frame; + return; + }; + ctx.read_offset += len; + resume ctx.read_frame; + } + }; + + pub fn read( + this: *AsyncSocket, + bytes: []u8, + offset: u64, + ) RecvError!u64 { + this.read_context = bytes; + this.read_offset = offset; + const original_read_offset = this.read_offset; + + this.io.recv( + *AsyncSocket, + this, + Reader.on_read, + &this.read_completion, + this.socket, + bytes, + ); + + suspend { + this.read_frame = @frame().*; + } + + if (this.err) |err| { + this.err = null; + return @errSetCast(RecvError, err); + } + + return this.read_offset - original_read_offset; + } + + pub const SSL = struct { + ssl: *boring.SSL = undefined, + socket: AsyncSocket, + handshake_complete: bool = false, + ssl_bio: *AsyncBIO = undefined, + read_bio: ?*AsyncMessage = null, + handshake_frame: @Frame(SSL.handshake) = undefined, + send_frame: @Frame(SSL.send) = undefined, + read_frame: @Frame(SSL.read) = undefined, + hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined, + + const SSLConnectError = ConnectError || HandshakeError; + const HandshakeError = error{OpenSSLError}; + + pub fn connect(this: *SSL, name: []const u8, port: u16) !void { + try this.socket.connect(name, port); + this.handshake_complete = false; + + var ssl = boring.initClient(); + + { + std.mem.copy(u8, &this.hostname, name); + this.hostname[name.len] = 0; + var name_ = this.hostname[0..name.len :0]; + ssl.setHostname(name_); + } + + var bio = try AsyncBIO.init(this.socket.allocator); + bio.socket_fd = this.socket.socket; + this.ssl_bio = bio; + + boring.SSL_set_bio(ssl, bio.bio, bio.bio); + + this.ssl = ssl; + this.read_bio = AsyncMessage.get(this.socket.allocator); + + try this.handshake(); + } + + pub fn close(this: *SSL) void { + this.socket.close(); + } + + fn handshake(this: *SSL) HandshakeError!void { + while (!this.ssl.isInitFinished()) { + boring.ERR_clear_error(); + this.ssl_bio.enqueueSend(); + const handshake_result = boring.SSL_connect(this.ssl); + if (handshake_result == 0) { + Output.prettyErrorln("ssl accept error", .{}); + Output.flush(); + return error.OpenSSLError; + } + this.handshake_complete = handshake_result == 1 and this.ssl.isInitFinished(); + + if (!this.handshake_complete) { + // accept_result < 0 + const e = boring.SSL_get_error(this.ssl, handshake_result); + if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) { + this.ssl_bio.enqueueSend(); + suspend { + this.handshake_frame = @frame().*; + this.ssl_bio.pushPendingFrame(&this.handshake_frame); + } + + continue; + } + + Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result }); + Output.flush(); + return error.OpenSSLError; + } + } + } + + pub fn write(this: *SSL, buffer_: []const u8) usize { + var buffer = buffer_; + var read_bio = this.read_bio; + while (buffer.len > 0) { + const response = read_bio.?.writeAll(buffer); + buffer = buffer[response.written..]; + if (response.overflow) { + read_bio = read_bio.?.next orelse brk: { + read_bio.?.next = AsyncMessage.get(this.socket.allocator); + break :brk read_bio.?.next.?; + }; + } + } + + return buffer_.len; + } + + pub fn send(this: *SSL) !usize { + var bio_ = this.read_bio; + var len: usize = 0; + while (bio_) |bio| { + var slice = bio.slice(); + len += this.ssl.write(slice) catch |err| { + switch (err) { + error.WantRead => { + suspend { + this.send_frame = @frame().*; + this.ssl_bio.pushPendingFrame(&this.send_frame); + } + continue; + }, + error.WantWrite => { + this.ssl_bio.enqueueSend(); + + suspend { + this.send_frame = @frame().*; + this.ssl_bio.pushPendingFrame(&this.send_frame); + } + continue; + }, + else => {}, + } + + if (comptime Environment.isDebug) { + Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{ + @errorName(err), + bio.slice(), + }); + Output.flush(); + } + + return err; + }; + + bio_ = bio.next; + } + return len; + } + + pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 { + var buf = buf_[offset..]; + var bio_ = this.read_bio; + var len: usize = 0; + while (buf.len > 0) { + len = this.ssl.read(buf) catch |err| { + switch (err) { + error.WantWrite => { + this.ssl_bio.enqueueSend(); + + if (extremely_verbose) { + Output.prettyErrorln( + "Error: {s}: \n Read Wait: {s}\n Send Wait: {s}", + .{ + @errorName(err), + @tagName(this.ssl_bio.read_wait), + @tagName(this.ssl_bio.send_wait), + }, + ); + Output.flush(); + } + + suspend { + this.read_frame = @frame().*; + this.ssl_bio.pushPendingFrame(&this.read_frame); + } + continue; + }, + error.WantRead => { + // this.ssl_bio.enqueueSend(); + + if (extremely_verbose) { + Output.prettyErrorln( + "Error: {s}: \n Read Wait: {s}\n Send Wait: {s}", + .{ + @errorName(err), + @tagName(this.ssl_bio.read_wait), + @tagName(this.ssl_bio.send_wait), + }, + ); + Output.flush(); + } + + suspend { + this.read_frame = @frame().*; + this.ssl_bio.pushPendingFrame(&this.read_frame); + } + continue; + }, + else => return err, + } + unreachable; + }; + + break; + } + + return len; + } + + pub inline fn init(allocator: *std.mem.Allocator, io: *AsyncIO) !SSL { + var head = AsyncMessage.get(allocator); + + return SSL{ + .socket = try AsyncSocket.init(io, 0, allocator), + }; + } + + pub fn deinit(this: *SSL) void { + _ = boring.BIO_set_data(this.ssl_bio.bio, null); + this.ssl_bio.pending_frame = AsyncBIO.PendingFrame.init(); + this.ssl_bio.socket_fd = 0; + this.ssl_bio.release(); + this.ssl.deinit(); + this.handshake_complete = false; + + if (this.read_bio) |bio| { + var next_ = bio.next; + while (next_) |next| { + next.release(); + next_ = next.next; + } + + bio.release(); + this.read_bio = null; + } + } + }; +}; + +pub const AsyncBIO = struct { + bio: *boring.BIO = undefined, + socket_fd: std.os.socket_t = 0, + allocator: *std.mem.Allocator, + + read_wait: Wait = Wait.pending, + send_wait: Wait = Wait.pending, + recv_completion: AsyncIO.Completion = undefined, + send_completion: AsyncIO.Completion = undefined, + + write_buffer: ?*AsyncMessage = null, + + last_send_result: AsyncIO.SendError!usize = 0, + + last_read_result: AsyncIO.RecvError!usize = 0, + next: ?*AsyncBIO = null, + pending_frame: PendingFrame = PendingFrame.init(), + + pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 }); + + pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void { + this.pending_frame.writeItem(frame) catch {}; + } + + pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe { + return this.pending_frame.readItem(); + } + + var method: ?*boring.BIO_METHOD = null; + var head: ?*AsyncBIO = null; + + const async_bio_name: [:0]const u8 = "AsyncBIO"; + + const Wait = enum { + pending, + suspended, + completed, + }; + + fn instance(allocator: *std.mem.Allocator) *AsyncBIO { + if (head) |head_| { + var next = head_.next; + var ret = head_; + ret.read_wait = .pending; + ret.send_wait = .pending; + head = next; + + ret.pending_frame = PendingFrame.init(); + return ret; + } + + var bio = allocator.create(AsyncBIO) catch unreachable; + bio.* = AsyncBIO{ + .allocator = allocator, + .read_wait = .pending, + .send_wait = .pending, + }; + + return bio; + } + + pub fn release(this: *AsyncBIO) void { + if (head) |head_| { + this.next = head_; + } + + this.read_wait = .pending; + this.last_read_result = 0; + this.send_wait = .pending; + this.last_read_result = 0; + this.pending_frame = PendingFrame.init(); + + if (this.write_buffer) |write| { + write.release(); + this.write_buffer = null; + } + + head = this; + } + + pub fn init(allocator: *std.mem.Allocator) !*AsyncBIO { + var bio = instance(allocator); + + bio.bio = boring.BIO_new( + method orelse brk: { + method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl); + break :brk method.?; + }, + ) orelse return error.OutOfMemory; + + _ = boring.BIO_set_data(bio.bio, bio); + return bio; + } + + const WaitResult = enum { + none, + read, + send, + }; + + const Sender = struct { + pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { + this.last_send_result = result; + this.send_wait = .completed; + this.write_buffer.?.sent += @truncate(u32, result catch 0); + + if (extremely_verbose) { + const read_result = result catch @as(usize, 999); + Output.prettyErrorln("onSend: {d}", .{read_result}); + Output.flush(); + } + + if (this.pending_frame.readItem()) |frame| { + resume frame; + } + } + }; + + pub fn enqueueSend( + self: *AsyncBIO, + ) void { + if (self.write_buffer == null) return; + var to_write = self.write_buffer.?.slice(); + if (to_write.len == 0) { + return; + } + + self.last_send_result = 0; + + AsyncIO.global.send( + *AsyncBIO, + self, + Sender.onSend, + &self.send_completion, + self.socket_fd, + to_write, + SOCKET_FLAGS, + ); + self.send_wait = .suspended; + if (extremely_verbose) { + Output.prettyErrorln("enqueueSend: {d}", .{to_write.len}); + Output.flush(); + } + } + + const Reader = struct { + pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void { + this.last_read_result = result; + this.read_wait = .completed; + if (extremely_verbose) { + const read_result = result catch @as(usize, 999); + Output.prettyErrorln("onRead: {d}", .{read_result}); + Output.flush(); + } + if (this.pending_frame.readItem()) |frame| { + resume frame; + } + } + }; + + pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void { + var read_buffer = read_buf[off..]; + if (read_buffer.len == 0) { + return; + } + + self.last_read_result = 0; + AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.recv_completion, self.socket_fd, read_buffer); + self.read_wait = .suspended; + if (extremely_verbose) { + Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len}); + Output.flush(); + } + } + + pub const Bio = struct { + inline fn cast(bio: *boring.BIO) *AsyncBIO { + return @ptrCast(*AsyncBIO, @alignCast(@alignOf(*AsyncBIO), boring.BIO_get_data(bio))); + } + + pub fn create(this_bio: *boring.BIO) callconv(.C) c_int { + boring.BIO_set_init(this_bio, 1); + return 1; + } + pub fn destroy(this_bio: *boring.BIO) callconv(.C) c_int { + boring.BIO_set_init(this_bio, 0); + + if (boring.BIO_get_data(this_bio) != null) { + var this = cast(this_bio); + this.release(); + } + + return 0; + } + pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int { + std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); + + var buf = ptr[0..@intCast(usize, len)]; + boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); + + if (len <= 0) { + return 0; + } + + var this = cast(this_bio); + if (this.read_wait == .suspended) { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + } + + switch (this.send_wait) { + .pending => { + var write_buffer = this.write_buffer orelse brk: { + this.write_buffer = AsyncMessage.get(this.allocator); + break :brk this.write_buffer.?; + }; + + _ = write_buffer.writeAll(buf); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + + return -1; + }, + .suspended => { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + + return -1; + }, + .completed => { + this.send_wait = .pending; + const written = this.last_send_result catch |err| { + Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); + Output.flush(); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }; + this.last_send_result = 0; + return @intCast(c_int, written); + }, + } + + unreachable; + } + + pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int { + std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); + var buf = ptr[0..@intCast(usize, len)]; + + boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); + var this = cast(this_bio); + + switch (this.read_wait) { + .pending => { + this.enqueueRead(buf, 0); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }, + .suspended => { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }, + .completed => { + this.read_wait = .pending; + const read_len = this.last_read_result catch |err| { + Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); + Output.flush(); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }; + this.last_read_result = 0; + return @intCast(c_int, read_len); + }, + } + unreachable; + } + pub fn ctrl(this_bio: *boring.BIO, cmd: c_int, larg: c_long, pargs: ?*c_void) callconv(.C) c_long { + return switch (cmd) { + boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0, + else => 1, + }; + } + }; +}; + +pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { + var header_count: usize = 0; + var header_entries = this.header_entries.slice(); + var header_names = header_entries.items(.name); + var header_values = header_entries.items(.value); + var request_headers_buf = &this.request_headers_buf; + + var override_accept_encoding = false; + var override_accept_header = false; + + var override_user_agent = false; + for (header_names) |head, i| { + const name = this.headerStr(head); + // Hash it as lowercase + const hash = hashHeaderName(name); + + // Skip host and connection header + // we manage those + switch (hash) { + host_header_hash, + connection_header_hash, + content_length_header_hash, + => continue, + hashHeaderName("if-modified-since") => { + if (this.force_last_modified and this.if_modified_since.len == 0) { + this.if_modified_since = this.headerStr(header_values[i]); + } + }, + accept_header_hash => { + override_accept_header = true; + }, + else => {}, + } + + override_user_agent = override_user_agent or hash == user_agent_header_hash; + + override_accept_encoding = override_accept_encoding or hash == accept_encoding_header_hash; + + request_headers_buf[header_count] = (picohttp.Header{ + .name = name, + .value = this.headerStr(header_values[i]), + }); + + // header_name_hashes[header_count] = hash; + + // // ensure duplicate headers come after each other + // if (header_count > 2) { + // var head_i: usize = header_count - 1; + // while (head_i > 0) : (head_i -= 1) { + // if (header_name_hashes[head_i] == header_name_hashes[header_count]) { + // std.mem.swap(picohttp.Header, &header_name_hashes[header_count], &header_name_hashes[head_i + 1]); + // std.mem.swap(u64, &request_headers_buf[header_count], &request_headers_buf[head_i + 1]); + // break; + // } + // } + // } + header_count += 1; + } + + // request_headers_buf[header_count] = connection_header; + // header_count += 1; + + if (!override_user_agent) { + request_headers_buf[header_count] = user_agent_header; + header_count += 1; + } + + if (!override_accept_header) { + request_headers_buf[header_count] = accept_header; + header_count += 1; + } + + request_headers_buf[header_count] = picohttp.Header{ + .name = host_header_name, + .value = this.url.hostname, + }; + header_count += 1; + + if (!override_accept_encoding) { + request_headers_buf[header_count] = accept_encoding_header; + header_count += 1; + } + + if (body_len > 0) { + request_headers_buf[header_count] = picohttp.Header{ + .name = content_length_header_name, + .value = std.fmt.bufPrint(&this.request_content_len_buf, "{d}", .{body_len}) catch "0", + }; + header_count += 1; + } + + return picohttp.Request{ + .method = @tagName(this.method), + .path = this.url.pathname, + .minor_version = 1, + .headers = request_headers_buf[0..header_count], + }; +} + +pub fn connect( + this: *HTTPClient, + comptime ConnectType: type, + connector: ConnectType, +) !void { + const port = this.url.getPortAuto(); + + try connector.connect(this.url.hostname, port); + var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) }; + client.setNoDelay(true) catch {}; + client.setReadBufferSize(BufferPool.len) catch {}; + client.setQuickACK(true) catch {}; + this.tcp_client = client; + if (this.timeout > 0) { + client.setReadTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {}; + client.setWriteTimeout(@truncate(u32, this.timeout / std.time.ns_per_ms)) catch {}; + } +} + +pub fn sendAsync(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) @Frame(HTTPClient.send) { + return async this.send(body, body_out_str); +} + +pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { + // this prevents stack overflow + redirect: while (this.remaining_redirect_count >= -1) { + if (this.url.isHTTPS()) { + return this.sendHTTPS(body, body_out_str) catch |err| { + switch (err) { + error.Redirect => { + this.remaining_redirect_count -= 1; + this.socket.deinit(); + + continue :redirect; + }, + else => return err, + } + }; + } else { + return this.sendHTTP(body, body_out_str) catch |err| { + switch (err) { + error.Redirect => { + this.remaining_redirect_count -= 1; + this.socket.socket.deinit(); + + continue :redirect; + }, + else => return err, + } + }; + } + } + + return error.TooManyRedirects; +} + +const Task = ThreadPool.Task; + +pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { + this.socket = AsyncSocket.SSL{ + .socket = try AsyncSocket.init(&AsyncIO.global, 0, this.allocator), + }; + var socket = &this.socket.socket; + try this.connect(*AsyncSocket, socket); + + defer this.socket.close(); + var request = buildRequest(this, body.len); + if (this.verbose) { + Output.prettyErrorln("{s}", .{request}); + } + + try writeRequest(@TypeOf(socket), socket, request, body); + _ = try socket.send(); + + if (this.progress_node == null) { + return this.processResponse( + false, + false, + @TypeOf(socket), + socket, + body_out_str, + ); + } else { + return this.processResponse( + false, + true, + @TypeOf(socket), + socket, + body_out_str, + ); + } +} + +const ZlibPool = struct { + lock: Lock = Lock.init(), + items: std.ArrayList(*MutableString), + allocator: *std.mem.Allocator, + pub var instance: ZlibPool = undefined; + pub var loaded: bool = false; + pub var decompression_thread_pool: ThreadPool = undefined; + pub var decompression_thread_pool_loaded: bool = false; + + pub fn init(allocator: *std.mem.Allocator) ZlibPool { + return ZlibPool{ + .allocator = allocator, + .items = std.ArrayList(*MutableString).init(allocator), + }; + } + + pub fn get(this: *ZlibPool) !*MutableString { + switch (this.items.items.len) { + 0 => { + var mutable = try this.allocator.create(MutableString); + mutable.* = try MutableString.init(this.allocator, 0); + return mutable; + }, + else => { + return this.items.pop(); + }, + } + + return item; + } + + pub fn put(this: *ZlibPool, mutable: *MutableString) !void { + mutable.reset(); + try this.items.append(mutable); + } + + pub fn decompress(compressed_data: []const u8, output: *MutableString) Zlib.ZlibError!void { + // Heuristic: if we have more than 128 KB of data to decompress + // it may take 1ms or so + // We must keep the network thread unblocked as often as possible + // So if we have more than 50 KB of data to decompress, we do it off the network thread + // if (compressed_data.len < 50_000) { + var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, default_allocator); + try reader.readAll(); + return; + // } + + // var task = try DecompressionTask.get(default_allocator); + // defer task.release(); + // task.* = DecompressionTask{ + // .data = compressed_data, + // .output = output, + // .event_fd = AsyncIO.global.eventfd(), + // }; + // task.scheduleAndWait(); + + // if (task.err) |err| { + // return @errSetCast(Zlib.ZlibError, err); + // } + } + + pub const DecompressionTask = struct { + task: ThreadPool.Task = ThreadPool.Task{ .callback = callback }, + frame: @Frame(scheduleAndWait) = undefined, + data: []const u8, + output: *MutableString = undefined, + completion: Completion = undefined, + event_fd: std.os.fd_t = 0, + err: ?anyerror = null, + next: ?*DecompressionTask = null, + + pub var head: ?*DecompressionTask = null; + + pub fn get(allocator: *std.mem.Allocator) !*DecompressionTask { + if (head) |head_| { + var this = head_; + head = this.next; + this.next = null; + return this; + } + + return try allocator.create(DecompressionTask); + } + + pub fn scheduleAndWait(task: *DecompressionTask) void { + if (!decompression_thread_pool_loaded) { + decompression_thread_pool_loaded = true; + decompression_thread_pool = ThreadPool.init(.{ .max_threads = 1 }); + } + + AsyncIO.global.event( + *DecompressionTask, + task, + DecompressionTask.finished, + &task.completion, + task.event_fd, + ); + + suspend { + var batch = ThreadPool.Batch.from(&task.task); + decompression_thread_pool.schedule(batch); + task.frame = @frame().*; + } + } + + pub fn release(this: *DecompressionTask) void { + this.next = head; + head = this; + } + + fn callback_(this: *DecompressionTask) Zlib.ZlibError!void { + var reader = try Zlib.ZlibReaderArrayList.init(this.data, &this.output.list, default_allocator); + try reader.readAll(); + } + + pub fn callback(task: *ThreadPool.Task) void { + var this: *DecompressionTask = @fieldParentPtr(DecompressionTask, "task", task); + this.callback_() catch |err| { + this.err = err; + }; + AsyncIO.triggerEvent(this.event_fd, &this.completion) catch {}; + } + + pub fn finished(this: *DecompressionTask, completion: *Completion, _: void) void { + resume this.frame; + } + }; +}; + +pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response { + defer if (this.verbose) Output.flush(); + var response: picohttp.Response = undefined; + var request_message = AsyncMessage.get(this.allocator); + defer request_message.release(); + var request_buffer: []u8 = request_message.buf; + var read_length: usize = 0; + { + var read_headers_up_to: usize = 0; + + var req_buf_read: usize = std.math.maxInt(usize); + defer this.read_count += @intCast(u32, read_length); + + restart: while (req_buf_read != 0) { + req_buf_read = try client.read(request_buffer, read_length); + read_length += req_buf_read; + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(read_length); + this.progress_node.?.context.maybeRefresh(); + } + + var request_body = request_buffer[0..read_length]; + read_headers_up_to = if (read_headers_up_to > read_length) read_length else read_headers_up_to; + + response = picohttp.Response.parseParts(request_body, &this.response_headers_buf, &read_headers_up_to) catch |err| { + switch (err) { + error.ShortRead => { + continue :restart; + }, + else => { + return err; + }, + } + }; + break :restart; + } + } + if (read_length == 0) { + return error.NoData; + } + + body_out_str.reset(); + var content_length: u32 = 0; + var encoding = Encoding.identity; + var transfer_encoding = Encoding.identity; + + var location: string = ""; + + var pretend_its_304 = false; + + for (response.headers) |header| { + switch (hashHeaderName(header.name)) { + content_length_header_hash => { + content_length = std.fmt.parseInt(u32, header.value, 10) catch 0; + try body_out_str.inflate(content_length); + body_out_str.list.expandToCapacity(); + this.body_size = content_length; + }, + content_encoding_hash => { + if (strings.eqlComptime(header.value, "gzip")) { + encoding = Encoding.gzip; + } else if (strings.eqlComptime(header.value, "deflate")) { + encoding = Encoding.deflate; + } else if (!strings.eqlComptime(header.value, "identity")) { + return error.UnsupportedContentEncoding; + } + }, + transfer_encoding_header => { + if (strings.eqlComptime(header.value, "gzip")) { + transfer_encoding = Encoding.gzip; + } else if (strings.eqlComptime(header.value, "deflate")) { + transfer_encoding = Encoding.deflate; + } else if (strings.eqlComptime(header.value, "identity")) { + transfer_encoding = Encoding.identity; + } else if (strings.eqlComptime(header.value, "chunked")) { + transfer_encoding = Encoding.chunked; + } else { + return error.UnsupportedTransferEncoding; + } + }, + location_header_hash => { + location = header.value; + }, + hashHeaderName("Last-Modified") => { + if (this.force_last_modified and response.status_code > 199 and response.status_code < 300 and this.if_modified_since.len > 0) { + if (strings.eql(this.if_modified_since, header.value)) { + pretend_its_304 = true; + } + } + }, + + else => {}, + } + } + + if (this.verbose) { + Output.prettyErrorln("Response: {s}", .{response}); + } + + if (location.len > 0 and this.remaining_redirect_count > 0) { + switch (response.status_code) { + 302, 301, 307, 308, 303 => { + if (strings.indexOf(location, "://")) |i| { + var url_buf = this.redirect orelse try URLBufferPool.get(this.allocator); + + const protocol_name = location[0..i]; + if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else { + return error.UnsupportedRedirectProtocol; + } + + std.mem.copy(u8, &url_buf.buf, location); + this.url = URL.parse(url_buf.buf[0..location.len]); + this.redirect = url_buf; + } else { + var url_buf = try URLBufferPool.get(this.allocator); + const original_url = this.url; + this.url = URL.parse(std.fmt.bufPrint( + &url_buf.buf, + "{s}://{s}{s}", + .{ original_url.displayProtocol(), original_url.displayHostname(), location }, + ) catch return error.RedirectURLTooLong); + + if (this.redirect) |red| { + red.release(); + } + + this.redirect = url_buf; + } + + // Ensure we don't up ove + + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/303 + if (response.status_code == 303) { + this.method = .GET; + } + + return error.Redirect; + }, + else => {}, + } + } + + body_getter: { + if (pretend_its_304) { + response.status_code = 304; + } + + if (response.status_code == 304) break :body_getter; + + if (transfer_encoding == Encoding.chunked) { + var decoder = std.mem.zeroes(picohttp.phr_chunked_decoder); + var buffer_: *MutableString = body_out_str; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + ZlibPool.loaded = true; + } + + buffer_ = try ZlibPool.instance.get(); + }, + else => {}, + } + + var buffer = buffer_.*; + + var last_read: usize = 0; + { + var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; + last_read = remainder.len; + try buffer.inflate(std.math.max(remainder.len, 2048)); + buffer.list.expandToCapacity(); + std.mem.copy(u8, buffer.list.items, remainder); + } + + // set consume_trailer to 1 to discard the trailing header + // using content-encoding per chunk is not supported + decoder.consume_trailer = 1; + + // these variable names are terrible + // it's copypasta from https://github.com/h2o/picohttpparser#phr_decode_chunked + // (but ported from C -> zig) + var rret: usize = 0; + var rsize: usize = last_read; + var pret: isize = picohttp.phr_decode_chunked(&decoder, buffer.list.items.ptr, &rsize); + var total_size = rsize; + + while (pret == -2) { + if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) { + try buffer.inflate(std.math.max(total_size * 2, 1024)); + buffer.list.expandToCapacity(); + } + + rret = try client.read(buffer.list.items, total_size); + + if (rret == 0) { + return error.ChunkedEncodingError; + } + + rsize = rret; + pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); + if (pret == -1) return error.ChunkedEncodingParseError; + + total_size += rsize; + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(total_size); + this.progress_node.?.context.maybeRefresh(); + } + } + + buffer.list.shrinkRetainingCapacity(total_size); + buffer_.* = buffer; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + var gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { + Output.prettyErrorln("<r><red>Zlib error<r>", .{}); + Output.flush(); + return err; + }; + this.gzip_elapsed = gzip_timer.read(); + }, + else => {}, + } + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_out_str.list.items.len); + this.progress_node.?.context.maybeRefresh(); + } + + this.body_size = @intCast(u32, body_out_str.list.items.len); + return response; + } + + if (content_length > 0) { + var remaining_content_length = content_length; + var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; + remainder = remainder[0..std.math.min(remainder.len, content_length)]; + var buffer_: *MutableString = body_out_str; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + if (!ZlibPool.loaded) { + ZlibPool.instance = ZlibPool.init(default_allocator); + ZlibPool.loaded = true; + } + + buffer_ = try ZlibPool.instance.get(); + if (buffer_.list.capacity < remaining_content_length) { + try buffer_.list.ensureUnusedCapacity(buffer_.allocator, remaining_content_length); + } + buffer_.list.items = buffer_.list.items.ptr[0..remaining_content_length]; + }, + else => {}, + } + var buffer = buffer_.*; + + var body_size: usize = 0; + if (remainder.len > 0) { + std.mem.copy(u8, buffer.list.items, remainder); + body_size = remainder.len; + this.read_count += @intCast(u32, body_size); + remaining_content_length -= @intCast(u32, remainder.len); + } + + while (remaining_content_length > 0) { + const size = @intCast(u32, try client.read( + buffer.list.items, + body_size, + )); + this.read_count += size; + if (size == 0) break; + + body_size += size; + remaining_content_length -= size; + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_size); + this.progress_node.?.context.maybeRefresh(); + } + } + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_size); + this.progress_node.?.context.maybeRefresh(); + } + + buffer.list.shrinkRetainingCapacity(body_size); + buffer_.* = buffer; + + switch (encoding) { + Encoding.gzip, Encoding.deflate => { + var gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); + body_out_str.list.expandToCapacity(); + defer ZlibPool.instance.put(buffer_) catch unreachable; + ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| { + Output.prettyErrorln("<r><red>Zlib error<r>", .{}); + Output.flush(); + return err; + }; + this.gzip_elapsed = gzip_timer.read(); + }, + else => {}, + } + } + } + + if (comptime report_progress) { + this.progress_node.?.activate(); + this.progress_node.?.setCompletedItems(body_out_str.list.items.len); + this.progress_node.?.context.maybeRefresh(); + } + + return response; +} + +pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response { + this.socket = try AsyncSocket.SSL.init(this.allocator, &AsyncIO.global); + var socket = &this.socket; + try this.connect(*AsyncSocket.SSL, socket); + defer this.socket.close(); + + var request = buildRequest(this, body_str.len); + if (this.verbose) { + Output.prettyErrorln("{s}", .{request}); + } + + try writeRequest(@TypeOf(socket), socket, request, body_str); + _ = try socket.send(); + + if (this.progress_node == null) { + return this.processResponse( + false, + false, + @TypeOf(socket), + socket, + body_out_str, + ); + } else { + return this.processResponse( + false, + true, + @TypeOf(socket), + socket, + body_out_str, + ); + } +} + +// zig test src/http_client.zig --test-filter "sendHTTP - only" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec +test "sendHTTP - only" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + // headers.appendHeader("X-What", "ok", true, true, false); + headers.appendHeader("Accept-Encoding", "identity", true, true, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("http://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTP("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqual(body_out_str.list.items.len, 1256); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +// zig test src/http_client.zig --test-filter "sendHTTP - gzip" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec +test "sendHTTP - gzip" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + // headers.appendHeader("X-What", "ok", true, true, false); + headers.appendHeader("Accept-Encoding", "gzip", true, true, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("http://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTP("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +// zig test src/http_client.zig --test-filter "sendHTTPS - identity" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec +test "sendHTTPS - identity" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + headers.appendHeader("X-What", "ok", true, true, false); + headers.appendHeader("Accept-Encoding", "identity", true, true, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("https://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTPS("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +test "sendHTTPS - gzip" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + headers.appendHeader("Accept-Encoding", "gzip", false, false, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("https://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTPS("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +// zig test src/http_client.zig --test-filter "sendHTTPS - deflate" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test +test "sendHTTPS - deflate" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + headers.appendHeader("Accept-Encoding", "deflate", false, false, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("https://example.com/"), + headers.entries, + headers.buf.items, + ); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.sendHTTPS("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} + +// zig test src/http_client.zig --test-filter "sendHTTP" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test + +test "send - redirect" { + Output.initTest(); + defer Output.flush(); + + var headers = try std.heap.c_allocator.create(Headers); + headers.* = Headers{ + .entries = @TypeOf(headers.entries){}, + .buf = @TypeOf(headers.buf){}, + .used = 0, + .allocator = std.heap.c_allocator, + }; + + headers.appendHeader("Accept-Encoding", "gzip", false, false, false); + + var client = HTTPClient.init( + std.heap.c_allocator, + .GET, + URL.parse("https://www.bun.sh/"), + headers.entries, + headers.buf.items, + ); + try std.testing.expectEqualStrings(client.url.hostname, "www.bun.sh"); + var body_out_str = try MutableString.init(std.heap.c_allocator, 0); + var response = try client.send("", &body_out_str); + try std.testing.expectEqual(response.status_code, 200); + try std.testing.expectEqual(client.url.hostname, "bun.sh"); + try std.testing.expectEqualStrings(body_out_str.list.items, @embedFile("fixtures_example.com.html")); +} |