aboutsummaryrefslogtreecommitdiff
path: root/src/http_client_async.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/http_client_async.zig')
-rw-r--r--src/http_client_async.zig51
1 files changed, 36 insertions, 15 deletions
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 15e29f345..65ffbee62 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -68,6 +68,10 @@ pub const Signals = struct {
aborted: ?*std.atomic.Atomic(bool) = null,
cert_errors: ?*std.atomic.Atomic(bool) = null,
+ pub fn isEmpty(this: *const Signals) bool {
+ return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null;
+ }
+
pub const Store = struct {
header_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
body_streaming: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
@@ -1339,7 +1343,7 @@ pub const InternalState = struct {
return this.transfer_encoding == Encoding.chunked;
}
- pub fn reset(this: *InternalState, allocator: std.mem.Allocator) void {
+ pub fn reset(this: *InternalState, buffering: bool, allocator: std.mem.Allocator) void {
this.compressed_body.deinit();
this.response_message_buffer.deinit();
@@ -1350,9 +1354,12 @@ pub const InternalState = struct {
reader.deinit();
}
- // if we are holding a cloned_metadata we need to deinit it
- // this should never happen because we should always return the metadata to the user
- std.debug.assert(this.cloned_metadata == null);
+ if (!buffering) {
+ // if we are holding a cloned_metadata we need to deinit it
+ // this should never happen because we should always return the metadata to the user
+ std.debug.assert(this.cloned_metadata == null);
+ }
+
// just in case we check and free to avoid leaks
if (this.cloned_metadata != null) {
this.cloned_metadata.?.deinit(allocator);
@@ -2179,7 +2186,7 @@ pub fn doRedirect(this: *HTTPClient) void {
this.fail(error.TooManyRedirects);
return;
}
- this.state.reset(this.allocator);
+ this.state.reset(this.signals.isEmpty(), this.allocator);
// also reset proxy to redirect
this.proxy_tunneling = false;
if (this.proxy_tunnel != null) {
@@ -2673,7 +2680,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
var deferred_redirect: ?*URLBufferPool.Node = null;
- const can_continue = this.handleResponseMetadata(
+ const should_continue = this.handleResponseMetadata(
&response,
// If there are multiple consecutive redirects
// and the redirect differs in hostname
@@ -2720,8 +2727,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
this.state.pending_response = response;
}
- if (!can_continue) {
- log("onData: can_continue is false", .{});
+ if (should_continue == .finished) {
// this means that the request ended
// clone metadata and return the progress at this point
this.cloneMetadata();
@@ -2901,7 +2907,7 @@ fn fail(this: *HTTPClient, err: anyerror) void {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
- this.state.reset(this.allocator);
+ this.state.reset(this.signals.isEmpty(), this.allocator);
this.proxy_tunneling = false;
this.state.request_stage = .fail;
@@ -2987,7 +2993,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
socket.close(0, null);
}
- this.state.reset(this.allocator);
+ this.state.reset(this.signals.isEmpty(), this.allocator);
this.state.response_stage = .done;
this.state.request_stage = .done;
this.state.stage = .done;
@@ -3355,13 +3361,19 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
unreachable;
}
+const ShouldContinue = enum {
+ continue_streaming,
+ finished,
+};
+
pub fn handleResponseMetadata(
this: *HTTPClient,
response: *picohttp.Response,
deferred_redirect: *?*URLBufferPool.Node,
-) !bool {
+) !ShouldContinue {
var location: string = "";
var pretend_304 = false;
+ var is_server_sent_events = false;
for (response.headers, 0..) |header, header_i| {
switch (hashHeaderName(header.name)) {
hashHeaderConst("Content-Length") => {
@@ -3373,6 +3385,11 @@ pub fn handleResponseMetadata(
this.state.content_length = 0;
}
},
+ hashHeaderConst("Content-Type") => {
+ if (strings.contains(header.value, "text/event-stream")) {
+ is_server_sent_events = true;
+ }
+ },
hashHeaderConst("Content-Encoding") => {
if (!this.disable_decompression) {
if (strings.eqlComptime(header.value, "gzip")) {
@@ -3429,8 +3446,8 @@ pub fn handleResponseMetadata(
if (this.proxy_tunneling and this.proxy_tunnel == null) {
if (response.status_code == 200) {
- //signal to continue the proxing
- return true;
+ // signal to continue the proxing
+ return ShouldContinue.finished;
}
//proxy denied connection so return proxy result (407, 403 etc)
@@ -3623,6 +3640,10 @@ pub fn handleResponseMetadata(
} else {
log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding});
}
- // if no body is expected we should stop processing
- return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked);
+
+ if (this.method.hasBody() and ((content_length != null and content_length.? > 0) or !this.state.allow_keepalive or this.state.transfer_encoding == .chunked or is_server_sent_events)) {
+ return ShouldContinue.continue_streaming;
+ } else {
+ return ShouldContinue.finished;
+ }
}