aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-11-12 18:30:12 -0800
committerGravatar GitHub <noreply@github.com> 2022-11-12 18:30:12 -0800
commit21bf3ddaf23c842dc12a1d76dbd3b48daf08f349 (patch)
tree06706104877984e9f083fed7c3278c9d007193cc /src/bun.js/api
parent514f2a8eddf1a1d35a33cc096ed7403a79afe36f (diff)
downloadbun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.tar.gz
bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.tar.zst
bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.zip
Redo how we poll pipes (#1496)
* Fix pipe * Handle unregistered * Fix failing test
Diffstat (limited to 'src/bun.js/api')
-rw-r--r--src/bun.js/api/bun/spawn.zig13
-rw-r--r--src/bun.js/api/bun/subprocess.zig199
2 files changed, 146 insertions, 66 deletions
diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig
index c1deebd3a..d594d44a7 100644
--- a/src/bun.js/api/bun/spawn.zig
+++ b/src/bun.js/api/bun/spawn.zig
@@ -208,8 +208,17 @@ pub const PosixSpawn = struct {
envp,
);
- if (Maybe(pid_t).errno(rc)) |err| {
- return err;
+ if (comptime bun.Environment.isLinux) {
+ // rc is negative because it's libc errno
+ if (rc > 0) {
+ if (Maybe(pid_t).errnoSysP(-rc, .posix_spawn, path)) |err| {
+ return err;
+ }
+ }
+ } else {
+ if (Maybe(pid_t).errnoSysP(rc, .posix_spawn, path)) |err| {
+ return err;
+ }
}
return Maybe(pid_t){ .result = pid };
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index c82b4744f..dcfa88a40 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -31,7 +31,7 @@ pub const Subprocess = struct {
killed: bool = false,
reffer: JSC.Ref = JSC.Ref.init(),
- poll_ref: JSC.PollRef = JSC.PollRef.init(),
+ poll_ref: ?*JSC.FilePoll = null,
exit_promise: JSC.Strong = .{},
@@ -56,7 +56,7 @@ pub const Subprocess = struct {
pub fn ref(this: *Subprocess) void {
this.reffer.ref(this.globalThis.bunVM());
- this.poll_ref.ref(this.globalThis.bunVM());
+ if (this.poll_ref) |poll| poll.ref(this.globalThis.bunVM());
}
pub fn unref(this: *Subprocess) void {
@@ -96,7 +96,7 @@ pub const Subprocess = struct {
return;
}
- if (this.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ if (this.buffer.fd != JSC.Node.invalid_fd) {
this.buffer.close();
}
}
@@ -184,7 +184,7 @@ pub const Subprocess = struct {
// TODO: handle when there's pending unread data in the pipe
// For some reason, this currently hangs forever
- if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) {
if (this.pipe.buffer.canRead())
this.pipe.buffer.readIfPossible(true);
}
@@ -349,8 +349,8 @@ pub const Subprocess = struct {
pub const BufferedInput = struct {
remain: []const u8 = "",
- fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor),
- poll_ref: JSC.PollRef = .{},
+ fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd,
+ poll_ref: ?*JSC.FilePoll = null,
written: usize = 0,
source: union(enum) {
@@ -358,14 +358,23 @@ pub const Subprocess = struct {
array_buffer: JSC.ArrayBuffer.Strong,
},
- pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady);
+ pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .writable, onReady);
pub fn onReady(this: *BufferedInput, _: i64) void {
this.write();
}
pub fn canWrite(this: *BufferedInput) bool {
- return bun.isWritable(this.fd);
+ const is_writable = bun.isWritable(this.fd);
+ if (is_writable) {
+ if (this.poll_ref) |poll_ref| {
+ poll_ref.flags.insert(.writable);
+ poll_ref.flags.insert(.fifo);
+ std.debug.assert(poll_ref.flags.contains(.poll_writable));
+ }
+ }
+
+ return is_writable;
}
pub fn writeIfPossible(this: *BufferedInput, comptime is_sync: bool) void {
@@ -376,6 +385,7 @@ pub const Subprocess = struct {
// because we don't want to block the thread waiting for the write
if (!this.canWrite()) {
this.watch(this.fd);
+ this.poll_ref.?.flags.insert(.fifo);
return;
}
}
@@ -387,7 +397,6 @@ pub const Subprocess = struct {
var to_write = this.remain;
if (to_write.len == 0) {
- if (this.poll_ref.isActive()) this.unwatch(this.fd);
// we are done!
this.closeFDIfOpen();
return;
@@ -406,6 +415,7 @@ pub const Subprocess = struct {
});
this.watch(this.fd);
+ this.poll_ref.?.flags.insert(.fifo);
return;
}
@@ -445,11 +455,14 @@ pub const Subprocess = struct {
}
fn closeFDIfOpen(this: *BufferedInput) void {
- if (this.poll_ref.isActive()) this.unwatch(this.fd);
+ if (this.poll_ref) |poll| {
+ this.poll_ref = null;
+ poll.deinit();
+ }
- if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ if (this.fd != JSC.Node.invalid_fd) {
_ = JSC.Node.Syscall.close(this.fd);
- this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ this.fd = JSC.Node.invalid_fd;
}
}
@@ -470,12 +483,12 @@ pub const Subprocess = struct {
pub const BufferedOutput = struct {
internal_buffer: bun.ByteList = .{},
max_internal_buffer: u32 = default_max_buffer_size,
- fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor),
+ fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd,
received_eof: bool = false,
pending_error: ?JSC.Node.Syscall.Error = null,
- poll_ref: JSC.PollRef = .{},
+ poll_ref: ?*JSC.FilePoll = null,
- pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .read, ready);
+ pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready);
pub fn ready(this: *BufferedOutput, _: i64) void {
// TODO: what happens if the task was already enqueued after unwatch()?
@@ -483,7 +496,17 @@ pub const Subprocess = struct {
}
pub fn canRead(this: *BufferedOutput) bool {
- return bun.isReadable(this.fd);
+ const is_readable = bun.isReadable(this.fd);
+
+ if (is_readable) {
+ if (this.poll_ref) |poll_ref| {
+ poll_ref.flags.insert(.readable);
+ poll_ref.flags.insert(.fifo);
+ std.debug.assert(poll_ref.flags.contains(.poll_readable));
+ }
+ }
+
+ return is_readable;
}
pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void {
@@ -495,6 +518,7 @@ pub const Subprocess = struct {
// and we don't want this to become an event loop ticking point
if (!this.canRead()) {
this.watch(this.fd);
+ this.poll_ref.?.flags.insert(.fifo);
return;
}
}
@@ -502,9 +526,33 @@ pub const Subprocess = struct {
this.readAll(force);
}
+ pub fn closeOnEOF(this: *BufferedOutput) bool {
+ var poll = this.poll_ref orelse return true;
+ poll.flags.insert(.eof);
+ return false;
+ }
+
pub fn readAll(this: *BufferedOutput, comptime force: bool) void {
+ if (this.poll_ref) |poll| {
+ const is_readable = poll.isReadable();
+ if (!is_readable and poll.isEOF()) {
+ if (poll.isHUP()) {
+ this.autoCloseFileDescriptor();
+ }
+
+ return;
+ } else if (!is_readable and poll.isHUP()) {
+ this.autoCloseFileDescriptor();
+ return;
+ } else if (!is_readable) {
+ if (comptime !force) {
+ return;
+ }
+ }
+ }
+
// read as much as we can from the pipe
- while (this.internal_buffer.len <= this.max_internal_buffer) {
+ while (this.internal_buffer.len < this.max_internal_buffer) {
var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined;
var buf: []u8 = buffer_[0..];
@@ -517,7 +565,9 @@ pub const Subprocess = struct {
switch (JSC.Node.Syscall.read(this.fd, buf)) {
.err => |e| {
if (e.isRetry()) {
- this.watch(this.fd);
+ if (!this.isWatching())
+ this.watch(this.fd);
+ this.poll_ref.?.flags.insert(.fifo);
return;
}
@@ -558,7 +608,9 @@ pub const Subprocess = struct {
if (comptime !force) {
if (buf[bytes_read..].len > 0 or !this.canRead()) {
- this.watch(this.fd);
+ if (!this.isWatching())
+ this.watch(this.fd);
+ this.poll_ref.?.flags.insert(.fifo);
this.received_eof = true;
return;
}
@@ -566,6 +618,10 @@ pub const Subprocess = struct {
// we consider a short read as being EOF
this.received_eof = this.received_eof or bytes_read < buf.len;
if (this.received_eof) {
+ if (this.closeOnEOF()) {
+ this.autoCloseFileDescriptor();
+ }
+
// do not auto-close the file descriptor here
// it's totally legit to have a short read
return;
@@ -579,7 +635,7 @@ pub const Subprocess = struct {
pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob {
const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis);
this.internal_buffer = bun.ByteList.init("");
- std.debug.assert(this.fd == std.math.maxInt(JSC.Node.FileDescriptor));
+ std.debug.assert(this.fd == JSC.Node.invalid_fd);
std.debug.assert(this.received_eof);
return blob;
}
@@ -600,55 +656,61 @@ pub const Subprocess = struct {
).?;
}
+ var poll_ref = this.poll_ref;
+ this.poll_ref = null;
+
return JSC.WebCore.ReadableStream.fromJS(
- JSC.WebCore.ReadableStream.fromBlob(
+ JSC.WebCore.ReadableStream.fromBlobWithPoll(
globalThis,
&this.toBlob(globalThis),
0,
+ poll_ref,
),
globalThis,
).?;
}
}
- std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor));
-
- // BufferedOutput is going away
- // let's make sure we don't watch it anymore
- if (this.poll_ref.isActive()) {
- this.unwatch(this.fd);
- }
-
- // There could still be data waiting to be read in the pipe
- // so we need to create a new stream that will read from the
- // pipe and then return the blob.
- var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis);
- const result = JSC.WebCore.ReadableStream.fromJS(
- JSC.WebCore.ReadableStream.fromBlob(
+ std.debug.assert(this.fd != JSC.Node.invalid_fd);
+ {
+ var poll_ref = this.poll_ref;
+ this.poll_ref = null;
+
+ // There could still be data waiting to be read in the pipe
+ // so we need to create a new stream that will read from the
+ // pipe and then return the blob.
+ var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis);
+ const result = JSC.WebCore.ReadableStream.fromJS(
+ JSC.WebCore.ReadableStream.fromBlobWithPoll(
+ globalThis,
+ &blob,
+ 0,
+ poll_ref,
+ ),
globalThis,
- &blob,
- 0,
- ),
- globalThis,
- ).?;
- blob.detach();
- result.ptr.File.buffered_data = this.internal_buffer;
- result.ptr.File.stored_global_this_ = globalThis;
- result.ptr.File.finished = exited;
- this.internal_buffer = bun.ByteList.init("");
- this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
- this.received_eof = false;
- return result;
+ ).?;
+ blob.detach();
+
+ result.ptr.File.buffered_data = this.internal_buffer;
+ result.ptr.File.stored_global_this_ = globalThis;
+ result.ptr.File.finished = exited;
+ this.internal_buffer = bun.ByteList.init("");
+ this.fd = JSC.Node.invalid_fd;
+ this.received_eof = false;
+ return result;
+ }
}
pub fn autoCloseFileDescriptor(this: *BufferedOutput) void {
const fd = this.fd;
- if (fd == std.math.maxInt(JSC.Node.FileDescriptor))
+ if (fd == JSC.Node.invalid_fd)
return;
- this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ this.fd = JSC.Node.invalid_fd;
- if (this.poll_ref.isActive())
- this.unwatch(fd);
+ if (this.poll_ref) |poll| {
+ this.poll_ref = null;
+ poll.deinit();
+ }
_ = JSC.Node.Syscall.close(fd);
}
@@ -826,8 +888,9 @@ pub const Subprocess = struct {
.items = &.{},
.capacity = 0,
};
+ var jsc_vm = globalThis.bunVM();
- var cwd = globalThis.bunVM().bundler.fs.top_level_dir;
+ var cwd = jsc_vm.bundler.fs.top_level_dir;
var stdio = [3]Stdio{
.{ .ignore = .{} },
@@ -841,13 +904,13 @@ pub const Subprocess = struct {
}
var on_exit_callback = JSValue.zero;
- var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse "";
+ var PATH = jsc_vm.bundler.env.get("PATH") orelse "";
var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined;
var cmd_value = JSValue.zero;
var args = args_;
{
if (args.isEmptyOrUndefinedOrNull()) {
- globalThis.throwInvalidArguments("cmds must be an array", .{});
+ globalThis.throwInvalidArguments("cmd must be an array", .{});
return .zero;
}
@@ -858,7 +921,7 @@ pub const Subprocess = struct {
} else if (args.get(globalThis, "cmd")) |cmd_value_| {
cmd_value = cmd_value_;
} else {
- globalThis.throwInvalidArguments("cmds must be an array", .{});
+ globalThis.throwInvalidArguments("cmd must be an array", .{});
return .zero;
}
@@ -1015,7 +1078,7 @@ pub const Subprocess = struct {
defer actions.deinit();
if (env_array.items.len == 0) {
- env_array.items = globalThis.bunVM().bundler.env.map.createNullDelimitedEnvMap(allocator) catch |err| return globalThis.handleError(err, "in posix_spawn");
+ env_array.items = jsc_vm.bundler.env.map.createNullDelimitedEnvMap(allocator) catch |err| return globalThis.handleError(err, "in posix_spawn");
env_array.capacity = env_array.items.len;
}
@@ -1100,7 +1163,7 @@ pub const Subprocess = struct {
var status: u32 = 0;
// ensure we don't leak the child process on error
_ = std.os.linux.waitpid(pid, &status, 0);
- return JSValue.jsUndefined();
+ return .zero;
},
}
};
@@ -1135,11 +1198,12 @@ pub const Subprocess = struct {
subprocess.this_jsvalue.set(globalThis, out);
if (comptime !is_sync) {
- switch (globalThis.bunVM().poller.watch(
- @intCast(JSC.Node.FileDescriptor, pidfd),
+ var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess);
+ subprocess.poll_ref = poll;
+ switch (poll.register(
+ jsc_vm.uws_event_loop.?,
.process,
- Subprocess,
- subprocess,
+ true,
)) {
.result => {},
.err => |err| {
@@ -1237,7 +1301,14 @@ pub const Subprocess = struct {
if (!sync) {
var vm = this.globalThis.bunVM();
- this.unrefWithoutGC(vm);
+ this.reffer.unref(vm);
+
+ // prevent duplicate notifications
+ if (this.poll_ref) |poll| {
+ this.poll_ref = null;
+ poll.deinitWithVM(vm);
+ }
+
this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this);
this.has_waitpid_task = true;
vm.eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task));
@@ -1245,7 +1316,7 @@ pub const Subprocess = struct {
}
pub fn unrefWithoutGC(this: *Subprocess, vm: *JSC.VirtualMachine) void {
- this.poll_ref.unref(vm);
+ if (this.poll_ref) |poll| poll.unref(vm);
this.reffer.unref(vm);
}