aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/bun.js/api/bun/subprocess.zig81
-rw-r--r--src/bun.js/webcore/streams.zig17
-rw-r--r--src/global.zig28
3 files changed, 84 insertions, 42 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index 6bf839ca1..b6dfc666d 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -325,18 +325,15 @@ pub const Subprocess = struct {
this.write(@intCast(usize, @maximum(size_or_offset, 0)));
}
+ pub fn canWrite(this: *BufferedInput) bool {
+ return bun.isWritable(this.fd);
+ }
+
pub fn writeIfPossible(this: *BufferedInput) void {
// we ask, "Is it possible to write right now?"
// we do this rather than epoll or kqueue()
// because we don't want to block the thread waiting for the write
- var polls = &[_]std.os.pollfd{
- .{
- .fd = this.fd,
- .events = std.os.POLL.OUT | std.os.POLL.ERR,
- .revents = 0,
- },
- };
- if ((std.os.poll(polls, 0) catch 0) == 0) {
+ if (!this.canWrite()) {
this.watch(this.fd);
return;
}
@@ -438,19 +435,16 @@ pub const Subprocess = struct {
this.readAll();
}
+ pub fn canRead(this: *BufferedOutput) bool {
+ return bun.isReadable(this.fd);
+ }
+
pub fn readIfPossible(this: *BufferedOutput) void {
// we ask, "Is it possible to read right now?"
// we do this rather than epoll or kqueue()
// because we don't want to block the thread waiting for the read
- var polls = &[_]std.os.pollfd{
- .{
- .fd = this.fd,
- .events = std.os.POLL.IN | std.os.POLL.ERR,
- .revents = 0,
- },
- };
-
- if ((std.os.poll(polls, 0) catch 0) == 0) {
+ if (!this.canRead()) {
+ this.watch(this.fd);
return;
}
@@ -493,19 +487,19 @@ pub const Subprocess = struct {
.result => |bytes_read| {
log("readAll() {d}", .{bytes_read});
- if (bytes_read == 0) {
+ if (bytes_read > 0) {
+ if (buf.ptr == available.ptr) {
+ this.internal_buffer.len += @truncate(u32, bytes_read);
+ } else {
+ _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory");
+ }
+ }
+
+ if (buf[bytes_read..].len > 0 or !this.canRead()) {
this.watch(this.fd);
this.received_eof = true;
return;
}
-
- if (buf.ptr == available.ptr) {
- this.internal_buffer.len += @truncate(u32, bytes_read);
- } else {
- _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory");
- }
-
- continue;
},
}
}
@@ -520,12 +514,11 @@ pub const Subprocess = struct {
}
pub fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream {
- if (this.poll_ref.isActive())
- this.unwatch(this.fd);
-
if (exited) {
// exited + received EOF => no more read()
if (this.received_eof) {
+ this.autoCloseFileDescriptor();
+
// also no data at all
if (this.internal_buffer.len == 0) {
this.close();
@@ -549,6 +542,12 @@ pub const Subprocess = struct {
std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor));
+ // BufferedOutput is going away
+ // let's make sure we don't watch it anymore
+ if (this.poll_ref.isActive()) {
+ this.unwatch(this.fd);
+ }
+
// There could still be data waiting to be read in the pipe
// so we need to create a new stream that will read from the
// pipe and then return the blob.
@@ -571,14 +570,20 @@ pub const Subprocess = struct {
return result;
}
- pub fn close(this: *BufferedOutput) void {
- if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
- if (this.poll_ref.isActive())
- this.unwatch(this.fd);
+ pub fn autoCloseFileDescriptor(this: *BufferedOutput) void {
+ const fd = this.fd;
+ if (fd == std.math.maxInt(JSC.Node.FileDescriptor))
+ return;
+ this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
- _ = JSC.Node.Syscall.close(this.fd);
- this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
- }
+ if (this.poll_ref.isActive())
+ this.unwatch(fd);
+
+ _ = JSC.Node.Syscall.close(fd);
+ }
+
+ pub fn close(this: *BufferedOutput) void {
+ this.autoCloseFileDescriptor();
if (this.internal_buffer.cap > 0) {
this.internal_buffer.listManaged(bun.default_allocator).deinit();
@@ -1180,9 +1185,7 @@ pub const Subprocess = struct {
const idx: usize = if (std_fileno == 0) 0 else 1;
try actions.dup2(pipe_fd[idx], std_fileno);
-
- if (comptime Environment.isMac)
- try actions.close(pipe_fd[1 - idx]);
+ try actions.close(pipe_fd[1 - idx]);
},
.fd => |fd| {
try actions.dup2(fd, std_fileno);
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 61a232b5c..15eec6262 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -3556,8 +3556,23 @@ pub const FileBlobLoader = struct {
switch (rc) {
.err => |err| {
const retry = std.os.E.AGAIN;
+ const errno = brk: {
+ const _errno = err.getErrno();
+ if (comptime Environment.isLinux) {
+ // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already
+ // sent a 0
+ // Let's retry later.
+ if (std.os.S.ISFIFO(this.mode) and
+ !this.close_on_eof and _errno == .PERM)
+ {
+ break :brk .AGAIN;
+ }
+ }
+
+ break :brk _errno;
+ };
- switch (err.getErrno()) {
+ switch (errno) {
retry => {
if (this.finished) {
return .{ .done = {} };
diff --git a/src/global.zig b/src/global.zig
index a1d33bef5..f0e98861f 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -336,6 +336,30 @@ pub fn assertNonBlocking(fd: anytype) void {
}
pub fn ensureNonBlocking(fd: anytype) void {
- const current = std.os.fcntl(fd, std.os.F.GETFL, 0) catch unreachable;
- _ = std.os.fcntl(fd, std.os.F.SETFL, current | std.os.O.NONBLOCK) catch unreachable;
+ const current = std.os.fcntl(fd, std.os.F.GETFL, 0) catch 0;
+ _ = std.os.fcntl(fd, std.os.F.SETFL, current | std.os.O.NONBLOCK) catch 0;
+}
+
+pub fn isReadable(fd: std.os.fd_t) bool {
+ var polls = &[_]std.os.pollfd{
+ .{
+ .fd = fd,
+ .events = std.os.POLL.IN | std.os.POLL.ERR,
+ .revents = 0,
+ },
+ };
+
+ return (std.os.poll(polls, 0) catch 0) != 0;
+}
+
+pub fn isWritable(fd: std.os.fd_t) bool {
+ var polls = &[_]std.os.pollfd{
+ .{
+ .fd = fd,
+ .events = std.os.POLL.OUT | std.os.POLL.ERR,
+ .revents = 0,
+ },
+ };
+
+ return (std.os.poll(polls, 0) catch 0) != 0;
}