diff options
-rw-r--r-- | src/bun.js/bindings/JSReadableHelper.cpp | 228 | ||||
-rw-r--r-- | src/bun.js/bindings/JSReadableHelper.h | 11 | ||||
-rw-r--r-- | src/bun.js/bindings/JSReadableState.cpp | 210 | ||||
-rw-r--r-- | src/bun.js/bindings/JSReadableState.h | 25 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGlobalObject.cpp | 10 | ||||
-rw-r--r-- | src/bun.js/streams.exports.js | 59 |
6 files changed, 405 insertions, 138 deletions
diff --git a/src/bun.js/bindings/JSReadableHelper.cpp b/src/bun.js/bindings/JSReadableHelper.cpp new file mode 100644 index 000000000..ef17982d4 --- /dev/null +++ b/src/bun.js/bindings/JSReadableHelper.cpp @@ -0,0 +1,228 @@ +#include "JSReadableHelper.h" +#include "JSReadableState.h" +#include "JSBufferList.h" +#include "JSBuffer.h" +#include "JSEventEmitter.h" +#include "JavaScriptCore/Lookup.h" +#include "JavaScriptCore/ObjectConstructor.h" +#include "ZigGlobalObject.h" +#include "JSDOMOperation.h" +#include "JSDOMAttribute.h" +#include "headers.h" +#include "JSDOMConvertEnumeration.h" + +namespace WebCore { +using namespace JSC; + +static JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore_); +JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); + RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); + JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); + if (!state) { + throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); + return JSValue::encode(jsUndefined()); + } + + auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto callData = JSC::getCallData(read); + if (callData.type == CallData::Type::None) { + throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); + return JSValue::encode(jsUndefined()); + } + MarkedArgumentBuffer args; + args.append(jsNumber(0)); + + while ( + !state->m_reading && + !state->m_ended && + (state->m_length < state->m_highWaterMark || (state->m_flowing > 0 && state->m_length == 0))) { + int64_t len = state->m_length; + + JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + + if (len == state->m_length) + break; + } + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSValue streamVal = callFrame->uncheckedArgument(0); + JSValue stateVal = callFrame->uncheckedArgument(1); + + // make this static? + JSFunction* maybeReadMore_ = JSC::JSFunction::create( + vm, lexicalGlobalObject, 0, "maybeReadMore_"_s, jsReadable_maybeReadMore_, ImplementationVisibility::Public); + + lexicalGlobalObject->queueMicrotask(maybeReadMore_, streamVal, stateVal, JSValue{}, JSValue{}); + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +void flow(JSGlobalObject* lexicalGlobalObject, JSObject* stream, JSReadableState* state) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto callData = JSC::getCallData(read); + if (callData.type == CallData::Type::None) { + throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); + return; + } + MarkedArgumentBuffer args; + + while (state->m_flowing > 0) { + JSValue ret = JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + if (ret.isNull()) + break; + } +} + +static JSC_DECLARE_HOST_FUNCTION(jsReadable_resume_); +JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); + RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); + JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); + if (!state) { + throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); + return JSValue::encode(jsUndefined()); + } + + if (!state->m_reading) { + // stream.read(0) + auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto callData = JSC::getCallData(read); + if (callData.type == CallData::Type::None) { + throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); + return JSValue::encode(jsUndefined()); + } + MarkedArgumentBuffer args; + args.append(jsNumber(0)); + JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + } + + state->m_resumeScheduled = true; + // stream.emit('resume') + auto eventType = Identifier::fromString(vm, "resume"_s); + MarkedArgumentBuffer args; + auto emitter = jsDynamicCast<JSEventEmitter*>(stream); + if (!emitter) { + throwTypeError(lexicalGlobalObject, throwScope, "stream is not EventEmitter"_s); + return JSValue::encode(jsUndefined()); + } + emitter->wrapped().emitForBindings(eventType, args); + + flow(lexicalGlobalObject, stream, state); + + if (state->m_flowing && !state->m_reading) { + // stream.read(0) + auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto callData = JSC::getCallData(read); + if (callData.type == CallData::Type::None) { + throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); + return JSValue::encode(jsUndefined()); + } + MarkedArgumentBuffer args; + args.append(jsNumber(0)); + JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + } + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_resume, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSValue streamVal = callFrame->uncheckedArgument(0); + JSValue stateVal = callFrame->uncheckedArgument(1); + + JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); + if (!state) { + throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); + return JSValue::encode(jsUndefined()); + } + + if (!state->m_resumeScheduled) { + state->m_resumeScheduled = true; + // make this static? + JSFunction* resume_ = JSC::JSFunction::create( + vm, lexicalGlobalObject, 0, "resume_"_s, jsReadable_resume_, ImplementationVisibility::Public); + + lexicalGlobalObject->queueMicrotask(resume_, streamVal, stateVal, JSValue{}, JSValue{}); + } + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_emitReadable_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); + RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); + JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); + if (!state) { + throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); + return JSValue::encode(jsUndefined()); + } + + JSValue errored = state->getDirect(vm, JSC::Identifier::fromString(vm, "errored"_s)); + if (!state->m_destroyed && !errored.toBoolean(lexicalGlobalObject) && (state->m_length || state->m_ended)) { + // stream.emit('readable') + auto eventType = Identifier::fromString(vm, "readable"_s); + MarkedArgumentBuffer args; + auto emitter = jsDynamicCast<JSEventEmitter*>(stream); + if (!emitter) { + throwTypeError(lexicalGlobalObject, throwScope, "stream is not EventEmitter"_s); + return JSValue::encode(jsUndefined()); + } + emitter->wrapped().emitForBindings(eventType, args); + + state->m_emittedReadable = false; + } + + state->m_needReadable = state->m_flowing <= 0 && !state->m_ended && state->m_length <= state->m_highWaterMark; + flow(lexicalGlobalObject, stream, state); + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +} // namespace WebCore diff --git a/src/bun.js/bindings/JSReadableHelper.h b/src/bun.js/bindings/JSReadableHelper.h new file mode 100644 index 000000000..9a60fc301 --- /dev/null +++ b/src/bun.js/bindings/JSReadableHelper.h @@ -0,0 +1,11 @@ +#pragma once + +#include "root.h" + +namespace WebCore { + +JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore); +JSC_DECLARE_HOST_FUNCTION(jsReadable_resume); +JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable_); + +} // namespace WebCore diff --git a/src/bun.js/bindings/JSReadableState.cpp b/src/bun.js/bindings/JSReadableState.cpp index fe19449ff..bf1bc1155 100644 --- a/src/bun.js/bindings/JSReadableState.cpp +++ b/src/bun.js/bindings/JSReadableState.cpp @@ -14,8 +14,6 @@ namespace WebCore { using namespace JSC; static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_pipesCount); -static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_paused); -static JSC_DECLARE_CUSTOM_GETTER(setJSReadableState_paused); int64_t getHighWaterMark(JSC::VM& vm, JSC::JSGlobalObject* globalObject, bool isDuplex, JSObject* options) { @@ -52,79 +50,31 @@ void JSReadableState::finishCreation(JSC::VM& vm, JSC::JSGlobalObject* globalObj } putDirect(vm, WTFMove(objectModeIdent), JSC::jsBoolean(objectMode)); - int64_t highWaterMark = objectMode ? 16 : 16 * 1024; // default value + m_highWaterMark = objectMode ? 16 : 16 * 1024; // default value if (options != nullptr) { int64_t customHightWaterMark = getHighWaterMark(vm, globalObject, isDuplex, options); if (customHightWaterMark >= 0) - highWaterMark = customHightWaterMark; + m_highWaterMark = customHightWaterMark; } - putDirect(vm, JSC::Identifier::fromString(vm, "highWaterMark"_s), JSC::jsNumber(highWaterMark)); putDirect(vm, JSC::Identifier::fromString(vm, "buffer"_s), JSBufferList::create( vm, globalObject, reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSBufferListStructure())); - putDirect(vm, JSC::Identifier::fromString(vm, "length"_s), JSC::jsNumber(0)); putDirect(vm, JSC::Identifier::fromString(vm, "pipes"_s), JSC::constructEmptyArray(globalObject, nullptr, 0)); - putDirect(vm, JSC::Identifier::fromString(vm, "flowing"_s), JSC::jsNull()); - putDirect(vm, JSC::Identifier::fromString(vm, "ended"_s), JSC::jsBoolean(false)); - putDirect(vm, JSC::Identifier::fromString(vm, "endEmitted"_s), JSC::jsBoolean(false)); - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - putDirect(vm, JSC::Identifier::fromString(vm, "reading"_s), JSC::jsBoolean(false)); - - // A flag to be able to tell if the event 'readable'/'data' is emitted - // immediately, or on a later tick. We set this to true at first, because - // any actions that shouldn't happen until "later" should generally also - // not happen before the first read call. - putDirect(vm, JSC::Identifier::fromString(vm, "constructed"_s), JSC::jsBoolean(true)); - - // Whenever we return null, then we set a flag to say - // that we're awaiting a 'readable' event emission. - putDirect(vm, JSC::Identifier::fromString(vm, "sync"_s), JSC::jsBoolean(true)); - - putDirect(vm, JSC::Identifier::fromString(vm, "needReadable"_s), JSC::jsBoolean(false)); - putDirect(vm, JSC::Identifier::fromString(vm, "emittedReadable"_s), JSC::jsBoolean(false)); - putDirect(vm, JSC::Identifier::fromString(vm, "readableListening"_s), JSC::jsBoolean(false)); - putDirect(vm, JSC::Identifier::fromString(vm, "resumeScheduled"_s), JSC::jsBoolean(false)); - - // Should close be emitted on destroy. Defaults to true. - putDirect(vm, JSC::Identifier::fromString(vm, "errorEmitted"_s), JSC::jsBoolean(false)); if (options == nullptr) { - // Should .destroy() be called after 'end' (and potentially 'finish'). - putDirect(vm, JSC::Identifier::fromString(vm, "emitClose"_s), JSC::jsBoolean(false)); - // Has it been destroyed. - putDirect(vm, JSC::Identifier::fromString(vm, "autoDestroy"_s), JSC::jsBoolean(false)); + m_emitClose = false; + m_autoDestroy = false; } else { - // Should .destroy() be called after 'end' (and potentially 'finish'). - auto emitCloseIdent = JSC::Identifier::fromString(vm, "emitClose"_s); - JSC::JSValue emitCloseVal = options->getDirect(vm, emitCloseIdent); - putDirect(vm, WTFMove(emitCloseIdent), JSC::jsBoolean(!emitCloseVal.isBoolean() || emitCloseVal.toBoolean(globalObject))); + JSC::JSValue emitCloseVal = options->getDirect(vm, JSC::Identifier::fromString(vm, "emitClose"_s)); + m_emitClose = !emitCloseVal.isBoolean() || emitCloseVal.toBoolean(globalObject); // Has it been destroyed. - auto autoDestroyIdent = JSC::Identifier::fromString(vm, "autoDestroy"_s); - JSC::JSValue autoDestroyVal = options->getDirect(vm, autoDestroyIdent); - putDirect(vm, WTFMove(autoDestroyIdent), JSC::jsBoolean(!autoDestroyVal.isBoolean() || autoDestroyVal.toBoolean(globalObject))); + JSC::JSValue autoDestroyVal = options->getDirect(vm, JSC::Identifier::fromString(vm, "autoDestroy"_s)); + m_autoDestroy = !autoDestroyVal.isBoolean() || autoDestroyVal.toBoolean(globalObject); } - // Indicates whether the stream has errored. When true no further - // _read calls, 'data' or 'readable' events should occur. This is needed - // since when autoDestroy is disabled we need a way to tell whether the - // stream has failed. - putDirect(vm, JSC::Identifier::fromString(vm, "destroyed"_s), JSC::jsBoolean(false)); - // Indicates whether the stream has finished destroying. putDirect(vm, JSC::Identifier::fromString(vm, "errored"_s), JSC::jsNull()); - // True if close has been emitted or would have been emitted - // depending on emitClose. - putDirect(vm, JSC::Identifier::fromString(vm, "closed"_s), JSC::jsBoolean(false)); - - // Crypto is kind of old and crusty. Historically, its default string - // encoding is 'binary' so we have to make this configurable. - // Everything else in the universe uses 'utf8', though. - putDirect(vm, JSC::Identifier::fromString(vm, "closeEmitted"_s), JSC::jsBoolean(false)); - // Ref the piped dest which we need a drain event on it // type: null | Writable | Set<Writable>. auto defaultEncodingIdent = JSC::Identifier::fromString(vm, "defaultEncoding"_s); @@ -140,11 +90,6 @@ void JSReadableState::finishCreation(JSC::VM& vm, JSC::JSGlobalObject* globalObj } putDirect(vm, JSC::Identifier::fromString(vm, "awaitDrainWriters"_s), JSC::jsNull()); - // If true, a maybeReadMore has been scheduled. - putDirect(vm, JSC::Identifier::fromString(vm, "multiAwaitDrain"_s), JSC::jsBoolean(false)); - - putDirect(vm, JSC::Identifier::fromString(vm, "readingMore"_s), JSC::jsBoolean(false)); - putDirect(vm, JSC::Identifier::fromString(vm, "dataEmitted"_s), JSC::jsBoolean(false)); auto decoderIdent = JSC::Identifier::fromString(vm, "decoder"_s); auto encodingIdent = JSC::Identifier::fromString(vm, "encoding"_s); @@ -201,38 +146,133 @@ JSC_DEFINE_CUSTOM_GETTER(jsReadableState_pipesCount, (JSGlobalObject * lexicalGl RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNumber(pipes->length()))); } -JSC_DEFINE_CUSTOM_GETTER(jsReadableState_paused, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) -{ - auto& vm = JSC::getVM(lexicalGlobalObject); - auto throwScope = DECLARE_THROW_SCOPE(vm); - JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); - if (!state) { - RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined())); +#define JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(NAME) \ + static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_##NAME); \ + JSC_DEFINE_CUSTOM_GETTER(jsReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) \ + { \ + auto& vm = JSC::getVM(lexicalGlobalObject); \ + auto throwScope = DECLARE_THROW_SCOPE(vm); \ + JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \ + if (!state) { \ + RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined())); \ + } \ + if (state->m_##NAME == 0) \ + RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNull())); \ + RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsBoolean(state->m_##NAME > 0))); \ + } \ + static JSC_DECLARE_CUSTOM_SETTER(setJSReadableState_##NAME); \ + JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName)) \ + { \ + auto& vm = JSC::getVM(lexicalGlobalObject); \ + auto throwScope = DECLARE_THROW_SCOPE(vm); \ + JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \ + if (!state) { \ + RETURN_IF_EXCEPTION(throwScope, false); \ + } \ + auto value = JSC::JSValue::decode(encodedValue); \ + state->m_##NAME = value.isNull() ? 0 : value.toBoolean(lexicalGlobalObject) ? 1 : -1; \ + RELEASE_AND_RETURN(throwScope, true); \ } - if (state->m_paused == 0) - RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNull())); - RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsBoolean(state->m_paused > 0))); -} -JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_paused, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName)) -{ - auto& vm = JSC::getVM(lexicalGlobalObject); - auto throwScope = DECLARE_THROW_SCOPE(vm); - JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); - if (!state) { - RETURN_IF_EXCEPTION(throwScope, false); +JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(paused) +JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(flowing) + +#undef JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER + +#define JSReadableState_GETTER_SETTER(NAME, TYPE) \ + static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_##NAME); \ + JSC_DEFINE_CUSTOM_GETTER(jsReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) \ + { \ + auto& vm = JSC::getVM(lexicalGlobalObject); \ + auto throwScope = DECLARE_THROW_SCOPE(vm); \ + JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \ + if (!state) { \ + RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined())); \ + } \ + RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::js##TYPE(state->m_##NAME))); \ + } \ + \ + static JSC_DECLARE_CUSTOM_SETTER(setJSReadableState_##NAME); \ + JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName)) \ + { \ + auto& vm = JSC::getVM(lexicalGlobalObject); \ + auto throwScope = DECLARE_THROW_SCOPE(vm); \ + JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \ + if (!state) { \ + RETURN_IF_EXCEPTION(throwScope, false); \ + } \ + state->m_##NAME = JSC::JSValue::decode(encodedValue).to##TYPE(lexicalGlobalObject); \ + RETURN_IF_EXCEPTION(throwScope, false); \ + RELEASE_AND_RETURN(throwScope, true); \ } - state->m_paused = JSC::JSValue::decode(encodedValue).toBoolean(lexicalGlobalObject) ? 1 : -1; - RELEASE_AND_RETURN(throwScope, true); -} + +#define JSReadableState_BOOLEAN_GETTER_SETTER(NAME) \ + JSReadableState_GETTER_SETTER(NAME, Boolean) + +#define JSReadableState_NUMBER_GETTER_SETTER(NAME) \ + JSReadableState_GETTER_SETTER(NAME, Number) + +JSReadableState_BOOLEAN_GETTER_SETTER(ended) +JSReadableState_BOOLEAN_GETTER_SETTER(endEmitted) +JSReadableState_BOOLEAN_GETTER_SETTER(reading) +JSReadableState_BOOLEAN_GETTER_SETTER(constructed) +JSReadableState_BOOLEAN_GETTER_SETTER(sync) +JSReadableState_BOOLEAN_GETTER_SETTER(needReadable) +JSReadableState_BOOLEAN_GETTER_SETTER(emittedReadable) +JSReadableState_BOOLEAN_GETTER_SETTER(readableListening) +JSReadableState_BOOLEAN_GETTER_SETTER(resumeScheduled) +JSReadableState_BOOLEAN_GETTER_SETTER(errorEmitted) +JSReadableState_BOOLEAN_GETTER_SETTER(emitClose) +JSReadableState_BOOLEAN_GETTER_SETTER(autoDestroy) +JSReadableState_BOOLEAN_GETTER_SETTER(destroyed) +JSReadableState_BOOLEAN_GETTER_SETTER(closed) +JSReadableState_BOOLEAN_GETTER_SETTER(closeEmitted) +JSReadableState_BOOLEAN_GETTER_SETTER(multiAwaitDrain) +JSReadableState_BOOLEAN_GETTER_SETTER(readingMore) +JSReadableState_BOOLEAN_GETTER_SETTER(dataEmitted) + +JSReadableState_NUMBER_GETTER_SETTER(length) +JSReadableState_NUMBER_GETTER_SETTER(highWaterMark) + +#undef JSReadableState_NUMBER_GETTER_SETTER +#undef JSReadableState_BOOLEAN_GETTER_SETTER +#undef JSReadableState_GETTER_SETTER + +#define JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(NAME) \ + { #NAME ""_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_##NAME, setJSReadableState_##NAME } } /* Hash table for prototype */ static const HashTableValue JSReadableStatePrototypeTableValues[] = { { "pipesCount"_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_pipesCount, 0 } }, - { "paused"_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_paused, setJSReadableState_paused } }, + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(paused), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(flowing), + + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(ended), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(endEmitted), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(reading), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(constructed), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(sync), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(needReadable), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(emittedReadable), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(readableListening), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(resumeScheduled), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(errorEmitted), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(emitClose), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(autoDestroy), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(destroyed), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(closed), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(closeEmitted), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(multiAwaitDrain), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(readingMore), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(dataEmitted), + + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(length), + JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(highWaterMark), }; +#undef JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE + void JSReadableStatePrototype::finishCreation(VM& vm, JSC::JSGlobalObject* globalThis) { Base::finishCreation(vm); diff --git a/src/bun.js/bindings/JSReadableState.h b/src/bun.js/bindings/JSReadableState.h index fd78f0411..d06a9dbc3 100644 --- a/src/bun.js/bindings/JSReadableState.h +++ b/src/bun.js/bindings/JSReadableState.h @@ -46,7 +46,30 @@ public: static void destroy(JSCell*) {} // 0 for null, 1 for true, -1 for false - int8_t m_paused; + int8_t m_paused = 0; + int8_t m_flowing = 0; + bool m_ended = false; + bool m_endEmitted = false; + bool m_reading = false; + bool m_constructed = true; + bool m_sync = true; + bool m_needReadable = false; + bool m_emittedReadable = false; + bool m_readableListening = false; + bool m_resumeScheduled = false; + bool m_errorEmitted = false; + // These 2 are initialized from options + bool m_emitClose; + bool m_autoDestroy; + bool m_destroyed = false; + bool m_closed = false; + bool m_closeEmitted = false; + bool m_multiAwaitDrain = false; + bool m_readingMore = false; + bool m_dataEmitted = false; + + int64_t m_length = 0; + int64_t m_highWaterMark; }; class JSReadableStatePrototype : public JSC::JSNonFinalObject { diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 1cedb04cc..012ad2d66 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -89,6 +89,7 @@ #include "JSFetchHeaders.h" #include "JSStringDecoder.h" #include "JSReadableState.h" +#include "JSReadableHelper.h" #include "Process.h" @@ -1042,6 +1043,15 @@ JSC: auto* obj = constructEmptyObject(globalObject); obj->putDirect(vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "BufferList"_s)), reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSBufferList(), 0); obj->putDirect(vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "ReadableState"_s)), reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSReadableState(), 0); + obj->putDirect( + vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "maybeReadMore"_s)), + JSC::JSFunction::create(vm, globalObject, 0, "maybeReadMore"_s, jsReadable_maybeReadMore, ImplementationVisibility::Public), 0); + obj->putDirect( + vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "resume"_s)), + JSC::JSFunction::create(vm, globalObject, 0, "resume"_s, jsReadable_resume, ImplementationVisibility::Public), 0); + obj->putDirect( + vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "emitReadable_"_s)), + JSC::JSFunction::create(vm, globalObject, 0, "emitReadable_"_s, jsReadable_emitReadable_, ImplementationVisibility::Public), 0); return JSValue::encode(obj); } diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index fffb9d0cd..9bb74daf4 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -2485,10 +2485,12 @@ var require_readable = __commonJS({ var debug = require_util().debuglog("stream", (fn) => { debug = fn; }); - var BufferList = - globalThis[Symbol.for("Bun.lazy")]("bun:stream").BufferList; + const { + maybeReadMore, + resume, + emitReadable_, + } = globalThis[Symbol.for("Bun.lazy")]("bun:stream"); var destroyImpl = require_destroy(); - var { getHighWaterMark, getDefaultHighWaterMark } = require_state(); var { aggregateTwoErrors, codes: { @@ -2769,7 +2771,7 @@ var require_readable = __commonJS({ } else { state.needReadable = false; state.emittedReadable = true; - emitReadable_(stream); + emitReadable_(stream, state); } } function emitReadable(stream) { @@ -2779,39 +2781,8 @@ var require_readable = __commonJS({ if (!state.emittedReadable) { debug("emitReadable", state.flowing); state.emittedReadable = true; - runOnNextTick(emitReadable_, stream); - } - } - function emitReadable_(stream) { - const state = stream._readableState; - debug("emitReadable_", state.destroyed, state.length, state.ended); - if (!state.destroyed && !state.errored && (state.length || state.ended)) { - stream.emit("readable"); - state.emittedReadable = false; - } - state.needReadable = - !state.flowing && !state.ended && state.length <= state.highWaterMark; - flow(stream); - } - function maybeReadMore(stream, state) { - if (!state.readingMore && state.constructed) { - state.readingMore = true; - runOnNextTick(maybeReadMore_, stream, state); - } - } - function maybeReadMore_(stream, state) { - while ( - !state.reading && - !state.ended && - (state.length < state.highWaterMark || - (state.flowing && state.length === 0)) - ) { - const len = state.length; - debug("maybeReadMore read 0"); - stream.read(0); - if (len === state.length) break; + runOnNextTick(emitReadable_, stream, state); } - state.readingMore = false; } Readable.prototype._read = function (n) { throw new ERR_METHOD_NOT_IMPLEMENTED("_read()"); @@ -3041,22 +3012,6 @@ var require_readable = __commonJS({ state.paused = false; return this; }; - function resume(stream, state) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; - runOnNextTick(resume_, stream, state); - } - } - function resume_(stream, state) { - debug("resume", state.reading); - if (!state.reading) { - stream.read(0); - } - state.resumeScheduled = false; - stream.emit("resume"); - flow(stream); - if (state.flowing && !state.reading) stream.read(0); - } Readable.prototype.pause = function () { debug("call pause flowing=%j", this._readableState.flowing); if (this._readableState.flowing !== false) { |