diff options
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js')
8 files changed, 251 insertions, 157 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js index fac5c864c..0b47d730c 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamController.js @@ -89,12 +89,17 @@ function byobRequest() if (!@isReadableByteStreamController(this)) throw @makeGetterTypeError("ReadableByteStreamController", "byobRequest"); - if (@getByIdDirectPrivate(this, "byobRequest") === @undefined && @getByIdDirectPrivate(this, "pendingPullIntos").length) { - const firstDescriptor = @getByIdDirectPrivate(this, "pendingPullIntos")[0]; - const view = new @Uint8Array(firstDescriptor.buffer, + + var request = @getByIdDirectPrivate(this, "byobRequest"); + if (request === @undefined) { + var pending = @getByIdDirectPrivate(this, "pendingPullIntos"); + const firstDescriptor = pending.peek(); + if (firstDescriptor) { + const view = new @Uint8Array(firstDescriptor.buffer, firstDescriptor.byteOffset + firstDescriptor.bytesFilled, firstDescriptor.byteLength - firstDescriptor.bytesFilled); - @putByIdDirectPrivate(this, "byobRequest", new @ReadableStreamBYOBRequest(this, view, @isReadableStream)); + @putByIdDirectPrivate(this, "byobRequest", new @ReadableStreamBYOBRequest(this, view, @isReadableStream)); + } } return @getByIdDirectPrivate(this, "byobRequest"); diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js index 45af990cd..5806ba48c 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js @@ -56,7 +56,7 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou @throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity"); } @putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize); - @putByIdDirectPrivate(this, "pendingPullIntos", []); + @putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO()); const controller = this; const startResult = @promiseInvokeOrNoopNoCatch(underlyingByteSource, "start", [this]).@then(() => { @@ -116,8 +116,10 @@ function readableByteStreamControllerCancel(controller, reason) "use strict"; var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos"); - if (pendingPullIntos.length > 0) - pendingPullIntos[0].bytesFilled = 0; + var first = pendingPullIntos.peek(); + if (first) + first.bytesFilled = 0; + @putByIdDirectPrivate(controller, "queue", @newQueue()); return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]); } @@ -144,9 +146,9 @@ function readableByteStreamControllerClose(controller) return; } - var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos"); - if (pendingPullIntos.length > 0) { - if (pendingPullIntos[0].bytesFilled > 0) { + var first = @getByIdDirectPrivate(controller, "pendingPullIntos")?.peek(); + if (first) { + if (first.bytesFilled > 0) { const e = @makeTypeError("Close requested while there remain pending bytes"); @readableByteStreamControllerError(controller, e); throw e; @@ -161,7 +163,12 @@ function readableByteStreamControllerClearPendingPullIntos(controller) "use strict"; @readableByteStreamControllerInvalidateBYOBRequest(controller); - @putByIdDirectPrivate(controller, "pendingPullIntos", []); + var existing = @getByIdDirectPrivate(controller, "pendingPullIntos"); + if (existing !== @undefined) { + existing.clear(); + } else { + @putByIdDirectPrivate(controller, "pendingPullIntos", @createFIFO()); + } } function readableByteStreamControllerGetDesiredSize(controller) @@ -214,8 +221,8 @@ function readableByteStreamControllerPull(controller) @assert(@readableStreamHasDefaultReader(stream)); if (@getByIdDirectPrivate(controller, "queue").size > 0) { - @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length === 0); - const entry = @getByIdDirectPrivate(controller, "queue").content.@shift(); + @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isEmpty()); + const entry = @getByIdDirectPrivate(controller, "queue").content.shift(); @getByIdDirectPrivate(controller, "queue").size -= entry.byteLength; @readableByteStreamControllerHandleQueueDrain(controller); let view; @@ -230,7 +237,7 @@ function readableByteStreamControllerPull(controller) if (@getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== @undefined) { let buffer; try { - buffer = new @ArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize")); + buffer = @createUninitializedArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize")); } catch (error) { return @Promise.@reject(error); } @@ -243,7 +250,7 @@ function readableByteStreamControllerPull(controller) ctor: @Uint8Array, readerType: 'default' }; - @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor); + @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor); } const promise = @readableStreamAddReadRequest(stream); @@ -263,9 +270,9 @@ function readableByteStreamControllerShouldCallPull(controller) return false; if (!@getByIdDirectPrivate(controller, "started")) return false; - if (@readableStreamHasDefaultReader(stream) && (@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length > 0 || !!@getByIdDirectPrivate(reader, "bunNativePtr"))) + if (@readableStreamHasDefaultReader(stream) && (@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty() || !!@getByIdDirectPrivate(reader, "bunNativePtr"))) return true; - if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").length > 0) + if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty()) return true; if (@readableByteStreamControllerGetDesiredSize(controller) > 0) return true; @@ -334,10 +341,10 @@ function readableByteStreamControllerEnqueue(controller, chunk) switch (reader ? @readableStreamReaderKind(reader) : 0) { /* default reader */ case 1: { - if (!@getByIdDirectPrivate(reader, "readRequests").length) + if (!@getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty()) @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength); else { - @assert(!@getByIdDirectPrivate(controller, "queue").content.length); + @assert(!@getByIdDirectPrivate(controller, "queue").content.size()); const transferredView = chunk.constructor === @Uint8Array ? chunk : new @Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); @readableStreamFulfillReadRequest(stream, transferredView, false); } @@ -371,7 +378,7 @@ function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset { "use strict"; - @arrayPush(@getByIdDirectPrivate(controller, "queue").content, { + @getByIdDirectPrivate(controller, "queue").content.push({ buffer: buffer, byteOffset: byteOffset, byteLength: byteLength @@ -383,10 +390,10 @@ function readableByteStreamControllerRespondWithNewView(controller, view) { "use strict"; - @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0); - - let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0]; + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()); + let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); + if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset) @throwRangeError("Invalid value for view.byteOffset"); @@ -406,7 +413,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) if (@isNaN(bytesWritten) || bytesWritten === @Infinity || bytesWritten < 0 ) @throwRangeError("bytesWritten has an incorrect value"); - @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0); + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()); @readableByteStreamControllerRespondInternal(controller, bytesWritten); } @@ -415,7 +422,7 @@ function readableByteStreamControllerRespondInternal(controller, bytesWritten) { "use strict"; - let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0]; + let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); let stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { @@ -435,7 +442,7 @@ function readableByteStreamControllerRespondInReadableState(controller, bytesWri if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength) @throwRangeError("bytesWritten value is too great"); - @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length === 0 || @getByIdDirectPrivate(controller, "pendingPullIntos")[0] === pullIntoDescriptor); + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor); @readableByteStreamControllerInvalidateBYOBRequest(controller); pullIntoDescriptor.bytesFilled += bytesWritten; @@ -465,7 +472,7 @@ function readableByteStreamControllerRespondInClosedState(controller, firstDescr @assert(firstDescriptor.bytesFilled === 0); if (@readableStreamHasBYOBReader(@getByIdDirectPrivate(controller, "controlledReadableStream"))) { - while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests").length > 0) { + while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests")?.isNotEmpty()) { let pullIntoDescriptor = @readableByteStreamControllerShiftPendingDescriptor(controller); @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor); } @@ -478,10 +485,10 @@ function readableByteStreamControllerProcessPullDescriptors(controller) "use strict"; @assert(!@getByIdDirectPrivate(controller, "closeRequested")); - while (@getByIdDirectPrivate(controller, "pendingPullIntos").length > 0) { + while (@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) { if (@getByIdDirectPrivate(controller, "queue").size === 0) return; - let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos")[0]; + let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) { @readableByteStreamControllerShiftPendingDescriptor(controller); @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor); @@ -508,7 +515,7 @@ function readableByteStreamControllerFillDescriptorFromQueue(controller, pullInt } while (totalBytesToCopyRemaining > 0) { - let headOfQueue = @getByIdDirectPrivate(controller, "queue").content[0]; + let headOfQueue = @getByIdDirectPrivate(controller, "queue").content.peek(); const bytesToCopy = totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength; // Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer. // Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes @@ -519,14 +526,14 @@ function readableByteStreamControllerFillDescriptorFromQueue(controller, pullInt new @Uint8Array(pullIntoDescriptor.buffer).set(new @Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy), destStart); if (headOfQueue.byteLength === bytesToCopy) - @getByIdDirectPrivate(controller, "queue").content.@shift(); + @getByIdDirectPrivate(controller, "queue").content.shift(); else { headOfQueue.byteOffset += bytesToCopy; headOfQueue.byteLength -= bytesToCopy; } @getByIdDirectPrivate(controller, "queue").size -= bytesToCopy; - @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").length === 0 || @getByIdDirectPrivate(controller, "pendingPullIntos")[0] === pullIntoDescriptor); + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor); @readableByteStreamControllerInvalidateBYOBRequest(controller); pullIntoDescriptor.bytesFilled += bytesToCopy; totalBytesToCopyRemaining -= bytesToCopy; @@ -546,7 +553,7 @@ function readableByteStreamControllerShiftPendingDescriptor(controller) { "use strict"; - let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").@shift(); + let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").shift(); @readableByteStreamControllerInvalidateBYOBRequest(controller); return descriptor; } @@ -597,7 +604,7 @@ function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor) function readableStreamFulfillReadIntoRequest(stream, chunk, done) { "use strict"; - const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").@shift(); + const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift(); @fulfillPromise(readIntoRequest, { value: chunk, done: done }); } @@ -652,9 +659,10 @@ function readableByteStreamControllerPullInto(controller, view) readerType: 'byob' }; - if (@getByIdDirectPrivate(controller, "pendingPullIntos").length) { + var pending = @getByIdDirectPrivate(controller, "pendingPullIntos"); + if (pending?.isNotEmpty()) { pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer); - @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor); + pending.push(pullIntoDescriptor); return @readableStreamAddReadIntoRequest(stream); } @@ -677,7 +685,7 @@ function readableByteStreamControllerPullInto(controller, view) } pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer); - @arrayPush(@getByIdDirectPrivate(controller, "pendingPullIntos"), pullIntoDescriptor); + @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor); const promise = @readableStreamAddReadIntoRequest(stream); @readableByteStreamControllerCallPullIfNeeded(controller); return promise; @@ -691,7 +699,7 @@ function readableStreamAddReadIntoRequest(stream) @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable || @getByIdDirectPrivate(stream, "state") === @streamClosed); const readRequest = @newPromise(); - @arrayPush(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests"), readRequest); + @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest); return readRequest; } diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js index 5a5ea4094..4d7113888 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js @@ -89,79 +89,69 @@ function initializeReadableStream(underlyingSource, strategy) } @globalPrivate -function createNativeReadableStream(nativePtr, nativeType) { +function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) { "use strict"; var cached = globalThis[Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map; var Prototype = cached.@get(nativeType); if (Prototype === @undefined) { var [pull, start, cancel, setClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType); var closer = [false]; - +var handleResult; function handleNativeReadableStreamPromiseResult(val) { "use strict"; - var {r, c} = this; - this.r = @undefined; + var {c, v} = this; this.c = @undefined; - r(val, c); + this.v = @undefined; + handleResult(val, c, v); } - function closeNativeReadableStreamOnNextTick(controller) { - "use strict"; - controller.close(); - controller = @undefined; - } - - var handleResult = function handleResult(result, controller) { + + handleResult = function handleResult(result, controller, view) { "use strict"; if (result && @isPromise(result)) { - return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, r: handleResult}), controller.error); + return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); } else if (result !== false) { - controller.enqueue(result); + if (view && view.byteLength === result) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } } if (closer[0] || result === false) { - @enqueueJob(closeNativeReadableStreamOnNextTick, controller); + @enqueueJob(() => controller.close()); closer[0] = false; } - } + }; Prototype = class NativeReadableStreamSource { - constructor(tag) { + constructor(tag, autoAllocateChunkSize) { this.pull = this.pull_.bind(tag); - this.start = this.start_.bind(tag); this.cancel = this.cancel_.bind(tag); + this.autoAllocateChunkSize = autoAllocateChunkSize; } pull; - start; cancel; - - pull_(controller) { - closer[0] = false; - var result; - - try { - result = pull(this, closer); - } catch(err) { - return controller.error(err); - } - return handleResult(result, controller); - } + type = "bytes"; + autoAllocateChunkSize = 0; - start_(controller) { - setClose(this, controller.close); + static startSync = start; + + pull_(controller) { closer[0] = false; var result; + const view = controller.byobRequest.view; try { - result = start(this, closer); + result = pull(this, view, closer); } catch(err) { return controller.error(err); } - return handleResult(result, controller); + return handleResult(result, controller, view); } cancel_(reason) { @@ -173,7 +163,29 @@ function createNativeReadableStream(nativePtr, nativeType) { cached.@set(nativeType, Prototype); } - var instance = new Prototype(nativePtr); + // either returns the chunk size + // or throws an error + // should never return a Promise + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + return new @ReadableStream({ + start(controller) { + controller.close(); + }, + + pull() { + + }, + + cancel() { + + }, + }); + } + + var instance = new Prototype(nativePtr, chunkSize); Prototype.registry.register(instance, nativePtr); var stream = new @ReadableStream(instance); @putByIdDirectPrivate(stream, "bunNativeType", nativeType); diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js index f4bc203c8..16e4ebce5 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamBYOBReader.js @@ -34,7 +34,7 @@ function initializeReadableStreamBYOBReader(stream) @throwTypeError("ReadableStream is locked"); @readableStreamReaderGenericInitialize(this, stream); - @putByIdDirectPrivate(this, "readIntoRequests", []); + @putByIdDirectPrivate(this, "readIntoRequests", @createFIFO()); return this; } @@ -84,7 +84,7 @@ function releaseLock() if (!@getByIdDirectPrivate(this, "ownerReadableStream")) return; - if (@getByIdDirectPrivate(this, "readIntoRequests").length) + if (@getByIdDirectPrivate(this, "readIntoRequests")?.isNotEmpty()) @throwTypeError("There are still pending read requests, cannot release the lock"); @readableStreamReaderGenericRelease(this); diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js index 3e7438a62..9766d150e 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamDefaultReader.js @@ -33,7 +33,7 @@ function initializeReadableStreamDefaultReader(stream) @throwTypeError("ReadableStream is locked"); @readableStreamReaderGenericInitialize(this, stream); - @putByIdDirectPrivate(this, "readRequests", []); + @putByIdDirectPrivate(this, "readRequests", @createFIFO()); return this; } @@ -70,30 +70,14 @@ function readMany() throw @getByIdDirectPrivate(stream, "storedError"); } - const controller = @getByIdDirectPrivate(stream, "readableStreamController"); + var controller = @getByIdDirectPrivate(stream, "readableStreamController"); const content = @getByIdDirectPrivate(controller, "queue").content; var size = @getByIdDirectPrivate(controller, "queue").size; + var values = content.toArray(false); + var length = values.length; - var values = new @Array(content.length); - - if (content.length > 0) { - if ("buffer" in content[0]) { - values[0] = new @Uint8Array(content[0].buffer, content[0].byteOffset, content[0].byteLength); - for (var i = 0; i < content.length; ++i) { - @putByValDirect(values, i+1, new @Uint8Array(content[i].buffer, content[i].byteOffset, content[i].byteLength)); - } - } else if (typeof content[0] === 'object' && content[0] && "byteLength" in content[0]) { - size = 0; - for (var i = 0; i < content.length; ++i) { - @putByValDirect(values, i+1, content[i].value); - size += content[i].value.byteLength; - } - } else { - for (var i = 0; i < content.length; ++i) { - @putByValDirect(values, i+1, content[i].value); - } - } + if (length > 0) { @resetQueue(@getByIdDirectPrivate(controller, "queue")); if (@getByIdDirectPrivate(controller, "closeRequested")) @@ -105,44 +89,23 @@ function readMany() if (done) { return {value: [], size: 0, done: true}; } - - const content = queue.content; - var values = new @Array(content.length + 1); - + var queue = @getByIdDirectPrivate(controller, "queue"); + const content = [value].concat(queue.content.toArray(false)); var size = queue.size; - - if ("buffer" in content[0]) { - values[0] = new @Uint8Array(value.buffer, value.byteOffset, value.byteLength); - for (var i = 0; i < content.length; ++i) { - @putByValDirect(values, i+1, new @Uint8Array(content[i].buffer, content[i].byteOffset, content[i].byteLength)); - } - size += value.byteLength; - } else if (typeof value === 'object' && value && "byteLength" in value) { - size += value.byteLength; - values[0] = value; - for (var i = 0; i < content.length; ++i) { - @putByValDirect(values, i+1, content[i].value); - size += content[i].value.byteLength; - } - - } else { - values[0] = value; - for (var i = 0; i < content.length; ++i) { - @putByValDirect(values, i+1, content[i].value); - } - } - @resetQueue(queue); if (@getByIdDirectPrivate(controller, "closeRequested")) @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); else @readableStreamDefaultControllerCallPullIfNeeded(controller); + controller = @undefined; return {value: values, size: size, done: false}; }); } + controller = @undefined; + return {value: values, size, done: false}; } @@ -168,7 +131,7 @@ function releaseLock() if (!@getByIdDirectPrivate(this, "ownerReadableStream")) return; - if (@getByIdDirectPrivate(this, "readRequests").length) + if (@getByIdDirectPrivate(this, "readRequests")?.isNotEmpty()) @throwTypeError("There are still pending read requests, cannot release the lock"); @readableStreamReaderGenericRelease(this); diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js index 7c2384330..f441858cc 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js @@ -547,15 +547,15 @@ function readableStreamError(stream, error) if (@isReadableStreamDefaultReader(reader)) { const requests = @getByIdDirectPrivate(reader, "readRequests"); - @putByIdDirectPrivate(reader, "readRequests", []); - for (let index = 0, length = requests.length; index < length; ++index) - @rejectPromise(requests[index], error); + @putByIdDirectPrivate(reader, "readRequests", @createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) + @rejectPromise(request, error); } else { @assert(@isReadableStreamBYOBReader(reader)); const requests = @getByIdDirectPrivate(reader, "readIntoRequests"); - @putByIdDirectPrivate(reader, "readIntoRequests", []); - for (let index = 0, length = requests.length; index < length; ++index) - @rejectPromise(requests[index], error); + @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) + @rejectPromise(request, error); } @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error); @@ -571,7 +571,7 @@ function readableStreamDefaultControllerShouldCallPull(controller) return false; if (!@getByIdDirectPrivate(controller, "started")) return false; - if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) + if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return false; const desiredSize = @readableStreamDefaultControllerGetDesiredSize(controller); @assert(desiredSize !== null); @@ -589,7 +589,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) return; if (!@getByIdDirectPrivate(controller, "started")) return; - if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) + if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return; if (@getByIdDirectPrivate(controller, "pulling")) { @@ -670,9 +670,10 @@ function readableStreamDefaultControllerPull(controller) { "use strict"; - if (@getByIdDirectPrivate(controller, "queue").content.length) { - const chunk = @dequeueValue(@getByIdDirectPrivate(controller, "queue")); - if (@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(controller, "queue").content.length === 0) + var queue = @getByIdDirectPrivate(controller, "queue"); + if (queue.isNotEmpty()) { + const chunk = @dequeueValue(queue); + if (@getByIdDirectPrivate(controller, "closeRequested") && queue.isEmpty()) @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); else @readableStreamDefaultControllerCallPullIfNeeded(controller); @@ -690,7 +691,7 @@ function readableStreamDefaultControllerClose(controller) @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); @putByIdDirectPrivate(controller, "closeRequested", true); - if (@getByIdDirectPrivate(controller, "queue").content.length === 0) + if (!@getByIdDirectPrivate(controller, "queue")?.isNotEmpty()) @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); } @@ -707,9 +708,10 @@ function readableStreamClose(stream) if (@isReadableStreamDefaultReader(reader)) { const requests = @getByIdDirectPrivate(reader, "readRequests"); - @putByIdDirectPrivate(reader, "readRequests", []); - for (let index = 0, length = requests.length; index < length; ++index) - @fulfillPromise(requests[index], { value: @undefined, done: true }); + @putByIdDirectPrivate(reader, "readRequests", @createFIFO()); + + for (var request = requests.shift(); request; request = requests.shift()) + @fulfillPromise(request, { value: @undefined, done: true }); } @getByIdDirectPrivate(reader, "closedPromiseCapability").@resolve.@call(); @@ -718,7 +720,7 @@ function readableStreamClose(stream) function readableStreamFulfillReadRequest(stream, chunk, done) { "use strict"; - const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").@shift(); + const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").shift(); @fulfillPromise(readRequest, { value: chunk, done: done }); } @@ -730,7 +732,7 @@ function readableStreamDefaultControllerEnqueue(controller, chunk) // this is checked by callers @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); - if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) { + if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").isNotEmpty) { @readableStreamFulfillReadRequest(stream, chunk, false); @readableStreamDefaultControllerCallPullIfNeeded(controller); return; @@ -775,7 +777,8 @@ function readableStreamAddReadRequest(stream) @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable); const readRequest = @newPromise(); - @arrayPush(@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests"), readRequest); + + @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest); return readRequest; } diff --git a/src/javascript/jsc/bindings/builtins/js/StreamInternals.js b/src/javascript/jsc/bindings/builtins/js/StreamInternals.js index 9c2103293..c2ca3f5b5 100644 --- a/src/javascript/jsc/bindings/builtins/js/StreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/StreamInternals.js @@ -114,18 +114,119 @@ function validateAndNormalizeQueuingStrategy(size, highWaterMark) return { size: size, highWaterMark: newHighWaterMark }; } +@globalPrivate +function createFIFO() { + "use strict"; + class Denqueue { + constructor() { + this._head = 0; + this._tail = 0; + // this._capacity = 0; + this._capacityMask = 0x3; + this._list = @newArrayWithSize(4); + } + + size() { + if (this._head === this._tail) return 0; + if (this._head < this._tail) return this._tail - this._head; + else return this._capacityMask + 1 - (this._head - this._tail); + } + + isEmpty() { + return this.size() == 0; + } + + isNotEmpty() { + return this.size() > 0; + } + + shift() { + var head = this._head; + if (head === this._tail) return @undefined; + var item = this._list[head]; + @putByValDirect(this._list, head, @undefined); + this._head = (head + 1) & this._capacityMask; + if (head < 2 && this._tail > 10000 && this._tail <= this._list.length >>> 2) this._shrinkArray(); + return item; + } + + peek() { + if (this._head === this._tail) return @undefined; + return this._list[this._head]; + } + + push(item) { + var tail = this._tail; + @putByValDirect(this._list, tail, item); + this._tail = (tail + 1) & this._capacityMask; + if (this._tail === this._head) { + this._growArray(); + } + // if (this._capacity && this.size() > this._capacity) { + // this.shift(); + // } + } + + toArray(fullCopy) { + var list = this._list; + var len = @toLength(list.length); + + if (fullCopy || this._head > this._tail) { + var _head = @toLength(this._head); + var _tail = @toLength(this._tail); + var total = @toLength((len - _head) + _tail); + var array = @newArrayWithSize(total); + var j = 0; + for (var i = _head; i < len; i++) @putByValDirect(array, j++, list[i]); + for (var i = 0; i < _tail; i++) @putByValDirect(array, j++, list[i]); + return array; + } else { + return @Array.prototype.slice.@call(list, this._head, this._tail); + } + } + + clear() { + this._head = 0; + this._tail = 0; + this._list.fill(undefined); + } + + _growArray() { + if (this._head) { + // copy existing data, head to end, then beginning to tail. + this._list = this.toArray(true); + this._head = 0; + } + + // head is at 0 and array is now full, safe to extend + this._tail = @toLength(this._list.length); + + this._list.length <<= 1; + this._capacityMask = (this._capacityMask << 1) | 1; + } + + shrinkArray() { + this._list.length >>>= 1; + this._capacityMask >>>= 1; + } + } + + + return new Denqueue(); +} + function newQueue() { "use strict"; - return { content: [], size: 0 }; + return { content: @createFIFO(), size: 0 }; } function dequeueValue(queue) { "use strict"; - const record = queue.content.@shift(); + const record = queue.content.shift(); queue.size -= record.size; // As described by spec, below case may occur due to rounding errors. if (queue.size < 0) @@ -140,7 +241,8 @@ function enqueueValueWithSize(queue, value, size) size = @toNumber(size); if (!@isFinite(size) || size < 0) @throwRangeError("size has an incorrect value"); - @arrayPush(queue.content, { value, size }); + + queue.content.push({ value, size }); queue.size += size; } @@ -148,9 +250,9 @@ function peekQueueValue(queue) { "use strict"; - @assert(queue.content.length > 0); + @assert(queue.content.isNotEmpty()); - return queue.content[0].value; + return queue.peek().value; } function resetQueue(queue) @@ -159,7 +261,7 @@ function resetQueue(queue) @assert("content" in queue); @assert("size" in queue); - queue.content = []; + queue.content.clear(); queue.size = 0; } diff --git a/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js index 406b9ea48..6b2b3cf90 100644 --- a/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js @@ -118,7 +118,7 @@ function initializeWritableStreamSlots(stream, underlyingSink) @putByIdDirectPrivate(stream, "closeRequest", @undefined); @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined); @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined); - @putByIdDirectPrivate(stream, "writeRequests", []); + @putByIdDirectPrivate(stream, "writeRequests", @createFIFO()); @putByIdDirectPrivate(stream, "backpressure", false); @putByIdDirectPrivate(stream, "underlyingSink", underlyingSink); } @@ -233,7 +233,7 @@ function writableStreamAddWriteRequest(stream) const writePromiseCapability = @newPromiseCapability(@Promise); const writeRequests = @getByIdDirectPrivate(stream, "writeRequests"); - @arrayPush(writeRequests, writePromiseCapability); + writeRequests.push(writePromiseCapability); return writePromiseCapability.@promise; } @@ -266,10 +266,11 @@ function writableStreamFinishErroring(stream) const storedError = @getByIdDirectPrivate(stream, "storedError"); const requests = @getByIdDirectPrivate(stream, "writeRequests"); - for (let index = 0, length = requests.length; index < length; ++index) - requests[index].@reject.@call(@undefined, storedError); + for (var request = requests.shift(); request; request = requests.shift()) + request.@reject.@call(@undefined, storedError); - @putByIdDirectPrivate(stream, "writeRequests", []); + // TODO: is this still necessary? + @putByIdDirectPrivate(stream, "writeRequests", @createFIFO()); const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest"); if (abortRequest === @undefined) { @@ -384,9 +385,9 @@ function writableStreamMarkFirstWriteRequestInFlight(stream) { const writeRequests = @getByIdDirectPrivate(stream, "writeRequests"); @assert(@getByIdDirectPrivate(stream, "inFlightWriteRequest") === @undefined); - @assert(writeRequests.length > 0); + @assert(writeRequests.isNotEmpty()); - const writeRequest = writeRequests.@shift(); + const writeRequest = writeRequests.shift(); @putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest); } @@ -649,7 +650,7 @@ function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) return; } - if (@getByIdDirectPrivate(controller, "queue").content.length === 0) + if (@getByIdDirectPrivate(controller, "queue").content?.isEmpty() ?? false) return; const value = @peekQueueValue(@getByIdDirectPrivate(controller, "queue")); @@ -722,7 +723,7 @@ function writableStreamDefaultControllerProcessClose(controller) @writableStreamMarkCloseRequestInFlight(stream); @dequeueValue(@getByIdDirectPrivate(controller, "queue")); - @assert(@getByIdDirectPrivate(controller, "queue").content.length === 0); + @assert(@getByIdDirectPrivate(controller, "queue").content?.isEmpty()); const sinkClosePromise = @getByIdDirectPrivate(controller, "closeAlgorithm").@call(); @writableStreamDefaultControllerClearAlgorithms(controller); |