aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/bindings/builtins/js
diff options
context:
space:
mode:
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js')
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js13
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js76
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStream.js84
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js4
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js59
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js39
-rw-r--r--src/javascript/jsc/bindings/builtins/js/StreamInternals.js114
-rw-r--r--src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js19
8 files changed, 251 insertions, 157 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js
index fac5c864c..0b47d730c 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js
@@ -89,12 +89,17 @@ function byobRequest()
if (!@isReadableByteStreamController(this))
throw @makeGetterTypeError("ReadableByteStreamController", "byobRequest");
- if (@getByIdDirectPrivate(this, "byobRequest") === @undefined && @getByIdDirectPrivate(this, "pendingPullIntos").length) {
- const firstDescriptor = @getByIdDirectPrivate(this, "pendingPullIntos")[0];
- const view = new @Uint8Array(firstDescriptor.buffer,
+
+ var request = @getByIdDirectPrivate(this, "byobRequest");
+ if (request === @undefined) {
+ var pending = @getByIdDirectPrivate(this, "pendingPullIntos");
+ const firstDescriptor = pending.peek();
+ if (firstDescriptor) {
+ const view = new @Uint8Array(firstDescriptor.buffer,
firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
firstDescriptor.byteLength - firstDescriptor.bytesFilled);
- @putByIdDirectPrivate(this, "byobRequest", new @ReadableStreamBYOBRequest(this, view, @isReadableStream));
+ @putByIdDirectPrivate(this, "byobRequest", new @ReadableStreamBYOBRequest(this, view, @isReadableStream));
+ }
}
return @getByIdDirectPrivate(this, "byobRequest");
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
index 45af990cd..5806ba48c 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
@@ -56,7 +56,7 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou
@throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity");
}
@putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize);
- @putByIdDirectPrivate(this, "pendingPullIntos", []);
+ @putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO());
const controller = this;
const startResult = @promiseInvokeOrNoopNoCatch(underlyingByteSource, "start", [this]).@then(() => {
@@ -116,8 +116,10 @@ function readableByteStreamControllerCancel(controller, reason)
"use strict";
var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos");
- if (pendingPullIntos.length > 0)
- pendingPullIntos[0].bytesFilled = 0;
+ var first = pendingPullIntos.peek();
+ if (first)
+ first.bytesFilled = 0;
+
@putByIdDirectPrivate(controller, "queue", @newQueue());
return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]);
}
@@ -144,9 +146,9 @@ function readableByteStreamControllerClose(controller)
return;
}
- var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos");
- if (pendingPullIntos.length > 0) {
- if (pendingPullIntos[0].bytesFilled > 0) {
+ var first = @getByIdDirectPrivate(controller, "pendingPullIntos")?.peek();
+ if (first) {
+ if (first.bytesFilled > 0) {
const e = @makeTypeError("Close requested while there remain pending bytes");
@readableByteStreamControllerError(controller, e);
throw e;
@@ -161,7 +163,12 @@ function readableByteStreamControllerClearPendingPullIntos(controller)
"use strict";
@readableByteStreamControllerInvalidateBYOBRequest(controller);
- @putByIdDirectPrivate(controller, "pendingPullIntos", []);
+ var existing = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (existing !== @undefined) {
+ existing.clear();
+ } else {
+ @putByIdDirectPrivate(controller, "pendingPullIntos", @createFIFO());
+ }
}
function readableByteStreamControllerGetDesiredSize(controller)
@@ -214,8 +221,8 @@ function readableByteStreamControllerPull(controller)
@assert(@readableStreamHasDefaultReader(stream));
if (@getByIdDirectPrivate(controller, "queue").size > 0) {
- @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length === 0);
- const entry = @getByIdDirectPrivate(controller, "queue").content.@shift();
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isEmpty());
+ const entry = @getByIdDirectPrivate(controller, "queue").content.shift();
@getByIdDirectPrivate(controller, "queue").size -= entry.byteLength;
@readableByteStreamControllerHandleQueueDrain(controller);
let view;
@@ -230,7 +237,7 @@ function readableByteStreamControllerPull(controller)
if (@getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== @undefined) {
let buffer;
try {
- buffer = new @ArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize"));
+ buffer = @createUninitializedArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize"));
} catch (error) {
return @Promise.@reject(error);
}
@@ -243,7 +250,7 @@ function readableByteStreamControllerPull(controller)
ctor: @Uint8Array,
readerType: 'default'
};
- @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor);
+ @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
}
const promise = @readableStreamAddReadRequest(stream);
@@ -263,9 +270,9 @@ function readableByteStreamControllerShouldCallPull(controller)
return false;
if (!@getByIdDirectPrivate(controller, "started"))
return false;
- if (@readableStreamHasDefaultReader(stream) && (@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length > 0 || !!@getByIdDirectPrivate(reader, "bunNativePtr")))
+ if (@readableStreamHasDefaultReader(stream) && (@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty() || !!@getByIdDirectPrivate(reader, "bunNativePtr")))
return true;
- if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").length > 0)
+ if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty())
return true;
if (@readableByteStreamControllerGetDesiredSize(controller) > 0)
return true;
@@ -334,10 +341,10 @@ function readableByteStreamControllerEnqueue(controller, chunk)
switch (reader ? @readableStreamReaderKind(reader) : 0) {
/* default reader */
case 1: {
- if (!@getByIdDirectPrivate(reader, "readRequests").length)
+ if (!@getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty())
@readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength);
else {
- @assert(!@getByIdDirectPrivate(controller, "queue").content.length);
+ @assert(!@getByIdDirectPrivate(controller, "queue").content.size());
const transferredView = chunk.constructor === @Uint8Array ? chunk : new @Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);
@readableStreamFulfillReadRequest(stream, transferredView, false);
}
@@ -371,7 +378,7 @@ function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset
{
"use strict";
- @arrayPush(@getByIdDirectPrivate(controller, "queue").content, {
+ @getByIdDirectPrivate(controller, "queue").content.push({
buffer: buffer,
byteOffset: byteOffset,
byteLength: byteLength
@@ -383,10 +390,10 @@ function readableByteStreamControllerRespondWithNewView(controller, view)
{
"use strict";
- @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0);
-
- let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0];
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
+ let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+
if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset)
@throwRangeError("Invalid value for view.byteOffset");
@@ -406,7 +413,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten)
if (@isNaN(bytesWritten) || bytesWritten === @Infinity || bytesWritten < 0 )
@throwRangeError("bytesWritten has an incorrect value");
- @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0);
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
@readableByteStreamControllerRespondInternal(controller, bytesWritten);
}
@@ -415,7 +422,7 @@ function readableByteStreamControllerRespondInternal(controller, bytesWritten)
{
"use strict";
- let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0];
+ let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
let stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
@@ -435,7 +442,7 @@ function readableByteStreamControllerRespondInReadableState(controller, bytesWri
if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength)
@throwRangeError("bytesWritten value is too great");
- @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length === 0 || @getByIdDirectPrivate(controller, "pendingPullIntos")[0] === pullIntoDescriptor);
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor);
@readableByteStreamControllerInvalidateBYOBRequest(controller);
pullIntoDescriptor.bytesFilled += bytesWritten;
@@ -465,7 +472,7 @@ function readableByteStreamControllerRespondInClosedState(controller, firstDescr
@assert(firstDescriptor.bytesFilled === 0);
if (@readableStreamHasBYOBReader(@getByIdDirectPrivate(controller, "controlledReadableStream"))) {
- while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests").length > 0) {
+ while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests")?.isNotEmpty()) {
let pullIntoDescriptor = @readableByteStreamControllerShiftPendingDescriptor(controller);
@readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
}
@@ -478,10 +485,10 @@ function readableByteStreamControllerProcessPullDescriptors(controller)
"use strict";
@assert(!@getByIdDirectPrivate(controller, "closeRequested"));
- while (@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0) {
+ while (@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) {
if (@getByIdDirectPrivate(controller, "queue").size === 0)
return;
- let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0];
+ let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
@readableByteStreamControllerShiftPendingDescriptor(controller);
@readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
@@ -508,7 +515,7 @@ function readableByteStreamControllerFillDescriptorFromQueue(controller, pullInt
}
while (totalBytesToCopyRemaining > 0) {
- let headOfQueue = @getByIdDirectPrivate(controller, "queue").content[0];
+ let headOfQueue = @getByIdDirectPrivate(controller, "queue").content.peek();
const bytesToCopy = totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength;
// Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer.
// Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes
@@ -519,14 +526,14 @@ function readableByteStreamControllerFillDescriptorFromQueue(controller, pullInt
new @Uint8Array(pullIntoDescriptor.buffer).set(new @Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy), destStart);
if (headOfQueue.byteLength === bytesToCopy)
- @getByIdDirectPrivate(controller, "queue").content.@shift();
+ @getByIdDirectPrivate(controller, "queue").content.shift();
else {
headOfQueue.byteOffset += bytesToCopy;
headOfQueue.byteLength -= bytesToCopy;
}
@getByIdDirectPrivate(controller, "queue").size -= bytesToCopy;
- @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length === 0 || @getByIdDirectPrivate(controller, "pendingPullIntos")[0] === pullIntoDescriptor);
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor);
@readableByteStreamControllerInvalidateBYOBRequest(controller);
pullIntoDescriptor.bytesFilled += bytesToCopy;
totalBytesToCopyRemaining -= bytesToCopy;
@@ -546,7 +553,7 @@ function readableByteStreamControllerShiftPendingDescriptor(controller)
{
"use strict";
- let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").@shift();
+ let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").shift();
@readableByteStreamControllerInvalidateBYOBRequest(controller);
return descriptor;
}
@@ -597,7 +604,7 @@ function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor)
function readableStreamFulfillReadIntoRequest(stream, chunk, done)
{
"use strict";
- const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").@shift();
+ const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift();
@fulfillPromise(readIntoRequest, { value: chunk, done: done });
}
@@ -652,9 +659,10 @@ function readableByteStreamControllerPullInto(controller, view)
readerType: 'byob'
};
- if (@getByIdDirectPrivate(controller, "pendingPullIntos").length) {
+ var pending = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (pending?.isNotEmpty()) {
pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
- @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor);
+ pending.push(pullIntoDescriptor);
return @readableStreamAddReadIntoRequest(stream);
}
@@ -677,7 +685,7 @@ function readableByteStreamControllerPullInto(controller, view)
}
pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
- @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor);
+ @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
const promise = @readableStreamAddReadIntoRequest(stream);
@readableByteStreamControllerCallPullIfNeeded(controller);
return promise;
@@ -691,7 +699,7 @@ function readableStreamAddReadIntoRequest(stream)
@assert(@getByIdDirectPrivate(stream, "state") === @streamReadable || @getByIdDirectPrivate(stream, "state") === @streamClosed);
const readRequest = @newPromise();
- @arrayPush(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests"), readRequest);
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest);
return readRequest;
}
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
index 5a5ea4094..4d7113888 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
@@ -89,79 +89,69 @@ function initializeReadableStream(underlyingSource, strategy)
}
@globalPrivate
-function createNativeReadableStream(nativePtr, nativeType) {
+function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) {
"use strict";
var cached = globalThis[Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map;
var Prototype = cached.@get(nativeType);
if (Prototype === @undefined) {
var [pull, start, cancel, setClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType);
var closer = [false];
-
+var handleResult;
function handleNativeReadableStreamPromiseResult(val) {
"use strict";
- var {r, c} = this;
- this.r = @undefined;
+ var {c, v} = this;
this.c = @undefined;
- r(val, c);
+ this.v = @undefined;
+ handleResult(val, c, v);
}
- function closeNativeReadableStreamOnNextTick(controller) {
- "use strict";
- controller.close();
- controller = @undefined;
- }
-
- var handleResult = function handleResult(result, controller) {
+
+ handleResult = function handleResult(result, controller, view) {
"use strict";
if (result && @isPromise(result)) {
- return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, r: handleResult}), controller.error);
+ return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err));
} else if (result !== false) {
- controller.enqueue(result);
+ if (view && view.byteLength === result) {
+ controller.byobRequest.respondWithNewView(view);
+ } else {
+ controller.byobRequest.respond(result);
+ }
}
if (closer[0] || result === false) {
- @enqueueJob(closeNativeReadableStreamOnNextTick, controller);
+ @enqueueJob(() => controller.close());
closer[0] = false;
}
- }
+ };
Prototype = class NativeReadableStreamSource {
- constructor(tag) {
+ constructor(tag, autoAllocateChunkSize) {
this.pull = this.pull_.bind(tag);
- this.start = this.start_.bind(tag);
this.cancel = this.cancel_.bind(tag);
+ this.autoAllocateChunkSize = autoAllocateChunkSize;
}
pull;
- start;
cancel;
-
- pull_(controller) {
- closer[0] = false;
- var result;
-
- try {
- result = pull(this, closer);
- } catch(err) {
- return controller.error(err);
- }
- return handleResult(result, controller);
- }
+ type = "bytes";
+ autoAllocateChunkSize = 0;
- start_(controller) {
- setClose(this, controller.close);
+ static startSync = start;
+
+ pull_(controller) {
closer[0] = false;
var result;
+ const view = controller.byobRequest.view;
try {
- result = start(this, closer);
+ result = pull(this, view, closer);
} catch(err) {
return controller.error(err);
}
- return handleResult(result, controller);
+ return handleResult(result, controller, view);
}
cancel_(reason) {
@@ -173,7 +163,29 @@ function createNativeReadableStream(nativePtr, nativeType) {
cached.@set(nativeType, Prototype);
}
- var instance = new Prototype(nativePtr);
+ // either returns the chunk size
+ // or throws an error
+ // should never return a Promise
+ const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+
+ // empty file, no need for native back-and-forth on this
+ if (chunkSize === 0) {
+ return new @ReadableStream({
+ start(controller) {
+ controller.close();
+ },
+
+ pull() {
+
+ },
+
+ cancel() {
+
+ },
+ });
+ }
+
+ var instance = new Prototype(nativePtr, chunkSize);
Prototype.registry.register(instance, nativePtr);
var stream = new @ReadableStream(instance);
@putByIdDirectPrivate(stream, "bunNativeType", nativeType);
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js
index f4bc203c8..16e4ebce5 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js
@@ -34,7 +34,7 @@ function initializeReadableStreamBYOBReader(stream)
@throwTypeError("ReadableStream is locked");
@readableStreamReaderGenericInitialize(this, stream);
- @putByIdDirectPrivate(this, "readIntoRequests", []);
+ @putByIdDirectPrivate(this, "readIntoRequests", @createFIFO());
return this;
}
@@ -84,7 +84,7 @@ function releaseLock()
if (!@getByIdDirectPrivate(this, "ownerReadableStream"))
return;
- if (@getByIdDirectPrivate(this, "readIntoRequests").length)
+ if (@getByIdDirectPrivate(this, "readIntoRequests")?.isNotEmpty())
@throwTypeError("There are still pending read requests, cannot release the lock");
@readableStreamReaderGenericRelease(this);
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js
index 3e7438a62..9766d150e 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js
@@ -33,7 +33,7 @@ function initializeReadableStreamDefaultReader(stream)
@throwTypeError("ReadableStream is locked");
@readableStreamReaderGenericInitialize(this, stream);
- @putByIdDirectPrivate(this, "readRequests", []);
+ @putByIdDirectPrivate(this, "readRequests", @createFIFO());
return this;
}
@@ -70,30 +70,14 @@ function readMany()
throw @getByIdDirectPrivate(stream, "storedError");
}
- const controller = @getByIdDirectPrivate(stream, "readableStreamController");
+ var controller = @getByIdDirectPrivate(stream, "readableStreamController");
const content = @getByIdDirectPrivate(controller, "queue").content;
var size = @getByIdDirectPrivate(controller, "queue").size;
+ var values = content.toArray(false);
+ var length = values.length;
- var values = new @Array(content.length);
-
- if (content.length > 0) {
- if ("buffer" in content[0]) {
- values[0] = new @Uint8Array(content[0].buffer, content[0].byteOffset, content[0].byteLength);
- for (var i = 0; i < content.length; ++i) {
- @putByValDirect(values, i+1, new @Uint8Array(content[i].buffer, content[i].byteOffset, content[i].byteLength));
- }
- } else if (typeof content[0] === 'object' && content[0] && "byteLength" in content[0]) {
- size = 0;
- for (var i = 0; i < content.length; ++i) {
- @putByValDirect(values, i+1, content[i].value);
- size += content[i].value.byteLength;
- }
- } else {
- for (var i = 0; i < content.length; ++i) {
- @putByValDirect(values, i+1, content[i].value);
- }
- }
+ if (length > 0) {
@resetQueue(@getByIdDirectPrivate(controller, "queue"));
if (@getByIdDirectPrivate(controller, "closeRequested"))
@@ -105,44 +89,23 @@ function readMany()
if (done) {
return {value: [], size: 0, done: true};
}
-
- const content = queue.content;
- var values = new @Array(content.length + 1);
-
+ var queue = @getByIdDirectPrivate(controller, "queue");
+ const content = [value].concat(queue.content.toArray(false));
var size = queue.size;
-
- if ("buffer" in content[0]) {
- values[0] = new @Uint8Array(value.buffer, value.byteOffset, value.byteLength);
- for (var i = 0; i < content.length; ++i) {
- @putByValDirect(values, i+1, new @Uint8Array(content[i].buffer, content[i].byteOffset, content[i].byteLength));
- }
- size += value.byteLength;
- } else if (typeof value === 'object' && value && "byteLength" in value) {
- size += value.byteLength;
- values[0] = value;
- for (var i = 0; i < content.length; ++i) {
- @putByValDirect(values, i+1, content[i].value);
- size += content[i].value.byteLength;
- }
-
- } else {
- values[0] = value;
- for (var i = 0; i < content.length; ++i) {
- @putByValDirect(values, i+1, content[i].value);
- }
- }
-
@resetQueue(queue);
if (@getByIdDirectPrivate(controller, "closeRequested"))
@readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
else
@readableStreamDefaultControllerCallPullIfNeeded(controller);
+ controller = @undefined;
return {value: values, size: size, done: false};
});
}
+ controller = @undefined;
+
return {value: values, size, done: false};
}
@@ -168,7 +131,7 @@ function releaseLock()
if (!@getByIdDirectPrivate(this, "ownerReadableStream"))
return;
- if (@getByIdDirectPrivate(this, "readRequests").length)
+ if (@getByIdDirectPrivate(this, "readRequests")?.isNotEmpty())
@throwTypeError("There are still pending read requests, cannot release the lock");
@readableStreamReaderGenericRelease(this);
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
index 7c2384330..f441858cc 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
@@ -547,15 +547,15 @@ function readableStreamError(stream, error)
if (@isReadableStreamDefaultReader(reader)) {
const requests = @getByIdDirectPrivate(reader, "readRequests");
- @putByIdDirectPrivate(reader, "readRequests", []);
- for (let index = 0, length = requests.length; index < length; ++index)
- @rejectPromise(requests[index], error);
+ @putByIdDirectPrivate(reader, "readRequests", @createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift())
+ @rejectPromise(request, error);
} else {
@assert(@isReadableStreamBYOBReader(reader));
const requests = @getByIdDirectPrivate(reader, "readIntoRequests");
- @putByIdDirectPrivate(reader, "readIntoRequests", []);
- for (let index = 0, length = requests.length; index < length; ++index)
- @rejectPromise(requests[index], error);
+ @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift())
+ @rejectPromise(request, error);
}
@getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error);
@@ -571,7 +571,7 @@ function readableStreamDefaultControllerShouldCallPull(controller)
return false;
if (!@getByIdDirectPrivate(controller, "started"))
return false;
- if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
+ if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
return false;
const desiredSize = @readableStreamDefaultControllerGetDesiredSize(controller);
@assert(desiredSize !== null);
@@ -589,7 +589,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller)
return;
if (!@getByIdDirectPrivate(controller, "started"))
return;
- if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
+ if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
return;
if (@getByIdDirectPrivate(controller, "pulling")) {
@@ -670,9 +670,10 @@ function readableStreamDefaultControllerPull(controller)
{
"use strict";
- if (@getByIdDirectPrivate(controller, "queue").content.length) {
- const chunk = @dequeueValue(@getByIdDirectPrivate(controller, "queue"));
- if (@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(controller, "queue").content.length === 0)
+ var queue = @getByIdDirectPrivate(controller, "queue");
+ if (queue.isNotEmpty()) {
+ const chunk = @dequeueValue(queue);
+ if (@getByIdDirectPrivate(controller, "closeRequested") && queue.isEmpty())
@readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
else
@readableStreamDefaultControllerCallPullIfNeeded(controller);
@@ -690,7 +691,7 @@ function readableStreamDefaultControllerClose(controller)
@assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
@putByIdDirectPrivate(controller, "closeRequested", true);
- if (@getByIdDirectPrivate(controller, "queue").content.length === 0)
+ if (!@getByIdDirectPrivate(controller, "queue")?.isNotEmpty())
@readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
}
@@ -707,9 +708,10 @@ function readableStreamClose(stream)
if (@isReadableStreamDefaultReader(reader)) {
const requests = @getByIdDirectPrivate(reader, "readRequests");
- @putByIdDirectPrivate(reader, "readRequests", []);
- for (let index = 0, length = requests.length; index < length; ++index)
- @fulfillPromise(requests[index], { value: @undefined, done: true });
+ @putByIdDirectPrivate(reader, "readRequests", @createFIFO());
+
+ for (var request = requests.shift(); request; request = requests.shift())
+ @fulfillPromise(request, { value: @undefined, done: true });
}
@getByIdDirectPrivate(reader, "closedPromiseCapability").@resolve.@call();
@@ -718,7 +720,7 @@ function readableStreamClose(stream)
function readableStreamFulfillReadRequest(stream, chunk, done)
{
"use strict";
- const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").@shift();
+ const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").shift();
@fulfillPromise(readRequest, { value: chunk, done: done });
}
@@ -730,7 +732,7 @@ function readableStreamDefaultControllerEnqueue(controller, chunk)
// this is checked by callers
@assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
- if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) {
+ if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").isNotEmpty) {
@readableStreamFulfillReadRequest(stream, chunk, false);
@readableStreamDefaultControllerCallPullIfNeeded(controller);
return;
@@ -775,7 +777,8 @@ function readableStreamAddReadRequest(stream)
@assert(@getByIdDirectPrivate(stream, "state") == @streamReadable);
const readRequest = @newPromise();
- @arrayPush(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests"), readRequest);
+
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest);
return readRequest;
}
diff --git a/src/javascript/jsc/bindings/builtins/js/StreamInternals.js b/src/javascript/jsc/bindings/builtins/js/StreamInternals.js
index 9c2103293..c2ca3f5b5 100644
--- a/src/javascript/jsc/bindings/builtins/js/StreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/StreamInternals.js
@@ -114,18 +114,119 @@ function validateAndNormalizeQueuingStrategy(size, highWaterMark)
return { size: size, highWaterMark: newHighWaterMark };
}
+@globalPrivate
+function createFIFO() {
+ "use strict";
+ class Denqueue {
+ constructor() {
+ this._head = 0;
+ this._tail = 0;
+ // this._capacity = 0;
+ this._capacityMask = 0x3;
+ this._list = @newArrayWithSize(4);
+ }
+
+ size() {
+ if (this._head === this._tail) return 0;
+ if (this._head < this._tail) return this._tail - this._head;
+ else return this._capacityMask + 1 - (this._head - this._tail);
+ }
+
+ isEmpty() {
+ return this.size() == 0;
+ }
+
+ isNotEmpty() {
+ return this.size() > 0;
+ }
+
+ shift() {
+ var head = this._head;
+ if (head === this._tail) return @undefined;
+ var item = this._list[head];
+ @putByValDirect(this._list, head, @undefined);
+ this._head = (head + 1) & this._capacityMask;
+ if (head < 2 && this._tail > 10000 && this._tail <= this._list.length >>> 2) this._shrinkArray();
+ return item;
+ }
+
+ peek() {
+ if (this._head === this._tail) return @undefined;
+ return this._list[this._head];
+ }
+
+ push(item) {
+ var tail = this._tail;
+ @putByValDirect(this._list, tail, item);
+ this._tail = (tail + 1) & this._capacityMask;
+ if (this._tail === this._head) {
+ this._growArray();
+ }
+ // if (this._capacity && this.size() > this._capacity) {
+ // this.shift();
+ // }
+ }
+
+ toArray(fullCopy) {
+ var list = this._list;
+ var len = @toLength(list.length);
+
+ if (fullCopy || this._head > this._tail) {
+ var _head = @toLength(this._head);
+ var _tail = @toLength(this._tail);
+ var total = @toLength((len - _head) + _tail);
+ var array = @newArrayWithSize(total);
+ var j = 0;
+ for (var i = _head; i < len; i++) @putByValDirect(array, j++, list[i]);
+ for (var i = 0; i < _tail; i++) @putByValDirect(array, j++, list[i]);
+ return array;
+ } else {
+ return @Array.prototype.slice.@call(list, this._head, this._tail);
+ }
+ }
+
+ clear() {
+ this._head = 0;
+ this._tail = 0;
+ this._list.fill(undefined);
+ }
+
+ _growArray() {
+ if (this._head) {
+ // copy existing data, head to end, then beginning to tail.
+ this._list = this.toArray(true);
+ this._head = 0;
+ }
+
+ // head is at 0 and array is now full, safe to extend
+ this._tail = @toLength(this._list.length);
+
+ this._list.length <<= 1;
+ this._capacityMask = (this._capacityMask << 1) | 1;
+ }
+
+ shrinkArray() {
+ this._list.length >>>= 1;
+ this._capacityMask >>>= 1;
+ }
+ }
+
+
+ return new Denqueue();
+}
+
function newQueue()
{
"use strict";
- return { content: [], size: 0 };
+ return { content: @createFIFO(), size: 0 };
}
function dequeueValue(queue)
{
"use strict";
- const record = queue.content.@shift();
+ const record = queue.content.shift();
queue.size -= record.size;
// As described by spec, below case may occur due to rounding errors.
if (queue.size < 0)
@@ -140,7 +241,8 @@ function enqueueValueWithSize(queue, value, size)
size = @toNumber(size);
if (!@isFinite(size) || size < 0)
@throwRangeError("size has an incorrect value");
- @arrayPush(queue.content, { value, size });
+
+ queue.content.push({ value, size });
queue.size += size;
}
@@ -148,9 +250,9 @@ function peekQueueValue(queue)
{
"use strict";
- @assert(queue.content.length > 0);
+ @assert(queue.content.isNotEmpty());
- return queue.content[0].value;
+ return queue.peek().value;
}
function resetQueue(queue)
@@ -159,7 +261,7 @@ function resetQueue(queue)
@assert("content" in queue);
@assert("size" in queue);
- queue.content = [];
+ queue.content.clear();
queue.size = 0;
}
diff --git a/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js
index 406b9ea48..6b2b3cf90 100644
--- a/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js
@@ -118,7 +118,7 @@ function initializeWritableStreamSlots(stream, underlyingSink)
@putByIdDirectPrivate(stream, "closeRequest", @undefined);
@putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);
@putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
- @putByIdDirectPrivate(stream, "writeRequests", []);
+ @putByIdDirectPrivate(stream, "writeRequests", @createFIFO());
@putByIdDirectPrivate(stream, "backpressure", false);
@putByIdDirectPrivate(stream, "underlyingSink", underlyingSink);
}
@@ -233,7 +233,7 @@ function writableStreamAddWriteRequest(stream)
const writePromiseCapability = @newPromiseCapability(@Promise);
const writeRequests = @getByIdDirectPrivate(stream, "writeRequests");
- @arrayPush(writeRequests, writePromiseCapability);
+ writeRequests.push(writePromiseCapability);
return writePromiseCapability.@promise;
}
@@ -266,10 +266,11 @@ function writableStreamFinishErroring(stream)
const storedError = @getByIdDirectPrivate(stream, "storedError");
const requests = @getByIdDirectPrivate(stream, "writeRequests");
- for (let index = 0, length = requests.length; index < length; ++index)
- requests[index].@reject.@call(@undefined, storedError);
+ for (var request = requests.shift(); request; request = requests.shift())
+ request.@reject.@call(@undefined, storedError);
- @putByIdDirectPrivate(stream, "writeRequests", []);
+ // TODO: is this still necessary?
+ @putByIdDirectPrivate(stream, "writeRequests", @createFIFO());
const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
if (abortRequest === @undefined) {
@@ -384,9 +385,9 @@ function writableStreamMarkFirstWriteRequestInFlight(stream)
{
const writeRequests = @getByIdDirectPrivate(stream, "writeRequests");
@assert(@getByIdDirectPrivate(stream, "inFlightWriteRequest") === @undefined);
- @assert(writeRequests.length > 0);
+ @assert(writeRequests.isNotEmpty());
- const writeRequest = writeRequests.@shift();
+ const writeRequest = writeRequests.shift();
@putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest);
}
@@ -649,7 +650,7 @@ function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller)
return;
}
- if (@getByIdDirectPrivate(controller, "queue").content.length === 0)
+ if (@getByIdDirectPrivate(controller, "queue").content?.isEmpty() ?? false)
return;
const value = @peekQueueValue(@getByIdDirectPrivate(controller, "queue"));
@@ -722,7 +723,7 @@ function writableStreamDefaultControllerProcessClose(controller)
@writableStreamMarkCloseRequestInFlight(stream);
@dequeueValue(@getByIdDirectPrivate(controller, "queue"));
- @assert(@getByIdDirectPrivate(controller, "queue").content.length === 0);
+ @assert(@getByIdDirectPrivate(controller, "queue").content?.isEmpty());
const sinkClosePromise = @getByIdDirectPrivate(controller, "closeAlgorithm").@call();
@writableStreamDefaultControllerClearAlgorithms(controller);