diff options
author | 2022-11-09 23:54:56 -0800 | |
---|---|---|
committer | 2022-11-09 23:54:56 -0800 | |
commit | 8753c483ff3e23929048eedeb4047b0b0aef281d (patch) | |
tree | c951d19cca6e9bab358c4c8095e8902d36b8c280 | |
parent | 2149e1f0a095d5a4854f0ec7a2b638a3146e388f (diff) | |
download | bun-8753c483ff3e23929048eedeb4047b0b0aef281d.tar.gz bun-8753c483ff3e23929048eedeb4047b0b0aef281d.tar.zst bun-8753c483ff3e23929048eedeb4047b0b0aef281d.zip |
Implement Server.publish()
Fixes https://github.com/oven-sh/bun/issues/1417
-rw-r--r-- | packages/bun-types/bun.d.ts | 36 | ||||
-rw-r--r-- | src/bun.js/api/server.zig | 61 | ||||
-rw-r--r-- | test/bun.js/websocket-server.test.ts | 36 |
3 files changed, 133 insertions, 0 deletions
diff --git a/packages/bun-types/bun.d.ts b/packages/bun-types/bun.d.ts index 8611ccb30..4098cca4d 100644 --- a/packages/bun-types/bun.d.ts +++ b/packages/bun-types/bun.d.ts @@ -1474,6 +1474,42 @@ declare module "bun" { ): boolean; /** + * Send a message to all connected {@link ServerWebSocket} subscribed to a topic + * + * @param topic The topic to publish to + * @param data The data to send + * @param compress Should the data be compressed? Ignored if the client does not support compression. + * + * @returns 0 if the message was dropped, -1 if backpressure was applied, or the number of bytes sent. + * + * @example + * + * ```js + * server.publish("chat", "Hello World"); + * ``` + * + * @example + * ```js + * server.publish("chat", new Uint8Array([1, 2, 3, 4])); + * ``` + * + * @example + * ```js + * server.publish("chat", new ArrayBuffer(4), true); + * ``` + * + * @example + * ```js + * server.publish("chat", new DataView(new ArrayBuffer(4))); + * ``` + */ + publish( + topic: string, + data: string | ArrayBufferView | ArrayBuffer, + compress?: boolean, + ): ServerWebSocketSendStatus; + + /** * How many requests are in-flight right now? */ readonly pendingRequests: number; diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 2ada0aef2..f4d173c95 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -3657,6 +3657,10 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { .upgrade = .{ .rfn = JSC.wrapSync(ThisServer, "onUpgrade"), }, + + .publish = .{ + .rfn = JSC.wrapSync(ThisServer, "publish"), + }, }, .{ .port = .{ @@ -3680,6 +3684,63 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { }, ); + pub fn publish(this: *ThisServer, globalThis: *JSC.JSGlobalObject, topic: ZigString, message_value: JSValue, compress_value: ?JSValue, exception: JSC.C.ExceptionRef) JSValue { + if (this.config.websocket == null) + return JSValue.jsNumber(0); + + var app = this.app; + + if (topic.len == 0) { + httplog("publish() topic invalid", .{}); + JSC.JSError(this.vm.allocator, "publish requires a topic string", .{}, globalThis, exception); + return .zero; + } + + var topic_slice = topic.toSlice(bun.default_allocator); + defer topic_slice.deinit(); + if (topic_slice.len == 0) { + JSC.JSError(this.vm.allocator, "publish requires a non-empty topic", .{}, globalThis, exception); + return .zero; + } + + const compress = (compress_value orelse JSValue.jsBoolean(true)).toBoolean(); + + if (message_value.isEmptyOrUndefinedOrNull()) { + JSC.JSError(this.vm.allocator, "publish requires a non-empty message", .{}, globalThis, exception); + return .zero; + } + + if (message_value.asArrayBuffer(globalThis)) |buffer| { + if (buffer.len == 0) { + JSC.JSError(this.vm.allocator, "publish requires a non-empty message", .{}, globalThis, exception); + return .zero; + } + + return JSValue.jsNumber( + // if 0, return 0 + // else return number of bytes sent + @as(i32, @boolToInt(uws.AnyWebSocket.publishWithOptions(ssl_enabled, app, topic_slice.slice(), buffer.slice(), .binary, compress))) * @intCast(i32, @truncate(u31, buffer.len)), + ); + } + + { + var string_slice = message_value.toSlice(globalThis, bun.default_allocator); + defer string_slice.deinit(); + if (string_slice.len == 0) { + return JSValue.jsNumber(0); + } + + const buffer = string_slice.slice(); + return JSValue.jsNumber( + // if 0, return 0 + // else return number of bytes sent + @as(i32, @boolToInt(uws.AnyWebSocket.publishWithOptions(ssl_enabled, app, topic_slice.slice(), buffer, .text, compress))) * @intCast(i32, @truncate(u31, buffer.len)), + ); + } + + return .zero; + } + pub fn onUpgrade( this: *ThisServer, globalThis: *JSC.JSGlobalObject, diff --git a/test/bun.js/websocket-server.test.ts b/test/bun.js/websocket-server.test.ts index dd88d6707..acc69aa03 100644 --- a/test/bun.js/websocket-server.test.ts +++ b/test/bun.js/websocket-server.test.ts @@ -12,6 +12,42 @@ function getPort() { } describe("websocket server", () => { + it("can do publish()", async (done) => { + var server = serve({ + port: getPort(), + websocket: { + open(ws) { + ws.subscribe("all"); + }, + message(ws, msg) {}, + close(ws) {}, + }, + fetch(req, server) { + if (server.upgrade(req)) { + return; + } + + return new Response("success"); + }, + }); + + await new Promise<WebSocket>((resolve2, reject2) => { + var socket = new WebSocket(`ws://${server.hostname}:${server.port}`); + var clientCounter = 0; + + socket.onmessage = (e) => { + expect(e.data).toBe("hello"); + resolve2(); + }; + socket.onopen = () => { + queueMicrotask(() => { + server.publish("all", "hello"); + }); + }; + }); + server.stop(); + done(); + }); for (let method of ["publish", "publishText", "publishBinary"]) { describe(method, () => { it("in close() should work", async () => { |