diff options
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r-- | src/bun.js/webcore/streams.zig | 266 |
1 files changed, 169 insertions, 97 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 681ff6172..07e792fe0 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -948,7 +948,7 @@ pub const FileSink = struct { next: ?Sink = null, auto_close: bool = false, auto_truncate: bool = false, - opened_fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), + fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), mode: JSC.Node.Mode = 0, chunk_size: usize = 0, pending: StreamResult.Writable.Pending = StreamResult.Writable.Pending{ @@ -963,6 +963,9 @@ pub const FileSink = struct { prevent_process_exit: bool = false, reachable_from_js: bool = true, + poll_ref: JSC.PollRef = .{}, + + pub usingnamespace NewReadyWatcher(@This(), .write, ready); pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) { var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; @@ -987,7 +990,9 @@ pub const FileSink = struct { }; this.mode = stat.mode; - this.opened_fd = fd; + this.fd = fd; + + this.auto_truncate = this.auto_truncate and (std.os.S.ISREG(this.mode)); return .{ .result = {} }; } @@ -998,9 +1003,9 @@ pub const FileSink = struct { } pub fn start(this: *FileSink, stream_start: StreamStart) JSC.Node.Maybe(void) { - if (this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)) { - _ = JSC.Node.Syscall.close(this.opened_fd); - this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor); + if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + _ = JSC.Node.Syscall.close(this.fd); + this.fd = std.math.maxInt(JSC.Node.FileDescriptor); } this.done = false; @@ -1034,16 +1039,17 @@ pub const FileSink = struct { } pub fn flush(this: *FileSink) StreamResult.Writable { - std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)); + std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); var total: usize = this.written; const initial = total; defer this.written = total; - const fd = this.opened_fd; + const fd = this.fd; var remain = this.buffer.slice(); remain = remain[@minimum(this.head, remain.len)..]; + const initial_remain = remain; defer { - std.debug.assert(total - initial == @ptrToInt(remain.ptr) - @ptrToInt(this.buffer.ptr)); + std.debug.assert(total - initial == @ptrToInt(remain.ptr) - @ptrToInt(initial_remain.ptr)); if (remain.len == 0) { this.head = 0; @@ -1060,7 +1066,7 @@ pub const FileSink = struct { switch (res.err.getErrno()) { retry => { - this.watch(); + this.watch(fd); return .{ .pending = &this.pending, }; @@ -1085,11 +1091,11 @@ pub const FileSink = struct { if (this.requested_end) { this.done = true; if (this.auto_truncate) - std.os.ftruncate(this.opened_fd, total) catch {}; + std.os.ftruncate(this.fd, total) catch {}; if (this.auto_close) { - _ = JSC.Node.Syscall.close(this.opened_fd); - this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor); + _ = JSC.Node.Syscall.close(this.fd); + this.fd = std.math.maxInt(JSC.Node.FileDescriptor); } } this.pending.run(); @@ -1097,6 +1103,9 @@ pub const FileSink = struct { } pub fn flushFromJS(this: *FileSink, globalThis: *JSGlobalObject, _: bool) JSC.Node.Maybe(JSValue) { + if (this.isPending()) { + return .{ .result = JSC.JSValue.jsUndefined() }; + } const result = this.flush(); if (result == .err) { @@ -1109,9 +1118,11 @@ pub const FileSink = struct { } fn cleanup(this: *FileSink) void { - if (this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)) { - _ = JSC.Node.Syscall.close(this.opened_fd); - this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor); + this.unwatch(this.fd); + + if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + _ = JSC.Node.Syscall.close(this.fd); + this.fd = std.math.maxInt(JSC.Node.FileDescriptor); } if (this.buffer.cap > 0) { @@ -1120,6 +1131,9 @@ pub const FileSink = struct { this.done = true; this.head = 0; } + + this.pending.result = .done; + this.pending.run(); } pub fn finalize(this: *FileSink) void { @@ -1151,25 +1165,11 @@ pub const FileSink = struct { }; } - pub fn watch(this: *FileSink) void { - std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)); - _ = JSC.VirtualMachine.vm.poller.watch(this.opened_fd, .write, FileSink, this); - this.scheduled_count += 1; - } - - pub fn unwatch(this: *FileSink) void { - std.debug.assert(this.scheduled_count > 0); - std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)); - _ = JSC.VirtualMachine.vm.poller.unwatch(this.opened_fd, .write, FileSink, this); - this.scheduled_count -= 1; - } - pub fn toJS(this: *FileSink, globalThis: *JSGlobalObject) JSValue { return JSSink.createObject(globalThis, this); } - pub fn onPoll(this: *FileSink, _: i64, _: u16) void { - this.scheduled_count -= 1; + pub fn ready(this: *FileSink, _: i64) void { _ = this.flush(); } @@ -1246,7 +1246,7 @@ pub const FileSink = struct { } fn isPending(this: *const FileSink) bool { - return this.scheduled_count > 0; + return this.poll_ref.status == .active; } pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { @@ -2531,13 +2531,16 @@ pub fn ReadableStreamSource( } pub fn processResult(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame, result: StreamResult) JSC.JSValue { + const arguments = callFrame.arguments(2); + var array = arguments.ptr[1].asObjectRef(); + switch (result) { .err => |err| { globalThis.vm().throwError(globalThis, err.toJSC(globalThis)); return JSValue.jsUndefined(); }, .temporary_and_done, .owned_and_done, .into_array_and_done => { - JSC.C.JSObjectSetPropertyAtIndex(globalThis.ref(), callFrame.argument(2).asObjectRef(), 0, JSValue.jsBoolean(true).asObjectRef(), null); + JSC.C.JSObjectSetPropertyAtIndex(globalThis.ref(), array, 0, JSValue.jsBoolean(true).asObjectRef(), null); return result.toJS(globalThis); }, else => return result.toJS(globalThis), @@ -2691,7 +2694,6 @@ pub const ByteStream = struct { has_received_last_chunk: bool = false, pending: StreamResult.Pending = StreamResult.Pending{ .frame = undefined, - .used = false, .result = .{ .done = {} }, }, done: bool = false, @@ -2794,7 +2796,7 @@ pub const ByteStream = struct { const chunk = stream.slice(); - if (!this.pending.used) { + if (this.pending.state != .pending) { std.debug.assert(this.buffer.items.len == 0); var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)]; const pending_buffer_len = this.pending_buffer.len; @@ -2941,15 +2943,13 @@ pub const ByteStream = struct { this.done = true; if (this.pending_value) |ref| { this.pending_value = null; - ref.destroy(undefined); + ref.destroy(); } if (view != .zero) { this.pending_buffer = &.{}; this.pending.result = .{ .done = {} }; - if (!this.pending.used) { - resume this.pending.frame; - } + this.pending.run(); } } @@ -2959,17 +2959,14 @@ pub const ByteStream = struct { if (this.pending_value) |ref| { this.pending_value = null; - ref.destroy(undefined); + ref.destroy(); } if (!this.done) { this.done = true; this.pending_buffer = &.{}; this.pending.result = .{ .done = {} }; - - if (!this.pending.used) { - resume this.pending.frame; - } + this.pending.run(); } bun.default_allocator.destroy(this.parent()); @@ -2980,7 +2977,7 @@ pub const ByteStream = struct { pub const FileBlobLoader = struct { buf: []u8 = &[_]u8{}, - protected_view: JSC.JSValue = JSC.JSValue.zero, + view: JSC.Strong = .{}, fd: JSC.Node.FileDescriptor = 0, auto_close: bool = false, loop: *JSC.EventLoop = undefined, @@ -3000,6 +2997,14 @@ pub const FileBlobLoader = struct { concurrent: Concurrent = Concurrent{}, input_tag: StreamResult.Tag = StreamResult.Tag.done, started: bool = false, + stored_global_this_: ?*JSC.JSGlobalObject = null, + poll_ref: JSC.PollRef = .{}, + + pub usingnamespace NewReadyWatcher(@This(), .read, ready); + + pub inline fn globalThis(this: *FileBlobLoader) *JSC.JSGlobalObject { + return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis; + } const FileReader = @This(); @@ -3017,15 +3022,6 @@ pub const FileBlobLoader = struct { }; } - pub fn watch(this: *FileReader) ?JSC.Node.Syscall.Error { - switch (JSC.VirtualMachine.vm.poller.watch(this.fd, .read, FileBlobLoader, this)) { - .err => |err| return err, - else => {}, - } - this.scheduled_count += 1; - return null; - } - const Concurrent = struct { read: Blob.SizeType = 0, task: NetworkThread.Task = .{ .callback = Concurrent.taskCallback }, @@ -3126,11 +3122,10 @@ pub const FileBlobLoader = struct { pub fn onJSThread(task_ctx: *anyopaque) void { var this: *FileBlobLoader = bun.cast(*FileBlobLoader, task_ctx); - const protected_view = this.protected_view; - defer protected_view.unprotect(); - this.protected_view = JSC.JSValue.zero; + const view = this.view.get().?; + defer this.view.clear(); - if (this.finalized and this.scheduled_count == 0) { + if (this.finalized and this.scheduled_count > 0) { this.pending.run(); this.scheduled_count -= 1; @@ -3154,7 +3149,7 @@ pub const FileBlobLoader = struct { return; } - this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read), protected_view); + this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read), view, false, this.buf); this.pending.run(); this.scheduled_count -= 1; if (this.pending.result.isDone()) { @@ -3309,8 +3304,7 @@ pub const FileBlobLoader = struct { return .{ .done = {} }; }, run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => { - this.protected_view = view; - this.protected_view.protect(); + this.view.set(this.globalThis(), view); // should never be reached this.pending.result = .{ .err = Syscall.Error.todo, @@ -3324,7 +3318,7 @@ pub const FileBlobLoader = struct { else => {}, } - return this.read(buffer, view); + return this.read(buffer, view, null); } fn maybeAutoClose(this: *FileBlobLoader) void { @@ -3334,7 +3328,7 @@ pub const FileBlobLoader = struct { } } - fn handleReadChunk(this: *FileBlobLoader, result: usize, view: JSC.JSValue) StreamResult { + fn handleReadChunk(this: *FileBlobLoader, result: usize, view: JSC.JSValue, owned: bool, buf: []u8) StreamResult { std.debug.assert(this.started); this.total_read += @intCast(Blob.SizeType, result); @@ -3355,9 +3349,17 @@ pub const FileBlobLoader = struct { const has_more = remaining > 0; if (!has_more) { + if (owned) { + return .{ .owned_and_done = bun.ByteList.init(buf) }; + } + return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result), .value = view } }; } + if (owned) { + return .{ .owned = bun.ByteList.init(buf) }; + } + return .{ .into_array = .{ .len = @truncate(Blob.SizeType, result), .value = view } }; } @@ -3365,27 +3367,68 @@ pub const FileBlobLoader = struct { this: *FileBlobLoader, read_buf: []u8, view: JSC.JSValue, + /// provided via kqueue(), only on macOS + available_to_read: ?c_int, ) StreamResult { std.debug.assert(this.started); + std.debug.assert(read_buf.len > 0); + + var buf_to_use = read_buf; + var free_buffer_on_error: bool = false; + + // if it's a pipe, we really don't know what to expect what the max size will be + // if the pipe is sending us WAY bigger data than what we can fit in the buffer + // we allocate a new buffer of up to 4 MB + if (std.os.S.ISFIFO(this.mode)) { + outer: { + var len: c_int = available_to_read orelse 0; + + // macOS FIONREAD doesn't seem to work here + // but we can get this information from the kqueue callback so we don't need to + if (len == 0) { + const FIONREAD = if (Environment.isLinux) std.os.FIONREAD else bun.C.FIONREAD; + const rc: c_int = std.c.ioctl(this.fd, FIONREAD, &len); + if (rc != 0) { + len = 0; + } + } - const rc = - Syscall.read(this.fd, read_buf); + if (len > read_buf.len * 10 and read_buf.len < std.mem.page_size) { + // then we need to allocate a buffer + // to read into + // this + buf_to_use = bun.default_allocator.alloc( + u8, + @intCast( + usize, + @minimum( + len, + 1024 * 1024 * 4, + ), + ), + ) catch break :outer; + free_buffer_on_error = true; + } + } + } + + const rc = Syscall.read(this.fd, buf_to_use); switch (rc) { .err => |err| { - const retry = - std.os.E.AGAIN; + const retry = std.os.E.AGAIN; switch (err.getErrno()) { retry => { - this.protected_view = view; - this.protected_view.protect(); - this.buf = read_buf; - if (this.watch()) |watch_fail| { - this.finalize(); - return .{ .err = watch_fail }; + if (free_buffer_on_error) { + bun.default_allocator.free(buf_to_use); + buf_to_use = read_buf; } + this.view.set(this.globalThis(), view); + this.buf = read_buf; + this.watch(this.fd); + return .{ .pending = &this.pending, }; @@ -3401,19 +3444,28 @@ pub const FileBlobLoader = struct { return .{ .err = sys }; }, .result => |result| { - return this.handleReadChunk(result, view); + if (result == 0 and free_buffer_on_error) { + bun.default_allocator.free(buf_to_use); + buf_to_use = read_buf; + + return this.handleReadChunk(result, view, true, buf_to_use); + } else if (free_buffer_on_error) { + this.view.clear(); + this.buf = &.{}; + return this.handleReadChunk(result, view, true, buf_to_use); + } + + return this.handleReadChunk(result, view, false, buf_to_use); }, } } /// Called from Poller - pub fn onPoll(this: *FileBlobLoader, sizeOrOffset: i64, _: u16) void { + pub fn ready(this: *FileBlobLoader, sizeOrOffset: i64) void { std.debug.assert(this.started); - this.scheduled_count -= 1; - const protected_view = this.protected_view; - defer protected_view.unprotect(); - this.protected_view = JSValue.zero; + const view = this.view.get() orelse .zero; + defer this.view.clear(); var available_to_read: usize = std.math.maxInt(usize); if (comptime Environment.isMac) { @@ -3446,35 +3498,29 @@ pub const FileBlobLoader = struct { this.buf.len = @minimum(this.buf.len, available_to_read); } - this.pending.result = this.read(this.buf, protected_view); + this.pending.result = this.read( + this.buf, + view, + if (available_to_read == std.math.maxInt(usize)) + null + else + @truncate(c_int, @intCast(isize, available_to_read)), + ); this.pending.run(); } - pub fn unwatch(this: *FileBlobLoader) void { - std.debug.assert(this.scheduled_count > 0); - std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); - _ = JSC.VirtualMachine.vm.poller.unwatch(this.fd, .read, FileBlobLoader, this); - this.scheduled_count -= 1; - } - pub fn finalize(this: *FileBlobLoader) void { if (this.finalized) return; - this.finalized = true; - if (this.scheduled_count > 0) { - this.unwatch(); - } + this.unwatch(this.fd); + this.finalized = true; this.pending.result = .{ .done = {} }; this.pending.run(); - if (this.protected_view != .zero) { - this.protected_view.unprotect(); - this.protected_view = .zero; - - this.buf = &.{}; - } + this.view.deinit(); + this.buf = &.{}; this.maybeAutoClose(); @@ -3483,7 +3529,6 @@ pub const FileBlobLoader = struct { pub fn onCancel(this: *FileBlobLoader) void { this.cancelled = true; - this.deinit(); } @@ -3501,6 +3546,33 @@ pub const FileBlobLoader = struct { pub const Source = ReadableStreamSource(@This(), "FileBlobLoader", onStart, onPullInto, onCancel, deinit); }; +pub fn NewReadyWatcher( + comptime Context: type, + comptime flag_: JSC.Poller.Flag, + comptime onReady: anytype, +) type { + return struct { + const flag = flag_; + const ready = onReady; + + const Watcher = @This(); + + pub fn onPoll(this: *Context, sizeOrOffset: i64, _: u16) void { + ready(this, sizeOrOffset); + } + + pub fn unwatch(this: *Context, fd: JSC.Node.FileDescriptor) void { + std.debug.assert(fd != std.math.maxInt(JSC.Node.FileDescriptor)); + _ = JSC.VirtualMachine.vm.poller.unwatch(fd, flag, Context, this); + } + + pub fn watch(this: *Context, fd: JSC.Node.FileDescriptor) void { + std.debug.assert(fd != std.math.maxInt(JSC.Node.FileDescriptor)); + _ = JSC.VirtualMachine.vm.poller.watch(fd, flag, Context, this); + } + }; +} + // pub const HTTPRequest = RequestBodyStreamer(false); // pub const HTTPSRequest = RequestBodyStreamer(true); // pub fn ResponseBodyStreamer(comptime is_ssl: bool) type { |