aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/deps/backtrace.zig4
-rw-r--r--src/http_client_async.zig101
-rw-r--r--src/network_thread.zig4
3 files changed, 63 insertions, 46 deletions
diff --git a/src/deps/backtrace.zig b/src/deps/backtrace.zig
index 8448642de..7c3cdbd05 100644
--- a/src/deps/backtrace.zig
+++ b/src/deps/backtrace.zig
@@ -137,7 +137,7 @@ pub fn reloadHandlers() void {
os.sigaction(os.SIG.TRAP, &act, null);
}
const os = std.os;
-pub fn start(ctx: ?*anyopaque, callback_: PrintCallback, onError: ErrorCallback) bool {
+pub fn start(ctx: ?*anyopaque, callback_: PrintCallback, onError: ErrorCallback) void {
callback_ctx = ctx;
callback = callback_;
on_error = onError;
@@ -154,6 +154,4 @@ pub fn start(ctx: ?*anyopaque, callback_: PrintCallback, onError: ErrorCallback)
os.sigaction(os.SIG.ILL, &act, null);
os.sigaction(os.SIG.SEGV, &act, null);
os.sigaction(os.SIG.TRAP, &act, null);
-
- return true;
}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 0568591d6..d0ab75fab 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -8,7 +8,6 @@ const strings = _global.strings;
const MutableString = _global.MutableString;
const FeatureFlags = _global.FeatureFlags;
const stringZ = _global.stringZ;
-const default_allocator = _global.default_allocator;
const C = _global.C;
const std = @import("std");
const URL = @import("./query_string_map.zig").URL;
@@ -24,6 +23,16 @@ const boring = @import("boringssl");
pub const NetworkThread = @import("./network_thread.zig");
const ObjectPool = @import("./pool.zig").ObjectPool;
const SOCK = os.SOCK;
+const Arena = @import("./mimalloc_arena.zig").Arena;
+
+// This becomes Arena.allocator
+pub var default_allocator: std.mem.Allocator = undefined;
+pub var default_arena: Arena = undefined;
+
+pub fn onThreadStart() void {
+ default_arena = Arena.init() catch unreachable;
+ default_allocator = default_arena.allocator();
+}
pub const Headers = struct {
pub const Kv = struct {
@@ -236,13 +245,11 @@ const SingleHTTPChannel = struct {
const SingleHTTPCHannel_ = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 8 });
channel: SingleHTTPCHannel_,
pub fn reset(_: *@This()) void {}
- pub fn init(_: std.mem.Allocator) anyerror!SingleHTTPChannel {
+ pub fn init() SingleHTTPChannel {
return SingleHTTPChannel{ .channel = SingleHTTPCHannel_.init() };
}
};
-const SingleHTTPChannelPool = ObjectPool(SingleHTTPChannel, SingleHTTPChannel.init, false);
-
pub const HTTPChannelContext = struct {
http: AsyncHTTP = undefined,
channel: *HTTPChannel,
@@ -333,32 +340,29 @@ pub const AsyncHTTP = struct {
}
fn sendSyncCallback(this: *AsyncHTTP, sender: *HTTPSender) void {
- var pooled_node = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?));
- pooled_node.data.channel.writeItem(this) catch unreachable;
+ var single_http_channel = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?));
+ single_http_channel.channel.writeItem(this) catch unreachable;
sender.release();
}
- pub fn sendSync(this: *AsyncHTTP, comptime auto_release: bool) anyerror!picohttp.Response {
- this.callback_ctx = SingleHTTPChannelPool.get(default_allocator);
- defer {
- if (comptime auto_release) {
- var pooled_node = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?));
- SingleHTTPChannelPool.release(pooled_node);
- this.callback_ctx = null;
- }
+ pub fn sendSync(this: *AsyncHTTP, comptime _: bool) anyerror!picohttp.Response {
+ if (this.callback_ctx == null) {
+ var ctx = try _global.default_allocator.create(SingleHTTPChannel);
+ ctx.* = SingleHTTPChannel.init();
+ this.callback_ctx = ctx;
+ } else {
+ var ctx = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?));
+ ctx.* = SingleHTTPChannel.init();
}
+
this.callback = sendSyncCallback;
var batch = NetworkThread.Batch{};
- this.schedule(default_allocator, &batch);
+ this.schedule(_global.default_allocator, &batch);
NetworkThread.global.pool.schedule(batch);
while (true) {
- var pooled = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?));
- var async_http: *AsyncHTTP = (pooled.data.channel.tryReadItem() catch unreachable) orelse {
- std.atomic.spinLoopHint();
- std.time.sleep(std.time.ns_per_us * 100);
- continue;
- };
+ var data = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?));
+ var async_http: *AsyncHTTP = data.channel.readItem() catch unreachable;
if (async_http.err) |err| {
return err;
}
@@ -830,9 +834,10 @@ const AsyncSocket = struct {
pub const SSL = struct {
ssl: *boring.SSL = undefined,
+ ssl_loaded: bool = false,
socket: AsyncSocket,
handshake_complete: bool = false,
- ssl_bio: *AsyncBIO = undefined,
+ ssl_bio: ?*AsyncBIO = null,
read_bio: ?*AsyncMessage = null,
handshake_frame: @Frame(SSL.handshake) = undefined,
send_frame: @Frame(SSL.send) = undefined,
@@ -850,6 +855,13 @@ const AsyncSocket = struct {
this.handshake_complete = false;
var ssl = boring.initClient();
+ this.ssl = ssl;
+ this.ssl_loaded = true;
+ errdefer {
+ this.ssl_loaded = false;
+ this.ssl.deinit();
+ this.ssl = undefined;
+ }
{
std.mem.copy(u8, &this.hostname, name);
@@ -864,7 +876,6 @@ const AsyncSocket = struct {
boring.SSL_set_bio(ssl, bio.bio, bio.bio);
- this.ssl = ssl;
this.read_bio = AsyncMessage.get(this.socket.allocator);
try this.handshake();
}
@@ -876,7 +887,7 @@ const AsyncSocket = struct {
fn handshake(this: *SSL) HandshakeError!void {
while (!this.ssl.isInitFinished()) {
boring.ERR_clear_error();
- this.ssl_bio.enqueueSend();
+ this.ssl_bio.?.enqueueSend();
const handshake_result = boring.SSL_connect(this.ssl);
if (handshake_result == 0) {
Output.prettyErrorln("ssl accept error", .{});
@@ -889,10 +900,10 @@ const AsyncSocket = struct {
// accept_result < 0
const e = boring.SSL_get_error(this.ssl, handshake_result);
if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) {
- this.ssl_bio.enqueueSend();
+ this.ssl_bio.?.enqueueSend();
suspend {
this.handshake_frame = @frame().*;
- this.ssl_bio.pushPendingFrame(&this.handshake_frame);
+ this.ssl_bio.?.pushPendingFrame(&this.handshake_frame);
}
continue;
@@ -932,16 +943,16 @@ const AsyncSocket = struct {
error.WantRead => {
suspend {
this.send_frame = @frame().*;
- this.ssl_bio.pushPendingFrame(&this.send_frame);
+ this.ssl_bio.?.pushPendingFrame(&this.send_frame);
}
continue;
},
error.WantWrite => {
- this.ssl_bio.enqueueSend();
+ this.ssl_bio.?.enqueueSend();
suspend {
this.send_frame = @frame().*;
- this.ssl_bio.pushPendingFrame(&this.send_frame);
+ this.ssl_bio.?.pushPendingFrame(&this.send_frame);
}
continue;
},
@@ -971,15 +982,15 @@ const AsyncSocket = struct {
len = this.ssl.read(buf) catch |err| {
switch (err) {
error.WantWrite => {
- this.ssl_bio.enqueueSend();
+ this.ssl_bio.?.enqueueSend();
if (extremely_verbose) {
Output.prettyErrorln(
"error: {s}: \n Read Wait: {s}\n Send Wait: {s}",
.{
@errorName(err),
- @tagName(this.ssl_bio.read_wait),
- @tagName(this.ssl_bio.send_wait),
+ @tagName(this.ssl_bio.?.read_wait),
+ @tagName(this.ssl_bio.?.send_wait),
},
);
Output.flush();
@@ -987,7 +998,7 @@ const AsyncSocket = struct {
suspend {
this.read_frame = @frame().*;
- this.ssl_bio.pushPendingFrame(&this.read_frame);
+ this.ssl_bio.?.pushPendingFrame(&this.read_frame);
}
continue;
},
@@ -999,8 +1010,8 @@ const AsyncSocket = struct {
"error: {s}: \n Read Wait: {s}\n Send Wait: {s}",
.{
@errorName(err),
- @tagName(this.ssl_bio.read_wait),
- @tagName(this.ssl_bio.send_wait),
+ @tagName(this.ssl_bio.?.read_wait),
+ @tagName(this.ssl_bio.?.send_wait),
},
);
Output.flush();
@@ -1008,7 +1019,7 @@ const AsyncSocket = struct {
suspend {
this.read_frame = @frame().*;
- this.ssl_bio.pushPendingFrame(&this.read_frame);
+ this.ssl_bio.?.pushPendingFrame(&this.read_frame);
}
continue;
},
@@ -1033,11 +1044,19 @@ const AsyncSocket = struct {
this.socket.deinit();
if (!this.is_ssl) return;
- _ = boring.BIO_set_data(this.ssl_bio.bio, null);
- this.ssl_bio.pending_frame = AsyncBIO.PendingFrame.init();
- this.ssl_bio.socket_fd = 0;
- this.ssl_bio.release();
- this.ssl.deinit();
+ if (this.ssl_bio) |bio| {
+ _ = boring.BIO_set_data(bio.bio, null);
+ bio.pending_frame = AsyncBIO.PendingFrame.init();
+ bio.socket_fd = 0;
+ bio.release();
+ this.ssl_bio = null;
+ }
+
+ if (this.ssl_loaded) {
+ this.ssl.deinit();
+ this.ssl_loaded = false;
+ }
+
this.handshake_complete = false;
if (this.read_bio) |bio| {
diff --git a/src/network_thread.zig b/src/network_thread.zig
index 0f24e8f35..94eb13414 100644
--- a/src/network_thread.zig
+++ b/src/network_thread.zig
@@ -5,7 +5,7 @@ const std = @import("std");
const AsyncIO = @import("io");
const Output = @import("./global.zig").Output;
const IdentityContext = @import("./identity_context.zig").IdentityContext;
-
+const HTTP = @import("./http_client_async.zig");
const NetworkThread = @This();
/// Single-thread in this pool
@@ -81,7 +81,7 @@ pub fn init() !void {
global = NetworkThread{
.pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }),
};
-
+ global.pool.on_thread_spawn = HTTP.onThreadStart;
global.pool.io = &AsyncIO.global;
address_list_cached = AddressListCache.init(@import("./global.zig").default_allocator);
}