diff options
| author | 2023-05-21 23:36:42 -0300 | |
|---|---|---|
| committer | 2023-05-21 19:36:42 -0700 | |
| commit | fd21243efd86f54b374beb90faaedcdc3c4df266 (patch) | |
| tree | dffc060ace8f0a2eb4903708d076451e621cfb71 /src | |
| parent | d90f7c7bf61e6350c211f7b22f44a3a80a1088f2 (diff) | |
| download | bun-fd21243efd86f54b374beb90faaedcdc3c4df266.tar.gz bun-fd21243efd86f54b374beb90faaedcdc3c4df266.tar.zst bun-fd21243efd86f54b374beb90faaedcdc3c4df266.zip | |
WS send with callback (#2986)
* WS send with callback
* add opts.compress support
* fmt
* compress is the only option we care
* add ws client options
* only change ws client when using blob
* fmt
* fix errors
* fixup
* fixup
* fmt
Diffstat (limited to 'src')
| -rw-r--r-- | src/bun.js/ws.exports.js | 75 |
1 files changed, 35 insertions, 40 deletions
diff --git a/src/bun.js/ws.exports.js b/src/bun.js/ws.exports.js index b560fa713..959cf82a7 100644 --- a/src/bun.js/ws.exports.js +++ b/src/bun.js/ws.exports.js @@ -21,36 +21,28 @@ class BunWebSocket extends globalThis.WebSocket { return this.#binaryType; } set binaryType(type) { - if (type !== "nodebuffer" && type !== "buffer" && type !== "blob" && type !== "arraybuffer") { - throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'"); + if (type !== "nodebuffer" && type !== "blob" && type !== "arraybuffer") { + throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'"); + } + if (type !== "blob") { + super.binaryType = type; } this.#binaryType = type; } + + send(data, opts, cb) { + super.send(data, opts?.compress); + typeof cb === "function" && cb(); + } + on(event, callback) { if (event === "message") { - var handler = ({ message }) => { + var handler = ({ data }) => { try { - if (typeof message === "string") { - if (this.#binaryType === "arraybuffer") { - message = encoder.encode(message).buffer; - } else if (this.#binaryType === "blob") { - message = new Blob([message], { type: "text/plain" }); - } else { - // nodebuffer or buffer - message = Buffer.from(message); - } - } else { - //Uint8Array - if (this.#binaryType === "arraybuffer") { - message = message.buffer; - } else if (this.#binaryType === "blob") { - message = new Blob([message]); - } else { - // nodebuffer or buffer - message = Buffer.from(message); - } + if (this.#binaryType == "blob") { + data = new Blob([data]); } - callback(message); + callback(data); } catch (e) { globalThis.reportError(e); } @@ -349,13 +341,8 @@ class BunWebSocketMocked extends EventEmitter { this.#url = url; this.#bufferedAmount = 0; binaryType = binaryType || "arraybuffer"; - if ( - binaryType !== "nodebuffer" && - binaryType !== "buffer" && - binaryType !== "blob" && - binaryType !== "arraybuffer" - ) { - throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'"); + if (binaryType !== "nodebuffer" && binaryType !== "blob" && binaryType !== "arraybuffer") { + throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'"); } this.#binaryType = binaryType; this.#protocol = protocol; @@ -383,7 +370,7 @@ class BunWebSocketMocked extends EventEmitter { } else if (this.#binaryType === "blob") { message = new Blob([message], { type: "text/plain" }); } else { - // nodebuffer or buffer + // nodebuffer message = Buffer.from(message); } } else { @@ -393,10 +380,11 @@ class BunWebSocketMocked extends EventEmitter { } else if (this.#binaryType === "blob") { message = new Blob([message]); } else { - // nodebuffer or buffer + // nodebuffer message = Buffer.from(message); } } + this.emit("message", message); } @@ -418,28 +406,35 @@ class BunWebSocketMocked extends EventEmitter { #drain(ws) { const chunk = this.#enquedMessages[0]; if (chunk) { - const written = ws.send(chunk); + const [data, compress, cb] = chunk; + const written = ws.send(data, compress); if (written == -1) { // backpressure wait until next drain event return; } + typeof cb === "function" && cb(); + this.#bufferedAmount -= chunk.length; this.#enquedMessages.shift(); } } - send(data) { + send(data, opts, cb) { if (this.#state === 1) { - const written = this.#ws.send(data); + const compress = opts?.compress; + const written = this.#ws.send(data, compress); if (written == -1) { // backpressure - this.#enquedMessages.push(data); + this.#enquedMessages.push([data, compress, cb]); this.#bufferedAmount += data.length; + return; } + + typeof cb === "function" && cb(); } else if (this.#state === 0) { // not connected yet - this.#enquedMessages.push(data); + this.#enquedMessages.push([data, opts?.compress, cb]); this.#bufferedAmount += data.length; } } @@ -455,8 +450,8 @@ class BunWebSocketMocked extends EventEmitter { } set binaryType(type) { - if (type !== "nodebuffer" && type !== "buffer" && type !== "blob" && type !== "arraybuffer") { - throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'"); + if (type !== "nodebuffer" && type !== "blob" && type !== "arraybuffer") { + throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'"); } this.#binaryType = type; } @@ -775,7 +770,7 @@ class Server extends EventEmitter { ? this.options.handleProtocols(protocols, request) : protocols.values().next().value; } - const ws = new BunWebSocketMocked(request.url, protocol, extensions, "arraybuffer"); + const ws = new BunWebSocketMocked(request.url, protocol, extensions, "nodebuffer"); const headers = ["HTTP/1.1 101 Switching Protocols", "Upgrade: websocket", "Connection: Upgrade"]; this.emit("headers", headers, request); |
