diff options
Diffstat (limited to 'src/http/http_client_async.zig')
-rw-r--r-- | src/http/http_client_async.zig | 53 |
1 files changed, 34 insertions, 19 deletions
diff --git a/src/http/http_client_async.zig b/src/http/http_client_async.zig index c90266bd6..b29234da4 100644 --- a/src/http/http_client_async.zig +++ b/src/http/http_client_async.zig @@ -204,9 +204,10 @@ pub const HTTPChannelContext = struct { http: AsyncHTTP = undefined, channel: *HTTPChannel, - pub fn callback(http: *AsyncHTTP) void { + pub fn callback(http: *AsyncHTTP, sender: *AsyncHTTP.HTTPSender) void { var this: *HTTPChannelContext = @fieldParentPtr(HTTPChannelContext, "http", http); this.channel.writeItem(http) catch unreachable; + sender.onFinish(); } }; @@ -242,7 +243,7 @@ pub const AsyncHTTP = struct { /// Executes on the network thread callback: ?CompletionCallback = null, - pub const CompletionCallback = fn (this: *AsyncHTTP) void; + pub const CompletionCallback = fn (this: *AsyncHTTP, sender: *HTTPSender) void; pub var active_requests_count = std.atomic.Atomic(u32).init(0); pub const State = enum(u32) { @@ -286,38 +287,54 @@ pub const AsyncHTTP = struct { batch.push(ThreadPool.Batch.from(&sender.task)); } - const HTTPSender = struct { + var http_sender_head: std.atomic.Atomic(?*HTTPSender) = std.atomic.Atomic(?*HTTPSender).init(null); + + pub const HTTPSender = struct { task: ThreadPool.Task = .{ .callback = callback }, frame: @Frame(AsyncHTTP.do) = undefined, http: *AsyncHTTP = undefined, next: ?*HTTPSender = null, - var head: ?*HTTPSender = null; - pub fn get(http: *AsyncHTTP, allocator: *std.mem.Allocator) *HTTPSender { - if (head == null) { - head = allocator.create(HTTPSender) catch unreachable; - head.?.* = HTTPSender{}; + @fence(.Acquire); + + var head_ = http_sender_head.load(.Monotonic); + + if (head_ == null) { + var new_head = allocator.create(HTTPSender) catch unreachable; + new_head.* = HTTPSender{}; + new_head.next = null; + new_head.task = .{ .callback = callback }; + new_head.http = http; + return new_head; } - var head_ = head.?; - head = head.?.next; - head_.next = null; - head_.task = .{ .callback = callback }; - head_.http = http; + http_sender_head.store(head_.?.next, .Monotonic); - return head_; + head_.?.* = HTTPSender{}; + head_.?.next = null; + head_.?.task = .{ .callback = callback }; + head_.?.http = http; + + return head_.?; } - pub fn release(this: *HTTPSender) void {} + pub fn release(this: *HTTPSender) void { + @fence(.Acquire); + this.task = .{ .callback = callback }; + this.http = undefined; + this.next = http_sender_head.swap(this, .Monotonic); + } pub fn callback(task: *ThreadPool.Task) void { var this = @fieldParentPtr(HTTPSender, "task", task); this.frame = async AsyncHTTP.do(this); } - pub fn onFinish(this: *HTTPSender) void {} + pub fn onFinish(this: *HTTPSender) void { + this.release(); + } }; pub fn do(sender: *HTTPSender) void { @@ -348,10 +365,8 @@ pub const AsyncHTTP = struct { } if (sender.http.callback) |callback| { - callback(sender.http); + callback(sender.http, sender); } - - sender.release(); } }; |