diff options
author | 2022-02-05 00:30:11 -0800 | |
---|---|---|
committer | 2022-02-05 00:30:28 -0800 | |
commit | 6cf3a0878016a6a03833d0d2984051f6b481059e (patch) | |
tree | ed41028d69b6cec6e1bff5cd4230b72125fca27b | |
parent | 2b45c8dffec45dd838355b71e30f04a2dc117325 (diff) | |
download | bun-6cf3a0878016a6a03833d0d2984051f6b481059e.tar.gz bun-6cf3a0878016a6a03833d0d2984051f6b481059e.tar.zst bun-6cf3a0878016a6a03833d0d2984051f6b481059e.zip |
Further reliability improvements to http client
-rw-r--r-- | src/http/async_bio.zig | 20 | ||||
-rw-r--r-- | src/http/async_socket.zig | 56 |
2 files changed, 62 insertions, 14 deletions
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index e6ac5e71c..3ee88fdbd 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -41,6 +41,7 @@ bio_write_offset: u32 = 0, bio_read_offset: u32 = 0, socket_send_error: ?anyerror = null, socket_recv_error: ?anyerror = null, +recv_eof: bool = false, onReady: ?Callback = null, @@ -121,6 +122,7 @@ pub fn doSocketRead(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.R Output.prettyErrorln("onRead: {d}", .{socket_recv_len}); Output.flush(); } + this.recv_eof = this.recv_eof or socket_recv_len == 0; // if (socket_recv_len == 0) { @@ -145,6 +147,7 @@ pub fn doSocketWrite(this: *AsyncBIO, completion: *Completion, result_: AsyncIO. }, ); this.socket_send_len += socket_send_len; + this.recv_eof = this.recv_eof or socket_send_len == 0; const remain = ctx.data.min - @minimum(ctx.data.min, socket_send_len); @@ -264,6 +267,11 @@ pub const Bio = struct { return -1; } + if (this.recv_eof) { + this.recv_eof = false; + return 0; + } + if (this.socket_send_error != null) { if (extremely_verbose) { Output.prettyErrorln("write: {s}", .{@errorName(this.socket_send_error.?)}); @@ -329,6 +337,11 @@ pub const Bio = struct { return -1; } + if (this.recv_buffer == null and socket_recv_len > 0) { + socket_recv_len = 0; + bio_read_offset = 0; + } + if (socket_recv_len == 0) { // Instantiate the read buffer and read from the socket. Although only |len| // bytes were requested, intentionally read to the full buffer size. The SSL @@ -343,6 +356,11 @@ pub const Bio = struct { } if (this.pending_reads == 0) { + if (this.recv_eof) { + this.recv_eof = false; + return 0; + } + this.pending_reads += 1; this.scheduleSocketRead(len__); } @@ -392,7 +410,7 @@ pub const Bio = struct { @memcpy(ptr, bytes.ptr, len); bio_read_offset += len; - if (bio_read_offset == socket_recv_len_ and this.pending_reads == 0) { + if (bio_read_offset == socket_recv_len_ and this.pending_reads == 0 and this.pending_sends == 0) { // The read buffer is empty. // we can reset the pointer back to the beginning of the buffer // if there is more data to read, we will ask for another diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig index 205940b2d..080654e0a 100644 --- a/src/http/async_socket.zig +++ b/src/http/async_socket.zig @@ -5,7 +5,7 @@ const AsyncMessage = @import("./async_message.zig"); const AsyncBIO = @import("./async_bio.zig"); const Completion = AsyncIO.Completion; const AsyncSocket = @This(); - +const KeepAlive = @import("../http_client_async.zig").KeepAlive; const Output = @import("../global.zig").Output; const NetworkThread = @import("../network_thread.zig"); const Environment = @import("../global.zig").Environment; @@ -29,8 +29,10 @@ queued: usize = 0, sent: usize = 0, send_frame: @Frame(AsyncSocket.send) = undefined, read_frame: @Frame(AsyncSocket.read) = undefined, -connect_frame: @Frame(AsyncSocket.connectToAddress) = undefined, -close_frame: @Frame(AsyncSocket.close) = undefined, +connect_frame: Yield(AsyncSocket.connectToAddress) = Yield(AsyncSocket.connectToAddress){}, +close_frame: Yield(AsyncSocket.close) = Yield(AsyncSocket.close){}, + +was_keepalive: bool = false, read_context: []u8 = undefined, read_offset: u64 = 0, @@ -51,7 +53,7 @@ fn on_connect(this: *AsyncSocket, _: *Completion, err: ConnectError!void) void { this.err = resolved_err; }; - resume this.connect_frame; + this.connect_frame.maybeResume(); } fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!void { @@ -70,7 +72,7 @@ fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!v this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address); suspend { - this.connect_frame = @frame().*; + this.connect_frame.set(@frame()); } if (this.err) |e| { @@ -79,7 +81,7 @@ fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!v } fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void { - resume this.close_frame; + this.close_frame.maybeResume(); } pub fn close(this: *AsyncSocket) void { @@ -88,12 +90,35 @@ pub fn close(this: *AsyncSocket) void { this.socket = 0; this.io.close(*AsyncSocket, this, on_close, &this.close_completion, to_close); suspend { - this.close_frame = @frame().*; + this.close_frame.set(@frame()); } } - pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void { - this.close(); + if (!this.was_keepalive and !KeepAlive.disabled) { + if (KeepAlive.instance.find(name, port)) |socket| { + var err_code: i32 = undefined; + var size: u32 = @sizeOf(u32); + const rc = std.os.system.getsockopt(socket, std.os.SOL.SOCKET, std.os.SO.ERROR, @ptrCast([*]u8, &err_code), &size); + switch (std.os.errno(rc)) { + .SUCCESS => { + this.socket = socket; + this.was_keepalive = true; + return; + }, + .BADF, .FAULT, .INVAL => {}, + else => { + std.os.closeSocket(socket); + }, + } + } + } + + this.was_keepalive = false; + return try this.doConnect(name, port); +} + +fn doConnect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void { + this.was_keepalive = false; outer: while (true) { // on macOS, getaddrinfo() is very slow @@ -372,9 +397,9 @@ pub const SSL = struct { boring.SSL_set_bio(ssl, this.ssl_bio.bio.?, this.ssl_bio.bio.?); // boring.SSL_set_early_data_enabled(ssl, 1); - _ = boring.SSL_clear_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT); - _ = boring.SSL_set_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT); - const mode = boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START; + _ = boring.SSL_clear_options(ssl, boring.SSL_OP_LEGACY_SERVER_CONNECT); + _ = boring.SSL_set_options(ssl, boring.SSL_OP_LEGACY_SERVER_CONNECT); + const mode = boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START | boring.SSL_MODE_RELEASE_BUFFERS; _ = boring.SSL_set_mode(ssl, mode); _ = boring.SSL_clear_mode(ssl, mode); @@ -790,6 +815,10 @@ pub const SSL = struct { this.socket.deinit(); if (this.ssl_loaded) { + this.connect_frame.wait = false; + this.read_frame.wait = false; + this.send_frame.wait = false; + this.ssl.shutdown(); this.ssl.deinit(); this.ssl_loaded = false; @@ -817,6 +846,7 @@ pub const SSL = struct { this.ssl_bio.socket_recv_len = 0; this.ssl_bio.socket_send_len = 0; this.ssl_bio.bio_write_offset = 0; + this.ssl_bio.recv_eof = false; this.ssl_bio.bio_read_offset = 0; this.ssl_bio.socket_send_error = null; this.ssl_bio.socket_recv_error = null; @@ -824,7 +854,7 @@ pub const SSL = struct { this.ssl_bio.socket_fd = 0; this.ssl_bio.onReady = null; } else { - this.ssl_bio = undefined; + this.ssl_bio = AsyncBIO{ .allocator = getAllocator() }; } this.handshake_complete = false; |