aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bun.js/api/bun/socket.zig4
-rw-r--r--src/deps/uws.zig6
-rw-r--r--src/global.zig4
-rw-r--r--src/http_client_async.zig98
-rw-r--r--src/install/semver.zig2
-rw-r--r--src/string_mutable.zig14
6 files changed, 107 insertions, 21 deletions
diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig
index ccf265202..2405996cf 100644
--- a/src/bun.js/api/bun/socket.zig
+++ b/src/bun.js/api/bun/socket.zig
@@ -774,6 +774,10 @@ fn NewSocket(comptime ssl: bool) type {
poll_ref: JSC.PollRef = JSC.PollRef.init(),
reffer: JSC.Ref = JSC.Ref.init(),
last_4: [4]u8 = .{ 0, 0, 0, 0 },
+
+ // TODO: switch to something that uses `visitAggregate` and have the
+ // `Listener` keep a list of all the sockets JSValue in there
+ // This is wasteful because it means we are keeping a JSC::Weak for every single open socket
has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
const This = @This();
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index 1551efba7..e69b096b4 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -432,6 +432,10 @@ pub const Loop = extern struct {
low_prio_head: ?*Socket,
low_prio_budget: i32,
iteration_nr: c_longlong,
+
+ pub fn recvSlice(this: *InternalLoopData) []u8 {
+ return this.recv_buf[0..LIBUS_RECV_BUFFER_LENGTH];
+ }
};
pub fn get() ?*Loop {
@@ -1662,7 +1666,7 @@ extern fn uws_res_upgrade(
) void;
extern fn uws_res_cork(i32, res: *uws_res, ctx: *anyopaque, corker: fn (?*anyopaque) callconv(.C) void) void;
extern fn uws_res_write_headers(i32, res: *uws_res, names: [*]const Api.StringPointer, values: [*]const Api.StringPointer, count: usize, buf: [*]const u8) void;
-pub const LIBUS_RECV_BUFFER_LENGTH = @import("std").zig.c_translation.promoteIntLiteral(i32, 524288, .decimal);
+pub const LIBUS_RECV_BUFFER_LENGTH = 524288;
pub const LIBUS_TIMEOUT_GRANULARITY = @as(i32, 4);
pub const LIBUS_RECV_BUFFER_PADDING = @as(i32, 32);
pub const LIBUS_EXT_ALIGNMENT = @as(i32, 16);
diff --git a/src/global.zig b/src/global.zig
index c5d6af7a8..c75a72891 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -409,3 +409,7 @@ pub fn isHeapMemory(memory: anytype) bool {
}
pub const Mimalloc = @import("./allocators/mimalloc.zig");
+
+pub fn isSliceInBuffer(slice: []const u8, buffer: []const u8) bool {
+ return slice.len > 0 and @ptrToInt(buffer.ptr) <= @ptrToInt(slice.ptr) and ((@ptrToInt(slice.ptr) + slice.len) <= (@ptrToInt(buffer.ptr) + buffer.len));
+}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 5f49641cb..4d46f41b9 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -49,6 +49,8 @@ var shared_request_headers_buf: [256]picohttp.Header = undefined;
// this doesn't need to be stack memory because it is immediately cloned after use
var shared_response_headers_buf: [256]picohttp.Header = undefined;
+const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
+
fn NewHTTPContext(comptime ssl: bool) type {
return struct {
const pool_size = 64;
@@ -191,7 +193,7 @@ fn NewHTTPContext(comptime ssl: bool) type {
);
} else {
// trailing zero is fine to ignore
- if (strings.eqlComptime(buf, "0\r\n")) {
+ if (strings.eqlComptime(buf, end_of_chunked_http1_1_encoding_response_body)) {
return;
}
@@ -541,8 +543,9 @@ pub fn onClose(
return;
}
- if (in_progress)
+ if (in_progress) {
client.fail(error.ConnectionClosed);
+ }
}
pub fn onTimeout(
client: *HTTPClient,
@@ -1376,7 +1379,8 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
var amount_read: usize = 0;
var needs_move = true;
if (this.state.response_message_buffer.list.items.len > 0) {
- this.state.response_message_buffer.append(incoming_data) catch @panic("Out of memory");
+ // this one probably won't be another chunk, so we use appendSliceExact() to avoid over-allocating
+ this.state.response_message_buffer.appendSliceExact(incoming_data) catch @panic("Out of memory");
to_read = this.state.response_message_buffer.list.items;
needs_move = false;
}
@@ -1394,6 +1398,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
const to_copy = incoming_data;
if (to_copy.len > 0) {
+ // this one will probably be another chunk, so we leave a little extra room
this.state.response_message_buffer.append(to_copy) catch @panic("Out of memory");
}
}
@@ -1679,16 +1684,18 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes
const preallocate_max = 1024 * 1024 * 256;
pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
+ std.debug.assert(this.state.transfer_encoding == .identity);
// is it exactly as much as we need?
if (is_only_buffer and incoming_data.len >= this.state.body_size) {
- return handleResponseBodyFromSinglePacket(this, incoming_data[0..this.state.body_size]);
+ try handleResponseBodyFromSinglePacket(this, incoming_data[0..this.state.body_size]);
+ return true;
} else {
return handleResponseBodyFromMultiplePackets(this, incoming_data);
}
}
-fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !bool {
+fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !void {
if (this.state.encoding.isCompressed()) {
var body_buffer = this.state.body_out_str.?;
if (body_buffer.list.capacity == 0) {
@@ -1718,8 +1725,6 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
}
this.state.postProcessBody(this.state.getBodyBuffer());
-
- return true;
}
fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
@@ -1762,6 +1767,17 @@ pub fn handleResponseBodyChunkedEncoding(
this: *HTTPClient,
incoming_data: []const u8,
) !bool {
+ if (incoming_data.len <= single_packet_small_buffer.len and this.state.getBodyBuffer().list.items.len == 0) {
+ return try this.handleResponseBodyChunkedEncodingFromSinglePacket(incoming_data);
+ } else {
+ return try this.handleResponseBodyChunkedEncodingFromMultiplePackets(incoming_data);
+ }
+}
+
+fn handleResponseBodyChunkedEncodingFromMultiplePackets(
+ this: *HTTPClient,
+ incoming_data: []const u8,
+) !bool {
var decoder = &this.state.chunked_decoder;
var buffer_ = this.state.getBodyBuffer();
var buffer = buffer_.*;
@@ -1815,6 +1831,74 @@ pub fn handleResponseBodyChunkedEncoding(
unreachable;
}
+// the first packet for Transfer-Encoding: chunk
+// is usually pretty small or sometimes even just a length
+// so we can avoid allocating a temporary buffer to copy the data in
+var single_packet_small_buffer: [16 * 1024]u8 = undefined;
+fn handleResponseBodyChunkedEncodingFromSinglePacket(
+ this: *HTTPClient,
+ incoming_data: []const u8,
+) !bool {
+ var decoder = &this.state.chunked_decoder;
+ std.debug.assert(incoming_data.len <= single_packet_small_buffer.len);
+
+ // set consume_trailer to 1 to discard the trailing header
+ // using content-encoding per chunk is not supported
+ decoder.consume_trailer = 1;
+
+ var buffer: []u8 = undefined;
+
+ if (
+ // if we've already copied the buffer once, we can avoid copying it again.
+ this.state.response_message_buffer.owns(incoming_data)) {
+ buffer = bun.constStrToU8(incoming_data);
+ } else {
+ buffer = single_packet_small_buffer[0..incoming_data.len];
+ @memcpy(buffer.ptr, incoming_data.ptr, incoming_data.len);
+ }
+
+ var bytes_decoded = incoming_data.len;
+ // phr_decode_chunked mutates in-place
+ const pret = picohttp.phr_decode_chunked(
+ decoder,
+ buffer.ptr + (buffer.len -| incoming_data.len),
+ &bytes_decoded,
+ );
+ buffer.len -|= incoming_data.len - bytes_decoded;
+
+ switch (pret) {
+ // Invalid HTTP response body
+ -1 => {
+ return error.InvalidHTTPResponse;
+ },
+ // Needs more data
+ -2 => {
+ if (this.progress_node) |progress| {
+ progress.activate();
+ progress.setCompletedItems(buffer.len);
+ progress.context.maybeRefresh();
+ }
+ try this.state.getBodyBuffer().appendSliceExact(buffer);
+
+ return false;
+ },
+ // Done
+ else => {
+ try this.handleResponseBodyFromSinglePacket(buffer);
+ std.debug.assert(this.state.body_out_str.?.list.items.ptr != buffer.ptr);
+ if (this.progress_node) |progress| {
+ progress.activate();
+ progress.setCompletedItems(buffer.len);
+ progress.context.maybeRefresh();
+ }
+
+ return true;
+ },
+ }
+
+ unreachable;
+}
+
pub fn handleResponseMetadata(
this: *HTTPClient,
response: picohttp.Response,
diff --git a/src/install/semver.zig b/src/install/semver.zig
index 813ee1a8e..37af5c7e2 100644
--- a/src/install/semver.zig
+++ b/src/install/semver.zig
@@ -211,7 +211,7 @@ pub const String = extern struct {
buf: string,
in: string,
) Pointer {
- std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr) and ((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len)));
+ std.debug.assert(bun.isSliceInBuffer(in, buf));
return Pointer{
.off = @truncate(u32, @ptrToInt(in.ptr) - @ptrToInt(buf.ptr)),
diff --git a/src/string_mutable.zig b/src/string_mutable.zig
index 9d89ea9db..a9f528cc0 100644
--- a/src/string_mutable.zig
+++ b/src/string_mutable.zig
@@ -30,18 +30,8 @@ pub const MutableString = struct {
}
}
- pub fn owns(this: *const MutableString, buffer: []const u8) bool {
- if (this.list.capacity < buffer.len) {
- return false;
- }
-
- if (@ptrToInt(this.list.items.ptr) <= @ptrToInt(buffer.ptr) and
- @ptrToInt(buffer.ptr) + buffer.len <= @ptrToInt(this.list.items.ptr) + this.list.capacity)
- {
- return true;
- }
-
- return false;
+ pub fn owns(this: *const MutableString, slice: []const u8) bool {
+ return @import("./global.zig").isSliceInBuffer(slice, this.list.items.ptr[0..this.list.capacity]);
}
pub fn growIfNeeded(self: *MutableString, amount: usize) !void {