aboutsummaryrefslogtreecommitdiff
path: root/src/http/http_client_async.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/http/http_client_async.zig')
-rw-r--r--src/http/http_client_async.zig815
1 files changed, 747 insertions, 68 deletions
diff --git a/src/http/http_client_async.zig b/src/http/http_client_async.zig
index 240a77dd5..128363171 100644
--- a/src/http/http_client_async.zig
+++ b/src/http/http_client_async.zig
@@ -13,31 +13,35 @@ const Zlib = @import("../zlib.zig");
const StringBuilder = @import("../string_builder.zig");
const AsyncIO = @import("io");
const ThreadPool = @import("../thread_pool.zig");
+const boring = @import("boringssl");
const NetoworkThread = @import("./network_thread.zig");
+const extremely_verbose = false;
+
fn writeRequest(
- writer: *AsyncSocket,
+ comptime Writer: type,
+ writer: Writer,
request: picohttp.Request,
body: string,
// header_hashes: []u64,
) !void {
- _ = try writer.write(request.method);
- _ = try writer.write(" ");
- _ = try writer.write(request.path);
- _ = try writer.write(" HTTP/1.1\r\n");
+ _ = writer.write(request.method);
+ _ = writer.write(" ");
+ _ = writer.write(request.path);
+ _ = writer.write(" HTTP/1.1\r\n");
for (request.headers) |header, i| {
- _ = try writer.write(header.name);
- _ = try writer.write(": ");
- _ = try writer.write(header.value);
- _ = try writer.write("\r\n");
+ _ = writer.write(header.name);
+ _ = writer.write(": ");
+ _ = writer.write(header.value);
+ _ = writer.write("\r\n");
}
- _ = try writer.write("\r\n");
+ _ = writer.write("\r\n");
if (body.len > 0) {
- _ = try writer.write(body);
+ _ = writer.write(body);
}
}
@@ -51,11 +55,11 @@ tcp_client: tcp.Client = undefined,
body_size: u32 = 0,
read_count: u32 = 0,
remaining_redirect_count: i8 = 127,
-redirect_buf: [2048]u8 = undefined,
+redirect: ?*URLBufferPool = null,
disable_shutdown: bool = true,
timeout: usize = 0,
progress_node: ?*std.Progress.Node = null,
-socket: AsyncSocket = undefined,
+socket: AsyncSocket.SSL = undefined,
pub fn init(
allocator: *std.mem.Allocator,
@@ -70,10 +74,17 @@ pub fn init(
.url = url,
.header_entries = header_entries,
.header_buf = header_buf,
- .socket = try AsyncSocket.init(&AsyncIO.global, 0, allocator),
+ .socket = undefined,
};
}
+pub fn deinit(this: *HTTPClient) !void {
+ if (this.redirect) |redirect| {
+ redirect.release();
+ this.redirect = null;
+ }
+}
+
threadlocal var response_headers_buf: [256]picohttp.Header = undefined;
threadlocal var request_content_len_buf: [64]u8 = undefined;
threadlocal var header_name_hashes: [256]u64 = undefined;
@@ -204,6 +215,7 @@ pub const AsyncHTTP = struct {
response_encoding: Encoding = Encoding.identity,
redirect_count: u32 = 0,
retries_count: u32 = 0,
+ verbose: bool = false,
client: HTTPClient = undefined,
err: ?anyerror = null,
@@ -279,9 +291,7 @@ pub const AsyncHTTP = struct {
return head_;
}
- pub fn release(this: *HTTPSender) void {
- // head = this;
- }
+ pub fn release(this: *HTTPSender) void {}
pub fn callback(task: *ThreadPool.Task) void {
var this = @fieldParentPtr(HTTPSender, "task", task);
@@ -328,15 +338,100 @@ pub const AsyncHTTP = struct {
}
};
-const AsyncMessage = struct {
- const buffer_size = std.math.maxInt(u16) - 64;
- used: u16 = 0,
- sent: u16 = 0,
+const BufferPool = struct {
+ pub const len = std.math.maxInt(u16) - 64;
+ buf: [len]u8 = undefined,
+ next: ?*BufferPool = null,
+ allocator: *std.mem.Allocator = undefined,
+
+ var head: ?*BufferPool = null;
+
+ pub fn get(allocator: *std.mem.Allocator) !*BufferPool {
+ if (head) |item| {
+ var this = item;
+ var head_ = item.next;
+ head = head_;
+ this.next = null;
+
+ return this;
+ }
+
+ var entry = try allocator.create(BufferPool);
+ entry.* = BufferPool{ .allocator = allocator };
+ return entry;
+ }
+
+ pub fn release(this: *BufferPool) void {
+ if (head) |item| {
+ item.next = this;
+ } else {
+ head = this;
+ }
+ }
+};
+
+const URLBufferPool = struct {
+ pub const len = 4096;
+ buf: [len]u8 = undefined,
+ next: ?*URLBufferPool = null,
+ allocator: *std.mem.Allocator = undefined,
+
+ var head: ?*URLBufferPool = null;
+
+ pub fn get(allocator: *std.mem.Allocator) !*URLBufferPool {
+ if (head) |item| {
+ var this = item;
+ var head_ = item.next;
+ head = head_;
+ this.next = null;
+
+ return this;
+ }
+
+ var entry = try allocator.create(URLBufferPool);
+ entry.* = URLBufferPool{ .allocator = allocator };
+ return entry;
+ }
+
+ pub fn release(this: *URLBufferPool) void {
+ if (head) |item| {
+ item.next = this;
+ } else {
+ head = this;
+ }
+ }
+};
+
+pub const AsyncMessage = struct {
+ used: u32 = 0,
+ sent: u32 = 0,
completion: AsyncIO.Completion = undefined,
- buf: [buffer_size]u8 = undefined,
+ buf: []u8 = undefined,
+ pooled: ?*BufferPool = null,
allocator: *std.mem.Allocator,
next: ?*AsyncMessage = null,
context: *c_void = undefined,
+ var _first_ssl: ?*AsyncMessage = null;
+ pub fn getSSL(allocator: *std.mem.Allocator) *AsyncMessage {
+ if (_first_ssl) |first| {
+ var prev = first;
+ if (first.next) |next| {
+ _first_ssl = next;
+ prev.next = null;
+ return prev;
+ }
+
+ return prev;
+ }
+
+ var msg = allocator.create(AsyncMessage) catch unreachable;
+ msg.* = AsyncMessage{
+ .allocator = allocator,
+ .pooled = null,
+ .buf = &[_]u8{},
+ };
+ return msg;
+ }
var _first: ?*AsyncMessage = null;
pub fn get(allocator: *std.mem.Allocator) *AsyncMessage {
@@ -352,15 +447,22 @@ const AsyncMessage = struct {
}
var msg = allocator.create(AsyncMessage) catch unreachable;
- msg.* = AsyncMessage{ .allocator = allocator };
+ var pooled = BufferPool.get(allocator) catch unreachable;
+ msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.buf, .pooled = pooled };
return msg;
}
pub fn release(self: *AsyncMessage) void {
- self.next = _first;
self.used = 0;
self.sent = 0;
- _first = self;
+
+ if (self.pooled) |pool| {
+ self.next = _first;
+ _first = self;
+ } else {
+ self.next = _first_ssl;
+ _first_ssl = self;
+ }
}
const WriteResponse = struct {
@@ -481,7 +583,15 @@ const AsyncSocket = struct {
this.sent += written;
if (has_more) {
- this.io.send(*AsyncMessage, msg, on_send, &msg.completion, this.socket, msg.slice());
+ this.io.send(
+ *AsyncMessage,
+ msg,
+ on_send,
+ &msg.completion,
+ this.socket,
+ msg.slice(),
+ SOCKET_FLAGS,
+ );
} else {
msg.release();
}
@@ -492,7 +602,7 @@ const AsyncSocket = struct {
}
}
- pub fn write(this: *AsyncSocket, buf: []const u8) AsyncSocket.SendError!usize {
+ pub fn write(this: *AsyncSocket, buf: []const u8) usize {
this.tail.context = this;
const resp = this.tail.writeAll(buf);
@@ -503,7 +613,7 @@ const AsyncSocket = struct {
this.tail.next = next;
this.tail = next;
- return @as(usize, resp.written) + try this.write(buf[resp.written..]);
+ return @as(usize, resp.written) + this.write(buf[resp.written..]);
}
return @as(usize, resp.written);
@@ -524,11 +634,27 @@ const AsyncSocket = struct {
const original_sent = this.sent;
this.head.context = this;
- this.io.send(*AsyncMessage, this.head, on_send, &this.head.completion, this.socket, this.head.slice());
+ this.io.send(
+ *AsyncMessage,
+ this.head,
+ on_send,
+ &this.head.completion,
+ this.socket,
+ this.head.slice(),
+ SOCKET_FLAGS,
+ );
var node = this.head;
while (node.next) |element| {
- this.io.send(*AsyncMessage, element, on_send, &element.completion, this.socket, element.slice());
+ this.io.send(
+ *AsyncMessage,
+ element,
+ on_send,
+ &element.completion,
+ this.socket,
+ element.slice(),
+ SOCKET_FLAGS,
+ );
node = element.next orelse break;
}
@@ -546,6 +672,18 @@ const AsyncSocket = struct {
pub const RecvError = AsyncIO.RecvError;
+ const Reader = struct {
+ pub fn on_read(ctx: *AsyncSocket, completion: *AsyncIO.Completion, result: RecvError!usize) void {
+ const len = result catch |err| {
+ ctx.err = err;
+ resume ctx.read_frame;
+ return;
+ };
+ ctx.read_offset += len;
+ resume ctx.read_frame;
+ }
+ };
+
pub fn read(
this: *AsyncSocket,
bytes: []u8,
@@ -554,17 +692,6 @@ const AsyncSocket = struct {
this.read_context = bytes;
this.read_offset = offset;
const original_read_offset = this.read_offset;
- const Reader = struct {
- pub fn on_read(ctx: *AsyncSocket, completion: *AsyncIO.Completion, result: RecvError!usize) void {
- const len = result catch |err| {
- ctx.err = err;
- resume ctx.read_frame;
- return;
- };
- ctx.read_offset += len;
- resume ctx.read_frame;
- }
- };
this.io.recv(
*AsyncSocket,
@@ -586,6 +713,506 @@ const AsyncSocket = struct {
return this.read_offset - original_read_offset;
}
+
+ pub const SSL = struct {
+ ssl: *boring.SSL = undefined,
+ socket: AsyncSocket,
+ handshake_complete: bool = false,
+ ssl_bio: *AsyncBIO = undefined,
+ read_bio: ?*AsyncMessage = null,
+ handshake_frame: @Frame(SSL.handshake) = undefined,
+ send_frame: @Frame(SSL.send) = undefined,
+ read_frame: @Frame(SSL.read) = undefined,
+
+ const SSLConnectError = ConnectError || HandshakeError;
+ const HandshakeError = error{OpenSSLError};
+
+ pub fn connect(this: *SSL, name: []const u8, port: u16) !void {
+ try this.socket.connect(name, port);
+ this.handshake_complete = false;
+
+ var ssl = boring.initClient();
+
+ // {
+ // var hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined;
+ // std.mem.copy(u8, &hostname, name);
+ // hostname[name.len] = 0;
+ // var name_ = hostname[0..name.len :0];
+ // ssl.setHostname(name_);
+ // }
+
+ var bio = try AsyncBIO.init(this.socket.allocator);
+ bio.socket_fd = this.socket.socket;
+ this.ssl_bio = bio;
+
+ boring.SSL_set_bio(ssl, bio.bio, bio.bio);
+
+ this.ssl = ssl;
+ this.read_bio = AsyncMessage.get(this.socket.allocator);
+
+ try this.handshake();
+ }
+
+ fn handshake(this: *SSL) HandshakeError!void {
+ while (true) {
+ if (!this.handshake_complete) {
+ boring.ERR_clear_error();
+ this.ssl_bio.enqueueSend();
+ const handshake_result = boring.SSL_connect(this.ssl);
+ if (handshake_result == 0) {
+ Output.prettyErrorln("ssl accept error", .{});
+ Output.flush();
+ return error.OpenSSLError;
+ }
+ this.handshake_complete = handshake_result == 1;
+
+ if (!this.handshake_complete) {
+ // accept_result < 0
+ const e = boring.SSL_get_error(this.ssl, handshake_result);
+ if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) {
+ this.ssl_bio.enqueueSend();
+ suspend {
+ this.handshake_frame = @frame().*;
+ this.ssl_bio.pending_frame = &this.handshake_frame;
+ }
+
+ continue;
+ }
+
+ Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result });
+ Output.flush();
+ return error.OpenSSLError;
+ }
+ break;
+ }
+ }
+ }
+
+ pub fn write(this: *SSL, buffer_: []const u8) usize {
+ var buffer = buffer_;
+ var read_bio = this.read_bio;
+ while (buffer.len > 0) {
+ const response = read_bio.?.writeAll(buffer);
+ buffer = buffer[response.written..];
+ if (response.overflow) {
+ read_bio = read_bio.?.next orelse brk: {
+ read_bio.?.next = AsyncMessage.get(this.socket.allocator);
+ break :brk read_bio.?.next.?;
+ };
+ }
+ }
+
+ return buffer_.len;
+ }
+
+ pub fn send(this: *SSL) !usize {
+ var bio_ = this.read_bio;
+ var len: usize = 0;
+ while (bio_) |bio| {
+ var slice = bio.slice();
+
+ len += this.ssl.write(slice) catch |err| {
+ switch (err) {
+ error.WantRead => {
+ suspend {
+ this.send_frame = @frame().*;
+ this.ssl_bio.pending_frame = &this.send_frame;
+ }
+ continue;
+ },
+ error.WantWrite => {
+ this.ssl_bio.enqueueSend();
+
+ suspend {
+ this.send_frame = @frame().*;
+ this.ssl_bio.pending_frame = &this.send_frame;
+ }
+ continue;
+ },
+ else => {},
+ }
+
+ if (comptime Environment.isDebug) {
+ Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{
+ @errorName(err),
+ bio.slice(),
+ });
+ Output.flush();
+ }
+
+ return err;
+ };
+
+ bio_ = bio.next;
+ }
+ return len;
+ }
+
+ pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 {
+ var buf = buf_[offset..];
+ var bio_ = this.read_bio;
+ var len: usize = 0;
+ while (buf.len > 0) {
+ len = this.ssl.read(buf) catch |err| {
+ switch (err) {
+ error.WantWrite => {
+ this.ssl_bio.enqueueSend();
+
+ if (extremely_verbose) {
+ Output.prettyErrorln(
+ "Error: {s}: \n Read Wait: {s}\n Send Wait: {s}",
+ .{
+ @errorName(err),
+ @tagName(this.ssl_bio.read_wait),
+ @tagName(this.ssl_bio.send_wait),
+ },
+ );
+ Output.flush();
+ }
+
+ suspend {
+ this.read_frame = @frame().*;
+ this.ssl_bio.pending_frame = &this.read_frame;
+ }
+ continue;
+ },
+ error.WantRead => {
+ // this.ssl_bio.enqueueSend();
+
+ if (extremely_verbose) {
+ Output.prettyErrorln(
+ "Error: {s}: \n Read Wait: {s}\n Send Wait: {s}",
+ .{
+ @errorName(err),
+ @tagName(this.ssl_bio.read_wait),
+ @tagName(this.ssl_bio.send_wait),
+ },
+ );
+ Output.flush();
+ }
+
+ suspend {
+ this.read_frame = @frame().*;
+ this.ssl_bio.pending_frame = &this.read_frame;
+ }
+ continue;
+ },
+ else => return err,
+ }
+ unreachable;
+ };
+
+ break;
+ }
+
+ return len;
+ }
+
+ pub inline fn init(allocator: *std.mem.Allocator, io: *AsyncIO) !SSL {
+ var head = AsyncMessage.get(allocator);
+
+ return SSL{
+ .socket = try AsyncSocket.init(io, 0, allocator),
+ };
+ }
+
+ pub fn deinit(this: *SSL) void {
+ _ = boring.BIO_set_data(this.ssl_bio.bio, null);
+ this.ssl_bio.pending_frame = null;
+ this.ssl_bio.socket_fd = 0;
+ this.ssl_bio.release();
+ this.ssl.deinit();
+ this.handshake_complete = false;
+
+ if (this.read_bio) |bio| {
+ var next_ = bio.next;
+ while (next_) |next| {
+ next.release();
+ next_ = next.next;
+ }
+
+ bio.release();
+ this.read_bio = null;
+ }
+ }
+ };
+};
+
+pub const AsyncBIO = struct {
+ bio: *boring.BIO = undefined,
+ socket_fd: std.os.socket_t = 0,
+ allocator: *std.mem.Allocator,
+
+ read_wait: Wait = Wait.pending,
+ send_wait: Wait = Wait.pending,
+ completion: AsyncIO.Completion = undefined,
+
+ write_buffer: ?*AsyncMessage = null,
+
+ last_send_result: AsyncIO.SendError!usize = 0,
+ last_write_buffer: *AsyncMessage = undefined,
+
+ last_read_result: AsyncIO.RecvError!usize = 0,
+ next: ?*AsyncBIO = null,
+ pending_frame: ?anyframe = null,
+
+ var method: ?*boring.BIO_METHOD = null;
+ var head: ?*AsyncBIO = null;
+
+ const async_bio_name: [:0]const u8 = "AsyncBIO";
+
+ const Wait = enum {
+ pending,
+ suspended,
+ completed,
+ };
+
+ fn instance(allocator: *std.mem.Allocator) *AsyncBIO {
+ if (head) |head_| {
+ var next = head_.next;
+ var ret = head_;
+ ret.read_wait = .pending;
+ ret.send_wait = .pending;
+ head = next;
+ ret.pending_frame = null;
+ return ret;
+ }
+
+ var bio = allocator.create(AsyncBIO) catch unreachable;
+ bio.* = AsyncBIO{
+ .allocator = allocator,
+ .read_wait = .pending,
+ .send_wait = .pending,
+ };
+
+ return bio;
+ }
+
+ pub fn release(this: *AsyncBIO) void {
+ if (head) |head_| {
+ this.next = head_;
+ }
+
+ this.read_wait = .pending;
+ this.last_read_result = 0;
+ this.send_wait = .pending;
+ this.last_read_result = 0;
+ this.pending_frame = null;
+
+ if (this.write_buffer) |write| {
+ write.release();
+ this.write_buffer = null;
+ }
+
+ head = this;
+ }
+
+ pub fn init(allocator: *std.mem.Allocator) !*AsyncBIO {
+ var bio = instance(allocator);
+
+ bio.bio = boring.BIO_new(
+ method orelse brk: {
+ method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl);
+ break :brk method.?;
+ },
+ ) orelse return error.OutOfMemory;
+
+ _ = boring.BIO_set_data(bio.bio, bio);
+ return bio;
+ }
+
+ const WaitResult = enum {
+ none,
+ read,
+ send,
+ };
+
+ const Sender = struct {
+ pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void {
+ this.last_send_result = result;
+ this.send_wait = .completed;
+ this.write_buffer.?.sent += @truncate(u32, result catch 0);
+
+ if (extremely_verbose) {
+ const read_result = result catch @as(usize, 999);
+ Output.prettyErrorln("onSend: {d}", .{read_result});
+ Output.flush();
+ }
+
+ if (this.pending_frame) |frame| {
+ var _frame = frame;
+ this.pending_frame = null;
+ resume _frame;
+ }
+ }
+ };
+
+ pub fn enqueueSend(
+ self: *AsyncBIO,
+ ) void {
+ if (self.write_buffer == null) return;
+ var to_write = self.write_buffer.?.slice();
+ if (to_write.len == 0) {
+ return;
+ }
+
+ self.last_write_buffer = self.write_buffer.?;
+ self.last_send_result = 0;
+
+ AsyncIO.global.send(
+ *AsyncBIO,
+ self,
+ Sender.onSend,
+ &self.completion,
+ self.socket_fd,
+ to_write,
+ SOCKET_FLAGS,
+ );
+ self.send_wait = .suspended;
+ if (extremely_verbose) {
+ Output.prettyErrorln("enqueueSend: {d}", .{to_write.len});
+ Output.flush();
+ }
+ }
+
+ const Reader = struct {
+ pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void {
+ this.last_read_result = result;
+ this.read_wait = .completed;
+ if (extremely_verbose) {
+ const read_result = result catch @as(usize, 999);
+ Output.prettyErrorln("onRead: {d}", .{read_result});
+ Output.flush();
+ }
+ if (this.pending_frame) |frame| {
+ var _frame = frame;
+ this.pending_frame = null;
+ resume _frame;
+ }
+ }
+ };
+
+ pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void {
+ var read_buffer = read_buf[off..];
+ if (read_buffer.len == 0) {
+ return;
+ }
+
+ self.last_read_result = 0;
+ AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.completion, self.socket_fd, read_buffer);
+ self.read_wait = .suspended;
+ if (extremely_verbose) {
+ Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len});
+ Output.flush();
+ }
+ }
+
+ pub const Bio = struct {
+ inline fn cast(bio: *boring.BIO) *AsyncBIO {
+ return @ptrCast(*AsyncBIO, @alignCast(@alignOf(*AsyncBIO), boring.BIO_get_data(bio)));
+ }
+
+ pub fn create(this_bio: *boring.BIO) callconv(.C) c_int {
+ boring.BIO_set_init(this_bio, 1);
+ return 1;
+ }
+ pub fn destroy(this_bio: *boring.BIO) callconv(.C) c_int {
+ boring.BIO_set_init(this_bio, 0);
+
+ if (boring.BIO_get_data(this_bio) != null) {
+ var this = cast(this_bio);
+ this.release();
+ }
+
+ return 0;
+ }
+ pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int {
+ std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0);
+
+ var buf = ptr[0..@intCast(usize, len)];
+ boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY);
+
+ if (len <= 0) {
+ return 0;
+ }
+
+ var this = cast(this_bio);
+ if (this.read_wait == .suspended) {
+ boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
+ return -1;
+ }
+
+ switch (this.send_wait) {
+ .pending => {
+ var write_buffer = this.write_buffer orelse brk: {
+ this.write_buffer = AsyncMessage.get(this.allocator);
+ break :brk this.write_buffer.?;
+ };
+
+ _ = write_buffer.writeAll(buf);
+ boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
+
+ return -1;
+ },
+ .suspended => {
+ boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
+
+ return -1;
+ },
+ .completed => {
+ this.send_wait = .pending;
+ const written = this.last_send_result catch |err| {
+ Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)});
+ Output.flush();
+ boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
+ return -1;
+ };
+ this.last_send_result = 0;
+ return @intCast(c_int, written);
+ },
+ }
+
+ unreachable;
+ }
+
+ pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int {
+ std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0);
+ var buf = ptr[0..@intCast(usize, len)];
+
+ boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY);
+ var this = cast(this_bio);
+
+ switch (this.read_wait) {
+ .pending => {
+ this.enqueueRead(buf, 0);
+ boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
+ return -1;
+ },
+ .suspended => {
+ boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
+ return -1;
+ },
+ .completed => {
+ this.read_wait = .pending;
+ const read_len = this.last_read_result catch |err| {
+ Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)});
+ Output.flush();
+ boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
+ return -1;
+ };
+ this.last_read_result = 0;
+ return @intCast(c_int, read_len);
+ },
+ }
+ unreachable;
+ }
+ pub fn ctrl(this_bio: *boring.BIO, cmd: c_int, larg: c_long, pargs: ?*c_void) callconv(.C) c_long {
+ return switch (cmd) {
+ boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0,
+ else => 1,
+ };
+ }
+ };
};
threadlocal var request_headers_buf: [256]picohttp.Header = undefined;
@@ -679,13 +1306,15 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request {
pub fn connect(
this: *HTTPClient,
+ comptime ConnectType: type,
+ connector: ConnectType,
) !void {
const port = this.url.getPortAuto();
- try this.socket.connect(this.url.hostname, port);
- var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket) };
+ try connector.connect(this.url.hostname, port);
+ var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) };
client.setNoDelay(true) catch {};
- client.setReadBufferSize(AsyncMessage.buffer_size) catch {};
+ client.setReadBufferSize(BufferPool.len) catch {};
client.setQuickACK(true) catch {};
if (this.timeout > 0) {
@@ -702,21 +1331,24 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !
// this prevents stack overflow
redirect: while (this.remaining_redirect_count >= -1) {
if (this.url.isHTTPS()) {
- return error.NotImplementedYet;
- // return this.sendHTTPS(body, body_out_str) catch |err| {
- // switch (err) {
- // error.Redirect => {
- // this.remaining_redirect_count -= 1;
- // continue :redirect;
- // },
- // else => return err,
- // }
- // };
+ return this.sendHTTPS(body, body_out_str) catch |err| {
+ switch (err) {
+ error.Redirect => {
+ this.remaining_redirect_count -= 1;
+ this.socket.deinit();
+
+ continue :redirect;
+ },
+ else => return err,
+ }
+ };
} else {
return this.sendHTTP(body, body_out_str) catch |err| {
switch (err) {
error.Redirect => {
this.remaining_redirect_count -= 1;
+ this.socket.socket.deinit();
+
continue :redirect;
},
else => return err,
@@ -731,31 +1363,33 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !
const Task = ThreadPool.Task;
pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response {
- try this.connect();
- defer if (this.socket.socket > 0) std.os.closeSocket(this.socket.socket);
+ this.socket = try AsyncSocket.SSL.init(this.allocator, &AsyncIO.global);
+ var socket = &this.socket.socket;
+ try this.connect(*AsyncSocket, socket);
+
+ defer if (this.socket.socket.socket > 0) std.os.closeSocket(this.socket.socket.socket);
var request = buildRequest(this, body.len);
if (this.verbose) {
Output.prettyErrorln("{s}", .{request});
}
- try writeRequest(&this.socket, request, body);
- _ = try this.socket.send();
- var client_reader = &this.socket;
+ try writeRequest(@TypeOf(socket), socket, request, body);
+ _ = try socket.send();
if (this.progress_node == null) {
return this.processResponse(
false,
false,
- @TypeOf(client_reader),
- client_reader,
+ @TypeOf(socket),
+ socket,
body_out_str,
);
} else {
return this.processResponse(
false,
true,
- @TypeOf(client_reader),
- client_reader,
+ @TypeOf(socket),
+ socket,
body_out_str,
);
}
@@ -801,10 +1435,11 @@ const ZlibPool = struct {
};
pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response {
+ defer if (this.verbose) Output.flush();
var response: picohttp.Response = undefined;
var request_message = AsyncMessage.get(this.allocator);
defer request_message.release();
- var request_buffer: []u8 = &request_message.buf;
+ var request_buffer: []u8 = request_message.buf;
var read_length: usize = 0;
{
var read_headers_up_to: usize = 0;
@@ -837,6 +1472,9 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo
break :restart;
}
}
+ if (read_length == 0) {
+ return error.NoData;
+ }
body_out_str.reset();
var content_length: u32 = 0;
@@ -891,22 +1529,34 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo
switch (response.status_code) {
302, 301, 307, 308, 303 => {
if (strings.indexOf(location, "://")) |i| {
+ var url_buf = this.redirect orelse try URLBufferPool.get(this.allocator);
+
const protocol_name = location[0..i];
if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else {
return error.UnsupportedRedirectProtocol;
}
- std.mem.copy(u8, &this.redirect_buf, location);
- this.url = URL.parse(location);
+ std.mem.copy(u8, &url_buf.buf, location);
+ this.url = URL.parse(url_buf.buf[0..location.len]);
+ this.redirect = url_buf;
} else {
+ var url_buf = try URLBufferPool.get(this.allocator);
const original_url = this.url;
this.url = URL.parse(std.fmt.bufPrint(
- &this.redirect_buf,
+ &url_buf.buf,
"{s}://{s}{s}",
.{ original_url.displayProtocol(), original_url.displayHostname(), location },
) catch return error.RedirectURLTooLong);
+
+ if (this.redirect) |red| {
+ red.release();
+ }
+
+ this.redirect = url_buf;
}
+ // Ensure we don't up ove
+
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/303
if (response.status_code == 303) {
this.method = .GET;
@@ -1096,7 +1746,36 @@ pub fn processResponse(this: *HTTPClient, comptime is_https: bool, comptime repo
}
pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response {
- return error.NotImplementedYet;
+ this.socket = try AsyncSocket.SSL.init(this.allocator, &AsyncIO.global);
+ var socket = &this.socket;
+ try this.connect(*AsyncSocket.SSL, socket);
+
+ defer if (this.socket.socket.socket > 0) std.os.closeSocket(this.socket.socket.socket);
+ var request = buildRequest(this, body_str.len);
+ if (this.verbose) {
+ Output.prettyErrorln("{s}", .{request});
+ }
+
+ try writeRequest(@TypeOf(socket), socket, request, body_str);
+ _ = try socket.send();
+
+ if (this.progress_node == null) {
+ return this.processResponse(
+ false,
+ false,
+ @TypeOf(socket),
+ socket,
+ body_out_str,
+ );
+ } else {
+ return this.processResponse(
+ false,
+ true,
+ @TypeOf(socket),
+ socket,
+ body_out_str,
+ );
+ }
}
// zig test src/http_client.zig --test-filter "sendHTTP - only" -lc -lc++ /Users/jarred/Code/bun/src/deps/zlib/libz.a /Users/jarred/Code/bun/src/deps/picohttpparser.o --cache-dir /Users/jarred/Code/bun/zig-cache --global-cache-dir /Users/jarred/.cache/zig --name bun --pkg-begin clap /Users/jarred/Code/bun/src/deps/zig-clap/clap.zig --pkg-end --pkg-begin picohttp /Users/jarred/Code/bun/src/deps/picohttp.zig --pkg-end --pkg-begin iguanaTLS /Users/jarred/Code/bun/src/deps/iguanaTLS/src/main.zig --pkg-end -I /Users/jarred/Code/bun/src/deps -I /Users/jarred/Code/bun/src/deps/mimalloc -I /usr/local/opt/icu4c/include -L src/deps/mimalloc -L /usr/local/opt/icu4c/lib --main-pkg-path /Users/jarred/Code/bun --enable-cache -femit-bin=zig-out/bin/test --test-no-exec