aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/NodeEvents.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins/js/NodeEvents.js')
-rw-r--r--src/bun.js/builtins/js/NodeEvents.js191
1 files changed, 71 insertions, 120 deletions
diff --git a/src/bun.js/builtins/js/NodeEvents.js b/src/bun.js/builtins/js/NodeEvents.js
index 02e944fde..6c80f81c8 100644
--- a/src/bun.js/builtins/js/NodeEvents.js
+++ b/src/bun.js/builtins/js/NodeEvents.js
@@ -26,36 +26,35 @@
function onAsyncIterator(emitter, event, options) {
"use strict";
- // var AbortError = class AbortError extends Error {
- // constructor(message = "The operation was aborted", options = void 0) {
- // if (options !== void 0 && typeof options !== "object") {
- // throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`);
- // }
- // super(message, options);
- // this.code = "ABORT_ERR";
- // this.name = "AbortError";
- // }
- // };
-
- // var { AbortSignal, Object, Number, console } = globalThis;
- var { Object, Number, console, Symbol } = globalThis;
- // console.log("AbortSignal", AbortSignal);
-
- function isUndefinedOrNull(value) {
- return value === undefined || value === null;
- }
+ var { AbortSignal, Number, Error } = globalThis;
+
+ var AbortError = class AbortError extends Error {
+ constructor(message = "The operation was aborted", options = void 0) {
+ if (options !== void 0 && typeof options !== "object") {
+ throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`);
+ }
+ super(message, options);
+ this.code = "ABORT_ERR";
+ this.name = "AbortError";
+ }
+ };
+
+ if (@isUndefinedOrNull(emitter)) @throwTypeError("emitter is required");
+ // TODO: Do a more accurate check
+ if (!(typeof emitter === "object" && @isCallable(emitter.emit) && @isCallable(emitter.on)))
+ @throwTypeError("emitter must be an EventEmitter");
- if (isUndefinedOrNull(options)) options = {};
+ if (@isUndefinedOrNull(options)) options = {};
// Parameters validation
var signal = options.signal;
- // if (!isUndefinedOrNull(signal) && !(signal instanceof AbortSignal))
- // @throwTypeError("options.signal must be an AbortSignal");
+ if (!@isUndefinedOrNull(signal) && !(signal instanceof AbortSignal))
+ @throwTypeError("options.signal must be an AbortSignal");
- // if (signal?.aborted) {
- // // TODO: Make this a builtin
- // throw new AbortError(undefined, { cause: signal?.reason });
- // }
+ if (signal?.aborted) {
+ // TODO: Make this a builtin
+ throw new AbortError(@undefined, { cause: signal?.reason });
+ }
var highWatermark = options.highWatermark ?? Number.MAX_SAFE_INTEGER;
if (highWatermark < 1)
@@ -74,9 +73,9 @@ function onAsyncIterator(emitter, event, options) {
var size = 0;
var listeners = [];
- // function abortListener() {
- // errorHandler(new AbortError(undefined, { cause: signal?.reason }));
- // }
+ function abortListener() {
+ errorHandler(new AbortError(@undefined, { cause: signal?.reason }));
+ }
function eventHandler(value) {
if (unconsumedPromises.isEmpty()) {
@@ -86,22 +85,22 @@ function onAsyncIterator(emitter, event, options) {
emitter.pause();
}
unconsumedEvents.push(value);
- } else unconsumedPromises.shift().@resolve.@call(undefined, [value]);
+ } else unconsumedPromises.shift().@resolve.@call(@undefined, [value]);
}
function closeHandler() {
removeAllListeners(listeners);
finished = true;
while (!unconsumedPromises.isEmpty()) {
- unconsumedPromises.shift().@resolve.@call(undefined, [undefined]);
+ const promise = unconsumedPromises.shift();
+ promise.@resolve.@call(@undefined, [@undefined]);
}
-
- return @createFulfilledPromise([undefined]);
+ return @createFulfilledPromise([@undefined]);
}
function errorHandler(err) {
if (unconsumedPromises.isEmpty()) error = err;
- else unconsumedPromises.shift().@reject.@call(undefined, err);
+ else unconsumedPromises.shift().@reject.@call(@undefined, err);
closeHandler();
}
@@ -109,40 +108,33 @@ function onAsyncIterator(emitter, event, options) {
function addEventListener(emitter, event, handler) {
emitter.on(event, handler);
listeners.push([emitter, event, handler]);
- // @arrayPush(listeners, [emitter, event, handler]);
}
function removeAllListeners() {
- var heldEmitter;
while (listeners.length > 0) {
- // var [emitter, event, handler] = @arrayPop(listeners);
var entry = listeners.pop();
var [emitter, event, handler] = entry;
- if (event === "foo") heldEmitter = emitter;
- console.log(emitter, event, handler);
emitter.off(event, handler);
}
- console.log(heldEmitter.listenerCount("foo"));
}
- var iterator = async function* NodeEventsOnAsyncIterator() {
- // First, we consume all unread events
- while (!finished) {
- if (size) {
- var values = [];
- while (size) {
- values.push(unconsumedEvents.shift());
- size--;
- if (paused && size < lowWatermark) {
- emitter.resume();
- paused = false;
- break;
- }
+ var createIterator = async function* NodeEventsOnAsyncIterator() {
+ // First, we consume all unread events
+ try {
+ while (true) {
+ // Go through queued events
+ while (size) {
+ const value = unconsumedEvents.shift();
+ size--;
+ if (paused && size < lowWatermark) {
+ emitter.resume();
+ paused = false;
+ break;
}
- yield @createFulfilledPromise(values);
+ yield @createFulfilledPromise([value]);
}
- // Then we error, if an error happened
+ // Check if error happened before yielding anything
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
@@ -150,66 +142,18 @@ function onAsyncIterator(emitter, event, options) {
}
// If the iterator is finished, break
- if (finished) {
- console.log("FINISHED")
- yield closeHandler();
- break;
- };
+ if (finished) break;
// Wait until an event happens
var nextEventPromiseCapability = @newPromiseCapability(@Promise);
unconsumedPromises.push(nextEventPromiseCapability);
yield nextEventPromiseCapability.@promise;
}
+ } finally {
+ closeHandler();
+ }
};
- // // TODO: Use builtin
- // Object.defineProperties(iterator, {
- // "throw": {
- // value: (err) => {
- // // TODO: Use Error builtin?
- // if (err === undefined || err === null || !(err instanceof Error)) {
- // @throwTypeError("The argument must be an instance of Error");
- // }
- // errorHandler(err);
- // },
- // },
- // "return": {
- // value: () => {
- // return closeHandler();
- // }
- // },
- // [Symbol.asyncIterator]: {
- // value: () => iterator,
- // },
- // // [kWatermarkData]: {
- // // /**
- // // * The current queue size
- // // */
- // // get size() {
- // // return size;
- // // },
- // // /**
- // // * The low watermark. The emitter is resumed every time size is lower than it
- // // */
- // // get low() {
- // // return lowWatermark;
- // // },
- // // /**
- // // * The high watermark. The emitter is paused every time size is higher than it
- // // */
- // // get high() {
- // // return highWatermark;
- // // },
- // // /**
- // // * It checks whether the emitter is paused by the watermark controller or not
- // // */
- // // get isPaused() {
- // // return paused;
- // // },
- // // },
- // });
-
// Adding event handlers
addEventListener(emitter, event, eventHandler);
if (event !== "error" && typeof emitter.on === "function") {
@@ -222,21 +166,28 @@ function onAsyncIterator(emitter, event, options) {
}
}
- // if (signal)
- // signal.once("abort", abortListener);
+ if (signal)
+ signal.once("abort", abortListener);
- return {
- throw: (err) => {
- if (err === undefined || err === null || !(err instanceof Error)) {
- @throwTypeError("The argument must be an instance of Error");
- }
- errorHandler(err);
+ var iterator = createIterator();
+
+ @Object.defineProperties(iterator, {
+ return: {
+ value: function() {
+ return closeHandler();
+ },
+ },
+ throw: {
+ value: function(err) {
+ if (!err || !(err instanceof Error)) {
+ throw new TypeError("EventEmitter.AsyncIterator must be called with an error");
+ }
+ errorHandler(err);
+ },
},
- return: () => {
- console.log("we're here");
- return closeHandler();
+ [Symbol.asyncIterator]: {
+ value: function() { return this; }
},
- next: () => iterator.next(),
- [Symbol.asyncIterator]: iterator,
- };
+ });
+ return iterator;
}