aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/http/async_message.zig113
-rw-r--r--src/http_client_async.zig45
2 files changed, 22 insertions, 136 deletions
diff --git a/src/http/async_message.zig b/src/http/async_message.zig
deleted file mode 100644
index 45c581bbb..000000000
--- a/src/http/async_message.zig
+++ /dev/null
@@ -1,113 +0,0 @@
-const std = @import("std");
-const ObjectPool = @import("../pool.zig").ObjectPool;
-const AsyncIO = @import("io");
-
-pub const buffer_pool_len = std.math.maxInt(u16);
-pub const BufferPoolBytes = [buffer_pool_len]u8;
-pub const BufferPool = ObjectPool(BufferPoolBytes, null, false, 4);
-const Environment = @import("../env.zig");
-const AsyncMessage = @This();
-
-used: u32 = 0,
-sent: u32 = 0,
-completion: AsyncIO.Completion = undefined,
-buf: []u8 = undefined,
-pooled: ?*BufferPool.Node = null,
-allocator: std.mem.Allocator,
-next: ?*AsyncMessage = null,
-context: *anyopaque = undefined,
-released: bool = false,
-
-var _first_ssl: ?*AsyncMessage = null;
-
-pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage {
- if (_first_ssl) |first| {
- var prev = first;
-
- std.debug.assert(prev.released);
- if (prev.next) |next| {
- _first_ssl = next;
- prev.next = null;
- } else {
- _first_ssl = null;
- }
- prev.released = false;
-
- return prev;
- }
-
- var msg = allocator.create(AsyncMessage) catch unreachable;
- msg.* = AsyncMessage{
- .allocator = allocator,
- .pooled = null,
- .buf = &[_]u8{},
- };
- return msg;
-}
-
-var _first: ?*AsyncMessage = null;
-pub fn get(allocator: std.mem.Allocator) *AsyncMessage {
- if (_first) |first| {
- var prev = first;
- if (Environment.allow_assert) std.debug.assert(prev.released);
- prev.released = false;
-
- if (first.next) |next| {
- _first = next;
- prev.next = null;
- return prev;
- } else {
- _first = null;
- }
-
- return prev;
- }
-
- var msg = allocator.create(AsyncMessage) catch unreachable;
- var pooled = BufferPool.get(allocator);
- msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled };
- return msg;
-}
-
-pub fn release(self: *AsyncMessage) void {
- self.used = 0;
- self.sent = 0;
- if (self.released) return;
- self.released = true;
-
- if (self.pooled != null) {
- var old = _first;
- _first = self;
- self.next = old;
- } else {
- var old = _first_ssl;
- self.next = old;
- _first_ssl = self;
- }
-}
-
-const WriteResponse = struct {
- written: u32 = 0,
- overflow: bool = false,
-};
-
-pub fn writeAll(this: *AsyncMessage, buffer: []const u8) WriteResponse {
- var remain = this.buf[this.used..];
- var writable = buffer[0..@minimum(buffer.len, remain.len)];
- if (writable.len == 0) {
- return .{ .written = 0, .overflow = buffer.len > 0 };
- }
-
- std.mem.copy(u8, remain, writable);
- this.used += @intCast(u16, writable.len);
-
- return .{ .written = @truncate(u32, writable.len), .overflow = writable.len == remain.len };
-}
-
-pub inline fn slice(this: *const AsyncMessage) []const u8 {
- return this.buf[0..this.used][this.sent..];
-}
-
-pub inline fn available(this: *AsyncMessage) []u8 {
- return this.buf[0 .. this.buf.len - this.used];
-}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 103db175c..c566f29bf 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -24,7 +24,6 @@ pub const NetworkThread = @import("./network_thread.zig");
const ObjectPool = @import("./pool.zig").ObjectPool;
const SOCK = os.SOCK;
const Arena = @import("./mimalloc_arena.zig").Arena;
-const AsyncMessage = @import("./http/async_message.zig");
const ZlibPool = @import("./http/zlib.zig");
const URLBufferPool = ObjectPool([4096]u8, null, false, 10);
const uws = @import("uws");
@@ -623,7 +622,7 @@ pub const HTTPStage = enum {
};
pub const InternalState = struct {
- request_message: ?*AsyncMessage = null,
+ request_message: ?*BodyPreamblePool.Node = null,
pending_response: picohttp.Response = undefined,
allow_keepalive: bool = true,
transfer_encoding: Encoding = Encoding.identity,
@@ -1067,6 +1066,9 @@ pub const AsyncHTTP = struct {
}
};
+const BodyPreambleArray = std.BoundedArray(u8, 1024 * 16);
+const BodyPreamblePool = ObjectPool(BodyPreambleArray, null, false, 16);
+
pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
var header_count: usize = 0;
var header_entries = this.header_entries.slice();
@@ -1353,7 +1355,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
var amount_read: usize = 0;
var needs_move = true;
if (this.state.request_message) |req_msg| {
- var available = req_msg.buf;
+ var available = req_msg.data.unusedCapacitySlice();
if (available.len == 0) {
this.state.request_message.?.release();
this.state.request_message = null;
@@ -1361,18 +1363,11 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
return;
}
- const wrote = @minimum(available.len - req_msg.used, incoming_data.len);
- @memcpy(
- available.ptr + req_msg.used,
- incoming_data.ptr,
- wrote,
- );
- req_msg.used += @truncate(u32, wrote);
- amount_read = 0;
- req_msg.sent = 0;
- needs_move = false;
- to_read = available[0..req_msg.used];
- pending_buffers[1] = incoming_data[wrote..];
+ const to_read_len = @minimum(available.len, to_read.len);
+ req_msg.data.appendSliceAssumeCapacity(to_read[0..to_read_len]);
+ to_read = req_msg.data.slice();
+ pending_buffers[1] = incoming_data[to_read_len..];
+ needs_move = pending_buffers[1].len > 0;
}
this.state.pending_response = picohttp.Response{};
@@ -1385,15 +1380,19 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
switch (err) {
error.ShortRead => {
if (needs_move) {
- std.debug.assert(this.state.request_message == null);
- this.state.request_message = AsyncMessage.get(default_allocator);
- if (to_read.len > this.state.request_message.?.buf.len) {
- this.closeAndFail(error.ResponseHeadersTooLarge, is_ssl, socket);
- return;
+ const to_copy = incoming_data;
+
+ if (to_copy.len > 0) {
+ this.state.request_message = this.state.request_message orelse brk: {
+ var preamble = BodyPreamblePool.get(getAllocator());
+ preamble.data = .{};
+ break :brk preamble;
+ };
+ this.state.request_message.?.data.appendSlice(to_copy) catch {
+ this.closeAndFail(error.ResponseHeadersTooLarge, is_ssl, socket);
+ return;
+ };
}
-
- _ = this.state.request_message.?.writeAll(incoming_data);
- this.state.request_message.?.sent = @truncate(u32, amount_read);
}
this.setTimeout(socket, 60);