diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/streams.exports.js | 354 |
1 files changed, 214 insertions, 140 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 2e031a0df..979ef32f6 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -2199,11 +2199,12 @@ var require_legacy = __commonJS({ "use strict"; var { ArrayIsArray, ObjectSetPrototypeOf } = require_primordials(); var { EventEmitter: EE } = __require("events"); - function Stream(opts) { - EE.call(this, opts); - } - ObjectSetPrototypeOf(Stream.prototype, EE.prototype); - ObjectSetPrototypeOf(Stream, EE); + var Stream = class Stream extends EE { + constructor(opts) { + super(opts); + } + }; + Stream.prototype.pipe = function (dest, options) { const source = this; function ondata(chunk) { @@ -2475,22 +2476,65 @@ var require_readable = __commonJS({ SymbolAsyncIterator, Symbol: Symbol2, } = require_primordials(); - module.exports = Readable; - var ReadableState = globalThis[Symbol.for("Bun.lazy")]("bun:stream").ReadableState; - Readable.ReadableState = ReadableState; + + var ReadableState = + globalThis[Symbol.for("Bun.lazy")]("bun:stream").ReadableState; var { EventEmitter: EE } = __require("events"); var { Stream, prependListener } = require_legacy(); + + class Readable extends Stream { + constructor(options) { + super(options); + + const isDuplex = this instanceof require_duplex(); + this._readableState = new ReadableState(options, this, isDuplex); + if (options) { + if (typeof options.read === "function") this._read = options.read; + if (typeof options.destroy === "function") + this._destroy = options.destroy; + if (typeof options.construct === "function") + this._construct = options.construct; + if (options.signal && !isDuplex) addAbortSignal(options.signal, this); + } + + destroyImpl.construct(this, () => { + if (this._readableState.needReadable) { + maybeReadMore(this, this._readableState); + } + }); + } + + on(ev, fn) { + const res = super.on(ev, fn); + const state = this._readableState; + if (ev === "data") { + state.readableListening = this.listenerCount("readable") > 0; + if (state.flowing !== false) this.resume(); + } else if (ev === "readable") { + if (!state.endEmitted && !state.readableListening) { + state.readableListening = state.needReadable = true; + state.flowing = false; + state.emittedReadable = false; + debug("on readable", state.length, state.reading); + if (state.length) { + emitReadable(this, state); + } else if (!state.reading) { + runOnNextTick(nReadingNextTick, this); + } + } + } + return res; + } + + static ReadableState = ReadableState; + } + module.exports = Readable; + var { addAbortSignal } = require_add_abort_signal(); var eos = require_end_of_stream(); - var debug = require_util().debuglog("stream", (fn) => { - debug = fn; - }); - const { - maybeReadMore, - resume, - emitReadable, - onEofChunk, - } = globalThis[Symbol.for("Bun.lazy")]("bun:stream"); + var debug = (name) => {}; + const { maybeReadMore, resume, emitReadable, onEofChunk } = + globalThis[Symbol.for("Bun.lazy")]("bun:stream"); var destroyImpl = require_destroy(); var { aggregateTwoErrors, @@ -2505,29 +2549,9 @@ var require_readable = __commonJS({ var { validateObject } = require_validators(); var { StringDecoder } = __require("string_decoder"); var from = require_from(); - ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); - ObjectSetPrototypeOf(Readable, Stream); var nop = () => {}; var { errorOrDestroy } = destroyImpl; - function Readable(options) { - if (!(this instanceof Readable)) return new Readable(options); - const isDuplex = this instanceof require_duplex(); - this._readableState = new ReadableState(options, this, isDuplex); - if (options) { - if (typeof options.read === "function") this._read = options.read; - if (typeof options.destroy === "function") - this._destroy = options.destroy; - if (typeof options.construct === "function") - this._construct = options.construct; - if (options.signal && !isDuplex) addAbortSignal(options.signal, this); - } - Stream.call(this, options); - destroyImpl.construct(this, () => { - if (this._readableState.needReadable) { - maybeReadMore(this, this._readableState); - } - }); - } + Readable.prototype.destroy = destroyImpl.destroy; Readable.prototype._undestroy = destroyImpl.undestroy; Readable.prototype._destroy = function (err, cb) { @@ -2674,18 +2698,24 @@ var require_readable = __commonJS({ if (n <= state.length) return n; return state.ended ? state.length : 0; } + // You can override either this method, or the async _read(n) below. Readable.prototype.read = function (n) { debug("read", n); - if (n === void 0) { - n = NaN; - } else if (!NumberIsInteger(n)) { + if (!NumberIsInteger(n)) { n = NumberParseInt(n, 10); } const state = this._readableState; const nOrig = n; + + // If we're asking for more than the current hwm, then raise the hwm. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); + if (n !== 0) state.emittedReadable = false; + + // If we're doing read(0) to trigger a readable event, but we + // already have a bunch of data in the buffer, then just trigger + // the 'readable' event and move on. if ( n === 0 && state.needReadable && @@ -2699,17 +2729,50 @@ var require_readable = __commonJS({ else emitReadable(this, state); return null; } + n = howMuchToRead(n, state); + + // If we've ended, and we're now clear, then finish it up. if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; } + + // All the actual chunk generation logic needs to be + // *below* the call to _read. The reason is that in certain + // synthetic stream cases, such as passthrough streams, _read + // may be a completely synchronous operation which may change + // the state of the read buffer, providing enough data when + // before there was *not* enough. + // + // So, the steps are: + // 1. Figure out what the state of things will be after we do + // a read from the buffer. + // + // 2. If that resulting state will trigger a _read, then call _read. + // Note that this may be asynchronous, or synchronous. Yes, it is + // deeply ugly to write APIs this way, but that still doesn't mean + // that the Readable class should behave improperly, as streams are + // designed to be sync/async agnostic. + // Take note if the _read call is sync or async (ie, if the read call + // has returned yet), so that we know whether or not it's safe to emit + // 'readable' etc. + // + // 3. Actually pull the requested chunks out of the buffer and return. + + // if we need a readable event, then we need to do some reading. let doRead = state.needReadable; debug("need readable", doRead); + + // If we currently have less than the highWaterMark, then also read some. if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; debug("length less than watermark", doRead); } + + // However, if we've ended, then there's no point, if we're already + // reading, then it's unnecessary, if we're constructing we have to wait, + // and if we're destroyed or errored, then it's not allowed, if ( state.ended || state.reading || @@ -2723,18 +2786,32 @@ var require_readable = __commonJS({ debug("do read"); state.reading = true; state.sync = true; + // If the length is currently zero, then we *need* a readable event. if (state.length === 0) state.needReadable = true; + + // Call internal read method try { - this._read(state.highWaterMark); + const result = this._read(state.highWaterMark); + const then = result?.then; + if (then && typeof then === "function") { + then.call(result, nop, function (err) { + errorOrDestroy(this, err); + }); + } } catch (err) { errorOrDestroy(this, err); } + state.sync = false; + // If _read pushed data synchronously, then `reading` will be false, + // and we need to re-evaluate how much data we can return to the user. if (!state.reading) n = howMuchToRead(nOrig, state); } + let ret; if (n > 0) ret = fromList(n, state); else ret = null; + if (ret === null) { state.needReadable = state.length <= state.highWaterMark; n = 0; @@ -2746,14 +2823,21 @@ var require_readable = __commonJS({ state.awaitDrainWriters = null; } } + if (state.length === 0) { + // If we have nothing in the buffer, then we want to know + // as soon as we *do* get something into the buffer. if (!state.ended) state.needReadable = true; + + // If we tried to read() past the EOF, then emit end on the next tick. if (nOrig !== n && state.ended) endReadable(this); } + if (ret !== null && !state.errorEmitted && !state.closeEmitted) { state.dataEmitted = true; this.emit("data", ret); } + return ret; }; Readable.prototype._read = function (n) { @@ -2922,27 +3006,6 @@ var require_readable = __commonJS({ dest.emit("unpipe", this, unpipeInfo); return this; }; - Readable.prototype.on = function (ev, fn) { - const res = Stream.prototype.on.call(this, ev, fn); - const state = this._readableState; - if (ev === "data") { - state.readableListening = this.listenerCount("readable") > 0; - if (state.flowing !== false) this.resume(); - } else if (ev === "readable") { - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; - debug("on readable", state.length, state.reading); - if (state.length) { - emitReadable(this, state); - } else if (!state.reading) { - runOnNextTick(nReadingNextTick, this); - } - } - } - return res; - }; Readable.prototype.addListener = Readable.prototype.on; Readable.prototype.removeListener = function (ev, fn) { const res = Stream.prototype.removeListener.call(this, ev, fn); @@ -3319,8 +3382,7 @@ var require_writable = __commonJS({ Symbol: Symbol2, SymbolHasInstance, } = require_primordials(); - module.exports = Writable; - Writable.WritableState = WritableState; + var { EventEmitter: EE } = __require("events"); var Stream = require_legacy().Stream; var destroyImpl = require_destroy(); @@ -3338,8 +3400,37 @@ var require_writable = __commonJS({ ERR_UNKNOWN_ENCODING, } = require_errors().codes; var { errorOrDestroy } = destroyImpl; - ObjectSetPrototypeOf(Writable.prototype, Stream.prototype); - ObjectSetPrototypeOf(Writable, Stream); + + var Writable = class Writable extends Stream { + constructor(options = {}) { + super(options); + + const isDuplex = this instanceof require_duplex(); + if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) + return new Writable(options); + this._writableState = new WritableState(options, this, isDuplex); + if (options) { + if (typeof options.write === "function") this._write = options.write; + if (typeof options.writev === "function") + this._writev = options.writev; + if (typeof options.destroy === "function") + this._destroy = options.destroy; + if (typeof options.final === "function") this._final = options.final; + if (typeof options.construct === "function") + this._construct = options.construct; + if (options.signal) addAbortSignal(options.signal, this); + } + destroyImpl.construct(this, () => { + const state = this._writableState; + if (!state.writing) { + clearBuffer(this, state); + } + finishMaybe(this, state); + }); + } + }; + module.exports = Writable; + function nop() {} var kOnFinished = Symbol2("kOnFinished"); function WritableState(options, stream, isDuplex) { @@ -3396,30 +3487,7 @@ var require_writable = __commonJS({ return this.buffered.length - this.bufferedIndex; }, }); - function Writable(options) { - const isDuplex = this instanceof require_duplex(); - if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) - return new Writable(options); - this._writableState = new WritableState(options, this, isDuplex); - if (options) { - if (typeof options.write === "function") this._write = options.write; - if (typeof options.writev === "function") this._writev = options.writev; - if (typeof options.destroy === "function") - this._destroy = options.destroy; - if (typeof options.final === "function") this._final = options.final; - if (typeof options.construct === "function") - this._construct = options.construct; - if (options.signal) addAbortSignal(options.signal, this); - } - Stream.call(this, options); - destroyImpl.construct(this, () => { - const state = this._writableState; - if (!state.writing) { - clearBuffer(this, state); - } - finishMaybe(this, state); - }); - } + ObjectDefineProperty(Writable, SymbolHasInstance, { value: function (object) { if (FunctionPrototypeSymbolHasInstance(this, object)) return true; @@ -4381,11 +4449,35 @@ var require_duplex = __commonJS({ ObjectKeys, ObjectSetPrototypeOf, } = require_primordials(); - module.exports = Duplex; + var Readable = require_readable(); var Writable = require_writable(); - ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); - ObjectSetPrototypeOf(Duplex, Readable); + + var Duplex = class Duplex extends Readable { + constructor(options) { + super(options); + + Writable.call(this, options); + if (options) { + this.allowHalfOpen = options.allowHalfOpen !== false; + if (options.readable === false) { + this._readableState.readable = false; + this._readableState.ended = true; + this._readableState.endEmitted = true; + } + if (options.writable === false) { + this._writableState.writable = false; + this._writableState.ending = true; + this._writableState.ended = true; + this._writableState.finished = true; + } + } else { + this.allowHalfOpen = true; + } + } + }; + module.exports = Duplex; + { const keys = ObjectKeys(Writable.prototype); for (let i = 0; i < keys.length; i++) { @@ -4394,27 +4486,7 @@ var require_duplex = __commonJS({ Duplex.prototype[method] = Writable.prototype[method]; } } - function Duplex(options) { - if (!(this instanceof Duplex)) return new Duplex(options); - Readable.call(this, options); - Writable.call(this, options); - if (options) { - this.allowHalfOpen = options.allowHalfOpen !== false; - if (options.readable === false) { - this._readableState.readable = false; - this._readableState.ended = true; - this._readableState.endEmitted = true; - } - if (options.writable === false) { - this._writableState.writable = false; - this._writableState.ending = true; - this._writableState.ended = true; - this._writableState.finished = true; - } - } else { - this.allowHalfOpen = true; - } - } + ObjectDefineProperties(Duplex.prototype, { writable: ObjectGetOwnPropertyDescriptor(Writable.prototype, "writable"), writableHighWaterMark: ObjectGetOwnPropertyDescriptor( @@ -4499,24 +4571,23 @@ var require_transform = __commonJS({ ) { "use strict"; var { ObjectSetPrototypeOf, Symbol: Symbol2 } = require_primordials(); - module.exports = Transform; var { ERR_METHOD_NOT_IMPLEMENTED } = require_errors().codes; var Duplex = require_duplex(); - ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); - ObjectSetPrototypeOf(Transform, Duplex); + var Transform = class Transform extends Duplex { + constructor(options) { + super(options); + this._readableState.sync = false; + this[kCallback] = null; + if (options) { + if (typeof options.transform === "function") + this._transform = options.transform; + if (typeof options.flush === "function") this._flush = options.flush; + } + this.on("prefinish", prefinish); + } + }; + module.exports = Transform; var kCallback = Symbol2("kCallback"); - function Transform(options) { - if (!(this instanceof Transform)) return new Transform(options); - Duplex.call(this, options); - this._readableState.sync = false; - this[kCallback] = null; - if (options) { - if (typeof options.transform === "function") - this._transform = options.transform; - if (typeof options.flush === "function") this._flush = options.flush; - } - this.on("prefinish", prefinish); - } function final(cb) { if (typeof this._flush === "function" && !this.destroyed) { this._flush((er, data) => { @@ -4594,18 +4665,20 @@ var require_passthrough = __commonJS({ module ) { "use strict"; - var { ObjectSetPrototypeOf } = require_primordials(); - module.exports = PassThrough; var Transform = require_transform(); - ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype); - ObjectSetPrototypeOf(PassThrough, Transform); - function PassThrough(options) { - if (!(this instanceof PassThrough)) return new PassThrough(options); - Transform.call(this, options); - } - PassThrough.prototype._transform = function (chunk, encoding, cb) { - cb(null, chunk); + var PassThrough = class PassThrough extends Transform { + constructor(options) { + super(options); + this._transform = this.#transform; + } + + _transform; + + #transform(chunk, encoding, cb) { + cb(null, chunk); + } }; + module.exports = PassThrough; }, }); @@ -5361,3 +5434,4 @@ export var destroy = stream_exports.destroy; export var pipeline = stream_exports.pipeline; export var compose = stream_exports.compose; export var Stream = stream_exports.Stream; +export var eos = (stream_exports["eos"] = require_end_of_stream); |