aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-11-14 04:08:36 -0800
committerGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-11-14 04:08:36 -0800
commit20eff9f6d21742540a7bf68c3dc364bf68b23504 (patch)
tree0b4b61987ab18b18d221d0c944960b72bb7a1fdf /src
parent69eedb4c9232c7c3e75bc9f17a6ca81c06adcfe0 (diff)
downloadbun-20eff9f6d21742540a7bf68c3dc364bf68b23504.tar.gz
bun-20eff9f6d21742540a7bf68c3dc364bf68b23504.tar.zst
bun-20eff9f6d21742540a7bf68c3dc364bf68b23504.zip
Bugfixes and perf improvements to child_process
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/api/bun/subprocess.zig55
-rw-r--r--src/bun.js/base.zig8
-rw-r--r--src/bun.js/child_process.exports.js14
-rw-r--r--src/bun.js/node/node_fs.zig7
-rw-r--r--src/bun.js/node/node_fs_binding.zig8
-rw-r--r--src/bun.js/node/syscall.zig65
-rw-r--r--src/bun.js/streams.exports.js72
-rw-r--r--src/bun.js/webcore/streams.zig285
-rw-r--r--src/cli/test_command.zig6
-rw-r--r--src/global.zig5
-rw-r--r--src/linux_c.zig2
11 files changed, 348 insertions, 179 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index 2d8127901..2b30449ac 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -108,11 +108,12 @@ pub const Subprocess = struct {
}
return this.stream.toJS();
}
-
+ const is_fifo = this.buffer.is_fifo;
const stream = this.buffer.toReadableStream(globalThis, exited);
this.* = .{ .stream = stream };
if (this.stream.ptr == .File) {
this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable);
+ this.stream.ptr.File.is_fifo = is_fifo;
}
return stream.value;
}
@@ -127,6 +128,7 @@ pub const Subprocess = struct {
.pipe = .{
.buffer = BufferedOutput{
.fd = fd,
+ .is_fifo = true,
},
},
};
@@ -485,10 +487,21 @@ pub const Subprocess = struct {
received_eof: bool = false,
pending_error: ?JSC.Node.Syscall.Error = null,
poll_ref: ?*JSC.FilePoll = null,
+ is_fifo: bool = false,
pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready);
- pub fn ready(this: *BufferedOutput, _: i64) void {
+ pub fn ready(this: *BufferedOutput, available_to_read: i64) void {
+ if (comptime Environment.isMac) {
+ if (this.poll_ref) |poll| {
+ if (available_to_read > 0) {
+ poll.flags.insert(.readable);
+ } else {
+ poll.flags.remove(.readable);
+ }
+ }
+ }
+
// TODO: what happens if the task was already enqueued after unwatch()?
this.readAll(false);
}
@@ -516,7 +529,6 @@ pub const Subprocess = struct {
// and we don't want this to become an event loop ticking point
if (!this.canRead()) {
this.watch(this.fd);
- this.poll_ref.?.flags.insert(.fifo);
return;
}
}
@@ -543,9 +555,7 @@ pub const Subprocess = struct {
this.autoCloseFileDescriptor();
return;
} else if (!is_readable) {
- if (comptime !force) {
- return;
- }
+ return;
}
}
@@ -563,7 +573,7 @@ pub const Subprocess = struct {
switch (JSC.Node.Syscall.read(this.fd, buf)) {
.err => |e| {
if (e.isRetry()) {
- if (!this.isWatching())
+ if (!this.isWatching() and this.isFIFO())
this.watch(this.fd);
this.poll_ref.?.flags.insert(.fifo);
return;
@@ -608,13 +618,15 @@ pub const Subprocess = struct {
if (buf[bytes_read..].len > 0 or !this.canRead()) {
if (!this.isWatching())
this.watch(this.fd);
- this.poll_ref.?.flags.insert(.fifo);
- this.received_eof = true;
+ if (this.is_fifo)
+ this.poll_ref.?.flags.insert(.fifo)
+ else
+ this.received_eof = true;
return;
}
} else {
// we consider a short read as being EOF
- this.received_eof = this.received_eof or bytes_read < buf.len;
+ this.received_eof = !this.is_fifo and this.received_eof or bytes_read < buf.len;
if (this.received_eof) {
if (this.closeOnEOF()) {
this.autoCloseFileDescriptor();
@@ -642,6 +654,9 @@ pub const Subprocess = struct {
if (exited) {
// exited + received EOF => no more read()
if (this.received_eof) {
+ var poll_ref = this.poll_ref;
+ this.poll_ref = null;
+
this.autoCloseFileDescriptor();
// also no data at all
@@ -654,9 +669,6 @@ pub const Subprocess = struct {
).?;
}
- var poll_ref = this.poll_ref;
- this.poll_ref = null;
-
return JSC.WebCore.ReadableStream.fromJS(
JSC.WebCore.ReadableStream.fromBlobWithPoll(
globalThis,
@@ -900,7 +912,7 @@ pub const Subprocess = struct {
stdio[1] = .{ .pipe = null };
stdio[2] = .{ .pipe = null };
}
-
+ var lazy = false;
var on_exit_callback = JSValue.zero;
var PATH = jsc_vm.bundler.env.get("PATH") orelse "";
var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined;
@@ -916,6 +928,9 @@ pub const Subprocess = struct {
if (args_type.isArray()) {
cmd_value = args;
args = secondaryArgsValue orelse JSValue.zero;
+ } else if (!args.isObject()) {
+ globalThis.throwInvalidArguments("cmd must be an array", .{});
+ return .zero;
} else if (args.get(globalThis, "cmd")) |cmd_value_| {
cmd_value = cmd_value_;
} else {
@@ -1054,6 +1069,14 @@ pub const Subprocess = struct {
return .zero;
}
}
+
+ if (comptime !is_sync) {
+ if (args.get(globalThis, "lazy")) |lazy_val| {
+ if (lazy_val.isBoolean()) {
+ lazy = lazy_val.toBoolean();
+ }
+ }
+ }
}
}
@@ -1229,7 +1252,7 @@ pub const Subprocess = struct {
if (subprocess.stdout.pipe.buffer.canRead()) {
subprocess.stdout.pipe.buffer.readAll(true);
}
- } else {
+ } else if (!lazy) {
subprocess.stdout.pipe.buffer.readIfPossible(false);
}
}
@@ -1239,7 +1262,7 @@ pub const Subprocess = struct {
if (subprocess.stderr.pipe.buffer.canRead()) {
subprocess.stderr.pipe.buffer.readAll(true);
}
- } else {
+ } else if (!lazy) {
subprocess.stderr.pipe.buffer.readIfPossible(false);
}
}
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index deeab246a..3d1233589 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -4199,6 +4199,14 @@ pub const FilePoll = struct {
return this.flags.contains(.has_incremented_poll_count);
}
+ pub inline fn isWatching(this: *const FilePoll) bool {
+ return !this.flags.contains(.needs_rearm) and (this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process));
+ }
+
+ pub inline fn isKeepingProcessAlive(this: *const FilePoll) bool {
+ return !this.flags.contains(.disable) and this.isActive();
+ }
+
/// Make calling ref() on this poll into a no-op.
pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
if (this.flags.contains(.disable))
diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js
index bc401cc37..a9a1589dd 100644
--- a/src/bun.js/child_process.exports.js
+++ b/src/bun.js/child_process.exports.js
@@ -78,6 +78,17 @@ const debug = process.env.DEBUG ? console.log : () => {};
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+function spawnTimeoutFunction(child, timeoutHolder) {
+ var timeoutId = timeoutHolder.timeoutId;
+ if (timeoutId > -1) {
+ try {
+ child.kill(killSignal);
+ } catch (err) {
+ child.emit("error", err);
+ }
+ timeoutHolder.timeoutId = -1;
+ }
+}
/**
* Spawns a new process using the given `file`.
* @param {string} file
@@ -537,7 +548,7 @@ export function spawnSync(file, args, options) {
if (!success) {
result.error = new SystemError(
- result.stderr,
+ result.output[2],
options.file,
"spawnSync",
-1,
@@ -1030,6 +1041,7 @@ export class ChildProcess extends EventEmitter {
cwd: options.cwd || undefined,
env: options.envPairs || undefined,
onExit: this.#handleOnExit.bind(this),
+ lazy: true,
});
this.#handleExited = this.#handle.exited;
this.#encoding = options.encoding || undefined;
diff --git a/src/bun.js/node/node_fs.zig b/src/bun.js/node/node_fs.zig
index ff9f1ba1f..b5ba2b983 100644
--- a/src/bun.js/node/node_fs.zig
+++ b/src/bun.js/node/node_fs.zig
@@ -3639,11 +3639,8 @@ pub const NodeFS = struct {
pub fn rmdir(this: *NodeFS, args: Arguments.RmDir, comptime flavor: Flavor) Maybe(Return.Rmdir) {
switch (comptime flavor) {
.sync => {
- var dir = args.old_path.sliceZ(&this.sync_error_buf);
- switch (Syscall.getErrno(system.rmdir(dir))) {
- .SUCCESS => return Maybe(Return.Rmdir).success,
- else => |err| return Maybe(Return.Rmdir).errnoSys(err, .rmdir),
- }
+ return Maybe(Return.Rmdir).errnoSysP(system.rmdir(args.path.sliceZ(&this.sync_error_buf)), .rmdir, args.path.slice()) orelse
+ Maybe(Return.Rmdir).success;
},
else => {},
}
diff --git a/src/bun.js/node/node_fs_binding.zig b/src/bun.js/node/node_fs_binding.zig
index d4d97e9a9..a76c83637 100644
--- a/src/bun.js/node/node_fs_binding.zig
+++ b/src/bun.js/node/node_fs_binding.zig
@@ -243,6 +243,10 @@ pub const NodeFSBindings = JSC.NewClass(
.name = "rm",
.rfn = call(.rm),
},
+ .rmdir = .{
+ .name = "rmdir",
+ .rfn = call(.rmdir),
+ },
.realpath = .{
.name = "realpath",
.rfn = call(.realpath),
@@ -426,6 +430,10 @@ pub const NodeFSBindings = JSC.NewClass(
.name = "rmSync",
.rfn = callSync(.rm),
},
+ .rmdirSync = .{
+ .name = "rmdirSync",
+ .rfn = callSync(.rmdir),
+ },
},
.{},
);
diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig
index 7ad927c3a..14cee4e36 100644
--- a/src/bun.js/node/syscall.zig
+++ b/src/bun.js/node/syscall.zig
@@ -203,6 +203,7 @@ pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode)
// The zig standard library marks BADF as unreachable
// That error is not unreachable for us
pub fn close(fd: std.os.fd_t) ?Syscall.Error {
+ log("close({d})", .{fd});
if (comptime Environment.isMac) {
// This avoids the EINTR problem.
return switch (system.getErrno(system.@"close$NOCANCEL"(fd))) {
@@ -682,3 +683,67 @@ pub const Error = struct {
return this.toSystemError().toErrorInstance(ptr);
}
};
+
+pub fn setPipeCapacityOnLinux(fd: JSC.Node.FileDescriptor, capacity: usize) Maybe(usize) {
+ if (comptime !Environment.isLinux) @compileError("Linux-only");
+ std.debug.assert(capacity > 0);
+
+ // In Linux versions before 2.6.11, the capacity of a
+ // pipe was the same as the system page size (e.g., 4096
+ // bytes on i386). Since Linux 2.6.11, the pipe
+ // capacity is 16 pages (i.e., 65,536 bytes in a system
+ // with a page size of 4096 bytes). Since Linux 2.6.35,
+ // the default pipe capacity is 16 pages, but the
+ // capacity can be queried and set using the
+ // fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations.
+ // See fcntl(2) for more information.
+ //:# define F_SETPIPE_SZ 1031 /* Set pipe page size array.
+ const F_SETPIPE_SZ = 1031;
+ const F_GETPIPE_SZ = 1032;
+
+ // We don't use glibc here
+ // It didn't work. Always returned 0.
+ const pipe_len = std.os.linux.fcntl(fd, F_GETPIPE_SZ, 0);
+ if (Maybe(usize).errno(pipe_len)) |err| return err;
+ if (pipe_len == 0) return Maybe(usize){ .result = 0 };
+ if (pipe_len >= capacity) return Maybe(usize){ .result = pipe_len };
+
+ const new_pipe_len = std.os.linux.fcntl(fd, F_SETPIPE_SZ, capacity);
+ if (Maybe(usize).errno(new_pipe_len)) |err| return err;
+ return Maybe(usize){ .result = new_pipe_len };
+}
+
+pub fn getMaxPipeSizeOnLinux() usize {
+ return @intCast(
+ usize,
+ bun.once(struct {
+ fn once() c_int {
+ const strings = bun.strings;
+ const default_out_size = 512 * 1024;
+ const pipe_max_size_fd = switch (JSC.Node.Syscall.open("/proc/sys/fs/pipe-max-size", std.os.O.RDONLY, 0)) {
+ .result => |fd2| fd2,
+ .err => |err| {
+ log("Failed to open /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno});
+ return default_out_size;
+ },
+ };
+ defer _ = JSC.Node.Syscall.close(pipe_max_size_fd);
+ var max_pipe_size_buf: [128]u8 = undefined;
+ const max_pipe_size = switch (JSC.Node.Syscall.read(pipe_max_size_fd, max_pipe_size_buf[0..])) {
+ .result => |bytes_read| std.fmt.parseInt(i64, strings.trim(max_pipe_size_buf[0..bytes_read], "\n"), 10) catch |err| {
+ log("Failed to parse /proc/sys/fs/pipe-max-size: {any}\n", .{@errorName(err)});
+ return default_out_size;
+ },
+ .err => |err| {
+ log("Failed to read /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno});
+ return default_out_size;
+ },
+ };
+
+ // we set the absolute max to 8 MB because honestly that's a huge pipe
+ // my current linux machine only goes up to 1 MB, so that's very unlikely to be hit
+ return @minimum(@truncate(c_int, max_pipe_size -| 32), 1024 * 1024 * 8);
+ }
+ }.once, c_int),
+ );
+}
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 15d32adb5..4e812437e 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -5679,7 +5679,7 @@ var require_ours = __commonJS({
*
*/
function createNativeStream(nativeType, Readable) {
- var [pull, start, cancel, setClose, deinit, updateRef] =
+ var [pull, start, cancel, setClose, deinit, updateRef, drainFn] =
globalThis[Symbol.for("Bun.lazy")](nativeType);
var closer = [false];
@@ -5687,7 +5687,7 @@ function createNativeStream(nativeType, Readable) {
if (result > 0) {
const slice = view.subarray(0, result);
const remainder = view.subarray(result);
- if (remainder.byteLength > 0) {
+ if (slice.byteLength > 0) {
nativeReadable.push(slice);
}
@@ -5701,6 +5701,8 @@ function createNativeStream(nativeType, Readable) {
if (isClosed) {
nativeReadable.push(null);
}
+
+ return view;
};
var handleArrayBufferViewResult = function (
@@ -5720,23 +5722,9 @@ function createNativeStream(nativeType, Readable) {
return view;
};
- var handleResult = function (nativeReadable, result, view, isClosed) {
- if (typeof result === "number") {
- return handleNumberResult(nativeReadable, result, view, isClosed);
- } else if (typeof result === "boolean") {
- nativeReadable.push(null);
- return view?.byteLength ?? 0 > 0 ? view : undefined;
- } else if (ArrayBuffer.isView(result)) {
- return handleArrayBufferViewResult(
- nativeReadable,
- result,
- view,
- isClosed,
- );
- } else {
- throw new Error("Invalid result from pull");
- }
- };
+ var DYNAMICALLY_ADJUST_CHUNK_SIZE =
+ process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1";
+
var NativeReadable = class NativeReadable extends Readable {
#ptr;
#refCount = 1;
@@ -5744,6 +5732,7 @@ function createNativeStream(nativeType, Readable) {
#remainingChunk = undefined;
#highWaterMark;
#pendingRead = false;
+ #hasResized = !DYNAMICALLY_ADJUST_CHUNK_SIZE;
constructor(ptr, options = {}) {
super(options);
if (typeof options.highWaterMark === "number") {
@@ -5767,13 +5756,24 @@ function createNativeStream(nativeType, Readable) {
}
if (!this.#constructed) {
- this.#constructed = true;
- start(ptr, this.#highWaterMark);
+ this.#internalConstruct(ptr);
}
return this.#internalRead(this.#getRemainingChunk(), ptr);
}
+ #internalConstruct(ptr) {
+ this.#constructed = true;
+ start(ptr, this.#highWaterMark);
+
+ if (drainFn) {
+ const drainResult = drainFn(ptr);
+ if ((drainResult?.byteLength ?? 0) > 0) {
+ this.push(drainResult);
+ }
+ }
+ }
+
#getRemainingChunk() {
var chunk = this.#remainingChunk;
var highWaterMark = this.#highWaterMark;
@@ -5784,23 +5784,43 @@ function createNativeStream(nativeType, Readable) {
return chunk;
}
+ #handleResult(result, view, isClosed) {
+ if (typeof result === "number") {
+ if (result >= this.#highWaterMark && !this.#hasResized) {
+ this.#highWaterMark *= 2;
+ this.#hasResized = true;
+ }
+
+ return handleNumberResult(this, result, view, isClosed);
+ } else if (typeof result === "boolean") {
+ this.push(null);
+ return view?.byteLength ?? 0 > 0 ? view : undefined;
+ } else if (ArrayBuffer.isView(result)) {
+ if (result.byteLength >= this.#highWaterMark && !this.#hasResized) {
+ this.#highWaterMark *= 2;
+ this.#hasResized = true;
+ }
+
+ return handleArrayBufferViewResult(this, result, view, isClosed);
+ } else {
+ throw new Error("Invalid result from pull");
+ }
+ }
+
#internalRead(view, ptr) {
closer[0] = false;
var result = pull(ptr, view, closer);
if (isPromise(result)) {
this.#pendingRead = true;
- var originalFlowing = this._readableState.flowing;
- this._readableState.flowing = false;
return result.then(
(result) => {
- this._readableState.flowing = originalFlowing;
this.#pendingRead = false;
- this.#remainingChunk = handleResult(this, result, view, closer[0]);
+ this.#remainingChunk = this.#handleResult(result, view, closer[0]);
},
(reason) => errorOrDestroy(this, reason),
);
} else {
- this.#remainingChunk = handleResult(this, result, view, closer[0]);
+ this.#remainingChunk = this.#handleResult(result, view, closer[0]);
}
}
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 7ec20b046..c9a3fb226 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1117,26 +1117,16 @@ pub const FileSink = struct {
}
fn adjustPipeLengthOnLinux(this: *FileSink, fd: JSC.Node.FileDescriptor, remain_len: usize) void {
- const F_SETPIPE_SZ = 1031;
- const F_GETPIPE_SZ = 1032;
-
// On Linux, we can adjust the pipe size to avoid blocking.
this.has_adjusted_pipe_size_on_linux = true;
- var pipe_len: c_int = 0;
- _ = std.c.fcntl(fd, F_GETPIPE_SZ, &pipe_len);
- if (pipe_len < 0) return;
-
- // If we have a valid pipe_len, then pessimistically set it to that.
- this.max_write_size = @intCast(usize, pipe_len);
- if (pipe_len < remain_len) {
- // If our real pipe length is less than the amount of data we have left to write,
- // let's figure out what the maximum pipe size is and grow it to that.
- var out_size = getMaxPipeSizeOnLinux();
- _ = std.c.fcntl(fd, F_SETPIPE_SZ, &out_size);
- if (out_size > 0) {
- this.max_write_size = @intCast(usize, out_size);
- }
+ switch (JSC.Node.Syscall.setPipeCapacityOnLinux(fd, @minimum(Syscall.getMaxPipeSizeOnLinux(), remain_len))) {
+ .result => |len| {
+ if (len > 0) {
+ this.max_write_size = len;
+ }
+ },
+ else => {},
}
}
@@ -1184,12 +1174,10 @@ pub const FileSink = struct {
break :brk this.max_write_size;
} else remain.len;
-
while (remain.len > 0) {
const write_buf = remain[0..@minimum(remain.len, max_to_write)];
-
- log("Write {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ write_buf.len, fd, this.head, remain.len, total });
const res = JSC.Node.Syscall.write(fd, write_buf);
+
if (res == .err) {
const retry =
std.os.E.AGAIN;
@@ -1298,13 +1286,14 @@ pub const FileSink = struct {
}
fn cleanup(this: *FileSink) void {
+ if (this.poll_ref) |poll| {
+ this.poll_ref = null;
+ poll.deinit();
+ }
+
if (this.fd != JSC.Node.invalid_fd) {
if (this.scheduled_count > 0) {
this.scheduled_count = 0;
- if (this.poll_ref) |poll| {
- this.poll_ref = null;
- poll.deinit();
- }
}
_ = JSC.Node.Syscall.close(this.fd);
@@ -1448,7 +1437,7 @@ pub const FileSink = struct {
fn isPending(this: *const FileSink) bool {
var poll_ref = this.poll_ref orelse return false;
- return poll_ref.isRegistered();
+ return poll_ref.isRegistered() and !poll_ref.flags.contains(.needs_rearm);
}
pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) {
@@ -2612,6 +2601,7 @@ pub fn ReadableStreamSource(
comptime onCancel: fn (this: *Context) void,
comptime deinit: fn (this: *Context) void,
comptime setRefUnrefFn: ?fn (this: *Context, enable: bool) void,
+ comptime drainInternalBuffer: ?fn (this: *Context) bun.ByteList,
) type {
return struct {
context: Context,
@@ -2699,6 +2689,14 @@ pub fn ReadableStreamSource(
return null;
}
+ pub fn drain(this: *This) bun.ByteList {
+ if (drainInternalBuffer) |drain_fn| {
+ return drain_fn(&this.context);
+ }
+
+ return .{};
+ }
+
pub fn toJS(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject) JSC.JSValue {
return ReadableStream.fromNative(globalThis, Context.tag, this);
}
@@ -2783,6 +2781,15 @@ pub fn ReadableStreamSource(
return JSValue.jsUndefined();
}
+ pub fn drain(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
+ var list = this.drain();
+ if (list.len > 0) {
+ return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis, null);
+ }
+ return JSValue.jsUndefined();
+ }
+
pub fn load(globalThis: *JSGlobalObject) callconv(.C) JSC.JSValue {
if (comptime JSC.is_bindgen) unreachable;
// This is used also in Node.js streams
@@ -2796,6 +2803,10 @@ pub fn ReadableStreamSource(
JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.updateRef, true)
else
JSC.JSValue.jsNull(),
+ if (drainInternalBuffer != null)
+ JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.drain, true)
+ else
+ JSC.JSValue.jsNull(),
});
}
@@ -2889,7 +2900,16 @@ pub const ByteBlobLoader = struct {
bun.default_allocator.destroy(this);
}
- pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit, null);
+ pub const Source = ReadableStreamSource(
+ @This(),
+ "ByteBlob",
+ onStart,
+ onPull,
+ onCancel,
+ deinit,
+ null,
+ null,
+ );
};
pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
@@ -3169,7 +3189,16 @@ pub const ByteStream = struct {
bun.default_allocator.destroy(this.parent());
}
- pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit, null);
+ pub const Source = ReadableStreamSource(
+ @This(),
+ "ByteStream",
+ onStart,
+ onPull,
+ onCancel,
+ deinit,
+ null,
+ null,
+ );
};
/// **Not** the Web "FileReader" API
@@ -3199,6 +3228,7 @@ pub const FileReader = struct {
stored_global_this_: ?*JSC.JSGlobalObject = null,
poll_ref: ?*JSC.FilePoll = null,
has_adjusted_pipe_size_on_linux: bool = false,
+ is_fifo: bool = false,
finished: bool = false,
/// When we have some way of knowing that EOF truly is the write end of the
@@ -3440,9 +3470,11 @@ pub const FileReader = struct {
},
};
- if (this.poll_ref) |poll| {
+ if (this.poll_ref != null or this.is_fifo) {
file.seekable = false;
- std.debug.assert(poll.fd == @intCast(@TypeOf(poll.fd), fd));
+ this.is_fifo = true;
+ if (this.poll_ref) |poll|
+ std.debug.assert(poll.fd == @intCast(@TypeOf(poll.fd), fd));
} else {
if (!auto_close) {
// ensure we have non-blocking IO set
@@ -3500,6 +3532,7 @@ pub const FileReader = struct {
file.seekable = std.os.S.ISREG(stat.mode);
file.mode = @intCast(JSC.Node.Mode, stat.mode);
this.mode = file.mode;
+ this.is_fifo = std.os.S.ISFIFO(stat.mode);
if (file.seekable orelse false)
file.max_size = @intCast(Blob.SizeType, stat.size);
@@ -3541,13 +3574,6 @@ pub const FileReader = struct {
const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
std.debug.assert(this.started);
- if (this.buffered_data.len > 0) {
- const data = this.buffered_data;
- this.buffered_data.len = 0;
- this.buffered_data.cap = 0;
- return .{ .owned = data };
- }
-
switch (chunk_size) {
0 => {
std.debug.assert(this.store.data.file.seekable orelse false);
@@ -3555,16 +3581,18 @@ pub const FileReader = struct {
return .{ .done = {} };
},
run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => {
- this.view.set(this.globalThis(), view);
- // should never be reached
- this.pending.result = .{
- .err = Syscall.Error.todo,
- };
- this.buf = buffer;
+ if (!this.isFIFO()) {
+ this.view.set(this.globalThis(), view);
+ // should never be reached
+ this.pending.result = .{
+ .err = Syscall.Error.todo,
+ };
+ this.buf = buffer;
- this.scheduleAsync(@truncate(Blob.SizeType, chunk_size));
+ this.scheduleAsync(@truncate(Blob.SizeType, chunk_size));
- return .{ .pending = &this.pending };
+ return .{ .pending = &this.pending };
+ }
},
else => {},
}
@@ -3632,56 +3660,41 @@ pub const FileReader = struct {
var buf_to_use = read_buf;
var free_buffer_on_error: bool = false;
+ var pipe_is_empty_on_linux = bun.VoidUnless(bool, Environment.isLinux, false);
+ var len: c_int = available_to_read orelse 0;
// if it's a pipe, we really don't know what to expect what the max size will be
// if the pipe is sending us WAY bigger data than what we can fit in the buffer
// we allocate a new buffer of up to 4 MB
if (this.isFIFO() and view != .zero) {
outer: {
- var len: c_int = available_to_read orelse 0;
// macOS FIONREAD doesn't seem to work here
// but we can get this information from the kqueue callback so we don't need to
if (comptime Environment.isLinux) {
if (len == 0) {
- const FIONREAD = if (Environment.isLinux) std.os.linux.T.FIONREAD else bun.C.FIONREAD;
- const rc: c_int = std.c.ioctl(fd, FIONREAD, &len);
+ const rc: c_int = std.c.ioctl(fd, std.os.linux.T.FIONREAD, &len);
if (rc != 0) {
len = 0;
}
- // In Linux versions before 2.6.11, the capacity of a
- // pipe was the same as the system page size (e.g., 4096
- // bytes on i386). Since Linux 2.6.11, the pipe
- // capacity is 16 pages (i.e., 65,536 bytes in a system
- // with a page size of 4096 bytes). Since Linux 2.6.35,
- // the default pipe capacity is 16 pages, but the
- // capacity can be queried and set using the
- // fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations.
- // See fcntl(2) for more information.
-
- //:# define F_SETPIPE_SZ 1031 /* Set pipe page size array.
- const F_SETPIPE_SZ = 1031;
- const F_GETPIPE_SZ = 1032;
-
if (len > 0) {
if (this.poll_ref) |poll| {
poll.flags.insert(.readable);
}
- } else if (this.poll_ref) |poll| {
- poll.flags.remove(.readable);
+ } else {
+ if (this.poll_ref) |poll| {
+ poll.flags.remove(.readable);
+ }
+
+ pipe_is_empty_on_linux = true;
}
+ // we do not un-mark it as readable if there's nothing in the pipe
if (!this.has_adjusted_pipe_size_on_linux) {
- if (len >= std.mem.page_size * 16) {
+ if (len > 0 and buf_to_use.len >= std.mem.page_size * 16) {
this.has_adjusted_pipe_size_on_linux = true;
- var pipe_len: c_int = 0;
- _ = std.c.fcntl(fd, F_GETPIPE_SZ, &pipe_len);
-
- if (pipe_len > 0 and pipe_len < std.mem.page_size * 16) {
- var out_size: c_int = getMaxPipeSizeOnLinux();
- _ = std.c.fcntl(fd, F_SETPIPE_SZ, &out_size);
- }
+ _ = Syscall.setPipeCapacityOnLinux(fd, @minimum(buf_to_use.len * 4, Syscall.getMaxPipeSizeOnLinux()));
}
}
}
@@ -3706,8 +3719,10 @@ pub const FileReader = struct {
}
if (this.poll_ref) |poll| {
- if ((available_to_read orelse 0) > 0) {
- poll.flags.insert(.readable);
+ if (comptime Environment.isMac) {
+ if ((available_to_read orelse 0) > 0) {
+ poll.flags.insert(.readable);
+ }
}
const is_readable = poll.isReadable();
@@ -3720,7 +3735,7 @@ pub const FileReader = struct {
} else if (!is_readable and poll.isHUP()) {
this.finalize();
return .{ .done = {} };
- } else if (!is_readable and poll.isRegistered()) {
+ } else if (!is_readable) {
if (this.finished) {
this.finalize();
return .{ .done = {} };
@@ -3730,18 +3745,46 @@ pub const FileReader = struct {
this.view.set(this.globalThis(), view);
this.buf = read_buf;
if (!this.isWatching())
- this.watch(this.fd);
+ this.watch(fd);
+ }
+
+ return .{
+ .pending = &this.pending,
+ };
+ }
+ }
+
+ if (comptime Environment.isLinux) {
+ if (pipe_is_empty_on_linux) {
+ std.debug.assert(this.poll_ref == null);
+ if (view != .zero) {
+ this.view.set(this.globalThis(), view);
+ this.buf = read_buf;
}
+ this.watch(fd);
return .{
.pending = &this.pending,
};
}
}
- const rc = Syscall.read(fd, buf_to_use);
+ // const rc: JSC.Node.Maybe(usize) = if (comptime Environment.isLinux) brk: {
+ // if (len == 65536 and this.has_adjusted_pipe_size_on_linux and buf_to_use.len > len) {
+ // var iovecs = [_]std.os.iovec{.{ .iov_base = @intToPtr([*]u8, @ptrToInt(buf_to_use.ptr)), .iov_len = @intCast(usize, buf_to_use.len) }};
+ // const rc = bun.C.linux.vmsplice(fd, &iovecs, 1, 0);
+ // Output.debug("vmsplice({d}, {d}) = {d}", .{ fd, buf_to_use.len, rc });
+ // if (JSC.Node.Maybe(usize).errnoSys(rc, .read)) |err| {
+ // break :brk err;
+ // }
- switch (rc) {
+ // break :brk JSC.Node.Maybe(usize){ .result = @intCast(usize, rc) };
+ // }
+
+ // break :brk Syscall.read(fd, buf_to_use);
+ // } else Syscall.read(fd, buf_to_use);
+
+ switch (Syscall.read(fd, buf_to_use)) {
.err => |err| {
const retry = std.os.E.AGAIN;
const errno = brk: {
@@ -3798,19 +3841,16 @@ pub const FileReader = struct {
return .{ .err = sys };
},
.result => |result| {
- if (this.poll_ref) |poll| {
- if (this.isFIFO()) {
- if (result < buf_to_use.len) {
- // do not insert .eof here
- poll.flags.remove(.readable);
+ if (this.isFIFO()) {
+ if (this.poll_ref) |poll| {
- if (result > 0 and !poll.flags.contains(.hup) and !this.finished) {
- // partial read, but not close. be sure to ask for more data
- if (!this.isWatching())
- this.watch(fd);
- }
- }
+ // do not insert .eof here
+ if (result < buf_to_use.len)
+ poll.flags.remove(.readable);
}
+
+ if (!this.finished and !this.isWatching())
+ this.watch(fd);
}
if (result == 0 and free_buffer_on_error) {
@@ -3822,13 +3862,11 @@ pub const FileReader = struct {
return this.handleReadChunk(result, view, true, buf_to_use);
}
- if (result == 0 and !this.finished and !this.close_on_eof and this.isFIFO()) {
+ if (result == 0 and this.isFIFO() and view != .zero) {
this.view.set(this.globalThis(), view);
this.buf = read_buf;
- if (!this.isWatching())
- this.watch(fd);
- this.poll_ref.?.flags.remove(.readable);
-
+ if (this.poll_ref) |poll|
+ poll.flags.remove(.readable);
return .{
.pending = &this.pending,
};
@@ -3955,7 +3993,25 @@ pub const FileReader = struct {
}
}
- pub const Source = ReadableStreamSource(@This(), "FileReader", onStart, onPullInto, onCancel, deinit, setRefOrUnref);
+ pub fn drainInternalBuffer(this: *FileReader) bun.ByteList {
+ var buffered = this.buffered_data;
+ if (buffered.len > 0) {
+ this.buffered_data = .{};
+ }
+
+ return buffered;
+ }
+
+ pub const Source = ReadableStreamSource(
+ @This(),
+ "FileReader",
+ onStart,
+ onPullInto,
+ onCancel,
+ deinit,
+ setRefOrUnref,
+ drainInternalBuffer,
+ );
};
pub fn NewReadyWatcher(
@@ -3970,11 +4026,15 @@ pub fn NewReadyWatcher(
const Watcher = @This();
pub inline fn isFIFO(this: *const Context) bool {
- if (this.poll_ref) |poll| {
- return poll.flags.contains(.fifo) or poll.flags.contains(.tty);
+ if (comptime @hasField(Context, "is_fifo")) {
+ return this.is_fifo;
+ }
+
+ if (this.poll_ref != null) {
+ return true;
}
- if (@hasField(Context, "mode")) {
+ if (comptime @hasField(Context, "mode")) {
return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode);
}
@@ -4046,34 +4106,3 @@ pub fn NewReadyWatcher(
// pub fn onError(this: *Streamer): anytype,
// };
// }
-
-fn getMaxPipeSizeOnLinux() c_int {
- return bun.once(struct {
- fn once() c_int {
- const default_out_size = 512 * 1024;
- const pipe_max_size_fd = switch (JSC.Node.Syscall.open("/proc/sys/fs/pipe-max-size", std.os.O.RDONLY, 0)) {
- .result => |fd2| fd2,
- .err => |err| {
- Output.debug("Failed to open /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno});
- return default_out_size;
- },
- };
- defer _ = JSC.Node.Syscall.close(pipe_max_size_fd);
- var max_pipe_size_buf: [128]u8 = undefined;
- const max_pipe_size = switch (JSC.Node.Syscall.read(pipe_max_size_fd, max_pipe_size_buf[0..])) {
- .result => |bytes_read| std.fmt.parseInt(i64, strings.trim(max_pipe_size_buf[0..bytes_read], "\n"), 10) catch |err| {
- Output.debug("Failed to parse /proc/sys/fs/pipe-max-size: {any}\n", .{@errorName(err)});
- return default_out_size;
- },
- .err => |err| {
- Output.debug("Failed to read /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno});
- return default_out_size;
- },
- };
-
- // we set the absolute max to 8 MB because honestly that's a huge pipe
- // my current linux machine only goes up to 1 MB, so that's very unlikely to be hit
- return @minimum(@truncate(c_int, max_pipe_size), 1024 * 1024 * 8);
- }
- }.once, c_int);
-}
diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig
index ac210dfe2..3f0082940 100644
--- a/src/cli/test_command.zig
+++ b/src/cli/test_command.zig
@@ -504,10 +504,10 @@ pub const TestCommand = struct {
if (!Jest.Jest.runner.?.has_pending_tests) Jest.Jest.runner.?.drain();
vm.eventLoop().tick();
- while (Jest.Jest.runner.?.has_pending_tests) : (vm.eventLoop().tick()) {
- vm.eventLoop().tick();
- if (!Jest.Jest.runner.?.has_pending_tests) break;
+ while (Jest.Jest.runner.?.has_pending_tests) {
vm.eventLoop().autoTick();
+ if (!Jest.Jest.runner.?.has_pending_tests) break;
+ vm.eventLoop().tick();
}
}
_ = vm.global.vm().runGC(false);
diff --git a/src/global.zig b/src/global.zig
index 6135e807d..97e5dc29d 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -396,3 +396,8 @@ pub fn once(comptime function: anytype, comptime ReturnType: type) ReturnType {
return Result.execute();
}
+
+pub fn VoidUnless(comptime T: type, comptime cond: bool, comptime default: T) T {
+ if (cond) return default;
+ return {};
+}
diff --git a/src/linux_c.zig b/src/linux_c.zig
index 04a2a508c..b200becdc 100644
--- a/src/linux_c.zig
+++ b/src/linux_c.zig
@@ -462,3 +462,5 @@ pub extern "c" fn posix_spawn_file_actions_addfchdir_np(actions: *posix_spawn_fi
// pub extern "c" fn posix_spawn_file_actions_addinherit_np(actions: *posix_spawn_file_actions_t, filedes: fd_t) c_int;
pub extern "c" fn posix_spawn_file_actions_addchdir_np(actions: *posix_spawn_file_actions_t, path: [*:0]const u8) c_int;
+
+pub extern fn vmsplice(fd: c_int, iovec: [*]const std.os.iovec, iovec_count: usize, flags: u32) isize;