aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar Ciro Spaciari <ciro.spaciari@gmail.com> 2023-05-21 23:36:42 -0300
committerGravatar GitHub <noreply@github.com> 2023-05-21 19:36:42 -0700
commitfd21243efd86f54b374beb90faaedcdc3c4df266 (patch)
treedffc060ace8f0a2eb4903708d076451e621cfb71 /src
parentd90f7c7bf61e6350c211f7b22f44a3a80a1088f2 (diff)
downloadbun-fd21243efd86f54b374beb90faaedcdc3c4df266.tar.gz
bun-fd21243efd86f54b374beb90faaedcdc3c4df266.tar.zst
bun-fd21243efd86f54b374beb90faaedcdc3c4df266.zip
WS send with callback (#2986)
* WS send with callback * add opts.compress support * fmt * compress is the only option we care * add ws client options * only change ws client when using blob * fmt * fix errors * fixup * fixup * fmt
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/ws.exports.js75
1 files changed, 35 insertions, 40 deletions
diff --git a/src/bun.js/ws.exports.js b/src/bun.js/ws.exports.js
index b560fa713..959cf82a7 100644
--- a/src/bun.js/ws.exports.js
+++ b/src/bun.js/ws.exports.js
@@ -21,36 +21,28 @@ class BunWebSocket extends globalThis.WebSocket {
return this.#binaryType;
}
set binaryType(type) {
- if (type !== "nodebuffer" && type !== "buffer" && type !== "blob" && type !== "arraybuffer") {
- throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'");
+ if (type !== "nodebuffer" && type !== "blob" && type !== "arraybuffer") {
+ throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'");
+ }
+ if (type !== "blob") {
+ super.binaryType = type;
}
this.#binaryType = type;
}
+
+ send(data, opts, cb) {
+ super.send(data, opts?.compress);
+ typeof cb === "function" && cb();
+ }
+
on(event, callback) {
if (event === "message") {
- var handler = ({ message }) => {
+ var handler = ({ data }) => {
try {
- if (typeof message === "string") {
- if (this.#binaryType === "arraybuffer") {
- message = encoder.encode(message).buffer;
- } else if (this.#binaryType === "blob") {
- message = new Blob([message], { type: "text/plain" });
- } else {
- // nodebuffer or buffer
- message = Buffer.from(message);
- }
- } else {
- //Uint8Array
- if (this.#binaryType === "arraybuffer") {
- message = message.buffer;
- } else if (this.#binaryType === "blob") {
- message = new Blob([message]);
- } else {
- // nodebuffer or buffer
- message = Buffer.from(message);
- }
+ if (this.#binaryType == "blob") {
+ data = new Blob([data]);
}
- callback(message);
+ callback(data);
} catch (e) {
globalThis.reportError(e);
}
@@ -349,13 +341,8 @@ class BunWebSocketMocked extends EventEmitter {
this.#url = url;
this.#bufferedAmount = 0;
binaryType = binaryType || "arraybuffer";
- if (
- binaryType !== "nodebuffer" &&
- binaryType !== "buffer" &&
- binaryType !== "blob" &&
- binaryType !== "arraybuffer"
- ) {
- throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'");
+ if (binaryType !== "nodebuffer" && binaryType !== "blob" && binaryType !== "arraybuffer") {
+ throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'");
}
this.#binaryType = binaryType;
this.#protocol = protocol;
@@ -383,7 +370,7 @@ class BunWebSocketMocked extends EventEmitter {
} else if (this.#binaryType === "blob") {
message = new Blob([message], { type: "text/plain" });
} else {
- // nodebuffer or buffer
+ // nodebuffer
message = Buffer.from(message);
}
} else {
@@ -393,10 +380,11 @@ class BunWebSocketMocked extends EventEmitter {
} else if (this.#binaryType === "blob") {
message = new Blob([message]);
} else {
- // nodebuffer or buffer
+ // nodebuffer
message = Buffer.from(message);
}
}
+
this.emit("message", message);
}
@@ -418,28 +406,35 @@ class BunWebSocketMocked extends EventEmitter {
#drain(ws) {
const chunk = this.#enquedMessages[0];
if (chunk) {
- const written = ws.send(chunk);
+ const [data, compress, cb] = chunk;
+ const written = ws.send(data, compress);
if (written == -1) {
// backpressure wait until next drain event
return;
}
+ typeof cb === "function" && cb();
+
this.#bufferedAmount -= chunk.length;
this.#enquedMessages.shift();
}
}
- send(data) {
+ send(data, opts, cb) {
if (this.#state === 1) {
- const written = this.#ws.send(data);
+ const compress = opts?.compress;
+ const written = this.#ws.send(data, compress);
if (written == -1) {
// backpressure
- this.#enquedMessages.push(data);
+ this.#enquedMessages.push([data, compress, cb]);
this.#bufferedAmount += data.length;
+ return;
}
+
+ typeof cb === "function" && cb();
} else if (this.#state === 0) {
// not connected yet
- this.#enquedMessages.push(data);
+ this.#enquedMessages.push([data, opts?.compress, cb]);
this.#bufferedAmount += data.length;
}
}
@@ -455,8 +450,8 @@ class BunWebSocketMocked extends EventEmitter {
}
set binaryType(type) {
- if (type !== "nodebuffer" && type !== "buffer" && type !== "blob" && type !== "arraybuffer") {
- throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'");
+ if (type !== "nodebuffer" && type !== "blob" && type !== "arraybuffer") {
+ throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'");
}
this.#binaryType = type;
}
@@ -775,7 +770,7 @@ class Server extends EventEmitter {
? this.options.handleProtocols(protocols, request)
: protocols.values().next().value;
}
- const ws = new BunWebSocketMocked(request.url, protocol, extensions, "arraybuffer");
+ const ws = new BunWebSocketMocked(request.url, protocol, extensions, "nodebuffer");
const headers = ["HTTP/1.1 101 Switching Protocols", "Upgrade: websocket", "Connection: Upgrade"];
this.emit("headers", headers, request);