diff options
| author | 2022-09-09 10:52:27 +0800 | |
|---|---|---|
| committer | 2022-09-08 19:52:27 -0700 | |
| commit | 8d8b72cf3f7119f0bc018513d89eca5f8ec44ab3 (patch) | |
| tree | b7af0511edfb0ca115a4f28f43a8be0f14ae63d8 | |
| parent | a3cc9aaf6f66bac85e6318d3e09c0f1961d18210 (diff) | |
| download | bun-8d8b72cf3f7119f0bc018513d89eca5f8ec44ab3.tar.gz bun-8d8b72cf3f7119f0bc018513d89eca5f8ec44ab3.tar.zst bun-8d8b72cf3f7119f0bc018513d89eca5f8ec44ab3.zip | |
Add native helper functions for Readable and convert ReadableState properties to getter/setter (#1218)
| -rw-r--r-- | src/bun.js/bindings/JSReadableHelper.cpp | 228 | ||||
| -rw-r--r-- | src/bun.js/bindings/JSReadableHelper.h | 11 | ||||
| -rw-r--r-- | src/bun.js/bindings/JSReadableState.cpp | 210 | ||||
| -rw-r--r-- | src/bun.js/bindings/JSReadableState.h | 25 | ||||
| -rw-r--r-- | src/bun.js/bindings/ZigGlobalObject.cpp | 10 | ||||
| -rw-r--r-- | src/bun.js/streams.exports.js | 59 | 
6 files changed, 405 insertions, 138 deletions
| diff --git a/src/bun.js/bindings/JSReadableHelper.cpp b/src/bun.js/bindings/JSReadableHelper.cpp new file mode 100644 index 000000000..ef17982d4 --- /dev/null +++ b/src/bun.js/bindings/JSReadableHelper.cpp @@ -0,0 +1,228 @@ +#include "JSReadableHelper.h" +#include "JSReadableState.h" +#include "JSBufferList.h" +#include "JSBuffer.h" +#include "JSEventEmitter.h" +#include "JavaScriptCore/Lookup.h" +#include "JavaScriptCore/ObjectConstructor.h" +#include "ZigGlobalObject.h" +#include "JSDOMOperation.h" +#include "JSDOMAttribute.h" +#include "headers.h" +#include "JSDOMConvertEnumeration.h" + +namespace WebCore { +using namespace JSC; + +static JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore_); +JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ +    VM& vm = lexicalGlobalObject->vm(); +    auto throwScope = DECLARE_THROW_SCOPE(vm); + +    if (callFrame->argumentCount() < 2) { +        throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); +        return JSValue::encode(jsUndefined()); +    } + +    JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); +    RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); +    JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); +    if (!state) { +        throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); +        return JSValue::encode(jsUndefined()); +    } + +    auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); +    auto callData = JSC::getCallData(read); +    if (callData.type == CallData::Type::None) { +        throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); +        return JSValue::encode(jsUndefined()); +    } +    MarkedArgumentBuffer args; +    args.append(jsNumber(0)); + +    while ( +        !state->m_reading && +        !state->m_ended && +        (state->m_length < state->m_highWaterMark || (state->m_flowing > 0 && state->m_length == 0))) { +        int64_t len = state->m_length; + +        JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + +        if (len == state->m_length) +            break; +    } +    RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ +    VM& vm = lexicalGlobalObject->vm(); +    auto throwScope = DECLARE_THROW_SCOPE(vm); + +    if (callFrame->argumentCount() < 2) { +        throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); +        return JSValue::encode(jsUndefined()); +    } + +    JSValue streamVal = callFrame->uncheckedArgument(0); +    JSValue stateVal = callFrame->uncheckedArgument(1); + +    // make this static? +    JSFunction* maybeReadMore_ = JSC::JSFunction::create( +        vm, lexicalGlobalObject, 0, "maybeReadMore_"_s, jsReadable_maybeReadMore_, ImplementationVisibility::Public); + +    lexicalGlobalObject->queueMicrotask(maybeReadMore_, streamVal, stateVal, JSValue{}, JSValue{}); +    RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +void flow(JSGlobalObject* lexicalGlobalObject, JSObject* stream, JSReadableState* state) +{ +    VM& vm = lexicalGlobalObject->vm(); +    auto throwScope = DECLARE_THROW_SCOPE(vm); + +    auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); +    auto callData = JSC::getCallData(read); +    if (callData.type == CallData::Type::None) { +        throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); +        return; +    } +    MarkedArgumentBuffer args; + +    while (state->m_flowing > 0) { +        JSValue ret = JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); +        if (ret.isNull()) +            break; +    } +} + +static JSC_DECLARE_HOST_FUNCTION(jsReadable_resume_); +JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ +    VM& vm = lexicalGlobalObject->vm(); +    auto throwScope = DECLARE_THROW_SCOPE(vm); + +    if (callFrame->argumentCount() < 2) { +        throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); +        return JSValue::encode(jsUndefined()); +    } + +    JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); +    RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); +    JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); +    if (!state) { +        throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); +        return JSValue::encode(jsUndefined()); +    } + +    if (!state->m_reading) { +        // stream.read(0) +        auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); +        auto callData = JSC::getCallData(read); +        if (callData.type == CallData::Type::None) { +            throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); +            return JSValue::encode(jsUndefined()); +        } +        MarkedArgumentBuffer args; +        args.append(jsNumber(0)); +        JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); +    } + +    state->m_resumeScheduled = true; +    // stream.emit('resume') +    auto eventType = Identifier::fromString(vm, "resume"_s); +    MarkedArgumentBuffer args; +    auto emitter = jsDynamicCast<JSEventEmitter*>(stream); +    if (!emitter) { +        throwTypeError(lexicalGlobalObject, throwScope, "stream is not EventEmitter"_s); +        return JSValue::encode(jsUndefined()); +    } +    emitter->wrapped().emitForBindings(eventType, args); + +    flow(lexicalGlobalObject, stream, state); + +    if (state->m_flowing && !state->m_reading) { +        // stream.read(0) +        auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); +        auto callData = JSC::getCallData(read); +        if (callData.type == CallData::Type::None) { +            throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); +            return JSValue::encode(jsUndefined()); +        } +        MarkedArgumentBuffer args; +        args.append(jsNumber(0)); +        JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); +    } +    RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_resume, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ +    VM& vm = lexicalGlobalObject->vm(); +    auto throwScope = DECLARE_THROW_SCOPE(vm); + +    if (callFrame->argumentCount() < 2) { +        throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); +        return JSValue::encode(jsUndefined()); +    } + +    JSValue streamVal = callFrame->uncheckedArgument(0); +    JSValue stateVal = callFrame->uncheckedArgument(1); + +    JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); +    if (!state) { +        throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); +        return JSValue::encode(jsUndefined()); +    } + +    if (!state->m_resumeScheduled) { +        state->m_resumeScheduled = true; +        // make this static? +        JSFunction* resume_ = JSC::JSFunction::create( +            vm, lexicalGlobalObject, 0, "resume_"_s, jsReadable_resume_, ImplementationVisibility::Public); + +        lexicalGlobalObject->queueMicrotask(resume_, streamVal, stateVal, JSValue{}, JSValue{}); +    } +    RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_emitReadable_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ +    VM& vm = lexicalGlobalObject->vm(); +    auto throwScope = DECLARE_THROW_SCOPE(vm); + +    if (callFrame->argumentCount() < 2) { +        throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); +        return JSValue::encode(jsUndefined()); +    } + +    JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); +    RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); +    JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); +    if (!state) { +        throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); +        return JSValue::encode(jsUndefined()); +    } + +    JSValue errored = state->getDirect(vm, JSC::Identifier::fromString(vm, "errored"_s)); +    if (!state->m_destroyed && !errored.toBoolean(lexicalGlobalObject) && (state->m_length || state->m_ended)) { +        // stream.emit('readable') +        auto eventType = Identifier::fromString(vm, "readable"_s); +        MarkedArgumentBuffer args; +        auto emitter = jsDynamicCast<JSEventEmitter*>(stream); +        if (!emitter) { +            throwTypeError(lexicalGlobalObject, throwScope, "stream is not EventEmitter"_s); +            return JSValue::encode(jsUndefined()); +        } +        emitter->wrapped().emitForBindings(eventType, args); + +        state->m_emittedReadable = false; +    } + +    state->m_needReadable = state->m_flowing <= 0 && !state->m_ended && state->m_length <= state->m_highWaterMark; +    flow(lexicalGlobalObject, stream, state); +    RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +} // namespace WebCore diff --git a/src/bun.js/bindings/JSReadableHelper.h b/src/bun.js/bindings/JSReadableHelper.h new file mode 100644 index 000000000..9a60fc301 --- /dev/null +++ b/src/bun.js/bindings/JSReadableHelper.h @@ -0,0 +1,11 @@ +#pragma once + +#include "root.h" + +namespace WebCore { + +JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore); +JSC_DECLARE_HOST_FUNCTION(jsReadable_resume); +JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable_); + +} // namespace WebCore diff --git a/src/bun.js/bindings/JSReadableState.cpp b/src/bun.js/bindings/JSReadableState.cpp index fe19449ff..bf1bc1155 100644 --- a/src/bun.js/bindings/JSReadableState.cpp +++ b/src/bun.js/bindings/JSReadableState.cpp @@ -14,8 +14,6 @@ namespace WebCore {  using namespace JSC;  static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_pipesCount); -static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_paused); -static JSC_DECLARE_CUSTOM_GETTER(setJSReadableState_paused);  int64_t getHighWaterMark(JSC::VM& vm, JSC::JSGlobalObject* globalObject, bool isDuplex, JSObject* options)  { @@ -52,79 +50,31 @@ void JSReadableState::finishCreation(JSC::VM& vm, JSC::JSGlobalObject* globalObj      }      putDirect(vm, WTFMove(objectModeIdent), JSC::jsBoolean(objectMode)); -    int64_t highWaterMark = objectMode ? 16 : 16 * 1024;  // default value +    m_highWaterMark = objectMode ? 16 : 16 * 1024;  // default value      if (options != nullptr) {          int64_t customHightWaterMark = getHighWaterMark(vm, globalObject, isDuplex, options);          if (customHightWaterMark >= 0) -            highWaterMark = customHightWaterMark; +            m_highWaterMark = customHightWaterMark;      } -    putDirect(vm, JSC::Identifier::fromString(vm, "highWaterMark"_s), JSC::jsNumber(highWaterMark));      putDirect(vm, JSC::Identifier::fromString(vm, "buffer"_s), JSBufferList::create(          vm, globalObject, reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSBufferListStructure())); -    putDirect(vm, JSC::Identifier::fromString(vm, "length"_s), JSC::jsNumber(0));      putDirect(vm, JSC::Identifier::fromString(vm, "pipes"_s), JSC::constructEmptyArray(globalObject, nullptr, 0)); -    putDirect(vm, JSC::Identifier::fromString(vm, "flowing"_s), JSC::jsNull()); -    putDirect(vm, JSC::Identifier::fromString(vm, "ended"_s), JSC::jsBoolean(false)); -    putDirect(vm, JSC::Identifier::fromString(vm, "endEmitted"_s), JSC::jsBoolean(false)); -    // Stream is still being constructed and cannot be -    // destroyed until construction finished or failed. -    // Async construction is opt in, therefore we start as -    // constructed. -    putDirect(vm, JSC::Identifier::fromString(vm, "reading"_s), JSC::jsBoolean(false)); - -    // A flag to be able to tell if the event 'readable'/'data' is emitted -    // immediately, or on a later tick.  We set this to true at first, because -    // any actions that shouldn't happen until "later" should generally also -    // not happen before the first read call. -    putDirect(vm, JSC::Identifier::fromString(vm, "constructed"_s), JSC::jsBoolean(true)); - -    // Whenever we return null, then we set a flag to say -    // that we're awaiting a 'readable' event emission. -    putDirect(vm, JSC::Identifier::fromString(vm, "sync"_s), JSC::jsBoolean(true)); - -    putDirect(vm, JSC::Identifier::fromString(vm, "needReadable"_s), JSC::jsBoolean(false)); -    putDirect(vm, JSC::Identifier::fromString(vm, "emittedReadable"_s), JSC::jsBoolean(false)); -    putDirect(vm, JSC::Identifier::fromString(vm, "readableListening"_s), JSC::jsBoolean(false)); -    putDirect(vm, JSC::Identifier::fromString(vm, "resumeScheduled"_s), JSC::jsBoolean(false)); - -    // Should close be emitted on destroy. Defaults to true. -    putDirect(vm, JSC::Identifier::fromString(vm, "errorEmitted"_s), JSC::jsBoolean(false));      if (options == nullptr) { -        // Should .destroy() be called after 'end' (and potentially 'finish'). -        putDirect(vm, JSC::Identifier::fromString(vm, "emitClose"_s), JSC::jsBoolean(false)); -        // Has it been destroyed. -        putDirect(vm, JSC::Identifier::fromString(vm, "autoDestroy"_s), JSC::jsBoolean(false)); +        m_emitClose = false; +        m_autoDestroy = false;      } else { -        // Should .destroy() be called after 'end' (and potentially 'finish'). -        auto emitCloseIdent = JSC::Identifier::fromString(vm, "emitClose"_s); -        JSC::JSValue emitCloseVal = options->getDirect(vm, emitCloseIdent); -        putDirect(vm, WTFMove(emitCloseIdent), JSC::jsBoolean(!emitCloseVal.isBoolean() || emitCloseVal.toBoolean(globalObject))); +        JSC::JSValue emitCloseVal = options->getDirect(vm, JSC::Identifier::fromString(vm, "emitClose"_s)); +        m_emitClose = !emitCloseVal.isBoolean() || emitCloseVal.toBoolean(globalObject);          // Has it been destroyed. -        auto autoDestroyIdent = JSC::Identifier::fromString(vm, "autoDestroy"_s); -        JSC::JSValue autoDestroyVal = options->getDirect(vm, autoDestroyIdent); -        putDirect(vm, WTFMove(autoDestroyIdent), JSC::jsBoolean(!autoDestroyVal.isBoolean() || autoDestroyVal.toBoolean(globalObject))); +        JSC::JSValue autoDestroyVal = options->getDirect(vm, JSC::Identifier::fromString(vm, "autoDestroy"_s)); +        m_autoDestroy = !autoDestroyVal.isBoolean() || autoDestroyVal.toBoolean(globalObject);      } -    // Indicates whether the stream has errored. When true no further -    // _read calls, 'data' or 'readable' events should occur. This is needed -    // since when autoDestroy is disabled we need a way to tell whether the -    // stream has failed. -    putDirect(vm, JSC::Identifier::fromString(vm, "destroyed"_s), JSC::jsBoolean(false)); -      // Indicates whether the stream has finished destroying.      putDirect(vm, JSC::Identifier::fromString(vm, "errored"_s), JSC::jsNull()); -    // True if close has been emitted or would have been emitted -    // depending on emitClose. -    putDirect(vm, JSC::Identifier::fromString(vm, "closed"_s), JSC::jsBoolean(false)); - -    // Crypto is kind of old and crusty.  Historically, its default string -    // encoding is 'binary' so we have to make this configurable. -    // Everything else in the universe uses 'utf8', though. -    putDirect(vm, JSC::Identifier::fromString(vm, "closeEmitted"_s), JSC::jsBoolean(false)); -      // Ref the piped dest which we need a drain event on it      // type: null | Writable | Set<Writable>.      auto defaultEncodingIdent = JSC::Identifier::fromString(vm, "defaultEncoding"_s); @@ -140,11 +90,6 @@ void JSReadableState::finishCreation(JSC::VM& vm, JSC::JSGlobalObject* globalObj      }      putDirect(vm, JSC::Identifier::fromString(vm, "awaitDrainWriters"_s), JSC::jsNull()); -    // If true, a maybeReadMore has been scheduled. -    putDirect(vm, JSC::Identifier::fromString(vm, "multiAwaitDrain"_s), JSC::jsBoolean(false)); - -    putDirect(vm, JSC::Identifier::fromString(vm, "readingMore"_s), JSC::jsBoolean(false)); -    putDirect(vm, JSC::Identifier::fromString(vm, "dataEmitted"_s), JSC::jsBoolean(false));      auto decoderIdent = JSC::Identifier::fromString(vm, "decoder"_s);      auto encodingIdent = JSC::Identifier::fromString(vm, "encoding"_s); @@ -201,38 +146,133 @@ JSC_DEFINE_CUSTOM_GETTER(jsReadableState_pipesCount, (JSGlobalObject * lexicalGl      RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNumber(pipes->length())));  } -JSC_DEFINE_CUSTOM_GETTER(jsReadableState_paused, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) -{ -    auto& vm = JSC::getVM(lexicalGlobalObject); -    auto throwScope = DECLARE_THROW_SCOPE(vm); -    JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); -    if (!state) { -        RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined())); +#define JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(NAME) \ +    static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_##NAME); \ +    JSC_DEFINE_CUSTOM_GETTER(jsReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) \ +    { \ +        auto& vm = JSC::getVM(lexicalGlobalObject); \ +        auto throwScope = DECLARE_THROW_SCOPE(vm); \ +        JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \ +        if (!state) { \ +            RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined())); \ +        } \ +        if (state->m_##NAME == 0) \ +            RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNull())); \ +        RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsBoolean(state->m_##NAME > 0))); \ +    } \ +    static JSC_DECLARE_CUSTOM_SETTER(setJSReadableState_##NAME); \ +    JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName)) \ +    { \ +        auto& vm = JSC::getVM(lexicalGlobalObject); \ +        auto throwScope = DECLARE_THROW_SCOPE(vm); \ +        JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \ +        if (!state) { \ +            RETURN_IF_EXCEPTION(throwScope, false); \ +        } \ +        auto value = JSC::JSValue::decode(encodedValue); \ +        state->m_##NAME = value.isNull() ? 0 : value.toBoolean(lexicalGlobalObject) ? 1 : -1; \ +        RELEASE_AND_RETURN(throwScope, true); \      } -    if (state->m_paused == 0) -        RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsNull())); -    RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::jsBoolean(state->m_paused > 0))); -} -JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_paused, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName)) -{ -    auto& vm = JSC::getVM(lexicalGlobalObject); -    auto throwScope = DECLARE_THROW_SCOPE(vm); -    JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); -    if (!state) { -        RETURN_IF_EXCEPTION(throwScope, false); +JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(paused) +JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER(flowing) + +#undef JSReadableState_NULLABLE_BOOLEAN_GETTER_SETTER + +#define JSReadableState_GETTER_SETTER(NAME, TYPE)                                                                                      \ +    static JSC_DECLARE_CUSTOM_GETTER(jsReadableState_##NAME); \ +    JSC_DEFINE_CUSTOM_GETTER(jsReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName attributeName)) \ +    {                                                                                                                                              \ +        auto& vm = JSC::getVM(lexicalGlobalObject); \ +        auto throwScope = DECLARE_THROW_SCOPE(vm); \ +        JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \ +        if (!state) { \ +            RETURN_IF_EXCEPTION(throwScope, JSC::JSValue::encode(JSC::jsUndefined())); \ +        } \ +        RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(JSC::js##TYPE(state->m_##NAME))); \ +    } \ +    \ +    static JSC_DECLARE_CUSTOM_SETTER(setJSReadableState_##NAME); \ +    JSC_DEFINE_CUSTOM_SETTER(setJSReadableState_##NAME, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, EncodedJSValue encodedValue, PropertyName attributeName)) \ +    { \ +        auto& vm = JSC::getVM(lexicalGlobalObject); \ +        auto throwScope = DECLARE_THROW_SCOPE(vm); \ +        JSReadableState* state = JSC::jsDynamicCast<JSReadableState*>(JSValue::decode(thisValue)); \ +        if (!state) { \ +            RETURN_IF_EXCEPTION(throwScope, false); \ +        } \ +        state->m_##NAME = JSC::JSValue::decode(encodedValue).to##TYPE(lexicalGlobalObject); \ +        RETURN_IF_EXCEPTION(throwScope, false); \ +        RELEASE_AND_RETURN(throwScope, true); \      } -    state->m_paused = JSC::JSValue::decode(encodedValue).toBoolean(lexicalGlobalObject) ? 1 : -1; -    RELEASE_AND_RETURN(throwScope, true); -} + +#define JSReadableState_BOOLEAN_GETTER_SETTER(NAME)       \ +    JSReadableState_GETTER_SETTER(NAME, Boolean) + +#define JSReadableState_NUMBER_GETTER_SETTER(NAME)        \ +    JSReadableState_GETTER_SETTER(NAME, Number) + +JSReadableState_BOOLEAN_GETTER_SETTER(ended) +JSReadableState_BOOLEAN_GETTER_SETTER(endEmitted) +JSReadableState_BOOLEAN_GETTER_SETTER(reading) +JSReadableState_BOOLEAN_GETTER_SETTER(constructed) +JSReadableState_BOOLEAN_GETTER_SETTER(sync) +JSReadableState_BOOLEAN_GETTER_SETTER(needReadable) +JSReadableState_BOOLEAN_GETTER_SETTER(emittedReadable) +JSReadableState_BOOLEAN_GETTER_SETTER(readableListening) +JSReadableState_BOOLEAN_GETTER_SETTER(resumeScheduled) +JSReadableState_BOOLEAN_GETTER_SETTER(errorEmitted) +JSReadableState_BOOLEAN_GETTER_SETTER(emitClose) +JSReadableState_BOOLEAN_GETTER_SETTER(autoDestroy) +JSReadableState_BOOLEAN_GETTER_SETTER(destroyed) +JSReadableState_BOOLEAN_GETTER_SETTER(closed) +JSReadableState_BOOLEAN_GETTER_SETTER(closeEmitted) +JSReadableState_BOOLEAN_GETTER_SETTER(multiAwaitDrain) +JSReadableState_BOOLEAN_GETTER_SETTER(readingMore) +JSReadableState_BOOLEAN_GETTER_SETTER(dataEmitted) + +JSReadableState_NUMBER_GETTER_SETTER(length) +JSReadableState_NUMBER_GETTER_SETTER(highWaterMark) + +#undef JSReadableState_NUMBER_GETTER_SETTER +#undef JSReadableState_BOOLEAN_GETTER_SETTER +#undef JSReadableState_GETTER_SETTER + +#define JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(NAME) \ +    { #NAME ""_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_##NAME, setJSReadableState_##NAME } }  /* Hash table for prototype */  static const HashTableValue JSReadableStatePrototypeTableValues[]      = {            { "pipesCount"_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_pipesCount, 0 } }, -          { "paused"_s, static_cast<unsigned>(JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsReadableState_paused, setJSReadableState_paused } }, +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(paused), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(flowing), +           +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(ended), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(endEmitted), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(reading), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(constructed), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(sync), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(needReadable), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(emittedReadable), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(readableListening), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(resumeScheduled), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(errorEmitted), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(emitClose), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(autoDestroy), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(destroyed), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(closed), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(closeEmitted), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(multiAwaitDrain), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(readingMore), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(dataEmitted), + +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(length), +          JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE(highWaterMark),        }; +#undef JSReadableState_GETTER_SETTER_HASH_TABLE_VALUE +  void JSReadableStatePrototype::finishCreation(VM& vm, JSC::JSGlobalObject* globalThis)  {      Base::finishCreation(vm); diff --git a/src/bun.js/bindings/JSReadableState.h b/src/bun.js/bindings/JSReadableState.h index fd78f0411..d06a9dbc3 100644 --- a/src/bun.js/bindings/JSReadableState.h +++ b/src/bun.js/bindings/JSReadableState.h @@ -46,7 +46,30 @@ public:      static void destroy(JSCell*) {}      // 0 for null, 1 for true, -1 for false -    int8_t m_paused; +    int8_t m_paused = 0; +    int8_t m_flowing = 0; +    bool m_ended = false; +    bool m_endEmitted = false; +    bool m_reading = false; +    bool m_constructed = true; +    bool m_sync = true; +    bool m_needReadable = false; +    bool m_emittedReadable = false; +    bool m_readableListening = false; +    bool m_resumeScheduled = false; +    bool m_errorEmitted = false; +    // These 2 are initialized from options +    bool m_emitClose; +    bool m_autoDestroy; +    bool m_destroyed = false; +    bool m_closed = false; +    bool m_closeEmitted = false; +    bool m_multiAwaitDrain = false; +    bool m_readingMore = false; +    bool m_dataEmitted = false; + +    int64_t m_length = 0; +    int64_t m_highWaterMark;  };  class JSReadableStatePrototype : public JSC::JSNonFinalObject { diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 1cedb04cc..012ad2d66 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -89,6 +89,7 @@  #include "JSFetchHeaders.h"  #include "JSStringDecoder.h"  #include "JSReadableState.h" +#include "JSReadableHelper.h"  #include "Process.h" @@ -1042,6 +1043,15 @@ JSC:              auto* obj = constructEmptyObject(globalObject);              obj->putDirect(vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "BufferList"_s)), reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSBufferList(), 0);              obj->putDirect(vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "ReadableState"_s)), reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSReadableState(), 0); +            obj->putDirect( +                vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "maybeReadMore"_s)), +                JSC::JSFunction::create(vm, globalObject, 0, "maybeReadMore"_s, jsReadable_maybeReadMore, ImplementationVisibility::Public), 0); +            obj->putDirect( +                vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "resume"_s)), +                JSC::JSFunction::create(vm, globalObject, 0, "resume"_s, jsReadable_resume, ImplementationVisibility::Public), 0); +            obj->putDirect( +                vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "emitReadable_"_s)), +                JSC::JSFunction::create(vm, globalObject, 0, "emitReadable_"_s, jsReadable_emitReadable_, ImplementationVisibility::Public), 0);              return JSValue::encode(obj);          } diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index fffb9d0cd..9bb74daf4 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -2485,10 +2485,12 @@ var require_readable = __commonJS({      var debug = require_util().debuglog("stream", (fn) => {        debug = fn;      }); -    var BufferList = -      globalThis[Symbol.for("Bun.lazy")]("bun:stream").BufferList; +    const { +      maybeReadMore, +      resume, +      emitReadable_, +    } = globalThis[Symbol.for("Bun.lazy")]("bun:stream");      var destroyImpl = require_destroy(); -    var { getHighWaterMark, getDefaultHighWaterMark } = require_state();      var {        aggregateTwoErrors,        codes: { @@ -2769,7 +2771,7 @@ var require_readable = __commonJS({        } else {          state.needReadable = false;          state.emittedReadable = true; -        emitReadable_(stream); +        emitReadable_(stream, state);        }      }      function emitReadable(stream) { @@ -2779,39 +2781,8 @@ var require_readable = __commonJS({        if (!state.emittedReadable) {          debug("emitReadable", state.flowing);          state.emittedReadable = true; -        runOnNextTick(emitReadable_, stream); -      } -    } -    function emitReadable_(stream) { -      const state = stream._readableState; -      debug("emitReadable_", state.destroyed, state.length, state.ended); -      if (!state.destroyed && !state.errored && (state.length || state.ended)) { -        stream.emit("readable"); -        state.emittedReadable = false; -      } -      state.needReadable = -        !state.flowing && !state.ended && state.length <= state.highWaterMark; -      flow(stream); -    } -    function maybeReadMore(stream, state) { -      if (!state.readingMore && state.constructed) { -        state.readingMore = true; -        runOnNextTick(maybeReadMore_, stream, state); -      } -    } -    function maybeReadMore_(stream, state) { -      while ( -        !state.reading && -        !state.ended && -        (state.length < state.highWaterMark || -          (state.flowing && state.length === 0)) -      ) { -        const len = state.length; -        debug("maybeReadMore read 0"); -        stream.read(0); -        if (len === state.length) break; +        runOnNextTick(emitReadable_, stream, state);        } -      state.readingMore = false;      }      Readable.prototype._read = function (n) {        throw new ERR_METHOD_NOT_IMPLEMENTED("_read()"); @@ -3041,22 +3012,6 @@ var require_readable = __commonJS({        state.paused = false;        return this;      }; -    function resume(stream, state) { -      if (!state.resumeScheduled) { -        state.resumeScheduled = true; -        runOnNextTick(resume_, stream, state); -      } -    } -    function resume_(stream, state) { -      debug("resume", state.reading); -      if (!state.reading) { -        stream.read(0); -      } -      state.resumeScheduled = false; -      stream.emit("resume"); -      flow(stream); -      if (state.flowing && !state.reading) stream.read(0); -    }      Readable.prototype.pause = function () {        debug("call pause flowing=%j", this._readableState.flowing);        if (this._readableState.flowing !== false) { | 
