aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableByteStreamInternals.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins/js/ReadableByteStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/ReadableByteStreamInternals.js712
1 files changed, 712 insertions, 0 deletions
diff --git a/src/bun.js/builtins/js/ReadableByteStreamInternals.js b/src/bun.js/builtins/js/ReadableByteStreamInternals.js
new file mode 100644
index 000000000..01da62e1a
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableByteStreamInternals.js
@@ -0,0 +1,712 @@
+/*
+ * Copyright (C) 2016 Canon Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+// @internal
+
+function privateInitializeReadableByteStreamController(stream, underlyingByteSource, highWaterMark)
+{
+ "use strict";
+
+ if (!@isReadableStream(stream))
+ @throwTypeError("ReadableByteStreamController needs a ReadableStream");
+
+ // readableStreamController is initialized with null value.
+ if (@getByIdDirectPrivate(stream, "readableStreamController") !== null)
+ @throwTypeError("ReadableStream already has a controller");
+
+ @putByIdDirectPrivate(this, "controlledReadableStream", stream);
+ @putByIdDirectPrivate(this, "underlyingByteSource", underlyingByteSource);
+ @putByIdDirectPrivate(this, "pullAgain", false);
+ @putByIdDirectPrivate(this, "pulling", false);
+ @readableByteStreamControllerClearPendingPullIntos(this);
+ @putByIdDirectPrivate(this, "queue", @newQueue());
+ @putByIdDirectPrivate(this, "started", 0);
+ @putByIdDirectPrivate(this, "closeRequested", false);
+
+ let hwm = @toNumber(highWaterMark);
+ if (@isNaN(hwm) || hwm < 0)
+ @throwRangeError("highWaterMark value is negative or not a number");
+ @putByIdDirectPrivate(this, "strategyHWM", hwm);
+
+ let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
+ if (autoAllocateChunkSize !== @undefined) {
+ autoAllocateChunkSize = @toNumber(autoAllocateChunkSize);
+ if (autoAllocateChunkSize <= 0 || autoAllocateChunkSize === @Infinity || autoAllocateChunkSize === -@Infinity)
+ @throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity");
+ }
+ @putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize);
+ @putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO());
+
+
+ const controller = this;
+ @promiseInvokeOrNoopNoCatch(@getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "started", 1);
+ @assert(!@getByIdDirectPrivate(controller, "pulling"));
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ }, (error) => {
+ if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
+ @readableByteStreamControllerError(controller, error);
+ });
+
+ @putByIdDirectPrivate(this, "cancel", @readableByteStreamControllerCancel);
+ @putByIdDirectPrivate(this, "pull", @readableByteStreamControllerPull);
+
+ return this;
+}
+
+function readableStreamByteStreamControllerStart(controller) {
+ "use strict";
+ @putByIdDirectPrivate(controller, "start", @undefined);
+
+}
+
+
+function privateInitializeReadableStreamBYOBRequest(controller, view)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(this, "associatedReadableByteStreamController", controller);
+ @putByIdDirectPrivate(this, "view", view);
+}
+
+function isReadableByteStreamController(controller)
+{
+ "use strict";
+
+ // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
+ // See corresponding function for explanations.
+ return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingByteSource");
+}
+
+function isReadableStreamBYOBRequest(byobRequest)
+{
+ "use strict";
+
+ // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
+ // See corresponding function for explanations.
+ return @isObject(byobRequest) && !!@getByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController");
+}
+
+function isReadableStreamBYOBReader(reader)
+{
+ "use strict";
+
+ // Spec tells to return true only if reader has a readIntoRequests internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Since readIntoRequests is initialized with an empty array, the following test is ok.
+ return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readIntoRequests");
+}
+
+function readableByteStreamControllerCancel(controller, reason)
+{
+ "use strict";
+
+ var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ var first = pendingPullIntos.peek();
+ if (first)
+ first.bytesFilled = 0;
+
+ @putByIdDirectPrivate(controller, "queue", @newQueue());
+ return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]);
+}
+
+function readableByteStreamControllerError(controller, e)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable);
+ @readableByteStreamControllerClearPendingPullIntos(controller);
+ @putByIdDirectPrivate(controller, "queue", @newQueue());
+ @readableStreamError(@getByIdDirectPrivate(controller, "controlledReadableStream"), e);
+}
+
+function readableByteStreamControllerClose(controller)
+{
+ "use strict";
+
+ @assert(!@getByIdDirectPrivate(controller, "closeRequested"));
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable);
+
+ if (@getByIdDirectPrivate(controller, "queue").size > 0) {
+ @putByIdDirectPrivate(controller, "closeRequested", true);
+ return;
+ }
+
+ var first = @getByIdDirectPrivate(controller, "pendingPullIntos")?.peek();
+ if (first) {
+ if (first.bytesFilled > 0) {
+ const e = @makeTypeError("Close requested while there remain pending bytes");
+ @readableByteStreamControllerError(controller, e);
+ throw e;
+ }
+ }
+
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+}
+
+function readableByteStreamControllerClearPendingPullIntos(controller)
+{
+ "use strict";
+
+ @readableByteStreamControllerInvalidateBYOBRequest(controller);
+ var existing = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (existing !== @undefined) {
+ existing.clear();
+ } else {
+ @putByIdDirectPrivate(controller, "pendingPullIntos", @createFIFO());
+ }
+}
+
+function readableByteStreamControllerGetDesiredSize(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ const state = @getByIdDirectPrivate(stream, "state");
+
+ if (state === @streamErrored)
+ return null;
+ if (state === @streamClosed)
+ return 0;
+
+ return @getByIdDirectPrivate(controller, "strategyHWM") - @getByIdDirectPrivate(controller, "queue").size;
+}
+
+function readableStreamHasBYOBReader(stream)
+{
+ "use strict";
+
+ const reader = @getByIdDirectPrivate(stream, "reader");
+ return reader !== @undefined && @isReadableStreamBYOBReader(reader);
+}
+
+function readableStreamHasDefaultReader(stream)
+{
+ "use strict";
+
+ const reader = @getByIdDirectPrivate(stream, "reader");
+ return reader !== @undefined && @isReadableStreamDefaultReader(reader);
+}
+
+function readableByteStreamControllerHandleQueueDrain(controller) {
+
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable);
+ if (!@getByIdDirectPrivate(controller, "queue").size && @getByIdDirectPrivate(controller, "closeRequested"))
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function readableByteStreamControllerPull(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ @assert(@readableStreamHasDefaultReader(stream));
+ if (@getByIdDirectPrivate(controller, "queue").content?.isNotEmpty()) {
+ const entry = @getByIdDirectPrivate(controller, "queue").content.shift();
+ @getByIdDirectPrivate(controller, "queue").size -= entry.byteLength;
+ @readableByteStreamControllerHandleQueueDrain(controller);
+ let view;
+ try {
+ view = new @Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);
+ } catch (error) {
+ return @Promise.@reject(error);
+ }
+ return @createFulfilledPromise({ value: view, done: false });
+ }
+
+ if (@getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== @undefined) {
+ let buffer;
+ try {
+ buffer = @createUninitializedArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize"));
+ } catch (error) {
+ return @Promise.@reject(error);
+ }
+ const pullIntoDescriptor = {
+ buffer,
+ byteOffset: 0,
+ byteLength: @getByIdDirectPrivate(controller, "autoAllocateChunkSize"),
+ bytesFilled: 0,
+ elementSize: 1,
+ ctor: @Uint8Array,
+ readerType: 'default'
+ };
+ @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
+ }
+
+ const promise = @readableStreamAddReadRequest(stream);
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ return promise;
+}
+
+function readableByteStreamControllerShouldCallPull(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (@getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return false;
+ if (@getByIdDirectPrivate(controller, "closeRequested"))
+ return false;
+ if (!(@getByIdDirectPrivate(controller, "started") > 0))
+ return false;
+ const reader = @getByIdDirectPrivate(stream, "reader");
+
+ if (reader && (@getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty() || !!@getByIdDirectPrivate(reader, "bunNativePtr")))
+ return true;
+ if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty())
+ return true;
+ if (@readableByteStreamControllerGetDesiredSize(controller) > 0)
+ return true;
+ return false;
+}
+
+function readableByteStreamControllerCallPullIfNeeded(controller)
+{
+ "use strict";
+
+ if (!@readableByteStreamControllerShouldCallPull(controller))
+ return;
+
+ if (@getByIdDirectPrivate(controller, "pulling")) {
+ @putByIdDirectPrivate(controller, "pullAgain", true);
+ return;
+ }
+
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @putByIdDirectPrivate(controller, "pulling", true);
+ @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "pull", [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "pulling", false);
+ if (@getByIdDirectPrivate(controller, "pullAgain")) {
+ @putByIdDirectPrivate(controller, "pullAgain", false);
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ }, (error) => {
+ if (@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable)
+ @readableByteStreamControllerError(controller, error);
+ });
+}
+
+function transferBufferToCurrentRealm(buffer)
+{
+ "use strict";
+
+ // FIXME: Determine what should be done here exactly (what is already existing in current
+ // codebase and what has to be added). According to spec, Transfer operation should be
+ // performed in order to transfer buffer to current realm. For the moment, simply return
+ // received buffer.
+ return buffer;
+}
+
+function readableStreamReaderKind(reader) {
+ "use strict";
+
+
+ if (!!@getByIdDirectPrivate(reader, "readRequests"))
+ return @getByIdDirectPrivate(reader, "bunNativePtr") ? 3 : 1;
+
+ if (!!@getByIdDirectPrivate(reader, "readIntoRequests"))
+ return 2;
+
+ return 0;
+}
+function readableByteStreamControllerEnqueue(controller, chunk)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ @assert(!@getByIdDirectPrivate(controller, "closeRequested"));
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
+
+
+ switch (@getByIdDirectPrivate(stream, "reader") ? @readableStreamReaderKind(@getByIdDirectPrivate(stream, "reader")) : 0) {
+ /* default reader */
+ case 1: {
+ if (!@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty())
+ @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength);
+ else {
+ @assert(!@getByIdDirectPrivate(controller, "queue").content.size());
+ const transferredView = chunk.constructor === @Uint8Array ? chunk : new @Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);
+ @readableStreamFulfillReadRequest(stream, transferredView, false);
+ }
+ break;
+ }
+
+ /* BYOB */
+ case 2: {
+ @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength);
+ @readableByteStreamControllerProcessPullDescriptors(controller);
+ break;
+ }
+
+ /* NativeReader */
+ case 3: {
+ // reader.@enqueueNative(@getByIdDirectPrivate(reader, "bunNativePtr"), chunk);
+
+ break;
+ }
+
+ default: {
+ @assert(!@isReadableStreamLocked(stream));
+ @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength);
+ break;
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerEnqueueChunkToQueue.
+function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset, byteLength)
+{
+ "use strict";
+
+ @getByIdDirectPrivate(controller, "queue").content.push({
+ buffer: buffer,
+ byteOffset: byteOffset,
+ byteLength: byteLength
+ });
+ @getByIdDirectPrivate(controller, "queue").size += byteLength;
+}
+
+function readableByteStreamControllerRespondWithNewView(controller, view)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
+
+ let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+
+ if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset)
+ @throwRangeError("Invalid value for view.byteOffset");
+
+ if (firstDescriptor.byteLength !== view.byteLength)
+ @throwRangeError("Invalid value for view.byteLength");
+
+ firstDescriptor.buffer = view.buffer;
+ @readableByteStreamControllerRespondInternal(controller, view.byteLength);
+}
+
+function readableByteStreamControllerRespond(controller, bytesWritten)
+{
+ "use strict";
+
+ bytesWritten = @toNumber(bytesWritten);
+
+ if (@isNaN(bytesWritten) || bytesWritten === @Infinity || bytesWritten < 0 )
+ @throwRangeError("bytesWritten has an incorrect value");
+
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
+
+ @readableByteStreamControllerRespondInternal(controller, bytesWritten);
+}
+
+function readableByteStreamControllerRespondInternal(controller, bytesWritten)
+{
+ "use strict";
+
+ let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+ let stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ if (bytesWritten !== 0)
+ @throwTypeError("bytesWritten is different from 0 even though stream is closed");
+ @readableByteStreamControllerRespondInClosedState(controller, firstDescriptor);
+ } else {
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
+ @readableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor);
+ }
+}
+
+function readableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor)
+{
+ "use strict";
+
+ if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength)
+ @throwRangeError("bytesWritten value is too great");
+
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor);
+ @readableByteStreamControllerInvalidateBYOBRequest(controller);
+ pullIntoDescriptor.bytesFilled += bytesWritten;
+
+ if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize)
+ return;
+
+ @readableByteStreamControllerShiftPendingDescriptor(controller);
+ const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
+
+ if (remainderSize > 0) {
+ const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ const remainder = @cloneArrayBuffer(pullIntoDescriptor.buffer, end - remainderSize, remainderSize);
+ @readableByteStreamControllerEnqueueChunk(controller, remainder, 0, remainder.byteLength);
+ }
+
+ pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ pullIntoDescriptor.bytesFilled -= remainderSize;
+ @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
+ @readableByteStreamControllerProcessPullDescriptors(controller);
+}
+
+function readableByteStreamControllerRespondInClosedState(controller, firstDescriptor)
+{
+ "use strict";
+
+ firstDescriptor.buffer = @transferBufferToCurrentRealm(firstDescriptor.buffer);
+ @assert(firstDescriptor.bytesFilled === 0);
+
+ if (@readableStreamHasBYOBReader(@getByIdDirectPrivate(controller, "controlledReadableStream"))) {
+ while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests")?.isNotEmpty()) {
+ let pullIntoDescriptor = @readableByteStreamControllerShiftPendingDescriptor(controller);
+ @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue (shortened for readability).
+function readableByteStreamControllerProcessPullDescriptors(controller)
+{
+ "use strict";
+
+ @assert(!@getByIdDirectPrivate(controller, "closeRequested"));
+ while (@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) {
+ if (@getByIdDirectPrivate(controller, "queue").size === 0)
+ return;
+ let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+ if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
+ @readableByteStreamControllerShiftPendingDescriptor(controller);
+ @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerFillPullIntoDescriptorFromQueue (shortened for readability).
+function readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)
+{
+ "use strict";
+
+ const currentAlignedBytes = pullIntoDescriptor.bytesFilled - (pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize);
+ const maxBytesToCopy = @getByIdDirectPrivate(controller, "queue").size < pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled ?
+ @getByIdDirectPrivate(controller, "queue").size : pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled;
+ const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
+ const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % pullIntoDescriptor.elementSize);
+ let totalBytesToCopyRemaining = maxBytesToCopy;
+ let ready = false;
+
+ if (maxAlignedBytes > currentAlignedBytes) {
+ totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;
+ ready = true;
+ }
+
+ while (totalBytesToCopyRemaining > 0) {
+ let headOfQueue = @getByIdDirectPrivate(controller, "queue").content.peek();
+ const bytesToCopy = totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength;
+ // Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer.
+ // Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes
+ // operation of ECMAScript (the case of Shared Data Block is not considered here, but it doesn't seem to be an issue).
+ const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ // FIXME: As indicated in comments of bug 172717, access to set is not safe. However, using prototype.@set.@call does
+ // not work (@set is undefined). A safe way to do that is needed.
+ new @Uint8Array(pullIntoDescriptor.buffer).set(new @Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy), destStart);
+
+ if (headOfQueue.byteLength === bytesToCopy)
+ @getByIdDirectPrivate(controller, "queue").content.shift();
+ else {
+ headOfQueue.byteOffset += bytesToCopy;
+ headOfQueue.byteLength -= bytesToCopy;
+ }
+
+ @getByIdDirectPrivate(controller, "queue").size -= bytesToCopy;
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor);
+ @readableByteStreamControllerInvalidateBYOBRequest(controller);
+ pullIntoDescriptor.bytesFilled += bytesToCopy;
+ totalBytesToCopyRemaining -= bytesToCopy;
+ }
+
+ if (!ready) {
+ @assert(@getByIdDirectPrivate(controller, "queue").size === 0);
+ @assert(pullIntoDescriptor.bytesFilled > 0);
+ @assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
+ }
+
+ return ready;
+}
+
+// Spec name: readableByteStreamControllerShiftPendingPullInto (renamed for consistency).
+function readableByteStreamControllerShiftPendingDescriptor(controller)
+{
+ "use strict";
+
+ let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").shift();
+ @readableByteStreamControllerInvalidateBYOBRequest(controller);
+ return descriptor;
+}
+
+function readableByteStreamControllerInvalidateBYOBRequest(controller)
+{
+ "use strict";
+
+ if (@getByIdDirectPrivate(controller, "byobRequest") === @undefined)
+ return;
+ const byobRequest = @getByIdDirectPrivate(controller, "byobRequest");
+ @putByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController", @undefined);
+ @putByIdDirectPrivate(byobRequest, "view", @undefined);
+ @putByIdDirectPrivate(controller, "byobRequest", @undefined);
+}
+
+// Spec name: readableByteStreamControllerCommitPullIntoDescriptor (shortened for readability).
+function readableByteStreamControllerCommitDescriptor(stream, pullIntoDescriptor)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "state") !== @streamErrored);
+ let done = false;
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ @assert(!pullIntoDescriptor.bytesFilled);
+ done = true;
+ }
+ let filledView = @readableByteStreamControllerConvertDescriptor(pullIntoDescriptor);
+ if (pullIntoDescriptor.readerType === "default")
+ @readableStreamFulfillReadRequest(stream, filledView, done);
+ else {
+ @assert(pullIntoDescriptor.readerType === "byob");
+ @readableStreamFulfillReadIntoRequest(stream, filledView, done);
+ }
+}
+
+// Spec name: readableByteStreamControllerConvertPullIntoDescriptor (shortened for readability).
+function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor)
+{
+ "use strict";
+
+ @assert(pullIntoDescriptor.bytesFilled <= pullIntoDescriptor.byteLength);
+ @assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0);
+
+ return new pullIntoDescriptor.ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, pullIntoDescriptor.bytesFilled / pullIntoDescriptor.elementSize);
+}
+
+function readableStreamFulfillReadIntoRequest(stream, chunk, done)
+{
+ "use strict";
+ const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift();
+ @fulfillPromise(readIntoRequest, { value: chunk, done: done });
+}
+
+function readableStreamBYOBReaderRead(reader, view)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
+ @assert(!!stream);
+
+ @putByIdDirectPrivate(stream, "disturbed", true);
+ if (@getByIdDirectPrivate(stream, "state") === @streamErrored)
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+
+ return @readableByteStreamControllerPullInto(@getByIdDirectPrivate(stream, "readableStreamController"), view);
+}
+
+function readableByteStreamControllerPullInto(controller, view)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ let elementSize = 1;
+ // Spec describes that in the case where view is a TypedArray, elementSize
+ // should be set to the size of an element (e.g. 2 for UInt16Array). For
+ // DataView, BYTES_PER_ELEMENT is undefined, contrary to the same property
+ // for TypedArrays.
+ // FIXME: Getting BYTES_PER_ELEMENT like this is not safe (property is read-only
+ // but can be modified if the prototype is redefined). A safe way of getting
+ // it would be to determine which type of ArrayBufferView view is an instance
+ // of based on typed arrays private variables. However, this is not possible due
+ // to bug 167697, which prevents access to typed arrays through their private
+ // names unless public name has already been met before.
+ if (view.BYTES_PER_ELEMENT !== @undefined)
+ elementSize = view.BYTES_PER_ELEMENT;
+
+ // FIXME: Getting constructor like this is not safe. A safe way of getting
+ // it would be to determine which type of ArrayBufferView view is an instance
+ // of, and to assign appropriate constructor based on this (e.g. ctor =
+ // @Uint8Array). However, this is not possible due to bug 167697, which
+ // prevents access to typed arrays through their private names unless public
+ // name has already been met before.
+ const ctor = view.constructor;
+
+ const pullIntoDescriptor = {
+ buffer: view.buffer,
+ byteOffset: view.byteOffset,
+ byteLength: view.byteLength,
+ bytesFilled: 0,
+ elementSize,
+ ctor,
+ readerType: 'byob'
+ };
+
+ var pending = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (pending?.isNotEmpty()) {
+ pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ pending.push(pullIntoDescriptor);
+ return @readableStreamAddReadIntoRequest(stream);
+ }
+
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ const emptyView = new ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0);
+ return @createFulfilledPromise({ value: emptyView, done: true });
+ }
+
+ if (@getByIdDirectPrivate(controller, "queue").size > 0) {
+ if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
+ const filledView = @readableByteStreamControllerConvertDescriptor(pullIntoDescriptor);
+ @readableByteStreamControllerHandleQueueDrain(controller);
+ return @createFulfilledPromise({ value: filledView, done: false });
+ }
+ if (@getByIdDirectPrivate(controller, "closeRequested")) {
+ const e = @makeTypeError("Closing stream has been requested");
+ @readableByteStreamControllerError(controller, e);
+ return @Promise.@reject(e);
+ }
+ }
+
+ pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
+ const promise = @readableStreamAddReadIntoRequest(stream);
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ return promise;
+}
+
+function readableStreamAddReadIntoRequest(stream)
+{
+ "use strict";
+
+ @assert(@isReadableStreamBYOBReader(@getByIdDirectPrivate(stream, "reader")));
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable || @getByIdDirectPrivate(stream, "state") === @streamClosed);
+
+ const readRequest = @newPromise();
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest);
+
+ return readRequest;
+}