aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/webcore/response.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-02 03:00:45 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-02 03:00:45 -0700
commite5eabc0658d2133603596ec17a6e7c956c5fe28c (patch)
tree8e50a0bfa0ca9eba4145191720bb7d412bf8d26f /src/javascript/jsc/webcore/response.zig
parent121c2960de87c53cc6bdd5f92fab627a74d21a2b (diff)
downloadbun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.tar.gz
bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.tar.zst
bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.zip
Faster ReadableStream
Diffstat (limited to 'src/javascript/jsc/webcore/response.zig')
-rw-r--r--src/javascript/jsc/webcore/response.zig329
1 files changed, 114 insertions, 215 deletions
diff --git a/src/javascript/jsc/webcore/response.zig b/src/javascript/jsc/webcore/response.zig
index 137ce35b8..3e42c174a 100644
--- a/src/javascript/jsc/webcore/response.zig
+++ b/src/javascript/jsc/webcore/response.zig
@@ -1557,7 +1557,7 @@ pub const Blob = struct {
is_all_ascii: ?bool = null,
allocator: std.mem.Allocator,
- // 8 MB ought to be enough
+ // 2 MB ought to be enough
pub const max_chunk_size = 1024 * 1024 * 2;
pub export fn BlobStore__ref(ptr: *anyopaque) void {
@@ -4988,6 +4988,12 @@ pub const FetchEvent = struct {
}
};
+pub const StreamStart = union(enum) {
+ empty: void,
+ err: JSC.Node.Syscall.Error,
+ chunk_size: Blob.SizeType,
+};
+
pub const StreamResult = union(enum) {
owned: bun.ByteList,
owned_and_done: bun.ByteList,
@@ -5000,8 +5006,8 @@ pub const StreamResult = union(enum) {
done: void,
pub const IntoArray = struct {
- value: JSValue,
- len: usize = std.math.maxInt(usize),
+ value: JSValue = JSValue.zero,
+ len: Blob.SizeType = std.math.maxInt(Blob.SizeType),
};
pub const Pending = struct {
@@ -5010,6 +5016,13 @@ pub const StreamResult = union(enum) {
used: bool = false,
};
+ pub fn isDone(this: *const StreamResult) bool {
+ return switch (this.*) {
+ .owned_and_done, .temporary_and_done, .into_array_and_done, .done, .err => true,
+ else => false,
+ };
+ }
+
fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void {
suspend {}
@@ -5038,10 +5051,10 @@ pub const StreamResult = union(enum) {
pub fn toJS(this: *const StreamResult, globalThis: *JSGlobalObject) JSValue {
switch (this.*) {
.owned => |list| {
- return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJSAutoAllocator(globalThis.ref(), null);
+ return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null);
},
.owned_and_done => |list| {
- return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJSAutoAllocator(globalThis.ref(), null);
+ return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null);
},
.temporary => |temp| {
var array = JSC.JSValue.createUninitializedUint8Array(globalThis, temp.len);
@@ -5056,10 +5069,10 @@ pub const StreamResult = union(enum) {
return array;
},
.into_array => |array| {
- return array.value;
+ return JSC.JSValue.jsNumberFromInt64(array.len);
},
.into_array_and_done => |array| {
- return array.value;
+ return JSC.JSValue.jsNumberFromInt64(array.len);
},
.pending => |pending| {
var promise = JSC.JSPromise.create(globalThis);
@@ -5115,7 +5128,7 @@ pub fn WritableStreamSink(
return onWrite(&this.context, bytes);
}
- pub fn start(this: *This) StreamResult {
+ pub fn start(this: *This) StreamStart {
return onStart(&this.context);
}
@@ -5291,21 +5304,21 @@ pub fn ReadableStreamSource(
const This = @This();
const ReadableStreamSourceType = @This();
- pub fn pull(this: *This) StreamResult {
- return onPull(&this.context);
+ pub fn pull(this: *This, buf: []u8) StreamResult {
+ return onPull(&this.context, buf, JSValue.zero);
}
pub fn start(
this: *This,
- ) StreamResult {
+ ) StreamStart {
return onStart(&this.context);
}
- pub fn pullFromJS(this: *This) StreamResult {
- return onPull(&this.context);
+ pub fn pullFromJS(this: *This, buf: []u8, view: JSValue) StreamResult {
+ return onPull(&this.context, buf, view);
}
- pub fn startFromJS(this: *This) StreamResult {
+ pub fn startFromJS(this: *This) StreamStart {
return onStart(&this.context);
}
@@ -5356,19 +5369,24 @@ pub fn ReadableStreamSource(
pub fn pull(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
+ const view = callFrame.argument(1);
+ view.ensureStillAlive();
+ var buffer = view.asArrayBuffer(globalThis) orelse return JSC.JSValue.jsUndefined();
return processResult(
globalThis,
callFrame,
- this.pullFromJS(),
+ this.pullFromJS(buffer.slice(), view),
);
}
pub fn start(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
- return processResult(
- globalThis,
- callFrame,
- this.startFromJS(),
- );
+ switch (this.startFromJS()) {
+ .empty => return JSValue.jsNumber(0),
+ .chunk_size => |size| return JSValue.jsNumber(size),
+ .err => |err| {
+ return err.toJSC(globalThis);
+ },
+ }
}
pub fn processResult(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame, result: StreamResult) JSC.JSValue {
@@ -5475,30 +5493,35 @@ pub const ByteBlobLoader = struct {
};
}
- pub fn onStart(this: *ByteBlobLoader) StreamResult {
- return this.onPull();
+ pub fn onStart(this: *ByteBlobLoader) StreamStart {
+ return .{ .chunk_size = this.chunk_size };
}
- pub fn onPull(this: *ByteBlobLoader) StreamResult {
+ pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult {
if (this.done) {
return .{ .done = {} };
}
var temporary = this.store.sharedView();
temporary = temporary[this.offset..];
- temporary = temporary[0..@minimum(this.chunk_size, @minimum(temporary.len, this.remain))];
+
+ temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))];
if (temporary.len == 0) {
this.store.deref();
this.done = true;
return .{ .done = {} };
}
- this.remain -|= @intCast(Blob.SizeType, temporary.len);
- this.offset +|= @intCast(Blob.SizeType, temporary.len);
+ const copied = @intCast(Blob.SizeType, temporary.len);
- return .{
- .temporary = bun.ByteList.init(temporary),
- };
+ this.remain -|= copied;
+ this.offset +|= copied;
+ @memcpy(temporary.ptr, buffer.ptr, temporary.len);
+ if (this.remain == 0) {
+ return .{ .into_array_and_done = .{ .value = array, .len = copied } };
+ }
+
+ return .{ .into_array = .{ .value = array, .len = copied } };
}
pub fn onCancel(_: *ByteBlobLoader) void {}
@@ -5517,7 +5540,7 @@ pub const ByteBlobLoader = struct {
pub const FileBlobLoader = struct {
buf: []u8 = &[_]u8{},
- uint8array: JSC.JSValue = JSC.JSValue.zero,
+ protected_view: JSC.JSValue = JSC.JSValue.zero,
fd: JSC.Node.FileDescriptor = 0,
auto_close: bool = false,
loop: *JSC.EventLoop = undefined,
@@ -5538,7 +5561,7 @@ pub const FileBlobLoader = struct {
const FileReader = @This();
- const run_on_different_thread_size = 1024 * 512;
+ const run_on_different_thread_size = bun.huge_allocator_threshold;
pub const tag = ReadableStream.Tag.File;
@@ -5635,40 +5658,23 @@ pub const FileBlobLoader = struct {
scheduleMainThreadTask(this);
return;
}
+ }
- AsyncIO.global.read(
- *FileBlobLoader,
- this,
- onRead,
- &this.concurrent.completion,
- this.fd,
- this.buf[this.concurrent.read..],
- null,
- );
-
- suspend {
- var _frame = @frame();
- var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable;
- this_frame.* = _frame.*;
- this.concurrent.read_frame = this_frame;
- }
- } else {
- AsyncIO.global.read(
- *FileBlobLoader,
- this,
- onRead,
- &this.concurrent.completion,
- this.fd,
- this.buf[this.concurrent.read..],
- null,
- );
+ AsyncIO.global.read(
+ *FileBlobLoader,
+ this,
+ onRead,
+ &this.concurrent.completion,
+ this.fd,
+ this.buf[this.concurrent.read..],
+ null,
+ );
- suspend {
- var _frame = @frame();
- var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable;
- this_frame.* = _frame.*;
- this.concurrent.read_frame = this_frame;
- }
+ suspend {
+ var _frame = @frame();
+ var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable;
+ this_frame.* = _frame.*;
+ this.concurrent.read_frame = this_frame;
}
scheduleMainThreadTask(this);
@@ -5676,12 +5682,16 @@ pub const FileBlobLoader = struct {
pub fn onJSThread(task_ctx: *anyopaque) void {
var this: *FileBlobLoader = bun.cast(*FileBlobLoader, task_ctx);
+ const protected_view = this.protected_view;
+ defer protected_view.unprotect();
+ this.protected_view = JSC.JSValue.zero;
if (this.finalized and this.scheduled_count == 0) {
if (!this.pending.used) {
resume this.pending.frame;
}
this.scheduled_count -= 1;
+
this.deinit();
return;
@@ -5702,9 +5712,12 @@ pub const FileBlobLoader = struct {
return;
}
- this.pending.result = this.handleReadChunk(this.buf, @as(usize, this.concurrent.read));
+ this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read));
resume this.pending.frame;
this.scheduled_count -= 1;
+ if (this.pending.result.isDone()) {
+ this.finalize();
+ }
}
pub fn scheduleMainThreadTask(this: *FileBlobLoader) void {
@@ -5714,14 +5727,6 @@ pub const FileBlobLoader = struct {
fn runAsync(this: *FileBlobLoader) void {
this.concurrent.read = 0;
- this.buf = bun.auto_allocator.alloc(u8, this.concurrent.chunk_size) catch {
- this.pending.result = .{ .err = JSC.Node.Syscall.Error.oom };
- this.concurrent.read = 0;
- scheduleMainThreadTask(this);
-
- return;
- };
- _ = bun.C.posix_madvise(this.buf.ptr, this.buf.len, 2);
Concurrent.scheduleRead(this);
@@ -5742,7 +5747,7 @@ pub const FileBlobLoader = struct {
const default_fifo_chunk_size = 1024;
const default_file_chunk_size = 1024 * 1024 * 2;
- pub fn onStart(this: *FileBlobLoader) StreamResult {
+ pub fn onStart(this: *FileBlobLoader) StreamStart {
var file = &this.store.data.file;
var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
var auto_close = this.auto_close;
@@ -5800,15 +5805,6 @@ pub const FileBlobLoader = struct {
return .{ .err = err };
}
- // if (comptime Environment.isMac) {
- // if (std.os.S.ISSOCK(stat.mode)) {
- // // darwin doesn't support os.MSG.NOSIGNAL,
- // // but instead a socket option to avoid SIGPIPE.
- // const _bytes = &std.mem.toBytes(@as(c_int, 1));
- // _ = std.os.darwin.setsockopt(fd, std.os.SOL.SOCKET, std.os.SO.NOSIGPIPE, _bytes, @intCast(std.os.socklen_t, _bytes.len));
- // }
- // }
-
file.seekable = std.os.S.ISREG(stat.mode);
file.mode = @intCast(JSC.Node.Mode, stat.mode);
this.mode = file.mode;
@@ -5821,45 +5817,14 @@ pub const FileBlobLoader = struct {
_ = JSC.Node.Syscall.close(fd);
}
this.deinit();
- return .{ .done = {} };
+ return .{ .empty = {} };
}
this.fd = fd;
this.auto_close = auto_close;
const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
-
- if (chunk_size >= run_on_different_thread_size) {
- // should never be reached
- this.pending.result = .{
- .err = JSC.Node.Syscall.Error.todo,
- };
-
- this.scheduleAsync(@truncate(Blob.SizeType, chunk_size));
-
- return .{ .pending = &this.pending };
- }
-
- if (this.allocateBuffer(chunk_size)) |err| {
- this.deinit();
- return .{ .err = err };
- }
-
- return this.read(this.buf);
- }
-
- // Disabled because it's not fully implemented
- // Maybe we should allocate as an ArrayBuffer instead of a Uint8Array?
- fn shouldUseJSCHeap(this: *const FileBlobLoader, chunk_size: Blob.SizeType) bool {
- _ = this;
- _ = chunk_size;
- return false;
- // const file = &this.store.data.file;
-
- // if (!(file.seekable orelse false))
- // return false;
-
- // return file.max_size - this.total_read >= chunk_size;
+ return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) };
}
fn calculateChunkSize(this: *FileBlobLoader, available_to_read: usize) usize {
@@ -5878,55 +5843,32 @@ pub const FileBlobLoader = struct {
@minimum(available_to_read, chunk_size);
}
- fn allocateBuffer(this: *FileBlobLoader, chunk_size: usize) ?JSC.Node.Syscall.Error {
- var file = &this.store.data.file;
-
- if (this.shouldUseJSCHeap(@intCast(Blob.SizeType, chunk_size))) {
- this.uint8array = JSValue.createUninitializedUint8Array(this.loop.global, chunk_size);
- this.uint8array.ensureStillAlive();
- this.buf = this.uint8array.asArrayBuffer(this.loop.global).?.slice();
- } else {
- this.buf = bun.auto_allocator.alloc(
- u8,
- chunk_size,
- ) catch {
- this.maybeAutoClose();
- return JSC.Node.Syscall.Error.oom.withPath(if (file.pathlike.path.slice().len > 0) file.pathlike.path.slice() else "");
- };
- }
-
- return null;
- }
-
- pub fn onPull(this: *FileBlobLoader) StreamResult {
- if (this.buf.len == 0) {
- const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
+ pub fn onPull(this: *FileBlobLoader, buffer: []u8, view: JSC.JSValue) StreamResult {
+ const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
- switch (chunk_size) {
- 0 => {
- std.debug.assert(this.store.data.file.seekable orelse false);
- this.finalize();
- return .{ .done = {} };
- },
- run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => {
- // should never be reached
- this.pending.result = .{
- .err = JSC.Node.Syscall.Error.todo,
- };
+ switch (chunk_size) {
+ 0 => {
+ std.debug.assert(this.store.data.file.seekable orelse false);
+ this.finalize();
+ return .{ .done = {} };
+ },
+ run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => {
+ this.protected_view = view;
+ this.protected_view.protect();
+ // should never be reached
+ this.pending.result = .{
+ .err = JSC.Node.Syscall.Error.todo,
+ };
+ this.buf = buffer;
- this.scheduleAsync(@truncate(Blob.SizeType, chunk_size));
+ this.scheduleAsync(@truncate(Blob.SizeType, chunk_size));
- return .{ .pending = &this.pending };
- },
- else => {
- if (this.allocateBuffer(chunk_size)) |err| {
- return .{ .err = err };
- }
- },
- }
+ return .{ .pending = &this.pending };
+ },
+ else => {},
}
- return this.read(this.buf);
+ return this.read(buffer, view);
}
fn maybeAutoClose(this: *FileBlobLoader) void {
@@ -5936,7 +5878,7 @@ pub const FileBlobLoader = struct {
}
}
- fn handleReadChunk(this: *FileBlobLoader, read_buf: []u8, result: usize) StreamResult {
+ fn handleReadChunk(this: *FileBlobLoader, result: usize) StreamResult {
this.total_read += @intCast(Blob.SizeType, result);
const remaining: Blob.SizeType = if (this.store.data.file.seekable orelse false)
this.store.data.file.max_size -| this.total_read
@@ -5954,52 +5896,16 @@ pub const FileBlobLoader = struct {
const has_more = remaining > 0;
if (!has_more) {
- this.uint8array.ensureStillAlive();
-
- defer {
- this.uint8array.ensureStillAlive();
- this.buf = &.{};
- this.uint8array = JSValue.zero;
- this.finalize();
- }
-
- if (this.uint8array.isEmpty()) {
- return .{
- .owned_and_done = bun.ByteList.init(read_buf[0..result]),
- };
- } else {
- return .{
- .into_array_and_done = .{
- .value = this.uint8array,
- .len = result,
- },
- };
- }
+ return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result) } };
}
- if (this.uint8array.isEmpty()) {
- defer this.buf = &.{};
- return .{
- .owned = bun.ByteList.init(read_buf[0..result]),
- };
- } else {
- defer {
- this.buf = &.{};
- this.uint8array = JSValue.zero;
- }
-
- return .{
- .into_array = .{
- .value = this.uint8array,
- .len = result,
- },
- };
- }
+ return .{ .into_array = .{ .len = @truncate(Blob.SizeType, result) } };
}
pub fn read(
this: *FileBlobLoader,
read_buf: []u8,
+ view: JSC.JSValue,
) StreamResult {
const rc =
JSC.Node.Syscall.read(this.fd, read_buf);
@@ -6013,6 +5919,9 @@ pub const FileBlobLoader = struct {
switch (err.getErrno()) {
retry => {
+ this.protected_view = view;
+ this.protected_view.protect();
+ this.buf = read_buf;
this.watch();
return .{
.pending = &this.pending,
@@ -6029,7 +5938,7 @@ pub const FileBlobLoader = struct {
return .{ .err = sys };
},
.result => |result| {
- return this.handleReadChunk(read_buf, result);
+ return this.handleReadChunk(result);
},
}
}
@@ -6037,6 +5946,9 @@ pub const FileBlobLoader = struct {
pub fn callback(task: ?*anyopaque, sizeOrOffset: i64, _: u16) void {
var this: *FileReader = bun.cast(*FileReader, task.?);
this.scheduled_count -= 1;
+ const protected_view = this.protected_view;
+ defer protected_view.unprotect();
+ this.protected_view = JSValue.zero;
var available_to_read: usize = std.math.maxInt(usize);
if (comptime Environment.isMac) {
@@ -6064,16 +5976,12 @@ pub const FileBlobLoader = struct {
return;
if (this.buf.len == 0) {
- if (this.allocateBuffer(available_to_read)) |err| {
- this.pending.result = .{ .err = err };
- resume this.pending.frame;
- return;
- }
+ return;
} else {
this.buf.len = @minimum(this.buf.len, available_to_read);
}
- this.pending.result = this.read(this.buf);
+ this.pending.result = this.read(this.buf, this.protected_view);
resume this.pending.frame;
}
@@ -6081,15 +5989,6 @@ pub const FileBlobLoader = struct {
if (this.finalized)
return;
this.finalized = true;
- if (this.buf.len > 0) {
- if (this.uint8array.isEmpty()) {
- bun.default_allocator.free(this.buf);
- this.buf = &.{};
- } else {
- this.buf = &.{};
- this.uint8array = JSValue.zero;
- }
- }
this.maybeAutoClose();