diff options
| author | 2022-02-02 18:07:05 -0800 | |
|---|---|---|
| committer | 2022-02-02 18:07:05 -0800 | |
| commit | 1a4ccd3f5c2ef273bd5307d9c8939177bdc310cc (patch) | |
| tree | 1b1634fa2d1450b87846de1d1dc572ba44d1f333 | |
| parent | 68cb6130d3e4bd25a53c959db9108a68f5268298 (diff) | |
| download | bun-1a4ccd3f5c2ef273bd5307d9c8939177bdc310cc.tar.gz bun-1a4ccd3f5c2ef273bd5307d9c8939177bdc310cc.tar.zst bun-1a4ccd3f5c2ef273bd5307d9c8939177bdc310cc.zip | |
[http] Fixes to chunked transfer encoding readerjarred/async_bio
| -rw-r--r-- | misctools/fetch.zig | 15 | ||||
| -rw-r--r-- | src/http/async_bio.zig | 15 | ||||
| -rw-r--r-- | src/http/async_message.zig | 2 | ||||
| -rw-r--r-- | src/http/async_socket.zig | 40 | ||||
| -rw-r--r-- | src/http_client_async.zig | 82 | 
5 files changed, 130 insertions, 24 deletions
| diff --git a/misctools/fetch.zig b/misctools/fetch.zig index 53a7f9499..2167429e0 100644 --- a/misctools/fetch.zig +++ b/misctools/fetch.zig @@ -31,6 +31,7 @@ const params = [_]clap.Param(clap.Help){      clap.parseParam("-r, --max-redirects <STR>  Maximum number of redirects to follow (default: 128)") catch unreachable,      clap.parseParam("-b, --body <STR>           HTTP request body as a string") catch unreachable,      clap.parseParam("-f, --file <STR>           File path to load as body") catch unreachable, +    clap.parseParam("-q, --quiet  Quiet mode") catch unreachable,      clap.parseParam("--no-gzip                  Disable gzip") catch unreachable,      clap.parseParam("--no-deflate               Disable deflate") catch unreachable,      clap.parseParam("--no-compression           Disable gzip & deflate") catch unreachable, @@ -70,6 +71,7 @@ pub const Arguments = struct {      headers_buf: string,      body: string = "",      turbo: bool = false, +    quiet: bool = false,      pub fn parse(allocator: std.mem.Allocator) !Arguments {          var diag = clap.Diagnostic{}; @@ -159,6 +161,7 @@ pub const Arguments = struct {              .headers_buf = "",              .body = body_string,              .turbo = args.flag("--turbo"), +            .quiet = args.flag("--quiet"),          };      }  }; @@ -213,7 +216,7 @@ pub fn main() anyerror!void {      while (true) {          while (channel.tryReadItem() catch null) |http| {              var response = http.response orelse { -                Output.printErrorln("<r><red>error<r><d>:<r> <b>HTTP response missing<r>", .{}); +                Output.prettyErrorln("<r><red>error<r><d>:<r> <b>HTTP response missing<r>", .{});                  Output.flush();                  std.os.exit(1);              }; @@ -227,10 +230,12 @@ pub fn main() anyerror!void {                  },              } -            Output.flush(); -            Output.disableBuffering(); -            try Output.writer().writeAll(response_body_string.list.items); -            Output.enableBuffering(); +            if (!args.quiet) { +                Output.flush(); +                Output.disableBuffering(); +                try Output.writer().writeAll(response_body_string.list.items); +                Output.enableBuffering(); +            }              return;          }      } diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index 765c841e0..2dfde8968 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -356,6 +356,7 @@ pub const Bio = struct {          }          var this = cast(this_bio); +        assert(len_ < buffer_pool_len);          var socket_recv_len = this.socket_recv_len;          var bio_read_offset = this.bio_read_offset; @@ -408,8 +409,20 @@ pub const Bio = struct {          if (len__ > @truncate(u32, bytes.len)) {              if (this.pending_reads == 0) { +                // if this is true, we will never have enough space +                if (socket_recv_len_ + len__ >= buffer_pool_len and len_ < buffer_pool_len) { +                    const unread = socket_recv_len_ - bio_read_offset; +                    // TODO: can we use virtual memory magic to skip the copy here? +                    std.mem.copyBackwards(u8, this.recv_buffer.?.data[0..unread], bytes); +                    bio_read_offset = 0; +                    this.bio_read_offset = 0; +                    this.socket_recv_len = @intCast(c_int, unread); +                    socket_recv_len = this.socket_recv_len; +                    bytes = this.recv_buffer.?.data[0..unread]; +                } +                  this.pending_reads += 1; -                this.scheduleSocketRead(len); +                this.scheduleSocketRead(@maximum(len__ - @truncate(u32, bytes.len), 1));              }              boring.BIO_set_retry_read(this_bio); diff --git a/src/http/async_message.zig b/src/http/async_message.zig index 5fce3488b..3e204455f 100644 --- a/src/http/async_message.zig +++ b/src/http/async_message.zig @@ -2,7 +2,7 @@ const std = @import("std");  const ObjectPool = @import("../pool.zig").ObjectPool;  const AsyncIO = @import("io"); -pub const buffer_pool_len = std.math.maxInt(u16) - 64; +pub const buffer_pool_len = std.math.maxInt(u16);  pub const BufferPoolBytes = [buffer_pool_len]u8;  pub const BufferPool = ObjectPool(BufferPoolBytes, null, false, 4); diff --git a/src/http/async_socket.zig b/src/http/async_socket.zig index 062a645de..db4964fc0 100644 --- a/src/http/async_socket.zig +++ b/src/http/async_socket.zig @@ -252,6 +252,10 @@ const Reader = struct {      }  }; +pub inline fn bufferedReadAmount(_: *AsyncSocket) usize { +    return 0; +} +  pub fn read(      this: *AsyncSocket,      bytes: []u8, @@ -448,17 +452,19 @@ pub const SSL = struct {          if (this.pending_read_buffer.len > 0) {              reader: { -                var count: u32 = 0; +                var count: u32 = this.pending_read_result catch unreachable;                  this.pending_read_result = this.doPayloadRead(this.pending_read_buffer, &count) catch |err| brk: { +                    this.pending_read_result = count; +                      if (err == error.WouldBlock) { -                        // partial reads are a success case -                        // allow the client to ask for more -                        if (count > 0) { -                            this.pending_read_result = count; -                            this.read_frame.maybeResume(); -                            break :reader; -                        } -                        this.pending_read_buffer = this.pending_read_buffer[count..]; + +                        // // partial reads are a success case +                        // // allow the client to ask for more +                        // if (count > 0) { +                        //     this.read_frame.maybeResume(); +                        //     break :reader; +                        // } +                          break :reader;                      }                      break :brk err; @@ -497,7 +503,7 @@ pub const SSL = struct {              return pending;          } -        var total_bytes_read: u32 = 0; +        var total_bytes_read: u32 = count.*;          var ssl_ret: c_int = 0;          var ssl_err: c_int = 0;          const buf_len = @truncate(u32, buffer.len); @@ -663,6 +669,14 @@ pub const SSL = struct {          return this.unencrypted_bytes_to_send.?.writeAll(buffer_).written;      } +    pub fn bufferedReadAmount(this: *SSL) usize { +        const pend = boring.SSL_pending(this.ssl); +        return if (pend <= 0) +            0 +        else +            @intCast(usize, pend); +    } +      pub fn send(this: *SSL) anyerror!usize {          this.unencrypted_bytes_to_send.?.sent = 0;          this.pending_write_buffer = this.unencrypted_bytes_to_send.?.buf[this.unencrypted_bytes_to_send.?.sent..this.unencrypted_bytes_to_send.?.used]; @@ -713,18 +727,18 @@ pub const SSL = struct {      pub fn read(this: *SSL, buf_: []u8, offset: u64) !u32 {          var buf = buf_[offset..];          var read_bytes: u32 = 0; +        this.pending_read_result = 0;          return this.doPayloadRead(buf, &read_bytes) catch |err| {              if (err == error.WouldBlock) { -                this.pending_read_result = 0; -                this.pending_read_buffer = buf[read_bytes..]; +                this.pending_read_result = (this.pending_read_result catch unreachable) + read_bytes; +                this.pending_read_buffer = buf;                  suspend {                      this.read_frame.set(@frame());                  }                  const result = this.pending_read_result;                  this.pending_read_result = 0; -                this.pending_read_buffer = &[_]u8{};                  return result;              } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index d81d0bf4e..171693ace 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -804,6 +804,14 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti              var last_read: usize = 0;              { +                const buffered_amount = client.bufferedReadAmount(); +                if (buffered_amount > 0) { +                    var end = request_buffer[read_length..]; +                    if (buffered_amount <= end.len) { +                        std.debug.assert(client.read(end, buffered_amount) catch unreachable == buffered_amount); +                        response.bytes_read += @intCast(c_int, buffered_amount); +                    } +                }                  var remainder = request_buffer[@intCast(usize, response.bytes_read)..read_length];                  last_read = remainder.len;                  try buffer.inflate(std.math.max(remainder.len, 2048)); @@ -824,20 +832,86 @@ pub fn processResponse(this: *HTTPClient, comptime report_progress: bool, compti              var total_size = rsize;              while (pret == -2) { -                if (buffer.list.items[total_size..].len < @intCast(usize, decoder.bytes_left_in_chunk) or buffer.list.items[total_size..].len < 512) { -                    try buffer.inflate(std.math.max(total_size * 2, 1024)); +                var buffered_amount = client.bufferedReadAmount(); +                if (buffer.list.items.len < total_size + 512 or buffer.list.items[total_size..].len < @intCast(usize, @maximum(decoder.bytes_left_in_chunk, buffered_amount)) or buffer.list.items[total_size..].len < 512) { +                    try buffer.inflate(std.math.max((buffered_amount + total_size) * 2, 1024)); +                    if (comptime Environment.isDebug) { +                        var temp_buffer = buffer; +                        temp_buffer.list.expandToCapacity(); +                        @memset(temp_buffer.list.items.ptr + buffer.list.items.len, 0, temp_buffer.list.items.len - buffer.list.items.len); +                        buffer = temp_buffer; +                    }                      buffer.list.expandToCapacity();                  } -                rret = try client.read(buffer.list.items, total_size); +                // while (true) { + +                if (extremely_verbose) { +                    Output.prettyErrorln( +                        \\  Buffered: {d} +                        \\   Chunk +                        \\    {d} left / {d} bytes total (buffer: {d}) +                        \\   Read +                        \\    {d} bytes / {d} total ({d} parsed) +                    , .{ +                        client.bufferedReadAmount(), +                        decoder.bytes_left_in_chunk, +                        total_size, +                        buffer.list.items.len, +                        rret, +                        total_size, +                        total_size, +                    }); +                } + +                var remainder = buffer.list.items[total_size..]; +                const errorable_read = client.read(remainder, 0); + +                rret = errorable_read catch |err| { +                    if (extremely_verbose) Output.prettyErrorln("Chunked transfoer encoding error: {s}", .{@errorName(err)}); +                    return err; +                }; + +                buffered_amount = client.bufferedReadAmount(); +                if (buffered_amount > 0) { +                    try buffer.list.ensureTotalCapacity(default_allocator, rret + total_size + buffered_amount); +                    buffer.list.expandToCapacity(); +                    remainder = buffer.list.items[total_size..]; +                    remainder = remainder[rret..][0..buffered_amount]; +                    rret += client.read(remainder, 0) catch |err| { +                        if (extremely_verbose) Output.prettyErrorln("Chunked transfoer encoding error: {s}", .{@errorName(err)}); +                        return err; +                    }; +                }                  if (rret == 0) { +                    if (extremely_verbose) Output.prettyErrorln("Unexpected 0", .{}); +                      return error.ChunkedEncodingError;                  }                  rsize = rret;                  pret = picohttp.phr_decode_chunked(&decoder, buffer.list.items[total_size..].ptr, &rsize); -                if (pret == -1) return error.ChunkedEncodingParseError; +                if (pret == -1) { +                    if (extremely_verbose) +                        Output.prettyErrorln( +                            \\ buffered: {d}  +                            \\ rsize: {d} +                            \\ Read: {d} bytes / {d} total ({d} parsed) +                            \\ Chunk {d} left +                            \\ {} +                        , .{ +                            client.bufferedReadAmount(), +                            rsize, +                            rret, +                            buffer.list.items.len, +                            total_size, +                            decoder.bytes_left_in_chunk, + +                            decoder, +                        }); +                    return error.ChunkedEncodingParseError; +                }                  total_size += rsize; | 
