aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/NodeEvents.js
diff options
context:
space:
mode:
authorGravatar Derrick Farris <mr.dcfarris@gmail.com> 2023-03-30 23:46:13 -0500
committerGravatar Derrick Farris <mr.dcfarris@gmail.com> 2023-04-06 17:34:31 -0500
commitb2625b4ab922b43c8e58fbdf4d8031d7b2099f0c (patch)
tree5288bad9164e679cb1dfdabcb8696188b28f62ca /src/bun.js/builtins/js/NodeEvents.js
parent91f02989dc0432079693c5781269a07973099067 (diff)
downloadbun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.tar.gz
bun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.tar.zst
bun-b2625b4ab922b43c8e58fbdf4d8031d7b2099f0c.zip
wip(node:events): get async iter working, work through test flakiness
Diffstat (limited to 'src/bun.js/builtins/js/NodeEvents.js')
-rw-r--r--src/bun.js/builtins/js/NodeEvents.js235
1 files changed, 135 insertions, 100 deletions
diff --git a/src/bun.js/builtins/js/NodeEvents.js b/src/bun.js/builtins/js/NodeEvents.js
index b94d30725..02e944fde 100644
--- a/src/bun.js/builtins/js/NodeEvents.js
+++ b/src/bun.js/builtins/js/NodeEvents.js
@@ -26,30 +26,36 @@
function onAsyncIterator(emitter, event, options) {
"use strict";
- var AbortError = class AbortError extends Error {
- constructor(message = "The operation was aborted", options = void 0) {
- if (options !== void 0 && typeof options !== "object") {
- throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`);
- }
- super(message, options);
- this.code = "ABORT_ERR";
- this.name = "AbortError";
- }
- };
-
- var { AbortSignal, Object, Number } = globalThis;
+ // var AbortError = class AbortError extends Error {
+ // constructor(message = "The operation was aborted", options = void 0) {
+ // if (options !== void 0 && typeof options !== "object") {
+ // throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`);
+ // }
+ // super(message, options);
+ // this.code = "ABORT_ERR";
+ // this.name = "AbortError";
+ // }
+ // };
+
+ // var { AbortSignal, Object, Number, console } = globalThis;
+ var { Object, Number, console, Symbol } = globalThis;
+ // console.log("AbortSignal", AbortSignal);
+
+ function isUndefinedOrNull(value) {
+ return value === undefined || value === null;
+ }
- if (@isUndefinedOrNull(options)) options = {};
+ if (isUndefinedOrNull(options)) options = {};
// Parameters validation
var signal = options.signal;
- if (!@isUndefinedOrNull(signal) && !(signal instanceof AbortSignal))
- @throwTypeError("options.signal must be an AbortSignal");
+ // if (!isUndefinedOrNull(signal) && !(signal instanceof AbortSignal))
+ // @throwTypeError("options.signal must be an AbortSignal");
- if (signal?.aborted) {
- // TODO: Make this a builtin
- throw new AbortError(@undefined, { cause: signal?.reason });
- }
+ // if (signal?.aborted) {
+ // // TODO: Make this a builtin
+ // throw new AbortError(undefined, { cause: signal?.reason });
+ // }
var highWatermark = options.highWatermark ?? Number.MAX_SAFE_INTEGER;
if (highWatermark < 1)
@@ -66,10 +72,11 @@ function onAsyncIterator(emitter, event, options) {
var error = null;
var finished = false;
var size = 0;
+ var listeners = [];
- function abortListener() {
- errorHandler(new AbortError(@undefined, { cause: signal?.reason }));
- }
+ // function abortListener() {
+ // errorHandler(new AbortError(undefined, { cause: signal?.reason }));
+ // }
function eventHandler(value) {
if (unconsumedPromises.isEmpty()) {
@@ -79,116 +86,131 @@ function onAsyncIterator(emitter, event, options) {
emitter.pause();
}
unconsumedEvents.push(value);
- } else unconsumedPromises.shift().@resolve.@call(@undefined, { value, done: false });
+ } else unconsumedPromises.shift().@resolve.@call(undefined, [value]);
}
function closeHandler() {
removeAllListeners(listeners);
finished = true;
- var doneResult = { value: @undefined, done: true };
while (!unconsumedPromises.isEmpty()) {
- unconsumedPromises.shift().@resolve.@call(@undefined, doneResult);
+ unconsumedPromises.shift().@resolve.@call(undefined, [undefined]);
}
- return @createFulfilledPromise(doneResult);
+ return @createFulfilledPromise([undefined]);
}
function errorHandler(err) {
if (unconsumedPromises.isEmpty()) error = err;
- else unconsumedPromises.shift().@reject.@call(@undefined, err);
+ else unconsumedPromises.shift().@reject.@call(undefined, err);
closeHandler();
}
function addEventListener(emitter, event, handler) {
emitter.on(event, handler);
- @arrayPush(listeners, emitter, event, handler);
+ listeners.push([emitter, event, handler]);
+ // @arrayPush(listeners, [emitter, event, handler]);
}
function removeAllListeners() {
+ var heldEmitter;
while (listeners.length > 0) {
- var [emitter, event, handler] = @arrayPop(listeners);
+ // var [emitter, event, handler] = @arrayPop(listeners);
+ var entry = listeners.pop();
+ var [emitter, event, handler] = entry;
+ if (event === "foo") heldEmitter = emitter;
+ console.log(emitter, event, handler);
emitter.off(event, handler);
}
+ console.log(heldEmitter.listenerCount("foo"));
}
var iterator = async function* NodeEventsOnAsyncIterator() {
// First, we consume all unread events
- if (size) {
- var value = unconsumedEvents.shift();
- size--;
- if (paused && size < lowWatermark) {
- emitter.resume();
- paused = false;
+ while (!finished) {
+ if (size) {
+ var values = [];
+ while (size) {
+ values.push(unconsumedEvents.shift());
+ size--;
+ if (paused && size < lowWatermark) {
+ emitter.resume();
+ paused = false;
+ break;
+ }
+ }
+ yield @createFulfilledPromise(values);
}
- yield @createFulfilledPromise({ value, done: false });
- }
-
- // Then we error, if an error happened
- // This happens one time if at all, because after 'error'
- // we stop listening
- if (error) {
- var p = @Promise.@reject(error);
- // Only the first element errors
- error = null;
- yield p;
- }
- // If the iterator is finished, resolve to done
- if (finished) yield closeHandler();
+ // Then we error, if an error happened
+ // This happens one time if at all, because after 'error'
+ // we stop listening
+ if (error) {
+ throw error;
+ }
- // Wait until an event happens
- var nextEventPromiseCapability = @newPromiseCapability(@Promise);
- unconsumedPromises.push(nextEventPromiseCapability);
- yield nextEventPromiseCapability.@promise;
+ // If the iterator is finished, break
+ if (finished) {
+ console.log("FINISHED")
+ yield closeHandler();
+ break;
+ };
+
+ // Wait until an event happens
+ var nextEventPromiseCapability = @newPromiseCapability(@Promise);
+ unconsumedPromises.push(nextEventPromiseCapability);
+ yield nextEventPromiseCapability.@promise;
+ }
};
- // TODO: Use builtin
- Object.defineProperties(iterator, {
- "throw": {
- value: (err) => {
- // TODO: Use Error builtin?
- if (err === undefined || err === null || !(err instanceof Error)) {
- @throwTypeError("The argument must be an instance of Error");
- }
- errorHandler(err);
- },
- },
- "return": {
- value: () => {
- return closeHandler();
- }
- },
- // [kWatermarkData]: {
- // /**
- // * The current queue size
- // */
- // get size() {
- // return size;
- // },
- // /**
- // * The low watermark. The emitter is resumed every time size is lower than it
- // */
- // get low() {
- // return lowWatermark;
- // },
- // /**
- // * The high watermark. The emitter is paused every time size is higher than it
- // */
- // get high() {
- // return highWatermark;
- // },
- // /**
- // * It checks whether the emitter is paused by the watermark controller or not
- // */
- // get isPaused() {
- // return paused;
- // },
- // },
- });
+ // // TODO: Use builtin
+ // Object.defineProperties(iterator, {
+ // "throw": {
+ // value: (err) => {
+ // // TODO: Use Error builtin?
+ // if (err === undefined || err === null || !(err instanceof Error)) {
+ // @throwTypeError("The argument must be an instance of Error");
+ // }
+ // errorHandler(err);
+ // },
+ // },
+ // "return": {
+ // value: () => {
+ // return closeHandler();
+ // }
+ // },
+ // [Symbol.asyncIterator]: {
+ // value: () => iterator,
+ // },
+ // // [kWatermarkData]: {
+ // // /**
+ // // * The current queue size
+ // // */
+ // // get size() {
+ // // return size;
+ // // },
+ // // /**
+ // // * The low watermark. The emitter is resumed every time size is lower than it
+ // // */
+ // // get low() {
+ // // return lowWatermark;
+ // // },
+ // // /**
+ // // * The high watermark. The emitter is paused every time size is higher than it
+ // // */
+ // // get high() {
+ // // return highWatermark;
+ // // },
+ // // /**
+ // // * It checks whether the emitter is paused by the watermark controller or not
+ // // */
+ // // get isPaused() {
+ // // return paused;
+ // // },
+ // // },
+ // });
// Adding event handlers
- var listeners = [];
addEventListener(emitter, event, eventHandler);
if (event !== "error" && typeof emitter.on === "function") {
addEventListener(emitter, "error", errorHandler);
@@ -200,8 +222,21 @@ function onAsyncIterator(emitter, event, options) {
}
}
- if (signal)
- signal.once("abort", abortListener);
+ // if (signal)
+ // signal.once("abort", abortListener);
- return iterator;
+ return {
+ throw: (err) => {
+ if (err === undefined || err === null || !(err instanceof Error)) {
+ @throwTypeError("The argument must be an instance of Error");
+ }
+ errorHandler(err);
+ },
+ return: () => {
+ console.log("we're here");
+ return closeHandler();
+ },
+ next: () => iterator.next(),
+ [Symbol.asyncIterator]: iterator,
+ };
}