diff options
Diffstat (limited to '')
-rw-r--r-- | src/deps/backtrace.zig | 4 | ||||
-rw-r--r-- | src/http_client_async.zig | 101 | ||||
-rw-r--r-- | src/network_thread.zig | 4 |
3 files changed, 63 insertions, 46 deletions
diff --git a/src/deps/backtrace.zig b/src/deps/backtrace.zig index 8448642de..7c3cdbd05 100644 --- a/src/deps/backtrace.zig +++ b/src/deps/backtrace.zig @@ -137,7 +137,7 @@ pub fn reloadHandlers() void { os.sigaction(os.SIG.TRAP, &act, null); } const os = std.os; -pub fn start(ctx: ?*anyopaque, callback_: PrintCallback, onError: ErrorCallback) bool { +pub fn start(ctx: ?*anyopaque, callback_: PrintCallback, onError: ErrorCallback) void { callback_ctx = ctx; callback = callback_; on_error = onError; @@ -154,6 +154,4 @@ pub fn start(ctx: ?*anyopaque, callback_: PrintCallback, onError: ErrorCallback) os.sigaction(os.SIG.ILL, &act, null); os.sigaction(os.SIG.SEGV, &act, null); os.sigaction(os.SIG.TRAP, &act, null); - - return true; } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 0568591d6..d0ab75fab 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -8,7 +8,6 @@ const strings = _global.strings; const MutableString = _global.MutableString; const FeatureFlags = _global.FeatureFlags; const stringZ = _global.stringZ; -const default_allocator = _global.default_allocator; const C = _global.C; const std = @import("std"); const URL = @import("./query_string_map.zig").URL; @@ -24,6 +23,16 @@ const boring = @import("boringssl"); pub const NetworkThread = @import("./network_thread.zig"); const ObjectPool = @import("./pool.zig").ObjectPool; const SOCK = os.SOCK; +const Arena = @import("./mimalloc_arena.zig").Arena; + +// This becomes Arena.allocator +pub var default_allocator: std.mem.Allocator = undefined; +pub var default_arena: Arena = undefined; + +pub fn onThreadStart() void { + default_arena = Arena.init() catch unreachable; + default_allocator = default_arena.allocator(); +} pub const Headers = struct { pub const Kv = struct { @@ -236,13 +245,11 @@ const SingleHTTPChannel = struct { const SingleHTTPCHannel_ = @import("./sync.zig").Channel(*AsyncHTTP, .{ .Static = 8 }); channel: SingleHTTPCHannel_, pub fn reset(_: *@This()) void {} - pub fn init(_: std.mem.Allocator) anyerror!SingleHTTPChannel { + pub fn init() SingleHTTPChannel { return SingleHTTPChannel{ .channel = SingleHTTPCHannel_.init() }; } }; -const SingleHTTPChannelPool = ObjectPool(SingleHTTPChannel, SingleHTTPChannel.init, false); - pub const HTTPChannelContext = struct { http: AsyncHTTP = undefined, channel: *HTTPChannel, @@ -333,32 +340,29 @@ pub const AsyncHTTP = struct { } fn sendSyncCallback(this: *AsyncHTTP, sender: *HTTPSender) void { - var pooled_node = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?)); - pooled_node.data.channel.writeItem(this) catch unreachable; + var single_http_channel = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?)); + single_http_channel.channel.writeItem(this) catch unreachable; sender.release(); } - pub fn sendSync(this: *AsyncHTTP, comptime auto_release: bool) anyerror!picohttp.Response { - this.callback_ctx = SingleHTTPChannelPool.get(default_allocator); - defer { - if (comptime auto_release) { - var pooled_node = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?)); - SingleHTTPChannelPool.release(pooled_node); - this.callback_ctx = null; - } + pub fn sendSync(this: *AsyncHTTP, comptime _: bool) anyerror!picohttp.Response { + if (this.callback_ctx == null) { + var ctx = try _global.default_allocator.create(SingleHTTPChannel); + ctx.* = SingleHTTPChannel.init(); + this.callback_ctx = ctx; + } else { + var ctx = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?)); + ctx.* = SingleHTTPChannel.init(); } + this.callback = sendSyncCallback; var batch = NetworkThread.Batch{}; - this.schedule(default_allocator, &batch); + this.schedule(_global.default_allocator, &batch); NetworkThread.global.pool.schedule(batch); while (true) { - var pooled = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?)); - var async_http: *AsyncHTTP = (pooled.data.channel.tryReadItem() catch unreachable) orelse { - std.atomic.spinLoopHint(); - std.time.sleep(std.time.ns_per_us * 100); - continue; - }; + var data = @ptrCast(*SingleHTTPChannel, @alignCast(@alignOf(*SingleHTTPChannel), this.callback_ctx.?)); + var async_http: *AsyncHTTP = data.channel.readItem() catch unreachable; if (async_http.err) |err| { return err; } @@ -830,9 +834,10 @@ const AsyncSocket = struct { pub const SSL = struct { ssl: *boring.SSL = undefined, + ssl_loaded: bool = false, socket: AsyncSocket, handshake_complete: bool = false, - ssl_bio: *AsyncBIO = undefined, + ssl_bio: ?*AsyncBIO = null, read_bio: ?*AsyncMessage = null, handshake_frame: @Frame(SSL.handshake) = undefined, send_frame: @Frame(SSL.send) = undefined, @@ -850,6 +855,13 @@ const AsyncSocket = struct { this.handshake_complete = false; var ssl = boring.initClient(); + this.ssl = ssl; + this.ssl_loaded = true; + errdefer { + this.ssl_loaded = false; + this.ssl.deinit(); + this.ssl = undefined; + } { std.mem.copy(u8, &this.hostname, name); @@ -864,7 +876,6 @@ const AsyncSocket = struct { boring.SSL_set_bio(ssl, bio.bio, bio.bio); - this.ssl = ssl; this.read_bio = AsyncMessage.get(this.socket.allocator); try this.handshake(); } @@ -876,7 +887,7 @@ const AsyncSocket = struct { fn handshake(this: *SSL) HandshakeError!void { while (!this.ssl.isInitFinished()) { boring.ERR_clear_error(); - this.ssl_bio.enqueueSend(); + this.ssl_bio.?.enqueueSend(); const handshake_result = boring.SSL_connect(this.ssl); if (handshake_result == 0) { Output.prettyErrorln("ssl accept error", .{}); @@ -889,10 +900,10 @@ const AsyncSocket = struct { // accept_result < 0 const e = boring.SSL_get_error(this.ssl, handshake_result); if ((e == boring.SSL_ERROR_WANT_READ or e == boring.SSL_ERROR_WANT_WRITE)) { - this.ssl_bio.enqueueSend(); + this.ssl_bio.?.enqueueSend(); suspend { this.handshake_frame = @frame().*; - this.ssl_bio.pushPendingFrame(&this.handshake_frame); + this.ssl_bio.?.pushPendingFrame(&this.handshake_frame); } continue; @@ -932,16 +943,16 @@ const AsyncSocket = struct { error.WantRead => { suspend { this.send_frame = @frame().*; - this.ssl_bio.pushPendingFrame(&this.send_frame); + this.ssl_bio.?.pushPendingFrame(&this.send_frame); } continue; }, error.WantWrite => { - this.ssl_bio.enqueueSend(); + this.ssl_bio.?.enqueueSend(); suspend { this.send_frame = @frame().*; - this.ssl_bio.pushPendingFrame(&this.send_frame); + this.ssl_bio.?.pushPendingFrame(&this.send_frame); } continue; }, @@ -971,15 +982,15 @@ const AsyncSocket = struct { len = this.ssl.read(buf) catch |err| { switch (err) { error.WantWrite => { - this.ssl_bio.enqueueSend(); + this.ssl_bio.?.enqueueSend(); if (extremely_verbose) { Output.prettyErrorln( "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", .{ @errorName(err), - @tagName(this.ssl_bio.read_wait), - @tagName(this.ssl_bio.send_wait), + @tagName(this.ssl_bio.?.read_wait), + @tagName(this.ssl_bio.?.send_wait), }, ); Output.flush(); @@ -987,7 +998,7 @@ const AsyncSocket = struct { suspend { this.read_frame = @frame().*; - this.ssl_bio.pushPendingFrame(&this.read_frame); + this.ssl_bio.?.pushPendingFrame(&this.read_frame); } continue; }, @@ -999,8 +1010,8 @@ const AsyncSocket = struct { "error: {s}: \n Read Wait: {s}\n Send Wait: {s}", .{ @errorName(err), - @tagName(this.ssl_bio.read_wait), - @tagName(this.ssl_bio.send_wait), + @tagName(this.ssl_bio.?.read_wait), + @tagName(this.ssl_bio.?.send_wait), }, ); Output.flush(); @@ -1008,7 +1019,7 @@ const AsyncSocket = struct { suspend { this.read_frame = @frame().*; - this.ssl_bio.pushPendingFrame(&this.read_frame); + this.ssl_bio.?.pushPendingFrame(&this.read_frame); } continue; }, @@ -1033,11 +1044,19 @@ const AsyncSocket = struct { this.socket.deinit(); if (!this.is_ssl) return; - _ = boring.BIO_set_data(this.ssl_bio.bio, null); - this.ssl_bio.pending_frame = AsyncBIO.PendingFrame.init(); - this.ssl_bio.socket_fd = 0; - this.ssl_bio.release(); - this.ssl.deinit(); + if (this.ssl_bio) |bio| { + _ = boring.BIO_set_data(bio.bio, null); + bio.pending_frame = AsyncBIO.PendingFrame.init(); + bio.socket_fd = 0; + bio.release(); + this.ssl_bio = null; + } + + if (this.ssl_loaded) { + this.ssl.deinit(); + this.ssl_loaded = false; + } + this.handshake_complete = false; if (this.read_bio) |bio| { diff --git a/src/network_thread.zig b/src/network_thread.zig index 0f24e8f35..94eb13414 100644 --- a/src/network_thread.zig +++ b/src/network_thread.zig @@ -5,7 +5,7 @@ const std = @import("std"); const AsyncIO = @import("io"); const Output = @import("./global.zig").Output; const IdentityContext = @import("./identity_context.zig").IdentityContext; - +const HTTP = @import("./http_client_async.zig"); const NetworkThread = @This(); /// Single-thread in this pool @@ -81,7 +81,7 @@ pub fn init() !void { global = NetworkThread{ .pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }), }; - + global.pool.on_thread_spawn = HTTP.onThreadStart; global.pool.io = &AsyncIO.global; address_list_cached = AddressListCache.init(@import("./global.zig").default_allocator); } |