diff options
author | 2022-11-19 22:34:57 -0800 | |
---|---|---|
committer | 2022-11-19 22:34:57 -0800 | |
commit | b230e7a73a78e67533cba0d852cefdbbd787eae9 (patch) | |
tree | c720a12d0bc6af9f8b3a6f053a8eefbb2ca50e76 /src | |
parent | e024116b776efc19b92d4be613c1449213690ca1 (diff) | |
download | bun-b230e7a73a78e67533cba0d852cefdbbd787eae9.tar.gz bun-b230e7a73a78e67533cba0d852cefdbbd787eae9.tar.zst bun-b230e7a73a78e67533cba0d852cefdbbd787eae9.zip |
[fetch] Fix sporadic data corruption bug in HTTP client and add fast path
- This removes memory pooling from the HTTP client which sometimes caused invalid memory to be written to the response body.
- This adds a fast path for small HTTP/HTTPS responses that makes it a single memory allocation for the response body, instead of copying & allocating a temporary buffer
cc @Electroid
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/bindings/bindings.zig | 1 | ||||
-rw-r--r-- | src/bun.js/javascript.zig | 8 | ||||
-rw-r--r-- | src/http/zlib.zig | 12 | ||||
-rw-r--r-- | src/http_client_async.zig | 257 | ||||
-rw-r--r-- | src/url.zig | 6 | ||||
-rw-r--r-- | src/zlib.zig | 16 |
6 files changed, 167 insertions, 133 deletions
diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index 7167cf828..d0da4d1fc 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -2803,6 +2803,7 @@ pub const JSValue = enum(JSValueReprInt) { pub fn coerce(this: JSValue, comptime T: type, globalThis: *JSC.JSGlobalObject) T { return switch (T) { ZigString => this.getZigString(globalThis), + bool => this.toBooleanSlow(globalThis), i32 => { if (this.isInt32()) { return this.asInt32(); diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 80e5c43e1..4897ab562 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -436,10 +436,10 @@ pub const VirtualMachine = struct { modules: ModuleLoader.AsyncModule.Queue = .{}, aggressive_garbage_collection: GCLevel = GCLevel.none, - pub const GCLevel = enum { - none, - mild, - aggressive, + pub const GCLevel = enum(u3) { + none = 0, + mild = 1, + aggressive = 2, }; pub threadlocal var is_main_thread_vm: bool = false; diff --git a/src/http/zlib.zig b/src/http/zlib.zig index 4a2e88bec..0e48be608 100644 --- a/src/http/zlib.zig +++ b/src/http/zlib.zig @@ -22,8 +22,16 @@ pub fn put(mutable: *MutableString) void { node.release(); } -pub fn decompress(compressed_data: []const u8, output: *MutableString) Zlib.ZlibError!void { - var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, output.allocator); +pub fn decompress(compressed_data: []const u8, output: *MutableString, allocator: std.mem.Allocator) Zlib.ZlibError!void { + var reader = try Zlib.ZlibReaderArrayList.initWithOptionsAndListAllocator( + compressed_data, + &output.list, + output.allocator, + allocator, + .{ + .windowBits = 15 + 32, + }, + ); try reader.readAll(); reader.deinit(); } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index aec616b06..5f49641cb 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -31,7 +31,7 @@ pub const MimeType = @import("./http/mime_type.zig"); pub const URLPath = @import("./http/url_path.zig"); // This becomes Arena.allocator pub var default_allocator: std.mem.Allocator = undefined; -pub var default_arena: Arena = undefined; +var default_arena: Arena = undefined; pub var http_thread: HTTPThread = undefined; const HiveArray = @import("./hive_array.zig").HiveArray; const Batch = NetworkThread.Batch; @@ -527,11 +527,10 @@ pub fn onClose( // a missing 0\r\n chunk if (in_progress and client.state.transfer_encoding == .chunked) { if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) { - if (client.state.compressed_body orelse client.state.body_out_str) |body| { - if (body.list.items.len > 0) { - client.done(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); - return; - } + var buf = client.state.getBodyBuffer(); + if (buf.list.items.len > 0) { + client.done(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); + return; } } } @@ -628,7 +627,7 @@ pub const HTTPStage = enum { }; pub const InternalState = struct { - request_message: ?*BodyPreamblePool.Node = null, + response_message_buffer: MutableString = undefined, pending_response: picohttp.Response = undefined, allow_keepalive: bool = true, transfer_encoding: Encoding = Encoding.identity, @@ -637,7 +636,7 @@ pub const InternalState = struct { chunked_decoder: picohttp.phr_chunked_decoder = .{}, stage: Stage = Stage.pending, body_out_str: ?*MutableString = null, - compressed_body: ?*MutableString = null, + compressed_body: MutableString = undefined, body_size: usize = 0, request_body: []const u8 = "", request_sent_len: usize = 0, @@ -645,31 +644,36 @@ pub const InternalState = struct { request_stage: HTTPStage = .pending, response_stage: HTTPStage = .pending, - pub fn reset(this: *InternalState) void { - if (this.request_message) |msg| { - msg.release(); - this.request_message = null; - } + pub fn init(body: []const u8, body_out_str: *MutableString) InternalState { + return .{ + .request_body = body, + .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, + .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, + .body_out_str = body_out_str, + .stage = Stage.pending, + .pending_response = picohttp.Response{}, + }; + } - if (this.compressed_body) |body| { - ZlibPool.put(body); - this.compressed_body = null; - } + pub fn reset(this: *InternalState) void { + this.compressed_body.deinit(); + this.response_message_buffer.deinit(); var body_msg = this.body_out_str; + if (body_msg) |body| body.reset(); + this.* = .{ .body_out_str = body_msg, + .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, + .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, + .request_body = "", }; } pub fn getBodyBuffer(this: *InternalState) *MutableString { switch (this.encoding) { Encoding.gzip, Encoding.deflate => { - if (this.compressed_body == null) { - this.compressed_body = ZlibPool.get(default_allocator); - } - - return this.compressed_body.?; + return &this.compressed_body; }, else => { return this.body_out_str.?; @@ -677,32 +681,48 @@ pub const InternalState = struct { } } + fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString) !void { + defer this.compressed_body.deinit(); + + var gzip_timer: std.time.Timer = undefined; + + if (extremely_verbose) + gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); + + body_out_str.list.expandToCapacity(); + + ZlibPool.decompress(buffer.list.items, body_out_str, default_allocator) catch |err| { + Output.prettyErrorln("<r><red>Zlib error: {s}<r>", .{std.mem.span(@errorName(err))}); + Output.flush(); + return err; + }; + + if (extremely_verbose) + this.gzip_elapsed = gzip_timer.read(); + } + pub fn processBodyBuffer(this: *InternalState, buffer: MutableString) !void { var body_out_str = this.body_out_str.?; - var buffer_ = this.getBodyBuffer(); - buffer_.* = buffer; switch (this.encoding) { Encoding.gzip, Encoding.deflate => { - var gzip_timer: std.time.Timer = undefined; - - if (extremely_verbose) - gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); - - body_out_str.list.expandToCapacity(); - defer ZlibPool.put(buffer_); - ZlibPool.decompress(buffer_.list.items, body_out_str) catch |err| { - Output.prettyErrorln("<r><red>Zlib error<r>", .{}); - Output.flush(); - return err; - }; - - if (extremely_verbose) - this.gzip_elapsed = gzip_timer.read(); + try this.decompress(buffer, body_out_str); + }, + else => { + if (!body_out_str.owns(buffer.list.items)) { + body_out_str.append(buffer.list.items) catch |err| { + Output.prettyErrorln("<r><red>Failed to append to body buffer: {s}<r>", .{std.mem.span(@errorName(err))}); + Output.flush(); + return err; + }; + } }, - else => {}, } + this.postProcessBody(body_out_str); + } + + pub fn postProcessBody(this: *InternalState, body_out_str: *MutableString) void { var response = &this.pending_response; // if it compressed with this header, it is no longer if (this.content_encoding_i < response.headers.len) { @@ -768,6 +788,9 @@ pub fn deinit(this: *HTTPClient) void { redirect.release(); this.redirect = null; } + + this.state.compressed_body.deinit(); + this.state.response_message_buffer.deinit(); } const Stage = enum(u8) { @@ -812,6 +835,14 @@ pub const Encoding = enum { deflate, brotli, chunked, + + pub fn isCompressed(this: Encoding) bool { + return switch (this) { + // we don't support brotli yet + .gzip, .deflate => true, + else => false, + }; + } }; const content_encoding_hash = hashHeaderName("Content-Encoding"); @@ -1067,9 +1098,6 @@ pub const AsyncHTTP = struct { } }; -const BodyPreambleArray = std.BoundedArray(u8, 1024 * 16); -const BodyPreamblePool = ObjectPool(BodyPreambleArray, null, false, 16); - pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { var header_count: usize = 0; var header_entries = this.header_entries.slice(); @@ -1189,15 +1217,8 @@ pub fn doRedirect(this: *HTTPClient) void { pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void { body_out_str.reset(); - std.debug.assert(this.state.request_message == null); - this.state = InternalState{ - .request_body = body, - .body_out_str = body_out_str, - .stage = Stage.pending, - .request_message = null, - .pending_response = picohttp.Response{}, - .compressed_body = null, - }; + std.debug.assert(this.state.response_message_buffer.list.capacity == 0); + this.state = InternalState.init(body, body_out_str); if (this.url.isHTTPS()) { this.start_(true); @@ -1352,23 +1373,12 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u switch (this.state.response_stage) { .pending, .headers => { var to_read = incoming_data; - var pending_buffers: [2]string = .{ "", "" }; var amount_read: usize = 0; var needs_move = true; - if (this.state.request_message) |req_msg| { - var available = req_msg.data.unusedCapacitySlice(); - if (available.len == 0) { - this.state.request_message.?.release(); - this.state.request_message = null; - this.closeAndFail(error.ResponseHeaderTooLarge, is_ssl, socket); - return; - } - - const to_read_len = @minimum(available.len, to_read.len); - req_msg.data.appendSliceAssumeCapacity(to_read[0..to_read_len]); - to_read = req_msg.data.slice(); - pending_buffers[1] = incoming_data[to_read_len..]; - needs_move = pending_buffers[1].len > 0; + if (this.state.response_message_buffer.list.items.len > 0) { + this.state.response_message_buffer.append(incoming_data) catch @panic("Out of memory"); + to_read = this.state.response_message_buffer.list.items; + needs_move = false; } this.state.pending_response = picohttp.Response{}; @@ -1384,15 +1394,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u const to_copy = incoming_data; if (to_copy.len > 0) { - this.state.request_message = this.state.request_message orelse brk: { - var preamble = BodyPreamblePool.get(getAllocator()); - preamble.data = .{}; - break :brk preamble; - }; - this.state.request_message.?.data.appendSlice(to_copy) catch { - this.closeAndFail(error.ResponseHeadersTooLarge, is_ssl, socket); - return; - }; + this.state.response_message_buffer.append(to_copy) catch @panic("Out of memory"); } } @@ -1407,11 +1409,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u this.state.pending_response = response; - pending_buffers[0] = to_read[@minimum(@intCast(usize, response.bytes_read), to_read.len)..]; - if (pending_buffers[0].len == 0 and pending_buffers[1].len > 0) { - pending_buffers[0] = pending_buffers[1]; - pending_buffers[1] = ""; - } + var body_buf = to_read[@minimum(@intCast(usize, response.bytes_read), to_read.len)..]; var deferred_redirect: ?*URLBufferPool.Node = null; const can_continue = this.handleResponseMetadata( @@ -1424,10 +1422,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u &deferred_redirect, ) catch |err| { if (err == error.Redirect) { - if (this.state.request_message) |msg| { - msg.release(); - this.state.request_message = null; - } + this.state.response_message_buffer.deinit(); if (this.state.allow_keepalive and FeatureFlags.enable_keepalive) { std.debug.assert(this.connected_url.hostname.len > 0); @@ -1461,25 +1456,13 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u return; } - if (pending_buffers[0].len == 0) { + if (body_buf.len == 0) { return; } if (this.state.response_stage == .body) { { - const is_done = this.handleResponseBody(pending_buffers[0]) catch |err| { - this.closeAndFail(err, is_ssl, socket); - return; - }; - - if (is_done) { - this.done(is_ssl, ctx, socket); - return; - } - } - - if (pending_buffers[1].len > 0) { - const is_done = this.handleResponseBody(pending_buffers[1]) catch |err| { + const is_done = this.handleResponseBody(body_buf, true) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; @@ -1492,19 +1475,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } else if (this.state.response_stage == .body_chunk) { this.setTimeout(socket, 500); { - const is_done = this.handleResponseBodyChunk(pending_buffers[0]) catch |err| { - this.closeAndFail(err, is_ssl, socket); - return; - }; - - if (is_done) { - this.done(is_ssl, ctx, socket); - return; - } - } - - if (pending_buffers[1].len > 0) { - const is_done = this.handleResponseBodyChunk(pending_buffers[1]) catch |err| { + const is_done = this.handleResponseBodyChunkedEncoding(body_buf) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; @@ -1514,15 +1485,13 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u return; } } - - this.setTimeout(socket, 60); } }, .body => { this.setTimeout(socket, 60); - const is_done = this.handleResponseBody(incoming_data) catch |err| { + const is_done = this.handleResponseBody(incoming_data, false) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; @@ -1536,7 +1505,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u .body_chunk => { this.setTimeout(socket, 500); - const is_done = this.handleResponseBodyChunk(incoming_data) catch |err| { + const is_done = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; @@ -1709,7 +1678,51 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes // never finishing sending the body const preallocate_max = 1024 * 1024 * 256; -pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool { +pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool { + + // is it exactly as much as we need? + if (is_only_buffer and incoming_data.len >= this.state.body_size) { + return handleResponseBodyFromSinglePacket(this, incoming_data[0..this.state.body_size]); + } else { + return handleResponseBodyFromMultiplePackets(this, incoming_data); + } +} + +fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !bool { + if (this.state.encoding.isCompressed()) { + var body_buffer = this.state.body_out_str.?; + if (body_buffer.list.capacity == 0) { + const min = @minimum(@ceil(@intToFloat(f64, incoming_data.len) * 1.5), @as(f64, 1024 * 1024 * 2)); + try body_buffer.growBy(@maximum(@floatToInt(usize, min), 32)); + } + + try ZlibPool.decompress(incoming_data, body_buffer, default_allocator); + } else { + try this.state.getBodyBuffer().appendSliceExact(incoming_data); + } + + if (this.state.response_message_buffer.owns(incoming_data)) { + if (comptime Environment.allow_assert) { + // i'm not sure why this would happen and i haven't seen it happen + // but we should check + std.debug.assert(this.state.getBodyBuffer().list.items.ptr != this.state.response_message_buffer.list.items.ptr); + } + + this.state.response_message_buffer.deinit(); + } + + if (this.progress_node) |progress| { + progress.activate(); + progress.setCompletedItems(incoming_data.len); + progress.context.maybeRefresh(); + } + + this.state.postProcessBody(this.state.getBodyBuffer()); + + return true; +} + +fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool { var buffer = this.state.getBodyBuffer(); if (buffer.list.items.len == 0 and @@ -1745,7 +1758,7 @@ pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool { return false; } -pub fn handleResponseBodyChunk( +pub fn handleResponseBodyChunkedEncoding( this: *HTTPClient, incoming_data: []const u8, ) !bool { @@ -1762,10 +1775,11 @@ pub fn handleResponseBodyChunk( // phr_decode_chunked mutates in-place const pret = picohttp.phr_decode_chunked( decoder, - buffer.list.items.ptr + (buffer.list.items.len - incoming_data.len), + buffer.list.items.ptr + (buffer.list.items.len -| incoming_data.len), &bytes_decoded, ); buffer.list.items.len -|= incoming_data.len - bytes_decoded; + buffer_.* = buffer; switch (pret) { // Invalid HTTP response body @@ -1780,11 +1794,6 @@ pub fn handleResponseBodyChunk( progress.context.maybeRefresh(); } - if (this.state.compressed_body) |compressed| { - compressed.* = buffer; - } else { - this.state.body_out_str.?.* = buffer; - } return false; }, // Done diff --git a/src/url.zig b/src/url.zig index c212c3fe2..a4e0946fb 100644 --- a/src/url.zig +++ b/src/url.zig @@ -120,7 +120,11 @@ pub const URL = struct { } pub fn getPortAuto(this: *const URL) u16 { - return this.getPort() orelse (if (this.isHTTPS()) @as(u16, 443) else @as(u16, 80)); + return this.getPort() orelse this.getDefaultPort(); + } + + pub fn getDefaultPort(this: *const URL) u16 { + return if (this.isHTTPS()) @as(u16, 443) else @as(u16, 80); } pub fn hasValidPort(this: *const URL) bool { diff --git a/src/zlib.zig b/src/zlib.zig index 2ff037c52..4be96760f 100644 --- a/src/zlib.zig +++ b/src/zlib.zig @@ -417,6 +417,7 @@ pub const ZlibReaderArrayList = struct { input: []const u8, list: std.ArrayListUnmanaged(u8), + list_allocator: std.mem.Allocator, list_ptr: *std.ArrayListUnmanaged(u8), zlib: zStream_struct, allocator: std.mem.Allocator, @@ -459,10 +460,15 @@ pub const ZlibReaderArrayList = struct { } pub fn initWithOptions(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: Options) ZlibError!*ZlibReader { + return initWithOptionsAndListAllocator(input, list, allocator, allocator, options); + } + + pub fn initWithOptionsAndListAllocator(input: []const u8, list: *std.ArrayListUnmanaged(u8), list_allocator: std.mem.Allocator, allocator: std.mem.Allocator, options: Options) ZlibError!*ZlibReader { var zlib_reader = try allocator.create(ZlibReader); zlib_reader.* = ZlibReader{ .input = input, .list = list.*, + .list_allocator = list_allocator, .list_ptr = list, .allocator = allocator, .zlib = undefined, @@ -552,7 +558,7 @@ pub const ZlibReaderArrayList = struct { if (this.zlib.avail_out == 0) { const initial = this.list.items.len; - try this.list.ensureUnusedCapacity(this.allocator, 4096); + try this.list.ensureUnusedCapacity(this.list_allocator, 4096); this.list.expandToCapacity(); this.zlib.next_out = &this.list.items[initial]; this.zlib.avail_out = @intCast(u32, this.list.items.len - initial); @@ -818,6 +824,7 @@ pub const ZlibCompressorArrayList = struct { input: []const u8, list: std.ArrayListUnmanaged(u8), + list_allocator: std.mem.Allocator, list_ptr: *std.ArrayListUnmanaged(u8), zlib: zStream_struct, allocator: std.mem.Allocator, @@ -848,11 +855,16 @@ pub const ZlibCompressorArrayList = struct { } pub fn init(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: Options) ZlibError!*ZlibCompressor { + return initWithListAllocator(input, list, allocator, allocator, options); + } + + pub fn initWithListAllocator(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, list_allocator: std.mem.Allocator, options: Options) ZlibError!*ZlibCompressor { var zlib_reader = try allocator.create(ZlibCompressor); zlib_reader.* = ZlibCompressor{ .input = input, .list = list.*, .list_ptr = list, + .list_allocator = list_allocator, .allocator = allocator, .zlib = undefined, .arena = std.heap.ArenaAllocator.init(allocator), @@ -957,7 +969,7 @@ pub const ZlibCompressorArrayList = struct { if (this.zlib.avail_out == 0) { const initial = this.list.items.len; - try this.list.ensureUnusedCapacity(this.allocator, 4096); + try this.list.ensureUnusedCapacity(this.list_allocator, 4096); this.list.expandToCapacity(); this.zlib.next_out = &this.list.items[initial]; this.zlib.avail_out = @intCast(u32, this.list.items.len - initial); |