aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api/server.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/api/server.zig')
-rw-r--r--src/bun.js/api/server.zig371
1 files changed, 366 insertions, 5 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index b4a351089..30ba4d714 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -2542,6 +2542,7 @@ pub const ServerWebSocket = struct {
websocket: uws.AnyWebSocket = undefined,
closed: bool = false,
binary_type: JSC.JSValue.JSType = .Uint8Array,
+ opened: bool = false,
pub usingnamespace JSC.Codegen.JSServerWebSocket;
@@ -2562,6 +2563,7 @@ pub const ServerWebSocket = struct {
const onOpenHandler = handler.onOpen;
this.this_value = .zero;
+ this.opened = false;
if (value_to_cache != .zero) {
const current_this = this.getThisValue();
ServerWebSocket.dataSetCached(current_this, globalObject, value_to_cache);
@@ -2579,10 +2581,15 @@ pub const ServerWebSocket = struct {
const error_handler = handler.onError;
ws.cork(&corker, Corker.run);
const result = corker.result;
+ this.opened = true;
if (result.isAnyError(globalObject)) {
log("onOpen exception", .{});
- ws.close();
+ if (!this.closed) {
+ this.closed = true;
+ this.websocket.end(1000, "");
+ }
+
_ = ServerWebSocket.dangerouslySetPtr(this_value, null);
handler.active_connections -|= 1;
this_value.unprotect();
@@ -2808,7 +2815,7 @@ pub const ServerWebSocket = struct {
return JSValue.jsNumber(
// if 0, return 0
// else return number of bytes sent
- @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), buffer.slice(), .text, compress))) * @intCast(i32, @truncate(u31, buffer.len)),
+ @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), buffer.slice(), .binary, compress))) * @intCast(i32, @truncate(u31, buffer.len)),
);
}
@@ -2830,6 +2837,195 @@ pub const ServerWebSocket = struct {
return .zero;
}
+ pub fn publishText(
+ this: *ServerWebSocket,
+ globalThis: *JSC.JSGlobalObject,
+ callframe: *JSC.CallFrame,
+ ) callconv(.C) JSValue {
+ const args = callframe.arguments(4);
+
+ if (args.len < 1) {
+ log("publish()", .{});
+ globalThis.throw("publish requires at least 1 argument", .{});
+ return .zero;
+ }
+
+ if (this.closed) {
+ log("publish() closed", .{});
+ return JSValue.jsNumber(0);
+ }
+
+ const topic_value = args.ptr[0];
+ const message_value = args.ptr[1];
+ const compress_value = args.ptr[2];
+
+ if (topic_value.isEmptyOrUndefinedOrNull() or !topic_value.isString()) {
+ log("publish() topic invalid", .{});
+ globalThis.throw("publishText requires a topic string", .{});
+ return .zero;
+ }
+
+ var topic_slice = topic_value.toSlice(globalThis, bun.default_allocator);
+ defer topic_slice.deinit();
+ if (topic_slice.len == 0) {
+ globalThis.throw("publishText requires a non-empty topic", .{});
+ return .zero;
+ }
+
+ const compress = args.len > 1 and compress_value.toBoolean();
+
+ if (message_value.isEmptyOrUndefinedOrNull() or !message_value.isString()) {
+ globalThis.throw("publishText requires a non-empty message", .{});
+ return .zero;
+ }
+
+ var string_slice = message_value.toSlice(globalThis, bun.default_allocator);
+ defer string_slice.deinit();
+ if (string_slice.len == 0) {
+ return JSValue.jsNumber(0);
+ }
+
+ const buffer = string_slice.slice();
+ return JSValue.jsNumber(
+ // if 0, return 0
+ // else return number of bytes sent
+ @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), buffer, .text, compress))) * @intCast(i32, @truncate(u31, buffer.len)),
+ );
+ }
+
+ pub fn publishBinary(
+ this: *ServerWebSocket,
+ globalThis: *JSC.JSGlobalObject,
+ callframe: *JSC.CallFrame,
+ ) callconv(.C) JSValue {
+ const args = callframe.arguments(4);
+
+ if (args.len < 1) {
+ log("publishBinary()", .{});
+ globalThis.throw("publishBinary requires at least 1 argument", .{});
+ return .zero;
+ }
+
+ if (this.closed) {
+ log("publishBinary() closed", .{});
+ return JSValue.jsNumber(0);
+ }
+
+ const topic_value = args.ptr[0];
+ const message_value = args.ptr[1];
+ const compress_value = args.ptr[2];
+
+ if (topic_value.isEmptyOrUndefinedOrNull() or !topic_value.isString()) {
+ log("publishBinary() topic invalid", .{});
+ globalThis.throw("publishBinary requires a topic string", .{});
+ return .zero;
+ }
+
+ var topic_slice = topic_value.toSlice(globalThis, bun.default_allocator);
+ defer topic_slice.deinit();
+ if (topic_slice.len == 0) {
+ globalThis.throw("publishBinary requires a non-empty topic", .{});
+ return .zero;
+ }
+
+ const compress = args.len > 1 and compress_value.toBoolean();
+
+ if (message_value.isEmptyOrUndefinedOrNull()) {
+ globalThis.throw("publishBinary requires a non-empty message", .{});
+ return .zero;
+ }
+ const buffer = message_value.asArrayBuffer(globalThis) orelse {
+ globalThis.throw("publishBinary expects an ArrayBufferView", .{});
+ return .zero;
+ };
+
+ if (buffer.len == 0) {
+ return JSC.JSValue.jsNumber(0);
+ }
+
+ return JSValue.jsNumber(
+ // if 0, return 0
+ // else return number of bytes sent
+ @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), buffer.slice(), .binary, compress))) * @intCast(i32, @truncate(u31, buffer.len)),
+ );
+ }
+
+ pub fn publishBinaryWithoutTypeChecks(
+ this: *ServerWebSocket,
+ globalThis: *JSC.JSGlobalObject,
+ topic_str: *JSC.JSString,
+ buffer: *JSC.JSUint8Array,
+ ) callconv(.C) JSC.JSValue {
+ if (this.closed) {
+ log("publishBinary() closed", .{});
+ return JSValue.jsNumber(0);
+ }
+
+ var topic_slice = topic_str.toSlice(globalThis, bun.default_allocator);
+ defer topic_slice.deinit();
+ if (topic_slice.len == 0) {
+ globalThis.throw("publishBinary requires a non-empty topic", .{});
+ return .zero;
+ }
+
+ const compress = true;
+
+ const slice = buffer.slice();
+ if (slice.len == 0) {
+ return JSC.JSValue.jsNumber(0);
+ }
+
+ return JSValue.jsNumber(
+ // if 0, return 0
+ // else return number of bytes sent
+ @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), slice, .binary, compress))) * @intCast(
+ i32,
+ @truncate(u31, slice.len),
+ ),
+ );
+ }
+
+ pub fn publishTextWithoutTypeChecks(
+ this: *ServerWebSocket,
+ globalThis: *JSC.JSGlobalObject,
+ topic_str: *JSC.JSString,
+ str: *JSC.JSString,
+ ) callconv(.C) JSC.JSValue {
+ if (this.closed) {
+ log("publishBinary() closed", .{});
+ return JSValue.jsNumber(0);
+ }
+
+ var topic_slice = topic_str.toSlice(globalThis, bun.default_allocator);
+ defer topic_slice.deinit();
+ if (topic_slice.len == 0) {
+ globalThis.throw("publishBinary requires a non-empty topic", .{});
+ return .zero;
+ }
+
+ const compress = true;
+
+ const slice = str.toSlice(globalThis, bun.default_allocator);
+ if (slice.len == 0) {
+ return JSC.JSValue.jsNumber(0);
+ }
+
+ return JSValue.jsNumber(
+ // if 0, return 0
+ // else return number of bytes sent
+ @as(i32, @boolToInt(this.websocket.publishWithOptions(
+ this.handler.app,
+ topic_slice.slice(),
+ slice.slice(),
+ .text,
+ compress,
+ ))) * @intCast(
+ i32,
+ @truncate(u31, slice.len),
+ ),
+ );
+ }
+
pub fn cork(
this: *ServerWebSocket,
globalThis: *JSC.JSGlobalObject,
@@ -2901,8 +3097,7 @@ pub const ServerWebSocket = struct {
if (message_value.asArrayBuffer(globalThis)) |buffer| {
if (buffer.len == 0) {
- globalThis.throw("send requires a non-empty message", .{});
- return .zero;
+ return JSValue.jsNumber(0);
}
switch (this.websocket.send(buffer.slice(), .binary, compress, true)) {
@@ -2948,6 +3143,172 @@ pub const ServerWebSocket = struct {
return .zero;
}
+ pub fn sendText(
+ this: *ServerWebSocket,
+ globalThis: *JSC.JSGlobalObject,
+ callframe: *JSC.CallFrame,
+ ) callconv(.C) JSValue {
+ const args = callframe.arguments(2);
+
+ if (args.len < 1) {
+ log("sendText()", .{});
+ globalThis.throw("sendText requires at least 1 argument", .{});
+ return .zero;
+ }
+
+ if (this.closed) {
+ log("sendText() closed", .{});
+ return JSValue.jsNumber(0);
+ }
+
+ const message_value = args.ptr[0];
+ const compress_value = args.ptr[1];
+
+ const compress = args.len > 1 and compress_value.toBoolean();
+
+ if (message_value.isEmptyOrUndefinedOrNull() or !message_value.isString()) {
+ globalThis.throw("sendText expects a string", .{});
+ return .zero;
+ }
+
+ var string_slice = message_value.toSlice(globalThis, bun.default_allocator);
+ defer string_slice.deinit();
+ if (string_slice.len == 0) {
+ return JSValue.jsNumber(0);
+ }
+
+ const buffer = string_slice.slice();
+ switch (this.websocket.send(buffer, .text, compress, true)) {
+ .backpressure => {
+ log("sendText() backpressure ({d} bytes string)", .{buffer.len});
+ return JSValue.jsNumber(-1);
+ },
+ .success => {
+ log("sendText() success ({d} bytes string)", .{buffer.len});
+ return JSValue.jsNumber(buffer.len);
+ },
+ .dropped => {
+ log("sendText() dropped ({d} bytes string)", .{buffer.len});
+ return JSValue.jsNumber(0);
+ },
+ }
+ }
+
+ pub fn sendTextWithoutTypeChecks(
+ this: *ServerWebSocket,
+ globalThis: *JSC.JSGlobalObject,
+ message_str: *JSC.JSString,
+ compress: bool,
+ ) callconv(.C) JSValue {
+ if (this.closed) {
+ log("sendText() closed", .{});
+ return JSValue.jsNumber(0);
+ }
+
+ var string_slice = message_str.toSlice(globalThis, bun.default_allocator);
+ defer string_slice.deinit();
+ if (string_slice.len == 0) {
+ return JSValue.jsNumber(0);
+ }
+
+ const buffer = string_slice.slice();
+ switch (this.websocket.send(buffer, .text, compress, true)) {
+ .backpressure => {
+ log("sendText() backpressure ({d} bytes string)", .{buffer.len});
+ return JSValue.jsNumber(-1);
+ },
+ .success => {
+ log("sendText() success ({d} bytes string)", .{buffer.len});
+ return JSValue.jsNumber(buffer.len);
+ },
+ .dropped => {
+ log("sendText() dropped ({d} bytes string)", .{buffer.len});
+ return JSValue.jsNumber(0);
+ },
+ }
+ }
+
+ pub fn sendBinary(
+ this: *ServerWebSocket,
+ globalThis: *JSC.JSGlobalObject,
+ callframe: *JSC.CallFrame,
+ ) callconv(.C) JSValue {
+ const args = callframe.arguments(2);
+
+ if (args.len < 1) {
+ log("sendBinary()", .{});
+ globalThis.throw("sendBinary requires at least 1 argument", .{});
+ return .zero;
+ }
+
+ if (this.closed) {
+ log("sendBinary() closed", .{});
+ return JSValue.jsNumber(0);
+ }
+
+ const message_value = args.ptr[0];
+ const compress_value = args.ptr[1];
+
+ const compress = args.len > 1 and compress_value.toBoolean();
+
+ const buffer = message_value.asArrayBuffer(globalThis) orelse {
+ globalThis.throw("sendBinary requires an ArrayBufferView", .{});
+ return .zero;
+ };
+
+ if (buffer.len == 0) {
+ return JSValue.jsNumber(0);
+ }
+
+ switch (this.websocket.send(buffer.slice(), .binary, compress, true)) {
+ .backpressure => {
+ log("sendBinary() backpressure ({d} bytes)", .{buffer.len});
+ return JSValue.jsNumber(-1);
+ },
+ .success => {
+ log("sendBinary() success ({d} bytes)", .{buffer.len});
+ return JSValue.jsNumber(buffer.slice().len);
+ },
+ .dropped => {
+ log("sendBinary() dropped ({d} bytes)", .{buffer.len});
+ return JSValue.jsNumber(0);
+ },
+ }
+ }
+
+ pub fn sendBinaryWithoutTypeChecks(
+ this: *ServerWebSocket,
+ _: *JSC.JSGlobalObject,
+ array_buffer: *JSC.JSUint8Array,
+ compress: bool,
+ ) callconv(.C) JSValue {
+ if (this.closed) {
+ log("sendBinary() closed", .{});
+ return JSValue.jsNumber(0);
+ }
+
+ const buffer = array_buffer.slice();
+
+ if (buffer.len == 0) {
+ return JSValue.jsNumber(0);
+ }
+
+ switch (this.websocket.send(buffer, .binary, compress, true)) {
+ .backpressure => {
+ log("sendBinary() backpressure ({d} bytes)", .{buffer.len});
+ return JSValue.jsNumber(-1);
+ },
+ .success => {
+ log("sendBinary() success ({d} bytes)", .{buffer.len});
+ return JSValue.jsNumber(buffer.len);
+ },
+ .dropped => {
+ log("sendBinary() dropped ({d} bytes)", .{buffer.len});
+ return JSValue.jsNumber(0);
+ },
+ }
+ }
+
pub fn getData(
_: *ServerWebSocket,
_: *JSC.JSGlobalObject,
@@ -2988,7 +3349,7 @@ pub const ServerWebSocket = struct {
log("close()", .{});
if (this.closed) {
- return .zero;
+ return JSValue.jsUndefined();
}
const code = if (args.len > 0) args.ptr[0].toInt32() else @as(i32, 1000);