aboutsummaryrefslogtreecommitdiff
path: root/src/http/async_message.zig
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/http/async_message.zig112
1 files changed, 112 insertions, 0 deletions
diff --git a/src/http/async_message.zig b/src/http/async_message.zig
new file mode 100644
index 000000000..c1c11b109
--- /dev/null
+++ b/src/http/async_message.zig
@@ -0,0 +1,112 @@
+const std = @import("std");
+const ObjectPool = @import("../pool.zig").ObjectPool;
+const AsyncIO = @import("io");
+
+pub const buffer_pool_len = std.math.maxInt(u16) - 64;
+pub const BufferPool = ObjectPool([buffer_pool_len]u8, null, false);
+
+const AsyncMessage = @This();
+
+used: u32 = 0,
+sent: u32 = 0,
+completion: AsyncIO.Completion = undefined,
+buf: []u8 = undefined,
+pooled: ?*BufferPool.Node = null,
+allocator: std.mem.Allocator,
+next: ?*AsyncMessage = null,
+context: *anyopaque = undefined,
+released: bool = false,
+
+var _first_ssl: ?*AsyncMessage = null;
+
+pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage {
+ if (_first_ssl) |first| {
+ var prev = first;
+
+ std.debug.assert(prev.released);
+ if (prev.next) |next| {
+ _first_ssl = next;
+ prev.next = null;
+ } else {
+ _first_ssl = null;
+ }
+ prev.released = false;
+
+ return prev;
+ }
+
+ var msg = allocator.create(AsyncMessage) catch unreachable;
+ msg.* = AsyncMessage{
+ .allocator = allocator,
+ .pooled = null,
+ .buf = &[_]u8{},
+ };
+ return msg;
+}
+
+var _first: ?*AsyncMessage = null;
+pub fn get(allocator: std.mem.Allocator) *AsyncMessage {
+ if (_first) |first| {
+ var prev = first;
+ std.debug.assert(prev.released);
+ prev.released = false;
+
+ if (first.next) |next| {
+ _first = next;
+ prev.next = null;
+ return prev;
+ } else {
+ _first = null;
+ }
+
+ return prev;
+ }
+
+ var msg = allocator.create(AsyncMessage) catch unreachable;
+ var pooled = BufferPool.get(allocator);
+ msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled };
+ return msg;
+}
+
+pub fn release(self: *AsyncMessage) void {
+ self.used = 0;
+ self.sent = 0;
+ if (self.released) return;
+ self.released = true;
+
+ if (self.pooled != null) {
+ var old = _first;
+ _first = self;
+ self.next = old;
+ } else {
+ var old = _first_ssl;
+ self.next = old;
+ _first_ssl = self;
+ }
+}
+
+const WriteResponse = struct {
+ written: u32 = 0,
+ overflow: bool = false,
+};
+
+pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse {
+ var remain = this.buf[this.used..];
+ var writable = buffer[0..@minimum(buffer.len, remain.len)];
+ if (writable.len == 0) {
+ return .{ .written = 0, .overflow = buffer.len > 0 };
+ }
+
+ std.mem.copy(u8, remain, writable);
+ this.used += @intCast(u16, writable.len);
+
+ return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len };
+}
+
+pub inline fn slice(this: *const AsyncMessage) []const u8 {
+ return this.buf[0..this.used][this.sent..];
+}
+
+pub inline fn available(this: *AsyncMessage) []u8 {
+ return this.buf[0 .. this.buf.len - this.used];
+}