aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/streams.exports.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/streams.exports.js')
-rw-r--r--src/bun.js/streams.exports.js72
1 files changed, 46 insertions, 26 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 15d32adb5..4e812437e 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -5679,7 +5679,7 @@ var require_ours = __commonJS({
*
*/
function createNativeStream(nativeType, Readable) {
- var [pull, start, cancel, setClose, deinit, updateRef] =
+ var [pull, start, cancel, setClose, deinit, updateRef, drainFn] =
globalThis[Symbol.for("Bun.lazy")](nativeType);
var closer = [false];
@@ -5687,7 +5687,7 @@ function createNativeStream(nativeType, Readable) {
if (result > 0) {
const slice = view.subarray(0, result);
const remainder = view.subarray(result);
- if (remainder.byteLength > 0) {
+ if (slice.byteLength > 0) {
nativeReadable.push(slice);
}
@@ -5701,6 +5701,8 @@ function createNativeStream(nativeType, Readable) {
if (isClosed) {
nativeReadable.push(null);
}
+
+ return view;
};
var handleArrayBufferViewResult = function (
@@ -5720,23 +5722,9 @@ function createNativeStream(nativeType, Readable) {
return view;
};
- var handleResult = function (nativeReadable, result, view, isClosed) {
- if (typeof result === "number") {
- return handleNumberResult(nativeReadable, result, view, isClosed);
- } else if (typeof result === "boolean") {
- nativeReadable.push(null);
- return view?.byteLength ?? 0 > 0 ? view : undefined;
- } else if (ArrayBuffer.isView(result)) {
- return handleArrayBufferViewResult(
- nativeReadable,
- result,
- view,
- isClosed,
- );
- } else {
- throw new Error("Invalid result from pull");
- }
- };
+ var DYNAMICALLY_ADJUST_CHUNK_SIZE =
+ process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1";
+
var NativeReadable = class NativeReadable extends Readable {
#ptr;
#refCount = 1;
@@ -5744,6 +5732,7 @@ function createNativeStream(nativeType, Readable) {
#remainingChunk = undefined;
#highWaterMark;
#pendingRead = false;
+ #hasResized = !DYNAMICALLY_ADJUST_CHUNK_SIZE;
constructor(ptr, options = {}) {
super(options);
if (typeof options.highWaterMark === "number") {
@@ -5767,13 +5756,24 @@ function createNativeStream(nativeType, Readable) {
}
if (!this.#constructed) {
- this.#constructed = true;
- start(ptr, this.#highWaterMark);
+ this.#internalConstruct(ptr);
}
return this.#internalRead(this.#getRemainingChunk(), ptr);
}
+ #internalConstruct(ptr) {
+ this.#constructed = true;
+ start(ptr, this.#highWaterMark);
+
+ if (drainFn) {
+ const drainResult = drainFn(ptr);
+ if ((drainResult?.byteLength ?? 0) > 0) {
+ this.push(drainResult);
+ }
+ }
+ }
+
#getRemainingChunk() {
var chunk = this.#remainingChunk;
var highWaterMark = this.#highWaterMark;
@@ -5784,23 +5784,43 @@ function createNativeStream(nativeType, Readable) {
return chunk;
}
+ #handleResult(result, view, isClosed) {
+ if (typeof result === "number") {
+ if (result >= this.#highWaterMark && !this.#hasResized) {
+ this.#highWaterMark *= 2;
+ this.#hasResized = true;
+ }
+
+ return handleNumberResult(this, result, view, isClosed);
+ } else if (typeof result === "boolean") {
+ this.push(null);
+ return view?.byteLength ?? 0 > 0 ? view : undefined;
+ } else if (ArrayBuffer.isView(result)) {
+ if (result.byteLength >= this.#highWaterMark && !this.#hasResized) {
+ this.#highWaterMark *= 2;
+ this.#hasResized = true;
+ }
+
+ return handleArrayBufferViewResult(this, result, view, isClosed);
+ } else {
+ throw new Error("Invalid result from pull");
+ }
+ }
+
#internalRead(view, ptr) {
closer[0] = false;
var result = pull(ptr, view, closer);
if (isPromise(result)) {
this.#pendingRead = true;
- var originalFlowing = this._readableState.flowing;
- this._readableState.flowing = false;
return result.then(
(result) => {
- this._readableState.flowing = originalFlowing;
this.#pendingRead = false;
- this.#remainingChunk = handleResult(this, result, view, closer[0]);
+ this.#remainingChunk = this.#handleResult(result, view, closer[0]);
},
(reason) => errorOrDestroy(this, reason),
);
} else {
- this.#remainingChunk = handleResult(this, result, view, closer[0]);
+ this.#remainingChunk = this.#handleResult(result, view, closer[0]);
}
}