aboutsummaryrefslogtreecommitdiff
path: root/src/http/http_client_async.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/http/http_client_async.zig')
-rw-r--r--src/http/http_client_async.zig53
1 files changed, 34 insertions, 19 deletions
diff --git a/src/http/http_client_async.zig b/src/http/http_client_async.zig
index c90266bd6..b29234da4 100644
--- a/src/http/http_client_async.zig
+++ b/src/http/http_client_async.zig
@@ -204,9 +204,10 @@ pub const HTTPChannelContext = struct {
http: AsyncHTTP = undefined,
channel: *HTTPChannel,
- pub fn callback(http: *AsyncHTTP) void {
+ pub fn callback(http: *AsyncHTTP, sender: *AsyncHTTP.HTTPSender) void {
var this: *HTTPChannelContext = @fieldParentPtr(HTTPChannelContext, "http", http);
this.channel.writeItem(http) catch unreachable;
+ sender.onFinish();
}
};
@@ -242,7 +243,7 @@ pub const AsyncHTTP = struct {
/// Executes on the network thread
callback: ?CompletionCallback = null,
- pub const CompletionCallback = fn (this: *AsyncHTTP) void;
+ pub const CompletionCallback = fn (this: *AsyncHTTP, sender: *HTTPSender) void;
pub var active_requests_count = std.atomic.Atomic(u32).init(0);
pub const State = enum(u32) {
@@ -286,38 +287,54 @@ pub const AsyncHTTP = struct {
batch.push(ThreadPool.Batch.from(&sender.task));
}
- const HTTPSender = struct {
+ var http_sender_head: std.atomic.Atomic(?*HTTPSender) = std.atomic.Atomic(?*HTTPSender).init(null);
+
+ pub const HTTPSender = struct {
task: ThreadPool.Task = .{ .callback = callback },
frame: @Frame(AsyncHTTP.do) = undefined,
http: *AsyncHTTP = undefined,
next: ?*HTTPSender = null,
- var head: ?*HTTPSender = null;
-
pub fn get(http: *AsyncHTTP, allocator: *std.mem.Allocator) *HTTPSender {
- if (head == null) {
- head = allocator.create(HTTPSender) catch unreachable;
- head.?.* = HTTPSender{};
+ @fence(.Acquire);
+
+ var head_ = http_sender_head.load(.Monotonic);
+
+ if (head_ == null) {
+ var new_head = allocator.create(HTTPSender) catch unreachable;
+ new_head.* = HTTPSender{};
+ new_head.next = null;
+ new_head.task = .{ .callback = callback };
+ new_head.http = http;
+ return new_head;
}
- var head_ = head.?;
- head = head.?.next;
- head_.next = null;
- head_.task = .{ .callback = callback };
- head_.http = http;
+ http_sender_head.store(head_.?.next, .Monotonic);
- return head_;
+ head_.?.* = HTTPSender{};
+ head_.?.next = null;
+ head_.?.task = .{ .callback = callback };
+ head_.?.http = http;
+
+ return head_.?;
}
- pub fn release(this: *HTTPSender) void {}
+ pub fn release(this: *HTTPSender) void {
+ @fence(.Acquire);
+ this.task = .{ .callback = callback };
+ this.http = undefined;
+ this.next = http_sender_head.swap(this, .Monotonic);
+ }
pub fn callback(task: *ThreadPool.Task) void {
var this = @fieldParentPtr(HTTPSender, "task", task);
this.frame = async AsyncHTTP.do(this);
}
- pub fn onFinish(this: *HTTPSender) void {}
+ pub fn onFinish(this: *HTTPSender) void {
+ this.release();
+ }
};
pub fn do(sender: *HTTPSender) void {
@@ -348,10 +365,8 @@ pub const AsyncHTTP = struct {
}
if (sender.http.callback) |callback| {
- callback(sender.http);
+ callback(sender.http, sender);
}
-
- sender.release();
}
};