aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/analytics/analytics_thread.zig5
-rw-r--r--src/cli/create_command.zig8
-rw-r--r--src/cli/upgrade_command.zig8
-rw-r--r--src/http_client_async.zig146
-rw-r--r--src/pool.zig124
5 files changed, 185 insertions, 106 deletions
diff --git a/src/analytics/analytics_thread.zig b/src/analytics/analytics_thread.zig
index e883a798f..f5caa7f10 100644
--- a/src/analytics/analytics_thread.zig
+++ b/src/analytics/analytics_thread.zig
@@ -513,7 +513,10 @@ pub const EventList = struct {
var retry_remaining: usize = 10;
const rand = random.random();
retry: while (retry_remaining > 0) {
- const response = this.async_http.sendSync() catch |err| {
+ this.async_http.max_retry_count = 0;
+ this.async_http.retries_count = 0;
+
+ const response = this.async_http.sendSync(true) catch |err| {
if (FeatureFlags.verbose_analytics) {
Output.prettyErrorln("[Analytics] failed due to error {s} ({d} retries remain)", .{ @errorName(err), retry_remaining });
}
diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig
index b653aa4a7..7e3ea96eb 100644
--- a/src/cli/create_command.zig
+++ b/src/cli/create_command.zig
@@ -1847,7 +1847,7 @@ pub const Example = struct {
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, api_url, header_entries, headers_buf, mutable, &request_body, 60 * std.time.ns_per_min);
async_http.client.progress_node = progress;
- const response = try async_http.sendSync();
+ const response = try async_http.sendSync(true);
switch (response.status_code) {
404 => return error.GitHubRepositoryNotFound,
@@ -1913,7 +1913,7 @@ pub const Example = struct {
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
async_http.client.progress_node = progress;
- var response = try async_http.sendSync();
+ var response = try async_http.sendSync(true);
switch (response.status_code) {
404 => return error.ExampleNotFound,
@@ -1992,7 +1992,7 @@ pub const Example = struct {
refresher.maybeRefresh();
- response = try async_http.sendSync();
+ response = try async_http.sendSync(true);
refresher.maybeRefresh();
@@ -2023,7 +2023,7 @@ pub const Example = struct {
async_http.client.progress_node = progress_node;
}
- const response = async_http.sendSync() catch |err| {
+ const response = async_http.sendSync(true) catch |err| {
switch (err) {
error.WouldBlock => {
Output.prettyErrorln("Request timed out while trying to fetch examples list. Please try again", .{});
diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig
index b3445e278..3f6a4840d 100644
--- a/src/cli/upgrade_command.zig
+++ b/src/cli/upgrade_command.zig
@@ -95,7 +95,9 @@ pub const UpgradeCheckerThread = struct {
js_ast.Expr.Data.Store.deinit();
js_ast.Stmt.Data.Store.deinit();
}
- const version = (try UpgradeCommand.getLatestVersion(default_allocator, env_loader, undefined, undefined, true)) orelse return;
+ var arena = std.heap.ArenaAllocator.init(default_allocator);
+ defer arena.deinit();
+ const version = (try UpgradeCommand.getLatestVersion(arena.allocator(), env_loader, undefined, undefined, true)) orelse return;
if (!version.isCurrent()) {
if (version.name()) |name| {
@@ -182,7 +184,7 @@ pub const UpgradeCommand = struct {
var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = try HTTP.AsyncHTTP.init(allocator, .GET, api_url, header_entries, headers_buf, &metadata_body, &request_body, 60 * std.time.ns_per_min);
if (!silent) async_http.client.progress_node = progress;
- const response = try async_http.sendSync();
+ const response = try async_http.sendSync(true);
switch (response.status_code) {
404 => return error.HTTP404,
@@ -407,7 +409,7 @@ pub const UpgradeCommand = struct {
);
async_http.client.timeout = timeout;
async_http.client.progress_node = progress;
- const response = try async_http.sendSync();
+ const response = try async_http.sendSync(true);
switch (response.status_code) {
404 => return error.HTTP404,
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index df5677ae7..0568591d6 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -78,12 +78,13 @@ tcp_client: tcp.Client = undefined,
body_size: u32 = 0,
read_count: u32 = 0,
remaining_redirect_count: i8 = 127,
-redirect: ?*URLBufferPool = null,
+redirect: ?*URLBufferPool.Node = null,
disable_shutdown: bool = true,
timeout: usize = 0,
progress_node: ?*std.Progress.Node = null,
socket: AsyncSocket.SSL = undefined,
gzip_elapsed: u64 = 0,
+stage: Stage = Stage.pending,
/// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since.
/// This is a workaround for that.
@@ -117,6 +118,14 @@ pub fn deinit(this: *HTTPClient) !void {
}
}
+const Stage = enum(u8) {
+ pending,
+ connect,
+ request,
+ response,
+ done,
+};
+
// threadlocal var resolver_cache
const tcp = std.x.net.tcp;
const ip = std.x.net.ip;
@@ -329,17 +338,19 @@ pub const AsyncHTTP = struct {
sender.release();
}
- pub fn sendSync(this: *AsyncHTTP) anyerror!picohttp.Response {
+ pub fn sendSync(this: *AsyncHTTP, comptime auto_release: bool) anyerror!picohttp.Response {
this.callback_ctx = SingleHTTPChannelPool.get(default_allocator);
defer {
- var pooled_node = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?));
- SingleHTTPChannelPool.release(pooled_node);
- this.callback_ctx = null;
+ if (comptime auto_release) {
+ var pooled_node = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?));
+ SingleHTTPChannelPool.release(pooled_node);
+ this.callback_ctx = null;
+ }
}
this.callback = sendSyncCallback;
var batch = NetworkThread.Batch{};
- this.schedule(this.allocator, &batch);
+ this.schedule(default_allocator, &batch);
NetworkThread.global.pool.schedule(batch);
while (true) {
var pooled = @ptrCast(*SingleHTTPChannelPool.Node, @alignCast(@alignOf(*SingleHTTPChannelPool.Node), this.callback_ctx.?));
@@ -442,76 +453,16 @@ pub const AsyncHTTP = struct {
}
};
-const BufferPool = struct {
- pub const len = std.math.maxInt(u16) - 64;
- buf: [len]u8 = undefined,
- next: ?*BufferPool = null,
- allocator: std.mem.Allocator = undefined,
-
- var head: ?*BufferPool = null;
-
- pub fn get(allocator: std.mem.Allocator) !*BufferPool {
- if (head) |item| {
- var this = item;
- var head_ = item.next;
- head = head_;
- this.next = null;
-
- return this;
- }
-
- var entry = try allocator.create(BufferPool);
- entry.* = BufferPool{ .allocator = allocator };
- return entry;
- }
-
- pub fn release(this: *BufferPool) void {
- if (head) |item| {
- item.next = this;
- } else {
- head = this;
- }
- }
-};
-
-const URLBufferPool = struct {
- pub const len = 4096;
- buf: [len]u8 = undefined,
- next: ?*URLBufferPool = null,
- allocator: std.mem.Allocator = undefined,
-
- var head: ?*URLBufferPool = null;
-
- pub fn get(allocator: std.mem.Allocator) !*URLBufferPool {
- if (head) |item| {
- var this = item;
- var head_ = item.next;
- head = head_;
- this.next = null;
-
- return this;
- }
-
- var entry = try allocator.create(URLBufferPool);
- entry.* = URLBufferPool{ .allocator = allocator };
- return entry;
- }
-
- pub fn release(this: *URLBufferPool) void {
- if (head) |item| {
- item.next = this;
- } else {
- head = this;
- }
- }
-};
+const buffer_pool_len = std.math.maxInt(u16) - 64;
+const BufferPool = ObjectPool([buffer_pool_len]u8, null, false);
+const URLBufferPool = ObjectPool([4096]u8, null, false);
pub const AsyncMessage = struct {
used: u32 = 0,
sent: u32 = 0,
completion: AsyncIO.Completion = undefined,
buf: []u8 = undefined,
- pooled: ?*BufferPool = null,
+ pooled: ?*BufferPool.Node = null,
allocator: std.mem.Allocator,
next: ?*AsyncMessage = null,
context: *anyopaque = undefined,
@@ -520,6 +471,7 @@ pub const AsyncMessage = struct {
pub fn getSSL(allocator: std.mem.Allocator) *AsyncMessage {
if (_first_ssl) |first| {
var prev = first;
+
std.debug.assert(prev.released);
if (prev.next) |next| {
_first_ssl = next;
@@ -560,8 +512,8 @@ pub const AsyncMessage = struct {
}
var msg = allocator.create(AsyncMessage) catch unreachable;
- var pooled = BufferPool.get(allocator) catch unreachable;
- msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.buf, .pooled = pooled };
+ var pooled = BufferPool.get(allocator);
+ msg.* = AsyncMessage{ .allocator = allocator, .buf = &pooled.data, .pooled = pooled };
return msg;
}
@@ -689,7 +641,7 @@ const AsyncSocket = struct {
// on macOS, getaddrinfo() is very slow
// If you send ~200 network requests, about 1.5s is spent on getaddrinfo()
// So, we cache this.
- var address_list = NetworkThread.getAddressList(this.allocator, name, port) catch |err| {
+ var address_list = NetworkThread.getAddressList(default_allocator, name, port) catch |err| {
return @errSetCast(ConnectError, err);
};
@@ -776,7 +728,7 @@ const AsyncSocket = struct {
this.queued += resp.written;
if (resp.overflow) {
- var next = AsyncMessage.get(this.allocator);
+ var next = AsyncMessage.get(default_allocator);
this.tail.next = next;
this.tail = next;
@@ -886,12 +838,15 @@ const AsyncSocket = struct {
send_frame: @Frame(SSL.send) = undefined,
read_frame: @Frame(SSL.read) = undefined,
hostname: [std.fs.MAX_PATH_BYTES]u8 = undefined,
+ is_ssl: bool = false,
const SSLConnectError = ConnectError || HandshakeError;
const HandshakeError = error{OpenSSLError};
pub fn connect(this: *SSL, name: []const u8, port: u16) !void {
+ this.is_ssl = true;
try this.socket.connect(name, port);
+
this.handshake_complete = false;
var ssl = boring.initClient();
@@ -911,7 +866,6 @@ const AsyncSocket = struct {
this.ssl = ssl;
this.read_bio = AsyncMessage.get(this.socket.allocator);
-
try this.handshake();
}
@@ -1076,6 +1030,9 @@ const AsyncSocket = struct {
}
pub fn deinit(this: *SSL) void {
+ this.socket.deinit();
+ if (!this.is_ssl) return;
+
_ = boring.BIO_set_data(this.ssl_bio.bio, null);
this.ssl_bio.pending_frame = AsyncBIO.PendingFrame.init();
this.ssl_bio.socket_fd = 0;
@@ -1310,7 +1267,7 @@ pub const AsyncBIO = struct {
switch (this.send_wait) {
.pending => {
var write_buffer = this.write_buffer orelse brk: {
- this.write_buffer = AsyncMessage.get(this.allocator);
+ this.write_buffer = AsyncMessage.get(default_allocator);
break :brk this.write_buffer.?;
};
@@ -1488,7 +1445,7 @@ pub fn connect(
try connector.connect(this.url.hostname, port);
var client = std.x.net.tcp.Client{ .socket = std.x.os.Socket.from(this.socket.socket.socket) };
- client.setReadBufferSize(BufferPool.len) catch {};
+ client.setReadBufferSize(buffer_pool_len) catch {};
// client.setQuickACK(true) catch {};
this.tcp_client = client;
@@ -1503,8 +1460,12 @@ pub fn sendAsync(this: *HTTPClient, body: []const u8, body_out_str: *MutableStri
}
pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response {
+ defer if (@enumToInt(this.stage) > @enumToInt(Stage.pending)) this.socket.deinit();
// this prevents stack overflow
redirect: while (this.remaining_redirect_count >= -1) {
+ if (@enumToInt(this.stage) > @enumToInt(Stage.pending)) this.socket.deinit();
+
+ this.stage = Stage.pending;
body_out_str.reset();
if (this.url.isHTTPS()) {
@@ -1512,7 +1473,6 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !
switch (err) {
error.Redirect => {
this.remaining_redirect_count -= 1;
- this.socket.deinit();
continue :redirect;
},
@@ -1524,7 +1484,6 @@ pub fn send(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !
switch (err) {
error.Redirect => {
this.remaining_redirect_count -= 1;
- this.socket.socket.deinit();
continue :redirect;
},
@@ -1541,11 +1500,12 @@ const Task = ThreadPool.Task;
pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) !picohttp.Response {
this.socket = AsyncSocket.SSL{
- .socket = try AsyncSocket.init(&AsyncIO.global, 0, this.allocator),
+ .socket = try AsyncSocket.init(&AsyncIO.global, 0, default_allocator),
};
+ this.stage = Stage.connect;
var socket = &this.socket.socket;
try this.connect(*AsyncSocket, socket);
-
+ this.stage = Stage.request;
defer this.socket.close();
var request = buildRequest(this, body.len);
if (this.verbose) {
@@ -1554,7 +1514,7 @@ pub fn sendHTTP(this: *HTTPClient, body: []const u8, body_out_str: *MutableStrin
try writeRequest(@TypeOf(socket), socket, request, body);
_ = try socket.send();
-
+ this.stage = Stage.response;
if (this.progress_node == null) {
return this.processResponse(
false,
@@ -1591,8 +1551,8 @@ const ZlibPool = struct {
pub fn get(this: *ZlibPool) !*MutableString {
switch (this.items.items.len) {
0 => {
- var mutable = try this.allocator.create(MutableString);
- mutable.* = try MutableString.init(this.allocator, 0);
+ var mutable = try default_allocator.create(MutableString);
+ mutable.* = try MutableString.init(default_allocator, 0);
return mutable;
},
else => {
@@ -1704,7 +1664,7 @@ const ZlibPool = struct {
pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, comptime Client: type, client: Client, body_out_str: *MutableString) !picohttp.Response {
defer if (this.verbose) Output.flush();
var response: picohttp.Response = undefined;
- var request_message = AsyncMessage.get(this.allocator);
+ var request_message = AsyncMessage.get(default_allocator);
defer request_message.release();
var request_buffer: []u8 = request_message.buf;
var read_length: usize = 0;
@@ -1805,21 +1765,21 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti
switch (response.status_code) {
302, 301, 307, 308, 303 => {
if (strings.indexOf(location, "://")) |i| {
- var url_buf = this.redirect orelse try URLBufferPool.get(this.allocator);
+ var url_buf = this.redirect orelse URLBufferPool.get(default_allocator);
const protocol_name = location[0..i];
if (strings.eqlComptime(protocol_name, "http") or strings.eqlComptime(protocol_name, "https")) {} else {
return error.UnsupportedRedirectProtocol;
}
- std.mem.copy(u8, &url_buf.buf, location);
- this.url = URL.parse(url_buf.buf[0..location.len]);
+ std.mem.copy(u8, &url_buf.data, location);
+ this.url = URL.parse(url_buf.data[0..location.len]);
this.redirect = url_buf;
} else {
- var url_buf = try URLBufferPool.get(this.allocator);
+ var url_buf = URLBufferPool.get(default_allocator);
const original_url = this.url;
this.url = URL.parse(std.fmt.bufPrint(
- &url_buf.buf,
+ &url_buf.data,
"{s}://{s}{s}",
.{ original_url.displayProtocol(), original_url.displayHostname(), location },
) catch return error.RedirectURLTooLong);
@@ -2028,9 +1988,11 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti
}
pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *MutableString) !picohttp.Response {
- this.socket = try AsyncSocket.SSL.init(this.allocator, &AsyncIO.global);
+ this.socket = try AsyncSocket.SSL.init(default_allocator, &AsyncIO.global);
var socket = &this.socket;
+ this.stage = Stage.connect;
try this.connect(*AsyncSocket.SSL, socket);
+ this.stage = Stage.request;
defer this.socket.close();
var request = buildRequest(this, body_str.len);
@@ -2041,6 +2003,8 @@ pub fn sendHTTPS(this: *HTTPClient, body_str: []const u8, body_out_str: *Mutable
try writeRequest(@TypeOf(socket), socket, request, body_str);
_ = try socket.send();
+ this.stage = Stage.response;
+
if (this.progress_node == null) {
return this.processResponse(
false,
diff --git a/src/pool.zig b/src/pool.zig
index 4f3396d30..36a3d9d0a 100644
--- a/src/pool.zig
+++ b/src/pool.zig
@@ -1,8 +1,116 @@
const std = @import("std");
-pub fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: std.mem.Allocator) anyerror!Type), comptime threadsafe: bool) type {
+fn SinglyLinkedList(comptime T: type, comptime Parent: type) type {
return struct {
- const LinkedList = std.SinglyLinkedList(Type);
+ const Self = @This();
+
+ /// Node inside the linked list wrapping the actual data.
+ pub const Node = struct {
+ next: ?*Node = null,
+ data: T,
+
+ pub const Data = T;
+
+ /// Insert a new node after the current one.
+ ///
+ /// Arguments:
+ /// new_node: Pointer to the new node to insert.
+ pub fn insertAfter(node: *Node, new_node: *Node) void {
+ new_node.next = node.next;
+ node.next = new_node;
+ }
+
+ /// Remove a node from the list.
+ ///
+ /// Arguments:
+ /// node: Pointer to the node to be removed.
+ /// Returns:
+ /// node removed
+ pub fn removeNext(node: *Node) ?*Node {
+ const next_node = node.next orelse return null;
+ node.next = next_node.next;
+ return next_node;
+ }
+
+ /// Iterate over the singly-linked list from this node, until the final node is found.
+ /// This operation is O(N).
+ pub fn findLast(node: *Node) *Node {
+ var it = node;
+ while (true) {
+ it = it.next orelse return it;
+ }
+ }
+
+ /// Iterate over each next node, returning the count of all nodes except the starting one.
+ /// This operation is O(N).
+ pub fn countChildren(node: *const Node) usize {
+ var count: usize = 0;
+ var it: ?*const Node = node.next;
+ while (it) |n| : (it = n.next) {
+ count += 1;
+ }
+ return count;
+ }
+
+ pub inline fn release(node: *Node) void {
+ Parent.release(node);
+ }
+ };
+
+ first: ?*Node = null,
+
+ /// Insert a new node at the head.
+ ///
+ /// Arguments:
+ /// new_node: Pointer to the new node to insert.
+ pub fn prepend(list: *Self, new_node: *Node) void {
+ new_node.next = list.first;
+ list.first = new_node;
+ }
+
+ /// Remove a node from the list.
+ ///
+ /// Arguments:
+ /// node: Pointer to the node to be removed.
+ pub fn remove(list: *Self, node: *Node) void {
+ if (list.first == node) {
+ list.first = node.next;
+ } else {
+ var current_elm = list.first.?;
+ while (current_elm.next != node) {
+ current_elm = current_elm.next.?;
+ }
+ current_elm.next = node.next;
+ }
+ }
+
+ /// Remove and return the first node in the list.
+ ///
+ /// Returns:
+ /// A pointer to the first node in the list.
+ pub fn popFirst(list: *Self) ?*Node {
+ const first = list.first orelse return null;
+ list.first = first.next;
+ return first;
+ }
+
+ /// Iterate over all nodes, returning the count.
+ /// This operation is O(N).
+ pub fn len(list: Self) usize {
+ if (list.first) |n| {
+ return 1 + n.countChildren();
+ } else {
+ return 0;
+ }
+ }
+ };
+}
+
+pub fn ObjectPool(comptime Type: type, comptime Init: (?fn (allocator: std.mem.Allocator) anyerror!Type), comptime threadsafe: bool) type {
+ return struct {
+ const Pool = @This();
+ const LinkedList = SinglyLinkedList(Type, Pool);
+ pub const Node = LinkedList.Node;
const Data = if (threadsafe)
struct {
pub threadlocal var list: LinkedList = undefined;
@@ -15,21 +123,23 @@ pub fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: std.mem.Al
};
const data = Data;
- pub const Node = LinkedList.Node;
pub fn get(allocator: std.mem.Allocator) *LinkedList.Node {
if (data.loaded) {
if (data.list.popFirst()) |node| {
- if (comptime @hasDecl(Type, "reset")) node.data.reset();
+ if (comptime std.meta.trait.isContainer(Type) and @hasDecl(Type, "reset")) node.data.reset();
return node;
}
}
var new_node = allocator.create(LinkedList.Node) catch unreachable;
new_node.* = LinkedList.Node{
- .data = Init(
- allocator,
- ) catch unreachable,
+ .data = if (comptime Init) |init_|
+ (init_(
+ allocator,
+ ) catch unreachable)
+ else
+ undefined,
};
return new_node;