diff options
Diffstat (limited to 'test/bun.js/websocket-server.test.ts')
-rw-r--r-- | test/bun.js/websocket-server.test.ts | 296 |
1 files changed, 293 insertions, 3 deletions
diff --git a/test/bun.js/websocket-server.test.ts b/test/bun.js/websocket-server.test.ts index 086529a95..0d188fcce 100644 --- a/test/bun.js/websocket-server.test.ts +++ b/test/bun.js/websocket-server.test.ts @@ -1,6 +1,7 @@ -import { file, gc, serve } from "bun"; +import { file, serve } from "bun"; import { afterEach, describe, it, expect } from "bun:test"; -import { readFileSync } from "fs"; +import { readFileSync, writeSync } from "fs"; +import { gcTick } from "gc"; import { resolve } from "path"; var port = 4321; @@ -18,11 +19,169 @@ describe("websocket server", () => { port: getPort(), websocket: { open(ws) { - ws.send("hello world"); + ws.send(ws.data); }, message(ws, msg) {}, }, fetch(req, server) { + if (server.upgrade(req, { data: "hello world" })) { + if (server.upgrade(req)) { + throw new Error("should not upgrade twice"); + } + return; + } + + return new Response("noooooo hello world"); + }, + }); + + await new Promise((resolve, reject) => { + const websocket = new WebSocket(`ws://localhost:${server.port}`); + + websocket.onmessage = (e) => { + try { + expect(e.data).toBe("hello world"); + resolve(); + } catch (r) { + reject(r); + return; + } finally { + server?.stop(); + websocket.close(); + } + }; + websocket.onerror = (e) => { + reject(e); + }; + }); + }); + + it("binaryType works", async () => { + var done = false; + var server = serve({ + port: getPort(), + websocket: { + open(ws) { + ws.send(ws.data); + }, + message(ws, msg) { + if (ws.binaryType === "uint8array") { + expect(ws.binaryType).toBe("uint8array"); + ws.binaryType = "arraybuffer"; + expect(ws.binaryType).toBe("arraybuffer"); + expect(msg instanceof Uint8Array).toBe(true); + } else { + expect(ws.binaryType).toBe("arraybuffer"); + expect(msg instanceof ArrayBuffer).toBe(true); + done = true; + } + + ws.send("hello world"); + }, + }, + fetch(req, server) { + if (server.upgrade(req, { data: "hello world" })) { + if (server.upgrade(req)) { + throw new Error("should not upgrade twice"); + } + return; + } + + return new Response("noooooo hello world"); + }, + }); + + await new Promise((resolve, reject) => { + var counter = 0; + const websocket = new WebSocket(`ws://localhost:${server.port}`); + websocket.onopen = () => { + websocket.send(Buffer.from("hello world")); + }; + websocket.onmessage = (e) => { + try { + expect(e.data).toBe("hello world"); + + if (counter++ > 0) { + server?.stop(); + websocket.close(); + resolve(done); + } + websocket.send(Buffer.from("oaksd")); + } catch (r) { + server?.stop(); + websocket.close(); + reject(r); + return; + } finally { + } + }; + websocket.onerror = (e) => { + reject(e); + }; + }); + }); + + it("does not upgrade for non-websocket connections", async () => { + await new Promise(async (resolve, reject) => { + var server = serve({ + port: getPort(), + websocket: { + open(ws) { + ws.send("hello world"); + }, + message(ws, msg) {}, + }, + fetch(req, server) { + if (server.upgrade(req)) { + reject("should not upgrade"); + } + + return new Response("success"); + }, + }); + + const response = await fetch(`http://localhost:${server.port}`); + expect(await response.text()).toBe("success"); + resolve(); + server.stop(); + }); + }); + + it("does not upgrade for non-websocket servers", async () => { + await new Promise(async (resolve, reject) => { + var server = serve({ + port: getPort(), + + fetch(req, server) { + try { + server.upgrade(req); + reject("should not upgrade"); + } catch (e) { + resolve(); + } + + return new Response("success"); + }, + }); + + const response = await fetch(`http://localhost:${server.port}`); + expect(await response.text()).toBe("success"); + resolve(); + server.stop(); + }); + }); + + it("async can do hello world", async () => { + var server = serve({ + port: getPort(), + websocket: { + async open(ws) { + ws.send("hello world"); + }, + message(ws, msg) {}, + }, + async fetch(req, server) { + await 1; if (server.upgrade(req)) return; return new Response("noooooo hello world"); @@ -176,6 +335,7 @@ describe("websocket server", () => { open(ws) {}, message(ws, msg) { ws.send(sendQueue[serverCounter++] + " "); + gcTick(); }, }, fetch(req, server) { @@ -220,4 +380,134 @@ describe("websocket server", () => { server?.stop(); }); + + // this test sends 100 messages to 10 connected clients via pubsub + it("pub/sub", async () => { + var ropey = "hello world".repeat(10); + var sendQueue = []; + for (var i = 0; i < 100; i++) { + sendQueue.push(ropey + " " + i); + gcTick(); + } + var serverCounter = 0; + var clientCount = 0; + var server = serve({ + port: getPort(), + websocket: { + open(ws) { + ws.subscribe("test"); + gcTick(); + if (!ws.isSubscribed("test")) { + throw new Error("not subscribed"); + } + ws.unsubscribe("test"); + if (ws.isSubscribed("test")) { + throw new Error("subscribed"); + } + ws.subscribe("test"); + clientCount++; + if (clientCount === 10) + setTimeout(() => ws.publish("test", "hello world"), 1); + }, + message(ws, msg) { + if (serverCounter < sendQueue.length) + ws.publish("test", sendQueue[serverCounter++] + " "); + }, + }, + fetch(req, server) { + gcTick(); + + if ( + server.upgrade(req, { + data: { count: 0 }, + }) + ) + return; + return new Response("noooooo hello world"); + }, + }); + + const connections = new Array(10); + const websockets = new Array(connections.length); + var doneCounter = 0; + await new Promise((done) => { + for (var i = 0; i < connections.length; i++) { + var j = i; + var resolve, reject, resolveConnection, rejectConnection; + connections[j] = new Promise((res, rej) => { + resolveConnection = res; + rejectConnection = rej; + }); + websockets[j] = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + gcTick(); + const websocket = new WebSocket(`ws://localhost:${server.port}`); + websocket.onerror = (e) => { + reject(e); + }; + websocket.onclose = () => { + doneCounter++; + if (doneCounter === connections.length) { + done(); + } + }; + var hasOpened = false; + websocket.onopen = () => { + if (!hasOpened) { + hasOpened = true; + resolve(websocket); + } + }; + + let clientCounter = -1; + var hasSentThisTick = false; + + websocket.onmessage = (e) => { + gcTick(); + + if (!hasOpened) { + hasOpened = true; + resolve(websocket); + } + + if (e.data === "hello world") { + clientCounter = 0; + websocket.send("first"); + return; + } + + try { + expect(!!sendQueue.find((a) => a + " " === e.data)).toBe(true); + + if (!hasSentThisTick) { + websocket.send("second"); + hasSentThisTick = true; + queueMicrotask(() => { + hasSentThisTick = false; + }); + } + + gcTick(); + + if (clientCounter++ === sendQueue.length - 1) { + websocket.close(); + resolveConnection(); + } + } catch (r) { + console.error(r); + server?.stop(); + websocket.close(); + rejectConnection(r); + gcTick(); + return; + } finally { + } + }; + } + }); + server?.stop(); + expect(serverCounter).toBe(sendQueue.length); + }); }); |