diff options
author | 2022-06-15 07:17:42 -0700 | |
---|---|---|
committer | 2022-06-15 07:17:42 -0700 | |
commit | 56e88fb4dd06e07569ddc3861e2e8e21f71e45b8 (patch) | |
tree | a355df05ffd4665916c3eefb8370e154d1128f72 /src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js | |
parent | d93f09331394148441d142930fea236a9fd73c5c (diff) | |
download | bun-56e88fb4dd06e07569ddc3861e2e8e21f71e45b8.tar.gz bun-56e88fb4dd06e07569ddc3861e2e8e21f71e45b8.tar.zst bun-56e88fb4dd06e07569ddc3861e2e8e21f71e45b8.zip |
direct streams mostly workjarred/direct
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js | 500 |
1 files changed, 454 insertions, 46 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js index 5d0779745..d2dfd5137 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js @@ -67,48 +67,6 @@ function privateInitializeReadableStreamDefaultController(stream, underlyingSour return this; } - -// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. -// The other part is implemented in privateInitializeReadableStreamDefaultController. -function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod) -{ - "use strict"; - - const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); - - const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); - const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); - - @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); - @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); - @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull); - @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); - @putByIdDirectPrivate(stream, "readableStreamController", controller); - - if (@getByIdDirectPrivate(controller, "sink") === @undefined) { - @readableStreamDefaultControllerStart(controller); - } - -} - -function readableStreamDefaultControllerStart(controller) { - if (@getByIdDirectPrivate(controller, "started") !== -1) - return; - - const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); - const startMethod = underlyingSource.start; - @putByIdDirectPrivate(controller, "started", 0); - - return @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { - @putByIdDirectPrivate(controller, "started", 1); - @assert(!@getByIdDirectPrivate(controller, "pulling")); - @assert(!@getByIdDirectPrivate(controller, "pullAgain")); - @readableStreamDefaultControllerCallPullIfNeeded(controller); - }, (error) => { - @readableStreamDefaultControllerError(controller, error); - }); -} - function readableStreamDefaultControllerError(controller, error) { "use strict"; @@ -157,12 +115,88 @@ function acquireReadableStreamDefaultReader(stream) "use strict"; var start = @getByIdDirectPrivate(stream, "start"); if (start) { - start(); + start.@call(stream); } return new @ReadableStreamDefaultReader(stream); } +// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. +// The other part is implemented in privateInitializeReadableStreamDefaultController. +function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod) +{ + "use strict"; + + const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); + + const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); + const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); + + @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); + @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); + @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull); + @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); + @putByIdDirectPrivate(stream, "readableStreamController", controller); + + @readableStreamDefaultControllerStart(controller); + +} + + +function createReadableStreamController(underlyingSource, strategy, fromLazy) { + if (fromLazy) { + @putByIdDirectPrivate(this, "start", @undefined); + } + + const type = underlyingSource.type; + const typeString = @toString(type); + + if (typeString === "bytes") { + // if (!@readableByteStreamAPIEnabled()) + // @throwTypeError("ReadableByteStreamController is not implemented"); + + if (strategy.highWaterMark === @undefined) + strategy.highWaterMark = 0; + if (strategy.size !== @undefined) + @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); + + @putByIdDirectPrivate(this, "readableStreamController", new @ReadableByteStreamController(this, underlyingSource, strategy.highWaterMark, @isReadableStream)); + } else if (typeString === "direct") { + var highWaterMark = strategy?.highWaterMark; + @initializeArrayBufferStream.@call(this, underlyingSource, highWaterMark); + } else if (type === @undefined) { + if (strategy.highWaterMark === @undefined) + strategy.highWaterMark = 1; + + @setupReadableStreamDefaultController(this, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); + } else + @throwRangeError("Invalid type for underlying source"); + +} + +function readableStreamDefaultControllerStart(controller) { + "use strict"; + + + + if (@getByIdDirectPrivate(controller, "started") !== -1) + return; + + const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); + const startMethod = underlyingSource.start; + @putByIdDirectPrivate(controller, "started", 0); + + @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { + @putByIdDirectPrivate(controller, "started", 1); + @assert(!@getByIdDirectPrivate(controller, "pulling")); + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + }, (error) => { + @readableStreamDefaultControllerError(controller, error); + }); +} + + // FIXME: Replace readableStreamPipeTo by below function. // This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal) @@ -563,6 +597,281 @@ function isReadableStreamDefaultController(controller) return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource"); } + +@globalPrivate +function assignDirectStream() { + "use strict"; + + var stream = this; +} + + +function handleDirectStreamError(e) { + "use strict"; + + var controller = this; + var sink = controller.@sink; + if (sink) { + @putByIdDirectPrivate(controller, "sink", @undefined); + try { + sink.close(e); + } catch (f) {} + } + + this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + + if (typeof this.@underlyingSource.close === 'function') { + try { + this.@underlyingSource.close.@call(this.@underlyingSource, e); + } catch (e) { + } + } + + try { + var pend = controller._pendingRead; + if (pend) { + controller._pendingRead = @undefined; + @rejectPromise(pend, e); + } + } catch (f) {} + var stream = controller.@controlledReadableStream; + if (stream) @readableStreamError(stream, e); +} + +function handleDirectStreamErrorReject(e) { + @handleDirectStreamError.@call(this, e); + return @Promise.@reject(e); +} + +function onPullDirectStream(controller) +{ + + "use strict"; + + var stream = controller.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + // pull is in progress + // this is a recursive call + // ignore it + if (controller._deferClose === -1) { + return; + } + + + controller._deferClose = -1; + controller._deferDrain = -1; + var deferClose; + var deferDrain; + + // Direct streams allow @pull to be called multiple times, unlike the spec. + // Backpressure is handled by the destination, not by the underlying source. + // In this case, we rely on the heuristic that repeatedly draining in the same tick + // is bad for performance + // this code is only run when consuming a direct stream from JS + // without the HTTP server or anything else + try { + var result = controller.@underlyingSource.pull( + controller, + ); + + if (result && @isPromise(result)) { + if (controller._handleError === @undefined) { + controller._handleError = @handleDirectStreamErrorReject.bind(controller); + } + + @Promise.prototype.catch.@call(result, controller._handleError); + } + } catch(e) { + return @handleDirectStreamErrorReject.@call(controller, e); + } finally { + deferClose = controller._deferClose; + deferDrain = controller._deferDrain; + controller._deferDrain = controller._deferClose = 0; + } + + + var promiseToReturn; + + + if (controller._pendingRead === @undefined) { + controller._pendingRead = promiseToReturn = @newPromise(); + } else { + promiseToReturn = @readableStreamAddReadRequest(stream); + } + + + // they called close during @pull() + // we delay that + if (deferClose === 1) { + var reason = controller._deferCloseReason; + controller._deferCloseReason = @undefined; + @onCloseDirectStream.@call(controller, reason); + return promiseToReturn; + } + + // not done, but they called drain() + if (deferDrain === 1) { + @onDrainDirectStream.@call(controller); + } + + + return promiseToReturn; +} + +function noopDoneFunction() { + return @Promise.@resolve({value: @undefined, done: true}); +} + +function onReadableStreamDirectControllerClosed(reason) +{ + "use strict"; + @throwTypeError("ReadableStreamDirectController is now closed"); +} + +function onCloseDirectStream(reason) +{ + "use strict"; + var stream = this.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + if (this._deferClose !== 0) { + this._deferClose = 1; + this._deferCloseReason = reason; + return; + } + + @putByIdDirectPrivate(stream, "state", @streamClosing); + if (typeof this.@underlyingSource.close === 'function') { + try { + this.@underlyingSource.close.@call(this.@underlyingSource, reason); + } catch (e) { + + } + } + + var drained; + try { + drained = this.@sink.end(); + @putByIdDirectPrivate(this, "sink", @undefined); + } catch (e) { + if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @rejectPromise(read, e); + } + @readableStreamError(stream, e); + return; + } + + this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + + var reader = @getByIdDirectPrivate(stream, "reader"); + + if (reader && @isReadableStreamDefaultReader(reader)) { + var _pendingRead = this._pendingRead; + if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { + this._pendingRead = @undefined; + @fulfillPromise(_pendingRead, {value: drained, done: false}); + @readableStreamClose(stream); + return; + } + } + + if (drained?.byteLength) { + var requests = @getByIdDirectPrivate(reader, "readRequests"); + if (requests?.isNotEmpty()) { + @readableStreamFulfillReadRequest(stream, drained, false); + @readableStreamClose(stream); + return; + } + + @putByIdDirectPrivate(stream, "state", @streamReadable); + this.@pull = () => { + var thisResult = @createFulfilledPromise({value: drained, done: false}); + drained = @undefined; + @readableStreamClose(stream); + stream = @undefined; + return thisResult; + }; + } else if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @putByIdDirectPrivate(this, "pull", @noopDoneFunction); + @fulfillPromise(read, {value: @undefined, done: true}); + } + + @readableStreamClose(stream); +} + +function onDrainDirectStream() +{ + "use strict"; + + var stream = this.@controlledReadableStream; + var reader = @getByIdDirectPrivate(stream, "reader"); + if (!reader || !@isReadableStreamDefaultReader(reader)) { + return; + } + + var _pendingRead = this._pendingRead; + this._pendingRead = @undefined; + if (_pendingRead && @isPromise(_pendingRead)) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift(); + @fulfillPromise(_pendingRead, {value: drained, done: false}); + } else { + this._pendingRead = _pendingRead; + } + } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + @readableStreamFulfillReadRequest(stream, drained, false); + } + } else if (this._deferDrain === -1) { + this._deferDrain = 1; + } + +} + +function initializeArrayBufferStream(underlyingSource, highWaterMark) +{ + "use strict"; + + // This is the fallback implementation for direct streams + // When we don't know what the destination type is + // We assume it is a Uint8Array. + + var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; + var sink = new globalThis.Bun.ArrayBufferSink(); + sink.start(opts); + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write.bind(sink), + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + + @putByIdDirectPrivate(this, "readableStreamController", controller); + +} + function readableStreamError(stream, error) { "use strict"; @@ -597,11 +906,13 @@ function readableStreamError(stream, error) function readableStreamDefaultControllerShouldCallPull(controller) { - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + "use strict"; + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return false; - if (!(@getByIdDirectPrivate(controller, "started") > 0)) + if (!(@getByIdDirectPrivate(controller, "started") === 1)) return false; if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return false; @@ -619,7 +930,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return; - if (!(@getByIdDirectPrivate(controller, "started") > 0)) + if (!(@getByIdDirectPrivate(controller, "started") === 1)) return; if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return; @@ -629,6 +940,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) return; } + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); @putByIdDirectPrivate(controller, "pulling", true); @@ -636,6 +948,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) @putByIdDirectPrivate(controller, "pulling", false); if (@getByIdDirectPrivate(controller, "pullAgain")) { @putByIdDirectPrivate(controller, "pullAgain", false); + @readableStreamDefaultControllerCallPullIfNeeded(controller); } }, function(error) { @@ -849,3 +1162,98 @@ function readableStreamDefaultControllerCanCloseOrEnqueue(controller) return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable; } + + +function lazyLoadStream(stream, autoAllocateChunkSize) { + "use strict"; + + @putByIdDirectPrivate(stream, "start", @undefined); + var bunNativeType = @getByIdDirectPrivate(stream, "bunNativeType"); + var bunNativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); + + var cached = globalThis[globalThis.Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map; + var Prototype = cached.@get(nativeType); + if (Prototype === @undefined) { + var [pull, start, cancel, setClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType); + var closer = [false]; + var handleResult; + function handleNativeReadableStreamPromiseResult(val) { + "use strict"; + var {c, v} = this; + this.c = @undefined; + this.v = @undefined; + handleResult(val, c, v); + } + + handleResult = function handleResult(result, controller, view) { + "use strict"; + + if (result && @isPromise(result)) { + return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); + } else if (result !== false) { + if (view && view.byteLength === result) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } + } + + if (closer[0] || result === false) { + @enqueueJob(() => controller.close()); + closer[0] = false; + } + }; + + Prototype = class NativeReadableStreamSource { + constructor(tag, autoAllocateChunkSize) { + this.pull = this.pull_.bind(tag); + this.cancel = this.cancel_.bind(tag); + this.autoAllocateChunkSize = autoAllocateChunkSize; + } + + pull; + cancel; + + type = "bytes"; + autoAllocateChunkSize = 0; + + static startSync = start; + + pull_(controller) { + closer[0] = false; + var result; + + const view = controller.byobRequest.view; + try { + result = pull(this, view, closer); + } catch(err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + + cancel_(reason) { + cancel(this, reason); + } + + static registry = new FinalizationRegistry(deinit); + } + cached.@set(nativeType, Prototype); + } + + // either returns the chunk size + // or throws an error + // should never return a Promise + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + @readableStreamClose(stream); + return; + } + + var instance = new Prototype(nativePtr, chunkSize); + Prototype.registry.register(instance, nativePtr); + @createReadableStreamController.@call(stream, instance, @undefined, true); +}
\ No newline at end of file |