aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js')
-rw-r--r--src/bun.js/api/bun/subprocess.zig167
-rw-r--r--src/bun.js/base.zig12
-rw-r--r--src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp12
-rw-r--r--src/bun.js/webcore/streams.zig60
4 files changed, 206 insertions, 45 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index 14febbd00..d803704f3 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -51,6 +51,11 @@ pub const Subprocess = struct {
stdout,
stderr,
}) = .{},
+ closed: std.enums.EnumSet(enum {
+ stdin,
+ stdout,
+ stderr,
+ }) = .{},
has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
is_sync: bool = false,
@@ -76,11 +81,33 @@ pub const Subprocess = struct {
pub fn ref(this: *Subprocess) void {
var vm = this.globalThis.bunVM();
if (this.poll_ref) |poll| poll.enableKeepingProcessAlive(vm);
+ if (!this.hasCalledGetter(.stdin)) {
+ this.stdin.ref();
+ }
+
+ if (!this.hasCalledGetter(.stdout)) {
+ this.stdout.ref();
+ }
+
+ if (!this.hasCalledGetter(.stderr)) {
+ this.stdout.ref();
+ }
}
pub fn unref(this: *Subprocess) void {
var vm = this.globalThis.bunVM();
if (this.poll_ref) |poll| poll.disableKeepingProcessAlive(vm);
+ if (!this.hasCalledGetter(.stdin)) {
+ this.stdin.unref();
+ }
+
+ if (!this.hasCalledGetter(.stdout)) {
+ this.stdout.unref();
+ }
+
+ if (!this.hasCalledGetter(.stderr)) {
+ this.stdout.unref();
+ }
}
pub fn constructor(
@@ -98,6 +125,32 @@ pub const Subprocess = struct {
ignore: void,
closed: void,
+ pub fn ref(this: *Readable) void {
+ switch (this.*) {
+ .pipe => {
+ if (this.pipe == .buffer) {
+ if (this.pipe.buffer.fifo.poll_ref) |poll| {
+ poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
+ }
+ },
+ else => {},
+ }
+ }
+
+ pub fn unref(this: *Readable) void {
+ switch (this.*) {
+ .pipe => {
+ if (this.pipe == .buffer) {
+ if (this.pipe.buffer.fifo.poll_ref) |poll| {
+ poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
+ }
+ },
+ else => {},
+ }
+ }
+
pub const Pipe = union(enum) {
stream: JSC.WebCore.ReadableStream,
buffer: BufferedOutput,
@@ -167,8 +220,23 @@ pub const Subprocess = struct {
},
else => {},
}
+ }
- this.* = .closed;
+ pub fn finalize(this: *Readable) void {
+ switch (this.*) {
+ .fd => |fd| {
+ _ = JSC.Node.Syscall.close(fd);
+ },
+ .pipe => {
+ if (this.pipe == .stream and this.pipe.stream.ptr == .File) {
+ this.close();
+ return;
+ }
+
+ this.pipe.buffer.close();
+ },
+ else => {},
+ }
}
pub fn toJS(this: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue {
@@ -191,10 +259,8 @@ pub const Subprocess = struct {
return JSValue.jsNumber(fd);
},
.pipe => {
- defer this.close();
-
- if (this.pipe.buffer.canRead())
- this.pipe.buffer.readAll();
+ this.pipe.buffer.fifo.close_on_empty_read = true;
+ this.pipe.buffer.readAll();
var bytes = this.pipe.buffer.internal_buffer.slice();
this.pipe.buffer.internal_buffer = .{};
@@ -519,11 +585,6 @@ pub const Subprocess = struct {
.allocator = allocator,
.buffer = &this.internal_buffer,
};
- this.watch();
- }
-
- pub fn canRead(this: *BufferedOutput) bool {
- return bun.isReadable(this.fifo.fd) == .ready;
}
pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void {
@@ -549,7 +610,7 @@ pub const Subprocess = struct {
if (slice.len > 0)
std.debug.assert(this.internal_buffer.contains(slice));
- if (result.isDone()) {
+ if (result.isDone() or (slice.len == 0 and this.fifo.poll_ref != null and this.fifo.poll_ref.?.isHUP())) {
this.status = .{ .done = {} };
this.fifo.close();
}
@@ -602,6 +663,8 @@ pub const Subprocess = struct {
}
fn watch(this: *BufferedOutput) void {
+ std.debug.assert(this.fifo.fd != bun.invalid_fd);
+
this.fifo.pending.set(BufferedOutput, this, onRead);
if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd);
return;
@@ -637,8 +700,6 @@ pub const Subprocess = struct {
),
globalThis,
).?;
- } else {
- this.fifo.close_on_empty_read = true;
}
}
@@ -657,7 +718,8 @@ pub const Subprocess = struct {
),
globalThis,
).?;
-
+ this.fifo.fd = bun.invalid_fd;
+ this.fifo.poll_ref = null;
return result;
}
}
@@ -690,6 +752,28 @@ pub const Subprocess = struct {
inherit: void,
ignore: void,
+ pub fn ref(this: *Writable) void {
+ switch (this.*) {
+ .pipe => {
+ if (this.pipe.poll_ref) |poll| {
+ poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
+ },
+ else => {},
+ }
+ }
+
+ pub fn unref(this: *Writable) void {
+ switch (this.*) {
+ .pipe => {
+ if (this.pipe.poll_ref) |poll| {
+ poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm);
+ }
+ },
+ else => {},
+ }
+ }
+
// When the stream has closed we need to be notified to prevent a use-after-free
// We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice
pub fn onClose(this: *Writable, _: ?JSC.Node.Syscall.Error) void {
@@ -761,7 +845,7 @@ pub const Subprocess = struct {
};
}
- pub fn close(this: *Writable) void {
+ pub fn finalize(this: *Writable) void {
return switch (this.*) {
.pipe => |pipe| {
pipe.close();
@@ -780,13 +864,50 @@ pub const Subprocess = struct {
.inherit => {},
};
}
+
+ pub fn close(this: *Writable) void {
+ return switch (this.*) {
+ .pipe => {},
+ .pipe_to_readable_stream => |*pipe_to_readable_stream| {
+ _ = pipe_to_readable_stream.pipe.end(null);
+ },
+ .fd => |fd| {
+ _ = JSC.Node.Syscall.close(fd);
+ this.* = .{ .ignore = {} };
+ },
+ .buffered_input => {
+ this.buffered_input.deinit();
+ },
+ .ignore => {},
+ .inherit => {},
+ };
+ }
};
+ fn closeIO(this: *Subprocess, comptime io: @Type(.EnumLiteral)) void {
+ if (this.closed.contains(io)) return;
+ this.closed.insert(io);
+
+ // If you never referenced stdout/stderr, they won't be garbage collected.
+ //
+ // That means:
+ // 1. We need to stop watching them
+ // 2. We need to free the memory
+ // 3. We need to halt any pending reads (1)
+ if (!this.hasCalledGetter(io)) {
+ @field(this, @tagName(io)).finalize();
+ } else {
+ @field(this, @tagName(io)).close();
+ }
+ }
+
+ // This must only be run once per Subprocess
pub fn finalizeSync(this: *Subprocess) void {
this.closeProcess();
- this.stdin.close();
- this.stderr.close();
- this.stdout.close();
+
+ this.closeIO(.stdin);
+ this.closeIO(.stdout);
+ this.closeIO(.stderr);
this.exit_promise.deinit();
this.on_exit_callback.deinit();
@@ -1220,9 +1341,7 @@ pub const Subprocess = struct {
if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
if (comptime is_sync) {
- if (subprocess.stdout.pipe.buffer.canRead()) {
- subprocess.stdout.pipe.buffer.readAll();
- }
+ subprocess.stdout.pipe.buffer.readAll();
} else if (!lazy) {
subprocess.stdout.pipe.buffer.readAll();
}
@@ -1230,9 +1349,7 @@ pub const Subprocess = struct {
if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
if (comptime is_sync) {
- if (subprocess.stderr.pipe.buffer.canRead()) {
- subprocess.stderr.pipe.buffer.readAll();
- }
+ subprocess.stderr.pipe.buffer.readAll();
} else if (!lazy) {
subprocess.stderr.pipe.buffer.readAll();
}
@@ -1298,7 +1415,7 @@ pub const Subprocess = struct {
if (this.has_waitpid_task) {
return;
}
- defer this.updateHasPendingActivityFlag();
+ defer if (sync) this.updateHasPendingActivityFlag();
this.has_waitpid_task = true;
const pid = this.pid;
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index 9acf675cb..dc0b98e61 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -3329,6 +3329,8 @@ pub const FilePoll = struct {
return this.flags.contains(.poll_writable) or this.flags.contains(.poll_readable) or this.flags.contains(.poll_process);
}
+ const kqueue_or_epoll = if (Environment.isMac) "kevent" else "epoll";
+
pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop, size_or_offset: i64) void {
if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) {
if (poll.flags.contains(.has_incremented_poll_count)) poll.deactivate(loop);
@@ -3337,23 +3339,23 @@ pub const FilePoll = struct {
var ptr = poll.owner;
switch (ptr.tag()) {
@field(Owner.Tag, "FIFO") => {
- log("onUpdate: FIFO", .{});
- ptr.as(FIFO).ready(size_or_offset);
+ log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) FIFO", .{poll.fd});
+ ptr.as(FIFO).ready(size_or_offset, poll.flags.contains(.hup));
},
@field(Owner.Tag, "Subprocess") => {
- log("onUpdate: Subprocess", .{});
+ log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) Subprocess", .{poll.fd});
var loader = ptr.as(JSC.Subprocess);
loader.onExitNotification();
},
@field(Owner.Tag, "FileSink") => {
- log("onUpdate: FileSink", .{});
+ log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) FileSink", .{poll.fd});
var loader = ptr.as(JSC.WebCore.FileSink);
loader.onPoll(size_or_offset, 0);
},
else => {
- log("onUpdate: disconnected?", .{});
+ log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) disconnected?", .{poll.fd});
},
}
}
diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
index 72231d8b3..4e08d5f38 100644
--- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
+++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
@@ -2268,7 +2268,7 @@ const char* const s_readableStreamInternalsReadableStreamDefaultControllerCanClo
const JSC::ConstructAbility s_readableStreamInternalsLazyLoadStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamInternalsLazyLoadStreamCodeConstructorKind = JSC::ConstructorKind::None;
const JSC::ImplementationVisibility s_readableStreamInternalsLazyLoadStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
-const int s_readableStreamInternalsLazyLoadStreamCodeLength = 3840;
+const int s_readableStreamInternalsLazyLoadStreamCodeLength = 3983;
static const JSC::Intrinsic s_readableStreamInternalsLazyLoadStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamInternalsLazyLoadStreamCode =
"(function (stream, autoAllocateChunkSize) {\n" \
@@ -2289,6 +2289,14 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" handleResult(val, c, v);\n" \
" }\n" \
"\n" \
+ " function callClose(controller) {\n" \
+ " try {\n" \
+ " controller.close();\n" \
+ " } catch(e) {\n" \
+ " globalThis.reportError(e);\n" \
+ " }\n" \
+ " }\n" \
+ "\n" \
" handleResult = function handleResult(result, controller, view) {\n" \
" \"use strict\";\n" \
" if (result && @isPromise(result)) {\n" \
@@ -2310,7 +2318,7 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" }\n" \
"\n" \
" if (closer[0] || result === false) {\n" \
- " @enqueueJob(() => controller.close());\n" \
+ " @enqueueJob(callClose, controller);\n" \
" closer[0] = false;\n" \
" }\n" \
" };\n" \
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 7f82d694e..263525ab8 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -308,6 +308,7 @@ pub const ReadableStream = struct {
reader.context.lazy_readable.readable.FIFO.pending.future = undefined;
reader.context.lazy_readable.readable.FIFO.auto_sizer = null;
reader.context.lazy_readable.readable.FIFO.pending.state = .none;
+ reader.context.lazy_readable.readable.FIFO.drained = buffered_data.len == 0;
return reader.toJS(globalThis);
}
@@ -3517,6 +3518,7 @@ pub const FIFO = struct {
is_first_read: bool = true,
auto_close: bool = true,
has_adjusted_pipe_size_on_linux: bool = false,
+ drained: bool = true,
pub usingnamespace NewReadyWatcher(@This(), .readable, ready);
@@ -3610,6 +3612,12 @@ pub const FIFO = struct {
this.close_on_empty_read = false;
return null;
},
+ // we need to read the 0 at the end or else we are not truly done
+ .hup => {
+ this.close_on_empty_read = true;
+ poll.flags.insert(.hup);
+ return null;
+ },
else => {},
}
@@ -3647,7 +3655,11 @@ pub const FIFO = struct {
} else if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) {
// we don't know if it's readable or not
return switch (bun.isReadable(this.fd)) {
- .hup, .ready => null,
+ .hup => {
+ this.close_on_empty_read = true;
+ return null;
+ },
+ .ready => null,
else => ReadResult{ .pending = {} },
};
}
@@ -3666,17 +3678,34 @@ pub const FIFO = struct {
return this.to_read;
}
- pub fn ready(this: *FIFO, sizeOrOffset: i64) void {
+ pub fn ready(this: *FIFO, sizeOrOffset: i64, is_hup: bool) void {
if (this.isClosed()) {
if (this.isWatching())
this.unwatch(this.poll_ref.?.fd);
return;
}
- if (this.buf.len == 0) {
+ if (comptime Environment.isMac) {
+ if (sizeOrOffset == 0 and is_hup and this.drained) {
+ this.close();
+ return;
+ }
+ } else if (is_hup and this.drained and this.getAvailableToReadOnLinux() == 0) {
+ this.close();
return;
}
+ if (this.buf.len == 0) {
+ var auto_sizer = this.auto_sizer orelse return;
+ if (comptime Environment.isMac) {
+ if (sizeOrOffset > 0) {
+ this.buf = auto_sizer.resize(@intCast(usize, sizeOrOffset)) catch return;
+ } else {
+ this.buf = auto_sizer.resize(8096) catch return;
+ }
+ }
+ }
+
const read_result = this.read(
this.buf,
// On Linux, we end up calling ioctl() twice if we don't do this
@@ -3687,13 +3716,6 @@ pub const FIFO = struct {
null,
);
- if (read_result == .read and read_result.read.len == 0) {
- if (this.isWatching())
- this.unwatch(this.poll_ref.?.fd);
- this.close();
- return;
- }
-
if (read_result == .read) {
if (this.to_read) |*to_read| {
to_read.* = to_read.* -| @truncate(u32, read_result.read.len);
@@ -3768,6 +3790,7 @@ pub const FIFO = struct {
}
var buf = buf_;
+ std.debug.assert(buf.len > 0);
if (available_to_read) |amt| {
if (amt >= buf.len) {
@@ -3828,9 +3851,9 @@ pub const FIFO = struct {
}
}
- if (result == 0)
+ if (result == 0) {
return .{ .read = buf[0..0] };
-
+ }
return .{ .read = buf[0..result] };
},
}
@@ -4238,6 +4261,14 @@ pub const FileReader = struct {
blob: *Blob.Store,
empty: void,
+ pub fn onDrain(this: *Lazy) void {
+ if (this.* == .readable) {
+ if (this.readable == .FIFO) {
+ this.readable.FIFO.drained = true;
+ }
+ }
+ }
+
pub fn finish(this: *Lazy) void {
switch (this.readable) {
.FIFO => {
@@ -4380,6 +4411,7 @@ pub const FileReader = struct {
.FIFO = FIFO{
.fd = readable_file.fd,
.auto_close = readable_file.auto_close,
+ .drained = this.buffered_data.len == 0,
},
},
};
@@ -4394,7 +4426,8 @@ pub const FileReader = struct {
.readable => {},
.empty => return .{ .empty = {} },
}
- }
+ } else if (this.lazy_readable == .empty)
+ return .{ .empty = {} };
if (this.readable().* == .File) {
const chunk_size = this.readable().File.calculateChunkSize(std.math.maxInt(usize));
@@ -4463,6 +4496,7 @@ pub const FileReader = struct {
pub fn drainInternalBuffer(this: *FileReader) bun.ByteList {
const buffered = this.buffered_data;
+ this.lazy_readable.onDrain();
if (buffered.cap > 0) {
this.buffered_data = .{};
}