diff options
author | 2023-07-13 09:39:43 -0700 | |
---|---|---|
committer | 2023-07-13 09:39:43 -0700 | |
commit | 9eb8eea2a81be6a20abb62544dc54a35ff4173a5 (patch) | |
tree | 63ab7bc037477b0db836067ac6c49c273d251353 /test | |
parent | 04b4157232006c882cdd693f566b31945618b924 (diff) | |
download | bun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.tar.gz bun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.tar.zst bun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.zip |
Implement `ping()`, `pong()`, `terminate()` for WebSocket client and server (#3257)
Diffstat (limited to 'test')
-rw-r--r-- | test/harness.ts | 6 | ||||
-rw-r--r-- | test/js/bun/websocket/websocket-client-echo.mjs | 70 | ||||
-rw-r--r-- | test/js/bun/websocket/websocket-server.test.ts | 1712 | ||||
-rw-r--r-- | test/js/first_party/ws/ws.test.ts | 284 | ||||
-rw-r--r-- | test/js/web/websocket/websocket-client.test.ts | 280 | ||||
-rw-r--r-- | test/js/web/websocket/websocket-server-echo.mjs | 94 |
6 files changed, 1265 insertions, 1181 deletions
diff --git a/test/harness.ts b/test/harness.ts index b0cedba74..8b850bbfc 100644 --- a/test/harness.ts +++ b/test/harness.ts @@ -1,4 +1,4 @@ -import { gc as bunGC, unsafe } from "bun"; +import { gc as bunGC, unsafe, which } from "bun"; import { heapStats } from "bun:jsc"; import path from "path"; import fs from "fs"; @@ -16,6 +16,10 @@ export function bunExe() { return process.execPath; } +export function nodeExe(): string | null { + return which("node") || null; +} + export function gc(force = true) { bunGC(force); } diff --git a/test/js/bun/websocket/websocket-client-echo.mjs b/test/js/bun/websocket/websocket-client-echo.mjs new file mode 100644 index 000000000..d721f673c --- /dev/null +++ b/test/js/bun/websocket/websocket-client-echo.mjs @@ -0,0 +1,70 @@ +import { WebSocket } from "ws"; + +let url; +try { + url = new URL(process.argv[2]); +} catch { + throw new Error(`Usage: ${process.argv0} websocket-client-echo.mjs <url>`); +} + +const ws = new WebSocket(url, { + perMessageDeflate: false, +}); + +ws.on("open", () => { + console.log("Connected", ws.url); // read by test script + console.error("Connected", ws.url); +}); + +ws.on("message", (data, isBinary) => { + if (isBinary) { + console.error("Received binary message:", data); + } else { + console.error("Received text message:", data); + } + ws.send(data, { binary: isBinary }); + + if (data === "ping") { + console.error("Sending ping"); + ws.ping(); + } else if (data === "pong") { + console.error("Sending pong"); + ws.pong(); + } else if (data === "close") { + console.error("Sending close"); + ws.close(); + } else if (data === "terminate") { + console.error("Sending terminate"); + ws.terminate(); + } +}); + +ws.on("ping", data => { + console.error("Received ping:", data); + ws.ping(data); +}); + +ws.on("pong", data => { + console.error("Received pong:", data); + ws.pong(data); +}); + +ws.on("error", error => { + console.error("Received error:", error); +}); + +ws.on("close", (code, reason, wasClean) => { + if (wasClean === true) { + console.error("Received abrupt close:", code, reason); + } else { + console.error("Received close:", code, reason); + } +}); + +ws.on("redirect", url => { + console.error("Received redirect:", url); +}); + +ws.on("unexpected-response", (_, response) => { + console.error("Received unexpected response:", response.statusCode, { ...response.headers }); +}); diff --git a/test/js/bun/websocket/websocket-server.test.ts b/test/js/bun/websocket/websocket-server.test.ts index 7a79f9f5f..eb9b773a3 100644 --- a/test/js/bun/websocket/websocket-server.test.ts +++ b/test/js/bun/websocket/websocket-server.test.ts @@ -1,1255 +1,607 @@ -import { describe, expect, it } from "bun:test"; -import { gcTick } from "harness"; -import { serve, ServerWebSocket } from "bun"; - -describe("websocket server", () => { - it("send & receive empty messages", done => { - const serverReceived: any[] = []; - const clientReceived: any[] = []; - var clientDone = false; - var serverDone = false; - - let server = Bun.serve({ - websocket: { - open(ws) { - ws.send(""); - ws.send(new ArrayBuffer(0)); - }, - message(ws, data) { - serverReceived.push(data); - - if (serverReceived.length === 2) { - if (serverReceived.find(d => d === "") === undefined) { - done(new Error("expected empty string")); - } - - if (!serverReceived.find(d => d.byteLength === 0)) { - done(new Error("expected empty Buffer")); - } - - serverDone = true; +import { describe, it, expect, afterEach } from "bun:test"; +import type { Server, Subprocess, WebSocketHandler } from "bun"; +import { serve, spawn } from "bun"; +import { bunEnv, bunExe, nodeExe } from "harness"; +import { drainMicrotasks, fullGC } from "bun:jsc"; + +const strings = [ + { + label: "string (ascii)", + message: "ascii", + bytes: [0x61, 0x73, 0x63, 0x69, 0x69], + }, + { + label: "string (latin1)", + message: "latin1-©", + bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9], + }, + { + label: "string (utf-8)", + message: "utf8-😶", + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6], + }, +]; + +const buffers = [ + { + label: "Uint8Array (utf-8)", + message: new TextEncoder().encode("utf8-🙂"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82], + }, + { + label: "ArrayBuffer (utf-8)", + message: new TextEncoder().encode("utf8-🙃").buffer, + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83], + }, + { + label: "Buffer (utf-8)", + message: Buffer.from("utf8-🤩"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9], + }, +]; + +const messages = [...strings, ...buffers]; + +const binaryTypes = [ + { + label: "nodebuffer", + type: Buffer, + }, + { + label: "arraybuffer", + type: ArrayBuffer, + }, + { + label: "uint8array", + type: Uint8Array, + }, +] as const; + +let servers: Server[] = []; +let clients: Subprocess[] = []; + +afterEach(() => { + for (const server of servers) { + server.stop(true); + } + for (const client of clients) { + client.kill(); + } +}); - if (clientDone && serverDone) { - z.close(); - server.stop(true); - done(); - } +describe("Server", () => { + describe("websocket", () => { + test("open", done => ({ + open(ws) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + done(); + }, + })); + test("close", done => ({ + open(ws) { + ws.close(); + }, + close(ws, code, reason) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + expect(code).toBeInteger(); + expect(reason).toBeString(); + done(); + }, + })); + test("message", done => ({ + open(ws) { + ws.send("Hello"); + }, + message(ws, data) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + expect(data).toBeDefined(); + done(); + }, + })); + test("drain", done => ({ + backpressureLimit: 1, + open(ws) { + const data = new Uint8Array(1 * 1024 * 1024); + // send data until backpressure is triggered + for (let i = 0; i < 10; i++) { + if (ws.send(data) < 1) { // backpressure or dropped + break; } - }, - close() {}, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); } }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.onmessage = e => { - clientReceived.push(e.data); - - if (clientReceived.length === 2) { - if (clientReceived.find(d => d === "") === undefined) { - done(new Error("expected empty string")); - } - - if (!clientReceived.find(d => d.byteLength === 0)) { - done(new Error("expected empty Buffer")); + drain(ws) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + done(); + }, + })); + test("ping", done => ({ + open(ws) { + ws.ping(); + }, + ping(ws, data) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + expect(data).toBeInstanceOf(Buffer); + done(); + }, + })); + test("pong", done => ({ + open(ws) { + ws.pong(); + }, + pong(ws, data) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + expect(data).toBeInstanceOf(Buffer); + done(); + }, + })); + test("maxPayloadLength", done => ({ + maxPayloadLength: 4, + open(ws) { + ws.send("Hello!"); + }, + close(_, code) { + expect(code).toBe(1006); + done(); + }, + })); + test("backpressureLimit", done => ({ + backpressureLimit: 1, + open(ws) { + const data = new Uint8Array(1 * 1024 * 1024); + expect(ws.send(data.slice(0, 1))).toBe(1); // sent + let backpressure; + for (let i = 0; i < 10; i++) { + if (ws.send(data) === -1) { + backpressure = true; + break; + } } - - clientDone = true; - if (clientDone && serverDone) { - server.stop(true); - z.close(); - - done(); + if (!backpressure) { + done(new Error("backpressure not triggered")); + return; } - } - }; - z.addEventListener("open", () => { - z.send(""); - z.send(new Buffer(0)); - }); - }); - - it("remoteAddress works", done => { - let server = Bun.serve({ - websocket: { - message() {}, - open(ws) { - try { - expect(ws.remoteAddress).toBe("127.0.0.1"); - done(); - } catch (e) { - done(e); + let dropped; + for (let i = 0; i < 10; i++) { + if (ws.send(data) === 0) { + dropped = true; + break; } - }, - close() {}, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); } - }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.addEventListener("open", () => { - setTimeout(() => z.close(), 0); - }); - z.addEventListener("close", () => { - server.stop(); - }); - }); - it("can do publish()", async done => { - var server = serve({ - port: 0, - websocket: { - // FIXME: update this test to not rely on publishToSelf: true, - publishToSelf: true, - - open(ws) { - ws.subscribe("all"); - }, - message(ws, msg) {}, - close(ws) {}, - }, - fetch(req, server) { - if (server.upgrade(req)) { + if (!dropped) { + done(new Error("message not dropped")); return; } - - return new Response("success"); - }, - }); - - await new Promise<void>((resolve2, reject2) => { - var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); - var clientCounter = 0; - - socket.onmessage = e => { - expect(e.data).toBe("hello"); - resolve2(); - }; - socket.onopen = () => { - queueMicrotask(() => { - server.publish("all", "hello"); - }); - }; - }); - server.stop(true); - done(); - }); - - it("can do publish() with publishToSelf: false", async done => { - var server = serve({ - port: 0, - websocket: { - open(ws) { - ws.subscribe("all"); - ws.publish("all", "hey"); - server.publish("all", "hello"); - }, - message(ws, msg) { - if (new TextDecoder().decode(msg as Uint8Array) !== "hello") { - done(new Error("unexpected message")); + done(); + }, + })); + // FIXME: close() callback is called, but only after timeout? + it.todo("closeOnBackpressureLimit"); + /* + test("closeOnBackpressureLimit", done => ({ + closeOnBackpressureLimit: true, + backpressureLimit: 1, + open(ws) { + const data = new Uint8Array(1 * 1024 * 1024); + // send data until backpressure is triggered + for (let i = 0; i < 10; i++) { + if (ws.send(data) < 1) { + return; } - }, - close(ws) {}, - publishToSelf: false, - }, - fetch(req, server) { - if (server.upgrade(req)) { - return; } - - return new Response("success"); + done(new Error("backpressure not triggered")); }, - }); - - await new Promise<void>((resolve2, reject2) => { - var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); - - socket.onmessage = e => { - expect(e.data).toBe("hello"); - resolve2(); - }; - }); - server.stop(true); - done(); + close(_, code) { + expect(code).toBe(1006); + done(); + }, + })); + */ + it.todo("perMessageDeflate"); }); +}); - for (let method of ["publish", "publishText", "publishBinary"] as const) { - describe(method, () => { - it("in close() should work", async () => { - var count = 0; - var server = serve({ - port: 0, - websocket: { - // FIXME: update this test to not rely on publishToSelf: true, - publishToSelf: true, - - open(ws) { - ws.subscribe("all"); - }, - message(ws, msg) {}, - close(ws) { - (ws[method] as any)("all", method === "publishBinary" ? Buffer.from("bye!") : "bye!"); - count++; - - if (count >= 2) { - server.stop(true); - } - }, - }, - fetch(req, server) { - if (server.upgrade(req)) { - return; - } - - return new Response("success"); - }, - }); - +describe("ServerWebSocket", () => { + test("readyState", done => ({ + open(ws) { + expect(ws.readyState).toBe(WebSocket.OPEN); + ws.close(); + }, + close(ws) { + expect(ws.readyState).toBe(WebSocket.CLOSED); + done(); + }, + })); + test("remoteAddress", done => ({ + open(ws) { + expect(ws.remoteAddress).toMatch(/^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$/); + done(); + }, + })); + describe("binaryType", () => { + test("(default)", done => ({ + open(ws) { + expect(ws.binaryType).toBe("nodebuffer"); + done(); + }, + })); + test("(invalid)", done => ({ + open(ws) { try { - const first = await new Promise<WebSocket>((resolve2, reject2) => { - var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); - socket.onopen = () => resolve2(socket); - }); - - await new Promise<WebSocket>((resolve2, reject2) => { - var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); - socket.onopen = () => { - queueMicrotask(() => first.close()); - }; - socket.onmessage = ev => { - var msg = ev.data; - if (typeof msg !== "string") { - msg = new TextDecoder().decode(msg); - } - - if (msg === "bye!") { - socket.close(0); - resolve2(socket); - } else { - reject2(msg); - } - }; - }); - } finally { - server.stop(true); - } - }); - }); - } - - it("close inside open", async () => { - var resolve: () => void; - var server = serve({ - port: 0, - websocket: { - open(ws) {}, - message(ws, msg) {}, - close() { - resolve(); - server.stop(true); - }, - }, - fetch(req, server) { - if ( - server.upgrade(req, { - data: "hello world", - - // check that headers works - headers: { - "x-a": "text/plain", - }, - }) - ) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); - } - return; + // @ts-expect-error + ws.binaryType = "invalid"; + done(new Error("Expected an error")); + } catch (cause) { + done(); } - - return new Response("noooooo hello world"); }, - }); - - await new Promise<void>((resolve_, reject) => { - resolve = resolve_; - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.close(); - }; - websocket.onmessage = e => {}; - websocket.onerror = e => {}; - }); - }); - - it("headers error doesn't crash", async () => { - await new Promise<void>((resolve, reject) => { - const server = serve({ - port: 0, - websocket: { - open(ws) { - ws.close(); - }, - message(ws, msg) {}, - close() { - resolve(); - server.stop(true); - }, + })); + for (const { label, type } of binaryTypes) { + test(label, done => ({ + open(ws) { + ws.binaryType = label; + expect(ws.binaryType).toBe(label); + ws.send(new Uint8Array(1)); }, - error(err) { - resolve(); - server.stop(true); + message(ws, received) { + expect(received).toBeInstanceOf(type); + ws.ping(); }, - fetch(req, server) { - expect(() => { - if ( - server.upgrade(req, { - data: "hello world", - headers: 1238 as any, - }) - ) { - reject(new Error("should not upgrade")); - return new Response("should not upgrade"); - } - }).toThrow("upgrade options.headers must be a Headers or an object"); - resolve(); - return new Response("success"); + ping(ws, received) { + expect(received).toBeInstanceOf(type); + ws.pong(); }, - }); - - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => websocket.close(); - websocket.onmessage = e => {}; - websocket.onerror = e => {}; - }); + pong(_, received) { + expect(received).toBeInstanceOf(type); + done(); + }, + })); + } }); - it("can do hello world", async () => { - const server = serve({ - port: 0, - websocket: { + describe("send()", () => { + for (const { label, message, bytes } of messages) { + test(label, done => ({ open(ws) { - server.stop(); + ws.send(message); }, - message(ws, msg) { - ws.send("hello world"); - }, - }, - fetch(req, server) { - server.stop(); - if ( - server.upgrade(req, { - data: "hello world", - - // check that headers works - headers: { - "x-a": "text/plain", - }, - }) - ) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); + message(_, received) { + if (typeof received === "string") { + expect(received).toBe(message); + } else { + expect(received).toEqual(Buffer.from(bytes)); } - return; - } - - return new Response("noooooo hello world"); + done(); + }, + })); + } + test( + "(benchmark)", + (done, connect) => { + const maxClients = 10; + const maxMessages = 10_000; + let count = 0; + return { + open(ws) { + if (ws.data.id < maxClients) { + connect(); + } + for (let i = 0; i < maxMessages; i++) { + ws.send(`${i}`, true); + ws.sendText(`${i}`, true); + ws.sendBinary(Buffer.from(`${i}`), true); + } + }, + message() { + if (++count === maxClients * maxMessages * 3) { + done(); + } + }, + }; }, - }); - - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.send("hello world"); - }; - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); + 30_000, + ); }); - - it("fetch() allows a Response object to be returned for an upgraded ServerWebSocket", () => { - const server = serve({ - port: 0, - websocket: { + describe("sendBinary()", () => { + for (const { label, message, bytes } of buffers) { + test(label, done => ({ open(ws) { - server.stop(); + ws.sendBinary(message); }, - message(ws, msg) { - ws.send("hello world"); + message(_, received) { + expect(received).toEqual(Buffer.from(bytes)); + done(); }, - }, - error(err) { - console.error(err); - }, - fetch(req, server) { - server.stop(); - if ( - server.upgrade(req, { - data: "hello world", - - // check that headers works - headers: { - "x-a": "text/plain", - }, - }) - ) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); - } - return new Response("lol!", { - status: 101, - }); - } - - return new Response("noooooo hello world"); - }, - }); - - return new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.send("hello world"); - }; - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); + })); + } }); - - it("fetch() allows a Promise<Response> object to be returned for an upgraded ServerWebSocket", () => { - const server = serve({ - port: 0, - websocket: { - async open(ws) { - server.stop(); + describe("sendText()", () => { + for (const { label, message } of strings) { + test(label, done => ({ + open(ws) { + ws.sendText(message); }, - async message(ws, msg) { - await 1; - ws.send("hello world"); + message(_, received) { + expect(received).toEqual(message); + done(); }, - }, - error(err) { - console.error(err); - }, - async fetch(req, server) { - server.stop(); - await 1; - if ( - server.upgrade(req, { - data: "hello world", - - // check that headers works - headers: { - "x-a": "text/plain", - }, - }) - ) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); - } - return new Response("lol!", { - status: 101, - }); - } - - return new Response("noooooo hello world"); - }, - }); - return new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.send("hello world"); - }; - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); + })); + } }); - it("binaryType works", async () => { - var done = false; - const server = serve({ - port: 0, - websocket: { + describe("subscribe()", () => { + for (const { label, message } of strings) { + test(label, done => ({ open(ws) { - server.stop(); + expect(ws.isSubscribed(message)).toBeFalse(); + ws.subscribe(message); + expect(ws.isSubscribed(message)).toBeTrue(); + ws.unsubscribe(message); + expect(ws.isSubscribed(message)).toBeFalse(); + done(); }, - message(ws, msg) { - // The first message is supposed to be "uint8array" - // Then after uint8array, we switch it to "nodebuffer" - // Then after nodebuffer, we switch it to "arraybuffer" - // and then we're done - switch (ws.binaryType) { - case "nodebuffer": { - for (let badType of [ - 123, - NaN, - Symbol("uint8array"), - "uint16array", - "uint32array", - "float32array", - "float64array", - "garbage", - ]) { - expect(() => { - /* @ts-ignore */ - ws.binaryType = badType; - }).toThrow(); - } - expect(ws.binaryType).toBe("nodebuffer"); - ws.binaryType = "uint8array"; - expect(ws.binaryType).toBe("uint8array"); - expect(msg instanceof Uint8Array).toBe(true); - expect(Buffer.isBuffer(msg)).toBe(true); - break; - } - - case "uint8array": { - expect(ws.binaryType).toBe("uint8array"); - ws.binaryType = "arraybuffer"; - expect(ws.binaryType).toBe("arraybuffer"); - expect(msg instanceof Uint8Array).toBe(true); - expect(Buffer.isBuffer(msg)).toBe(false); - break; - } - - case "arraybuffer": { - expect(ws.binaryType).toBe("arraybuffer"); - expect(msg instanceof ArrayBuffer).toBe(true); - done = true; - break; - } - - default: { - throw new Error("unknown binaryType"); - } + })); + } + }); + describe("publish()", () => { + for (const { label, message, bytes } of messages) { + const topic = typeof message === "string" ? message : label; + test(label, (done, connect) => ({ + async open(ws) { + ws.subscribe(topic); + if (ws.data.id === 0) { + connect(); + } else if (ws.data.id === 1) { + ws.publish(topic, message); } - - ws.send("hello world"); }, - }, - fetch(req, server) { - server.stop(); - if (server.upgrade(req, { data: "hello world" })) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); + message(ws, received) { + if (ws.data.id === 1) { + throw new Error("Expected publish() to not send to self"); } - return; - } - - return new Response("noooooo hello world"); - }, - }); - - const isDone = await new Promise<boolean>((resolve, reject) => { - var counter = 0; - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.send(Buffer.from("hello world")); - }; - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - - if (counter++ > 2) { - websocket.close(); - resolve(done); + if (typeof message === "string") { + expect(received).toBe(message); + } else { + expect(received).toEqual(Buffer.from(bytes)); } - websocket.send(Buffer.from("oaksd")); - } catch (r) { - websocket.close(); - reject(r); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); - expect(isDone).toBe(true); - }); - - it("does not upgrade for non-websocket connections", async () => { - await new Promise<void>(async (resolve, reject) => { - var server = serve({ - port: 0, - websocket: { - open(ws) { - ws.send("hello world"); - }, - message(ws, msg) {}, + done(); }, - fetch(req, server) { - if (server.upgrade(req)) { - reject(new Error("should not upgrade")); + })); + } + }); + describe("publishBinary()", () => { + for (const { label, message, bytes } of buffers) { + test(label, (done, connect) => ({ + async open(ws) { + ws.subscribe(label); + if (ws.data.id === 0) { + connect(); + } else if (ws.data.id === 1) { + ws.publishBinary(label, message); } - - return new Response("success"); }, - }); - - const response = await fetch(`http://${server.hostname}:${server.port}`); - expect(await response.text()).toBe("success"); - resolve(); - server.stop(true); - }); - }); - - it("does not upgrade for non-websocket servers", async () => { - await new Promise<void>(async (resolve, reject) => { - const server = serve({ - port: 0, - fetch(req, server) { - server.stop(); - expect(() => { - server.upgrade(req); - }).toThrow('To enable websocket support, set the "websocket" object in Bun.serve({})'); - return new Response("success"); + message(ws, received) { + if (ws.data.id === 1) { + throw new Error("Expected publish() to not send to self"); + } + expect(received).toEqual(Buffer.from(bytes)); + done(); }, - }); - - const response = await fetch(`http://${server.hostname}:${server.port}`); - expect(await response.text()).toBe("success"); - resolve(); - }); + })); + } }); - - it("async can do hello world", async () => { - const server = serve({ - port: 0, - websocket: { + describe("publishText()", () => { + for (const { label, message } of strings) { + test(label, (done, connect) => ({ async open(ws) { - server.stop(true); - ws.send("hello world"); - }, - message(ws, msg) {}, - }, - async fetch(req, server) { - server.stop(); - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); - }, - }); - - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); - }); - - it("publishText()", async () => { - await new Promise<void>((resolve, reject) => { - var websocket: WebSocket; - var server = serve({ - port: 0, - websocket: { - // FIXME: update this test to not rely on publishToSelf: true, - publishToSelf: true, - - async open(ws) { - // we don't care about the data - // we just want to make sure the DOMJIT call doesn't crash - for (let i = 0; i < 40_000; i++) ws.publishText("hello", "world"); - websocket.close(); - server.stop(true); - resolve(); - }, - message(ws, msg) {}, + ws.subscribe(label); + if (ws.data.id === 0) { + connect(); + } else if (ws.data.id === 1) { + ws.publishText(label, message); + } }, - async fetch(req, server) { - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); + message(ws, received) { + if (ws.data.id === 1) { + throw new Error("Expected publish() to not send to self"); + } + expect(received).toEqual(message); + done(); }, - }); - - websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - }); + })); + } }); - - it("publishBinary()", async () => { - const bytes = Buffer.from("hello"); - - await new Promise<void>((resolve, reject) => { - var websocket: WebSocket; - var server = serve({ - port: 0, - websocket: { - // FIXME: update this test to not rely on publishToSelf: true, - publishToSelf: true, - - async open(ws) { - // we don't care about the data - // we just want to make sure the DOMJIT call doesn't crash - for (let i = 0; i < 40_000; i++) ws.publishBinary("hello", bytes); - websocket.close(); - server.stop(true); - resolve(); - }, - message(ws, msg) {}, - }, - async fetch(req, server) { - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); + describe("publish() with { publishToSelf: true }", () => { + for (const { label, message, bytes } of messages) { + const topic = typeof message === "string" ? message : label; + test(label, done => ({ + publishToSelf: true, + async open(ws) { + ws.subscribe(topic); + ws.publish(topic, message); + }, + message(_, received) { + if (typeof message === "string") { + expect(received).toBe(message); + } else { + expect(received).toEqual(Buffer.from(bytes)); + } + done(); }, - }); - - websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - }); + })); + } }); - - it("sendText()", async () => { - await new Promise<void>((resolve, reject) => { - var websocket: WebSocket; - var server = serve({ - port: 0, - websocket: { - async open(ws) { - // we don't care about the data - // we just want to make sure the DOMJIT call doesn't crash - for (let i = 0; i < 40_000; i++) ws.sendText("hello world", true); - resolve(); - websocket.close(); - server.stop(true); - }, - message(ws, msg) {}, + describe("ping()", () => { + test("(no argument)", done => ({ + open(ws) { + ws.ping(); + }, + ping(_, received) { + expect(received).toBeEmpty(); + done(); + }, + })); + for (const { label, message, bytes } of messages) { + test(label, done => ({ + open(ws) { + ws.ping(message); }, - async fetch(req, server) { - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); + ping(_, received) { + expect(received).toEqual(Buffer.from(bytes)); + done(); }, - }); - websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - }); + })); + } }); - - it("sendBinary()", async () => { - const bytes = Buffer.from("hello"); - await new Promise<void>((resolve, reject) => { - var websocket: WebSocket; - var server = serve({ - port: 0, - websocket: { - async open(ws) { - // we don't care about the data - // we just want to make sure the DOMJIT call doesn't crash - for (let i = 0; i < 40_000; i++) ws.sendBinary(bytes, true); - websocket.close(); - server.stop(true); - resolve(); - }, - message(ws, msg) {}, + describe("pong()", () => { + test("(no argument)", done => ({ + open(ws) { + ws.pong(); + }, + pong(_, received) { + expect(received).toBeEmpty(); + done(); + }, + })); + for (const { label, message, bytes } of messages) { + test(label, done => ({ + open(ws) { + ws.pong(message); }, - async fetch(req, server) { - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); + pong(_, received) { + expect(received).toEqual(Buffer.from(bytes)); + done(); }, - }); - - websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - }); + })); + } }); - - it("can do hello world corked", async () => { - const server = serve({ - port: 0, - websocket: { - open(ws) { - server.stop(); - ws.send("hello world"); - }, - message(ws, msg) { + test("cork()", done => { + let count = 0; + return { + open(ws) { + setTimeout(() => { ws.cork(() => { - ws.send("hello world"); + ws.send("1"); + ws.sendText("2"); + ws.sendBinary(new TextEncoder().encode("3")); }); - }, - }, - fetch(req, server) { - server.stop(); - if (server.upgrade(req)) return; - return new Response("noooooo hello world"); + }, 5); }, - }); - - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); + message(_, message) { + if (typeof message === "string") { + expect(+message).toBe(++count); + } else { + expect(+new TextDecoder().decode(message)).toBe(++count); } - }; - websocket.onerror = e => { - reject(e); - }; - }); - server.stop(true); - }); - - it("can do some back and forth", async () => { - var dataCount = 0; - const server = serve({ - port: 0, - websocket: { - open(ws) { - server.stop(); - }, - message(ws, msg) { - if (msg === "first") { - ws.send("first"); - return; - } - ws.send(`counter: ${dataCount++}`); - }, - }, - fetch(req, server) { - server.stop(); - if ( - server.upgrade(req, { - data: { count: 0 }, - }) - ) - return new Response("noooooo hello world"); - }, - }); - - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onerror = e => { - reject(e); - }; - - var counter = 0; - websocket.onopen = () => websocket.send("first"); - websocket.onmessage = e => { - try { - switch (counter++) { - case 0: { - expect(e.data).toBe("first"); - websocket.send("where are the loops"); - break; - } - case 1: { - expect(e.data).toBe("counter: 0"); - websocket.send("br0ther may i have some loops"); - break; - } - case 2: { - expect(e.data).toBe("counter: 1"); - websocket.send("br0ther may i have some loops"); - break; - } - case 3: { - expect(e.data).toBe("counter: 2"); - resolve(); - break; - } - } - } catch (r) { - reject(r); - websocket.close(); + if (count === 3) { + done(); } - }; - }); - server.stop(true); - }); - - it("send rope strings", async () => { - var ropey = "hello world".repeat(10); - var sendQueue: any[] = []; - for (var i = 0; i < 20; i++) { - sendQueue.push(ropey + " " + i); - } - - var serverCounter = 0; - var clientCounter = 0; - - const server = serve({ - port: 0, - websocket: { - open(ws) {}, - message(ws, msg) { - ws.send(sendQueue[serverCounter++] + " "); - serverCounter % 10 === 0 && gcTick(); - }, - }, - fetch(req, server) { - if ( - server.upgrade(req, { - data: { count: 0 }, - }) - ) - return; - - return new Response("noooooo hello world"); }, - }); - try { - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onerror = e => { - reject(e); - }; - - websocket.onopen = () => { - server.stop(); - websocket.send("first"); - }; - - websocket.onmessage = e => { - try { - const expected = sendQueue[clientCounter++] + " "; - expect(e.data).toBe(expected); - websocket.send("next"); - if (clientCounter === sendQueue.length) { - websocket.close(); - resolve(); - } - } catch (r) { - reject(r); - console.error(r); - websocket.close(); - } - }; - }); - } catch (e) { - throw e; - } finally { - server.stop(true); - } + }; }); - - // this test sends 50 messages to 10 connected clients via pubsub - it("pub/sub", async () => { - var ropey = "hello world".repeat(10); - var sendQueue: any[] = []; - for (var i = 0; i < 50; i++) { - sendQueue.push(ropey + " " + i); - } - - var serverCounter = 0; - var clientCount = 0; - const server = serve({ - port: 0, - websocket: { + describe("close()", () => { + test("(no arguments)", done => ({ + open(ws) { + ws.close(); + }, + close(_, code, reason) { + expect(code).toBe(1000); + expect(reason).toBeEmpty(); + done(); + }, + })); + test("(no reason)", done => ({ + open(ws) { + ws.close(1001); + }, + close(_, code, reason) { + expect(code).toBe(1001); + expect(reason).toBeEmpty(); + done(); + }, + })); + for (const { label, message } of strings) { + test(label, done => ({ open(ws) { - ws.subscribe("test"); - if (!ws.isSubscribed("test")) { - throw new Error("not subscribed"); - } - ws.unsubscribe("test"); - if (ws.isSubscribed("test")) { - throw new Error("subscribed"); - } - ws.subscribe("test"); - clientCount++; - if (clientCount === 10) { - setTimeout(() => server.publish("test", "hello world"), 1); - } + ws.close(1002, message); }, - message(ws, msg) { - if (serverCounter < sendQueue.length) server.publish("test", sendQueue[serverCounter++] + " "); + close(_, code, reason) { + expect(code).toBe(1002); + expect(reason).toBe(message); + done(); }, - }, - fetch(req) { - if ( - server.upgrade(req, { - data: { count: 0 }, - }) - ) - return; - return new Response("noooooo hello world"); - }, - }); - try { - const connections = new Array(10); - const websockets = new Array(connections.length); - var doneCounter = 0; - await new Promise<void>(done => { - for (var i = 0; i < connections.length; i++) { - var j = i; - var resolve: (_?: unknown) => void, - reject: (_?: unknown) => void, - resolveConnection: (_?: unknown) => void, - rejectConnection: (_?: unknown) => void; - connections[j] = new Promise((res, rej) => { - resolveConnection = res; - rejectConnection = rej; - }); - websockets[j] = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onerror = e => { - reject(e); - }; - websocket.onclose = () => { - doneCounter++; - if (doneCounter === connections.length) { - done(); - } - }; - var hasOpened = false; - websocket.onopen = () => { - if (!hasOpened) { - hasOpened = true; - resolve(websocket); - } - }; - - let clientCounter = -1; - var hasSentThisTick = false; - - websocket.onmessage = e => { - if (!hasOpened) { - hasOpened = true; - resolve(websocket); - } - - if (e.data === "hello world") { - clientCounter = 0; - websocket.send("first"); - return; - } - - try { - expect(!!sendQueue.find(a => a + " " === e.data)).toBe(true); - - if (!hasSentThisTick) { - websocket.send("second"); - hasSentThisTick = true; - queueMicrotask(() => { - hasSentThisTick = false; - }); - } - - if (clientCounter++ === sendQueue.length - 1) { - websocket.close(); - resolveConnection(); - } - } catch (r) { - console.error(r); - websocket.close(); - rejectConnection(r); - } - }; - } - }); - } catch (e) { - throw e; - } finally { - server.stop(true); - gcTick(); + })); } - - expect(serverCounter).toBe(sendQueue.length); - }, 30_000); - it("can close with reason and code #2631", done => { - let timeout: any; - let server = Bun.serve({ - websocket: { - message(ws) { - ws.close(2000, "test"); - }, - open(ws) { - try { - expect(ws.remoteAddress).toBe("127.0.0.1"); - } catch (e) { - clearTimeout(timeout); - done(e); - } - }, - close(ws, code, reason) { - try { - expect(code).toBe(2000); - expect(reason).toBe("test"); - clearTimeout(timeout); - done(); - } catch (e) { - clearTimeout(timeout); - done(e); - } - }, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); - } - }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.addEventListener("open", () => { - z.send("test"); - }); - z.addEventListener("close", () => { - server.stop(); - }); - - timeout = setTimeout(() => { - done(new Error("Did not close in time")); - server.stop(true); - }, 1000); }); + test("terminate()", done => ({ + open(ws) { + ws.terminate(); + }, + close(_, code, reason) { + expect(code).toBe(1006); + expect(reason).toBeEmpty(); + done(); + }, + })); +}); - it("can close with code and without reason #2631", done => { - let timeout: any; - let server = Bun.serve({ - websocket: { - message(ws) { - ws.close(2000); - }, - open(ws) { - try { - expect(ws.remoteAddress).toBe("127.0.0.1"); - } catch (e) { - done(e); - clearTimeout(timeout); - } - }, - close(ws, code, reason) { - clearTimeout(timeout); - - try { - expect(code).toBe(2000); - expect(reason).toBe(""); - done(); - } catch (e) { - done(e); - } - }, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); +function test( + label: string, + fn: (done: (err?: unknown) => void, connect: () => Promise<void>) => Partial<WebSocketHandler<{ id: number }>>, + timeout?: number, +) { + it( + label, + testDone => { + let isDone = false; + const done = (err?: unknown) => { + if (!isDone) { + isDone = true; + server.stop(); + testDone(err); } - }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.addEventListener("open", () => { - z.send("test"); - }); - z.addEventListener("close", () => { - server.stop(); - }); - - timeout = setTimeout(() => { - done(new Error("Did not close in time")); - server.stop(true); - }, 1000); - }); - it("can close without reason or code #2631", done => { - let timeout: any; - let server = Bun.serve({ - websocket: { - message(ws) { - ws.close(); - }, - open(ws) { - try { - expect(ws.remoteAddress).toBe("127.0.0.1"); - } catch (e) { - clearTimeout(timeout); - done(e); + }; + let id = 0; + const server: Server = serve({ + port: 0, + fetch(request, server) { + const data = { id: id++ }; + if (server.upgrade(request, { data })) { + return; } + return new Response(); }, - close(ws, code, reason) { - clearTimeout(timeout); - try { - expect(code).toBe(1006); - expect(reason).toBe(""); - done(); - } catch (e) { - done(e); - } + websocket: { + sendPings: false, + message() {}, + ...fn(done, () => connect(server)), }, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); - } - }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.addEventListener("open", () => { - z.send("test"); - }); - z.addEventListener("close", () => { - server.stop(); - }); - - timeout = setTimeout(() => { - done(new Error("Did not close in time")); - server.stop(true); - }, 1000); + }); + servers.push(server); + connect(server); + }, + { timeout: timeout ?? 1000 }, + ); +} + +async function connect(server: Server): Promise<void> { + const url = new URL(`ws://${server.hostname}:${server.port}/`); + const { pathname } = new URL("./websocket-client-echo.mjs", import.meta.url); + // @ts-ignore + const client = spawn({ + cmd: [nodeExe() ?? bunExe(), pathname, url], + cwd: import.meta.dir, + env: bunEnv, + stderr: "ignore", + stdout: "pipe", }); -}); + clients.push(client); + for await (const chunk of client.stdout) { + if (new TextDecoder().decode(chunk).includes("Connected")) { + return; + } + } +} diff --git a/test/js/first_party/ws/ws.test.ts b/test/js/first_party/ws/ws.test.ts new file mode 100644 index 000000000..c0b56a200 --- /dev/null +++ b/test/js/first_party/ws/ws.test.ts @@ -0,0 +1,284 @@ +import { describe, it, expect, beforeEach, afterEach } from "bun:test"; +import type { Subprocess } from "bun"; +import { spawn } from "bun"; +import { bunEnv, bunExe, nodeExe } from "harness"; +import { WebSocket } from "ws"; + +const strings = [ + { + label: "string (ascii)", + message: "ascii", + bytes: [0x61, 0x73, 0x63, 0x69, 0x69], + }, + { + label: "string (latin1)", + message: "latin1-©", + bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9], + }, + { + label: "string (utf-8)", + message: "utf8-😶", + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6], + }, +]; + +const buffers = [ + { + label: "Uint8Array (utf-8)", + message: new TextEncoder().encode("utf8-🙂"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82], + }, + { + label: "ArrayBuffer (utf-8)", + message: new TextEncoder().encode("utf8-🙃").buffer, + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83], + }, + { + label: "Buffer (utf-8)", + message: Buffer.from("utf8-🤩"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9], + }, +]; + +const messages = [...strings, ...buffers]; + +const binaryTypes = [ + { + label: "nodebuffer", + type: Buffer, + }, + { + label: "arraybuffer", + type: ArrayBuffer, + }, +] as const; + +let servers: Subprocess[] = []; +let clients: WebSocket[] = []; + +function cleanUp() { + for (const client of clients) { + client.terminate(); + } + for (const server of servers) { + server.kill(); + } +} + +beforeEach(cleanUp); +afterEach(cleanUp); + +describe("WebSocket", () => { + test("url", (ws, done) => { + expect(ws.url).toStartWith("ws://"); + done(); + }); + test("readyState", (ws, done) => { + expect(ws.readyState).toBe(WebSocket.CONNECTING); + ws.on("open", () => { + expect(ws.readyState).toBe(WebSocket.OPEN); + ws.close(); + }); + ws.on("close", () => { + expect(ws.readyState).toBe(WebSocket.CLOSED); + done(); + }); + }); + describe("binaryType", () => { + test("(default)", (ws, done) => { + expect(ws.binaryType).toBe("nodebuffer"); + done(); + }); + test("(invalid)", (ws, done) => { + try { + // @ts-expect-error + ws.binaryType = "invalid"; + done(new Error("Expected an error")); + } catch { + done(); + } + }); + for (const { label, type } of binaryTypes) { + test(label, (ws, done) => { + ws.binaryType = label; + ws.on("open", () => { + expect(ws.binaryType).toBe(label); + ws.send(new Uint8Array(1)); + }); + ws.on("message", (data, isBinary) => { + expect(data).toBeInstanceOf(type); + expect(isBinary).toBeTrue(); + ws.ping(); + }); + ws.on("ping", (data) => { + expect(data).toBeInstanceOf(type); + ws.pong(); + }); + ws.on("pong", (data) => { + expect(data).toBeInstanceOf(type); + done(); + }); + }); + } + }); + describe("send()", () => { + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.on("open", () => { + ws.send(message); + }); + ws.on("message", (data, isBinary) => { + if (typeof data === "string") { + expect(data).toBe(message); + expect(isBinary).toBeFalse(); + } else { + expect(data).toEqual(Buffer.from(bytes)); + expect(isBinary).toBeTrue(); + } + done(); + }); + }); + } + }); + describe("ping()", () => { + test("(no argument)", (ws, done) => { + ws.on("open", () => { + ws.ping(); + }); + ws.on("ping", (data) => { + expect(data).toBeInstanceOf(Buffer); + done(); + }); + }); + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.on("open", () => { + ws.ping(message); + }); + ws.on("ping", (data) => { + expect(data).toEqual(Buffer.from(bytes)); + done(); + }); + }); + } + }); + describe("pong()", () => { + test("(no argument)", (ws, done) => { + ws.on("open", () => { + ws.pong(); + }); + ws.on("pong", (data) => { + expect(data).toBeInstanceOf(Buffer); + done(); + }); + }); + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.on("open", () => { + ws.pong(message); + }); + ws.on("pong", (data) => { + expect(data).toEqual(Buffer.from(bytes)); + done(); + }); + }); + } + }); + describe("close()", () => { + test("(no arguments)", (ws, done) => { + ws.on("open", () => { + ws.close(); + }); + ws.on("close", (code: number, reason: string, wasClean: boolean) => { + expect(code).toBe(1000); + expect(reason).toBeString(); + expect(wasClean).toBeTrue(); + done(); + }); + }); + test("(no reason)", (ws, done) => { + ws.on("open", () => { + ws.close(1001); + }); + ws.on("close", (code: number, reason: string, wasClean: boolean) => { + expect(code).toBe(1001); + expect(reason).toBeString(); + expect(wasClean).toBeTrue(); + done(); + }); + }); + // FIXME: Encoding issue + // Expected: "latin1-©" + // Received: "latin1-©" + /* + for (const { label, message } of strings) { + test(label, (ws, done) => { + ws.on("open", () => { + ws.close(1002, message); + }); + ws.on("close", (code, reason, wasClean) => { + expect(code).toBe(1002); + expect(reason).toBe(message); + expect(wasClean).toBeTrue(); + done(); + }); + }); + } + */ + }); + test("terminate()", (ws, done) => { + ws.on("open", () => { + ws.terminate(); + }); + ws.on("close", (code: number, reason: string, wasClean: boolean) => { + expect(code).toBe(1006); + expect(reason).toBeString(); + expect(wasClean).toBeFalse(); + done(); + }); + }); +}); + +function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void) => void, timeout?: number) { + it( + label, + testDone => { + let isDone = false; + const done = (err?: unknown) => { + if (!isDone) { + isDone = true; + testDone(err); + } + }; + listen() + .then(url => { + const ws = new WebSocket(url); + clients.push(ws); + fn(ws, done); + }) + .catch(done); + }, + { timeout: timeout ?? 1000 }, + ); +} + +async function listen(): Promise<URL> { + const { pathname } = new URL("../../web/websocket/websocket-server-echo.mjs", import.meta.url); + const server = spawn({ + cmd: [nodeExe() ?? bunExe(), pathname], + cwd: import.meta.dir, + env: bunEnv, + stderr: "ignore", + stdout: "pipe", + }); + servers.push(server); + for await (const chunk of server.stdout) { + const text = new TextDecoder().decode(chunk); + try { + return new URL(text); + } catch { + throw new Error(`Invalid URL: '${text}'`); + } + } + throw new Error("No URL found?"); +} diff --git a/test/js/web/websocket/websocket-client.test.ts b/test/js/web/websocket/websocket-client.test.ts new file mode 100644 index 000000000..470fa71c0 --- /dev/null +++ b/test/js/web/websocket/websocket-client.test.ts @@ -0,0 +1,280 @@ +import { describe, it, expect, beforeEach, afterEach } from "bun:test"; +import type { Subprocess } from "bun"; +import { spawn } from "bun"; +import { bunEnv, bunExe, nodeExe } from "harness"; + +const strings = [ + { + label: "string (ascii)", + message: "ascii", + bytes: [0x61, 0x73, 0x63, 0x69, 0x69], + }, + { + label: "string (latin1)", + message: "latin1-©", + bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9], + }, + { + label: "string (utf-8)", + message: "utf8-😶", + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6], + }, +]; + +const buffers = [ + { + label: "Uint8Array (utf-8)", + message: new TextEncoder().encode("utf8-🙂"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82], + }, + { + label: "ArrayBuffer (utf-8)", + message: new TextEncoder().encode("utf8-🙃").buffer, + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83], + }, + { + label: "Buffer (utf-8)", + message: Buffer.from("utf8-🤩"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9], + }, +]; + +const messages = [...strings, ...buffers]; + +const binaryTypes = [ + { + label: "nodebuffer", + type: Buffer, + }, + { + label: "arraybuffer", + type: ArrayBuffer, + }, +] as const; + +let servers: Subprocess[] = []; +let clients: WebSocket[] = []; + +function cleanUp() { + for (const client of clients) { + client.terminate(); + } + for (const server of servers) { + server.kill(); + } +} + +beforeEach(cleanUp); +afterEach(cleanUp); + +describe("WebSocket", () => { + test("url", (ws, done) => { + expect(ws.url).toStartWith("ws://"); + done(); + }); + test("readyState", (ws, done) => { + expect(ws.readyState).toBe(WebSocket.CONNECTING); + ws.addEventListener("open", () => { + expect(ws.readyState).toBe(WebSocket.OPEN); + ws.close(); + }); + ws.addEventListener("close", () => { + expect(ws.readyState).toBe(WebSocket.CLOSED); + done(); + }); + }); + describe("binaryType", () => { + test("(default)", (ws, done) => { + expect(ws.binaryType).toBe("nodebuffer"); + done(); + }); + test("(invalid)", (ws, done) => { + try { + // @ts-expect-error + ws.binaryType = "invalid"; + done(new Error("Expected an error")); + } catch { + done(); + } + }); + for (const { label, type } of binaryTypes) { + test(label, (ws, done) => { + ws.binaryType = label; + ws.addEventListener("open", () => { + expect(ws.binaryType).toBe(label); + ws.send(new Uint8Array(1)); + }); + ws.addEventListener("message", ({ data }) => { + expect(data).toBeInstanceOf(type); + ws.ping(); + }); + ws.addEventListener("ping", ({ data }) => { + expect(data).toBeInstanceOf(type); + ws.pong(); + }); + ws.addEventListener("pong", ({ data }) => { + expect(data).toBeInstanceOf(type); + done(); + }); + }); + } + }); + describe("send()", () => { + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.addEventListener("open", () => { + ws.send(message); + }); + ws.addEventListener("message", ({ data }) => { + if (typeof data === "string") { + expect(data).toBe(message); + } else { + expect(data).toEqual(Buffer.from(bytes)); + } + done(); + }); + }); + } + }); + describe("ping()", () => { + test("(no argument)", (ws, done) => { + ws.addEventListener("open", () => { + ws.ping(); + }); + ws.addEventListener("ping", ({ data }) => { + expect(data).toBeInstanceOf(Buffer); + done(); + }); + }); + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.addEventListener("open", () => { + ws.ping(message); + }); + ws.addEventListener("ping", ({ data }) => { + expect(data).toEqual(Buffer.from(bytes)); + done(); + }); + }); + } + }); + describe("pong()", () => { + test("(no argument)", (ws, done) => { + ws.addEventListener("open", () => { + ws.pong(); + }); + ws.addEventListener("pong", ({ data }) => { + expect(data).toBeInstanceOf(Buffer); + done(); + }); + }); + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.addEventListener("open", () => { + ws.pong(message); + }); + ws.addEventListener("pong", ({ data }) => { + expect(data).toEqual(Buffer.from(bytes)); + done(); + }); + }); + } + }); + describe("close()", () => { + test("(no arguments)", (ws, done) => { + ws.addEventListener("open", () => { + ws.close(); + }); + ws.addEventListener("close", ({ code, reason, wasClean }) => { + expect(code).toBe(1000); + expect(reason).toBeString(); + expect(wasClean).toBeTrue(); + done(); + }); + }); + test("(no reason)", (ws, done) => { + ws.addEventListener("open", () => { + ws.close(1001); + }); + ws.addEventListener("close", ({ code, reason, wasClean }) => { + expect(code).toBe(1001); + expect(reason).toBeString(); + expect(wasClean).toBeTrue(); + done(); + }); + }); + // FIXME: Encoding issue + // Expected: "latin1-©" + // Received: "latin1-©" + /* + for (const { label, message } of strings) { + test(label, (ws, done) => { + ws.addEventListener("open", () => { + ws.close(1002, message); + }); + ws.addEventListener("close", ({ code, reason, wasClean }) => { + expect(code).toBe(1002); + expect(reason).toBe(message); + expect(wasClean).toBeTrue(); + done(); + }); + }); + } + */ + }); + test("terminate()", (ws, done) => { + ws.addEventListener("open", () => { + ws.terminate(); + }); + ws.addEventListener("close", ({ code, reason, wasClean }) => { + expect(code).toBe(1006); + expect(reason).toBeString(); + expect(wasClean).toBeFalse(); + done(); + }); + }); +}); + +function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void) => void, timeout?: number) { + it( + label, + testDone => { + let isDone = false; + const done = (err?: unknown) => { + if (!isDone) { + isDone = true; + testDone(err); + } + }; + listen() + .then(url => { + const ws = new WebSocket(url); + clients.push(ws); + fn(ws, done); + }) + .catch(done); + }, + { timeout: timeout ?? 1000 }, + ); +} + +async function listen(): Promise<URL> { + const { pathname } = new URL("./websocket-server-echo.mjs", import.meta.url); + const server = spawn({ + cmd: [nodeExe() ?? bunExe(), pathname], + cwd: import.meta.dir, + env: bunEnv, + stderr: "ignore", + stdout: "pipe", + }); + servers.push(server); + for await (const chunk of server.stdout) { + const text = new TextDecoder().decode(chunk); + try { + return new URL(text); + } catch { + throw new Error(`Invalid URL: '${text}'`); + } + } + throw new Error("No URL found?"); +} diff --git a/test/js/web/websocket/websocket-server-echo.mjs b/test/js/web/websocket/websocket-server-echo.mjs new file mode 100644 index 000000000..7ce1b205b --- /dev/null +++ b/test/js/web/websocket/websocket-server-echo.mjs @@ -0,0 +1,94 @@ +import { createServer } from "node:http"; +import { WebSocketServer } from "ws"; + +const server = createServer(); +const wss = new WebSocketServer({ + perMessageDeflate: false, + noServer: true, +}); + +server.on("listening", () => { + const { address, port, family } = server.address(); + const { href } = new URL(family === "IPv6" ? `ws://[${address}]:${port}` : `ws://${address}:${port}`); + console.log(href); + console.error("Listening:", href); +}); + +server.on("request", (request, response) => { + console.error("Received request:", { ...request.headers }); + response.end(); +}); + +server.on("clientError", (error, socket) => { + console.error("Received client error:", error); + socket.end(); +}); + +server.on("error", error => { + console.error("Received error:", error); +}); + +server.on("upgrade", (request, socket, head) => { + console.error("Received upgrade:", { ...request.headers }); + + socket.on("data", data => { + console.error("Received bytes:", data); + }); + + wss.handleUpgrade(request, socket, head, ws => { + wss.emit("connection", ws, request); + }); +}); + +wss.on("connection", (ws, request) => { + console.error("Received connection:", request.socket.remoteAddress); + + ws.on("message", message => { + console.error("Received message:", message); + ws.send(message); + + if (message === "ping") { + console.error("Sending ping"); + ws.ping(); + } else if (message === "pong") { + console.error("Sending pong"); + ws.pong(); + } else if (message === "close") { + console.error("Sending close"); + ws.close(); + } else if (message === "terminate") { + console.error("Sending terminate"); + ws.terminate(); + } + }); + + ws.on("ping", data => { + console.error("Received ping:", data); + ws.ping(data); + }); + + ws.on("pong", data => { + console.error("Received pong:", data); + ws.pong(data); + }); + + ws.on("close", (code, reason) => { + console.error("Received close:", code, reason); + }); + + ws.on("error", error => { + console.error("Received error:", error); + }); +}); + +server.on("close", () => { + console.error("Server closed"); +}); + +process.on("exit", exitCode => { + console.error("Server exited:", exitCode); +}); + +const hostname = process.env.HOST || "127.0.0.1"; +const port = parseInt(process.env.PORT) || 0; +server.listen(port, hostname); |