-rw-r--r--   src/bun.js/api/bun/socket.zig    4
-rw-r--r--   src/deps/uws.zig                 6
-rw-r--r--   src/global.zig                   4
-rw-r--r--   src/http_client_async.zig       98
-rw-r--r--   src/install/semver.zig           2
-rw-r--r--   src/string_mutable.zig          14
6 files changed, 107 insertions, 21 deletions
diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig
index ccf265202..2405996cf 100644
--- a/src/bun.js/api/bun/socket.zig
+++ b/src/bun.js/api/bun/socket.zig
@@ -774,6 +774,10 @@ fn NewSocket(comptime ssl: bool) type {
         poll_ref: JSC.PollRef = JSC.PollRef.init(),
         reffer: JSC.Ref = JSC.Ref.init(),
         last_4: [4]u8 = .{ 0, 0, 0, 0 },
+
+        // TODO: switch to something that uses `visitAggregate` and have the
+        // `Listener` keep a list of all the sockets JSValue in there
+        // This is wasteful because it means we are keeping a JSC::Weak for every single open socket
         has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
 
         const This = @This();
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index 1551efba7..e69b096b4 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -432,6 +432,10 @@ pub const Loop = extern struct {
         low_prio_head: ?*Socket,
         low_prio_budget: i32,
         iteration_nr: c_longlong,
+
+        pub fn recvSlice(this: *InternalLoopData) []u8 {
+            return this.recv_buf[0..LIBUS_RECV_BUFFER_LENGTH];
+        }
     };
 
     pub fn get() ?*Loop {
@@ -1662,7 +1666,7 @@ extern fn uws_res_upgrade(
 ) void;
 extern fn uws_res_cork(i32, res: *uws_res, ctx: *anyopaque, corker: fn (?*anyopaque) callconv(.C) void) void;
 extern fn uws_res_write_headers(i32, res: *uws_res, names: [*]const Api.StringPointer, values: [*]const Api.StringPointer, count: usize, buf: [*]const u8) void;
-pub const LIBUS_RECV_BUFFER_LENGTH = @import("std").zig.c_translation.promoteIntLiteral(i32, 524288, .decimal);
+pub const LIBUS_RECV_BUFFER_LENGTH = 524288;
 pub const LIBUS_TIMEOUT_GRANULARITY = @as(i32, 4);
 pub const LIBUS_RECV_BUFFER_PADDING = @as(i32, 32);
 pub const LIBUS_EXT_ALIGNMENT = @as(i32, 16);
diff --git a/src/global.zig b/src/global.zig
index c5d6af7a8..c75a72891 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -409,3 +409,7 @@ pub fn isHeapMemory(memory: anytype) bool {
 }
 
 pub const Mimalloc = @import("./allocators/mimalloc.zig");
+
+pub fn isSliceInBuffer(slice: []const u8, buffer: []const u8) bool {
+    return slice.len > 0 and @ptrToInt(buffer.ptr) <= @ptrToInt(slice.ptr) and ((@ptrToInt(slice.ptr) + slice.len) <= (@ptrToInt(buffer.ptr) + buffer.len));
+}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 5f49641cb..4d46f41b9 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -49,6 +49,8 @@ var shared_request_headers_buf: [256]picohttp.Header = undefined;
 // this doesn't need to be stack memory because it is immediately cloned after use
 var shared_response_headers_buf: [256]picohttp.Header = undefined;
 
+const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
+
 fn NewHTTPContext(comptime ssl: bool) type {
     return struct {
         const pool_size = 64;
@@ -191,7 +193,7 @@ fn NewHTTPContext(comptime ssl: bool) type {
                     );
                 } else {
                     // trailing zero is fine to ignore
-                    if (strings.eqlComptime(buf, "0\r\n")) {
+                    if (strings.eqlComptime(buf, end_of_chunked_http1_1_encoding_response_body)) {
                         return;
                     }
 
@@ -541,8 +543,9 @@ pub fn onClose(
         return;
     }
 
-    if (in_progress)
+    if (in_progress) {
         client.fail(error.ConnectionClosed);
+    }
 }
 pub fn onTimeout(
     client: *HTTPClient,
@@ -1376,7 +1379,8 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
             var amount_read: usize = 0;
             var needs_move = true;
             if (this.state.response_message_buffer.list.items.len > 0) {
-                this.state.response_message_buffer.append(incoming_data) catch @panic("Out of memory");
+                // this one probably won't be another chunk, so we use appendSliceExact() to avoid over-allocating
+                this.state.response_message_buffer.appendSliceExact(incoming_data) catch @panic("Out of memory");
                 to_read = this.state.response_message_buffer.list.items;
                 needs_move = false;
             }
@@ -1394,6 +1398,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
 
             const to_copy = incoming_data;
             if (to_copy.len > 0) {
+                // this one will probably be another chunk, so we leave a little extra room
                 this.state.response_message_buffer.append(to_copy) catch @panic("Out of memory");
             }
         }
@@ -1679,16 +1684,18 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes
 
 const preallocate_max = 1024 * 1024 * 256;
 
 pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
+    std.debug.assert(this.state.transfer_encoding == .identity);
     // is it exactly as much as we need?
     if (is_only_buffer and incoming_data.len >= this.state.body_size) {
-        return handleResponseBodyFromSinglePacket(this, incoming_data[0..this.state.body_size]);
+        try handleResponseBodyFromSinglePacket(this, incoming_data[0..this.state.body_size]);
+        return true;
     } else {
         return handleResponseBodyFromMultiplePackets(this, incoming_data);
     }
 }
 
-fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !bool {
+fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !void {
     if (this.state.encoding.isCompressed()) {
         var body_buffer = this.state.body_out_str.?;
         if (body_buffer.list.capacity == 0) {
@@ -1718,8 +1725,6 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
     }
 
     this.state.postProcessBody(this.state.getBodyBuffer());
-
-    return true;
 }
 
 fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
@@ -1762,6 +1767,17 @@ pub fn handleResponseBodyChunkedEncoding(
     this: *HTTPClient,
     incoming_data: []const u8,
 ) !bool {
+    if (incoming_data.len <= single_packet_small_buffer.len and this.state.getBodyBuffer().list.items.len == 0) {
+        return try this.handleResponseBodyChunkedEncodingFromSinglePacket(incoming_data);
+    } else {
+        return try this.handleResponseBodyChunkedEncodingFromMultiplePackets(incoming_data);
+    }
+}
+
+fn handleResponseBodyChunkedEncodingFromMultiplePackets(
+    this: *HTTPClient,
+    incoming_data: []const u8,
+) !bool {
     var decoder = &this.state.chunked_decoder;
     var buffer_ = this.state.getBodyBuffer();
     var buffer = buffer_.*;
@@ -1815,6 +1831,74 @@ pub fn handleResponseBodyChunkedEncoding(
 
     unreachable;
 }
+// the first packet for Transfer-Encoding: chunk
+// is usually pretty small or sometimes even just a length
+// so we can avoid allocating a temporary buffer to copy the data in
+var single_packet_small_buffer: [16 * 1024]u8 = undefined;
+fn handleResponseBodyChunkedEncodingFromSinglePacket(
+    this: *HTTPClient,
+    incoming_data: []const u8,
+) !bool {
+    var decoder = &this.state.chunked_decoder;
+    std.debug.assert(incoming_data.len <= single_packet_small_buffer.len);
+
+    // set consume_trailer to 1 to discard the trailing header
+    // using content-encoding per chunk is not supported
+    decoder.consume_trailer = 1;
+
+    var buffer: []u8 = undefined;
+
+    if (
+        // if we've already copied the buffer once, we can avoid copying it again.
+        this.state.response_message_buffer.owns(incoming_data)) {
+        buffer = bun.constStrToU8(incoming_data);
+    } else {
+        buffer = single_packet_small_buffer[0..incoming_data.len];
+        @memcpy(buffer.ptr, incoming_data.ptr, incoming_data.len);
+    }
+
+    var bytes_decoded = incoming_data.len;
+    // phr_decode_chunked mutates in-place
+    const pret = picohttp.phr_decode_chunked(
+        decoder,
+        buffer.ptr + (buffer.len -| incoming_data.len),
+        &bytes_decoded,
+    );
+    buffer.len -|= incoming_data.len - bytes_decoded;
+
+    switch (pret) {
+        // Invalid HTTP response body
+        -1 => {
+            return error.InvalidHTTPResponse;
+        },
+        // Needs more data
+        -2 => {
+            if (this.progress_node) |progress| {
+                progress.activate();
+                progress.setCompletedItems(buffer.len);
+                progress.context.maybeRefresh();
+            }
+            try this.state.getBodyBuffer().appendSliceExact(buffer);
+
+            return false;
+        },
+        // Done
+        else => {
+            try this.handleResponseBodyFromSinglePacket(buffer);
+            std.debug.assert(this.state.body_out_str.?.list.items.ptr != buffer.ptr);
+            if (this.progress_node) |progress| {
+                progress.activate();
+                progress.setCompletedItems(buffer.len);
+                progress.context.maybeRefresh();
+            }
+
+            return true;
+        },
+    }
+
+    unreachable;
+}
+
 pub fn handleResponseMetadata(
     this: *HTTPClient,
     response: picohttp.Response,
diff --git a/src/install/semver.zig b/src/install/semver.zig
index 813ee1a8e..37af5c7e2 100644
--- a/src/install/semver.zig
+++ b/src/install/semver.zig
@@ -211,7 +211,7 @@ pub const String = extern struct {
         buf: string,
         in: string,
     ) Pointer {
-        std.debug.assert(@ptrToInt(buf.ptr) <= @ptrToInt(in.ptr) and ((@ptrToInt(in.ptr) + in.len) <= (@ptrToInt(buf.ptr) + buf.len)));
+        std.debug.assert(bun.isSliceInBuffer(in, buf));
 
         return Pointer{
             .off = @truncate(u32, @ptrToInt(in.ptr) - @ptrToInt(buf.ptr)),
diff --git a/src/string_mutable.zig b/src/string_mutable.zig
index 9d89ea9db..a9f528cc0 100644
--- a/src/string_mutable.zig
+++ b/src/string_mutable.zig
@@ -30,18 +30,8 @@ pub const MutableString = struct {
         }
     }
 
-    pub fn owns(this: *const MutableString, buffer: []const u8) bool {
-        if (this.list.capacity < buffer.len) {
-            return false;
-        }
-
-        if (@ptrToInt(this.list.items.ptr) <= @ptrToInt(buffer.ptr) and
-            @ptrToInt(buffer.ptr) + buffer.len <= @ptrToInt(this.list.items.ptr) + this.list.capacity)
-        {
-            return true;
-        }
-
-        return false;
+    pub fn owns(this: *const MutableString, slice: []const u8) bool {
+        return @import("./global.zig").isSliceInBuffer(slice, this.list.items.ptr[0..this.list.capacity]);
     }
 
     pub fn growIfNeeded(self: *MutableString, amount: usize) !void {
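For reference, below is a minimal standalone sketch of the pointer-range check this commit centralizes as isSliceInBuffer in src/global.zig and then reuses from String.Pointer in src/install/semver.zig and from MutableString.owns. The function body mirrors the diff; the test block, its names, and its buffer sizes are illustrative additions rather than part of the commit, and it assumes the same pre-0.11 Zig builtins (@ptrToInt) that the rest of the diff uses.

const std = @import("std");

// A slice is "in" a buffer when it is non-empty and its byte range lies
// entirely inside the buffer's byte range.
pub fn isSliceInBuffer(slice: []const u8, buffer: []const u8) bool {
    return slice.len > 0 and
        @ptrToInt(buffer.ptr) <= @ptrToInt(slice.ptr) and
        (@ptrToInt(slice.ptr) + slice.len) <= (@ptrToInt(buffer.ptr) + buffer.len);
}

test "sub-slices are in the buffer; empty and foreign slices are not" {
    var buffer: [32]u8 = undefined;
    var other: [8]u8 = undefined;

    try std.testing.expect(isSliceInBuffer(buffer[4..12], &buffer));
    try std.testing.expect(isSliceInBuffer(buffer[0..], &buffer));
    // a zero-length slice is never considered owned, even if its pointer is in range
    try std.testing.expect(!isSliceInBuffer(buffer[4..4], &buffer));
    // a slice from a different allocation is not owned
    try std.testing.expect(!isSliceInBuffer(other[0..], &buffer));
}

Two details of the refactor worth noting: the new MutableString.owns still checks against the full allocated capacity (this.list.items.ptr[0..this.list.capacity]), matching the old capacity-based bounds, but unlike the old implementation it now rejects zero-length slices, since isSliceInBuffer requires slice.len > 0.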
