aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/stream.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r--src/js/node/stream.js89
1 files changed, 18 insertions, 71 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index 67d82d287..9344fa73f 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -1,7 +1,9 @@
// Hardcoded module "node:stream" / "readable-stream"
// "readable-stream" npm package
// just transpiled
-var { isPromise, isCallable, direct, Object } = import.meta.primordials;
+var { isPromise, isCallable, direct, Object } = globalThis[Symbol.for("Bun.lazy")]("primordials");
+import { EventEmitter as EE } from "bun:events_native";
+import { StringDecoder } from "node:string_decoder";
globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length
? process.env.DEBUG_TRACK_EE.split(",")
@@ -37,48 +39,6 @@ var __getOwnPropNames = Object.getOwnPropertyNames;
var __getProtoOf = Object.getPrototypeOf;
var __hasOwnProp = Object.prototype.hasOwnProperty;
var __ObjectSetPrototypeOf = Object.setPrototypeOf;
-var __require = x => import.meta.require(x);
-
-var _EE = __require("bun:events_native");
-
-function DebugEventEmitter(opts) {
- if (!(this instanceof DebugEventEmitter)) return new DebugEventEmitter(opts);
- _EE.call(this, opts);
- const __id = opts.__id;
- if (__id) {
- __defProp(this, "__id", {
- value: __id,
- readable: true,
- writable: false,
- enumerable: false,
- });
- }
-}
-
-__ObjectSetPrototypeOf(DebugEventEmitter.prototype, _EE.prototype);
-__ObjectSetPrototypeOf(DebugEventEmitter, _EE);
-
-DebugEventEmitter.prototype.emit = function (event, ...args) {
- var __id = this.__id;
- if (__id) {
- debug("emit", event, ...args, __id);
- } else {
- debug("emit", event, ...args);
- }
- return _EE.prototype.emit.call(this, event, ...args);
-};
-DebugEventEmitter.prototype.on = function (event, handler) {
- var __id = this.__id;
- if (__id) {
- debug("on", event, "added", __id);
- } else {
- debug("on", event, "added");
- }
- return _EE.prototype.on.call(this, event, handler);
-};
-DebugEventEmitter.prototype.addListener = function (event, handler) {
- return this.on(event, handler);
-};
var __commonJS = (cb, mod) =>
function __require2() {
@@ -260,9 +220,8 @@ var require_primordials = __commonJS({
var require_util = __commonJS({
"node_modules/readable-stream/lib/ours/util.js"(exports, module) {
"use strict";
- var bufferModule = __require("buffer");
+
var AsyncFunction = Object.getPrototypeOf(async function () {}).constructor;
- var Blob = globalThis.Blob || bufferModule.Blob;
var isBlob =
typeof Blob !== "undefined"
? function isBlob2(b) {
@@ -1388,7 +1347,6 @@ var require_end_of_stream = __commonJS({
var require_operators = __commonJS({
"node_modules/readable-stream/lib/internal/streams/operators.js"(exports, module) {
"use strict";
- var AbortController = globalThis.AbortController || __require("abort-controller").AbortController;
var {
codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE },
AbortError,
@@ -2084,13 +2042,6 @@ var require_legacy = __commonJS({
"node_modules/readable-stream/lib/internal/streams/legacy.js"(exports, module) {
"use strict";
var { ArrayIsArray, ObjectSetPrototypeOf } = require_primordials();
- var { EventEmitter: _EE } = __require("bun:events_native");
- var EE;
- if (__TRACK_EE__) {
- EE = DebugEventEmitter;
- } else {
- EE = _EE;
- }
function Stream(options) {
if (!(this instanceof Stream)) return new Stream(options);
@@ -2332,6 +2283,7 @@ var require_from = __commonJS({
});
var _ReadableFromWeb;
+var _ReadableFromWebForUndici;
// node_modules/readable-stream/lib/internal/streams/readable.js
var require_readable = __commonJS({
@@ -2352,7 +2304,6 @@ var require_readable = __commonJS({
} = require_primordials();
var ReadableState = globalThis[Symbol.for("Bun.lazy")]("bun:stream").ReadableState;
- var { EventEmitter: EE } = __require("bun:events_native");
var { Stream, prependListener } = require_legacy();
function Readable(options) {
@@ -2537,6 +2488,8 @@ var require_readable = __commonJS({
}
}
+ _ReadableFromWebForUndici = ReadableFromWeb;
+
/**
* @param {ReadableStream} readableStream
* @param {{
@@ -2596,7 +2549,7 @@ var require_readable = __commonJS({
}
module.exports = Readable;
- _ReadableFromWeb = ReadableFromWeb;
+ _ReadableFromWeb = newStreamReadableFromReadableStream;
var { addAbortSignal } = require_add_abort_signal();
var eos = require_end_of_stream();
@@ -2626,7 +2579,6 @@ var require_readable = __commonJS({
},
} = require_errors();
var { validateObject } = require_validators();
- var { StringDecoder } = __require("string_decoder");
var from = require_from();
var nop = () => {};
var { errorOrDestroy } = destroyImpl;
@@ -3422,7 +3374,6 @@ var require_writable = __commonJS({
SymbolHasInstance,
} = require_primordials();
- var { EventEmitter: EE } = __require("bun:events_native");
var Stream = require_legacy().Stream;
var destroyImpl = require_destroy();
var { addAbortSignal } = require_add_abort_signal();
@@ -4048,7 +3999,6 @@ var require_writable = __commonJS({
var require_duplexify = __commonJS({
"node_modules/readable-stream/lib/internal/streams/duplexify.js"(exports, module) {
"use strict";
- var bufferModule = __require("buffer");
var {
isReadable,
isWritable,
@@ -4068,7 +4018,6 @@ var require_duplexify = __commonJS({
var Readable = require_readable();
var { createDeferredPromise } = require_util();
var from = require_from();
- var Blob = globalThis.Blob || bufferModule.Blob;
var isBlob =
typeof Blob !== "undefined"
? function isBlob2(b) {
@@ -4077,7 +4026,6 @@ var require_duplexify = __commonJS({
: function isBlob2(b) {
return false;
};
- var AbortController = globalThis.AbortController || __require("abort-controller").AbortController;
var { FunctionPrototypeCall } = require_primordials();
class Duplexify extends Duplex {
constructor(options) {
@@ -4619,7 +4567,6 @@ var require_pipeline = __commonJS({
} = require_errors();
var { validateFunction, validateAbortSignal } = require_validators();
var { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require_utils();
- var AbortController = globalThis.AbortController || __require("abort-controller").AbortController;
var PassThrough;
var Readable;
function destroyer(stream, reading, writing) {
@@ -5304,7 +5251,7 @@ function createNativeStreamReadable(nativeType, Readable) {
const finalizer = new FinalizationRegistry(ptr => ptr && deinit(ptr));
const MIN_BUFFER_SIZE = 512;
var NativeReadable = class NativeReadable extends Readable {
- #ptr;
+ #bunNativePtr;
#refCount = 1;
#constructed = false;
#remainingChunk = undefined;
@@ -5319,12 +5266,12 @@ function createNativeStreamReadable(nativeType, Readable) {
} else {
this.#highWaterMark = 256 * 1024;
}
- this.#ptr = ptr;
+ this.#bunNativePtr = ptr;
this.#constructed = false;
this.#remainingChunk = undefined;
this.#pendingRead = false;
this.#unregisterToken = {};
- finalizer.register(this, this.#ptr, this.#unregisterToken);
+ finalizer.register(this, this.#bunNativePtr, this.#unregisterToken);
}
// maxToRead is by default the highWaterMark passed from the Readable.read call to this fn
@@ -5337,7 +5284,7 @@ function createNativeStreamReadable(nativeType, Readable) {
return;
}
- var ptr = this.#ptr;
+ var ptr = this.#bunNativePtr;
__DEBUG__ && debug("ptr @ NativeReadable._read", ptr, this.__id);
if (ptr === 0) {
this.push(null);
@@ -5458,14 +5405,14 @@ function createNativeStreamReadable(nativeType, Readable) {
}
_destroy(error, callback) {
- var ptr = this.#ptr;
+ var ptr = this.#bunNativePtr;
if (ptr === 0) {
callback(error);
return;
}
finalizer.unregister(this.#unregisterToken);
- this.#ptr = 0;
+ this.#bunNativePtr = 0;
if (updateRef) {
updateRef(ptr, false);
}
@@ -5475,7 +5422,7 @@ function createNativeStreamReadable(nativeType, Readable) {
}
ref() {
- var ptr = this.#ptr;
+ var ptr = this.#bunNativePtr;
if (ptr === 0) return;
if (this.#refCount++ === 0) {
updateRef(ptr, true);
@@ -5483,7 +5430,7 @@ function createNativeStreamReadable(nativeType, Readable) {
}
unref() {
- var ptr = this.#ptr;
+ var ptr = this.#bunNativePtr;
if (ptr === 0) return;
if (this.#refCount-- === 1) {
updateRef(ptr, false);
@@ -5632,7 +5579,7 @@ var NativeWritable = class NativeWritable extends Writable {
const stream_exports = require_ours();
stream_exports[Symbol.for("CommonJS")] = 0;
-stream_exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb };
+stream_exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUndici };
export default stream_exports;
export var _uint8ArrayToBuffer = stream_exports._uint8ArrayToBuffer;
export var _isUint8Array = stream_exports._isUint8Array;
@@ -5654,4 +5601,4 @@ export var Stream = stream_exports.Stream;
export var eos = (stream_exports["eos"] = require_end_of_stream);
export var _getNativeReadableStreamPrototype = stream_exports._getNativeReadableStreamPrototype;
export var NativeWritable = stream_exports.NativeWritable;
-export var promises = Stream.promise;
+export var promises = Stream.promises;