aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Ciro Spaciari <ciro.spaciari@gmail.com> 2023-09-26 23:31:20 -0300
committerGravatar GitHub <noreply@github.com> 2023-09-26 19:31:20 -0700
commit648d5aecf3980997c0672746374068033e632e1d (patch)
tree980fb11edcda91ad6d92a68c7f7525d4d8b28a10
parentdc55492698326a668d730970f16e61728b83bb1a (diff)
downloadbun-648d5aecf3980997c0672746374068033e632e1d.tar.gz
bun-648d5aecf3980997c0672746374068033e632e1d.tar.zst
bun-648d5aecf3980997c0672746374068033e632e1d.zip
fix server end of stream, fix fetch not streaming without content-length or chunked encoding, fix case when stream do not return a promise on pull (#6086)
-rw-r--r--packages/bun-uws/src/HttpResponse.h18
-rw-r--r--src/bun.js/api/server.zig20
-rw-r--r--src/bun.js/webcore/streams.zig1
-rw-r--r--src/deps/libuwsockets.cpp4
-rw-r--r--src/http_client_async.zig91
-rw-r--r--test/js/web/fetch/body-stream.test.ts29
6 files changed, 131 insertions, 32 deletions
diff --git a/packages/bun-uws/src/HttpResponse.h b/packages/bun-uws/src/HttpResponse.h
index d73f98152..11aad7bc0 100644
--- a/packages/bun-uws/src/HttpResponse.h
+++ b/packages/bun-uws/src/HttpResponse.h
@@ -439,6 +439,24 @@ public:
return {internalEnd(data, totalSize, true, true, closeConnection), hasResponded()};
}
+ /* Write the end of chunked encoded stream */
+ bool sendTerminatingChunk(bool closeConnection = false) {
+ writeStatus(HTTP_200_OK);
+ HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
+ if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
+ /* Write mark on first call to write */
+ writeMark();
+
+ writeHeader("Transfer-Encoding", "chunked");
+ httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
+ }
+
+ /* Terminating 0 chunk */
+ Super::write("\r\n0\r\n\r\n", 7);
+
+ return internalEnd({nullptr, 0}, 0, false, false, closeConnection);
+ }
+
/* Write parts of the response in chunking fashion. Starts timeout if failed. */
bool write(std::string_view data) {
writeStatus(HTTP_200_OK);
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 14e636a2b..685267bfd 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1443,6 +1443,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn endStream(this: *RequestContext, closeConnection: bool) void {
+ ctxLog("endStream", .{});
if (this.resp) |resp| {
if (this.flags.is_waiting_for_request_body) {
this.flags.is_waiting_for_request_body = false;
@@ -1729,6 +1730,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this: *RequestContext,
headers: *JSC.FetchHeaders,
) void {
+ ctxLog("writeHeaders", .{});
headers.fastRemove(.ContentLength);
headers.fastRemove(.TransferEncoding);
if (!ssl_enabled) headers.fastRemove(.StrictTransportSecurity);
@@ -2100,6 +2102,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
fn doRenderStream(pair: *StreamPair) void {
+ ctxLog("doRenderStream", .{});
var this = pair.this;
var stream = pair.stream;
if (this.resp == null or this.flags.aborted) {
@@ -2223,6 +2226,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
}
return;
+ } else {
+ // if is not a promise we treat it as Error
+ streamLog("returned an error", .{});
+ if (!this.flags.aborted) resp.clearAborted();
+ response_stream.detach();
+ this.sink = null;
+ response_stream.sink.destroy();
+ return this.handleReject(assignment_result);
}
}
@@ -2232,6 +2243,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
defer stream.value.unprotect();
response_stream.sink.markDone();
this.finalizeForAbort();
+ response_stream.sink.onFirstWrite = null;
response_stream.sink.finalize();
return;
@@ -2255,7 +2267,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.setAbortHandler();
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
- this.finalize();
+ response_stream.sink.onFirstWrite = null;
+ response_stream.sink.ctx = null;
+ response_stream.detach();
+ stream.cancel(globalThis);
+ response_stream.sink.markDone();
+ this.renderMissing();
}
const streamLog = Output.scoped(.ReadableStream, false);
@@ -2455,7 +2472,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
streamLog("onResolve({any})", .{wrote_anything});
-
//aborted so call finalizeForAbort
if (req.flags.aborted or req.resp == null) {
req.finalizeForAbort();
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index b96721e6e..2e6bbdce2 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -2634,6 +2634,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) {
+ log("flushFromJSNoWait", .{});
if (this.hasBackpressure() or this.done) {
return .{ .result = JSValue.jsNumberFromInt32(0) };
}
diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp
index 67778c3e3..da38bcebb 100644
--- a/src/deps/libuwsockets.cpp
+++ b/src/deps/libuwsockets.cpp
@@ -1040,14 +1040,14 @@ extern "C"
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
uwsRes->getHttpResponseData()->onWritable = nullptr;
uwsRes->onAborted(nullptr);
- uwsRes->endWithoutBody(std::nullopt, close_connection);
+ uwsRes->sendTerminatingChunk(close_connection);
}
else
{
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
uwsRes->getHttpResponseData()->onWritable = nullptr;
uwsRes->onAborted(nullptr);
- uwsRes->endWithoutBody(std::nullopt, close_connection);
+ uwsRes->sendTerminatingChunk(close_connection);
}
}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 1b03483a0..a8ba8d367 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -1071,17 +1071,24 @@ pub fn onClose(
const in_progress = client.state.stage != .done and client.state.stage != .fail;
- // if the peer closed after a full chunk, treat this
- // as if the transfer had complete, browsers appear to ignore
- // a missing 0\r\n chunk
- if (in_progress and client.state.isChunkedEncoding()) {
- if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
- var buf = client.state.getBodyBuffer();
- if (buf.list.items.len > 0) {
- client.state.received_last_chunk = true;
- client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
- return;
+ if (in_progress) {
+ // if the peer closed after a full chunk, treat this
+ // as if the transfer had complete, browsers appear to ignore
+ // a missing 0\r\n chunk
+ if (client.state.isChunkedEncoding()) {
+ if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
+ var buf = client.state.getBodyBuffer();
+ if (buf.list.items.len > 0) {
+ client.state.received_last_chunk = true;
+ client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
+ return;
+ }
}
+ } else if (client.state.content_length == null and client.state.response_stage == .body) {
+ // no content length informed so we are done here
+ client.state.received_last_chunk = true;
+ client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
+ return;
}
}
@@ -1121,12 +1128,31 @@ pub fn onConnectError(
pub fn onEnd(
client: *HTTPClient,
comptime is_ssl: bool,
- _: NewHTTPContext(is_ssl).HTTPSocket,
+ socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
log("onEnd {s}\n", .{client.url.href});
-
- if (client.state.stage != .done and client.state.stage != .fail)
- client.fail(error.ConnectionClosed);
+ const in_progress = client.state.stage != .done and client.state.stage != .fail;
+ if (in_progress) {
+ // if the peer closed after a full chunk, treat this
+ // as if the transfer had complete, browsers appear to ignore
+ // a missing 0\r\n chunk
+ if (client.state.isChunkedEncoding()) {
+ if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
+ var buf = client.state.getBodyBuffer();
+ if (buf.list.items.len > 0) {
+ client.state.received_last_chunk = true;
+ client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
+ return;
+ }
+ }
+ } else if (client.state.content_length == null and client.state.response_stage == .body) {
+ // no content length informed so we are done here
+ client.state.received_last_chunk = true;
+ client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
+ return;
+ }
+ }
+ client.fail(error.ConnectionClosed);
}
pub inline fn getAllocator() std.mem.Allocator {
@@ -1369,8 +1395,8 @@ pub const InternalState = struct {
return this.total_body_received >= content_length;
}
- // TODO: in future to handle Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
- return true;
+ // Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
+ return this.received_last_chunk;
}
fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void {
@@ -2695,6 +2721,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
if (!can_continue) {
+ log("onData: can_continue is false", .{});
// this means that the request ended
// clone metadata and return the progress at this point
this.cloneMetadata();
@@ -3089,10 +3116,10 @@ const preallocate_max = 1024 * 1024 * 256;
pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
std.debug.assert(this.state.transfer_encoding == .identity);
- const content_length = this.state.content_length orelse 0;
+ const content_length = this.state.content_length;
// is it exactly as much as we need?
- if (is_only_buffer and incoming_data.len >= content_length) {
- try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length]);
+ if (is_only_buffer and content_length != null and incoming_data.len >= content_length.?) {
+ try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length.?]);
return true;
} else {
return handleResponseBodyFromMultiplePackets(this, incoming_data);
@@ -3135,16 +3162,19 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
var buffer = this.state.getBodyBuffer();
- const content_length = this.state.content_length orelse 0;
+ const content_length = this.state.content_length;
- if (buffer.list.items.len == 0 and
- content_length > 0 and incoming_data.len < preallocate_max)
- {
+ if (buffer.list.items.len == 0 and incoming_data.len < preallocate_max) {
buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {};
}
- const remaining_content_length = content_length -| this.state.total_body_received;
- var remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
+ var remainder: []const u8 = undefined;
+ if (content_length != null) {
+ const remaining_content_length = content_length.? -| this.state.total_body_received;
+ remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
+ } else {
+ remainder = incoming_data;
+ }
_ = try buffer.write(remainder);
@@ -3157,7 +3187,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
}
// done or streaming
- const is_done = this.state.total_body_received >= content_length;
+ const is_done = content_length != null and this.state.total_body_received >= content_length.?;
if (is_done or this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(buffer.*);
@@ -3545,7 +3575,12 @@ pub fn handleResponseMetadata(
}
this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body;
- const content_length = this.state.content_length orelse 0;
+ const content_length = this.state.content_length;
+ if (content_length) |length| {
+ log("handleResponseMetadata: content_length is {} and transfer_encoding {}", .{ length, this.state.transfer_encoding });
+ } else {
+ log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding});
+ }
// if no body is expected we should stop processing
- return this.method.hasBody() and (content_length > 0 or this.state.transfer_encoding == .chunked);
+ return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked);
}
diff --git a/test/js/web/fetch/body-stream.test.ts b/test/js/web/fetch/body-stream.test.ts
index 8e2baf92a..8f7675528 100644
--- a/test/js/web/fetch/body-stream.test.ts
+++ b/test/js/web/fetch/body-stream.test.ts
@@ -13,6 +13,35 @@ var port = 0;
];
const useRequestObjectValues = [true, false];
+ test("Should not crash when not returning a promise when stream is in progress", async () => {
+ var called = false;
+ await runInServer(
+ {
+ async fetch() {
+ var stream = new ReadableStream({
+ type: "direct",
+ pull(controller) {
+ controller.write("hey");
+ setTimeout(() => {
+ controller.end();
+ }, 100);
+ },
+ });
+
+ return new Response(stream);
+ },
+ },
+ async url => {
+ called = true;
+ expect(await fetch(url).then(res => res.text())).toContain(
+ "Welcome to Bun! To get started, return a Response object.",
+ );
+ },
+ );
+
+ expect(called).toBe(true);
+ });
+
for (let RequestPrototypeMixin of BodyMixin) {
for (let useRequestObject of useRequestObjectValues) {
describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${