diff options
-rw-r--r-- | packages/bun-uws/src/HttpResponse.h | 18 | ||||
-rw-r--r-- | src/bun.js/api/server.zig | 20 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 1 | ||||
-rw-r--r-- | src/deps/libuwsockets.cpp | 4 | ||||
-rw-r--r-- | src/http_client_async.zig | 91 | ||||
-rw-r--r-- | test/js/web/fetch/body-stream.test.ts | 29 |
6 files changed, 131 insertions, 32 deletions
diff --git a/packages/bun-uws/src/HttpResponse.h b/packages/bun-uws/src/HttpResponse.h index d73f98152..11aad7bc0 100644 --- a/packages/bun-uws/src/HttpResponse.h +++ b/packages/bun-uws/src/HttpResponse.h @@ -439,6 +439,24 @@ public: return {internalEnd(data, totalSize, true, true, closeConnection), hasResponded()}; } + /* Write the end of chunked encoded stream */ + bool sendTerminatingChunk(bool closeConnection = false) { + writeStatus(HTTP_200_OK); + HttpResponseData<SSL> *httpResponseData = getHttpResponseData(); + if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) { + /* Write mark on first call to write */ + writeMark(); + + writeHeader("Transfer-Encoding", "chunked"); + httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED; + } + + /* Terminating 0 chunk */ + Super::write("\r\n0\r\n\r\n", 7); + + return internalEnd({nullptr, 0}, 0, false, false, closeConnection); + } + /* Write parts of the response in chunking fashion. Starts timeout if failed. */ bool write(std::string_view data) { writeStatus(HTTP_200_OK); diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 14e636a2b..685267bfd 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1443,6 +1443,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn endStream(this: *RequestContext, closeConnection: bool) void { + ctxLog("endStream", .{}); if (this.resp) |resp| { if (this.flags.is_waiting_for_request_body) { this.flags.is_waiting_for_request_body = false; @@ -1729,6 +1730,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this: *RequestContext, headers: *JSC.FetchHeaders, ) void { + ctxLog("writeHeaders", .{}); headers.fastRemove(.ContentLength); headers.fastRemove(.TransferEncoding); if (!ssl_enabled) headers.fastRemove(.StrictTransportSecurity); @@ -2100,6 +2102,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } fn doRenderStream(pair: *StreamPair) void { + ctxLog("doRenderStream", .{}); var this = pair.this; var stream = pair.stream; if (this.resp == null or this.flags.aborted) { @@ -2223,6 +2226,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, } return; + } else { + // if is not a promise we treat it as Error + streamLog("returned an error", .{}); + if (!this.flags.aborted) resp.clearAborted(); + response_stream.detach(); + this.sink = null; + response_stream.sink.destroy(); + return this.handleReject(assignment_result); } } @@ -2232,6 +2243,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp defer stream.value.unprotect(); response_stream.sink.markDone(); this.finalizeForAbort(); + response_stream.sink.onFirstWrite = null; response_stream.sink.finalize(); return; @@ -2255,7 +2267,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.setAbortHandler(); streamLog("is in progress, but did not return a Promise. Finalizing request context", .{}); - this.finalize(); + response_stream.sink.onFirstWrite = null; + response_stream.sink.ctx = null; + response_stream.detach(); + stream.cancel(globalThis); + response_stream.sink.markDone(); + this.renderMissing(); } const streamLog = Output.scoped(.ReadableStream, false); @@ -2455,7 +2472,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } streamLog("onResolve({any})", .{wrote_anything}); - //aborted so call finalizeForAbort if (req.flags.aborted or req.resp == null) { req.finalizeForAbort(); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index b96721e6e..2e6bbdce2 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2634,6 +2634,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) { + log("flushFromJSNoWait", .{}); if (this.hasBackpressure() or this.done) { return .{ .result = JSValue.jsNumberFromInt32(0) }; } diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index 67778c3e3..da38bcebb 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1040,14 +1040,14 @@ extern "C" uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res; uwsRes->getHttpResponseData()->onWritable = nullptr; uwsRes->onAborted(nullptr); - uwsRes->endWithoutBody(std::nullopt, close_connection); + uwsRes->sendTerminatingChunk(close_connection); } else { uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res; uwsRes->getHttpResponseData()->onWritable = nullptr; uwsRes->onAborted(nullptr); - uwsRes->endWithoutBody(std::nullopt, close_connection); + uwsRes->sendTerminatingChunk(close_connection); } } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 1b03483a0..a8ba8d367 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -1071,17 +1071,24 @@ pub fn onClose( const in_progress = client.state.stage != .done and client.state.stage != .fail; - // if the peer closed after a full chunk, treat this - // as if the transfer had complete, browsers appear to ignore - // a missing 0\r\n chunk - if (in_progress and client.state.isChunkedEncoding()) { - if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) { - var buf = client.state.getBodyBuffer(); - if (buf.list.items.len > 0) { - client.state.received_last_chunk = true; - client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); - return; + if (in_progress) { + // if the peer closed after a full chunk, treat this + // as if the transfer had complete, browsers appear to ignore + // a missing 0\r\n chunk + if (client.state.isChunkedEncoding()) { + if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) { + var buf = client.state.getBodyBuffer(); + if (buf.list.items.len > 0) { + client.state.received_last_chunk = true; + client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); + return; + } } + } else if (client.state.content_length == null and client.state.response_stage == .body) { + // no content length informed so we are done here + client.state.received_last_chunk = true; + client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); + return; } } @@ -1121,12 +1128,31 @@ pub fn onConnectError( pub fn onEnd( client: *HTTPClient, comptime is_ssl: bool, - _: NewHTTPContext(is_ssl).HTTPSocket, + socket: NewHTTPContext(is_ssl).HTTPSocket, ) void { log("onEnd {s}\n", .{client.url.href}); - - if (client.state.stage != .done and client.state.stage != .fail) - client.fail(error.ConnectionClosed); + const in_progress = client.state.stage != .done and client.state.stage != .fail; + if (in_progress) { + // if the peer closed after a full chunk, treat this + // as if the transfer had complete, browsers appear to ignore + // a missing 0\r\n chunk + if (client.state.isChunkedEncoding()) { + if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) { + var buf = client.state.getBodyBuffer(); + if (buf.list.items.len > 0) { + client.state.received_last_chunk = true; + client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); + return; + } + } + } else if (client.state.content_length == null and client.state.response_stage == .body) { + // no content length informed so we are done here + client.state.received_last_chunk = true; + client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); + return; + } + } + client.fail(error.ConnectionClosed); } pub inline fn getAllocator() std.mem.Allocator { @@ -1369,8 +1395,8 @@ pub const InternalState = struct { return this.total_body_received >= content_length; } - // TODO: in future to handle Content-Type: text/event-stream we should be done only when Close/End/Timeout connection - return true; + // Content-Type: text/event-stream we should be done only when Close/End/Timeout connection + return this.received_last_chunk; } fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void { @@ -2695,6 +2721,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } if (!can_continue) { + log("onData: can_continue is false", .{}); // this means that the request ended // clone metadata and return the progress at this point this.cloneMetadata(); @@ -3089,10 +3116,10 @@ const preallocate_max = 1024 * 1024 * 256; pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool { std.debug.assert(this.state.transfer_encoding == .identity); - const content_length = this.state.content_length orelse 0; + const content_length = this.state.content_length; // is it exactly as much as we need? - if (is_only_buffer and incoming_data.len >= content_length) { - try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length]); + if (is_only_buffer and content_length != null and incoming_data.len >= content_length.?) { + try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length.?]); return true; } else { return handleResponseBodyFromMultiplePackets(this, incoming_data); @@ -3135,16 +3162,19 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool { var buffer = this.state.getBodyBuffer(); - const content_length = this.state.content_length orelse 0; + const content_length = this.state.content_length; - if (buffer.list.items.len == 0 and - content_length > 0 and incoming_data.len < preallocate_max) - { + if (buffer.list.items.len == 0 and incoming_data.len < preallocate_max) { buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {}; } - const remaining_content_length = content_length -| this.state.total_body_received; - var remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)]; + var remainder: []const u8 = undefined; + if (content_length != null) { + const remaining_content_length = content_length.? -| this.state.total_body_received; + remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)]; + } else { + remainder = incoming_data; + } _ = try buffer.write(remainder); @@ -3157,7 +3187,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con } // done or streaming - const is_done = this.state.total_body_received >= content_length; + const is_done = content_length != null and this.state.total_body_received >= content_length.?; if (is_done or this.signals.get(.body_streaming)) { const processed = try this.state.processBodyBuffer(buffer.*); @@ -3545,7 +3575,12 @@ pub fn handleResponseMetadata( } this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body; - const content_length = this.state.content_length orelse 0; + const content_length = this.state.content_length; + if (content_length) |length| { + log("handleResponseMetadata: content_length is {} and transfer_encoding {}", .{ length, this.state.transfer_encoding }); + } else { + log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding}); + } // if no body is expected we should stop processing - return this.method.hasBody() and (content_length > 0 or this.state.transfer_encoding == .chunked); + return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked); } diff --git a/test/js/web/fetch/body-stream.test.ts b/test/js/web/fetch/body-stream.test.ts index 8e2baf92a..8f7675528 100644 --- a/test/js/web/fetch/body-stream.test.ts +++ b/test/js/web/fetch/body-stream.test.ts @@ -13,6 +13,35 @@ var port = 0; ]; const useRequestObjectValues = [true, false]; + test("Should not crash when not returning a promise when stream is in progress", async () => { + var called = false; + await runInServer( + { + async fetch() { + var stream = new ReadableStream({ + type: "direct", + pull(controller) { + controller.write("hey"); + setTimeout(() => { + controller.end(); + }, 100); + }, + }); + + return new Response(stream); + }, + }, + async url => { + called = true; + expect(await fetch(url).then(res => res.text())).toContain( + "Welcome to Bun! To get started, return a Response object.", + ); + }, + ); + + expect(called).toBe(true); + }); + for (let RequestPrototypeMixin of BodyMixin) { for (let useRequestObject of useRequestObjectValues) { describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${ |