aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableStream.js')
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStream.js84
1 files changed, 48 insertions, 36 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
index 5a5ea4094..4d7113888 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
@@ -89,79 +89,69 @@ function initializeReadableStream(underlyingSource, strategy)
}
@globalPrivate
-function createNativeReadableStream(nativePtr, nativeType) {
+function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) {
"use strict";
var cached = globalThis[Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map;
var Prototype = cached.@get(nativeType);
if (Prototype === @undefined) {
var [pull, start, cancel, setClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType);
var closer = [false];
-
+var handleResult;
function handleNativeReadableStreamPromiseResult(val) {
"use strict";
- var {r, c} = this;
- this.r = @undefined;
+ var {c, v} = this;
this.c = @undefined;
- r(val, c);
+ this.v = @undefined;
+ handleResult(val, c, v);
}
- function closeNativeReadableStreamOnNextTick(controller) {
- "use strict";
- controller.close();
- controller = @undefined;
- }
-
- var handleResult = function handleResult(result, controller) {
+
+ handleResult = function handleResult(result, controller, view) {
"use strict";
if (result && @isPromise(result)) {
- return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, r: handleResult}), controller.error);
+ return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err));
} else if (result !== false) {
- controller.enqueue(result);
+ if (view && view.byteLength === result) {
+ controller.byobRequest.respondWithNewView(view);
+ } else {
+ controller.byobRequest.respond(result);
+ }
}
if (closer[0] || result === false) {
- @enqueueJob(closeNativeReadableStreamOnNextTick, controller);
+ @enqueueJob(() => controller.close());
closer[0] = false;
}
- }
+ };
Prototype = class NativeReadableStreamSource {
- constructor(tag) {
+ constructor(tag, autoAllocateChunkSize) {
this.pull = this.pull_.bind(tag);
- this.start = this.start_.bind(tag);
this.cancel = this.cancel_.bind(tag);
+ this.autoAllocateChunkSize = autoAllocateChunkSize;
}
pull;
- start;
cancel;
-
- pull_(controller) {
- closer[0] = false;
- var result;
-
- try {
- result = pull(this, closer);
- } catch(err) {
- return controller.error(err);
- }
- return handleResult(result, controller);
- }
+ type = "bytes";
+ autoAllocateChunkSize = 0;
- start_(controller) {
- setClose(this, controller.close);
+ static startSync = start;
+
+ pull_(controller) {
closer[0] = false;
var result;
+ const view = controller.byobRequest.view;
try {
- result = start(this, closer);
+ result = pull(this, view, closer);
} catch(err) {
return controller.error(err);
}
- return handleResult(result, controller);
+ return handleResult(result, controller, view);
}
cancel_(reason) {
@@ -173,7 +163,29 @@ function createNativeReadableStream(nativePtr, nativeType) {
cached.@set(nativeType, Prototype);
}
- var instance = new Prototype(nativePtr);
+ // either returns the chunk size
+ // or throws an error
+ // should never return a Promise
+ const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+
+ // empty file, no need for native back-and-forth on this
+ if (chunkSize === 0) {
+ return new @ReadableStream({
+ start(controller) {
+ controller.close();
+ },
+
+ pull() {
+
+ },
+
+ cancel() {
+
+ },
+ });
+ }
+
+ var instance = new Prototype(nativePtr, chunkSize);
Prototype.registry.register(instance, nativePtr);
var stream = new @ReadableStream(instance);
@putByIdDirectPrivate(stream, "bunNativeType", nativeType);