aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/api/bun/socket.zig16
-rw-r--r--src/bun.js/api/bun/subprocess.zig22
-rw-r--r--src/bun.js/api/html_rewriter.zig2
-rw-r--r--src/bun.js/base.zig120
-rw-r--r--src/bun.js/bindings/JSReadableHelper.cpp68
-rw-r--r--src/bun.js/bindings/JSReadableHelper.h2
-rw-r--r--src/bun.js/bindings/JSSink.cpp2
-rw-r--r--src/bun.js/bindings/JSSink.h2
-rw-r--r--src/bun.js/bindings/JSSinkLookupTable.h2
-rw-r--r--src/bun.js/bindings/ZigGlobalObject.cpp49
-rw-r--r--src/bun.js/bindings/ZigGlobalObject.h3
-rw-r--r--src/bun.js/bindings/bindings.zig16
-rw-r--r--src/bun.js/bindings/exports.zig2
-rw-r--r--src/bun.js/bindings/headers.h2
-rw-r--r--src/bun.js/child_process.exports.js122
-rw-r--r--src/bun.js/javascript.zig1
-rw-r--r--src/bun.js/node/syscall.zig7
-rw-r--r--src/bun.js/streams.exports.js388
-rw-r--r--src/bun.js/webcore/response.zig3
-rw-r--r--src/bun.js/webcore/streams.zig188
20 files changed, 724 insertions, 293 deletions
diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig
index 77b4a266f..5bc1629c0 100644
--- a/src/bun.js/api/bun/socket.zig
+++ b/src/bun.js/api/bun/socket.zig
@@ -112,12 +112,15 @@ const Handlers = struct {
pub fn callErrorHandler(this: *Handlers, thisValue: JSValue, err: []const JSValue) bool {
const onError = this.onError;
if (onError == .zero) {
+ if (err.len > 0)
+ this.vm.onUnhandledError(this.globalObject, err[0]);
+
return false;
}
const result = onError.callWithThis(this.globalObject, thisValue, err);
if (!result.isEmptyOrUndefinedOrNull() and result.isAnyError(this.globalObject)) {
- this.vm.runErrorHandler(result, null);
+ this.vm.onUnhandledError(this.globalObject, result);
}
return true;
@@ -827,8 +830,6 @@ fn NewSocket(comptime ssl: bool) type {
if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) {
return;
}
-
- handlers.vm.runErrorHandler(result, null);
}
}
pub fn onTimeout(
@@ -857,8 +858,6 @@ fn NewSocket(comptime ssl: bool) type {
if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) {
return;
}
-
- handlers.vm.runErrorHandler(result, null);
}
}
pub fn onConnectError(this: *This, socket: Socket, errno: c_int) void {
@@ -948,7 +947,6 @@ fn NewSocket(comptime ssl: bool) type {
return;
}
- handlers.vm.runErrorHandler(result, null);
return;
}
}
@@ -984,8 +982,6 @@ fn NewSocket(comptime ssl: bool) type {
if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) {
return;
}
-
- handlers.vm.runErrorHandler(result, null);
}
}
@@ -1015,8 +1011,6 @@ fn NewSocket(comptime ssl: bool) type {
if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) {
return;
}
-
- handlers.vm.runErrorHandler(result, null);
}
}
@@ -1046,8 +1040,6 @@ fn NewSocket(comptime ssl: bool) type {
if (handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, result })) {
return;
}
-
- handlers.vm.runErrorHandler(result, null);
}
}
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index dcfa88a40..2d8127901 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -30,7 +30,6 @@ pub const Subprocess = struct {
stderr: Readable,
killed: bool = false,
- reffer: JSC.Ref = JSC.Ref.init(),
poll_ref: ?*JSC.FilePoll = null,
exit_promise: JSC.Strong = .{},
@@ -55,13 +54,14 @@ pub const Subprocess = struct {
globalThis: *JSC.JSGlobalObject,
pub fn ref(this: *Subprocess) void {
- this.reffer.ref(this.globalThis.bunVM());
- if (this.poll_ref) |poll| poll.ref(this.globalThis.bunVM());
+ var vm = this.globalThis.bunVM();
+ if (this.poll_ref) |poll| poll.enableKeepingProcessAlive(vm);
}
pub fn unref(this: *Subprocess) void {
this.this_jsvalue.clear();
- this.unrefWithoutGC(this.globalThis.bunVM());
+ var vm = this.globalThis.bunVM();
+ if (this.poll_ref) |poll| poll.disableKeepingProcessAlive(vm);
}
pub fn constructor(
@@ -182,8 +182,6 @@ pub const Subprocess = struct {
.pipe => {
defer this.close();
- // TODO: handle when there's pending unread data in the pipe
- // For some reason, this currently hangs forever
if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) {
if (this.pipe.buffer.canRead())
this.pipe.buffer.readIfPossible(true);
@@ -1301,7 +1299,6 @@ pub const Subprocess = struct {
if (!sync) {
var vm = this.globalThis.bunVM();
- this.reffer.unref(vm);
// prevent duplicate notifications
if (this.poll_ref) |poll| {
@@ -1309,17 +1306,10 @@ pub const Subprocess = struct {
poll.deinitWithVM(vm);
}
- this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this);
- this.has_waitpid_task = true;
- vm.eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task));
+ this.onExit();
}
}
- pub fn unrefWithoutGC(this: *Subprocess, vm: *JSC.VirtualMachine) void {
- if (this.poll_ref) |poll| poll.unref(vm);
- this.reffer.unref(vm);
- }
-
fn onExit(this: *Subprocess) void {
this.closePorts();
@@ -1348,7 +1338,7 @@ pub const Subprocess = struct {
);
if (result.isAnyError(this.globalThis)) {
- this.globalThis.bunVM().runErrorHandler(result, null);
+ this.globalThis.bunVM().onUnhandledError(this.globalThis, result);
}
}
diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig
index eb602223b..9ab424491 100644
--- a/src/bun.js/api/html_rewriter.zig
+++ b/src/bun.js/api/html_rewriter.zig
@@ -850,7 +850,7 @@ fn HandlerCallback(
switch (promise.status(this.global.vm())) {
JSC.JSPromise.Status.Pending => unreachable,
JSC.JSPromise.Status.Rejected => {
- JavaScript.VirtualMachine.vm.runErrorHandler(promise.result(this.global.vm()), null);
+ JavaScript.VirtualMachine.vm.onUnhandledError(this.global, promise.result(this.global.vm()));
@field(zig_element, field_name) = null;
return false;
},
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index 37eb3e626..deeab246a 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -3985,7 +3985,7 @@ pub const FilePoll = struct {
flags: Flags.Set = Flags.Set{},
owner: Owner = Deactivated.owner,
- const FileBlobLoader = JSC.WebCore.FileBlobLoader;
+ const FileReader = JSC.WebCore.FileReader;
const FileSink = JSC.WebCore.FileSink;
const Subprocess = JSC.Subprocess;
const BufferedInput = Subprocess.BufferedInput;
@@ -3995,7 +3995,7 @@ pub const FilePoll = struct {
};
pub const Owner = bun.TaggedPointerUnion(.{
- FileBlobLoader,
+ FileReader,
FileSink,
Subprocess,
BufferedInput,
@@ -4016,12 +4016,12 @@ pub const FilePoll = struct {
pub fn onKQueueEvent(poll: *FilePoll, loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void {
poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*));
- poll.onUpdate(loop);
+ poll.onUpdate(loop, kqueue_event.data);
}
pub fn onEpollEvent(poll: *FilePoll, loop: *uws.Loop, epoll_event: *std.os.linux.epoll_event) void {
poll.updateFlags(Flags.fromEpollEvent(epoll_event.*));
- poll.onUpdate(loop);
+ poll.onUpdate(loop, 0);
}
pub fn clearEvent(poll: *FilePoll, flag: Flags) void {
@@ -4072,16 +4072,16 @@ pub const FilePoll = struct {
return this.flags.contains(.poll_writable) or this.flags.contains(.poll_readable) or this.flags.contains(.poll_process);
}
- pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop) void {
+ pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop, size_or_offset: i64) void {
if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) {
if (poll.flags.contains(.has_incremented_poll_count)) poll.deactivate(loop);
poll.flags.insert(.needs_rearm);
}
var ptr = poll.owner;
switch (ptr.tag()) {
- @field(Owner.Tag, "FileBlobLoader") => {
- log("onUpdate: FileBlobLoader", .{});
- ptr.as(FileBlobLoader).onPoll(0, 0);
+ @field(Owner.Tag, "FileReader") => {
+ log("onUpdate: FileReader", .{});
+ ptr.as(FileReader).onPoll(size_or_offset, 0);
},
@field(Owner.Tag, "Subprocess") => {
log("onUpdate: Subprocess", .{});
@@ -4092,18 +4092,18 @@ pub const FilePoll = struct {
@field(Owner.Tag, "FileSink") => {
log("onUpdate: FileSink", .{});
var loader = ptr.as(JSC.WebCore.FileSink);
- loader.onPoll(0, 0);
+ loader.onPoll(size_or_offset, 0);
},
@field(Owner.Tag, "BufferedInput") => {
log("onUpdate: BufferedInput", .{});
var loader = ptr.as(JSC.Subprocess.BufferedInput);
- loader.onReady(0);
+ loader.onReady(size_or_offset);
},
@field(Owner.Tag, "BufferedOutput") => {
log("onUpdate: BufferedOutput", .{});
var loader = ptr.as(JSC.Subprocess.BufferedOutput);
- loader.ready(0);
+ loader.ready(size_or_offset);
},
else => {},
}
@@ -4196,34 +4196,45 @@ pub const FilePoll = struct {
const log = Output.scoped(.FilePoll, false);
pub inline fn isActive(this: *const FilePoll) bool {
- return this.flags.contains(.has_incremented_poll_count) and !this.flags.contains(.disable);
+ return this.flags.contains(.has_incremented_poll_count);
}
/// Make calling ref() on this poll into a no-op.
- pub fn disable(this: *FilePoll) void {
- if (this.isRegistered()) {
- this.unregister(JSC.VirtualMachine.vm.uws_event_loop.?);
- }
-
- this.unref();
+ pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
+ if (this.flags.contains(.disable))
+ return;
this.flags.insert(.disable);
+
+ vm.uws_event_loop.?.active -= @as(u32, @boolToInt(this.flags.contains(.has_incremented_poll_count)));
+ }
+
+ pub fn enableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
+ if (!this.flags.contains(.disable))
+ return;
+ this.flags.remove(.disable);
+
+ vm.uws_event_loop.?.active += @as(u32, @boolToInt(this.flags.contains(.has_incremented_poll_count)));
+ }
+
+ pub fn canActivate(this: *const FilePoll) bool {
+ return !this.flags.contains(.has_incremented_poll_count);
}
/// Only intended to be used from EventLoop.Pollable
pub fn deactivate(this: *FilePoll, loop: *uws.Loop) void {
std.debug.assert(this.flags.contains(.has_incremented_poll_count));
+ loop.num_polls -= @as(i32, @boolToInt(this.flags.contains(.has_incremented_poll_count)));
+ loop.active -= @as(u32, @boolToInt(!this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count)));
+
this.flags.remove(.has_incremented_poll_count);
- loop.num_polls -= 1;
- loop.active -= 1;
}
/// Only intended to be used from EventLoop.Pollable
pub fn activate(this: *FilePoll, loop: *uws.Loop) void {
- std.debug.assert(!this.flags.contains(.has_incremented_poll_count));
- std.debug.assert(!this.flags.contains(.disable));
+ loop.num_polls += @as(i32, @boolToInt(!this.flags.contains(.has_incremented_poll_count)));
+ loop.active += @as(u32, @boolToInt(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count)));
+
this.flags.insert(.has_incremented_poll_count);
- loop.num_polls += 1;
- loop.active += 1;
}
pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll {
@@ -4240,9 +4251,20 @@ pub const FilePoll = struct {
return poll;
}
+ pub inline fn canRef(this: *const FilePoll) bool {
+ if (this.flags.contains(.disable))
+ return false;
+
+ return !this.flags.contains(.has_incremented_poll_count);
+ }
+
+ pub inline fn canUnref(this: *const FilePoll) bool {
+ return this.flags.contains(.has_incremented_poll_count);
+ }
+
/// Prevent a poll from keeping the process alive.
pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void {
- if (!this.isActive())
+ if (!this.canUnref())
return;
log("unref", .{});
this.deactivate(vm.uws_event_loop.?);
@@ -4250,7 +4272,7 @@ pub const FilePoll = struct {
/// Allow a poll to keep the process alive.
pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void {
- if (this.isActive())
+ if (this.canRef())
return;
log("ref", .{});
this.activate(vm.uws_event_loop.?);
@@ -4296,11 +4318,13 @@ pub const FilePoll = struct {
std.debug.assert(this.fd != invalid_fd);
if (comptime Environment.isLinux) {
+ const one_shot_flag: u32 = if (!this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT;
+
const flags: u32 = switch (flag) {
.process,
.readable,
- => linux.EPOLL.IN | linux.EPOLL.HUP | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT),
- .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT),
+ => linux.EPOLL.IN | linux.EPOLL.HUP | one_shot_flag,
+ .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | one_shot_flag,
else => unreachable,
};
@@ -4318,6 +4342,7 @@ pub const FilePoll = struct {
}
} else if (comptime Environment.isMac) {
var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
+ const one_shot_flag: @TypeOf(changelist[0].flags) = if (!this.flags.contains(.one_shot)) 0 else std.c.EV_ONESHOT;
changelist[0] = switch (flag) {
.readable => .{
.ident = @intCast(u64, fd),
@@ -4325,7 +4350,7 @@ pub const FilePoll = struct {
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_ADD | std.c.EV_ONESHOT,
+ .flags = std.c.EV_ADD | one_shot_flag,
.ext = .{ 0, 0 },
},
.writable => .{
@@ -4334,7 +4359,7 @@ pub const FilePoll = struct {
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_ADD | std.c.EV_ONESHOT,
+ .flags = std.c.EV_ADD | one_shot_flag,
.ext = .{ 0, 0 },
},
.process => .{
@@ -4343,9 +4368,10 @@ pub const FilePoll = struct {
.data = 0,
.fflags = std.c.NOTE_EXIT,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_ADD,
+ .flags = std.c.EV_ADD | one_shot_flag,
.ext = .{ 0, 0 },
},
+ else => unreachable,
};
// output events only include change errors
@@ -4393,17 +4419,20 @@ pub const FilePoll = struct {
} else {
@compileError("TODO: Pollable");
}
-
- if (!this.isActive()) this.activate(loop);
+ if (this.canActivate())
+ this.activate(loop);
this.flags.insert(switch (flag) {
- .process, .readable => .poll_readable,
+ .readable => .poll_readable,
+ .process => if (comptime Environment.isLinux) .poll_readable else .poll_process,
.writable => .poll_writable,
else => unreachable,
});
+ this.flags.remove(.needs_rearm);
+
return JSC.Maybe(void).success;
}
- pub const invalid_fd = std.math.maxInt(u32);
+ pub const invalid_fd = JSC.Node.invalid_fd;
pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) {
if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) {
@@ -4424,6 +4453,14 @@ pub const FilePoll = struct {
return JSC.Maybe(void).success;
};
+ if (this.flags.contains(.needs_rearm)) {
+ log("unregister: {s} ({d}) skipped due to needs_rearm", .{ @tagName(flag), fd });
+ this.flags.remove(.poll_process);
+ this.flags.remove(.poll_readable);
+ this.flags.remove(.poll_process);
+ return JSC.Maybe(void).success;
+ }
+
log("unregister: {s} ({d})", .{ @tagName(flag), fd });
if (comptime Environment.isLinux) {
@@ -4441,22 +4478,22 @@ pub const FilePoll = struct {
var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
changelist[0] = switch (flag) {
- .read => .{
+ .readable => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_READ,
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .flags = std.c.EV_DELETE,
.ext = .{ 0, 0 },
},
- .write => .{
+ .writable => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_WRITE,
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .flags = std.c.EV_DELETE,
.ext = .{ 0, 0 },
},
.process => .{
@@ -4465,7 +4502,7 @@ pub const FilePoll = struct {
.data = 0,
.fflags = std.c.NOTE_EXIT,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .flags = std.c.EV_DELETE,
.ext = .{ 0, 0 },
},
else => unreachable,
@@ -4498,10 +4535,9 @@ pub const FilePoll = struct {
}
const errno = std.c.getErrno(rc);
-
switch (rc) {
std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
- else => unreachable,
+ else => {},
}
} else {
@compileError("TODO: Pollable");
diff --git a/src/bun.js/bindings/JSReadableHelper.cpp b/src/bun.js/bindings/JSReadableHelper.cpp
index 384664ce9..3845d62f5 100644
--- a/src/bun.js/bindings/JSReadableHelper.cpp
+++ b/src/bun.js/bindings/JSReadableHelper.cpp
@@ -11,7 +11,7 @@
#include "JSDOMAttribute.h"
#include "headers.h"
#include "JSDOMConvertEnumeration.h"
-
+#include "JavaScriptCore/StrongInlines.h"
namespace WebCore {
using namespace JSC;
@@ -56,13 +56,15 @@ static bool callRead(JSValue stream, JSFunction* read, JSC::MarkedArgumentBuffer
return !ret.isUndefinedOrNull();
}
-static JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore_);
-JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
+JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
{
JSReadableHelper_EXTRACT_STREAM_STATE
- auto read
- = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s));
+ auto clientData
+ = WebCore::clientData(vm);
+ auto readIdentifier = clientData->builtinNames().readPublicName();
+ auto read = stream->get(lexicalGlobalObject, readIdentifier);
+
auto callData = JSC::getCallData(read);
if (callData.type == CallData::Type::None) {
throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read));
@@ -85,24 +87,14 @@ JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore_, (JSGlobalObject * lexicalGlo
RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
}
-JSC_DEFINE_HOST_FUNCTION(jsReadable_maybeReadMore, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
-{
- JSReadableHelper_EXTRACT_STREAM_STATE
-
- // make this static?
- JSFunction* maybeReadMore_
- = JSC::JSFunction::create(vm, lexicalGlobalObject, 0, "maybeReadMore_"_s, jsReadable_maybeReadMore_, ImplementationVisibility::Public);
-
- lexicalGlobalObject->queueMicrotask(maybeReadMore_, JSValue(stream), JSValue(state), JSValue {}, JSValue {});
- RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
-}
-
void flow(JSGlobalObject* lexicalGlobalObject, JSObject* streamObj, JSReadableState* state)
{
VM& vm = lexicalGlobalObject->vm();
auto throwScope = DECLARE_THROW_SCOPE(vm);
- auto read = streamObj->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s));
+ auto clientData = WebCore::clientData(vm);
+ auto readIdentifier = clientData->builtinNames().readPublicName();
+ auto read = streamObj->get(lexicalGlobalObject, readIdentifier);
auto callData = JSC::getCallData(read);
if (callData.type == CallData::Type::None) {
@@ -118,8 +110,7 @@ void flow(JSGlobalObject* lexicalGlobalObject, JSObject* streamObj, JSReadableSt
}
}
-static JSC_DECLARE_HOST_FUNCTION(jsReadable_resume_);
-JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
+JSC_DEFINE_HOST_FUNCTION(jsReadable_resume, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
{
JSReadableHelper_EXTRACT_STREAM_STATE
@@ -132,18 +123,20 @@ JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObje
}
auto& emitter = jsEmitterWrap->wrapped();
+ auto clientData = WebCore::clientData(vm);
+ auto readIdentifier = clientData->builtinNames().readPublicName();
if (!state->getBool(JSReadableState::reading)) {
// stream.read(0)
MarkedArgumentBuffer args;
args.append(jsNumber(0));
- callRead(stream, jsCast<JSFunction*>(stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s))), WTFMove(args), vm, lexicalGlobalObject, emitter);
+ callRead(stream, jsCast<JSFunction*>(stream->get(lexicalGlobalObject, readIdentifier)), WTFMove(args), vm, lexicalGlobalObject, emitter);
}
state->setBool(JSReadableState::resumeScheduled, true);
// stream.emit('resume')
- auto eventType = Identifier::fromString(vm, "resume"_s);
+ auto eventType = clientData->builtinNames().resumePublicName();
MarkedArgumentBuffer args;
emitter.emitForBindings(eventType, args);
@@ -152,7 +145,7 @@ JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObje
if (state->m_flowing > 0 && !state->getBool(JSReadableState::reading)) {
// stream.read(0)
- auto read = stream->get(lexicalGlobalObject, Identifier::fromString(vm, "read"_s));
+ auto read = stream->get(lexicalGlobalObject, readIdentifier);
auto callData = JSC::getCallData(read);
if (callData.type == CallData::Type::None) {
throwException(lexicalGlobalObject, throwScope, createNotAFunctionError(lexicalGlobalObject, read));
@@ -165,31 +158,16 @@ JSC_DEFINE_HOST_FUNCTION(jsReadable_resume_, (JSGlobalObject * lexicalGlobalObje
RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
}
-JSC_DEFINE_HOST_FUNCTION(jsReadable_resume, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
-{
- JSReadableHelper_EXTRACT_STREAM_STATE
-
- if (!state->getBool(JSReadableState::resumeScheduled))
- {
- state->setBool(JSReadableState::resumeScheduled, true);
- // make this static?
- JSFunction* resume_ = JSC::JSFunction::create(
- vm, lexicalGlobalObject, 0, "resume_"_s, jsReadable_resume_, ImplementationVisibility::Public);
-
- lexicalGlobalObject->queueMicrotask(resume_, JSValue(stream), JSValue(state), JSValue {}, JSValue {});
- }
- RELEASE_AND_RETURN(throwScope, JSValue::encode(jsUndefined()));
-}
-
EncodedJSValue emitReadable_(JSGlobalObject* lexicalGlobalObject, JSObject* stream, JSReadableState* state)
{
VM& vm = lexicalGlobalObject->vm();
auto throwScope = DECLARE_THROW_SCOPE(vm);
-
JSValue errored = state->m_errored.get();
if (!state->getBool(JSReadableState::destroyed) && !errored.toBoolean(lexicalGlobalObject) && (state->m_length || state->getBool(JSReadableState::ended))) {
// stream.emit('readable')
- auto eventType = Identifier::fromString(vm, "readable"_s);
+ auto clientData = WebCore::clientData(vm);
+
+ auto eventType = clientData->builtinNames().readablePublicName();
MarkedArgumentBuffer args;
auto emitter = jsDynamicCast<JSEventEmitter*>(stream);
if (!emitter) {
@@ -206,7 +184,6 @@ EncodedJSValue emitReadable_(JSGlobalObject* lexicalGlobalObject, JSObject* stre
return JSValue::encode(jsUndefined());
}
-JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable_);
JSC_DEFINE_HOST_FUNCTION(jsReadable_emitReadable_, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
{
JSReadableHelper_EXTRACT_STREAM_STATE
@@ -223,11 +200,8 @@ EncodedJSValue emitReadable(JSGlobalObject* lexicalGlobalObject, JSObject* strea
state->setBool(JSReadableState::needReadable, false);
if (!state->getBool(JSReadableState::emittedReadable)) {
state->setBool(JSReadableState::emittedReadable, true);
- // make this static?
- JSFunction* emitReadable_ = JSC::JSFunction::create(
- vm, lexicalGlobalObject, 0, "emitReadable_"_s, jsReadable_emitReadable_, ImplementationVisibility::Public);
-
- lexicalGlobalObject->queueMicrotask(emitReadable_, JSValue(stream), JSValue(state), JSValue {}, JSValue {});
+ Zig::GlobalObject* globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject);
+ globalObject->queueMicrotask(JSValue(globalObject->emitReadableNextTickFunction()), JSValue(stream), JSValue(state), JSValue {}, JSValue {});
}
return JSValue::encode(jsUndefined());
}
diff --git a/src/bun.js/bindings/JSReadableHelper.h b/src/bun.js/bindings/JSReadableHelper.h
index 3a25c07cb..6746bcbec 100644
--- a/src/bun.js/bindings/JSReadableHelper.h
+++ b/src/bun.js/bindings/JSReadableHelper.h
@@ -8,5 +8,7 @@ JSC_DECLARE_HOST_FUNCTION(jsReadable_maybeReadMore);
JSC_DECLARE_HOST_FUNCTION(jsReadable_resume);
JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable);
JSC_DECLARE_HOST_FUNCTION(jsReadable_onEofChunk);
+JSC_DECLARE_HOST_FUNCTION(jsReadable_resume_);
+JSC_DECLARE_HOST_FUNCTION(jsReadable_emitReadable_);
} // namespace WebCore
diff --git a/src/bun.js/bindings/JSSink.cpp b/src/bun.js/bindings/JSSink.cpp
index c4097c8b4..a342793f0 100644
--- a/src/bun.js/bindings/JSSink.cpp
+++ b/src/bun.js/bindings/JSSink.cpp
@@ -1,6 +1,6 @@
// AUTO-GENERATED FILE. DO NOT EDIT.
-// Generated by 'make generate-sink' at 2022-10-09T04:03:11.763Z
+// Generated by 'make generate-sink' at 2022-11-13T22:44:00.280Z
// To regenerate this file, run:
//
// make generate-sink
diff --git a/src/bun.js/bindings/JSSink.h b/src/bun.js/bindings/JSSink.h
index 4a7f26682..2c9d5e2a6 100644
--- a/src/bun.js/bindings/JSSink.h
+++ b/src/bun.js/bindings/JSSink.h
@@ -1,6 +1,6 @@
// AUTO-GENERATED FILE. DO NOT EDIT.
-// Generated by 'make generate-sink' at 2022-10-09T04:03:11.760Z
+// Generated by 'make generate-sink' at 2022-11-13T22:44:00.279Z
//
#pragma once
diff --git a/src/bun.js/bindings/JSSinkLookupTable.h b/src/bun.js/bindings/JSSinkLookupTable.h
index be66e3530..14d547708 100644
--- a/src/bun.js/bindings/JSSinkLookupTable.h
+++ b/src/bun.js/bindings/JSSinkLookupTable.h
@@ -1,4 +1,4 @@
-// Automatically generated from src/bun.js/bindings/JSSink.cpp using /Users/zhiyuan.guo/Documents/playground/bun/src/bun.js/WebKit/Source/JavaScriptCore/create_hash_table. DO NOT EDIT!
+// Automatically generated from src/bun.js/bindings/JSSink.cpp using /Users/jarred/Code/bun/src/bun.js/WebKit/Source/JavaScriptCore/create_hash_table. DO NOT EDIT!
diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp
index c01615685..052295bd2 100644
--- a/src/bun.js/bindings/ZigGlobalObject.cpp
+++ b/src/bun.js/bindings/ZigGlobalObject.cpp
@@ -1052,7 +1052,7 @@ JSC:
return ByteBlob__JSReadableStreamSource__load(globalObject);
}
case ReadableStreamTag::File: {
- return FileBlobLoader__JSReadableStreamSource__load(globalObject);
+ return FileReader__JSReadableStreamSource__load(globalObject);
}
case ReadableStreamTag::Bytes: {
return ByteStream__JSReadableStreamSource__load(globalObject);
@@ -2329,6 +2329,10 @@ void GlobalObject::finishCreation(VM& vm)
[](const Initializer<JSFunction>& init) {
init.set(JSFunction::create(init.vm, init.owner, 4, "performMicrotask"_s, jsFunctionPerformMicrotask, ImplementationVisibility::Public));
});
+ m_emitReadableNextTickFunction.initLater(
+ [](const Initializer<JSFunction>& init) {
+ init.set(JSFunction::create(init.vm, init.owner, 4, "emitReadable"_s, WebCore::jsReadable_emitReadable_, ImplementationVisibility::Public));
+ });
m_performMicrotaskVariadicFunction.initLater(
[](const Initializer<JSFunction>& init) {
@@ -2741,6 +2745,47 @@ JSC_DEFINE_CUSTOM_GETTER(functionLazyNavigatorGetter,
return JSC::JSValue::encode(reinterpret_cast<Zig::GlobalObject*>(globalObject)->navigatorObject());
}
+JSC_DEFINE_HOST_FUNCTION(functionGetDirectStreamDetails, (JSC::JSGlobalObject * lexicalGlobalObject, JSC::CallFrame* callFrame))
+{
+ auto* globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject);
+ JSC::VM& vm = globalObject->vm();
+ auto scope = DECLARE_THROW_SCOPE(vm);
+ auto argCount = callFrame->argumentCount();
+ if (argCount != 1) {
+ return JSC::JSValue::encode(JSC::jsNull());
+ }
+
+ auto stream = callFrame->argument(0);
+ if (!stream.isObject()) {
+ return JSC::JSValue::encode(JSC::jsNull());
+ }
+
+ auto* streamObject = stream.getObject();
+ auto* readableStream = jsDynamicCast<WebCore::JSReadableStream*>(streamObject);
+ if (!readableStream) {
+ return JSC::JSValue::encode(JSC::jsNull());
+ }
+
+ auto clientData = WebCore::clientData(vm);
+
+ JSValue ptrValue = readableStream->get(globalObject, clientData->builtinNames().bunNativePtrPrivateName());
+ JSValue typeValue = readableStream->get(globalObject, clientData->builtinNames().bunNativeTypePrivateName());
+ auto result = ptrValue.asAnyInt();
+
+ if (result == 0 || !typeValue.isNumber()) {
+ return JSC::JSValue::encode(JSC::jsNull());
+ }
+
+ readableStream->putDirect(vm, clientData->builtinNames().bunNativePtrPrivateName(), jsUndefined(), 0);
+ readableStream->putDirect(vm, clientData->builtinNames().bunNativeTypePrivateName(), jsUndefined(), 0);
+
+ auto* resultObject = JSC::constructEmptyObject(globalObject, globalObject->objectPrototype(), 2);
+ resultObject->putDirect(vm, clientData->builtinNames().streamPublicName(), ptrValue, 0);
+ resultObject->putDirect(vm, clientData->builtinNames().dataPublicName(), typeValue, 0);
+
+ return JSC::JSValue::encode(resultObject);
+}
+
void GlobalObject::addBuiltinGlobals(JSC::VM& vm)
{
m_builtinInternalFunctions.initialize(*this);
@@ -2855,6 +2900,7 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm)
extraStaticGlobals.uncheckedAppend(GlobalPropertyInfo(builtinNames.createWritableStreamFromInternalPrivateName(), JSFunction::create(vm, this, 1, String(), createWritableStreamFromInternal, ImplementationVisibility::Public), PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly));
extraStaticGlobals.uncheckedAppend(GlobalPropertyInfo(builtinNames.fulfillModuleSyncPrivateName(), JSFunction::create(vm, this, 1, String(), functionFulfillModuleSync, ImplementationVisibility::Public), PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly | PropertyAttribute::Function));
extraStaticGlobals.uncheckedAppend(GlobalPropertyInfo(builtinNames.commonJSSymbolPrivateName(), JSC::Symbol::create(vm, vm.symbolRegistry().symbolForKey(CommonJSSymbolKey)), PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly));
+ extraStaticGlobals.uncheckedAppend(GlobalPropertyInfo(builtinNames.directPrivateName(), JSFunction::create(vm, this, 1, String(), functionGetDirectStreamDetails, ImplementationVisibility::Public), PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly | PropertyAttribute::Function));
this->addStaticGlobals(extraStaticGlobals.data(), extraStaticGlobals.size());
@@ -3262,6 +3308,7 @@ void GlobalObject::visitChildrenImpl(JSCell* cell, Visitor& visitor)
thisObject->m_subtleCryptoObject.visit(visitor);
thisObject->m_JSHTTPResponseController.visit(visitor);
thisObject->m_callSiteStructure.visit(visitor);
+ thisObject->m_emitReadableNextTickFunction.visit(visitor);
for (auto& barrier : thisObject->m_thenables) {
visitor.append(barrier);
diff --git a/src/bun.js/bindings/ZigGlobalObject.h b/src/bun.js/bindings/ZigGlobalObject.h
index 68fa452b2..9c37eb145 100644
--- a/src/bun.js/bindings/ZigGlobalObject.h
+++ b/src/bun.js/bindings/ZigGlobalObject.h
@@ -235,6 +235,8 @@ public:
JSC::JSFunction* performMicrotaskFunction() { return m_performMicrotaskFunction.getInitializedOnMainThread(this); }
JSC::JSFunction* performMicrotaskVariadicFunction() { return m_performMicrotaskVariadicFunction.getInitializedOnMainThread(this); }
+ JSC::JSFunction* emitReadableNextTickFunction() { return m_emitReadableNextTickFunction.getInitializedOnMainThread(this); }
+
JSC::JSObject* processObject()
{
return m_processObject.getInitializedOnMainThread(this);
@@ -467,6 +469,7 @@ private:
LazyProperty<JSGlobalObject, JSC::Structure> m_pendingVirtualModuleResultStructure;
LazyProperty<JSGlobalObject, JSFunction> m_performMicrotaskFunction;
LazyProperty<JSGlobalObject, JSFunction> m_performMicrotaskVariadicFunction;
+ LazyProperty<JSGlobalObject, JSFunction> m_emitReadableNextTickFunction;
LazyProperty<JSGlobalObject, JSMap> m_lazyReadableStreamPrototypeMap;
LazyProperty<JSGlobalObject, JSMap> m_requireMap;
LazyProperty<JSGlobalObject, JSObject> m_encodeIntoObjectPrototype;
diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig
index c374ff0ec..509138cf2 100644
--- a/src/bun.js/bindings/bindings.zig
+++ b/src/bun.js/bindings/bindings.zig
@@ -2473,6 +2473,9 @@ pub const JSValueReprInt = i64;
pub const JSValue = enum(JSValueReprInt) {
zero = 0,
@"undefined" = @bitCast(JSValueReprInt, @as(i64, 0xa)),
+ @"null" = @bitCast(JSValueReprInt, @as(i64, 0x2)),
+ @"true" = @bitCast(JSValueReprInt, @as(i64, 0x4)),
+ @"false" = @bitCast(JSValueReprInt, @as(i64, 0x6)),
_,
pub const Type = JSValueReprInt;
@@ -2997,18 +3000,21 @@ pub const JSValue = enum(JSValueReprInt) {
return jsNumberWithType(@TypeOf(number), number);
}
- pub fn jsNull() JSValue {
- return cppFn("jsNull", .{});
+ pub inline fn jsNull() JSValue {
+ return JSValue.@"null";
}
pub inline fn jsUndefined() JSValue {
return JSValue.@"undefined";
}
+ pub inline fn jsBoolean(i: bool) JSValue {
+ const out = cppFn("jsBoolean", .{i});
+ return out;
+ }
+
pub fn jsTDZValue() JSValue {
return cppFn("jsTDZValue", .{});
}
- pub fn jsBoolean(i: bool) JSValue {
- return cppFn("jsBoolean", .{i});
- }
+
pub fn jsDoubleNumber(i: f64) JSValue {
return cppFn("jsDoubleNumber", .{i});
}
diff --git a/src/bun.js/bindings/exports.zig b/src/bun.js/bindings/exports.zig
index 2e2c5e556..933466b19 100644
--- a/src/bun.js/bindings/exports.zig
+++ b/src/bun.js/bindings/exports.zig
@@ -179,7 +179,7 @@ pub const NodePath = JSC.Node.Path;
// Web Streams
pub const JSReadableStreamBlob = JSC.WebCore.ByteBlobLoader.Source.JSReadableStreamSource;
-pub const JSReadableStreamFile = JSC.WebCore.FileBlobLoader.Source.JSReadableStreamSource;
+pub const JSReadableStreamFile = JSC.WebCore.FileReader.Source.JSReadableStreamSource;
pub const JSReadableStreamBytes = JSC.WebCore.ByteStream.Source.JSReadableStreamSource;
// Sinks
diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h
index 8450e7390..a3f71adb3 100644
--- a/src/bun.js/bindings/headers.h
+++ b/src/bun.js/bindings/headers.h
@@ -826,7 +826,7 @@ ZIG_DECL JSC__JSValue ByteBlob__JSReadableStreamSource__load(JSC__JSGlobalObject
#ifdef __cplusplus
-ZIG_DECL JSC__JSValue FileBlobLoader__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0);
+ZIG_DECL JSC__JSValue FileReader__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0);
#endif
diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js
index 10a538333..bc401cc37 100644
--- a/src/bun.js/child_process.exports.js
+++ b/src/bun.js/child_process.exports.js
@@ -118,7 +118,7 @@ export function spawn(file, args, options) {
}
timeoutId = null;
}
- }, options.timeout);
+ });
child.once("exit", () => {
if (timeoutId) {
@@ -852,10 +852,6 @@ export class ChildProcess extends EventEmitter {
spawnfile;
spawnargs;
pid;
- stdin;
- stdout;
- stderr;
- stdio;
channel;
get killed() {
@@ -875,8 +871,8 @@ export class ChildProcess extends EventEmitter {
this.exitCode = this.#handle.exitCode;
}
- if (this.stdin) {
- this.stdin.destroy();
+ if (this.#stdin) {
+ this.#stdin.destroy();
}
if (this.#handle) {
@@ -920,35 +916,73 @@ export class ChildProcess extends EventEmitter {
this.#exited = true;
}
- #getBunSpawnIo(stdio, options) {
- const result = [null];
- switch (stdio[0]) {
- case "pipe":
- result[0] = new WrappedFileSink(this.#handle.stdin);
- break;
- case "inherit":
- result[0] = process.stdin;
- break;
- default:
- result[0] = null;
+ #getBunSpawnIo(i, encoding) {
+ const io = this.#stdioOptions[i];
+ switch (i) {
+ case 0: {
+ switch (io) {
+ case "pipe":
+ return new WrappedFileSink(this.#handle.stdin);
+ case "inherit":
+ return process.stdin || null;
+ default:
+ return null;
+ }
+ }
+ case 2:
+ case 1: {
+ switch (io) {
+ case "pipe":
+ return ReadableFromWeb(this.#handle[fdToStdioName(i)], {
+ encoding,
+ });
+ break;
+ case "inherit":
+ return process[fdToStdioName(i)] || null;
+ break;
+ default:
+ return null;
+ }
break;
- }
- let i = 1;
- for (; i < stdio.length; i++) {
- switch (stdio[i]) {
- case "pipe":
- result[i] = ReadableFromWeb(this.#handle[fdToStdioName(i)], {
- encoding: options.encoding || undefined,
- });
- break;
- case "inherit":
- result[i] = process[fdToStdioName(i)];
- break;
- default:
- result[i] = null;
}
}
- return result;
+ }
+
+ #stdin;
+ #stdout;
+ #stderr;
+ #stdioObject;
+ #encoding;
+ #stdioOptions;
+
+ #createStdioObject() {
+ return Object.create(null, {
+ 0: {
+ get: () => this.stdin,
+ },
+ 1: {
+ get: () => this.stdout,
+ },
+ 2: {
+ get: () => this.stderr,
+ },
+ });
+ }
+
+ get stdin() {
+ return (this.#stdin ??= this.#getBunSpawnIo(0, this.#encoding));
+ }
+
+ get stdout() {
+ return (this.#stdout ??= this.#getBunSpawnIo(1, this.#encoding));
+ }
+
+ get stderr() {
+ return (this.#stderr ??= this.#getBunSpawnIo(2, this.#encoding));
+ }
+
+ get stdio() {
+ return (this.#stdioObject ??= this.#createStdioObject());
}
spawn(options) {
@@ -974,20 +1008,22 @@ export class ChildProcess extends EventEmitter {
// }
validateString(options.file, "options.file");
- this.spawnfile = options.file;
+ var file;
+ file = this.spawnfile = options.file;
+ var spawnargs;
if (options.args == null) {
- this.spawnargs = [];
+ spawnargs = this.spawnargs = [];
} else {
validateArray(options.args, "options.args");
- this.spawnargs = options.args;
+ spawnargs = this.spawnargs = options.args;
}
- const stdio = options.stdio || "pipe";
+ const stdio = options.stdio || ["pipe", "pipe", "pipe"];
const bunStdio = getBunStdioFromOptions(stdio);
this.#handle = Bun.spawn({
- cmd: [options.file, ...this.spawnargs],
+ cmd: spawnargs,
stdin: bunStdio[0],
stdout: bunStdio[1],
stderr: bunStdio[2],
@@ -996,16 +1032,12 @@ export class ChildProcess extends EventEmitter {
onExit: this.#handleOnExit.bind(this),
});
this.#handleExited = this.#handle.exited;
-
- this.stdio = this.#getBunSpawnIo(bunStdio, options);
- this.stdin = this.stdio[0];
- this.stdout = this.stdio[1];
- this.stderr = this.stdio[2];
+ this.#encoding = options.encoding || undefined;
+ this.#stdioOptions = bunStdio;
+ this.pid = this.#handle.pid;
process.nextTick(onSpawnNT, this);
- this.pid = this.#handle.pid;
-
// If no `stdio` option was given - use default
// let stdio = options.stdio || "pipe"; // TODO: reset default
// let stdio = options.stdio || ["pipe", "pipe", "pipe"];
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index 23bcf542f..47b54e8ca 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -1801,6 +1801,7 @@ pub const VirtualMachine = struct {
"url",
"info",
"pkg",
+ "errors",
};
if (error_instance != .zero and error_instance.isCell() and error_instance.jsType().canGet()) {
diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig
index dd961b510..32788cedc 100644
--- a/src/bun.js/node/syscall.zig
+++ b/src/bun.js/node/syscall.zig
@@ -16,6 +16,8 @@ const C = @import("../../global.zig").C;
const linux = os.linux;
const Maybe = JSC.Maybe;
+const log = bun.Output.scoped(.SYS, false);
+
// On Linux AARCh64, zig is missing stat & lstat syscalls
const use_libc = (Environment.isLinux and Environment.isAarch64) or Environment.isMac;
pub const system = if (Environment.isLinux) linux else @import("io").darwin;
@@ -180,6 +182,7 @@ pub fn getErrno(rc: anytype) std.os.E {
pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) Maybe(JSC.Node.FileDescriptor) {
while (true) {
const rc = Syscall.system.open(file_path, flags, perm);
+ log("open({s}): {d}", .{ file_path, rc });
return switch (Syscall.getErrno(rc)) {
.SUCCESS => .{ .result = @intCast(JSC.Node.FileDescriptor, rc) },
.INTR => continue,
@@ -288,16 +291,20 @@ pub fn read(fd: os.fd_t, buf: []u8) Maybe(usize) {
if (comptime Environment.isMac) {
const rc = system.@"read$NOCANCEL"(fd, buf.ptr, adjusted_len);
if (Maybe(usize).errnoSys(rc, .read)) |err| {
+ log("read error: {d} ({d} bytes, {d} fd)", .{ err.err.errno, buf.len, fd });
return err;
}
+ log("read: {d} bytes, {d} fd", .{ rc, fd });
return Maybe(usize){ .result = @intCast(usize, rc) };
} else {
while (true) {
const rc = sys.read(fd, buf.ptr, adjusted_len);
if (Maybe(usize).errnoSys(rc, .read)) |err| {
if (err.getErrno() == .INTR) continue;
+ log("read error: {d} ({d} bytes, {d} fd)", .{ err.err.errno, buf.len, fd });
return err;
}
+ log("read: {d} bytes, {d} fd", .{ rc, fd });
return Maybe(usize){ .result = @intCast(usize, rc) };
}
}
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 24e34447d..15d32adb5 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -1,6 +1,6 @@
// "readable-stream" npm package
// just transpiled
-var { isPromise } = import.meta.primordials;
+var { isPromise, isCallable, direct } = import.meta.primordials;
var __create = Object.create;
var __defProp = Object.defineProperty;
@@ -2145,7 +2145,7 @@ var require_destroy = __commonJS({
}
if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err);
else if (err) {
- err.stack;
+ Error.captureStackTrace(err);
if (w && !w.errored) {
w.errored = err;
}
@@ -2605,56 +2605,96 @@ var require_readable = __commonJS({
class ReadableFromWeb extends Readable {
#reader;
#closed;
+ #pendingChunks;
+ #stream;
- constructor(options) {
- const { objectMode, highWaterMark, encoding, signal, reader } = options;
+ constructor(options, stream) {
+ const { objectMode, highWaterMark, encoding, signal } = options;
super({
objectMode,
highWaterMark,
encoding,
signal,
});
+ this.#pendingChunks = [];
+ this.#reader = undefined;
+ this.#stream = stream;
+ this.#closed = false;
+ }
- this.#reader = reader;
- this.#reader.closed
- .then(() => {
- this.#closed = true;
- })
- .catch((error) => {
- this.#closed = true;
- destroy(this, error);
- });
+ #drainPending() {
+ var pendingChunks = this.#pendingChunks,
+ pendingChunksI = 0,
+ pendingChunksCount = pendingChunks.length;
+
+ for (; pendingChunksI < pendingChunksCount; pendingChunksI++) {
+ const chunk = pendingChunks[pendingChunksI];
+ pendingChunks[pendingChunksI] = undefined;
+ if (!this.push(chunk, undefined)) {
+ this.#pendingChunks = pendingChunks.slice(pendingChunksI + 1);
+ return true;
+ }
+ }
+
+ if (pendingChunksCount > 0) {
+ this.#pendingChunks = [];
+ }
+
+ return false;
+ }
+
+ #handleDone(reader) {
+ reader.releaseLock();
+ this.#reader = undefined;
+ this.#closed = true;
+ this.push(null);
+ return;
}
async _read() {
+ var stream = this.#stream,
+ reader = this.#reader;
+ if (stream) {
+ reader = this.#reader = stream.getReader();
+ this.#stream = undefined;
+ } else if (this.#drainPending()) {
+ return;
+ }
+
var deferredError;
try {
- var done, value;
- const firstResult = this.#reader.readMany();
+ do {
+ var done = false,
+ value;
+ const firstResult = reader.readMany();
- if (isPromise(firstResult)) {
- const result = await firstResult;
- done = result.done;
- value = result.value;
- } else {
- done = firstResult.done;
- value = firstResult.value;
- }
+ if (isPromise(firstResult)) {
+ ({ done, value } = await firstResult);
+ if (this.#closed) {
+ this.#pendingChunks.push(...value);
+ return;
+ }
+ } else {
+ ({ done, value } = firstResult);
+ }
- if (done) {
- this.push(null);
- return;
- }
+ if (done) {
+ this.#handleDone(reader);
+ return;
+ }
- if (!value)
- throw new Error(
- `Invalid value from ReadableStream reader: ${value}`,
- );
- if (ArrayIsArray(value)) {
- this.push(...value);
- } else {
- this.push(value);
- }
+ if (!this.push(value[0])) {
+ this.#pendingChunks = value.slice(1);
+ return;
+ }
+
+ for (let i = 1, count = value.length; i < count; i++) {
+ if (!this.push(value[i])) {
+ this.#pendingChunks = value.slice(i + 1);
+ return;
+ }
+ }
+ } while (this._readableState.flowing && !this.#closed);
} catch (e) {
deferredError = e;
} finally {
@@ -2664,8 +2704,15 @@ var require_readable = __commonJS({
_destroy(error, callback) {
if (!this.#closed) {
- this.#reader.releaseLock();
- this.#reader.cancel(error).then(done).catch(done);
+ var reader = this.#reader;
+ if (reader) {
+ this.#reader = undefined;
+ reader.cancel(error).finally(() => {
+ this.#closed = true;
+ callback(error);
+ });
+ }
+
return;
}
try {
@@ -2709,16 +2756,24 @@ var require_readable = __commonJS({
throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
validateBoolean(objectMode, "options.objectMode");
- const reader = readableStream.getReader();
- const readable = new ReadableFromWeb({
- highWaterMark,
- encoding,
- objectMode,
- signal,
- reader,
- });
+ const nativeStream = getNativeReadableStream(
+ Readable,
+ readableStream,
+ options,
+ );
- return readable;
+ return (
+ nativeStream ||
+ new ReadableFromWeb(
+ {
+ highWaterMark,
+ encoding,
+ objectMode,
+ signal,
+ },
+ readableStream,
+ )
+ );
}
module.exports = Readable;
@@ -2726,8 +2781,15 @@ var require_readable = __commonJS({
var { addAbortSignal } = require_add_abort_signal();
var eos = require_end_of_stream();
var debug = (name) => {};
- const { maybeReadMore, resume, emitReadable, onEofChunk } =
- globalThis[Symbol.for("Bun.lazy")]("bun:stream");
+ const {
+ maybeReadMore: _maybeReadMore,
+ resume,
+ emitReadable,
+ onEofChunk,
+ } = globalThis[Symbol.for("Bun.lazy")]("bun:stream");
+ function maybeReadMore(stream, state) {
+ process.nextTick(_maybeReadMore, stream, state);
+ }
var destroyImpl = require_destroy();
var {
aggregateTwoErrors,
@@ -2984,10 +3046,17 @@ var require_readable = __commonJS({
// Call internal read method
try {
- const result = this._read(state.highWaterMark);
- const then = result?.then;
- if (then && typeof then === "function") {
- then.call(result, nop, function (err) {
+ var result = this._read(state.highWaterMark);
+ if (isPromise(result)) {
+ const peeked = Bun.peek(result);
+ if (peeked !== result) {
+ result = peeked;
+ }
+ }
+
+ var then = result?.then;
+ if (then && isCallable(then)) {
+ result.then(nop, function (err) {
errorOrDestroy(this, err);
});
}
@@ -3823,7 +3892,7 @@ var require_writable = __commonJS({
state.length -= state.writelen;
state.writelen = 0;
if (er) {
- er.stack;
+ Error.captureStackTrace(er);
if (!state.errored) {
state.errored = er;
}
@@ -5603,6 +5672,217 @@ var require_ours = __commonJS({
},
});
+/**
+ * Bun native stream wrapper
+ *
+ * This glue code lets us avoid using ReadableStreams to wrap Bun internal streams
+ *
+ */
+function createNativeStream(nativeType, Readable) {
+ var [pull, start, cancel, setClose, deinit, updateRef] =
+ globalThis[Symbol.for("Bun.lazy")](nativeType);
+
+ var closer = [false];
+ var handleNumberResult = function (nativeReadable, result, view, isClosed) {
+ if (result > 0) {
+ const slice = view.subarray(0, result);
+ const remainder = view.subarray(result);
+ if (remainder.byteLength > 0) {
+ nativeReadable.push(slice);
+ }
+
+ if (isClosed) {
+ nativeReadable.push(null);
+ }
+
+ return remainder.byteLength > 0 ? remainder : undefined;
+ }
+
+ if (isClosed) {
+ nativeReadable.push(null);
+ }
+ };
+
+ var handleArrayBufferViewResult = function (
+ nativeReadable,
+ result,
+ view,
+ isClosed,
+ ) {
+ if (result.byteLength > 0) {
+ nativeReadable.push(result);
+ }
+
+ if (isClosed) {
+ nativeReadable.push(null);
+ }
+
+ return view;
+ };
+
+ var handleResult = function (nativeReadable, result, view, isClosed) {
+ if (typeof result === "number") {
+ return handleNumberResult(nativeReadable, result, view, isClosed);
+ } else if (typeof result === "boolean") {
+ nativeReadable.push(null);
+ return view?.byteLength ?? 0 > 0 ? view : undefined;
+ } else if (ArrayBuffer.isView(result)) {
+ return handleArrayBufferViewResult(
+ nativeReadable,
+ result,
+ view,
+ isClosed,
+ );
+ } else {
+ throw new Error("Invalid result from pull");
+ }
+ };
+ var NativeReadable = class NativeReadable extends Readable {
+ #ptr;
+ #refCount = 1;
+ #constructed = false;
+ #remainingChunk = undefined;
+ #highWaterMark;
+ #pendingRead = false;
+ constructor(ptr, options = {}) {
+ super(options);
+ if (typeof options.highWaterMark === "number") {
+ this.#highWaterMark = options.highWaterMark;
+ } else {
+ this.#highWaterMark = 256 * 1024;
+ }
+ this.#ptr = ptr;
+ this.#constructed = false;
+ this.#remainingChunk = undefined;
+ this.#pendingRead = false;
+ }
+
+ _read(highWaterMark) {
+ if (this.#pendingRead) return;
+
+ var ptr = this.#ptr;
+ if (ptr === 0) {
+ this.push(null);
+ return;
+ }
+
+ if (!this.#constructed) {
+ this.#constructed = true;
+ start(ptr, this.#highWaterMark);
+ }
+
+ return this.#internalRead(this.#getRemainingChunk(), ptr);
+ }
+
+ #getRemainingChunk() {
+ var chunk = this.#remainingChunk;
+ var highWaterMark = this.#highWaterMark;
+ if ((chunk?.byteLength ?? 0 < 512) && highWaterMark > 512) {
+ this.#remainingChunk = chunk = new Buffer(this.#highWaterMark);
+ }
+
+ return chunk;
+ }
+
+ #internalRead(view, ptr) {
+ closer[0] = false;
+ var result = pull(ptr, view, closer);
+ if (isPromise(result)) {
+ this.#pendingRead = true;
+ var originalFlowing = this._readableState.flowing;
+ this._readableState.flowing = false;
+ return result.then(
+ (result) => {
+ this._readableState.flowing = originalFlowing;
+ this.#pendingRead = false;
+ this.#remainingChunk = handleResult(this, result, view, closer[0]);
+ },
+ (reason) => errorOrDestroy(this, reason),
+ );
+ } else {
+ this.#remainingChunk = handleResult(this, result, view, closer[0]);
+ }
+ }
+
+ _construct(cb) {
+ this._readableState.constructed = true;
+ cb();
+ }
+ _destroy(error, callback) {
+ var ptr = this.#ptr;
+ if (ptr === 0) {
+ callback(error);
+ return;
+ }
+
+ this.#ptr = 0;
+ if (updateRef) {
+ updateRef(ptr, false);
+ }
+ process.nextTick(deinit, ptr);
+ cancel(ptr, error);
+ callback(error);
+ }
+
+ ref() {
+ var ptr = this.#ptr;
+ if (ptr === 0) return;
+ if (this.#refCount++ === 0) {
+ updateRef(ptr, true);
+ }
+ }
+
+ unref() {
+ var ptr = this.#ptr;
+ if (ptr === 0) return;
+ if (this.#refCount-- === 1) {
+ updateRef(ptr, false);
+ }
+ }
+ };
+
+ if (!updateRef) {
+ NativeReadable.prototype.ref = undefined;
+ NativeReadable.prototype.unref = undefined;
+ }
+
+ return NativeReadable;
+}
+
+var nativeReadableStreamPrototypes = {
+ 0: undefined,
+ 1: undefined,
+ 2: undefined,
+ 3: undefined,
+ 4: undefined,
+ 5: undefined,
+};
+function getNativeReadableStreamPrototype(nativeType, Readable) {
+ return (nativeReadableStreamPrototypes[nativeType] ||= createNativeStream(
+ nativeType,
+ Readable,
+ ));
+}
+
+function getNativeReadableStream(Readable, stream, options) {
+ if (
+ !(stream && typeof stream === "object" && stream instanceof ReadableStream)
+ ) {
+ return undefined;
+ }
+
+ const native = direct(stream);
+ if (!native) {
+ return undefined;
+ }
+ const { stream: ptr, data: type } = native;
+
+ const NativeReadable = getNativeReadableStreamPrototype(type, Readable);
+
+ return new NativeReadable(ptr, options);
+}
+/** --- Bun native stream wrapper --- */
+
var stream_exports, wrapper;
stream_exports = require_ours();
wrapper =
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 81e3c6a51..2f4ee919c 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -477,9 +477,6 @@ pub const Fetch = struct {
const headers_string = "headers";
const method_string = "method";
- var fetch_body_string: MutableString = undefined;
- var fetch_body_string_loaded = false;
-
const JSType = js.JSType;
pub const fetch_error_no_args = "fetch() expects a string but received no arguments.";
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 4ec4f9f9c..7ec20b046 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -160,7 +160,7 @@ pub const ReadableStream = struct {
/// ReadableByteStreamController
/// but with a FileLoader
/// we can skip the FileLoader and just use the underlying File
- File: *FileBlobLoader,
+ File: *FileReader,
/// This is a direct readable stream
/// That means we can turn it into whatever we want
@@ -212,7 +212,7 @@ pub const ReadableStream = struct {
.File => ReadableStream{
.value = value,
.ptr = .{
- .File = ptr.asPtr(FileBlobLoader),
+ .File = ptr.asPtr(FileReader),
},
},
@@ -266,7 +266,7 @@ pub const ReadableStream = struct {
return reader.toJS(globalThis);
},
.file => {
- var reader = bun.default_allocator.create(FileBlobLoader.Source) catch unreachable;
+ var reader = bun.default_allocator.create(FileReader.Source) catch unreachable;
reader.* = .{
.context = undefined,
};
@@ -1141,6 +1141,10 @@ pub const FileSink = struct {
}
pub fn flushMaybePoll(this: *FileSink) StreamResult.Writable {
+ return flushMaybePollWithSize(this, std.math.maxInt(usize));
+ }
+
+ pub fn flushMaybePollWithSize(this: *FileSink, writable_size: usize) StreamResult.Writable {
std.debug.assert(this.fd != JSC.Node.invalid_fd);
var total: usize = this.written;
@@ -1168,12 +1172,23 @@ pub const FileSink = struct {
}
}
- const max_to_write = if (is_fifo) this.max_write_size else remain.len;
+ const max_to_write =
+ if (is_fifo)
+ brk: {
+ if (comptime Environment.isMac) {
+ break :brk if (writable_size == std.math.maxInt(usize))
+ max_fifo_size
+ else
+ writable_size;
+ }
+
+ break :brk this.max_write_size;
+ } else remain.len;
while (remain.len > 0) {
const write_buf = remain[0..@minimum(remain.len, max_to_write)];
- log("Write {d} bytes (fd: {d})", .{ write_buf.len, fd });
+ log("Write {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ write_buf.len, fd, this.head, remain.len, total });
const res = JSC.Node.Syscall.write(fd, write_buf);
if (res == .err) {
const retry =
@@ -1202,7 +1217,7 @@ pub const FileSink = struct {
remain = remain[res.result..];
total += res.result;
- log("Wrote {d} bytes (fd: {d})", .{ res.result, fd });
+ log("Wrote {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ res.result, fd, this.head, remain.len, total });
if (res.result == 0) {
if (this.poll_ref) |poll| {
@@ -1341,7 +1356,7 @@ pub const FileSink = struct {
return JSSink.createObject(globalThis, this);
}
- pub fn ready(this: *FileSink, _: i64) void {
+ pub fn ready(this: *FileSink, writable: i64) void {
var remain = this.buffer.slice();
const pending = remain[@minimum(this.head, remain.len)..].len;
if (pending == 0) {
@@ -1352,7 +1367,11 @@ pub const FileSink = struct {
return;
}
- _ = this.flushMaybePoll();
+ if (comptime Environment.isMac) {
+ _ = this.flushMaybePollWithSize(@intCast(usize, @maximum(writable, 0)));
+ } else {
+ _ = this.flushMaybePollWithSize(std.math.maxInt(usize));
+ }
}
pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
@@ -1429,7 +1448,7 @@ pub const FileSink = struct {
fn isPending(this: *const FileSink) bool {
var poll_ref = this.poll_ref orelse return false;
- return poll_ref.isActive();
+ return poll_ref.isRegistered();
}
pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) {
@@ -2592,6 +2611,7 @@ pub fn ReadableStreamSource(
comptime onPull: anytype,
comptime onCancel: fn (this: *Context) void,
comptime deinit: fn (this: *Context) void,
+ comptime setRefUnrefFn: ?fn (this: *Context, enable: bool) void,
) type {
return struct {
context: Context,
@@ -2610,6 +2630,24 @@ pub fn ReadableStreamSource(
return onPull(&this.context, buf, JSValue.zero);
}
+ pub fn ref(this: *This) void {
+ if (setRefUnrefFn) |setRefUnref| {
+ setRefUnref(&this.context, true);
+ }
+ }
+
+ pub fn unref(this: *This) void {
+ if (setRefUnrefFn) |setRefUnref| {
+ setRefUnref(&this.context, false);
+ }
+ }
+
+ pub fn setRef(this: *This, value: bool) void {
+ if (setRefUnrefFn) |setRefUnref| {
+ setRefUnref(&this.context, value);
+ }
+ }
+
pub fn start(
this: *This,
) StreamStart {
@@ -2665,6 +2703,8 @@ pub fn ReadableStreamSource(
return ReadableStream.fromNative(globalThis, Context.tag, this);
}
+ const supports_ref = setRefUnrefFn != null;
+
pub const JSReadableStreamSource = struct {
pub const shim = JSC.Shimmer(std.mem.span(name_), "JSReadableStreamSource", @This());
pub const name = std.fmt.comptimePrint("{s}_JSReadableStreamSource", .{std.mem.span(name_)});
@@ -2724,6 +2764,13 @@ pub fn ReadableStreamSource(
return JSC.JSValue.jsUndefined();
}
+ pub fn updateRef(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
+ const ref_or_unref = callFrame.argument(1).asBoolean();
+ this.setRef(ref_or_unref);
+ return JSC.JSValue.jsUndefined();
+ }
+
fn onClose(ptr: *anyopaque) void {
var this = bun.cast(*ReadableStreamSourceType, ptr);
_ = this.close_jsvalue.call(this.globalThis, &.{});
@@ -2738,23 +2785,17 @@ pub fn ReadableStreamSource(
pub fn load(globalThis: *JSGlobalObject) callconv(.C) JSC.JSValue {
if (comptime JSC.is_bindgen) unreachable;
- if (comptime Environment.allow_assert) {
- // this should be cached per globals object
- const OnlyOnce = struct {
- pub threadlocal var last_globals: ?*JSGlobalObject = null;
- };
- if (OnlyOnce.last_globals) |last_globals| {
- std.debug.assert(last_globals != globalThis);
- }
- OnlyOnce.last_globals = globalThis;
- }
-
+ // This is used also in Node.js streams
return JSC.JSArray.from(globalThis, &.{
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.pull, true),
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.start, true),
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.cancel, true),
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.setClose, true),
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.deinit, true),
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.pull, true),
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.start, true),
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.cancel, true),
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.setClose, true),
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.deinit, true),
+ if (supports_ref)
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.updateRef, true)
+ else
+ JSC.JSValue.jsNull(),
});
}
@@ -2770,6 +2811,7 @@ pub fn ReadableStreamSource(
_ = JSReadableStreamSource.cancel;
_ = JSReadableStreamSource.setClose;
_ = JSReadableStreamSource.load;
+ _ = JSReadableStreamSource.deinit;
}
}
};
@@ -2847,7 +2889,7 @@ pub const ByteBlobLoader = struct {
bun.default_allocator.destroy(this);
}
- pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit);
+ pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit, null);
};
pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
@@ -3127,10 +3169,11 @@ pub const ByteStream = struct {
bun.default_allocator.destroy(this.parent());
}
- pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit);
+ pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit, null);
};
-pub const FileBlobLoader = struct {
+/// **Not** the Web "FileReader" API
+pub const FileReader = struct {
buf: []u8 = &[_]u8{},
view: JSC.Strong = .{},
fd: JSC.Node.FileDescriptor = 0,
@@ -3169,17 +3212,15 @@ pub const FileBlobLoader = struct {
pub usingnamespace NewReadyWatcher(@This(), .readable, ready);
- pub inline fn globalThis(this: *FileBlobLoader) *JSC.JSGlobalObject {
+ pub inline fn globalThis(this: *FileReader) *JSC.JSGlobalObject {
return this.stored_global_this_ orelse @fieldParentPtr(Source, "context", this).globalThis;
}
- const FileReader = @This();
-
const run_on_different_thread_size = bun.huge_allocator_threshold;
pub const tag = ReadableStream.Tag.File;
- pub fn setupWithPoll(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) void {
+ pub fn setupWithPoll(this: *FileReader, store: *Blob.Store, chunk_size: Blob.SizeType, poll: ?*JSC.FilePoll) void {
store.ref();
this.* = .{
.loop = JSC.VirtualMachine.vm.eventLoop(),
@@ -3193,11 +3234,11 @@ pub const FileBlobLoader = struct {
}
}
- pub fn setup(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType) void {
+ pub fn setup(this: *FileReader, store: *Blob.Store, chunk_size: Blob.SizeType) void {
this.setupWithPoll(store, chunk_size, null);
}
- pub fn finish(this: *FileBlobLoader) void {
+ pub fn finish(this: *FileReader) void {
if (this.finished) return;
this.finished = true;
this.close_on_eof = true;
@@ -3227,12 +3268,12 @@ pub const FileBlobLoader = struct {
concurrent_task: JSC.ConcurrentTask = .{},
pub fn taskCallback(task: *NetworkThread.Task) void {
- var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task));
+ var this = @fieldParentPtr(FileReader, "concurrent", @fieldParentPtr(Concurrent, "task", task));
var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable;
_ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{this});
}
- pub fn onRead(this: *FileBlobLoader, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void {
+ pub fn onRead(this: *FileReader, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void {
this.concurrent.read = @truncate(Blob.SizeType, result catch |err| {
if (@hasField(HTTPClient.NetworkThread.Completion, "result")) {
this.pending.result = .{
@@ -3258,7 +3299,7 @@ pub const FileBlobLoader = struct {
resume this.concurrent.read_frame;
}
- pub fn scheduleRead(this: *FileBlobLoader) void {
+ pub fn scheduleRead(this: *FileReader) void {
if (comptime Environment.isMac) {
var remaining = this.buf[this.concurrent.read..];
@@ -3296,7 +3337,7 @@ pub const FileBlobLoader = struct {
}
AsyncIO.global.read(
- *FileBlobLoader,
+ *FileReader,
this,
onRead,
&this.concurrent.completion,
@@ -3316,7 +3357,7 @@ pub const FileBlobLoader = struct {
}
pub fn onJSThread(task_ctx: *anyopaque) void {
- var this: *FileBlobLoader = bun.cast(*FileBlobLoader, task_ctx);
+ var this: *FileReader = bun.cast(*FileReader, task_ctx);
const view = this.view.get().?;
defer this.view.clear();
@@ -3352,12 +3393,12 @@ pub const FileBlobLoader = struct {
}
}
- pub fn scheduleMainThreadTask(this: *FileBlobLoader) void {
+ pub fn scheduleMainThreadTask(this: *FileReader) void {
this.concurrent.main_thread_task.ctx = this;
this.loop.enqueueTaskConcurrent(this.concurrent.concurrent_task.from(&this.concurrent.main_thread_task));
}
- fn runAsync(this: *FileBlobLoader) void {
+ fn runAsync(this: *FileReader) void {
this.concurrent.read = 0;
Concurrent.scheduleRead(this);
@@ -3382,7 +3423,7 @@ pub const FileBlobLoader = struct {
const default_fifo_chunk_size = 64 * 1024;
const default_file_chunk_size = 1024 * 1024 * 2;
- pub fn onStart(this: *FileBlobLoader) StreamStart {
+ pub fn onStart(this: *FileReader) StreamStart {
var file = &this.store.data.file;
std.debug.assert(!this.started);
this.started = true;
@@ -3480,7 +3521,7 @@ pub const FileBlobLoader = struct {
return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) };
}
- fn calculateChunkSize(this: *FileBlobLoader, available_to_read: usize) usize {
+ fn calculateChunkSize(this: *FileReader, available_to_read: usize) usize {
const file = &this.store.data.file;
const chunk_size: usize = if (this.user_chunk_size > 0)
@@ -3496,7 +3537,7 @@ pub const FileBlobLoader = struct {
@minimum(available_to_read, chunk_size);
}
- pub fn onPullInto(this: *FileBlobLoader, buffer: []u8, view: JSC.JSValue) StreamResult {
+ pub fn onPullInto(this: *FileReader, buffer: []u8, view: JSC.JSValue) StreamResult {
const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
std.debug.assert(this.started);
@@ -3531,14 +3572,14 @@ pub const FileBlobLoader = struct {
return this.read(buffer, view, null);
}
- fn maybeAutoClose(this: *FileBlobLoader) void {
+ fn maybeAutoClose(this: *FileReader) void {
if (this.auto_close) {
_ = Syscall.close(this.fd);
this.auto_close = false;
}
}
- fn handleReadChunk(this: *FileBlobLoader, result: usize, view: JSC.JSValue, owned: bool, buf: []u8) StreamResult {
+ fn handleReadChunk(this: *FileReader, result: usize, view: JSC.JSValue, owned: bool, buf: []u8) StreamResult {
std.debug.assert(this.started);
this.total_read += @intCast(Blob.SizeType, result);
@@ -3573,7 +3614,7 @@ pub const FileBlobLoader = struct {
}
pub fn read(
- this: *FileBlobLoader,
+ this: *FileReader,
read_buf: []u8,
view: JSC.JSValue,
/// provided via kqueue(), only on macOS
@@ -3665,6 +3706,10 @@ pub const FileBlobLoader = struct {
}
if (this.poll_ref) |poll| {
+ if ((available_to_read orelse 0) > 0) {
+ poll.flags.insert(.readable);
+ }
+
const is_readable = poll.isReadable();
if (!is_readable and poll.isEOF()) {
if (poll.isHUP()) {
@@ -3676,6 +3721,11 @@ pub const FileBlobLoader = struct {
this.finalize();
return .{ .done = {} };
} else if (!is_readable and poll.isRegistered()) {
+ if (this.finished) {
+ this.finalize();
+ return .{ .done = {} };
+ }
+
if (view != .zero) {
this.view.set(this.globalThis(), view);
this.buf = read_buf;
@@ -3753,6 +3803,12 @@ pub const FileBlobLoader = struct {
if (result < buf_to_use.len) {
// do not insert .eof here
poll.flags.remove(.readable);
+
+ if (result > 0 and !poll.flags.contains(.hup) and !this.finished) {
+ // partial read, but not close. be sure to ask for more data
+ if (!this.isWatching())
+ this.watch(fd);
+ }
}
}
}
@@ -3770,7 +3826,7 @@ pub const FileBlobLoader = struct {
this.view.set(this.globalThis(), view);
this.buf = read_buf;
if (!this.isWatching())
- this.watch(this.fd);
+ this.watch(fd);
this.poll_ref.?.flags.remove(.readable);
return .{
@@ -3784,23 +3840,21 @@ pub const FileBlobLoader = struct {
}
/// Called from Poller
- pub fn ready(this: *FileBlobLoader, sizeOrOffset: i64) void {
- std.debug.assert(this.started);
-
+ pub fn ready(this: *FileReader, sizeOrOffset: i64) void {
const view = this.view.get() orelse .zero;
defer this.view.clear();
- var available_to_read: usize = std.math.maxInt(usize);
- if (comptime Environment.isMac) {
+ const available_to_read: usize = if (comptime Environment.isMac) brk: {
if (this.isFIFO()) {
- available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0));
+ break :brk @intCast(usize, @maximum(sizeOrOffset, 0));
} else if (std.os.S.ISREG(this.mode)) {
// Returns when the file pointer is not at the end of
// file. data contains the offset from current position
// to end of file, and may be negative.
- available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0));
+ break :brk @intCast(usize, @maximum(sizeOrOffset, 0));
}
- }
+ break :brk std.math.maxInt(usize);
+ } else std.math.maxInt(usize);
if (this.finalized and this.scheduled_count == 0) {
if (this.pending.state == .pending) {
// should never be reached
@@ -3844,7 +3898,7 @@ pub const FileBlobLoader = struct {
this.pending.run();
}
- pub fn finalize(this: *FileBlobLoader) void {
+ pub fn finalize(this: *FileReader) void {
if (this.finalized)
return;
@@ -3875,23 +3929,33 @@ pub const FileBlobLoader = struct {
this.store.deref();
}
- pub fn onCancel(this: *FileBlobLoader) void {
+ pub fn onCancel(this: *FileReader) void {
this.cancelled = true;
this.deinit();
}
- pub fn deinit(this: *FileBlobLoader) void {
+ pub fn deinit(this: *FileReader) void {
this.finalize();
if (this.scheduled_count == 0 and this.pending.state == .pending) {
this.destroy();
}
}
- pub fn destroy(this: *FileBlobLoader) void {
+ pub fn destroy(this: *FileReader) void {
bun.default_allocator.destroy(this);
}
- pub const Source = ReadableStreamSource(@This(), "FileBlobLoader", onStart, onPullInto, onCancel, deinit);
+ pub fn setRefOrUnref(this: *FileReader, value: bool) void {
+ if (this.poll_ref) |poll| {
+ if (value) {
+ poll.enableKeepingProcessAlive(this.globalThis().bunVM());
+ } else {
+ poll.disableKeepingProcessAlive(this.globalThis().bunVM());
+ }
+ }
+ }
+
+ pub const Source = ReadableStreamSource(@This(), "FileReader", onStart, onPullInto, onCancel, deinit, setRefOrUnref);
};
pub fn NewReadyWatcher(
@@ -3943,7 +4007,7 @@ pub fn NewReadyWatcher(
pub fn isWatching(this: *const Context) bool {
if (this.poll_ref) |poll| {
- return poll.flags.contains(flag.poll());
+ return poll.flags.contains(flag.poll()) and !poll.flags.contains(.needs_rearm);
}
return false;