diff options
author | 2022-07-02 01:31:35 -0700 | |
---|---|---|
committer | 2022-07-02 01:36:04 -0700 | |
commit | 500e5ed949b578b6e075b3afafb850244963f55a (patch) | |
tree | 2448389b199011423795688fe262ebbb123e7233 /src/bun.js/builtins/js/ReadableStreamInternals.js | |
parent | b995e7797d8d2ece9fe2f68eb0b29908a733356d (diff) | |
download | bun-500e5ed949b578b6e075b3afafb850244963f55a.tar.gz bun-500e5ed949b578b6e075b3afafb850244963f55a.tar.zst bun-500e5ed949b578b6e075b3afafb850244963f55a.zip |
[streams] Rename `drain()` -> `flush()`
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 110 |
1 files changed, 58 insertions, 52 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index c959f24fa..353e3f9e9 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -782,21 +782,29 @@ function isReadableStreamDefaultController(controller) { function readDirectStream(stream, sink, underlyingSource) { "use strict"; + + @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + + var {close: originalClose, pull} = underlyingSource; + underlyingSource = @undefined; + - var originalClose = underlyingSource.close; - var reader; + var fakeReader = { + }; var close = (reason) => { - originalClose && originalClose(reason); try { - reader && reader.releaseLock(); - } catch (e) {} - @readableStreamClose(stream, reason); - @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + originalClose && originalClose(reason); + } catch (e) { + + } + originalClose = @undefined; + @putByIdDirectPrivate(stream, "reader", @undefined); @putByIdDirectPrivate(stream, "readableStreamController", null); - close = @undefined; - reader = @undefined; + @putByIdDirectPrivate(stream, "state", @streamClosed); + stream = @undefined; + fakeReader = @undefined; }; - var pull = underlyingSource.pull; + if (!pull) { close(); @@ -811,7 +819,6 @@ function readDirectStream(stream, sink, underlyingSource) { @putByIdDirectPrivate(stream, "readableStreamController", sink); @putByIdDirectPrivate(stream, "start", @undefined); - @putByIdDirectPrivate(stream, "underlyingSource", @undefined); const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); @@ -823,10 +830,10 @@ function readDirectStream(stream, sink, underlyingSource) { @startDirectStream.@call(sink, stream, pull, close); - // lock the stream, relying on close() or end() to eventaully close it - reader = stream.getReader(); - + // isReadableStreamLocked() checks for truthiness of "reader" + @putByIdDirectPrivate(stream, "reader", fakeReader); pull(sink); + sink = @undefined; } @globalPrivate; @@ -976,7 +983,7 @@ function handleDirectStreamError(e) { } this.error = - this.drain = + this.flush = this.write = this.close = this.end = @@ -1019,9 +1026,9 @@ function onPullDirectStream(controller) { } controller._deferClose = -1; - controller._deferDrain = -1; + controller._deferFlush = -1; var deferClose; - var deferDrain; + var deferFlush; // Direct streams allow @pull to be called multiple times, unlike the spec. // Backpressure is handled by the destination, not by the underlying source. @@ -1044,8 +1051,8 @@ function onPullDirectStream(controller) { return @handleDirectStreamErrorReject.@call(controller, e); } finally { deferClose = controller._deferClose; - deferDrain = controller._deferDrain; - controller._deferDrain = controller._deferClose = 0; + deferFlush = controller._deferFlush; + controller._deferFlush = controller._deferClose = 0; } var promiseToReturn; @@ -1065,9 +1072,9 @@ function onPullDirectStream(controller) { return promiseToReturn; } - // not done, but they called drain() - if (deferDrain === 1) { - @onDrainDirectStream.@call(controller); + // not done, but they called flush() + if (deferFlush === 1) { + @onFlushDirectStream.@call(controller); } return promiseToReturn; @@ -1101,9 +1108,9 @@ function onCloseDirectStream(reason) { } catch (e) {} } - var drained; + var flushed; try { - drained = this.@sink.end(); + flushed = this.@sink.end(); @putByIdDirectPrivate(this, "sink", @undefined); } catch (e) { if (this._pendingRead) { @@ -1116,7 +1123,7 @@ function onCloseDirectStream(reason) { } this.error = - this.drain = + this.flush = this.write = this.close = this.end = @@ -1126,18 +1133,18 @@ function onCloseDirectStream(reason) { if (reader && @isReadableStreamDefaultReader(reader)) { var _pendingRead = this._pendingRead; - if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { + if (_pendingRead && @isPromise(_pendingRead) && flushed?.byteLength) { this._pendingRead = @undefined; - @fulfillPromise(_pendingRead, { value: drained, done: false }); + @fulfillPromise(_pendingRead, { value: flushed, done: false }); @readableStreamClose(stream); return; } } - if (drained?.byteLength) { + if (flushed?.byteLength) { var requests = @getByIdDirectPrivate(reader, "readRequests"); if (requests?.isNotEmpty()) { - @readableStreamFulfillReadRequest(stream, drained, false); + @readableStreamFulfillReadRequest(stream, flushed, false); @readableStreamClose(stream); return; } @@ -1145,10 +1152,10 @@ function onCloseDirectStream(reason) { @putByIdDirectPrivate(stream, "state", @streamReadable); this.@pull = () => { var thisResult = @createFulfilledPromise({ - value: drained, + value: flushed, done: false, }); - drained = @undefined; + flushed = @undefined; @readableStreamClose(stream); stream = @undefined; return thisResult; @@ -1163,7 +1170,7 @@ function onCloseDirectStream(reason) { @readableStreamClose(stream); } -function onDrainDirectStream() { +function onFlushDirectStream() { "use strict"; var stream = this.@controlledReadableStream; @@ -1175,23 +1182,23 @@ function onDrainDirectStream() { var _pendingRead = this._pendingRead; this._pendingRead = @undefined; if (_pendingRead && @isPromise(_pendingRead)) { - var drained = this.@sink.drain(); - if (drained?.byteLength) { + var flushed = this.@sink.flush(); + if (flushed?.byteLength) { this._pendingRead = @getByIdDirectPrivate( stream, "readRequests" )?.shift(); - @fulfillPromise(_pendingRead, { value: drained, done: false }); + @fulfillPromise(_pendingRead, { value: flushed, done: false }); } else { this._pendingRead = _pendingRead; } } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { - var drained = this.@sink.drain(); - if (drained?.byteLength) { - @readableStreamFulfillReadRequest(stream, drained, false); + var flushed = this.@sink.flush(); + if (flushed?.byteLength) { + @readableStreamFulfillReadRequest(stream, flushed, false); } - } else if (this._deferDrain === -1) { - this._deferDrain = 1; + } else if (this._deferFlush === -1) { + this._deferFlush = 1; } } @@ -1243,7 +1250,7 @@ function createTextStream(highWaterMark) { return byteLength; }, - drain() { + flush() { return 0; }, @@ -1324,10 +1331,10 @@ function initializeTextStream(underlyingSource, highWaterMark) { error: @handleDirectStreamError, end: @onCloseDirectStream, @close: @onCloseDirectStream, - drain: @onDrainDirectStream, + flush: @onFlushDirectStream, _pendingRead: @undefined, _deferClose: 0, - _deferDrain: 0, + _deferFlush: 0, _deferCloseReason: @undefined, _handleError: @undefined, }; @@ -1358,7 +1365,7 @@ function initializeArrayStream(underlyingSource, highWaterMark) { return chunk.byteLength || chunk.length; }, - drain() { + flush() { return 0; }, @@ -1386,10 +1393,10 @@ function initializeArrayStream(underlyingSource, highWaterMark) { error: @handleDirectStreamError, end: @onCloseDirectStream, @close: @onCloseDirectStream, - drain: @onDrainDirectStream, + flush: @onFlushDirectStream, _pendingRead: @undefined, _deferClose: 0, - _deferDrain: 0, + _deferFlush: 0, _deferCloseReason: @undefined, _handleError: @undefined, }; @@ -1424,10 +1431,10 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) { error: @handleDirectStreamError, end: @onCloseDirectStream, @close: @onCloseDirectStream, - drain: @onDrainDirectStream, + flush: @onFlushDirectStream, _pendingRead: @undefined, _deferClose: 0, - _deferDrain: 0, + _deferFlush: 0, _deferCloseReason: @undefined, _handleError: @undefined, }; @@ -1808,8 +1815,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { var nativeType = @getByIdDirectPrivate(stream, "bunNativeType"); var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); - var cached = @lazyStreamPrototypeMap; - var Prototype = cached.@get(nativeType); + var Prototype = @lazyStreamPrototypeMap.@get(nativeType); if (Prototype === @undefined) { var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); var closer = [false]; @@ -1882,7 +1888,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { static deinit = deinit; static registry = new FinalizationRegistry(deinit); }; - cached.@set(nativeType, Prototype); + @lazyStreamPrototypeMap.@set(nativeType, Prototype); } const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); @@ -1974,7 +1980,7 @@ function readableStreamToArrayBufferDirect(stream, underlyingSource) { @fulfillPromise(capability.@promise, sink.end()); } }, - drain() { + flush() { return 0; }, write: sink.write.bind(sink), |