diff options
author | 2023-03-30 23:46:13 -0500 | |
---|---|---|
committer | 2023-04-06 17:34:31 -0500 | |
commit | b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c (patch) | |
tree | 5288bad9164e679cb1dfdabcb8696188b28f62ca /src/bun.js/builtins/js/NodeEvents.js | |
parent | 91f02989dc0432079693c5781269a07973099067 (diff) | |
download | bun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.tar.gz bun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.tar.zst bun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.zip |
wip(node:events): get async iter working, work through test flakiness
Diffstat (limited to 'src/bun.js/builtins/js/NodeEvents.js')
-rw-r--r-- | src/bun.js/builtins/js/NodeEvents.js | 235 |
1 files changed, 135 insertions, 100 deletions
diff --git a/src/bun.js/builtins/js/NodeEvents.js b/src/bun.js/builtins/js/NodeEvents.js index b94d30725..02e944fde 100644 --- a/src/bun.js/builtins/js/NodeEvents.js +++ b/src/bun.js/builtins/js/NodeEvents.js @@ -26,30 +26,36 @@ function onAsyncIterator(emitter, event, options) { "use strict"; - var AbortError = class AbortError extends Error { - constructor(message = "The operation was aborted", options = void 0) { - if (options !== void 0 && typeof options !== "object") { - throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); - } - super(message, options); - this.code = "ABORT_ERR"; - this.name = "AbortError"; - } - }; - - var { AbortSignal, Object, Number } = globalThis; + // var AbortError = class AbortError extends Error { + // constructor(message = "The operation was aborted", options = void 0) { + // if (options !== void 0 && typeof options !== "object") { + // throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); + // } + // super(message, options); + // this.code = "ABORT_ERR"; + // this.name = "AbortError"; + // } + // }; + + // var { AbortSignal, Object, Number, console } = globalThis; + var { Object, Number, console, Symbol } = globalThis; + // console.log("AbortSignal", AbortSignal); + + function isUndefinedOrNull(value) { + return value === undefined || value === null; + } - if (@isUndefinedOrNull(options)) options = {}; + if (isUndefinedOrNull(options)) options = {}; // Parameters validation var signal = options.signal; - if (!@isUndefinedOrNull(signal) && !(signal instanceof AbortSignal)) - @throwTypeError("options.signal must be an AbortSignal"); + // if (!isUndefinedOrNull(signal) && !(signal instanceof AbortSignal)) + // @throwTypeError("options.signal must be an AbortSignal"); - if (signal?.aborted) { - // TODO: Make this a builtin - throw new AbortError(@undefined, { cause: signal?.reason }); - } + // if (signal?.aborted) { + // // TODO: Make this a builtin + // throw new AbortError(undefined, { cause: signal?.reason }); + // } var highWatermark = options.highWatermark ?? Number.MAX_SAFE_INTEGER; if (highWatermark < 1) @@ -66,10 +72,11 @@ function onAsyncIterator(emitter, event, options) { var error = null; var finished = false; var size = 0; + var listeners = []; - function abortListener() { - errorHandler(new AbortError(@undefined, { cause: signal?.reason })); - } + // function abortListener() { + // errorHandler(new AbortError(undefined, { cause: signal?.reason })); + // } function eventHandler(value) { if (unconsumedPromises.isEmpty()) { @@ -79,116 +86,131 @@ function onAsyncIterator(emitter, event, options) { emitter.pause(); } unconsumedEvents.push(value); - } else unconsumedPromises.shift().@resolve.@call(@undefined, { value, done: false }); + } else unconsumedPromises.shift().@resolve.@call(undefined, [value]); } function closeHandler() { removeAllListeners(listeners); finished = true; - var doneResult = { value: @undefined, done: true }; while (!unconsumedPromises.isEmpty()) { - unconsumedPromises.shift().@resolve.@call(@undefined, doneResult); + unconsumedPromises.shift().@resolve.@call(undefined, [undefined]); } - return @createFulfilledPromise(doneResult); + return @createFulfilledPromise([undefined]); } function errorHandler(err) { if (unconsumedPromises.isEmpty()) error = err; - else unconsumedPromises.shift().@reject.@call(@undefined, err); + else unconsumedPromises.shift().@reject.@call(undefined, err); closeHandler(); } function addEventListener(emitter, event, handler) { emitter.on(event, handler); - @arrayPush(listeners, emitter, event, handler); + listeners.push([emitter, event, handler]); + // @arrayPush(listeners, [emitter, event, handler]); } function removeAllListeners() { + var heldEmitter; while (listeners.length > 0) { - var [emitter, event, handler] = @arrayPop(listeners); + // var [emitter, event, handler] = @arrayPop(listeners); + var entry = listeners.pop(); + var [emitter, event, handler] = entry; + if (event === "foo") heldEmitter = emitter; + console.log(emitter, event, handler); emitter.off(event, handler); } + console.log(heldEmitter.listenerCount("foo")); } var iterator = async function* NodeEventsOnAsyncIterator() { // First, we consume all unread events - if (size) { - var value = unconsumedEvents.shift(); - size--; - if (paused && size < lowWatermark) { - emitter.resume(); - paused = false; + while (!finished) { + if (size) { + var values = []; + while (size) { + values.push(unconsumedEvents.shift()); + size--; + if (paused && size < lowWatermark) { + emitter.resume(); + paused = false; + break; + } + } + yield @createFulfilledPromise(values); } - yield @createFulfilledPromise({ value, done: false }); - } - - // Then we error, if an error happened - // This happens one time if at all, because after 'error' - // we stop listening - if (error) { - var p = @Promise.@reject(error); - // Only the first element errors - error = null; - yield p; - } - // If the iterator is finished, resolve to done - if (finished) yield closeHandler(); + // Then we error, if an error happened + // This happens one time if at all, because after 'error' + // we stop listening + if (error) { + throw error; + } - // Wait until an event happens - var nextEventPromiseCapability = @newPromiseCapability(@Promise); - unconsumedPromises.push(nextEventPromiseCapability); - yield nextEventPromiseCapability.@promise; + // If the iterator is finished, break + if (finished) { + console.log("FINISHED") + yield closeHandler(); + break; + }; + + // Wait until an event happens + var nextEventPromiseCapability = @newPromiseCapability(@Promise); + unconsumedPromises.push(nextEventPromiseCapability); + yield nextEventPromiseCapability.@promise; + } }; - // TODO: Use builtin - Object.defineProperties(iterator, { - "throw": { - value: (err) => { - // TODO: Use Error builtin? - if (err === undefined || err === null || !(err instanceof Error)) { - @throwTypeError("The argument must be an instance of Error"); - } - errorHandler(err); - }, - }, - "return": { - value: () => { - return closeHandler(); - } - }, - // [kWatermarkData]: { - // /** - // * The current queue size - // */ - // get size() { - // return size; - // }, - // /** - // * The low watermark. The emitter is resumed every time size is lower than it - // */ - // get low() { - // return lowWatermark; - // }, - // /** - // * The high watermark. The emitter is paused every time size is higher than it - // */ - // get high() { - // return highWatermark; - // }, - // /** - // * It checks whether the emitter is paused by the watermark controller or not - // */ - // get isPaused() { - // return paused; - // }, - // }, - }); + // // TODO: Use builtin + // Object.defineProperties(iterator, { + // "throw": { + // value: (err) => { + // // TODO: Use Error builtin? + // if (err === undefined || err === null || !(err instanceof Error)) { + // @throwTypeError("The argument must be an instance of Error"); + // } + // errorHandler(err); + // }, + // }, + // "return": { + // value: () => { + // return closeHandler(); + // } + // }, + // [Symbol.asyncIterator]: { + // value: () => iterator, + // }, + // // [kWatermarkData]: { + // // /** + // // * The current queue size + // // */ + // // get size() { + // // return size; + // // }, + // // /** + // // * The low watermark. The emitter is resumed every time size is lower than it + // // */ + // // get low() { + // // return lowWatermark; + // // }, + // // /** + // // * The high watermark. The emitter is paused every time size is higher than it + // // */ + // // get high() { + // // return highWatermark; + // // }, + // // /** + // // * It checks whether the emitter is paused by the watermark controller or not + // // */ + // // get isPaused() { + // // return paused; + // // }, + // // }, + // }); // Adding event handlers - var listeners = []; addEventListener(emitter, event, eventHandler); if (event !== "error" && typeof emitter.on === "function") { addEventListener(emitter, "error", errorHandler); @@ -200,8 +222,21 @@ function onAsyncIterator(emitter, event, options) { } } - if (signal) - signal.once("abort", abortListener); + // if (signal) + // signal.once("abort", abortListener); - return iterator; + return { + throw: (err) => { + if (err === undefined || err === null || !(err instanceof Error)) { + @throwTypeError("The argument must be an instance of Error"); + } + errorHandler(err); + }, + return: () => { + console.log("we're here"); + return closeHandler(); + }, + next: () => iterator.next(), + [Symbol.asyncIterator]: iterator, + }; } |