aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/api')
-rw-r--r--src/bun.js/api/bun.zig15
-rw-r--r--src/bun.js/api/bun/spawn.zig6
-rw-r--r--src/bun.js/api/bun/subprocess.zig527
-rw-r--r--src/bun.js/api/html_rewriter.zig3
-rw-r--r--src/bun.js/api/server.zig15
5 files changed, 280 insertions, 286 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index 3a88f7a04..ee26b09f5 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -2448,8 +2448,7 @@ pub const Timer = struct {
}
pub fn deinit(this: *Timeout) void {
- if (comptime JSC.is_bindgen)
- unreachable;
+ JSC.markBinding(@src());
var vm = this.globalThis.bunVM();
this.poll_ref.unref(vm);
@@ -2465,7 +2464,7 @@ pub const Timer = struct {
countdown: JSValue,
repeat: bool,
) !void {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
var vm = globalThis.bunVM();
// We don't deal with nesting levels directly
@@ -2534,7 +2533,7 @@ pub const Timer = struct {
callback: JSValue,
countdown: JSValue,
) callconv(.C) JSValue {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
const id = globalThis.bunVM().timer.last_id;
globalThis.bunVM().timer.last_id +%= 1;
@@ -2548,7 +2547,7 @@ pub const Timer = struct {
callback: JSValue,
countdown: JSValue,
) callconv(.C) JSValue {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
const id = globalThis.bunVM().timer.last_id;
globalThis.bunVM().timer.last_id +%= 1;
@@ -2559,7 +2558,7 @@ pub const Timer = struct {
}
pub fn clearTimer(timer_id: JSValue, _: *JSGlobalObject, repeats: bool) void {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
var map = if (repeats) &VirtualMachine.vm.timer.interval_map else &VirtualMachine.vm.timer.timeout_map;
const id: Timeout.ID = .{
@@ -2580,7 +2579,7 @@ pub const Timer = struct {
globalThis: *JSGlobalObject,
id: JSValue,
) callconv(.C) JSValue {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
Timer.clearTimer(id, globalThis, false);
return JSValue.jsUndefined();
}
@@ -2588,7 +2587,7 @@ pub const Timer = struct {
globalThis: *JSGlobalObject,
id: JSValue,
) callconv(.C) JSValue {
- if (comptime is_bindgen) unreachable;
+ JSC.markBinding(@src());
Timer.clearTimer(id, globalThis, true);
return JSValue.jsUndefined();
}
diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig
index d594d44a7..afcc5509b 100644
--- a/src/bun.js/api/bun/spawn.zig
+++ b/src/bun.js/api/bun/spawn.zig
@@ -57,8 +57,6 @@ pub const PosixSpawn = struct {
} else {
_ = system.posix_spawnattr_destroy(&self.attr);
}
-
- self.* = undefined;
}
pub fn get(self: Attr) !u16 {
@@ -207,6 +205,10 @@ pub const PosixSpawn = struct {
argv,
envp,
);
+ if (comptime bun.Environment.allow_assert)
+ JSC.Node.Syscall.syslog("posix_spawn({s}) = {d} ({d})", .{
+ path, rc, pid,
+ });
if (comptime bun.Environment.isLinux) {
// rc is negative because it's libc errno
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index e09c5db2b..c85e0396f 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -28,7 +28,6 @@ pub const Subprocess = struct {
stdin: Writable,
stdout: Readable,
stderr: Readable,
-
killed: bool = false,
poll_ref: ?*JSC.FilePoll = null,
@@ -47,8 +46,22 @@ pub const Subprocess = struct {
finalized: bool = false,
globalThis: *JSC.JSGlobalObject,
-
+ observable_getters: std.enums.EnumSet(enum {
+ stdin,
+ stdout,
+ stderr,
+ }) = .{},
has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
+ is_sync: bool = false,
+
+ pub fn hasExited(this: *const Subprocess) bool {
+ return this.exit_code != null or this.waitpid_err != null;
+ }
+
+ pub fn updateHasPendingActivityFlag(this: *Subprocess) void {
+ @fence(.SeqCst);
+ this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .SeqCst);
+ }
pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool {
@fence(.Acquire);
@@ -78,7 +91,7 @@ pub const Subprocess = struct {
}
const Readable = union(enum) {
- fd: JSC.Node.FileDescriptor,
+ fd: bun.FileDescriptor,
pipe: Pipe,
inherit: void,
@@ -102,44 +115,37 @@ pub const Subprocess = struct {
return;
}
- if (this.buffer.fd != JSC.Node.invalid_fd) {
- this.buffer.close();
- }
+ this.buffer.close();
}
pub fn toJS(this: *@This(), readable: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue {
- if (this.* == .stream) {
- if (this.stream.ptr == .File) {
- this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable);
- }
- return this.stream.toJS();
+ if (this.* != .stream) {
+ const stream = this.buffer.toReadableStream(globalThis, exited);
+ this.* = .{ .stream = stream };
}
- const is_fifo = this.buffer.is_fifo;
- const stream = this.buffer.toReadableStream(globalThis, exited);
- this.* = .{ .stream = stream };
+
if (this.stream.ptr == .File) {
- this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable);
- this.stream.ptr.File.is_fifo = is_fifo;
+ this.stream.ptr.File.setSignal(JSC.WebCore.Signal.init(readable));
}
- return stream.value;
+
+ return this.stream.toJS();
}
};
- pub fn init(stdio: Stdio, fd: i32, _: *JSC.JSGlobalObject) Readable {
+ pub fn init(stdio: Stdio, fd: i32, other_fd: i32, _: *JSC.JSGlobalObject) Readable {
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore => Readable{ .ignore = {} },
.pipe => brk: {
+ _ = JSC.Node.Syscall.close(other_fd);
break :brk .{
.pipe = .{
- .buffer = BufferedOutput{
- .fd = fd,
- .is_fifo = true,
- },
+ .buffer = undefined,
},
};
},
- .path, .blob, .fd => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) },
+ .path => Readable{ .ignore = {} },
+ .blob, .fd => Readable{ .fd = @intCast(bun.FileDescriptor, fd) },
else => unreachable,
};
}
@@ -159,7 +165,7 @@ pub const Subprocess = struct {
},
.pipe => {
if (this.pipe == .stream and this.pipe.stream.ptr == .File)
- this.pipe.stream.ptr.File.signal.clear();
+ this.pipe.stream.ptr.File.readable().FIFO.signal.clear();
this.pipe.done();
},
else => {},
@@ -190,10 +196,8 @@ pub const Subprocess = struct {
.pipe => {
defer this.close();
- if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) {
- if (this.pipe.buffer.canRead())
- this.pipe.buffer.readIfPossible(true);
- }
+ if (this.pipe.buffer.canRead())
+ this.pipe.buffer.readAll();
var bytes = this.pipe.buffer.internal_buffer.slice();
this.pipe.buffer.internal_buffer = .{};
@@ -216,6 +220,7 @@ pub const Subprocess = struct {
this: *Subprocess,
globalThis: *JSGlobalObject,
) callconv(.C) JSValue {
+ this.observable_getters.insert(.stderr);
return this.stderr.toJS(globalThis, this.exit_code != null);
}
@@ -223,6 +228,7 @@ pub const Subprocess = struct {
this: *Subprocess,
globalThis: *JSGlobalObject,
) callconv(.C) JSValue {
+ this.observable_getters.insert(.stdin);
return this.stdin.toJS(globalThis);
}
@@ -230,6 +236,7 @@ pub const Subprocess = struct {
this: *Subprocess,
globalThis: *JSGlobalObject,
) callconv(.C) JSValue {
+ this.observable_getters.insert(.stdout);
return this.stdout.toJS(globalThis, this.exit_code != null);
}
@@ -291,41 +298,22 @@ pub const Subprocess = struct {
return .{ .result = {} };
}
- pub fn onKill(
- this: *Subprocess,
- ) void {
- if (this.killed) {
- return;
- }
-
- this.killed = true;
- this.closePorts();
+ fn hasCalledGetter(this: *Subprocess, comptime getter: @Type(.EnumLiteral)) bool {
+ return this.observable_getters.contains(getter);
}
- pub fn closePorts(this: *Subprocess) void {
- const pidfd = this.pidfd;
-
- if (comptime Environment.isLinux) {
- this.pidfd = std.math.maxInt(std.os.fd_t);
+ fn closeProcess(this: *Subprocess) void {
+ if (comptime !Environment.isLinux) {
+ return;
}
- defer {
- if (comptime Environment.isLinux) {
- if (pidfd != std.math.maxInt(std.os.fd_t)) {
- _ = std.os.close(pidfd);
- }
- }
- }
+ const pidfd = this.pidfd;
- if (this.stdout == .pipe) {
- this.stdout.pipe.finish();
- }
+ this.pidfd = std.math.maxInt(std.os.fd_t);
- if (this.stderr == .pipe) {
- this.stderr.pipe.finish();
+ if (pidfd != std.math.maxInt(std.os.fd_t)) {
+ _ = std.os.close(pidfd);
}
-
- this.stdin.close();
}
pub fn doRef(this: *Subprocess, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSValue {
@@ -354,7 +342,7 @@ pub const Subprocess = struct {
pub const BufferedInput = struct {
remain: []const u8 = "",
- fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd,
+ fd: bun.FileDescriptor = bun.invalid_fd,
poll_ref: ?*JSC.FilePoll = null,
written: usize = 0,
@@ -366,6 +354,10 @@ pub const Subprocess = struct {
pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .writable, onReady);
pub fn onReady(this: *BufferedInput, _: i64) void {
+ if (this.fd == bun.invalid_fd) {
+ return;
+ }
+
this.write();
}
@@ -395,10 +387,14 @@ pub const Subprocess = struct {
}
}
- this.write();
+ this.writeAllowBlocking(is_sync);
}
pub fn write(this: *BufferedInput) void {
+ this.writeAllowBlocking(false);
+ }
+
+ pub fn writeAllowBlocking(this: *BufferedInput, allow_blocking: bool) void {
var to_write = this.remain;
if (to_write.len == 0) {
@@ -450,7 +446,7 @@ pub const Subprocess = struct {
to_write = to_write[bytes_written..];
// we are done or it accepts no more input
- if (this.remain.len == 0 or bytes_written == 0) {
+ if (this.remain.len == 0 or (allow_blocking and bytes_written == 0)) {
this.deinit();
return;
}
@@ -465,9 +461,9 @@ pub const Subprocess = struct {
poll.deinit();
}
- if (this.fd != JSC.Node.invalid_fd) {
+ if (this.fd != bun.invalid_fd) {
_ = JSC.Node.Syscall.close(this.fd);
- this.fd = JSC.Node.invalid_fd;
+ this.fd = bun.invalid_fd;
}
}
@@ -487,186 +483,137 @@ pub const Subprocess = struct {
pub const BufferedOutput = struct {
internal_buffer: bun.ByteList = .{},
- max_internal_buffer: u32 = default_max_buffer_size,
- fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd,
- received_eof: bool = false,
- pending_error: ?JSC.Node.Syscall.Error = null,
- poll_ref: ?*JSC.FilePoll = null,
- is_fifo: bool = false,
+ fifo: JSC.WebCore.FIFO = undefined,
+ auto_sizer: JSC.WebCore.AutoSizer = undefined,
+ status: Status = .{
+ .pending = {},
+ },
- pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready);
+ pub const Status = union(enum) {
+ pending: void,
+ done: void,
+ err: JSC.Node.Syscall.Error,
+ };
- pub fn ready(this: *BufferedOutput, available_to_read: i64) void {
- if (comptime Environment.isMac) {
- if (this.poll_ref) |poll| {
- if (available_to_read > 0) {
- poll.flags.insert(.readable);
- } else {
- poll.flags.remove(.readable);
- }
- }
- }
+ pub fn init(fd: bun.FileDescriptor) BufferedOutput {
+ return BufferedOutput{
+ .internal_buffer = .{},
+ .fifo = JSC.WebCore.FIFO{
+ .fd = fd,
+ },
+ };
+ }
- // TODO: what happens if the task was already enqueued after unwatch()?
- this.readAll(false);
+ pub fn setup(this: *BufferedOutput, allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) void {
+ this.* = init(fd);
+ this.auto_sizer = .{
+ .max = max_size,
+ .allocator = allocator,
+ .buffer = &this.internal_buffer,
+ };
+ this.watch();
}
pub fn canRead(this: *BufferedOutput) bool {
- const is_readable = bun.isReadable(this.fd);
-
- if (is_readable) {
- if (this.poll_ref) |poll_ref| {
- poll_ref.flags.insert(.readable);
- poll_ref.flags.insert(.fifo);
- std.debug.assert(poll_ref.flags.contains(.poll_readable));
- }
- }
-
- return is_readable;
+ return bun.isReadable(this.fifo.fd);
}
- pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void {
- if (comptime !force) {
- // we ask, "Is it possible to read right now?"
- // we do this rather than epoll or kqueue()
- // because we don't want to block the thread waiting for the read
- // and because kqueue or epoll might return other unrelated events
- // and we don't want this to become an event loop ticking point
- if (!this.canRead()) {
- this.watch(this.fd);
+ pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void {
+ switch (result) {
+ .pending => {
+ this.watch();
return;
- }
- }
-
- this.readAll(force);
- }
-
- pub fn closeOnEOF(this: *BufferedOutput) bool {
- var poll = this.poll_ref orelse return true;
- poll.flags.insert(.eof);
- return false;
- }
-
- pub fn readAll(this: *BufferedOutput, comptime force: bool) void {
- if (this.poll_ref) |poll| {
- const is_readable = poll.isReadable();
- if (!is_readable and poll.isEOF()) {
- if (poll.isHUP()) {
- this.autoCloseFileDescriptor();
- }
+ },
+ .err => |err| {
+ this.status = .{ .err = err };
+ this.fifo.close();
return;
- } else if (!is_readable and poll.isHUP()) {
- this.autoCloseFileDescriptor();
- return;
- } else if (!is_readable) {
+ },
+ .done => {
+ this.status = .{ .done = {} };
+ this.fifo.close();
return;
- }
+ },
+ else => {
+ const slice = result.slice();
+ this.internal_buffer.len += @truncate(u32, slice.len);
+ if (slice.len > 0)
+ std.debug.assert(this.internal_buffer.contains(slice));
+
+ if (result.isDone()) {
+ this.status = .{ .done = {} };
+ this.fifo.close();
+ }
+ },
}
+ }
- // read as much as we can from the pipe
- while (this.internal_buffer.len < this.max_internal_buffer) {
- var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined;
-
- var buf: []u8 = buffer_[0..];
-
- var available = this.internal_buffer.ptr[this.internal_buffer.len..this.internal_buffer.cap];
- if (available.len >= buf.len) {
- buf = available;
+ pub fn readAll(this: *BufferedOutput) void {
+ while (@as(usize, this.internal_buffer.len) < this.auto_sizer.max and this.status == .pending) {
+ var stack_buffer: [8096]u8 = undefined;
+ var stack_buf: []u8 = stack_buffer[0..];
+ var buf_to_use = stack_buf;
+ var available = this.internal_buffer.available();
+ if (available.len >= stack_buf.len) {
+ buf_to_use = available;
}
- switch (JSC.Node.Syscall.read(this.fd, buf)) {
- .err => |e| {
- if (e.isRetry()) {
- if (!this.isWatching() and this.isFIFO())
- this.watch(this.fd);
- this.poll_ref.?.flags.insert(.fifo);
- return;
- }
-
- if (comptime Environment.isMac) {
- // INTR is returned on macOS when the process is killed
- // It probably sent SIGPIPE but we have the handler for
- // that disabled.
- // We know it's the "real" INTR because we use read$NOCANCEL
- if (e.getErrno() == .INTR) {
- this.received_eof = true;
- this.autoCloseFileDescriptor();
- return;
- }
- } else {
- if (comptime Environment.allow_assert) {
- std.debug.assert(e.getErrno() != .INTR); // Bun's read() function should retry on EINTR
- }
- }
+ const result = this.fifo.read(buf_to_use, this.fifo.to_read);
- // fail
- log("readAll() fail: {s}", .{@tagName(e.getErrno())});
- this.pending_error = e;
- this.internal_buffer.listManaged(bun.default_allocator).deinit();
- this.internal_buffer = .{};
+ switch (result) {
+ .pending => {
+ this.watch();
return;
},
+ .err => |err| {
+ this.status = .{ .err = err };
+ this.fifo.close();
- .result => |bytes_read| {
- log("readAll() {d}", .{bytes_read});
-
- if (bytes_read > 0) {
- if (buf.ptr == available.ptr) {
- this.internal_buffer.len += @truncate(u32, bytes_read);
- } else {
- _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory");
- }
- }
-
- if (comptime !force) {
- if (buf[bytes_read..].len > 0 or !this.canRead()) {
- if (!this.isWatching())
- this.watch(this.fd);
- if (this.is_fifo)
- this.poll_ref.?.flags.insert(.fifo)
- else
- this.received_eof = true;
- return;
- }
+ return;
+ },
+ .done => {
+ this.status = .{ .done = {} };
+ this.fifo.close();
+ return;
+ },
+ .read => |slice| {
+ if (slice.ptr == stack_buf.ptr) {
+ this.internal_buffer.append(this.auto_sizer.allocator, slice) catch @panic("out of memory");
} else {
- // we consider a short read as being EOF
- this.received_eof = !this.is_fifo and this.received_eof or bytes_read < buf.len;
- if (this.received_eof) {
- if (this.closeOnEOF()) {
- this.autoCloseFileDescriptor();
- }
+ this.internal_buffer.len += @truncate(u32, slice.len);
+ }
- // do not auto-close the file descriptor here
- // it's totally legit to have a short read
- return;
- }
+ if (slice.len < buf_to_use.len) {
+ this.watch();
+ return;
}
},
}
}
}
+ fn watch(this: *BufferedOutput) void {
+ this.fifo.pending.set(BufferedOutput, this, onRead);
+ if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd);
+ return;
+ }
+
pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob {
const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis);
this.internal_buffer = bun.ByteList.init("");
- std.debug.assert(this.fd == JSC.Node.invalid_fd);
- std.debug.assert(this.received_eof);
return blob;
}
pub fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream {
if (exited) {
// exited + received EOF => no more read()
- if (this.received_eof) {
- var poll_ref = this.poll_ref;
- this.poll_ref = null;
-
- this.autoCloseFileDescriptor();
-
+ if (this.fifo.isClosed()) {
// also no data at all
if (this.internal_buffer.len == 0) {
- this.close();
+ if (this.internal_buffer.cap > 0) {
+ this.internal_buffer.deinitWithAllocator(this.auto_sizer.allocator);
+ }
// so we return an empty stream
return JSC.WebCore.ReadableStream.fromJS(
JSC.WebCore.ReadableStream.empty(globalThis),
@@ -675,70 +622,52 @@ pub const Subprocess = struct {
}
return JSC.WebCore.ReadableStream.fromJS(
- JSC.WebCore.ReadableStream.fromBlobWithPoll(
+ JSC.WebCore.ReadableStream.fromBlob(
globalThis,
&this.toBlob(globalThis),
0,
- poll_ref,
),
globalThis,
).?;
+ } else {
+ this.fifo.close_on_empty_read = true;
}
}
- std.debug.assert(this.fd != JSC.Node.invalid_fd);
{
- var poll_ref = this.poll_ref;
- this.poll_ref = null;
+ const internal_buffer = this.internal_buffer;
+ this.internal_buffer = bun.ByteList.init("");
// There could still be data waiting to be read in the pipe
// so we need to create a new stream that will read from the
// pipe and then return the blob.
- var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis);
const result = JSC.WebCore.ReadableStream.fromJS(
- JSC.WebCore.ReadableStream.fromBlobWithPoll(
+ JSC.WebCore.ReadableStream.fromFIFO(
globalThis,
- &blob,
- 0,
- poll_ref,
+ &this.fifo,
+ internal_buffer,
),
globalThis,
).?;
- blob.detach();
- result.ptr.File.buffered_data = this.internal_buffer;
- result.ptr.File.stored_global_this_ = globalThis;
- result.ptr.File.finished = exited;
- this.internal_buffer = bun.ByteList.init("");
- this.fd = JSC.Node.invalid_fd;
- this.received_eof = false;
return result;
}
}
- pub fn autoCloseFileDescriptor(this: *BufferedOutput) void {
- const fd = this.fd;
- if (fd == JSC.Node.invalid_fd)
- return;
- this.fd = JSC.Node.invalid_fd;
-
- if (this.poll_ref) |poll| {
- this.poll_ref = null;
- poll.deinit();
- }
-
- _ = JSC.Node.Syscall.close(fd);
- }
-
pub fn close(this: *BufferedOutput) void {
- this.autoCloseFileDescriptor();
+ switch (this.status) {
+ .done => {},
+ .pending => {
+ this.fifo.close();
+ this.status = .{ .done = {} };
+ },
+ .err => {},
+ }
if (this.internal_buffer.cap > 0) {
this.internal_buffer.listManaged(bun.default_allocator).deinit();
this.internal_buffer = .{};
}
-
- this.received_eof = true;
}
};
@@ -748,7 +677,7 @@ pub const Subprocess = struct {
pipe: *JSC.WebCore.FileSink,
readable_stream: JSC.WebCore.ReadableStream,
},
- fd: JSC.Node.FileDescriptor,
+ fd: bun.FileDescriptor,
buffered_input: BufferedInput,
inherit: void,
ignore: void,
@@ -763,7 +692,7 @@ pub const Subprocess = struct {
pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Writable) void {}
- pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
+ pub fn init(stdio: Stdio, fd: i32, other_fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
switch (stdio) {
.pipe => {
var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink);
@@ -771,7 +700,9 @@ pub const Subprocess = struct {
.fd = fd,
.buffer = bun.ByteList.init(&.{}),
.allocator = globalThis.bunVM().allocator,
+ .auto_close = true,
};
+ if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd);
sink.mode = std.os.S.IFIFO;
if (stdio == .pipe) {
if (stdio.pipe) |readable| {
@@ -787,6 +718,7 @@ pub const Subprocess = struct {
return Writable{ .pipe = sink };
},
.array_buffer, .blob => {
+ if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd);
var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined };
switch (stdio) {
.array_buffer => |array_buffer| {
@@ -800,7 +732,7 @@ pub const Subprocess = struct {
return Writable{ .buffered_input = buffered_input };
},
.fd => {
- return Writable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) };
+ return Writable{ .fd = @intCast(bun.FileDescriptor, fd) };
},
.inherit => {
return Writable{ .inherit = {} };
@@ -842,15 +774,21 @@ pub const Subprocess = struct {
}
};
- pub fn finalize(this: *Subprocess) callconv(.C) void {
- this.unref();
- this.closePorts();
- this.stdout.close();
+ pub fn finalizeSync(this: *Subprocess) void {
+ this.closeProcess();
+ this.stdin.close();
this.stderr.close();
+ this.stdin.close();
- this.finalized = true;
- bun.default_allocator.destroy(this);
+ this.exit_promise.deinit();
+ this.on_exit_callback.deinit();
+ }
+
+ pub fn finalize(this: *Subprocess) callconv(.C) void {
+ std.debug.assert(!this.hasPendingActivity());
+ this.finalizeSync();
log("Finalize", .{});
+ bun.default_allocator.destroy(this);
}
pub fn getExited(
@@ -1203,18 +1141,27 @@ pub const Subprocess = struct {
.globalThis = globalThis,
.pid = pid,
.pidfd = pidfd,
- .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch {
+ .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], stdin_pipe[0], globalThis) catch {
globalThis.throw("out of memory", .{});
return .zero;
},
- .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis),
- .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis),
+ .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], stdout_pipe[1], globalThis),
+ .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], stderr_pipe[1], globalThis),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
+ .is_sync = is_sync,
};
if (subprocess.stdin == .pipe) {
subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin);
}
+ if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
+ subprocess.stdout.pipe.buffer.setup(jsc_vm.allocator, stdout_pipe[0], default_max_buffer_size);
+ }
+
+ if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
+ subprocess.stderr.pipe.buffer.setup(jsc_vm.allocator, stderr_pipe[0], default_max_buffer_size);
+ }
+
const out = if (comptime !is_sync)
subprocess.toJS(globalThis)
else
@@ -1223,7 +1170,7 @@ pub const Subprocess = struct {
if (comptime !is_sync) {
var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess);
subprocess.poll_ref = poll;
- switch (poll.register(
+ switch (subprocess.poll_ref.?.register(
jsc_vm.uws_event_loop.?,
.process,
true,
@@ -1252,20 +1199,20 @@ pub const Subprocess = struct {
if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
if (comptime is_sync) {
if (subprocess.stdout.pipe.buffer.canRead()) {
- subprocess.stdout.pipe.buffer.readAll(true);
+ subprocess.stdout.pipe.buffer.readAll();
}
} else if (!lazy) {
- subprocess.stdout.pipe.buffer.readIfPossible(false);
+ subprocess.stdout.pipe.buffer.readAll();
}
}
if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
if (comptime is_sync) {
if (subprocess.stderr.pipe.buffer.canRead()) {
- subprocess.stderr.pipe.buffer.readAll(true);
+ subprocess.stderr.pipe.buffer.readAll();
}
} else if (!lazy) {
- subprocess.stderr.pipe.buffer.readIfPossible(false);
+ subprocess.stderr.pipe.buffer.readAll();
}
}
@@ -1273,11 +1220,43 @@ pub const Subprocess = struct {
return out;
}
- subprocess.wait(true);
+ if (subprocess.stdin == .buffered_input) {
+ while (subprocess.stdin.buffered_input.remain.len > 0) {
+ subprocess.stdin.buffered_input.writeIfPossible(true);
+ }
+ }
+
+ {
+ var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess);
+ subprocess.poll_ref = poll;
+ switch (subprocess.poll_ref.?.register(
+ jsc_vm.uws_event_loop.?,
+ .process,
+ true,
+ )) {
+ .result => {},
+ .err => |err| {
+ if (err.getErrno() == .SRCH) {
+ @panic("This shouldn't happen");
+ }
+
+ // process has already exited
+ // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007
+ subprocess.onExitNotification();
+ },
+ }
+ }
+
+ while (!subprocess.hasExited()) {
+ jsc_vm.tick();
+ jsc_vm.eventLoop().autoTick();
+ }
+
+ // subprocess.wait(true);
const exitCode = subprocess.exit_code orelse 1;
const stdout = subprocess.stdout.toBufferedValue(globalThis);
const stderr = subprocess.stderr.toBufferedValue(globalThis);
- subprocess.finalize();
+ subprocess.finalizeSync();
const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4);
sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@intCast(i32, exitCode)));
@@ -1290,16 +1269,22 @@ pub const Subprocess = struct {
pub fn onExitNotification(
this: *Subprocess,
) void {
- this.wait(false);
+ this.wait(this.is_sync);
}
pub fn wait(this: *Subprocess, sync: bool) void {
if (this.has_waitpid_task) {
return;
}
-
+ defer this.updateHasPendingActivityFlag();
this.has_waitpid_task = true;
const pid = this.pid;
+
+ if (!sync) {
+ // signal to the other end we are definitely done
+ this.stdin.close();
+ }
+
switch (PosixSpawn.waitpid(pid, 0)) {
.err => |err| {
this.waitpid_err = err;
@@ -1331,14 +1316,16 @@ pub const Subprocess = struct {
poll.deinitWithVM(vm);
}
- this.onExit();
+ this.onExit(this.globalThis);
}
}
- fn onExit(this: *Subprocess) void {
- defer this.updateHasPendingActivity();
- this.closePorts();
+ fn onExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject) void {
+ this.stdin.close();
+ this.stdout.close();
+ this.stderr.close();
+ defer this.updateHasPendingActivity();
this.has_waitpid_task = false;
if (this.on_exit_callback.trySwap()) |callback| {
@@ -1349,7 +1336,7 @@ pub const Subprocess = struct {
const waitpid_value: JSValue =
if (this.waitpid_err) |err|
- err.toJSC(this.globalThis)
+ err.toJSC(globalThis)
else
JSC.JSValue.jsUndefined();
@@ -1359,21 +1346,21 @@ pub const Subprocess = struct {
};
const result = callback.call(
- this.globalThis,
+ globalThis,
args[0 .. @as(usize, @boolToInt(this.exit_code != null)) + @as(usize, @boolToInt(this.waitpid_err != null))],
);
- if (result.isAnyError(this.globalThis)) {
- this.globalThis.bunVM().onUnhandledError(this.globalThis, result);
+ if (result.isAnyError(globalThis)) {
+ globalThis.bunVM().onUnhandledError(globalThis, result);
}
}
if (this.exit_promise.trySwap()) |promise| {
if (this.exit_code) |code| {
- promise.asPromise().?.resolve(this.globalThis, JSValue.jsNumber(code));
+ promise.asPromise().?.resolve(globalThis, JSValue.jsNumber(code));
} else if (this.waitpid_err) |err| {
this.waitpid_err = null;
- promise.asPromise().?.reject(this.globalThis, err.toJSC(this.globalThis));
+ promise.asPromise().?.reject(globalThis, err.toJSC(globalThis));
} else {
// crash in debug mode
if (comptime Environment.allow_assert)
@@ -1395,7 +1382,7 @@ pub const Subprocess = struct {
const Stdio = union(enum) {
inherit: void,
ignore: void,
- fd: JSC.Node.FileDescriptor,
+ fd: bun.FileDescriptor,
path: JSC.Node.PathLike,
blob: JSC.WebCore.AnyBlob,
pipe: ?JSC.WebCore.ReadableStream,
@@ -1454,7 +1441,7 @@ pub const Subprocess = struct {
if (blob.needsToReadFile()) {
if (blob.store()) |store| {
if (store.data.file.pathlike == .fd) {
- if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) {
+ if (store.data.file.pathlike.fd == @intCast(bun.FileDescriptor, i)) {
stdio_array[i] = Stdio{ .inherit = {} };
} else {
switch (@intCast(std.os.fd_t, i)) {
@@ -1520,7 +1507,7 @@ pub const Subprocess = struct {
return false;
}
- const fd = @intCast(JSC.Node.FileDescriptor, fd_);
+ const fd = @intCast(bun.FileDescriptor, fd_);
switch (@intCast(std.os.fd_t, i)) {
std.os.STDIN_FILENO => {
diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig
index 91248dc5c..e99e49f07 100644
--- a/src/bun.js/api/html_rewriter.zig
+++ b/src/bun.js/api/html_rewriter.zig
@@ -817,8 +817,7 @@ fn HandlerCallback(
) (fn (*HandlerType, *LOLHTMLType) bool) {
return struct {
pub fn callback(this: *HandlerType, value: *LOLHTMLType) bool {
- if (comptime JSC.is_bindgen)
- unreachable;
+ JSC.markBinding(@src());
var zig_element = bun.default_allocator.create(ZigType) catch unreachable;
@field(zig_element, field_name) = value;
// At the end of this scope, the value is no longer valid
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index c4724b1b8..531d4830b 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1428,10 +1428,17 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
stream.value.ensureStillAlive();
- if (!stream.isLocked(this.server.globalThis)) {
- streamLog("is not locked", .{});
- this.renderMissing();
- return;
+ const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and
+ response_stream.sink.buffer.len == 0);
+
+ if (!stream.isLocked(this.server.globalThis) and !is_in_progress) {
+ if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| {
+ if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) {
+ streamLog("is not locked", .{});
+ this.renderMissing();
+ return;
+ }
+ }
}
this.resp.onAborted(*ResponseStream, ResponseStream.onAborted, &response_stream.sink);