diff options
Diffstat (limited to 'src/bun.js/builtins/js/NodeEvents.js')
-rw-r--r-- | src/bun.js/builtins/js/NodeEvents.js | 191 |
1 files changed, 71 insertions, 120 deletions
diff --git a/src/bun.js/builtins/js/NodeEvents.js b/src/bun.js/builtins/js/NodeEvents.js index 02e944fde..6c80f81c8 100644 --- a/src/bun.js/builtins/js/NodeEvents.js +++ b/src/bun.js/builtins/js/NodeEvents.js @@ -26,36 +26,35 @@ function onAsyncIterator(emitter, event, options) { "use strict"; - // var AbortError = class AbortError extends Error { - // constructor(message = "The operation was aborted", options = void 0) { - // if (options !== void 0 && typeof options !== "object") { - // throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); - // } - // super(message, options); - // this.code = "ABORT_ERR"; - // this.name = "AbortError"; - // } - // }; - - // var { AbortSignal, Object, Number, console } = globalThis; - var { Object, Number, console, Symbol } = globalThis; - // console.log("AbortSignal", AbortSignal); - - function isUndefinedOrNull(value) { - return value === undefined || value === null; - } + var { AbortSignal, Number, Error } = globalThis; + + var AbortError = class AbortError extends Error { + constructor(message = "The operation was aborted", options = void 0) { + if (options !== void 0 && typeof options !== "object") { + throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); + } + super(message, options); + this.code = "ABORT_ERR"; + this.name = "AbortError"; + } + }; + + if (@isUndefinedOrNull(emitter)) @throwTypeError("emitter is required"); + // TODO: Do a more accurate check + if (!(typeof emitter === "object" && @isCallable(emitter.emit) && @isCallable(emitter.on))) + @throwTypeError("emitter must be an EventEmitter"); - if (isUndefinedOrNull(options)) options = {}; + if (@isUndefinedOrNull(options)) options = {}; // Parameters validation var signal = options.signal; - // if (!isUndefinedOrNull(signal) && !(signal instanceof AbortSignal)) - // @throwTypeError("options.signal must be an AbortSignal"); + if (!@isUndefinedOrNull(signal) && !(signal instanceof AbortSignal)) + @throwTypeError("options.signal must be an AbortSignal"); - // if (signal?.aborted) { - // // TODO: Make this a builtin - // throw new AbortError(undefined, { cause: signal?.reason }); - // } + if (signal?.aborted) { + // TODO: Make this a builtin + throw new AbortError(@undefined, { cause: signal?.reason }); + } var highWatermark = options.highWatermark ?? Number.MAX_SAFE_INTEGER; if (highWatermark < 1) @@ -74,9 +73,9 @@ function onAsyncIterator(emitter, event, options) { var size = 0; var listeners = []; - // function abortListener() { - // errorHandler(new AbortError(undefined, { cause: signal?.reason })); - // } + function abortListener() { + errorHandler(new AbortError(@undefined, { cause: signal?.reason })); + } function eventHandler(value) { if (unconsumedPromises.isEmpty()) { @@ -86,22 +85,22 @@ function onAsyncIterator(emitter, event, options) { emitter.pause(); } unconsumedEvents.push(value); - } else unconsumedPromises.shift().@resolve.@call(undefined, [value]); + } else unconsumedPromises.shift().@resolve.@call(@undefined, [value]); } function closeHandler() { removeAllListeners(listeners); finished = true; while (!unconsumedPromises.isEmpty()) { - unconsumedPromises.shift().@resolve.@call(undefined, [undefined]); + const promise = unconsumedPromises.shift(); + promise.@resolve.@call(@undefined, [@undefined]); } - - return @createFulfilledPromise([undefined]); + return @createFulfilledPromise([@undefined]); } function errorHandler(err) { if (unconsumedPromises.isEmpty()) error = err; - else unconsumedPromises.shift().@reject.@call(undefined, err); + else unconsumedPromises.shift().@reject.@call(@undefined, err); closeHandler(); } @@ -109,40 +108,33 @@ function onAsyncIterator(emitter, event, options) { function addEventListener(emitter, event, handler) { emitter.on(event, handler); listeners.push([emitter, event, handler]); - // @arrayPush(listeners, [emitter, event, handler]); } function removeAllListeners() { - var heldEmitter; while (listeners.length > 0) { - // var [emitter, event, handler] = @arrayPop(listeners); var entry = listeners.pop(); var [emitter, event, handler] = entry; - if (event === "foo") heldEmitter = emitter; - console.log(emitter, event, handler); emitter.off(event, handler); } - console.log(heldEmitter.listenerCount("foo")); } - var iterator = async function* NodeEventsOnAsyncIterator() { - // First, we consume all unread events - while (!finished) { - if (size) { - var values = []; - while (size) { - values.push(unconsumedEvents.shift()); - size--; - if (paused && size < lowWatermark) { - emitter.resume(); - paused = false; - break; - } + var createIterator = async function* NodeEventsOnAsyncIterator() { + // First, we consume all unread events + try { + while (true) { + // Go through queued events + while (size) { + const value = unconsumedEvents.shift(); + size--; + if (paused && size < lowWatermark) { + emitter.resume(); + paused = false; + break; } - yield @createFulfilledPromise(values); + yield @createFulfilledPromise([value]); } - // Then we error, if an error happened + // Check if error happened before yielding anything // This happens one time if at all, because after 'error' // we stop listening if (error) { @@ -150,66 +142,18 @@ function onAsyncIterator(emitter, event, options) { } // If the iterator is finished, break - if (finished) { - console.log("FINISHED") - yield closeHandler(); - break; - }; + if (finished) break; // Wait until an event happens var nextEventPromiseCapability = @newPromiseCapability(@Promise); unconsumedPromises.push(nextEventPromiseCapability); yield nextEventPromiseCapability.@promise; } + } finally { + closeHandler(); + } }; - // // TODO: Use builtin - // Object.defineProperties(iterator, { - // "throw": { - // value: (err) => { - // // TODO: Use Error builtin? - // if (err === undefined || err === null || !(err instanceof Error)) { - // @throwTypeError("The argument must be an instance of Error"); - // } - // errorHandler(err); - // }, - // }, - // "return": { - // value: () => { - // return closeHandler(); - // } - // }, - // [Symbol.asyncIterator]: { - // value: () => iterator, - // }, - // // [kWatermarkData]: { - // // /** - // // * The current queue size - // // */ - // // get size() { - // // return size; - // // }, - // // /** - // // * The low watermark. The emitter is resumed every time size is lower than it - // // */ - // // get low() { - // // return lowWatermark; - // // }, - // // /** - // // * The high watermark. The emitter is paused every time size is higher than it - // // */ - // // get high() { - // // return highWatermark; - // // }, - // // /** - // // * It checks whether the emitter is paused by the watermark controller or not - // // */ - // // get isPaused() { - // // return paused; - // // }, - // // }, - // }); - // Adding event handlers addEventListener(emitter, event, eventHandler); if (event !== "error" && typeof emitter.on === "function") { @@ -222,21 +166,28 @@ function onAsyncIterator(emitter, event, options) { } } - // if (signal) - // signal.once("abort", abortListener); + if (signal) + signal.once("abort", abortListener); - return { - throw: (err) => { - if (err === undefined || err === null || !(err instanceof Error)) { - @throwTypeError("The argument must be an instance of Error"); - } - errorHandler(err); + var iterator = createIterator(); + + @Object.defineProperties(iterator, { + return: { + value: function() { + return closeHandler(); + }, + }, + throw: { + value: function(err) { + if (!err || !(err instanceof Error)) { + throw new TypeError("EventEmitter.AsyncIterator must be called with an error"); + } + errorHandler(err); + }, }, - return: () => { - console.log("we're here"); - return closeHandler(); + [Symbol.asyncIterator]: { + value: function() { return this; } }, - next: () => iterator.next(), - [Symbol.asyncIterator]: iterator, - }; + }); + return iterator; } |