diff options
-rw-r--r-- | examples/spawn.ts | 23 | ||||
-rw-r--r-- | src/bun.js/api/bun/spawn.zig | 19 | ||||
-rw-r--r-- | src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp | 5 | ||||
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 3 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 115 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 30 |
6 files changed, 180 insertions, 15 deletions
diff --git a/examples/spawn.ts b/examples/spawn.ts new file mode 100644 index 000000000..c29cc4f21 --- /dev/null +++ b/examples/spawn.ts @@ -0,0 +1,23 @@ +import { readableStreamToText } from "bun"; +import { spawn } from "bun"; + +const proc = spawn({ + cmd: ["ls", "-l"], + + // Both of these forms work: + + // as an array: + stdio: ["ignore", "pipe", "ignore"], + + // You can also use "inherit" to inherit the parent's stdio. + // stdin: "inherit", + + // You can pass a Bun.file to save it to a file: + // stdout: Bun.file("/tmp/stdout.txt"), +}); + +const result = await readableStreamToText(proc.stdout); + +await proc.exitStatus; + +console.log(result); diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig index 58dac9e9f..c1deebd3a 100644 --- a/src/bun.js/api/bun/spawn.zig +++ b/src/bun.js/api/bun/spawn.zig @@ -4,14 +4,19 @@ const string = bun.string; const std = @import("std"); fn _getSystem() type { - if (comptime bun.Environment.isLinux) { - return struct { - pub usingnamespace std.os.system; - pub usingnamespace bun.C.linux; - }; - } + // this is a workaround for a Zig stage1 bug + // the "usingnamespace" is evaluating in dead branches + return brk: { + if (comptime bun.Environment.isLinux) { + const Type = bun.C.linux; + break :brk struct { + pub usingnamespace std.os.system; + pub usingnamespace Type; + }; + } - return std.os.system; + break :brk std.os.system; + }; } const system = _getSystem(); diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp index a862fc0b4..79db2b727 100644 --- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp +++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp @@ -2253,7 +2253,7 @@ const char* const s_readableStreamInternalsReadableStreamDefaultControllerCanClo const JSC::ConstructAbility s_readableStreamInternalsLazyLoadStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct; const JSC::ConstructorKind s_readableStreamInternalsLazyLoadStreamCodeConstructorKind = JSC::ConstructorKind::None; const JSC::ImplementationVisibility s_readableStreamInternalsLazyLoadStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public; -const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2701; +const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2512; static const JSC::Intrinsic s_readableStreamInternalsLazyLoadStreamCodeIntrinsic = JSC::NoIntrinsic; const char* const s_readableStreamInternalsLazyLoadStreamCode = "(function (stream, autoAllocateChunkSize) {\n" \ @@ -2277,7 +2277,6 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode = " handleResult = function handleResult(result, controller, view) {\n" \ " \"use strict\";\n" \ "\n" \ - " console.log(\"handleResult\", result, controller, view);\n" \ " \n" \ " if (result && @isPromise(result)) {\n" \ " return result.then(\n" \ @@ -2289,10 +2288,8 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode = " );\n" \ " } else if (result !== false) {\n" \ " if (view && view.byteLength === result) {\n" \ - " console.log(\"view\", result, controller.byobRequest);\n" \ " controller.byobRequest.respondWithNewView(view);\n" \ " } else {\n" \ - " console.log(\"result\", result, controller.byobRequest);\n" \ " controller.byobRequest.respond(result);\n" \ " }\n" \ " }\n" \ diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index a1e496290..3d14535ca 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -1855,7 +1855,6 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { handleResult = function handleResult(result, controller, view) { "use strict"; - console.log("handleResult", result, controller, view); if (result && @isPromise(result)) { return result.then( @@ -1867,10 +1866,8 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { ); } else if (result !== false) { if (view && view.byteLength === result) { - console.log("view", result, controller.byobRequest); controller.byobRequest.respondWithNewView(view); } else { - console.log("result", result, controller.byobRequest); controller.byobRequest.respond(result); } } diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 238b3907f..747bf01e0 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -463,17 +463,28 @@ pub const Poller = struct { @field(Pollable.Tag, "FileBlobLoader") => { var loader = ptr.as(FileBlobLoader); loop.active -= 1; + loop.num_polls -= 1; + loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags); }, @field(Pollable.Tag, "Subprocess") => { var loader = ptr.as(JSC.Subprocess); loop.num_polls -= 1; + loop.active -= 1; // kqueue sends the same notification multiple times in the same tick potentially // so we have to dedupe it _ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable; }, + @field(Pollable.Tag, "FileSink") => { + var loader = ptr.as(JSC.WebCore.FileSink); + + loop.num_polls -= 1; + loop.active -= 1; + + loader.onPoll(0, 0); + }, else => |tag| { bun.Output.panic( "Internal error\nUnknown pollable tag: {d}\n", @@ -489,17 +500,28 @@ pub const Poller = struct { @field(Pollable.Tag, "FileBlobLoader") => { var loader = ptr.as(FileBlobLoader); loop.active -= 1; + loop.num_polls -= 1; + loader.onPoll(0, 0); }, @field(Pollable.Tag, "Subprocess") => { var loader = ptr.as(JSC.Subprocess); loop.num_polls -= 1; + loop.active -= 1; // kqueue sends the same notification multiple times in the same tick potentially // so we have to dedupe it _ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable; }, + @field(Pollable.Tag, "FileSink") => { + var loader = ptr.as(JSC.WebCore.FileSink); + + loop.num_polls -= 1; + loop.active -= 1; + + loader.onPoll(0, 0); + }, else => unreachable, } } @@ -627,6 +649,99 @@ pub const Poller = struct { } } + pub fn unwatch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) { + if (this.loop == null) { + this.loop = uws.Loop.get(); + JSC.VirtualMachine.vm.uws_event_loop = this.loop.?; + } + const watcher_fd = this.loop.?.fd; + + if (comptime Environment.isLinux) { + const ctl = linux.epoll_ctl( + watcher_fd, + linux.EPOLL.CTL_DEL, + fd, + null, + ); + + if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { + return errno; + } + + return JSC.Maybe(void).success; + } else if (comptime Environment.isMac) { + var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + changelist[0] = switch (flag) { + .read => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_READ, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(ctx).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .write => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_WRITE, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(Pollable.init(ctx).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + .process => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_PROC, + .data = 0, + .fflags = std.c.NOTE_EXIT, + .udata = @ptrToInt(Pollable.init(ctx).ptr()), + .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .ext = .{ 0, 0 }, + }, + }; + + // output events only include change errors + const KEVENT_FLAG_ERROR_EVENTS = 0x000002; + + // The kevent() system call returns the number of events placed in + // the eventlist, up to the value given by nevents. If the time + // limit expires, then kevent() returns 0. + const rc = std.os.system.kevent64( + watcher_fd, + &changelist, + 1, + // The same array may be used for the changelist and eventlist. + &changelist, + 1, + KEVENT_FLAG_ERROR_EVENTS, + &timeout, + ); + // If an error occurs while + // processing an element of the changelist and there is enough room + // in the eventlist, then the event will be placed in the eventlist + // with EV_ERROR set in flags and the system error in data. + if (changelist[0].flags == std.c.EV_ERROR) { + return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?; + // Otherwise, -1 will be returned, and errno will be set to + // indicate the error condition. + } + + const errno = std.c.getErrno(rc); + + if (errno == .SUCCESS) { + return JSC.Maybe(void).success; + } + + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, + else => unreachable, + } + } else { + @compileError("TODO: Poller"); + } + } + pub fn tick(this: *Poller) void { var loop = this.loop orelse return; if (loop.active == 0) return; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index abf220ad7..23aad70ec 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1135,13 +1135,20 @@ pub const FileSink = struct { this.scheduled_count += 1; } + pub fn unwatch(this: *FileSink) void { + std.debug.assert(this.scheduled_count > 0); + std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)); + _ = JSC.VirtualMachine.vm.poller.unwatch(this.opened_fd, .write, FileSink, this); + this.scheduled_count -= 1; + } + pub fn toJS(this: *FileSink, globalThis: *JSGlobalObject) JSValue { return JSSink.createObject(globalThis, this); } pub fn onPoll(this: *FileSink, _: i64, _: u16) void { this.scheduled_count -= 1; - this.flush(); + _ = this.flush(); } pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable { @@ -3185,11 +3192,32 @@ pub const FileBlobLoader = struct { this.pending.run(); } + pub fn unwatch(this: *FileBlobLoader) void { + std.debug.assert(this.scheduled_count > 0); + std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); + _ = JSC.VirtualMachine.vm.poller.unwatch(this.fd, .read, FileBlobLoader, this); + this.scheduled_count -= 1; + } + pub fn finalize(this: *FileBlobLoader) void { if (this.finalized) return; this.finalized = true; + if (this.scheduled_count > 0) { + this.unwatch(); + } + + this.pending.result = .{ .done = {} }; + this.pending.run(); + + if (this.protected_view != .zero) { + this.protected_view.unprotect(); + this.protected_view = .zero; + + this.buf = &.{}; + } + this.maybeAutoClose(); this.store.deref(); |