aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/stream.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r--src/js/node/stream.js123
1 files changed, 39 insertions, 84 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index 67d82d287..30c76d797 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -1,13 +1,20 @@
// Hardcoded module "node:stream" / "readable-stream"
// "readable-stream" npm package
// just transpiled
-var { isPromise, isCallable, direct, Object } = import.meta.primordials;
-globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length
- ? process.env.DEBUG_TRACK_EE.split(",")
- : process.env.DEBUG_STREAMS?.length
- ? process.env.DEBUG_STREAMS.split(",")
- : null;
+// This must go at the top of the file, before any side effects.
+// IS_BUN_DEVELOPMENT is a bundle-only global variable that is set to true when
+// building a development bundle.
+const __TRACK_EE__ = IS_BUN_DEVELOPMENT && !!process.env.DEBUG_TRACK_EE;
+const __DEBUG__ = IS_BUN_DEVELOPMENT && !!(process.env.DEBUG || process.env.DEBUG_STREAMS || __TRACK_EE__);
+
+if (__DEBUG__) {
+ globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length
+ ? process.env.DEBUG_TRACK_EE.split(",")
+ : process.env.DEBUG_STREAMS?.length
+ ? process.env.DEBUG_STREAMS.split(",")
+ : null;
+}
// Separating DEBUG, DEBUG_STREAMS and DEBUG_TRACK_EE env vars makes it easier to focus on the
// events in this file rather than all debug output across all files
@@ -16,9 +23,6 @@ globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length
// The events and/or all of the outputs for the given stream IDs assigned at stream construction
// By default, child_process gives
-const __TRACK_EE__ = !!process.env.DEBUG_TRACK_EE;
-const __DEBUG__ = !!(process.env.DEBUG || process.env.DEBUG_STREAMS || __TRACK_EE__);
-
var debug = __DEBUG__
? globalThis.__IDS_TO_TRACK
? // If we are tracking IDs for debug event emitters, we should prefix the debug output with the ID
@@ -30,6 +34,10 @@ var debug = __DEBUG__
: (...args) => console.log(...args.slice(0, -1))
: () => {};
+var { isPromise, isCallable, direct, Object } = globalThis[Symbol.for("Bun.lazy")]("primordials");
+import { EventEmitter as EE } from "bun:events_native";
+import { StringDecoder } from "node:string_decoder";
+
var __create = Object.create;
var __defProp = Object.defineProperty;
var __getOwnPropDesc = Object.getOwnPropertyDescriptor;
@@ -37,48 +45,6 @@ var __getOwnPropNames = Object.getOwnPropertyNames;
var __getProtoOf = Object.getPrototypeOf;
var __hasOwnProp = Object.prototype.hasOwnProperty;
var __ObjectSetPrototypeOf = Object.setPrototypeOf;
-var __require = x => import.meta.require(x);
-
-var _EE = __require("bun:events_native");
-
-function DebugEventEmitter(opts) {
- if (!(this instanceof DebugEventEmitter)) return new DebugEventEmitter(opts);
- _EE.call(this, opts);
- const __id = opts.__id;
- if (__id) {
- __defProp(this, "__id", {
- value: __id,
- readable: true,
- writable: false,
- enumerable: false,
- });
- }
-}
-
-__ObjectSetPrototypeOf(DebugEventEmitter.prototype, _EE.prototype);
-__ObjectSetPrototypeOf(DebugEventEmitter, _EE);
-
-DebugEventEmitter.prototype.emit = function (event, ...args) {
- var __id = this.__id;
- if (__id) {
- debug("emit", event, ...args, __id);
- } else {
- debug("emit", event, ...args);
- }
- return _EE.prototype.emit.call(this, event, ...args);
-};
-DebugEventEmitter.prototype.on = function (event, handler) {
- var __id = this.__id;
- if (__id) {
- debug("on", event, "added", __id);
- } else {
- debug("on", event, "added");
- }
- return _EE.prototype.on.call(this, event, handler);
-};
-DebugEventEmitter.prototype.addListener = function (event, handler) {
- return this.on(event, handler);
-};
var __commonJS = (cb, mod) =>
function __require2() {
@@ -260,9 +226,8 @@ var require_primordials = __commonJS({
var require_util = __commonJS({
"node_modules/readable-stream/lib/ours/util.js"(exports, module) {
"use strict";
- var bufferModule = __require("buffer");
+
var AsyncFunction = Object.getPrototypeOf(async function () {}).constructor;
- var Blob = globalThis.Blob || bufferModule.Blob;
var isBlob =
typeof Blob !== "undefined"
? function isBlob2(b) {
@@ -1388,7 +1353,6 @@ var require_end_of_stream = __commonJS({
var require_operators = __commonJS({
"node_modules/readable-stream/lib/internal/streams/operators.js"(exports, module) {
"use strict";
- var AbortController = globalThis.AbortController || __require("abort-controller").AbortController;
var {
codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE },
AbortError,
@@ -2084,13 +2048,6 @@ var require_legacy = __commonJS({
"node_modules/readable-stream/lib/internal/streams/legacy.js"(exports, module) {
"use strict";
var { ArrayIsArray, ObjectSetPrototypeOf } = require_primordials();
- var { EventEmitter: _EE } = __require("bun:events_native");
- var EE;
- if (__TRACK_EE__) {
- EE = DebugEventEmitter;
- } else {
- EE = _EE;
- }
function Stream(options) {
if (!(this instanceof Stream)) return new Stream(options);
@@ -2332,6 +2289,7 @@ var require_from = __commonJS({
});
var _ReadableFromWeb;
+var _ReadableFromWebForUndici;
// node_modules/readable-stream/lib/internal/streams/readable.js
var require_readable = __commonJS({
@@ -2352,7 +2310,6 @@ var require_readable = __commonJS({
} = require_primordials();
var ReadableState = globalThis[Symbol.for("Bun.lazy")]("bun:stream").ReadableState;
- var { EventEmitter: EE } = __require("bun:events_native");
var { Stream, prependListener } = require_legacy();
function Readable(options) {
@@ -2537,6 +2494,8 @@ var require_readable = __commonJS({
}
}
+ _ReadableFromWebForUndici = ReadableFromWeb;
+
/**
* @param {ReadableStream} readableStream
* @param {{
@@ -2596,7 +2555,7 @@ var require_readable = __commonJS({
}
module.exports = Readable;
- _ReadableFromWeb = ReadableFromWeb;
+ _ReadableFromWeb = newStreamReadableFromReadableStream;
var { addAbortSignal } = require_add_abort_signal();
var eos = require_end_of_stream();
@@ -2626,7 +2585,6 @@ var require_readable = __commonJS({
},
} = require_errors();
var { validateObject } = require_validators();
- var { StringDecoder } = __require("string_decoder");
var from = require_from();
var nop = () => {};
var { errorOrDestroy } = destroyImpl;
@@ -3422,7 +3380,6 @@ var require_writable = __commonJS({
SymbolHasInstance,
} = require_primordials();
- var { EventEmitter: EE } = __require("bun:events_native");
var Stream = require_legacy().Stream;
var destroyImpl = require_destroy();
var { addAbortSignal } = require_add_abort_signal();
@@ -4048,7 +4005,6 @@ var require_writable = __commonJS({
var require_duplexify = __commonJS({
"node_modules/readable-stream/lib/internal/streams/duplexify.js"(exports, module) {
"use strict";
- var bufferModule = __require("buffer");
var {
isReadable,
isWritable,
@@ -4068,7 +4024,6 @@ var require_duplexify = __commonJS({
var Readable = require_readable();
var { createDeferredPromise } = require_util();
var from = require_from();
- var Blob = globalThis.Blob || bufferModule.Blob;
var isBlob =
typeof Blob !== "undefined"
? function isBlob2(b) {
@@ -4077,7 +4032,6 @@ var require_duplexify = __commonJS({
: function isBlob2(b) {
return false;
};
- var AbortController = globalThis.AbortController || __require("abort-controller").AbortController;
var { FunctionPrototypeCall } = require_primordials();
class Duplexify extends Duplex {
constructor(options) {
@@ -4619,7 +4573,6 @@ var require_pipeline = __commonJS({
} = require_errors();
var { validateFunction, validateAbortSignal } = require_validators();
var { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require_utils();
- var AbortController = globalThis.AbortController || __require("abort-controller").AbortController;
var PassThrough;
var Readable;
function destroyer(stream, reading, writing) {
@@ -5304,7 +5257,7 @@ function createNativeStreamReadable(nativeType, Readable) {
const finalizer = new FinalizationRegistry(ptr => ptr && deinit(ptr));
const MIN_BUFFER_SIZE = 512;
var NativeReadable = class NativeReadable extends Readable {
- #ptr;
+ #bunNativePtr;
#refCount = 1;
#constructed = false;
#remainingChunk = undefined;
@@ -5319,12 +5272,12 @@ function createNativeStreamReadable(nativeType, Readable) {
} else {
this.#highWaterMark = 256 * 1024;
}
- this.#ptr = ptr;
+ this.#bunNativePtr = ptr;
this.#constructed = false;
this.#remainingChunk = undefined;
this.#pendingRead = false;
this.#unregisterToken = {};
- finalizer.register(this, this.#ptr, this.#unregisterToken);
+ finalizer.register(this, this.#bunNativePtr, this.#unregisterToken);
}
// maxToRead is by default the highWaterMark passed from the Readable.read call to this fn
@@ -5337,7 +5290,7 @@ function createNativeStreamReadable(nativeType, Readable) {
return;
}
- var ptr = this.#ptr;
+ var ptr = this.#bunNativePtr;
__DEBUG__ && debug("ptr @ NativeReadable._read", ptr, this.__id);
if (ptr === 0) {
this.push(null);
@@ -5403,10 +5356,10 @@ function createNativeStreamReadable(nativeType, Readable) {
return chunk;
}
- push(result, encoding) {
- __DEBUG__ && debug("NativeReadable push -- result, encoding", result, encoding, this.__id);
- return super.push(...arguments);
- }
+ // push(result, encoding) {
+ // __DEBUG__ && debug("NativeReadable push -- result, encoding", result, encoding, this.__id);
+ // return super.push(...arguments);
+ // }
#handleResult(result, view, isClosed) {
__DEBUG__ && debug("result, isClosed @ #handleResult", result, isClosed, this.__id);
@@ -5419,7 +5372,9 @@ function createNativeStreamReadable(nativeType, Readable) {
return handleNumberResult(this, result, view, isClosed);
} else if (typeof result === "boolean") {
- this.push(null);
+ process.nextTick(() => {
+ this.push(null);
+ });
return view?.byteLength ?? 0 > 0 ? view : undefined;
} else if (ArrayBuffer.isView(result)) {
if (result.byteLength >= this.#highWaterMark && !this.#hasResized && !isClosed) {
@@ -5458,14 +5413,14 @@ function createNativeStreamReadable(nativeType, Readable) {
}
_destroy(error, callback) {
- var ptr = this.#ptr;
+ var ptr = this.#bunNativePtr;
if (ptr === 0) {
callback(error);
return;
}
finalizer.unregister(this.#unregisterToken);
- this.#ptr = 0;
+ this.#bunNativePtr = 0;
if (updateRef) {
updateRef(ptr, false);
}
@@ -5475,7 +5430,7 @@ function createNativeStreamReadable(nativeType, Readable) {
}
ref() {
- var ptr = this.#ptr;
+ var ptr = this.#bunNativePtr;
if (ptr === 0) return;
if (this.#refCount++ === 0) {
updateRef(ptr, true);
@@ -5483,7 +5438,7 @@ function createNativeStreamReadable(nativeType, Readable) {
}
unref() {
- var ptr = this.#ptr;
+ var ptr = this.#bunNativePtr;
if (ptr === 0) return;
if (this.#refCount-- === 1) {
updateRef(ptr, false);
@@ -5632,7 +5587,7 @@ var NativeWritable = class NativeWritable extends Writable {
const stream_exports = require_ours();
stream_exports[Symbol.for("CommonJS")] = 0;
-stream_exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb };
+stream_exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUndici };
export default stream_exports;
export var _uint8ArrayToBuffer = stream_exports._uint8ArrayToBuffer;
export var _isUint8Array = stream_exports._isUint8Array;
@@ -5654,4 +5609,4 @@ export var Stream = stream_exports.Stream;
export var eos = (stream_exports["eos"] = require_end_of_stream);
export var _getNativeReadableStreamPrototype = stream_exports._getNativeReadableStreamPrototype;
export var NativeWritable = stream_exports.NativeWritable;
-export var promises = Stream.promise;
+export var promises = Stream.promises;