diff options
author | 2022-12-06 15:26:39 -0600 | |
---|---|---|
committer | 2022-12-06 13:26:39 -0800 | |
commit | 7d29782896f31ae5249f00e9505fd978e9733644 (patch) | |
tree | 8c32e0a7aaefeb3d74ed1e5317ca840334997c36 | |
parent | 1aa4cd2f6aadb2b49851e6fefbde9b48c87047be (diff) | |
download | bun-7d29782896f31ae5249f00e9505fd978e9733644.tar.gz bun-7d29782896f31ae5249f00e9505fd978e9733644.tar.zst bun-7d29782896f31ae5249f00e9505fd978e9733644.zip |
cleanup/fix(stdio,child_process): bug fixes, refactoring, etc. (#1574)
* fix(stream): add back call to Error.captureStackTrace, remove stray garbage
* cleanup(streams): remove unnecessary checks on Promise.then
* fix(child_process): WrappedFileSink -> NativeWritable for ChildProcess stdio
* cleanup(child_process): remove debug id
* fix(child_process): fix process.nextTick not working in onExit, fail to flush stdio
* refactor(streams): revert stream impls to functions from classes
* fix(child_process): revert runOnNextTick to process.nextTick
-rw-r--r-- | src/bun.js/child_process.exports.js | 59 | ||||
-rw-r--r-- | src/bun.js/streams.exports.js | 1065 |
2 files changed, 218 insertions, 906 deletions
diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js index df4a609d8..b0c32abb6 100644 --- a/src/bun.js/child_process.exports.js +++ b/src/bun.js/child_process.exports.js @@ -1,12 +1,13 @@ const EventEmitter = import.meta.require("node:events"); const { Readable: { fromWeb: ReadableFromWeb }, + NativeWritable, } = import.meta.require("node:stream"); const { constants: { signals }, } = import.meta.require("node:os"); -const { ArrayBuffer, isPromise, isCallable } = import.meta.primordials; +const { ArrayBuffer } = import.meta.primordials; const MAX_BUFFER = 1024 * 1024; @@ -946,7 +947,7 @@ export class ChildProcess extends EventEmitter { case 0: { switch (io) { case "pipe": - return new WrappedFileSink(this.#handle.stdin); + return new NativeWritable(this.#handle.stdin); case "inherit": return process.stdin || null; case "destroyed": @@ -1267,7 +1268,6 @@ function normalizeStdio(stdio) { function flushStdio(subprocess) { const stdio = subprocess.stdio; - if (stdio == null) return; for (let i = 0; i < stdio.length; i++) { @@ -1297,53 +1297,6 @@ function abortChildProcess(child, killSignal) { } } -class WrappedFileSink extends EventEmitter { - #fileSink; - #writePromises = []; - - constructor(fileSink) { - super(); - this.#fileSink = fileSink; - } - - write(data) { - var fileSink = this.#fileSink; - var result = fileSink.write(data); - - var then = result?.then; - if (isPromise(result) && then && isCallable(then)) { - var writePromises = this.#writePromises; - var i = writePromises.length; - writePromises[i] = result; - - then(() => { - this.emit("drain"); - fileSink.flush(true); - // We can't naively use i here because we don't know when writes will resolve necessarily - writePromises.splice(writePromises.indexOf(result), 1); - }); - return false; - } - fileSink.flush(true); - return true; - } - - destroy() { - this.end(); - } - - end() { - var writePromises = this.#writePromises; - if (writePromises.length) { - PromiseAll(writePromises).then(() => { - this.#fileSink.end(); - }); - } else { - this.#fileSink.end(); - } - } -} - class ShimmedStdin extends EventEmitter { constructor() { super(); @@ -1353,9 +1306,12 @@ class ShimmedStdin extends EventEmitter { } destroy() {} end() {} + pipe() {} } -class ShimmedStdioOutStream extends EventEmitter {} +class ShimmedStdioOutStream extends EventEmitter { + pipe() {} +} //------------------------------------------------------------------------------ // Section 5. Validators @@ -1819,7 +1775,6 @@ function ERR_INVALID_ARG_VALUE(name, value, reason) { ); } -// TODO: Add actual proper error implementation here class SystemError extends Error { path; syscall; diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 7f5a0b8f7..1582cb756 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -1,6 +1,6 @@ // "readable-stream" npm package // just transpiled -var { isPromise, isCallable, direct } = import.meta.primordials; +var { isPromise, isCallable, direct, Object } = import.meta.primordials; globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length ? process.env.DEBUG_TRACK_EE.split(",") @@ -36,42 +36,48 @@ var __getOwnPropDesc = Object.getOwnPropertyDescriptor; var __getOwnPropNames = Object.getOwnPropertyNames; var __getProtoOf = Object.getPrototypeOf; var __hasOwnProp = Object.prototype.hasOwnProperty; +var __ObjectSetPrototypeOf = Object.setPrototypeOf; var __require = (x) => import.meta.require(x); -var DebugEventEmitter = class DebugEventEmitter extends __require("events") { - constructor(opts) { - super(opts); - const __id = opts.__id; - if (__id) { - __defProp(this, "__id", { - value: __id, - readable: true, - writable: false, - enumerable: false, - }); - } - } - emit(event, ...args) { - var __id = this.__id; - if (__id) { - debug("emit", event, ...args, __id); - } else { - debug("emit", event, ...args); - } - return super.emit(event, ...args); +var _EE = __require("events"); + +function DebugEventEmitter(opts) { + if (!(this instanceof DebugEventEmitter)) return new DebugEventEmitter(opts); + _EE.call(this, opts); + const __id = opts.__id; + if (__id) { + __defProp(this, "__id", { + value: __id, + readable: true, + writable: false, + enumerable: false, + }); } - on(event, handler) { - var __id = this.__id; - if (__id) { - debug("on", event, "added", __id); - } else { - debug("on", event, "added"); - } - super.on(event, handler); +} + +__ObjectSetPrototypeOf(DebugEventEmitter.prototype, _EE.prototype); +__ObjectSetPrototypeOf(DebugEventEmitter, _EE); + +DebugEventEmitter.prototype.emit = function (event, ...args) { + var __id = this.__id; + if (__id) { + debug("emit", event, ...args, __id); + } else { + debug("emit", event, ...args); } - addListener(event, handler) { - this.on(event, handler); + return _EE.prototype.emit.call(this, event, ...args); +}; +DebugEventEmitter.prototype.on = function (event, handler) { + var __id = this.__id; + if (__id) { + debug("on", event, "added", __id); + } else { + debug("on", event, "added"); } + return _EE.prototype.on.call(this, event, handler); +}; +DebugEventEmitter.prototype.addListener = function (event, handler) { + this.on(event, handler); }; var __commonJS = (cb, mod) => @@ -2340,11 +2346,13 @@ var require_legacy = __commonJS({ } else { EE = _EE; } - var Stream = class Stream extends EE { - constructor(opts) { - super(opts); - } - }; + + function Stream(options) { + if (!(this instanceof Stream)) return new Stream(options); + EE.call(this, options); + } + ObjectSetPrototypeOf(Stream.prototype, EE.prototype); + ObjectSetPrototypeOf(Stream, EE); Stream.prototype.pipe = function (dest, options) { const source = this; @@ -2623,65 +2631,63 @@ var require_readable = __commonJS({ var { EventEmitter: EE } = __require("events"); var { Stream, prependListener } = require_legacy(); - class Readable extends Stream { - constructor(options) { - super(options); + function Readable(options) { + if (!(this instanceof Readable)) return new Readable(options); + const isDuplex = this instanceof require_duplex(); + this._readableState = new ReadableState(options, this, isDuplex); + if (options) { + const { read, destroy, construct, signal } = options; + if (typeof read === "function") this._read = read; + if (typeof destroy === "function") this._destroy = destroy; + if (typeof construct === "function") this._construct = construct; + if (signal && !isDuplex) addAbortSignal(signal, this); + } + Stream.call(this, options); - const isDuplex = this instanceof require_duplex(); - this._readableState = new ReadableState(options, this, isDuplex); - if (options) { - const { read, destroy, construct, signal } = options; - if (typeof read === "function") this._read = read; - if (typeof destroy === "function") this._destroy = destroy; - if (typeof construct === "function") this._construct = construct; - if (signal && !isDuplex) addAbortSignal(signal, this); + destroyImpl.construct(this, () => { + if (this._readableState.needReadable) { + maybeReadMore(this, this._readableState); } + }); + } + ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); + ObjectSetPrototypeOf(Readable, Stream); - destroyImpl.construct(this, () => { - if (this._readableState.needReadable) { - maybeReadMore(this, this._readableState); - } - }); - } - - on(ev, fn) { - const res = super.on(ev, fn); - const state = this._readableState; - if (ev === "data") { - state.readableListening = this.listenerCount("readable") > 0; - if (state.flowing !== false) { - this.resume(); - debug("in flowing mode!"); - } else { - debug("in readable mode!", this.__id); - } - } else if (ev === "readable") { - debug("readable listener added!", this.__id); - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; - debug( - "on readable - state.length, reading, emittedReadable", - state.length, - state.reading, - state.emittedReadable, - this.__id, - ); - if (state.length) { - emitReadable(this, state); - } else if (!state.reading) { - runOnNextTick(nReadingNextTick, this); - } - } else if (state.endEmitted) { - debug("end already emitted...", this.__id); + Readable.prototype.on = function (ev, fn) { + const res = Stream.prototype.on.call(this, ev, fn); + const state = this._readableState; + if (ev === "data") { + state.readableListening = this.listenerCount("readable") > 0; + if (state.flowing !== false) { + this.resume(); + debug("in flowing mode!"); + } else { + debug("in readable mode!", this.__id); + } + } else if (ev === "readable") { + debug("readable listener added!", this.__id); + if (!state.endEmitted && !state.readableListening) { + state.readableListening = state.needReadable = true; + state.flowing = false; + state.emittedReadable = false; + debug( + "on readable - state.length, reading, emittedReadable", + state.length, + state.reading, + state.emittedReadable, + this.__id, + ); + if (state.length) { + emitReadable(this, state); + } else if (!state.reading) { + runOnNextTick(nReadingNextTick, this); } + } else if (state.endEmitted) { + debug("end already emitted...", this.__id); } - return res; } - - static ReadableState = ReadableState; - } + return res; + }; class ReadableFromWeb extends Readable { #reader; @@ -3740,666 +3746,6 @@ var require_readable = __commonJS({ }, }); -var require_writable_readable = __commonJS({ - "node_modules/readable-stream/lib/internal/streams/writable-readable.js"( - exports, - module, - ) { - "use strict"; - var { - ArrayPrototypeSlice, - Error: Error2, - FunctionPrototypeSymbolHasInstance, - ObjectDefineProperty, - ObjectDefineProperties, - ObjectSetPrototypeOf, - StringPrototypeToLowerCase, - Symbol: Symbol2, - SymbolHasInstance, - } = require_primordials(); - - var { EventEmitter: EE } = __require("events"); - var Stream = require_legacy().Stream; - var destroyImpl = require_destroy(); - var { addAbortSignal } = require_add_abort_signal(); - var { getHighWaterMark, getDefaultHighWaterMark } = require_state(); - var { - ERR_INVALID_ARG_TYPE, - ERR_METHOD_NOT_IMPLEMENTED, - ERR_MULTIPLE_CALLBACK, - ERR_STREAM_CANNOT_PIPE, - ERR_STREAM_DESTROYED, - ERR_STREAM_ALREADY_FINISHED, - ERR_STREAM_NULL_VALUES, - ERR_STREAM_WRITE_AFTER_END, - ERR_UNKNOWN_ENCODING, - } = require_errors().codes; - var { errorOrDestroy } = destroyImpl; - var Readable = require_readable(); - - var destroy = destroyImpl.destroy; - var WritableReadable = class WritableReadable extends Readable { - _writev = null; - - static [SymbolHasInstance](object) { - if (FunctionPrototypeSymbolHasInstance(this, object)) return true; - if (this !== WritableReadable) return false; - return object && object._writableState instanceof WritableState; - } - - [EE.captureRejectionSymbol](err) { - this.destroy(err); - } - - constructor(options = {}) { - super(options); - - const isDuplex = this instanceof require_duplex(); - if ( - !isDuplex && - !FunctionPrototypeSymbolHasInstance(WritableReadable, this) - ) - return new WritableReadable(options); - this._writableState = new WritableState(options, this, isDuplex); - if (options) { - var { write, writev, destroy, final, construct, signal } = options; - if (typeof write === "function") this._write = write; - if (typeof writev === "function") this._writev = writev; - if (typeof destroy === "function") this._destroy = destroy; - if (typeof final === "function") this._final = final; - if (typeof construct === "function") this._construct = construct; - if (signal) addAbortSignal(signal, this); - } - destroyImpl.construct(this, () => { - const state = this._writableState; - if (!state.writing) { - clearBuffer(this, state); - } - finishMaybe(this, state); - }); - - ObjectDefineProperties(this, { - errored: { - enumerable: false, - }, - writableAborted: { - enumerable: false, - }, - }); - } - - get closed() { - return this._writableState ? this._writableState.closed : false; - } - get destroyed() { - return this._writableState ? this._writableState.destroyed : false; - } - set destroyed(value) { - if (this._writableState) { - this._writableState.destroyed = value; - } - } - get writable() { - const w = this._writableState; - return ( - !!w && - w.writable !== false && - !w.destroyed && - !w.errored && - !w.ending && - !w.ended - ); - } - set writable(val) { - if (this._writableState) { - this._writableState.writable = !!val; - } - } - get writableFinished() { - return this._writableState ? this._writableState.finished : false; - } - get writableObjectMode() { - return this._writableState ? this._writableState.objectMode : false; - } - get writableBuffer() { - return this._writableState && this._writableState.getBuffer(); - } - get writableEnded() { - return this._writableState ? this._writableState.ending : false; - } - get writableNeedDrain() { - const wState = this._writableState; - if (!wState) return false; - return !wState.destroyed && !wState.ending && wState.needDrain; - } - get writableHighWaterMark() { - return this._writableState && this._writableState.highWaterMark; - } - get writableCorked() { - return this._writableState ? this._writableState.corked : 0; - } - get writableLength() { - return this._writableState && this._writableState.length; - } - get errored() { - return this._writableState ? this._writableState.errored : null; - } - get writableAborted() { - return !!( - this._writableState.writable !== false && - (this._writableState.destroyed || this._writableState.errored) && - !this._writableState.finished - ); - } - - pipe() { - errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); - } - _write(chunk, encoding, cb) { - if (this._writev) { - this._writev( - [ - { - chunk, - encoding, - }, - ], - cb, - ); - } else { - throw new ERR_METHOD_NOT_IMPLEMENTED("_write()"); - } - } - write(chunk, encoding, cb) { - return _write(this, chunk, encoding, cb) === true; - } - cork() { - this._writableState.corked++; - } - uncork() { - const state = this._writableState; - if (state.corked) { - state.corked--; - if (!state.writing) clearBuffer(this, state); - } - } - setDefaultEncoding(encoding) { - if (typeof encoding === "string") - encoding = StringPrototypeToLowerCase(encoding); - if (!Buffer.isEncoding(encoding)) - throw new ERR_UNKNOWN_ENCODING(encoding); - this._writableState.defaultEncoding = encoding; - return this; - } - end(chunk, encoding, cb) { - const state = this._writableState; - debug("end", state, this.__id); - if (typeof chunk === "function") { - cb = chunk; - chunk = null; - encoding = null; - } else if (typeof encoding === "function") { - cb = encoding; - encoding = null; - } - let err; - if (chunk !== null && chunk !== void 0) { - const ret = _write(this, chunk, encoding); - if (ret instanceof Error2) { - err = ret; - } - } - if (state.corked) { - state.corked = 1; - this.uncork(); - } - if (err) { - } else if (!state.errored && !state.ending) { - state.ending = true; - finishMaybe(this, state, true); - state.ended = true; - } else if (state.finished) { - err = new ERR_STREAM_ALREADY_FINISHED("end"); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED("end"); - } - if (typeof cb === "function") { - if (err || state.finished) { - runOnNextTick(cb, err); - } else { - state[kOnFinished].push(cb); - } - } - return this; - } - destroy(err, cb) { - const state = this._writableState; - if ( - !state.destroyed && - (state.bufferedIndex < state.buffered.length || - state[kOnFinished].length) - ) { - runOnNextTick(errorBuffer, state); - } - destroy.call(this, err, cb); - return this; - } - _undestroy() { - return destroyImpl.undestroy.call(this); - } - _destroy(err, cb) { - cb(err); - } - }; - module.exports = WritableReadable; - - function nop() {} - var kOnFinished = Symbol2("kOnFinished"); - - function _write(stream, chunk, encoding, cb) { - const state = stream._writableState; - if (typeof encoding === "function") { - cb = encoding; - encoding = state.defaultEncoding; - } else { - if (!encoding) encoding = state.defaultEncoding; - else if (encoding !== "buffer" && !Buffer.isEncoding(encoding)) - throw new ERR_UNKNOWN_ENCODING(encoding); - if (typeof cb !== "function") cb = nop; - } - if (chunk === null) { - throw new ERR_STREAM_NULL_VALUES(); - } else if (!state.objectMode) { - if (typeof chunk === "string") { - if (state.decodeStrings !== false) { - chunk = Buffer.from(chunk, encoding); - encoding = "buffer"; - } - } else if (chunk instanceof Buffer) { - encoding = "buffer"; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); - encoding = "buffer"; - } else { - throw new ERR_INVALID_ARG_TYPE( - "chunk", - ["string", "Buffer", "Uint8Array"], - chunk, - ); - } - } - let err; - if (state.ending) { - err = new ERR_STREAM_WRITE_AFTER_END(); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED("write"); - } - if (err) { - runOnNextTick(cb, err); - errorOrDestroy(stream, err, true); - return err; - } - state.pendingcb++; - return writeOrBuffer(stream, state, chunk, encoding, cb); - } - function WritableState(options, stream, isDuplex) { - if (typeof isDuplex !== "boolean") - isDuplex = stream instanceof require_duplex(); - this.objectMode = !!(options && options.objectMode); - if (isDuplex) - this.objectMode = - this.objectMode || !!(options && options.writableObjectMode); - this.highWaterMark = options - ? getHighWaterMark(this, options, "writableHighWaterMark", isDuplex) - : getDefaultHighWaterMark(false); - this.finalCalled = false; - this.needDrain = false; - this.ending = false; - this.ended = false; - this.finished = false; - this.destroyed = false; - const noDecode = !!(options && options.decodeStrings === false); - this.decodeStrings = !noDecode; - this.defaultEncoding = (options && options.defaultEncoding) || "utf8"; - this.length = 0; - this.writing = false; - this.corked = 0; - this.sync = true; - this.bufferProcessing = false; - this.onwrite = onwrite.bind(void 0, stream); - this.writecb = null; - this.writelen = 0; - this.afterWriteTickInfo = null; - resetBuffer(this); - this.pendingcb = 0; - this.constructed = true; - this.prefinished = false; - this.errorEmitted = false; - this.emitClose = !options || options.emitClose !== false; - this.autoDestroy = !options || options.autoDestroy !== false; - this.errored = null; - this.closed = false; - this.closeEmitted = false; - this[kOnFinished] = []; - } - function resetBuffer(state) { - state.buffered = []; - state.bufferedIndex = 0; - state.allBuffers = true; - state.allNoop = true; - } - WritableState.prototype.getBuffer = function getBuffer() { - return ArrayPrototypeSlice(this.buffered, this.bufferedIndex); - }; - ObjectDefineProperty(WritableState.prototype, "bufferedRequestCount", { - get() { - return this.buffered.length - this.bufferedIndex; - }, - }); - - function writeOrBuffer(stream, state, chunk, encoding, callback) { - const len = state.objectMode ? 1 : chunk.length; - state.length += len; - const ret = state.length < state.highWaterMark; - if (!ret) state.needDrain = true; - if ( - state.writing || - state.corked || - state.errored || - !state.constructed - ) { - state.buffered.push({ - chunk, - encoding, - callback, - }); - if (state.allBuffers && encoding !== "buffer") { - state.allBuffers = false; - } - if (state.allNoop && callback !== nop) { - state.allNoop = false; - } - } else { - state.writelen = len; - state.writecb = callback; - state.writing = true; - state.sync = true; - stream._write(chunk, encoding, state.onwrite); - state.sync = false; - } - return ret && !state.errored && !state.destroyed; - } - function doWrite(stream, state, writev, len, chunk, encoding, cb) { - state.writelen = len; - state.writecb = cb; - state.writing = true; - state.sync = true; - if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED("write")); - else if (writev) stream._writev(chunk, state.onwrite); - else stream._write(chunk, encoding, state.onwrite); - state.sync = false; - } - function onwriteError(stream, state, er, cb) { - --state.pendingcb; - cb(er); - errorBuffer(state); - errorOrDestroy(stream, er); - } - function onwrite(stream, er) { - const state = stream._writableState; - const sync = state.sync; - const cb = state.writecb; - if (typeof cb !== "function") { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); - return; - } - state.writing = false; - state.writecb = null; - state.length -= state.writelen; - state.writelen = 0; - if (er) { - er.stack; - if (!state.errored) { - state.errored = er; - } - if (stream._readableState && !stream._readableState.errored) { - stream._readableState.errored = er; - } - if (sync) { - runOnNextTick(onwriteError, stream, state, er, cb); - } else { - onwriteError(stream, state, er, cb); - } - } else { - if (state.buffered.length > state.bufferedIndex) { - clearBuffer(stream, state); - } - if (sync) { - if ( - state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb - ) { - state.afterWriteTickInfo.count++; - } else { - state.afterWriteTickInfo = { - count: 1, - cb, - stream, - state, - }; - runOnNextTick(afterWriteTick, state.afterWriteTickInfo); - } - } else { - afterWrite(stream, state, 1, cb); - } - } - } - function afterWriteTick({ stream, state, count, cb }) { - state.afterWriteTickInfo = null; - return afterWrite(stream, state, count, cb); - } - function afterWrite(stream, state, count, cb) { - const needDrain = - !state.ending && - !stream.destroyed && - state.length === 0 && - state.needDrain; - if (needDrain) { - state.needDrain = false; - stream.emit("drain"); - } - while (count-- > 0) { - state.pendingcb--; - cb(); - } - if (state.destroyed) { - errorBuffer(state); - } - finishMaybe(stream, state); - } - function errorBuffer(state) { - if (state.writing) { - return; - } - for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { - var _state$errored; - const { chunk, callback } = state.buffered[n]; - const len = state.objectMode ? 1 : chunk.length; - state.length -= len; - callback( - (_state$errored = state.errored) !== null && _state$errored !== void 0 - ? _state$errored - : new ERR_STREAM_DESTROYED("write"), - ); - } - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - var _state$errored2; - onfinishCallbacks[i]( - (_state$errored2 = state.errored) !== null && - _state$errored2 !== void 0 - ? _state$errored2 - : new ERR_STREAM_DESTROYED("end"), - ); - } - resetBuffer(state); - } - function clearBuffer(stream, state) { - if ( - state.corked || - state.bufferProcessing || - state.destroyed || - !state.constructed - ) { - return; - } - const { buffered, bufferedIndex, objectMode } = state; - const bufferedLength = buffered.length - bufferedIndex; - if (!bufferedLength) { - return; - } - let i = bufferedIndex; - state.bufferProcessing = true; - if (bufferedLength > 1 && stream._writev) { - state.pendingcb -= bufferedLength - 1; - const callback = state.allNoop - ? nop - : (err) => { - for (let n = i; n < buffered.length; ++n) { - buffered[n].callback(err); - } - }; - const chunks = - state.allNoop && i === 0 - ? buffered - : ArrayPrototypeSlice(buffered, i); - chunks.allBuffers = state.allBuffers; - doWrite(stream, state, true, state.length, chunks, "", callback); - resetBuffer(state); - } else { - do { - const { chunk, encoding, callback } = buffered[i]; - buffered[i++] = null; - const len = objectMode ? 1 : chunk.length; - doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && !state.writing); - if (i === buffered.length) { - resetBuffer(state); - } else if (i > 256) { - buffered.splice(0, i); - state.bufferedIndex = 0; - } else { - state.bufferedIndex = i; - } - } - state.bufferProcessing = false; - } - function needFinish(state, tag) { - var needFinish = - state.ending && - !state.destroyed && - state.constructed && - state.length === 0 && - !state.errored && - state.buffered.length === 0 && - !state.finished && - !state.writing && - !state.errorEmitted && - !state.closeEmitted; - debug("needFinish", needFinish, tag); - return needFinish; - } - function callFinal(stream, state) { - let called = false; - function onFinish(err) { - if (called) { - errorOrDestroy( - stream, - err !== null && err !== void 0 ? err : ERR_MULTIPLE_CALLBACK(), - ); - return; - } - called = true; - state.pendingcb--; - if (err) { - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - onfinishCallbacks[i](err); - } - errorOrDestroy(stream, err, state.sync); - } else if (needFinish(state)) { - state.prefinished = true; - stream.emit("prefinish"); - state.pendingcb++; - runOnNextTick(finish, stream, state); - } - } - state.sync = true; - state.pendingcb++; - try { - stream._final(onFinish); - } catch (err) { - onFinish(err); - } - state.sync = false; - } - function prefinish(stream, state) { - if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === "function" && !state.destroyed) { - state.finalCalled = true; - callFinal(stream, state); - } else { - state.prefinished = true; - stream.emit("prefinish"); - } - } - } - function finishMaybe(stream, state, sync) { - if (needFinish(state, stream.__id)) { - prefinish(stream, state); - if (state.pendingcb === 0) { - if (sync) { - state.pendingcb++; - runOnNextTick( - (stream2, state2) => { - if (needFinish(state2)) { - finish(stream2, state2); - } else { - state2.pendingcb--; - } - }, - stream, - state, - ); - } else if (needFinish(state)) { - state.pendingcb++; - finish(stream, state); - } - } - } - } - function finish(stream, state) { - state.pendingcb--; - state.finished = true; - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - onfinishCallbacks[i](); - } - stream.emit("finish"); - if (state.autoDestroy) { - const rState = stream._readableState; - const autoDestroy = - !rState || - (rState.autoDestroy && - (rState.endEmitted || rState.readable === false)); - if (autoDestroy) { - stream.destroy(); - } - } - } - }, -}); - // node_modules/readable-stream/lib/internal/streams/writable.js var require_writable = __commonJS({ "node_modules/readable-stream/lib/internal/streams/writable.js"( @@ -4436,34 +3782,34 @@ var require_writable = __commonJS({ ERR_UNKNOWN_ENCODING, } = require_errors().codes; var { errorOrDestroy } = destroyImpl; - var Writable = class Writable extends Stream { - constructor(options = {}) { - super(options); - const isDuplex = this instanceof require_duplex(); - if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) - return new Writable(options); - this._writableState = new WritableState(options, this, isDuplex); - if (options) { - if (typeof options.write === "function") this._write = options.write; - if (typeof options.writev === "function") - this._writev = options.writev; - if (typeof options.destroy === "function") - this._destroy = options.destroy; - if (typeof options.final === "function") this._final = options.final; - if (typeof options.construct === "function") - this._construct = options.construct; - if (options.signal) addAbortSignal(options.signal, this); - } - destroyImpl.construct(this, () => { - const state = this._writableState; - if (!state.writing) { - clearBuffer(this, state); - } - finishMaybe(this, state); - }); - } - }; + function Writable(options = {}) { + const isDuplex = this instanceof require_duplex(); + if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) + return new Writable(options); + this._writableState = new WritableState(options, this, isDuplex); + if (options) { + if (typeof options.write === "function") this._write = options.write; + if (typeof options.writev === "function") this._writev = options.writev; + if (typeof options.destroy === "function") + this._destroy = options.destroy; + if (typeof options.final === "function") this._final = options.final; + if (typeof options.construct === "function") + this._construct = options.construct; + if (options.signal) addAbortSignal(options.signal, this); + } + Stream.call(this, options); + + destroyImpl.construct(this, () => { + const state = this._writableState; + if (!state.writing) { + clearBuffer(this, state); + } + finishMaybe(this, state); + }); + } + ObjectSetPrototypeOf(Writable.prototype, Stream.prototype); + ObjectSetPrototypeOf(Writable, Stream); module.exports = Writable; function nop() {} @@ -5137,12 +4483,15 @@ var require_duplexify = __commonJS({ globalThis.AbortController || __require("abort-controller").AbortController; var { FunctionPrototypeCall } = require_primordials(); - var Duplexify = class extends Duplex { + class Duplexify extends Duplex { constructor(options) { super(options); + + // https://github.com/nodejs/node/pull/34385 + if ( - (options === null || options === void 0 - ? void 0 + (options === null || options === undefined + ? undefined : options.readable) === false ) { this._readableState.readable = false; @@ -5150,8 +4499,8 @@ var require_duplexify = __commonJS({ this._readableState.endEmitted = true; } if ( - (options === null || options === void 0 - ? void 0 + (options === null || options === undefined + ? undefined : options.writable) === false ) { this._writableState.writable = false; @@ -5160,7 +4509,7 @@ var require_duplexify = __commonJS({ this._writableState.finished = true; } } - }; + } module.exports = function duplexify(body, name) { if (isDuplexNodeStream(body)) { return body; @@ -5493,75 +4842,74 @@ var require_duplex = __commonJS({ ObjectSetPrototypeOf, } = require_primordials(); - // var Readable = require_readable(); - var WritableReadable = require_writable_readable(); + var Readable = require_readable(); - var Duplex = class Duplex extends WritableReadable { - constructor(options) { - super(options); + function Duplex(options) { + if (!(this instanceof Duplex)) return new Duplex(options); + Readable.call(this, options); + Writable.call(this, options); - if (options) { - this.allowHalfOpen = options.allowHalfOpen !== false; - if (options.readable === false) { - this._readableState.readable = false; - this._readableState.ended = true; - this._readableState.endEmitted = true; - } - if (options.writable === false) { - this._writableState.writable = false; - this._writableState.ending = true; - this._writableState.ended = true; - this._writableState.finished = true; - } - } else { - this.allowHalfOpen = true; + if (options) { + this.allowHalfOpen = options.allowHalfOpen !== false; + if (options.readable === false) { + this._readableState.readable = false; + this._readableState.ended = true; + this._readableState.endEmitted = true; } + if (options.writable === false) { + this._writableState.writable = false; + this._writableState.ending = true; + this._writableState.ended = true; + this._writableState.finished = true; + } + } else { + this.allowHalfOpen = true; } - }; + } module.exports = Duplex; + ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); + ObjectSetPrototypeOf(Duplex, Readable); + { - for (var method in WritableReadable.prototype) { + for (var method in Writable.prototype) { if (!Duplex.prototype[method]) - Duplex.prototype[method] = WritableReadable.prototype[method]; + Duplex.prototype[method] = Writable.prototype[method]; } } ObjectDefineProperties(Duplex.prototype, { - writable: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, - "writable", - ), + writable: ObjectGetOwnPropertyDescriptor(Writable.prototype, "writable"), writableHighWaterMark: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, + Writable.prototype, "writableHighWaterMark", ), writableObjectMode: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, + Writable.prototype, "writableObjectMode", ), writableBuffer: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, + Writable.prototype, "writableBuffer", ), writableLength: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, + Writable.prototype, "writableLength", ), writableFinished: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, + Writable.prototype, "writableFinished", ), writableCorked: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, + Writable.prototype, "writableCorked", ), writableEnded: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, + Writable.prototype, "writableEnded", ), writableNeedDrain: ObjectGetOwnPropertyDescriptor( - WritableReadable.prototype, + Writable.prototype, "writableNeedDrain", ), destroyed: { @@ -5616,19 +4964,21 @@ var require_transform = __commonJS({ var { ObjectSetPrototypeOf, Symbol: Symbol2 } = require_primordials(); var { ERR_METHOD_NOT_IMPLEMENTED } = require_errors().codes; var Duplex = require_duplex(); - var Transform = class Transform extends Duplex { - constructor(options) { - super(options); - this._readableState.sync = false; - this[kCallback] = null; - if (options) { - if (typeof options.transform === "function") - this._transform = options.transform; - if (typeof options.flush === "function") this._flush = options.flush; - } - this.on("prefinish", prefinish); + function Transform(options) { + if (!(this instanceof Transform)) return new Transform(options); + this._readableState.sync = false; + this[kCallback] = null; + if (options) { + if (typeof options.transform === "function") + this._transform = options.transform; + if (typeof options.flush === "function") this._flush = options.flush; } - }; + this.on("prefinish", prefinish); + Duplex.call(this, options); + } + ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); + ObjectSetPrototypeOf(Transform, Duplex); + module.exports = Transform; var kCallback = Symbol2("kCallback"); function final(cb) { @@ -5708,19 +5058,21 @@ var require_passthrough = __commonJS({ module, ) { "use strict"; + var { ObjectSetPrototypeOf } = require_primordials(); var Transform = require_transform(); - var PassThrough = class PassThrough extends Transform { - constructor(options) { - super(options); - this._transform = this.#transform; - } - _transform; + function PassThrough(options) { + if (!(this instanceof PassThrough)) return new PassThrough(options); + Transform.call(this, options); + } - #transform(chunk, encoding, cb) { - cb(null, chunk); - } + ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype); + ObjectSetPrototypeOf(PassThrough, Transform); + + PassThrough.prototype._transform = function (chunk, encoding, cb) { + cb(null, chunk); }; + module.exports = PassThrough; }, }); @@ -6758,8 +6110,7 @@ function getNativeReadableStream(Readable, stream, options) { var Writable = require_writable(); var NativeWritable = class NativeWritable extends Writable { - #writePromises = []; - #pathOrFd; + #pathOrFdOrSink; #fileSink; #native = true; @@ -6767,14 +6118,14 @@ var NativeWritable = class NativeWritable extends Writable { _destroy; _final; - constructor(pathOrFd, options = {}) { + constructor(pathOrFdOrSink, options = {}) { super(options); this._construct = this.#internalConstruct; this._destroy = this.#internalDestroy; this._final = this.#internalFinal; - this.#pathOrFd = pathOrFd; + this.#pathOrFdOrSink = pathOrFdOrSink; } // These are confusingly two different fns for construct which initially were the same thing because @@ -6787,7 +6138,16 @@ var NativeWritable = class NativeWritable extends Writable { } #lazyConstruct() { - this.#fileSink = Bun.file(this.#pathOrFd).writer(); + // TODO: Turn this check into check for instanceof FileSink + if (typeof this.#pathOrFdOrSink === "object") { + if (typeof this.#pathOrFdOrSink.write === "function") { + this.#fileSink = this.#pathOrFdOrSink; + } else { + throw new Error("Invalid FileSink"); + } + } else { + this.#fileSink = Bun.file(this.#pathOrFdOrSink).writer(); + } } write(chunk, encoding, cb, native = this.#native) { @@ -6802,17 +6162,15 @@ var NativeWritable = class NativeWritable extends Writable { var fileSink = this.#fileSink; var result = fileSink.write(chunk); - var then = result?.then; - if (isPromise(result) && then && isCallable(then)) { - var writePromises = this.#writePromises; - var i = writePromises.length; - writePromises[i] = result; - - then(() => { + if (isPromise(result)) { + // var writePromises = this.#writePromises; + // var i = writePromises.length; + // writePromises[i] = result; + result.then(() => { this.emit("drain"); fileSink.flush(true); - // We can't naively use i here because we don't know when writes will resolve necessarily - writePromises.splice(writePromises.indexOf(result), 1); + // // We can't naively use i here because we don't know when writes will resolve necessarily + // writePromises.splice(writePromises.indexOf(result), 1); }); return false; } @@ -6839,7 +6197,6 @@ var NativeWritable = class NativeWritable extends Writable { } ref() { - // TODO: Is this right? Should we construct the stream if we call ref? if (!this.#fileSink) { this.#lazyConstruct(); } |