aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorGravatar Ashcon Partovi <ashcon@partovi.net> 2023-07-13 09:39:43 -0700
committerGravatar GitHub <noreply@github.com> 2023-07-13 09:39:43 -0700
commit9eb8eea2a81be6a20abb62544dc54a35ff4173a5 (patch)
tree63ab7bc037477b0db836067ac6c49c273d251353 /test
parent04b4157232006c882cdd693f566b31945618b924 (diff)
downloadbun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.tar.gz
bun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.tar.zst
bun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.zip
Implement `ping()`, `pong()`, `terminate()` for WebSocket client and server (#3257)
Diffstat (limited to 'test')
-rw-r--r--test/harness.ts6
-rw-r--r--test/js/bun/websocket/websocket-client-echo.mjs70
-rw-r--r--test/js/bun/websocket/websocket-server.test.ts1712
-rw-r--r--test/js/first_party/ws/ws.test.ts284
-rw-r--r--test/js/web/websocket/websocket-client.test.ts280
-rw-r--r--test/js/web/websocket/websocket-server-echo.mjs94
6 files changed, 1265 insertions, 1181 deletions
diff --git a/test/harness.ts b/test/harness.ts
index b0cedba74..8b850bbfc 100644
--- a/test/harness.ts
+++ b/test/harness.ts
@@ -1,4 +1,4 @@
-import { gc as bunGC, unsafe } from "bun";
+import { gc as bunGC, unsafe, which } from "bun";
import { heapStats } from "bun:jsc";
import path from "path";
import fs from "fs";
@@ -16,6 +16,10 @@ export function bunExe() {
return process.execPath;
}
+export function nodeExe(): string | null {
+ return which("node") || null;
+}
+
export function gc(force = true) {
bunGC(force);
}
diff --git a/test/js/bun/websocket/websocket-client-echo.mjs b/test/js/bun/websocket/websocket-client-echo.mjs
new file mode 100644
index 000000000..d721f673c
--- /dev/null
+++ b/test/js/bun/websocket/websocket-client-echo.mjs
@@ -0,0 +1,70 @@
+import { WebSocket } from "ws";
+
+let url;
+try {
+ url = new URL(process.argv[2]);
+} catch {
+ throw new Error(`Usage: ${process.argv0} websocket-client-echo.mjs <url>`);
+}
+
+const ws = new WebSocket(url, {
+ perMessageDeflate: false,
+});
+
+ws.on("open", () => {
+ console.log("Connected", ws.url); // read by test script
+ console.error("Connected", ws.url);
+});
+
+ws.on("message", (data, isBinary) => {
+ if (isBinary) {
+ console.error("Received binary message:", data);
+ } else {
+ console.error("Received text message:", data);
+ }
+ ws.send(data, { binary: isBinary });
+
+ if (data === "ping") {
+ console.error("Sending ping");
+ ws.ping();
+ } else if (data === "pong") {
+ console.error("Sending pong");
+ ws.pong();
+ } else if (data === "close") {
+ console.error("Sending close");
+ ws.close();
+ } else if (data === "terminate") {
+ console.error("Sending terminate");
+ ws.terminate();
+ }
+});
+
+ws.on("ping", data => {
+ console.error("Received ping:", data);
+ ws.ping(data);
+});
+
+ws.on("pong", data => {
+ console.error("Received pong:", data);
+ ws.pong(data);
+});
+
+ws.on("error", error => {
+ console.error("Received error:", error);
+});
+
+ws.on("close", (code, reason, wasClean) => {
+ if (wasClean === true) {
+ console.error("Received abrupt close:", code, reason);
+ } else {
+ console.error("Received close:", code, reason);
+ }
+});
+
+ws.on("redirect", url => {
+ console.error("Received redirect:", url);
+});
+
+ws.on("unexpected-response", (_, response) => {
+ console.error("Received unexpected response:", response.statusCode, { ...response.headers });
+});
diff --git a/test/js/bun/websocket/websocket-server.test.ts b/test/js/bun/websocket/websocket-server.test.ts
index 7a79f9f5f..eb9b773a3 100644
--- a/test/js/bun/websocket/websocket-server.test.ts
+++ b/test/js/bun/websocket/websocket-server.test.ts
@@ -1,1255 +1,607 @@
-import { describe, expect, it } from "bun:test";
-import { gcTick } from "harness";
-import { serve, ServerWebSocket } from "bun";
-
-describe("websocket server", () => {
- it("send & receive empty messages", done => {
- const serverReceived: any[] = [];
- const clientReceived: any[] = [];
- var clientDone = false;
- var serverDone = false;
-
- let server = Bun.serve({
- websocket: {
- open(ws) {
- ws.send("");
- ws.send(new ArrayBuffer(0));
- },
- message(ws, data) {
- serverReceived.push(data);
-
- if (serverReceived.length === 2) {
- if (serverReceived.find(d => d === "") === undefined) {
- done(new Error("expected empty string"));
- }
-
- if (!serverReceived.find(d => d.byteLength === 0)) {
- done(new Error("expected empty Buffer"));
- }
-
- serverDone = true;
+import { describe, it, expect, afterEach } from "bun:test";
+import type { Server, Subprocess, WebSocketHandler } from "bun";
+import { serve, spawn } from "bun";
+import { bunEnv, bunExe, nodeExe } from "harness";
+import { drainMicrotasks, fullGC } from "bun:jsc";
+
+const strings = [
+ {
+ label: "string (ascii)",
+ message: "ascii",
+ bytes: [0x61, 0x73, 0x63, 0x69, 0x69],
+ },
+ {
+ label: "string (latin1)",
+ message: "latin1-©",
+ bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9],
+ },
+ {
+ label: "string (utf-8)",
+ message: "utf8-😶",
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6],
+ },
+];
+
+const buffers = [
+ {
+ label: "Uint8Array (utf-8)",
+ message: new TextEncoder().encode("utf8-🙂"),
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82],
+ },
+ {
+ label: "ArrayBuffer (utf-8)",
+ message: new TextEncoder().encode("utf8-🙃").buffer,
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83],
+ },
+ {
+ label: "Buffer (utf-8)",
+ message: Buffer.from("utf8-🤩"),
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9],
+ },
+];
+
+const messages = [...strings, ...buffers];
+
+const binaryTypes = [
+ {
+ label: "nodebuffer",
+ type: Buffer,
+ },
+ {
+ label: "arraybuffer",
+ type: ArrayBuffer,
+ },
+ {
+ label: "uint8array",
+ type: Uint8Array,
+ },
+] as const;
+
+let servers: Server[] = [];
+let clients: Subprocess[] = [];
+
+afterEach(() => {
+ for (const server of servers) {
+ server.stop(true);
+ }
+ for (const client of clients) {
+ client.kill();
+ }
+});
- if (clientDone && serverDone) {
- z.close();
- server.stop(true);
- done();
- }
+describe("Server", () => {
+ describe("websocket", () => {
+ test("open", done => ({
+ open(ws) {
+ expect(ws).toBeDefined();
+ expect(ws).toHaveProperty("data", { id: 0 });
+ done();
+ },
+ }));
+ test("close", done => ({
+ open(ws) {
+ ws.close();
+ },
+ close(ws, code, reason) {
+ expect(ws).toBeDefined();
+ expect(ws).toHaveProperty("data", { id: 0 });
+ expect(code).toBeInteger();
+ expect(reason).toBeString();
+ done();
+ },
+ }));
+ test("message", done => ({
+ open(ws) {
+ ws.send("Hello");
+ },
+ message(ws, data) {
+ expect(ws).toBeDefined();
+ expect(ws).toHaveProperty("data", { id: 0 });
+ expect(data).toBeDefined();
+ done();
+ },
+ }));
+ test("drain", done => ({
+ backpressureLimit: 1,
+ open(ws) {
+ const data = new Uint8Array(1 * 1024 * 1024);
+ // send data until backpressure is triggered
+ for (let i = 0; i < 10; i++) {
+ if (ws.send(data) < 1) { // backpressure or dropped
+ break;
}
- },
- close() {},
- },
- fetch(req, server) {
- if (!server.upgrade(req)) {
- return new Response(null, { status: 404 });
}
},
- port: 0,
- });
-
- let z = new WebSocket(`ws://${server.hostname}:${server.port}`);
- z.onmessage = e => {
- clientReceived.push(e.data);
-
- if (clientReceived.length === 2) {
- if (clientReceived.find(d => d === "") === undefined) {
- done(new Error("expected empty string"));
- }
-
- if (!clientReceived.find(d => d.byteLength === 0)) {
- done(new Error("expected empty Buffer"));
+ drain(ws) {
+ expect(ws).toBeDefined();
+ expect(ws).toHaveProperty("data", { id: 0 });
+ done();
+ },
+ }));
+ test("ping", done => ({
+ open(ws) {
+ ws.ping();
+ },
+ ping(ws, data) {
+ expect(ws).toBeDefined();
+ expect(ws).toHaveProperty("data", { id: 0 });
+ expect(data).toBeInstanceOf(Buffer);
+ done();
+ },
+ }));
+ test("pong", done => ({
+ open(ws) {
+ ws.pong();
+ },
+ pong(ws, data) {
+ expect(ws).toBeDefined();
+ expect(ws).toHaveProperty("data", { id: 0 });
+ expect(data).toBeInstanceOf(Buffer);
+ done();
+ },
+ }));
+ test("maxPayloadLength", done => ({
+ maxPayloadLength: 4,
+ open(ws) {
+ ws.send("Hello!");
+ },
+ close(_, code) {
+ expect(code).toBe(1006);
+ done();
+ },
+ }));
+ test("backpressureLimit", done => ({
+ backpressureLimit: 1,
+ open(ws) {
+ const data = new Uint8Array(1 * 1024 * 1024);
+ expect(ws.send(data.slice(0, 1))).toBe(1); // sent
+ let backpressure;
+ for (let i = 0; i < 10; i++) {
+ if (ws.send(data) === -1) {
+ backpressure = true;
+ break;
+ }
}
-
- clientDone = true;
- if (clientDone && serverDone) {
- server.stop(true);
- z.close();
-
- done();
+ if (!backpressure) {
+ done(new Error("backpressure not triggered"));
+ return;
}
- }
- };
- z.addEventListener("open", () => {
- z.send("");
- z.send(new Buffer(0));
- });
- });
-
- it("remoteAddress works", done => {
- let server = Bun.serve({
- websocket: {
- message() {},
- open(ws) {
- try {
- expect(ws.remoteAddress).toBe("127.0.0.1");
- done();
- } catch (e) {
- done(e);
+ let dropped;
+ for (let i = 0; i < 10; i++) {
+ if (ws.send(data) === 0) {
+ dropped = true;
+ break;
}
- },
- close() {},
- },
- fetch(req, server) {
- if (!server.upgrade(req)) {
- return new Response(null, { status: 404 });
}
- },
- port: 0,
- });
-
- let z = new WebSocket(`ws://${server.hostname}:${server.port}`);
- z.addEventListener("open", () => {
- setTimeout(() => z.close(), 0);
- });
- z.addEventListener("close", () => {
- server.stop();
- });
- });
- it("can do publish()", async done => {
- var server = serve({
- port: 0,
- websocket: {
- // FIXME: update this test to not rely on publishToSelf: true,
- publishToSelf: true,
-
- open(ws) {
- ws.subscribe("all");
- },
- message(ws, msg) {},
- close(ws) {},
- },
- fetch(req, server) {
- if (server.upgrade(req)) {
+ if (!dropped) {
+ done(new Error("message not dropped"));
return;
}
-
- return new Response("success");
- },
- });
-
- await new Promise<void>((resolve2, reject2) => {
- var socket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- var clientCounter = 0;
-
- socket.onmessage = e => {
- expect(e.data).toBe("hello");
- resolve2();
- };
- socket.onopen = () => {
- queueMicrotask(() => {
- server.publish("all", "hello");
- });
- };
- });
- server.stop(true);
- done();
- });
-
- it("can do publish() with publishToSelf: false", async done => {
- var server = serve({
- port: 0,
- websocket: {
- open(ws) {
- ws.subscribe("all");
- ws.publish("all", "hey");
- server.publish("all", "hello");
- },
- message(ws, msg) {
- if (new TextDecoder().decode(msg as Uint8Array) !== "hello") {
- done(new Error("unexpected message"));
+ done();
+ },
+ }));
+ // FIXME: close() callback is called, but only after timeout?
+ it.todo("closeOnBackpressureLimit");
+ /*
+ test("closeOnBackpressureLimit", done => ({
+ closeOnBackpressureLimit: true,
+ backpressureLimit: 1,
+ open(ws) {
+ const data = new Uint8Array(1 * 1024 * 1024);
+ // send data until backpressure is triggered
+ for (let i = 0; i < 10; i++) {
+ if (ws.send(data) < 1) {
+ return;
}
- },
- close(ws) {},
- publishToSelf: false,
- },
- fetch(req, server) {
- if (server.upgrade(req)) {
- return;
}
-
- return new Response("success");
+ done(new Error("backpressure not triggered"));
},
- });
-
- await new Promise<void>((resolve2, reject2) => {
- var socket = new WebSocket(`ws://${server.hostname}:${server.port}`);
-
- socket.onmessage = e => {
- expect(e.data).toBe("hello");
- resolve2();
- };
- });
- server.stop(true);
- done();
+ close(_, code) {
+ expect(code).toBe(1006);
+ done();
+ },
+ }));
+ */
+ it.todo("perMessageDeflate");
});
+});
- for (let method of ["publish", "publishText", "publishBinary"] as const) {
- describe(method, () => {
- it("in close() should work", async () => {
- var count = 0;
- var server = serve({
- port: 0,
- websocket: {
- // FIXME: update this test to not rely on publishToSelf: true,
- publishToSelf: true,
-
- open(ws) {
- ws.subscribe("all");
- },
- message(ws, msg) {},
- close(ws) {
- (ws[method] as any)("all", method === "publishBinary" ? Buffer.from("bye!") : "bye!");
- count++;
-
- if (count >= 2) {
- server.stop(true);
- }
- },
- },
- fetch(req, server) {
- if (server.upgrade(req)) {
- return;
- }
-
- return new Response("success");
- },
- });
-
+describe("ServerWebSocket", () => {
+ test("readyState", done => ({
+ open(ws) {
+ expect(ws.readyState).toBe(WebSocket.OPEN);
+ ws.close();
+ },
+ close(ws) {
+ expect(ws.readyState).toBe(WebSocket.CLOSED);
+ done();
+ },
+ }));
+ test("remoteAddress", done => ({
+ open(ws) {
+ expect(ws.remoteAddress).toMatch(/^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$/);
+ done();
+ },
+ }));
+ describe("binaryType", () => {
+ test("(default)", done => ({
+ open(ws) {
+ expect(ws.binaryType).toBe("nodebuffer");
+ done();
+ },
+ }));
+ test("(invalid)", done => ({
+ open(ws) {
try {
- const first = await new Promise<WebSocket>((resolve2, reject2) => {
- var socket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- socket.onopen = () => resolve2(socket);
- });
-
- await new Promise<WebSocket>((resolve2, reject2) => {
- var socket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- socket.onopen = () => {
- queueMicrotask(() => first.close());
- };
- socket.onmessage = ev => {
- var msg = ev.data;
- if (typeof msg !== "string") {
- msg = new TextDecoder().decode(msg);
- }
-
- if (msg === "bye!") {
- socket.close(0);
- resolve2(socket);
- } else {
- reject2(msg);
- }
- };
- });
- } finally {
- server.stop(true);
- }
- });
- });
- }
-
- it("close inside open", async () => {
- var resolve: () => void;
- var server = serve({
- port: 0,
- websocket: {
- open(ws) {},
- message(ws, msg) {},
- close() {
- resolve();
- server.stop(true);
- },
- },
- fetch(req, server) {
- if (
- server.upgrade(req, {
- data: "hello world",
-
- // check that headers works
- headers: {
- "x-a": "text/plain",
- },
- })
- ) {
- if (server.upgrade(req)) {
- throw new Error("should not upgrade twice");
- }
- return;
+ // @ts-expect-error
+ ws.binaryType = "invalid";
+ done(new Error("Expected an error"));
+ } catch (cause) {
+ done();
}
-
- return new Response("noooooo hello world");
},
- });
-
- await new Promise<void>((resolve_, reject) => {
- resolve = resolve_;
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onopen = () => {
- websocket.close();
- };
- websocket.onmessage = e => {};
- websocket.onerror = e => {};
- });
- });
-
- it("headers error doesn't crash", async () => {
- await new Promise<void>((resolve, reject) => {
- const server = serve({
- port: 0,
- websocket: {
- open(ws) {
- ws.close();
- },
- message(ws, msg) {},
- close() {
- resolve();
- server.stop(true);
- },
+ }));
+ for (const { label, type } of binaryTypes) {
+ test(label, done => ({
+ open(ws) {
+ ws.binaryType = label;
+ expect(ws.binaryType).toBe(label);
+ ws.send(new Uint8Array(1));
},
- error(err) {
- resolve();
- server.stop(true);
+ message(ws, received) {
+ expect(received).toBeInstanceOf(type);
+ ws.ping();
},
- fetch(req, server) {
- expect(() => {
- if (
- server.upgrade(req, {
- data: "hello world",
- headers: 1238 as any,
- })
- ) {
- reject(new Error("should not upgrade"));
- return new Response("should not upgrade");
- }
- }).toThrow("upgrade options.headers must be a Headers or an object");
- resolve();
- return new Response("success");
+ ping(ws, received) {
+ expect(received).toBeInstanceOf(type);
+ ws.pong();
},
- });
-
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onopen = () => websocket.close();
- websocket.onmessage = e => {};
- websocket.onerror = e => {};
- });
+ pong(_, received) {
+ expect(received).toBeInstanceOf(type);
+ done();
+ },
+ }));
+ }
});
- it("can do hello world", async () => {
- const server = serve({
- port: 0,
- websocket: {
+ describe("send()", () => {
+ for (const { label, message, bytes } of messages) {
+ test(label, done => ({
open(ws) {
- server.stop();
+ ws.send(message);
},
- message(ws, msg) {
- ws.send("hello world");
- },
- },
- fetch(req, server) {
- server.stop();
- if (
- server.upgrade(req, {
- data: "hello world",
-
- // check that headers works
- headers: {
- "x-a": "text/plain",
- },
- })
- ) {
- if (server.upgrade(req)) {
- throw new Error("should not upgrade twice");
+ message(_, received) {
+ if (typeof received === "string") {
+ expect(received).toBe(message);
+ } else {
+ expect(received).toEqual(Buffer.from(bytes));
}
- return;
- }
-
- return new Response("noooooo hello world");
+ done();
+ },
+ }));
+ }
+ test(
+ "(benchmark)",
+ (done, connect) => {
+ const maxClients = 10;
+ const maxMessages = 10_000;
+ let count = 0;
+ return {
+ open(ws) {
+ if (ws.data.id < maxClients) {
+ connect();
+ }
+ for (let i = 0; i < maxMessages; i++) {
+ ws.send(`${i}`, true);
+ ws.sendText(`${i}`, true);
+ ws.sendBinary(Buffer.from(`${i}`), true);
+ }
+ },
+ message() {
+ if (++count === maxClients * maxMessages * 3) {
+ done();
+ }
+ },
+ };
},
- });
-
- await new Promise<void>((resolve, reject) => {
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onopen = () => {
- websocket.send("hello world");
- };
- websocket.onmessage = e => {
- try {
- expect(e.data).toBe("hello world");
- resolve();
- } catch (r) {
- reject(r);
- } finally {
- websocket.close();
- }
- };
- websocket.onerror = e => {
- reject(e);
- };
- });
+ 30_000,
+ );
});
-
- it("fetch() allows a Response object to be returned for an upgraded ServerWebSocket", () => {
- const server = serve({
- port: 0,
- websocket: {
+ describe("sendBinary()", () => {
+ for (const { label, message, bytes } of buffers) {
+ test(label, done => ({
open(ws) {
- server.stop();
+ ws.sendBinary(message);
},
- message(ws, msg) {
- ws.send("hello world");
+ message(_, received) {
+ expect(received).toEqual(Buffer.from(bytes));
+ done();
},
- },
- error(err) {
- console.error(err);
- },
- fetch(req, server) {
- server.stop();
- if (
- server.upgrade(req, {
- data: "hello world",
-
- // check that headers works
- headers: {
- "x-a": "text/plain",
- },
- })
- ) {
- if (server.upgrade(req)) {
- throw new Error("should not upgrade twice");
- }
- return new Response("lol!", {
- status: 101,
- });
- }
-
- return new Response("noooooo hello world");
- },
- });
-
- return new Promise<void>((resolve, reject) => {
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onopen = () => {
- websocket.send("hello world");
- };
- websocket.onmessage = e => {
- try {
- expect(e.data).toBe("hello world");
- resolve();
- } catch (r) {
- reject(r);
- } finally {
- websocket.close();
- }
- };
- websocket.onerror = e => {
- reject(e);
- };
- });
+ }));
+ }
});
-
- it("fetch() allows a Promise<Response> object to be returned for an upgraded ServerWebSocket", () => {
- const server = serve({
- port: 0,
- websocket: {
- async open(ws) {
- server.stop();
+ describe("sendText()", () => {
+ for (const { label, message } of strings) {
+ test(label, done => ({
+ open(ws) {
+ ws.sendText(message);
},
- async message(ws, msg) {
- await 1;
- ws.send("hello world");
+ message(_, received) {
+ expect(received).toEqual(message);
+ done();
},
- },
- error(err) {
- console.error(err);
- },
- async fetch(req, server) {
- server.stop();
- await 1;
- if (
- server.upgrade(req, {
- data: "hello world",
-
- // check that headers works
- headers: {
- "x-a": "text/plain",
- },
- })
- ) {
- if (server.upgrade(req)) {
- throw new Error("should not upgrade twice");
- }
- return new Response("lol!", {
- status: 101,
- });
- }
-
- return new Response("noooooo hello world");
- },
- });
- return new Promise<void>((resolve, reject) => {
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onopen = () => {
- websocket.send("hello world");
- };
- websocket.onmessage = e => {
- try {
- expect(e.data).toBe("hello world");
- resolve();
- } catch (r) {
- reject(r);
- } finally {
- websocket.close();
- }
- };
- websocket.onerror = e => {
- reject(e);
- };
- });
+ }));
+ }
});
- it("binaryType works", async () => {
- var done = false;
- const server = serve({
- port: 0,
- websocket: {
+ describe("subscribe()", () => {
+ for (const { label, message } of strings) {
+ test(label, done => ({
open(ws) {
- server.stop();
+ expect(ws.isSubscribed(message)).toBeFalse();
+ ws.subscribe(message);
+ expect(ws.isSubscribed(message)).toBeTrue();
+ ws.unsubscribe(message);
+ expect(ws.isSubscribed(message)).toBeFalse();
+ done();
},
- message(ws, msg) {
- // The first message is supposed to be "uint8array"
- // Then after uint8array, we switch it to "nodebuffer"
- // Then after nodebuffer, we switch it to "arraybuffer"
- // and then we're done
- switch (ws.binaryType) {
- case "nodebuffer": {
- for (let badType of [
- 123,
- NaN,
- Symbol("uint8array"),
- "uint16array",
- "uint32array",
- "float32array",
- "float64array",
- "garbage",
- ]) {
- expect(() => {
- /* @ts-ignore */
- ws.binaryType = badType;
- }).toThrow();
- }
- expect(ws.binaryType).toBe("nodebuffer");
- ws.binaryType = "uint8array";
- expect(ws.binaryType).toBe("uint8array");
- expect(msg instanceof Uint8Array).toBe(true);
- expect(Buffer.isBuffer(msg)).toBe(true);
- break;
- }
-
- case "uint8array": {
- expect(ws.binaryType).toBe("uint8array");
- ws.binaryType = "arraybuffer";
- expect(ws.binaryType).toBe("arraybuffer");
- expect(msg instanceof Uint8Array).toBe(true);
- expect(Buffer.isBuffer(msg)).toBe(false);
- break;
- }
-
- case "arraybuffer": {
- expect(ws.binaryType).toBe("arraybuffer");
- expect(msg instanceof ArrayBuffer).toBe(true);
- done = true;
- break;
- }
-
- default: {
- throw new Error("unknown binaryType");
- }
+ }));
+ }
+ });
+ describe("publish()", () => {
+ for (const { label, message, bytes } of messages) {
+ const topic = typeof message === "string" ? message : label;
+ test(label, (done, connect) => ({
+ async open(ws) {
+ ws.subscribe(topic);
+ if (ws.data.id === 0) {
+ connect();
+ } else if (ws.data.id === 1) {
+ ws.publish(topic, message);
}
-
- ws.send("hello world");
},
- },
- fetch(req, server) {
- server.stop();
- if (server.upgrade(req, { data: "hello world" })) {
- if (server.upgrade(req)) {
- throw new Error("should not upgrade twice");
+ message(ws, received) {
+ if (ws.data.id === 1) {
+ throw new Error("Expected publish() to not send to self");
}
- return;
- }
-
- return new Response("noooooo hello world");
- },
- });
-
- const isDone = await new Promise<boolean>((resolve, reject) => {
- var counter = 0;
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onopen = () => {
- websocket.send(Buffer.from("hello world"));
- };
- websocket.onmessage = e => {
- try {
- expect(e.data).toBe("hello world");
-
- if (counter++ > 2) {
- websocket.close();
- resolve(done);
+ if (typeof message === "string") {
+ expect(received).toBe(message);
+ } else {
+ expect(received).toEqual(Buffer.from(bytes));
}
- websocket.send(Buffer.from("oaksd"));
- } catch (r) {
- websocket.close();
- reject(r);
- }
- };
- websocket.onerror = e => {
- reject(e);
- };
- });
- expect(isDone).toBe(true);
- });
-
- it("does not upgrade for non-websocket connections", async () => {
- await new Promise<void>(async (resolve, reject) => {
- var server = serve({
- port: 0,
- websocket: {
- open(ws) {
- ws.send("hello world");
- },
- message(ws, msg) {},
+ done();
},
- fetch(req, server) {
- if (server.upgrade(req)) {
- reject(new Error("should not upgrade"));
+ }));
+ }
+ });
+ describe("publishBinary()", () => {
+ for (const { label, message, bytes } of buffers) {
+ test(label, (done, connect) => ({
+ async open(ws) {
+ ws.subscribe(label);
+ if (ws.data.id === 0) {
+ connect();
+ } else if (ws.data.id === 1) {
+ ws.publishBinary(label, message);
}
-
- return new Response("success");
},
- });
-
- const response = await fetch(`http://${server.hostname}:${server.port}`);
- expect(await response.text()).toBe("success");
- resolve();
- server.stop(true);
- });
- });
-
- it("does not upgrade for non-websocket servers", async () => {
- await new Promise<void>(async (resolve, reject) => {
- const server = serve({
- port: 0,
- fetch(req, server) {
- server.stop();
- expect(() => {
- server.upgrade(req);
- }).toThrow('To enable websocket support, set the "websocket" object in Bun.serve({})');
- return new Response("success");
+ message(ws, received) {
+ if (ws.data.id === 1) {
+ throw new Error("Expected publish() to not send to self");
+ }
+ expect(received).toEqual(Buffer.from(bytes));
+ done();
},
- });
-
- const response = await fetch(`http://${server.hostname}:${server.port}`);
- expect(await response.text()).toBe("success");
- resolve();
- });
+ }));
+ }
});
-
- it("async can do hello world", async () => {
- const server = serve({
- port: 0,
- websocket: {
+ describe("publishText()", () => {
+ for (const { label, message } of strings) {
+ test(label, (done, connect) => ({
async open(ws) {
- server.stop(true);
- ws.send("hello world");
- },
- message(ws, msg) {},
- },
- async fetch(req, server) {
- server.stop();
- await 1;
- if (server.upgrade(req)) return;
-
- return new Response("noooooo hello world");
- },
- });
-
- await new Promise<void>((resolve, reject) => {
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
-
- websocket.onmessage = e => {
- try {
- expect(e.data).toBe("hello world");
- resolve();
- } catch (r) {
- reject(r);
- } finally {
- websocket.close();
- }
- };
- websocket.onerror = e => {
- reject(e);
- };
- });
- });
-
- it("publishText()", async () => {
- await new Promise<void>((resolve, reject) => {
- var websocket: WebSocket;
- var server = serve({
- port: 0,
- websocket: {
- // FIXME: update this test to not rely on publishToSelf: true,
- publishToSelf: true,
-
- async open(ws) {
- // we don't care about the data
- // we just want to make sure the DOMJIT call doesn't crash
- for (let i = 0; i < 40_000; i++) ws.publishText("hello", "world");
- websocket.close();
- server.stop(true);
- resolve();
- },
- message(ws, msg) {},
+ ws.subscribe(label);
+ if (ws.data.id === 0) {
+ connect();
+ } else if (ws.data.id === 1) {
+ ws.publishText(label, message);
+ }
},
- async fetch(req, server) {
- await 1;
- if (server.upgrade(req)) return;
-
- return new Response("noooooo hello world");
+ message(ws, received) {
+ if (ws.data.id === 1) {
+ throw new Error("Expected publish() to not send to self");
+ }
+ expect(received).toEqual(message);
+ done();
},
- });
-
- websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- });
+ }));
+ }
});
-
- it("publishBinary()", async () => {
- const bytes = Buffer.from("hello");
-
- await new Promise<void>((resolve, reject) => {
- var websocket: WebSocket;
- var server = serve({
- port: 0,
- websocket: {
- // FIXME: update this test to not rely on publishToSelf: true,
- publishToSelf: true,
-
- async open(ws) {
- // we don't care about the data
- // we just want to make sure the DOMJIT call doesn't crash
- for (let i = 0; i < 40_000; i++) ws.publishBinary("hello", bytes);
- websocket.close();
- server.stop(true);
- resolve();
- },
- message(ws, msg) {},
- },
- async fetch(req, server) {
- await 1;
- if (server.upgrade(req)) return;
-
- return new Response("noooooo hello world");
+ describe("publish() with { publishToSelf: true }", () => {
+ for (const { label, message, bytes } of messages) {
+ const topic = typeof message === "string" ? message : label;
+ test(label, done => ({
+ publishToSelf: true,
+ async open(ws) {
+ ws.subscribe(topic);
+ ws.publish(topic, message);
+ },
+ message(_, received) {
+ if (typeof message === "string") {
+ expect(received).toBe(message);
+ } else {
+ expect(received).toEqual(Buffer.from(bytes));
+ }
+ done();
},
- });
-
- websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- });
+ }));
+ }
});
-
- it("sendText()", async () => {
- await new Promise<void>((resolve, reject) => {
- var websocket: WebSocket;
- var server = serve({
- port: 0,
- websocket: {
- async open(ws) {
- // we don't care about the data
- // we just want to make sure the DOMJIT call doesn't crash
- for (let i = 0; i < 40_000; i++) ws.sendText("hello world", true);
- resolve();
- websocket.close();
- server.stop(true);
- },
- message(ws, msg) {},
+ describe("ping()", () => {
+ test("(no argument)", done => ({
+ open(ws) {
+ ws.ping();
+ },
+ ping(_, received) {
+ expect(received).toBeEmpty();
+ done();
+ },
+ }));
+ for (const { label, message, bytes } of messages) {
+ test(label, done => ({
+ open(ws) {
+ ws.ping(message);
},
- async fetch(req, server) {
- await 1;
- if (server.upgrade(req)) return;
-
- return new Response("noooooo hello world");
+ ping(_, received) {
+ expect(received).toEqual(Buffer.from(bytes));
+ done();
},
- });
- websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- });
+ }));
+ }
});
-
- it("sendBinary()", async () => {
- const bytes = Buffer.from("hello");
- await new Promise<void>((resolve, reject) => {
- var websocket: WebSocket;
- var server = serve({
- port: 0,
- websocket: {
- async open(ws) {
- // we don't care about the data
- // we just want to make sure the DOMJIT call doesn't crash
- for (let i = 0; i < 40_000; i++) ws.sendBinary(bytes, true);
- websocket.close();
- server.stop(true);
- resolve();
- },
- message(ws, msg) {},
+ describe("pong()", () => {
+ test("(no argument)", done => ({
+ open(ws) {
+ ws.pong();
+ },
+ pong(_, received) {
+ expect(received).toBeEmpty();
+ done();
+ },
+ }));
+ for (const { label, message, bytes } of messages) {
+ test(label, done => ({
+ open(ws) {
+ ws.pong(message);
},
- async fetch(req, server) {
- await 1;
- if (server.upgrade(req)) return;
-
- return new Response("noooooo hello world");
+ pong(_, received) {
+ expect(received).toEqual(Buffer.from(bytes));
+ done();
},
- });
-
- websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- });
+ }));
+ }
});
-
- it("can do hello world corked", async () => {
- const server = serve({
- port: 0,
- websocket: {
- open(ws) {
- server.stop();
- ws.send("hello world");
- },
- message(ws, msg) {
+ test("cork()", done => {
+ let count = 0;
+ return {
+ open(ws) {
+ setTimeout(() => {
ws.cork(() => {
- ws.send("hello world");
+ ws.send("1");
+ ws.sendText("2");
+ ws.sendBinary(new TextEncoder().encode("3"));
});
- },
- },
- fetch(req, server) {
- server.stop();
- if (server.upgrade(req)) return;
- return new Response("noooooo hello world");
+ }, 5);
},
- });
-
- await new Promise<void>((resolve, reject) => {
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
-
- websocket.onmessage = e => {
- try {
- expect(e.data).toBe("hello world");
- resolve();
- } catch (r) {
- reject(r);
- } finally {
- websocket.close();
+ message(_, message) {
+ if (typeof message === "string") {
+ expect(+message).toBe(++count);
+ } else {
+ expect(+new TextDecoder().decode(message)).toBe(++count);
}
- };
- websocket.onerror = e => {
- reject(e);
- };
- });
- server.stop(true);
- });
-
- it("can do some back and forth", async () => {
- var dataCount = 0;
- const server = serve({
- port: 0,
- websocket: {
- open(ws) {
- server.stop();
- },
- message(ws, msg) {
- if (msg === "first") {
- ws.send("first");
- return;
- }
- ws.send(`counter: ${dataCount++}`);
- },
- },
- fetch(req, server) {
- server.stop();
- if (
- server.upgrade(req, {
- data: { count: 0 },
- })
- )
- return new Response("noooooo hello world");
- },
- });
-
- await new Promise<void>((resolve, reject) => {
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onerror = e => {
- reject(e);
- };
-
- var counter = 0;
- websocket.onopen = () => websocket.send("first");
- websocket.onmessage = e => {
- try {
- switch (counter++) {
- case 0: {
- expect(e.data).toBe("first");
- websocket.send("where are the loops");
- break;
- }
- case 1: {
- expect(e.data).toBe("counter: 0");
- websocket.send("br0ther may i have some loops");
- break;
- }
- case 2: {
- expect(e.data).toBe("counter: 1");
- websocket.send("br0ther may i have some loops");
- break;
- }
- case 3: {
- expect(e.data).toBe("counter: 2");
- resolve();
- break;
- }
- }
- } catch (r) {
- reject(r);
- websocket.close();
+ if (count === 3) {
+ done();
}
- };
- });
- server.stop(true);
- });
-
- it("send rope strings", async () => {
- var ropey = "hello world".repeat(10);
- var sendQueue: any[] = [];
- for (var i = 0; i < 20; i++) {
- sendQueue.push(ropey + " " + i);
- }
-
- var serverCounter = 0;
- var clientCounter = 0;
-
- const server = serve({
- port: 0,
- websocket: {
- open(ws) {},
- message(ws, msg) {
- ws.send(sendQueue[serverCounter++] + " ");
- serverCounter % 10 === 0 && gcTick();
- },
- },
- fetch(req, server) {
- if (
- server.upgrade(req, {
- data: { count: 0 },
- })
- )
- return;
-
- return new Response("noooooo hello world");
},
- });
- try {
- await new Promise<void>((resolve, reject) => {
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onerror = e => {
- reject(e);
- };
-
- websocket.onopen = () => {
- server.stop();
- websocket.send("first");
- };
-
- websocket.onmessage = e => {
- try {
- const expected = sendQueue[clientCounter++] + " ";
- expect(e.data).toBe(expected);
- websocket.send("next");
- if (clientCounter === sendQueue.length) {
- websocket.close();
- resolve();
- }
- } catch (r) {
- reject(r);
- console.error(r);
- websocket.close();
- }
- };
- });
- } catch (e) {
- throw e;
- } finally {
- server.stop(true);
- }
+ };
});
-
- // this test sends 50 messages to 10 connected clients via pubsub
- it("pub/sub", async () => {
- var ropey = "hello world".repeat(10);
- var sendQueue: any[] = [];
- for (var i = 0; i < 50; i++) {
- sendQueue.push(ropey + " " + i);
- }
-
- var serverCounter = 0;
- var clientCount = 0;
- const server = serve({
- port: 0,
- websocket: {
+ describe("close()", () => {
+ test("(no arguments)", done => ({
+ open(ws) {
+ ws.close();
+ },
+ close(_, code, reason) {
+ expect(code).toBe(1000);
+ expect(reason).toBeEmpty();
+ done();
+ },
+ }));
+ test("(no reason)", done => ({
+ open(ws) {
+ ws.close(1001);
+ },
+ close(_, code, reason) {
+ expect(code).toBe(1001);
+ expect(reason).toBeEmpty();
+ done();
+ },
+ }));
+ for (const { label, message } of strings) {
+ test(label, done => ({
open(ws) {
- ws.subscribe("test");
- if (!ws.isSubscribed("test")) {
- throw new Error("not subscribed");
- }
- ws.unsubscribe("test");
- if (ws.isSubscribed("test")) {
- throw new Error("subscribed");
- }
- ws.subscribe("test");
- clientCount++;
- if (clientCount === 10) {
- setTimeout(() => server.publish("test", "hello world"), 1);
- }
+ ws.close(1002, message);
},
- message(ws, msg) {
- if (serverCounter < sendQueue.length) server.publish("test", sendQueue[serverCounter++] + " ");
+ close(_, code, reason) {
+ expect(code).toBe(1002);
+ expect(reason).toBe(message);
+ done();
},
- },
- fetch(req) {
- if (
- server.upgrade(req, {
- data: { count: 0 },
- })
- )
- return;
- return new Response("noooooo hello world");
- },
- });
- try {
- const connections = new Array(10);
- const websockets = new Array(connections.length);
- var doneCounter = 0;
- await new Promise<void>(done => {
- for (var i = 0; i < connections.length; i++) {
- var j = i;
- var resolve: (_?: unknown) => void,
- reject: (_?: unknown) => void,
- resolveConnection: (_?: unknown) => void,
- rejectConnection: (_?: unknown) => void;
- connections[j] = new Promise((res, rej) => {
- resolveConnection = res;
- rejectConnection = rej;
- });
- websockets[j] = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
- websocket.onerror = e => {
- reject(e);
- };
- websocket.onclose = () => {
- doneCounter++;
- if (doneCounter === connections.length) {
- done();
- }
- };
- var hasOpened = false;
- websocket.onopen = () => {
- if (!hasOpened) {
- hasOpened = true;
- resolve(websocket);
- }
- };
-
- let clientCounter = -1;
- var hasSentThisTick = false;
-
- websocket.onmessage = e => {
- if (!hasOpened) {
- hasOpened = true;
- resolve(websocket);
- }
-
- if (e.data === "hello world") {
- clientCounter = 0;
- websocket.send("first");
- return;
- }
-
- try {
- expect(!!sendQueue.find(a => a + " " === e.data)).toBe(true);
-
- if (!hasSentThisTick) {
- websocket.send("second");
- hasSentThisTick = true;
- queueMicrotask(() => {
- hasSentThisTick = false;
- });
- }
-
- if (clientCounter++ === sendQueue.length - 1) {
- websocket.close();
- resolveConnection();
- }
- } catch (r) {
- console.error(r);
- websocket.close();
- rejectConnection(r);
- }
- };
- }
- });
- } catch (e) {
- throw e;
- } finally {
- server.stop(true);
- gcTick();
+ }));
}
-
- expect(serverCounter).toBe(sendQueue.length);
- }, 30_000);
- it("can close with reason and code #2631", done => {
- let timeout: any;
- let server = Bun.serve({
- websocket: {
- message(ws) {
- ws.close(2000, "test");
- },
- open(ws) {
- try {
- expect(ws.remoteAddress).toBe("127.0.0.1");
- } catch (e) {
- clearTimeout(timeout);
- done(e);
- }
- },
- close(ws, code, reason) {
- try {
- expect(code).toBe(2000);
- expect(reason).toBe("test");
- clearTimeout(timeout);
- done();
- } catch (e) {
- clearTimeout(timeout);
- done(e);
- }
- },
- },
- fetch(req, server) {
- if (!server.upgrade(req)) {
- return new Response(null, { status: 404 });
- }
- },
- port: 0,
- });
-
- let z = new WebSocket(`ws://${server.hostname}:${server.port}`);
- z.addEventListener("open", () => {
- z.send("test");
- });
- z.addEventListener("close", () => {
- server.stop();
- });
-
- timeout = setTimeout(() => {
- done(new Error("Did not close in time"));
- server.stop(true);
- }, 1000);
});
+ test("terminate()", done => ({
+ open(ws) {
+ ws.terminate();
+ },
+ close(_, code, reason) {
+ expect(code).toBe(1006);
+ expect(reason).toBeEmpty();
+ done();
+ },
+ }));
+});
- it("can close with code and without reason #2631", done => {
- let timeout: any;
- let server = Bun.serve({
- websocket: {
- message(ws) {
- ws.close(2000);
- },
- open(ws) {
- try {
- expect(ws.remoteAddress).toBe("127.0.0.1");
- } catch (e) {
- done(e);
- clearTimeout(timeout);
- }
- },
- close(ws, code, reason) {
- clearTimeout(timeout);
-
- try {
- expect(code).toBe(2000);
- expect(reason).toBe("");
- done();
- } catch (e) {
- done(e);
- }
- },
- },
- fetch(req, server) {
- if (!server.upgrade(req)) {
- return new Response(null, { status: 404 });
+function test(
+ label: string,
+ fn: (done: (err?: unknown) => void, connect: () => Promise<void>) => Partial<WebSocketHandler<{ id: number }>>,
+ timeout?: number,
+) {
+ it(
+ label,
+ testDone => {
+ let isDone = false;
+ const done = (err?: unknown) => {
+ if (!isDone) {
+ isDone = true;
+ server.stop();
+ testDone(err);
}
- },
- port: 0,
- });
-
- let z = new WebSocket(`ws://${server.hostname}:${server.port}`);
- z.addEventListener("open", () => {
- z.send("test");
- });
- z.addEventListener("close", () => {
- server.stop();
- });
-
- timeout = setTimeout(() => {
- done(new Error("Did not close in time"));
- server.stop(true);
- }, 1000);
- });
- it("can close without reason or code #2631", done => {
- let timeout: any;
- let server = Bun.serve({
- websocket: {
- message(ws) {
- ws.close();
- },
- open(ws) {
- try {
- expect(ws.remoteAddress).toBe("127.0.0.1");
- } catch (e) {
- clearTimeout(timeout);
- done(e);
+ };
+ let id = 0;
+ const server: Server = serve({
+ port: 0,
+ fetch(request, server) {
+ const data = { id: id++ };
+ if (server.upgrade(request, { data })) {
+ return;
}
+ return new Response();
},
- close(ws, code, reason) {
- clearTimeout(timeout);
- try {
- expect(code).toBe(1006);
- expect(reason).toBe("");
- done();
- } catch (e) {
- done(e);
- }
+ websocket: {
+ sendPings: false,
+ message() {},
+ ...fn(done, () => connect(server)),
},
- },
- fetch(req, server) {
- if (!server.upgrade(req)) {
- return new Response(null, { status: 404 });
- }
- },
- port: 0,
- });
-
- let z = new WebSocket(`ws://${server.hostname}:${server.port}`);
- z.addEventListener("open", () => {
- z.send("test");
- });
- z.addEventListener("close", () => {
- server.stop();
- });
-
- timeout = setTimeout(() => {
- done(new Error("Did not close in time"));
- server.stop(true);
- }, 1000);
+ });
+ servers.push(server);
+ connect(server);
+ },
+ { timeout: timeout ?? 1000 },
+ );
+}
+
+async function connect(server: Server): Promise<void> {
+ const url = new URL(`ws://${server.hostname}:${server.port}/`);
+ const { pathname } = new URL("./websocket-client-echo.mjs", import.meta.url);
+ // @ts-ignore
+ const client = spawn({
+ cmd: [nodeExe() ?? bunExe(), pathname, url],
+ cwd: import.meta.dir,
+ env: bunEnv,
+ stderr: "ignore",
+ stdout: "pipe",
});
-});
+ clients.push(client);
+ for await (const chunk of client.stdout) {
+ if (new TextDecoder().decode(chunk).includes("Connected")) {
+ return;
+ }
+ }
+}
diff --git a/test/js/first_party/ws/ws.test.ts b/test/js/first_party/ws/ws.test.ts
new file mode 100644
index 000000000..c0b56a200
--- /dev/null
+++ b/test/js/first_party/ws/ws.test.ts
@@ -0,0 +1,284 @@
+import { describe, it, expect, beforeEach, afterEach } from "bun:test";
+import type { Subprocess } from "bun";
+import { spawn } from "bun";
+import { bunEnv, bunExe, nodeExe } from "harness";
+import { WebSocket } from "ws";
+
+const strings = [
+ {
+ label: "string (ascii)",
+ message: "ascii",
+ bytes: [0x61, 0x73, 0x63, 0x69, 0x69],
+ },
+ {
+ label: "string (latin1)",
+ message: "latin1-©",
+ bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9],
+ },
+ {
+ label: "string (utf-8)",
+ message: "utf8-😶",
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6],
+ },
+];
+
+const buffers = [
+ {
+ label: "Uint8Array (utf-8)",
+ message: new TextEncoder().encode("utf8-🙂"),
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82],
+ },
+ {
+ label: "ArrayBuffer (utf-8)",
+ message: new TextEncoder().encode("utf8-🙃").buffer,
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83],
+ },
+ {
+ label: "Buffer (utf-8)",
+ message: Buffer.from("utf8-🤩"),
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9],
+ },
+];
+
+const messages = [...strings, ...buffers];
+
+const binaryTypes = [
+ {
+ label: "nodebuffer",
+ type: Buffer,
+ },
+ {
+ label: "arraybuffer",
+ type: ArrayBuffer,
+ },
+] as const;
+
+let servers: Subprocess[] = [];
+let clients: WebSocket[] = [];
+
+function cleanUp() {
+ for (const client of clients) {
+ client.terminate();
+ }
+ for (const server of servers) {
+ server.kill();
+ }
+}
+
+beforeEach(cleanUp);
+afterEach(cleanUp);
+
+describe("WebSocket", () => {
+ test("url", (ws, done) => {
+ expect(ws.url).toStartWith("ws://");
+ done();
+ });
+ test("readyState", (ws, done) => {
+ expect(ws.readyState).toBe(WebSocket.CONNECTING);
+ ws.on("open", () => {
+ expect(ws.readyState).toBe(WebSocket.OPEN);
+ ws.close();
+ });
+ ws.on("close", () => {
+ expect(ws.readyState).toBe(WebSocket.CLOSED);
+ done();
+ });
+ });
+ describe("binaryType", () => {
+ test("(default)", (ws, done) => {
+ expect(ws.binaryType).toBe("nodebuffer");
+ done();
+ });
+ test("(invalid)", (ws, done) => {
+ try {
+ // @ts-expect-error
+ ws.binaryType = "invalid";
+ done(new Error("Expected an error"));
+ } catch {
+ done();
+ }
+ });
+ for (const { label, type } of binaryTypes) {
+ test(label, (ws, done) => {
+ ws.binaryType = label;
+ ws.on("open", () => {
+ expect(ws.binaryType).toBe(label);
+ ws.send(new Uint8Array(1));
+ });
+ ws.on("message", (data, isBinary) => {
+ expect(data).toBeInstanceOf(type);
+ expect(isBinary).toBeTrue();
+ ws.ping();
+ });
+ ws.on("ping", (data) => {
+ expect(data).toBeInstanceOf(type);
+ ws.pong();
+ });
+ ws.on("pong", (data) => {
+ expect(data).toBeInstanceOf(type);
+ done();
+ });
+ });
+ }
+ });
+ describe("send()", () => {
+ for (const { label, message, bytes } of messages) {
+ test(label, (ws, done) => {
+ ws.on("open", () => {
+ ws.send(message);
+ });
+ ws.on("message", (data, isBinary) => {
+ if (typeof data === "string") {
+ expect(data).toBe(message);
+ expect(isBinary).toBeFalse();
+ } else {
+ expect(data).toEqual(Buffer.from(bytes));
+ expect(isBinary).toBeTrue();
+ }
+ done();
+ });
+ });
+ }
+ });
+ describe("ping()", () => {
+ test("(no argument)", (ws, done) => {
+ ws.on("open", () => {
+ ws.ping();
+ });
+ ws.on("ping", (data) => {
+ expect(data).toBeInstanceOf(Buffer);
+ done();
+ });
+ });
+ for (const { label, message, bytes } of messages) {
+ test(label, (ws, done) => {
+ ws.on("open", () => {
+ ws.ping(message);
+ });
+ ws.on("ping", (data) => {
+ expect(data).toEqual(Buffer.from(bytes));
+ done();
+ });
+ });
+ }
+ });
+ describe("pong()", () => {
+ test("(no argument)", (ws, done) => {
+ ws.on("open", () => {
+ ws.pong();
+ });
+ ws.on("pong", (data) => {
+ expect(data).toBeInstanceOf(Buffer);
+ done();
+ });
+ });
+ for (const { label, message, bytes } of messages) {
+ test(label, (ws, done) => {
+ ws.on("open", () => {
+ ws.pong(message);
+ });
+ ws.on("pong", (data) => {
+ expect(data).toEqual(Buffer.from(bytes));
+ done();
+ });
+ });
+ }
+ });
+ describe("close()", () => {
+ test("(no arguments)", (ws, done) => {
+ ws.on("open", () => {
+ ws.close();
+ });
+ ws.on("close", (code: number, reason: string, wasClean: boolean) => {
+ expect(code).toBe(1000);
+ expect(reason).toBeString();
+ expect(wasClean).toBeTrue();
+ done();
+ });
+ });
+ test("(no reason)", (ws, done) => {
+ ws.on("open", () => {
+ ws.close(1001);
+ });
+ ws.on("close", (code: number, reason: string, wasClean: boolean) => {
+ expect(code).toBe(1001);
+ expect(reason).toBeString();
+ expect(wasClean).toBeTrue();
+ done();
+ });
+ });
+ // FIXME: Encoding issue
+ // Expected: "latin1-©"
+ // Received: "latin1-©"
+ /*
+ for (const { label, message } of strings) {
+ test(label, (ws, done) => {
+ ws.on("open", () => {
+ ws.close(1002, message);
+ });
+ ws.on("close", (code, reason, wasClean) => {
+ expect(code).toBe(1002);
+ expect(reason).toBe(message);
+ expect(wasClean).toBeTrue();
+ done();
+ });
+ });
+ }
+ */
+ });
+ test("terminate()", (ws, done) => {
+ ws.on("open", () => {
+ ws.terminate();
+ });
+ ws.on("close", (code: number, reason: string, wasClean: boolean) => {
+ expect(code).toBe(1006);
+ expect(reason).toBeString();
+ expect(wasClean).toBeFalse();
+ done();
+ });
+ });
+});
+
+function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void) => void, timeout?: number) {
+ it(
+ label,
+ testDone => {
+ let isDone = false;
+ const done = (err?: unknown) => {
+ if (!isDone) {
+ isDone = true;
+ testDone(err);
+ }
+ };
+ listen()
+ .then(url => {
+ const ws = new WebSocket(url);
+ clients.push(ws);
+ fn(ws, done);
+ })
+ .catch(done);
+ },
+ { timeout: timeout ?? 1000 },
+ );
+}
+
+async function listen(): Promise<URL> {
+ const { pathname } = new URL("../../web/websocket/websocket-server-echo.mjs", import.meta.url);
+ const server = spawn({
+ cmd: [nodeExe() ?? bunExe(), pathname],
+ cwd: import.meta.dir,
+ env: bunEnv,
+ stderr: "ignore",
+ stdout: "pipe",
+ });
+ servers.push(server);
+ for await (const chunk of server.stdout) {
+ const text = new TextDecoder().decode(chunk);
+ try {
+ return new URL(text);
+ } catch {
+ throw new Error(`Invalid URL: '${text}'`);
+ }
+ }
+ throw new Error("No URL found?");
+}
diff --git a/test/js/web/websocket/websocket-client.test.ts b/test/js/web/websocket/websocket-client.test.ts
new file mode 100644
index 000000000..470fa71c0
--- /dev/null
+++ b/test/js/web/websocket/websocket-client.test.ts
@@ -0,0 +1,280 @@
+import { describe, it, expect, beforeEach, afterEach } from "bun:test";
+import type { Subprocess } from "bun";
+import { spawn } from "bun";
+import { bunEnv, bunExe, nodeExe } from "harness";
+
+const strings = [
+ {
+ label: "string (ascii)",
+ message: "ascii",
+ bytes: [0x61, 0x73, 0x63, 0x69, 0x69],
+ },
+ {
+ label: "string (latin1)",
+ message: "latin1-©",
+ bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9],
+ },
+ {
+ label: "string (utf-8)",
+ message: "utf8-😶",
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6],
+ },
+];
+
+const buffers = [
+ {
+ label: "Uint8Array (utf-8)",
+ message: new TextEncoder().encode("utf8-🙂"),
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82],
+ },
+ {
+ label: "ArrayBuffer (utf-8)",
+ message: new TextEncoder().encode("utf8-🙃").buffer,
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83],
+ },
+ {
+ label: "Buffer (utf-8)",
+ message: Buffer.from("utf8-🤩"),
+ bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9],
+ },
+];
+
+const messages = [...strings, ...buffers];
+
+const binaryTypes = [
+ {
+ label: "nodebuffer",
+ type: Buffer,
+ },
+ {
+ label: "arraybuffer",
+ type: ArrayBuffer,
+ },
+] as const;
+
+let servers: Subprocess[] = [];
+let clients: WebSocket[] = [];
+
+function cleanUp() {
+ for (const client of clients) {
+ client.terminate();
+ }
+ for (const server of servers) {
+ server.kill();
+ }
+}
+
+beforeEach(cleanUp);
+afterEach(cleanUp);
+
+describe("WebSocket", () => {
+ test("url", (ws, done) => {
+ expect(ws.url).toStartWith("ws://");
+ done();
+ });
+ test("readyState", (ws, done) => {
+ expect(ws.readyState).toBe(WebSocket.CONNECTING);
+ ws.addEventListener("open", () => {
+ expect(ws.readyState).toBe(WebSocket.OPEN);
+ ws.close();
+ });
+ ws.addEventListener("close", () => {
+ expect(ws.readyState).toBe(WebSocket.CLOSED);
+ done();
+ });
+ });
+ describe("binaryType", () => {
+ test("(default)", (ws, done) => {
+ expect(ws.binaryType).toBe("nodebuffer");
+ done();
+ });
+ test("(invalid)", (ws, done) => {
+ try {
+ // @ts-expect-error
+ ws.binaryType = "invalid";
+ done(new Error("Expected an error"));
+ } catch {
+ done();
+ }
+ });
+ for (const { label, type } of binaryTypes) {
+ test(label, (ws, done) => {
+ ws.binaryType = label;
+ ws.addEventListener("open", () => {
+ expect(ws.binaryType).toBe(label);
+ ws.send(new Uint8Array(1));
+ });
+ ws.addEventListener("message", ({ data }) => {
+ expect(data).toBeInstanceOf(type);
+ ws.ping();
+ });
+ ws.addEventListener("ping", ({ data }) => {
+ expect(data).toBeInstanceOf(type);
+ ws.pong();
+ });
+ ws.addEventListener("pong", ({ data }) => {
+ expect(data).toBeInstanceOf(type);
+ done();
+ });
+ });
+ }
+ });
+ describe("send()", () => {
+ for (const { label, message, bytes } of messages) {
+ test(label, (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.send(message);
+ });
+ ws.addEventListener("message", ({ data }) => {
+ if (typeof data === "string") {
+ expect(data).toBe(message);
+ } else {
+ expect(data).toEqual(Buffer.from(bytes));
+ }
+ done();
+ });
+ });
+ }
+ });
+ describe("ping()", () => {
+ test("(no argument)", (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.ping();
+ });
+ ws.addEventListener("ping", ({ data }) => {
+ expect(data).toBeInstanceOf(Buffer);
+ done();
+ });
+ });
+ for (const { label, message, bytes } of messages) {
+ test(label, (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.ping(message);
+ });
+ ws.addEventListener("ping", ({ data }) => {
+ expect(data).toEqual(Buffer.from(bytes));
+ done();
+ });
+ });
+ }
+ });
+ describe("pong()", () => {
+ test("(no argument)", (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.pong();
+ });
+ ws.addEventListener("pong", ({ data }) => {
+ expect(data).toBeInstanceOf(Buffer);
+ done();
+ });
+ });
+ for (const { label, message, bytes } of messages) {
+ test(label, (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.pong(message);
+ });
+ ws.addEventListener("pong", ({ data }) => {
+ expect(data).toEqual(Buffer.from(bytes));
+ done();
+ });
+ });
+ }
+ });
+ describe("close()", () => {
+ test("(no arguments)", (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.close();
+ });
+ ws.addEventListener("close", ({ code, reason, wasClean }) => {
+ expect(code).toBe(1000);
+ expect(reason).toBeString();
+ expect(wasClean).toBeTrue();
+ done();
+ });
+ });
+ test("(no reason)", (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.close(1001);
+ });
+ ws.addEventListener("close", ({ code, reason, wasClean }) => {
+ expect(code).toBe(1001);
+ expect(reason).toBeString();
+ expect(wasClean).toBeTrue();
+ done();
+ });
+ });
+ // FIXME: Encoding issue
+ // Expected: "latin1-©"
+ // Received: "latin1-©"
+ /*
+ for (const { label, message } of strings) {
+ test(label, (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.close(1002, message);
+ });
+ ws.addEventListener("close", ({ code, reason, wasClean }) => {
+ expect(code).toBe(1002);
+ expect(reason).toBe(message);
+ expect(wasClean).toBeTrue();
+ done();
+ });
+ });
+ }
+ */
+ });
+ test("terminate()", (ws, done) => {
+ ws.addEventListener("open", () => {
+ ws.terminate();
+ });
+ ws.addEventListener("close", ({ code, reason, wasClean }) => {
+ expect(code).toBe(1006);
+ expect(reason).toBeString();
+ expect(wasClean).toBeFalse();
+ done();
+ });
+ });
+});
+
+function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void) => void, timeout?: number) {
+ it(
+ label,
+ testDone => {
+ let isDone = false;
+ const done = (err?: unknown) => {
+ if (!isDone) {
+ isDone = true;
+ testDone(err);
+ }
+ };
+ listen()
+ .then(url => {
+ const ws = new WebSocket(url);
+ clients.push(ws);
+ fn(ws, done);
+ })
+ .catch(done);
+ },
+ { timeout: timeout ?? 1000 },
+ );
+}
+
+async function listen(): Promise<URL> {
+ const { pathname } = new URL("./websocket-server-echo.mjs", import.meta.url);
+ const server = spawn({
+ cmd: [nodeExe() ?? bunExe(), pathname],
+ cwd: import.meta.dir,
+ env: bunEnv,
+ stderr: "ignore",
+ stdout: "pipe",
+ });
+ servers.push(server);
+ for await (const chunk of server.stdout) {
+ const text = new TextDecoder().decode(chunk);
+ try {
+ return new URL(text);
+ } catch {
+ throw new Error(`Invalid URL: '${text}'`);
+ }
+ }
+ throw new Error("No URL found?");
+}
diff --git a/test/js/web/websocket/websocket-server-echo.mjs b/test/js/web/websocket/websocket-server-echo.mjs
new file mode 100644
index 000000000..7ce1b205b
--- /dev/null
+++ b/test/js/web/websocket/websocket-server-echo.mjs
@@ -0,0 +1,94 @@
+import { createServer } from "node:http";
+import { WebSocketServer } from "ws";
+
+const server = createServer();
+const wss = new WebSocketServer({
+ perMessageDeflate: false,
+ noServer: true,
+});
+
+server.on("listening", () => {
+ const { address, port, family } = server.address();
+ const { href } = new URL(family === "IPv6" ? `ws://[${address}]:${port}` : `ws://${address}:${port}`);
+ console.log(href);
+ console.error("Listening:", href);
+});
+
+server.on("request", (request, response) => {
+ console.error("Received request:", { ...request.headers });
+ response.end();
+});
+
+server.on("clientError", (error, socket) => {
+ console.error("Received client error:", error);
+ socket.end();
+});
+
+server.on("error", error => {
+ console.error("Received error:", error);
+});
+
+server.on("upgrade", (request, socket, head) => {
+ console.error("Received upgrade:", { ...request.headers });
+
+ socket.on("data", data => {
+ console.error("Received bytes:", data);
+ });
+
+ wss.handleUpgrade(request, socket, head, ws => {
+ wss.emit("connection", ws, request);
+ });
+});
+
+wss.on("connection", (ws, request) => {
+ console.error("Received connection:", request.socket.remoteAddress);
+
+ ws.on("message", message => {
+ console.error("Received message:", message);
+ ws.send(message);
+
+ if (message === "ping") {
+ console.error("Sending ping");
+ ws.ping();
+ } else if (message === "pong") {
+ console.error("Sending pong");
+ ws.pong();
+ } else if (message === "close") {
+ console.error("Sending close");
+ ws.close();
+ } else if (message === "terminate") {
+ console.error("Sending terminate");
+ ws.terminate();
+ }
+ });
+
+ ws.on("ping", data => {
+ console.error("Received ping:", data);
+ ws.ping(data);
+ });
+
+ ws.on("pong", data => {
+ console.error("Received pong:", data);
+ ws.pong(data);
+ });
+
+ ws.on("close", (code, reason) => {
+ console.error("Received close:", code, reason);
+ });
+
+ ws.on("error", error => {
+ console.error("Received error:", error);
+ });
+});
+
+server.on("close", () => {
+ console.error("Server closed");
+});
+
+process.on("exit", exitCode => {
+ console.error("Server exited:", exitCode);
+});
+
+const hostname = process.env.HOST || "127.0.0.1";
+const port = parseInt(process.env.PORT) || 0;
+server.listen(port, hostname);