author    Jarred Sumner <jarred@jarredsumner.com>  2022-01-31 01:11:33 -0800
committer Jarred Sumner <jarred@jarredsumner.com>  2022-02-01 14:45:47 -0800
commit    5d7c34093fad240fbe19d9ff0b38a9a4a6ef5862 (patch)
tree      bdc0ebd8136397a315a7addfc564ce04d9dfebc3
parent    9322cec8f261dda20a2a6cc6fe7aa887c4dfeeb9 (diff)
download  bun-5d7c34093fad240fbe19d9ff0b38a9a4a6ef5862.tar.gz
          bun-5d7c34093fad240fbe19d9ff0b38a9a4a6ef5862.tar.zst
          bun-5d7c34093fad240fbe19d9ff0b38a9a4a6ef5862.zip
new boringSSL integration almost works with TLS 1.3
-rw-r--r--  src/deps/boringssl.translated.zig   40
-rw-r--r--  src/http/async_bio.zig             199
-rw-r--r--  src/http/async_socket.zig          493
-rw-r--r--  src/http_client_async.zig            2
-rw-r--r--  src/runtime.version                  2
-rw-r--r--  src/thread_pool.zig                  9
6 files changed, 567 insertions, 178 deletions
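
Note on the core change in src/http/async_bio.zig below: the anyframe-based pending_frame FIFO is replaced by a single type-erased onReady callback, so a suspended SSL operation can be woken without storing async frames in the BIO. A minimal, standalone sketch of that callback pattern, using a hypothetical Client type in place of SSL (not the real AsyncBIO API):

```zig
const std = @import("std");

// Type-erased callback: an opaque context pointer plus a plain function
// pointer, so the notifier does not need to know the caller's type.
const Callback = struct {
    ctx: *anyopaque,
    callback: fn (ctx: *anyopaque) void,

    pub fn run(this: Callback) void {
        this.callback(this.ctx);
    }

    pub fn Wrap(comptime Context: type, comptime func: anytype) type {
        return struct {
            pub fn wrap(context: *anyopaque) void {
                func(@ptrCast(*Context, @alignCast(@alignOf(*Context), context)));
            }

            pub fn get(ctx: *Context) Callback {
                return Callback{ .ctx = ctx, .callback = wrap };
            }
        };
    }
};

// Hypothetical caller standing in for the SSL struct.
const Client = struct {
    retries: u32 = 0,

    fn retryAll(this: *Client) void {
        this.retries += 1;
    }
};

pub fn main() void {
    var client = Client{};
    // Build the type-erased callback and fire it, as the BIO would on I/O completion.
    const on_ready = Callback.Wrap(Client, Client.retryAll).get(&client);
    on_ready.run();
    std.debug.print("retries = {d}\n", .{client.retries});
}
```

In the diff itself, the equivalent wiring is `bio.onReady = AsyncBIO.Callback.Wrap(SSL, SSL.retryAll).get(this)`.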
diff --git a/src/deps/boringssl.translated.zig b/src/deps/boringssl.translated.zig
index b0509ef1e..5a161f99a 100644
--- a/src/deps/boringssl.translated.zig
+++ b/src/deps/boringssl.translated.zig
@@ -18764,6 +18764,9 @@ pub const SSL = opaque {
};
}
+ const Output = @import("../global.zig").Output;
+ const Environment = @import("../global.zig").Environment;
+
pub fn read(this: *SSL, buf: []u8) Error!usize {
const rc = SSL_read(this, buf.ptr, @intCast(c_int, buf.len));
return switch (SSL_get_error(this, rc)) {
@@ -18790,10 +18793,16 @@ pub const SSL = opaque {
};
}
- pub fn write(this: *SSL, buf: []const u8) Error!usize {
+ pub fn write(this: *SSL, buf: []const u8) Error!u32 {
const rc = SSL_write(this, buf.ptr, @intCast(c_int, buf.len));
return switch (SSL_get_error(this, rc)) {
- SSL_ERROR_SSL => error.SSL,
+ SSL_ERROR_SSL => {
+ if (comptime Environment.isDebug) {
+ const errdescription = std.mem.span(SSL_error_description(SSL_ERROR_SSL).?);
+ Output.prettyError("SSL_ERROR: {s}", .{errdescription});
+ }
+ return error.SSL;
+ },
SSL_ERROR_WANT_READ => error.WantRead,
SSL_ERROR_WANT_WRITE => error.WantWrite,
SSL_ERROR_WANT_X509_LOOKUP => error.WantX509Lookup,
@@ -18812,7 +18821,7 @@ pub const SSL = opaque {
SSL_ERROR_HANDBACK => error.Handback,
SSL_ERROR_WANT_RENEGOTIATE => error.WantRenegotiate,
SSL_ERROR_HANDSHAKE_HINTS_READY => error.HandshakeHintsReady,
- else => @intCast(usize, rc),
+ else => @intCast(u32, rc),
};
}
@@ -18894,3 +18903,28 @@ pub const BIOMethod = struct {
return method;
}
};
+
+pub fn getError(this: *SSL, rc: c_int) SSL.Error!u32 {
+ return switch (SSL_get_error(this, rc)) {
+ SSL_ERROR_SSL => error.SSL,
+ SSL_ERROR_WANT_READ => error.WantRead,
+ SSL_ERROR_WANT_WRITE => error.WantWrite,
+ SSL_ERROR_WANT_X509_LOOKUP => error.WantX509Lookup,
+ SSL_ERROR_SYSCALL => error.Syscall,
+ SSL_ERROR_ZERO_RETURN => error.ZeroReturn,
+ SSL_ERROR_WANT_CONNECT => error.WantConnect,
+ SSL_ERROR_WANT_ACCEPT => error.WantAccept,
+ SSL_ERROR_WANT_CHANNEL_ID_LOOKUP => error.WantChannelIdLookup,
+ SSL_ERROR_PENDING_SESSION => error.PendingSession,
+ SSL_ERROR_PENDING_CERTIFICATE => error.PendingCertificate,
+ SSL_ERROR_WANT_PRIVATE_KEY_OPERATION => error.WantPrivateKeyOperation,
+ SSL_ERROR_PENDING_TICKET => error.PendingTicket,
+ SSL_ERROR_EARLY_DATA_REJECTED => error.EarlyDataRejected,
+ SSL_ERROR_WANT_CERTIFICATE_VERIFY => error.WantCertificateVerify,
+ SSL_ERROR_HANDOFF => error.Handoff,
+ SSL_ERROR_HANDBACK => error.Handback,
+ SSL_ERROR_WANT_RENEGOTIATE => error.WantRenegotiate,
+ SSL_ERROR_HANDSHAKE_HINTS_READY => error.HandshakeHintsReady,
+ else => @intCast(u32, rc),
+ };
+}
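
The getError helper added above mirrors SSL.read and SSL.write: every SSL_get_error code becomes a Zig error, and WantRead/WantWrite are meant to be retried once the transport is ready rather than treated as failures. A small sketch of that retry convention, assuming a stand-in writeOnce function instead of the real SSL.write:

```zig
const std = @import("std");

// A subset of the error names SSL.write/getError map to above.
const Error = error{ SSL, WantRead, WantWrite, Syscall, ZeroReturn };

// Hypothetical non-blocking writer that mimics SSL.write: it either returns
// the number of bytes written or one of the mapped errors.
fn writeOnce(attempt: *u32, buf: []const u8) Error!u32 {
    attempt.* += 1;
    if (attempt.* < 3) return error.WantWrite; // transport not ready yet
    return @intCast(u32, buf.len);
}

pub fn main() !void {
    var attempt: u32 = 0;
    const payload = "GET / HTTP/1.1\r\n\r\n";
    var written: u32 = 0;
    // Treat WantRead/WantWrite as "retry after the socket is ready", not fatal.
    while (true) {
        written = writeOnce(&attempt, payload) catch |err| switch (err) {
            error.WantRead, error.WantWrite => continue,
            else => return err,
        };
        break;
    }
    std.debug.print("wrote {d} bytes after {d} attempts\n", .{ written, attempt });
}
```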
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig
index eadc40b2a..6c14c8ed8 100644
--- a/src/http/async_bio.zig
+++ b/src/http/async_bio.zig
@@ -10,7 +10,7 @@ const SOCKET_FLAGS = @import("../http_client_async.zig").SOCKET_FLAGS;
const getAllocator = @import("../http_client_async.zig").getAllocator;
const assert = std.debug.assert;
const BufferPool = AsyncMessage.BufferPool;
-
+const buffer_pool_len = AsyncMessage.buffer_pool_len;
const fail = -3;
const connection_closed = -2;
const pending = -1;
@@ -32,6 +32,9 @@ send_completion: Completion = undefined,
write_error: c_int = 0,
socket_recv_len: c_int = 0,
+socket_send_len: u32 = 0,
+socket_recv_eof: bool = false,
+bio_write_offset: u32 = 0,
bio_read_offset: u32 = 0,
socket_send_error: ?anyerror = null,
@@ -39,20 +42,35 @@ socket_recv_error: ?anyerror = null,
next: ?*AsyncBIO = null,
-pending_frame: PendingFrame = PendingFrame.init(),
-pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 });
+onReady: ?Callback = null,
-pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void {
- this.pending_frame.writeItem(frame) catch {};
-}
+pub const Callback = struct {
+ ctx: *anyopaque,
+ callback: fn (ctx: *anyopaque) void,
-pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe {
- return this.pending_frame.readItem();
-}
+ pub inline fn run(this: Callback) void {
+ this.callback(this.ctx);
+ }
+
+ pub fn Wrap(comptime Context: type, comptime func: anytype) type {
+ return struct {
+ pub fn wrap(context: *anyopaque) void {
+ func(@ptrCast(*Context, @alignCast(@alignOf(*Context), context)));
+ }
+
+ pub fn get(ctx: *Context) Callback {
+ return Callback{
+ .ctx = ctx,
+ .callback = wrap,
+ };
+ }
+ };
+ }
+};
pub fn nextFrame(this: *AsyncBIO) void {
- if (this.pending_frame.readItem()) |frame| {
- resume frame;
+ if (this.onReady) |ready| {
+ ready.run();
}
}
@@ -75,7 +93,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO {
ret.send_wait = .pending;
head = next;
- ret.pending_frame = PendingFrame.init();
return ret;
}
@@ -96,16 +113,22 @@ pub fn release(this: *AsyncBIO) void {
this.read_wait = .pending;
this.send_wait = .pending;
- this.pending_frame = PendingFrame.init();
+ this.socket_send_len = 0;
+ this.socket_recv_len = 0;
+ this.bio_write_offset = 0;
+ this.bio_read_offset = 0;
+ this.socket_recv_error = null;
+ this.socket_send_error = null;
+ this.onReady = null;
if (this.recv_buffer) |recv| {
recv.release();
this.recv_buffer = null;
}
- if (this.write_buffer) |write| {
+ if (this.send_buffer) |write| {
write.release();
- this.write_buffer = null;
+ this.send_buffer = null;
}
head = this;
@@ -132,7 +155,7 @@ const WaitResult = enum {
};
pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void {
- const socket_recv_len = @truncate(
+ const socket_recv_len = @intCast(
c_int,
result_ catch |err| {
this.socket_recv_error = err;
@@ -142,18 +165,48 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!
},
);
this.socket_recv_len += socket_recv_len;
-
+ if (extremely_verbose) {
+ Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
+ Output.flush();
+ }
if (socket_recv_len == 0) {
- this.onSocketReadComplete();
+ this.socket_recv_eof = true;
+ }
+ this.read_wait = .pending;
+
+ // if (socket_recv_len == 0) {
+
+ this.onSocketReadComplete();
+ // return;
+ // }
+
+ // this.read_wait = .pending;
+ // this.scheduleSocketRead();
+}
+
+pub fn doSocketWrite(this: *AsyncBIO, _: *Completion, result_: AsyncIO.SendError!usize) void {
+ const socket_send_len = @truncate(
+ u32,
+ result_ catch |err| {
+ this.socket_send_error = err;
+ this.send_wait = .pending;
+ this.onSocketWriteComplete();
+ return;
+ },
+ );
+ this.socket_send_len += socket_send_len;
+
+ if (socket_send_len == 0 or this.writeBuf().len == 0) {
+ this.send_wait = .pending;
+ this.onSocketWriteComplete();
return;
}
- this.read_wait = .pending;
- this.scheduleSocketRead();
+ this.send_wait = .pending;
+ this.scheduleSocketWrite();
}
fn onSocketReadComplete(this: *AsyncBIO) void {
- assert(this.read_wait == .suspended);
this.handleSocketReadComplete();
this.nextFrame();
@@ -163,17 +216,33 @@ inline fn readBuf(this: *AsyncBIO) []u8 {
return this.recv_buffer.?.data[this.bio_read_offset..];
}
+inline fn writeBuf(this: *AsyncBIO) []const u8 {
+ return this.send_buffer.?.data[this.socket_send_len..this.bio_write_offset];
+}
+
+pub fn hasPendingReadData(this: *AsyncBIO) bool {
+ return this.socket_recv_len > 0;
+}
+
pub fn scheduleSocketRead(this: *AsyncBIO) void {
- assert(this.read_wait == .pending);
this.read_wait = .suspended;
+ if (this.recv_buffer == null) {
+ this.recv_buffer = BufferPool.get(getAllocator());
+ }
+
+ AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf());
+}
- AsyncIO.global.recv(*AsyncBIO, this, this.doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf());
+pub fn scheduleSocketWrite(this: *AsyncBIO) void {
+ this.send_wait = .suspended;
+
+ AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &this.send_completion, this.socket_fd, this.writeBuf(), SOCKET_FLAGS);
}
fn handleSocketReadComplete(
this: *AsyncBIO,
) void {
- this.read_wait = .completed;
+ this.read_wait = .pending;
if (this.socket_recv_len <= 0) {
if (this.recv_buffer) |buf| {
@@ -183,19 +252,31 @@ fn handleSocketReadComplete(
}
}
-pub fn onSocketWriteComplete(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void {
+pub fn onSocketWriteComplete(
+ this: *AsyncBIO,
+) void {
assert(this.send_wait == .pending);
- this.handleSocketWriteComplete(result);
+ this.handleSocketWriteComplete();
this.nextFrame();
}
-pub fn handleSocketWriteComplete(this: *AsyncBIO, result: AsyncIO.SendError!usize) void {
- // this.last_socket_recv_len = result;
- // this.read_wait = .completed;
- // if (extremely_verbose) {
- // const socket_recv_len = result catch @as(usize, 999);
- // Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
- // Output.flush();
+pub fn handleSocketWriteComplete(
+ this: *AsyncBIO,
+) void {
+ this.send_wait = .completed;
+
+ if (extremely_verbose) {
+ Output.prettyErrorln("onWrite: {d}", .{this.socket_send_len});
+ Output.flush();
+ }
+
+ // if (this.send_buffer) |buf| {
+ // buf.release();
+ // this.send_buffer = null;
+
+ // // this might be incorrect!
+ // this.bio_write_offset = 0;
+ // this.socket_send_len = 0;
// }
}
@@ -218,11 +299,18 @@ pub const Bio = struct {
return 0;
}
- pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int {
- if (len < 0) return len;
+ pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len_: c_int) callconv(.C) c_int {
+ if (len_ < 0) return len_;
assert(@ptrToInt(ptr) > 0);
- boring.BIO_clear_retry_flags(this_bio);
+ {
+ const retry_flags = boring.BIO_get_retry_flags(this_bio);
+ boring.BIO_clear_retry_flags(this_bio);
+ if ((retry_flags & boring.BIO_FLAGS_READ) != 0) {
+ boring.BIO_set_retry_read(this_bio);
+ }
+ }
var this = cast(this_bio);
+ const len = @intCast(u32, len_);
if (this.socket_send_error != null) {
if (extremely_verbose) {
@@ -231,14 +319,39 @@ pub const Bio = struct {
}
return -1;
}
+
+ if (this.send_buffer == null) {
+ this.send_buffer = BufferPool.get(getAllocator());
+ }
+
+ var data = this.send_buffer.?.data[this.bio_write_offset..];
+ const to_copy = @minimum(len, @intCast(u32, data.len));
+
+ if (to_copy == 0) {
+ boring.BIO_set_retry_write(this_bio);
+ return -1;
+ }
+
+ @memcpy(data.ptr, ptr, to_copy);
+ this.bio_write_offset += to_copy;
+
+ this.scheduleSocketWrite();
+
+ return @intCast(c_int, to_copy);
}
pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len_: c_int) callconv(.C) c_int {
if (len_ < 0) return len_;
const len__: u32 = @intCast(u32, len_);
assert(@ptrToInt(ptr) > 0);
+ {
+ const retry_flags = boring.BIO_get_retry_flags(this_bio);
+ boring.BIO_clear_retry_flags(this_bio);
+ if ((retry_flags & boring.BIO_FLAGS_WRITE) != 0) {
+ boring.BIO_set_retry_write(this_bio);
+ }
+ }
- boring.BIO_clear_retry_flags(this_bio);
var this = cast(this_bio);
var socket_recv_len = this.socket_recv_len;
@@ -258,7 +371,7 @@ pub const Bio = struct {
// error while writing that would otherwise not be reported until the
// application attempted to write again - which it may never do. See
// https://crbug.com/249848.
- if ((this.write_error != OK or this.write_error != pending) and (socket_recv_len == OK or socket_recv_len == pending)) {
+ if ((this.socket_send_error != null) and (socket_recv_len == OK or socket_recv_len == pending)) {
return -1;
}
@@ -270,7 +383,9 @@ pub const Bio = struct {
// reused after shutdown for non-SSL traffic, so overreading is fine.
assert(bio_read_offset == 0);
this.scheduleSocketRead();
- socket_recv_len = pending;
+
+ boring.BIO_set_retry_read(this_bio);
+ return pending;
}
if (socket_recv_len == pending) {
@@ -287,9 +402,11 @@ pub const Bio = struct {
const socket_recv_len_ = @intCast(u32, socket_recv_len);
// Report the result of the last Read() if non-empty.
- if (!(bio_read_offset < socket_recv_len_)) return 0;
+ if (!(bio_read_offset < socket_recv_len_)) {
+ return 0;
+ }
const len = @minimum(len__, socket_recv_len_ - bio_read_offset);
- var data = @ptrCast([*]const u8, &this.recv_buffer.?.data[bio_read_offset]);
+ var data = @ptrCast([*]const u8, this.recv_buffer.?.data[bio_read_offset..].ptr);
@memcpy(ptr, data, len);
bio_read_offset += len;
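
The rewritten Bio.write above stages the caller's bytes in a pooled send buffer, schedules an asynchronous socket write, and returns -1 with BIO_set_retry_write once the buffer is full so BoringSSL retries later. A simplified, self-contained model of that staging behavior; the 16-byte buffer and the StagingBio name are illustrative, not the real BufferPool:

```zig
const std = @import("std");

// Minimal model of the custom BIO write path: copy as much of the caller's
// data as fits into a fixed staging buffer, report how much was accepted,
// and signal "retry later" (like BIO_set_retry_write) when the buffer is full.
const StagingBio = struct {
    data: [16]u8 = undefined,
    write_offset: u32 = 0,
    needs_retry: bool = false,

    fn write(this: *StagingBio, bytes: []const u8) i32 {
        var dest = this.data[this.write_offset..];
        const to_copy = @minimum(@intCast(u32, bytes.len), @intCast(u32, dest.len));
        if (to_copy == 0) {
            this.needs_retry = true; // caller must try again once drained
            return -1;
        }
        std.mem.copy(u8, dest, bytes[0..to_copy]);
        this.write_offset += to_copy;
        return @intCast(i32, to_copy);
    }
};

pub fn main() void {
    var bio = StagingBio{};
    const first = bio.write("hello world, this is long");
    const second = bio.write("more");
    std.debug.print("first={d} second={d} retry={}\n", .{ first, second, bio.needs_retry });
}
```

Here the first call accepts the 16 bytes that fit and the second reports -1 with the retry flag set, mirroring the -1 / BIO_set_retry_write path in the diff.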
diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig
index 83ee904aa..84b024dfd 100644
--- a/src/http/async_socket.zig
+++ b/src/http/async_socket.zig
@@ -15,6 +15,10 @@ const SOCKET_FLAGS: u32 = @import("../http_client_async.zig").SOCKET_FLAGS;
const getAllocator = @import("../http_client_async.zig").getAllocator;
const OPEN_SOCKET_FLAGS: u32 = @import("../http_client_async.zig").OPEN_SOCKET_FLAGS;
+const SSLFeatureFlags = struct {
+ pub const early_data_enabled = true;
+};
+
io: *AsyncIO = undefined,
socket: std.os.socket_t = 0,
head: *AsyncMessage = undefined,
@@ -278,21 +282,64 @@ pub fn read(
return this.read_offset - original_read_offset;
}
+pub fn Yield(comptime Type: anytype) type {
+ return struct {
+ frame: @Frame(Type) = undefined,
+ wait: bool = false,
+
+ pub fn set(this: *@This(), frame: anytype) void {
+ this.wait = true;
+ this.frame = frame.*;
+ }
+
+ pub fn maybeResume(this: *@This()) void {
+ if (!this.wait) return;
+ this.wait = false;
+ resume this.frame;
+ }
+ };
+}
+
pub const SSL = struct {
ssl: *boring.SSL = undefined,
ssl_loaded: bool = false,
socket: AsyncSocket,
handshake_complete: bool = false,
ssl_bio: ?*AsyncBIO = null,
- read_bio: ?*AsyncMessage = null,
- handshake_frame: @Frame(SSL.handshake) = undefined,
- send_frame: @Frame(SSL.send) = undefined,
- read_frame: @Frame(SSL.read) = undefined,
+ unencrypted_bytes_to_send: ?*AsyncMessage = null,
+ connect_frame: Yield(SSL.handshake) = Yield(SSL.handshake){},
+ send_frame: Yield(SSL.send) = Yield(SSL.send){},
+ read_frame: Yield(SSL.read) = Yield(SSL.read){},
+
hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined,
is_ssl: bool = false,
+ handshake_state: HandshakeState = HandshakeState.none,
+ next_handshake_state: HandshakeState = HandshakeState.none,
+ first_posthandshake_write: bool = true,
+ in_confirm_handshake: bool = false,
+ completed_connect: bool = false,
+ disconnected: bool = false,
+
+ pending_write_buffer: []const u8 = &[_]u8{},
+ pending_read_buffer: []u8 = &[_]u8{},
+ pending_read_result: anyerror!u32 = 0,
+ pending_write_result: anyerror!u32 = 0,
+
+ first_post_handshake_write: bool = true,
+
+ handshake_result: ?anyerror = null,
+
+ peek_complete: bool = false,
+
+ pub const HandshakeState = enum {
+ none,
+ handshake,
+ complete,
+ };
+
const SSLConnectError = ConnectError || HandshakeError;
- const HandshakeError = error{OpenSSLError};
+ const HandshakeError = error{ ClientCertNeeded, OpenSSLError, WouldBlock };
pub fn connect(this: *SSL, name: []const u8, port: u16) !void {
this.is_ssl = true;
@@ -309,6 +356,10 @@ pub const SSL = struct {
this.ssl = undefined;
}
+ // SNI should only contain valid DNS hostnames, not IP addresses (see RFC
+ // 6066, Section 3).
+ //
+ // See https://crbug.com/496472 and https://crbug.com/496468 for discussion.
{
std.mem.copy(u8, &this.hostname, name);
this.hostname[name.len] = 0;
@@ -317,168 +368,352 @@ pub const SSL = struct {
}
var bio = try AsyncBIO.init(this.socket.allocator);
+ errdefer bio.release();
+ bio.onReady = AsyncBIO.Callback.Wrap(SSL, SSL.retryAll).get(this);
bio.socket_fd = this.socket.socket;
this.ssl_bio = bio;
boring.SSL_set_bio(ssl, bio.bio, bio.bio);
- this.read_bio = AsyncMessage.get(this.socket.allocator);
+ // boring.SSL_set_early_data_enabled(ssl, 1);
+ _ = boring.SSL_clear_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT);
+ _ = boring.SSL_set_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT);
+ const mode = boring.SSL_MODE_RELEASE_BUFFERS | boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START;
+
+ _ = boring.SSL_set_mode(ssl, mode);
+ _ = boring.SSL_clear_mode(ssl, mode);
+
+ var alpns = &[_]u8{ 8, 'h', 't', 't', 'p', '/', '1', '.', '1' };
+ std.debug.assert(boring.SSL_set_alpn_protos(this.ssl, alpns, alpns.len) == 0);
+
+ boring.SSL_enable_signed_cert_timestamps(ssl);
+ boring.SSL_enable_ocsp_stapling(ssl);
+
+ // std.debug.assert(boring.SSL_set_strict_cipher_list(ssl, boring.SSL_DEFAULT_CIPHER_LIST) == 0);
+
+ boring.SSL_set_enable_ech_grease(ssl, 1);
+
+ // Configure BoringSSL to allow renegotiations. Once the initial handshake
+ // completes, if renegotiations are not allowed, the default reject value will
+ // be restored. This is done in this order to permit a BoringSSL
+ // optimization. See https://crbug.com/boringssl/123. Use
+ // ssl_renegotiate_explicit rather than ssl_renegotiate_freely so DoPeek()
+ // does not trigger renegotiations.
+ boring.SSL_set_renegotiate_mode(ssl, boring.ssl_renegotiate_explicit);
+
+ boring.SSL_set_shed_handshake_config(ssl, 1);
+
+ this.unencrypted_bytes_to_send = AsyncMessage.get(this.socket.allocator);
+
try this.handshake();
+
+ this.completed_connect = true;
}
pub fn close(this: *SSL) void {
this.socket.close();
}
- fn handshake(this: *SSL) HandshakeError!void {
- while (!this.ssl.isInitFinished()) {
- boring.ERR_clear_error();
- this.ssl_bio.?.enqueueSend();
- const handshake_result = boring.SSL_connect(this.ssl);
- if (handshake_result == 0) {
- Output.prettyErrorln("ssl accept error", .{});
- Output.flush();
- return error.OpenSSLError;
+ pub fn handshake(this: *SSL) HandshakeError!void {
+ this.next_handshake_state = .handshake;
+ this.handshake_result = null;
+ this.doHandshakeLoop() catch |err| {
+ if (err == error.WouldBlock) {
+ suspend {
+ this.connect_frame.set(@frame());
+ }
+ } else {
+ return err;
}
- this.handshake_complete = handshake_result == 1 and this.ssl.isInitFinished();
+ };
- if (!this.handshake_complete) {
- // accept_result < 0
- const e = boring.SSL_get_error(this.ssl, handshake_result);
- if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) {
- this.ssl_bio.?.enqueueSend();
- suspend {
- this.handshake_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.handshake_frame);
- }
+ if (this.handshake_result) |handshake_err| {
+ const err2 = @errSetCast(HandshakeError, handshake_err);
+ this.handshake_result = null;
+ return err2;
+ }
+ }
- continue;
- }
+ fn retryAll(this: *SSL) void {
+ const had_handshaked = this.completed_connect;
+ // SSL_do_handshake, SSL_read, and SSL_write may all be retried when blocked,
+ // so retry all operations for simplicity. (Otherwise, SSL_get_error for each
+ // operation may be remembered to retry only the blocked ones.)
+ if (this.next_handshake_state == .handshake) {
+ this.onHandshakeIOComplete() catch {};
+ }
+
+ this.doPeek();
+ if (!had_handshaked or !this.peek_complete) return;
+
+ if (this.pending_read_buffer.len > 0) {
+ reader: {
+ this.pending_read_result = this.doPayloadRead(this.pending_read_buffer) catch |err| brk: {
+ if (err == error.WouldBlock) break :reader;
+ break :brk err;
+ };
- Output.prettyErrorln("ssl accept error = {}, return val was {}", .{ e, handshake_result });
- Output.flush();
- return error.OpenSSLError;
+ this.read_frame.maybeResume();
}
}
- }
- pub fn write(this: *SSL, buffer_: []const u8) usize {
- var buffer = buffer_;
- var read_bio = this.read_bio;
- while (buffer.len > 0) {
- const response = read_bio.?.writeAll(buffer);
- buffer = buffer[response.written..];
- if (response.overflow) {
- read_bio = read_bio.?.next orelse brk: {
- read_bio.?.next = AsyncMessage.get(this.socket.allocator);
- break :brk read_bio.?.next.?;
+ if (this.pending_write_buffer.len > 0) {
+ writer: {
+ this.pending_write_result = this.doPayloadWrite() catch |err| brk: {
+ if (err == error.WantWrite or err == error.WantRead) break :writer;
+ break :brk err;
};
+
+ this.send_frame.maybeResume();
}
}
+ }
+
+ pub fn doPayloadWrite(this: *SSL) anyerror!u32 {
+ const rv = try this.ssl.write(this.pending_write_buffer);
+
+ if (rv >= 0) {
+ this.pending_write_buffer = this.pending_write_buffer[rv..];
+ }
- return buffer_.len;
+ return rv;
}
- pub fn send(this: *SSL) !usize {
- var bio_ = this.read_bio;
- var len: usize = 0;
- while (bio_) |bio| {
- var slice = bio.slice();
- len += this.ssl.write(slice) catch |err| {
- switch (err) {
- error.WantRead => {
- suspend {
- this.send_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.send_frame);
- }
- continue;
- },
- error.WantWrite => {
- this.ssl_bio.?.enqueueSend();
-
- suspend {
- this.send_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.send_frame);
- }
- continue;
- },
- else => {},
- }
+ pub fn doPayloadRead(this: *SSL, buffer: []u8) anyerror!u32 {
+ if (this.ssl_bio.?.socket_recv_error != null) {
+ const pending = this.ssl_bio.?.socket_recv_error.?;
+ this.ssl_bio.?.socket_recv_error = null;
+ return pending;
+ }
- if (comptime Environment.isDebug) {
- Output.prettyErrorln("SSL error: {s} (buf: {s})\n URL:", .{
- @errorName(err),
- bio.slice(),
- });
- Output.flush();
+ var total_bytes_read: u32 = 0;
+ var ssl_ret: c_int = 0;
+ var ssl_err: c_int = 0;
+ const buf_len = @truncate(u32, buffer.len);
+ while (true) {
+ ssl_ret = boring.SSL_read(this.ssl, buffer.ptr + total_bytes_read, @intCast(c_int, buf_len - total_bytes_read));
+ ssl_err = boring.SSL_get_error(this.ssl, ssl_ret);
+
+ if (ssl_ret > 0) {
+ total_bytes_read += @intCast(u32, ssl_ret);
+ } else if (ssl_err == boring.SSL_ERROR_WANT_RENEGOTIATE) {
+ if (boring.SSL_renegotiate(this.ssl) == 0) {
+ ssl_err = boring.SSL_ERROR_SSL;
}
+ }
- return err;
- };
+ // Continue processing records as long as there is more data available
+ // synchronously.
+ if (!(ssl_err == boring.SSL_ERROR_WANT_RENEGOTIATE or (total_bytes_read < buf_len and ssl_ret > 0 and this.ssl_bio.?.hasPendingReadData()))) break;
+ }
+
+ // Although only the final SSL_read call may have failed, the failure needs to
+ // processed immediately, while the information still available in OpenSSL's
+ // error queue.
+ var result: anyerror!u32 = total_bytes_read;
+
+ if (ssl_ret <= 0) {
+ switch (ssl_err) {
+ boring.SSL_ERROR_ZERO_RETURN => {},
+ boring.SSL_ERROR_WANT_X509_LOOKUP => {
+ result = error.SSLErrorWantX509Lookup;
+ },
+ boring.SSL_ERROR_WANT_PRIVATE_KEY_OPERATION => {
+ result = error.SSLErrorWantPrivateKeyOperation;
+ },
+
+ // Do not treat insufficient data as an error to return in the next call to
+ // DoPayloadRead() - instead, let the call fall through to check SSL_read()
+ // again. The transport may have data available by then.
+ boring.SSL_ERROR_WANT_READ, boring.SSL_ERROR_WANT_WRITE => {
+ result = error.WouldBlock;
+ },
+ else => {
+ result = error.OpenSSLError;
+ },
+ }
+ }
- bio_ = bio.next;
+ // Many servers do not reliably send a close_notify alert when shutting down
+ // a connection, and instead terminate the TCP connection. This is reported
+ // as ERR_CONNECTION_CLOSED. Because of this, map the unclean shutdown to a
+ // graceful EOF, instead of treating it as an error as it should be.
+ if (this.ssl_bio.?.socket_recv_error) |err| {
+ this.ssl_bio.?.socket_recv_error = null;
+ return err;
}
- return len;
+
+ return result;
}
- pub fn read(this: *SSL, buf_: []u8, offset: u64) !u64 {
- var buf = buf_[offset..];
- var len: usize = 0;
- while (buf.len > 0) {
- this.ssl_bio.?.read_buf_len = buf.len;
- len = this.ssl.read(buf) catch |err| {
- switch (err) {
- error.WantWrite => {
- this.ssl_bio.?.enqueueSend();
-
- if (extremely_verbose) {
- Output.prettyErrorln(
- "error: {s}: \n Read Wait: {s}\n Send Wait: {s}",
- .{
- @errorName(err),
- @tagName(this.ssl_bio.?.read_wait),
- @tagName(this.ssl_bio.?.send_wait),
- },
- );
- Output.flush();
+ fn doHandshakeLoop(
+ this: *SSL,
+ ) HandshakeError!void {
+ while (true) {
+ var state = this.next_handshake_state;
+ this.next_handshake_state = HandshakeState.none;
+ switch (state) {
+ .handshake => {
+ this.doHandshake() catch |err| {
+ if (err != error.WouldBlock) {
+ this.handshake_result = err;
}
+ return err;
+ };
+ },
+ .complete => {
+ this.doHandshakeComplete();
+ },
+ else => unreachable,
+ }
+ if (this.next_handshake_state == .none) return;
+ }
+ }
- suspend {
- this.read_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.read_frame);
- }
- continue;
- },
- error.WantRead => {
- // this.ssl_bio.enqueueSend();
-
- if (extremely_verbose) {
- Output.prettyErrorln(
- "error: {s}: \n Read Wait: {s}\n Send Wait: {s}",
- .{
- @errorName(err),
- @tagName(this.ssl_bio.?.read_wait),
- @tagName(this.ssl_bio.?.send_wait),
- },
- );
- Output.flush();
- }
+ fn onHandshakeIOComplete(this: *SSL) HandshakeError!void {
+ this.doHandshakeLoop() catch |err| {
+ if (err == error.WouldBlock) {
+ return;
+ }
+ this.in_confirm_handshake = false;
+ this.connect_frame.maybeResume();
+ return;
+ };
+ this.connect_frame.maybeResume();
+ }
+
+ fn doHandshakeComplete(this: *SSL) void {
+ if (this.in_confirm_handshake) {
+ this.next_handshake_state = .none;
+ return;
+ }
+
+ this.completed_connect = true;
+ this.next_handshake_state = .none;
+ this.doPeek();
+ }
+
+ fn doPeek(this: *SSL) void {
+ if (!this.completed_connect) {
+ return;
+ }
+
+ if (this.peek_complete) {
+ return;
+ }
+
+ var byte: u8 = 0;
+ var rv = boring.SSL_peek(this.ssl, &byte, 1);
+ var ssl_error = boring.SSL_get_error(this.ssl, rv);
+ switch (ssl_error) {
+ boring.SSL_ERROR_WANT_READ, boring.SSL_ERROR_WANT_WRITE => {},
+ else => {
+ this.peek_complete = true;
+ },
+ }
+ }
+
+ fn doHandshake(this: *SSL) HandshakeError!void {
+ const rv = boring.SSL_do_handshake(this.ssl);
+ if (rv <= 0) {
+ const ssl_error = boring.SSL_get_error(this.ssl, rv);
+
+ switch (ssl_error) {
+ boring.SSL_ERROR_WANT_PRIVATE_KEY_OPERATION, boring.SSL_ERROR_WANT_X509_LOOKUP => {
+ this.next_handshake_state = HandshakeState.handshake;
+ return error.ClientCertNeeded;
+ },
+ boring.SSL_ERROR_WANT_CERTIFICATE_VERIFY => {
+ this.next_handshake_state = HandshakeState.handshake;
+ return error.ClientCertNeeded;
+ },
+ boring.SSL_ERROR_WANT_READ, boring.SSL_ERROR_WANT_WRITE => {
+ this.next_handshake_state = HandshakeState.handshake;
+ return error.WouldBlock;
+ },
+ else => return error.OpenSSLError,
+ }
+ }
+
+ this.next_handshake_state = HandshakeState.complete;
+ }
+
+ pub fn write(this: *SSL, buffer_: []const u8) usize {
+ return this.unencrypted_bytes_to_send.?.writeAll(buffer_).written;
+ }
- suspend {
- this.read_frame = @frame().*;
- this.ssl_bio.?.pushPendingFrame(&this.read_frame);
+ pub fn send(this: *SSL) anyerror!usize {
+ this.unencrypted_bytes_to_send.?.sent = 0;
+ this.pending_write_buffer = this.unencrypted_bytes_to_send.?.buf[this.unencrypted_bytes_to_send.?.sent..this.unencrypted_bytes_to_send.?.used];
+ while (true) {
+ const sent = this.doPayloadWrite() catch |err| {
+ if (err == error.WantRead or err == error.WantWrite) {
+ if (err == error.WantWrite) {
+ if (this.first_post_handshake_write and boring.SSL_is_init_finished(this.ssl) != 0 and this.pending_write_buffer.len == 0) {
+ this.first_post_handshake_write = false;
+
+ if (boring.SSL_version(this.ssl) == boring.TLS1_3_VERSION) {
+ std.debug.assert(boring.SSL_key_update(this.ssl, boring.SSL_KEY_UPDATE_REQUESTED) == 0);
+ continue;
+ }
}
- continue;
- },
- else => return err,
+ }
+
+ this.pending_write_result = 0;
+ suspend {
+ this.send_frame.set(@frame());
+ }
+ const result = this.pending_write_result;
+ this.pending_write_result = 0;
+ this.unencrypted_bytes_to_send.?.used = 0;
+ if (result) |res| {
+ return res;
+ } else |er| {
+ return er;
+ }
+ }
+ if (extremely_verbose) {
+ Output.prettyErrorln("SSL error: {s}", .{@errorName(err)});
+ Output.flush();
}
- unreachable;
+ return err;
};
+ this.unencrypted_bytes_to_send.?.sent += sent;
+
+ if (this.unencrypted_bytes_to_send.?.sent == this.unencrypted_bytes_to_send.?.used) {
+ this.unencrypted_bytes_to_send.?.used = 0;
+ this.unencrypted_bytes_to_send.?.sent = 0;
+ }
- break;
+ return sent;
}
+ }
+
+ fn readIfReady(this: *SSL, buf: []u8) anyerror!u32 {
+ return this.doPayloadRead(buf) catch |err| {
+ return err;
+ };
+ }
- return len;
+ pub fn read(this: *SSL, buf_: []u8, offset: u64) !u32 {
+ var buf = buf_[offset..];
+
+ return this.readIfReady(buf) catch |err| {
+ if (err == error.WouldBlock) {
+ this.pending_read_result = 0;
+ this.pending_read_buffer = buf;
+
+ suspend {
+ this.read_frame.set(@frame());
+ }
+ const result = this.pending_read_result;
+ this.pending_read_result = 0;
+ this.pending_read_buffer = &[_]u8{};
+
+ return result;
+ }
+ return err;
+ };
}
pub inline fn init(allocator: std.mem.Allocator, io: *AsyncIO) !SSL {
@@ -493,8 +728,8 @@ pub const SSL = struct {
if (this.ssl_bio) |bio| {
_ = boring.BIO_set_data(bio.bio, null);
- bio.pending_frame = AsyncBIO.PendingFrame.init();
bio.socket_fd = 0;
+ bio.onReady = null;
bio.release();
this.ssl_bio = null;
}
@@ -506,7 +741,7 @@ pub const SSL = struct {
this.handshake_complete = false;
- if (this.read_bio) |bio| {
+ if (this.unencrypted_bytes_to_send) |bio| {
var next_ = bio.next;
while (next_) |next| {
next.release();
@@ -514,7 +749,7 @@ pub const SSL = struct {
}
bio.release();
- this.read_bio = null;
+ this.unencrypted_bytes_to_send = null;
}
}
};
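
The handshake path above is now a small state machine: doHandshakeLoop drives next_handshake_state until it settles on .none, error.WouldBlock suspends the caller, and the BIO's onReady callback re-enters the loop through retryAll and onHandshakeIOComplete. A non-async sketch of that loop, with Conn and io_rounds_left as stand-ins for the real SSL struct and a transport that blocks twice:

```zig
const std = @import("std");

const HandshakeState = enum { none, handshake, complete };
const HandshakeError = error{ WouldBlock, OpenSSLError };

const Conn = struct {
    next_handshake_state: HandshakeState = .none,
    completed_connect: bool = false,
    // Pretend the transport needs two round trips before the handshake ends.
    io_rounds_left: u32 = 2,

    fn doHandshake(this: *Conn) HandshakeError!void {
        if (this.io_rounds_left > 0) {
            this.io_rounds_left -= 1;
            this.next_handshake_state = .handshake; // retry the same step later
            return error.WouldBlock;
        }
        this.next_handshake_state = .complete;
    }

    // Keep driving the state machine until it settles on .none.
    fn doHandshakeLoop(this: *Conn) HandshakeError!void {
        while (true) {
            const state = this.next_handshake_state;
            this.next_handshake_state = .none;
            switch (state) {
                .handshake => try this.doHandshake(),
                .complete => this.completed_connect = true,
                else => unreachable,
            }
            if (this.next_handshake_state == .none) return;
        }
    }
};

pub fn main() !void {
    var conn = Conn{ .next_handshake_state = .handshake };
    var io_wakeups: u32 = 0;
    // Each WouldBlock stands in for a suspend + onReady -> retryAll wakeup.
    while (true) {
        conn.doHandshakeLoop() catch |err| {
            if (err == error.WouldBlock) {
                io_wakeups += 1;
                continue;
            }
            return err;
        };
        break;
    }
    std.debug.print("connected={} after {d} wakeups\n", .{ conn.completed_connect, io_wakeups });
}
```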
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 54c42418f..107f576f3 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -61,7 +61,7 @@ else
pub const OPEN_SOCKET_FLAGS = SOCK.CLOEXEC;
-pub const extremely_verbose = false;
+pub const extremely_verbose = true;
fn writeRequest(
comptime Writer: type,
diff --git a/src/runtime.version b/src/runtime.version
index 65a740c58..78a0f916c 100644
--- a/src/runtime.version
+++ b/src/runtime.version
@@ -1 +1 @@
-f788d09f30e73401
\ No newline at end of file
+6d5479b747f121cc
\ No newline at end of file
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
index b3d25b015..fd1fb7e83 100644
--- a/src/thread_pool.zig
+++ b/src/thread_pool.zig
@@ -261,9 +261,12 @@ fn _wait(self: *ThreadPool, _is_waking: bool, comptime sleep_on_idle: bool) erro
} else {
if (self.io) |io| {
const HTTP = @import("http");
- io.run_for_ns(std.time.ns_per_us * 100) catch {};
- while (HTTP.AsyncHTTP.active_requests_count.load(.Monotonic) > HTTP.AsyncHTTP.max_simultaneous_requests) {
- io.run_for_ns(std.time.ns_per_us * 10) catch {};
+ io.tick() catch {};
+
+ if (HTTP.AsyncHTTP.active_requests_count.load(.Monotonic) > 0) {
+ while (HTTP.AsyncHTTP.active_requests_count.load(.Monotonic) > HTTP.AsyncHTTP.max_simultaneous_requests) {
+ io.run_for_ns(std.time.ns_per_us * 10) catch {};
+ }
}
if (sleep_on_idle) {
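
The thread_pool.zig change above replaces the unconditional run_for_ns wait with a single non-blocking tick, then keeps draining the event loop only while the in-flight HTTP request count exceeds max_simultaneous_requests. A rough model of that polling shape; FakeIO and the counters are stand-ins, not the real AsyncIO interface:

```zig
const std = @import("std");

// Stand-in event loop: tick() completes at most one pending I/O operation.
const FakeIO = struct {
    completions_left: u32,

    fn tick(this: *FakeIO) void {
        if (this.completions_left > 0) this.completions_left -= 1;
    }

    fn runForNs(this: *FakeIO, _: u64) void {
        this.tick();
    }
};

pub fn main() void {
    var io = FakeIO{ .completions_left = 8 };
    var active_requests: u32 = 6;
    const max_simultaneous_requests: u32 = 4;

    // One cheap tick instead of always sleeping in run_for_ns ...
    io.tick();
    // ... then spin only while over the concurrency cap.
    if (active_requests > 0) {
        while (active_requests > max_simultaneous_requests) {
            io.runForNs(std.time.ns_per_us * 10);
            active_requests -= 1; // pretend each pass completes one request
        }
    }
    std.debug.print("active={d} pending_io={d}\n", .{ active_requests, io.completions_left });
}
```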