diff options
author | 2023-09-05 19:22:09 -0300 | |
---|---|---|
committer | 2023-09-05 15:22:09 -0700 | |
commit | 6e50dd210fb052a4db4867fa03fe450ce87b4179 (patch) | |
tree | e8b4c5d7db0ced46065bf08fe37602532a48ac9f | |
parent | d268097ded4513abe3cff9ca0037f72e90c23a21 (diff) | |
download | bun-6e50dd210fb052a4db4867fa03fe450ce87b4179.tar.gz bun-6e50dd210fb052a4db4867fa03fe450ce87b4179.tar.zst bun-6e50dd210fb052a4db4867fa03fe450ce87b4179.zip |
fix(fetch) always use readable stream if it is available (#4503)
* always use readable stream if it is available
* use bun sleep
* fix tests
* rm uws dep
-rw-r--r-- | src/bun.js/webcore/response.zig | 57 | ||||
-rw-r--r-- | test/js/web/fetch/fetch.stream.test.ts | 52 |
2 files changed, 82 insertions, 27 deletions
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 8fc282cf0..da1655821 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -792,6 +792,36 @@ pub const Fetch = struct { return; } + if (this.readable_stream_ref.get()) |readable| { + if (readable.ptr == .Bytes) { + readable.ptr.Bytes.size_hint = this.getSizeHint(); + // body can be marked as used but we still need to pipe the data + var scheduled_response_buffer = this.scheduled_response_buffer.list; + + const chunk = scheduled_response_buffer.items; + + if (this.result.has_more) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + + // clean for reuse later + this.scheduled_response_buffer.reset(); + } else { + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } + return; + } + } + if (this.response.get()) |response_js| { if (response_js.as(Response)) |response| { const body = response.body; @@ -854,33 +884,6 @@ pub const Fetch = struct { old.resolve(&response.body.value, this.global_this); } } - } else if (this.readable_stream_ref.get()) |readable| { - if (readable.ptr == .Bytes) { - readable.ptr.Bytes.size_hint = this.getSizeHint(); - // body can be marked as used but we still need to pipe the data - var scheduled_response_buffer = this.scheduled_response_buffer.list; - - const chunk = scheduled_response_buffer.items; - - if (this.result.has_more) { - readable.ptr.Bytes.onData( - .{ - .temporary = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); - - // clean for reuse later - this.scheduled_response_buffer.reset(); - } else { - readable.ptr.Bytes.onData( - .{ - .temporary_and_done = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); - } - } } } } diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts index 49cc0dd6a..98271ee79 100644 --- a/test/js/web/fetch/fetch.stream.test.ts +++ b/test/js/web/fetch/fetch.stream.test.ts @@ -4,6 +4,17 @@ import { join } from "path"; import { describe, expect, it } from "bun:test"; import { gcTick } from "harness"; import zlib from "zlib"; +import http from "http"; +import { createReadStream } from "fs"; +import { pipeline } from "stream"; +import type { AddressInfo } from "net"; + +const files = [ + join(import.meta.dir, "fixture.html"), + join(import.meta.dir, "fixture.png"), + join(import.meta.dir, "fixture.png.gz"), +]; + const fixtures = { "fixture": readFileSync(join(import.meta.dir, "fixture.html")), "fixture.png": readFileSync(join(import.meta.dir, "fixture.png")), @@ -51,6 +62,47 @@ describe("fetch() with streaming", () => { } }); + for (let file of files) { + it("stream can handle response.body + await response.something() #4500", async () => { + let server: ReturnType<typeof http.createServer> | null = null; + try { + const errorHandler = (err: any) => expect(err).toBeUndefined(); + + server = http + .createServer(function (req, res) { + res.writeHead(200, { "Content-Type": "text/plain" }); + + pipeline(createReadStream(file), res, errorHandler); + }) + .listen(0); + + const address = server.address() as AddressInfo; + const url = `http://${address.address}:${address.port}`; + async function getRequestLen(url: string) { + const response = await fetch(url); + const hasBody = response.body; + if (hasBody) { + const res = await response.blob(); + return res.size; + } + return 0; + } + + for (let i = 0; i < 10; i++) { + let len = await getRequestLen(url); + if (len <= 0) { + throw new Error("Request length is 0"); + } + await Bun.sleep(50); + } + + expect(true).toBe(true); + } finally { + server?.close(); + } + }); + } + it("stream still works after response get out of scope", async () => { let server: Server | null = null; try { |