aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/streams.exports.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/streams.exports.js')
-rw-r--r--src/bun.js/streams.exports.js199
1 files changed, 197 insertions, 2 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 979ef32f6..576f3ea89 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -1,5 +1,6 @@
// "readable-stream" npm package
// just transpiled
+var { isPromise } = import.meta.primordials;
var __create = Object.create;
var __defProp = Object.defineProperty;
@@ -33,6 +34,76 @@ var __copyProps = (to, from, except, desc) => {
var runOnNextTick = process.nextTick;
+function isReadableStream(value) {
+ return (
+ typeof value === "object" &&
+ value !== null &&
+ value instanceof ReadableStream
+ );
+}
+
+function validateBoolean(value, name) {
+ if (typeof value !== "boolean")
+ throw new ERR_INVALID_ARG_TYPE(name, "boolean", value);
+}
+
+/**
+ * @callback validateObject
+ * @param {*} value
+ * @param {string} name
+ * @param {{
+ * allowArray?: boolean,
+ * allowFunction?: boolean,
+ * nullable?: boolean
+ * }} [options]
+ */
+
+/** @type {validateObject} */
+const validateObject = (value, name, options = null) => {
+ const allowArray = options?.allowArray ?? false;
+ const allowFunction = options?.allowFunction ?? false;
+ const nullable = options?.nullable ?? false;
+ if (
+ (!nullable && value === null) ||
+ (!allowArray && ArrayIsArray(value)) ||
+ (typeof value !== "object" &&
+ (!allowFunction || typeof value !== "function"))
+ ) {
+ throw new ERR_INVALID_ARG_TYPE(name, "Object", value);
+ }
+};
+
+/**
+ * @callback validateString
+ * @param {*} value
+ * @param {string} name
+ * @returns {asserts value is string}
+ */
+
+/** @type {validateString} */
+function validateString(value, name) {
+ if (typeof value !== "string")
+ throw new ERR_INVALID_ARG_TYPE(name, "string", value);
+}
+
+var ArrayIsArray = Array.isArray;
+
+//------------------------------------------------------------------------------
+// Node error polyfills
+//------------------------------------------------------------------------------
+
+function ERR_INVALID_ARG_TYPE(name, type, value) {
+ return new Error(
+ `The argument '${name}' is invalid. Received '${value}' for type '${type}'`
+ );
+}
+
+function ERR_INVALID_ARG_VALUE(name, value, reason) {
+ return new Error(
+ `The value '${value}' is invalid for argument '${name}'. Reason: ${reason}`
+ );
+}
+
// node_modules/readable-stream/lib/ours/primordials.js
var require_primordials = __commonJS({
"node_modules/readable-stream/lib/ours/primordials.js"(exports, module) {
@@ -2509,7 +2580,9 @@ var require_readable = __commonJS({
const state = this._readableState;
if (ev === "data") {
state.readableListening = this.listenerCount("readable") > 0;
- if (state.flowing !== false) this.resume();
+ if (state.flowing !== false) {
+ this.resume();
+ }
} else if (ev === "readable") {
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
@@ -2528,6 +2601,126 @@ var require_readable = __commonJS({
static ReadableState = ReadableState;
}
+
+ class ReadableFromWeb extends Readable {
+ #reader;
+ #closed;
+
+ constructor(options) {
+ const { objectMode, highWaterMark, encoding, signal, reader } = options;
+ super({
+ objectMode,
+ highWaterMark,
+ encoding,
+ signal,
+ });
+
+ this.#reader = reader;
+ this.#reader.closed
+ .then(() => {
+ this.#closed = true;
+ })
+ .catch((error) => {
+ this.#closed = true;
+ destroy(this, error);
+ });
+ }
+
+ async _read() {
+ var deferredError;
+ try {
+ var done, value;
+ const firstResult = this.#reader.readMany();
+
+ if (isPromise(firstResult)) {
+ const result = await firstResult;
+ done = result.done;
+ value = result.value;
+ } else {
+ done = firstResult.done;
+ value = firstResult.value;
+ }
+
+ if (done) {
+ this.push(null);
+ return;
+ }
+
+ if (!value)
+ throw new Error(
+ `Invalid value from ReadableStream reader: ${value}`
+ );
+ if (ArrayIsArray(value)) {
+ this.push(...value);
+ } else {
+ this.push(value);
+ }
+ } catch (e) {
+ deferredError = e;
+ } finally {
+ if (deferredError) throw deferredError;
+ }
+ }
+
+ _destroy(error, callback) {
+ if (!this.#closed) {
+ this.#reader.releaseLock();
+ this.#reader.cancel(error).then(done).catch(done);
+ return;
+ }
+ try {
+ callback(error);
+ } catch (error) {
+ globalThis.reportError(error);
+ }
+ }
+
+ // NOTE(Derrick): For whatever reason this seems to be necessary to make this work
+ // I couldn't find out why .constructed was getting set to false
+ // even though construct() was getting called
+ _construct() {
+ this._readableState.constructed = true;
+ }
+ }
+
+ /**
+ * @param {ReadableStream} readableStream
+ * @param {{
+ * highWaterMark? : number,
+ * encoding? : string,
+ * objectMode? : boolean,
+ * signal? : AbortSignal,
+ * }} [options]
+ * @returns {Readable}
+ */
+ function newStreamReadableFromReadableStream(readableStream, options = {}) {
+ if (!isReadableStream(readableStream)) {
+ throw new ERR_INVALID_ARG_TYPE(
+ "readableStream",
+ "ReadableStream",
+ readableStream
+ );
+ }
+
+ validateObject(options, "options");
+ const { highWaterMark, encoding, objectMode = false, signal } = options;
+
+ if (encoding !== undefined && !Buffer.isEncoding(encoding))
+ throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
+ validateBoolean(objectMode, "options.objectMode");
+
+ const reader = readableStream.getReader();
+ const readable = new ReadableFromWeb({
+ highWaterMark,
+ encoding,
+ objectMode,
+ signal,
+ reader,
+ });
+
+ return readable;
+ }
+
module.exports = Readable;
var { addAbortSignal } = require_add_abort_signal();
@@ -3327,7 +3520,9 @@ var require_readable = __commonJS({
Readable.from = function (iterable, opts) {
return from(Readable, iterable, opts);
};
- var webStreamsAdapters;
+ var webStreamsAdapters = {
+ newStreamReadableFromReadableStream,
+ };
function lazyWebStreams() {
if (webStreamsAdapters === void 0) webStreamsAdapters = {};
return webStreamsAdapters;