diff options
-rw-r--r-- | src/deps/boringssl.translated.zig | 40 | ||||
-rw-r--r-- | src/http/async_bio.zig | 199 | ||||
-rw-r--r-- | src/http/async_socket.zig | 493 | ||||
-rw-r--r-- | src/http_client_async.zig | 2 | ||||
-rw-r--r-- | src/runtime.version | 2 | ||||
-rw-r--r-- | src/thread_pool.zig | 9 |
6 files changed, 567 insertions, 178 deletions
diff --git a/src/deps/boringssl.translated.zig b/src/deps/boringssl.translated.zig index b0509ef1e..5a161f99a 100644 --- a/src/deps/boringssl.translated.zig +++ b/src/deps/boringssl.translated.zig @@ -18764,6 +18764,9 @@ pub const SSL = opaque { }; } + const Output = @import("../global.zig").Output; + const Environment = @import("../global.zig").Environment; + pub fn read(this: *SSL, buf: []u8) Error!usize { const rc = SSL_read(this, buf.ptr, @intCast(c_int, buf.len)); return switch (SSL_get_error(this, rc)) { @@ -18790,10 +18793,16 @@ pub const SSL = opaque { }; } - pub fn write(this: *SSL, buf: []const u8) Error!usize { + pub fn write(this: *SSL, buf: []const u8) Error!u32 { const rc = SSL_write(this, buf.ptr, @intCast(c_int, buf.len)); return switch (SSL_get_error(this, rc)) { - SSL_ERROR_SSL => error.SSL, + SSL_ERROR_SSL => { + if (comptime Environment.isDebug) { + const errdescription = std.mem.span(SSL_error_description(SSL_ERROR_SSL).?); + Output.prettyError("SSL_ERROR: {s}", .{errdescription}); + } + return error.SSL; + }, SSL_ERROR_WANT_READ => error.WantRead, SSL_ERROR_WANT_WRITE => error.WantWrite, SSL_ERROR_WANT_X509_LOOKUP => error.WantX509Lookup, @@ -18812,7 +18821,7 @@ pub const SSL = opaque { SSL_ERROR_HANDBACK => error.Handback, SSL_ERROR_WANT_RENEGOTIATE => error.WantRenegotiate, SSL_ERROR_HANDSHAKE_HINTS_READY => error.HandshakeHintsReady, - else => @intCast(usize, rc), + else => @intCast(u32, rc), }; } @@ -18894,3 +18903,28 @@ pub const BIOMethod = struct { return method; } }; + +pub fn getError(this: *SSL, rc: c_int) SSL.Error!u32 { + return switch (SSL_get_error(this, rc)) { + SSL_ERROR_SSL => error.SSL, + SSL_ERROR_WANT_READ => error.WantRead, + SSL_ERROR_WANT_WRITE => error.WantWrite, + SSL_ERROR_WANT_X509_LOOKUP => error.WantX509Lookup, + SSL_ERROR_SYSCALL => error.Syscall, + SSL_ERROR_ZERO_RETURN => error.ZeroReturn, + SSL_ERROR_WANT_CONNECT => error.WantConnect, + SSL_ERROR_WANT_ACCEPT => error.WantAccept, + SSL_ERROR_WANT_CHANNEL_ID_LOOKUP => error.WantChannelIdLookup, + SSL_ERROR_PENDING_SESSION => error.PendingSession, + SSL_ERROR_PENDING_CERTIFICATE => error.PendingCertificate, + SSL_ERROR_WANT_PRIVATE_KEY_OPERATION => error.WantPrivateKeyOperation, + SSL_ERROR_PENDING_TICKET => error.PendingTicket, + SSL_ERROR_EARLY_DATA_REJECTED => error.EarlyDataRejected, + SSL_ERROR_WANT_CERTIFICATE_VERIFY => error.WantCertificateVerify, + SSL_ERROR_HANDOFF => error.Handoff, + SSL_ERROR_HANDBACK => error.Handback, + SSL_ERROR_WANT_RENEGOTIATE => error.WantRenegotiate, + SSL_ERROR_HANDSHAKE_HINTS_READY => error.HandshakeHintsReady, + else => @intCast(u32, rc), + }; +} diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index eadc40b2a..6c14c8ed8 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -10,7 +10,7 @@ const SOCKET_FLAGS = @import("../http_client_async.zig").SOCKET_FLAGS; const getAllocator = @import("../http_client_async.zig").getAllocator; const assert = std.debug.assert; const BufferPool = AsyncMessage.BufferPool; - +const buffer_pool_len = AsyncMessage.buffer_pool_len; const fail = -3; const connection_closed = -2; const pending = -1; @@ -32,6 +32,9 @@ send_completion: Completion = undefined, write_error: c_int = 0, socket_recv_len: c_int = 0, +socket_send_len: u32 = 0, +socket_recv_eof: bool = false, +bio_write_offset: u32 = 0, bio_read_offset: u32 = 0, socket_send_error: ?anyerror = null, @@ -39,20 +42,35 @@ socket_recv_error: ?anyerror = null, next: ?*AsyncBIO = null, -pending_frame: PendingFrame = PendingFrame.init(), -pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 }); +onReady: ?Callback = null, -pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void { - this.pending_frame.writeItem(frame) catch {}; -} +pub const Callback = struct { + ctx: *anyopaque, + callback: fn (ctx: *anyopaque) void, -pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe { - return this.pending_frame.readItem(); -} + pub inline fn run(this: Callback) void { + this.callback(this.ctx); + } + + pub fn Wrap(comptime Context: type, comptime func: anytype) type { + return struct { + pub fn wrap(context: *anyopaque) void { + func(@ptrCast(*Context, @alignCast(@alignOf(*Context), context))); + } + + pub fn get(ctx: *Context) Callback { + return Callback{ + .ctx = ctx, + .callback = wrap, + }; + } + }; + } +}; pub fn nextFrame(this: *AsyncBIO) void { - if (this.pending_frame.readItem()) |frame| { - resume frame; + if (this.onReady) |ready| { + ready.run(); } } @@ -75,7 +93,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO { ret.send_wait = .pending; head = next; - ret.pending_frame = PendingFrame.init(); return ret; } @@ -96,16 +113,22 @@ pub fn release(this: *AsyncBIO) void { this.read_wait = .pending; this.send_wait = .pending; - this.pending_frame = PendingFrame.init(); + this.socket_send_len = 0; + this.socket_recv_len = 0; + this.bio_write_offset = 0; + this.bio_read_offset = 0; + this.socket_recv_error = null; + this.socket_send_error = null; + this.onReady = null; if (this.recv_buffer) |recv| { recv.release(); this.recv_buffer = null; } - if (this.write_buffer) |write| { + if (this.send_buffer) |write| { write.release(); - this.write_buffer = null; + this.send_buffer = null; } head = this; @@ -132,7 +155,7 @@ const WaitResult = enum { }; pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void { - const socket_recv_len = @truncate( + const socket_recv_len = @intCast( c_int, result_ catch |err| { this.socket_recv_error = err; @@ -142,18 +165,48 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError! }, ); this.socket_recv_len += socket_recv_len; - + if (extremely_verbose) { + Output.prettyErrorln("onRead: {d}", .{socket_recv_len}); + Output.flush(); + } if (socket_recv_len == 0) { - this.onSocketReadComplete(); + this.socket_recv_eof = true; + } + this.read_wait = .pending; + + // if (socket_recv_len == 0) { + + this.onSocketReadComplete(); + // return; + // } + + // this.read_wait = .pending; + // this.scheduleSocketRead(); +} + +pub fn doSocketWrite(this: *AsyncBIO, _: *Completion, result_: AsyncIO.SendError!usize) void { + const socket_send_len = @truncate( + u32, + result_ catch |err| { + this.socket_send_error = err; + this.send_wait = .pending; + this.onSocketWriteComplete(); + return; + }, + ); + this.socket_send_len += socket_send_len; + + if (socket_send_len == 0 or this.writeBuf().len == 0) { + this.send_wait = .pending; + this.onSocketWriteComplete(); return; } - this.read_wait = .pending; - this.scheduleSocketRead(); + this.send_wait = .pending; + this.scheduleSocketWrite(); } fn onSocketReadComplete(this: *AsyncBIO) void { - assert(this.read_wait == .suspended); this.handleSocketReadComplete(); this.nextFrame(); @@ -163,17 +216,33 @@ inline fn readBuf(this: *AsyncBIO) []u8 { return this.recv_buffer.?.data[this.bio_read_offset..]; } +inline fn writeBuf(this: *AsyncBIO) []const u8 { + return this.send_buffer.?.data[this.socket_send_len..this.bio_write_offset]; +} + +pub fn hasPendingReadData(this: *AsyncBIO) bool { + return this.socket_recv_len > 0; +} + pub fn scheduleSocketRead(this: *AsyncBIO) void { - assert(this.read_wait == .pending); this.read_wait = .suspended; + if (this.recv_buffer == null) { + this.recv_buffer = BufferPool.get(getAllocator()); + } + + AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf()); +} - AsyncIO.global.recv(*AsyncBIO, this, this.doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf()); +pub fn scheduleSocketWrite(this: *AsyncBIO) void { + this.send_wait = .suspended; + + AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &this.send_completion, this.socket_fd, this.writeBuf(), SOCKET_FLAGS); } fn handleSocketReadComplete( this: *AsyncBIO, ) void { - this.read_wait = .completed; + this.read_wait = .pending; if (this.socket_recv_len <= 0) { if (this.recv_buffer) |buf| { @@ -183,19 +252,31 @@ fn handleSocketReadComplete( } } -pub fn onSocketWriteComplete(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { +pub fn onSocketWriteComplete( + this: *AsyncBIO, +) void { assert(this.send_wait == .pending); - this.handleSocketWriteComplete(result); + this.handleSocketWriteComplete(); this.nextFrame(); } -pub fn handleSocketWriteComplete(this: *AsyncBIO, result: AsyncIO.SendError!usize) void { - // this.last_socket_recv_len = result; - // this.read_wait = .completed; - // if (extremely_verbose) { - // const socket_recv_len = result catch @as(usize, 999); - // Output.prettyErrorln("onRead: {d}", .{socket_recv_len}); - // Output.flush(); +pub fn handleSocketWriteComplete( + this: *AsyncBIO, +) void { + this.send_wait = .completed; + + if (extremely_verbose) { + Output.prettyErrorln("onWrite: {d}", .{this.socket_send_len}); + Output.flush(); + } + + // if (this.send_buffer) |buf| { + // buf.release(); + // this.send_buffer = null; + + // // this might be incorrect! + // this.bio_write_offset = 0; + // this.socket_send_len = 0; // } } @@ -218,11 +299,18 @@ pub const Bio = struct { return 0; } - pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int { - if (len < 0) return len; + pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len_: c_int) callconv(.C) c_int { + if (len_ < 0) return len_; assert(@ptrToInt(ptr) > 0); - boring.BIO_clear_retry_flags(this_bio); + { + const retry_flags = boring.BIO_get_retry_flags(this_bio); + boring.BIO_clear_retry_flags(this_bio); + if ((retry_flags & boring.BIO_FLAGS_READ) != 0) { + boring.BIO_set_retry_read(this_bio); + } + } var this = cast(this_bio); + const len = @intCast(u32, len_); if (this.socket_send_error != null) { if (extremely_verbose) { @@ -231,14 +319,39 @@ pub const Bio = struct { } return -1; } + + if (this.send_buffer == null) { + this.send_buffer = BufferPool.get(getAllocator()); + } + + var data = this.send_buffer.?.data[this.bio_write_offset..]; + const to_copy = @minimum(len, @intCast(u32, data.len)); + + if (to_copy == 0) { + boring.BIO_set_retry_write(this_bio); + return -1; + } + + @memcpy(data.ptr, ptr, to_copy); + this.bio_write_offset += to_copy; + + this.scheduleSocketWrite(); + + return @intCast(c_int, to_copy); } pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len_: c_int) callconv(.C) c_int { if (len_ < 0) return len_; const len__: u32 = @intCast(u32, len_); assert(@ptrToInt(ptr) > 0); + { + const retry_flags = boring.BIO_get_retry_flags(this_bio); + boring.BIO_clear_retry_flags(this_bio); + if ((retry_flags & boring.BIO_FLAGS_WRITE) != 0) { + boring.BIO_set_retry_write(this_bio); + } + } - boring.BIO_clear_retry_flags(this_bio); var this = cast(this_bio); var socket_recv_len = this.socket_recv_len; @@ -258,7 +371,7 @@ pub const Bio = struct { // error while writing that would otherwise not be reported until the // application attempted to write again - which it may never do. See // https://crbug.com/249848. - if ((this.write_error != OK or this.write_error != pending) and (socket_recv_len == OK or socket_recv_len == pending)) { + if ((this.socket_send_error != null) and (socket_recv_len == OK or socket_recv_len == pending)) { return -1; } @@ -270,7 +383,9 @@ pub const Bio = struct { // reused after shutdown for non-SSL traffic, so overreading is fine. assert(bio_read_offset == 0); this.scheduleSocketRead(); - socket_recv_len = pending; + + boring.BIO_set_retry_read(this_bio); + return pending; } if (socket_recv_len == pending) { @@ -287,9 +402,11 @@ pub const Bio = struct { const socket_recv_len_ = @intCast(u32, socket_recv_len); // Report the result of the last Read() if non-empty. - if (!(bio_read_offset < socket_recv_len_)) return 0; + if (!(bio_read_offset < socket_recv_len_)) { + return 0; + } const len = @minimum(len__, socket_recv_len_ - bio_read_offset); - var data = @ptrCast([*]const u8, &this.recv_buffer.?.data[bio_read_offset]); + var data = @ptrCast([*]const u8, this.recv_buffer.?.data[bio_read_offset..].ptr); @memcpy(ptr, data, len); bio_read_offset += len; diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig index 83ee904aa..84b024dfd 100644 --- a/src/http/async_socket.zig +++ b/src/http/async_socket.zig @@ -15,6 +15,10 @@ const SOCKET_FLAGS: u32 = @import("../http_client_async.zig").SOCKET_FLAGS; const getAllocator = @import("../http_client_async.zig").getAllocator; const OPEN_SOCKET_FLAGS: u32 = @import("../http_client_async.zig").OPEN_SOCKET_FLAGS; +const SSLFeatureFlags = struct { + pub const early_data_enabled = true; +}; + io: *AsyncIO = undefined, socket: std.os.socket_t = 0, head: *AsyncMessage = undefined, @@ -278,21 +282,64 @@ pub fn read( return this.read_offset - original_read_offset; } +pub fn Yield(comptime Type: anytype) type { + return struct { + frame: @Frame(Type) = undefined, + wait: bool = false, + + pub fn set(this: *@This(), frame: anytype) void { + this.wait = true; + this.frame = frame.*; + } + + pub fn maybeResume(this: *@This()) void { + if (!this.wait) return; + this.wait = false; + resume this.frame; + } + }; +} + pub const SSL = struct { ssl: *boring.SSL = undefined, ssl_loaded: bool = false, socket: AsyncSocket, handshake_complete: bool = false, ssl_bio: ?*AsyncBIO = null, - read_bio: ?*AsyncMessage = null, - handshake_frame: @Frame(SSL.handshake) = undefined, - send_frame: @Frame(SSL.send) = undefined, - read_frame: @Frame(SSL.read) = undefined, + unencrypted_bytes_to_send: ?*AsyncMessage = null, + connect_frame: Yield(SSL.handshake) = Yield(SSL.handshake){}, + send_frame: Yield(SSL.send) = Yield(SSL.send){}, + read_frame: Yield(SSL.read) = Yield(SSL.read){}, + hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined, is_ssl: bool = false, + handshake_state: HandshakeState = HandshakeState.none, + next_handshake_state: HandshakeState = HandshakeState.none, + first_posthandshake_write: bool = true, + in_confirm_handshake: bool = false, + completed_connect: bool = false, + disconnected: bool = false, + + pending_write_buffer: []const u8 = &[_]u8{}, + pending_read_buffer: []u8 = &[_]u8{}, + pending_read_result: anyerror!u32 = 0, + pending_write_result: anyerror!u32 = 0, + + first_post_handshake_write: bool = true, + + handshake_result: ?anyerror = null, + + peek_complete: bool = false, + + pub const HandshakeState = enum { + none, + handshake, + complete, + }; + const SSLConnectError = ConnectError || HandshakeError; - const HandshakeError = error{OpenSSLError}; + const HandshakeError = error{ ClientCertNeeded, OpenSSLError, WouldBlock }; pub fn connect(this: *SSL, name: []const u8, port: u16) !void { this.is_ssl = true; @@ -309,6 +356,10 @@ pub const SSL = struct { this.ssl = undefined; } + // SNI should only contain valid DNS hostnames, not IP addresses (see RFC + // 6066, Section 3). + // + // See https://crbug.com/496472 and https://crbug.com/496468 for discussion. { std.mem.copy(u8, &this.hostname, name); this.hostname[name.len] = 0; @@ -317,168 +368,352 @@ pub const SSL = struct { } var bio = try AsyncBIO.init(this.socket.allocator); + errdefer bio.release(); + bio.onReady = AsyncBIO.Callback.Wrap(SSL, SSL.retryAll).get(this); bio.socket_fd = this.socket.socket; this.ssl_bio = bio; boring.SSL_set_bio(ssl, bio.bio, bio.bio); - this.read_bio = AsyncMessage.get(this.socket.allocator); + // boring.SSL_set_early_data_enabled(ssl, 1); + _ = boring.SSL_clear_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT); + _ = boring.SSL_set_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT); + const mode = boring.SSL_MODE_RELEASE_BUFFERS | boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START; + + _ = boring.SSL_set_mode(ssl, mode); + _ = boring.SSL_clear_mode(ssl, mode); + + var alpns = &[_]u8{ 8, 'h', 't', 't', 'p', '/', '1', '.', '1' }; + std.debug.assert(boring.SSL_set_alpn_protos(this.ssl, alpns, alpns.len) == 0); + + boring.SSL_enable_signed_cert_timestamps(ssl); + boring.SSL_enable_ocsp_stapling(ssl); + + // std.debug.assert(boring.SSL_set_strict_cipher_list(ssl, boring.SSL_DEFAULT_CIPHER_LIST) == 0); + + boring.SSL_set_enable_ech_grease(ssl, 1); + + // Configure BoringSSL to allow renegotiations. Once the initial handshake + // completes, if renegotiations are not allowed, the default reject value will + // be restored. This is done in this order to permit a BoringSSL + // optimization. See https://crbug.com/boringssl/123. Use + // ssl_renegotiate_explicit rather than ssl_renegotiate_freely so DoPeek() + // does not trigger renegotiations. + boring.SSL_set_renegotiate_mode(ssl, boring.ssl_renegotiate_explicit); + + boring.SSL_set_shed_handshake_config(ssl, 1); + + this.unencrypted_bytes_to_send = AsyncMessage.get(this.socket.allocator); + try this.handshake(); + + this.completed_connect = true; } pub fn close(this: *SSL) void { this.socket.close(); } - fn handshake(this: *SSL) HandshakeError!void { - while (!this.ssl.isInitFinished()) { - boring.ERR_clear_error(); - this.ssl_bio.?.enqueueSend(); - const handshake_result = boring.SSL_connect(this.ssl); - if (handshake_result == 0) { - Output.prettyErrorln("ssl accept error", .{}); - Output.flush(); - return error.OpenSSLError; + pub fn handshake(this: *SSL) HandshakeError!void { + this.next_handshake_state = .handshake; + this.handshake_result = null; + this.doHandshakeLoop() catch |err| { + if (err == error.WouldBlock) { + suspend { + this.connect_frame.set(@frame()); + } + } else { + return err; } - this.handshake_complete = handshake_result == 1 and this.ssl.isInitFinished(); + }; - if (!this.handshake_complete) { - // accept_result < 0 - const e = boring.SSL_get_error(this.ssl, handshake_result); - if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) { - this.ssl_bio.?.enqueueSend(); - suspend { - this.handshake_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.handshake_frame); - } + if (this.handshake_result) |handshake_err| { + const err2 = @errSetCast(HandshakeError, handshake_err); + this.handshake_result = null; + return err2; + } + } - continue; - } + fn retryAll(this: *SSL) void { + const had_handshaked = this.completed_connect; + // SSL_do_handshake, SSL_read, and SSL_write may all be retried when blocked, + // so retry all operations for simplicity. (Otherwise, SSL_get_error for each + // operation may be remembered to retry only the blocked ones.) + if (this.next_handshake_state == .handshake) { + this.onHandshakeIOComplete() catch {}; + } + + this.doPeek(); + if (!had_handshaked or !this.peek_complete) return; + + if (this.pending_read_buffer.len > 0) { + reader: { + this.pending_read_result = this.doPayloadRead(this.pending_read_buffer) catch |err| brk: { + if (err == error.WouldBlock) break :reader; + break :brk err; + }; - Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result }); - Output.flush(); - return error.OpenSSLError; + this.read_frame.maybeResume(); } } - } - pub fn write(this: *SSL, buffer_: []const u8) usize { - var buffer = buffer_; - var read_bio = this.read_bio; - while (buffer.len > 0) { - const response = read_bio.?.writeAll(buffer); - buffer = buffer[response.written..]; - if (response.overflow) { - read_bio = read_bio.?.next orelse brk: { - read_bio.?.next = AsyncMessage.get(this.socket.allocator); - break :brk read_bio.?.next.?; + if (this.pending_write_buffer.len > 0) { + writer: { + this.pending_write_result = this.doPayloadWrite() catch |err| brk: { + if (err == error.WantWrite or err == error.WantRead) break :writer; + break :brk err; }; + + this.send_frame.maybeResume(); } } + } + + pub fn doPayloadWrite(this: *SSL) anyerror!u32 { + const rv = try this.ssl.write(this.pending_write_buffer); + + if (rv >= 0) { + this.pending_write_buffer = this.pending_write_buffer[rv..]; + } - return buffer_.len; + return rv; } - pub fn send(this: *SSL) !usize { - var bio_ = this.read_bio; - var len: usize = 0; - while (bio_) |bio| { - var slice = bio.slice(); - len += this.ssl.write(slice) catch |err| { - switch (err) { - error.WantRead => { - suspend { - this.send_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.send_frame); - } - continue; - }, - error.WantWrite => { - this.ssl_bio.?.enqueueSend(); - - suspend { - this.send_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.send_frame); - } - continue; - }, - else => {}, - } + pub fn doPayloadRead(this: *SSL, buffer: []u8) anyerror!u32 { + if (this.ssl_bio.?.socket_recv_error != null) { + const pending = this.ssl_bio.?.socket_recv_error.?; + this.ssl_bio.?.socket_recv_error = null; + return pending; + } - if (comptime Environment.isDebug) { - Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{ - @errorName(err), - bio.slice(), - }); - Output.flush(); + var total_bytes_read: u32 = 0; + var ssl_ret: c_int = 0; + var ssl_err: c_int = 0; + const buf_len = @truncate(u32, buffer.len); + while (true) { + ssl_ret = boring.SSL_read(this.ssl, buffer.ptr + total_bytes_read, @intCast(c_int, buf_len - total_bytes_read)); + ssl_err = boring.SSL_get_error(this.ssl, ssl_ret); + + if (ssl_ret > 0) { + total_bytes_read += @intCast(u32, ssl_ret); + } else if (ssl_err == boring.SSL_ERROR_WANT_RENEGOTIATE) { + if (boring.SSL_renegotiate(this.ssl) == 0) { + ssl_err = boring.SSL_ERROR_SSL; } + } - return err; - }; + // Continue processing records as long as there is more data available + // synchronously. + if (!(ssl_err == boring.SSL_ERROR_WANT_RENEGOTIATE or (total_bytes_read < buf_len and ssl_ret > 0 and this.ssl_bio.?.hasPendingReadData()))) break; + } + + // Although only the final SSL_read call may have failed, the failure needs to + // processed immediately, while the information still available in OpenSSL's + // error queue. + var result: anyerror!u32 = total_bytes_read; + + if (ssl_ret <= 0) { + switch (ssl_err) { + boring.SSL_ERROR_ZERO_RETURN => {}, + boring.SSL_ERROR_WANT_X509_LOOKUP => { + result = error.SSLErrorWantX509Lookup; + }, + boring.SSL_ERROR_WANT_PRIVATE_KEY_OPERATION => { + result = error.SSLErrorWantPrivateKeyOperation; + }, + + // Do not treat insufficient data as an error to return in the next call to + // DoPayloadRead() - instead, let the call fall through to check SSL_read() + // again. The transport may have data available by then. + boring.SSL_ERROR_WANT_READ, boring.SSL_ERROR_WANT_WRITE => { + result = error.WouldBlock; + }, + else => { + result = error.OpenSSLError; + }, + } + } - bio_ = bio.next; + // Many servers do not reliably send a close_notify alert when shutting down + // a connection, and instead terminate the TCP connection. This is reported + // as ERR_CONNECTION_CLOSED. Because of this, map the unclean shutdown to a + // graceful EOF, instead of treating it as an error as it should be. + if (this.ssl_bio.?.socket_recv_error) |err| { + this.ssl_bio.?.socket_recv_error = null; + return err; } - return len; + + return result; } - pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 { - var buf = buf_[offset..]; - var len: usize = 0; - while (buf.len > 0) { - this.ssl_bio.?.read_buf_len = buf.len; - len = this.ssl.read(buf) catch |err| { - switch (err) { - error.WantWrite => { - this.ssl_bio.?.enqueueSend(); - - if (extremely_verbose) { - Output.prettyErrorln( - "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", - .{ - @errorName(err), - @tagName(this.ssl_bio.?.read_wait), - @tagName(this.ssl_bio.?.send_wait), - }, - ); - Output.flush(); + fn doHandshakeLoop( + this: *SSL, + ) HandshakeError!void { + while (true) { + var state = this.next_handshake_state; + this.next_handshake_state = HandshakeState.none; + switch (state) { + .handshake => { + this.doHandshake() catch |err| { + if (err != error.WouldBlock) { + this.handshake_result = err; } + return err; + }; + }, + .complete => { + this.doHandshakeComplete(); + }, + else => unreachable, + } + if (this.next_handshake_state == .none) return; + } + } - suspend { - this.read_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.read_frame); - } - continue; - }, - error.WantRead => { - // this.ssl_bio.enqueueSend(); - - if (extremely_verbose) { - Output.prettyErrorln( - "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", - .{ - @errorName(err), - @tagName(this.ssl_bio.?.read_wait), - @tagName(this.ssl_bio.?.send_wait), - }, - ); - Output.flush(); - } + fn onHandshakeIOComplete(this: *SSL) HandshakeError!void { + this.doHandshakeLoop() catch |err| { + if (err == error.WouldBlock) { + return; + } + this.in_confirm_handshake = false; + this.connect_frame.maybeResume(); + return; + }; + this.connect_frame.maybeResume(); + } + + fn doHandshakeComplete(this: *SSL) void { + if (this.in_confirm_handshake) { + this.next_handshake_state = .none; + return; + } + + this.completed_connect = true; + this.next_handshake_state = .none; + this.doPeek(); + } + + fn doPeek(this: *SSL) void { + if (!this.completed_connect) { + return; + } + + if (this.peek_complete) { + return; + } + + var byte: u8 = 0; + var rv = boring.SSL_peek(this.ssl, &byte, 1); + var ssl_error = boring.SSL_get_error(this.ssl, rv); + switch (ssl_error) { + boring.SSL_ERROR_WANT_READ, boring.SSL_ERROR_WANT_WRITE => {}, + else => { + this.peek_complete = true; + }, + } + } + + fn doHandshake(this: *SSL) HandshakeError!void { + const rv = boring.SSL_do_handshake(this.ssl); + if (rv <= 0) { + const ssl_error = boring.SSL_get_error(this.ssl, rv); + + switch (ssl_error) { + boring.SSL_ERROR_WANT_PRIVATE_KEY_OPERATION, boring.SSL_ERROR_WANT_X509_LOOKUP => { + this.next_handshake_state = HandshakeState.handshake; + return error.ClientCertNeeded; + }, + boring.SSL_ERROR_WANT_CERTIFICATE_VERIFY => { + this.next_handshake_state = HandshakeState.handshake; + return error.ClientCertNeeded; + }, + boring.SSL_ERROR_WANT_READ, boring.SSL_ERROR_WANT_WRITE => { + this.next_handshake_state = HandshakeState.handshake; + return error.WouldBlock; + }, + else => return error.OpenSSLError, + } + } + + this.next_handshake_state = HandshakeState.complete; + } + + pub fn write(this: *SSL, buffer_: []const u8) usize { + return this.unencrypted_bytes_to_send.?.writeAll(buffer_).written; + } - suspend { - this.read_frame = @frame().*; - this.ssl_bio.?.pushPendingFrame(&this.read_frame); + pub fn send(this: *SSL) anyerror!usize { + this.unencrypted_bytes_to_send.?.sent = 0; + this.pending_write_buffer = this.unencrypted_bytes_to_send.?.buf[this.unencrypted_bytes_to_send.?.sent..this.unencrypted_bytes_to_send.?.used]; + while (true) { + const sent = this.doPayloadWrite() catch |err| { + if (err == error.WantRead or err == error.WantWrite) { + if (err == error.WantWrite) { + if (this.first_post_handshake_write and boring.SSL_is_init_finished(this.ssl) != 0 and this.pending_write_buffer.len == 0) { + this.first_post_handshake_write = false; + + if (boring.SSL_version(this.ssl) == boring.TLS1_3_VERSION) { + std.debug.assert(boring.SSL_key_update(this.ssl, boring.SSL_KEY_UPDATE_REQUESTED) == 0); + continue; + } } - continue; - }, - else => return err, + } + + this.pending_write_result = 0; + suspend { + this.send_frame.set(@frame()); + } + const result = this.pending_write_result; + this.pending_write_result = 0; + this.unencrypted_bytes_to_send.?.used = 0; + if (result) |res| { + return res; + } else |er| { + return er; + } + } + if (extremely_verbose) { + Output.prettyErrorln("SSL error: {s}", .{@errorName(err)}); + Output.flush(); } - unreachable; + return err; }; + this.unencrypted_bytes_to_send.?.sent += sent; + + if (this.unencrypted_bytes_to_send.?.sent == this.unencrypted_bytes_to_send.?.used) { + this.unencrypted_bytes_to_send.?.used = 0; + this.unencrypted_bytes_to_send.?.sent = 0; + } - break; + return sent; } + } + + fn readIfReady(this: *SSL, buf: []u8) anyerror!u32 { + return this.doPayloadRead(buf) catch |err| { + return err; + }; + } - return len; + pub fn read(this: *SSL, buf_: []u8, offset: u64) !u32 { + var buf = buf_[offset..]; + + return this.readIfReady(buf) catch |err| { + if (err == error.WouldBlock) { + this.pending_read_result = 0; + this.pending_read_buffer = buf; + + suspend { + this.read_frame.set(@frame()); + } + const result = this.pending_read_result; + this.pending_read_result = 0; + this.pending_read_buffer = &[_]u8{}; + + return result; + } + return err; + }; } pub inline fn init(allocator: std.mem.Allocator, io: *AsyncIO) !SSL { @@ -493,8 +728,8 @@ pub const SSL = struct { if (this.ssl_bio) |bio| { _ = boring.BIO_set_data(bio.bio, null); - bio.pending_frame = AsyncBIO.PendingFrame.init(); bio.socket_fd = 0; + bio.onReady = null; bio.release(); this.ssl_bio = null; } @@ -506,7 +741,7 @@ pub const SSL = struct { this.handshake_complete = false; - if (this.read_bio) |bio| { + if (this.unencrypted_bytes_to_send) |bio| { var next_ = bio.next; while (next_) |next| { next.release(); @@ -514,7 +749,7 @@ pub const SSL = struct { } bio.release(); - this.read_bio = null; + this.unencrypted_bytes_to_send = null; } } }; diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 54c42418f..107f576f3 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -61,7 +61,7 @@ else pub const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC; -pub const extremely_verbose = false; +pub const extremely_verbose = true; fn writeRequest( comptime Writer: type, diff --git a/src/runtime.version b/src/runtime.version index 65a740c58..78a0f916c 100644 --- a/src/runtime.version +++ b/src/runtime.version @@ -1 +1 @@ -f788d09f30e73401
\ No newline at end of file +6d5479b747f121cc
\ No newline at end of file diff --git a/src/thread_pool.zig b/src/thread_pool.zig index b3d25b015..fd1fb7e83 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -261,9 +261,12 @@ fn _wait(self: *ThreadPool, _is_waking: bool, comptime sleep_on_idle: bool) erro } else { if (self.io) |io| { const HTTP = @import("http"); - io.run_for_ns(std.time.ns_per_us * 100) catch {}; - while (HTTP.AsyncHTTP.active_requests_count.load(.Monotonic) > HTTP.AsyncHTTP.max_simultaneous_requests) { - io.run_for_ns(std.time.ns_per_us * 10) catch {}; + io.tick() catch {}; + + if (HTTP.AsyncHTTP.active_requests_count.load(.Monotonic) > 0) { + while (HTTP.AsyncHTTP.active_requests_count.load(.Monotonic) > HTTP.AsyncHTTP.max_simultaneous_requests) { + io.run_for_ns(std.time.ns_per_us * 10) catch {}; + } } if (sleep_on_idle) { |