diff options
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r-- | src/http_client_async.zig | 194 |
1 files changed, 106 insertions, 88 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 3b1982d9a..4f7647b4e 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -1018,7 +1018,17 @@ pub const HTTPStage = enum { pub const InternalState = struct { response_message_buffer: MutableString = undefined, - pending_response: picohttp.Response = undefined, + /// pending response is the temporary storage for the response headers, url and status code + /// this uses shared_response_headers_buf to store the headers + /// this will be turned null once the metadata is cloned + pending_response: ?picohttp.Response = null, + + /// This is the cloned metadata containing the response headers, url and status code after the .headers phase are received + /// will be turned null once returned to the user (the ownership is transferred to the user) + /// this can happen after await fetch(...) and the body can continue streaming when this is already null + /// the user will receive only chunks of the body stored in body_out_str + cloned_metadata: ?HTTPResponseMetadata = null, + allow_keepalive: bool = true, received_last_chunk: bool = false, transfer_encoding: Encoding = Encoding.identity, @@ -1027,6 +1037,7 @@ pub const InternalState = struct { chunked_decoder: picohttp.phr_chunked_decoder = .{}, zlib_reader: ?*Zlib.ZlibReaderArrayList = null, stage: Stage = Stage.pending, + /// This is owned by the user and should not be freed here body_out_str: ?*MutableString = null, compressed_body: MutableString = undefined, content_length: ?usize = null, @@ -1037,7 +1048,6 @@ pub const InternalState = struct { fail: anyerror = error.NoError, request_stage: HTTPStage = .pending, response_stage: HTTPStage = .pending, - metadata_sent: bool = false, pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState { return .{ @@ -1047,7 +1057,7 @@ pub const InternalState = struct { .response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} }, .body_out_str = body_out_str, .stage = Stage.pending, - .pending_response = picohttp.Response{}, + .pending_response = null, }; } @@ -1055,7 +1065,7 @@ pub const InternalState = struct { return this.transfer_encoding == Encoding.chunked; } - pub fn reset(this: *InternalState) void { + pub fn reset(this: *InternalState, allocator: std.mem.Allocator) void { this.compressed_body.deinit(); this.response_message_buffer.deinit(); @@ -1066,6 +1076,15 @@ pub const InternalState = struct { reader.deinit(); } + // if we are holding a cloned_metadata we need to deinit it + // this should never happen because we should always return the metadata to the user + std.debug.assert(this.cloned_metadata == null); + // just in case we check and free to avoid leaks + if (this.cloned_metadata != null) { + this.cloned_metadata.?.deinit(allocator); + this.cloned_metadata = null; + } + this.* = .{ .body_out_str = body_msg, .compressed_body = MutableString{ .allocator = default_allocator, .list = .{} }, @@ -1175,23 +1194,6 @@ pub const InternalState = struct { }, } - return this.postProcessBody(); - } - - pub fn postProcessBody(this: *InternalState) usize { - - // we only touch it if we did not sent the headers yet - if (!this.metadata_sent) { - var response = &this.pending_response; - if (this.content_encoding_i < response.headers.len) { - // if it compressed with this header, it is no longer - var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len }; - _ = mutable_headers.orderedRemove(this.content_encoding_i); - response.headers = mutable_headers.items; - this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i)); - } - } - return this.body_out_str.?.list.items.len; } }; @@ -1226,7 +1228,6 @@ force_last_modified: bool = false, if_modified_since: string = "", request_content_len_buf: ["-4294967295".len]u8 = undefined, -cloned_metadata: HTTPResponseMetadata = .{}, http_proxy: ?URL = null, proxy_authorization: ?[]u8 = null, proxy_tunneling: bool = false, @@ -1863,6 +1864,7 @@ pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { } pub fn doRedirect(this: *HTTPClient) void { + std.debug.assert(this.state.cloned_metadata == null); var body_out_str = this.state.body_out_str.?; this.remaining_redirect_count -|= 1; std.debug.assert(this.redirect_type == FetchRedirect.follow); @@ -1871,7 +1873,7 @@ pub fn doRedirect(this: *HTTPClient) void { this.fail(error.TooManyRedirects); return; } - this.state.reset(); + this.state.reset(this.allocator); // also reset proxy to redirect this.proxy_tunneling = false; if (this.proxy_tunnel != null) { @@ -1930,10 +1932,18 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void { const Task = ThreadPool.Task; -const HTTPResponseMetadata = struct { +pub const HTTPResponseMetadata = struct { url: []const u8 = "", owned_buf: []u8 = "", response: picohttp.Response = .{}, + pub fn deinit(this: *HTTPResponseMetadata, allocator: std.mem.Allocator) void { + if (this.owned_buf.len > 0) allocator.free(this.owned_buf); + if (this.response.headers.len > 0) allocator.free(this.response.headers); + this.owned_buf = &.{}; + this.url = ""; + this.response.headers = &.{}; + this.response.status = ""; + } }; fn printRequest(request: picohttp.Request) void { @@ -2269,7 +2279,7 @@ fn retryProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP this.startProxySendHeaders(is_ssl, socket); } fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - this.state.reset(); + this.state.reset(this.allocator); this.state.response_stage = .proxy_handshake; this.state.request_stage = .proxy_handshake; const proxy = ProxyTunnel.init(is_ssl, this, socket); @@ -2309,9 +2319,10 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u needs_move = false; } + // we reset the pending_response each time wich means that on parse error this will be always be empty this.state.pending_response = picohttp.Response{}; - const response = picohttp.Response.parseParts( + var response = picohttp.Response.parseParts( to_read, &shared_response_headers_buf, &amount_read, @@ -2336,13 +2347,14 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u return; }; + // we save the successful parsed response this.state.pending_response = response; var body_buf = to_read[@min(@as(usize, @intCast(response.bytes_read)), to_read.len)..]; var deferred_redirect: ?*URLBufferPool.Node = null; const can_continue = this.handleResponseMetadata( - response, + &response, // If there are multiple consecutive redirects // and the redirect differs in hostname // the new URL buffer may point to invalid memory after @@ -2378,9 +2390,20 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u return; }; - this.cloneMetadata(); + if (this.state.content_encoding_i < response.headers.len) { + // if it compressed with this header, it is no longer because we will decompress it + var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len }; + _ = mutable_headers.orderedRemove(this.state.content_encoding_i); + response.headers = mutable_headers.items; + this.state.content_encoding_i = std.math.maxInt(@TypeOf(this.state.content_encoding_i)); + // we need to reset the pending response because we removed a header + this.state.pending_response = response; + } if (!can_continue) { + // this means that the request ended + // clone metadata and return the progress at this point + this.cloneMetadata(); // if is chuncked but no body is expected we mark the last chunk this.state.received_last_chunk = true; // if is not we ignore the content_length @@ -2390,10 +2413,14 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } if (this.proxy_tunneling and this.proxy_tunnel == null) { + // we are proxing we dont need to cloneMetadata yet this.startProxyHandshake(is_ssl, socket); return; } + // we have body data incoming so we clone metadata and keep going + this.cloneMetadata(); + if (body_buf.len == 0) { // no body data yet, but we can report the headers if (this.signals.get(.header_progress)) { @@ -2537,7 +2564,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u return; }, else => { - this.state.pending_response = .{}; + this.state.pending_response = null; this.closeAndFail(error.UnexpectedData, is_ssl, socket); return; }, @@ -2558,8 +2585,8 @@ fn fail(this: *HTTPClient, err: anyerror) void { this.state.stage = .fail; const callback = this.result_callback; - const result = this.toResult(this.cloned_metadata); - this.state.reset(); + const result = this.toResult(); + this.state.reset(this.allocator); this.proxy_tunneling = false; callback.run(result); @@ -2567,22 +2594,38 @@ fn fail(this: *HTTPClient, err: anyerror) void { // We have to clone metadata immediately after use fn cloneMetadata(this: *HTTPClient) void { - var builder_ = StringBuilder{}; - var builder = &builder_; - this.state.pending_response.count(builder); - builder.count(this.url.href); - builder.allocate(this.allocator) catch unreachable; - var headers_buf = this.allocator.alloc(picohttp.Header, this.state.pending_response.headers.len) catch unreachable; - const response = this.state.pending_response.clone(headers_buf, builder); - - this.state.pending_response = response; - - const href = builder.append(this.url.href); - this.cloned_metadata = .{ - .owned_buf = builder.ptr.?[0..builder.cap], - .response = response, - .url = href, - }; + std.debug.assert(this.state.pending_response != null); + if (this.state.pending_response) |response| { + // we should never clone metadata twice + std.debug.assert(this.state.cloned_metadata == null); + // just in case we check and free + if (this.state.cloned_metadata != null) { + this.state.cloned_metadata.?.deinit(this.allocator); + this.state.cloned_metadata = null; + } + var builder_ = StringBuilder{}; + var builder = &builder_; + response.count(builder); + builder.count(this.url.href); + builder.allocate(this.allocator) catch unreachable; + // headers_buf is owned by the cloned_response (aka cloned_response.headers) + var headers_buf = this.allocator.alloc(picohttp.Header, response.headers.len) catch unreachable; + const cloned_response = response.clone(headers_buf, builder); + + // we clean the temporary response since cloned_metadata is now the owner + this.state.pending_response = null; + + const href = builder.append(this.url.href); + this.state.cloned_metadata = .{ + .owned_buf = builder.ptr.?[0..builder.cap], + .response = cloned_response, + .url = href, + }; + } else { + // we should never clone metadata that dont exists + // we added a empty metadata just in case but will hit the std.debug.assert + this.state.cloned_metadata = .{}; + } } pub fn setTimeout(this: *HTTPClient, socket: anytype, amount: c_uint) void { @@ -2606,8 +2649,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon var out_str = this.state.body_out_str.?; var body = out_str.*; - this.cloned_metadata.response = this.state.pending_response; - const result = this.toResult(this.cloned_metadata); + const result = this.toResult(); const callback = this.result_callback; if (is_done) { @@ -2623,7 +2665,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon socket.close(0, null); } - this.state.reset(); + this.state.reset(this.allocator); this.state.response_stage = .done; this.state.request_stage = .done; this.state.stage = .done; @@ -2649,8 +2691,8 @@ pub const HTTPClientResult = struct { body: ?*MutableString = null, has_more: bool = false, fail: anyerror = error.NoError, - - metadata: ?ResultMetadata = null, + /// Owns the response metadata aka headers, url and status code + metadata: ?HTTPResponseMetadata = null, /// For Http Client requests /// when Content-Length is provided this represents the whole size of the request @@ -2659,23 +2701,6 @@ pub const HTTPClientResult = struct { body_size: BodySize = .unknown, redirected: bool = false, - pub const ResultMetadata = struct { - response: picohttp.Response = .{}, - metadata_buf: []u8 = &.{}, - href: []const u8 = "", - headers_buf: []picohttp.Header = &.{}, - - pub fn deinit(this: *ResultMetadata, allocator: std.mem.Allocator) void { - if (this.metadata_buf.len > 0) allocator.free(this.metadata_buf); - if (this.headers_buf.len > 0) allocator.free(this.headers_buf); - this.headers_buf = &.{}; - this.metadata_buf = &.{}; - this.href = ""; - this.response.headers = &.{}; - this.response.status = ""; - } - }; - pub const BodySize = union(enum) { total_received: usize, content_length: usize, @@ -2722,22 +2747,18 @@ pub const HTTPClientResult = struct { }; }; -pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult { +pub fn toResult(this: *HTTPClient) HTTPClientResult { const body_size: HTTPClientResult.BodySize = if (this.state.isChunkedEncoding()) .{ .total_received = this.state.total_body_received } else if (this.state.content_length) |content_length| .{ .content_length = content_length } else .{ .unknown = {} }; - if (!this.state.metadata_sent) { - this.state.metadata_sent = true; + if (this.state.cloned_metadata) |metadata| { + // transfer owner ship of the metadata here + this.state.cloned_metadata = null; return HTTPClientResult{ - .metadata = .{ - .response = metadata.response, - .metadata_buf = metadata.owned_buf, - .href = metadata.url, - .headers_buf = metadata.response.headers, - }, + .metadata = metadata, .body = this.state.body_out_str, .redirected = this.remaining_redirect_count != default_redirect_count, .fail = this.state.fail, @@ -2804,8 +2825,6 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const progress.setCompletedItems(incoming_data.len); progress.context.maybeRefresh(); } - - _ = this.state.postProcessBody(); } fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool { @@ -3002,7 +3021,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket( pub fn handleResponseMetadata( this: *HTTPClient, - response: picohttp.Response, + response: *picohttp.Response, deferred_redirect: *?*URLBufferPool.Node, ) !bool { var location: string = ""; @@ -3015,7 +3034,7 @@ pub fn handleResponseMetadata( this.state.content_length = content_length; } else { // ignore body size for HEAD requests - this.state.content_length = content_length; + this.state.content_length = 0; } }, hashHeaderConst("Content-Encoding") => { @@ -3059,16 +3078,15 @@ pub fn handleResponseMetadata( } if (this.verbose) { - printResponse(response); + printResponse(response.*); } - this.state.pending_response = response; if (pretend_304) { - this.state.pending_response.status_code = 304; + response.status_code = 304; } if (this.proxy_tunneling and this.proxy_tunnel == null) { - if (this.state.pending_response.status_code == 200) { + if (response.status_code == 200) { //signal to continue the proxing return true; } @@ -3077,10 +3095,10 @@ pub fn handleResponseMetadata( this.proxy_tunneling = false; } - const is_redirect = this.state.pending_response.status_code >= 300 and this.state.pending_response.status_code <= 399; + const is_redirect = response.status_code >= 300 and response.status_code <= 399; if (is_redirect) { if (this.redirect_type == FetchRedirect.follow and location.len > 0 and this.remaining_redirect_count > 0) { - switch (this.state.pending_response.status_code) { + switch (response.status_code) { 302, 301, 307, 308, 303 => { if (strings.indexOf(location, "://")) |i| { var url_buf = URLBufferPool.get(default_allocator); |