aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore/streams.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r--src/bun.js/webcore/streams.zig332
1 files changed, 287 insertions, 45 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 9b8f5dd6f..ff4712041 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1896,12 +1896,222 @@ pub const ArrayBufferSink = struct {
pub const JSSink = NewJSSink(@This(), "ArrayBufferSink");
};
+pub const BrotliCompressorSink = struct {
+ state: ?*bun.brotli.BrotliEncoderState = null,
+ allocator: std.mem.Allocator,
+ done: bool = false,
+ signal: Signal = .{},
+ next: ?Sink = null,
+ output_buffer: bun.ByteList = bun.ByteList{},
+ chunk_size: u32 = 16 * 1024,
+
+ pub fn connect(this: *BrotliCompressorSink, signal: Signal) void {
+ std.debug.assert(this.reader == null);
+ this.signal = signal;
+ }
+
+ pub fn start(this: *BrotliCompressorSink, _: StreamStart) JSC.Node.Maybe(void) {
+ this.output_buffer.len = 0;
+
+ if (this.state == null) {
+ this.state = bun.brotli.BrotliEncoderState.init();
+ }
+
+ // switch (stream_start) {
+ // .BrotliCompressorSink => |config| {
+ // if (config.chunk_size > 0) {
+ // list.ensureTotalCapacityPrecise(config.chunk_size) catch return .{ .err = Syscall.Error.oom };
+ // this.bytes.update(list);
+ // }
+
+ // this.as_uint8array = config.as_uint8array;
+ // this.streaming = config.stream;
+ // },
+ // else => {},
+ // }
+
+ this.done = false;
+
+ this.signal.start();
+ return .{ .result = {} };
+ }
+
+ pub fn flush(this: *BrotliCompressorSink) JSC.Node.Maybe(void) {
+ _ = this;
+ return .{ .result = {} };
+ }
+
+ pub fn flushFromJS(this: *BrotliCompressorSink, globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
+ _ = wait;
+
+ if (this.output_buffer.len > 0) {
+ if (this.next) |*next| {
+ var list = this.output_buffer;
+ this.output_buffer = bun.ByteList.init("");
+ return .{ .result = next.writeBytes(.{ .owned = list }).toJS(globalThis) };
+ }
+
+ return .{ .result = JSC.JSValue.jsNumber(this.output_buffer.len) };
+ }
+
+ return .{ .result = JSC.JSValue.jsNumber(0) };
+ }
+
+ pub fn finalize(this: *BrotliCompressorSink) void {
+ if (this.state) |state| {
+ state.deinit();
+ }
+ this.output_buffer.deinitWithAllocator(bun.default_allocator);
+ this.allocator.destroy(this);
+ }
+
+ pub fn init(allocator: std.mem.Allocator, next: ?Sink) !*BrotliCompressorSink {
+ var this = try allocator.create(BrotliCompressorSink);
+ this.* = BrotliCompressorSink{
+ .bytes = bun.ByteList.init(&.{}),
+ .allocator = allocator,
+ .next = next,
+ };
+ return this;
+ }
+
+ pub fn construct(
+ this: *BrotliCompressorSink,
+ allocator: std.mem.Allocator,
+ ) void {
+ this.* = BrotliCompressorSink{
+ .allocator = allocator,
+ .next = null,
+ .state = null,
+ };
+ }
+
+ pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
+ var state = this.state orelse return .{ .done = {} };
+ var initial_slice = data.slice();
+ var slice = initial_slice;
+ if (this.output_buffer.cap == 0) {
+ this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch {
+ return .{ .err = Syscall.Error.oom };
+ };
+ }
+ var output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap];
+
+ while (slice.len > 0) {
+ if (this.output_buffer.cap - this.output_buffer.len < 512) {
+ this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch {
+ return .{ .err = Syscall.Error.oom };
+ };
+ output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap];
+ }
+ const max = output_slice.len;
+ const res = state.write(&slice, &output_slice);
+ this.output_buffer.len += @truncate(u32, max - output_slice.len);
+
+ if (res) {
+ if (this.output_buffer.len > 0) {
+ if (this.next) |*next| {
+ var output_buffer = this.output_buffer;
+
+ if (data.isDone()) {
+ this.output_buffer = .{};
+ return next.writeBytes(.{ .owned_and_done = output_buffer });
+ }
+ }
+
+ if (data.isDone()) {
+ return .{ .owned_and_done = @truncate(Blob.SizeType, initial_slice.len - slice.len) };
+ }
+ }
+ } else {
+ @panic("Unhandled brotli error");
+ }
+ }
+
+ return .{ .owned = @truncate(Blob.SizeType, initial_slice.len - slice.len) };
+ }
+ pub const writeBytes = write;
+ pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable {
+ if (strings.isAllASCII(data.slice())) {
+ return this.write(data);
+ }
+
+ var allocated = strings.allocateLatin1IntoUTF8(bun.default_allocator, []const u8, data.slice()) catch {
+ return .{ .err = Syscall.Error.oom };
+ };
+ defer bun.default_allocator.free(allocated);
+ return this.write(.{ .temporary = bun.ByteList.init(allocated) });
+ }
+ pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable {
+ if (this.next) |*next| {
+ return next.writeUTF16(data);
+ }
+ var bytes = strings.toUTF8Alloc(bun.default_allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch {
+ return .{ .err = Syscall.Error.oom };
+ };
+ defer bun.default_allocator.free(bytes);
+ return this.write(.{ .temporary = bun.ByteList.init(bytes) });
+ }
+
+ pub fn end(this: *BrotliCompressorSink, err: ?Syscall.Error) JSC.Node.Maybe(void) {
+ if (this.next) |*next| {
+ return next.end(err);
+ }
+ this.signal.close(err);
+ return .{ .result = {} };
+ }
+
+ pub fn toJS(this: *BrotliCompressorSink, globalThis: *JSGlobalObject) JSValue {
+ return JSSink.createObject(globalThis, this);
+ }
+
+ pub fn endFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
+ var state = this.state orelse return .{ .result = JSC.JSValue.jsUndefined() };
+ var input: []const u8 = "";
+ if (this.output_buffer.cap == 0) {
+ this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch {
+ return .{ .err = Syscall.Error.oom };
+ };
+ }
+
+ while (state.hasMoreOutput()) : (this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch @panic("OOM")) {
+ var output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap];
+ const initial_len = output_slice.len;
+
+ if (!state.finish(&input, &output_slice)) {
+ @panic("Unhandled brotli error");
+ }
+
+ this.output_buffer.len += @truncate(u32, initial_len - output_slice.len);
+ }
+
+ state.deinit();
+ this.state = null;
+
+ std.debug.assert(this.next == null);
+ var list = this.output_buffer.listManaged(this.allocator);
+ this.output_buffer = bun.ByteList.init("");
+ this.done = true;
+ this.signal.close(null);
+ return .{ .result = JSC.JSValue.createBuffer(globalThis, list.items, bun.default_allocator) };
+ }
+
+ pub fn sink(this: *BrotliCompressorSink) Sink {
+ return Sink.init(this);
+ }
+
+ pub const JSSink = NewJSSink(@This(), "BrotliCompressorSink");
+};
+
pub const BrotliDecompressorSink = struct {
state: ?*bun.brotli.BrotliDecoderState = null,
allocator: std.mem.Allocator,
done: bool = false,
signal: Signal = .{},
streaming: bool = false,
+ next: ?Sink = null,
+ output_buffer: bun.ByteList = bun.ByteList{},
+ chunk_size: u32 = 16 * 1024,
pub fn connect(this: *BrotliDecompressorSink, signal: Signal) void {
std.debug.assert(this.reader == null);
@@ -1909,7 +2119,7 @@ pub const BrotliDecompressorSink = struct {
}
pub fn start(this: *BrotliDecompressorSink, _: StreamStart) JSC.Node.Maybe(void) {
- this.bytes.len = 0;
+ this.output_buffer.len = 0;
if (this.state) |existing| {
if (existing.isUsed()) {
@@ -1948,16 +2158,25 @@ pub const BrotliDecompressorSink = struct {
pub fn flushFromJS(this: *BrotliDecompressorSink, globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
_ = wait;
- _ = globalThis;
- _ = this;
- return .{ .result = JSValue.jsNumber(0) };
+ if (this.output_buffer.len > 0) {
+ if (this.next) |*next| {
+ var list = this.output_buffer;
+ this.output_buffer = bun.ByteList.init("");
+ return .{ .result = next.writeBytes(.{ .owned = list }).toJS(globalThis) };
+ }
+
+ return .{ .result = JSC.JSValue.jsNumber(this.output_buffer.len) };
+ }
+
+ return .{ .result = JSC.JSValue.jsNumber(0) };
}
pub fn finalize(this: *BrotliDecompressorSink) void {
if (this.state) |state| {
state.deinit();
}
+ this.output_buffer.deinitWithAllocator(bun.default_allocator);
this.allocator.destroy(this);
}
@@ -1984,30 +2203,75 @@ pub const BrotliDecompressorSink = struct {
pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
var state = this.state orelse return .{ .done = {} };
- const initial_slice = data.slice();
+ var initial_slice = data.slice();
var slice = initial_slice;
- state.write(&slice);
+ if (this.output_buffer.cap == 0) {
+ this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch {
+ return .{ .err = Syscall.Error.oom };
+ };
+ }
+ var output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap];
+
+ while (true) {
+ if (this.output_buffer.cap - this.output_buffer.len < 512) {
+ this.output_buffer.ensureUnusedCapacity(bun.default_allocator, this.chunk_size) catch {
+ return .{ .err = Syscall.Error.oom };
+ };
+ output_slice = this.output_buffer.ptr[this.output_buffer.len..this.output_buffer.cap];
+ }
+ const max = output_slice.len;
+ const res = state.write(&slice, &output_slice);
+ this.output_buffer.len += @truncate(u32, max - output_slice.len);
+
+ switch (res) {
+ .success => {
+ if (this.next) |*next| {
+ var output_buffer = this.output_buffer;
+ this.output_buffer = .{};
+ return next.writeBytes(.{ .owned_and_done = output_buffer });
+ }
+ this.signal.ready(null, null);
+ return .{ .owned_and_done = @truncate(Blob.SizeType, initial_slice.len - slice.len) };
+ },
+ .@"error" => {
+ return .{ .err = Syscall.Error.oom };
+ },
+ .needs_more_input => {
+ this.signal.ready(null, null);
+ return .{ .owned = @truncate(Blob.SizeType, initial_slice.len - slice.len) };
+ },
+
+ .needs_more_output => {
+ if (this.next) |*next| {
+ var output_buffer = this.output_buffer;
+ this.output_buffer = .{};
+ return next.writeBytes(.{ .owned = output_buffer });
+ }
+ },
+ }
+ }
}
pub const writeBytes = write;
pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable {
- if (this.next) |*next| {
- return next.writeLatin1(data);
+ if (strings.isAllASCII(data.slice())) {
+ return this.write(data);
}
- const len = this.bytes.writeLatin1(this.allocator, data.slice()) catch {
+
+ var allocated = strings.allocateLatin1IntoUTF8(bun.default_allocator, []const u8, data.slice()) catch {
return .{ .err = Syscall.Error.oom };
};
- this.signal.ready(null, null);
- return .{ .owned = len };
+ defer bun.default_allocator.free(allocated);
+ return this.write(.{ .temporary = bun.ByteList.init(allocated) });
}
pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable {
if (this.next) |*next| {
return next.writeUTF16(data);
}
- const len = this.bytes.writeUTF16(this.allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch {
+ var bytes = strings.toUTF8Alloc(bun.default_allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch {
return .{ .err = Syscall.Error.oom };
};
- this.signal.ready(null, null);
- return .{ .owned = len };
+ defer bun.default_allocator.free(bytes);
+ return this.write(.{ .temporary = bun.ByteList.init(bytes) });
}
pub fn end(this: *BrotliDecompressorSink, err: ?Syscall.Error) JSC.Node.Maybe(void) {
@@ -2018,44 +2282,22 @@ pub const BrotliDecompressorSink = struct {
return .{ .result = {} };
}
- pub fn toJS(this: *BrotliDecompressorSink, globalThis: *JSGlobalObject, as_uint8array: bool) JSValue {
- if (this.streaming) {
- const value: JSValue = switch (as_uint8array) {
- true => JSC.ArrayBuffer.create(globalThis, this.bytes.slice(), .Uint8Array),
- false => JSC.ArrayBuffer.create(globalThis, this.bytes.slice(), .ArrayBuffer),
- };
- this.bytes.len = 0;
- return value;
- }
-
- var list = this.bytes.listManaged(this.allocator);
- this.bytes = bun.ByteList.init("");
- return ArrayBuffer.fromBytes(
- try list.toOwnedSlice(),
- if (as_uint8array)
- .Uint8Array
- else
- .ArrayBuffer,
- ).toJS(globalThis, null);
+ pub fn toJS(this: *BrotliDecompressorSink, globalThis: *JSGlobalObject) JSValue {
+ return JSSink.createObject(globalThis, this);
}
- pub fn endFromJS(this: *BrotliDecompressorSink, _: *JSGlobalObject) JSC.Node.Maybe(ArrayBuffer) {
- if (this.done) {
- return .{ .result = ArrayBuffer.fromBytes(&[_]u8{}, .ArrayBuffer) };
+ pub fn endFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
+ if (this.state) |state| {
+ state.deinit();
+ this.state = null;
}
std.debug.assert(this.next == null);
- var list = this.bytes.listManaged(this.allocator);
- this.bytes = bun.ByteList.init("");
+ var list = this.output_buffer.listManaged(this.allocator);
+ this.output_buffer = bun.ByteList.init("");
this.done = true;
this.signal.close(null);
- return .{ .result = ArrayBuffer.fromBytes(
- list.toOwnedSlice() catch @panic("TODO"),
- if (this.as_uint8array)
- .Uint8Array
- else
- .ArrayBuffer,
- ) };
+ return .{ .result = JSC.JSValue.createBuffer(globalThis, list.items, bun.default_allocator) };
}
pub fn sink(this: *BrotliDecompressorSink) Sink {