author    Jarred Sumner <jarred@jarredsumner.com>  2022-02-02 18:07:05 -0800
committer Jarred Sumner <jarred@jarredsumner.com>  2022-02-02 18:07:05 -0800
commit    1a4ccd3f5c2ef273bd5307d9c8939177bdc310cc (patch)
tree      1b1634fa2d1450b87846de1d1dc572ba44d1f333
parent    68cb6130d3e4bd25a53c959db9108a68f5268298 (diff)
[http] Fixes to chunked transfer encoding reader (branch: jarred/async_bio)
-rw-r--r--  misctools/fetch.zig         15
-rw-r--r--  src/http/async_bio.zig      15
-rw-r--r--  src/http/async_message.zig   2
-rw-r--r--  src/http/async_socket.zig   40
-rw-r--r--  src/http_client_async.zig   82
5 files changed, 130 insertions, 24 deletions
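
For context on the format this patch's reader decodes: a chunked HTTP/1.1 body is a sequence of "<hex size>\r\n<data>\r\n" frames, terminated by a zero-length chunk. A minimal, self-contained Zig sketch of that framing (illustrative only, not code from this patch; chunk extensions and trailers are ignored):

const std = @import("std");

pub fn main() !void {
    const body = "7\r\nMozilla\r\n9\r\nDeveloper\r\n0\r\n\r\n";

    var out = std.ArrayList(u8).init(std.heap.page_allocator);
    defer out.deinit();

    var rest: []const u8 = body;
    while (true) {
        // Each chunk starts with its size in hex, followed by CRLF.
        const line_end = std.mem.indexOf(u8, rest, "\r\n") orelse return error.MalformedChunk;
        const chunk_len = try std.fmt.parseInt(usize, rest[0..line_end], 16);
        rest = rest[line_end + 2 ..];
        if (chunk_len == 0) break; // terminating zero-length chunk
        try out.appendSlice(rest[0..chunk_len]);
        rest = rest[chunk_len + 2 ..]; // skip the data and its trailing CRLF
    }

    std.debug.print("{s}\n", .{out.items}); // prints "MozillaDeveloper"
}
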
diff --git a/misctools/fetch.zig b/misctools/fetch.zig
index 53a7f9499..2167429e0 100644
--- a/misctools/fetch.zig
+++ b/misctools/fetch.zig
@@ -31,6 +31,7 @@ const params = [_]clap.Param(clap.Help){
clap.parseParam("-r, --max-redirects <STR> Maximum number of redirects to follow (default: 128)") catch unreachable,
clap.parseParam("-b, --body <STR> HTTP request body as a string") catch unreachable,
clap.parseParam("-f, --file <STR> File path to load as body") catch unreachable,
+ clap.parseParam("-q, --quiet Quiet mode") catch unreachable,
clap.parseParam("--no-gzip Disable gzip") catch unreachable,
clap.parseParam("--no-deflate Disable deflate") catch unreachable,
clap.parseParam("--no-compression Disable gzip & deflate") catch unreachable,
@@ -70,6 +71,7 @@ pub const Arguments = struct {
headers_buf: string,
body: string = "",
turbo: bool = false,
+ quiet: bool = false,
pub fn parse(allocator: std.mem.Allocator) !Arguments {
var diag = clap.Diagnostic{};
@@ -159,6 +161,7 @@ pub const Arguments = struct {
.headers_buf = "",
.body = body_string,
.turbo = args.flag("--turbo"),
+ .quiet = args.flag("--quiet"),
};
}
};
@@ -213,7 +216,7 @@ pub fn main() anyerror!void {
while (true) {
while (channel.tryReadItem() catch null) |http| {
var response = http.response orelse {
- Output.printErrorln("<r><red>error<r><d>:<r> <b>HTTP response missing<r>", .{});
+ Output.prettyErrorln("<r><red>error<r><d>:<r> <b>HTTP response missing<r>", .{});
Output.flush();
std.os.exit(1);
};
@@ -227,10 +230,12 @@ pub fn main() anyerror!void {
},
}
- Output.flush();
- Output.disableBuffering();
- try Output.writer().writeAll(response_body_string.list.items);
- Output.enableBuffering();
+ if (!args.quiet) {
+ Output.flush();
+ Output.disableBuffering();
+ try Output.writer().writeAll(response_body_string.list.items);
+ Output.enableBuffering();
+ }
return;
}
}
diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig
index 765c841e0..2dfde8968 100644
--- a/src/http/async_bio.zig
+++ b/src/http/async_bio.zig
@@ -356,6 +356,7 @@ pub const Bio = struct {
}
var this = cast(this_bio);
+ assert(len_ < buffer_pool_len);
var socket_recv_len = this.socket_recv_len;
var bio_read_offset = this.bio_read_offset;
@@ -408,8 +409,20 @@ pub const Bio = struct {
if (len__ > @truncate(u32, bytes.len)) {
if (this.pending_reads == 0) {
+ // if this is true, we will never have enough space
+ if (socket_recv_len_ + len__ >= buffer_pool_len and len_ < buffer_pool_len) {
+ const unread = socket_recv_len_ - bio_read_offset;
+ // TODO: can we use virtual memory magic to skip the copy here?
+ std.mem.copyBackwards(u8, this.recv_buffer.?.data[0..unread], bytes);
+ bio_read_offset = 0;
+ this.bio_read_offset = 0;
+ this.socket_recv_len = @intCast(c_int, unread);
+ socket_recv_len = this.socket_recv_len;
+ bytes = this.recv_buffer.?.data[0..unread];
+ }
+
this.pending_reads += 1;
- this.scheduleSocketRead(len);
+ this.scheduleSocketRead(@maximum(len__ - @truncate(u32, bytes.len), 1));
}
boring.BIO_set_retry_read(this_bio);
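
The compaction added above boils down to sliding the unread tail of a fixed-size recv buffer down to offset 0 when there is no room left for the next socket read. A standalone sketch of that idea, with illustrative names rather than the real Bio fields:

const std = @import("std");

// Move the unread bytes of a fixed buffer to offset 0 so the space behind
// them can be reused for the next socket read.
fn compact(buf: []u8, read_offset: *usize, filled_len: *usize) void {
    const unread = filled_len.* - read_offset.*;
    // dest starts before source, so a forward copy is safe even on overlap.
    std.mem.copy(u8, buf[0..unread], buf[read_offset.*..filled_len.*]);
    read_offset.* = 0;
    filled_len.* = unread;
}

test "compaction keeps only the unread tail" {
    var storage = [_]u8{ 'a', 'b', 'c', 'd', 'e', 'f', 0, 0 };
    var read_offset: usize = 4; // "abcd" was already consumed
    var filled_len: usize = 6; // "ef" is still unread
    compact(&storage, &read_offset, &filled_len);
    try std.testing.expectEqualSlices(u8, "ef", storage[0..filled_len]);
    try std.testing.expectEqual(@as(usize, 0), read_offset);
}
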
diff --git a/src/http/async_message.zig b/src/http/async_message.zig
index 5fce3488b..3e204455f 100644
--- a/src/http/async_message.zig
+++ b/src/http/async_message.zig
@@ -2,7 +2,7 @@ const std = @import("std");
const ObjectPool = @import("../pool.zig").ObjectPool;
const AsyncIO = @import("io");
-pub const buffer_pool_len = std.math.maxInt(u16) - 64;
+pub const buffer_pool_len = std.math.maxInt(u16);
pub const BufferPoolBytes = [buffer_pool_len]u8;
pub const BufferPool = ObjectPool(BufferPoolBytes, null, false, 4);
diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig
index 062a645de..db4964fc0 100644
--- a/src/http/async_socket.zig
+++ b/src/http/async_socket.zig
@@ -252,6 +252,10 @@ const Reader = struct {
}
};
+pub inline fn bufferedReadAmount(_: *AsyncSocket) usize {
+ return 0;
+}
+
pub fn read(
this: *AsyncSocket,
bytes: []u8,
@@ -448,17 +452,19 @@ pub const SSL = struct {
if (this.pending_read_buffer.len > 0) {
reader: {
- var count: u32 = 0;
+ var count: u32 = this.pending_read_result catch unreachable;
this.pending_read_result = this.doPayloadRead(this.pending_read_buffer, &count) catch |err| brk: {
+ this.pending_read_result = count;
+
if (err == error.WouldBlock) {
- // partial reads are a success case
- // allow the client to ask for more
- if (count > 0) {
- this.pending_read_result = count;
- this.read_frame.maybeResume();
- break :reader;
- }
- this.pending_read_buffer = this.pending_read_buffer[count..];
+
+ // // partial reads are a success case
+ // // allow the client to ask for more
+ // if (count > 0) {
+ // this.read_frame.maybeResume();
+ // break :reader;
+ // }
+
break :reader;
}
break :brk err;
@@ -497,7 +503,7 @@ pub const SSL = struct {
return pending;
}
- var total_bytes_read: u32 = 0;
+ var total_bytes_read: u32 = count.*;
var ssl_ret: c_int = 0;
var ssl_err: c_int = 0;
const buf_len = @truncate(u32, buffer.len);
@@ -663,6 +669,14 @@ pub const SSL = struct {
return this.unencrypted_bytes_to_send.?.writeAll(buffer_).written;
}
+ pub fn bufferedReadAmount(this: *SSL) usize {
+ const pend = boring.SSL_pending(this.ssl);
+ return if (pend <= 0)
+ 0
+ else
+ @intCast(usize, pend);
+ }
+
pub fn send(this: *SSL) anyerror!usize {
this.unencrypted_bytes_to_send.?.sent = 0;
this.pending_write_buffer = this.unencrypted_bytes_to_send.?.buf[this.unencrypted_bytes_to_send.?.sent..this.unencrypted_bytes_to_send.?.used];
@@ -713,18 +727,18 @@ pub const SSL = struct {
pub fn read(this: *SSL, buf_: []u8, offset: u64) !u32 {
var buf = buf_[offset..];
var read_bytes: u32 = 0;
+ this.pending_read_result = 0;
return this.doPayloadRead(buf, &read_bytes) catch |err| {
if (err == error.WouldBlock) {
- this.pending_read_result = 0;
- this.pending_read_buffer = buf[read_bytes..];
+ this.pending_read_result = (this.pending_read_result catch unreachable) + read_bytes;
+ this.pending_read_buffer = buf;
suspend {
this.read_frame.set(@frame());
}
const result = this.pending_read_result;
this.pending_read_result = 0;
- this.pending_read_buffer = &[_]u8{};
return result;
}
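
The new bufferedReadAmount() exists because TLS can hold already-decrypted application bytes inside the SSL object (SSL_pending); those bytes never show up as readiness on the socket fd, so a reader that only waits on the socket can stall. A hypothetical mock sketching the drain-before-wait pattern the HTTP client relies on below (MockTLS is illustrative, not the real AsyncSocket.SSL):

const std = @import("std");

// Stand-in for a TLS socket: `decrypted` models bytes BoringSSL has already
// decrypted and buffered internally, invisible to poll/epoll on the fd.
const MockTLS = struct {
    decrypted: []const u8,

    pub fn bufferedReadAmount(this: *MockTLS) usize {
        return this.decrypted.len;
    }

    pub fn read(this: *MockTLS, dest: []u8, _: u64) !u32 {
        const n = @minimum(dest.len, this.decrypted.len);
        std.mem.copy(u8, dest[0..n], this.decrypted[0..n]);
        this.decrypted = this.decrypted[n..];
        return @intCast(u32, n);
    }
};

test "drain buffered TLS bytes before waiting on the socket" {
    var tls = MockTLS{ .decrypted = "5\r\nhello\r\n0\r\n\r\n" };
    var dest: [64]u8 = undefined;
    var total: usize = 0;
    // Consume everything TLS already has in memory; only after this would a
    // real client schedule another socket read and suspend.
    while (tls.bufferedReadAmount() > 0) {
        total += try tls.read(dest[total..], 0);
    }
    try std.testing.expectEqualStrings("5\r\nhello\r\n0\r\n\r\n", dest[0..total]);
}
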
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index d81d0bf4e..171693ace 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -804,6 +804,14 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti
var last_read: usize = 0;
{
+ const buffered_amount = client.bufferedReadAmount();
+ if (buffered_amount > 0) {
+ var end = request_buffer[read_length..];
+ if (buffered_amount <= end.len) {
+ std.debug.assert(client.read(end, buffered_amount) catch unreachable == buffered_amount);
+ response.bytes_read += @intCast(c_int, buffered_amount);
+ }
+ }
var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length];
last_read = remainder.len;
try buffer.inflate(std.math.max(remainder.len, 2048));
@@ -824,20 +832,86 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti
var total_size = rsize;
while (pret == -2) {
- if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) {
- try buffer.inflate(std.math.max(total_size * 2, 1024));
+ var buffered_amount = client.bufferedReadAmount();
+ if (buffer.list.items.len < total_size + 512 or buffer.list.items[total_size..].len < @intCast(usize, @maximum(decoder.bytes_left_in_chunk, buffered_amount)) or buffer.list.items[total_size..].len < 512) {
+ try buffer.inflate(std.math.max((buffered_amount + total_size) * 2, 1024));
+ if (comptime Environment.isDebug) {
+ var temp_buffer = buffer;
+ temp_buffer.list.expandToCapacity();
+ @memset(temp_buffer.list.items.ptr + buffer.list.items.len, 0, temp_buffer.list.items.len - buffer.list.items.len);
+ buffer = temp_buffer;
+ }
buffer.list.expandToCapacity();
}
- rret = try client.read(buffer.list.items, total_size);
+ // while (true) {
+
+ if (extremely_verbose) {
+ Output.prettyErrorln(
+ \\ Buffered: {d}
+ \\ Chunk
+ \\ {d} left / {d} bytes total (buffer: {d})
+ \\ Read
+ \\ {d} bytes / {d} total ({d} parsed)
+ , .{
+ client.bufferedReadAmount(),
+ decoder.bytes_left_in_chunk,
+ total_size,
+ buffer.list.items.len,
+ rret,
+ total_size,
+ total_size,
+ });
+ }
+
+ var remainder = buffer.list.items[total_size..];
+ const errorable_read = client.read(remainder, 0);
+
+ rret = errorable_read catch |err| {
+ if (extremely_verbose) Output.prettyErrorln("Chunked transfer encoding error: {s}", .{@errorName(err)});
+ return err;
+ };
+
+ buffered_amount = client.bufferedReadAmount();
+ if (buffered_amount > 0) {
+ try buffer.list.ensureTotalCapacity(default_allocator, rret + total_size + buffered_amount);
+ buffer.list.expandToCapacity();
+ remainder = buffer.list.items[total_size..];
+ remainder = remainder[rret..][0..buffered_amount];
+ rret += client.read(remainder, 0) catch |err| {
+ if (extremely_verbose) Output.prettyErrorln("Chunked transfer encoding error: {s}", .{@errorName(err)});
+ return err;
+ };
+ }
if (rret == 0) {
+ if (extremely_verbose) Output.prettyErrorln("Unexpected 0", .{});
+
return error.ChunkedEncodingError;
}
rsize = rret;
pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize);
- if (pret == -1) return error.ChunkedEncodingParseError;
+ if (pret == -1) {
+ if (extremely_verbose)
+ Output.prettyErrorln(
+ \\ buffered: {d}
+ \\ rsize: {d}
+ \\ Read: {d} bytes / {d} total ({d} parsed)
+ \\ Chunk {d} left
+ \\ {}
+ , .{
+ client.bufferedReadAmount(),
+ rsize,
+ rret,
+ buffer.list.items.len,
+ total_size,
+ decoder.bytes_left_in_chunk,
+
+ decoder,
+ });
+ return error.ChunkedEncodingParseError;
+ }
total_size += rsize;
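
For reference, the return-code contract of phr_decode_chunked driving the loop above: -2 means the body is incomplete (append more bytes and call again), -1 is a framing error, and >= 0 means decoding finished with that many undecoded trailing bytes left; the size argument is rewritten in place to the decoded payload length. A minimal sketch declaring the C function directly (assumes picohttpparser.c is compiled and linked; the struct layout mirrors picohttpparser.h):

const std = @import("std");

// Mirrors struct phr_chunked_decoder from picohttpparser.h.
const phr_chunked_decoder = extern struct {
    bytes_left_in_chunk: usize,
    consume_trailer: u8,
    _hex_count: u8,
    _state: u8,
};

// C prototype: ssize_t phr_decode_chunked(struct phr_chunked_decoder *, char *buf, size_t *bufsz);
extern fn phr_decode_chunked(decoder: *phr_chunked_decoder, buf: [*]u8, bufsz: *usize) isize;

test "a complete chunked body decodes in a single call" {
    var decoder = std.mem.zeroes(phr_chunked_decoder);
    decoder.consume_trailer = 1; // swallow the trailer section after the last chunk

    var buf = "4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n".*;
    var size: usize = buf.len;

    const pret = phr_decode_chunked(&decoder, &buf, &size);
    try std.testing.expect(pret >= 0); // done; -2 here would mean "feed me more bytes"
    try std.testing.expectEqualStrings("Wikipedia", buf[0..size]);
}
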