summaryrefslogtreecommitdiff
path: root/packages/integrations/node
diff options
context:
space:
mode:
authorGravatar pilcrowOnPaper <80624252+pilcrowOnPaper@users.noreply.github.com> 2023-11-16 08:39:41 +0900
committerGravatar GitHub <noreply@github.com> 2023-11-16 05:09:41 +0530
commitc9487138d6d8fd39c8c8512239b6724cf2b275ff (patch)
tree5d9f7cb1dc6b134327b1dad89bd0b4dd93765c27 /packages/integrations/node
parenteeac2885514ecde558ad39bcee39f78f7b60109e (diff)
downloadastro-c9487138d6d8fd39c8c8512239b6724cf2b275ff.tar.gz
astro-c9487138d6d8fd39c8c8512239b6724cf2b275ff.tar.zst
astro-c9487138d6d8fd39c8c8512239b6724cf2b275ff.zip
Cancel response stream when connection closes (#9071)
* cancel stream on close * add changeset * add test * Update .changeset/modern-ways-develop.md Co-authored-by: Sarah Rainsberger <sarah@rainsberger.ca> --------- Co-authored-by: lilnasy <69170106+lilnasy@users.noreply.github.com> Co-authored-by: Sarah Rainsberger <sarah@rainsberger.ca>
Diffstat (limited to 'packages/integrations/node')
-rw-r--r--packages/integrations/node/src/nodeMiddleware.ts12
-rw-r--r--packages/integrations/node/src/response-iterator.ts228
-rw-r--r--packages/integrations/node/test/api-route.test.js19
-rw-r--r--packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts2
-rw-r--r--packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts22
5 files changed, 50 insertions, 233 deletions
diff --git a/packages/integrations/node/src/nodeMiddleware.ts b/packages/integrations/node/src/nodeMiddleware.ts
index ddaa95deb..7f242809e 100644
--- a/packages/integrations/node/src/nodeMiddleware.ts
+++ b/packages/integrations/node/src/nodeMiddleware.ts
@@ -1,8 +1,6 @@
import type { NodeApp } from 'astro/app/node';
import type { ServerResponse } from 'node:http';
-import type { Readable } from 'stream';
import { createOutgoingHttpHeaders } from './createOutgoingHttpHeaders.js';
-import { responseIterator } from './response-iterator.js';
import type { ErrorHandlerParams, Options, RequestHandlerParams } from './types.js';
// Disable no-unused-vars to avoid breaking signature change
@@ -79,8 +77,14 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse:
res.writeHead(status, nodeHeaders);
if (webResponse.body) {
try {
- for await (const chunk of responseIterator(webResponse) as unknown as Readable) {
- res.write(chunk);
+ const reader = webResponse.body.getReader();
+ res.on("close", () => {
+ reader.cancel();
+ })
+ let result = await reader.read();
+ while (!result.done) {
+ res.write(result.value);
+ result = await reader.read();
}
} catch (err: any) {
console.error(err?.stack || err?.message || String(err));
diff --git a/packages/integrations/node/src/response-iterator.ts b/packages/integrations/node/src/response-iterator.ts
deleted file mode 100644
index b79c3a853..000000000
--- a/packages/integrations/node/src/response-iterator.ts
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Original sources:
- * - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts
- * - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts
- */
-
-import { AstroError } from 'astro/errors';
-import type { ReadableStreamDefaultReadResult } from 'node:stream/web';
-import { Readable as NodeReadableStream } from 'stream';
-
-interface NodeStreamIterator<T> {
- next(): Promise<IteratorResult<T, boolean | undefined>>;
- [Symbol.asyncIterator]?(): AsyncIterator<T>;
-}
-
-interface PromiseIterator<T> {
- next(): Promise<IteratorResult<T, ArrayBuffer | undefined>>;
- [Symbol.asyncIterator]?(): AsyncIterator<T>;
-}
-
-interface ReaderIterator<T> {
- next(): Promise<ReadableStreamDefaultReadResult<T>>;
- [Symbol.asyncIterator]?(): AsyncIterator<T>;
-}
-
-const canUseSymbol = typeof Symbol === 'function' && typeof Symbol.for === 'function';
-
-const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator;
-
-function isBuffer(value: any): value is Buffer {
- return (
- value?.constructor != null &&
- typeof value.constructor.isBuffer === 'function' &&
- value.constructor.isBuffer(value)
- );
-}
-
-function isNodeResponse(value: any): value is Response {
- return !!(value as Response).body;
-}
-
-function isReadableStream(value: any): value is ReadableStream<any> {
- return !!(value as ReadableStream<any>).getReader;
-}
-
-function isAsyncIterableIterator(value: any): value is AsyncIterableIterator<any> {
- return !!(
- canUseAsyncIteratorSymbol && (value as AsyncIterableIterator<any>)[Symbol.asyncIterator]
- );
-}
-
-function isStreamableBlob(value: any): value is Blob {
- return !!(value as Blob).stream;
-}
-
-function isBlob(value: any): value is Blob {
- return !!(value as Blob).arrayBuffer;
-}
-
-function isNodeReadableStream(value: any): value is NodeReadableStream {
- return !!(value as NodeReadableStream).pipe;
-}
-
-function readerIterator<T>(reader: ReadableStreamDefaultReader<T>): AsyncIterableIterator<T> {
- const iterator: ReaderIterator<T> = {
- //@ts-expect-error
- next() {
- return reader.read();
- },
- };
-
- if (canUseAsyncIteratorSymbol) {
- iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
- //@ts-expect-error
- return this;
- };
- }
-
- return iterator as AsyncIterableIterator<T>;
-}
-
-function promiseIterator<T = ArrayBuffer>(promise: Promise<ArrayBuffer>): AsyncIterableIterator<T> {
- let resolved = false;
-
- const iterator: PromiseIterator<T> = {
- next(): Promise<IteratorResult<T, ArrayBuffer | undefined>> {
- if (resolved)
- return Promise.resolve({
- value: undefined,
- done: true,
- });
- resolved = true;
- return new Promise(function (resolve, reject) {
- promise
- .then(function (value) {
- resolve({ value: value as unknown as T, done: false });
- })
- .catch(reject);
- });
- },
- };
-
- if (canUseAsyncIteratorSymbol) {
- iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
- return this;
- };
- }
-
- return iterator as AsyncIterableIterator<T>;
-}
-
-function nodeStreamIterator<T>(stream: NodeReadableStream): AsyncIterableIterator<T> {
- let cleanup: (() => void) | null = null;
- let error: Error | null = null;
- let done = false;
- const data: unknown[] = [];
-
- const waiting: [
- (
- value:
- | IteratorResult<T, boolean | undefined>
- | PromiseLike<IteratorResult<T, boolean | undefined>>
- ) => void,
- (reason?: any) => void,
- ][] = [];
-
- function onData(chunk: any) {
- if (error) return;
- if (waiting.length) {
- const shiftedArr = waiting.shift();
- if (Array.isArray(shiftedArr) && shiftedArr[0]) {
- return shiftedArr[0]({ value: chunk, done: false });
- }
- }
- data.push(chunk);
- }
- function onError(err: Error) {
- error = err;
- const all = waiting.slice();
- all.forEach(function (pair) {
- pair[1](err);
- });
- !cleanup || cleanup();
- }
- function onEnd() {
- done = true;
- const all = waiting.slice();
- all.forEach(function (pair) {
- pair[0]({ value: undefined, done: true });
- });
- !cleanup || cleanup();
- }
-
- cleanup = function () {
- cleanup = null;
- stream.removeListener('data', onData);
- stream.removeListener('error', onError);
- stream.removeListener('end', onEnd);
- stream.removeListener('finish', onEnd);
- stream.removeListener('close', onEnd);
- };
- stream.on('data', onData);
- stream.on('error', onError);
- stream.on('end', onEnd);
- stream.on('finish', onEnd);
- stream.on('close', onEnd);
-
- function getNext(): Promise<IteratorResult<T, boolean | undefined>> {
- return new Promise(function (resolve, reject) {
- if (error) return reject(error);
- if (data.length) return resolve({ value: data.shift() as T, done: false });
- if (done) return resolve({ value: undefined, done: true });
- waiting.push([resolve, reject]);
- });
- }
-
- const iterator: NodeStreamIterator<T> = {
- next(): Promise<IteratorResult<T, boolean | undefined>> {
- return getNext();
- },
- };
-
- if (canUseAsyncIteratorSymbol) {
- iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
- return this;
- };
- }
-
- return iterator as AsyncIterableIterator<T>;
-}
-
-function asyncIterator<T>(source: AsyncIterableIterator<T>): AsyncIterableIterator<T> {
- const iterator = source[Symbol.asyncIterator]();
- return {
- next(): Promise<IteratorResult<T, boolean>> {
- return iterator.next();
- },
- [Symbol.asyncIterator](): AsyncIterableIterator<T> {
- return this;
- },
- };
-}
-
-export function responseIterator<T>(response: Response | Buffer): AsyncIterableIterator<T> {
- let body: unknown = response;
-
- if (isNodeResponse(response)) body = response.body;
-
- if (isBuffer(body)) body = NodeReadableStream.from(body);
-
- if (isAsyncIterableIterator(body)) return asyncIterator<T>(body);
-
- if (isReadableStream(body)) return readerIterator<T>(body.getReader());
-
- // this errors without casting to ReadableStream<T>
- // because Blob.stream() returns a NodeJS ReadableStream
- if (isStreamableBlob(body)) {
- return readerIterator<T>((body.stream() as unknown as ReadableStream<T>).getReader());
- }
-
- if (isBlob(body)) return promiseIterator<T>(body.arrayBuffer());
-
- if (isNodeReadableStream(body)) return nodeStreamIterator<T>(body);
-
- throw new AstroError(
- 'Unknown body type for responseIterator. Please pass a streamable response.'
- );
-}
diff --git a/packages/integrations/node/test/api-route.test.js b/packages/integrations/node/test/api-route.test.js
index c830eee2d..7d9422ab4 100644
--- a/packages/integrations/node/test/api-route.test.js
+++ b/packages/integrations/node/test/api-route.test.js
@@ -89,4 +89,23 @@ describe('API routes', () => {
let [out] = await done;
expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest);
});
+
+ it('Can bail on streaming', async () => {
+ const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs');
+ let { req, res, done } = createRequestAndResponse({
+ url: '/streaming',
+ });
+
+ let locals = { cancelledByTheServer: false };
+
+ handler(req, res, () => {}, locals);
+ req.send();
+
+ await new Promise((resolve) => setTimeout(resolve, 500));
+ res.emit("close");
+
+ await done;
+
+ expect(locals).to.deep.include({ cancelledByTheServer: true });
+ });
});
diff --git a/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts b/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts
index fbf44c547..3f1b236de 100644
--- a/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts
+++ b/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts
@@ -1,6 +1,6 @@
import crypto from 'node:crypto';
-export async function post({ request }: { request: Request }) {
+export async function POST({ request }: { request: Request }) {
const hash = crypto.createHash('sha256');
const iterable = request.body as unknown as AsyncIterable<Uint8Array>;
diff --git a/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts b/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts
new file mode 100644
index 000000000..9ecb884bf
--- /dev/null
+++ b/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts
@@ -0,0 +1,22 @@
+export const GET = ({ locals }) => {
+ let sentChunks = 0;
+
+ const readableStream = new ReadableStream({
+ async pull(controller) {
+ if (sentChunks === 3) return controller.close();
+ else sentChunks++;
+
+ await new Promise(resolve => setTimeout(resolve, 1000));
+ controller.enqueue(new TextEncoder().encode('hello\n'));
+ },
+ cancel() {
+ locals.cancelledByTheServer = true;
+ }
+ });
+
+ return new Response(readableStream, {
+ headers: {
+ "Content-Type": "text/event-stream"
+ }
+ });
+}