diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/api/bun/socket.zig | 16 | ||||
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 22 | ||||
-rw-r--r-- | src/bun.js/api/html_rewriter.zig | 2 | ||||
-rw-r--r-- | src/bun.js/base.zig | 120 | ||||
-rw-r--r-- | src/bun.js/bindings/JSReadableHelper.cpp | 68 | ||||
-rw-r--r-- | src/bun.js/bindings/JSReadableHelper.h | 2 | ||||
-rw-r--r-- | src/bun.js/bindings/JSSink.cpp | 2 | ||||
-rw-r--r-- | src/bun.js/bindings/JSSink.h | 2 | ||||
-rw-r--r-- | src/bun.js/bindings/JSSinkLookupTable.h | 2 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGlobalObject.cpp | 49 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGlobalObject.h | 3 | ||||
-rw-r--r-- | src/bun.js/bindings/bindings.zig | 16 | ||||
-rw-r--r-- | src/bun.js/bindings/exports.zig | 2 | ||||
-rw-r--r-- | src/bun.js/bindings/headers.h | 2 | ||||
-rw-r--r-- | src/bun.js/child_process.exports.js | 122 | ||||
-rw-r--r-- | src/bun.js/javascript.zig | 1 | ||||
-rw-r--r-- | src/bun.js/node/syscall.zig | 7 | ||||
-rw-r--r-- | src/bun.js/streams.exports.js | 388 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 3 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 188 |
20 files changed, 724 insertions, 293 deletions
diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig index 77b4a266f..5bc1629c0 100644 --- a/src/bun.js/api/bun/socket.zig +++ b/src/bun.js/api/bun/socket.zig @@ -112,12 +112,15 @@ const Handlers = struct { pub fn callErrorHandler(this: *Handlers, thisValue: JSValue, err: []const JSValue) bool { const onError = this.onError; if (onError == .zero) { + if (err.len > 0) + this.vm.onUnhandledError(this.globalObject, err[0]); + return false; } const result = onError.callWithThis(this.globalObject, thisValue, err); if (!result.isEmptyOrUndefinedOrNull() and result.isAnyError(this.globalObject)) { - this.vm.runErrorHandler(result, null); + this.vm.onUnhandledError(this.globalObject, result); } return true; @@ -827,8 +830,6 @@ fn NewSocket(comptime ssl: bool) type { if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) { return; } - - handlers.vm.runErrorHandler(result, null); } } pub fn onTimeout( @@ -857,8 +858,6 @@ fn NewSocket(comptime ssl: bool) type { if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) { return; } - - handlers.vm.runErrorHandler(result, null); } } pub fn onConnectError(this: *This, socket: Socket, errno: c_int) void { @@ -948,7 +947,6 @@ fn NewSocket(comptime ssl: bool) type { return; } - handlers.vm.runErrorHandler(result, null); return; } } @@ -984,8 +982,6 @@ fn NewSocket(comptime ssl: bool) type { if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) { return; } - - handlers.vm.runErrorHandler(result, null); } } @@ -1015,8 +1011,6 @@ fn NewSocket(comptime ssl: bool) type { if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) { return; } - - handlers.vm.runErrorHandler(result, null); } } @@ -1046,8 +1040,6 @@ fn NewSocket(comptime ssl: bool) type { if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) { return; } - - handlers.vm.runErrorHandler(result, null); } } diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index dcfa88a40..2d8127901 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -30,7 +30,6 @@ pub const Subprocess = struct { stderr: Readable, killed: bool = false, - reffer: JSC.Ref = JSC.Ref.init(), poll_ref: ?*JSC.FilePoll = null, exit_promise: JSC.Strong = .{}, @@ -55,13 +54,14 @@ pub const Subprocess = struct { globalThis: *JSC.JSGlobalObject, pub fn ref(this: *Subprocess) void { - this.reffer.ref(this.globalThis.bunVM()); - if (this.poll_ref) |poll| poll.ref(this.globalThis.bunVM()); + var vm = this.globalThis.bunVM(); + if (this.poll_ref) |poll| poll.enableKeepingProcessAlive(vm); } pub fn unref(this: *Subprocess) void { this.this_jsvalue.clear(); - this.unrefWithoutGC(this.globalThis.bunVM()); + var vm = this.globalThis.bunVM(); + if (this.poll_ref) |poll| poll.disableKeepingProcessAlive(vm); } pub fn constructor( @@ -182,8 +182,6 @@ pub const Subprocess = struct { .pipe => { defer this.close(); - // TODO: handle when there's pending unread data in the pipe - // For some reason, this currently hangs forever if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) { if (this.pipe.buffer.canRead()) this.pipe.buffer.readIfPossible(true); @@ -1301,7 +1299,6 @@ pub const Subprocess = struct { if (!sync) { var vm = this.globalThis.bunVM(); - this.reffer.unref(vm); // prevent duplicate notifications if (this.poll_ref) |poll| { @@ -1309,17 +1306,10 @@ pub const Subprocess = struct { poll.deinitWithVM(vm); } - this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this); - this.has_waitpid_task = true; - vm.eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task)); + this.onExit(); } } - pub fn unrefWithoutGC(this: *Subprocess, vm: *JSC.VirtualMachine) void { - if (this.poll_ref) |poll| poll.unref(vm); - this.reffer.unref(vm); - } - fn onExit(this: *Subprocess) void { this.closePorts(); @@ -1348,7 +1338,7 @@ pub const Subprocess = struct { ); if (result.isAnyError(this.globalThis)) { - this.globalThis.bunVM().runErrorHandler(result, null); + this.globalThis.bunVM().onUnhandledError(this.globalThis, result); } } diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig index eb602223b..9ab424491 100644 --- a/src/bun.js/api/html_rewriter.zig +++ b/src/bun.js/api/html_rewriter.zig @@ -850,7 +850,7 @@ fn HandlerCallback( switch (promise.status(this.global.vm())) { JSC.JSPromise.Status.Pending => unreachable, JSC.JSPromise.Status.Rejected => { - JavaScript.VirtualMachine.vm.runErrorHandler(promise.result(this.global.vm()), null); + JavaScript.VirtualMachine.vm.onUnhandledError(this.global, promise.result(this.global.vm())); @field(zig_element, field_name) = null; return false; }, diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 37eb3e626..deeab246a 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -3985,7 +3985,7 @@ pub const FilePoll = struct { flags: Flags.Set = Flags.Set{}, owner: Owner = Deactivated.owner, - const FileBlobLoader = JSC.WebCore.FileBlobLoader; + const FileReader = JSC.WebCore.FileReader; const FileSink = JSC.WebCore.FileSink; const Subprocess = JSC.Subprocess; const BufferedInput = Subprocess.BufferedInput; @@ -3995,7 +3995,7 @@ pub const FilePoll = struct { }; pub const Owner = bun.TaggedPointerUnion(.{ - FileBlobLoader, + FileReader, FileSink, Subprocess, BufferedInput, @@ -4016,12 +4016,12 @@ pub const FilePoll = struct { pub fn onKQueueEvent(poll: *FilePoll, loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void { poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*)); - poll.onUpdate(loop); + poll.onUpdate(loop, kqueue_event.data); } pub fn onEpollEvent(poll: *FilePoll, loop: *uws.Loop, epoll_event: *std.os.linux.epoll_event) void { poll.updateFlags(Flags.fromEpollEvent(epoll_event.*)); - poll.onUpdate(loop); + poll.onUpdate(loop, 0); } pub fn clearEvent(poll: *FilePoll, flag: Flags) void { @@ -4072,16 +4072,16 @@ pub const FilePoll = struct { return this.flags.contains(.poll_writable) or this.flags.contains(.poll_readable) or this.flags.contains(.poll_process); } - pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop) void { + pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop, size_or_offset: i64) void { if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) { if (poll.flags.contains(.has_incremented_poll_count)) poll.deactivate(loop); poll.flags.insert(.needs_rearm); } var ptr = poll.owner; switch (ptr.tag()) { - @field(Owner.Tag, "FileBlobLoader") => { - log("onUpdate: FileBlobLoader", .{}); - ptr.as(FileBlobLoader).onPoll(0, 0); + @field(Owner.Tag, "FileReader") => { + log("onUpdate: FileReader", .{}); + ptr.as(FileReader).onPoll(size_or_offset, 0); }, @field(Owner.Tag, "Subprocess") => { log("onUpdate: Subprocess", .{}); @@ -4092,18 +4092,18 @@ pub const FilePoll = struct { @field(Owner.Tag, "FileSink") => { log("onUpdate: FileSink", .{}); var loader = ptr.as(JSC.WebCore.FileSink); - loader.onPoll(0, 0); + loader.onPoll(size_or_offset, 0); }, @field(Owner.Tag, "BufferedInput") => { log("onUpdate: BufferedInput", .{}); var loader = ptr.as(JSC.Subprocess.BufferedInput); - loader.onReady(0); + loader.onReady(size_or_offset); }, @field(Owner.Tag, "BufferedOutput") => { log("onUpdate: BufferedOutput", .{}); var loader = ptr.as(JSC.Subprocess.BufferedOutput); - loader.ready(0); + loader.ready(size_or_offset); }, else => {}, } @@ -4196,34 +4196,45 @@ pub const FilePoll = struct { const log = Output.scoped(.FilePoll, false); pub inline fn isActive(this: *const FilePoll) bool { - return this.flags.contains(.has_incremented_poll_count) and !this.flags.contains(.disable); + return this.flags.contains(.has_incremented_poll_count); } /// Make calling ref() on this poll into a no-op. - pub fn disable(this: *FilePoll) void { - if (this.isRegistered()) { - this.unregister(JSC.VirtualMachine.vm.uws_event_loop.?); - } - - this.unref(); + pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (this.flags.contains(.disable)) + return; this.flags.insert(.disable); + + vm.uws_event_loop.?.active -= @as(u32, @boolToInt(this.flags.contains(.has_incremented_poll_count))); + } + + pub fn enableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (!this.flags.contains(.disable)) + return; + this.flags.remove(.disable); + + vm.uws_event_loop.?.active += @as(u32, @boolToInt(this.flags.contains(.has_incremented_poll_count))); + } + + pub fn canActivate(this: *const FilePoll) bool { + return !this.flags.contains(.has_incremented_poll_count); } /// Only intended to be used from EventLoop.Pollable pub fn deactivate(this: *FilePoll, loop: *uws.Loop) void { std.debug.assert(this.flags.contains(.has_incremented_poll_count)); + loop.num_polls -= @as(i32, @boolToInt(this.flags.contains(.has_incremented_poll_count))); + loop.active -= @as(u32, @boolToInt(!this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count))); + this.flags.remove(.has_incremented_poll_count); - loop.num_polls -= 1; - loop.active -= 1; } /// Only intended to be used from EventLoop.Pollable pub fn activate(this: *FilePoll, loop: *uws.Loop) void { - std.debug.assert(!this.flags.contains(.has_incremented_poll_count)); - std.debug.assert(!this.flags.contains(.disable)); + loop.num_polls += @as(i32, @boolToInt(!this.flags.contains(.has_incremented_poll_count))); + loop.active += @as(u32, @boolToInt(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count))); + this.flags.insert(.has_incremented_poll_count); - loop.num_polls += 1; - loop.active += 1; } pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll { @@ -4240,9 +4251,20 @@ pub const FilePoll = struct { return poll; } + pub inline fn canRef(this: *const FilePoll) bool { + if (this.flags.contains(.disable)) + return false; + + return !this.flags.contains(.has_incremented_poll_count); + } + + pub inline fn canUnref(this: *const FilePoll) bool { + return this.flags.contains(.has_incremented_poll_count); + } + /// Prevent a poll from keeping the process alive. pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (!this.isActive()) + if (!this.canUnref()) return; log("unref", .{}); this.deactivate(vm.uws_event_loop.?); @@ -4250,7 +4272,7 @@ pub const FilePoll = struct { /// Allow a poll to keep the process alive. pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (this.isActive()) + if (this.canRef()) return; log("ref", .{}); this.activate(vm.uws_event_loop.?); @@ -4296,11 +4318,13 @@ pub const FilePoll = struct { std.debug.assert(this.fd != invalid_fd); if (comptime Environment.isLinux) { + const one_shot_flag: u32 = if (!this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT; + const flags: u32 = switch (flag) { .process, .readable, - => linux.EPOLL.IN | linux.EPOLL.HUP | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT), - .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT), + => linux.EPOLL.IN | linux.EPOLL.HUP | one_shot_flag, + .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | one_shot_flag, else => unreachable, }; @@ -4318,6 +4342,7 @@ pub const FilePoll = struct { } } else if (comptime Environment.isMac) { var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + const one_shot_flag: @TypeOf(changelist[0].flags) = if (!this.flags.contains(.one_shot)) 0 else std.c.EV_ONESHOT; changelist[0] = switch (flag) { .readable => .{ .ident = @intCast(u64, fd), @@ -4325,7 +4350,7 @@ pub const FilePoll = struct { .data = 0, .fflags = 0, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_ADD | std.c.EV_ONESHOT, + .flags = std.c.EV_ADD | one_shot_flag, .ext = .{ 0, 0 }, }, .writable => .{ @@ -4334,7 +4359,7 @@ pub const FilePoll = struct { .data = 0, .fflags = 0, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_ADD | std.c.EV_ONESHOT, + .flags = std.c.EV_ADD | one_shot_flag, .ext = .{ 0, 0 }, }, .process => .{ @@ -4343,9 +4368,10 @@ pub const FilePoll = struct { .data = 0, .fflags = std.c.NOTE_EXIT, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_ADD, + .flags = std.c.EV_ADD | one_shot_flag, .ext = .{ 0, 0 }, }, + else => unreachable, }; // output events only include change errors @@ -4393,17 +4419,20 @@ pub const FilePoll = struct { } else { @compileError("TODO: Pollable"); } - - if (!this.isActive()) this.activate(loop); + if (this.canActivate()) + this.activate(loop); this.flags.insert(switch (flag) { - .process, .readable => .poll_readable, + .readable => .poll_readable, + .process => if (comptime Environment.isLinux) .poll_readable else .poll_process, .writable => .poll_writable, else => unreachable, }); + this.flags.remove(.needs_rearm); + return JSC.Maybe(void).success; } - pub const invalid_fd = std.math.maxInt(u32); + pub const invalid_fd = JSC.Node.invalid_fd; pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) { if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) { @@ -4424,6 +4453,14 @@ pub const FilePoll = struct { return JSC.Maybe(void).success; }; + if (this.flags.contains(.needs_rearm)) { + log("unregister: {s} ({d}) skipped due to needs_rearm", .{ @tagName(flag), fd }); + this.flags.remove(.poll_process); + this.flags.remove(.poll_readable); + this.flags.remove(.poll_process); + return JSC.Maybe(void).success; + } + log("unregister: {s} ({d})", .{ @tagName(flag), fd }); if (comptime Environment.isLinux) { @@ -4441,22 +4478,22 @@ pub const FilePoll = struct { var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); changelist[0] = switch (flag) { - .read => .{ + .readable => .{ .ident = @intCast(u64, fd), .filter = std.os.system.EVFILT_READ, .data = 0, .fflags = 0, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .flags = std.c.EV_DELETE, .ext = .{ 0, 0 }, }, - .write => .{ + .writable => .{ .ident = @intCast(u64, fd), .filter = std.os.system.EVFILT_WRITE, .data = 0, .fflags = 0, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .flags = std.c.EV_DELETE, .ext = .{ 0, 0 }, }, .process => .{ @@ -4465,7 +4502,7 @@ pub const FilePoll = struct { .data = 0, .fflags = std.c.NOTE_EXIT, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .flags = std.c.EV_DELETE, .ext = .{ 0, 0 }, }, else => unreachable, @@ -4498,10 +4535,9 @@ pub const FilePoll = struct { } const errno = std.c.getErrno(rc); - switch (rc) { std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, - else => unreachable, + else => {}, } } else { @compileError("TODO: Pollable"); diff --git a/src/bun.js/bindings/JSReadableHelper.cpp b/src/bun.js/bindings/JSReadableHelper.cpp index 384664ce9..3845d62f5 100644 --- a/src/bun.js/bindings/JSReadableHelper.cpp +++ b/src/bun.js/bindings/JSReadableHelper.cpp @@ -11,7 +11,7 @@ #include "JSDOMAttribute.h" #include "headers.h" #include "JSDOMConvertEnumeration.h" - +#include "JavaScriptCore/StrongInlines.h" namespace WebCore { using namespace JSC; @@ -56,13 +56,15 @@ static bool callRead(JSValue stream, JSFunction* read, JSC::MarkedArgumentBuffer return !ret.isUndefinedOrNull(); } -static JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore_); -JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) { JSReadableHelper_EXTRACT_STREAM_STATE - auto read - = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto clientData + = WebCore::clientData(vm); + auto readIdentifier = clientData->builtinNames().readPublicName(); + auto read = stream->get(lexicalGlobalObject, readIdentifier); + auto callData = JSC::getCallData(read); if (callData.type == CallData::Type::None) { throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); @@ -85,24 +87,14 @@ JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore_, (JSGlobalObject * lexicalGlo RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); } -JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) -{ - JSReadableHelper_EXTRACT_STREAM_STATE - - // make this static? - JSFunction* maybeReadMore_ - = JSC::JSFunction::create(vm, lexicalGlobalObject, 0, "maybeReadMore_"_s, jsReadable_maybeReadMore_, ImplementationVisibility::Public); - - lexicalGlobalObject->queueMicrotask(maybeReadMore_, JSValue(stream), JSValue(state), JSValue {}, JSValue {}); - RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); -} - void flow(JSGlobalObject* lexicalGlobalObject, JSObject* streamObj, JSReadableState* state) { VM& vm = lexicalGlobalObject->vm(); auto throwScope = DECLARE_THROW_SCOPE(vm); - auto read = streamObj->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto clientData = WebCore::clientData(vm); + auto readIdentifier = clientData->builtinNames().readPublicName(); + auto read = streamObj->get(lexicalGlobalObject, readIdentifier); auto callData = JSC::getCallData(read); if (callData.type == CallData::Type::None) { @@ -118,8 +110,7 @@ void flow(JSGlobalObject* lexicalGlobalObject, JSObject* streamObj, JSReadableSt } } -static JSC_DECLARE_HOST_FUNCTION(jsReadable_resume_); -JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) +JSC_DEFINE_HOST_FUNCTION(jsReadable_resume, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) { JSReadableHelper_EXTRACT_STREAM_STATE @@ -132,18 +123,20 @@ JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObje } auto& emitter = jsEmitterWrap->wrapped(); + auto clientData = WebCore::clientData(vm); + auto readIdentifier = clientData->builtinNames().readPublicName(); if (!state->getBool(JSReadableState::reading)) { // stream.read(0) MarkedArgumentBuffer args; args.append(jsNumber(0)); - callRead(stream, jsCast<JSFunction*>(stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s))), WTFMove(args), vm, lexicalGlobalObject, emitter); + callRead(stream, jsCast<JSFunction*>(stream->get(lexicalGlobalObject, readIdentifier)), WTFMove(args), vm, lexicalGlobalObject, emitter); } state->setBool(JSReadableState::resumeScheduled, true); // stream.emit('resume') - auto eventType = Identifier::fromString(vm, "resume"_s); + auto eventType = clientData->builtinNames().resumePublicName(); MarkedArgumentBuffer args; emitter.emitForBindings(eventType, args); @@ -152,7 +145,7 @@ JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObje if (state->m_flowing > 0 && !state->getBool(JSReadableState::reading)) { // stream.read(0) - auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s)); + auto read = stream->get(lexicalGlobalObject, readIdentifier); auto callData = JSC::getCallData(read); if (callData.type == CallData::Type::None) { throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read)); @@ -165,31 +158,16 @@ JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObje RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); } -JSC_DEFINE_HOST_FUNCTION(jsReadable_resume, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) -{ - JSReadableHelper_EXTRACT_STREAM_STATE - - if (!state->getBool(JSReadableState::resumeScheduled)) - { - state->setBool(JSReadableState::resumeScheduled, true); - // make this static? - JSFunction* resume_ = JSC::JSFunction::create( - vm, lexicalGlobalObject, 0, "resume_"_s, jsReadable_resume_, ImplementationVisibility::Public); - - lexicalGlobalObject->queueMicrotask(resume_, JSValue(stream), JSValue(state), JSValue {}, JSValue {}); - } - RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined())); -} - EncodedJSValue emitReadable_(JSGlobalObject* lexicalGlobalObject, JSObject* stream, JSReadableState* state) { VM& vm = lexicalGlobalObject->vm(); auto throwScope = DECLARE_THROW_SCOPE(vm); - JSValue errored = state->m_errored.get(); if (!state->getBool(JSReadableState::destroyed) && !errored.toBoolean(lexicalGlobalObject) && (state->m_length || state->getBool(JSReadableState::ended))) { // stream.emit('readable') - auto eventType = Identifier::fromString(vm, "readable"_s); + auto clientData = WebCore::clientData(vm); + + auto eventType = clientData->builtinNames().readablePublicName(); MarkedArgumentBuffer args; auto emitter = jsDynamicCast<JSEventEmitter*>(stream); if (!emitter) { @@ -206,7 +184,6 @@ EncodedJSValue emitReadable_(JSGlobalObject* lexicalGlobalObject, JSObject* stre return JSValue::encode(jsUndefined()); } -JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable_); JSC_DEFINE_HOST_FUNCTION(jsReadable_emitReadable_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) { JSReadableHelper_EXTRACT_STREAM_STATE @@ -223,11 +200,8 @@ EncodedJSValue emitReadable(JSGlobalObject* lexicalGlobalObject, JSObject* strea state->setBool(JSReadableState::needReadable, false); if (!state->getBool(JSReadableState::emittedReadable)) { state->setBool(JSReadableState::emittedReadable, true); - // make this static? - JSFunction* emitReadable_ = JSC::JSFunction::create( - vm, lexicalGlobalObject, 0, "emitReadable_"_s, jsReadable_emitReadable_, ImplementationVisibility::Public); - - lexicalGlobalObject->queueMicrotask(emitReadable_, JSValue(stream), JSValue(state), JSValue {}, JSValue {}); + Zig::GlobalObject* globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject); + globalObject->queueMicrotask(JSValue(globalObject->emitReadableNextTickFunction()), JSValue(stream), JSValue(state), JSValue {}, JSValue {}); } return JSValue::encode(jsUndefined()); } diff --git a/src/bun.js/bindings/JSReadableHelper.h b/src/bun.js/bindings/JSReadableHelper.h index 3a25c07cb..6746bcbec 100644 --- a/src/bun.js/bindings/JSReadableHelper.h +++ b/src/bun.js/bindings/JSReadableHelper.h @@ -8,5 +8,7 @@ JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore); JSC_DECLARE_HOST_FUNCTION(jsReadable_resume); JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable); JSC_DECLARE_HOST_FUNCTION(jsReadable_onEofChunk); +JSC_DECLARE_HOST_FUNCTION(jsReadable_resume_); +JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable_); } // namespace WebCore diff --git a/src/bun.js/bindings/JSSink.cpp b/src/bun.js/bindings/JSSink.cpp index c4097c8b4..a342793f0 100644 --- a/src/bun.js/bindings/JSSink.cpp +++ b/src/bun.js/bindings/JSSink.cpp @@ -1,6 +1,6 @@ // AUTO-GENERATED FILE. DO NOT EDIT. -// Generated by 'make generate-sink' at 2022-10-09T04:03:11.763Z +// Generated by 'make generate-sink' at 2022-11-13T22:44:00.280Z // To regenerate this file, run: // // make generate-sink diff --git a/src/bun.js/bindings/JSSink.h b/src/bun.js/bindings/JSSink.h index 4a7f26682..2c9d5e2a6 100644 --- a/src/bun.js/bindings/JSSink.h +++ b/src/bun.js/bindings/JSSink.h @@ -1,6 +1,6 @@ // AUTO-GENERATED FILE. DO NOT EDIT. -// Generated by 'make generate-sink' at 2022-10-09T04:03:11.760Z +// Generated by 'make generate-sink' at 2022-11-13T22:44:00.279Z // #pragma once diff --git a/src/bun.js/bindings/JSSinkLookupTable.h b/src/bun.js/bindings/JSSinkLookupTable.h index be66e3530..14d547708 100644 --- a/src/bun.js/bindings/JSSinkLookupTable.h +++ b/src/bun.js/bindings/JSSinkLookupTable.h @@ -1,4 +1,4 @@ -// Automatically generated from src/bun.js/bindings/JSSink.cpp using /Users/zhiyuan.guo/Documents/playground/bun/src/bun.js/WebKit/Source/JavaScriptCore/create_hash_table. DO NOT EDIT! +// Automatically generated from src/bun.js/bindings/JSSink.cpp using /Users/jarred/Code/bun/src/bun.js/WebKit/Source/JavaScriptCore/create_hash_table. DO NOT EDIT! diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index c01615685..052295bd2 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -1052,7 +1052,7 @@ JSC: return ByteBlob__JSReadableStreamSource__load(globalObject); } case ReadableStreamTag::File: { - return FileBlobLoader__JSReadableStreamSource__load(globalObject); + return FileReader__JSReadableStreamSource__load(globalObject); } case ReadableStreamTag::Bytes: { return ByteStream__JSReadableStreamSource__load(globalObject); @@ -2329,6 +2329,10 @@ void GlobalObject::finishCreation(VM& vm) [](const Initializer<JSFunction>& init) { init.set(JSFunction::create(init.vm, init.owner, 4, "performMicrotask"_s, jsFunctionPerformMicrotask, ImplementationVisibility::Public)); }); + m_emitReadableNextTickFunction.initLater( + [](const Initializer<JSFunction>& init) { + init.set(JSFunction::create(init.vm, init.owner, 4, "emitReadable"_s, WebCore::jsReadable_emitReadable_, ImplementationVisibility::Public)); + }); m_performMicrotaskVariadicFunction.initLater( [](const Initializer<JSFunction>& init) { @@ -2741,6 +2745,47 @@ JSC_DEFINE_CUSTOM_GETTER(functionLazyNavigatorGetter, return JSC::JSValue::encode(reinterpret_cast<Zig::GlobalObject*>(globalObject)->navigatorObject()); } +JSC_DEFINE_HOST_FUNCTION(functionGetDirectStreamDetails, (JSC::JSGlobalObject * lexicalGlobalObject, JSC::CallFrame* callFrame)) +{ + auto* globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject); + JSC::VM& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + auto argCount = callFrame->argumentCount(); + if (argCount != 1) { + return JSC::JSValue::encode(JSC::jsNull()); + } + + auto stream = callFrame->argument(0); + if (!stream.isObject()) { + return JSC::JSValue::encode(JSC::jsNull()); + } + + auto* streamObject = stream.getObject(); + auto* readableStream = jsDynamicCast<WebCore::JSReadableStream*>(streamObject); + if (!readableStream) { + return JSC::JSValue::encode(JSC::jsNull()); + } + + auto clientData = WebCore::clientData(vm); + + JSValue ptrValue = readableStream->get(globalObject, clientData->builtinNames().bunNativePtrPrivateName()); + JSValue typeValue = readableStream->get(globalObject, clientData->builtinNames().bunNativeTypePrivateName()); + auto result = ptrValue.asAnyInt(); + + if (result == 0 || !typeValue.isNumber()) { + return JSC::JSValue::encode(JSC::jsNull()); + } + + readableStream->putDirect(vm, clientData->builtinNames().bunNativePtrPrivateName(), jsUndefined(), 0); + readableStream->putDirect(vm, clientData->builtinNames().bunNativeTypePrivateName(), jsUndefined(), 0); + + auto* resultObject = JSC::constructEmptyObject(globalObject, globalObject->objectPrototype(), 2); + resultObject->putDirect(vm, clientData->builtinNames().streamPublicName(), ptrValue, 0); + resultObject->putDirect(vm, clientData->builtinNames().dataPublicName(), typeValue, 0); + + return JSC::JSValue::encode(resultObject); +} + void GlobalObject::addBuiltinGlobals(JSC::VM& vm) { m_builtinInternalFunctions.initialize(*this); @@ -2855,6 +2900,7 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm) extraStaticGlobals.uncheckedAppend(GlobalPropertyInfo(builtinNames.createWritableStreamFromInternalPrivateName(), JSFunction::create(vm, this, 1, String(), createWritableStreamFromInternal, ImplementationVisibility::Public), PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly)); extraStaticGlobals.uncheckedAppend(GlobalPropertyInfo(builtinNames.fulfillModuleSyncPrivateName(), JSFunction::create(vm, this, 1, String(), functionFulfillModuleSync, ImplementationVisibility::Public), PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly | PropertyAttribute::Function)); extraStaticGlobals.uncheckedAppend(GlobalPropertyInfo(builtinNames.commonJSSymbolPrivateName(), JSC::Symbol::create(vm, vm.symbolRegistry().symbolForKey(CommonJSSymbolKey)), PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly)); + extraStaticGlobals.uncheckedAppend(GlobalPropertyInfo(builtinNames.directPrivateName(), JSFunction::create(vm, this, 1, String(), functionGetDirectStreamDetails, ImplementationVisibility::Public), PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly | PropertyAttribute::Function)); this->addStaticGlobals(extraStaticGlobals.data(), extraStaticGlobals.size()); @@ -3262,6 +3308,7 @@ void GlobalObject::visitChildrenImpl(JSCell* cell, Visitor& visitor) thisObject->m_subtleCryptoObject.visit(visitor); thisObject->m_JSHTTPResponseController.visit(visitor); thisObject->m_callSiteStructure.visit(visitor); + thisObject->m_emitReadableNextTickFunction.visit(visitor); for (auto& barrier : thisObject->m_thenables) { visitor.append(barrier); diff --git a/src/bun.js/bindings/ZigGlobalObject.h b/src/bun.js/bindings/ZigGlobalObject.h index 68fa452b2..9c37eb145 100644 --- a/src/bun.js/bindings/ZigGlobalObject.h +++ b/src/bun.js/bindings/ZigGlobalObject.h @@ -235,6 +235,8 @@ public: JSC::JSFunction* performMicrotaskFunction() { return m_performMicrotaskFunction.getInitializedOnMainThread(this); } JSC::JSFunction* performMicrotaskVariadicFunction() { return m_performMicrotaskVariadicFunction.getInitializedOnMainThread(this); } + JSC::JSFunction* emitReadableNextTickFunction() { return m_emitReadableNextTickFunction.getInitializedOnMainThread(this); } + JSC::JSObject* processObject() { return m_processObject.getInitializedOnMainThread(this); @@ -467,6 +469,7 @@ private: LazyProperty<JSGlobalObject, JSC::Structure> m_pendingVirtualModuleResultStructure; LazyProperty<JSGlobalObject, JSFunction> m_performMicrotaskFunction; LazyProperty<JSGlobalObject, JSFunction> m_performMicrotaskVariadicFunction; + LazyProperty<JSGlobalObject, JSFunction> m_emitReadableNextTickFunction; LazyProperty<JSGlobalObject, JSMap> m_lazyReadableStreamPrototypeMap; LazyProperty<JSGlobalObject, JSMap> m_requireMap; LazyProperty<JSGlobalObject, JSObject> m_encodeIntoObjectPrototype; diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index c374ff0ec..509138cf2 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -2473,6 +2473,9 @@ pub const JSValueReprInt = i64; pub const JSValue = enum(JSValueReprInt) { zero = 0, @"undefined" = @bitCast(JSValueReprInt, @as(i64, 0xa)), + @"null" = @bitCast(JSValueReprInt, @as(i64, 0x2)), + @"true" = @bitCast(JSValueReprInt, @as(i64, 0x4)), + @"false" = @bitCast(JSValueReprInt, @as(i64, 0x6)), _, pub const Type = JSValueReprInt; @@ -2997,18 +3000,21 @@ pub const JSValue = enum(JSValueReprInt) { return jsNumberWithType(@TypeOf(number), number); } - pub fn jsNull() JSValue { - return cppFn("jsNull", .{}); + pub inline fn jsNull() JSValue { + return JSValue.@"null"; } pub inline fn jsUndefined() JSValue { return JSValue.@"undefined"; } + pub inline fn jsBoolean(i: bool) JSValue { + const out = cppFn("jsBoolean", .{i}); + return out; + } + pub fn jsTDZValue() JSValue { return cppFn("jsTDZValue", .{}); } - pub fn jsBoolean(i: bool) JSValue { - return cppFn("jsBoolean", .{i}); - } + pub fn jsDoubleNumber(i: f64) JSValue { return cppFn("jsDoubleNumber", .{i}); } diff --git a/src/bun.js/bindings/exports.zig b/src/bun.js/bindings/exports.zig index 2e2c5e556..933466b19 100644 --- a/src/bun.js/bindings/exports.zig +++ b/src/bun.js/bindings/exports.zig @@ -179,7 +179,7 @@ pub const NodePath = JSC.Node.Path; // Web Streams pub const JSReadableStreamBlob = JSC.WebCore.ByteBlobLoader.Source.JSReadableStreamSource; -pub const JSReadableStreamFile = JSC.WebCore.FileBlobLoader.Source.JSReadableStreamSource; +pub const JSReadableStreamFile = JSC.WebCore.FileReader.Source.JSReadableStreamSource; pub const JSReadableStreamBytes = JSC.WebCore.ByteStream.Source.JSReadableStreamSource; // Sinks diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index 8450e7390..a3f71adb3 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -826,7 +826,7 @@ ZIG_DECL JSC__JSValue ByteBlob__JSReadableStreamSource__load(JSC__JSGlobalObject #ifdef __cplusplus -ZIG_DECL JSC__JSValue FileBlobLoader__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0); +ZIG_DECL JSC__JSValue FileReader__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0); #endif diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js index 10a538333..bc401cc37 100644 --- a/src/bun.js/child_process.exports.js +++ b/src/bun.js/child_process.exports.js @@ -118,7 +118,7 @@ export function spawn(file, args, options) { } timeoutId = null; } - }, options.timeout); + }); child.once("exit", () => { if (timeoutId) { @@ -852,10 +852,6 @@ export class ChildProcess extends EventEmitter { spawnfile; spawnargs; pid; - stdin; - stdout; - stderr; - stdio; channel; get killed() { @@ -875,8 +871,8 @@ export class ChildProcess extends EventEmitter { this.exitCode = this.#handle.exitCode; } - if (this.stdin) { - this.stdin.destroy(); + if (this.#stdin) { + this.#stdin.destroy(); } if (this.#handle) { @@ -920,35 +916,73 @@ export class ChildProcess extends EventEmitter { this.#exited = true; } - #getBunSpawnIo(stdio, options) { - const result = [null]; - switch (stdio[0]) { - case "pipe": - result[0] = new WrappedFileSink(this.#handle.stdin); - break; - case "inherit": - result[0] = process.stdin; - break; - default: - result[0] = null; + #getBunSpawnIo(i, encoding) { + const io = this.#stdioOptions[i]; + switch (i) { + case 0: { + switch (io) { + case "pipe": + return new WrappedFileSink(this.#handle.stdin); + case "inherit": + return process.stdin || null; + default: + return null; + } + } + case 2: + case 1: { + switch (io) { + case "pipe": + return ReadableFromWeb(this.#handle[fdToStdioName(i)], { + encoding, + }); + break; + case "inherit": + return process[fdToStdioName(i)] || null; + break; + default: + return null; + } break; - } - let i = 1; - for (; i < stdio.length; i++) { - switch (stdio[i]) { - case "pipe": - result[i] = ReadableFromWeb(this.#handle[fdToStdioName(i)], { - encoding: options.encoding || undefined, - }); - break; - case "inherit": - result[i] = process[fdToStdioName(i)]; - break; - default: - result[i] = null; } } - return result; + } + + #stdin; + #stdout; + #stderr; + #stdioObject; + #encoding; + #stdioOptions; + + #createStdioObject() { + return Object.create(null, { + 0: { + get: () => this.stdin, + }, + 1: { + get: () => this.stdout, + }, + 2: { + get: () => this.stderr, + }, + }); + } + + get stdin() { + return (this.#stdin ??= this.#getBunSpawnIo(0, this.#encoding)); + } + + get stdout() { + return (this.#stdout ??= this.#getBunSpawnIo(1, this.#encoding)); + } + + get stderr() { + return (this.#stderr ??= this.#getBunSpawnIo(2, this.#encoding)); + } + + get stdio() { + return (this.#stdioObject ??= this.#createStdioObject()); } spawn(options) { @@ -974,20 +1008,22 @@ export class ChildProcess extends EventEmitter { // } validateString(options.file, "options.file"); - this.spawnfile = options.file; + var file; + file = this.spawnfile = options.file; + var spawnargs; if (options.args == null) { - this.spawnargs = []; + spawnargs = this.spawnargs = []; } else { validateArray(options.args, "options.args"); - this.spawnargs = options.args; + spawnargs = this.spawnargs = options.args; } - const stdio = options.stdio || "pipe"; + const stdio = options.stdio || ["pipe", "pipe", "pipe"]; const bunStdio = getBunStdioFromOptions(stdio); this.#handle = Bun.spawn({ - cmd: [options.file, ...this.spawnargs], + cmd: spawnargs, stdin: bunStdio[0], stdout: bunStdio[1], stderr: bunStdio[2], @@ -996,16 +1032,12 @@ export class ChildProcess extends EventEmitter { onExit: this.#handleOnExit.bind(this), }); this.#handleExited = this.#handle.exited; - - this.stdio = this.#getBunSpawnIo(bunStdio, options); - this.stdin = this.stdio[0]; - this.stdout = this.stdio[1]; - this.stderr = this.stdio[2]; + this.#encoding = options.encoding || undefined; + this.#stdioOptions = bunStdio; + this.pid = this.#handle.pid; process.nextTick(onSpawnNT, this); - this.pid = this.#handle.pid; - // If no `stdio` option was given - use default // let stdio = options.stdio || "pipe"; // TODO: reset default // let stdio = options.stdio || ["pipe", "pipe", "pipe"]; diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 23bcf542f..47b54e8ca 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -1801,6 +1801,7 @@ pub const VirtualMachine = struct { "url", "info", "pkg", + "errors", }; if (error_instance != .zero and error_instance.isCell() and error_instance.jsType().canGet()) { diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig index dd961b510..32788cedc 100644 --- a/src/bun.js/node/syscall.zig +++ b/src/bun.js/node/syscall.zig @@ -16,6 +16,8 @@ const C = @import("../../global.zig").C; const linux = os.linux; const Maybe = JSC.Maybe; +const log = bun.Output.scoped(.SYS, false); + // On Linux AARCh64, zig is missing stat & lstat syscalls const use_libc = (Environment.isLinux and Environment.isAarch64) or Environment.isMac; pub const system = if (Environment.isLinux) linux else @import("io").darwin; @@ -180,6 +182,7 @@ pub fn getErrno(rc: anytype) std.os.E { pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) Maybe(JSC.Node.FileDescriptor) { while (true) { const rc = Syscall.system.open(file_path, flags, perm); + log("open({s}): {d}", .{ file_path, rc }); return switch (Syscall.getErrno(rc)) { .SUCCESS => .{ .result = @intCast(JSC.Node.FileDescriptor, rc) }, .INTR => continue, @@ -288,16 +291,20 @@ pub fn read(fd: os.fd_t, buf: []u8) Maybe(usize) { if (comptime Environment.isMac) { const rc = system.@"read$NOCANCEL"(fd, buf.ptr, adjusted_len); if (Maybe(usize).errnoSys(rc, .read)) |err| { + log("read error: {d} ({d} bytes, {d} fd)", .{ err.err.errno, buf.len, fd }); return err; } + log("read: {d} bytes, {d} fd", .{ rc, fd }); return Maybe(usize){ .result = @intCast(usize, rc) }; } else { while (true) { const rc = sys.read(fd, buf.ptr, adjusted_len); if (Maybe(usize).errnoSys(rc, .read)) |err| { if (err.getErrno() == .INTR) continue; + log("read error: {d} ({d} bytes, {d} fd)", .{ err.err.errno, buf.len, fd }); return err; } + log("read: {d} bytes, {d} fd", .{ rc, fd }); return Maybe(usize){ .result = @intCast(usize, rc) }; } } diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 24e34447d..15d32adb5 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -1,6 +1,6 @@ // "readable-stream" npm package // just transpiled -var { isPromise } = import.meta.primordials; +var { isPromise, isCallable, direct } = import.meta.primordials; var __create = Object.create; var __defProp = Object.defineProperty; @@ -2145,7 +2145,7 @@ var require_destroy = __commonJS({ } if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err); else if (err) { - err.stack; + Error.captureStackTrace(err); if (w && !w.errored) { w.errored = err; } @@ -2605,56 +2605,96 @@ var require_readable = __commonJS({ class ReadableFromWeb extends Readable { #reader; #closed; + #pendingChunks; + #stream; - constructor(options) { - const { objectMode, highWaterMark, encoding, signal, reader } = options; + constructor(options, stream) { + const { objectMode, highWaterMark, encoding, signal } = options; super({ objectMode, highWaterMark, encoding, signal, }); + this.#pendingChunks = []; + this.#reader = undefined; + this.#stream = stream; + this.#closed = false; + } - this.#reader = reader; - this.#reader.closed - .then(() => { - this.#closed = true; - }) - .catch((error) => { - this.#closed = true; - destroy(this, error); - }); + #drainPending() { + var pendingChunks = this.#pendingChunks, + pendingChunksI = 0, + pendingChunksCount = pendingChunks.length; + + for (; pendingChunksI < pendingChunksCount; pendingChunksI++) { + const chunk = pendingChunks[pendingChunksI]; + pendingChunks[pendingChunksI] = undefined; + if (!this.push(chunk, undefined)) { + this.#pendingChunks = pendingChunks.slice(pendingChunksI + 1); + return true; + } + } + + if (pendingChunksCount > 0) { + this.#pendingChunks = []; + } + + return false; + } + + #handleDone(reader) { + reader.releaseLock(); + this.#reader = undefined; + this.#closed = true; + this.push(null); + return; } async _read() { + var stream = this.#stream, + reader = this.#reader; + if (stream) { + reader = this.#reader = stream.getReader(); + this.#stream = undefined; + } else if (this.#drainPending()) { + return; + } + var deferredError; try { - var done, value; - const firstResult = this.#reader.readMany(); + do { + var done = false, + value; + const firstResult = reader.readMany(); - if (isPromise(firstResult)) { - const result = await firstResult; - done = result.done; - value = result.value; - } else { - done = firstResult.done; - value = firstResult.value; - } + if (isPromise(firstResult)) { + ({ done, value } = await firstResult); + if (this.#closed) { + this.#pendingChunks.push(...value); + return; + } + } else { + ({ done, value } = firstResult); + } - if (done) { - this.push(null); - return; - } + if (done) { + this.#handleDone(reader); + return; + } - if (!value) - throw new Error( - `Invalid value from ReadableStream reader: ${value}`, - ); - if (ArrayIsArray(value)) { - this.push(...value); - } else { - this.push(value); - } + if (!this.push(value[0])) { + this.#pendingChunks = value.slice(1); + return; + } + + for (let i = 1, count = value.length; i < count; i++) { + if (!this.push(value[i])) { + this.#pendingChunks = value.slice(i + 1); + return; + } + } + } while (this._readableState.flowing && !this.#closed); } catch (e) { deferredError = e; } finally { @@ -2664,8 +2704,15 @@ var require_readable = __commonJS({ _destroy(error, callback) { if (!this.#closed) { - this.#reader.releaseLock(); - this.#reader.cancel(error).then(done).catch(done); + var reader = this.#reader; + if (reader) { + this.#reader = undefined; + reader.cancel(error).finally(() => { + this.#closed = true; + callback(error); + }); + } + return; } try { @@ -2709,16 +2756,24 @@ var require_readable = __commonJS({ throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding"); validateBoolean(objectMode, "options.objectMode"); - const reader = readableStream.getReader(); - const readable = new ReadableFromWeb({ - highWaterMark, - encoding, - objectMode, - signal, - reader, - }); + const nativeStream = getNativeReadableStream( + Readable, + readableStream, + options, + ); - return readable; + return ( + nativeStream || + new ReadableFromWeb( + { + highWaterMark, + encoding, + objectMode, + signal, + }, + readableStream, + ) + ); } module.exports = Readable; @@ -2726,8 +2781,15 @@ var require_readable = __commonJS({ var { addAbortSignal } = require_add_abort_signal(); var eos = require_end_of_stream(); var debug = (name) => {}; - const { maybeReadMore, resume, emitReadable, onEofChunk } = - globalThis[Symbol.for("Bun.lazy")]("bun:stream"); + const { + maybeReadMore: _maybeReadMore, + resume, + emitReadable, + onEofChunk, + } = globalThis[Symbol.for("Bun.lazy")]("bun:stream"); + function maybeReadMore(stream, state) { + process.nextTick(_maybeReadMore, stream, state); + } var destroyImpl = require_destroy(); var { aggregateTwoErrors, @@ -2984,10 +3046,17 @@ var require_readable = __commonJS({ // Call internal read method try { - const result = this._read(state.highWaterMark); - const then = result?.then; - if (then && typeof then === "function") { - then.call(result, nop, function (err) { + var result = this._read(state.highWaterMark); + if (isPromise(result)) { + const peeked = Bun.peek(result); + if (peeked !== result) { + result = peeked; + } + } + + var then = result?.then; + if (then && isCallable(then)) { + result.then(nop, function (err) { errorOrDestroy(this, err); }); } @@ -3823,7 +3892,7 @@ var require_writable = __commonJS({ state.length -= state.writelen; state.writelen = 0; if (er) { - er.stack; + Error.captureStackTrace(er); if (!state.errored) { state.errored = er; } @@ -5603,6 +5672,217 @@ var require_ours = __commonJS({ }, }); +/** + * Bun native stream wrapper + * + * This glue code lets us avoid using ReadableStreams to wrap Bun internal streams + * + */ +function createNativeStream(nativeType, Readable) { + var [pull, start, cancel, setClose, deinit, updateRef] = + globalThis[Symbol.for("Bun.lazy")](nativeType); + + var closer = [false]; + var handleNumberResult = function (nativeReadable, result, view, isClosed) { + if (result > 0) { + const slice = view.subarray(0, result); + const remainder = view.subarray(result); + if (remainder.byteLength > 0) { + nativeReadable.push(slice); + } + + if (isClosed) { + nativeReadable.push(null); + } + + return remainder.byteLength > 0 ? remainder : undefined; + } + + if (isClosed) { + nativeReadable.push(null); + } + }; + + var handleArrayBufferViewResult = function ( + nativeReadable, + result, + view, + isClosed, + ) { + if (result.byteLength > 0) { + nativeReadable.push(result); + } + + if (isClosed) { + nativeReadable.push(null); + } + + return view; + }; + + var handleResult = function (nativeReadable, result, view, isClosed) { + if (typeof result === "number") { + return handleNumberResult(nativeReadable, result, view, isClosed); + } else if (typeof result === "boolean") { + nativeReadable.push(null); + return view?.byteLength ?? 0 > 0 ? view : undefined; + } else if (ArrayBuffer.isView(result)) { + return handleArrayBufferViewResult( + nativeReadable, + result, + view, + isClosed, + ); + } else { + throw new Error("Invalid result from pull"); + } + }; + var NativeReadable = class NativeReadable extends Readable { + #ptr; + #refCount = 1; + #constructed = false; + #remainingChunk = undefined; + #highWaterMark; + #pendingRead = false; + constructor(ptr, options = {}) { + super(options); + if (typeof options.highWaterMark === "number") { + this.#highWaterMark = options.highWaterMark; + } else { + this.#highWaterMark = 256 * 1024; + } + this.#ptr = ptr; + this.#constructed = false; + this.#remainingChunk = undefined; + this.#pendingRead = false; + } + + _read(highWaterMark) { + if (this.#pendingRead) return; + + var ptr = this.#ptr; + if (ptr === 0) { + this.push(null); + return; + } + + if (!this.#constructed) { + this.#constructed = true; + start(ptr, this.#highWaterMark); + } + + return this.#internalRead(this.#getRemainingChunk(), ptr); + } + + #getRemainingChunk() { + var chunk = this.#remainingChunk; + var highWaterMark = this.#highWaterMark; + if ((chunk?.byteLength ?? 0 < 512) && highWaterMark > 512) { + this.#remainingChunk = chunk = new Buffer(this.#highWaterMark); + } + + return chunk; + } + + #internalRead(view, ptr) { + closer[0] = false; + var result = pull(ptr, view, closer); + if (isPromise(result)) { + this.#pendingRead = true; + var originalFlowing = this._readableState.flowing; + this._readableState.flowing = false; + return result.then( + (result) => { + this._readableState.flowing = originalFlowing; + this.#pendingRead = false; + this.#remainingChunk = handleResult(this, result, view, closer[0]); + }, + (reason) => errorOrDestroy(this, reason), + ); + } else { + this.#remainingChunk = handleResult(this, result, view, closer[0]); + } + } + + _construct(cb) { + this._readableState.constructed = true; + cb(); + } + _destroy(error, callback) { + var ptr = this.#ptr; + if (ptr === 0) { + callback(error); + return; + } + + this.#ptr = 0; + if (updateRef) { + updateRef(ptr, false); + } + process.nextTick(deinit, ptr); + cancel(ptr, error); + callback(error); + } + + ref() { + var ptr = this.#ptr; + if (ptr === 0) return; + if (this.#refCount++ === 0) { + updateRef(ptr, true); + } + } + + unref() { + var ptr = this.#ptr; + if (ptr === 0) return; + if (this.#refCount-- === 1) { + updateRef(ptr, false); + } + } + }; + + if (!updateRef) { + NativeReadable.prototype.ref = undefined; + NativeReadable.prototype.unref = undefined; + } + + return NativeReadable; +} + +var nativeReadableStreamPrototypes = { + 0: undefined, + 1: undefined, + 2: undefined, + 3: undefined, + 4: undefined, + 5: undefined, +}; +function getNativeReadableStreamPrototype(nativeType, Readable) { + return (nativeReadableStreamPrototypes[nativeType] ||= createNativeStream( + nativeType, + Readable, + )); +} + +function getNativeReadableStream(Readable, stream, options) { + if ( + !(stream && typeof stream === "object" && stream instanceof ReadableStream) + ) { + return undefined; + } + + const native = direct(stream); + if (!native) { + return undefined; + } + const { stream: ptr, data: type } = native; + + const NativeReadable = getNativeReadableStreamPrototype(type, Readable); + + return new NativeReadable(ptr, options); +} +/** --- Bun native stream wrapper --- */ + var stream_exports, wrapper; stream_exports = require_ours(); wrapper = diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 81e3c6a51..2f4ee919c 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -477,9 +477,6 @@ pub const Fetch = struct { const headers_string = "headers"; const method_string = "method"; - var fetch_body_string: MutableString = undefined; - var fetch_body_string_loaded = false; - const JSType = js.JSType; pub const fetch_error_no_args = "fetch() expects a string but received no arguments."; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 4ec4f9f9c..7ec20b046 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -160,7 +160,7 @@ pub const ReadableStream = struct { /// ReadableByteStreamController /// but with a FileLoader /// we can skip the FileLoader and just use the underlying File - File: *FileBlobLoader, + File: *FileReader, /// This is a direct readable stream /// That means we can turn it into whatever we want @@ -212,7 +212,7 @@ pub const ReadableStream = struct { .File => ReadableStream{ .value = value, .ptr = .{ - .File = ptr.asPtr(FileBlobLoader), + .File = ptr.asPtr(FileReader), }, }, @@ -266,7 +266,7 @@ pub const ReadableStream = struct { return reader.toJS(globalThis); }, .file => { - var reader = bun.default_allocator.create(FileBlobLoader.Source) catch unreachable; + var reader = bun.default_allocator.create(FileReader.Source) catch unreachable; reader.* = .{ .context = undefined, }; @@ -1141,6 +1141,10 @@ pub const FileSink = struct { } pub fn flushMaybePoll(this: *FileSink) StreamResult.Writable { + return flushMaybePollWithSize(this, std.math.maxInt(usize)); + } + + pub fn flushMaybePollWithSize(this: *FileSink, writable_size: usize) StreamResult.Writable { std.debug.assert(this.fd != JSC.Node.invalid_fd); var total: usize = this.written; @@ -1168,12 +1172,23 @@ pub const FileSink = struct { } } - const max_to_write = if (is_fifo) this.max_write_size else remain.len; + const max_to_write = + if (is_fifo) + brk: { + if (comptime Environment.isMac) { + break :brk if (writable_size == std.math.maxInt(usize)) + max_fifo_size + else + writable_size; + } + + break :brk this.max_write_size; + } else remain.len; while (remain.len > 0) { const write_buf = remain[0..@minimum(remain.len, max_to_write)]; - log("Write {d} bytes (fd: {d})", .{ write_buf.len, fd }); + log("Write {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ write_buf.len, fd, this.head, remain.len, total }); const res = JSC.Node.Syscall.write(fd, write_buf); if (res == .err) { const retry = @@ -1202,7 +1217,7 @@ pub const FileSink = struct { remain = remain[res.result..]; total += res.result; - log("Wrote {d} bytes (fd: {d})", .{ res.result, fd }); + log("Wrote {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ res.result, fd, this.head, remain.len, total }); if (res.result == 0) { if (this.poll_ref) |poll| { @@ -1341,7 +1356,7 @@ pub const FileSink = struct { return JSSink.createObject(globalThis, this); } - pub fn ready(this: *FileSink, _: i64) void { + pub fn ready(this: *FileSink, writable: i64) void { var remain = this.buffer.slice(); const pending = remain[@minimum(this.head, remain.len)..].len; if (pending == 0) { @@ -1352,7 +1367,11 @@ pub const FileSink = struct { return; } - _ = this.flushMaybePoll(); + if (comptime Environment.isMac) { + _ = this.flushMaybePollWithSize(@intCast(usize, @maximum(writable, 0))); + } else { + _ = this.flushMaybePollWithSize(std.math.maxInt(usize)); + } } pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable { @@ -1429,7 +1448,7 @@ pub const FileSink = struct { fn isPending(this: *const FileSink) bool { var poll_ref = this.poll_ref orelse return false; - return poll_ref.isActive(); + return poll_ref.isRegistered(); } pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { @@ -2592,6 +2611,7 @@ pub fn ReadableStreamSource( comptime onPull: anytype, comptime onCancel: fn (this: *Context) void, comptime deinit: fn (this: *Context) void, + comptime setRefUnrefFn: ?fn (this: *Context, enable: bool) void, ) type { return struct { context: Context, @@ -2610,6 +2630,24 @@ pub fn ReadableStreamSource( return onPull(&this.context, buf, JSValue.zero); } + pub fn ref(this: *This) void { + if (setRefUnrefFn) |setRefUnref| { + setRefUnref(&this.context, true); + } + } + + pub fn unref(this: *This) void { + if (setRefUnrefFn) |setRefUnref| { + setRefUnref(&this.context, false); + } + } + + pub fn setRef(this: *This, value: bool) void { + if (setRefUnrefFn) |setRefUnref| { + setRefUnref(&this.context, value); + } + } + pub fn start( this: *This, ) StreamStart { @@ -2665,6 +2703,8 @@ pub fn ReadableStreamSource( return ReadableStream.fromNative(globalThis, Context.tag, this); } + const supports_ref = setRefUnrefFn != null; + pub const JSReadableStreamSource = struct { pub const shim = JSC.Shimmer(std.mem.span(name_), "JSReadableStreamSource", @This()); pub const name = std.fmt.comptimePrint("{s}_JSReadableStreamSource", .{std.mem.span(name_)}); @@ -2724,6 +2764,13 @@ pub fn ReadableStreamSource( return JSC.JSValue.jsUndefined(); } + pub fn updateRef(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); + const ref_or_unref = callFrame.argument(1).asBoolean(); + this.setRef(ref_or_unref); + return JSC.JSValue.jsUndefined(); + } + fn onClose(ptr: *anyopaque) void { var this = bun.cast(*ReadableStreamSourceType, ptr); _ = this.close_jsvalue.call(this.globalThis, &.{}); @@ -2738,23 +2785,17 @@ pub fn ReadableStreamSource( pub fn load(globalThis: *JSGlobalObject) callconv(.C) JSC.JSValue { if (comptime JSC.is_bindgen) unreachable; - if (comptime Environment.allow_assert) { - // this should be cached per globals object - const OnlyOnce = struct { - pub threadlocal var last_globals: ?*JSGlobalObject = null; - }; - if (OnlyOnce.last_globals) |last_globals| { - std.debug.assert(last_globals != globalThis); - } - OnlyOnce.last_globals = globalThis; - } - + // This is used also in Node.js streams return JSC.JSArray.from(globalThis, &.{ - JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.pull, true), - JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.start, true), - JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.cancel, true), - JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.setClose, true), - JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.deinit, true), + JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.pull, true), + JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.start, true), + JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.cancel, true), + JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.setClose, true), + JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.deinit, true), + if (supports_ref) + JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.updateRef, true) + else + JSC.JSValue.jsNull(), }); } @@ -2770,6 +2811,7 @@ pub fn ReadableStreamSource( _ = JSReadableStreamSource.cancel; _ = JSReadableStreamSource.setClose; _ = JSReadableStreamSource.load; + _ = JSReadableStreamSource.deinit; } } }; @@ -2847,7 +2889,7 @@ pub const ByteBlobLoader = struct { bun.default_allocator.destroy(this); } - pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit); + pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit, null); }; pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; @@ -3127,10 +3169,11 @@ pub const ByteStream = struct { bun.default_allocator.destroy(this.parent()); } - pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit); + pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit, null); }; -pub const FileBlobLoader = struct { +/// **Not** the Web "FileReader" API +pub const FileReader = struct { buf: []u8 = &[_]u8{}, view: JSC.Strong = .{}, fd: JSC.Node.FileDescriptor = 0, @@ -3169,17 +3212,15 @@ pub const FileBlobLoader = struct { pub usingnamespace NewReadyWatcher(@This(), .readable, ready); - pub inline fn globalThis(this: *FileBlobLoader) *JSC.JSGlobalObject { + pub inline fn globalThis(this: *FileReader) *JSC.JSGlobalObject { return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis; } - const FileReader = @This(); - const run_on_different_thread_size = bun.huge_allocator_threshold; pub const tag = ReadableStream.Tag.File; - pub fn setupWithPoll(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) void { + pub fn setupWithPoll(this: *FileReader, store: *Blob.Store, chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) void { store.ref(); this.* = .{ .loop = JSC.VirtualMachine.vm.eventLoop(), @@ -3193,11 +3234,11 @@ pub const FileBlobLoader = struct { } } - pub fn setup(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType) void { + pub fn setup(this: *FileReader, store: *Blob.Store, chunk_size: Blob.SizeType) void { this.setupWithPoll(store, chunk_size, null); } - pub fn finish(this: *FileBlobLoader) void { + pub fn finish(this: *FileReader) void { if (this.finished) return; this.finished = true; this.close_on_eof = true; @@ -3227,12 +3268,12 @@ pub const FileBlobLoader = struct { concurrent_task: JSC.ConcurrentTask = .{}, pub fn taskCallback(task: *NetworkThread.Task) void { - var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task)); + var this = @fieldParentPtr(FileReader, "concurrent", @fieldParentPtr(Concurrent, "task", task)); var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable; _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{this}); } - pub fn onRead(this: *FileBlobLoader, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void { + pub fn onRead(this: *FileReader, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void { this.concurrent.read = @truncate(Blob.SizeType, result catch |err| { if (@hasField(HTTPClient.NetworkThread.Completion, "result")) { this.pending.result = .{ @@ -3258,7 +3299,7 @@ pub const FileBlobLoader = struct { resume this.concurrent.read_frame; } - pub fn scheduleRead(this: *FileBlobLoader) void { + pub fn scheduleRead(this: *FileReader) void { if (comptime Environment.isMac) { var remaining = this.buf[this.concurrent.read..]; @@ -3296,7 +3337,7 @@ pub const FileBlobLoader = struct { } AsyncIO.global.read( - *FileBlobLoader, + *FileReader, this, onRead, &this.concurrent.completion, @@ -3316,7 +3357,7 @@ pub const FileBlobLoader = struct { } pub fn onJSThread(task_ctx: *anyopaque) void { - var this: *FileBlobLoader = bun.cast(*FileBlobLoader, task_ctx); + var this: *FileReader = bun.cast(*FileReader, task_ctx); const view = this.view.get().?; defer this.view.clear(); @@ -3352,12 +3393,12 @@ pub const FileBlobLoader = struct { } } - pub fn scheduleMainThreadTask(this: *FileBlobLoader) void { + pub fn scheduleMainThreadTask(this: *FileReader) void { this.concurrent.main_thread_task.ctx = this; this.loop.enqueueTaskConcurrent(this.concurrent.concurrent_task.from(&this.concurrent.main_thread_task)); } - fn runAsync(this: *FileBlobLoader) void { + fn runAsync(this: *FileReader) void { this.concurrent.read = 0; Concurrent.scheduleRead(this); @@ -3382,7 +3423,7 @@ pub const FileBlobLoader = struct { const default_fifo_chunk_size = 64 * 1024; const default_file_chunk_size = 1024 * 1024 * 2; - pub fn onStart(this: *FileBlobLoader) StreamStart { + pub fn onStart(this: *FileReader) StreamStart { var file = &this.store.data.file; std.debug.assert(!this.started); this.started = true; @@ -3480,7 +3521,7 @@ pub const FileBlobLoader = struct { return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) }; } - fn calculateChunkSize(this: *FileBlobLoader, available_to_read: usize) usize { + fn calculateChunkSize(this: *FileReader, available_to_read: usize) usize { const file = &this.store.data.file; const chunk_size: usize = if (this.user_chunk_size > 0) @@ -3496,7 +3537,7 @@ pub const FileBlobLoader = struct { @minimum(available_to_read, chunk_size); } - pub fn onPullInto(this: *FileBlobLoader, buffer: []u8, view: JSC.JSValue) StreamResult { + pub fn onPullInto(this: *FileReader, buffer: []u8, view: JSC.JSValue) StreamResult { const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); std.debug.assert(this.started); @@ -3531,14 +3572,14 @@ pub const FileBlobLoader = struct { return this.read(buffer, view, null); } - fn maybeAutoClose(this: *FileBlobLoader) void { + fn maybeAutoClose(this: *FileReader) void { if (this.auto_close) { _ = Syscall.close(this.fd); this.auto_close = false; } } - fn handleReadChunk(this: *FileBlobLoader, result: usize, view: JSC.JSValue, owned: bool, buf: []u8) StreamResult { + fn handleReadChunk(this: *FileReader, result: usize, view: JSC.JSValue, owned: bool, buf: []u8) StreamResult { std.debug.assert(this.started); this.total_read += @intCast(Blob.SizeType, result); @@ -3573,7 +3614,7 @@ pub const FileBlobLoader = struct { } pub fn read( - this: *FileBlobLoader, + this: *FileReader, read_buf: []u8, view: JSC.JSValue, /// provided via kqueue(), only on macOS @@ -3665,6 +3706,10 @@ pub const FileBlobLoader = struct { } if (this.poll_ref) |poll| { + if ((available_to_read orelse 0) > 0) { + poll.flags.insert(.readable); + } + const is_readable = poll.isReadable(); if (!is_readable and poll.isEOF()) { if (poll.isHUP()) { @@ -3676,6 +3721,11 @@ pub const FileBlobLoader = struct { this.finalize(); return .{ .done = {} }; } else if (!is_readable and poll.isRegistered()) { + if (this.finished) { + this.finalize(); + return .{ .done = {} }; + } + if (view != .zero) { this.view.set(this.globalThis(), view); this.buf = read_buf; @@ -3753,6 +3803,12 @@ pub const FileBlobLoader = struct { if (result < buf_to_use.len) { // do not insert .eof here poll.flags.remove(.readable); + + if (result > 0 and !poll.flags.contains(.hup) and !this.finished) { + // partial read, but not close. be sure to ask for more data + if (!this.isWatching()) + this.watch(fd); + } } } } @@ -3770,7 +3826,7 @@ pub const FileBlobLoader = struct { this.view.set(this.globalThis(), view); this.buf = read_buf; if (!this.isWatching()) - this.watch(this.fd); + this.watch(fd); this.poll_ref.?.flags.remove(.readable); return .{ @@ -3784,23 +3840,21 @@ pub const FileBlobLoader = struct { } /// Called from Poller - pub fn ready(this: *FileBlobLoader, sizeOrOffset: i64) void { - std.debug.assert(this.started); - + pub fn ready(this: *FileReader, sizeOrOffset: i64) void { const view = this.view.get() orelse .zero; defer this.view.clear(); - var available_to_read: usize = std.math.maxInt(usize); - if (comptime Environment.isMac) { + const available_to_read: usize = if (comptime Environment.isMac) brk: { if (this.isFIFO()) { - available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0)); + break :brk @intCast(usize, @maximum(sizeOrOffset, 0)); } else if (std.os.S.ISREG(this.mode)) { // Returns when the file pointer is not at the end of // file. data contains the offset from current position // to end of file, and may be negative. - available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0)); + break :brk @intCast(usize, @maximum(sizeOrOffset, 0)); } - } + break :brk std.math.maxInt(usize); + } else std.math.maxInt(usize); if (this.finalized and this.scheduled_count == 0) { if (this.pending.state == .pending) { // should never be reached @@ -3844,7 +3898,7 @@ pub const FileBlobLoader = struct { this.pending.run(); } - pub fn finalize(this: *FileBlobLoader) void { + pub fn finalize(this: *FileReader) void { if (this.finalized) return; @@ -3875,23 +3929,33 @@ pub const FileBlobLoader = struct { this.store.deref(); } - pub fn onCancel(this: *FileBlobLoader) void { + pub fn onCancel(this: *FileReader) void { this.cancelled = true; this.deinit(); } - pub fn deinit(this: *FileBlobLoader) void { + pub fn deinit(this: *FileReader) void { this.finalize(); if (this.scheduled_count == 0 and this.pending.state == .pending) { this.destroy(); } } - pub fn destroy(this: *FileBlobLoader) void { + pub fn destroy(this: *FileReader) void { bun.default_allocator.destroy(this); } - pub const Source = ReadableStreamSource(@This(), "FileBlobLoader", onStart, onPullInto, onCancel, deinit); + pub fn setRefOrUnref(this: *FileReader, value: bool) void { + if (this.poll_ref) |poll| { + if (value) { + poll.enableKeepingProcessAlive(this.globalThis().bunVM()); + } else { + poll.disableKeepingProcessAlive(this.globalThis().bunVM()); + } + } + } + + pub const Source = ReadableStreamSource(@This(), "FileReader", onStart, onPullInto, onCancel, deinit, setRefOrUnref); }; pub fn NewReadyWatcher( @@ -3943,7 +4007,7 @@ pub fn NewReadyWatcher( pub fn isWatching(this: *const Context) bool { if (this.poll_ref) |poll| { - return poll.flags.contains(flag.poll()); + return poll.flags.contains(flag.poll()) and !poll.flags.contains(.needs_rearm); } return false; |