summaryrefslogtreecommitdiff
path: root/packages/integrations/node/src
diff options
context:
space:
mode:
authorGravatar matthewp <matthewp@users.noreply.github.com> 2022-11-16 21:58:37 +0000
committerGravatar fredkbot <fred+astrobot@astro.build> 2022-11-16 21:58:37 +0000
commitc18c618a09ea5d3d40a64bc47c60c64a7a3be141 (patch)
tree7bd0f9c2c5c4e1f5e8bc4eb2a762ef2de2b643d5 /packages/integrations/node/src
parente2fb0c4ff29d27bbd39193127c64fd4f312129e3 (diff)
downloadastro-c18c618a09ea5d3d40a64bc47c60c64a7a3be141.tar.gz
astro-c18c618a09ea5d3d40a64bc47c60c64a7a3be141.tar.zst
astro-c18c618a09ea5d3d40a64bc47c60c64a7a3be141.zip
[ci] format
Diffstat (limited to 'packages/integrations/node/src')
-rw-r--r--packages/integrations/node/src/response-iterator.ts456
1 files changed, 221 insertions, 235 deletions
diff --git a/packages/integrations/node/src/response-iterator.ts b/packages/integrations/node/src/response-iterator.ts
index d197f380c..7700e9331 100644
--- a/packages/integrations/node/src/response-iterator.ts
+++ b/packages/integrations/node/src/response-iterator.ts
@@ -4,238 +4,224 @@
* - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts
*/
- import type { Response as NodeResponse } from "node-fetch";
- import { Readable as NodeReadableStream } from "stream";
-
- interface NodeStreamIterator<T> {
- next(): Promise<IteratorResult<T, boolean | undefined>>;
- [Symbol.asyncIterator]?(): AsyncIterator<T>;
- }
-
- interface PromiseIterator<T> {
- next(): Promise<IteratorResult<T, ArrayBuffer | undefined>>;
- [Symbol.asyncIterator]?(): AsyncIterator<T>;
- }
-
- interface ReaderIterator<T> {
- next(): Promise<ReadableStreamDefaultReadResult<T>>;
- [Symbol.asyncIterator]?(): AsyncIterator<T>;
- }
-
- const canUseSymbol =
- typeof Symbol === 'function' &&
- typeof Symbol.for === 'function';
-
- const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator;
-
- function isBuffer(value: any): value is Buffer {
- return value != null && value.constructor != null &&
- typeof value.constructor.isBuffer === 'function' && value.constructor.isBuffer(value)
- }
-
- function isNodeResponse(value: any): value is NodeResponse {
- return !!(value as NodeResponse).body;
- }
-
- function isReadableStream(value: any): value is ReadableStream<any> {
- return !!(value as ReadableStream<any>).getReader;
- }
-
- function isAsyncIterableIterator(
- value: any
- ): value is AsyncIterableIterator<any> {
- return !!(
- canUseAsyncIteratorSymbol &&
- (value as AsyncIterableIterator<any>)[Symbol.asyncIterator]
- );
- }
-
- function isStreamableBlob(value: any): value is Blob {
- return !!(value as Blob).stream;
- }
-
- function isBlob(value: any): value is Blob {
- return !!(value as Blob).arrayBuffer;
- }
-
- function isNodeReadableStream(value: any): value is NodeReadableStream {
- return !!(value as NodeReadableStream).pipe;
- }
-
- function readerIterator<T>(
- reader: ReadableStreamDefaultReader<T>
- ): AsyncIterableIterator<T> {
- const iterator: ReaderIterator<T> = {
- next() {
- return reader.read();
- },
- };
-
- if (canUseAsyncIteratorSymbol) {
- iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
- //@ts-ignore
- return this;
- };
- }
-
- return iterator as AsyncIterableIterator<T>;
- }
-
- function promiseIterator<T = ArrayBuffer>(
- promise: Promise<ArrayBuffer>
- ): AsyncIterableIterator<T> {
- let resolved = false;
-
- const iterator: PromiseIterator<T> = {
- next(): Promise<IteratorResult<T, ArrayBuffer | undefined>> {
- if (resolved)
- return Promise.resolve({
- value: undefined,
- done: true,
- });
- resolved = true;
- return new Promise(function (resolve, reject) {
- promise
- .then(function (value) {
- resolve({ value: value as unknown as T, done: false });
- })
- .catch(reject);
- });
- },
- };
-
- if (canUseAsyncIteratorSymbol) {
- iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
- return this;
- };
- }
-
- return iterator as AsyncIterableIterator<T>;
- }
-
- function nodeStreamIterator<T>(
- stream: NodeReadableStream
- ): AsyncIterableIterator<T> {
- let cleanup: (() => void) | null = null;
- let error: Error | null = null;
- let done = false;
- const data: unknown[] = [];
-
- const waiting: [
- (
- value:
- | IteratorResult<T, boolean | undefined>
- | PromiseLike<IteratorResult<T, boolean | undefined>>
- ) => void,
- (reason?: any) => void
- ][] = [];
-
- function onData(chunk: any) {
- if (error) return;
- if (waiting.length) {
- const shiftedArr = waiting.shift();
- if (Array.isArray(shiftedArr) && shiftedArr[0]) {
- return shiftedArr[0]({ value: chunk, done: false });
- }
- }
- data.push(chunk);
- }
- function onError(err: Error) {
- error = err;
- const all = waiting.slice();
- all.forEach(function (pair) {
- pair[1](err);
- });
- !cleanup || cleanup();
- }
- function onEnd() {
- done = true;
- const all = waiting.slice();
- all.forEach(function (pair) {
- pair[0]({ value: undefined, done: true });
- });
- !cleanup || cleanup();
- }
-
- cleanup = function () {
- cleanup = null;
- stream.removeListener("data", onData);
- stream.removeListener("error", onError);
- stream.removeListener("end", onEnd);
- stream.removeListener("finish", onEnd);
- stream.removeListener("close", onEnd);
- };
- stream.on("data", onData);
- stream.on("error", onError);
- stream.on("end", onEnd);
- stream.on("finish", onEnd);
- stream.on("close", onEnd);
-
- function getNext(): Promise<IteratorResult<T, boolean | undefined>> {
- return new Promise(function (resolve, reject) {
- if (error) return reject(error);
- if (data.length) return resolve({ value: data.shift() as T, done: false });
- if (done) return resolve({ value: undefined, done: true });
- waiting.push([resolve, reject]);
- });
- }
-
- const iterator: NodeStreamIterator<T> = {
- next(): Promise<IteratorResult<T, boolean | undefined>> {
- return getNext();
- },
- };
-
- if (canUseAsyncIteratorSymbol) {
- iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
- return this;
- };
- }
-
- return iterator as AsyncIterableIterator<T>;
- }
-
- function asyncIterator<T>(
- source: AsyncIterableIterator<T>
- ): AsyncIterableIterator<T> {
- const iterator = source[Symbol.asyncIterator]();
- return {
- next(): Promise<IteratorResult<T, boolean>> {
- return iterator.next();
- },
- [Symbol.asyncIterator](): AsyncIterableIterator<T> {
- return this;
- },
- };
- }
-
- export function responseIterator<T>(
- response: Response | NodeResponse | Buffer
- ): AsyncIterableIterator<T> {
- let body: unknown = response;
-
- if (isNodeResponse(response)) body = response.body;
-
- if (isBuffer(body)) body = NodeReadableStream.from(body);
-
- if (isAsyncIterableIterator(body)) return asyncIterator<T>(body);
-
- if (isReadableStream(body)) return readerIterator<T>(body.getReader());
-
- // this errors without casting to ReadableStream<T>
- // because Blob.stream() returns a NodeJS ReadableStream
- if (isStreamableBlob(body)) {
- return readerIterator<T>(
- (body.stream() as unknown as ReadableStream<T>).getReader()
- );
- }
-
- if (isBlob(body)) return promiseIterator<T>(body.arrayBuffer());
-
- if (isNodeReadableStream(body)) return nodeStreamIterator<T>(body);
-
- throw new Error(
- "Unknown body type for responseIterator. Please pass a streamable response."
- );
- }
- \ No newline at end of file
+import type { Response as NodeResponse } from 'node-fetch';
+import { Readable as NodeReadableStream } from 'stream';
+
+interface NodeStreamIterator<T> {
+ next(): Promise<IteratorResult<T, boolean | undefined>>;
+ [Symbol.asyncIterator]?(): AsyncIterator<T>;
+}
+
+interface PromiseIterator<T> {
+ next(): Promise<IteratorResult<T, ArrayBuffer | undefined>>;
+ [Symbol.asyncIterator]?(): AsyncIterator<T>;
+}
+
+interface ReaderIterator<T> {
+ next(): Promise<ReadableStreamDefaultReadResult<T>>;
+ [Symbol.asyncIterator]?(): AsyncIterator<T>;
+}
+
+const canUseSymbol = typeof Symbol === 'function' && typeof Symbol.for === 'function';
+
+const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator;
+
+function isBuffer(value: any): value is Buffer {
+ return (
+ value != null &&
+ value.constructor != null &&
+ typeof value.constructor.isBuffer === 'function' &&
+ value.constructor.isBuffer(value)
+ );
+}
+
+function isNodeResponse(value: any): value is NodeResponse {
+ return !!(value as NodeResponse).body;
+}
+
+function isReadableStream(value: any): value is ReadableStream<any> {
+ return !!(value as ReadableStream<any>).getReader;
+}
+
+function isAsyncIterableIterator(value: any): value is AsyncIterableIterator<any> {
+ return !!(
+ canUseAsyncIteratorSymbol && (value as AsyncIterableIterator<any>)[Symbol.asyncIterator]
+ );
+}
+
+function isStreamableBlob(value: any): value is Blob {
+ return !!(value as Blob).stream;
+}
+
+function isBlob(value: any): value is Blob {
+ return !!(value as Blob).arrayBuffer;
+}
+
+function isNodeReadableStream(value: any): value is NodeReadableStream {
+ return !!(value as NodeReadableStream).pipe;
+}
+
+function readerIterator<T>(reader: ReadableStreamDefaultReader<T>): AsyncIterableIterator<T> {
+ const iterator: ReaderIterator<T> = {
+ next() {
+ return reader.read();
+ },
+ };
+
+ if (canUseAsyncIteratorSymbol) {
+ iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
+ //@ts-ignore
+ return this;
+ };
+ }
+
+ return iterator as AsyncIterableIterator<T>;
+}
+
+function promiseIterator<T = ArrayBuffer>(promise: Promise<ArrayBuffer>): AsyncIterableIterator<T> {
+ let resolved = false;
+
+ const iterator: PromiseIterator<T> = {
+ next(): Promise<IteratorResult<T, ArrayBuffer | undefined>> {
+ if (resolved)
+ return Promise.resolve({
+ value: undefined,
+ done: true,
+ });
+ resolved = true;
+ return new Promise(function (resolve, reject) {
+ promise
+ .then(function (value) {
+ resolve({ value: value as unknown as T, done: false });
+ })
+ .catch(reject);
+ });
+ },
+ };
+
+ if (canUseAsyncIteratorSymbol) {
+ iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
+ return this;
+ };
+ }
+
+ return iterator as AsyncIterableIterator<T>;
+}
+
+function nodeStreamIterator<T>(stream: NodeReadableStream): AsyncIterableIterator<T> {
+ let cleanup: (() => void) | null = null;
+ let error: Error | null = null;
+ let done = false;
+ const data: unknown[] = [];
+
+ const waiting: [
+ (
+ value:
+ | IteratorResult<T, boolean | undefined>
+ | PromiseLike<IteratorResult<T, boolean | undefined>>
+ ) => void,
+ (reason?: any) => void
+ ][] = [];
+
+ function onData(chunk: any) {
+ if (error) return;
+ if (waiting.length) {
+ const shiftedArr = waiting.shift();
+ if (Array.isArray(shiftedArr) && shiftedArr[0]) {
+ return shiftedArr[0]({ value: chunk, done: false });
+ }
+ }
+ data.push(chunk);
+ }
+ function onError(err: Error) {
+ error = err;
+ const all = waiting.slice();
+ all.forEach(function (pair) {
+ pair[1](err);
+ });
+ !cleanup || cleanup();
+ }
+ function onEnd() {
+ done = true;
+ const all = waiting.slice();
+ all.forEach(function (pair) {
+ pair[0]({ value: undefined, done: true });
+ });
+ !cleanup || cleanup();
+ }
+
+ cleanup = function () {
+ cleanup = null;
+ stream.removeListener('data', onData);
+ stream.removeListener('error', onError);
+ stream.removeListener('end', onEnd);
+ stream.removeListener('finish', onEnd);
+ stream.removeListener('close', onEnd);
+ };
+ stream.on('data', onData);
+ stream.on('error', onError);
+ stream.on('end', onEnd);
+ stream.on('finish', onEnd);
+ stream.on('close', onEnd);
+
+ function getNext(): Promise<IteratorResult<T, boolean | undefined>> {
+ return new Promise(function (resolve, reject) {
+ if (error) return reject(error);
+ if (data.length) return resolve({ value: data.shift() as T, done: false });
+ if (done) return resolve({ value: undefined, done: true });
+ waiting.push([resolve, reject]);
+ });
+ }
+
+ const iterator: NodeStreamIterator<T> = {
+ next(): Promise<IteratorResult<T, boolean | undefined>> {
+ return getNext();
+ },
+ };
+
+ if (canUseAsyncIteratorSymbol) {
+ iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
+ return this;
+ };
+ }
+
+ return iterator as AsyncIterableIterator<T>;
+}
+
+function asyncIterator<T>(source: AsyncIterableIterator<T>): AsyncIterableIterator<T> {
+ const iterator = source[Symbol.asyncIterator]();
+ return {
+ next(): Promise<IteratorResult<T, boolean>> {
+ return iterator.next();
+ },
+ [Symbol.asyncIterator](): AsyncIterableIterator<T> {
+ return this;
+ },
+ };
+}
+
+export function responseIterator<T>(
+ response: Response | NodeResponse | Buffer
+): AsyncIterableIterator<T> {
+ let body: unknown = response;
+
+ if (isNodeResponse(response)) body = response.body;
+
+ if (isBuffer(body)) body = NodeReadableStream.from(body);
+
+ if (isAsyncIterableIterator(body)) return asyncIterator<T>(body);
+
+ if (isReadableStream(body)) return readerIterator<T>(body.getReader());
+
+ // this errors without casting to ReadableStream<T>
+ // because Blob.stream() returns a NodeJS ReadableStream
+ if (isStreamableBlob(body)) {
+ return readerIterator<T>((body.stream() as unknown as ReadableStream<T>).getReader());
+ }
+
+ if (isBlob(body)) return promiseIterator<T>(body.arrayBuffer());
+
+ if (isNodeReadableStream(body)) return nodeStreamIterator<T>(body);
+
+ throw new Error('Unknown body type for responseIterator. Please pass a streamable response.');
+}