aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore/streams.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r--src/bun.js/webcore/streams.zig266
1 files changed, 169 insertions, 97 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 681ff6172..07e792fe0 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -948,7 +948,7 @@ pub const FileSink = struct {
next: ?Sink = null,
auto_close: bool = false,
auto_truncate: bool = false,
- opened_fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor),
+ fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor),
mode: JSC.Node.Mode = 0,
chunk_size: usize = 0,
pending: StreamResult.Writable.Pending = StreamResult.Writable.Pending{
@@ -963,6 +963,9 @@ pub const FileSink = struct {
prevent_process_exit: bool = false,
reachable_from_js: bool = true,
+ poll_ref: JSC.PollRef = .{},
+
+ pub usingnamespace NewReadyWatcher(@This(), .write, ready);
pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) {
var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
@@ -987,7 +990,9 @@ pub const FileSink = struct {
};
this.mode = stat.mode;
- this.opened_fd = fd;
+ this.fd = fd;
+
+ this.auto_truncate = this.auto_truncate and (std.os.S.ISREG(this.mode));
return .{ .result = {} };
}
@@ -998,9 +1003,9 @@ pub const FileSink = struct {
}
pub fn start(this: *FileSink, stream_start: StreamStart) JSC.Node.Maybe(void) {
- if (this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
- _ = JSC.Node.Syscall.close(this.opened_fd);
- this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ _ = JSC.Node.Syscall.close(this.fd);
+ this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
}
this.done = false;
@@ -1034,16 +1039,17 @@ pub const FileSink = struct {
}
pub fn flush(this: *FileSink) StreamResult.Writable {
- std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor));
+ std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor));
var total: usize = this.written;
const initial = total;
defer this.written = total;
- const fd = this.opened_fd;
+ const fd = this.fd;
var remain = this.buffer.slice();
remain = remain[@minimum(this.head, remain.len)..];
+ const initial_remain = remain;
defer {
- std.debug.assert(total - initial == @ptrToInt(remain.ptr) - @ptrToInt(this.buffer.ptr));
+ std.debug.assert(total - initial == @ptrToInt(remain.ptr) - @ptrToInt(initial_remain.ptr));
if (remain.len == 0) {
this.head = 0;
@@ -1060,7 +1066,7 @@ pub const FileSink = struct {
switch (res.err.getErrno()) {
retry => {
- this.watch();
+ this.watch(fd);
return .{
.pending = &this.pending,
};
@@ -1085,11 +1091,11 @@ pub const FileSink = struct {
if (this.requested_end) {
this.done = true;
if (this.auto_truncate)
- std.os.ftruncate(this.opened_fd, total) catch {};
+ std.os.ftruncate(this.fd, total) catch {};
if (this.auto_close) {
- _ = JSC.Node.Syscall.close(this.opened_fd);
- this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ _ = JSC.Node.Syscall.close(this.fd);
+ this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
}
}
this.pending.run();
@@ -1097,6 +1103,9 @@ pub const FileSink = struct {
}
pub fn flushFromJS(this: *FileSink, globalThis: *JSGlobalObject, _: bool) JSC.Node.Maybe(JSValue) {
+ if (this.isPending()) {
+ return .{ .result = JSC.JSValue.jsUndefined() };
+ }
const result = this.flush();
if (result == .err) {
@@ -1109,9 +1118,11 @@ pub const FileSink = struct {
}
fn cleanup(this: *FileSink) void {
- if (this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
- _ = JSC.Node.Syscall.close(this.opened_fd);
- this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ this.unwatch(this.fd);
+
+ if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ _ = JSC.Node.Syscall.close(this.fd);
+ this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
}
if (this.buffer.cap > 0) {
@@ -1120,6 +1131,9 @@ pub const FileSink = struct {
this.done = true;
this.head = 0;
}
+
+ this.pending.result = .done;
+ this.pending.run();
}
pub fn finalize(this: *FileSink) void {
@@ -1151,25 +1165,11 @@ pub const FileSink = struct {
};
}
- pub fn watch(this: *FileSink) void {
- std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor));
- _ = JSC.VirtualMachine.vm.poller.watch(this.opened_fd, .write, FileSink, this);
- this.scheduled_count += 1;
- }
-
- pub fn unwatch(this: *FileSink) void {
- std.debug.assert(this.scheduled_count > 0);
- std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor));
- _ = JSC.VirtualMachine.vm.poller.unwatch(this.opened_fd, .write, FileSink, this);
- this.scheduled_count -= 1;
- }
-
pub fn toJS(this: *FileSink, globalThis: *JSGlobalObject) JSValue {
return JSSink.createObject(globalThis, this);
}
- pub fn onPoll(this: *FileSink, _: i64, _: u16) void {
- this.scheduled_count -= 1;
+ pub fn ready(this: *FileSink, _: i64) void {
_ = this.flush();
}
@@ -1246,7 +1246,7 @@ pub const FileSink = struct {
}
fn isPending(this: *const FileSink) bool {
- return this.scheduled_count > 0;
+ return this.poll_ref.status == .active;
}
pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) {
@@ -2531,13 +2531,16 @@ pub fn ReadableStreamSource(
}
pub fn processResult(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame, result: StreamResult) JSC.JSValue {
+ const arguments = callFrame.arguments(2);
+ var array = arguments.ptr[1].asObjectRef();
+
switch (result) {
.err => |err| {
globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
return JSValue.jsUndefined();
},
.temporary_and_done, .owned_and_done, .into_array_and_done => {
- JSC.C.JSObjectSetPropertyAtIndex(globalThis.ref(), callFrame.argument(2).asObjectRef(), 0, JSValue.jsBoolean(true).asObjectRef(), null);
+ JSC.C.JSObjectSetPropertyAtIndex(globalThis.ref(), array, 0, JSValue.jsBoolean(true).asObjectRef(), null);
return result.toJS(globalThis);
},
else => return result.toJS(globalThis),
@@ -2691,7 +2694,6 @@ pub const ByteStream = struct {
has_received_last_chunk: bool = false,
pending: StreamResult.Pending = StreamResult.Pending{
.frame = undefined,
- .used = false,
.result = .{ .done = {} },
},
done: bool = false,
@@ -2794,7 +2796,7 @@ pub const ByteStream = struct {
const chunk = stream.slice();
- if (!this.pending.used) {
+ if (this.pending.state != .pending) {
std.debug.assert(this.buffer.items.len == 0);
var to_copy = this.pending_buffer[0..@minimum(chunk.len, this.pending_buffer.len)];
const pending_buffer_len = this.pending_buffer.len;
@@ -2941,15 +2943,13 @@ pub const ByteStream = struct {
this.done = true;
if (this.pending_value) |ref| {
this.pending_value = null;
- ref.destroy(undefined);
+ ref.destroy();
}
if (view != .zero) {
this.pending_buffer = &.{};
this.pending.result = .{ .done = {} };
- if (!this.pending.used) {
- resume this.pending.frame;
- }
+ this.pending.run();
}
}
@@ -2959,17 +2959,14 @@ pub const ByteStream = struct {
if (this.pending_value) |ref| {
this.pending_value = null;
- ref.destroy(undefined);
+ ref.destroy();
}
if (!this.done) {
this.done = true;
this.pending_buffer = &.{};
this.pending.result = .{ .done = {} };
-
- if (!this.pending.used) {
- resume this.pending.frame;
- }
+ this.pending.run();
}
bun.default_allocator.destroy(this.parent());
@@ -2980,7 +2977,7 @@ pub const ByteStream = struct {
pub const FileBlobLoader = struct {
buf: []u8 = &[_]u8{},
- protected_view: JSC.JSValue = JSC.JSValue.zero,
+ view: JSC.Strong = .{},
fd: JSC.Node.FileDescriptor = 0,
auto_close: bool = false,
loop: *JSC.EventLoop = undefined,
@@ -3000,6 +2997,14 @@ pub const FileBlobLoader = struct {
concurrent: Concurrent = Concurrent{},
input_tag: StreamResult.Tag = StreamResult.Tag.done,
started: bool = false,
+ stored_global_this_: ?*JSC.JSGlobalObject = null,
+ poll_ref: JSC.PollRef = .{},
+
+ pub usingnamespace NewReadyWatcher(@This(), .read, ready);
+
+ pub inline fn globalThis(this: *FileBlobLoader) *JSC.JSGlobalObject {
+ return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis;
+ }
const FileReader = @This();
@@ -3017,15 +3022,6 @@ pub const FileBlobLoader = struct {
};
}
- pub fn watch(this: *FileReader) ?JSC.Node.Syscall.Error {
- switch (JSC.VirtualMachine.vm.poller.watch(this.fd, .read, FileBlobLoader, this)) {
- .err => |err| return err,
- else => {},
- }
- this.scheduled_count += 1;
- return null;
- }
-
const Concurrent = struct {
read: Blob.SizeType = 0,
task: NetworkThread.Task = .{ .callback = Concurrent.taskCallback },
@@ -3126,11 +3122,10 @@ pub const FileBlobLoader = struct {
pub fn onJSThread(task_ctx: *anyopaque) void {
var this: *FileBlobLoader = bun.cast(*FileBlobLoader, task_ctx);
- const protected_view = this.protected_view;
- defer protected_view.unprotect();
- this.protected_view = JSC.JSValue.zero;
+ const view = this.view.get().?;
+ defer this.view.clear();
- if (this.finalized and this.scheduled_count == 0) {
+ if (this.finalized and this.scheduled_count > 0) {
this.pending.run();
this.scheduled_count -= 1;
@@ -3154,7 +3149,7 @@ pub const FileBlobLoader = struct {
return;
}
- this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read), protected_view);
+ this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read), view, false, this.buf);
this.pending.run();
this.scheduled_count -= 1;
if (this.pending.result.isDone()) {
@@ -3309,8 +3304,7 @@ pub const FileBlobLoader = struct {
return .{ .done = {} };
},
run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => {
- this.protected_view = view;
- this.protected_view.protect();
+ this.view.set(this.globalThis(), view);
// should never be reached
this.pending.result = .{
.err = Syscall.Error.todo,
@@ -3324,7 +3318,7 @@ pub const FileBlobLoader = struct {
else => {},
}
- return this.read(buffer, view);
+ return this.read(buffer, view, null);
}
fn maybeAutoClose(this: *FileBlobLoader) void {
@@ -3334,7 +3328,7 @@ pub const FileBlobLoader = struct {
}
}
- fn handleReadChunk(this: *FileBlobLoader, result: usize, view: JSC.JSValue) StreamResult {
+ fn handleReadChunk(this: *FileBlobLoader, result: usize, view: JSC.JSValue, owned: bool, buf: []u8) StreamResult {
std.debug.assert(this.started);
this.total_read += @intCast(Blob.SizeType, result);
@@ -3355,9 +3349,17 @@ pub const FileBlobLoader = struct {
const has_more = remaining > 0;
if (!has_more) {
+ if (owned) {
+ return .{ .owned_and_done = bun.ByteList.init(buf) };
+ }
+
return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result), .value = view } };
}
+ if (owned) {
+ return .{ .owned = bun.ByteList.init(buf) };
+ }
+
return .{ .into_array = .{ .len = @truncate(Blob.SizeType, result), .value = view } };
}
@@ -3365,27 +3367,68 @@ pub const FileBlobLoader = struct {
this: *FileBlobLoader,
read_buf: []u8,
view: JSC.JSValue,
+ /// provided via kqueue(), only on macOS
+ available_to_read: ?c_int,
) StreamResult {
std.debug.assert(this.started);
+ std.debug.assert(read_buf.len > 0);
+
+ var buf_to_use = read_buf;
+ var free_buffer_on_error: bool = false;
+
+ // if it's a pipe, we really don't know what to expect what the max size will be
+ // if the pipe is sending us WAY bigger data than what we can fit in the buffer
+ // we allocate a new buffer of up to 4 MB
+ if (std.os.S.ISFIFO(this.mode)) {
+ outer: {
+ var len: c_int = available_to_read orelse 0;
+
+ // macOS FIONREAD doesn't seem to work here
+ // but we can get this information from the kqueue callback so we don't need to
+ if (len == 0) {
+ const FIONREAD = if (Environment.isLinux) std.os.FIONREAD else bun.C.FIONREAD;
+ const rc: c_int = std.c.ioctl(this.fd, FIONREAD, &len);
+ if (rc != 0) {
+ len = 0;
+ }
+ }
- const rc =
- Syscall.read(this.fd, read_buf);
+ if (len > read_buf.len * 10 and read_buf.len < std.mem.page_size) {
+ // then we need to allocate a buffer
+ // to read into
+ // this
+ buf_to_use = bun.default_allocator.alloc(
+ u8,
+ @intCast(
+ usize,
+ @minimum(
+ len,
+ 1024 * 1024 * 4,
+ ),
+ ),
+ ) catch break :outer;
+ free_buffer_on_error = true;
+ }
+ }
+ }
+
+ const rc = Syscall.read(this.fd, buf_to_use);
switch (rc) {
.err => |err| {
- const retry =
- std.os.E.AGAIN;
+ const retry = std.os.E.AGAIN;
switch (err.getErrno()) {
retry => {
- this.protected_view = view;
- this.protected_view.protect();
- this.buf = read_buf;
- if (this.watch()) |watch_fail| {
- this.finalize();
- return .{ .err = watch_fail };
+ if (free_buffer_on_error) {
+ bun.default_allocator.free(buf_to_use);
+ buf_to_use = read_buf;
}
+ this.view.set(this.globalThis(), view);
+ this.buf = read_buf;
+ this.watch(this.fd);
+
return .{
.pending = &this.pending,
};
@@ -3401,19 +3444,28 @@ pub const FileBlobLoader = struct {
return .{ .err = sys };
},
.result => |result| {
- return this.handleReadChunk(result, view);
+ if (result == 0 and free_buffer_on_error) {
+ bun.default_allocator.free(buf_to_use);
+ buf_to_use = read_buf;
+
+ return this.handleReadChunk(result, view, true, buf_to_use);
+ } else if (free_buffer_on_error) {
+ this.view.clear();
+ this.buf = &.{};
+ return this.handleReadChunk(result, view, true, buf_to_use);
+ }
+
+ return this.handleReadChunk(result, view, false, buf_to_use);
},
}
}
/// Called from Poller
- pub fn onPoll(this: *FileBlobLoader, sizeOrOffset: i64, _: u16) void {
+ pub fn ready(this: *FileBlobLoader, sizeOrOffset: i64) void {
std.debug.assert(this.started);
- this.scheduled_count -= 1;
- const protected_view = this.protected_view;
- defer protected_view.unprotect();
- this.protected_view = JSValue.zero;
+ const view = this.view.get() orelse .zero;
+ defer this.view.clear();
var available_to_read: usize = std.math.maxInt(usize);
if (comptime Environment.isMac) {
@@ -3446,35 +3498,29 @@ pub const FileBlobLoader = struct {
this.buf.len = @minimum(this.buf.len, available_to_read);
}
- this.pending.result = this.read(this.buf, protected_view);
+ this.pending.result = this.read(
+ this.buf,
+ view,
+ if (available_to_read == std.math.maxInt(usize))
+ null
+ else
+ @truncate(c_int, @intCast(isize, available_to_read)),
+ );
this.pending.run();
}
- pub fn unwatch(this: *FileBlobLoader) void {
- std.debug.assert(this.scheduled_count > 0);
- std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor));
- _ = JSC.VirtualMachine.vm.poller.unwatch(this.fd, .read, FileBlobLoader, this);
- this.scheduled_count -= 1;
- }
-
pub fn finalize(this: *FileBlobLoader) void {
if (this.finalized)
return;
- this.finalized = true;
- if (this.scheduled_count > 0) {
- this.unwatch();
- }
+ this.unwatch(this.fd);
+ this.finalized = true;
this.pending.result = .{ .done = {} };
this.pending.run();
- if (this.protected_view != .zero) {
- this.protected_view.unprotect();
- this.protected_view = .zero;
-
- this.buf = &.{};
- }
+ this.view.deinit();
+ this.buf = &.{};
this.maybeAutoClose();
@@ -3483,7 +3529,6 @@ pub const FileBlobLoader = struct {
pub fn onCancel(this: *FileBlobLoader) void {
this.cancelled = true;
-
this.deinit();
}
@@ -3501,6 +3546,33 @@ pub const FileBlobLoader = struct {
pub const Source = ReadableStreamSource(@This(), "FileBlobLoader", onStart, onPullInto, onCancel, deinit);
};
+pub fn NewReadyWatcher(
+ comptime Context: type,
+ comptime flag_: JSC.Poller.Flag,
+ comptime onReady: anytype,
+) type {
+ return struct {
+ const flag = flag_;
+ const ready = onReady;
+
+ const Watcher = @This();
+
+ pub fn onPoll(this: *Context, sizeOrOffset: i64, _: u16) void {
+ ready(this, sizeOrOffset);
+ }
+
+ pub fn unwatch(this: *Context, fd: JSC.Node.FileDescriptor) void {
+ std.debug.assert(fd != std.math.maxInt(JSC.Node.FileDescriptor));
+ _ = JSC.VirtualMachine.vm.poller.unwatch(fd, flag, Context, this);
+ }
+
+ pub fn watch(this: *Context, fd: JSC.Node.FileDescriptor) void {
+ std.debug.assert(fd != std.math.maxInt(JSC.Node.FileDescriptor));
+ _ = JSC.VirtualMachine.vm.poller.watch(fd, flag, Context, this);
+ }
+ };
+}
+
// pub const HTTPRequest = RequestBodyStreamer(false);
// pub const HTTPSRequest = RequestBodyStreamer(true);
// pub fn ResponseBodyStreamer(comptime is_ssl: bool) type {