diff options
author | 2023-11-16 08:39:41 +0900 | |
---|---|---|
committer | 2023-11-16 05:09:41 +0530 | |
commit | c9487138d6d8fd39c8c8512239b6724cf2b275ff (patch) | |
tree | 5d9f7cb1dc6b134327b1dad89bd0b4dd93765c27 /packages/integrations/node | |
parent | eeac2885514ecde558ad39bcee39f78f7b60109e (diff) | |
download | astro-c9487138d6d8fd39c8c8512239b6724cf2b275ff.tar.gz astro-c9487138d6d8fd39c8c8512239b6724cf2b275ff.tar.zst astro-c9487138d6d8fd39c8c8512239b6724cf2b275ff.zip |
Cancel response stream when connection closes (#9071)
* cancel stream on close
* add changeset
* add test
* Update .changeset/modern-ways-develop.md
Co-authored-by: Sarah Rainsberger <sarah@rainsberger.ca>
---------
Co-authored-by: lilnasy <69170106+lilnasy@users.noreply.github.com>
Co-authored-by: Sarah Rainsberger <sarah@rainsberger.ca>
Diffstat (limited to 'packages/integrations/node')
5 files changed, 50 insertions, 233 deletions
diff --git a/packages/integrations/node/src/nodeMiddleware.ts b/packages/integrations/node/src/nodeMiddleware.ts index ddaa95deb..7f242809e 100644 --- a/packages/integrations/node/src/nodeMiddleware.ts +++ b/packages/integrations/node/src/nodeMiddleware.ts @@ -1,8 +1,6 @@ import type { NodeApp } from 'astro/app/node'; import type { ServerResponse } from 'node:http'; -import type { Readable } from 'stream'; import { createOutgoingHttpHeaders } from './createOutgoingHttpHeaders.js'; -import { responseIterator } from './response-iterator.js'; import type { ErrorHandlerParams, Options, RequestHandlerParams } from './types.js'; // Disable no-unused-vars to avoid breaking signature change @@ -79,8 +77,14 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse: res.writeHead(status, nodeHeaders); if (webResponse.body) { try { - for await (const chunk of responseIterator(webResponse) as unknown as Readable) { - res.write(chunk); + const reader = webResponse.body.getReader(); + res.on("close", () => { + reader.cancel(); + }) + let result = await reader.read(); + while (!result.done) { + res.write(result.value); + result = await reader.read(); } } catch (err: any) { console.error(err?.stack || err?.message || String(err)); diff --git a/packages/integrations/node/src/response-iterator.ts b/packages/integrations/node/src/response-iterator.ts deleted file mode 100644 index b79c3a853..000000000 --- a/packages/integrations/node/src/response-iterator.ts +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Original sources: - * - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts - * - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts - */ - -import { AstroError } from 'astro/errors'; -import type { ReadableStreamDefaultReadResult } from 'node:stream/web'; -import { Readable as NodeReadableStream } from 'stream'; - -interface NodeStreamIterator<T> { - next(): Promise<IteratorResult<T, boolean | undefined>>; - [Symbol.asyncIterator]?(): AsyncIterator<T>; -} - -interface PromiseIterator<T> { - next(): Promise<IteratorResult<T, ArrayBuffer | undefined>>; - [Symbol.asyncIterator]?(): AsyncIterator<T>; -} - -interface ReaderIterator<T> { - next(): Promise<ReadableStreamDefaultReadResult<T>>; - [Symbol.asyncIterator]?(): AsyncIterator<T>; -} - -const canUseSymbol = typeof Symbol === 'function' && typeof Symbol.for === 'function'; - -const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator; - -function isBuffer(value: any): value is Buffer { - return ( - value?.constructor != null && - typeof value.constructor.isBuffer === 'function' && - value.constructor.isBuffer(value) - ); -} - -function isNodeResponse(value: any): value is Response { - return !!(value as Response).body; -} - -function isReadableStream(value: any): value is ReadableStream<any> { - return !!(value as ReadableStream<any>).getReader; -} - -function isAsyncIterableIterator(value: any): value is AsyncIterableIterator<any> { - return !!( - canUseAsyncIteratorSymbol && (value as AsyncIterableIterator<any>)[Symbol.asyncIterator] - ); -} - -function isStreamableBlob(value: any): value is Blob { - return !!(value as Blob).stream; -} - -function isBlob(value: any): value is Blob { - return !!(value as Blob).arrayBuffer; -} - -function isNodeReadableStream(value: any): value is NodeReadableStream { - return !!(value as NodeReadableStream).pipe; -} - -function readerIterator<T>(reader: ReadableStreamDefaultReader<T>): AsyncIterableIterator<T> { - const iterator: ReaderIterator<T> = { - //@ts-expect-error - next() { - return reader.read(); - }, - }; - - if (canUseAsyncIteratorSymbol) { - iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> { - //@ts-expect-error - return this; - }; - } - - return iterator as AsyncIterableIterator<T>; -} - -function promiseIterator<T = ArrayBuffer>(promise: Promise<ArrayBuffer>): AsyncIterableIterator<T> { - let resolved = false; - - const iterator: PromiseIterator<T> = { - next(): Promise<IteratorResult<T, ArrayBuffer | undefined>> { - if (resolved) - return Promise.resolve({ - value: undefined, - done: true, - }); - resolved = true; - return new Promise(function (resolve, reject) { - promise - .then(function (value) { - resolve({ value: value as unknown as T, done: false }); - }) - .catch(reject); - }); - }, - }; - - if (canUseAsyncIteratorSymbol) { - iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> { - return this; - }; - } - - return iterator as AsyncIterableIterator<T>; -} - -function nodeStreamIterator<T>(stream: NodeReadableStream): AsyncIterableIterator<T> { - let cleanup: (() => void) | null = null; - let error: Error | null = null; - let done = false; - const data: unknown[] = []; - - const waiting: [ - ( - value: - | IteratorResult<T, boolean | undefined> - | PromiseLike<IteratorResult<T, boolean | undefined>> - ) => void, - (reason?: any) => void, - ][] = []; - - function onData(chunk: any) { - if (error) return; - if (waiting.length) { - const shiftedArr = waiting.shift(); - if (Array.isArray(shiftedArr) && shiftedArr[0]) { - return shiftedArr[0]({ value: chunk, done: false }); - } - } - data.push(chunk); - } - function onError(err: Error) { - error = err; - const all = waiting.slice(); - all.forEach(function (pair) { - pair[1](err); - }); - !cleanup || cleanup(); - } - function onEnd() { - done = true; - const all = waiting.slice(); - all.forEach(function (pair) { - pair[0]({ value: undefined, done: true }); - }); - !cleanup || cleanup(); - } - - cleanup = function () { - cleanup = null; - stream.removeListener('data', onData); - stream.removeListener('error', onError); - stream.removeListener('end', onEnd); - stream.removeListener('finish', onEnd); - stream.removeListener('close', onEnd); - }; - stream.on('data', onData); - stream.on('error', onError); - stream.on('end', onEnd); - stream.on('finish', onEnd); - stream.on('close', onEnd); - - function getNext(): Promise<IteratorResult<T, boolean | undefined>> { - return new Promise(function (resolve, reject) { - if (error) return reject(error); - if (data.length) return resolve({ value: data.shift() as T, done: false }); - if (done) return resolve({ value: undefined, done: true }); - waiting.push([resolve, reject]); - }); - } - - const iterator: NodeStreamIterator<T> = { - next(): Promise<IteratorResult<T, boolean | undefined>> { - return getNext(); - }, - }; - - if (canUseAsyncIteratorSymbol) { - iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> { - return this; - }; - } - - return iterator as AsyncIterableIterator<T>; -} - -function asyncIterator<T>(source: AsyncIterableIterator<T>): AsyncIterableIterator<T> { - const iterator = source[Symbol.asyncIterator](); - return { - next(): Promise<IteratorResult<T, boolean>> { - return iterator.next(); - }, - [Symbol.asyncIterator](): AsyncIterableIterator<T> { - return this; - }, - }; -} - -export function responseIterator<T>(response: Response | Buffer): AsyncIterableIterator<T> { - let body: unknown = response; - - if (isNodeResponse(response)) body = response.body; - - if (isBuffer(body)) body = NodeReadableStream.from(body); - - if (isAsyncIterableIterator(body)) return asyncIterator<T>(body); - - if (isReadableStream(body)) return readerIterator<T>(body.getReader()); - - // this errors without casting to ReadableStream<T> - // because Blob.stream() returns a NodeJS ReadableStream - if (isStreamableBlob(body)) { - return readerIterator<T>((body.stream() as unknown as ReadableStream<T>).getReader()); - } - - if (isBlob(body)) return promiseIterator<T>(body.arrayBuffer()); - - if (isNodeReadableStream(body)) return nodeStreamIterator<T>(body); - - throw new AstroError( - 'Unknown body type for responseIterator. Please pass a streamable response.' - ); -} diff --git a/packages/integrations/node/test/api-route.test.js b/packages/integrations/node/test/api-route.test.js index c830eee2d..7d9422ab4 100644 --- a/packages/integrations/node/test/api-route.test.js +++ b/packages/integrations/node/test/api-route.test.js @@ -89,4 +89,23 @@ describe('API routes', () => { let [out] = await done; expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest); }); + + it('Can bail on streaming', async () => { + const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs'); + let { req, res, done } = createRequestAndResponse({ + url: '/streaming', + }); + + let locals = { cancelledByTheServer: false }; + + handler(req, res, () => {}, locals); + req.send(); + + await new Promise((resolve) => setTimeout(resolve, 500)); + res.emit("close"); + + await done; + + expect(locals).to.deep.include({ cancelledByTheServer: true }); + }); }); diff --git a/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts b/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts index fbf44c547..3f1b236de 100644 --- a/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts +++ b/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts @@ -1,6 +1,6 @@ import crypto from 'node:crypto'; -export async function post({ request }: { request: Request }) { +export async function POST({ request }: { request: Request }) { const hash = crypto.createHash('sha256'); const iterable = request.body as unknown as AsyncIterable<Uint8Array>; diff --git a/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts b/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts new file mode 100644 index 000000000..9ecb884bf --- /dev/null +++ b/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts @@ -0,0 +1,22 @@ +export const GET = ({ locals }) => { + let sentChunks = 0; + + const readableStream = new ReadableStream({ + async pull(controller) { + if (sentChunks === 3) return controller.close(); + else sentChunks++; + + await new Promise(resolve => setTimeout(resolve, 1000)); + controller.enqueue(new TextEncoder().encode('hello\n')); + }, + cancel() { + locals.cancelledByTheServer = true; + } + }); + + return new Response(readableStream, { + headers: { + "Content-Type": "text/event-stream" + } + }); +} |