aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/streams.exports.js354
1 files changed, 214 insertions, 140 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 2e031a0df..979ef32f6 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -2199,11 +2199,12 @@ var require_legacy = __commonJS({
"use strict";
var { ArrayIsArray, ObjectSetPrototypeOf } = require_primordials();
var { EventEmitter: EE } = __require("events");
- function Stream(opts) {
- EE.call(this, opts);
- }
- ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
- ObjectSetPrototypeOf(Stream, EE);
+ var Stream = class Stream extends EE {
+ constructor(opts) {
+ super(opts);
+ }
+ };
+
Stream.prototype.pipe = function (dest, options) {
const source = this;
function ondata(chunk) {
@@ -2475,22 +2476,65 @@ var require_readable = __commonJS({
SymbolAsyncIterator,
Symbol: Symbol2,
} = require_primordials();
- module.exports = Readable;
- var ReadableState = globalThis[Symbol.for("Bun.lazy")]("bun:stream").ReadableState;
- Readable.ReadableState = ReadableState;
+
+ var ReadableState =
+ globalThis[Symbol.for("Bun.lazy")]("bun:stream").ReadableState;
var { EventEmitter: EE } = __require("events");
var { Stream, prependListener } = require_legacy();
+
+ class Readable extends Stream {
+ constructor(options) {
+ super(options);
+
+ const isDuplex = this instanceof require_duplex();
+ this._readableState = new ReadableState(options, this, isDuplex);
+ if (options) {
+ if (typeof options.read === "function") this._read = options.read;
+ if (typeof options.destroy === "function")
+ this._destroy = options.destroy;
+ if (typeof options.construct === "function")
+ this._construct = options.construct;
+ if (options.signal && !isDuplex) addAbortSignal(options.signal, this);
+ }
+
+ destroyImpl.construct(this, () => {
+ if (this._readableState.needReadable) {
+ maybeReadMore(this, this._readableState);
+ }
+ });
+ }
+
+ on(ev, fn) {
+ const res = super.on(ev, fn);
+ const state = this._readableState;
+ if (ev === "data") {
+ state.readableListening = this.listenerCount("readable") > 0;
+ if (state.flowing !== false) this.resume();
+ } else if (ev === "readable") {
+ if (!state.endEmitted && !state.readableListening) {
+ state.readableListening = state.needReadable = true;
+ state.flowing = false;
+ state.emittedReadable = false;
+ debug("on readable", state.length, state.reading);
+ if (state.length) {
+ emitReadable(this, state);
+ } else if (!state.reading) {
+ runOnNextTick(nReadingNextTick, this);
+ }
+ }
+ }
+ return res;
+ }
+
+ static ReadableState = ReadableState;
+ }
+ module.exports = Readable;
+
var { addAbortSignal } = require_add_abort_signal();
var eos = require_end_of_stream();
- var debug = require_util().debuglog("stream", (fn) => {
- debug = fn;
- });
- const {
- maybeReadMore,
- resume,
- emitReadable,
- onEofChunk,
- } = globalThis[Symbol.for("Bun.lazy")]("bun:stream");
+ var debug = (name) => {};
+ const { maybeReadMore, resume, emitReadable, onEofChunk } =
+ globalThis[Symbol.for("Bun.lazy")]("bun:stream");
var destroyImpl = require_destroy();
var {
aggregateTwoErrors,
@@ -2505,29 +2549,9 @@ var require_readable = __commonJS({
var { validateObject } = require_validators();
var { StringDecoder } = __require("string_decoder");
var from = require_from();
- ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
- ObjectSetPrototypeOf(Readable, Stream);
var nop = () => {};
var { errorOrDestroy } = destroyImpl;
- function Readable(options) {
- if (!(this instanceof Readable)) return new Readable(options);
- const isDuplex = this instanceof require_duplex();
- this._readableState = new ReadableState(options, this, isDuplex);
- if (options) {
- if (typeof options.read === "function") this._read = options.read;
- if (typeof options.destroy === "function")
- this._destroy = options.destroy;
- if (typeof options.construct === "function")
- this._construct = options.construct;
- if (options.signal && !isDuplex) addAbortSignal(options.signal, this);
- }
- Stream.call(this, options);
- destroyImpl.construct(this, () => {
- if (this._readableState.needReadable) {
- maybeReadMore(this, this._readableState);
- }
- });
- }
+
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function (err, cb) {
@@ -2674,18 +2698,24 @@ var require_readable = __commonJS({
if (n <= state.length) return n;
return state.ended ? state.length : 0;
}
+ // You can override either this method, or the async _read(n) below.
Readable.prototype.read = function (n) {
debug("read", n);
- if (n === void 0) {
- n = NaN;
- } else if (!NumberIsInteger(n)) {
+ if (!NumberIsInteger(n)) {
n = NumberParseInt(n, 10);
}
const state = this._readableState;
const nOrig = n;
+
+ // If we're asking for more than the current hwm, then raise the hwm.
if (n > state.highWaterMark)
state.highWaterMark = computeNewHighWaterMark(n);
+
if (n !== 0) state.emittedReadable = false;
+
+ // If we're doing read(0) to trigger a readable event, but we
+ // already have a bunch of data in the buffer, then just trigger
+ // the 'readable' event and move on.
if (
n === 0 &&
state.needReadable &&
@@ -2699,17 +2729,50 @@ var require_readable = __commonJS({
else emitReadable(this, state);
return null;
}
+
n = howMuchToRead(n, state);
+
+ // If we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
if (state.length === 0) endReadable(this);
return null;
}
+
+ // All the actual chunk generation logic needs to be
+ // *below* the call to _read. The reason is that in certain
+ // synthetic stream cases, such as passthrough streams, _read
+ // may be a completely synchronous operation which may change
+ // the state of the read buffer, providing enough data when
+ // before there was *not* enough.
+ //
+ // So, the steps are:
+ // 1. Figure out what the state of things will be after we do
+ // a read from the buffer.
+ //
+ // 2. If that resulting state will trigger a _read, then call _read.
+ // Note that this may be asynchronous, or synchronous. Yes, it is
+ // deeply ugly to write APIs this way, but that still doesn't mean
+ // that the Readable class should behave improperly, as streams are
+ // designed to be sync/async agnostic.
+ // Take note if the _read call is sync or async (ie, if the read call
+ // has returned yet), so that we know whether or not it's safe to emit
+ // 'readable' etc.
+ //
+ // 3. Actually pull the requested chunks out of the buffer and return.
+
+ // if we need a readable event, then we need to do some reading.
let doRead = state.needReadable;
debug("need readable", doRead);
+
+ // If we currently have less than the highWaterMark, then also read some.
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
debug("length less than watermark", doRead);
}
+
+ // However, if we've ended, then there's no point, if we're already
+ // reading, then it's unnecessary, if we're constructing we have to wait,
+ // and if we're destroyed or errored, then it's not allowed,
if (
state.ended ||
state.reading ||
@@ -2723,18 +2786,32 @@ var require_readable = __commonJS({
debug("do read");
state.reading = true;
state.sync = true;
+ // If the length is currently zero, then we *need* a readable event.
if (state.length === 0) state.needReadable = true;
+
+ // Call internal read method
try {
- this._read(state.highWaterMark);
+ const result = this._read(state.highWaterMark);
+ const then = result?.then;
+ if (then && typeof then === "function") {
+ then.call(result, nop, function (err) {
+ errorOrDestroy(this, err);
+ });
+ }
} catch (err) {
errorOrDestroy(this, err);
}
+
state.sync = false;
+ // If _read pushed data synchronously, then `reading` will be false,
+ // and we need to re-evaluate how much data we can return to the user.
if (!state.reading) n = howMuchToRead(nOrig, state);
}
+
let ret;
if (n > 0) ret = fromList(n, state);
else ret = null;
+
if (ret === null) {
state.needReadable = state.length <= state.highWaterMark;
n = 0;
@@ -2746,14 +2823,21 @@ var require_readable = __commonJS({
state.awaitDrainWriters = null;
}
}
+
if (state.length === 0) {
+ // If we have nothing in the buffer, then we want to know
+ // as soon as we *do* get something into the buffer.
if (!state.ended) state.needReadable = true;
+
+ // If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended) endReadable(this);
}
+
if (ret !== null && !state.errorEmitted && !state.closeEmitted) {
state.dataEmitted = true;
this.emit("data", ret);
}
+
return ret;
};
Readable.prototype._read = function (n) {
@@ -2922,27 +3006,6 @@ var require_readable = __commonJS({
dest.emit("unpipe", this, unpipeInfo);
return this;
};
- Readable.prototype.on = function (ev, fn) {
- const res = Stream.prototype.on.call(this, ev, fn);
- const state = this._readableState;
- if (ev === "data") {
- state.readableListening = this.listenerCount("readable") > 0;
- if (state.flowing !== false) this.resume();
- } else if (ev === "readable") {
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
- state.flowing = false;
- state.emittedReadable = false;
- debug("on readable", state.length, state.reading);
- if (state.length) {
- emitReadable(this, state);
- } else if (!state.reading) {
- runOnNextTick(nReadingNextTick, this);
- }
- }
- }
- return res;
- };
Readable.prototype.addListener = Readable.prototype.on;
Readable.prototype.removeListener = function (ev, fn) {
const res = Stream.prototype.removeListener.call(this, ev, fn);
@@ -3319,8 +3382,7 @@ var require_writable = __commonJS({
Symbol: Symbol2,
SymbolHasInstance,
} = require_primordials();
- module.exports = Writable;
- Writable.WritableState = WritableState;
+
var { EventEmitter: EE } = __require("events");
var Stream = require_legacy().Stream;
var destroyImpl = require_destroy();
@@ -3338,8 +3400,37 @@ var require_writable = __commonJS({
ERR_UNKNOWN_ENCODING,
} = require_errors().codes;
var { errorOrDestroy } = destroyImpl;
- ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
- ObjectSetPrototypeOf(Writable, Stream);
+
+ var Writable = class Writable extends Stream {
+ constructor(options = {}) {
+ super(options);
+
+ const isDuplex = this instanceof require_duplex();
+ if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this))
+ return new Writable(options);
+ this._writableState = new WritableState(options, this, isDuplex);
+ if (options) {
+ if (typeof options.write === "function") this._write = options.write;
+ if (typeof options.writev === "function")
+ this._writev = options.writev;
+ if (typeof options.destroy === "function")
+ this._destroy = options.destroy;
+ if (typeof options.final === "function") this._final = options.final;
+ if (typeof options.construct === "function")
+ this._construct = options.construct;
+ if (options.signal) addAbortSignal(options.signal, this);
+ }
+ destroyImpl.construct(this, () => {
+ const state = this._writableState;
+ if (!state.writing) {
+ clearBuffer(this, state);
+ }
+ finishMaybe(this, state);
+ });
+ }
+ };
+ module.exports = Writable;
+
function nop() {}
var kOnFinished = Symbol2("kOnFinished");
function WritableState(options, stream, isDuplex) {
@@ -3396,30 +3487,7 @@ var require_writable = __commonJS({
return this.buffered.length - this.bufferedIndex;
},
});
- function Writable(options) {
- const isDuplex = this instanceof require_duplex();
- if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this))
- return new Writable(options);
- this._writableState = new WritableState(options, this, isDuplex);
- if (options) {
- if (typeof options.write === "function") this._write = options.write;
- if (typeof options.writev === "function") this._writev = options.writev;
- if (typeof options.destroy === "function")
- this._destroy = options.destroy;
- if (typeof options.final === "function") this._final = options.final;
- if (typeof options.construct === "function")
- this._construct = options.construct;
- if (options.signal) addAbortSignal(options.signal, this);
- }
- Stream.call(this, options);
- destroyImpl.construct(this, () => {
- const state = this._writableState;
- if (!state.writing) {
- clearBuffer(this, state);
- }
- finishMaybe(this, state);
- });
- }
+
ObjectDefineProperty(Writable, SymbolHasInstance, {
value: function (object) {
if (FunctionPrototypeSymbolHasInstance(this, object)) return true;
@@ -4381,11 +4449,35 @@ var require_duplex = __commonJS({
ObjectKeys,
ObjectSetPrototypeOf,
} = require_primordials();
- module.exports = Duplex;
+
var Readable = require_readable();
var Writable = require_writable();
- ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
- ObjectSetPrototypeOf(Duplex, Readable);
+
+ var Duplex = class Duplex extends Readable {
+ constructor(options) {
+ super(options);
+
+ Writable.call(this, options);
+ if (options) {
+ this.allowHalfOpen = options.allowHalfOpen !== false;
+ if (options.readable === false) {
+ this._readableState.readable = false;
+ this._readableState.ended = true;
+ this._readableState.endEmitted = true;
+ }
+ if (options.writable === false) {
+ this._writableState.writable = false;
+ this._writableState.ending = true;
+ this._writableState.ended = true;
+ this._writableState.finished = true;
+ }
+ } else {
+ this.allowHalfOpen = true;
+ }
+ }
+ };
+ module.exports = Duplex;
+
{
const keys = ObjectKeys(Writable.prototype);
for (let i = 0; i < keys.length; i++) {
@@ -4394,27 +4486,7 @@ var require_duplex = __commonJS({
Duplex.prototype[method] = Writable.prototype[method];
}
}
- function Duplex(options) {
- if (!(this instanceof Duplex)) return new Duplex(options);
- Readable.call(this, options);
- Writable.call(this, options);
- if (options) {
- this.allowHalfOpen = options.allowHalfOpen !== false;
- if (options.readable === false) {
- this._readableState.readable = false;
- this._readableState.ended = true;
- this._readableState.endEmitted = true;
- }
- if (options.writable === false) {
- this._writableState.writable = false;
- this._writableState.ending = true;
- this._writableState.ended = true;
- this._writableState.finished = true;
- }
- } else {
- this.allowHalfOpen = true;
- }
- }
+
ObjectDefineProperties(Duplex.prototype, {
writable: ObjectGetOwnPropertyDescriptor(Writable.prototype, "writable"),
writableHighWaterMark: ObjectGetOwnPropertyDescriptor(
@@ -4499,24 +4571,23 @@ var require_transform = __commonJS({
) {
"use strict";
var { ObjectSetPrototypeOf, Symbol: Symbol2 } = require_primordials();
- module.exports = Transform;
var { ERR_METHOD_NOT_IMPLEMENTED } = require_errors().codes;
var Duplex = require_duplex();
- ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
- ObjectSetPrototypeOf(Transform, Duplex);
+ var Transform = class Transform extends Duplex {
+ constructor(options) {
+ super(options);
+ this._readableState.sync = false;
+ this[kCallback] = null;
+ if (options) {
+ if (typeof options.transform === "function")
+ this._transform = options.transform;
+ if (typeof options.flush === "function") this._flush = options.flush;
+ }
+ this.on("prefinish", prefinish);
+ }
+ };
+ module.exports = Transform;
var kCallback = Symbol2("kCallback");
- function Transform(options) {
- if (!(this instanceof Transform)) return new Transform(options);
- Duplex.call(this, options);
- this._readableState.sync = false;
- this[kCallback] = null;
- if (options) {
- if (typeof options.transform === "function")
- this._transform = options.transform;
- if (typeof options.flush === "function") this._flush = options.flush;
- }
- this.on("prefinish", prefinish);
- }
function final(cb) {
if (typeof this._flush === "function" && !this.destroyed) {
this._flush((er, data) => {
@@ -4594,18 +4665,20 @@ var require_passthrough = __commonJS({
module
) {
"use strict";
- var { ObjectSetPrototypeOf } = require_primordials();
- module.exports = PassThrough;
var Transform = require_transform();
- ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype);
- ObjectSetPrototypeOf(PassThrough, Transform);
- function PassThrough(options) {
- if (!(this instanceof PassThrough)) return new PassThrough(options);
- Transform.call(this, options);
- }
- PassThrough.prototype._transform = function (chunk, encoding, cb) {
- cb(null, chunk);
+ var PassThrough = class PassThrough extends Transform {
+ constructor(options) {
+ super(options);
+ this._transform = this.#transform;
+ }
+
+ _transform;
+
+ #transform(chunk, encoding, cb) {
+ cb(null, chunk);
+ }
};
+ module.exports = PassThrough;
},
});
@@ -5361,3 +5434,4 @@ export var destroy = stream_exports.destroy;
export var pipeline = stream_exports.pipeline;
export var compose = stream_exports.compose;
export var Stream = stream_exports.Stream;
+export var eos = (stream_exports["eos"] = require_end_of_stream);