aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-02-05 00:30:11 -0800
committerGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-02-05 00:30:28 -0800
commit6cf3a0878016a6a03833d0d2984051f6b481059e (patch)
treeed41028d69b6cec6e1bff5cd4230b72125fca27b
parent2b45c8dffec45dd838355b71e30f04a2dc117325 (diff)
downloadbun-6cf3a0878016a6a03833d0d2984051f6b481059e.tar.gz
bun-6cf3a0878016a6a03833d0d2984051f6b481059e.tar.zst
bun-6cf3a0878016a6a03833d0d2984051f6b481059e.zip
Further reliability improvements to http client
-rw-r--r--src/http/async_bio.zig20
-rw-r--r--src/http/async_socket.zig56
2 files changed, 62 insertions, 14 deletions
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig
index e6ac5e71c..3ee88fdbd 100644
--- a/src/http/async_bio.zig
+++ b/src/http/async_bio.zig
@@ -41,6 +41,7 @@ bio_write_offset: u32 = 0,
bio_read_offset: u32 = 0,
socket_send_error: ?anyerror = null,
socket_recv_error: ?anyerror = null,
+recv_eof: bool = false,
onReady: ?Callback = null,
@@ -121,6 +122,7 @@ pub fn doSocketRead(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.R
Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
Output.flush();
}
+ this.recv_eof = this.recv_eof or socket_recv_len == 0;
// if (socket_recv_len == 0) {
@@ -145,6 +147,7 @@ pub fn doSocketWrite(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.
},
);
this.socket_send_len += socket_send_len;
+ this.recv_eof = this.recv_eof or socket_send_len == 0;
const remain = ctx.data.min - @minimum(ctx.data.min, socket_send_len);
@@ -264,6 +267,11 @@ pub const Bio = struct {
return -1;
}
+ if (this.recv_eof) {
+ this.recv_eof = false;
+ return 0;
+ }
+
if (this.socket_send_error != null) {
if (extremely_verbose) {
Output.prettyErrorln("write: {s}", .{@errorName(this.socket_send_error.?)});
@@ -329,6 +337,11 @@ pub const Bio = struct {
return -1;
}
+ if (this.recv_buffer == null and socket_recv_len > 0) {
+ socket_recv_len = 0;
+ bio_read_offset = 0;
+ }
+
if (socket_recv_len == 0) {
// Instantiate the read buffer and read from the socket. Although only |len|
// bytes were requested, intentionally read to the full buffer size. The SSL
@@ -343,6 +356,11 @@ pub const Bio = struct {
}
if (this.pending_reads == 0) {
+ if (this.recv_eof) {
+ this.recv_eof = false;
+ return 0;
+ }
+
this.pending_reads += 1;
this.scheduleSocketRead(len__);
}
@@ -392,7 +410,7 @@ pub const Bio = struct {
@memcpy(ptr, bytes.ptr, len);
bio_read_offset += len;
- if (bio_read_offset == socket_recv_len_ and this.pending_reads == 0) {
+ if (bio_read_offset == socket_recv_len_ and this.pending_reads == 0 and this.pending_sends == 0) {
// The read buffer is empty.
// we can reset the pointer back to the beginning of the buffer
// if there is more data to read, we will ask for another
diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig
index 205940b2d..080654e0a 100644
--- a/src/http/async_socket.zig
+++ b/src/http/async_socket.zig
@@ -5,7 +5,7 @@ const AsyncMessage = @import("./async_message.zig");
const AsyncBIO = @import("./async_bio.zig");
const Completion = AsyncIO.Completion;
const AsyncSocket = @This();
-
+const KeepAlive = @import("../http_client_async.zig").KeepAlive;
const Output = @import("../global.zig").Output;
const NetworkThread = @import("../network_thread.zig");
const Environment = @import("../global.zig").Environment;
@@ -29,8 +29,10 @@ queued: usize = 0,
sent: usize = 0,
send_frame: @Frame(AsyncSocket.send) = undefined,
read_frame: @Frame(AsyncSocket.read) = undefined,
-connect_frame: @Frame(AsyncSocket.connectToAddress) = undefined,
-close_frame: @Frame(AsyncSocket.close) = undefined,
+connect_frame: Yield(AsyncSocket.connectToAddress) = Yield(AsyncSocket.connectToAddress){},
+close_frame: Yield(AsyncSocket.close) = Yield(AsyncSocket.close){},
+
+was_keepalive: bool = false,
read_context: []u8 = undefined,
read_offset: u64 = 0,
@@ -51,7 +53,7 @@ fn on_connect(this: *AsyncSocket, _: *Completion, err: ConnectError!void) void {
this.err = resolved_err;
};
- resume this.connect_frame;
+ this.connect_frame.maybeResume();
}
fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!void {
@@ -70,7 +72,7 @@ fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!v
this.io.connect(*AsyncSocket, this, on_connect, &this.connect_completion, sockfd, address);
suspend {
- this.connect_frame = @frame().*;
+ this.connect_frame.set(@frame());
}
if (this.err) |e| {
@@ -79,7 +81,7 @@ fn connectToAddress(this: *AsyncSocket, address: std.net.Address) ConnectError!v
}
fn on_close(this: *AsyncSocket, _: *Completion, _: AsyncIO.CloseError!void) void {
- resume this.close_frame;
+ this.close_frame.maybeResume();
}
pub fn close(this: *AsyncSocket) void {
@@ -88,12 +90,35 @@ pub fn close(this: *AsyncSocket) void {
this.socket = 0;
this.io.close(*AsyncSocket, this, on_close, &this.close_completion, to_close);
suspend {
- this.close_frame = @frame().*;
+ this.close_frame.set(@frame());
}
}
-
pub fn connect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void {
- this.close();
+ if (!this.was_keepalive and !KeepAlive.disabled) {
+ if (KeepAlive.instance.find(name, port)) |socket| {
+ var err_code: i32 = undefined;
+ var size: u32 = @sizeOf(u32);
+ const rc = std.os.system.getsockopt(socket, std.os.SOL.SOCKET, std.os.SO.ERROR, @ptrCast([*]u8, &err_code), &size);
+ switch (std.os.errno(rc)) {
+ .SUCCESS => {
+ this.socket = socket;
+ this.was_keepalive = true;
+ return;
+ },
+ .BADF, .FAULT, .INVAL => {},
+ else => {
+ std.os.closeSocket(socket);
+ },
+ }
+ }
+ }
+
+ this.was_keepalive = false;
+ return try this.doConnect(name, port);
+}
+
+fn doConnect(this: *AsyncSocket, name: []const u8, port: u16) ConnectError!void {
+ this.was_keepalive = false;
outer: while (true) {
// on macOS, getaddrinfo() is very slow
@@ -372,9 +397,9 @@ pub const SSL = struct {
boring.SSL_set_bio(ssl, this.ssl_bio.bio.?, this.ssl_bio.bio.?);
// boring.SSL_set_early_data_enabled(ssl, 1);
- _ = boring.SSL_clear_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT);
- _ = boring.SSL_set_options(ssl, boring.SSL_OP_NO_COMPRESSION | boring.SSL_OP_LEGACY_SERVER_CONNECT);
- const mode = boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START;
+ _ = boring.SSL_clear_options(ssl, boring.SSL_OP_LEGACY_SERVER_CONNECT);
+ _ = boring.SSL_set_options(ssl, boring.SSL_OP_LEGACY_SERVER_CONNECT);
+ const mode = boring.SSL_MODE_CBC_RECORD_SPLITTING | boring.SSL_MODE_ENABLE_FALSE_START | boring.SSL_MODE_RELEASE_BUFFERS;
_ = boring.SSL_set_mode(ssl, mode);
_ = boring.SSL_clear_mode(ssl, mode);
@@ -790,6 +815,10 @@ pub const SSL = struct {
this.socket.deinit();
if (this.ssl_loaded) {
+ this.connect_frame.wait = false;
+ this.read_frame.wait = false;
+ this.send_frame.wait = false;
+
this.ssl.shutdown();
this.ssl.deinit();
this.ssl_loaded = false;
@@ -817,6 +846,7 @@ pub const SSL = struct {
this.ssl_bio.socket_recv_len = 0;
this.ssl_bio.socket_send_len = 0;
this.ssl_bio.bio_write_offset = 0;
+ this.ssl_bio.recv_eof = false;
this.ssl_bio.bio_read_offset = 0;
this.ssl_bio.socket_send_error = null;
this.ssl_bio.socket_recv_error = null;
@@ -824,7 +854,7 @@ pub const SSL = struct {
this.ssl_bio.socket_fd = 0;
this.ssl_bio.onReady = null;
} else {
- this.ssl_bio = undefined;
+ this.ssl_bio = AsyncBIO{ .allocator = getAllocator() };
}
this.handshake_complete = false;