aboutsummaryrefslogtreecommitdiff
path: root/test/js/node/async_hooks
diff options
context:
space:
mode:
authorGravatar dave caruso <me@paperdave.net> 2023-07-19 17:20:00 -0700
committerGravatar GitHub <noreply@github.com> 2023-07-19 17:20:00 -0700
commit9b6dc49575df5fb953918c284505f24741138130 (patch)
tree3a052876fa8c6524e0c8d18479aabe38e2d5a52a /test/js/node/async_hooks
parent723e9d1ea7c7fdb424ecedd0fb023524366322c4 (diff)
downloadbun-9b6dc49575df5fb953918c284505f24741138130.tar.gz
bun-9b6dc49575df5fb953918c284505f24741138130.tar.zst
bun-9b6dc49575df5fb953918c284505f24741138130.zip
Implement `AsyncLocalStorage` (#3089)
* work to get async local storage working. * a * a * everything but queueMicrotask * sdfghj * . * finish * tests * test * ok * done * im so stupid * Upgrade WebKit * refactor * refactor * changes requested * oops * cool * fix runInAsyncScope
Diffstat (limited to 'test/js/node/async_hooks')
-rw-r--r--test/js/node/async_hooks/AsyncLocalStorage.test.ts568
-rw-r--r--test/js/node/async_hooks/EventEmitterAsyncResource.test.ts29
-rw-r--r--test/js/node/async_hooks/async_hooks.node.test.ts34
3 files changed, 631 insertions, 0 deletions
diff --git a/test/js/node/async_hooks/AsyncLocalStorage.test.ts b/test/js/node/async_hooks/AsyncLocalStorage.test.ts
new file mode 100644
index 000000000..1a7aad050
--- /dev/null
+++ b/test/js/node/async_hooks/AsyncLocalStorage.test.ts
@@ -0,0 +1,568 @@
+import { AsyncLocalStorage, AsyncResource } from "async_hooks";
+import { beforeEach, describe, expect, test } from "bun:test";
+
+describe("AsyncLocalStorage", () => {
+ test("throw inside of AsyncLocalStorage.run() will be passed out", () => {
+ const s = new AsyncLocalStorage();
+ expect(() => {
+ s.run(1, () => {
+ throw new Error("error");
+ });
+ }).toThrow("error");
+ });
+});
+
+test("AsyncResource", () => {
+ const resource = new AsyncResource("prisma-client-request");
+ var called = false;
+ resource.runInAsyncScope(
+ () => {
+ called = true;
+ },
+ null,
+ "foo",
+ "bar",
+ );
+ expect(called).toBe(true);
+});
+
+describe("async context passes through", () => {
+ beforeEach(() => {
+ /* @ts-ignore */
+ const { set } = globalThis[Symbol.for("Bun.lazy")]("async_hooks");
+ // just in case
+ set(undefined);
+ });
+ test("syncronously", () => {
+ const s = new AsyncLocalStorage();
+ s.run("value", () => {
+ expect(s.getStore()).toBe("value");
+ });
+ expect(s.getStore()).toBe(undefined);
+ s.run("value", () => {
+ s.run("second", () => {
+ expect(s.getStore()).toBe("second");
+ });
+ expect(s.getStore()).toBe("value");
+ });
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("promise.then", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let resolve!: () => void;
+ const promise = new Promise<void>(r => (resolve = r));
+ let v!: string;
+ s.run("value", () => {
+ promise.then(() => {
+ v = s.getStore()!;
+ });
+ });
+ resolve();
+ await promise;
+ expect(v).toBe("value");
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("nested promises", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let resolve!: () => void;
+ let resolve2!: () => void;
+ const promise = new Promise<void>(r => (resolve = r));
+ const promise2 = new Promise<void>(r => (resolve2 = r));
+ let v!: string;
+ const resolved = Promise.resolve(5);
+ // console.log(1);
+ s.run("value", () => {
+ // console.log(2);
+ promise.then(() => {
+ // console.log(3);
+ new Promise<void>(resolve => {
+ // console.log(4);
+ setTimeout(() => {
+ // console.log(5);
+ resolve();
+ }, 1);
+ }).then(() => {
+ // console.log(6);
+ resolved.then(() => {
+ // console.log(7);
+ v = s.getStore()!;
+ resolve2();
+ });
+ });
+ });
+ });
+ resolve();
+ await promise2;
+ expect(v).toBe("value");
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("await 1", async () => {
+ const s = new AsyncLocalStorage<string>();
+ await s.run("value", async () => {
+ expect(s.getStore()).toBe("value");
+ await 1;
+ expect(s.getStore()).toBe("value");
+ });
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("await an actual promise", async () => {
+ const s = new AsyncLocalStorage<string>();
+ await s.run("value", async () => {
+ expect(s.getStore()).toBe("value");
+ await Promise.resolve(1);
+ expect(s.getStore()).toBe("value");
+ await Bun.sleep(2);
+ expect(s.getStore()).toBe("value");
+ });
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("setTimeout", async () => {
+ let resolve: (x: string) => void;
+ const promise = new Promise<string>(r => (resolve = r));
+ const s = new AsyncLocalStorage<string>();
+ s.run("value", () => {
+ expect(s.getStore()).toBe("value");
+ setTimeout(() => {
+ resolve(s.getStore()!);
+ }, 2);
+ });
+ expect(s.getStore()).toBe(undefined);
+ expect(await promise).toBe("value");
+ });
+ test("setInterval", async () => {
+ let resolve: (x: string[]) => void;
+ const promise = new Promise<string[]>(r => (resolve = r));
+ const s = new AsyncLocalStorage<string>();
+ await s.run("value", () => {
+ expect(s.getStore()).toBe("value");
+ const array: string[] = [];
+ const interval = setInterval(() => {
+ array.push(s.getStore()!);
+ if (array.length === 3) {
+ clearInterval(interval);
+ resolve(array);
+ }
+ }, 5);
+ });
+ expect(s.getStore()).toBe(undefined);
+ expect(await promise).toEqual(["value", "value", "value"]);
+ });
+ test("setImmediate", async () => {
+ let resolve: (x: string) => void;
+ const promise = new Promise<string>(r => (resolve = r));
+ const s = new AsyncLocalStorage<string>();
+ await s.run("value", () => {
+ expect(s.getStore()).toBe("value");
+ setImmediate(() => {
+ resolve(s.getStore()!);
+ });
+ });
+ expect(s.getStore()).toBe(undefined);
+ expect(await promise).toBe("value");
+ });
+ test("process.nextTick", async () => {
+ let resolve: (x: string) => void;
+ const promise = new Promise<string>(r => (resolve = r));
+ const s = new AsyncLocalStorage<string>();
+ await s.run("value", () => {
+ expect(s.getStore()).toBe("value");
+ process.nextTick(() => {
+ resolve(s.getStore()!);
+ });
+ });
+ expect(s.getStore()).toBe(undefined);
+ expect(await promise).toBe("value");
+ });
+ test("queueMicrotask", async () => {
+ let resolve: (x: string) => void;
+ const promise = new Promise<string>(r => (resolve = r));
+ const s = new AsyncLocalStorage<string>();
+ await s.run("value", () => {
+ expect(s.getStore()).toBe("value");
+ queueMicrotask(() => {
+ resolve(s.getStore()!);
+ });
+ });
+ expect(s.getStore()).toBe(undefined);
+ expect(await promise).toBe("value");
+ });
+ test("promise catch", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let reject!: () => void;
+ let promise = new Promise<void>((_, r) => (reject = r));
+ let v!: string;
+ s.run("value", () => {
+ promise = promise.catch(() => {
+ v = s.getStore()!;
+ });
+ });
+ reject();
+ await promise;
+ expect(v).toBe("value");
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("promise finally", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let resolve!: () => void;
+ let promise = new Promise<void>(r => (resolve = r));
+ let v!: string;
+ s.run("value", () => {
+ promise = promise.finally(() => {
+ v = s.getStore()!;
+ });
+ });
+ resolve();
+ await promise;
+ expect(v).toBe("value");
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("fetch", async () => {
+ const s = new AsyncLocalStorage<string>();
+ await s.run("value", async () => {
+ expect(s.getStore()).toBe("value");
+ const response = await fetch("https://bun.sh") //
+ .then(r => {
+ expect(s.getStore()).toBe("value");
+ return true;
+ });
+ expect(s.getStore()).toBe("value");
+ expect(response).toBe(true);
+ });
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("Bun.spawn() onExit", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let value: string | undefined;
+ let resolve!: () => void;
+ const promise = new Promise<void>(r => (resolve = r));
+ await s.run("value", () => {
+ expect(s.getStore()).toBe("value");
+
+ const x = Bun.spawn({
+ cmd: ["echo", "hello"],
+ onExit(subprocess, exitCode, signalCode, error) {
+ value = s.getStore()!;
+ resolve();
+ },
+ });
+
+ expect(s.getStore()).toBe("value");
+ });
+ expect(s.getStore()).toBe(undefined);
+ await promise;
+ expect(value).toBe("value");
+ });
+ test("Bun.serve", async () => {
+ const s = new AsyncLocalStorage<string>();
+ await s.run("value", async () => {
+ expect(s.getStore()).toBe("value");
+
+ const server = Bun.serve({
+ port: 0,
+ fetch(request, server) {
+ return new Response(s.getStore()!);
+ },
+ });
+
+ const response = await fetch(server.hostname + ":" + server.port);
+ expect(await response.text()).toBe("value");
+
+ expect(s.getStore()).toBe("value");
+ });
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("readable stream .start", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let stream!: ReadableStream;
+ s.run("value", async () => {
+ stream = new ReadableStream({
+ start(controller) {
+ controller.enqueue(s.getStore()!);
+ controller.close();
+ },
+ });
+ });
+ const reader = stream.getReader();
+ const result = await reader.read();
+ expect(result.value).toBe("value");
+ const result2 = await reader.read();
+ expect(result2.done).toBe(true);
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("readable stream .pull", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let stream!: ReadableStream;
+ s.run("value", async () => {
+ stream = new ReadableStream(
+ {
+ start(controller) {
+ controller.enqueue(new Uint8Array(500));
+ },
+ pull(controller) {
+ controller.enqueue(s.getStore()!);
+ controller.close();
+ },
+ },
+ {
+ highWaterMark: 1,
+ size() {
+ return 500;
+ },
+ },
+ );
+ });
+ const reader = stream.getReader();
+ const result = await reader.read();
+ const result2 = await reader.read();
+ expect(result2.value).toBe("value");
+ const result3 = await reader.read();
+ expect(result3.done).toBe(true);
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("readable stream .pull 2", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let stream!: ReadableStream;
+ let n = 0;
+ s.run("value", async () => {
+ stream = new ReadableStream(
+ {
+ start(controller) {
+ controller.enqueue(new Uint8Array(500));
+ },
+ async pull(controller) {
+ controller.enqueue(s.getStore()!);
+ n++;
+ if (n < 5) {
+ await new Promise(r => setTimeout(r, 1));
+ } else {
+ controller.close();
+ }
+ },
+ },
+ {
+ highWaterMark: 1,
+ size() {
+ return 500;
+ },
+ },
+ );
+ });
+ expect(s.getStore()).toBe(undefined);
+ const reader = stream.getReader();
+ const result = await reader.read();
+ const result2 = await reader.read();
+ expect(result2.value).toBe("value");
+ const result3 = await reader.read();
+ expect(result3.value).toBe("value");
+ const result4 = await reader.read();
+ expect(result4.value).toBe("value");
+ const result5 = await reader.read();
+ expect(result5.value).toBe("value");
+ const result6 = await reader.read();
+ expect(result6.value).toBe("value");
+ const result7 = await reader.read();
+ expect(result7.done).toBe(true);
+ expect(s.getStore()).toBe(undefined);
+ });
+ test("readable stream .cancel", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let stream!: ReadableStream;
+ let value: string | undefined;
+ let resolve!: () => void;
+ let promise = new Promise<void>(r => (resolve = r));
+ s.run("value", async () => {
+ stream = new ReadableStream({
+ start(controller) {},
+ cancel(reason) {
+ value = s.getStore();
+ resolve();
+ },
+ });
+ });
+ expect(s.getStore()).toBe(undefined);
+ const reader = stream.getReader();
+ reader.cancel();
+ await promise;
+ expect(value).toBe("value");
+ });
+ test("readable stream direct .pull", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let stream!: ReadableStream;
+ let value: string | undefined;
+ let value2: string | undefined;
+ let resolve!: () => void;
+ let promise = new Promise<void>(r => (resolve = r));
+ s.run("value", async () => {
+ stream = new ReadableStream({
+ type: "direct",
+ pull(controller) {
+ value = s.getStore();
+ controller.write("hello");
+ controller.close();
+ resolve();
+ },
+ cancel(reason) {},
+ });
+ });
+ expect(s.getStore()).toBe(undefined);
+ const reader = stream.getReader();
+ await reader.read();
+ await promise;
+ expect(value).toBe("value");
+ });
+ // blocked by a bug with .cancel
+ test.todo("readable stream direct .cancel", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let stream!: ReadableStream;
+ let value: string | undefined;
+ let value2: string | undefined;
+ let resolve!: () => void;
+ let promise = new Promise<void>(r => (resolve = r));
+ s.run("value", async () => {
+ stream = new ReadableStream({
+ type: "direct",
+ pull(controller) {
+ value = s.getStore();
+ controller.write("hello");
+ },
+ cancel(reason) {
+ console.log("1");
+ value2 = s.getStore();
+ resolve();
+ },
+ });
+ });
+ expect(s.getStore()).toBe(undefined);
+ const reader = stream.getReader();
+ await reader.read();
+ await reader.cancel();
+ await stream.cancel();
+ await promise;
+ expect(value).toBe("value");
+ expect(value2).toBe("value");
+ });
+ test("Websocket Server", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let values_server: string[] = [];
+ let resolve: () => void;
+ const promise = new Promise<void>(r => (resolve = r));
+ await s.run("value", async () => {
+ expect(s.getStore()).toBe("value");
+
+ const server = Bun.serve({
+ port: 0,
+ fetch(request, server) {
+ if (server.upgrade(request)) return null as any;
+ return new Response(s.getStore()!);
+ },
+ websocket: {
+ open(ws) {
+ values_server.push("open:" + s.getStore());
+ },
+ message(ws, message) {
+ values_server.push("message:" + s.getStore());
+ ws.close();
+ },
+ close(ws, code, message) {
+ values_server.push("close:" + s.getStore());
+ },
+ },
+ });
+
+ const ws = new WebSocket("ws://" + server.hostname + ":" + server.port);
+ ws.addEventListener("open", () => {
+ ws.send("hello");
+ });
+ ws.addEventListener("close", () => {
+ resolve();
+ });
+ });
+ expect(s.getStore()).toBe(undefined);
+ await promise;
+ expect(values_server).toEqual(["open:value", "message:value", "close:value"]);
+ });
+ test.todo("WebSocket client", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let values_client: string[] = [];
+ let resolve: () => void;
+ const promise = new Promise<void>(r => (resolve = r));
+ await s.run("value", async () => {
+ expect(s.getStore()).toBe("value");
+
+ const server = Bun.serve({
+ port: 0,
+ fetch(request, server) {
+ if (server.upgrade(request)) return null as any;
+ return new Response(s.getStore()!);
+ },
+ websocket: {
+ open(ws) {},
+ message(ws, message) {
+ ws.close();
+ },
+ close(ws, code, message) {},
+ },
+ });
+
+ const ws = new WebSocket("ws://" + server.hostname + ":" + server.port);
+ ws.addEventListener("open", () => {
+ ws.send("hello");
+ values_client.push("open:" + s.getStore());
+ });
+ ws.addEventListener("close", () => {
+ resolve();
+ values_client.push("close:" + s.getStore());
+ });
+ });
+ expect(s.getStore()).toBe(undefined);
+ await promise;
+ expect(values_client).toEqual(["open:value", "close:value"]);
+ });
+ test("node:fs callback", async () => {
+ const fs = require("fs");
+ const s = new AsyncLocalStorage<string>();
+ let resolve: (x: string) => void;
+ const promise = new Promise<string>(r => (resolve = r));
+ await s.run("value", async () => {
+ expect(s.getStore()).toBe("value");
+ fs.readFile(import.meta.path, () => {
+ resolve(s.getStore()!);
+ });
+ expect(s.getStore()).toBe("value");
+ });
+ expect(s.getStore()).toBe(undefined);
+ expect(await promise).toBe("value");
+ });
+ test("node:fs/promises", async () => {
+ const fs = require("fs").promises;
+ const s = new AsyncLocalStorage<string>();
+ let v!: string;
+ await s.run("value", async () => {
+ expect(s.getStore()).toBe("value");
+ await fs.readFile(import.meta.path).then(() => {
+ v = s.getStore()!;
+ });
+ expect(s.getStore()).toBe("value");
+ });
+ expect(s.getStore()).toBe(undefined);
+ expect(v).toBe("value");
+ });
+ test("Bun.build plugin", async () => {
+ const s = new AsyncLocalStorage<string>();
+ let a = undefined;
+ await s.run("value", async () => {
+ Bun.build({
+ entrypoints: [import.meta.path],
+ plugins: [
+ {
+ name: "test",
+ setup(build) {
+ a = s.getStore();
+ },
+ },
+ ],
+ });
+ });
+ expect(a).toBe("value");
+ });
+});
diff --git a/test/js/node/async_hooks/EventEmitterAsyncResource.test.ts b/test/js/node/async_hooks/EventEmitterAsyncResource.test.ts
new file mode 100644
index 000000000..bbcadf9bf
--- /dev/null
+++ b/test/js/node/async_hooks/EventEmitterAsyncResource.test.ts
@@ -0,0 +1,29 @@
+import EventEmitter, { EventEmitterAsyncResource } from "events";
+import { AsyncLocalStorage } from "async_hooks";
+import { describe, test, expect } from "bun:test";
+
+describe("EventEmitterAsyncResource", () => {
+ test("is an EventEmitter", () => {
+ const ee = new EventEmitterAsyncResource("test");
+ expect(ee).toBeInstanceOf(EventEmitterAsyncResource);
+ expect(ee).toBeInstanceOf(EventEmitter);
+ });
+ test("has context tracking", () => {
+ let ee;
+ const asl = new AsyncLocalStorage();
+ asl.run(123, () => {
+ ee = new EventEmitterAsyncResource("test");
+ });
+
+ let val;
+ ee.on("test", () => {
+ val = asl.getStore();
+ });
+
+ asl.run(456, () => {
+ ee.emit("test");
+ });
+
+ expect(val).toBe(123);
+ });
+});
diff --git a/test/js/node/async_hooks/async_hooks.node.test.ts b/test/js/node/async_hooks/async_hooks.node.test.ts
new file mode 100644
index 000000000..107c58a8c
--- /dev/null
+++ b/test/js/node/async_hooks/async_hooks.node.test.ts
@@ -0,0 +1,34 @@
+import { AsyncLocalStorage } from "async_hooks";
+import assert from "assert";
+
+test("node async_hooks.AsyncLocalStorage enable disable", async () => {
+ const asyncLocalStorage = new AsyncLocalStorage();
+
+ asyncLocalStorage.run(new Map(), () => {
+ asyncLocalStorage.getStore().set("foo", "bar");
+ process.nextTick(() => {
+ assert.strictEqual(asyncLocalStorage.getStore().get("foo"), "bar");
+ process.nextTick(() => {
+ assert.strictEqual(asyncLocalStorage.getStore(), undefined);
+ });
+
+ asyncLocalStorage.disable();
+ assert.strictEqual(asyncLocalStorage.getStore(), undefined);
+
+ // Calls to exit() should not mess with enabled status
+ asyncLocalStorage.exit(() => {
+ assert.strictEqual(asyncLocalStorage.getStore(), undefined);
+ });
+ assert.strictEqual(asyncLocalStorage.getStore(), undefined);
+
+ process.nextTick(() => {
+ assert.strictEqual(asyncLocalStorage.getStore(), undefined);
+ asyncLocalStorage.run(new Map().set("bar", "foo"), () => {
+ assert.strictEqual(asyncLocalStorage.getStore().get("bar"), "foo");
+
+ done();
+ });
+ });
+ });
+ });
+});