aboutsummaryrefslogtreecommitdiff
path: root/src/http_client_async.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r--src/http_client_async.zig194
1 files changed, 106 insertions, 88 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 3b1982d9a..4f7647b4e 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -1018,7 +1018,17 @@ pub const HTTPStage = enum {
pub const InternalState = struct {
response_message_buffer: MutableString = undefined,
- pending_response: picohttp.Response = undefined,
+ /// pending response is the temporary storage for the response headers, url and status code
+ /// this uses shared_response_headers_buf to store the headers
+ /// this will be turned null once the metadata is cloned
+ pending_response: ?picohttp.Response = null,
+
+ /// This is the cloned metadata containing the response headers, url and status code after the .headers phase are received
+ /// will be turned null once returned to the user (the ownership is transferred to the user)
+ /// this can happen after await fetch(...) and the body can continue streaming when this is already null
+ /// the user will receive only chunks of the body stored in body_out_str
+ cloned_metadata: ?HTTPResponseMetadata = null,
+
allow_keepalive: bool = true,
received_last_chunk: bool = false,
transfer_encoding: Encoding = Encoding.identity,
@@ -1027,6 +1037,7 @@ pub const InternalState = struct {
chunked_decoder: picohttp.phr_chunked_decoder = .{},
zlib_reader: ?*Zlib.ZlibReaderArrayList = null,
stage: Stage = Stage.pending,
+ /// This is owned by the user and should not be freed here
body_out_str: ?*MutableString = null,
compressed_body: MutableString = undefined,
content_length: ?usize = null,
@@ -1037,7 +1048,6 @@ pub const InternalState = struct {
fail: anyerror = error.NoError,
request_stage: HTTPStage = .pending,
response_stage: HTTPStage = .pending,
- metadata_sent: bool = false,
pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
return .{
@@ -1047,7 +1057,7 @@ pub const InternalState = struct {
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
.body_out_str = body_out_str,
.stage = Stage.pending,
- .pending_response = picohttp.Response{},
+ .pending_response = null,
};
}
@@ -1055,7 +1065,7 @@ pub const InternalState = struct {
return this.transfer_encoding == Encoding.chunked;
}
- pub fn reset(this: *InternalState) void {
+ pub fn reset(this: *InternalState, allocator: std.mem.Allocator) void {
this.compressed_body.deinit();
this.response_message_buffer.deinit();
@@ -1066,6 +1076,15 @@ pub const InternalState = struct {
reader.deinit();
}
+ // if we are holding a cloned_metadata we need to deinit it
+ // this should never happen because we should always return the metadata to the user
+ std.debug.assert(this.cloned_metadata == null);
+ // just in case we check and free to avoid leaks
+ if (this.cloned_metadata != null) {
+ this.cloned_metadata.?.deinit(allocator);
+ this.cloned_metadata = null;
+ }
+
this.* = .{
.body_out_str = body_msg,
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
@@ -1175,23 +1194,6 @@ pub const InternalState = struct {
},
}
- return this.postProcessBody();
- }
-
- pub fn postProcessBody(this: *InternalState) usize {
-
- // we only touch it if we did not sent the headers yet
- if (!this.metadata_sent) {
- var response = &this.pending_response;
- if (this.content_encoding_i < response.headers.len) {
- // if it compressed with this header, it is no longer
- var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
- _ = mutable_headers.orderedRemove(this.content_encoding_i);
- response.headers = mutable_headers.items;
- this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i));
- }
- }
-
return this.body_out_str.?.list.items.len;
}
};
@@ -1226,7 +1228,6 @@ force_last_modified: bool = false,
if_modified_since: string = "",
request_content_len_buf: ["-4294967295".len]u8 = undefined,
-cloned_metadata: HTTPResponseMetadata = .{},
http_proxy: ?URL = null,
proxy_authorization: ?[]u8 = null,
proxy_tunneling: bool = false,
@@ -1863,6 +1864,7 @@ pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
}
pub fn doRedirect(this: *HTTPClient) void {
+ std.debug.assert(this.state.cloned_metadata == null);
var body_out_str = this.state.body_out_str.?;
this.remaining_redirect_count -|= 1;
std.debug.assert(this.redirect_type == FetchRedirect.follow);
@@ -1871,7 +1873,7 @@ pub fn doRedirect(this: *HTTPClient) void {
this.fail(error.TooManyRedirects);
return;
}
- this.state.reset();
+ this.state.reset(this.allocator);
// also reset proxy to redirect
this.proxy_tunneling = false;
if (this.proxy_tunnel != null) {
@@ -1930,10 +1932,18 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
const Task = ThreadPool.Task;
-const HTTPResponseMetadata = struct {
+pub const HTTPResponseMetadata = struct {
url: []const u8 = "",
owned_buf: []u8 = "",
response: picohttp.Response = .{},
+ pub fn deinit(this: *HTTPResponseMetadata, allocator: std.mem.Allocator) void {
+ if (this.owned_buf.len > 0) allocator.free(this.owned_buf);
+ if (this.response.headers.len > 0) allocator.free(this.response.headers);
+ this.owned_buf = &.{};
+ this.url = "";
+ this.response.headers = &.{};
+ this.response.status = "";
+ }
};
fn printRequest(request: picohttp.Request) void {
@@ -2269,7 +2279,7 @@ fn retryProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP
this.startProxySendHeaders(is_ssl, socket);
}
fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- this.state.reset();
+ this.state.reset(this.allocator);
this.state.response_stage = .proxy_handshake;
this.state.request_stage = .proxy_handshake;
const proxy = ProxyTunnel.init(is_ssl, this, socket);
@@ -2309,9 +2319,10 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
needs_move = false;
}
+ // we reset the pending_response each time wich means that on parse error this will be always be empty
this.state.pending_response = picohttp.Response{};
- const response = picohttp.Response.parseParts(
+ var response = picohttp.Response.parseParts(
to_read,
&shared_response_headers_buf,
&amount_read,
@@ -2336,13 +2347,14 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
return;
};
+ // we save the successful parsed response
this.state.pending_response = response;
var body_buf = to_read[@min(@as(usize, @intCast(response.bytes_read)), to_read.len)..];
var deferred_redirect: ?*URLBufferPool.Node = null;
const can_continue = this.handleResponseMetadata(
- response,
+ &response,
// If there are multiple consecutive redirects
// and the redirect differs in hostname
// the new URL buffer may point to invalid memory after
@@ -2378,9 +2390,20 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
return;
};
- this.cloneMetadata();
+ if (this.state.content_encoding_i < response.headers.len) {
+ // if it compressed with this header, it is no longer because we will decompress it
+ var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
+ _ = mutable_headers.orderedRemove(this.state.content_encoding_i);
+ response.headers = mutable_headers.items;
+ this.state.content_encoding_i = std.math.maxInt(@TypeOf(this.state.content_encoding_i));
+ // we need to reset the pending response because we removed a header
+ this.state.pending_response = response;
+ }
if (!can_continue) {
+ // this means that the request ended
+ // clone metadata and return the progress at this point
+ this.cloneMetadata();
// if is chuncked but no body is expected we mark the last chunk
this.state.received_last_chunk = true;
// if is not we ignore the content_length
@@ -2390,10 +2413,14 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
if (this.proxy_tunneling and this.proxy_tunnel == null) {
+ // we are proxing we dont need to cloneMetadata yet
this.startProxyHandshake(is_ssl, socket);
return;
}
+ // we have body data incoming so we clone metadata and keep going
+ this.cloneMetadata();
+
if (body_buf.len == 0) {
// no body data yet, but we can report the headers
if (this.signals.get(.header_progress)) {
@@ -2537,7 +2564,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
return;
},
else => {
- this.state.pending_response = .{};
+ this.state.pending_response = null;
this.closeAndFail(error.UnexpectedData, is_ssl, socket);
return;
},
@@ -2558,8 +2585,8 @@ fn fail(this: *HTTPClient, err: anyerror) void {
this.state.stage = .fail;
const callback = this.result_callback;
- const result = this.toResult(this.cloned_metadata);
- this.state.reset();
+ const result = this.toResult();
+ this.state.reset(this.allocator);
this.proxy_tunneling = false;
callback.run(result);
@@ -2567,22 +2594,38 @@ fn fail(this: *HTTPClient, err: anyerror) void {
// We have to clone metadata immediately after use
fn cloneMetadata(this: *HTTPClient) void {
- var builder_ = StringBuilder{};
- var builder = &builder_;
- this.state.pending_response.count(builder);
- builder.count(this.url.href);
- builder.allocate(this.allocator) catch unreachable;
- var headers_buf = this.allocator.alloc(picohttp.Header, this.state.pending_response.headers.len) catch unreachable;
- const response = this.state.pending_response.clone(headers_buf, builder);
-
- this.state.pending_response = response;
-
- const href = builder.append(this.url.href);
- this.cloned_metadata = .{
- .owned_buf = builder.ptr.?[0..builder.cap],
- .response = response,
- .url = href,
- };
+ std.debug.assert(this.state.pending_response != null);
+ if (this.state.pending_response) |response| {
+ // we should never clone metadata twice
+ std.debug.assert(this.state.cloned_metadata == null);
+ // just in case we check and free
+ if (this.state.cloned_metadata != null) {
+ this.state.cloned_metadata.?.deinit(this.allocator);
+ this.state.cloned_metadata = null;
+ }
+ var builder_ = StringBuilder{};
+ var builder = &builder_;
+ response.count(builder);
+ builder.count(this.url.href);
+ builder.allocate(this.allocator) catch unreachable;
+ // headers_buf is owned by the cloned_response (aka cloned_response.headers)
+ var headers_buf = this.allocator.alloc(picohttp.Header, response.headers.len) catch unreachable;
+ const cloned_response = response.clone(headers_buf, builder);
+
+ // we clean the temporary response since cloned_metadata is now the owner
+ this.state.pending_response = null;
+
+ const href = builder.append(this.url.href);
+ this.state.cloned_metadata = .{
+ .owned_buf = builder.ptr.?[0..builder.cap],
+ .response = cloned_response,
+ .url = href,
+ };
+ } else {
+ // we should never clone metadata that dont exists
+ // we added a empty metadata just in case but will hit the std.debug.assert
+ this.state.cloned_metadata = .{};
+ }
}
pub fn setTimeout(this: *HTTPClient, socket: anytype, amount: c_uint) void {
@@ -2606,8 +2649,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
var out_str = this.state.body_out_str.?;
var body = out_str.*;
- this.cloned_metadata.response = this.state.pending_response;
- const result = this.toResult(this.cloned_metadata);
+ const result = this.toResult();
const callback = this.result_callback;
if (is_done) {
@@ -2623,7 +2665,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
socket.close(0, null);
}
- this.state.reset();
+ this.state.reset(this.allocator);
this.state.response_stage = .done;
this.state.request_stage = .done;
this.state.stage = .done;
@@ -2649,8 +2691,8 @@ pub const HTTPClientResult = struct {
body: ?*MutableString = null,
has_more: bool = false,
fail: anyerror = error.NoError,
-
- metadata: ?ResultMetadata = null,
+ /// Owns the response metadata aka headers, url and status code
+ metadata: ?HTTPResponseMetadata = null,
/// For Http Client requests
/// when Content-Length is provided this represents the whole size of the request
@@ -2659,23 +2701,6 @@ pub const HTTPClientResult = struct {
body_size: BodySize = .unknown,
redirected: bool = false,
- pub const ResultMetadata = struct {
- response: picohttp.Response = .{},
- metadata_buf: []u8 = &.{},
- href: []const u8 = "",
- headers_buf: []picohttp.Header = &.{},
-
- pub fn deinit(this: *ResultMetadata, allocator: std.mem.Allocator) void {
- if (this.metadata_buf.len > 0) allocator.free(this.metadata_buf);
- if (this.headers_buf.len > 0) allocator.free(this.headers_buf);
- this.headers_buf = &.{};
- this.metadata_buf = &.{};
- this.href = "";
- this.response.headers = &.{};
- this.response.status = "";
- }
- };
-
pub const BodySize = union(enum) {
total_received: usize,
content_length: usize,
@@ -2722,22 +2747,18 @@ pub const HTTPClientResult = struct {
};
};
-pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult {
+pub fn toResult(this: *HTTPClient) HTTPClientResult {
const body_size: HTTPClientResult.BodySize = if (this.state.isChunkedEncoding())
.{ .total_received = this.state.total_body_received }
else if (this.state.content_length) |content_length|
.{ .content_length = content_length }
else
.{ .unknown = {} };
- if (!this.state.metadata_sent) {
- this.state.metadata_sent = true;
+ if (this.state.cloned_metadata) |metadata| {
+ // transfer owner ship of the metadata here
+ this.state.cloned_metadata = null;
return HTTPClientResult{
- .metadata = .{
- .response = metadata.response,
- .metadata_buf = metadata.owned_buf,
- .href = metadata.url,
- .headers_buf = metadata.response.headers,
- },
+ .metadata = metadata,
.body = this.state.body_out_str,
.redirected = this.remaining_redirect_count != default_redirect_count,
.fail = this.state.fail,
@@ -2804,8 +2825,6 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
progress.setCompletedItems(incoming_data.len);
progress.context.maybeRefresh();
}
-
- _ = this.state.postProcessBody();
}
fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
@@ -3002,7 +3021,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
pub fn handleResponseMetadata(
this: *HTTPClient,
- response: picohttp.Response,
+ response: *picohttp.Response,
deferred_redirect: *?*URLBufferPool.Node,
) !bool {
var location: string = "";
@@ -3015,7 +3034,7 @@ pub fn handleResponseMetadata(
this.state.content_length = content_length;
} else {
// ignore body size for HEAD requests
- this.state.content_length = content_length;
+ this.state.content_length = 0;
}
},
hashHeaderConst("Content-Encoding") => {
@@ -3059,16 +3078,15 @@ pub fn handleResponseMetadata(
}
if (this.verbose) {
- printResponse(response);
+ printResponse(response.*);
}
- this.state.pending_response = response;
if (pretend_304) {
- this.state.pending_response.status_code = 304;
+ response.status_code = 304;
}
if (this.proxy_tunneling and this.proxy_tunnel == null) {
- if (this.state.pending_response.status_code == 200) {
+ if (response.status_code == 200) {
//signal to continue the proxing
return true;
}
@@ -3077,10 +3095,10 @@ pub fn handleResponseMetadata(
this.proxy_tunneling = false;
}
- const is_redirect = this.state.pending_response.status_code >= 300 and this.state.pending_response.status_code <= 399;
+ const is_redirect = response.status_code >= 300 and response.status_code <= 399;
if (is_redirect) {
if (this.redirect_type == FetchRedirect.follow and location.len > 0 and this.remaining_redirect_count > 0) {
- switch (this.state.pending_response.status_code) {
+ switch (response.status_code) {
302, 301, 307, 308, 303 => {
if (strings.indexOf(location, "://")) |i| {
var url_buf = URLBufferPool.get(default_allocator);