diff options
Diffstat (limited to 'src/http/http_client_async.zig')
-rw-r--r-- | src/http/http_client_async.zig | 815 |
1 files changed, 747 insertions, 68 deletions
diff --git a/src/http/http_client_async.zig b/src/http/http_client_async.zig index 240a77dd5..128363171 100644 --- a/src/http/http_client_async.zig +++ b/src/http/http_client_async.zig @@ -13,31 +13,35 @@ const Zlib = @import("../zlib.zig"); const StringBuilder = @import("../string_builder.zig"); const AsyncIO = @import("io"); const ThreadPool = @import("../thread_pool.zig"); +const boring = @import("boringssl"); const NetoworkThread = @import("./network_thread.zig"); +const extremely_verbose = false; + fn writeRequest( - writer: *AsyncSocket, + comptime Writer: type, + writer: Writer, request: picohttp.Request, body: string, // header_hashes: []u64, ) !void { - _ = try writer.write(request.method); - _ = try writer.write(" "); - _ = try writer.write(request.path); - _ = try writer.write(" HTTP/1.1\r\n"); + _ = writer.write(request.method); + _ = writer.write(" "); + _ = writer.write(request.path); + _ = writer.write(" HTTP/1.1\r\n"); for (request.headers) |header, i| { - _ = try writer.write(header.name); - _ = try writer.write(": "); - _ = try writer.write(header.value); - _ = try writer.write("\r\n"); + _ = writer.write(header.name); + _ = writer.write(": "); + _ = writer.write(header.value); + _ = writer.write("\r\n"); } - _ = try writer.write("\r\n"); + _ = writer.write("\r\n"); if (body.len > 0) { - _ = try writer.write(body); + _ = writer.write(body); } } @@ -51,11 +55,11 @@ tcp_client: tcp.Client = undefined, body_size: u32 = 0, read_count: u32 = 0, remaining_redirect_count: i8 = 127, -redirect_buf: [2048]u8 = undefined, +redirect: ?*URLBufferPool = null, disable_shutdown: bool = true, timeout: usize = 0, progress_node: ?*std.Progress.Node = null, -socket: AsyncSocket = undefined, +socket: AsyncSocket.SSL = undefined, pub fn init( allocator: *std.mem.Allocator, @@ -70,10 +74,17 @@ pub fn init( .url = url, .header_entries = header_entries, .header_buf = header_buf, - .socket = try AsyncSocket.init(&AsyncIO.global, 0, allocator), + .socket = undefined, }; } +pub fn deinit(this: *HTTPClient) !void { + if (this.redirect) |redirect| { + redirect.release(); + this.redirect = null; + } +} + threadlocal var response_headers_buf: [256]picohttp.Header = undefined; threadlocal var request_content_len_buf: [64]u8 = undefined; threadlocal var header_name_hashes: [256]u64 = undefined; @@ -204,6 +215,7 @@ pub const AsyncHTTP = struct { response_encoding: Encoding = Encoding.identity, redirect_count: u32 = 0, retries_count: u32 = 0, + verbose: bool = false, client: HTTPClient = undefined, err: ?anyerror = null, @@ -279,9 +291,7 @@ pub const AsyncHTTP = struct { return head_; } - pub fn release(this: *HTTPSender) void { - // head = this; - } + pub fn release(this: *HTTPSender) void {} pub fn callback(task: *ThreadPool.Task) void { var this = @fieldParentPtr(HTTPSender, "task", task); @@ -328,15 +338,100 @@ pub const AsyncHTTP = struct { } }; -const AsyncMessage = struct { - const buffer_size = std.math.maxInt(u16) - 64; - used: u16 = 0, - sent: u16 = 0, +const BufferPool = struct { + pub const len = std.math.maxInt(u16) - 64; + buf: [len]u8 = undefined, + next: ?*BufferPool = null, + allocator: *std.mem.Allocator = undefined, + + var head: ?*BufferPool = null; + + pub fn get(allocator: *std.mem.Allocator) !*BufferPool { + if (head) |item| { + var this = item; + var head_ = item.next; + head = head_; + this.next = null; + + return this; + } + + var entry = try allocator.create(BufferPool); + entry.* = BufferPool{ .allocator = allocator }; + return entry; + } + + pub fn release(this: *BufferPool) void { + if (head) |item| { + item.next = this; + } else { + head = this; + } + } +}; + +const URLBufferPool = struct { + pub const len = 4096; + buf: [len]u8 = undefined, + next: ?*URLBufferPool = null, + allocator: *std.mem.Allocator = undefined, + + var head: ?*URLBufferPool = null; + + pub fn get(allocator: *std.mem.Allocator) !*URLBufferPool { + if (head) |item| { + var this = item; + var head_ = item.next; + head = head_; + this.next = null; + + return this; + } + + var entry = try allocator.create(URLBufferPool); + entry.* = URLBufferPool{ .allocator = allocator }; + return entry; + } + + pub fn release(this: *URLBufferPool) void { + if (head) |item| { + item.next = this; + } else { + head = this; + } + } +}; + +pub const AsyncMessage = struct { + used: u32 = 0, + sent: u32 = 0, completion: AsyncIO.Completion = undefined, - buf: [buffer_size]u8 = undefined, + buf: []u8 = undefined, + pooled: ?*BufferPool = null, allocator: *std.mem.Allocator, next: ?*AsyncMessage = null, context: *c_void = undefined, + var _first_ssl: ?*AsyncMessage = null; + pub fn getSSL(allocator: *std.mem.Allocator) *AsyncMessage { + if (_first_ssl) |first| { + var prev = first; + if (first.next) |next| { + _first_ssl = next; + prev.next = null; + return prev; + } + + return prev; + } + + var msg = allocator.create(AsyncMessage) catch unreachable; + msg.* = AsyncMessage{ + .allocator = allocator, + .pooled = null, + .buf = &[_]u8{}, + }; + return msg; + } var _first: ?*AsyncMessage = null; pub fn get(allocator: *std.mem.Allocator) *AsyncMessage { @@ -352,15 +447,22 @@ const AsyncMessage = struct { } var msg = allocator.create(AsyncMessage) catch unreachable; - msg.* = AsyncMessage{ .allocator = allocator }; + var pooled = BufferPool.get(allocator) catch unreachable; + msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.buf, .pooled = pooled }; return msg; } pub fn release(self: *AsyncMessage) void { - self.next = _first; self.used = 0; self.sent = 0; - _first = self; + + if (self.pooled) |pool| { + self.next = _first; + _first = self; + } else { + self.next = _first_ssl; + _first_ssl = self; + } } const WriteResponse = struct { @@ -481,7 +583,15 @@ const AsyncSocket = struct { this.sent += written; if (has_more) { - this.io.send(*AsyncMessage, msg, on_send, &msg.completion, this.socket, msg.slice()); + this.io.send( + *AsyncMessage, + msg, + on_send, + &msg.completion, + this.socket, + msg.slice(), + SOCKET_FLAGS, + ); } else { msg.release(); } @@ -492,7 +602,7 @@ const AsyncSocket = struct { } } - pub fn write(this: *AsyncSocket, buf: []const u8) AsyncSocket.SendError!usize { + pub fn write(this: *AsyncSocket, buf: []const u8) usize { this.tail.context = this; const resp = this.tail.writeAll(buf); @@ -503,7 +613,7 @@ const AsyncSocket = struct { this.tail.next = next; this.tail = next; - return @as(usize, resp.written) + try this.write(buf[resp.written..]); + return @as(usize, resp.written) + this.write(buf[resp.written..]); } return @as(usize, resp.written); @@ -524,11 +634,27 @@ const AsyncSocket = struct { const original_sent = this.sent; this.head.context = this; - this.io.send(*AsyncMessage, this.head, on_send, &this.head.completion, this.socket, this.head.slice()); + this.io.send( + *AsyncMessage, + this.head, + on_send, + &this.head.completion, + this.socket, + this.head.slice(), + SOCKET_FLAGS, + ); var node = this.head; while (node.next) |element| { - this.io.send(*AsyncMessage, element, on_send, &element.completion, this.socket, element.slice()); + this.io.send( + *AsyncMessage, + element, + on_send, + &element.completion, + this.socket, + element.slice(), + SOCKET_FLAGS, + ); node = element.next orelse break; } @@ -546,6 +672,18 @@ const AsyncSocket = struct { pub const RecvError = AsyncIO.RecvError; + const Reader = struct { + pub fn on_read(ctx: *AsyncSocket, completion: *AsyncIO.Completion, result: RecvError!usize) void { + const len = result catch |err| { + ctx.err = err; + resume ctx.read_frame; + return; + }; + ctx.read_offset += len; + resume ctx.read_frame; + } + }; + pub fn read( this: *AsyncSocket, bytes: []u8, @@ -554,17 +692,6 @@ const AsyncSocket = struct { this.read_context = bytes; this.read_offset = offset; const original_read_offset = this.read_offset; - const Reader = struct { - pub fn on_read(ctx: *AsyncSocket, completion: *AsyncIO.Completion, result: RecvError!usize) void { - const len = result catch |err| { - ctx.err = err; - resume ctx.read_frame; - return; - }; - ctx.read_offset += len; - resume ctx.read_frame; - } - }; this.io.recv( *AsyncSocket, @@ -586,6 +713,506 @@ const AsyncSocket = struct { return this.read_offset - original_read_offset; } + + pub const SSL = struct { + ssl: *boring.SSL = undefined, + socket: AsyncSocket, + handshake_complete: bool = false, + ssl_bio: *AsyncBIO = undefined, + read_bio: ?*AsyncMessage = null, + handshake_frame: @Frame(SSL.handshake) = undefined, + send_frame: @Frame(SSL.send) = undefined, + read_frame: @Frame(SSL.read) = undefined, + + const SSLConnectError = ConnectError || HandshakeError; + const HandshakeError = error{OpenSSLError}; + + pub fn connect(this: *SSL, name: []const u8, port: u16) !void { + try this.socket.connect(name, port); + this.handshake_complete = false; + + var ssl = boring.initClient(); + + // { + // var hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined; + // std.mem.copy(u8, &hostname, name); + // hostname[name.len] = 0; + // var name_ = hostname[0..name.len :0]; + // ssl.setHostname(name_); + // } + + var bio = try AsyncBIO.init(this.socket.allocator); + bio.socket_fd = this.socket.socket; + this.ssl_bio = bio; + + boring.SSL_set_bio(ssl, bio.bio, bio.bio); + + this.ssl = ssl; + this.read_bio = AsyncMessage.get(this.socket.allocator); + + try this.handshake(); + } + + fn handshake(this: *SSL) HandshakeError!void { + while (true) { + if (!this.handshake_complete) { + boring.ERR_clear_error(); + this.ssl_bio.enqueueSend(); + const handshake_result = boring.SSL_connect(this.ssl); + if (handshake_result == 0) { + Output.prettyErrorln("ssl accept error", .{}); + Output.flush(); + return error.OpenSSLError; + } + this.handshake_complete = handshake_result == 1; + + if (!this.handshake_complete) { + // accept_result < 0 + const e = boring.SSL_get_error(this.ssl, handshake_result); + if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) { + this.ssl_bio.enqueueSend(); + suspend { + this.handshake_frame = @frame().*; + this.ssl_bio.pending_frame = &this.handshake_frame; + } + + continue; + } + + Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result }); + Output.flush(); + return error.OpenSSLError; + } + break; + } + } + } + + pub fn write(this: *SSL, buffer_: []const u8) usize { + var buffer = buffer_; + var read_bio = this.read_bio; + while (buffer.len > 0) { + const response = read_bio.?.writeAll(buffer); + buffer = buffer[response.written..]; + if (response.overflow) { + read_bio = read_bio.?.next orelse brk: { + read_bio.?.next = AsyncMessage.get(this.socket.allocator); + break :brk read_bio.?.next.?; + }; + } + } + + return buffer_.len; + } + + pub fn send(this: *SSL) !usize { + var bio_ = this.read_bio; + var len: usize = 0; + while (bio_) |bio| { + var slice = bio.slice(); + + len += this.ssl.write(slice) catch |err| { + switch (err) { + error.WantRead => { + suspend { + this.send_frame = @frame().*; + this.ssl_bio.pending_frame = &this.send_frame; + } + continue; + }, + error.WantWrite => { + this.ssl_bio.enqueueSend(); + + suspend { + this.send_frame = @frame().*; + this.ssl_bio.pending_frame = &this.send_frame; + } + continue; + }, + else => {}, + } + + if (comptime Environment.isDebug) { + Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{ + @errorName(err), + bio.slice(), + }); + Output.flush(); + } + + return err; + }; + + bio_ = bio.next; + } + return len; + } + + pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 { + var buf = buf_[offset..]; + var bio_ = this.read_bio; + var len: usize = 0; + while (buf.len > 0) { + len = this.ssl.read(buf) catch |err| { + switch (err) { + error.WantWrite => { + this.ssl_bio.enqueueSend(); + + if (extremely_verbose) { + Output.prettyErrorln( + "Error: {s}: \n Read Wait: {s}\n Send Wait: {s}", + .{ + @errorName(err), + @tagName(this.ssl_bio.read_wait), + @tagName(this.ssl_bio.send_wait), + }, + ); + Output.flush(); + } + + suspend { + this.read_frame = @frame().*; + this.ssl_bio.pending_frame = &this.read_frame; + } + continue; + }, + error.WantRead => { + // this.ssl_bio.enqueueSend(); + + if (extremely_verbose) { + Output.prettyErrorln( + "Error: {s}: \n Read Wait: {s}\n Send Wait: {s}", + .{ + @errorName(err), + @tagName(this.ssl_bio.read_wait), + @tagName(this.ssl_bio.send_wait), + }, + ); + Output.flush(); + } + + suspend { + this.read_frame = @frame().*; + this.ssl_bio.pending_frame = &this.read_frame; + } + continue; + }, + else => return err, + } + unreachable; + }; + + break; + } + + return len; + } + + pub inline fn init(allocator: *std.mem.Allocator, io: *AsyncIO) !SSL { + var head = AsyncMessage.get(allocator); + + return SSL{ + .socket = try AsyncSocket.init(io, 0, allocator), + }; + } + + pub fn deinit(this: *SSL) void { + _ = boring.BIO_set_data(this.ssl_bio.bio, null); + this.ssl_bio.pending_frame = null; + this.ssl_bio.socket_fd = 0; + this.ssl_bio.release(); + this.ssl.deinit(); + this.handshake_complete = false; + + if (this.read_bio) |bio| { + var next_ = bio.next; + while (next_) |next| { + next.release(); + next_ = next.next; + } + + bio.release(); + this.read_bio = null; + } + } + }; +}; + +pub const AsyncBIO = struct { + bio: *boring.BIO = undefined, + socket_fd: std.os.socket_t = 0, + allocator: *std.mem.Allocator, + + read_wait: Wait = Wait.pending, + send_wait: Wait = Wait.pending, + completion: AsyncIO.Completion = undefined, + + write_buffer: ?*AsyncMessage = null, + + last_send_result: AsyncIO.SendError!usize = 0, + last_write_buffer: *AsyncMessage = undefined, + + last_read_result: AsyncIO.RecvError!usize = 0, + next: ?*AsyncBIO = null, + pending_frame: ?anyframe = null, + + var method: ?*boring.BIO_METHOD = null; + var head: ?*AsyncBIO = null; + + const async_bio_name: [:0]const u8 = "AsyncBIO"; + + const Wait = enum { + pending, + suspended, + completed, + }; + + fn instance(allocator: *std.mem.Allocator) *AsyncBIO { + if (head) |head_| { + var next = head_.next; + var ret = head_; + ret.read_wait = .pending; + ret.send_wait = .pending; + head = next; + ret.pending_frame = null; + return ret; + } + + var bio = allocator.create(AsyncBIO) catch unreachable; + bio.* = AsyncBIO{ + .allocator = allocator, + .read_wait = .pending, + .send_wait = .pending, + }; + + return bio; + } + + pub fn release(this: *AsyncBIO) void { + if (head) |head_| { + this.next = head_; + } + + this.read_wait = .pending; + this.last_read_result = 0; + this.send_wait = .pending; + this.last_read_result = 0; + this.pending_frame = null; + + if (this.write_buffer) |write| { + write.release(); + this.write_buffer = null; + } + + head = this; + } + + pub fn init(allocator: *std.mem.Allocator) !*AsyncBIO { + var bio = instance(allocator); + + bio.bio = boring.BIO_new( + method orelse brk: { + method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl); + break :brk method.?; + }, + ) orelse return error.OutOfMemory; + + _ = boring.BIO_set_data(bio.bio, bio); + return bio; + } + + const WaitResult = enum { + none, + read, + send, + }; + + const Sender = struct { + pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { + this.last_send_result = result; + this.send_wait = .completed; + this.write_buffer.?.sent += @truncate(u32, result catch 0); + + if (extremely_verbose) { + const read_result = result catch @as(usize, 999); + Output.prettyErrorln("onSend: {d}", .{read_result}); + Output.flush(); + } + + if (this.pending_frame) |frame| { + var _frame = frame; + this.pending_frame = null; + resume _frame; + } + } + }; + + pub fn enqueueSend( + self: *AsyncBIO, + ) void { + if (self.write_buffer == null) return; + var to_write = self.write_buffer.?.slice(); + if (to_write.len == 0) { + return; + } + + self.last_write_buffer = self.write_buffer.?; + self.last_send_result = 0; + + AsyncIO.global.send( + *AsyncBIO, + self, + Sender.onSend, + &self.completion, + self.socket_fd, + to_write, + SOCKET_FLAGS, + ); + self.send_wait = .suspended; + if (extremely_verbose) { + Output.prettyErrorln("enqueueSend: {d}", .{to_write.len}); + Output.flush(); + } + } + + const Reader = struct { + pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void { + this.last_read_result = result; + this.read_wait = .completed; + if (extremely_verbose) { + const read_result = result catch @as(usize, 999); + Output.prettyErrorln("onRead: {d}", .{read_result}); + Output.flush(); + } + if (this.pending_frame) |frame| { + var _frame = frame; + this.pending_frame = null; + resume _frame; + } + } + }; + + pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void { + var read_buffer = read_buf[off..]; + if (read_buffer.len == 0) { + return; + } + + self.last_read_result = 0; + AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.completion, self.socket_fd, read_buffer); + self.read_wait = .suspended; + if (extremely_verbose) { + Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len}); + Output.flush(); + } + } + + pub const Bio = struct { + inline fn cast(bio: *boring.BIO) *AsyncBIO { + return @ptrCast(*AsyncBIO, @alignCast(@alignOf(*AsyncBIO), boring.BIO_get_data(bio))); + } + + pub fn create(this_bio: *boring.BIO) callconv(.C) c_int { + boring.BIO_set_init(this_bio, 1); + return 1; + } + pub fn destroy(this_bio: *boring.BIO) callconv(.C) c_int { + boring.BIO_set_init(this_bio, 0); + + if (boring.BIO_get_data(this_bio) != null) { + var this = cast(this_bio); + this.release(); + } + + return 0; + } + pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int { + std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); + + var buf = ptr[0..@intCast(usize, len)]; + boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); + + if (len <= 0) { + return 0; + } + + var this = cast(this_bio); + if (this.read_wait == .suspended) { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + } + + switch (this.send_wait) { + .pending => { + var write_buffer = this.write_buffer orelse brk: { + this.write_buffer = AsyncMessage.get(this.allocator); + break :brk this.write_buffer.?; + }; + + _ = write_buffer.writeAll(buf); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + + return -1; + }, + .suspended => { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + + return -1; + }, + .completed => { + this.send_wait = .pending; + const written = this.last_send_result catch |err| { + Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); + Output.flush(); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }; + this.last_send_result = 0; + return @intCast(c_int, written); + }, + } + + unreachable; + } + + pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int { + std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); + var buf = ptr[0..@intCast(usize, len)]; + + boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); + var this = cast(this_bio); + + switch (this.read_wait) { + .pending => { + this.enqueueRead(buf, 0); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }, + .suspended => { + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }, + .completed => { + this.read_wait = .pending; + const read_len = this.last_read_result catch |err| { + Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); + Output.flush(); + boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); + return -1; + }; + this.last_read_result = 0; + return @intCast(c_int, read_len); + }, + } + unreachable; + } + pub fn ctrl(this_bio: *boring.BIO, cmd: c_int, larg: c_long, pargs: ?*c_void) callconv(.C) c_long { + return switch (cmd) { + boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0, + else => 1, + }; + } + }; }; threadlocal var request_headers_buf: [256]picohttp.Header = undefined; @@ -679,13 +1306,15 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request { pub fn connect( this: *HTTPClient, + comptime ConnectType: type, + connector: ConnectType, ) !void { const port = this.url.getPortAuto(); - try this.socket.connect(this.url.hostname, port); - var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket) }; + try connector.connect(this.url.hostname, port); + var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) }; client.setNoDelay(true) catch {}; - client.setReadBufferSize(AsyncMessage.buffer_size) catch {}; + client.setReadBufferSize(BufferPool.len) catch {}; client.setQuickACK(true) catch {}; if (this.timeout > 0) { @@ -702,21 +1331,24 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) ! // this prevents stack overflow redirect: while (this.remaining_redirect_count >= -1) { if (this.url.isHTTPS()) { - return error.NotImplementedYet; - // return this.sendHTTPS(body, body_out_str) catch |err| { - // switch (err) { - // error.Redirect => { - // this.remaining_redirect_count -= 1; - // continue :redirect; - // }, - // else => return err, - // } - // }; + return this.sendHTTPS(body, body_out_str) catch |err| { + switch (err) { + error.Redirect => { + this.remaining_redirect_count -= 1; + this.socket.deinit(); + + continue :redirect; + }, + else => return err, + } + }; } else { return this.sendHTTP(body, body_out_str) catch |err| { switch (err) { error.Redirect => { this.remaining_redirect_count -= 1; + this.socket.socket.deinit(); + continue :redirect; }, else => return err, @@ -731,31 +1363,33 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) ! const Task = ThreadPool.Task; pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response { - try this.connect(); - defer if (this.socket.socket > 0) std.os.closeSocket(this.socket.socket); + this.socket = try AsyncSocket.SSL.init(this.allocator, &AsyncIO.global); + var socket = &this.socket.socket; + try this.connect(*AsyncSocket, socket); + + defer if (this.socket.socket.socket > 0) std.os.closeSocket(this.socket.socket.socket); var request = buildRequest(this, body.len); if (this.verbose) { Output.prettyErrorln("{s}", .{request}); } - try writeRequest(&this.socket, request, body); - _ = try this.socket.send(); - var client_reader = &this.socket; + try writeRequest(@TypeOf(socket), socket, request, body); + _ = try socket.send(); if (this.progress_node == null) { return this.processResponse( false, false, - @TypeOf(client_reader), - client_reader, + @TypeOf(socket), + socket, body_out_str, ); } else { return this.processResponse( false, true, - @TypeOf(client_reader), - client_reader, + @TypeOf(socket), + socket, body_out_str, ); } @@ -801,10 +1435,11 @@ const ZlibPool = struct { }; pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response { + defer if (this.verbose) Output.flush(); var response: picohttp.Response = undefined; var request_message = AsyncMessage.get(this.allocator); defer request_message.release(); - var request_buffer: []u8 = &request_message.buf; + var request_buffer: []u8 = request_message.buf; var read_length: usize = 0; { var read_headers_up_to: usize = 0; @@ -837,6 +1472,9 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo break :restart; } } + if (read_length == 0) { + return error.NoData; + } body_out_str.reset(); var content_length: u32 = 0; @@ -891,22 +1529,34 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo switch (response.status_code) { 302, 301, 307, 308, 303 => { if (strings.indexOf(location, "://")) |i| { + var url_buf = this.redirect orelse try URLBufferPool.get(this.allocator); + const protocol_name = location[0..i]; if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else { return error.UnsupportedRedirectProtocol; } - std.mem.copy(u8, &this.redirect_buf, location); - this.url = URL.parse(location); + std.mem.copy(u8, &url_buf.buf, location); + this.url = URL.parse(url_buf.buf[0..location.len]); + this.redirect = url_buf; } else { + var url_buf = try URLBufferPool.get(this.allocator); const original_url = this.url; this.url = URL.parse(std.fmt.bufPrint( - &this.redirect_buf, + &url_buf.buf, "{s}://{s}{s}", .{ original_url.displayProtocol(), original_url.displayHostname(), location }, ) catch return error.RedirectURLTooLong); + + if (this.redirect) |red| { + red.release(); + } + + this.redirect = url_buf; } + // Ensure we don't up ove + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/303 if (response.status_code == 303) { this.method = .GET; @@ -1096,7 +1746,36 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo } pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response { - return error.NotImplementedYet; + this.socket = try AsyncSocket.SSL.init(this.allocator, &AsyncIO.global); + var socket = &this.socket; + try this.connect(*AsyncSocket.SSL, socket); + + defer if (this.socket.socket.socket > 0) std.os.closeSocket(this.socket.socket.socket); + var request = buildRequest(this, body_str.len); + if (this.verbose) { + Output.prettyErrorln("{s}", .{request}); + } + + try writeRequest(@TypeOf(socket), socket, request, body_str); + _ = try socket.send(); + + if (this.progress_node == null) { + return this.processResponse( + false, + false, + @TypeOf(socket), + socket, + body_out_str, + ); + } else { + return this.processResponse( + false, + true, + @TypeOf(socket), + socket, + body_out_str, + ); + } } // zig test src/http_client.zig --test-filter "sendHTTP - only" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec |