diff options
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r-- | src/js/node/stream.js | 123 |
1 files changed, 39 insertions, 84 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js index 67d82d287..30c76d797 100644 --- a/src/js/node/stream.js +++ b/src/js/node/stream.js @@ -1,13 +1,20 @@ // Hardcoded module "node:stream" / "readable-stream" // "readable-stream" npm package // just transpiled -var { isPromise, isCallable, direct, Object } = import.meta.primordials; -globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length - ? process.env.DEBUG_TRACK_EE.split(",") - : process.env.DEBUG_STREAMS?.length - ? process.env.DEBUG_STREAMS.split(",") - : null; +// This must go at the top of the file, before any side effects. +// IS_BUN_DEVELOPMENT is a bundle-only global variable that is set to true when +// building a development bundle. +const __TRACK_EE__ = IS_BUN_DEVELOPMENT && !!process.env.DEBUG_TRACK_EE; +const __DEBUG__ = IS_BUN_DEVELOPMENT && !!(process.env.DEBUG || process.env.DEBUG_STREAMS || __TRACK_EE__); + +if (__DEBUG__) { + globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length + ? process.env.DEBUG_TRACK_EE.split(",") + : process.env.DEBUG_STREAMS?.length + ? process.env.DEBUG_STREAMS.split(",") + : null; +} // Separating DEBUG, DEBUG_STREAMS and DEBUG_TRACK_EE env vars makes it easier to focus on the // events in this file rather than all debug output across all files @@ -16,9 +23,6 @@ globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length // The events and/or all of the outputs for the given stream IDs assigned at stream construction // By default, child_process gives -const __TRACK_EE__ = !!process.env.DEBUG_TRACK_EE; -const __DEBUG__ = !!(process.env.DEBUG || process.env.DEBUG_STREAMS || __TRACK_EE__); - var debug = __DEBUG__ ? globalThis.__IDS_TO_TRACK ? // If we are tracking IDs for debug event emitters, we should prefix the debug output with the ID @@ -30,6 +34,10 @@ var debug = __DEBUG__ : (...args) => console.log(...args.slice(0, -1)) : () => {}; +var { isPromise, isCallable, direct, Object } = globalThis[Symbol.for("Bun.lazy")]("primordials"); +import { EventEmitter as EE } from "bun:events_native"; +import { StringDecoder } from "node:string_decoder"; + var __create = Object.create; var __defProp = Object.defineProperty; var __getOwnPropDesc = Object.getOwnPropertyDescriptor; @@ -37,48 +45,6 @@ var __getOwnPropNames = Object.getOwnPropertyNames; var __getProtoOf = Object.getPrototypeOf; var __hasOwnProp = Object.prototype.hasOwnProperty; var __ObjectSetPrototypeOf = Object.setPrototypeOf; -var __require = x => import.meta.require(x); - -var _EE = __require("bun:events_native"); - -function DebugEventEmitter(opts) { - if (!(this instanceof DebugEventEmitter)) return new DebugEventEmitter(opts); - _EE.call(this, opts); - const __id = opts.__id; - if (__id) { - __defProp(this, "__id", { - value: __id, - readable: true, - writable: false, - enumerable: false, - }); - } -} - -__ObjectSetPrototypeOf(DebugEventEmitter.prototype, _EE.prototype); -__ObjectSetPrototypeOf(DebugEventEmitter, _EE); - -DebugEventEmitter.prototype.emit = function (event, ...args) { - var __id = this.__id; - if (__id) { - debug("emit", event, ...args, __id); - } else { - debug("emit", event, ...args); - } - return _EE.prototype.emit.call(this, event, ...args); -}; -DebugEventEmitter.prototype.on = function (event, handler) { - var __id = this.__id; - if (__id) { - debug("on", event, "added", __id); - } else { - debug("on", event, "added"); - } - return _EE.prototype.on.call(this, event, handler); -}; -DebugEventEmitter.prototype.addListener = function (event, handler) { - return this.on(event, handler); -}; var __commonJS = (cb, mod) => function __require2() { @@ -260,9 +226,8 @@ var require_primordials = __commonJS({ var require_util = __commonJS({ "node_modules/readable-stream/lib/ours/util.js"(exports, module) { "use strict"; - var bufferModule = __require("buffer"); + var AsyncFunction = Object.getPrototypeOf(async function () {}).constructor; - var Blob = globalThis.Blob || bufferModule.Blob; var isBlob = typeof Blob !== "undefined" ? function isBlob2(b) { @@ -1388,7 +1353,6 @@ var require_end_of_stream = __commonJS({ var require_operators = __commonJS({ "node_modules/readable-stream/lib/internal/streams/operators.js"(exports, module) { "use strict"; - var AbortController = globalThis.AbortController || __require("abort-controller").AbortController; var { codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, AbortError, @@ -2084,13 +2048,6 @@ var require_legacy = __commonJS({ "node_modules/readable-stream/lib/internal/streams/legacy.js"(exports, module) { "use strict"; var { ArrayIsArray, ObjectSetPrototypeOf } = require_primordials(); - var { EventEmitter: _EE } = __require("bun:events_native"); - var EE; - if (__TRACK_EE__) { - EE = DebugEventEmitter; - } else { - EE = _EE; - } function Stream(options) { if (!(this instanceof Stream)) return new Stream(options); @@ -2332,6 +2289,7 @@ var require_from = __commonJS({ }); var _ReadableFromWeb; +var _ReadableFromWebForUndici; // node_modules/readable-stream/lib/internal/streams/readable.js var require_readable = __commonJS({ @@ -2352,7 +2310,6 @@ var require_readable = __commonJS({ } = require_primordials(); var ReadableState = globalThis[Symbol.for("Bun.lazy")]("bun:stream").ReadableState; - var { EventEmitter: EE } = __require("bun:events_native"); var { Stream, prependListener } = require_legacy(); function Readable(options) { @@ -2537,6 +2494,8 @@ var require_readable = __commonJS({ } } + _ReadableFromWebForUndici = ReadableFromWeb; + /** * @param {ReadableStream} readableStream * @param {{ @@ -2596,7 +2555,7 @@ var require_readable = __commonJS({ } module.exports = Readable; - _ReadableFromWeb = ReadableFromWeb; + _ReadableFromWeb = newStreamReadableFromReadableStream; var { addAbortSignal } = require_add_abort_signal(); var eos = require_end_of_stream(); @@ -2626,7 +2585,6 @@ var require_readable = __commonJS({ }, } = require_errors(); var { validateObject } = require_validators(); - var { StringDecoder } = __require("string_decoder"); var from = require_from(); var nop = () => {}; var { errorOrDestroy } = destroyImpl; @@ -3422,7 +3380,6 @@ var require_writable = __commonJS({ SymbolHasInstance, } = require_primordials(); - var { EventEmitter: EE } = __require("bun:events_native"); var Stream = require_legacy().Stream; var destroyImpl = require_destroy(); var { addAbortSignal } = require_add_abort_signal(); @@ -4048,7 +4005,6 @@ var require_writable = __commonJS({ var require_duplexify = __commonJS({ "node_modules/readable-stream/lib/internal/streams/duplexify.js"(exports, module) { "use strict"; - var bufferModule = __require("buffer"); var { isReadable, isWritable, @@ -4068,7 +4024,6 @@ var require_duplexify = __commonJS({ var Readable = require_readable(); var { createDeferredPromise } = require_util(); var from = require_from(); - var Blob = globalThis.Blob || bufferModule.Blob; var isBlob = typeof Blob !== "undefined" ? function isBlob2(b) { @@ -4077,7 +4032,6 @@ var require_duplexify = __commonJS({ : function isBlob2(b) { return false; }; - var AbortController = globalThis.AbortController || __require("abort-controller").AbortController; var { FunctionPrototypeCall } = require_primordials(); class Duplexify extends Duplex { constructor(options) { @@ -4619,7 +4573,6 @@ var require_pipeline = __commonJS({ } = require_errors(); var { validateFunction, validateAbortSignal } = require_validators(); var { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require_utils(); - var AbortController = globalThis.AbortController || __require("abort-controller").AbortController; var PassThrough; var Readable; function destroyer(stream, reading, writing) { @@ -5304,7 +5257,7 @@ function createNativeStreamReadable(nativeType, Readable) { const finalizer = new FinalizationRegistry(ptr => ptr && deinit(ptr)); const MIN_BUFFER_SIZE = 512; var NativeReadable = class NativeReadable extends Readable { - #ptr; + #bunNativePtr; #refCount = 1; #constructed = false; #remainingChunk = undefined; @@ -5319,12 +5272,12 @@ function createNativeStreamReadable(nativeType, Readable) { } else { this.#highWaterMark = 256 * 1024; } - this.#ptr = ptr; + this.#bunNativePtr = ptr; this.#constructed = false; this.#remainingChunk = undefined; this.#pendingRead = false; this.#unregisterToken = {}; - finalizer.register(this, this.#ptr, this.#unregisterToken); + finalizer.register(this, this.#bunNativePtr, this.#unregisterToken); } // maxToRead is by default the highWaterMark passed from the Readable.read call to this fn @@ -5337,7 +5290,7 @@ function createNativeStreamReadable(nativeType, Readable) { return; } - var ptr = this.#ptr; + var ptr = this.#bunNativePtr; __DEBUG__ && debug("ptr @ NativeReadable._read", ptr, this.__id); if (ptr === 0) { this.push(null); @@ -5403,10 +5356,10 @@ function createNativeStreamReadable(nativeType, Readable) { return chunk; } - push(result, encoding) { - __DEBUG__ && debug("NativeReadable push -- result, encoding", result, encoding, this.__id); - return super.push(...arguments); - } + // push(result, encoding) { + // __DEBUG__ && debug("NativeReadable push -- result, encoding", result, encoding, this.__id); + // return super.push(...arguments); + // } #handleResult(result, view, isClosed) { __DEBUG__ && debug("result, isClosed @ #handleResult", result, isClosed, this.__id); @@ -5419,7 +5372,9 @@ function createNativeStreamReadable(nativeType, Readable) { return handleNumberResult(this, result, view, isClosed); } else if (typeof result === "boolean") { - this.push(null); + process.nextTick(() => { + this.push(null); + }); return view?.byteLength ?? 0 > 0 ? view : undefined; } else if (ArrayBuffer.isView(result)) { if (result.byteLength >= this.#highWaterMark && !this.#hasResized && !isClosed) { @@ -5458,14 +5413,14 @@ function createNativeStreamReadable(nativeType, Readable) { } _destroy(error, callback) { - var ptr = this.#ptr; + var ptr = this.#bunNativePtr; if (ptr === 0) { callback(error); return; } finalizer.unregister(this.#unregisterToken); - this.#ptr = 0; + this.#bunNativePtr = 0; if (updateRef) { updateRef(ptr, false); } @@ -5475,7 +5430,7 @@ function createNativeStreamReadable(nativeType, Readable) { } ref() { - var ptr = this.#ptr; + var ptr = this.#bunNativePtr; if (ptr === 0) return; if (this.#refCount++ === 0) { updateRef(ptr, true); @@ -5483,7 +5438,7 @@ function createNativeStreamReadable(nativeType, Readable) { } unref() { - var ptr = this.#ptr; + var ptr = this.#bunNativePtr; if (ptr === 0) return; if (this.#refCount-- === 1) { updateRef(ptr, false); @@ -5632,7 +5587,7 @@ var NativeWritable = class NativeWritable extends Writable { const stream_exports = require_ours(); stream_exports[Symbol.for("CommonJS")] = 0; -stream_exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb }; +stream_exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUndici }; export default stream_exports; export var _uint8ArrayToBuffer = stream_exports._uint8ArrayToBuffer; export var _isUint8Array = stream_exports._isUint8Array; @@ -5654,4 +5609,4 @@ export var Stream = stream_exports.Stream; export var eos = (stream_exports["eos"] = require_end_of_stream); export var _getNativeReadableStreamPrototype = stream_exports._getNativeReadableStreamPrototype; export var NativeWritable = stream_exports.NativeWritable; -export var promises = Stream.promise; +export var promises = Stream.promises; |