diff options
author | 2022-02-03 21:01:14 -0800 | |
---|---|---|
committer | 2022-02-03 21:01:14 -0800 | |
commit | baffe26dd1e8da568fc77da53d36cd9c77d38c1d (patch) | |
tree | 827c6915b46f996673379c4be4d785d32942d16a | |
parent | 1993f9f9a5f3b62ec55ee605d15dfce63afecbcd (diff) | |
download | bun-baffe26dd1e8da568fc77da53d36cd9c77d38c1d.tar.gz bun-baffe26dd1e8da568fc77da53d36cd9c77d38c1d.tar.zst bun-baffe26dd1e8da568fc77da53d36cd9c77d38c1d.zip |
Fix bug with http client
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | src/deps/boringssl.translated.zig | 1 | ||||
-rw-r--r-- | src/http/async_bio.zig | 79 | ||||
-rw-r--r-- | src/http/async_socket.zig | 112 | ||||
-rw-r--r-- | src/http_client_async.zig | 9 | ||||
-rw-r--r-- | src/thread_pool.zig | 3 |
6 files changed, 101 insertions, 107 deletions
@@ -308,11 +308,15 @@ vendor-without-check: api analytics node-fallbacks runtime_js fallback_decoder b boringssl-build: cd $(BUN_DEPS_DIR)/boringssl && mkdir -p build && cd build && CFLAGS="$(CFLAGS)" cmake $(CMAKE_FLAGS) -GNinja .. && ninja +boringssl-build-debug: + cd $(BUN_DEPS_DIR)/boringssl && mkdir -p build && cd build && CFLAGS="$(CFLAGS)" cmake $(CMAKE_FLAGS_WITHOUT_RELEASE) -GNinja .. && ninja + boringssl-copy: cp $(BUN_DEPS_DIR)/boringssl/build/ssl/libssl.a $(BUN_DEPS_OUT_DIR)/libssl.a cp $(BUN_DEPS_DIR)/boringssl/build/crypto/libcrypto.a $(BUN_DEPS_OUT_DIR)/libcrypto.boring.a boringssl: boringssl-build boringssl-copy +boringssl-debug: boringssl-build-debug boringssl-copy libbacktrace: cd $(BUN_DEPS_DIR)/libbacktrace && \ diff --git a/src/deps/boringssl.translated.zig b/src/deps/boringssl.translated.zig index 5a161f99a..bb0f03136 100644 --- a/src/deps/boringssl.translated.zig +++ b/src/deps/boringssl.translated.zig @@ -18848,6 +18848,7 @@ pub const SSL_CTX = opaque { SSL_CTX_set0_buffer_pool(ctx, auto_crypto_buffer_pool); // _ = SSL_CTX_set_mode(ctx, SSL_MODE_AUTO_RETRY); _ = SSL_CTX_set_cipher_list(ctx, SSL_DEFAULT_CIPHER_LIST); + SSL_CTX_set_quiet_shutdown(ctx, 1); return ctx; } diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index 2dfde8968..ab1c7a645 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -25,30 +25,22 @@ const Packet = struct { pub const Pool = ObjectPool(Packet, null, false, 32); }; -bio: *boring.BIO = undefined, +bio: ?*boring.BIO = null, socket_fd: std.os.socket_t = 0, allocator: std.mem.Allocator, pending_reads: u32 = 0, pending_sends: u32 = 0, - recv_buffer: ?*BufferPool.Node = null, -big_buffer: std.ArrayListUnmanaged(u8) = .{}, send_buffer: ?*BufferPool.Node = null, - -write_error: c_int = 0, socket_recv_len: c_int = 0, socket_send_len: u32 = 0, -socket_recv_eof: bool = false, bio_write_offset: u32 = 0, bio_read_offset: u32 = 0, - socket_send_error: ?anyerror = null, socket_recv_error: ?anyerror = null, -next: ?*AsyncBIO = null, - onReady: ?Callback = null, pub const Callback = struct { @@ -82,7 +74,9 @@ pub fn nextFrame(this: *AsyncBIO) void { } var method: ?*boring.BIO_METHOD = null; -var head: ?*AsyncBIO = null; +pub fn initBoringSSL() void { + method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl); +} const async_bio_name: [:0]const u8 = "AsyncBIO"; @@ -92,61 +86,14 @@ const Wait = enum { completed, }; -fn instance(allocator: std.mem.Allocator) *AsyncBIO { - if (head) |head_| { - var next = head_.next; - var ret = head_; - head = next; - - return ret; - } - - var bio = allocator.create(AsyncBIO) catch unreachable; - bio.* = AsyncBIO{ - .allocator = allocator, - }; - - return bio; -} - -pub fn release(this: *AsyncBIO) void { - if (head) |head_| { - this.next = head_; +pub fn init(this: *AsyncBIO) !void { + if (this.bio == null) { + this.bio = boring.BIO_new( + method.?, + ); } - this.socket_send_len = 0; - this.socket_recv_len = 0; - this.bio_write_offset = 0; - this.bio_read_offset = 0; - this.socket_recv_error = null; - this.socket_send_error = null; - this.onReady = null; - - if (this.recv_buffer) |recv| { - recv.release(); - this.recv_buffer = null; - } - - if (this.send_buffer) |write| { - write.release(); - this.send_buffer = null; - } - - head = this; -} - -pub fn init(allocator: std.mem.Allocator) !*AsyncBIO { - var bio = instance(allocator); - - bio.bio = boring.BIO_new( - method orelse brk: { - method = boring.BIOMethod.init(async_bio_name, Bio.create, Bio.destroy, Bio.write, Bio.read, null, Bio.ctrl); - break :brk method.?; - }, - ) orelse return error.OutOfMemory; - - _ = boring.BIO_set_data(bio.bio, bio); - return bio; + _ = boring.BIO_set_data(this.bio.?, this); } const WaitResult = enum { @@ -174,10 +121,6 @@ pub fn doSocketRead(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.R Output.flush(); } - if (socket_recv_len == 0) { - this.socket_recv_eof = true; - } - // if (socket_recv_len == 0) { this.onSocketReadComplete(); @@ -296,7 +239,7 @@ pub const Bio = struct { if (boring.BIO_get_data(this_bio) != null) { var this = cast(this_bio); - this.release(); + this.bio = null; } return 0; diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig index db4964fc0..e58462b09 100644 --- a/src/http/async_socket.zig +++ b/src/http/async_socket.zig @@ -196,6 +196,11 @@ pub const SendError = AsyncIO.SendError; pub fn deinit(this: *AsyncSocket) void { this.head.release(); + this.err = null; + this.queued = 0; + this.sent = 0; + this.read_context = &[_]u8{}; + this.read_offset = 0; } pub fn send(this: *AsyncSocket) SendError!usize { @@ -309,7 +314,7 @@ pub const SSL = struct { ssl_loaded: bool = false, socket: AsyncSocket, handshake_complete: bool = false, - ssl_bio: ?*AsyncBIO = null, + ssl_bio: AsyncBIO = undefined, unencrypted_bytes_to_send: ?*AsyncMessage = null, connect_frame: Yield(SSL.handshake) = Yield(SSL.handshake){}, send_frame: Yield(SSL.send) = Yield(SSL.send){}, @@ -330,6 +335,8 @@ pub const SSL = struct { pending_read_result: anyerror!u32 = 0, pending_write_result: anyerror!u32 = 0, + handshake_retry_count: u16 = 5, + first_post_handshake_write: bool = true, handshake_result: ?anyerror = null, @@ -371,18 +378,16 @@ pub const SSL = struct { ssl.setHostname(name_); } - var bio = try AsyncBIO.init(this.socket.allocator); - errdefer bio.release(); - bio.onReady = AsyncBIO.Callback.Wrap(SSL, SSL.retryAll).get(this); - bio.socket_fd = this.socket.socket; - this.ssl_bio = bio; + try this.ssl_bio.init(); + this.ssl_bio.onReady = AsyncBIO.Callback.Wrap(SSL, SSL.retryAll).get(this); + this.ssl_bio.socket_fd = this.socket.socket; - boring.SSL_set_bio(ssl, bio.bio, bio.bio); + boring.SSL_set_bio(ssl, this.ssl_bio.bio.?, this.ssl_bio.bio.?); // boring.SSL_set_early_data_enabled(ssl, 1); _ = boring.SSL_clear_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT); _ = boring.SSL_set_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT); - const mode = boring.SSL_MODE_RELEASE_BUFFERS | boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START; + const mode = boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START; _ = boring.SSL_set_mode(ssl, mode); _ = boring.SSL_clear_mode(ssl, mode); @@ -407,7 +412,7 @@ pub const SSL = struct { boring.SSL_set_shed_handshake_config(ssl, 1); - this.unencrypted_bytes_to_send = AsyncMessage.get(this.socket.allocator); + this.unencrypted_bytes_to_send = this.socket.head; try this.handshake(); @@ -497,9 +502,9 @@ pub const SSL = struct { } pub fn doPayloadRead(this: *SSL, buffer: []u8, count: *u32) anyerror!u32 { - if (this.ssl_bio.?.socket_recv_error != null) { - const pending = this.ssl_bio.?.socket_recv_error.?; - this.ssl_bio.?.socket_recv_error = null; + if (this.ssl_bio.socket_recv_error != null) { + const pending = this.ssl_bio.socket_recv_error.?; + this.ssl_bio.socket_recv_error = null; return pending; } @@ -508,6 +513,7 @@ pub const SSL = struct { var ssl_err: c_int = 0; const buf_len = @truncate(u32, buffer.len); while (true) { + boring.ERR_clear_error(); ssl_ret = boring.SSL_read(this.ssl, buffer.ptr + total_bytes_read, @intCast(c_int, buf_len - total_bytes_read)); ssl_err = boring.SSL_get_error(this.ssl, ssl_ret); @@ -521,7 +527,7 @@ pub const SSL = struct { // Continue processing records as long as there is more data available // synchronously. - if (!(ssl_err == boring.SSL_ERROR_WANT_RENEGOTIATE or (total_bytes_read < buf_len and ssl_ret > 0 and this.ssl_bio.?.hasPendingReadData()))) break; + if (!(ssl_err == boring.SSL_ERROR_WANT_RENEGOTIATE or (total_bytes_read < buf_len and ssl_ret > 0 and this.ssl_bio.hasPendingReadData()))) break; } // Although only the final SSL_read call may have failed, the failure needs to @@ -547,6 +553,13 @@ pub const SSL = struct { result = error.WouldBlock; }, else => { + if (extremely_verbose) { + const err = boring.ERR_get_error(); + + const version = std.mem.span(boring.SSL_get_version(this.ssl)); + var hostname = std.mem.span(std.mem.sliceTo(&this.hostname, 0)); + Output.prettyErrorln("[{s}] OpenSSLError reading (version: {s}, total read: {d}) - code: {d}", .{ hostname, version, total_bytes_read, err }); + } result = error.OpenSSLError; }, } @@ -556,8 +569,8 @@ pub const SSL = struct { // a connection, and instead terminate the TCP connection. This is reported // as ERR_CONNECTION_CLOSED. Because of this, map the unclean shutdown to a // graceful EOF, instead of treating it as an error as it should be. - if (this.ssl_bio.?.socket_recv_error) |err| { - this.ssl_bio.?.socket_recv_error = null; + if (this.ssl_bio.socket_recv_error) |err| { + this.ssl_bio.socket_recv_error = null; return err; } @@ -630,6 +643,7 @@ pub const SSL = struct { } var byte: u8 = 0; + boring.ERR_clear_error(); var rv = boring.SSL_peek(this.ssl, &byte, 1); var ssl_error = boring.SSL_get_error(this.ssl, rv); switch (ssl_error) { @@ -641,6 +655,8 @@ pub const SSL = struct { } fn doHandshake(this: *SSL) HandshakeError!void { + boring.ERR_clear_error(); + const rv = boring.SSL_do_handshake(this.ssl); if (rv <= 0) { const ssl_error = boring.SSL_get_error(this.ssl, rv); @@ -658,7 +674,25 @@ pub const SSL = struct { this.next_handshake_state = HandshakeState.handshake; return error.WouldBlock; }, - else => return error.OpenSSLError, + boring.SSL_ERROR_SYSCALL => { + this.handshake_retry_count -|= 1; + if (this.handshake_retry_count > 0) { + this.next_handshake_state = HandshakeState.handshake; + return error.WouldBlock; + } + + return error.OpenSSLError; + }, + else => { + if (extremely_verbose) { + const err = boring.ERR_get_error(); + var error_buf: [1024]u8 = undefined; + @memset(&error_buf, 0, 1024); + var err_msg = std.mem.span(boring.ERR_error_string(err, &error_buf)); + Output.prettyErrorln("Handshaking error {s}", .{err_msg}); + } + return error.OpenSSLError; + }, } } @@ -748,38 +782,46 @@ pub const SSL = struct { pub inline fn init(allocator: std.mem.Allocator, io: *AsyncIO) !SSL { return SSL{ + .ssl_bio = AsyncBIO{ + .allocator = allocator, + }, .socket = try AsyncSocket.init(io, 0, allocator), }; } pub fn deinit(this: *SSL) void { this.socket.deinit(); - if (!this.is_ssl) return; - - if (this.ssl_bio) |bio| { - _ = boring.BIO_set_data(bio.bio, null); - bio.socket_fd = 0; - bio.onReady = null; - bio.release(); - this.ssl_bio = null; - } if (this.ssl_loaded) { + _ = boring.SSL_shutdown(this.ssl); this.ssl.deinit(); this.ssl_loaded = false; } - this.handshake_complete = false; - - if (this.unencrypted_bytes_to_send) |bio| { - var next_ = bio.next; - while (next_) |next| { - next.release(); - next_ = next.next; - } + if (this.ssl_bio.recv_buffer) |recv| { + recv.release(); + } - bio.release(); - this.unencrypted_bytes_to_send = null; + if (this.ssl_bio.send_buffer) |recv| { + recv.release(); } + + this.ssl_bio.pending_reads = 0; + this.ssl_bio.pending_sends = 0; + this.ssl_bio.socket_recv_len = 0; + this.ssl_bio.socket_send_len = 0; + this.ssl_bio.bio_write_offset = 0; + this.ssl_bio.bio_read_offset = 0; + this.ssl_bio.socket_send_error = null; + this.ssl_bio.socket_recv_error = null; + + this.ssl_bio.socket_fd = 0; + this.ssl_bio.onReady = null; + + this.handshake_complete = false; + + this.* = SSL{ + .socket = this.socket, + }; } }; diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 171693ace..bd5c43d2d 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -47,6 +47,7 @@ pub fn onThreadStart() void { AsyncIO.global_loaded = true; NetworkThread.global.pool.io = &AsyncIO.global; Global.setThreadName("HTTP"); + AsyncBIO.initBoringSSL(); } pub inline fn getAllocator() std.mem.Allocator { @@ -412,15 +413,14 @@ pub const AsyncHTTP = struct { this.state.store(.sending, .Monotonic); var timer = std.time.Timer.start() catch @panic("Timer failure"); defer this.elapsed = timer.read(); - _ = active_requests_count.fetchAdd(1, .Monotonic); this.response = await this.client.sendAsync(this.request_body.list.items, this.response_buffer) catch |err| { - _ = active_requests_count.fetchSub(1, .Monotonic); this.state.store(.fail, .Monotonic); this.err = err; if (sender.http.max_retry_count > sender.http.retries_count) { sender.http.retries_count += 1; + sender.http.response_buffer.reset(); NetworkThread.global.pool.schedule(ThreadPool.Batch.from(&sender.task)); return; } @@ -430,7 +430,6 @@ pub const AsyncHTTP = struct { this.redirect_count = @intCast(u32, @maximum(127 - this.client.remaining_redirect_count, 0)); this.state.store(.success, .Monotonic); this.gzip_elapsed = this.client.gzip_elapsed; - _ = active_requests_count.fetchSub(1, .Monotonic); } if (sender.http.callback) |callback| { @@ -566,6 +565,10 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) ! // this prevents stack overflow redirect: while (this.remaining_redirect_count >= -1) { if (@enumToInt(this.stage) > @enumToInt(Stage.pending)) this.socket.deinit(); + _ = AsyncHTTP.active_requests_count.fetchAdd(1, .Monotonic); + defer { + _ = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic); + } this.stage = Stage.pending; body_out_str.reset(); diff --git a/src/thread_pool.zig b/src/thread_pool.zig index 8281e473b..166705a81 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -266,7 +266,8 @@ fn _wait(self: *ThreadPool, _is_waking: bool, comptime sleep_on_idle: bool) erro if (end_count > 0) { while (HTTP.AsyncHTTP.active_requests_count.loadUnchecked() > HTTP.AsyncHTTP.max_simultaneous_requests) { - io.run_for_ns(std.time.ns_per_us * 10) catch {}; + io.run_for_ns(std.time.ns_per_ms) catch {}; + io.tick() catch {}; } } |