diff options
-rw-r--r-- | misctools/fetch.zig | 15 | ||||
-rw-r--r-- | src/http/async_bio.zig | 15 | ||||
-rw-r--r-- | src/http/async_message.zig | 2 | ||||
-rw-r--r-- | src/http/async_socket.zig | 40 | ||||
-rw-r--r-- | src/http_client_async.zig | 82 |
5 files changed, 130 insertions, 24 deletions
diff --git a/misctools/fetch.zig b/misctools/fetch.zig index 53a7f9499..2167429e0 100644 --- a/misctools/fetch.zig +++ b/misctools/fetch.zig @@ -31,6 +31,7 @@ const params = [_]clap.Param(clap.Help){ clap.parseParam("-r, --max-redirects <STR> Maximum number of redirects to follow (default: 128)") catch unreachable, clap.parseParam("-b, --body <STR> HTTP request body as a string") catch unreachable, clap.parseParam("-f, --file <STR> File path to load as body") catch unreachable, + clap.parseParam("-q, --quiet Quiet mode") catch unreachable, clap.parseParam("--no-gzip Disable gzip") catch unreachable, clap.parseParam("--no-deflate Disable deflate") catch unreachable, clap.parseParam("--no-compression Disable gzip & deflate") catch unreachable, @@ -70,6 +71,7 @@ pub const Arguments = struct { headers_buf: string, body: string = "", turbo: bool = false, + quiet: bool = false, pub fn parse(allocator: std.mem.Allocator) !Arguments { var diag = clap.Diagnostic{}; @@ -159,6 +161,7 @@ pub const Arguments = struct { .headers_buf = "", .body = body_string, .turbo = args.flag("--turbo"), + .quiet = args.flag("--quiet"), }; } }; @@ -213,7 +216,7 @@ pub fn main() anyerror!void { while (true) { while (channel.tryReadItem() catch null) |http| { var response = http.response orelse { - Output.printErrorln("<r><red>error<r><d>:<r> <b>HTTP response missing<r>", .{}); + Output.prettyErrorln("<r><red>error<r><d>:<r> <b>HTTP response missing<r>", .{}); Output.flush(); std.os.exit(1); }; @@ -227,10 +230,12 @@ pub fn main() anyerror!void { }, } - Output.flush(); - Output.disableBuffering(); - try Output.writer().writeAll(response_body_string.list.items); - Output.enableBuffering(); + if (!args.quiet) { + Output.flush(); + Output.disableBuffering(); + try Output.writer().writeAll(response_body_string.list.items); + Output.enableBuffering(); + } return; } } diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index 765c841e0..2dfde8968 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -356,6 +356,7 @@ pub const Bio = struct { } var this = cast(this_bio); + assert(len_ < buffer_pool_len); var socket_recv_len = this.socket_recv_len; var bio_read_offset = this.bio_read_offset; @@ -408,8 +409,20 @@ pub const Bio = struct { if (len__ > @truncate(u32, bytes.len)) { if (this.pending_reads == 0) { + // if this is true, we will never have enough space + if (socket_recv_len_ + len__ >= buffer_pool_len and len_ < buffer_pool_len) { + const unread = socket_recv_len_ - bio_read_offset; + // TODO: can we use virtual memory magic to skip the copy here? + std.mem.copyBackwards(u8, this.recv_buffer.?.data[0..unread], bytes); + bio_read_offset = 0; + this.bio_read_offset = 0; + this.socket_recv_len = @intCast(c_int, unread); + socket_recv_len = this.socket_recv_len; + bytes = this.recv_buffer.?.data[0..unread]; + } + this.pending_reads += 1; - this.scheduleSocketRead(len); + this.scheduleSocketRead(@maximum(len__ - @truncate(u32, bytes.len), 1)); } boring.BIO_set_retry_read(this_bio); diff --git a/src/http/async_message.zig b/src/http/async_message.zig index 5fce3488b..3e204455f 100644 --- a/src/http/async_message.zig +++ b/src/http/async_message.zig @@ -2,7 +2,7 @@ const std = @import("std"); const ObjectPool = @import("../pool.zig").ObjectPool; const AsyncIO = @import("io"); -pub const buffer_pool_len = std.math.maxInt(u16) - 64; +pub const buffer_pool_len = std.math.maxInt(u16); pub const BufferPoolBytes = [buffer_pool_len]u8; pub const BufferPool = ObjectPool(BufferPoolBytes, null, false, 4); diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig index 062a645de..db4964fc0 100644 --- a/src/http/async_socket.zig +++ b/src/http/async_socket.zig @@ -252,6 +252,10 @@ const Reader = struct { } }; +pub inline fn bufferedReadAmount(_: *AsyncSocket) usize { + return 0; +} + pub fn read( this: *AsyncSocket, bytes: []u8, @@ -448,17 +452,19 @@ pub const SSL = struct { if (this.pending_read_buffer.len > 0) { reader: { - var count: u32 = 0; + var count: u32 = this.pending_read_result catch unreachable; this.pending_read_result = this.doPayloadRead(this.pending_read_buffer, &count) catch |err| brk: { + this.pending_read_result = count; + if (err == error.WouldBlock) { - // partial reads are a success case - // allow the client to ask for more - if (count > 0) { - this.pending_read_result = count; - this.read_frame.maybeResume(); - break :reader; - } - this.pending_read_buffer = this.pending_read_buffer[count..]; + + // // partial reads are a success case + // // allow the client to ask for more + // if (count > 0) { + // this.read_frame.maybeResume(); + // break :reader; + // } + break :reader; } break :brk err; @@ -497,7 +503,7 @@ pub const SSL = struct { return pending; } - var total_bytes_read: u32 = 0; + var total_bytes_read: u32 = count.*; var ssl_ret: c_int = 0; var ssl_err: c_int = 0; const buf_len = @truncate(u32, buffer.len); @@ -663,6 +669,14 @@ pub const SSL = struct { return this.unencrypted_bytes_to_send.?.writeAll(buffer_).written; } + pub fn bufferedReadAmount(this: *SSL) usize { + const pend = boring.SSL_pending(this.ssl); + return if (pend <= 0) + 0 + else + @intCast(usize, pend); + } + pub fn send(this: *SSL) anyerror!usize { this.unencrypted_bytes_to_send.?.sent = 0; this.pending_write_buffer = this.unencrypted_bytes_to_send.?.buf[this.unencrypted_bytes_to_send.?.sent..this.unencrypted_bytes_to_send.?.used]; @@ -713,18 +727,18 @@ pub const SSL = struct { pub fn read(this: *SSL, buf_: []u8, offset: u64) !u32 { var buf = buf_[offset..]; var read_bytes: u32 = 0; + this.pending_read_result = 0; return this.doPayloadRead(buf, &read_bytes) catch |err| { if (err == error.WouldBlock) { - this.pending_read_result = 0; - this.pending_read_buffer = buf[read_bytes..]; + this.pending_read_result = (this.pending_read_result catch unreachable) + read_bytes; + this.pending_read_buffer = buf; suspend { this.read_frame.set(@frame()); } const result = this.pending_read_result; this.pending_read_result = 0; - this.pending_read_buffer = &[_]u8{}; return result; } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index d81d0bf4e..171693ace 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -804,6 +804,14 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti var last_read: usize = 0; { + const buffered_amount = client.bufferedReadAmount(); + if (buffered_amount > 0) { + var end = request_buffer[read_length..]; + if (buffered_amount <= end.len) { + std.debug.assert(client.read(end, buffered_amount) catch unreachable == buffered_amount); + response.bytes_read += @intCast(c_int, buffered_amount); + } + } var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length]; last_read = remainder.len; try buffer.inflate(std.math.max(remainder.len, 2048)); @@ -824,20 +832,86 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti var total_size = rsize; while (pret == -2) { - if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) { - try buffer.inflate(std.math.max(total_size * 2, 1024)); + var buffered_amount = client.bufferedReadAmount(); + if (buffer.list.items.len < total_size + 512 or buffer.list.items[total_size..].len < @intCast(usize, @maximum(decoder.bytes_left_in_chunk, buffered_amount)) or buffer.list.items[total_size..].len < 512) { + try buffer.inflate(std.math.max((buffered_amount + total_size) * 2, 1024)); + if (comptime Environment.isDebug) { + var temp_buffer = buffer; + temp_buffer.list.expandToCapacity(); + @memset(temp_buffer.list.items.ptr + buffer.list.items.len, 0, temp_buffer.list.items.len - buffer.list.items.len); + buffer = temp_buffer; + } buffer.list.expandToCapacity(); } - rret = try client.read(buffer.list.items, total_size); + // while (true) { + + if (extremely_verbose) { + Output.prettyErrorln( + \\ Buffered: {d} + \\ Chunk + \\ {d} left / {d} bytes total (buffer: {d}) + \\ Read + \\ {d} bytes / {d} total ({d} parsed) + , .{ + client.bufferedReadAmount(), + decoder.bytes_left_in_chunk, + total_size, + buffer.list.items.len, + rret, + total_size, + total_size, + }); + } + + var remainder = buffer.list.items[total_size..]; + const errorable_read = client.read(remainder, 0); + + rret = errorable_read catch |err| { + if (extremely_verbose) Output.prettyErrorln("Chunked transfoer encoding error: {s}", .{@errorName(err)}); + return err; + }; + + buffered_amount = client.bufferedReadAmount(); + if (buffered_amount > 0) { + try buffer.list.ensureTotalCapacity(default_allocator, rret + total_size + buffered_amount); + buffer.list.expandToCapacity(); + remainder = buffer.list.items[total_size..]; + remainder = remainder[rret..][0..buffered_amount]; + rret += client.read(remainder, 0) catch |err| { + if (extremely_verbose) Output.prettyErrorln("Chunked transfoer encoding error: {s}", .{@errorName(err)}); + return err; + }; + } if (rret == 0) { + if (extremely_verbose) Output.prettyErrorln("Unexpected 0", .{}); + return error.ChunkedEncodingError; } rsize = rret; pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); - if (pret == -1) return error.ChunkedEncodingParseError; + if (pret == -1) { + if (extremely_verbose) + Output.prettyErrorln( + \\ buffered: {d} + \\ rsize: {d} + \\ Read: {d} bytes / {d} total ({d} parsed) + \\ Chunk {d} left + \\ {} + , .{ + client.bufferedReadAmount(), + rsize, + rret, + buffer.list.items.len, + total_size, + decoder.bytes_left_in_chunk, + + decoder, + }); + return error.ChunkedEncodingParseError; + } total_size += rsize; |