aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bun.js/child_process.exports.js59
-rw-r--r--src/bun.js/streams.exports.js1065
2 files changed, 218 insertions, 906 deletions
diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js
index df4a609d8..b0c32abb6 100644
--- a/src/bun.js/child_process.exports.js
+++ b/src/bun.js/child_process.exports.js
@@ -1,12 +1,13 @@
const EventEmitter = import.meta.require("node:events");
const {
Readable: { fromWeb: ReadableFromWeb },
+ NativeWritable,
} = import.meta.require("node:stream");
const {
constants: { signals },
} = import.meta.require("node:os");
-const { ArrayBuffer, isPromise, isCallable } = import.meta.primordials;
+const { ArrayBuffer } = import.meta.primordials;
const MAX_BUFFER = 1024 * 1024;
@@ -946,7 +947,7 @@ export class ChildProcess extends EventEmitter {
case 0: {
switch (io) {
case "pipe":
- return new WrappedFileSink(this.#handle.stdin);
+ return new NativeWritable(this.#handle.stdin);
case "inherit":
return process.stdin || null;
case "destroyed":
@@ -1267,7 +1268,6 @@ function normalizeStdio(stdio) {
function flushStdio(subprocess) {
const stdio = subprocess.stdio;
-
if (stdio == null) return;
for (let i = 0; i < stdio.length; i++) {
@@ -1297,53 +1297,6 @@ function abortChildProcess(child, killSignal) {
}
}
-class WrappedFileSink extends EventEmitter {
- #fileSink;
- #writePromises = [];
-
- constructor(fileSink) {
- super();
- this.#fileSink = fileSink;
- }
-
- write(data) {
- var fileSink = this.#fileSink;
- var result = fileSink.write(data);
-
- var then = result?.then;
- if (isPromise(result) && then && isCallable(then)) {
- var writePromises = this.#writePromises;
- var i = writePromises.length;
- writePromises[i] = result;
-
- then(() => {
- this.emit("drain");
- fileSink.flush(true);
- // We can't naively use i here because we don't know when writes will resolve necessarily
- writePromises.splice(writePromises.indexOf(result), 1);
- });
- return false;
- }
- fileSink.flush(true);
- return true;
- }
-
- destroy() {
- this.end();
- }
-
- end() {
- var writePromises = this.#writePromises;
- if (writePromises.length) {
- PromiseAll(writePromises).then(() => {
- this.#fileSink.end();
- });
- } else {
- this.#fileSink.end();
- }
- }
-}
-
class ShimmedStdin extends EventEmitter {
constructor() {
super();
@@ -1353,9 +1306,12 @@ class ShimmedStdin extends EventEmitter {
}
destroy() {}
end() {}
+ pipe() {}
}
-class ShimmedStdioOutStream extends EventEmitter {}
+class ShimmedStdioOutStream extends EventEmitter {
+ pipe() {}
+}
//------------------------------------------------------------------------------
// Section 5. Validators
@@ -1819,7 +1775,6 @@ function ERR_INVALID_ARG_VALUE(name, value, reason) {
);
}
-// TODO: Add actual proper error implementation here
class SystemError extends Error {
path;
syscall;
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 7f5a0b8f7..1582cb756 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -1,6 +1,6 @@
// "readable-stream" npm package
// just transpiled
-var { isPromise, isCallable, direct } = import.meta.primordials;
+var { isPromise, isCallable, direct, Object } = import.meta.primordials;
globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length
? process.env.DEBUG_TRACK_EE.split(",")
@@ -36,42 +36,48 @@ var __getOwnPropDesc = Object.getOwnPropertyDescriptor;
var __getOwnPropNames = Object.getOwnPropertyNames;
var __getProtoOf = Object.getPrototypeOf;
var __hasOwnProp = Object.prototype.hasOwnProperty;
+var __ObjectSetPrototypeOf = Object.setPrototypeOf;
var __require = (x) => import.meta.require(x);
-var DebugEventEmitter = class DebugEventEmitter extends __require("events") {
- constructor(opts) {
- super(opts);
- const __id = opts.__id;
- if (__id) {
- __defProp(this, "__id", {
- value: __id,
- readable: true,
- writable: false,
- enumerable: false,
- });
- }
- }
- emit(event, ...args) {
- var __id = this.__id;
- if (__id) {
- debug("emit", event, ...args, __id);
- } else {
- debug("emit", event, ...args);
- }
- return super.emit(event, ...args);
+var _EE = __require("events");
+
+function DebugEventEmitter(opts) {
+ if (!(this instanceof DebugEventEmitter)) return new DebugEventEmitter(opts);
+ _EE.call(this, opts);
+ const __id = opts.__id;
+ if (__id) {
+ __defProp(this, "__id", {
+ value: __id,
+ readable: true,
+ writable: false,
+ enumerable: false,
+ });
}
- on(event, handler) {
- var __id = this.__id;
- if (__id) {
- debug("on", event, "added", __id);
- } else {
- debug("on", event, "added");
- }
- super.on(event, handler);
+}
+
+__ObjectSetPrototypeOf(DebugEventEmitter.prototype, _EE.prototype);
+__ObjectSetPrototypeOf(DebugEventEmitter, _EE);
+
+DebugEventEmitter.prototype.emit = function (event, ...args) {
+ var __id = this.__id;
+ if (__id) {
+ debug("emit", event, ...args, __id);
+ } else {
+ debug("emit", event, ...args);
}
- addListener(event, handler) {
- this.on(event, handler);
+ return _EE.prototype.emit.call(this, event, ...args);
+};
+DebugEventEmitter.prototype.on = function (event, handler) {
+ var __id = this.__id;
+ if (__id) {
+ debug("on", event, "added", __id);
+ } else {
+ debug("on", event, "added");
}
+ return _EE.prototype.on.call(this, event, handler);
+};
+DebugEventEmitter.prototype.addListener = function (event, handler) {
+ this.on(event, handler);
};
var __commonJS = (cb, mod) =>
@@ -2340,11 +2346,13 @@ var require_legacy = __commonJS({
} else {
EE = _EE;
}
- var Stream = class Stream extends EE {
- constructor(opts) {
- super(opts);
- }
- };
+
+ function Stream(options) {
+ if (!(this instanceof Stream)) return new Stream(options);
+ EE.call(this, options);
+ }
+ ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
+ ObjectSetPrototypeOf(Stream, EE);
Stream.prototype.pipe = function (dest, options) {
const source = this;
@@ -2623,65 +2631,63 @@ var require_readable = __commonJS({
var { EventEmitter: EE } = __require("events");
var { Stream, prependListener } = require_legacy();
- class Readable extends Stream {
- constructor(options) {
- super(options);
+ function Readable(options) {
+ if (!(this instanceof Readable)) return new Readable(options);
+ const isDuplex = this instanceof require_duplex();
+ this._readableState = new ReadableState(options, this, isDuplex);
+ if (options) {
+ const { read, destroy, construct, signal } = options;
+ if (typeof read === "function") this._read = read;
+ if (typeof destroy === "function") this._destroy = destroy;
+ if (typeof construct === "function") this._construct = construct;
+ if (signal && !isDuplex) addAbortSignal(signal, this);
+ }
+ Stream.call(this, options);
- const isDuplex = this instanceof require_duplex();
- this._readableState = new ReadableState(options, this, isDuplex);
- if (options) {
- const { read, destroy, construct, signal } = options;
- if (typeof read === "function") this._read = read;
- if (typeof destroy === "function") this._destroy = destroy;
- if (typeof construct === "function") this._construct = construct;
- if (signal && !isDuplex) addAbortSignal(signal, this);
+ destroyImpl.construct(this, () => {
+ if (this._readableState.needReadable) {
+ maybeReadMore(this, this._readableState);
}
+ });
+ }
+ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
+ ObjectSetPrototypeOf(Readable, Stream);
- destroyImpl.construct(this, () => {
- if (this._readableState.needReadable) {
- maybeReadMore(this, this._readableState);
- }
- });
- }
-
- on(ev, fn) {
- const res = super.on(ev, fn);
- const state = this._readableState;
- if (ev === "data") {
- state.readableListening = this.listenerCount("readable") > 0;
- if (state.flowing !== false) {
- this.resume();
- debug("in flowing mode!");
- } else {
- debug("in readable mode!", this.__id);
- }
- } else if (ev === "readable") {
- debug("readable listener added!", this.__id);
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
- state.flowing = false;
- state.emittedReadable = false;
- debug(
- "on readable - state.length, reading, emittedReadable",
- state.length,
- state.reading,
- state.emittedReadable,
- this.__id,
- );
- if (state.length) {
- emitReadable(this, state);
- } else if (!state.reading) {
- runOnNextTick(nReadingNextTick, this);
- }
- } else if (state.endEmitted) {
- debug("end already emitted...", this.__id);
+ Readable.prototype.on = function (ev, fn) {
+ const res = Stream.prototype.on.call(this, ev, fn);
+ const state = this._readableState;
+ if (ev === "data") {
+ state.readableListening = this.listenerCount("readable") > 0;
+ if (state.flowing !== false) {
+ this.resume();
+ debug("in flowing mode!");
+ } else {
+ debug("in readable mode!", this.__id);
+ }
+ } else if (ev === "readable") {
+ debug("readable listener added!", this.__id);
+ if (!state.endEmitted && !state.readableListening) {
+ state.readableListening = state.needReadable = true;
+ state.flowing = false;
+ state.emittedReadable = false;
+ debug(
+ "on readable - state.length, reading, emittedReadable",
+ state.length,
+ state.reading,
+ state.emittedReadable,
+ this.__id,
+ );
+ if (state.length) {
+ emitReadable(this, state);
+ } else if (!state.reading) {
+ runOnNextTick(nReadingNextTick, this);
}
+ } else if (state.endEmitted) {
+ debug("end already emitted...", this.__id);
}
- return res;
}
-
- static ReadableState = ReadableState;
- }
+ return res;
+ };
class ReadableFromWeb extends Readable {
#reader;
@@ -3740,666 +3746,6 @@ var require_readable = __commonJS({
},
});
-var require_writable_readable = __commonJS({
- "node_modules/readable-stream/lib/internal/streams/writable-readable.js"(
- exports,
- module,
- ) {
- "use strict";
- var {
- ArrayPrototypeSlice,
- Error: Error2,
- FunctionPrototypeSymbolHasInstance,
- ObjectDefineProperty,
- ObjectDefineProperties,
- ObjectSetPrototypeOf,
- StringPrototypeToLowerCase,
- Symbol: Symbol2,
- SymbolHasInstance,
- } = require_primordials();
-
- var { EventEmitter: EE } = __require("events");
- var Stream = require_legacy().Stream;
- var destroyImpl = require_destroy();
- var { addAbortSignal } = require_add_abort_signal();
- var { getHighWaterMark, getDefaultHighWaterMark } = require_state();
- var {
- ERR_INVALID_ARG_TYPE,
- ERR_METHOD_NOT_IMPLEMENTED,
- ERR_MULTIPLE_CALLBACK,
- ERR_STREAM_CANNOT_PIPE,
- ERR_STREAM_DESTROYED,
- ERR_STREAM_ALREADY_FINISHED,
- ERR_STREAM_NULL_VALUES,
- ERR_STREAM_WRITE_AFTER_END,
- ERR_UNKNOWN_ENCODING,
- } = require_errors().codes;
- var { errorOrDestroy } = destroyImpl;
- var Readable = require_readable();
-
- var destroy = destroyImpl.destroy;
- var WritableReadable = class WritableReadable extends Readable {
- _writev = null;
-
- static [SymbolHasInstance](object) {
- if (FunctionPrototypeSymbolHasInstance(this, object)) return true;
- if (this !== WritableReadable) return false;
- return object && object._writableState instanceof WritableState;
- }
-
- [EE.captureRejectionSymbol](err) {
- this.destroy(err);
- }
-
- constructor(options = {}) {
- super(options);
-
- const isDuplex = this instanceof require_duplex();
- if (
- !isDuplex &&
- !FunctionPrototypeSymbolHasInstance(WritableReadable, this)
- )
- return new WritableReadable(options);
- this._writableState = new WritableState(options, this, isDuplex);
- if (options) {
- var { write, writev, destroy, final, construct, signal } = options;
- if (typeof write === "function") this._write = write;
- if (typeof writev === "function") this._writev = writev;
- if (typeof destroy === "function") this._destroy = destroy;
- if (typeof final === "function") this._final = final;
- if (typeof construct === "function") this._construct = construct;
- if (signal) addAbortSignal(signal, this);
- }
- destroyImpl.construct(this, () => {
- const state = this._writableState;
- if (!state.writing) {
- clearBuffer(this, state);
- }
- finishMaybe(this, state);
- });
-
- ObjectDefineProperties(this, {
- errored: {
- enumerable: false,
- },
- writableAborted: {
- enumerable: false,
- },
- });
- }
-
- get closed() {
- return this._writableState ? this._writableState.closed : false;
- }
- get destroyed() {
- return this._writableState ? this._writableState.destroyed : false;
- }
- set destroyed(value) {
- if (this._writableState) {
- this._writableState.destroyed = value;
- }
- }
- get writable() {
- const w = this._writableState;
- return (
- !!w &&
- w.writable !== false &&
- !w.destroyed &&
- !w.errored &&
- !w.ending &&
- !w.ended
- );
- }
- set writable(val) {
- if (this._writableState) {
- this._writableState.writable = !!val;
- }
- }
- get writableFinished() {
- return this._writableState ? this._writableState.finished : false;
- }
- get writableObjectMode() {
- return this._writableState ? this._writableState.objectMode : false;
- }
- get writableBuffer() {
- return this._writableState && this._writableState.getBuffer();
- }
- get writableEnded() {
- return this._writableState ? this._writableState.ending : false;
- }
- get writableNeedDrain() {
- const wState = this._writableState;
- if (!wState) return false;
- return !wState.destroyed && !wState.ending && wState.needDrain;
- }
- get writableHighWaterMark() {
- return this._writableState && this._writableState.highWaterMark;
- }
- get writableCorked() {
- return this._writableState ? this._writableState.corked : 0;
- }
- get writableLength() {
- return this._writableState && this._writableState.length;
- }
- get errored() {
- return this._writableState ? this._writableState.errored : null;
- }
- get writableAborted() {
- return !!(
- this._writableState.writable !== false &&
- (this._writableState.destroyed || this._writableState.errored) &&
- !this._writableState.finished
- );
- }
-
- pipe() {
- errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
- }
- _write(chunk, encoding, cb) {
- if (this._writev) {
- this._writev(
- [
- {
- chunk,
- encoding,
- },
- ],
- cb,
- );
- } else {
- throw new ERR_METHOD_NOT_IMPLEMENTED("_write()");
- }
- }
- write(chunk, encoding, cb) {
- return _write(this, chunk, encoding, cb) === true;
- }
- cork() {
- this._writableState.corked++;
- }
- uncork() {
- const state = this._writableState;
- if (state.corked) {
- state.corked--;
- if (!state.writing) clearBuffer(this, state);
- }
- }
- setDefaultEncoding(encoding) {
- if (typeof encoding === "string")
- encoding = StringPrototypeToLowerCase(encoding);
- if (!Buffer.isEncoding(encoding))
- throw new ERR_UNKNOWN_ENCODING(encoding);
- this._writableState.defaultEncoding = encoding;
- return this;
- }
- end(chunk, encoding, cb) {
- const state = this._writableState;
- debug("end", state, this.__id);
- if (typeof chunk === "function") {
- cb = chunk;
- chunk = null;
- encoding = null;
- } else if (typeof encoding === "function") {
- cb = encoding;
- encoding = null;
- }
- let err;
- if (chunk !== null && chunk !== void 0) {
- const ret = _write(this, chunk, encoding);
- if (ret instanceof Error2) {
- err = ret;
- }
- }
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
- if (err) {
- } else if (!state.errored && !state.ending) {
- state.ending = true;
- finishMaybe(this, state, true);
- state.ended = true;
- } else if (state.finished) {
- err = new ERR_STREAM_ALREADY_FINISHED("end");
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("end");
- }
- if (typeof cb === "function") {
- if (err || state.finished) {
- runOnNextTick(cb, err);
- } else {
- state[kOnFinished].push(cb);
- }
- }
- return this;
- }
- destroy(err, cb) {
- const state = this._writableState;
- if (
- !state.destroyed &&
- (state.bufferedIndex < state.buffered.length ||
- state[kOnFinished].length)
- ) {
- runOnNextTick(errorBuffer, state);
- }
- destroy.call(this, err, cb);
- return this;
- }
- _undestroy() {
- return destroyImpl.undestroy.call(this);
- }
- _destroy(err, cb) {
- cb(err);
- }
- };
- module.exports = WritableReadable;
-
- function nop() {}
- var kOnFinished = Symbol2("kOnFinished");
-
- function _write(stream, chunk, encoding, cb) {
- const state = stream._writableState;
- if (typeof encoding === "function") {
- cb = encoding;
- encoding = state.defaultEncoding;
- } else {
- if (!encoding) encoding = state.defaultEncoding;
- else if (encoding !== "buffer" && !Buffer.isEncoding(encoding))
- throw new ERR_UNKNOWN_ENCODING(encoding);
- if (typeof cb !== "function") cb = nop;
- }
- if (chunk === null) {
- throw new ERR_STREAM_NULL_VALUES();
- } else if (!state.objectMode) {
- if (typeof chunk === "string") {
- if (state.decodeStrings !== false) {
- chunk = Buffer.from(chunk, encoding);
- encoding = "buffer";
- }
- } else if (chunk instanceof Buffer) {
- encoding = "buffer";
- } else if (Stream._isUint8Array(chunk)) {
- chunk = Stream._uint8ArrayToBuffer(chunk);
- encoding = "buffer";
- } else {
- throw new ERR_INVALID_ARG_TYPE(
- "chunk",
- ["string", "Buffer", "Uint8Array"],
- chunk,
- );
- }
- }
- let err;
- if (state.ending) {
- err = new ERR_STREAM_WRITE_AFTER_END();
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("write");
- }
- if (err) {
- runOnNextTick(cb, err);
- errorOrDestroy(stream, err, true);
- return err;
- }
- state.pendingcb++;
- return writeOrBuffer(stream, state, chunk, encoding, cb);
- }
- function WritableState(options, stream, isDuplex) {
- if (typeof isDuplex !== "boolean")
- isDuplex = stream instanceof require_duplex();
- this.objectMode = !!(options && options.objectMode);
- if (isDuplex)
- this.objectMode =
- this.objectMode || !!(options && options.writableObjectMode);
- this.highWaterMark = options
- ? getHighWaterMark(this, options, "writableHighWaterMark", isDuplex)
- : getDefaultHighWaterMark(false);
- this.finalCalled = false;
- this.needDrain = false;
- this.ending = false;
- this.ended = false;
- this.finished = false;
- this.destroyed = false;
- const noDecode = !!(options && options.decodeStrings === false);
- this.decodeStrings = !noDecode;
- this.defaultEncoding = (options && options.defaultEncoding) || "utf8";
- this.length = 0;
- this.writing = false;
- this.corked = 0;
- this.sync = true;
- this.bufferProcessing = false;
- this.onwrite = onwrite.bind(void 0, stream);
- this.writecb = null;
- this.writelen = 0;
- this.afterWriteTickInfo = null;
- resetBuffer(this);
- this.pendingcb = 0;
- this.constructed = true;
- this.prefinished = false;
- this.errorEmitted = false;
- this.emitClose = !options || options.emitClose !== false;
- this.autoDestroy = !options || options.autoDestroy !== false;
- this.errored = null;
- this.closed = false;
- this.closeEmitted = false;
- this[kOnFinished] = [];
- }
- function resetBuffer(state) {
- state.buffered = [];
- state.bufferedIndex = 0;
- state.allBuffers = true;
- state.allNoop = true;
- }
- WritableState.prototype.getBuffer = function getBuffer() {
- return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
- };
- ObjectDefineProperty(WritableState.prototype, "bufferedRequestCount", {
- get() {
- return this.buffered.length - this.bufferedIndex;
- },
- });
-
- function writeOrBuffer(stream, state, chunk, encoding, callback) {
- const len = state.objectMode ? 1 : chunk.length;
- state.length += len;
- const ret = state.length < state.highWaterMark;
- if (!ret) state.needDrain = true;
- if (
- state.writing ||
- state.corked ||
- state.errored ||
- !state.constructed
- ) {
- state.buffered.push({
- chunk,
- encoding,
- callback,
- });
- if (state.allBuffers && encoding !== "buffer") {
- state.allBuffers = false;
- }
- if (state.allNoop && callback !== nop) {
- state.allNoop = false;
- }
- } else {
- state.writelen = len;
- state.writecb = callback;
- state.writing = true;
- state.sync = true;
- stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
- }
- return ret && !state.errored && !state.destroyed;
- }
- function doWrite(stream, state, writev, len, chunk, encoding, cb) {
- state.writelen = len;
- state.writecb = cb;
- state.writing = true;
- state.sync = true;
- if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED("write"));
- else if (writev) stream._writev(chunk, state.onwrite);
- else stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
- }
- function onwriteError(stream, state, er, cb) {
- --state.pendingcb;
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
- }
- function onwrite(stream, er) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
- if (er) {
- er.stack;
- if (!state.errored) {
- state.errored = er;
- }
- if (stream._readableState && !stream._readableState.errored) {
- stream._readableState.errored = er;
- }
- if (sync) {
- runOnNextTick(onwriteError, stream, state, er, cb);
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb,
- stream,
- state,
- };
- runOnNextTick(afterWriteTick, state.afterWriteTickInfo);
- }
- } else {
- afterWrite(stream, state, 1, cb);
- }
- }
- }
- function afterWriteTick({ stream, state, count, cb }) {
- state.afterWriteTickInfo = null;
- return afterWrite(stream, state, count, cb);
- }
- function afterWrite(stream, state, count, cb) {
- const needDrain =
- !state.ending &&
- !stream.destroyed &&
- state.length === 0 &&
- state.needDrain;
- if (needDrain) {
- state.needDrain = false;
- stream.emit("drain");
- }
- while (count-- > 0) {
- state.pendingcb--;
- cb();
- }
- if (state.destroyed) {
- errorBuffer(state);
- }
- finishMaybe(stream, state);
- }
- function errorBuffer(state) {
- if (state.writing) {
- return;
- }
- for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
- var _state$errored;
- const { chunk, callback } = state.buffered[n];
- const len = state.objectMode ? 1 : chunk.length;
- state.length -= len;
- callback(
- (_state$errored = state.errored) !== null && _state$errored !== void 0
- ? _state$errored
- : new ERR_STREAM_DESTROYED("write"),
- );
- }
- const onfinishCallbacks = state[kOnFinished].splice(0);
- for (let i = 0; i < onfinishCallbacks.length; i++) {
- var _state$errored2;
- onfinishCallbacks[i](
- (_state$errored2 = state.errored) !== null &&
- _state$errored2 !== void 0
- ? _state$errored2
- : new ERR_STREAM_DESTROYED("end"),
- );
- }
- resetBuffer(state);
- }
- function clearBuffer(stream, state) {
- if (
- state.corked ||
- state.bufferProcessing ||
- state.destroyed ||
- !state.constructed
- ) {
- return;
- }
- const { buffered, bufferedIndex, objectMode } = state;
- const bufferedLength = buffered.length - bufferedIndex;
- if (!bufferedLength) {
- return;
- }
- let i = bufferedIndex;
- state.bufferProcessing = true;
- if (bufferedLength > 1 && stream._writev) {
- state.pendingcb -= bufferedLength - 1;
- const callback = state.allNoop
- ? nop
- : (err) => {
- for (let n = i; n < buffered.length; ++n) {
- buffered[n].callback(err);
- }
- };
- const chunks =
- state.allNoop && i === 0
- ? buffered
- : ArrayPrototypeSlice(buffered, i);
- chunks.allBuffers = state.allBuffers;
- doWrite(stream, state, true, state.length, chunks, "", callback);
- resetBuffer(state);
- } else {
- do {
- const { chunk, encoding, callback } = buffered[i];
- buffered[i++] = null;
- const len = objectMode ? 1 : chunk.length;
- doWrite(stream, state, false, len, chunk, encoding, callback);
- } while (i < buffered.length && !state.writing);
- if (i === buffered.length) {
- resetBuffer(state);
- } else if (i > 256) {
- buffered.splice(0, i);
- state.bufferedIndex = 0;
- } else {
- state.bufferedIndex = i;
- }
- }
- state.bufferProcessing = false;
- }
- function needFinish(state, tag) {
- var needFinish =
- state.ending &&
- !state.destroyed &&
- state.constructed &&
- state.length === 0 &&
- !state.errored &&
- state.buffered.length === 0 &&
- !state.finished &&
- !state.writing &&
- !state.errorEmitted &&
- !state.closeEmitted;
- debug("needFinish", needFinish, tag);
- return needFinish;
- }
- function callFinal(stream, state) {
- let called = false;
- function onFinish(err) {
- if (called) {
- errorOrDestroy(
- stream,
- err !== null && err !== void 0 ? err : ERR_MULTIPLE_CALLBACK(),
- );
- return;
- }
- called = true;
- state.pendingcb--;
- if (err) {
- const onfinishCallbacks = state[kOnFinished].splice(0);
- for (let i = 0; i < onfinishCallbacks.length; i++) {
- onfinishCallbacks[i](err);
- }
- errorOrDestroy(stream, err, state.sync);
- } else if (needFinish(state)) {
- state.prefinished = true;
- stream.emit("prefinish");
- state.pendingcb++;
- runOnNextTick(finish, stream, state);
- }
- }
- state.sync = true;
- state.pendingcb++;
- try {
- stream._final(onFinish);
- } catch (err) {
- onFinish(err);
- }
- state.sync = false;
- }
- function prefinish(stream, state) {
- if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === "function" && !state.destroyed) {
- state.finalCalled = true;
- callFinal(stream, state);
- } else {
- state.prefinished = true;
- stream.emit("prefinish");
- }
- }
- }
- function finishMaybe(stream, state, sync) {
- if (needFinish(state, stream.__id)) {
- prefinish(stream, state);
- if (state.pendingcb === 0) {
- if (sync) {
- state.pendingcb++;
- runOnNextTick(
- (stream2, state2) => {
- if (needFinish(state2)) {
- finish(stream2, state2);
- } else {
- state2.pendingcb--;
- }
- },
- stream,
- state,
- );
- } else if (needFinish(state)) {
- state.pendingcb++;
- finish(stream, state);
- }
- }
- }
- }
- function finish(stream, state) {
- state.pendingcb--;
- state.finished = true;
- const onfinishCallbacks = state[kOnFinished].splice(0);
- for (let i = 0; i < onfinishCallbacks.length; i++) {
- onfinishCallbacks[i]();
- }
- stream.emit("finish");
- if (state.autoDestroy) {
- const rState = stream._readableState;
- const autoDestroy =
- !rState ||
- (rState.autoDestroy &&
- (rState.endEmitted || rState.readable === false));
- if (autoDestroy) {
- stream.destroy();
- }
- }
- }
- },
-});
-
// node_modules/readable-stream/lib/internal/streams/writable.js
var require_writable = __commonJS({
"node_modules/readable-stream/lib/internal/streams/writable.js"(
@@ -4436,34 +3782,34 @@ var require_writable = __commonJS({
ERR_UNKNOWN_ENCODING,
} = require_errors().codes;
var { errorOrDestroy } = destroyImpl;
- var Writable = class Writable extends Stream {
- constructor(options = {}) {
- super(options);
- const isDuplex = this instanceof require_duplex();
- if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this))
- return new Writable(options);
- this._writableState = new WritableState(options, this, isDuplex);
- if (options) {
- if (typeof options.write === "function") this._write = options.write;
- if (typeof options.writev === "function")
- this._writev = options.writev;
- if (typeof options.destroy === "function")
- this._destroy = options.destroy;
- if (typeof options.final === "function") this._final = options.final;
- if (typeof options.construct === "function")
- this._construct = options.construct;
- if (options.signal) addAbortSignal(options.signal, this);
- }
- destroyImpl.construct(this, () => {
- const state = this._writableState;
- if (!state.writing) {
- clearBuffer(this, state);
- }
- finishMaybe(this, state);
- });
- }
- };
+ function Writable(options = {}) {
+ const isDuplex = this instanceof require_duplex();
+ if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this))
+ return new Writable(options);
+ this._writableState = new WritableState(options, this, isDuplex);
+ if (options) {
+ if (typeof options.write === "function") this._write = options.write;
+ if (typeof options.writev === "function") this._writev = options.writev;
+ if (typeof options.destroy === "function")
+ this._destroy = options.destroy;
+ if (typeof options.final === "function") this._final = options.final;
+ if (typeof options.construct === "function")
+ this._construct = options.construct;
+ if (options.signal) addAbortSignal(options.signal, this);
+ }
+ Stream.call(this, options);
+
+ destroyImpl.construct(this, () => {
+ const state = this._writableState;
+ if (!state.writing) {
+ clearBuffer(this, state);
+ }
+ finishMaybe(this, state);
+ });
+ }
+ ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
+ ObjectSetPrototypeOf(Writable, Stream);
module.exports = Writable;
function nop() {}
@@ -5137,12 +4483,15 @@ var require_duplexify = __commonJS({
globalThis.AbortController ||
__require("abort-controller").AbortController;
var { FunctionPrototypeCall } = require_primordials();
- var Duplexify = class extends Duplex {
+ class Duplexify extends Duplex {
constructor(options) {
super(options);
+
+ // https://github.com/nodejs/node/pull/34385
+
if (
- (options === null || options === void 0
- ? void 0
+ (options === null || options === undefined
+ ? undefined
: options.readable) === false
) {
this._readableState.readable = false;
@@ -5150,8 +4499,8 @@ var require_duplexify = __commonJS({
this._readableState.endEmitted = true;
}
if (
- (options === null || options === void 0
- ? void 0
+ (options === null || options === undefined
+ ? undefined
: options.writable) === false
) {
this._writableState.writable = false;
@@ -5160,7 +4509,7 @@ var require_duplexify = __commonJS({
this._writableState.finished = true;
}
}
- };
+ }
module.exports = function duplexify(body, name) {
if (isDuplexNodeStream(body)) {
return body;
@@ -5493,75 +4842,74 @@ var require_duplex = __commonJS({
ObjectSetPrototypeOf,
} = require_primordials();
- // var Readable = require_readable();
- var WritableReadable = require_writable_readable();
+ var Readable = require_readable();
- var Duplex = class Duplex extends WritableReadable {
- constructor(options) {
- super(options);
+ function Duplex(options) {
+ if (!(this instanceof Duplex)) return new Duplex(options);
+ Readable.call(this, options);
+ Writable.call(this, options);
- if (options) {
- this.allowHalfOpen = options.allowHalfOpen !== false;
- if (options.readable === false) {
- this._readableState.readable = false;
- this._readableState.ended = true;
- this._readableState.endEmitted = true;
- }
- if (options.writable === false) {
- this._writableState.writable = false;
- this._writableState.ending = true;
- this._writableState.ended = true;
- this._writableState.finished = true;
- }
- } else {
- this.allowHalfOpen = true;
+ if (options) {
+ this.allowHalfOpen = options.allowHalfOpen !== false;
+ if (options.readable === false) {
+ this._readableState.readable = false;
+ this._readableState.ended = true;
+ this._readableState.endEmitted = true;
}
+ if (options.writable === false) {
+ this._writableState.writable = false;
+ this._writableState.ending = true;
+ this._writableState.ended = true;
+ this._writableState.finished = true;
+ }
+ } else {
+ this.allowHalfOpen = true;
}
- };
+ }
module.exports = Duplex;
+ ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
+ ObjectSetPrototypeOf(Duplex, Readable);
+
{
- for (var method in WritableReadable.prototype) {
+ for (var method in Writable.prototype) {
if (!Duplex.prototype[method])
- Duplex.prototype[method] = WritableReadable.prototype[method];
+ Duplex.prototype[method] = Writable.prototype[method];
}
}
ObjectDefineProperties(Duplex.prototype, {
- writable: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
- "writable",
- ),
+ writable: ObjectGetOwnPropertyDescriptor(Writable.prototype, "writable"),
writableHighWaterMark: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
+ Writable.prototype,
"writableHighWaterMark",
),
writableObjectMode: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
+ Writable.prototype,
"writableObjectMode",
),
writableBuffer: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
+ Writable.prototype,
"writableBuffer",
),
writableLength: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
+ Writable.prototype,
"writableLength",
),
writableFinished: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
+ Writable.prototype,
"writableFinished",
),
writableCorked: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
+ Writable.prototype,
"writableCorked",
),
writableEnded: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
+ Writable.prototype,
"writableEnded",
),
writableNeedDrain: ObjectGetOwnPropertyDescriptor(
- WritableReadable.prototype,
+ Writable.prototype,
"writableNeedDrain",
),
destroyed: {
@@ -5616,19 +4964,21 @@ var require_transform = __commonJS({
var { ObjectSetPrototypeOf, Symbol: Symbol2 } = require_primordials();
var { ERR_METHOD_NOT_IMPLEMENTED } = require_errors().codes;
var Duplex = require_duplex();
- var Transform = class Transform extends Duplex {
- constructor(options) {
- super(options);
- this._readableState.sync = false;
- this[kCallback] = null;
- if (options) {
- if (typeof options.transform === "function")
- this._transform = options.transform;
- if (typeof options.flush === "function") this._flush = options.flush;
- }
- this.on("prefinish", prefinish);
+ function Transform(options) {
+ if (!(this instanceof Transform)) return new Transform(options);
+ this._readableState.sync = false;
+ this[kCallback] = null;
+ if (options) {
+ if (typeof options.transform === "function")
+ this._transform = options.transform;
+ if (typeof options.flush === "function") this._flush = options.flush;
}
- };
+ this.on("prefinish", prefinish);
+ Duplex.call(this, options);
+ }
+ ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
+ ObjectSetPrototypeOf(Transform, Duplex);
+
module.exports = Transform;
var kCallback = Symbol2("kCallback");
function final(cb) {
@@ -5708,19 +5058,21 @@ var require_passthrough = __commonJS({
module,
) {
"use strict";
+ var { ObjectSetPrototypeOf } = require_primordials();
var Transform = require_transform();
- var PassThrough = class PassThrough extends Transform {
- constructor(options) {
- super(options);
- this._transform = this.#transform;
- }
- _transform;
+ function PassThrough(options) {
+ if (!(this instanceof PassThrough)) return new PassThrough(options);
+ Transform.call(this, options);
+ }
- #transform(chunk, encoding, cb) {
- cb(null, chunk);
- }
+ ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype);
+ ObjectSetPrototypeOf(PassThrough, Transform);
+
+ PassThrough.prototype._transform = function (chunk, encoding, cb) {
+ cb(null, chunk);
};
+
module.exports = PassThrough;
},
});
@@ -6758,8 +6110,7 @@ function getNativeReadableStream(Readable, stream, options) {
var Writable = require_writable();
var NativeWritable = class NativeWritable extends Writable {
- #writePromises = [];
- #pathOrFd;
+ #pathOrFdOrSink;
#fileSink;
#native = true;
@@ -6767,14 +6118,14 @@ var NativeWritable = class NativeWritable extends Writable {
_destroy;
_final;
- constructor(pathOrFd, options = {}) {
+ constructor(pathOrFdOrSink, options = {}) {
super(options);
this._construct = this.#internalConstruct;
this._destroy = this.#internalDestroy;
this._final = this.#internalFinal;
- this.#pathOrFd = pathOrFd;
+ this.#pathOrFdOrSink = pathOrFdOrSink;
}
// These are confusingly two different fns for construct which initially were the same thing because
@@ -6787,7 +6138,16 @@ var NativeWritable = class NativeWritable extends Writable {
}
#lazyConstruct() {
- this.#fileSink = Bun.file(this.#pathOrFd).writer();
+ // TODO: Turn this check into check for instanceof FileSink
+ if (typeof this.#pathOrFdOrSink === "object") {
+ if (typeof this.#pathOrFdOrSink.write === "function") {
+ this.#fileSink = this.#pathOrFdOrSink;
+ } else {
+ throw new Error("Invalid FileSink");
+ }
+ } else {
+ this.#fileSink = Bun.file(this.#pathOrFdOrSink).writer();
+ }
}
write(chunk, encoding, cb, native = this.#native) {
@@ -6802,17 +6162,15 @@ var NativeWritable = class NativeWritable extends Writable {
var fileSink = this.#fileSink;
var result = fileSink.write(chunk);
- var then = result?.then;
- if (isPromise(result) && then && isCallable(then)) {
- var writePromises = this.#writePromises;
- var i = writePromises.length;
- writePromises[i] = result;
-
- then(() => {
+ if (isPromise(result)) {
+ // var writePromises = this.#writePromises;
+ // var i = writePromises.length;
+ // writePromises[i] = result;
+ result.then(() => {
this.emit("drain");
fileSink.flush(true);
- // We can't naively use i here because we don't know when writes will resolve necessarily
- writePromises.splice(writePromises.indexOf(result), 1);
+ // // We can't naively use i here because we don't know when writes will resolve necessarily
+ // writePromises.splice(writePromises.indexOf(result), 1);
});
return false;
}
@@ -6839,7 +6197,6 @@ var NativeWritable = class NativeWritable extends Writable {
}
ref() {
- // TODO: Is this right? Should we construct the stream if we call ref?
if (!this.#fileSink) {
this.#lazyConstruct();
}