diff options
author | 2023-03-30 23:46:13 -0500 | |
---|---|---|
committer | 2023-04-06 17:34:31 -0500 | |
commit | b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c (patch) | |
tree | 5288bad9164e679cb1dfdabcb8696188b28f62ca /src | |
parent | 91f02989dc0432079693c5781269a07973099067 (diff) | |
download | bun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.tar.gz bun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.tar.zst bun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.zip |
wip(node:events): get async iter working, work through test flakiness
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/bindings/headers-cpp.h | 2 | ||||
-rw-r--r-- | src/bun.js/bindings/headers.h | 2 | ||||
-rw-r--r-- | src/bun.js/builtins/cpp/NodeEventsBuiltins.cpp | 231 | ||||
-rw-r--r-- | src/bun.js/builtins/js/NodeEvents.js | 235 |
4 files changed, 269 insertions, 201 deletions
diff --git a/src/bun.js/bindings/headers-cpp.h b/src/bun.js/bindings/headers-cpp.h index cb352a76a..06a42eb57 100644 --- a/src/bun.js/bindings/headers-cpp.h +++ b/src/bun.js/bindings/headers-cpp.h @@ -1,4 +1,4 @@ -//-- AUTOGENERATED FILE -- 1679596954 +//-- AUTOGENERATED FILE -- 1680036484 // clang-format off #pragma once diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index f076b4719..a53938787 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -1,5 +1,5 @@ // clang-format off -//-- AUTOGENERATED FILE -- 1679596954 +//-- AUTOGENERATED FILE -- 1680036484 #pragma once #include <stddef.h> diff --git a/src/bun.js/builtins/cpp/NodeEventsBuiltins.cpp b/src/bun.js/builtins/cpp/NodeEventsBuiltins.cpp index 1e26a934b..6ae9863cb 100644 --- a/src/bun.js/builtins/cpp/NodeEventsBuiltins.cpp +++ b/src/bun.js/builtins/cpp/NodeEventsBuiltins.cpp @@ -51,38 +51,42 @@ namespace WebCore { const JSC::ConstructAbility s_nodeEventsOnAsyncIteratorCodeConstructAbility = JSC::ConstructAbility::CannotConstruct; const JSC::ConstructorKind s_nodeEventsOnAsyncIteratorCodeConstructorKind = JSC::ConstructorKind::None; const JSC::ImplementationVisibility s_nodeEventsOnAsyncIteratorCodeImplementationVisibility = JSC::ImplementationVisibility::Public; -const int s_nodeEventsOnAsyncIteratorCodeLength = 4291; +const int s_nodeEventsOnAsyncIteratorCodeLength = 4024; static const JSC::Intrinsic s_nodeEventsOnAsyncIteratorCodeIntrinsic = JSC::NoIntrinsic; const char* const s_nodeEventsOnAsyncIteratorCode = "(function (emitter, event, options) {\n" \ " \"use strict\";\n" \ "\n" \ - " var AbortError = class AbortError extends Error {\n" \ - " constructor(message = \"The operation was aborted\", options = void 0) {\n" \ - " if (options !== void 0 && typeof options !== \"object\") {\n" \ - " throw new Error(`Invalid AbortError options:\\n" \ - "\\n" \ - "${JSON.stringify(options, null, 2)}`);\n" \ - " }\n" \ - " super(message, options);\n" \ - " this.code = \"ABORT_ERR\";\n" \ - " this.name = \"AbortError\";\n" \ - " }\n" \ - " };\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + "\n" \ + " //\n" \ + " var { Object, Number, console, Symbol } = globalThis;\n" \ + " //\n" \ "\n" \ - " var { AbortSignal, Object, Number } = globalThis;\n" \ + " function isUndefinedOrNull(value) {\n" \ + " return value === undefined || value === null;\n" \ + " }\n" \ "\n" \ - " if (@isUndefinedOrNull(options)) options = {};\n" \ + " if (isUndefinedOrNull(options)) options = {};\n" \ "\n" \ " //\n" \ " var signal = options.signal;\n" \ - " if (!@isUndefinedOrNull(signal) && !(signal instanceof AbortSignal))\n" \ - " @throwTypeError(\"options.signal must be an AbortSignal\");\n" \ + " //\n" \ + " //\n" \ "\n" \ - " if (signal?.aborted) {\n" \ - " //\n" \ - " throw new AbortError(@undefined, { cause: signal?.reason });\n" \ - " }\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ "\n" \ " var highWatermark = options.highWatermark ?? Number.MAX_SAFE_INTEGER;\n" \ " if (highWatermark < 1) \n" \ @@ -99,10 +103,11 @@ const char* const s_nodeEventsOnAsyncIteratorCode = " var error = null;\n" \ " var finished = false;\n" \ " var size = 0;\n" \ + " var listeners = [];\n" \ "\n" \ - " function abortListener() {\n" \ - " errorHandler(new AbortError(@undefined, { cause: signal?.reason }));\n" \ - " }\n" \ + " //\n" \ + " //\n" \ + " //\n" \ "\n" \ " function eventHandler(value) {\n" \ " if (unconsumedPromises.isEmpty()) {\n" \ @@ -112,116 +117,131 @@ const char* const s_nodeEventsOnAsyncIteratorCode = " emitter.pause();\n" \ " }\n" \ " unconsumedEvents.push(value);\n" \ - " } else unconsumedPromises.shift().@resolve.@call(@undefined, { value, done: false });\n" \ + " } else unconsumedPromises.shift().@resolve.@call(undefined, [value]);\n" \ " }\n" \ "\n" \ " function closeHandler() {\n" \ " removeAllListeners(listeners);\n" \ " finished = true;\n" \ - " var doneResult = { value: @undefined, done: true };\n" \ " while (!unconsumedPromises.isEmpty()) {\n" \ - " unconsumedPromises.shift().@resolve.@call(@undefined, doneResult);\n" \ + " unconsumedPromises.shift().@resolve.@call(undefined, [undefined]);\n" \ " }\n" \ " \n" \ - " return @createFulfilledPromise(doneResult);\n" \ + " return @createFulfilledPromise([undefined]);\n" \ " }\n" \ "\n" \ " function errorHandler(err) {\n" \ " if (unconsumedPromises.isEmpty()) error = err;\n" \ - " else unconsumedPromises.shift().@reject.@call(@undefined, err);\n" \ + " else unconsumedPromises.shift().@reject.@call(undefined, err);\n" \ " \n" \ " closeHandler();\n" \ " }\n" \ " \n" \ " function addEventListener(emitter, event, handler) {\n" \ " emitter.on(event, handler);\n" \ - " @arrayPush(listeners, emitter, event, handler);\n" \ + " listeners.push([emitter, event, handler]);\n" \ + " //\n" \ " }\n" \ " \n" \ " function removeAllListeners() {\n" \ + " var heldEmitter;\n" \ " while (listeners.length > 0) {\n" \ - " var [emitter, event, handler] = @arrayPop(listeners);\n" \ + " //\n" \ + " var entry = listeners.pop();\n" \ + " var [emitter, event, handler] = entry;\n" \ + " if (event === \"foo\") heldEmitter = emitter;\n" \ + " console.log(emitter, event, handler);\n" \ " emitter.off(event, handler);\n" \ " }\n" \ + " console.log(heldEmitter.listenerCount(\"foo\"));\n" \ " }\n" \ "\n" \ " var iterator = async function* NodeEventsOnAsyncIterator() {\n" \ " //\n" \ - " if (size) {\n" \ - " var value = unconsumedEvents.shift();\n" \ - " size--;\n" \ - " if (paused && size < lowWatermark) {\n" \ - " emitter.resume();\n" \ - " paused = false;\n" \ + " while (!finished) {\n" \ + " if (size) {\n" \ + " var values = [];\n" \ + " while (size) {\n" \ + " values.push(unconsumedEvents.shift());\n" \ + " size--;\n" \ + " if (paused && size < lowWatermark) {\n" \ + " emitter.resume();\n" \ + " paused = false;\n" \ + " break;\n" \ + " }\n" \ + " }\n" \ + " yield @createFulfilledPromise(values);\n" \ " }\n" \ - " yield @createFulfilledPromise({ value, done: false });\n" \ - " }\n" \ "\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " if (error) {\n" \ - " var p = @Promise.@reject(error);\n" \ " //\n" \ - " error = null;\n" \ - " yield p;\n" \ - " }\n" \ + " //\n" \ + " //\n" \ + " if (error) {\n" \ + " throw error;\n" \ + " }\n" \ "\n" \ - " //\n" \ - " if (finished) yield closeHandler();\n" \ + " //\n" \ + " if (finished) {\n" \ + " console.log(\"FINISHED\")\n" \ + " yield closeHandler();\n" \ + " break;\n" \ + " };\n" \ "\n" \ - " //\n" \ - " var nextEventPromiseCapability = @newPromiseCapability(@Promise);\n" \ - " unconsumedPromises.push(nextEventPromiseCapability);\n" \ - " yield nextEventPromiseCapability.@promise;\n" \ + " //\n" \ + " var nextEventPromiseCapability = @newPromiseCapability(@Promise);\n" \ + " unconsumedPromises.push(nextEventPromiseCapability);\n" \ + " yield nextEventPromiseCapability.@promise;\n" \ + " }\n" \ " };\n" \ "\n" \ " //\n" \ - " Object.defineProperties(iterator, {\n" \ - " \"throw\": {\n" \ - " value: (err) => {\n" \ - " //\n" \ - " if (err === undefined || err === null || !(err instanceof Error)) {\n" \ - " @throwTypeError(\"The argument must be an instance of Error\");\n" \ - " }\n" \ - " errorHandler(err);\n" \ - " },\n" \ - " },\n" \ - " \"return\": {\n" \ - " value: () => {\n" \ - " return closeHandler();\n" \ - " }\n" \ - " },\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " //\n" \ - " });\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ + " //\n" \ "\n" \ " //\n" \ - " var listeners = [];\n" \ " addEventListener(emitter, event, eventHandler);\n" \ " if (event !== \"error\" && typeof emitter.on === \"function\") {\n" \ " addEventListener(emitter, \"error\", errorHandler);\n" \ @@ -233,10 +253,23 @@ const char* const s_nodeEventsOnAsyncIteratorCode = " }\n" \ " }\n" \ "\n" \ - " if (signal)\n" \ - " signal.once(\"abort\", abortListener);\n" \ + " //\n" \ + " //\n" \ "\n" \ - " return iterator;\n" \ + " return {\n" \ + " throw: (err) => {\n" \ + " if (err === undefined || err === null || !(err instanceof Error)) {\n" \ + " @throwTypeError(\"The argument must be an instance of Error\");\n" \ + " }\n" \ + " errorHandler(err);\n" \ + " },\n" \ + " return: () => {\n" \ + " console.log(\"we're here\");\n" \ + " return closeHandler();\n" \ + " },\n" \ + " next: () => iterator.next(),\n" \ + " [Symbol.asyncIterator]: iterator,\n" \ + " };\n" \ "})\n" \ ; diff --git a/src/bun.js/builtins/js/NodeEvents.js b/src/bun.js/builtins/js/NodeEvents.js index b94d30725..02e944fde 100644 --- a/src/bun.js/builtins/js/NodeEvents.js +++ b/src/bun.js/builtins/js/NodeEvents.js @@ -26,30 +26,36 @@ function onAsyncIterator(emitter, event, options) { "use strict"; - var AbortError = class AbortError extends Error { - constructor(message = "The operation was aborted", options = void 0) { - if (options !== void 0 && typeof options !== "object") { - throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); - } - super(message, options); - this.code = "ABORT_ERR"; - this.name = "AbortError"; - } - }; - - var { AbortSignal, Object, Number } = globalThis; + // var AbortError = class AbortError extends Error { + // constructor(message = "The operation was aborted", options = void 0) { + // if (options !== void 0 && typeof options !== "object") { + // throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); + // } + // super(message, options); + // this.code = "ABORT_ERR"; + // this.name = "AbortError"; + // } + // }; + + // var { AbortSignal, Object, Number, console } = globalThis; + var { Object, Number, console, Symbol } = globalThis; + // console.log("AbortSignal", AbortSignal); + + function isUndefinedOrNull(value) { + return value === undefined || value === null; + } - if (@isUndefinedOrNull(options)) options = {}; + if (isUndefinedOrNull(options)) options = {}; // Parameters validation var signal = options.signal; - if (!@isUndefinedOrNull(signal) && !(signal instanceof AbortSignal)) - @throwTypeError("options.signal must be an AbortSignal"); + // if (!isUndefinedOrNull(signal) && !(signal instanceof AbortSignal)) + // @throwTypeError("options.signal must be an AbortSignal"); - if (signal?.aborted) { - // TODO: Make this a builtin - throw new AbortError(@undefined, { cause: signal?.reason }); - } + // if (signal?.aborted) { + // // TODO: Make this a builtin + // throw new AbortError(undefined, { cause: signal?.reason }); + // } var highWatermark = options.highWatermark ?? Number.MAX_SAFE_INTEGER; if (highWatermark < 1) @@ -66,10 +72,11 @@ function onAsyncIterator(emitter, event, options) { var error = null; var finished = false; var size = 0; + var listeners = []; - function abortListener() { - errorHandler(new AbortError(@undefined, { cause: signal?.reason })); - } + // function abortListener() { + // errorHandler(new AbortError(undefined, { cause: signal?.reason })); + // } function eventHandler(value) { if (unconsumedPromises.isEmpty()) { @@ -79,116 +86,131 @@ function onAsyncIterator(emitter, event, options) { emitter.pause(); } unconsumedEvents.push(value); - } else unconsumedPromises.shift().@resolve.@call(@undefined, { value, done: false }); + } else unconsumedPromises.shift().@resolve.@call(undefined, [value]); } function closeHandler() { removeAllListeners(listeners); finished = true; - var doneResult = { value: @undefined, done: true }; while (!unconsumedPromises.isEmpty()) { - unconsumedPromises.shift().@resolve.@call(@undefined, doneResult); + unconsumedPromises.shift().@resolve.@call(undefined, [undefined]); } - return @createFulfilledPromise(doneResult); + return @createFulfilledPromise([undefined]); } function errorHandler(err) { if (unconsumedPromises.isEmpty()) error = err; - else unconsumedPromises.shift().@reject.@call(@undefined, err); + else unconsumedPromises.shift().@reject.@call(undefined, err); closeHandler(); } function addEventListener(emitter, event, handler) { emitter.on(event, handler); - @arrayPush(listeners, emitter, event, handler); + listeners.push([emitter, event, handler]); + // @arrayPush(listeners, [emitter, event, handler]); } function removeAllListeners() { + var heldEmitter; while (listeners.length > 0) { - var [emitter, event, handler] = @arrayPop(listeners); + // var [emitter, event, handler] = @arrayPop(listeners); + var entry = listeners.pop(); + var [emitter, event, handler] = entry; + if (event === "foo") heldEmitter = emitter; + console.log(emitter, event, handler); emitter.off(event, handler); } + console.log(heldEmitter.listenerCount("foo")); } var iterator = async function* NodeEventsOnAsyncIterator() { // First, we consume all unread events - if (size) { - var value = unconsumedEvents.shift(); - size--; - if (paused && size < lowWatermark) { - emitter.resume(); - paused = false; + while (!finished) { + if (size) { + var values = []; + while (size) { + values.push(unconsumedEvents.shift()); + size--; + if (paused && size < lowWatermark) { + emitter.resume(); + paused = false; + break; + } + } + yield @createFulfilledPromise(values); } - yield @createFulfilledPromise({ value, done: false }); - } - - // Then we error, if an error happened - // This happens one time if at all, because after 'error' - // we stop listening - if (error) { - var p = @Promise.@reject(error); - // Only the first element errors - error = null; - yield p; - } - // If the iterator is finished, resolve to done - if (finished) yield closeHandler(); + // Then we error, if an error happened + // This happens one time if at all, because after 'error' + // we stop listening + if (error) { + throw error; + } - // Wait until an event happens - var nextEventPromiseCapability = @newPromiseCapability(@Promise); - unconsumedPromises.push(nextEventPromiseCapability); - yield nextEventPromiseCapability.@promise; + // If the iterator is finished, break + if (finished) { + console.log("FINISHED") + yield closeHandler(); + break; + }; + + // Wait until an event happens + var nextEventPromiseCapability = @newPromiseCapability(@Promise); + unconsumedPromises.push(nextEventPromiseCapability); + yield nextEventPromiseCapability.@promise; + } }; - // TODO: Use builtin - Object.defineProperties(iterator, { - "throw": { - value: (err) => { - // TODO: Use Error builtin? - if (err === undefined || err === null || !(err instanceof Error)) { - @throwTypeError("The argument must be an instance of Error"); - } - errorHandler(err); - }, - }, - "return": { - value: () => { - return closeHandler(); - } - }, - // [kWatermarkData]: { - // /** - // * The current queue size - // */ - // get size() { - // return size; - // }, - // /** - // * The low watermark. The emitter is resumed every time size is lower than it - // */ - // get low() { - // return lowWatermark; - // }, - // /** - // * The high watermark. The emitter is paused every time size is higher than it - // */ - // get high() { - // return highWatermark; - // }, - // /** - // * It checks whether the emitter is paused by the watermark controller or not - // */ - // get isPaused() { - // return paused; - // }, - // }, - }); + // // TODO: Use builtin + // Object.defineProperties(iterator, { + // "throw": { + // value: (err) => { + // // TODO: Use Error builtin? + // if (err === undefined || err === null || !(err instanceof Error)) { + // @throwTypeError("The argument must be an instance of Error"); + // } + // errorHandler(err); + // }, + // }, + // "return": { + // value: () => { + // return closeHandler(); + // } + // }, + // [Symbol.asyncIterator]: { + // value: () => iterator, + // }, + // // [kWatermarkData]: { + // // /** + // // * The current queue size + // // */ + // // get size() { + // // return size; + // // }, + // // /** + // // * The low watermark. The emitter is resumed every time size is lower than it + // // */ + // // get low() { + // // return lowWatermark; + // // }, + // // /** + // // * The high watermark. The emitter is paused every time size is higher than it + // // */ + // // get high() { + // // return highWatermark; + // // }, + // // /** + // // * It checks whether the emitter is paused by the watermark controller or not + // // */ + // // get isPaused() { + // // return paused; + // // }, + // // }, + // }); // Adding event handlers - var listeners = []; addEventListener(emitter, event, eventHandler); if (event !== "error" && typeof emitter.on === "function") { addEventListener(emitter, "error", errorHandler); @@ -200,8 +222,21 @@ function onAsyncIterator(emitter, event, options) { } } - if (signal) - signal.once("abort", abortListener); + // if (signal) + // signal.once("abort", abortListener); - return iterator; + return { + throw: (err) => { + if (err === undefined || err === null || !(err instanceof Error)) { + @throwTypeError("The argument must be an instance of Error"); + } + errorHandler(err); + }, + return: () => { + console.log("we're here"); + return closeHandler(); + }, + next: () => iterator.next(), + [Symbol.asyncIterator]: iterator, + }; } |