aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-20 04:26:49 -0700
committerGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-06-22 06:56:47 -0700
commit0c12f1684f5dceec0609b3371465c0c653d2b24f (patch)
tree0dec66c497a41e2b2be96d2f5ba6f1e63be00884
parent07c010a37768e26afe46a032fcf9904242c03930 (diff)
downloadbun-0c12f1684f5dceec0609b3371465c0c653d2b24f.tar.gz
bun-0c12f1684f5dceec0609b3371465c0c653d2b24f.tar.zst
bun-0c12f1684f5dceec0609b3371465c0c653d2b24f.zip
[websockets] Support receiving data of length 128 - 65354, bigger not supported yet
-rw-r--r--src/http/websocket_http_client.zig197
1 files changed, 43 insertions, 154 deletions
diff --git a/src/http/websocket_http_client.zig b/src/http/websocket_http_client.zig
index 8522c6038..5dd81f6db 100644
--- a/src/http/websocket_http_client.zig
+++ b/src/http/websocket_http_client.zig
@@ -383,12 +383,12 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type {
return;
}
- if (!strings.eqlComptime(connection_header.value, "Upgrade")) {
+ if (!strings.eqlCaseInsensitiveASCII(connection_header.value, "Upgrade", true)) {
this.terminate(ErrorCode.invalid_connection_header);
return;
}
- if (!strings.eqlComptime(upgrade_header.value, "websocket")) {
+ if (!strings.eqlCaseInsensitiveASCII(upgrade_header.value, "websocket", true)) {
this.terminate(ErrorCode.invalid_upgrade_header);
return;
}
@@ -426,7 +426,6 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type {
this.terminate(ErrorCode.failed_to_write);
return;
}
- std.debug.assert(@intCast(usize, wrote) >= this.to_send.len);
this.to_send = this.to_send[@minimum(@intCast(usize, wrote), this.to_send.len)..];
}
pub fn handleTimeout(
@@ -633,14 +632,14 @@ fn parseWebSocketHeader(
// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
// | Payload Data continued ... |
// +---------------------------------------------------------------+
- const header = @bitCast(WebsocketHeader, @bitCast(u16, bytes));
+ const header = @bitCast(WebsocketHeader, @byteSwap(u16, @bitCast(u16, bytes)));
const payload = @as(usize, header.len);
payload_length.* = payload;
receiving_type.* = header.opcode;
is_fragmented.* = switch (header.opcode) {
.Continue => true,
else => false,
- };
+ } or !header.final;
is_final.* = header.final;
need_compression.* = header.compressed;
@@ -649,12 +648,14 @@ fn parseWebSocketHeader(
}
return switch (header.opcode) {
- .Text, .Continue, .Binary => switch (payload) {
- 0...125 => ReceiveState.need_body,
- 126 => ReceiveState.extended_payload_length_16,
- 127 => ReceiveState.extended_payload_length_64,
- else => unreachable,
- },
+ .Text, .Continue, .Binary => if (payload <= 125)
+ return .need_body
+ else if (payload == 126)
+ return .extended_payload_length_16
+ else if (payload == 127)
+ return .extended_payload_length_64
+ else
+ return .fail,
.Close => ReceiveState.close,
.Ping => ReceiveState.ping,
.Pong => ReceiveState.pong,
@@ -763,7 +764,6 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
receive_state: ReceiveState = ReceiveState.need_header,
receive_header: WebsocketHeader = @bitCast(WebsocketHeader, @as(u16, 0)),
- receive_remaining: usize = 0,
receiving_type: Opcode = Opcode.ResB,
ping_frame_bytes: [128 + 6]u8 = [_]u8{0} ** (128 + 6),
@@ -772,12 +772,9 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
receive_frame: usize = 0,
receive_body_remain: usize = 0,
receive_pending_chunk_len: usize = 0,
- receive_body_buf: ?*BodyBuf = null,
- receive_overflow_buffer: std.ArrayListUnmanaged(u8) = .{},
+ receive_buffer: bun.LinearFifo(u8, .Dynamic),
send_buffer: bun.LinearFifo(u8, .Dynamic),
- send_len: usize = 0,
- send_off: usize = 0,
globalThis: *JSC.JSGlobalObject,
@@ -817,7 +814,6 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
this.clearSendBuffers(true);
this.ping_len = 0;
this.receive_pending_chunk_len = 0;
- this.receive_remaining = 0;
}
pub fn cancel(this: *WebSocket) callconv(.C) void {
@@ -852,14 +848,6 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
this.fail(code);
}
- // fn getBody(this: *WebSocket) *BodyBufBytes {
- // if (this.send_body_buf == null) {
- // this.send_body_buf = BodyBufPool.get(bun.default_allocator);
- // }
-
- // return &this.send_body_buf.?.data;
- // }
-
fn getReceiveBody(this: *WebSocket) *BodyBufBytes {
if (this.receive_body_buf == null) {
this.receive_body_buf = BodyBufPool.get(bun.default_allocator);
@@ -869,32 +857,22 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
}
fn clearReceiveBuffers(this: *WebSocket, free: bool) void {
- if (this.receive_body_buf) |receive_buf| {
- receive_buf.release();
- this.receive_body_buf = null;
- }
-
+ this.receive_buffer.discard(this.receive_buffer.count);
if (free) {
- this.receive_overflow_buffer.clearAndFree(bun.default_allocator);
- } else {
- this.receive_overflow_buffer.clearRetainingCapacity();
+ this.receive_buffer.deinit();
+ this.receive_buffer.buf.len = 0;
}
+ this.receive_pending_chunk_len = 0;
+ this.receive_body_remain = 0;
}
fn clearSendBuffers(this: *WebSocket, free: bool) void {
- // if (this.send_body_buf) |buf| {
- // buf.release();
- // this.send_body_buf = null;
- // }
-
- this.send_buffer.discard(this.send_buffer.count);
+ this.send_buffer.head = 0;
+ this.send_buffer.count = 0;
if (free) {
this.send_buffer.deinit();
this.send_buffer.buf.len = 0;
}
-
- this.send_off = 0;
- this.send_len = 0;
}
fn dispatchData(this: *WebSocket, data_: []const u8, kind: Opcode) void {
@@ -910,7 +888,6 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
this.terminate(ErrorCode.invalid_utf8);
return;
};
- defer this.clearReceiveBuffers(false);
var outstring = JSC.ZigString.Empty;
if (utf16_bytes_) |utf16| {
outstring = JSC.ZigString.from16Slice(utf16);
@@ -926,66 +903,35 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
.Binary => {
JSC.markBinding();
WebSocket__didReceiveBytes(out, data_.ptr, data_.len);
- this.clearReceiveBuffers(false);
},
else => unreachable,
}
}
- pub fn consume(this: *WebSocket, data_: []const u8, max: usize, kind: Opcode, is_final: bool) usize {
+ pub fn consume(this: *WebSocket, data_: []const u8, left_in_fragment: usize, kind: Opcode, is_final: bool) usize {
std.debug.assert(kind == .Text or kind == .Binary);
- std.debug.assert(data_.len <= max);
-
- const can_dispatch_data = is_final and data_.len == max;
+ std.debug.assert(data_.len <= left_in_fragment);
+ std.debug.assert(data_.len > 0);
// did all the data fit in the buffer?
// we can avoid copying & allocating a temporary buffer
- if (can_dispatch_data and this.receive_pending_chunk_len == 0) {
+ if (is_final and data_.len == left_in_fragment and this.receive_pending_chunk_len == 0) {
this.dispatchData(data_, kind);
return data_.len;
}
- // if we previously allocated a buffer and there's room, attempt to use that one
- const new_pending_chunk_len = max + this.receive_pending_chunk_len;
- if (new_pending_chunk_len <= this.receive_overflow_buffer.capacity) {
- @memcpy(this.receive_overflow_buffer.items.ptr + this.receive_overflow_buffer.items.len, data_.ptr, data_.len);
- this.receive_overflow_buffer.items.len += data_.len;
- if (can_dispatch_data) {
- this.dispatchData(this.receive_overflow_buffer.items, kind);
- this.receive_pending_chunk_len = 0;
- } else {
- this.receive_pending_chunk_len = this.receive_overflow_buffer.items.len;
- }
- return data_.len;
- }
-
- if (new_pending_chunk_len <= body_buf_len) {
- // if our previously-allocated buffer is too small or we don't have one, use from the pool
- var body = this.getReceiveBody();
- @memcpy(body[this.receive_pending_chunk_len..].ptr, data_.ptr, data_.len);
- if (can_dispatch_data) {
- this.dispatchData(body[0..new_pending_chunk_len], kind);
- this.receive_pending_chunk_len = 0;
- } else {
- this.receive_pending_chunk_len += data_.len;
- }
- return data_.len;
- }
+ var writable = this.receive_buffer.writableWithSize(data_.len) catch unreachable;
+ @memcpy(writable.ptr, data_.ptr, data_.len);
+ this.receive_buffer.update(data_.len);
- {
- // we need to copy the data into a potentially large temporary buffer
- this.receive_overflow_buffer.appendSlice(bun.default_allocator, data_) catch {
- this.terminate(ErrorCode.failed_to_allocate_memory);
- return 0;
- };
- if (can_dispatch_data) {
- this.dispatchData(this.receive_overflow_buffer.items, kind);
- this.receive_pending_chunk_len = 0;
- } else {
- this.receive_pending_chunk_len = this.receive_overflow_buffer.items.len;
- }
- return data_.len;
+ if (left_in_fragment > data_.len and left_in_fragment - data_.len - this.receive_pending_chunk_len == 0) {
+ this.receive_pending_chunk_len = 0;
+ this.dispatchData(this.receive_buffer.readableSlice(0), kind);
+ this.clearReceiveBuffers(false);
+ } else {
+ this.receive_pending_chunk_len -|= left_in_fragment;
}
+ return data_.len;
}
pub fn handleData(this: *WebSocket, socket: Socket, data_: []const u8) void {
@@ -1054,9 +1000,13 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
&is_final,
&need_compression,
);
- if (receiving_type == .Text or receiving_type == .Binary) {
- last_receive_data_type = receiving_type;
- }
+
+ last_receive_data_type =
+ if (receiving_type == .Text or receiving_type == .Binary)
+ receiving_type
+ else
+ last_receive_data_type;
+
data = data[2..];
if (receiving_type.isControl() and is_fragmented) {
@@ -1221,71 +1171,9 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
const write_len = bytes.len(&content_byte_len);
std.debug.assert(write_len > 0);
- // this.send_len += write_len;
var writable = this.send_buffer.writableWithSize(write_len) catch unreachable;
bytes.copy(this.globalThis, writable[0..write_len], content_byte_len);
this.send_buffer.update(write_len);
- // const send_end = this.send_off + this.send_len;
- // var ring_buffer = &this.send_overflow_buffer;
- // const prev_len = this.send_len - write_len;
-
- // if (send_end <= ring_buffer.capacity) {
- // const mid = ring_buffer.capacity / 2;
- // if (send_end > mid and this.send_len < mid) {
- // std.mem.copyBackwards(u8, ring_buffer.items.ptr[0..prev_len], ring_buffer.items.ptr[this.send_off..ring_buffer.capacity][0..prev_len]);
- // this.send_off = 0;
- // ring_buffer.items.len = prev_len;
- // bytes.copy(this.globalThis, ring_buffer.items.ptr[prev_len..this.send_len], content_byte_len);
- // } else {
- // bytes.copy(this.globalThis, ring_buffer.items.ptr[send_end - write_len .. send_end], content_byte_len);
- // }
- // ring_buffer.items.len += write_len;
-
- // out_buf = ring_buffer.items[this.send_off..][0..this.send_len];
- // } else if (send_end <= body_buf_len) {
- // var buf = this.getBody();
- // bytes.copy(this.globalThis, buf[send_end - write_len ..][0..write_len], content_byte_len);
- // out_buf = buf[this.send_off..][0..this.send_len];
- // } else {
- // if (this.send_body_buf) |send_body| {
- // var buf = &send_body.data;
- // // transfer existing send buffer to overflow buffer
- // ring_buffer.ensureTotalCapacityPrecise(bun.default_allocator, this.send_len) catch {
- // this.terminate(ErrorCode.failed_to_allocate_memory);
- // return false;
- // };
- // @memcpy(ring_buffer.items.ptr, buf[this.send_off..].ptr, prev_len);
- // send_body.release();
- // bytes.copy(this.globalThis, (ring_buffer.items.ptr + prev_len)[0..write_len], content_byte_len);
- // ring_buffer.items.len = this.send_len;
- // this.send_body_buf = null;
- // this.send_off = 0;
- // } else if (send_end <= ring_buffer.capacity) {
- // bytes.copy(this.globalThis, (ring_buffer.items.ptr + (send_end - write_len))[0..write_len], content_byte_len);
- // ring_buffer.items.len += write_len;
- // } else {
- // // we need to re-allocate the array
- // ring_buffer.ensureTotalCapacity(bun.default_allocator, this.send_len) catch {
- // this.terminate(ErrorCode.failed_to_allocate_memory);
- // return false;
- // };
-
- // const data_to_copy = ring_buffer.items[this.send_off..][0..prev_len];
- // const position_in_slice = ring_buffer.items[0..prev_len];
- // if (data_to_copy.len > 0 and data_to_copy.ptr != position_in_slice.ptr)
- // std.mem.copyBackwards(
- // u8,
- // position_in_slice,
- // data_to_copy,
- // );
-
- // ring_buffer.items.len = this.send_len;
- // bytes.copy(this.globalThis, ring_buffer.items[prev_len..], content_byte_len);
- // this.send_off = 0;
- // }
-
- // out_buf = ring_buffer.items[this.send_off..][0..this.send_len];
- // }
if (do_write) {
if (comptime Environment.allow_assert) {
@@ -1533,11 +1421,12 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
.tcp = undefined,
.outgoing_websocket = outgoing,
.globalThis = globalThis,
- .receive_overflow_buffer = .{},
.send_buffer = bun.LinearFifo(u8, .Dynamic).init(bun.default_allocator),
+ .receive_buffer = bun.LinearFifo(u8, .Dynamic).init(bun.default_allocator),
},
) orelse return null;
adopted.send_buffer.ensureTotalCapacity(2048) catch return null;
+ adopted.receive_buffer.ensureTotalCapacity(2048) catch return null;
_ = globalThis.bunVM().eventLoop().ready_tasks_count.fetchAdd(1, .Monotonic);
return @ptrCast(
*anyopaque,