aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-07-02 01:31:35 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-07-02 01:36:04 -0700
commit500e5ed949b578b6e075b3afafb850244963f55a (patch)
tree2448389b199011423795688fe262ebbb123e7233 /src/bun.js/builtins/js/ReadableStreamInternals.js
parentb995e7797d8d2ece9fe2f68eb0b29908a733356d (diff)
downloadbun-500e5ed949b578b6e075b3afafb850244963f55a.tar.gz
bun-500e5ed949b578b6e075b3afafb850244963f55a.tar.zst
bun-500e5ed949b578b6e075b3afafb850244963f55a.zip
[streams] Rename `drain()` -> `flush()`
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js110
1 files changed, 58 insertions, 52 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index c959f24fa..353e3f9e9 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -782,21 +782,29 @@ function isReadableStreamDefaultController(controller) {
function readDirectStream(stream, sink, underlyingSource) {
"use strict";
+
+ @putByIdDirectPrivate(stream, "underlyingSource", @undefined);
+
+ var {close: originalClose, pull} = underlyingSource;
+ underlyingSource = @undefined;
+
- var originalClose = underlyingSource.close;
- var reader;
+ var fakeReader = {
+ };
var close = (reason) => {
- originalClose && originalClose(reason);
try {
- reader && reader.releaseLock();
- } catch (e) {}
- @readableStreamClose(stream, reason);
- @putByIdDirectPrivate(stream, "underlyingSource", @undefined);
+ originalClose && originalClose(reason);
+ } catch (e) {
+
+ }
+ originalClose = @undefined;
+ @putByIdDirectPrivate(stream, "reader", @undefined);
@putByIdDirectPrivate(stream, "readableStreamController", null);
- close = @undefined;
- reader = @undefined;
+ @putByIdDirectPrivate(stream, "state", @streamClosed);
+ stream = @undefined;
+ fakeReader = @undefined;
};
- var pull = underlyingSource.pull;
+
if (!pull) {
close();
@@ -811,7 +819,6 @@ function readDirectStream(stream, sink, underlyingSource) {
@putByIdDirectPrivate(stream, "readableStreamController", sink);
@putByIdDirectPrivate(stream, "start", @undefined);
- @putByIdDirectPrivate(stream, "underlyingSource", @undefined);
const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");
@@ -823,10 +830,10 @@ function readDirectStream(stream, sink, underlyingSource) {
@startDirectStream.@call(sink, stream, pull, close);
- // lock the stream, relying on close() or end() to eventaully close it
- reader = stream.getReader();
-
+ // isReadableStreamLocked() checks for truthiness of "reader"
+ @putByIdDirectPrivate(stream, "reader", fakeReader);
pull(sink);
+ sink = @undefined;
}
@globalPrivate;
@@ -976,7 +983,7 @@ function handleDirectStreamError(e) {
}
this.error =
- this.drain =
+ this.flush =
this.write =
this.close =
this.end =
@@ -1019,9 +1026,9 @@ function onPullDirectStream(controller) {
}
controller._deferClose = -1;
- controller._deferDrain = -1;
+ controller._deferFlush = -1;
var deferClose;
- var deferDrain;
+ var deferFlush;
// Direct streams allow @pull to be called multiple times, unlike the spec.
// Backpressure is handled by the destination, not by the underlying source.
@@ -1044,8 +1051,8 @@ function onPullDirectStream(controller) {
return @handleDirectStreamErrorReject.@call(controller, e);
} finally {
deferClose = controller._deferClose;
- deferDrain = controller._deferDrain;
- controller._deferDrain = controller._deferClose = 0;
+ deferFlush = controller._deferFlush;
+ controller._deferFlush = controller._deferClose = 0;
}
var promiseToReturn;
@@ -1065,9 +1072,9 @@ function onPullDirectStream(controller) {
return promiseToReturn;
}
- // not done, but they called drain()
- if (deferDrain === 1) {
- @onDrainDirectStream.@call(controller);
+ // not done, but they called flush()
+ if (deferFlush === 1) {
+ @onFlushDirectStream.@call(controller);
}
return promiseToReturn;
@@ -1101,9 +1108,9 @@ function onCloseDirectStream(reason) {
} catch (e) {}
}
- var drained;
+ var flushed;
try {
- drained = this.@sink.end();
+ flushed = this.@sink.end();
@putByIdDirectPrivate(this, "sink", @undefined);
} catch (e) {
if (this._pendingRead) {
@@ -1116,7 +1123,7 @@ function onCloseDirectStream(reason) {
}
this.error =
- this.drain =
+ this.flush =
this.write =
this.close =
this.end =
@@ -1126,18 +1133,18 @@ function onCloseDirectStream(reason) {
if (reader && @isReadableStreamDefaultReader(reader)) {
var _pendingRead = this._pendingRead;
- if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) {
+ if (_pendingRead && @isPromise(_pendingRead) && flushed?.byteLength) {
this._pendingRead = @undefined;
- @fulfillPromise(_pendingRead, { value: drained, done: false });
+ @fulfillPromise(_pendingRead, { value: flushed, done: false });
@readableStreamClose(stream);
return;
}
}
- if (drained?.byteLength) {
+ if (flushed?.byteLength) {
var requests = @getByIdDirectPrivate(reader, "readRequests");
if (requests?.isNotEmpty()) {
- @readableStreamFulfillReadRequest(stream, drained, false);
+ @readableStreamFulfillReadRequest(stream, flushed, false);
@readableStreamClose(stream);
return;
}
@@ -1145,10 +1152,10 @@ function onCloseDirectStream(reason) {
@putByIdDirectPrivate(stream, "state", @streamReadable);
this.@pull = () => {
var thisResult = @createFulfilledPromise({
- value: drained,
+ value: flushed,
done: false,
});
- drained = @undefined;
+ flushed = @undefined;
@readableStreamClose(stream);
stream = @undefined;
return thisResult;
@@ -1163,7 +1170,7 @@ function onCloseDirectStream(reason) {
@readableStreamClose(stream);
}
-function onDrainDirectStream() {
+function onFlushDirectStream() {
"use strict";
var stream = this.@controlledReadableStream;
@@ -1175,23 +1182,23 @@ function onDrainDirectStream() {
var _pendingRead = this._pendingRead;
this._pendingRead = @undefined;
if (_pendingRead && @isPromise(_pendingRead)) {
- var drained = this.@sink.drain();
- if (drained?.byteLength) {
+ var flushed = this.@sink.flush();
+ if (flushed?.byteLength) {
this._pendingRead = @getByIdDirectPrivate(
stream,
"readRequests"
)?.shift();
- @fulfillPromise(_pendingRead, { value: drained, done: false });
+ @fulfillPromise(_pendingRead, { value: flushed, done: false });
} else {
this._pendingRead = _pendingRead;
}
} else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) {
- var drained = this.@sink.drain();
- if (drained?.byteLength) {
- @readableStreamFulfillReadRequest(stream, drained, false);
+ var flushed = this.@sink.flush();
+ if (flushed?.byteLength) {
+ @readableStreamFulfillReadRequest(stream, flushed, false);
}
- } else if (this._deferDrain === -1) {
- this._deferDrain = 1;
+ } else if (this._deferFlush === -1) {
+ this._deferFlush = 1;
}
}
@@ -1243,7 +1250,7 @@ function createTextStream(highWaterMark) {
return byteLength;
},
- drain() {
+ flush() {
return 0;
},
@@ -1324,10 +1331,10 @@ function initializeTextStream(underlyingSource, highWaterMark) {
error: @handleDirectStreamError,
end: @onCloseDirectStream,
@close: @onCloseDirectStream,
- drain: @onDrainDirectStream,
+ flush: @onFlushDirectStream,
_pendingRead: @undefined,
_deferClose: 0,
- _deferDrain: 0,
+ _deferFlush: 0,
_deferCloseReason: @undefined,
_handleError: @undefined,
};
@@ -1358,7 +1365,7 @@ function initializeArrayStream(underlyingSource, highWaterMark) {
return chunk.byteLength || chunk.length;
},
- drain() {
+ flush() {
return 0;
},
@@ -1386,10 +1393,10 @@ function initializeArrayStream(underlyingSource, highWaterMark) {
error: @handleDirectStreamError,
end: @onCloseDirectStream,
@close: @onCloseDirectStream,
- drain: @onDrainDirectStream,
+ flush: @onFlushDirectStream,
_pendingRead: @undefined,
_deferClose: 0,
- _deferDrain: 0,
+ _deferFlush: 0,
_deferCloseReason: @undefined,
_handleError: @undefined,
};
@@ -1424,10 +1431,10 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) {
error: @handleDirectStreamError,
end: @onCloseDirectStream,
@close: @onCloseDirectStream,
- drain: @onDrainDirectStream,
+ flush: @onFlushDirectStream,
_pendingRead: @undefined,
_deferClose: 0,
- _deferDrain: 0,
+ _deferFlush: 0,
_deferCloseReason: @undefined,
_handleError: @undefined,
};
@@ -1808,8 +1815,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
var nativeType = @getByIdDirectPrivate(stream, "bunNativeType");
var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
- var cached = @lazyStreamPrototypeMap;
- var Prototype = cached.@get(nativeType);
+ var Prototype = @lazyStreamPrototypeMap.@get(nativeType);
if (Prototype === @undefined) {
var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);
var closer = [false];
@@ -1882,7 +1888,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
static deinit = deinit;
static registry = new FinalizationRegistry(deinit);
};
- cached.@set(nativeType, Prototype);
+ @lazyStreamPrototypeMap.@set(nativeType, Prototype);
}
const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
@@ -1974,7 +1980,7 @@ function readableStreamToArrayBufferDirect(stream, underlyingSource) {
@fulfillPromise(capability.@promise, sink.end());
}
},
- drain() {
+ flush() {
return 0;
},
write: sink.write.bind(sink),