author    | 2023-08-22 00:30:34 -0300
committer | 2023-08-21 20:30:34 -0700
commit    | 9027484ae1a9eaf9769d79828db48de66450a3fc (patch)
tree      | e6b679132046bbc0ef254a170c0b0ec3030d3fa2 /test
parent    | 91eacade975a522f7ad3d62185ff8adace3aad97 (diff)
fetch(stream) add stream support for compressed and uncompressed data (#4127)
* streams non-compressed data in 64 KB chunks (at least)
* fmt
* wip remove pause
* fix default streaming and buffering
* fix atomic flags
* fix size
* make chunked encoding work again (WIP streaming chunked)
* WIP: chunked encoding streaming
* fix end of streaming
* working streaming + compression
* add fixes + tests
* fmt + fix proxy
* fix oopsies
* codegen after merge
* fmt + fixes
* more fixes
* more fixes and logs
* avoid double free
* check empty before pop
* check empty on pop
* fix copy to real when complete
* remove unnecessary logs
* better has_schedule_callback swap, body locked size helper, remove isEmpty from unbounded_queue pop
* fix response ref, fix body_size
* add deflate support, fix error throw, add more tests
* codegen after merge
* remove logs, add connection close test
* fix macOS build
* fix redirect error option
* make body_size more clear
* support new Response(response)
* toString DOMWrapper objects properly instead of supporting response in Response constructor
* ignore headers with no name, add more tests
* oops
* handle transform with fetch
* add gz image stream test
* remove duplicate test
* fix missing chunk on macOS under pressure
* oops include all OS
* some fixes
* compare buffers instead of sizes
* refactor err.err and protect it
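
Taken together, the changes above make a fetch() response body consumable as an incremental stream, with gzip/deflate bodies decompressed chunk by chunk. A minimal sketch of the reader loop the new tests use below (the URL is a hypothetical placeholder; the tests use ephemeral Bun.serve ports):

const url = "http://127.0.0.1:3000/"; // placeholder URL, not from the diff
const res = await fetch(url);
const reader = res.body?.getReader();

let buffer = Buffer.alloc(0);
let parts = 0;
while (true) {
  // read() yields one chunk at a time; for gzip/deflate responses the
  // chunks arrive already decompressed by fetch()
  const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
  if (value) {
    buffer = Buffer.concat([buffer, value]);
    parts++;
  }
  if (done) break;
}
// buffer now holds the full body; parts > 1 when the server actually streamed it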
Diffstat (limited to 'test')
-rw-r--r-- | test/js/web/fetch/fetch-gzip.test.ts   |    4
-rw-r--r-- | test/js/web/fetch/fetch.stream.test.ts | 1129
-rw-r--r-- | test/js/web/fetch/fetch.test.ts        |   17
-rw-r--r-- | test/js/web/fetch/fixture.png          | bin | 0 -> 82022 bytes
-rw-r--r-- | test/js/web/fetch/fixture.png.gz       | bin | 0 -> 78116 bytes
5 files changed, 1143 insertions, 7 deletions
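
For reference, the server shape that fetch.stream.test.ts builds repeatedly in the diff below is a Bun.serve handler returning a direct ReadableStream that writes and flushes a few delayed chunks; roughly:

const server = Bun.serve({
  port: 0, // let Bun pick a free port, as the tests do
  fetch(req) {
    return new Response(
      new ReadableStream({
        type: "direct", // Bun's direct streams write straight to the sink
        async pull(controller) {
          controller.write("Hello, ");
          await controller.flush();
          await Bun.sleep(100); // delay so the client observes multiple chunks
          controller.write("world!\n");
          await controller.flush();
          controller.close();
        },
      }),
      { status: 200, headers: { "Content-Type": "text/plain" } },
    );
  },
});
// the tests fetch `http://${server.hostname}:${server.port}` and call server.stop() in a finally block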
diff --git a/test/js/web/fetch/fetch-gzip.test.ts b/test/js/web/fetch/fetch-gzip.test.ts index 32888947b..0569eaad8 100644 --- a/test/js/web/fetch/fetch-gzip.test.ts +++ b/test/js/web/fetch/fetch-gzip.test.ts @@ -86,7 +86,7 @@ it("fetch() with a protocol-relative redirect that returns a buffered gzip respo server.stop(); }); -it("fetch() with a gzip response works (one chunk, streamed, with a delay", async () => { +it("fetch() with a gzip response works (one chunk, streamed, with a delay)", async () => { var server = Bun.serve({ port: 0, @@ -121,7 +121,7 @@ it("fetch() with a gzip response works (one chunk, streamed, with a delay", asyn server.stop(); }); -it("fetch() with a gzip response works (multiple chunks, TCP server", async done => { +it("fetch() with a gzip response works (multiple chunks, TCP server)", async done => { const compressed = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer(); var socketToClose!: Socket; const server = Bun.listen({ diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts new file mode 100644 index 000000000..efef6a161 --- /dev/null +++ b/test/js/web/fetch/fetch.stream.test.ts @@ -0,0 +1,1129 @@ +import { Socket, Server, TCPSocketListener } from "bun"; +import { readFileSync } from "fs"; +import { join } from "path"; +import { describe, expect, it } from "bun:test"; +import { gcTick } from "harness"; + +const fixtures = { + "fixture": readFileSync(join(import.meta.dir, "fixture.html")), + "fixture.png": readFileSync(join(import.meta.dir, "fixture.png")), + "fixture.png.gz": readFileSync(join(import.meta.dir, "fixture.png.gz")), +}; + +const invalid = Buffer.from([0xc0]); + +const bigText = Buffer.from("a".repeat(1 * 1024 * 1024)); +const smallText = Buffer.from("Hello".repeat(16)); +const empty = Buffer.alloc(0); + +describe("fetch() with streaming", () => { + it("stream still works after response get out of scope", async () => { + let server: Server | null = null; + try { + const content = "Hello, world!\n".repeat(5); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = Buffer.from(content, "utf8"); + const size = data.byteLength / 5; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 5)); + await controller.flush(); + + controller.close(); + }, + }), + { status: 200, headers: { "Content-Type": "text/plain" } }, + ); + }, + }); + + async function getReader() { + return (await fetch(`http://${server.hostname}:${server.port}`, {})).body?.getReader(); + } + gcTick(false); + const reader = await getReader(); + gcTick(false); + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + parts++; + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBeGreaterThan(1); + } finally { + server?.stop(); + } + }); + + it("response inspected size should reflect stream state", async () => { + let server: Server | null = null; + try { + const content = "Bun!\n".repeat(4); + server = Bun.serve({ 
+ port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = Buffer.from(content, "utf8"); + const size = data.byteLength / 5; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 4)); + await controller.flush(); + + controller.close(); + }, + }), + { status: 200, headers: { "Content-Type": "text/plain" } }, + ); + }, + }); + + function inspectBytes(response: Response) { + const match = /Response \(([0-9]+ )bytes\)/g.exec( + Bun.inspect(response, { + depth: 0, + }), + ); + if (!match) return 0; + return parseInt(match[1]?.trim(), 10); + } + + const res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + gcTick(false); + let size = 0; + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + size += value.length; + } + expect(inspectBytes(res)).toBe(size); + if (done) { + break; + } + } + + gcTick(false); + } finally { + server?.stop(); + } + }); + + it("can handle multiple simultaneos requests", async () => { + let server: Server | null = null; + try { + const content = "Hello, world!\n".repeat(5); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = Buffer.from(content, "utf8"); + const size = data.byteLength / 5; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 5)); + await controller.flush(); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + }, + }, + ); + }, + }); + + const server_url = `http://${server.hostname}:${server.port}`; + async function doRequest() { + await Bun.sleep(10); + const res = await fetch(server_url); + const reader = res.body?.getReader(); + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + parts++; + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBeGreaterThan(1); + } + + await Promise.all([doRequest(), doRequest(), doRequest(), doRequest(), doRequest(), doRequest()]); + } finally { + server?.stop(); + } + }); + + it(`can handle transforms`, async () => { + let server: Server | null = null; + try { + const content = "Hello, world!\n".repeat(5); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = Buffer.from(content, "utf8"); + const size = data.byteLength / 5; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 
2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 5)); + await controller.flush(); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + }, + }, + ); + }, + }); + + const server_url = `http://${server.hostname}:${server.port}`; + const res = await fetch(server_url); + + const transform = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(Buffer.from(chunk).toString("utf8").toUpperCase()); + }, + }); + + const reader = res.body?.pipeThrough(transform).getReader(); + + let result = ""; + while (true) { + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + result += value; + } + if (done) { + break; + } + } + + gcTick(false); + expect(result).toBe(content.toUpperCase()); + } finally { + server?.stop(); + } + }); + + it(`can handle gz images`, async () => { + let server: Server | null = null; + try { + server = Bun.serve({ + port: 0, + fetch(req) { + const data = fixtures["fixture.png.gz"]; + return new Response(data, { + status: 200, + headers: { + "Content-Type": "text/plain", + "Content-Encoding": "gzip", + }, + }); + }, + }); + + const server_url = `http://${server.hostname}:${server.port}`; + const res = await fetch(server_url); + + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + while (true) { + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer).toEqual(fixtures["fixture.png"]); + } finally { + server?.stop(); + } + }); + + it(`can proxy fetch with Bun.serve`, async () => { + let server: Server | null = null; + let server_original: Server | null = null; + try { + const content = "a".repeat(64 * 1024); + + server_original = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = Buffer.from(content, "utf8"); + const size = data.byteLength / 5; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 5)); + await controller.flush(); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + }, + }, + ); + }, + }); + + server = Bun.serve({ + port: 0, + async fetch(req) { + const response = await fetch(`http://${server_original.hostname}:${server_original.port}`, {}); + await Bun.sleep(10); + return new Response(response.body, { + status: 200, + headers: { + "Content-Type": "text/plain", + }, + }); + }, + }); + + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + parts++; + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBeGreaterThanOrEqual(1); + } finally { + server?.stop(); + server_original?.stop(); + } + }); + const matrix = [ 
+ { name: "small", data: fixtures["fixture"] }, + { name: "small text", data: smallText }, + { name: "big text", data: bigText }, + { name: "img", data: fixtures["fixture.png"] }, + { name: "empty", data: empty }, + ]; + for (let i = 0; i < matrix.length; i++) { + const fixture = matrix[i]; + for (let j = 0; j < matrix.length; j++) { + const fixtureb = matrix[j]; + const test = fixture.name == "empty" && fixtureb.name == "empty" ? it.todo : it; + test(`can handle fixture ${fixture.name} x ${fixtureb.name}`, async () => { + let server: Server | null = null; + try { + //@ts-ignore + const data = fixture.data; + //@ts-ignore + const data_b = fixtureb.data; + const content = Buffer.concat([data, data_b]); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + controller.write(data); + await controller.flush(); + await Bun.sleep(100); + controller.write(data_b); + await controller.flush(); + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + }, + }, + ); + }, + }); + + const server_url = `http://${server.hostname}:${server.port}`; + const res = await fetch(server_url); + const reader = res.body?.getReader(); + let buffer = Buffer.alloc(0); + while (true) { + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + if (done) { + break; + } + } + gcTick(false); + expect(buffer).toEqual(content); + } finally { + server?.stop(); + } + }); + } + } + + type CompressionType = "no" | "gzip" | "deflate" | "br"; + type TestType = { headers: Record<string, string>; compression: CompressionType; skip?: boolean }; + const types: Array<TestType> = [ + { headers: {}, compression: "no" }, + { headers: { "Content-Encoding": "gzip" }, compression: "gzip" }, + { headers: { "Content-Encoding": "deflate" }, compression: "deflate" }, + // { headers: { "Content-Encoding": "br" }, compression: "br", skip: true }, // not implemented yet + ]; + + function compress(compression: CompressionType, data: Uint8Array) { + switch (compression) { + case "gzip": + return Bun.gzipSync(data); + case "deflate": + return Bun.deflateSync(data); + default: + return data; + } + } + + for (const { headers, compression, skip } of types) { + const test = skip ? 
it.skip : it; + + test(`with invalid utf8 with ${compression} compression`, async () => { + let server: Server | null = null; + try { + const content = Buffer.concat([invalid, Buffer.from("Hello, world!\n".repeat(5), "utf8"), invalid]); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = compress(compression, content); + const size = data.byteLength / 4; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 4)); + await controller.flush(); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }, + ); + }, + }); + + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer).toEqual(content); + } finally { + server?.stop(); + } + }); + + test(`chunked response works (single chunk) with ${compression} compression`, async () => { + let server: Server | null = null; + try { + const content = "Hello, world!\n".repeat(5); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = compress(compression, Buffer.from(content, "utf8")); + controller.write(data); + await controller.flush(); + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }, + ); + }, + }); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const result = await res.text(); + gcTick(false); + expect(result).toBe(content); + + res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + parts++; + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBe(1); + } finally { + server?.stop(); + } + }); + + test(`chunked response works (multiple chunks) with ${compression} compression`, async () => { + let server: Server | null = null; + try { + const content = "Hello, world!\n".repeat(5); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = compress(compression, Buffer.from(content, "utf8")); + const size = data.byteLength / 5; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 5)); + await 
controller.flush(); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }, + ); + }, + }); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const result = await res.text(); + gcTick(false); + expect(result).toBe(content); + + res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + parts++; + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBeGreaterThan(1); + } finally { + server?.stop(); + } + }); + + test(`Content-Length response works (single part) with ${compression} compression`, async () => { + let server: Server | null = null; + try { + const content = "a".repeat(1024); + + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response(compress(compression, Buffer.from(content)), { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }); + }, + }); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const result = await res.text(); + gcTick(false); + expect(result).toBe(content); + + res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + parts++; + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBe(1); + } finally { + server?.stop(); + } + }); + + test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => { + let server: Server | null = null; + try { + const content = "a".repeat(64 * 1024); + + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response(compress(compression, Buffer.from(content)), { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }); + }, + }); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const result = await res.text(); + gcTick(false); + expect(result).toBe(content); + + res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + parts++; + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBeGreaterThan(1); + } finally { + server?.stop(); + } + }); + + test(`Extra data should be ignored on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => { + let server: TCPSocketListener<any> | null = null; + + try { + const parts = 5; + const content = "Hello".repeat(parts); + + server = Bun.listen({ + port: 0, + hostname: "0.0.0.0", + socket: { + async open(socket) { + var corked: any[] = []; + var cork = true; + async function 
write(chunk: any) { + await new Promise<void>((resolve, reject) => { + if (cork) { + corked.push(chunk); + } + + if (!cork && corked.length) { + socket.write(corked.join("")); + corked.length = 0; + socket.flush(); + } + + if (!cork) { + socket.write(chunk); + socket.flush(); + } + + resolve(); + }); + } + const compressed = compress(compression, Buffer.from(content, "utf8")); + await write("HTTP/1.1 200 OK\r\n"); + await write("Content-Type: text/plain\r\n"); + for (const [key, value] of Object.entries(headers)) { + await write(key + ": " + value + "\r\n"); + } + await write("Content-Length: " + compressed.byteLength + "\r\n"); + await write("\r\n"); + const size = compressed.byteLength / 5; + for (var i = 0; i < 5; i++) { + cork = false; + await write(compressed.slice(size * i, size * (i + 1))); + } + await write("Extra Data!"); + await write("Extra Data!"); + socket.flush(); + }, + drain(socket) {}, + }, + }); + + const res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + } finally { + server?.stop(true); + } + }); + + test(`Missing data should timeout on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => { + let server: TCPSocketListener<any> | null = null; + + try { + const parts = 5; + const content = "Hello".repeat(parts); + + server = Bun.listen({ + port: 0, + hostname: "0.0.0.0", + socket: { + async open(socket) { + var corked: any[] = []; + var cork = true; + async function write(chunk: any) { + await new Promise<void>((resolve, reject) => { + if (cork) { + corked.push(chunk); + } + + if (!cork && corked.length) { + socket.write(corked.join("")); + corked.length = 0; + socket.flush(); + } + + if (!cork) { + socket.write(chunk); + socket.flush(); + } + + resolve(); + }); + } + const compressed = compress(compression, Buffer.from(content, "utf8")); + await write("HTTP/1.1 200 OK\r\n"); + await write("Content-Type: text/plain\r\n"); + for (const [key, value] of Object.entries(headers)) { + await write(key + ": " + value + "\r\n"); + } + // 10 extra missing bytes that we will never sent + await write("Content-Length: " + compressed.byteLength + 10 + "\r\n"); + await write("\r\n"); + const size = compressed.byteLength / 5; + for (var i = 0; i < 5; i++) { + cork = false; + await write(compressed.slice(size * i, size * (i + 1))); + } + socket.flush(); + }, + drain(socket) {}, + }, + }); + + const res = await fetch(`http://${server.hostname}:${server.port}`, { + signal: AbortSignal.timeout(1000), + }); + gcTick(false); + try { + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe("unreachable"); + } catch (err) { + expect((err as Error).name).toBe("TimeoutError"); + } + } finally { + server?.stop(true); + } + }); + + if (compression !== "no") { + test(`can handle corrupted ${compression} compression`, async () => { + let server: TCPSocketListener<any> | null = null; + + try { 
+ const parts = 5; + const content = "Hello".repeat(parts); + server = Bun.listen({ + port: 0, + hostname: "0.0.0.0", + socket: { + async open(socket) { + var corked: any[] = []; + var cork = true; + async function write(chunk: any) { + await new Promise<void>((resolve, reject) => { + if (cork) { + corked.push(chunk); + } + + if (!cork && corked.length) { + socket.write(corked.join("")); + corked.length = 0; + socket.flush(); + } + + if (!cork) { + socket.write(chunk); + socket.flush(); + } + + resolve(); + }); + } + const compressed = compress(compression, Buffer.from(content, "utf8")); + await write("HTTP/1.1 200 OK\r\n"); + await write("Content-Type: text/plain\r\n"); + for (const [key, value] of Object.entries(headers)) { + await write(key + ": " + value + "\r\n"); + } + // 10 extra missing bytes that we will never sent in this case we will wait to close + await write("Content-Length: " + compressed.byteLength + "\r\n"); + await write("\r\n"); + const size = compressed.byteLength / 5; + compressed[0] = 0; // corrupt data + cork = false; + for (var i = 0; i < 5; i++) { + await write(compressed.slice(size * i, size * (i + 1))); + } + socket.flush(); + }, + drain(socket) {}, + }, + }); + + const res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + + try { + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + + while (true) { + gcTick(false); + const read_promise = reader?.read(); + const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult<any>; + + if (value) { + buffer = Buffer.concat([buffer, value]); + } + + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe("unreachable"); + } catch (err) { + expect((err as Error).name).toBe("ZlibError"); + } + } finally { + server?.stop(true); + } + }); + } + + test(`can handle socket close with ${compression} compression`, async () => { + let server: TCPSocketListener<any> | null = null; + + try { + const parts = 5; + const content = "Hello".repeat(parts); + const { promise, resolve: resolveSocket } = Promise.withResolvers<Socket>(); + server = Bun.listen({ + port: 0, + hostname: "0.0.0.0", + socket: { + async open(socket) { + var corked: any[] = []; + var cork = true; + async function write(chunk: any) { + await new Promise<void>((resolve, reject) => { + if (cork) { + corked.push(chunk); + } + + if (!cork && corked.length) { + socket.write(corked.join("")); + corked.length = 0; + socket.flush(); + } + + if (!cork) { + socket.write(chunk); + socket.flush(); + } + + resolve(); + }); + } + const compressed = compress(compression, Buffer.from(content, "utf8")); + await write("HTTP/1.1 200 OK\r\n"); + await write("Content-Type: text/plain\r\n"); + for (const [key, value] of Object.entries(headers)) { + await write(key + ": " + value + "\r\n"); + } + // 10 extra missing bytes that we will never sent in this case we will wait to close + await write("Content-Length: " + compressed.byteLength + 10 + "\r\n"); + await write("\r\n"); + const size = compressed.byteLength / 5; + for (var i = 0; i < 5; i++) { + cork = false; + await write(compressed.slice(size * i, size * (i + 1))); + } + socket.flush(); + resolveSocket(socket); + }, + drain(socket) {}, + }, + }); + + const res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + + let socket: Socket | null = await promise; + try { + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + + while (true) { + gcTick(false); + const read_promise = 
reader?.read(); + socket?.end(); + socket = null; + const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult<any>; + + if (value) { + buffer = Buffer.concat([buffer, value]); + } + + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe("unreachable"); + } catch (err) { + expect((err as Error).name).toBe("ConnectionClosed"); + } + } finally { + server?.stop(true); + } + }); + } +}); diff --git a/test/js/web/fetch/fetch.test.ts b/test/js/web/fetch/fetch.test.ts index a381cb320..59847dde9 100644 --- a/test/js/web/fetch/fetch.test.ts +++ b/test/js/web/fetch/fetch.test.ts @@ -4,7 +4,7 @@ import { chmodSync, mkdtempSync, readFileSync, realpathSync, rmSync, writeFileSy import { mkfifo } from "mkfifo"; import { tmpdir } from "os"; import { join } from "path"; -import { gc, withoutAggressiveGC } from "harness"; +import { gc, withoutAggressiveGC, gcTick } from "harness"; const tmp_dir = mkdtempSync(join(realpathSync(tmpdir()), "fetch.test")); @@ -1334,11 +1334,18 @@ it("fetch() file:// works", async () => { expect(await (await fetch(new URL("fetch.test.ts", import.meta.url))).text()).toEqual( await Bun.file(Bun.fileURLToPath(new URL("fetch.test.ts", import.meta.url))).text(), ); - expect(await (await fetch(new URL("file with space in the name.txt", import.meta.url))).text()).toEqual( - await Bun.file(Bun.fileURLToPath(new URL("file with space in the name.txt", import.meta.url))).text(), - ); + gc(true); + var fileResponse = await fetch(new URL("file with space in the name.txt", import.meta.url)); + gc(true); + var fileResponseText = await fileResponse.text(); + gc(true); + var bunFile = Bun.file(Bun.fileURLToPath(new URL("file with space in the name.txt", import.meta.url))); + gc(true); + var bunFileText = await bunFile.text(); + gc(true); + expect(fileResponseText).toEqual(bunFileText); + gc(true); }); - it("cloned response headers are independent before accessing", () => { const response = new Response("hello", { headers: { diff --git a/test/js/web/fetch/fixture.png b/test/js/web/fetch/fixture.png Binary files differnew file mode 100644 index 000000000..b34dbd29d --- /dev/null +++ b/test/js/web/fetch/fixture.png diff --git a/test/js/web/fetch/fixture.png.gz b/test/js/web/fetch/fixture.png.gz Binary files differnew file mode 100644 index 000000000..8c0071811 --- /dev/null +++ b/test/js/web/fetch/fixture.png.gz |
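
The Content-Encoding matrix in fetch.stream.test.ts boils down to pre-compressing the body, advertising the matching encoding, and asserting that fetch() hands back the decompressed bytes. A condensed sketch of that round trip (buffered here via text(); the tests also drive it through the streaming reader):

type CompressionType = "no" | "gzip" | "deflate";

function compress(compression: CompressionType, data: Uint8Array) {
  switch (compression) {
    case "gzip":
      return Bun.gzipSync(data);
    case "deflate":
      return Bun.deflateSync(data);
    default:
      return data;
  }
}

const content = "Hello, world!\n".repeat(5);
const server = Bun.serve({
  port: 0,
  fetch() {
    return new Response(compress("gzip", Buffer.from(content)), {
      status: 200,
      headers: { "Content-Type": "text/plain", "Content-Encoding": "gzip" },
    });
  },
});

const text = await (await fetch(`http://${server.hostname}:${server.port}`)).text();
console.log(text === content); // true: fetch() decompressed the gzip body
server.stop();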