aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/bindings/builtins/js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-15 22:10:12 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-15 22:10:12 -0700
commite6fbbd48db4077ab3d35fef322d2612cb6141a12 (patch)
tree7f8c43e4f7f4a051ab1b35e93aab2baab550e6e6 /src/javascript/jsc/bindings/builtins/js
parent56e88fb4dd06e07569ddc3861e2e8e21f71e45b8 (diff)
downloadbun-e6fbbd48db4077ab3d35fef322d2612cb6141a12.tar.gz
bun-e6fbbd48db4077ab3d35fef322d2612cb6141a12.tar.zst
bun-e6fbbd48db4077ab3d35fef322d2612cb6141a12.zip
Fix lazy loading internal streams
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js')
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js41
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStream.js33
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js36
3 files changed, 52 insertions, 58 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
index f59a7bdbc..01da62e1a 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js
@@ -41,7 +41,7 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou
@putByIdDirectPrivate(this, "pulling", false);
@readableByteStreamControllerClearPendingPullIntos(this);
@putByIdDirectPrivate(this, "queue", @newQueue());
- @putByIdDirectPrivate(this, "started", -1);
+ @putByIdDirectPrivate(this, "started", 0);
@putByIdDirectPrivate(this, "closeRequested", false);
let hwm = @toNumber(highWaterMark);
@@ -58,38 +58,28 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou
@putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize);
@putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO());
- @putByIdDirectPrivate(this, "cancel", @readableByteStreamControllerCancel);
- @putByIdDirectPrivate(this, "pull", @readableByteStreamControllerPull);
- if (@getByIdDirectPrivate(underlyingByteSource, "lazy") === true) {
- @putByIdDirectPrivate(this, "start", () => @readableStreamByteStreamControllerStart(this));
- } else {
- @putByIdDirectPrivate(this, "start", @undefined);
- @readableStreamByteStreamControllerStart(this);
- }
+ const controller = this;
+ @promiseInvokeOrNoopNoCatch(@getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "started", 1);
+ @assert(!@getByIdDirectPrivate(controller, "pulling"));
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ }, (error) => {
+ if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
+ @readableByteStreamControllerError(controller, error);
+ });
+ @putByIdDirectPrivate(this, "cancel", @readableByteStreamControllerCancel);
+ @putByIdDirectPrivate(this, "pull", @readableByteStreamControllerPull);
+
return this;
}
function readableStreamByteStreamControllerStart(controller) {
"use strict";
@putByIdDirectPrivate(controller, "start", @undefined);
-
- if (@getByIdDirectPrivate(controller, "started") !== -1)
- return;
- @putByIdDirectPrivate(controller, "started", 0);
- var stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
- return @promiseInvokeOrNoopNoCatch(@getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).@then(() => {
- @putByIdDirectPrivate(controller, "started", 1);
- @assert(!@getByIdDirectPrivate(controller, "pulling"));
- @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
- @readableByteStreamControllerCallPullIfNeeded(controller);
- }, (error) => {
- var stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
- if (stream && @getByIdDirectPrivate(stream, "state") === @streamReadable)
- @readableByteStreamControllerError(controller, error);
- });
}
@@ -237,8 +227,7 @@ function readableByteStreamControllerPull(controller)
const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
@assert(@readableStreamHasDefaultReader(stream));
-
- if (@getByIdDirectPrivate(controller, "queue").size > 0) {
+ if (@getByIdDirectPrivate(controller, "queue").content?.isNotEmpty()) {
const entry = @getByIdDirectPrivate(controller, "queue").content.shift();
@getByIdDirectPrivate(controller, "queue").size -= entry.byteLength;
@readableByteStreamControllerHandleQueueDrain(controller);
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
index cfe68d13c..db7cf85a8 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
@@ -29,7 +29,7 @@ function initializeReadableStream(underlyingSource, strategy)
"use strict";
if (underlyingSource === @undefined)
- underlyingSource = { };
+ underlyingSource = { @bunNativeType: 0, @bunNativePtr: 0, @lazy: false };
if (strategy === @undefined)
strategy = { };
@@ -49,6 +49,8 @@ function initializeReadableStream(underlyingSource, strategy)
// Initialized with null value to enable distinction with undefined case.
@putByIdDirectPrivate(this, "readableStreamController", null);
+ @putByIdDirectPrivate(this, "bunNativeType", @getByIdDirectPrivate(underlyingSource, "bunNativeType") ?? 0);
+ @putByIdDirectPrivate(this, "bunNativePtr", @getByIdDirectPrivate(underlyingSource, "bunNativePtr") ?? 0);
const isDirect = underlyingSource.type === "direct";
// direct streams are always lazy
@@ -65,16 +67,20 @@ function initializeReadableStream(underlyingSource, strategy)
return this;
}
if (isDirect) {
- if ("start" in underlyingSource && typeof underlyingSource.start === "function")
- @throwTypeError("\"start\" for direct streams are not implemented yet");
-
- @putByIdDirectPrivate(this, "start", () => @createReadableStreamController.@call(this, underlyingSource, strategy, true));
+ @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy));
} else if (isLazy) {
const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize;
- @putByIdDirectPrivate(this, "start", () => @lazyLoadStream(this, autoAllocateChunkSize));
+
+
+ @putByIdDirectPrivate(this, "start", () => {
+ const instance = @lazyLoadStream(this, autoAllocateChunkSize);
+ if (instance) {
+ @createReadableStreamController(this, instance, strategy);
+ }
+ });
} else {
@putByIdDirectPrivate(this, "start", @undefined);
- @createReadableStreamController.@call(this, underlyingSource, strategy, false);
+ @createReadableStreamController(this, underlyingSource, strategy);
}
@@ -327,6 +333,8 @@ function consumeReadableStream(nativePtr, nativeType, inputStream) {
@globalPrivate
function createEmptyReadableStream() {
+ "use strict";
+
var stream = new @ReadableStream({
pull() {},
});
@@ -337,13 +345,12 @@ function createEmptyReadableStream() {
@globalPrivate
function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) {
"use strict";
- stream = new @ReadableStream({
+ return new @ReadableStream({
@lazy: true,
+ @bunNativeType: nativeType,
+ @bunNativePtr: nativePtr,
autoAllocateChunkSize: autoAllocateChunkSize,
});
- @putByIdDirectPrivate(stream, "bunNativeType", nativeType);
- @putByIdDirectPrivate(stream, "bunNativePtr", nativePtr);
- return stream;
}
function cancel(reason)
@@ -370,8 +377,10 @@ function getReader(options)
if (mode === @undefined) {
var start_ = @getByIdDirectPrivate(this, "start");
if (start_) {
- start_.@call(this);
+ @putByIdDirectPrivate(this, "start", @undefined);
+ start_();
}
+
return new @ReadableStreamDefaultReader(this);
}
// String conversion is required by spec, hence double equals.
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
index d2dfd5137..3e6590f31 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
@@ -143,10 +143,8 @@ function setupReadableStreamDefaultController(stream, underlyingSource, size, hi
}
-function createReadableStreamController(underlyingSource, strategy, fromLazy) {
- if (fromLazy) {
- @putByIdDirectPrivate(this, "start", @undefined);
- }
+function createReadableStreamController(stream, underlyingSource, strategy) {
+ "use strict";
const type = underlyingSource.type;
const typeString = @toString(type);
@@ -160,15 +158,15 @@ function createReadableStreamController(underlyingSource, strategy, fromLazy) {
if (strategy.size !== @undefined)
@throwRangeError("Strategy for a ReadableByteStreamController cannot have a size");
- @putByIdDirectPrivate(this, "readableStreamController", new @ReadableByteStreamController(this, underlyingSource, strategy.highWaterMark, @isReadableStream));
+ @putByIdDirectPrivate(stream, "readableStreamController", new @ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, @isReadableStream));
} else if (typeString === "direct") {
var highWaterMark = strategy?.highWaterMark;
- @initializeArrayBufferStream.@call(this, underlyingSource, highWaterMark);
+ @initializeArrayBufferStream.@call(stream, underlyingSource, highWaterMark);
} else if (type === @undefined) {
if (strategy.highWaterMark === @undefined)
strategy.highWaterMark = 1;
- @setupReadableStreamDefaultController(this, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel);
+ @setupReadableStreamDefaultController(stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel);
} else
@throwRangeError("Invalid type for underlying source");
@@ -313,6 +311,8 @@ function pipeToDoReadWrite(pipeState)
function pipeToErrorsMustBePropagatedForward(pipeState)
{
+ "use strict";
+
const action = () => {
pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
const error = @getByIdDirectPrivate(pipeState.source, "storedError");
@@ -351,6 +351,7 @@ function pipeToErrorsMustBePropagatedBackward(pipeState)
function pipeToClosingMustBePropagatedForward(pipeState)
{
+ "use strict";
const action = () => {
pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
const error = @getByIdDirectPrivate(pipeState.source, "storedError");
@@ -369,6 +370,7 @@ function pipeToClosingMustBePropagatedForward(pipeState)
function pipeToClosingMustBePropagatedBackward(pipeState)
{
+ "use strict";
if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed")
return;
@@ -1167,14 +1169,12 @@ function readableStreamDefaultControllerCanCloseOrEnqueue(controller)
function lazyLoadStream(stream, autoAllocateChunkSize) {
"use strict";
- @putByIdDirectPrivate(stream, "start", @undefined);
- var bunNativeType = @getByIdDirectPrivate(stream, "bunNativeType");
- var bunNativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
-
- var cached = globalThis[globalThis.Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map;
+ var nativeType = @getByIdDirectPrivate(stream, "bunNativeType");
+ var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
+ var cached = @lazyStreamPrototypeMap;
var Prototype = cached.@get(nativeType);
if (Prototype === @undefined) {
- var [pull, start, cancel, setClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType);
+ var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);
var closer = [false];
var handleResult;
function handleNativeReadableStreamPromiseResult(val) {
@@ -1236,24 +1236,20 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
cancel_(reason) {
cancel(this, reason);
}
-
+ static deinit = deinit;
static registry = new FinalizationRegistry(deinit);
}
cached.@set(nativeType, Prototype);
}
- // either returns the chunk size
- // or throws an error
- // should never return a Promise
const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
// empty file, no need for native back-and-forth on this
if (chunkSize === 0) {
@readableStreamClose(stream);
- return;
+ return null;
}
-
var instance = new Prototype(nativePtr, chunkSize);
Prototype.registry.register(instance, nativePtr);
- @createReadableStreamController.@call(stream, instance, @undefined, true);
+ return instance;
} \ No newline at end of file