aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/bindings/JSReadableHelper.cpp228
-rw-r--r--src/bun.js/bindings/JSReadableHelper.h11
-rw-r--r--src/bun.js/bindings/JSReadableState.cpp210
-rw-r--r--src/bun.js/bindings/JSReadableState.h25
-rw-r--r--src/bun.js/bindings/ZigGlobalObject.cpp10
-rw-r--r--src/bun.js/streams.exports.js59
6 files changed, 405 insertions, 138 deletions
diff --git a/src/bun.js/bindings/JSReadableHelper.cpp b/src/bun.js/bindings/JSReadableHelper.cpp
new file mode 100644
index 000000000..ef17982d4
--- /dev/null
+++ b/src/bun.js/bindings/JSReadableHelper.cpp
@@ -0,0 +1,228 @@
+#include "JSReadableHelper.h"
+#include "JSReadableState.h"
+#include "JSBufferList.h"
+#include "JSBuffer.h"
+#include "JSEventEmitter.h"
+#include "JavaScriptCore/Lookup.h"
+#include "JavaScriptCore/ObjectConstructor.h"
+#include "ZigGlobalObject.h"
+#include "JSDOMOperation.h"
+#include "JSDOMAttribute.h"
+#include "headers.h"
+#include "JSDOMConvertEnumeration.h"
+
+namespace WebCore {
+using namespace JSC;
+
+static JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore_);
+JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
+{
+ VM& vm = lexicalGlobalObject->vm();
+ auto throwScope = DECLARE_THROW_SCOPE(vm);
+
+ if (callFrame->argumentCount() < 2) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject);
+ RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined()));
+ JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1));
+ if (!state) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s));
+ auto callData = JSC::getCallData(read);
+ if (callData.type == CallData::Type::None) {
+ throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read));
+ return JSValue::encode(jsUndefined());
+ }
+ MarkedArgumentBuffer args;
+ args.append(jsNumber(0));
+
+ while (
+ !state->m_reading &&
+ !state->m_ended &&
+ (state->m_length < state->m_highWaterMark || (state->m_flowing > 0 && state->m_length == 0))) {
+ int64_t len = state->m_length;
+
+ JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args);
+
+ if (len == state->m_length)
+ break;
+ }
+ RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
+}
+
+JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
+{
+ VM& vm = lexicalGlobalObject->vm();
+ auto throwScope = DECLARE_THROW_SCOPE(vm);
+
+ if (callFrame->argumentCount() < 2) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ JSValue streamVal = callFrame->uncheckedArgument(0);
+ JSValue stateVal = callFrame->uncheckedArgument(1);
+
+ // make this static?
+ JSFunction* maybeReadMore_ = JSC::JSFunction::create(
+ vm, lexicalGlobalObject, 0, "maybeReadMore_"_s, jsReadable_maybeReadMore_, ImplementationVisibility::Public);
+
+ lexicalGlobalObject->queueMicrotask(maybeReadMore_, streamVal, stateVal, JSValue{}, JSValue{});
+ RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
+}
+
+void flow(JSGlobalObject* lexicalGlobalObject, JSObject* stream, JSReadableState* state)
+{
+ VM& vm = lexicalGlobalObject->vm();
+ auto throwScope = DECLARE_THROW_SCOPE(vm);
+
+ auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s));
+ auto callData = JSC::getCallData(read);
+ if (callData.type == CallData::Type::None) {
+ throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read));
+ return;
+ }
+ MarkedArgumentBuffer args;
+
+ while (state->m_flowing > 0) {
+ JSValue ret = JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args);
+ if (ret.isNull())
+ break;
+ }
+}
+
+static JSC_DECLARE_HOST_FUNCTION(jsReadable_resume_);
+JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
+{
+ VM& vm = lexicalGlobalObject->vm();
+ auto throwScope = DECLARE_THROW_SCOPE(vm);
+
+ if (callFrame->argumentCount() < 2) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject);
+ RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined()));
+ JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1));
+ if (!state) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ if (!state->m_reading) {
+ // stream.read(0)
+ auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s));
+ auto callData = JSC::getCallData(read);
+ if (callData.type == CallData::Type::None) {
+ throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read));
+ return JSValue::encode(jsUndefined());
+ }
+ MarkedArgumentBuffer args;
+ args.append(jsNumber(0));
+ JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args);
+ }
+
+ state->m_resumeScheduled = true;
+ // stream.emit('resume')
+ auto eventType = Identifier::fromString(vm, "resume"_s);
+ MarkedArgumentBuffer args;
+ auto emitter = jsDynamicCast<JSEventEmitter*>(stream);
+ if (!emitter) {
+ throwTypeError(lexicalGlobalObject, throwScope, "stream is not EventEmitter"_s);
+ return JSValue::encode(jsUndefined());
+ }
+ emitter->wrapped().emitForBindings(eventType, args);
+
+ flow(lexicalGlobalObject, stream, state);
+
+ if (state->m_flowing && !state->m_reading) {
+ // stream.read(0)
+ auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s));
+ auto callData = JSC::getCallData(read);
+ if (callData.type == CallData::Type::None) {
+ throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read));
+ return JSValue::encode(jsUndefined());
+ }
+ MarkedArgumentBuffer args;
+ args.append(jsNumber(0));
+ JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args);
+ }
+ RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
+}
+
+JSC_DEFINE_HOST_FUNCTION(jsReadable_resume, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
+{
+ VM& vm = lexicalGlobalObject->vm();
+ auto throwScope = DECLARE_THROW_SCOPE(vm);
+
+ if (callFrame->argumentCount() < 2) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ JSValue streamVal = callFrame->uncheckedArgument(0);
+ JSValue stateVal = callFrame->uncheckedArgument(1);
+
+ JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1));
+ if (!state) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ if (!state->m_resumeScheduled) {
+ state->m_resumeScheduled = true;
+ // make this static?
+ JSFunction* resume_ = JSC::JSFunction::create(
+ vm, lexicalGlobalObject, 0, "resume_"_s, jsReadable_resume_, ImplementationVisibility::Public);
+
+ lexicalGlobalObject->queueMicrotask(resume_, streamVal, stateVal, JSValue{}, JSValue{});
+ }
+ RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
+}
+
+JSC_DEFINE_HOST_FUNCTION(jsReadable_emitReadable_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
+{
+ VM& vm = lexicalGlobalObject->vm();
+ auto throwScope = DECLARE_THROW_SCOPE(vm);
+
+ if (callFrame->argumentCount() < 2) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject);
+ RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined()));
+ JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1));
+ if (!state) {
+ throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s);
+ return JSValue::encode(jsUndefined());
+ }
+
+ JSValue errored = state->getDirect(vm, JSC::Identifier::fromString(vm, "errored"_s));
+ if (!state->m_destroyed && !errored.toBoolean(lexicalGlobalObject) && (state->m_length || state->m_ended)) {
+ // stream.emit('readable')
+ auto eventType = Identifier::fromString(vm, "readable"_s);
+ MarkedArgumentBuffer args;
+ auto emitter = jsDynamicCast<JSEventEmitter*>(stream);
+ if (!emitter) {
+ throwTypeError(lexicalGlobalObject, throwScope, "stream is not EventEmitter"_s);
+ return JSValue::encode(jsUndefined());
+ }
+ emitter->wrapped().emitForBindings(eventType, args);
+
+ state->m_emittedReadable = false;
+ }
+
+ state->m_needReadable = state->m_flowing <= 0 && !state->m_ended && state->m_length <= state->m_highWaterMark;
+ flow(lexicalGlobalObject, stream, state);
+ RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
+}
+
+} // namespace WebCore
diff --git a/src/bun.js/bindings/JSReadableHelper.h b/src/bun.js/bindings/JSReadableHelper.h
new file mode 100644
index 000000000..9a60fc301
--- /dev/null
+++ b/src/bun.js/bindings/JSReadableHelper.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "root.h"
+
+namespace WebCore {
+
+JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore);
+JSC_DECLARE_HOST_FUNCTION(jsReadable_resume);
+JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable_);
+
+} // namespace WebCore
diff --git a/src/bun.js/bindings/JSReadableState.cpp b/src/bun.js/bindings/JSReadableState.cpp
index fe19449ff..bf1bc1155 100644
--- a/src/bun.js/bindings/JSReadableState.cpp
+++ b/src/bun.js/bindings/JSReadableState.cpp
@@ -14,8 +14,6 @@ namespace WebCore {
using namespace JSC;
static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_pipesCount);
-static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_paused);
-static JSC_DECLARE_CUSTOM_GETTER(setJSReadableState_paused);
int64_t getHighWaterMark(JSC::VM& vm, JSC::JSGlobalObject* globalObject, bool isDuplex, JSObject* options)
{
@@ -52,79 +50,31 @@ void JSReadableState::finishCreation(JSC::VM& vm, JSC::JSGlobalObject* globalObj
}
putDirect(vm, WTFMove(objectModeIdent), JSC::jsBoolean(objectMode));
- int64_t highWaterMark = objectMode ? 16 : 16 * 1024; // default value
+ m_highWaterMark = objectMode ? 16 : 16 * 1024; // default value
if (options != nullptr) {
int64_t customHightWaterMark = getHighWaterMark(vm, globalObject, isDuplex, options);
if (customHightWaterMark >= 0)
- highWaterMark = customHightWaterMark;
+ m_highWaterMark = customHightWaterMark;
}
- putDirect(vm, JSC::Identifier::fromString(vm, "highWaterMark"_s), JSC::jsNumber(highWaterMark));
putDirect(vm, JSC::Identifier::fromString(vm, "buffer"_s), JSBufferList::create(
vm, globalObject, reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSBufferListStructure()));
- putDirect(vm, JSC::Identifier::fromString(vm, "length"_s), JSC::jsNumber(0));
putDirect(vm, JSC::Identifier::fromString(vm, "pipes"_s), JSC::constructEmptyArray(globalObject, nullptr, 0));
- putDirect(vm, JSC::Identifier::fromString(vm, "flowing"_s), JSC::jsNull());
- putDirect(vm, JSC::Identifier::fromString(vm, "ended"_s), JSC::jsBoolean(false));
- putDirect(vm, JSC::Identifier::fromString(vm, "endEmitted"_s), JSC::jsBoolean(false));
- // Stream is still being constructed and cannot be
- // destroyed until construction finished or failed.
- // Async construction is opt in, therefore we start as
- // constructed.
- putDirect(vm, JSC::Identifier::fromString(vm, "reading"_s), JSC::jsBoolean(false));
-
- // A flag to be able to tell if the event 'readable'/'data' is emitted
- // immediately, or on a later tick. We set this to true at first, because
- // any actions that shouldn't happen until "later" should generally also
- // not happen before the first read call.
- putDirect(vm, JSC::Identifier::fromString(vm, "constructed"_s), JSC::jsBoolean(true));
-
- // Whenever we return null, then we set a flag to say
- // that we're awaiting a 'readable' event emission.
- putDirect(vm, JSC::Identifier::fromString(vm, "sync"_s), JSC::jsBoolean(true));
-
- putDirect(vm, JSC::Identifier::fromString(vm, "needReadable"_s), JSC::jsBoolean(false));
- putDirect(vm, JSC::Identifier::fromString(vm, "emittedReadable"_s), JSC::jsBoolean(false));
- putDirect(vm, JSC::Identifier::fromString(vm, "readableListening"_s), JSC::jsBoolean(false));
- putDirect(vm, JSC::Identifier::fromString(vm, "resumeScheduled"_s), JSC::jsBoolean(false));
-
- // Should close be emitted on destroy. Defaults to true.
- putDirect(vm, JSC::Identifier::fromString(vm, "errorEmitted"_s), JSC::jsBoolean(false));
if (options == nullptr) {
- // Should .destroy() be called after 'end' (and potentially 'finish').
- putDirect(vm, JSC::Identifier::fromString(vm, "emitClose"_s), JSC::jsBoolean(false));
- // Has it been destroyed.
- putDirect(vm, JSC::Identifier::fromString(vm, "autoDestroy"_s), JSC::jsBoolean(false));
+ m_emitClose = false;
+ m_autoDestroy = false;
} else {
- // Should .destroy() be called after 'end' (and potentially 'finish').
- auto emitCloseIdent = JSC::Identifier::fromString(vm, "emitClose"_s);
- JSC::JSValue emitCloseVal = options->getDirect(vm, emitCloseIdent);
- putDirect(vm, WTFMove(emitCloseIdent), JSC::jsBoolean(!emitCloseVal.isBoolean() || emitCloseVal.toBoolean(globalObject)));
+ JSC::JSValue emitCloseVal = options->getDirect(vm, JSC::Identifier::fromString(vm, "emitClose"_s));
+ m_emitClose = !emitCloseVal.isBoolean() || emitCloseVal.toBoolean(globalObject);
// Has it been destroyed.
- auto autoDestroyIdent = JSC::Identifier::fromString(vm, "autoDestroy"_s);
- JSC::JSValue autoDestroyVal = options->getDirect(vm, autoDestroyIdent);
- putDirect(vm, WTFMove(autoDestroyIdent), JSC::jsBoolean(!autoDestroyVal.isBoolean() || autoDestroyVal.toBoolean(globalObject)));
+ JSC::JSValue autoDestroyVal = options->getDirect(vm, JSC::Identifier::fromString(vm, "autoDestroy"_s));
+ m_autoDestroy = !autoDestroyVal.isBoolean() || autoDestroyVal.toBoolean(globalObject);
}
- // Indicates whether the stream has errored. When true no further
- // _read calls, 'data' or 'readable' events should occur. This is needed
- // since when autoDestroy is disabled we need a way to tell whether the
- // stream has failed.
- putDirect(vm, JSC::Identifier::fromString(vm, "destroyed"_s), JSC::jsBoolean(false));
-
// Indicates whether the stream has finished destroying.
putDirect(vm, JSC::Identifier::fromString(vm, "errored"_s), JSC::jsNull());
- // True if close has been emitted or would have been emitted
- // depending on emitClose.
- putDirect(vm, JSC::Identifier::fromString(vm, "closed"_s), JSC::jsBoolean(false));
-
- // Crypto is kind of old and crusty. Historically, its default string
- // encoding is 'binary' so we have to make this configurable.
- // Everything else in the universe uses 'utf8', though.
- putDirect(vm, JSC::Identifier::fromString(vm, "closeEmitted"_s), JSC::jsBoolean(false));
-
// Ref the piped dest which we need a drain event on it
// type: null | Writable | Set<Writable>.
auto defaultEncodingIdent = JSC::Identifier::fromString(vm, "defaultEncoding"_s);
@@ -140,11 +90,6 @@ void JSReadableState::finishCreation(JSC::VM& vm, JSC::JSGlobalObject* globalObj
}
putDirect(vm, JSC::Identifier::fromString(vm, "awaitDrainWriters"_s), JSC::jsNull());
- // If true, a maybeReadMore has been scheduled.
- putDirect(vm, JSC::Identifier::fromString(vm, "multiAwaitDrain"_s), JSC::jsBoolean(false));
-
- putDirect(vm, JSC::Identifier::fromString(vm, "readingMore"_s), JSC::jsBoolean(false));
- putDirect(vm, JSC::Identifier::fromString(vm, "dataEmitted"_s), JSC::jsBoolean(false));
auto decoderIdent = JSC::Identifier::fromString(vm, "decoder"_s);
auto encodingIdent = JSC::Identifier::fromString(vm, "encoding"_s);
@@ -201,38 +146,133 @@ JSC_DEFINE_CUSTOM_GETTER(jsReadableState_pipesCount, (JSGlobalObject * lexicalGl
RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNumber(pipes->length())));
}
-JSC_DEFINE_CUSTOM_GETTER(jsReadableState_paused, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName))
-{
- auto& vm = JSC::getVM(lexicalGlobalObject);
- auto throwScope = DECLARE_THROW_SCOPE(vm);
- JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue));
- if (!state) {
- RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined()));
+#define JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(NAME) \
+ static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_##NAME); \
+ JSC_DEFINE_CUSTOM_GETTER(jsReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) \
+ { \
+ auto& vm = JSC::getVM(lexicalGlobalObject); \
+ auto throwScope = DECLARE_THROW_SCOPE(vm); \
+ JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \
+ if (!state) { \
+ RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined())); \
+ } \
+ if (state->m_##NAME == 0) \
+ RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNull())); \
+ RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsBoolean(state->m_##NAME > 0))); \
+ } \
+ static JSC_DECLARE_CUSTOM_SETTER(setJSReadableState_##NAME); \
+ JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName)) \
+ { \
+ auto& vm = JSC::getVM(lexicalGlobalObject); \
+ auto throwScope = DECLARE_THROW_SCOPE(vm); \
+ JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \
+ if (!state) { \
+ RETURN_IF_EXCEPTION(throwScope, false); \
+ } \
+ auto value = JSC::JSValue::decode(encodedValue); \
+ state->m_##NAME = value.isNull() ? 0 : value.toBoolean(lexicalGlobalObject) ? 1 : -1; \
+ RELEASE_AND_RETURN(throwScope, true); \
}
- if (state->m_paused == 0)
- RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNull()));
- RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsBoolean(state->m_paused > 0)));
-}
-JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_paused, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName))
-{
- auto& vm = JSC::getVM(lexicalGlobalObject);
- auto throwScope = DECLARE_THROW_SCOPE(vm);
- JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue));
- if (!state) {
- RETURN_IF_EXCEPTION(throwScope, false);
+JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(paused)
+JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(flowing)
+
+#undef JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER
+
+#define JSReadableState_GETTER_SETTER(NAME, TYPE) \
+ static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_##NAME); \
+ JSC_DEFINE_CUSTOM_GETTER(jsReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) \
+ { \
+ auto& vm = JSC::getVM(lexicalGlobalObject); \
+ auto throwScope = DECLARE_THROW_SCOPE(vm); \
+ JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \
+ if (!state) { \
+ RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined())); \
+ } \
+ RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::js##TYPE(state->m_##NAME))); \
+ } \
+ \
+ static JSC_DECLARE_CUSTOM_SETTER(setJSReadableState_##NAME); \
+ JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName)) \
+ { \
+ auto& vm = JSC::getVM(lexicalGlobalObject); \
+ auto throwScope = DECLARE_THROW_SCOPE(vm); \
+ JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \
+ if (!state) { \
+ RETURN_IF_EXCEPTION(throwScope, false); \
+ } \
+ state->m_##NAME = JSC::JSValue::decode(encodedValue).to##TYPE(lexicalGlobalObject); \
+ RETURN_IF_EXCEPTION(throwScope, false); \
+ RELEASE_AND_RETURN(throwScope, true); \
}
- state->m_paused = JSC::JSValue::decode(encodedValue).toBoolean(lexicalGlobalObject) ? 1 : -1;
- RELEASE_AND_RETURN(throwScope, true);
-}
+
+#define JSReadableState_BOOLEAN_GETTER_SETTER(NAME) \
+ JSReadableState_GETTER_SETTER(NAME, Boolean)
+
+#define JSReadableState_NUMBER_GETTER_SETTER(NAME) \
+ JSReadableState_GETTER_SETTER(NAME, Number)
+
+JSReadableState_BOOLEAN_GETTER_SETTER(ended)
+JSReadableState_BOOLEAN_GETTER_SETTER(endEmitted)
+JSReadableState_BOOLEAN_GETTER_SETTER(reading)
+JSReadableState_BOOLEAN_GETTER_SETTER(constructed)
+JSReadableState_BOOLEAN_GETTER_SETTER(sync)
+JSReadableState_BOOLEAN_GETTER_SETTER(needReadable)
+JSReadableState_BOOLEAN_GETTER_SETTER(emittedReadable)
+JSReadableState_BOOLEAN_GETTER_SETTER(readableListening)
+JSReadableState_BOOLEAN_GETTER_SETTER(resumeScheduled)
+JSReadableState_BOOLEAN_GETTER_SETTER(errorEmitted)
+JSReadableState_BOOLEAN_GETTER_SETTER(emitClose)
+JSReadableState_BOOLEAN_GETTER_SETTER(autoDestroy)
+JSReadableState_BOOLEAN_GETTER_SETTER(destroyed)
+JSReadableState_BOOLEAN_GETTER_SETTER(closed)
+JSReadableState_BOOLEAN_GETTER_SETTER(closeEmitted)
+JSReadableState_BOOLEAN_GETTER_SETTER(multiAwaitDrain)
+JSReadableState_BOOLEAN_GETTER_SETTER(readingMore)
+JSReadableState_BOOLEAN_GETTER_SETTER(dataEmitted)
+
+JSReadableState_NUMBER_GETTER_SETTER(length)
+JSReadableState_NUMBER_GETTER_SETTER(highWaterMark)
+
+#undef JSReadableState_NUMBER_GETTER_SETTER
+#undef JSReadableState_BOOLEAN_GETTER_SETTER
+#undef JSReadableState_GETTER_SETTER
+
+#define JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(NAME) \
+ { #NAME ""_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_##NAME, setJSReadableState_##NAME } }
/* Hash table for prototype */
static const HashTableValue JSReadableStatePrototypeTableValues[]
= {
{ "pipesCount"_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_pipesCount, 0 } },
- { "paused"_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_paused, setJSReadableState_paused } },
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(paused),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(flowing),
+
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(ended),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(endEmitted),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(reading),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(constructed),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(sync),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(needReadable),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(emittedReadable),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(readableListening),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(resumeScheduled),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(errorEmitted),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(emitClose),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(autoDestroy),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(destroyed),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(closed),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(closeEmitted),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(multiAwaitDrain),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(readingMore),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(dataEmitted),
+
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(length),
+ JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(highWaterMark),
};
+#undef JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE
+
void JSReadableStatePrototype::finishCreation(VM& vm, JSC::JSGlobalObject* globalThis)
{
Base::finishCreation(vm);
diff --git a/src/bun.js/bindings/JSReadableState.h b/src/bun.js/bindings/JSReadableState.h
index fd78f0411..d06a9dbc3 100644
--- a/src/bun.js/bindings/JSReadableState.h
+++ b/src/bun.js/bindings/JSReadableState.h
@@ -46,7 +46,30 @@ public:
static void destroy(JSCell*) {}
// 0 for null, 1 for true, -1 for false
- int8_t m_paused;
+ int8_t m_paused = 0;
+ int8_t m_flowing = 0;
+ bool m_ended = false;
+ bool m_endEmitted = false;
+ bool m_reading = false;
+ bool m_constructed = true;
+ bool m_sync = true;
+ bool m_needReadable = false;
+ bool m_emittedReadable = false;
+ bool m_readableListening = false;
+ bool m_resumeScheduled = false;
+ bool m_errorEmitted = false;
+ // These 2 are initialized from options
+ bool m_emitClose;
+ bool m_autoDestroy;
+ bool m_destroyed = false;
+ bool m_closed = false;
+ bool m_closeEmitted = false;
+ bool m_multiAwaitDrain = false;
+ bool m_readingMore = false;
+ bool m_dataEmitted = false;
+
+ int64_t m_length = 0;
+ int64_t m_highWaterMark;
};
class JSReadableStatePrototype : public JSC::JSNonFinalObject {
diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp
index 1cedb04cc..012ad2d66 100644
--- a/src/bun.js/bindings/ZigGlobalObject.cpp
+++ b/src/bun.js/bindings/ZigGlobalObject.cpp
@@ -89,6 +89,7 @@
#include "JSFetchHeaders.h"
#include "JSStringDecoder.h"
#include "JSReadableState.h"
+#include "JSReadableHelper.h"
#include "Process.h"
@@ -1042,6 +1043,15 @@ JSC:
auto* obj = constructEmptyObject(globalObject);
obj->putDirect(vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "BufferList"_s)), reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSBufferList(), 0);
obj->putDirect(vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "ReadableState"_s)), reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSReadableState(), 0);
+ obj->putDirect(
+ vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "maybeReadMore"_s)),
+ JSC::JSFunction::create(vm, globalObject, 0, "maybeReadMore"_s, jsReadable_maybeReadMore, ImplementationVisibility::Public), 0);
+ obj->putDirect(
+ vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "resume"_s)),
+ JSC::JSFunction::create(vm, globalObject, 0, "resume"_s, jsReadable_resume, ImplementationVisibility::Public), 0);
+ obj->putDirect(
+ vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "emitReadable_"_s)),
+ JSC::JSFunction::create(vm, globalObject, 0, "emitReadable_"_s, jsReadable_emitReadable_, ImplementationVisibility::Public), 0);
return JSValue::encode(obj);
}
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index fffb9d0cd..9bb74daf4 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -2485,10 +2485,12 @@ var require_readable = __commonJS({
var debug = require_util().debuglog("stream", (fn) => {
debug = fn;
});
- var BufferList =
- globalThis[Symbol.for("Bun.lazy")]("bun:stream").BufferList;
+ const {
+ maybeReadMore,
+ resume,
+ emitReadable_,
+ } = globalThis[Symbol.for("Bun.lazy")]("bun:stream");
var destroyImpl = require_destroy();
- var { getHighWaterMark, getDefaultHighWaterMark } = require_state();
var {
aggregateTwoErrors,
codes: {
@@ -2769,7 +2771,7 @@ var require_readable = __commonJS({
} else {
state.needReadable = false;
state.emittedReadable = true;
- emitReadable_(stream);
+ emitReadable_(stream, state);
}
}
function emitReadable(stream) {
@@ -2779,39 +2781,8 @@ var require_readable = __commonJS({
if (!state.emittedReadable) {
debug("emitReadable", state.flowing);
state.emittedReadable = true;
- runOnNextTick(emitReadable_, stream);
- }
- }
- function emitReadable_(stream) {
- const state = stream._readableState;
- debug("emitReadable_", state.destroyed, state.length, state.ended);
- if (!state.destroyed && !state.errored && (state.length || state.ended)) {
- stream.emit("readable");
- state.emittedReadable = false;
- }
- state.needReadable =
- !state.flowing && !state.ended && state.length <= state.highWaterMark;
- flow(stream);
- }
- function maybeReadMore(stream, state) {
- if (!state.readingMore && state.constructed) {
- state.readingMore = true;
- runOnNextTick(maybeReadMore_, stream, state);
- }
- }
- function maybeReadMore_(stream, state) {
- while (
- !state.reading &&
- !state.ended &&
- (state.length < state.highWaterMark ||
- (state.flowing && state.length === 0))
- ) {
- const len = state.length;
- debug("maybeReadMore read 0");
- stream.read(0);
- if (len === state.length) break;
+ runOnNextTick(emitReadable_, stream, state);
}
- state.readingMore = false;
}
Readable.prototype._read = function (n) {
throw new ERR_METHOD_NOT_IMPLEMENTED("_read()");
@@ -3041,22 +3012,6 @@ var require_readable = __commonJS({
state.paused = false;
return this;
};
- function resume(stream, state) {
- if (!state.resumeScheduled) {
- state.resumeScheduled = true;
- runOnNextTick(resume_, stream, state);
- }
- }
- function resume_(stream, state) {
- debug("resume", state.reading);
- if (!state.reading) {
- stream.read(0);
- }
- state.resumeScheduled = false;
- stream.emit("resume");
- flow(stream);
- if (state.flowing && !state.reading) stream.read(0);
- }
Readable.prototype.pause = function () {
debug("call pause flowing=%j", this._readableState.flowing);
if (this._readableState.flowing !== false) {