aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-02 03:00:45 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-02 03:00:45 -0700
commite5eabc0658d2133603596ec17a6e7c956c5fe28c (patch)
tree8e50a0bfa0ca9eba4145191720bb7d412bf8d26f /src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
parent121c2960de87c53cc6bdd5f92fab627a74d21a2b (diff)
downloadbun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.tar.gz
bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.tar.zst
bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.zip
Faster ReadableStream
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js')
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js76
1 files changed, 42 insertions, 34 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
index 45af990cd..5806ba48c 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
@@ -56,7 +56,7 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou
@throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity");
}
@putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize);
- @putByIdDirectPrivate(this, "pendingPullIntos", []);
+ @putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO());
const controller = this;
const startResult = @promiseInvokeOrNoopNoCatch(underlyingByteSource, "start", [this]).@then(() => {
@@ -116,8 +116,10 @@ function readableByteStreamControllerCancel(controller, reason)
"use strict";
var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos");
- if (pendingPullIntos.length > 0)
- pendingPullIntos[0].bytesFilled = 0;
+ var first = pendingPullIntos.peek();
+ if (first)
+ first.bytesFilled = 0;
+
@putByIdDirectPrivate(controller, "queue", @newQueue());
return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]);
}
@@ -144,9 +146,9 @@ function readableByteStreamControllerClose(controller)
return;
}
- var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos");
- if (pendingPullIntos.length > 0) {
- if (pendingPullIntos[0].bytesFilled > 0) {
+ var first = @getByIdDirectPrivate(controller, "pendingPullIntos")?.peek();
+ if (first) {
+ if (first.bytesFilled > 0) {
const e = @makeTypeError("Close requested while there remain pending bytes");
@readableByteStreamControllerError(controller, e);
throw e;
@@ -161,7 +163,12 @@ function readableByteStreamControllerClearPendingPullIntos(controller)
"use strict";
@readableByteStreamControllerInvalidateBYOBRequest(controller);
- @putByIdDirectPrivate(controller, "pendingPullIntos", []);
+ var existing = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (existing !== @undefined) {
+ existing.clear();
+ } else {
+ @putByIdDirectPrivate(controller, "pendingPullIntos", @createFIFO());
+ }
}
function readableByteStreamControllerGetDesiredSize(controller)
@@ -214,8 +221,8 @@ function readableByteStreamControllerPull(controller)
@assert(@readableStreamHasDefaultReader(stream));
if (@getByIdDirectPrivate(controller, "queue").size > 0) {
- @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length === 0);
- const entry = @getByIdDirectPrivate(controller, "queue").content.@shift();
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isEmpty());
+ const entry = @getByIdDirectPrivate(controller, "queue").content.shift();
@getByIdDirectPrivate(controller, "queue").size -= entry.byteLength;
@readableByteStreamControllerHandleQueueDrain(controller);
let view;
@@ -230,7 +237,7 @@ function readableByteStreamControllerPull(controller)
if (@getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== @undefined) {
let buffer;
try {
- buffer = new @ArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize"));
+ buffer = @createUninitializedArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize"));
} catch (error) {
return @Promise.@reject(error);
}
@@ -243,7 +250,7 @@ function readableByteStreamControllerPull(controller)
ctor: @Uint8Array,
readerType: 'default'
};
- @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor);
+ @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
}
const promise = @readableStreamAddReadRequest(stream);
@@ -263,9 +270,9 @@ function readableByteStreamControllerShouldCallPull(controller)
return false;
if (!@getByIdDirectPrivate(controller, "started"))
return false;
- if (@readableStreamHasDefaultReader(stream) && (@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length > 0 || !!@getByIdDirectPrivate(reader, "bunNativePtr")))
+ if (@readableStreamHasDefaultReader(stream) && (@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty() || !!@getByIdDirectPrivate(reader, "bunNativePtr")))
return true;
- if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").length > 0)
+ if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty())
return true;
if (@readableByteStreamControllerGetDesiredSize(controller) > 0)
return true;
@@ -334,10 +341,10 @@ function readableByteStreamControllerEnqueue(controller, chunk)
switch (reader ? @readableStreamReaderKind(reader) : 0) {
/* default reader */
case 1: {
- if (!@getByIdDirectPrivate(reader, "readRequests").length)
+ if (!@getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty())
@readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength);
else {
- @assert(!@getByIdDirectPrivate(controller, "queue").content.length);
+ @assert(!@getByIdDirectPrivate(controller, "queue").content.size());
const transferredView = chunk.constructor === @Uint8Array ? chunk : new @Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);
@readableStreamFulfillReadRequest(stream, transferredView, false);
}
@@ -371,7 +378,7 @@ function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset
{
"use strict";
- @arrayPush(@getByIdDirectPrivate(controller, "queue").content, {
+ @getByIdDirectPrivate(controller, "queue").content.push({
buffer: buffer,
byteOffset: byteOffset,
byteLength: byteLength
@@ -383,10 +390,10 @@ function readableByteStreamControllerRespondWithNewView(controller, view)
{
"use strict";
- @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0);
-
- let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0];
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
+ let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+
if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset)
@throwRangeError("Invalid value for view.byteOffset");
@@ -406,7 +413,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten)
if (@isNaN(bytesWritten) || bytesWritten === @Infinity || bytesWritten < 0 )
@throwRangeError("bytesWritten has an incorrect value");
- @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0);
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
@readableByteStreamControllerRespondInternal(controller, bytesWritten);
}
@@ -415,7 +422,7 @@ function readableByteStreamControllerRespondInternal(controller, bytesWritten)
{
"use strict";
- let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0];
+ let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
let stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
@@ -435,7 +442,7 @@ function readableByteStreamControllerRespondInReadableState(controller, bytesWri
if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength)
@throwRangeError("bytesWritten value is too great");
- @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length === 0 || @getByIdDirectPrivate(controller, "pendingPullIntos")[0] === pullIntoDescriptor);
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor);
@readableByteStreamControllerInvalidateBYOBRequest(controller);
pullIntoDescriptor.bytesFilled += bytesWritten;
@@ -465,7 +472,7 @@ function readableByteStreamControllerRespondInClosedState(controller, firstDescr
@assert(firstDescriptor.bytesFilled === 0);
if (@readableStreamHasBYOBReader(@getByIdDirectPrivate(controller, "controlledReadableStream"))) {
- while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests").length > 0) {
+ while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests")?.isNotEmpty()) {
let pullIntoDescriptor = @readableByteStreamControllerShiftPendingDescriptor(controller);
@readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
}
@@ -478,10 +485,10 @@ function readableByteStreamControllerProcessPullDescriptors(controller)
"use strict";
@assert(!@getByIdDirectPrivate(controller, "closeRequested"));
- while (@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0) {
+ while (@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) {
if (@getByIdDirectPrivate(controller, "queue").size === 0)
return;
- let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0];
+ let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
@readableByteStreamControllerShiftPendingDescriptor(controller);
@readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
@@ -508,7 +515,7 @@ function readableByteStreamControllerFillDescriptorFromQueue(controller, pullInt
}
while (totalBytesToCopyRemaining > 0) {
- let headOfQueue = @getByIdDirectPrivate(controller, "queue").content[0];
+ let headOfQueue = @getByIdDirectPrivate(controller, "queue").content.peek();
const bytesToCopy = totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength;
// Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer.
// Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes
@@ -519,14 +526,14 @@ function readableByteStreamControllerFillDescriptorFromQueue(controller, pullInt
new @Uint8Array(pullIntoDescriptor.buffer).set(new @Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy), destStart);
if (headOfQueue.byteLength === bytesToCopy)
- @getByIdDirectPrivate(controller, "queue").content.@shift();
+ @getByIdDirectPrivate(controller, "queue").content.shift();
else {
headOfQueue.byteOffset += bytesToCopy;
headOfQueue.byteLength -= bytesToCopy;
}
@getByIdDirectPrivate(controller, "queue").size -= bytesToCopy;
- @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length === 0 || @getByIdDirectPrivate(controller, "pendingPullIntos")[0] === pullIntoDescriptor);
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor);
@readableByteStreamControllerInvalidateBYOBRequest(controller);
pullIntoDescriptor.bytesFilled += bytesToCopy;
totalBytesToCopyRemaining -= bytesToCopy;
@@ -546,7 +553,7 @@ function readableByteStreamControllerShiftPendingDescriptor(controller)
{
"use strict";
- let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").@shift();
+ let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").shift();
@readableByteStreamControllerInvalidateBYOBRequest(controller);
return descriptor;
}
@@ -597,7 +604,7 @@ function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor)
function readableStreamFulfillReadIntoRequest(stream, chunk, done)
{
"use strict";
- const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").@shift();
+ const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift();
@fulfillPromise(readIntoRequest, { value: chunk, done: done });
}
@@ -652,9 +659,10 @@ function readableByteStreamControllerPullInto(controller, view)
readerType: 'byob'
};
- if (@getByIdDirectPrivate(controller, "pendingPullIntos").length) {
+ var pending = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (pending?.isNotEmpty()) {
pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
- @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor);
+ pending.push(pullIntoDescriptor);
return @readableStreamAddReadIntoRequest(stream);
}
@@ -677,7 +685,7 @@ function readableByteStreamControllerPullInto(controller, view)
}
pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
- @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor);
+ @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
const promise = @readableStreamAddReadIntoRequest(stream);
@readableByteStreamControllerCallPullIfNeeded(controller);
return promise;
@@ -691,7 +699,7 @@ function readableStreamAddReadIntoRequest(stream)
@assert(@getByIdDirectPrivate(stream, "state") === @streamReadable || @getByIdDirectPrivate(stream, "state") === @streamClosed);
const readRequest = @newPromise();
- @arrayPush(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests"), readRequest);
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest);
return readRequest;
}