diff options
author | 2022-06-02 03:00:45 -0700 | |
---|---|---|
committer | 2022-06-02 03:00:45 -0700 | |
commit | e5eabc0658d2133603596ec17a6e7c956c5fe28c (patch) | |
tree | 8e50a0bfa0ca9eba4145191720bb7d412bf8d26f /src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js | |
parent | 121c2960de87c53cc6bdd5f92fab627a74d21a2b (diff) | |
download | bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.tar.gz bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.tar.zst bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.zip |
Faster ReadableStream
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js')
-rw-r--r-- | src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js | 76 |
1 files changed, 42 insertions, 34 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js index 45af990cd..5806ba48c 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js @@ -56,7 +56,7 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou @throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity"); } @putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize); - @putByIdDirectPrivate(this, "pendingPullIntos", []); + @putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO()); const controller = this; const startResult = @promiseInvokeOrNoopNoCatch(underlyingByteSource, "start", [this]).@then(() => { @@ -116,8 +116,10 @@ function readableByteStreamControllerCancel(controller, reason) "use strict"; var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos"); - if (pendingPullIntos.length > 0) - pendingPullIntos[0].bytesFilled = 0; + var first = pendingPullIntos.peek(); + if (first) + first.bytesFilled = 0; + @putByIdDirectPrivate(controller, "queue", @newQueue()); return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]); } @@ -144,9 +146,9 @@ function readableByteStreamControllerClose(controller) return; } - var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos"); - if (pendingPullIntos.length > 0) { - if (pendingPullIntos[0].bytesFilled > 0) { + var first = @getByIdDirectPrivate(controller, "pendingPullIntos")?.peek(); + if (first) { + if (first.bytesFilled > 0) { const e = @makeTypeError("Close requested while there remain pending bytes"); @readableByteStreamControllerError(controller, e); throw e; @@ -161,7 +163,12 @@ function readableByteStreamControllerClearPendingPullIntos(controller) "use strict"; @readableByteStreamControllerInvalidateBYOBRequest(controller); - @putByIdDirectPrivate(controller, "pendingPullIntos", []); + var existing = @getByIdDirectPrivate(controller, "pendingPullIntos"); + if (existing !== @undefined) { + existing.clear(); + } else { + @putByIdDirectPrivate(controller, "pendingPullIntos", @createFIFO()); + } } function readableByteStreamControllerGetDesiredSize(controller) @@ -214,8 +221,8 @@ function readableByteStreamControllerPull(controller) @assert(@readableStreamHasDefaultReader(stream)); if (@getByIdDirectPrivate(controller, "queue").size > 0) { - @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length === 0); - const entry = @getByIdDirectPrivate(controller, "queue").content.@shift(); + @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isEmpty()); + const entry = @getByIdDirectPrivate(controller, "queue").content.shift(); @getByIdDirectPrivate(controller, "queue").size -= entry.byteLength; @readableByteStreamControllerHandleQueueDrain(controller); let view; @@ -230,7 +237,7 @@ function readableByteStreamControllerPull(controller) if (@getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== @undefined) { let buffer; try { - buffer = new @ArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize")); + buffer = @createUninitializedArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize")); } catch (error) { return @Promise.@reject(error); } @@ -243,7 +250,7 @@ function readableByteStreamControllerPull(controller) ctor: @Uint8Array, readerType: 'default' }; - @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor); + @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor); } const promise = @readableStreamAddReadRequest(stream); @@ -263,9 +270,9 @@ function readableByteStreamControllerShouldCallPull(controller) return false; if (!@getByIdDirectPrivate(controller, "started")) return false; - if (@readableStreamHasDefaultReader(stream) && (@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length > 0 || !!@getByIdDirectPrivate(reader, "bunNativePtr"))) + if (@readableStreamHasDefaultReader(stream) && (@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty() || !!@getByIdDirectPrivate(reader, "bunNativePtr"))) return true; - if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").length > 0) + if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty()) return true; if (@readableByteStreamControllerGetDesiredSize(controller) > 0) return true; @@ -334,10 +341,10 @@ function readableByteStreamControllerEnqueue(controller, chunk) switch (reader ? @readableStreamReaderKind(reader) : 0) { /* default reader */ case 1: { - if (!@getByIdDirectPrivate(reader, "readRequests").length) + if (!@getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty()) @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength); else { - @assert(!@getByIdDirectPrivate(controller, "queue").content.length); + @assert(!@getByIdDirectPrivate(controller, "queue").content.size()); const transferredView = chunk.constructor === @Uint8Array ? chunk : new @Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); @readableStreamFulfillReadRequest(stream, transferredView, false); } @@ -371,7 +378,7 @@ function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset { "use strict"; - @arrayPush(@getByIdDirectPrivate(controller, "queue").content, { + @getByIdDirectPrivate(controller, "queue").content.push({ buffer: buffer, byteOffset: byteOffset, byteLength: byteLength @@ -383,10 +390,10 @@ function readableByteStreamControllerRespondWithNewView(controller, view) { "use strict"; - @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0); - - let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0]; + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()); + let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); + if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset) @throwRangeError("Invalid value for view.byteOffset"); @@ -406,7 +413,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) if (@isNaN(bytesWritten) || bytesWritten === @Infinity || bytesWritten < 0 ) @throwRangeError("bytesWritten has an incorrect value"); - @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0); + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()); @readableByteStreamControllerRespondInternal(controller, bytesWritten); } @@ -415,7 +422,7 @@ function readableByteStreamControllerRespondInternal(controller, bytesWritten) { "use strict"; - let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0]; + let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); let stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { @@ -435,7 +442,7 @@ function readableByteStreamControllerRespondInReadableState(controller, bytesWri if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength) @throwRangeError("bytesWritten value is too great"); - @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length === 0 || @getByIdDirectPrivate(controller, "pendingPullIntos")[0] === pullIntoDescriptor); + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor); @readableByteStreamControllerInvalidateBYOBRequest(controller); pullIntoDescriptor.bytesFilled += bytesWritten; @@ -465,7 +472,7 @@ function readableByteStreamControllerRespondInClosedState(controller, firstDescr @assert(firstDescriptor.bytesFilled === 0); if (@readableStreamHasBYOBReader(@getByIdDirectPrivate(controller, "controlledReadableStream"))) { - while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests").length > 0) { + while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests")?.isNotEmpty()) { let pullIntoDescriptor = @readableByteStreamControllerShiftPendingDescriptor(controller); @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor); } @@ -478,10 +485,10 @@ function readableByteStreamControllerProcessPullDescriptors(controller) "use strict"; @assert(!@getByIdDirectPrivate(controller, "closeRequested")); - while (@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0) { + while (@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) { if (@getByIdDirectPrivate(controller, "queue").size === 0) return; - let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0]; + let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) { @readableByteStreamControllerShiftPendingDescriptor(controller); @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor); @@ -508,7 +515,7 @@ function readableByteStreamControllerFillDescriptorFromQueue(controller, pullInt } while (totalBytesToCopyRemaining > 0) { - let headOfQueue = @getByIdDirectPrivate(controller, "queue").content[0]; + let headOfQueue = @getByIdDirectPrivate(controller, "queue").content.peek(); const bytesToCopy = totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength; // Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer. // Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes @@ -519,14 +526,14 @@ function readableByteStreamControllerFillDescriptorFromQueue(controller, pullInt new @Uint8Array(pullIntoDescriptor.buffer).set(new @Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy), destStart); if (headOfQueue.byteLength === bytesToCopy) - @getByIdDirectPrivate(controller, "queue").content.@shift(); + @getByIdDirectPrivate(controller, "queue").content.shift(); else { headOfQueue.byteOffset += bytesToCopy; headOfQueue.byteLength -= bytesToCopy; } @getByIdDirectPrivate(controller, "queue").size -= bytesToCopy; - @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length === 0 || @getByIdDirectPrivate(controller, "pendingPullIntos")[0] === pullIntoDescriptor); + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor); @readableByteStreamControllerInvalidateBYOBRequest(controller); pullIntoDescriptor.bytesFilled += bytesToCopy; totalBytesToCopyRemaining -= bytesToCopy; @@ -546,7 +553,7 @@ function readableByteStreamControllerShiftPendingDescriptor(controller) { "use strict"; - let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").@shift(); + let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").shift(); @readableByteStreamControllerInvalidateBYOBRequest(controller); return descriptor; } @@ -597,7 +604,7 @@ function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor) function readableStreamFulfillReadIntoRequest(stream, chunk, done) { "use strict"; - const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").@shift(); + const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift(); @fulfillPromise(readIntoRequest, { value: chunk, done: done }); } @@ -652,9 +659,10 @@ function readableByteStreamControllerPullInto(controller, view) readerType: 'byob' }; - if (@getByIdDirectPrivate(controller, "pendingPullIntos").length) { + var pending = @getByIdDirectPrivate(controller, "pendingPullIntos"); + if (pending?.isNotEmpty()) { pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer); - @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor); + pending.push(pullIntoDescriptor); return @readableStreamAddReadIntoRequest(stream); } @@ -677,7 +685,7 @@ function readableByteStreamControllerPullInto(controller, view) } pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer); - @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor); + @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor); const promise = @readableStreamAddReadIntoRequest(stream); @readableByteStreamControllerCallPullIfNeeded(controller); return promise; @@ -691,7 +699,7 @@ function readableStreamAddReadIntoRequest(stream) @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable || @getByIdDirectPrivate(stream, "state") === @streamClosed); const readRequest = @newPromise(); - @arrayPush(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests"), readRequest); + @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest); return readRequest; } |