diff options
author | 2023-07-13 09:39:43 -0700 | |
---|---|---|
committer | 2023-07-13 09:39:43 -0700 | |
commit | 9eb8eea2a81be6a20abb62544dc54a35ff4173a5 (patch) | |
tree | 63ab7bc037477b0db836067ac6c49c273d251353 | |
parent | 04b4157232006c882cdd693f566b31945618b924 (diff) | |
download | bun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.tar.gz bun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.tar.zst bun-9eb8eea2a81be6a20abb62544dc54a35ff4173a5.zip |
Implement `ping()`, `pong()`, `terminate()` for WebSocket client and server (#3257)
26 files changed, 2958 insertions, 1858 deletions
Binary files differ diff --git a/package.json b/package.json index 7c923579a..60e572514 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ }, "devDependencies": { "@types/react": "^18.0.25", + "@types/ws": "^8.5.5", "@typescript-eslint/eslint-plugin": "^5.31.0", "@typescript-eslint/parser": "^5.31.0", "bun-webkit": "0.0.1-dcaa801946a9526c0c4a40dceb9168b81aeb7973" diff --git a/packages/bun-types/bun.d.ts b/packages/bun-types/bun.d.ts index 86ded9aa0..d4e27fb4b 100644 --- a/packages/bun-types/bun.d.ts +++ b/packages/bun-types/bun.d.ts @@ -1284,17 +1284,28 @@ declare module "bun" { function build(config: BuildConfig): Promise<BuildOutput>; /** - * **0** means the message was **dropped** + * A status that represents the outcome of a sent message. * - * **-1** means **backpressure** - * - * **> 0** is the **number of bytes sent** + * - if **0**, the message was **dropped**. + * - if **-1**, there is **backpressure** of messages. + * - if **>0**, it represents the **number of bytes sent**. * + * @example + * ```js + * const status = ws.send("Hello!"); + * if (status === 0) { + * console.log("Message was dropped"); + * } else if (status === -1) { + * console.log("Backpressure was applied"); + * } else { + * console.log(`Success! Sent ${status} bytes`); + * } + * ``` */ type ServerWebSocketSendStatus = 0 | -1 | number; /** - * Fast WebSocket API designed for server environments. + * A fast WebSocket designed for servers. * * Features: * - **Message compression** - Messages can be compressed @@ -1304,315 +1315,248 @@ declare module "bun" { * * This is slightly different than the browser {@link WebSocket} which Bun supports for clients. * - * Powered by [uWebSockets](https://github.com/uNetworking/uWebSockets) + * Powered by [uWebSockets](https://github.com/uNetworking/uWebSockets). + * + * @example + * import { serve } from "bun"; + * + * serve({ + * websocket: { + * open(ws) { + * console.log("Connected", ws.remoteAddress); + * }, + * message(ws, data) { + * console.log("Received", data); + * ws.send(data); + * }, + * close(ws, code, reason) { + * console.log("Disconnected", code, reason); + * }, + * } + * }); */ export interface ServerWebSocket<T = undefined> { /** + * Sends a message to the client. * - * Send a message to the client. - * - * @param data The message to send - * @param compress Should the data be compressed? Ignored if the client does not support compression. - * - * @returns 0 if the message was dropped, -1 if backpressure was applied, or the number of bytes sent. - * - * @example - * - * ```js - * const status = ws.send("Hello World"); - * if (status === 0) { - * console.log("Message was dropped"); - * } else if (status === -1) { - * console.log("Backpressure was applied"); - * } else { - * console.log(`Message sent! ${status} bytes sent`); - * } - * ``` - * + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. * @example - * - * ```js - * ws.send("Feeling very compressed", true); - * ``` - * - * @example - * - * ```js + * ws.send("Hello!"); + * ws.send("Compress this.", true); * ws.send(new Uint8Array([1, 2, 3, 4])); - * ``` - * - * @example - * - * ```js - * ws.send(new ArrayBuffer(4)); - * ``` - * - * @example - * - * ```js - * ws.send(new DataView(new ArrayBuffer(4))); - * ``` - * */ - send( - data: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer, - compress?: boolean, - ): ServerWebSocketSendStatus; + send(data: string | BufferSource, compress?: boolean): ServerWebSocketSendStatus; /** + * Sends a text message to the client. * - * Send a message to the client. - * - * This function is the same as {@link ServerWebSocket.send} but it only accepts a string. This function includes a fast path. - * - * @param data The message to send - * @param compress Should the data be compressed? Ignored if the client does not support compression. - * - * @returns 0 if the message was dropped, -1 if backpressure was applied, or the number of bytes sent. - * - * @example - * - * ```js - * const status = ws.send("Hello World"); - * if (status === 0) { - * console.log("Message was dropped"); - * } else if (status === -1) { - * console.log("Backpressure was applied"); - * } else { - * console.log(`Message sent! ${status} bytes sent`); - * } - * ``` - * + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. * @example - * - * ```js - * ws.send("Feeling very compressed", true); - * ``` - * - * + * ws.send("Hello!"); + * ws.send("Compress this.", true); */ sendText(data: string, compress?: boolean): ServerWebSocketSendStatus; /** + * Sends a binary message to the client. * - * Send a message to the client. - * - * This function is the same as {@link ServerWebSocket.send} but it only accepts Uint8Array. - * - * @param data The message to send - * @param compress Should the data be compressed? Ignored if the client does not support compression. - * - * @returns 0 if the message was dropped, -1 if backpressure was applied, or the number of bytes sent. - * - * - * ```js - * ws.sendBinary(new Uint8Array([1, 2, 3, 4])); - * ``` - * + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. * @example - * - * ```js - * ws.sendBinary(new ArrayBuffer(4)); - * ``` - * - * @example - * - * ```js - * ws.sendBinary(new DataView(new ArrayBuffer(4))); - * ``` - * + * ws.send(new TextEncoder().encode("Hello!")); + * ws.send(new Uint8Array([1, 2, 3, 4]), true); */ - sendBinary(data: Uint8Array, compress?: boolean): ServerWebSocketSendStatus; + sendBinary(data: BufferSource, compress?: boolean): ServerWebSocketSendStatus; /** - * Gently close the connection. - * - * @param code The close code + * Closes the connection. * - * @param reason The close reason + * Here is a list of close codes: + * - `1000` means "normal closure" **(default)** + * - `1009` means a message was too big and was rejected + * - `1011` means the server encountered an error + * - `1012` means the server is restarting + * - `1013` means the server is too busy or the client is rate-limited + * - `4000` through `4999` are reserved for applications (you can use it!) * - * To close the connection abruptly, use `close(0, "")` + * To close the connection abruptly, use `terminate()`. + * + * @param code The close code to send + * @param reason The close reason to send */ close(code?: number, reason?: string): void; /** - * Send a message to all subscribers of a topic - * - * @param topic The topic to publish to - * @param data The data to send - * @param compress Should the data be compressed? Ignored if the client does not support compression. - * - * @returns 0 if the message was dropped, -1 if backpressure was applied, or the number of bytes sent. - * - * @example - * - * ```js - * ws.publish("chat", "Hello World"); - * ``` + * Abruptly close the connection. * - * @example - * ```js - * ws.publish("chat", new Uint8Array([1, 2, 3, 4])); - * ``` - * - * @example - * ```js - * ws.publish("chat", new ArrayBuffer(4), true); - * ``` - * - * @example - * ```js - * ws.publish("chat", new DataView(new ArrayBuffer(4))); - * ``` + * To gracefully close the connection, use `close()`. */ - publish( - topic: string, - data: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer, - compress?: boolean, - ): ServerWebSocketSendStatus; + terminate(): void; /** - * Send a message to all subscribers of a topic + * Sends a ping. * - * This function is the same as {@link publish} but only accepts string input. This function has a fast path. - * - * @param topic The topic to publish to * @param data The data to send - * @param compress Should the data be compressed? Ignored if the client does not support compression. - * - * @returns 0 if the message was dropped, -1 if backpressure was applied, or the number of bytes sent. - * - * @example - * - * ```js - * ws.publishText("chat", "Hello World"); - * ``` - * */ - publishText( - topic: string, - data: string, - compress?: boolean, - ): ServerWebSocketSendStatus; + ping(data?: string | BufferSource): ServerWebSocketSendStatus; /** - * Send a message to all subscribers of a topic + * Sends a pong. * - * This function is the same as {@link publish} but only accepts a Uint8Array. This function has a fast path. - * - * @param topic The topic to publish to * @param data The data to send - * @param compress Should the data be compressed? Ignored if the client does not support compression. - * - * @returns 0 if the message was dropped, -1 if backpressure was applied, or the number of bytes sent. - * - * @example - * - * ```js - * ws.publishBinary("chat", "Hello World"); - * ``` + */ + pong(data?: string | BufferSource): ServerWebSocketSendStatus; + + /** + * Sends a message to subscribers of the topic. * + * @param topic The topic name. + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. * @example - * ```js - * ws.publishBinary("chat", new Uint8Array([1, 2, 3, 4])); - * ``` + * ws.publish("chat", "Hello!"); + * ws.publish("chat", "Compress this.", true); + * ws.publish("chat", new Uint8Array([1, 2, 3, 4])); + */ + publish(topic: string, data: string | BufferSource, compress?: boolean): ServerWebSocketSendStatus; + + /** + * Sends a text message to subscribers of the topic. * + * @param topic The topic name. + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. * @example - * ```js - * ws.publishBinary("chat", new ArrayBuffer(4), true); - * ``` + * ws.publish("chat", "Hello!"); + * ws.publish("chat", "Compress this.", true); + */ + publishText(topic: string, data: string, compress?: boolean): ServerWebSocketSendStatus; + + /** + * Sends a binary message to subscribers of the topic. * + * @param topic The topic name. + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. * @example - * ```js - * ws.publishBinary("chat", new DataView(new ArrayBuffer(4))); - * ``` + * ws.publish("chat", new TextEncoder().encode("Hello!")); + * ws.publish("chat", new Uint8Array([1, 2, 3, 4]), true); */ - publishBinary( - topic: string, - data: Uint8Array, - compress?: boolean, - ): ServerWebSocketSendStatus; + publishBinary(topic: string, data: BufferSource, compress?: boolean): ServerWebSocketSendStatus; /** - * Subscribe to a topic - * @param topic The topic to subscribe to + * Subscribes a client to the topic. * + * @param topic The topic name. * @example - * ```js * ws.subscribe("chat"); - * ``` */ subscribe(topic: string): void; /** - * Unsubscribe from a topic - * @param topic The topic to unsubscribe from + * Unsubscribes a client to the topic. * + * @param topic The topic name. * @example - * ```js * ws.unsubscribe("chat"); - * ``` - * */ unsubscribe(topic: string): void; /** - * Is the socket subscribed to a topic? - * @param topic The topic to check + * Is the client subscribed to a topic? * - * @returns `true` if the socket is subscribed to the topic, `false` otherwise + * @param topic The topic name. + * @example + * ws.subscribe("chat"); + * console.log(ws.isSubscribed("chat")); // true */ isSubscribed(topic: string): boolean; /** - * The remote address of the client + * Batches `send()` and `publish()` operations, which makes it faster to send data. + * + * The `message`, `open`, and `drain` callbacks are automatically corked, so + * you only need to call this if you are sending messages outside of those + * callbacks or in async functions. + * + * @param callback The callback to run. * @example - * ```js - * console.log(socket.remoteAddress); // "127.0.0.1" - * ``` + * ws.cork((ctx) => { + * ctx.send("These messages"); + * ctx.sendText("are sent"); + * ctx.sendBinary(new TextEncoder().encode("together!")); + * }); */ - readonly remoteAddress: string; + cork<T = unknown>(callback: (ws: ServerWebSocket<T>) => T): T; /** - * Ready state of the socket + * The IP address of the client. * * @example - * ```js - * console.log(socket.readyState); // 1 - * ``` + * console.log(socket.remoteAddress); // "127.0.0.1" */ - readonly readyState: -1 | 0 | 1 | 2 | 3; + readonly remoteAddress: string; /** - * The data from the {@link Server.upgrade} function + * The ready state of the client. * - * Put any data you want to share between the `fetch` function and the websocket here. + * - if `0`, the client is connecting. + * - if `1`, the client is connected. + * - if `2`, the client is closing. + * - if `3`, the client is closed. * - * You can read/write to this property at any time. + * @example + * console.log(socket.readyState); // 1 */ - data: T; + readonly readyState: WebSocketReadyState /** - * Batch data sent to a {@link ServerWebSocket} + * Sets how binary data is returned in events. * - * This makes it significantly faster to {@link ServerWebSocket.send} or {@link ServerWebSocket.publish} multiple messages + * - if `nodebuffer`, binary data is returned as `Buffer` objects. **(default)** + * - if `arraybuffer`, binary data is returned as `ArrayBuffer` objects. + * - if `uint8array`, binary data is returned as `Uint8Array` objects. * - * The `message`, `open`, and `drain` callbacks are automatically corked, so - * you only need to call this if you are sending messages outside of those - * callbacks or in async functions + * @example + * let ws: WebSocket; + * ws.binaryType = "uint8array"; + * ws.addEventListener("message", ({ data }) => { + * console.log(data instanceof Uint8Array); // true + * }); */ - cork: (callback: (ws: ServerWebSocket<T>) => any) => void | Promise<void>; + binaryType?: "nodebuffer" | "arraybuffer" | "uint8array"; /** - * Configure the {@link WebSocketHandler.message} callback to return a {@link ArrayBuffer} or {@link Buffer} instead of a {@link Uint8Array} + * Custom data that you can assign to a client, can be read and written at any time. * - * @default "nodebuffer" - * - * In Bun v0.6.2 and earlier, this defaulted to "uint8array" + * @example + * import { serve } from "bun"; + * + * serve({ + * fetch(request, server) { + * const data = { + * accessToken: request.headers.get("Authorization"), + * }; + * if (server.upgrade(request, { data })) { + * return; + * } + * return new Response(); + * }, + * websocket: { + * open(ws) { + * console.log(ws.data.accessToken); + * } + * } + * }); */ - binaryType?: "arraybuffer" | "uint8array" | "nodebuffer"; + data: T; } + /** + * Compression options for WebSocket messages. + */ type WebSocketCompressor = | "disable" | "shared" @@ -1664,112 +1608,120 @@ declare module "bun" { * }, * }); */ - export interface WebSocketHandler<T = undefined> { + export type WebSocketHandler<T = undefined> = { /** - * Handle an incoming message to a {@link ServerWebSocket} + * Called when the server receives an incoming message. * - * @param ws The {@link ServerWebSocket} that received the message - * @param message The message received + * If the message is not a `string`, its type is based on the value of `binaryType`. + * - if `nodebuffer`, then the message is a `Buffer`. + * - if `arraybuffer`, then the message is an `ArrayBuffer`. + * - if `uint8array`, then the message is a `Uint8Array`. * - * To change `message` to be an `ArrayBuffer` instead of a `Uint8Array`, set `ws.binaryType = "arraybuffer"` + * @param ws The websocket that sent the message + * @param message The message received */ - message: ( - ws: ServerWebSocket<T>, - message: string | Uint8Array, - ) => void | Promise<void>; + message(ws: ServerWebSocket<T>, message: string | Buffer): void | Promise<void>; /** - * The {@link ServerWebSocket} has been opened + * Called when a connection is opened. * - * @param ws The {@link ServerWebSocket} that was opened + * @param ws The websocket that was opened */ - open?: (ws: ServerWebSocket<T>) => void | Promise<void>; + open?(ws: ServerWebSocket<T>): void | Promise<void>; + /** - * The {@link ServerWebSocket} is ready for more data + * Called when a connection was previously under backpressure, + * meaning it had too many queued messages, but is now ready to receive more data. * - * @param ws The {@link ServerWebSocket} that is ready + * @param ws The websocket that is ready for more data */ - drain?: (ws: ServerWebSocket<T>) => void | Promise<void>; + drain?(ws: ServerWebSocket<T>): void | Promise<void>; + /** - * The {@link ServerWebSocket} is being closed - * @param ws The {@link ServerWebSocket} that was closed + * Called when a connection is closed. + * + * @param ws The websocket that was closed * @param code The close code * @param message The close message */ - close?: ( - ws: ServerWebSocket<T>, - code: number, - message: string, - ) => void | Promise<void>; + close?(ws: ServerWebSocket<T>, code: number, reason: string): void | Promise<void>; + /** - * Enable compression for clients that support it. By default, compression is disabled. + * Called when a ping is sent. * - * @default false - * - * `true` is equivalent to `"shared" + * @param ws The websocket that received the ping + * @param data The data sent with the ping */ - perMessageDeflate?: - | true - | false - | { - /** - * Enable compression on the {@link ServerWebSocket} - * - * @default false - * - * `true` is equivalent to `"shared" - */ - compress?: WebSocketCompressor | false | true; - /** - * Configure decompression - * - * @default false - * - * `true` is equivalent to `"shared" - */ - decompress?: WebSocketCompressor | false | true; - }; + ping?(ws: ServerWebSocket<T>, data: Buffer): void | Promise<void>; /** - * The maximum size of a message + * Called when a pong is received. + * + * @param ws The websocket that received the ping + * @param data The data sent with the ping */ - maxPayloadLength?: number; + pong?(ws: ServerWebSocket<T>, data: Buffer): void | Promise<void>; + /** - * After a connection has not received a message for this many seconds, it will be closed. - * @default 120 (2 minutes) + * Sets the maximum size of messages in bytes. + * + * Default is 16 MB, or `1024 * 1024 * 16` in bytes. */ - idleTimeout?: number; + maxPayloadLength?: number; + /** - * The maximum number of bytes that can be buffered for a single connection. - * @default 16MB + * Sets the maximum number of bytes that can be buffered on a single connection. + * + * Default is 16 MB, or `1024 * 1024 * 16` in bytes. */ backpressureLimit?: number; + /** - * Close the connection if the backpressure limit is reached. - * @default false - * @see {@link backpressureLimit} - * @see {@link ServerWebSocketSendStatus} - * @see {@link ServerWebSocket.send} - * @see {@link ServerWebSocket.publish} + * Sets if the connection should be closed if `backpressureLimit` is reached. + * + * Default is `false`. */ closeOnBackpressureLimit?: boolean; /** - * Control whether or not ws.publish() should include the `ServerWebSocket` - * that published the message. - * - * As of Bun v0.6.3, this option defaults to `false`. - * - * In Bun v0.6.2 and earlier, this option defaulted to `true`, but it was an API design mistake. A future version of Bun will eventually remove this option entirely. The better way to publish to all is to use {@link Server.publish}. + * Sets the the number of seconds to wait before timing out a connection + * due to no messages or pings. * - * if `true` {@link ServerWebSocket.publish} will publish to all subscribers, including the websocket publishing the message. + * Default is 2 minutes, or `120` in seconds. + */ + idleTimeout?: number; + + /** + * Should `ws.publish()` also send a message to `ws` (itself), if it is subscribed? * - * if `false` or `undefined`, {@link ServerWebSocket.publish} will publish to all subscribers excluding the websocket publishing the message. + * Default is `false`. + */ + publishToSelf?: boolean; + + /** + * Should the server automatically send and respond to pings to clients? * - * @default false + * Default is `true`. + */ + sendPings?: boolean; + + /** + * Sets the compression level for messages, for clients that supports it. By default, compression is disabled. * + * Default is `false`. */ - publishToSelf?: boolean; + perMessageDeflate?: + | boolean + | { + /** + * Sets the compression level. + */ + compress?: WebSocketCompressor | boolean; + /** + * Sets the decompression level. + */ + decompress?: WebSocketCompressor | boolean; + }; } interface GenericServeOptions { diff --git a/packages/bun-types/globals.d.ts b/packages/bun-types/globals.d.ts index 9e9b5138d..d74fabf1f 100644 --- a/packages/bun-types/globals.d.ts +++ b/packages/bun-types/globals.d.ts @@ -1,7 +1,7 @@ /** * "blob" is not supported yet */ -type BinaryType = "arraybuffer" | "nodebuffer" | "blob"; +type BinaryType = "nodebuffer" | "arraybuffer" | "blob"; type Transferable = ArrayBuffer; type MessageEventSource = undefined; type Encoding = "utf-8" | "windows-1252" | "utf-16"; @@ -1835,101 +1835,268 @@ declare var CustomEvent: { }; /** - * An implementation of the [WebSocket API](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) + * A map of WebSocket event names to event types. */ -interface WebSocketEventMap { +type WebSocketEventMap = { + open: Event; + message: MessageEvent<string | Buffer>; close: CloseEvent; + ping: MessageEvent<Buffer>; + pong: MessageEvent<Buffer>; error: Event; - message: MessageEvent<Buffer | ArrayBuffer | string>; - open: Event; } -/** Provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection. */ +/** + * A state that represents if a WebSocket is connected. + * + * - `WebSocket.CONNECTING` is `0`, the connection is pending. + * - `WebSocket.OPEN` is `1`, the connection is established and `send()` is possible. + * - `WebSocket.CLOSING` is `2`, the connection is closing. + * - `WebSocket.CLOSED` is `3`, the connection is closed or couldn't be opened. + * + * @link https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState + */ +type WebSocketReadyState = 0 | 1 | 2 | 3; + +/** + * A client that makes an outgoing WebSocket connection. + * + * @see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket + * @example + * const ws = new WebSocket("wss://ws.postman-echo.com/raw"); + * + * ws.addEventListener("open", () => { + * console.log("Connected"); + * }); + * ws.addEventListener("message", ({ data }) => { + * console.log("Received:", data); // string or Buffer + * }); + * ws.addEventListener("close", ({ code, reason }) => { + * console.log("Disconnected:", code, reason); + * }); + */ interface WebSocket extends EventTarget { /** - * Returns a string that indicates how binary data from the WebSocket object is exposed to scripts: + * Sends a message. + * + * @param data the string, ArrayBuffer, or ArrayBufferView to send + * @example + * let ws: WebSocket; + * ws.send("Hello!"); + * ws.send(new TextEncoder().encode("Hello?")); + */ + send(data: string | BufferSource): void; + + /** + * Closes the connection. + * + * Here is a list of close codes: + * - `1000` means "normal closure" **(default)** + * - `1001` means the client is "going away" + * - `1009` means a message was too big and was rejected + * - `1011` means the server encountered an error + * - `1012` means the server is restarting + * - `1013` means the server is too busy or the client is rate-limited + * - `4000` through `4999` are reserved for applications (you can use it!) + * + * To abruptly close the connection without a code, use `terminate()` instead. + * + * @param code the close code + * @param reason the close reason + * @example + * let ws: WebSocket; + * ws.close(1013, "Exceeded the rate limit of 100 messages per minute."); + */ + close(code?: number, reason?: string): void; + + /** + * Closes the connection, abruptly. * - * Can be set, to change how binary data is returned. The default is `"arraybuffer"`. + * To gracefuly close the connection, use `close()` instead. + */ + terminate(): void; + + /** + * Sends a ping. * - * Unlike in browsers, you can also set `binaryType` to `"nodebuffer"` to receive a {@link Buffer} object. + * @param data the string, ArrayBuffer, or ArrayBufferView to send + */ + ping(data?: string | BufferSource): void; + + /** + * Sends a pong. + * + * @param data the string, ArrayBuffer, or ArrayBufferView to send + */ + pong(data?: string | BufferSource): void; + + /** + * Sets how binary data is returned in events. + * + * - if `nodebuffer`, binary data is returned as `Buffer` objects. **(default)** + * - if `arraybuffer`, binary data is returned as `ArrayBuffer` objects. + * - if `blob`, binary data is returned as `Blob` objects. **(not supported)** + * + * In browsers, the default is `blob`, however in Bun, the default is `nodebuffer`. + * + * @example + * let ws: WebSocket; + * ws.binaryType = "arraybuffer"; + * ws.addEventListener("message", ({ data }) => { + * console.log(data instanceof ArrayBuffer); // true + * }); */ binaryType: BinaryType; + /** - * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. + * The ready state of the connection. * - * If the WebSocket connection is closed, this attribute's value will only increase with each call to the send() method. (The number does not reset to zero once the connection closes.) + * - `WebSocket.CONNECTING` is `0`, the connection is pending. + * - `WebSocket.OPEN` is `1`, the connection is established and `send()` is possible. + * - `WebSocket.CLOSING` is `2`, the connection is closing. + * - `WebSocket.CLOSED` is `3`, the connection is closed or couldn't be opened. + */ + readonly readyState: WebSocketReadyState; + + /** + * The resolved URL that established the connection. + */ + readonly url: string; + + /** + * The number of bytes that are queued, but not yet sent. + * + * When the connection is closed, the value is not reset to zero. */ readonly bufferedAmount: number; - /** Returns the extensions selected by the server, if any. */ + + /** + * The protocol selected by the server, if any, otherwise empty. + */ + readonly protocol: string; + + /** + * The extensions selected by the server, if any, otherwise empty. + */ readonly extensions: string; - onclose: ((this: WebSocket, ev: CloseEvent) => any) | null; - onerror: ((this: WebSocket, ev: Event) => any) | null; + + /** + * Sets the event handler for `open` events. + * + * If you need multiple event handlers, use `addEventListener("open")` instead. + */ + onopen: ((this: WebSocket, ev: Event) => unknown) | null; + + /** + * Sets the event handler for `close` events. + * + * If you need multiple event handlers, use `addEventListener("close")` instead. + */ + onclose: ((this: WebSocket, event: CloseEvent) => unknown) | null; + + /** + * Sets the event handler for `message` events. + * + * If you need multiple event handlers, use `addEventListener("message")` instead. + */ onmessage: - | ((this: WebSocket, ev: WebSocketEventMap["message"]) => any) + | ((this: WebSocket, event: MessageEvent<string | Buffer>) => unknown) | null; - onopen: ((this: WebSocket, ev: Event) => any) | null; - /** Returns the subprotocol selected by the server, if any. It can be used in conjunction with the array form of the constructor's second argument to perform subprotocol negotiation. */ - readonly protocol: string; - /** Returns the state of the WebSocket object's connection. It can have the values described below. */ - readonly readyState: number; - /** Returns the URL that was used to establish the WebSocket connection. */ - readonly url: string; - /** Closes the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason. */ - close(code?: number, reason?: string): void; - /** Transmits data using the WebSocket connection. data can be a string, an ArrayBuffer, or an BufferSource. */ - send(data: string | ArrayBufferLike | BufferSource): void; - readonly CLOSED: number; - readonly CLOSING: number; - readonly CONNECTING: number; - readonly OPEN: number; - addEventListener<K extends keyof WebSocketEventMap>( - type: K, - listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, + + /** + * Sets the event handler for `error` events. + * + * If you need multiple event handlers, use `addEventListener("error")` instead. + */ + onerror: ((this: WebSocket, event: Event) => unknown) | null; + + addEventListener<T extends keyof WebSocketEventMap>( + type: T, + listener: (this: WebSocket, event: WebSocketEventMap[T]) => unknown, options?: boolean | AddEventListenerOptions, ): void; + addEventListener( type: string, - listener: EventListenerOrEventListenerObject, + listener: (this: WebSocket, event: Event) => unknown, options?: boolean | AddEventListenerOptions, ): void; - removeEventListener<K extends keyof WebSocketEventMap>( - type: K, - listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, + + removeEventListener<T extends keyof WebSocketEventMap>( + type: T, + listener: (this: WebSocket, event: WebSocketEventMap[T]) => unknown, options?: boolean | EventListenerOptions, ): void; + removeEventListener( type: string, - listener: EventListenerOrEventListenerObject, + listener: (this: WebSocket, event: Event) => unknown, options?: boolean | EventListenerOptions, ): void; } +/** + * A client that makes an outgoing WebSocket connection. + * + * @see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket + * @example + * const ws = new WebSocket("wss://ws.postman-echo.com/raw"); + * + * ws.addEventListener("open", () => { + * console.log("Connected"); + * }); + * ws.addEventListener("message", ({ data }) => { + * console.log("Received:", data); // string or Buffer + * }); + * ws.addEventListener("close", ({ code, reason }) => { + * console.log("Disconnected:", code, reason); + * }); + */ declare var WebSocket: { prototype: WebSocket; - new (url: string | URL, protocols?: string | string[]): WebSocket; + + new ( + url: string | URL, + protocols?: string | string[], + ): WebSocket; + new ( url: string | URL, options: { /** - * An object specifying connection headers - * - * This is a Bun-specific extension. + * Sets the headers when establishing a connection. */ headers?: HeadersInit; /** - * A string specifying the subprotocols the server is willing to accept. + * Sets the sub-protocol the client is willing to accept. */ protocol?: string; /** - * A string array specifying the subprotocols the server is willing to accept. + * Sets the sub-protocols the client is willing to accept. */ protocols?: string[]; }, ): WebSocket; - readonly CLOSED: number; - readonly CLOSING: number; - readonly CONNECTING: number; - readonly OPEN: number; + + /** + * The connection is pending. + */ + readonly CONNECTING: 0; + + /** + * The connection is established and `send()` is possible. + */ + readonly OPEN: 1; + + /** + * The connection is closing. + */ + readonly CLOSING: 2; + + /** + * The connection is closed or couldn't be opened. + */ + readonly CLOSED: 3; }; /** diff --git a/src/bun.js/api/server.classes.ts b/src/bun.js/api/server.classes.ts index b3d174957..3aaea871f 100644 --- a/src/bun.js/api/server.classes.ts +++ b/src/bun.js/api/server.classes.ts @@ -45,7 +45,6 @@ export default [ args: ["JSUint8Array", "bool"], }, }, - publishText: { fn: "publishText", length: 2, @@ -62,10 +61,21 @@ export default [ args: ["JSString", "JSUint8Array"], }, }, - + ping: { + fn: "ping", + length: 1, + }, + pong: { + fn: "pong", + length: 1, + }, close: { fn: "close", - length: 1, + length: 3, + }, + terminate: { + fn: "terminate", + length: 0, }, cork: { fn: "cork", @@ -103,11 +113,6 @@ export default [ fn: "isSubscribed", length: 1, }, - - // topics: { - // getter: "getTopics", - // }, - remoteAddress: { getter: "getRemoteAddress", cache: true, diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 9625ff693..63e83d9bf 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -2976,11 +2976,11 @@ pub const WebSocketServer = struct { globalObject: *JSC.JSGlobalObject = undefined, handler: WebSocketServer.Handler = .{}, - maxPayloadLength: u32 = 1024 * 1024 * 16, + maxPayloadLength: u32 = 1024 * 1024 * 16, // 16MB maxLifetime: u16 = 0, - idleTimeout: u16 = 120, + idleTimeout: u16 = 120, // 2 minutes compression: i32 = 0, - backpressureLimit: u32 = 1024 * 1024 * 16, + backpressureLimit: u32 = 1024 * 1024 * 16, // 16MB sendPingsAutomatically: bool = true, resetIdleTimeoutOnSend: bool = true, closeOnBackpressureLimit: bool = false, @@ -2991,6 +2991,8 @@ pub const WebSocketServer = struct { onClose: JSC.JSValue = .zero, onDrain: JSC.JSValue = .zero, onError: JSC.JSValue = .zero, + onPing: JSC.JSValue = .zero, + onPong: JSC.JSValue = .zero, app: ?*anyopaque = null, @@ -3005,52 +3007,80 @@ pub const WebSocketServer = struct { pub fn fromJS(globalObject: *JSC.JSGlobalObject, object: JSC.JSValue) ?Handler { var handler = Handler{ .globalObject = globalObject }; + var vm = globalObject.vm(); + var valid = false; + if (object.getTruthy(globalObject, "message")) |message| { - if (!message.isCallable(globalObject.vm())) { + if (!message.isCallable(vm)) { globalObject.throwInvalidArguments("websocket expects a function for the message option", .{}); return null; } handler.onMessage = message; message.ensureStillAlive(); + valid = true; } if (object.getTruthy(globalObject, "open")) |open| { - if (!open.isCallable(globalObject.vm())) { + if (!open.isCallable(vm)) { globalObject.throwInvalidArguments("websocket expects a function for the open option", .{}); return null; } handler.onOpen = open; open.ensureStillAlive(); + valid = true; } if (object.getTruthy(globalObject, "close")) |close| { - if (!close.isCallable(globalObject.vm())) { + if (!close.isCallable(vm)) { globalObject.throwInvalidArguments("websocket expects a function for the close option", .{}); return null; } handler.onClose = close; close.ensureStillAlive(); + valid = true; } if (object.getTruthy(globalObject, "drain")) |drain| { - if (!drain.isCallable(globalObject.vm())) { + if (!drain.isCallable(vm)) { globalObject.throwInvalidArguments("websocket expects a function for the drain option", .{}); return null; } handler.onDrain = drain; drain.ensureStillAlive(); + valid = true; + } + + if (object.getTruthy(globalObject, "error")) |cb| { + if (!cb.isCallable(vm)) { + globalObject.throwInvalidArguments("websocket expects a function for the error option", .{}); + return null; + } + handler.onError = cb; + cb.ensureStillAlive(); + valid = true; + } + + if (object.getTruthy(globalObject, "ping")) |cb| { + if (!cb.isCallable(vm)) { + globalObject.throwInvalidArguments("websocket expects a function for the ping option", .{}); + return null; + } + handler.onPing = cb; + cb.ensureStillAlive(); + valid = true; } - if (object.getTruthy(globalObject, "onError")) |onError| { - if (!onError.isCallable(globalObject.vm())) { - globalObject.throwInvalidArguments("websocket expects a function for the onError option", .{}); + if (object.getTruthy(globalObject, "pong")) |cb| { + if (!cb.isCallable(vm)) { + globalObject.throwInvalidArguments("websocket expects a function for the pong option", .{}); return null; } - handler.onError = onError; - onError.ensureStillAlive(); + handler.onPong = cb; + cb.ensureStillAlive(); + valid = true; } - if (handler.onMessage != .zero or handler.onOpen != .zero) + if (valid) return handler; return null; @@ -3062,6 +3092,8 @@ pub const WebSocketServer = struct { this.onClose.protect(); this.onDrain.protect(); this.onError.protect(); + this.onPing.protect(); + this.onPong.protect(); } pub fn unprotect(this: Handler) void { @@ -3070,6 +3102,8 @@ pub const WebSocketServer = struct { this.onClose.unprotect(); this.onDrain.unprotect(); this.onError.unprotect(); + this.onPing.unprotect(); + this.onPong.unprotect(); } }; @@ -3197,6 +3231,7 @@ pub const WebSocketServer = struct { server.maxPayloadLength = @intCast(u32, @max(value.toInt64(), 0)); } } + if (object.get(globalObject, "idleTimeout")) |value| { if (!value.isUndefinedOrNull()) { if (!value.isAnyInt()) { @@ -3204,7 +3239,17 @@ pub const WebSocketServer = struct { return null; } - server.idleTimeout = value.to(u16); + var idleTimeout = @intCast(u16, @truncate(u32, @max(value.toInt64(), 0))); + if (idleTimeout > 960) { + globalObject.throwInvalidArguments("websocket expects idleTimeout to be 960 or less", .{}); + return null; + } else if (idleTimeout > 0) { + // uws does not allow idleTimeout to be between (0, 8], + // since its timer is not that accurate, therefore round up. + idleTimeout = @max(idleTimeout, 8); + } + + server.idleTimeout = idleTimeout; } } if (object.get(globalObject, "backpressureLimit")) |value| { @@ -3217,16 +3262,6 @@ pub const WebSocketServer = struct { server.backpressureLimit = @intCast(u32, @max(value.toInt64(), 0)); } } - // if (object.get(globalObject, "sendPings")) |value| { - // if (!value.isUndefinedOrNull()) { - // if (!value.isBoolean()) { - // globalObject.throwInvalidArguments("websocket expects sendPings to be a boolean", .{}); - // return null; - // } - - // server.sendPings = value.toBoolean(); - // } - // } if (object.get(globalObject, "closeOnBackpressureLimit")) |value| { if (!value.isUndefinedOrNull()) { @@ -3239,6 +3274,17 @@ pub const WebSocketServer = struct { } } + if (object.get(globalObject, "sendPings")) |value| { + if (!value.isUndefinedOrNull()) { + if (!value.isBoolean()) { + globalObject.throwInvalidArguments("websocket expects sendPings to be a boolean", .{}); + return null; + } + + server.sendPingsAutomatically = value.toBoolean(); + } + } + if (object.get(globalObject, "publishToSelf")) |value| { if (!value.isUndefinedOrNull()) { if (!value.isBoolean()) { @@ -3466,12 +3512,79 @@ pub const ServerWebSocket = struct { } } } - pub fn onPing(_: *ServerWebSocket, _: uws.AnyWebSocket, _: []const u8) void { - log("onPing", .{}); + + pub fn onPing(this: *ServerWebSocket, _: uws.AnyWebSocket, data: []const u8) void { + log("onPing: {s}", .{data}); + + var handler = this.handler; + var cb = handler.onPing; + if (cb.isEmptyOrUndefinedOrNull()) return; + + var globalThis = handler.globalObject; + const result = cb.call( + globalThis, + &[_]JSC.JSValue{ this.this_value, if (this.binary_type == .Buffer) + JSC.ArrayBuffer.create( + globalThis, + data, + .Buffer, + ) + else if (this.binary_type == .Uint8Array) + JSC.ArrayBuffer.create( + globalThis, + data, + .Uint8Array, + ) + else + JSC.ArrayBuffer.create( + globalThis, + data, + .ArrayBuffer, + ) }, + ); + + if (result.toError()) |err| { + log("onPing error", .{}); + handler.globalObject.bunVM().runErrorHandler(err, null); + } } - pub fn onPong(_: *ServerWebSocket, _: uws.AnyWebSocket, _: []const u8) void { - log("onPong", .{}); + + pub fn onPong(this: *ServerWebSocket, _: uws.AnyWebSocket, data: []const u8) void { + log("onPong: {s}", .{data}); + + var handler = this.handler; + var cb = handler.onPong; + if (cb.isEmptyOrUndefinedOrNull()) return; + + var globalThis = handler.globalObject; + const result = cb.call( + globalThis, + &[_]JSC.JSValue{ this.this_value, if (this.binary_type == .Buffer) + JSC.ArrayBuffer.create( + globalThis, + data, + .Buffer, + ) + else if (this.binary_type == .Uint8Array) + JSC.ArrayBuffer.create( + globalThis, + data, + .Uint8Array, + ) + else + JSC.ArrayBuffer.create( + globalThis, + data, + .ArrayBuffer, + ) }, + ); + + if (result.toError()) |err| { + log("onPong error", .{}); + handler.globalObject.bunVM().runErrorHandler(err, null); + } } + pub fn onClose(this: *ServerWebSocket, _: uws.AnyWebSocket, code: i32, message: []const u8) void { log("onClose", .{}); var handler = this.handler; @@ -3483,10 +3596,12 @@ pub const ServerWebSocket = struct { } } - if (handler.onClose != .zero) { + if (!handler.onClose.isEmptyOrUndefinedOrNull()) { + var str = ZigString.init(message); + str.markUTF8(); const result = handler.onClose.call( handler.globalObject, - &[_]JSC.JSValue{ this.this_value, JSValue.jsNumber(code), ZigString.init(message).toValueGC(handler.globalObject) }, + &[_]JSC.JSValue{ this.this_value, JSValue.jsNumber(code), str.toValueGC(handler.globalObject) }, ); if (result.toError()) |err| { @@ -4056,6 +4171,95 @@ pub const ServerWebSocket = struct { } } + pub fn ping( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSValue { + return sendPing(this, globalThis, callframe, "ping", .ping); + } + + pub fn pong( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSValue { + return sendPing(this, globalThis, callframe, "pong", .pong); + } + + inline fn sendPing( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + comptime name: string, + comptime opcode: uws.Opcode, + ) JSValue { + const args = callframe.arguments(2); + + if (this.closed) { + return JSValue.jsNumber(0); + } + + if (args.len > 0) { + var value = args.ptr[0]; + if (value.asArrayBuffer(globalThis)) |data| { + var buffer = data.slice(); + + switch (this.websocket.send(buffer, opcode, false, true)) { + .backpressure => { + log("{s}() backpressure ({d} bytes)", .{ name, buffer.len }); + return JSValue.jsNumber(-1); + }, + .success => { + log("{s}() success ({d} bytes)", .{ name, buffer.len }); + return JSValue.jsNumber(buffer.len); + }, + .dropped => { + log("{s}() dropped ({d} bytes)", .{ name, buffer.len }); + return JSValue.jsNumber(0); + }, + } + } else if (value.isString()) { + var string_value = value.toString(globalThis).toSlice(globalThis, bun.default_allocator); + defer string_value.deinit(); + var buffer = string_value.slice(); + + switch (this.websocket.send(buffer, opcode, false, true)) { + .backpressure => { + log("{s}() backpressure ({d} bytes)", .{ name, buffer.len }); + return JSValue.jsNumber(-1); + }, + .success => { + log("{s}() success ({d} bytes)", .{ name, buffer.len }); + return JSValue.jsNumber(buffer.len); + }, + .dropped => { + log("{s}() dropped ({d} bytes)", .{ name, buffer.len }); + return JSValue.jsNumber(0); + }, + } + } else { + globalThis.throwPretty("{s} requires a string or BufferSource", .{name}); + return .zero; + } + } + + switch (this.websocket.send(&.{}, opcode, false, true)) { + .backpressure => { + log("{s}() backpressure ({d} bytes)", .{ name, 0 }); + return JSValue.jsNumber(-1); + }, + .success => { + log("{s}() success ({d} bytes)", .{ name, 0 }); + return JSValue.jsNumber(0); + }, + .dropped => { + log("{s}() dropped ({d} bytes)", .{ name, 0 }); + return JSValue.jsNumber(0); + }, + } + } + pub fn getData( _: *ServerWebSocket, _: *JSC.JSGlobalObject, @@ -4096,26 +4300,38 @@ pub const ServerWebSocket = struct { log("close()", .{}); if (this.closed) { - return JSValue.jsUndefined(); + return .undefined; } - if (!this.opened) { - globalThis.throw("Calling close() inside open() is not supported. Consider changing your upgrade() callback instead", .{}); - return .zero; - } this.closed = true; const code = if (args.len > 0) args.ptr[0].toInt32() else @as(i32, 1000); var message_value = if (args.len > 1) args.ptr[1].toSlice(globalThis, bun.default_allocator) else ZigString.Slice.empty; defer message_value.deinit(); - if (code > 1000 or message_value.len > 0) { - this.websocket.end(code, message_value.slice()); - } else { - this.this_value.unprotect(); - this.websocket.close(); + + this.websocket.end(code, message_value.slice()); + return .undefined; + } + + pub fn terminate( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSValue { + _ = globalThis; + const args = callframe.arguments(2); + _ = args; + log("terminate()", .{}); + + if (this.closed) { + return .undefined; } - return JSValue.jsUndefined(); + this.closed = true; + this.this_value.unprotect(); + this.websocket.close(); + + return .undefined; } pub fn getBinaryType( diff --git a/src/bun.js/bindings/JSSink.cpp b/src/bun.js/bindings/JSSink.cpp index ed2554dc7..8be4fe95b 100644 --- a/src/bun.js/bindings/JSSink.cpp +++ b/src/bun.js/bindings/JSSink.cpp @@ -1,6 +1,6 @@ // AUTO-GENERATED FILE. DO NOT EDIT. -// Generated by 'make generate-sink' at 2023-07-06T14:22:07.346Z +// Generated by 'make generate-sink' at 2023-07-09T17:58:51.559Z // To regenerate this file, run: // // make generate-sink diff --git a/src/bun.js/bindings/JSSink.h b/src/bun.js/bindings/JSSink.h index 386554ebb..3826ef696 100644 --- a/src/bun.js/bindings/JSSink.h +++ b/src/bun.js/bindings/JSSink.h @@ -1,6 +1,6 @@ // AUTO-GENERATED FILE. DO NOT EDIT. -// Generated by 'make generate-sink' at 2023-07-06T14:22:07.345Z +// Generated by 'make generate-sink' at 2023-07-09T17:58:51.558Z // #pragma once diff --git a/src/bun.js/bindings/JSSinkLookupTable.h b/src/bun.js/bindings/JSSinkLookupTable.h index e4ed81629..7ff4c3f9c 100644 --- a/src/bun.js/bindings/JSSinkLookupTable.h +++ b/src/bun.js/bindings/JSSinkLookupTable.h @@ -1,4 +1,4 @@ -// Automatically generated from src/bun.js/bindings/JSSink.cpp using /home/cirospaciari/Repos/bun/src/bun.js/WebKit/Source/JavaScriptCore/create_hash_table. DO NOT EDIT! +// Automatically generated from src/bun.js/bindings/JSSink.cpp using /Users/ashcon/Desktop/code/bun/src/bun.js/WebKit/Source/JavaScriptCore/create_hash_table. DO NOT EDIT! diff --git a/src/bun.js/bindings/ZigGeneratedClasses.cpp b/src/bun.js/bindings/ZigGeneratedClasses.cpp index b4d672328..ce82dc1f1 100644 --- a/src/bun.js/bindings/ZigGeneratedClasses.cpp +++ b/src/bun.js/bindings/ZigGeneratedClasses.cpp @@ -14587,6 +14587,12 @@ JSC_DECLARE_HOST_FUNCTION(ServerWebSocketPrototype__getBufferedAmountCallback); extern "C" EncodedJSValue ServerWebSocketPrototype__isSubscribed(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame); JSC_DECLARE_HOST_FUNCTION(ServerWebSocketPrototype__isSubscribedCallback); +extern "C" EncodedJSValue ServerWebSocketPrototype__ping(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame); +JSC_DECLARE_HOST_FUNCTION(ServerWebSocketPrototype__pingCallback); + +extern "C" EncodedJSValue ServerWebSocketPrototype__pong(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame); +JSC_DECLARE_HOST_FUNCTION(ServerWebSocketPrototype__pongCallback); + extern "C" EncodedJSValue ServerWebSocketPrototype__publish(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame); JSC_DECLARE_HOST_FUNCTION(ServerWebSocketPrototype__publishCallback); @@ -14686,6 +14692,9 @@ JSC_DEFINE_JIT_OPERATION(ServerWebSocketPrototype__sendTextWithoutTypeChecksWrap extern "C" EncodedJSValue ServerWebSocketPrototype__subscribe(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame); JSC_DECLARE_HOST_FUNCTION(ServerWebSocketPrototype__subscribeCallback); +extern "C" EncodedJSValue ServerWebSocketPrototype__terminate(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame); +JSC_DECLARE_HOST_FUNCTION(ServerWebSocketPrototype__terminateCallback); + extern "C" EncodedJSValue ServerWebSocketPrototype__unsubscribe(void* ptr, JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame); JSC_DECLARE_HOST_FUNCTION(ServerWebSocketPrototype__unsubscribeCallback); @@ -14693,11 +14702,13 @@ STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSServerWebSocketPrototype, JSServerWebSocke static const HashTableValue JSServerWebSocketPrototypeTableValues[] = { { "binaryType"_s, static_cast<unsigned>(JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::GetterSetterType, ServerWebSocketPrototype__binaryTypeGetterWrap, ServerWebSocketPrototype__binaryTypeSetterWrap } }, - { "close"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__closeCallback, 1 } }, + { "close"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__closeCallback, 3 } }, { "cork"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__corkCallback, 1 } }, { "data"_s, static_cast<unsigned>(JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::GetterSetterType, ServerWebSocketPrototype__dataGetterWrap, ServerWebSocketPrototype__dataSetterWrap } }, { "getBufferedAmount"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__getBufferedAmountCallback, 0 } }, { "isSubscribed"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__isSubscribedCallback, 1 } }, + { "ping"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__pingCallback, 1 } }, + { "pong"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__pongCallback, 1 } }, { "publish"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__publishCallback, 3 } }, { "publishBinary"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::DOMJITFunction | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::DOMJITFunctionType, ServerWebSocketPrototype__publishBinaryCallback, &DOMJITSignatureForServerWebSocketPrototype__publishBinary } }, { "publishText"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::DOMJITFunction | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::DOMJITFunctionType, ServerWebSocketPrototype__publishTextCallback, &DOMJITSignatureForServerWebSocketPrototype__publishText } }, @@ -14707,6 +14718,7 @@ static const HashTableValue JSServerWebSocketPrototypeTableValues[] = { { "sendBinary"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::DOMJITFunction | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::DOMJITFunctionType, ServerWebSocketPrototype__sendBinaryCallback, &DOMJITSignatureForServerWebSocketPrototype__sendBinary } }, { "sendText"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::DOMJITFunction | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::DOMJITFunctionType, ServerWebSocketPrototype__sendTextCallback, &DOMJITSignatureForServerWebSocketPrototype__sendText } }, { "subscribe"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__subscribeCallback, 1 } }, + { "terminate"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__terminateCallback, 0 } }, { "unsubscribe"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | PropertyAttribute::DontDelete), NoIntrinsic, { HashTableValue::NativeFunctionType, ServerWebSocketPrototype__unsubscribeCallback, 1 } } }; @@ -14897,6 +14909,60 @@ JSC_DEFINE_HOST_FUNCTION(ServerWebSocketPrototype__isSubscribedCallback, (JSGlob return ServerWebSocketPrototype__isSubscribed(thisObject->wrapped(), lexicalGlobalObject, callFrame); } +JSC_DEFINE_HOST_FUNCTION(ServerWebSocketPrototype__pingCallback, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + auto& vm = lexicalGlobalObject->vm(); + + JSServerWebSocket* thisObject = jsDynamicCast<JSServerWebSocket*>(callFrame->thisValue()); + + if (UNLIKELY(!thisObject)) { + auto throwScope = DECLARE_THROW_SCOPE(vm); + return throwVMTypeError(lexicalGlobalObject, throwScope); + } + + JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject); + +#ifdef BUN_DEBUG + /** View the file name of the JS file that called this function + * from a debugger */ + SourceOrigin sourceOrigin = callFrame->callerSourceOrigin(vm); + const char* fileName = sourceOrigin.string().utf8().data(); + static const char* lastFileName = nullptr; + if (lastFileName != fileName) { + lastFileName = fileName; + } +#endif + + return ServerWebSocketPrototype__ping(thisObject->wrapped(), lexicalGlobalObject, callFrame); +} + +JSC_DEFINE_HOST_FUNCTION(ServerWebSocketPrototype__pongCallback, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + auto& vm = lexicalGlobalObject->vm(); + + JSServerWebSocket* thisObject = jsDynamicCast<JSServerWebSocket*>(callFrame->thisValue()); + + if (UNLIKELY(!thisObject)) { + auto throwScope = DECLARE_THROW_SCOPE(vm); + return throwVMTypeError(lexicalGlobalObject, throwScope); + } + + JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject); + +#ifdef BUN_DEBUG + /** View the file name of the JS file that called this function + * from a debugger */ + SourceOrigin sourceOrigin = callFrame->callerSourceOrigin(vm); + const char* fileName = sourceOrigin.string().utf8().data(); + static const char* lastFileName = nullptr; + if (lastFileName != fileName) { + lastFileName = fileName; + } +#endif + + return ServerWebSocketPrototype__pong(thisObject->wrapped(), lexicalGlobalObject, callFrame); +} + JSC_DEFINE_HOST_FUNCTION(ServerWebSocketPrototype__publishCallback, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) { auto& vm = lexicalGlobalObject->vm(); @@ -15129,6 +15195,33 @@ JSC_DEFINE_HOST_FUNCTION(ServerWebSocketPrototype__subscribeCallback, (JSGlobalO return ServerWebSocketPrototype__subscribe(thisObject->wrapped(), lexicalGlobalObject, callFrame); } +JSC_DEFINE_HOST_FUNCTION(ServerWebSocketPrototype__terminateCallback, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + auto& vm = lexicalGlobalObject->vm(); + + JSServerWebSocket* thisObject = jsDynamicCast<JSServerWebSocket*>(callFrame->thisValue()); + + if (UNLIKELY(!thisObject)) { + auto throwScope = DECLARE_THROW_SCOPE(vm); + return throwVMTypeError(lexicalGlobalObject, throwScope); + } + + JSC::EnsureStillAliveScope thisArg = JSC::EnsureStillAliveScope(thisObject); + +#ifdef BUN_DEBUG + /** View the file name of the JS file that called this function + * from a debugger */ + SourceOrigin sourceOrigin = callFrame->callerSourceOrigin(vm); + const char* fileName = sourceOrigin.string().utf8().data(); + static const char* lastFileName = nullptr; + if (lastFileName != fileName) { + lastFileName = fileName; + } +#endif + + return ServerWebSocketPrototype__terminate(thisObject->wrapped(), lexicalGlobalObject, callFrame); +} + JSC_DEFINE_HOST_FUNCTION(ServerWebSocketPrototype__unsubscribeCallback, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) { auto& vm = lexicalGlobalObject->vm(); diff --git a/src/bun.js/bindings/generated_classes.zig b/src/bun.js/bindings/generated_classes.zig index 171bba792..5f83630c5 100644 --- a/src/bun.js/bindings/generated_classes.zig +++ b/src/bun.js/bindings/generated_classes.zig @@ -3834,6 +3834,10 @@ pub const JSServerWebSocket = struct { @compileLog("Expected ServerWebSocket.getBufferedAmount to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.getBufferedAmount))); if (@TypeOf(ServerWebSocket.isSubscribed) != CallbackType) @compileLog("Expected ServerWebSocket.isSubscribed to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.isSubscribed))); + if (@TypeOf(ServerWebSocket.ping) != CallbackType) + @compileLog("Expected ServerWebSocket.ping to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.ping))); + if (@TypeOf(ServerWebSocket.pong) != CallbackType) + @compileLog("Expected ServerWebSocket.pong to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.pong))); if (@TypeOf(ServerWebSocket.publish) != CallbackType) @compileLog("Expected ServerWebSocket.publish to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.publish))); if (@TypeOf(ServerWebSocket.publishBinaryWithoutTypeChecks) != fn (*ServerWebSocket, *JSC.JSGlobalObject, *JSC.JSString, *JSC.JSUint8Array) callconv(.C) JSC.JSValue) @@ -3862,6 +3866,8 @@ pub const JSServerWebSocket = struct { @compileLog("Expected ServerWebSocket.sendText to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.sendText))); if (@TypeOf(ServerWebSocket.subscribe) != CallbackType) @compileLog("Expected ServerWebSocket.subscribe to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.subscribe))); + if (@TypeOf(ServerWebSocket.terminate) != CallbackType) + @compileLog("Expected ServerWebSocket.terminate to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.terminate))); if (@TypeOf(ServerWebSocket.unsubscribe) != CallbackType) @compileLog("Expected ServerWebSocket.unsubscribe to be a callback but received " ++ @typeName(@TypeOf(ServerWebSocket.unsubscribe))); if (!JSC.is_bindgen) { @@ -3875,6 +3881,8 @@ pub const JSServerWebSocket = struct { @export(ServerWebSocket.getReadyState, .{ .name = "ServerWebSocketPrototype__getReadyState" }); @export(ServerWebSocket.getRemoteAddress, .{ .name = "ServerWebSocketPrototype__getRemoteAddress" }); @export(ServerWebSocket.isSubscribed, .{ .name = "ServerWebSocketPrototype__isSubscribed" }); + @export(ServerWebSocket.ping, .{ .name = "ServerWebSocketPrototype__ping" }); + @export(ServerWebSocket.pong, .{ .name = "ServerWebSocketPrototype__pong" }); @export(ServerWebSocket.publish, .{ .name = "ServerWebSocketPrototype__publish" }); @export(ServerWebSocket.publishBinary, .{ .name = "ServerWebSocketPrototype__publishBinary" }); @export(ServerWebSocket.publishBinaryWithoutTypeChecks, .{ .name = "ServerWebSocketPrototype__publishBinaryWithoutTypeChecks" }); @@ -3888,6 +3896,7 @@ pub const JSServerWebSocket = struct { @export(ServerWebSocket.setBinaryType, .{ .name = "ServerWebSocketPrototype__setBinaryType" }); @export(ServerWebSocket.setData, .{ .name = "ServerWebSocketPrototype__setData" }); @export(ServerWebSocket.subscribe, .{ .name = "ServerWebSocketPrototype__subscribe" }); + @export(ServerWebSocket.terminate, .{ .name = "ServerWebSocketPrototype__terminate" }); @export(ServerWebSocket.unsubscribe, .{ .name = "ServerWebSocketPrototype__unsubscribe" }); } } diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index f507121f8..1a6a40c9f 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -32,113 +32,113 @@ typedef void* JSClassRef; #include "JavaScriptCore/JSClassRef.h" #endif #include "headers-handwritten.h" - typedef struct bJSC__ThrowScope { unsigned char bytes[8]; } bJSC__ThrowScope; - typedef char* bJSC__ThrowScope_buf; - typedef struct bJSC__Exception { unsigned char bytes[40]; } bJSC__Exception; - typedef char* bJSC__Exception_buf; - typedef struct bJSC__VM { unsigned char bytes[52176]; } bJSC__VM; - typedef char* bJSC__VM_buf; - typedef struct bJSC__JSString { unsigned char bytes[16]; } bJSC__JSString; - typedef char* bJSC__JSString_buf; - typedef struct bJSC__JSGlobalObject { unsigned char bytes[3128]; } bJSC__JSGlobalObject; - typedef char* bJSC__JSGlobalObject_buf; - typedef struct bJSC__JSCell { unsigned char bytes[8]; } bJSC__JSCell; - typedef char* bJSC__JSCell_buf; - typedef struct bJSC__JSInternalPromise { unsigned char bytes[32]; } bJSC__JSInternalPromise; - typedef char* bJSC__JSInternalPromise_buf; typedef struct bJSC__JSPromise { unsigned char bytes[32]; } bJSC__JSPromise; typedef char* bJSC__JSPromise_buf; + typedef struct bJSC__JSCell { unsigned char bytes[8]; } bJSC__JSCell; + typedef char* bJSC__JSCell_buf; + typedef struct bJSC__Exception { unsigned char bytes[40]; } bJSC__Exception; + typedef char* bJSC__Exception_buf; typedef struct bJSC__JSObject { unsigned char bytes[16]; } bJSC__JSObject; typedef char* bJSC__JSObject_buf; + typedef struct bJSC__ThrowScope { unsigned char bytes[8]; } bJSC__ThrowScope; + typedef char* bJSC__ThrowScope_buf; typedef struct bJSC__CatchScope { unsigned char bytes[8]; } bJSC__CatchScope; typedef char* bJSC__CatchScope_buf; + typedef struct bJSC__JSString { unsigned char bytes[16]; } bJSC__JSString; + typedef char* bJSC__JSString_buf; + typedef struct bJSC__JSInternalPromise { unsigned char bytes[32]; } bJSC__JSInternalPromise; + typedef char* bJSC__JSInternalPromise_buf; + typedef struct bJSC__JSGlobalObject { unsigned char bytes[3128]; } bJSC__JSGlobalObject; + typedef char* bJSC__JSGlobalObject_buf; + typedef struct bJSC__VM { unsigned char bytes[52176]; } bJSC__VM; + typedef char* bJSC__VM_buf; #ifndef __cplusplus - typedef bJSC__CatchScope JSC__CatchScope; // JSC::CatchScope - typedef ErrorableResolvedSource ErrorableResolvedSource; + typedef Bun__ArrayBuffer Bun__ArrayBuffer; + typedef bJSC__JSString JSC__JSString; // JSC::JSString typedef BunString BunString; - typedef ErrorableString ErrorableString; - typedef bJSC__ThrowScope JSC__ThrowScope; // JSC::ThrowScope - typedef bJSC__JSObject JSC__JSObject; // JSC::JSObject - typedef WebSocketClient WebSocketClient; - typedef struct WebCore__AbortSignal WebCore__AbortSignal; // WebCore::AbortSignal - typedef struct JSC__JSMap JSC__JSMap; // JSC::JSMap - typedef WebSocketHTTPSClient WebSocketHTTPSClient; - typedef JSClassRef JSClassRef; + typedef int64_t JSC__JSValue; + typedef ZigString ZigString; + typedef struct WebCore__DOMFormData WebCore__DOMFormData; // WebCore::DOMFormData + typedef struct WebCore__DOMURL WebCore__DOMURL; // WebCore::DOMURL + typedef struct WebCore__FetchHeaders WebCore__FetchHeaders; // WebCore::FetchHeaders + typedef ErrorableResolvedSource ErrorableResolvedSource; + typedef bJSC__JSPromise JSC__JSPromise; // JSC::JSPromise typedef bJSC__VM JSC__VM; // JSC::VM - typedef Bun__ArrayBuffer Bun__ArrayBuffer; - typedef Uint8Array_alias Uint8Array_alias; - typedef WebSocketClientTLS WebSocketClientTLS; - typedef bJSC__JSGlobalObject JSC__JSGlobalObject; // JSC::JSGlobalObject + typedef bJSC__CatchScope JSC__CatchScope; // JSC::CatchScope typedef ZigException ZigException; - typedef bJSC__JSPromise JSC__JSPromise; // JSC::JSPromise + typedef struct JSC__CallFrame JSC__CallFrame; // JSC::CallFrame + typedef bJSC__ThrowScope JSC__ThrowScope; // JSC::ThrowScope + typedef bJSC__Exception JSC__Exception; // JSC::Exception typedef WebSocketHTTPClient WebSocketHTTPClient; + typedef WebSocketClient WebSocketClient; + typedef WebSocketClientTLS WebSocketClientTLS; + typedef ErrorableString ErrorableString; + typedef bJSC__JSObject JSC__JSObject; // JSC::JSObject + typedef struct JSC__JSMap JSC__JSMap; // JSC::JSMap typedef SystemError SystemError; + typedef Uint8Array_alias Uint8Array_alias; typedef bJSC__JSCell JSC__JSCell; // JSC::JSCell - typedef ZigString ZigString; - typedef struct WebCore__DOMURL WebCore__DOMURL; // WebCore::DOMURL - typedef int64_t JSC__JSValue; + typedef bJSC__JSGlobalObject JSC__JSGlobalObject; // JSC::JSGlobalObject + typedef struct WebCore__AbortSignal WebCore__AbortSignal; // WebCore::AbortSignal + typedef JSClassRef JSClassRef; typedef bJSC__JSInternalPromise JSC__JSInternalPromise; // JSC::JSInternalPromise - typedef bJSC__Exception JSC__Exception; // JSC::Exception - typedef bJSC__JSString JSC__JSString; // JSC::JSString - typedef struct WebCore__DOMFormData WebCore__DOMFormData; // WebCore::DOMFormData - typedef struct JSC__CallFrame JSC__CallFrame; // JSC::CallFrame - typedef struct WebCore__FetchHeaders WebCore__FetchHeaders; // WebCore::FetchHeaders + typedef WebSocketHTTPSClient WebSocketHTTPSClient; #endif #ifdef __cplusplus namespace JSC { - class JSMap; - class JSCell; - class JSObject; class JSGlobalObject; - class JSPromise; class Exception; - class JSString; + class JSObject; class JSInternalPromise; + class JSString; + class JSCell; + class JSMap; + class JSPromise; class CatchScope; class VM; - class CallFrame; class ThrowScope; + class CallFrame; } namespace WebCore { + class FetchHeaders; class DOMFormData; - class DOMURL; class AbortSignal; - class FetchHeaders; + class DOMURL; } - typedef ErrorableResolvedSource ErrorableResolvedSource; - typedef BunString BunString; - typedef ErrorableString ErrorableString; - typedef WebSocketClient WebSocketClient; - typedef WebSocketHTTPSClient WebSocketHTTPSClient; - typedef JSClassRef JSClassRef; typedef Bun__ArrayBuffer Bun__ArrayBuffer; - typedef Uint8Array_alias Uint8Array_alias; - typedef WebSocketClientTLS WebSocketClientTLS; + typedef BunString BunString; + typedef int64_t JSC__JSValue; + typedef ZigString ZigString; + typedef ErrorableResolvedSource ErrorableResolvedSource; typedef ZigException ZigException; typedef WebSocketHTTPClient WebSocketHTTPClient; + typedef WebSocketClient WebSocketClient; + typedef WebSocketClientTLS WebSocketClientTLS; + typedef ErrorableString ErrorableString; typedef SystemError SystemError; - typedef ZigString ZigString; - typedef int64_t JSC__JSValue; - using JSC__JSMap = JSC::JSMap; - using JSC__JSCell = JSC::JSCell; - using JSC__JSObject = JSC::JSObject; + typedef Uint8Array_alias Uint8Array_alias; + typedef JSClassRef JSClassRef; + typedef WebSocketHTTPSClient WebSocketHTTPSClient; using JSC__JSGlobalObject = JSC::JSGlobalObject; - using JSC__JSPromise = JSC::JSPromise; using JSC__Exception = JSC::Exception; - using JSC__JSString = JSC::JSString; + using JSC__JSObject = JSC::JSObject; using JSC__JSInternalPromise = JSC::JSInternalPromise; + using JSC__JSString = JSC::JSString; + using JSC__JSCell = JSC::JSCell; + using JSC__JSMap = JSC::JSMap; + using JSC__JSPromise = JSC::JSPromise; using JSC__CatchScope = JSC::CatchScope; using JSC__VM = JSC::VM; - using JSC__CallFrame = JSC::CallFrame; using JSC__ThrowScope = JSC::ThrowScope; + using JSC__CallFrame = JSC::CallFrame; + using WebCore__FetchHeaders = WebCore::FetchHeaders; using WebCore__DOMFormData = WebCore::DOMFormData; - using WebCore__DOMURL = WebCore::DOMURL; using WebCore__AbortSignal = WebCore::AbortSignal; - using WebCore__FetchHeaders = WebCore::FetchHeaders; + using WebCore__DOMURL = WebCore::DOMURL; #endif @@ -734,23 +734,25 @@ ZIG_DECL void Bun__WebSocketHTTPSClient__register(JSC__JSGlobalObject* arg0, voi #ifdef __cplusplus +ZIG_DECL void Bun__WebSocketClient__cancel(WebSocketClient* arg0); ZIG_DECL void Bun__WebSocketClient__close(WebSocketClient* arg0, uint16_t arg1, const ZigString* arg2); ZIG_DECL void Bun__WebSocketClient__finalize(WebSocketClient* arg0); ZIG_DECL void* Bun__WebSocketClient__init(CppWebSocket* arg0, void* arg1, void* arg2, JSC__JSGlobalObject* arg3, unsigned char* arg4, size_t arg5); ZIG_DECL void Bun__WebSocketClient__register(JSC__JSGlobalObject* arg0, void* arg1, void* arg2); -ZIG_DECL void Bun__WebSocketClient__writeBinaryData(WebSocketClient* arg0, const unsigned char* arg1, size_t arg2); -ZIG_DECL void Bun__WebSocketClient__writeString(WebSocketClient* arg0, const ZigString* arg1); +ZIG_DECL void Bun__WebSocketClient__writeBinaryData(WebSocketClient* arg0, const unsigned char* arg1, size_t arg2, unsigned char arg3); +ZIG_DECL void Bun__WebSocketClient__writeString(WebSocketClient* arg0, const ZigString* arg1, unsigned char arg2); #endif #ifdef __cplusplus +ZIG_DECL void Bun__WebSocketClientTLS__cancel(WebSocketClientTLS* arg0); ZIG_DECL void Bun__WebSocketClientTLS__close(WebSocketClientTLS* arg0, uint16_t arg1, const ZigString* arg2); ZIG_DECL void Bun__WebSocketClientTLS__finalize(WebSocketClientTLS* arg0); ZIG_DECL void* Bun__WebSocketClientTLS__init(CppWebSocket* arg0, void* arg1, void* arg2, JSC__JSGlobalObject* arg3, unsigned char* arg4, size_t arg5); ZIG_DECL void Bun__WebSocketClientTLS__register(JSC__JSGlobalObject* arg0, void* arg1, void* arg2); -ZIG_DECL void Bun__WebSocketClientTLS__writeBinaryData(WebSocketClientTLS* arg0, const unsigned char* arg1, size_t arg2); -ZIG_DECL void Bun__WebSocketClientTLS__writeString(WebSocketClientTLS* arg0, const ZigString* arg1); +ZIG_DECL void Bun__WebSocketClientTLS__writeBinaryData(WebSocketClientTLS* arg0, const unsigned char* arg1, size_t arg2, unsigned char arg3); +ZIG_DECL void Bun__WebSocketClientTLS__writeString(WebSocketClientTLS* arg0, const ZigString* arg1, unsigned char arg2); #endif diff --git a/src/bun.js/bindings/headers.zig b/src/bun.js/bindings/headers.zig index 666369b21..be2e4a626 100644 --- a/src/bun.js/bindings/headers.zig +++ b/src/bun.js/bindings/headers.zig @@ -72,15 +72,15 @@ pub const WebSocketHTTPSClient = bindings.WebSocketHTTPSClient; pub const WebSocketClient = bindings.WebSocketClient; pub const WebSocketClientTLS = bindings.WebSocketClientTLS; pub const BunString = @import("root").bun.String; +pub const JSC__JSString = bJSC__JSString; +pub const JSC__JSPromise = bJSC__JSPromise; +pub const JSC__VM = bJSC__VM; pub const JSC__ThrowScope = bJSC__ThrowScope; +pub const JSC__Exception = bJSC__Exception; pub const JSC__JSObject = bJSC__JSObject; -pub const JSC__VM = bJSC__VM; -pub const JSC__JSGlobalObject = bJSC__JSGlobalObject; -pub const JSC__JSPromise = bJSC__JSPromise; pub const JSC__JSCell = bJSC__JSCell; +pub const JSC__JSGlobalObject = bJSC__JSGlobalObject; pub const JSC__JSInternalPromise = bJSC__JSInternalPromise; -pub const JSC__Exception = bJSC__Exception; -pub const JSC__JSString = bJSC__JSString; pub extern fn JSC__JSObject__create(arg0: *bindings.JSGlobalObject, arg1: usize, arg2: ?*anyopaque, ArgFn3: ?*const fn (?*anyopaque, [*c]bindings.JSObject, *bindings.JSGlobalObject) callconv(.C) void) JSC__JSValue; pub extern fn JSC__JSObject__getArrayLength(arg0: [*c]bindings.JSObject) usize; pub extern fn JSC__JSObject__getDirect(arg0: [*c]bindings.JSObject, arg1: *bindings.JSGlobalObject, arg2: [*c]const ZigString) JSC__JSValue; diff --git a/src/bun.js/bindings/webcore/JSWebSocket.cpp b/src/bun.js/bindings/webcore/JSWebSocket.cpp index de5c644e7..7b099544b 100644 --- a/src/bun.js/bindings/webcore/JSWebSocket.cpp +++ b/src/bun.js/bindings/webcore/JSWebSocket.cpp @@ -71,6 +71,9 @@ using namespace JSC; static JSC_DECLARE_HOST_FUNCTION(jsWebSocketPrototypeFunction_send); static JSC_DECLARE_HOST_FUNCTION(jsWebSocketPrototypeFunction_close); +static JSC_DECLARE_HOST_FUNCTION(jsWebSocketPrototypeFunction_ping); +static JSC_DECLARE_HOST_FUNCTION(jsWebSocketPrototypeFunction_pong); +static JSC_DECLARE_HOST_FUNCTION(jsWebSocketPrototypeFunction_terminate); // Attributes @@ -293,7 +296,7 @@ template<> void JSWebSocketDOMConstructor::initializeProperties(VM& vm, JSDOMGlo static const HashTableValue JSWebSocketPrototypeTableValues[] = { { "constructor"_s, static_cast<unsigned>(JSC::PropertyAttribute::DontEnum), NoIntrinsic, { HashTableValue::GetterSetterType, jsWebSocketConstructor, 0 } }, - { "URL"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsWebSocket_URL, 0 } }, + { "URL"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute | JSC::PropertyAttribute::DontEnum), NoIntrinsic, { HashTableValue::GetterSetterType, jsWebSocket_URL, 0 } }, { "url"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsWebSocket_url, 0 } }, { "readyState"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsWebSocket_readyState, 0 } }, { "bufferedAmount"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsWebSocket_bufferedAmount, 0 } }, @@ -306,6 +309,9 @@ static const HashTableValue JSWebSocketPrototypeTableValues[] = { { "binaryType"_s, static_cast<unsigned>(JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DOMAttribute), NoIntrinsic, { HashTableValue::GetterSetterType, jsWebSocket_binaryType, setJSWebSocket_binaryType } }, { "send"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsWebSocketPrototypeFunction_send, 1 } }, { "close"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsWebSocketPrototypeFunction_close, 0 } }, + { "ping"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsWebSocketPrototypeFunction_ping, 1 } }, + { "pong"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsWebSocketPrototypeFunction_pong, 1 } }, + { "terminate"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsWebSocketPrototypeFunction_terminate, 0 } }, { "CONNECTING"_s, JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::ConstantInteger, NoIntrinsic, { HashTableValue::ConstantType, 0 } }, { "OPEN"_s, JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::ConstantInteger, NoIntrinsic, { HashTableValue::ConstantType, 1 } }, { "CLOSING"_s, JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::ConstantInteger, NoIntrinsic, { HashTableValue::ConstantType, 2 } }, @@ -672,6 +678,199 @@ JSC_DEFINE_HOST_FUNCTION(jsWebSocketPrototypeFunction_close, (JSGlobalObject * l return IDLOperation<JSWebSocket>::call<jsWebSocketPrototypeFunction_closeBody>(*lexicalGlobalObject, *callFrame, "close"); } +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_ping1Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.ping(); }))); +} + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_ping2Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); + auto data = convert<IDLArrayBuffer>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data", "WebSocket", "ping", "ArrayBuffer"); }); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.ping(*data); }))); +} + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_ping3Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); + auto data = convert<IDLArrayBufferView>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data", "WebSocket", "ping", "ArrayBufferView"); }); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.ping(data.releaseNonNull()); }))); +} + +// static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_ping4Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +// { +// auto& vm = JSC::getVM(lexicalGlobalObject); +// auto throwScope = DECLARE_THROW_SCOPE(vm); +// UNUSED_PARAM(throwScope); +// UNUSED_PARAM(callFrame); +// auto& impl = castedThis->wrapped(); +// EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); +// auto data = convert<IDLInterface<Blob>>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data", "WebSocket", "ping", "Blob"); }); +// RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); +// RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.ping(*data); }))); +// } + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_ping5Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); + auto data = convert<IDLUSVString>(*lexicalGlobalObject, argument0.value()); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.ping(WTFMove(data)); }))); +} + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pingOverloadDispatcher(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + size_t argsCount = std::min<size_t>(1, callFrame->argumentCount()); + if (argsCount == 0) { + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping1Body(lexicalGlobalObject, callFrame, castedThis))); + } else if (argsCount == 1) { + JSValue distinguishingArg = callFrame->uncheckedArgument(0); + if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits<JSArrayBuffer>()) + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping2Body(lexicalGlobalObject, callFrame, castedThis))); + if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits<JSArrayBufferView>()) + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping3Body(lexicalGlobalObject, callFrame, castedThis))); + // if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits<JSBlob>()) + // RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping4Body(lexicalGlobalObject, callFrame, castedThis))); + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping5Body(lexicalGlobalObject, callFrame, castedThis))); + } + return throwVMTypeError(lexicalGlobalObject, throwScope); +} + +JSC_DEFINE_HOST_FUNCTION(jsWebSocketPrototypeFunction_ping, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + return IDLOperation<JSWebSocket>::call<jsWebSocketPrototypeFunction_pingOverloadDispatcher>(*lexicalGlobalObject, *callFrame, "ping"); +} + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pong1Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.pong(); }))); +} + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pong2Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); + auto data = convert<IDLArrayBuffer>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data", "WebSocket", "pong", "ArrayBuffer"); }); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.pong(*data); }))); +} + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pong3Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); + auto data = convert<IDLArrayBufferView>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data", "WebSocket", "pong", "ArrayBufferView"); }); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.pong(data.releaseNonNull()); }))); +} + +// static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pong4Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +// { +// auto& vm = JSC::getVM(lexicalGlobalObject); +// auto throwScope = DECLARE_THROW_SCOPE(vm); +// UNUSED_PARAM(throwScope); +// UNUSED_PARAM(callFrame); +// auto& impl = castedThis->wrapped(); +// EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); +// auto data = convert<IDLInterface<Blob>>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data", "WebSocket", "pong", "Blob"); }); +// RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); +// RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.pong(*data); }))); +// } + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pong5Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); + auto data = convert<IDLUSVString>(*lexicalGlobalObject, argument0.value()); + RETURN_IF_EXCEPTION(throwScope, encodedJSValue()); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.pong(WTFMove(data)); }))); +} + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pongOverloadDispatcher(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + size_t argsCount = std::min<size_t>(1, callFrame->argumentCount()); + if (argsCount == 0) { + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong1Body(lexicalGlobalObject, callFrame, castedThis))); + } else if (argsCount == 1) { + JSValue distinguishingArg = callFrame->uncheckedArgument(0); + if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits<JSArrayBuffer>()) + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong2Body(lexicalGlobalObject, callFrame, castedThis))); + if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits<JSArrayBufferView>()) + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong3Body(lexicalGlobalObject, callFrame, castedThis))); + // if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits<JSBlob>()) + // RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong4Body(lexicalGlobalObject, callFrame, castedThis))); + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong5Body(lexicalGlobalObject, callFrame, castedThis))); + } + return throwVMTypeError(lexicalGlobalObject, throwScope); +} + +JSC_DEFINE_HOST_FUNCTION(jsWebSocketPrototypeFunction_pong, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + return IDLOperation<JSWebSocket>::call<jsWebSocketPrototypeFunction_pongOverloadDispatcher>(*lexicalGlobalObject, *callFrame, "pong"); +} + +static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_terminateBody(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation<JSWebSocket>::ClassParameter castedThis) +{ + auto& vm = JSC::getVM(lexicalGlobalObject); + auto throwScope = DECLARE_THROW_SCOPE(vm); + UNUSED_PARAM(throwScope); + UNUSED_PARAM(callFrame); + auto& impl = castedThis->wrapped(); + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS<IDLUndefined>(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.terminate(); }))); +} + +JSC_DEFINE_HOST_FUNCTION(jsWebSocketPrototypeFunction_terminate, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +{ + return IDLOperation<JSWebSocket>::call<jsWebSocketPrototypeFunction_terminateBody>(*lexicalGlobalObject, *callFrame, "terminate"); +} + JSC::GCClient::IsoSubspace* JSWebSocket::subspaceForImpl(JSC::VM& vm) { return WebCore::subspaceForImpl<JSWebSocket, UseCustomHeapCellType::No>( diff --git a/src/bun.js/bindings/webcore/WebSocket.cpp b/src/bun.js/bindings/webcore/WebSocket.cpp index 1d6392f44..c1a4054f5 100644 --- a/src/bun.js/bindings/webcore/WebSocket.cpp +++ b/src/bun.js/bindings/webcore/WebSocket.cpp @@ -458,8 +458,7 @@ ExceptionOr<void> WebSocket::send(const String& message) return {}; } - // 0-length is allowed - this->sendWebSocketString(message); + this->sendWebSocketString(message, Opcode::Text); return {}; } @@ -477,8 +476,8 @@ ExceptionOr<void> WebSocket::send(ArrayBuffer& binaryData) } char* data = static_cast<char*>(binaryData.data()); size_t length = binaryData.byteLength(); - // 0-length is allowed - this->sendWebSocketData(data, length); + this->sendWebSocketData(data, length, Opcode::Binary); + return {}; } @@ -498,8 +497,7 @@ ExceptionOr<void> WebSocket::send(ArrayBufferView& arrayBufferView) auto buffer = arrayBufferView.unsharedBuffer().get(); char* baseAddress = reinterpret_cast<char*>(buffer->data()) + arrayBufferView.byteOffset(); size_t length = arrayBufferView.byteLength(); - // 0-length is allowed - this->sendWebSocketData(baseAddress, length); + this->sendWebSocketData(baseAddress, length, Opcode::Binary); return {}; } @@ -521,17 +519,17 @@ ExceptionOr<void> WebSocket::send(ArrayBufferView& arrayBufferView) // return {}; // } -void WebSocket::sendWebSocketData(const char* baseAddress, size_t length) +void WebSocket::sendWebSocketData(const char* baseAddress, size_t length, const Opcode op) { switch (m_connectedWebSocketKind) { case ConnectedWebSocketKind::Client: { - Bun__WebSocketClient__writeBinaryData(this->m_connectedWebSocket.client, reinterpret_cast<const unsigned char*>(baseAddress), length); + Bun__WebSocketClient__writeBinaryData(this->m_connectedWebSocket.client, reinterpret_cast<const unsigned char*>(baseAddress), length, static_cast<uint8_t>(op)); // this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode); // this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount(); break; } case ConnectedWebSocketKind::ClientSSL: { - Bun__WebSocketClientTLS__writeBinaryData(this->m_connectedWebSocket.clientSSL, reinterpret_cast<const unsigned char*>(baseAddress), length); + Bun__WebSocketClientTLS__writeBinaryData(this->m_connectedWebSocket.clientSSL, reinterpret_cast<const unsigned char*>(baseAddress), length, static_cast<uint8_t>(op)); break; } // case ConnectedWebSocketKind::Server: { @@ -550,19 +548,19 @@ void WebSocket::sendWebSocketData(const char* baseAddress, size_t length) } } -void WebSocket::sendWebSocketString(const String& message) +void WebSocket::sendWebSocketString(const String& message, const Opcode op) { switch (m_connectedWebSocketKind) { case ConnectedWebSocketKind::Client: { auto zigStr = Zig::toZigString(message); - Bun__WebSocketClient__writeString(this->m_connectedWebSocket.client, &zigStr); + Bun__WebSocketClient__writeString(this->m_connectedWebSocket.client, &zigStr, static_cast<uint8_t>(op)); // this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode); // this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount(); break; } case ConnectedWebSocketKind::ClientSSL: { auto zigStr = Zig::toZigString(message); - Bun__WebSocketClientTLS__writeString(this->m_connectedWebSocket.clientSSL, &zigStr); + Bun__WebSocketClientTLS__writeString(this->m_connectedWebSocket.clientSSL, &zigStr, static_cast<uint8_t>(op)); break; } // case ConnectedWebSocketKind::Server: { @@ -586,8 +584,8 @@ void WebSocket::sendWebSocketString(const String& message) ExceptionOr<void> WebSocket::close(std::optional<unsigned short> optionalCode, const String& reason) { - int code = optionalCode ? optionalCode.value() : static_cast<int>(0); - if (code == 0) { + int code = optionalCode ? optionalCode.value() : static_cast<int>(1000); + if (code == 1000) { // LOG(Network, "WebSocket %p close() without code and reason", this); } else { // LOG(Network, "WebSocket %p close() code=%d reason='%s'", this, code, reason.utf8().data()); @@ -650,6 +648,211 @@ ExceptionOr<void> WebSocket::close(std::optional<unsigned short> optionalCode, c return {}; } +ExceptionOr<void> WebSocket::terminate() +{ + LOG(Network, "WebSocket %p terminate()", this); + + if (m_state == CLOSING || m_state == CLOSED) + return {}; + if (m_state == CONNECTING) { + m_state = CLOSING; + if (m_upgradeClient != nullptr) { + void* upgradeClient = m_upgradeClient; + m_upgradeClient = nullptr; + if (m_isSecure) { + Bun__WebSocketHTTPSClient__cancel(upgradeClient); + } else { + Bun__WebSocketHTTPClient__cancel(upgradeClient); + } + } + updateHasPendingActivity(); + return {}; + } + m_state = CLOSING; + switch (m_connectedWebSocketKind) { + case ConnectedWebSocketKind::Client: { + Bun__WebSocketClient__cancel(this->m_connectedWebSocket.client); + updateHasPendingActivity(); + break; + } + case ConnectedWebSocketKind::ClientSSL: { + Bun__WebSocketClientTLS__cancel(this->m_connectedWebSocket.clientSSL); + updateHasPendingActivity(); + break; + } + default: { + break; + } + } + this->m_connectedWebSocketKind = ConnectedWebSocketKind::None; + updateHasPendingActivity(); + return {}; +} + +ExceptionOr<void> WebSocket::ping() +{ + auto message = WTF::String::number(WTF::jsCurrentTime()); + LOG(Network, "WebSocket %p ping() Sending Timestamp '%s'", this, message.data()); + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + + // No exception is raised if the connection was once established but has subsequently been closed. + if (m_state == CLOSING || m_state == CLOSED) { + size_t payloadSize = message.length(); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); + return {}; + } + + this->sendWebSocketString(message, Opcode::Ping); + + return {}; +} + +ExceptionOr<void> WebSocket::ping(const String& message) +{ + LOG(Network, "WebSocket %p ping() Sending String '%s'", this, message.utf8().data()); + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + + // No exception is raised if the connection was once established but has subsequently been closed. + if (m_state == CLOSING || m_state == CLOSED) { + auto utf8 = message.utf8(StrictConversionReplacingUnpairedSurrogatesWithFFFD); + size_t payloadSize = utf8.length(); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); + return {}; + } + + this->sendWebSocketString(message, Opcode::Ping); + + return {}; +} + +ExceptionOr<void> WebSocket::ping(ArrayBuffer& binaryData) +{ + LOG(Network, "WebSocket %p ping() Sending ArrayBuffer %p", this, &binaryData); + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + + if (m_state == CLOSING || m_state == CLOSED) { + unsigned payloadSize = binaryData.byteLength(); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); + return {}; + } + + char* data = static_cast<char*>(binaryData.data()); + size_t length = binaryData.byteLength(); + this->sendWebSocketData(data, length, Opcode::Ping); + + return {}; +} + +ExceptionOr<void> WebSocket::ping(ArrayBufferView& arrayBufferView) +{ + LOG(Network, "WebSocket %p ping() Sending ArrayBufferView %p", this, &arrayBufferView); + + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + + if (m_state == CLOSING || m_state == CLOSED) { + unsigned payloadSize = arrayBufferView.byteLength(); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); + return {}; + } + + auto buffer = arrayBufferView.unsharedBuffer().get(); + char* baseAddress = reinterpret_cast<char*>(buffer->data()) + arrayBufferView.byteOffset(); + size_t length = arrayBufferView.byteLength(); + this->sendWebSocketData(baseAddress, length, Opcode::Ping); + + return {}; +} + +ExceptionOr<void> WebSocket::pong() +{ + auto message = WTF::String::number(WTF::jsCurrentTime()); + LOG(Network, "WebSocket %p pong() Sending Timestamp '%s'", this, message.data()); + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + + // No exception is raised if the connection was once established but has subsequently been closed. + if (m_state == CLOSING || m_state == CLOSED) { + size_t payloadSize = message.length(); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); + return {}; + } + + this->sendWebSocketString(message, Opcode::Pong); + + return {}; +} + +ExceptionOr<void> WebSocket::pong(const String& message) +{ + LOG(Network, "WebSocket %p pong() Sending String '%s'", this, message.utf8().data()); + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + + // No exception is raised if the connection was once established but has subsequently been closed. + if (m_state == CLOSING || m_state == CLOSED) { + auto utf8 = message.utf8(StrictConversionReplacingUnpairedSurrogatesWithFFFD); + size_t payloadSize = utf8.length(); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); + return {}; + } + + this->sendWebSocketString(message, Opcode::Pong); + + return {}; +} + +ExceptionOr<void> WebSocket::pong(ArrayBuffer& binaryData) +{ + LOG(Network, "WebSocket %p pong() Sending ArrayBuffer %p", this, &binaryData); + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + + if (m_state == CLOSING || m_state == CLOSED) { + unsigned payloadSize = binaryData.byteLength(); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); + return {}; + } + + char* data = static_cast<char*>(binaryData.data()); + size_t length = binaryData.byteLength(); + this->sendWebSocketData(data, length, Opcode::Pong); + + return {}; +} + +ExceptionOr<void> WebSocket::pong(ArrayBufferView& arrayBufferView) +{ + LOG(Network, "WebSocket %p pong() Sending ArrayBufferView %p", this, &arrayBufferView); + + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + + if (m_state == CLOSING || m_state == CLOSED) { + unsigned payloadSize = arrayBufferView.byteLength(); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); + return {}; + } + + auto buffer = arrayBufferView.unsharedBuffer().get(); + char* baseAddress = reinterpret_cast<char*>(buffer->data()) + arrayBufferView.byteOffset(); + size_t length = arrayBufferView.byteLength(); + this->sendWebSocketData(baseAddress, length, Opcode::Pong); + + return {}; +} + const URL& WebSocket::url() const { return m_url; @@ -829,7 +1032,7 @@ void WebSocket::didReceiveMessage(String&& message) // }); } -void WebSocket::didReceiveBinaryData(Vector<uint8_t>&& binaryData) +void WebSocket::didReceiveBinaryData(const AtomString& eventName, Vector<uint8_t>&& binaryData) { // LOG(Network, "WebSocket %p didReceiveBinaryData() %u byte binary message", this, static_cast<unsigned>(binaryData.size())); // queueTaskKeepingObjectAlive(*this, TaskSource::WebSocket, [this, binaryData = WTFMove(binaryData)]() mutable { @@ -840,17 +1043,16 @@ void WebSocket::didReceiveBinaryData(Vector<uint8_t>&& binaryData) // if (auto* inspector = m_channel->channelInspector()) // inspector->didReceiveWebSocketFrame(WebSocketChannelInspector::createFrame(binaryData.data(), binaryData.size(), WebSocketFrame::OpCode::OpCodeBinary)); // } - switch (m_binaryType) { // case BinaryType::Blob: // // FIXME: We just received the data from NetworkProcess, and are sending it back. This is inefficient. // dispatchEvent(MessageEvent::create(Blob::create(scriptExecutionContext(), WTFMove(binaryData), emptyString()), SecurityOrigin::create(m_url)->toString())); // break; case BinaryType::ArrayBuffer: { - if (this->hasEventListeners("message"_s)) { + if (this->hasEventListeners(eventName)) { // the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener this->incPendingActivityCount(); - dispatchEvent(MessageEvent::create(ArrayBuffer::create(binaryData.data(), binaryData.size()), m_url.string())); + dispatchEvent(MessageEvent::create(eventName, ArrayBuffer::create(binaryData.data(), binaryData.size()), m_url.string())); this->decPendingActivityCount(); return; } @@ -858,9 +1060,9 @@ void WebSocket::didReceiveBinaryData(Vector<uint8_t>&& binaryData) if (auto* context = scriptExecutionContext()) { auto arrayBuffer = JSC::ArrayBuffer::create(binaryData.data(), binaryData.size()); this->incPendingActivityCount(); - context->postTask([this, buffer = WTFMove(arrayBuffer), protectedThis = Ref { *this }](ScriptExecutionContext& context) { + context->postTask([this, name = WTFMove(eventName), buffer = WTFMove(arrayBuffer), protectedThis = Ref { *this }](ScriptExecutionContext& context) { ASSERT(scriptExecutionContext()); - protectedThis->dispatchEvent(MessageEvent::create(buffer, m_url.string())); + protectedThis->dispatchEvent(MessageEvent::create(name, buffer, m_url.string())); protectedThis->decPendingActivityCount(); }); } @@ -869,7 +1071,7 @@ void WebSocket::didReceiveBinaryData(Vector<uint8_t>&& binaryData) } case BinaryType::NodeBuffer: { - if (this->hasEventListeners("message"_s)) { + if (this->hasEventListeners(eventName)) { // the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener this->incPendingActivityCount(); JSUint8Array* buffer = jsCast<JSUint8Array*>(JSValue::decode(JSBuffer__bufferFromLength(scriptExecutionContext()->jsGlobalObject(), binaryData.size()))); @@ -880,7 +1082,7 @@ void WebSocket::didReceiveBinaryData(Vector<uint8_t>&& binaryData) init.data = buffer; init.origin = this->m_url.string(); - dispatchEvent(MessageEvent::create(eventNames().messageEvent, WTFMove(init), EventIsTrusted::Yes)); + dispatchEvent(MessageEvent::create(eventName, WTFMove(init), EventIsTrusted::Yes)); this->decPendingActivityCount(); return; } @@ -890,7 +1092,7 @@ void WebSocket::didReceiveBinaryData(Vector<uint8_t>&& binaryData) this->incPendingActivityCount(); - context->postTask([this, buffer = WTFMove(arrayBuffer), protectedThis = Ref { *this }](ScriptExecutionContext& context) { + context->postTask([this, name = WTFMove(eventName), buffer = WTFMove(arrayBuffer), protectedThis = Ref { *this }](ScriptExecutionContext& context) { ASSERT(scriptExecutionContext()); size_t length = buffer->byteLength(); JSUint8Array* uint8array = JSUint8Array::create( @@ -903,7 +1105,7 @@ void WebSocket::didReceiveBinaryData(Vector<uint8_t>&& binaryData) MessageEvent::Init init; init.data = uint8array; init.origin = protectedThis->m_url.string(); - protectedThis->dispatchEvent(MessageEvent::create(eventNames().messageEvent, WTFMove(init), EventIsTrusted::Yes)); + protectedThis->dispatchEvent(MessageEvent::create(name, WTFMove(init), EventIsTrusted::Yes)); protectedThis->decPendingActivityCount(); }); } @@ -914,7 +1116,7 @@ void WebSocket::didReceiveBinaryData(Vector<uint8_t>&& binaryData) // }); } -void WebSocket::didReceiveMessageError(unsigned short code, WTF::String reason) +void WebSocket::didReceiveClose(CleanStatus wasClean, unsigned short code, WTF::String reason) { // LOG(Network, "WebSocket %p didReceiveErrorMessage()", this); // queueTaskKeepingObjectAlive(*this, TaskSource::WebSocket, [this, reason = WTFMove(reason)] { @@ -924,7 +1126,7 @@ void WebSocket::didReceiveMessageError(unsigned short code, WTF::String reason) if (auto* context = scriptExecutionContext()) { this->incPendingActivityCount(); // https://html.spec.whatwg.org/multipage/web-sockets.html#feedback-from-the-protocol:concept-websocket-closed, we should synchronously fire a close event. - dispatchEvent(CloseEvent::create(code < 1002, code, reason)); + dispatchEvent(CloseEvent::create(wasClean == CleanStatus::Clean, code, reason)); this->decPendingActivityCount(); } } @@ -1051,158 +1253,158 @@ void WebSocket::didFailWithErrorCode(int32_t code) // invalid_response case 1: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Invalid response"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // expected_101_status_code case 2: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Expected 101 status code"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // missing_upgrade_header case 3: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Missing upgrade header"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // missing_connection_header case 4: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Missing connection header"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // missing_websocket_accept_header case 5: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Missing websocket accept header"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // invalid_upgrade_header case 6: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Invalid upgrade header"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // invalid_connection_header case 7: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Invalid connection header"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // invalid_websocket_version case 8: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Invalid websocket version"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // mismatch_websocket_accept_header case 9: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Mismatch websocket accept header"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // missing_client_protocol case 10: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Missing client protocol"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::Clean, 1002, message); break; } // mismatch_client_protocol case 11: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Mismatch client protocol"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::Clean, 1002, message); break; } // timeout case 12: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Timeout"); - didReceiveMessageError(1013, message); + didReceiveClose(CleanStatus::Clean, 1013, message); break; } // closed case 13: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Closed by client"); - didReceiveMessageError(1000, message); + didReceiveClose(CleanStatus::Clean, 1000, message); break; } // failed_to_write case 14: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Failed to write"); - didReceiveMessageError(1006, message); + didReceiveClose(CleanStatus::NotClean, 1006, message); break; } // failed_to_connect case 15: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Failed to connect"); - didReceiveMessageError(1006, message); + didReceiveClose(CleanStatus::NotClean, 1006, message); break; } // headers_too_large case 16: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Headers too large"); - didReceiveMessageError(1007, message); + didReceiveClose(CleanStatus::NotClean, 1007, message); break; } // ended case 17: { - static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Closed by server"); - didReceiveMessageError(1001, message); + static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Connection ended"); + didReceiveClose(CleanStatus::NotClean, 1006, message); break; } // failed_to_allocate_memory case 18: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Failed to allocate memory"); - didReceiveMessageError(1001, message); + didReceiveClose(CleanStatus::NotClean, 1001, message); break; } // control_frame_is_fragmented case 19: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Protocol error - control frame is fragmented"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // invalid_control_frame case 20: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Protocol error - invalid control frame"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // compression_unsupported case 21: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Compression not implemented yet"); - didReceiveMessageError(1011, message); + didReceiveClose(CleanStatus::Clean, 1011, message); break; } // unexpected_mask_from_server case 22: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Protocol error - unexpected mask from server"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // expected_control_frame case 23: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Protocol error - expected control frame"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // unsupported_control_frame case 24: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Protocol error - unsupported control frame"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // unexpected_opcode case 25: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Protocol error - unexpected opcode"); - didReceiveMessageError(1002, message); + didReceiveClose(CleanStatus::NotClean, 1002, message); break; } // invalid_utf8 case 26: { static NeverDestroyed<String> message = MAKE_STATIC_STRING_IMPL("Server sent invalid UTF8"); - didReceiveMessageError(1003, message); + didReceiveClose(CleanStatus::NotClean, 1003, message); break; } } @@ -1225,19 +1427,37 @@ extern "C" void WebSocket__didConnect(WebCore::WebSocket* webSocket, us_socket_t { webSocket->didConnect(socket, bufferedData, len); } -extern "C" void WebSocket__didCloseWithErrorCode(WebCore::WebSocket* webSocket, int32_t errorCode) +extern "C" void WebSocket__didAbruptClose(WebCore::WebSocket* webSocket, int32_t errorCode) { webSocket->didFailWithErrorCode(errorCode); } +extern "C" void WebSocket__didClose(WebCore::WebSocket* webSocket, uint16_t errorCode, const BunString *reason) +{ + WTF::String wtf_reason = Bun::toWTFString(*reason); + webSocket->didClose(0, errorCode, WTFMove(wtf_reason)); +} extern "C" void WebSocket__didReceiveText(WebCore::WebSocket* webSocket, bool clone, const ZigString* str) { WTF::String wtf_str = clone ? Zig::toStringCopy(*str) : Zig::toString(*str); webSocket->didReceiveMessage(WTFMove(wtf_str)); } -extern "C" void WebSocket__didReceiveBytes(WebCore::WebSocket* webSocket, uint8_t* bytes, size_t len) +extern "C" void WebSocket__didReceiveBytes(WebCore::WebSocket* webSocket, uint8_t* bytes, size_t len, const uint8_t op) { - webSocket->didReceiveBinaryData({ bytes, len }); + auto opcode = static_cast<WebCore::WebSocket::Opcode>(op); + switch (opcode) { + case WebCore::WebSocket::Opcode::Binary: + webSocket->didReceiveBinaryData("message"_s, { bytes, len }); + break; + case WebCore::WebSocket::Opcode::Ping: + webSocket->didReceiveBinaryData("ping"_s, { bytes, len }); + break; + case WebCore::WebSocket::Opcode::Pong: + webSocket->didReceiveBinaryData("pong"_s, { bytes, len }); + break; + default: + break; + } } extern "C" void WebSocket__incrementPendingActivity(WebCore::WebSocket* webSocket) diff --git a/src/bun.js/bindings/webcore/WebSocket.h b/src/bun.js/bindings/webcore/WebSocket.h index 846bd186b..cf18bd40d 100644 --- a/src/bun.js/bindings/webcore/WebSocket.h +++ b/src/bun.js/bindings/webcore/WebSocket.h @@ -70,6 +70,20 @@ public: CLOSED = 3, }; + enum Opcode : unsigned char { + Continue = 0x0, + Text = 0x1, + Binary = 0x2, + Close = 0x8, + Ping = 0x9, + Pong = 0xA, + }; + + enum CleanStatus { + NotClean = 0, + Clean = 1, + }; + ExceptionOr<void> connect(const String& url); ExceptionOr<void> connect(const String& url, const String& protocol); ExceptionOr<void> connect(const String& url, const Vector<String>& protocols); @@ -80,7 +94,20 @@ public: ExceptionOr<void> send(JSC::ArrayBufferView&); // ExceptionOr<void> send(Blob&); + ExceptionOr<void> ping(); + ExceptionOr<void> ping(const String& message); + ExceptionOr<void> ping(JSC::ArrayBuffer&); + ExceptionOr<void> ping(JSC::ArrayBufferView&); + // ExceptionOr<void> ping(Blob&); + + ExceptionOr<void> pong(); + ExceptionOr<void> pong(const String& message); + ExceptionOr<void> pong(JSC::ArrayBuffer&); + ExceptionOr<void> pong(JSC::ArrayBufferView&); + // ExceptionOr<void> ping(Blob&); + ExceptionOr<void> close(std::optional<unsigned short> code, const String& reason); + ExceptionOr<void> terminate(); const URL& url() const; State readyState() const; @@ -103,7 +130,7 @@ public: void didReceiveMessage(String&& message); void didReceiveData(const char* data, size_t length); - void didReceiveBinaryData(Vector<uint8_t>&&); + void didReceiveBinaryData(const AtomString& eventName, Vector<uint8_t>&& binaryData); void updateHasPendingActivity(); bool hasPendingActivity() const @@ -154,12 +181,12 @@ private: void refEventTarget() final { ref(); } void derefEventTarget() final { deref(); } - void didReceiveMessageError(unsigned short code, WTF::String reason); + void didReceiveClose(CleanStatus wasClean, unsigned short code, WTF::String reason); void didUpdateBufferedAmount(unsigned bufferedAmount); void didStartClosingHandshake(); - void sendWebSocketString(const String& message); - void sendWebSocketData(const char* data, size_t length); + void sendWebSocketString(const String& message, const Opcode opcode); + void sendWebSocketData(const char* data, size_t length, const Opcode opcode); void failAsynchronously(); @@ -172,7 +199,12 @@ private: URL m_url; unsigned m_bufferedAmount { 0 }; unsigned m_bufferedAmountAfterClose { 0 }; - BinaryType m_binaryType { BinaryType::ArrayBuffer }; + // In browsers, the default is Blob, however most applications + // immediately change the default to ArrayBuffer. + // + // And since we know the typical usage is to override the default, + // we set NodeBuffer as the default to match the default of ServerWebSocket. + BinaryType m_binaryType { BinaryType::NodeBuffer }; String m_subprotocol; String m_extensions; void* m_upgradeClient { nullptr }; diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 7d2435823..beb2d1856 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -350,12 +350,14 @@ pub const ExitHandler = struct { extern fn Bun__closeAllSQLiteDatabasesForTermination() void; pub fn dispatchOnExit(this: *ExitHandler) void { + JSC.markBinding(@src()); var vm = @fieldParentPtr(VirtualMachine, "exit_handler", this); Process__dispatchOnExit(vm.global, this.exit_code); Bun__closeAllSQLiteDatabasesForTermination(); } pub fn dispatchOnBeforeExit(this: *ExitHandler) void { + JSC.markBinding(@src()); var vm = @fieldParentPtr(VirtualMachine, "exit_handler", this); Process__dispatchOnBeforeExit(vm.global, this.exit_code); } diff --git a/src/http/websocket_http_client.zig b/src/http/websocket_http_client.zig index ae8e40763..80f29525c 100644 --- a/src/http/websocket_http_client.zig +++ b/src/http/websocket_http_client.zig @@ -79,11 +79,16 @@ fn buildRequestBody( if (client_protocol.len > 0) client_protocol_hash.* = bun.hash(static_headers[1].value); - const headers_ = static_headers[0 .. 1 + @as(usize, @intFromBool(client_protocol.len > 0))]; + const pathname_ = pathname.toSlice(allocator); + const host_ = host.toSlice(allocator); + defer { + pathname_.deinit(); + host_.deinit(); + } - const pathname_ = pathname.slice(); - const host_ = host.slice(); + const headers_ = static_headers[0 .. 1 + @as(usize, @intFromBool(client_protocol.len > 0))]; const pico_headers = PicoHTTP.Headers{ .headers = headers_ }; + return try std.fmt.allocPrint( allocator, "GET {s} HTTP/1.1\r\n" ++ @@ -93,10 +98,10 @@ fn buildRequestBody( "Connection: Upgrade\r\n" ++ "Upgrade: websocket\r\n" ++ "Sec-WebSocket-Version: 13\r\n" ++ - "{any}" ++ - "{any}" ++ + "{s}" ++ + "{s}" ++ "\r\n", - .{ pathname_, host_, pico_headers, extra_headers }, + .{ pathname_.slice(), host_.slice(), pico_headers, extra_headers }, ); } @@ -137,12 +142,14 @@ const CppWebSocket = opaque { buffered_data: ?[*]u8, buffered_len: usize, ) void; - extern fn WebSocket__didCloseWithErrorCode(websocket_context: *CppWebSocket, reason: ErrorCode) void; + extern fn WebSocket__didAbruptClose(websocket_context: *CppWebSocket, reason: ErrorCode) void; + extern fn WebSocket__didClose(websocket_context: *CppWebSocket, code: u16, reason: *const bun.String) void; extern fn WebSocket__didReceiveText(websocket_context: *CppWebSocket, clone: bool, text: *const JSC.ZigString) void; - extern fn WebSocket__didReceiveBytes(websocket_context: *CppWebSocket, bytes: [*]const u8, byte_len: usize) void; + extern fn WebSocket__didReceiveBytes(websocket_context: *CppWebSocket, bytes: [*]const u8, byte_len: usize, opcode: u8) void; pub const didConnect = WebSocket__didConnect; - pub const didCloseWithErrorCode = WebSocket__didCloseWithErrorCode; + pub const didAbruptClose = WebSocket__didAbruptClose; + pub const didClose = WebSocket__didClose; pub const didReceiveText = WebSocket__didReceiveText; pub const didReceiveBytes = WebSocket__didReceiveBytes; extern fn WebSocket__incrementPendingActivity(websocket_context: *CppWebSocket) void; @@ -307,7 +314,7 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type { JSC.markBinding(@src()); if (this.outgoing_websocket) |ws| { this.outgoing_websocket = null; - ws.didCloseWithErrorCode(code); + ws.didAbruptClose(code); } this.cancel(); @@ -319,7 +326,7 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type { this.clearData(); if (this.outgoing_websocket) |ws| { this.outgoing_websocket = null; - ws.didCloseWithErrorCode(ErrorCode.ended); + ws.didAbruptClose(ErrorCode.ended); } } @@ -728,10 +735,10 @@ fn parseWebSocketHeader( return .extended_payload_length_64 else return .fail, - .Close => ReceiveState.close, - .Ping => ReceiveState.ping, - .Pong => ReceiveState.pong, - else => ReceiveState.fail, + .Close => .close, + .Ping => .ping, + .Pong => .pong, + else => .fail, }; } @@ -762,7 +769,7 @@ const Copy = union(enum) { } } - pub fn copy(this: @This(), globalThis: *JSC.JSGlobalObject, buf: []u8, content_byte_len: usize) void { + pub fn copy(this: @This(), globalThis: *JSC.JSGlobalObject, buf: []u8, content_byte_len: usize, opcode: Opcode) void { if (this == .raw) { std.debug.assert(buf.len >= this.raw.len); std.debug.assert(buf.ptr != this.raw.ptr); @@ -793,6 +800,7 @@ const Copy = union(enum) { header.mask = true; header.compressed = false; header.final = true; + header.opcode = opcode; std.debug.assert(WebsocketHeader.frameSizeIncludingMask(content_byte_len) == buf.len); @@ -803,7 +811,6 @@ const Copy = union(enum) { std.debug.assert(@as(usize, encode_into_result.written) == content_byte_len); std.debug.assert(@as(usize, encode_into_result.read) == utf16.len); header.len = WebsocketHeader.packLength(encode_into_result.written); - header.opcode = Opcode.Text; var fib = std.io.fixedBufferStream(buf); header.writeHeader(fib.writer(), encode_into_result.written) catch unreachable; @@ -817,14 +824,12 @@ const Copy = union(enum) { std.debug.assert(@as(usize, encode_into_result.read) == latin1.len); header.len = WebsocketHeader.packLength(encode_into_result.written); - header.opcode = Opcode.Text; var fib = std.io.fixedBufferStream(buf); header.writeHeader(fib.writer(), encode_into_result.written) catch unreachable; Mask.fill(globalThis, buf[mask_offset..][0..4], to_mask[0..content_byte_len], to_mask[0..content_byte_len]); }, .bytes => |bytes| { header.len = WebsocketHeader.packLength(bytes.len); - header.opcode = Opcode.Binary; var fib = std.io.fixedBufferStream(buf); header.writeHeader(fib.writer(), bytes.len) catch unreachable; Mask.fill(globalThis, buf[mask_offset..][0..4], to_mask[0..content_byte_len], bytes); @@ -846,6 +851,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { ping_frame_bytes: [128 + 6]u8 = [_]u8{0} ** (128 + 6), ping_len: u8 = 0, + ping_received: bool = false, receive_frame: usize = 0, receive_body_remain: usize = 0, @@ -899,6 +905,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { this.poll_ref.unrefOnNextTick(this.globalThis.bunVM()); this.clearReceiveBuffers(true); this.clearSendBuffers(true); + this.ping_received = false; this.ping_len = 0; this.receive_pending_chunk_len = 0; } @@ -921,8 +928,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { if (this.outgoing_websocket) |ws| { this.outgoing_websocket = null; log("fail ({s})", .{@tagName(code)}); - - ws.didCloseWithErrorCode(code); + ws.didAbruptClose(code); } this.cancel(); @@ -937,7 +943,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { if (success == 0) { if (this.outgoing_websocket) |ws| { this.outgoing_websocket = null; - ws.didCloseWithErrorCode(ErrorCode.failed_to_connect); + ws.didAbruptClose(ErrorCode.failed_to_connect); } } } @@ -947,7 +953,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { this.clearData(); if (this.outgoing_websocket) |ws| { this.outgoing_websocket = null; - ws.didCloseWithErrorCode(ErrorCode.ended); + ws.didAbruptClose(ErrorCode.ended); } } @@ -1002,16 +1008,15 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { out.didReceiveText(true, &outstring); } }, - .Binary => { + .Binary, .Ping, .Pong => { JSC.markBinding(@src()); - out.didReceiveBytes(data_.ptr, data_.len); + out.didReceiveBytes(data_.ptr, data_.len, @as(u8, @intFromEnum(kind))); }, else => unreachable, } } pub fn consume(this: *WebSocket, data_: []const u8, left_in_fragment: usize, kind: Opcode, is_final: bool) usize { - std.debug.assert(kind == .Text or kind == .Binary); std.debug.assert(data_.len <= left_in_fragment); // did all the data fit in the buffer? @@ -1074,9 +1079,9 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { // if we receive multiple pings in a row // we just send back the last one - if (this.ping_len > 0) { + if (this.ping_received) { _ = this.sendPong(socket); - this.ping_len = 0; + this.ping_received = false; } } } @@ -1214,10 +1219,12 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { break; } }, - .ping => { const ping_len = @min(data.len, @min(receive_body_remain, 125)); this.ping_len = ping_len; + this.ping_received = true; + + this.dispatchData(data[0..ping_len], .Ping); if (ping_len > 0) { @memcpy(this.ping_frame_bytes[6..][0..ping_len], data[0..ping_len]); @@ -1232,9 +1239,14 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { }, .pong => { const pong_len = @min(data.len, @min(receive_body_remain, this.ping_frame_bytes.len)); + + this.dispatchData(data[0..pong_len], .Pong); + data = data[pong_len..]; receive_state = .need_header; + receive_body_remain = 0; receiving_type = last_receive_data_type; + if (data.len == 0) break; }, .need_body => { @@ -1318,16 +1330,16 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { } fn copyToSendBuffer(this: *WebSocket, bytes: []const u8, do_write: bool, is_closing: bool) bool { - return this.sendData(.{ .raw = bytes }, do_write, is_closing); + return this.sendData(.{ .raw = bytes }, do_write, is_closing, .Binary); } - fn sendData(this: *WebSocket, bytes: Copy, do_write: bool, is_closing: bool) bool { + fn sendData(this: *WebSocket, bytes: Copy, do_write: bool, is_closing: bool, opcode: Opcode) bool { var content_byte_len: usize = 0; const write_len = bytes.len(&content_byte_len); std.debug.assert(write_len > 0); var writable = this.send_buffer.writableWithSize(write_len) catch unreachable; - bytes.copy(this.globalThis, writable[0..write_len], content_byte_len); + bytes.copy(this.globalThis, writable[0..write_len], content_byte_len, opcode); this.send_buffer.update(write_len); if (do_write) { @@ -1372,7 +1384,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { fn sendPong(this: *WebSocket, socket: Socket) bool { if (socket.isClosed() or socket.isShutdown()) { - this.dispatchClose(); + this.dispatchAbruptClose(); return false; } @@ -1403,7 +1415,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { ) void { log("Sending close with code {d}", .{code}); if (socket.isClosed() or socket.isShutdown()) { - this.dispatchClose(); + this.dispatchAbruptClose(); this.clearData(); return; } @@ -1419,8 +1431,12 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { var mask_buf: *[4]u8 = final_body_bytes[2..6]; std.mem.writeIntSliceBig(u16, final_body_bytes[6..8], code); + var reason = bun.String.empty; if (body) |data| { - if (body_len > 0) @memcpy(final_body_bytes[8..][0..body_len], data[0..body_len]); + if (body_len > 0) { + reason = bun.String.create(data[0..body_len]); + @memcpy(final_body_bytes[8..][0..body_len], data[0..body_len]); + } } // we must mask the code @@ -1428,7 +1444,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { Mask.fill(this.globalThis, mask_buf, slice[6..], slice[6..]); if (this.enqueueEncodedBytesMaybeFinal(socket, slice, true)) { - this.dispatchClose(); + this.dispatchClose(code, &reason); this.clearData(); } } @@ -1466,37 +1482,41 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { this: *WebSocket, ptr: [*]const u8, len: usize, + op: u8, ) callconv(.C) void { if (this.tcp.isClosed() or this.tcp.isShutdown()) { - this.dispatchClose(); + this.dispatchAbruptClose(); return; } + const opcode = @enumFromInt(Opcode, @truncate(u4, op)); const slice = ptr[0..len]; const bytes = Copy{ .bytes = slice }; // fast path: small frame, no backpressure, attempt to send without allocating const frame_size = WebsocketHeader.frameSizeIncludingMask(len); if (!this.hasBackpressure() and frame_size < stack_frame_size) { var inline_buf: [stack_frame_size]u8 = undefined; - bytes.copy(this.globalThis, inline_buf[0..frame_size], slice.len); + bytes.copy(this.globalThis, inline_buf[0..frame_size], slice.len, opcode); _ = this.enqueueEncodedBytes(this.tcp, inline_buf[0..frame_size]); return; } - _ = this.sendData(bytes, !this.hasBackpressure(), false); + _ = this.sendData(bytes, !this.hasBackpressure(), false, opcode); } pub fn writeString( this: *WebSocket, str_: *const JSC.ZigString, + op: u8, ) callconv(.C) void { const str = str_.*; if (this.tcp.isClosed() or this.tcp.isShutdown()) { - this.dispatchClose(); + this.dispatchAbruptClose(); return; } // Note: 0 is valid + const opcode = @enumFromInt(Opcode, @truncate(u4, op)); { var inline_buf: [stack_frame_size]u8 = undefined; @@ -1506,7 +1526,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { var byte_len: usize = 0; const frame_size = bytes.len(&byte_len); if (!this.hasBackpressure() and frame_size < stack_frame_size) { - bytes.copy(this.globalThis, inline_buf[0..frame_size], byte_len); + bytes.copy(this.globalThis, inline_buf[0..frame_size], byte_len, opcode); _ = this.enqueueEncodedBytes(this.tcp, inline_buf[0..frame_size]); return; } @@ -1516,7 +1536,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { var byte_len: usize = 0; const frame_size = bytes.len(&byte_len); std.debug.assert(frame_size <= stack_frame_size); - bytes.copy(this.globalThis, inline_buf[0..frame_size], byte_len); + bytes.copy(this.globalThis, inline_buf[0..frame_size], byte_len, opcode); _ = this.enqueueEncodedBytes(this.tcp, inline_buf[0..frame_size]); return; } @@ -1529,15 +1549,24 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { Copy{ .latin1 = str.slice() }, !this.hasBackpressure(), false, + opcode, ); } - fn dispatchClose(this: *WebSocket) void { + fn dispatchAbruptClose(this: *WebSocket) void { var out = this.outgoing_websocket orelse return; this.poll_ref.unrefOnNextTick(this.globalThis.bunVM()); JSC.markBinding(@src()); this.outgoing_websocket = null; - out.didCloseWithErrorCode(ErrorCode.closed); + out.didAbruptClose(ErrorCode.closed); + } + + fn dispatchClose(this: *WebSocket, code: u16, reason: *const bun.String) void { + var out = this.outgoing_websocket orelse return; + this.poll_ref.unrefOnNextTick(this.globalThis.bunVM()); + JSC.markBinding(@src()); + this.outgoing_websocket = null; + out.didClose(code, reason); } pub fn close(this: *WebSocket, code: u16, reason: ?*const JSC.ZigString) callconv(.C) void { @@ -1650,6 +1679,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { .writeBinaryData = writeBinaryData, .writeString = writeString, .close = close, + .cancel = cancel, .register = register, .init = init, .finalize = finalize, @@ -1660,9 +1690,10 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { @export(writeBinaryData, .{ .name = Export[0].symbol_name }); @export(writeString, .{ .name = Export[1].symbol_name }); @export(close, .{ .name = Export[2].symbol_name }); - @export(register, .{ .name = Export[3].symbol_name }); - @export(init, .{ .name = Export[4].symbol_name }); - @export(finalize, .{ .name = Export[5].symbol_name }); + @export(cancel, .{ .name = Export[3].symbol_name }); + @export(register, .{ .name = Export[4].symbol_name }); + @export(init, .{ .name = Export[5].symbol_name }); + @export(finalize, .{ .name = Export[6].symbol_name }); } } }; diff --git a/src/js/out/modules/thirdparty/ws.js b/src/js/out/modules/thirdparty/ws.js index 7a48da4c1..175ab5fa1 100644 --- a/src/js/out/modules/thirdparty/ws.js +++ b/src/js/out/modules/thirdparty/ws.js @@ -48,7 +48,14 @@ var emitWarning = function(type, message) { Error.captureStackTrace(err, abortHandshakeOrEmitwsClientError), server.emit("wsClientError", err, socket, req); } else abortHandshake(response, code, message); -}, kBunInternals = Symbol.for("::bunternal::"), readyStates = ["CONNECTING", "OPEN", "CLOSING", "CLOSED"], encoder = new TextEncoder, emittedWarnings = new Set; +}, kBunInternals = Symbol.for("::bunternal::"), readyStates = ["CONNECTING", "OPEN", "CLOSING", "CLOSED"], encoder = new TextEncoder, eventIds = { + open: 1, + close: 2, + message: 3, + error: 4, + ping: 5, + pong: 6 +}, emittedWarnings = new Set; class BunWebSocket extends EventEmitter { static CONNECTING = 0; @@ -59,51 +66,83 @@ class BunWebSocket extends EventEmitter { #paused = !1; #fragments = !1; #binaryType = "nodebuffer"; - readyState = BunWebSocket.CONNECTING; + #eventId = 0; constructor(url, protocols, options) { super(); let ws = this.#ws = new WebSocket(url, protocols); - ws.binaryType = "nodebuffer", ws.addEventListener("open", () => { - this.readyState = BunWebSocket.OPEN, this.emit("open"); - }), ws.addEventListener("error", (err) => { - this.readyState = BunWebSocket.CLOSED, this.emit("error", err); - }), ws.addEventListener("close", (ev) => { - this.readyState = BunWebSocket.CLOSED, this.emit("close", ev.code, ev.reason); - }), ws.addEventListener("message", (ev) => { - const isBinary = typeof ev.data !== "string"; - if (isBinary) - this.emit("message", this.#fragments ? [ev.data] : ev.data, isBinary); - else { - var encoded = encoder.encode(ev.data); - if (this.#binaryType !== "arraybuffer") - encoded = Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength); - this.emit("message", this.#fragments ? [encoded] : encoded, isBinary); - } - }); + ws.binaryType = "nodebuffer"; } on(event, listener) { - if (event === "unexpected-response" || event === "upgrade" || event === "ping" || event === "pong" || event === "redirect") + if (event === "unexpected-response" || event === "upgrade" || event === "redirect") emitWarning(event, "ws.WebSocket '" + event + "' event is not implemented in bun"); + const mask = 1 << eventIds[event]; + if (mask && (this.#eventId & mask) !== mask) { + if (this.#eventId |= mask, event === "open") + this.#ws.addEventListener("open", () => { + this.emit("open"); + }); + else if (event === "close") + this.#ws.addEventListener("close", ({ code, reason, wasClean }) => { + this.emit("close", code, reason, wasClean); + }); + else if (event === "message") + this.#ws.addEventListener("message", ({ data }) => { + const isBinary = typeof data !== "string"; + if (isBinary) + this.emit("message", this.#fragments ? [data] : data, isBinary); + else { + let encoded = encoder.encode(data); + if (this.#binaryType !== "arraybuffer") + encoded = Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength); + this.emit("message", this.#fragments ? [encoded] : encoded, isBinary); + } + }); + else if (event === "error") + this.#ws.addEventListener("error", (err) => { + this.emit("error", err); + }); + else if (event === "ping") + this.#ws.addEventListener("ping", ({ data }) => { + this.emit("ping", data); + }); + else if (event === "pong") + this.#ws.addEventListener("pong", ({ data }) => { + this.emit("pong", data); + }); + } return super.on(event, listener); } send(data, opts, cb) { - this.#ws.send(data, opts?.compress), typeof cb === "function" && cb(); + try { + this.#ws.send(data, opts?.compress); + } catch (error) { + typeof cb === "function" && cb(error); + return; + } + typeof cb === "function" && cb(); } close(code, reason) { this.#ws.close(code, reason); } + terminate() { + this.#ws.terminate(); + } + get url() { + return this.#ws.url; + } + get readyState() { + return this.#ws.readyState; + } get binaryType() { return this.#binaryType; } set binaryType(value) { - if (value) - this.#ws.binaryType = value; - } - set binaryType(value) { if (value === "nodebuffer" || value === "arraybuffer") this.#ws.binaryType = this.#binaryType = value, this.#fragments = !1; else if (value === "fragments") this.#ws.binaryType = "nodebuffer", this.#binaryType = "fragments", this.#fragments = !0; + else + throw new Error(`Invalid binaryType: ${value}`); } get protocol() { return this.#ws.protocol; @@ -148,35 +187,49 @@ class BunWebSocket extends EventEmitter { return this.#paused; } ping(data, mask, cb) { - if (this.readyState === BunWebSocket.CONNECTING) - throw new Error("WebSocket is not open: readyState 0 (CONNECTING)"); if (typeof data === "function") cb = data, data = mask = void 0; else if (typeof mask === "function") cb = mask, mask = void 0; if (typeof data === "number") data = data.toString(); - emitWarning("ping()", "ws.WebSocket.ping() is not implemented in bun"), typeof cb === "function" && cb(); + try { + this.#ws.ping(data); + } catch (error) { + typeof cb === "function" && cb(error); + return; + } + typeof cb === "function" && cb(); } pong(data, mask, cb) { - if (this.readyState === BunWebSocket.CONNECTING) - throw new Error("WebSocket is not open: readyState 0 (CONNECTING)"); if (typeof data === "function") cb = data, data = mask = void 0; else if (typeof mask === "function") cb = mask, mask = void 0; if (typeof data === "number") data = data.toString(); - emitWarning("pong()", "ws.WebSocket.pong() is not implemented in bun"), typeof cb === "function" && cb(); + try { + this.#ws.pong(data); + } catch (error) { + typeof cb === "function" && cb(error); + return; + } + typeof cb === "function" && cb(); } pause() { - if (this.readyState === WebSocket.CONNECTING || this.readyState === WebSocket.CLOSED) - return; + switch (this.readyState) { + case WebSocket.CONNECTING: + case WebSocket.CLOSED: + return; + } this.#paused = !0, emitWarning("pause()", "ws.WebSocket.pause() is not implemented in bun"); } resume() { - if (this.readyState === WebSocket.CONNECTING || this.readyState === WebSocket.CLOSED) - return; + switch (this.readyState) { + case WebSocket.CONNECTING: + case WebSocket.CLOSED: + return; + } this.#paused = !1, emitWarning("resume()", "ws.WebSocket.resume() is not implemented in bun"); } } diff --git a/src/js/thirdparty/ws.js b/src/js/thirdparty/ws.js index 5b27c5b50..e88ae6769 100644 --- a/src/js/thirdparty/ws.js +++ b/src/js/thirdparty/ws.js @@ -3,12 +3,20 @@ // this just wraps WebSocket to look like an EventEmitter // without actually using an EventEmitter polyfill -import { EventEmitter } from "events"; -import http from "http"; +import { EventEmitter } from "node:events"; +import http from "node:http"; const kBunInternals = Symbol.for("::bunternal::"); const readyStates = ["CONNECTING", "OPEN", "CLOSING", "CLOSED"]; const encoder = new TextEncoder(); +const eventIds = { + open: 1, + close: 2, + message: 3, + error: 4, + ping: 5, + pong: 6, +}; const emittedWarnings = new Set(); function emitWarning(type, message) { @@ -18,13 +26,8 @@ function emitWarning(type, message) { console.warn("[bun] Warning:", message); } -/* - * deviations: we do not implement these events - * - "unexpected-response" - * - "upgrade" - * - "ping" - * - "pong" - * - "redirect" +/** + * @link https://github.com/websockets/ws/blob/master/doc/ws.md#class-websocket */ class BunWebSocket extends EventEmitter { static CONNECTING = 0; @@ -36,54 +39,69 @@ class BunWebSocket extends EventEmitter { #paused = false; #fragments = false; #binaryType = "nodebuffer"; - readyState = BunWebSocket.CONNECTING; + + // Bitset to track whether event handlers are set. + #eventId = 0; constructor(url, protocols, options) { - // deviation: we don't support anything in `options` super(); let ws = (this.#ws = new WebSocket(url, protocols)); - ws.binaryType = "nodebuffer"; // bun's WebSocket supports "nodebuffer" - ws.addEventListener("open", () => { - this.readyState = BunWebSocket.OPEN; - this.emit("open"); - }); - ws.addEventListener("error", err => { - this.readyState = BunWebSocket.CLOSED; - this.emit("error", err); - }); - ws.addEventListener("close", ev => { - this.readyState = BunWebSocket.CLOSED; - this.emit("close", ev.code, ev.reason); - }); - ws.addEventListener("message", ev => { - const isBinary = typeof ev.data !== "string"; - if (isBinary) { - this.emit("message", this.#fragments ? [ev.data] : ev.data, isBinary); - } else { - var encoded = encoder.encode(ev.data); - if (this.#binaryType !== "arraybuffer") { - encoded = Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength); - } - this.emit("message", this.#fragments ? [encoded] : encoded, isBinary); - } - }); + ws.binaryType = "nodebuffer"; + // TODO: options } on(event, listener) { - if ( - event === "unexpected-response" || - event === "upgrade" || - event === "ping" || - event === "pong" || - event === "redirect" - ) { + if (event === "unexpected-response" || event === "upgrade" || event === "redirect") { emitWarning(event, "ws.WebSocket '" + event + "' event is not implemented in bun"); } + const mask = 1 << eventIds[event]; + if (mask && (this.#eventId & mask) !== mask) { + this.#eventId |= mask; + if (event === "open") { + this.#ws.addEventListener("open", () => { + this.emit("open"); + }); + } else if (event === "close") { + this.#ws.addEventListener("close", ({ code, reason, wasClean }) => { + this.emit("close", code, reason, wasClean); + }); + } else if (event === "message") { + this.#ws.addEventListener("message", ({ data }) => { + const isBinary = typeof data !== "string"; + if (isBinary) { + this.emit("message", this.#fragments ? [data] : data, isBinary); + } else { + let encoded = encoder.encode(data); + if (this.#binaryType !== "arraybuffer") { + encoded = Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength); + } + this.emit("message", this.#fragments ? [encoded] : encoded, isBinary); + } + }); + } else if (event === "error") { + this.#ws.addEventListener("error", (err) => { + this.emit("error", err); + }); + } else if (event === "ping") { + this.#ws.addEventListener("ping", ({ data }) => { + this.emit("ping", data); + }); + } else if (event === "pong") { + this.#ws.addEventListener("pong", ({ data }) => { + this.emit("pong", data); + }); + } + } return super.on(event, listener); } send(data, opts, cb) { - this.#ws.send(data, opts?.compress); + try { + this.#ws.send(data, opts?.compress); + } catch (error) { + typeof cb === "function" && cb(error); + return; + } // deviation: this should be called once the data is written, not immediately typeof cb === "function" && cb(); } @@ -92,12 +110,20 @@ class BunWebSocket extends EventEmitter { this.#ws.close(code, reason); } - get binaryType() { - return this.#binaryType; + terminate() { + this.#ws.terminate(); } - set binaryType(value) { - if (value) this.#ws.binaryType = value; + get url() { + return this.#ws.url; + } + + get readyState() { + return this.#ws.readyState; + } + + get binaryType() { + return this.#binaryType; } set binaryType(value) { @@ -108,6 +134,8 @@ class BunWebSocket extends EventEmitter { this.#ws.binaryType = "nodebuffer"; this.#binaryType = "fragments"; this.#fragments = true; + } else { + throw new Error(`Invalid binaryType: ${value}`); } } @@ -170,10 +198,6 @@ class BunWebSocket extends EventEmitter { } ping(data, mask, cb) { - if (this.readyState === BunWebSocket.CONNECTING) { - throw new Error("WebSocket is not open: readyState 0 (CONNECTING)"); - } - if (typeof data === "function") { cb = data; data = mask = undefined; @@ -184,16 +208,17 @@ class BunWebSocket extends EventEmitter { if (typeof data === "number") data = data.toString(); - // deviation: we don't support ping - emitWarning("ping()", "ws.WebSocket.ping() is not implemented in bun"); + try { + this.#ws.ping(data); + } catch (error) { + typeof cb === "function" && cb(error); + return; + } + typeof cb === "function" && cb(); } pong(data, mask, cb) { - if (this.readyState === BunWebSocket.CONNECTING) { - throw new Error("WebSocket is not open: readyState 0 (CONNECTING)"); - } - if (typeof data === "function") { cb = data; data = mask = undefined; @@ -204,14 +229,21 @@ class BunWebSocket extends EventEmitter { if (typeof data === "number") data = data.toString(); - // deviation: we don't support pong - emitWarning("pong()", "ws.WebSocket.pong() is not implemented in bun"); + try { + this.#ws.pong(data); + } catch (error) { + typeof cb === "function" && cb(error); + return; + } + typeof cb === "function" && cb(); } pause() { - if (this.readyState === WebSocket.CONNECTING || this.readyState === WebSocket.CLOSED) { - return; + switch (this.readyState) { + case WebSocket.CONNECTING: + case WebSocket.CLOSED: + return; } this.#paused = true; @@ -221,8 +253,10 @@ class BunWebSocket extends EventEmitter { } resume() { - if (this.readyState === WebSocket.CONNECTING || this.readyState === WebSocket.CLOSED) { - return; + switch (this.readyState) { + case WebSocket.CONNECTING: + case WebSocket.CLOSED: + return; } this.#paused = false; diff --git a/test/harness.ts b/test/harness.ts index b0cedba74..8b850bbfc 100644 --- a/test/harness.ts +++ b/test/harness.ts @@ -1,4 +1,4 @@ -import { gc as bunGC, unsafe } from "bun"; +import { gc as bunGC, unsafe, which } from "bun"; import { heapStats } from "bun:jsc"; import path from "path"; import fs from "fs"; @@ -16,6 +16,10 @@ export function bunExe() { return process.execPath; } +export function nodeExe(): string | null { + return which("node") || null; +} + export function gc(force = true) { bunGC(force); } diff --git a/test/js/bun/websocket/websocket-client-echo.mjs b/test/js/bun/websocket/websocket-client-echo.mjs new file mode 100644 index 000000000..d721f673c --- /dev/null +++ b/test/js/bun/websocket/websocket-client-echo.mjs @@ -0,0 +1,70 @@ +import { WebSocket } from "ws"; + +let url; +try { + url = new URL(process.argv[2]); +} catch { + throw new Error(`Usage: ${process.argv0} websocket-client-echo.mjs <url>`); +} + +const ws = new WebSocket(url, { + perMessageDeflate: false, +}); + +ws.on("open", () => { + console.log("Connected", ws.url); // read by test script + console.error("Connected", ws.url); +}); + +ws.on("message", (data, isBinary) => { + if (isBinary) { + console.error("Received binary message:", data); + } else { + console.error("Received text message:", data); + } + ws.send(data, { binary: isBinary }); + + if (data === "ping") { + console.error("Sending ping"); + ws.ping(); + } else if (data === "pong") { + console.error("Sending pong"); + ws.pong(); + } else if (data === "close") { + console.error("Sending close"); + ws.close(); + } else if (data === "terminate") { + console.error("Sending terminate"); + ws.terminate(); + } +}); + +ws.on("ping", data => { + console.error("Received ping:", data); + ws.ping(data); +}); + +ws.on("pong", data => { + console.error("Received pong:", data); + ws.pong(data); +}); + +ws.on("error", error => { + console.error("Received error:", error); +}); + +ws.on("close", (code, reason, wasClean) => { + if (wasClean === true) { + console.error("Received abrupt close:", code, reason); + } else { + console.error("Received close:", code, reason); + } +}); + +ws.on("redirect", url => { + console.error("Received redirect:", url); +}); + +ws.on("unexpected-response", (_, response) => { + console.error("Received unexpected response:", response.statusCode, { ...response.headers }); +}); diff --git a/test/js/bun/websocket/websocket-server.test.ts b/test/js/bun/websocket/websocket-server.test.ts index 7a79f9f5f..eb9b773a3 100644 --- a/test/js/bun/websocket/websocket-server.test.ts +++ b/test/js/bun/websocket/websocket-server.test.ts @@ -1,1255 +1,607 @@ -import { describe, expect, it } from "bun:test"; -import { gcTick } from "harness"; -import { serve, ServerWebSocket } from "bun"; - -describe("websocket server", () => { - it("send & receive empty messages", done => { - const serverReceived: any[] = []; - const clientReceived: any[] = []; - var clientDone = false; - var serverDone = false; - - let server = Bun.serve({ - websocket: { - open(ws) { - ws.send(""); - ws.send(new ArrayBuffer(0)); - }, - message(ws, data) { - serverReceived.push(data); - - if (serverReceived.length === 2) { - if (serverReceived.find(d => d === "") === undefined) { - done(new Error("expected empty string")); - } - - if (!serverReceived.find(d => d.byteLength === 0)) { - done(new Error("expected empty Buffer")); - } - - serverDone = true; +import { describe, it, expect, afterEach } from "bun:test"; +import type { Server, Subprocess, WebSocketHandler } from "bun"; +import { serve, spawn } from "bun"; +import { bunEnv, bunExe, nodeExe } from "harness"; +import { drainMicrotasks, fullGC } from "bun:jsc"; + +const strings = [ + { + label: "string (ascii)", + message: "ascii", + bytes: [0x61, 0x73, 0x63, 0x69, 0x69], + }, + { + label: "string (latin1)", + message: "latin1-©", + bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9], + }, + { + label: "string (utf-8)", + message: "utf8-😶", + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6], + }, +]; + +const buffers = [ + { + label: "Uint8Array (utf-8)", + message: new TextEncoder().encode("utf8-🙂"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82], + }, + { + label: "ArrayBuffer (utf-8)", + message: new TextEncoder().encode("utf8-🙃").buffer, + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83], + }, + { + label: "Buffer (utf-8)", + message: Buffer.from("utf8-🤩"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9], + }, +]; + +const messages = [...strings, ...buffers]; + +const binaryTypes = [ + { + label: "nodebuffer", + type: Buffer, + }, + { + label: "arraybuffer", + type: ArrayBuffer, + }, + { + label: "uint8array", + type: Uint8Array, + }, +] as const; + +let servers: Server[] = []; +let clients: Subprocess[] = []; + +afterEach(() => { + for (const server of servers) { + server.stop(true); + } + for (const client of clients) { + client.kill(); + } +}); - if (clientDone && serverDone) { - z.close(); - server.stop(true); - done(); - } +describe("Server", () => { + describe("websocket", () => { + test("open", done => ({ + open(ws) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + done(); + }, + })); + test("close", done => ({ + open(ws) { + ws.close(); + }, + close(ws, code, reason) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + expect(code).toBeInteger(); + expect(reason).toBeString(); + done(); + }, + })); + test("message", done => ({ + open(ws) { + ws.send("Hello"); + }, + message(ws, data) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + expect(data).toBeDefined(); + done(); + }, + })); + test("drain", done => ({ + backpressureLimit: 1, + open(ws) { + const data = new Uint8Array(1 * 1024 * 1024); + // send data until backpressure is triggered + for (let i = 0; i < 10; i++) { + if (ws.send(data) < 1) { // backpressure or dropped + break; } - }, - close() {}, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); } }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.onmessage = e => { - clientReceived.push(e.data); - - if (clientReceived.length === 2) { - if (clientReceived.find(d => d === "") === undefined) { - done(new Error("expected empty string")); - } - - if (!clientReceived.find(d => d.byteLength === 0)) { - done(new Error("expected empty Buffer")); + drain(ws) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + done(); + }, + })); + test("ping", done => ({ + open(ws) { + ws.ping(); + }, + ping(ws, data) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + expect(data).toBeInstanceOf(Buffer); + done(); + }, + })); + test("pong", done => ({ + open(ws) { + ws.pong(); + }, + pong(ws, data) { + expect(ws).toBeDefined(); + expect(ws).toHaveProperty("data", { id: 0 }); + expect(data).toBeInstanceOf(Buffer); + done(); + }, + })); + test("maxPayloadLength", done => ({ + maxPayloadLength: 4, + open(ws) { + ws.send("Hello!"); + }, + close(_, code) { + expect(code).toBe(1006); + done(); + }, + })); + test("backpressureLimit", done => ({ + backpressureLimit: 1, + open(ws) { + const data = new Uint8Array(1 * 1024 * 1024); + expect(ws.send(data.slice(0, 1))).toBe(1); // sent + let backpressure; + for (let i = 0; i < 10; i++) { + if (ws.send(data) === -1) { + backpressure = true; + break; + } } - - clientDone = true; - if (clientDone && serverDone) { - server.stop(true); - z.close(); - - done(); + if (!backpressure) { + done(new Error("backpressure not triggered")); + return; } - } - }; - z.addEventListener("open", () => { - z.send(""); - z.send(new Buffer(0)); - }); - }); - - it("remoteAddress works", done => { - let server = Bun.serve({ - websocket: { - message() {}, - open(ws) { - try { - expect(ws.remoteAddress).toBe("127.0.0.1"); - done(); - } catch (e) { - done(e); + let dropped; + for (let i = 0; i < 10; i++) { + if (ws.send(data) === 0) { + dropped = true; + break; } - }, - close() {}, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); } - }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.addEventListener("open", () => { - setTimeout(() => z.close(), 0); - }); - z.addEventListener("close", () => { - server.stop(); - }); - }); - it("can do publish()", async done => { - var server = serve({ - port: 0, - websocket: { - // FIXME: update this test to not rely on publishToSelf: true, - publishToSelf: true, - - open(ws) { - ws.subscribe("all"); - }, - message(ws, msg) {}, - close(ws) {}, - }, - fetch(req, server) { - if (server.upgrade(req)) { + if (!dropped) { + done(new Error("message not dropped")); return; } - - return new Response("success"); - }, - }); - - await new Promise<void>((resolve2, reject2) => { - var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); - var clientCounter = 0; - - socket.onmessage = e => { - expect(e.data).toBe("hello"); - resolve2(); - }; - socket.onopen = () => { - queueMicrotask(() => { - server.publish("all", "hello"); - }); - }; - }); - server.stop(true); - done(); - }); - - it("can do publish() with publishToSelf: false", async done => { - var server = serve({ - port: 0, - websocket: { - open(ws) { - ws.subscribe("all"); - ws.publish("all", "hey"); - server.publish("all", "hello"); - }, - message(ws, msg) { - if (new TextDecoder().decode(msg as Uint8Array) !== "hello") { - done(new Error("unexpected message")); + done(); + }, + })); + // FIXME: close() callback is called, but only after timeout? + it.todo("closeOnBackpressureLimit"); + /* + test("closeOnBackpressureLimit", done => ({ + closeOnBackpressureLimit: true, + backpressureLimit: 1, + open(ws) { + const data = new Uint8Array(1 * 1024 * 1024); + // send data until backpressure is triggered + for (let i = 0; i < 10; i++) { + if (ws.send(data) < 1) { + return; } - }, - close(ws) {}, - publishToSelf: false, - }, - fetch(req, server) { - if (server.upgrade(req)) { - return; } - - return new Response("success"); + done(new Error("backpressure not triggered")); }, - }); - - await new Promise<void>((resolve2, reject2) => { - var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); - - socket.onmessage = e => { - expect(e.data).toBe("hello"); - resolve2(); - }; - }); - server.stop(true); - done(); + close(_, code) { + expect(code).toBe(1006); + done(); + }, + })); + */ + it.todo("perMessageDeflate"); }); +}); - for (let method of ["publish", "publishText", "publishBinary"] as const) { - describe(method, () => { - it("in close() should work", async () => { - var count = 0; - var server = serve({ - port: 0, - websocket: { - // FIXME: update this test to not rely on publishToSelf: true, - publishToSelf: true, - - open(ws) { - ws.subscribe("all"); - }, - message(ws, msg) {}, - close(ws) { - (ws[method] as any)("all", method === "publishBinary" ? Buffer.from("bye!") : "bye!"); - count++; - - if (count >= 2) { - server.stop(true); - } - }, - }, - fetch(req, server) { - if (server.upgrade(req)) { - return; - } - - return new Response("success"); - }, - }); - +describe("ServerWebSocket", () => { + test("readyState", done => ({ + open(ws) { + expect(ws.readyState).toBe(WebSocket.OPEN); + ws.close(); + }, + close(ws) { + expect(ws.readyState).toBe(WebSocket.CLOSED); + done(); + }, + })); + test("remoteAddress", done => ({ + open(ws) { + expect(ws.remoteAddress).toMatch(/^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$/); + done(); + }, + })); + describe("binaryType", () => { + test("(default)", done => ({ + open(ws) { + expect(ws.binaryType).toBe("nodebuffer"); + done(); + }, + })); + test("(invalid)", done => ({ + open(ws) { try { - const first = await new Promise<WebSocket>((resolve2, reject2) => { - var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); - socket.onopen = () => resolve2(socket); - }); - - await new Promise<WebSocket>((resolve2, reject2) => { - var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); - socket.onopen = () => { - queueMicrotask(() => first.close()); - }; - socket.onmessage = ev => { - var msg = ev.data; - if (typeof msg !== "string") { - msg = new TextDecoder().decode(msg); - } - - if (msg === "bye!") { - socket.close(0); - resolve2(socket); - } else { - reject2(msg); - } - }; - }); - } finally { - server.stop(true); - } - }); - }); - } - - it("close inside open", async () => { - var resolve: () => void; - var server = serve({ - port: 0, - websocket: { - open(ws) {}, - message(ws, msg) {}, - close() { - resolve(); - server.stop(true); - }, - }, - fetch(req, server) { - if ( - server.upgrade(req, { - data: "hello world", - - // check that headers works - headers: { - "x-a": "text/plain", - }, - }) - ) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); - } - return; + // @ts-expect-error + ws.binaryType = "invalid"; + done(new Error("Expected an error")); + } catch (cause) { + done(); } - - return new Response("noooooo hello world"); }, - }); - - await new Promise<void>((resolve_, reject) => { - resolve = resolve_; - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.close(); - }; - websocket.onmessage = e => {}; - websocket.onerror = e => {}; - }); - }); - - it("headers error doesn't crash", async () => { - await new Promise<void>((resolve, reject) => { - const server = serve({ - port: 0, - websocket: { - open(ws) { - ws.close(); - }, - message(ws, msg) {}, - close() { - resolve(); - server.stop(true); - }, + })); + for (const { label, type } of binaryTypes) { + test(label, done => ({ + open(ws) { + ws.binaryType = label; + expect(ws.binaryType).toBe(label); + ws.send(new Uint8Array(1)); }, - error(err) { - resolve(); - server.stop(true); + message(ws, received) { + expect(received).toBeInstanceOf(type); + ws.ping(); }, - fetch(req, server) { - expect(() => { - if ( - server.upgrade(req, { - data: "hello world", - headers: 1238 as any, - }) - ) { - reject(new Error("should not upgrade")); - return new Response("should not upgrade"); - } - }).toThrow("upgrade options.headers must be a Headers or an object"); - resolve(); - return new Response("success"); + ping(ws, received) { + expect(received).toBeInstanceOf(type); + ws.pong(); }, - }); - - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => websocket.close(); - websocket.onmessage = e => {}; - websocket.onerror = e => {}; - }); + pong(_, received) { + expect(received).toBeInstanceOf(type); + done(); + }, + })); + } }); - it("can do hello world", async () => { - const server = serve({ - port: 0, - websocket: { + describe("send()", () => { + for (const { label, message, bytes } of messages) { + test(label, done => ({ open(ws) { - server.stop(); + ws.send(message); }, - message(ws, msg) { - ws.send("hello world"); - }, - }, - fetch(req, server) { - server.stop(); - if ( - server.upgrade(req, { - data: "hello world", - - // check that headers works - headers: { - "x-a": "text/plain", - }, - }) - ) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); + message(_, received) { + if (typeof received === "string") { + expect(received).toBe(message); + } else { + expect(received).toEqual(Buffer.from(bytes)); } - return; - } - - return new Response("noooooo hello world"); + done(); + }, + })); + } + test( + "(benchmark)", + (done, connect) => { + const maxClients = 10; + const maxMessages = 10_000; + let count = 0; + return { + open(ws) { + if (ws.data.id < maxClients) { + connect(); + } + for (let i = 0; i < maxMessages; i++) { + ws.send(`${i}`, true); + ws.sendText(`${i}`, true); + ws.sendBinary(Buffer.from(`${i}`), true); + } + }, + message() { + if (++count === maxClients * maxMessages * 3) { + done(); + } + }, + }; }, - }); - - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.send("hello world"); - }; - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); + 30_000, + ); }); - - it("fetch() allows a Response object to be returned for an upgraded ServerWebSocket", () => { - const server = serve({ - port: 0, - websocket: { + describe("sendBinary()", () => { + for (const { label, message, bytes } of buffers) { + test(label, done => ({ open(ws) { - server.stop(); + ws.sendBinary(message); }, - message(ws, msg) { - ws.send("hello world"); + message(_, received) { + expect(received).toEqual(Buffer.from(bytes)); + done(); }, - }, - error(err) { - console.error(err); - }, - fetch(req, server) { - server.stop(); - if ( - server.upgrade(req, { - data: "hello world", - - // check that headers works - headers: { - "x-a": "text/plain", - }, - }) - ) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); - } - return new Response("lol!", { - status: 101, - }); - } - - return new Response("noooooo hello world"); - }, - }); - - return new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.send("hello world"); - }; - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); + })); + } }); - - it("fetch() allows a Promise<Response> object to be returned for an upgraded ServerWebSocket", () => { - const server = serve({ - port: 0, - websocket: { - async open(ws) { - server.stop(); + describe("sendText()", () => { + for (const { label, message } of strings) { + test(label, done => ({ + open(ws) { + ws.sendText(message); }, - async message(ws, msg) { - await 1; - ws.send("hello world"); + message(_, received) { + expect(received).toEqual(message); + done(); }, - }, - error(err) { - console.error(err); - }, - async fetch(req, server) { - server.stop(); - await 1; - if ( - server.upgrade(req, { - data: "hello world", - - // check that headers works - headers: { - "x-a": "text/plain", - }, - }) - ) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); - } - return new Response("lol!", { - status: 101, - }); - } - - return new Response("noooooo hello world"); - }, - }); - return new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.send("hello world"); - }; - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); + })); + } }); - it("binaryType works", async () => { - var done = false; - const server = serve({ - port: 0, - websocket: { + describe("subscribe()", () => { + for (const { label, message } of strings) { + test(label, done => ({ open(ws) { - server.stop(); + expect(ws.isSubscribed(message)).toBeFalse(); + ws.subscribe(message); + expect(ws.isSubscribed(message)).toBeTrue(); + ws.unsubscribe(message); + expect(ws.isSubscribed(message)).toBeFalse(); + done(); }, - message(ws, msg) { - // The first message is supposed to be "uint8array" - // Then after uint8array, we switch it to "nodebuffer" - // Then after nodebuffer, we switch it to "arraybuffer" - // and then we're done - switch (ws.binaryType) { - case "nodebuffer": { - for (let badType of [ - 123, - NaN, - Symbol("uint8array"), - "uint16array", - "uint32array", - "float32array", - "float64array", - "garbage", - ]) { - expect(() => { - /* @ts-ignore */ - ws.binaryType = badType; - }).toThrow(); - } - expect(ws.binaryType).toBe("nodebuffer"); - ws.binaryType = "uint8array"; - expect(ws.binaryType).toBe("uint8array"); - expect(msg instanceof Uint8Array).toBe(true); - expect(Buffer.isBuffer(msg)).toBe(true); - break; - } - - case "uint8array": { - expect(ws.binaryType).toBe("uint8array"); - ws.binaryType = "arraybuffer"; - expect(ws.binaryType).toBe("arraybuffer"); - expect(msg instanceof Uint8Array).toBe(true); - expect(Buffer.isBuffer(msg)).toBe(false); - break; - } - - case "arraybuffer": { - expect(ws.binaryType).toBe("arraybuffer"); - expect(msg instanceof ArrayBuffer).toBe(true); - done = true; - break; - } - - default: { - throw new Error("unknown binaryType"); - } + })); + } + }); + describe("publish()", () => { + for (const { label, message, bytes } of messages) { + const topic = typeof message === "string" ? message : label; + test(label, (done, connect) => ({ + async open(ws) { + ws.subscribe(topic); + if (ws.data.id === 0) { + connect(); + } else if (ws.data.id === 1) { + ws.publish(topic, message); } - - ws.send("hello world"); }, - }, - fetch(req, server) { - server.stop(); - if (server.upgrade(req, { data: "hello world" })) { - if (server.upgrade(req)) { - throw new Error("should not upgrade twice"); + message(ws, received) { + if (ws.data.id === 1) { + throw new Error("Expected publish() to not send to self"); } - return; - } - - return new Response("noooooo hello world"); - }, - }); - - const isDone = await new Promise<boolean>((resolve, reject) => { - var counter = 0; - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onopen = () => { - websocket.send(Buffer.from("hello world")); - }; - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - - if (counter++ > 2) { - websocket.close(); - resolve(done); + if (typeof message === "string") { + expect(received).toBe(message); + } else { + expect(received).toEqual(Buffer.from(bytes)); } - websocket.send(Buffer.from("oaksd")); - } catch (r) { - websocket.close(); - reject(r); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); - expect(isDone).toBe(true); - }); - - it("does not upgrade for non-websocket connections", async () => { - await new Promise<void>(async (resolve, reject) => { - var server = serve({ - port: 0, - websocket: { - open(ws) { - ws.send("hello world"); - }, - message(ws, msg) {}, + done(); }, - fetch(req, server) { - if (server.upgrade(req)) { - reject(new Error("should not upgrade")); + })); + } + }); + describe("publishBinary()", () => { + for (const { label, message, bytes } of buffers) { + test(label, (done, connect) => ({ + async open(ws) { + ws.subscribe(label); + if (ws.data.id === 0) { + connect(); + } else if (ws.data.id === 1) { + ws.publishBinary(label, message); } - - return new Response("success"); }, - }); - - const response = await fetch(`http://${server.hostname}:${server.port}`); - expect(await response.text()).toBe("success"); - resolve(); - server.stop(true); - }); - }); - - it("does not upgrade for non-websocket servers", async () => { - await new Promise<void>(async (resolve, reject) => { - const server = serve({ - port: 0, - fetch(req, server) { - server.stop(); - expect(() => { - server.upgrade(req); - }).toThrow('To enable websocket support, set the "websocket" object in Bun.serve({})'); - return new Response("success"); + message(ws, received) { + if (ws.data.id === 1) { + throw new Error("Expected publish() to not send to self"); + } + expect(received).toEqual(Buffer.from(bytes)); + done(); }, - }); - - const response = await fetch(`http://${server.hostname}:${server.port}`); - expect(await response.text()).toBe("success"); - resolve(); - }); + })); + } }); - - it("async can do hello world", async () => { - const server = serve({ - port: 0, - websocket: { + describe("publishText()", () => { + for (const { label, message } of strings) { + test(label, (done, connect) => ({ async open(ws) { - server.stop(true); - ws.send("hello world"); - }, - message(ws, msg) {}, - }, - async fetch(req, server) { - server.stop(); - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); - }, - }); - - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); - } - }; - websocket.onerror = e => { - reject(e); - }; - }); - }); - - it("publishText()", async () => { - await new Promise<void>((resolve, reject) => { - var websocket: WebSocket; - var server = serve({ - port: 0, - websocket: { - // FIXME: update this test to not rely on publishToSelf: true, - publishToSelf: true, - - async open(ws) { - // we don't care about the data - // we just want to make sure the DOMJIT call doesn't crash - for (let i = 0; i < 40_000; i++) ws.publishText("hello", "world"); - websocket.close(); - server.stop(true); - resolve(); - }, - message(ws, msg) {}, + ws.subscribe(label); + if (ws.data.id === 0) { + connect(); + } else if (ws.data.id === 1) { + ws.publishText(label, message); + } }, - async fetch(req, server) { - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); + message(ws, received) { + if (ws.data.id === 1) { + throw new Error("Expected publish() to not send to self"); + } + expect(received).toEqual(message); + done(); }, - }); - - websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - }); + })); + } }); - - it("publishBinary()", async () => { - const bytes = Buffer.from("hello"); - - await new Promise<void>((resolve, reject) => { - var websocket: WebSocket; - var server = serve({ - port: 0, - websocket: { - // FIXME: update this test to not rely on publishToSelf: true, - publishToSelf: true, - - async open(ws) { - // we don't care about the data - // we just want to make sure the DOMJIT call doesn't crash - for (let i = 0; i < 40_000; i++) ws.publishBinary("hello", bytes); - websocket.close(); - server.stop(true); - resolve(); - }, - message(ws, msg) {}, - }, - async fetch(req, server) { - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); + describe("publish() with { publishToSelf: true }", () => { + for (const { label, message, bytes } of messages) { + const topic = typeof message === "string" ? message : label; + test(label, done => ({ + publishToSelf: true, + async open(ws) { + ws.subscribe(topic); + ws.publish(topic, message); + }, + message(_, received) { + if (typeof message === "string") { + expect(received).toBe(message); + } else { + expect(received).toEqual(Buffer.from(bytes)); + } + done(); }, - }); - - websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - }); + })); + } }); - - it("sendText()", async () => { - await new Promise<void>((resolve, reject) => { - var websocket: WebSocket; - var server = serve({ - port: 0, - websocket: { - async open(ws) { - // we don't care about the data - // we just want to make sure the DOMJIT call doesn't crash - for (let i = 0; i < 40_000; i++) ws.sendText("hello world", true); - resolve(); - websocket.close(); - server.stop(true); - }, - message(ws, msg) {}, + describe("ping()", () => { + test("(no argument)", done => ({ + open(ws) { + ws.ping(); + }, + ping(_, received) { + expect(received).toBeEmpty(); + done(); + }, + })); + for (const { label, message, bytes } of messages) { + test(label, done => ({ + open(ws) { + ws.ping(message); }, - async fetch(req, server) { - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); + ping(_, received) { + expect(received).toEqual(Buffer.from(bytes)); + done(); }, - }); - websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - }); + })); + } }); - - it("sendBinary()", async () => { - const bytes = Buffer.from("hello"); - await new Promise<void>((resolve, reject) => { - var websocket: WebSocket; - var server = serve({ - port: 0, - websocket: { - async open(ws) { - // we don't care about the data - // we just want to make sure the DOMJIT call doesn't crash - for (let i = 0; i < 40_000; i++) ws.sendBinary(bytes, true); - websocket.close(); - server.stop(true); - resolve(); - }, - message(ws, msg) {}, + describe("pong()", () => { + test("(no argument)", done => ({ + open(ws) { + ws.pong(); + }, + pong(_, received) { + expect(received).toBeEmpty(); + done(); + }, + })); + for (const { label, message, bytes } of messages) { + test(label, done => ({ + open(ws) { + ws.pong(message); }, - async fetch(req, server) { - await 1; - if (server.upgrade(req)) return; - - return new Response("noooooo hello world"); + pong(_, received) { + expect(received).toEqual(Buffer.from(bytes)); + done(); }, - }); - - websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - }); + })); + } }); - - it("can do hello world corked", async () => { - const server = serve({ - port: 0, - websocket: { - open(ws) { - server.stop(); - ws.send("hello world"); - }, - message(ws, msg) { + test("cork()", done => { + let count = 0; + return { + open(ws) { + setTimeout(() => { ws.cork(() => { - ws.send("hello world"); + ws.send("1"); + ws.sendText("2"); + ws.sendBinary(new TextEncoder().encode("3")); }); - }, - }, - fetch(req, server) { - server.stop(); - if (server.upgrade(req)) return; - return new Response("noooooo hello world"); + }, 5); }, - }); - - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - - websocket.onmessage = e => { - try { - expect(e.data).toBe("hello world"); - resolve(); - } catch (r) { - reject(r); - } finally { - websocket.close(); + message(_, message) { + if (typeof message === "string") { + expect(+message).toBe(++count); + } else { + expect(+new TextDecoder().decode(message)).toBe(++count); } - }; - websocket.onerror = e => { - reject(e); - }; - }); - server.stop(true); - }); - - it("can do some back and forth", async () => { - var dataCount = 0; - const server = serve({ - port: 0, - websocket: { - open(ws) { - server.stop(); - }, - message(ws, msg) { - if (msg === "first") { - ws.send("first"); - return; - } - ws.send(`counter: ${dataCount++}`); - }, - }, - fetch(req, server) { - server.stop(); - if ( - server.upgrade(req, { - data: { count: 0 }, - }) - ) - return new Response("noooooo hello world"); - }, - }); - - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onerror = e => { - reject(e); - }; - - var counter = 0; - websocket.onopen = () => websocket.send("first"); - websocket.onmessage = e => { - try { - switch (counter++) { - case 0: { - expect(e.data).toBe("first"); - websocket.send("where are the loops"); - break; - } - case 1: { - expect(e.data).toBe("counter: 0"); - websocket.send("br0ther may i have some loops"); - break; - } - case 2: { - expect(e.data).toBe("counter: 1"); - websocket.send("br0ther may i have some loops"); - break; - } - case 3: { - expect(e.data).toBe("counter: 2"); - resolve(); - break; - } - } - } catch (r) { - reject(r); - websocket.close(); + if (count === 3) { + done(); } - }; - }); - server.stop(true); - }); - - it("send rope strings", async () => { - var ropey = "hello world".repeat(10); - var sendQueue: any[] = []; - for (var i = 0; i < 20; i++) { - sendQueue.push(ropey + " " + i); - } - - var serverCounter = 0; - var clientCounter = 0; - - const server = serve({ - port: 0, - websocket: { - open(ws) {}, - message(ws, msg) { - ws.send(sendQueue[serverCounter++] + " "); - serverCounter % 10 === 0 && gcTick(); - }, - }, - fetch(req, server) { - if ( - server.upgrade(req, { - data: { count: 0 }, - }) - ) - return; - - return new Response("noooooo hello world"); }, - }); - try { - await new Promise<void>((resolve, reject) => { - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onerror = e => { - reject(e); - }; - - websocket.onopen = () => { - server.stop(); - websocket.send("first"); - }; - - websocket.onmessage = e => { - try { - const expected = sendQueue[clientCounter++] + " "; - expect(e.data).toBe(expected); - websocket.send("next"); - if (clientCounter === sendQueue.length) { - websocket.close(); - resolve(); - } - } catch (r) { - reject(r); - console.error(r); - websocket.close(); - } - }; - }); - } catch (e) { - throw e; - } finally { - server.stop(true); - } + }; }); - - // this test sends 50 messages to 10 connected clients via pubsub - it("pub/sub", async () => { - var ropey = "hello world".repeat(10); - var sendQueue: any[] = []; - for (var i = 0; i < 50; i++) { - sendQueue.push(ropey + " " + i); - } - - var serverCounter = 0; - var clientCount = 0; - const server = serve({ - port: 0, - websocket: { + describe("close()", () => { + test("(no arguments)", done => ({ + open(ws) { + ws.close(); + }, + close(_, code, reason) { + expect(code).toBe(1000); + expect(reason).toBeEmpty(); + done(); + }, + })); + test("(no reason)", done => ({ + open(ws) { + ws.close(1001); + }, + close(_, code, reason) { + expect(code).toBe(1001); + expect(reason).toBeEmpty(); + done(); + }, + })); + for (const { label, message } of strings) { + test(label, done => ({ open(ws) { - ws.subscribe("test"); - if (!ws.isSubscribed("test")) { - throw new Error("not subscribed"); - } - ws.unsubscribe("test"); - if (ws.isSubscribed("test")) { - throw new Error("subscribed"); - } - ws.subscribe("test"); - clientCount++; - if (clientCount === 10) { - setTimeout(() => server.publish("test", "hello world"), 1); - } + ws.close(1002, message); }, - message(ws, msg) { - if (serverCounter < sendQueue.length) server.publish("test", sendQueue[serverCounter++] + " "); + close(_, code, reason) { + expect(code).toBe(1002); + expect(reason).toBe(message); + done(); }, - }, - fetch(req) { - if ( - server.upgrade(req, { - data: { count: 0 }, - }) - ) - return; - return new Response("noooooo hello world"); - }, - }); - try { - const connections = new Array(10); - const websockets = new Array(connections.length); - var doneCounter = 0; - await new Promise<void>(done => { - for (var i = 0; i < connections.length; i++) { - var j = i; - var resolve: (_?: unknown) => void, - reject: (_?: unknown) => void, - resolveConnection: (_?: unknown) => void, - rejectConnection: (_?: unknown) => void; - connections[j] = new Promise((res, rej) => { - resolveConnection = res; - rejectConnection = rej; - }); - websockets[j] = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - const websocket = new WebSocket(`ws://${server.hostname}:${server.port}`); - websocket.onerror = e => { - reject(e); - }; - websocket.onclose = () => { - doneCounter++; - if (doneCounter === connections.length) { - done(); - } - }; - var hasOpened = false; - websocket.onopen = () => { - if (!hasOpened) { - hasOpened = true; - resolve(websocket); - } - }; - - let clientCounter = -1; - var hasSentThisTick = false; - - websocket.onmessage = e => { - if (!hasOpened) { - hasOpened = true; - resolve(websocket); - } - - if (e.data === "hello world") { - clientCounter = 0; - websocket.send("first"); - return; - } - - try { - expect(!!sendQueue.find(a => a + " " === e.data)).toBe(true); - - if (!hasSentThisTick) { - websocket.send("second"); - hasSentThisTick = true; - queueMicrotask(() => { - hasSentThisTick = false; - }); - } - - if (clientCounter++ === sendQueue.length - 1) { - websocket.close(); - resolveConnection(); - } - } catch (r) { - console.error(r); - websocket.close(); - rejectConnection(r); - } - }; - } - }); - } catch (e) { - throw e; - } finally { - server.stop(true); - gcTick(); + })); } - - expect(serverCounter).toBe(sendQueue.length); - }, 30_000); - it("can close with reason and code #2631", done => { - let timeout: any; - let server = Bun.serve({ - websocket: { - message(ws) { - ws.close(2000, "test"); - }, - open(ws) { - try { - expect(ws.remoteAddress).toBe("127.0.0.1"); - } catch (e) { - clearTimeout(timeout); - done(e); - } - }, - close(ws, code, reason) { - try { - expect(code).toBe(2000); - expect(reason).toBe("test"); - clearTimeout(timeout); - done(); - } catch (e) { - clearTimeout(timeout); - done(e); - } - }, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); - } - }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.addEventListener("open", () => { - z.send("test"); - }); - z.addEventListener("close", () => { - server.stop(); - }); - - timeout = setTimeout(() => { - done(new Error("Did not close in time")); - server.stop(true); - }, 1000); }); + test("terminate()", done => ({ + open(ws) { + ws.terminate(); + }, + close(_, code, reason) { + expect(code).toBe(1006); + expect(reason).toBeEmpty(); + done(); + }, + })); +}); - it("can close with code and without reason #2631", done => { - let timeout: any; - let server = Bun.serve({ - websocket: { - message(ws) { - ws.close(2000); - }, - open(ws) { - try { - expect(ws.remoteAddress).toBe("127.0.0.1"); - } catch (e) { - done(e); - clearTimeout(timeout); - } - }, - close(ws, code, reason) { - clearTimeout(timeout); - - try { - expect(code).toBe(2000); - expect(reason).toBe(""); - done(); - } catch (e) { - done(e); - } - }, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); +function test( + label: string, + fn: (done: (err?: unknown) => void, connect: () => Promise<void>) => Partial<WebSocketHandler<{ id: number }>>, + timeout?: number, +) { + it( + label, + testDone => { + let isDone = false; + const done = (err?: unknown) => { + if (!isDone) { + isDone = true; + server.stop(); + testDone(err); } - }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.addEventListener("open", () => { - z.send("test"); - }); - z.addEventListener("close", () => { - server.stop(); - }); - - timeout = setTimeout(() => { - done(new Error("Did not close in time")); - server.stop(true); - }, 1000); - }); - it("can close without reason or code #2631", done => { - let timeout: any; - let server = Bun.serve({ - websocket: { - message(ws) { - ws.close(); - }, - open(ws) { - try { - expect(ws.remoteAddress).toBe("127.0.0.1"); - } catch (e) { - clearTimeout(timeout); - done(e); + }; + let id = 0; + const server: Server = serve({ + port: 0, + fetch(request, server) { + const data = { id: id++ }; + if (server.upgrade(request, { data })) { + return; } + return new Response(); }, - close(ws, code, reason) { - clearTimeout(timeout); - try { - expect(code).toBe(1006); - expect(reason).toBe(""); - done(); - } catch (e) { - done(e); - } + websocket: { + sendPings: false, + message() {}, + ...fn(done, () => connect(server)), }, - }, - fetch(req, server) { - if (!server.upgrade(req)) { - return new Response(null, { status: 404 }); - } - }, - port: 0, - }); - - let z = new WebSocket(`ws://${server.hostname}:${server.port}`); - z.addEventListener("open", () => { - z.send("test"); - }); - z.addEventListener("close", () => { - server.stop(); - }); - - timeout = setTimeout(() => { - done(new Error("Did not close in time")); - server.stop(true); - }, 1000); + }); + servers.push(server); + connect(server); + }, + { timeout: timeout ?? 1000 }, + ); +} + +async function connect(server: Server): Promise<void> { + const url = new URL(`ws://${server.hostname}:${server.port}/`); + const { pathname } = new URL("./websocket-client-echo.mjs", import.meta.url); + // @ts-ignore + const client = spawn({ + cmd: [nodeExe() ?? bunExe(), pathname, url], + cwd: import.meta.dir, + env: bunEnv, + stderr: "ignore", + stdout: "pipe", }); -}); + clients.push(client); + for await (const chunk of client.stdout) { + if (new TextDecoder().decode(chunk).includes("Connected")) { + return; + } + } +} diff --git a/test/js/first_party/ws/ws.test.ts b/test/js/first_party/ws/ws.test.ts new file mode 100644 index 000000000..c0b56a200 --- /dev/null +++ b/test/js/first_party/ws/ws.test.ts @@ -0,0 +1,284 @@ +import { describe, it, expect, beforeEach, afterEach } from "bun:test"; +import type { Subprocess } from "bun"; +import { spawn } from "bun"; +import { bunEnv, bunExe, nodeExe } from "harness"; +import { WebSocket } from "ws"; + +const strings = [ + { + label: "string (ascii)", + message: "ascii", + bytes: [0x61, 0x73, 0x63, 0x69, 0x69], + }, + { + label: "string (latin1)", + message: "latin1-©", + bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9], + }, + { + label: "string (utf-8)", + message: "utf8-😶", + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6], + }, +]; + +const buffers = [ + { + label: "Uint8Array (utf-8)", + message: new TextEncoder().encode("utf8-🙂"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82], + }, + { + label: "ArrayBuffer (utf-8)", + message: new TextEncoder().encode("utf8-🙃").buffer, + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83], + }, + { + label: "Buffer (utf-8)", + message: Buffer.from("utf8-🤩"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9], + }, +]; + +const messages = [...strings, ...buffers]; + +const binaryTypes = [ + { + label: "nodebuffer", + type: Buffer, + }, + { + label: "arraybuffer", + type: ArrayBuffer, + }, +] as const; + +let servers: Subprocess[] = []; +let clients: WebSocket[] = []; + +function cleanUp() { + for (const client of clients) { + client.terminate(); + } + for (const server of servers) { + server.kill(); + } +} + +beforeEach(cleanUp); +afterEach(cleanUp); + +describe("WebSocket", () => { + test("url", (ws, done) => { + expect(ws.url).toStartWith("ws://"); + done(); + }); + test("readyState", (ws, done) => { + expect(ws.readyState).toBe(WebSocket.CONNECTING); + ws.on("open", () => { + expect(ws.readyState).toBe(WebSocket.OPEN); + ws.close(); + }); + ws.on("close", () => { + expect(ws.readyState).toBe(WebSocket.CLOSED); + done(); + }); + }); + describe("binaryType", () => { + test("(default)", (ws, done) => { + expect(ws.binaryType).toBe("nodebuffer"); + done(); + }); + test("(invalid)", (ws, done) => { + try { + // @ts-expect-error + ws.binaryType = "invalid"; + done(new Error("Expected an error")); + } catch { + done(); + } + }); + for (const { label, type } of binaryTypes) { + test(label, (ws, done) => { + ws.binaryType = label; + ws.on("open", () => { + expect(ws.binaryType).toBe(label); + ws.send(new Uint8Array(1)); + }); + ws.on("message", (data, isBinary) => { + expect(data).toBeInstanceOf(type); + expect(isBinary).toBeTrue(); + ws.ping(); + }); + ws.on("ping", (data) => { + expect(data).toBeInstanceOf(type); + ws.pong(); + }); + ws.on("pong", (data) => { + expect(data).toBeInstanceOf(type); + done(); + }); + }); + } + }); + describe("send()", () => { + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.on("open", () => { + ws.send(message); + }); + ws.on("message", (data, isBinary) => { + if (typeof data === "string") { + expect(data).toBe(message); + expect(isBinary).toBeFalse(); + } else { + expect(data).toEqual(Buffer.from(bytes)); + expect(isBinary).toBeTrue(); + } + done(); + }); + }); + } + }); + describe("ping()", () => { + test("(no argument)", (ws, done) => { + ws.on("open", () => { + ws.ping(); + }); + ws.on("ping", (data) => { + expect(data).toBeInstanceOf(Buffer); + done(); + }); + }); + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.on("open", () => { + ws.ping(message); + }); + ws.on("ping", (data) => { + expect(data).toEqual(Buffer.from(bytes)); + done(); + }); + }); + } + }); + describe("pong()", () => { + test("(no argument)", (ws, done) => { + ws.on("open", () => { + ws.pong(); + }); + ws.on("pong", (data) => { + expect(data).toBeInstanceOf(Buffer); + done(); + }); + }); + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.on("open", () => { + ws.pong(message); + }); + ws.on("pong", (data) => { + expect(data).toEqual(Buffer.from(bytes)); + done(); + }); + }); + } + }); + describe("close()", () => { + test("(no arguments)", (ws, done) => { + ws.on("open", () => { + ws.close(); + }); + ws.on("close", (code: number, reason: string, wasClean: boolean) => { + expect(code).toBe(1000); + expect(reason).toBeString(); + expect(wasClean).toBeTrue(); + done(); + }); + }); + test("(no reason)", (ws, done) => { + ws.on("open", () => { + ws.close(1001); + }); + ws.on("close", (code: number, reason: string, wasClean: boolean) => { + expect(code).toBe(1001); + expect(reason).toBeString(); + expect(wasClean).toBeTrue(); + done(); + }); + }); + // FIXME: Encoding issue + // Expected: "latin1-©" + // Received: "latin1-©" + /* + for (const { label, message } of strings) { + test(label, (ws, done) => { + ws.on("open", () => { + ws.close(1002, message); + }); + ws.on("close", (code, reason, wasClean) => { + expect(code).toBe(1002); + expect(reason).toBe(message); + expect(wasClean).toBeTrue(); + done(); + }); + }); + } + */ + }); + test("terminate()", (ws, done) => { + ws.on("open", () => { + ws.terminate(); + }); + ws.on("close", (code: number, reason: string, wasClean: boolean) => { + expect(code).toBe(1006); + expect(reason).toBeString(); + expect(wasClean).toBeFalse(); + done(); + }); + }); +}); + +function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void) => void, timeout?: number) { + it( + label, + testDone => { + let isDone = false; + const done = (err?: unknown) => { + if (!isDone) { + isDone = true; + testDone(err); + } + }; + listen() + .then(url => { + const ws = new WebSocket(url); + clients.push(ws); + fn(ws, done); + }) + .catch(done); + }, + { timeout: timeout ?? 1000 }, + ); +} + +async function listen(): Promise<URL> { + const { pathname } = new URL("../../web/websocket/websocket-server-echo.mjs", import.meta.url); + const server = spawn({ + cmd: [nodeExe() ?? bunExe(), pathname], + cwd: import.meta.dir, + env: bunEnv, + stderr: "ignore", + stdout: "pipe", + }); + servers.push(server); + for await (const chunk of server.stdout) { + const text = new TextDecoder().decode(chunk); + try { + return new URL(text); + } catch { + throw new Error(`Invalid URL: '${text}'`); + } + } + throw new Error("No URL found?"); +} diff --git a/test/js/web/websocket/websocket-client.test.ts b/test/js/web/websocket/websocket-client.test.ts new file mode 100644 index 000000000..470fa71c0 --- /dev/null +++ b/test/js/web/websocket/websocket-client.test.ts @@ -0,0 +1,280 @@ +import { describe, it, expect, beforeEach, afterEach } from "bun:test"; +import type { Subprocess } from "bun"; +import { spawn } from "bun"; +import { bunEnv, bunExe, nodeExe } from "harness"; + +const strings = [ + { + label: "string (ascii)", + message: "ascii", + bytes: [0x61, 0x73, 0x63, 0x69, 0x69], + }, + { + label: "string (latin1)", + message: "latin1-©", + bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9], + }, + { + label: "string (utf-8)", + message: "utf8-😶", + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6], + }, +]; + +const buffers = [ + { + label: "Uint8Array (utf-8)", + message: new TextEncoder().encode("utf8-🙂"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82], + }, + { + label: "ArrayBuffer (utf-8)", + message: new TextEncoder().encode("utf8-🙃").buffer, + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83], + }, + { + label: "Buffer (utf-8)", + message: Buffer.from("utf8-🤩"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9], + }, +]; + +const messages = [...strings, ...buffers]; + +const binaryTypes = [ + { + label: "nodebuffer", + type: Buffer, + }, + { + label: "arraybuffer", + type: ArrayBuffer, + }, +] as const; + +let servers: Subprocess[] = []; +let clients: WebSocket[] = []; + +function cleanUp() { + for (const client of clients) { + client.terminate(); + } + for (const server of servers) { + server.kill(); + } +} + +beforeEach(cleanUp); +afterEach(cleanUp); + +describe("WebSocket", () => { + test("url", (ws, done) => { + expect(ws.url).toStartWith("ws://"); + done(); + }); + test("readyState", (ws, done) => { + expect(ws.readyState).toBe(WebSocket.CONNECTING); + ws.addEventListener("open", () => { + expect(ws.readyState).toBe(WebSocket.OPEN); + ws.close(); + }); + ws.addEventListener("close", () => { + expect(ws.readyState).toBe(WebSocket.CLOSED); + done(); + }); + }); + describe("binaryType", () => { + test("(default)", (ws, done) => { + expect(ws.binaryType).toBe("nodebuffer"); + done(); + }); + test("(invalid)", (ws, done) => { + try { + // @ts-expect-error + ws.binaryType = "invalid"; + done(new Error("Expected an error")); + } catch { + done(); + } + }); + for (const { label, type } of binaryTypes) { + test(label, (ws, done) => { + ws.binaryType = label; + ws.addEventListener("open", () => { + expect(ws.binaryType).toBe(label); + ws.send(new Uint8Array(1)); + }); + ws.addEventListener("message", ({ data }) => { + expect(data).toBeInstanceOf(type); + ws.ping(); + }); + ws.addEventListener("ping", ({ data }) => { + expect(data).toBeInstanceOf(type); + ws.pong(); + }); + ws.addEventListener("pong", ({ data }) => { + expect(data).toBeInstanceOf(type); + done(); + }); + }); + } + }); + describe("send()", () => { + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.addEventListener("open", () => { + ws.send(message); + }); + ws.addEventListener("message", ({ data }) => { + if (typeof data === "string") { + expect(data).toBe(message); + } else { + expect(data).toEqual(Buffer.from(bytes)); + } + done(); + }); + }); + } + }); + describe("ping()", () => { + test("(no argument)", (ws, done) => { + ws.addEventListener("open", () => { + ws.ping(); + }); + ws.addEventListener("ping", ({ data }) => { + expect(data).toBeInstanceOf(Buffer); + done(); + }); + }); + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.addEventListener("open", () => { + ws.ping(message); + }); + ws.addEventListener("ping", ({ data }) => { + expect(data).toEqual(Buffer.from(bytes)); + done(); + }); + }); + } + }); + describe("pong()", () => { + test("(no argument)", (ws, done) => { + ws.addEventListener("open", () => { + ws.pong(); + }); + ws.addEventListener("pong", ({ data }) => { + expect(data).toBeInstanceOf(Buffer); + done(); + }); + }); + for (const { label, message, bytes } of messages) { + test(label, (ws, done) => { + ws.addEventListener("open", () => { + ws.pong(message); + }); + ws.addEventListener("pong", ({ data }) => { + expect(data).toEqual(Buffer.from(bytes)); + done(); + }); + }); + } + }); + describe("close()", () => { + test("(no arguments)", (ws, done) => { + ws.addEventListener("open", () => { + ws.close(); + }); + ws.addEventListener("close", ({ code, reason, wasClean }) => { + expect(code).toBe(1000); + expect(reason).toBeString(); + expect(wasClean).toBeTrue(); + done(); + }); + }); + test("(no reason)", (ws, done) => { + ws.addEventListener("open", () => { + ws.close(1001); + }); + ws.addEventListener("close", ({ code, reason, wasClean }) => { + expect(code).toBe(1001); + expect(reason).toBeString(); + expect(wasClean).toBeTrue(); + done(); + }); + }); + // FIXME: Encoding issue + // Expected: "latin1-©" + // Received: "latin1-©" + /* + for (const { label, message } of strings) { + test(label, (ws, done) => { + ws.addEventListener("open", () => { + ws.close(1002, message); + }); + ws.addEventListener("close", ({ code, reason, wasClean }) => { + expect(code).toBe(1002); + expect(reason).toBe(message); + expect(wasClean).toBeTrue(); + done(); + }); + }); + } + */ + }); + test("terminate()", (ws, done) => { + ws.addEventListener("open", () => { + ws.terminate(); + }); + ws.addEventListener("close", ({ code, reason, wasClean }) => { + expect(code).toBe(1006); + expect(reason).toBeString(); + expect(wasClean).toBeFalse(); + done(); + }); + }); +}); + +function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void) => void, timeout?: number) { + it( + label, + testDone => { + let isDone = false; + const done = (err?: unknown) => { + if (!isDone) { + isDone = true; + testDone(err); + } + }; + listen() + .then(url => { + const ws = new WebSocket(url); + clients.push(ws); + fn(ws, done); + }) + .catch(done); + }, + { timeout: timeout ?? 1000 }, + ); +} + +async function listen(): Promise<URL> { + const { pathname } = new URL("./websocket-server-echo.mjs", import.meta.url); + const server = spawn({ + cmd: [nodeExe() ?? bunExe(), pathname], + cwd: import.meta.dir, + env: bunEnv, + stderr: "ignore", + stdout: "pipe", + }); + servers.push(server); + for await (const chunk of server.stdout) { + const text = new TextDecoder().decode(chunk); + try { + return new URL(text); + } catch { + throw new Error(`Invalid URL: '${text}'`); + } + } + throw new Error("No URL found?"); +} diff --git a/test/js/web/websocket/websocket-server-echo.mjs b/test/js/web/websocket/websocket-server-echo.mjs new file mode 100644 index 000000000..7ce1b205b --- /dev/null +++ b/test/js/web/websocket/websocket-server-echo.mjs @@ -0,0 +1,94 @@ +import { createServer } from "node:http"; +import { WebSocketServer } from "ws"; + +const server = createServer(); +const wss = new WebSocketServer({ + perMessageDeflate: false, + noServer: true, +}); + +server.on("listening", () => { + const { address, port, family } = server.address(); + const { href } = new URL(family === "IPv6" ? `ws://[${address}]:${port}` : `ws://${address}:${port}`); + console.log(href); + console.error("Listening:", href); +}); + +server.on("request", (request, response) => { + console.error("Received request:", { ...request.headers }); + response.end(); +}); + +server.on("clientError", (error, socket) => { + console.error("Received client error:", error); + socket.end(); +}); + +server.on("error", error => { + console.error("Received error:", error); +}); + +server.on("upgrade", (request, socket, head) => { + console.error("Received upgrade:", { ...request.headers }); + + socket.on("data", data => { + console.error("Received bytes:", data); + }); + + wss.handleUpgrade(request, socket, head, ws => { + wss.emit("connection", ws, request); + }); +}); + +wss.on("connection", (ws, request) => { + console.error("Received connection:", request.socket.remoteAddress); + + ws.on("message", message => { + console.error("Received message:", message); + ws.send(message); + + if (message === "ping") { + console.error("Sending ping"); + ws.ping(); + } else if (message === "pong") { + console.error("Sending pong"); + ws.pong(); + } else if (message === "close") { + console.error("Sending close"); + ws.close(); + } else if (message === "terminate") { + console.error("Sending terminate"); + ws.terminate(); + } + }); + + ws.on("ping", data => { + console.error("Received ping:", data); + ws.ping(data); + }); + + ws.on("pong", data => { + console.error("Received pong:", data); + ws.pong(data); + }); + + ws.on("close", (code, reason) => { + console.error("Received close:", code, reason); + }); + + ws.on("error", error => { + console.error("Received error:", error); + }); +}); + +server.on("close", () => { + console.error("Server closed"); +}); + +process.on("exit", exitCode => { + console.error("Server exited:", exitCode); +}); + +const hostname = process.env.HOST || "127.0.0.1"; +const port = parseInt(process.env.PORT) || 0; +server.listen(port, hostname); |