diff options
Diffstat (limited to 'src/bun.js/api/server.zig')
-rw-r--r-- | src/bun.js/api/server.zig | 371 |
1 files changed, 366 insertions, 5 deletions
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index b4a351089..30ba4d714 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -2542,6 +2542,7 @@ pub const ServerWebSocket = struct { websocket: uws.AnyWebSocket = undefined, closed: bool = false, binary_type: JSC.JSValue.JSType = .Uint8Array, + opened: bool = false, pub usingnamespace JSC.Codegen.JSServerWebSocket; @@ -2562,6 +2563,7 @@ pub const ServerWebSocket = struct { const onOpenHandler = handler.onOpen; this.this_value = .zero; + this.opened = false; if (value_to_cache != .zero) { const current_this = this.getThisValue(); ServerWebSocket.dataSetCached(current_this, globalObject, value_to_cache); @@ -2579,10 +2581,15 @@ pub const ServerWebSocket = struct { const error_handler = handler.onError; ws.cork(&corker, Corker.run); const result = corker.result; + this.opened = true; if (result.isAnyError(globalObject)) { log("onOpen exception", .{}); - ws.close(); + if (!this.closed) { + this.closed = true; + this.websocket.end(1000, ""); + } + _ = ServerWebSocket.dangerouslySetPtr(this_value, null); handler.active_connections -|= 1; this_value.unprotect(); @@ -2808,7 +2815,7 @@ pub const ServerWebSocket = struct { return JSValue.jsNumber( // if 0, return 0 // else return number of bytes sent - @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), buffer.slice(), .text, compress))) * @intCast(i32, @truncate(u31, buffer.len)), + @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), buffer.slice(), .binary, compress))) * @intCast(i32, @truncate(u31, buffer.len)), ); } @@ -2830,6 +2837,195 @@ pub const ServerWebSocket = struct { return .zero; } + pub fn publishText( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSValue { + const args = callframe.arguments(4); + + if (args.len < 1) { + log("publish()", .{}); + globalThis.throw("publish requires at least 1 argument", .{}); + return .zero; + } + + if (this.closed) { + log("publish() closed", .{}); + return JSValue.jsNumber(0); + } + + const topic_value = args.ptr[0]; + const message_value = args.ptr[1]; + const compress_value = args.ptr[2]; + + if (topic_value.isEmptyOrUndefinedOrNull() or !topic_value.isString()) { + log("publish() topic invalid", .{}); + globalThis.throw("publishText requires a topic string", .{}); + return .zero; + } + + var topic_slice = topic_value.toSlice(globalThis, bun.default_allocator); + defer topic_slice.deinit(); + if (topic_slice.len == 0) { + globalThis.throw("publishText requires a non-empty topic", .{}); + return .zero; + } + + const compress = args.len > 1 and compress_value.toBoolean(); + + if (message_value.isEmptyOrUndefinedOrNull() or !message_value.isString()) { + globalThis.throw("publishText requires a non-empty message", .{}); + return .zero; + } + + var string_slice = message_value.toSlice(globalThis, bun.default_allocator); + defer string_slice.deinit(); + if (string_slice.len == 0) { + return JSValue.jsNumber(0); + } + + const buffer = string_slice.slice(); + return JSValue.jsNumber( + // if 0, return 0 + // else return number of bytes sent + @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), buffer, .text, compress))) * @intCast(i32, @truncate(u31, buffer.len)), + ); + } + + pub fn publishBinary( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSValue { + const args = callframe.arguments(4); + + if (args.len < 1) { + log("publishBinary()", .{}); + globalThis.throw("publishBinary requires at least 1 argument", .{}); + return .zero; + } + + if (this.closed) { + log("publishBinary() closed", .{}); + return JSValue.jsNumber(0); + } + + const topic_value = args.ptr[0]; + const message_value = args.ptr[1]; + const compress_value = args.ptr[2]; + + if (topic_value.isEmptyOrUndefinedOrNull() or !topic_value.isString()) { + log("publishBinary() topic invalid", .{}); + globalThis.throw("publishBinary requires a topic string", .{}); + return .zero; + } + + var topic_slice = topic_value.toSlice(globalThis, bun.default_allocator); + defer topic_slice.deinit(); + if (topic_slice.len == 0) { + globalThis.throw("publishBinary requires a non-empty topic", .{}); + return .zero; + } + + const compress = args.len > 1 and compress_value.toBoolean(); + + if (message_value.isEmptyOrUndefinedOrNull()) { + globalThis.throw("publishBinary requires a non-empty message", .{}); + return .zero; + } + const buffer = message_value.asArrayBuffer(globalThis) orelse { + globalThis.throw("publishBinary expects an ArrayBufferView", .{}); + return .zero; + }; + + if (buffer.len == 0) { + return JSC.JSValue.jsNumber(0); + } + + return JSValue.jsNumber( + // if 0, return 0 + // else return number of bytes sent + @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), buffer.slice(), .binary, compress))) * @intCast(i32, @truncate(u31, buffer.len)), + ); + } + + pub fn publishBinaryWithoutTypeChecks( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + topic_str: *JSC.JSString, + buffer: *JSC.JSUint8Array, + ) callconv(.C) JSC.JSValue { + if (this.closed) { + log("publishBinary() closed", .{}); + return JSValue.jsNumber(0); + } + + var topic_slice = topic_str.toSlice(globalThis, bun.default_allocator); + defer topic_slice.deinit(); + if (topic_slice.len == 0) { + globalThis.throw("publishBinary requires a non-empty topic", .{}); + return .zero; + } + + const compress = true; + + const slice = buffer.slice(); + if (slice.len == 0) { + return JSC.JSValue.jsNumber(0); + } + + return JSValue.jsNumber( + // if 0, return 0 + // else return number of bytes sent + @as(i32, @boolToInt(this.websocket.publishWithOptions(this.handler.app, topic_slice.slice(), slice, .binary, compress))) * @intCast( + i32, + @truncate(u31, slice.len), + ), + ); + } + + pub fn publishTextWithoutTypeChecks( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + topic_str: *JSC.JSString, + str: *JSC.JSString, + ) callconv(.C) JSC.JSValue { + if (this.closed) { + log("publishBinary() closed", .{}); + return JSValue.jsNumber(0); + } + + var topic_slice = topic_str.toSlice(globalThis, bun.default_allocator); + defer topic_slice.deinit(); + if (topic_slice.len == 0) { + globalThis.throw("publishBinary requires a non-empty topic", .{}); + return .zero; + } + + const compress = true; + + const slice = str.toSlice(globalThis, bun.default_allocator); + if (slice.len == 0) { + return JSC.JSValue.jsNumber(0); + } + + return JSValue.jsNumber( + // if 0, return 0 + // else return number of bytes sent + @as(i32, @boolToInt(this.websocket.publishWithOptions( + this.handler.app, + topic_slice.slice(), + slice.slice(), + .text, + compress, + ))) * @intCast( + i32, + @truncate(u31, slice.len), + ), + ); + } + pub fn cork( this: *ServerWebSocket, globalThis: *JSC.JSGlobalObject, @@ -2901,8 +3097,7 @@ pub const ServerWebSocket = struct { if (message_value.asArrayBuffer(globalThis)) |buffer| { if (buffer.len == 0) { - globalThis.throw("send requires a non-empty message", .{}); - return .zero; + return JSValue.jsNumber(0); } switch (this.websocket.send(buffer.slice(), .binary, compress, true)) { @@ -2948,6 +3143,172 @@ pub const ServerWebSocket = struct { return .zero; } + pub fn sendText( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSValue { + const args = callframe.arguments(2); + + if (args.len < 1) { + log("sendText()", .{}); + globalThis.throw("sendText requires at least 1 argument", .{}); + return .zero; + } + + if (this.closed) { + log("sendText() closed", .{}); + return JSValue.jsNumber(0); + } + + const message_value = args.ptr[0]; + const compress_value = args.ptr[1]; + + const compress = args.len > 1 and compress_value.toBoolean(); + + if (message_value.isEmptyOrUndefinedOrNull() or !message_value.isString()) { + globalThis.throw("sendText expects a string", .{}); + return .zero; + } + + var string_slice = message_value.toSlice(globalThis, bun.default_allocator); + defer string_slice.deinit(); + if (string_slice.len == 0) { + return JSValue.jsNumber(0); + } + + const buffer = string_slice.slice(); + switch (this.websocket.send(buffer, .text, compress, true)) { + .backpressure => { + log("sendText() backpressure ({d} bytes string)", .{buffer.len}); + return JSValue.jsNumber(-1); + }, + .success => { + log("sendText() success ({d} bytes string)", .{buffer.len}); + return JSValue.jsNumber(buffer.len); + }, + .dropped => { + log("sendText() dropped ({d} bytes string)", .{buffer.len}); + return JSValue.jsNumber(0); + }, + } + } + + pub fn sendTextWithoutTypeChecks( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + message_str: *JSC.JSString, + compress: bool, + ) callconv(.C) JSValue { + if (this.closed) { + log("sendText() closed", .{}); + return JSValue.jsNumber(0); + } + + var string_slice = message_str.toSlice(globalThis, bun.default_allocator); + defer string_slice.deinit(); + if (string_slice.len == 0) { + return JSValue.jsNumber(0); + } + + const buffer = string_slice.slice(); + switch (this.websocket.send(buffer, .text, compress, true)) { + .backpressure => { + log("sendText() backpressure ({d} bytes string)", .{buffer.len}); + return JSValue.jsNumber(-1); + }, + .success => { + log("sendText() success ({d} bytes string)", .{buffer.len}); + return JSValue.jsNumber(buffer.len); + }, + .dropped => { + log("sendText() dropped ({d} bytes string)", .{buffer.len}); + return JSValue.jsNumber(0); + }, + } + } + + pub fn sendBinary( + this: *ServerWebSocket, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSValue { + const args = callframe.arguments(2); + + if (args.len < 1) { + log("sendBinary()", .{}); + globalThis.throw("sendBinary requires at least 1 argument", .{}); + return .zero; + } + + if (this.closed) { + log("sendBinary() closed", .{}); + return JSValue.jsNumber(0); + } + + const message_value = args.ptr[0]; + const compress_value = args.ptr[1]; + + const compress = args.len > 1 and compress_value.toBoolean(); + + const buffer = message_value.asArrayBuffer(globalThis) orelse { + globalThis.throw("sendBinary requires an ArrayBufferView", .{}); + return .zero; + }; + + if (buffer.len == 0) { + return JSValue.jsNumber(0); + } + + switch (this.websocket.send(buffer.slice(), .binary, compress, true)) { + .backpressure => { + log("sendBinary() backpressure ({d} bytes)", .{buffer.len}); + return JSValue.jsNumber(-1); + }, + .success => { + log("sendBinary() success ({d} bytes)", .{buffer.len}); + return JSValue.jsNumber(buffer.slice().len); + }, + .dropped => { + log("sendBinary() dropped ({d} bytes)", .{buffer.len}); + return JSValue.jsNumber(0); + }, + } + } + + pub fn sendBinaryWithoutTypeChecks( + this: *ServerWebSocket, + _: *JSC.JSGlobalObject, + array_buffer: *JSC.JSUint8Array, + compress: bool, + ) callconv(.C) JSValue { + if (this.closed) { + log("sendBinary() closed", .{}); + return JSValue.jsNumber(0); + } + + const buffer = array_buffer.slice(); + + if (buffer.len == 0) { + return JSValue.jsNumber(0); + } + + switch (this.websocket.send(buffer, .binary, compress, true)) { + .backpressure => { + log("sendBinary() backpressure ({d} bytes)", .{buffer.len}); + return JSValue.jsNumber(-1); + }, + .success => { + log("sendBinary() success ({d} bytes)", .{buffer.len}); + return JSValue.jsNumber(buffer.len); + }, + .dropped => { + log("sendBinary() dropped ({d} bytes)", .{buffer.len}); + return JSValue.jsNumber(0); + }, + } + } + pub fn getData( _: *ServerWebSocket, _: *JSC.JSGlobalObject, @@ -2988,7 +3349,7 @@ pub const ServerWebSocket = struct { log("close()", .{}); if (this.closed) { - return .zero; + return JSValue.jsUndefined(); } const code = if (args.len > 0) args.ptr[0].toInt32() else @as(i32, 1000); |