aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-09-25 13:08:51 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-09-25 13:08:51 -0700
commita4d46fc7db7459e3e7e895d3013ffe65b0f0078c (patch)
tree8c6d78f3ccc127b16ff38fe098223bd9b69c2736
parent7ce4a4e3d3f5f06a1258eefc49ce1da166e43886 (diff)
downloadbun-jarred/subprocess.tar.gz
bun-jarred/subprocess.tar.zst
bun-jarred/subprocess.zip
-rw-r--r--examples/spawn.ts23
-rw-r--r--src/bun.js/api/bun/spawn.zig19
-rw-r--r--src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp5
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js3
-rw-r--r--src/bun.js/event_loop.zig115
-rw-r--r--src/bun.js/webcore/streams.zig30
6 files changed, 180 insertions, 15 deletions
diff --git a/examples/spawn.ts b/examples/spawn.ts
new file mode 100644
index 000000000..c29cc4f21
--- /dev/null
+++ b/examples/spawn.ts
@@ -0,0 +1,23 @@
+import { readableStreamToText } from "bun";
+import { spawn } from "bun";
+
+const proc = spawn({
+ cmd: ["ls", "-l"],
+
+ // Both of these forms work:
+
+ // as an array:
+ stdio: ["ignore", "pipe", "ignore"],
+
+ // You can also use "inherit" to inherit the parent's stdio.
+ // stdin: "inherit",
+
+ // You can pass a Bun.file to save it to a file:
+ // stdout: Bun.file("/tmp/stdout.txt"),
+});
+
+const result = await readableStreamToText(proc.stdout);
+
+await proc.exitStatus;
+
+console.log(result);
diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig
index 58dac9e9f..c1deebd3a 100644
--- a/src/bun.js/api/bun/spawn.zig
+++ b/src/bun.js/api/bun/spawn.zig
@@ -4,14 +4,19 @@ const string = bun.string;
const std = @import("std");
fn _getSystem() type {
- if (comptime bun.Environment.isLinux) {
- return struct {
- pub usingnamespace std.os.system;
- pub usingnamespace bun.C.linux;
- };
- }
+ // this is a workaround for a Zig stage1 bug
+ // the "usingnamespace" is evaluating in dead branches
+ return brk: {
+ if (comptime bun.Environment.isLinux) {
+ const Type = bun.C.linux;
+ break :brk struct {
+ pub usingnamespace std.os.system;
+ pub usingnamespace Type;
+ };
+ }
- return std.os.system;
+ break :brk std.os.system;
+ };
}
const system = _getSystem();
diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
index a862fc0b4..79db2b727 100644
--- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
+++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
@@ -2253,7 +2253,7 @@ const char* const s_readableStreamInternalsReadableStreamDefaultControllerCanClo
const JSC::ConstructAbility s_readableStreamInternalsLazyLoadStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamInternalsLazyLoadStreamCodeConstructorKind = JSC::ConstructorKind::None;
const JSC::ImplementationVisibility s_readableStreamInternalsLazyLoadStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
-const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2701;
+const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2512;
static const JSC::Intrinsic s_readableStreamInternalsLazyLoadStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamInternalsLazyLoadStreamCode =
"(function (stream, autoAllocateChunkSize) {\n" \
@@ -2277,7 +2277,6 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" handleResult = function handleResult(result, controller, view) {\n" \
" \"use strict\";\n" \
"\n" \
- " console.log(\"handleResult\", result, controller, view);\n" \
" \n" \
" if (result && @isPromise(result)) {\n" \
" return result.then(\n" \
@@ -2289,10 +2288,8 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" );\n" \
" } else if (result !== false) {\n" \
" if (view && view.byteLength === result) {\n" \
- " console.log(\"view\", result, controller.byobRequest);\n" \
" controller.byobRequest.respondWithNewView(view);\n" \
" } else {\n" \
- " console.log(\"result\", result, controller.byobRequest);\n" \
" controller.byobRequest.respond(result);\n" \
" }\n" \
" }\n" \
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index a1e496290..3d14535ca 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -1855,7 +1855,6 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
handleResult = function handleResult(result, controller, view) {
"use strict";
- console.log("handleResult", result, controller, view);
if (result && @isPromise(result)) {
return result.then(
@@ -1867,10 +1866,8 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
);
} else if (result !== false) {
if (view && view.byteLength === result) {
- console.log("view", result, controller.byobRequest);
controller.byobRequest.respondWithNewView(view);
} else {
- console.log("result", result, controller.byobRequest);
controller.byobRequest.respond(result);
}
}
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 238b3907f..747bf01e0 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -463,17 +463,28 @@ pub const Poller = struct {
@field(Pollable.Tag, "FileBlobLoader") => {
var loader = ptr.as(FileBlobLoader);
loop.active -= 1;
+ loop.num_polls -= 1;
+
loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags);
},
@field(Pollable.Tag, "Subprocess") => {
var loader = ptr.as(JSC.Subprocess);
loop.num_polls -= 1;
+ loop.active -= 1;
// kqueue sends the same notification multiple times in the same tick potentially
// so we have to dedupe it
_ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable;
},
+ @field(Pollable.Tag, "FileSink") => {
+ var loader = ptr.as(JSC.WebCore.FileSink);
+
+ loop.num_polls -= 1;
+ loop.active -= 1;
+
+ loader.onPoll(0, 0);
+ },
else => |tag| {
bun.Output.panic(
"Internal error\nUnknown pollable tag: {d}\n",
@@ -489,17 +500,28 @@ pub const Poller = struct {
@field(Pollable.Tag, "FileBlobLoader") => {
var loader = ptr.as(FileBlobLoader);
loop.active -= 1;
+ loop.num_polls -= 1;
+
loader.onPoll(0, 0);
},
@field(Pollable.Tag, "Subprocess") => {
var loader = ptr.as(JSC.Subprocess);
loop.num_polls -= 1;
+ loop.active -= 1;
// kqueue sends the same notification multiple times in the same tick potentially
// so we have to dedupe it
_ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable;
},
+ @field(Pollable.Tag, "FileSink") => {
+ var loader = ptr.as(JSC.WebCore.FileSink);
+
+ loop.num_polls -= 1;
+ loop.active -= 1;
+
+ loader.onPoll(0, 0);
+ },
else => unreachable,
}
}
@@ -627,6 +649,99 @@ pub const Poller = struct {
}
}
+ pub fn unwatch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) {
+ if (this.loop == null) {
+ this.loop = uws.Loop.get();
+ JSC.VirtualMachine.vm.uws_event_loop = this.loop.?;
+ }
+ const watcher_fd = this.loop.?.fd;
+
+ if (comptime Environment.isLinux) {
+ const ctl = linux.epoll_ctl(
+ watcher_fd,
+ linux.EPOLL.CTL_DEL,
+ fd,
+ null,
+ );
+
+ if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
+ return errno;
+ }
+
+ return JSC.Maybe(void).success;
+ } else if (comptime Environment.isMac) {
+ var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
+ changelist[0] = switch (flag) {
+ .read => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_READ,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(Pollable.init(ctx).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ .write => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_WRITE,
+ .data = 0,
+ .fflags = 0,
+ .udata = @ptrToInt(Pollable.init(ctx).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ .process => .{
+ .ident = @intCast(u64, fd),
+ .filter = std.os.system.EVFILT_PROC,
+ .data = 0,
+ .fflags = std.c.NOTE_EXIT,
+ .udata = @ptrToInt(Pollable.init(ctx).ptr()),
+ .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .ext = .{ 0, 0 },
+ },
+ };
+
+ // output events only include change errors
+ const KEVENT_FLAG_ERROR_EVENTS = 0x000002;
+
+ // The kevent() system call returns the number of events placed in
+ // the eventlist, up to the value given by nevents. If the time
+ // limit expires, then kevent() returns 0.
+ const rc = std.os.system.kevent64(
+ watcher_fd,
+ &changelist,
+ 1,
+ // The same array may be used for the changelist and eventlist.
+ &changelist,
+ 1,
+ KEVENT_FLAG_ERROR_EVENTS,
+ &timeout,
+ );
+ // If an error occurs while
+ // processing an element of the changelist and there is enough room
+ // in the eventlist, then the event will be placed in the eventlist
+ // with EV_ERROR set in flags and the system error in data.
+ if (changelist[0].flags == std.c.EV_ERROR) {
+ return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
+ // Otherwise, -1 will be returned, and errno will be set to
+ // indicate the error condition.
+ }
+
+ const errno = std.c.getErrno(rc);
+
+ if (errno == .SUCCESS) {
+ return JSC.Maybe(void).success;
+ }
+
+ switch (rc) {
+ std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
+ else => unreachable,
+ }
+ } else {
+ @compileError("TODO: Poller");
+ }
+ }
+
pub fn tick(this: *Poller) void {
var loop = this.loop orelse return;
if (loop.active == 0) return;
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index abf220ad7..23aad70ec 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1135,13 +1135,20 @@ pub const FileSink = struct {
this.scheduled_count += 1;
}
+ pub fn unwatch(this: *FileSink) void {
+ std.debug.assert(this.scheduled_count > 0);
+ std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor));
+ _ = JSC.VirtualMachine.vm.poller.unwatch(this.opened_fd, .write, FileSink, this);
+ this.scheduled_count -= 1;
+ }
+
pub fn toJS(this: *FileSink, globalThis: *JSGlobalObject) JSValue {
return JSSink.createObject(globalThis, this);
}
pub fn onPoll(this: *FileSink, _: i64, _: u16) void {
this.scheduled_count -= 1;
- this.flush();
+ _ = this.flush();
}
pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
@@ -3185,11 +3192,32 @@ pub const FileBlobLoader = struct {
this.pending.run();
}
+ pub fn unwatch(this: *FileBlobLoader) void {
+ std.debug.assert(this.scheduled_count > 0);
+ std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor));
+ _ = JSC.VirtualMachine.vm.poller.unwatch(this.fd, .read, FileBlobLoader, this);
+ this.scheduled_count -= 1;
+ }
+
pub fn finalize(this: *FileBlobLoader) void {
if (this.finalized)
return;
this.finalized = true;
+ if (this.scheduled_count > 0) {
+ this.unwatch();
+ }
+
+ this.pending.result = .{ .done = {} };
+ this.pending.run();
+
+ if (this.protected_view != .zero) {
+ this.protected_view.unprotect();
+ this.protected_view = .zero;
+
+ this.buf = &.{};
+ }
+
this.maybeAutoClose();
this.store.deref();