aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-11-23 07:14:33 -0800
committerGravatar GitHub <noreply@github.com> 2022-11-23 07:14:33 -0800
commitac36ea51cfb85130403ac09299f8e1207bad4bcb (patch)
treea05bc2d34295bc0087b68b799155f18050451721 /src/bun.js/builtins/js/ReadableStreamInternals.js
parentae3fcb5bd89a4ac908ba6d4cdb1be4e7c7f0ea81 (diff)
downloadbun-ac36ea51cfb85130403ac09299f8e1207bad4bcb.tar.gz
bun-ac36ea51cfb85130403ac09299f8e1207bad4bcb.tar.zst
bun-ac36ea51cfb85130403ac09299f8e1207bad4bcb.zip
possibly more reliable Bun.spawn (#1547)
* wip * wip * Fix bug with stdin * zig fmt * seems to work! * Update streams.test.js Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js89
1 files changed, 65 insertions, 24 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index e8c9667a0..def4d51a3 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -839,7 +839,7 @@ function readDirectStream(stream, sink, underlyingSource) {
if (highWaterMark) {
sink.start({
- highWaterMark,
+ highWaterMark: highWaterMark < 64 ? 64 : highWaterMark,
});
}
@@ -1857,7 +1857,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
var Prototype = @lazyStreamPrototypeMap.@get(nativeType);
if (Prototype === @undefined) {
- var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);
+ var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType);
var closer = [false];
var handleResult;
function handleNativeReadableStreamPromiseResult(val) {
@@ -1870,8 +1870,6 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
handleResult = function handleResult(result, controller, view) {
"use strict";
-
-
if (result && @isPromise(result)) {
return result.then(
handleNativeReadableStreamPromiseResult.bind({
@@ -1896,53 +1894,96 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
}
};
+ function createResult(tag, controller, view, closer) {
+ closer[0] = false;
+
+ var result;
+ try {
+ result = pull(tag, view, closer);
+ } catch (err) {
+ return controller.error(err);
+ }
+
+ return handleResult(result, controller, view);
+ }
+
Prototype = class NativeReadableStreamSource {
- constructor(tag, autoAllocateChunkSize) {
- this.pull = this.pull_.bind(tag);
- this.cancel = this.cancel_.bind(tag);
+ constructor(tag, autoAllocateChunkSize, drainValue) {
+ this.#tag = tag;
+ this.pull = this.#pull.bind(this);
+ this.cancel = this.#cancel.bind(this);
this.autoAllocateChunkSize = autoAllocateChunkSize;
+
+ if (drainValue !== @undefined) {
+ this.start = (controller) => {
+ controller.enqueue(drainValue);
+ console.log("chunkSize", chunkSize);
+ };
+ }
}
pull;
cancel;
+ start;
+ #tag;
type = "bytes";
autoAllocateChunkSize = 0;
-
+
static startSync = start;
+
+
+ #pull(controller) {
+ var tag = this.#tag;
- pull_(controller) {
- closer[0] = false;
-
- var result;
-
- const view = controller.byobRequest.view;
- try {
- result = pull(this, view, closer);
- } catch (err) {
- return controller.error(err);
+ if (!tag) {
+ controller.close();
+ return;
}
- return handleResult(result, controller, view);
+ createResult(tag, controller, controller.byobRequest.view, closer);
}
- cancel_(reason) {
- cancel(this, reason);
+ #cancel(reason) {
+ var tag = this.#tag;
+ setRefOrUnref && setRefOrUnref(tag, false);
+ cancel(tag, reason);
}
static deinit = deinit;
static registry = new FinalizationRegistry(deinit);
+ static drain = drain;
};
@lazyStreamPrototypeMap.@set(nativeType, Prototype);
}
const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+ var drainValue;
+ const drainFn = Prototype.drain;
+ if (drainFn) {
+ drainValue = drainFn(nativePtr);
+ }
// empty file, no need for native back-and-forth on this
if (chunkSize === 0) {
- @readableStreamClose(stream);
- return null;
+ if ((drainValue?.byteLength ?? 0) > 0) {
+ deinit && nativePtr && @enqueueJob(deinit, nativePtr);
+ return {
+ start(controller) {
+ controller.enqueue(drainValue);
+ controller.close();
+ },
+ type: "bytes",
+ };
+ }
+
+ return {
+ start(controller) {
+ controller.close();
+ },
+ type: "bytes",
+ };
}
- var instance = new Prototype(nativePtr, chunkSize);
+ var instance = new Prototype(nativePtr, chunkSize, drainValue);
Prototype.registry.register(instance, nativePtr);
return instance;
}