aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Ciro Spaciari <ciro.spaciari@gmail.com> 2023-09-05 19:22:09 -0300
committerGravatar GitHub <noreply@github.com> 2023-09-05 15:22:09 -0700
commit6e50dd210fb052a4db4867fa03fe450ce87b4179 (patch)
treee8b4c5d7db0ced46065bf08fe37602532a48ac9f
parentd268097ded4513abe3cff9ca0037f72e90c23a21 (diff)
downloadbun-6e50dd210fb052a4db4867fa03fe450ce87b4179.tar.gz
bun-6e50dd210fb052a4db4867fa03fe450ce87b4179.tar.zst
bun-6e50dd210fb052a4db4867fa03fe450ce87b4179.zip
fix(fetch) always use readable stream if it is available (#4503)
* always use readable stream if it is available * use bun sleep * fix tests * rm uws dep
-rw-r--r--src/bun.js/webcore/response.zig57
-rw-r--r--test/js/web/fetch/fetch.stream.test.ts52
2 files changed, 82 insertions, 27 deletions
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 8fc282cf0..da1655821 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -792,6 +792,36 @@ pub const Fetch = struct {
return;
}
+ if (this.readable_stream_ref.get()) |readable| {
+ if (readable.ptr == .Bytes) {
+ readable.ptr.Bytes.size_hint = this.getSizeHint();
+ // body can be marked as used but we still need to pipe the data
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+
+ const chunk = scheduled_response_buffer.items;
+
+ if (this.result.has_more) {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+
+ // clean for reuse later
+ this.scheduled_response_buffer.reset();
+ } else {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary_and_done = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+ }
+ return;
+ }
+ }
+
if (this.response.get()) |response_js| {
if (response_js.as(Response)) |response| {
const body = response.body;
@@ -854,33 +884,6 @@ pub const Fetch = struct {
old.resolve(&response.body.value, this.global_this);
}
}
- } else if (this.readable_stream_ref.get()) |readable| {
- if (readable.ptr == .Bytes) {
- readable.ptr.Bytes.size_hint = this.getSizeHint();
- // body can be marked as used but we still need to pipe the data
- var scheduled_response_buffer = this.scheduled_response_buffer.list;
-
- const chunk = scheduled_response_buffer.items;
-
- if (this.result.has_more) {
- readable.ptr.Bytes.onData(
- .{
- .temporary = bun.ByteList.initConst(chunk),
- },
- bun.default_allocator,
- );
-
- // clean for reuse later
- this.scheduled_response_buffer.reset();
- } else {
- readable.ptr.Bytes.onData(
- .{
- .temporary_and_done = bun.ByteList.initConst(chunk),
- },
- bun.default_allocator,
- );
- }
- }
}
}
}
diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts
index 49cc0dd6a..98271ee79 100644
--- a/test/js/web/fetch/fetch.stream.test.ts
+++ b/test/js/web/fetch/fetch.stream.test.ts
@@ -4,6 +4,17 @@ import { join } from "path";
import { describe, expect, it } from "bun:test";
import { gcTick } from "harness";
import zlib from "zlib";
+import http from "http";
+import { createReadStream } from "fs";
+import { pipeline } from "stream";
+import type { AddressInfo } from "net";
+
+const files = [
+ join(import.meta.dir, "fixture.html"),
+ join(import.meta.dir, "fixture.png"),
+ join(import.meta.dir, "fixture.png.gz"),
+];
+
const fixtures = {
"fixture": readFileSync(join(import.meta.dir, "fixture.html")),
"fixture.png": readFileSync(join(import.meta.dir, "fixture.png")),
@@ -51,6 +62,47 @@ describe("fetch() with streaming", () => {
}
});
+ for (let file of files) {
+ it("stream can handle response.body + await response.something() #4500", async () => {
+ let server: ReturnType<typeof http.createServer> | null = null;
+ try {
+ const errorHandler = (err: any) => expect(err).toBeUndefined();
+
+ server = http
+ .createServer(function (req, res) {
+ res.writeHead(200, { "Content-Type": "text/plain" });
+
+ pipeline(createReadStream(file), res, errorHandler);
+ })
+ .listen(0);
+
+ const address = server.address() as AddressInfo;
+ const url = `http://${address.address}:${address.port}`;
+ async function getRequestLen(url: string) {
+ const response = await fetch(url);
+ const hasBody = response.body;
+ if (hasBody) {
+ const res = await response.blob();
+ return res.size;
+ }
+ return 0;
+ }
+
+ for (let i = 0; i < 10; i++) {
+ let len = await getRequestLen(url);
+ if (len <= 0) {
+ throw new Error("Request length is 0");
+ }
+ await Bun.sleep(50);
+ }
+
+ expect(true).toBe(true);
+ } finally {
+ server?.close();
+ }
+ });
+ }
+
it("stream still works after response get out of scope", async () => {
let server: Server | null = null;
try {