diff options
author | 2022-06-20 04:26:49 -0700 | |
---|---|---|
committer | 2022-06-22 06:56:47 -0700 | |
commit | 0c12f1684f5dceec0609b3371465c0c653d2b24f (patch) | |
tree | 0dec66c497a41e2b2be96d2f5ba6f1e63be00884 /src | |
parent | 07c010a37768e26afe46a032fcf9904242c03930 (diff) | |
download | bun-0c12f1684f5dceec0609b3371465c0c653d2b24f.tar.gz bun-0c12f1684f5dceec0609b3371465c0c653d2b24f.tar.zst bun-0c12f1684f5dceec0609b3371465c0c653d2b24f.zip |
[websockets] Support receiving data of length 128 - 65354, bigger not supported yet
Diffstat (limited to 'src')
-rw-r--r-- | src/http/websocket_http_client.zig | 197 |
1 files changed, 43 insertions, 154 deletions
diff --git a/src/http/websocket_http_client.zig b/src/http/websocket_http_client.zig index 8522c6038..5dd81f6db 100644 --- a/src/http/websocket_http_client.zig +++ b/src/http/websocket_http_client.zig @@ -383,12 +383,12 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type { return; } - if (!strings.eqlComptime(connection_header.value, "Upgrade")) { + if (!strings.eqlCaseInsensitiveASCII(connection_header.value, "Upgrade", true)) { this.terminate(ErrorCode.invalid_connection_header); return; } - if (!strings.eqlComptime(upgrade_header.value, "websocket")) { + if (!strings.eqlCaseInsensitiveASCII(upgrade_header.value, "websocket", true)) { this.terminate(ErrorCode.invalid_upgrade_header); return; } @@ -426,7 +426,6 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type { this.terminate(ErrorCode.failed_to_write); return; } - std.debug.assert(@intCast(usize, wrote) >= this.to_send.len); this.to_send = this.to_send[@minimum(@intCast(usize, wrote), this.to_send.len)..]; } pub fn handleTimeout( @@ -633,14 +632,14 @@ fn parseWebSocketHeader( // + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + // | Payload Data continued ... | // +---------------------------------------------------------------+ - const header = @bitCast(WebsocketHeader, @bitCast(u16, bytes)); + const header = @bitCast(WebsocketHeader, @byteSwap(u16, @bitCast(u16, bytes))); const payload = @as(usize, header.len); payload_length.* = payload; receiving_type.* = header.opcode; is_fragmented.* = switch (header.opcode) { .Continue => true, else => false, - }; + } or !header.final; is_final.* = header.final; need_compression.* = header.compressed; @@ -649,12 +648,14 @@ fn parseWebSocketHeader( } return switch (header.opcode) { - .Text, .Continue, .Binary => switch (payload) { - 0...125 => ReceiveState.need_body, - 126 => ReceiveState.extended_payload_length_16, - 127 => ReceiveState.extended_payload_length_64, - else => unreachable, - }, + .Text, .Continue, .Binary => if (payload <= 125) + return .need_body + else if (payload == 126) + return .extended_payload_length_16 + else if (payload == 127) + return .extended_payload_length_64 + else + return .fail, .Close => ReceiveState.close, .Ping => ReceiveState.ping, .Pong => ReceiveState.pong, @@ -763,7 +764,6 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { receive_state: ReceiveState = ReceiveState.need_header, receive_header: WebsocketHeader = @bitCast(WebsocketHeader, @as(u16, 0)), - receive_remaining: usize = 0, receiving_type: Opcode = Opcode.ResB, ping_frame_bytes: [128 + 6]u8 = [_]u8{0} ** (128 + 6), @@ -772,12 +772,9 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { receive_frame: usize = 0, receive_body_remain: usize = 0, receive_pending_chunk_len: usize = 0, - receive_body_buf: ?*BodyBuf = null, - receive_overflow_buffer: std.ArrayListUnmanaged(u8) = .{}, + receive_buffer: bun.LinearFifo(u8, .Dynamic), send_buffer: bun.LinearFifo(u8, .Dynamic), - send_len: usize = 0, - send_off: usize = 0, globalThis: *JSC.JSGlobalObject, @@ -817,7 +814,6 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { this.clearSendBuffers(true); this.ping_len = 0; this.receive_pending_chunk_len = 0; - this.receive_remaining = 0; } pub fn cancel(this: *WebSocket) callconv(.C) void { @@ -852,14 +848,6 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { this.fail(code); } - // fn getBody(this: *WebSocket) *BodyBufBytes { - // if (this.send_body_buf == null) { - // this.send_body_buf = BodyBufPool.get(bun.default_allocator); - // } - - // return &this.send_body_buf.?.data; - // } - fn getReceiveBody(this: *WebSocket) *BodyBufBytes { if (this.receive_body_buf == null) { this.receive_body_buf = BodyBufPool.get(bun.default_allocator); @@ -869,32 +857,22 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { } fn clearReceiveBuffers(this: *WebSocket, free: bool) void { - if (this.receive_body_buf) |receive_buf| { - receive_buf.release(); - this.receive_body_buf = null; - } - + this.receive_buffer.discard(this.receive_buffer.count); if (free) { - this.receive_overflow_buffer.clearAndFree(bun.default_allocator); - } else { - this.receive_overflow_buffer.clearRetainingCapacity(); + this.receive_buffer.deinit(); + this.receive_buffer.buf.len = 0; } + this.receive_pending_chunk_len = 0; + this.receive_body_remain = 0; } fn clearSendBuffers(this: *WebSocket, free: bool) void { - // if (this.send_body_buf) |buf| { - // buf.release(); - // this.send_body_buf = null; - // } - - this.send_buffer.discard(this.send_buffer.count); + this.send_buffer.head = 0; + this.send_buffer.count = 0; if (free) { this.send_buffer.deinit(); this.send_buffer.buf.len = 0; } - - this.send_off = 0; - this.send_len = 0; } fn dispatchData(this: *WebSocket, data_: []const u8, kind: Opcode) void { @@ -910,7 +888,6 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { this.terminate(ErrorCode.invalid_utf8); return; }; - defer this.clearReceiveBuffers(false); var outstring = JSC.ZigString.Empty; if (utf16_bytes_) |utf16| { outstring = JSC.ZigString.from16Slice(utf16); @@ -926,66 +903,35 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { .Binary => { JSC.markBinding(); WebSocket__didReceiveBytes(out, data_.ptr, data_.len); - this.clearReceiveBuffers(false); }, else => unreachable, } } - pub fn consume(this: *WebSocket, data_: []const u8, max: usize, kind: Opcode, is_final: bool) usize { + pub fn consume(this: *WebSocket, data_: []const u8, left_in_fragment: usize, kind: Opcode, is_final: bool) usize { std.debug.assert(kind == .Text or kind == .Binary); - std.debug.assert(data_.len <= max); - - const can_dispatch_data = is_final and data_.len == max; + std.debug.assert(data_.len <= left_in_fragment); + std.debug.assert(data_.len > 0); // did all the data fit in the buffer? // we can avoid copying & allocating a temporary buffer - if (can_dispatch_data and this.receive_pending_chunk_len == 0) { + if (is_final and data_.len == left_in_fragment and this.receive_pending_chunk_len == 0) { this.dispatchData(data_, kind); return data_.len; } - // if we previously allocated a buffer and there's room, attempt to use that one - const new_pending_chunk_len = max + this.receive_pending_chunk_len; - if (new_pending_chunk_len <= this.receive_overflow_buffer.capacity) { - @memcpy(this.receive_overflow_buffer.items.ptr + this.receive_overflow_buffer.items.len, data_.ptr, data_.len); - this.receive_overflow_buffer.items.len += data_.len; - if (can_dispatch_data) { - this.dispatchData(this.receive_overflow_buffer.items, kind); - this.receive_pending_chunk_len = 0; - } else { - this.receive_pending_chunk_len = this.receive_overflow_buffer.items.len; - } - return data_.len; - } - - if (new_pending_chunk_len <= body_buf_len) { - // if our previously-allocated buffer is too small or we don't have one, use from the pool - var body = this.getReceiveBody(); - @memcpy(body[this.receive_pending_chunk_len..].ptr, data_.ptr, data_.len); - if (can_dispatch_data) { - this.dispatchData(body[0..new_pending_chunk_len], kind); - this.receive_pending_chunk_len = 0; - } else { - this.receive_pending_chunk_len += data_.len; - } - return data_.len; - } + var writable = this.receive_buffer.writableWithSize(data_.len) catch unreachable; + @memcpy(writable.ptr, data_.ptr, data_.len); + this.receive_buffer.update(data_.len); - { - // we need to copy the data into a potentially large temporary buffer - this.receive_overflow_buffer.appendSlice(bun.default_allocator, data_) catch { - this.terminate(ErrorCode.failed_to_allocate_memory); - return 0; - }; - if (can_dispatch_data) { - this.dispatchData(this.receive_overflow_buffer.items, kind); - this.receive_pending_chunk_len = 0; - } else { - this.receive_pending_chunk_len = this.receive_overflow_buffer.items.len; - } - return data_.len; + if (left_in_fragment > data_.len and left_in_fragment - data_.len - this.receive_pending_chunk_len == 0) { + this.receive_pending_chunk_len = 0; + this.dispatchData(this.receive_buffer.readableSlice(0), kind); + this.clearReceiveBuffers(false); + } else { + this.receive_pending_chunk_len -|= left_in_fragment; } + return data_.len; } pub fn handleData(this: *WebSocket, socket: Socket, data_: []const u8) void { @@ -1054,9 +1000,13 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { &is_final, &need_compression, ); - if (receiving_type == .Text or receiving_type == .Binary) { - last_receive_data_type = receiving_type; - } + + last_receive_data_type = + if (receiving_type == .Text or receiving_type == .Binary) + receiving_type + else + last_receive_data_type; + data = data[2..]; if (receiving_type.isControl() and is_fragmented) { @@ -1221,71 +1171,9 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { const write_len = bytes.len(&content_byte_len); std.debug.assert(write_len > 0); - // this.send_len += write_len; var writable = this.send_buffer.writableWithSize(write_len) catch unreachable; bytes.copy(this.globalThis, writable[0..write_len], content_byte_len); this.send_buffer.update(write_len); - // const send_end = this.send_off + this.send_len; - // var ring_buffer = &this.send_overflow_buffer; - // const prev_len = this.send_len - write_len; - - // if (send_end <= ring_buffer.capacity) { - // const mid = ring_buffer.capacity / 2; - // if (send_end > mid and this.send_len < mid) { - // std.mem.copyBackwards(u8, ring_buffer.items.ptr[0..prev_len], ring_buffer.items.ptr[this.send_off..ring_buffer.capacity][0..prev_len]); - // this.send_off = 0; - // ring_buffer.items.len = prev_len; - // bytes.copy(this.globalThis, ring_buffer.items.ptr[prev_len..this.send_len], content_byte_len); - // } else { - // bytes.copy(this.globalThis, ring_buffer.items.ptr[send_end - write_len .. send_end], content_byte_len); - // } - // ring_buffer.items.len += write_len; - - // out_buf = ring_buffer.items[this.send_off..][0..this.send_len]; - // } else if (send_end <= body_buf_len) { - // var buf = this.getBody(); - // bytes.copy(this.globalThis, buf[send_end - write_len ..][0..write_len], content_byte_len); - // out_buf = buf[this.send_off..][0..this.send_len]; - // } else { - // if (this.send_body_buf) |send_body| { - // var buf = &send_body.data; - // // transfer existing send buffer to overflow buffer - // ring_buffer.ensureTotalCapacityPrecise(bun.default_allocator, this.send_len) catch { - // this.terminate(ErrorCode.failed_to_allocate_memory); - // return false; - // }; - // @memcpy(ring_buffer.items.ptr, buf[this.send_off..].ptr, prev_len); - // send_body.release(); - // bytes.copy(this.globalThis, (ring_buffer.items.ptr + prev_len)[0..write_len], content_byte_len); - // ring_buffer.items.len = this.send_len; - // this.send_body_buf = null; - // this.send_off = 0; - // } else if (send_end <= ring_buffer.capacity) { - // bytes.copy(this.globalThis, (ring_buffer.items.ptr + (send_end - write_len))[0..write_len], content_byte_len); - // ring_buffer.items.len += write_len; - // } else { - // // we need to re-allocate the array - // ring_buffer.ensureTotalCapacity(bun.default_allocator, this.send_len) catch { - // this.terminate(ErrorCode.failed_to_allocate_memory); - // return false; - // }; - - // const data_to_copy = ring_buffer.items[this.send_off..][0..prev_len]; - // const position_in_slice = ring_buffer.items[0..prev_len]; - // if (data_to_copy.len > 0 and data_to_copy.ptr != position_in_slice.ptr) - // std.mem.copyBackwards( - // u8, - // position_in_slice, - // data_to_copy, - // ); - - // ring_buffer.items.len = this.send_len; - // bytes.copy(this.globalThis, ring_buffer.items[prev_len..], content_byte_len); - // this.send_off = 0; - // } - - // out_buf = ring_buffer.items[this.send_off..][0..this.send_len]; - // } if (do_write) { if (comptime Environment.allow_assert) { @@ -1533,11 +1421,12 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { .tcp = undefined, .outgoing_websocket = outgoing, .globalThis = globalThis, - .receive_overflow_buffer = .{}, .send_buffer = bun.LinearFifo(u8, .Dynamic).init(bun.default_allocator), + .receive_buffer = bun.LinearFifo(u8, .Dynamic).init(bun.default_allocator), }, ) orelse return null; adopted.send_buffer.ensureTotalCapacity(2048) catch return null; + adopted.receive_buffer.ensureTotalCapacity(2048) catch return null; _ = globalThis.bunVM().eventLoop().ready_tasks_count.fetchAdd(1, .Monotonic); return @ptrCast( *anyopaque, |