diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/http/async_message.zig | 113 | ||||
-rw-r--r-- | src/http_client_async.zig | 45 |
2 files changed, 22 insertions, 136 deletions
diff --git a/src/http/async_message.zig b/src/http/async_message.zig deleted file mode 100644 index 45c581bbb..000000000 --- a/src/http/async_message.zig +++ /dev/null @@ -1,113 +0,0 @@ -const std = @import("std"); -const ObjectPool = @import("../pool.zig").ObjectPool; -const AsyncIO = @import("io"); - -pub const buffer_pool_len = std.math.maxInt(u16); -pub const BufferPoolBytes = [buffer_pool_len]u8; -pub const BufferPool = ObjectPool(BufferPoolBytes, null, false, 4); -const Environment = @import("../env.zig"); -const AsyncMessage = @This(); - -used: u32 = 0, -sent: u32 = 0, -completion: AsyncIO.Completion = undefined, -buf: []u8 = undefined, -pooled: ?*BufferPool.Node = null, -allocator: std.mem.Allocator, -next: ?*AsyncMessage = null, -context: *anyopaque = undefined, -released: bool = false, - -var _first_ssl: ?*AsyncMessage = null; - -pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage { - if (_first_ssl) |first| { - var prev = first; - - std.debug.assert(prev.released); - if (prev.next) |next| { - _first_ssl = next; - prev.next = null; - } else { - _first_ssl = null; - } - prev.released = false; - - return prev; - } - - var msg = allocator.create(AsyncMessage) catch unreachable; - msg.* = AsyncMessage{ - .allocator = allocator, - .pooled = null, - .buf = &[_]u8{}, - }; - return msg; -} - -var _first: ?*AsyncMessage = null; -pub fn get(allocator: std.mem.Allocator) *AsyncMessage { - if (_first) |first| { - var prev = first; - if (Environment.allow_assert) std.debug.assert(prev.released); - prev.released = false; - - if (first.next) |next| { - _first = next; - prev.next = null; - return prev; - } else { - _first = null; - } - - return prev; - } - - var msg = allocator.create(AsyncMessage) catch unreachable; - var pooled = BufferPool.get(allocator); - msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled }; - return msg; -} - -pub fn release(self: *AsyncMessage) void { - self.used = 0; - self.sent = 0; - if (self.released) return; - self.released = true; - - if (self.pooled != null) { - var old = _first; - _first = self; - self.next = old; - } else { - var old = _first_ssl; - self.next = old; - _first_ssl = self; - } -} - -const WriteResponse = struct { - written: u32 = 0, - overflow: bool = false, -}; - -pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse { - var remain = this.buf[this.used..]; - var writable = buffer[0..@minimum(buffer.len, remain.len)]; - if (writable.len == 0) { - return .{ .written = 0, .overflow = buffer.len > 0 }; - } - - std.mem.copy(u8, remain, writable); - this.used += @intCast(u16, writable.len); - - return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len }; -} - -pub inline fn slice(this: *const AsyncMessage) []const u8 { - return this.buf[0..this.used][this.sent..]; -} - -pub inline fn available(this: *AsyncMessage) []u8 { - return this.buf[0 .. this.buf.len - this.used]; -} diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 103db175c..c566f29bf 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -24,7 +24,6 @@ pub const NetworkThread = @import("./network_thread.zig"); const ObjectPool = @import("./pool.zig").ObjectPool; const SOCK = os.SOCK; const Arena = @import("./mimalloc_arena.zig").Arena; -const AsyncMessage = @import("./http/async_message.zig"); const ZlibPool = @import("./http/zlib.zig"); const URLBufferPool = ObjectPool([4096]u8, null, false, 10); const uws = @import("uws"); @@ -623,7 +622,7 @@ pub const HTTPStage = enum { }; pub const InternalState = struct { - request_message: ?*AsyncMessage = null, + request_message: ?*BodyPreamblePool.Node = null, pending_response: picohttp.Response = undefined, allow_keepalive: bool = true, transfer_encoding: Encoding = Encoding.identity, @@ -1067,6 +1066,9 @@ pub const AsyncHTTP = struct { } }; +const BodyPreambleArray = std.BoundedArray(u8, 1024 * 16); +const BodyPreamblePool = ObjectPool(BodyPreambleArray, null, false, 16); + pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { var header_count: usize = 0; var header_entries = this.header_entries.slice(); @@ -1353,7 +1355,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u var amount_read: usize = 0; var needs_move = true; if (this.state.request_message) |req_msg| { - var available = req_msg.buf; + var available = req_msg.data.unusedCapacitySlice(); if (available.len == 0) { this.state.request_message.?.release(); this.state.request_message = null; @@ -1361,18 +1363,11 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u return; } - const wrote = @minimum(available.len - req_msg.used, incoming_data.len); - @memcpy( - available.ptr + req_msg.used, - incoming_data.ptr, - wrote, - ); - req_msg.used += @truncate(u32, wrote); - amount_read = 0; - req_msg.sent = 0; - needs_move = false; - to_read = available[0..req_msg.used]; - pending_buffers[1] = incoming_data[wrote..]; + const to_read_len = @minimum(available.len, to_read.len); + req_msg.data.appendSliceAssumeCapacity(to_read[0..to_read_len]); + to_read = req_msg.data.slice(); + pending_buffers[1] = incoming_data[to_read_len..]; + needs_move = pending_buffers[1].len > 0; } this.state.pending_response = picohttp.Response{}; @@ -1385,15 +1380,19 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u switch (err) { error.ShortRead => { if (needs_move) { - std.debug.assert(this.state.request_message == null); - this.state.request_message = AsyncMessage.get(default_allocator); - if (to_read.len > this.state.request_message.?.buf.len) { - this.closeAndFail(error.ResponseHeadersTooLarge, is_ssl, socket); - return; + const to_copy = incoming_data; + + if (to_copy.len > 0) { + this.state.request_message = this.state.request_message orelse brk: { + var preamble = BodyPreamblePool.get(getAllocator()); + preamble.data = .{}; + break :brk preamble; + }; + this.state.request_message.?.data.appendSlice(to_copy) catch { + this.closeAndFail(error.ResponseHeadersTooLarge, is_ssl, socket); + return; + }; } - - _ = this.state.request_message.?.writeAll(incoming_data); - this.state.request_message.?.sent = @truncate(u32, amount_read); } this.setTimeout(socket, 60); |