diff options
Diffstat (limited to '')
| -rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 81 | ||||
| -rw-r--r-- | src/bun.js/webcore/streams.zig | 17 | ||||
| -rw-r--r-- | src/global.zig | 28 |
3 files changed, 84 insertions, 42 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 6bf839ca1..b6dfc666d 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -325,18 +325,15 @@ pub const Subprocess = struct { this.write(@intCast(usize, @maximum(size_or_offset, 0))); } + pub fn canWrite(this: *BufferedInput) bool { + return bun.isWritable(this.fd); + } + pub fn writeIfPossible(this: *BufferedInput) void { // we ask, "Is it possible to write right now?" // we do this rather than epoll or kqueue() // because we don't want to block the thread waiting for the write - var polls = &[_]std.os.pollfd{ - .{ - .fd = this.fd, - .events = std.os.POLL.OUT | std.os.POLL.ERR, - .revents = 0, - }, - }; - if ((std.os.poll(polls, 0) catch 0) == 0) { + if (!this.canWrite()) { this.watch(this.fd); return; } @@ -438,19 +435,16 @@ pub const Subprocess = struct { this.readAll(); } + pub fn canRead(this: *BufferedOutput) bool { + return bun.isReadable(this.fd); + } + pub fn readIfPossible(this: *BufferedOutput) void { // we ask, "Is it possible to read right now?" // we do this rather than epoll or kqueue() // because we don't want to block the thread waiting for the read - var polls = &[_]std.os.pollfd{ - .{ - .fd = this.fd, - .events = std.os.POLL.IN | std.os.POLL.ERR, - .revents = 0, - }, - }; - - if ((std.os.poll(polls, 0) catch 0) == 0) { + if (!this.canRead()) { + this.watch(this.fd); return; } @@ -493,19 +487,19 @@ pub const Subprocess = struct { .result => |bytes_read| { log("readAll() {d}", .{bytes_read}); - if (bytes_read == 0) { + if (bytes_read > 0) { + if (buf.ptr == available.ptr) { + this.internal_buffer.len += @truncate(u32, bytes_read); + } else { + _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory"); + } + } + + if (buf[bytes_read..].len > 0 or !this.canRead()) { this.watch(this.fd); this.received_eof = true; return; } - - if (buf.ptr == available.ptr) { - this.internal_buffer.len += @truncate(u32, bytes_read); - } else { - _ = this.internal_buffer.write(bun.default_allocator, buf[0..bytes_read]) catch @panic("Ran out of memory"); - } - - continue; }, } } @@ -520,12 +514,11 @@ pub const Subprocess = struct { } pub fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream { - if (this.poll_ref.isActive()) - this.unwatch(this.fd); - if (exited) { // exited + received EOF => no more read() if (this.received_eof) { + this.autoCloseFileDescriptor(); + // also no data at all if (this.internal_buffer.len == 0) { this.close(); @@ -549,6 +542,12 @@ pub const Subprocess = struct { std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); + // BufferedOutput is going away + // let's make sure we don't watch it anymore + if (this.poll_ref.isActive()) { + this.unwatch(this.fd); + } + // There could still be data waiting to be read in the pipe // so we need to create a new stream that will read from the // pipe and then return the blob. @@ -571,14 +570,20 @@ pub const Subprocess = struct { return result; } - pub fn close(this: *BufferedOutput) void { - if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { - if (this.poll_ref.isActive()) - this.unwatch(this.fd); + pub fn autoCloseFileDescriptor(this: *BufferedOutput) void { + const fd = this.fd; + if (fd == std.math.maxInt(JSC.Node.FileDescriptor)) + return; + this.fd = std.math.maxInt(JSC.Node.FileDescriptor); - _ = JSC.Node.Syscall.close(this.fd); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); - } + if (this.poll_ref.isActive()) + this.unwatch(fd); + + _ = JSC.Node.Syscall.close(fd); + } + + pub fn close(this: *BufferedOutput) void { + this.autoCloseFileDescriptor(); if (this.internal_buffer.cap > 0) { this.internal_buffer.listManaged(bun.default_allocator).deinit(); @@ -1180,9 +1185,7 @@ pub const Subprocess = struct { const idx: usize = if (std_fileno == 0) 0 else 1; try actions.dup2(pipe_fd[idx], std_fileno); - - if (comptime Environment.isMac) - try actions.close(pipe_fd[1 - idx]); + try actions.close(pipe_fd[1 - idx]); }, .fd => |fd| { try actions.dup2(fd, std_fileno); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 61a232b5c..15eec6262 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -3556,8 +3556,23 @@ pub const FileBlobLoader = struct { switch (rc) { .err => |err| { const retry = std.os.E.AGAIN; + const errno = brk: { + const _errno = err.getErrno(); + if (comptime Environment.isLinux) { + // EPERM and its a FIFO on Linux? Trying to read past a FIFO which has already + // sent a 0 + // Let's retry later. + if (std.os.S.ISFIFO(this.mode) and + !this.close_on_eof and _errno == .PERM) + { + break :brk .AGAIN; + } + } + + break :brk _errno; + }; - switch (err.getErrno()) { + switch (errno) { retry => { if (this.finished) { return .{ .done = {} }; diff --git a/src/global.zig b/src/global.zig index a1d33bef5..f0e98861f 100644 --- a/src/global.zig +++ b/src/global.zig @@ -336,6 +336,30 @@ pub fn assertNonBlocking(fd: anytype) void { } pub fn ensureNonBlocking(fd: anytype) void { - const current = std.os.fcntl(fd, std.os.F.GETFL, 0) catch unreachable; - _ = std.os.fcntl(fd, std.os.F.SETFL, current | std.os.O.NONBLOCK) catch unreachable; + const current = std.os.fcntl(fd, std.os.F.GETFL, 0) catch 0; + _ = std.os.fcntl(fd, std.os.F.SETFL, current | std.os.O.NONBLOCK) catch 0; +} + +pub fn isReadable(fd: std.os.fd_t) bool { + var polls = &[_]std.os.pollfd{ + .{ + .fd = fd, + .events = std.os.POLL.IN | std.os.POLL.ERR, + .revents = 0, + }, + }; + + return (std.os.poll(polls, 0) catch 0) != 0; +} + +pub fn isWritable(fd: std.os.fd_t) bool { + var polls = &[_]std.os.pollfd{ + .{ + .fd = fd, + .events = std.os.POLL.OUT | std.os.POLL.ERR, + .revents = 0, + }, + }; + + return (std.os.poll(polls, 0) catch 0) != 0; } |
