diff options
Diffstat (limited to 'src/bun.js/streams.exports.js')
-rw-r--r-- | src/bun.js/streams.exports.js | 72 |
1 files changed, 46 insertions, 26 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 15d32adb5..4e812437e 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -5679,7 +5679,7 @@ var require_ours = __commonJS({ * */ function createNativeStream(nativeType, Readable) { - var [pull, start, cancel, setClose, deinit, updateRef] = + var [pull, start, cancel, setClose, deinit, updateRef, drainFn] = globalThis[Symbol.for("Bun.lazy")](nativeType); var closer = [false]; @@ -5687,7 +5687,7 @@ function createNativeStream(nativeType, Readable) { if (result > 0) { const slice = view.subarray(0, result); const remainder = view.subarray(result); - if (remainder.byteLength > 0) { + if (slice.byteLength > 0) { nativeReadable.push(slice); } @@ -5701,6 +5701,8 @@ function createNativeStream(nativeType, Readable) { if (isClosed) { nativeReadable.push(null); } + + return view; }; var handleArrayBufferViewResult = function ( @@ -5720,23 +5722,9 @@ function createNativeStream(nativeType, Readable) { return view; }; - var handleResult = function (nativeReadable, result, view, isClosed) { - if (typeof result === "number") { - return handleNumberResult(nativeReadable, result, view, isClosed); - } else if (typeof result === "boolean") { - nativeReadable.push(null); - return view?.byteLength ?? 0 > 0 ? view : undefined; - } else if (ArrayBuffer.isView(result)) { - return handleArrayBufferViewResult( - nativeReadable, - result, - view, - isClosed, - ); - } else { - throw new Error("Invalid result from pull"); - } - }; + var DYNAMICALLY_ADJUST_CHUNK_SIZE = + process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1"; + var NativeReadable = class NativeReadable extends Readable { #ptr; #refCount = 1; @@ -5744,6 +5732,7 @@ function createNativeStream(nativeType, Readable) { #remainingChunk = undefined; #highWaterMark; #pendingRead = false; + #hasResized = !DYNAMICALLY_ADJUST_CHUNK_SIZE; constructor(ptr, options = {}) { super(options); if (typeof options.highWaterMark === "number") { @@ -5767,13 +5756,24 @@ function createNativeStream(nativeType, Readable) { } if (!this.#constructed) { - this.#constructed = true; - start(ptr, this.#highWaterMark); + this.#internalConstruct(ptr); } return this.#internalRead(this.#getRemainingChunk(), ptr); } + #internalConstruct(ptr) { + this.#constructed = true; + start(ptr, this.#highWaterMark); + + if (drainFn) { + const drainResult = drainFn(ptr); + if ((drainResult?.byteLength ?? 0) > 0) { + this.push(drainResult); + } + } + } + #getRemainingChunk() { var chunk = this.#remainingChunk; var highWaterMark = this.#highWaterMark; @@ -5784,23 +5784,43 @@ function createNativeStream(nativeType, Readable) { return chunk; } + #handleResult(result, view, isClosed) { + if (typeof result === "number") { + if (result >= this.#highWaterMark && !this.#hasResized) { + this.#highWaterMark *= 2; + this.#hasResized = true; + } + + return handleNumberResult(this, result, view, isClosed); + } else if (typeof result === "boolean") { + this.push(null); + return view?.byteLength ?? 0 > 0 ? view : undefined; + } else if (ArrayBuffer.isView(result)) { + if (result.byteLength >= this.#highWaterMark && !this.#hasResized) { + this.#highWaterMark *= 2; + this.#hasResized = true; + } + + return handleArrayBufferViewResult(this, result, view, isClosed); + } else { + throw new Error("Invalid result from pull"); + } + } + #internalRead(view, ptr) { closer[0] = false; var result = pull(ptr, view, closer); if (isPromise(result)) { this.#pendingRead = true; - var originalFlowing = this._readableState.flowing; - this._readableState.flowing = false; return result.then( (result) => { - this._readableState.flowing = originalFlowing; this.#pendingRead = false; - this.#remainingChunk = handleResult(this, result, view, closer[0]); + this.#remainingChunk = this.#handleResult(result, view, closer[0]); }, (reason) => errorOrDestroy(this, reason), ); } else { - this.#remainingChunk = handleResult(this, result, view, closer[0]); + this.#remainingChunk = this.#handleResult(result, view, closer[0]); } } |