diff options
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r-- | src/bun.js/webcore/streams.zig | 332 |
1 files changed, 287 insertions, 45 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 9b8f5dd6f..ff4712041 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1896,12 +1896,222 @@ pub const ArrayBufferSink = struct { pub const JSSink = NewJSSink(@This(), "ArrayBufferSink"); }; +pub const BrotliCompressorSink = struct { + state: ?*bun.brotli.BrotliEncoderState = null, + allocator: std.mem.Allocator, + done: bool = false, + signal: Signal = .{}, + next: ?Sink = null, + output_buffer: bun.ByteList = bun.ByteList{}, + chunk_size: u32 = 16 * 1024, + + pub fn connect(this: *BrotliCompressorSink, signal: Signal) void { + std.debug.assert(this.reader == null); + this.signal = signal; + } + + pub fn start(this: *BrotliCompressorSink, _: StreamStart) JSC.Node.Maybe(void) { + this.output_buffer.len = 0; + + if (this.state == null) { + this.state = bun.brotli.BrotliEncoderState.init(); + } + + // switch (stream_start) { + // .BrotliCompressorSink => |config| { + // if (config.chunk_size > 0) { + // list.ensureTotalCapacityPrecise(config.chunk_size) catch return .{ .err = Syscall.Error.oom }; + // this.bytes.update(list); + // } + + // this.as_uint8array = config.as_uint8array; + // this.streaming = config.stream; + // }, + // else => {}, + // } + + this.done = false; + + this.signal.start(); + return .{ .result = {} }; + } + + pub fn flush(this: *BrotliCompressorSink) JSC.Node.Maybe(void) { + _ = this; + return .{ .result = {} }; + } + + pub fn flushFromJS(this: *BrotliCompressorSink, globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) { + _ = wait; + + if (this.output_buffer.len > 0) { + if (this.next) |*next| { + var list = this.output_buffer; + this.output_buffer = bun.ByteList.init(""); + return .{ .result = next.writeBytes(.{ .owned = list }).toJS(globalThis) }; + } + + return .{ .result = JSC.JSValue.jsNumber(this.output_buffer.len) }; + } + + return .{ .result = JSC.JSValue.jsNumber(0) }; + } + + pub fn finalize(this: *BrotliCompressorSink) void { + if (this.state) |state| { + state.deinit(); + } + this.output_buffer.deinitWithAllocator(bun.default_allocator); + this.allocator.destroy(this); + } + + pub fn init(allocator: std.mem.Allocator, next: ?Sink) !*BrotliCompressorSink { + var this = try allocator.create(BrotliCompressorSink); + this.* = BrotliCompressorSink{ + .bytes = bun.ByteList.init(&.{}), + .allocator = allocator, + .next = next, + }; + return this; + } + + pub fn construct( + this: *BrotliCompressorSink, + allocator: std.mem.Allocator, + ) void { + this.* = BrotliCompressorSink{ + .allocator = allocator, + .next = null, + .state = null, + }; + } + + pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable { + var state = this.state orelse return .{ .done = {} }; + var initial_slice = data.slice(); + var slice = initial_slice; + if (this.output_buffer.cap == 0) { + this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch { + return .{ .err = Syscall.Error.oom }; + }; + } + var output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap]; + + while (slice.len > 0) { + if (this.output_buffer.cap - this.output_buffer.len < 512) { + this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch { + return .{ .err = Syscall.Error.oom }; + }; + output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap]; + } + const max = output_slice.len; + const res = state.write(&slice, &output_slice); + this.output_buffer.len += @truncate(u32, max - output_slice.len); + + if (res) { + if (this.output_buffer.len > 0) { + if (this.next) |*next| { + var output_buffer = this.output_buffer; + + if (data.isDone()) { + this.output_buffer = .{}; + return next.writeBytes(.{ .owned_and_done = output_buffer }); + } + } + + if (data.isDone()) { + return .{ .owned_and_done = @truncate(Blob.SizeType, initial_slice.len - slice.len) }; + } + } + } else { + @panic("Unhandled brotli error"); + } + } + + return .{ .owned = @truncate(Blob.SizeType, initial_slice.len - slice.len) }; + } + pub const writeBytes = write; + pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable { + if (strings.isAllASCII(data.slice())) { + return this.write(data); + } + + var allocated = strings.allocateLatin1IntoUTF8(bun.default_allocator, []const u8, data.slice()) catch { + return .{ .err = Syscall.Error.oom }; + }; + defer bun.default_allocator.free(allocated); + return this.write(.{ .temporary = bun.ByteList.init(allocated) }); + } + pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable { + if (this.next) |*next| { + return next.writeUTF16(data); + } + var bytes = strings.toUTF8Alloc(bun.default_allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch { + return .{ .err = Syscall.Error.oom }; + }; + defer bun.default_allocator.free(bytes); + return this.write(.{ .temporary = bun.ByteList.init(bytes) }); + } + + pub fn end(this: *BrotliCompressorSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { + if (this.next) |*next| { + return next.end(err); + } + this.signal.close(err); + return .{ .result = {} }; + } + + pub fn toJS(this: *BrotliCompressorSink, globalThis: *JSGlobalObject) JSValue { + return JSSink.createObject(globalThis, this); + } + + pub fn endFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) { + var state = this.state orelse return .{ .result = JSC.JSValue.jsUndefined() }; + var input: []const u8 = ""; + if (this.output_buffer.cap == 0) { + this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch { + return .{ .err = Syscall.Error.oom }; + }; + } + + while (state.hasMoreOutput()) : (this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch @panic("OOM")) { + var output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap]; + const initial_len = output_slice.len; + + if (!state.finish(&input, &output_slice)) { + @panic("Unhandled brotli error"); + } + + this.output_buffer.len += @truncate(u32, initial_len - output_slice.len); + } + + state.deinit(); + this.state = null; + + std.debug.assert(this.next == null); + var list = this.output_buffer.listManaged(this.allocator); + this.output_buffer = bun.ByteList.init(""); + this.done = true; + this.signal.close(null); + return .{ .result = JSC.JSValue.createBuffer(globalThis, list.items, bun.default_allocator) }; + } + + pub fn sink(this: *BrotliCompressorSink) Sink { + return Sink.init(this); + } + + pub const JSSink = NewJSSink(@This(), "BrotliCompressorSink"); +}; + pub const BrotliDecompressorSink = struct { state: ?*bun.brotli.BrotliDecoderState = null, allocator: std.mem.Allocator, done: bool = false, signal: Signal = .{}, streaming: bool = false, + next: ?Sink = null, + output_buffer: bun.ByteList = bun.ByteList{}, + chunk_size: u32 = 16 * 1024, pub fn connect(this: *BrotliDecompressorSink, signal: Signal) void { std.debug.assert(this.reader == null); @@ -1909,7 +2119,7 @@ pub const BrotliDecompressorSink = struct { } pub fn start(this: *BrotliDecompressorSink, _: StreamStart) JSC.Node.Maybe(void) { - this.bytes.len = 0; + this.output_buffer.len = 0; if (this.state) |existing| { if (existing.isUsed()) { @@ -1948,16 +2158,25 @@ pub const BrotliDecompressorSink = struct { pub fn flushFromJS(this: *BrotliDecompressorSink, globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) { _ = wait; - _ = globalThis; - _ = this; - return .{ .result = JSValue.jsNumber(0) }; + if (this.output_buffer.len > 0) { + if (this.next) |*next| { + var list = this.output_buffer; + this.output_buffer = bun.ByteList.init(""); + return .{ .result = next.writeBytes(.{ .owned = list }).toJS(globalThis) }; + } + + return .{ .result = JSC.JSValue.jsNumber(this.output_buffer.len) }; + } + + return .{ .result = JSC.JSValue.jsNumber(0) }; } pub fn finalize(this: *BrotliDecompressorSink) void { if (this.state) |state| { state.deinit(); } + this.output_buffer.deinitWithAllocator(bun.default_allocator); this.allocator.destroy(this); } @@ -1984,30 +2203,75 @@ pub const BrotliDecompressorSink = struct { pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable { var state = this.state orelse return .{ .done = {} }; - const initial_slice = data.slice(); + var initial_slice = data.slice(); var slice = initial_slice; - state.write(&slice); + if (this.output_buffer.cap == 0) { + this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch { + return .{ .err = Syscall.Error.oom }; + }; + } + var output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap]; + + while (true) { + if (this.output_buffer.cap - this.output_buffer.len < 512) { + this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch { + return .{ .err = Syscall.Error.oom }; + }; + output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap]; + } + const max = output_slice.len; + const res = state.write(&slice, &output_slice); + this.output_buffer.len += @truncate(u32, max - output_slice.len); + + switch (res) { + .success => { + if (this.next) |*next| { + var output_buffer = this.output_buffer; + this.output_buffer = .{}; + return next.writeBytes(.{ .owned_and_done = output_buffer }); + } + this.signal.ready(null, null); + return .{ .owned_and_done = @truncate(Blob.SizeType, initial_slice.len - slice.len) }; + }, + .@"error" => { + return .{ .err = Syscall.Error.oom }; + }, + .needs_more_input => { + this.signal.ready(null, null); + return .{ .owned = @truncate(Blob.SizeType, initial_slice.len - slice.len) }; + }, + + .needs_more_output => { + if (this.next) |*next| { + var output_buffer = this.output_buffer; + this.output_buffer = .{}; + return next.writeBytes(.{ .owned = output_buffer }); + } + }, + } + } } pub const writeBytes = write; pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable { - if (this.next) |*next| { - return next.writeLatin1(data); + if (strings.isAllASCII(data.slice())) { + return this.write(data); } - const len = this.bytes.writeLatin1(this.allocator, data.slice()) catch { + + var allocated = strings.allocateLatin1IntoUTF8(bun.default_allocator, []const u8, data.slice()) catch { return .{ .err = Syscall.Error.oom }; }; - this.signal.ready(null, null); - return .{ .owned = len }; + defer bun.default_allocator.free(allocated); + return this.write(.{ .temporary = bun.ByteList.init(allocated) }); } pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable { if (this.next) |*next| { return next.writeUTF16(data); } - const len = this.bytes.writeUTF16(this.allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch { + var bytes = strings.toUTF8Alloc(bun.default_allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch { return .{ .err = Syscall.Error.oom }; }; - this.signal.ready(null, null); - return .{ .owned = len }; + defer bun.default_allocator.free(bytes); + return this.write(.{ .temporary = bun.ByteList.init(bytes) }); } pub fn end(this: *BrotliDecompressorSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { @@ -2018,44 +2282,22 @@ pub const BrotliDecompressorSink = struct { return .{ .result = {} }; } - pub fn toJS(this: *BrotliDecompressorSink, globalThis: *JSGlobalObject, as_uint8array: bool) JSValue { - if (this.streaming) { - const value: JSValue = switch (as_uint8array) { - true => JSC.ArrayBuffer.create(globalThis, this.bytes.slice(), .Uint8Array), - false => JSC.ArrayBuffer.create(globalThis, this.bytes.slice(), .ArrayBuffer), - }; - this.bytes.len = 0; - return value; - } - - var list = this.bytes.listManaged(this.allocator); - this.bytes = bun.ByteList.init(""); - return ArrayBuffer.fromBytes( - try list.toOwnedSlice(), - if (as_uint8array) - .Uint8Array - else - .ArrayBuffer, - ).toJS(globalThis, null); + pub fn toJS(this: *BrotliDecompressorSink, globalThis: *JSGlobalObject) JSValue { + return JSSink.createObject(globalThis, this); } - pub fn endFromJS(this: *BrotliDecompressorSink, _: *JSGlobalObject) JSC.Node.Maybe(ArrayBuffer) { - if (this.done) { - return .{ .result = ArrayBuffer.fromBytes(&[_]u8{}, .ArrayBuffer) }; + pub fn endFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) { + if (this.state) |state| { + state.deinit(); + this.state = null; } std.debug.assert(this.next == null); - var list = this.bytes.listManaged(this.allocator); - this.bytes = bun.ByteList.init(""); + var list = this.output_buffer.listManaged(this.allocator); + this.output_buffer = bun.ByteList.init(""); this.done = true; this.signal.close(null); - return .{ .result = ArrayBuffer.fromBytes( - list.toOwnedSlice() catch @panic("TODO"), - if (this.as_uint8array) - .Uint8Array - else - .ArrayBuffer, - ) }; + return .{ .result = JSC.JSValue.createBuffer(globalThis, list.items, bun.default_allocator) }; } pub fn sink(this: *BrotliDecompressorSink) Sink { |