aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/diagnostics_channel.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/diagnostics_channel.js')
-rw-r--r--src/js/node/diagnostics_channel.js416
1 files changed, 416 insertions, 0 deletions
diff --git a/src/js/node/diagnostics_channel.js b/src/js/node/diagnostics_channel.js
new file mode 100644
index 000000000..664c80693
--- /dev/null
+++ b/src/js/node/diagnostics_channel.js
@@ -0,0 +1,416 @@
+// Hardcoded module "node:diagnostics_channel"
+// Reference: https://github.com/nodejs/node/blob/fb47afc335ef78a8cef7eac52b8ee7f045300696/lib/diagnostics_channel.js
+const SafeMap = Map;
+const SafeFinalizationRegistry = FinalizationRegistry;
+
+const ArrayPrototypeAt = (array, index) => array[index];
+const ArrayPrototypeIndexOf = (array, value) => array.indexOf(value);
+const ArrayPrototypePush = (array, value) => array.push(value);
+const ArrayPrototypeSplice = (array, start, deleteCount) => array.splice(start, deleteCount);
+const ObjectGetPrototypeOf = Object.getPrototypeOf;
+const ObjectSetPrototypeOf = Object.setPrototypeOf;
+const SymbolHasInstance = Symbol.hasInstance;
+const ReflectApply = Reflect.apply;
+const PromiseResolve = Promise.resolve;
+const PromiseReject = Promise.reject;
+const PromisePrototypeThen = (promise, onFulfilled, onRejected) => promise.then(onFulfilled, onRejected);
+
+// TODO: https://github.com/nodejs/node/blob/fb47afc335ef78a8cef7eac52b8ee7f045300696/src/node_util.h#L13
+class WeakReference extends WeakRef {
+ #refs = 0;
+
+ get() {
+ return this.deref();
+ }
+
+ incRef() {
+ return ++this.#refs;
+ }
+
+ decRef() {
+ return --this.#refs;
+ }
+}
+
+// Can't delete when weakref count reaches 0 as it could increment again.
+// Only GC can be used as a valid time to clean up the channels map.
+class WeakRefMap extends SafeMap {
+ #finalizers = new SafeFinalizationRegistry(key => {
+ this.delete(key);
+ });
+
+ set(key, value) {
+ this.#finalizers.register(value, key);
+ return super.set(key, new WeakReference(value));
+ }
+
+ get(key) {
+ return super.get(key)?.get();
+ }
+
+ incRef(key) {
+ return super.get(key)?.incRef();
+ }
+
+ decRef(key) {
+ return super.get(key)?.decRef();
+ }
+}
+
+function markActive(channel) {
+ ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
+ channel._subscribers = [];
+ channel._stores = new SafeMap();
+}
+
+function maybeMarkInactive(channel) {
+ // When there are no more active subscribers or bound, restore to fast prototype.
+ if (!channel._subscribers.length && !channel._stores.size) {
+ ObjectSetPrototypeOf(channel, Channel.prototype);
+ channel._subscribers = undefined;
+ channel._stores = undefined;
+ }
+}
+
+function defaultTransform(data) {
+ return data;
+}
+
+function wrapStoreRun(store, data, next, transform = defaultTransform) {
+ return () => {
+ let context;
+ try {
+ context = transform(data);
+ } catch (err) {
+ process.nextTick(() => reportError(err));
+ return next();
+ }
+
+ return store.run(context, next);
+ };
+}
+
+class ActiveChannel {
+ subscribe(subscription) {
+ validateFunction(subscription, "subscription");
+
+ ArrayPrototypePush(this._subscribers, subscription);
+ channels.incRef(this.name);
+ }
+
+ unsubscribe(subscription) {
+ const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
+ if (index === -1) return false;
+
+ ArrayPrototypeSplice(this._subscribers, index, 1);
+
+ channels.decRef(this.name);
+ maybeMarkInactive(this);
+
+ return true;
+ }
+
+ bindStore(store, transform) {
+ const replacing = this._stores.has(store);
+ if (!replacing) channels.incRef(this.name);
+ this._stores.set(store, transform);
+ }
+
+ unbindStore(store) {
+ if (!this._stores.has(store)) {
+ return false;
+ }
+
+ this._stores.delete(store);
+
+ channels.decRef(this.name);
+ maybeMarkInactive(this);
+
+ return true;
+ }
+
+ get hasSubscribers() {
+ return true;
+ }
+
+ publish(data) {
+ for (let i = 0; i < (this._subscribers?.length || 0); i++) {
+ try {
+ const onMessage = this._subscribers[i];
+ onMessage(data, this.name);
+ } catch (err) {
+ process.nextTick(() => reportError(err));
+ }
+ }
+ }
+
+ runStores(data, fn, thisArg, ...args) {
+ let run = () => {
+ this.publish(data);
+ return ReflectApply(fn, thisArg, args);
+ };
+
+ for (const entry of this._stores.entries()) {
+ const store = entry[0];
+ const transform = entry[1];
+ run = wrapStoreRun(store, data, run, transform);
+ }
+
+ return run();
+ }
+}
+
+class Channel {
+ constructor(name) {
+ this._subscribers = undefined;
+ this._stores = undefined;
+ this.name = name;
+
+ channels.set(name, this);
+ }
+
+ static [SymbolHasInstance](instance) {
+ const prototype = ObjectGetPrototypeOf(instance);
+ return prototype === Channel.prototype || prototype === ActiveChannel.prototype;
+ }
+
+ subscribe(subscription) {
+ markActive(this);
+ this.subscribe(subscription);
+ }
+
+ unsubscribe() {
+ return false;
+ }
+
+ bindStore(store, transform) {
+ markActive(this);
+ this.bindStore(store, transform);
+ }
+
+ unbindStore() {
+ return false;
+ }
+
+ get hasSubscribers() {
+ return false;
+ }
+
+ publish() {}
+
+ runStores(data, fn, thisArg, ...args) {
+ return ReflectApply(fn, thisArg, args);
+ }
+}
+
+const channels = new WeakRefMap();
+
+function channel(name) {
+ const channel = channels.get(name);
+ if (channel) return channel;
+
+ if (typeof name !== "string" && typeof name !== "symbol") {
+ throw new ERR_INVALID_ARG_TYPE("channel", ["string", "symbol"], name);
+ }
+
+ return new Channel(name);
+}
+
+function subscribe(name, subscription) {
+ return channel(name).subscribe(subscription);
+}
+
+function unsubscribe(name, subscription) {
+ return channel(name).unsubscribe(subscription);
+}
+
+function hasSubscribers(name) {
+ const channel = channels.get(name);
+ if (!channel) return false;
+
+ return channel.hasSubscribers;
+}
+
+const traceEvents = ["start", "end", "asyncStart", "asyncEnd", "error"];
+
+function assertChannel(value, name) {
+ if (!(value instanceof Channel)) {
+ throw new ERR_INVALID_ARG_TYPE(name, ["Channel"], value);
+ }
+}
+
+class TracingChannel {
+ constructor(nameOrChannels) {
+ if (typeof nameOrChannels === "string") {
+ this.start = channel(`tracing:${nameOrChannels}:start`);
+ this.end = channel(`tracing:${nameOrChannels}:end`);
+ this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
+ this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
+ this.error = channel(`tracing:${nameOrChannels}:error`);
+ } else if (typeof nameOrChannels === "object") {
+ const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;
+
+ assertChannel(start, "nameOrChannels.start");
+ assertChannel(end, "nameOrChannels.end");
+ assertChannel(asyncStart, "nameOrChannels.asyncStart");
+ assertChannel(asyncEnd, "nameOrChannels.asyncEnd");
+ assertChannel(error, "nameOrChannels.error");
+
+ this.start = start;
+ this.end = end;
+ this.asyncStart = asyncStart;
+ this.asyncEnd = asyncEnd;
+ this.error = error;
+ } else {
+ throw new ERR_INVALID_ARG_TYPE("nameOrChannels", ["string", "object", "Channel"], nameOrChannels);
+ }
+ }
+
+ subscribe(handlers) {
+ for (const name of traceEvents) {
+ if (!handlers[name]) continue;
+
+ this[name]?.subscribe(handlers[name]);
+ }
+ }
+
+ unsubscribe(handlers) {
+ let done = true;
+
+ for (const name of traceEvents) {
+ if (!handlers[name]) continue;
+
+ if (!this[name]?.unsubscribe(handlers[name])) {
+ done = false;
+ }
+ }
+
+ return done;
+ }
+
+ traceSync(fn, context = {}, thisArg, ...args) {
+ const { start, end, error } = this;
+
+ return start.runStores(context, () => {
+ try {
+ const result = ReflectApply(fn, thisArg, args);
+ context.result = result;
+ return result;
+ } catch (err) {
+ context.error = err;
+ error.publish(context);
+ throw err;
+ } finally {
+ end.publish(context);
+ }
+ });
+ }
+
+ tracePromise(fn, context = {}, thisArg, ...args) {
+ const { start, end, asyncStart, asyncEnd, error } = this;
+
+ function reject(err) {
+ context.error = err;
+ error.publish(context);
+ asyncStart.publish(context);
+ // TODO: Is there a way to have asyncEnd _after_ the continuation?
+ asyncEnd.publish(context);
+ return PromiseReject(err);
+ }
+
+ function resolve(result) {
+ context.result = result;
+ asyncStart.publish(context);
+ // TODO: Is there a way to have asyncEnd _after_ the continuation?
+ asyncEnd.publish(context);
+ return result;
+ }
+
+ return start.runStores(context, () => {
+ try {
+ let promise = ReflectApply(fn, thisArg, args);
+ // Convert thenables to native promises
+ if (!(promise instanceof Promise)) {
+ promise = PromiseResolve(promise);
+ }
+ return PromisePrototypeThen(promise, resolve, reject);
+ } catch (err) {
+ context.error = err;
+ error.publish(context);
+ throw err;
+ } finally {
+ end.publish(context);
+ }
+ });
+ }
+
+ traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
+ const { start, end, asyncStart, asyncEnd, error } = this;
+
+ function wrappedCallback(err, res) {
+ if (err) {
+ context.error = err;
+ error.publish(context);
+ } else {
+ context.result = res;
+ }
+
+ // Using runStores here enables manual context failure recovery
+ asyncStart.runStores(context, () => {
+ try {
+ if (callback) {
+ return ReflectApply(callback, this, arguments);
+ }
+ } finally {
+ asyncEnd.publish(context);
+ }
+ });
+ }
+
+ const callback = ArrayPrototypeAt(args, position);
+ if (typeof callback !== "function") {
+ throw new ERR_INVALID_ARG_TYPE("callback", ["function"], callback);
+ }
+ ArrayPrototypeSplice(args, position, 1, wrappedCallback);
+
+ return start.runStores(context, () => {
+ try {
+ return ReflectApply(fn, thisArg, args);
+ } catch (err) {
+ context.error = err;
+ error.publish(context);
+ throw err;
+ } finally {
+ end.publish(context);
+ }
+ });
+ }
+}
+
+function tracingChannel(nameOrChannels) {
+ return new TracingChannel(nameOrChannels);
+}
+
+class ERR_INVALID_ARG_TYPE extends TypeError {
+ constructor(name, expected, actual) {
+ super(`The ${name} argument must be of type ${expected}. Received type ${typeof actual}`);
+ this.code = "ERR_INVALID_ARG_TYPE";
+ }
+}
+
+function validateFunction(callable, field) {
+ if (typeof callable !== "function") {
+ throw new ERR_INVALID_ARG_TYPE(field, "Function", callable);
+ }
+
+ return callable;
+}
+
+export default {
+ channel,
+ hasSubscribers,
+ subscribe,
+ tracingChannel,
+ unsubscribe,
+ Channel,
+};