aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore/streams.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r--src/bun.js/webcore/streams.zig130
1 files changed, 65 insertions, 65 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 176a96655..c355ae13d 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -433,7 +433,7 @@ pub const StreamStart = union(Tag) {
if (value.get(globalThis, "highWaterMark")) |chunkSize| {
empty = false;
- chunk_size = @intCast(JSC.WebCore.Blob.SizeType, @maximum(0, @truncate(i51, chunkSize.toInt64())));
+ chunk_size = @intCast(JSC.WebCore.Blob.SizeType, @max(0, @truncate(i51, chunkSize.toInt64())));
}
if (!empty) {
@@ -450,7 +450,7 @@ pub const StreamStart = union(Tag) {
var chunk_size: JSC.WebCore.Blob.SizeType = 0;
if (value.get(globalThis, "highWaterMark")) |chunkSize| {
- chunk_size = @intCast(JSC.WebCore.Blob.SizeType, @maximum(0, @truncate(i51, chunkSize.toInt64())));
+ chunk_size = @intCast(JSC.WebCore.Blob.SizeType, @max(0, @truncate(i51, chunkSize.toInt64())));
}
if (value.get(globalThis, "path")) |path| {
@@ -486,7 +486,7 @@ pub const StreamStart = union(Tag) {
if (value.get(globalThis, "highWaterMark")) |chunkSize| {
empty = false;
- chunk_size = @intCast(JSC.WebCore.Blob.SizeType, @maximum(256, @truncate(i51, chunkSize.toInt64())));
+ chunk_size = @intCast(JSC.WebCore.Blob.SizeType, @max(256, @truncate(i51, chunkSize.toInt64())));
}
if (!empty) {
@@ -585,14 +585,14 @@ pub const StreamResult = union(Tag) {
ctx: *anyopaque,
handler: Fn,
- pub const Fn = fn (ctx: *anyopaque, result: StreamResult.Writable) void;
+ pub const Fn = *const fn (ctx: *anyopaque, result: StreamResult.Writable) void;
pub fn init(this: *Handler, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult.Writable) void) void {
this.ctx = ctx;
this.handler = struct {
const handler = handler_fn;
pub fn onHandle(ctx_: *anyopaque, result: StreamResult.Writable) void {
- @call(.{ .modifier = .always_inline }, handler, .{ bun.cast(*Context, ctx_), result });
+ @call(.always_inline, handler, .{ bun.cast(*Context, ctx_), result });
}
}.onHandle;
}
@@ -708,14 +708,14 @@ pub const StreamResult = union(Tag) {
ctx: *anyopaque,
handler: Fn,
- pub const Fn = fn (ctx: *anyopaque, result: StreamResult) void;
+ pub const Fn = *const fn (ctx: *anyopaque, result: StreamResult) void;
pub fn init(this: *Handler, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult) void) void {
this.ctx = ctx;
this.handler = struct {
const handler = handler_fn;
pub fn onHandle(ctx_: *anyopaque, result: StreamResult) void {
- @call(.{ .modifier = .always_inline }, handler, .{ bun.cast(*Context, ctx_), result });
+ @call(.always_inline, handler, .{ bun.cast(*Context, ctx_), result });
}
}.onHandle;
}
@@ -852,9 +852,9 @@ pub const Signal = struct {
}
pub const VTable = struct {
- pub const OnCloseFn = fn (this: *anyopaque, err: ?Syscall.Error) void;
- pub const OnReadyFn = fn (this: *anyopaque, amount: ?Blob.SizeType, offset: ?Blob.SizeType) void;
- pub const OnStartFn = fn (this: *anyopaque) void;
+ pub const OnCloseFn = *const (fn (this: *anyopaque, err: ?Syscall.Error) void);
+ pub const OnReadyFn = *const (fn (this: *anyopaque, amount: ?Blob.SizeType, offset: ?Blob.SizeType) void);
+ pub const OnStartFn = *const (fn (this: *anyopaque) void);
close: OnCloseFn,
ready: OnReadyFn,
start: OnStartFn,
@@ -1007,11 +1007,11 @@ pub const Sink = struct {
};
pub const VTable = struct {
- pub const WriteUTF16Fn = fn (this: *anyopaque, data: StreamResult) StreamResult.Writable;
- pub const WriteUTF8Fn = fn (this: *anyopaque, data: StreamResult) StreamResult.Writable;
- pub const WriteLatin1Fn = fn (this: *anyopaque, data: StreamResult) StreamResult.Writable;
- pub const EndFn = fn (this: *anyopaque, err: ?Syscall.Error) JSC.Node.Maybe(void);
- pub const ConnectFn = fn (this: *anyopaque, signal: Signal) JSC.Node.Maybe(void);
+ pub const WriteUTF16Fn = *const (fn (this: *anyopaque, data: StreamResult) StreamResult.Writable);
+ pub const WriteUTF8Fn = *const (fn (this: *anyopaque, data: StreamResult) StreamResult.Writable);
+ pub const WriteLatin1Fn = *const (fn (this: *anyopaque, data: StreamResult) StreamResult.Writable);
+ pub const EndFn = *const (fn (this: *anyopaque, err: ?Syscall.Error) JSC.Node.Maybe(void));
+ pub const ConnectFn = *const (fn (this: *anyopaque, signal: Signal) JSC.Node.Maybe(void));
connect: ConnectFn,
write: WriteUTF8Fn,
@@ -1150,9 +1150,9 @@ pub const FileSink = struct {
pub fn updateRef(this: *FileSink, value: bool) void {
if (this.poll_ref) |poll| {
if (value)
- poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm)
+ poll.enableKeepingProcessAlive(JSC.VirtualMachine.get())
else
- poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ poll.disableKeepingProcessAlive(JSC.VirtualMachine.get());
}
}
@@ -1232,7 +1232,7 @@ pub const FileSink = struct {
// On Linux, we can adjust the pipe size to avoid blocking.
this.has_adjusted_pipe_size_on_linux = true;
- switch (JSC.Node.Syscall.setPipeCapacityOnLinux(fd, @minimum(Syscall.getMaxPipeSizeOnLinux(), remain_len))) {
+ switch (JSC.Node.Syscall.setPipeCapacityOnLinux(fd, @min(Syscall.getMaxPipeSizeOnLinux(), remain_len))) {
.result => |len| {
if (len > 0) {
this.max_write_size = len;
@@ -1249,7 +1249,7 @@ pub const FileSink = struct {
const initial = total;
const fd = this.fd;
var remain = buffer;
- remain = remain[@minimum(this.head, remain.len)..];
+ remain = remain[@min(this.head, remain.len)..];
if (remain.len == 0) return .{ .owned = 0 };
defer this.written = total;
@@ -1330,7 +1330,7 @@ pub const FileSink = struct {
if (max_to_write > 0) {
while (remain.len > 0) {
- const write_buf = remain[0..@minimum(remain.len, max_to_write)];
+ const write_buf = remain[0..@min(remain.len, max_to_write)];
const res = JSC.Node.Syscall.write(fd, write_buf);
if (res == .err) {
@@ -1526,7 +1526,7 @@ pub const FileSink = struct {
pub fn ready(this: *FileSink, writable: i64) void {
var remain = this.buffer.slice();
- const pending = remain[@minimum(this.head, remain.len)..].len;
+ const pending = remain[@min(this.head, remain.len)..].len;
if (pending == 0) {
if (this.isWatching()) {
this.unwatch(this.fd);
@@ -1536,7 +1536,7 @@ pub const FileSink = struct {
}
if (comptime Environment.isMac) {
- _ = this.flushMaybePollWithSizeAndBuffer(this.buffer.slice(), @intCast(usize, @maximum(writable, 0)));
+ _ = this.flushMaybePollWithSizeAndBuffer(this.buffer.slice(), @intCast(usize, @max(writable, 0)));
} else {
_ = this.flushMaybePollWithSizeAndBuffer(this.buffer.slice(), std.math.maxInt(usize));
}
@@ -1852,7 +1852,7 @@ pub const ArrayBufferSink = struct {
var list = this.bytes.listManaged(this.allocator);
this.bytes = bun.ByteList.init("");
return ArrayBuffer.fromBytes(
- list.toOwnedSlice(),
+ try list.toOwnedSlice(),
if (as_uint8array)
.Uint8Array
else
@@ -1871,7 +1871,7 @@ pub const ArrayBufferSink = struct {
this.done = true;
this.signal.close(null);
return .{ .result = ArrayBuffer.fromBytes(
- list.toOwnedSlice(),
+ list.toOwnedSlice() catch @panic("TODO"),
if (this.as_uint8array)
.Uint8Array
else
@@ -1896,7 +1896,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub const name = std.fmt.comptimePrint("{s}", .{std.mem.span(name_)});
// This attaches it to JS
- pub const SinkSignal = struct {
+ pub const SinkSignal = extern struct {
cpp: JSValue,
pub fn init(cpp: JSValue) Signal {
@@ -2219,15 +2219,15 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
}
pub const Export = shim.exportFunctions(.{
- .@"finalize" = finalize,
- .@"write" = write,
- .@"close" = close,
- .@"flush" = flush,
- .@"start" = start,
- .@"end" = end,
- .@"construct" = construct,
- .@"endWithSink" = endWithSink,
- .@"updateRef" = updateRef,
+ .finalize = finalize,
+ .write = write,
+ .close = close,
+ .flush = flush,
+ .start = start,
+ .end = end,
+ .construct = construct,
+ .endWithSink = endWithSink,
+ .updateRef = updateRef,
});
pub fn updateRef(ptr: *anyopaque, value: bool) callconv(.C) void {
@@ -2332,7 +2332,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
fn send(this: *@This(), buf: []const u8) bool {
std.debug.assert(!this.done);
- defer log("send: {d} bytes (backpressure: {d})", .{ buf.len, this.has_backpressure });
+ defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure });
if (this.requested_end and !this.res.state().isHttpWriteCalled()) {
const success = this.res.tryEnd(buf, this.end_len, false);
@@ -2373,7 +2373,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// do not write more than available
// if we do, it will cause this to be delayed until the next call, each time
- const to_write = @minimum(@truncate(Blob.SizeType, write_offset), @as(Blob.SizeType, this.buffer.len));
+ const to_write = @min(@truncate(Blob.SizeType, write_offset), @as(Blob.SizeType, this.buffer.len));
// figure out how much data exactly to write
const readable = this.readableSlice()[0..to_write];
@@ -2399,7 +2399,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (!this.done and !this.requested_end and !this.hasBackpressure()) {
const pending = @truncate(Blob.SizeType, write_offset) -| to_write;
const written_after_flush = this.wrote - initial_wrote;
- const to_report = pending - @minimum(written_after_flush, pending);
+ const to_report = pending - @min(written_after_flush, pending);
if ((written_after_flush == initial_wrote and pending == 0) or to_report > 0) {
this.signal.ready(to_report, null);
@@ -2475,7 +2475,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
- log("flushFromJS({s})", .{wait});
+ log("flushFromJS({any})", .{wait});
if (!wait) {
return this.flushFromJSNoWait();
}
@@ -2667,7 +2667,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// In this case, it's always an error
pub fn end(this: *@This(), err: ?Syscall.Error) JSC.Node.Maybe(void) {
- log("end({s})", .{err});
+ log("end({any})", .{err});
if (this.requested_end) {
return .{ .result = {} };
@@ -2814,7 +2814,7 @@ pub fn ReadableStreamSource(
comptime onStart: anytype,
comptime onPull: anytype,
comptime onCancel: fn (this: *Context) void,
- comptime deinit: fn (this: *Context) void,
+ comptime deinit_fn: fn (this: *Context) void,
comptime setRefUnrefFn: ?fn (this: *Context, enable: bool) void,
comptime drainInternalBuffer: ?fn (this: *Context) bun.ByteList,
) type {
@@ -2823,7 +2823,7 @@ pub fn ReadableStreamSource(
cancelled: bool = false,
deinited: bool = false,
pending_err: ?Syscall.Error = null,
- close_handler: ?fn (*anyopaque) void = null,
+ close_handler: ?*const fn (*anyopaque) void = null,
close_ctx: ?*anyopaque = null,
close_jsvalue: JSValue = JSValue.zero,
globalThis: *JSGlobalObject = undefined,
@@ -2892,7 +2892,7 @@ pub fn ReadableStreamSource(
return;
}
this.deinited = true;
- deinit(&this.context);
+ deinit_fn(&this.context);
}
pub fn getError(this: *This) ?Syscall.Error {
@@ -3034,7 +3034,7 @@ pub fn ReadableStreamSource(
}
pub const Export = shim.exportFunctions(.{
- .@"load" = load,
+ .load = load,
});
comptime {
@@ -3066,7 +3066,7 @@ pub const ByteBlobLoader = struct {
this.* = ByteBlobLoader{
.offset = blobe.offset,
.store = blobe.store.?,
- .chunk_size = if (user_chunk_size > 0) @minimum(user_chunk_size, blobe.size) else @minimum(1024 * 1024 * 2, blobe.size),
+ .chunk_size = if (user_chunk_size > 0) @min(user_chunk_size, blobe.size) else @min(1024 * 1024 * 2, blobe.size),
.remain = blobe.size,
.done = false,
};
@@ -3086,7 +3086,7 @@ pub const ByteBlobLoader = struct {
var temporary = this.store.sharedView();
temporary = temporary[this.offset..];
- temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))];
+ temporary = temporary[0..@min(buffer.len, @min(temporary.len, this.remain))];
if (temporary.len == 0) {
this.store.deref();
this.done = true;
@@ -3120,7 +3120,7 @@ pub const ByteBlobLoader = struct {
pub fn drain(this: *ByteBlobLoader) bun.ByteList {
var temporary = this.store.sharedView();
temporary = temporary[this.offset..];
- temporary = temporary[0..@minimum(16384, @minimum(temporary.len, this.remain))];
+ temporary = temporary[0..@min(16384, @min(temporary.len, this.remain))];
var cloned = bun.ByteList.init(temporary).listManaged(bun.default_allocator).clone() catch @panic("Out of memory");
this.offset +|= @truncate(Blob.SizeType, cloned.items.len);
@@ -3141,7 +3141,7 @@ pub const ByteBlobLoader = struct {
);
};
-pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
+pub const PipeFunction = *const fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
pub const PathOrFileDescriptor = union(enum) {
path: ZigString.Slice,
@@ -3202,14 +3202,14 @@ pub const ByteStream = struct {
}
if (this.has_received_last_chunk) {
- return .{ .chunk_size = @truncate(Blob.SizeType, @minimum(1024 * 1024 * 2, this.buffer.items.len)) };
+ return .{ .chunk_size = @truncate(Blob.SizeType, @min(1024 * 1024 * 2, this.buffer.items.len)) };
}
if (this.highWaterMark == 0) {
return .{ .ready = void{} };
}
- return .{ .chunk_size = @maximum(this.highWaterMark, std.mem.page_size) };
+ return .{ .chunk_size = @max(this.highWaterMark, std.mem.page_size) };
}
pub fn value(this: *@This()) JSValue {
@@ -3260,7 +3260,7 @@ pub const ByteStream = struct {
if (this.pending.state == .pending) {
std.debug.assert(this.buffer.items.len == 0);
- var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)];
+ var to_copy = this.pending_buffer[0..@min(chunk.len, this.pending_buffer.len)];
const pending_buffer_len = this.pending_buffer.len;
std.debug.assert(to_copy.ptr != chunk.ptr);
@memcpy(to_copy.ptr, chunk.ptr, to_copy.len);
@@ -3347,7 +3347,7 @@ pub const ByteStream = struct {
if (this.buffer.items.len > 0) {
std.debug.assert(this.value() == .zero);
- const to_write = @minimum(
+ const to_write = @min(
this.buffer.items.len - this.offset,
buffer.len,
);
@@ -3571,7 +3571,7 @@ pub const FIFO = struct {
return @as(u32, 0);
}
- return @intCast(u32, @maximum(len, 0));
+ return @intCast(u32, @max(len, 0));
}
pub fn adjustPipeCapacityOnLinux(this: *FIFO, current: usize, max: usize) void {
@@ -3579,7 +3579,7 @@ pub const FIFO = struct {
if (!this.has_adjusted_pipe_size_on_linux) {
if (current > 0 and max >= std.mem.page_size * 16) {
this.has_adjusted_pipe_size_on_linux = true;
- _ = Syscall.setPipeCapacityOnLinux(this.fd, @minimum(max * 4, Syscall.getMaxPipeSizeOnLinux()));
+ _ = Syscall.setPipeCapacityOnLinux(this.fd, @min(max * 4, Syscall.getMaxPipeSizeOnLinux()));
}
}
}
@@ -3668,7 +3668,7 @@ pub const FIFO = struct {
}
if (size_or_offset != std.math.maxInt(@TypeOf(size_or_offset)))
- this.to_read = @intCast(u32, @maximum(size_or_offset, 0));
+ this.to_read = @intCast(u32, @max(size_or_offset, 0));
return this.to_read;
}
@@ -3909,11 +3909,11 @@ pub const File = struct {
return if (this.remaining_bytes > 0 and this.isSeekable())
if (available_to_read != std.math.maxInt(usize))
- @minimum(chunk_size, available_to_read)
+ @min(chunk_size, available_to_read)
else
- @minimum(this.remaining_bytes -| this.total_read, chunk_size)
+ @min(this.remaining_bytes -| this.total_read, chunk_size)
else
- @minimum(available_to_read, chunk_size);
+ @min(available_to_read, chunk_size);
}
pub fn start(
@@ -4044,7 +4044,7 @@ pub const File = struct {
this.pending.result = .{
.err = Syscall.Error{
// this is too hacky
- .errno = @truncate(Syscall.Error.Int, @intCast(u16, @maximum(1, @errorToInt(err)))),
+ .errno = @truncate(Syscall.Error.Int, @intCast(u16, @max(1, @errorToInt(err)))),
.syscall = .read,
},
};
@@ -4062,7 +4062,7 @@ pub const File = struct {
var remaining = this.buf[this.concurrent.read..];
while (remaining.len > 0) {
- const to_read = @minimum(@as(usize, this.concurrent.chunk_size), remaining.len);
+ const to_read = @min(@as(usize, this.concurrent.chunk_size), remaining.len);
switch (Syscall.read(this.fd, remaining[0..to_read])) {
.err => |err| {
const retry = std.os.E.AGAIN;
@@ -4503,9 +4503,9 @@ pub const FileReader = struct {
},
.File => {
if (value)
- this.lazy_readable.readable.File.poll_ref.ref(JSC.VirtualMachine.vm)
+ this.lazy_readable.readable.File.poll_ref.ref(JSC.VirtualMachine.get())
else
- this.lazy_readable.readable.File.poll_ref.unref(JSC.VirtualMachine.vm);
+ this.lazy_readable.readable.File.poll_ref.unref(JSC.VirtualMachine.get());
},
}
}
@@ -4570,14 +4570,14 @@ pub fn NewReadyWatcher(
const fd = @intCast(c_int, fd_);
std.debug.assert(@intCast(c_int, this.poll_ref.?.fd) == fd);
std.debug.assert(
- this.poll_ref.?.unregister(JSC.VirtualMachine.vm.uws_event_loop.?) == .result,
+ this.poll_ref.?.unregister(JSC.VirtualMachine.get().uws_event_loop.?) == .result,
);
}
pub fn pollRef(this: *Context) *JSC.FilePoll {
return this.poll_ref orelse brk: {
this.poll_ref = JSC.FilePoll.init(
- JSC.VirtualMachine.vm,
+ JSC.VirtualMachine.get(),
this.fd,
.{},
Context,
@@ -4599,7 +4599,7 @@ pub fn NewReadyWatcher(
const fd = @intCast(c_int, fd_);
var poll_ref: *JSC.FilePoll = this.poll_ref orelse brk: {
this.poll_ref = JSC.FilePoll.init(
- JSC.VirtualMachine.vm,
+ JSC.VirtualMachine.get(),
fd,
.{},
Context,
@@ -4609,7 +4609,7 @@ pub fn NewReadyWatcher(
};
std.debug.assert(poll_ref.fd == fd);
std.debug.assert(!this.isWatching());
- switch (poll_ref.register(JSC.VirtualMachine.vm.uws_event_loop.?, flag, true)) {
+ switch (poll_ref.register(JSC.VirtualMachine.get().uws_event_loop.?, flag, true)) {
.err => |err| {
bun.unreachablePanic("FilePoll.register failed: {d}", .{err.errno});
},