aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-02-03 21:01:14 -0800
committerGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-02-03 21:01:14 -0800
commitbaffe26dd1e8da568fc77da53d36cd9c77d38c1d (patch)
tree827c6915b46f996673379c4be4d785d32942d16a /src
parent1993f9f9a5f3b62ec55ee605d15dfce63afecbcd (diff)
downloadbun-baffe26dd1e8da568fc77da53d36cd9c77d38c1d.tar.gz
bun-baffe26dd1e8da568fc77da53d36cd9c77d38c1d.tar.zst
bun-baffe26dd1e8da568fc77da53d36cd9c77d38c1d.zip
Fix bug with http client
Diffstat (limited to 'src')
-rw-r--r--src/deps/boringssl.translated.zig1
-rw-r--r--src/http/async_bio.zig79
-rw-r--r--src/http/async_socket.zig112
-rw-r--r--src/http_client_async.zig9
-rw-r--r--src/thread_pool.zig3
5 files changed, 97 insertions, 107 deletions
diff --git a/src/deps/boringssl.translated.zig b/src/deps/boringssl.translated.zig
index 5a161f99a..bb0f03136 100644
--- a/src/deps/boringssl.translated.zig
+++ b/src/deps/boringssl.translated.zig
@@ -18848,6 +18848,7 @@ pub const SSL_CTX = opaque {
SSL_CTX_set0_buffer_pool(ctx, auto_crypto_buffer_pool);
// _ = SSL_CTX_set_mode(ctx, SSL_MODE_AUTO_RETRY);
_ = SSL_CTX_set_cipher_list(ctx, SSL_DEFAULT_CIPHER_LIST);
+ SSL_CTX_set_quiet_shutdown(ctx, 1);
return ctx;
}
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig
index 2dfde8968..ab1c7a645 100644
--- a/src/http/async_bio.zig
+++ b/src/http/async_bio.zig
@@ -25,30 +25,22 @@ const Packet = struct {
pub const Pool = ObjectPool(Packet, null, false, 32);
};
-bio: *boring.BIO = undefined,
+bio: ?*boring.BIO = null,
socket_fd: std.os.socket_t = 0,
allocator: std.mem.Allocator,
pending_reads: u32 = 0,
pending_sends: u32 = 0,
-
recv_buffer: ?*BufferPool.Node = null,
-big_buffer: std.ArrayListUnmanaged(u8) = .{},
send_buffer: ?*BufferPool.Node = null,
-
-write_error: c_int = 0,
socket_recv_len: c_int = 0,
socket_send_len: u32 = 0,
-socket_recv_eof: bool = false,
bio_write_offset: u32 = 0,
bio_read_offset: u32 = 0,
-
socket_send_error: ?anyerror = null,
socket_recv_error: ?anyerror = null,
-next: ?*AsyncBIO = null,
-
onReady: ?Callback = null,
pub const Callback = struct {
@@ -82,7 +74,9 @@ pub fn nextFrame(this: *AsyncBIO) void {
}
var method: ?*boring.BIO_METHOD = null;
-var head: ?*AsyncBIO = null;
+pub fn initBoringSSL() void {
+ method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl);
+}
const async_bio_name: [:0]const u8 = "AsyncBIO";
@@ -92,61 +86,14 @@ const Wait = enum {
completed,
};
-fn instance(allocator: std.mem.Allocator) *AsyncBIO {
- if (head) |head_| {
- var next = head_.next;
- var ret = head_;
- head = next;
-
- return ret;
- }
-
- var bio = allocator.create(AsyncBIO) catch unreachable;
- bio.* = AsyncBIO{
- .allocator = allocator,
- };
-
- return bio;
-}
-
-pub fn release(this: *AsyncBIO) void {
- if (head) |head_| {
- this.next = head_;
+pub fn init(this: *AsyncBIO) !void {
+ if (this.bio == null) {
+ this.bio = boring.BIO_new(
+ method.?,
+ );
}
- this.socket_send_len = 0;
- this.socket_recv_len = 0;
- this.bio_write_offset = 0;
- this.bio_read_offset = 0;
- this.socket_recv_error = null;
- this.socket_send_error = null;
- this.onReady = null;
-
- if (this.recv_buffer) |recv| {
- recv.release();
- this.recv_buffer = null;
- }
-
- if (this.send_buffer) |write| {
- write.release();
- this.send_buffer = null;
- }
-
- head = this;
-}
-
-pub fn init(allocator: std.mem.Allocator) !*AsyncBIO {
- var bio = instance(allocator);
-
- bio.bio = boring.BIO_new(
- method orelse brk: {
- method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl);
- break :brk method.?;
- },
- ) orelse return error.OutOfMemory;
-
- _ = boring.BIO_set_data(bio.bio, bio);
- return bio;
+ _ = boring.BIO_set_data(this.bio.?, this);
}
const WaitResult = enum {
@@ -174,10 +121,6 @@ pub fn doSocketRead(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.R
Output.flush();
}
- if (socket_recv_len == 0) {
- this.socket_recv_eof = true;
- }
-
// if (socket_recv_len == 0) {
this.onSocketReadComplete();
@@ -296,7 +239,7 @@ pub const Bio = struct {
if (boring.BIO_get_data(this_bio) != null) {
var this = cast(this_bio);
- this.release();
+ this.bio = null;
}
return 0;
diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig
index db4964fc0..e58462b09 100644
--- a/src/http/async_socket.zig
+++ b/src/http/async_socket.zig
@@ -196,6 +196,11 @@ pub const SendError = AsyncIO.SendError;
pub fn deinit(this: *AsyncSocket) void {
this.head.release();
+ this.err = null;
+ this.queued = 0;
+ this.sent = 0;
+ this.read_context = &[_]u8{};
+ this.read_offset = 0;
}
pub fn send(this: *AsyncSocket) SendError!usize {
@@ -309,7 +314,7 @@ pub const SSL = struct {
ssl_loaded: bool = false,
socket: AsyncSocket,
handshake_complete: bool = false,
- ssl_bio: ?*AsyncBIO = null,
+ ssl_bio: AsyncBIO = undefined,
unencrypted_bytes_to_send: ?*AsyncMessage = null,
connect_frame: Yield(SSL.handshake) = Yield(SSL.handshake){},
send_frame: Yield(SSL.send) = Yield(SSL.send){},
@@ -330,6 +335,8 @@ pub const SSL = struct {
pending_read_result: anyerror!u32 = 0,
pending_write_result: anyerror!u32 = 0,
+ handshake_retry_count: u16 = 5,
+
first_post_handshake_write: bool = true,
handshake_result: ?anyerror = null,
@@ -371,18 +378,16 @@ pub const SSL = struct {
ssl.setHostname(name_);
}
- var bio = try AsyncBIO.init(this.socket.allocator);
- errdefer bio.release();
- bio.onReady = AsyncBIO.Callback.Wrap(SSL, SSL.retryAll).get(this);
- bio.socket_fd = this.socket.socket;
- this.ssl_bio = bio;
+ try this.ssl_bio.init();
+ this.ssl_bio.onReady = AsyncBIO.Callback.Wrap(SSL, SSL.retryAll).get(this);
+ this.ssl_bio.socket_fd = this.socket.socket;
- boring.SSL_set_bio(ssl, bio.bio, bio.bio);
+ boring.SSL_set_bio(ssl, this.ssl_bio.bio.?, this.ssl_bio.bio.?);
// boring.SSL_set_early_data_enabled(ssl, 1);
_ = boring.SSL_clear_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT);
_ = boring.SSL_set_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT);
- const mode = boring.SSL_MODE_RELEASE_BUFFERS | boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START;
+ const mode = boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START;
_ = boring.SSL_set_mode(ssl, mode);
_ = boring.SSL_clear_mode(ssl, mode);
@@ -407,7 +412,7 @@ pub const SSL = struct {
boring.SSL_set_shed_handshake_config(ssl, 1);
- this.unencrypted_bytes_to_send = AsyncMessage.get(this.socket.allocator);
+ this.unencrypted_bytes_to_send = this.socket.head;
try this.handshake();
@@ -497,9 +502,9 @@ pub const SSL = struct {
}
pub fn doPayloadRead(this: *SSL, buffer: []u8, count: *u32) anyerror!u32 {
- if (this.ssl_bio.?.socket_recv_error != null) {
- const pending = this.ssl_bio.?.socket_recv_error.?;
- this.ssl_bio.?.socket_recv_error = null;
+ if (this.ssl_bio.socket_recv_error != null) {
+ const pending = this.ssl_bio.socket_recv_error.?;
+ this.ssl_bio.socket_recv_error = null;
return pending;
}
@@ -508,6 +513,7 @@ pub const SSL = struct {
var ssl_err: c_int = 0;
const buf_len = @truncate(u32, buffer.len);
while (true) {
+ boring.ERR_clear_error();
ssl_ret = boring.SSL_read(this.ssl, buffer.ptr + total_bytes_read, @intCast(c_int, buf_len - total_bytes_read));
ssl_err = boring.SSL_get_error(this.ssl, ssl_ret);
@@ -521,7 +527,7 @@ pub const SSL = struct {
// Continue processing records as long as there is more data available
// synchronously.
- if (!(ssl_err == boring.SSL_ERROR_WANT_RENEGOTIATE or (total_bytes_read < buf_len and ssl_ret > 0 and this.ssl_bio.?.hasPendingReadData()))) break;
+ if (!(ssl_err == boring.SSL_ERROR_WANT_RENEGOTIATE or (total_bytes_read < buf_len and ssl_ret > 0 and this.ssl_bio.hasPendingReadData()))) break;
}
// Although only the final SSL_read call may have failed, the failure needs to
@@ -547,6 +553,13 @@ pub const SSL = struct {
result = error.WouldBlock;
},
else => {
+ if (extremely_verbose) {
+ const err = boring.ERR_get_error();
+
+ const version = std.mem.span(boring.SSL_get_version(this.ssl));
+ var hostname = std.mem.span(std.mem.sliceTo(&this.hostname, 0));
+ Output.prettyErrorln("[{s}] OpenSSLError reading (version: {s}, total read: {d}) - code: {d}", .{ hostname, version, total_bytes_read, err });
+ }
result = error.OpenSSLError;
},
}
@@ -556,8 +569,8 @@ pub const SSL = struct {
// a connection, and instead terminate the TCP connection. This is reported
// as ERR_CONNECTION_CLOSED. Because of this, map the unclean shutdown to a
// graceful EOF, instead of treating it as an error as it should be.
- if (this.ssl_bio.?.socket_recv_error) |err| {
- this.ssl_bio.?.socket_recv_error = null;
+ if (this.ssl_bio.socket_recv_error) |err| {
+ this.ssl_bio.socket_recv_error = null;
return err;
}
@@ -630,6 +643,7 @@ pub const SSL = struct {
}
var byte: u8 = 0;
+ boring.ERR_clear_error();
var rv = boring.SSL_peek(this.ssl, &byte, 1);
var ssl_error = boring.SSL_get_error(this.ssl, rv);
switch (ssl_error) {
@@ -641,6 +655,8 @@ pub const SSL = struct {
}
fn doHandshake(this: *SSL) HandshakeError!void {
+ boring.ERR_clear_error();
+
const rv = boring.SSL_do_handshake(this.ssl);
if (rv <= 0) {
const ssl_error = boring.SSL_get_error(this.ssl, rv);
@@ -658,7 +674,25 @@ pub const SSL = struct {
this.next_handshake_state = HandshakeState.handshake;
return error.WouldBlock;
},
- else => return error.OpenSSLError,
+ boring.SSL_ERROR_SYSCALL => {
+ this.handshake_retry_count -|= 1;
+ if (this.handshake_retry_count > 0) {
+ this.next_handshake_state = HandshakeState.handshake;
+ return error.WouldBlock;
+ }
+
+ return error.OpenSSLError;
+ },
+ else => {
+ if (extremely_verbose) {
+ const err = boring.ERR_get_error();
+ var error_buf: [1024]u8 = undefined;
+ @memset(&error_buf, 0, 1024);
+ var err_msg = std.mem.span(boring.ERR_error_string(err, &error_buf));
+ Output.prettyErrorln("Handshaking error {s}", .{err_msg});
+ }
+ return error.OpenSSLError;
+ },
}
}
@@ -748,38 +782,46 @@ pub const SSL = struct {
pub inline fn init(allocator: std.mem.Allocator, io: *AsyncIO) !SSL {
return SSL{
+ .ssl_bio = AsyncBIO{
+ .allocator = allocator,
+ },
.socket = try AsyncSocket.init(io, 0, allocator),
};
}
pub fn deinit(this: *SSL) void {
this.socket.deinit();
- if (!this.is_ssl) return;
-
- if (this.ssl_bio) |bio| {
- _ = boring.BIO_set_data(bio.bio, null);
- bio.socket_fd = 0;
- bio.onReady = null;
- bio.release();
- this.ssl_bio = null;
- }
if (this.ssl_loaded) {
+ _ = boring.SSL_shutdown(this.ssl);
this.ssl.deinit();
this.ssl_loaded = false;
}
- this.handshake_complete = false;
-
- if (this.unencrypted_bytes_to_send) |bio| {
- var next_ = bio.next;
- while (next_) |next| {
- next.release();
- next_ = next.next;
- }
+ if (this.ssl_bio.recv_buffer) |recv| {
+ recv.release();
+ }
- bio.release();
- this.unencrypted_bytes_to_send = null;
+ if (this.ssl_bio.send_buffer) |recv| {
+ recv.release();
}
+
+ this.ssl_bio.pending_reads = 0;
+ this.ssl_bio.pending_sends = 0;
+ this.ssl_bio.socket_recv_len = 0;
+ this.ssl_bio.socket_send_len = 0;
+ this.ssl_bio.bio_write_offset = 0;
+ this.ssl_bio.bio_read_offset = 0;
+ this.ssl_bio.socket_send_error = null;
+ this.ssl_bio.socket_recv_error = null;
+
+ this.ssl_bio.socket_fd = 0;
+ this.ssl_bio.onReady = null;
+
+ this.handshake_complete = false;
+
+ this.* = SSL{
+ .socket = this.socket,
+ };
}
};
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 171693ace..bd5c43d2d 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -47,6 +47,7 @@ pub fn onThreadStart() void {
AsyncIO.global_loaded = true;
NetworkThread.global.pool.io = &AsyncIO.global;
Global.setThreadName("HTTP");
+ AsyncBIO.initBoringSSL();
}
pub inline fn getAllocator() std.mem.Allocator {
@@ -412,15 +413,14 @@ pub const AsyncHTTP = struct {
this.state.store(.sending, .Monotonic);
var timer = std.time.Timer.start() catch @panic("Timer failure");
defer this.elapsed = timer.read();
- _ = active_requests_count.fetchAdd(1, .Monotonic);
this.response = await this.client.sendAsync(this.request_body.list.items, this.response_buffer) catch |err| {
- _ = active_requests_count.fetchSub(1, .Monotonic);
this.state.store(.fail, .Monotonic);
this.err = err;
if (sender.http.max_retry_count > sender.http.retries_count) {
sender.http.retries_count += 1;
+ sender.http.response_buffer.reset();
NetworkThread.global.pool.schedule(ThreadPool.Batch.from(&sender.task));
return;
}
@@ -430,7 +430,6 @@ pub const AsyncHTTP = struct {
this.redirect_count = @intCast(u32, @maximum(127 - this.client.remaining_redirect_count, 0));
this.state.store(.success, .Monotonic);
this.gzip_elapsed = this.client.gzip_elapsed;
- _ = active_requests_count.fetchSub(1, .Monotonic);
}
if (sender.http.callback) |callback| {
@@ -566,6 +565,10 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !
// this prevents stack overflow
redirect: while (this.remaining_redirect_count >= -1) {
if (@enumToInt(this.stage) > @enumToInt(Stage.pending)) this.socket.deinit();
+ _ = AsyncHTTP.active_requests_count.fetchAdd(1, .Monotonic);
+ defer {
+ _ = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic);
+ }
this.stage = Stage.pending;
body_out_str.reset();
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
index 8281e473b..166705a81 100644
--- a/src/thread_pool.zig
+++ b/src/thread_pool.zig
@@ -266,7 +266,8 @@ fn _wait(self: *ThreadPool, _is_waking: bool, comptime sleep_on_idle: bool) erro
if (end_count > 0) {
while (HTTP.AsyncHTTP.active_requests_count.loadUnchecked() > HTTP.AsyncHTTP.max_simultaneous_requests) {
- io.run_for_ns(std.time.ns_per_us * 10) catch {};
+ io.run_for_ns(std.time.ns_per_ms) catch {};
+ io.tick() catch {};
}
}