diff options
author | 2022-09-09 10:52:27 +0800 | |
---|---|---|
committer | 2022-09-08 19:52:27 -0700 | |
commit | 8d8b72cf3f7119f0bc018513d89eca5f8ec44ab3 (patch) | |
tree | b7af0511edfb0ca115a4f28f43a8be0f14ae63d8 /src/bun.js/bindings/JSReadableHelper.cpp | |
parent | a3cc9aaf6f66bac85e6318d3e09c0f1961d18210 (diff) | |
download | bun-8d8b72cf3f7119f0bc018513d89eca5f8ec44ab3.tar.gz bun-8d8b72cf3f7119f0bc018513d89eca5f8ec44ab3.tar.zst bun-8d8b72cf3f7119f0bc018513d89eca5f8ec44ab3.zip |
Add native helper functions for Readable and convert ReadableState properties to getter/setter (#1218)
Diffstat (limited to 'src/bun.js/bindings/JSReadableHelper.cpp')
-rw-r--r-- | src/bun.js/bindings/JSReadableHelper.cpp | 228 |
1 files changed, 228 insertions, 0 deletions
diff --git a/src/bun.js/bindings/JSReadableHelper.cpp b/src/bun.js/bindings/JSReadableHelper.cpp new file mode 100644 index 000000000..ef17982d4 --- /dev/null +++ b/src/bun.js/bindings/JSReadableHelper.cpp @@ -0,0 +1,228 @@ +#include "JSReadableHelper.h" +#include "JSReadableState.h" +#include "JSBufferList.h" +#include "JSBuffer.h" +#include "JSEventEmitter.h" +#include "JavaScriptCore/Lookup.h" +#include "JavaScriptCore/ObjectConstructor.h" +#include "ZigGlobalObject.h" +#include "JSDOMOperation.h" +#include "JSDOMAttribute.h" +#include "headers.h" +#include "JSDOMConvertEnumeration.h" + +namespace WebCore { +using namespace JSC; + +static JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore_); +JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); + RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); + JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); + if (!state) { + throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); + return JSValue::encode(jsUndefined()); + } + + auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto callData = JSC::getCallData(read); + if (callData.type == CallData::Type::None) { + throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); + return JSValue::encode(jsUndefined()); + } + MarkedArgumentBuffer args; + args.append(jsNumber(0)); + + while ( + !state->m_reading && + !state->m_ended && + (state->m_length < state->m_highWaterMark || (state->m_flowing > 0 && state->m_length == 0))) { + int64_t len = state->m_length; + + JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + + if (len == state->m_length) + break; + } + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSValue streamVal = callFrame->uncheckedArgument(0); + JSValue stateVal = callFrame->uncheckedArgument(1); + + // make this static? + JSFunction* maybeReadMore_ = JSC::JSFunction::create( + vm, lexicalGlobalObject, 0, "maybeReadMore_"_s, jsReadable_maybeReadMore_, ImplementationVisibility::Public); + + lexicalGlobalObject->queueMicrotask(maybeReadMore_, streamVal, stateVal, JSValue{}, JSValue{}); + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +void flow(JSGlobalObject* lexicalGlobalObject, JSObject* stream, JSReadableState* state) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto callData = JSC::getCallData(read); + if (callData.type == CallData::Type::None) { + throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); + return; + } + MarkedArgumentBuffer args; + + while (state->m_flowing > 0) { + JSValue ret = JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + if (ret.isNull()) + break; + } +} + +static JSC_DECLARE_HOST_FUNCTION(jsReadable_resume_); +JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); + RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); + JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); + if (!state) { + throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); + return JSValue::encode(jsUndefined()); + } + + if (!state->m_reading) { + // stream.read(0) + auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto callData = JSC::getCallData(read); + if (callData.type == CallData::Type::None) { + throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); + return JSValue::encode(jsUndefined()); + } + MarkedArgumentBuffer args; + args.append(jsNumber(0)); + JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + } + + state->m_resumeScheduled = true; + // stream.emit('resume') + auto eventType = Identifier::fromString(vm, "resume"_s); + MarkedArgumentBuffer args; + auto emitter = jsDynamicCast<JSEventEmitter*>(stream); + if (!emitter) { + throwTypeError(lexicalGlobalObject, throwScope, "stream is not EventEmitter"_s); + return JSValue::encode(jsUndefined()); + } + emitter->wrapped().emitForBindings(eventType, args); + + flow(lexicalGlobalObject, stream, state); + + if (state->m_flowing && !state->m_reading) { + // stream.read(0) + auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto callData = JSC::getCallData(read); + if (callData.type == CallData::Type::None) { + throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); + return JSValue::encode(jsUndefined()); + } + MarkedArgumentBuffer args; + args.append(jsNumber(0)); + JSC::call(lexicalGlobalObject, read, callData, JSValue(stream), args); + } + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_resume, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSValue streamVal = callFrame->uncheckedArgument(0); + JSValue stateVal = callFrame->uncheckedArgument(1); + + JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); + if (!state) { + throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); + return JSValue::encode(jsUndefined()); + } + + if (!state->m_resumeScheduled) { + state->m_resumeScheduled = true; + // make this static? + JSFunction* resume_ = JSC::JSFunction::create( + vm, lexicalGlobalObject, 0, "resume_"_s, jsReadable_resume_, ImplementationVisibility::Public); + + lexicalGlobalObject->queueMicrotask(resume_, streamVal, stateVal, JSValue{}, JSValue{}); + } + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +JSC_DEFINE_HOST_FUNCTION(jsReadable_emitReadable_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + VM& vm = lexicalGlobalObject->vm(); + auto throwScope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(lexicalGlobalObject, throwScope, "Not enough arguments"_s); + return JSValue::encode(jsUndefined()); + } + + JSObject* stream = callFrame->uncheckedArgument(0).toObject(lexicalGlobalObject); + RETURN_IF_EXCEPTION(throwScope, JSValue::encode(jsUndefined())); + JSReadableState* state = jsDynamicCast<JSReadableState*>(callFrame->uncheckedArgument(1)); + if (!state) { + throwTypeError(lexicalGlobalObject, throwScope, "Second argument not ReadableState"_s); + return JSValue::encode(jsUndefined()); + } + + JSValue errored = state->getDirect(vm, JSC::Identifier::fromString(vm, "errored"_s)); + if (!state->m_destroyed && !errored.toBoolean(lexicalGlobalObject) && (state->m_length || state->m_ended)) { + // stream.emit('readable') + auto eventType = Identifier::fromString(vm, "readable"_s); + MarkedArgumentBuffer args; + auto emitter = jsDynamicCast<JSEventEmitter*>(stream); + if (!emitter) { + throwTypeError(lexicalGlobalObject, throwScope, "stream is not EventEmitter"_s); + return JSValue::encode(jsUndefined()); + } + emitter->wrapped().emitForBindings(eventType, args); + + state->m_emittedReadable = false; + } + + state->m_needReadable = state->m_flowing <= 0 && !state->m_ended && state->m_length <= state->m_highWaterMark; + flow(lexicalGlobalObject, stream, state); + RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); +} + +} // namespace WebCore |