aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-09 23:54:56 -0800
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-09 23:54:56 -0800
commit8753c483ff3e23929048eedeb4047b0b0aef281d (patch)
treec951d19cca6e9bab358c4c8095e8902d36b8c280
parent2149e1f0a095d5a4854f0ec7a2b638a3146e388f (diff)
downloadbun-8753c483ff3e23929048eedeb4047b0b0aef281d.tar.gz
bun-8753c483ff3e23929048eedeb4047b0b0aef281d.tar.zst
bun-8753c483ff3e23929048eedeb4047b0b0aef281d.zip
Implement Server.publish()
Fixes https://github.com/oven-sh/bun/issues/1417
-rw-r--r--packages/bun-types/bun.d.ts36
-rw-r--r--src/bun.js/api/server.zig61
-rw-r--r--test/bun.js/websocket-server.test.ts36
3 files changed, 133 insertions, 0 deletions
diff --git a/packages/bun-types/bun.d.ts b/packages/bun-types/bun.d.ts
index 8611ccb30..4098cca4d 100644
--- a/packages/bun-types/bun.d.ts
+++ b/packages/bun-types/bun.d.ts
@@ -1474,6 +1474,42 @@ declare module "bun" {
): boolean;
/**
+ * Send a message to all connected {@link ServerWebSocket} subscribed to a topic
+ *
+ * @param topic The topic to publish to
+ * @param data The data to send
+ * @param compress Should the data be compressed? Ignored if the client does not support compression.
+ *
+ * @returns 0 if the message was dropped, -1 if backpressure was applied, or the number of bytes sent.
+ *
+ * @example
+ *
+ * ```js
+ * server.publish("chat", "Hello World");
+ * ```
+ *
+ * @example
+ * ```js
+ * server.publish("chat", new Uint8Array([1, 2, 3, 4]));
+ * ```
+ *
+ * @example
+ * ```js
+ * server.publish("chat", new ArrayBuffer(4), true);
+ * ```
+ *
+ * @example
+ * ```js
+ * server.publish("chat", new DataView(new ArrayBuffer(4)));
+ * ```
+ */
+ publish(
+ topic: string,
+ data: string | ArrayBufferView | ArrayBuffer,
+ compress?: boolean,
+ ): ServerWebSocketSendStatus;
+
+ /**
* How many requests are in-flight right now?
*/
readonly pendingRequests: number;
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 2ada0aef2..f4d173c95 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -3657,6 +3657,10 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
.upgrade = .{
.rfn = JSC.wrapSync(ThisServer, "onUpgrade"),
},
+
+ .publish = .{
+ .rfn = JSC.wrapSync(ThisServer, "publish"),
+ },
},
.{
.port = .{
@@ -3680,6 +3684,63 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
},
);
+ pub fn publish(this: *ThisServer, globalThis: *JSC.JSGlobalObject, topic: ZigString, message_value: JSValue, compress_value: ?JSValue, exception: JSC.C.ExceptionRef) JSValue {
+ if (this.config.websocket == null)
+ return JSValue.jsNumber(0);
+
+ var app = this.app;
+
+ if (topic.len == 0) {
+ httplog("publish() topic invalid", .{});
+ JSC.JSError(this.vm.allocator, "publish requires a topic string", .{}, globalThis, exception);
+ return .zero;
+ }
+
+ var topic_slice = topic.toSlice(bun.default_allocator);
+ defer topic_slice.deinit();
+ if (topic_slice.len == 0) {
+ JSC.JSError(this.vm.allocator, "publish requires a non-empty topic", .{}, globalThis, exception);
+ return .zero;
+ }
+
+ const compress = (compress_value orelse JSValue.jsBoolean(true)).toBoolean();
+
+ if (message_value.isEmptyOrUndefinedOrNull()) {
+ JSC.JSError(this.vm.allocator, "publish requires a non-empty message", .{}, globalThis, exception);
+ return .zero;
+ }
+
+ if (message_value.asArrayBuffer(globalThis)) |buffer| {
+ if (buffer.len == 0) {
+ JSC.JSError(this.vm.allocator, "publish requires a non-empty message", .{}, globalThis, exception);
+ return .zero;
+ }
+
+ return JSValue.jsNumber(
+ // if 0, return 0
+ // else return number of bytes sent
+ @as(i32, @boolToInt(uws.AnyWebSocket.publishWithOptions(ssl_enabled, app, topic_slice.slice(), buffer.slice(), .binary, compress))) * @intCast(i32, @truncate(u31, buffer.len)),
+ );
+ }
+
+ {
+ var string_slice = message_value.toSlice(globalThis, bun.default_allocator);
+ defer string_slice.deinit();
+ if (string_slice.len == 0) {
+ return JSValue.jsNumber(0);
+ }
+
+ const buffer = string_slice.slice();
+ return JSValue.jsNumber(
+ // if 0, return 0
+ // else return number of bytes sent
+ @as(i32, @boolToInt(uws.AnyWebSocket.publishWithOptions(ssl_enabled, app, topic_slice.slice(), buffer, .text, compress))) * @intCast(i32, @truncate(u31, buffer.len)),
+ );
+ }
+
+ return .zero;
+ }
+
pub fn onUpgrade(
this: *ThisServer,
globalThis: *JSC.JSGlobalObject,
diff --git a/test/bun.js/websocket-server.test.ts b/test/bun.js/websocket-server.test.ts
index dd88d6707..acc69aa03 100644
--- a/test/bun.js/websocket-server.test.ts
+++ b/test/bun.js/websocket-server.test.ts
@@ -12,6 +12,42 @@ function getPort() {
}
describe("websocket server", () => {
+ it("can do publish()", async (done) => {
+ var server = serve({
+ port: getPort(),
+ websocket: {
+ open(ws) {
+ ws.subscribe("all");
+ },
+ message(ws, msg) {},
+ close(ws) {},
+ },
+ fetch(req, server) {
+ if (server.upgrade(req)) {
+ return;
+ }
+
+ return new Response("success");
+ },
+ });
+
+ await new Promise<WebSocket>((resolve2, reject2) => {
+ var socket = new WebSocket(`ws://${server.hostname}:${server.port}`);
+ var clientCounter = 0;
+
+ socket.onmessage = (e) => {
+ expect(e.data).toBe("hello");
+ resolve2();
+ };
+ socket.onopen = () => {
+ queueMicrotask(() => {
+ server.publish("all", "hello");
+ });
+ };
+ });
+ server.stop();
+ done();
+ });
for (let method of ["publish", "publishText", "publishBinary"]) {
describe(method, () => {
it("in close() should work", async () => {