aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js')
-rw-r--r--src/bun.js/api/bun.zig15
-rw-r--r--src/bun.js/api/bun/spawn.zig6
-rw-r--r--src/bun.js/api/bun/subprocess.zig527
-rw-r--r--src/bun.js/api/html_rewriter.zig3
-rw-r--r--src/bun.js/api/server.zig15
-rw-r--r--src/bun.js/base.zig49
-rw-r--r--src/bun.js/bindings/URLSearchParams.cpp11
-rw-r--r--src/bun.js/bindings/URLSearchParams.h1
-rw-r--r--src/bun.js/bindings/ZigGlobalObject.cpp1
-rw-r--r--src/bun.js/bindings/bindings.cpp2
-rw-r--r--src/bun.js/bindings/headers-cpp.h2
-rw-r--r--src/bun.js/bindings/headers.h2
-rw-r--r--src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp91
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js89
-rw-r--r--src/bun.js/child_process.exports.js5
-rw-r--r--src/bun.js/event_loop.zig2
-rw-r--r--src/bun.js/node/node_fs.zig10
-rw-r--r--src/bun.js/node/syscall.zig20
-rw-r--r--src/bun.js/node/types.zig22
-rw-r--r--src/bun.js/node_timers.exports.js28
-rw-r--r--src/bun.js/streams.exports.js12
-rw-r--r--src/bun.js/webcore/response.zig100
-rw-r--r--src/bun.js/webcore/streams.zig1703
23 files changed, 1587 insertions, 1129 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index 3a88f7a04..ee26b09f5 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -2448,8 +2448,7 @@ pub const Timer = struct {
}
pub fn deinit(this: *Timeout) void {
- if (comptime JSC.is_bindgen)
- unreachable;
+ JSC.markBinding(@src());
var vm = this.globalThis.bunVM();
this.poll_ref.unref(vm);
@@ -2465,7 +2464,7 @@ pub const Timer = struct {
countdown: JSValue,
repeat: bool,
) !void {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
var vm = globalThis.bunVM();
// We don't deal with nesting levels directly
@@ -2534,7 +2533,7 @@ pub const Timer = struct {
callback: JSValue,
countdown: JSValue,
) callconv(.C) JSValue {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
const id = globalThis.bunVM().timer.last_id;
globalThis.bunVM().timer.last_id +%= 1;
@@ -2548,7 +2547,7 @@ pub const Timer = struct {
callback: JSValue,
countdown: JSValue,
) callconv(.C) JSValue {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
const id = globalThis.bunVM().timer.last_id;
globalThis.bunVM().timer.last_id +%= 1;
@@ -2559,7 +2558,7 @@ pub const Timer = struct {
}
pub fn clearTimer(timer_id: JSValue, _: *JSGlobalObject, repeats: bool) void {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
var map = if (repeats) &VirtualMachine.vm.timer.interval_map else &VirtualMachine.vm.timer.timeout_map;
const id: Timeout.ID = .{
@@ -2580,7 +2579,7 @@ pub const Timer = struct {
globalThis: *JSGlobalObject,
id: JSValue,
) callconv(.C) JSValue {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
Timer.clearTimer(id, globalThis, false);
return JSValue.jsUndefined();
}
@@ -2588,7 +2587,7 @@ pub const Timer = struct {
globalThis: *JSGlobalObject,
id: JSValue,
) callconv(.C) JSValue {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
Timer.clearTimer(id, globalThis, true);
return JSValue.jsUndefined();
}
diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig
index d594d44a7..afcc5509b 100644
--- a/src/bun.js/api/bun/spawn.zig
+++ b/src/bun.js/api/bun/spawn.zig
@@ -57,8 +57,6 @@ pub const PosixSpawn = struct {
} else {
_ = system.posix_spawnattr_destroy(&self.attr);
}
-
- self.* = undefined;
}
pub fn get(self: Attr) !u16 {
@@ -207,6 +205,10 @@ pub const PosixSpawn = struct {
argv,
envp,
);
+ if (comptime bun.Environment.allow_assert)
+ JSC.Node.Syscall.syslog("posix_spawn({s}) = {d} ({d})", .{
+ path, rc, pid,
+ });
if (comptime bun.Environment.isLinux) {
// rc is negative because it's libc errno
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index e09c5db2b..c85e0396f 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -28,7 +28,6 @@ pub const Subprocess = struct {
stdin: Writable,
stdout: Readable,
stderr: Readable,
-
killed: bool = false,
poll_ref: ?*JSC.FilePoll = null,
@@ -47,8 +46,22 @@ pub const Subprocess = struct {
finalized: bool = false,
globalThis: *JSC.JSGlobalObject,
-
+ observable_getters: std.enums.EnumSet(enum {
+ stdin,
+ stdout,
+ stderr,
+ }) = .{},
has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
+ is_sync: bool = false,
+
+ pub fn hasExited(this: *const Subprocess) bool {
+ return this.exit_code != null or this.waitpid_err != null;
+ }
+
+ pub fn updateHasPendingActivityFlag(this: *Subprocess) void {
+ @fence(.SeqCst);
+ this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .SeqCst);
+ }
pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool {
@fence(.Acquire);
@@ -78,7 +91,7 @@ pub const Subprocess = struct {
}
const Readable = union(enum) {
- fd: JSC.Node.FileDescriptor,
+ fd: bun.FileDescriptor,
pipe: Pipe,
inherit: void,
@@ -102,44 +115,37 @@ pub const Subprocess = struct {
return;
}
- if (this.buffer.fd != JSC.Node.invalid_fd) {
- this.buffer.close();
- }
+ this.buffer.close();
}
pub fn toJS(this: *@This(), readable: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue {
- if (this.* == .stream) {
- if (this.stream.ptr == .File) {
- this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable);
- }
- return this.stream.toJS();
+ if (this.* != .stream) {
+ const stream = this.buffer.toReadableStream(globalThis, exited);
+ this.* = .{ .stream = stream };
}
- const is_fifo = this.buffer.is_fifo;
- const stream = this.buffer.toReadableStream(globalThis, exited);
- this.* = .{ .stream = stream };
+
if (this.stream.ptr == .File) {
- this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable);
- this.stream.ptr.File.is_fifo = is_fifo;
+ this.stream.ptr.File.setSignal(JSC.WebCore.Signal.init(readable));
}
- return stream.value;
+
+ return this.stream.toJS();
}
};
- pub fn init(stdio: Stdio, fd: i32, _: *JSC.JSGlobalObject) Readable {
+ pub fn init(stdio: Stdio, fd: i32, other_fd: i32, _: *JSC.JSGlobalObject) Readable {
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore => Readable{ .ignore = {} },
.pipe => brk: {
+ _ = JSC.Node.Syscall.close(other_fd);
break :brk .{
.pipe = .{
- .buffer = BufferedOutput{
- .fd = fd,
- .is_fifo = true,
- },
+ .buffer = undefined,
},
};
},
- .path, .blob, .fd => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) },
+ .path => Readable{ .ignore = {} },
+ .blob, .fd => Readable{ .fd = @intCast(bun.FileDescriptor, fd) },
else => unreachable,
};
}
@@ -159,7 +165,7 @@ pub const Subprocess = struct {
},
.pipe => {
if (this.pipe == .stream and this.pipe.stream.ptr == .File)
- this.pipe.stream.ptr.File.signal.clear();
+ this.pipe.stream.ptr.File.readable().FIFO.signal.clear();
this.pipe.done();
},
else => {},
@@ -190,10 +196,8 @@ pub const Subprocess = struct {
.pipe => {
defer this.close();
- if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) {
- if (this.pipe.buffer.canRead())
- this.pipe.buffer.readIfPossible(true);
- }
+ if (this.pipe.buffer.canRead())
+ this.pipe.buffer.readAll();
var bytes = this.pipe.buffer.internal_buffer.slice();
this.pipe.buffer.internal_buffer = .{};
@@ -216,6 +220,7 @@ pub const Subprocess = struct {
this: *Subprocess,
globalThis: *JSGlobalObject,
) callconv(.C) JSValue {
+ this.observable_getters.insert(.stderr);
return this.stderr.toJS(globalThis, this.exit_code != null);
}
@@ -223,6 +228,7 @@ pub const Subprocess = struct {
this: *Subprocess,
globalThis: *JSGlobalObject,
) callconv(.C) JSValue {
+ this.observable_getters.insert(.stdin);
return this.stdin.toJS(globalThis);
}
@@ -230,6 +236,7 @@ pub const Subprocess = struct {
this: *Subprocess,
globalThis: *JSGlobalObject,
) callconv(.C) JSValue {
+ this.observable_getters.insert(.stdout);
return this.stdout.toJS(globalThis, this.exit_code != null);
}
@@ -291,41 +298,22 @@ pub const Subprocess = struct {
return .{ .result = {} };
}
- pub fn onKill(
- this: *Subprocess,
- ) void {
- if (this.killed) {
- return;
- }
-
- this.killed = true;
- this.closePorts();
+ fn hasCalledGetter(this: *Subprocess, comptime getter: @Type(.EnumLiteral)) bool {
+ return this.observable_getters.contains(getter);
}
- pub fn closePorts(this: *Subprocess) void {
- const pidfd = this.pidfd;
-
- if (comptime Environment.isLinux) {
- this.pidfd = std.math.maxInt(std.os.fd_t);
+ fn closeProcess(this: *Subprocess) void {
+ if (comptime !Environment.isLinux) {
+ return;
}
- defer {
- if (comptime Environment.isLinux) {
- if (pidfd != std.math.maxInt(std.os.fd_t)) {
- _ = std.os.close(pidfd);
- }
- }
- }
+ const pidfd = this.pidfd;
- if (this.stdout == .pipe) {
- this.stdout.pipe.finish();
- }
+ this.pidfd = std.math.maxInt(std.os.fd_t);
- if (this.stderr == .pipe) {
- this.stderr.pipe.finish();
+ if (pidfd != std.math.maxInt(std.os.fd_t)) {
+ _ = std.os.close(pidfd);
}
-
- this.stdin.close();
}
pub fn doRef(this: *Subprocess, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSValue {
@@ -354,7 +342,7 @@ pub const Subprocess = struct {
pub const BufferedInput = struct {
remain: []const u8 = "",
- fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd,
+ fd: bun.FileDescriptor = bun.invalid_fd,
poll_ref: ?*JSC.FilePoll = null,
written: usize = 0,
@@ -366,6 +354,10 @@ pub const Subprocess = struct {
pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .writable, onReady);
pub fn onReady(this: *BufferedInput, _: i64) void {
+ if (this.fd == bun.invalid_fd) {
+ return;
+ }
+
this.write();
}
@@ -395,10 +387,14 @@ pub const Subprocess = struct {
}
}
- this.write();
+ this.writeAllowBlocking(is_sync);
}
pub fn write(this: *BufferedInput) void {
+ this.writeAllowBlocking(false);
+ }
+
+ pub fn writeAllowBlocking(this: *BufferedInput, allow_blocking: bool) void {
var to_write = this.remain;
if (to_write.len == 0) {
@@ -450,7 +446,7 @@ pub const Subprocess = struct {
to_write = to_write[bytes_written..];
// we are done or it accepts no more input
- if (this.remain.len == 0 or bytes_written == 0) {
+ if (this.remain.len == 0 or (allow_blocking and bytes_written == 0)) {
this.deinit();
return;
}
@@ -465,9 +461,9 @@ pub const Subprocess = struct {
poll.deinit();
}
- if (this.fd != JSC.Node.invalid_fd) {
+ if (this.fd != bun.invalid_fd) {
_ = JSC.Node.Syscall.close(this.fd);
- this.fd = JSC.Node.invalid_fd;
+ this.fd = bun.invalid_fd;
}
}
@@ -487,186 +483,137 @@ pub const Subprocess = struct {
pub const BufferedOutput = struct {
internal_buffer: bun.ByteList = .{},
- max_internal_buffer: u32 = default_max_buffer_size,
- fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd,
- received_eof: bool = false,
- pending_error: ?JSC.Node.Syscall.Error = null,
- poll_ref: ?*JSC.FilePoll = null,
- is_fifo: bool = false,
+ fifo: JSC.WebCore.FIFO = undefined,
+ auto_sizer: JSC.WebCore.AutoSizer = undefined,
+ status: Status = .{
+ .pending = {},
+ },
- pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready);
+ pub const Status = union(enum) {
+ pending: void,
+ done: void,
+ err: JSC.Node.Syscall.Error,
+ };
- pub fn ready(this: *BufferedOutput, available_to_read: i64) void {
- if (comptime Environment.isMac) {
- if (this.poll_ref) |poll| {
- if (available_to_read > 0) {
- poll.flags.insert(.readable);
- } else {
- poll.flags.remove(.readable);
- }
- }
- }
+ pub fn init(fd: bun.FileDescriptor) BufferedOutput {
+ return BufferedOutput{
+ .internal_buffer = .{},
+ .fifo = JSC.WebCore.FIFO{
+ .fd = fd,
+ },
+ };
+ }
- // TODO: what happens if the task was already enqueued after unwatch()?
- this.readAll(false);
+ pub fn setup(this: *BufferedOutput, allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) void {
+ this.* = init(fd);
+ this.auto_sizer = .{
+ .max = max_size,
+ .allocator = allocator,
+ .buffer = &this.internal_buffer,
+ };
+ this.watch();
}
pub fn canRead(this: *BufferedOutput) bool {
- const is_readable = bun.isReadable(this.fd);
-
- if (is_readable) {
- if (this.poll_ref) |poll_ref| {
- poll_ref.flags.insert(.readable);
- poll_ref.flags.insert(.fifo);
- std.debug.assert(poll_ref.flags.contains(.poll_readable));
- }
- }
-
- return is_readable;
+ return bun.isReadable(this.fifo.fd);
}
- pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void {
- if (comptime !force) {
- // we ask, "Is it possible to read right now?"
- // we do this rather than epoll or kqueue()
- // because we don't want to block the thread waiting for the read
- // and because kqueue or epoll might return other unrelated events
- // and we don't want this to become an event loop ticking point
- if (!this.canRead()) {
- this.watch(this.fd);
+ pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void {
+ switch (result) {
+ .pending => {
+ this.watch();
return;
- }
- }
-
- this.readAll(force);
- }
-
- pub fn closeOnEOF(this: *BufferedOutput) bool {
- var poll = this.poll_ref orelse return true;
- poll.flags.insert(.eof);
- return false;
- }
-
- pub fn readAll(this: *BufferedOutput, comptime force: bool) void {
- if (this.poll_ref) |poll| {
- const is_readable = poll.isReadable();
- if (!is_readable and poll.isEOF()) {
- if (poll.isHUP()) {
- this.autoCloseFileDescriptor();
- }
+ },
+ .err => |err| {
+ this.status = .{ .err = err };
+ this.fifo.close();
return;
- } else if (!is_readable and poll.isHUP()) {
- this.autoCloseFileDescriptor();
- return;
- } else if (!is_readable) {
+ },
+ .done => {
+ this.status = .{ .done = {} };
+ this.fifo.close();
return;
- }
+ },
+ else => {
+ const slice = result.slice();
+ this.internal_buffer.len += @truncate(u32, slice.len);
+ if (slice.len > 0)
+ std.debug.assert(this.internal_buffer.contains(slice));
+
+ if (result.isDone()) {
+ this.status = .{ .done = {} };
+ this.fifo.close();
+ }
+ },
}
+ }
- // read as much as we can from the pipe
- while (this.internal_buffer.len < this.max_internal_buffer) {
- var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined;
-
- var buf: []u8 = buffer_[0..];
-
- var available = this.internal_buffer.ptr[this.internal_buffer.len..this.internal_buffer.cap];
- if (available.len >= buf.len) {
- buf = available;
+ pub fn readAll(this: *BufferedOutput) void {
+ while (@as(usize, this.internal_buffer.len) < this.auto_sizer.max and this.status == .pending) {
+ var stack_buffer: [8096]u8 = undefined;
+ var stack_buf: []u8 = stack_buffer[0..];
+ var buf_to_use = stack_buf;
+ var available = this.internal_buffer.available();
+ if (available.len >= stack_buf.len) {
+ buf_to_use = available;
}
- switch (JSC.Node.Syscall.read(this.fd, buf)) {
- .err => |e| {
- if (e.isRetry()) {
- if (!this.isWatching() and this.isFIFO())
- this.watch(this.fd);
- this.poll_ref.?.flags.insert(.fifo);
- return;
- }
-
- if (comptime Environment.isMac) {
- // INTR is returned on macOS when the process is killed
- // It probably sent SIGPIPE but we have the handler for
- // that disabled.
- // We know it's the "real" INTR because we use read$NOCANCEL
- if (e.getErrno() == .INTR) {
- this.received_eof = true;
- this.autoCloseFileDescriptor();
- return;
- }
- } else {
- if (comptime Environment.allow_assert) {
- std.debug.assert(e.getErrno() != .INTR); // Bun's read() function should retry on EINTR
- }
- }
+ const result = this.fifo.read(buf_to_use, this.fifo.to_read);
- // fail
- log("readAll() fail: {s}", .{@tagName(e.getErrno())});
- this.pending_error = e;
- this.internal_buffer.listManaged(bun.default_allocator).deinit();
- this.internal_buffer = .{};
+ switch (result) {
+ .pending => {
+ this.watch();
return;
},
+ .err => |err| {
+ this.status = .{ .err = err };
+ this.fifo.close();
- .result => |bytes_read| {
- log("readAll() {d}", .{bytes_read});
-
- if (bytes_read > 0) {
- if (buf.ptr == available.ptr) {
- this.internal_buffer.len += @truncate(u32, bytes_read);
- } else {
- _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory");
- }
- }
-
- if (comptime !force) {
- if (buf[bytes_read..].len > 0 or !this.canRead()) {
- if (!this.isWatching())
- this.watch(this.fd);
- if (this.is_fifo)
- this.poll_ref.?.flags.insert(.fifo)
- else
- this.received_eof = true;
- return;
- }
+ return;
+ },
+ .done => {
+ this.status = .{ .done = {} };
+ this.fifo.close();
+ return;
+ },
+ .read => |slice| {
+ if (slice.ptr == stack_buf.ptr) {
+ this.internal_buffer.append(this.auto_sizer.allocator, slice) catch @panic("out of memory");
} else {
- // we consider a short read as being EOF
- this.received_eof = !this.is_fifo and this.received_eof or bytes_read < buf.len;
- if (this.received_eof) {
- if (this.closeOnEOF()) {
- this.autoCloseFileDescriptor();
- }
+ this.internal_buffer.len += @truncate(u32, slice.len);
+ }
- // do not auto-close the file descriptor here
- // it's totally legit to have a short read
- return;
- }
+ if (slice.len < buf_to_use.len) {
+ this.watch();
+ return;
}
},
}
}
}
+ fn watch(this: *BufferedOutput) void {
+ this.fifo.pending.set(BufferedOutput, this, onRead);
+ if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd);
+ return;
+ }
+
pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob {
const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis);
this.internal_buffer = bun.ByteList.init("");
- std.debug.assert(this.fd == JSC.Node.invalid_fd);
- std.debug.assert(this.received_eof);
return blob;
}
pub fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream {
if (exited) {
// exited + received EOF => no more read()
- if (this.received_eof) {
- var poll_ref = this.poll_ref;
- this.poll_ref = null;
-
- this.autoCloseFileDescriptor();
-
+ if (this.fifo.isClosed()) {
// also no data at all
if (this.internal_buffer.len == 0) {
- this.close();
+ if (this.internal_buffer.cap > 0) {
+ this.internal_buffer.deinitWithAllocator(this.auto_sizer.allocator);
+ }
// so we return an empty stream
return JSC.WebCore.ReadableStream.fromJS(
JSC.WebCore.ReadableStream.empty(globalThis),
@@ -675,70 +622,52 @@ pub const Subprocess = struct {
}
return JSC.WebCore.ReadableStream.fromJS(
- JSC.WebCore.ReadableStream.fromBlobWithPoll(
+ JSC.WebCore.ReadableStream.fromBlob(
globalThis,
&this.toBlob(globalThis),
0,
- poll_ref,
),
globalThis,
).?;
+ } else {
+ this.fifo.close_on_empty_read = true;
}
}
- std.debug.assert(this.fd != JSC.Node.invalid_fd);
{
- var poll_ref = this.poll_ref;
- this.poll_ref = null;
+ const internal_buffer = this.internal_buffer;
+ this.internal_buffer = bun.ByteList.init("");
// There could still be data waiting to be read in the pipe
// so we need to create a new stream that will read from the
// pipe and then return the blob.
- var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis);
const result = JSC.WebCore.ReadableStream.fromJS(
- JSC.WebCore.ReadableStream.fromBlobWithPoll(
+ JSC.WebCore.ReadableStream.fromFIFO(
globalThis,
- &blob,
- 0,
- poll_ref,
+ &this.fifo,
+ internal_buffer,
),
globalThis,
).?;
- blob.detach();
- result.ptr.File.buffered_data = this.internal_buffer;
- result.ptr.File.stored_global_this_ = globalThis;
- result.ptr.File.finished = exited;
- this.internal_buffer = bun.ByteList.init("");
- this.fd = JSC.Node.invalid_fd;
- this.received_eof = false;
return result;
}
}
- pub fn autoCloseFileDescriptor(this: *BufferedOutput) void {
- const fd = this.fd;
- if (fd == JSC.Node.invalid_fd)
- return;
- this.fd = JSC.Node.invalid_fd;
-
- if (this.poll_ref) |poll| {
- this.poll_ref = null;
- poll.deinit();
- }
-
- _ = JSC.Node.Syscall.close(fd);
- }
-
pub fn close(this: *BufferedOutput) void {
- this.autoCloseFileDescriptor();
+ switch (this.status) {
+ .done => {},
+ .pending => {
+ this.fifo.close();
+ this.status = .{ .done = {} };
+ },
+ .err => {},
+ }
if (this.internal_buffer.cap > 0) {
this.internal_buffer.listManaged(bun.default_allocator).deinit();
this.internal_buffer = .{};
}
-
- this.received_eof = true;
}
};
@@ -748,7 +677,7 @@ pub const Subprocess = struct {
pipe: *JSC.WebCore.FileSink,
readable_stream: JSC.WebCore.ReadableStream,
},
- fd: JSC.Node.FileDescriptor,
+ fd: bun.FileDescriptor,
buffered_input: BufferedInput,
inherit: void,
ignore: void,
@@ -763,7 +692,7 @@ pub const Subprocess = struct {
pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Writable) void {}
- pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
+ pub fn init(stdio: Stdio, fd: i32, other_fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
switch (stdio) {
.pipe => {
var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink);
@@ -771,7 +700,9 @@ pub const Subprocess = struct {
.fd = fd,
.buffer = bun.ByteList.init(&.{}),
.allocator = globalThis.bunVM().allocator,
+ .auto_close = true,
};
+ if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd);
sink.mode = std.os.S.IFIFO;
if (stdio == .pipe) {
if (stdio.pipe) |readable| {
@@ -787,6 +718,7 @@ pub const Subprocess = struct {
return Writable{ .pipe = sink };
},
.array_buffer, .blob => {
+ if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd);
var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined };
switch (stdio) {
.array_buffer => |array_buffer| {
@@ -800,7 +732,7 @@ pub const Subprocess = struct {
return Writable{ .buffered_input = buffered_input };
},
.fd => {
- return Writable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) };
+ return Writable{ .fd = @intCast(bun.FileDescriptor, fd) };
},
.inherit => {
return Writable{ .inherit = {} };
@@ -842,15 +774,21 @@ pub const Subprocess = struct {
}
};
- pub fn finalize(this: *Subprocess) callconv(.C) void {
- this.unref();
- this.closePorts();
- this.stdout.close();
+ pub fn finalizeSync(this: *Subprocess) void {
+ this.closeProcess();
+ this.stdin.close();
this.stderr.close();
+ this.stdin.close();
- this.finalized = true;
- bun.default_allocator.destroy(this);
+ this.exit_promise.deinit();
+ this.on_exit_callback.deinit();
+ }
+
+ pub fn finalize(this: *Subprocess) callconv(.C) void {
+ std.debug.assert(!this.hasPendingActivity());
+ this.finalizeSync();
log("Finalize", .{});
+ bun.default_allocator.destroy(this);
}
pub fn getExited(
@@ -1203,18 +1141,27 @@ pub const Subprocess = struct {
.globalThis = globalThis,
.pid = pid,
.pidfd = pidfd,
- .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch {
+ .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], stdin_pipe[0], globalThis) catch {
globalThis.throw("out of memory", .{});
return .zero;
},
- .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis),
- .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis),
+ .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], stdout_pipe[1], globalThis),
+ .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], stderr_pipe[1], globalThis),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
+ .is_sync = is_sync,
};
if (subprocess.stdin == .pipe) {
subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin);
}
+ if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
+ subprocess.stdout.pipe.buffer.setup(jsc_vm.allocator, stdout_pipe[0], default_max_buffer_size);
+ }
+
+ if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
+ subprocess.stderr.pipe.buffer.setup(jsc_vm.allocator, stderr_pipe[0], default_max_buffer_size);
+ }
+
const out = if (comptime !is_sync)
subprocess.toJS(globalThis)
else
@@ -1223,7 +1170,7 @@ pub const Subprocess = struct {
if (comptime !is_sync) {
var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess);
subprocess.poll_ref = poll;
- switch (poll.register(
+ switch (subprocess.poll_ref.?.register(
jsc_vm.uws_event_loop.?,
.process,
true,
@@ -1252,20 +1199,20 @@ pub const Subprocess = struct {
if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
if (comptime is_sync) {
if (subprocess.stdout.pipe.buffer.canRead()) {
- subprocess.stdout.pipe.buffer.readAll(true);
+ subprocess.stdout.pipe.buffer.readAll();
}
} else if (!lazy) {
- subprocess.stdout.pipe.buffer.readIfPossible(false);
+ subprocess.stdout.pipe.buffer.readAll();
}
}
if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
if (comptime is_sync) {
if (subprocess.stderr.pipe.buffer.canRead()) {
- subprocess.stderr.pipe.buffer.readAll(true);
+ subprocess.stderr.pipe.buffer.readAll();
}
} else if (!lazy) {
- subprocess.stderr.pipe.buffer.readIfPossible(false);
+ subprocess.stderr.pipe.buffer.readAll();
}
}
@@ -1273,11 +1220,43 @@ pub const Subprocess = struct {
return out;
}
- subprocess.wait(true);
+ if (subprocess.stdin == .buffered_input) {
+ while (subprocess.stdin.buffered_input.remain.len > 0) {
+ subprocess.stdin.buffered_input.writeIfPossible(true);
+ }
+ }
+
+ {
+ var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess);
+ subprocess.poll_ref = poll;
+ switch (subprocess.poll_ref.?.register(
+ jsc_vm.uws_event_loop.?,
+ .process,
+ true,
+ )) {
+ .result => {},
+ .err => |err| {
+ if (err.getErrno() == .SRCH) {
+ @panic("This shouldn't happen");
+ }
+
+ // process has already exited
+ // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007
+ subprocess.onExitNotification();
+ },
+ }
+ }
+
+ while (!subprocess.hasExited()) {
+ jsc_vm.tick();
+ jsc_vm.eventLoop().autoTick();
+ }
+
+ // subprocess.wait(true);
const exitCode = subprocess.exit_code orelse 1;
const stdout = subprocess.stdout.toBufferedValue(globalThis);
const stderr = subprocess.stderr.toBufferedValue(globalThis);
- subprocess.finalize();
+ subprocess.finalizeSync();
const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4);
sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@intCast(i32, exitCode)));
@@ -1290,16 +1269,22 @@ pub const Subprocess = struct {
pub fn onExitNotification(
this: *Subprocess,
) void {
- this.wait(false);
+ this.wait(this.is_sync);
}
pub fn wait(this: *Subprocess, sync: bool) void {
if (this.has_waitpid_task) {
return;
}
-
+ defer this.updateHasPendingActivityFlag();
this.has_waitpid_task = true;
const pid = this.pid;
+
+ if (!sync) {
+ // signal to the other end we are definitely done
+ this.stdin.close();
+ }
+
switch (PosixSpawn.waitpid(pid, 0)) {
.err => |err| {
this.waitpid_err = err;
@@ -1331,14 +1316,16 @@ pub const Subprocess = struct {
poll.deinitWithVM(vm);
}
- this.onExit();
+ this.onExit(this.globalThis);
}
}
- fn onExit(this: *Subprocess) void {
- defer this.updateHasPendingActivity();
- this.closePorts();
+ fn onExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject) void {
+ this.stdin.close();
+ this.stdout.close();
+ this.stderr.close();
+ defer this.updateHasPendingActivity();
this.has_waitpid_task = false;
if (this.on_exit_callback.trySwap()) |callback| {
@@ -1349,7 +1336,7 @@ pub const Subprocess = struct {
const waitpid_value: JSValue =
if (this.waitpid_err) |err|
- err.toJSC(this.globalThis)
+ err.toJSC(globalThis)
else
JSC.JSValue.jsUndefined();
@@ -1359,21 +1346,21 @@ pub const Subprocess = struct {
};
const result = callback.call(
- this.globalThis,
+ globalThis,
args[0 .. @as(usize, @boolToInt(this.exit_code != null)) + @as(usize, @boolToInt(this.waitpid_err != null))],
);
- if (result.isAnyError(this.globalThis)) {
- this.globalThis.bunVM().onUnhandledError(this.globalThis, result);
+ if (result.isAnyError(globalThis)) {
+ globalThis.bunVM().onUnhandledError(globalThis, result);
}
}
if (this.exit_promise.trySwap()) |promise| {
if (this.exit_code) |code| {
- promise.asPromise().?.resolve(this.globalThis, JSValue.jsNumber(code));
+ promise.asPromise().?.resolve(globalThis, JSValue.jsNumber(code));
} else if (this.waitpid_err) |err| {
this.waitpid_err = null;
- promise.asPromise().?.reject(this.globalThis, err.toJSC(this.globalThis));
+ promise.asPromise().?.reject(globalThis, err.toJSC(globalThis));
} else {
// crash in debug mode
if (comptime Environment.allow_assert)
@@ -1395,7 +1382,7 @@ pub const Subprocess = struct {
const Stdio = union(enum) {
inherit: void,
ignore: void,
- fd: JSC.Node.FileDescriptor,
+ fd: bun.FileDescriptor,
path: JSC.Node.PathLike,
blob: JSC.WebCore.AnyBlob,
pipe: ?JSC.WebCore.ReadableStream,
@@ -1454,7 +1441,7 @@ pub const Subprocess = struct {
if (blob.needsToReadFile()) {
if (blob.store()) |store| {
if (store.data.file.pathlike == .fd) {
- if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) {
+ if (store.data.file.pathlike.fd == @intCast(bun.FileDescriptor, i)) {
stdio_array[i] = Stdio{ .inherit = {} };
} else {
switch (@intCast(std.os.fd_t, i)) {
@@ -1520,7 +1507,7 @@ pub const Subprocess = struct {
return false;
}
- const fd = @intCast(JSC.Node.FileDescriptor, fd_);
+ const fd = @intCast(bun.FileDescriptor, fd_);
switch (@intCast(std.os.fd_t, i)) {
std.os.STDIN_FILENO => {
diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig
index 91248dc5c..e99e49f07 100644
--- a/src/bun.js/api/html_rewriter.zig
+++ b/src/bun.js/api/html_rewriter.zig
@@ -817,8 +817,7 @@ fn HandlerCallback(
) (fn (*HandlerType, *LOLHTMLType) bool) {
return struct {
pub fn callback(this: *HandlerType, value: *LOLHTMLType) bool {
- if (comptime JSC.is_bindgen)
- unreachable;
+ JSC.markBinding(@src());
var zig_element = bun.default_allocator.create(ZigType) catch unreachable;
@field(zig_element, field_name) = value;
// At the end of this scope, the value is no longer valid
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index c4724b1b8..531d4830b 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1428,10 +1428,17 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
stream.value.ensureStillAlive();
- if (!stream.isLocked(this.server.globalThis)) {
- streamLog("is not locked", .{});
- this.renderMissing();
- return;
+ const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and
+ response_stream.sink.buffer.len == 0);
+
+ if (!stream.isLocked(this.server.globalThis) and !is_in_progress) {
+ if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| {
+ if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) {
+ streamLog("is not locked", .{});
+ this.renderMissing();
+ return;
+ }
+ }
}
this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index e456a7113..981c0d2e4 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -3931,7 +3931,7 @@ pub const PollRef = struct {
/// Make calling ref() on this poll into a no-op.
pub fn disable(this: *PollRef) void {
- this.unref();
+ this.unref(JSC.VirtualMachine.vm);
this.status = .done;
}
@@ -3987,6 +3987,7 @@ pub const FilePoll = struct {
const FileReader = JSC.WebCore.FileReader;
const FileSink = JSC.WebCore.FileSink;
+ const FIFO = JSC.WebCore.FIFO;
const Subprocess = JSC.Subprocess;
const BufferedInput = Subprocess.BufferedInput;
const BufferedOutput = Subprocess.BufferedOutput;
@@ -3999,7 +4000,7 @@ pub const FilePoll = struct {
FileSink,
Subprocess,
BufferedInput,
- BufferedOutput,
+ FIFO,
Deactivated,
});
@@ -4009,6 +4010,7 @@ pub const FilePoll = struct {
flags.remove(.writable);
flags.remove(.process);
flags.remove(.eof);
+ flags.remove(.hup);
flags.setUnion(updated);
poll.flags = flags;
@@ -4079,9 +4081,9 @@ pub const FilePoll = struct {
}
var ptr = poll.owner;
switch (ptr.tag()) {
- @field(Owner.Tag, "FileReader") => {
- log("onUpdate: FileReader", .{});
- ptr.as(FileReader).onPoll(size_or_offset, 0);
+ @field(Owner.Tag, "FIFO") => {
+ log("onUpdate: FIFO", .{});
+ ptr.as(FIFO).ready(size_or_offset);
},
@field(Owner.Tag, "Subprocess") => {
log("onUpdate: Subprocess", .{});
@@ -4095,17 +4097,9 @@ pub const FilePoll = struct {
loader.onPoll(size_or_offset, 0);
},
- @field(Owner.Tag, "BufferedInput") => {
- log("onUpdate: BufferedInput", .{});
- var loader = ptr.as(JSC.Subprocess.BufferedInput);
- loader.onReady(size_or_offset);
+ else => {
+ log("onUpdate: disconnected?", .{});
},
- @field(Owner.Tag, "BufferedOutput") => {
- log("onUpdate: BufferedOutput", .{});
- var loader = ptr.as(JSC.Subprocess.BufferedOutput);
- loader.ready(size_or_offset);
- },
- else => {},
}
}
@@ -4155,15 +4149,20 @@ pub const FilePoll = struct {
var flags = Flags.Set{};
if (kqueue_event.filter == std.os.system.EVFILT_READ) {
flags.insert(Flags.readable);
+ log("readable", .{});
if (kqueue_event.flags & std.os.system.EV_EOF != 0) {
- flags.insert(Flags.eof);
+ flags.insert(Flags.hup);
+ log("hup", .{});
}
} else if (kqueue_event.filter == std.os.system.EVFILT_WRITE) {
flags.insert(Flags.writable);
+ log("writable", .{});
if (kqueue_event.flags & std.os.system.EV_EOF != 0) {
flags.insert(Flags.hup);
+ log("hup", .{});
}
} else if (kqueue_event.filter == std.os.system.EVFILT_PROC) {
+ log("proc", .{});
flags.insert(Flags.process);
}
return flags;
@@ -4245,17 +4244,15 @@ pub const FilePoll = struct {
this.flags.insert(.has_incremented_poll_count);
}
- pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll {
+ pub fn init(vm: *JSC.VirtualMachine, fd: bun.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll {
return initWithOwner(vm, fd, flags, Owner.init(owner));
}
- pub fn initWithOwner(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, owner: Owner) *FilePoll {
+ pub fn initWithOwner(vm: *JSC.VirtualMachine, fd: bun.FileDescriptor, flags: Flags.Struct, owner: Owner) *FilePoll {
var poll = vm.rareData().filePolls(vm).get();
- poll.* = .{
- .fd = @intCast(u32, fd),
- .flags = Flags.Set.init(flags),
- .owner = owner,
- };
+ poll.fd = @intCast(u32, fd);
+ poll.flags = Flags.Set.init(flags);
+ poll.owner = owner;
return poll;
}
@@ -4350,7 +4347,7 @@ pub const FilePoll = struct {
}
} else if (comptime Environment.isMac) {
var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
- const one_shot_flag: @TypeOf(changelist[0].flags) = if (!this.flags.contains(.one_shot)) 0 else std.c.EV_ONESHOT;
+ const one_shot_flag: u16 = if (!this.flags.contains(.one_shot)) 0 else std.c.EV_ONESHOT;
changelist[0] = switch (flag) {
.readable => .{
.ident = @intCast(u64, fd),
@@ -4410,7 +4407,7 @@ pub const FilePoll = struct {
// processing an element of the changelist and there is enough room
// in the eventlist, then the event will be placed in the eventlist
// with EV_ERROR set in flags and the system error in data.
- if (changelist[0].flags == std.c.EV_ERROR) {
+ if (changelist[0].flags == std.c.EV_ERROR and changelist[0].data != 0) {
return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
// Otherwise, -1 will be returned, and errno will be set to
// indicate the error condition.
@@ -4440,7 +4437,7 @@ pub const FilePoll = struct {
return JSC.Maybe(void).success;
}
- pub const invalid_fd = JSC.Node.invalid_fd;
+ const invalid_fd = bun.invalid_fd;
pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) {
if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) {
diff --git a/src/bun.js/bindings/URLSearchParams.cpp b/src/bun.js/bindings/URLSearchParams.cpp
index 0ce9554c7..9c168b4f6 100644
--- a/src/bun.js/bindings/URLSearchParams.cpp
+++ b/src/bun.js/bindings/URLSearchParams.cpp
@@ -96,9 +96,11 @@ void URLSearchParams::set(const String& name, const String& value)
return false;
});
updateURL();
+ needsSorting = true;
return;
}
m_pairs.append({ name, value });
+ needsSorting = true;
updateURL();
}
@@ -106,6 +108,7 @@ void URLSearchParams::append(const String& name, const String& value)
{
m_pairs.append({ name, value });
updateURL();
+ needsSorting = true;
}
Vector<String> URLSearchParams::getAll(const String& name) const
@@ -122,10 +125,12 @@ Vector<String> URLSearchParams::getAll(const String& name) const
void URLSearchParams::remove(const String& name)
{
- m_pairs.removeAllMatching([&](const auto& pair) {
- return pair.key == name;
- });
+ if (!m_pairs.removeAllMatching([&](const auto& pair) {
+ return pair.key == name;
+ }))
+ return;
updateURL();
+ needsSorting = true;
}
String URLSearchParams::toString() const
diff --git a/src/bun.js/bindings/URLSearchParams.h b/src/bun.js/bindings/URLSearchParams.h
index 3749ed9f9..486098adc 100644
--- a/src/bun.js/bindings/URLSearchParams.h
+++ b/src/bun.js/bindings/URLSearchParams.h
@@ -72,6 +72,7 @@ private:
WeakPtr<DOMURL> m_associatedURL;
Vector<KeyValuePair<String, String>> m_pairs;
+ bool needsSorting { true };
};
} // namespace WebCore
diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp
index 2ea4bc65a..5d7c49292 100644
--- a/src/bun.js/bindings/ZigGlobalObject.cpp
+++ b/src/bun.js/bindings/ZigGlobalObject.cpp
@@ -2382,6 +2382,7 @@ void GlobalObject::finishCreation(VM& vm)
auto& global = *reinterpret_cast<Zig::GlobalObject*>(init.owner);
if (global.crypto == nullptr) {
global.crypto = WebCore::SubtleCrypto::createPtr(global.scriptExecutionContext());
+ global.crypto->ref();
}
init.set(
diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp
index 977f1ca26..8c0df075e 100644
--- a/src/bun.js/bindings/bindings.cpp
+++ b/src/bun.js/bindings/bindings.cpp
@@ -2277,7 +2277,7 @@ static void populateStackFramePosition(const JSC::StackFrame* stackFrame, ZigStr
// Make sure the range is valid
WTF::StringView sourceString = m_codeBlock->source().provider()->source();
- if (!expressionStop || expressionStart > static_cast<int>(sourceString.length())) {
+ if (expressionStop < 1 || expressionStart > static_cast<int>(sourceString.length())) {
return;
}
diff --git a/src/bun.js/bindings/headers-cpp.h b/src/bun.js/bindings/headers-cpp.h
index 05368907e..e5f4cb8f8 100644
--- a/src/bun.js/bindings/headers-cpp.h
+++ b/src/bun.js/bindings/headers-cpp.h
@@ -1,4 +1,4 @@
-//-- AUTOGENERATED FILE -- 1668983536
+//-- AUTOGENERATED FILE -- 1669191472
// clang-format off
#pragma once
diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h
index 99b046b59..a9d106029 100644
--- a/src/bun.js/bindings/headers.h
+++ b/src/bun.js/bindings/headers.h
@@ -1,5 +1,5 @@
// clang-format off
-//-- AUTOGENERATED FILE -- 1668983536
+//-- AUTOGENERATED FILE -- 1669191472
#pragma once
#include <stddef.h>
diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
index 90b9eeada..f2a3d5a55 100644
--- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
+++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
@@ -1039,7 +1039,7 @@ const char* const s_readableStreamInternalsReadDirectStreamCode =
"\n" \
" if (highWaterMark) {\n" \
" sink.start({\n" \
- " highWaterMark,\n" \
+ " highWaterMark: highWaterMark < 64 ? 64 : highWaterMark,\n" \
" });\n" \
" }\n" \
"\n" \
@@ -2270,7 +2270,7 @@ const char* const s_readableStreamInternalsReadableStreamDefaultControllerCanClo
const JSC::ConstructAbility s_readableStreamInternalsLazyLoadStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamInternalsLazyLoadStreamCodeConstructorKind = JSC::ConstructorKind::None;
const JSC::ImplementationVisibility s_readableStreamInternalsLazyLoadStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
-const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2614;
+const int s_readableStreamInternalsLazyLoadStreamCodeLength = 3647;
static const JSC::Intrinsic s_readableStreamInternalsLazyLoadStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamInternalsLazyLoadStreamCode =
"(function (stream, autoAllocateChunkSize) {\n" \
@@ -2280,7 +2280,7 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" var nativePtr = @getByIdDirectPrivate(stream, \"bunNativePtr\");\n" \
" var Prototype = @lazyStreamPrototypeMap.@get(nativeType);\n" \
" if (Prototype === @undefined) {\n" \
- " var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);\n" \
+ " var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType);\n" \
" var closer = [false];\n" \
" var handleResult;\n" \
" function handleNativeReadableStreamPromiseResult(val) {\n" \
@@ -2293,8 +2293,6 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
"\n" \
" handleResult = function handleResult(result, controller, view) {\n" \
" \"use strict\";\n" \
- "\n" \
- " \n" \
" if (result && @isPromise(result)) {\n" \
" return result.then(\n" \
" handleNativeReadableStreamPromiseResult.bind({\n" \
@@ -2319,53 +2317,96 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" }\n" \
" };\n" \
"\n" \
+ " function createResult(tag, controller, view, closer) {\n" \
+ " closer[0] = false;\n" \
+ "\n" \
+ " var result;\n" \
+ " try {\n" \
+ " result = pull(tag, view, closer);\n" \
+ " } catch (err) {\n" \
+ " return controller.error(err);\n" \
+ " }\n" \
+ "\n" \
+ " return handleResult(result, controller, view);\n" \
+ " }\n" \
+ "\n" \
" Prototype = class NativeReadableStreamSource {\n" \
- " constructor(tag, autoAllocateChunkSize) {\n" \
- " this.pull = this.pull_.bind(tag);\n" \
- " this.cancel = this.cancel_.bind(tag);\n" \
+ " constructor(tag, autoAllocateChunkSize, drainValue) {\n" \
+ " this.#tag = tag;\n" \
+ " this.pull = this.#pull.bind(this);\n" \
+ " this.cancel = this.#cancel.bind(this);\n" \
" this.autoAllocateChunkSize = autoAllocateChunkSize;\n" \
+ "\n" \
+ " if (drainValue !== @undefined) {\n" \
+ " this.start = (controller) => {\n" \
+ " controller.enqueue(drainValue);\n" \
+ " console.log(\"chunkSize\", chunkSize);\n" \
+ " };\n" \
+ " }\n" \
" }\n" \
"\n" \
" pull;\n" \
" cancel;\n" \
+ " start;\n" \
"\n" \
+ " #tag;\n" \
" type = \"bytes\";\n" \
" autoAllocateChunkSize = 0;\n" \
- "\n" \
+ " \n" \
" static startSync = start;\n" \
+ " \n" \
+ " \n" \
+ " #pull(controller) {\n" \
+ " var tag = this.#tag;\n" \
"\n" \
- " pull_(controller) {\n" \
- " closer[0] = false;\n" \
- "\n" \
- " var result;\n" \
- "\n" \
- " const view = controller.byobRequest.view;\n" \
- " try {\n" \
- " result = pull(this, view, closer);\n" \
- " } catch (err) {\n" \
- " return controller.error(err);\n" \
+ " if (!tag) {\n" \
+ " controller.close();\n" \
+ " return;\n" \
" }\n" \
"\n" \
- " return handleResult(result, controller, view);\n" \
+ " createResult(tag, controller, controller.byobRequest.view, closer);\n" \
" }\n" \
"\n" \
- " cancel_(reason) {\n" \
- " cancel(this, reason);\n" \
+ " #cancel(reason) {\n" \
+ " var tag = this.#tag;\n" \
+ " setRefOrUnref && setRefOrUnref(tag, false);\n" \
+ " cancel(tag, reason);\n" \
" }\n" \
" static deinit = deinit;\n" \
" static registry = new FinalizationRegistry(deinit);\n" \
+ " static drain = drain;\n" \
" };\n" \
" @lazyStreamPrototypeMap.@set(nativeType, Prototype);\n" \
" }\n" \
"\n" \
" const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);\n" \
+ " var drainValue;\n" \
+ " const drainFn = Prototype.drain;\n" \
+ " if (drainFn) {\n" \
+ " drainValue = drainFn(nativePtr);\n" \
+ " }\n" \
"\n" \
" //\n" \
" if (chunkSize === 0) {\n" \
- " @readableStreamClose(stream);\n" \
- " return null;\n" \
+ " if ((drainValue?.byteLength ?? 0) > 0) {\n" \
+ " deinit && nativePtr && @enqueueJob(deinit, nativePtr);\n" \
+ " return {\n" \
+ " start(controller) {\n" \
+ " controller.enqueue(drainValue);\n" \
+ " controller.close();\n" \
+ " },\n" \
+ " type: \"bytes\",\n" \
+ " };\n" \
+ " }\n" \
+ "\n" \
+ " return {\n" \
+ " start(controller) {\n" \
+ " controller.close();\n" \
+ " },\n" \
+ " type: \"bytes\",\n" \
+ " };\n" \
" }\n" \
- " var instance = new Prototype(nativePtr, chunkSize);\n" \
+ " var instance = new Prototype(nativePtr, chunkSize, drainValue);\n" \
" Prototype.registry.register(instance, nativePtr);\n" \
" return instance;\n" \
"})\n" \
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index e8c9667a0..def4d51a3 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -839,7 +839,7 @@ function readDirectStream(stream, sink, underlyingSource) {
if (highWaterMark) {
sink.start({
- highWaterMark,
+ highWaterMark: highWaterMark < 64 ? 64 : highWaterMark,
});
}
@@ -1857,7 +1857,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
var Prototype = @lazyStreamPrototypeMap.@get(nativeType);
if (Prototype === @undefined) {
- var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);
+ var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType);
var closer = [false];
var handleResult;
function handleNativeReadableStreamPromiseResult(val) {
@@ -1870,8 +1870,6 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
handleResult = function handleResult(result, controller, view) {
"use strict";
-
-
if (result && @isPromise(result)) {
return result.then(
handleNativeReadableStreamPromiseResult.bind({
@@ -1896,53 +1894,96 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
}
};
+ function createResult(tag, controller, view, closer) {
+ closer[0] = false;
+
+ var result;
+ try {
+ result = pull(tag, view, closer);
+ } catch (err) {
+ return controller.error(err);
+ }
+
+ return handleResult(result, controller, view);
+ }
+
Prototype = class NativeReadableStreamSource {
- constructor(tag, autoAllocateChunkSize) {
- this.pull = this.pull_.bind(tag);
- this.cancel = this.cancel_.bind(tag);
+ constructor(tag, autoAllocateChunkSize, drainValue) {
+ this.#tag = tag;
+ this.pull = this.#pull.bind(this);
+ this.cancel = this.#cancel.bind(this);
this.autoAllocateChunkSize = autoAllocateChunkSize;
+
+ if (drainValue !== @undefined) {
+ this.start = (controller) => {
+ controller.enqueue(drainValue);
+ console.log("chunkSize", chunkSize);
+ };
+ }
}
pull;
cancel;
+ start;
+ #tag;
type = "bytes";
autoAllocateChunkSize = 0;
-
+
static startSync = start;
+
+
+ #pull(controller) {
+ var tag = this.#tag;
- pull_(controller) {
- closer[0] = false;
-
- var result;
-
- const view = controller.byobRequest.view;
- try {
- result = pull(this, view, closer);
- } catch (err) {
- return controller.error(err);
+ if (!tag) {
+ controller.close();
+ return;
}
- return handleResult(result, controller, view);
+ createResult(tag, controller, controller.byobRequest.view, closer);
}
- cancel_(reason) {
- cancel(this, reason);
+ #cancel(reason) {
+ var tag = this.#tag;
+ setRefOrUnref && setRefOrUnref(tag, false);
+ cancel(tag, reason);
}
static deinit = deinit;
static registry = new FinalizationRegistry(deinit);
+ static drain = drain;
};
@lazyStreamPrototypeMap.@set(nativeType, Prototype);
}
const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+ var drainValue;
+ const drainFn = Prototype.drain;
+ if (drainFn) {
+ drainValue = drainFn(nativePtr);
+ }
// empty file, no need for native back-and-forth on this
if (chunkSize === 0) {
- @readableStreamClose(stream);
- return null;
+ if ((drainValue?.byteLength ?? 0) > 0) {
+ deinit && nativePtr && @enqueueJob(deinit, nativePtr);
+ return {
+ start(controller) {
+ controller.enqueue(drainValue);
+ controller.close();
+ },
+ type: "bytes",
+ };
+ }
+
+ return {
+ start(controller) {
+ controller.close();
+ },
+ type: "bytes",
+ };
}
- var instance = new Prototype(nativePtr, chunkSize);
+ var instance = new Prototype(nativePtr, chunkSize, drainValue);
Prototype.registry.register(instance, nativePtr);
return instance;
}
diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js
index a9a1589dd..4819ebda0 100644
--- a/src/bun.js/child_process.exports.js
+++ b/src/bun.js/child_process.exports.js
@@ -1033,13 +1033,16 @@ export class ChildProcess extends EventEmitter {
const stdio = options.stdio || ["pipe", "pipe", "pipe"];
const bunStdio = getBunStdioFromOptions(stdio);
+ var env = options.envPairs || undefined;
+ if (env === process.env) env = undefined;
+
this.#handle = Bun.spawn({
cmd: spawnargs,
stdin: bunStdio[0],
stdout: bunStdio[1],
stderr: bunStdio[2],
cwd: options.cwd || undefined,
- env: options.envPairs || undefined,
+ env,
onExit: this.#handleOnExit.bind(this),
lazy: true,
});
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 28e12fb84..d83bd3575 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -132,7 +132,7 @@ pub fn IOTask(comptime Context: type) type {
pub fn deinit(this: *This) void {
var allocator = this.allocator;
this.ref.unref(this.event_loop.virtual_machine);
- this.* = undefined;
+
allocator.destroy(this);
}
};
diff --git a/src/bun.js/node/node_fs.zig b/src/bun.js/node/node_fs.zig
index b5ba2b983..394e705db 100644
--- a/src/bun.js/node/node_fs.zig
+++ b/src/bun.js/node/node_fs.zig
@@ -23,7 +23,7 @@ const linux = os.linux;
const PathOrBuffer = JSC.Node.PathOrBuffer;
const PathLike = JSC.Node.PathLike;
const PathOrFileDescriptor = JSC.Node.PathOrFileDescriptor;
-const FileDescriptor = JSC.Node.FileDescriptor;
+const FileDescriptor = bun.FileDescriptor;
const DirIterator = @import("./dir_iterator.zig");
const Path = @import("../../resolver/resolve_path.zig");
const FileSystem = @import("../../fs.zig").FileSystem;
@@ -1823,7 +1823,7 @@ const Arguments = struct {
.file = undefined,
.global_object = ctx.ptr(),
};
- var fd: FileDescriptor = JSC.Node.invalid_fd;
+ var fd: FileDescriptor = bun.invalid_fd;
if (arguments.next()) |arg| {
arguments.eat();
@@ -1918,7 +1918,7 @@ const Arguments = struct {
}
}
- if (fd != JSC.Node.invalid_fd) {
+ if (fd != bun.invalid_fd) {
stream.file = .{ .fd = fd };
} else if (path) |path_| {
stream.file = .{ .path = path_ };
@@ -1957,7 +1957,7 @@ const Arguments = struct {
.file = undefined,
.global_object = ctx.ptr(),
};
- var fd: FileDescriptor = JSC.Node.invalid_fd;
+ var fd: FileDescriptor = bun.invalid_fd;
if (arguments.next()) |arg| {
arguments.eat();
@@ -2044,7 +2044,7 @@ const Arguments = struct {
}
}
- if (fd != JSC.Node.invalid_fd) {
+ if (fd != bun.invalid_fd) {
stream.file = .{ .fd = fd };
} else if (path) |path_| {
stream.file = .{ .path = path_ };
diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig
index e2d197073..1554186bc 100644
--- a/src/bun.js/node/syscall.zig
+++ b/src/bun.js/node/syscall.zig
@@ -11,12 +11,13 @@ const JSC = @import("../../jsc.zig");
const SystemError = JSC.SystemError;
const bun = @import("../../global.zig");
const MAX_PATH_BYTES = bun.MAX_PATH_BYTES;
-const fd_t = bun.FileDescriptorType;
+const fd_t = bun.FileDescriptor;
const C = @import("../../global.zig").C;
const linux = os.linux;
const Maybe = JSC.Maybe;
const log = bun.Output.scoped(.SYS, false);
+pub const syslog = log;
// On Linux AARCh64, zig is missing stat & lstat syscalls
const use_libc = (Environment.isLinux and Environment.isAarch64) or Environment.isMac;
@@ -124,7 +125,7 @@ pub fn getcwd(buf: *[bun.MAX_PATH_BYTES]u8) Maybe([]const u8) {
Result.errnoSys(0, .getcwd).?;
}
-pub fn fchmod(fd: JSC.Node.FileDescriptor, mode: JSC.Node.Mode) Maybe(void) {
+pub fn fchmod(fd: bun.FileDescriptor, mode: JSC.Node.Mode) Maybe(void) {
return Maybe(void).errnoSys(C.fchmod(fd, mode), .fchmod) orelse
Maybe(void).success;
}
@@ -146,7 +147,7 @@ pub fn lstat(path: [:0]const u8) Maybe(os.Stat) {
return Maybe(os.Stat){ .result = stat_ };
}
-pub fn fstat(fd: JSC.Node.FileDescriptor) Maybe(os.Stat) {
+pub fn fstat(fd: bun.FileDescriptor) Maybe(os.Stat) {
var stat_ = mem.zeroes(os.Stat);
if (Maybe(os.Stat).errnoSys(fstatSym(fd, &stat_), .fstat)) |err| return err;
return Maybe(os.Stat){ .result = stat_ };
@@ -162,7 +163,7 @@ pub fn mkdir(file_path: [:0]const u8, flags: JSC.Node.Mode) Maybe(void) {
}
}
-pub fn fcntl(fd: JSC.Node.FileDescriptor, cmd: i32, arg: usize) Maybe(usize) {
+pub fn fcntl(fd: bun.FileDescriptor, cmd: i32, arg: usize) Maybe(usize) {
const result = fcntl_symbol(fd, cmd, arg);
if (Maybe(usize).errnoSys(result, .fcntl)) |err| return err;
return .{ .result = @intCast(usize, result) };
@@ -179,12 +180,12 @@ pub fn getErrno(rc: anytype) std.os.E {
};
}
-pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) Maybe(JSC.Node.FileDescriptor) {
+pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) Maybe(bun.FileDescriptor) {
while (true) {
const rc = Syscall.system.open(file_path, flags, perm);
log("open({s}): {d}", .{ file_path, rc });
return switch (Syscall.getErrno(rc)) {
- .SUCCESS => .{ .result = @intCast(JSC.Node.FileDescriptor, rc) },
+ .SUCCESS => .{ .result = @intCast(bun.FileDescriptor, rc) },
.INTR => continue,
else => |err| {
return Maybe(std.os.fd_t){
@@ -204,7 +205,10 @@ pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode)
// That error is not unreachable for us
pub fn close(fd: std.os.fd_t) ?Syscall.Error {
log("close({d})", .{fd});
- std.debug.assert(fd != JSC.Node.invalid_fd);
+ std.debug.assert(fd != bun.invalid_fd);
+ if (comptime std.meta.trait.isSignedInt(@TypeOf(fd)))
+ std.debug.assert(fd > -1);
+
if (comptime Environment.isMac) {
// This avoids the EINTR problem.
return switch (system.getErrno(system.@"close$NOCANCEL"(fd))) {
@@ -685,7 +689,7 @@ pub const Error = struct {
}
};
-pub fn setPipeCapacityOnLinux(fd: JSC.Node.FileDescriptor, capacity: usize) Maybe(usize) {
+pub fn setPipeCapacityOnLinux(fd: bun.FileDescriptor, capacity: usize) Maybe(usize) {
if (comptime !Environment.isLinux) @compileError("Linux-only");
std.debug.assert(capacity > 0);
diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig
index 680425ddc..020fd5f2d 100644
--- a/src/bun.js/node/types.zig
+++ b/src/bun.js/node/types.zig
@@ -43,7 +43,6 @@ pub fn DeclEnum(comptime T: type) type {
});
}
-pub const FileDescriptor = os.fd_t;
pub const Flavor = enum {
sync,
promise,
@@ -602,7 +601,7 @@ pub const PathLike = union(Tag) {
};
pub const Valid = struct {
- pub fn fileDescriptor(fd: FileDescriptor, ctx: JSC.C.JSContextRef, exception: JSC.C.ExceptionRef) bool {
+ pub fn fileDescriptor(fd: bun.FileDescriptor, ctx: JSC.C.JSContextRef, exception: JSC.C.ExceptionRef) bool {
if (fd < 0) {
JSC.throwInvalidArguments("Invalid file descriptor, must not be negative number", .{}, ctx, exception);
return false;
@@ -756,14 +755,14 @@ pub const ArgumentsSlice = struct {
}
};
-pub fn fileDescriptorFromJS(ctx: JSC.C.JSContextRef, value: JSC.JSValue, exception: JSC.C.ExceptionRef) ?FileDescriptor {
+pub fn fileDescriptorFromJS(ctx: JSC.C.JSContextRef, value: JSC.JSValue, exception: JSC.C.ExceptionRef) ?bun.FileDescriptor {
if (!value.isNumber() or value.isBigInt()) return null;
const fd = value.toInt32();
if (!Valid.fileDescriptor(fd, ctx, exception)) {
return null;
}
- return @truncate(FileDescriptor, fd);
+ return @truncate(bun.FileDescriptor, fd);
}
var _get_time_prop_string: ?JSC.C.JSStringRef = null;
@@ -826,18 +825,18 @@ pub fn modeFromJS(ctx: JSC.C.JSContextRef, value: JSC.JSValue, exception: JSC.C.
pub const PathOrFileDescriptor = union(Tag) {
path: PathLike,
- fd: FileDescriptor,
+ fd: bun.FileDescriptor,
pub const Tag = enum { fd, path };
- pub fn hash(this: PathOrFileDescriptor) u64 {
+ pub fn hash(this: JSC.Node.PathOrFileDescriptor) u64 {
return switch (this) {
.path => std.hash.Wyhash.hash(0, this.path.slice()),
.fd => std.hash.Wyhash.hash(0, std.mem.asBytes(&this.fd)),
};
}
- pub fn format(this: PathOrFileDescriptor, comptime fmt: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
+ pub fn format(this: JSC.Node.PathOrFileDescriptor, comptime fmt: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
if (fmt.len != 0 and fmt != "s") {
@compileError("Unsupported format argument: '" ++ fmt ++ "'.");
}
@@ -847,20 +846,20 @@ pub const PathOrFileDescriptor = union(Tag) {
}
}
- pub fn fromJS(ctx: JSC.C.JSContextRef, arguments: *ArgumentsSlice, allocator: std.mem.Allocator, exception: JSC.C.ExceptionRef) ?PathOrFileDescriptor {
+ pub fn fromJS(ctx: JSC.C.JSContextRef, arguments: *ArgumentsSlice, allocator: std.mem.Allocator, exception: JSC.C.ExceptionRef) ?JSC.Node.PathOrFileDescriptor {
const first = arguments.next() orelse return null;
if (fileDescriptorFromJS(ctx, first, exception)) |fd| {
arguments.eat();
- return PathOrFileDescriptor{ .fd = fd };
+ return JSC.Node.PathOrFileDescriptor{ .fd = fd };
}
if (exception.* != null) return null;
- return PathOrFileDescriptor{ .path = PathLike.fromJSWithAllocator(ctx, arguments, allocator, exception) orelse return null };
+ return JSC.Node.PathOrFileDescriptor{ .path = PathLike.fromJSWithAllocator(ctx, arguments, allocator, exception) orelse return null };
}
- pub fn toJS(this: PathOrFileDescriptor, ctx: JSC.C.JSContextRef, exception: JSC.C.ExceptionRef) JSC.C.JSValueRef {
+ pub fn toJS(this: JSC.Node.PathOrFileDescriptor, ctx: JSC.C.JSContextRef, exception: JSC.C.ExceptionRef) JSC.C.JSValueRef {
return switch (this) {
.path => this.path.toJS(ctx, exception),
.fd => JSC.JSValue.jsNumberFromInt32(@intCast(i32, this.fd)).asRef(),
@@ -1578,6 +1577,7 @@ pub const Path = struct {
if (name_.isEmpty()) {
return JSC.ZigString.Empty.toValue(globalThis);
}
+
const out = std.fmt.allocPrint(allocator, "{s}{s}", .{ name_, ext }) catch unreachable;
defer allocator.free(out);
diff --git a/src/bun.js/node_timers.exports.js b/src/bun.js/node_timers.exports.js
index 58e660412..d46916ac5 100644
--- a/src/bun.js/node_timers.exports.js
+++ b/src/bun.js/node_timers.exports.js
@@ -1,3 +1,31 @@
+class Timeout {
+ #id;
+ #refCount = 1;
+ #clearFunction;
+
+ constructor(id, clearFunction) {
+ this.#id = id;
+ this.#refCount = 1;
+ this.#clearFunction = clearFunction;
+ }
+
+ ref() {
+ this.#refCount += 1;
+ }
+
+ hasRef() {
+ return this.#refCount > 0;
+ }
+
+ unref() {
+ this.#refCount -= 1;
+ var clearFunction = this.#clearFunction;
+ if (clearFunction && this.#refCount === 0) {
+ this.#clearFunction = null;
+ clearFunction(this.#id);
+ }
+ }
+}
export const setInterval = globalThis.setInterval;
export const setImmediate = globalThis.queueMicrotask;
export const setTimeout = globalThis.setTimeout;
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 28481dbb3..991c07ad0 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -2039,7 +2039,7 @@ var require_destroy = __commonJS({
r.destroyed = true;
}
if (!s.constructed) {
- this.once(kDestroy, function (er) {
+ this.once(kDestroy, (er) => {
_destroy(this, aggregateTwoErrors(er, err), cb);
});
} else {
@@ -5725,6 +5725,8 @@ function createNativeStream(nativeType, Readable) {
var DYNAMICALLY_ADJUST_CHUNK_SIZE =
process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1";
+ const finalizer = new FinalizationRegistry((ptr) => ptr && deinit(ptr));
+
var NativeReadable = class NativeReadable extends Readable {
#ptr;
#refCount = 1;
@@ -5733,6 +5735,7 @@ function createNativeStream(nativeType, Readable) {
#highWaterMark;
#pendingRead = false;
#hasResized = !DYNAMICALLY_ADJUST_CHUNK_SIZE;
+ #unregisterToken;
constructor(ptr, options = {}) {
super(options);
if (typeof options.highWaterMark === "number") {
@@ -5744,6 +5747,8 @@ function createNativeStream(nativeType, Readable) {
this.#constructed = false;
this.#remainingChunk = undefined;
this.#pendingRead = false;
+ this.#unregisterToken = {};
+ finalizer.register(this, this.#ptr, this.#unregisterToken);
}
_read(highWaterMark) {
@@ -5789,6 +5794,10 @@ function createNativeStream(nativeType, Readable) {
return chunk;
}
+ push(result, encoding) {
+ return super.push(...arguments);
+ }
+
#handleResult(result, view, isClosed) {
if (typeof result === "number") {
if (result >= this.#highWaterMark && !this.#hasResized && !isClosed) {
@@ -5844,6 +5853,7 @@ function createNativeStream(nativeType, Readable) {
return;
}
+ finalizer.unregister(this.#unregisterToken);
this.#ptr = 0;
if (updateRef) {
updateRef(ptr, false);
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 0d01a218f..5c996da45 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -470,7 +470,7 @@ pub const Response = struct {
}
};
-const null_fd = JSC.Node.invalid_fd;
+const null_fd = bun.invalid_fd;
pub const Fetch = struct {
const headers_string = "headers";
@@ -590,8 +590,7 @@ pub const Fetch = struct {
}
pub fn onDone(this: *FetchTasklet) void {
- if (comptime JSC.is_bindgen)
- unreachable;
+ JSC.markBinding(@src());
const globalThis = this.global_this;
@@ -641,6 +640,11 @@ pub const Fetch = struct {
fn toBodyValue(this: *FetchTasklet) Body.Value {
var response_buffer = this.response_buffer.list;
+ const response = Body.Value{
+ .InternalBlob = .{
+ .bytes = response_buffer.toManaged(bun.default_allocator),
+ },
+ };
this.response_buffer = .{
.allocator = default_allocator,
.list = .{
@@ -654,12 +658,7 @@ pub const Fetch = struct {
// defer response_buffer.deinit(bun.default_allocator);
// return .{ .InlineBlob = inline_blob };
// }
-
- return .{
- .InternalBlob = .{
- .bytes = response_buffer.toManaged(bun.default_allocator),
- },
- };
+ return response;
}
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
@@ -1488,7 +1487,7 @@ pub const Blob = struct {
needs_async: *bool,
comptime needs_open: bool,
) JSC.JSValue {
- const fd: JSC.Node.FileDescriptor = if (comptime !needs_open) pathlike.fd else brk: {
+ const fd: bun.FileDescriptor = if (comptime !needs_open) pathlike.fd else brk: {
var file_path: [bun.MAX_PATH_BYTES]u8 = undefined;
switch (JSC.Node.Syscall.open(
pathlike.path.sliceZ(&file_path),
@@ -1608,7 +1607,7 @@ pub const Blob = struct {
needs_async: *bool,
comptime needs_open: bool,
) JSC.JSValue {
- const fd: JSC.Node.FileDescriptor = if (comptime !needs_open) pathlike.fd else brk: {
+ const fd: bun.FileDescriptor = if (comptime !needs_open) pathlike.fd else brk: {
var file_path: [bun.MAX_PATH_BYTES]u8 = undefined;
switch (JSC.Node.Syscall.open(
pathlike.path.sliceZ(&file_path),
@@ -1819,16 +1818,21 @@ pub const Blob = struct {
}
pub fn deinit(this: *Blob.Store) void {
+ const allocator = this.allocator;
+
switch (this.data) {
.bytes => |*bytes| {
bytes.deinit();
},
.file => |file| {
VirtualMachine.vm.removeFileBlob(file.pathlike);
+ if (file.pathlike == .path) {
+ allocator.free(bun.constStrToU8(file.pathlike.path.slice()));
+ }
},
}
- this.allocator.destroy(this);
+ allocator.destroy(this);
}
pub fn fromArrayList(list: std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator) !*Blob.Store {
@@ -1843,7 +1847,7 @@ pub const Blob = struct {
else
std.os.O.RDONLY | __opener_flags;
- pub fn getFdMac(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor {
+ pub fn getFdMac(this: *This) AsyncIO.OpenError!bun.FileDescriptor {
var buf: [bun.MAX_PATH_BYTES]u8 = undefined;
var path_string = if (@hasField(This, "file_store"))
this.file_store.pathlike.path
@@ -1865,7 +1869,7 @@ pub const Blob = struct {
return this.opened_fd;
}
- pub fn getFd(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor {
+ pub fn getFd(this: *This) AsyncIO.OpenError!bun.FileDescriptor {
if (this.opened_fd != null_fd) {
return this.opened_fd;
}
@@ -1877,7 +1881,7 @@ pub const Blob = struct {
}
}
- pub fn getFdLinux(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor {
+ pub fn getFdLinux(this: *This) AsyncIO.OpenError!bun.FileDescriptor {
var aio = &AsyncIO.global;
var buf: [bun.MAX_PATH_BYTES]u8 = undefined;
@@ -1915,7 +1919,7 @@ pub const Blob = struct {
return this.opened_fd;
}
- pub fn onOpen(this: *This, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.OpenError!JSC.Node.FileDescriptor) void {
+ pub fn onOpen(this: *This, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.OpenError!bun.FileDescriptor) void {
this.opened_fd = result catch {
this.errno = AsyncIO.asError(-completion.result);
@@ -1981,7 +1985,7 @@ pub const Blob = struct {
read_frame: @Frame(ReadFile.doRead) = undefined,
close_frame: @Frame(ReadFile.doClose) = undefined,
open_completion: HTTPClient.NetworkThread.Completion = undefined,
- opened_fd: JSC.Node.FileDescriptor = null_fd,
+ opened_fd: bun.FileDescriptor = null_fd,
read_completion: HTTPClient.NetworkThread.Completion = undefined,
read_len: SizeType = 0,
read_off: SizeType = 0,
@@ -1995,8 +1999,6 @@ pub const Blob = struct {
onCompleteCtx: *anyopaque = undefined,
onCompleteCallback: OnReadFileCallback = undefined,
- convert_to_byte_blob: bool = false,
-
pub const Read = struct {
buf: []u8,
is_temporary: bool = false,
@@ -2103,44 +2105,22 @@ pub const Blob = struct {
} });
return;
}
- var store = this.store.?;
- if (this.convert_to_byte_blob and this.file_store.pathlike == .path) {
- VirtualMachine.vm.removeFileBlob(this.file_store.pathlike);
- }
+ var store = this.store.?;
+ var buf = this.buffer;
+ defer store.deref();
+ defer bun.default_allocator.destroy(this);
if (this.system_error) |err| {
- bun.default_allocator.destroy(this);
- store.deref();
cb(cb_ctx, ResultType{ .err = err });
return;
}
- var buf = this.buffer;
- const is_temporary = !this.convert_to_byte_blob;
- if (this.convert_to_byte_blob) {
- if (store.data == .bytes) {
- bun.default_allocator.free(this.buffer);
- buf = store.data.bytes.slice();
- } else if (store.data == .file) {
- if (this.file_store.pathlike == .path) {
- if (this.file_store.pathlike.path == .string) {
- bun.default_allocator.free(this.file_store.pathlike.path.slice());
- }
- }
- store.data = .{ .bytes = ByteStore.init(buf, bun.default_allocator) };
- }
- }
-
- bun.default_allocator.destroy(this);
-
// Attempt to free it as soon as possible
if (store.ref_count > 1) {
- store.deref();
- cb(cb_ctx, .{ .result = .{ .buf = buf, .is_temporary = is_temporary } });
+ cb(cb_ctx, .{ .result = .{ .buf = buf, .is_temporary = true } });
} else {
- cb(cb_ctx, .{ .result = .{ .buf = buf, .is_temporary = is_temporary } });
- store.deref();
+ cb(cb_ctx, .{ .result = .{ .buf = buf, .is_temporary = true } });
}
}
pub fn run(this: *ReadFile, task: *ReadFileTask) void {
@@ -2238,7 +2218,6 @@ pub const Blob = struct {
return;
};
this.buffer = bytes;
- this.convert_to_byte_blob = std.os.S.ISREG(stat.mode) and this.file_store.pathlike == .path;
var remain = bytes;
while (remain.len > 0) {
@@ -2268,7 +2247,7 @@ pub const Blob = struct {
file_blob: Blob,
bytes_blob: Blob,
- opened_fd: JSC.Node.FileDescriptor = null_fd,
+ opened_fd: bun.FileDescriptor = null_fd,
open_frame: OpenFrameType = undefined,
write_frame: @Frame(WriteFile.doWrite) = undefined,
close_frame: @Frame(WriteFile.doClose) = undefined,
@@ -2477,8 +2456,8 @@ pub const Blob = struct {
offset: SizeType = 0,
size: SizeType = 0,
max_length: SizeType = Blob.max_size,
- destination_fd: JSC.Node.FileDescriptor = null_fd,
- source_fd: JSC.Node.FileDescriptor = null_fd,
+ destination_fd: bun.FileDescriptor = null_fd,
+ source_fd: bun.FileDescriptor = null_fd,
system_error: ?SystemError = null,
@@ -3458,7 +3437,7 @@ pub const Blob = struct {
}
}
- pub fn NewReadFileHandler(comptime Function: anytype, comptime lifetime: Lifetime) type {
+ pub fn NewReadFileHandler(comptime Function: anytype) type {
return struct {
context: Blob,
promise: JSPromise.Strong = .{},
@@ -3472,14 +3451,9 @@ pub const Blob = struct {
switch (bytes_) {
.result => |result| {
const bytes = result.buf;
- const is_temporary = result.is_temporary;
if (blob.size > 0)
blob.size = @minimum(@truncate(u32, bytes.len), blob.size);
- if (!is_temporary) {
- promise.resolve(globalThis, Function(&blob, globalThis, bytes, comptime lifetime));
- } else {
- promise.resolve(globalThis, Function(&blob, globalThis, bytes, .temporary));
- }
+ promise.resolve(globalThis, Function(&blob, globalThis, bytes, .transfer));
},
.err => |err| {
promise.reject(globalThis, err.toErrorInstance(globalThis));
@@ -3530,8 +3504,8 @@ pub const Blob = struct {
read_file_task.schedule();
}
- pub fn doReadFile(this: *Blob, comptime Function: anytype, comptime lifetime: Lifetime, global: *JSGlobalObject) JSValue {
- const Handler = NewReadFileHandler(Function, lifetime);
+ pub fn doReadFile(this: *Blob, comptime Function: anytype, global: *JSGlobalObject) JSValue {
+ const Handler = NewReadFileHandler(Function);
var promise = JSPromise.create(global);
var handler = Handler{
@@ -3618,7 +3592,7 @@ pub const Blob = struct {
pub fn toString(this: *Blob, global: *JSGlobalObject, comptime lifetime: Lifetime) JSValue {
if (this.needsToReadFile()) {
- return this.doReadFile(toStringWithBytes, lifetime, global);
+ return this.doReadFile(toStringWithBytes, global);
}
const view_: []u8 =
@@ -3632,7 +3606,7 @@ pub const Blob = struct {
pub fn toJSON(this: *Blob, global: *JSGlobalObject, comptime lifetime: Lifetime) JSValue {
if (this.needsToReadFile()) {
- return this.doReadFile(toJSONWithBytes, lifetime, global);
+ return this.doReadFile(toJSONWithBytes, global);
}
var view_ = this.sharedView();
@@ -3704,7 +3678,7 @@ pub const Blob = struct {
pub fn toArrayBuffer(this: *Blob, global: *JSGlobalObject, comptime lifetime: Lifetime) JSValue {
if (this.needsToReadFile()) {
- return this.doReadFile(toArrayBufferWithBytes, lifetime, global);
+ return this.doReadFile(toArrayBufferWithBytes, global);
}
var view_ = this.sharedView();
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 7a20b8562..54819a4e5 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -75,16 +75,18 @@ pub const ReadableStream = struct {
return AnyBlob{ .Blob = blob };
},
.File => |blobby| {
- var blob = JSC.WebCore.Blob.initWithStore(blobby.store, globalThis);
- blobby.store.ref();
+ if (blobby.lazy_readable == .blob) {
+ var blob = JSC.WebCore.Blob.initWithStore(blobby.lazy_readable.blob, globalThis);
+ blob.store.?.ref();
- // it should be lazy, file shouldn't have opened yet.
- std.debug.assert(!blobby.started);
+ // it should be lazy, file shouldn't have opened yet.
+ std.debug.assert(!blobby.started);
- stream.detach(globalThis);
- blobby.deinit();
- stream.done();
- return AnyBlob{ .Blob = blob };
+ stream.detach(globalThis);
+ blobby.deinit();
+ stream.done();
+ return AnyBlob{ .Blob = blob };
+ }
},
.Bytes => |bytes| {
@@ -100,8 +102,10 @@ pub const ReadableStream = struct {
return null;
},
- else => return null,
+ else => {},
}
+
+ return null;
}
pub fn done(this: *const ReadableStream) void {
@@ -247,12 +251,7 @@ pub const ReadableStream = struct {
}
pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
- return fromBlobWithPoll(globalThis, blob, recommended_chunk_size, null);
- }
-
- pub fn fromBlobWithPoll(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) JSC.JSValue {
- if (comptime JSC.is_bindgen)
- unreachable;
+ JSC.markBinding(@src());
var store = blob.store orelse {
return ReadableStream.empty(globalThis);
};
@@ -260,6 +259,7 @@ pub const ReadableStream = struct {
.bytes => {
var reader = bun.default_allocator.create(ByteBlobLoader.Source) catch unreachable;
reader.* = .{
+ .globalThis = globalThis,
.context = undefined,
};
reader.context.setup(blob, recommended_chunk_size);
@@ -268,17 +268,51 @@ pub const ReadableStream = struct {
.file => {
var reader = bun.default_allocator.create(FileReader.Source) catch unreachable;
reader.* = .{
- .context = undefined,
+ .globalThis = globalThis,
+ .context = .{
+ .lazy_readable = .{
+ .blob = store,
+ },
+ },
};
- reader.context.setupWithPoll(store, recommended_chunk_size, poll);
+ store.ref();
return reader.toJS(globalThis);
},
}
}
+ pub fn fromFIFO(
+ globalThis: *JSGlobalObject,
+ fifo: *FIFO,
+ buffered_data: bun.ByteList,
+ ) JSC.JSValue {
+ JSC.markBinding(@src());
+ var reader = bun.default_allocator.create(FileReader.Source) catch unreachable;
+ reader.* = .{
+ .globalThis = globalThis,
+ .context = .{
+ .buffered_data = buffered_data,
+ .lazy_readable = .{
+ .readable = .{
+ .FIFO = fifo.*,
+ },
+ },
+ },
+ };
+
+ if (reader.context.lazy_readable.readable.FIFO.poll_ref) |poll| {
+ poll.owner.set(&reader.context.lazy_readable.readable.FIFO);
+ fifo.poll_ref = null;
+ }
+ reader.context.lazy_readable.readable.FIFO.pending.future = undefined;
+ reader.context.lazy_readable.readable.FIFO.auto_sizer = null;
+ reader.context.lazy_readable.readable.FIFO.pending.state = .none;
+
+ return reader.toJS(globalThis);
+ }
+
pub fn empty(globalThis: *JSGlobalObject) JSC.JSValue {
- if (comptime JSC.is_bindgen)
- unreachable;
+ JSC.markBinding(@src());
return ReadableStream__empty(globalThis);
}
@@ -288,7 +322,7 @@ pub const ReadableStream = struct {
invalid = 0,
_,
- pub fn init(filedes: JSC.Node.FileDescriptor) StreamTag {
+ pub fn init(filedes: bun.FileDescriptor) StreamTag {
var bytes = [8]u8{ 1, 0, 0, 0, 0, 0, 0, 0 };
const filedes_ = @bitCast([8]u8, @as(usize, @truncate(u56, @intCast(usize, filedes))));
bytes[1..8].* = filedes_[0..7].*;
@@ -296,14 +330,14 @@ pub const ReadableStream = struct {
return @intToEnum(StreamTag, @bitCast(u64, bytes));
}
- pub fn fd(this: StreamTag) JSC.Node.FileDescriptor {
+ pub fn fd(this: StreamTag) bun.FileDescriptor {
var bytes = @bitCast([8]u8, @enumToInt(this));
if (bytes[0] != 1) {
- return JSC.Node.invalid_fd;
+ return bun.invalid_fd;
}
var out: u64 = 0;
@bitCast([8]u8, out)[0..7].* = bytes[1..8].*;
- return @intCast(JSC.Node.FileDescriptor, out);
+ return @intCast(bun.FileDescriptor, out);
}
};
};
@@ -439,7 +473,7 @@ pub const StreamStart = union(Tag) {
return .{
.FileSink = .{
- .input_path = .{ .fd = JSC.Node.invalid_fd },
+ .input_path = .{ .fd = bun.invalid_fd },
.chunk_size = chunk_size,
},
};
@@ -523,39 +557,72 @@ pub const StreamResult = union(Tag) {
into_array_and_done: Blob.SizeType,
pub const Pending = struct {
- frame: anyframe,
+ future: Future = undefined,
result: Writable,
consumed: Blob.SizeType = 0,
state: StreamResult.Pending.State = .none,
- pub fn run(this: *Writable.Pending) void {
- if (this.state != .pending) {
- return;
+ pub const Future = union(enum) {
+ promise: struct {
+ promise: *JSPromise,
+ globalThis: *JSC.JSGlobalObject,
+ },
+ handler: Handler,
+ };
+
+ pub fn promise(this: *Writable.Pending, globalThis: *JSC.JSGlobalObject) *JSPromise {
+ var prom = JSPromise.create(globalThis);
+ this.future = .{
+ .promise = .{ .promise = prom, .globalThis = globalThis },
+ };
+ this.state = .pending;
+ return prom;
+ }
+
+ pub const Handler = struct {
+ ctx: *anyopaque,
+ handler: Fn,
+
+ pub const Fn = fn (ctx: *anyopaque, result: StreamResult.Writable) void;
+
+ pub fn init(this: *Handler, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult.Writable) void) void {
+ this.ctx = ctx;
+ this.handler = struct {
+ const handler = handler_fn;
+ pub fn onHandle(ctx_: *anyopaque, result: StreamResult.Writable) void {
+ @call(.{ .modifier = .always_inline }, handler, .{ bun.cast(*Context, ctx_), result });
+ }
+ }.onHandle;
}
+ };
+ pub fn run(this: *Writable.Pending) void {
+ if (this.state != .pending) return;
this.state = .used;
- resume this.frame;
+ switch (this.future) {
+ .promise => |p| {
+ Writable.fulfillPromise(this.result, p.promise, p.globalThis);
+ },
+ .handler => |h| {
+ h.handler(h.ctx, this.result);
+ },
+ }
}
};
- pub fn toPromised(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Writable.Pending) void {
- var frame = bun.default_allocator.create(@Frame(Writable.toPromisedWrap)) catch unreachable;
- pending.state = .pending;
- frame.* = async Writable.toPromisedWrap(globalThis, promise, pending);
- pending.frame = frame;
- }
-
pub fn isDone(this: *const Writable) bool {
return switch (this.*) {
.owned_and_done, .temporary_and_done, .into_array_and_done, .done, .err => true,
else => false,
};
}
- fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Writable.Pending) void {
- suspend {}
-
- const result: Writable = pending.result;
+ pub fn fulfillPromise(
+ result: Writable,
+ promise: *JSPromise,
+ globalThis: *JSGlobalObject,
+ ) void {
+ promise.asValue(globalThis).unprotect();
switch (result) {
.err => |err| {
promise.reject(globalThis, err.toJSC(globalThis));
@@ -585,9 +652,9 @@ pub const StreamResult = union(Tag) {
.done => JSC.JSValue.jsBoolean(true),
.pending => |pending| brk: {
- var promise = JSC.JSPromise.create(globalThis);
- Writable.toPromised(globalThis, promise, pending);
- break :brk promise.asValue(globalThis);
+ const promise_value = pending.promise(globalThis).asValue(globalThis);
+ promise_value.protect();
+ break :brk promise_value;
},
};
}
@@ -599,10 +666,59 @@ pub const StreamResult = union(Tag) {
};
pub const Pending = struct {
- frame: anyframe,
- result: StreamResult,
+ future: Future = undefined,
+ result: StreamResult = .{ .done = {} },
state: State = .none,
+ pub fn set(this: *Pending, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult) void) void {
+ this.future.init(Context, ctx, handler_fn);
+ this.state = .pending;
+ }
+
+ pub fn promise(this: *Pending, globalObject: *JSC.JSGlobalObject) *JSC.JSPromise {
+ var prom = JSC.JSPromise.create(globalObject);
+ this.future = .{
+ .promise = .{
+ .promise = prom,
+ .globalThis = globalObject,
+ },
+ };
+ this.state = .pending;
+ return prom;
+ }
+
+ pub const Future = union(enum) {
+ promise: struct {
+ promise: *JSPromise,
+ globalThis: *JSC.JSGlobalObject,
+ },
+ handler: Handler,
+
+ pub fn init(this: *Future, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult) void) void {
+ this.* = .{
+ .handler = undefined,
+ };
+ this.handler.init(Context, ctx, handler_fn);
+ }
+ };
+
+ pub const Handler = struct {
+ ctx: *anyopaque,
+ handler: Fn,
+
+ pub const Fn = fn (ctx: *anyopaque, result: StreamResult) void;
+
+ pub fn init(this: *Handler, comptime Context: type, ctx: *Context, comptime handler_fn: fn (*Context, StreamResult) void) void {
+ this.ctx = ctx;
+ this.handler = struct {
+ const handler = handler_fn;
+ pub fn onHandle(ctx_: *anyopaque, result: StreamResult) void {
+ @call(.{ .modifier = .always_inline }, handler, .{ bun.cast(*Context, ctx_), result });
+ }
+ }.onHandle;
+ }
+ };
+
pub const State = enum {
none,
pending,
@@ -612,7 +728,14 @@ pub const StreamResult = union(Tag) {
pub fn run(this: *Pending) void {
if (this.state != .pending) return;
this.state = .used;
- resume this.frame;
+ switch (this.future) {
+ .promise => |p| {
+ StreamResult.fulfillPromise(this.result, p.promise, p.globalThis);
+ },
+ .handler => |h| {
+ h.handler(h.ctx, this.result);
+ },
+ }
}
};
@@ -623,11 +746,8 @@ pub const StreamResult = union(Tag) {
};
}
- fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void {
- suspend {}
-
- const result: StreamResult = pending.result;
-
+ pub fn fulfillPromise(result: StreamResult, promise: *JSC.JSPromise, globalThis: *JSC.JSGlobalObject) void {
+ promise.asValue(globalThis).unprotect();
switch (result) {
.err => |err| {
promise.reject(globalThis, err.toJSC(globalThis));
@@ -641,13 +761,6 @@ pub const StreamResult = union(Tag) {
}
}
- pub fn toPromised(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void {
- var frame = bun.default_allocator.create(@Frame(toPromisedWrap)) catch unreachable;
- pending.state = .pending;
- frame.* = async toPromisedWrap(globalThis, promise, pending);
- pending.frame = frame;
- }
-
pub fn toJS(this: *const StreamResult, globalThis: *JSGlobalObject) JSValue {
switch (this.*) {
.owned => |list| {
@@ -675,9 +788,9 @@ pub const StreamResult = union(Tag) {
return JSC.JSValue.jsNumberFromInt64(array.len);
},
.pending => |pending| {
- var promise = JSC.JSPromise.create(globalThis);
- toPromised(globalThis, promise, pending);
- return promise.asValue(globalThis);
+ const promise = pending.promise(globalThis).asValue(globalThis);
+ promise.protect();
+ return promise;
},
.err => |err| {
@@ -1001,15 +1114,6 @@ pub const Sink = struct {
}
};
-pub const PathOrFileDescriptor = union(enum) {
- path: ZigString.Slice,
- fd: JSC.Node.FileDescriptor,
-
- pub fn deinit(this: *const PathOrFileDescriptor) void {
- if (this.* == .path) this.path.deinit();
- }
-};
-
pub const FileSink = struct {
buffer: bun.ByteList,
allocator: std.mem.Allocator,
@@ -1018,11 +1122,10 @@ pub const FileSink = struct {
next: ?Sink = null,
auto_close: bool = false,
auto_truncate: bool = false,
- fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd,
+ fd: bun.FileDescriptor = bun.invalid_fd,
mode: JSC.Node.Mode = 0,
chunk_size: usize = 0,
pending: StreamResult.Writable.Pending = StreamResult.Writable.Pending{
- .frame = undefined,
.result = .{ .done = {} },
},
@@ -1032,13 +1135,16 @@ pub const FileSink = struct {
requested_end: bool = false,
has_adjusted_pipe_size_on_linux: bool = false,
max_write_size: usize = std.math.maxInt(usize),
- prevent_process_exit: bool = false,
reachable_from_js: bool = true,
poll_ref: ?*JSC.FilePoll = null,
pub usingnamespace NewReadyWatcher(@This(), .writable, ready);
const log = Output.scoped(.FileSink, false);
+ pub fn isReachable(this: *const FileSink) bool {
+ return this.reachable_from_js or this.signal.isDead();
+ }
+
const max_fifo_size = 64 * 1024;
pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) {
var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
@@ -1079,9 +1185,9 @@ pub const FileSink = struct {
}
pub fn start(this: *FileSink, stream_start: StreamStart) JSC.Node.Maybe(void) {
- if (this.fd != JSC.Node.invalid_fd) {
+ if (this.fd != bun.invalid_fd) {
_ = JSC.Node.Syscall.close(this.fd);
- this.fd = JSC.Node.invalid_fd;
+ this.fd = bun.invalid_fd;
}
this.done = false;
@@ -1116,7 +1222,7 @@ pub const FileSink = struct {
return flushMaybePoll(this);
}
- fn adjustPipeLengthOnLinux(this: *FileSink, fd: JSC.Node.FileDescriptor, remain_len: usize) void {
+ fn adjustPipeLengthOnLinux(this: *FileSink, fd: bun.FileDescriptor, remain_len: usize) void {
// On Linux, we can adjust the pipe size to avoid blocking.
this.has_adjusted_pipe_size_on_linux = true;
@@ -1135,7 +1241,7 @@ pub const FileSink = struct {
}
pub fn flushMaybePollWithSize(this: *FileSink, writable_size: usize) StreamResult.Writable {
- std.debug.assert(this.fd != JSC.Node.invalid_fd);
+ std.debug.assert(this.fd != bun.invalid_fd);
var total: usize = this.written;
const initial = total;
@@ -1180,6 +1286,12 @@ pub const FileSink = struct {
break :brk writable_size;
if (this.poll_ref) |poll| {
+ if (poll.isHUP()) {
+ this.done = true;
+ this.cleanup();
+ return .{ .done = {} };
+ }
+
if (poll.isWritable()) {
break :brk this.max_write_size;
}
@@ -1288,11 +1400,11 @@ pub const FileSink = struct {
}
if (this.auto_truncate)
- std.os.ftruncate(this.fd, total) catch {};
+ std.os.ftruncate(fd, total) catch {};
if (this.auto_close) {
- _ = JSC.Node.Syscall.close(this.fd);
- this.fd = JSC.Node.invalid_fd;
+ _ = JSC.Node.Syscall.close(fd);
+ this.fd = bun.invalid_fd;
}
}
this.pending.run();
@@ -1300,7 +1412,7 @@ pub const FileSink = struct {
}
pub fn flushFromJS(this: *FileSink, globalThis: *JSGlobalObject, _: bool) JSC.Node.Maybe(JSValue) {
- if (this.isPending()) {
+ if (this.isPending() or this.done) {
return .{ .result = JSC.JSValue.jsUndefined() };
}
const result = this.flush();
@@ -1320,13 +1432,15 @@ pub const FileSink = struct {
poll.deinit();
}
- if (this.fd != JSC.Node.invalid_fd) {
- if (this.scheduled_count > 0) {
- this.scheduled_count = 0;
- }
+ if (this.auto_close) {
+ if (this.fd != bun.invalid_fd) {
+ if (this.scheduled_count > 0) {
+ this.scheduled_count = 0;
+ }
- _ = JSC.Node.Syscall.close(this.fd);
- this.fd = JSC.Node.invalid_fd;
+ _ = JSC.Node.Syscall.close(this.fd);
+ this.fd = bun.invalid_fd;
+ }
}
if (this.buffer.cap > 0) {
@@ -1345,7 +1459,16 @@ pub const FileSink = struct {
this.cleanup();
this.reachable_from_js = false;
- if (!this.prevent_process_exit)
+ if (!this.isReachable())
+ this.allocator.destroy(this);
+ }
+
+ pub fn onHangup(this: *FileSink) void {
+ this.done = true;
+ this.signal.clear();
+ this.cleanup();
+
+ if (!this.isReachable())
this.allocator.destroy(this);
}
@@ -1393,6 +1516,9 @@ pub const FileSink = struct {
}
pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
+ if (this.done) {
+ return .{ .done = {} };
+ }
const input = data.slice();
if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size) {
@@ -1422,6 +1548,10 @@ pub const FileSink = struct {
}
pub const writeBytes = write;
pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable {
+ if (this.done) {
+ return .{ .done = {} };
+ }
+
const input = data.slice();
if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size and strings.isAllASCII(input)) {
@@ -1450,6 +1580,10 @@ pub const FileSink = struct {
return .{ .owned = len };
}
pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable {
+ if (this.done) {
+ return .{ .done = {} };
+ }
+
if (this.next) |*next| {
return next.writeUTF16(data);
}
@@ -1465,11 +1599,38 @@ pub const FileSink = struct {
}
fn isPending(this: *const FileSink) bool {
+ if (this.done) return false;
var poll_ref = this.poll_ref orelse return false;
return poll_ref.isRegistered() and !poll_ref.flags.contains(.needs_rearm);
}
+ pub fn close(this: *FileSink) void {
+ if (this.done)
+ return;
+
+ this.done = true;
+ const fd = this.fd;
+ if (fd != bun.invalid_fd) {
+ if (this.poll_ref) |poll| {
+ this.poll_ref = null;
+ poll.deinit();
+ }
+
+ this.fd = bun.invalid_fd;
+ if (this.auto_close)
+ _ = JSC.Node.Syscall.close(fd);
+ this.signal.close(null);
+ }
+
+ this.pending.result = .done;
+ this.pending.run();
+ }
+
pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) {
+ if (this.done) {
+ return .{ .result = {} };
+ }
+
if (this.next) |*next| {
return next.end(err);
}
@@ -1501,6 +1662,11 @@ pub const FileSink = struct {
std.debug.assert(this.next == null);
this.requested_end = true;
+ if (this.fd == bun.invalid_fd) {
+ this.cleanup();
+ return .{ .result = JSValue.jsNumber(this.written) };
+ }
+
const flushed = this.flush();
if (flushed == .err) {
@@ -2737,6 +2903,7 @@ pub fn ReadableStreamSource(
pub const name = std.fmt.comptimePrint("{s}_JSReadableStreamSource", .{std.mem.span(name_)});
pub fn pull(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ JSC.markBinding(@src());
const arguments = callFrame.arguments(3);
var this = arguments.ptr[0].asPtr(ReadableStreamSourceType);
const view = arguments.ptr[1];
@@ -2750,6 +2917,7 @@ pub fn ReadableStreamSource(
);
}
pub fn start(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ JSC.markBinding(@src());
var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
this.globalThis = globalThis;
switch (this.startFromJS()) {
@@ -2778,11 +2946,13 @@ pub fn ReadableStreamSource(
}
}
pub fn cancel(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ JSC.markBinding(@src());
var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
this.cancel();
return JSC.JSValue.jsUndefined();
}
pub fn setClose(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ JSC.markBinding(@src());
var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
this.close_ctx = this;
this.close_handler = JSReadableStreamSource.onClose;
@@ -2792,6 +2962,7 @@ pub fn ReadableStreamSource(
}
pub fn updateRef(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ JSC.markBinding(@src());
var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
const ref_or_unref = callFrame.argument(1).asBoolean();
this.setRef(ref_or_unref);
@@ -2799,18 +2970,21 @@ pub fn ReadableStreamSource(
}
fn onClose(ptr: *anyopaque) void {
+ JSC.markBinding(@src());
var this = bun.cast(*ReadableStreamSourceType, ptr);
_ = this.close_jsvalue.call(this.globalThis, &.{});
// this.closer
}
pub fn deinit(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ JSC.markBinding(@src());
var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
this.deinit();
return JSValue.jsUndefined();
}
pub fn drain(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ JSC.markBinding(@src());
var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
var list = this.drain();
if (list.len > 0) {
@@ -2844,15 +3018,8 @@ pub fn ReadableStreamSource(
});
comptime {
- if (!JSC.is_bindgen) {
+ if (!JSC.is_bindgen)
@export(load, .{ .name = Export[0].symbol_name });
- _ = JSReadableStreamSource.pull;
- _ = JSReadableStreamSource.start;
- _ = JSReadableStreamSource.cancel;
- _ = JSReadableStreamSource.setClose;
- _ = JSReadableStreamSource.load;
- _ = JSReadableStreamSource.deinit;
- }
}
};
};
@@ -2955,6 +3122,15 @@ pub const ByteBlobLoader = struct {
pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
+pub const PathOrFileDescriptor = union(enum) {
+ path: ZigString.Slice,
+ fd: bun.FileDescriptor,
+
+ pub fn deinit(this: *const PathOrFileDescriptor) void {
+ if (this.* == .path) this.path.deinit();
+ }
+};
+
pub const Pipe = struct {
ctx: ?*anyopaque = null,
onPipe: ?PipeFunction = null,
@@ -2983,7 +3159,6 @@ pub const ByteStream = struct {
},
has_received_last_chunk: bool = false,
pending: StreamResult.Pending = StreamResult.Pending{
- .frame = undefined,
.result = .{ .done = {} },
},
done: bool = false,
@@ -3242,109 +3417,534 @@ pub const ByteStream = struct {
);
};
-/// **Not** the Web "FileReader" API
-pub const FileReader = struct {
+pub const ReadResult = union(enum) {
+ pending: void,
+ err: Syscall.Error,
+ done: void,
+ read: []u8,
+
+ pub fn toStream(this: ReadResult, pending: *StreamResult.Pending, buf: []u8, view: JSValue, close_on_empty: bool) StreamResult {
+ return toStreamWithIsDone(
+ this,
+ pending,
+ buf,
+ view,
+ close_on_empty,
+ false,
+ );
+ }
+ pub fn toStreamWithIsDone(this: ReadResult, pending: *StreamResult.Pending, buf: []u8, view: JSValue, close_on_empty: bool, is_done: bool) StreamResult {
+ return switch (this) {
+ .pending => .{ .pending = pending },
+ .err => .{ .err = this.err },
+ .done => .{ .done = {} },
+ .read => |slice| brk: {
+ const owned = slice.ptr != buf.ptr;
+ const done = is_done or (close_on_empty and slice.len == 0);
+
+ break :brk if (owned and done)
+ StreamResult{ .owned_and_done = bun.ByteList.init(slice) }
+ else if (owned)
+ StreamResult{ .owned = bun.ByteList.init(slice) }
+ else if (done)
+ StreamResult{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, slice.len), .value = view } }
+ else
+ StreamResult{ .into_array = .{ .len = @truncate(Blob.SizeType, slice.len), .value = view } };
+ },
+ };
+ }
+};
+
+pub const AutoSizer = struct {
+ buffer: *bun.ByteList,
+ allocator: std.mem.Allocator,
+ max: usize,
+
+ pub fn resize(this: *AutoSizer, size: usize) ![]u8 {
+ const available = this.buffer.cap - this.buffer.len;
+ if (available >= size) return this.buffer.ptr[this.buffer.len..this.buffer.cap][0..size];
+ const to_grow = size -| available;
+ if (to_grow + @as(usize, this.buffer.cap) > this.max)
+ return this.buffer.ptr[this.buffer.len..this.buffer.cap];
+
+ var list = this.buffer.listManaged(this.allocator);
+ const prev_len = list.items.len;
+ try list.ensureTotalCapacity(to_grow + @as(usize, this.buffer.cap));
+ this.buffer.update(list);
+ return this.buffer.ptr[prev_len..@as(usize, this.buffer.cap)];
+ }
+};
+
+pub const FIFO = struct {
buf: []u8 = &[_]u8{},
view: JSC.Strong = .{},
- fd: JSC.Node.FileDescriptor = 0,
- auto_close: bool = false,
- loop: *JSC.EventLoop = undefined,
- mode: JSC.Node.Mode = 0,
- store: *Blob.Store,
- total_read: Blob.SizeType = 0,
- finalized: bool = false,
- callback: anyframe = undefined,
- buffered_data: bun.ByteList = .{},
- buffered_data_max: u32 = 0,
+ poll_ref: ?*JSC.FilePoll = null,
+ fd: bun.FileDescriptor = 0,
+ to_read: ?u32 = null,
+ close_on_empty_read: bool = false,
+ auto_sizer: ?*AutoSizer = null,
pending: StreamResult.Pending = StreamResult.Pending{
- .frame = undefined,
+ .future = undefined,
.state = .none,
.result = .{ .done = {} },
},
- cancelled: bool = false,
+ signal: JSC.WebCore.Signal = .{},
+ is_first_read: bool = true,
+ auto_close: bool = true,
+
+ pub usingnamespace NewReadyWatcher(@This(), .readable, ready);
+
+ pub fn finish(this: *FIFO) void {
+ this.close_on_empty_read = true;
+ if (this.poll_ref) |poll| {
+ poll.flags.insert(.hup);
+ }
+
+ this.pending.result = .{ .done = {} };
+ this.pending.run();
+ }
+
+ pub fn close(this: *FIFO) void {
+ if (this.poll_ref) |poll| {
+ this.poll_ref = null;
+ poll.deinit();
+ }
+
+ const fd = this.fd;
+ if (fd != bun.invalid_fd) {
+ this.fd = bun.invalid_fd;
+ if (this.auto_close)
+ _ = JSC.Node.Syscall.close(fd);
+ this.signal.close(null);
+ }
+
+ this.to_read = null;
+ this.pending.result = .{ .done = {} };
+
+ this.pending.run();
+ }
+
+ pub fn isClosed(this: *FIFO) bool {
+ return this.fd == bun.invalid_fd;
+ }
+
+ pub fn getAvailableToReadOnLinux(this: *FIFO) u32 {
+ var len: c_int = 0;
+ const rc: c_int = std.c.ioctl(this.fd, std.os.linux.T.FIONREAD, &len);
+ if (rc != 0) {
+ len = 0;
+ }
+
+ if (len > 0) {
+ if (this.poll_ref) |poll| {
+ poll.flags.insert(.readable);
+ }
+ } else {
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.readable);
+ }
+
+ return @as(u32, 0);
+ }
+
+ return @intCast(u32, @maximum(len, 0));
+ }
+
+ pub fn adjustCapacityOnLinux(this: *FIFO, current: u32, max: u32) u32 {
+ // we do not un-mark it as readable if there's nothing in the pipe
+ if (!this.has_adjusted_pipe_size_on_linux) {
+ if (current > 0 and max >= std.mem.page_size * 16) {
+ this.has_adjusted_pipe_size_on_linux = true;
+ _ = Syscall.setPipeCapacityOnLinux(this.fd, @minimum(max * 4, Syscall.getMaxPipeSizeOnLinux()));
+ }
+ }
+ }
+
+ pub fn cannotRead(this: *FIFO, available: u32) ?ReadResult {
+ if (comptime Environment.isLinux) {
+ if (available > 0 and available != std.math.maxInt(u32)) {
+ return null;
+ }
+ }
+
+ if (this.poll_ref) |poll| {
+ if (comptime Environment.isMac) {
+ if (available > 0 and available != std.math.maxInt(u32)) {
+ poll.flags.insert(.readable);
+ }
+ }
+
+ const is_readable = poll.isReadable();
+ if (!is_readable and (this.close_on_empty_read or poll.isHUP())) {
+ // it might be readable actually
+ this.close_on_empty_read = true;
+ if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
+ return null;
+ }
+
+ return .done;
+ } else if (!is_readable and poll.isWatching()) {
+ // this happens if we've registered a watcher but we haven't
+ // ticked the event loop since registering it
+ if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
+ return null;
+ }
+
+ return .pending;
+ }
+ }
+
+ if (comptime Environment.isLinux) {
+ if (available == 0) {
+ std.debug.assert(this.poll_ref == null);
+ return .pending;
+ }
+ } else if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) {
+ // we don't know if it's readable or not
+ if (!bun.isReadable(this.fd)) {
+ // we hung up
+ if (this.close_on_empty_read)
+ return .done;
+
+ return .pending;
+ }
+ }
+
+ return null;
+ }
+
+ pub fn getAvailableToRead(this: *FIFO, size_or_offset: i64) ?u32 {
+ if (comptime Environment.isLinux) {
+ return this.getAvailableToReadOnLinux();
+ }
+
+ if (size_or_offset != std.math.maxInt(@TypeOf(size_or_offset)))
+ this.to_read = @intCast(u32, @maximum(size_or_offset, 0));
+
+ return this.to_read;
+ }
+
+ pub fn ready(this: *FIFO, sizeOrOffset: i64) void {
+ const available_to_read = this.getAvailableToRead(sizeOrOffset);
+ if (this.isClosed()) {
+ this.unwatch(this.poll_ref.?.fd);
+ return;
+ }
+
+ const read_result = this.read(this.buf, available_to_read);
+ if (read_result == .read and read_result.read.len == 0) {
+ this.unwatch(this.poll_ref.?.fd);
+ this.close();
+ return;
+ }
+
+ if (read_result == .read) {
+ if (this.to_read) |*to_read| {
+ to_read.* = to_read.* -| @truncate(u32, read_result.read.len);
+ }
+ }
+
+ this.pending.result = read_result.toStream(
+ &this.pending,
+ this.buf,
+ this.view.get() orelse .zero,
+ this.close_on_empty_read,
+ );
+ this.pending.run();
+ }
+
+ pub fn readFromJS(
+ this: *FIFO,
+ buf_: []u8,
+ view: JSValue,
+ globalThis: *JSC.JSGlobalObject,
+ ) StreamResult {
+ if (this.isClosed()) {
+ return .{ .done = {} };
+ }
+
+ if (!this.isWatching()) {
+ this.watch(this.fd);
+ }
+
+ const read_result = this.read(buf_, this.to_read);
+ if (read_result == .read and read_result.read.len == 0) {
+ this.close();
+ return .{ .done = {} };
+ }
+
+ if (read_result == .read) {
+ if (this.to_read) |*to_read| {
+ to_read.* = to_read.* -| @truncate(u32, read_result.read.len);
+ }
+ }
+
+ if (read_result == .pending) {
+ this.buf = buf_;
+ this.view.set(globalThis, view);
+ if (!this.isWatching()) this.watch(this.fd);
+ std.debug.assert(this.isWatching());
+ return .{ .pending = &this.pending };
+ }
+
+ return read_result.toStream(&this.pending, buf_, view, this.close_on_empty_read);
+ }
+
+ pub fn read(
+ this: *FIFO,
+ buf_: []u8,
+ /// provided via kqueue(), only on macOS
+ kqueue_read_amt: ?u32,
+ ) ReadResult {
+ const available_to_read = this.getAvailableToRead(
+ if (kqueue_read_amt != null)
+ @intCast(i64, kqueue_read_amt.?)
+ else
+ std.math.maxInt(i64),
+ );
+
+ if (this.cannotRead(available_to_read orelse std.math.maxInt(u32))) |res| {
+ return switch (res) {
+ .pending => .{ .pending = {} },
+ .done => .{ .done = {} },
+ else => unreachable,
+ };
+ }
+
+ var buf = buf_;
+
+ if (available_to_read) |amt| {
+ if (amt >= buf.len) {
+ if (comptime Environment.isLinux) {
+ this.adjustCapacityOnLinux(amt, buf.len);
+ }
+
+ if (this.auto_sizer) |sizer| {
+ buf = sizer.resize(amt) catch buf_;
+ }
+ }
+ }
+
+ return this.doRead(buf);
+ }
+
+ fn doRead(
+ this: *FIFO,
+ buf: []u8,
+ ) ReadResult {
+ switch (Syscall.read(this.fd, buf)) {
+ .err => |err| {
+ const retry = std.os.E.AGAIN;
+ const errno = brk: {
+ const _errno = err.getErrno();
+ if (comptime Environment.isLinux) {
+ // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already
+ // sent a 0
+ // Let's retry later.
+ break :brk .AGAIN;
+ }
+
+ break :brk _errno;
+ };
+
+ switch (errno) {
+ retry => {
+ return .{ .pending = {} };
+ },
+ else => {},
+ }
+
+ return .{ .err = err };
+ },
+ .result => |result| {
+ if (this.poll_ref) |poll| {
+ if (comptime Environment.isLinux) {
+ // do not insert .eof here
+ if (result < buf.len)
+ poll.flags.remove(.readable);
+ } else {
+ // Since we have no way of querying FIFO capacity
+ // its only okay to read when kqueue says its readable
+ // otherwise we might block the process
+ poll.flags.remove(.readable);
+ }
+ }
+
+ if (result == 0)
+ return .{ .read = buf[0..0] };
+
+ return .{ .read = buf[0..result] };
+ },
+ }
+ }
+};
+
+pub const File = struct {
+ buf: []u8 = &[_]u8{},
+ view: JSC.Strong = .{},
+
+ poll_ref: JSC.PollRef = .{},
+ fd: bun.FileDescriptor = bun.invalid_fd,
+ concurrent: Concurrent = .{},
+ loop: *JSC.EventLoop,
+ seekable: bool = false,
+ auto_close: bool = false,
+ remaining_bytes: Blob.SizeType = std.math.maxInt(Blob.SizeType),
user_chunk_size: Blob.SizeType = 0,
+ total_read: Blob.SizeType = 0,
+ mode: JSC.Node.Mode = 0,
+ pending: StreamResult.Pending = .{},
scheduled_count: u32 = 0,
- concurrent: Concurrent = Concurrent{},
- started: bool = false,
- stored_global_this_: ?*JSC.JSGlobalObject = null,
- poll_ref: ?*JSC.FilePoll = null,
- has_adjusted_pipe_size_on_linux: bool = false,
- is_fifo: bool = false,
- finished: bool = false,
- /// When we have some way of knowing that EOF truly is the write end of the
- /// pipe being closed
- /// Set this to true so we automatically mark it as done
- /// This is used in Bun.spawn() to automatically close stdout and stderr
- /// when the process exits
- close_on_eof: bool = false,
+ pub fn close(this: *File) void {
+ if (this.auto_close) {
+ this.auto_close = false;
+ const fd = this.fd;
+ if (fd != bun.invalid_fd) {
+ this.fd = bun.invalid_fd;
+ _ = Syscall.close(fd);
+ }
+ }
- signal: JSC.WebCore.Signal = .{},
+ this.poll_ref.disable();
- pub usingnamespace NewReadyWatcher(@This(), .readable, ready);
+ this.view.clear();
+ this.buf.len = 0;
- pub inline fn globalThis(this: *FileReader) *JSC.JSGlobalObject {
- return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis;
+ this.pending.result = .{ .done = {} };
+ this.pending.run();
}
- const run_on_different_thread_size = bun.huge_allocator_threshold;
+ pub fn deinit(this: *File) void {
+ this.close();
+ }
- pub const tag = ReadableStream.Tag.File;
+ pub fn isClosed(this: *const File) bool {
+ return this.fd == bun.invalid_fd;
+ }
- pub fn setupWithPoll(this: *FileReader, store: *Blob.Store, chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) void {
- store.ref();
- this.* = .{
- .loop = JSC.VirtualMachine.vm.eventLoop(),
- .auto_close = store.data.file.pathlike == .path,
- .store = store,
- .user_chunk_size = chunk_size,
- .poll_ref = poll,
+ fn calculateChunkSize(this: *File, available_to_read: usize) usize {
+ const chunk_size: usize = if (this.user_chunk_size > 0)
+ @as(usize, this.user_chunk_size)
+ else if (this.isSeekable())
+ @as(usize, default_file_chunk_size)
+ else
+ @as(usize, default_fifo_chunk_size);
+
+ return if (this.remaining_bytes > 0 and this.isSeekable())
+ if (available_to_read != std.math.maxInt(usize))
+ @minimum(chunk_size, available_to_read)
+ else
+ @minimum(this.remaining_bytes -| this.total_read, chunk_size)
+ else
+ @minimum(available_to_read, chunk_size);
+ }
+
+ pub fn start(
+ this: *File,
+ file: *Blob.FileStore,
+ ) StreamStart {
+ var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
+ var auto_close = file.pathlike == .path;
+
+ var fd = if (!auto_close)
+ file.pathlike.fd
+ else switch (Syscall.open(file.pathlike.path.sliceZ(&file_buf), std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC, 0)) {
+ .result => |_fd| _fd,
+ .err => |err| {
+ return .{ .err = err.withPath(file.pathlike.path.slice()) };
+ },
};
- if (this.poll_ref) |poll_| {
- poll_.owner.set(this);
+
+ if (!auto_close) {
+ // ensure we have non-blocking IO set
+ switch (Syscall.fcntl(fd, std.os.F.GETFL, 0)) {
+ .err => return .{ .err = Syscall.Error.fromCode(std.os.E.BADF, .fcntl) },
+ .result => |flags| {
+ // if we do not, clone the descriptor and set non-blocking
+ // it is important for us to clone it so we don't cause Weird Things to happen
+ if ((flags & std.os.O.NONBLOCK) == 0) {
+ auto_close = true;
+ fd = switch (Syscall.fcntl(fd, std.os.F.DUPFD, 0)) {
+ .result => |_fd| @intCast(@TypeOf(fd), _fd),
+ .err => |err| return .{ .err = err },
+ };
+
+ switch (Syscall.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK)) {
+ .err => |err| return .{ .err = err },
+ .result => |_| {},
+ }
+ }
+ },
+ }
}
- }
- pub fn setup(this: *FileReader, store: *Blob.Store, chunk_size: Blob.SizeType) void {
- this.setupWithPoll(store, chunk_size, null);
- }
+ const stat: std.os.Stat = switch (Syscall.fstat(fd)) {
+ .result => |result| result,
+ .err => |err| {
+ if (auto_close) {
+ _ = Syscall.close(fd);
+ }
+ return .{ .err = err };
+ },
+ };
- pub fn finish(this: *FileReader) void {
- if (this.finished) return;
- this.finished = true;
- this.close_on_eof = true;
-
- // we are done
- // resolve any promises with done
- // but there could still be data in the pipe
- // so we shouldn't actually end it, just means that we no longer will retry on EGAGAIN
- if (this.pending.state == .pending) {
- if (this.buffered_data.len > 0) {
- this.pending.result = .{ .owned = this.buffered_data };
- this.buffered_data = .{};
- } else {
- this.pending.result = .{ .done = {} };
+ if (std.os.S.ISDIR(stat.mode)) {
+ if (auto_close) {
+ _ = Syscall.close(fd);
+ }
+ return .{ .err = Syscall.Error.fromCode(.ISDIR, .fstat) };
+ }
+
+ if (std.os.S.ISSOCK(stat.mode)) {
+ if (auto_close) {
+ _ = Syscall.close(fd);
+ }
+ return .{ .err = Syscall.Error.fromCode(.INVAL, .fstat) };
+ }
+
+ file.mode = @intCast(JSC.Node.Mode, stat.mode);
+ this.mode = file.mode;
+
+ this.seekable = std.os.S.ISREG(stat.mode);
+ file.seekable = this.seekable;
+
+ if (this.seekable) {
+ this.remaining_bytes = @intCast(Blob.SizeType, stat.size);
+
+ if (this.remaining_bytes == 0) {
+ if (auto_close) {
+ _ = Syscall.close(fd);
+ }
+
+ return .{ .empty = {} };
}
- this.pending.run();
}
+
+ this.fd = fd;
+ this.auto_close = auto_close;
+
+ return StreamStart{ .ready = {} };
+ }
+
+ pub fn isSeekable(this: File) bool {
+ return this.seekable;
}
const Concurrent = struct {
read: Blob.SizeType = 0,
task: NetworkThread.Task = .{ .callback = Concurrent.taskCallback },
completion: AsyncIO.Completion = undefined,
- read_frame: anyframe = undefined,
chunk_size: Blob.SizeType = 0,
main_thread_task: JSC.AnyTask = .{ .callback = onJSThread, .ctx = null },
concurrent_task: JSC.ConcurrentTask = .{},
pub fn taskCallback(task: *NetworkThread.Task) void {
- var this = @fieldParentPtr(FileReader, "concurrent", @fieldParentPtr(Concurrent, "task", task));
- var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable;
- _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{this});
+ var this = @fieldParentPtr(File, "concurrent", @fieldParentPtr(Concurrent, "task", task));
+ runAsync(this);
}
- pub fn onRead(this: *FileReader, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void {
+ pub fn onRead(this: *File, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void {
this.concurrent.read = @truncate(Blob.SizeType, result catch |err| {
if (@hasField(HTTPClient.NetworkThread.Completion, "result")) {
this.pending.result = .{
@@ -3363,14 +3963,14 @@ pub const FileReader = struct {
};
}
this.concurrent.read = 0;
- resume this.concurrent.read_frame;
+ scheduleMainThreadTask(this);
return;
});
- resume this.concurrent.read_frame;
+ scheduleMainThreadTask(this);
}
- pub fn scheduleRead(this: *FileReader) void {
+ pub fn scheduleRead(this: *File) void {
if (comptime Environment.isMac) {
var remaining = this.buf[this.concurrent.read..];
@@ -3408,7 +4008,7 @@ pub const FileReader = struct {
}
AsyncIO.global.read(
- *FileReader,
+ *File,
this,
onRead,
&this.concurrent.completion,
@@ -3417,602 +4017,345 @@ pub const FileReader = struct {
null,
);
- suspend {
- var _frame = @frame();
- var this_frame = bun.default_allocator.create(std.meta.Child(@TypeOf(_frame))) catch unreachable;
- this_frame.* = _frame.*;
- this.concurrent.read_frame = this_frame;
- }
-
scheduleMainThreadTask(this);
}
pub fn onJSThread(task_ctx: *anyopaque) void {
- var this: *FileReader = bun.cast(*FileReader, task_ctx);
+ var this: *File = bun.cast(*File, task_ctx);
const view = this.view.get().?;
defer this.view.clear();
- if (this.finalized and this.scheduled_count > 0) {
- this.pending.run();
- this.scheduled_count -= 1;
-
+ if (this.isClosed()) {
this.deinit();
return;
}
- if (this.pending.state == .pending and this.pending.result == .err and this.concurrent.read == 0) {
- resume this.pending.frame;
- this.scheduled_count -= 1;
- this.finalize();
- return;
- }
-
if (this.concurrent.read == 0) {
this.pending.result = .{ .done = {} };
- resume this.pending.frame;
- this.scheduled_count -= 1;
- this.finalize();
- return;
+ } else if (view != .zero) {
+ this.pending.result = .{
+ .into_array = .{
+ .value = view,
+ .len = @truncate(Blob.SizeType, this.concurrent.read),
+ },
+ };
+ } else {
+ this.pending.result = .{
+ .owned = bun.ByteList.init(this.buf),
+ };
}
- this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read), view, false, this.buf);
this.pending.run();
- this.scheduled_count -= 1;
- if (this.pending.result.isDone()) {
- this.finalize();
- }
}
- pub fn scheduleMainThreadTask(this: *FileReader) void {
+ pub fn scheduleMainThreadTask(this: *File) void {
this.concurrent.main_thread_task.ctx = this;
this.loop.enqueueTaskConcurrent(this.concurrent.concurrent_task.from(&this.concurrent.main_thread_task));
}
- fn runAsync(this: *FileReader) void {
+ fn runAsync(this: *File) void {
this.concurrent.read = 0;
Concurrent.scheduleRead(this);
-
- suspend {
- bun.default_allocator.destroy(@frame());
- }
}
};
- pub fn scheduleAsync(this: *FileReader, chunk_size: Blob.SizeType) void {
+ pub fn scheduleAsync(
+ this: *File,
+ chunk_size: Blob.SizeType,
+ globalThis: *JSC.JSGlobalObject,
+ ) void {
this.scheduled_count += 1;
- this.pollRef().ref(this.globalThis().bunVM());
- std.debug.assert(this.started);
+ this.poll_ref.ref(globalThis.bunVM());
NetworkThread.init() catch {};
+
this.concurrent.chunk_size = chunk_size;
NetworkThread.global.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 });
}
- // macOS default pipe size is page_size, 16k, or 64k. It changes based on how much was written
- // Linux default pipe size is 16 pages of memory
- const default_fifo_chunk_size = 64 * 1024;
- const default_file_chunk_size = 1024 * 1024 * 2;
-
- pub fn onStart(this: *FileReader) StreamStart {
- var file = &this.store.data.file;
- std.debug.assert(!this.started);
- this.started = true;
- var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
- var auto_close = this.auto_close;
- defer this.auto_close = auto_close;
- var fd = if (!auto_close)
- file.pathlike.fd
- else switch (Syscall.open(file.pathlike.path.sliceZ(&file_buf), std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC, 0)) {
- .result => |_fd| _fd,
- .err => |err| {
- this.deinit();
- return .{ .err = err.withPath(file.pathlike.path.slice()) };
- },
- };
-
- if (this.poll_ref != null or this.is_fifo) {
- file.seekable = false;
- this.is_fifo = true;
- if (this.poll_ref) |poll|
- std.debug.assert(poll.fd == @intCast(@TypeOf(poll.fd), fd));
- } else {
- if (!auto_close) {
- // ensure we have non-blocking IO set
- switch (Syscall.fcntl(fd, std.os.F.GETFL, 0)) {
- .err => return .{ .err = Syscall.Error.fromCode(std.os.E.BADF, .fcntl) },
- .result => |flags| {
- // if we do not, clone the descriptor and set non-blocking
- // it is important for us to clone it so we don't cause Weird Things to happen
- if ((flags & std.os.O.NONBLOCK) == 0) {
- auto_close = true;
- fd = switch (Syscall.fcntl(fd, std.os.F.DUPFD, 0)) {
- .result => |_fd| @intCast(@TypeOf(fd), _fd),
- .err => |err| return .{ .err = err },
- };
-
- switch (Syscall.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK)) {
- .err => |err| return .{ .err = err },
- .result => |_| {},
- }
- }
- },
- }
- }
-
- const stat: std.os.Stat = switch (Syscall.fstat(fd)) {
- .result => |result| result,
- .err => |err| {
- if (auto_close) {
- _ = Syscall.close(fd);
- }
- this.deinit();
- return .{ .err = err.withPath(file.pathlike.path.slice()) };
- },
- };
-
- if (std.os.S.ISDIR(stat.mode)) {
- const err = Syscall.Error.fromCode(.ISDIR, .fstat);
- if (auto_close) {
- _ = Syscall.close(fd);
- }
- this.deinit();
- return .{ .err = err };
- }
-
- if (std.os.S.ISSOCK(stat.mode)) {
- const err = Syscall.Error.fromCode(.INVAL, .fstat);
+ pub fn read(this: *File, buf: []u8) ReadResult {
+ if (this.fd == bun.invalid_fd)
+ return .{ .done = {} };
- if (auto_close) {
- _ = Syscall.close(fd);
- }
- this.deinit();
- return .{ .err = err };
- }
+ if (this.seekable and this.remaining_bytes == 0)
+ return .{ .done = {} };
- file.seekable = std.os.S.ISREG(stat.mode);
- file.mode = @intCast(JSC.Node.Mode, stat.mode);
- this.mode = file.mode;
- this.is_fifo = std.os.S.ISFIFO(stat.mode);
+ return this.doRead(buf);
+ }
- if (file.seekable orelse false)
- file.max_size = @intCast(Blob.SizeType, stat.size);
+ pub fn readFromJS(this: *File, buf: []u8, view: JSValue, globalThis: *JSC.JSGlobalObject) StreamResult {
+ const read_result = this.read(buf);
+ if (read_result == .read and read_result.read.len == 0) {
+ this.close();
+ return .{ .done = {} };
}
- if ((file.seekable orelse false) and file.max_size == 0) {
- if (auto_close) {
- _ = Syscall.close(fd);
- }
- this.deinit();
- return .{ .empty = {} };
+ if (read_result == .read) {
+ this.remaining_bytes -|= @intCast(Blob.SizeType, read_result.read.len);
}
- this.fd = fd;
- this.auto_close = auto_close;
-
- const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
- this.signal.start();
- return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) };
- }
-
- fn calculateChunkSize(this: *FileReader, available_to_read: usize) usize {
- const file = &this.store.data.file;
+ if (read_result == .pending) {
+ if (this.scheduled_count == 0) {
+ this.buf = buf;
+ this.view.set(globalThis, view);
+ this.scheduleAsync(@truncate(Blob.SizeType, buf.len), globalThis);
+ }
- const chunk_size: usize = if (this.user_chunk_size > 0)
- @as(usize, this.user_chunk_size)
- else if (file.seekable orelse false)
- @as(usize, default_file_chunk_size)
- else
- @as(usize, default_fifo_chunk_size);
+ return .{ .pending = &this.pending };
+ }
- return if (file.max_size > 0)
- if (available_to_read != std.math.maxInt(usize)) @minimum(chunk_size, available_to_read) else @minimum(@maximum(this.total_read, file.max_size) - this.total_read, chunk_size)
- else
- @minimum(available_to_read, chunk_size);
+ return read_result.toStream(&this.pending, buf, view, false);
}
- pub fn onPullInto(this: *FileReader, buffer: []u8, view: JSC.JSValue) StreamResult {
- const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
- std.debug.assert(this.started);
-
- switch (chunk_size) {
- 0 => {
- std.debug.assert(this.store.data.file.seekable orelse false);
- this.finalize();
- return .{ .done = {} };
- },
- run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => {
- if (!this.isFIFO()) {
- this.view.set(this.globalThis(), view);
- // should never be reached
- this.pending.result = .{
- .err = Syscall.Error.todo,
- };
- this.buf = buffer;
-
- this.scheduleAsync(@truncate(Blob.SizeType, chunk_size));
+ pub fn doRead(this: *File, buf: []u8) ReadResult {
+ switch (Syscall.read(this.fd, buf)) {
+ .err => |err| {
+ const retry = std.os.E.AGAIN;
+ const errno = err.getErrno();
- return .{ .pending = &this.pending };
+ switch (errno) {
+ retry => {
+ return .{ .pending = {} };
+ },
+ else => {
+ return .{ .err = err };
+ },
}
},
- else => {},
- }
+ .result => |result| {
+ this.remaining_bytes -|= @truncate(@TypeOf(this.remaining_bytes), result);
- return this.read(buffer, view, null);
- }
+ if (result == 0) {
+ return .{ .done = {} };
+ }
- fn maybeAutoClose(this: *FileReader) void {
- if (this.auto_close) {
- _ = Syscall.close(this.fd);
- this.auto_close = false;
+ return .{ .read = buf[0..result] };
+ },
}
}
+};
- fn handleReadChunk(this: *FileReader, result: usize, view: JSC.JSValue, owned: bool, buf: []u8) StreamResult {
- std.debug.assert(this.started);
-
- this.total_read += @intCast(Blob.SizeType, result);
- const remaining: Blob.SizeType = if (this.store.data.file.seekable orelse false)
- this.store.data.file.max_size -| this.total_read
- else
- @as(Blob.SizeType, std.math.maxInt(Blob.SizeType));
+// macOS default pipe size is page_size, 16k, or 64k. It changes based on how much was written
+// Linux default pipe size is 16 pages of memory
+const default_fifo_chunk_size = 64 * 1024;
+const default_file_chunk_size = 1024 * 1024 * 2;
- // this handles:
- // - empty file
- // - stream closed for some reason
- // - FIFO returned EOF
- if ((result == 0 and (remaining == 0 or this.close_on_eof))) {
- this.finalize();
- return .{ .done = {} };
- }
+/// **Not** the Web "FileReader" API
+pub const FileReader = struct {
+ buffered_data: bun.ByteList = .{},
- const has_more = remaining > 0;
+ total_read: Blob.SizeType = 0,
+ max_read: Blob.SizeType = 0,
- if (!has_more) {
- if (owned) {
- return .{ .owned_and_done = bun.ByteList.init(buf) };
- }
- return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result), .value = view } };
- }
+ cancelled: bool = false,
+ started: bool = false,
+ stored_global_this_: ?*JSC.JSGlobalObject = null,
+ user_chunk_size: Blob.SizeType = 0,
+ lazy_readable: Readable.Lazy = undefined,
- if (owned) {
- return .{ .owned = bun.ByteList.init(buf) };
+ pub fn setSignal(this: *FileReader, signal: Signal) void {
+ switch (this.lazy_readable) {
+ .readable => {
+ if (this.lazy_readable.readable == .FIFO)
+ this.lazy_readable.readable.FIFO.signal = signal;
+ },
+ else => {},
}
-
- return .{ .into_array = .{ .len = @truncate(Blob.SizeType, result), .value = view } };
}
- pub fn read(
- this: *FileReader,
- read_buf: []u8,
- view: JSC.JSValue,
- /// provided via kqueue(), only on macOS
- available_to_read: ?c_int,
- ) StreamResult {
- std.debug.assert(this.started);
- std.debug.assert(read_buf.len > 0);
-
- const fd = this.fd;
-
- if (fd == JSC.Node.invalid_fd) {
- std.debug.assert(this.poll_ref == null);
- return .{ .done = {} };
- }
-
- var buf_to_use = read_buf;
- var free_buffer_on_error: bool = false;
- var pipe_is_empty_on_linux = false;
- var len: c_int = available_to_read orelse 0;
-
- // if it's a pipe, we really don't know what to expect what the max size will be
- // if the pipe is sending us WAY bigger data than what we can fit in the buffer
- // we allocate a new buffer of up to 4 MB
- if (this.isFIFO() and view != .zero) {
- outer: {
- // macOS FIONREAD doesn't seem to work here
- // Kernel code implies it only is enabled for FIFOs which exist
- // in the filesystem
- if (comptime Environment.isLinux) {
- if (len == 0) {
- const rc: c_int = std.c.ioctl(fd, std.os.linux.T.FIONREAD, &len);
- if (rc != 0) {
- len = 0;
- }
+ pub fn readable(this: *FileReader) *Readable {
+ return &this.lazy_readable.readable;
+ }
- if (len > 0) {
- if (this.poll_ref) |poll| {
- poll.flags.insert(.readable);
- }
- } else {
- if (this.poll_ref) |poll| {
- poll.flags.remove(.readable);
- }
+ pub const Readable = union(enum) {
+ FIFO: FIFO,
+ File: File,
- pipe_is_empty_on_linux = true;
- }
+ pub const Lazy = union(enum) {
+ readable: Readable,
+ blob: *Blob.Store,
+ empty: void,
- // we do not un-mark it as readable if there's nothing in the pipe
- if (!this.has_adjusted_pipe_size_on_linux) {
- if (len > 0 and buf_to_use.len >= std.mem.page_size * 16) {
- this.has_adjusted_pipe_size_on_linux = true;
- _ = Syscall.setPipeCapacityOnLinux(fd, @minimum(buf_to_use.len * 4, Syscall.getMaxPipeSizeOnLinux()));
- }
- }
- }
- }
- if (len > read_buf.len * 10 and read_buf.len < std.mem.page_size) {
- // then we need to allocate a buffer
- // to read into
- // this
- buf_to_use = bun.default_allocator.alloc(
- u8,
- @intCast(
- usize,
- @minimum(
- len,
- 1024 * 1024 * 4,
- ),
- ),
- ) catch break :outer;
- free_buffer_on_error = true;
+ pub fn finish(this: *Lazy) void {
+ switch (this.readable) {
+ .FIFO => {
+ this.readable.FIFO.finish();
+ },
+ .File => {},
}
}
- }
- if (this.poll_ref) |poll| {
- if (comptime Environment.isMac) {
- if ((available_to_read orelse 0) > 0) {
- poll.flags.insert(.readable);
+ pub fn isClosed(this: *Lazy) bool {
+ switch (this.*) {
+ .empty, .blob => {
+ return true;
+ },
+ .readable => {
+ return this.readable.isClosed();
+ },
}
}
- const is_readable = poll.isReadable();
- if (!is_readable and poll.isEOF()) {
- if (poll.isHUP()) {
- this.finalize();
- }
-
- return .{ .done = {} };
- } else if (!is_readable and poll.isHUP()) {
- this.finalize();
- return .{ .done = {} };
- } else if (!is_readable) {
- if (this.finished) {
- this.finalize();
- return .{ .done = {} };
- }
-
- if (view != .zero) {
- this.view.set(this.globalThis(), view);
- this.buf = read_buf;
- if (!this.isWatching())
- this.watch(fd);
+ pub fn deinit(this: *Lazy) void {
+ switch (this.*) {
+ .blob => |blob| {
+ blob.deref();
+ },
+ .readable => {
+ this.readable.deinit();
+ },
+ .empty => {},
}
-
- return .{
- .pending = &this.pending,
- };
+ this.* = .{ .empty = {} };
}
- }
-
- if (comptime Environment.isLinux) {
- if (pipe_is_empty_on_linux) {
- std.debug.assert(this.poll_ref == null);
- if (view != .zero) {
- this.view.set(this.globalThis(), view);
- this.buf = read_buf;
- }
+ };
- this.watch(fd);
- return .{
- .pending = &this.pending,
- };
+ pub fn deinit(this: *Readable) void {
+ switch (this.*) {
+ .FIFO => {
+ this.FIFO.close();
+ },
+ .File => {
+ this.File.deinit();
+ },
}
- } else if (this.isFIFO() and this.poll_ref == null and available_to_read == null) {
- // we don't know if it's readable or not
- if (!bun.isReadable(fd)) {
- if (free_buffer_on_error) {
- bun.default_allocator.free(buf_to_use);
- buf_to_use = read_buf;
- }
-
- if (view != .zero) {
- this.view.set(this.globalThis(), view);
- this.buf = read_buf;
- }
+ }
- this.watch(fd);
- return .{
- .pending = &this.pending,
- };
+ pub fn isClosed(this: *Readable) bool {
+ switch (this.*) {
+ .FIFO => {
+ return this.FIFO.isClosed();
+ },
+ .File => {
+ return this.File.isClosed();
+ },
}
}
- switch (Syscall.read(fd, buf_to_use)) {
- .err => |err| {
- const retry = std.os.E.AGAIN;
- const errno = brk: {
- const _errno = err.getErrno();
- if (comptime Environment.isLinux) {
- // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already
- // sent a 0
- // Let's retry later.
- if (this.isFIFO() and
- !this.close_on_eof and _errno == .PERM)
- {
- break :brk .AGAIN;
- }
+ pub fn close(this: *Readable) void {
+ switch (this.*) {
+ .FIFO => {
+ this.FIFO.close();
+ },
+ .File => {
+ if (this.File.concurrent) |concurrent| {
+ this.File.concurrent = null;
+ concurrent.close();
}
- break :brk _errno;
- };
-
- switch (errno) {
- retry => {
- if (this.finished) {
- if (this.poll_ref) |poll| {
- this.poll_ref = null;
- poll.deinit();
- }
-
- return .{ .done = {} };
- }
+ this.File.close();
+ },
+ }
+ }
- if (free_buffer_on_error) {
- bun.default_allocator.free(buf_to_use);
- buf_to_use = read_buf;
- }
+ pub fn read(
+ this: *Readable,
+ read_buf: []u8,
+ view: JSC.JSValue,
+ global: *JSC.JSGlobalObject,
+ ) StreamResult {
+ return switch (std.meta.activeTag(this.*)) {
+ .FIFO => this.FIFO.readFromJS(read_buf, view, global),
+ .File => this.File.readFromJS(read_buf, view, global),
+ };
+ }
- if (view != .zero) {
- this.view.set(this.globalThis(), view);
- this.buf = read_buf;
- if (!this.isWatching())
- this.watch(this.fd);
- }
+ pub fn isSeekable(this: Readable) bool {
+ if (this == .File) {
+ return this.File.isSeekable();
+ }
- return .{
- .pending = &this.pending,
- };
- },
- else => {},
- }
- const sys = if (this.store.data.file.pathlike == .path and this.store.data.file.pathlike.path.slice().len > 0)
- err.withPath(this.store.data.file.pathlike.path.slice())
- else
- err;
+ return false;
+ }
- this.finalize();
- return .{ .err = sys };
- },
- .result => |result| {
- if (this.isFIFO()) {
- if (this.poll_ref) |poll| {
- if (comptime Environment.isLinux) {
- // do not insert .eof here
- if (result < buf_to_use.len)
- poll.flags.remove(.readable);
- } else {
- // Since we have no way of querying FIFO capacity
- // its only okay to read when kqueue says its readable
- // otherwise we might block the process
- poll.flags.remove(.readable);
- }
- }
+ pub fn watch(this: *Readable) void {
+ switch (this.*) {
+ .FIFO => {
+ if (!this.FIFO.isWatching())
+ this.FIFO.watch(this.FIFO.fd);
+ },
+ }
+ }
+ };
- if (!this.finished and !this.isWatching())
- this.watch(fd);
- }
+ pub inline fn globalThis(this: *FileReader) *JSC.JSGlobalObject {
+ return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis;
+ }
- if (result == 0 and free_buffer_on_error) {
- bun.default_allocator.free(buf_to_use);
- buf_to_use = read_buf;
- } else if (free_buffer_on_error) {
- this.view.clear();
- this.buf = &.{};
- return this.handleReadChunk(result, view, true, buf_to_use);
- }
+ const run_on_different_thread_size = bun.huge_allocator_threshold;
- if (result == 0 and this.isFIFO() and view != .zero) {
- this.view.set(this.globalThis(), view);
- this.buf = read_buf;
- return .{
- .pending = &this.pending,
- };
- }
+ pub const tag = ReadableStream.Tag.File;
- return this.handleReadChunk(result, view, false, buf_to_use);
+ pub fn fromReadable(this: *FileReader, chunk_size: Blob.SizeType, readable_: *Readable) void {
+ this.* = .{
+ .lazy_readable = .{
+ .readable = readable_.*,
},
- }
+ };
+ this.user_chunk_size = chunk_size;
}
- /// Called from Poller
- pub fn ready(this: *FileReader, sizeOrOffset: i64) void {
- const view = this.view.get() orelse .zero;
- defer this.view.clear();
+ pub fn finish(this: *FileReader) void {
+ this.lazy_readable.finish();
+ }
- const available_to_read: usize = if (comptime Environment.isMac) brk: {
- if (this.isFIFO()) {
- break :brk @intCast(usize, @maximum(sizeOrOffset, 0));
- } else if (std.os.S.ISREG(this.mode)) {
- // Returns when the file pointer is not at the end of
- // file. data contains the offset from current position
- // to end of file, and may be negative.
- break :brk @intCast(usize, @maximum(sizeOrOffset, 0));
- }
- break :brk std.math.maxInt(usize);
- } else std.math.maxInt(usize);
- if (this.finalized and this.scheduled_count == 0) {
- if (this.pending.state == .pending) {
- // should never be reached
- this.pending.result = .{
- .err = Syscall.Error.todo,
- };
- resume this.pending.frame;
- }
- this.deinit();
- return;
- }
+ pub fn onStart(this: *FileReader) StreamStart {
+ if (!this.started) {
+ this.started = true;
+
+ switch (this.lazy_readable) {
+ .blob => |blob| {
+ defer blob.deref();
+ var readable_file: File = .{ .loop = this.globalThis().bunVM().eventLoop() };
+ const result = readable_file.start(&blob.data.file);
+ if (result != .ready) {
+ return result;
+ }
- // If we do nothing here, stop watching the file descriptor
- var unschedule = this.poll_ref != null;
- defer {
- if (unschedule) {
- if (this.poll_ref) |ref| {
- _ = ref.unregister(this.globalThis().bunVM().uws_event_loop.?);
- }
+ if (std.os.S.ISFIFO(readable_file.mode)) {
+ this.lazy_readable = .{
+ .readable = .{
+ .FIFO = FIFO{
+ .fd = readable_file.fd,
+ .auto_close = readable_file.auto_close,
+ },
+ },
+ };
+ } else {
+ this.lazy_readable = .{
+ .readable = .{ .File = readable_file },
+ };
+ }
+ },
+ .readable => {},
+ .empty => return .{ .empty = {} },
}
}
- if (this.cancelled) {
- return;
- }
- if (this.buf.len == 0) {
- return;
- } else {
- this.buf.len = @minimum(this.buf.len, available_to_read);
+ if (this.readable().* == .File) {
+ const chunk_size = this.readable().File.calculateChunkSize(std.math.maxInt(usize));
+ return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) };
}
- this.pending.result = this.read(
- this.buf,
- view,
- if (available_to_read == std.math.maxInt(usize))
- null
- else
- @truncate(c_int, @intCast(isize, available_to_read)),
- );
- unschedule = false;
- this.pending.run();
+ return .{ .chunk_size = if (this.user_chunk_size == 0) default_fifo_chunk_size else this.user_chunk_size };
}
- pub fn finalize(this: *FileReader) void {
- if (this.finalized)
- return;
-
- this.signal.close(null);
-
- if (this.buffered_data.cap > 0) {
- this.buffered_data.listManaged(bun.default_allocator).deinit();
- this.buffered_data.cap = 0;
- }
-
- this.finished = true;
+ pub fn onPullInto(this: *FileReader, buffer: []u8, view: JSC.JSValue) StreamResult {
+ std.debug.assert(this.started);
+ return this.readable().read(buffer, view, this.globalThis());
+ }
- if (this.poll_ref) |poll| {
- this.poll_ref = null;
- poll.deinit();
+ fn isFIFO(this: *const FileReader) bool {
+ if (this.lazy_readable == .readable) {
+ return this.lazy_readable.readable == .FIFO;
}
- this.finalized = true;
-
- this.pending.result = .{ .done = {} };
- this.pending.run();
-
- this.view.deinit();
- this.buf = &.{};
-
- this.maybeAutoClose();
+ return false;
+ }
- this.store.deref();
+ pub fn finalize(this: *FileReader) void {
+ this.lazy_readable.deinit();
}
pub fn onCancel(this: *FileReader) void {
@@ -4022,7 +4365,7 @@ pub const FileReader = struct {
pub fn deinit(this: *FileReader) void {
this.finalize();
- if (this.scheduled_count == 0 and this.pending.state == .pending) {
+ if (this.lazy_readable.isClosed()) {
this.destroy();
}
}
@@ -4032,18 +4375,30 @@ pub const FileReader = struct {
}
pub fn setRefOrUnref(this: *FileReader, value: bool) void {
- if (this.poll_ref) |poll| {
- if (value) {
- poll.enableKeepingProcessAlive(this.globalThis().bunVM());
- } else {
- poll.disableKeepingProcessAlive(this.globalThis().bunVM());
+ if (this.lazy_readable == .readable) {
+ switch (this.lazy_readable.readable) {
+ .FIFO => {
+ if (this.lazy_readable.readable.FIFO.poll_ref) |poll| {
+ if (value) {
+ poll.enableKeepingProcessAlive(this.globalThis().bunVM());
+ } else {
+ poll.disableKeepingProcessAlive(this.globalThis().bunVM());
+ }
+ }
+ },
+ .File => {
+ if (value)
+ this.lazy_readable.readable.File.poll_ref.ref(JSC.VirtualMachine.vm)
+ else
+ this.lazy_readable.readable.File.poll_ref.unref(JSC.VirtualMachine.vm);
+ },
}
}
}
pub fn drainInternalBuffer(this: *FileReader) bun.ByteList {
- var buffered = this.buffered_data;
- if (buffered.len > 0) {
+ const buffered = this.buffered_data;
+ if (buffered.cap > 0) {
this.buffered_data = .{};
}
@@ -4093,8 +4448,9 @@ pub fn NewReadyWatcher(
ready(this, sizeOrOffset);
}
- pub fn unwatch(this: *Context, fd: JSC.Node.FileDescriptor) void {
- std.debug.assert(@intCast(JSC.Node.FileDescriptor, this.poll_ref.?.fd) == fd);
+ pub fn unwatch(this: *Context, fd_: anytype) void {
+ const fd = @intCast(c_int, fd_);
+ std.debug.assert(@intCast(c_int, this.poll_ref.?.fd) == fd);
std.debug.assert(
this.poll_ref.?.unregister(JSC.VirtualMachine.vm.uws_event_loop.?) == .result,
);
@@ -4121,7 +4477,8 @@ pub fn NewReadyWatcher(
return false;
}
- pub fn watch(this: *Context, fd: JSC.Node.FileDescriptor) void {
+ pub fn watch(this: *Context, fd_: anytype) void {
+ const fd = @intCast(c_int, fd_);
var poll_ref: *JSC.FilePoll = this.poll_ref orelse brk: {
this.poll_ref = JSC.FilePoll.init(
JSC.VirtualMachine.vm,
@@ -4133,6 +4490,7 @@ pub fn NewReadyWatcher(
break :brk this.poll_ref.?;
};
std.debug.assert(poll_ref.fd == fd);
+ std.debug.assert(!this.isWatching());
switch (poll_ref.register(JSC.VirtualMachine.vm.uws_event_loop.?, flag, true)) {
.err => |err| {
bun.unreachablePanic("FilePoll.register failed: {d}", .{err.errno});
@@ -4154,3 +4512,4 @@ pub fn NewReadyWatcher(
// pub fn onError(this: *Streamer): anytype,
// };
// }
+