author    Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-19 22:34:57 -0800
committer Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-19 22:34:57 -0800
commit    b230e7a73a78e67533cba0d852cefdbbd787eae9 (patch)
tree      c720a12d0bc6af9f8b3a6f053a8eefbb2ca50e76 /src
parent    e024116b776efc19b92d4be613c1449213690ca1 (diff)
download  bun-b230e7a73a78e67533cba0d852cefdbbd787eae9.tar.gz
          bun-b230e7a73a78e67533cba0d852cefdbbd787eae9.tar.zst
          bun-b230e7a73a78e67533cba0d852cefdbbd787eae9.zip
[fetch] Fix sporadic data corruption bug in HTTP client and add fast path
- This removes memory pooling from the HTTP client, which sometimes caused invalid memory to be written to the response body.
- This adds a fast path for small HTTP/HTTPS responses that uses a single memory allocation for the response body, instead of allocating and copying through a temporary buffer.

cc @Electroid
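In practice the fast path amounts to the branch added in handleResponseBody below: when the incoming slice is the only buffer for this response and it already covers the declared body size, the body is materialized with one exact-size allocation. A minimal standalone sketch of that idea, with illustrative names (readBodyFastPath and its parameters are not the code in this diff):

const std = @import("std");

/// Illustrative sketch only (not the code from this commit): materialize a
/// response body with a single exact-size allocation when the first packet
/// already contains the whole declared body.
fn readBodyFastPath(
    allocator: std.mem.Allocator,
    incoming_data: []const u8,
    body_size: usize,
    is_only_buffer: bool,
) !?[]u8 {
    if (is_only_buffer and incoming_data.len >= body_size) {
        // Fast path: one allocation sized exactly to the body.
        return try allocator.dupe(u8, incoming_data[0..body_size]);
    }
    // Otherwise the caller falls back to the multi-packet path, which
    // appends into a growable buffer as more data arrives.
    return null;
}

test "single-packet body takes the one-allocation fast path" {
    const body = (try readBodyFastPath(std.testing.allocator, "hello world", 5, true)).?;
    defer std.testing.allocator.free(body);
    try std.testing.expectEqualStrings("hello", body);
}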
Diffstat (limited to 'src')
-rw-r--r--  src/bun.js/bindings/bindings.zig |   1
-rw-r--r--  src/bun.js/javascript.zig        |   8
-rw-r--r--  src/http/zlib.zig                |  12
-rw-r--r--  src/http_client_async.zig        | 257
-rw-r--r--  src/url.zig                      |   6
-rw-r--r--  src/zlib.zig                     |  16
6 files changed, 167 insertions, 133 deletions
diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig
index 7167cf828..d0da4d1fc 100644
--- a/src/bun.js/bindings/bindings.zig
+++ b/src/bun.js/bindings/bindings.zig
@@ -2803,6 +2803,7 @@ pub const JSValue = enum(JSValueReprInt) {
pub fn coerce(this: JSValue, comptime T: type, globalThis: *JSC.JSGlobalObject) T {
return switch (T) {
ZigString => this.getZigString(globalThis),
+ bool => this.toBooleanSlow(globalThis),
i32 => {
if (this.isInt32()) {
return this.asInt32();
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index 80e5c43e1..4897ab562 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -436,10 +436,10 @@ pub const VirtualMachine = struct {
modules: ModuleLoader.AsyncModule.Queue = .{},
aggressive_garbage_collection: GCLevel = GCLevel.none,
- pub const GCLevel = enum {
- none,
- mild,
- aggressive,
+ pub const GCLevel = enum(u3) {
+ none = 0,
+ mild = 1,
+ aggressive = 2,
};
pub threadlocal var is_main_thread_vm: bool = false;
diff --git a/src/http/zlib.zig b/src/http/zlib.zig
index 4a2e88bec..0e48be608 100644
--- a/src/http/zlib.zig
+++ b/src/http/zlib.zig
@@ -22,8 +22,16 @@ pub fn put(mutable: *MutableString) void {
node.release();
}
-pub fn decompress(compressed_data: []const u8, output: *MutableString) Zlib.ZlibError!void {
- var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, output.allocator);
+pub fn decompress(compressed_data: []const u8, output: *MutableString, allocator: std.mem.Allocator) Zlib.ZlibError!void {
+ var reader = try Zlib.ZlibReaderArrayList.initWithOptionsAndListAllocator(
+ compressed_data,
+ &output.list,
+ output.allocator,
+ allocator,
+ .{
+ .windowBits = 15 + 32,
+ },
+ );
try reader.readAll();
reader.deinit();
}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index aec616b06..5f49641cb 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -31,7 +31,7 @@ pub const MimeType = @import("./http/mime_type.zig");
pub const URLPath = @import("./http/url_path.zig");
// This becomes Arena.allocator
pub var default_allocator: std.mem.Allocator = undefined;
-pub var default_arena: Arena = undefined;
+var default_arena: Arena = undefined;
pub var http_thread: HTTPThread = undefined;
const HiveArray = @import("./hive_array.zig").HiveArray;
const Batch = NetworkThread.Batch;
@@ -527,11 +527,10 @@ pub fn onClose(
// a missing 0\r\n chunk
if (in_progress and client.state.transfer_encoding == .chunked) {
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
- if (client.state.compressed_body orelse client.state.body_out_str) |body| {
- if (body.list.items.len > 0) {
- client.done(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
- return;
- }
+ var buf = client.state.getBodyBuffer();
+ if (buf.list.items.len > 0) {
+ client.done(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
+ return;
}
}
}
@@ -628,7 +627,7 @@ pub const HTTPStage = enum {
};
pub const InternalState = struct {
- request_message: ?*BodyPreamblePool.Node = null,
+ response_message_buffer: MutableString = undefined,
pending_response: picohttp.Response = undefined,
allow_keepalive: bool = true,
transfer_encoding: Encoding = Encoding.identity,
@@ -637,7 +636,7 @@ pub const InternalState = struct {
chunked_decoder: picohttp.phr_chunked_decoder = .{},
stage: Stage = Stage.pending,
body_out_str: ?*MutableString = null,
- compressed_body: ?*MutableString = null,
+ compressed_body: MutableString = undefined,
body_size: usize = 0,
request_body: []const u8 = "",
request_sent_len: usize = 0,
@@ -645,31 +644,36 @@ pub const InternalState = struct {
request_stage: HTTPStage = .pending,
response_stage: HTTPStage = .pending,
- pub fn reset(this: *InternalState) void {
- if (this.request_message) |msg| {
- msg.release();
- this.request_message = null;
- }
+ pub fn init(body: []const u8, body_out_str: *MutableString) InternalState {
+ return .{
+ .request_body = body,
+ .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
+ .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
+ .body_out_str = body_out_str,
+ .stage = Stage.pending,
+ .pending_response = picohttp.Response{},
+ };
+ }
- if (this.compressed_body) |body| {
- ZlibPool.put(body);
- this.compressed_body = null;
- }
+ pub fn reset(this: *InternalState) void {
+ this.compressed_body.deinit();
+ this.response_message_buffer.deinit();
var body_msg = this.body_out_str;
+ if (body_msg) |body| body.reset();
+
this.* = .{
.body_out_str = body_msg,
+ .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
+ .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
+ .request_body = "",
};
}
pub fn getBodyBuffer(this: *InternalState) *MutableString {
switch (this.encoding) {
Encoding.gzip, Encoding.deflate => {
- if (this.compressed_body == null) {
- this.compressed_body = ZlibPool.get(default_allocator);
- }
-
- return this.compressed_body.?;
+ return &this.compressed_body;
},
else => {
return this.body_out_str.?;
@@ -677,32 +681,48 @@ pub const InternalState = struct {
}
}
+ fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString) !void {
+ defer this.compressed_body.deinit();
+
+ var gzip_timer: std.time.Timer = undefined;
+
+ if (extremely_verbose)
+ gzip_timer = std.time.Timer.start() catch @panic("Timer failure");
+
+ body_out_str.list.expandToCapacity();
+
+ ZlibPool.decompress(buffer.list.items, body_out_str, default_allocator) catch |err| {
+ Output.prettyErrorln("<r><red>Zlib error: {s}<r>", .{std.mem.span(@errorName(err))});
+ Output.flush();
+ return err;
+ };
+
+ if (extremely_verbose)
+ this.gzip_elapsed = gzip_timer.read();
+ }
+
pub fn processBodyBuffer(this: *InternalState, buffer: MutableString) !void {
var body_out_str = this.body_out_str.?;
- var buffer_ = this.getBodyBuffer();
- buffer_.* = buffer;
switch (this.encoding) {
Encoding.gzip, Encoding.deflate => {
- var gzip_timer: std.time.Timer = undefined;
-
- if (extremely_verbose)
- gzip_timer = std.time.Timer.start() catch @panic("Timer failure");
-
- body_out_str.list.expandToCapacity();
- defer ZlibPool.put(buffer_);
- ZlibPool.decompress(buffer_.list.items, body_out_str) catch |err| {
- Output.prettyErrorln("<r><red>Zlib error<r>", .{});
- Output.flush();
- return err;
- };
-
- if (extremely_verbose)
- this.gzip_elapsed = gzip_timer.read();
+ try this.decompress(buffer, body_out_str);
+ },
+ else => {
+ if (!body_out_str.owns(buffer.list.items)) {
+ body_out_str.append(buffer.list.items) catch |err| {
+ Output.prettyErrorln("<r><red>Failed to append to body buffer: {s}<r>", .{std.mem.span(@errorName(err))});
+ Output.flush();
+ return err;
+ };
+ }
},
- else => {},
}
+ this.postProcessBody(body_out_str);
+ }
+
+ pub fn postProcessBody(this: *InternalState, body_out_str: *MutableString) void {
var response = &this.pending_response;
// if it compressed with this header, it is no longer
if (this.content_encoding_i < response.headers.len) {
@@ -768,6 +788,9 @@ pub fn deinit(this: *HTTPClient) void {
redirect.release();
this.redirect = null;
}
+
+ this.state.compressed_body.deinit();
+ this.state.response_message_buffer.deinit();
}
const Stage = enum(u8) {
@@ -812,6 +835,14 @@ pub const Encoding = enum {
deflate,
brotli,
chunked,
+
+ pub fn isCompressed(this: Encoding) bool {
+ return switch (this) {
+ // we don't support brotli yet
+ .gzip, .deflate => true,
+ else => false,
+ };
+ }
};
const content_encoding_hash = hashHeaderName("Content-Encoding");
@@ -1067,9 +1098,6 @@ pub const AsyncHTTP = struct {
}
};
-const BodyPreambleArray = std.BoundedArray(u8, 1024 * 16);
-const BodyPreamblePool = ObjectPool(BodyPreambleArray, null, false, 16);
-
pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
var header_count: usize = 0;
var header_entries = this.header_entries.slice();
@@ -1189,15 +1217,8 @@ pub fn doRedirect(this: *HTTPClient) void {
pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void {
body_out_str.reset();
- std.debug.assert(this.state.request_message == null);
- this.state = InternalState{
- .request_body = body,
- .body_out_str = body_out_str,
- .stage = Stage.pending,
- .request_message = null,
- .pending_response = picohttp.Response{},
- .compressed_body = null,
- };
+ std.debug.assert(this.state.response_message_buffer.list.capacity == 0);
+ this.state = InternalState.init(body, body_out_str);
if (this.url.isHTTPS()) {
this.start_(true);
@@ -1352,23 +1373,12 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
switch (this.state.response_stage) {
.pending, .headers => {
var to_read = incoming_data;
- var pending_buffers: [2]string = .{ "", "" };
var amount_read: usize = 0;
var needs_move = true;
- if (this.state.request_message) |req_msg| {
- var available = req_msg.data.unusedCapacitySlice();
- if (available.len == 0) {
- this.state.request_message.?.release();
- this.state.request_message = null;
- this.closeAndFail(error.ResponseHeaderTooLarge, is_ssl, socket);
- return;
- }
-
- const to_read_len = @minimum(available.len, to_read.len);
- req_msg.data.appendSliceAssumeCapacity(to_read[0..to_read_len]);
- to_read = req_msg.data.slice();
- pending_buffers[1] = incoming_data[to_read_len..];
- needs_move = pending_buffers[1].len > 0;
+ if (this.state.response_message_buffer.list.items.len > 0) {
+ this.state.response_message_buffer.append(incoming_data) catch @panic("Out of memory");
+ to_read = this.state.response_message_buffer.list.items;
+ needs_move = false;
}
this.state.pending_response = picohttp.Response{};
@@ -1384,15 +1394,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
const to_copy = incoming_data;
if (to_copy.len > 0) {
- this.state.request_message = this.state.request_message orelse brk: {
- var preamble = BodyPreamblePool.get(getAllocator());
- preamble.data = .{};
- break :brk preamble;
- };
- this.state.request_message.?.data.appendSlice(to_copy) catch {
- this.closeAndFail(error.ResponseHeadersTooLarge, is_ssl, socket);
- return;
- };
+ this.state.response_message_buffer.append(to_copy) catch @panic("Out of memory");
}
}
@@ -1407,11 +1409,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
this.state.pending_response = response;
- pending_buffers[0] = to_read[@minimum(@intCast(usize, response.bytes_read), to_read.len)..];
- if (pending_buffers[0].len == 0 and pending_buffers[1].len > 0) {
- pending_buffers[0] = pending_buffers[1];
- pending_buffers[1] = "";
- }
+ var body_buf = to_read[@minimum(@intCast(usize, response.bytes_read), to_read.len)..];
var deferred_redirect: ?*URLBufferPool.Node = null;
const can_continue = this.handleResponseMetadata(
@@ -1424,10 +1422,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
&deferred_redirect,
) catch |err| {
if (err == error.Redirect) {
- if (this.state.request_message) |msg| {
- msg.release();
- this.state.request_message = null;
- }
+ this.state.response_message_buffer.deinit();
if (this.state.allow_keepalive and FeatureFlags.enable_keepalive) {
std.debug.assert(this.connected_url.hostname.len > 0);
@@ -1461,25 +1456,13 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
return;
}
- if (pending_buffers[0].len == 0) {
+ if (body_buf.len == 0) {
return;
}
if (this.state.response_stage == .body) {
{
- const is_done = this.handleResponseBody(pending_buffers[0]) catch |err| {
- this.closeAndFail(err, is_ssl, socket);
- return;
- };
-
- if (is_done) {
- this.done(is_ssl, ctx, socket);
- return;
- }
- }
-
- if (pending_buffers[1].len > 0) {
- const is_done = this.handleResponseBody(pending_buffers[1]) catch |err| {
+ const is_done = this.handleResponseBody(body_buf, true) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1492,19 +1475,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
} else if (this.state.response_stage == .body_chunk) {
this.setTimeout(socket, 500);
{
- const is_done = this.handleResponseBodyChunk(pending_buffers[0]) catch |err| {
- this.closeAndFail(err, is_ssl, socket);
- return;
- };
-
- if (is_done) {
- this.done(is_ssl, ctx, socket);
- return;
- }
- }
-
- if (pending_buffers[1].len > 0) {
- const is_done = this.handleResponseBodyChunk(pending_buffers[1]) catch |err| {
+ const is_done = this.handleResponseBodyChunkedEncoding(body_buf) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1514,15 +1485,13 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
return;
}
}
-
- this.setTimeout(socket, 60);
}
},
.body => {
this.setTimeout(socket, 60);
- const is_done = this.handleResponseBody(incoming_data) catch |err| {
+ const is_done = this.handleResponseBody(incoming_data, false) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1536,7 +1505,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
.body_chunk => {
this.setTimeout(socket, 500);
- const is_done = this.handleResponseBodyChunk(incoming_data) catch |err| {
+ const is_done = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1709,7 +1678,51 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes
// never finishing sending the body
const preallocate_max = 1024 * 1024 * 256;
-pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool {
+pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
+
+ // is it exactly as much as we need?
+ if (is_only_buffer and incoming_data.len >= this.state.body_size) {
+ return handleResponseBodyFromSinglePacket(this, incoming_data[0..this.state.body_size]);
+ } else {
+ return handleResponseBodyFromMultiplePackets(this, incoming_data);
+ }
+}
+
+fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !bool {
+ if (this.state.encoding.isCompressed()) {
+ var body_buffer = this.state.body_out_str.?;
+ if (body_buffer.list.capacity == 0) {
+ const min = @minimum(@ceil(@intToFloat(f64, incoming_data.len) * 1.5), @as(f64, 1024 * 1024 * 2));
+ try body_buffer.growBy(@maximum(@floatToInt(usize, min), 32));
+ }
+
+ try ZlibPool.decompress(incoming_data, body_buffer, default_allocator);
+ } else {
+ try this.state.getBodyBuffer().appendSliceExact(incoming_data);
+ }
+
+ if (this.state.response_message_buffer.owns(incoming_data)) {
+ if (comptime Environment.allow_assert) {
+ // i'm not sure why this would happen and i haven't seen it happen
+ // but we should check
+ std.debug.assert(this.state.getBodyBuffer().list.items.ptr != this.state.response_message_buffer.list.items.ptr);
+ }
+
+ this.state.response_message_buffer.deinit();
+ }
+
+ if (this.progress_node) |progress| {
+ progress.activate();
+ progress.setCompletedItems(incoming_data.len);
+ progress.context.maybeRefresh();
+ }
+
+ this.state.postProcessBody(this.state.getBodyBuffer());
+
+ return true;
+}
+
+fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
var buffer = this.state.getBodyBuffer();
if (buffer.list.items.len == 0 and
@@ -1745,7 +1758,7 @@ pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool {
return false;
}
-pub fn handleResponseBodyChunk(
+pub fn handleResponseBodyChunkedEncoding(
this: *HTTPClient,
incoming_data: []const u8,
) !bool {
@@ -1762,10 +1775,11 @@ pub fn handleResponseBodyChunk(
// phr_decode_chunked mutates in-place
const pret = picohttp.phr_decode_chunked(
decoder,
- buffer.list.items.ptr + (buffer.list.items.len - incoming_data.len),
+ buffer.list.items.ptr + (buffer.list.items.len -| incoming_data.len),
&bytes_decoded,
);
buffer.list.items.len -|= incoming_data.len - bytes_decoded;
+ buffer_.* = buffer;
switch (pret) {
// Invalid HTTP response body
@@ -1780,11 +1794,6 @@ pub fn handleResponseBodyChunk(
progress.context.maybeRefresh();
}
- if (this.state.compressed_body) |compressed| {
- compressed.* = buffer;
- } else {
- this.state.body_out_str.?.* = buffer;
- }
return false;
},
// Done
diff --git a/src/url.zig b/src/url.zig
index c212c3fe2..a4e0946fb 100644
--- a/src/url.zig
+++ b/src/url.zig
@@ -120,7 +120,11 @@ pub const URL = struct {
}
pub fn getPortAuto(this: *const URL) u16 {
- return this.getPort() orelse (if (this.isHTTPS()) @as(u16, 443) else @as(u16, 80));
+ return this.getPort() orelse this.getDefaultPort();
+ }
+
+ pub fn getDefaultPort(this: *const URL) u16 {
+ return if (this.isHTTPS()) @as(u16, 443) else @as(u16, 80);
}
pub fn hasValidPort(this: *const URL) bool {
diff --git a/src/zlib.zig b/src/zlib.zig
index 2ff037c52..4be96760f 100644
--- a/src/zlib.zig
+++ b/src/zlib.zig
@@ -417,6 +417,7 @@ pub const ZlibReaderArrayList = struct {
input: []const u8,
list: std.ArrayListUnmanaged(u8),
+ list_allocator: std.mem.Allocator,
list_ptr: *std.ArrayListUnmanaged(u8),
zlib: zStream_struct,
allocator: std.mem.Allocator,
@@ -459,10 +460,15 @@ pub const ZlibReaderArrayList = struct {
}
pub fn initWithOptions(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: Options) ZlibError!*ZlibReader {
+ return initWithOptionsAndListAllocator(input, list, allocator, allocator, options);
+ }
+
+ pub fn initWithOptionsAndListAllocator(input: []const u8, list: *std.ArrayListUnmanaged(u8), list_allocator: std.mem.Allocator, allocator: std.mem.Allocator, options: Options) ZlibError!*ZlibReader {
var zlib_reader = try allocator.create(ZlibReader);
zlib_reader.* = ZlibReader{
.input = input,
.list = list.*,
+ .list_allocator = list_allocator,
.list_ptr = list,
.allocator = allocator,
.zlib = undefined,
@@ -552,7 +558,7 @@ pub const ZlibReaderArrayList = struct {
if (this.zlib.avail_out == 0) {
const initial = this.list.items.len;
- try this.list.ensureUnusedCapacity(this.allocator, 4096);
+ try this.list.ensureUnusedCapacity(this.list_allocator, 4096);
this.list.expandToCapacity();
this.zlib.next_out = &this.list.items[initial];
this.zlib.avail_out = @intCast(u32, this.list.items.len - initial);
@@ -818,6 +824,7 @@ pub const ZlibCompressorArrayList = struct {
input: []const u8,
list: std.ArrayListUnmanaged(u8),
+ list_allocator: std.mem.Allocator,
list_ptr: *std.ArrayListUnmanaged(u8),
zlib: zStream_struct,
allocator: std.mem.Allocator,
@@ -848,11 +855,16 @@ pub const ZlibCompressorArrayList = struct {
}
pub fn init(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: Options) ZlibError!*ZlibCompressor {
+ return initWithListAllocator(input, list, allocator, allocator, options);
+ }
+
+ pub fn initWithListAllocator(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, list_allocator: std.mem.Allocator, options: Options) ZlibError!*ZlibCompressor {
var zlib_reader = try allocator.create(ZlibCompressor);
zlib_reader.* = ZlibCompressor{
.input = input,
.list = list.*,
.list_ptr = list,
+ .list_allocator = list_allocator,
.allocator = allocator,
.zlib = undefined,
.arena = std.heap.ArenaAllocator.init(allocator),
@@ -957,7 +969,7 @@ pub const ZlibCompressorArrayList = struct {
if (this.zlib.avail_out == 0) {
const initial = this.list.items.len;
- try this.list.ensureUnusedCapacity(this.allocator, 4096);
+ try this.list.ensureUnusedCapacity(this.list_allocator, 4096);
this.list.expandToCapacity();
this.zlib.next_out = &this.list.items[initial];
this.zlib.avail_out = @intCast(u32, this.list.items.len - initial);
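
The zlib.zig changes split the allocator that owns the reader/compressor struct from the allocator that grows the caller's output list (initWithOptionsAndListAllocator / initWithListAllocator), which is what lets the HTTP client keep response bodies on its default allocator without pooling. A minimal sketch of that two-allocator pattern, with illustrative names (Inflater is not bun's API):

const std = @import("std");

/// Illustrative sketch of the two-allocator pattern: the struct itself is
/// created with `allocator`, while the caller-owned output list grows with
/// a separate `list_allocator`.
const Inflater = struct {
    allocator: std.mem.Allocator, // owns this struct
    list_allocator: std.mem.Allocator, // grows the output list
    list_ptr: *std.ArrayListUnmanaged(u8),

    fn init(
        allocator: std.mem.Allocator,
        list_allocator: std.mem.Allocator,
        list_ptr: *std.ArrayListUnmanaged(u8),
    ) !*Inflater {
        const self = try allocator.create(Inflater);
        self.* = .{
            .allocator = allocator,
            .list_allocator = list_allocator,
            .list_ptr = list_ptr,
        };
        return self;
    }

    fn grow(self: *Inflater) !void {
        // Output capacity comes from the list's allocator, not the struct's.
        try self.list_ptr.ensureUnusedCapacity(self.list_allocator, 4096);
    }

    fn deinit(self: *Inflater) void {
        self.allocator.destroy(self);
    }
};

test "output list grows with its own allocator" {
    var list = std.ArrayListUnmanaged(u8){};
    defer list.deinit(std.testing.allocator);

    const inflater = try Inflater.init(std.heap.page_allocator, std.testing.allocator, &list);
    defer inflater.deinit();

    try inflater.grow();
    try std.testing.expect(list.capacity >= 4096);
}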